Commit 34b97fc3 authored by unknown's avatar unknown

wl2126 - ndb - bug fixes


ndb/include/Makefile.am:
  remove result set
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Fix so that noOfFiredTriggers aren't overwritten during scans
sql/ha_ndbcluster.cc:
  1) Fix bugs in completed read_multi_range
  2) Change buffer handling so range scan are always put last in buffer
     This to support restart of scan
sql/ha_ndbcluster.h:
  1) Fix bugs in completed read_multi_range
  2) Change buffer handling so range scan are always put last in buffer
     This to support restart of scan
parent 2cae258f
......@@ -21,7 +21,6 @@ ndbapi/NdbBlob.hpp \
ndbapi/NdbPool.hpp \
ndbapi/NdbRecAttr.hpp \
ndbapi/NdbReceiver.hpp \
ndbapi/NdbResultSet.hpp \
ndbapi/NdbScanFilter.hpp \
ndbapi/NdbScanOperation.hpp \
ndbapi/NdbIndexScanOperation.hpp \
......
......@@ -2647,11 +2647,11 @@ void Dblqh::execTUPKEYCONF(Signal* signal)
jamEntry();
tcConnectptr.i = tcIndex;
ptrCheckGuard(tcConnectptr, ttcConnectrecFileSize, regTcConnectionrec);
if (tcConnectptr.p->seqNoReplica == 0) // Primary replica
tcConnectptr.p->noFiredTriggers = tupKeyConf->noFiredTriggers;
switch (tcConnectptr.p->transactionState) {
case TcConnectionrec::WAIT_TUP:
jam();
if (tcConnectptr.p->seqNoReplica == 0) // Primary replica
tcConnectptr.p->noFiredTriggers = tupKeyConf->noFiredTriggers;
tupkeyConfLab(signal);
break;
case TcConnectionrec::COPY_TUPKEY:
......
......@@ -1162,22 +1162,22 @@ int ha_ndbcluster::peek_row()
NdbOperation *op;
THD *thd= current_thd;
DBUG_ENTER("peek_row");
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
op->readTuple(lm) != 0)
ERR_RETURN(trans->getNdbError());
int res;
if ((res= set_primary_key(op)))
ERR_RETURN(trans->getNdbError());
if (execute_no_commit_ie(this,trans) != 0)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
}
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
}
DBUG_RETURN(0);
}
......@@ -1221,12 +1221,11 @@ int ha_ndbcluster::unique_index_read(const byte *key,
DBUG_RETURN(0);
}
inline int ha_ndbcluster::fetch_next()
inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
{
DBUG_ENTER("fetch_next");
int check;
NdbConnection *trans= m_active_trans;
NdbScanOperation *cursor= m_active_cursor;
bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE;
do {
......@@ -1262,12 +1261,12 @@ inline int ha_ndbcluster::fetch_next()
if (m_transaction_on)
{
if (execute_no_commit(this,trans) != 0)
return -1;
DBUG_RETURN(-1);
}
else
{
if (execute_commit(this,trans) != 0)
return -1;
DBUG_RETURN(-1);
int res= trans->restart();
DBUG_ASSERT(res == 0);
}
......@@ -1275,8 +1274,12 @@ inline int ha_ndbcluster::fetch_next()
}
contact_ndb= (check == 2);
}
else
{
DBUG_RETURN(-1);
}
} while (check == 2);
DBUG_RETURN(1);
}
......@@ -1296,7 +1299,7 @@ inline int ha_ndbcluster::next_result(byte *buf)
int res;
DBUG_ENTER("next_result");
if((res= fetch_next()) == 0)
if((res= fetch_next(m_active_cursor)) == 0)
{
DBUG_PRINT("info", ("One more record found"));
......@@ -2529,13 +2532,14 @@ int ha_ndbcluster::rnd_init(bool scan)
int ha_ndbcluster::close_scan()
{
NdbScanOperation *cursor= m_active_cursor;
NdbConnection *trans= m_active_trans;
DBUG_ENTER("close_scan");
if (!cursor)
m_multi_cursor= 0;
if (!m_active_cursor && !m_multi_cursor)
DBUG_RETURN(1);
NdbScanOperation *cursor= m_active_cursor ? m_active_cursor : m_multi_cursor;
if (m_ops_pending)
{
......@@ -2552,7 +2556,7 @@ int ha_ndbcluster::close_scan()
}
cursor->close(m_force_send);
m_active_cursor= NULL;
m_active_cursor= m_multi_cursor= NULL;
DBUG_RETURN(0);
}
......@@ -3132,6 +3136,10 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
DBUG_PRINT("warning", ("m_active_cursor != NULL"));
m_active_cursor= NULL;
if (m_multi_cursor)
DBUG_PRINT("warning", ("m_multi_cursor != NULL"));
m_multi_cursor= NULL;
if (m_blobs_pending)
DBUG_PRINT("warning", ("blobs_pending != 0"));
m_blobs_pending= 0;
......@@ -3776,6 +3784,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
handler(table_arg),
m_active_trans(NULL),
m_active_cursor(NULL),
m_multi_cursor(NULL),
m_ndb(NULL),
m_table(NULL),
m_table_info(NULL),
......@@ -4776,7 +4785,7 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
handler_buffer *buffer)
{
DBUG_ENTER("ha_ndbcluster::read_multi_range_first");
int res;
uint i;
KEY* key_info= table->key_info + active_index;
......@@ -4822,7 +4831,8 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
/**
* Variables for loop
*/
byte* curr= (byte*)buffer->buffer;
byte *curr= (byte*)buffer->buffer;
byte *end_of_buffer= (byte*)buffer->buffer_end;
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
const NDBTAB *tab= (const NDBTAB *) m_table;
......@@ -4830,7 +4840,7 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
const NDBINDEX *idx= (NDBINDEX *) m_index[active_index].index;
const NdbOperation* lastOp= m_active_trans->getLastDefinedOperation();
NdbIndexScanOperation* scanOp= 0;
for(i= 0; i<range_count && curr+reclength <= buffer->buffer_end; i++)
for(i= 0; i<range_count && curr+reclength <= end_of_buffer; i++)
{
switch(index_type){
case PRIMARY_KEY_INDEX:
......@@ -4879,15 +4889,29 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
ranges[i].range_flag &= ~(uint)UNIQUE_RANGE;
if (scanOp == 0)
{
if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab)) &&
!scanOp->readTuples(lm, 0, parallelism, sorted, true) &&
!define_read_attrs(curr, scanOp))
curr += reclength;
if (m_multi_cursor)
{
scanOp= m_multi_cursor;
DBUG_ASSERT(scanOp->getSorted() == sorted);
DBUG_ASSERT(scanOp->getLockMode() ==
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type));
if(scanOp->reset_bounds(m_force_send))
DBUG_RETURN(ndb_err(m_active_trans));
end_of_buffer -= reclength;
}
else if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab))
&& !scanOp->readTuples(lm, 0, parallelism, sorted, true) &&
!define_read_attrs(end_of_buffer-reclength, scanOp))
{
m_multi_cursor= scanOp;
m_multi_range_cursor_result_ptr= end_of_buffer-reclength;
}
else
{
ERR_RETURN(scanOp ? scanOp->getNdbError() :
m_active_trans->getNdbError());
m_multi_cursor= scanOp;
m_multi_range_cursor_result_ptr= curr;
}
}
const key_range *keys[2]= { &ranges[i].start_key, &ranges[i].end_key };
if ((res= set_bounds(scanOp, keys, i)))
......@@ -4926,6 +4950,12 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
ERR_RETURN(m_active_trans->getNdbError());
}
#if 0
#define DBUG_MULTI_RANGE(x) printf("read_multi_range_next: case %d\n", x);
#else
#define DBUG_MULTI_RANGE(x)
#endif
int
ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p)
{
......@@ -4952,8 +4982,10 @@ ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p)
}
else if (m_multi_cursor && !multi_range_sorted)
{
if ((res= fetch_next() == 0))
DBUG_MULTI_RANGE(1);
if ((res= fetch_next(m_multi_cursor)) == 0)
{
DBUG_MULTI_RANGE(2);
range_no= m_multi_cursor->get_range_no();
goto found;
}
......@@ -4962,35 +4994,44 @@ ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p)
goto close_scan;
}
}
else if (multi_range_sorted)
else if (m_multi_cursor && multi_range_sorted)
{
DBUG_ASSERT(m_multi_cursor);
if (m_active_cursor && (res= fetch_next()))
if (m_active_cursor && (res= fetch_next(m_multi_cursor)))
{
DBUG_MULTI_RANGE(3);
goto close_scan;
}
range_no= m_multi_cursor->get_range_no();
if (range_no == multi_range_curr)
{
DBUG_MULTI_RANGE(4);
// return current row
goto found;
}
else if (range_no > multi_range_curr)
else if (range_no > (int)multi_range_curr)
{
DBUG_MULTI_RANGE(5);
// wait with current row
m_active_cursor= 0;
continue;
}
else
{
DBUG_MULTI_RANGE(6);
// First fetch from cursor
DBUG_ASSERT(range_no == -1);
m_multi_cursor->nextResult(true);
if((res= m_multi_cursor->nextResult(true)))
{
goto close_scan;
}
multi_range_curr--; // Will be increased in for-loop
continue;
}
}
else /** m_active_cursor == 0 */
else /** m_multi_cursor == 0 */
{
DBUG_MULTI_RANGE(7);
/**
* Corresponds to range 5 in example in read_multi_range_first
*/
......@@ -5002,10 +5043,9 @@ ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p)
close_scan:
if (res == 1)
{
m_multi_range_result_ptr += reclength;
m_active_cursor->close();
m_multi_cursor->close();
m_active_cursor= m_multi_cursor= 0;
DBUG_MULTI_RANGE(8);
continue;
}
else
......@@ -5076,7 +5116,7 @@ ha_ndbcluster::setup_recattr(const NdbRecAttr* curr)
}
}
return 0;
DBUG_RETURN(0);
}
#endif /* HAVE_NDBCLUSTER_DB */
......@@ -31,7 +31,6 @@ class Ndb; // Forward declaration
class NdbOperation; // Forward declaration
class NdbConnection; // Forward declaration
class NdbRecAttr; // Forward declaration
class NdbResultSet; // Forward declaration
class NdbScanOperation;
class NdbIndexScanOperation;
class NdbBlob;
......@@ -179,6 +178,7 @@ class ha_ndbcluster: public handler
const key_range *end_key,
bool sorted, byte* buf);
int full_table_scan(byte * buf);
int fetch_next(NdbScanOperation* op);
int next_result(byte *buf);
int define_read_attrs(byte* buf, NdbOperation* op);
int filtered_scan(const byte *key, uint key_len,
......@@ -218,7 +218,7 @@ class ha_ndbcluster: public handler
int check_ndb_connection();
NdbConnection *m_active_trans;
NdbResultSet *m_active_cursor;
NdbScanOperation *m_active_cursor;
Ndb *m_ndb;
void *m_table;
void *m_table_info;
......@@ -256,11 +256,13 @@ class ha_ndbcluster: public handler
bool m_use_local_query_cache;
bool m_disable_multi_read;
byte* m_multi_range_result_ptr;
byte *m_multi_range_result_ptr;
uint m_multi_range_defined_count;
const NdbOperation* m_current_multi_operation;
const NdbOperation *m_current_multi_operation;
NdbIndexScanOperation *m_multi_cursor;
byte *m_multi_range_cursor_result_ptr;
int setup_recattr(const NdbRecAttr*);
void set_rec_per_key();
void records_update();
void no_uncommitted_rows_execute_failure();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment