Commit e7e4dde5 authored by jonas@eel.(none)'s avatar jonas@eel.(none)

ndb - bug#25755

  Make sure subscriber is removed from list when n_subscribers is decreased
parent f7c6b133
...@@ -1431,17 +1431,26 @@ Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr, ...@@ -1431,17 +1431,26 @@ Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr,
if (r) if (r)
{ {
jam();
// we have to wait getting tab info // we have to wait getting tab info
DBUG_RETURN(1); DBUG_RETURN(1);
} }
if (tabPtr.p->setupTrigger(signal, *this)) if (tabPtr.p->setupTrigger(signal, *this))
{ {
jam();
// we have to wait for triggers to be setup // we have to wait for triggers to be setup
DBUG_RETURN(1); DBUG_RETURN(1);
} }
completeOneSubscriber(signal, tabPtr, subbPtr); int ret = completeOneSubscriber(signal, tabPtr, subbPtr);
if (ret == -1)
{
jam();
LocalDLList<Subscriber> subscribers(c_subscriberPool,
tabPtr.p->c_subscribers);
subscribers.release(subbPtr);
}
completeInitTable(signal, tabPtr); completeInitTable(signal, tabPtr);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1517,6 +1526,22 @@ Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr) ...@@ -1517,6 +1526,22 @@ Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr)
req->tableId = tableId; req->tableId = tableId;
DBUG_PRINT("info",("GET_TABINFOREQ id %d", req->tableId)); DBUG_PRINT("info",("GET_TABINFOREQ id %d", req->tableId));
if (ERROR_INSERTED(13031))
{
jam();
ndbout_c("HERE");
CLEAR_ERROR_INSERT_VALUE;
GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtrSend();
ref->tableId = tableId;
ref->senderData = tabPtr.i;
ref->errorCode = GetTabInfoRef::TableNotDefined;
sendSignal(reference(), GSN_GET_TABINFOREF, signal,
GetTabInfoRef::SignalLength, JBB);
DBUG_RETURN(1);
}
ndbout_c("HARE");
sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal, sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
GetTabInfoReq::SignalLength, JBB); GetTabInfoReq::SignalLength, JBB);
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -1530,7 +1555,7 @@ Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr) ...@@ -1530,7 +1555,7 @@ Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
void int
Suma::completeOneSubscriber(Signal *signal, TablePtr tabPtr, SubscriberPtr subbPtr) Suma::completeOneSubscriber(Signal *signal, TablePtr tabPtr, SubscriberPtr subbPtr)
{ {
jam(); jam();
...@@ -1540,19 +1565,22 @@ Suma::completeOneSubscriber(Signal *signal, TablePtr tabPtr, SubscriberPtr subbP ...@@ -1540,19 +1565,22 @@ Suma::completeOneSubscriber(Signal *signal, TablePtr tabPtr, SubscriberPtr subbP
(c_startup.m_restart_server_node_id == 0 || (c_startup.m_restart_server_node_id == 0 ||
tabPtr.p->m_state != Table::DROPPED)) tabPtr.p->m_state != Table::DROPPED))
{ {
jam();
sendSubStartRef(signal,subbPtr,tabPtr.p->m_error, sendSubStartRef(signal,subbPtr,tabPtr.p->m_error,
SubscriptionData::TableData); SubscriptionData::TableData);
tabPtr.p->n_subscribers--; tabPtr.p->n_subscribers--;
DBUG_RETURN(-1);
} }
else else
{ {
jam();
SubscriptionPtr subPtr; SubscriptionPtr subPtr;
c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
subPtr.p->m_table_ptrI= tabPtr.i; subPtr.p->m_table_ptrI= tabPtr.i;
sendSubStartComplete(signal,subbPtr, m_last_complete_gci + 3, sendSubStartComplete(signal,subbPtr, m_last_complete_gci + 3,
SubscriptionData::TableData); SubscriptionData::TableData);
} }
DBUG_VOID_RETURN; DBUG_RETURN(0);
} }
void void
...@@ -1565,11 +1593,17 @@ Suma::completeAllSubscribers(Signal *signal, TablePtr tabPtr) ...@@ -1565,11 +1593,17 @@ Suma::completeAllSubscribers(Signal *signal, TablePtr tabPtr)
LocalDLList<Subscriber> subscribers(c_subscriberPool, LocalDLList<Subscriber> subscribers(c_subscriberPool,
tabPtr.p->c_subscribers); tabPtr.p->c_subscribers);
SubscriberPtr subbPtr; SubscriberPtr subbPtr;
for(subscribers.first(subbPtr); for(subscribers.first(subbPtr); !subbPtr.isNull();)
!subbPtr.isNull(); {
subscribers.next(subbPtr)) jam();
Ptr<Subscriber> tmp = subbPtr;
subscribers.next(subbPtr);
int ret = completeOneSubscriber(signal, tabPtr, tmp);
if (ret == -1)
{ {
completeOneSubscriber(signal, tabPtr, subbPtr); jam();
subscribers.release(tmp);
}
} }
} }
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
......
...@@ -251,7 +251,7 @@ public: ...@@ -251,7 +251,7 @@ public:
SubscriberPtr subbPtr); SubscriberPtr subbPtr);
int initTable(Signal *signal,Uint32 tableId, TablePtr &tabPtr); int initTable(Signal *signal,Uint32 tableId, TablePtr &tabPtr);
void completeOneSubscriber(Signal* signal, TablePtr tabPtr, SubscriberPtr subbPtr); int completeOneSubscriber(Signal* signal, TablePtr tabPtr, SubscriberPtr subbPtr);
void completeAllSubscribers(Signal* signal, TablePtr tabPtr); void completeAllSubscribers(Signal* signal, TablePtr tabPtr);
void completeInitTable(Signal* signal, TablePtr tabPtr); void completeInitTable(Signal* signal, TablePtr tabPtr);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment