Commit 865eb873 authored by joreland@mysql.com's avatar joreland@mysql.com

wl2126 - ndb - bug fixes

parent 20fa339c
...@@ -21,7 +21,6 @@ ndbapi/NdbBlob.hpp \ ...@@ -21,7 +21,6 @@ ndbapi/NdbBlob.hpp \
ndbapi/NdbPool.hpp \ ndbapi/NdbPool.hpp \
ndbapi/NdbRecAttr.hpp \ ndbapi/NdbRecAttr.hpp \
ndbapi/NdbReceiver.hpp \ ndbapi/NdbReceiver.hpp \
ndbapi/NdbResultSet.hpp \
ndbapi/NdbScanFilter.hpp \ ndbapi/NdbScanFilter.hpp \
ndbapi/NdbScanOperation.hpp \ ndbapi/NdbScanOperation.hpp \
ndbapi/NdbIndexScanOperation.hpp \ ndbapi/NdbIndexScanOperation.hpp \
......
...@@ -2647,11 +2647,11 @@ void Dblqh::execTUPKEYCONF(Signal* signal) ...@@ -2647,11 +2647,11 @@ void Dblqh::execTUPKEYCONF(Signal* signal)
jamEntry(); jamEntry();
tcConnectptr.i = tcIndex; tcConnectptr.i = tcIndex;
ptrCheckGuard(tcConnectptr, ttcConnectrecFileSize, regTcConnectionrec); ptrCheckGuard(tcConnectptr, ttcConnectrecFileSize, regTcConnectionrec);
if (tcConnectptr.p->seqNoReplica == 0) // Primary replica
tcConnectptr.p->noFiredTriggers = tupKeyConf->noFiredTriggers;
switch (tcConnectptr.p->transactionState) { switch (tcConnectptr.p->transactionState) {
case TcConnectionrec::WAIT_TUP: case TcConnectionrec::WAIT_TUP:
jam(); jam();
if (tcConnectptr.p->seqNoReplica == 0) // Primary replica
tcConnectptr.p->noFiredTriggers = tupKeyConf->noFiredTriggers;
tupkeyConfLab(signal); tupkeyConfLab(signal);
break; break;
case TcConnectionrec::COPY_TUPKEY: case TcConnectionrec::COPY_TUPKEY:
......
...@@ -1221,12 +1221,11 @@ int ha_ndbcluster::unique_index_read(const byte *key, ...@@ -1221,12 +1221,11 @@ int ha_ndbcluster::unique_index_read(const byte *key,
DBUG_RETURN(0); DBUG_RETURN(0);
} }
inline int ha_ndbcluster::fetch_next() inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
{ {
DBUG_ENTER("fetch_next"); DBUG_ENTER("fetch_next");
int check; int check;
NdbConnection *trans= m_active_trans; NdbConnection *trans= m_active_trans;
NdbScanOperation *cursor= m_active_cursor;
bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE; bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE;
do { do {
...@@ -1262,12 +1261,12 @@ inline int ha_ndbcluster::fetch_next() ...@@ -1262,12 +1261,12 @@ inline int ha_ndbcluster::fetch_next()
if (m_transaction_on) if (m_transaction_on)
{ {
if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans) != 0)
return -1; DBUG_RETURN(-1);
} }
else else
{ {
if (execute_commit(this,trans) != 0) if (execute_commit(this,trans) != 0)
return -1; DBUG_RETURN(-1);
int res= trans->restart(); int res= trans->restart();
DBUG_ASSERT(res == 0); DBUG_ASSERT(res == 0);
} }
...@@ -1275,6 +1274,10 @@ inline int ha_ndbcluster::fetch_next() ...@@ -1275,6 +1274,10 @@ inline int ha_ndbcluster::fetch_next()
} }
contact_ndb= (check == 2); contact_ndb= (check == 2);
} }
else
{
DBUG_RETURN(-1);
}
} while (check == 2); } while (check == 2);
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -1296,7 +1299,7 @@ inline int ha_ndbcluster::next_result(byte *buf) ...@@ -1296,7 +1299,7 @@ inline int ha_ndbcluster::next_result(byte *buf)
int res; int res;
DBUG_ENTER("next_result"); DBUG_ENTER("next_result");
if((res= fetch_next()) == 0) if((res= fetch_next(m_active_cursor)) == 0)
{ {
DBUG_PRINT("info", ("One more record found")); DBUG_PRINT("info", ("One more record found"));
...@@ -2529,13 +2532,14 @@ int ha_ndbcluster::rnd_init(bool scan) ...@@ -2529,13 +2532,14 @@ int ha_ndbcluster::rnd_init(bool scan)
int ha_ndbcluster::close_scan() int ha_ndbcluster::close_scan()
{ {
NdbScanOperation *cursor= m_active_cursor;
NdbConnection *trans= m_active_trans; NdbConnection *trans= m_active_trans;
DBUG_ENTER("close_scan"); DBUG_ENTER("close_scan");
if (!cursor) m_multi_cursor= 0;
if (!m_active_cursor && !m_multi_cursor)
DBUG_RETURN(1); DBUG_RETURN(1);
NdbScanOperation *cursor= m_active_cursor ? m_active_cursor : m_multi_cursor;
if (m_ops_pending) if (m_ops_pending)
{ {
...@@ -2552,7 +2556,7 @@ int ha_ndbcluster::close_scan() ...@@ -2552,7 +2556,7 @@ int ha_ndbcluster::close_scan()
} }
cursor->close(m_force_send); cursor->close(m_force_send);
m_active_cursor= NULL; m_active_cursor= m_multi_cursor= NULL;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -3132,6 +3136,10 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -3132,6 +3136,10 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
DBUG_PRINT("warning", ("m_active_cursor != NULL")); DBUG_PRINT("warning", ("m_active_cursor != NULL"));
m_active_cursor= NULL; m_active_cursor= NULL;
if (m_multi_cursor)
DBUG_PRINT("warning", ("m_multi_cursor != NULL"));
m_multi_cursor= NULL;
if (m_blobs_pending) if (m_blobs_pending)
DBUG_PRINT("warning", ("blobs_pending != 0")); DBUG_PRINT("warning", ("blobs_pending != 0"));
m_blobs_pending= 0; m_blobs_pending= 0;
...@@ -3776,6 +3784,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): ...@@ -3776,6 +3784,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
handler(table_arg), handler(table_arg),
m_active_trans(NULL), m_active_trans(NULL),
m_active_cursor(NULL), m_active_cursor(NULL),
m_multi_cursor(NULL),
m_ndb(NULL), m_ndb(NULL),
m_table(NULL), m_table(NULL),
m_table_info(NULL), m_table_info(NULL),
...@@ -4822,7 +4831,8 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p, ...@@ -4822,7 +4831,8 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
/** /**
* Variables for loop * Variables for loop
*/ */
byte* curr= (byte*)buffer->buffer; byte *curr= (byte*)buffer->buffer;
byte *end_of_buffer= (byte*)buffer->buffer_end;
NdbOperation::LockMode lm= NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
const NDBTAB *tab= (const NDBTAB *) m_table; const NDBTAB *tab= (const NDBTAB *) m_table;
...@@ -4830,7 +4840,7 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p, ...@@ -4830,7 +4840,7 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
const NDBINDEX *idx= (NDBINDEX *) m_index[active_index].index; const NDBINDEX *idx= (NDBINDEX *) m_index[active_index].index;
const NdbOperation* lastOp= m_active_trans->getLastDefinedOperation(); const NdbOperation* lastOp= m_active_trans->getLastDefinedOperation();
NdbIndexScanOperation* scanOp= 0; NdbIndexScanOperation* scanOp= 0;
for(i= 0; i<range_count && curr+reclength <= buffer->buffer_end; i++) for(i= 0; i<range_count && curr+reclength <= end_of_buffer; i++)
{ {
switch(index_type){ switch(index_type){
case PRIMARY_KEY_INDEX: case PRIMARY_KEY_INDEX:
...@@ -4879,15 +4889,29 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p, ...@@ -4879,15 +4889,29 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
ranges[i].range_flag &= ~(uint)UNIQUE_RANGE; ranges[i].range_flag &= ~(uint)UNIQUE_RANGE;
if (scanOp == 0) if (scanOp == 0)
{ {
if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab)) && if (m_multi_cursor)
!scanOp->readTuples(lm, 0, parallelism, sorted, true) && {
!define_read_attrs(curr, scanOp)) scanOp= m_multi_cursor;
curr += reclength; DBUG_ASSERT(scanOp->getSorted() == sorted);
DBUG_ASSERT(scanOp->getLockMode() ==
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type));
if(scanOp->reset_bounds(m_force_send))
DBUG_RETURN(ndb_err(m_active_trans));
end_of_buffer -= reclength;
}
else if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab))
&& !scanOp->readTuples(lm, 0, parallelism, sorted, true) &&
!define_read_attrs(end_of_buffer-reclength, scanOp))
{
m_multi_cursor= scanOp;
m_multi_range_cursor_result_ptr= end_of_buffer-reclength;
}
else else
{
ERR_RETURN(scanOp ? scanOp->getNdbError() : ERR_RETURN(scanOp ? scanOp->getNdbError() :
m_active_trans->getNdbError()); m_active_trans->getNdbError());
m_multi_cursor= scanOp; }
m_multi_range_cursor_result_ptr= curr;
} }
const key_range *keys[2]= { &ranges[i].start_key, &ranges[i].end_key }; const key_range *keys[2]= { &ranges[i].start_key, &ranges[i].end_key };
if ((res= set_bounds(scanOp, keys, i))) if ((res= set_bounds(scanOp, keys, i)))
...@@ -4926,6 +4950,12 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p, ...@@ -4926,6 +4950,12 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
ERR_RETURN(m_active_trans->getNdbError()); ERR_RETURN(m_active_trans->getNdbError());
} }
#if 0
#define DBUG_MULTI_RANGE(x) printf("read_multi_range_next: case %d\n", x);
#else
#define DBUG_MULTI_RANGE(x)
#endif
int int
ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p) ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p)
{ {
...@@ -4952,8 +4982,10 @@ ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p) ...@@ -4952,8 +4982,10 @@ ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p)
} }
else if (m_multi_cursor && !multi_range_sorted) else if (m_multi_cursor && !multi_range_sorted)
{ {
if ((res= fetch_next() == 0)) DBUG_MULTI_RANGE(1);
if ((res= fetch_next(m_multi_cursor)) == 0)
{ {
DBUG_MULTI_RANGE(2);
range_no= m_multi_cursor->get_range_no(); range_no= m_multi_cursor->get_range_no();
goto found; goto found;
} }
...@@ -4962,35 +4994,44 @@ ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p) ...@@ -4962,35 +4994,44 @@ ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p)
goto close_scan; goto close_scan;
} }
} }
else if (multi_range_sorted) else if (m_multi_cursor && multi_range_sorted)
{
if (m_active_cursor && (res= fetch_next(m_multi_cursor)))
{ {
DBUG_ASSERT(m_multi_cursor); DBUG_MULTI_RANGE(3);
if (m_active_cursor && (res= fetch_next()))
goto close_scan; goto close_scan;
}
range_no= m_multi_cursor->get_range_no(); range_no= m_multi_cursor->get_range_no();
if (range_no == multi_range_curr) if (range_no == multi_range_curr)
{ {
DBUG_MULTI_RANGE(4);
// return current row // return current row
goto found; goto found;
} }
else if (range_no > multi_range_curr) else if (range_no > (int)multi_range_curr)
{ {
DBUG_MULTI_RANGE(5);
// wait with current row // wait with current row
m_active_cursor= 0; m_active_cursor= 0;
continue; continue;
} }
else else
{ {
DBUG_MULTI_RANGE(6);
// First fetch from cursor // First fetch from cursor
DBUG_ASSERT(range_no == -1); DBUG_ASSERT(range_no == -1);
m_multi_cursor->nextResult(true); if((res= m_multi_cursor->nextResult(true)))
{
goto close_scan;
}
multi_range_curr--; // Will be increased in for-loop multi_range_curr--; // Will be increased in for-loop
continue; continue;
} }
} }
else /** m_active_cursor == 0 */ else /** m_multi_cursor == 0 */
{ {
DBUG_MULTI_RANGE(7);
/** /**
* Corresponds to range 5 in example in read_multi_range_first * Corresponds to range 5 in example in read_multi_range_first
*/ */
...@@ -5002,10 +5043,9 @@ ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p) ...@@ -5002,10 +5043,9 @@ ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p)
close_scan: close_scan:
if (res == 1) if (res == 1)
{ {
m_multi_range_result_ptr += reclength; m_multi_cursor->close();
m_active_cursor->close();
m_active_cursor= m_multi_cursor= 0; m_active_cursor= m_multi_cursor= 0;
DBUG_MULTI_RANGE(8);
continue; continue;
} }
else else
...@@ -5076,7 +5116,7 @@ ha_ndbcluster::setup_recattr(const NdbRecAttr* curr) ...@@ -5076,7 +5116,7 @@ ha_ndbcluster::setup_recattr(const NdbRecAttr* curr)
} }
} }
return 0; DBUG_RETURN(0);
} }
#endif /* HAVE_NDBCLUSTER_DB */ #endif /* HAVE_NDBCLUSTER_DB */
...@@ -31,7 +31,6 @@ class Ndb; // Forward declaration ...@@ -31,7 +31,6 @@ class Ndb; // Forward declaration
class NdbOperation; // Forward declaration class NdbOperation; // Forward declaration
class NdbConnection; // Forward declaration class NdbConnection; // Forward declaration
class NdbRecAttr; // Forward declaration class NdbRecAttr; // Forward declaration
class NdbResultSet; // Forward declaration
class NdbScanOperation; class NdbScanOperation;
class NdbIndexScanOperation; class NdbIndexScanOperation;
class NdbBlob; class NdbBlob;
...@@ -179,6 +178,7 @@ class ha_ndbcluster: public handler ...@@ -179,6 +178,7 @@ class ha_ndbcluster: public handler
const key_range *end_key, const key_range *end_key,
bool sorted, byte* buf); bool sorted, byte* buf);
int full_table_scan(byte * buf); int full_table_scan(byte * buf);
int fetch_next(NdbScanOperation* op);
int next_result(byte *buf); int next_result(byte *buf);
int define_read_attrs(byte* buf, NdbOperation* op); int define_read_attrs(byte* buf, NdbOperation* op);
int filtered_scan(const byte *key, uint key_len, int filtered_scan(const byte *key, uint key_len,
...@@ -218,7 +218,7 @@ class ha_ndbcluster: public handler ...@@ -218,7 +218,7 @@ class ha_ndbcluster: public handler
int check_ndb_connection(); int check_ndb_connection();
NdbConnection *m_active_trans; NdbConnection *m_active_trans;
NdbResultSet *m_active_cursor; NdbScanOperation *m_active_cursor;
Ndb *m_ndb; Ndb *m_ndb;
void *m_table; void *m_table;
void *m_table_info; void *m_table_info;
...@@ -256,9 +256,11 @@ class ha_ndbcluster: public handler ...@@ -256,9 +256,11 @@ class ha_ndbcluster: public handler
bool m_use_local_query_cache; bool m_use_local_query_cache;
bool m_disable_multi_read; bool m_disable_multi_read;
byte* m_multi_range_result_ptr; byte *m_multi_range_result_ptr;
uint m_multi_range_defined_count; uint m_multi_range_defined_count;
const NdbOperation* m_current_multi_operation; const NdbOperation *m_current_multi_operation;
NdbIndexScanOperation *m_multi_cursor;
byte *m_multi_range_cursor_result_ptr;
int setup_recattr(const NdbRecAttr*); int setup_recattr(const NdbRecAttr*);
void set_rec_per_key(); void set_rec_per_key();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment