Fix for bug #21059 Server crashes on join query with large dataset with NDB...

Fix for bug #21059  Server crashes on join query with large dataset with NDB tables: do not release operation records for on-going read_multi_range
parent 66e82554
...@@ -228,14 +228,15 @@ static int ndb_to_mysql_error(const NdbError *err) ...@@ -228,14 +228,15 @@ static int ndb_to_mysql_error(const NdbError *err)
inline inline
int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans) int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans,
bool force_release)
{ {
#ifdef NOT_USED #ifdef NOT_USED
int m_batch_execute= 0; int m_batch_execute= 0;
if (m_batch_execute) if (m_batch_execute)
return 0; return 0;
#endif #endif
h->release_completed_operations(trans); h->release_completed_operations(trans, force_release);
return trans->execute(NdbTransaction::NoCommit, return trans->execute(NdbTransaction::NoCommit,
NdbTransaction::AbortOnError, NdbTransaction::AbortOnError,
h->m_force_send); h->m_force_send);
...@@ -268,14 +269,15 @@ int execute_commit(THD *thd, NdbTransaction *trans) ...@@ -268,14 +269,15 @@ int execute_commit(THD *thd, NdbTransaction *trans)
} }
inline inline
int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans) int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans,
bool force_release)
{ {
#ifdef NOT_USED #ifdef NOT_USED
int m_batch_execute= 0; int m_batch_execute= 0;
if (m_batch_execute) if (m_batch_execute)
return 0; return 0;
#endif #endif
h->release_completed_operations(trans); h->release_completed_operations(trans, force_release);
return trans->execute(NdbTransaction::NoCommit, return trans->execute(NdbTransaction::NoCommit,
NdbTransaction::AO_IgnoreError, NdbTransaction::AO_IgnoreError,
h->m_force_send); h->m_force_send);
...@@ -292,6 +294,7 @@ Thd_ndb::Thd_ndb() ...@@ -292,6 +294,7 @@ Thd_ndb::Thd_ndb()
all= NULL; all= NULL;
stmt= NULL; stmt= NULL;
error= 0; error= 0;
query_state&= NDB_QUERY_NORMAL;
} }
Thd_ndb::~Thd_ndb() Thd_ndb::~Thd_ndb()
...@@ -1445,7 +1448,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) ...@@ -1445,7 +1448,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
if ((res= define_read_attrs(buf, op))) if ((res= define_read_attrs(buf, op)))
DBUG_RETURN(res); DBUG_RETURN(res);
if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans,false) != 0)
{ {
table->status= STATUS_NOT_FOUND; table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
...@@ -1492,7 +1495,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data) ...@@ -1492,7 +1495,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
} }
} }
if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0)
{ {
table->status= STATUS_NOT_FOUND; table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
...@@ -1632,7 +1635,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record) ...@@ -1632,7 +1635,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record)
} }
last= trans->getLastDefinedOperation(); last= trans->getLastDefinedOperation();
if (first) if (first)
res= execute_no_commit_ie(this,trans); res= execute_no_commit_ie(this,trans,false);
else else
{ {
// Table has no keys // Table has no keys
...@@ -1681,7 +1684,7 @@ int ha_ndbcluster::unique_index_read(const byte *key, ...@@ -1681,7 +1684,7 @@ int ha_ndbcluster::unique_index_read(const byte *key,
if ((res= define_read_attrs(buf, op))) if ((res= define_read_attrs(buf, op)))
DBUG_RETURN(res); DBUG_RETURN(res);
if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans,false) != 0)
{ {
table->status= STATUS_NOT_FOUND; table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
...@@ -1729,7 +1732,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) ...@@ -1729,7 +1732,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
*/ */
if (m_ops_pending && m_blobs_pending) if (m_ops_pending && m_blobs_pending)
{ {
if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0)
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
m_ops_pending= 0; m_ops_pending= 0;
m_blobs_pending= FALSE; m_blobs_pending= FALSE;
...@@ -1761,7 +1764,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) ...@@ -1761,7 +1764,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
{ {
if (m_transaction_on) if (m_transaction_on)
{ {
if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0)
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
else else
...@@ -2065,7 +2068,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ...@@ -2065,7 +2068,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_RETURN(res); DBUG_RETURN(res);
} }
if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0)
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
DBUG_RETURN(next_result(buf)); DBUG_RETURN(next_result(buf));
...@@ -2098,7 +2101,7 @@ int ha_ndbcluster::full_table_scan(byte *buf) ...@@ -2098,7 +2101,7 @@ int ha_ndbcluster::full_table_scan(byte *buf)
if ((res= define_read_attrs(buf, op))) if ((res= define_read_attrs(buf, op)))
DBUG_RETURN(res); DBUG_RETURN(res);
if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0)
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
DBUG_PRINT("exit", ("Scan started successfully")); DBUG_PRINT("exit", ("Scan started successfully"));
DBUG_RETURN(next_result(buf)); DBUG_RETURN(next_result(buf));
...@@ -2230,7 +2233,7 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -2230,7 +2233,7 @@ int ha_ndbcluster::write_row(byte *record)
m_bulk_insert_not_flushed= FALSE; m_bulk_insert_not_flushed= FALSE;
if (m_transaction_on) if (m_transaction_on)
{ {
if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0)
{ {
m_skip_auto_increment= TRUE; m_skip_auto_increment= TRUE;
no_uncommitted_rows_execute_failure(); no_uncommitted_rows_execute_failure();
...@@ -2430,7 +2433,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) ...@@ -2430,7 +2433,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
} }
// Execute update operation // Execute update operation
if (!cursor && execute_no_commit(this,trans) != 0) { if (!cursor && execute_no_commit(this,trans,false) != 0) {
no_uncommitted_rows_execute_failure(); no_uncommitted_rows_execute_failure();
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
} }
...@@ -2501,7 +2504,7 @@ int ha_ndbcluster::delete_row(const byte *record) ...@@ -2501,7 +2504,7 @@ int ha_ndbcluster::delete_row(const byte *record)
} }
// Execute delete operation // Execute delete operation
if (execute_no_commit(this,trans) != 0) { if (execute_no_commit(this,trans,false) != 0) {
no_uncommitted_rows_execute_failure(); no_uncommitted_rows_execute_failure();
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
} }
...@@ -2942,7 +2945,7 @@ int ha_ndbcluster::close_scan() ...@@ -2942,7 +2945,7 @@ int ha_ndbcluster::close_scan()
// Lock row // Lock row
DBUG_PRINT("info", ("Keeping lock on scanned row")); DBUG_PRINT("info", ("Keeping lock on scanned row"));
if (!(op= cursor->lockTuple())) if (!(op= cursor->lockCurrentTuple()))
{ {
m_lock_tuple= false; m_lock_tuple= false;
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
...@@ -2957,7 +2960,7 @@ int ha_ndbcluster::close_scan() ...@@ -2957,7 +2960,7 @@ int ha_ndbcluster::close_scan()
deleteing/updating transaction before closing the scan deleteing/updating transaction before closing the scan
*/ */
DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending)); DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending));
if (execute_no_commit(this,trans) != 0) { if (execute_no_commit(this,trans,false) != 0) {
no_uncommitted_rows_execute_failure(); no_uncommitted_rows_execute_failure();
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
} }
...@@ -3367,7 +3370,7 @@ int ha_ndbcluster::end_bulk_insert() ...@@ -3367,7 +3370,7 @@ int ha_ndbcluster::end_bulk_insert()
m_bulk_insert_not_flushed= FALSE; m_bulk_insert_not_flushed= FALSE;
if (m_transaction_on) if (m_transaction_on)
{ {
if (execute_no_commit(this, trans) != 0) if (execute_no_commit(this, trans,false) != 0)
{ {
no_uncommitted_rows_execute_failure(); no_uncommitted_rows_execute_failure();
my_errno= error= ndb_err(trans); my_errno= error= ndb_err(trans);
...@@ -3540,6 +3543,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -3540,6 +3543,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
ERR_RETURN(ndb->getNdbError()); ERR_RETURN(ndb->getNdbError());
no_uncommitted_rows_reset(thd); no_uncommitted_rows_reset(thd);
thd_ndb->stmt= trans; thd_ndb->stmt= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL;
trans_register_ha(thd, FALSE, &ndbcluster_hton); trans_register_ha(thd, FALSE, &ndbcluster_hton);
} }
else else
...@@ -3555,6 +3559,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -3555,6 +3559,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
ERR_RETURN(ndb->getNdbError()); ERR_RETURN(ndb->getNdbError());
no_uncommitted_rows_reset(thd); no_uncommitted_rows_reset(thd);
thd_ndb->all= trans; thd_ndb->all= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL;
trans_register_ha(thd, TRUE, &ndbcluster_hton); trans_register_ha(thd, TRUE, &ndbcluster_hton);
/* /*
...@@ -3761,6 +3766,7 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type) ...@@ -3761,6 +3766,7 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type)
thd_ndb->stmt= trans; thd_ndb->stmt= trans;
trans_register_ha(thd, FALSE, &ndbcluster_hton); trans_register_ha(thd, FALSE, &ndbcluster_hton);
} }
thd_ndb->query_state&= NDB_QUERY_NORMAL;
m_active_trans= trans; m_active_trans= trans;
// Start of statement // Start of statement
...@@ -6009,7 +6015,8 @@ int ha_ndbcluster::write_ndb_file() ...@@ -6009,7 +6015,8 @@ int ha_ndbcluster::write_ndb_file()
} }
void void
ha_ndbcluster::release_completed_operations(NdbConnection *trans) ha_ndbcluster::release_completed_operations(NdbTransaction *trans,
bool force_release)
{ {
if (trans->hasBlobOperation()) if (trans->hasBlobOperation())
{ {
...@@ -6018,7 +6025,19 @@ ha_ndbcluster::release_completed_operations(NdbConnection *trans) ...@@ -6018,7 +6025,19 @@ ha_ndbcluster::release_completed_operations(NdbConnection *trans)
*/ */
return; return;
} }
if (!force_release)
{
if (get_thd_ndb(current_thd)->query_state & NDB_QUERY_MULTI_READ_RANGE)
{
/* We are batching reads and have not consumed all fetched
rows yet, releasing operation records is unsafe
*/
return;
}
}
trans->releaseCompletedOperations(); trans->releaseCompletedOperations();
}
int int
ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
KEY_MULTI_RANGE *ranges, KEY_MULTI_RANGE *ranges,
...@@ -6033,6 +6052,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, ...@@ -6033,6 +6052,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
NDB_INDEX_TYPE index_type= get_index_type(active_index); NDB_INDEX_TYPE index_type= get_index_type(active_index);
ulong reclength= table->s->reclength; ulong reclength= table->s->reclength;
NdbOperation* op; NdbOperation* op;
Thd_ndb *thd_ndb= get_thd_ndb(current_thd);
if (uses_blob_value(m_retrieve_all_fields)) if (uses_blob_value(m_retrieve_all_fields))
{ {
...@@ -6046,7 +6066,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, ...@@ -6046,7 +6066,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
sorted, sorted,
buffer)); buffer));
} }
thd_ndb->query_state|= NDB_QUERY_MULTI_READ_RANGE;
m_disable_multi_read= FALSE; m_disable_multi_read= FALSE;
/** /**
...@@ -6193,7 +6213,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, ...@@ -6193,7 +6213,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
*/ */
m_current_multi_operation= m_current_multi_operation=
lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation(); lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation();
if (!(res= execute_no_commit_ie(this, m_active_trans))) if (!(res= execute_no_commit_ie(this, m_active_trans, true)))
{ {
m_multi_range_defined= multi_range_curr; m_multi_range_defined= multi_range_curr;
multi_range_curr= ranges; multi_range_curr= ranges;
......
...@@ -435,6 +435,11 @@ class Ndb_cond_traverse_context ...@@ -435,6 +435,11 @@ class Ndb_cond_traverse_context
Ndb_rewrite_context *rewrite_stack; Ndb_rewrite_context *rewrite_stack;
}; };
typedef enum ndb_query_state_bits {
NDB_QUERY_NORMAL = 0,
NDB_QUERY_MULTI_READ_RANGE = 1
} NDB_QUERY_STATE_BITS;
/* /*
Place holder for ha_ndbcluster thread specific data Place holder for ha_ndbcluster thread specific data
*/ */
...@@ -451,6 +456,7 @@ class Thd_ndb ...@@ -451,6 +456,7 @@ class Thd_ndb
NdbTransaction *stmt; NdbTransaction *stmt;
int error; int error;
List<NDB_SHARE> changed_tables; List<NDB_SHARE> changed_tables;
uint query_state;
}; };
class ha_ndbcluster: public handler class ha_ndbcluster: public handler
...@@ -672,8 +678,8 @@ static void set_tabname(const char *pathname, char *tabname); ...@@ -672,8 +678,8 @@ static void set_tabname(const char *pathname, char *tabname);
NdbScanOperation* op); NdbScanOperation* op);
friend int execute_commit(ha_ndbcluster*, NdbTransaction*); friend int execute_commit(ha_ndbcluster*, NdbTransaction*);
friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool);
friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*, bool);
NdbTransaction *m_active_trans; NdbTransaction *m_active_trans;
NdbScanOperation *m_active_cursor; NdbScanOperation *m_active_cursor;
...@@ -716,7 +722,7 @@ static void set_tabname(const char *pathname, char *tabname); ...@@ -716,7 +722,7 @@ static void set_tabname(const char *pathname, char *tabname);
bool m_force_send; bool m_force_send;
ha_rows m_autoincrement_prefetch; ha_rows m_autoincrement_prefetch;
bool m_transaction_on; bool m_transaction_on;
void release_completed_operations(NdbConnection*); void release_completed_operations(NdbTransaction*, bool);
Ndb_cond_stack *m_cond_stack; Ndb_cond_stack *m_cond_stack;
bool m_disable_multi_read; bool m_disable_multi_read;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment