Commit 2a8ea386 authored by joreland@mysql.com's avatar joreland@mysql.com

Fixed bugs in ordered scan discovered by mysql-test-run

Enabled ordered scan in handler
parent 04e31c1c
...@@ -125,6 +125,7 @@ private: ...@@ -125,6 +125,7 @@ private:
virtual ~NdbIndexScanOperation(); virtual ~NdbIndexScanOperation();
int setBound(const NdbColumnImpl*, int type, const void* aValue, Uint32 len); int setBound(const NdbColumnImpl*, int type, const void* aValue, Uint32 len);
int saveBoundATTRINFO();
virtual int equal_impl(const NdbColumnImpl*, const char*, Uint32); virtual int equal_impl(const NdbColumnImpl*, const char*, Uint32);
virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char*); virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char*);
......
...@@ -128,7 +128,6 @@ protected: ...@@ -128,7 +128,6 @@ protected:
NdbApiSignal* theSCAN_TABREQ; NdbApiSignal* theSCAN_TABREQ;
int getFirstATTRINFOScan(); int getFirstATTRINFOScan();
int saveBoundATTRINFO();
int doSendScan(int ProcessorId); int doSendScan(int ProcessorId);
int prepareSendScan(Uint32 TC_ConnectPtr, Uint64 TransactionId); int prepareSendScan(Uint32 TC_ConnectPtr, Uint64 TransactionId);
......
...@@ -1737,8 +1737,8 @@ NdbDictionaryImpl::getIndexImpl(const char * externalName, ...@@ -1737,8 +1737,8 @@ NdbDictionaryImpl::getIndexImpl(const char * externalName,
return 0; return 0;
} }
NdbTableImpl* primTab = getTable(tab->m_primaryTable.c_str()); NdbTableImpl* prim = getTable(tab->m_primaryTable.c_str());
if(primTab == 0){ if(prim == 0){
m_error.code = 4243; m_error.code = 4243;
return 0; return 0;
} }
...@@ -1752,7 +1752,7 @@ NdbDictionaryImpl::getIndexImpl(const char * externalName, ...@@ -1752,7 +1752,7 @@ NdbDictionaryImpl::getIndexImpl(const char * externalName,
idx->m_indexId = tab->m_tableId; idx->m_indexId = tab->m_tableId;
idx->m_internalName.assign(internalName); idx->m_internalName.assign(internalName);
idx->m_externalName.assign(externalName); idx->m_externalName.assign(externalName);
idx->m_tableName.assign(primTab->m_externalName); idx->m_tableName.assign(prim->m_externalName);
idx->m_type = tab->m_indexType; idx->m_type = tab->m_indexType;
// skip last attribute (NDB$PK or NDB$TNODE) // skip last attribute (NDB$PK or NDB$TNODE)
for(unsigned i = 0; i+1<tab->m_columns.size(); i++){ for(unsigned i = 0; i+1<tab->m_columns.size(); i++){
...@@ -1760,6 +1760,14 @@ NdbDictionaryImpl::getIndexImpl(const char * externalName, ...@@ -1760,6 +1760,14 @@ NdbDictionaryImpl::getIndexImpl(const char * externalName,
// Copy column definition // Copy column definition
*col = *tab->m_columns[i]; *col = *tab->m_columns[i];
idx->m_columns.push_back(col); idx->m_columns.push_back(col);
/**
* reverse map
*/
int key_id = prim->getColumn(col->getName())->getColumnNo();
int fill = -1;
idx->m_key_ids.fill(key_id, fill);
idx->m_key_ids[key_id] = i;
col->m_keyInfoPos = key_id;
} }
idx->m_table = tab; idx->m_table = tab;
......
...@@ -176,6 +176,7 @@ public: ...@@ -176,6 +176,7 @@ public:
BaseString m_externalName; BaseString m_externalName;
BaseString m_tableName; BaseString m_tableName;
Vector<NdbColumnImpl *> m_columns; Vector<NdbColumnImpl *> m_columns;
Vector<int> m_key_ids;
NdbDictionary::Index::Type m_type; NdbDictionary::Index::Type m_type;
bool m_logging; bool m_logging;
......
...@@ -34,7 +34,7 @@ ...@@ -34,7 +34,7 @@
#include "NdbUtil.hpp" #include "NdbUtil.hpp"
#include "NdbOut.hpp" #include "NdbOut.hpp"
#include "NdbImpl.hpp" #include "NdbImpl.hpp"
#include <NdbScanOperation.hpp> #include <NdbIndexScanOperation.hpp>
#include "NdbBlob.hpp" #include "NdbBlob.hpp"
#include <Interpreter.hpp> #include <Interpreter.hpp>
...@@ -317,7 +317,7 @@ NdbOperation::getValue_impl(const NdbColumnImpl* tAttrInfo, char* aValue) ...@@ -317,7 +317,7 @@ NdbOperation::getValue_impl(const NdbColumnImpl* tAttrInfo, char* aValue)
(!tAttrInfo->m_indexOnly) && (!tAttrInfo->m_indexOnly) &&
(theStatus != Init)){ (theStatus != Init)){
if (theStatus == SetBound) { if (theStatus == SetBound) {
((NdbScanOperation*)this)->saveBoundATTRINFO(); ((NdbIndexScanOperation*)this)->saveBoundATTRINFO();
theStatus = GetValue; theStatus = GetValue;
} }
if (theStatus != GetValue) { if (theStatus != GetValue) {
......
...@@ -33,7 +33,7 @@ Adjust: 991029 UABRONM First version. ...@@ -33,7 +33,7 @@ Adjust: 991029 UABRONM First version.
#include "NdbRecAttr.hpp" #include "NdbRecAttr.hpp"
#include "NdbUtil.hpp" #include "NdbUtil.hpp"
#include "Interpreter.hpp" #include "Interpreter.hpp"
#include <NdbScanOperation.hpp> #include <NdbIndexScanOperation.hpp>
#ifdef VM_TRACE #ifdef VM_TRACE
#include <NdbEnv.h> #include <NdbEnv.h>
...@@ -217,7 +217,7 @@ NdbOperation::initial_interpreterCheck() ...@@ -217,7 +217,7 @@ NdbOperation::initial_interpreterCheck()
{ {
if ((theInterpretIndicator == 1)) { if ((theInterpretIndicator == 1)) {
if (theStatus == SetBound) { if (theStatus == SetBound) {
((NdbScanOperation*)this)->saveBoundATTRINFO(); ((NdbIndexScanOperation*)this)->saveBoundATTRINFO();
theStatus = GetValue; theStatus = GetValue;
} }
if (theStatus == ExecInterpretedValue) { if (theStatus == ExecInterpretedValue) {
......
...@@ -375,7 +375,7 @@ NdbScanOperation::getFirstATTRINFOScan() ...@@ -375,7 +375,7 @@ NdbScanOperation::getFirstATTRINFOScan()
* a separate list. Then continue with normal scan. * a separate list. Then continue with normal scan.
*/ */
int int
NdbScanOperation::saveBoundATTRINFO() NdbIndexScanOperation::saveBoundATTRINFO()
{ {
theCurrentATTRINFO->setLength(theAI_LenInCurrAI); theCurrentATTRINFO->setLength(theAI_LenInCurrAI);
theBoundATTRINFO = theFirstATTRINFO; theBoundATTRINFO = theFirstATTRINFO;
...@@ -395,18 +395,27 @@ NdbScanOperation::saveBoundATTRINFO() ...@@ -395,18 +395,27 @@ NdbScanOperation::saveBoundATTRINFO()
* unless the one's with EqBound * unless the one's with EqBound
*/ */
if(!res && m_ordered){ if(!res && m_ordered){
Uint32 idx = 0;
Uint32 cnt = m_currentTable->getNoOfPrimaryKeys(); /**
while(!theTupleKeyDefined[idx][0] && idx < cnt){ * If setBound EQ
NdbColumnImpl* col = m_currentTable->getColumn(idx); */
Uint32 i = 0;
while(theTupleKeyDefined[i][0] == SETBOUND_EQ)
i++;
Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
m_sort_columns = cnt - i;
for(; i<cnt; i++){
NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1); NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
UintPtr newVal = UintPtr(tmp); UintPtr newVal = UintPtr(tmp);
theTupleKeyDefined[idx][0] = FAKE_PTR; theTupleKeyDefined[i][0] = FAKE_PTR;
theTupleKeyDefined[idx][1] = (newVal & 0xFFFFFFFF); theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
#if (SIZEOF_CHARP == 8) #if (SIZEOF_CHARP == 8)
theTupleKeyDefined[idx][2] = (newVal >> 32); theTupleKeyDefined[i][2] = (newVal >> 32);
#endif #endif
idx++;
} }
} }
return res; return res;
...@@ -753,7 +762,7 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, ...@@ -753,7 +762,7 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
} }
if (theStatus == SetBound) { if (theStatus == SetBound) {
saveBoundATTRINFO(); ((NdbIndexScanOperation*)this)->saveBoundATTRINFO();
theStatus = GetValue; theStatus = GetValue;
} }
...@@ -1049,19 +1058,33 @@ NdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject, ...@@ -1049,19 +1058,33 @@ NdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject,
NdbRecAttr* NdbRecAttr*
NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo, NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo,
char* aValue){ char* aValue){
if(!attrInfo->getPrimaryKey() || !m_ordered){ if(!m_ordered){
return NdbScanOperation::getValue_impl(attrInfo, aValue); return NdbScanOperation::getValue_impl(attrInfo, aValue);
} }
if (theStatus == SetBound) {
saveBoundATTRINFO();
theStatus = GetValue;
}
Uint32 id = attrInfo->m_attrId; int id = attrInfo->m_attrId; // In "real" table
assert(m_accessTable->m_index);
int sz = (int)m_accessTable->m_index->m_key_ids.size();
if(id >= sz || (id = m_accessTable->m_index->m_key_ids[id]) == -1){
return NdbScanOperation::getValue_impl(attrInfo, aValue);
}
assert(id < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
Uint32 marker = theTupleKeyDefined[id][0]; Uint32 marker = theTupleKeyDefined[id][0];
if(marker == SETBOUND_EQ){ if(marker == SETBOUND_EQ){
return NdbScanOperation::getValue_impl(attrInfo, aValue); return NdbScanOperation::getValue_impl(attrInfo, aValue);
} else if(marker == API_PTR){ } else if(marker == API_PTR){
return NdbScanOperation::getValue_impl(attrInfo, aValue); return NdbScanOperation::getValue_impl(attrInfo, aValue);
} }
assert(marker == FAKE_PTR);
UintPtr oldVal; UintPtr oldVal;
oldVal = theTupleKeyDefined[id][1]; oldVal = theTupleKeyDefined[id][1];
#if (SIZEOF_CHARP == 8) #if (SIZEOF_CHARP == 8)
...@@ -1071,6 +1094,7 @@ NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo, ...@@ -1071,6 +1094,7 @@ NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo,
NdbRecAttr* tmp = (NdbRecAttr*)oldVal; NdbRecAttr* tmp = (NdbRecAttr*)oldVal;
tmp->setup(attrInfo, aValue); tmp->setup(attrInfo, aValue);
return tmp; return tmp;
} }
...@@ -1121,10 +1145,9 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, ...@@ -1121,10 +1145,9 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
* so it's safe to use [tIndexAttrId] * so it's safe to use [tIndexAttrId]
* (instead of looping as is NdbOperation::equal_impl) * (instead of looping as is NdbOperation::equal_impl)
*/ */
if(!theTupleKeyDefined[tIndexAttrId][0]){ if(type == BoundEQ && !theTupleKeyDefined[tIndexAttrId][0]){
theNoOfTupKeyDefined++; theNoOfTupKeyDefined++;
theTupleKeyDefined[tIndexAttrId][0] = SETBOUND_EQ; theTupleKeyDefined[tIndexAttrId][0] = SETBOUND_EQ;
m_sort_columns -= m_ordered;
} }
return 0; return 0;
...@@ -1142,8 +1165,9 @@ NdbIndexScanOperation::readTuples(LockMode lm, ...@@ -1142,8 +1165,9 @@ NdbIndexScanOperation::readTuples(LockMode lm,
NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0); NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0);
if(rs && order_by){ if(rs && order_by){
m_ordered = 1; m_ordered = 1;
m_sort_columns = m_accessTable->getNoOfPrimaryKeys(); m_sort_columns = m_accessTable->getNoOfColumns() - 1; // -1 for NDB$NODE
m_current_api_receiver = m_sent_receivers_count; m_current_api_receiver = m_sent_receivers_count;
m_api_receivers_count = m_sent_receivers_count;
} }
return rs; return rs;
} }
...@@ -1154,33 +1178,29 @@ NdbIndexScanOperation::fix_get_values(){ ...@@ -1154,33 +1178,29 @@ NdbIndexScanOperation::fix_get_values(){
* Loop through all getValues and set buffer pointer to "API" pointer * Loop through all getValues and set buffer pointer to "API" pointer
*/ */
NdbRecAttr * curr = theReceiver.theFirstRecAttr; NdbRecAttr * curr = theReceiver.theFirstRecAttr;
Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
Uint32 cnt = m_sort_columns;
assert(cnt < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY); assert(cnt < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
Uint32 idx = 0; NdbIndexImpl * idx = m_accessTable->m_index;
NdbTableImpl * tab = m_currentTable; NdbTableImpl * tab = m_currentTable;
while(cnt > 0){ // To MAXNROFTUPLEKEY loops for(Uint32 i = 0; i<cnt; i++){
NdbColumnImpl * col = tab->getColumn(idx); Uint32 val = theTupleKeyDefined[i][0];
if(col->getPrimaryKey()){ switch(val){
Uint32 val = theTupleKeyDefined[idx][0]; case FAKE_PTR:{
switch(val){ NdbColumnImpl * key = idx->m_columns[i];
case FAKE_PTR: NdbColumnImpl * col = tab->getColumn(key->m_keyInfoPos);
curr->setup(col, 0); curr->setup(col, 0);
// Fall-through }
case API_PTR: break;
cnt--; case API_PTR:
break; case SETBOUND_EQ:
case SETBOUND_EQ: break;
(void)1;
#ifdef VM_TRACE #ifdef VM_TRACE
break; default:
default: abort();
abort();
#endif #endif
}
} }
idx++; curr = curr->next();
} }
} }
...@@ -1222,24 +1242,23 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, ...@@ -1222,24 +1242,23 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
int int
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
Uint32 u_idx = m_current_api_receiver; // start of unsorted Uint32 u_idx, u_last;
Uint32 u_last = u_idx + 1; // last unsorted Uint32 s_idx = m_current_api_receiver; // first sorted
Uint32 s_idx = u_last; // start of sorted
Uint32 s_last = theParallelism; // last sorted Uint32 s_last = theParallelism; // last sorted
NdbReceiver** arr = m_api_receivers; NdbReceiver** arr = m_api_receivers;
NdbReceiver* tRec = arr[u_idx]; NdbReceiver* tRec = arr[s_idx];
if(DEBUG_NEXT_RESULT) ndbout_c("nextOrderedResult(%d) nextResult: %d", if(DEBUG_NEXT_RESULT) ndbout_c("nextOrderedResult(%d) nextResult: %d",
fetchAllowed, fetchAllowed,
(u_idx < s_last ? tRec->nextResult() : 0)); (s_idx < s_last ? tRec->nextResult() : 0));
if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]",
u_idx, u_last, u_idx, u_last,
s_idx, s_last); s_idx, s_last);
bool fetchNeeded = (u_idx == s_last) || !tRec->nextResult(); bool fetchNeeded = (s_idx == s_last) || !tRec->nextResult();
if(fetchNeeded){ if(fetchNeeded){
if(fetchAllowed){ if(fetchAllowed){
if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch..."); if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
...@@ -1247,8 +1266,9 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ ...@@ -1247,8 +1266,9 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
Guard guard(tp->theMutexPtr); Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence; Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode; Uint32 nodeId = theNdbCon->theDBnode;
if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(u_idx)){ if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx)){
Uint32 tmp = m_sent_receivers_count; Uint32 tmp = m_sent_receivers_count;
s_idx = m_current_api_receiver;
while(m_sent_receivers_count > 0 && !theError.code){ while(m_sent_receivers_count > 0 && !theError.code){
theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN; theNdb->theWaiter.m_state = WAIT_SCAN;
...@@ -1256,24 +1276,30 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ ...@@ -1256,24 +1276,30 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue; continue;
} }
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
return -1; return -1;
} }
u_idx = 0; u_idx = 0;
u_last = m_conf_receivers_count; u_last = m_conf_receivers_count;
s_idx = (u_last > 1 ? s_last : s_idx);
m_conf_receivers_count = 0; m_conf_receivers_count = 0;
memcpy(arr, m_conf_receivers, u_last * sizeof(char*)); memcpy(arr, m_conf_receivers, u_last * sizeof(char*));
if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last); if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last);
if(theError.code){ if(theError.code){
setErrorCode(theError.code); setErrorCode(theError.code);
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
return -1; return -1;
} }
} }
} else { } else {
if(DEBUG_NEXT_RESULT) ndbout_c("return 2");
return 2; return 2;
} }
} else {
u_idx = s_idx;
u_last = s_idx + 1;
s_idx++;
} }
if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]",
...@@ -1319,6 +1345,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ ...@@ -1319,6 +1345,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
tRec = m_api_receivers[s_idx]; tRec = m_api_receivers[s_idx];
if(s_idx < s_last && tRec->nextResult()){ if(s_idx < s_last && tRec->nextResult()){
tRec->copyout(theReceiver); tRec->copyout(theReceiver);
if(DEBUG_NEXT_RESULT) ndbout_c("return 0");
return 0; return 0;
} }
...@@ -1329,9 +1356,11 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ ...@@ -1329,9 +1356,11 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
if(seq == tp->getNodeSequence(nodeId) && if(seq == tp->getNodeSequence(nodeId) &&
send_next_scan(0, true) == 0 && send_next_scan(0, true) == 0 &&
theError.code == 0){ theError.code == 0){
if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
return 1; return 1;
} }
setErrorCode(theError.code); setErrorCode(theError.code);
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
return -1; return -1;
} }
...@@ -1363,7 +1392,8 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ ...@@ -1363,7 +1392,8 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
tRec->prepareSend(); tRec->prepareSend();
m_sent_receivers_count = last + 1; m_sent_receivers_count = last + 1;
m_current_api_receiver = idx + 1;
Uint32 nodeId = theNdbCon->theDBnode; Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance(); TransporterFacade * tp = TransporterFacade::instance();
tSignal.setLength(4+1); tSignal.setLength(4+1);
......
...@@ -871,7 +871,8 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ...@@ -871,7 +871,8 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
index_name= get_index_name(active_index); index_name= get_index_name(active_index);
if (!(op= trans->getNdbIndexScanOperation(index_name, m_tabname))) if (!(op= trans->getNdbIndexScanOperation(index_name, m_tabname)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,parallelism))) if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,
parallelism, sorted)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor; m_active_cursor= cursor;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment