Ndb.cpp 43.5 KB
Newer Older
1 2 3 4
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */




/*****************************************************************************
Name:          Ndb.cpp
******************************************************************************/

23
#include <ndb_global.h>
24

25

26 27
#include "NdbApiSignal.hpp"
#include "NdbImpl.hpp"
28
#include <NdbOperation.hpp>
29
#include <NdbTransaction.hpp>
30
#include <NdbRecAttr.hpp>
31 32 33 34 35 36 37 38 39 40 41 42 43
#include <md5_hash.hpp>
#include <NdbSleep.h>
#include <NdbOut.hpp>
#include <ndb_limits.h>
#include "API.hpp"
#include <NdbEnv.h>
#include <BaseString.hpp>

/****************************************************************************
void connect();

Connect to any node which has no connection at the moment.
****************************************************************************/
44
NdbTransaction* Ndb::doConnect(Uint32 tConNode) 
45 46 47
{
  Uint32        tNode;
  Uint32        tAnyAlive = 0;
48
  int TretCode= 0;
49

50 51
  DBUG_ENTER("Ndb::doConnect");

52 53 54 55 56 57
  if (tConNode != 0) {
    TretCode = NDB_connect(tConNode);
    if ((TretCode == 1) || (TretCode == 2)) {
//****************************************************************************
// We have connections now to the desired node. Return
//****************************************************************************
58
      DBUG_RETURN(getConnectedNdbTransaction(tConNode));
59 60
    } else if (TretCode < 0) {
      DBUG_RETURN(NULL);
61 62 63 64 65 66 67 68
    } else if (TretCode != 0) {
      tAnyAlive = 1;
    }//if
  }//if
//****************************************************************************
// We will connect to any node. Make sure that we have connections to all
// nodes.
//****************************************************************************
69 70 71 72 73 74 75 76 77 78 79
  if (theImpl->m_optimized_node_selection)
  {
    Ndb_cluster_connection_node_iter &node_iter= 
      theImpl->m_node_iter;
    theImpl->m_ndb_cluster_connection.init_get_next_node(node_iter);
    while ((tNode= theImpl->m_ndb_cluster_connection.get_next_node(node_iter)))
    {
      TretCode= NDB_connect(tNode);
      if ((TretCode == 1) ||
	  (TretCode == 2))
      {
80 81 82
//****************************************************************************
// We have connections now to the desired node. Return
//****************************************************************************
83
	DBUG_RETURN(getConnectedNdbTransaction(tNode));
84 85
      } else if (TretCode < 0) {
        DBUG_RETURN(NULL);
86 87
      } else if (TretCode != 0) {
	tAnyAlive= 1;
88
      }//if
89 90 91
      DBUG_PRINT("info",("tried node %d, TretCode %d, error code %d, %s",
			 tNode, TretCode, getNdbError().code,
			 getNdbError().message));
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
    }
  }
  else // just do a regular round robin
  {
    Uint32 tNoOfDbNodes= theImpl->theNoOfDBnodes;
    Uint32 &theCurrentConnectIndex= theImpl->theCurrentConnectIndex;
    UintR Tcount = 0;
    do {
      theCurrentConnectIndex++;
      if (theCurrentConnectIndex >= tNoOfDbNodes)
	theCurrentConnectIndex = 0;

      Tcount++;
      tNode= theImpl->theDBnodes[theCurrentConnectIndex];
      TretCode= NDB_connect(tNode);
      if ((TretCode == 1) ||
	  (TretCode == 2))
      {
//****************************************************************************
// We have connections now to the desired node. Return
//****************************************************************************
113
	DBUG_RETURN(getConnectedNdbTransaction(tNode));
114 115
      } else if (TretCode < 0) {
        DBUG_RETURN(NULL);
116 117 118
      } else if (TretCode != 0) {
	tAnyAlive= 1;
      }//if
119
      DBUG_PRINT("info",("tried node %d TretCode %d", tNode, TretCode));
120 121
    } while (Tcount < tNoOfDbNodes);
  }
122 123 124 125 126 127 128 129 130 131 132 133
//****************************************************************************
// We were unable to find a free connection. If no node alive we will report
// error code for cluster failure otherwise connection failure.
//****************************************************************************
  if (tAnyAlive == 1) {
#ifdef VM_TRACE
    ndbout << "TretCode = " << TretCode << endl;
#endif
    theError.code = 4006;
  } else {
    theError.code = 4009;
  }//if
134
  DBUG_RETURN(NULL);
135 136 137 138 139 140 141 142 143 144 145 146
}

/****************************************************************************
int NDB_connect(Uint32 tNode);

Parameters:     tNode: the node in which a transaction record is seized.
Return Value:    0  the node is not reported alive; nothing was done
                 1  a new connection was seized and put first in
                    theConnectionArray[tNode]
                 2  theConnectionArray[tNode] already holds a connection
                 3  the seize was unsuccessful
                 4  no connection object or signal object could be obtained,
                    or the TCSEIZEREQ signal could not be set up
                -1  the seize was unsuccessful and theError.code is 299
                    (single user mode), so retrying another node is pointless
Remark:         Performs a seize of a transaction record in DBTC in the
                specified node: sends TCSEIZEREQ and waits for the response.
****************************************************************************/
int 
Ndb::NDB_connect(Uint32 tNode) 
{
  int	         tReturnCode;
  TransporterFacade *tp = TransporterFacade::instance();

  DBUG_ENTER("Ndb::NDB_connect");

  bool nodeAvail = tp->get_node_alive(tNode);
  if(nodeAvail == false){
    DBUG_RETURN(0);
  }
  
  NdbTransaction * tConArray = theConnectionArray[tNode];
  if (tConArray != NULL) {
    DBUG_RETURN(2);
  }
  
  NdbTransaction * tNdbCon = getNdbCon();	// Get free connection object.
  if (tNdbCon == NULL) {
    DBUG_RETURN(4);
  }//if
  NdbApiSignal*	tSignal = getSignal();		// Get signal object
  if (tSignal == NULL) {
    releaseNdbCon(tNdbCon);
    DBUG_RETURN(4);
  }//if
  if (tSignal->setSignal(GSN_TCSEIZEREQ) == -1) {
    releaseNdbCon(tNdbCon);
    releaseSignal(tSignal);
    DBUG_RETURN(4);
  }//if
//************************************************
// Set connection pointer as NdbTransaction object
//************************************************
  tSignal->setData(tNdbCon->ptr2int(), 1);
  tSignal->setData(theMyRef, 2);	// Set my block reference
  tNdbCon->Status(NdbTransaction::Connecting); // Set status to connecting
  Uint32 nodeSequence;
  { // send and receive signal; the guard holds the transporter mutex
    Guard guard(tp->theMutexPtr);
    nodeSequence = tp->getNodeSequence(tNode);
    // Re-check liveness now that the mutex is held.
    bool node_is_alive = tp->get_node_alive(tNode);
    if (node_is_alive) { 
      DBUG_PRINT("info",("Sending signal to node %u", tNode));
      tReturnCode = tp->sendSignal(tSignal, tNode);  
      releaseSignal(tSignal); 
      if (tReturnCode != -1) {
        theImpl->theWaiter.m_node = tNode;  
        theImpl->theWaiter.m_state = WAIT_TC_SEIZE;  
        tReturnCode = receiveResponse(); 
      }//if
    } else {
      releaseSignal(tSignal);
      tReturnCode = -1;
    }//if
  }
  if ((tReturnCode == 0) && (tNdbCon->Status() == NdbTransaction::Connected)) {
    //************************************************
    // Send and receive was successful
    //************************************************
    NdbTransaction* tPrevFirst = theConnectionArray[tNode];
    tNdbCon->setConnectedNodeId(tNode, nodeSequence);
    
    tNdbCon->setMyBlockReference(theMyRef);
    theConnectionArray[tNode] = tNdbCon;
    tNdbCon->theNext = tPrevFirst;
    DBUG_RETURN(1);
  } else {
    // Report the status while we still own the object; it must not be
    // touched once it has been handed back with releaseNdbCon().
    DBUG_PRINT("info",
	       ("unsuccessful connect tReturnCode %d, tNdbCon->Status() %d",
		tReturnCode, tNdbCon->Status()));
    releaseNdbCon(tNdbCon);
//****************************************************************************
// Unsuccessful connect is indicated by 3.
//****************************************************************************
    if (theError.code == 299)
    {
      // single user mode so no need to retry with other node
      DBUG_RETURN(-1);
    }
    DBUG_RETURN(3);
  }//if
}//Ndb::NDB_connect()

226 227 228
/**
 * Detach and return the first connection in theConnectionArray[nodeId].
 *
 * The list entry is dereferenced without a NULL check, so the caller must
 * know that the list for nodeId is non-empty.
 */
NdbTransaction *
Ndb::getConnectedNdbTransaction(Uint32 nodeId){
  NdbTransaction* const head = theConnectionArray[nodeId];
  NdbTransaction* const remainder = head->theNext;

  head->theNext = NULL;                     // returned object is unlinked
  theConnectionArray[nodeId] = remainder;

  return head;
}//Ndb::getConnectedNdbTransaction()
234 235 236 237 238 239 240 241 242

/*****************************************************************************
disconnect();

Remark:        Disconnect all connections to the database. 
*****************************************************************************/
void 
Ndb::doDisconnect()
{
243
  NdbTransaction* tNdbCon;
244
  CHECK_STATUS_MACRO_VOID;
monty@mysql.com's avatar
monty@mysql.com committed
245 246
  /* DBUG_ENTER must be after CHECK_STATUS_MACRO_VOID because of 'return' */
  DBUG_ENTER("Ndb::doDisconnect");
247

248 249 250
  Uint32 tNoOfDbNodes = theImpl->theNoOfDBnodes;
  Uint8 *theDBnodes= theImpl->theDBnodes;
  DBUG_PRINT("info", ("theNoOfDBnodes=%d", tNoOfDbNodes));
251 252 253 254 255
  UintR i;
  for (i = 0; i < tNoOfDbNodes; i++) {
    Uint32 tNode = theDBnodes[i];
    tNdbCon = theConnectionArray[tNode];
    while (tNdbCon != NULL) {
256
      NdbTransaction* tmpNdbCon = tNdbCon;
257 258 259 260 261 262
      tNdbCon = tNdbCon->theNext;
      releaseConnectToNdb(tmpNdbCon);
    }//while
  }//for
  tNdbCon = theTransactionList;
  while (tNdbCon != NULL) {
263
    NdbTransaction* tmpNdbCon = tNdbCon;
264 265 266
    tNdbCon = tNdbCon->theNext;
    releaseConnectToNdb(tmpNdbCon);
  }//while
267
  DBUG_VOID_RETURN;
268 269 270 271 272 273 274 275 276 277 278 279
}//Ndb::disconnect()

/*****************************************************************************
int waitUntilReady(int timeout);

Return Value:   Returns 0 if the Ndb is ready within timeout seconds.
                Returns -1 otherwise.
Remark:         Waits until a node has status != 0
*****************************************************************************/ 
int
Ndb::waitUntilReady(int timeout)
{
280
  DBUG_ENTER("Ndb::waitUntilReady");
281 282 283 284 285 286
  int secondsCounter = 0;
  int milliCounter = 0;

  if (theInitState != Initialised) {
    // Ndb::init is not called
    theError.code = 4256;
287
    DBUG_RETURN(-1);
288 289
  }

290
  while (theNode == 0) {
291
    if (secondsCounter >= timeout)
292 293 294 295
    {
      theError.code = 4269;
      DBUG_RETURN(-1);
    }
296 297 298 299 300 301
    NdbSleep_MilliSleep(100);
    milliCounter += 100;
    if (milliCounter >= 1000) {
      secondsCounter++;
      milliCounter = 0;
    }//if
302 303 304
  }

  if (theImpl->m_ndb_cluster_connection.wait_until_ready
305
      (timeout-secondsCounter,30) < 0)
306 307
  {
    theError.code = 4009;
308 309
    DBUG_RETURN(-1);
  }
310 311

  DBUG_RETURN(0);
312 313 314
}

/*****************************************************************************
315
NdbTransaction* startTransaction();
316 317 318 319 320

Return Value:   Returns a pointer to a connection object.
                Return NULL otherwise.
Remark:         Start transaction. Synchronous.
*****************************************************************************/ 
321
NdbTransaction* 
322 323
Ndb::startTransaction(const NdbDictionary::Table *table,
		      const char * keyData, Uint32 keyLen)
324
{
325
  DBUG_ENTER("Ndb::startTransaction");
326 327 328 329

  if (theInitState == Initialised) {
    theError.code = 0;
    checkFailedNode();
330 331 332 333 334
    /**
     * If the user supplied key data
     * We will make a qualified quess to which node is the primary for the
     * the fragment and contact that node
     */
335
    Uint32 nodeId;
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360
    NdbTableImpl* impl;
    if(table != 0 && keyData != 0 && (impl= &NdbTableImpl::getImpl(*table))) 
    {
      Uint32 hashValue;
      {
	Uint32 buf[4];
	if((UintPtr(keyData) & 7) == 0 && (keyLen & 3) == 0)
	{
	  md5_hash(buf, (const Uint64*)keyData, keyLen >> 2);
	}
	else
	{
	  Uint64 tmp[1000];
	  tmp[keyLen/8] = 0;
	  memcpy(tmp, keyData, keyLen);
	  md5_hash(buf, tmp, (keyLen+3) >> 2);	  
	}
	hashValue= buf[1];
      }
      const Uint16 *nodes;
      Uint32 cnt= impl->get_nodes(hashValue, &nodes);
      if(cnt)
	nodeId= nodes[0];
      else
	nodeId= 0;
361 362 363
    } else {
      nodeId = 0;
    }//if
364

365
    {
366
      NdbTransaction *trans= startTransactionLocal(0, nodeId);
367 368 369
      DBUG_PRINT("exit",("start trans: 0x%lx  transid: 0x%lx",
			 (long) trans,
                         (long) (trans ? trans->getTransactionId() : 0)));
370 371
      DBUG_RETURN(trans);
    }
372
  } else {
373
    DBUG_RETURN(NULL);
374 375 376 377
  }//if
}//Ndb::startTransaction()

/*****************************************************************************
378
NdbTransaction* hupp(NdbTransaction* pBuddyTrans);
379 380 381 382 383 384

Return Value:   Returns a pointer to a connection object.
                Connected to the same node as pBuddyTrans
                and also using the same transction id
Remark:         Start transaction. Synchronous.
*****************************************************************************/ 
385 386
NdbTransaction* 
Ndb::hupp(NdbTransaction* pBuddyTrans)
387
{
388 389
  DBUG_ENTER("Ndb::hupp");

390
  DBUG_PRINT("enter", ("trans: 0x%lx", (long) pBuddyTrans));
391

392 393
  Uint32 aPriority = 0;
  if (pBuddyTrans == NULL){
394
    DBUG_RETURN(startTransaction());
395 396 397 398 399 400 401
  }

  if (theInitState == Initialised) {
    theError.code = 0;
    checkFailedNode();

    Uint32 nodeId = pBuddyTrans->getConnectedNodeId();
402
    NdbTransaction* pCon = startTransactionLocal(aPriority, nodeId);
403
    if(pCon == NULL)
404
      DBUG_RETURN(NULL);
405 406 407 408 409

    if (pCon->getConnectedNodeId() != nodeId){
      // We could not get a connection to the desired node
      // release the connection and return NULL
      closeTransaction(pCon);
410
      theError.code = 4006;
411
      DBUG_RETURN(NULL);
412 413 414
    }
    pCon->setTransactionId(pBuddyTrans->getTransactionId());
    pCon->setBuddyConPtr((Uint32)pBuddyTrans->getTC_ConnectPtr());
415 416 417
    DBUG_PRINT("exit", ("hupp trans: 0x%lx transid: 0x%lx",
                        (long) pCon,
                        (long) (pCon ? pCon->getTransactionId() : 0)));
418
    DBUG_RETURN(pCon);
419
  } else {
420
    DBUG_RETURN(NULL);
421 422 423
  }//if
}//Ndb::hupp()

424
/*****************************************************************************
NdbTransaction* startTransactionLocal(Uint32 aPriority, Uint32 nodeId);

Parameters:     aPriority: stored in the new transaction's thePriority.
                nodeId:    preferred node, or 0 for no preference. In
                           VM_TRACE builds the environment variable
                           NDB_TRANSACTION_NODE_ID overrides it.
Return Value:   The new transaction, first in theTransactionList, or NULL.
                theError.code is 4006 when theRemainingStartTransactions is
                0; when init() of the connection fails its error code is
                copied to theError.
Remark:         A slot in theRemainingStartTransactions is consumed only when
                a transaction is actually handed out, since only handed out
                transactions are given back through closeTransaction().
*****************************************************************************/
NdbTransaction* 
Ndb::startTransactionLocal(Uint32 aPriority, Uint32 nodeId)
{
#ifdef VM_TRACE
  char buf[255];
  const char* val = NdbEnv_GetEnv("NDB_TRANSACTION_NODE_ID", buf, 255);
  if(val != 0){
    nodeId = atoi(val);
  }
#endif

  DBUG_ENTER("Ndb::startTransactionLocal");
  DBUG_PRINT("enter", ("nodeid: %d", nodeId));

  if(unlikely(theRemainingStartTransactions == 0))
  {
    theError.code = 4006;
    DBUG_RETURN(0);
  }
  
  NdbTransaction* tConnection;
  Uint64 tFirstTransId = theFirstTransId;
  tConnection = doConnect(nodeId);
  if (tConnection == NULL) {
    DBUG_RETURN(NULL);
  }//if

  NdbTransaction* tConNext = theTransactionList;
  if (tConnection->init())
  {
    theError.code = tConnection->theError.code;
    // doConnect() took the connection off the idle list of its node. Put it
    // back so that it is not lost; it is initialised again on its next use.
    const Uint32 tConnectedNode = tConnection->getConnectedNodeId();
    tConnection->theNext = theConnectionArray[tConnectedNode];
    theConnectionArray[tConnectedNode] = tConnection;
    DBUG_RETURN(NULL);
  }

  // Only now is a transaction really handed out to the caller.
  theRemainingStartTransactions--;

  theTransactionList = tConnection;        // into a transaction list.
  tConnection->next(tConNext);   // Add the active connection object
  tConnection->setTransactionId(tFirstTransId);
  tConnection->thePriority = aPriority;
  if ((tFirstTransId & 0xFFFFFFFF) == 0xFFFFFFFF) {
    //---------------------------------------------------
    // Transaction id rolling round. We will start from
    // consecutive identity 0 again.
    //---------------------------------------------------
    theFirstTransId = ((tFirstTransId >> 32) << 32);      
  } else {
    theFirstTransId = tFirstTransId + 1;
  }//if
#ifdef VM_TRACE
  if (tConnection->theListState != NdbTransaction::NotInList) {
    printState("startTransactionLocal %x", tConnection);
    abort();
  }
#endif
  DBUG_RETURN(tConnection);
}//Ndb::startTransactionLocal()

/*****************************************************************************
481
void closeTransaction(NdbTransaction* aConnection);
482 483 484 485 486

Parameters:     aConnection: the connection used in the transaction.
Remark:         Close transaction by releasing the connection and all operations.
*****************************************************************************/
void
487
Ndb::closeTransaction(NdbTransaction* aConnection)
488
{
489
  DBUG_ENTER("Ndb::closeTransaction");
490 491
  NdbTransaction* tCon;
  NdbTransaction* tPreviousCon;
492 493 494 495 496 497 498 499 500

  if (aConnection == NULL) {
//-----------------------------------------------------
// closeTransaction called on NULL pointer, destructive
// application behaviour.
//-----------------------------------------------------
#ifdef VM_TRACE
    printf("NULL into closeTransaction\n");
#endif
501
    DBUG_VOID_RETURN;
502 503 504 505
  }//if
  CHECK_STATUS_MACRO_VOID;
  
  tCon = theTransactionList;
506
  theRemainingStartTransactions++;
joreland@mysql.com's avatar
joreland@mysql.com committed
507
  
508 509 510
  DBUG_PRINT("info",("close trans: 0x%lx  transid: 0x%lx",
                     (long) aConnection,
                     (long) aConnection->getTransactionId()));
511 512 513 514
  DBUG_PRINT("info",("magic number: 0x%x TCConPtr: 0x%x theMyRef: 0x%x 0x%x",
		     aConnection->theMagicNumber, aConnection->theTCConPtr,
		     aConnection->theMyRef, getReference()));

515
  if (aConnection == tCon) {		// Remove the active connection object
joreland@mysql.com's avatar
joreland@mysql.com committed
516
    theTransactionList = tCon->next();	// from the transaction list.
517 518 519 520 521 522
  } else { 
    while (aConnection != tCon) {
      if (tCon == NULL) {
//-----------------------------------------------------
// closeTransaction called on non-existing transaction
//-----------------------------------------------------
523

joreland@mysql.com's avatar
joreland@mysql.com committed
524 525
	if(aConnection->theError.code == 4008){
	  /**
526
	   * When a SCAN timed-out, returning the NdbTransaction leads
joreland@mysql.com's avatar
joreland@mysql.com committed
527 528 529
	   * to reuse. And TC crashes when the API tries to reuse it to
	   * something else...
	   */
530
#ifdef VM_TRACE
531
	  printf("Scan timeout:ed NdbTransaction-> "
joreland@mysql.com's avatar
joreland@mysql.com committed
532
		 "not returning it-> memory leak\n");
533
#endif
534
	  DBUG_VOID_RETURN;
joreland@mysql.com's avatar
joreland@mysql.com committed
535
	}
536

537
#ifdef VM_TRACE
joreland@mysql.com's avatar
joreland@mysql.com committed
538
	printf("Non-existing transaction into closeTransaction\n");
539 540
	abort();
#endif
541
	DBUG_VOID_RETURN;
542 543 544 545 546 547
      }//if
      tPreviousCon = tCon;
      tCon = tCon->next();
    }//while
    tPreviousCon->next(tCon->next());
  }//if
joreland@mysql.com's avatar
joreland@mysql.com committed
548
  
549
  aConnection->release();
joreland@mysql.com's avatar
joreland@mysql.com committed
550
  
551 552
  if(aConnection->theError.code == 4008){
    /**
553
     * Something timed-out, returning the NdbTransaction leads
554 555 556 557
     * to reuse. And TC crashes when the API tries to reuse it to
     * something else...
     */
#ifdef VM_TRACE
558
    printf("Con timeout:ed NdbTransaction-> not returning it-> memory leak\n");
559
#endif
560
    DBUG_VOID_RETURN;
561
  }
joreland@mysql.com's avatar
joreland@mysql.com committed
562
  
563 564 565 566 567 568 569
  if (aConnection->theReleaseOnClose == false) {
    /**
     * Put it back in idle list for that node
     */
    Uint32 nodeId = aConnection->getConnectedNodeId();
    aConnection->theNext = theConnectionArray[nodeId];
    theConnectionArray[nodeId] = aConnection;
570
    DBUG_VOID_RETURN;
571 572 573 574
  } else {
    aConnection->theReleaseOnClose = false;
    releaseNdbCon(aConnection);
  }//if
575
  DBUG_VOID_RETURN;
576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596
}//Ndb::closeTransaction()

/*****************************************************************************
int NdbTamper(TamperType aAction, int aNode);

Parameters: aAction     Specifies what action to be taken
            1: Lock global checkpointing    Can only be sent to master DIH, Parameter aNode ignored.
            2: UnLock global checkpointing    Can only be sent to master DIH, Parameter aNode ignored.
	    3: Crash node
            4: Read restart GCI

           aNode        Specifies which node the action will be taken
     	  -1: Master DIH 
       	0-16: Nodnumber

Return Value: -1 Error (always -1 when built with CUSTOMER_RELEASE).
              Actions 1 and 2 return the result of sendSignal().
              Action 3 returns 0.
              Action 4 returns theRestartGCI, which is 0 if the connection
              object did not reach state Connected.
              theError.code: 4102 unknown action, 4000 no free connection
              object, 4002 / 4009 no alive node.
                
Remark:         Sends a signal to DIH.
                Every exit path releases the transporter mutex and the
                connection object obtained from getNdbCon().
*****************************************************************************/ 
int 
Ndb::NdbTamper(TamperType aAction, int aNode)
{
  NdbTransaction*	tNdbConn;
  NdbApiSignal		tSignal(theMyRef);
  int			tNode;
  int                   tAction;
  int			ret_code;

#ifdef CUSTOMER_RELEASE
  return -1;
#else
  CHECK_STATUS_MACRO;
  checkFailedNode();

  theRestartGCI = 0;
  switch (aAction) {
// Translate enum to integer. This is done because the SCI layer
// expects integers. 
     case LockGlbChp:
        tAction = 1;
        break;
     case UnlockGlbChp:
        tAction = 2;
	break;
     case CrashNode:
        tAction = 3;
        break;
     case ReadRestartGCI:
	tAction = 4;
	break;
     default:
        theError.code = 4102;
        return -1;
  }

  tNdbConn = getNdbCon();	// Get free connection object
  if (tNdbConn == NULL) {
    theError.code = 4000;
    return -1;
  }
  tSignal.setSignal(GSN_DIHNDBTAMPER);
  tSignal.setData (tAction, 1);
  tSignal.setData(tNdbConn->ptr2int(),2);
  tSignal.setData(theMyRef,3);		// Set return block reference
  tNdbConn->Status(NdbTransaction::Connecting); // Set status to connecting
  TransporterFacade *tp = TransporterFacade::instance();
  if (tAction == 3) {
    // Crash node: fire and forget, the result of sendSignal() is not used.
    tp->lock_mutex();
    tp->sendSignal(&tSignal, aNode);
    tp->unlock_mutex();
    releaseNdbCon(tNdbConn);
  } else if ( (tAction == 2) || (tAction == 1) ) {
    tp->lock_mutex();
    tNode = tp->get_an_alive_node();
    if (tNode == 0) {
      // Do not leave with the transporter mutex held.
      tp->unlock_mutex();
      theError.code = 4002;
      releaseNdbCon(tNdbConn);
      return -1;
    }//if
    // NOTE(review): the signal goes to aNode although the alive node found
    // above is tNode and the header says aNode is ignored for these
    // actions -- confirm which node is intended.
    ret_code = tp->sendSignal(&tSignal,aNode);
    tp->unlock_mutex();
    releaseNdbCon(tNdbConn);
    return ret_code;
  } else {
    do {
      tp->lock_mutex();
      // Start protected area
      tNode = tp->get_an_alive_node();
      tp->unlock_mutex();
      // End protected area
      if (tNode == 0) {
        theError.code = 4009;
        releaseNdbCon(tNdbConn);
        return -1;
      }//if
      ret_code = sendRecSignal(tNode, WAIT_NDB_TAMPER, &tSignal, 0);
      if (ret_code == 0) {  
        if (tNdbConn->Status() != NdbTransaction::Connected) {
          theRestartGCI = 0;
        }//if
        releaseNdbCon(tNdbConn);
        return theRestartGCI;
      } else if ((ret_code == -5) || (ret_code == -2)) {
        // Retry with another alive node.
        TRACE_DEBUG("Continue DIHNDBTAMPER when node failed/stopping");
      } else {
        // Give the connection object back on this error path as well.
        releaseNdbCon(tNdbConn);
        return -1;
      }//if
    } while (1);
  }
  return 0;
#endif
}
687
#if 0
688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728
/****************************************************************************
NdbSchemaCon* startSchemaTransaction();

Return Value:   Returns a pointer to a schema connection object.
                Return NULL otherwise, with theError.code set to 4321 when a
                schema transaction is already registered in
                theSchemaConToNdbList, or 4000 when no object was obtained.
Remark:         Start schema transaction. Synchronous.
****************************************************************************/ 
NdbSchemaCon* 
Ndb::startSchemaTransaction()
{
  if (theSchemaConToNdbList == NULL) {
    NdbSchemaCon* const created = new NdbSchemaCon(this);
    if (created != NULL) {
      theSchemaConToNdbList = created;
      return created;
    }
    theError.code = 4000;
    return NULL;
  }
  // Only one schema transaction at a time.
  theError.code = 4321;
  return NULL;
}
/*****************************************************************************
void closeSchemaTransaction(NdbSchemaCon* aSchemaCon);

Parameters:     aSchemaCon: the schemacon used in the transaction.
Remark:         Close transaction by releasing the schemacon and all schemaop.
                Aborts the process if aSchemaCon is not the schema connection
                registered in theSchemaConToNdbList.
*****************************************************************************/
void
Ndb::closeSchemaTransaction(NdbSchemaCon* aSchemaCon)
{
  if (theSchemaConToNdbList == aSchemaCon) {
    aSchemaCon->release();
    delete aSchemaCon;
    theSchemaConToNdbList = NULL;
    return;
  }
  abort();
}//Ndb::closeSchemaTransaction()
729
#endif
730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769

/*****************************************************************************
void RestartGCI(int aRestartGCI);

Parameters:     aRestartGCI: the value to store.
Remark:		Set theRestartGCI on the NDB object
*****************************************************************************/
void
Ndb::RestartGCI(int aRestartGCI)
{
  theRestartGCI = aRestartGCI;
}

/****************************************************************************
int getBlockNumber(void);

Return Value:   The value of theNdbBlockNumber.
Remark:		Plain accessor, no side effects.
****************************************************************************/
int
Ndb::getBlockNumber()
{
  return theNdbBlockNumber;
}

/****************************************************************************
NdbDictionary::Dictionary* getDictionary() const;

Return Value:   The theDictionary pointer held by this Ndb object.
Remark:		Plain accessor, no side effects.
****************************************************************************/
NdbDictionary::Dictionary *
Ndb::getDictionary() const {
  return theDictionary;
}

/****************************************************************************
int getNodeId();

Return Value:   The value of theNode (waitUntilReady() treats 0 as "not yet
                assigned").
Remark:		Plain accessor, no side effects.
****************************************************************************/
int
Ndb::getNodeId()
{
  return theNode;
}

/****************************************************************************
770
int    getAutoIncrementValue( const char* aTableName,
771
                              Uint64 & autoValue, 
772 773 774 775 776 777 778 779 780 781
                              Uint32 cacheSize, 
                              Uint64 step,
                              Uint64 start);

Parameters:     aTableName (IN) : The table name.
                autoValue (OUT) : Returns new autoincrement value
                cacheSize  (IN) : Prefetch this many values
                step       (IN) : Specifies the step between the 
                                  autoincrement values.
                start      (IN) : Start value for first value
782
Returns:        0 if succesful, -1 if error encountered
783 784 785 786
Remark:		Returns a new autoincrement value to the application.
                The autoincrement values can be increased by steps
                (default 1) and a number of values can be prefetched
                by specifying cacheSize (default 10).
787
****************************************************************************/
788 789
int
Ndb::getAutoIncrementValue(const char* aTableName,
790 791
                           Uint64 & autoValue, Uint32 cacheSize, 
			   Uint64 step, Uint64 start)
792
{
793
  DBUG_ENTER("Ndb::getAutoIncrementValue");
794 795
  BaseString internal_tabname(internalize_table_name(aTableName));

796
  Ndb_local_table_info *info=
797
    theDictionary->get_local_table_info(internal_tabname, false);
798 799
  if (info == 0) {
    theError.code = theDictionary->getNdbError().code;
800
    DBUG_RETURN(-1);
801
  }
802 803
  DBUG_PRINT("info", ("step %lu", (ulong) step));
  if (getTupleIdFromNdb(info, autoValue, cacheSize, step, start) == -1)
804
    DBUG_RETURN(-1);
805
  DBUG_PRINT("info", ("value %lu", (ulong) autoValue));
806
  DBUG_RETURN(0);
807 808
}

809 810
/****************************************************************************
int    getAutoIncrementValue( const NdbDictionary::Table * aTable,
                              Uint64 & autoValue,
                              Uint32 cacheSize,
                              Uint64 step,
                              Uint64 start);

Parameters:     aTable     (IN) : The table, must not be NULL (asserted).
                autoValue (OUT) : Returns new autoincrement value
                cacheSize  (IN) : Prefetch this many values
                step       (IN) : Step between the autoincrement values.
                start      (IN) : Start value for first value
Returns:        0 if successful, -1 if error encountered. When the table
                is not known to the dictionary, theError.code is copied from
                the dictionary's error.
Remark:		Same as the table-name variant, but the table is looked up
                through the internal name stored in the table object.
****************************************************************************/
int
Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable,
                           Uint64 & autoValue, Uint32 cacheSize, 
                           Uint64 step, Uint64 start)
{
  DBUG_ENTER("Ndb::getAutoIncrementValue");
  assert(aTable != 0);
  const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
  const BaseString& internal_tabname = table->m_internalName;

  Ndb_local_table_info * const tabInfo =
    theDictionary->get_local_table_info(internal_tabname, false);
  if (tabInfo != 0)
  {
    DBUG_PRINT("info", ("step %lu", (ulong) step));
    const int rc = getTupleIdFromNdb(tabInfo, autoValue, cacheSize,
                                     step, start);
    if (rc == -1)
    {
      DBUG_RETURN(-1);
    }
    DBUG_PRINT("info", ("value %lu", (ulong) autoValue));
    DBUG_RETURN(0);
  }

  // Unknown table: report the dictionary's error.
  theError.code = theDictionary->getNdbError().code;
  DBUG_RETURN(-1);
}

832 833
int
Ndb::getTupleIdFromNdb(Ndb_local_table_info* info,
834 835
                       Uint64 & tupleId, Uint32 cacheSize, 
		       Uint64 step, Uint64 start)
836
{
837 838 839 840 841 842 843 844 845
/*
  Returns a new TupleId to the application.
  The TupleId comes from SYSTAB_0 where SYSKEY_0 = TableId.
  It is initialized to (TableId << 48) + 1 in NdbcntrMain.cpp.
  In most cases step= start= 1, in which case we get:
  1,2,3,4,5,...
  If step=10 and start=5 and first number is 1, we get:
  5,15,25,35,...  
*/
846
  DBUG_ENTER("Ndb::getTupleIdFromNdb");
847 848 849 850 851 852 853
  DBUG_PRINT("info", ("Step %lu (%lu,%lu)", (ulong) step, (ulong) info->m_first_tuple_id, (ulong) info->m_last_tuple_id));
  /*
   Check if the next value can be taken from the pre-fetched
   sequence.
  */
  if (info->m_first_tuple_id != info->m_last_tuple_id &&
      info->m_first_tuple_id + step <= info->m_last_tuple_id)
854
  {
855
    assert(info->m_first_tuple_id < info->m_last_tuple_id);
856 857 858
    info->m_first_tuple_id += step; 
    tupleId = info->m_first_tuple_id;
    DBUG_PRINT("info", ("Next cached value %lu", (ulong) tupleId));
859
  }
860
  else
861
  {
862 863 864 865 866 867 868 869
     /*
       If start value is greater than step it is ignored
      */
      Uint64 offset = (start > step) ? 1 : start;

    /*
      Pre-fetch a number of values depending on cacheSize
     */
870 871
    if (cacheSize == 0)
      cacheSize = 1;
872 873

    DBUG_PRINT("info", ("Reading %u values from database", (uint)cacheSize));
874 875
    /*
     * reserve next cacheSize entries in db.  adds cacheSize to NEXTID
876 877
     * and returns first tupleId in the new range. If tupleId's are
     * incremented in steps then multiply the cacheSize with step size.
878
     */
879 880
    Uint64 opValue = cacheSize * step;

881 882
    if (opTupleIdOnNdb(info, opValue, 0) == -1)
      DBUG_RETURN(-1);
883 884 885
    DBUG_PRINT("info", ("Next value fetched from database %lu", (ulong) opValue));
    DBUG_PRINT("info", ("Increasing %lu by offset %lu, increment  is %lu", (ulong) (ulong) opValue, (ulong) offset, (ulong) step));
    Uint64 current, next;
886 887
    Uint64 div = ((Uint64) (opValue + step - offset)) / step;
    next = div * step + offset;
888 889 890 891
    current = (next < step) ? next : next - step;
    tupleId = (opValue <= current) ? current : next;
    DBUG_PRINT("info", ("Returning %lu", (ulong) tupleId));
    info->m_first_tuple_id = tupleId;
892
  }
893
  DBUG_RETURN(0);
894 895
}

896 897
/****************************************************************************
int readAutoIncrementValue( const char* aTableName,
898
                            Uint64 & autoValue);
899 900 901 902 903 904

Parameters:     aTableName (IN) : The table name.
                autoValue  (OUT) : The current autoincrement value
Returns:        0 if successful, -1 if error encountered
Remark:		Returns the current autoincrement value to the application.
****************************************************************************/
905 906
int
Ndb::readAutoIncrementValue(const char* aTableName,
907
                            Uint64 & autoValue)
mskold@mysql.com's avatar
mskold@mysql.com committed
908
{
909
  DBUG_ENTER("Ndb::readAutoIncrementValue");
910 911 912 913 914 915
  BaseString internal_tabname(internalize_table_name(aTableName));

  Ndb_local_table_info *info=
    theDictionary->get_local_table_info(internal_tabname, false);
  if (info == 0) {
    theError.code = theDictionary->getNdbError().code;
916
    DBUG_RETURN(-1);
917
  }
918
  if (readTupleIdFromNdb(info, autoValue) == -1)
919
    DBUG_RETURN(-1);
920
  DBUG_PRINT("info", ("value %lu", (ulong)autoValue));
921
  DBUG_RETURN(0);
mskold@mysql.com's avatar
mskold@mysql.com committed
922 923
}

924 925
/*
  Table-object overload: use the internal name stored in the table
  implementation to find the locally cached table info and report the
  table's auto-increment value in autoValue.  Returns 0 on success and
  -1 on failure; when the lookup fails the dictionary's error code is
  copied into theError.  aTable must not be null.
*/
int
Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable,
                            Uint64 & autoValue)
{
  DBUG_ENTER("Ndb::readAutoIncrementValue");
  assert(aTable != 0);
  const NdbTableImpl & impl = NdbTableImpl::getImpl(*aTable);
  const BaseString & tabName = impl.m_internalName;
  Ndb_local_table_info * const tabInfo =
    theDictionary->get_local_table_info(tabName, false);

  if (tabInfo == 0)
  {
    // Lookup failed: surface the dictionary's error to the caller
    theError.code = theDictionary->getNdbError().code;
    DBUG_RETURN(-1);
  }

  if (readTupleIdFromNdb(tabInfo, autoValue) != -1)
  {
    DBUG_PRINT("info", ("value %lu", (ulong)autoValue));
    DBUG_RETURN(0);
  }
  DBUG_RETURN(-1);
}

945 946 947
/*
  Report the next tuple id for the table described by info without
  reserving it.  If a cached range exists (first != last) the answer is
  first + 1; otherwise NEXTID is read from the database (operation 3).
  Returns 0 on success, -1 if the database read fails.
*/
int
Ndb::readTupleIdFromNdb(Ndb_local_table_info* info,
                        Uint64 & tupleId)
{
  DBUG_ENTER("Ndb::readTupleIdFromNdb");
  if (info->m_first_tuple_id == info->m_last_tuple_id)
  {
    /*
     * Nothing cached: peek at NEXTID.  The value is not reserved, so
     * it is valid only if no other transactions are allowed.
     */
    Uint64 nextId = 0;
    if (opTupleIdOnNdb(info, nextId, 3) == -1)
      DBUG_RETURN(-1);
    tupleId = nextId;
    DBUG_RETURN(0);
  }

  // Serve the answer from the cached range
  assert(info->m_first_tuple_id < info->m_last_tuple_id);
  tupleId = info->m_first_tuple_id + 1;
  DBUG_RETURN(0);
}

969 970 971 972 973 974 975 976 977 978 979
/****************************************************************************
int setAutoIncrementValue( const char* aTableName,
                           Uint64 autoValue,
                           bool modify);

Parameters:     aTableName (IN) : The table name.
                autoValue  (IN) : The new autoincrement value
                modify     (IN) : Modify existing value (not initialization)
Returns:        0 if successful, -1 if error encountered
Remark:		Sets a new autoincrement value for the application.
****************************************************************************/
980 981
int
Ndb::setAutoIncrementValue(const char* aTableName,
982
                           Uint64 autoValue, bool modify)
983
{
984
  DBUG_ENTER("Ndb::setAutoIncrementValue");
985 986
  BaseString internal_tabname(internalize_table_name(aTableName));

987
  Ndb_local_table_info *info=
988
    theDictionary->get_local_table_info(internal_tabname, false);
989
  if (info == 0) {
990
    theError.code = theDictionary->getNdbError().code;
991
    DBUG_RETURN(-1);
992
  }
993
  if (setTupleIdInNdb(info, autoValue, modify) == -1)
994 995
    DBUG_RETURN(-1);
  DBUG_RETURN(0);
996 997
}

998 999
/*
  Table-object overload: use the internal name stored in the table
  implementation to find the locally cached table info and hand the new
  auto-increment value on to setTupleIdInNdb().  Returns 0 on success
  and -1 on failure; when the lookup fails the dictionary's error code
  is copied into theError.  aTable must not be null.
*/
int
Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable,
                           Uint64 autoValue, bool modify)
{
  DBUG_ENTER("Ndb::setAutoIncrementValue");
  assert(aTable != 0);
  const NdbTableImpl & impl = NdbTableImpl::getImpl(*aTable);
  const BaseString & tabName = impl.m_internalName;
  Ndb_local_table_info * const tabInfo =
    theDictionary->get_local_table_info(tabName, false);

  if (tabInfo == 0)
  {
    // Lookup failed: surface the dictionary's error to the caller
    theError.code = theDictionary->getNdbError().code;
    DBUG_RETURN(-1);
  }

  if (setTupleIdInNdb(tabInfo, autoValue, modify) != -1)
  {
    DBUG_RETURN(0);
  }
  DBUG_RETURN(-1);
}

1018 1019
int
Ndb::setTupleIdInNdb(Ndb_local_table_info* info,
1020
                     Uint64 tupleId, bool modify)
1021
{
1022
  DBUG_ENTER("Ndb::setTupleIdInNdb");
1023
  if (modify)
1024
  {
1025
    if (checkTupleIdInNdb(info, tupleId))
1026
    {
1027
      if (info->m_first_tuple_id != info->m_last_tuple_id)
1028
      {
1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039
        assert(info->m_first_tuple_id < info->m_last_tuple_id);
        if (tupleId <= info->m_first_tuple_id + 1)
          DBUG_RETURN(0);
        if (tupleId <= info->m_last_tuple_id)
        {
	  info->m_first_tuple_id = tupleId - 1;
          DBUG_PRINT("info",
                     ("Setting next auto increment cached value to %lu",
                      (ulong)tupleId));
	  DBUG_RETURN(0);
        }
1040
      }
1041 1042 1043 1044 1045 1046 1047
      /*
       * if tupleId <= NEXTID, do nothing.  otherwise update NEXTID to
       * tupleId and set cached range to first = last = tupleId - 1.
       */
      Uint64 opValue = tupleId;
      if (opTupleIdOnNdb(info, opValue, 2) == -1)
        DBUG_RETURN(-1);
1048
    }
1049 1050
  }
  else
1051
  {
1052 1053 1054
    /*
     * update NEXTID to given value.  reset cached range.
     */
1055 1056 1057 1058
    if (opTupleIdOnNdb(info, tupleId, 1) == -1)
      DBUG_RETURN(-1);
  }
  DBUG_RETURN(0);
1059 1060
}

1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086
/*
  Decide from locally known values whether setting the auto-increment
  value to tupleId could require a database update.

  Returns 0 when a value greater than tupleId is already known locally
  (so the caller can skip the update), 1 otherwise.
*/
int
Ndb::checkTupleIdInNdb(Ndb_local_table_info* info, Uint64 tupleId)
{
  // Trace tag now matches the real function name (was "checkTupleIdIndNdb")
  DBUG_ENTER("Ndb::checkTupleIdInNdb");
  if ((info->m_first_tuple_id != ~(Uint64)0) &&
      (info->m_first_tuple_id > tupleId))
  {
   /*
    * If we have ever cached a value in this object and this cached
    * value is larger than the value we're trying to set then we
    * need not check with the real value in the SYSTAB_0 table.
    */
    DBUG_RETURN(0);
  }
  if (info->m_highest_seen > tupleId)
  {
    /*
     * Although we've never cached any higher value we have read
     * a higher value and again it isn't necessary to change the
     * auto increment value.
     */
    DBUG_RETURN(0);
  }
  DBUG_RETURN(1);
}

1087 1088
/*
  Run one operation against the NEXTID attribute of the SYSTAB_0 row
  keyed by the table id of info, in a transaction of its own that is
  committed and closed before returning.

  op selects the operation:
    0  add opValue to NEXTID; the cached range becomes
       [NEXTID - opValue, NEXTID - 1] (using the value read back) and
       opValue returns the first id of that range.
    1  write NEXTID = opValue and reset the cached range and
       m_highest_seen.
    2  interpreted update that writes opValue to NEXTID unless the
       comparison with the stored NEXTID branches past the write; the
       value read back is stored in m_highest_seen and the cached range
       is set to first = last = opValue - 1.
    3  read NEXTID into opValue and m_highest_seen.
  Any other op is treated as an error.

  The database/schema names of this Ndb object are switched to
  "sys"/"def" for the duration of the call and restored on every exit
  path after the switch.

  Returns 0 on success, -1 on failure (theError holds the error code).
*/
int
Ndb::opTupleIdOnNdb(Ndb_local_table_info* info, Uint64 & opValue, Uint32 op)
{
  DBUG_ENTER("Ndb::opTupleIdOnNdb");
  Uint32 aTableId = info->m_table_impl->m_tableId;
  DBUG_PRINT("enter", ("table: %u  value: %lu  op: %u",
                       aTableId, (ulong) opValue, op));

  NdbTransaction*     tConnection;
  NdbOperation*      tOperation= 0; // Compiler warning if not initialized
  Uint64             tValue;
  NdbRecAttr*        tRecAttrResult;

  NdbError savedError;

  CHECK_STATUS_MACRO_ZERO;

  BaseString currentDb(getDatabaseName());
  BaseString currentSchema(getDatabaseSchemaName());

  setDatabaseName("sys");
  setDatabaseSchemaName("def");
  tConnection = this->startTransaction();
  if (tConnection == NULL)
  {
    // No transaction and no operation exist yet, hence the -1 markers
    DBUG_PRINT("error", ("ndb=%d con=%d op=%d",
               theError.code, -1, -1));
    goto error_return;
  }

  if (usingFullyQualifiedNames())
    tOperation = tConnection->getNdbOperation("SYSTAB_0");
  else
    tOperation = tConnection->getNdbOperation("sys/def/SYSTAB_0");
  if (tOperation == NULL)
    goto error_handler;

  switch (op)
    {
    case 0:
      tOperation->interpretedUpdateTuple();
      tOperation->equal("SYSKEY_0", aTableId );
      tOperation->incValue("NEXTID", opValue);
      tRecAttrResult = tOperation->getValue("NEXTID");

      if (tConnection->execute( Commit ) == -1 )
        goto error_handler;

      tValue = tRecAttrResult->u_64_value();

      info->m_first_tuple_id = tValue - opValue;
      info->m_last_tuple_id  = tValue - 1;
      opValue = info->m_first_tuple_id; // out
      break;
    case 1:
      // create on first use
      tOperation->writeTuple();
      tOperation->equal("SYSKEY_0", aTableId );
      tOperation->setValue("NEXTID", opValue);

      if (tConnection->execute( Commit ) == -1 )
        goto error_handler;

      info->m_first_tuple_id = ~(Uint64)0;
      info->m_last_tuple_id  = ~(Uint64)0;
      info->m_highest_seen = 0;
      break;
    case 2:
      tOperation->interpretedUpdateTuple();
      tOperation->equal("SYSKEY_0", aTableId );
      tOperation->load_const_u64(1, opValue);
      tOperation->read_attr("NEXTID", 2);
      // compare NEXTID >= opValue
      tOperation->branch_le(2, 1, 0);
      tOperation->write_attr("NEXTID", 1);
      tOperation->def_label(0);
      tOperation->interpret_exit_ok();
      tRecAttrResult = tOperation->getValue("NEXTID");
      if (tConnection->execute( Commit ) == -1)
      {
        goto error_handler;
      }
      else
      {
        info->m_highest_seen = tRecAttrResult->u_64_value();
        DBUG_PRINT("info",
                   ("Setting auto increment value (db) to %lu",
                    (ulong)opValue));
        info->m_first_tuple_id = info->m_last_tuple_id = opValue - 1;
      }
      break;
    case 3:
      tOperation->readTuple();
      tOperation->equal("SYSKEY_0", aTableId );
      tRecAttrResult = tOperation->getValue("NEXTID");
      if (tConnection->execute( Commit ) == -1 )
        goto error_handler;
      info->m_highest_seen = opValue = tRecAttrResult->u_64_value(); // out
      break;
    default:
      goto error_handler;
    }

  this->closeTransaction(tConnection);

  // Restore current name space
  setDatabaseName(currentDb.c_str());
  setDatabaseSchemaName(currentSchema.c_str());

  DBUG_RETURN(0);

  error_handler:
    theError.code = tConnection->theError.code;

    /*
      Trace the error codes before the transaction is closed: the
      transaction and its operation must not be read once they have
      been handed back through closeTransaction().
    */
    DBUG_PRINT("error", ("ndb=%d con=%d op=%d",
               theError.code,
               tConnection->theError.code,
               tOperation ? tOperation->theError.code : -1));

    // Keep theError intact across closeTransaction()
    savedError = theError;

    this->closeTransaction(tConnection);
    theError = savedError;

  error_return:
    // Restore current name space
    setDatabaseName(currentDb.c_str());
    setDatabaseSchemaName(currentSchema.c_str());

  DBUG_RETURN(-1);
}

/*
  Reverse the byte order of a 32-bit word when built for a big-endian
  host (WORDS_BIGENDIAN defined); otherwise return the word unchanged.
*/
Uint32
convertEndian(Uint32 Data)
{
#ifdef WORDS_BIGENDIAN
  const Uint32 lowest  = Data & 255;
  const Uint32 low     = (Data >> 8) & 255;
  const Uint32 high    = (Data >> 16) & 255;
  const Uint32 highest = (Data >> 24) & 255;

  // Each byte lands in the mirrored position
  return (lowest << 24) | (low << 16) | (high << 8) | highest;
#else
  return Data;
#endif
}
const char * Ndb::getCatalogName() const
{
msvensson@neptunus.(none)'s avatar
msvensson@neptunus.(none) committed
1233
  return theImpl->m_dbname.c_str();
1234
}
msvensson@neptunus.(none)'s avatar
msvensson@neptunus.(none) committed
1235 1236


1237
int Ndb::setCatalogName(const char * a_catalog_name)
1238
{
msvensson@neptunus.(none)'s avatar
msvensson@neptunus.(none) committed
1239 1240
  if (a_catalog_name)
  {
1241 1242 1243 1244 1245 1246
    if (!theImpl->m_dbname.assign(a_catalog_name) ||
        theImpl->update_prefix())
    {
      theError.code = 4000;
      return -1;
    }
1247
  }
1248
  return 0;
1249
}
msvensson@neptunus.(none)'s avatar
msvensson@neptunus.(none) committed
1250

1251 1252
const char * Ndb::getSchemaName() const
{
msvensson@neptunus.(none)'s avatar
msvensson@neptunus.(none) committed
1253
  return theImpl->m_schemaname.c_str();
1254
}
msvensson@neptunus.(none)'s avatar
msvensson@neptunus.(none) committed
1255 1256


1257
int Ndb::setSchemaName(const char * a_schema_name)
1258 1259
{
  if (a_schema_name) {
1260 1261 1262 1263 1264 1265
    if (!theImpl->m_schemaname.assign(a_schema_name) ||
        theImpl->update_prefix())
    {
      theError.code = 4000;
      return -1;
    }
1266
  }
1267
  return 0;
1268 1269 1270 1271 1272 1273 1274 1275 1276 1277
}
 
/*
Deprecated functions
*/
/* Deprecated alias: forwards to getCatalogName(). */
const char *
Ndb::getDatabaseName() const
{
  const char * const catalog = getCatalogName();
  return catalog;
}
 
1278
int Ndb::setDatabaseName(const char * a_catalog_name)
1279
{
1280
  return setCatalogName(a_catalog_name);
1281 1282 1283 1284 1285 1286 1287
}
 
/* Deprecated alias: forwards to getSchemaName(). */
const char *
Ndb::getDatabaseSchemaName() const
{
  const char * const schema = getSchemaName();
  return schema;
}
 
1288
int Ndb::setDatabaseSchemaName(const char * a_schema_name)
1289
{
1290
  return setSchemaName(a_schema_name);
1291 1292 1293 1294 1295 1296 1297 1298
}
 
/* Report the value of the fullyQualifiedNames flag of this Ndb object. */
bool
Ndb::usingFullyQualifiedNames()
{
  return fullyQualifiedNames;
}
 
const char *
1299
Ndb::externalizeTableName(const char * internalTableName, bool fullyQualifiedNames)
1300 1301 1302 1303 1304
{
  if (fullyQualifiedNames) {
    register const char *ptr = internalTableName;
   
    // Skip database name
1305
    while (*ptr && *ptr++ != table_name_separator);
1306
    // Skip schema name
1307
    while (*ptr && *ptr++ != table_name_separator);
1308 1309 1310 1311 1312 1313 1314
    return ptr;
  }
  else
    return internalTableName;
}

const char *
1315
Ndb::externalizeTableName(const char * internalTableName)
1316
{
1317
  return externalizeTableName(internalTableName, usingFullyQualifiedNames());
1318
}
1319

1320
const char *
1321
Ndb::externalizeIndexName(const char * internalIndexName, bool fullyQualifiedNames)
1322 1323 1324 1325 1326 1327
{
  if (fullyQualifiedNames) {
    register const char *ptr = internalIndexName;
   
    // Scan name from the end
    while (*ptr++); ptr--; // strend
1328
    while (ptr >= internalIndexName && *ptr != table_name_separator)
1329 1330 1331 1332 1333 1334 1335
      ptr--;
     
    return ptr + 1;
  }
  else
    return internalIndexName;
}
1336 1337 1338 1339 1340 1341 1342

const char *
Ndb::externalizeIndexName(const char * internalIndexName)
{
  return externalizeIndexName(internalIndexName, usingFullyQualifiedNames());
}

1343 1344 1345

/*
  Build the internal form of a table name.  With fully qualified names
  enabled the cached prefix is put in front of external_name; otherwise
  the external name is used as it is.
*/
const BaseString
Ndb::internalize_table_name(const char *external_name) const
{
  BaseString internal;
  DBUG_ENTER("internalize_table_name");
  DBUG_PRINT("enter", ("external_name: %s", external_name));

  if (!fullyQualifiedNames)
  {
    internal.assign(external_name);
  }
  else
  {
    /* Internal table name format <db>/<schema>/<table>
       <db>/<schema> is already available in m_prefix
       so just concat the two strings
     */
    internal.assfmt("%s%s",
                    theImpl->m_prefix.c_str(),
                    external_name);
  }

  DBUG_PRINT("exit", ("internal_name: %s", internal.c_str()));
  DBUG_RETURN(internal);
}
1367 1368 1369 1370 1371


/*
  Build the internal form of an index name for the given table.  With
  fully qualified names enabled the result is the cached prefix, the
  table id, a separator and external_name; otherwise the external name
  is used as it is.  A null table yields an empty string.
*/
const BaseString
Ndb::internalize_index_name(const NdbTableImpl * table,
                           const char * external_name) const
{
  BaseString internal;
  DBUG_ENTER("internalize_index_name");
  DBUG_PRINT("enter", ("external_name: %s, table_id: %d",
                       external_name, table ? table->m_tableId : ~0));
  if (!table)
  {
    DBUG_PRINT("error", ("!table"));
    DBUG_RETURN(internal);
  }

  if (!fullyQualifiedNames)
  {
    internal.assign(external_name);
  }
  else
  {
    /* Internal index name format <db>/<schema>/<tabid>/<table> */
    internal.assfmt("%s%d%c%s",
                    theImpl->m_prefix.c_str(),
                    table->m_tableId,
                    table_name_separator,
                    external_name);
  }

  DBUG_PRINT("exit", ("internal_name: %s", internal.c_str()));
  DBUG_RETURN(internal);
}

1399

1400 1401 1402 1403
/*
  Extract the database part of an internal name, i.e. everything in
  front of the first separator (the whole name if there is none).

  Returns the database name, or BaseString(NULL) with errno set to
  ENOMEM if the scratch buffer could not be allocated.
*/
const BaseString
Ndb::getDatabaseFromInternalName(const char * internalName)
{
  char * databaseName = new char[strlen(internalName) + 1];
  if (databaseName == NULL)
  {
    errno = ENOMEM;
    return BaseString(NULL);
  }
  strcpy(databaseName, internalName);
  // 'register' dropped: the storage class no longer exists in C++17
  char *ptr = databaseName;

  /* Scan name for the first table_name_separator */
  while (*ptr && *ptr != table_name_separator)
    ptr++;
  /* Cut the copy off at the separator (or at its existing terminator) */
  *ptr = '\0';
  BaseString ret = BaseString(databaseName);
  delete [] databaseName;
  return ret;
}
 
/*
  Extract the schema part of an internal name, i.e. the text between
  the first and the second separator.  A name without any separator
  yields an empty string.

  Returns the schema name, or BaseString(NULL) with errno set to ENOMEM
  if the scratch buffer could not be allocated.
*/
const BaseString
Ndb::getSchemaFromInternalName(const char * internalName)
{
  /*
    Room for the whole name plus terminator, so the copy below always
    fits, including for an empty or separator-less name.
  */
  char * schemaName = new char[strlen(internalName) + 1];
  if (schemaName == NULL)
  {
    errno = ENOMEM;
    return BaseString(NULL);
  }
  const char *ptr1 = internalName;

  /* Skip the database part: scan for the first table_name_separator */
  while (*ptr1 && *ptr1 != table_name_separator)
    ptr1++;
  /*
    Step over the separator itself, but never past the terminator: with
    no separator in the name there is nothing after it to read.
  */
  if (*ptr1)
    ptr1++;
  strcpy(schemaName, ptr1);

  /* Cut the copy off at the second table_name_separator */
  char *ptr = schemaName;
  while (*ptr && *ptr != table_name_separator)
    ptr++;
  *ptr = '\0';
  BaseString ret = BaseString(schemaName);
  delete [] schemaName;
  return ret;
}

#ifdef VM_TRACE
#include <NdbMutex.h>
1447 1448
extern NdbMutex *ndb_print_state_mutex;

1449
static bool
1450
checkdups(NdbTransaction** list, unsigned no)
1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465
{
  for (unsigned i = 0; i < no; i++)
    for (unsigned j = i + 1; j < no; j++)
      if (list[i] == list[j])
        return true;
  return false;
}
void
Ndb::printState(const char* fmt, ...)
{
  char buf[200];
  va_list ap;
  va_start(ap, fmt);
  vsprintf(buf, fmt, ap);
  va_end(ap);
1466
  NdbMutex_Lock(ndb_print_state_mutex);
1467
  bool dups = false;
1468
  unsigned i;
1469
  ndbout << buf << " ndb=" << hex << this << dec;
1470
#ifndef NDB_WIN32
1471 1472 1473 1474
  ndbout << " thread=" << (int)pthread_self();
#endif
  ndbout << endl;
  for (unsigned n = 0; n < MAX_NDB_NODES; n++) {
1475
    NdbTransaction* con = theConnectionArray[n];
1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488
    if (con != 0) {
      ndbout << "conn " << n << ":" << endl;
      while (con != 0) {
        con->printState();
        con = con->theNext;
      }
    }
  }
  ndbout << "prepared: " << theNoOfPreparedTransactions<< endl;
  if (checkdups(thePreparedTransactionsArray, theNoOfPreparedTransactions)) {
    ndbout << "!! DUPS !!" << endl;
    dups = true;
  }
1489
  for (i = 0; i < theNoOfPreparedTransactions; i++)
1490 1491 1492 1493 1494 1495
    thePreparedTransactionsArray[i]->printState();
  ndbout << "sent: " << theNoOfSentTransactions<< endl;
  if (checkdups(theSentTransactionsArray, theNoOfSentTransactions)) {
    ndbout << "!! DUPS !!" << endl;
    dups = true;
  }
1496
  for (i = 0; i < theNoOfSentTransactions; i++)
1497 1498 1499 1500 1501 1502
    theSentTransactionsArray[i]->printState();
  ndbout << "completed: " << theNoOfCompletedTransactions<< endl;
  if (checkdups(theCompletedTransactionsArray, theNoOfCompletedTransactions)) {
    ndbout << "!! DUPS !!" << endl;
    dups = true;
  }
1503
  for (i = 0; i < theNoOfCompletedTransactions; i++)
1504
    theCompletedTransactionsArray[i]->printState();
1505
  NdbMutex_Unlock(ndb_print_state_mutex);
1506 1507 1508 1509
}
#endif