Commit 7bb3a522 authored by Monty's avatar Monty

Fixed race condition between flush tables and insert delayed

FLUSH TABLES kills all insert delayed treads. If this happens at same time
as a newly started insert delayed query, the insert may fail with either
and error ("Query interrupted") or the row may be lost.
This patch fixes this by changing a failed attempt of get_delay_table() to
convert the query to use a normal insert.

The test case for this patch can be found in the FLUSH TABLES commit after
this one.
parent 163b34fe
...@@ -2451,10 +2451,12 @@ bool delayed_get_table(THD *thd, MDL_request *grl_protection_request, ...@@ -2451,10 +2451,12 @@ bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
} }
/* Unlock the delayed insert object after its last access. */ /* Unlock the delayed insert object after its last access. */
di->unlock(); di->unlock();
DBUG_RETURN((table_list->table == NULL)); DBUG_PRINT("exit", ("table_list->table: %p", table_list->table));
DBUG_RETURN(thd->is_error());
end_create: end_create:
mysql_mutex_unlock(&LOCK_delayed_create); mysql_mutex_unlock(&LOCK_delayed_create);
DBUG_PRINT("exit", ("is_error: %d", thd->is_error()));
DBUG_RETURN(thd->is_error()); DBUG_RETURN(thd->is_error());
} }
...@@ -2509,24 +2511,27 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd) ...@@ -2509,24 +2511,27 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd)
if (thd.killed) if (thd.killed)
{ {
/* /*
Copy the error message. Note that we don't treat fatal Check how the insert thread was killed. If it was killed
errors in the delayed thread as fatal errors in the by FLUSH TABLES which calls kill_delayed_threads_for_table(),
main thread. If delayed thread was killed, we don't then is_error is not set.
want to send "Server shutdown in progress" in the In this case, return without setting an error,
INSERT THREAD. which means that the insert will be converted to a normal insert.
The thread could be killed with an error message if
di->handle_inserts() or di->open_and_lock_table() fails.
The thread could be killed without an error message if
killed using THD::notify_shared_lock() or
kill_delayed_threads_for_table().
*/ */
if (!thd.is_error()) if (thd.is_error())
my_message(ER_QUERY_INTERRUPTED, ER_THD(&thd, ER_QUERY_INTERRUPTED), {
MYF(0)); /*
else Copy the error message. Note that we don't treat fatal
errors in the delayed thread as fatal errors in the
main thread. If delayed thread was killed, we don't
want to send "Server shutdown in progress" in the
INSERT THREAD.
The thread could be killed with an error message if
di->handle_inserts() or di->open_and_lock_table() fails.
*/
my_message(thd.get_stmt_da()->sql_errno(), my_message(thd.get_stmt_da()->sql_errno(),
thd.get_stmt_da()->message(), MYF(0)); thd.get_stmt_da()->message(), MYF(0));
}
goto error; goto error;
} }
} }
...@@ -3086,11 +3091,30 @@ pthread_handler_t handle_delayed_insert(void *arg) ...@@ -3086,11 +3091,30 @@ pthread_handler_t handle_delayed_insert(void *arg)
mysql_mutex_unlock(&di->thd.mysys_var->mutex); mysql_mutex_unlock(&di->thd.mysys_var->mutex);
mysql_mutex_lock(&di->mutex); mysql_mutex_lock(&di->mutex);
} }
/*
The code depends on that the following ASSERT always hold.
I don't want to accidently introduce and bugs in the following code
in this commit, so I leave the small cleaning up of the code to
a future commit
*/
DBUG_ASSERT(thd->lock || di->stacked_inserts == 0);
DBUG_PRINT("delayed", DBUG_PRINT("delayed",
("thd->killed: %d di->tables_in_use: %d thd->lock: %d", ("thd->killed: %d di->status: %d di->stacked_insert: %d di->tables_in_use: %d thd->lock: %d",
thd->killed, di->tables_in_use, thd->lock != 0)); thd->killed, di->status, di->stacked_inserts, di->tables_in_use, thd->lock != 0));
if (di->tables_in_use && ! thd->lock && !thd->killed) /*
This is used to test see what happens if killed is sent before
we have time to handle the insert requests.
*/
DBUG_EXECUTE_IF("write_delay_wakeup",
if (!thd->killed && di->stacked_inserts)
my_sleep(500000);
);
if (di->tables_in_use && ! thd->lock &&
(!thd->killed || di->stacked_inserts))
{ {
/* /*
Request for new delayed insert. Request for new delayed insert.
...@@ -3648,20 +3672,24 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) ...@@ -3648,20 +3672,24 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
&map); &map);
lex->first_select_lex()->no_wrap_view_item= FALSE; lex->first_select_lex()->no_wrap_view_item= FALSE;
/* /*
When we are not using GROUP BY and there are no ungrouped aggregate functions When we are not using GROUP BY and there are no ungrouped
we can refer to other tables in the ON DUPLICATE KEY part. aggregate functions we can refer to other tables in the ON
We use next_name_resolution_table descructively, so check it first (views?) DUPLICATE KEY part. We use next_name_resolution_table
descructively, so check it first (views?)
*/ */
DBUG_ASSERT (!table_list->next_name_resolution_table); DBUG_ASSERT (!table_list->next_name_resolution_table);
if (lex->first_select_lex()->group_list.elements == 0 && if (lex->first_select_lex()->group_list.elements == 0 &&
!lex->first_select_lex()->with_sum_func) !lex->first_select_lex()->with_sum_func)
{
/* /*
We must make a single context out of the two separate name resolution contexts : We must make a single context out of the two separate name
the INSERT table and the tables in the SELECT part of INSERT ... SELECT. resolution contexts : the INSERT table and the tables in the
To do that we must concatenate the two lists SELECT part of INSERT ... SELECT. To do that we must
concatenate the two lists
*/ */
table_list->next_name_resolution_table= table_list->next_name_resolution_table=
ctx_state.get_first_name_resolution_table(); ctx_state.get_first_name_resolution_table();
}
res= res || setup_fields(thd, Ref_ptr_array(), *info.update_values, res= res || setup_fields(thd, Ref_ptr_array(), *info.update_values,
MARK_COLUMNS_READ, 0, NULL, 0); MARK_COLUMNS_READ, 0, NULL, 0);
...@@ -3762,9 +3790,9 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) ...@@ -3762,9 +3790,9 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
void void
DESCRIPTION DESCRIPTION
If the result table is the same as one of the source tables (INSERT SELECT), If the result table is the same as one of the source tables
the result table is not finally prepared at the join prepair phase. (INSERT SELECT), the result table is not finally prepared at the
Do the final preparation now. join prepair phase. Do the final preparation now.
RETURN RETURN
0 OK 0 OK
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment