/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/*****************************************************************************
Name:          Ndb.cpp
******************************************************************************/

#include <ndb_global.h>

#include "NdbApiSignal.hpp"
#include "NdbImpl.hpp"
#include <NdbOperation.hpp>
#include <NdbTransaction.hpp>
#include <NdbRecAttr.hpp>
#include <md5_hash.hpp>
#include <NdbSleep.h>
#include <NdbOut.hpp>
#include <ndb_limits.h>
#include "API.hpp"
#include <NdbEnv.h>
#include <BaseString.hpp>

/****************************************************************************
NdbTransaction* doConnect(Uint32 tConNode);

Parameters:     tConNode: preferred node id, or 0 for "no preference".
Return Value:   The connection object obtained from
                getConnectedNdbTransaction() for the first node on which
                NDB_connect() reports success (1 or 2), NULL otherwise.
Remark:         Tries the preferred node first (if given), then every node:
                either via the cluster connection's node iterator
                (m_optimized_node_selection) or by plain round robin.
                A negative NDB_connect() result aborts the search at once.
                When nothing could be connected theError.code is set to
                4006 if at least one attempt returned a positive failure
                code, and to 4009 otherwise.
****************************************************************************/
NdbTransaction* Ndb::doConnect(Uint32 tConNode)
{
  Uint32        tNode;
  Uint32        tAnyAlive = 0;
  int           TretCode= 0;

  DBUG_ENTER("Ndb::doConnect");

  if (tConNode != 0) {
    TretCode = NDB_connect(tConNode);
    if ((TretCode == 1) || (TretCode == 2)) {
//****************************************************************************
// We have connections now to the desired node. Return
//****************************************************************************
      DBUG_RETURN(getConnectedNdbTransaction(tConNode));
    } else if (TretCode < 0) {
      DBUG_RETURN(NULL);
    } else if (TretCode != 0) {
      tAnyAlive = 1;
    }//if
  }//if
//****************************************************************************
// We will connect to any node. Make sure that we have connections to all
// nodes.
//****************************************************************************
  if (theImpl->m_optimized_node_selection)
  {
    Ndb_cluster_connection_node_iter &node_iter=
      theImpl->m_node_iter;
    theImpl->m_ndb_cluster_connection.init_get_next_node(node_iter);
    // get_next_node() yielding 0 terminates the iteration
    while ((tNode= theImpl->m_ndb_cluster_connection.get_next_node(node_iter)))
    {
      TretCode= NDB_connect(tNode);
      if ((TretCode == 1) ||
          (TretCode == 2))
      {
//****************************************************************************
// We have connections now to the desired node. Return
//****************************************************************************
        DBUG_RETURN(getConnectedNdbTransaction(tNode));
      } else if (TretCode < 0) {
        DBUG_RETURN(NULL);
      } else if (TretCode != 0) {
        tAnyAlive= 1;
      }//if
      DBUG_PRINT("info",("tried node %d, TretCode %d, error code %d, %s",
                         tNode, TretCode, getNdbError().code,
                         getNdbError().message));
    }
  }
  else // just do a regular round robin
  {
    Uint32 tNoOfDbNodes= theImpl->theNoOfDBnodes;
    Uint32 &theCurrentConnectIndex= theImpl->theCurrentConnectIndex;
    UintR Tcount = 0;
    do {
      // Advance the shared cursor first so successive calls start on
      // different nodes; wrap around at the end of the node table.
      theCurrentConnectIndex++;
      if (theCurrentConnectIndex >= tNoOfDbNodes)
        theCurrentConnectIndex = 0;

      Tcount++;
      tNode= theImpl->theDBnodes[theCurrentConnectIndex];
      TretCode= NDB_connect(tNode);
      if ((TretCode == 1) ||
          (TretCode == 2))
      {
//****************************************************************************
// We have connections now to the desired node. Return
//****************************************************************************
        DBUG_RETURN(getConnectedNdbTransaction(tNode));
      } else if (TretCode < 0) {
        DBUG_RETURN(NULL);
      } else if (TretCode != 0) {
        tAnyAlive= 1;
      }//if
      DBUG_PRINT("info",("tried node %d TretCode %d", tNode, TretCode));
    } while (Tcount < tNoOfDbNodes);
  }
//****************************************************************************
// We were unable to find a free connection. If no node alive we will report
// error code for cluster failure otherwise connection failure.
//****************************************************************************
  if (tAnyAlive == 1) {
#ifdef VM_TRACE
    ndbout << "TretCode = " << TretCode << endl;
#endif
    theError.code = 4006;
  } else {
    theError.code = 4009;
  }//if
  DBUG_RETURN(NULL);
}

/****************************************************************************
int NDB_connect(Uint32 tNode);

Parameters:     tNode: the node to seize a transaction record on.
Return Value:    0  get_node_alive(tNode) reported the node as not alive
                 1  a new connection was seized and put first in
                    theConnectionArray[tNode]
                 2  theConnectionArray[tNode] already held a connection
                 3  the seize attempt failed
                 4  no connection object or signal object could be obtained,
                    or the signal could not be set up
                -1  the seize attempt failed and theError.code is 299
                    (single user mode, so no need to retry with other node)
Remark:         Sends GSN_TCSEIZEREQ to the node and waits for the response.
****************************************************************************/
int
Ndb::NDB_connect(Uint32 tNode)
{
//****************************************************************************
// We will perform seize of a transaction record in DBTC in the specified node.
//***************************************************************************

  int                tReturnCode;
  TransporterFacade *tp = TransporterFacade::instance();

  DBUG_ENTER("Ndb::NDB_connect");

  bool nodeAvail = tp->get_node_alive(tNode);
  if(nodeAvail == false){
    DBUG_RETURN(0);
  }

  NdbTransaction * tConArray = theConnectionArray[tNode];
  if (tConArray != NULL) {
    DBUG_RETURN(2);
  }

  NdbTransaction * tNdbCon = getNdbCon();       // Get free connection object.
  if (tNdbCon == NULL) {
    DBUG_RETURN(4);
  }//if
  NdbApiSignal* tSignal = getSignal();          // Get signal object
  if (tSignal == NULL) {
    releaseNdbCon(tNdbCon);
    DBUG_RETURN(4);
  }//if
  if (tSignal->setSignal(GSN_TCSEIZEREQ) == -1) {
    releaseNdbCon(tNdbCon);
    releaseSignal(tSignal);
    DBUG_RETURN(4);
  }//if
  tSignal->setData(tNdbCon->ptr2int(), 1);
//************************************************
// Set connection pointer as NdbTransaction object
//************************************************
  tSignal->setData(theMyRef, 2);        // Set my block reference
  tNdbCon->Status(NdbTransaction::Connecting); // Set status to connecting
  Uint32 nodeSequence;
  { // send and receive signal
    // The guard object lives for exactly this scope.
    Guard guard(tp->theMutexPtr);
    nodeSequence = tp->getNodeSequence(tNode);
    // Re-check liveness: the first check above was made before the guard
    // was taken.
    bool node_is_alive = tp->get_node_alive(tNode);
    if (node_is_alive) {
      DBUG_PRINT("info",("Sending signal to node %u", tNode));
      tReturnCode = tp->sendSignal(tSignal, tNode);
      releaseSignal(tSignal);
      if (tReturnCode != -1) {
        theImpl->theWaiter.m_node = tNode;
        theImpl->theWaiter.m_state = WAIT_TC_SEIZE;
        tReturnCode = receiveResponse();
      }//if
    } else {
      releaseSignal(tSignal);
      tReturnCode = -1;
    }//if
  }
  if ((tReturnCode == 0) && (tNdbCon->Status() == NdbTransaction::Connected)) {
    //************************************************
    // Send and receive was successful
    //************************************************
    NdbTransaction* tPrevFirst = theConnectionArray[tNode];
    tNdbCon->setConnectedNodeId(tNode, nodeSequence);

    tNdbCon->setMyBlockReference(theMyRef);
    theConnectionArray[tNode] = tNdbCon;
    tNdbCon->theNext = tPrevFirst;
    DBUG_RETURN(1);
  } else {
//****************************************************************************
// Unsuccessful connect is indicated by 3.
//****************************************************************************
    // Report the status BEFORE handing the object back: once released the
    // object no longer belongs to this attempt and must not be inspected.
    DBUG_PRINT("info",
               ("unsuccessful connect tReturnCode %d, tNdbCon->Status() %d",
                tReturnCode, tNdbCon->Status()));
    releaseNdbCon(tNdbCon);
    if (theError.code == 299)
    {
      // single user mode so no need to retry with other node
      DBUG_RETURN(-1);
    }
    DBUG_RETURN(3);
  }//if
}//Ndb::NDB_connect()

/****************************************************************************
NdbTransaction* getConnectedNdbTransaction(Uint32 nodeId);

Return Value:   The first connection in theConnectionArray[nodeId], unlinked
                from that list; NULL if the list is empty.
****************************************************************************/
NdbTransaction *
Ndb::getConnectedNdbTransaction(Uint32 nodeId){
  NdbTransaction* next = theConnectionArray[nodeId];
  if (next == NULL) {
    // Nothing to pop; avoid dereferencing an empty list head.
    return NULL;
  }
  theConnectionArray[nodeId] = next->theNext;
  next->theNext = NULL;

  return next;
}//Ndb::getConnectedNdbTransaction()

/*****************************************************************************
void doDisconnect();

Remark:        Disconnect all connections to the database: every connection
               chained in theConnectionArray for each known DB node, and
               every connection in theTransactionList, is passed to
               releaseConnectToNdb().
*****************************************************************************/
void
Ndb::doDisconnect()
{
  NdbTransaction* tNdbCon;
  CHECK_STATUS_MACRO_VOID;
  /* DBUG_ENTER must be after CHECK_STATUS_MACRO_VOID because of 'return' */
  DBUG_ENTER("Ndb::doDisconnect");

  Uint32 tNoOfDbNodes = theImpl->theNoOfDBnodes;
  Uint8 *theDBnodes= theImpl->theDBnodes;
  DBUG_PRINT("info", ("theNoOfDBnodes=%d", tNoOfDbNodes));
  UintR i;
  for (i = 0; i < tNoOfDbNodes; i++) {
    Uint32 tNode = theDBnodes[i];
    tNdbCon = theConnectionArray[tNode];
    while (tNdbCon != NULL) {
      // Fetch the successor before releasing the current element.
      NdbTransaction* tmpNdbCon = tNdbCon;
      tNdbCon = tNdbCon->theNext;
      releaseConnectToNdb(tmpNdbCon);
    }//while
  }//for
  tNdbCon = theTransactionList;
  while (tNdbCon != NULL) {
    NdbTransaction* tmpNdbCon = tNdbCon;
    tNdbCon = tNdbCon->theNext;
    releaseConnectToNdb(tmpNdbCon);
  }//while
  DBUG_VOID_RETURN;
}//Ndb::doDisconnect()

/*****************************************************************************
int waitUntilReady(int timeout);

Return Value:   Returns 0 if the Ndb is ready within timeout seconds.
                Returns -1 otherwise.
Remark: Waits until a node has status != 0 *****************************************************************************/ int Ndb::waitUntilReady(int timeout) { DBUG_ENTER("Ndb::waitUntilReady"); int secondsCounter = 0; int milliCounter = 0; if (theInitState != Initialised) { // Ndb::init is not called theError.code = 4256; DBUG_RETURN(-1); } while (theNode == 0) { if (secondsCounter >= timeout) { theError.code = 4269; DBUG_RETURN(-1); } NdbSleep_MilliSleep(100); milliCounter += 100; if (milliCounter >= 1000) { secondsCounter++; milliCounter = 0; }//if } if (theImpl->m_ndb_cluster_connection.wait_until_ready (timeout-secondsCounter,30) < 0) { theError.code = 4009; DBUG_RETURN(-1); } DBUG_RETURN(0); } /***************************************************************************** NdbTransaction* startTransaction(); Return Value: Returns a pointer to a connection object. Return NULL otherwise. Remark: Start transaction. Synchronous. *****************************************************************************/ NdbTransaction* Ndb::startTransaction(const NdbDictionary::Table *table, const char * keyData, Uint32 keyLen) { DBUG_ENTER("Ndb::startTransaction"); if (theInitState == Initialised) { theError.code = 0; checkFailedNode(); /** * If the user supplied key data * We will make a qualified quess to which node is the primary for the * the fragment and contact that node */ Uint32 nodeId; NdbTableImpl* impl; if(table != 0 && keyData != 0 && (impl= &NdbTableImpl::getImpl(*table))) { Uint32 hashValue; { Uint32 buf[4]; if((UintPtr(keyData) & 7) == 0 && (keyLen & 3) == 0) { md5_hash(buf, (const Uint64*)keyData, keyLen >> 2); } else { Uint64 tmp[1000]; tmp[keyLen/8] = 0; memcpy(tmp, keyData, keyLen); md5_hash(buf, tmp, (keyLen+3) >> 2); } hashValue= buf[1]; } const Uint16 *nodes; Uint32 cnt= impl->get_nodes(hashValue, &nodes); if(cnt) nodeId= nodes[0]; else nodeId= 0; } else { nodeId = 0; }//if { NdbTransaction *trans= startTransactionLocal(0, nodeId); 
DBUG_PRINT("exit",("start trans: 0x%lx transid: 0x%lx", (long) trans, (long) (trans ? trans->getTransactionId() : 0))); DBUG_RETURN(trans); } } else { DBUG_RETURN(NULL); }//if }//Ndb::startTransaction() /***************************************************************************** NdbTransaction* hupp(NdbTransaction* pBuddyTrans); Return Value: Returns a pointer to a connection object. Connected to the same node as pBuddyTrans and also using the same transction id Remark: Start transaction. Synchronous. *****************************************************************************/ NdbTransaction* Ndb::hupp(NdbTransaction* pBuddyTrans) { DBUG_ENTER("Ndb::hupp"); DBUG_PRINT("enter", ("trans: 0x%lx", (long) pBuddyTrans)); Uint32 aPriority = 0; if (pBuddyTrans == NULL){ DBUG_RETURN(startTransaction()); } if (theInitState == Initialised) { theError.code = 0; checkFailedNode(); Uint32 nodeId = pBuddyTrans->getConnectedNodeId(); NdbTransaction* pCon = startTransactionLocal(aPriority, nodeId); if(pCon == NULL) DBUG_RETURN(NULL); if (pCon->getConnectedNodeId() != nodeId){ // We could not get a connection to the desired node // release the connection and return NULL closeTransaction(pCon); theError.code = 4006; DBUG_RETURN(NULL); } pCon->setTransactionId(pBuddyTrans->getTransactionId()); pCon->setBuddyConPtr((Uint32)pBuddyTrans->getTC_ConnectPtr()); DBUG_PRINT("exit", ("hupp trans: 0x%lx transid: 0x%lx", (long) pCon, (long) (pCon ? 
pCon->getTransactionId() : 0))); DBUG_RETURN(pCon); } else { DBUG_RETURN(NULL); }//if }//Ndb::hupp() NdbTransaction* Ndb::startTransactionLocal(Uint32 aPriority, Uint32 nodeId) { #ifdef VM_TRACE char buf[255]; const char* val = NdbEnv_GetEnv("NDB_TRANSACTION_NODE_ID", buf, 255); if(val != 0){ nodeId = atoi(val); } #endif DBUG_ENTER("Ndb::startTransactionLocal"); DBUG_PRINT("enter", ("nodeid: %d", nodeId)); if(unlikely(theRemainingStartTransactions == 0)) { theError.code = 4006; DBUG_RETURN(0); } NdbTransaction* tConnection; Uint64 tFirstTransId = theFirstTransId; tConnection = doConnect(nodeId); if (tConnection == NULL) { DBUG_RETURN(NULL); }//if theRemainingStartTransactions--; NdbTransaction* tConNext = theTransactionList; if (tConnection->init()) { theError.code = tConnection->theError.code; DBUG_RETURN(NULL); } theTransactionList = tConnection; // into a transaction list. tConnection->next(tConNext); // Add the active connection object tConnection->setTransactionId(tFirstTransId); tConnection->thePriority = aPriority; if ((tFirstTransId & 0xFFFFFFFF) == 0xFFFFFFFF) { //--------------------------------------------------- // Transaction id rolling round. We will start from // consecutive identity 0 again. //--------------------------------------------------- theFirstTransId = ((tFirstTransId >> 32) << 32); } else { theFirstTransId = tFirstTransId + 1; }//if #ifdef VM_TRACE if (tConnection->theListState != NdbTransaction::NotInList) { printState("startTransactionLocal %x", tConnection); abort(); } #endif DBUG_RETURN(tConnection); }//Ndb::startTransactionLocal() /***************************************************************************** void closeTransaction(NdbTransaction* aConnection); Parameters: aConnection: the connection used in the transaction. Remark: Close transaction by releasing the connection and all operations. 
*****************************************************************************/ void Ndb::closeTransaction(NdbTransaction* aConnection) { DBUG_ENTER("Ndb::closeTransaction"); NdbTransaction* tCon; NdbTransaction* tPreviousCon; if (aConnection == NULL) { //----------------------------------------------------- // closeTransaction called on NULL pointer, destructive // application behaviour. //----------------------------------------------------- #ifdef VM_TRACE printf("NULL into closeTransaction\n"); #endif DBUG_VOID_RETURN; }//if CHECK_STATUS_MACRO_VOID; tCon = theTransactionList; theRemainingStartTransactions++; DBUG_PRINT("info",("close trans: 0x%lx transid: 0x%lx", (long) aConnection, (long) aConnection->getTransactionId())); DBUG_PRINT("info",("magic number: 0x%x TCConPtr: 0x%x theMyRef: 0x%x 0x%x", aConnection->theMagicNumber, aConnection->theTCConPtr, aConnection->theMyRef, getReference())); if (aConnection == tCon) { // Remove the active connection object theTransactionList = tCon->next(); // from the transaction list. } else { while (aConnection != tCon) { if (tCon == NULL) { //----------------------------------------------------- // closeTransaction called on non-existing transaction //----------------------------------------------------- if(aConnection->theError.code == 4008){ /** * When a SCAN timed-out, returning the NdbTransaction leads * to reuse. And TC crashes when the API tries to reuse it to * something else... */ #ifdef VM_TRACE printf("Scan timeout:ed NdbTransaction-> " "not returning it-> memory leak\n"); #endif DBUG_VOID_RETURN; } #ifdef VM_TRACE printf("Non-existing transaction into closeTransaction\n"); abort(); #endif DBUG_VOID_RETURN; }//if tPreviousCon = tCon; tCon = tCon->next(); }//while tPreviousCon->next(tCon->next()); }//if aConnection->release(); if(aConnection->theError.code == 4008){ /** * Something timed-out, returning the NdbTransaction leads * to reuse. And TC crashes when the API tries to reuse it to * something else... 
*/ #ifdef VM_TRACE printf("Con timeout:ed NdbTransaction-> not returning it-> memory leak\n"); #endif DBUG_VOID_RETURN; } if (aConnection->theReleaseOnClose == false) { /** * Put it back in idle list for that node */ Uint32 nodeId = aConnection->getConnectedNodeId(); aConnection->theNext = theConnectionArray[nodeId]; theConnectionArray[nodeId] = aConnection; DBUG_VOID_RETURN; } else { aConnection->theReleaseOnClose = false; releaseNdbCon(aConnection); }//if DBUG_VOID_RETURN; }//Ndb::closeTransaction() /***************************************************************************** int* NdbTamper(int aAction, int aNode); Parameters: aAction Specifies what action to be taken 1: Lock global checkpointing Can only be sent to master DIH, Parameter aNode ignored. 2: UnLock global checkpointing Can only be sent to master DIH, Parameter aNode ignored. 3: Crash node aNode Specifies which node the action will be taken -1: Master DIH 0-16: Nodnumber Return Value: -1 Error . Remark: Sends a signal to DIH. *****************************************************************************/ int Ndb::NdbTamper(TamperType aAction, int aNode) { NdbTransaction* tNdbConn; NdbApiSignal tSignal(theMyRef); int tNode; int tAction; int ret_code; #ifdef CUSTOMER_RELEASE return -1; #else CHECK_STATUS_MACRO; checkFailedNode(); theRestartGCI = 0; switch (aAction) { // Translate enum to integer. This is done because the SCI layer // expects integers. 
case LockGlbChp: tAction = 1; break; case UnlockGlbChp: tAction = 2; break; case CrashNode: tAction = 3; break; case ReadRestartGCI: tAction = 4; break; default: theError.code = 4102; return -1; } tNdbConn = getNdbCon(); // Get free connection object if (tNdbConn == NULL) { theError.code = 4000; return -1; } tSignal.setSignal(GSN_DIHNDBTAMPER); tSignal.setData (tAction, 1); tSignal.setData(tNdbConn->ptr2int(),2); tSignal.setData(theMyRef,3); // Set return block reference tNdbConn->Status(NdbTransaction::Connecting); // Set status to connecting TransporterFacade *tp = TransporterFacade::instance(); if (tAction == 3) { tp->lock_mutex(); tp->sendSignal(&tSignal, aNode); tp->unlock_mutex(); releaseNdbCon(tNdbConn); } else if ( (tAction == 2) || (tAction == 1) ) { tp->lock_mutex(); tNode = tp->get_an_alive_node(); if (tNode == 0) { theError.code = 4002; releaseNdbCon(tNdbConn); return -1; }//if ret_code = tp->sendSignal(&tSignal,aNode); tp->unlock_mutex(); releaseNdbCon(tNdbConn); return ret_code; } else { do { tp->lock_mutex(); // Start protected area tNode = tp->get_an_alive_node(); tp->unlock_mutex(); // End protected area if (tNode == 0) { theError.code = 4009; releaseNdbCon(tNdbConn); return -1; }//if ret_code = sendRecSignal(tNode, WAIT_NDB_TAMPER, &tSignal, 0); if (ret_code == 0) { if (tNdbConn->Status() != NdbTransaction::Connected) { theRestartGCI = 0; }//if releaseNdbCon(tNdbConn); return theRestartGCI; } else if ((ret_code == -5) || (ret_code == -2)) { TRACE_DEBUG("Continue DIHNDBTAMPER when node failed/stopping"); } else { return -1; }//if } while (1); } return 0; #endif } #if 0 /**************************************************************************** NdbSchemaCon* startSchemaTransaction(); Return Value: Returns a pointer to a schema connection object. Return NULL otherwise. Remark: Start schema transaction. Synchronous. 
****************************************************************************/ NdbSchemaCon* Ndb::startSchemaTransaction() { NdbSchemaCon* tSchemaCon; if (theSchemaConToNdbList != NULL) { theError.code = 4321; return NULL; }//if tSchemaCon = new NdbSchemaCon(this); if (tSchemaCon == NULL) { theError.code = 4000; return NULL; }//if theSchemaConToNdbList = tSchemaCon; return tSchemaCon; } /***************************************************************************** void closeSchemaTransaction(NdbSchemaCon* aSchemaCon); Parameters: aSchemaCon: the schemacon used in the transaction. Remark: Close transaction by releasing the schemacon and all schemaop. *****************************************************************************/ void Ndb::closeSchemaTransaction(NdbSchemaCon* aSchemaCon) { if (theSchemaConToNdbList != aSchemaCon) { abort(); return; }//if aSchemaCon->release(); delete aSchemaCon; theSchemaConToNdbList = NULL; return; }//Ndb::closeSchemaTransaction() #endif /***************************************************************************** void RestartGCI(int aRestartGCI); Remark: Set theRestartGCI on the NDB object *****************************************************************************/ void Ndb::RestartGCI(int aRestartGCI) { theRestartGCI = aRestartGCI; } /**************************************************************************** int getBlockNumber(void); Remark: ****************************************************************************/ int Ndb::getBlockNumber() { return theNdbBlockNumber; } NdbDictionary::Dictionary * Ndb::getDictionary() const { return theDictionary; } /**************************************************************************** int getNodeId(); Remark: ****************************************************************************/ int Ndb::getNodeId() { return theNode; } /**************************************************************************** Uint64 getAutoIncrementValue( const char* aTableName, Uint64 & tupleId, Uint32 
cacheSize, Uint64 step, Uint64 start); Parameters: aTableName (IN) : The table name. autoValue (OUT) : Returns new autoincrement value cacheSize (IN) : Prefetch this many values step (IN) : Specifies the step between the autoincrement values. start (IN) : Start value for first value Remark: Returns a new autoincrement value to the application. The autoincrement values can be increased by steps (default 1) and a number of values can be prefetched by specifying cacheSize (default 10). ****************************************************************************/ int Ndb::getAutoIncrementValue(const char* aTableName, Uint64 & autoValue, Uint32 cacheSize, Uint64 step, Uint64 start) { DBUG_ENTER("Ndb::getAutoIncrementValue"); BaseString internal_tabname(internalize_table_name(aTableName)); Ndb_local_table_info *info= theDictionary->get_local_table_info(internal_tabname, false); if (info == 0) { theError.code = theDictionary->getNdbError().code; DBUG_RETURN(-1); } DBUG_PRINT("info", ("step %lu", (ulong) step)); if (getTupleIdFromNdb(info, autoValue, cacheSize, step, start) == -1) DBUG_RETURN(-1); DBUG_PRINT("info", ("value %lu", (ulong) autoValue)); DBUG_RETURN(0); } int Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable, Uint64 & autoValue, Uint32 cacheSize, Uint64 step, Uint64 start) { DBUG_ENTER("Ndb::getAutoIncrementValue"); assert(aTable != 0); const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable); const BaseString& internal_tabname = table->m_internalName; Ndb_local_table_info *info= theDictionary->get_local_table_info(internal_tabname, false); if (info == 0) { theError.code = theDictionary->getNdbError().code; DBUG_RETURN(-1); } DBUG_PRINT("info", ("step %lu", (ulong) step)); if (getTupleIdFromNdb(info, autoValue, cacheSize, step, start) == -1) DBUG_RETURN(-1); DBUG_PRINT("info", ("value %lu", (ulong) autoValue)); DBUG_RETURN(0); } int Ndb::getTupleIdFromNdb(Ndb_local_table_info* info, Uint64 & tupleId, Uint32 cacheSize, Uint64 step, Uint64 
start) { /* Returns a new TupleId to the application. The TupleId comes from SYSTAB_0 where SYSKEY_0 = TableId. It is initialized to (TableId << 48) + 1 in NdbcntrMain.cpp. In most cases step= start= 1, in which case we get: 1,2,3,4,5,... If step=10 and start=5 and first number is 1, we get: 5,15,25,35,... */ DBUG_ENTER("Ndb::getTupleIdFromNdb"); DBUG_PRINT("info", ("Step %lu (%lu,%lu)", (ulong) step, (ulong) info->m_first_tuple_id, (ulong) info->m_last_tuple_id)); /* Check if the next value can be taken from the pre-fetched sequence. */ if (info->m_first_tuple_id != info->m_last_tuple_id && info->m_first_tuple_id + step <= info->m_last_tuple_id) { assert(info->m_first_tuple_id < info->m_last_tuple_id); info->m_first_tuple_id += step; tupleId = info->m_first_tuple_id; DBUG_PRINT("info", ("Next cached value %lu", (ulong) tupleId)); } else { /* If start value is greater than step it is ignored */ Uint64 offset = (start > step) ? 1 : start; /* Pre-fetch a number of values depending on cacheSize */ if (cacheSize == 0) cacheSize = 1; DBUG_PRINT("info", ("Reading %u values from database", (uint)cacheSize)); /* * reserve next cacheSize entries in db. adds cacheSize to NEXTID * and returns first tupleId in the new range. If tupleId's are * incremented in steps then multiply the cacheSize with step size. */ Uint64 opValue = cacheSize * step; if (opTupleIdOnNdb(info, opValue, 0) == -1) DBUG_RETURN(-1); DBUG_PRINT("info", ("Next value fetched from database %lu", (ulong) opValue)); DBUG_PRINT("info", ("Increasing %lu by offset %lu, increment is %lu", (ulong) (ulong) opValue, (ulong) offset, (ulong) step)); Uint64 current, next; Uint64 div = ((Uint64) (opValue + step - offset)) / step; next = div * step + offset; current = (next < step) ? next : next - step; tupleId = (opValue <= current) ? 
current : next; DBUG_PRINT("info", ("Returning %lu", (ulong) tupleId)); info->m_first_tuple_id = tupleId; } DBUG_RETURN(0); } int Ndb::readAutoIncrementValue(const char* aTableName, Uint64 & tupleId) { DBUG_ENTER("Ndb::readAutoIncrementValue"); BaseString internal_tabname(internalize_table_name(aTableName)); Ndb_local_table_info *info= theDictionary->get_local_table_info(internal_tabname, false); if (info == 0) { theError.code = theDictionary->getNdbError().code; DBUG_RETURN(-1); } if (readTupleIdFromNdb(info, tupleId) == -1) DBUG_RETURN(-1); DBUG_PRINT("info", ("value %lu", (ulong)tupleId)); DBUG_RETURN(0); } int Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable, Uint64 & tupleId) { DBUG_ENTER("Ndb::readAutoIncrementValue"); assert(aTable != 0); const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable); const BaseString& internal_tabname = table->m_internalName; Ndb_local_table_info *info= theDictionary->get_local_table_info(internal_tabname, false); if (info == 0) { theError.code = theDictionary->getNdbError().code; DBUG_RETURN(-1); } if (readTupleIdFromNdb(info, tupleId) == -1) DBUG_RETURN(-1); DBUG_PRINT("info", ("value %lu", (ulong)tupleId)); DBUG_RETURN(0); } int Ndb::readTupleIdFromNdb(Ndb_local_table_info* info, Uint64 & tupleId) { DBUG_ENTER("Ndb::readTupleIdFromNdb"); if (info->m_first_tuple_id != info->m_last_tuple_id) { assert(info->m_first_tuple_id < info->m_last_tuple_id); tupleId = info->m_first_tuple_id + 1; } else { /* * peek at NEXTID. does not reserve it so the value is valid * only if no other transactions are allowed. 
*/ Uint64 opValue = 0; if (opTupleIdOnNdb(info, opValue, 3) == -1) DBUG_RETURN(-1); tupleId = opValue; } DBUG_RETURN(0); } int Ndb::setAutoIncrementValue(const char* aTableName, Uint64 tupleId, bool increase) { DBUG_ENTER("Ndb::setAutoIncrementValue"); BaseString internal_tabname(internalize_table_name(aTableName)); Ndb_local_table_info *info= theDictionary->get_local_table_info(internal_tabname, false); if (info == 0) { theError.code = theDictionary->getNdbError().code; DBUG_RETURN(-1); } if (setTupleIdInNdb(info, tupleId, increase) == -1) DBUG_RETURN(-1); DBUG_RETURN(0); } int Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable, Uint64 tupleId, bool increase) { DBUG_ENTER("Ndb::setAutoIncrementValue"); assert(aTable != 0); const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable); const BaseString& internal_tabname = table->m_internalName; Ndb_local_table_info *info= theDictionary->get_local_table_info(internal_tabname, false); if (info == 0) { theError.code = theDictionary->getNdbError().code; DBUG_RETURN(-1); } if (setTupleIdInNdb(info, tupleId, increase) == -1) DBUG_RETURN(-1); DBUG_RETURN(0); } int Ndb::setTupleIdInNdb(Ndb_local_table_info* info, Uint64 tupleId, bool increase) { DBUG_ENTER("Ndb::setTupleIdInNdb"); if (increase) { if (info->m_first_tuple_id != info->m_last_tuple_id) { assert(info->m_first_tuple_id < info->m_last_tuple_id); if (tupleId <= info->m_first_tuple_id + 1) DBUG_RETURN(0); if (tupleId <= info->m_last_tuple_id) { info->m_first_tuple_id = tupleId - 1; DBUG_PRINT("info", ("Setting next auto increment cached value to %lu", (ulong)tupleId)); DBUG_RETURN(0); } } /* * if tupleId <= NEXTID, do nothing. otherwise update NEXTID to * tupleId and set cached range to first = last = tupleId - 1. */ if (opTupleIdOnNdb(info, tupleId, 2) == -1) DBUG_RETURN(-1); } else { /* * update NEXTID to given value. reset cached range. 
*/ if (opTupleIdOnNdb(info, tupleId, 1) == -1) DBUG_RETURN(-1); } DBUG_RETURN(0); } int Ndb::opTupleIdOnNdb(Ndb_local_table_info* info, Uint64 & opValue, Uint32 op) { DBUG_ENTER("Ndb::opTupleIdOnNdb"); Uint32 aTableId = info->m_table_impl->m_tableId; DBUG_PRINT("enter", ("table: %u value: %lu op: %u", aTableId, (ulong) opValue, op)); NdbTransaction* tConnection; NdbOperation* tOperation= 0; // Compiler warning if not initialized Uint64 tValue; NdbRecAttr* tRecAttrResult; NdbError savedError; CHECK_STATUS_MACRO_ZERO; BaseString currentDb(getDatabaseName()); BaseString currentSchema(getDatabaseSchemaName()); setDatabaseName("sys"); setDatabaseSchemaName("def"); tConnection = this->startTransaction(); if (tConnection == NULL) goto error_return; if (usingFullyQualifiedNames()) tOperation = tConnection->getNdbOperation("SYSTAB_0"); else tOperation = tConnection->getNdbOperation("sys/def/SYSTAB_0"); if (tOperation == NULL) goto error_handler; switch (op) { case 0: tOperation->interpretedUpdateTuple(); tOperation->equal("SYSKEY_0", aTableId ); tOperation->incValue("NEXTID", opValue); tRecAttrResult = tOperation->getValue("NEXTID"); if (tConnection->execute( Commit ) == -1 ) goto error_handler; tValue = tRecAttrResult->u_64_value(); info->m_first_tuple_id = tValue - opValue; info->m_last_tuple_id = tValue - 1; opValue = info->m_first_tuple_id; // out break; case 1: // create on first use tOperation->writeTuple(); tOperation->equal("SYSKEY_0", aTableId ); tOperation->setValue("NEXTID", opValue); if (tConnection->execute( Commit ) == -1 ) goto error_handler; info->m_first_tuple_id = ~(Uint64)0; info->m_last_tuple_id = ~(Uint64)0; break; case 2: tOperation->interpretedUpdateTuple(); tOperation->equal("SYSKEY_0", aTableId ); tOperation->load_const_u64(1, opValue); tOperation->read_attr("NEXTID", 2); // compare NEXTID >= opValue tOperation->branch_le(2, 1, 0); tOperation->write_attr("NEXTID", 1); tOperation->interpret_exit_ok(); tOperation->def_label(0); 
tOperation->interpret_exit_nok(9999); if (tConnection->execute( Commit ) == -1) { if (tConnection->theError.code != 9999) goto error_handler; } else { DBUG_PRINT("info", ("Setting next auto increment value (db) to %lu", (ulong)opValue)); info->m_first_tuple_id = info->m_last_tuple_id = opValue - 1; } break; case 3: tOperation->readTuple(); tOperation->equal("SYSKEY_0", aTableId ); tRecAttrResult = tOperation->getValue("NEXTID"); if (tConnection->execute( Commit ) == -1 ) goto error_handler; opValue = tRecAttrResult->u_64_value(); // out break; default: goto error_handler; } this->closeTransaction(tConnection); // Restore current name space setDatabaseName(currentDb.c_str()); setDatabaseSchemaName(currentSchema.c_str()); DBUG_RETURN(0); error_handler: theError.code = tConnection->theError.code; savedError = theError; this->closeTransaction(tConnection); theError = savedError; error_return: // Restore current name space setDatabaseName(currentDb.c_str()); setDatabaseSchemaName(currentSchema.c_str()); DBUG_PRINT("error", ("ndb=%d con=%d op=%d", theError.code, tConnection ? tConnection->theError.code : -1, tOperation ? 
tOperation->theError.code : -1)); DBUG_RETURN(-1); } Uint32 convertEndian(Uint32 Data) { #ifdef WORDS_BIGENDIAN Uint32 t1, t2, t3, t4; t4 = (Data >> 24) & 255; t3 = (Data >> 16) & 255; t4 = t4 + (t3 << 8); t2 = (Data >> 8) & 255; t4 = t4 + (t2 << 16); t1 = Data & 255; t4 = t4 + (t1 << 24); return t4; #else return Data; #endif } const char * Ndb::getCatalogName() const { return theImpl->m_dbname.c_str(); } int Ndb::setCatalogName(const char * a_catalog_name) { if (a_catalog_name) { if (!theImpl->m_dbname.assign(a_catalog_name) || theImpl->update_prefix()) { theError.code = 4000; return -1; } } return 0; } const char * Ndb::getSchemaName() const { return theImpl->m_schemaname.c_str(); } int Ndb::setSchemaName(const char * a_schema_name) { if (a_schema_name) { if (!theImpl->m_schemaname.assign(a_schema_name) || theImpl->update_prefix()) { theError.code = 4000; return -1; } } return 0; } /* Deprecated functions */ const char * Ndb::getDatabaseName() const { return getCatalogName(); } int Ndb::setDatabaseName(const char * a_catalog_name) { return setCatalogName(a_catalog_name); } const char * Ndb::getDatabaseSchemaName() const { return getSchemaName(); } int Ndb::setDatabaseSchemaName(const char * a_schema_name) { return setSchemaName(a_schema_name); } bool Ndb::usingFullyQualifiedNames() { return fullyQualifiedNames; } const char * Ndb::externalizeTableName(const char * internalTableName, bool fullyQualifiedNames) { if (fullyQualifiedNames) { register const char *ptr = internalTableName; // Skip database name while (*ptr && *ptr++ != table_name_separator); // Skip schema name while (*ptr && *ptr++ != table_name_separator); return ptr; } else return internalTableName; } const char * Ndb::externalizeTableName(const char * internalTableName) { return externalizeTableName(internalTableName, usingFullyQualifiedNames()); } const char * Ndb::externalizeIndexName(const char * internalIndexName, bool fullyQualifiedNames) { if (fullyQualifiedNames) { register const char *ptr = 
internalIndexName; // Scan name from the end while (*ptr++); ptr--; // strend while (ptr >= internalIndexName && *ptr != table_name_separator) ptr--; return ptr + 1; } else return internalIndexName; } const char * Ndb::externalizeIndexName(const char * internalIndexName) { return externalizeIndexName(internalIndexName, usingFullyQualifiedNames()); } const BaseString Ndb::internalize_table_name(const char *external_name) const { BaseString ret; DBUG_ENTER("internalize_table_name"); DBUG_PRINT("enter", ("external_name: %s", external_name)); if (fullyQualifiedNames) { /* Internal table name format <db>/<schema>/<table> <db>/<schema> is already available in m_prefix so just concat the two strings */ ret.assfmt("%s%s", theImpl->m_prefix.c_str(), external_name); } else ret.assign(external_name); DBUG_PRINT("exit", ("internal_name: %s", ret.c_str())); DBUG_RETURN(ret); } const BaseString Ndb::internalize_index_name(const NdbTableImpl * table, const char * external_name) const { BaseString ret; DBUG_ENTER("internalize_index_name"); DBUG_PRINT("enter", ("external_name: %s, table_id: %d", external_name, table ? 
table->m_tableId : ~0)); if (!table) { DBUG_PRINT("error", ("!table")); DBUG_RETURN(ret); } if (fullyQualifiedNames) { /* Internal index name format <db>/<schema>/<tabid>/<table> */ ret.assfmt("%s%d%c%s", theImpl->m_prefix.c_str(), table->m_tableId, table_name_separator, external_name); } else ret.assign(external_name); DBUG_PRINT("exit", ("internal_name: %s", ret.c_str())); DBUG_RETURN(ret); } const BaseString Ndb::getDatabaseFromInternalName(const char * internalName) { char * databaseName = new char[strlen(internalName) + 1]; if (databaseName == NULL) { errno = ENOMEM; return BaseString(NULL); } strcpy(databaseName, internalName); register char *ptr = databaseName; /* Scan name for the first table_name_separator */ while (*ptr && *ptr != table_name_separator) ptr++; *ptr = '\0'; BaseString ret = BaseString(databaseName); delete [] databaseName; return ret; } const BaseString Ndb::getSchemaFromInternalName(const char * internalName) { char * schemaName = new char[strlen(internalName)]; if (schemaName == NULL) { errno = ENOMEM; return BaseString(NULL); } register const char *ptr1 = internalName; /* Scan name for the second table_name_separator */ while (*ptr1 && *ptr1 != table_name_separator) ptr1++; strcpy(schemaName, ptr1 + 1); register char *ptr = schemaName; while (*ptr && *ptr != table_name_separator) ptr++; *ptr = '\0'; BaseString ret = BaseString(schemaName); delete [] schemaName; return ret; } #ifdef VM_TRACE #include <NdbMutex.h> extern NdbMutex *ndb_print_state_mutex; static bool checkdups(NdbTransaction** list, unsigned no) { for (unsigned i = 0; i < no; i++) for (unsigned j = i + 1; j < no; j++) if (list[i] == list[j]) return true; return false; } void Ndb::printState(const char* fmt, ...) 
{ char buf[200]; va_list ap; va_start(ap, fmt); vsprintf(buf, fmt, ap); va_end(ap); NdbMutex_Lock(ndb_print_state_mutex); bool dups = false; unsigned i; ndbout << buf << " ndb=" << hex << this << dec; #ifndef NDB_WIN32 ndbout << " thread=" << (int)pthread_self(); #endif ndbout << endl; for (unsigned n = 0; n < MAX_NDB_NODES; n++) { NdbTransaction* con = theConnectionArray[n]; if (con != 0) { ndbout << "conn " << n << ":" << endl; while (con != 0) { con->printState(); con = con->theNext; } } } ndbout << "prepared: " << theNoOfPreparedTransactions<< endl; if (checkdups(thePreparedTransactionsArray, theNoOfPreparedTransactions)) { ndbout << "!! DUPS !!" << endl; dups = true; } for (i = 0; i < theNoOfPreparedTransactions; i++) thePreparedTransactionsArray[i]->printState(); ndbout << "sent: " << theNoOfSentTransactions<< endl; if (checkdups(theSentTransactionsArray, theNoOfSentTransactions)) { ndbout << "!! DUPS !!" << endl; dups = true; } for (i = 0; i < theNoOfSentTransactions; i++) theSentTransactionsArray[i]->printState(); ndbout << "completed: " << theNoOfCompletedTransactions<< endl; if (checkdups(theCompletedTransactionsArray, theNoOfCompletedTransactions)) { ndbout << "!! DUPS !!" << endl; dups = true; } for (i = 0; i < theNoOfCompletedTransactions; i++) theCompletedTransactionsArray[i]->printState(); NdbMutex_Unlock(ndb_print_state_mutex); } #endif