Commit dc745d74 authored by unknown's avatar unknown

wl2240 - ndb partitioning

- new test program
- changed scan take over protocol
- fixed some bugs
 


ndb/include/kernel/signaldata/TcKeyReq.hpp:
  Changed scan takeover to instead
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  return fargment instead of node (as info for scan take-over)
ndb/src/kernel/blocks/dbtc/Dbtc.hpp:
  remove unused scanNode
ndb/src/kernel/blocks/dbtc/DbtcMain.cpp:
  Ship fragment instead of node in scan take over.
  The case where fragment has changed primary will still be handled
    as fragmentdistribution has changed then
ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp:
  Send fragment wo/ extra bit
ndb/src/ndbapi/NdbOperationSearch.cpp:
  Fix distribution key length
ndb/test/ndbapi/Makefile.am:
  new test program for partitioning
parent ed783858
......@@ -142,7 +142,7 @@ private:
* Get:ers for scanInfo
*/
static Uint8 getTakeOverScanFlag(const UintR & scanInfo);
static Uint16 getTakeOverScanNode(const UintR & scanInfo);
static Uint16 getTakeOverScanFragment(const UintR & scanInfo);
static Uint32 getTakeOverScanInfo(const UintR & scanInfo);
......@@ -171,7 +171,7 @@ private:
* Set:ers for scanInfo
*/
static void setTakeOverScanFlag(UintR & scanInfo, Uint8 flag);
static void setTakeOverScanNode(UintR & scanInfo, Uint16 node);
static void setTakeOverScanFragment(UintR & scanInfo, Uint16 fragment);
static void setTakeOverScanInfo(UintR & scanInfo, Uint32 aScanInfo);
};
......@@ -238,8 +238,8 @@ private:
#define TAKE_OVER_SHIFT (0)
#define TAKE_OVER_NODE_SHIFT (20)
#define TAKE_OVER_NODE_MASK (4095)
#define TAKE_OVER_FRAG_SHIFT (20)
#define TAKE_OVER_FRAG_MASK (4095)
#define SCAN_INFO_SHIFT (1)
#define SCAN_INFO_MASK (262143)
......@@ -485,8 +485,8 @@ TcKeyReq::getTakeOverScanFlag(const UintR & scanInfo){
inline
Uint16
TcKeyReq::getTakeOverScanNode(const UintR & scanInfo){
return (Uint16)((scanInfo >> TAKE_OVER_NODE_SHIFT) & TAKE_OVER_NODE_MASK);
TcKeyReq::getTakeOverScanFragment(const UintR & scanInfo){
return (Uint16)((scanInfo >> TAKE_OVER_FRAG_SHIFT) & TAKE_OVER_FRAG_MASK);
}
inline
......@@ -505,9 +505,9 @@ TcKeyReq::setTakeOverScanFlag(UintR & scanInfo, Uint8 flag){
inline
void
TcKeyReq::setTakeOverScanNode(UintR & scanInfo, Uint16 node){
TcKeyReq::setTakeOverScanFragment(UintR & scanInfo, Uint16 node){
// ASSERT_MAX(node, TAKE_OVER_NODE_MASK, "TcKeyReq::setTakeOverScanNode");
scanInfo |= (node << TAKE_OVER_NODE_SHIFT);
scanInfo |= (node << TAKE_OVER_FRAG_SHIFT);
}
inline
......
......@@ -8506,7 +8506,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
#ifdef TRACE_SCAN_TAKEOVER
ndbout_c("adding (%d %d) table: %d fragId: %d frag.i: %d tableFragptr: %d",
scanptr.p->scanNumber, scanptr.p->fragPtrI,
tabptr.i, scanFragReq->fragmentNo, fragptr.i, fragptr.p->tableFragptr);
tabptr.i, scanFragReq->fragmentNoKeyLen & 0xFFFF,
fragptr.i, fragptr.p->tableFragptr);
#endif
c_scanTakeOverHash.add(scanptr);
}
......@@ -8689,11 +8690,11 @@ void Dblqh::sendKeyinfo20(Signal* signal,
TdataBuf.i = TdataBuf.p->nextDatabuf;
}
Uint32 fragId = tcConP->fragmentid;
keyInfo->clientOpPtr = scanP->scanApiOpPtr;
keyInfo->keyLen = keyLen;
keyInfo->scanInfo_Node = KeyInfo20::setScanInfo(scanOp,
scanP->scanNumber)+
(getOwnNodeId() << 20);
keyInfo->scanInfo_Node =
KeyInfo20::setScanInfo(scanOp, scanP->scanNumber) + (fragId << 20);
keyInfo->transId1 = tcConP->transid[0];
keyInfo->transId2 = tcConP->transid[1];
......
......@@ -934,7 +934,7 @@ public:
// Third 16 byte cache line in second 64
// byte cache line. Diverse use.
//---------------------------------------------------
Uint32 scanNode;
Uint32 unused4;
Uint32 scanTakeOverInd;
UintR firstKeybuf; /* POINTER THE LINKED LIST OF KEY BUFFERS */
UintR lastKeybuf; /* VARIABLE POINTING TO THE LAST KEY BUFFER */
......
......@@ -2671,11 +2671,9 @@ void Dbtc::execTCKEYREQ(Signal* signal)
Uint32 TDistrGHIndex = tcKeyReq->getScanIndFlag(Treqinfo);
Uint32 TDistrKeyIndex = TDistrGHIndex;
Uint32 TscanNode = tcKeyReq->getTakeOverScanNode(TOptionalDataPtr[0]);
Uint32 TscanInfo = tcKeyReq->getTakeOverScanInfo(TOptionalDataPtr[0]);
regCachePtr->scanTakeOverInd = TDistrGHIndex;
regCachePtr->scanNode = TscanNode;
regCachePtr->scanInfo = TscanInfo;
regCachePtr->distributionKey = TOptionalDataPtr[TDistrKeyIndex];
......@@ -3030,7 +3028,6 @@ void Dbtc::attrinfoDihReceivedLab(Signal* signal)
TcConnectRecord * const regTcPtr = tcConnectptr.p;
Uint16 Tnode = regTcPtr->tcNodedata[0];
Uint16 TscanTakeOverInd = regCachePtr->scanTakeOverInd;
Uint16 TscanNode = regCachePtr->scanNode;
TableRecordPtr localTabptr;
localTabptr.i = regCachePtr->tableref;
......@@ -3043,11 +3040,6 @@ void Dbtc::attrinfoDihReceivedLab(Signal* signal)
TCKEY_abort(signal, 58);
return;
}
if ((TscanTakeOverInd == 1) &&
(Tnode != TscanNode)) {
TCKEY_abort(signal, 15);
return;
}//if
arrGuard(Tnode, MAX_NDB_NODES);
packLqhkeyreq(signal, calcLqhBlockRef(Tnode));
}//Dbtc::attrinfoDihReceivedLab()
......
......@@ -987,7 +987,7 @@ Dbtup::read_psuedo(Uint32 attrId, Uint32* outBuffer){
Signal * signal = (Signal*)&tmp;
switch(attrId){
case AttributeHeader::FRAGMENT:
* outBuffer = operPtr.p->fragId;
* outBuffer = operPtr.p->fragId >> 1; // remove "hash" bit
return 1;
case AttributeHeader::ROW_COUNT:
case AttributeHeader::COMMIT_COUNT:
......
......@@ -585,7 +585,7 @@ NdbOperation::handle_distribution_key(const Uint64* value, Uint32 len)
chunk -= currLen;
}
}
setPartitionHash(tmp, (Uint32*)tmp - dst);
setPartitionHash(tmp, dst - (Uint32*)tmp);
}
return 0;
}
......
......@@ -30,7 +30,8 @@ testSystemRestart \
testTimeout \
testTransactions \
testDeadlock \
test_event ndbapi_slow_select testReadPerf testLcp
test_event ndbapi_slow_select testReadPerf testLcp \
testPartitioning
#flexTimedAsynch
#testBlobs
......@@ -69,6 +70,7 @@ test_event_SOURCES = test_event.cpp
ndbapi_slow_select_SOURCES = slow_select.cpp
testReadPerf_SOURCES = testReadPerf.cpp
testLcp_SOURCES = testLcp.cpp
testPartitioning_SOURCES = testPartitioning.cpp
INCLUDES_LOC = -I$(top_srcdir)/ndb/include/kernel
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <NDBT_Test.hpp>
#include <NDBT_ReturnCodes.h>
#include <HugoTransactions.hpp>
#include <UtilTransactions.hpp>
#include <NdbRestarter.hpp>
#define GETNDB(ps) ((NDBT_NdbApiStep*)ps)->getNdb()
static
int runLoadTable(NDBT_Context* ctx, NDBT_Step* step)
{
int records = ctx->getNumRecords();
HugoTransactions hugoTrans(*ctx->getTab());
if (hugoTrans.loadTable(GETNDB(step), records) != 0){
return NDBT_FAILED;
}
return NDBT_OK;
}
static
int
run_drop_table(NDBT_Context* ctx, NDBT_Step* step)
{
NdbDictionary::Dictionary* dict = GETNDB(step)->getDictionary();
dict->dropTable(ctx->getTab()->getName());
return 0;
}
static
int
add_distribution_key(Ndb*, NdbDictionary::Table& tab, int when)
{
switch(when){
case 0: // Before
break;
case 1: // After
return 0;
default:
return 0;
}
int keys = tab.getNoOfPrimaryKeys();
int dks = (2 * keys + 2) / 3;
int cnt = 0;
ndbout_c("%s pks: %d dks: %d", tab.getName(), keys, dks);
for(unsigned i = 0; i<tab.getNoOfColumns(); i++)
{
NdbDictionary::Column* col = tab.getColumn(i);
if(col->getPrimaryKey())
{
if(dks >= keys || (rand() % 100) > 50)
{
col->setDistributionKey(true);
dks--;
}
keys--;
}
}
return 0;
}
int
run_create_table(NDBT_Context* ctx, NDBT_Step* step)
{
bool dk = ctx->getProperty("distributionkey", (unsigned)0);
return NDBT_Tables::createTable(GETNDB(step),
ctx->getTab()->getName(),
false, false, dk?add_distribution_key:0);
}
int
run_pk_dk(NDBT_Context* ctx, NDBT_Step* step)
{
Ndb* p_ndb = GETNDB(step);
int records = ctx->getNumRecords();
const NdbDictionary::Table *tab =
p_ndb->getDictionary()->getTable(ctx->getTab()->getName());
HugoTransactions hugoTrans(*tab);
if (hugoTrans.loadTable(p_ndb, records) != 0)
{
return NDBT_FAILED;
}
if(hugoTrans.pkReadRecords(p_ndb, records) != 0)
{
return NDBT_FAILED;
}
if(hugoTrans.pkUpdateRecords(p_ndb, records) != 0)
{
return NDBT_FAILED;
}
if(hugoTrans.pkDelRecords(p_ndb, records) != 0)
{
return NDBT_FAILED;
}
if (hugoTrans.loadTable(p_ndb, records) != 0)
{
return NDBT_FAILED;
}
if(hugoTrans.scanUpdateRecords(p_ndb, records) != 0)
{
return NDBT_FAILED;
}
if(hugoTrans.scanReadRecords(p_ndb, records) != 0)
{
return NDBT_FAILED;
}
if(hugoTrans.clearTable(p_ndb, records) != 0)
{
return NDBT_FAILED;
}
return 0;
}
int
run_hash_dk(NDBT_Context* ctx, NDBT_Step* step)
{
return 0;
}
int
run_index_dk(NDBT_Context* ctx, NDBT_Step* step)
{
return 0;
}
NDBT_TESTSUITE(testPartitioning);
TESTCASE("pk_dk",
"Primary key operations with distribution key")
{
TC_PROPERTY("distributionkey", 1);
INITIALIZER(run_drop_table);
INITIALIZER(run_create_table);
INITIALIZER(run_pk_dk);
INITIALIZER(run_drop_table);
}
TESTCASE("hash_index_dk",
"Unique index operatations with distribution key")
{
TC_PROPERTY("distributionkey", 1);
INITIALIZER(run_drop_table);
INITIALIZER(run_create_table);
INITIALIZER(run_hash_dk);
INITIALIZER(run_drop_table);
}
TESTCASE("ordered_index_dk",
"Ordered index operatations with distribution key")
{
TC_PROPERTY("distributionkey", 1);
INITIALIZER(run_drop_table);
INITIALIZER(run_create_table);
INITIALIZER(run_index_dk);
INITIALIZER(run_drop_table);
}
NDBT_TESTSUITE_END(testPartitioning);
int main(int argc, const char** argv){
ndb_init();
testPartitioning.setCreateTable(false);
return testPartitioning.execute(argc, argv);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment