diff --git a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp index 83b227860cf30834a0fc6cf0a83a471d90b3bc1e..93ae14a8d50ce74e7de5b6a4bdf3165fd30ec765 100644 --- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp +++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp @@ -215,8 +215,6 @@ NdbEventOperationImpl::~NdbEventOperationImpl() DBUG_VOID_RETURN; stop(); - // m_bufferHandle->dropSubscribeEvent(m_bufferId); - ; // ToDo? We should send stop signal here if (theMainOp == NULL) { @@ -428,7 +426,7 @@ NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n) // create blob event operation tBlobOp = - m_ndb->theEventBuffer->createEventOperation(*blobEvnt, m_error); + m_ndb->theEventBuffer->createEventOperationImpl(*blobEvnt, m_error); if (tBlobOp == NULL) DBUG_RETURN(NULL); @@ -561,6 +559,8 @@ NdbEventOperationImpl::execute_nolock() m_state= EO_EXECUTING; mi_type= m_eventImpl->mi_type; m_ndb->theEventBuffer->add_op(); + m_ref_count++; + DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this)); int r= NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*this); if (r == 0) { if (theMainOp == NULL) { @@ -568,19 +568,24 @@ NdbEventOperationImpl::execute_nolock() NdbEventOperationImpl* blob_op = theBlobOpList; while (blob_op != NULL) { r = blob_op->execute_nolock(); - if (r != 0) + if (r != 0) { break; + } + // blob event op now holds reference + // cleared by TE_STOP or TE_CLUSTER_FAILURE + m_ref_count++; + DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this)); blob_op = blob_op->m_next; } } if (r == 0) { - m_ref_count++; - DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this)); DBUG_RETURN(0); } } //Error + m_ref_count--; + DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this)); m_state= EO_ERROR; mi_type= 0; m_magic_number= 0; @@ -1582,6 +1587,33 @@ NdbEventBuffer::complete_outof_order_gcis() ndbout_c("complete_outof_order_gcis: m_latestGCI: %lld", m_latestGCI); } +void +NdbEventBuffer::insert_event(NdbEventOperationImpl* impl, + SubTableData &data, + LinearSectionPtr *ptr, + Uint32 &oid_ref) +{ + NdbEventOperationImpl *dropped_ev_op = m_dropped_ev_op; + do + { + do + { + oid_ref = impl->m_oid; + insertDataL(impl, &data, ptr); + NdbEventOperationImpl* blob_op = impl->theBlobOpList; + while (blob_op != NULL) + { + oid_ref = blob_op->m_oid; + insertDataL(blob_op, &data, ptr); + blob_op = blob_op->m_next; + } + } while((impl = impl->m_next)); + impl = dropped_ev_op; + dropped_ev_op = NULL; + } while (impl); +} + + void NdbEventBuffer::report_node_connected(Uint32 node_id) { @@ -1606,21 +1638,8 @@ NdbEventBuffer::report_node_connected(Uint32 node_id) /** * Insert this event for each operation */ - { - // no need to lock()/unlock(), receive thread calls this - NdbEventOperationImpl* impl = &op->m_impl; - do if (!impl->m_node_bit_mask.isclear()) - { - data.senderData = impl->m_oid; - insertDataL(impl, &data, ptr); - } while((impl = impl->m_next)); - for (impl = m_dropped_ev_op; impl; impl = impl->m_next) - if (!impl->m_node_bit_mask.isclear()) - { - data.senderData = impl->m_oid; - insertDataL(impl, &data, ptr); - } - } + // no need to lock()/unlock(), receive thread calls this + insert_event(&op->m_impl, data, ptr, data.senderData); DBUG_VOID_RETURN; } @@ -1648,21 +1667,8 @@ NdbEventBuffer::report_node_failure(Uint32 node_id) /** * Insert this event for each operation */ - { - // no need to lock()/unlock(), receive thread calls this - NdbEventOperationImpl* impl = &op->m_impl; - do if (!impl->m_node_bit_mask.isclear()) - { - data.senderData = impl->m_oid; - insertDataL(impl, &data, ptr); - } while((impl = impl->m_next)); - for (impl = m_dropped_ev_op; impl; impl = impl->m_next) - if (!impl->m_node_bit_mask.isclear()) - { - data.senderData = impl->m_oid; - insertDataL(impl, &data, ptr); - } - } + // no need to lock()/unlock(), receive thread calls this + insert_event(&op->m_impl, data, ptr, data.senderData); DBUG_VOID_RETURN; } @@ -1693,21 +1699,8 @@ NdbEventBuffer::completeClusterFailed() /** * Insert this event for each operation */ - { - // no need to lock()/unlock(), receive thread calls this - NdbEventOperationImpl* impl = &op->m_impl; - do if (!impl->m_node_bit_mask.isclear()) - { - data.senderData = impl->m_oid; - insertDataL(impl, &data, ptr); - } while((impl = impl->m_next)); - for (impl = m_dropped_ev_op; impl; impl = impl->m_next) - if (!impl->m_node_bit_mask.isclear()) - { - data.senderData = impl->m_oid; - insertDataL(impl, &data, ptr); - } - } + // no need to lock()/unlock(), receive thread calls this + insert_event(&op->m_impl, data, ptr, data.senderData); /** * Release all GCI's with m_gci > gci @@ -1797,17 +1790,36 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, { case NdbDictionary::Event::_TE_NODE_FAILURE: op->m_node_bit_mask.clear(SubTableData::getNdbdNodeId(ri)); + DBUG_PRINT("info", + ("_TE_NODE_FAILURE: m_ref_count: %u for op: %p id: %u", + op->m_ref_count, op, SubTableData::getNdbdNodeId(ri))); break; case NdbDictionary::Event::_TE_ACTIVE: op->m_node_bit_mask.set(SubTableData::getNdbdNodeId(ri)); // internal event, do not relay to user + DBUG_PRINT("info", + ("_TE_ACTIVE: m_ref_count: %u for op: %p id: %u", + op->m_ref_count, op, SubTableData::getNdbdNodeId(ri))); DBUG_RETURN_EVENT(0); break; case NdbDictionary::Event::_TE_CLUSTER_FAILURE: - op->m_node_bit_mask.clear(); - DBUG_ASSERT(op->m_ref_count > 0); - op->m_ref_count--; - DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op)); + if (!op->m_node_bit_mask.isclear()) + { + op->m_node_bit_mask.clear(); + DBUG_ASSERT(op->m_ref_count > 0); + op->m_ref_count--; + DBUG_PRINT("info", ("_TE_CLUSTER_FAILURE: m_ref_count: %u for op: %p", + op->m_ref_count, op)); + if (op->theMainOp) + { + // blob event op, need to clear ref count in main op + DBUG_ASSERT(op->m_ref_count == 0); + DBUG_ASSERT(op->theMainOp->m_ref_count > 0); + op->theMainOp->m_ref_count--; + DBUG_PRINT("info", ("m_ref_count: %u for op: %p", + op->theMainOp->m_ref_count, op->theMainOp)); + } + } break; case NdbDictionary::Event::_TE_STOP: op->m_node_bit_mask.clear(SubTableData::getNdbdNodeId(ri)); @@ -1815,7 +1827,17 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, { DBUG_ASSERT(op->m_ref_count > 0); op->m_ref_count--; - DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op)); + DBUG_PRINT("info", ("_TE_STOP: m_ref_count: %u for op: %p", + op->m_ref_count, op)); + if (op->theMainOp) + { + // blob event op, need to clear ref count in main op + DBUG_ASSERT(op->m_ref_count == 0); + DBUG_ASSERT(op->theMainOp->m_ref_count > 0); + op->theMainOp->m_ref_count--; + DBUG_PRINT("info", ("m_ref_count: %u for op: %p", + op->theMainOp->m_ref_count, op->theMainOp)); + } } break; default: @@ -2639,10 +2661,10 @@ NdbEventBuffer::createEventOperation(const char* eventName, } NdbEventOperationImpl* -NdbEventBuffer::createEventOperation(NdbEventImpl& evnt, - NdbError &theError) +NdbEventBuffer::createEventOperationImpl(NdbEventImpl& evnt, + NdbError &theError) { - DBUG_ENTER("NdbEventBuffer::createEventOperation [evnt]"); + DBUG_ENTER("NdbEventBuffer::createEventOperationImpl"); NdbEventOperationImpl* tOp= new NdbEventOperationImpl(m_ndb, evnt); if (tOp == 0) { diff --git a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp index ef25c8f48ecd3e07f93844fbf1b2ad84f0fee320..b68aa8036906b31134d66acd111f941b74c27d63 100644 --- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp +++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp @@ -436,8 +436,8 @@ public: Vector<Gci_container_pod> m_active_gci; NdbEventOperation *createEventOperation(const char* eventName, NdbError &); - NdbEventOperationImpl *createEventOperation(NdbEventImpl& evnt, - NdbError &); + NdbEventOperationImpl *createEventOperationImpl(NdbEventImpl& evnt, + NdbError &); void dropEventOperation(NdbEventOperation *); static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp); @@ -541,6 +541,11 @@ public: #endif private: + void insert_event(NdbEventOperationImpl* impl, + SubTableData &data, + LinearSectionPtr *ptr, + Uint32 &oid_ref); + int expand(unsigned sz); // all allocated data