Commit 6db8fa17 authored by unknown's avatar unknown

ndb dd

  Fix SR bug that extent pages was scanned before undo was run
  Fix bug wrt page flushing/tsman and tup's dirty page list


storage/ndb/include/kernel/signaldata/PgmanContinueB.hpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  remove some unused code
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Add LCP_PREPARE to pgman
  Change order between TSMAN/LGMAN START_RECREQ
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/diskpage.hpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/lgman.cpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/pgman.cpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/pgman.hpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/tsman.cpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/tsman.hpp:
  Fix dd SR + dd free space bugs
storage/ndb/src/kernel/vm/DLFifoList.hpp:
  Fix dd SR + dd free space bugs
storage/ndb/test/tools/hugoLoad.cpp:
  Fix dd SR + dd free space bugs
storage/ndb/tools/delete_all.cpp:
  Fix dd SR + dd free space bugs
parent b33151a1
...@@ -29,7 +29,8 @@ private: ...@@ -29,7 +29,8 @@ private:
STATS_LOOP = 0, STATS_LOOP = 0,
BUSY_LOOP = 1, BUSY_LOOP = 1,
CLEANUP_LOOP = 2, CLEANUP_LOOP = 2,
LCP_LOOP = 3 LCP_LOOP = 3,
LCP_PREPARE = 4
}; };
}; };
......
...@@ -1009,8 +1009,6 @@ public: ...@@ -1009,8 +1009,6 @@ public:
LCP_SR_STARTED = 10, LCP_SR_STARTED = 10,
LCP_SR_COMPLETED = 11 LCP_SR_COMPLETED = 11
}; };
LcpLocRecord m_acc;
LcpLocRecord m_tup;
LcpState lcpState; LcpState lcpState;
bool firstFragmentFlag; bool firstFragmentFlag;
...@@ -1028,6 +1026,7 @@ public: ...@@ -1028,6 +1026,7 @@ public:
bool reportEmpty; bool reportEmpty;
NdbNodeBitmask m_EMPTY_LCP_REQ; NdbNodeBitmask m_EMPTY_LCP_REQ;
Uint32 m_error;
Uint32 m_outstanding; Uint32 m_outstanding;
}; // Size 76 bytes }; // Size 76 bytes
typedef Ptr<LcpRecord> LcpRecordPtr; typedef Ptr<LcpRecord> LcpRecordPtr;
...@@ -2248,10 +2247,6 @@ private: ...@@ -2248,10 +2247,6 @@ private:
bool checkLcpStarted(Signal* signal); bool checkLcpStarted(Signal* signal);
void checkLcpTupprep(Signal* signal); void checkLcpTupprep(Signal* signal);
void getNextFragForLcp(Signal* signal); void getNextFragForLcp(Signal* signal);
void initLcpLocAcc(Signal* signal, Uint32 fragId);
void initLcpLocTup(Signal* signal, Uint32 fragId);
void releaseLocalLcps(Signal* signal);
void seizeLcpLoc(Signal* signal);
void sendAccContOp(Signal* signal); void sendAccContOp(Signal* signal);
void sendStartLcp(Signal* signal); void sendStartLcp(Signal* signal);
void setLogTail(Signal* signal, Uint32 keepGci); void setLogTail(Signal* signal, Uint32 keepGci);
...@@ -2283,7 +2278,6 @@ private: ...@@ -2283,7 +2278,6 @@ private:
void checkNewMbyte(Signal* signal); void checkNewMbyte(Signal* signal);
void checkReadExecSr(Signal* signal); void checkReadExecSr(Signal* signal);
void checkScanTcCompleted(Signal* signal); void checkScanTcCompleted(Signal* signal);
void checkSrCompleted(Signal* signal);
void closeFile(Signal* signal, LogFileRecordPtr logFilePtr, Uint32 place); void closeFile(Signal* signal, LogFileRecordPtr logFilePtr, Uint32 place);
void completedLogPage(Signal* signal, Uint32 clpType, Uint32 place); void completedLogPage(Signal* signal, Uint32 clpType, Uint32 place);
void deleteFragrec(Uint32 fragId); void deleteFragrec(Uint32 fragId);
...@@ -2302,7 +2296,6 @@ private: ...@@ -2302,7 +2296,6 @@ private:
void initialiseFragrec(Signal* signal); void initialiseFragrec(Signal* signal);
void initialiseGcprec(Signal* signal); void initialiseGcprec(Signal* signal);
void initialiseLcpRec(Signal* signal); void initialiseLcpRec(Signal* signal);
void initialiseLcpLocrec(Signal* signal);
void initialiseLfo(Signal* signal); void initialiseLfo(Signal* signal);
void initialiseLogFile(Signal* signal); void initialiseLogFile(Signal* signal);
void initialiseLogPage(Signal* signal); void initialiseLogPage(Signal* signal);
...@@ -2346,7 +2339,6 @@ private: ...@@ -2346,7 +2339,6 @@ private:
void releaseActiveCopy(Signal* signal); void releaseActiveCopy(Signal* signal);
void releaseAddfragrec(Signal* signal); void releaseAddfragrec(Signal* signal);
void releaseFragrec(); void releaseFragrec();
void releaseLcpLoc(Signal* signal);
void releaseOprec(Signal* signal); void releaseOprec(Signal* signal);
void releasePageRef(Signal* signal); void releasePageRef(Signal* signal);
void releaseMmPages(Signal* signal); void releaseMmPages(Signal* signal);
......
...@@ -10923,9 +10923,38 @@ void Dblqh::execLCP_PREPARE_REF(Signal* signal) ...@@ -10923,9 +10923,38 @@ void Dblqh::execLCP_PREPARE_REF(Signal* signal)
tabptr.i = ref->tableId; tabptr.i = ref->tableId;
ptrCheckGuard(tabptr, ctabrecFileSize, tablerec); ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED; ndbrequire(lcpPtr.p->m_outstanding);
lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::ACC_COMPLETED; lcpPtr.p->m_outstanding--;
contChkpNextFragLab(signal);
/**
* Only BACKUP is allowed to ref LCP_PREPARE
*/
ndbrequire(refToBlock(signal->getSendersBlockRef()) == BACKUP);
lcpPtr.p->m_error = ref->errorCode;
if (lcpPtr.p->m_outstanding == 0)
{
jam();
if(lcpPtr.p->firstFragmentFlag)
{
jam();
LcpFragOrd *ord= (LcpFragOrd*)signal->getDataPtrSend();
lcpPtr.p->firstFragmentFlag= false;
*ord = lcpPtr.p->currentFragment.lcpFragOrd;
EXECUTE_DIRECT(PGMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
jamEntry();
/**
* First fragment mean that last LCP is complete :-)
*/
EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, signal->length());
jamEntry();
}
lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED;
contChkpNextFragLab(signal);
}
} }
/* -------------------------------------------------------------------------- /* --------------------------------------------------------------------------
...@@ -10945,72 +10974,86 @@ void Dblqh::execLCP_PREPARE_CONF(Signal* signal) ...@@ -10945,72 +10974,86 @@ void Dblqh::execLCP_PREPARE_CONF(Signal* signal)
fragptr.i = lcpPtr.p->currentFragment.fragPtrI; fragptr.i = lcpPtr.p->currentFragment.fragPtrI;
c_fragment_pool.getPtr(fragptr); c_fragment_pool.getPtr(fragptr);
ndbrequire(conf->tableId == fragptr.p->tabRef);
ndbrequire(conf->fragmentId == fragptr.p->fragId);
lcpPtr.p->lcpState = LcpRecord::LCP_WAIT_HOLDOPS; if (refToBlock(signal->getSendersBlockRef()) != PGMAN)
lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::WAIT_LCPHOLDOP; {
ndbrequire(conf->tableId == fragptr.p->tabRef);
lcpPtr.p->lcpState = LcpRecord::LCP_START_CHKP; ndbrequire(conf->fragmentId == fragptr.p->fragId);
lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::HOLDOP_READY; }
/* ---------------------------------------------------------------------- ndbrequire(lcpPtr.p->m_outstanding);
* UPDATE THE MAX_GCI_IN_LCP AND MAX_GCI_COMPLETED_IN_LCP NOW BEFORE lcpPtr.p->m_outstanding--;
* ACTIVATING THE FRAGMENT AGAIN. if (lcpPtr.p->m_outstanding == 0)
* --------------------------------------------------------------------- */
ndbrequire(lcpPtr.p->currentFragment.lcpFragOrd.lcpNo < MAX_LCP_STORED);
fragptr.p->maxGciInLcp = fragptr.p->newestGci;
fragptr.p->maxGciCompletedInLcp = cnewestCompletedGci;
{ {
LcpFragOrd *ord= (LcpFragOrd*)signal->getDataPtrSend(); jam();
*ord = lcpPtr.p->currentFragment.lcpFragOrd;
EXECUTE_DIRECT(LGMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
jamEntry();
*ord = lcpPtr.p->currentFragment.lcpFragOrd;
EXECUTE_DIRECT(DBTUP, GSN_LCP_FRAG_ORD, signal, signal->length());
jamEntry();
if(lcpPtr.p->firstFragmentFlag) if(lcpPtr.p->firstFragmentFlag)
{ {
jam(); jam();
LcpFragOrd *ord= (LcpFragOrd*)signal->getDataPtrSend();
lcpPtr.p->firstFragmentFlag= false; lcpPtr.p->firstFragmentFlag= false;
*ord = lcpPtr.p->currentFragment.lcpFragOrd; *ord = lcpPtr.p->currentFragment.lcpFragOrd;
EXECUTE_DIRECT(PGMAN, GSN_LCP_FRAG_ORD, signal, signal->length()); EXECUTE_DIRECT(PGMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
jamEntry(); jamEntry();
/** /**
* First fragment mean that last LCP is complete :-) * First fragment mean that last LCP is complete :-)
*/ */
EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, signal->length()); EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, signal->length());
jamEntry(); jamEntry();
} }
}
if (lcpPtr.p->m_error)
{
jam();
BackupFragmentReq* req= (BackupFragmentReq*)signal->getDataPtr(); lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED;
req->tableId = lcpPtr.p->currentFragment.lcpFragOrd.tableId; contChkpNextFragLab(signal);
req->fragmentNo = 0; return;
req->backupPtr = m_backup_ptr; }
req->backupId = lcpPtr.p->currentFragment.lcpFragOrd.lcpId;
req->count = 0;
lcpPtr.p->lcpState = LcpRecord::LCP_WAIT_HOLDOPS;
lcpPtr.p->lcpState = LcpRecord::LCP_START_CHKP;
/* ----------------------------------------------------------------------
* UPDATE THE MAX_GCI_IN_LCP AND MAX_GCI_COMPLETED_IN_LCP NOW BEFORE
* ACTIVATING THE FRAGMENT AGAIN.
* --------------------------------------------------------------------- */
ndbrequire(lcpPtr.p->currentFragment.lcpFragOrd.lcpNo < MAX_LCP_STORED);
fragptr.p->maxGciInLcp = fragptr.p->newestGci;
fragptr.p->maxGciCompletedInLcp = cnewestCompletedGci;
{
LcpFragOrd *ord= (LcpFragOrd*)signal->getDataPtrSend();
*ord = lcpPtr.p->currentFragment.lcpFragOrd;
EXECUTE_DIRECT(LGMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
jamEntry();
*ord = lcpPtr.p->currentFragment.lcpFragOrd;
EXECUTE_DIRECT(DBTUP, GSN_LCP_FRAG_ORD, signal, signal->length());
jamEntry();
}
BackupFragmentReq* req= (BackupFragmentReq*)signal->getDataPtr();
req->tableId = lcpPtr.p->currentFragment.lcpFragOrd.tableId;
req->fragmentNo = 0;
req->backupPtr = m_backup_ptr;
req->backupId = lcpPtr.p->currentFragment.lcpFragOrd.lcpId;
req->count = 0;
#ifdef NDB_DEBUG_FULL #ifdef NDB_DEBUG_FULL
if(ERROR_INSERTED(5904)) if(ERROR_INSERTED(5904))
{ {
g_trace_lcp.sendSignal(BACKUP_REF, GSN_BACKUP_FRAGMENT_REQ, signal, g_trace_lcp.sendSignal(BACKUP_REF, GSN_BACKUP_FRAGMENT_REQ, signal,
BackupFragmentReq::SignalLength, JBB); BackupFragmentReq::SignalLength, JBB);
} }
else else
#endif #endif
{ {
sendSignal(BACKUP_REF, GSN_BACKUP_FRAGMENT_REQ, signal, sendSignal(BACKUP_REF, GSN_BACKUP_FRAGMENT_REQ, signal,
BackupFragmentReq::SignalLength, JBB); BackupFragmentReq::SignalLength, JBB);
}
} }
lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::ACC_STARTED;
lcpPtr.p->m_tup.lcpLocstate = LcpLocRecord::TUP_COMPLETED;
} }
void Dblqh::execBACKUP_FRAGMENT_REF(Signal* signal) void Dblqh::execBACKUP_FRAGMENT_REF(Signal* signal)
...@@ -11025,8 +11068,7 @@ void Dblqh::execBACKUP_FRAGMENT_CONF(Signal* signal) ...@@ -11025,8 +11068,7 @@ void Dblqh::execBACKUP_FRAGMENT_CONF(Signal* signal)
lcpPtr.i = 0; lcpPtr.i = 0;
ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord); ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord);
ndbrequire(lcpPtr.p->m_acc.lcpLocstate == LcpLocRecord::ACC_STARTED); ndbrequire(lcpPtr.p->lcpState == LcpRecord::LCP_START_CHKP);
lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::ACC_COMPLETED;
lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED; lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED;
/* ------------------------------------------------------------------------ /* ------------------------------------------------------------------------
...@@ -11142,7 +11184,10 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* signal) ...@@ -11142,7 +11184,10 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* signal)
contChkpNextFragLab(signal); contChkpNextFragLab(signal);
return; return;
} }
lcpPtr.p->m_error = 0;
lcpPtr.p->m_outstanding = 1;
ndbrequire(tabPtr.p->tableStatus == Tablerec::TABLE_DEFINED); ndbrequire(tabPtr.p->tableStatus == Tablerec::TABLE_DEFINED);
lcpPtr.p->lcpState = LcpRecord::LCP_WAIT_FRAGID; lcpPtr.p->lcpState = LcpRecord::LCP_WAIT_FRAGID;
...@@ -11157,6 +11202,13 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* signal) ...@@ -11157,6 +11202,13 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* signal)
req->backupId = lcpPtr.p->currentFragment.lcpFragOrd.lcpId; req->backupId = lcpPtr.p->currentFragment.lcpFragOrd.lcpId;
sendSignal(BACKUP_REF, GSN_LCP_PREPARE_REQ, signal, sendSignal(BACKUP_REF, GSN_LCP_PREPARE_REQ, signal,
LcpPrepareReq::SignalLength, JBB); LcpPrepareReq::SignalLength, JBB);
if (lcpPtr.p->firstFragmentFlag)
{
lcpPtr.p->m_outstanding++;
sendSignal(PGMAN_REF, GSN_LCP_PREPARE_REQ, signal,
LcpPrepareReq::SignalLength, JBB);
}
}//Dblqh::sendLCP_FRAGIDREQ() }//Dblqh::sendLCP_FRAGIDREQ()
void Dblqh::sendEMPTY_LCP_CONF(Signal* signal, bool idle) void Dblqh::sendEMPTY_LCP_CONF(Signal* signal, bool idle)
...@@ -13600,7 +13652,7 @@ void Dblqh::execRESTORE_LCP_CONF(Signal* signal) ...@@ -13600,7 +13652,7 @@ void Dblqh::execRESTORE_LCP_CONF(Signal* signal)
lcpPtr.p->m_outstanding = 1; lcpPtr.p->m_outstanding = 1;
signal->theData[0] = c_lcpId; signal->theData[0] = c_lcpId;
sendSignal(TSMAN_REF, GSN_START_RECREQ, signal, 1, JBB); sendSignal(LGMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
return; return;
} }
} }
...@@ -13654,7 +13706,7 @@ void Dblqh::execSTART_RECREQ(Signal* signal) ...@@ -13654,7 +13706,7 @@ void Dblqh::execSTART_RECREQ(Signal* signal)
lcpPtr.p->m_outstanding = 1; lcpPtr.p->m_outstanding = 1;
signal->theData[0] = c_lcpId; signal->theData[0] = c_lcpId;
sendSignal(TSMAN_REF, GSN_START_RECREQ, signal, 1, JBB); sendSignal(LGMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
}//if }//if
}//Dblqh::execSTART_RECREQ() }//Dblqh::execSTART_RECREQ()
...@@ -13680,18 +13732,19 @@ void Dblqh::execSTART_RECCONF(Signal* signal) ...@@ -13680,18 +13732,19 @@ void Dblqh::execSTART_RECCONF(Signal* signal)
switch(refToBlock(sender)){ switch(refToBlock(sender)){
case TSMAN: case TSMAN:
jam();
break;
case LGMAN:
jam(); jam();
lcpPtr.p->m_outstanding++; lcpPtr.p->m_outstanding++;
signal->theData[0] = c_lcpId; signal->theData[0] = c_lcpId;
sendSignal(LGMAN_REF, GSN_START_RECREQ, signal, 1, JBB); sendSignal(TSMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
return; return;
case LGMAN:
jam();
break; break;
default: default:
ndbrequire(false); ndbrequire(false);
} }
jam(); jam();
csrExecUndoLogState = EULS_COMPLETED; csrExecUndoLogState = EULS_COMPLETED;
c_lcp_complete_fragments.first(fragptr); c_lcp_complete_fragments.first(fragptr);
...@@ -15781,29 +15834,6 @@ void Dblqh::checkScanTcCompleted(Signal* signal) ...@@ -15781,29 +15834,6 @@ void Dblqh::checkScanTcCompleted(Signal* signal)
}//if }//if
}//Dblqh::checkScanTcCompleted() }//Dblqh::checkScanTcCompleted()
/* ==========================================================================
* === CHECK IF ALL PARTS OF A SYSTEM RESTART ON A FRAGMENT ARE COMPLETED ===
*
* SUBROUTINE SHORT NAME = CSC
* ========================================================================= */
void Dblqh::checkSrCompleted(Signal* signal)
{
terrorCode = ZOK;
ptrGuard(lcpPtr);
if(lcpPtr.p->m_acc.lcpLocstate != LcpLocRecord::SR_ACC_COMPLETED)
{
ndbrequire(lcpPtr.p->m_acc.lcpLocstate == LcpLocRecord::SR_ACC_STARTED);
return;
}
if(lcpPtr.p->m_tup.lcpLocstate != LcpLocRecord::SR_TUP_COMPLETED)
{
ndbrequire(lcpPtr.p->m_tup.lcpLocstate == LcpLocRecord::SR_TUP_STARTED);
return;
}
lcpPtr.p->lcpState = LcpRecord::LCP_SR_COMPLETED;
}//Dblqh::checkSrCompleted()
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
/* ------ CLOSE A FILE DURING EXECUTION OF FRAGMENT LOG ------- */ /* ------ CLOSE A FILE DURING EXECUTION OF FRAGMENT LOG ------- */
/* */ /* */
...@@ -18116,12 +18146,10 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal) ...@@ -18116,12 +18146,10 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
// Print information about the current local checkpoint // Print information about the current local checkpoint
TlcpPtr.i = 0; TlcpPtr.i = 0;
ptrAss(TlcpPtr, lcpRecord); ptrAss(TlcpPtr, lcpRecord);
infoEvent(" lcpState=%d", TlcpPtr.p->lcpState); infoEvent(" lcpState=%d lastFragmentFlag=%d",
infoEvent(" lcpAccptr=%d lastFragmentFlag=%d", TlcpPtr.p->lcpState, TlcpPtr.p->lastFragmentFlag);
TlcpPtr.p->m_acc.lcpRef,
TlcpPtr.p->lastFragmentFlag);
infoEvent("currentFragment.fragPtrI=%d", infoEvent("currentFragment.fragPtrI=%d",
TlcpPtr.p->currentFragment.fragPtrI); TlcpPtr.p->currentFragment.fragPtrI);
infoEvent("currentFragment.lcpFragOrd.tableId=%d", infoEvent("currentFragment.lcpFragOrd.tableId=%d",
TlcpPtr.p->currentFragment.lcpFragOrd.tableId); TlcpPtr.p->currentFragment.lcpFragOrd.tableId);
infoEvent(" lcpQueued=%d reportEmpty=%d", infoEvent(" lcpQueued=%d reportEmpty=%d",
......
...@@ -2600,7 +2600,7 @@ private: ...@@ -2600,7 +2600,7 @@ private:
void disk_page_prealloc_callback_common(Signal*, void disk_page_prealloc_callback_common(Signal*,
Ptr<Page_request>, Ptr<Page_request>,
Ptr<Fragrecord>, Ptr<Fragrecord>,
Ptr<GlobalPage>); Ptr<Page>);
void disk_page_alloc(Signal*, void disk_page_alloc(Signal*,
Tablerec*, Fragrecord*, Local_key*, PagePtr, Uint32); Tablerec*, Fragrecord*, Local_key*, PagePtr, Uint32);
...@@ -2631,18 +2631,22 @@ private: ...@@ -2631,18 +2631,22 @@ private:
void undo_createtable_callback(Signal* signal, Uint32 opPtrI, Uint32 unused); void undo_createtable_callback(Signal* signal, Uint32 opPtrI, Uint32 unused);
void disk_page_set_dirty(Ptr<Page>);
void restart_setup_page(Ptr<Page>);
void update_extent_pos(Disk_alloc_info&, Ptr<Extent_info>);
/** /**
* Disk restart code * Disk restart code
*/ */
public: public:
int disk_page_load_hook(Uint32 page_id); int disk_page_load_hook(Uint32 page_id);
void disk_page_unmap_callback(Uint32 page_id); void disk_page_unmap_callback(Uint32 page_id, Uint32 dirty_count);
int disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId, int disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId,
const Local_key* key, Uint32 pages); const Local_key* key, Uint32 pages);
void disk_restart_page_bits(Uint32 tableId, Uint32 fragId, void disk_restart_page_bits(Uint32 tableId, Uint32 fragId,
const Local_key*, Uint32 old_bits, Uint32 bits); const Local_key*, Uint32 bits);
void disk_restart_undo(Signal* signal, Uint64 lsn, void disk_restart_undo(Signal* signal, Uint64 lsn,
Uint32 type, const Uint32 * ptr, Uint32 len); Uint32 type, const Uint32 * ptr, Uint32 len);
...@@ -2654,6 +2658,7 @@ public: ...@@ -2654,6 +2658,7 @@ public:
Ptr<Tablerec> m_table_ptr; Ptr<Tablerec> m_table_ptr;
Ptr<Fragrecord> m_fragment_ptr; Ptr<Fragrecord> m_fragment_ptr;
Ptr<Page> m_page_ptr; Ptr<Page> m_page_ptr;
Ptr<Extent_info> m_extent_ptr;
Local_key m_key; Local_key m_key;
}; };
...@@ -2664,7 +2669,7 @@ private: ...@@ -2664,7 +2669,7 @@ private:
void disk_restart_undo_alloc(Apply_undo*); void disk_restart_undo_alloc(Apply_undo*);
void disk_restart_undo_update(Apply_undo*); void disk_restart_undo_update(Apply_undo*);
void disk_restart_undo_free(Apply_undo*); void disk_restart_undo_free(Apply_undo*);
void disk_restart_undo_page_bits(Apply_undo*); void disk_restart_undo_page_bits(Signal*, Apply_undo*);
#ifdef VM_TRACE #ifdef VM_TRACE
void verify_page_lists(Disk_alloc_info&); void verify_page_lists(Disk_alloc_info&);
......
...@@ -327,6 +327,8 @@ Dbtup::disk_page_commit_callback(Signal* signal, ...@@ -327,6 +327,8 @@ Dbtup::disk_page_commit_callback(Signal* signal,
regOperPtr.p->m_commit_disk_callback_page= page_id; regOperPtr.p->m_commit_disk_callback_page= page_id;
m_global_page_pool.getPtr(m_pgman.m_ptr, page_id); m_global_page_pool.getPtr(m_pgman.m_ptr, page_id);
disk_page_set_dirty(*(Ptr<Page>*)&m_pgman.m_ptr);
execTUP_COMMITREQ(signal); execTUP_COMMITREQ(signal);
if(signal->theData[0] == 0) if(signal->theData[0] == 0)
c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer); c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
...@@ -477,8 +479,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -477,8 +479,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
* the page hot. XXX move to TUP which knows better. * the page hot. XXX move to TUP which knows better.
*/ */
int flags= regOperPtr.p->op_struct.op_type | int flags= regOperPtr.p->op_struct.op_type |
Page_cache_client::COMMIT_REQ | Page_cache_client::STRICT_ORDER | Page_cache_client::COMMIT_REQ | Page_cache_client::CORR_REQ;
Page_cache_client::CORR_REQ;
int res= m_pgman.get_page(signal, req, flags); int res= m_pgman.get_page(signal, req, flags);
switch(res){ switch(res){
case 0: case 0:
...@@ -491,6 +492,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -491,6 +492,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
ndbrequire("NOT YET IMPLEMENTED" == 0); ndbrequire("NOT YET IMPLEMENTED" == 0);
break; break;
} }
disk_page_set_dirty(*(Ptr<Page>*)&m_pgman.m_ptr);
regOperPtr.p->m_commit_disk_callback_page= res; regOperPtr.p->m_commit_disk_callback_page= res;
regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0; regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0;
} }
......
...@@ -484,8 +484,6 @@ Dbtup::load_diskpage(Signal* signal, ...@@ -484,8 +484,6 @@ Dbtup::load_diskpage(Signal* signal,
req.m_callback.m_callbackFunction= req.m_callback.m_callbackFunction=
safe_cast(&Dbtup::disk_page_load_callback); safe_cast(&Dbtup::disk_page_load_callback);
// Make sure we maintain order
flags |= Page_cache_client::STRICT_ORDER;
if((res= m_pgman.get_page(signal, req, flags)) > 0) if((res= m_pgman.get_page(signal, req, flags)) > 0)
{ {
//ndbout_c("in cache"); //ndbout_c("in cache");
...@@ -563,8 +561,6 @@ Dbtup::load_diskpage_scan(Signal* signal, ...@@ -563,8 +561,6 @@ Dbtup::load_diskpage_scan(Signal* signal,
req.m_callback.m_callbackFunction= req.m_callback.m_callbackFunction=
safe_cast(&Dbtup::disk_page_load_scan_callback); safe_cast(&Dbtup::disk_page_load_scan_callback);
// Make sure we maintain order
flags |= Page_cache_client::STRICT_ORDER;
if((res= m_pgman.get_page(signal, req, flags)) > 0) if((res= m_pgman.get_page(signal, req, flags)) > 0)
{ {
// ndbout_c("in cache"); // ndbout_c("in cache");
...@@ -3111,8 +3107,7 @@ Dbtup::nr_delete(Signal* signal, Uint32 senderData, ...@@ -3111,8 +3107,7 @@ Dbtup::nr_delete(Signal* signal, Uint32 senderData,
preq.m_callback.m_callbackData = senderData; preq.m_callback.m_callbackData = senderData;
preq.m_callback.m_callbackFunction = preq.m_callback.m_callbackFunction =
safe_cast(&Dbtup::nr_delete_page_callback); safe_cast(&Dbtup::nr_delete_page_callback);
int flags = Page_cache_client::COMMIT_REQ | int flags = Page_cache_client::COMMIT_REQ;
Page_cache_client::STRICT_ORDER;
res = m_pgman.get_page(signal, preq, flags); res = m_pgman.get_page(signal, preq, flags);
if (res == 0) if (res == 0)
{ {
......
...@@ -694,10 +694,11 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -694,10 +694,11 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
frag.fragTableId, frag.fragTableId,
frag.fragmentId, frag.fragmentId,
frag.m_tablespace_id); frag.m_tablespace_id);
unsigned bits = ~(unsigned)0; unsigned uncommitted, committed;
int ret = tsman.get_page_free_bits(&key, &bits); uncommitted = committed = ~(unsigned)0;
int ret = tsman.get_page_free_bits(&key, &uncommitted, &committed);
ndbrequire(ret == 0); ndbrequire(ret == 0);
if (bits == 0) { if (committed == 0) {
// skip empty page // skip empty page
jam(); jam();
pos.m_get = ScanPos::Get_next_page_dd; pos.m_get = ScanPos::Get_next_page_dd;
...@@ -710,7 +711,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -710,7 +711,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
preq.m_callback.m_callbackData = scanPtr.i; preq.m_callback.m_callbackData = scanPtr.i;
preq.m_callback.m_callbackFunction = preq.m_callback.m_callbackFunction =
safe_cast(&Dbtup::disk_page_tup_scan_callback); safe_cast(&Dbtup::disk_page_tup_scan_callback);
int flags = Page_cache_client::STRICT_ORDER; int flags = 0;
int res = m_pgman.get_page(signal, preq, flags); int res = m_pgman.get_page(signal, preq, flags);
if (res == 0) { if (res == 0) {
jam(); jam();
......
...@@ -222,7 +222,7 @@ File_formats::Datafile::Extent_header::check_free(Uint32 extent_size) const ...@@ -222,7 +222,7 @@ File_formats::Datafile::Extent_header::check_free(Uint32 extent_size) const
for(; words; words--) for(; words; words--)
sum |= m_page_bitmask[words-1]; sum |= m_page_bitmask[words-1];
if(sum & 0x7777) if(sum & 0x3333)
return false; return false;
return true; return true;
......
...@@ -2887,6 +2887,9 @@ Lgman::stop_run_undo_log(Signal* signal) ...@@ -2887,6 +2887,9 @@ Lgman::stop_run_undo_log(Signal* signal)
void void
Lgman::execEND_LCP_CONF(Signal* signal) Lgman::execEND_LCP_CONF(Signal* signal)
{ {
Dbtup* tup= (Dbtup*)globalData.getBlock(DBTUP);
tup->disk_restart_undo(signal, 0, File_formats::Undofile::UNDO_END, 0, 0);
/** /**
* pgman has completed flushing all pages * pgman has completed flushing all pages
* *
......
This diff is collapsed.
...@@ -249,15 +249,13 @@ private: ...@@ -249,15 +249,13 @@ private:
struct Page_request { struct Page_request {
enum Flags { enum Flags {
OP_MASK = 0x000F // 4 bits for TUP operation OP_MASK = 0x000F // 4 bits for TUP operation
,STRICT_ORDER = 0x0010 // maintain request order
,LOCK_PAGE = 0x0020 // lock page in memory ,LOCK_PAGE = 0x0020 // lock page in memory
,EMPTY_PAGE = 0x0040 // empty (new) page ,EMPTY_PAGE = 0x0040 // empty (new) page
,ALLOC_REQ = 0x0080 // part of alloc ,ALLOC_REQ = 0x0080 // part of alloc
,COMMIT_REQ = 0x0100 // part of commit ,COMMIT_REQ = 0x0100 // part of commit
,DIRTY_REQ = 0x0200 // make page dirty wo/ update_lsn ,DIRTY_REQ = 0x0200 // make page dirty wo/ update_lsn
,NO_HOOK = 0x0400 // dont run load hook ,UNLOCK_PAGE = 0x0400
,UNLOCK_PAGE = 0x0800 ,CORR_REQ = 0x0800 // correlated request (no LIRS update)
,CORR_REQ = 0x1000 // correlated request (no LIRS update)
}; };
Uint16 m_block; Uint16 m_block;
...@@ -286,6 +284,8 @@ private: ...@@ -286,6 +284,8 @@ private:
Uint32 prevList; Uint32 prevList;
}; };
typedef Uint16 Page_state;
struct Page_entry : Page_entry_stack_ptr, struct Page_entry : Page_entry_stack_ptr,
Page_entry_queue_ptr, Page_entry_queue_ptr,
Page_entry_sublist_ptr { Page_entry_sublist_ptr {
...@@ -305,7 +305,7 @@ private: ...@@ -305,7 +305,7 @@ private:
,PAGEIN = 0x0100 // paging in ,PAGEIN = 0x0100 // paging in
,PAGEOUT = 0x0200 // paging out ,PAGEOUT = 0x0200 // paging out
,LOGSYNC = 0x0400 // undo WAL as part of pageout ,LOGSYNC = 0x0400 // undo WAL as part of pageout
,NO_HOOK = 0x0800 // don't run load hook ,COPY = 0x0800 // Copy page for LCP
,LCP = 0x1000 // page is LCP flushed ,LCP = 0x1000 // page is LCP flushed
,HOT = 0x2000 // page is hot ,HOT = 0x2000 // page is hot
,ONSTACK = 0x4000 // page is on LIRS stack ,ONSTACK = 0x4000 // page is on LIRS stack
...@@ -324,26 +324,26 @@ private: ...@@ -324,26 +324,26 @@ private:
,SUBLIST_COUNT = 8 ,SUBLIST_COUNT = 8
}; };
Uint16 m_state; // flags (0 for new entry) Uint16 m_file_no; // disk page address set at seize
Page_state m_state; // flags (0 for new entry)
Uint16 m_file_no; // disk page address set at seize
Uint32 m_page_no; Uint32 m_page_no;
Uint32 m_real_page_i; Uint32 m_real_page_i;
Uint32 m_copy_real_page_i; // used for flushing LOCKED pages
Uint64 m_lsn; Uint64 m_lsn;
Uint32 m_last_lcp;
Uint32 m_last_lcp;
Uint32 m_dirty_count;
Uint32 m_copy_page_i;
union { union {
Uint32 m_busy_count; // non-zero means BUSY Uint32 m_busy_count; // non-zero means BUSY
Uint32 nextPool; Uint32 nextPool;
}; };
DLFifoList<Page_request>::Head m_requests; DLFifoList<Page_request>::Head m_requests;
Uint32 nextHash; Uint32 nextHash;
Uint32 prevHash; Uint32 prevHash;
Uint32 hashValue() const { return m_file_no << 16 | m_page_no; } Uint32 hashValue() const { return m_file_no << 16 | m_page_no; }
bool equal(const Page_entry& obj) const { bool equal(const Page_entry& obj) const {
return return
...@@ -374,8 +374,6 @@ private: ...@@ -374,8 +374,6 @@ private:
Uint32 m_last_lcp_complete; Uint32 m_last_lcp_complete;
Uint32 m_lcp_curr_bucket; Uint32 m_lcp_curr_bucket;
Uint32 m_lcp_outstanding; // remaining i/o waits Uint32 m_lcp_outstanding; // remaining i/o waits
Uint32 m_lcp_copy_page;
bool m_lcp_copy_page_free;
EndLcpReq m_end_lcp_req; EndLcpReq m_end_lcp_req;
// clean-up variables // clean-up variables
...@@ -421,9 +419,10 @@ protected: ...@@ -421,9 +419,10 @@ protected:
void execREAD_CONFIG_REQ(Signal* signal); void execREAD_CONFIG_REQ(Signal* signal);
void execCONTINUEB(Signal* signal); void execCONTINUEB(Signal* signal);
void execLCP_PREPARE_REQ(Signal* signal);
void execLCP_FRAG_ORD(Signal*); void execLCP_FRAG_ORD(Signal*);
void execEND_LCP_REQ(Signal*); void execEND_LCP_REQ(Signal*);
void execFSREADCONF(Signal*); void execFSREADCONF(Signal*);
void execFSREADREF(Signal*); void execFSREADREF(Signal*);
void execFSWRITECONF(Signal*); void execFSWRITECONF(Signal*);
...@@ -432,8 +431,8 @@ protected: ...@@ -432,8 +431,8 @@ protected:
void execDUMP_STATE_ORD(Signal* signal); void execDUMP_STATE_ORD(Signal* signal);
private: private:
static Uint32 get_sublist_no(Uint16 state); static Uint32 get_sublist_no(Page_state state);
void set_page_state(Ptr<Page_entry> ptr, Uint16 new_state); void set_page_state(Ptr<Page_entry> ptr, Page_state new_state);
bool seize_cache_page(Ptr<GlobalPage>& gptr); bool seize_cache_page(Ptr<GlobalPage>& gptr);
void release_cache_page(Uint32 i); void release_cache_page(Uint32 i);
...@@ -463,7 +462,10 @@ private: ...@@ -463,7 +462,10 @@ private:
void move_cleanup_ptr(Ptr<Page_entry> ptr); void move_cleanup_ptr(Ptr<Page_entry> ptr);
bool process_lcp(Signal*); bool process_lcp(Signal*);
void process_lcp_prepare(Signal* signal, Ptr<Page_entry> ptr);
int create_copy_page(Ptr<Page_entry>, Uint32 req_flags);
void restore_copy_page(Ptr<Page_entry>);
void pagein(Signal*, Ptr<Page_entry>); void pagein(Signal*, Ptr<Page_entry>);
void fsreadreq(Signal*, Ptr<Page_entry>); void fsreadreq(Signal*, Ptr<Page_entry>);
void fsreadconf(Signal*, Ptr<Page_entry>); void fsreadconf(Signal*, Ptr<Page_entry>);
...@@ -510,13 +512,11 @@ public: ...@@ -510,13 +512,11 @@ public:
Ptr<GlobalPage> m_ptr; // TODO remove Ptr<GlobalPage> m_ptr; // TODO remove
enum RequestFlags { enum RequestFlags {
STRICT_ORDER = Pgman::Page_request::STRICT_ORDER LOCK_PAGE = Pgman::Page_request::LOCK_PAGE
,LOCK_PAGE = Pgman::Page_request::LOCK_PAGE
,EMPTY_PAGE = Pgman::Page_request::EMPTY_PAGE ,EMPTY_PAGE = Pgman::Page_request::EMPTY_PAGE
,ALLOC_REQ = Pgman::Page_request::ALLOC_REQ ,ALLOC_REQ = Pgman::Page_request::ALLOC_REQ
,COMMIT_REQ = Pgman::Page_request::COMMIT_REQ ,COMMIT_REQ = Pgman::Page_request::COMMIT_REQ
,DIRTY_REQ = Pgman::Page_request::DIRTY_REQ ,DIRTY_REQ = Pgman::Page_request::DIRTY_REQ
,NO_HOOK = Pgman::Page_request::NO_HOOK
,UNLOCK_PAGE = Pgman::Page_request::UNLOCK_PAGE ,UNLOCK_PAGE = Pgman::Page_request::UNLOCK_PAGE
,CORR_REQ = Pgman::Page_request::CORR_REQ ,CORR_REQ = Pgman::Page_request::CORR_REQ
}; };
......
This diff is collapsed.
...@@ -195,11 +195,13 @@ private: ...@@ -195,11 +195,13 @@ private:
void load_extent_page_callback(Signal*, Uint32, Uint32); void load_extent_page_callback(Signal*, Uint32, Uint32);
void create_file_ref(Signal*, Ptr<Tablespace>, Ptr<Datafile>, void create_file_ref(Signal*, Ptr<Tablespace>, Ptr<Datafile>,
Uint32,Uint32,Uint32); Uint32,Uint32,Uint32);
int update_page_free_bits(Signal*, Local_key*, unsigned bits, Uint64 lsn); int update_page_free_bits(Signal*, Local_key*, unsigned committed_bits,
int get_page_free_bits(Signal*, Local_key*, unsigned* bits); Uint64 lsn);
int unmap_page(Signal*, Local_key*); int get_page_free_bits(Signal*, Local_key*, unsigned*, unsigned*);
int restart_undo_page_free_bits(Signal*, Local_key*, unsigned, Uint64); int unmap_page(Signal*, Local_key*, unsigned uncommitted_bits);
int restart_undo_page_free_bits(Signal*, Uint32, Uint32, Local_key*,
unsigned committed_bits, Uint64 lsn);
int alloc_extent(Signal* signal, Uint32 tablespace, Local_key* key); int alloc_extent(Signal* signal, Uint32 tablespace, Local_key* key);
int alloc_page_from_extent(Signal*, Uint32, Local_key*, Uint32 bits); int alloc_page_from_extent(Signal*, Uint32, Local_key*, Uint32 bits);
...@@ -266,22 +268,23 @@ public: ...@@ -266,22 +268,23 @@ public:
* Update page free bits * Update page free bits
*/ */
int update_page_free_bits(Local_key*, unsigned bits, Uint64 lsn); int update_page_free_bits(Local_key*, unsigned bits, Uint64 lsn);
/** /**
* Get page free bits * Get page free bits
*/ */
int get_page_free_bits(Local_key*, unsigned* bits); int get_page_free_bits(Local_key*,
unsigned* uncommitted, unsigned* committed);
/** /**
* Update unlogged page free bit * Update unlogged page free bit
*/ */
int unmap_page(Local_key*); int unmap_page(Local_key*, Uint32 bits);
/** /**
* Undo handling of page bits * Undo handling of page bits
*/ */
int restart_undo_page_free_bits(Local_key*, unsigned bits, Uint64 lsn); int restart_undo_page_free_bits(Local_key*, unsigned bits, Uint64 lsn);
/** /**
* Get tablespace info * Get tablespace info
* *
...@@ -353,32 +356,40 @@ Tablespace_client::free_extent(Local_key* key) ...@@ -353,32 +356,40 @@ Tablespace_client::free_extent(Local_key* key)
inline inline
int int
Tablespace_client::update_page_free_bits(Local_key *key, Tablespace_client::update_page_free_bits(Local_key *key,
unsigned bits, Uint64 lsn) unsigned committed_bits,
Uint64 lsn)
{ {
return m_tsman->update_page_free_bits(m_signal, key, bits, lsn); return m_tsman->update_page_free_bits(m_signal, key, committed_bits, lsn);
} }
inline inline
int int
Tablespace_client::get_page_free_bits(Local_key *key, unsigned* bits) Tablespace_client::get_page_free_bits(Local_key *key,
unsigned* uncommited,
unsigned* commited)
{ {
return m_tsman->get_page_free_bits(m_signal, key, bits); return m_tsman->get_page_free_bits(m_signal, key, uncommited, commited);
} }
inline inline
int int
Tablespace_client::unmap_page(Local_key *key) Tablespace_client::unmap_page(Local_key *key, unsigned uncommitted_bits)
{ {
return m_tsman->unmap_page(m_signal, key); return m_tsman->unmap_page(m_signal, key, uncommitted_bits);
} }
inline inline
int int
Tablespace_client::restart_undo_page_free_bits(Local_key* key, Tablespace_client::restart_undo_page_free_bits(Local_key* key,
unsigned bits, Uint64 lsn) unsigned committed_bits,
Uint64 lsn)
{ {
return m_tsman->restart_undo_page_free_bits(m_signal, return m_tsman->restart_undo_page_free_bits(m_signal,
key, bits, lsn); m_table_id,
m_fragment_id,
key,
committed_bits,
lsn);
} }
......
...@@ -51,6 +51,13 @@ public: ...@@ -51,6 +51,13 @@ public:
*/ */
bool seize(Ptr<T> &); bool seize(Ptr<T> &);
/**
* Allocate an object from pool - update Ptr - put in front of list
*
* Return i
*/
bool seizeFront(Ptr<T> &);
/** /**
* Allocate object <b>i</b> from pool - update Ptr * Allocate object <b>i</b> from pool - update Ptr
* *
...@@ -248,6 +255,32 @@ DLFifoList<T,U>::seize(Ptr<T> & p){ ...@@ -248,6 +255,32 @@ DLFifoList<T,U>::seize(Ptr<T> & p){
return false; return false;
} }
template <class T, class U>
inline
bool
DLFifoList<T,U>::seizeFront(Ptr<T> & p){
Uint32 ff = head.firstItem;
thePool.seize(p);
if (p.i != RNIL)
{
p.p->U::prevList = RNIL;
p.p->U::nextList = ff;
head.firstItem = p.i;
if (ff == RNIL)
{
head.lastItem = p.i;
}
else
{
T * t2 = thePool.getPtr(ff);
t2->U::prevList = p.i;
}
return true;
}
p.p = NULL;
return false;
}
/** /**
* Allocate an object from pool - update Ptr * Allocate an object from pool - update Ptr
* *
......
...@@ -30,6 +30,7 @@ int main(int argc, const char** argv){ ...@@ -30,6 +30,7 @@ int main(int argc, const char** argv){
int _help = 0; int _help = 0;
int _batch = 512; int _batch = 512;
int _loops = -1; int _loops = -1;
int _rand = 0;
const char* db = 0; const char* db = 0;
struct getargs args[] = { struct getargs args[] = {
...@@ -37,7 +38,8 @@ int main(int argc, const char** argv){ ...@@ -37,7 +38,8 @@ int main(int argc, const char** argv){
{ "batch", 'b', arg_integer, &_batch, "Number of operations in each transaction", "batch" }, { "batch", 'b', arg_integer, &_batch, "Number of operations in each transaction", "batch" },
{ "loops", 'l', arg_integer, &_loops, "Number of loops", "" }, { "loops", 'l', arg_integer, &_loops, "Number of loops", "" },
{ "database", 'd', arg_string, &db, "Database", "" }, { "database", 'd', arg_string, &db, "Database", "" },
{ "usage", '?', arg_flag, &_help, "Print help", "" } { "usage", '?', arg_flag, &_help, "Print help", "" },
{ "rnd-rows", 0, arg_flag, &_rand, "Rand number of records", "recs" }
}; };
int num_args = sizeof(args) / sizeof(args[0]); int num_args = sizeof(args) / sizeof(args[0]);
int optind = 0; int optind = 0;
...@@ -89,8 +91,9 @@ int main(int argc, const char** argv){ ...@@ -89,8 +91,9 @@ int main(int argc, const char** argv){
HugoTransactions hugoTrans(*pTab); HugoTransactions hugoTrans(*pTab);
loop: loop:
int rows = (_rand ? rand() % _records : _records);
if (hugoTrans.loadTable(&MyNdb, if (hugoTrans.loadTable(&MyNdb,
_records, rows,
_batch, _batch,
true, 0, false, _loops) != 0){ true, 0, false, _loops) != 0){
return NDBT_ProgramExit(NDBT_FAILED); return NDBT_ProgramExit(NDBT_FAILED);
...@@ -98,6 +101,7 @@ int main(int argc, const char** argv){ ...@@ -98,6 +101,7 @@ int main(int argc, const char** argv){
if(_loops > 0) if(_loops > 0)
{ {
ndbout << "clearing..." << endl;
hugoTrans.clearTable(&MyNdb); hugoTrans.clearTable(&MyNdb);
//hugoTrans.pkDelRecords(&MyNdb, _records); //hugoTrans.pkDelRecords(&MyNdb, _records);
_loops--; _loops--;
......
...@@ -130,7 +130,8 @@ int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab, int parallelism) ...@@ -130,7 +130,8 @@ int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab, int parallelism)
goto failed; goto failed;
} }
if( pOp->readTuples(NdbOperation::LM_Exclusive,par) ) { if( pOp->readTuples(NdbOperation::LM_Exclusive,
NdbScanOperation::SF_TupScan, par) ) {
goto failed; goto failed;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment