Commit 5fd214dc authored by pekka@mysql.com's avatar pekka@mysql.com

tux optim 3 - use TUP methods instead of TUP_STORE_TH signal

parent fd8acbc2
......@@ -130,7 +130,7 @@ private:
* Operate on entire tuple. Used by TUX where the table has a single
* Uint32 array attribute representing an index tree node.
*
* XXX this signal will be replaced by method in TUP
* XXX this signal is no longer used by TUX and can be removed
*/
class TupStoreTh {
friend class Dbtup;
......@@ -155,8 +155,8 @@ private:
Uint32 tableId;
Uint32 fragId;
Uint32 fragPtrI;
Uint32 tupAddr; // no longer used
Uint32 tupVersion; // no longer used
Uint32 tupAddr;
Uint32 tupVersion;
Uint32 pageId;
Uint32 pageOffset;
Uint32 bufferId;
......
......@@ -996,6 +996,14 @@ public:
Dbtup(const class Configuration &);
virtual ~Dbtup();
/*
* TUX index in TUP has single Uint32 array attribute which stores an
* index node. TUX uses following methods.
*/
int tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pageOffset, Uint32*& node);
void tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node);
void tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node);
private:
BLOCK_DEFINES(Dbtup);
......
......@@ -179,6 +179,64 @@ Dbtup::execTUP_QUERY_TH(Signal* signal)
return;
}
int
Dbtup::tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pageOffset, Uint32*& node)
{
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
TablerecPtr tablePtr;
tablePtr.i = fragPtr.p->fragTableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
PagePtr pagePtr;
terrorCode = 0;
if (! allocTh(fragPtr.p, tablePtr.p, NORMAL_PAGE, signal, pageOffset, pagePtr)) {
jam();
ndbrequire(terrorCode != 0);
return terrorCode;
}
pageId = pagePtr.i;
Uint32 attrDescIndex = tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
Uint32 attrDataOffset = AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr);
node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset;
return 0;
}
void
Dbtup::tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node)
{
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
TablerecPtr tablePtr;
tablePtr.i = fragPtr.p->fragTableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
PagePtr pagePtr;
pagePtr.i = pageId;
ptrCheckGuard(pagePtr, cnoOfPage, page);
Uint32 attrDescIndex = tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
Uint32 attrDataOffset = AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr);
ndbrequire(node == &pagePtr.p->pageWord[pageOffset] + attrDataOffset);
freeTh(fragPtr.p, tablePtr.p, signal, pagePtr.p, pageOffset);
}
void
Dbtup::tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node)
{
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
TablerecPtr tablePtr;
tablePtr.i = fragPtr.p->fragTableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
PagePtr pagePtr;
pagePtr.i = pageId;
ptrCheckGuard(pagePtr, cnoOfPage, page);
Uint32 attrDescIndex = tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
Uint32 attrDataOffset = AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr);
node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset;
}
void
Dbtup::execTUP_STORE_TH(Signal* signal)
{
......
......@@ -25,6 +25,9 @@
#include <DataBuffer.hpp>
#include <md5_hash.hpp>
// big brother
#include <Dbtup.hpp>
// signal classes
#include <signaldata/DictTabInfo.hpp>
#include <signaldata/TuxContinueB.hpp>
......@@ -92,6 +95,9 @@ public:
Dbtux(const Configuration& conf);
virtual ~Dbtux();
// pointer to TUP instance in this thread
Dbtup* c_tup;
private:
// sizes are in words (Uint32)
static const unsigned MaxIndexFragments = 2 * NO_OF_FRAG_PER_NODE;
......
......@@ -21,6 +21,7 @@
Dbtux::Dbtux(const Configuration& conf) :
SimulatedBlock(DBTUX, conf),
c_tup(0),
c_descPageList(RNIL),
#ifdef VM_TRACE
debugFile(0),
......@@ -123,6 +124,8 @@ Dbtux::execSTTOR(Signal* signal)
case 1:
jam();
CLEAR_ERROR_INSERT_VALUE;
c_tup = (Dbtup*)globalData.getBlock(DBTUP);
ndbrequire(c_tup != 0);
break;
case 3:
jam();
......@@ -180,7 +183,7 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)
c_scanBoundPool.setSize(nScanBoundWords);
/*
* Index id is physical array index. We seize and initialize all
* index records now. This assumes ArrayPool is an array.
* index records now.
*/
IndexPtr indexPtr;
while (1) {
......
......@@ -20,12 +20,7 @@
/*
* Node handles.
*
* We use the "cache" implementation. Node operations are done on
* cached copies. Index memory is updated at the end of the operation.
* At most one node is inserted and it is always pre-allocated.
*
* An alternative "pointer" implementation which writes directly into
* index memory is planned for later.
* Temporary version between "cache" and "pointer" implementations.
*/
// Dbtux
......@@ -40,17 +35,6 @@ Dbtux::seizeNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr)
new (nodePtr.p) NodeHandle(*this, frag);
nodePtr.p->m_next = frag.m_nodeList;
frag.m_nodeList = nodePtr.i;
// node cache used always
nodePtr.p->m_node = (TreeNode*)nodePtr.p->m_cache;
new (nodePtr.p->m_node) TreeNode();
#ifdef VM_TRACE
TreeHead& tree = frag.m_tree;
TreeNode* node = nodePtr.p->m_node;
memset(tree.getPref(node, 0), 0xa2, tree.m_prefSize << 2);
memset(tree.getPref(node, 1), 0xa2, tree.m_prefSize << 2);
TreeEnt* entList = tree.getEntList(node);
memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
#endif
}
void
......@@ -63,17 +47,29 @@ Dbtux::preallocNode(Signal* signal, Frag& frag, Uint32& errorCode)
// remove from cache XXX ugly
frag.m_nodeFree = frag.m_nodeList;
frag.m_nodeList = nodePtr.p->m_next;
StorePar storePar;
storePar.m_opCode = TupStoreTh::OpInsert;
storePar.m_offset = 0;
storePar.m_size = 0;
tupStoreTh(signal, frag, nodePtr, storePar);
if (storePar.m_errorCode != 0) {
// alloc index node in TUP
Uint32 pageId = NullTupLoc.m_pageId;
Uint32 pageOffset = NullTupLoc.m_pageOffset;
Uint32* node32 = 0;
errorCode = c_tup->tuxAllocNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
if (errorCode != 0) {
jam();
errorCode = storePar.m_errorCode;
c_nodeHandlePool.release(nodePtr);
frag.m_nodeFree = RNIL;
return;
}
nodePtr.p->m_loc = TupLoc(pageId, pageOffset);
nodePtr.p->m_node = reinterpret_cast<TreeNode*>(node32);
ndbrequire(nodePtr.p->m_loc != NullTupLoc && nodePtr.p->m_node != 0);
new (nodePtr.p->m_node) TreeNode();
#ifdef VM_TRACE
TreeHead& tree = frag.m_tree;
TreeNode* node = nodePtr.p->m_node;
memset(tree.getPref(node, 0), 0xa2, tree.m_prefSize << 2);
memset(tree.getPref(node, 1), 0xa2, tree.m_prefSize << 2);
TreeEnt* entList = tree.getEntList(node);
memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
#endif
}
/*
......@@ -114,6 +110,12 @@ Dbtux::selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc
seizeNode(signal, frag, tmpPtr);
ndbrequire(tmpPtr.i != RNIL);
tmpPtr.p->m_loc = loc;
Uint32 pageId = loc.m_pageId;
Uint32 pageOffset = loc.m_pageOffset;
Uint32* node32 = 0;
c_tup->tuxGetNode(frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
tmpPtr.p->m_node = reinterpret_cast<TreeNode*>(node32);
ndbrequire(tmpPtr.p->m_loc != NullTupLoc && tmpPtr.p->m_node != 0);
}
if (tmpPtr.p->m_acc < acc) {
jam();
......@@ -123,7 +125,7 @@ Dbtux::selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc
}
/*
* Create new node in the cache and mark it for insert.
* Create new node in the cache using the pre-allocated node.
*/
void
Dbtux::insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc)
......@@ -143,13 +145,20 @@ Dbtux::insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize ac
}
/*
* Mark existing node for deletion.
* Delete existing node.
*/
void
Dbtux::deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr)
{
NodeHandlePtr tmpPtr = nodePtr;
ndbrequire(tmpPtr.p->getOccup() == 0);
Uint32 pageId = tmpPtr.p->m_loc.m_pageId;
Uint32 pageOffset = tmpPtr.p->m_loc.m_pageOffset;
Uint32* node32 = reinterpret_cast<Uint32*>(tmpPtr.p->m_node);
c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
// invalidate handle and storage
tmpPtr.p->m_loc = NullTupLoc;
tmpPtr.p->m_node = 0;
tmpPtr.p->m_flags |= NodeHandle::DoDelete;
// scans have already been moved by popDown or popUp
}
......@@ -162,17 +171,10 @@ Dbtux::accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize ac
{
TreeHead& tree = frag.m_tree;
NodeHandlePtr tmpPtr = nodePtr;
ndbrequire(tmpPtr.p->m_loc != NullTupLoc && tmpPtr.p->m_node != 0);
if (tmpPtr.p->m_acc >= acc)
return;
if (! (tmpPtr.p->m_flags & NodeHandle::DoInsert)) {
jam();
StorePar storePar;
storePar.m_opCode = TupStoreTh::OpRead;
storePar.m_offset = tree.getSize(tmpPtr.p->m_acc);
storePar.m_size = tree.getSize(acc) - tree.getSize(tmpPtr.p->m_acc);
tmpPtr.p->m_tux.tupStoreTh(signal, frag, tmpPtr, storePar);
ndbrequire(storePar.m_errorCode == 0);
}
// XXX could do prefetch
tmpPtr.p->m_acc = acc;
}
......@@ -220,11 +222,7 @@ Dbtux::commitNodes(Signal* signal, Frag& frag, bool updateOk)
if (flags & NodeHandle::DoDelete) {
jam();
ndbrequire(updateOk);
// delete
StorePar storePar;
storePar.m_opCode = TupStoreTh::OpDelete;
nodePtr.p->m_tux.tupStoreTh(signal, frag, nodePtr, storePar);
ndbrequire(storePar.m_errorCode == 0);
// delete already done
} else if (flags & NodeHandle::DoUpdate) {
jam();
ndbrequire(updateOk);
......@@ -237,13 +235,7 @@ Dbtux::commitNodes(Signal* signal, Frag& frag, bool updateOk)
jam();
setNodePref(signal, frag, nodePtr, 1);
}
// update
StorePar storePar;
storePar.m_opCode = TupStoreTh::OpUpdate;
storePar.m_offset = 0;
storePar.m_size = tree.getSize(nodePtr.p->m_acc);
nodePtr.p->m_tux.tupStoreTh(signal, frag, nodePtr, storePar);
ndbrequire(storePar.m_errorCode == 0);
// update already done via pointer
}
// release
NodeHandlePtr tmpPtr = nodePtr;
......
......@@ -10,6 +10,8 @@ libdbtux_a_SOURCES = \
DbtuxCmp.cpp \
DbtuxDebug.cpp
INCLUDES_LOC = -I$(top_srcdir)/ndb/src/kernel/blocks/dbtup
include $(top_srcdir)/ndb/config/common.mk.am
include $(top_srcdir)/ndb/config/type_kernel.mk.am
......
......@@ -21,5 +21,7 @@ optim 1 mc02/a 38 ms 85 ms 124 pct
optim 2 mc02/a 41 ms 80 ms 96 pct
mc02/b 51 ms 117 ms 128 pct
optim 3 mc02/a 43 ms 80 ms 85 pct
mc02/b 54 ms 118 ms 117 pct
vim: set et:
......@@ -2222,7 +2222,6 @@ pkupdateindexbuild(Par par)
{
if (par.m_no == 0) {
CHK(createindex(par) == 0);
CHK(invalidateindex(par) == 0);
} else {
CHK(pkupdate(par) == 0);
}
......@@ -2513,6 +2512,7 @@ tbusybuild(Par par)
RUNSTEP(par, pkinsert, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, MT);
RUNSTEP(par, dropindex, ST);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment