Commit e2a8426f authored by unknown's avatar unknown

Bug #18863 NDB node fails to restart, cluster stuck in state trying to restart it.

- remove some event code to get rid of some bugs
parent 5dab95e2
......@@ -15,7 +15,6 @@ ndbapi/NdbApi.hpp \
ndbapi/NdbTransaction.hpp \
ndbapi/NdbDictionary.hpp \
ndbapi/NdbError.hpp \
ndbapi/NdbEventOperation.hpp \
ndbapi/NdbIndexOperation.hpp \
ndbapi/NdbOperation.hpp \
ndbapi/ndb_cluster_connection.hpp \
......
......@@ -735,9 +735,11 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES;
#define GSN_SUB_CREATE_REQ 576
#define GSN_SUB_CREATE_REF 577
#define GSN_SUB_CREATE_CONF 578
/*
#define GSN_SUB_START_REQ 579
#define GSN_SUB_START_REF 580
#define GSN_SUB_START_CONF 581
*/
#define GSN_SUB_SYNC_REQ 582
#define GSN_SUB_SYNC_REF 583
#define GSN_SUB_SYNC_CONF 584
......@@ -903,10 +905,11 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES;
/**
* SUMA restart protocol
*/
/*
#define GSN_SUMA_START_ME 691
#define GSN_SUMA_HANDOVER_REQ 692
#define GSN_SUMA_HANDOVER_CONF 693
*/
/* not used 694 */
/* not used 695 */
/* not used 696 */
......@@ -923,6 +926,7 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES;
/*
* EVENT Signals
*/
/*
#define GSN_SUB_GCP_COMPLETE_ACC 699
#define GSN_CREATE_EVNT_REQ 700
......@@ -932,7 +936,7 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES;
#define GSN_DROP_EVNT_REQ 703
#define GSN_DROP_EVNT_CONF 704
#define GSN_DROP_EVNT_REF 705
*/
#define GSN_TUX_BOUND_INFO 710
#define GSN_ACC_LOCKREQ 711
......
......@@ -38,9 +38,6 @@
In addition, the NDB API defines a structure NdbError, which contains the
specification for an error.
It is also possible to receive "events" triggered when data in the database in changed.
This is done through the NdbEventOperation class.
There are also some auxiliary classes, which are listed in the class hierarchy.
The main structure of an application program is as follows:
......@@ -968,7 +965,6 @@
class NdbObjectIdMap;
class NdbOperation;
class NdbEventOperationImpl;
class NdbScanOperation;
class NdbIndexScanOperation;
class NdbIndexOperation;
......@@ -981,13 +977,11 @@ class NdbSubroutine;
class NdbCall;
class Table;
class BaseString;
class NdbEventOperation;
class NdbBlob;
class NdbReceiver;
class Ndb_local_table_info;
template <class T> struct Ndb_free_list_t;
typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*);
#if defined NDB_OSE
/**
......@@ -1049,7 +1043,6 @@ class Ndb
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
friend class NdbReceiver;
friend class NdbOperation;
friend class NdbEventOperationImpl;
friend class NdbTransaction;
friend class Table;
friend class NdbApiSignal;
......@@ -1193,46 +1186,6 @@ public:
class NdbDictionary::Dictionary* getDictionary() const;
/** @} *********************************************************************/
/**
* @name Event subscriptions
* @{
*/
/**
* Create a subcription to an event defined in the database
*
* @param eventName
* unique identifier of the event
* @param bufferLength
* circular buffer size for storing event data
*
* @return Object representing an event, NULL on failure
*/
NdbEventOperation* createEventOperation(const char* eventName,
const int bufferLength);
/**
* Drop a subscription to an event
*
* @param eventOp
* Event operation
*
* @return 0 on success
*/
int dropEventOperation(NdbEventOperation* eventOp);
/**
* Wait for an event to occur. Will return as soon as an event
* is detected on any of the created events.
*
* @param aMillisecondNumber
* maximum time to wait
*
* @return the number of events that has occured, -1 on failure
*/
int pollEvents(int aMillisecondNumber);
/** @} *********************************************************************/
/**
......
......@@ -29,7 +29,6 @@
#include "NdbScanFilter.hpp"
#include "NdbRecAttr.hpp"
#include "NdbDictionary.hpp"
#include "NdbEventOperation.hpp"
#include "NdbPool.hpp"
#include "NdbBlob.hpp"
#endif
......@@ -937,165 +937,6 @@ public:
Index(NdbIndexImpl&);
};
/**
* @brief Represents an Event in NDB Cluster
*
*/
class Event : public Object {
public:
/**
* Specifies the type of database operations an Event listens to
*/
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/** TableEvent must match 1 << TriggerEvent */
#endif
enum TableEvent {
TE_INSERT=1, ///< Insert event on table
TE_DELETE=2, ///< Delete event on table
TE_UPDATE=4, ///< Update event on table
TE_ALL=7 ///< Any/all event on table (not relevant when
///< events are received)
};
/**
* Specifies the durability of an event
* (future version may supply other types)
*/
enum EventDurability {
ED_UNDEFINED
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
= 0
#endif
#if 0 // not supported
,ED_SESSION = 1,
// Only this API can use it
// and it's deleted after api has disconnected or ndb has restarted
ED_TEMPORARY = 2
// All API's can use it,
// But's its removed when ndb is restarted
#endif
,ED_PERMANENT ///< All API's can use it.
///< It's still defined after a cluster system restart
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
= 3
#endif
};
/**
* Constructor
* @param name Name of event
*/
Event(const char *name);
/**
* Constructor
* @param name Name of event
* @param table Reference retrieved from NdbDictionary
*/
Event(const char *name, const NdbDictionary::Table& table);
virtual ~Event();
/**
* Set unique identifier for the event
*/
void setName(const char *name);
/**
* Get unique identifier for the event
*/
const char *getName() const;
/**
* Define table on which events should be detected
*
* @note calling this method will default to detection
* of events on all columns. Calling subsequent
* addEventColumn calls will override this.
*
* @param table reference retrieved from NdbDictionary
*/
void setTable(const NdbDictionary::Table& table);
/**
* Set table for which events should be detected
*
* @note preferred way is using setTable(const NdbDictionary::Table&)
* or constructor with table object parameter
*/
void setTable(const char *tableName);
/**
* Get table name for events
*
* @return table name
*/
const char* getTableName() const;
/**
* Add type of event that should be detected
*/
void addTableEvent(const TableEvent te);
/**
* Set durability of the event
*/
void setDurability(EventDurability);
/**
* Get durability of the event
*/
EventDurability getDurability() const;
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
void addColumn(const Column &c);
#endif
/**
* Add a column on which events should be detected
*
* @param attrId Column id
*
* @note errors will mot be detected until createEvent() is called
*/
void addEventColumn(unsigned attrId);
/**
* Add a column on which events should be detected
*
* @param columnName Column name
*
* @note errors will not be detected until createEvent() is called
*/
void addEventColumn(const char * columnName);
/**
* Add several columns on which events should be detected
*
* @param n Number of columns
* @param columnNames Column names
*
* @note errors will mot be detected until
* NdbDictionary::Dictionary::createEvent() is called
*/
void addEventColumns(int n, const char ** columnNames);
/**
* Get no of columns defined in an Event
*
* @return Number of columns, -1 on error
*/
int getNoOfEventColumns() const;
/**
* Get object status
*/
virtual Object::Status getObjectStatus() const;
/**
* Get object version
*/
virtual int getObjectVersion() const;
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
void print();
#endif
private:
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
friend class NdbEventImpl;
friend class NdbEventOperationImpl;
#endif
class NdbEventImpl & m_impl;
Event(NdbEventImpl&);
};
/**
* @class Dictionary
* @brief Dictionary for defining and retreiving meta data
......@@ -1214,33 +1055,6 @@ public:
int listIndexes(List & list, const char * tableName);
int listIndexes(List & list, const char * tableName) const;
/** @} *******************************************************************/
/**
* @name Events
* @{
*/
/**
* Create event given defined Event instance
* @param event Event to create
* @return 0 if successful otherwise -1.
*/
int createEvent(const Event &event);
/**
* Drop event with given name
* @param eventName Name of event to drop.
* @return 0 if successful otherwise -1.
*/
int dropEvent(const char * eventName);
/**
* Get event with given name.
* @param eventName Name of event to get.
* @return an Event if successful, otherwise NULL.
*/
const Event * getEvent(const char * eventName);
/** @} *******************************************************************/
/**
......
......@@ -156,12 +156,6 @@ SignalDataPrintFunctions[] = {
{ GSN_SUB_REMOVE_REQ, printSUB_REMOVE_REQ },
{ GSN_SUB_REMOVE_REF, printSUB_REMOVE_REF },
{ GSN_SUB_REMOVE_CONF, printSUB_REMOVE_CONF },
{ GSN_SUB_START_REQ, printSUB_START_REQ },
{ GSN_SUB_START_REF, printSUB_START_REF },
{ GSN_SUB_START_CONF, printSUB_START_CONF },
{ GSN_SUB_STOP_REQ, printSUB_STOP_REQ },
{ GSN_SUB_STOP_REF, printSUB_STOP_REF },
{ GSN_SUB_STOP_CONF, printSUB_STOP_CONF },
{ GSN_SUB_SYNC_REQ, printSUB_SYNC_REQ },
{ GSN_SUB_SYNC_REF, printSUB_SYNC_REF },
{ GSN_SUB_SYNC_CONF, printSUB_SYNC_CONF },
......
......@@ -502,18 +502,6 @@ const GsnName SignalNames [] = {
//,{ GSN_TCINDEXNEXTCONF, "TCINDEXNEXTCONF" }
//,{ GSN_TCINDEXNEXREF, "TCINDEXNEXREF" }
,{ GSN_CREATE_EVNT_REQ, "CREATE_EVNT_REQ" }
,{ GSN_CREATE_EVNT_CONF, "CREATE_EVNT_CONF" }
,{ GSN_CREATE_EVNT_REF, "CREATE_EVNT_REF" }
,{ GSN_SUMA_START_ME, "SUMA_START_ME" }
,{ GSN_SUMA_HANDOVER_REQ, "SUMA_HANDOVER_REQ"}
,{ GSN_SUMA_HANDOVER_CONF, "SUMA_HANDOVER_CONF"}
,{ GSN_DROP_EVNT_REQ, "DROP_EVNT_REQ" }
,{ GSN_DROP_EVNT_CONF, "DROP_EVNT_CONF" }
,{ GSN_DROP_EVNT_REF, "DROP_EVNT_REF" }
,{ GSN_BACKUP_TRIG_REQ, "BACKUP_TRIG_REQ" }
,{ GSN_BACKUP_REQ, "BACKUP_REQ" }
,{ GSN_BACKUP_DATA, "BACKUP_DATA" }
......@@ -581,12 +569,6 @@ const GsnName SignalNames [] = {
,{ GSN_SUB_REMOVE_REQ, "SUB_REMOVE_REQ" }
,{ GSN_SUB_REMOVE_REF, "SUB_REMOVE_REF" }
,{ GSN_SUB_REMOVE_CONF, "SUB_REMOVE_CONF" }
,{ GSN_SUB_START_REQ, "SUB_START_REQ" }
,{ GSN_SUB_START_REF, "SUB_START_REF" }
,{ GSN_SUB_START_CONF, "SUB_START_CONF" }
,{ GSN_SUB_STOP_REQ, "SUB_STOP_REQ" }
,{ GSN_SUB_STOP_REF, "SUB_STOP_REF" }
,{ GSN_SUB_STOP_CONF, "SUB_STOP_CONF" }
,{ GSN_SUB_SYNC_REQ, "SUB_SYNC_REQ" }
,{ GSN_SUB_SYNC_REF, "SUB_SYNC_REF" }
,{ GSN_SUB_SYNC_CONF, "SUB_SYNC_CONF" }
......@@ -596,7 +578,6 @@ const GsnName SignalNames [] = {
,{ GSN_SUB_SYNC_CONTINUE_REF, "SUB_SYNC_CONTINUE_REF" }
,{ GSN_SUB_SYNC_CONTINUE_CONF, "SUB_SYNC_CONTINUE_CONF" }
,{ GSN_SUB_GCP_COMPLETE_REP, "SUB_GCP_COMPLETE_REP" }
,{ GSN_SUB_GCP_COMPLETE_ACC, "SUB_GCP_COMPLETE_ACC" }
,{ GSN_CREATE_SUBID_REQ, "CREATE_SUBID_REQ" }
,{ GSN_CREATE_SUBID_REF, "CREATE_SUBID_REF" }
......
This diff is collapsed.
This diff is collapsed.
......@@ -1970,9 +1970,6 @@ void Dbdih::execINCL_NODECONF(Signal* signal)
signal->theData[0] = reference();
signal->theData[1] = c_nodeStartSlave.nodeId;
sendSignal(BACKUP_REF, GSN_INCL_NODEREQ, signal, 2, JBB);
// Suma will not send response to this for now, later...
sendSignal(SUMA_REF, GSN_INCL_NODEREQ, signal, 2, JBB);
return;
}//if
if (TstartNode_or_blockref == numberToRef(BACKUP, getOwnNodeId())){
......@@ -7971,12 +7968,6 @@ void Dbdih::writingCopyGciLab(Signal* signal, FileRecordPtr filePtr)
if (reason == CopyGCIReq::GLOBAL_CHECKPOINT) {
jam();
cgcpParticipantState = GCP_PARTICIPANT_READY;
SubGcpCompleteRep * const rep = (SubGcpCompleteRep*)signal->getDataPtr();
rep->gci = coldgcp;
rep->senderData = 0;
sendSignal(SUMA_REF, GSN_SUB_GCP_COMPLETE_REP, signal,
SubGcpCompleteRep::SignalLength, JBB);
}
jam();
......
......@@ -1461,9 +1461,6 @@ void Ndbcntr::execNODE_FAILREP(Signal* signal)
sendSignal(BACKUP_REF, GSN_NODE_FAILREP, signal,
NodeFailRep::SignalLength, JBB);
sendSignal(SUMA_REF, GSN_NODE_FAILREP, signal,
NodeFailRep::SignalLength, JBB);
if (c_stopRec.stopReq.senderRef)
{
jam();
......
......@@ -2340,7 +2340,6 @@ void Qmgr::sendApiFailReq(Signal* signal, Uint16 failedNodeNo)
failedNodePtr.p->failState = WAITING_FOR_FAILCONF1;
sendSignal(DBTC_REF, GSN_API_FAILREQ, signal, 2, JBA);
sendSignal(DBDICT_REF, GSN_API_FAILREQ, signal, 2, JBA);
sendSignal(SUMA_REF, GSN_API_FAILREQ, signal, 2, JBA);
/**-------------------------------------------------------------------------
* THE OTHER NODE WAS AN API NODE. THE COMMUNICATION LINK IS ALREADY
......
This diff is collapsed.
......@@ -76,14 +76,6 @@ protected:
void execSUB_SYNC_CONTINUE_REF(Signal* signal);
void execSUB_SYNC_CONTINUE_CONF(Signal* signal);
/**
* Trigger logging
*/
void execTRIG_ATTRINFO(Signal* signal);
void execFIRE_TRIG_ORD(Signal* signal);
void execSUB_GCP_COMPLETE_REP(Signal* signal);
void runSUB_GCP_COMPLETE_ACC(Signal* signal);
/**
* DIH signals
*/
......@@ -92,14 +84,6 @@ protected:
void execDIGETPRIMREF(Signal* signal);
void execDIGETPRIMCONF(Signal* signal);
/**
* Trigger administration
*/
void execCREATE_TRIG_REF(Signal* signal);
void execCREATE_TRIG_CONF(Signal* signal);
void execDROP_TRIG_REF(Signal* signal);
void execDROP_TRIG_CONF(Signal* signal);
/**
* continueb
*/
......@@ -189,22 +173,6 @@ public:
void nextMeta(Signal*);
void completeMeta(Signal*);
/**
* Create triggers
*/
Uint32 m_latestTriggerId;
void startTrigger(Signal* signal);
void nextTrigger(Signal* signal);
void completeTrigger(Signal* signal);
void createAttributeMask(AttributeMask&, Table*);
/**
* Drop triggers
*/
void startDropTrigger(Signal* signal);
void nextDropTrigger(Signal* signal);
void completeDropTrigger(Signal* signal);
/**
* Sync data
*/
......@@ -229,18 +197,12 @@ public:
suma.progError(line, cause, extra);
}
void runLIST_TABLES_CONF(Signal* signal);
void runGET_TABINFO_CONF(Signal* signal);
void runGET_TABINFOREF(Signal* signal);
void runDI_FCOUNTCONF(Signal* signal);
void runDIGETPRIMCONF(Signal* signal);
void runCREATE_TRIG_CONF(Signal* signal);
void runDROP_TRIG_CONF(Signal* signal);
void runDROP_TRIG_REF(Signal* signal);
void runDropTrig(Signal* signal, Uint32 triggerId, Uint32 tableId);
Uint32 ptrI;
union { Uint32 nextPool; Uint32 nextList; };
};
......@@ -294,24 +256,11 @@ public:
Uint32 m_subscriberRef;
Uint32 m_subscriberData;
Uint32 m_subPtrI; //reference to subscription
Uint32 m_firstGCI; // first GCI to send
Uint32 m_lastGCI; // last acnowledged GCI
Uint32 nextList;
union { Uint32 nextPool; Uint32 prevList; };
};
typedef Ptr<Subscriber> SubscriberPtr;
struct Bucket {
bool active;
bool handover;
bool handover_started;
Uint32 handoverGCI;
};
#define NO_OF_BUCKETS 24
struct Bucket c_buckets[NO_OF_BUCKETS];
bool c_handoverToDo;
Uint32 c_lastCompleteGCI;
/**
*
*/
......@@ -335,26 +284,9 @@ public:
ArrayPool<SyncRecord> c_syncPool;
DataBuffer<15>::DataBufferPool c_dataBufferPool;
/**
* for restarting Suma not to start sending data too early
*/
bool c_restartLock;
/**
* for flagging that a GCI containg inconsistent data
* typically due to node failiure
*/
Uint32 c_lastInconsistentGCI;
Uint32 c_nodeFailGCI;
NodeBitmask c_failedApiNodes;
/**
* Functions
*/
bool removeSubscribersOnNode(Signal *signal, Uint32 nodeId);
bool parseTable(Signal* signal, class GetTabInfoConf* conf, Uint32 tableId,
SyncRecord* syncPtr_p);
bool checkTableTriggers(SegmentedSectionPtr ptr);
......@@ -365,52 +297,11 @@ public:
void sendSubIdRef(Signal* signal, Uint32 errorCode);
void sendSubCreateConf(Signal* signal, Uint32 sender, SubscriptionPtr subPtr);
void sendSubCreateRef(Signal* signal, const SubCreateReq& req, Uint32 errorCode);
void sendSubStartRef(SubscriptionPtr subPtr, Signal* signal,
Uint32 errorCode, bool temporary = false);
void sendSubStartRef(Signal* signal,
Uint32 errorCode, bool temporary = false);
void sendSubStopRef(Signal* signal,
Uint32 errorCode, bool temporary = false);
void sendSubSyncRef(Signal* signal, Uint32 errorCode);
void sendSubRemoveRef(Signal* signal, const SubRemoveReq& ref,
Uint32 errorCode, bool temporary = false);
void sendSubStartComplete(Signal*, SubscriberPtr, Uint32,
SubscriptionData::Part);
void sendSubStopComplete(Signal*, SubscriberPtr);
void sendSubStopReq(Signal* signal, bool unlock= false);
void completeSubRemoveReq(Signal* signal, SubscriptionPtr subPtr);
Uint32 getFirstGCI(Signal* signal);
Uint32 decideWhoToSend(Uint32 nBucket, Uint32 gci);
virtual Uint32 getStoreBucket(Uint32 v) = 0;
virtual Uint32 getResponsibleSumaNodeId(Uint32 D) = 0;
virtual Uint32 RtoI(Uint32 sumaRef, bool dieOnNotFound = true) = 0;
struct FailoverBuffer {
// FailoverBuffer(DataBuffer<15>::DataBufferPool & p);
FailoverBuffer();
bool subTableData(Uint32 gci, Uint32 *src, int sz);
bool subGcpCompleteRep(Uint32 gci);
bool nodeFailRep();
// typedef DataBuffer<15> GCIDataBuffer;
// GCIDataBuffer m_GCIDataBuffer;
// GCIDataBuffer::DataBufferIterator m_GCIDataBuffer_it;
Uint32 *c_gcis;
int c_sz;
// Uint32 *c_buf;
// int c_buf_sz;
int c_first;
int c_next;
bool c_full;
} c_failoverBuffer;
/**
* Table admin
*/
......@@ -441,8 +332,6 @@ private:
* Framework signals
*/
void getNodeGroupMembers(Signal* signal);
void execREAD_CONFIG_REQ(Signal* signal);
void execSTTOR(Signal* signal);
......@@ -454,35 +343,13 @@ private:
void execINCL_NODEREQ(Signal* signal);
void execCONTINUEB(Signal* signal);
void execSIGNAL_DROPPED_REP(Signal* signal);
void execAPI_FAILREQ(Signal* signal) ;
void execSUB_GCP_COMPLETE_ACC(Signal* signal);
/**
* Controller interface
*/
void execSUB_CREATE_REF(Signal* signal);
void execSUB_CREATE_CONF(Signal* signal);
void execSUB_DROP_REF(Signal* signal);
void execSUB_DROP_CONF(Signal* signal);
void execSUB_START_REF(Signal* signal);
void execSUB_START_CONF(Signal* signal);
void execSUB_STOP_REF(Signal* signal);
void execSUB_STOP_CONF(Signal* signal);
void execSUB_SYNC_REF(Signal* signal);
void execSUB_SYNC_CONF(Signal* signal);
void execSUB_ABORT_SYNC_REF(Signal* signal);
void execSUB_ABORT_SYNC_CONF(Signal* signal);
void execSUMA_START_ME(Signal* signal);
void execSUMA_HANDOVER_REQ(Signal* signal);
void execSUMA_HANDOVER_CONF(Signal* signal);
/**
* Subscription generation interface
*/
......@@ -494,49 +361,6 @@ private:
void execUTIL_SEQUENCE_REF(Signal* signal);
void execCREATE_SUBID_REQ(Signal* signal);
Uint32 getStoreBucket(Uint32 v);
Uint32 getResponsibleSumaNodeId(Uint32 D);
/**
* for Suma that is restarting another
*/
struct Restart {
Restart(Suma& s);
Suma & suma;
bool c_okToStart[MAX_REPLICAS];
bool c_waitingToStart[MAX_REPLICAS];
DLHashTable<SumaParticipant::Subscription>::Iterator c_subPtr; // TODO [MAX_REPLICAS]
SubscriberPtr c_subbPtr; // TODO [MAX_REPLICAS]
void progError(int line, int cause, const char * extra) {
suma.progError(line, cause, extra);
}
void resetNode(Uint32 sumaRef);
void runSUMA_START_ME(Signal*, Uint32 sumaRef);
void startNode(Signal*, Uint32 sumaRef);
void createSubscription(Signal* signal, Uint32 sumaRef);
void nextSubscription(Signal* signal, Uint32 sumaRef);
void completeSubscription(Signal* signal, Uint32 sumaRef);
void startSync(Signal* signal, Uint32 sumaRef);
void nextSync(Signal* signal, Uint32 sumaRef);
void completeSync(Signal* signal, Uint32 sumaRef);
void sendSubStartReq(SubscriptionPtr subPtr, SubscriberPtr subbPtr,
Signal* signal, Uint32 sumaRef);
void startSubscriber(Signal* signal, Uint32 sumaRef);
void nextSubscriber(Signal* signal, Uint32 sumaRef);
void completeSubscriber(Signal* signal, Uint32 sumaRef);
void completeRestartingNode(Signal* signal, Uint32 sumaRef);
} Restart;
private:
friend class Restart;
struct SubCoordinator {
......@@ -590,14 +414,4 @@ private:
DLList<SubCoordinator> c_runningSubscriptions;
};
inline Uint32
Suma::RtoI(Uint32 sumaRef, bool dieOnNotFound) {
for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
if (sumaRef == calcSumaBlockRef(c_nodesInGroup[i]))
return i;
}
ndbrequire(!dieOnNotFound);
return RNIL;
}
#endif
......@@ -35,19 +35,11 @@ SumaParticipant::SumaParticipant(const Configuration & conf) :
*/
addRecSignal(GSN_SUB_CREATE_REQ, &SumaParticipant::execSUB_CREATE_REQ);
addRecSignal(GSN_SUB_REMOVE_REQ, &SumaParticipant::execSUB_REMOVE_REQ);
addRecSignal(GSN_SUB_START_REQ, &SumaParticipant::execSUB_START_REQ);
addRecSignal(GSN_SUB_STOP_REQ, &SumaParticipant::execSUB_STOP_REQ);
addRecSignal(GSN_SUB_SYNC_REQ, &SumaParticipant::execSUB_SYNC_REQ);
addRecSignal(GSN_SUB_STOP_CONF, &SumaParticipant::execSUB_STOP_CONF);
addRecSignal(GSN_SUB_STOP_REF, &SumaParticipant::execSUB_STOP_REF);
/**
* Dict interface
*/
//addRecSignal(GSN_LIST_TABLES_REF, &SumaParticipant::execLIST_TABLES_REF);
addRecSignal(GSN_LIST_TABLES_CONF, &SumaParticipant::execLIST_TABLES_CONF);
//addRecSignal(GSN_GET_TABINFOREF, &SumaParticipant::execGET_TABINFO_REF);
addRecSignal(GSN_GET_TABINFO_CONF, &SumaParticipant::execGET_TABINFO_CONF);
addRecSignal(GSN_GET_TABINFOREF, &SumaParticipant::execGET_TABINFOREF);
#if 0
......@@ -76,32 +68,6 @@ SumaParticipant::SumaParticipant(const Configuration & conf) :
addRecSignal(GSN_SUB_SYNC_CONTINUE_CONF,
&SumaParticipant::execSUB_SYNC_CONTINUE_CONF);
/**
* Trigger stuff
*/
addRecSignal(GSN_TRIG_ATTRINFO, &SumaParticipant::execTRIG_ATTRINFO);
addRecSignal(GSN_FIRE_TRIG_ORD, &SumaParticipant::execFIRE_TRIG_ORD);
addRecSignal(GSN_CREATE_TRIG_REF, &Suma::execCREATE_TRIG_REF);
addRecSignal(GSN_CREATE_TRIG_CONF, &Suma::execCREATE_TRIG_CONF);
addRecSignal(GSN_DROP_TRIG_REF, &Suma::execDROP_TRIG_REF);
addRecSignal(GSN_DROP_TRIG_CONF, &Suma::execDROP_TRIG_CONF);
addRecSignal(GSN_SUB_GCP_COMPLETE_REP,
&SumaParticipant::execSUB_GCP_COMPLETE_REP);
for( int i = 0; i < NO_OF_BUCKETS; i++) {
c_buckets[i].active = false;
c_buckets[i].handover = false;
c_buckets[i].handover_started = false;
c_buckets[i].handoverGCI = 0;
}
c_handoverToDo = false;
c_lastInconsistentGCI = RNIL;
c_lastCompleteGCI = RNIL;
c_nodeFailGCI = 0;
c_failedApiNodes.clear();
}
SumaParticipant::~SumaParticipant()
......@@ -110,7 +76,6 @@ SumaParticipant::~SumaParticipant()
Suma::Suma(const Configuration & conf) :
SumaParticipant(conf),
Restart(*this),
c_nodes(c_nodePool),
c_runningSubscriptions(c_subCoordinatorPool)
{
......@@ -120,29 +85,12 @@ Suma::Suma(const Configuration & conf) :
addRecSignal(GSN_NDB_STTOR, &Suma::execNDB_STTOR);
addRecSignal(GSN_DUMP_STATE_ORD, &Suma::execDUMP_STATE_ORD);
addRecSignal(GSN_READ_NODESCONF, &Suma::execREAD_NODESCONF);
addRecSignal(GSN_API_FAILREQ, &Suma::execAPI_FAILREQ);
addRecSignal(GSN_NODE_FAILREP, &Suma::execNODE_FAILREP);
addRecSignal(GSN_INCL_NODEREQ, &Suma::execINCL_NODEREQ);
addRecSignal(GSN_CONTINUEB, &Suma::execCONTINUEB);
addRecSignal(GSN_SIGNAL_DROPPED_REP, &Suma::execSIGNAL_DROPPED_REP, true);
addRecSignal(GSN_UTIL_SEQUENCE_CONF, &Suma::execUTIL_SEQUENCE_CONF);
addRecSignal(GSN_UTIL_SEQUENCE_REF, &Suma::execUTIL_SEQUENCE_REF);
addRecSignal(GSN_CREATE_SUBID_REQ,
&Suma::execCREATE_SUBID_REQ);
addRecSignal(GSN_SUB_CREATE_CONF, &Suma::execSUB_CREATE_CONF);
addRecSignal(GSN_SUB_CREATE_REF, &Suma::execSUB_CREATE_REF);
addRecSignal(GSN_SUB_SYNC_CONF, &Suma::execSUB_SYNC_CONF);
addRecSignal(GSN_SUB_SYNC_REF, &Suma::execSUB_SYNC_REF);
addRecSignal(GSN_SUB_START_CONF, &Suma::execSUB_START_CONF);
addRecSignal(GSN_SUB_START_REF, &Suma::execSUB_START_REF);
addRecSignal(GSN_SUMA_START_ME, &Suma::execSUMA_START_ME);
addRecSignal(GSN_SUMA_HANDOVER_REQ, &Suma::execSUMA_HANDOVER_REQ);
addRecSignal(GSN_SUMA_HANDOVER_CONF, &Suma::execSUMA_HANDOVER_CONF);
addRecSignal(GSN_SUB_GCP_COMPLETE_ACC,
&Suma::execSUB_GCP_COMPLETE_ACC);
}
Suma::~Suma()
......
......@@ -24,8 +24,6 @@ libndbapi_la_SOURCES = \
NdbOperationExec.cpp \
NdbScanOperation.cpp NdbScanFilter.cpp \
NdbIndexOperation.cpp \
NdbEventOperation.cpp \
NdbEventOperationImpl.cpp \
NdbApiSignal.cpp \
NdbRecAttr.cpp \
NdbUtil.cpp \
......
......@@ -28,7 +28,6 @@ Name: Ndb.cpp
#include "NdbImpl.hpp"
#include <NdbOperation.hpp>
#include <NdbTransaction.hpp>
#include <NdbEventOperation.hpp>
#include <NdbRecAttr.hpp>
#include <md5_hash.hpp>
#include <NdbSleep.h>
......@@ -1300,51 +1299,6 @@ Ndb::getSchemaFromInternalName(const char * internalName)
return ret;
}
NdbEventOperation* Ndb::createEventOperation(const char* eventName,
const int bufferLength)
{
NdbEventOperation* tOp;
tOp = new NdbEventOperation(this, eventName, bufferLength);
if (tOp == 0)
{
theError.code= 4000;
return NULL;
}
if (tOp->getState() != NdbEventOperation::EO_CREATED) {
theError.code= tOp->getNdbError().code;
delete tOp;
tOp = NULL;
}
//now we have to look up this event in dict
return tOp;
}
int Ndb::dropEventOperation(NdbEventOperation* op) {
delete op;
return 0;
}
NdbGlobalEventBufferHandle* Ndb::getGlobalEventBufferHandle()
{
return theGlobalEventBufferHandle;
}
//void Ndb::monitorEvent(NdbEventOperation *op, NdbEventCallback cb, void* rs)
//{
//}
int
Ndb::pollEvents(int aMillisecondNumber)
{
return NdbEventOperation::wait(theGlobalEventBufferHandle,
aMillisecondNumber);
}
#ifdef VM_TRACE
#include <NdbMutex.h>
extern NdbMutex *ndb_print_state_mutex;
......
......@@ -610,132 +610,6 @@ NdbDictionary::Index::getObjectVersion() const {
return m_impl.m_version;
}
/*****************************************************************
* Event facade
*/
NdbDictionary::Event::Event(const char * name)
: m_impl(* new NdbEventImpl(* this))
{
setName(name);
}
NdbDictionary::Event::Event(const char * name, const Table& table)
: m_impl(* new NdbEventImpl(* this))
{
setName(name);
setTable(table);
}
NdbDictionary::Event::Event(NdbEventImpl & impl)
: m_impl(impl)
{
}
NdbDictionary::Event::~Event()
{
NdbEventImpl * tmp = &m_impl;
if(this != tmp){
delete tmp;
}
}
void
NdbDictionary::Event::setName(const char * name)
{
m_impl.setName(name);
}
const char *
NdbDictionary::Event::getName() const
{
return m_impl.getName();
}
void
NdbDictionary::Event::setTable(const Table& table)
{
m_impl.setTable(table);
}
void
NdbDictionary::Event::setTable(const char * table)
{
m_impl.setTable(table);
}
const char*
NdbDictionary::Event::getTableName() const
{
return m_impl.getTableName();
}
void
NdbDictionary::Event::addTableEvent(const TableEvent t)
{
m_impl.addTableEvent(t);
}
void
NdbDictionary::Event::setDurability(EventDurability d)
{
m_impl.setDurability(d);
}
NdbDictionary::Event::EventDurability
NdbDictionary::Event::getDurability() const
{
return m_impl.getDurability();
}
void
NdbDictionary::Event::addColumn(const Column & c){
NdbColumnImpl* col = new NdbColumnImpl;
(* col) = NdbColumnImpl::getImpl(c);
m_impl.m_columns.push_back(col);
}
void
NdbDictionary::Event::addEventColumn(unsigned attrId)
{
m_impl.m_attrIds.push_back(attrId);
}
void
NdbDictionary::Event::addEventColumn(const char * name)
{
const Column c(name);
addColumn(c);
}
void
NdbDictionary::Event::addEventColumns(int n, const char ** names)
{
for (int i = 0; i < n; i++)
addEventColumn(names[i]);
}
int NdbDictionary::Event::getNoOfEventColumns() const
{
return m_impl.getNoOfEventColumns();
}
NdbDictionary::Object::Status
NdbDictionary::Event::getObjectStatus() const
{
return m_impl.m_status;
}
int
NdbDictionary::Event::getObjectVersion() const
{
return m_impl.m_version;
}
void NdbDictionary::Event::print()
{
m_impl.print();
}
/*****************************************************************
* Dictionary facade
*/
......@@ -885,28 +759,6 @@ NdbDictionary::Dictionary::getIndexTable(const char * indexName,
return 0;
}
int
NdbDictionary::Dictionary::createEvent(const Event & ev)
{
return m_impl.createEvent(NdbEventImpl::getImpl(ev));
}
int
NdbDictionary::Dictionary::dropEvent(const char * eventName)
{
return m_impl.dropEvent(eventName);
}
const NdbDictionary::Event *
NdbDictionary::Dictionary::getEvent(const char * eventName)
{
NdbEventImpl * t = m_impl.getEvent(eventName);
if(t)
return t->m_facade;
return 0;
}
int
NdbDictionary::Dictionary::listObjects(List& list, Object::Type type)
{
......
This diff is collapsed.
......@@ -208,55 +208,6 @@ public:
NdbDictionary::Index * m_facade;
};
class NdbEventImpl : public NdbDictionary::Event, public NdbDictObjectImpl {
public:
NdbEventImpl();
NdbEventImpl(NdbDictionary::Event &);
~NdbEventImpl();
void init();
void setName(const char * name);
const char * getName() const;
void setTable(const NdbDictionary::Table& table);
void setTable(const char * table);
const char * getTableName() const;
void addTableEvent(const NdbDictionary::Event::TableEvent t);
void setDurability(NdbDictionary::Event::EventDurability d);
NdbDictionary::Event::EventDurability getDurability() const;
void addEventColumn(const NdbColumnImpl &c);
int getNoOfEventColumns() const;
void print() {
ndbout_c("NdbEventImpl: id=%d, key=%d",
m_eventId,
m_eventKey);
};
Uint32 m_eventId;
Uint32 m_eventKey;
Uint32 m_tableId;
AttributeMask m_attrListBitmask;
//BaseString m_internalName;
BaseString m_externalName;
Uint32 mi_type;
NdbDictionary::Event::EventDurability m_dur;
NdbTableImpl *m_tableImpl;
BaseString m_tableName;
Vector<NdbColumnImpl *> m_columns;
Vector<unsigned> m_attrIds;
int m_bufferId;
NdbEventOperation *eventOp;
static NdbEventImpl & getImpl(NdbDictionary::Event & t);
static NdbEventImpl & getImpl(const NdbDictionary::Event & t);
NdbDictionary::Event * m_facade;
};
class NdbDictInterface {
public:
NdbDictInterface(NdbError& err) : m_error(err) {
......@@ -294,24 +245,12 @@ public:
const NdbTableImpl &);
int createIndex(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
int createEvent(class Ndb & ndb, NdbEventImpl &, int getFlag);
int createEvent(NdbApiSignal* signal, LinearSectionPtr ptr[3], int noLSP);
int dropTable(const NdbTableImpl &);
int dropTable(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
int dropIndex(const NdbIndexImpl &, const NdbTableImpl &);
int dropIndex(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
int dropEvent(const NdbEventImpl &);
int dropEvent(NdbApiSignal* signal, LinearSectionPtr ptr[3], int noLSP);
int executeSubscribeEvent(class Ndb & ndb, NdbEventImpl &);
int executeSubscribeEvent(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
int stopSubscribeEvent(class Ndb & ndb, NdbEventImpl &);
int stopSubscribeEvent(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
int listObjects(NdbDictionary::Dictionary::List& list, Uint32 requestData, bool fullyQualifiedNames);
int listObjects(NdbApiSignal* signal);
......@@ -357,17 +296,6 @@ private:
void execDROP_INDX_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execDROP_INDX_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execCREATE_EVNT_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execCREATE_EVNT_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execSUB_START_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execSUB_START_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execSUB_TABLE_DATA(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execSUB_GCP_COMPLETE_REP(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execSUB_STOP_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execSUB_STOP_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execDROP_EVNT_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execDROP_EVNT_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execDROP_TABLE_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execDROP_TABLE_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
void execLIST_TABLES_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
......@@ -402,12 +330,6 @@ public:
NdbTableImpl * getIndexTable(NdbIndexImpl * index,
NdbTableImpl * table);
int createEvent(NdbEventImpl &);
int dropEvent(const char * eventName);
int executeSubscribeEvent(NdbEventImpl &);
int stopSubscribeEvent(NdbEventImpl &);
int listObjects(List& list, NdbDictionary::Object::Type type);
int listIndexes(List& list, Uint32 indexId);
......@@ -418,8 +340,6 @@ public:
const char * tableName);
NdbIndexImpl * getIndex(const char * indexName,
NdbTableImpl * table);
NdbEventImpl * getEvent(const char * eventName);
NdbEventImpl * getEventImpl(const char * internalName);
const NdbError & getNdbError() const;
NdbError m_error;
......@@ -440,18 +360,6 @@ private:
Ndb_local_table_info * fetchGlobalTableImpl(const BaseString& internalName);
};
inline
NdbEventImpl &
NdbEventImpl::getImpl(const NdbDictionary::Event & t){
return t.m_impl;
}
inline
NdbEventImpl &
NdbEventImpl::getImpl(NdbDictionary::Event & t){
return t.m_impl;
}
inline
NdbColumnImpl &
NdbColumnImpl::getImpl(NdbDictionary::Column & t){
......
......@@ -21,7 +21,6 @@
#include <NdbOperation.hpp>
#include <NdbTransaction.hpp>
#include <NdbBlob.hpp>
#include "NdbEventOperationImpl.hpp"
static void
update(const NdbError & _err){
......@@ -73,10 +72,3 @@ NdbBlob::getNdbError() const {
update(theError);
return theError;
}
const
NdbError &
NdbEventOperationImpl::getNdbError() const {
update(m_error);
return m_error;
}
......@@ -661,29 +661,11 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
case GSN_CREATE_INDX_REF:
case GSN_DROP_INDX_CONF:
case GSN_DROP_INDX_REF:
case GSN_CREATE_EVNT_CONF:
case GSN_CREATE_EVNT_REF:
case GSN_DROP_EVNT_CONF:
case GSN_DROP_EVNT_REF:
case GSN_LIST_TABLES_CONF:
NdbDictInterface::execSignal(&theDictionary->m_receiver,
aSignal, ptr);
break;
case GSN_SUB_META_DATA:
case GSN_SUB_REMOVE_CONF:
case GSN_SUB_REMOVE_REF:
break; // ignore these signals
case GSN_SUB_GCP_COMPLETE_REP:
case GSN_SUB_START_CONF:
case GSN_SUB_START_REF:
case GSN_SUB_TABLE_DATA:
case GSN_SUB_STOP_CONF:
case GSN_SUB_STOP_REF:
NdbDictInterface::execSignal(&theDictionary->m_receiver,
aSignal, ptr);
break;
case GSN_DIHNDBTAMPER:
{
tFirstDataPtr = int2void(tFirstData);
......
......@@ -34,10 +34,6 @@
#include "NdbUtil.hpp"
#include <NdbBlob.hpp>
class NdbGlobalEventBufferHandle;
NdbGlobalEventBufferHandle *NdbGlobalEventBuffer_init(int);
void NdbGlobalEventBuffer_drop(NdbGlobalEventBufferHandle *);
Ndb::Ndb( Ndb_cluster_connection *ndb_cluster_connection,
const char* aDataBase , const char* aSchema)
: theImpl(NULL)
......@@ -107,16 +103,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
if (theInitState == NotConstructed)
theInitState = NotInitialised;
{
NdbGlobalEventBufferHandle *h=
NdbGlobalEventBuffer_init(NDB_MAX_ACTIVE_EVENTS);
if (h == NULL) {
ndbout_c("Failed NdbGlobalEventBuffer_init(%d)",NDB_MAX_ACTIVE_EVENTS);
exit(-1);
}
theGlobalEventBufferHandle = h;
}
DBUG_VOID_RETURN;
}
......@@ -132,8 +118,6 @@ Ndb::~Ndb()
DBUG_PRINT("enter",("Ndb::~Ndb this=0x%x",this));
doDisconnect();
NdbGlobalEventBuffer_drop(theGlobalEventBufferHandle);
if (TransporterFacade::instance() != NULL && theNdbBlockNumber > 0){
TransporterFacade::instance()->close(theNdbBlockNumber, theFirstTransId);
}
......
......@@ -28,9 +28,6 @@ public:
HugoTransactions(const NdbDictionary::Table&,
const NdbDictionary::Index* idx = 0);
~HugoTransactions();
int createEvent(Ndb*);
int eventOperation(Ndb*, void* stats,
int records);
int loadTable(Ndb*,
int records,
int batch = 512,
......
......@@ -31,13 +31,11 @@ testSystemRestart \
testTimeout \
testTransactions \
testDeadlock \
test_event ndbapi_slow_select testReadPerf testLcp \
ndbapi_slow_select testReadPerf testLcp \
testPartitioning \
testBitfield \
DbCreate DbAsyncGenerator \
test_event_multi_table \
testSRBank \
test_event_merge
testSRBank
#flexTimedAsynch
#testBlobs
......
......@@ -767,285 +767,6 @@ HugoTransactions::fillTable(Ndb* pNdb,
return NDBT_OK;
}
int
HugoTransactions::createEvent(Ndb* pNdb){
char eventName[1024];
sprintf(eventName,"%s_EVENT",tab.getName());
NdbDictionary::Dictionary *myDict = pNdb->getDictionary();
if (!myDict) {
g_err << "Dictionary not found "
<< pNdb->getNdbError().code << " "
<< pNdb->getNdbError().message << endl;
return NDBT_FAILED;
}
NdbDictionary::Event myEvent(eventName);
myEvent.setTable(tab.getName());
myEvent.addTableEvent(NdbDictionary::Event::TE_ALL);
// myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT);
// myEvent.addTableEvent(NdbDictionary::Event::TE_UPDATE);
// myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE);
// const NdbDictionary::Table *_table = myDict->getTable(tab.getName());
for(int a = 0; a < tab.getNoOfColumns(); a++){
// myEvent.addEventColumn(_table->getColumn(a)->getName());
myEvent.addEventColumn(a);
}
int res = myDict->createEvent(myEvent); // Add event to database
if (res == 0)
myEvent.print();
else if (myDict->getNdbError().classification ==
NdbError::SchemaObjectExists)
{
g_info << "Event creation failed event exists\n";
res = myDict->dropEvent(eventName);
if (res) {
g_err << "Failed to drop event: "
<< myDict->getNdbError().code << " : "
<< myDict->getNdbError().message << endl;
return NDBT_FAILED;
}
// try again
res = myDict->createEvent(myEvent); // Add event to database
if (res) {
g_err << "Failed to create event (1): "
<< myDict->getNdbError().code << " : "
<< myDict->getNdbError().message << endl;
return NDBT_FAILED;
}
}
else
{
g_err << "Failed to create event (2): "
<< myDict->getNdbError().code << " : "
<< myDict->getNdbError().message << endl;
return NDBT_FAILED;
}
return NDBT_OK;
}
#include <NdbEventOperation.hpp>
#include "TestNdbEventOperation.hpp"
#include <NdbAutoPtr.hpp>
struct receivedEvent {
Uint32 pk;
Uint32 count;
Uint32 event;
};
int XXXXX = 0;
int
HugoTransactions::eventOperation(Ndb* pNdb, void* pstats,
int records) {
int myXXXXX = XXXXX++;
Uint32 i;
const char function[] = "HugoTransactions::eventOperation: ";
struct receivedEvent* recInsertEvent;
NdbAutoObjArrayPtr<struct receivedEvent>
p00( recInsertEvent = new struct receivedEvent[3*records] );
struct receivedEvent* recUpdateEvent = &recInsertEvent[records];
struct receivedEvent* recDeleteEvent = &recInsertEvent[2*records];
EventOperationStats &stats = *(EventOperationStats*)pstats;
stats.n_inserts = 0;
stats.n_deletes = 0;
stats.n_updates = 0;
stats.n_consecutive = 0;
stats.n_duplicates = 0;
stats.n_inconsistent_gcis = 0;
for (i = 0; i < records; i++) {
recInsertEvent[i].pk = 0xFFFFFFFF;
recInsertEvent[i].count = 0;
recInsertEvent[i].event = 0xFFFFFFFF;
recUpdateEvent[i].pk = 0xFFFFFFFF;
recUpdateEvent[i].count = 0;
recUpdateEvent[i].event = 0xFFFFFFFF;
recDeleteEvent[i].pk = 0xFFFFFFFF;
recDeleteEvent[i].count = 0;
recDeleteEvent[i].event = 0xFFFFFFFF;
}
NdbDictionary::Dictionary *myDict = pNdb->getDictionary();
if (!myDict) {
g_err << function << "Event Creation failedDictionary not found\n";
return NDBT_FAILED;
}
int r = 0;
NdbEventOperation *pOp;
char eventName[1024];
sprintf(eventName,"%s_EVENT",tab.getName());
int noEventColumnName = tab.getNoOfColumns();
g_info << function << "create EventOperation\n";
pOp = pNdb->createEventOperation(eventName, 100);
if ( pOp == NULL ) {
g_err << function << "Event operation creation failed\n";
return NDBT_FAILED;
}
g_info << function << "get values\n";
NdbRecAttr* recAttr[1024];
NdbRecAttr* recAttrPre[1024];
const NdbDictionary::Table *_table = myDict->getTable(tab.getName());
for (int a = 0; a < noEventColumnName; a++) {
recAttr[a] = pOp->getValue(_table->getColumn(a)->getName());
recAttrPre[a] = pOp->getPreValue(_table->getColumn(a)->getName());
}
// set up the callbacks
g_info << function << "execute\n";
if (pOp->execute()) { // This starts changes to "start flowing"
g_err << function << "operation execution failed: \n";
g_err << pOp->getNdbError().code << " "
<< pOp->getNdbError().message << endl;
return NDBT_FAILED;
}
g_info << function << "ok\n";
int count = 0;
Uint32 last_inconsitant_gci = 0xEFFFFFF0;
while (r < records){
//printf("now waiting for event...\n");
int res = pNdb->pollEvents(1000); // wait for event or 1000 ms
if (res > 0) {
//printf("got data! %d\n", r);
int overrun;
while (pOp->next(&overrun) > 0) {
r++;
r += overrun;
count++;
Uint32 gci = pOp->getGCI();
Uint32 pk = recAttr[0]->u_32_value();
if (!pOp->isConsistent()) {
if (last_inconsitant_gci != gci) {
last_inconsitant_gci = gci;
stats.n_inconsistent_gcis++;
}
g_warning << "A node failure has occured and events might be missing\n";
}
g_info << function << "GCI " << gci << ": " << count;
struct receivedEvent* recEvent;
switch (pOp->getEventType()) {
case NdbDictionary::Event::TE_INSERT:
stats.n_inserts++;
g_info << " INSERT: ";
recEvent = recInsertEvent;
break;
case NdbDictionary::Event::TE_DELETE:
stats.n_deletes++;
g_info << " DELETE: ";
recEvent = recDeleteEvent;
break;
case NdbDictionary::Event::TE_UPDATE:
stats.n_updates++;
g_info << " UPDATE: ";
recEvent = recUpdateEvent;
break;
case NdbDictionary::Event::TE_ALL:
abort();
}
if ((int)pk < records) {
recEvent[pk].pk = pk;
recEvent[pk].count++;
}
g_info << "overrun " << overrun << " pk " << pk;
for (i = 1; i < noEventColumnName; i++) {
if (recAttr[i]->isNULL() >= 0) { // we have a value
g_info << " post[" << i << "]=";
if (recAttr[i]->isNULL() == 0) // we have a non-null value
g_info << recAttr[i]->u_32_value();
else // we have a null value
g_info << "NULL";
}
if (recAttrPre[i]->isNULL() >= 0) { // we have a value
g_info << " pre[" << i << "]=";
if (recAttrPre[i]->isNULL() == 0) // we have a non-null value
g_info << recAttrPre[i]->u_32_value();
else // we have a null value
g_info << "NULL";
}
}
g_info << endl;
}
} else
;//printf("timed out\n");
}
// sleep ((XXXXX-myXXXXX)*2);
g_info << myXXXXX << "dropping event operation" << endl;
int res = pNdb->dropEventOperation(pOp);
if (res != 0) {
g_err << "operation execution failed\n";
return NDBT_FAILED;
}
g_info << myXXXXX << " ok" << endl;
if (stats.n_inserts > 0) {
stats.n_consecutive++;
}
if (stats.n_deletes > 0) {
stats.n_consecutive++;
}
if (stats.n_updates > 0) {
stats.n_consecutive++;
}
for (i = 0; i < (Uint32)records/3; i++) {
if (recInsertEvent[i].pk != i) {
stats.n_consecutive ++;
ndbout << "missing insert pk " << i << endl;
} else if (recInsertEvent[i].count > 1) {
ndbout << "duplicates insert pk " << i
<< " count " << recInsertEvent[i].count << endl;
stats.n_duplicates += recInsertEvent[i].count-1;
}
if (recUpdateEvent[i].pk != i) {
stats.n_consecutive ++;
ndbout << "missing update pk " << i << endl;
} else if (recUpdateEvent[i].count > 1) {
ndbout << "duplicates update pk " << i
<< " count " << recUpdateEvent[i].count << endl;
stats.n_duplicates += recUpdateEvent[i].count-1;
}
if (recDeleteEvent[i].pk != i) {
stats.n_consecutive ++;
ndbout << "missing delete pk " << i << endl;
} else if (recDeleteEvent[i].count > 1) {
ndbout << "duplicates delete pk " << i
<< " count " << recDeleteEvent[i].count << endl;
stats.n_duplicates += recDeleteEvent[i].count-1;
}
}
return NDBT_OK;
}
int
HugoTransactions::pkReadRecords(Ndb* pNdb,
int records,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment