Commit 8bee09aa authored by unknown's avatar unknown

Fix for bug #21059 Server crashes on join query with large dataset with NDB...

Fix for bug #21059  Server crashes on join query with large dataset with NDB tables: Releasing operation for each intermediate batch, before next call to trans->execute(NoCommit);

parent c06c3952
......@@ -228,13 +228,15 @@ static int ndb_to_mysql_error(const NdbError *err)
inline
int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans)
int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans,
bool force_release)
{
#ifdef NOT_USED
int m_batch_execute= 0;
if (m_batch_execute)
return 0;
#endif
h->release_completed_operations(trans, force_release);
return trans->execute(NdbTransaction::NoCommit,
NdbTransaction::AbortOnError,
h->m_force_send);
......@@ -267,13 +269,15 @@ int execute_commit(THD *thd, NdbTransaction *trans)
}
inline
int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans)
int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans,
bool force_release)
{
#ifdef NOT_USED
int m_batch_execute= 0;
if (m_batch_execute)
return 0;
#endif
h->release_completed_operations(trans, force_release);
return trans->execute(NdbTransaction::NoCommit,
NdbTransaction::AO_IgnoreError,
h->m_force_send);
......@@ -290,6 +294,7 @@ Thd_ndb::Thd_ndb()
all= NULL;
stmt= NULL;
error= 0;
query_state&= NDB_QUERY_NORMAL;
}
Thd_ndb::~Thd_ndb()
......@@ -1442,7 +1447,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
if ((res= define_read_attrs(buf, op)))
DBUG_RETURN(res);
if (execute_no_commit_ie(this,trans) != 0)
if (execute_no_commit_ie(this,trans,false) != 0)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
......@@ -1489,7 +1494,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
ERR_RETURN(trans->getNdbError());
}
}
if (execute_no_commit(this,trans) != 0)
if (execute_no_commit(this,trans,false) != 0)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
......@@ -1629,7 +1634,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record)
}
last= trans->getLastDefinedOperation();
if (first)
res= execute_no_commit_ie(this,trans);
res= execute_no_commit_ie(this,trans,false);
else
{
// Table has no keys
......@@ -1678,7 +1683,7 @@ int ha_ndbcluster::unique_index_read(const byte *key,
if ((res= define_read_attrs(buf, op)))
DBUG_RETURN(res);
if (execute_no_commit_ie(this,trans) != 0)
if (execute_no_commit_ie(this,trans,false) != 0)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
......@@ -1726,7 +1731,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
*/
if (m_ops_pending && m_blobs_pending)
{
if (execute_no_commit(this,trans) != 0)
if (execute_no_commit(this,trans,false) != 0)
DBUG_RETURN(ndb_err(trans));
m_ops_pending= 0;
m_blobs_pending= FALSE;
......@@ -1758,7 +1763,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
{
if (m_transaction_on)
{
if (execute_no_commit(this,trans) != 0)
if (execute_no_commit(this,trans,false) != 0)
DBUG_RETURN(-1);
}
else
......@@ -2062,7 +2067,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_RETURN(res);
}
if (execute_no_commit(this,trans) != 0)
if (execute_no_commit(this,trans,false) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_RETURN(next_result(buf));
......@@ -2095,7 +2100,7 @@ int ha_ndbcluster::full_table_scan(byte *buf)
if ((res= define_read_attrs(buf, op)))
DBUG_RETURN(res);
if (execute_no_commit(this,trans) != 0)
if (execute_no_commit(this,trans,false) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_PRINT("exit", ("Scan started successfully"));
DBUG_RETURN(next_result(buf));
......@@ -2227,7 +2232,7 @@ int ha_ndbcluster::write_row(byte *record)
m_bulk_insert_not_flushed= FALSE;
if (m_transaction_on)
{
if (execute_no_commit(this,trans) != 0)
if (execute_no_commit(this,trans,false) != 0)
{
m_skip_auto_increment= TRUE;
no_uncommitted_rows_execute_failure();
......@@ -2426,7 +2431,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
}
// Execute update operation
if (!cursor && execute_no_commit(this,trans) != 0) {
if (!cursor && execute_no_commit(this,trans,false) != 0) {
no_uncommitted_rows_execute_failure();
DBUG_RETURN(ndb_err(trans));
}
......@@ -2497,7 +2502,7 @@ int ha_ndbcluster::delete_row(const byte *record)
}
// Execute delete operation
if (execute_no_commit(this,trans) != 0) {
if (execute_no_commit(this,trans,false) != 0) {
no_uncommitted_rows_execute_failure();
DBUG_RETURN(ndb_err(trans));
}
......@@ -2931,7 +2936,7 @@ int ha_ndbcluster::close_scan()
deleteing/updating transaction before closing the scan
*/
DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending));
if (execute_no_commit(this,trans) != 0) {
if (execute_no_commit(this,trans,false) != 0) {
no_uncommitted_rows_execute_failure();
DBUG_RETURN(ndb_err(trans));
}
......@@ -3337,7 +3342,7 @@ int ha_ndbcluster::end_bulk_insert()
m_bulk_insert_not_flushed= FALSE;
if (m_transaction_on)
{
if (execute_no_commit(this, trans) != 0)
if (execute_no_commit(this, trans,false) != 0)
{
no_uncommitted_rows_execute_failure();
my_errno= error= ndb_err(trans);
......@@ -3510,6 +3515,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
ERR_RETURN(ndb->getNdbError());
no_uncommitted_rows_reset(thd);
thd_ndb->stmt= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL;
trans_register_ha(thd, FALSE, &ndbcluster_hton);
}
else
......@@ -3525,6 +3531,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
ERR_RETURN(ndb->getNdbError());
no_uncommitted_rows_reset(thd);
thd_ndb->all= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL;
trans_register_ha(thd, TRUE, &ndbcluster_hton);
/*
......@@ -3731,6 +3738,7 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type)
thd_ndb->stmt= trans;
trans_register_ha(thd, FALSE, &ndbcluster_hton);
}
thd_ndb->query_state&= NDB_QUERY_NORMAL;
m_active_trans= trans;
// Start of statement
......@@ -5969,6 +5977,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
NDB_INDEX_TYPE index_type= get_index_type(active_index);
ulong reclength= table->s->reclength;
NdbOperation* op;
Thd_ndb *thd_ndb= get_thd_ndb(current_thd);
if (uses_blob_value(m_retrieve_all_fields))
{
......@@ -5982,7 +5991,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
sorted,
buffer));
}
thd_ndb->query_state|= NDB_QUERY_MULTI_READ_RANGE;
m_disable_multi_read= FALSE;
/**
......@@ -6129,7 +6138,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
*/
m_current_multi_operation=
lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation();
if (!(res= execute_no_commit_ie(this, m_active_trans)))
if (!(res= execute_no_commit_ie(this, m_active_trans,true)))
{
m_multi_range_defined= multi_range_curr;
multi_range_curr= ranges;
......@@ -7831,6 +7840,24 @@ ha_ndbcluster::generate_scan_filter(Ndb_cond_stack *ndb_cond_stack,
DBUG_RETURN(0);
}
void
ha_ndbcluster::release_completed_operations(NdbTransaction *trans,
bool force_release)
{
if (!force_release)
{
if (get_thd_ndb(current_thd)->query_state & NDB_QUERY_MULTI_READ_RANGE)
{
/* We are batching reads and have not consumed all fetched
rows yet, releasing operation records is unsafe
*/
return;
}
}
trans->releaseCompletedOperations();
}
int
ndbcluster_show_status(THD* thd)
{
......
......@@ -435,6 +435,12 @@ class Ndb_cond_traverse_context
Ndb_rewrite_context *rewrite_stack;
};
typedef enum ndb_query_state_bits {
NDB_QUERY_NORMAL = 0,
NDB_QUERY_MULTI_READ_RANGE = 1
} NDB_QUERY_STATE_BITS;
/*
Place holder for ha_ndbcluster thread specific data
*/
......@@ -451,6 +457,7 @@ class Thd_ndb
NdbTransaction *stmt;
int error;
List<NDB_SHARE> changed_tables;
uint query_state;
};
class ha_ndbcluster: public handler
......@@ -657,6 +664,8 @@ static void set_tabname(const char *pathname, char *tabname);
void no_uncommitted_rows_init(THD *);
void no_uncommitted_rows_reset(THD *);
void release_completed_operations(NdbTransaction*, bool);
/*
Condition pushdown
*/
......@@ -672,8 +681,8 @@ static void set_tabname(const char *pathname, char *tabname);
NdbScanOperation* op);
friend int execute_commit(ha_ndbcluster*, NdbTransaction*);
friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*);
friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*);
friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool);
friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*, bool);
NdbTransaction *m_active_trans;
NdbScanOperation *m_active_cursor;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment