/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */




/*****************************************************************************
Name:          Ndb.cpp
******************************************************************************/

#include <ndb_global.h>


#include "NdbApiSignal.hpp"
#include "NdbImpl.hpp"
#include <NdbOperation.hpp>
#include <NdbTransaction.hpp>
#include <NdbEventOperation.hpp>
#include <NdbRecAttr.hpp>
#include <md5_hash.hpp>
#include <NdbSleep.h>
#include <NdbOut.hpp>
#include <ndb_limits.h>
#include "API.hpp"
#include <NdbEnv.h>
#include <BaseString.hpp>

/****************************************************************************
NdbTransaction* doConnect(Uint32 tConNode);

Parameters:     tConNode: node to try first, or 0 for no preference.
Return Value:   The connection object handed out by
                getConnectedNdbTransaction() for the first node on which
                NDB_connect() returned 1 or 2.
                NULL if no node qualified; theError.code is then 4006 if some
                NDB_connect() call returned a value other than 0, 1 or 2, and
                4009 otherwise.
Remark:         After the preferred node, candidates come either from the
                cluster connection's node iterator (optimized node selection)
                or from a round robin walk over theImpl->theDBnodes.
****************************************************************************/
NdbTransaction* Ndb::doConnect(Uint32 tConNode)
{
  Uint32 candidate;
  Uint32 sawFailedAttempt = 0;  // 1 once NDB_connect() returned neither 0, 1 nor 2
  int connectResult = 0;

  DBUG_ENTER("Ndb::doConnect");

  // First choice: the node the caller asked for.
  if (tConNode != 0) {
    connectResult = NDB_connect(tConNode);
    if (connectResult == 1 || connectResult == 2) {
      DBUG_RETURN(getConnectedNdbTransaction(tConNode));
    }
    if (connectResult != 0) {
      sawFailedAttempt = 1;
    }
  }

  if (!theImpl->m_optimized_node_selection)
  {
    // Regular round robin, continuing from where the previous call stopped.
    const Uint32 nodeCount = theImpl->theNoOfDBnodes;
    Uint32 &rrIndex = theImpl->theCurrentConnectIndex;
    UintR attempts = 0;
    do {
      rrIndex++;
      if (rrIndex >= nodeCount) {
        rrIndex = 0;
      }
      attempts++;

      candidate = theImpl->theDBnodes[rrIndex];
      connectResult = NDB_connect(candidate);
      if (connectResult == 1 || connectResult == 2) {
        DBUG_RETURN(getConnectedNdbTransaction(candidate));
      }
      if (connectResult != 0) {
        sawFailedAttempt = 1;
      }
      DBUG_PRINT("info",("tried node %d TretCode %d", candidate, connectResult));
    } while (attempts < nodeCount);
  }
  else
  {
    // Let the cluster connection decide the order in which nodes are tried.
    Ndb_cluster_connection_node_iter &iter = theImpl->m_node_iter;
    theImpl->m_ndb_cluster_connection.init_get_next_node(iter);
    while ((candidate = theImpl->m_ndb_cluster_connection.get_next_node(iter)))
    {
      connectResult = NDB_connect(candidate);
      if (connectResult == 1 || connectResult == 2) {
        DBUG_RETURN(getConnectedNdbTransaction(candidate));
      }
      if (connectResult != 0) {
        sawFailedAttempt = 1;
      }
      DBUG_PRINT("info",("tried node %d, TretCode %d, error code %d, %s",
                         candidate, connectResult, getNdbError().code,
                         getNdbError().message));
    }
  }

  // No connection could be obtained. Report cluster failure (4009) if every
  // attempt came back with 0, otherwise connection failure (4006).
  if (sawFailedAttempt != 1) {
    theError.code = 4009;
  } else {
#ifdef VM_TRACE
    ndbout << "TretCode = " << connectResult << endl;
#endif
    theError.code = 4006;
  }
  DBUG_RETURN(NULL);
}

/****************************************************************************
int NDB_connect(Uint32 tNode);

Parameters:     tNode: node in which a transaction record is to be seized.
Return Value:   0  the node is reported as not alive.
                1  the seize succeeded; the new connection object is now
                   first in theConnectionArray[tNode].
                2  theConnectionArray[tNode] already holds a connection.
                3  sending TCSEIZEREQ or waiting for its response failed, or
                   the connection did not reach status Connected.
                4  no connection object or signal object could be set up.
Remark:         Performs a seize of a transaction record in DBTC in the
                specified node.
****************************************************************************/
int 
Ndb::NDB_connect(Uint32 tNode) 
{
  int tReturnCode;
  TransporterFacade *tp = TransporterFacade::instance();

  DBUG_ENTER("Ndb::NDB_connect");

  bool nodeAvail = tp->get_node_alive(tNode);
  if(nodeAvail == false){
    DBUG_RETURN(0);
  }
  
  // An idle connection to this node already exists; nothing to seize.
  NdbTransaction * tConArray = theConnectionArray[tNode];
  if (tConArray != NULL) {
    DBUG_RETURN(2);
  }
  
  NdbTransaction * tNdbCon = getNdbCon();       // Get free connection object.
  if (tNdbCon == NULL) {
    DBUG_RETURN(4);
  }//if
  NdbApiSignal* tSignal = getSignal();          // Get signal object
  if (tSignal == NULL) {
    releaseNdbCon(tNdbCon);
    DBUG_RETURN(4);
  }//if
  if (tSignal->setSignal(GSN_TCSEIZEREQ) == -1) {
    releaseNdbCon(tNdbCon);
    releaseSignal(tSignal);
    DBUG_RETURN(4);
  }//if
  // Word 1: connection pointer (the NdbTransaction object); word 2: my
  // block reference.
  tSignal->setData(tNdbCon->ptr2int(), 1);
  tSignal->setData(theMyRef, 2);
  tNdbCon->Status(NdbTransaction::Connecting); // Set status to connecting
  Uint32 nodeSequence;
  { // send and receive signal
    Guard guard(tp->theMutexPtr);
    nodeSequence = tp->getNodeSequence(tNode);
    // Re-check liveness now that the guard on the transporter mutex is in
    // scope.
    bool node_is_alive = tp->get_node_alive(tNode);
    if (node_is_alive) { 
      tReturnCode = tp->sendSignal(tSignal, tNode);  
      releaseSignal(tSignal); 
      if (tReturnCode != -1) {
        theImpl->theWaiter.m_node = tNode;  
        theImpl->theWaiter.m_state = WAIT_TC_SEIZE;  
        tReturnCode = receiveResponse(); 
      }//if
    } else {
      releaseSignal(tSignal);
      tReturnCode = -1;
    }//if
  }
  if ((tReturnCode == 0) && (tNdbCon->Status() == NdbTransaction::Connected)) {
    //************************************************
    // Send and receive was successful
    //************************************************
    NdbTransaction* tPrevFirst = theConnectionArray[tNode];
    tNdbCon->setConnectedNodeId(tNode, nodeSequence);
    
    tNdbCon->setMyBlockReference(theMyRef);
    theConnectionArray[tNode] = tNdbCon;
    tNdbCon->theNext = tPrevFirst;
    DBUG_RETURN(1);
  } else {
    // Trace the failure BEFORE the connection object is released: once
    // releaseNdbCon() has been called the object must not be read any more.
    DBUG_PRINT("info",
               ("unsuccessful connect tReturnCode %d, tNdbCon->Status() %d",
                tReturnCode, tNdbCon->Status()));
    releaseNdbCon(tNdbCon);
//****************************************************************************
// Unsuccessful connect is indicated by 3.
//****************************************************************************
    DBUG_RETURN(3);
  }//if
}//Ndb::NDB_connect()

/****************************************************************************
Detach and return the first connection object in theConnectionArray[nodeId].
The list head is dereferenced without a NULL check.
****************************************************************************/
NdbTransaction *
Ndb::getConnectedNdbTransaction(Uint32 nodeId){
  NdbTransaction* const head = theConnectionArray[nodeId];
  NdbTransaction* const remainder = head->theNext;

  theConnectionArray[nodeId] = remainder;
  head->theNext = NULL;
  return head;
}//Ndb::getConnectedNdbTransaction()

/*****************************************************************************
disconnect();

Remark:        Disconnect all connections to the database. 
*****************************************************************************/
void 
Ndb::doDisconnect()
{
  DBUG_ENTER("Ndb::doDisconnect");
  NdbTransaction* tNdbCon;
  CHECK_STATUS_MACRO_VOID;

  Uint32 tNoOfDbNodes = theImpl->theNoOfDBnodes;
  Uint8 *theDBnodes= theImpl->theDBnodes;
  DBUG_PRINT("info", ("theNoOfDBnodes=%d", tNoOfDbNodes));
  UintR i;
  for (i = 0; i < tNoOfDbNodes; i++) {
    Uint32 tNode = theDBnodes[i];
    tNdbCon = theConnectionArray[tNode];
    while (tNdbCon != NULL) {
      NdbTransaction* tmpNdbCon = tNdbCon;
      tNdbCon = tNdbCon->theNext;
      releaseConnectToNdb(tmpNdbCon);
    }//while
  }//for
  tNdbCon = theTransactionList;
  while (tNdbCon != NULL) {
    NdbTransaction* tmpNdbCon = tNdbCon;
    tNdbCon = tNdbCon->theNext;
    releaseConnectToNdb(tmpNdbCon);
  }//while
  DBUG_VOID_RETURN;
}//Ndb::disconnect()

/*****************************************************************************
int waitUntilReady(int timeout);

Parameters:     timeout: maximum number of seconds to wait.
Return Value:   Returns 0 if the Ndb is ready within timeout seconds.
                Returns -1 otherwise, with theError.code set to 4256 (init
                not done), 4269 (theNode still 0 at timeout) or 4009 (the
                cluster connection's wait_until_ready() failed).
Remark:         Polls theNode every 100 ms until it is non-zero, then passes
                the remaining seconds on to the cluster connection.
*****************************************************************************/ 
int
Ndb::waitUntilReady(int timeout)
{
  DBUG_ENTER("Ndb::waitUntilReady");

  if (theInitState != Initialised) {
    // Ndb::init is not called
    theError.code = 4256;
    DBUG_RETURN(-1);
  }

  int elapsedSeconds = 0;
  int elapsedMillis = 0;
  for (;;) {
    if (theNode != 0) {
      break;
    }
    if (elapsedSeconds >= timeout) {
      theError.code = 4269;
      DBUG_RETURN(-1);
    }
    NdbSleep_MilliSleep(100);
    elapsedMillis += 100;
    if (elapsedMillis >= 1000) {
      // A whole second has gone by.
      elapsedMillis = 0;
      elapsedSeconds++;
    }
  }

  const int remaining = timeout - elapsedSeconds;
  if (theImpl->m_ndb_cluster_connection.wait_until_ready(remaining, 30) < 0)
  {
    theError.code = 4009;
    DBUG_RETURN(-1);
  }

  DBUG_RETURN(0);
}

/*****************************************************************************
NdbTransaction* startTransaction(const NdbDictionary::Table *table,
                                 const char *keyData, Uint32 keyLen);

Parameters:     table:   table the key belongs to, or 0.
                keyData: key bytes used to pick a node, or 0.
                keyLen:  length of keyData in bytes.
Return Value:   Returns a pointer to a connection object.
                Return NULL otherwise (also when the Ndb object is not
                initialised).
Remark:         Start transaction. Synchronous.
                When table and keyData are given, the key is hashed and the
                first node reported by the table for that hash is used as the
                preferred node. A key that neither is suitably aligned nor
                fits the local copy buffer is not hashed; the transaction is
                then started without a preferred node.
*****************************************************************************/ 
NdbTransaction* 
Ndb::startTransaction(const NdbDictionary::Table *table,
		      const char * keyData, Uint32 keyLen)
{
  DBUG_ENTER("Ndb::startTransaction");

  if (theInitState == Initialised) {
    theError.code = 0;
    checkFailedNode();
    /**
     * If the user supplied key data
     * we will make a qualified guess at which node is the primary for
     * the fragment and contact that node
     */
    Uint32 nodeId = 0;
    if (table != 0 && keyData != 0)
    {
      NdbTableImpl* impl = &NdbTableImpl::getImpl(*table);
      Uint32 buf[4];
      bool haveHash = false;
      if ((UintPtr(keyData) & 7) == 0 && (keyLen & 3) == 0)
      {
        // Key is 8-byte aligned and a whole number of words: hash in place.
        md5_hash(buf, (const Uint64*)keyData, keyLen >> 2);
        haveHash = true;
      }
      else
      {
        Uint64 tmp[1000];
        // The copy below writes tmp[keyLen/8] and keyLen bytes; both stay
        // inside tmp only while keyLen < sizeof(tmp). Longer keys simply
        // lose the node hint instead of overrunning the stack buffer.
        if (keyLen < sizeof(tmp))
        {
          tmp[keyLen/8] = 0;            // zero the padding of the last word
          memcpy(tmp, keyData, keyLen);
          md5_hash(buf, tmp, (keyLen+3) >> 2);
          haveHash = true;
        }
      }
      if (haveHash)
      {
        const Uint32 hashValue = buf[1];
        const Uint16 *nodes;
        Uint32 cnt = impl->get_nodes(hashValue, &nodes);
        if (cnt)
          nodeId = nodes[0];
      }
    }

    {
      NdbTransaction *trans= startTransactionLocal(0, nodeId);
      DBUG_PRINT("exit",("start trans: 0x%x transid: 0x%llx",
			 trans, trans ? trans->getTransactionId() : 0));
      DBUG_RETURN(trans);
    }
  } else {
    DBUG_RETURN(NULL);
  }//if
}//Ndb::startTransaction()

/*****************************************************************************
NdbTransaction* hupp(NdbTransaction* pBuddyTrans);

Parameters:     pBuddyTrans: transaction to pair up with. NULL makes this
                call behave like startTransaction().
Return Value:   Returns a pointer to a connection object connected to the
                same node as pBuddyTrans and carrying the same transaction
                id. NULL if no connection to that node could be obtained
                (theError.code 4006 when a different node was returned) or if
                the Ndb object is not initialised.
Remark:         Start transaction. Synchronous.
*****************************************************************************/ 
NdbTransaction* 
Ndb::hupp(NdbTransaction* pBuddyTrans)
{
  DBUG_ENTER("Ndb::hupp");

  DBUG_PRINT("enter", ("trans: 0x%x",pBuddyTrans));

  if (pBuddyTrans == NULL) {
    DBUG_RETURN(startTransaction());
  }
  if (theInitState != Initialised) {
    DBUG_RETURN(NULL);
  }

  theError.code = 0;
  checkFailedNode();

  const Uint32 priority = 0;
  const Uint32 buddyNode = pBuddyTrans->getConnectedNodeId();
  NdbTransaction* sibling = startTransactionLocal(priority, buddyNode);
  if (sibling == NULL) {
    DBUG_RETURN(NULL);
  }

  if (sibling->getConnectedNodeId() != buddyNode) {
    // A connection was obtained, but not to the buddy's node: give it back
    // and report failure.
    closeTransaction(sibling);
    theError.code = 4006;
    DBUG_RETURN(NULL);
  }

  sibling->setTransactionId(pBuddyTrans->getTransactionId());
  sibling->setBuddyConPtr((Uint32)pBuddyTrans->getTC_ConnectPtr());
  DBUG_PRINT("exit", ("hupp trans: 0x%x transid: 0x%llx",
                      sibling, sibling ? sibling->getTransactionId() : 0));
  DBUG_RETURN(sibling);
}//Ndb::hupp()

/*****************************************************************************
NdbTransaction* startTransactionLocal(Uint32 aPriority, Uint32 nodeId);

Parameters:     aPriority: stored in the connection's thePriority.
                nodeId:    preferred node, passed on to doConnect().
Return Value:   The initialised connection object, now first in
                theTransactionList, or NULL if doConnect() failed.
Remark:         Assigns theFirstTransId to the transaction and advances it;
                when the low 32 bits are all ones they restart from 0 while
                the high 32 bits are kept.
                With VM_TRACE, the environment variable
                NDB_TRANSACTION_NODE_ID overrides nodeId.
*****************************************************************************/
NdbTransaction* 
Ndb::startTransactionLocal(Uint32 aPriority, Uint32 nodeId)
{
#ifdef VM_TRACE
  char envBuf[255];
  const char* forcedNode = NdbEnv_GetEnv("NDB_TRANSACTION_NODE_ID", envBuf, 255);
  if (forcedNode != 0) {
    nodeId = atoi(forcedNode);
  }
#endif

  DBUG_ENTER("Ndb::startTransactionLocal");
  DBUG_PRINT("enter", ("nodeid: %d", nodeId));

  const Uint64 transId = theFirstTransId;
  NdbTransaction* const con = doConnect(nodeId);
  if (con == NULL) {
    DBUG_RETURN(NULL);
  }

  // Initialise the connection and push it onto the front of the list of
  // active transactions.
  NdbTransaction* const previousHead = theTransactionList;
  con->init();
  theTransactionList = con;
  con->next(previousHead);
  con->setTransactionId(transId);
  con->thePriority = aPriority;

  const bool lowWordExhausted = ((transId & 0xFFFFFFFF) == 0xFFFFFFFF);
  if (!lowWordExhausted) {
    theFirstTransId = transId + 1;
  } else {
    // Transaction id rolling round: start from consecutive identity 0 again.
    theFirstTransId = ((transId >> 32) << 32);
  }
#ifdef VM_TRACE
  if (con->theListState != NdbTransaction::NotInList) {
    printState("startTransactionLocal %x", con);
    abort();
  }
#endif
  DBUG_RETURN(con);
}//Ndb::startTransactionLocal()

/*****************************************************************************
void closeTransaction(NdbTransaction* aConnection);

Parameters:     aConnection: the connection used in the transaction.
Remark:         Close transaction by releasing the connection and all
                operations. The connection is unlinked from
                theTransactionList, released, and then either pushed onto the
                idle list of its node or handed to releaseNdbCon() when
                theReleaseOnClose is set.
                A connection whose error code is 4008 is deliberately not
                returned for reuse.
*****************************************************************************/
void
Ndb::closeTransaction(NdbTransaction* aConnection)
{
  DBUG_ENTER("Ndb::closeTransaction");

  if (aConnection == NULL) {
    // closeTransaction called on NULL pointer, destructive application
    // behaviour.
#ifdef VM_TRACE
    printf("NULL into closeTransaction\n");
#endif
    DBUG_VOID_RETURN;
  }
  CHECK_STATUS_MACRO_VOID;

  NdbTransaction* walker = theTransactionList;

  DBUG_PRINT("info",("close trans: 0x%x transid: 0x%llx",
                     aConnection, aConnection->getTransactionId()));
  DBUG_PRINT("info",("magic number: 0x%x TCConPtr: 0x%x theMyRef: 0x%x 0x%x",
                     aConnection->theMagicNumber, aConnection->theTCConPtr,
                     aConnection->theMyRef, getReference()));

  if (walker == aConnection) {
    // Head of the list: just advance the head.
    theTransactionList = walker->next();
  } else {
    NdbTransaction* predecessor;
    for (; walker != aConnection; walker = walker->next()) {
      if (walker == NULL) {
        // closeTransaction called on non-existing transaction
        if (aConnection->theError.code == 4008) {
          /**
           * When a SCAN timed-out, returning the NdbTransaction leads
           * to reuse. And TC crashes when the API tries to reuse it to
           * something else...
           */
#ifdef VM_TRACE
          printf("Scan timeout:ed NdbTransaction-> "
                 "not returning it-> memory leak\n");
#endif
          DBUG_VOID_RETURN;
        }

#ifdef VM_TRACE
        printf("Non-existing transaction into closeTransaction\n");
        abort();
#endif
        DBUG_VOID_RETURN;
      }
      predecessor = walker;
    }
    // walker == aConnection here; splice it out.
    predecessor->next(walker->next());
  }

  aConnection->release();

  if (aConnection->theError.code == 4008) {
    /**
     * Something timed-out, returning the NdbTransaction leads
     * to reuse. And TC crashes when the API tries to reuse it to
     * something else...
     */
#ifdef VM_TRACE
    printf("Con timeout:ed NdbTransaction-> not returning it-> memory leak\n");
#endif
    DBUG_VOID_RETURN;
  }

  if (aConnection->theReleaseOnClose != false) {
    aConnection->theReleaseOnClose = false;
    releaseNdbCon(aConnection);
  } else {
    // Put it back in idle list for that node
    const Uint32 owner = aConnection->getConnectedNodeId();
    aConnection->theNext = theConnectionArray[owner];
    theConnectionArray[owner] = aConnection;
  }
  DBUG_VOID_RETURN;
}//Ndb::closeTransaction()

/*****************************************************************************
int NdbTamper(TamperType aAction, int aNode);

Parameters: aAction     Specifies what action to be taken
            LockGlbChp     (1): Lock global checkpointing
            UnlockGlbChp   (2): Unlock global checkpointing
            CrashNode      (3): Crash node
            ReadRestartGCI (4): Read the restart GCI

            aNode       Node the signal is sent to for actions 1, 2 and 3.

Return Value: -1 on error (theError.code is set to 4102 for an unknown
                 action, 4000 when no connection object is available, 4002 or
                 4009 when no node is alive).
              Actions 1 and 2: the return value of sendSignal().
              Action 3: 0.
              Action 4: theRestartGCI.
              Always -1 when built with CUSTOMER_RELEASE.

Remark:         Sends a signal to DIH.
*****************************************************************************/ 
int 
Ndb::NdbTamper(TamperType aAction, int aNode)
{
  NdbTransaction*	tNdbConn;
  NdbApiSignal		tSignal(theMyRef);
  int			tNode;
  int                   tAction;
  int			ret_code;

#ifdef CUSTOMER_RELEASE
  return -1;
#else
  CHECK_STATUS_MACRO;
  checkFailedNode();

  theRestartGCI = 0;
  switch (aAction) {
// Translate enum to integer. This is done because the SCI layer
// expects integers. 
     case LockGlbChp:
        tAction = 1;
        break;
     case UnlockGlbChp:
        tAction = 2;
        break;
     case CrashNode:
        tAction = 3;
        break;
     case ReadRestartGCI:
        tAction = 4;
        break;
     default:
        theError.code = 4102;
        return -1;
  }

  tNdbConn = getNdbCon();	// Get free connection object
  if (tNdbConn == NULL) {
    theError.code = 4000;
    return -1;
  }
  tSignal.setSignal(GSN_DIHNDBTAMPER);
  tSignal.setData (tAction, 1);
  tSignal.setData(tNdbConn->ptr2int(),2);
  tSignal.setData(theMyRef,3);		// Set return block reference
  tNdbConn->Status(NdbTransaction::Connecting); // Set status to connecting
  TransporterFacade *tp = TransporterFacade::instance();
  if (tAction == 3) {
    // Crash node: fire and forget.
    tp->lock_mutex();
    tp->sendSignal(&tSignal, aNode);
    tp->unlock_mutex();
    releaseNdbCon(tNdbConn);
  } else if ( (tAction == 2) || (tAction == 1) ) {
    tp->lock_mutex();
    tNode = tp->get_an_alive_node();
    if (tNode == 0) {
      // Must not leave with the transporter mutex still held.
      tp->unlock_mutex();
      theError.code = 4002;
      releaseNdbCon(tNdbConn);
      return -1;
    }//if
    // NOTE(review): the signal goes to aNode although an alive node was just
    // looked up in tNode -- confirm which one is intended.
    ret_code = tp->sendSignal(&tSignal,aNode);
    tp->unlock_mutex();
    releaseNdbCon(tNdbConn);
    return ret_code;
  } else {
    do {
      tp->lock_mutex();
      // Start protected area
      tNode = tp->get_an_alive_node();
      tp->unlock_mutex();
      // End protected area
      if (tNode == 0) {
        theError.code = 4009;
        releaseNdbCon(tNdbConn);
        return -1;
      }//if
      ret_code = sendRecSignal(tNode, WAIT_NDB_TAMPER, &tSignal, 0);
      if (ret_code == 0) {  
        if (tNdbConn->Status() != NdbTransaction::Connected) {
          theRestartGCI = 0;
        }//if
        releaseNdbCon(tNdbConn);
        return theRestartGCI;
      } else if ((ret_code == -5) || (ret_code == -2)) {
        // Try again with another alive node.
        TRACE_DEBUG("Continue DIHNDBTAMPER when node failed/stopping");
      } else {
        // Give the connection object back on this exit as on all others.
        releaseNdbCon(tNdbConn);
        return -1;
      }//if
    } while (1);
  }
  return 0;
#endif
}
#if 0
/****************************************************************************
NdbSchemaCon* startSchemaTransaction();

Return Value:   Returns a pointer to a schema connection object.
                Returns NULL otherwise, with theError.code set to 4321 if a
                schema connection is already registered in
                theSchemaConToNdbList, or 4000 if no object was obtained.
Remark:         Start schema transaction. Synchronous.
                This definition sits inside an "#if 0" section and is not
                compiled.
****************************************************************************/ 
NdbSchemaCon* 
Ndb::startSchemaTransaction()
{
  NdbSchemaCon* tSchemaCon;
  // Only one schema connection at a time.
  if (theSchemaConToNdbList != NULL) {
    theError.code = 4321;
    return NULL;
  }//if
  tSchemaCon = new NdbSchemaCon(this);
  if (tSchemaCon == NULL) {
    theError.code = 4000;
    return NULL;
  }//if 
  // Remember the connection so that closeSchemaTransaction() can verify it.
  theSchemaConToNdbList = tSchemaCon;
  return tSchemaCon;  
}
/*****************************************************************************
void closeSchemaTransaction(NdbSchemaCon* aSchemaCon);

Parameters:     aSchemaCon: the schemacon used in the transaction.
Remark:         Close transaction by releasing the schemacon and all schemaop.
                Aborts the process if aSchemaCon is not the connection stored
                in theSchemaConToNdbList.
                This definition sits inside an "#if 0" section and is not
                compiled.
*****************************************************************************/
void
Ndb::closeSchemaTransaction(NdbSchemaCon* aSchemaCon)
{
  // Only the currently registered schema connection may be closed.
  if (theSchemaConToNdbList != aSchemaCon) {
    abort();
    return;
  }//if
  aSchemaCon->release();
  delete aSchemaCon;
  // Clear the registration so a new schema transaction can be started.
  theSchemaConToNdbList = NULL;
  return;
}//Ndb::closeSchemaTransaction()
#endif

/*****************************************************************************
void RestartGCI(int aRestartGCI);

Remark:		Set theRestartGCI on the NDB object
*****************************************************************************/
void
Ndb::RestartGCI(int aRestartGCI)
{
  theRestartGCI = aRestartGCI;
}

/****************************************************************************
int getBlockNumber(void);

Remark:		
****************************************************************************/
int
Ndb::getBlockNumber()
{
  return theNdbBlockNumber;
}

/****************************************************************************
NdbDictionary::Dictionary* getDictionary() const;

Return Value:   The dictionary pointer held in theDictionary.
****************************************************************************/
NdbDictionary::Dictionary *
Ndb::getDictionary() const
{
  return this->theDictionary;
}

/****************************************************************************
int getNodeId();

Remark:		
****************************************************************************/
int
Ndb::getNodeId()
{
  return theNode;
}

/****************************************************************************
Uint64 getAutoIncrementValue(const char* aTableName, Uint32 cacheSize);

Parameters:     aTableName: external name of the table.
                cacheSize:  Prefetch this many values
Return Value:   A new TupleId for the table, or ~(Uint64)0 if the table is
                not found (theError is then copied from the dictionary) or if
                getTupleIdFromNdb() fails.
Remark:		Returns a new TupleId to the application.
                The TupleId comes from SYSTAB_0 where SYSKEY_0 = TableId.
                It is initialized to (TableId << 48) + 1 in NdbcntrMain.cpp.
****************************************************************************/
Uint64
Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize)
{
  DBUG_ENTER("getAutoIncrementValue");
  BaseString internal_tabname(internalize_table_name(aTableName));

  Ndb_local_table_info *info=
    theDictionary->get_local_table_info(internal_tabname, false);
  if (info == 0) {
    // Report why the lookup failed, as setAutoIncrementValue() does.
    theError= theDictionary->getNdbError();
    DBUG_RETURN(~(Uint64)0);
  }
  const NdbTableImpl *table= info->m_table_impl;
  Uint64 tupleId = getTupleIdFromNdb(table->m_tableId, cacheSize);
  // Print the full 64-bit value ("%ul" printed a truncated value plus 'l').
  DBUG_PRINT("info", ("value %llu", (unsigned long long) tupleId));
  DBUG_RETURN(tupleId);
}

/****************************************************************************
Uint64 getAutoIncrementValue(const NdbDictionary::Table* aTable,
                             Uint32 cacheSize);

Parameters:     aTable:    table to fetch a TupleId for.
                cacheSize: Prefetch this many values
Return Value:   A new TupleId, or ~(Uint64)0 if aTable is 0 or
                getTupleIdFromNdb() fails.
****************************************************************************/
Uint64
Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable, Uint32 cacheSize)
{
  DBUG_ENTER("getAutoIncrementValue");
  if (aTable == 0)
    DBUG_RETURN(~(Uint64)0);
  const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
  Uint64 tupleId = getTupleIdFromNdb(table->m_tableId, cacheSize);
  // Print the full 64-bit value ("%ul" printed a truncated value plus 'l').
  DBUG_PRINT("info", ("value %llu", (unsigned long long) tupleId));
  DBUG_RETURN(tupleId);
}

/****************************************************************************
Uint64 getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize);

Parameters:     aTableName: name handed to the dictionary's getTable().
                cacheSize:  Prefetch this many values
Return Value:   A new TupleId, or ~(Uint64)0 if the table is not found
                (theError is then copied from the dictionary) or the fetch
                fails.
****************************************************************************/
Uint64 
Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize)
{
  const NdbTableImpl* table = theDictionary->getTable(aTableName);
  if (table == 0) {
    // Same error propagation as readAutoIncrementValue()/setTupleIdInNdb().
    theError= theDictionary->getNdbError();
    return ~(Uint64)0;
  }
  return getTupleIdFromNdb(table->m_tableId, cacheSize);
}

/****************************************************************************
Uint64 getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize);

Parameters:     aTableId:  index into theFirstTupleId / theLastTupleId.
                cacheSize: number of values to fetch when the cache is empty;
                           0 is treated as 1.
Return Value:   The next cached TupleId if the cache for the table is not
                exhausted, otherwise the result of opTupleIdOnNdb() with
                operation 0.
****************************************************************************/
Uint64
Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize)
{
  DBUG_ENTER("getTupleIdFromNdb");
  if ( theFirstTupleId[aTableId] != theLastTupleId[aTableId] )
  {
    // Still values left in the cached range: hand out the next one.
    theFirstTupleId[aTableId]++;
    // Print the full 64-bit value ("%ul" printed a truncated value plus 'l').
    DBUG_PRINT("info", ("next cached value %llu", 
                        (unsigned long long) theFirstTupleId[aTableId]));
    DBUG_RETURN(theFirstTupleId[aTableId]);
  }
  else // theFirstTupleId == theLastTupleId
  {
    DBUG_PRINT("info",("reading %u values from database", 
                       (cacheSize == 0) ? 1 : cacheSize));
    DBUG_RETURN(opTupleIdOnNdb(aTableId, (cacheSize == 0) ? 1 : cacheSize, 0));
  }
}

/****************************************************************************
Uint64 readAutoIncrementValue(const char* aTableName);

Parameters:     aTableName: name handed to the dictionary's getTable().
Return Value:   The value returned by readTupleIdFromNdb() for the table, or
                ~(Uint64)0 if the table is not found (theError is then copied
                from the dictionary).
****************************************************************************/
Uint64
Ndb::readAutoIncrementValue(const char* aTableName)
{
  DBUG_ENTER("readAutoIncrementValue");
  const NdbTableImpl* table = theDictionary->getTable(aTableName);
  if (table == 0) {
    theError= theDictionary->getNdbError();
    DBUG_RETURN(~(Uint64)0);
  }
  Uint64 tupleId = readTupleIdFromNdb(table->m_tableId);
  // Print the full 64-bit value ("%ul" printed a truncated value plus 'l').
  DBUG_PRINT("info", ("value %llu", (unsigned long long) tupleId));
  DBUG_RETURN(tupleId);
}

/****************************************************************************
Uint64 readAutoIncrementValue(const NdbDictionary::Table* aTable);

Parameters:     aTable: table to read the value for.
Return Value:   The value returned by readTupleIdFromNdb() for the table, or
                ~(Uint64)0 if aTable is 0.
****************************************************************************/
Uint64
Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable)
{
  DBUG_ENTER("readAutoIncrementValue");
  if (aTable == 0)
    DBUG_RETURN(~(Uint64)0);
  const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
  Uint64 tupleId = readTupleIdFromNdb(table->m_tableId);
  // Print the full 64-bit value ("%ul" printed a truncated value plus 'l').
  DBUG_PRINT("info", ("value %llu", (unsigned long long) tupleId));
  DBUG_RETURN(tupleId);
}

/****************************************************************************
Uint64 readTupleIdFromNdb(Uint32 aTableId);

Return Value:   theFirstTupleId[aTableId] + 1 while the cached range is not
                exhausted; otherwise the result of opTupleIdOnNdb() with
                operation 3.
Remark:         Does not modify the cache itself.
****************************************************************************/
Uint64
Ndb::readTupleIdFromNdb(Uint32 aTableId)
{
  const bool cacheHasValues =
    (theFirstTupleId[aTableId] != theLastTupleId[aTableId]);

  if (cacheHasValues) {
    return theFirstTupleId[aTableId] + 1;
  }
  // Cache is empty, check next in database
  return opTupleIdOnNdb(aTableId, 0, 3);
}

bool
Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase)
{
  DBUG_ENTER("setAutoIncrementValue");
  BaseString internal_tabname(internalize_table_name(aTableName));

  Ndb_local_table_info *info=
    theDictionary->get_local_table_info(internal_tabname, false);
  if (info == 0) {
    theError= theDictionary->getNdbError();
    DBUG_RETURN(false);
  }
  const NdbTableImpl* table= info->m_table_impl;
  DBUG_RETURN(setTupleIdInNdb(table->m_tableId, val, increase));
}

/****************************************************************************
bool setAutoIncrementValue(const NdbDictionary::Table* aTable, Uint64 val,
                           bool increase);

Parameters:     aTable: table to set the value for.
                val, increase: passed on to setTupleIdInNdb().
Return Value:   The result of setTupleIdInNdb(), or false if aTable is 0.
****************************************************************************/
bool
Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable, Uint64 val, bool increase)
{
  DBUG_ENTER("setAutoIncrementValue");
  if (aTable == 0) {
    // Failure must be reported as false; ~(Uint64)0 converts to true.
    DBUG_RETURN(false);
  }
  const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
  DBUG_RETURN(setTupleIdInNdb(table->m_tableId, val, increase));
}

bool
Ndb::setTupleIdInNdb(const char* aTableName, Uint64 val, bool increase )
{
  DBUG_ENTER("setTupleIdInNdb(const char*, ...)");
  const NdbTableImpl* table = theDictionary->getTable(aTableName);
  if (table == 0) {
    theError= theDictionary->getNdbError();
    DBUG_RETURN(false);
  }
  DBUG_RETURN(setTupleIdInNdb(table->m_tableId, val, increase));
}

/****************************************************************************
bool setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase);

Parameters:     aTableId: index into theFirstTupleId / theLastTupleId.
                val:      value to set.
                increase: false -> opTupleIdOnNdb() operation 1;
                          true  -> serve from the cached range if possible,
                                   otherwise opTupleIdOnNdb() operation 2.
Return Value:   true if the value was accepted (the database operation
                returned val, or val was placed inside the cached range);
                false otherwise.
****************************************************************************/
bool
Ndb::setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase )
{
  DBUG_ENTER("setTupleIdInNdb(Uint32, ...)");
  if (!increase) {
    DBUG_RETURN((opTupleIdOnNdb(aTableId, val, 1) == val));
  }

  const bool haveCachedRange =
    (theFirstTupleId[aTableId] != theLastTupleId[aTableId]);
  if (haveCachedRange) {
    // We have a cache sequence
    if (val <= theFirstTupleId[aTableId]+1) {
      DBUG_RETURN(false);
    }
    if (val <= theLastTupleId[aTableId]) {
      // val lies inside the cached range: the next value handed out is val.
      theFirstTupleId[aTableId] = val - 1;
      DBUG_RETURN(true);
    }
    // val is beyond the cached range: fall through to the database.
  }
  DBUG_RETURN((opTupleIdOnNdb(aTableId, val, 2) == val));
}

/****************************************************************************
Uint64 opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op);

Parameters:     aTableId: SYSKEY_0 value of the row in SYSTAB_0; also the
                          index into theFirstTupleId / theLastTupleId.
                opValue:  operand of the operation.
                op:       0  increment NEXTID by opValue and cache the range
                             that was reserved; returns its first value.
                          1  set NEXTID to opValue and reset the cache;
                             returns opValue.
                          2  interpreted update that writes opValue to NEXTID
                             depending on a comparison with the stored value;
                             returns opValue, or ~(Uint64)0 when execute()
                             returns 9999.
                          3  read NEXTID and return it.
Return Value:   See op. ~(Uint64)0 on any error, with theError.code copied
                from the transaction when one was started.
Remark:         Runs in its own transaction against sys/def/SYSTAB_0. The
                current database and schema names are switched for the
                duration of the call and restored on every exit path below
                the status check.
****************************************************************************/
Uint64
Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
{
  DBUG_ENTER("Ndb::opTupleIdOnNdb");
  DBUG_PRINT("enter", ("table=%u value=%llu op=%u", aTableId, opValue, op));

  NdbTransaction*     tConnection;
  NdbOperation*      tOperation= 0; // Compiler warning if not initialized
  Uint64             tValue;
  NdbRecAttr*        tRecAttrResult;
  int                result;
  Uint64 ret;
  // Error codes for the trace at the end; captured while the transaction and
  // operation objects are still owned by this function.
  int                conErrorCode = -1;
  int                opErrorCode = -1;

  CHECK_STATUS_MACRO_ZERO;

  BaseString currentDb(getDatabaseName());
  BaseString currentSchema(getDatabaseSchemaName());

  setDatabaseName("sys");
  setDatabaseSchemaName("def");
  tConnection = this->startTransaction();
  if (tConnection == NULL)
    goto error_return;

  if (usingFullyQualifiedNames())
    tOperation = tConnection->getNdbOperation("SYSTAB_0");
  else
    tOperation = tConnection->getNdbOperation("sys/def/SYSTAB_0");
  if (tOperation == NULL)
    goto error_handler;

  switch (op)
    {
    case 0:
      tOperation->interpretedUpdateTuple();
      tOperation->equal("SYSKEY_0", aTableId );
      tOperation->incValue("NEXTID", opValue);
      tRecAttrResult = tOperation->getValue("NEXTID");

      if (tConnection->execute( Commit ) == -1 )
        goto error_handler;

      tValue = tRecAttrResult->u_64_value();

      // Cache the range [tValue - opValue, tValue - 1].
      theFirstTupleId[aTableId] = tValue - opValue;
      theLastTupleId[aTableId]  = tValue - 1;
      ret = theFirstTupleId[aTableId];
      break;
    case 1:
      tOperation->updateTuple();
      tOperation->equal("SYSKEY_0", aTableId );
      tOperation->setValue("NEXTID", opValue);

      if (tConnection->execute( Commit ) == -1 )
        goto error_handler;

      // first == last marks the cache as empty.
      theFirstTupleId[aTableId] = ~(Uint64)0;
      theLastTupleId[aTableId]  = ~(Uint64)0;
      ret = opValue;
      break;
    case 2:
      tOperation->interpretedUpdateTuple();
      tOperation->equal("SYSKEY_0", aTableId );
      tOperation->load_const_u64(1, opValue);
      tOperation->read_attr("NEXTID", 2);
      tOperation->branch_le(2, 1, 0);
      tOperation->write_attr("NEXTID", 1);
      tOperation->interpret_exit_ok();
      tOperation->def_label(0);
      tOperation->interpret_exit_nok(9999);
      
      if ( (result = tConnection->execute( Commit )) == -1 )
        goto error_handler;
      
      if (result == 9999)
        ret = ~(Uint64)0;
      else
      {
        theFirstTupleId[aTableId] = theLastTupleId[aTableId] = opValue - 1;
        ret = opValue;
      }
      break;
    case 3:
      tOperation->readTuple();
      tOperation->equal("SYSKEY_0", aTableId );
      tRecAttrResult = tOperation->getValue("NEXTID");
      if (tConnection->execute( Commit ) == -1 )
        goto error_handler;
      ret = tRecAttrResult->u_64_value();
      break;
    default:
      goto error_handler;
    }

  this->closeTransaction(tConnection);

  // Restore current name space
  setDatabaseName(currentDb.c_str());
  setDatabaseSchemaName(currentSchema.c_str());

  DBUG_RETURN(ret);

  error_handler:
    theError.code = tConnection->theError.code;
    // Read the error codes now: closeTransaction() releases the transaction
    // and its operations, after which they must not be inspected.
    conErrorCode = tConnection->theError.code;
    if (tOperation != NULL)
      opErrorCode = tOperation->theError.code;
    this->closeTransaction(tConnection);
  error_return:
    // Restore current name space
    setDatabaseName(currentDb.c_str());
    setDatabaseSchemaName(currentSchema.c_str());

  // Only read by DBUG_PRINT, which may compile to nothing.
  (void) conErrorCode;
  (void) opErrorCode;
  DBUG_PRINT("error", ("ndb=%d con=%d op=%d",
             theError.code,
             conErrorCode,
             opErrorCode));
  DBUG_RETURN(~(Uint64)0);
}

/****************************************************************************
Uint32 convertEndian(Uint32 Data);

Return Value:   Data with its four bytes in reverse order when built with
                WORDS_BIGENDIAN; Data unchanged otherwise.
****************************************************************************/
Uint32
convertEndian(Uint32 Data)
{
#ifndef WORDS_BIGENDIAN
  return Data;
#else
  const Uint32 byte0 = Data & 255;
  const Uint32 byte1 = (Data >> 8) & 255;
  const Uint32 byte2 = (Data >> 16) & 255;
  const Uint32 byte3 = (Data >> 24) & 255;
  return (byte0 << 24) | (byte1 << 16) | (byte2 << 8) | byte3;
#endif
}
const char * Ndb::getCatalogName() const
{
  return theImpl->m_dbname.c_str();
}


/* Stores a_catalog_name in theImpl->m_dbname and refreshes the name prefix.
   A NULL argument leaves everything unchanged. */
void
Ndb::setCatalogName(const char * a_catalog_name)
{
  if (!a_catalog_name)
    return;

  theImpl->m_dbname.assign(a_catalog_name);
  theImpl->update_prefix();
}


const char * Ndb::getSchemaName() const
{
  return theImpl->m_schemaname.c_str();
}


/* Stores a_schema_name in theImpl->m_schemaname and refreshes the name
   prefix. A NULL argument leaves everything unchanged. */
void
Ndb::setSchemaName(const char * a_schema_name)
{
  if (!a_schema_name)
    return;

  theImpl->m_schemaname.assign(a_schema_name);
  theImpl->update_prefix();
}
 
/*
Deprecated functions
*/
const char * Ndb::getDatabaseName() const
{
  return getCatalogName();
}
 
void Ndb::setDatabaseName(const char * a_catalog_name)
{
  setCatalogName(a_catalog_name);
}
 
const char * Ndb::getDatabaseSchemaName() const
{
  return getSchemaName();
}
 
void Ndb::setDatabaseSchemaName(const char * a_schema_name)
{
  setSchemaName(a_schema_name);
}
 
/* Report the current value of the fullyQualifiedNames member. */
bool Ndb::usingFullyQualifiedNames()
{
  const bool qualified = fullyQualifiedNames;
  return qualified;
}
 
/*
 * Strip the leading name components from an internal table name.
 *
 * When fullyQualifiedNames is true, two components (each terminated by
 * table_name_separator) are skipped and a pointer to what follows is
 * returned. If the string ends before both separators are found, the
 * returned pointer addresses the terminating NUL.
 * When fullyQualifiedNames is false, internalTableName is returned as is.
 *
 * The result always points into the caller's buffer; nothing is copied.
 */
const char *
Ndb::externalizeTableName(const char * internalTableName, bool fullyQualifiedNames)
{
  if (!fullyQualifiedNames)
    return internalTableName;

  const char *cursor = internalTableName;

  // First pass skips the database name, second pass the schema name.
  for (int component = 0; component < 2; component++)
  {
    while (*cursor != '\0')
    {
      const char current = *cursor++;
      if (current == table_name_separator)
        break;
    }
  }
  return cursor;
}

const char *
Ndb::externalizeTableName(const char * internalTableName)
{
  return externalizeTableName(internalTableName, usingFullyQualifiedNames());
}

/*
 * Strip everything up to and including the last table_name_separator
 * from an internal index name.
 *
 * When fullyQualifiedNames is true, a pointer to the character after the
 * last separator is returned; if the name contains no separator, the
 * whole name is returned. When fullyQualifiedNames is false,
 * internalIndexName is returned as is.
 *
 * The result always points into the caller's buffer; nothing is copied.
 */
const char *
Ndb::externalizeIndexName(const char * internalIndexName, bool fullyQualifiedNames)
{
  if (!fullyQualifiedNames)
    return internalIndexName;

  // strrchr finds the last separator without ever forming a pointer
  // before the start of the buffer (the previous hand-written backward
  // scan did so when no separator was present, which is undefined).
  const char * const lastSeparator =
    strrchr(internalIndexName, table_name_separator);

  return lastSeparator ? lastSeparator + 1 : internalIndexName;
}

const char *
Ndb::externalizeIndexName(const char * internalIndexName)
{
  return externalizeIndexName(internalIndexName, usingFullyQualifiedNames());
}


/*
 * Build the internal form of a table name.
 *
 * With fully qualified names enabled, the result is theImpl->m_prefix
 * followed by external_name; otherwise it is a plain copy of
 * external_name.
 */
const BaseString
Ndb::internalize_table_name(const char *external_name) const
{
  BaseString internal_name;
  DBUG_ENTER("internalize_table_name");
  DBUG_PRINT("enter", ("external_name: %s", external_name));

  if (!fullyQualifiedNames)
  {
    internal_name.assign(external_name);
  }
  else
  {
    // Internal table names look like <db>/<schema>/<table>; the
    // <db>/<schema>/ part is kept ready-made in m_prefix.
    const char * const prefix = theImpl->m_prefix.c_str();
    internal_name.assfmt("%s%s", prefix, external_name);
  }

  DBUG_PRINT("exit", ("internal_name: %s", internal_name.c_str()));
  DBUG_RETURN(internal_name);
}


/*
 * Build the internal form of an index name.
 *
 * With fully qualified names enabled, the result is
 * theImpl->m_prefix + <table id> + table_name_separator + external_name;
 * otherwise it is a plain copy of external_name.
 *
 * Returns an empty BaseString when table is NULL.
 */
const BaseString
Ndb::internalize_index_name(const NdbTableImpl * table,
                           const char * external_name) const
{
  BaseString ret;
  DBUG_ENTER("internalize_index_name");
  DBUG_PRINT("enter", ("external_name: %s, table_id: %d",
                       external_name, table ? table->m_tableId : ~0));
  if (!table)
  {
    DBUG_PRINT("error", ("!table"));
    // Must leave through DBUG_RETURN so the DBUG_ENTER above is balanced.
    DBUG_RETURN(ret);
  }

  if (fullyQualifiedNames)
  {
    /* Internal index name format <db>/<schema>/<tabid>/<index> */
    ret.assfmt("%s%d%c%s",
               theImpl->m_prefix.c_str(),
               table->m_tableId,
               table_name_separator,
               external_name);
  }
  else
    ret.assign(external_name);

  DBUG_PRINT("exit", ("internal_name: %s", ret.c_str()));
  DBUG_RETURN(ret);
}


/*
 * Extract the first component of an internal name: everything before
 * the first table_name_separator, or the whole string if there is none.
 *
 * The result is returned as a new BaseString; internalName is not
 * modified.
 */
const BaseString
Ndb::getDatabaseFromInternalName(const char * internalName)
{
  // Work on a private copy so the name can be cut in place.
  const size_t copySize = strlen(internalName) + 1;
  char * const scratch = new char[copySize];
  strcpy(scratch, internalName);

  // Advance to the first separator (or to the terminating NUL).
  char *cut = scratch;
  for (; *cut != '\0'; cut++)
  {
    if (*cut == table_name_separator)
      break;
  }
  *cut = '\0';

  BaseString database = BaseString(scratch);
  delete [] scratch;
  return database;
}
 
/*
 * Extract the second component of an internal name: the text between
 * the first and the second table_name_separator (or up to the end of
 * the string if there is no second separator).
 *
 * Returns an empty BaseString when internalName contains no separator
 * at all. internalName is not modified.
 */
const BaseString
Ndb::getSchemaFromInternalName(const char * internalName)
{
  /* Scan name for the first table_name_separator */
  const char *schemaStart = internalName;
  while (*schemaStart && *schemaStart != table_name_separator)
    schemaStart++;

  // No separator: there is no schema component. Stepping past the
  // terminating NUL here would read outside the caller's string.
  if (*schemaStart == '\0')
    return BaseString();
  schemaStart++;  // skip the separator itself

  // Copy the remainder (including room for the terminator) so that it
  // can be cut at the next separator.
  char * schemaName = new char[strlen(schemaStart) + 1];
  strcpy(schemaName, schemaStart);

  /* Scan the copy for the second table_name_separator */
  char *ptr = schemaName;
  while (*ptr && *ptr != table_name_separator)
    ptr++;
  *ptr = '\0';

  BaseString ret = BaseString(schemaName);
  delete [] schemaName;
  return ret;
}

/*
 * Allocate an NdbEventOperation for the named event.
 *
 * Returns the new operation when it reports state EO_CREATED.
 * Returns NULL otherwise, with theError.code set to 4000 if the
 * allocation yielded a null pointer, or to the operation's own error
 * code if it did not reach EO_CREATED (the operation is deleted in
 * that case).
 */
NdbEventOperation* Ndb::createEventOperation(const char* eventName,
                                             const int bufferLength)
{
  NdbEventOperation* const eventOp =
    new NdbEventOperation(this, eventName, bufferLength);

  if (eventOp == 0)
  {
    theError.code= 4000;
    return NULL;
  }

  if (eventOp->getState() == NdbEventOperation::EO_CREATED)
    return eventOp;

  // Construction failed: propagate its error and discard the object.
  theError.code= eventOp->getNdbError().code;
  delete eventOp;
  return NULL;
}

/*
 * Delete the given event operation. Always returns 0.
 */
int
Ndb::dropEventOperation(NdbEventOperation* op)
{
  delete op;
  return 0;
}

/* Return the theGlobalEventBufferHandle member. */
NdbGlobalEventBufferHandle* Ndb::getGlobalEventBufferHandle()
{
  NdbGlobalEventBufferHandle* const handle = theGlobalEventBufferHandle;
  return handle;
}

//void Ndb::monitorEvent(NdbEventOperation *op, NdbEventCallback cb, void* rs)
//{
//}

/*
 * Forward to NdbEventOperation::wait() with this object's global event
 * buffer handle and the given millisecond count, returning its result.
 */
int
Ndb::pollEvents(int aMillisecondNumber)
{
  const int waitResult =
    NdbEventOperation::wait(theGlobalEventBufferHandle, aMillisecondNumber);
  return waitResult;
}

#ifdef VM_TRACE
#include <NdbMutex.h>
extern NdbMutex *ndb_print_state_mutex;

/*
 * Return true if any pointer value occurs more than once among the
 * first `no` entries of `list`, false otherwise.
 */
static bool
checkdups(NdbTransaction** list, unsigned no)
{
  // Compare each entry against every entry that precedes it.
  for (unsigned later = 1; later < no; later++)
  {
    NdbTransaction* const candidate = list[later];
    for (unsigned earlier = 0; earlier < later; earlier++)
    {
      if (list[earlier] == candidate)
        return true;
    }
  }
  return false;
}
/*
 * Debug helper (VM_TRACE builds only): dump this Ndb object's
 * transaction bookkeeping to ndbout.
 *
 * The printf-style arguments are formatted into a header line. Then,
 * while holding ndb_print_state_mutex, the function prints every
 * transaction chained from theConnectionArray, followed by the
 * prepared, sent and completed transaction arrays. Each array is
 * checked with checkdups() and "!! DUPS !!" is printed if it holds the
 * same pointer twice.
 */
void
Ndb::printState(const char* fmt, ...)
{
  char buf[200];
  va_list ap;
  va_start(ap, fmt);
  // Bounded formatting: an over-long header is truncated instead of
  // overrunning the stack buffer.
  vsnprintf(buf, sizeof(buf), fmt, ap);
  va_end(ap);
  NdbMutex_Lock(ndb_print_state_mutex);
  unsigned i;
  ndbout << buf << " ndb=" << hex << this << dec;
#ifndef NDB_WIN32
  ndbout << " thread=" << (int)pthread_self();
#endif
  ndbout << endl;
  // Walk the linked list of transactions hanging off each node slot.
  for (unsigned n = 0; n < MAX_NDB_NODES; n++) {
    NdbTransaction* con = theConnectionArray[n];
    if (con != 0) {
      ndbout << "conn " << n << ":" << endl;
      while (con != 0) {
        con->printState();
        con = con->theNext;
      }
    }
  }
  ndbout << "prepared: " << theNoOfPreparedTransactions<< endl;
  if (checkdups(thePreparedTransactionsArray, theNoOfPreparedTransactions)) {
    ndbout << "!! DUPS !!" << endl;
  }
  for (i = 0; i < theNoOfPreparedTransactions; i++)
    thePreparedTransactionsArray[i]->printState();
  ndbout << "sent: " << theNoOfSentTransactions<< endl;
  if (checkdups(theSentTransactionsArray, theNoOfSentTransactions)) {
    ndbout << "!! DUPS !!" << endl;
  }
  for (i = 0; i < theNoOfSentTransactions; i++)
    theSentTransactionsArray[i]->printState();
  ndbout << "completed: " << theNoOfCompletedTransactions<< endl;
  if (checkdups(theCompletedTransactionsArray, theNoOfCompletedTransactions)) {
    ndbout << "!! DUPS !!" << endl;
  }
  for (i = 0; i < theNoOfCompletedTransactions; i++)
    theCompletedTransactionsArray[i]->printState();
  NdbMutex_Unlock(ndb_print_state_mutex);
}
#endif