Commit 9f0dbb31 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-11947 InnoDB purge workers fail to shut down

srv_release_threads(): Actually wait for the threads to resume
from suspension. On CentOS 5 and possibly other platforms,
os_event_set() may be lost.

srv_resume_thread(): A counterpart of srv_suspend_thread().
Optionally wait for the event to be set, optionally with a timeout,
and then release the thread from suspension.

srv_free_slot(): Unconditionally suspend the thread. It is always
in resumed state when this function is entered.

srv_active_wake_master_thread_low(): Only call os_event_set().

srv_purge_coordinator_suspend(): Use srv_resume_thread() instead
of the complicated logic.
parent e174d923
......@@ -3,7 +3,7 @@
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2008, 2009, Google Inc.
Copyright (c) 2009, Percona Inc.
Copyright (c) 2013, 2014, SkySQL Ab. All Rights Reserved.
Copyright (c) 2013, 2017, MariaDB Corporation Ab. All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described
......@@ -781,17 +781,12 @@ ulint
srv_get_task_queue_length(void);
/*===========================*/
/*********************************************************************//**
Releases threads of the type given from suspension in the thread table.
NOTE! The server mutex has to be reserved by the caller!
@return number of threads released: this may be less than n if not
enough threads were suspended at the moment */
UNIV_INTERN
ulint
srv_release_threads(
/*================*/
enum srv_thread_type type, /*!< in: thread type */
ulint n); /*!< in: number of threads to release */
/** Ensure that a given number of threads of the type given are running
(or are already terminated).
@param[in] type thread type
@param[in] n number of threads that have to run */
void
srv_release_threads(enum srv_thread_type type, ulint n);
/**********************************************************************//**
Check whether any background thread are active. If so print which thread
......@@ -802,12 +797,10 @@ const char*
srv_any_background_threads_are_active(void);
/*=======================================*/
/**********************************************************************//**
Wakeup the purge threads. */
/** Wake up the purge threads. */
UNIV_INTERN
void
srv_purge_wakeup(void);
/*==================*/
srv_purge_wakeup();
/** Status variables to be passed to MySQL */
struct export_var_t{
......
......@@ -3,7 +3,7 @@
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2008, 2009 Google Inc.
Copyright (c) 2009, Percona Inc.
Copyright (c) 2013, 2014, SkySQL Ab. All Rights Reserved.
Copyright (c) 2013, 2017, MariaDB Corporation Ab. All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described
......@@ -781,7 +781,6 @@ srv_suspend_thread_low(
/*===================*/
srv_slot_t* slot) /*!< in/out: thread slot */
{
ut_ad(!srv_read_only_mode);
ut_ad(srv_sys_mutex_own());
......@@ -839,34 +838,71 @@ srv_suspend_thread(
return(sig_count);
}
/*********************************************************************//**
Releases threads of the type given from suspension in the thread table.
NOTE! The server mutex has to be reserved by the caller!
@return number of threads released: this may be less than n if not
enough threads were suspended at the moment. */
UNIV_INTERN
ulint
srv_release_threads(
/*================*/
srv_thread_type type, /*!< in: thread type */
ulint n) /*!< in: number of threads to release */
/** Resume the calling thread.
@param[in,out] slot thread slot
@param[in] sig_count signal count (if wait)
@param[in] wait whether to wait for the event
@param[in] timeout_usec timeout in microseconds (0=infinite)
@return whether the wait timed out */
static
bool
srv_resume_thread(srv_slot_t* slot, int64_t sig_count = 0, bool wait = true,
ulint timeout_usec = 0)
{
bool timeout;
ut_ad(!srv_read_only_mode);
ut_ad(slot->in_use);
ut_ad(slot->suspended);
if (!wait) {
timeout = false;
} else if (timeout_usec) {
timeout = OS_SYNC_TIME_EXCEEDED == os_event_wait_time_low(
slot->event, timeout_usec, sig_count);
} else {
timeout = false;
os_event_wait_low(slot->event, sig_count);
}
srv_sys_mutex_enter();
ut_ad(slot->in_use);
ut_ad(slot->suspended);
slot->suspended = FALSE;
++srv_sys->n_threads_active[slot->type];
srv_sys_mutex_exit();
return(timeout);
}
/** Ensure that a given number of threads of the type given are running
(or are already terminated).
@param[in] type thread type
@param[in] n number of threads that have to run */
void
srv_release_threads(enum srv_thread_type type, ulint n)
{
ulint i;
ulint count = 0;
ulint running;
ut_ad(srv_thread_type_validate(type));
ut_ad(n > 0);
srv_sys_mutex_enter();
do {
running = 0;
for (i = 0; i < srv_sys->n_sys_threads; i++) {
srv_slot_t* slot;
srv_sys_mutex_enter();
slot = &srv_sys->sys_threads[i];
for (ulint i = 0; i < srv_sys->n_sys_threads; i++) {
srv_slot_t* slot = &srv_sys->sys_threads[i];
if (slot->in_use
&& srv_slot_get_type(slot) == type
&& slot->suspended) {
if (!slot->in_use || srv_slot_get_type(slot) != type) {
continue;
} else if (!slot->suspended) {
if (++running >= n) {
break;
}
continue;
}
switch (type) {
case SRV_NONE:
......@@ -896,21 +932,11 @@ srv_release_threads(
break;
}
slot->suspended = FALSE;
++srv_sys->n_threads_active[type];
os_event_set(slot->event);
if (++count == n) {
break;
}
}
}
srv_sys_mutex_exit();
return(count);
srv_sys_mutex_exit();
} while (running && running < n);
}
/*********************************************************************//**
......@@ -923,11 +949,8 @@ srv_free_slot(
{
srv_sys_mutex_enter();
if (!slot->suspended) {
/* Mark the thread as inactive. */
srv_suspend_thread_low(slot);
}
/* Mark the thread as inactive. */
srv_suspend_thread_low(slot);
/* Free the slot for reuse. */
ut_ad(slot->in_use);
slot->in_use = FALSE;
......@@ -1965,15 +1988,7 @@ srv_active_wake_master_thread(void)
if (slot->in_use) {
ut_a(srv_slot_get_type(slot) == SRV_MASTER);
if (slot->suspended) {
slot->suspended = FALSE;
++srv_sys->n_threads_active[SRV_MASTER];
os_event_set(slot->event);
}
os_event_set(slot->event);
}
srv_sys_mutex_exit();
......@@ -2439,7 +2454,7 @@ DECLARE_THREAD(srv_master_thread)(
manual also mentions this string in several places. */
srv_main_thread_op_info = "waiting for server activity";
os_event_wait(slot->event);
srv_resume_thread(slot);
if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) {
os_thread_exit(NULL);
......@@ -2551,8 +2566,7 @@ DECLARE_THREAD(srv_worker_thread)(
do {
srv_suspend_thread(slot);
os_event_wait(slot->event);
srv_resume_thread(slot);
if (srv_task_execute()) {
......@@ -2688,8 +2702,6 @@ srv_purge_coordinator_suspend(
ib_int64_t sig_count = srv_suspend_thread(slot);
do {
ulint ret;
rw_lock_x_lock(&purge_sys->latch);
purge_sys->running = false;
......@@ -2698,32 +2710,11 @@ srv_purge_coordinator_suspend(
/* We don't wait right away on the the non-timed wait because
we want to signal the thread that wants to suspend purge. */
if (stop) {
os_event_wait_low(slot->event, sig_count);
ret = 0;
} else if (rseg_history_len <= trx_sys->rseg_history_len) {
ret = os_event_wait_time_low(
slot->event, SRV_PURGE_MAX_TIMEOUT, sig_count);
} else {
/* We don't want to waste time waiting, if the
history list increased by the time we got here,
unless purge has been stopped. */
ret = 0;
}
srv_sys_mutex_enter();
/* The thread can be in state !suspended after the timeout
but before this check if another thread sent a wakeup signal. */
if (slot->suspended) {
slot->suspended = FALSE;
++srv_sys->n_threads_active[slot->type];
ut_a(srv_sys->n_threads_active[slot->type] == 1);
}
srv_sys_mutex_exit();
const bool wait = stop
|| rseg_history_len <= trx_sys->rseg_history_len;
const bool timeout = srv_resume_thread(
slot, sig_count, wait,
stop ? 0 : SRV_PURGE_MAX_TIMEOUT);
sig_count = srv_suspend_thread(slot);
......@@ -2735,6 +2726,19 @@ srv_purge_coordinator_suspend(
if (!stop) {
ut_a(purge_sys->n_stop == 0);
purge_sys->running = true;
if (timeout
&& rseg_history_len == trx_sys->rseg_history_len
&& trx_sys->rseg_history_len < 5000) {
/* No new records were added since the
wait started. Simply wait for new
records. The magic number 5000 is an
approximation for the case where we
have cached UNDO log records which
prevent truncate of the UNDO
segments. */
stop = true;
}
} else {
ut_a(purge_sys->n_stop > 0);
......@@ -2743,33 +2747,9 @@ srv_purge_coordinator_suspend(
}
rw_lock_x_unlock(&purge_sys->latch);
if (ret == OS_SYNC_TIME_EXCEEDED) {
/* No new records added since wait started then simply
wait for new records. The magic number 5000 is an
approximation for the case where we have cached UNDO
log records which prevent truncate of the UNDO
segments. */
if (rseg_history_len == trx_sys->rseg_history_len
&& trx_sys->rseg_history_len < 5000) {
stop = true;
}
}
} while (stop);
srv_sys_mutex_enter();
if (slot->suspended) {
slot->suspended = FALSE;
++srv_sys->n_threads_active[slot->type];
ut_a(srv_sys->n_threads_active[slot->type] == 1);
}
srv_sys_mutex_exit();
srv_resume_thread(slot, 0, false);
}
/*********************************************************************//**
......@@ -2822,8 +2802,9 @@ DECLARE_THREAD(srv_purge_coordinator_thread)(
srv_purge_coordinator_suspend(slot, rseg_history_len);
}
ut_ad(!slot->suspended);
if (srv_purge_should_exit(n_total_purged)) {
ut_a(!slot->suspended);
break;
}
......@@ -2934,12 +2915,10 @@ srv_get_task_queue_length(void)
return(n_tasks);
}
/**********************************************************************//**
Wakeup the purge threads. */
/** Wake up the purge threads. */
UNIV_INTERN
void
srv_purge_wakeup(void)
/*==================*/
srv_purge_wakeup()
{
ut_ad(!srv_read_only_mode);
......@@ -2954,4 +2933,3 @@ srv_purge_wakeup(void)
}
}
}
......@@ -3,7 +3,7 @@
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2008, 2009, Google Inc.
Copyright (c) 2009, Percona Inc.
Copyright (c) 2013, 2014, SkySQL Ab. All Rights Reserved.
Copyright (c) 2013, 2017, MariaDB Corporation Ab. All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described
......@@ -968,17 +968,12 @@ ulint
srv_get_task_queue_length(void);
/*===========================*/
/*********************************************************************//**
Releases threads of the type given from suspension in the thread table.
NOTE! The server mutex has to be reserved by the caller!
@return number of threads released: this may be less than n if not
enough threads were suspended at the moment */
UNIV_INTERN
ulint
srv_release_threads(
/*================*/
enum srv_thread_type type, /*!< in: thread type */
ulint n); /*!< in: number of threads to release */
/** Ensure that a given number of threads of the type given are running
(or are already terminated).
@param[in] type thread type
@param[in] n number of threads that have to run */
void
srv_release_threads(enum srv_thread_type type, ulint n);
/**********************************************************************//**
Check whether any background thread are active. If so print which thread
......@@ -989,12 +984,10 @@ const char*
srv_any_background_threads_are_active(void);
/*=======================================*/
/**********************************************************************//**
Wakeup the purge threads. */
/** Wake up the purge threads. */
UNIV_INTERN
void
srv_purge_wakeup(void);
/*==================*/
srv_purge_wakeup();
/** Status variables to be passed to MySQL */
struct export_var_t{
......
......@@ -3,7 +3,7 @@
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2008, 2009 Google Inc.
Copyright (c) 2009, Percona Inc.
Copyright (c) 2013, 2014, SkySQL Ab. All Rights Reserved.
Copyright (c) 2013, 2017, MariaDB Corporation Ab. All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described
......@@ -931,7 +931,6 @@ srv_suspend_thread_low(
/*===================*/
srv_slot_t* slot) /*!< in/out: thread slot */
{
ut_ad(!srv_read_only_mode);
ut_ad(srv_sys_mutex_own());
......@@ -989,34 +988,71 @@ srv_suspend_thread(
return(sig_count);
}
/*********************************************************************//**
Releases threads of the type given from suspension in the thread table.
NOTE! The server mutex has to be reserved by the caller!
@return number of threads released: this may be less than n if not
enough threads were suspended at the moment. */
UNIV_INTERN
ulint
srv_release_threads(
/*================*/
srv_thread_type type, /*!< in: thread type */
ulint n) /*!< in: number of threads to release */
/** Resume the calling thread.
@param[in,out] slot thread slot
@param[in] sig_count signal count (if wait)
@param[in] wait whether to wait for the event
@param[in] timeout_usec timeout in microseconds (0=infinite)
@return whether the wait timed out */
static
bool
srv_resume_thread(srv_slot_t* slot, int64_t sig_count = 0, bool wait = true,
ulint timeout_usec = 0)
{
ulint i;
ulint count = 0;
bool timeout;
ut_ad(!srv_read_only_mode);
ut_ad(slot->in_use);
ut_ad(slot->suspended);
if (!wait) {
timeout = false;
} else if (timeout_usec) {
timeout = OS_SYNC_TIME_EXCEEDED == os_event_wait_time_low(
slot->event, timeout_usec, sig_count);
} else {
timeout = false;
os_event_wait_low(slot->event, sig_count);
}
srv_sys_mutex_enter();
ut_ad(slot->in_use);
ut_ad(slot->suspended);
slot->suspended = FALSE;
++srv_sys->n_threads_active[slot->type];
srv_sys_mutex_exit();
return(timeout);
}
/** Ensure that a given number of threads of the type given are running
(or are already terminated).
@param[in] type thread type
@param[in] n number of threads that have to run */
void
srv_release_threads(enum srv_thread_type type, ulint n)
{
ulint running;
ut_ad(srv_thread_type_validate(type));
ut_ad(n > 0);
srv_sys_mutex_enter();
do {
running = 0;
for (i = 0; i < srv_sys->n_sys_threads; i++) {
srv_slot_t* slot;
srv_sys_mutex_enter();
slot = &srv_sys->sys_threads[i];
for (ulint i = 0; i < srv_sys->n_sys_threads; i++) {
srv_slot_t* slot = &srv_sys->sys_threads[i];
if (slot->in_use
&& srv_slot_get_type(slot) == type
&& slot->suspended) {
if (!slot->in_use || srv_slot_get_type(slot) != type) {
continue;
} else if (!slot->suspended) {
if (++running >= n) {
break;
}
continue;
}
switch (type) {
case SRV_NONE:
......@@ -1046,21 +1082,11 @@ srv_release_threads(
break;
}
slot->suspended = FALSE;
++srv_sys->n_threads_active[type];
os_event_set(slot->event);
if (++count == n) {
break;
}
}
}
srv_sys_mutex_exit();
return(count);
srv_sys_mutex_exit();
} while (running && running < n);
}
/*********************************************************************//**
......@@ -1073,11 +1099,8 @@ srv_free_slot(
{
srv_sys_mutex_enter();
if (!slot->suspended) {
/* Mark the thread as inactive. */
srv_suspend_thread_low(slot);
}
/* Mark the thread as inactive. */
srv_suspend_thread_low(slot);
/* Free the slot for reuse. */
ut_ad(slot->in_use);
slot->in_use = FALSE;
......@@ -2581,15 +2604,7 @@ srv_active_wake_master_thread(void)
if (slot->in_use) {
ut_a(srv_slot_get_type(slot) == SRV_MASTER);
if (slot->suspended) {
slot->suspended = FALSE;
++srv_sys->n_threads_active[SRV_MASTER];
os_event_set(slot->event);
}
os_event_set(slot->event);
}
srv_sys_mutex_exit();
......@@ -3110,7 +3125,7 @@ DECLARE_THREAD(srv_master_thread)(
manual also mentions this string in several places. */
srv_main_thread_op_info = "waiting for server activity";
os_event_wait(slot->event);
srv_resume_thread(slot);
if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) {
os_thread_exit(NULL);
......@@ -3232,8 +3247,7 @@ DECLARE_THREAD(srv_worker_thread)(
do {
srv_suspend_thread(slot);
os_event_wait(slot->event);
srv_resume_thread(slot);
srv_current_thread_priority = srv_purge_thread_priority;
......@@ -3373,8 +3387,6 @@ srv_purge_coordinator_suspend(
ib_int64_t sig_count = srv_suspend_thread(slot);
do {
ulint ret;
rw_lock_x_lock(&purge_sys->latch);
purge_sys->running = false;
......@@ -3383,32 +3395,11 @@ srv_purge_coordinator_suspend(
/* We don't wait right away on the the non-timed wait because
we want to signal the thread that wants to suspend purge. */
if (stop) {
os_event_wait_low(slot->event, sig_count);
ret = 0;
} else if (rseg_history_len <= trx_sys->rseg_history_len) {
ret = os_event_wait_time_low(
slot->event, SRV_PURGE_MAX_TIMEOUT, sig_count);
} else {
/* We don't want to waste time waiting, if the
history list increased by the time we got here,
unless purge has been stopped. */
ret = 0;
}
srv_sys_mutex_enter();
/* The thread can be in state !suspended after the timeout
but before this check if another thread sent a wakeup signal. */
if (slot->suspended) {
slot->suspended = FALSE;
++srv_sys->n_threads_active[slot->type];
ut_a(srv_sys->n_threads_active[slot->type] == 1);
}
srv_sys_mutex_exit();
const bool wait = stop
|| rseg_history_len <= trx_sys->rseg_history_len;
const bool timeout = srv_resume_thread(
slot, sig_count, wait,
stop ? 0 : SRV_PURGE_MAX_TIMEOUT);
sig_count = srv_suspend_thread(slot);
......@@ -3420,6 +3411,19 @@ srv_purge_coordinator_suspend(
if (!stop) {
ut_a(purge_sys->n_stop == 0);
purge_sys->running = true;
if (timeout
&& rseg_history_len == trx_sys->rseg_history_len
&& trx_sys->rseg_history_len < 5000) {
/* No new records were added since the
wait started. Simply wait for new
records. The magic number 5000 is an
approximation for the case where we
have cached UNDO log records which
prevent truncate of the UNDO
segments. */
stop = true;
}
} else {
ut_a(purge_sys->n_stop > 0);
......@@ -3428,33 +3432,9 @@ srv_purge_coordinator_suspend(
}
rw_lock_x_unlock(&purge_sys->latch);
if (ret == OS_SYNC_TIME_EXCEEDED) {
/* No new records added since wait started then simply
wait for new records. The magic number 5000 is an
approximation for the case where we have cached UNDO
log records which prevent truncate of the UNDO
segments. */
if (rseg_history_len == trx_sys->rseg_history_len
&& trx_sys->rseg_history_len < 5000) {
stop = true;
}
}
} while (stop);
srv_sys_mutex_enter();
if (slot->suspended) {
slot->suspended = FALSE;
++srv_sys->n_threads_active[slot->type];
ut_a(srv_sys->n_threads_active[slot->type] == 1);
}
srv_sys_mutex_exit();
srv_resume_thread(slot, 0, false);
}
/*********************************************************************//**
......@@ -3510,8 +3490,9 @@ DECLARE_THREAD(srv_purge_coordinator_thread)(
srv_purge_coordinator_suspend(slot, rseg_history_len);
}
ut_ad(!slot->suspended);
if (srv_purge_should_exit(n_total_purged)) {
ut_a(!slot->suspended);
break;
}
......@@ -3626,12 +3607,10 @@ srv_get_task_queue_length(void)
return(n_tasks);
}
/**********************************************************************//**
Wakeup the purge threads. */
/** Wake up the purge threads. */
UNIV_INTERN
void
srv_purge_wakeup(void)
/*==================*/
srv_purge_wakeup()
{
ut_ad(!srv_read_only_mode);
......@@ -3646,4 +3625,3 @@ srv_purge_wakeup(void)
}
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment