Commit ef7a2344 authored by Daniele Sciascia's avatar Daniele Sciascia Committed by Julius Goryavsky

Fixup 0ccdf54b

0ccdf54b removed stack allocated THD objects from functions
Wsrep_schema::replay_transaction(). However, it inadvertedly
anticipated the destruction of the THD, causing assertions and usage
of THD after it was destroyed.
The fix consists in extracting the original function into a separate
function, and leave the allocation and destruction of the THD object
in Wsrep_schema::replay_transaction(), making sure that using the heap
allocated THD has no side effects.
Same for Wsrep_schema::recover_sr_transactions().
Signed-off-by: default avatarJulius Goryavsky <julius.goryavsky@mariadb.com>
parent 0936c138
...@@ -1244,25 +1244,12 @@ int Wsrep_schema::remove_fragments(THD* thd, ...@@ -1244,25 +1244,12 @@ int Wsrep_schema::remove_fragments(THD* thd,
DBUG_RETURN(ret); DBUG_RETURN(ret);
} }
int Wsrep_schema::replay_transaction(THD* orig_thd, static int replay_transaction(THD* thd,
Relay_log_info* rli, THD* orig_thd,
const wsrep::ws_meta& ws_meta, Relay_log_info* rli,
const std::vector<wsrep::seqno>& fragments) const wsrep::ws_meta& ws_meta,
const std::vector<wsrep::seqno>& fragments)
{ {
DBUG_ENTER("Wsrep_schema::replay_transaction");
DBUG_ASSERT(!fragments.empty());
THD *thd= new THD(next_thread_id(), true);
if (!thd)
{
WSREP_WARN("Could not open allocate memory for THD");
DBUG_RETURN(1);
}
thd->thread_stack= (orig_thd ? orig_thd->thread_stack :
(char*) &thd);
wsrep_assign_from_threadvars(thd);
Wsrep_schema_impl::wsrep_off wsrep_off(thd); Wsrep_schema_impl::wsrep_off wsrep_off(thd);
Wsrep_schema_impl::binlog_off binlog_off(thd); Wsrep_schema_impl::binlog_off binlog_off(thd);
Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd); Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);
...@@ -1283,8 +1270,7 @@ int Wsrep_schema::replay_transaction(THD* orig_thd, ...@@ -1283,8 +1270,7 @@ int Wsrep_schema::replay_transaction(THD* orig_thd,
{ {
WSREP_WARN("Could not open SR table for read: %d", error); WSREP_WARN("Could not open SR table for read: %d", error);
Wsrep_schema_impl::finish_stmt(thd); Wsrep_schema_impl::finish_stmt(thd);
my_free(thd); return 1;
DBUG_RETURN(1);
} }
frag_table= frag_table_l.table; frag_table= frag_table_l.table;
...@@ -1370,24 +1356,36 @@ int Wsrep_schema::replay_transaction(THD* orig_thd, ...@@ -1370,24 +1356,36 @@ int Wsrep_schema::replay_transaction(THD* orig_thd,
if (key) if (key)
my_free(key); my_free(key);
delete thd;
DBUG_RETURN(ret);
}
return ret;
}
int Wsrep_schema::recover_sr_transactions(THD *orig_thd) int Wsrep_schema::replay_transaction(THD* orig_thd,
Relay_log_info* rli,
const wsrep::ws_meta& ws_meta,
const std::vector<wsrep::seqno>& fragments)
{ {
DBUG_ENTER("Wsrep_schema::recover_sr_transactions"); DBUG_ENTER("Wsrep_schema::replay_transaction");
DBUG_ASSERT(!fragments.empty());
THD *storage_thd= new THD(next_thread_id(), true); THD *thd= new THD(next_thread_id(), true);
if (!storage_thd) if (!thd)
{ {
WSREP_WARN("Could not open allocate memory for THD"); WSREP_WARN("Could not allocate memory for THD");
DBUG_RETURN(1); DBUG_RETURN(1);
} }
storage_thd->thread_stack= (orig_thd ? orig_thd->thread_stack :
(char*) &storage_thd); thd->thread_stack= (orig_thd ? orig_thd->thread_stack : (char *) &thd);
wsrep_assign_from_threadvars(storage_thd); wsrep_assign_from_threadvars(thd);
int ret= ::replay_transaction(thd, orig_thd, rli, ws_meta, fragments);
delete thd;
DBUG_RETURN(ret);
}
static int recover_sr_transactions(THD* storage_thd, THD* orig_thd)
{
TABLE* frag_table= 0; TABLE* frag_table= 0;
TABLE_LIST frag_table_l; TABLE_LIST frag_table_l;
TABLE* cluster_table= 0; TABLE* cluster_table= 0;
...@@ -1410,14 +1408,14 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) ...@@ -1410,14 +1408,14 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
&cluster_table_l)) &cluster_table_l))
{ {
Wsrep_schema_impl::finish_stmt(storage_thd); Wsrep_schema_impl::finish_stmt(storage_thd);
DBUG_RETURN(1); return 1;
} }
cluster_table= cluster_table_l.table; cluster_table= cluster_table_l.table;
if (Wsrep_schema_impl::init_for_scan(cluster_table)) if (Wsrep_schema_impl::init_for_scan(cluster_table))
{ {
Wsrep_schema_impl::finish_stmt(storage_thd); Wsrep_schema_impl::finish_stmt(storage_thd);
DBUG_RETURN(1); return 1;
} }
if ((error= Wsrep_schema_impl::next_record(cluster_table))) if ((error= Wsrep_schema_impl::next_record(cluster_table)))
...@@ -1428,12 +1426,12 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) ...@@ -1428,12 +1426,12 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
if (error == HA_ERR_END_OF_FILE) if (error == HA_ERR_END_OF_FILE)
{ {
WSREP_INFO("Cluster table is empty, not recovering transactions"); WSREP_INFO("Cluster table is empty, not recovering transactions");
DBUG_RETURN(0); return 0;
} }
else else
{ {
WSREP_ERROR("Failed to read cluster table: %d", error); WSREP_ERROR("Failed to read cluster table: %d", error);
DBUG_RETURN(1); return 1;
} }
} }
...@@ -1540,6 +1538,25 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) ...@@ -1540,6 +1538,25 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
trans_commit(storage_thd); trans_commit(storage_thd);
storage_thd->set_mysys_var(0); storage_thd->set_mysys_var(0);
out: out:
return ret;
}
int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
{
DBUG_ENTER("Wsrep_schema::recover_sr_transactions");
THD *storage_thd= new THD(next_thread_id(), true);
if (!storage_thd)
{
WSREP_WARN("Could not allocate memory for THD");
DBUG_RETURN(1);
}
storage_thd->thread_stack=
(orig_thd ? orig_thd->thread_stack : (char *) &storage_thd);
wsrep_assign_from_threadvars(storage_thd);
int ret= ::recover_sr_transactions(storage_thd, orig_thd);
delete storage_thd; delete storage_thd;
DBUG_RETURN(ret); DBUG_RETURN(ret);
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment