Commit d99c91b6 authored by joreland@mysql.com's avatar joreland@mysql.com

Fix handler w.r.t scan

parent eb863023
......@@ -25,8 +25,6 @@
#include "NdbIndexOperation.hpp"
#include "NdbIndexScanOperation.hpp"
#include "NdbScanFilter.hpp"
#include "NdbSchemaCon.hpp"
#include "NdbSchemaOp.hpp"
#include "NdbRecAttr.hpp"
#include "NdbResultSet.hpp"
#include "NdbDictionary.hpp"
......
......@@ -220,5 +220,4 @@ NdbOut& operator<<(NdbOut& ndbout, const NdbRecAttr &r)
}
return ndbout;
>>>>>>>
}
......@@ -30,8 +30,7 @@ static int
select_count(Ndb* pNdb, const NdbDictionary::Table* pTab,
int parallelism,
int* count_rows,
UtilTransactions::ScanLock lock,
NdbConnection* pBuddyTrans=0);
UtilTransactions::ScanLock lock);
int main(int argc, const char** argv){
const char* _dbname = "TEST_DB";
......@@ -95,14 +94,13 @@ int
select_count(Ndb* pNdb, const NdbDictionary::Table* pTab,
int parallelism,
int* count_rows,
UtilTransactions::ScanLock lock,
NdbConnection* pBuddyTrans){
UtilTransactions::ScanLock lock){
int retryAttempt = 0;
const int retryMax = 100;
int check;
NdbConnection *pTrans;
NdbOperation *pOp;
NdbScanOperation *pOp;
while (true){
......@@ -112,7 +110,7 @@ select_count(Ndb* pNdb, const NdbDictionary::Table* pTab,
return NDBT_FAILED;
}
pTrans = pNdb->hupp(pBuddyTrans);
pTrans = pNdb->startTransaction();
if (pTrans == NULL) {
const NdbError err = pNdb->getNdbError();
......@@ -124,26 +122,27 @@ select_count(Ndb* pNdb, const NdbDictionary::Table* pTab,
ERR(err);
return NDBT_FAILED;
}
pOp = pTrans->getNdbOperation(pTab->getName());
pOp = pTrans->getNdbScanOperation(pTab->getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
NdbResultSet * rs;
switch(lock){
case UtilTransactions::SL_ReadHold:
check = pOp->openScanReadHoldLock(parallelism);
rs = pOp->readTuples(NdbScanOperation::LM_Read, 0, parallelism);
break;
case UtilTransactions::SL_Exclusive:
check = pOp->openScanExclusive(parallelism);
rs = pOp->readTuples(NdbScanOperation::LM_Exclusive, 0, parallelism);
break;
case UtilTransactions::SL_Read:
default:
check = pOp->openScanRead(parallelism);
rs = pOp->readTuples(NdbScanOperation::LM_Dirty, 0, parallelism);
}
if( check == -1 ) {
if( rs == 0 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
......@@ -156,7 +155,7 @@ select_count(Ndb* pNdb, const NdbDictionary::Table* pTab,
return NDBT_FAILED;
}
check = pTrans->executeScan();
check = pTrans->execute(NoCommit);
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
......@@ -165,11 +164,11 @@ select_count(Ndb* pNdb, const NdbDictionary::Table* pTab,
int eof;
int rows = 0;
eof = pTrans->nextScanResult();
eof = rs->nextResult();
while(eof == 0){
rows++;
eof = pTrans->nextScanResult();
eof = rs->nextResult();
}
if (eof == -1) {
const NdbError err = pTrans->getNdbError();
......
......@@ -462,10 +462,10 @@ void ha_ndbcluster::release_metadata()
DBUG_VOID_RETURN;
}
NdbCursorOperation::LockMode get_ndb_lock_type(enum thr_lock_type type)
NdbScanOperation::LockMode get_ndb_lock_type(enum thr_lock_type type)
{
return (type == TL_WRITE_ALLOW_WRITE) ?
NdbCursorOperation::LM_Exclusive : NdbCursorOperation::LM_Read;
NdbScanOperation::LM_Exclusive : NdbScanOperation::LM_Read;
}
static const ulong index_type_flags[]=
......@@ -795,7 +795,7 @@ inline int ha_ndbcluster::next_result(byte *buf)
Set bounds for a ordered index scan, use key_range
*/
int ha_ndbcluster::set_bounds(NdbOperation *op,
int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
const key_range *key,
int bound)
{
......@@ -843,7 +843,7 @@ int ha_ndbcluster::set_bounds(NdbOperation *op,
so if this bound was not EQ, bail out and make
a best effort attempt
*/
if (bound != NdbOperation::BoundEQ)
if (bound != NdbIndexScanOperation::BoundEQ)
break;
}
......@@ -861,7 +861,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
{
NdbConnection *trans= m_active_trans;
NdbResultSet *cursor;
NdbScanOperation *op;
NdbIndexScanOperation *op;
const char *index_name;
DBUG_ENTER("ordered_index_scan");
......@@ -869,19 +869,19 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
index_name= get_index_name(active_index);
if (!(op= trans->getNdbScanOperation(index_name, m_tabname)))
if (!(op= trans->getNdbIndexScanOperation(index_name, m_tabname)))
ERR_RETURN(trans->getNdbError());
if (!(cursor= op->readTuples(parallelism, get_ndb_lock_type(m_lock.type))))
if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
if (start_key &&
set_bounds(op, start_key,
(start_key->flag == HA_READ_KEY_EXACT) ?
NdbOperation::BoundEQ :
NdbIndexScanOperation::BoundEQ :
(start_key->flag == HA_READ_AFTER_KEY) ?
NdbOperation::BoundLT :
NdbOperation::BoundLE))
NdbIndexScanOperation::BoundLT :
NdbIndexScanOperation::BoundLE))
DBUG_RETURN(1);
if (end_key)
......@@ -892,8 +892,8 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
}
else if (set_bounds(op, end_key,
(end_key->flag == HA_READ_AFTER_KEY) ?
NdbOperation::BoundGE :
NdbOperation::BoundGT))
NdbIndexScanOperation::BoundGE :
NdbIndexScanOperation::BoundGT))
DBUG_RETURN(1);
}
DBUG_RETURN(define_read_attrs(buf, op));
......@@ -931,10 +931,10 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
if (!(op= trans->getNdbScanOperation(m_tabname)))
ERR_RETURN(trans->getNdbError());
if (!(cursor= op->readTuples(parallelism, get_ndb_lock_type(m_lock.type))))
if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
{
// Start scan filter
NdbScanFilter sf(op);
......@@ -1000,7 +1000,7 @@ int ha_ndbcluster::full_table_scan(byte *buf)
if (!(op=trans->getNdbScanOperation(m_tabname)))
ERR_RETURN(trans->getNdbError());
if (!(cursor= op->readTuples(parallelism, get_ndb_lock_type(m_lock.type))))
if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
DBUG_RETURN(define_read_attrs(buf, op));
......
......@@ -33,6 +33,8 @@ class NdbOperation; // Forward declaration
class NdbConnection; // Forward declaration
class NdbRecAttr; // Forward declaration
class NdbResultSet; // Forward declaration
class NdbScanOperation;
class NdbIndexScanOperation;
typedef enum ndb_index_type {
UNDEFINED_INDEX = 0,
......@@ -182,7 +184,7 @@ class ha_ndbcluster: public handler
int set_primary_key(NdbOperation *op, const byte *key);
int set_primary_key(NdbOperation *op);
int set_primary_key_from_old_data(NdbOperation *op, const byte *old_data);
int set_bounds(NdbOperation *ndb_op, const key_range *key,
int set_bounds(NdbIndexScanOperation *ndb_op, const key_range *key,
int bound);
int key_cmp(uint keynr, const byte * old_row, const byte * new_row);
void print_results();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment