Commit b393e2cb authored by Nikita Malyavin's avatar Nikita Malyavin

add innodb_debug_sync var to support DEBUG_SYNC from purge threads

parent 6d7a8269
...@@ -19376,6 +19376,33 @@ innodb_log_checksums_update( ...@@ -19376,6 +19376,33 @@ innodb_log_checksums_update(
thd, *static_cast<const my_bool*>(save)); thd, *static_cast<const my_bool*>(save));
} }
#ifdef UNIV_DEBUG
static
void
innobase_debug_sync_callback(srv_slot_t *slot, const void *value)
{
const char *value_str = *static_cast<const char* const*>(value);
size_t len = strlen(value_str) + 1;
// One allocatoin for list node object and value.
void *buf = ut_malloc_nokey(sizeof(srv_slot_t::debug_sync_t) + len);
srv_slot_t::debug_sync_t *sync = new(buf) srv_slot_t::debug_sync_t();
strcpy(sync->str, value_str);
rw_lock_x_lock(&slot->debug_sync_lock);
UT_LIST_ADD_LAST(slot->debug_sync, sync);
rw_lock_x_unlock(&slot->debug_sync_lock);
}
static
void
innobase_debug_sync_set(THD *thd, st_mysql_sys_var*, void *, const void *value)
{
srv_for_each_thread(SRV_WORKER, innobase_debug_sync_callback, value);
srv_for_each_thread(SRV_PURGE, innobase_debug_sync_callback, value);
}
#endif
static SHOW_VAR innodb_status_variables_export[]= { static SHOW_VAR innodb_status_variables_export[]= {
{"Innodb", (char*) &show_innodb_vars, SHOW_FUNC}, {"Innodb", (char*) &show_innodb_vars, SHOW_FUNC},
{NullS, NullS, SHOW_LONG} {NullS, NullS, SHOW_LONG}
...@@ -19881,7 +19908,8 @@ static MYSQL_SYSVAR_ULONG(purge_threads, srv_n_purge_threads, ...@@ -19881,7 +19908,8 @@ static MYSQL_SYSVAR_ULONG(purge_threads, srv_n_purge_threads,
NULL, NULL, NULL, NULL,
4, /* Default setting */ 4, /* Default setting */
1, /* Minimum value */ 1, /* Minimum value */
32, 0); /* Maximum value */ srv_max_purge_threads,/* Maximum value */
0);
static MYSQL_SYSVAR_ULONG(sync_array_size, srv_sync_array_size, static MYSQL_SYSVAR_ULONG(sync_array_size, srv_sync_array_size,
PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_READONLY, PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_READONLY,
...@@ -20982,6 +21010,16 @@ static MYSQL_SYSVAR_BOOL(debug_force_scrubbing, ...@@ -20982,6 +21010,16 @@ static MYSQL_SYSVAR_BOOL(debug_force_scrubbing,
0, 0,
"Perform extra scrubbing to increase test exposure", "Perform extra scrubbing to increase test exposure",
NULL, NULL, FALSE); NULL, NULL, FALSE);
char *innobase_debug_sync;
static MYSQL_SYSVAR_STR(debug_sync, innobase_debug_sync,
PLUGIN_VAR_NOCMDARG,
"debug_sync for innodb purge threads. "
"Use it t oset up sync points for all purge threads "
"at once. The commands will be applied sequentially at "
"the beginning of purging the next node ",
NULL,
innobase_debug_sync_set, NULL);
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
static MYSQL_SYSVAR_BOOL(instrument_semaphores, innodb_instrument_semaphores, static MYSQL_SYSVAR_BOOL(instrument_semaphores, innodb_instrument_semaphores,
...@@ -21204,6 +21242,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= { ...@@ -21204,6 +21242,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(background_scrub_data_check_interval), MYSQL_SYSVAR(background_scrub_data_check_interval),
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
MYSQL_SYSVAR(debug_force_scrubbing), MYSQL_SYSVAR(debug_force_scrubbing),
MYSQL_SYSVAR(debug_sync),
#endif #endif
MYSQL_SYSVAR(instrument_semaphores), MYSQL_SYSVAR(instrument_semaphores),
MYSQL_SYSVAR(buf_dump_status_frequency), MYSQL_SYSVAR(buf_dump_status_frequency),
......
...@@ -385,6 +385,9 @@ struct que_thr_t{ ...@@ -385,6 +385,9 @@ struct que_thr_t{
related delete/updates */ related delete/updates */
row_prebuilt_t* prebuilt; /*!< prebuilt structure processed by row_prebuilt_t* prebuilt; /*!< prebuilt structure processed by
the query thread */ the query thread */
ut_d(srv_slot_t *thread_slot;) /*!< a slot from srv_sys.sys_threads
* if any */
}; };
#define QUE_THR_MAGIC_N 8476583 #define QUE_THR_MAGIC_N 8476583
......
...@@ -615,6 +615,8 @@ extern ulong srv_fatal_semaphore_wait_threshold; ...@@ -615,6 +615,8 @@ extern ulong srv_fatal_semaphore_wait_threshold;
/** Buffer pool dump status frequence in percentages */ /** Buffer pool dump status frequence in percentages */
extern ulong srv_buf_dump_status_frequency; extern ulong srv_buf_dump_status_frequency;
constexpr uint srv_max_purge_threads = 32;
# ifdef UNIV_PFS_THREAD # ifdef UNIV_PFS_THREAD
/* Keys to register InnoDB threads with performance schema */ /* Keys to register InnoDB threads with performance schema */
extern mysql_pfs_key_t buf_dump_thread_key; extern mysql_pfs_key_t buf_dump_thread_key;
...@@ -1128,8 +1130,26 @@ struct srv_slot_t{ ...@@ -1128,8 +1130,26 @@ struct srv_slot_t{
to do */ to do */
que_thr_t* thr; /*!< suspended query thread que_thr_t* thr; /*!< suspended query thread
(only used for user threads) */ (only used for user threads) */
#ifdef UNIV_DEBUG
struct debug_sync_t {
UT_LIST_NODE_T(debug_sync_t)
debug_sync_list;
char str[0];
};
UT_LIST_BASE_NODE_T(debug_sync_t)
debug_sync;
rw_lock_t debug_sync_lock;
#endif
}; };
#ifdef UNIV_DEBUG
typedef void srv_slot_callback_t(srv_slot_t*, const void*);
void srv_for_each_thread(srv_thread_type type,
srv_slot_callback_t callback,
const void *arg);
#endif
#ifdef WITH_WSREP #ifdef WITH_WSREP
UNIV_INTERN UNIV_INTERN
void void
......
...@@ -67,7 +67,12 @@ trx_purge( ...@@ -67,7 +67,12 @@ trx_purge(
submit to task queue. */ submit to task queue. */
ulint limit, /*!< in: the maximum number of ulint limit, /*!< in: the maximum number of
records to purge in one batch */ records to purge in one batch */
bool truncate); /*!< in: truncate history if true */ bool truncate /*!< in: truncate history if true */
#ifdef UNIV_DEBUG
, srv_slot_t *slot /*!< in/out: purge coordinator
thread slot */
#endif
);
/*******************************************************************//** /*******************************************************************//**
Stop purge and wait for it to stop, move to PURGE_STATE_STOP. */ Stop purge and wait for it to stop, move to PURGE_STATE_STOP. */
void void
......
...@@ -1082,6 +1082,9 @@ que_run_threads_low( ...@@ -1082,6 +1082,9 @@ que_run_threads_low(
ut_a(thr_get_trx(thr)->error_state == DB_SUCCESS); ut_a(thr_get_trx(thr)->error_state == DB_SUCCESS);
ut_ad(!trx_mutex_own(thr_get_trx(thr))); ut_ad(!trx_mutex_own(thr_get_trx(thr)));
/* slot can be received from purge thread for debug_sync setup */
ut_d(srv_slot_t *slot = thr->thread_slot);
/* cumul_resource counts how much resources the OS thread (NOT the /* cumul_resource counts how much resources the OS thread (NOT the
query thread) has spent in this function */ query thread) has spent in this function */
......
...@@ -46,6 +46,7 @@ Created 3/14/1997 Heikki Tuuri ...@@ -46,6 +46,7 @@ Created 3/14/1997 Heikki Tuuri
#include "handler.h" #include "handler.h"
#include "ha_innodb.h" #include "ha_innodb.h"
#include "fil0fil.h" #include "fil0fil.h"
#include "debug_sync.h"
/************************************************************************* /*************************************************************************
IMPORTANT NOTE: Any operation that generates redo MUST check that there IMPORTANT NOTE: Any operation that generates redo MUST check that there
...@@ -1209,6 +1210,25 @@ row_purge_step( ...@@ -1209,6 +1210,25 @@ row_purge_step(
node->start(); node->start();
#ifdef UNIV_DEBUG
srv_slot_t *slot = thr->thread_slot;
ut_ad(slot);
rw_lock_x_lock(&slot->debug_sync_lock);
while (UT_LIST_GET_LEN(slot->debug_sync)) {
srv_slot_t::debug_sync_t *sync =
UT_LIST_GET_FIRST(slot->debug_sync);
bool result = debug_sync_set_action(current_thd,
sync->str,
strlen(sync->str));
ut_a(!result);
UT_LIST_REMOVE(slot->debug_sync, sync);
ut_free(sync);
}
rw_lock_x_unlock(&slot->debug_sync_lock);
#endif
if (!(node->undo_recs == NULL || ib_vector_is_empty(node->undo_recs))) { if (!(node->undo_recs == NULL || ib_vector_is_empty(node->undo_recs))) {
trx_purge_rec_t*purge_rec; trx_purge_rec_t*purge_rec;
......
...@@ -619,7 +619,8 @@ struct srv_sys_t{ ...@@ -619,7 +619,8 @@ struct srv_sys_t{
ulint n_sys_threads; /*!< size of the sys_threads ulint n_sys_threads; /*!< size of the sys_threads
array */ array */
srv_slot_t sys_threads[32 + 1]; /*!< server thread table; srv_slot_t
sys_threads[srv_max_purge_threads + 1]; /*!< server thread table;
os_event_set() and os_event_set() and
os_event_reset() on os_event_reset() on
sys_threads[]->event are sys_threads[]->event are
...@@ -661,11 +662,15 @@ and/or load it during startup. */ ...@@ -661,11 +662,15 @@ and/or load it during startup. */
char srv_buffer_pool_dump_at_shutdown = TRUE; char srv_buffer_pool_dump_at_shutdown = TRUE;
char srv_buffer_pool_load_at_startup = TRUE; char srv_buffer_pool_load_at_startup = TRUE;
/** Slot index in the srv_sys.sys_threads array for the master thread. */
constexpr ulint SRV_MASTER_SLOT = 0;
/** Slot index in the srv_sys.sys_threads array for the purge thread. */ /** Slot index in the srv_sys.sys_threads array for the purge thread. */
static const ulint SRV_PURGE_SLOT = 1; constexpr ulint SRV_PURGE_SLOT = 1;
/** Slot index in the srv_sys.sys_threads array for the master thread. */ /** Slot index in the srv_sys.sys_threads array from which purge workers start.
static const ulint SRV_MASTER_SLOT = 0; */
constexpr ulint SRV_WORKER_SLOTS_START = 2;
#ifdef HAVE_PSI_STAGE_INTERFACE #ifdef HAVE_PSI_STAGE_INTERFACE
/** Performance schema stage event for monitoring ALTER TABLE progress /** Performance schema stage event for monitoring ALTER TABLE progress
...@@ -813,7 +818,7 @@ srv_reserve_slot( ...@@ -813,7 +818,7 @@ srv_reserve_slot(
case SRV_WORKER: case SRV_WORKER:
/* Find an empty slot, skip the master and purge slots. */ /* Find an empty slot, skip the master and purge slots. */
for (slot = &srv_sys.sys_threads[2]; for (slot = &srv_sys.sys_threads[SRV_WORKER_SLOTS_START];
slot->in_use; slot->in_use;
++slot) { ++slot) {
...@@ -2530,10 +2535,11 @@ srv_purge_should_exit(ulint n_purged) ...@@ -2530,10 +2535,11 @@ srv_purge_should_exit(ulint n_purged)
/*********************************************************************//** /*********************************************************************//**
Fetch and execute a task from the work queue. Fetch and execute a task from the work queue.
@param [in,out] slot purge worker thread slot
@return true if a task was executed */ @return true if a task was executed */
static static
bool bool
srv_task_execute(void) srv_task_execute(ut_d(srv_slot_t *slot))
/*==================*/ /*==================*/
{ {
que_thr_t* thr = NULL; que_thr_t* thr = NULL;
...@@ -2555,6 +2561,7 @@ srv_task_execute(void) ...@@ -2555,6 +2561,7 @@ srv_task_execute(void)
mutex_exit(&srv_sys.tasks_mutex); mutex_exit(&srv_sys.tasks_mutex);
if (thr != NULL) { if (thr != NULL) {
ut_d(thr->thread_slot = slot);
que_run_threads(thr); que_run_threads(thr);
...@@ -2591,6 +2598,13 @@ DECLARE_THREAD(srv_worker_thread)( ...@@ -2591,6 +2598,13 @@ DECLARE_THREAD(srv_worker_thread)(
slot = srv_reserve_slot(SRV_WORKER); slot = srv_reserve_slot(SRV_WORKER);
#ifdef UNIV_DEBUG
UT_LIST_INIT(slot->debug_sync,
&srv_slot_t::debug_sync_t::debug_sync_list);
rw_lock_create(PFS_NOT_INSTRUMENTED, &slot->debug_sync_lock,
SYNC_NO_ORDER_CHECK);
#endif
ut_a(srv_n_purge_threads > 1); ut_a(srv_n_purge_threads > 1);
ut_a(ulong(my_atomic_loadlint(&srv_sys.n_threads_active[SRV_WORKER])) ut_a(ulong(my_atomic_loadlint(&srv_sys.n_threads_active[SRV_WORKER]))
< srv_n_purge_threads); < srv_n_purge_threads);
...@@ -2603,7 +2617,7 @@ DECLARE_THREAD(srv_worker_thread)( ...@@ -2603,7 +2617,7 @@ DECLARE_THREAD(srv_worker_thread)(
srv_suspend_thread(slot); srv_suspend_thread(slot);
srv_resume_thread(slot); srv_resume_thread(slot);
if (srv_task_execute()) { if (srv_task_execute(ut_d(slot))) {
/* If there are tasks in the queue, wakeup /* If there are tasks in the queue, wakeup
the purge coordinator thread. */ the purge coordinator thread. */
...@@ -2640,10 +2654,15 @@ DECLARE_THREAD(srv_worker_thread)( ...@@ -2640,10 +2654,15 @@ DECLARE_THREAD(srv_worker_thread)(
/** Do the actual purge operation. /** Do the actual purge operation.
@param[in,out] n_total_purged total number of purged pages @param[in,out] n_total_purged total number of purged pages
@param[in,out] slot purge coordinator thread slot
@return length of history list before the last purge batch. */ @return length of history list before the last purge batch. */
static static
ulint ulint
srv_do_purge(ulint* n_total_purged) srv_do_purge(ulint* n_total_purged
#ifdef UNIV_DEBUG
, srv_slot_t *slot
#endif
)
{ {
ulint n_pages_purged; ulint n_pages_purged;
...@@ -2709,7 +2728,11 @@ srv_do_purge(ulint* n_total_purged) ...@@ -2709,7 +2728,11 @@ srv_do_purge(ulint* n_total_purged)
n_pages_purged = trx_purge( n_pages_purged = trx_purge(
n_use_threads, srv_purge_batch_size, n_use_threads, srv_purge_batch_size,
(++count % rseg_truncate_frequency) == 0); (++count % rseg_truncate_frequency) == 0
#ifdef UNIV_DEBUG
, slot
#endif
);
*n_total_purged += n_pages_purged; *n_total_purged += n_pages_purged;
} while (!srv_purge_should_exit(n_pages_purged) } while (!srv_purge_should_exit(n_pages_purged)
...@@ -2829,6 +2852,12 @@ DECLARE_THREAD(srv_purge_coordinator_thread)( ...@@ -2829,6 +2852,12 @@ DECLARE_THREAD(srv_purge_coordinator_thread)(
slot = srv_reserve_slot(SRV_PURGE); slot = srv_reserve_slot(SRV_PURGE);
#ifdef UNIV_DEBUG
UT_LIST_INIT(slot->debug_sync,
&srv_slot_t::debug_sync_t::debug_sync_list);
rw_lock_create(PFS_NOT_INSTRUMENTED, &slot->debug_sync_lock,
SYNC_NO_ORDER_CHECK);
#endif
ulint rseg_history_len = trx_sys->rseg_history_len; ulint rseg_history_len = trx_sys->rseg_history_len;
do { do {
...@@ -2851,7 +2880,11 @@ DECLARE_THREAD(srv_purge_coordinator_thread)( ...@@ -2851,7 +2880,11 @@ DECLARE_THREAD(srv_purge_coordinator_thread)(
n_total_purged = 0; n_total_purged = 0;
rseg_history_len = srv_do_purge(&n_total_purged); rseg_history_len = srv_do_purge(&n_total_purged
#ifdef UNIV_DEBUG
, slot
#endif
);
} while (!srv_purge_should_exit(n_total_purged)); } while (!srv_purge_should_exit(n_total_purged));
/* The task queue should always be empty, independent of fast /* The task queue should always be empty, independent of fast
...@@ -3012,3 +3045,35 @@ srv_was_tablespace_truncated(const fil_space_t* space) ...@@ -3012,3 +3045,35 @@ srv_was_tablespace_truncated(const fil_space_t* space)
return (!is_system_tablespace(space->id) return (!is_system_tablespace(space->id)
&& truncate_t::was_tablespace_truncated(space->id)); && truncate_t::was_tablespace_truncated(space->id));
} }
#ifdef UNIV_DEBUG
static uint get_first_slot(srv_thread_type type)
{
switch (type) {
case SRV_MASTER:
return SRV_MASTER_SLOT;
case SRV_PURGE:
return SRV_PURGE_SLOT;
case SRV_WORKER:
/* Find an empty slot, skip the master and purge slots. */
return SRV_WORKER_SLOTS_START;
default:
ut_error;
}
}
void srv_for_each_thread(srv_thread_type type,
srv_slot_callback_t callback,
const void *arg)
{
int slot_idx= get_first_slot(type);
while(slot_idx < srv_sys.n_sys_threads
&& srv_sys.sys_threads[slot_idx].in_use
&& srv_sys.sys_threads[slot_idx].type == type)
{
srv_slot_t *slot= &srv_sys.sys_threads[slot_idx];
callback(slot, arg);
slot_idx++;
}
}
#endif
...@@ -1675,7 +1675,12 @@ trx_purge( ...@@ -1675,7 +1675,12 @@ trx_purge(
to submit to the work queue */ to submit to the work queue */
ulint batch_size, /*!< in: the maximum number of records ulint batch_size, /*!< in: the maximum number of records
to purge in one batch */ to purge in one batch */
bool truncate) /*!< in: truncate history if true */ bool truncate /*!< in: truncate history if true */
#ifdef UNIV_DEBUG
, srv_slot_t *slot /*!< in/out: purge coordinator
thread slot */
#endif
)
{ {
que_thr_t* thr = NULL; que_thr_t* thr = NULL;
ulint n_pages_handled; ulint n_pages_handled;
...@@ -1730,6 +1735,7 @@ trx_purge( ...@@ -1730,6 +1735,7 @@ trx_purge(
run_synchronously: run_synchronously:
++purge_sys->n_submitted; ++purge_sys->n_submitted;
ut_d(thr->thread_slot = slot);
que_run_threads(thr); que_run_threads(thr);
my_atomic_addlint( my_atomic_addlint(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment