Commit c61d23e0 authored by joreland@mysql.com's avatar joreland@mysql.com

merge

parents 2f24eec4 96c4a9c6
......@@ -125,6 +125,7 @@ private:
virtual ~NdbIndexScanOperation();
int setBound(const NdbColumnImpl*, int type, const void* aValue, Uint32 len);
int saveBoundATTRINFO();
virtual int equal_impl(const NdbColumnImpl*, const char*, Uint32);
virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char*);
......
......@@ -37,7 +37,7 @@ public:
};
NdbReceiver(Ndb *aNdb);
void init(ReceiverType type, void* owner, bool keyInfo);
void init(ReceiverType type, void* owner);
void release();
~NdbReceiver();
......
......@@ -128,11 +128,11 @@ protected:
NdbApiSignal* theSCAN_TABREQ;
int getFirstATTRINFOScan();
int saveBoundATTRINFO();
int doSendScan(int ProcessorId);
int prepareSendScan(Uint32 TC_ConnectPtr, Uint64 TransactionId);
int fix_receivers(Uint32 parallel, bool keyInfo);
int fix_receivers(Uint32 parallel);
Uint32* m_array; // containing all arrays below
Uint32 m_allocated_receivers;
NdbReceiver** m_receivers; // All receivers
......
......@@ -60,6 +60,48 @@
// seen only when we debug the product
#ifdef VM_TRACE
#define DEBUG(x) ndbout << "DBLQH: "<< x << endl;
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::TransactionState state){
out << (int)state;
return out;
}
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::LogWriteState state){
out << (int)state;
return out;
}
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::ListState state){
out << (int)state;
return out;
}
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::AbortState state){
out << (int)state;
return out;
}
NdbOut &
operator<<(NdbOut& out, Dblqh::ScanRecord::ScanState state){
out << (int)state;
return out;
}
NdbOut &
operator<<(NdbOut& out, Dblqh::LogFileOperationRecord::LfoState state){
out << (int)state;
return out;
}
NdbOut &
operator<<(NdbOut& out, Dblqh::ScanRecord::ScanType state){
out << (int)state;
return out;
}
#else
#define DEBUG(x)
#endif
......@@ -7177,7 +7219,7 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
ScanFragRef * ref;
const Uint32 transid1 = scanFragReq->transId1;
const Uint32 transid2 = scanFragReq->transId2;
Uint32 errorCode;
Uint32 errorCode= 0;
Uint32 senderData;
Uint32 hashIndex;
TcConnectionrecPtr nextHashptr;
......@@ -8466,7 +8508,7 @@ void Dblqh::sendKeyinfo20(Signal* signal,
const Uint32 type = getNodeInfo(nodeId).m_type;
const bool is_api = (type >= NodeInfo::API && type <= NodeInfo::REP);
const bool old_dest = (getNodeInfo(nodeId).m_version < MAKE_VERSION(3,5,0));
const bool longable = is_api && !old_dest;
const bool longable = true; // TODO is_api && !old_dest;
Uint32 * dst = keyInfo->keyData;
dst += nodeId == getOwnNodeId() ? 0 : KeyInfo20::DataLength;
......@@ -15736,7 +15778,7 @@ void Dblqh::completedLogPage(Signal* signal, Uint32 clpType)
/* ---------------------------------------------------------------- */
void Dblqh::deleteFragrec(Uint32 fragId)
{
Uint32 indexFound;
Uint32 indexFound= RNIL;
fragptr.i = RNIL;
for (Uint32 i = (NO_OF_FRAG_PER_NODE - 1); (Uint32)~i; i--) {
jam();
......
......@@ -220,7 +220,7 @@ void Dbtup::sendReadAttrinfo(Signal* signal,
*/
Uint32 routeBlockref = regOperPtr->coordinatorTC;
if(is_api && !old_dest){
if(true){ // TODO is_api && !old_dest){
ljam();
transIdAI->attrData[0] = recBlockref;
LinearSectionPtr ptr[3];
......
......@@ -1497,7 +1497,8 @@ ndb_mgm_get_configuration(NdbMgmHandle handle, unsigned int version) {
ndbout_c("Failed to unpack buffer");
break;
}
delete prop;
return (ndb_mgm_configuration*)cvf.m_cfg;
} while(0);
......
......@@ -230,10 +230,6 @@ NdbColumnImpl::assign(const NdbColumnImpl& org)
NdbTableImpl::NdbTableImpl()
: NdbDictionary::Table(* this), m_facade(this)
{
m_noOfKeys = 0;
m_sizeOfKeysInWords = 0;
m_noOfBlobs = 0;
m_index = 0;
init();
}
......@@ -1149,7 +1145,7 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
Uint32 keyInfoPos = 0;
Uint32 keyCount = 0;
Uint32 blobCount;
Uint32 blobCount = 0;
for(Uint32 i = 0; i < tableDesc.NoOfAttributes; i++) {
DictTabInfo::Attribute attrDesc; attrDesc.init();
......@@ -1737,8 +1733,8 @@ NdbDictionaryImpl::getIndexImpl(const char * externalName,
return 0;
}
NdbTableImpl* primTab = getTable(tab->m_primaryTable.c_str());
if(primTab == 0){
NdbTableImpl* prim = getTable(tab->m_primaryTable.c_str());
if(prim == 0){
m_error.code = 4243;
return 0;
}
......@@ -1752,7 +1748,7 @@ NdbDictionaryImpl::getIndexImpl(const char * externalName,
idx->m_indexId = tab->m_tableId;
idx->m_internalName.assign(internalName);
idx->m_externalName.assign(externalName);
idx->m_tableName.assign(primTab->m_externalName);
idx->m_tableName.assign(prim->m_externalName);
idx->m_type = tab->m_indexType;
// skip last attribute (NDB$PK or NDB$TNODE)
for(unsigned i = 0; i+1<tab->m_columns.size(); i++){
......@@ -1760,6 +1756,14 @@ NdbDictionaryImpl::getIndexImpl(const char * externalName,
// Copy column definition
*col = *tab->m_columns[i];
idx->m_columns.push_back(col);
/**
* reverse map
*/
int key_id = prim->getColumn(col->getName())->getColumnNo();
int fill = -1;
idx->m_key_ids.fill(key_id, fill);
idx->m_key_ids[key_id] = i;
col->m_keyInfoPos = key_id;
}
idx->m_table = tab;
......
......@@ -176,6 +176,7 @@ public:
BaseString m_externalName;
BaseString m_tableName;
Vector<NdbColumnImpl *> m_columns;
Vector<int> m_key_ids;
NdbDictionary::Index::Type m_type;
bool m_logging;
......
......@@ -37,8 +37,6 @@
#include <signaldata/IndxKeyInfo.hpp>
#include <signaldata/IndxAttrInfo.hpp>
#define CHECK_NULL(v) assert(v == NULL); v = NULL;
NdbIndexOperation::NdbIndexOperation(Ndb* aNdb) :
NdbOperation(aNdb),
m_theIndex(NULL),
......@@ -52,7 +50,7 @@ NdbIndexOperation::NdbIndexOperation(Ndb* aNdb) :
/**
* Change receiver type
*/
theReceiver.init(NdbReceiver::NDB_INDEX_OPERATION, this, false);
theReceiver.init(NdbReceiver::NDB_INDEX_OPERATION, this);
}
NdbIndexOperation::~NdbIndexOperation()
......
......@@ -92,7 +92,7 @@ NdbOperation::NdbOperation(Ndb* aNdb) :
theBoundATTRINFO(NULL),
theBlobList(NULL)
{
theReceiver.init(NdbReceiver::NDB_OPERATION, this, false);
theReceiver.init(NdbReceiver::NDB_OPERATION, this);
theError.code = 0;
}
/*****************************************************************************
......@@ -195,7 +195,7 @@ NdbOperation::init(NdbTableImpl* tab, NdbConnection* myConnection){
tcKeyReq->scanInfo = 0;
theKEYINFOptr = &tcKeyReq->keyInfo[0];
theATTRINFOptr = &tcKeyReq->attrInfo[0];
theReceiver.init(NdbReceiver::NDB_OPERATION, this, false);
theReceiver.init(NdbReceiver::NDB_OPERATION, this);
return 0;
}
......@@ -291,6 +291,7 @@ NdbOperation::release()
theNdb->releaseNdbBlob(tSaveBlob);
}
theBlobList = NULL;
theReceiver.release();
}
NdbRecAttr*
......
......@@ -34,7 +34,7 @@
#include "NdbUtil.hpp"
#include "NdbOut.hpp"
#include "NdbImpl.hpp"
#include <NdbScanOperation.hpp>
#include <NdbIndexScanOperation.hpp>
#include "NdbBlob.hpp"
#include <Interpreter.hpp>
......@@ -317,7 +317,7 @@ NdbOperation::getValue_impl(const NdbColumnImpl* tAttrInfo, char* aValue)
(!tAttrInfo->m_indexOnly) &&
(theStatus != Init)){
if (theStatus == SetBound) {
((NdbScanOperation*)this)->saveBoundATTRINFO();
((NdbIndexScanOperation*)this)->saveBoundATTRINFO();
theStatus = GetValue;
}
if (theStatus != GetValue) {
......
......@@ -33,7 +33,7 @@ Adjust: 991029 UABRONM First version.
#include "NdbRecAttr.hpp"
#include "NdbUtil.hpp"
#include "Interpreter.hpp"
#include <NdbScanOperation.hpp>
#include <NdbIndexScanOperation.hpp>
#ifdef VM_TRACE
#include <NdbEnv.h>
......@@ -217,7 +217,7 @@ NdbOperation::initial_interpreterCheck()
{
if ((theInterpretIndicator == 1)) {
if (theStatus == SetBound) {
((NdbScanOperation*)this)->saveBoundATTRINFO();
((NdbIndexScanOperation*)this)->saveBoundATTRINFO();
theStatus = GetValue;
}
if (theStatus == ExecInterpretedValue) {
......
......@@ -29,10 +29,20 @@ NdbReceiver::NdbReceiver(Ndb *aNdb) :
m_owner(0)
{
theCurrentRecAttr = theFirstRecAttr = 0;
m_defined_rows = 0;
m_rows = new NdbRecAttr*[0];
}
NdbReceiver::~NdbReceiver()
{
if (m_id != NdbObjectIdMap::InvalidId) {
m_ndb->theNdbObjectIdMap->unmap(m_id, this);
}
delete[] m_rows;
}
void
NdbReceiver::init(ReceiverType type, void* owner, bool keyInfo)
NdbReceiver::init(ReceiverType type, void* owner)
{
theMagicNumber = 0x11223344;
m_type = type;
......@@ -44,8 +54,6 @@ NdbReceiver::init(ReceiverType type, void* owner, bool keyInfo)
theFirstRecAttr = NULL;
theCurrentRecAttr = NULL;
m_key_info = (keyInfo ? 1 : 0);
m_defined_rows = 0;
}
void
......@@ -61,13 +69,6 @@ NdbReceiver::release(){
theCurrentRecAttr = NULL;
}
NdbReceiver::~NdbReceiver()
{
if (m_id != NdbObjectIdMap::InvalidId) {
m_ndb->theNdbObjectIdMap->unmap(m_id, this);
}
}
NdbRecAttr *
NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
NdbRecAttr* tRecAttr = m_ndb->getRecAttr();
......@@ -90,9 +91,13 @@ NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
void
NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
m_defined_rows = rows;
m_rows = new NdbRecAttr*[rows + 1]; m_rows[rows] = 0;
if(rows > m_defined_rows){
delete[] m_rows;
m_defined_rows = rows;
m_rows = new NdbRecAttr*[rows + 1];
}
m_rows[rows] = 0;
NdbColumnImpl key;
if(key_size){
key.m_attrId = KEY_ATTR_ID;
......@@ -159,7 +164,6 @@ NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
{
bool ok = true;
NdbRecAttr* currRecAttr = theCurrentRecAttr;
NdbRecAttr* prevRecAttr = currRecAttr;
for (Uint32 used = 0; used < aLength ; used++){
AttributeHeader ah(* aDataPtr++);
......@@ -171,18 +175,21 @@ NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
*/
while(currRecAttr && currRecAttr->attrId() != tAttrId){
ok &= currRecAttr->setNULL();
prevRecAttr = currRecAttr;
currRecAttr = currRecAttr->next();
}
if(ok && currRecAttr && currRecAttr->receive_data(aDataPtr, tAttrSize)){
used += tAttrSize;
aDataPtr += tAttrSize;
prevRecAttr = currRecAttr;
currRecAttr = currRecAttr->next();
} else {
ndbout_c("%p: ok: %d tAttrId: %d currRecAttr: %p",
this,ok, tAttrId, currRecAttr);
currRecAttr = theCurrentRecAttr;
while(currRecAttr != 0){
ndbout_c("%d ", currRecAttr->attrId());
currRecAttr = currRecAttr->next();
}
abort();
return -1;
}
......
......@@ -57,11 +57,15 @@ NdbScanOperation::NdbScanOperation(Ndb* aNdb) :
m_conf_receivers = 0;
m_sent_receivers = 0;
m_receivers = 0;
m_array = new Uint32[1]; // skip if on delete in fix_receivers
}
NdbScanOperation::~NdbScanOperation()
{
fix_receivers(0, false);
for(Uint32 i = 0; i<m_allocated_receivers; i++){
theNdb->releaseNdbScanRec(m_receivers[i]);
}
delete[] m_array;
if (m_resultSet)
delete m_resultSet;
}
......@@ -130,31 +134,23 @@ NdbScanOperation::init(NdbTableImpl* tab, NdbConnection* myConnection)
NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
Uint32 batch,
Uint32 parallell)
Uint32 parallel)
{
m_ordered = 0;
Uint32 fragCount = m_currentTable->m_fragmentCount;
if(batch + parallell == 0){ // Max speed
if (batch + parallel == 0) {
batch = 16;
parallell = fragCount;
}
if(batch == 0 && parallell > 0){ // Backward
batch = (parallell >= 16 ? 16 : parallell & 15);
parallell = (parallell + 15) / 16;
if(parallell == 0)
parallell = 1;
}
if(parallell > fragCount)
parallell = fragCount;
else if(parallell == 0)
parallell = fragCount;
assert(parallell > 0);
parallel= fragCount;
} else {
if (batch == 0 && parallel > 0) { // Backward
batch = (parallel >= 16 ? 16 : parallel);
parallel = (parallel + 15) / 16;
}
if (parallel > fragCount || parallel == 0)
parallel = fragCount;
}
// It is only possible to call openScan if
// 1. this transcation don't already contain another scan operation
......@@ -179,7 +175,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
lockHoldMode = true;
readCommitted = false;
break;
case NdbScanOperation::LM_Dirty:
case NdbScanOperation::LM_CommittedRead:
lockExcl = false;
lockHoldMode = false;
readCommitted = true;
......@@ -204,10 +200,10 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
range = true;
}
theParallelism = parallell;
theParallelism = parallel;
theBatchSize = batch;
if(fix_receivers(parallell, lockExcl) == -1){
if(fix_receivers(parallel) == -1){
setErrorCodeAbort(4000);
return 0;
}
......@@ -226,7 +222,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
req->buddyConPtr = theNdbCon->theBuddyConPtr;
Uint32 reqInfo = 0;
ScanTabReq::setParallelism(reqInfo, parallell);
ScanTabReq::setParallelism(reqInfo, parallel);
ScanTabReq::setScanBatch(reqInfo, batch);
ScanTabReq::setLockMode(reqInfo, lockExcl);
ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
......@@ -244,38 +240,38 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
}
int
NdbScanOperation::fix_receivers(Uint32 parallell, bool keyInfo){
if(parallell == 0 || parallell > m_allocated_receivers){
if(m_prepared_receivers) delete[] m_prepared_receivers;
if(m_receivers) delete[] m_receivers;
if(m_api_receivers) delete[] m_api_receivers;
if(m_conf_receivers) delete[] m_conf_receivers;
if(m_sent_receivers) delete[] m_sent_receivers;
m_allocated_receivers = parallell;
if(parallell == 0){
return 0;
}
NdbScanOperation::fix_receivers(Uint32 parallel){
assert(parallel > 0);
if(parallel > m_allocated_receivers){
const Uint32 sz = parallel * (4*sizeof(char*)+sizeof(Uint32));
Uint32 * tmp = new Uint32[(sz+3)/4];
// Save old receivers
memcpy(tmp+parallel, m_receivers, m_allocated_receivers*sizeof(char*));
delete[] m_array;
m_array = tmp;
m_prepared_receivers = new Uint32[parallell];
m_receivers = new NdbReceiver*[parallell];
m_api_receivers = new NdbReceiver*[parallell];
m_conf_receivers = new NdbReceiver*[parallell];
m_sent_receivers = new NdbReceiver*[parallell];
m_prepared_receivers = tmp;
m_receivers = (NdbReceiver**)(tmp + parallel);
m_api_receivers = m_receivers + parallel;
m_conf_receivers = m_api_receivers + parallel;
m_sent_receivers = m_conf_receivers + parallel;
// Only get/init "new" receivers
NdbReceiver* tScanRec;
for (Uint32 i = 0; i < parallell; i ++) {
for (Uint32 i = m_allocated_receivers; i < parallel; i ++) {
tScanRec = theNdb->getNdbScanRec();
if (tScanRec == NULL) {
setErrorCodeAbort(4000);
return -1;
}//if
m_receivers[i] = tScanRec;
tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, this, keyInfo);
tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, this);
}
m_allocated_receivers = parallel;
}
for(Uint32 i = 0; i<parallell; i++){
for(Uint32 i = 0; i<parallel; i++){
m_receivers[i]->m_list_index = i;
m_prepared_receivers[i] = m_receivers[i]->getId();
m_sent_receivers[i] = m_receivers[i];
......@@ -285,7 +281,7 @@ NdbScanOperation::fix_receivers(Uint32 parallell, bool keyInfo){
m_api_receivers_count = 0;
m_current_api_receiver = 0;
m_sent_receivers_count = parallell;
m_sent_receivers_count = parallel;
m_conf_receivers_count = 0;
return 0;
}
......@@ -375,7 +371,7 @@ NdbScanOperation::getFirstATTRINFOScan()
* a separate list. Then continue with normal scan.
*/
int
NdbScanOperation::saveBoundATTRINFO()
NdbIndexScanOperation::saveBoundATTRINFO()
{
theCurrentATTRINFO->setLength(theAI_LenInCurrAI);
theBoundATTRINFO = theFirstATTRINFO;
......@@ -395,18 +391,27 @@ NdbScanOperation::saveBoundATTRINFO()
* unless the one's with EqBound
*/
if(!res && m_ordered){
Uint32 idx = 0;
Uint32 cnt = m_currentTable->getNoOfPrimaryKeys();
while(!theTupleKeyDefined[idx][0] && idx < cnt){
NdbColumnImpl* col = m_currentTable->getColumn(idx);
/**
* If setBound EQ
*/
Uint32 i = 0;
while(theTupleKeyDefined[i][0] == SETBOUND_EQ)
i++;
Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
m_sort_columns = cnt - i;
for(; i<cnt; i++){
NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
UintPtr newVal = UintPtr(tmp);
theTupleKeyDefined[idx][0] = FAKE_PTR;
theTupleKeyDefined[idx][1] = (newVal & 0xFFFFFFFF);
theTupleKeyDefined[i][0] = FAKE_PTR;
theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
#if (SIZEOF_CHARP == 8)
theTupleKeyDefined[idx][2] = (newVal >> 32);
theTupleKeyDefined[i][2] = (newVal >> 32);
#endif
idx++;
}
}
return res;
......@@ -753,7 +758,7 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
}
if (theStatus == SetBound) {
saveBoundATTRINFO();
((NdbIndexScanOperation*)this)->saveBoundATTRINFO();
theStatus = GetValue;
}
......@@ -1049,19 +1054,33 @@ NdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject,
NdbRecAttr*
NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo,
char* aValue){
if(!attrInfo->getPrimaryKey() || !m_ordered){
if(!m_ordered){
return NdbScanOperation::getValue_impl(attrInfo, aValue);
}
if (theStatus == SetBound) {
saveBoundATTRINFO();
theStatus = GetValue;
}
Uint32 id = attrInfo->m_attrId;
int id = attrInfo->m_attrId; // In "real" table
assert(m_accessTable->m_index);
int sz = (int)m_accessTable->m_index->m_key_ids.size();
if(id >= sz || (id = m_accessTable->m_index->m_key_ids[id]) == -1){
return NdbScanOperation::getValue_impl(attrInfo, aValue);
}
assert(id < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
Uint32 marker = theTupleKeyDefined[id][0];
if(marker == SETBOUND_EQ){
return NdbScanOperation::getValue_impl(attrInfo, aValue);
} else if(marker == API_PTR){
return NdbScanOperation::getValue_impl(attrInfo, aValue);
}
assert(marker == FAKE_PTR);
UintPtr oldVal;
oldVal = theTupleKeyDefined[id][1];
#if (SIZEOF_CHARP == 8)
......@@ -1071,6 +1090,7 @@ NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo,
NdbRecAttr* tmp = (NdbRecAttr*)oldVal;
tmp->setup(attrInfo, aValue);
return tmp;
}
......@@ -1121,10 +1141,9 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
* so it's safe to use [tIndexAttrId]
* (instead of looping as is NdbOperation::equal_impl)
*/
if(!theTupleKeyDefined[tIndexAttrId][0]){
if(type == BoundEQ && !theTupleKeyDefined[tIndexAttrId][0]){
theNoOfTupKeyDefined++;
theTupleKeyDefined[tIndexAttrId][0] = SETBOUND_EQ;
m_sort_columns -= m_ordered;
}
return 0;
......@@ -1142,8 +1161,9 @@ NdbIndexScanOperation::readTuples(LockMode lm,
NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0);
if(rs && order_by){
m_ordered = 1;
m_sort_columns = m_accessTable->getNoOfPrimaryKeys();
m_sort_columns = m_accessTable->getNoOfColumns() - 1; // -1 for NDB$NODE
m_current_api_receiver = m_sent_receivers_count;
m_api_receivers_count = m_sent_receivers_count;
}
return rs;
}
......@@ -1154,33 +1174,29 @@ NdbIndexScanOperation::fix_get_values(){
* Loop through all getValues and set buffer pointer to "API" pointer
*/
NdbRecAttr * curr = theReceiver.theFirstRecAttr;
Uint32 cnt = m_sort_columns;
Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
assert(cnt < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
Uint32 idx = 0;
NdbIndexImpl * idx = m_accessTable->m_index;
NdbTableImpl * tab = m_currentTable;
while(cnt > 0){ // To MAXNROFTUPLEKEY loops
NdbColumnImpl * col = tab->getColumn(idx);
if(col->getPrimaryKey()){
Uint32 val = theTupleKeyDefined[idx][0];
switch(val){
case FAKE_PTR:
curr->setup(col, 0);
// Fall-through
case API_PTR:
cnt--;
break;
case SETBOUND_EQ:
(void)1;
for(Uint32 i = 0; i<cnt; i++){
Uint32 val = theTupleKeyDefined[i][0];
switch(val){
case FAKE_PTR:{
NdbColumnImpl * key = idx->m_columns[i];
NdbColumnImpl * col = tab->getColumn(key->m_keyInfoPos);
curr->setup(col, 0);
}
break;
case API_PTR:
case SETBOUND_EQ:
break;
#ifdef VM_TRACE
break;
default:
abort();
default:
abort();
#endif
}
}
idx++;
curr = curr->next();
}
}
......@@ -1222,24 +1238,23 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
int
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
Uint32 u_idx = m_current_api_receiver; // start of unsorted
Uint32 u_last = u_idx + 1; // last unsorted
Uint32 s_idx = u_last; // start of sorted
Uint32 u_idx = 0, u_last = 0;
Uint32 s_idx = m_current_api_receiver; // first sorted
Uint32 s_last = theParallelism; // last sorted
NdbReceiver** arr = m_api_receivers;
NdbReceiver* tRec = arr[u_idx];
NdbReceiver* tRec = arr[s_idx];
if(DEBUG_NEXT_RESULT) ndbout_c("nextOrderedResult(%d) nextResult: %d",
fetchAllowed,
(u_idx < s_last ? tRec->nextResult() : 0));
(s_idx < s_last ? tRec->nextResult() : 0));
if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]",
u_idx, u_last,
s_idx, s_last);
bool fetchNeeded = (u_idx == s_last) || !tRec->nextResult();
bool fetchNeeded = (s_idx == s_last) || !tRec->nextResult();
if(fetchNeeded){
if(fetchAllowed){
if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
......@@ -1247,8 +1262,9 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(u_idx)){
if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx)){
Uint32 tmp = m_sent_receivers_count;
s_idx = m_current_api_receiver;
while(m_sent_receivers_count > 0 && !theError.code){
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
......@@ -1256,24 +1272,30 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue;
}
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
return -1;
}
u_idx = 0;
u_last = m_conf_receivers_count;
s_idx = (u_last > 1 ? s_last : s_idx);
m_conf_receivers_count = 0;
memcpy(arr, m_conf_receivers, u_last * sizeof(char*));
if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last);
if(theError.code){
setErrorCode(theError.code);
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
return -1;
}
}
} else {
if(DEBUG_NEXT_RESULT) ndbout_c("return 2");
return 2;
}
} else {
u_idx = s_idx;
u_last = s_idx + 1;
s_idx++;
}
if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]",
......@@ -1319,6 +1341,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
tRec = m_api_receivers[s_idx];
if(s_idx < s_last && tRec->nextResult()){
tRec->copyout(theReceiver);
if(DEBUG_NEXT_RESULT) ndbout_c("return 0");
return 0;
}
......@@ -1329,9 +1352,11 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
if(seq == tp->getNodeSequence(nodeId) &&
send_next_scan(0, true) == 0 &&
theError.code == 0){
if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
return 1;
}
setErrorCode(theError.code);
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
return -1;
}
......@@ -1363,7 +1388,8 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
tRec->prepareSend();
m_sent_receivers_count = last + 1;
m_current_api_receiver = idx + 1;
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance();
tSignal.setLength(4+1);
......
......@@ -19,6 +19,7 @@
#include <mgmapi.h>
#include <Vector.hpp>
#include <BaseString.hpp>
class NdbRestarter {
public:
......@@ -85,8 +86,8 @@ protected:
Vector<ndb_mgm_node_state> apiNodes;
bool connected;
const char* addr;
const char* host;
BaseString addr;
BaseString host;
int port;
NdbMgmHandle handle;
ndb_mgm_configuration * m_config;
......
......@@ -71,7 +71,7 @@ NdbBackup::getFileSystemPathForNode(int _node_id){
*/
ConfigRetriever cr;
ndb_mgm_configuration * p = cr.getConfig(host, port, 0, NODE_TYPE_API);
ndb_mgm_configuration * p = cr.getConfig(host.c_str(), port, 0, NODE_TYPE_API);
if(p == 0){
const char * s = cr.getErrorString();
if(s == 0)
......@@ -156,7 +156,7 @@ NdbBackup::execRestore(bool _restore_data,
snprintf(buf, 255, "valgrind --leak-check=yes -v ndb_restore -c \"nodeid=%d;host=%s\" -n %d -b %d %s %s .",
ownNodeId,
addr,
addr.c_str(),
_node_id,
_backup_id,
_restore_data?"-r":"",
......
......@@ -33,13 +33,11 @@
NdbRestarter::NdbRestarter(const char* _addr):
connected(false),
addr(_addr),
host(NULL),
port(-1),
handle(NULL),
m_config(0)
{
if (addr == NULL){
if (_addr == NULL){
LocalConfig lcfg;
if(!lcfg.init()){
lcfg.printError();
......@@ -60,20 +58,20 @@ NdbRestarter::NdbRestarter(const char* _addr):
case MgmId_TCP:
char buf[255];
snprintf(buf, 255, "%s:%d", m->data.tcp.remoteHost, m->data.tcp.port);
addr = strdup(buf);
host = strdup(m->data.tcp.remoteHost);
addr.assign(buf);
host.assign(m->data.tcp.remoteHost);
port = m->data.tcp.port;
return;
break;
case MgmId_File:
break;
default:
break;
}
if (addr != NULL)
break;
}
} else {
addr.assign(_addr);
}
}
NdbRestarter::~NdbRestarter(){
......@@ -398,10 +396,10 @@ NdbRestarter::connect(){
g_err << "handle == NULL" << endl;
return -1;
}
g_info << "Connecting to mgmsrv at " << addr << endl;
if (ndb_mgm_connect(handle, addr) == -1) {
g_info << "Connecting to mgmsrv at " << addr.c_str() << endl;
if (ndb_mgm_connect(handle, addr.c_str()) == -1) {
MGMERR(handle);
g_err << "Connection to " << addr << " failed" << endl;
g_err << "Connection to " << addr.c_str() << " failed" << endl;
return -1;
}
......
......@@ -866,8 +866,8 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
index_name= get_index_name(active_index);
if (!(op= trans->getNdbIndexScanOperation(index_name, m_tabname)))
ERR_RETURN(trans->getNdbError());
if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,
parallelism))) //, sorted))) // Bug
if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,
parallelism, sorted)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment