Commit 1c75becb authored by joreland@mysql.com's avatar joreland@mysql.com

wl#2126 - read_multi_range ndb part

parent 2a44929c
......@@ -430,6 +430,10 @@ public:
*/
const NdbOperation * getNextCompletedOperation(const NdbOperation * op)const;
const NdbOperation* getFirstDefinedOperation()const{return theFirstOpInList;}
const NdbOperation* getLastDefinedOperation()const{return theLastOpInList;}
/** @} *********************************************************************/
/**
......
......@@ -718,6 +718,7 @@ public:
};
LockMode getLockMode() const { return theLockMode; }
void setAbortOption(Int8 ao) { m_abortOption = ao; }
/**
* Set/get distribution/partition key
......@@ -746,10 +747,13 @@ protected:
void initInterpreter();
void next(NdbOperation*); // Set next pointer
NdbOperation* next(); // Get next pointer
public:
const NdbOperation* next() const;
protected:
enum OperationStatus{
enum OperationStatus
{
Init,
OperationDefined,
TupleKeyDefined,
......@@ -995,6 +999,12 @@ NdbOperation::next()
return theNext;
}
inline
const NdbOperation*
NdbOperation::next() const
{
return theNext;
}
/******************************************************************************
OperationStatus Status();
......
......@@ -232,21 +232,6 @@ Remark: Handle time-out on a transaction object.
void
NdbConnection::handleExecuteCompletion()
{
if (theCompletionStatus == CompletedFailure) {
NdbOperation* tOpTemp = theFirstExecOpInList;
while (tOpTemp != NULL) {
/*****************************************************************************
* Ensure that all executing operations report failed for each
* read attribute when failure occurs.
* We do not want any operations to report both failure and
* success on different read attributes.
****************************************************************************/
tOpTemp->handleFailedAI_ElemLen();
tOpTemp = tOpTemp->next();
}//while
theReturnStatus = ReturnFailure;
}//if
/***************************************************************************
* Move the NdbOperation objects from the list of executing
* operations to list of completed
......@@ -1512,6 +1497,7 @@ transactions.
/**********************************************************************/
theCompletionStatus = CompletedFailure;
theCommitStatus = Aborted;
theReturnStatus = ReturnFailure;
return 0;
} else {
#ifdef NDB_NO_DROPPED_SIGNAL
......
......@@ -543,10 +543,11 @@ NdbOperation::receiveTCKEYREF( NdbApiSignal* aSignal)
theStatus = Finished;
// blobs want this
if (m_abortOption != IgnoreError)
{
theNdbCon->theReturnStatus = NdbConnection::ReturnFailure;
}
theError.code = aSignal->readData(4);
theNdbCon->setOperationErrorCodeAbort(aSignal->readData(4), m_abortOption);
theNdbCon->setOperationErrorCodeAbort(aSignal->readData(4), ao);
if(theOperationType != ReadRequest || !theSimpleIndicator) // not simple read
return theNdbCon->OpCompleteFailure(ao, m_abortOption != IgnoreError);
......
......@@ -55,8 +55,9 @@ static const char *ha_ndb_ext=".ndb";
#define ERR_RETURN(err) \
{ \
ERR_PRINT(err); \
DBUG_RETURN(ndb_to_mysql_error(&err)); \
const NdbError& tmp= err; \
ERR_PRINT(tmp); \
DBUG_RETURN(ndb_to_mysql_error(&tmp)); \
}
// Typedefs for long names
......@@ -980,13 +981,14 @@ int ha_ndbcluster::set_primary_key(NdbOperation *op)
int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
{
int res;
DBUG_ENTER("pk_read");
DBUG_PRINT("enter", ("key_len: %u", key_len));
DBUG_DUMP("key", (char*)key, key_len);
uint no_fields= table->fields, i;
NdbConnection *trans= m_active_trans;
NdbOperation *op;
THD *thd= current_thd;
DBUG_ENTER("pk_read");
DBUG_PRINT("enter", ("key_len: %u", key_len));
DBUG_DUMP("key", (char*)key, key_len);
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
......@@ -1008,27 +1010,12 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
}
else
{
int res;
if ((res= set_primary_key(op, key)))
return res;
}
// Read all wanted non-key field(s) unless HA_EXTRA_RETRIEVE_ALL_COLS
for (i= 0; i < no_fields; i++)
{
Field *field= table->field[i];
if ((thd->query_id == field->query_id) ||
m_retrieve_all_fields)
{
if (get_ndb_value(op, field, i, buf))
ERR_RETURN(trans->getNdbError());
}
else
{
// Attribute was not to be read
m_value[i].ptr= NULL;
}
}
if((res= define_read_attrs(buf, op)))
DBUG_RETURN(res);
if (execute_no_commit_ie(this,trans) != 0)
{
......@@ -1042,7 +1029,6 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
DBUG_RETURN(0);
}
/*
Read one complementing record from NDB using primary key from old_data
*/
......@@ -1101,6 +1087,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
int ha_ndbcluster::unique_index_read(const byte *key,
uint key_len, byte *buf)
{
int res;
NdbConnection *trans= m_active_trans;
NdbIndexOperation *op;
THD *thd= current_thd;
......@@ -1133,22 +1120,8 @@ int ha_ndbcluster::unique_index_read(const byte *key,
key_ptr+= key_part->length;
}
// Get non-index attribute(s)
for (i= 0; i < table->fields; i++)
{
Field *field= table->field[i];
if ((thd->query_id == field->query_id) ||
(field->flags & PRI_KEY_FLAG))
{
if (get_ndb_value(op, field, i, buf))
ERR_RETURN(op->getNdbError());
}
else
{
// Attribute was not to be read
m_value[i].ptr= NULL;
}
}
if((res= define_read_attrs(buf, op)))
DBUG_RETURN(res);
if (execute_no_commit_ie(this,trans) != 0)
{
......@@ -1446,11 +1419,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
if (get_ndb_value(op, NULL, hidden_no, NULL))
ERR_RETURN(op->getNdbError());
}
if (execute_no_commit(this,trans) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_PRINT("exit", ("Scan started successfully"));
DBUG_RETURN(next_result(buf));
DBUG_RETURN(0);
}
/*
......@@ -1461,6 +1430,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
const key_range *end_key,
bool sorted, byte* buf)
{
int res;
bool restart;
NdbConnection *trans= m_active_trans;
NdbResultSet *cursor;
......@@ -1497,22 +1467,20 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
{
const key_range *keys[2]= { start_key, end_key };
int ret= set_bounds(op, keys);
if (ret)
DBUG_RETURN(ret);
res= set_bounds(op, keys);
if (res)
DBUG_RETURN(res);
}
if (!restart)
if (!restart && (res= define_read_attrs(buf, op)))
{
DBUG_RETURN(define_read_attrs(buf, op));
DBUG_RETURN(res);
}
else
{
if (execute_no_commit(this,trans) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_RETURN(next_result(buf));
}
}
/*
......@@ -1533,6 +1501,7 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
byte *buf,
enum ha_rkey_function find_flag)
{
int res;
NdbConnection *trans= m_active_trans;
NdbResultSet *cursor;
NdbScanOperation *op;
......@@ -1596,9 +1565,14 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
sf.end();
}
DBUG_RETURN(define_read_attrs(buf, op));
}
if((res= define_read_attrs(buf, op)))
DBUG_RETURN(res);
if (execute_no_commit(this,trans) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_PRINT("exit", ("Scan started successfully"));
DBUG_RETURN(next_result(buf));
}
/*
Start full table scan in NDB
......@@ -1607,6 +1581,7 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
int ha_ndbcluster::full_table_scan(byte *buf)
{
uint i;
int res;
NdbResultSet *cursor;
NdbScanOperation *op;
NdbConnection *trans= m_active_trans;
......@@ -1620,7 +1595,14 @@ int ha_ndbcluster::full_table_scan(byte *buf)
!(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
DBUG_RETURN(define_read_attrs(buf, op));
if((res= define_read_attrs(buf, op)))
DBUG_RETURN(res);
if (execute_no_commit(this,trans) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_PRINT("exit", ("Scan started successfully"));
DBUG_RETURN(next_result(buf));
}
/*
......@@ -4554,4 +4536,171 @@ int ha_ndbcluster::write_ndb_file()
DBUG_RETURN(error);
}
int
ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
key_multi_range *ranges,
uint range_count,
bool sorted,
handler_buffer *buffer)
{
DBUG_ENTER("ha_ndbcluster::read_multi_range_first");
int res;
uint i;
KEY* key_info= table->key_info + active_index;
NDB_INDEX_TYPE index_type = get_index_type(active_index);
ulong reclength = table->reclength;
NdbOperation* op;
switch(index_type){
case UNIQUE_INDEX:
case PRIMARY_KEY_INDEX:
break;
case PRIMARY_KEY_ORDERED_INDEX:
case UNIQUE_ORDERED_INDEX:
/**
* Currently not supported on ordered indexes
*/
for(i= 0; i<range_count; i++)
{
if (ranges[i].start_key.length == key_info->key_length &&
ranges[i].start_key.flag == HA_READ_KEY_EXACT)
continue;
/**
* Mark that we using hander:: implementation
*/
m_disable_multi_read= true;
return handler::read_multi_range_first(found_range_p,
ranges,
range_count,
sorted,
buffer);
}
break;
default:
case ORDERED_INDEX:
m_disable_multi_read= true;
return handler::read_multi_range_first(found_range_p,
ranges,
range_count,
sorted,
buffer);
}
m_disable_multi_read= false;
multi_ranges= ranges;
multi_range_count= range_count;
multi_range_sorted= sorted;
multi_range_buffer= buffer;
multi_range_found_p= found_range_p;
byte* curr = buffer->buffer;
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
const NDBTAB *tab= (const NDBTAB *) m_table;
const NDBINDEX *unique_idx= (NDBINDEX *) m_index[active_index].unique_index;
const NdbOperation* lastOp = m_active_trans->getLastDefinedOperation();
switch(index_type){
case PRIMARY_KEY_INDEX:
case PRIMARY_KEY_ORDERED_INDEX:
for(i= 0; i<range_count && curr+reclength <= buffer->buffer_end; i++)
{
if ((op= m_active_trans->getNdbOperation(tab)) &&
!op->readTuple(lm) &&
!set_primary_key(op, ranges[i].start_key.key) &&
!define_read_attrs(curr, op) &&
(op->setAbortOption(IgnoreError), true))
curr += reclength;
else
ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
}
break;
case UNIQUE_INDEX:
case UNIQUE_ORDERED_INDEX:
for(i= 0; i<range_count && curr+reclength <= buffer->buffer_end; i++)
{
if ((op= m_active_trans->getNdbIndexOperation(unique_idx, tab)) &&
!op->readTuple(lm) &&
!set_primary_key(op, ranges[i].start_key.key) &&
!define_read_attrs(curr, op) &&
(op->setAbortOption(IgnoreError), true))
curr += reclength;
else
ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
}
}
if(i != range_count)
{
buffer->end_of_used_area= buffer->buffer_end;
}
else
{
buffer->end_of_used_area= curr;
}
/**
* Set first operation in multi range
*/
m_current_multi_operation=
lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation();
if(!(res= execute_no_commit_ie(this, m_active_trans)))
{
multi_range_curr= 0;
m_multi_range_result_ptr= buffer->buffer;
return read_multi_range_next();
}
ERR_RETURN(m_active_trans->getNdbError());
}
int
ha_ndbcluster::read_multi_range_next()
{
DBUG_ENTER("ha_ndbcluster::read_multi_range_next");
if(m_disable_multi_read)
DBUG_RETURN(handler::read_multi_range_next());
ulong reclength = table->reclength;
const NdbOperation* op = m_current_multi_operation;
while(multi_range_curr < multi_range_count && op && op->getNdbError().code)
{
multi_range_curr++;
op = m_active_trans->getNextCompletedOperation(op);
m_multi_range_result_ptr += reclength;
}
if(multi_range_curr < multi_range_count && op)
{
* multi_range_found_p= multi_ranges + multi_range_curr;
memcpy(table->record[0], m_multi_range_result_ptr, reclength);
unpack_record(table->record[0]);
table->status= 0;
/**
* Move to next
*/
multi_range_curr++;
m_current_multi_operation = m_active_trans->getNextCompletedOperation(op);
m_multi_range_result_ptr += reclength;
DBUG_RETURN(0);
}
if(multi_range_curr == multi_range_count)
{
DBUG_RETURN(HA_ERR_END_OF_FILE);
}
/**
* Read remaining ranges
*/
uint left = multi_range_count - multi_range_curr;
DBUG_RETURN(read_multi_range_first(multi_range_found_p,
multi_ranges + multi_range_curr,
left,
multi_range_sorted,
multi_range_buffer));
}
#endif /* HAVE_NDBCLUSTER_DB */
......@@ -111,6 +111,14 @@ class ha_ndbcluster: public handler
byte* buf);
int read_range_next();
/**
* Multi range stuff
*/
int read_multi_range_first(key_multi_range **found_range_p,
key_multi_range *ranges, uint range_count,
bool sorted, handler_buffer *buffer);
int read_multi_range_next(void);
bool get_error_message(int error, String *buf);
void info(uint);
int extra(enum ha_extra_function operation);
......@@ -257,6 +265,10 @@ class ha_ndbcluster: public handler
uint32 m_blobs_buffer_size;
uint m_dupkey;
bool m_disable_multi_read;
byte* m_multi_range_result_ptr;
const NdbOperation* m_current_multi_operation;
void set_rec_per_key();
void records_update();
void no_uncommitted_rows_execute_failure();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment