Commit a5cf3a80 authored by Jan Lindström

Merged latest mt-flush code to xtradb. Cleaned up thread statistic output code.

parent 18353c6a
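The design this commit merges is a producer/consumer pattern: the coordinator posts one work item per buffer pool instance to a shared work queue, a fixed pool of flush worker threads drains that queue, and each finished item is acknowledged through a completion queue that the coordinator waits on before summing per-instance flush counts. A minimal stand-alone sketch of that shape, using plain pthreads rather than the InnoDB primitives (every name in it is invented for illustration):

	#include <pthread.h>
	#include <stdio.h>

	#define N_ITEMS   4	/* stand-in for srv_buf_pool_instances */
	#define N_WORKERS 2	/* stand-in for srv_mtflush_threads */

	typedef struct {
		int instance;	/* which "buffer pool" to flush */
		int n_flushed;	/* filled in by the worker */
	} work_item_t;

	static work_item_t items[N_ITEMS];
	static int next_item = 0;	/* work queue cursor */
	static int n_done = 0;		/* completion count */
	static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
	static pthread_cond_t done_cv = PTHREAD_COND_INITIALIZER;

	static void* worker(void* arg)
	{
		(void)arg;
		for (;;) {
			pthread_mutex_lock(&mtx);
			if (next_item >= N_ITEMS) {	/* queue drained: exit */
				pthread_mutex_unlock(&mtx);
				return NULL;
			}
			work_item_t* wi = &items[next_item++];
			pthread_mutex_unlock(&mtx);

			wi->n_flushed = 10 * (wi->instance + 1); /* pretend flush batch */

			pthread_mutex_lock(&mtx);	/* "completion queue" */
			n_done++;
			pthread_cond_signal(&done_cv);
			pthread_mutex_unlock(&mtx);
		}
	}

	int main(void)
	{
		pthread_t tids[N_WORKERS];
		int i, total = 0;

		for (i = 0; i < N_ITEMS; i++) items[i].instance = i;
		for (i = 0; i < N_WORKERS; i++) pthread_create(&tids[i], NULL, worker, NULL);

		pthread_mutex_lock(&mtx);	/* wait for all completions */
		while (n_done < N_ITEMS) pthread_cond_wait(&done_cv, &mtx);
		pthread_mutex_unlock(&mtx);

		for (i = 0; i < N_WORKERS; i++) pthread_join(tids[i], NULL);
		for (i = 0; i < N_ITEMS; i++) total += items[i].n_flushed;
		printf("flushed %d pages across %d instances\n", total, N_ITEMS);
		return 0;
	}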
@@ -116,18 +116,13 @@ typedef struct wrk_itm
 /* Thread syncronization data */
 typedef struct thread_sync
 {
+	ulint		n_threads;	/*!< Number of threads */
 	os_thread_id_t	wthread_id;	/*!< Identifier */
 	os_thread_t	wthread;	/*!< Thread id */
 	ib_wqueue_t	*wq;		/*!< Work Queue */
 	ib_wqueue_t	*wr_cq;		/*!< Write Completion Queue */
 	ib_wqueue_t	*rd_cq;		/*!< Read Completion Queue */
 	wthr_status_t	wt_status;	/*!< Worker thread status */
-	ulint		stat_universal_num_processed;
-				/*!< Total number of pages
-				processed by this thread */
-	ulint		stat_cycle_num_processed;
-				/*!< Number of pages processed
-				on this cycle */
 	mem_heap_t*	wheap;		/*!< Work heap where memory
 					is allocated */
 	wrk_t*		work_item;	/*!< Work items to be processed */
@@ -231,6 +226,7 @@ buf_mtflu_flush_pool_instance(
 			work_item->wr.min,
 			work_item->wr.lsn_limit);
 	buf_flush_end(work_item->wr.buf_pool, work_item->wr.flush_type);
 	buf_flush_common(work_item->wr.flush_type, work_item->n_flushed);
...@@ -239,28 +235,29 @@ buf_mtflu_flush_pool_instance( ...@@ -239,28 +235,29 @@ buf_mtflu_flush_pool_instance(
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
/******************************************************************//** /******************************************************************//**
Output work item list status, Print flush statistics of work items.
*/ */
static static
void void
mtflu_print_work_list( mtflu_print_thread_stat(
/*==================*/ /*====================*/
wrk_t* wi_list) /*!< in: Work item list */ wrk_t* work_item) /*!< in: Work items */
{ {
wrk_t* wi = wi_list; ulint stat_tot=0;
ulint i=0; ulint i=0;
if(!wi_list) { for(i=0; i< MTFLUSH_MAX_WORKER; i++) {
fprintf(stderr, "list NULL\n"); stat_tot+=work_item[i].n_flushed;
}
while(wi) { fprintf(stderr, "MTFLUSH: Thread[%lu] stat [%lu]\n",
fprintf(stderr, "-\t[%p]\t[%s]\t[%lu] > %p\n", work_item[i].id_usr,
wi, (wi->id_usr == -1)?"free":"Busy", wi->n_flushed, wi->next); work_item[i].n_flushed);
wi = wi->next;
i++; if (work_item[i].next == NULL) {
break; /* No more filled work items */
}
} }
fprintf(stderr, "list len: %d\n", i); fprintf(stderr, "MTFLUSH: Stat-Total:%lu\n", stat_tot);
} }
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
@@ -282,10 +279,6 @@ mtflush_service_io(
 	mtflush_io->wt_status = WTHR_SIG_WAITING;
 	work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wq, max_wait_usecs);
-#ifdef UNIV_DEBUG
-	mtflu_print_work_list(mtflush_io->work_item);
-#endif
 	if (work_item) {
 		mtflush_io->wt_status = WTHR_RUNNING;
 	} else {
@@ -345,10 +338,28 @@ DECLARE_THREAD(mtflush_io_thread)(
 	void * arg)
 {
 	thread_sync_t *mtflush_io = ((thread_sync_t *)arg);
+#ifdef UNIV_DEBUG
+	ib_uint64_t	stat_universal_num_processed = 0;
+	ib_uint64_t	stat_cycle_num_processed = 0;
+	wrk_t*		work_item = mtflush_io[0].work_item;
+	ulint		i;
+#endif
 	while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) {
 		mtflush_service_io(mtflush_io);
-		mtflush_io->stat_cycle_num_processed = 0;
+#ifdef UNIV_DEBUG
+		for(i=0; i < MTFLUSH_MAX_WORKER; i++) {
+			stat_cycle_num_processed+= work_item[i].n_flushed;
+		}
+		stat_universal_num_processed+=stat_cycle_num_processed;
+		stat_cycle_num_processed = 0;
+		fprintf(stderr, "MTFLUSH_IO_THREAD: total %lu cycle %lu\n",
+			stat_universal_num_processed,
+			stat_cycle_num_processed);
+		mtflu_print_thread_stat(work_item);
+#endif
 	}
 	/* This should make sure that all current work items are
@@ -458,13 +469,16 @@ buf_mtflu_handler_init(
 	work_items = (wrk_t*)mem_heap_alloc(mtflush_heap,
 		MTFLUSH_MAX_WORKER * sizeof(wrk_t));
 	ut_a(work_items != NULL);
+	memset(work_items, 0, sizeof(wrk_t) * MTFLUSH_MAX_WORKER);
+	memset(mtflush_ctx, 0, sizeof(thread_sync_t) * MTFLUSH_MAX_WORKER);
 	/* Initialize work items */
-	mtflu_setup_work_items(work_items, MTFLUSH_MAX_WORKER);
+	mtflu_setup_work_items(work_items, n_threads);
 	/* Create threads for page-compression-flush */
 	for(i=0; i < n_threads; i++) {
 		os_thread_id_t new_thread_id;
+		mtflush_ctx[i].n_threads = n_threads;
 		mtflush_ctx[i].wq = mtflush_work_queue;
 		mtflush_ctx[i].wr_cq = mtflush_write_comp_queue;
 		mtflush_ctx[i].rd_cq = mtflush_read_comp_queue;
@@ -531,19 +545,16 @@ buf_mtflu_flush_work_items(
 				per_pool_pages_flushed[i] = done_wi->n_flushed;
 			}
-			if(done_wi->id_usr == -1 &&
+			if((int)done_wi->id_usr == -1 &&
 			   done_wi->wi_status == WRK_ITEM_SET ) {
 				fprintf(stderr,
-					"**Set/Unused work_item[%d] flush_type=%lu\n",
+					"**Set/Unused work_item[%lu] flush_type=%lu\n",
 					i,
 					done_wi->wr.flush_type);
 				ut_a(0);
 			}
 			n_flushed+= done_wi->n_flushed;
-			/* Reset for next round*/
-			mtflush_ctx->work_item[i].id_usr = -1;
 			i++;
 		}
 	}
@@ -551,47 +562,6 @@ buf_mtflu_flush_work_items(
 	return(n_flushed);
 }
/*******************************************************************//**
Flushes dirty blocks from the end of the LRU list and also
puts replaceable clean pages from the end of the LRU list to the free
list.
NOTE: The calling thread is not allowed to own any latches on pages!
@return true if a batch was queued successfully. false if another batch
of same type was already running. */
bool
buf_mtflu_flush_LRU(
/*================*/
buf_pool_t* buf_pool, /*!< in/out: buffer pool instance */
ulint min_n, /*!< in: wished minimum mumber of blocks
flushed (it is not guaranteed that the
actual number is that big, though) */
ulint* n_processed) /*!< out: the number of pages
which were processed is passed
back to caller. Ignored if NULL */
{
ulint page_count;
if (n_processed) {
*n_processed = 0;
}
if (!buf_flush_start(buf_pool, BUF_FLUSH_LRU)) {
return(false);
}
page_count = buf_flush_batch(buf_pool, BUF_FLUSH_LRU, min_n, 0);
buf_flush_end(buf_pool, BUF_FLUSH_LRU);
buf_flush_common(BUF_FLUSH_LRU, page_count);
if (n_processed) {
*n_processed = page_count;
}
return(true);
}
 /*******************************************************************//**
 Multi-threaded version of buf_flush_list
 */
......
@@ -284,8 +284,7 @@ SET(INNOBASE_SOURCES
	buf/buf0flu.cc
	buf/buf0lru.cc
	buf/buf0rea.cc
-# TODO: JAN uncomment
-#	buf/buf0mtflu.cc
+	buf/buf0mtflu.cc
	data/data0data.cc
	data/data0type.cc
	dict/dict0boot.cc
......
@@ -32,6 +32,7 @@ Created 11/11/1995 Heikki Tuuri
 #endif
 #include "buf0buf.h"
+#include "buf0mtflu.h"
 #include "buf0checksum.h"
 #include "srv0start.h"
 #include "srv0srv.h"
@@ -1949,47 +1950,6 @@ void buf_pool_exit_LRU_mutex(
 	mutex_exit(&buf_pool->LRU_list_mutex);
 }
/*******************************************************************//**
This utility flushes dirty blocks from the end of the LRU list and also
puts replaceable clean pages from the end of the LRU list to the free
list.
NOTE: The calling thread is not allowed to own any latches on pages!
@return true if a batch was queued successfully. false if another batch
of same type was already running. */
static
bool
pgcomp_buf_flush_LRU(
/*==========*/
buf_pool_t* buf_pool, /*!< in/out: buffer pool instance */
ulint min_n, /*!< in: wished minimum mumber of blocks
flushed (it is not guaranteed that the
actual number is that big, though) */
ulint* n_processed) /*!< out: the number of pages
which were processed is passed
back to caller. Ignored if NULL */
{
flush_counters_t n;
if (n_processed) {
*n_processed = 0;
}
if (!buf_flush_start(buf_pool, BUF_FLUSH_LRU)) {
return(false);
}
buf_flush_batch(buf_pool, BUF_FLUSH_LRU, min_n, 0, false, &n);
buf_flush_end(buf_pool, BUF_FLUSH_LRU);
buf_flush_common(BUF_FLUSH_LRU, n.flushed);
if (n_processed) {
*n_processed = n.flushed;
}
return(true);
}
 /* JAN: TODO: END: */
 /*******************************************************************//**
@@ -2029,126 +1989,6 @@ buf_flush_LRU(
 	return(true);
 }
/* JAN: TODO: */
/*******************************************************************//**/
extern int is_pgcomp_wrk_init_done(void);
extern int pgcomp_flush_work_items(
int buf_pool_inst,
int *pages_flushed,
buf_flush_t flush_type,
int min_n,
lsn_t lsn_limit);
#define MT_COMP_WATER_MARK 50
#ifdef UNIV_DEBUG
#include <time.h>
int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_time)
{
if (g_time->tv_usec < s_time->tv_usec)
{
int nsec = (s_time->tv_usec - g_time->tv_usec) / 1000000 + 1;
s_time->tv_usec -= 1000000 * nsec;
s_time->tv_sec += nsec;
}
if (g_time->tv_usec - s_time->tv_usec > 1000000)
{
int nsec = (s_time->tv_usec - g_time->tv_usec) / 1000000;
s_time->tv_usec += 1000000 * nsec;
s_time->tv_sec -= nsec;
}
d_time->tv_sec = g_time->tv_sec - s_time->tv_sec;
d_time->tv_usec = g_time->tv_usec - s_time->tv_usec;
return 0;
}
#endif
static os_fast_mutex_t pgcomp_mtx;
void pgcomp_init(void)
{
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &pgcomp_mtx);
}
void pgcomp_deinit(void)
{
os_fast_mutex_free(&pgcomp_mtx);
}
/*******************************************************************//**
Multi-threaded version of buf_flush_list
*/
UNIV_INTERN
bool
pgcomp_buf_flush_list(
/*==================*/
ulint min_n, /*!< in: wished minimum mumber of blocks
flushed (it is not guaranteed that the
actual number is that big, though) */
lsn_t lsn_limit, /*!< in the case BUF_FLUSH_LIST all
blocks whose oldest_modification is
smaller than this should be flushed
(if their number does not exceed
min_n), otherwise ignored */
ulint* n_processed) /*!< out: the number of pages
which were processed is passed
back to caller. Ignored if NULL */
{
ulint i;
bool success = true;
#ifdef UNIV_DEBUG
struct timeval p_start_time, p_end_time, d_time;
#endif
int cnt_flush[MTFLUSH_MAX_WORKER];
if (n_processed) {
*n_processed = 0;
}
if (min_n != ULINT_MAX) {
/* Ensure that flushing is spread evenly amongst the
buffer pool instances. When min_n is ULINT_MAX
we need to flush everything up to the lsn limit
so no limit here. */
min_n = (min_n + srv_buf_pool_instances - 1)
/ srv_buf_pool_instances;
}
#ifdef UNIV_DEBUG
gettimeofday(&p_start_time, 0x0);
#endif
// os_fast_mutex_lock(&pgcomp_mtx);
pgcomp_flush_work_items(srv_buf_pool_instances,
cnt_flush, BUF_FLUSH_LIST,
min_n, lsn_limit);
// os_fast_mutex_unlock(&pgcomp_mtx);
for (i = 0; i < srv_buf_pool_instances; i++) {
if (n_processed) {
*n_processed += cnt_flush[i];
}
if (cnt_flush[i]) {
MONITOR_INC_VALUE_CUMULATIVE(
MONITOR_FLUSH_BATCH_TOTAL_PAGE,
MONITOR_FLUSH_BATCH_COUNT,
MONITOR_FLUSH_BATCH_PAGES,
cnt_flush[i]);
}
}
#ifdef UNIV_DEBUG
gettimeofday(&p_end_time, 0x0);
timediff(&p_end_time, &p_start_time, &d_time);
fprintf(stderr, "%s: [1] [*n_processed: (min:%lu)%lu %llu usec]\n",
__FUNCTION__, (min_n * srv_buf_pool_instances), *n_processed,
(unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000)));
#endif
return(success);
}
/* JAN: TODO: END: */
 /*******************************************************************//**
 This utility flushes dirty blocks from the end of the flush list of
 all buffer pool instances.
@@ -2181,11 +2021,9 @@ buf_flush_list(
 	bool timeout = false;
 	ulint flush_start_time = 0;
-	/* JAN: TODO: */
-	if (is_pgcomp_wrk_init_done()) {
-		return(pgcomp_buf_flush_list(min_n, lsn_limit, n_processed));
+	if (buf_mtflu_init_done()) {
+		return(buf_mtflu_flush_list(min_n, lsn_limit, n_processed));
 	}
-	/* JAN: TODO: END: */
 	for (i = 0; i < srv_buf_pool_instances; i++) {
 		requested_pages[i] = 0;
@@ -2380,60 +2218,6 @@ buf_flush_single_page_from_LRU(
 	return(freed);
 }
/* JAN: TODO: */
/*********************************************************************//**
pgcomp_Clears up tail of the LRU lists:
* Put replaceable pages at the tail of LRU to the free list
* Flush dirty pages at the tail of LRU to the disk
The depth to which we scan each buffer pool is controlled by dynamic
config parameter innodb_LRU_scan_depth.
@return total pages flushed */
UNIV_INTERN
ulint
pgcomp_buf_flush_LRU_tail(void)
/*====================*/
{
#ifdef UNIV_DEBUG
struct timeval p_start_time, p_end_time, d_time;
#endif
ulint total_flushed=0, i=0;
int cnt_flush[32];
#ifdef UNIV_DEBUG
gettimeofday(&p_start_time, 0x0);
#endif
ut_ad(is_pgcomp_wrk_init_done());
os_fast_mutex_lock(&pgcomp_mtx);
pgcomp_flush_work_items(srv_buf_pool_instances,
cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0);
os_fast_mutex_unlock(&pgcomp_mtx);
for (i = 0; i < srv_buf_pool_instances; i++) {
if (cnt_flush[i]) {
total_flushed += cnt_flush[i];
MONITOR_INC_VALUE_CUMULATIVE(
MONITOR_LRU_BATCH_TOTAL_PAGE,
MONITOR_LRU_BATCH_COUNT,
MONITOR_LRU_BATCH_PAGES,
cnt_flush[i]);
}
}
#if UNIV_DEBUG
gettimeofday(&p_end_time, 0x0);
timediff(&p_end_time, &p_start_time, &d_time);
fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu %llu usec]\n", (
srv_LRU_scan_depth * srv_buf_pool_instances), total_flushed,
(unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000)));
#endif
return(total_flushed);
}
/* JAN: TODO: END: */
 /*********************************************************************//**
 Clears up tail of the LRU lists:
 * Put replaceable pages at the tail of LRU to the free list
@@ -2458,12 +2242,10 @@ buf_flush_LRU_tail(void)
 	ulint free_list_lwm = srv_LRU_scan_depth / 100
 		* srv_cleaner_free_list_lwm;
-	/* JAN: TODO: */
-	if(is_pgcomp_wrk_init_done())
+	if(buf_mtflu_init_done())
 	{
-		return(pgcomp_buf_flush_LRU_tail());
+		return(buf_mtflu_flush_LRU_tail());
 	}
-	/* JAN: TODO: END */
 	for (ulint i = 0; i < srv_buf_pool_instances; i++) {
......
/*****************************************************************************
Copyright (C) 2013, 2014, Fusion-io. All Rights Reserved.
Copyright (C) 2013, 2014, SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*****************************************************************************/
/******************************************************************//**
@file buf/buf0mtflu.cc
Multi-threaded flush method implementation
Created 06/11/2013 Dhananjoy Das DDas@fusionio.com
Modified 12/12/2013 Jan Lindström jan.lindstrom@skysql.com
Modified 03/02/2014 Dhananjoy Das DDas@fusionio.com
Modified 06/02/2014 Jan Lindström jan.lindstrom@skysql.com
***********************************************************************/
#include "buf0buf.h"
#include "buf0flu.h"
#include "buf0mtflu.h"
#include "buf0checksum.h"
#include "srv0start.h"
#include "srv0srv.h"
#include "page0zip.h"
#include "ut0byte.h"
#include "ut0lst.h"
#include "page0page.h"
#include "fil0fil.h"
#include "buf0lru.h"
#include "buf0rea.h"
#include "ibuf0ibuf.h"
#include "log0log.h"
#include "os0file.h"
#include "os0sync.h"
#include "trx0sys.h"
#include "srv0mon.h"
#include "mysql/plugin.h"
#include "mysql/service_thd_wait.h"
#include "fil0pagecompress.h"
#define MT_COMP_WATER_MARK 50
/* Work item status */
typedef enum wrk_status {
WRK_ITEM_SET=0, /*!< Work item is set */
WRK_ITEM_START=1, /*!< Processing of work item has started */
WRK_ITEM_DONE=2, /*!< Processing is done usually set to
SUCCESS/FAILED */
WRK_ITEM_SUCCESS=2, /*!< Work item successfully processed */
WRK_ITEM_FAILED=3, /*!< Work item process failed */
WRK_ITEM_EXIT=4, /*!< Exiting */
WRK_ITEM_STATUS_UNDEFINED
} wrk_status_t;
/* Work item task type */
typedef enum mt_wrk_tsk {
MT_WRK_NONE=0, /*!< Exit queue-wait */
MT_WRK_WRITE=1, /*!< Flush operation */
MT_WRK_READ=2, /*!< Read operation */
MT_WRK_UNDEFINED
} mt_wrk_tsk_t;
/* Work thread status */
typedef enum wthr_status {
WTHR_NOT_INIT=0, /*!< Work thread not initialized */
WTHR_INITIALIZED=1, /*!< Work thread initialized */
WTHR_SIG_WAITING=2, /*!< Work thread wating signal */
WTHR_RUNNING=3, /*!< Work thread running */
WTHR_NO_WORK=4, /*!< Work thread has no work */
WTHR_KILL_IT=5, /*!< Work thread should exit */
WTHR_STATUS_UNDEFINED
} wthr_status_t;
/* Write work task */
typedef struct wr_tsk {
buf_pool_t *buf_pool; /*!< buffer-pool instance */
buf_flush_t flush_type; /*!< flush-type for buffer-pool
flush operation */
ulint min; /*!< minimum number of pages
requested to be flushed */
lsn_t lsn_limit; /*!< lsn limit for the buffer-pool
flush operation */
} wr_tsk_t;
/* Read work task */
typedef struct rd_tsk {
buf_pool_t *page_pool; /*!< list of pages to decompress; */
} rd_tsk_t;
/* Work item */
typedef struct wrk_itm
{
mt_wrk_tsk_t tsk; /*!< Task type. Based on task-type
one of the entries wr_tsk/rd_tsk
will be used */
wr_tsk_t wr; /*!< Flush page list */
rd_tsk_t rd; /*!< Decompress page list */
ulint n_flushed; /*!< Flushed pages count */
os_thread_t id_usr; /*!< Thread-id currently working */
wrk_status_t wi_status; /*!< Work item status */
struct wrk_itm *next; /*!< Next work item */
} wrk_t;
/* Thread syncronization data */
typedef struct thread_sync
{
ulint n_threads; /*!< Number of threads */
os_thread_id_t wthread_id; /*!< Identifier */
os_thread_t wthread; /*!< Thread id */
ib_wqueue_t *wq; /*!< Work Queue */
ib_wqueue_t *wr_cq; /*!< Write Completion Queue */
ib_wqueue_t *rd_cq; /*!< Read Completion Queue */
wthr_status_t wt_status; /*!< Worker thread status */
mem_heap_t* wheap; /*!< Work heap where memory
is allocated */
wrk_t* work_item; /*!< Work items to be processed */
} thread_sync_t;
/* QUESTION: Is this array used from several threads concurrently ? */
// static wrk_t work_items[MTFLUSH_MAX_WORKER];
/* TODO: REALLY NEEDED ? */
static int mtflush_work_initialized = -1;
static os_fast_mutex_t mtflush_mtx;
static thread_sync_t* mtflush_ctx=NULL;
/******************************************************************//**
Initialize work items. */
static
void
mtflu_setup_work_items(
/*===================*/
wrk_t* work_items, /*!< inout: Work items */
ulint n_items) /*!< in: Number of work items */
{
ulint i;
for(i=0; i<n_items; i++) {
work_items[i].rd.page_pool = NULL;
work_items[i].wr.buf_pool = NULL;
work_items[i].n_flushed = 0;
work_items[i].id_usr = -1;
work_items[i].wi_status = WRK_ITEM_STATUS_UNDEFINED;
work_items[i].next = &work_items[(i+1)%n_items];
}
/* last node should be the tail */
work_items[n_items-1].next = NULL;
}
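One detail worth noting in the setup above: the loop links item i to &work_items[(i+1)%n_items], which momentarily makes the chain circular (the last entry points back to the first); the final assignment then cuts the ring so that next == NULL marks the tail. Consumers such as mtflu_print_thread_stat rely on that NULL sentinel to stop early. A hypothetical traversal over an initialized array items[] (names invented for illustration):

	ulint	count = 0;
	wrk_t*	wi;
	for (wi = &items[0]; wi != NULL; wi = wi->next) {
		count++;	/* visits each of the n_items entries exactly once */
	}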
/******************************************************************//**
Set multi-threaded flush work initialized. */
static inline
void
buf_mtflu_work_init(void)
/*=====================*/
{
mtflush_work_initialized = 1;
}
/******************************************************************//**
Return true if multi-threaded flush is initialized
@return true if initialized */
bool
buf_mtflu_init_done(void)
/*=====================*/
{
return(mtflush_work_initialized == 1);
}
/******************************************************************//**
Fush buffer pool instance.
@return number of flushed pages, or 0 if error happened
*/
static
ulint
buf_mtflu_flush_pool_instance(
/*==========================*/
wrk_t *work_item) /*!< inout: work item to be flushed */
{
flush_counters_t n;
ut_a(work_item != NULL);
ut_a(work_item->wr.buf_pool != NULL);
if (!buf_flush_start(work_item->wr.buf_pool, work_item->wr.flush_type)) {
/* We have two choices here. If lsn_limit was
specified then skipping an instance of buffer
pool means we cannot guarantee that all pages
up to lsn_limit has been flushed. We can
return right now with failure or we can try
to flush remaining buffer pools up to the
lsn_limit. We attempt to flush other buffer
pools based on the assumption that it will
help in the retry which will follow the
failure. */
#ifdef UNIV_DEBUG
/* QUESTION: is this a really failure ? */
fprintf(stderr, "flush_start Failed, flush_type:%d\n",
work_item->wr.flush_type);
#endif
return 0;
}
if (work_item->wr.flush_type == BUF_FLUSH_LRU) {
/* srv_LRU_scan_depth can be arbitrarily large value.
* We cap it with current LRU size.
*/
buf_pool_mutex_enter(work_item->wr.buf_pool);
work_item->wr.min = UT_LIST_GET_LEN(work_item->wr.buf_pool->LRU);
buf_pool_mutex_exit(work_item->wr.buf_pool);
work_item->wr.min = ut_min(srv_LRU_scan_depth,work_item->wr.min);
}
buf_flush_batch(work_item->wr.buf_pool,
work_item->wr.flush_type,
work_item->wr.min,
work_item->wr.lsn_limit,
false,
&n);
work_item->n_flushed = n.flushed;
buf_flush_end(work_item->wr.buf_pool, work_item->wr.flush_type);
buf_flush_common(work_item->wr.flush_type, work_item->n_flushed);
return 0;
}
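In the LRU branch above the requested batch size is clamped to what the instance actually holds: with srv_LRU_scan_depth = 1024 but only, say, 300 pages on the instance's LRU list, wr.min becomes ut_min(1024, 300) = 300, so the flush batch never asks for more pages than the list contains.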
#ifdef UNIV_DEBUG
/******************************************************************//**
Print flush statistics of work items
*/
static
void
mtflu_print_thread_stat(
/*====================*/
wrk_t* work_item) /*!< in: Work items */
{
ulint stat_tot=0;
ulint i=0;
for(i=0; i< MTFLUSH_MAX_WORKER; i++) {
stat_tot+=work_item[i].n_flushed;
fprintf(stderr, "MTFLUSH: Thread[%lu] stat [%lu]\n",
work_item[i].id_usr,
work_item[i].n_flushed);
if (work_item[i].next == NULL) {
break; /* No more filled work items */
}
}
fprintf(stderr, "MTFLUSH: Stat-Total:%lu\n", stat_tot);
}
#endif /* UNIV_DEBUG */
/******************************************************************//**
Worker function to wait for work items and processing them and
sending reply back.
*/
static
void
mtflush_service_io(
/*===============*/
thread_sync_t* mtflush_io) /*!< inout: multi-threaded flush
syncronization data */
{
wrk_t *work_item = NULL;
ulint n_flushed=0;
ib_time_t max_wait_usecs = 5000000;
mtflush_io->wt_status = WTHR_SIG_WAITING;
work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wq, max_wait_usecs);
if (work_item) {
mtflush_io->wt_status = WTHR_RUNNING;
} else {
/* Because of timeout this thread did not get any work */
mtflush_io->wt_status = WTHR_NO_WORK;
return;
}
work_item->id_usr = mtflush_io->wthread;
switch(work_item->tsk) {
case MT_WRK_NONE:
ut_a(work_item->wi_status == WRK_ITEM_EXIT);
work_item->wi_status = WRK_ITEM_SUCCESS;
/* QUESTION: Why completed work items are inserted to
completion queue ? */
ib_wqueue_add(mtflush_io->wr_cq, work_item, mtflush_io->wheap);
break;
case MT_WRK_WRITE:
work_item->wi_status = WRK_ITEM_START;
/* Process work item */
/* QUESTION: Is this a really a error ? */
if (0 != (n_flushed = buf_mtflu_flush_pool_instance(work_item))) {
fprintf(stderr, "FLUSH op failed ret:%lu\n", n_flushed);
work_item->wi_status = WRK_ITEM_FAILED;
}
work_item->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(mtflush_io->wr_cq, work_item, mtflush_io->wheap);
break;
case MT_WRK_READ:
/* Need to also handle the read case */
/* TODO: ? */
ut_a(0);
/* completed task get added to rd_cq */
/* work_item->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(mtflush_io->rd_cq, work_item, mtflush_io->wheap);*/
break;
default:
/* None other than Write/Read handling planned */
ut_a(0);
}
mtflush_io->wt_status = WTHR_NO_WORK;
}
/******************************************************************//**
Thead used to flush dirty pages when multi-threaded flush is
used.
@return a dummy parameter*/
extern "C" UNIV_INTERN
os_thread_ret_t
DECLARE_THREAD(mtflush_io_thread)(
/*==============================*/
void * arg)
{
thread_sync_t *mtflush_io = ((thread_sync_t *)arg);
#ifdef UNIV_DEBUG
ib_uint64_t stat_universal_num_processed = 0;
ib_uint64_t stat_cycle_num_processed = 0;
wrk_t* work_item = mtflush_io[0].work_item;
ulint i;
#endif
while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) {
mtflush_service_io(mtflush_io);
#ifdef UNIV_DEBUG
for(i=0; i < MTFLUSH_MAX_WORKER; i++) {
stat_cycle_num_processed+= work_item[i].n_flushed;
}
stat_universal_num_processed+=stat_cycle_num_processed;
stat_cycle_num_processed = 0;
fprintf(stderr, "MTFLUSH_IO_THREAD: total %lu cycle %lu\n",
stat_universal_num_processed,
stat_cycle_num_processed);
mtflu_print_thread_stat(work_item);
#endif
}
/* This should make sure that all current work items are
processed before threads exit. */
while (!ib_wqueue_is_empty(mtflush_io->wq)) {
mtflush_service_io(mtflush_io);
}
os_thread_exit(NULL);
OS_THREAD_DUMMY_RETURN;
}
/******************************************************************//**
Add exit work item to work queue to signal multi-threded flush
threads that they should exit.
*/
void
buf_mtflu_io_thread_exit(void)
/*==========================*/
{
ulint i;
thread_sync_t* mtflush_io = mtflush_ctx;
ut_a(mtflush_io != NULL);
fprintf(stderr, "signal page_comp_io_threads to exit [%lu]\n",
srv_buf_pool_instances);
/* Send one exit work item/thread */
for (i=0; i < srv_buf_pool_instances; i++) {
mtflush_io->work_item[i].wr.buf_pool = NULL;
mtflush_io->work_item[i].rd.page_pool = NULL;
mtflush_io->work_item[i].tsk = MT_WRK_NONE;
mtflush_io->work_item[i].wi_status = WRK_ITEM_EXIT;
ib_wqueue_add(mtflush_io->wq,
(void *)&(mtflush_io->work_item[i]),
mtflush_io->wheap);
}
/* Wait until all work items on a work queue are processed */
while(!ib_wqueue_is_empty(mtflush_io->wq)) {
/* Wait about 1/2 sec */
os_thread_sleep(50000);
}
ut_a(ib_wqueue_is_empty(mtflush_io->wq));
/* Collect all work done items */
for (i=0; i < srv_buf_pool_instances;) {
wrk_t* work_item;
work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wr_cq, 50000);
if (work_item) {
i++;
}
}
ut_a(ib_wqueue_is_empty(mtflush_io->wr_cq));
ut_a(ib_wqueue_is_empty(mtflush_io->rd_cq));
/* Free all queues */
ib_wqueue_free(mtflush_io->wq);
ib_wqueue_free(mtflush_io->wr_cq);
ib_wqueue_free(mtflush_io->rd_cq);
/* Free heap */
mem_heap_free(mtflush_io->wheap);
os_fast_mutex_free(&mtflush_mtx);
}
/******************************************************************//**
Initialize multi-threaded flush thread syncronization data.
@return Initialized multi-threaded flush thread syncroniztion data. */
void*
buf_mtflu_handler_init(
/*===================*/
ulint n_threads, /*!< in: Number of threads to create */
ulint wrk_cnt) /*!< in: Number of work items */
{
ulint i;
mem_heap_t* mtflush_heap;
ib_wqueue_t* mtflush_work_queue;
ib_wqueue_t* mtflush_write_comp_queue;
ib_wqueue_t* mtflush_read_comp_queue;
wrk_t* work_items;
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx);
/* Create heap, work queue, write completion queue, read
completion queue for multi-threaded flush, and init
handler. */
mtflush_heap = mem_heap_create(0);
ut_a(mtflush_heap != NULL);
mtflush_work_queue = ib_wqueue_create();
ut_a(mtflush_work_queue != NULL);
mtflush_write_comp_queue = ib_wqueue_create();
ut_a(mtflush_write_comp_queue != NULL);
mtflush_read_comp_queue = ib_wqueue_create();
ut_a(mtflush_read_comp_queue != NULL);
mtflush_ctx = (thread_sync_t *)mem_heap_alloc(mtflush_heap,
MTFLUSH_MAX_WORKER * sizeof(thread_sync_t));
ut_a(mtflush_ctx != NULL);
work_items = (wrk_t*)mem_heap_alloc(mtflush_heap,
MTFLUSH_MAX_WORKER * sizeof(wrk_t));
ut_a(work_items != NULL);
memset(work_items, 0, sizeof(wrk_t) * MTFLUSH_MAX_WORKER);
memset(mtflush_ctx, 0, sizeof(thread_sync_t) * MTFLUSH_MAX_WORKER);
/* Initialize work items */
mtflu_setup_work_items(work_items, n_threads);
/* Create threads for page-compression-flush */
for(i=0; i < n_threads; i++) {
os_thread_id_t new_thread_id;
mtflush_ctx[i].n_threads = n_threads;
mtflush_ctx[i].wq = mtflush_work_queue;
mtflush_ctx[i].wr_cq = mtflush_write_comp_queue;
mtflush_ctx[i].rd_cq = mtflush_read_comp_queue;
mtflush_ctx[i].wheap = mtflush_heap;
mtflush_ctx[i].wt_status = WTHR_INITIALIZED;
mtflush_ctx[i].work_item = work_items;
mtflush_ctx[i].wthread = os_thread_create(
mtflush_io_thread,
((void *)(mtflush_ctx + i)),
&new_thread_id);
mtflush_ctx[i].wthread_id = new_thread_id;
}
buf_mtflu_work_init();
return((void *)mtflush_ctx);
}
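Read together with the srv0start.cc hunk near the end of this commit, the intended startup wiring is roughly the following sketch (error handling omitted; the ids array is invented for illustration):

	os_thread_id_t	ids[MTFLUSH_MAX_WORKER];
	void*		ctx;

	/* Spawn the flush workers and record their thread ids. */
	ctx = buf_mtflu_handler_init(srv_mtflush_threads, srv_buf_pool_instances);
	buf_mtflu_set_thread_ids(srv_mtflush_threads, ctx, ids);

	/* ... normal operation; at shutdown the workers are told to exit: */
	buf_mtflu_io_thread_exit();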
/******************************************************************//**
Flush buffer pool instances.
@return number of pages flushed. */
ulint
buf_mtflu_flush_work_items(
/*=======================*/
ulint buf_pool_inst, /*!< in: Number of buffer pool instances */
ulint *per_pool_pages_flushed, /*!< out: Number of pages
flushed/instance */
buf_flush_t flush_type, /*!< in: Type of flush */
ulint min_n, /*!< in: Wished minimum number of
blocks to be flushed */
lsn_t lsn_limit) /*!< in: All blocks whose
oldest_modification is smaller than
this should be flushed (if their
number does not exceed min_n) */
{
ulint n_flushed=0, i;
wrk_t *done_wi;
for(i=0;i<buf_pool_inst; i++) {
mtflush_ctx->work_item[i].tsk = MT_WRK_WRITE;
mtflush_ctx->work_item[i].rd.page_pool = NULL;
mtflush_ctx->work_item[i].wr.buf_pool = buf_pool_from_array(i);
mtflush_ctx->work_item[i].wr.flush_type = flush_type;
mtflush_ctx->work_item[i].wr.min = min_n;
mtflush_ctx->work_item[i].wr.lsn_limit = lsn_limit;
mtflush_ctx->work_item[i].id_usr = -1;
mtflush_ctx->work_item[i].wi_status = WRK_ITEM_SET;
ib_wqueue_add(mtflush_ctx->wq,
(void *)(&(mtflush_ctx->work_item[i])),
mtflush_ctx->wheap);
}
/* wait on the completion to arrive */
for(i=0; i< buf_pool_inst;) {
done_wi = (wrk_t *)ib_wqueue_timedwait(mtflush_ctx->wr_cq, 50000);
if (done_wi != NULL) {
if(done_wi->n_flushed == 0) {
per_pool_pages_flushed[i] = 0;
} else {
per_pool_pages_flushed[i] = done_wi->n_flushed;
}
if((int)done_wi->id_usr == -1 &&
done_wi->wi_status == WRK_ITEM_SET ) {
fprintf(stderr,
"**Set/Unused work_item[%lu] flush_type=%lu\n",
i,
done_wi->wr.flush_type);
ut_a(0);
}
n_flushed+= done_wi->n_flushed;
i++;
}
}
return(n_flushed);
}
/*******************************************************************//**
Multi-threaded version of buf_flush_list
*/
bool
buf_mtflu_flush_list(
/*=================*/
ulint min_n, /*!< in: wished minimum mumber of blocks
flushed (it is not guaranteed that the
actual number is that big, though) */
lsn_t lsn_limit, /*!< in the case BUF_FLUSH_LIST all
blocks whose oldest_modification is
smaller than this should be flushed
(if their number does not exceed
min_n), otherwise ignored */
ulint* n_processed) /*!< out: the number of pages
which were processed is passed
back to caller. Ignored if NULL */
{
ulint i;
bool success = true;
ulint cnt_flush[MTFLUSH_MAX_WORKER];
if (n_processed) {
*n_processed = 0;
}
if (min_n != ULINT_MAX) {
/* Ensure that flushing is spread evenly amongst the
buffer pool instances. When min_n is ULINT_MAX
we need to flush everything up to the lsn limit
so no limit here. */
min_n = (min_n + srv_buf_pool_instances - 1)
/ srv_buf_pool_instances;
}
/* QUESTION: What is procted by below mutex ? */
os_fast_mutex_lock(&mtflush_mtx);
buf_mtflu_flush_work_items(srv_buf_pool_instances,
cnt_flush, BUF_FLUSH_LIST,
min_n, lsn_limit);
os_fast_mutex_unlock(&mtflush_mtx);
for (i = 0; i < srv_buf_pool_instances; i++) {
if (n_processed) {
*n_processed += cnt_flush[i];
}
if (cnt_flush[i]) {
MONITOR_INC_VALUE_CUMULATIVE(
MONITOR_FLUSH_BATCH_TOTAL_PAGE,
MONITOR_FLUSH_BATCH_COUNT,
MONITOR_FLUSH_BATCH_PAGES,
cnt_flush[i]);
}
}
#ifdef UNIV_DEBUG
fprintf(stderr, "%s: [1] [*n_processed: (min:%lu)%lu ]\n",
__FUNCTION__, (min_n * srv_buf_pool_instances), *n_processed);
#endif
return(success);
}
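The min_n adjustment above is a plain round-up division so the work is spread evenly: for example, min_n = 1000 over srv_buf_pool_instances = 8 gives (1000 + 8 - 1) / 8 = 125 pages requested per work item, while min_n = ULINT_MAX is deliberately left untouched so every instance flushes everything up to lsn_limit.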
/*********************************************************************//**
Clears up tail of the LRU lists:
* Put replaceable pages at the tail of LRU to the free list
* Flush dirty pages at the tail of LRU to the disk
The depth to which we scan each buffer pool is controlled by dynamic
config parameter innodb_LRU_scan_depth.
@return total pages flushed */
UNIV_INTERN
ulint
buf_mtflu_flush_LRU_tail(void)
/*==========================*/
{
ulint total_flushed=0, i;
ulint cnt_flush[MTFLUSH_MAX_WORKER];
ut_a(buf_mtflu_init_done());
/* QUESTION: What is protected by below mutex ? */
os_fast_mutex_lock(&mtflush_mtx);
buf_mtflu_flush_work_items(srv_buf_pool_instances,
cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0);
os_fast_mutex_unlock(&mtflush_mtx);
for (i = 0; i < srv_buf_pool_instances; i++) {
if (cnt_flush[i]) {
total_flushed += cnt_flush[i];
MONITOR_INC_VALUE_CUMULATIVE(
MONITOR_LRU_BATCH_TOTAL_PAGE,
MONITOR_LRU_BATCH_COUNT,
MONITOR_LRU_BATCH_PAGES,
cnt_flush[i]);
}
}
#if UNIV_DEBUG
fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu ]\n", (
srv_LRU_scan_depth * srv_buf_pool_instances), total_flushed);
#endif
return(total_flushed);
}
/*********************************************************************//**
Set correct thread identifiers to io thread array based on
information we have. */
void
buf_mtflu_set_thread_ids(
/*=====================*/
ulint n_threads, /*!<in: Number of threads to fill */
void* ctx, /*!<in: thread context */
os_thread_id_t* thread_ids) /*!<in: thread id array */
{
thread_sync_t *mtflush_io = ((thread_sync_t *)ctx);
ulint i;
ut_a(mtflush_io != NULL);
ut_a(thread_ids != NULL);
for(i = 0; i < n_threads; i++) {
thread_ids[i] = mtflush_io[i].wthread_id;
}
}
@@ -295,6 +295,63 @@ buf_flush_flush_list_in_progress(void)
 /*==================================*/
 	__attribute__((warn_unused_result));
/******************************************************************//**
Start a buffer flush batch for LRU or flush list */
ibool
buf_flush_start(
/*============*/
buf_pool_t* buf_pool, /*!< buffer pool instance */
buf_flush_t flush_type); /*!< in: BUF_FLUSH_LRU
or BUF_FLUSH_LIST */
/******************************************************************//**
End a buffer flush batch for LRU or flush list */
void
buf_flush_end(
/*==========*/
buf_pool_t* buf_pool, /*!< buffer pool instance */
buf_flush_t flush_type); /*!< in: BUF_FLUSH_LRU
or BUF_FLUSH_LIST */
/*******************************************************************//**
This utility flushes dirty blocks from the end of the LRU list or flush_list.
NOTE 1: in the case of an LRU flush the calling thread may own latches to
pages: to avoid deadlocks, this function must be written so that it cannot
end up waiting for these latches! NOTE 2: in the case of a flush list flush,
the calling thread is not allowed to own any latches on pages!
@return number of blocks for which the write request was queued */
__attribute__((nonnull))
void
buf_flush_batch(
/*============*/
buf_pool_t* buf_pool, /*!< in: buffer pool instance */
buf_flush_t flush_type, /*!< in: BUF_FLUSH_LRU or
BUF_FLUSH_LIST; if BUF_FLUSH_LIST,
then the caller must not own any
latches on pages */
ulint min_n, /*!< in: wished minimum mumber of blocks
flushed (it is not guaranteed that the
actual number is that big, though) */
lsn_t lsn_limit, /*!< in: in the case of BUF_FLUSH_LIST
all blocks whose oldest_modification is
smaller than this should be flushed
(if their number does not exceed
min_n), otherwise ignored */
bool limited_lru_scan,/*!< in: for LRU flushes, if true,
allow to scan only up to
srv_LRU_scan_depth pages in total */
flush_counters_t* n); /*!< out: flushed/evicted page
counts */
/******************************************************************//**
Gather the aggregated stats for both flush list and LRU list flushing */
void
buf_flush_common(
/*=============*/
buf_flush_t flush_type, /*!< in: type of flush */
ulint page_count); /*!< in: number of pages flushed */
 #ifndef UNIV_NONINL
 #include "buf0flu.ic"
 #endif
......
/*****************************************************************************
Copyright (C) 2014 SkySQL Ab. All Rights Reserved.
Copyright (C) 2014 Fusion-io. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*****************************************************************************/
/******************************************************************//**
@file include/buf0mtflu.h
Multi-threadef flush method interface function prototypes
Created 06/02/2014 Jan Lindström jan.lindstrom@skysql.com
Dhananjoy Das DDas@fusionio.com
***********************************************************************/
#ifndef buf0mtflu_h
#define buf0mtflu_h
/******************************************************************//**
Add exit work item to work queue to signal multi-threded flush
threads that they should exit.
*/
void
buf_mtflu_io_thread_exit(void);
/*===========================*/
/******************************************************************//**
Initialize multi-threaded flush thread syncronization data.
@return Initialized multi-threaded flush thread syncroniztion data. */
void*
buf_mtflu_handler_init(
/*===================*/
ulint n_threads, /*!< in: Number of threads to create */
ulint wrk_cnt); /*!< in: Number of work items */
/******************************************************************//**
Return true if multi-threaded flush is initialized
@return true if initialized, false if not */
bool
buf_mtflu_init_done(void);
/*======================*/
/*********************************************************************//**
Clears up tail of the LRU lists:
* Put replaceable pages at the tail of LRU to the free list
* Flush dirty pages at the tail of LRU to the disk
The depth to which we scan each buffer pool is controlled by dynamic
config parameter innodb_LRU_scan_depth.
@return total pages flushed */
UNIV_INTERN
ulint
buf_mtflu_flush_LRU_tail(void);
/*===========================*/
/*******************************************************************//**
Multi-threaded version of buf_flush_list
*/
bool
buf_mtflu_flush_list(
/*=================*/
ulint min_n, /*!< in: wished minimum mumber of blocks
flushed (it is not guaranteed that the
actual number is that big, though) */
lsn_t lsn_limit, /*!< in the case BUF_FLUSH_LIST all
blocks whose oldest_modification is
smaller than this should be flushed
(if their number does not exceed
min_n), otherwise ignored */
ulint* n_processed); /*!< out: the number of pages
which were processed is passed
back to caller. Ignored if NULL */
/*********************************************************************//**
Set correct thread identifiers to io thread array based on
information we have. */
void
buf_mtflu_set_thread_ids(
/*=====================*/
ulint n_threads, /*!<in: Number of threads to fill */
void* ctx, /*!<in: thread context */
os_thread_id_t* thread_ids); /*!<in: thread id array */
#endif
@@ -279,7 +279,7 @@ extern my_bool srv_use_lz4;
 /* Number of flush threads */
 #define MTFLUSH_MAX_WORKER 64
-extern ulint srv_mtflush_threads;
+extern long srv_mtflush_threads;
 /** Server undo tablespaces directory, can be absolute path. */
 extern char* srv_undo_dir;
......
@@ -180,6 +180,8 @@ UNIV_INTERN my_bool srv_use_posix_fallocate = FALSE;
 UNIV_INTERN my_bool srv_use_atomic_writes = FALSE;
 /* If this flag IS TRUE, then we use lz4 to compress/decompress pages */
 UNIV_INTERN my_bool srv_use_lz4 = FALSE;
+/* Number of threads used for multi-threaded flush */
+UNIV_INTERN long srv_mtflush_threads = 0;
 #ifdef __WIN__
 /* Windows native condition variables. We use runtime loading / function
......
@@ -72,6 +72,7 @@ Created 2/16/1996 Heikki Tuuri
 # include "os0proc.h"
 # include "sync0sync.h"
 # include "buf0flu.h"
+# include "buf0mtflu.h"
 # include "buf0rea.h"
 # include "dict0boot.h"
 # include "dict0load.h"
@@ -134,6 +135,8 @@ static ulint n[SRV_MAX_N_IO_THREADS + 6];
 /** 6 is the ? */
 #define START_OLD_THREAD_CNT (SRV_MAX_N_IO_THREADS + 6 + SRV_MAX_N_PURGE_THREADS)
 static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 + SRV_MAX_N_PURGE_THREADS + MTFLUSH_MAX_WORKER];
+/* Thread contex data for multi-threaded flush */
+void *mtflush_ctx=NULL;
 /** We use this mutex to test the return value of pthread_mutex_trylock
 on successful locking. HP-UX does NOT return 0, though Linux et al do. */
@@ -1508,410 +1511,6 @@ init_log_online(void)
 	}
 }
/* JAN: TODO: */
/**********************************************************************************/
#ifdef UNIV_DEBUG
extern int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_time);
#endif
extern ibool buf_flush_start(buf_pool_t* buf_pool, buf_flush_t flush_type);
extern void buf_flush_end(buf_pool_t* buf_pool, buf_flush_t flush_type);
extern void buf_flush_common(buf_flush_t flush_type, ulint page_count);
extern ulint buf_flush_batch(buf_pool_t* buf_pool, buf_flush_t flush_type, ulint min_n, lsn_t lsn_limit, bool limited_lru_scan,
flush_counters_t* n);
extern void pgcomp_init(void);
extern void pgcomp_deinit(void);
extern void buf_pool_enter_LRU_mutex(buf_pool_t*);
extern void buf_pool_exit_LRU_mutex(buf_pool_t*);
typedef enum wrk_status {
WRK_ITEM_SET=0, // wrk-item is set
WRK_ITEM_START=1, // processing of wrk-item has started
WRK_ITEM_DONE=2, // processing is done usually set to SUCCESS/FAILED
WRK_ITEM_SUCCESS=2, // Success processing the wrk-item
WRK_ITEM_FAILED=3, // status of failed
WRK_ITEM_EXIT=4,
WRK_ITEM_STATUS_UNDEFINED
} wrk_status_t;
typedef enum mt_wrk_tsk {
MT_WRK_NONE=0, // Exit queue-wait
MT_WRK_WRITE=1, // Flush operation
MT_WRK_READ=2, // Decompress operation
MT_WRK_UNDEFINED
} mt_wrk_tsk_t;
typedef enum wthr_status {
WTHR_NOT_INIT=0,
WTHR_INITIALIZED=1,
WTHR_SIG_WAITING=2,
WTHR_RUNNING=3,
WTHR_NO_WORK=4,
WTHR_KILL_IT=5,
WTHR_STATUS_UNDEFINED
} wthr_status_t;
typedef struct wr_tsk {
buf_pool_t *buf_pool; // buffer-pool instance
buf_flush_t flush_type; // flush-type for buffer-pool flush operation
ulint min; //minimum number of pages requested to be flushed
lsn_t lsn_limit;//lsn limit for the buffer-pool flush operation
} wr_tsk_t;
typedef struct rd_tsk {
void *page_pool; //list of pages to decompress;
} rd_tsk_t;
typedef struct wrk_itm
{
mt_wrk_tsk_t tsk;
/* based on task-type one of the entries wr_tsk/rd_tsk will be used */
wr_tsk_t wr; //flush page list
rd_tsk_t rd; //decompress page list
unsigned long result; //flush pages count
unsigned long t_usec; //time-taken in usec
long id_usr; //thread-id currently working
wrk_status_t wi_status; //flag
struct wrk_itm *next;
} wrk_t;
typedef struct thread_sync
{
int wthread_id;
os_thread_t wthread;
ib_wqueue_t *wq; // work Queue
ib_wqueue_t *wr_cq;// Write Completion Queue
ib_wqueue_t *rd_cq; // Read Completion Queue
wthr_status_t wt_status; // Worker Thread status
unsigned long stat_universal_num_processed;
unsigned long stat_cycle_num_processed;
} thread_sync_t;
/* Global XXX:DD needs to be cleaned */
ib_wqueue_t *wq=NULL, *wr_cq=NULL, *rd_cq=NULL;
mem_heap_t *heap_allocated=NULL;
thread_sync_t pc_sync[MTFLUSH_MAX_WORKER];
static wrk_t work_items[MTFLUSH_MAX_WORKER];
static int pgcomp_wrk_initialized = -1;
ulint srv_mtflush_threads = 0;
int set_pgcomp_wrk_init_done(void)
{
pgcomp_wrk_initialized = 1;
return 0;
}
int is_pgcomp_wrk_init_done(void)
{
return(pgcomp_wrk_initialized == 1);
}
int setup_wrk_itm(int items)
{
int i;
for(i=0; i<items; i++) {
work_items[i].rd.page_pool = NULL;
work_items[i].wr.buf_pool = NULL;
work_items[i].t_usec = 0;
work_items[i].result = 0;
work_items[i].id_usr = -1;
work_items[i].wi_status = WRK_ITEM_STATUS_UNDEFINED;
work_items[i].next = &work_items[(i+1)%items];
}
/* last node should be the tail */
work_items[items-1].next = NULL;
return 0;
}
int flush_pool_instance(wrk_t *wi)
{
flush_counters_t n;
#ifdef UNIV_DEBUG
struct timeval p_start_time, p_end_time, d_time;
#endif
if (!wi) {
fprintf(stderr, "work item invalid wi:%p\n", wi);
return -1;
}
if (!wi->wr.buf_pool) {
fprintf(stderr, "work-item wi->buf_pool:%p [likely thread exit]\n",
wi->wr.buf_pool);
return -1;
}
wi->t_usec = 0;
if (!buf_flush_start(wi->wr.buf_pool, wi->wr.flush_type)) {
/* We have two choices here. If lsn_limit was
specified then skipping an instance of buffer
pool means we cannot guarantee that all pages
up to lsn_limit has been flushed. We can
return right now with failure or we can try
to flush remaining buffer pools up to the
lsn_limit. We attempt to flush other buffer
pools based on the assumption that it will
help in the retry which will follow the
failure. */
fprintf(stderr, "flush_start Failed, flush_type:%d\n",
wi->wr.flush_type);
return -1;
}
#ifdef UNIV_DEBUG
/* Record time taken for the OP in usec */
gettimeofday(&p_start_time, 0x0);
#endif
if (wi->wr.flush_type == BUF_FLUSH_LRU) {
/* srv_LRU_scan_depth can be arbitrarily large value.
* We cap it with current LRU size.
*/
buf_pool_enter_LRU_mutex(wi->wr.buf_pool);
wi->wr.min = UT_LIST_GET_LEN(wi->wr.buf_pool->LRU);
buf_pool_exit_LRU_mutex(wi->wr.buf_pool);
wi->wr.min = ut_min(srv_LRU_scan_depth,wi->wr.min);
}
wi->result = buf_flush_batch(wi->wr.buf_pool,
wi->wr.flush_type,
wi->wr.min, wi->wr.lsn_limit,
false, &n);
buf_flush_end(wi->wr.buf_pool, wi->wr.flush_type);
buf_flush_common(wi->wr.flush_type, wi->result);
#ifdef UNIV_DEBUG
gettimeofday(&p_end_time, 0x0);
timediff(&p_end_time, &p_start_time, &d_time);
wi->t_usec = (unsigned long)(d_time.tv_usec+(d_time.tv_sec*1000000));
#endif
return 0;
}
int service_page_comp_io(thread_sync_t * ppc)
{
wrk_t *wi = NULL;
int ret=0;
ppc->wt_status = WTHR_SIG_WAITING;
wi = (wrk_t *)ib_wqueue_wait(ppc->wq);
if (wi) {
ppc->wt_status = WTHR_RUNNING;
} else {
fprintf(stderr, "%s:%d work-item is NULL\n", __FILE__, __LINE__);
ppc->wt_status = WTHR_NO_WORK;
return (0);
}
assert(wi != NULL);
wi->id_usr = ppc->wthread;
switch(wi->tsk) {
case MT_WRK_NONE:
assert(wi->wi_status == WRK_ITEM_EXIT);
wi->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(ppc->wr_cq, wi, heap_allocated);
break;
case MT_WRK_WRITE:
wi->wi_status = WRK_ITEM_START;
/* Process work item */
if (0 != (ret = flush_pool_instance(wi))) {
fprintf(stderr, "FLUSH op failed ret:%d\n", ret);
wi->wi_status = WRK_ITEM_FAILED;
}
wi->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(ppc->wr_cq, wi, heap_allocated);
break;
case MT_WRK_READ:
/* Need to also handle the read case */
assert(0);
/* completed task get added to rd_cq */
/* wi->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(ppc->rd_cq, wi, heap_allocated);*/
break;
default:
/* None other than Write/Read handling planned */
assert(0);
}
ppc->wt_status = WTHR_NO_WORK;
return(0);
}
void page_comp_io_thread_exit()
{
ulint i;
fprintf(stderr, "signal page_comp_io_threads to exit [%lu]\n", srv_buf_pool_instances);
for (i=0; i<srv_buf_pool_instances; i++) {
work_items[i].wr.buf_pool = NULL;
work_items[i].rd.page_pool = NULL;
work_items[i].tsk = MT_WRK_NONE;
work_items[i].wi_status = WRK_ITEM_EXIT;
ib_wqueue_add(wq, (void *)&work_items[i], heap_allocated);
}
}
/******************************************************************//**
@return a dummy parameter*/
extern "C" UNIV_INTERN
os_thread_ret_t
DECLARE_THREAD(page_comp_io_thread)(
/*================================*/
void * arg)
{
thread_sync_t *ppc_io = ((thread_sync_t *)arg);
while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) {
service_page_comp_io(ppc_io);
ppc_io->stat_cycle_num_processed = 0;
}
os_thread_exit(NULL);
OS_THREAD_DUMMY_RETURN;
}
int print_wrk_list(wrk_t *wi_list)
{
wrk_t *wi = wi_list;
int i=0;
if(!wi_list) {
fprintf(stderr, "list NULL\n");
}
while(wi) {
fprintf(stderr, "-\t[%p]\t[%s]\t[%lu]\t[%luus] > %p\n",
wi, (wi->id_usr == -1)?"free":"Busy", wi->result, wi->t_usec, wi->next);
wi = wi->next;
i++;
}
fprintf(stderr, "list len: %d\n", i);
return 0;
}
/******************************************************************//**
@return a dummy parameter*/
int pgcomp_handler_init(int num_threads, int wrk_cnt, ib_wqueue_t *wq, ib_wqueue_t *wr_cq, ib_wqueue_t *rd_cq)
{
int i=0;
if(is_pgcomp_wrk_init_done()) {
fprintf(stderr, "pgcomp_handler_init(): ERROR already initialized\n");
return -1;
}
if(!wq || !wr_cq || !rd_cq) {
fprintf(stderr, "%s() FAILED wq:%p write-cq:%p read-cq:%p\n",
__FUNCTION__, wq, wr_cq, rd_cq);
return -1;
}
/* work-item setup */
setup_wrk_itm(wrk_cnt);
/* Mark each of the thread sync entires */
for(i=0; i < MTFLUSH_MAX_WORKER; i++) {
pc_sync[i].wthread_id = i;
}
/* Create threads for page-compression-flush */
for(i=0; i < num_threads; i++) {
pc_sync[i].wthread_id = i;
pc_sync[i].wq = wq;
pc_sync[i].wr_cq = wr_cq;
pc_sync[i].rd_cq = rd_cq;
os_thread_create(page_comp_io_thread, ((void *)(pc_sync + i)),
thread_ids + START_OLD_THREAD_CNT + i);
pc_sync[i].wthread = (START_OLD_THREAD_CNT + i);
pc_sync[i].wt_status = WTHR_INITIALIZED;
}
set_pgcomp_wrk_init_done();
fprintf(stderr, "%s() Worker-Threads created..\n", __FUNCTION__);
return 0;
}
int wrk_thread_stat(thread_sync_t *wthr, unsigned int num_threads)
{
ulong stat_tot=0;
ulint i=0;
for(i=0; i<num_threads;i++) {
stat_tot+=wthr[i].stat_universal_num_processed;
fprintf(stderr, "[%d] stat [%lu]\n", wthr[i].wthread_id,
wthr[i].stat_universal_num_processed);
}
fprintf(stderr, "Stat-Total:%lu\n", stat_tot);
}
int reset_wrk_itm(int items)
{
int i;
for(i=0;i<items; i++) {
work_items[i].id_usr = -1;
}
return 0;
}
int pgcomp_flush_work_items(int buf_pool_inst, int *per_pool_pages_flushed,
buf_flush_t flush_type, int min_n, lsn_t lsn_limit)
{
int ret=0, i=0;
wrk_t *done_wi;
for(i=0;i<buf_pool_inst; i++) {
work_items[i].tsk = MT_WRK_WRITE;
work_items[i].rd.page_pool = NULL;
work_items[i].wr.buf_pool = buf_pool_from_array(i);
work_items[i].wr.flush_type = flush_type;
work_items[i].wr.min = min_n;
work_items[i].wr.lsn_limit = lsn_limit;
work_items[i].id_usr = -1;
work_items[i].next = &work_items[(i+1)%buf_pool_inst];
work_items[i].wi_status = WRK_ITEM_SET;
}
work_items[i-1].next=NULL;
for(i=0;i<buf_pool_inst; i++) {
ib_wqueue_add(wq, (void *)(&work_items[i]), heap_allocated);
}
/* wait on the completion to arrive */
for(i=0;i<buf_pool_inst; i++) {
done_wi = (wrk_t *)ib_wqueue_wait(wr_cq);
//fprintf(stderr, "%s: queue-wait DONE\n", __FUNCTION__);
ut_ad(done_wi != NULL);
}
/* collect data/results total pages flushed */
for(i=0; i<buf_pool_inst; i++) {
if(work_items[i].result == -1) {
ret = -1;
per_pool_pages_flushed[i] = 0;
} else {
per_pool_pages_flushed[i] = work_items[i].result;
}
if((work_items[i].id_usr == -1) &&
(work_items[i].wi_status == WRK_ITEM_SET )) {
fprintf(stderr, "**Set/Unused work_item[%d] flush_type=%d\n", i, work_items[i].wr.flush_type);
//assert(0);
}
}
//wrk_thread_stat(pc_sync, pgc_n_threads);
/* clear up work-queue for next flush */
reset_wrk_itm(buf_pool_inst);
return(ret);
}
/* JAN: TODO: END: */
 /********************************************************************
 Starts InnoDB and creates a new database if database files
 are not found and the user wants.
@@ -3119,20 +2718,18 @@ files_checked:
 	}
 	if (!srv_read_only_mode) {
-		/* JAN: TODO: */
 		if (srv_buf_pool_instances <= MTFLUSH_MAX_WORKER) {
			srv_mtflush_threads = srv_buf_pool_instances;
		}
-		heap_allocated = mem_heap_create(0);
-		ut_a(heap_allocated != NULL);
-		wq = ib_wqueue_create();
-		wr_cq = ib_wqueue_create();
-		rd_cq = ib_wqueue_create();
-		pgcomp_init();
-		pgcomp_handler_init(srv_mtflush_threads,
-				    srv_buf_pool_instances,
-				    wq, wr_cq, rd_cq);
+		mtflush_ctx = buf_mtflu_handler_init(srv_mtflush_threads,
+				srv_buf_pool_instances);
+		/* Set up the thread ids */
+		buf_mtflu_set_thread_ids(srv_mtflush_threads,
+					 mtflush_ctx,
+					 (thread_ids + 6 + 32));
 #if UNIV_DEBUG
 	fprintf(stderr, "%s:%d buf-pool-instances:%lu\n", __FILE__, __LINE__, srv_buf_pool_instances);
 #endif
@@ -3411,15 +3008,11 @@ innobase_shutdown_for_mysql(void)
 	/* g. Exit the multi threaded flush threads */
-	page_comp_io_thread_exit();
+	buf_mtflu_io_thread_exit();
 #ifdef UNIV_DEBUG
 	fprintf(stderr, "%s:%d os_thread_count:%lu \n", __FUNCTION__, __LINE__, os_thread_count);
 #endif
-	/* h. Remove the mutex */
-	pgcomp_deinit();
 	os_mutex_enter(os_sync_mutex);
......