Commit fd38dca5 authored by Jan Lindström's avatar Jan Lindström

Fixed a hang. The core issues is with the heap-thrashing by the

individual queue's. Tried to minimize memory allocation from heap
whenever it is unnecessary.
parent 81318f04
...@@ -116,26 +116,40 @@ typedef struct wrk_itm ...@@ -116,26 +116,40 @@ typedef struct wrk_itm
struct wrk_itm *next; /*!< Next work item */ struct wrk_itm *next; /*!< Next work item */
mem_heap_t *wheap; /*!< Heap were to allocate memory mem_heap_t *wheap; /*!< Heap were to allocate memory
for queue nodes */ for queue nodes */
mem_heap_t *rheap;
} wrk_t; } wrk_t;
typedef struct thread_data
{
os_thread_id_t wthread_id; /*!< Identifier */
os_thread_t wthread; /*!< Thread id */
wthr_status_t wt_status; /*!< Worker thread status */
} thread_data_t;
/* Thread syncronization data */ /* Thread syncronization data */
typedef struct thread_sync typedef struct thread_sync
{ {
/* Global variables used by all threads */
os_fast_mutex_t thread_global_mtx; /*!< Mutex used protecting below
variables */
ulint n_threads; /*!< Number of threads */ ulint n_threads; /*!< Number of threads */
os_thread_id_t wthread_id; /*!< Identifier */
os_thread_t wthread; /*!< Thread id */
ib_wqueue_t *wq; /*!< Work Queue */ ib_wqueue_t *wq; /*!< Work Queue */
ib_wqueue_t *wr_cq; /*!< Write Completion Queue */ ib_wqueue_t *wr_cq; /*!< Write Completion Queue */
ib_wqueue_t *rd_cq; /*!< Read Completion Queue */ ib_wqueue_t *rd_cq; /*!< Read Completion Queue */
wthr_status_t wt_status; /*!< Worker thread status */
mem_heap_t* wheap; /*!< Work heap where memory mem_heap_t* wheap; /*!< Work heap where memory
is allocated */ is allocated */
mem_heap_t* rheap; /*!< Work heap where memory
is allocated */
wthr_status_t gwt_status; /*!< Global thread status */
/* Variables used by only one thread at a time */
thread_data_t* thread_data; /*!< Thread specific data */
} thread_sync_t; } thread_sync_t;
static int mtflush_work_initialized = -1; static int mtflush_work_initialized = -1;
static os_fast_mutex_t mtflush_mtx;
static os_fast_mutex_t mtflush_mtx_wait;
static thread_sync_t* mtflush_ctx=NULL; static thread_sync_t* mtflush_ctx=NULL;
static os_fast_mutex_t mtflush_mtx;
/******************************************************************//** /******************************************************************//**
Set multi-threaded flush work initialized. */ Set multi-threaded flush work initialized. */
...@@ -218,29 +232,29 @@ static ...@@ -218,29 +232,29 @@ static
void void
mtflush_service_io( mtflush_service_io(
/*===============*/ /*===============*/
thread_sync_t* mtflush_io) /*!< inout: multi-threaded flush thread_sync_t* mtflush_io, /*!< inout: multi-threaded flush
syncronization data */ syncronization data */
thread_data_t* thread_data) /* Thread status data */
{ {
wrk_t *work_item = NULL; wrk_t *work_item = NULL;
ulint n_flushed=0; ulint n_flushed=0;
mtflush_io->wt_status = WTHR_SIG_WAITING; ut_a(mtflush_io != NULL);
ut_a(thread_data != NULL);
thread_data->wt_status = WTHR_SIG_WAITING;
/* TODO: Temporal fix for the hang bug. This needs a real fix. */
os_fast_mutex_lock(&mtflush_mtx_wait);
work_item = (wrk_t *)ib_wqueue_nowait(mtflush_io->wq); work_item = (wrk_t *)ib_wqueue_nowait(mtflush_io->wq);
if (work_item == NULL) { if (work_item == NULL) {
work_item = (wrk_t *)ib_wqueue_wait(mtflush_io->wq); work_item = (wrk_t *)ib_wqueue_wait(mtflush_io->wq);
} }
os_fast_mutex_unlock(&mtflush_mtx_wait);
if (work_item) { if (work_item) {
mtflush_io->wt_status = WTHR_RUNNING; thread_data->wt_status = WTHR_RUNNING;
} else { } else {
/* Thread did not get any work */ /* Thread did not get any work */
mtflush_io->wt_status = WTHR_NO_WORK; thread_data->wt_status = WTHR_NO_WORK;
return; return;
} }
...@@ -262,8 +276,8 @@ mtflush_service_io( ...@@ -262,8 +276,8 @@ mtflush_service_io(
case MT_WRK_NONE: case MT_WRK_NONE:
ut_a(work_item->wi_status == WRK_ITEM_EXIT); ut_a(work_item->wi_status == WRK_ITEM_EXIT);
work_item->wi_status = WRK_ITEM_EXIT; work_item->wi_status = WRK_ITEM_EXIT;
ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->wheap); ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->rheap);
mtflush_io->wt_status = WTHR_KILL_IT; thread_data->wt_status = WTHR_KILL_IT;
break; break;
case MT_WRK_WRITE: case MT_WRK_WRITE:
...@@ -274,7 +288,7 @@ mtflush_service_io( ...@@ -274,7 +288,7 @@ mtflush_service_io(
work_item->wi_status = WRK_ITEM_FAILED; work_item->wi_status = WRK_ITEM_FAILED;
} }
work_item->wi_status = WRK_ITEM_SUCCESS; work_item->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->wheap); ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->rheap);
break; break;
case MT_WRK_READ: case MT_WRK_READ:
...@@ -286,7 +300,6 @@ mtflush_service_io( ...@@ -286,7 +300,6 @@ mtflush_service_io(
ut_a(0); ut_a(0);
break; break;
} }
} }
/******************************************************************//** /******************************************************************//**
...@@ -300,14 +313,23 @@ DECLARE_THREAD(mtflush_io_thread)( ...@@ -300,14 +313,23 @@ DECLARE_THREAD(mtflush_io_thread)(
void * arg) void * arg)
{ {
thread_sync_t *mtflush_io = ((thread_sync_t *)arg); thread_sync_t *mtflush_io = ((thread_sync_t *)arg);
ulint n_timeout = 0; thread_data_t *this_thread_data = NULL;
#ifdef UNIV_DEBUG
ib_uint64_t stat_universal_num_processed = 0;
ib_uint64_t stat_cycle_num_processed = 0;
ulint i; ulint i;
#endif
/* Find correct slot for this thread */
os_fast_mutex_lock(&(mtflush_io->thread_global_mtx));
for(i=0; i < mtflush_io->n_threads; i ++) {
if (mtflush_io->thread_data[i].wthread_id == os_thread_get_curr_id()) {
break;
}
}
ut_a(i <= mtflush_io->n_threads);
this_thread_data = &mtflush_io->thread_data[i];
os_fast_mutex_unlock(&(mtflush_io->thread_global_mtx));
while (TRUE) { while (TRUE) {
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
fprintf(stderr, "InnoDB: Note. Thread %lu work queue len %lu return queue len %lu\n", fprintf(stderr, "InnoDB: Note. Thread %lu work queue len %lu return queue len %lu\n",
os_thread_get_curr_id(), os_thread_get_curr_id(),
...@@ -315,26 +337,10 @@ DECLARE_THREAD(mtflush_io_thread)( ...@@ -315,26 +337,10 @@ DECLARE_THREAD(mtflush_io_thread)(
ib_wqueue_len(mtflush_io->wr_cq)); ib_wqueue_len(mtflush_io->wr_cq));
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
mtflush_service_io(mtflush_io); mtflush_service_io(mtflush_io, this_thread_data);
#ifdef UNIV_DEBUG
if (mtflush_io->wt_status == WTHR_NO_WORK) {
n_timeout++;
if (n_timeout > 10) {
fprintf(stderr, "InnoDB: Note: Thread %lu has not received "
" work queue len %lu return queue len %lu\n",
os_thread_get_curr_id(),
ib_wqueue_len(mtflush_io->wq),
ib_wqueue_len(mtflush_io->wr_cq));
n_timeout = 0;
}
} else {
n_timeout = 0;
}
#endif /* UNIV_DEBUG */
if (mtflush_io->wt_status == WTHR_KILL_IT) { if (this_thread_data->wt_status == WTHR_KILL_IT) {
break; break;
} }
} }
...@@ -359,22 +365,24 @@ buf_mtflu_io_thread_exit(void) ...@@ -359,22 +365,24 @@ buf_mtflu_io_thread_exit(void)
/* Allocate work items for shutdown message */ /* Allocate work items for shutdown message */
work_item = (wrk_t*)mem_heap_alloc(mtflush_io->wheap, sizeof(wrk_t)*srv_mtflush_threads); work_item = (wrk_t*)mem_heap_alloc(mtflush_io->wheap, sizeof(wrk_t)*srv_mtflush_threads);
memset(work_item, 0, sizeof(wrk_t)*srv_mtflush_threads);
/* Confirm if the io-thread KILL is in progress, bailout */ /* Confirm if the io-thread KILL is in progress, bailout */
if (mtflush_io->wt_status == WTHR_KILL_IT) { if (mtflush_io->gwt_status == WTHR_KILL_IT) {
return; return;
} }
mtflush_io->gwt_status = WTHR_KILL_IT;
fprintf(stderr, "signal mtflush_io_threads to exit [%lu]\n", fprintf(stderr, "signal mtflush_io_threads to exit [%lu]\n",
srv_mtflush_threads); srv_mtflush_threads);
/* Send one exit work item/thread */ /* Send one exit work item/thread */
for (i=0; i < srv_mtflush_threads; i++) { for (i=0; i < srv_mtflush_threads; i++) {
work_item[i].wr.buf_pool = NULL;
work_item[i].rd.page_pool = NULL;
work_item[i].tsk = MT_WRK_NONE; work_item[i].tsk = MT_WRK_NONE;
work_item[i].wi_status = WRK_ITEM_EXIT; work_item[i].wi_status = WRK_ITEM_EXIT;
work_item[i].wheap = mtflush_io->wheap; work_item[i].wheap = mtflush_io->wheap;
work_item[i].rheap = mtflush_io->rheap;
ib_wqueue_add(mtflush_io->wq, ib_wqueue_add(mtflush_io->wq,
(void *)&(work_item[i]), (void *)&(work_item[i]),
...@@ -384,7 +392,7 @@ buf_mtflu_io_thread_exit(void) ...@@ -384,7 +392,7 @@ buf_mtflu_io_thread_exit(void)
/* Wait until all work items on a work queue are processed */ /* Wait until all work items on a work queue are processed */
while(!ib_wqueue_is_empty(mtflush_io->wq)) { while(!ib_wqueue_is_empty(mtflush_io->wq)) {
/* Wait */ /* Wait */
os_thread_sleep(MT_WAIT_IN_USECS * 2); os_thread_sleep(MT_WAIT_IN_USECS);
} }
ut_a(ib_wqueue_is_empty(mtflush_io->wq)); ut_a(ib_wqueue_is_empty(mtflush_io->wq));
...@@ -403,7 +411,7 @@ buf_mtflu_io_thread_exit(void) ...@@ -403,7 +411,7 @@ buf_mtflu_io_thread_exit(void)
} }
/* Wait about 1/2 sec to allow threads really exit */ /* Wait about 1/2 sec to allow threads really exit */
os_thread_sleep(5000000); os_thread_sleep(MT_WAIT_IN_USECS);
ut_a(ib_wqueue_is_empty(mtflush_io->wq)); ut_a(ib_wqueue_is_empty(mtflush_io->wq));
ut_a(ib_wqueue_is_empty(mtflush_io->wr_cq)); ut_a(ib_wqueue_is_empty(mtflush_io->wr_cq));
...@@ -415,10 +423,11 @@ buf_mtflu_io_thread_exit(void) ...@@ -415,10 +423,11 @@ buf_mtflu_io_thread_exit(void)
ib_wqueue_free(mtflush_io->rd_cq); ib_wqueue_free(mtflush_io->rd_cq);
os_fast_mutex_free(&mtflush_mtx); os_fast_mutex_free(&mtflush_mtx);
os_fast_mutex_free(&mtflush_mtx_wait); os_fast_mutex_free(&mtflush_io->thread_global_mtx);
/* Free heap */ /* Free heap */
mem_heap_free(mtflush_io->wheap); mem_heap_free(mtflush_io->wheap);
mem_heap_free(mtflush_io->rheap);
} }
/******************************************************************//** /******************************************************************//**
...@@ -432,45 +441,50 @@ buf_mtflu_handler_init( ...@@ -432,45 +441,50 @@ buf_mtflu_handler_init(
{ {
ulint i; ulint i;
mem_heap_t* mtflush_heap; mem_heap_t* mtflush_heap;
ib_wqueue_t* mtflush_work_queue; mem_heap_t* mtflush_heap2;
ib_wqueue_t* mtflush_write_comp_queue;
ib_wqueue_t* mtflush_read_comp_queue;
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx);
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx_wait);
/* Create heap, work queue, write completion queue, read /* Create heap, work queue, write completion queue, read
completion queue for multi-threaded flush, and init completion queue for multi-threaded flush, and init
handler. */ handler. */
mtflush_heap = mem_heap_create(0); mtflush_heap = mem_heap_create(0);
ut_a(mtflush_heap != NULL); ut_a(mtflush_heap != NULL);
mtflush_work_queue = ib_wqueue_create(); mtflush_heap2 = mem_heap_create(0);
ut_a(mtflush_work_queue != NULL); ut_a(mtflush_heap2 != NULL);
mtflush_write_comp_queue = ib_wqueue_create();
ut_a(mtflush_write_comp_queue != NULL);
mtflush_read_comp_queue = ib_wqueue_create();
ut_a(mtflush_read_comp_queue != NULL);
mtflush_ctx = (thread_sync_t *)mem_heap_alloc(mtflush_heap, mtflush_ctx = (thread_sync_t *)mem_heap_alloc(mtflush_heap,
MTFLUSH_MAX_WORKER * sizeof(thread_sync_t)); sizeof(thread_sync_t));
memset(mtflush_ctx, 0, sizeof(thread_sync_t));
ut_a(mtflush_ctx != NULL); ut_a(mtflush_ctx != NULL);
mtflush_ctx->thread_data = (thread_data_t*)mem_heap_alloc(
mtflush_heap, sizeof(thread_data_t) * n_threads);
ut_a(mtflush_ctx->thread_data);
memset(mtflush_ctx->thread_data, 0, sizeof(thread_data_t) * n_threads);
mtflush_ctx->n_threads = n_threads;
mtflush_ctx->wq = ib_wqueue_create();
ut_a(mtflush_ctx->wq);
mtflush_ctx->wr_cq = ib_wqueue_create();
ut_a(mtflush_ctx->wr_cq);
mtflush_ctx->rd_cq = ib_wqueue_create();
ut_a(mtflush_ctx->rd_cq);
mtflush_ctx->wheap = mtflush_heap;
mtflush_ctx->rheap = mtflush_heap2;
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_ctx->thread_global_mtx);
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx);
/* Create threads for page-compression-flush */ /* Create threads for page-compression-flush */
for(i=0; i < n_threads; i++) { for(i=0; i < n_threads; i++) {
os_thread_id_t new_thread_id; os_thread_id_t new_thread_id;
mtflush_ctx[i].n_threads = n_threads;
mtflush_ctx[i].wq = mtflush_work_queue; mtflush_ctx->thread_data[i].wt_status = WTHR_INITIALIZED;
mtflush_ctx[i].wr_cq = mtflush_write_comp_queue;
mtflush_ctx[i].rd_cq = mtflush_read_comp_queue; mtflush_ctx->thread_data[i].wthread = os_thread_create(
mtflush_ctx[i].wheap = mtflush_heap;
mtflush_ctx[i].wt_status = WTHR_INITIALIZED;
mtflush_ctx[i].wthread = os_thread_create(
mtflush_io_thread, mtflush_io_thread,
((void *)(mtflush_ctx + i)), ((void *) mtflush_ctx),
&new_thread_id); &new_thread_id);
mtflush_ctx[i].wthread_id = new_thread_id; mtflush_ctx->thread_data[i].wthread_id = new_thread_id;
} }
buf_mtflu_work_init(); buf_mtflu_work_init();
...@@ -497,13 +511,15 @@ buf_mtflu_flush_work_items( ...@@ -497,13 +511,15 @@ buf_mtflu_flush_work_items(
{ {
ulint n_flushed=0, i; ulint n_flushed=0, i;
mem_heap_t* work_heap; mem_heap_t* work_heap;
wrk_t* work_item=NULL; mem_heap_t* reply_heap;
wrk_t work_item[MTFLUSH_MAX_WORKER];
/* Allocate heap where all work items used and queue /* Allocate heap where all work items used and queue
node items areallocated */ node items areallocated */
work_heap = mem_heap_create(0); work_heap = mem_heap_create(0);
work_item = (wrk_t*)mem_heap_alloc(work_heap, sizeof(wrk_t)*buf_pool_inst); reply_heap = mem_heap_create(0);
memset(work_item, 0, sizeof(wrk_t)*buf_pool_inst); memset(work_item, 0, sizeof(wrk_t)*MTFLUSH_MAX_WORKER);
for(i=0;i<buf_pool_inst; i++) { for(i=0;i<buf_pool_inst; i++) {
work_item[i].tsk = MT_WRK_WRITE; work_item[i].tsk = MT_WRK_WRITE;
...@@ -513,9 +529,10 @@ buf_mtflu_flush_work_items( ...@@ -513,9 +529,10 @@ buf_mtflu_flush_work_items(
work_item[i].wr.lsn_limit = lsn_limit; work_item[i].wr.lsn_limit = lsn_limit;
work_item[i].wi_status = WRK_ITEM_UNSET; work_item[i].wi_status = WRK_ITEM_UNSET;
work_item[i].wheap = work_heap; work_item[i].wheap = work_heap;
work_item[i].rheap = reply_heap;
ib_wqueue_add(mtflush_ctx->wq, ib_wqueue_add(mtflush_ctx->wq,
(void *)(&(work_item[i])), (void *)(work_item + i),
work_heap); work_heap);
} }
...@@ -527,8 +544,6 @@ buf_mtflu_flush_work_items( ...@@ -527,8 +544,6 @@ buf_mtflu_flush_work_items(
if (done_wi != NULL) { if (done_wi != NULL) {
per_pool_pages_flushed[i] = done_wi->n_flushed; per_pool_pages_flushed[i] = done_wi->n_flushed;
#ifdef UNIV_DEBUG
/* TODO: Temporal fix for hang. This is really a bug. */
if((int)done_wi->id_usr == 0 && if((int)done_wi->id_usr == 0 &&
(done_wi->wi_status == WRK_ITEM_SET || (done_wi->wi_status == WRK_ITEM_SET ||
done_wi->wi_status == WRK_ITEM_UNSET)) { done_wi->wi_status == WRK_ITEM_UNSET)) {
...@@ -538,7 +553,6 @@ buf_mtflu_flush_work_items( ...@@ -538,7 +553,6 @@ buf_mtflu_flush_work_items(
done_wi->wr.flush_type); done_wi->wr.flush_type);
ut_a(0); ut_a(0);
} }
#endif
n_flushed+= done_wi->n_flushed; n_flushed+= done_wi->n_flushed;
i++; i++;
...@@ -547,6 +561,7 @@ buf_mtflu_flush_work_items( ...@@ -547,6 +561,7 @@ buf_mtflu_flush_work_items(
/* Release used work_items and queue nodes */ /* Release used work_items and queue nodes */
mem_heap_free(work_heap); mem_heap_free(work_heap);
mem_heap_free(reply_heap);
return(n_flushed); return(n_flushed);
} }
...@@ -672,6 +687,6 @@ buf_mtflu_set_thread_ids( ...@@ -672,6 +687,6 @@ buf_mtflu_set_thread_ids(
ut_a(thread_ids != NULL); ut_a(thread_ids != NULL);
for(i = 0; i < n_threads; i++) { for(i = 0; i < n_threads; i++) {
thread_ids[i] = mtflush_io[i].wthread_id; thread_ids[i] = mtflush_io->thread_data[i].wthread_id;
} }
} }
...@@ -116,26 +116,40 @@ typedef struct wrk_itm ...@@ -116,26 +116,40 @@ typedef struct wrk_itm
struct wrk_itm *next; /*!< Next work item */ struct wrk_itm *next; /*!< Next work item */
mem_heap_t *wheap; /*!< Heap were to allocate memory mem_heap_t *wheap; /*!< Heap were to allocate memory
for queue nodes */ for queue nodes */
mem_heap_t *rheap;
} wrk_t; } wrk_t;
typedef struct thread_data
{
os_thread_id_t wthread_id; /*!< Identifier */
os_thread_t wthread; /*!< Thread id */
wthr_status_t wt_status; /*!< Worker thread status */
} thread_data_t;
/* Thread syncronization data */ /* Thread syncronization data */
typedef struct thread_sync typedef struct thread_sync
{ {
/* Global variables used by all threads */
os_fast_mutex_t thread_global_mtx; /*!< Mutex used protecting below
variables */
ulint n_threads; /*!< Number of threads */ ulint n_threads; /*!< Number of threads */
os_thread_id_t wthread_id; /*!< Identifier */
os_thread_t wthread; /*!< Thread id */
ib_wqueue_t *wq; /*!< Work Queue */ ib_wqueue_t *wq; /*!< Work Queue */
ib_wqueue_t *wr_cq; /*!< Write Completion Queue */ ib_wqueue_t *wr_cq; /*!< Write Completion Queue */
ib_wqueue_t *rd_cq; /*!< Read Completion Queue */ ib_wqueue_t *rd_cq; /*!< Read Completion Queue */
wthr_status_t wt_status; /*!< Worker thread status */
mem_heap_t* wheap; /*!< Work heap where memory mem_heap_t* wheap; /*!< Work heap where memory
is allocated */ is allocated */
mem_heap_t* rheap; /*!< Work heap where memory
is allocated */
wthr_status_t gwt_status; /*!< Global thread status */
/* Variables used by only one thread at a time */
thread_data_t* thread_data; /*!< Thread specific data */
} thread_sync_t; } thread_sync_t;
static int mtflush_work_initialized = -1; static int mtflush_work_initialized = -1;
static os_fast_mutex_t mtflush_mtx;
static os_fast_mutex_t mtflush_mtx_wait;
static thread_sync_t* mtflush_ctx=NULL; static thread_sync_t* mtflush_ctx=NULL;
static os_fast_mutex_t mtflush_mtx;
/******************************************************************//** /******************************************************************//**
Set multi-threaded flush work initialized. */ Set multi-threaded flush work initialized. */
...@@ -172,6 +186,8 @@ buf_mtflu_flush_pool_instance( ...@@ -172,6 +186,8 @@ buf_mtflu_flush_pool_instance(
ut_a(work_item != NULL); ut_a(work_item != NULL);
ut_a(work_item->wr.buf_pool != NULL); ut_a(work_item->wr.buf_pool != NULL);
memset(&n, 0, sizeof(flush_counters_t));
if (!buf_flush_start(work_item->wr.buf_pool, work_item->wr.flush_type)) { if (!buf_flush_start(work_item->wr.buf_pool, work_item->wr.flush_type)) {
/* We have two choices here. If lsn_limit was /* We have two choices here. If lsn_limit was
specified then skipping an instance of buffer specified then skipping an instance of buffer
...@@ -223,29 +239,29 @@ static ...@@ -223,29 +239,29 @@ static
void void
mtflush_service_io( mtflush_service_io(
/*===============*/ /*===============*/
thread_sync_t* mtflush_io) /*!< inout: multi-threaded flush thread_sync_t* mtflush_io, /*!< inout: multi-threaded flush
syncronization data */ syncronization data */
thread_data_t* thread_data) /* Thread status data */
{ {
wrk_t *work_item = NULL; wrk_t *work_item = NULL;
ulint n_flushed=0; ulint n_flushed=0;
mtflush_io->wt_status = WTHR_SIG_WAITING; ut_a(mtflush_io != NULL);
ut_a(thread_data != NULL);
thread_data->wt_status = WTHR_SIG_WAITING;
/* TODO: Temporal fix for the hang bug. This needs a real fix. */
os_fast_mutex_lock(&mtflush_mtx_wait);
work_item = (wrk_t *)ib_wqueue_nowait(mtflush_io->wq); work_item = (wrk_t *)ib_wqueue_nowait(mtflush_io->wq);
if (work_item == NULL) { if (work_item == NULL) {
work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wq, MT_WAIT_IN_USECS); work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wq, MT_WAIT_IN_USECS);
} }
os_fast_mutex_unlock(&mtflush_mtx_wait);
if (work_item) { if (work_item) {
mtflush_io->wt_status = WTHR_RUNNING; thread_data->wt_status = WTHR_RUNNING;
} else { } else {
/* Thread did not get any work */ /* Thread did not get any work */
mtflush_io->wt_status = WTHR_NO_WORK; thread_data->wt_status = WTHR_NO_WORK;
return; return;
} }
...@@ -267,8 +283,8 @@ mtflush_service_io( ...@@ -267,8 +283,8 @@ mtflush_service_io(
case MT_WRK_NONE: case MT_WRK_NONE:
ut_a(work_item->wi_status == WRK_ITEM_EXIT); ut_a(work_item->wi_status == WRK_ITEM_EXIT);
work_item->wi_status = WRK_ITEM_EXIT; work_item->wi_status = WRK_ITEM_EXIT;
ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->wheap); ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->rheap);
mtflush_io->wt_status = WTHR_KILL_IT; thread_data->wt_status = WTHR_KILL_IT;
break; break;
case MT_WRK_WRITE: case MT_WRK_WRITE:
...@@ -279,7 +295,7 @@ mtflush_service_io( ...@@ -279,7 +295,7 @@ mtflush_service_io(
work_item->wi_status = WRK_ITEM_FAILED; work_item->wi_status = WRK_ITEM_FAILED;
} }
work_item->wi_status = WRK_ITEM_SUCCESS; work_item->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->wheap); ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->rheap);
break; break;
case MT_WRK_READ: case MT_WRK_READ:
...@@ -291,7 +307,6 @@ mtflush_service_io( ...@@ -291,7 +307,6 @@ mtflush_service_io(
ut_a(0); ut_a(0);
break; break;
} }
} }
/******************************************************************//** /******************************************************************//**
...@@ -305,14 +320,23 @@ DECLARE_THREAD(mtflush_io_thread)( ...@@ -305,14 +320,23 @@ DECLARE_THREAD(mtflush_io_thread)(
void * arg) void * arg)
{ {
thread_sync_t *mtflush_io = ((thread_sync_t *)arg); thread_sync_t *mtflush_io = ((thread_sync_t *)arg);
ulint n_timeout = 0; thread_data_t *this_thread_data = NULL;
#ifdef UNIV_DEBUG
ib_uint64_t stat_universal_num_processed = 0;
ib_uint64_t stat_cycle_num_processed = 0;
ulint i; ulint i;
#endif
/* Find correct slot for this thread */
os_fast_mutex_lock(&(mtflush_io->thread_global_mtx));
for(i=0; i < mtflush_io->n_threads; i ++) {
if (mtflush_io->thread_data[i].wthread_id == os_thread_get_curr_id()) {
break;
}
}
ut_a(i <= mtflush_io->n_threads);
this_thread_data = &mtflush_io->thread_data[i];
os_fast_mutex_unlock(&(mtflush_io->thread_global_mtx));
while (TRUE) { while (TRUE) {
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
fprintf(stderr, "InnoDB: Note. Thread %lu work queue len %lu return queue len %lu\n", fprintf(stderr, "InnoDB: Note. Thread %lu work queue len %lu return queue len %lu\n",
os_thread_get_curr_id(), os_thread_get_curr_id(),
...@@ -320,26 +344,10 @@ DECLARE_THREAD(mtflush_io_thread)( ...@@ -320,26 +344,10 @@ DECLARE_THREAD(mtflush_io_thread)(
ib_wqueue_len(mtflush_io->wr_cq)); ib_wqueue_len(mtflush_io->wr_cq));
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
mtflush_service_io(mtflush_io); mtflush_service_io(mtflush_io, this_thread_data);
#ifdef UNIV_DEBUG
if (mtflush_io->wt_status == WTHR_NO_WORK) {
n_timeout++;
if (n_timeout > 10) {
fprintf(stderr, "InnoDB: Note: Thread %lu has not received "
" work queue len %lu return queue len %lu\n",
os_thread_get_curr_id(),
ib_wqueue_len(mtflush_io->wq),
ib_wqueue_len(mtflush_io->wr_cq));
n_timeout = 0;
}
} else {
n_timeout = 0;
}
#endif /* UNIV_DEBUG */
if (mtflush_io->wt_status == WTHR_KILL_IT) { if (this_thread_data->wt_status == WTHR_KILL_IT) {
break; break;
} }
} }
...@@ -364,22 +372,24 @@ buf_mtflu_io_thread_exit(void) ...@@ -364,22 +372,24 @@ buf_mtflu_io_thread_exit(void)
/* Allocate work items for shutdown message */ /* Allocate work items for shutdown message */
work_item = (wrk_t*)mem_heap_alloc(mtflush_io->wheap, sizeof(wrk_t)*srv_mtflush_threads); work_item = (wrk_t*)mem_heap_alloc(mtflush_io->wheap, sizeof(wrk_t)*srv_mtflush_threads);
memset(work_item, 0, sizeof(wrk_t)*srv_mtflush_threads);
/* Confirm if the io-thread KILL is in progress, bailout */ /* Confirm if the io-thread KILL is in progress, bailout */
if (mtflush_io->wt_status == WTHR_KILL_IT) { if (mtflush_io->gwt_status == WTHR_KILL_IT) {
return; return;
} }
mtflush_io->gwt_status = WTHR_KILL_IT;
fprintf(stderr, "signal mtflush_io_threads to exit [%lu]\n", fprintf(stderr, "signal mtflush_io_threads to exit [%lu]\n",
srv_mtflush_threads); srv_mtflush_threads);
/* Send one exit work item/thread */ /* Send one exit work item/thread */
for (i=0; i < srv_mtflush_threads; i++) { for (i=0; i < srv_mtflush_threads; i++) {
work_item[i].wr.buf_pool = NULL;
work_item[i].rd.page_pool = NULL;
work_item[i].tsk = MT_WRK_NONE; work_item[i].tsk = MT_WRK_NONE;
work_item[i].wi_status = WRK_ITEM_EXIT; work_item[i].wi_status = WRK_ITEM_EXIT;
work_item[i].wheap = mtflush_io->wheap; work_item[i].wheap = mtflush_io->wheap;
work_item[i].rheap = mtflush_io->rheap;
ib_wqueue_add(mtflush_io->wq, ib_wqueue_add(mtflush_io->wq,
(void *)&(work_item[i]), (void *)&(work_item[i]),
...@@ -389,7 +399,7 @@ buf_mtflu_io_thread_exit(void) ...@@ -389,7 +399,7 @@ buf_mtflu_io_thread_exit(void)
/* Wait until all work items on a work queue are processed */ /* Wait until all work items on a work queue are processed */
while(!ib_wqueue_is_empty(mtflush_io->wq)) { while(!ib_wqueue_is_empty(mtflush_io->wq)) {
/* Wait */ /* Wait */
os_thread_sleep(MT_WAIT_IN_USECS * 2); os_thread_sleep(MT_WAIT_IN_USECS);
} }
ut_a(ib_wqueue_is_empty(mtflush_io->wq)); ut_a(ib_wqueue_is_empty(mtflush_io->wq));
...@@ -408,7 +418,7 @@ buf_mtflu_io_thread_exit(void) ...@@ -408,7 +418,7 @@ buf_mtflu_io_thread_exit(void)
} }
/* Wait about 1/2 sec to allow threads really exit */ /* Wait about 1/2 sec to allow threads really exit */
os_thread_sleep(5000000); os_thread_sleep(MT_WAIT_IN_USECS);
ut_a(ib_wqueue_is_empty(mtflush_io->wq)); ut_a(ib_wqueue_is_empty(mtflush_io->wq));
ut_a(ib_wqueue_is_empty(mtflush_io->wr_cq)); ut_a(ib_wqueue_is_empty(mtflush_io->wr_cq));
...@@ -420,10 +430,11 @@ buf_mtflu_io_thread_exit(void) ...@@ -420,10 +430,11 @@ buf_mtflu_io_thread_exit(void)
ib_wqueue_free(mtflush_io->rd_cq); ib_wqueue_free(mtflush_io->rd_cq);
os_fast_mutex_free(&mtflush_mtx); os_fast_mutex_free(&mtflush_mtx);
os_fast_mutex_free(&mtflush_mtx_wait); os_fast_mutex_free(&mtflush_io->thread_global_mtx);
/* Free heap */ /* Free heap */
mem_heap_free(mtflush_io->wheap); mem_heap_free(mtflush_io->wheap);
mem_heap_free(mtflush_io->rheap);
} }
/******************************************************************//** /******************************************************************//**
...@@ -437,45 +448,50 @@ buf_mtflu_handler_init( ...@@ -437,45 +448,50 @@ buf_mtflu_handler_init(
{ {
ulint i; ulint i;
mem_heap_t* mtflush_heap; mem_heap_t* mtflush_heap;
ib_wqueue_t* mtflush_work_queue; mem_heap_t* mtflush_heap2;
ib_wqueue_t* mtflush_write_comp_queue;
ib_wqueue_t* mtflush_read_comp_queue;
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx);
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx_wait);
/* Create heap, work queue, write completion queue, read /* Create heap, work queue, write completion queue, read
completion queue for multi-threaded flush, and init completion queue for multi-threaded flush, and init
handler. */ handler. */
mtflush_heap = mem_heap_create(0); mtflush_heap = mem_heap_create(0);
ut_a(mtflush_heap != NULL); ut_a(mtflush_heap != NULL);
mtflush_work_queue = ib_wqueue_create(); mtflush_heap2 = mem_heap_create(0);
ut_a(mtflush_work_queue != NULL); ut_a(mtflush_heap2 != NULL);
mtflush_write_comp_queue = ib_wqueue_create();
ut_a(mtflush_write_comp_queue != NULL);
mtflush_read_comp_queue = ib_wqueue_create();
ut_a(mtflush_read_comp_queue != NULL);
mtflush_ctx = (thread_sync_t *)mem_heap_alloc(mtflush_heap, mtflush_ctx = (thread_sync_t *)mem_heap_alloc(mtflush_heap,
MTFLUSH_MAX_WORKER * sizeof(thread_sync_t)); sizeof(thread_sync_t));
memset(mtflush_ctx, 0, sizeof(thread_sync_t));
ut_a(mtflush_ctx != NULL); ut_a(mtflush_ctx != NULL);
mtflush_ctx->thread_data = (thread_data_t*)mem_heap_alloc(
mtflush_heap, sizeof(thread_data_t) * n_threads);
ut_a(mtflush_ctx->thread_data);
memset(mtflush_ctx->thread_data, 0, sizeof(thread_data_t) * n_threads);
mtflush_ctx->n_threads = n_threads;
mtflush_ctx->wq = ib_wqueue_create();
ut_a(mtflush_ctx->wq);
mtflush_ctx->wr_cq = ib_wqueue_create();
ut_a(mtflush_ctx->wr_cq);
mtflush_ctx->rd_cq = ib_wqueue_create();
ut_a(mtflush_ctx->rd_cq);
mtflush_ctx->wheap = mtflush_heap;
mtflush_ctx->rheap = mtflush_heap2;
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_ctx->thread_global_mtx);
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx);
/* Create threads for page-compression-flush */ /* Create threads for page-compression-flush */
for(i=0; i < n_threads; i++) { for(i=0; i < n_threads; i++) {
os_thread_id_t new_thread_id; os_thread_id_t new_thread_id;
mtflush_ctx[i].n_threads = n_threads;
mtflush_ctx[i].wq = mtflush_work_queue; mtflush_ctx->thread_data[i].wt_status = WTHR_INITIALIZED;
mtflush_ctx[i].wr_cq = mtflush_write_comp_queue;
mtflush_ctx[i].rd_cq = mtflush_read_comp_queue; mtflush_ctx->thread_data[i].wthread = os_thread_create(
mtflush_ctx[i].wheap = mtflush_heap;
mtflush_ctx[i].wt_status = WTHR_INITIALIZED;
mtflush_ctx[i].wthread = os_thread_create(
mtflush_io_thread, mtflush_io_thread,
((void *)(mtflush_ctx + i)), ((void *) mtflush_ctx),
&new_thread_id); &new_thread_id);
mtflush_ctx[i].wthread_id = new_thread_id; mtflush_ctx->thread_data[i].wthread_id = new_thread_id;
} }
buf_mtflu_work_init(); buf_mtflu_work_init();
...@@ -502,13 +518,15 @@ buf_mtflu_flush_work_items( ...@@ -502,13 +518,15 @@ buf_mtflu_flush_work_items(
{ {
ulint n_flushed=0, i; ulint n_flushed=0, i;
mem_heap_t* work_heap; mem_heap_t* work_heap;
wrk_t* work_item=NULL; mem_heap_t* reply_heap;
wrk_t work_item[MTFLUSH_MAX_WORKER];
/* Allocate heap where all work items used and queue /* Allocate heap where all work items used and queue
node items areallocated */ node items areallocated */
work_heap = mem_heap_create(0); work_heap = mem_heap_create(0);
work_item = (wrk_t*)mem_heap_alloc(work_heap, sizeof(wrk_t)*buf_pool_inst); reply_heap = mem_heap_create(0);
memset(work_item, 0, sizeof(wrk_t)*buf_pool_inst); memset(work_item, 0, sizeof(wrk_t)*MTFLUSH_MAX_WORKER);
for(i=0;i<buf_pool_inst; i++) { for(i=0;i<buf_pool_inst; i++) {
work_item[i].tsk = MT_WRK_WRITE; work_item[i].tsk = MT_WRK_WRITE;
...@@ -518,9 +536,10 @@ buf_mtflu_flush_work_items( ...@@ -518,9 +536,10 @@ buf_mtflu_flush_work_items(
work_item[i].wr.lsn_limit = lsn_limit; work_item[i].wr.lsn_limit = lsn_limit;
work_item[i].wi_status = WRK_ITEM_UNSET; work_item[i].wi_status = WRK_ITEM_UNSET;
work_item[i].wheap = work_heap; work_item[i].wheap = work_heap;
work_item[i].rheap = reply_heap;
ib_wqueue_add(mtflush_ctx->wq, ib_wqueue_add(mtflush_ctx->wq,
(void *)(&(work_item[i])), (void *)(work_item + i),
work_heap); work_heap);
} }
...@@ -532,8 +551,6 @@ buf_mtflu_flush_work_items( ...@@ -532,8 +551,6 @@ buf_mtflu_flush_work_items(
if (done_wi != NULL) { if (done_wi != NULL) {
per_pool_pages_flushed[i] = done_wi->n_flushed; per_pool_pages_flushed[i] = done_wi->n_flushed;
#ifdef UNIV_DEBUG
/* TODO: Temporal fix for hang. This is really a bug. */
if((int)done_wi->id_usr == 0 && if((int)done_wi->id_usr == 0 &&
(done_wi->wi_status == WRK_ITEM_SET || (done_wi->wi_status == WRK_ITEM_SET ||
done_wi->wi_status == WRK_ITEM_UNSET)) { done_wi->wi_status == WRK_ITEM_UNSET)) {
...@@ -543,7 +560,6 @@ buf_mtflu_flush_work_items( ...@@ -543,7 +560,6 @@ buf_mtflu_flush_work_items(
done_wi->wr.flush_type); done_wi->wr.flush_type);
ut_a(0); ut_a(0);
} }
#endif
n_flushed+= done_wi->n_flushed; n_flushed+= done_wi->n_flushed;
i++; i++;
...@@ -555,6 +571,7 @@ buf_mtflu_flush_work_items( ...@@ -555,6 +571,7 @@ buf_mtflu_flush_work_items(
/* Release used work_items and queue nodes */ /* Release used work_items and queue nodes */
mem_heap_free(work_heap); mem_heap_free(work_heap);
mem_heap_free(reply_heap);
return(n_flushed); return(n_flushed);
} }
...@@ -680,6 +697,6 @@ buf_mtflu_set_thread_ids( ...@@ -680,6 +697,6 @@ buf_mtflu_set_thread_ids(
ut_a(thread_ids != NULL); ut_a(thread_ids != NULL);
for(i = 0; i < n_threads; i++) { for(i = 0; i < n_threads; i++) {
thread_ids[i] = mtflush_io[i].wthread_id; thread_ids[i] = mtflush_io->thread_data[i].wthread_id;
} }
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment