Commit 0935ddcb authored by unknown's avatar unknown

ndb - tux scan: use lock pool for better perf and mem usage


ndb/src/kernel/blocks/dbtux/Dbtux.hpp:
  use pool of lock ops (like tupscan) instead of array of 992
ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp:
  use pool of lock ops (like tupscan) instead of array of 992
ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp:
  use pool of lock ops (like tupscan) instead of array of 992
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp:
  use pool of lock ops (like tupscan) instead of array of 992
ndb/src/kernel/vm/DLFifoList.hpp:
  add Head::isEmpty
parent 76dd9aa2
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include <AttributeHeader.hpp> #include <AttributeHeader.hpp>
#include <ArrayPool.hpp> #include <ArrayPool.hpp>
#include <DataBuffer.hpp> #include <DataBuffer.hpp>
#include <DLFifoList.hpp>
#include <md5_hash.hpp> #include <md5_hash.hpp>
// big brother // big brother
...@@ -334,6 +335,18 @@ private: ...@@ -334,6 +335,18 @@ private:
typedef DataBuffer<ScanBoundSegmentSize>::DataBufferPool ScanBoundPool; typedef DataBuffer<ScanBoundSegmentSize>::DataBufferPool ScanBoundPool;
ScanBoundPool c_scanBoundPool; ScanBoundPool c_scanBoundPool;
// ScanLock
struct ScanLock {
Uint32 m_accLockOp;
union {
Uint32 nextPool;
Uint32 nextList;
};
Uint32 prevList;
};
typedef Ptr<ScanLock> ScanLockPtr;
ArrayPool<ScanLock> c_scanLockPool;
/* /*
* Scan operation. * Scan operation.
* *
...@@ -379,6 +392,8 @@ private: ...@@ -379,6 +392,8 @@ private:
Uint32 m_savePointId; Uint32 m_savePointId;
// lock waited for or obtained and not yet passed to LQH // lock waited for or obtained and not yet passed to LQH
Uint32 m_accLockOp; Uint32 m_accLockOp;
// locks obtained and passed to LQH but not yet returned by LQH
DLFifoList<ScanLock>::Head m_accLockOps;
Uint8 m_readCommitted; // no locking Uint8 m_readCommitted; // no locking
Uint8 m_lockMode; Uint8 m_lockMode;
Uint8 m_descending; Uint8 m_descending;
...@@ -394,13 +409,6 @@ private: ...@@ -394,13 +409,6 @@ private:
Uint32 nextList; Uint32 nextList;
}; };
Uint32 prevList; Uint32 prevList;
/*
* Locks obtained and passed to LQH but not yet returned by LQH.
* The max was increased from 16 to 992 (default 64). Record max
* ever used in this scan. TODO fix quadratic behaviour
*/
Uint32 m_maxAccLockOps;
Uint32 m_accLockOps[MaxAccLockOps];
ScanOp(ScanBoundPool& scanBoundPool); ScanOp(ScanBoundPool& scanBoundPool);
}; };
typedef Ptr<ScanOp> ScanOpPtr; typedef Ptr<ScanOp> ScanOpPtr;
...@@ -620,8 +628,9 @@ private: ...@@ -620,8 +628,9 @@ private:
bool scanCheck(ScanOpPtr scanPtr, TreeEnt ent); bool scanCheck(ScanOpPtr scanPtr, TreeEnt ent);
bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent); bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent);
void scanClose(Signal* signal, ScanOpPtr scanPtr); void scanClose(Signal* signal, ScanOpPtr scanPtr);
void addAccLockOp(ScanOp& scan, Uint32 accLockOp); void abortAccLockOps(Signal* signal, ScanOpPtr scanPtr);
void removeAccLockOp(ScanOp& scan, Uint32 accLockOp); void addAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp);
void removeAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp);
void releaseScanOp(ScanOpPtr& scanPtr); void releaseScanOp(ScanOpPtr& scanPtr);
/* /*
...@@ -674,7 +683,8 @@ private: ...@@ -674,7 +683,8 @@ private:
DebugMeta = 1, // log create and drop index DebugMeta = 1, // log create and drop index
DebugMaint = 2, // log maintenance ops DebugMaint = 2, // log maintenance ops
DebugTree = 4, // log and check tree after each op DebugTree = 4, // log and check tree after each op
DebugScan = 8 // log scans DebugScan = 8, // log scans
DebugLock = 16 // log ACC locks
}; };
STATIC_CONST( DataFillByte = 0xa2 ); STATIC_CONST( DataFillByte = 0xa2 );
STATIC_CONST( NodeFillByte = 0xa4 ); STATIC_CONST( NodeFillByte = 0xa4 );
...@@ -938,6 +948,7 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) : ...@@ -938,6 +948,7 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) :
m_transId2(0), m_transId2(0),
m_savePointId(0), m_savePointId(0),
m_accLockOp(RNIL), m_accLockOp(RNIL),
m_accLockOps(),
m_readCommitted(0), m_readCommitted(0),
m_lockMode(0), m_lockMode(0),
m_descending(0), m_descending(0),
...@@ -945,18 +956,12 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) : ...@@ -945,18 +956,12 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) :
m_boundMax(scanBoundPool), m_boundMax(scanBoundPool),
m_scanPos(), m_scanPos(),
m_scanEnt(), m_scanEnt(),
m_nodeScan(RNIL), m_nodeScan(RNIL)
m_maxAccLockOps(0)
{ {
m_bound[0] = &m_boundMin; m_bound[0] = &m_boundMin;
m_bound[1] = &m_boundMax; m_bound[1] = &m_boundMax;
m_boundCnt[0] = 0; m_boundCnt[0] = 0;
m_boundCnt[1] = 0; m_boundCnt[1] = 0;
#ifdef VM_TRACE
for (unsigned i = 0; i < MaxAccLockOps; i++) {
m_accLockOps[i] = 0x1f1f1f1f;
}
#endif
} }
// Dbtux::Index // Dbtux::Index
......
...@@ -330,6 +330,7 @@ operator<<(NdbOut& out, const Dbtux::DescAttr& descAttr) ...@@ -330,6 +330,7 @@ operator<<(NdbOut& out, const Dbtux::DescAttr& descAttr)
NdbOut& NdbOut&
operator<<(NdbOut& out, const Dbtux::ScanOp& scan) operator<<(NdbOut& out, const Dbtux::ScanOp& scan)
{ {
Dbtux* tux = (Dbtux*)globalData.getBlock(DBTUX);
out << "[ScanOp " << hex << &scan; out << "[ScanOp " << hex << &scan;
out << " [state " << dec << scan.m_state << "]"; out << " [state " << dec << scan.m_state << "]";
out << " [lockwait " << dec << scan.m_lockwait << "]"; out << " [lockwait " << dec << scan.m_lockwait << "]";
...@@ -339,9 +340,15 @@ operator<<(NdbOut& out, const Dbtux::ScanOp& scan) ...@@ -339,9 +340,15 @@ operator<<(NdbOut& out, const Dbtux::ScanOp& scan)
out << " [savePointId " << dec << scan.m_savePointId << "]"; out << " [savePointId " << dec << scan.m_savePointId << "]";
out << " [accLockOp " << hex << scan.m_accLockOp << "]"; out << " [accLockOp " << hex << scan.m_accLockOp << "]";
out << " [accLockOps"; out << " [accLockOps";
for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) { {
if (scan.m_accLockOps[i] != RNIL) DLFifoList<Dbtux::ScanLock>::Head head = scan.m_accLockOps;
out << " " << hex << scan.m_accLockOps[i]; LocalDLFifoList<Dbtux::ScanLock> list(tux->c_scanLockPool, head);
Dbtux::ScanLockPtr lockPtr;
list.first(lockPtr);
while (lockPtr.i != RNIL) {
out << " " << hex << lockPtr.p->m_accLockOp;
list.next(lockPtr);
}
} }
out << "]"; out << "]";
out << " [readCommitted " << dec << scan.m_readCommitted << "]"; out << " [readCommitted " << dec << scan.m_readCommitted << "]";
...@@ -367,13 +374,12 @@ operator<<(NdbOut& out, const Dbtux::ScanOp& scan) ...@@ -367,13 +374,12 @@ operator<<(NdbOut& out, const Dbtux::ScanOp& scan)
NdbOut& NdbOut&
operator<<(NdbOut& out, const Dbtux::Index& index) operator<<(NdbOut& out, const Dbtux::Index& index)
{ {
Dbtux* tux = (Dbtux*)globalData.getBlock(DBTUX);
out << "[Index " << hex << &index; out << "[Index " << hex << &index;
out << " [tableId " << dec << index.m_tableId << "]"; out << " [tableId " << dec << index.m_tableId << "]";
out << " [numFrags " << dec << index.m_numFrags << "]"; out << " [numFrags " << dec << index.m_numFrags << "]";
for (unsigned i = 0; i < index.m_numFrags; i++) { for (unsigned i = 0; i < index.m_numFrags; i++) {
out << " [frag " << dec << i << " "; out << " [frag " << dec << i << " ";
// dangerous and wrong
Dbtux* tux = (Dbtux*)globalData.getBlock(DBTUX);
const Dbtux::Frag& frag = *tux->c_fragPool.getPtr(index.m_fragPtrI[i]); const Dbtux::Frag& frag = *tux->c_fragPool.getPtr(index.m_fragPtrI[i]);
out << frag; out << frag;
out << "]"; out << "]";
......
...@@ -159,6 +159,7 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) ...@@ -159,6 +159,7 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)
Uint32 nFragment; Uint32 nFragment;
Uint32 nAttribute; Uint32 nAttribute;
Uint32 nScanOp; Uint32 nScanOp;
Uint32 nScanBatch;
const ndb_mgm_configuration_iterator * p = const ndb_mgm_configuration_iterator * p =
theConfiguration.getOwnConfigIterator(); theConfiguration.getOwnConfigIterator();
...@@ -168,9 +169,11 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) ...@@ -168,9 +169,11 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_FRAGMENT, &nFragment)); ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_FRAGMENT, &nFragment));
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_ATTRIBUTE, &nAttribute)); ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_ATTRIBUTE, &nAttribute));
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_SCAN_OP, &nScanOp)); ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_SCAN_OP, &nScanOp));
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_BATCH_SIZE, &nScanBatch));
const Uint32 nDescPage = (nIndex * DescHeadSize + nAttribute * DescAttrSize + DescPageSize - 1) / DescPageSize; const Uint32 nDescPage = (nIndex * DescHeadSize + nAttribute * DescAttrSize + DescPageSize - 1) / DescPageSize;
const Uint32 nScanBoundWords = nScanOp * ScanBoundSegmentSize * 4; const Uint32 nScanBoundWords = nScanOp * ScanBoundSegmentSize * 4;
const Uint32 nScanLock = nScanOp * nScanBatch;
c_indexPool.setSize(nIndex); c_indexPool.setSize(nIndex);
c_fragPool.setSize(nFragment); c_fragPool.setSize(nFragment);
...@@ -178,6 +181,7 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) ...@@ -178,6 +181,7 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)
c_fragOpPool.setSize(MaxIndexFragments); c_fragOpPool.setSize(MaxIndexFragments);
c_scanOpPool.setSize(nScanOp); c_scanOpPool.setSize(nScanOp);
c_scanBoundPool.setSize(nScanBoundWords); c_scanBoundPool.setSize(nScanBoundWords);
c_scanLockPool.setSize(nScanLock);
/* /*
* Index id is physical array index. We seize and initialize all * Index id is physical array index. We seize and initialize all
* index records now. * index records now.
......
...@@ -312,7 +312,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) ...@@ -312,7 +312,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry(); jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success); ndbrequire(lockReq->returnCode == AccLockReq::Success);
removeAccLockOp(scan, req->accOperationPtr); removeAccLockOp(scanPtr, req->accOperationPtr);
} }
if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) { if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) {
jam(); jam();
...@@ -466,7 +466,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -466,7 +466,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
scan.m_state = ScanOp::Locked; scan.m_state = ScanOp::Locked;
scan.m_accLockOp = lockReq->accOpPtr; scan.m_accLockOp = lockReq->accOpPtr;
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugScan) { if (debugFlags & (DebugScan | DebugLock)) {
debugOut << "Lock immediate scan " << scanPtr.i << " " << scan << endl; debugOut << "Lock immediate scan " << scanPtr.i << " " << scan << endl;
} }
#endif #endif
...@@ -478,7 +478,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -478,7 +478,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
scan.m_lockwait = true; scan.m_lockwait = true;
scan.m_accLockOp = lockReq->accOpPtr; scan.m_accLockOp = lockReq->accOpPtr;
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugScan) { if (debugFlags & (DebugScan | DebugLock)) {
debugOut << "Lock wait scan " << scanPtr.i << " " << scan << endl; debugOut << "Lock wait scan " << scanPtr.i << " " << scan << endl;
} }
#endif #endif
...@@ -534,7 +534,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -534,7 +534,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (accLockOp != RNIL) { if (accLockOp != RNIL) {
scan.m_accLockOp = RNIL; scan.m_accLockOp = RNIL;
// remember it until LQH unlocks it // remember it until LQH unlocks it
addAccLockOp(scan, accLockOp); addAccLockOp(scanPtr, accLockOp);
} else { } else {
ndbrequire(scan.m_readCommitted); ndbrequire(scan.m_readCommitted);
// operation RNIL in LQH would signal no tuple returned // operation RNIL in LQH would signal no tuple returned
...@@ -589,7 +589,7 @@ Dbtux::execACCKEYCONF(Signal* signal) ...@@ -589,7 +589,7 @@ Dbtux::execACCKEYCONF(Signal* signal)
c_scanOpPool.getPtr(scanPtr); c_scanOpPool.getPtr(scanPtr);
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugScan) { if (debugFlags & (DebugScan | DebugLock)) {
debugOut << "Lock obtained scan " << scanPtr.i << " " << scan << endl; debugOut << "Lock obtained scan " << scanPtr.i << " " << scan << endl;
} }
#endif #endif
...@@ -634,7 +634,7 @@ Dbtux::execACCKEYREF(Signal* signal) ...@@ -634,7 +634,7 @@ Dbtux::execACCKEYREF(Signal* signal)
c_scanOpPool.getPtr(scanPtr); c_scanOpPool.getPtr(scanPtr);
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugScan) { if (debugFlags & (DebugScan | DebugLock)) {
debugOut << "Lock refused scan " << scanPtr.i << " " << scan << endl; debugOut << "Lock refused scan " << scanPtr.i << " " << scan << endl;
} }
#endif #endif
...@@ -678,7 +678,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal) ...@@ -678,7 +678,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal)
c_scanOpPool.getPtr(scanPtr); c_scanOpPool.getPtr(scanPtr);
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugScan) { if (debugFlags & (DebugScan | DebugLock)) {
debugOut << "ACC_ABORTCONF scan " << scanPtr.i << " " << scan << endl; debugOut << "ACC_ABORTCONF scan " << scanPtr.i << " " << scan << endl;
} }
#endif #endif
...@@ -1014,18 +1014,9 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr) ...@@ -1014,18 +1014,9 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL); ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL);
// unlock all not unlocked by LQH // unlock all not unlocked by LQH
for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) { if (! scan.m_accLockOps.isEmpty()) {
if (scan.m_accLockOps[i] != RNIL) {
jam(); jam();
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); abortAccLockOps(signal, scanPtr);
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = scan.m_accLockOps[i];
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_accLockOps[i] = RNIL;
}
} }
// send conf // send conf
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
...@@ -1039,44 +1030,76 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr) ...@@ -1039,44 +1030,76 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
} }
void void
Dbtux::addAccLockOp(ScanOp& scan, Uint32 accLockOp) Dbtux::abortAccLockOps(Signal* signal, ScanOpPtr scanPtr)
{ {
ndbrequire(accLockOp != RNIL); ScanOp& scan = *scanPtr.p;
Uint32* list = scan.m_accLockOps; #ifdef VM_TRACE
bool ok = false; if (debugFlags & (DebugScan | DebugLock)) {
for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) { debugOut << "Abort locks in scan " << scanPtr.i << " " << scan << endl;
ndbrequire(list[i] != accLockOp);
if (! ok && list[i] == RNIL) {
list[i] = accLockOp;
ok = true;
// continue check for duplicates
} }
#endif
LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
ScanLockPtr lockPtr;
while (list.first(lockPtr)) {
jam();
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = lockPtr.p->m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
list.release(lockPtr);
} }
if (! ok) { }
unsigned i = scan.m_maxAccLockOps;
if (i < MaxAccLockOps) { void
list[i] = accLockOp; Dbtux::addAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp)
ok = true; {
scan.m_maxAccLockOps = i + 1; ScanOp& scan = *scanPtr.p;
#ifdef VM_TRACE
if (debugFlags & (DebugScan | DebugLock)) {
debugOut << "Add lock " << hex << accLockOp << dec
<< " to scan " << scanPtr.i << " " << scan << endl;
} }
#endif
LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
ScanLockPtr lockPtr;
#ifdef VM_TRACE
list.first(lockPtr);
while (lockPtr.i != RNIL) {
ndbrequire(lockPtr.p->m_accLockOp != accLockOp);
list.next(lockPtr);
} }
#endif
bool ok = list.seize(lockPtr);
ndbrequire(ok); ndbrequire(ok);
ndbrequire(accLockOp != RNIL);
lockPtr.p->m_accLockOp = accLockOp;
} }
void void
Dbtux::removeAccLockOp(ScanOp& scan, Uint32 accLockOp) Dbtux::removeAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp)
{ {
ndbrequire(accLockOp != RNIL); ScanOp& scan = *scanPtr.p;
Uint32* list = scan.m_accLockOps; #ifdef VM_TRACE
bool ok = false; if (debugFlags & (DebugScan | DebugLock)) {
for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) { debugOut << "Remove lock " << hex << accLockOp << dec
if (list[i] == accLockOp) { << " from scan " << scanPtr.i << " " << scan << endl;
list[i] = RNIL; }
ok = true; #endif
LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
ScanLockPtr lockPtr;
list.first(lockPtr);
while (lockPtr.i != RNIL) {
if (lockPtr.p->m_accLockOp == accLockOp) {
jam();
break; break;
} }
list.next(lockPtr);
} }
ndbrequire(ok); ndbrequire(lockPtr.i != RNIL);
list.release(lockPtr);
} }
/* /*
......
...@@ -34,6 +34,8 @@ public: ...@@ -34,6 +34,8 @@ public:
Head(); Head();
Uint32 firstItem; Uint32 firstItem;
Uint32 lastItem; Uint32 lastItem;
inline bool isEmpty() const { return firstItem == RNIL;}
}; };
DLFifoList(ArrayPool<T> & thePool); DLFifoList(ArrayPool<T> & thePool);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment