fixed mem leak in subscriction handling in Suma

    debug printouts
    ndb documentation update
parent b87f4701
...@@ -1218,12 +1218,12 @@ public: ...@@ -1218,12 +1218,12 @@ public:
/** /**
* Drop a subscription to an event * Drop a subscription to an event
* *
* @param eventName * @param eventOp
* unique identifier of the event * Event operation
* *
* @return 0 on success * @return 0 on success
*/ */
int dropEventOperation(NdbEventOperation* eventName); int dropEventOperation(NdbEventOperation* eventOp);
/** /**
* Wait for an event to occur. Will return as soon as an event * Wait for an event to occur. Will return as soon as an event
......
...@@ -50,6 +50,17 @@ ...@@ -50,6 +50,17 @@
//#define EVENT_DEBUG //#define EVENT_DEBUG
//#define EVENT_PH3_DEBUG //#define EVENT_PH3_DEBUG
//#define EVENT_DEBUG2 //#define EVENT_DEBUG2
#if 0
#undef DBUG_ENTER
#undef DBUG_PRINT
#undef DBUG_RETURN
#undef DBUG_VOID_RETURN
#define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);}
#define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;}
#define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); }
#define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; }
#endif
/** /**
* @todo: * @todo:
...@@ -112,15 +123,12 @@ Suma::getNodeGroupMembers(Signal* signal) { ...@@ -112,15 +123,12 @@ Suma::getNodeGroupMembers(Signal* signal) {
void void
Suma::execSTTOR(Signal* signal) { Suma::execSTTOR(Signal* signal) {
jamEntry(); jamEntry();
DBUG_ENTER("Suma::execSTTOR");
const Uint32 startphase = signal->theData[1]; const Uint32 startphase = signal->theData[1];
const Uint32 typeOfStart = signal->theData[7]; const Uint32 typeOfStart = signal->theData[7];
#ifdef NODEFAIL_DEBUG DBUG_PRINT("info",("startphase = %u, typeOfStart = %u", startphase, typeOfStart));
ndbout_c ("SUMA::execSTTOR startphase = %u, typeOfStart = %u",
startphase, typeOfStart);
#endif
if(startphase == 1){ if(startphase == 1){
jam(); jam();
...@@ -155,7 +163,7 @@ Suma::execSTTOR(Signal* signal) { ...@@ -155,7 +163,7 @@ Suma::execSTTOR(Signal* signal) {
g_subPtrI = subPtr.i; g_subPtrI = subPtr.i;
// sendSTTORRY(signal); // sendSTTORRY(signal);
#endif #endif
return; DBUG_VOID_RETURN;
} }
if(startphase == 5) { if(startphase == 5) {
...@@ -178,9 +186,7 @@ Suma::execSTTOR(Signal* signal) { ...@@ -178,9 +186,7 @@ Suma::execSTTOR(Signal* signal) {
for( int i = 0; i < NO_OF_BUCKETS; i++) { for( int i = 0; i < NO_OF_BUCKETS; i++) {
if (getResponsibleSumaNodeId(i) == refToNode(reference())) { if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
// I'm running this bucket // I'm running this bucket
#ifdef EVENT_DEBUG DBUG_PRINT("info",("bucket %u set to true", i));
ndbout_c("bucket %u set to true", i);
#endif
c_buckets[i].active = true; c_buckets[i].active = true;
} }
} }
...@@ -190,32 +196,31 @@ Suma::execSTTOR(Signal* signal) { ...@@ -190,32 +196,31 @@ Suma::execSTTOR(Signal* signal) {
c_masterNodeId == getOwnNodeId()) { c_masterNodeId == getOwnNodeId()) {
jam(); jam();
createSequence(signal); createSequence(signal);
return; DBUG_VOID_RETURN;
}//if }//if
}//if }//if
sendSTTORRY(signal); sendSTTORRY(signal);
return; DBUG_VOID_RETURN;
} }
void void
Suma::createSequence(Signal* signal) Suma::createSequence(Signal* signal)
{ {
jam(); jam();
DBUG_ENTER("Suma::createSequence");
UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend(); UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend();
req->senderData = RNIL; req->senderData = RNIL;
req->sequenceId = SUMA_SEQUENCE; req->sequenceId = SUMA_SEQUENCE;
req->requestType = UtilSequenceReq::Create; req->requestType = UtilSequenceReq::Create;
#ifdef DEBUG_SUMA_SEQUENCE
ndbout_c("SUMA: Create sequence");
#endif
sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
signal, UtilSequenceReq::SignalLength, JBB); signal, UtilSequenceReq::SignalLength, JBB);
// execUTIL_SEQUENCE_CONF will call createSequenceReply() // execUTIL_SEQUENCE_CONF will call createSequenceReply()
DBUG_VOID_RETURN;
} }
void void
...@@ -379,7 +384,7 @@ SumaParticipant::removeSubscribersOnNode(Signal *signal, Uint32 nodeId) ...@@ -379,7 +384,7 @@ SumaParticipant::removeSubscribersOnNode(Signal *signal, Uint32 nodeId)
} }
void void
SumaParticipant::sendSubStopReq(Signal *signal){ SumaParticipant::sendSubStopReq(Signal *signal, bool unlock){
DBUG_ENTER("SumaParticipant::sendSubStopReq"); DBUG_ENTER("SumaParticipant::sendSubStopReq");
static bool remove_lock = false; static bool remove_lock = false;
jam(); jam();
...@@ -399,7 +404,7 @@ SumaParticipant::sendSubStopReq(Signal *signal){ ...@@ -399,7 +404,7 @@ SumaParticipant::sendSubStopReq(Signal *signal){
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
if(remove_lock) { if(remove_lock && !unlock) {
jam(); jam();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -424,6 +429,7 @@ SumaParticipant::sendSubStopReq(Signal *signal){ ...@@ -424,6 +429,7 @@ SumaParticipant::sendSubStopReq(Signal *signal){
void void
SumaParticipant::execSUB_STOP_CONF(Signal* signal){ SumaParticipant::execSUB_STOP_CONF(Signal* signal){
jamEntry(); jamEntry();
DBUG_ENTER("SumaParticipant::execSUB_STOP_CONF");
SubStopConf * const conf = (SubStopConf*)signal->getDataPtr(); SubStopConf * const conf = (SubStopConf*)signal->getDataPtr();
...@@ -449,16 +455,17 @@ SumaParticipant::execSUB_STOP_CONF(Signal* signal){ ...@@ -449,16 +455,17 @@ SumaParticipant::execSUB_STOP_CONF(Signal* signal){
} }
} }
sendSubStopReq(signal); sendSubStopReq(signal,true);
DBUG_VOID_RETURN;
} }
void void
SumaParticipant::execSUB_STOP_REF(Signal* signal){ SumaParticipant::execSUB_STOP_REF(Signal* signal){
jamEntry(); jamEntry();
SubStopRef * const ref = (SubStopRef*)signal->getDataPtr();
DBUG_ENTER("SumaParticipant::execSUB_STOP_REF"); DBUG_ENTER("SumaParticipant::execSUB_STOP_REF");
SubStopRef * const ref = (SubStopRef*)signal->getDataPtr();
Uint32 subscriptionId = ref->subscriptionId; Uint32 subscriptionId = ref->subscriptionId;
Uint32 subscriptionKey = ref->subscriptionKey; Uint32 subscriptionKey = ref->subscriptionKey;
Uint32 part = ref->part; Uint32 part = ref->part;
...@@ -845,16 +852,14 @@ Suma::execUTIL_SEQUENCE_CONF(Signal* signal) ...@@ -845,16 +852,14 @@ Suma::execUTIL_SEQUENCE_CONF(Signal* signal)
{ {
jamEntry(); jamEntry();
DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF");
CRASH_INSERTION(13002); CRASH_INSERTION(13002);
UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr(); UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr();
#ifdef DEBUG_SUMA_SEQUENCE
ndbout_c("SUMA: Create sequence conf");
#endif
if(conf->requestType == UtilSequenceReq::Create) { if(conf->requestType == UtilSequenceReq::Create) {
jam(); jam();
createSequenceReply(signal, conf, NULL); createSequenceReply(signal, conf, NULL);
return; DBUG_VOID_RETURN;
} }
Uint64 subId; Uint64 subId;
...@@ -874,18 +879,21 @@ Suma::execUTIL_SEQUENCE_CONF(Signal* signal) ...@@ -874,18 +879,21 @@ Suma::execUTIL_SEQUENCE_CONF(Signal* signal)
CreateSubscriptionIdConf::SignalLength, JBB); CreateSubscriptionIdConf::SignalLength, JBB);
c_subscriberPool.release(subbPtr); c_subscriberPool.release(subbPtr);
DBUG_VOID_RETURN;
} }
void void
Suma::execUTIL_SEQUENCE_REF(Signal* signal) Suma::execUTIL_SEQUENCE_REF(Signal* signal)
{ {
jamEntry(); jamEntry();
DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF");
UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr(); UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr();
if(ref->requestType == UtilSequenceReq::Create) { if(ref->requestType == UtilSequenceReq::Create) {
jam(); jam();
createSequenceReply(signal, NULL, ref); createSequenceReply(signal, NULL, ref);
return; DBUG_VOID_RETURN;
} }
Uint32 subData = ref->senderData; Uint32 subData = ref->senderData;
...@@ -894,7 +902,7 @@ Suma::execUTIL_SEQUENCE_REF(Signal* signal) ...@@ -894,7 +902,7 @@ Suma::execUTIL_SEQUENCE_REF(Signal* signal)
c_subscriberPool.getPtr(subbPtr,subData); c_subscriberPool.getPtr(subbPtr,subData);
sendSubIdRef(signal, GrepError::SEQUENCE_ERROR); sendSubIdRef(signal, GrepError::SEQUENCE_ERROR);
c_subscriberPool.release(subbPtr); c_subscriberPool.release(subbPtr);
return; DBUG_VOID_RETURN;
}//execUTIL_SEQUENCE_REF() }//execUTIL_SEQUENCE_REF()
...@@ -2091,9 +2099,7 @@ SumaParticipant::execSCAN_HBREP(Signal* signal){ ...@@ -2091,9 +2099,7 @@ SumaParticipant::execSCAN_HBREP(Signal* signal){
void void
SumaParticipant::execSUB_START_REQ(Signal* signal){ SumaParticipant::execSUB_START_REQ(Signal* signal){
jamEntry(); jamEntry();
#ifdef NODEFAIL_DEBUG DBUG_ENTER("SumaParticipant::execSUB_START_REQ");
ndbout_c("Suma::execSUB_START_REQ");
#endif
CRASH_INSERTION(13013); CRASH_INSERTION(13013);
...@@ -2103,7 +2109,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ ...@@ -2103,7 +2109,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){
if (RtoI(signal->getSendersBlockRef(), false) == RNIL) { if (RtoI(signal->getSendersBlockRef(), false) == RNIL) {
jam(); jam();
sendSubStartRef(signal, /** Error Code */ 0, true); sendSubStartRef(signal, /** Error Code */ 0, true);
return; DBUG_VOID_RETURN;
} }
// only allow other Suma's in the nodegroup to come through for restart purposes // only allow other Suma's in the nodegroup to come through for restart purposes
} }
...@@ -2124,7 +2130,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ ...@@ -2124,7 +2130,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){
if(!c_subscriptions.find(subPtr, key)){ if(!c_subscriptions.find(subPtr, key)){
jam(); jam();
sendSubStartRef(signal, /** Error Code */ 0); sendSubStartRef(signal, /** Error Code */ 0);
return; DBUG_VOID_RETURN;
} }
Ptr<SyncRecord> syncPtr; Ptr<SyncRecord> syncPtr;
...@@ -2135,7 +2141,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ ...@@ -2135,7 +2141,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){
ndbout_c("Locked"); ndbout_c("Locked");
#endif #endif
sendSubStartRef(signal, /** Error Code */ 0, true); sendSubStartRef(signal, /** Error Code */ 0, true);
return; DBUG_VOID_RETURN;
} }
syncPtr.p->m_locked = true; syncPtr.p->m_locked = true;
...@@ -2144,7 +2150,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ ...@@ -2144,7 +2150,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){
jam(); jam();
syncPtr.p->m_locked = false; syncPtr.p->m_locked = false;
sendSubStartRef(signal, /** Error Code */ 0); sendSubStartRef(signal, /** Error Code */ 0);
return; DBUG_VOID_RETURN;
} }
Uint32 type = subPtr.p->m_subscriptionType; Uint32 type = subPtr.p->m_subscriptionType;
...@@ -2211,6 +2217,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ ...@@ -2211,6 +2217,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){
break; break;
} }
ndbrequire(ok); ndbrequire(ok);
DBUG_VOID_RETURN;
} }
void void
...@@ -2963,6 +2970,7 @@ SumaParticipant::execFIRE_TRIG_ORD(Signal* signal){ ...@@ -2963,6 +2970,7 @@ SumaParticipant::execFIRE_TRIG_ORD(Signal* signal){
} }
} }
#endif #endif
DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d", refToNode(ref)));
sendSignal(ref, GSN_SUB_TABLE_DATA, signal, sendSignal(ref, GSN_SUB_TABLE_DATA, signal,
SubTableData::SignalLength, JBB, ptr, nptr); SubTableData::SignalLength, JBB, ptr, nptr);
data->logType = tmp; data->logType = tmp;
...@@ -3263,6 +3271,7 @@ bool SumaParticipant::FailoverBuffer::nodeFailRep() ...@@ -3263,6 +3271,7 @@ bool SumaParticipant::FailoverBuffer::nodeFailRep()
void void
SumaParticipant::execSUB_STOP_REQ(Signal* signal){ SumaParticipant::execSUB_STOP_REQ(Signal* signal){
jamEntry(); jamEntry();
DBUG_ENTER("SumaParticipant::execSUB_STOP_REQ");
CRASH_INSERTION(13019); CRASH_INSERTION(13019);
...@@ -3292,7 +3301,7 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){ ...@@ -3292,7 +3301,7 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){
SubStopConf::SignalLength, JBB); SubStopConf::SignalLength, JBB);
removeSubscribersOnNode(signal, refToNode(subscriberRef)); removeSubscribersOnNode(signal, refToNode(subscriberRef));
return; DBUG_VOID_RETURN;
} }
if(!c_subscriptions.find(subPtr, key)){ if(!c_subscriptions.find(subPtr, key)){
...@@ -3333,7 +3342,7 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){ ...@@ -3333,7 +3342,7 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){
if (!found) { if (!found) {
jam(); jam();
sendSubStopRef(signal, GrepError::SUBSCRIBER_NOT_FOUND); sendSubStopRef(signal, GrepError::SUBSCRIBER_NOT_FOUND);
return; DBUG_VOID_RETURN;
} }
} }
...@@ -3346,11 +3355,12 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){ ...@@ -3346,11 +3355,12 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){
if (syncPtr.p->m_locked) { if (syncPtr.p->m_locked) {
jam(); jam();
sendSubStopRef(signal, /** Error Code */ 0, true); sendSubStopRef(signal, /** Error Code */ 0, true);
return; DBUG_VOID_RETURN;
} }
syncPtr.p->m_locked = true; syncPtr.p->m_locked = true;
syncPtr.p->startDropTrigger(signal); syncPtr.p->startDropTrigger(signal);
DBUG_VOID_RETURN;
} }
void void
......
...@@ -376,7 +376,7 @@ public: ...@@ -376,7 +376,7 @@ public:
void sendSubStartComplete(Signal*, SubscriberPtr, Uint32, void sendSubStartComplete(Signal*, SubscriberPtr, Uint32,
SubscriptionData::Part); SubscriptionData::Part);
void sendSubStopComplete(Signal*, SubscriberPtr); void sendSubStopComplete(Signal*, SubscriberPtr);
void sendSubStopReq(Signal* signal); void sendSubStopReq(Signal* signal, bool unlock= false);
void completeSubRemoveReq(Signal* signal, SubscriptionPtr subPtr); void completeSubRemoveReq(Signal* signal, SubscriptionPtr subPtr);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment