Commit 24235e99 authored by Jan Lindström's avatar Jan Lindström

Fixed memory leak on queue nodes by using local memory heap on normal

execution and global memory heap on shutdown.

Added a function to get work items from the queue without waiting, and
additional info when there is no work to do for an extended period.
parent 38471a6d
...@@ -113,6 +113,8 @@ typedef struct wrk_itm ...@@ -113,6 +113,8 @@ typedef struct wrk_itm
os_thread_id_t id_usr; /*!< Thread-id currently working */ os_thread_id_t id_usr; /*!< Thread-id currently working */
wrk_status_t wi_status; /*!< Work item status */ wrk_status_t wi_status; /*!< Work item status */
struct wrk_itm *next; /*!< Next work item */ struct wrk_itm *next; /*!< Next work item */
mem_heap_t *wheap; /*!< Heap were to allocate memory
for queue nodes */
} wrk_t; } wrk_t;
/* Thread syncronization data */ /* Thread syncronization data */
...@@ -127,39 +129,12 @@ typedef struct thread_sync ...@@ -127,39 +129,12 @@ typedef struct thread_sync
wthr_status_t wt_status; /*!< Worker thread status */ wthr_status_t wt_status; /*!< Worker thread status */
mem_heap_t* wheap; /*!< Work heap where memory mem_heap_t* wheap; /*!< Work heap where memory
is allocated */ is allocated */
wrk_t* work_item; /*!< Array of work-items that are
individually accessed by multiple
threads. Items are accessed in a
thread safe manner.*/
} thread_sync_t; } thread_sync_t;
/* TODO: REALLY NEEDED ? */
static int mtflush_work_initialized = -1; static int mtflush_work_initialized = -1;
static os_fast_mutex_t mtflush_mtx; static os_fast_mutex_t mtflush_mtx;
static thread_sync_t* mtflush_ctx=NULL; static thread_sync_t* mtflush_ctx=NULL;
/******************************************************************//**
Initialize work items.
For each of the first n_items entries: clear the read and write request
pointers, zero the flushed-page counter, set id_usr to -1 and the status
to WRK_ITEM_STATUS_UNDEFINED, and link the entry to its successor in
array order. The last entry's next pointer is NULL.
Safe to call with n_items == 0: no entry is touched. */
static
void
mtflu_setup_work_items(
/*===================*/
	wrk_t*	work_items,	/*!< inout: Work items */
	ulint	n_items)	/*!< in: Number of work items */
{
	ulint	i;

	for (i = 0; i < n_items; i++) {
		work_items[i].rd.page_pool = NULL;
		work_items[i].wr.buf_pool = NULL;
		work_items[i].n_flushed = 0;
		work_items[i].id_usr = -1;
		work_items[i].wi_status = WRK_ITEM_STATUS_UNDEFINED;

		/* The last node is the tail of the chain; deciding that
		here (instead of patching index n_items-1 after the loop)
		keeps the function free of any access when n_items is 0. */
		work_items[i].next = (i + 1 < n_items)
			? &work_items[i + 1]
			: NULL;
	}
}
/******************************************************************//** /******************************************************************//**
Set multi-threaded flush work initialized. */ Set multi-threaded flush work initialized. */
static inline static inline
...@@ -233,34 +208,6 @@ buf_mtflu_flush_pool_instance( ...@@ -233,34 +208,6 @@ buf_mtflu_flush_pool_instance(
return work_item->n_flushed; return work_item->n_flushed;
} }
#ifdef UNIV_DEBUG
/******************************************************************//**
Print flush statistics of work items.
Writes one line per work item (its id_usr and n_flushed) to stderr,
followed by the sum of the n_flushed counters that were visited.
Visits at most MTFLUSH_MAX_WORKER items and stops after the first item
whose next pointer is NULL.
*/
static
void
mtflu_print_thread_stat(
/*====================*/
	wrk_t*	work_item)	/*!< in: Work items */
{
	ulint	total = 0;
	ulint	idx = 0;

	while (idx < MTFLUSH_MAX_WORKER) {
		wrk_t*	cur = &work_item[idx];

		total += cur->n_flushed;

		fprintf(stderr, "MTFLUSH: Thread[%lu] stat [%lu]\n",
			cur->id_usr,
			cur->n_flushed);

		if (cur->next == NULL) {
			/* No more filled work items */
			break;
		}

		idx++;
	}

	fprintf(stderr, "MTFLUSH: Stat-Total:%lu\n", total);
}
#endif /* UNIV_DEBUG */
/******************************************************************//** /******************************************************************//**
Worker function to wait for work items and processing them and Worker function to wait for work items and processing them and
sending reply back. sending reply back.
...@@ -276,7 +223,12 @@ mtflush_service_io( ...@@ -276,7 +223,12 @@ mtflush_service_io(
ulint n_flushed=0; ulint n_flushed=0;
mtflush_io->wt_status = WTHR_SIG_WAITING; mtflush_io->wt_status = WTHR_SIG_WAITING;
work_item = (wrk_t *)ib_wqueue_nowait(mtflush_io->wq);
if (work_item == NULL) {
work_item = (wrk_t *)ib_wqueue_wait(mtflush_io->wq); work_item = (wrk_t *)ib_wqueue_wait(mtflush_io->wq);
}
if (work_item) { if (work_item) {
mtflush_io->wt_status = WTHR_RUNNING; mtflush_io->wt_status = WTHR_RUNNING;
...@@ -300,7 +252,7 @@ mtflush_service_io( ...@@ -300,7 +252,7 @@ mtflush_service_io(
case MT_WRK_NONE: case MT_WRK_NONE:
ut_a(work_item->wi_status == WRK_ITEM_EXIT); ut_a(work_item->wi_status == WRK_ITEM_EXIT);
work_item->wi_status = WRK_ITEM_EXIT; work_item->wi_status = WRK_ITEM_EXIT;
ib_wqueue_add(mtflush_io->wr_cq, work_item, mtflush_io->wheap); ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->wheap);
mtflush_io->wt_status = WTHR_KILL_IT; mtflush_io->wt_status = WTHR_KILL_IT;
return; return;
...@@ -314,16 +266,11 @@ mtflush_service_io( ...@@ -314,16 +266,11 @@ mtflush_service_io(
work_item->wi_status = WRK_ITEM_FAILED; work_item->wi_status = WRK_ITEM_FAILED;
} }
work_item->wi_status = WRK_ITEM_SUCCESS; work_item->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(mtflush_io->wr_cq, work_item, mtflush_io->wheap); ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->wheap);
break; break;
case MT_WRK_READ: case MT_WRK_READ:
/* Need to also handle the read case */
/* TODO: ? */
ut_a(0); ut_a(0);
/* completed task get added to rd_cq */
/* work_item->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(mtflush_io->rd_cq, work_item, mtflush_io->wheap);*/
break; break;
default: default:
...@@ -348,25 +295,12 @@ DECLARE_THREAD(mtflush_io_thread)( ...@@ -348,25 +295,12 @@ DECLARE_THREAD(mtflush_io_thread)(
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
ib_uint64_t stat_universal_num_processed = 0; ib_uint64_t stat_universal_num_processed = 0;
ib_uint64_t stat_cycle_num_processed = 0; ib_uint64_t stat_cycle_num_processed = 0;
wrk_t* work_item = mtflush_io[0].work_item;
ulint i; ulint i;
#endif #endif
while (TRUE) { while (TRUE) {
mtflush_service_io(mtflush_io); mtflush_service_io(mtflush_io);
#ifdef UNIV_DEBUG
for(i=0; i < MTFLUSH_MAX_WORKER; i++) {
stat_cycle_num_processed+= work_item[i].n_flushed;
}
stat_universal_num_processed+=stat_cycle_num_processed;
stat_cycle_num_processed = 0;
fprintf(stderr, "MTFLUSH_IO_THREAD: total %lu cycle %lu\n",
stat_universal_num_processed,
stat_cycle_num_processed);
mtflu_print_thread_stat(work_item);
#endif
if (mtflush_io->wt_status == WTHR_KILL_IT) { if (mtflush_io->wt_status == WTHR_KILL_IT) {
break; break;
} }
...@@ -386,26 +320,31 @@ buf_mtflu_io_thread_exit(void) ...@@ -386,26 +320,31 @@ buf_mtflu_io_thread_exit(void)
{ {
long i; long i;
thread_sync_t* mtflush_io = mtflush_ctx; thread_sync_t* mtflush_io = mtflush_ctx;
wrk_t* work_item;
ut_a(mtflush_io != NULL); ut_a(mtflush_io != NULL);
/* Allocate work items for shutdown message */
work_item = (wrk_t*)mem_heap_alloc(mtflush_io->wheap, sizeof(wrk_t)*srv_mtflush_threads);
/* Confirm if the io-thread KILL is in progress, bailout */ /* Confirm if the io-thread KILL is in progress, bailout */
if (mtflush_io->wt_status == WTHR_KILL_IT) { if (mtflush_io->wt_status == WTHR_KILL_IT) {
return; return;
} }
fprintf(stderr, "signal mtflush_io_threads to exit [%lu]\n", fprintf(stderr, "signal mtflush_io_threads to exit [%lu]\n",
srv_buf_pool_instances); srv_mtflush_threads);
/* Send one exit work item/thread */ /* Send one exit work item/thread */
for (i=0; i < srv_mtflush_threads; i++) { for (i=0; i < srv_mtflush_threads; i++) {
mtflush_io->work_item[i].wr.buf_pool = NULL; work_item[i].wr.buf_pool = NULL;
mtflush_io->work_item[i].rd.page_pool = NULL; work_item[i].rd.page_pool = NULL;
mtflush_io->work_item[i].tsk = MT_WRK_NONE; work_item[i].tsk = MT_WRK_NONE;
mtflush_io->work_item[i].wi_status = WRK_ITEM_EXIT; work_item[i].wi_status = WRK_ITEM_EXIT;
work_item[i].wheap = mtflush_io->wheap;
ib_wqueue_add(mtflush_io->wq, ib_wqueue_add(mtflush_io->wq,
(void *)&(mtflush_io->work_item[i]), (void *)&(work_item[i]),
mtflush_io->wheap); mtflush_io->wheap);
} }
...@@ -431,7 +370,7 @@ buf_mtflu_io_thread_exit(void) ...@@ -431,7 +370,7 @@ buf_mtflu_io_thread_exit(void)
} }
/* Wait about 1/2 sec to allow threads really exit */ /* Wait about 1/2 sec to allow threads really exit */
os_thread_sleep(50000); os_thread_sleep(5000000);
ut_a(ib_wqueue_is_empty(mtflush_io->wq)); ut_a(ib_wqueue_is_empty(mtflush_io->wq));
ut_a(ib_wqueue_is_empty(mtflush_io->wr_cq)); ut_a(ib_wqueue_is_empty(mtflush_io->wr_cq));
...@@ -462,7 +401,6 @@ buf_mtflu_handler_init( ...@@ -462,7 +401,6 @@ buf_mtflu_handler_init(
ib_wqueue_t* mtflush_work_queue; ib_wqueue_t* mtflush_work_queue;
ib_wqueue_t* mtflush_write_comp_queue; ib_wqueue_t* mtflush_write_comp_queue;
ib_wqueue_t* mtflush_read_comp_queue; ib_wqueue_t* mtflush_read_comp_queue;
wrk_t* work_items;
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx); os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx);
...@@ -481,14 +419,6 @@ buf_mtflu_handler_init( ...@@ -481,14 +419,6 @@ buf_mtflu_handler_init(
mtflush_ctx = (thread_sync_t *)mem_heap_alloc(mtflush_heap, mtflush_ctx = (thread_sync_t *)mem_heap_alloc(mtflush_heap,
MTFLUSH_MAX_WORKER * sizeof(thread_sync_t)); MTFLUSH_MAX_WORKER * sizeof(thread_sync_t));
ut_a(mtflush_ctx != NULL); ut_a(mtflush_ctx != NULL);
work_items = (wrk_t*)mem_heap_alloc(mtflush_heap,
MTFLUSH_MAX_WORKER * sizeof(wrk_t));
ut_a(work_items != NULL);
memset(work_items, 0, sizeof(wrk_t) * MTFLUSH_MAX_WORKER);
memset(mtflush_ctx, 0, sizeof(thread_sync_t) * MTFLUSH_MAX_WORKER);
/* Initialize work items */
mtflu_setup_work_items(work_items, n_threads);
/* Create threads for page-compression-flush */ /* Create threads for page-compression-flush */
for(i=0; i < n_threads; i++) { for(i=0; i < n_threads; i++) {
...@@ -499,7 +429,6 @@ buf_mtflu_handler_init( ...@@ -499,7 +429,6 @@ buf_mtflu_handler_init(
mtflush_ctx[i].rd_cq = mtflush_read_comp_queue; mtflush_ctx[i].rd_cq = mtflush_read_comp_queue;
mtflush_ctx[i].wheap = mtflush_heap; mtflush_ctx[i].wheap = mtflush_heap;
mtflush_ctx[i].wt_status = WTHR_INITIALIZED; mtflush_ctx[i].wt_status = WTHR_INITIALIZED;
mtflush_ctx[i].work_item = work_items;
mtflush_ctx[i].wthread = os_thread_create( mtflush_ctx[i].wthread = os_thread_create(
mtflush_io_thread, mtflush_io_thread,
...@@ -533,20 +462,28 @@ buf_mtflu_flush_work_items( ...@@ -533,20 +462,28 @@ buf_mtflu_flush_work_items(
{ {
ulint n_flushed=0, i; ulint n_flushed=0, i;
wrk_t *done_wi; wrk_t *done_wi;
mem_heap_t* work_heap;
wrk_t* work_item;
/* Allocate heap where all work items used and queue
node items areallocated */
work_heap = mem_heap_create(0);
work_item = (wrk_t*)mem_heap_alloc(work_heap, sizeof(wrk_t)*buf_pool_inst);
for(i=0;i<buf_pool_inst; i++) { for(i=0;i<buf_pool_inst; i++) {
mtflush_ctx->work_item[i].tsk = MT_WRK_WRITE; work_item[i].tsk = MT_WRK_WRITE;
mtflush_ctx->work_item[i].rd.page_pool = NULL; work_item[i].rd.page_pool = NULL;
mtflush_ctx->work_item[i].wr.buf_pool = buf_pool_from_array(i); work_item[i].wr.buf_pool = buf_pool_from_array(i);
mtflush_ctx->work_item[i].wr.flush_type = flush_type; work_item[i].wr.flush_type = flush_type;
mtflush_ctx->work_item[i].wr.min = min_n; work_item[i].wr.min = min_n;
mtflush_ctx->work_item[i].wr.lsn_limit = lsn_limit; work_item[i].wr.lsn_limit = lsn_limit;
mtflush_ctx->work_item[i].id_usr = -1; work_item[i].id_usr = -1;
mtflush_ctx->work_item[i].wi_status = WRK_ITEM_SET; work_item[i].wi_status = WRK_ITEM_SET;
work_item[i].wheap = work_heap;
ib_wqueue_add(mtflush_ctx->wq, ib_wqueue_add(mtflush_ctx->wq,
(void *)(&(mtflush_ctx->work_item[i])), (void *)(&(work_item[i])),
mtflush_ctx->wheap); work_heap);
} }
/* wait on the completion to arrive */ /* wait on the completion to arrive */
...@@ -554,21 +491,15 @@ buf_mtflu_flush_work_items( ...@@ -554,21 +491,15 @@ buf_mtflu_flush_work_items(
done_wi = (wrk_t *)ib_wqueue_wait(mtflush_ctx->wr_cq); done_wi = (wrk_t *)ib_wqueue_wait(mtflush_ctx->wr_cq);
if (done_wi != NULL) { if (done_wi != NULL) {
if(done_wi->n_flushed == 0) {
per_pool_pages_flushed[i] = 0;
} else {
per_pool_pages_flushed[i] = done_wi->n_flushed; per_pool_pages_flushed[i] = done_wi->n_flushed;
}
if((int)done_wi->id_usr == -1 && if((int)done_wi->id_usr == -1 &&
done_wi->wi_status == WRK_ITEM_SET ) { done_wi->wi_status == WRK_ITEM_SET ) {
#ifdef UNIV_DEBUG
fprintf(stderr, fprintf(stderr,
"**Set/Unused work_item[%lu] flush_type=%d\n", "**Set/Unused work_item[%lu] flush_type=%d\n",
i, i,
done_wi->wr.flush_type); done_wi->wr.flush_type);
ut_ad(0); ut_a(0);
#endif
} }
n_flushed+= done_wi->n_flushed; n_flushed+= done_wi->n_flushed;
...@@ -576,6 +507,9 @@ buf_mtflu_flush_work_items( ...@@ -576,6 +507,9 @@ buf_mtflu_flush_work_items(
} }
} }
/* Release used work_items and queue nodes */
mem_heap_free(work_heap);
return(n_flushed); return(n_flushed);
} }
......
...@@ -95,6 +95,15 @@ ib_wqueue_timedwait( ...@@ -95,6 +95,15 @@ ib_wqueue_timedwait(
ib_wqueue_t* wq, /* in: work queue */ ib_wqueue_t* wq, /* in: work queue */
ib_time_t wait_in_usecs); /* in: wait time in micro seconds */ ib_time_t wait_in_usecs); /* in: wait time in micro seconds */
/********************************************************************
Remove and return the first item on the work queue without waiting for
work to arrive. If the queue is empty, NULL is returned instead.
@return work item or NULL */
void*
ib_wqueue_nowait(
/*=============*/
ib_wqueue_t* wq); /*!< in: work queue */
/* Work queue. */ /* Work queue. */
struct ib_wqueue_t { struct ib_wqueue_t {
ib_mutex_t mutex; /*!< mutex protecting everything */ ib_mutex_t mutex; /*!< mutex protecting everything */
......
...@@ -161,6 +161,38 @@ ib_wqueue_timedwait( ...@@ -161,6 +161,38 @@ ib_wqueue_timedwait(
return(node ? node->data : NULL); return(node ? node->data : NULL);
} }
/********************************************************************
Take the first item off the work queue if there is one; never waits for
an item to be added. The list manipulation and the event reset are done
while holding wq->mutex.
@return work item or NULL */
void*
ib_wqueue_nowait(
/*=============*/
	ib_wqueue_t*	wq)	/*!< in: work queue */
{
	ib_list_node_t*	head = NULL;

	mutex_enter(&wq->mutex);

	if (!ib_list_is_empty(wq->items)) {
		head = ib_list_get_first(wq->items);
	}

	if (head) {
		ib_list_remove(wq->items, head);
	}

	/* Once the list has been drained (or was empty to begin with)
	the event must be reset. */
	if (ib_list_is_empty(wq->items)) {
		os_event_reset(wq->event);
	}

	mutex_exit(&wq->mutex);

	if (!head) {
		return(NULL);
	}

	return(head->data);
}
/******************************************************************** /********************************************************************
Check if queue is empty. */ Check if queue is empty. */
......
...@@ -113,6 +113,8 @@ typedef struct wrk_itm ...@@ -113,6 +113,8 @@ typedef struct wrk_itm
os_thread_id_t id_usr; /*!< Thread-id currently working */ os_thread_id_t id_usr; /*!< Thread-id currently working */
wrk_status_t wi_status; /*!< Work item status */ wrk_status_t wi_status; /*!< Work item status */
struct wrk_itm *next; /*!< Next work item */ struct wrk_itm *next; /*!< Next work item */
mem_heap_t *wheap; /*!< Heap were to allocate memory
for queue nodes */
} wrk_t; } wrk_t;
/* Thread syncronization data */ /* Thread syncronization data */
...@@ -127,39 +129,12 @@ typedef struct thread_sync ...@@ -127,39 +129,12 @@ typedef struct thread_sync
wthr_status_t wt_status; /*!< Worker thread status */ wthr_status_t wt_status; /*!< Worker thread status */
mem_heap_t* wheap; /*!< Work heap where memory mem_heap_t* wheap; /*!< Work heap where memory
is allocated */ is allocated */
wrk_t* work_item; /*!< Array of work-items that are
individually accessed by multiple
threads. Items are accessed in a
thread safe manner.*/
} thread_sync_t; } thread_sync_t;
/* TODO: REALLY NEEDED ? */
static int mtflush_work_initialized = -1; static int mtflush_work_initialized = -1;
static os_fast_mutex_t mtflush_mtx; static os_fast_mutex_t mtflush_mtx;
static thread_sync_t* mtflush_ctx=NULL; static thread_sync_t* mtflush_ctx=NULL;
/******************************************************************//**
Initialize work items.
For each of the first n_items entries: clear the read and write request
pointers, zero the flushed-page counter, set id_usr to -1 and the status
to WRK_ITEM_STATUS_UNDEFINED, and link the entry to its successor in
array order. The last entry's next pointer is NULL.
Safe to call with n_items == 0: no entry is touched. */
static
void
mtflu_setup_work_items(
/*===================*/
	wrk_t*	work_items,	/*!< inout: Work items */
	ulint	n_items)	/*!< in: Number of work items */
{
	ulint	i;

	for (i = 0; i < n_items; i++) {
		work_items[i].rd.page_pool = NULL;
		work_items[i].wr.buf_pool = NULL;
		work_items[i].n_flushed = 0;
		work_items[i].id_usr = -1;
		work_items[i].wi_status = WRK_ITEM_STATUS_UNDEFINED;

		/* The last node is the tail of the chain; deciding that
		here (instead of patching index n_items-1 after the loop)
		keeps the function free of any access when n_items is 0. */
		work_items[i].next = (i + 1 < n_items)
			? &work_items[i + 1]
			: NULL;
	}
}
/******************************************************************//** /******************************************************************//**
Set multi-threaded flush work initialized. */ Set multi-threaded flush work initialized. */
static inline static inline
...@@ -238,35 +213,6 @@ buf_mtflu_flush_pool_instance( ...@@ -238,35 +213,6 @@ buf_mtflu_flush_pool_instance(
return work_item->n_flushed; return work_item->n_flushed;
} }
#ifdef UNIV_DEBUG
/******************************************************************//**
Print flush statistics of work items.
Writes one line per work item (its id_usr and n_flushed) to stderr,
followed by the sum of the n_flushed counters that were visited.
Visits at most MTFLUSH_MAX_WORKER items and stops after the first item
whose next pointer is NULL.
*/
static
void
mtflu_print_thread_stat(
/*====================*/
	wrk_t*	work_item)	/*!< in: Work items */
{
	ulint	total = 0;
	ulint	idx = 0;

	while (idx < MTFLUSH_MAX_WORKER) {
		wrk_t*	cur = &work_item[idx];

		total += cur->n_flushed;

		fprintf(stderr, "MTFLUSH: Thread[%lu] stat [%lu]\n",
			cur->id_usr,
			cur->n_flushed);

		if (cur->next == NULL) {
			/* No more filled work items */
			break;
		}

		idx++;
	}

	fprintf(stderr, "MTFLUSH: Stat-Total:%lu\n", total);
}
#endif /* UNIV_DEBUG */
/******************************************************************//** /******************************************************************//**
Worker function to wait for work items and processing them and Worker function to wait for work items and processing them and
sending reply back. sending reply back.
...@@ -282,7 +228,12 @@ mtflush_service_io( ...@@ -282,7 +228,12 @@ mtflush_service_io(
ulint n_flushed=0; ulint n_flushed=0;
mtflush_io->wt_status = WTHR_SIG_WAITING; mtflush_io->wt_status = WTHR_SIG_WAITING;
work_item = (wrk_t *)ib_wqueue_wait(mtflush_io->wq);
work_item = (wrk_t *)ib_wqueue_nowait(mtflush_io->wq);
if (work_item == NULL) {
work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wq, MT_WAIT_IN_USECS);
}
if (work_item) { if (work_item) {
mtflush_io->wt_status = WTHR_RUNNING; mtflush_io->wt_status = WTHR_RUNNING;
...@@ -306,7 +257,7 @@ mtflush_service_io( ...@@ -306,7 +257,7 @@ mtflush_service_io(
case MT_WRK_NONE: case MT_WRK_NONE:
ut_a(work_item->wi_status == WRK_ITEM_EXIT); ut_a(work_item->wi_status == WRK_ITEM_EXIT);
work_item->wi_status = WRK_ITEM_EXIT; work_item->wi_status = WRK_ITEM_EXIT;
ib_wqueue_add(mtflush_io->wr_cq, work_item, mtflush_io->wheap); ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->wheap);
mtflush_io->wt_status = WTHR_KILL_IT; mtflush_io->wt_status = WTHR_KILL_IT;
return; return;
...@@ -320,16 +271,11 @@ mtflush_service_io( ...@@ -320,16 +271,11 @@ mtflush_service_io(
work_item->wi_status = WRK_ITEM_FAILED; work_item->wi_status = WRK_ITEM_FAILED;
} }
work_item->wi_status = WRK_ITEM_SUCCESS; work_item->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(mtflush_io->wr_cq, work_item, mtflush_io->wheap); ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->wheap);
break; break;
case MT_WRK_READ: case MT_WRK_READ:
/* Need to also handle the read case */
/* TODO: ? */
ut_a(0); ut_a(0);
/* completed task get added to rd_cq */
/* work_item->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(mtflush_io->rd_cq, work_item, mtflush_io->wheap);*/
break; break;
default: default:
...@@ -351,28 +297,36 @@ DECLARE_THREAD(mtflush_io_thread)( ...@@ -351,28 +297,36 @@ DECLARE_THREAD(mtflush_io_thread)(
void * arg) void * arg)
{ {
thread_sync_t *mtflush_io = ((thread_sync_t *)arg); thread_sync_t *mtflush_io = ((thread_sync_t *)arg);
ulint n_timeout = 0;
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
ib_uint64_t stat_universal_num_processed = 0; ib_uint64_t stat_universal_num_processed = 0;
ib_uint64_t stat_cycle_num_processed = 0; ib_uint64_t stat_cycle_num_processed = 0;
wrk_t* work_item = mtflush_io[0].work_item;
ulint i; ulint i;
#endif #endif
while (TRUE) { while (TRUE) {
fprintf(stderr, "InnoDB: Note. Thread %lu work queue len %lu return queue len %lu\n",
os_thread_get_curr_id(),
ib_wqueue_len(mtflush_io->wq),
ib_wqueue_len(mtflush_io->wr_cq));
mtflush_service_io(mtflush_io); mtflush_service_io(mtflush_io);
#ifdef UNIV_DEBUG if (mtflush_io->wt_status == WTHR_NO_WORK) {
for(i=0; i < MTFLUSH_MAX_WORKER; i++) { n_timeout++;
stat_cycle_num_processed+= work_item[i].n_flushed;
if (n_timeout > 10) {
fprintf(stderr, "InnoDB: Note: Thread %lu has not received "
" work queue len %lu return queue len %lu\n",
os_thread_get_curr_id(),
ib_wqueue_len(mtflush_io->wq),
ib_wqueue_len(mtflush_io->wr_cq));
n_timeout = 0;
}
} else {
n_timeout = 0;
} }
stat_universal_num_processed+=stat_cycle_num_processed;
stat_cycle_num_processed = 0;
fprintf(stderr, "MTFLUSH_IO_THREAD: total %lu cycle %lu\n",
stat_universal_num_processed,
stat_cycle_num_processed);
mtflu_print_thread_stat(work_item);
#endif
if (mtflush_io->wt_status == WTHR_KILL_IT) { if (mtflush_io->wt_status == WTHR_KILL_IT) {
break; break;
} }
...@@ -392,26 +346,31 @@ buf_mtflu_io_thread_exit(void) ...@@ -392,26 +346,31 @@ buf_mtflu_io_thread_exit(void)
{ {
long i; long i;
thread_sync_t* mtflush_io = mtflush_ctx; thread_sync_t* mtflush_io = mtflush_ctx;
wrk_t* work_item;
ut_a(mtflush_io != NULL); ut_a(mtflush_io != NULL);
/* Allocate work items for shutdown message */
work_item = (wrk_t*)mem_heap_alloc(mtflush_io->wheap, sizeof(wrk_t)*srv_mtflush_threads);
/* Confirm if the io-thread KILL is in progress, bailout */ /* Confirm if the io-thread KILL is in progress, bailout */
if (mtflush_io->wt_status == WTHR_KILL_IT) { if (mtflush_io->wt_status == WTHR_KILL_IT) {
return; return;
} }
fprintf(stderr, "signal mtflush_io_threads to exit [%lu]\n", fprintf(stderr, "signal mtflush_io_threads to exit [%lu]\n",
srv_buf_pool_instances); srv_mtflush_threads);
/* Send one exit work item/thread */ /* Send one exit work item/thread */
for (i=0; i < srv_mtflush_threads; i++) { for (i=0; i < srv_mtflush_threads; i++) {
mtflush_io->work_item[i].wr.buf_pool = NULL; work_item[i].wr.buf_pool = NULL;
mtflush_io->work_item[i].rd.page_pool = NULL; work_item[i].rd.page_pool = NULL;
mtflush_io->work_item[i].tsk = MT_WRK_NONE; work_item[i].tsk = MT_WRK_NONE;
mtflush_io->work_item[i].wi_status = WRK_ITEM_EXIT; work_item[i].wi_status = WRK_ITEM_EXIT;
work_item[i].wheap = mtflush_io->wheap;
ib_wqueue_add(mtflush_io->wq, ib_wqueue_add(mtflush_io->wq,
(void *)&(mtflush_io->work_item[i]), (void *)&(work_item[i]),
mtflush_io->wheap); mtflush_io->wheap);
} }
...@@ -437,7 +396,7 @@ buf_mtflu_io_thread_exit(void) ...@@ -437,7 +396,7 @@ buf_mtflu_io_thread_exit(void)
} }
/* Wait about 1/2 sec to allow threads really exit */ /* Wait about 1/2 sec to allow threads really exit */
os_thread_sleep(50000); os_thread_sleep(5000000);
ut_a(ib_wqueue_is_empty(mtflush_io->wq)); ut_a(ib_wqueue_is_empty(mtflush_io->wq));
ut_a(ib_wqueue_is_empty(mtflush_io->wr_cq)); ut_a(ib_wqueue_is_empty(mtflush_io->wr_cq));
...@@ -468,7 +427,6 @@ buf_mtflu_handler_init( ...@@ -468,7 +427,6 @@ buf_mtflu_handler_init(
ib_wqueue_t* mtflush_work_queue; ib_wqueue_t* mtflush_work_queue;
ib_wqueue_t* mtflush_write_comp_queue; ib_wqueue_t* mtflush_write_comp_queue;
ib_wqueue_t* mtflush_read_comp_queue; ib_wqueue_t* mtflush_read_comp_queue;
wrk_t* work_items;
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx); os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx);
...@@ -487,14 +445,6 @@ buf_mtflu_handler_init( ...@@ -487,14 +445,6 @@ buf_mtflu_handler_init(
mtflush_ctx = (thread_sync_t *)mem_heap_alloc(mtflush_heap, mtflush_ctx = (thread_sync_t *)mem_heap_alloc(mtflush_heap,
MTFLUSH_MAX_WORKER * sizeof(thread_sync_t)); MTFLUSH_MAX_WORKER * sizeof(thread_sync_t));
ut_a(mtflush_ctx != NULL); ut_a(mtflush_ctx != NULL);
work_items = (wrk_t*)mem_heap_alloc(mtflush_heap,
MTFLUSH_MAX_WORKER * sizeof(wrk_t));
ut_a(work_items != NULL);
memset(work_items, 0, sizeof(wrk_t) * MTFLUSH_MAX_WORKER);
memset(mtflush_ctx, 0, sizeof(thread_sync_t) * MTFLUSH_MAX_WORKER);
/* Initialize work items */
mtflu_setup_work_items(work_items, n_threads);
/* Create threads for page-compression-flush */ /* Create threads for page-compression-flush */
for(i=0; i < n_threads; i++) { for(i=0; i < n_threads; i++) {
...@@ -505,7 +455,6 @@ buf_mtflu_handler_init( ...@@ -505,7 +455,6 @@ buf_mtflu_handler_init(
mtflush_ctx[i].rd_cq = mtflush_read_comp_queue; mtflush_ctx[i].rd_cq = mtflush_read_comp_queue;
mtflush_ctx[i].wheap = mtflush_heap; mtflush_ctx[i].wheap = mtflush_heap;
mtflush_ctx[i].wt_status = WTHR_INITIALIZED; mtflush_ctx[i].wt_status = WTHR_INITIALIZED;
mtflush_ctx[i].work_item = work_items;
mtflush_ctx[i].wthread = os_thread_create( mtflush_ctx[i].wthread = os_thread_create(
mtflush_io_thread, mtflush_io_thread,
...@@ -539,20 +488,28 @@ buf_mtflu_flush_work_items( ...@@ -539,20 +488,28 @@ buf_mtflu_flush_work_items(
{ {
ulint n_flushed=0, i; ulint n_flushed=0, i;
wrk_t *done_wi; wrk_t *done_wi;
mem_heap_t* work_heap;
wrk_t* work_item;
/* Allocate heap where all work items used and queue
node items areallocated */
work_heap = mem_heap_create(0);
work_item = (wrk_t*)mem_heap_alloc(work_heap, sizeof(wrk_t)*buf_pool_inst);
for(i=0;i<buf_pool_inst; i++) { for(i=0;i<buf_pool_inst; i++) {
mtflush_ctx->work_item[i].tsk = MT_WRK_WRITE; work_item[i].tsk = MT_WRK_WRITE;
mtflush_ctx->work_item[i].rd.page_pool = NULL; work_item[i].rd.page_pool = NULL;
mtflush_ctx->work_item[i].wr.buf_pool = buf_pool_from_array(i); work_item[i].wr.buf_pool = buf_pool_from_array(i);
mtflush_ctx->work_item[i].wr.flush_type = flush_type; work_item[i].wr.flush_type = flush_type;
mtflush_ctx->work_item[i].wr.min = min_n; work_item[i].wr.min = min_n;
mtflush_ctx->work_item[i].wr.lsn_limit = lsn_limit; work_item[i].wr.lsn_limit = lsn_limit;
mtflush_ctx->work_item[i].id_usr = -1; work_item[i].id_usr = -1;
mtflush_ctx->work_item[i].wi_status = WRK_ITEM_SET; work_item[i].wi_status = WRK_ITEM_SET;
work_item[i].wheap = work_heap;
ib_wqueue_add(mtflush_ctx->wq, ib_wqueue_add(mtflush_ctx->wq,
(void *)(&(mtflush_ctx->work_item[i])), (void *)(&(work_item[i])),
mtflush_ctx->wheap); work_heap);
} }
/* wait on the completion to arrive */ /* wait on the completion to arrive */
...@@ -560,21 +517,15 @@ buf_mtflu_flush_work_items( ...@@ -560,21 +517,15 @@ buf_mtflu_flush_work_items(
done_wi = (wrk_t *)ib_wqueue_wait(mtflush_ctx->wr_cq); done_wi = (wrk_t *)ib_wqueue_wait(mtflush_ctx->wr_cq);
if (done_wi != NULL) { if (done_wi != NULL) {
if(done_wi->n_flushed == 0) {
per_pool_pages_flushed[i] = 0;
} else {
per_pool_pages_flushed[i] = done_wi->n_flushed; per_pool_pages_flushed[i] = done_wi->n_flushed;
}
if((int)done_wi->id_usr == -1 && if((int)done_wi->id_usr == -1 &&
done_wi->wi_status == WRK_ITEM_SET ) { done_wi->wi_status == WRK_ITEM_SET ) {
#ifdef UNIV_DEBUG
fprintf(stderr, fprintf(stderr,
"**Set/Unused work_item[%lu] flush_type=%d\n", "**Set/Unused work_item[%lu] flush_type=%d\n",
i, i,
done_wi->wr.flush_type); done_wi->wr.flush_type);
ut_ad(0); ut_a(0);
#endif
} }
n_flushed+= done_wi->n_flushed; n_flushed+= done_wi->n_flushed;
...@@ -582,6 +533,12 @@ buf_mtflu_flush_work_items( ...@@ -582,6 +533,12 @@ buf_mtflu_flush_work_items(
} }
} }
ut_a(ib_wqueue_is_empty(mtflush_ctx->wq));
ut_a(ib_wqueue_is_empty(mtflush_ctx->wr_cq));
/* Release used work_items and queue nodes */
mem_heap_free(work_heap);
return(n_flushed); return(n_flushed);
} }
......
...@@ -150,6 +150,15 @@ ib_list_is_empty( ...@@ -150,6 +150,15 @@ ib_list_is_empty(
/* out: TRUE if empty else */ /* out: TRUE if empty else */
const ib_list_t* list); /* in: list */ const ib_list_t* list); /* in: list */
/********************************************************************
Get number of items on list. The count is obtained by walking the list
from its first node and following the next pointers.
@return number of items on list */
UNIV_INLINE
ulint
ib_list_len(
/*========*/
const ib_list_t* list); /*!< in: list */
/* List. */ /* List. */
struct ib_list_t { struct ib_list_t {
ib_list_node_t* first; /*!< first node */ ib_list_node_t* first; /*!< first node */
......
...@@ -58,3 +58,23 @@ ib_list_is_empty( ...@@ -58,3 +58,23 @@ ib_list_is_empty(
{ {
return(!(list->first || list->last)); return(!(list->first || list->last));
} }
/********************************************************************
Count the nodes of a list by walking it from list->first along the
next pointers until the end is reached.
@return number of items on list */
UNIV_INLINE
ulint
ib_list_len(
/*========*/
	const ib_list_t*	list)	/*!< in: list */
{
	ulint		count = 0;
	ib_list_node_t*	cur;

	for (cur = list->first; cur; cur = cur->next) {
		count++;
	}

	return(count);
}
...@@ -95,6 +95,23 @@ ib_wqueue_timedwait( ...@@ -95,6 +95,23 @@ ib_wqueue_timedwait(
ib_wqueue_t* wq, /* in: work queue */ ib_wqueue_t* wq, /* in: work queue */
ib_time_t wait_in_usecs); /* in: wait time in micro seconds */ ib_time_t wait_in_usecs); /* in: wait time in micro seconds */
/********************************************************************
Remove and return the first item on the work queue without waiting for
work to arrive. If the queue is empty, NULL is returned instead.
@return work item or NULL */
void*
ib_wqueue_nowait(
/*=============*/
ib_wqueue_t* wq); /*!< in: work queue */
/********************************************************************
Get number of items on queue. The items are counted while the queue
mutex is held.
@return number of items on queue */
ulint
ib_wqueue_len(
/*==========*/
ib_wqueue_t* wq); /*!< in: work queue */
/* Work queue. */ /* Work queue. */
struct ib_wqueue_t { struct ib_wqueue_t {
ib_mutex_t mutex; /*!< mutex protecting everything */ ib_mutex_t mutex; /*!< mutex protecting everything */
......
...@@ -161,6 +161,38 @@ ib_wqueue_timedwait( ...@@ -161,6 +161,38 @@ ib_wqueue_timedwait(
return(node ? node->data : NULL); return(node ? node->data : NULL);
} }
/********************************************************************
Take the first item off the work queue if there is one; never waits for
an item to be added. The list manipulation and the event reset are done
while holding wq->mutex.
@return work item or NULL */
void*
ib_wqueue_nowait(
/*=============*/
	ib_wqueue_t*	wq)	/*!< in: work queue */
{
	ib_list_node_t*	head = NULL;

	mutex_enter(&wq->mutex);

	if (!ib_list_is_empty(wq->items)) {
		head = ib_list_get_first(wq->items);
	}

	if (head) {
		ib_list_remove(wq->items, head);
	}

	/* Once the list has been drained (or was empty to begin with)
	the event must be reset. */
	if (ib_list_is_empty(wq->items)) {
		os_event_reset(wq->event);
	}

	mutex_exit(&wq->mutex);

	if (!head) {
		return(NULL);
	}

	return(head->data);
}
/******************************************************************** /********************************************************************
Check if queue is empty. */ Check if queue is empty. */
...@@ -173,3 +205,20 @@ ib_wqueue_is_empty( ...@@ -173,3 +205,20 @@ ib_wqueue_is_empty(
{ {
return(ib_list_is_empty(wq->items)); return(ib_list_is_empty(wq->items));
} }
/********************************************************************
Report how many items are currently on the queue. The list is counted
with ib_list_len() while wq->mutex is held.
@return number of items on queue */
ulint
ib_wqueue_len(
/*==========*/
	ib_wqueue_t*	wq)	/*!< in: work queue */
{
	ulint	n_items;

	mutex_enter(&wq->mutex);

	n_items = ib_list_len(wq->items);

	mutex_exit(&wq->mutex);

	return(n_items);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment