Commit 7a8336be authored by unknown's avatar unknown

wl2135 - index restart


mysql-test/r/ndb_index_ordered.result:
  test case
mysql-test/t/ndb_index_ordered.test:
  test case
ndb/include/kernel/signaldata/ScanTab.hpp:
  Split exclusive/keyinfo
ndb/include/ndbapi/NdbIndexScanOperation.hpp:
  Add possibility retreive sorted flag
ndb/include/ndbapi/NdbOperation.hpp:
  Add possibility retreive lock mode
ndb/include/ndbapi/NdbResultSet.hpp:
  Add possibility to get operation
ndb/src/common/debugger/signaldata/ScanTab.cpp:
  Split exclusive/keyinfo
ndb/src/kernel/blocks/dbtc/Dbtc.hpp:
  Store flags in ScanFragReq::requestInfo format
ndb/src/kernel/blocks/dbtc/DbtcMain.cpp:
  Store flags in ScanFragReq::requestInfo format
ndb/src/ndbapi/NdbOperationDefine.cpp:
  Keep theLockMode up-to-date
ndb/src/ndbapi/NdbScanOperation.cpp:
  Keep theLockMode up-to-date
sql/ha_ndbcluster.cc:
  Use NdbIndexScanOperation::reset_bounds when
    performing second index access
parent 7cf1c4d5
...@@ -37,6 +37,14 @@ a b c ...@@ -37,6 +37,14 @@ a b c
1 2 3 1 2 3
2 3 5 2 3 5
3 4 6 3 4 6
select tt1.* from t1 as tt1, t1 as tt2 use index(b) where tt1.b = tt2.b order by tt1.c;
a b c
6 7 2
5 6 2
1 2 3
2 3 5
3 4 6
4 5 8
update t1 set c = 3 where b = 3; update t1 set c = 3 where b = 3;
select * from t1 order by a; select * from t1 order by a;
a b c a b c
......
...@@ -23,6 +23,9 @@ select * from t1 where b > 4 order by b; ...@@ -23,6 +23,9 @@ select * from t1 where b > 4 order by b;
select * from t1 where b < 4 order by b; select * from t1 where b < 4 order by b;
select * from t1 where b <= 4 order by b; select * from t1 where b <= 4 order by b;
# Test of reset_bounds
select tt1.* from t1 as tt1, t1 as tt2 use index(b) where tt1.b = tt2.b order by tt1.c;
# #
# Here we should add some "explain select" to verify that the ordered index is # Here we should add some "explain select" to verify that the ordered index is
# used for these queries. # used for these queries.
......
...@@ -74,6 +74,7 @@ private: ...@@ -74,6 +74,7 @@ private:
static Uint8 getHoldLockFlag(const UintR & requestInfo); static Uint8 getHoldLockFlag(const UintR & requestInfo);
static Uint8 getReadCommittedFlag(const UintR & requestInfo); static Uint8 getReadCommittedFlag(const UintR & requestInfo);
static Uint8 getRangeScanFlag(const UintR & requestInfo); static Uint8 getRangeScanFlag(const UintR & requestInfo);
static Uint8 getKeyinfoFlag(const UintR & requestInfo);
static Uint16 getScanBatch(const UintR & requestInfo); static Uint16 getScanBatch(const UintR & requestInfo);
/** /**
...@@ -85,6 +86,7 @@ private: ...@@ -85,6 +86,7 @@ private:
static void setHoldLockFlag(UintR & requestInfo, Uint32 flag); static void setHoldLockFlag(UintR & requestInfo, Uint32 flag);
static void setReadCommittedFlag(UintR & requestInfo, Uint32 flag); static void setReadCommittedFlag(UintR & requestInfo, Uint32 flag);
static void setRangeScanFlag(UintR & requestInfo, Uint32 flag); static void setRangeScanFlag(UintR & requestInfo, Uint32 flag);
static void setKeyinfoFlag(UintR & requestInfo, Uint32 flag);
static void setScanBatch(Uint32& requestInfo, Uint32 sz); static void setScanBatch(Uint32& requestInfo, Uint32 sz);
}; };
...@@ -95,12 +97,13 @@ private: ...@@ -95,12 +97,13 @@ private:
l = Lock mode - 1 Bit 8 l = Lock mode - 1 Bit 8
h = Hold lock mode - 1 Bit 10 h = Hold lock mode - 1 Bit 10
c = Read Committed - 1 Bit 11 c = Read Committed - 1 Bit 11
k = Keyinfo - 1 Bit 12
x = Range Scan (TUX) - 1 Bit 15 x = Range Scan (TUX) - 1 Bit 15
b = Scan batch - 10 Bit 16-25 (max 1023) b = Scan batch - 10 Bit 16-25 (max 1023)
1111111111222222222233 1111111111222222222233
01234567890123456789012345678901 01234567890123456789012345678901
ppppppppl hc xbbbbbbbbbb ppppppppl hck xbbbbbbbbbb
*/ */
#define PARALLELL_SHIFT (0) #define PARALLELL_SHIFT (0)
...@@ -112,6 +115,9 @@ private: ...@@ -112,6 +115,9 @@ private:
#define HOLD_LOCK_SHIFT (10) #define HOLD_LOCK_SHIFT (10)
#define HOLD_LOCK_MASK (1) #define HOLD_LOCK_MASK (1)
#define KEYINFO_SHIFT (12)
#define KEYINFO_MASK (1)
#define READ_COMMITTED_SHIFT (11) #define READ_COMMITTED_SHIFT (11)
#define READ_COMMITTED_MASK (1) #define READ_COMMITTED_MASK (1)
...@@ -206,6 +212,20 @@ ScanTabReq::setScanBatch(Uint32 & requestInfo, Uint32 flag){ ...@@ -206,6 +212,20 @@ ScanTabReq::setScanBatch(Uint32 & requestInfo, Uint32 flag){
requestInfo |= (flag << SCAN_BATCH_SHIFT); requestInfo |= (flag << SCAN_BATCH_SHIFT);
} }
inline
Uint8
ScanTabReq::getKeyinfoFlag(const UintR & requestInfo){
return (Uint8)((requestInfo >> KEYINFO_SHIFT) & KEYINFO_MASK);
}
inline
void
ScanTabReq::setKeyinfoFlag(UintR & requestInfo, Uint32 flag){
ASSERT_BOOL(flag, "ScanTabReq::setKeyinfoFlag");
requestInfo |= (flag << KEYINFO_SHIFT);
}
/** /**
* *
* SENDER: Dbtc * SENDER: Dbtc
......
...@@ -125,6 +125,7 @@ public: ...@@ -125,6 +125,7 @@ public:
*/ */
int reset_bounds(); int reset_bounds();
bool getSorted() const { return m_ordered; }
private: private:
NdbIndexScanOperation(Ndb* aNdb); NdbIndexScanOperation(Ndb* aNdb);
virtual ~NdbIndexScanOperation(); virtual ~NdbIndexScanOperation();
......
...@@ -717,6 +717,8 @@ public: ...@@ -717,6 +717,8 @@ public:
NotDefined ///< Internal for debugging NotDefined ///< Internal for debugging
}; };
LockMode getLockMode() const { return theLockMode; }
protected: protected:
/****************************************************************************** /******************************************************************************
* These are the methods used to create and delete the NdbOperation objects. * These are the methods used to create and delete the NdbOperation objects.
...@@ -893,7 +895,7 @@ protected: ...@@ -893,7 +895,7 @@ protected:
// currently defined // currently defined
OperationType theOperationType; // Read Request, Update Req...... OperationType theOperationType; // Read Request, Update Req......
Uint8 theLockMode; // Can be set to WRITE if read operation LockMode theLockMode; // Can be set to WRITE if read operation
OperationStatus theStatus; // The status of the operation. OperationStatus theStatus; // The status of the operation.
Uint32 theMagicNumber; // Magic number to verify that object Uint32 theMagicNumber; // Magic number to verify that object
// is correct // is correct
......
...@@ -138,7 +138,11 @@ public: ...@@ -138,7 +138,11 @@ public:
*/ */
int deleteTuple(); int deleteTuple();
int deleteTuple(NdbConnection* takeOverTransaction); int deleteTuple(NdbConnection* takeOverTransaction);
/**
* Get underlying operation
*/
NdbOperation* getOperation();
private: private:
NdbResultSet(NdbScanOperation*); NdbResultSet(NdbScanOperation*);
...@@ -149,4 +153,10 @@ private: ...@@ -149,4 +153,10 @@ private:
NdbScanOperation* m_operation; NdbScanOperation* m_operation;
}; };
inline
NdbOperation*
NdbResultSet::getOperation(){
return m_operation;
}
#endif #endif
...@@ -30,12 +30,13 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv ...@@ -30,12 +30,13 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
fprintf(output, " apiConnectPtr: H\'%.8x", fprintf(output, " apiConnectPtr: H\'%.8x",
sig->apiConnectPtr); sig->apiConnectPtr);
fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo); fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo);
fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Holdlock: %u, RangeScan: %u\n", fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Keyinfo: %u Holdlock: %u, RangeScan: %u\n",
sig->getParallelism(requestInfo), sig->getParallelism(requestInfo),
sig->getScanBatch(requestInfo), sig->getScanBatch(requestInfo),
sig->getLockMode(requestInfo), sig->getLockMode(requestInfo),
sig->getHoldLockFlag(requestInfo), sig->getHoldLockFlag(requestInfo),
sig->getRangeScanFlag(requestInfo)); sig->getRangeScanFlag(requestInfo),
sig->getKeyinfoFlag(requestInfo));
Uint32 keyLen = (sig->attrLenKeyLen >> 16); Uint32 keyLen = (sig->attrLenKeyLen >> 16);
Uint32 attrLen = (sig->attrLenKeyLen & 0xFFFF); Uint32 attrLen = (sig->attrLenKeyLen & 0xFFFF);
......
...@@ -1197,18 +1197,7 @@ public: ...@@ -1197,18 +1197,7 @@ public:
Uint16 first_batch_size; Uint16 first_batch_size;
Uint32 batch_byte_size; Uint32 batch_byte_size;
// Shall the locks be held until the application have read the Uint32 scanRequestInfo; // ScanFrag format
// records
Uint8 scanLockHold;
// Shall the locks be read or write locks
Uint8 scanLockMode;
// Skip locks by other transactions and read latest committed
Uint8 readCommitted;
// Scan is on ordered index
Uint8 rangeScan;
// Close is ordered // Close is ordered
bool m_close_scan_req; bool m_close_scan_req;
......
...@@ -8599,8 +8599,6 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, ...@@ -8599,8 +8599,6 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
UintR scanParallel, UintR scanParallel,
UintR noOprecPerFrag) UintR noOprecPerFrag)
{ {
const UintR reqinfo = scanTabReq->requestInfo;
scanptr.p->scanTcrec = tcConnectptr.i; scanptr.p->scanTcrec = tcConnectptr.i;
scanptr.p->scanApiRec = apiConnectptr.i; scanptr.p->scanApiRec = apiConnectptr.i;
scanptr.p->scanAiLength = scanTabReq->attrLenKeyLen & 0xFFFF; scanptr.p->scanAiLength = scanTabReq->attrLenKeyLen & 0xFFFF;
...@@ -8611,10 +8609,17 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, ...@@ -8611,10 +8609,17 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
scanptr.p->noOprecPerFrag = noOprecPerFrag; scanptr.p->noOprecPerFrag = noOprecPerFrag;
scanptr.p->first_batch_size= scanTabReq->first_batch_size; scanptr.p->first_batch_size= scanTabReq->first_batch_size;
scanptr.p->batch_byte_size= scanTabReq->batch_byte_size; scanptr.p->batch_byte_size= scanTabReq->batch_byte_size;
scanptr.p->scanLockMode = ScanTabReq::getLockMode(reqinfo);
scanptr.p->scanLockHold = ScanTabReq::getHoldLockFlag(reqinfo); Uint32 tmp = 0;
scanptr.p->readCommitted = ScanTabReq::getReadCommittedFlag(reqinfo); const UintR ri = scanTabReq->requestInfo;
scanptr.p->rangeScan = ScanTabReq::getRangeScanFlag(reqinfo); ScanFragReq::setLockMode(tmp, ScanTabReq::getLockMode(ri));
ScanFragReq::setHoldLockFlag(tmp, ScanTabReq::getHoldLockFlag(ri));
ScanFragReq::setKeyinfoFlag(tmp, ScanTabReq::getKeyinfoFlag(ri));
ScanFragReq::setReadCommittedFlag(tmp,ScanTabReq::getReadCommittedFlag(ri));
ScanFragReq::setRangeScanFlag(tmp, ScanTabReq::getRangeScanFlag(ri));
ScanFragReq::setAttrLen(tmp, scanTabReq->attrLenKeyLen & 0xFFFF);
scanptr.p->scanRequestInfo = tmp;
scanptr.p->scanStoredProcId = scanTabReq->storedProcId; scanptr.p->scanStoredProcId = scanTabReq->storedProcId;
scanptr.p->scanState = ScanRecord::RUNNING; scanptr.p->scanState = ScanRecord::RUNNING;
scanptr.p->m_queued_count = 0; scanptr.p->m_queued_count = 0;
...@@ -8631,7 +8636,7 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, ...@@ -8631,7 +8636,7 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
ptr.p->m_apiPtr = cdata[i]; ptr.p->m_apiPtr = cdata[i];
}//for }//for
(* (scanptr.p->rangeScan ? (* (ScanTabReq::getRangeScanFlag(ri) ?
&c_counters.c_range_scan_count : &c_counters.c_range_scan_count :
&c_counters.c_scan_count))++; &c_counters.c_scan_count))++;
}//Dbtc::initScanrec() }//Dbtc::initScanrec()
...@@ -9491,17 +9496,8 @@ void Dbtc::sendScanFragReq(Signal* signal, ...@@ -9491,17 +9496,8 @@ void Dbtc::sendScanFragReq(Signal* signal,
ScanRecord* scanP, ScanRecord* scanP,
ScanFragRec* scanFragP) ScanFragRec* scanFragP)
{ {
Uint32 requestInfo = 0;
ScanFragReq * const req = (ScanFragReq *)&signal->theData[0]; ScanFragReq * const req = (ScanFragReq *)&signal->theData[0];
ScanFragReq::setLockMode(requestInfo, scanP->scanLockMode); Uint32 requestInfo = scanP->scanRequestInfo;
ScanFragReq::setHoldLockFlag(requestInfo, scanP->scanLockHold);
if(scanP->scanLockMode == 1){ // Not read -> keyinfo
jam();
ScanFragReq::setKeyinfoFlag(requestInfo, 1);
}
ScanFragReq::setReadCommittedFlag(requestInfo, scanP->readCommitted);
ScanFragReq::setRangeScanFlag(requestInfo, scanP->rangeScan);
ScanFragReq::setAttrLen(requestInfo, scanP->scanAiLength);
ScanFragReq::setScanPrio(requestInfo, 1); ScanFragReq::setScanPrio(requestInfo, 1);
apiConnectptr.i = scanP->scanApiRec; apiConnectptr.i = scanP->scanApiRec;
req->tableId = scanP->scanTableref; req->tableId = scanP->scanTableref;
...@@ -10470,9 +10466,6 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal) ...@@ -10470,9 +10466,6 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
sp.p->scanSchemaVersion, sp.p->scanSchemaVersion,
sp.p->scanTableref, sp.p->scanTableref,
sp.p->scanStoredProcId); sp.p->scanStoredProcId);
infoEvent(" lhold=%d, lmode=%d",
sp.p->scanLockHold,
sp.p->scanLockMode);
infoEvent(" apiRec=%d, next=%d", infoEvent(" apiRec=%d, next=%d",
sp.p->scanApiRec, sp.p->nextScan); sp.p->scanApiRec, sp.p->nextScan);
......
...@@ -55,6 +55,7 @@ NdbOperation::insertTuple() ...@@ -55,6 +55,7 @@ NdbOperation::insertTuple()
theOperationType = InsertRequest; theOperationType = InsertRequest;
tNdbCon->theSimpleState = 0; tNdbCon->theSimpleState = 0;
theErrorLine = tErrorLine++; theErrorLine = tErrorLine++;
theLockMode = LM_Exclusive;
return 0; return 0;
} else { } else {
setErrorCode(4200); setErrorCode(4200);
...@@ -74,6 +75,7 @@ NdbOperation::updateTuple() ...@@ -74,6 +75,7 @@ NdbOperation::updateTuple()
tNdbCon->theSimpleState = 0; tNdbCon->theSimpleState = 0;
theOperationType = UpdateRequest; theOperationType = UpdateRequest;
theErrorLine = tErrorLine++; theErrorLine = tErrorLine++;
theLockMode = LM_Exclusive;
return 0; return 0;
} else { } else {
setErrorCode(4200); setErrorCode(4200);
...@@ -93,6 +95,7 @@ NdbOperation::writeTuple() ...@@ -93,6 +95,7 @@ NdbOperation::writeTuple()
tNdbCon->theSimpleState = 0; tNdbCon->theSimpleState = 0;
theOperationType = WriteRequest; theOperationType = WriteRequest;
theErrorLine = tErrorLine++; theErrorLine = tErrorLine++;
theLockMode = LM_Exclusive;
return 0; return 0;
} else { } else {
setErrorCode(4200); setErrorCode(4200);
...@@ -115,6 +118,8 @@ NdbOperation::readTuple(NdbOperation::LockMode lm) ...@@ -115,6 +118,8 @@ NdbOperation::readTuple(NdbOperation::LockMode lm)
case LM_CommittedRead: case LM_CommittedRead:
return readTuple(); return readTuple();
break; break;
default:
return -1;
}; };
} }
/****************************************************************************** /******************************************************************************
...@@ -130,6 +135,7 @@ NdbOperation::readTuple() ...@@ -130,6 +135,7 @@ NdbOperation::readTuple()
tNdbCon->theSimpleState = 0; tNdbCon->theSimpleState = 0;
theOperationType = ReadRequest; theOperationType = ReadRequest;
theErrorLine = tErrorLine++; theErrorLine = tErrorLine++;
theLockMode = LM_Read;
return 0; return 0;
} else { } else {
setErrorCode(4200); setErrorCode(4200);
...@@ -150,6 +156,7 @@ NdbOperation::deleteTuple() ...@@ -150,6 +156,7 @@ NdbOperation::deleteTuple()
tNdbCon->theSimpleState = 0; tNdbCon->theSimpleState = 0;
theOperationType = DeleteRequest; theOperationType = DeleteRequest;
theErrorLine = tErrorLine++; theErrorLine = tErrorLine++;
theLockMode = LM_Exclusive;
return 0; return 0;
} else { } else {
setErrorCode(4200); setErrorCode(4200);
...@@ -170,6 +177,7 @@ NdbOperation::readTupleExclusive() ...@@ -170,6 +177,7 @@ NdbOperation::readTupleExclusive()
tNdbCon->theSimpleState = 0; tNdbCon->theSimpleState = 0;
theOperationType = ReadExclusive; theOperationType = ReadExclusive;
theErrorLine = tErrorLine++; theErrorLine = tErrorLine++;
theLockMode = LM_Exclusive;
return 0; return 0;
} else { } else {
setErrorCode(4200); setErrorCode(4200);
...@@ -189,6 +197,7 @@ NdbOperation::simpleRead() ...@@ -189,6 +197,7 @@ NdbOperation::simpleRead()
theOperationType = ReadRequest; theOperationType = ReadRequest;
theSimpleIndicator = 1; theSimpleIndicator = 1;
theErrorLine = tErrorLine++; theErrorLine = tErrorLine++;
theLockMode = LM_CommittedRead;
return 0; return 0;
} else { } else {
setErrorCode(4200); setErrorCode(4200);
...@@ -218,6 +227,7 @@ NdbOperation::committedRead() ...@@ -218,6 +227,7 @@ NdbOperation::committedRead()
theSimpleIndicator = 1; theSimpleIndicator = 1;
theDirtyIndicator = 1; theDirtyIndicator = 1;
theErrorLine = tErrorLine++; theErrorLine = tErrorLine++;
theLockMode = LM_CommittedRead;
return 0; return 0;
} else { } else {
setErrorCode(4200); setErrorCode(4200);
...@@ -240,6 +250,7 @@ NdbOperation::dirtyUpdate() ...@@ -240,6 +250,7 @@ NdbOperation::dirtyUpdate()
theSimpleIndicator = 1; theSimpleIndicator = 1;
theDirtyIndicator = 1; theDirtyIndicator = 1;
theErrorLine = tErrorLine++; theErrorLine = tErrorLine++;
theLockMode = LM_CommittedRead;
return 0; return 0;
} else { } else {
setErrorCode(4200); setErrorCode(4200);
...@@ -262,6 +273,7 @@ NdbOperation::dirtyWrite() ...@@ -262,6 +273,7 @@ NdbOperation::dirtyWrite()
theSimpleIndicator = 1; theSimpleIndicator = 1;
theDirtyIndicator = 1; theDirtyIndicator = 1;
theErrorLine = tErrorLine++; theErrorLine = tErrorLine++;
theLockMode = LM_CommittedRead;
return 0; return 0;
} else { } else {
setErrorCode(4200); setErrorCode(4200);
...@@ -282,7 +294,7 @@ NdbOperation::interpretedUpdateTuple() ...@@ -282,7 +294,7 @@ NdbOperation::interpretedUpdateTuple()
tNdbCon->theSimpleState = 0; tNdbCon->theSimpleState = 0;
theOperationType = UpdateRequest; theOperationType = UpdateRequest;
theAI_LenInCurrAI = 25; theAI_LenInCurrAI = 25;
theLockMode = LM_Exclusive;
theErrorLine = tErrorLine++; theErrorLine = tErrorLine++;
initInterpreter(); initInterpreter();
return 0; return 0;
...@@ -307,7 +319,7 @@ NdbOperation::interpretedDeleteTuple() ...@@ -307,7 +319,7 @@ NdbOperation::interpretedDeleteTuple()
theErrorLine = tErrorLine++; theErrorLine = tErrorLine++;
theAI_LenInCurrAI = 25; theAI_LenInCurrAI = 25;
theLockMode = LM_Exclusive;
initInterpreter(); initInterpreter();
return 0; return 0;
} else { } else {
......
...@@ -144,6 +144,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, ...@@ -144,6 +144,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
} }
theNdbCon->theScanningOp = this; theNdbCon->theScanningOp = this;
theLockMode = lm;
bool lockExcl, lockHoldMode, readCommitted; bool lockExcl, lockHoldMode, readCommitted;
switch(lm){ switch(lm){
...@@ -167,7 +168,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, ...@@ -167,7 +168,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
return 0; return 0;
} }
m_keyInfo = lockExcl; m_keyInfo = lockExcl ? 1 : 0;
bool range = false; bool range = false;
if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex || if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex ||
...@@ -743,20 +744,28 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, ...@@ -743,20 +744,28 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
req->batch_byte_size= batch_byte_size; req->batch_byte_size= batch_byte_size;
req->first_batch_size= first_batch_size; req->first_batch_size= first_batch_size;
/**
* Set keyinfo flag
* (Always keyinfo when using blobs)
*/
Uint32 reqInfo = req->requestInfo;
ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo);
req->requestInfo = reqInfo;
for(Uint32 i = 0; i<theParallelism; i++){ for(Uint32 i = 0; i<theParallelism; i++){
m_receivers[i]->do_get_value(&theReceiver, batch_size, key_size); m_receivers[i]->do_get_value(&theReceiver, batch_size, key_size);
} }
return 0; return 0;
} }
/****************************************************************************** /*****************************************************************************
int doSend() int doSend()
Return Value: Return >0 : send was succesful, returns number of signals sent Return Value: Return >0 : send was succesful, returns number of signals sent
Return -1: In all other case. Return -1: In all other case.
Parameters: aProcessorId: Receiving processor node Parameters: aProcessorId: Receiving processor node
Remark: Sends the ATTRINFO signal(s) Remark: Sends the ATTRINFO signal(s)
******************************************************************************/ *****************************************************************************/
int int
NdbScanOperation::doSendScan(int aProcessorId) NdbScanOperation::doSendScan(int aProcessorId)
{ {
...@@ -842,7 +851,7 @@ NdbScanOperation::doSendScan(int aProcessorId) ...@@ -842,7 +851,7 @@ NdbScanOperation::doSendScan(int aProcessorId)
return tSignalCount; return tSignalCount;
}//NdbOperation::doSendScan() }//NdbOperation::doSendScan()
/****************************************************************************** /*****************************************************************************
* NdbOperation* takeOverScanOp(NdbConnection* updateTrans); * NdbOperation* takeOverScanOp(NdbConnection* updateTrans);
* *
* Parameters: The update transactions NdbConnection pointer. * Parameters: The update transactions NdbConnection pointer.
...@@ -861,7 +870,7 @@ NdbScanOperation::doSendScan(int aProcessorId) ...@@ -861,7 +870,7 @@ NdbScanOperation::doSendScan(int aProcessorId)
* This means that the updating transactions can be placed * This means that the updating transactions can be placed
* in separate threads and thus increasing the parallelism during * in separate threads and thus increasing the parallelism during
* the scan process. * the scan process.
*****************************************************************************/ ****************************************************************************/
int int
NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size) NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size)
{ {
...@@ -970,6 +979,7 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){ ...@@ -970,6 +979,7 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){
NdbBlob* NdbBlob*
NdbScanOperation::getBlobHandle(const char* anAttrName) NdbScanOperation::getBlobHandle(const char* anAttrName)
{ {
m_keyInfo = 1;
return NdbOperation::getBlobHandle(m_transConnection, return NdbOperation::getBlobHandle(m_transConnection,
m_currentTable->getColumn(anAttrName)); m_currentTable->getColumn(anAttrName));
} }
...@@ -977,6 +987,7 @@ NdbScanOperation::getBlobHandle(const char* anAttrName) ...@@ -977,6 +987,7 @@ NdbScanOperation::getBlobHandle(const char* anAttrName)
NdbBlob* NdbBlob*
NdbScanOperation::getBlobHandle(Uint32 anAttrId) NdbScanOperation::getBlobHandle(Uint32 anAttrId)
{ {
m_keyInfo = 1;
return NdbOperation::getBlobHandle(m_transConnection, return NdbOperation::getBlobHandle(m_transConnection,
m_currentTable->getColumn(anAttrId)); m_currentTable->getColumn(anAttrId));
} }
......
...@@ -1341,6 +1341,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ...@@ -1341,6 +1341,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
const key_range *end_key, const key_range *end_key,
bool sorted, byte* buf) bool sorted, byte* buf)
{ {
bool restart;
NdbConnection *trans= m_active_trans; NdbConnection *trans= m_active_trans;
NdbResultSet *cursor; NdbResultSet *cursor;
NdbIndexScanOperation *op; NdbIndexScanOperation *op;
...@@ -1352,16 +1353,28 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ...@@ -1352,16 +1353,28 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_EXECUTE("enter", print_key(start_key, "start_key");); DBUG_EXECUTE("enter", print_key(start_key, "start_key"););
DBUG_EXECUTE("enter", print_key(end_key, "end_key");); DBUG_EXECUTE("enter", print_key(end_key, "end_key"););
if(m_active_cursor == 0)
NdbOperation::LockMode lm= {
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); restart= false;
if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *) NdbOperation::LockMode lm=
m_index[active_index].index, (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
(const NDBTAB *) m_table)) || if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
!(cursor= op->readTuples(lm, 0, parallelism, sorted))) m_index[active_index].index,
ERR_RETURN(trans->getNdbError()); (const NDBTAB *) m_table)) ||
m_active_cursor= cursor; !(cursor= op->readTuples(lm, 0, parallelism, sorted)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
} else {
restart= true;
op= (NdbIndexScanOperation*)m_active_cursor->getOperation();
DBUG_ASSERT(op->getSorted() == sorted);
DBUG_ASSERT(op->getLockMode() ==
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type));
if(op->reset_bounds())
DBUG_RETURN(ndb_err(m_active_trans));
}
if (start_key && if (start_key &&
set_bounds(op, start_key, set_bounds(op, start_key,
(start_key->flag == HA_READ_KEY_EXACT) ? (start_key->flag == HA_READ_KEY_EXACT) ?
...@@ -1383,10 +1396,19 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ...@@ -1383,10 +1396,19 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
NdbIndexScanOperation::BoundGT)) NdbIndexScanOperation::BoundGT))
DBUG_RETURN(1); DBUG_RETURN(1);
} }
DBUG_RETURN(define_read_attrs(buf, op)); if(!restart)
{
DBUG_RETURN(define_read_attrs(buf, op));
}
else
{
if (execute_no_commit(this,trans) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_RETURN(next_result(buf));
}
} }
/* /*
Start a filtered scan in NDB. Start a filtered scan in NDB.
...@@ -2225,9 +2247,6 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key, ...@@ -2225,9 +2247,6 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
DBUG_ENTER("ha_ndbcluster::read_range_first_to_buf"); DBUG_ENTER("ha_ndbcluster::read_range_first_to_buf");
DBUG_PRINT("info", ("eq_range: %d, sorted: %d", eq_range, sorted)); DBUG_PRINT("info", ("eq_range: %d, sorted: %d", eq_range, sorted));
if (m_active_cursor)
close_scan();
switch (get_index_type(active_index)){ switch (get_index_type(active_index)){
case PRIMARY_KEY_ORDERED_INDEX: case PRIMARY_KEY_ORDERED_INDEX:
case PRIMARY_KEY_INDEX: case PRIMARY_KEY_INDEX:
...@@ -2255,11 +2274,8 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key, ...@@ -2255,11 +2274,8 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
break; break;
} }
// Start the ordered index scan and fetch the first row // Start the ordered index scan and fetch the first row
error= ordered_index_scan(start_key, end_key, sorted, error= ordered_index_scan(start_key, end_key, sorted, buf);
buf);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment