Commit fe0112e2 authored by Jan Lindström's avatar Jan Lindström

MDEV-7424: InnoDB: Assertion failure in thread 139901753345792

  in file buf0mtflu.cc line 439

  Analysis: At shutdown multi-threaded flush sends a exit work
  items to all mtflush threads. We wait until the work queue is
  empty. However, as we did not hold the mutex, some other thread
  could also put work-items to work queue.

  Fix: Take mutex before adding exit work items to work queue and
  wait until all work-items are really processed. Release
  mutex after we have marked that multi-threaded flush is not
  anymore active.

  Fix test failure on innodb_bug12902967 caused by unnecessary
  info output on xtradb/buf/buf0mtflush.cc.

   Do not try to enable atomic writes if the file type
   is not OS_DATA_FILE. Atomic writes are unnecessary
   for log files. If we try to enable atomic writes
   to log writes that are stored to media supporting
   atomic writes we will end up problems later.
parent 2de9427c
/***************************************************************************** /*****************************************************************************
Copyright (C) 2013, 2014, Fusion-io. All Rights Reserved. Copyright (C) 2013, 2014, Fusion-io. All Rights Reserved.
Copyright (C) 2013, 2014, SkySQL Ab. All Rights Reserved. Copyright (C) 2013, 2015, MariaDB Corporation. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
...@@ -202,6 +202,7 @@ buf_mtflu_flush_pool_instance( ...@@ -202,6 +202,7 @@ buf_mtflu_flush_pool_instance(
return 0; return 0;
} }
memset(&n, 0, sizeof(flush_counters_t));
if (work_item->wr.flush_type == BUF_FLUSH_LRU) { if (work_item->wr.flush_type == BUF_FLUSH_LRU) {
/* srv_LRU_scan_depth can be arbitrarily large value. /* srv_LRU_scan_depth can be arbitrarily large value.
...@@ -379,9 +380,6 @@ buf_mtflu_io_thread_exit(void) ...@@ -379,9 +380,6 @@ buf_mtflu_io_thread_exit(void)
mtflush_io->gwt_status = WTHR_KILL_IT; mtflush_io->gwt_status = WTHR_KILL_IT;
fprintf(stderr, "InnoDB: [Note]: Signal mtflush_io_threads to exit [%lu]\n",
srv_mtflush_threads);
/* This lock is to safequard against timing bug: flush request take /* This lock is to safequard against timing bug: flush request take
this mutex before sending work items to be processed by flush this mutex before sending work items to be processed by flush
threads. Inside flush thread we assume that work queue contains only threads. Inside flush thread we assume that work queue contains only
...@@ -409,6 +407,9 @@ buf_mtflu_io_thread_exit(void) ...@@ -409,6 +407,9 @@ buf_mtflu_io_thread_exit(void)
mtflush_io->wheap); mtflush_io->wheap);
} }
/* Requests sent */
os_fast_mutex_unlock(&mtflush_mtx);
/* Wait until all work items on a work queue are processed */ /* Wait until all work items on a work queue are processed */
while(!ib_wqueue_is_empty(mtflush_io->wq)) { while(!ib_wqueue_is_empty(mtflush_io->wq)) {
/* Wait */ /* Wait */
...@@ -433,11 +434,14 @@ buf_mtflu_io_thread_exit(void) ...@@ -433,11 +434,14 @@ buf_mtflu_io_thread_exit(void)
/* Wait about 1/2 sec to allow threads really exit */ /* Wait about 1/2 sec to allow threads really exit */
os_thread_sleep(MT_WAIT_IN_USECS); os_thread_sleep(MT_WAIT_IN_USECS);
/* Make sure that work queue is empty */
while(!ib_wqueue_is_empty(mtflush_io->wq)) while(!ib_wqueue_is_empty(mtflush_io->wq))
{ {
ib_wqueue_nowait(mtflush_io->wq); ib_wqueue_nowait(mtflush_io->wq);
} }
os_fast_mutex_lock(&mtflush_mtx);
ut_a(ib_wqueue_is_empty(mtflush_io->wq)); ut_a(ib_wqueue_is_empty(mtflush_io->wq));
ut_a(ib_wqueue_is_empty(mtflush_io->wr_cq)); ut_a(ib_wqueue_is_empty(mtflush_io->wr_cq));
ut_a(ib_wqueue_is_empty(mtflush_io->rd_cq)); ut_a(ib_wqueue_is_empty(mtflush_io->rd_cq));
...@@ -447,14 +451,18 @@ buf_mtflu_io_thread_exit(void) ...@@ -447,14 +451,18 @@ buf_mtflu_io_thread_exit(void)
ib_wqueue_free(mtflush_io->wr_cq); ib_wqueue_free(mtflush_io->wr_cq);
ib_wqueue_free(mtflush_io->rd_cq); ib_wqueue_free(mtflush_io->rd_cq);
/* Requests sent */ mtflush_io->wq = NULL;
os_fast_mutex_unlock(&mtflush_mtx); mtflush_io->wr_cq = NULL;
os_fast_mutex_free(&mtflush_mtx); mtflush_io->rd_cq = NULL;
os_fast_mutex_free(&mtflush_io->thread_global_mtx); mtflush_work_initialized = 0;
/* Free heap */ /* Free heap */
mem_heap_free(mtflush_io->wheap); mem_heap_free(mtflush_io->wheap);
mem_heap_free(mtflush_io->rheap); mem_heap_free(mtflush_io->rheap);
os_fast_mutex_unlock(&mtflush_mtx);
os_fast_mutex_free(&mtflush_mtx);
os_fast_mutex_free(&mtflush_io->thread_global_mtx);
} }
/******************************************************************//** /******************************************************************//**
...@@ -541,6 +549,10 @@ buf_mtflu_flush_work_items( ...@@ -541,6 +549,10 @@ buf_mtflu_flush_work_items(
mem_heap_t* reply_heap; mem_heap_t* reply_heap;
wrk_t work_item[MTFLUSH_MAX_WORKER]; wrk_t work_item[MTFLUSH_MAX_WORKER];
if (mtflush_ctx->gwt_status == WTHR_KILL_IT) {
return 0;
}
/* Allocate heap where all work items used and queue /* Allocate heap where all work items used and queue
node items areallocated */ node items areallocated */
work_heap = mem_heap_create(0); work_heap = mem_heap_create(0);
......
...@@ -1919,7 +1919,7 @@ os_file_create_func( ...@@ -1919,7 +1919,7 @@ os_file_create_func(
try to set atomic writes and if that fails when creating a new try to set atomic writes and if that fails when creating a new
table, produce a error. If atomic writes are used on existing table, produce a error. If atomic writes are used on existing
file, ignore error and use traditional writes for that file */ file, ignore error and use traditional writes for that file */
if (file != INVALID_HANDLE_VALUE if (file != INVALID_HANDLE_VALUE && type == OS_DATA_FILE
&& (awrites == ATOMIC_WRITES_ON || && (awrites == ATOMIC_WRITES_ON ||
(srv_use_atomic_writes && awrites == ATOMIC_WRITES_DEFAULT)) (srv_use_atomic_writes && awrites == ATOMIC_WRITES_DEFAULT))
&& !os_file_set_atomic_writes(name, file)) { && !os_file_set_atomic_writes(name, file)) {
...@@ -2065,7 +2065,7 @@ os_file_create_func( ...@@ -2065,7 +2065,7 @@ os_file_create_func(
try to set atomic writes and if that fails when creating a new try to set atomic writes and if that fails when creating a new
table, produce a error. If atomic writes are used on existing table, produce a error. If atomic writes are used on existing
file, ignore error and use traditional writes for that file */ file, ignore error and use traditional writes for that file */
if (file != -1 if (file != -1 && type == OS_DATA_FILE
&& (awrites == ATOMIC_WRITES_ON || && (awrites == ATOMIC_WRITES_ON ||
(srv_use_atomic_writes && awrites == ATOMIC_WRITES_DEFAULT)) (srv_use_atomic_writes && awrites == ATOMIC_WRITES_DEFAULT))
&& !os_file_set_atomic_writes(name, file)) { && !os_file_set_atomic_writes(name, file)) {
......
...@@ -2741,11 +2741,6 @@ innobase_start_or_create_for_mysql(void) ...@@ -2741,11 +2741,6 @@ innobase_start_or_create_for_mysql(void)
srv_mtflush_threads, srv_mtflush_threads,
mtflush_ctx, mtflush_ctx,
(thread_ids + 6 + 32)); (thread_ids + 6 + 32));
#if UNIV_DEBUG
fprintf(stderr, "InnoDB: Note: %s:%d buf-pool-instances:%lu mtflush_threads %lu\n",
__FILE__, __LINE__, srv_buf_pool_instances, srv_mtflush_threads);
#endif
} }
os_thread_create(buf_flush_page_cleaner_thread, NULL, NULL); os_thread_create(buf_flush_page_cleaner_thread, NULL, NULL);
......
/***************************************************************************** /*****************************************************************************
Copyright (C) 2013, 2014, Fusion-io. All Rights Reserved. Copyright (C) 2013, 2014, Fusion-io. All Rights Reserved.
Copyright (C) 2013, 2014, SkySQL Ab. All Rights Reserved. Copyright (C) 2013, 2015, MariaDB Corporation. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
...@@ -182,12 +182,9 @@ buf_mtflu_flush_pool_instance( ...@@ -182,12 +182,9 @@ buf_mtflu_flush_pool_instance(
wrk_t *work_item) /*!< inout: work item to be flushed */ wrk_t *work_item) /*!< inout: work item to be flushed */
{ {
flush_counters_t n; flush_counters_t n;
ut_a(work_item != NULL); ut_a(work_item != NULL);
ut_a(work_item->wr.buf_pool != NULL); ut_a(work_item->wr.buf_pool != NULL);
memset(&n, 0, sizeof(flush_counters_t));
if (!buf_flush_start(work_item->wr.buf_pool, work_item->wr.flush_type)) { if (!buf_flush_start(work_item->wr.buf_pool, work_item->wr.flush_type)) {
/* We have two choices here. If lsn_limit was /* We have two choices here. If lsn_limit was
specified then skipping an instance of buffer specified then skipping an instance of buffer
...@@ -205,6 +202,7 @@ buf_mtflu_flush_pool_instance( ...@@ -205,6 +202,7 @@ buf_mtflu_flush_pool_instance(
return 0; return 0;
} }
memset(&n, 0, sizeof(flush_counters_t));
if (work_item->wr.flush_type == BUF_FLUSH_LRU) { if (work_item->wr.flush_type == BUF_FLUSH_LRU) {
/* srv_LRU_scan_depth can be arbitrarily large value. /* srv_LRU_scan_depth can be arbitrarily large value.
...@@ -217,14 +215,13 @@ buf_mtflu_flush_pool_instance( ...@@ -217,14 +215,13 @@ buf_mtflu_flush_pool_instance(
} }
buf_flush_batch(work_item->wr.buf_pool, buf_flush_batch(work_item->wr.buf_pool,
work_item->wr.flush_type, work_item->wr.flush_type,
work_item->wr.min, work_item->wr.min,
work_item->wr.lsn_limit, work_item->wr.lsn_limit,
false, false,
&n); &n);
work_item->n_flushed = n.flushed; work_item->n_flushed = n.flushed;
buf_flush_end(work_item->wr.buf_pool, work_item->wr.flush_type); buf_flush_end(work_item->wr.buf_pool, work_item->wr.flush_type);
buf_flush_common(work_item->wr.flush_type, work_item->n_flushed); buf_flush_common(work_item->wr.flush_type, work_item->n_flushed);
...@@ -254,7 +251,7 @@ mtflush_service_io( ...@@ -254,7 +251,7 @@ mtflush_service_io(
work_item = (wrk_t *)ib_wqueue_nowait(mtflush_io->wq); work_item = (wrk_t *)ib_wqueue_nowait(mtflush_io->wq);
if (work_item == NULL) { if (work_item == NULL) {
work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wq, MT_WAIT_IN_USECS); work_item = (wrk_t *)ib_wqueue_wait(mtflush_io->wq);
} }
if (work_item) { if (work_item) {
...@@ -341,10 +338,10 @@ DECLARE_THREAD(mtflush_io_thread)( ...@@ -341,10 +338,10 @@ DECLARE_THREAD(mtflush_io_thread)(
while (TRUE) { while (TRUE) {
#ifdef UNIV_MTFLUSH_DEBUG #ifdef UNIV_MTFLUSH_DEBUG
fprintf(stderr, "InnoDB: Note. Thread %lu work queue len %lu return queue len %lu\n", fprintf(stderr, "InnoDB: Note. Thread %lu work queue len %lu return queue len %lu\n",
os_thread_get_curr_id(), os_thread_get_curr_id(),
ib_wqueue_len(mtflush_io->wq), ib_wqueue_len(mtflush_io->wq),
ib_wqueue_len(mtflush_io->wr_cq)); ib_wqueue_len(mtflush_io->wr_cq));
#endif /* UNIV_MTFLUSH_DEBUG */ #endif /* UNIV_MTFLUSH_DEBUG */
mtflush_service_io(mtflush_io, this_thread_data); mtflush_service_io(mtflush_io, this_thread_data);
...@@ -383,9 +380,6 @@ buf_mtflu_io_thread_exit(void) ...@@ -383,9 +380,6 @@ buf_mtflu_io_thread_exit(void)
mtflush_io->gwt_status = WTHR_KILL_IT; mtflush_io->gwt_status = WTHR_KILL_IT;
fprintf(stderr, "InnoDB: [Note]: Signal mtflush_io_threads to exit [%lu]\n",
srv_mtflush_threads);
/* This lock is to safequard against timing bug: flush request take /* This lock is to safequard against timing bug: flush request take
this mutex before sending work items to be processed by flush this mutex before sending work items to be processed by flush
threads. Inside flush thread we assume that work queue contains only threads. Inside flush thread we assume that work queue contains only
...@@ -397,6 +391,9 @@ buf_mtflu_io_thread_exit(void) ...@@ -397,6 +391,9 @@ buf_mtflu_io_thread_exit(void)
os_fast_mutex_lock(&mtflush_mtx); os_fast_mutex_lock(&mtflush_mtx);
/* Make sure the work queue is empty */
ut_a(ib_wqueue_is_empty(mtflush_io->wq));
/* Send one exit work item/thread */ /* Send one exit work item/thread */
for (i=0; i < (ulint)srv_mtflush_threads; i++) { for (i=0; i < (ulint)srv_mtflush_threads; i++) {
work_item[i].tsk = MT_WRK_NONE; work_item[i].tsk = MT_WRK_NONE;
...@@ -410,6 +407,9 @@ buf_mtflu_io_thread_exit(void) ...@@ -410,6 +407,9 @@ buf_mtflu_io_thread_exit(void)
mtflush_io->wheap); mtflush_io->wheap);
} }
/* Requests sent */
os_fast_mutex_unlock(&mtflush_mtx);
/* Wait until all work items on a work queue are processed */ /* Wait until all work items on a work queue are processed */
while(!ib_wqueue_is_empty(mtflush_io->wq)) { while(!ib_wqueue_is_empty(mtflush_io->wq)) {
/* Wait */ /* Wait */
...@@ -434,15 +434,13 @@ buf_mtflu_io_thread_exit(void) ...@@ -434,15 +434,13 @@ buf_mtflu_io_thread_exit(void)
/* Wait about 1/2 sec to allow threads really exit */ /* Wait about 1/2 sec to allow threads really exit */
os_thread_sleep(MT_WAIT_IN_USECS); os_thread_sleep(MT_WAIT_IN_USECS);
/* Make sure that work queue is empty */
while(!ib_wqueue_is_empty(mtflush_io->wq)) while(!ib_wqueue_is_empty(mtflush_io->wq))
{ {
ib_wqueue_nowait(mtflush_io->wq); ib_wqueue_nowait(mtflush_io->wq);
} }
while(!ib_wqueue_is_empty(mtflush_io->wq)) os_fast_mutex_lock(&mtflush_mtx);
{
ib_wqueue_nowait(mtflush_io->wq);
}
ut_a(ib_wqueue_is_empty(mtflush_io->wq)); ut_a(ib_wqueue_is_empty(mtflush_io->wq));
ut_a(ib_wqueue_is_empty(mtflush_io->wr_cq)); ut_a(ib_wqueue_is_empty(mtflush_io->wr_cq));
...@@ -453,14 +451,18 @@ buf_mtflu_io_thread_exit(void) ...@@ -453,14 +451,18 @@ buf_mtflu_io_thread_exit(void)
ib_wqueue_free(mtflush_io->wr_cq); ib_wqueue_free(mtflush_io->wr_cq);
ib_wqueue_free(mtflush_io->rd_cq); ib_wqueue_free(mtflush_io->rd_cq);
/* Requests sent */ mtflush_io->wq = NULL;
os_fast_mutex_unlock(&mtflush_mtx); mtflush_io->wr_cq = NULL;
os_fast_mutex_free(&mtflush_mtx); mtflush_io->rd_cq = NULL;
os_fast_mutex_free(&mtflush_io->thread_global_mtx); mtflush_work_initialized = 0;
/* Free heap */ /* Free heap */
mem_heap_free(mtflush_io->wheap); mem_heap_free(mtflush_io->wheap);
mem_heap_free(mtflush_io->rheap); mem_heap_free(mtflush_io->rheap);
os_fast_mutex_unlock(&mtflush_mtx);
os_fast_mutex_free(&mtflush_mtx);
os_fast_mutex_free(&mtflush_io->thread_global_mtx);
} }
/******************************************************************//** /******************************************************************//**
...@@ -547,6 +549,10 @@ buf_mtflu_flush_work_items( ...@@ -547,6 +549,10 @@ buf_mtflu_flush_work_items(
mem_heap_t* reply_heap; mem_heap_t* reply_heap;
wrk_t work_item[MTFLUSH_MAX_WORKER]; wrk_t work_item[MTFLUSH_MAX_WORKER];
if (mtflush_ctx->gwt_status == WTHR_KILL_IT) {
return 0;
}
/* Allocate heap where all work items used and queue /* Allocate heap where all work items used and queue
node items areallocated */ node items areallocated */
work_heap = mem_heap_create(0); work_heap = mem_heap_create(0);
...@@ -597,9 +603,6 @@ buf_mtflu_flush_work_items( ...@@ -597,9 +603,6 @@ buf_mtflu_flush_work_items(
} }
} }
ut_a(ib_wqueue_is_empty(mtflush_ctx->wq));
ut_a(ib_wqueue_is_empty(mtflush_ctx->wr_cq));
/* Release used work_items and queue nodes */ /* Release used work_items and queue nodes */
mem_heap_free(work_heap); mem_heap_free(work_heap);
mem_heap_free(reply_heap); mem_heap_free(reply_heap);
......
...@@ -2037,7 +2037,7 @@ os_file_create_func( ...@@ -2037,7 +2037,7 @@ os_file_create_func(
try to set atomic writes and if that fails when creating a new try to set atomic writes and if that fails when creating a new
table, produce a error. If atomic writes are used on existing table, produce a error. If atomic writes are used on existing
file, ignore error and use traditional writes for that file */ file, ignore error and use traditional writes for that file */
if (file != INVALID_HANDLE_VALUE if (file != INVALID_HANDLE_VALUE && type == OS_DATA_FILE
&& (awrites == ATOMIC_WRITES_ON || && (awrites == ATOMIC_WRITES_ON ||
(srv_use_atomic_writes && awrites == ATOMIC_WRITES_DEFAULT)) (srv_use_atomic_writes && awrites == ATOMIC_WRITES_DEFAULT))
&& !os_file_set_atomic_writes(name, file)) { && !os_file_set_atomic_writes(name, file)) {
...@@ -2186,7 +2186,7 @@ os_file_create_func( ...@@ -2186,7 +2186,7 @@ os_file_create_func(
try to set atomic writes and if that fails when creating a new try to set atomic writes and if that fails when creating a new
table, produce a error. If atomic writes are used on existing table, produce a error. If atomic writes are used on existing
file, ignore error and use traditional writes for that file */ file, ignore error and use traditional writes for that file */
if (file != -1 if (file != -1 && type == OS_DATA_FILE
&& (awrites == ATOMIC_WRITES_ON || && (awrites == ATOMIC_WRITES_ON ||
(srv_use_atomic_writes && awrites == ATOMIC_WRITES_DEFAULT)) (srv_use_atomic_writes && awrites == ATOMIC_WRITES_DEFAULT))
&& !os_file_set_atomic_writes(name, file)) { && !os_file_set_atomic_writes(name, file)) {
......
...@@ -2815,10 +2815,6 @@ innobase_start_or_create_for_mysql(void) ...@@ -2815,10 +2815,6 @@ innobase_start_or_create_for_mysql(void)
srv_mtflush_threads, srv_mtflush_threads,
mtflush_ctx, mtflush_ctx,
(thread_ids + 6 + SRV_MAX_N_PURGE_THREADS)); (thread_ids + 6 + SRV_MAX_N_PURGE_THREADS));
#if UNIV_DEBUG
fprintf(stderr, "InnoDB: Note: %s:%d buf-pool-instances:%lu mtflush_threads %lu\n",
__FILE__, __LINE__, srv_buf_pool_instances, srv_mtflush_threads);
#endif
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment