Commit 58dc80f1 authored by unknown's avatar unknown

Fixes made after running with sql-bench


sql/ha_ndbcluster.cc:
  Added new function set_primary_key_from_old_data, used from update_row to specify the primary key of the record to update.
  Simplified index_read function, to use read_range_first
  Close the active cursor if read_range_first is called on an open cursor.
  In read_range_first, map error code from HA_ERR_KEY_NOT_FOUND to HA_ERR_END_OF_FILE.
  Remove call to function waitUntilReady from check_ndb_connection, this shuold remove the delay from transactions executed when node(s) are down.
sql/ha_ndbcluster.h:
  Added new function set_primary_key_from_old_data
parent 73fd805b
...@@ -558,6 +558,24 @@ int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key) ...@@ -558,6 +558,24 @@ int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key)
} }
int ha_ndbcluster::set_primary_key_from_old_data(NdbOperation *op, const byte *old_data)
{
KEY* key_info= table->key_info + table->primary_key;
KEY_PART_INFO* key_part= key_info->key_part;
KEY_PART_INFO* end= key_part+key_info->key_parts;
DBUG_ENTER("set_primary_key_from_old_data");
for (; key_part != end; key_part++)
{
Field* field= key_part->field;
if (set_ndb_key(op, field,
key_part->fieldnr-1, old_data+key_part->offset))
ERR_RETURN(op->getNdbError());
}
DBUG_RETURN(0);
}
int ha_ndbcluster::set_primary_key(NdbOperation *op) int ha_ndbcluster::set_primary_key(NdbOperation *op)
{ {
DBUG_ENTER("set_primary_key"); DBUG_ENTER("set_primary_key");
...@@ -756,7 +774,7 @@ inline int ha_ndbcluster::next_result(byte *buf) ...@@ -756,7 +774,7 @@ inline int ha_ndbcluster::next_result(byte *buf)
be sent to NDB be sent to NDB
*/ */
DBUG_PRINT("info", ("ops_pending: %d", ops_pending)); DBUG_PRINT("info", ("ops_pending: %d", ops_pending));
if (ops_pending && trans->execute(NoCommit) != 0) if (ops_pending && (trans->execute(NoCommit) != 0))
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
ops_pending= 0; ops_pending= 0;
...@@ -1201,7 +1219,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) ...@@ -1201,7 +1219,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
else else
{ {
int res; int res;
if ((res= set_primary_key(op, old_data + table->null_bytes))) if ((res= set_primary_key_from_old_data(op, old_data)))
DBUG_RETURN(res); DBUG_RETURN(res);
} }
} }
...@@ -1498,59 +1516,12 @@ int ha_ndbcluster::index_read(byte *buf, ...@@ -1498,59 +1516,12 @@ int ha_ndbcluster::index_read(byte *buf,
DBUG_ENTER("index_read"); DBUG_ENTER("index_read");
DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d", DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d",
active_index, key_len, find_flag)); active_index, key_len, find_flag));
KEY* key_info;
int error= 1;
statistic_increment(ha_read_key_count, &LOCK_status);
if (m_active_cursor) key_range start_key;
close_scan(); start_key.key= key;
start_key.length= key_len;
switch (get_index_type(active_index)){ start_key.flag= find_flag;
case PRIMARY_KEY_INDEX: DBUG_RETURN(read_range_first(&start_key, NULL, true));
key_info= table->key_info + active_index;
if (key_len == key_info->key_length &&
find_flag == HA_READ_KEY_EXACT)
error= pk_read(key, key_len, buf);
else
{
key_range start_key;
start_key.key= key;
start_key.length= key_len;
start_key.flag= find_flag;
error= ordered_index_scan(&start_key, 0, false, buf);
}
break;
case UNIQUE_INDEX:
key_info= table->key_info + active_index;
if (key_len == key_info->key_length &&
find_flag == HA_READ_KEY_EXACT)
error= unique_index_read(key, key_len, buf);
else
{
key_range start_key;
start_key.key= key;
start_key.length= key_len;
start_key.flag= find_flag;
error= ordered_index_scan(&start_key, 0, false, buf);
break;
}
break;
case ORDERED_INDEX:
key_range start_key;
start_key.key= key;
start_key.length= key_len;
start_key.flag= find_flag;
error= ordered_index_scan(&start_key, 0, false, buf);
break;
default:
case UNDEFINED_INDEX:
break;
}
DBUG_RETURN(error);
} }
...@@ -1610,28 +1581,39 @@ int ha_ndbcluster::read_range_first(const key_range *start_key, ...@@ -1610,28 +1581,39 @@ int ha_ndbcluster::read_range_first(const key_range *start_key,
DBUG_ENTER("ha_ndbcluster::read_range_first"); DBUG_ENTER("ha_ndbcluster::read_range_first");
DBUG_PRINT("info", ("sorted: %d", sorted)); DBUG_PRINT("info", ("sorted: %d", sorted));
if (m_active_cursor)
close_scan();
switch (get_index_type(active_index)){ switch (get_index_type(active_index)){
case PRIMARY_KEY_INDEX: case PRIMARY_KEY_INDEX:
key_info= table->key_info + active_index; key_info= table->key_info + active_index;
if (start_key && if (start_key &&
start_key->length == key_info->key_length && start_key->length == key_info->key_length &&
start_key->flag == HA_READ_KEY_EXACT) start_key->flag == HA_READ_KEY_EXACT)
DBUG_RETURN(pk_read(start_key->key, start_key->length, buf)); {
error= pk_read(start_key->key, start_key->length, buf);
DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error);
}
break; break;
case UNIQUE_INDEX: case UNIQUE_INDEX:
key_info= table->key_info + active_index; key_info= table->key_info + active_index;
if (start_key && if (start_key &&
start_key->length == key_info->key_length && start_key->length == key_info->key_length &&
start_key->flag == HA_READ_KEY_EXACT) start_key->flag == HA_READ_KEY_EXACT)
DBUG_RETURN(unique_index_read(start_key->key, start_key->length, buf)); {
error= unique_index_read(start_key->key, start_key->length, buf);
DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error);
}
break; break;
default: default:
break; break;
} }
// Start the ordered index scan and fetch the first row // Start the ordered index scan and fetch the first row
error= ordered_index_scan(start_key, end_key, sorted, error= ordered_index_scan(start_key, end_key, sorted,
buf); buf);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -2824,11 +2806,6 @@ int ha_ndbcluster::check_ndb_connection() ...@@ -2824,11 +2806,6 @@ int ha_ndbcluster::check_ndb_connection()
} }
m_ndb= (Ndb*)thd->transaction.ndb; m_ndb= (Ndb*)thd->transaction.ndb;
m_ndb->setDatabaseName(m_dbname); m_ndb->setDatabaseName(m_dbname);
if (m_ndb->waitUntilReady() != 0)
{
DBUG_PRINT("error", ("Ndb was not ready"));
DBUG_RETURN(3);
}
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -3118,6 +3095,7 @@ ha_ndbcluster::records_in_range(int inx, ...@@ -3118,6 +3095,7 @@ ha_ndbcluster::records_in_range(int inx,
records= 1; records= 1;
} }
#endif #endif
DBUG_PRINT("exit", ("records: %d", records));
DBUG_RETURN(records); DBUG_RETURN(records);
} }
......
...@@ -181,6 +181,7 @@ class ha_ndbcluster: public handler ...@@ -181,6 +181,7 @@ class ha_ndbcluster: public handler
int get_ndb_value(NdbOperation*, uint fieldnr, byte *field_ptr); int get_ndb_value(NdbOperation*, uint fieldnr, byte *field_ptr);
int set_primary_key(NdbOperation *op, const byte *key); int set_primary_key(NdbOperation *op, const byte *key);
int set_primary_key(NdbOperation *op); int set_primary_key(NdbOperation *op);
int set_primary_key_from_old_data(NdbOperation *op, const byte *old_data);
int set_bounds(NdbOperation *ndb_op, const key_range *key, int set_bounds(NdbOperation *ndb_op, const key_range *key,
int bound); int bound);
int key_cmp(uint keynr, const byte * old_row, const byte * new_row); int key_cmp(uint keynr, const byte * old_row, const byte * new_row);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment