Commit 349edf27 authored by Mikael Ronstrom's avatar Mikael Ronstrom

merged Google background IO threads

parents 11b95b5d 008a13b8
......@@ -140,6 +140,14 @@ static long innobase_io_capacity = 100;
/* Write dirty pages when pct dirty is less than max pct dirty */
static my_bool innobase_extra_dirty_writes = TRUE;
/* Max number of IO requests merged to perform large IO in background
IO threads.
*/
long innobase_max_merged_io = 64;
/* Number of background IO threads for read and write. */
long innobase_read_io_threads, innobase_write_io_threads;
/* The following counter is used to convey information to InnoDB
about server activity: in selects it is not sensible to call
srv_active_wake_master_thread after each fetch or search, we only do
......@@ -1616,6 +1624,9 @@ innobase_init(
srv_mem_pool_size = (ulint) innobase_additional_mem_pool_size;
srv_n_file_io_threads = (ulint) innobase_file_io_threads;
srv_n_read_io_threads = (ulint) innobase_read_io_threads;
srv_n_write_io_threads = (ulint) innobase_write_io_threads;
srv_max_merged_io = (ulint) innobase_max_merged_io;
srv_lock_wait_timeout = (ulint) innobase_lock_wait_timeout;
srv_force_recovery = (ulint) innobase_force_recovery;
......@@ -8138,6 +8149,21 @@ static MYSQL_SYSVAR_LONG(file_io_threads, innobase_file_io_threads,
"Number of file I/O threads in InnoDB.",
NULL, NULL, 4, 4, 64, 0);
/* Startup options for the background IO threads. All three are flagged
PLUGIN_VAR_READONLY and are backed by the innobase_* globals that
innobase_init copies into srv_n_write_io_threads, srv_n_read_io_threads and
srv_max_merged_io.
NOTE(review): the trailing numeric arguments are presumably default, minimum,
maximum and block size -- confirm against the MYSQL_SYSVAR_LONG definition. */

/* Number of write IO threads: values 1, 1, 64 (see note above). */
static MYSQL_SYSVAR_LONG(write_io_threads, innobase_write_io_threads,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"Number of write I/O threads in InnoDB.",
NULL, NULL, 1, 1, 64, 0);
/* Number of read IO threads: values 1, 1, 64 (see note above). */
static MYSQL_SYSVAR_LONG(read_io_threads, innobase_read_io_threads,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"Number of read I/O threads in InnoDB.",
NULL, NULL, 1, 1, 64, 0);
/* Upper bound on adjacent IO requests merged into one large request:
values 64, 1, 64 (see note above). */
static MYSQL_SYSVAR_LONG(max_merged_io, innobase_max_merged_io,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"Max number of adjacent IO requests to merge in InnoDB.",
NULL, NULL, 64, 1, 64, 0);
static MYSQL_SYSVAR_LONG(force_recovery, innobase_force_recovery,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"Helps to save your data in case the disk image of the database becomes corrupt.",
......@@ -8217,6 +8243,9 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(doublewrite),
MYSQL_SYSVAR(fast_shutdown),
MYSQL_SYSVAR(file_io_threads),
MYSQL_SYSVAR(read_io_threads),
MYSQL_SYSVAR(write_io_threads),
MYSQL_SYSVAR(max_merged_io),
MYSQL_SYSVAR(file_per_table),
MYSQL_SYSVAR(flush_log_at_trx_commit),
MYSQL_SYSVAR(flush_method),
......
......@@ -535,21 +535,19 @@ os_file_create_subdirs_if_needed(
FALSE otherwise */
const char* path); /* in: path name */
/****************************************************************************
Initializes the asynchronous io system. Creates separate aio array for
non-ibuf read and write, a third aio array for the ibuf i/o, with just one
segment, two aio arrays for log reads and writes with one segment, and a
synchronous aio array of the specified size. The combined number of segments
in the three first aio arrays is the parameter n_segments given to the
function. The caller must create an i/o handler thread for each segment in
the four first arrays, but not for the sync aio array. */
Initializes the asynchronous io system. Creates n_read_threads segments for
reads, n_write_threads segments for writes, one segment for the ibuf i/o, and
one segment for log IO. Returns the number of segments created. When native
async IO is not used, the caller must create one i/o handler thread per
segment (at least 4 in total) to process the requests put in the segments. */
void
ulint
os_aio_init(
/*========*/
ulint n, /* in: maximum number of pending aio operations
allowed; n must be divisible by n_segments */
ulint n_segments, /* in: combined number of segments in the four
first aio arrays; must be >= 4 */
ulint ios_per_array, /* in: maximum number of pending aio operations
allowed per array */
ulint n_read_threads, /* in: number of read threads */
ulint n_write_threads, /* in: number of write threads */
ulint n_slots_sync); /* in: number of slots in the sync aio array */
/***********************************************************************
Requests an asynchronous i/o operation. */
......
......@@ -90,6 +90,12 @@ extern ulint srv_mem_pool_size;
extern ulint srv_lock_table_size;
extern ulint srv_n_file_io_threads;
/* Number of background IO threads for read and write. Replaces
* srv_n_file_io_threads. */
extern ulint srv_n_read_io_threads;
extern ulint srv_n_write_io_threads;
/* Max number of adjacent IO requests to merge into one large request. */
extern ulint srv_max_merged_io;
/* Number of IO operations per second the server can do */
extern ulint srv_io_capacity;
......
......@@ -62,6 +62,28 @@ ibool os_aio_use_native_aio = FALSE;
ibool os_aio_print_debug = FALSE;
/* Status of an IO request held in a slot, used by simulated AIO only.

How a request moves through the states:
  client requests IO:     picks a slot with reserved = FALSE and records the
                          request with status = OS_AIO_NOT_ISSUED.
  IO thread wakes:        gathers adjacent slots that have reserved = TRUE and
                          status = OS_AIO_NOT_ISSUED, and switches each of
                          them to OS_AIO_ISSUED.
  IO operation completes: every slot of the batch is set to OS_AIO_DONE; the
                          first slot is then set to OS_AIO_CLAIMED and its
                          result is returned.

When several read and write threads exist, all of them compete for the
requests stored in the same array (os_aio_array_t). Requests therefore need no
load balancing when they are submitted; the price is that every thread is
woken whenever a request becomes available. */
typedef enum {
	OS_AIO_NOT_ISSUED = 0,	/* Waiting to be picked up by an IO thread. */
	OS_AIO_ISSUED = 1,	/* An IO thread is working on the request. */
	OS_AIO_DONE = 2,	/* The request has been processed. */
	OS_AIO_CLAIMED = 3	/* The result is being handed to the client. */
} os_aio_status;
/* The aio array slot structure */
typedef struct os_aio_slot_struct os_aio_slot_t;
......@@ -70,6 +92,8 @@ struct os_aio_slot_struct{
ulint pos; /* index of the slot in the aio
array */
ibool reserved; /* TRUE if this slot is reserved */
os_aio_status status; /* Status for current request. Valid when reserved
is TRUE. Used only in simulated aio. */
time_t reservation_time;/* time when reserved */
ulint len; /* length of the block to read or
write */
......@@ -80,11 +104,6 @@ struct os_aio_slot_struct{
ulint offset_high; /* 32 high bits of file offset */
os_file_t file; /* file where to read or write */
const char* name; /* file name or path */
ibool io_already_done;/* used only in simulated aio:
TRUE if the physical i/o already
made and only the slot message
needs to be passed to the caller
of os_aio_simulated_handle */
fil_node_t* message1; /* message which is given by the */
void* message2; /* the requester of an aio operation
and which can be used to identify
......@@ -114,9 +133,6 @@ struct os_aio_array_struct{
in this array */
ulint n_slots; /* Total number of slots in the aio array.
This must be divisible by n_threads. */
ulint n_segments;/* Number of segments in the aio array of
pending aio requests. A thread can wait
separately for any one of the segments. */
ulint n_reserved;/* Number of reserved slots in the
aio array outside the ibuf segment */
os_aio_slot_t* slots; /* Pointer to the slots in the array */
......@@ -133,6 +149,17 @@ struct os_aio_array_struct{
/* Array of events used in simulated aio */
os_event_t* os_aio_segment_wait_events = NULL;
/* Number of threads for reading and writing. */
ulint os_aio_read_threads = 0;
ulint os_aio_write_threads = 0;
/* Number for the first global segment for reading. */
const ulint os_aio_first_read_segment = 2;
/* Number for the first global segment for writing. Set to
os_aio_first_read_segment + os_aio_read_threads by os_aio_init. */
ulint os_aio_first_write_segment = 0;
/* The aio arrays for non-ibuf i/o and ibuf i/o, as well as sync aio. These
are NULL when the module has not yet been initialized. */
static os_aio_array_t* os_aio_read_array = NULL;
......@@ -141,11 +168,39 @@ static os_aio_array_t* os_aio_ibuf_array = NULL;
static os_aio_array_t* os_aio_log_array = NULL;
static os_aio_array_t* os_aio_sync_array = NULL;
/* Per thread state for the IO handler threads. Every array below is indexed
by global segment number, and os_aio_init zeroes the first n_segments entries
of each. */

/* Per thread buffer used for merged IO requests. Used by
os_aio_simulated_handle so that a buffer doesn't have to be allocated
for each request. The buffer is replaced by a larger one only when a merged
request needs more than os_aio_thread_buffer_size bytes. */
static char* os_aio_thread_buffer[SRV_MAX_N_IO_THREADS];
/* Size in bytes of the matching os_aio_thread_buffer entry; 0 while no
buffer has been allocated. */
static ulint os_aio_thread_buffer_size[SRV_MAX_N_IO_THREADS];
/* Count pages read and written per thread. Each read or write call adds the
number of slots that were merged into that call. */
static ulint os_aio_thread_io_reads[SRV_MAX_N_IO_THREADS];
static ulint os_aio_thread_io_writes[SRV_MAX_N_IO_THREADS];
/* Number of IO operations done. One request can be for N pages. Incremented
once per read or write call, however many slots were merged into it. */
static ulint os_aio_thread_io_requests[SRV_MAX_N_IO_THREADS];
/* usecs spent blocked on an IO request, accumulated over all requests */
static double os_aio_thread_io_wait[SRV_MAX_N_IO_THREADS];
/* max usecs spent blocked on a single IO request */
static double os_aio_thread_max_io_wait[SRV_MAX_N_IO_THREADS];
/* Number of IO global segments. An IO handler thread is created for each
global segment, except for the segment associated with os_aio_sync_array.
Several segments can be associated with os_aio_{read,write}_array. One
segment is created for each of the other arrays. This is also the number
of valid entries in srv_io_thread_reads, srv_io_thread_writes,
srv_io_thread_op_info, srv_io_thread_function and os_aio_segment_wait_events. */
static ulint os_aio_n_segments = ULINT_UNDEFINED;
/* If the following is TRUE, read i/o handler threads try to
wait until a batch of new read requests have been posted */
static ibool os_aio_recommend_sleep_for_read_threads = FALSE;
/* Set to TRUE to temporarily block reads from being scheduled while a batch
of read requests is added to allow them to be merged by the IO handler thread
if they are adjacent. Declared volatile because we don't want this to be
read from a register in a loop when another thread may change the value in
memory.
*/
static volatile ibool os_aio_recommend_sleep_for_read_threads = FALSE;
ulint os_n_file_reads = 0;
ulint os_bytes_read_since_printout = 0;
......@@ -165,6 +220,19 @@ ulint os_file_n_pending_pwrites = 0;
ulint os_n_pending_writes = 0;
ulint os_n_pending_reads = 0;
/* TODO -- does InnoDB provide a portable method for this? */
/* Returns the current time of day in microseconds, as reported by
gettimeofday(). Returns 0.0 when built with __WIN__ defined (no
implementation there) and also when gettimeofday() fails, so a caller that
subtracts two readings must be prepared for a zero or negative difference. */
static double time_usecs(void) {
#ifdef __WIN__
	return 0.0;
#else
	struct timeval tv;

	if (gettimeofday(&tv, NULL) != 0) {
		/* The clock could not be read: report "no time". */
		return 0.0;
	}

	/* Scale in floating point so the seconds part cannot overflow an
	integer type. */
	return tv.tv_sec * 1000000.0 + tv.tv_usec;
#endif
}
/***************************************************************************
Gets the operating system version. Currently works only on Windows. */
......@@ -2864,9 +2932,8 @@ os_aio_array_t*
os_aio_array_create(
/*================*/
/* out, own: aio array */
ulint n, /* in: maximum number of pending aio operations
allowed; n must be divisible by n_segments */
ulint n_segments) /* in: number of segments in the aio array */
ulint n) /* in: maximum number of pending aio operations
allowed */
{
os_aio_array_t* array;
ulint i;
......@@ -2875,7 +2942,6 @@ os_aio_array_create(
OVERLAPPED* over;
#endif
ut_a(n > 0);
ut_a(n_segments > 0);
array = ut_malloc(sizeof(os_aio_array_t));
......@@ -2886,7 +2952,6 @@ os_aio_array_create(
os_event_set(array->is_empty);
array->n_slots = n;
array->n_segments = n_segments;
array->n_reserved = 0;
array->slots = ut_malloc(n * sizeof(os_aio_slot_t));
#ifdef __WIN__
......@@ -2913,70 +2978,75 @@ os_aio_array_create(
/****************************************************************************
Initializes the asynchronous io system. Calls also os_io_init_simple.
Creates a separate aio array for
non-ibuf read and write, a third aio array for the ibuf i/o, with just one
segment, two aio arrays for log reads and writes with one segment, and a
synchronous aio array of the specified size. The combined number of segments
in the three first aio arrays is the parameter n_segments given to the
function. The caller must create an i/o handler thread for each segment in
the four first arrays, but not for the sync aio array. */
Creates an aio array for each of non-ibuf read, non-ibuf write, ibuf IO,
log IO, and synchronous IO. The caller must create i/o handler threads for all
but the synchronous aio array. Multiple threads can access the same array for
the non-ibuf read (prefetch) and write (flush dirty buffer pages) arrays.
Returns the number of AIO handler threads. */
void
ulint
os_aio_init(
/*========*/
ulint n, /* in: maximum number of pending aio operations
allowed; n must be divisible by n_segments */
ulint n_segments, /* in: combined number of segments in the four
first aio arrays; must be >= 4 */
ulint ios_per_array, /* in: maximum number of pending aio operations
allowed per array */
ulint n_read_threads, /* in: number of read threads */
ulint n_write_threads, /* in: number of write threads */
ulint n_slots_sync) /* in: number of slots in the sync aio array */
{
ulint n_read_segs;
ulint n_write_segs;
ulint n_per_seg;
ulint i;
ulint n_segments = 2 + n_read_threads + n_write_threads;
#ifdef POSIX_ASYNC_IO
sigset_t sigset;
#endif
ut_ad(n % n_segments == 0);
ut_ad(n_segments >= 4);
ut_a(ios_per_array >= OS_AIO_N_PENDING_IOS_PER_THREAD);
ut_a(n_read_threads >= 1 && n_read_threads <= 64);
ut_a(n_write_threads >= 1 && n_write_threads <= 64);
ut_a(n_segments < SRV_MAX_N_IO_THREADS);
os_io_init_simple();
for (i = 0; i < n_segments; i++) {
srv_set_io_thread_op_info(i, "not started yet");
os_aio_thread_io_reads[i] = 0;
os_aio_thread_io_writes[i] = 0;
os_aio_thread_io_requests[i] = 0;
os_aio_thread_buffer[i] = 0;
os_aio_thread_buffer_size[i] = 0;
os_aio_thread_io_wait[i] = 0;
os_aio_thread_max_io_wait[i] = 0;
}
n_per_seg = n / n_segments;
n_write_segs = (n_segments - 2) / 2;
n_read_segs = n_segments - 2 - n_write_segs;
os_aio_read_threads = n_read_threads;
os_aio_write_threads = n_write_threads;
os_aio_first_write_segment = os_aio_first_read_segment + os_aio_read_threads;
/* fprintf(stderr, "Array n per seg %lu\n", n_per_seg); */
fprintf(stderr,
"InnoDB: ios_per_array %lu read threads %lu write threads %lu\n",
ios_per_array, os_aio_read_threads, os_aio_write_threads);
os_aio_ibuf_array = os_aio_array_create(n_per_seg, 1);
os_aio_ibuf_array = os_aio_array_create(ios_per_array);
srv_io_thread_function[0] = "insert buffer thread";
os_aio_log_array = os_aio_array_create(n_per_seg, 1);
os_aio_log_array = os_aio_array_create(ios_per_array);
srv_io_thread_function[1] = "log thread";
os_aio_read_array = os_aio_array_create(n_read_segs * n_per_seg,
n_read_segs);
for (i = 2; i < 2 + n_read_segs; i++) {
os_aio_read_array = os_aio_array_create(ios_per_array);
for (i = os_aio_first_read_segment; i < os_aio_first_write_segment; i++) {
ut_a(i < SRV_MAX_N_IO_THREADS);
srv_io_thread_function[i] = "read thread";
}
os_aio_write_array = os_aio_array_create(n_write_segs * n_per_seg,
n_write_segs);
for (i = 2 + n_read_segs; i < n_segments; i++) {
os_aio_write_array = os_aio_array_create(ios_per_array);
for (i = os_aio_first_write_segment; i < n_segments; i++) {
ut_a(i < SRV_MAX_N_IO_THREADS);
srv_io_thread_function[i] = "write thread";
}
os_aio_sync_array = os_aio_array_create(n_slots_sync, 1);
os_aio_sync_array = os_aio_array_create(n_slots_sync);
os_aio_n_segments = n_segments;
os_aio_n_segments = 2 + os_aio_read_threads + os_aio_write_threads;
os_aio_validate();
......@@ -3004,7 +3074,8 @@ os_aio_init(
pthread_sigmask(SIG_BLOCK, &sigset, NULL); */
#endif
}
return os_aio_n_segments;
}
#ifdef WIN_ASYNC_IO
/****************************************************************************
......@@ -3062,76 +3133,28 @@ os_aio_wait_until_no_pending_writes(void)
}
/**************************************************************************
Calculates segment number for a slot. */
Calculates aio array from global segment number. */
static
ulint
os_aio_get_segment_no_from_slot(
/*============================*/
/* out: segment number (which is the number
used by, for example, i/o-handler threads) */
os_aio_array_t* array, /* in: aio wait array */
os_aio_slot_t* slot) /* in: slot in this array */
{
ulint segment;
ulint seg_len;
if (array == os_aio_ibuf_array) {
segment = 0;
} else if (array == os_aio_log_array) {
segment = 1;
} else if (array == os_aio_read_array) {
seg_len = os_aio_read_array->n_slots
/ os_aio_read_array->n_segments;
segment = 2 + slot->pos / seg_len;
} else {
ut_a(array == os_aio_write_array);
seg_len = os_aio_write_array->n_slots
/ os_aio_write_array->n_segments;
segment = os_aio_read_array->n_segments + 2
+ slot->pos / seg_len;
}
return(segment);
}
/**************************************************************************
Calculates local segment number and aio array from global segment number. */
static
ulint
os_aio_get_array_and_local_segment(
os_aio_array_t*
os_aio_get_array(
/*===============================*/
/* out: local segment number within
the aio array */
os_aio_array_t** array, /* out: aio wait array */
/* out: aio wait array */
ulint global_segment)/* in: global segment number */
{
ulint segment;
ut_a(global_segment < os_aio_n_segments);
if (global_segment == 0) {
*array = os_aio_ibuf_array;
segment = 0;
return os_aio_ibuf_array;
} else if (global_segment == 1) {
*array = os_aio_log_array;
segment = 0;
return os_aio_log_array;
} else if (global_segment < os_aio_read_array->n_segments + 2) {
*array = os_aio_read_array;
} else if (global_segment < os_aio_first_write_segment) {
return os_aio_read_array;
segment = global_segment - 2;
} else {
*array = os_aio_write_array;
segment = global_segment - (os_aio_read_array->n_segments + 2);
return os_aio_write_array;
}
return(segment);
}
/***********************************************************************
......@@ -3253,7 +3276,7 @@ loop:
break;
}
}
ut_a(i < array->n_slots);
array->n_reserved++;
if (array->n_reserved == 1) {
......@@ -3275,7 +3298,7 @@ loop:
slot->buf = buf;
slot->offset = offset;
slot->offset_high = offset_high;
slot->io_already_done = FALSE;
slot->status = OS_AIO_NOT_ISSUED;
#ifdef WIN_ASYNC_IO
control = &(slot->control);
......@@ -3328,6 +3351,7 @@ os_aio_array_free_slot(
ut_ad(slot->reserved);
slot->reserved = FALSE;
slot->status = OS_AIO_NOT_ISSUED;
array->n_reserved--;
......@@ -3351,31 +3375,32 @@ static
void
os_aio_simulated_wake_handler_thread(
/*=================================*/
ulint global_segment) /* in: the number of the segment in the aio
arrays */
os_aio_array_t* array) /* in: aio array for which wakeup is done */
{
os_aio_array_t* array;
os_aio_slot_t* slot;
ulint segment;
ulint n;
ulint i;
ut_ad(!os_aio_use_native_aio);
n = array->n_slots;
segment = os_aio_get_array_and_local_segment(&array, global_segment);
n = array->n_slots / array->n_segments;
/* Look through n slots after the segment * n'th slot */
/* Look through n slots */
os_mutex_enter(array->mutex);
for (i = 0; i < n; i++) {
slot = os_aio_array_get_nth_slot(array, i + segment * n);
if (slot->reserved) {
/* Found an i/o request */
slot = os_aio_array_get_nth_slot(array, i);
if (slot->reserved &&
(slot->status == OS_AIO_NOT_ISSUED ||
slot->status == OS_AIO_DONE)) {
/* Found an i/o request
OS_AIO_NOT_ISSUED means the read or write request has
* yet to be done. OS_AIO_DONE means the request has been
* done but it was part of a set of requests merged into
* one read or write call and was not the first block in
* the request, so the handling of the IO completion for
* that block has not been done. */
break;
}
}
......@@ -3383,7 +3408,25 @@ os_aio_simulated_wake_handler_thread(
os_mutex_exit(array->mutex);
if (i < n) {
os_event_set(os_aio_segment_wait_events[global_segment]);
if (array == os_aio_ibuf_array) {
os_event_set(os_aio_segment_wait_events[0]);
} else if (array == os_aio_log_array) {
os_event_set(os_aio_segment_wait_events[1]);
} else if (array == os_aio_read_array) {
ulint x;
for (x = os_aio_first_read_segment; x < os_aio_first_write_segment; x++)
os_event_set(os_aio_segment_wait_events[x]);
} else if (array == os_aio_write_array) {
ulint x;
for (x = os_aio_first_write_segment; x < os_aio_n_segments; x++)
os_event_set(os_aio_segment_wait_events[x]);
} else {
ut_a(0);
}
}
}
......@@ -3404,9 +3447,10 @@ os_aio_simulated_wake_handler_threads(void)
os_aio_recommend_sleep_for_read_threads = FALSE;
for (i = 0; i < os_aio_n_segments; i++) {
os_aio_simulated_wake_handler_thread(i);
}
os_aio_simulated_wake_handler_thread(os_aio_ibuf_array);
os_aio_simulated_wake_handler_thread(os_aio_log_array);
os_aio_simulated_wake_handler_thread(os_aio_read_array);
os_aio_simulated_wake_handler_thread(os_aio_write_array);
}
/**************************************************************************
......@@ -3419,19 +3463,14 @@ void
os_aio_simulated_put_read_threads_to_sleep(void)
/*============================================*/
{
os_aio_array_t* array;
ulint g;
/* TODO(mcallaghan): provide similar function for write? */
os_aio_recommend_sleep_for_read_threads = TRUE;
for (g = 0; g < os_aio_n_segments; g++) {
os_aio_get_array_and_local_segment(&array, g);
if (array == os_aio_read_array) {
for (g = os_aio_first_read_segment; g < os_aio_first_write_segment; g++) {
os_event_reset(os_aio_segment_wait_events[g]);
}
}
}
/***********************************************************************
......@@ -3560,9 +3599,7 @@ try_again:
#endif
} else {
if (!wake_later) {
os_aio_simulated_wake_handler_thread(
os_aio_get_segment_no_from_slot(
array, slot));
os_aio_simulated_wake_handler_thread(array);
}
}
} else if (type == OS_FILE_WRITE) {
......@@ -3578,9 +3615,7 @@ try_again:
#endif
} else {
if (!wake_later) {
os_aio_simulated_wake_handler_thread(
os_aio_get_segment_no_from_slot(
array, slot));
os_aio_simulated_wake_handler_thread(array);
}
}
} else {
......@@ -3646,7 +3681,7 @@ ibool
os_aio_windows_handle(
/*==================*/
/* out: TRUE if the aio operation succeeded */
ulint segment, /* in: the number of the segment in the aio
ulint global_segment, /* in: the number of the segment in the aio
arrays to wait for; segment 0 is the ibuf
i/o thread, segment 1 the log i/o thread,
then follow the non-ibuf read threads, and as
......@@ -3664,7 +3699,6 @@ os_aio_windows_handle(
void** message2,
ulint* type) /* out: OS_FILE_WRITE or ..._READ */
{
ulint orig_seg = segment;
os_aio_array_t* array;
os_aio_slot_t* slot;
ulint n;
......@@ -3673,34 +3707,30 @@ os_aio_windows_handle(
BOOL ret;
DWORD len;
if (segment == ULINT_UNDEFINED) {
if (global_segment == ULINT_UNDEFINED) {
array = os_aio_sync_array;
segment = 0;
} else {
segment = os_aio_get_array_and_local_segment(&array, segment);
array = os_aio_get_array(global_segment);
}
/* NOTE! We only access constant fields in os_aio_array. Therefore
we do not have to acquire the protecting mutex yet */
ut_ad(os_aio_validate());
ut_ad(segment < array->n_segments);
n = array->n_slots / array->n_segments;
n = array->n_slots;
if (array == os_aio_sync_array) {
os_event_wait(os_aio_array_get_nth_slot(array, pos)->event);
i = pos;
} else {
srv_set_io_thread_op_info(orig_seg, "wait Windows aio");
i = os_event_wait_multiple(n,
(array->native_events)
+ segment * n);
srv_set_io_thread_op_info(global_segment, "wait Windows aio");
i = os_event_wait_multiple(n, (array->native_events));
}
os_mutex_enter(array->mutex);
slot = os_aio_array_get_nth_slot(array, i + segment * n);
slot = os_aio_array_get_nth_slot(array, i);
ut_a(slot->reserved);
......@@ -3878,14 +3908,16 @@ os_aio_simulated_handle(
ulint* type) /* out: OS_FILE_WRITE or ..._READ */
{
os_aio_array_t* array;
ulint segment;
os_aio_slot_t* slot;
os_aio_slot_t* slot2;
os_aio_slot_t* consecutive_ios[OS_AIO_MERGE_N_CONSECUTIVE];
os_aio_slot_t* lowest_request;
os_aio_slot_t* oldest_request;
ulint n_consecutive;
ulint total_len;
ulint offs;
ulint lowest_offset;
ulint oldest_offset;
ulint biggest_age;
ulint age;
byte* combined_buf;
......@@ -3894,7 +3926,9 @@ os_aio_simulated_handle(
ulint n;
ulint i;
segment = os_aio_get_array_and_local_segment(&array, global_segment);
double start_usecs, stop_usecs, elapsed_usecs;
time_t now;
array = os_aio_get_array(global_segment);
restart:
/* NOTE! We only access constant fields in os_aio_array. Therefore
......@@ -3903,11 +3937,10 @@ restart:
srv_set_io_thread_op_info(global_segment,
"looking for i/o requests (a)");
ut_ad(os_aio_validate());
ut_ad(segment < array->n_segments);
n = array->n_slots / array->n_segments;
n = array->n_slots;
/* Look through n slots after the segment * n'th slot */
/* Look through n slots */
if (array == os_aio_read_array
&& os_aio_recommend_sleep_for_read_threads) {
......@@ -3927,9 +3960,9 @@ restart:
done */
for (i = 0; i < n; i++) {
slot = os_aio_array_get_nth_slot(array, i + segment * n);
slot = os_aio_array_get_nth_slot(array, i);
if (slot->reserved && slot->io_already_done) {
if (slot->reserved && slot->status == OS_AIO_DONE) {
if (os_aio_print_debug) {
fprintf(stderr,
......@@ -3944,74 +3977,64 @@ restart:
}
}
n_consecutive = 0;
/* If there are at least 2 seconds old requests, then pick the oldest
one to prevent starvation. If several requests have the same age,
then pick the one at the lowest offset. */
biggest_age = 0;
lowest_offset = ULINT_MAX;
now = time(NULL);
oldest_request = lowest_request = NULL;
oldest_offset = lowest_offset = ULINT_MAX;
/* Find the oldest request and the request with the smallest offset */
for (i = 0; i < n; i++) {
slot = os_aio_array_get_nth_slot(array, i + segment * n);
slot = os_aio_array_get_nth_slot(array, i);
if (slot->reserved) {
age = (ulint)difftime(time(NULL),
slot->reservation_time);
if (slot->reserved && slot->status == OS_AIO_NOT_ISSUED) {
age = (ulint)difftime(now, slot->reservation_time);
/* If there are at least 2 seconds old requests, then pick the oldest
one to prevent starvation. If several requests have the same age,
then pick the one at the lowest offset. */
if ((age >= 2 && age > biggest_age)
|| (age >= 2 && age == biggest_age
&& slot->offset < lowest_offset)) {
&& slot->offset < oldest_offset)) {
/* Found an i/o request */
consecutive_ios[0] = slot;
n_consecutive = 1;
biggest_age = age;
lowest_offset = slot->offset;
oldest_request = slot;
oldest_offset = slot->offset;
}
}
}
if (n_consecutive == 0) {
/* There were no old requests. Look for an i/o request at the
lowest offset in the array (we ignore the high 32 bits of the
offset in these heuristics) */
lowest_offset = ULINT_MAX;
for (i = 0; i < n; i++) {
slot = os_aio_array_get_nth_slot(array,
i + segment * n);
if (slot->reserved && slot->offset < lowest_offset) {
/* Look for an i/o request at the lowest offset in the array
* (we ignore the high 32 bits of the offset) */
if (slot->offset < lowest_offset) {
/* Found an i/o request */
consecutive_ios[0] = slot;
lowest_request = slot;
n_consecutive = 1;
lowest_offset = slot->offset;
}
}
}
if (n_consecutive == 0) {
if (!lowest_request && !oldest_request) {
/* No i/o requested at the moment */
goto wait_for_io;
}
slot = consecutive_ios[0];
if (oldest_request) {
slot = oldest_request;
} else {
slot = lowest_request;
}
consecutive_ios[0] = slot;
n_consecutive = 1;
/* Check if there are several consecutive blocks to read or write */
consecutive_loop:
for (i = 0; i < n; i++) {
slot2 = os_aio_array_get_nth_slot(array, i + segment * n);
slot2 = os_aio_array_get_nth_slot(array, i);
if (slot2->reserved && slot2 != slot
&& slot2->offset == slot->offset + slot->len
......@@ -4019,7 +4042,8 @@ consecutive_loop:
&& slot->offset + slot->len > slot->offset
&& slot2->offset_high == slot->offset_high
&& slot2->type == slot->type
&& slot2->file == slot->file) {
&& slot2->file == slot->file
&& slot2->status == OS_AIO_NOT_ISSUED) {
/* Found a consecutive i/o request */
......@@ -4028,7 +4052,8 @@ consecutive_loop:
slot = slot2;
if (n_consecutive < OS_AIO_MERGE_N_CONSECUTIVE) {
if (n_consecutive < OS_AIO_MERGE_N_CONSECUTIVE &&
n_consecutive < srv_max_merged_io) {
goto consecutive_loop;
} else {
......@@ -4048,6 +4073,8 @@ consecutive_loop:
for (i = 0; i < n_consecutive; i++) {
total_len += consecutive_ios[i]->len;
ut_a(consecutive_ios[i]->status == OS_AIO_NOT_ISSUED);
consecutive_ios[i]->status = OS_AIO_ISSUED;
}
if (n_consecutive == 1) {
......@@ -4055,7 +4082,16 @@ consecutive_loop:
combined_buf = slot->buf;
combined_buf2 = NULL;
} else {
combined_buf2 = ut_malloc(total_len + UNIV_PAGE_SIZE);
if ((total_len + UNIV_PAGE_SIZE) > os_aio_thread_buffer_size[global_segment]) {
if (os_aio_thread_buffer[global_segment])
ut_free(os_aio_thread_buffer[global_segment]);
os_aio_thread_buffer[global_segment] = ut_malloc(total_len + UNIV_PAGE_SIZE);
os_aio_thread_buffer_size[global_segment] = total_len + UNIV_PAGE_SIZE;
}
combined_buf2 = os_aio_thread_buffer[global_segment];
ut_a(combined_buf2);
......@@ -4066,6 +4102,9 @@ consecutive_loop:
this assumes that there is just one i/o-handler thread serving
a single segment of slots! */
ut_a(slot->reserved);
ut_a(slot->status == OS_AIO_ISSUED);
os_mutex_exit(array->mutex);
if (slot->type == OS_FILE_WRITE && n_consecutive > 1) {
......@@ -4092,6 +4131,7 @@ consecutive_loop:
/* Do the i/o with ordinary, synchronous i/o functions: */
if (slot->type == OS_FILE_WRITE) {
os_aio_thread_io_writes[global_segment] += n_consecutive;
if (array == os_aio_write_array) {
if ((total_len % UNIV_PAGE_SIZE != 0)
|| (slot->offset % UNIV_PAGE_SIZE != 0)) {
......@@ -4106,18 +4146,30 @@ consecutive_loop:
os_file_check_page_trailers(combined_buf, total_len);
}
start_usecs = time_usecs();
ret = os_file_write(slot->name, slot->file, combined_buf,
slot->offset, slot->offset_high,
total_len);
stop_usecs = time_usecs();
elapsed_usecs = stop_usecs - start_usecs;
if (elapsed_usecs < 0) elapsed_usecs = 0;
if (array == os_aio_write_array) {
os_file_check_page_trailers(combined_buf, total_len);
}
} else {
start_usecs = time_usecs();
os_aio_thread_io_reads[global_segment] += n_consecutive;
ret = os_file_read(slot->file, combined_buf,
slot->offset, slot->offset_high, total_len);
stop_usecs = time_usecs();
elapsed_usecs = stop_usecs - start_usecs;
if (elapsed_usecs < 0) elapsed_usecs = 0;
}
if (elapsed_usecs > os_aio_thread_max_io_wait[global_segment])
os_aio_thread_max_io_wait[global_segment] = elapsed_usecs;
os_aio_thread_io_wait[global_segment] += elapsed_usecs;
os_aio_thread_io_requests[global_segment]++;
ut_a(ret);
srv_set_io_thread_op_info(global_segment, "file i/o done");
......@@ -4140,16 +4192,13 @@ consecutive_loop:
}
}
if (combined_buf2) {
ut_free(combined_buf2);
}
os_mutex_enter(array->mutex);
/* Mark the i/os done in slots */
for (i = 0; i < n_consecutive; i++) {
consecutive_ios[i]->io_already_done = TRUE;
ut_a(consecutive_ios[i]->status == OS_AIO_ISSUED);
consecutive_ios[i]->status = OS_AIO_DONE;
}
/* We return the messages for the first slot now, and if there were
......@@ -4159,6 +4208,8 @@ consecutive_loop:
slot_io_done:
ut_a(slot->reserved);
ut_a(slot->status == OS_AIO_DONE);
slot->status = OS_AIO_CLAIMED;
*message1 = slot->message1;
*message2 = slot->message2;
......@@ -4168,6 +4219,7 @@ slot_io_done:
os_mutex_exit(array->mutex);
os_aio_array_free_slot(array, slot);
srv_set_io_thread_op_info(global_segment, "exited handler");
return(ret);
......@@ -4214,7 +4266,6 @@ os_aio_array_validate(
os_mutex_enter(array->mutex);
ut_a(array->n_slots > 0);
ut_a(array->n_segments > 0);
for (i = 0; i < array->n_slots; i++) {
slot = os_aio_array_get_nth_slot(array, i);
......@@ -4264,11 +4315,19 @@ os_aio_print(
double time_elapsed;
double avg_bytes_read;
ulint i;
ulint num_issued, num_done, num_claimed;
for (i = 0; i < srv_n_file_io_threads; i++) {
fprintf(file, "I/O thread %lu state: %s (%s)", (ulong) i,
srv_io_thread_op_info[i],
srv_io_thread_function[i]);
for (i = 0; i < os_aio_n_segments; i++) {
fprintf(file,
"I/O thread %lu state: %s (%s) reads %lu writes %lu "
"requests %lu io secs %lf io msecs/request %lf max_io_wait %lf",
i, srv_io_thread_op_info[i], srv_io_thread_function[i],
os_aio_thread_io_reads[i], os_aio_thread_io_writes[i],
os_aio_thread_io_requests[i],
os_aio_thread_io_wait[i] / 1000000.0,
os_aio_thread_io_requests[i] ?
os_aio_thread_io_wait[i] / os_aio_thread_io_requests[i] / 1000.0 : 0.0,
os_aio_thread_max_io_wait[i] / 1000.0);
#ifndef __WIN__
if (os_aio_segment_wait_events[i]->is_set) {
......@@ -4288,14 +4347,21 @@ loop:
os_mutex_enter(array->mutex);
ut_a(array->n_slots > 0);
ut_a(array->n_segments > 0);
n_reserved = 0;
num_done = num_issued = num_claimed = 0;
for (i = 0; i < array->n_slots; i++) {
slot = os_aio_array_get_nth_slot(array, i);
if (slot->reserved) {
if (slot->status == OS_AIO_ISSUED)
num_issued++;
else if (slot->status == OS_AIO_DONE)
num_done++;
else {
ut_ad(slot->status == OS_AIO_CLAIMED);
num_claimed++;
}
n_reserved++;
#if 0
fprintf(stderr, "Reserved slot, messages %p %p\n",
......@@ -4341,6 +4407,13 @@ loop:
goto loop;
}
putc('\n', file);
fprintf(file,
"Summary of background IO slot status: %lu issued, "
"%lu done, %lu claimed, sleep set %d\n",
num_issued, num_done, num_claimed,
os_aio_recommend_sleep_for_read_threads);
putc('\n', file);
current_time = time(NULL);
time_elapsed = 0.001 + difftime(current_time, os_last_printout);
......
......@@ -179,6 +179,10 @@ ibool srv_extra_dirty_writes = TRUE; /* Write dirty pages to disk when pct
ulint srv_n_file_io_threads = ULINT_MAX;
ulint srv_n_read_io_threads = ULINT_MAX;
ulint srv_n_write_io_threads = ULINT_MAX;
ulint srv_max_merged_io = 64;
#ifdef UNIV_LOG_ARCHIVE
ibool srv_log_archive_on = FALSE;
ibool srv_archive_recovery = 0;
......
......@@ -985,6 +985,7 @@ innobase_start_or_create_for_mysql(void)
ulint i;
ibool srv_file_per_table_original_value = srv_file_per_table;
mtr_t mtr;
ulint n_threads;
#ifdef HAVE_DARWIN_THREADS
# ifdef F_FULLFSYNC
/* This executable has been compiled on Mac OS X 10.3 or later.
......@@ -1238,26 +1239,34 @@ innobase_start_or_create_for_mysql(void)
}
/* Restrict the maximum number of file i/o threads */
if (srv_n_file_io_threads > SRV_MAX_N_IO_THREADS) {
srv_n_file_io_threads = SRV_MAX_N_IO_THREADS;
if ((srv_n_read_io_threads + srv_n_write_io_threads) > SRV_MAX_N_IO_THREADS) {
fprintf(stderr,
"InnoDB: requested too many read(%d) or write(%d) IO threads, max is %d\n",
srv_n_read_io_threads, srv_n_write_io_threads, SRV_MAX_N_IO_THREADS);
return(DB_ERROR);
}
if (!os_aio_use_native_aio) {
/* In simulated aio we currently have use only for 4 threads */
srv_n_file_io_threads = 4;
os_aio_init(8 * SRV_N_PENDING_IOS_PER_THREAD
* srv_n_file_io_threads,
srv_n_file_io_threads,
/* More than 4 threads are now supported. */
n_threads = os_aio_init(8 * SRV_N_PENDING_IOS_PER_THREAD,
srv_n_read_io_threads,
srv_n_write_io_threads,
SRV_MAX_N_PENDING_SYNC_IOS);
} else {
os_aio_init(SRV_N_PENDING_IOS_PER_THREAD
* srv_n_file_io_threads,
srv_n_file_io_threads,
/* Might need more slots here. Alas, I don't do windows. */
n_threads = os_aio_init(SRV_N_PENDING_IOS_PER_THREAD,
srv_n_read_io_threads,
srv_n_write_io_threads,
SRV_MAX_N_PENDING_SYNC_IOS);
}
if (n_threads > SRV_MAX_N_IO_THREADS) {
fprintf(stderr,
"InnoDB: requested too many IO threads(%d), max is %d\n",
n_threads, SRV_MAX_N_IO_THREADS);
return(DB_ERROR);
}
fil_init(srv_max_n_open_files);
if (srv_use_awe) {
......@@ -1295,7 +1304,7 @@ innobase_start_or_create_for_mysql(void)
/* Create i/o-handler threads: */
for (i = 0; i < srv_n_file_io_threads; i++) {
for (i = 0; i < n_threads; i++) {
n[i] = i;
os_thread_create(io_handler_thread, n + i, thread_ids + i);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment