NdbScanOperation.cpp 45.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

17
#include <ndb_global.h>
18 19
#include <Ndb.hpp>
#include <NdbScanOperation.hpp>
joreland@mysql.com's avatar
joreland@mysql.com committed
20
#include <NdbIndexScanOperation.hpp>
21
#include <NdbTransaction.hpp>
22 23 24
#include "NdbApiSignal.hpp"
#include <NdbOut.hpp>
#include "NdbDictionaryImpl.hpp"
25
#include <NdbBlob.hpp>
26

joreland@mysql.com's avatar
joreland@mysql.com committed
27 28 29 30 31 32 33 34
#include <NdbRecAttr.hpp>
#include <NdbReceiver.hpp>

#include <stdlib.h>
#include <NdbSqlUtil.hpp>

#include <signaldata/ScanTab.hpp>
#include <signaldata/KeyInfo.hpp>
35
#include <signaldata/AttrInfo.hpp>
joreland@mysql.com's avatar
joreland@mysql.com committed
36 37
#include <signaldata/TcKeyReq.hpp>

38 39
#define DEBUG_NEXT_RESULT 0

40 41
NdbScanOperation::NdbScanOperation(Ndb* aNdb, NdbOperation::Type aType) :
  NdbOperation(aNdb, aType),
joreland@mysql.com's avatar
joreland@mysql.com committed
42
  m_transConnection(NULL)
43
{
joreland@mysql.com's avatar
joreland@mysql.com committed
44 45 46 47 48 49 50
  theParallelism = 0;
  m_allocated_receivers = 0;
  m_prepared_receivers = 0;
  m_api_receivers = 0;
  m_conf_receivers = 0;
  m_sent_receivers = 0;
  m_receivers = 0;
joreland@mysql.com's avatar
joreland@mysql.com committed
51
  m_array = new Uint32[1]; // skip if on delete in fix_receivers
joreland@mysql.com's avatar
joreland@mysql.com committed
52
  theSCAN_TABREQ = 0;
53 54 55 56
}

NdbScanOperation::~NdbScanOperation()
{
joreland@mysql.com's avatar
joreland@mysql.com committed
57
  for(Uint32 i = 0; i<m_allocated_receivers; i++){
joreland@mysql.com's avatar
joreland@mysql.com committed
58
    m_receivers[i]->release();
joreland@mysql.com's avatar
joreland@mysql.com committed
59 60 61
    theNdb->releaseNdbScanRec(m_receivers[i]);
  }
  delete[] m_array;
62 63 64 65
}

void
NdbScanOperation::setErrorCode(int aErrorCode){
66
  NdbTransaction* tmp = theNdbCon;
67 68 69 70 71 72 73
  theNdbCon = m_transConnection;
  NdbOperation::setErrorCode(aErrorCode);
  theNdbCon = tmp;
}

void
NdbScanOperation::setErrorCodeAbort(int aErrorCode){
74
  NdbTransaction* tmp = theNdbCon;
75 76 77 78 79 80 81 82 83 84 85 86 87 88
  theNdbCon = m_transConnection;
  NdbOperation::setErrorCodeAbort(aErrorCode);
  theNdbCon = tmp;
}

  
/*****************************************************************************
 * int init();
 *
 * Return Value:  Return 0 : init was successful.
 *                Return -1: In all other case.  
 * Remark:        Initiates operation record after allocation.
 *****************************************************************************/
int
89
NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection)
90 91 92
{
  m_transConnection = myConnection;
  //NdbConnection* aScanConnection = theNdb->startTransaction(myConnection);
93
  theNdb->theRemainingStartTransactions++; // will be checked in hupp...
94
  NdbTransaction* aScanConnection = theNdb->hupp(myConnection);
95
  if (!aScanConnection){
96
    theNdb->theRemainingStartTransactions--;
97
    setErrorCodeAbort(theNdb->getNdbError().code);
98
    return -1;
99
  }
100

joreland@mysql.com's avatar
joreland@mysql.com committed
101 102
  // NOTE! The hupped trans becomes the owner of the operation
  if(NdbOperation::init(tab, aScanConnection) != 0){
103
    theNdb->theRemainingStartTransactions--;
104 105
    return -1;
  }
joreland@mysql.com's avatar
joreland@mysql.com committed
106 107 108 109 110
  
  initInterpreter();
  
  theStatus = GetValue;
  theOperationType = OpenScanRequest;
111
  theNdbCon->theMagicNumber = 0xFE11DF;
112
  theNoOfTupKeyLeft = tab->m_noOfDistributionKeys;
113
  m_read_range_no = 0;
114 115 116
  return 0;
}

117 118
int 
NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
119
			     Uint32 scan_flags, 
120
			     Uint32 parallel)
121
{
122
  m_ordered = m_descending = false;
joreland@mysql.com's avatar
joreland@mysql.com committed
123
  Uint32 fragCount = m_currentTable->m_fragmentCount;
124

mronstrom@mysql.com's avatar
mronstrom@mysql.com committed
125
  if (parallel > fragCount || parallel == 0) {
joreland@mysql.com's avatar
joreland@mysql.com committed
126
     parallel = fragCount;
mronstrom@mysql.com's avatar
mronstrom@mysql.com committed
127
  }
joreland@mysql.com's avatar
joreland@mysql.com committed
128 129 130 131 132 133 134

  // It is only possible to call openScan if 
  //  1. this transcation don't already  contain another scan operation
  //  2. this transaction don't already contain other operations
  //  3. theScanOp contains a NdbScanOperation
  if (theNdbCon->theScanningOp != NULL){
    setErrorCode(4605);
135
    return -1;
joreland@mysql.com's avatar
joreland@mysql.com committed
136
  }
137

joreland@mysql.com's avatar
joreland@mysql.com committed
138
  theNdbCon->theScanningOp = this;
joreland@mysql.com's avatar
joreland@mysql.com committed
139
  theLockMode = lm;
140

joreland@mysql.com's avatar
joreland@mysql.com committed
141 142 143 144 145 146 147 148 149 150 151 152
  bool lockExcl, lockHoldMode, readCommitted;
  switch(lm){
  case NdbScanOperation::LM_Read:
    lockExcl = false;
    lockHoldMode = true;
    readCommitted = false;
    break;
  case NdbScanOperation::LM_Exclusive:
    lockExcl = true;
    lockHoldMode = true;
    readCommitted = false;
    break;
joreland@mysql.com's avatar
joreland@mysql.com committed
153
  case NdbScanOperation::LM_CommittedRead:
joreland@mysql.com's avatar
joreland@mysql.com committed
154 155 156 157 158 159
    lockExcl = false;
    lockHoldMode = false;
    readCommitted = true;
    break;
  default:
    setErrorCode(4003);
160
    return -1;
joreland@mysql.com's avatar
joreland@mysql.com committed
161
  }
162

joreland@mysql.com's avatar
joreland@mysql.com committed
163
  m_keyInfo = lockExcl ? 1 : 0;
joreland@mysql.com's avatar
joreland@mysql.com committed
164

165
  bool rangeScan = false;
166 167
  if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex)
  {
168 169 170 171 172 173 174
    if (m_currentTable == m_accessTable){
      // Old way of scanning indexes, should not be allowed
      m_currentTable = theNdb->theDictionary->
	getTable(m_currentTable->m_primaryTable.c_str());
      assert(m_currentTable != NULL);
    }
    assert (m_currentTable != m_accessTable);
joreland@mysql.com's avatar
joreland@mysql.com committed
175
    // Modify operation state
176
    theStatus = GetValue;
joreland@mysql.com's avatar
joreland@mysql.com committed
177
    theOperationType  = OpenRangeScanRequest;
178
    rangeScan = true;
joreland@mysql.com's avatar
joreland@mysql.com committed
179
  }
180 181 182 183

  bool tupScan = (scan_flags & SF_TupScan);
  if (tupScan && rangeScan)
    tupScan = false;
joreland@mysql.com's avatar
joreland@mysql.com committed
184
  
joreland@mysql.com's avatar
joreland@mysql.com committed
185
  theParallelism = parallel;
186

joreland@mysql.com's avatar
joreland@mysql.com committed
187
  if(fix_receivers(parallel) == -1){
joreland@mysql.com's avatar
joreland@mysql.com committed
188
    setErrorCodeAbort(4000);
189
    return -1;
joreland@mysql.com's avatar
joreland@mysql.com committed
190 191
  }
  
joreland@mysql.com's avatar
joreland@mysql.com committed
192
  theSCAN_TABREQ = (!theSCAN_TABREQ ? theNdb->getSignal() : theSCAN_TABREQ);
joreland@mysql.com's avatar
joreland@mysql.com committed
193 194
  if (theSCAN_TABREQ == NULL) {
    setErrorCodeAbort(4000);
195
    return -1;
joreland@mysql.com's avatar
joreland@mysql.com committed
196 197
  }//if
  
198
  theSCAN_TABREQ->setSignal(GSN_SCAN_TABREQ);
joreland@mysql.com's avatar
joreland@mysql.com committed
199 200 201 202 203 204 205 206
  ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
  req->apiConnectPtr = theNdbCon->theTCConPtr;
  req->tableId = m_accessTable->m_tableId;
  req->tableSchemaVersion = m_accessTable->m_version;
  req->storedProcId = 0xFFFF;
  req->buddyConPtr = theNdbCon->theBuddyConPtr;
  
  Uint32 reqInfo = 0;
joreland@mysql.com's avatar
joreland@mysql.com committed
207
  ScanTabReq::setParallelism(reqInfo, parallel);
mronstrom@mysql.com's avatar
mronstrom@mysql.com committed
208
  ScanTabReq::setScanBatch(reqInfo, 0);
joreland@mysql.com's avatar
joreland@mysql.com committed
209 210 211
  ScanTabReq::setLockMode(reqInfo, lockExcl);
  ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
  ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted);
212 213
  ScanTabReq::setRangeScanFlag(reqInfo, rangeScan);
  ScanTabReq::setTupScanFlag(reqInfo, tupScan);
joreland@mysql.com's avatar
joreland@mysql.com committed
214
  req->requestInfo = reqInfo;
215

joreland@mysql.com's avatar
joreland@mysql.com committed
216 217 218
  Uint64 transId = theNdbCon->getTransactionId();
  req->transId1 = (Uint32) transId;
  req->transId2 = (Uint32) (transId >> 32);
219

220 221 222 223 224
  NdbApiSignal* tSignal = theSCAN_TABREQ->next();
  if(!tSignal)
  {
    theSCAN_TABREQ->next(tSignal = theNdb->getSignal());
  }
225
  theLastKEYINFO = tSignal;
226 227 228 229
  
  tSignal->setSignal(GSN_KEYINFO);
  theKEYINFOptr = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
  theTotalNrOfKeyWordInSignal= 0;
230

231
  getFirstATTRINFOScan();
232
  return 0;
233 234
}

joreland@mysql.com's avatar
joreland@mysql.com committed
235
int
joreland@mysql.com's avatar
joreland@mysql.com committed
236 237 238 239 240
NdbScanOperation::fix_receivers(Uint32 parallel){
  assert(parallel > 0);
  if(parallel > m_allocated_receivers){
    const Uint32 sz = parallel * (4*sizeof(char*)+sizeof(Uint32));

241
    Uint64 * tmp = new Uint64[(sz+7)/8];
joreland@mysql.com's avatar
joreland@mysql.com committed
242
    // Save old receivers
243
    memcpy(tmp, m_receivers, m_allocated_receivers*sizeof(char*));
joreland@mysql.com's avatar
joreland@mysql.com committed
244
    delete[] m_array;
245
    m_array = (Uint32*)tmp;
joreland@mysql.com's avatar
joreland@mysql.com committed
246
    
247
    m_receivers = (NdbReceiver**)tmp;
joreland@mysql.com's avatar
joreland@mysql.com committed
248 249 250
    m_api_receivers = m_receivers + parallel;
    m_conf_receivers = m_api_receivers + parallel;
    m_sent_receivers = m_conf_receivers + parallel;
251
    m_prepared_receivers = (Uint32*)(m_sent_receivers + parallel);
joreland@mysql.com's avatar
joreland@mysql.com committed
252

joreland@mysql.com's avatar
joreland@mysql.com committed
253
    // Only get/init "new" receivers
joreland@mysql.com's avatar
joreland@mysql.com committed
254
    NdbReceiver* tScanRec;
joreland@mysql.com's avatar
joreland@mysql.com committed
255
    for (Uint32 i = m_allocated_receivers; i < parallel; i ++) {
joreland@mysql.com's avatar
joreland@mysql.com committed
256 257 258 259 260 261
      tScanRec = theNdb->getNdbScanRec();
      if (tScanRec == NULL) {
	setErrorCodeAbort(4000);
	return -1;
      }//if
      m_receivers[i] = tScanRec;
joreland@mysql.com's avatar
joreland@mysql.com committed
262
      tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, this);
joreland@mysql.com's avatar
joreland@mysql.com committed
263
    }
joreland@mysql.com's avatar
joreland@mysql.com committed
264
    m_allocated_receivers = parallel;
joreland@mysql.com's avatar
joreland@mysql.com committed
265
  }
joreland@mysql.com's avatar
joreland@mysql.com committed
266
  
267
  reset_receivers(parallel, 0);
268 269 270
  return 0;
}

joreland@mysql.com's avatar
joreland@mysql.com committed
271 272 273 274 275
/**
 * Move receiver from send array to conf:ed array
 */
void
NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
joreland@mysql.com's avatar
joreland@mysql.com committed
276
  if(theError.code == 0){
277 278 279
    if(DEBUG_NEXT_RESULT)
      ndbout_c("receiver_delivered");
    
joreland@mysql.com's avatar
joreland@mysql.com committed
280 281 282 283 284 285 286 287 288 289 290 291 292 293
    Uint32 idx = tRec->m_list_index;
    Uint32 last = m_sent_receivers_count - 1;
    if(idx != last){
      NdbReceiver * move = m_sent_receivers[last];
      m_sent_receivers[idx] = move;
      move->m_list_index = idx;
    }
    m_sent_receivers_count = last;
    
    last = m_conf_receivers_count;
    m_conf_receivers[last] = tRec;
    m_conf_receivers_count = last + 1;
    tRec->m_list_index = last;
    tRec->m_current_row = 0;
joreland@mysql.com's avatar
joreland@mysql.com committed
294
  }
295 296
}

joreland@mysql.com's avatar
joreland@mysql.com committed
297 298 299 300 301
/**
 * Remove receiver as it's completed
 */
void
NdbScanOperation::receiver_completed(NdbReceiver* tRec){
joreland@mysql.com's avatar
joreland@mysql.com committed
302
  if(theError.code == 0){
303 304 305
    if(DEBUG_NEXT_RESULT)
      ndbout_c("receiver_completed");
    
joreland@mysql.com's avatar
joreland@mysql.com committed
306 307 308 309 310 311 312 313
    Uint32 idx = tRec->m_list_index;
    Uint32 last = m_sent_receivers_count - 1;
    if(idx != last){
      NdbReceiver * move = m_sent_receivers[last];
      m_sent_receivers[idx] = move;
      move->m_list_index = idx;
    }
    m_sent_receivers_count = last;
joreland@mysql.com's avatar
joreland@mysql.com committed
314
  }
315 316
}

joreland@mysql.com's avatar
joreland@mysql.com committed
317 318 319 320 321 322 323 324 325 326 327 328 329 330
/*****************************************************************************
 * int getFirstATTRINFOScan( U_int32 aData )
 *
 * Return Value:  Return 0:   Successful
 *      	  Return -1:  All other cases
 * Parameters:    None: 	   Only allocate the first signal.
 * Remark:        When a scan is defined we need to use this method instead 
 *                of insertATTRINFO for the first signal. 
 *                This is because we need not to mess up the code in 
 *                insertATTRINFO with if statements since we are not 
 *                interested in the TCKEYREQ signal.
 *****************************************************************************/
int
NdbScanOperation::getFirstATTRINFOScan()
331
{
joreland@mysql.com's avatar
joreland@mysql.com committed
332
  NdbApiSignal* tSignal;
333

joreland@mysql.com's avatar
joreland@mysql.com committed
334 335 336 337 338 339 340 341 342 343 344
  tSignal = theNdb->getSignal();
  if (tSignal == NULL){
    setErrorCodeAbort(4000);      
    return -1;    
  }
  tSignal->setSignal(m_attrInfoGSN);
  theAI_LenInCurrAI = 8;
  theATTRINFOptr = &tSignal->getDataPtrSend()[8];
  theFirstATTRINFO = tSignal;
  theCurrentATTRINFO = tSignal;
  theCurrentATTRINFO->next(NULL);
345 346 347 348

  return 0;
}

joreland@mysql.com's avatar
joreland@mysql.com committed
349 350 351 352 353 354
/**
 * Constats for theTupleKeyDefined[][0]
 */
#define SETBOUND_EQ 1
#define FAKE_PTR 2
#define API_PTR 3
355

joreland@mysql.com's avatar
joreland@mysql.com committed
356
#define WAITFOR_SCAN_TIMEOUT 120000
357

joreland@mysql.com's avatar
joreland@mysql.com committed
358 359
int
NdbScanOperation::executeCursor(int nodeId){
360
  NdbTransaction * tCon = theNdbCon;
joreland@mysql.com's avatar
joreland@mysql.com committed
361 362
  TransporterFacade* tp = TransporterFacade::instance();
  Guard guard(tp->theMutexPtr);
363 364

  Uint32 magic = tCon->theMagicNumber;
joreland@mysql.com's avatar
joreland@mysql.com committed
365
  Uint32 seq = tCon->theNodeSequence;
366

joreland@mysql.com's avatar
joreland@mysql.com committed
367 368 369
  if (tp->get_node_alive(nodeId) &&
      (tp->getNodeSequence(nodeId) == seq)) {

370 371 372 373
    /**
     * Only call prepareSendScan first time (incase of restarts)
     *   - check with theMagicNumber
     */
joreland@mysql.com's avatar
joreland@mysql.com committed
374
    tCon->theMagicNumber = 0x37412619;
375 376 377 378
    if(magic != 0x37412619 && 
       prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1)
      return -1;
    
joreland@mysql.com's avatar
joreland@mysql.com committed
379 380 381 382 383 384 385 386 387 388 389 390 391 392 393
    
    if (doSendScan(nodeId) == -1)
      return -1;

    return 0;
  } else {
    if (!(tp->get_node_stopping(nodeId) &&
	  (tp->getNodeSequence(nodeId) == seq))){
      TRACE_DEBUG("The node is hard dead when attempting to start a scan");
      setErrorCode(4029);
      tCon->theReleaseOnClose = true;
    } else {
      TRACE_DEBUG("The node is stopping when attempting to start a scan");
      setErrorCode(4030);
    }//if
394
    tCon->theCommitStatus = NdbTransaction::Aborted;
joreland@mysql.com's avatar
joreland@mysql.com committed
395 396
  }//if
  return -1;
397 398
}

399

400
int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423
{
  int res;
  if ((res = nextResultImpl(fetchAllowed, forceSend)) == 0) {
    // handle blobs
    NdbBlob* tBlob = theBlobList;
    while (tBlob != 0) {
      if (tBlob->atNextResult() == -1)
        return -1;
      tBlob = tBlob->theNext;
    }
    /*
     * Flush blob part ops on behalf of user because
     * - nextResult is analogous to execute(NoCommit)
     * - user is likely to want blob value before next execute
     */
    if (m_transConnection->executePendingBlobOps() == -1)
      return -1;
    return 0;
  }
  return res;
}

int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
424
{
joreland@mysql.com's avatar
joreland@mysql.com committed
425
  if(m_ordered)
426 427
    return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed,
							       forceSend);
joreland@mysql.com's avatar
joreland@mysql.com committed
428 429 430 431 432 433 434
  
  /**
   * Check current receiver
   */
  int retVal = 2;
  Uint32 idx = m_current_api_receiver;
  Uint32 last = m_api_receivers_count;
435
  m_curr_row = 0;
436 437 438

  if(DEBUG_NEXT_RESULT)
    ndbout_c("nextResult(%d) idx=%d last=%d", fetchAllowed, idx, last);
joreland@mysql.com's avatar
joreland@mysql.com committed
439 440 441 442 443 444 445
  
  /**
   * Check next buckets
   */
  for(; idx < last; idx++){
    NdbReceiver* tRec = m_api_receivers[idx];
    if(tRec->nextResult()){
446
      m_curr_row = tRec->copyout(theReceiver);
joreland@mysql.com's avatar
joreland@mysql.com committed
447 448 449 450 451 452 453 454
      retVal = 0;
      break;
    }
  }
    
  /**
   * We have advanced atleast one bucket
   */
joreland@mysql.com's avatar
joreland@mysql.com committed
455
  if(!fetchAllowed || !retVal){
joreland@mysql.com's avatar
joreland@mysql.com committed
456
    m_current_api_receiver = idx;
457
    if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);
joreland@mysql.com's avatar
joreland@mysql.com committed
458 459
    return retVal;
  }
joreland@mysql.com's avatar
joreland@mysql.com committed
460
  
joreland@mysql.com's avatar
joreland@mysql.com committed
461 462 463
  Uint32 nodeId = theNdbCon->theDBnode;
  TransporterFacade* tp = TransporterFacade::instance();
  Guard guard(tp->theMutexPtr);
464 465 466
  if(theError.code)
    return -1;

joreland@mysql.com's avatar
joreland@mysql.com committed
467
  Uint32 seq = theNdbCon->theNodeSequence;
468 469
  if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false,
							  forceSend) == 0){
joreland@mysql.com's avatar
joreland@mysql.com committed
470 471 472 473 474
      
    idx = m_current_api_receiver;
    last = m_api_receivers_count;
      
    do {
475 476
      if(theError.code){
	setErrorCode(theError.code);
477
	if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
478 479 480
	return -1;
      }
      
joreland@mysql.com's avatar
joreland@mysql.com committed
481 482
      Uint32 cnt = m_conf_receivers_count;
      Uint32 sent = m_sent_receivers_count;
483 484 485

      if(DEBUG_NEXT_RESULT)
	ndbout_c("idx=%d last=%d cnt=%d sent=%d", idx, last, cnt, sent);
joreland@mysql.com's avatar
joreland@mysql.com committed
486 487 488 489 490 491 492 493 494 495 496 497
	
      if(cnt > 0){
	/**
	 * Just move completed receivers
	 */
	memcpy(m_api_receivers+last, m_conf_receivers, cnt * sizeof(char*));
	last += cnt;
	m_conf_receivers_count = 0;
      } else if(retVal == 2 && sent > 0){
	/**
	 * No completed...
	 */
498 499
	theNdb->theImpl->theWaiter.m_node = nodeId;
	theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
joreland@mysql.com's avatar
joreland@mysql.com committed
500 501 502 503 504
	int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
	if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
	  continue;
	} else {
	  idx = last;
505
	  retVal = -2; //return_code;
joreland@mysql.com's avatar
joreland@mysql.com committed
506 507 508 509 510
	}
      } else if(retVal == 2){
	/**
	 * No completed & no sent -> EndOfData
	 */
511 512 513
	theError.code = -1; // make sure user gets error if he tries again
	if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
	return 1;
514
      }
joreland@mysql.com's avatar
joreland@mysql.com committed
515 516 517 518 519 520 521
	
      if(retVal == 0)
	break;
	
      for(; idx < last; idx++){
	NdbReceiver* tRec = m_api_receivers[idx];
	if(tRec->nextResult()){
522
	  m_curr_row = tRec->copyout(theReceiver);      
joreland@mysql.com's avatar
joreland@mysql.com committed
523 524 525
	  retVal = 0;
	  break;
	}
526
      }
joreland@mysql.com's avatar
joreland@mysql.com committed
527 528 529
    } while(retVal == 2);
  } else {
    retVal = -3;
530
  }
joreland@mysql.com's avatar
joreland@mysql.com committed
531 532 533 534 535 536 537 538
    
  m_api_receivers_count = last;
  m_current_api_receiver = idx;
    
  switch(retVal){
  case 0:
  case 1:
  case 2:
539
    if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);
joreland@mysql.com's avatar
joreland@mysql.com committed
540 541 542 543 544 545 546 547
    return retVal;
  case -1:
    setErrorCode(4008); // Timeout
    break;
  case -2:
    setErrorCode(4028); // Node fail
    break;
  case -3: // send_next_scan -> return fail (set error-code self)
joreland@mysql.com's avatar
joreland@mysql.com committed
548 549
    if(theError.code == 0)
      setErrorCode(4028); // seq changed = Node fail
joreland@mysql.com's avatar
joreland@mysql.com committed
550 551 552 553 554
    break;
  }
    
  theNdbCon->theTransactionIsStarted = false;
  theNdbCon->theReleaseOnClose = true;
555
  if(DEBUG_NEXT_RESULT) ndbout_c("return -1", retVal);
joreland@mysql.com's avatar
joreland@mysql.com committed
556
  return -1;
557 558
}

joreland@mysql.com's avatar
joreland@mysql.com committed
559
int
560 561
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
				 bool forceSend){  
tomas@poseidon.ndb.mysql.com's avatar
Merge  
tomas@poseidon.ndb.mysql.com committed
562
  if(cnt > 0){
joreland@mysql.com's avatar
joreland@mysql.com committed
563 564 565 566 567 568 569 570 571 572 573 574 575 576 577
    NdbApiSignal tSignal(theNdb->theMyRef);
    tSignal.setSignal(GSN_SCAN_NEXTREQ);
    
    Uint32* theData = tSignal.getDataPtrSend();
    theData[0] = theNdbCon->theTCConPtr;
    theData[1] = stopScanFlag == true ? 1 : 0;
    Uint64 transId = theNdbCon->theTransactionId;
    theData[2] = transId;
    theData[3] = (Uint32) (transId >> 32);
    
    /**
     * Prepare ops
     */
    Uint32 last = m_sent_receivers_count;
    Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4);
578
    Uint32 sent = 0;
joreland@mysql.com's avatar
joreland@mysql.com committed
579 580
    for(Uint32 i = 0; i<cnt; i++){
      NdbReceiver * tRec = m_api_receivers[i];
581 582 583 584 585 586 587
      if((prep_array[sent] = tRec->m_tcPtrI) != RNIL)
      {
	m_sent_receivers[last+sent] = tRec;
	tRec->m_list_index = last+sent;
	tRec->prepareSend();
	sent++;
      }
joreland@mysql.com's avatar
joreland@mysql.com committed
588
    }
589 590
    memmove(m_api_receivers, m_api_receivers+cnt, 
	    (theParallelism-cnt) * sizeof(char*));
joreland@mysql.com's avatar
joreland@mysql.com committed
591
    
592 593 594 595 596
    int ret = 0;
    if(sent)
    {
      Uint32 nodeId = theNdbCon->theDBnode;
      TransporterFacade * tp = TransporterFacade::instance();
597
      if(cnt > 21){
598 599 600 601 602 603
	tSignal.setLength(4);
	LinearSectionPtr ptr[3];
	ptr[0].p = prep_array;
	ptr[0].sz = sent;
	ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
      } else {
604
	tSignal.setLength(4+sent);
605 606
	ret = tp->sendSignal(&tSignal, nodeId);
      }
joreland@mysql.com's avatar
joreland@mysql.com committed
607
    }
608
    
609 610
    if (!ret) checkForceSend(forceSend);

611
    m_sent_receivers_count = last + sent;
joreland@mysql.com's avatar
joreland@mysql.com committed
612 613
    m_api_receivers_count -= cnt;
    m_current_api_receiver = 0;
614
    
joreland@mysql.com's avatar
joreland@mysql.com committed
615
    return ret;
616
  }
joreland@mysql.com's avatar
joreland@mysql.com committed
617
  return 0;
618 619
}

620 621 622 623 624 625 626 627 628
void NdbScanOperation::checkForceSend(bool forceSend)
{
  if (forceSend) {
    TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber);
  } else {
    TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber);
  }//if
}

629 630 631 632
int 
NdbScanOperation::prepareSend(Uint32  TC_ConnectPtr, Uint64  TransactionId)
{
  printf("NdbScanOperation::prepareSend\n");
joreland@mysql.com's avatar
joreland@mysql.com committed
633
  abort();
634 635 636 637 638 639 640 641 642 643
  return 0;
}

int 
NdbScanOperation::doSend(int ProcessorId)
{
  printf("NdbScanOperation::doSend\n");
  return 0;
}

644
void NdbScanOperation::close(bool forceSend, bool releaseOp)
645
{
646 647 648 649 650 651
  DBUG_ENTER("NdbScanOperation::close");
  DBUG_PRINT("enter", ("this=%x tcon=%x con=%x force=%d release=%d",
                       (UintPtr)this,
                       (UintPtr)m_transConnection, (UintPtr)theNdbCon,
                       forceSend, releaseOp));

652
  if(m_transConnection){
653
    if(DEBUG_NEXT_RESULT)
654
      ndbout_c("close() theError.code = %d "
655 656 657 658 659 660 661 662
	       "m_api_receivers_count = %d "
	       "m_conf_receivers_count = %d "
	       "m_sent_receivers_count = %d",
	       theError.code, 
	       m_api_receivers_count,
	       m_conf_receivers_count,
	       m_sent_receivers_count);
    
joreland@mysql.com's avatar
joreland@mysql.com committed
663 664
    TransporterFacade* tp = TransporterFacade::instance();
    Guard guard(tp->theMutexPtr);
665
    close_impl(tp, forceSend);
joreland@mysql.com's avatar
joreland@mysql.com committed
666
    
667 668 669 670 671
  }

  NdbConnection* tCon = theNdbCon;
  NdbConnection* tTransCon = m_transConnection;
  theNdbCon = NULL;
joreland@mysql.com's avatar
joreland@mysql.com committed
672
  m_transConnection = NULL;
673 674 675 676 677 678 679 680

  if (releaseOp && tTransCon) {
    NdbIndexScanOperation* tOp = (NdbIndexScanOperation*)this;
    tTransCon->releaseExecutedScanOperation(tOp);
  }
  
  tCon->theScanningOp = 0;
  theNdb->closeTransaction(tCon);
681
  theNdb->theRemainingStartTransactions--;
682
  DBUG_VOID_RETURN;
joreland@mysql.com's avatar
joreland@mysql.com committed
683
}
684

joreland@mysql.com's avatar
joreland@mysql.com committed
685
void
686
NdbScanOperation::execCLOSE_SCAN_REP(){
joreland@mysql.com's avatar
joreland@mysql.com committed
687 688
  m_conf_receivers_count = 0;
  m_sent_receivers_count = 0;
689 690
}

joreland@mysql.com's avatar
joreland@mysql.com committed
691
void NdbScanOperation::release()
692
{
joreland@mysql.com's avatar
joreland@mysql.com committed
693
  if(theNdbCon != 0 || m_transConnection != 0){
694
    close();
joreland@mysql.com's avatar
joreland@mysql.com committed
695 696 697
  }
  for(Uint32 i = 0; i<m_allocated_receivers; i++){
    m_receivers[i]->release();
698
  }
699 700 701

  NdbOperation::release();
  
joreland@mysql.com's avatar
joreland@mysql.com committed
702 703 704 705 706
  if(theSCAN_TABREQ)
  {
    theNdb->releaseSignal(theSCAN_TABREQ);
    theSCAN_TABREQ = 0;
  }
707 708
}

joreland@mysql.com's avatar
joreland@mysql.com committed
709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728
/***************************************************************************
int prepareSendScan(Uint32 aTC_ConnectPtr,
                    Uint64 aTransactionId)

Return Value:   Return 0 : preparation of send was succesful.
                Return -1: In all other case.   
Parameters:     aTC_ConnectPtr: the Connect pointer to TC.
		aTransactionId:	the Transaction identity of the transaction.
Remark:         Puts the the final data into ATTRINFO signal(s)  after this 
                we know the how many signal to send and their sizes
***************************************************************************/
int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
				      Uint64 aTransactionId){

  if (theInterpretIndicator != 1 ||
      (theOperationType != OpenScanRequest &&
       theOperationType != OpenRangeScanRequest)) {
    setErrorCodeAbort(4005);
    return -1;
  }
729

joreland@mysql.com's avatar
joreland@mysql.com committed
730
  theErrorLine = 0;
731

joreland@mysql.com's avatar
joreland@mysql.com committed
732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748
  // In preapareSendInterpreted we set the sizes (word 4-8) in the
  // first ATTRINFO signal.
  if (prepareSendInterpreted() == -1)
    return -1;
  
  if(m_ordered){
    ((NdbIndexScanOperation*)this)->fix_get_values();
  }
  
  theCurrentATTRINFO->setLength(theAI_LenInCurrAI);

  /**
   * Prepare all receivers
   */
  theReceiver.prepareSend();
  bool keyInfo = m_keyInfo;
  Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0;
mronstrom@mysql.com's avatar
mronstrom@mysql.com committed
749 750 751 752 753 754 755 756 757 758 759 760 761 762 763
  /**
   * The number of records sent by each LQH is calculated and the kernel
   * is informed of this number by updating the SCAN_TABREQ signal
   */
  Uint32 batch_size, batch_byte_size, first_batch_size;
  theReceiver.calculate_batch_size(key_size,
                                   theParallelism,
                                   batch_size,
                                   batch_byte_size,
                                   first_batch_size);
  ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
  ScanTabReq::setScanBatch(req->requestInfo, batch_size);
  req->batch_byte_size= batch_byte_size;
  req->first_batch_size= first_batch_size;

joreland@mysql.com's avatar
joreland@mysql.com committed
764 765 766 767 768 769 770 771
  /**
   * Set keyinfo flag
   *  (Always keyinfo when using blobs)
   */
  Uint32 reqInfo = req->requestInfo;
  ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo);
  req->requestInfo = reqInfo;
  
joreland@mysql.com's avatar
joreland@mysql.com committed
772
  for(Uint32 i = 0; i<theParallelism; i++){
773 774 775
    m_receivers[i]->do_get_value(&theReceiver, batch_size, 
				 key_size, 
				 m_read_range_no);
776
  }
joreland@mysql.com's avatar
joreland@mysql.com committed
777
  return 0;
778 779
}

joreland@mysql.com's avatar
joreland@mysql.com committed
780
/*****************************************************************************
joreland@mysql.com's avatar
joreland@mysql.com committed
781 782 783 784 785 786
int doSend()

Return Value:   Return >0 : send was succesful, returns number of signals sent
                Return -1: In all other case.   
Parameters:     aProcessorId: Receiving processor node
Remark:         Sends the ATTRINFO signal(s)
joreland@mysql.com's avatar
joreland@mysql.com committed
787
*****************************************************************************/
joreland@mysql.com's avatar
joreland@mysql.com committed
788 789
int
NdbScanOperation::doSendScan(int aProcessorId)
790
{
joreland@mysql.com's avatar
joreland@mysql.com committed
791 792 793 794 795 796 797 798 799 800 801 802
  Uint32 tSignalCount = 0;
  NdbApiSignal* tSignal;
 
  if (theInterpretIndicator != 1 ||
      (theOperationType != OpenScanRequest &&
       theOperationType != OpenRangeScanRequest)) {
      setErrorCodeAbort(4005);
      return -1;
  }
  
  assert(theSCAN_TABREQ != NULL);
  tSignal = theSCAN_TABREQ;
803 804 805 806 807 808
  
  Uint32 tupKeyLen = theTupKeyLen;
  Uint32 len = theTotalNrOfKeyWordInSignal;
  Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr;
  Uint64 transId = theNdbCon->theTransactionId;
  
joreland@mysql.com's avatar
joreland@mysql.com committed
809 810 811 812
  // Update the "attribute info length in words" in SCAN_TABREQ before 
  // sending it. This could not be done in openScan because 
  // we created the ATTRINFO signals after the SCAN_TABREQ signal.
  ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend());
813
  req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len;
814 815 816
  Uint32 tmp = req->requestInfo;
  ScanTabReq::setDistributionKeyFlag(tmp, theDistrKeyIndicator_);
  req->distributionKey = theDistributionKey;
817
  req->requestInfo = tmp;
818
  tSignal->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_);
819

joreland@mysql.com's avatar
joreland@mysql.com committed
820
  TransporterFacade *tp = TransporterFacade::instance();
mronstrom@mysql.com's avatar
mronstrom@mysql.com committed
821 822 823
  LinearSectionPtr ptr[3];
  ptr[0].p = m_prepared_receivers;
  ptr[0].sz = theParallelism;
824
  if (tp->sendSignal(tSignal, aProcessorId, ptr, 1) == -1) {
mronstrom@mysql.com's avatar
mronstrom@mysql.com committed
825 826 827
    setErrorCode(4002);
    return -1;
  } 
828 829

  if (tupKeyLen > 0){
joreland@mysql.com's avatar
joreland@mysql.com committed
830
    // must have at least one signal since it contains attrLen for bounds
831 832 833 834
    assert(theLastKEYINFO != NULL);
    tSignal = theLastKEYINFO;
    tSignal->setLength(KeyInfo::HeaderLength + theTotalNrOfKeyWordInSignal);
    
835 836
    assert(theSCAN_TABREQ->next() != NULL);
    tSignal = theSCAN_TABREQ->next();
837 838 839 840 841 842 843 844
    
    NdbApiSignal* last;
    do {
      KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
      keyInfo->connectPtr = aTC_ConnectPtr;
      keyInfo->transId[0] = Uint32(transId);
      keyInfo->transId[1] = Uint32(transId >> 32);
      
joreland@mysql.com's avatar
joreland@mysql.com committed
845
      if (tp->sendSignal(tSignal,aProcessorId) == -1){
846 847
	setErrorCode(4002);
	return -1;
joreland@mysql.com's avatar
joreland@mysql.com committed
848
      }
849
      
joreland@mysql.com's avatar
joreland@mysql.com committed
850
      tSignalCount++;
851
      last = tSignal;
joreland@mysql.com's avatar
joreland@mysql.com committed
852
      tSignal = tSignal->next();
853
    } while(last != theLastKEYINFO);
854
  }
joreland@mysql.com's avatar
joreland@mysql.com committed
855 856 857
  
  tSignal = theFirstATTRINFO;
  while (tSignal != NULL) {
858 859 860 861 862
    AttrInfo * attrInfo = CAST_PTR(AttrInfo, tSignal->getDataPtrSend());
    attrInfo->connectPtr = aTC_ConnectPtr;
    attrInfo->transId[0] = Uint32(transId);
    attrInfo->transId[1] = Uint32(transId >> 32);
    
joreland@mysql.com's avatar
joreland@mysql.com committed
863 864 865 866 867 868 869 870
    if (tp->sendSignal(tSignal,aProcessorId) == -1){
      setErrorCode(4002);
      return -1;
    }
    tSignalCount++;
    tSignal = tSignal->next();
  }    
  theStatus = WaitResponse;  
joreland@mysql.com's avatar
joreland@mysql.com committed
871

872
  m_curr_row = 0;
joreland@mysql.com's avatar
joreland@mysql.com committed
873 874 875 876 877 878 879
  m_sent_receivers_count = theParallelism;
  if(m_ordered)
  {
    m_current_api_receiver = theParallelism;
    m_api_receivers_count = theParallelism;
  }
  
joreland@mysql.com's avatar
joreland@mysql.com committed
880 881 882
  return tSignalCount;
}//NdbOperation::doSendScan()

joreland@mysql.com's avatar
joreland@mysql.com committed
883
/*****************************************************************************
884
 * NdbOperation* takeOverScanOp(NdbTransaction* updateTrans);
joreland@mysql.com's avatar
joreland@mysql.com committed
885
 *
886
 * Parameters:     The update transactions NdbTransaction pointer.
joreland@mysql.com's avatar
joreland@mysql.com committed
887 888 889 890 891 892 893 894 895
 * Return Value:   A reference to the transferred operation object 
 *                   or NULL if no success.
 * Remark:         Take over the scanning transactions NdbOperation 
 *                 object for a tuple to an update transaction, 
 *                 which is the last operation read in nextScanResult()
 *		   (theNdbCon->thePreviousScanRec)
 *
 *     FUTURE IMPLEMENTATION:   (This note was moved from header file.)
 *     In the future, it will even be possible to transfer 
896 897
 *     to a NdbTransaction on another Ndb-object.  
 *     In this case the receiving NdbTransaction-object must call 
joreland@mysql.com's avatar
joreland@mysql.com committed
898 899 900 901
 *     a method receiveOpFromScan to actually receive the information.  
 *     This means that the updating transactions can be placed
 *     in separate threads and thus increasing the parallelism during
 *     the scan process. 
joreland@mysql.com's avatar
joreland@mysql.com committed
902
 ****************************************************************************/
joreland@mysql.com's avatar
joreland@mysql.com committed
903 904 905
int
NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size)
{
906 907 908
  NdbRecAttr * tRecAttr = m_curr_row;
  if(tRecAttr)
  {
joreland@mysql.com's avatar
joreland@mysql.com committed
909 910 911 912 913 914 915
    const Uint32 * src = (Uint32*)tRecAttr->aRef();
    memcpy(data, src, 4*size);
    return 0;
  }
  return -1;
}

joreland@mysql.com's avatar
joreland@mysql.com committed
916
NdbOperation*
917 918
NdbScanOperation::takeOverScanOp(OperationType opType, NdbTransaction* pTrans)
{
joreland@mysql.com's avatar
joreland@mysql.com committed
919
  
920 921 922
  NdbRecAttr * tRecAttr = m_curr_row;
  if(tRecAttr)
  {
joreland@mysql.com's avatar
joreland@mysql.com committed
923 924 925 926
    NdbOperation * newOp = pTrans->getNdbOperation(m_currentTable);
    if (newOp == NULL){
      return NULL;
    }
927
    pTrans->theSimpleState = 0;
joreland@mysql.com's avatar
joreland@mysql.com committed
928 929 930 931 932 933 934 935 936 937 938 939
    
    const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1;

    newOp->theTupKeyLen = len;
    newOp->theOperationType = opType;
    if (opType == DeleteRequest) {
      newOp->theStatus = GetValue;  
    } else {
      newOp->theStatus = SetValue;  
    }
    
    const Uint32 * src = (Uint32*)tRecAttr->aRef();
mronstrom@mysql.com's avatar
mronstrom@mysql.com committed
940
    const Uint32 tScanInfo = src[len] & 0x3FFFF;
joreland@mysql.com's avatar
joreland@mysql.com committed
941
    const Uint32 tTakeOverFragment = src[len] >> 20;
joreland@mysql.com's avatar
joreland@mysql.com committed
942 943 944
    {
      UintR scanInfo = 0;
      TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
joreland@mysql.com's avatar
joreland@mysql.com committed
945
      TcKeyReq::setTakeOverScanFragment(scanInfo, tTakeOverFragment);
joreland@mysql.com's avatar
joreland@mysql.com committed
946 947
      TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo);
      newOp->theScanInfo = scanInfo;
948 949
      newOp->theDistrKeyIndicator_ = 1;
      newOp->theDistributionKey = tTakeOverFragment;
joreland@mysql.com's avatar
joreland@mysql.com committed
950
    }
951

joreland@mysql.com's avatar
joreland@mysql.com committed
952 953 954 955 956 957 958 959 960
    // Copy the first 8 words of key info from KEYINF20 into TCKEYREQ
    TcKeyReq * tcKeyReq = CAST_PTR(TcKeyReq,newOp->theTCREQ->getDataPtrSend());
    Uint32 i = 0;
    for (i = 0; i < TcKeyReq::MaxKeyInfo && i < len; i++) {
      tcKeyReq->keyInfo[i] = * src++;
    }
    
    if(i < len){
      NdbApiSignal* tSignal = theNdb->getSignal();
961
      newOp->theTCREQ->next(tSignal); 
joreland@mysql.com's avatar
joreland@mysql.com committed
962 963 964 965 966 967 968 969 970 971 972 973
      
      Uint32 left = len - i;
      while(tSignal && left > KeyInfo::DataLength){
	tSignal->setSignal(GSN_KEYINFO);
	KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
	memcpy(keyInfo->keyData, src, 4 * KeyInfo::DataLength);
	src += KeyInfo::DataLength;
	left -= KeyInfo::DataLength;

	tSignal->next(theNdb->getSignal());
	tSignal = tSignal->next();
      }
974

joreland@mysql.com's avatar
joreland@mysql.com committed
975 976 977 978
      if(tSignal && left > 0){
	tSignal->setSignal(GSN_KEYINFO);
	KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
	memcpy(keyInfo->keyData, src, 4 * left);
joreland@mysql.com's avatar
joreland@mysql.com committed
979 980 981 982 983 984 985 986 987 988 989
      }      
    }
    // create blob handles automatically
    if (opType == DeleteRequest && m_currentTable->m_noOfBlobs != 0) {
      for (unsigned i = 0; i < m_currentTable->m_columns.size(); i++) {
	NdbColumnImpl* c = m_currentTable->m_columns[i];
	assert(c != 0);
	if (c->getBlobType()) {
	  if (newOp->getBlobHandle(pTrans, c) == NULL)
	    return NULL;
	}
joreland@mysql.com's avatar
joreland@mysql.com committed
990 991
      }
    }
joreland@mysql.com's avatar
joreland@mysql.com committed
992
    
joreland@mysql.com's avatar
joreland@mysql.com committed
993
    return newOp;
994
  }
joreland@mysql.com's avatar
joreland@mysql.com committed
995
  return 0;
996 997
}

joreland@mysql.com's avatar
joreland@mysql.com committed
998 999 1000
NdbBlob*
NdbScanOperation::getBlobHandle(const char* anAttrName)
{
joreland@mysql.com's avatar
joreland@mysql.com committed
1001
  m_keyInfo = 1;
joreland@mysql.com's avatar
joreland@mysql.com committed
1002 1003 1004 1005 1006 1007 1008
  return NdbOperation::getBlobHandle(m_transConnection, 
				     m_currentTable->getColumn(anAttrName));
}

NdbBlob*
NdbScanOperation::getBlobHandle(Uint32 anAttrId)
{
joreland@mysql.com's avatar
joreland@mysql.com committed
1009
  m_keyInfo = 1;
joreland@mysql.com's avatar
joreland@mysql.com committed
1010 1011 1012 1013
  return NdbOperation::getBlobHandle(m_transConnection, 
				     m_currentTable->getColumn(anAttrId));
}

joreland@mysql.com's avatar
joreland@mysql.com committed
1014
NdbIndexScanOperation::NdbIndexScanOperation(Ndb* aNdb)
1015
  : NdbScanOperation(aNdb, NdbOperation::OrderedIndexScan)
1016
{
joreland@mysql.com's avatar
joreland@mysql.com committed
1017
}
1018

joreland@mysql.com's avatar
joreland@mysql.com committed
1019
NdbIndexScanOperation::~NdbIndexScanOperation(){
1020 1021
}

joreland@mysql.com's avatar
joreland@mysql.com committed
1022
int
1023 1024
NdbIndexScanOperation::setBound(const char* anAttrName, int type, 
				const void* aValue, Uint32 len)
1025
{
joreland@mysql.com's avatar
joreland@mysql.com committed
1026
  return setBound(m_accessTable->getColumn(anAttrName), type, aValue, len);
1027 1028
}

joreland@mysql.com's avatar
joreland@mysql.com committed
1029
int
1030 1031
NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, 
				const void* aValue, Uint32 len)
1032
{
joreland@mysql.com's avatar
joreland@mysql.com committed
1033 1034
  return setBound(m_accessTable->getColumn(anAttrId), type, aValue, len);
}
1035

joreland@mysql.com's avatar
joreland@mysql.com committed
1036 1037 1038 1039 1040
int
NdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject, 
				  const char* aValue, 
				  Uint32 len){
  return setBound(anAttrObject, BoundEQ, aValue, len);
1041 1042
}

joreland@mysql.com's avatar
joreland@mysql.com committed
1043 1044 1045
NdbRecAttr*
NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo, 
				     char* aValue){
1046
  if(!m_ordered){
joreland@mysql.com's avatar
joreland@mysql.com committed
1047 1048
    return NdbScanOperation::getValue_impl(attrInfo, aValue);
  }
1049 1050 1051 1052 1053 1054 1055 1056 1057
  
  int id = attrInfo->m_attrId;                // In "real" table
  assert(m_accessTable->m_index);
  int sz = (int)m_accessTable->m_index->m_key_ids.size();
  if(id >= sz || (id = m_accessTable->m_index->m_key_ids[id]) == -1){
    return NdbScanOperation::getValue_impl(attrInfo, aValue);
  }
  
  assert(id < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
joreland@mysql.com's avatar
joreland@mysql.com committed
1058
  Uint32 marker = theTupleKeyDefined[id][0];
1059
  
joreland@mysql.com's avatar
joreland@mysql.com committed
1060 1061 1062 1063
  if(marker == SETBOUND_EQ){
    return NdbScanOperation::getValue_impl(attrInfo, aValue);
  } else if(marker == API_PTR){
    return NdbScanOperation::getValue_impl(attrInfo, aValue);
1064
  }
joreland@mysql.com's avatar
joreland@mysql.com committed
1065
  
1066 1067
  assert(marker == FAKE_PTR);
  
joreland@mysql.com's avatar
joreland@mysql.com committed
1068 1069 1070 1071 1072 1073 1074 1075 1076
  UintPtr oldVal;
  oldVal = theTupleKeyDefined[id][1];
#if (SIZEOF_CHARP == 8)
  oldVal = oldVal | (((UintPtr)theTupleKeyDefined[id][2]) << 32);
#endif
  theTupleKeyDefined[id][0] = API_PTR;

  NdbRecAttr* tmp = (NdbRecAttr*)oldVal;
  tmp->setup(attrInfo, aValue);
1077

joreland@mysql.com's avatar
joreland@mysql.com committed
1078
  return tmp;
1079 1080
}

joreland@mysql.com's avatar
joreland@mysql.com committed
1081 1082 1083 1084 1085 1086 1087
#include <AttributeHeader.hpp>
/*
 * Define bound on index column in range scan.
 */
int
NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, 
				int type, const void* aValue, Uint32 len)
1088
{
1089 1090 1091 1092 1093
  if (!tAttrInfo)
  {
    setErrorCodeAbort(4318);    // Invalid attribute
    return -1;
  }
joreland@mysql.com's avatar
joreland@mysql.com committed
1094 1095 1096
  if (theOperationType == OpenRangeScanRequest &&
      (0 <= type && type <= 4) &&
      len <= 8000) {
1097
    // insert bound type
1098 1099
    Uint32 currLen = theTotalNrOfKeyWordInSignal;
    Uint32 remaining = KeyInfo::DataLength - currLen;
joreland@mysql.com's avatar
joreland@mysql.com committed
1100
    Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
1101
    bool tDistrKey = tAttrInfo->m_distributionKey;
1102

1103 1104 1105 1106 1107 1108
    len = aValue != NULL ? sizeInBytes : 0;
    if (len != sizeInBytes && (len != 0)) {
      setErrorCodeAbort(4209);
      return -1;
    }

1109
    // insert attribute header
joreland@mysql.com's avatar
joreland@mysql.com committed
1110 1111 1112
    Uint32 tIndexAttrId = tAttrInfo->m_attrId;
    Uint32 sizeInWords = (len + 3) / 4;
    AttributeHeader ah(tIndexAttrId, sizeInWords);
1113 1114
    const Uint32 ahValue = ah.m_value;

1115
    const Uint32 align = (UintPtr(aValue) & 7);
1116 1117 1118
    const bool aligned = (tDistrKey && type == BoundEQ) ? 
      (align == 0) : (align & 3) == 0;

1119 1120 1121
    const bool nobytes = (len & 0x3) == 0;
    const Uint32 totalLen = 2 + sizeInWords;
    Uint32 tupKeyLen = theTupKeyLen;
1122
    if(remaining > totalLen && aligned && nobytes){
1123 1124 1125 1126 1127 1128 1129
      Uint32 * dst = theKEYINFOptr + currLen;
      * dst ++ = type;
      * dst ++ = ahValue;
      memcpy(dst, aValue, 4 * sizeInWords);
      theTotalNrOfKeyWordInSignal = currLen + totalLen;
    } else {
      if(!aligned || !nobytes){
1130
        Uint32 tempData[2000];
1131 1132
	tempData[0] = type;
	tempData[1] = ahValue;
1133
	tempData[2 + (len >> 2)] = 0;
1134
        memcpy(tempData+2, aValue, len);
1135
	
1136 1137 1138 1139 1140
	insertBOUNDS(tempData, 2+sizeInWords);
      } else {
	Uint32 buf[2] = { type, ahValue };
	insertBOUNDS(buf, 2);
	insertBOUNDS((Uint32*)aValue, sizeInWords);
1141
      }
joreland@mysql.com's avatar
joreland@mysql.com committed
1142
    }
1143
    theTupKeyLen = tupKeyLen + totalLen;
1144

joreland@mysql.com's avatar
joreland@mysql.com committed
1145 1146 1147 1148 1149 1150 1151 1152 1153
    /**
     * Do sorted stuff
     */

    /**
     * The primary keys for an ordered index is defined in the beginning
     * so it's safe to use [tIndexAttrId] 
     * (instead of looping as is NdbOperation::equal_impl)
     */
1154 1155 1156 1157
    if(type == BoundEQ && tDistrKey)
    {
      theNoOfTupKeyLeft--;
      return handle_distribution_key((Uint64*)aValue, sizeInWords);
joreland@mysql.com's avatar
joreland@mysql.com committed
1158 1159 1160 1161 1162
    }
    return 0;
  } else {
    setErrorCodeAbort(4228);    // XXX wrong code
    return -1;
1163 1164 1165
  }
}

1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196
int
NdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){
  Uint32 len;
  Uint32 remaining = KeyInfo::DataLength - theTotalNrOfKeyWordInSignal;
  Uint32 * dst = theKEYINFOptr + theTotalNrOfKeyWordInSignal;
  do {
    len = (sz < remaining ? sz : remaining);
    memcpy(dst, data, 4 * len);
    
    if(sz >= remaining){
      NdbApiSignal* tCurr = theLastKEYINFO;
      tCurr->setLength(KeyInfo::MaxSignalLength);
      NdbApiSignal* tSignal = tCurr->next();
      if(tSignal)
	;
      else if((tSignal = theNdb->getSignal()) != 0)
      {
	tCurr->next(tSignal);
	tSignal->setSignal(GSN_KEYINFO);
      } else {
	goto error;
      }
      theLastKEYINFO = tSignal;
      theKEYINFOptr = dst = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
      remaining = KeyInfo::DataLength;
      sz -= len;
      data += len;
    } else {
      len = (KeyInfo::DataLength - remaining) + len;
      break;
    }
1197
  } while(true);   
1198 1199 1200 1201 1202 1203 1204 1205
  theTotalNrOfKeyWordInSignal = len;
  return 0;

error:
  setErrorCodeAbort(4228);    // XXX wrong code
  return -1;
}

1206
int
joreland@mysql.com's avatar
joreland@mysql.com committed
1207
NdbIndexScanOperation::readTuples(LockMode lm,
1208 1209 1210 1211 1212 1213 1214 1215
				  Uint32 scan_flags,
				  Uint32 parallel)
{
  const bool order_by = scan_flags & SF_OrderBy;
  const bool order_desc = scan_flags & SF_Descending;
  const bool read_range_no = scan_flags & SF_ReadRangeNo;

  int res = NdbScanOperation::readTuples(lm, scan_flags, 0);
1216
  if(!res && read_range_no)
1217 1218 1219 1220 1221
  {
    m_read_range_no = 1;
    Uint32 word = 0;
    AttributeHeader::init(&word, AttributeHeader::RANGE_NO, 0);
    if(insertATTRINFO(word) == -1)
1222
      res = -1;
1223
  }
1224
  if(!res && order_by){
1225 1226 1227 1228 1229 1230
    m_ordered = true;
    if (order_desc) {
      m_descending = true;
      ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
      ScanTabReq::setDescendingFlag(req->requestInfo, true);
    }
1231 1232
    Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
    m_sort_columns = cnt; // -1 for NDB$NODE
joreland@mysql.com's avatar
joreland@mysql.com committed
1233
    m_current_api_receiver = m_sent_receivers_count;
1234
    m_api_receivers_count = m_sent_receivers_count;
1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247
    
    m_sort_columns = cnt;
    for(Uint32 i = 0; i<cnt; i++){
      const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
      const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
      NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
      UintPtr newVal = UintPtr(tmp);
      theTupleKeyDefined[i][0] = FAKE_PTR;
      theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
#if (SIZEOF_CHARP == 8)
      theTupleKeyDefined[i][2] = (newVal >> 32);
#endif
    }
joreland@mysql.com's avatar
joreland@mysql.com committed
1248
  }
1249 1250 1251
  m_this_bound_start = 0;
  m_first_bound_word = theKEYINFOptr;
  
1252
  return res;
joreland@mysql.com's avatar
joreland@mysql.com committed
1253
}
1254

joreland@mysql.com's avatar
joreland@mysql.com committed
1255 1256 1257 1258 1259 1260
void
NdbIndexScanOperation::fix_get_values(){
  /**
   * Loop through all getValues and set buffer pointer to "API" pointer
   */
  NdbRecAttr * curr = theReceiver.theFirstRecAttr;
1261
  Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
joreland@mysql.com's avatar
joreland@mysql.com committed
1262
  assert(cnt <  NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
1263
  
joreland@mysql.com's avatar
joreland@mysql.com committed
1264 1265
  const NdbIndexImpl * idx = m_accessTable->m_index;
  const NdbTableImpl * tab = m_currentTable;
1266 1267 1268
  for(Uint32 i = 0; i<cnt; i++){
    Uint32 val = theTupleKeyDefined[i][0];
    switch(val){
1269 1270
    case FAKE_PTR:
      curr->setup(curr->m_column, 0);
1271
    case API_PTR:
1272 1273
      curr = curr->next();
      break;
1274 1275
    case SETBOUND_EQ:
      break;
joreland@mysql.com's avatar
joreland@mysql.com committed
1276
#ifdef VM_TRACE
1277 1278
    default:
      abort();
joreland@mysql.com's avatar
joreland@mysql.com committed
1279 1280
#endif
    }
1281 1282 1283
  }
}

joreland@mysql.com's avatar
joreland@mysql.com committed
1284 1285 1286 1287 1288 1289 1290
int
NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, 
			       const NdbReceiver* t1, 
			       const NdbReceiver* t2){

  NdbRecAttr * r1 = t1->m_rows[t1->m_current_row];
  NdbRecAttr * r2 = t2->m_rows[t2->m_current_row];
1291

joreland@mysql.com's avatar
joreland@mysql.com committed
1292 1293
  r1 = (skip ? r1->next() : r1);
  r2 = (skip ? r2->next() : r2);
1294 1295
  const int jdir = 1 - 2 * (int)m_descending;
  assert(jdir == 1 || jdir == -1);
joreland@mysql.com's avatar
joreland@mysql.com committed
1296 1297 1298 1299 1300
  while(cols > 0){
    Uint32 * d1 = (Uint32*)r1->aRef();
    Uint32 * d2 = (Uint32*)r2->aRef();
    unsigned r1_null = r1->isNULL();
    if((r1_null ^ (unsigned)r2->isNULL())){
1301
      return (r1_null ? -1 : 1) * jdir;
joreland@mysql.com's avatar
joreland@mysql.com committed
1302
    }
1303
    const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column);
1304
    Uint32 len = r1->theAttrSize * r1->theArraySize;
joreland@mysql.com's avatar
joreland@mysql.com committed
1305
    if(!r1_null){
1306
      const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_type);
1307
      int r = (*sqlType.m_cmp)(col.m_cs, d1, len, d2, len, true);
joreland@mysql.com's avatar
joreland@mysql.com committed
1308 1309
      if(r){
	assert(r != NdbSqlUtil::CmpUnknown);
1310
	return r * jdir;
joreland@mysql.com's avatar
joreland@mysql.com committed
1311 1312 1313 1314 1315
      }
    }
    cols--;
    r1 = r1->next();
    r2 = r2->next();
1316
  }
joreland@mysql.com's avatar
joreland@mysql.com committed
1317
  return 0;
1318 1319
}

joreland@mysql.com's avatar
joreland@mysql.com committed
1320
int
1321 1322
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
					   bool forceSend){
joreland@mysql.com's avatar
joreland@mysql.com committed
1323
  
1324
  m_curr_row = 0;
joreland@mysql.com's avatar
joreland@mysql.com committed
1325
  Uint32 u_idx = 0, u_last = 0;
1326
  Uint32 s_idx   = m_current_api_receiver; // first sorted
joreland@mysql.com's avatar
joreland@mysql.com committed
1327 1328 1329
  Uint32 s_last  = theParallelism;         // last sorted

  NdbReceiver** arr = m_api_receivers;
1330
  NdbReceiver* tRec = arr[s_idx];
joreland@mysql.com's avatar
joreland@mysql.com committed
1331 1332 1333
  
  if(DEBUG_NEXT_RESULT) ndbout_c("nextOrderedResult(%d) nextResult: %d",
				 fetchAllowed, 
1334
				 (s_idx < s_last ? tRec->nextResult() : 0));
joreland@mysql.com's avatar
joreland@mysql.com committed
1335 1336 1337 1338
  
  if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 
				 u_idx, u_last,
				 s_idx, s_last);
1339 1340 1341
  
  bool fetchNeeded = (s_idx == s_last) || !tRec->nextResult();
  
joreland@mysql.com's avatar
joreland@mysql.com committed
1342 1343 1344 1345 1346
  if(fetchNeeded){
    if(fetchAllowed){
      if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
      TransporterFacade* tp = TransporterFacade::instance();
      Guard guard(tp->theMutexPtr);
1347 1348
      if(theError.code)
	return -1;
joreland@mysql.com's avatar
joreland@mysql.com committed
1349 1350
      Uint32 seq = theNdbCon->theNodeSequence;
      Uint32 nodeId = theNdbCon->theDBnode;
1351 1352
      if(seq == tp->getNodeSequence(nodeId) &&
	 !send_next_scan_ordered(s_idx, forceSend)){
joreland@mysql.com's avatar
joreland@mysql.com committed
1353
	Uint32 tmp = m_sent_receivers_count;
1354
	s_idx = m_current_api_receiver; 
1355
	while(m_sent_receivers_count > 0 && !theError.code){
1356 1357
	  theNdb->theImpl->theWaiter.m_node = nodeId;
	  theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
joreland@mysql.com's avatar
joreland@mysql.com committed
1358 1359 1360 1361
	  int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
	  if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
	    continue;
	  }
1362
	  if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
1363 1364 1365 1366 1367 1368 1369
	  setErrorCode(4028);
	  return -1;
	}
	
	if(theError.code){
	  setErrorCode(theError.code);
	  if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
joreland@mysql.com's avatar
joreland@mysql.com committed
1370 1371 1372 1373 1374 1375 1376 1377 1378
	  return -1;
	}
	
	u_idx = 0;
	u_last = m_conf_receivers_count;
	m_conf_receivers_count = 0;
	memcpy(arr, m_conf_receivers, u_last * sizeof(char*));
	
	if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last);
1379 1380 1381
      } else {
	setErrorCode(4028);
	return -1;
joreland@mysql.com's avatar
joreland@mysql.com committed
1382 1383
      }
    } else {
1384
      if(DEBUG_NEXT_RESULT) ndbout_c("return 2");
joreland@mysql.com's avatar
joreland@mysql.com committed
1385 1386
      return 2;
    }
1387 1388 1389 1390
  } else {
    u_idx = s_idx;
    u_last = s_idx + 1;
    s_idx++;
joreland@mysql.com's avatar
joreland@mysql.com committed
1391
  }
1392
  
joreland@mysql.com's avatar
joreland@mysql.com committed
1393 1394 1395 1396 1397
  if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 
				 u_idx, u_last,
				 s_idx, s_last);


1398
  Uint32 cols = m_sort_columns + m_read_range_no;
joreland@mysql.com's avatar
joreland@mysql.com committed
1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420
  Uint32 skip = m_keyInfo;
  while(u_idx < u_last){
    u_last--;
    tRec = arr[u_last];
    
    // Do binary search instead to find place
    Uint32 place = s_idx;
    for(; place < s_last; place++){
      if(compare(skip, cols, tRec, arr[place]) <= 0){
	break;
      }
    }
    
    if(place != s_idx){
      if(DEBUG_NEXT_RESULT) 
	ndbout_c("memmove(%d, %d, %d)", s_idx-1, s_idx, (place - s_idx));
      memmove(arr+s_idx-1, arr+s_idx, sizeof(char*)*(place - s_idx));
    }
    
    if(DEBUG_NEXT_RESULT) ndbout_c("putting %d @ %d", u_last, place - 1);
    m_api_receivers[place-1] = tRec;
    s_idx--;
1421 1422
  }

joreland@mysql.com's avatar
joreland@mysql.com committed
1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434
  if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 
				 u_idx, u_last,
				 s_idx, s_last);
  
  m_current_api_receiver = s_idx;
  
  if(DEBUG_NEXT_RESULT)
    for(Uint32 i = s_idx; i<s_last; i++)
      ndbout_c("%p", arr[i]);
  
  tRec = m_api_receivers[s_idx];    
  if(s_idx < s_last && tRec->nextResult()){
1435
    m_curr_row = tRec->copyout(theReceiver);      
1436
    if(DEBUG_NEXT_RESULT) ndbout_c("return 0");
joreland@mysql.com's avatar
joreland@mysql.com committed
1437
    return 0;
1438 1439
  }

1440 1441 1442
  theError.code = -1;
  if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
  return 1;
1443 1444
}

1445
int
1446
NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){  
joreland@mysql.com's avatar
joreland@mysql.com committed
1447 1448 1449
  if(idx == theParallelism)
    return 0;
  
1450
  NdbReceiver* tRec = m_api_receivers[idx];
joreland@mysql.com's avatar
joreland@mysql.com committed
1451 1452 1453
  NdbApiSignal tSignal(theNdb->theMyRef);
  tSignal.setSignal(GSN_SCAN_NEXTREQ);
  
1454
  Uint32 last = m_sent_receivers_count;
joreland@mysql.com's avatar
joreland@mysql.com committed
1455
  Uint32* theData = tSignal.getDataPtrSend();
1456 1457 1458 1459 1460 1461 1462 1463 1464 1465
  Uint32* prep_array = theData + 4;
  
  m_current_api_receiver = idx + 1;
  if((prep_array[0] = tRec->m_tcPtrI) == RNIL)
  {
    if(DEBUG_NEXT_RESULT)
      ndbout_c("receiver completed, don't send");
    return 0;
  }
  
joreland@mysql.com's avatar
joreland@mysql.com committed
1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478
  theData[0] = theNdbCon->theTCConPtr;
  theData[1] = 0;
  Uint64 transId = theNdbCon->theTransactionId;
  theData[2] = transId;
  theData[3] = (Uint32) (transId >> 32);
  
  /**
   * Prepare ops
   */
  m_sent_receivers[last] = tRec;
  tRec->m_list_index = last;
  tRec->prepareSend();
  m_sent_receivers_count = last + 1;
1479
  
joreland@mysql.com's avatar
joreland@mysql.com committed
1480 1481 1482
  Uint32 nodeId = theNdbCon->theDBnode;
  TransporterFacade * tp = TransporterFacade::instance();
  tSignal.setLength(4+1);
1483 1484 1485
  int ret= tp->sendSignal(&tSignal, nodeId);
  if (!ret) checkForceSend(forceSend);
  return ret;
1486
}
joreland@mysql.com's avatar
joreland@mysql.com committed
1487 1488

int
1489
NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
joreland@mysql.com's avatar
joreland@mysql.com committed
1490 1491 1492
  Uint32 seq = theNdbCon->theNodeSequence;
  Uint32 nodeId = theNdbCon->theDBnode;
  
1493 1494
  if(seq != tp->getNodeSequence(nodeId))
  {
joreland@mysql.com's avatar
joreland@mysql.com committed
1495 1496 1497
    theNdbCon->theReleaseOnClose = true;
    return -1;
  }
1498
  
1499 1500 1501 1502 1503
  /**
   * Wait for outstanding
   */
  while(theError.code == 0 && m_sent_receivers_count)
  {
1504 1505
    theNdb->theImpl->theWaiter.m_node = nodeId;
    theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
joreland@mysql.com's avatar
joreland@mysql.com committed
1506 1507 1508 1509 1510 1511 1512 1513 1514 1515
    int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
    switch(return_code){
    case 0:
      break;
    case -1:
      setErrorCode(4008);
    case -2:
      m_api_receivers_count = 0;
      m_conf_receivers_count = 0;
      m_sent_receivers_count = 0;
1516
      theNdbCon->theReleaseOnClose = true;
joreland@mysql.com's avatar
joreland@mysql.com committed
1517 1518 1519 1520
      return -1;
    }
  }

1521 1522 1523 1524 1525 1526 1527
  if(theError.code)
  {
    m_api_receivers_count = 0;
    m_current_api_receiver = m_ordered ? theParallelism : 0;
  }


1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562
  /**
   * move all conf'ed into api
   *   so that send_next_scan can check if they needs to be closed
   */
  Uint32 api = m_api_receivers_count;
  Uint32 conf = m_conf_receivers_count;

  if(m_ordered)
  {
    /**
     * Ordered scan, keep the m_api_receivers "to the right"
     */
    memmove(m_api_receivers, m_api_receivers+m_current_api_receiver, 
	    (theParallelism - m_current_api_receiver) * sizeof(char*));
    api = (theParallelism - m_current_api_receiver);
    m_api_receivers_count = api;
  }
  
  if(DEBUG_NEXT_RESULT)
    ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d",
	     m_ordered, api, conf, 
	     m_sent_receivers_count, m_current_api_receiver, theParallelism);
  
  if(api+conf)
  {
    /**
     * There's something to close
     *   setup m_api_receivers (for send_next_scan)
     */
    memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*));
    m_api_receivers_count = api + conf;
    m_conf_receivers_count = 0;
  }
  
  // Send close scan
tomas@poseidon.ndb.mysql.com's avatar
Merge  
tomas@poseidon.ndb.mysql.com committed
1563
  if(send_next_scan(api+conf, true, forceSend) == -1)
1564 1565 1566
  {
    theNdbCon->theReleaseOnClose = true;
    return -1;
joreland@mysql.com's avatar
joreland@mysql.com committed
1567 1568 1569 1570 1571
  }
  
  /**
   * wait for close scan conf
   */
1572 1573
  while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
  {
1574 1575
    theNdb->theImpl->theWaiter.m_node = nodeId;
    theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
joreland@mysql.com's avatar
joreland@mysql.com committed
1576 1577 1578 1579 1580 1581 1582 1583 1584 1585
    int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
    switch(return_code){
    case 0:
      break;
    case -1:
      setErrorCode(4008);
    case -2:
      m_api_receivers_count = 0;
      m_conf_receivers_count = 0;
      m_sent_receivers_count = 0;
1586
      theNdbCon->theReleaseOnClose = true;
joreland@mysql.com's avatar
joreland@mysql.com committed
1587 1588 1589
      return -1;
    }
  }
1590
  
1591 1592
  return 0;
}
joreland@mysql.com's avatar
joreland@mysql.com committed
1593

1594 1595
void
NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
joreland@mysql.com's avatar
joreland@mysql.com committed
1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606
  for(Uint32 i = 0; i<parallell; i++){
    m_receivers[i]->m_list_index = i;
    m_prepared_receivers[i] = m_receivers[i]->getId();
    m_sent_receivers[i] = m_receivers[i];
    m_conf_receivers[i] = 0;
    m_api_receivers[i] = 0;
    m_receivers[i]->prepareSend();
  }
  
  m_api_receivers_count = 0;
  m_current_api_receiver = 0;
joreland@mysql.com's avatar
joreland@mysql.com committed
1607
  m_sent_receivers_count = 0;
joreland@mysql.com's avatar
joreland@mysql.com committed
1608
  m_conf_receivers_count = 0;
1609 1610 1611
}

int
1612
NdbScanOperation::restart(bool forceSend)
1613 1614 1615 1616 1617 1618 1619 1620
{
  
  TransporterFacade* tp = TransporterFacade::instance();
  Guard guard(tp->theMutexPtr);
  Uint32 nodeId = theNdbCon->theDBnode;
  
  {
    int res;
1621
    if((res= close_impl(tp, forceSend)))
1622 1623 1624 1625 1626 1627 1628 1629 1630
    {
      return res;
    }
  }
  
  /**
   * Reset receivers
   */
  reset_receivers(theParallelism, m_ordered);
joreland@mysql.com's avatar
joreland@mysql.com committed
1631
  
1632
  theError.code = 0;
joreland@mysql.com's avatar
joreland@mysql.com committed
1633 1634 1635 1636 1637
  if (doSendScan(nodeId) == -1)
    return -1;
  
  return 0;
}
1638 1639

int
1640
NdbIndexScanOperation::reset_bounds(bool forceSend){
1641 1642 1643 1644 1645
  int res;
  
  {
    TransporterFacade* tp = TransporterFacade::instance();
    Guard guard(tp->theMutexPtr);
1646
    res= close_impl(tp, forceSend);
1647 1648 1649 1650
  }

  if(!res)
  {
1651
    theError.code = 0;
1652 1653
    reset_receivers(theParallelism, m_ordered);
    
1654 1655
    theLastKEYINFO = theSCAN_TABREQ->next();
    theKEYINFOptr = ((KeyInfo*)theLastKEYINFO->getDataPtrSend())->keyData;
1656 1657
    theTupKeyLen = 0;
    theTotalNrOfKeyWordInSignal = 0;
1658 1659
    theNoOfTupKeyLeft = m_accessTable->m_noOfDistributionKeys;
    theDistrKeyIndicator_ = 0;
1660 1661
    m_this_bound_start = 0;
    m_first_bound_word = theKEYINFOptr;
1662
    m_transConnection
1663
      ->remove_list((NdbOperation*&)m_transConnection->m_firstExecutedScanOp,
1664
		    this);
1665 1666 1667 1668 1669
    m_transConnection->define_scan_op(this);
    return 0;
  }
  return res;
}
1670 1671

int
1672
NdbIndexScanOperation::end_of_bound(Uint32 no)
1673
{
1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686
  if(no < (1 << 13)) // Only 12-bits no of ranges
  {
    Uint32 bound_head = * m_first_bound_word;
    bound_head |= (theTupKeyLen - m_this_bound_start) << 16 | (no << 4);
    * m_first_bound_word = bound_head;
    
    m_first_bound_word = theKEYINFOptr + theTotalNrOfKeyWordInSignal;;
    m_this_bound_start = theTupKeyLen;
    return 0;
  }
  return -1;
}

1687
int
1688 1689
NdbIndexScanOperation::get_range_no()
{
1690 1691
  NdbRecAttr* tRecAttr = m_curr_row;
  if(m_read_range_no && tRecAttr)
1692
  {
1693 1694 1695 1696
    if(m_keyInfo)
      tRecAttr = tRecAttr->next();
    Uint32 ret = *(Uint32*)tRecAttr->aRef();
    return ret;
1697
  }
1698
  return -1;
1699
}