Commit 5850b0ed authored by sbains's avatar sbains

branches/innodb+: Create a separate purge thread to do the purge. Introduce

two new configuration parameters.

  1. innodb-purge-threads := [01] -- default is 0
  2. innodb-purge-batch-size := 20 ... 5000 -- default is 20

rb://271
parent 20f9d908
......@@ -10669,6 +10669,23 @@ static MYSQL_SYSVAR_ULONG(io_capacity, srv_io_capacity,
"Number of IOPs the server can do. Tunes the background IO rate",
NULL, NULL, 200, 100, ~0L, 0);
static MYSQL_SYSVAR_ULONG(purge_batch_size, srv_purge_batch_size,
PLUGIN_VAR_OPCMDARG,
"Number of UNDO logs to purge in one batch from the history list. "
"Default is 20",
NULL, NULL,
20, /* Default setting */
1, /* Minimum value */
5000, 0); /* Maximum value */
static MYSQL_SYSVAR_ULONG(purge_threads, srv_n_purge_threads,
PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_READONLY,
"Purge threads can be either 0 or 1. Default is 0.",
NULL, NULL,
0, /* Default setting */
0, /* Minimum value */
1, 0); /* Maximum value */
static MYSQL_SYSVAR_ULONG(fast_shutdown, innobase_fast_shutdown,
PLUGIN_VAR_OPCMDARG,
"Speeds up the shutdown process of the InnoDB storage engine. Possible "
......@@ -10982,6 +10999,8 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(change_buffering),
MYSQL_SYSVAR(read_ahead_threshold),
MYSQL_SYSVAR(io_capacity),
MYSQL_SYSVAR(purge_threads),
MYSQL_SYSVAR(purge_batch_size),
NULL
};
......
......@@ -271,6 +271,12 @@ extern ulint srv_os_log_pending_writes;
log buffer and have to flush it */
extern ulint srv_log_waits;
/* the number of purge threads to use from the worker pool (currently 0 or 1) */
extern ulint srv_n_purge_threads;
/* the number of records to purge in one batch */
extern ulint srv_purge_batch_size;
/* variable that counts amount of data read in total (in bytes) */
extern ulint srv_data_read;
......@@ -483,6 +489,12 @@ srv_master_thread(
void* arg); /*!< in: a dummy parameter required by
os_thread_create */
/*******************************************************************//**
Wakes up the purge thread if it's not already awake. */
UNIV_INTERN
void
srv_wake_purge_thread(void);
/*=======================*/
/*******************************************************************//**
Tells the Innobase server that there has been activity in the database
and wakes up the master thread if it is suspended (not sleeping). Used
in the MySQL interface. Note that there is a small chance that the master
......@@ -498,6 +510,16 @@ UNIV_INTERN
void
srv_wake_master_thread(void);
/*========================*/
/*******************************************************************//**
Tells the purge thread that there has been activity in the database
and wakes up the purge thread if it is suspended (not sleeping). Note
that there is a small chance that the purge thread stays suspended
(we do not protect our operation with the kernel mutex, for
performace reasons). */
UNIV_INTERN
void
srv_wake_purge_thread_if_not_active(void);
/*=====================================*/
/*********************************************************************//**
Puts an OS thread to wait if there are too many concurrent threads
(>= srv_thread_concurrency) inside InnoDB. The threads wait in a FIFO queue. */
......@@ -604,6 +626,15 @@ void
srv_export_innodb_status(void);
/*==========================*/
/*********************************************************************//**
Asynchronous purge thread.
@return a dummy parameter */
UNIV_INTERN
os_thread_ret_t
srv_purge_thread(
/*=============*/
void* arg __attribute__((unused))); /*!< in: a dummy parameter
required by os_thread_create */
/** Thread slot in the thread table */
typedef struct srv_slot_struct srv_slot_t;
......
......@@ -112,8 +112,10 @@ This function runs a purge batch.
@return number of undo log pages handled in the batch */
UNIV_INTERN
ulint
trx_purge(void);
/*===========*/
trx_purge(
/*======*/
ulint limit); /*!< in: the maximum number of records to
purge in one batch */
/******************************************************************//**
Prints information of the purge system to stderr. */
UNIV_INTERN
......
......@@ -246,6 +246,12 @@ that during a time of heavy update/insert activity. */
UNIV_INTERN ulong srv_max_buf_pool_modified_pct = 75;
/* the number of purge threads to use from the worker pool (currently 0 or 1).*/
UNIV_INTERN ulint srv_n_purge_threads = 0;
/* the number of records to purge in one batch */
UNIV_INTERN ulint srv_purge_batch_size = 20;
/* variable counts amount of data read in total (in bytes) */
UNIV_INTERN ulint srv_data_read = 0;
......@@ -704,6 +710,16 @@ are indexed by the type of the thread. */
UNIV_INTERN ulint srv_n_threads_active[SRV_MASTER + 1];
UNIV_INTERN ulint srv_n_threads[SRV_MASTER + 1];
/*********************************************************************//**
Asynchronous purge thread.
@return a dummy parameter */
UNIV_INTERN
os_thread_ret_t
srv_purge_thread(
/*=============*/
void* arg __attribute__((unused))); /*!< in: a dummy parameter
required by os_thread_create */
/***********************************************************************
Prints counters for work done by srv_master_thread. */
static
......@@ -2369,6 +2385,30 @@ srv_active_wake_master_thread(void)
}
}
/*******************************************************************//**
Tells the purge thread that there has been activity in the database
and wakes up the purge thread if it is suspended (not sleeping). Note
that there is a small chance that the purge thread stays suspended
(we do not protect our operation with the kernel mutex, for
performace reasons). */
UNIV_INTERN
void
srv_wake_purge_thread_if_not_active(void)
/*=====================================*/
{
ut_ad(!mutex_own(&kernel_mutex));
if (srv_n_purge_threads > 0
&& srv_n_threads_active[SRV_WORKER] == 0) {
mutex_enter(&kernel_mutex);
srv_release_threads(SRV_WORKER, 1);
mutex_exit(&kernel_mutex);
}
}
/*******************************************************************//**
Wakes up the master thread if it is suspended or being suspended. */
UNIV_INTERN
......@@ -2385,6 +2425,25 @@ srv_wake_master_thread(void)
mutex_exit(&kernel_mutex);
}
/*******************************************************************//**
Wakes up the purge thread if it's not already awake. */
UNIV_INTERN
void
srv_wake_purge_thread(void)
/*=======================*/
{
ut_ad(!mutex_own(&kernel_mutex));
if (srv_n_purge_threads > 0) {
mutex_enter(&kernel_mutex);
srv_release_threads(SRV_WORKER, 1);
mutex_exit(&kernel_mutex);
}
}
/**********************************************************************
The master thread is tasked to ensure that flush of log file happens
once every second in the background. This is to ensure that not more
......@@ -2405,6 +2464,34 @@ srv_sync_log_buffer_in_background(void)
}
}
/********************************************************************//**
Do a full purge, reconfigure the purge sub-system if a dynamic
change is detected. */
static
void
srv_master_do_purge(void)
/*=====================*/
{
ulint n_pages_purged;
ut_ad(!mutex_own(&kernel_mutex));
ut_a(srv_n_purge_threads == 0);
do {
/* Check for shutdown and change in purge config. */
if (srv_fast_shutdown && srv_shutdown_state > 0) {
/* Nothing to purge. */
n_pages_purged = 0;
} else {
n_pages_purged = trx_purge(srv_purge_batch_size);
}
srv_sync_log_buffer_in_background();
} while (n_pages_purged > 0);
}
/*********************************************************************//**
The master thread controlling the server.
@return a dummy parameter */
......@@ -2620,20 +2707,16 @@ loop:
/* We run a full purge every 10 seconds, even if the server
were active */
do {
if (srv_n_purge_threads == 0) {
srv_main_thread_op_info = "master purging";
srv_master_do_purge();
if (srv_fast_shutdown && srv_shutdown_state > 0) {
goto background_loop;
}
srv_main_thread_op_info = "purging";
n_pages_purged = trx_purge();
/* Flush logs if needed */
srv_sync_log_buffer_in_background();
} while (n_pages_purged);
}
srv_main_thread_op_info = "flushing buffer pool pages";
......@@ -2702,23 +2785,12 @@ background_loop:
os_thread_sleep(100000);
}
srv_main_thread_op_info = "purging";
if (srv_n_purge_threads == 0) {
srv_main_thread_op_info = "master purging";
/* Run a full purge */
do {
if (srv_fast_shutdown && srv_shutdown_state > 0) {
break;
srv_master_do_purge();
}
srv_main_thread_op_info = "purging";
n_pages_purged = trx_purge();
/* Flush logs if needed */
srv_sync_log_buffer_in_background();
} while (n_pages_purged);
srv_main_thread_op_info = "reserving kernel mutex";
mutex_enter(&kernel_mutex);
......@@ -2871,3 +2943,100 @@ suspend_thread:
OS_THREAD_DUMMY_RETURN; /* Not reached, avoid compiler warning */
}
/*********************************************************************//**
Asynchronous purge thread.
@return a dummy parameter */
UNIV_INTERN
os_thread_ret_t
srv_purge_thread(
/*=============*/
void* arg __attribute__((unused))) /*!< in: a dummy parameter
required by os_thread_create */
{
srv_slot_t* slot;
ulint slot_no = ULINT_UNDEFINED;
ulint n_total_purged = ULINT_UNDEFINED;
ut_a(srv_n_purge_threads == 1);
#ifdef UNIV_DEBUG_THREAD_CREATION
fprintf(stderr, "InnoDB: Purge thread running, id %lu\n",
os_thread_pf(os_thread_get_curr_id()));
#endif /* UNIV_DEBUG_THREAD_CREATION */
mutex_enter(&kernel_mutex);
slot_no = srv_table_reserve_slot(SRV_WORKER);
++srv_n_threads_active[SRV_WORKER];
mutex_exit(&kernel_mutex);
while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) {
ulint n_pages_purged;
/* If there are very few records to purge or the last
purge didn't purge any records then wait for activity.
We peek at the history len without holding any mutex
because in the worst case we will end up waiting for
the next purge event. */
if (trx_sys->rseg_history_len < srv_purge_batch_size
|| n_total_purged == 0) {
os_event_t event;
mutex_enter(&kernel_mutex);
event = srv_suspend_thread();
mutex_exit(&kernel_mutex);
os_event_wait(event);
}
/* Check for shutdown and whether we should do purge at all. */
if (srv_force_recovery >= SRV_FORCE_NO_BACKGROUND
|| srv_shutdown_state != 0
|| srv_fast_shutdown) {
break;
}
n_total_purged = 0;
/* Purge until there are no more records to purge and there is
no change in configuration or server state. */
do {
n_pages_purged = trx_purge(srv_purge_batch_size);
n_total_purged += n_pages_purged;
} while (n_pages_purged > 0 && !srv_fast_shutdown);
srv_sync_log_buffer_in_background();
}
/* Free the thread local memory. */
thr_local_free(os_thread_get_curr_id());
mutex_enter(&kernel_mutex);
/* Free the slot for reuse. */
slot = srv_table_get_nth_slot(slot_no);
slot->in_use = FALSE;
mutex_exit(&kernel_mutex);
#ifdef UNIV_DEBUG_THREAD_CREATION
fprintf(stderr, "InnoDB: Purge thread exiting, id %lu\n",
os_thread_pf(os_thread_get_curr_id()));
#endif /* UNIV_DEBUG_THREAD_CREATION */
/* We count the number of threads in os_thread_exit(). A created
thread should always use that to exit and not use return() to exit. */
os_thread_exit(NULL);
OS_THREAD_DUMMY_RETURN; /* Not reached, avoid compiler warning */
}
......@@ -1742,6 +1742,16 @@ innobase_start_or_create_for_mysql(void)
os_thread_create(&srv_master_thread, NULL, thread_ids
+ (1 + SRV_MAX_N_IO_THREADS));
/* Currently we allow only a single purge thread. */
ut_a(srv_n_purge_threads == 0 || srv_n_purge_threads == 1);
/* If the user has requested a separate purge thread then
start the purge thread. */
if (srv_n_purge_threads == 1) {
os_thread_create(&srv_purge_thread, NULL, NULL);
}
#ifdef UNIV_DEBUG
/* buf_debug_prints = TRUE; */
#endif /* UNIV_DEBUG */
......@@ -1995,7 +2005,10 @@ innobase_shutdown_for_mysql(void)
/* c. We wake the master thread so that it exits */
srv_wake_master_thread();
/* d. Exit the i/o threads */
/* d. We wake the purge thread so that it exits */
srv_wake_purge_thread();
/* e. Exit the i/o threads */
os_aio_wake_all_threads_at_shutdown();
......
......@@ -41,7 +41,7 @@ Created 3/26/1996 Heikki Tuuri
#include "row0purge.h"
#include "row0upd.h"
#include "trx0rec.h"
#include "srv0que.h"
#include "srv0srv.h"
#include "os0thread.h"
/** The global data structure coordinating a purge */
......@@ -364,6 +364,11 @@ trx_purge_add_update_undo_to_history(
trx_sys->rseg_history_len++;
mutex_exit(&kernel_mutex);
if (!(trx_sys->rseg_history_len % srv_purge_batch_size)) {
/* Inform the purge thread that there is work to do. */
srv_wake_purge_thread_if_not_active();
}
/* Write the trx number to the undo log header */
mlog_write_dulint(undo_header + TRX_UNDO_TRX_NO, trx->no, mtr);
/* Write information about delete markings to the undo log header */
......@@ -1096,8 +1101,10 @@ This function runs a purge batch.
@return number of undo log pages handled in the batch */
UNIV_INTERN
ulint
trx_purge(void)
/*===========*/
trx_purge(
/*======*/
ulint limit) /*!< in: the maximum number of records to
purge in one batch */
{
que_thr_t* thr;
/* que_thr_t* thr2; */
......@@ -1158,9 +1165,7 @@ trx_purge(void)
purge_sys->state = TRX_PURGE_ON;
/* Handle at most 20 undo log pages in one purge batch */
purge_sys->handle_limit = purge_sys->n_pages_handled + 20;
purge_sys->handle_limit = purge_sys->n_pages_handled + limit;
old_pages_handled = purge_sys->n_pages_handled;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment