Commit e618e271 authored by unknown's avatar unknown

ndb - bug#28161

  fix commit triggers with DD but not using DD


storage/ndb/include/kernel/signaldata/TupCommit.hpp:
  add diskpage
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  add diskpage
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp:
  pass disk/nodisk down detachedtrigger-path
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp:
  use TUPCOMMIT req for signaling diskpage back and forth
    so that I can init pointers (VM_TRACE) when from LQH
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp:
  remove niclude
storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp:
  only derefence disk pointer if needed
parent 2df58d26
...@@ -35,7 +35,7 @@ class TupCommitReq { ...@@ -35,7 +35,7 @@ class TupCommitReq {
friend bool printTUPCOMMITREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiverBlockNo); friend bool printTUPCOMMITREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiverBlockNo);
public: public:
STATIC_CONST( SignalLength = 3 ); STATIC_CONST( SignalLength = 4 );
private: private:
...@@ -45,6 +45,7 @@ private: ...@@ -45,6 +45,7 @@ private:
Uint32 opPtr; Uint32 opPtr;
Uint32 gci; Uint32 gci;
Uint32 hashValue; Uint32 hashValue;
Uint32 diskpage;
}; };
#endif #endif
...@@ -6348,6 +6348,7 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal) ...@@ -6348,6 +6348,7 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal)
tupCommitReq->opPtr = sig0; tupCommitReq->opPtr = sig0;
tupCommitReq->gci = regTcPtr.p->gci; tupCommitReq->gci = regTcPtr.p->gci;
tupCommitReq->hashValue = regTcPtr.p->hashValue; tupCommitReq->hashValue = regTcPtr.p->hashValue;
tupCommitReq->diskpage = RNIL;
EXECUTE_DIRECT(tup, GSN_TUP_COMMITREQ, signal, EXECUTE_DIRECT(tup, GSN_TUP_COMMITREQ, signal,
TupCommitReq::SignalLength); TupCommitReq::SignalLength);
......
...@@ -2126,7 +2126,8 @@ private: ...@@ -2126,7 +2126,8 @@ private:
#endif #endif
void checkDetachedTriggers(KeyReqStruct *req_struct, void checkDetachedTriggers(KeyReqStruct *req_struct,
Operationrec* regOperPtr, Operationrec* regOperPtr,
Tablerec* regTablePtr); Tablerec* regTablePtr,
bool disk);
void fireImmediateTriggers(KeyReqStruct *req_struct, void fireImmediateTriggers(KeyReqStruct *req_struct,
DLList<TupTriggerData>& triggerList, DLList<TupTriggerData>& triggerList,
...@@ -2138,7 +2139,8 @@ private: ...@@ -2138,7 +2139,8 @@ private:
void fireDetachedTriggers(KeyReqStruct *req_struct, void fireDetachedTriggers(KeyReqStruct *req_struct,
DLList<TupTriggerData>& triggerList, DLList<TupTriggerData>& triggerList,
Operationrec* regOperPtr); Operationrec* regOperPtr,
bool disk);
void executeTriggers(KeyReqStruct *req_struct, void executeTriggers(KeyReqStruct *req_struct,
DLList<TupTriggerData>& triggerList, DLList<TupTriggerData>& triggerList,
...@@ -2146,7 +2148,8 @@ private: ...@@ -2146,7 +2148,8 @@ private:
void executeTrigger(KeyReqStruct *req_struct, void executeTrigger(KeyReqStruct *req_struct,
TupTriggerData* trigPtr, TupTriggerData* trigPtr,
Operationrec* regOperPtr); Operationrec* regOperPtr,
bool disk = true);
bool readTriggerInfo(TupTriggerData* trigPtr, bool readTriggerInfo(TupTriggerData* trigPtr,
Operationrec* regOperPtr, Operationrec* regOperPtr,
...@@ -2157,7 +2160,8 @@ private: ...@@ -2157,7 +2160,8 @@ private:
Uint32* afterBuffer, Uint32* afterBuffer,
Uint32& noAfterWords, Uint32& noAfterWords,
Uint32* beforeBuffer, Uint32* beforeBuffer,
Uint32& noBeforeWords); Uint32& noBeforeWords,
bool disk);
void sendTrigAttrInfo(Signal* signal, void sendTrigAttrInfo(Signal* signal,
Uint32* data, Uint32* data,
......
...@@ -358,6 +358,7 @@ Dbtup::disk_page_commit_callback(Signal* signal, ...@@ -358,6 +358,7 @@ Dbtup::disk_page_commit_callback(Signal* signal,
tupCommitReq->opPtr= opPtrI; tupCommitReq->opPtr= opPtrI;
tupCommitReq->hashValue= hash_value; tupCommitReq->hashValue= hash_value;
tupCommitReq->gci= gci; tupCommitReq->gci= gci;
tupCommitReq->diskpage = page_id;
regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0; regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0;
regOperPtr.p->m_commit_disk_callback_page= page_id; regOperPtr.p->m_commit_disk_callback_page= page_id;
...@@ -388,14 +389,15 @@ Dbtup::disk_page_log_buffer_callback(Signal* signal, ...@@ -388,14 +389,15 @@ Dbtup::disk_page_log_buffer_callback(Signal* signal,
c_operation_pool.getPtr(regOperPtr, opPtrI); c_operation_pool.getPtr(regOperPtr, opPtrI);
c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci); c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci);
Uint32 page= regOperPtr.p->m_commit_disk_callback_page;
TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr(); TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();
tupCommitReq->opPtr= opPtrI; tupCommitReq->opPtr= opPtrI;
tupCommitReq->hashValue= hash_value; tupCommitReq->hashValue= hash_value;
tupCommitReq->gci= gci; tupCommitReq->gci= gci;
tupCommitReq->diskpage = page;
Uint32 page= regOperPtr.p->m_commit_disk_callback_page;
ndbassert(regOperPtr.p->op_struct.m_load_diskpage_on_commit == 0); ndbassert(regOperPtr.p->op_struct.m_load_diskpage_on_commit == 0);
regOperPtr.p->op_struct.m_wait_log_buffer= 0; regOperPtr.p->op_struct.m_wait_log_buffer= 0;
m_global_page_pool.getPtr(m_pgman.m_ptr, page); m_global_page_pool.getPtr(m_pgman.m_ptr, page);
...@@ -480,6 +482,15 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -480,6 +482,15 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
req_struct.signal= signal; req_struct.signal= signal;
req_struct.hash_value= hash_value; req_struct.hash_value= hash_value;
req_struct.gci= gci; req_struct.gci= gci;
regOperPtr.p->m_commit_disk_callback_page = tupCommitReq->diskpage;
#ifdef VM_TRACE
if (tupCommitReq->diskpage == RNIL)
{
m_pgman.m_ptr.setNull();
req_struct.m_disk_page_ptr.setNull();
}
#endif
ptrCheckGuard(regTabPtr, no_of_tablerec, tablerec); ptrCheckGuard(regTabPtr, no_of_tablerec, tablerec);
...@@ -628,8 +639,10 @@ skip_disk: ...@@ -628,8 +639,10 @@ skip_disk:
/** /**
* Perform "real" commit * Perform "real" commit
*/ */
Uint32 disk = regOperPtr.p->m_commit_disk_callback_page;
set_change_mask_info(&req_struct, regOperPtr.p); set_change_mask_info(&req_struct, regOperPtr.p);
checkDetachedTriggers(&req_struct, regOperPtr.p, regTabPtr.p); checkDetachedTriggers(&req_struct, regOperPtr.p, regTabPtr.p,
disk != RNIL);
if(regOperPtr.p->op_struct.op_type != ZDELETE) if(regOperPtr.p->op_struct.op_type != ZDELETE)
{ {
......
...@@ -24,7 +24,6 @@ ...@@ -24,7 +24,6 @@
#include "AttributeOffset.hpp" #include "AttributeOffset.hpp"
#include <AttributeHeader.hpp> #include <AttributeHeader.hpp>
#include <Interpreter.hpp> #include <Interpreter.hpp>
#include <signaldata/TupCommit.hpp>
#include <signaldata/TupKey.hpp> #include <signaldata/TupKey.hpp>
#include <signaldata/AttrInfo.hpp> #include <signaldata/AttrInfo.hpp>
#include <NdbSqlUtil.hpp> #include <NdbSqlUtil.hpp>
......
...@@ -459,7 +459,8 @@ void Dbtup::checkDeferredTriggers(Signal* signal, ...@@ -459,7 +459,8 @@ void Dbtup::checkDeferredTriggers(Signal* signal,
/* ---------------------------------------------------------------- */ /* ---------------------------------------------------------------- */
void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct, void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct,
Operationrec* regOperPtr, Operationrec* regOperPtr,
Tablerec* regTablePtr) Tablerec* regTablePtr,
bool disk)
{ {
Uint32 save_type = regOperPtr->op_struct.op_type; Uint32 save_type = regOperPtr->op_struct.op_type;
Tuple_header *save_ptr = req_struct->m_tuple_ptr; Tuple_header *save_ptr = req_struct->m_tuple_ptr;
...@@ -505,7 +506,7 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct, ...@@ -505,7 +506,7 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct,
// If any fired immediate insert trigger then fetch after tuple // If any fired immediate insert trigger then fetch after tuple
fireDetachedTriggers(req_struct, fireDetachedTriggers(req_struct,
regTablePtr->subscriptionInsertTriggers, regTablePtr->subscriptionInsertTriggers,
regOperPtr); regOperPtr, disk);
break; break;
case(ZDELETE): case(ZDELETE):
ljam(); ljam();
...@@ -519,7 +520,7 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct, ...@@ -519,7 +520,7 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct,
// FIRETRIGORD with the before tuple // FIRETRIGORD with the before tuple
fireDetachedTriggers(req_struct, fireDetachedTriggers(req_struct,
regTablePtr->subscriptionDeleteTriggers, regTablePtr->subscriptionDeleteTriggers,
regOperPtr); regOperPtr, disk);
break; break;
case(ZUPDATE): case(ZUPDATE):
ljam(); ljam();
...@@ -533,7 +534,7 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct, ...@@ -533,7 +534,7 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct,
// and send two FIRETRIGORD one with before tuple and one with after tuple // and send two FIRETRIGORD one with before tuple and one with after tuple
fireDetachedTriggers(req_struct, fireDetachedTriggers(req_struct,
regTablePtr->subscriptionUpdateTriggers, regTablePtr->subscriptionUpdateTriggers,
regOperPtr); regOperPtr, disk);
break; break;
default: default:
ndbrequire(false); ndbrequire(false);
...@@ -591,7 +592,8 @@ Dbtup::fireDeferredTriggers(Signal* signal, ...@@ -591,7 +592,8 @@ Dbtup::fireDeferredTriggers(Signal* signal,
void void
Dbtup::fireDetachedTriggers(KeyReqStruct *req_struct, Dbtup::fireDetachedTriggers(KeyReqStruct *req_struct,
DLList<TupTriggerData>& triggerList, DLList<TupTriggerData>& triggerList,
Operationrec* const regOperPtr) Operationrec* const regOperPtr,
bool disk)
{ {
TriggerPtr trigPtr; TriggerPtr trigPtr;
...@@ -612,7 +614,8 @@ Dbtup::fireDetachedTriggers(KeyReqStruct *req_struct, ...@@ -612,7 +614,8 @@ Dbtup::fireDetachedTriggers(KeyReqStruct *req_struct,
ljam(); ljam();
executeTrigger(req_struct, executeTrigger(req_struct,
trigPtr.p, trigPtr.p,
regOperPtr); regOperPtr,
disk);
} }
triggerList.next(trigPtr); triggerList.next(trigPtr);
} }
...@@ -636,7 +639,8 @@ void Dbtup::executeTriggers(KeyReqStruct *req_struct, ...@@ -636,7 +639,8 @@ void Dbtup::executeTriggers(KeyReqStruct *req_struct,
void Dbtup::executeTrigger(KeyReqStruct *req_struct, void Dbtup::executeTrigger(KeyReqStruct *req_struct,
TupTriggerData* const trigPtr, TupTriggerData* const trigPtr,
Operationrec* const regOperPtr) Operationrec* const regOperPtr,
bool disk)
{ {
/** /**
* The block below does not work together with GREP. * The block below does not work together with GREP.
...@@ -703,7 +707,8 @@ void Dbtup::executeTrigger(KeyReqStruct *req_struct, ...@@ -703,7 +707,8 @@ void Dbtup::executeTrigger(KeyReqStruct *req_struct,
afterBuffer, afterBuffer,
noAfterWords, noAfterWords,
beforeBuffer, beforeBuffer,
noBeforeWords)) { noBeforeWords,
disk)) {
ljam(); ljam();
return; return;
} }
...@@ -806,9 +811,9 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr, ...@@ -806,9 +811,9 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
Uint32* const afterBuffer, Uint32* const afterBuffer,
Uint32& noAfterWords, Uint32& noAfterWords,
Uint32* const beforeBuffer, Uint32* const beforeBuffer,
Uint32& noBeforeWords) Uint32& noBeforeWords,
bool disk)
{ {
//XXX this will not work with varsize attributes...
noAfterWords = 0; noAfterWords = 0;
noBeforeWords = 0; noBeforeWords = 0;
Uint32 readBuffer[MAX_ATTRIBUTES_IN_TABLE]; Uint32 readBuffer[MAX_ATTRIBUTES_IN_TABLE];
...@@ -841,8 +846,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr, ...@@ -841,8 +846,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
c_undo_buffer.get_ptr(&req_struct->prevOpPtr.p->m_copy_tuple_location); c_undo_buffer.get_ptr(&req_struct->prevOpPtr.p->m_copy_tuple_location);
} }
if (regTabPtr->need_expand()) if (regTabPtr->need_expand(disk))
prepare_read(req_struct, regTabPtr, true); prepare_read(req_struct, regTabPtr, disk);
int ret = readAttributes(req_struct, int ret = readAttributes(req_struct,
&tableDescriptor[regTabPtr->readKeyArray].tabDescr, &tableDescriptor[regTabPtr->readKeyArray].tabDescr,
...@@ -937,8 +942,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr, ...@@ -937,8 +942,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
req_struct->m_tuple_ptr= (Tuple_header*)ptr; req_struct->m_tuple_ptr= (Tuple_header*)ptr;
} }
if (regTabPtr->need_expand()) // no disk if (regTabPtr->need_expand(disk))
prepare_read(req_struct, regTabPtr, true); prepare_read(req_struct, regTabPtr, disk);
int ret = readAttributes(req_struct, int ret = readAttributes(req_struct,
&readBuffer[0], &readBuffer[0],
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment