Commit 37b42d2d authored by unknown's avatar unknown

wl2240 - ndb partitioning

1) Fix startTransaction with hint
2) Ship fragment data to API to make better guess
3) Expose both primary & backup replicas 
   To (in the future) be even more clever on choosing nodes for
   transactions/operations


ndb/include/kernel/ndb_limits.h:
  Set maxsize of array with fragment data
ndb/include/kernel/signaldata/DictTabInfo.hpp:
  Send fragment data to API
ndb/src/common/debugger/signaldata/DictTabInfo.cpp:
  Send fragment data to API
ndb/src/kernel/blocks/dbdict/Dbdict.cpp:
  Send fragment data to API
ndb/src/kernel/blocks/dbdict/Dbdict.hpp:
  Send fragment data to API
ndb/src/kernel/blocks/dbdih/DbdihMain.cpp:
  Use Uint16 for fragment info
  Fix uninit variable
ndb/src/ndbapi/Ndb.cpp:
  Impl. startTransaction with hint
ndb/src/ndbapi/NdbDictionaryImpl.cpp:
  Add interface for retreiving replica nodes given a hash-value
ndb/src/ndbapi/NdbDictionaryImpl.hpp:
  Add interface for retreiving replica nodes given a hash-value
ndb/src/ndbapi/ndb_cluster_connection.cpp:
  remove Fragment2NodeMap and put it on table instead
ndb/src/ndbapi/ndb_cluster_connection_impl.hpp:
  remove Fragment2NodeMap and put it on table instead
ndb/test/ndbapi/testNdbApi.cpp:
  Test some more
parent 17ec8128
......@@ -60,6 +60,7 @@
#define MAX_KEY_SIZE_IN_WORDS 1023
#define MAX_FRM_DATA_SIZE 6000
#define MAX_NULL_BITS 4096
#define MAX_FRAGMENT_DATA_BYTES (4+(2 * 8 * MAX_REPLICAS * MAX_NDB_NODES))
#define MIN_ATTRBUF ((MAX_ATTRIBUTES_IN_TABLE/24) + 1)
/*
......
......@@ -96,6 +96,8 @@ public:
FrmLen = 26,
FrmData = 27,
FragmentCount = 128, // No of fragments in table (!fragment replicas)
FragmentDataLen = 129,
FragmentData = 130, // CREATE_FRAGMENTATION reply
TableEnd = 999,
AttributeName = 1000, // String, Mandatory
......@@ -236,7 +238,9 @@ public:
Uint32 FrmLen;
char FrmData[MAX_FRM_DATA_SIZE];
Uint32 FragmentCount;
Uint32 FragmentDataLen;
Uint16 FragmentData[(MAX_FRAGMENT_DATA_BYTES+1)/2];
void init();
};
......
......@@ -46,6 +46,8 @@ DictTabInfo::TableMapping[] = {
DTIMAP2(Table, FrmLen, FrmLen, 0, MAX_FRM_DATA_SIZE),
DTIMAPB(Table, FrmData, FrmData, 0, MAX_FRM_DATA_SIZE, FrmLen),
DTIMAP(Table, FragmentCount, FragmentCount),
DTIMAP2(Table, FragmentDataLen, FragmentDataLen, 0, MAX_FRAGMENT_DATA_BYTES),
DTIMAPB(Table, FragmentData, FragmentData, 0, MAX_FRAGMENT_DATA_BYTES, FragmentDataLen),
DTIBREAK(AttributeName)
};
......@@ -120,6 +122,8 @@ DictTabInfo::Table::init(){
FrmLen = 0;
memset(FrmData, 0, sizeof(FrmData));
FragmentCount = 0;
FragmentDataLen = 0;
memset(FragmentData, 0, sizeof(FragmentData));
}
void
......
......@@ -206,7 +206,7 @@ void Dbdict::packTableIntoPages(Signal* signal, Uint32 tableId, Uint32 pageId)
8 * ZSIZE_OF_PAGES_IN_WORDS);
w.first();
packTableIntoPagesImpl(w, tablePtr);
packTableIntoPagesImpl(w, tablePtr, signal);
Uint32 wordsOfTable = w.getWordsUsed();
Uint32 pagesUsed =
......@@ -235,7 +235,8 @@ void Dbdict::packTableIntoPages(Signal* signal, Uint32 tableId, Uint32 pageId)
void
Dbdict::packTableIntoPagesImpl(SimpleProperties::Writer & w,
TableRecordPtr tablePtr){
TableRecordPtr tablePtr,
Signal* signal){
w.add(DictTabInfo::TableName, tablePtr.p->tableName);
w.add(DictTabInfo::TableId, tablePtr.i);
......@@ -257,7 +258,31 @@ Dbdict::packTableIntoPagesImpl(SimpleProperties::Writer & w,
w.add(DictTabInfo::TableKValue, tablePtr.p->kValue);
w.add(DictTabInfo::FragmentTypeVal, tablePtr.p->fragmentType);
w.add(DictTabInfo::TableTypeVal, tablePtr.p->tableType);
w.add(DictTabInfo::FragmentCount, tablePtr.p->fragmentCount);
if(!signal)
{
w.add(DictTabInfo::FragmentCount, tablePtr.p->fragmentCount);
}
else
{
Uint32 * theData = signal->getDataPtrSend();
CreateFragmentationReq * const req = (CreateFragmentationReq*)theData;
req->senderRef = 0;
req->senderData = RNIL;
req->fragmentationType = tablePtr.p->fragmentType;
req->noOfFragments = 0;
req->fragmentNode = 0;
req->primaryTableId = tablePtr.i;
EXECUTE_DIRECT(DBDIH, GSN_CREATE_FRAGMENTATION_REQ, signal,
CreateFragmentationReq::SignalLength);
if(signal->theData[0] == 0)
{
Uint16 *data = (Uint16*)&signal->theData[25];
Uint32 count = 2 + data[0] * data[1];
w.add(DictTabInfo::FragmentDataLen, 2*count);
w.add(DictTabInfo::FragmentData, data, 2*count);
}
}
if (tablePtr.p->primaryTableId != RNIL){
TableRecordPtr primTab;
......@@ -5697,7 +5722,10 @@ void Dbdict::sendGET_TABLEID_REF(Signal* signal,
void Dbdict::execGET_TABINFOREQ(Signal* signal)
{
jamEntry();
if(!assembleFragments(signal)) { return; }
if(!assembleFragments(signal))
{
return;
}
GetTabInfoReq * const req = (GetTabInfoReq *)&signal->theData[0];
......
......@@ -1582,7 +1582,8 @@ private:
bool getNewAttributeRecord(TableRecordPtr tablePtr,
AttributeRecordPtr & attrPtr);
void packTableIntoPages(Signal* signal, Uint32 tableId, Uint32 pageId);
void packTableIntoPagesImpl(SimpleProperties::Writer &, TableRecordPtr);
void packTableIntoPagesImpl(SimpleProperties::Writer &, TableRecordPtr,
Signal* signal= 0);
void sendGET_TABINFOREQ(Signal* signal,
Uint32 tableId);
......
......@@ -6202,9 +6202,12 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ(Signal * signal){
if (primaryTableId == RNIL) {
if(fragmentNode == 0){
jam();
// needs to be fixed for single fragment tables
NGPtr.i = 0; //c_nextNodeGroup;
c_nextNodeGroup = (NGPtr.i + 1 == cnoOfNodeGroups ? 0 : NGPtr.i + 1);
NGPtr.i = 0;
if(noOfFragments < csystemnodes)
{
NGPtr.i = c_nextNodeGroup;
c_nextNodeGroup = (NGPtr.i + 1 == cnoOfNodeGroups ? 0 : NGPtr.i + 1);
}
} else if(! (fragmentNode < MAX_NDB_NODES)) {
jam();
err = CreateFragmentationRef::InvalidNodeId;
......@@ -6257,33 +6260,28 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ(Signal * signal){
}
}
//@todo use section writer
Uint32 count = 2;
Uint32 fragments[2 + 8*MAX_REPLICAS*MAX_NDB_NODES];
Uint32 next_replica_node[MAX_NDB_NODES];
memset(next_replica_node,0,sizeof(next_replica_node));
Uint16 *fragments = (Uint16*)(signal->theData+25);
if (primaryTableId == RNIL) {
jam();
Uint8 next_replica_node[MAX_NDB_NODES];
memset(next_replica_node,0,sizeof(next_replica_node));
for(Uint32 fragNo = 0; fragNo<noOfFragments; fragNo++){
jam();
ptrCheckGuard(NGPtr, MAX_NDB_NODES, nodeGroupRecord);
Uint32 ind = next_replica_node[NGPtr.i];
const Uint32 max = NGPtr.p->nodeCount;
//-------------------------------------------------------------------
// We make an extra step to ensure that the primary replicas are
// spread among the nodes.
//-------------------------------------------------------------------
next_replica_node[NGPtr.i] = (ind + 1 >= max ? 0 : ind + 1);
for(Uint32 replicaNo = 0; replicaNo<noOfReplicas; replicaNo++){
Uint32 tmp= next_replica_node[NGPtr.i];
for(Uint32 replicaNo = 0; replicaNo<noOfReplicas; replicaNo++)
{
jam();
const Uint32 nodeId = NGPtr.p->nodesInGroup[ind++];
const Uint32 nodeId = NGPtr.p->nodesInGroup[tmp++];
fragments[count++] = nodeId;
ind = (ind == max ? 0 : ind);
tmp = (tmp >= max ? 0 : tmp);
}
tmp++;
next_replica_node[NGPtr.i]= (tmp >= max ? 0 : tmp);
/**
* Next node group for next fragment
*/
......@@ -6332,26 +6330,42 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ(Signal * signal){
fragments[0] = noOfReplicas;
fragments[1] = noOfFragments;
LinearSectionPtr ptr[3];
ptr[0].p = &fragments[0];
ptr[0].sz = count;
sendSignal(senderRef,
GSN_CREATE_FRAGMENTATION_CONF,
signal,
CreateFragmentationConf::SignalLength,
JBB,
ptr,
1);
if(senderRef != 0)
{
LinearSectionPtr ptr[3];
ptr[0].p = (Uint32*)&fragments[0];
ptr[0].sz = (count + 1) / 2;
sendSignal(senderRef,
GSN_CREATE_FRAGMENTATION_CONF,
signal,
CreateFragmentationConf::SignalLength,
JBB,
ptr,
1);
}
else
{
// Execute direct
signal->theData[0] = 0;
}
return;
} while(false);
CreateFragmentationRef * const ref =
(CreateFragmentationRef*)signal->getDataPtrSend();
ref->senderRef = reference();
ref->senderData = senderData;
ref->errorCode = err;
sendSignal(senderRef, GSN_CREATE_FRAGMENTATION_REF, signal,
CreateFragmentationRef::SignalLength, JBB);
if(senderRef != 0)
{
CreateFragmentationRef * const ref =
(CreateFragmentationRef*)signal->getDataPtrSend();
ref->senderRef = reference();
ref->senderData = senderData;
ref->errorCode = err;
sendSignal(senderRef, GSN_CREATE_FRAGMENTATION_REF, signal,
CreateFragmentationRef::SignalLength, JBB);
}
else
{
// Execute direct
signal->theData[0] = err;
}
}
void Dbdih::execDIADDTABREQ(Signal* signal)
......@@ -6419,12 +6433,15 @@ void Dbdih::execDIADDTABREQ(Signal* signal)
tabPtr.p->method = TabRecord::HASH;
tabPtr.p->kvalue = req->kValue;
Uint32 fragments[2 + 8*MAX_REPLICAS*MAX_NDB_NODES];
union {
Uint16 fragments[2 + MAX_FRAG_PER_NODE*MAX_REPLICAS*MAX_NDB_NODES];
Uint32 align;
};
SegmentedSectionPtr fragDataPtr;
signal->getSection(fragDataPtr, DiAddTabReq::FRAGMENTATION);
copy(fragments, fragDataPtr);
copy((Uint32*)fragments, fragDataPtr);
releaseSections(signal);
const Uint32 noReplicas = fragments[0];
const Uint32 noFragments = fragments[1];
......@@ -11419,7 +11436,6 @@ void Dbdih::makeNodeGroups(Uint32 nodeArray[])
NodeRecordPtr mngNodeptr;
Uint32 tmngNode;
Uint32 tmngNodeGroup;
Uint32 tmngReplica;
Uint32 tmngLimit;
Uint32 i;
......@@ -11428,7 +11444,6 @@ void Dbdih::makeNodeGroups(Uint32 nodeArray[])
* TO NODE GROUP ZNIL
*-----------------------------------------------------------------------*/
tmngNodeGroup = 0;
tmngReplica = 0;
tmngLimit = csystemnodes - cnoHotSpare;
ndbrequire(tmngLimit < MAX_NDB_NODES);
for (i = 0; i < tmngLimit; i++) {
......@@ -11440,13 +11455,11 @@ void Dbdih::makeNodeGroups(Uint32 nodeArray[])
mngNodeptr.p->nodeGroup = tmngNodeGroup;
NGPtr.i = tmngNodeGroup;
ptrCheckGuard(NGPtr, MAX_NDB_NODES, nodeGroupRecord);
arrGuard(tmngReplica, MAX_REPLICAS);
NGPtr.p->nodesInGroup[tmngReplica] = mngNodeptr.i;
tmngReplica++;
if (tmngReplica == cnoReplicas) {
arrGuard(NGPtr.p->nodeCount, MAX_REPLICAS);
NGPtr.p->nodesInGroup[NGPtr.p->nodeCount++] = mngNodeptr.i;
if (NGPtr.p->nodeCount == cnoReplicas) {
jam();
tmngNodeGroup++;
tmngReplica = 0;
}//if
}//for
cnoOfNodeGroups = tmngNodeGroup;
......
......@@ -307,18 +307,41 @@ Ndb::startTransaction(const NdbDictionary::Table *table,
if (theInitState == Initialised) {
theError.code = 0;
checkFailedNode();
/**
* If the user supplied key data
* We will make a qualified quess to which node is the primary for the
* the fragment and contact that node
*/
/**
* If the user supplied key data
* We will make a qualified quess to which node is the primary for the
* the fragment and contact that node
*/
Uint32 nodeId;
if(keyData != 0) {
nodeId = 0; // guess not supported
// nodeId = m_ndb_cluster_connection->guess_primary_node(keyData, keyLen);
NdbTableImpl* impl;
if(table != 0 && keyData != 0 && (impl= &NdbTableImpl::getImpl(*table)))
{
Uint32 hashValue;
{
Uint32 buf[4];
if((UintPtr(keyData) & 7) == 0 && (keyLen & 3) == 0)
{
md5_hash(buf, (const Uint64*)keyData, keyLen >> 2);
}
else
{
Uint64 tmp[1000];
tmp[keyLen/8] = 0;
memcpy(tmp, keyData, keyLen);
md5_hash(buf, tmp, (keyLen+3) >> 2);
}
hashValue= buf[1];
}
const Uint16 *nodes;
Uint32 cnt= impl->get_nodes(hashValue, &nodes);
if(cnt)
nodeId= nodes[0];
else
nodeId= 0;
} else {
nodeId = 0;
}//if
{
NdbTransaction *trans= startTransactionLocal(0, nodeId);
DBUG_PRINT("exit",("start trans: 0x%x transid: 0x%llx",
......
......@@ -482,6 +482,26 @@ NdbTableImpl::buildColumnHash(){
}
#endif
}
Uint32
NdbTableImpl::get_nodes(Uint32 hashValue, const Uint16 ** nodes) const
{
if(m_replicaCount > 0)
{
Uint32 fragmentId = hashValue & m_hashValueMask;
if(fragmentId < m_hashpointerValue)
{
fragmentId = hashValue & ((m_hashValueMask << 1) + 1);
}
Uint32 pos = fragmentId * m_replicaCount;
if(pos + m_replicaCount <= m_fragments.size())
{
* nodes = m_fragments.getBase()+pos;
return m_replicaCount;
}
}
return 0;
}
/**
* NdbIndexImpl
......@@ -1226,7 +1246,6 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
impl->m_kvalue = tableDesc.TableKValue;
impl->m_minLoadFactor = tableDesc.MinLoadFactor;
impl->m_maxLoadFactor = tableDesc.MaxLoadFactor;
impl->m_fragmentCount = tableDesc.FragmentCount;
impl->m_indexType = (NdbDictionary::Index::Type)
getApiConstant(tableDesc.TableType,
......@@ -1326,7 +1345,42 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
impl->m_keyLenInWords = keyInfoPos;
impl->m_noOfBlobs = blobCount;
impl->m_noOfDistributionKeys = distKeys;
if(tableDesc.FragmentDataLen > 0)
{
int i;
Uint32 fragCount = tableDesc.FragmentData[0];
Uint32 replicaCount = tableDesc.FragmentData[1];
impl->m_replicaCount = replicaCount;
impl->m_fragmentCount = fragCount;
for(i = 0; i<(fragCount*replicaCount); i++)
{
impl->m_fragments.push_back(tableDesc.FragmentData[i+2]);
}
impl->m_replicaCount = replicaCount;
impl->m_fragmentCount = fragCount;
Uint32 topBit = (1 << 31);
for(int i = 31; i>=0; i--){
if((fragCount & topBit) != 0)
break;
topBit >>= 1;
}
impl->m_hashValueMask = topBit - 1;
impl->m_hashpointerValue = fragCount - (impl->m_hashValueMask + 1);
}
else
{
impl->m_fragmentCount = tableDesc.FragmentCount;
impl->m_replicaCount = 0;
impl->m_hashValueMask = 0;
impl->m_hashpointerValue = 0;
}
* ret = impl;
DBUG_RETURN(0);
}
......@@ -3041,6 +3095,7 @@ NdbDictInterface::execLIST_TABLES_CONF(NdbApiSignal* signal,
}
template class Vector<int>;
template class Vector<Uint16>;
template class Vector<Uint32>;
template class Vector<Vector<Uint32> >;
template class Vector<NdbTableImpl*>;
......
......@@ -117,7 +117,14 @@ public:
Vector<Uint32> m_columnHash;
Vector<NdbColumnImpl *> m_columns;
void buildColumnHash();
/**
* Fragment info
*/
Uint32 m_hashValueMask;
Uint32 m_hashpointerValue;
Vector<Uint16> m_fragments;
bool m_logging;
int m_kvalue;
int m_minLoadFactor;
......@@ -144,6 +151,8 @@ public:
Uint8 m_noOfKeys;
Uint8 m_noOfDistributionKeys;
Uint8 m_noOfBlobs;
Uint8 m_replicaCount;
/**
* Equality/assign
......@@ -156,6 +165,11 @@ public:
static NdbTableImpl & getImpl(NdbDictionary::Table & t);
static NdbTableImpl & getImpl(const NdbDictionary::Table & t);
NdbDictionary::Table * m_facade;
/**
* Return count
*/
Uint32 get_nodes(Uint32 hashValue, const Uint16** nodes) const ;
};
class NdbIndexImpl : public NdbDictionary::Index, public NdbDictObjectImpl {
......
......@@ -539,110 +539,5 @@ void Ndb_cluster_connection_impl::connect_thread()
DBUG_VOID_RETURN;
}
/*
* Hint handling to select node
* ToDo: fix this
*/
void
Ndb_cluster_connection_impl::FragmentToNodeMap::init(Uint32 noOfNodes,
Uint8 nodeIds[])
{
kValue = 6;
noOfFragments = 2 * noOfNodes;
/**
* Compute hashValueMask and hashpointerValue
*/
{
Uint32 topBit = (1 << 31);
for(int i = 31; i>=0; i--){
if((noOfFragments & topBit) != 0)
break;
topBit >>= 1;
}
hashValueMask = topBit - 1;
hashpointerValue = noOfFragments - (hashValueMask + 1);
}
/**
* This initialization depends on
* the fact that:
* primary node for fragment i = i % noOfNodes
*
* This algorithm should be implemented in Dbdih
*/
{
if (fragment2PrimaryNodeMap != 0)
abort();
fragment2PrimaryNodeMap = new Uint32[noOfFragments];
Uint32 i;
for(i = 0; i<noOfNodes; i++){
fragment2PrimaryNodeMap[i] = nodeIds[i];
}
// Sort them (bubble sort)
for(i = 0; i<noOfNodes-1; i++)
for(Uint32 j = i+1; j<noOfNodes; j++)
if(fragment2PrimaryNodeMap[i] > fragment2PrimaryNodeMap[j]){
Uint32 tmp = fragment2PrimaryNodeMap[i];
fragment2PrimaryNodeMap[i] = fragment2PrimaryNodeMap[j];
fragment2PrimaryNodeMap[j] = tmp;
}
for(i = 0; i<noOfNodes; i++){
fragment2PrimaryNodeMap[i+noOfNodes] = fragment2PrimaryNodeMap[i];
}
}
}
void
Ndb_cluster_connection_impl::FragmentToNodeMap::release(){
delete [] fragment2PrimaryNodeMap;
fragment2PrimaryNodeMap = 0;
}
static const Uint32 MAX_KEY_LEN_64_WORDS = 4;
Uint32
Ndb_cluster_connection_impl::guess_primary_node(const char *keyData,
Uint32 keyLen)
{
Uint64 tempData[MAX_KEY_LEN_64_WORDS];
const Uint32 usedKeyLen = (keyLen + 3) >> 2; // In words
const char * usedKeyData = 0;
/**
* If key data buffer is not aligned (on 64 bit boundary)
* or key len is not a multiple of 4
* Use temp data
*/
if(((((UintPtr)keyData) & 7) == 0) && ((keyLen & 3) == 0)) {
usedKeyData = keyData;
} else {
memcpy(&tempData[0], keyData, keyLen);
const int slack = keyLen & 3;
if(slack > 0) {
memset(&((char *)&tempData[0])[keyLen], 0, (4 - slack));
}//if
usedKeyData = (char *)&tempData[0];
}//if
Uint32 hashValue = md5_hash((Uint64 *)usedKeyData, usedKeyLen);
hashValue >>= fragmentToNodeMap.kValue;
Uint32 fragmentId = hashValue &
fragmentToNodeMap.hashValueMask;
if(fragmentId < fragmentToNodeMap.hashpointerValue) {
fragmentId = hashValue &
((fragmentToNodeMap.hashValueMask << 1) + 1);
}//if
return fragmentId;
}
template class Vector<Ndb_cluster_connection_impl::Node>;
......@@ -54,22 +54,6 @@ private:
friend class NdbImpl;
friend void* run_ndb_cluster_connection_connect_thread(void*);
friend class Ndb_cluster_connection;
/**
* Structure containing values for guessing primary node
*/
struct FragmentToNodeMap {
FragmentToNodeMap():
fragment2PrimaryNodeMap(0) {};
Uint32 kValue;
Uint32 hashValueMask;
Uint32 hashpointerValue;
Uint32 noOfFragments;
Uint32 *fragment2PrimaryNodeMap;
void init(Uint32 noOfNodes, Uint8 nodeIds[]);
void release();
} fragmentToNodeMap;
struct Node
{
......@@ -85,8 +69,6 @@ private:
Vector<Node> m_all_nodes;
void init_nodes_vector(Uint32 nodeid, const ndb_mgm_configuration &config);
Uint32 guess_primary_node(const char * keyData, Uint32 keyLen);
void connect_thread();
TransporterFacade *m_transporter_facade;
......
......@@ -140,14 +140,19 @@ int runTestMaxTransaction(NDBT_Context* ctx, NDBT_Step* step){
pCon = pNdb->startTransaction();
break;
case 1:
{
BaseString key;
key.appfmt("DATA-%d", i);
ndbout_c("%s", key.c_str());
pCon = pNdb->startTransaction(pTab,
"DATA",
4);
break;
key.c_str(),
key.length());
}
break;
default:
abort();
}
if (pCon == NULL){
ERR(pNdb->getNdbError());
errors++;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment