Commit a92fae08 authored by unknown's avatar unknown

Bug#25387 - ndb: dbug assert in reference counting for event operations

- blob event operation not reference counted correctly, missing TE_ACTIVE
- add reference counting for blobs events
- make sure also blob event operations get TE_ACTIVE
- some minor cleanups + adjustment of dbug prints
parent b9a2f91b
......@@ -215,8 +215,6 @@ NdbEventOperationImpl::~NdbEventOperationImpl()
DBUG_VOID_RETURN;
stop();
// m_bufferHandle->dropSubscribeEvent(m_bufferId);
; // ToDo? We should send stop signal here
if (theMainOp == NULL)
{
......@@ -428,7 +426,7 @@ NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n)
// create blob event operation
tBlobOp =
m_ndb->theEventBuffer->createEventOperation(*blobEvnt, m_error);
m_ndb->theEventBuffer->createEventOperationImpl(*blobEvnt, m_error);
if (tBlobOp == NULL)
DBUG_RETURN(NULL);
......@@ -561,6 +559,8 @@ NdbEventOperationImpl::execute_nolock()
m_state= EO_EXECUTING;
mi_type= m_eventImpl->mi_type;
m_ndb->theEventBuffer->add_op();
m_ref_count++;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
int r= NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*this);
if (r == 0) {
if (theMainOp == NULL) {
......@@ -568,19 +568,24 @@ NdbEventOperationImpl::execute_nolock()
NdbEventOperationImpl* blob_op = theBlobOpList;
while (blob_op != NULL) {
r = blob_op->execute_nolock();
if (r != 0)
if (r != 0) {
break;
}
// blob event op now holds reference
// cleared by TE_STOP or TE_CLUSTER_FAILURE
m_ref_count++;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
blob_op = blob_op->m_next;
}
}
if (r == 0)
{
m_ref_count++;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
DBUG_RETURN(0);
}
}
//Error
m_ref_count--;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
m_state= EO_ERROR;
mi_type= 0;
m_magic_number= 0;
......@@ -1582,6 +1587,33 @@ NdbEventBuffer::complete_outof_order_gcis()
ndbout_c("complete_outof_order_gcis: m_latestGCI: %lld", m_latestGCI);
}
void
NdbEventBuffer::insert_event(NdbEventOperationImpl* impl,
SubTableData &data,
LinearSectionPtr *ptr,
Uint32 &oid_ref)
{
NdbEventOperationImpl *dropped_ev_op = m_dropped_ev_op;
do
{
do
{
oid_ref = impl->m_oid;
insertDataL(impl, &data, ptr);
NdbEventOperationImpl* blob_op = impl->theBlobOpList;
while (blob_op != NULL)
{
oid_ref = blob_op->m_oid;
insertDataL(blob_op, &data, ptr);
blob_op = blob_op->m_next;
}
} while((impl = impl->m_next));
impl = dropped_ev_op;
dropped_ev_op = NULL;
} while (impl);
}
void
NdbEventBuffer::report_node_connected(Uint32 node_id)
{
......@@ -1606,21 +1638,8 @@ NdbEventBuffer::report_node_connected(Uint32 node_id)
/**
* Insert this event for each operation
*/
{
// no need to lock()/unlock(), receive thread calls this
NdbEventOperationImpl* impl = &op->m_impl;
do if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
} while((impl = impl->m_next));
for (impl = m_dropped_ev_op; impl; impl = impl->m_next)
if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
}
}
// no need to lock()/unlock(), receive thread calls this
insert_event(&op->m_impl, data, ptr, data.senderData);
DBUG_VOID_RETURN;
}
......@@ -1648,21 +1667,8 @@ NdbEventBuffer::report_node_failure(Uint32 node_id)
/**
* Insert this event for each operation
*/
{
// no need to lock()/unlock(), receive thread calls this
NdbEventOperationImpl* impl = &op->m_impl;
do if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
} while((impl = impl->m_next));
for (impl = m_dropped_ev_op; impl; impl = impl->m_next)
if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
}
}
// no need to lock()/unlock(), receive thread calls this
insert_event(&op->m_impl, data, ptr, data.senderData);
DBUG_VOID_RETURN;
}
......@@ -1693,21 +1699,8 @@ NdbEventBuffer::completeClusterFailed()
/**
* Insert this event for each operation
*/
{
// no need to lock()/unlock(), receive thread calls this
NdbEventOperationImpl* impl = &op->m_impl;
do if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
} while((impl = impl->m_next));
for (impl = m_dropped_ev_op; impl; impl = impl->m_next)
if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
}
}
// no need to lock()/unlock(), receive thread calls this
insert_event(&op->m_impl, data, ptr, data.senderData);
/**
* Release all GCI's with m_gci > gci
......@@ -1797,17 +1790,36 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
{
case NdbDictionary::Event::_TE_NODE_FAILURE:
op->m_node_bit_mask.clear(SubTableData::getNdbdNodeId(ri));
DBUG_PRINT("info",
("_TE_NODE_FAILURE: m_ref_count: %u for op: %p id: %u",
op->m_ref_count, op, SubTableData::getNdbdNodeId(ri)));
break;
case NdbDictionary::Event::_TE_ACTIVE:
op->m_node_bit_mask.set(SubTableData::getNdbdNodeId(ri));
// internal event, do not relay to user
DBUG_PRINT("info",
("_TE_ACTIVE: m_ref_count: %u for op: %p id: %u",
op->m_ref_count, op, SubTableData::getNdbdNodeId(ri)));
DBUG_RETURN_EVENT(0);
break;
case NdbDictionary::Event::_TE_CLUSTER_FAILURE:
op->m_node_bit_mask.clear();
DBUG_ASSERT(op->m_ref_count > 0);
op->m_ref_count--;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
if (!op->m_node_bit_mask.isclear())
{
op->m_node_bit_mask.clear();
DBUG_ASSERT(op->m_ref_count > 0);
op->m_ref_count--;
DBUG_PRINT("info", ("_TE_CLUSTER_FAILURE: m_ref_count: %u for op: %p",
op->m_ref_count, op));
if (op->theMainOp)
{
// blob event op, need to clear ref count in main op
DBUG_ASSERT(op->m_ref_count == 0);
DBUG_ASSERT(op->theMainOp->m_ref_count > 0);
op->theMainOp->m_ref_count--;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p",
op->theMainOp->m_ref_count, op->theMainOp));
}
}
break;
case NdbDictionary::Event::_TE_STOP:
op->m_node_bit_mask.clear(SubTableData::getNdbdNodeId(ri));
......@@ -1815,7 +1827,17 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
{
DBUG_ASSERT(op->m_ref_count > 0);
op->m_ref_count--;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
DBUG_PRINT("info", ("_TE_STOP: m_ref_count: %u for op: %p",
op->m_ref_count, op));
if (op->theMainOp)
{
// blob event op, need to clear ref count in main op
DBUG_ASSERT(op->m_ref_count == 0);
DBUG_ASSERT(op->theMainOp->m_ref_count > 0);
op->theMainOp->m_ref_count--;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p",
op->theMainOp->m_ref_count, op->theMainOp));
}
}
break;
default:
......@@ -2639,10 +2661,10 @@ NdbEventBuffer::createEventOperation(const char* eventName,
}
NdbEventOperationImpl*
NdbEventBuffer::createEventOperation(NdbEventImpl& evnt,
NdbError &theError)
NdbEventBuffer::createEventOperationImpl(NdbEventImpl& evnt,
NdbError &theError)
{
DBUG_ENTER("NdbEventBuffer::createEventOperation [evnt]");
DBUG_ENTER("NdbEventBuffer::createEventOperationImpl");
NdbEventOperationImpl* tOp= new NdbEventOperationImpl(m_ndb, evnt);
if (tOp == 0)
{
......
......@@ -436,8 +436,8 @@ public:
Vector<Gci_container_pod> m_active_gci;
NdbEventOperation *createEventOperation(const char* eventName,
NdbError &);
NdbEventOperationImpl *createEventOperation(NdbEventImpl& evnt,
NdbError &);
NdbEventOperationImpl *createEventOperationImpl(NdbEventImpl& evnt,
NdbError &);
void dropEventOperation(NdbEventOperation *);
static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp);
......@@ -541,6 +541,11 @@ public:
#endif
private:
void insert_event(NdbEventOperationImpl* impl,
SubTableData &data,
LinearSectionPtr *ptr,
Uint32 &oid_ref);
int expand(unsigned sz);
// all allocated data
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment