Commit e28b2c5a authored by Brandon Nesterenko's avatar Brandon Nesterenko

Restore thread existence check

parent 9a215d63
......@@ -2673,73 +2673,75 @@ rpl_parallel_entry::choose_thread_internal(sched_bucket *cur_thr,
Relay_log_info *rli= rgi->rli;
rpl_parallel_thread *thr= cur_thr->thr;
*did_enter_cond= false;
mysql_mutex_lock(&thr->LOCK_rpl_thread);
for (;;)
if (thr)
{
if (thr->current_owner != &cur_thr->thr)
{
/*
The worker thread became idle, and returned to the free list and
possibly was allocated to a different request. So we should allocate
a new worker thread.
*/
unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread,
did_enter_cond, old_stage);
thr= NULL;
break;
}
else if (thr->queued_size <= opt_slave_parallel_max_queued)
{
/* The thread is ready to queue into. */
break;
}
else
*did_enter_cond= false;
mysql_mutex_lock(&thr->LOCK_rpl_thread);
for (;;)
{
/*
We have reached the limit of how much memory we are allowed to use
for queuing events, so wait for the thread to consume some of its
queue.
*/
if (!*did_enter_cond)
if (thr->current_owner != &cur_thr->thr)
{
/*
We need to do the debug_sync before ENTER_COND().
Because debug_sync changes the thd->mysys_var->current_mutex,
and this can cause THD::awake to use the wrong mutex.
The worker thread became idle, and returned to the free list and
possibly was allocated to a different request. So we should allocate
a new worker thread.
*/
unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread,
did_enter_cond, old_stage);
thr= NULL;
break;
}
else if (thr->queued_size <= opt_slave_parallel_max_queued)
{
/* The thread is ready to queue into. */
break;
}
else
{
/*
We have reached the limit of how much memory we are allowed to use
for queuing events, so wait for the thread to consume some of its
queue.
*/
if (!*did_enter_cond)
{
/*
We need to do the debug_sync before ENTER_COND().
Because debug_sync changes the thd->mysys_var->current_mutex,
and this can cause THD::awake to use the wrong mutex.
*/
#ifdef ENABLED_DEBUG_SYNC
DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
{
debug_sync_set_action(rli->sql_driver_thd,
STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
};);
DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", {
debug_sync_set_action(
rli->sql_driver_thd,
STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
};);
#endif
rli->sql_driver_thd->set_time_for_next_stage();
rli->sql_driver_thd->ENTER_COND(&thr->COND_rpl_thread_queue,
&thr->LOCK_rpl_thread,
&stage_waiting_for_room_in_worker_thread,
old_stage);
*did_enter_cond= true;
}
rli->sql_driver_thd->set_time_for_next_stage();
rli->sql_driver_thd->ENTER_COND(
&thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread,
&stage_waiting_for_room_in_worker_thread, old_stage);
*did_enter_cond= true;
}
if (unlikely(rli->sql_driver_thd->check_killed(1)))
{
unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread,
did_enter_cond, old_stage);
my_error(ER_CONNECTION_KILLED, MYF(0));
if (unlikely(rli->sql_driver_thd->check_killed(1)))
{
unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread,
did_enter_cond, old_stage);
my_error(ER_CONNECTION_KILLED, MYF(0));
#ifdef ENABLED_DEBUG_SYNC
DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
{
debug_sync_set_action(rli->sql_driver_thd,
STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
};);
DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", {
debug_sync_set_action(
rli->sql_driver_thd,
STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
};);
#endif
slave_output_error_info(rgi, rli->sql_driver_thd);
return NULL;
}
slave_output_error_info(rgi, rli->sql_driver_thd);
return NULL;
}
mysql_cond_wait(&thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread);
mysql_cond_wait(&thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread);
}
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment