Commit 55fab3db authored by Jan Lindström's avatar Jan Lindström

Fixed issue on atomic writes on startup, removed incorrect assert.

Fixed issue on file space extend when posix_fallocate is used.

Merged second iteration of multi-threaded flush code.
parent 8c5d5bc5
...@@ -37,6 +37,7 @@ buffer_pool_bytes_dirty disabled ...@@ -37,6 +37,7 @@ buffer_pool_bytes_dirty disabled
buffer_pool_pages_free disabled buffer_pool_pages_free disabled
buffer_pages_created disabled buffer_pages_created disabled
buffer_pages_written disabled buffer_pages_written disabled
buffer_index_pages_written disabled
buffer_pages_read disabled buffer_pages_read disabled
buffer_data_reads disabled buffer_data_reads disabled
buffer_data_written disabled buffer_data_written disabled
...@@ -160,6 +161,13 @@ compress_pages_compressed disabled ...@@ -160,6 +161,13 @@ compress_pages_compressed disabled
compress_pages_decompressed disabled compress_pages_decompressed disabled
compression_pad_increments disabled compression_pad_increments disabled
compression_pad_decrements disabled compression_pad_decrements disabled
compress_saved disabled
compress_trim_sect512 disabled
compress_trim_sect4096 disabled
compress_pages_page_compressed disabled
compress_page_compressed_trim_op disabled
compress_page_compressed_trim_op_saved disabled
compress_pages_page_decompressed disabled
index_splits disabled index_splits disabled
index_merges disabled index_merges disabled
adaptive_hash_searches disabled adaptive_hash_searches disabled
......
...@@ -46,6 +46,7 @@ Created 11/11/1995 Heikki Tuuri ...@@ -46,6 +46,7 @@ Created 11/11/1995 Heikki Tuuri
#include "ibuf0ibuf.h" #include "ibuf0ibuf.h"
#include "log0log.h" #include "log0log.h"
#include "os0file.h" #include "os0file.h"
#include "os0sync.h"
#include "trx0sys.h" #include "trx0sys.h"
#include "srv0mon.h" #include "srv0mon.h"
#include "mysql/plugin.h" #include "mysql/plugin.h"
...@@ -1934,11 +1935,16 @@ buf_flush_LRU( ...@@ -1934,11 +1935,16 @@ buf_flush_LRU(
/* JAN: TODO: */ /* JAN: TODO: */
/*******************************************************************//**/ /*******************************************************************//**/
extern int is_pgcomp_wrk_init_done(void); extern int is_pgcomp_wrk_init_done(void);
extern int pgcomp_flush_work_items(int buf_pool_inst, int *pages_flushed, extern int pgcomp_flush_work_items(
int flush_type, int min_n, unsigned long long lsn_limit); int buf_pool_inst,
int *pages_flushed,
enum buf_flush flush_type,
int min_n,
lsn_t lsn_limit);
#define MT_COMP_WATER_MARK 50 #define MT_COMP_WATER_MARK 50
#ifdef UNIV_DEBUG
#include <time.h> #include <time.h>
int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_time) int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_time)
{ {
...@@ -1959,8 +1965,15 @@ int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_t ...@@ -1959,8 +1965,15 @@ int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_t
return 0; return 0;
} }
#endif
static os_fast_mutex_t pgcomp_mtx;
void pgcomp_init(void)
{
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &pgcomp_mtx);
}
static pthread_mutex_t pgcomp_mtx = PTHREAD_MUTEX_INITIALIZER;
/*******************************************************************//** /*******************************************************************//**
Multi-threaded version of buf_flush_list Multi-threaded version of buf_flush_list
*/ */
...@@ -1983,7 +1996,10 @@ pgcomp_buf_flush_list( ...@@ -1983,7 +1996,10 @@ pgcomp_buf_flush_list(
{ {
ulint i; ulint i;
bool success = true; bool success = true;
#ifdef UNIV_DEBUG
struct timeval p_start_time, p_end_time, d_time; struct timeval p_start_time, p_end_time, d_time;
#endif
int cnt_flush[MTFLUSH_MAX_WORKER];
if (n_processed) { if (n_processed) {
*n_processed = 0; *n_processed = 0;
...@@ -2001,16 +2017,11 @@ pgcomp_buf_flush_list( ...@@ -2001,16 +2017,11 @@ pgcomp_buf_flush_list(
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
gettimeofday(&p_start_time, 0x0); gettimeofday(&p_start_time, 0x0);
#endif #endif
if(is_pgcomp_wrk_init_done() && (min_n > MT_COMP_WATER_MARK)) { os_fast_mutex_lock(&pgcomp_mtx);
int cnt_flush[32];
//stack_trace();
pthread_mutex_lock(&pgcomp_mtx);
//gettimeofday(&p_start_time, 0x0);
//fprintf(stderr, "Calling into wrk-pgcomp [min:%lu]", min_n);
pgcomp_flush_work_items(srv_buf_pool_instances, pgcomp_flush_work_items(srv_buf_pool_instances,
cnt_flush, BUF_FLUSH_LIST, cnt_flush, BUF_FLUSH_LIST,
min_n, lsn_limit); min_n, lsn_limit);
os_fast_mutex_unlock(&pgcomp_mtx);
for (i = 0; i < srv_buf_pool_instances; i++) { for (i = 0; i < srv_buf_pool_instances; i++) {
if (n_processed) { if (n_processed) {
...@@ -2022,75 +2033,18 @@ pgcomp_buf_flush_list( ...@@ -2022,75 +2033,18 @@ pgcomp_buf_flush_list(
MONITOR_FLUSH_BATCH_COUNT, MONITOR_FLUSH_BATCH_COUNT,
MONITOR_FLUSH_BATCH_PAGES, MONITOR_FLUSH_BATCH_PAGES,
cnt_flush[i]); cnt_flush[i]);
} }
} }
pthread_mutex_unlock(&pgcomp_mtx);
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
gettimeofday(&p_end_time, 0x0); gettimeofday(&p_end_time, 0x0);
timediff(&p_end_time, &p_start_time, &d_time); timediff(&p_end_time, &p_start_time, &d_time);
fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu %llu usec]\n", ( fprintf(stderr, "%s: [1] [*n_processed: (min:%lu)%lu %llu usec]\n",
min_n * srv_buf_pool_instances), *n_processed, __FUNCTION__, (min_n * srv_buf_pool_instances), *n_processed,
(unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000)));
#endif
return(success);
}
/* Flush to lsn_limit in all buffer pool instances */
for (i = 0; i < srv_buf_pool_instances; i++) {
buf_pool_t* buf_pool;
ulint page_count = 0;
buf_pool = buf_pool_from_array(i);
if (!buf_flush_start(buf_pool, BUF_FLUSH_LIST)) {
/* We have two choices here. If lsn_limit was
specified then skipping an instance of buffer
pool means we cannot guarantee that all pages
up to lsn_limit has been flushed. We can
return right now with failure or we can try
to flush remaining buffer pools up to the
lsn_limit. We attempt to flush other buffer
pools based on the assumption that it will
help in the retry which will follow the
failure. */
success = false;
continue;
}
page_count = buf_flush_batch(
buf_pool, BUF_FLUSH_LIST, min_n, lsn_limit);
buf_flush_end(buf_pool, BUF_FLUSH_LIST);
buf_flush_common(BUF_FLUSH_LIST, page_count);
if (n_processed) {
*n_processed += page_count;
}
if (page_count) {
MONITOR_INC_VALUE_CUMULATIVE(
MONITOR_FLUSH_BATCH_TOTAL_PAGE,
MONITOR_FLUSH_BATCH_COUNT,
MONITOR_FLUSH_BATCH_PAGES,
page_count);
}
}
#if UNIV_DEBUG
gettimeofday(&p_end_time, 0x0);
timediff(&p_end_time, &p_start_time, &d_time);
fprintf(stderr, "[2] [*n_processed: (min:%lu)%lu %llu usec]\n", (
min_n * srv_buf_pool_instances), *n_processed,
(unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000))); (unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000)));
#endif #endif
return(success); return(success);
} }
#endif
/* JAN: TODO: END: */ /* JAN: TODO: END: */
/*******************************************************************//** /*******************************************************************//**
...@@ -2292,18 +2246,21 @@ ulint ...@@ -2292,18 +2246,21 @@ ulint
pgcomp_buf_flush_LRU_tail(void) pgcomp_buf_flush_LRU_tail(void)
/*====================*/ /*====================*/
{ {
#ifdef UNIV_DEBUG
struct timeval p_start_time, p_end_time, d_time; struct timeval p_start_time, p_end_time, d_time;
#endif
ulint total_flushed=0, i=0; ulint total_flushed=0, i=0;
int cnt_flush[32]; int cnt_flush[32];
#if UNIV_DEBUG #ifdef UNIV_DEBUG
gettimeofday(&p_start_time, 0x0); gettimeofday(&p_start_time, 0x0);
#endif #endif
assert(is_pgcomp_wrk_init_done()); ut_ad(is_pgcomp_wrk_init_done());
pthread_mutex_lock(&pgcomp_mtx); os_fast_mutex_lock(&pgcomp_mtx);
pgcomp_flush_work_items(srv_buf_pool_instances, pgcomp_flush_work_items(srv_buf_pool_instances,
cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0); cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0);
os_fast_mutex_unlock(&pgcomp_mtx);
for (i = 0; i < srv_buf_pool_instances; i++) { for (i = 0; i < srv_buf_pool_instances; i++) {
if (cnt_flush[i]) { if (cnt_flush[i]) {
...@@ -2317,8 +2274,6 @@ pgcomp_buf_flush_LRU_tail(void) ...@@ -2317,8 +2274,6 @@ pgcomp_buf_flush_LRU_tail(void)
} }
} }
pthread_mutex_unlock(&pgcomp_mtx);
#if UNIV_DEBUG #if UNIV_DEBUG
gettimeofday(&p_end_time, 0x0); gettimeofday(&p_end_time, 0x0);
timediff(&p_end_time, &p_start_time, &d_time); timediff(&p_end_time, &p_start_time, &d_time);
...@@ -2894,6 +2849,7 @@ buf_flush_validate( ...@@ -2894,6 +2849,7 @@ buf_flush_validate(
} }
#endif /* UNIV_DEBUG || UNIV_BUF_DEBUG */ #endif /* UNIV_DEBUG || UNIV_BUF_DEBUG */
#endif /* !UNIV_HOTBACKUP */
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
......
...@@ -1280,7 +1280,6 @@ fil_space_create( ...@@ -1280,7 +1280,6 @@ fil_space_create(
DBUG_EXECUTE_IF("fil_space_create_failure", return(false);); DBUG_EXECUTE_IF("fil_space_create_failure", return(false););
ut_a(fil_system); ut_a(fil_system);
ut_a(fsp_flags_is_valid(flags));
/* Look for a matching tablespace and if found free it. */ /* Look for a matching tablespace and if found free it. */
do { do {
......
...@@ -859,6 +859,10 @@ dict_tf_set( ...@@ -859,6 +859,10 @@ dict_tf_set(
if (awrites != ATOMIC_WRITES_DEFAULT) { if (awrites != ATOMIC_WRITES_DEFAULT) {
*flags |= (atomic_writes << DICT_TF_POS_ATOMIC_WRITES); *flags |= (atomic_writes << DICT_TF_POS_ATOMIC_WRITES);
ut_ad(dict_tf_get_atomic_writes(*flags) == awrites); ut_ad(dict_tf_get_atomic_writes(*flags) == awrites);
}
if (awrites == ATOMIC_WRITES_ON ||
(awrites == ATOMIC_WRITES_DEFAULT && srv_use_atomic_writes )) {
*flags |= (1 << DICT_TF_POS_ATOMIC_BLOBS); *flags |= (1 << DICT_TF_POS_ATOMIC_BLOBS);
} }
......
...@@ -257,6 +257,10 @@ extern my_bool srv_use_atomic_writes; ...@@ -257,6 +257,10 @@ extern my_bool srv_use_atomic_writes;
/* If this flag IS TRUE, then we use lz4 to compress/decompress pages */ /* If this flag IS TRUE, then we use lz4 to compress/decompress pages */
extern my_bool srv_use_lz4; extern my_bool srv_use_lz4;
/* Number of flush threads */
#define MTFLUSH_MAX_WORKER 64
extern ulint srv_mtflush_threads;
#ifdef __WIN__ #ifdef __WIN__
extern ibool srv_use_native_conditions; extern ibool srv_use_native_conditions;
#endif /* __WIN__ */ #endif /* __WIN__ */
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
Copyright (c) 1996, 2012, Oracle and/or its affiliates. All rights reserved. Copyright (c) 1996, 2012, Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2008, Google Inc. Copyright (c) 2008, Google Inc.
Copyright (c) 2009, Percona Inc. Copyright (c) 2009, Percona Inc.
Copyright (c) 2013, SkySQL Ab. All Rights Reserved. Copyright (c) 2013, 2014, SkySQL Ab. All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted by Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described Google, Inc. Those modifications are gratefully acknowledged and are described
...@@ -127,10 +127,9 @@ static os_file_t files[1000]; ...@@ -127,10 +127,9 @@ static os_file_t files[1000];
/** io_handler_thread parameters for thread identification */ /** io_handler_thread parameters for thread identification */
static ulint n[SRV_MAX_N_IO_THREADS + 6]; static ulint n[SRV_MAX_N_IO_THREADS + 6];
/** io_handler_thread identifiers, 32 is the maximum number of purge threads */ /** io_handler_thread identifiers, 32 is the maximum number of purge threads */
/** pgcomp_thread are 16 total */ /** 6 is the ? */
#define START_PGCOMP_CNT (SRV_MAX_N_IO_THREADS + 6 + 32) #define START_OLD_THREAD_CNT (SRV_MAX_N_IO_THREADS + 6 + 32)
#define PGCOMP_MAX_WORKER 16 static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 + 32 + MTFLUSH_MAX_WORKER];
static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 + 32 + PGCOMP_MAX_WORKER];
/** We use this mutex to test the return value of pthread_mutex_trylock /** We use this mutex to test the return value of pthread_mutex_trylock
on successful locking. HP-UX does NOT return 0, though Linux et al do. */ on successful locking. HP-UX does NOT return 0, though Linux et al do. */
...@@ -1442,16 +1441,25 @@ extern ibool buf_flush_start(buf_pool_t* buf_pool, enum buf_flush flush_type); ...@@ -1442,16 +1441,25 @@ extern ibool buf_flush_start(buf_pool_t* buf_pool, enum buf_flush flush_type);
extern void buf_flush_end(buf_pool_t* buf_pool, enum buf_flush flush_type); extern void buf_flush_end(buf_pool_t* buf_pool, enum buf_flush flush_type);
extern void buf_flush_common(enum buf_flush flush_type, ulint page_count); extern void buf_flush_common(enum buf_flush flush_type, ulint page_count);
extern ulint buf_flush_batch(buf_pool_t* buf_pool, enum buf_flush flush_type, ulint min_n, lsn_t lsn_limit); extern ulint buf_flush_batch(buf_pool_t* buf_pool, enum buf_flush flush_type, ulint min_n, lsn_t lsn_limit);
extern void pgcomp_init(void);
typedef enum wrk_status { typedef enum wrk_status {
WRK_ITEM_SET=0, WRK_ITEM_SET=0, // wrk-item is set
WRK_ITEM_START=1, WRK_ITEM_START=1, // processing of wrk-item has started
WRK_ITEM_DONE=2, WRK_ITEM_DONE=2, // processing is done usually set to SUCCESS/FAILED
WRK_ITEM_SUCCESS=2, WRK_ITEM_SUCCESS=2, // Success processing the wrk-item
WRK_ITEM_FAILED=3, WRK_ITEM_FAILED=3, // status of failed
WRK_ITEM_EXIT=4,
WRK_ITEM_STATUS_UNDEFINED WRK_ITEM_STATUS_UNDEFINED
} wrk_status_t; } wrk_status_t;
typedef enum mt_wrk_tsk {
MT_WRK_NONE=0, // Exit queue-wait
MT_WRK_WRITE=1, // Flush operation
MT_WRK_READ=2, // Decompress operation
MT_WRK_UNDEFINED
} mt_wrk_tsk_t;
typedef enum wthr_status { typedef enum wthr_status {
WTHR_NOT_INIT=0, WTHR_NOT_INIT=0,
WTHR_INITIALIZED=1, WTHR_INITIALIZED=1,
...@@ -1462,16 +1470,24 @@ typedef enum wthr_status { ...@@ -1462,16 +1470,24 @@ typedef enum wthr_status {
WTHR_STATUS_UNDEFINED WTHR_STATUS_UNDEFINED
} wthr_status_t; } wthr_status_t;
typedef struct wr_tsk {
buf_pool_t *buf_pool; // buffer-pool instance
enum buf_flush flush_type; // flush-type for buffer-pool flush operation
ulint min; //minimum number of pages requested to be flushed
lsn_t lsn_limit;//lsn limit for the buffer-pool flush operation
} wr_tsk_t;
typedef struct rd_tsk {
void *page_pool; //list of pages to decompress;
} rd_tsk_t;
typedef struct wrk_itm typedef struct wrk_itm
{ {
/****************************/ mt_wrk_tsk_t tsk;
/* Need to group into struct*/ /* based on task-type one of the entries wr_tsk/rd_tsk will be used */
buf_pool_t* buf_pool; //buffer-pool instance wr_tsk_t wr; //flush page list
int flush_type; //flush-type for buffer-pool flush operation rd_tsk_t rd; //decompress page list
int min; //minimum number of pages requested to be flushed
unsigned long long lsn_limit; //lsn limit for the buffer-pool flush operation
/****************************/
unsigned long result; //flush pages count unsigned long result; //flush pages count
unsigned long t_usec; //time-taken in usec unsigned long t_usec; //time-taken in usec
long id_usr; //thread-id currently working long id_usr; //thread-id currently working
...@@ -1479,52 +1495,25 @@ typedef struct wrk_itm ...@@ -1479,52 +1495,25 @@ typedef struct wrk_itm
struct wrk_itm *next; struct wrk_itm *next;
} wrk_t; } wrk_t;
typedef enum op_q_status {
Q_NOT_INIT=0,
Q_EMPTY=1,
Q_INITIALIZED=2,
Q_PROCESS=3,
Q_DONE=4,
Q_ERROR=5,
Q_STATUS_UNDEFINED
} q_status_t;
typedef struct op_queue
{
pthread_mutex_t mtx;
pthread_cond_t cv;
q_status_t flag;
wrk_t *head;
wrk_t *tail;
} opq_t;
opq_t wq, cq;
typedef struct thread_sync typedef struct thread_sync
{ {
int wthread_id; int wthread_id;
pthread_t wthread; os_thread_t wthread;
opq_t *wq; ib_wqueue_t *wq; // work Queue
opq_t *cq; ib_wqueue_t *wr_cq;// Write Completion Queue
wthr_status_t wt_status; ib_wqueue_t *rd_cq; // Read Completion Queue
wthr_status_t wt_status; // Worker Thread status
unsigned long stat_universal_num_processed; unsigned long stat_universal_num_processed;
unsigned long stat_cycle_num_processed; unsigned long stat_cycle_num_processed;
} thread_sync_t; } thread_sync_t;
/* Global XXX:DD needs to be cleaned */ /* Global XXX:DD needs to be cleaned */
int exit_flag; ib_wqueue_t *wq=NULL, *wr_cq=NULL, *rd_cq=NULL;
ulint check_wrk_done_count; mem_heap_t *heap_allocated=NULL;
static ulint done_cnt_flag; thread_sync_t pc_sync[MTFLUSH_MAX_WORKER];
static int pgc_n_threads = 8; static wrk_t work_items[MTFLUSH_MAX_WORKER];
thread_sync_t pc_sync[PGCOMP_MAX_WORKER];
static wrk_t work_items[PGCOMP_MAX_WORKER];
static int pgcomp_wrk_initialized = -1; static int pgcomp_wrk_initialized = -1;
ulint srv_mtflush_threads = 0;
int set_check_done_flag_count(int cnt)
{
return(check_wrk_done_count = cnt);
}
int set_pgcomp_wrk_init_done(void) int set_pgcomp_wrk_init_done(void)
{ {
...@@ -1537,83 +1526,14 @@ int is_pgcomp_wrk_init_done(void) ...@@ -1537,83 +1526,14 @@ int is_pgcomp_wrk_init_done(void)
return(pgcomp_wrk_initialized == 1); return(pgcomp_wrk_initialized == 1);
} }
ulint set_done_cnt_flag(ulint val)
{
/*
* Assumption: The thread calling into set_done_cnt_flag
* needs to have "cq.mtx" acquired, else not safe.
*/
done_cnt_flag = val;
return done_cnt_flag;
}
ulint cv_done_inc_flag_sig(thread_sync_t * ppc)
{
pthread_mutex_lock(&ppc->cq->mtx);
ppc->stat_universal_num_processed++;
ppc->stat_cycle_num_processed++;
done_cnt_flag++;
if(!(done_cnt_flag <= check_wrk_done_count)) {
fprintf(stderr, "ERROR: done_cnt:%lu check_wrk_done_count:%lu\n",
done_cnt_flag, check_wrk_done_count);
}
assert(done_cnt_flag <= check_wrk_done_count);
pthread_mutex_unlock(&ppc->cq->mtx);
if(done_cnt_flag == check_wrk_done_count) {
ppc->wq->flag = Q_DONE;
pthread_mutex_lock(&ppc->cq->mtx);
ppc->cq->flag = Q_DONE;
pthread_cond_signal(&ppc->cq->cv);
pthread_mutex_unlock(&ppc->cq->mtx);
}
return(done_cnt_flag);
}
int q_remove_wrk(opq_t *q, wrk_t **wi)
{
int ret = 0;
if(!wi || !q) {
return -1;
}
pthread_mutex_lock(&q->mtx);
assert(!((q->tail == NULL) && (q->head != NULL)));
assert(!((q->tail != NULL) && (q->head == NULL)));
/* get the first in the list*/
*wi = q->head;
if(q->head) {
ret = 0;
q->head = q->head->next;
(*wi)->next = NULL;
if(!q->head) {
q->tail = NULL;
}
} else {
q->tail = NULL;
ret = 1; /* indicating remove from queue failed */
}
pthread_mutex_unlock(&q->mtx);
return (ret);
}
int is_busy_wrk_itm(wrk_t *wi)
{
if(!wi) {
return -1;
}
return(!(wi->id_usr == -1));
}
int setup_wrk_itm(int items) int setup_wrk_itm(int items)
{ {
int i; int i;
for(i=0; i<items; i++) { for(i=0; i<items; i++) {
work_items[i].buf_pool = NULL; work_items[i].rd.page_pool = NULL;
work_items[i].result = 0; work_items[i].wr.buf_pool = NULL;
work_items[i].t_usec = 0; work_items[i].t_usec = 0;
work_items[i].result = 0;
work_items[i].id_usr = -1; work_items[i].id_usr = -1;
work_items[i].wi_status = WRK_ITEM_STATUS_UNDEFINED; work_items[i].wi_status = WRK_ITEM_STATUS_UNDEFINED;
work_items[i].next = &work_items[(i+1)%items]; work_items[i].next = &work_items[(i+1)%items];
...@@ -1623,81 +1543,23 @@ int setup_wrk_itm(int items) ...@@ -1623,81 +1543,23 @@ int setup_wrk_itm(int items)
return 0; return 0;
} }
int init_queue(opq_t *q)
{
if(!q) {
return -1;
}
/* Initialize Queue mutex and CV */
pthread_mutex_init(&q->mtx, NULL);
pthread_cond_init(&q->cv, NULL);
q->flag = Q_INITIALIZED;
q->head = q->tail = NULL;
return 0;
}
#if 0
int drain_cq(opq_t *cq, int items)
{
int i=0;
if(!cq) {
return -1;
}
pthread_mutex_lock(&cq->mtx);
for(i=0; i<items; i++) {
work_items[i].result=0;
work_items[i].t_usec = 0;
work_items[i].id_usr = -1;
}
cq->head = cq->tail = NULL;
pthread_mutex_unlock(&cq->mtx);
return 0;
}
#endif
int q_insert_wrk_list(opq_t *q, wrk_t *w_list)
{
if((!q) || (!w_list)) {
fprintf(stderr, "insert failed q:%p w:%p\n", q, w_list);
return -1;
}
pthread_mutex_lock(&q->mtx);
assert(!((q->tail == NULL) && (q->head != NULL)));
assert(!((q->tail != NULL) && (q->head == NULL)));
/* list is empty */
if(!q->tail) {
q->head = q->tail = w_list;
} else {
/* added the first of the node to list */
assert(q->head != NULL);
q->tail->next = w_list;
}
/* move tail to the last node */
while(q->tail->next) {
q->tail = q->tail->next;
}
pthread_mutex_unlock(&q->mtx);
return 0;
}
int flush_pool_instance(wrk_t *wi) int flush_pool_instance(wrk_t *wi)
{ {
struct timeval p_start_time, p_end_time, d_time; struct timeval p_start_time, p_end_time, d_time;
if(!wi) { if (!wi) {
fprintf(stderr, "work item invalid wi:%p\n", wi); fprintf(stderr, "work item invalid wi:%p\n", wi);
return -1; return -1;
} }
if (!wi->wr.buf_pool) {
fprintf(stderr, "work-item wi->buf_pool:%p [likely thread exit]\n",
wi->wr.buf_pool);
return -1;
}
wi->t_usec = 0; wi->t_usec = 0;
if (!buf_flush_start(wi->buf_pool, (buf_flush)wi->flush_type)) { if (!buf_flush_start(wi->wr.buf_pool, wi->wr.flush_type)) {
/* We have two choices here. If lsn_limit was /* We have two choices here. If lsn_limit was
specified then skipping an instance of buffer specified then skipping an instance of buffer
pool means we cannot guarantee that all pages pool means we cannot guarantee that all pages
...@@ -1709,39 +1571,34 @@ int flush_pool_instance(wrk_t *wi) ...@@ -1709,39 +1571,34 @@ int flush_pool_instance(wrk_t *wi)
help in the retry which will follow the help in the retry which will follow the
failure. */ failure. */
fprintf(stderr, "flush_start Failed, flush_type:%d\n", fprintf(stderr, "flush_start Failed, flush_type:%d\n",
(buf_flush)wi->flush_type); wi->wr.flush_type);
return -1; return -1;
} }
#ifdef UNIV_DEBUG
/* Record time taken for the OP in usec */ /* Record time taken for the OP in usec */
gettimeofday(&p_start_time, 0x0); gettimeofday(&p_start_time, 0x0);
#endif
if((buf_flush)wi->flush_type == BUF_FLUSH_LRU) { if (wi->wr.flush_type == BUF_FLUSH_LRU) {
/* srv_LRU_scan_depth can be arbitrarily large value. /* srv_LRU_scan_depth can be arbitrarily large value.
* We cap it with current LRU size. * We cap it with current LRU size.
*/ */
buf_pool_mutex_enter(wi->buf_pool); buf_pool_mutex_enter(wi->wr.buf_pool);
wi->min = UT_LIST_GET_LEN(wi->buf_pool->LRU); wi->wr.min = UT_LIST_GET_LEN(wi->wr.buf_pool->LRU);
buf_pool_mutex_exit(wi->buf_pool); buf_pool_mutex_exit(wi->wr.buf_pool);
wi->min = ut_min(srv_LRU_scan_depth,wi->min); wi->wr.min = ut_min(srv_LRU_scan_depth,wi->wr.min);
} }
wi->result = buf_flush_batch(wi->buf_pool, wi->result = buf_flush_batch(wi->wr.buf_pool,
(buf_flush)wi->flush_type, wi->wr.flush_type,
wi->min, wi->lsn_limit); wi->wr.min, wi->wr.lsn_limit);
buf_flush_end(wi->buf_pool, (buf_flush)wi->flush_type); buf_flush_end(wi->wr.buf_pool, wi->wr.flush_type);
buf_flush_common((buf_flush)wi->flush_type, wi->result); buf_flush_common(wi->wr.flush_type, wi->result);
#ifdef UNIV_DEBUG
gettimeofday(&p_end_time, 0x0); gettimeofday(&p_end_time, 0x0);
timediff(&p_end_time, &p_start_time, &d_time); timediff(&p_end_time, &p_start_time, &d_time);
wi->t_usec = (unsigned long)(d_time.tv_usec+(d_time.tv_sec*1000000)); wi->t_usec = (unsigned long)(d_time.tv_usec+(d_time.tv_sec*1000000));
#endif
return 0; return 0;
} }
...@@ -1750,68 +1607,75 @@ int service_page_comp_io(thread_sync_t * ppc) ...@@ -1750,68 +1607,75 @@ int service_page_comp_io(thread_sync_t * ppc)
wrk_t *wi = NULL; wrk_t *wi = NULL;
int ret=0; int ret=0;
pthread_mutex_lock(&ppc->wq->mtx);
do{
ppc->wt_status = WTHR_SIG_WAITING; ppc->wt_status = WTHR_SIG_WAITING;
ret = pthread_cond_wait(&ppc->wq->cv, &ppc->wq->mtx); wi = (wrk_t *)ib_wqueue_wait(ppc->wq);
if (wi) {
ppc->wt_status = WTHR_RUNNING; ppc->wt_status = WTHR_RUNNING;
if(ret == ETIMEDOUT) {
fprintf(stderr, "ERROR ETIMEDOUT cnt_flag:[%lu] ret:%d\n",
done_cnt_flag, ret);
} else if(ret == EINVAL || ret == EPERM) {
fprintf(stderr, "ERROR EINVAL/EPERM cnt_flag:[%lu] ret:%d\n",
done_cnt_flag, ret);
}
if(ppc->wq->flag == Q_PROCESS) {
break;
} else { } else {
pthread_mutex_unlock(&ppc->wq->mtx); fprintf(stderr, "%s:%d work-item is NULL\n", __FILE__, __LINE__);
return -1;
}
} while (ppc->wq->flag == Q_PROCESS && ret == 0);
pthread_mutex_unlock(&ppc->wq->mtx);
while (ppc->cq->flag == Q_PROCESS) {
wi = NULL;
/* Get the work item */
if (0 != (ret = q_remove_wrk(ppc->wq, &wi))) {
ppc->wt_status = WTHR_NO_WORK; ppc->wt_status = WTHR_NO_WORK;
return -1; return (0);
} }
assert(ret==0);
assert(wi != NULL); assert(wi != NULL);
assert(0 == is_busy_wrk_itm(wi));
assert(wi->id_usr == -1);
wi->id_usr = ppc->wthread; wi->id_usr = ppc->wthread;
wi->wi_status = WRK_ITEM_START;
switch(wi->tsk) {
case MT_WRK_NONE:
assert(wi->wi_status == WRK_ITEM_EXIT);
wi->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(ppc->wr_cq, wi, heap_allocated);
break;
case MT_WRK_WRITE:
wi->wi_status = WRK_ITEM_START;
/* Process work item */ /* Process work item */
if(0 != (ret = flush_pool_instance(wi))) { if (0 != (ret = flush_pool_instance(wi))) {
fprintf(stderr, "FLUSH op failed ret:%d\n", ret); fprintf(stderr, "FLUSH op failed ret:%d\n", ret);
wi->wi_status = WRK_ITEM_FAILED; wi->wi_status = WRK_ITEM_FAILED;
} }
ret = q_insert_wrk_list(ppc->cq, wi);
assert(0==ret);
assert(check_wrk_done_count >= done_cnt_flag);
wi->wi_status = WRK_ITEM_SUCCESS; wi->wi_status = WRK_ITEM_SUCCESS;
if(check_wrk_done_count == cv_done_inc_flag_sig(ppc)) { ib_wqueue_add(ppc->wr_cq, wi, heap_allocated);
break; break;
case MT_WRK_READ:
/* Need to also handle the read case */
assert(0);
/* completed task get added to rd_cq */
/* wi->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(ppc->rd_cq, wi, heap_allocated);*/
break;
default:
/* None other than Write/Read handling planned */
assert(0);
} }
}
ppc->wt_status = WTHR_NO_WORK;
return(0); return(0);
} }
void page_comp_io_thread_exit()
{
ulint i;
fprintf(stderr, "signal page_comp_io_threads to exit [%lu]\n", srv_buf_pool_instances);
for (i=0; i<srv_buf_pool_instances; i++) {
work_items[i].wr.buf_pool = NULL;
work_items[i].rd.page_pool = NULL;
work_items[i].tsk = MT_WRK_NONE;
work_items[i].wi_status = WRK_ITEM_EXIT;
ib_wqueue_add(wq, (void *)&work_items[i], heap_allocated);
}
}
/******************************************************************//** /******************************************************************//**
@return a dummy parameter*/ @return a dummy parameter*/
extern "C" UNIV_INTERN extern "C" UNIV_INTERN
os_thread_ret_t os_thread_ret_t
DECLARE_THREAD(page_comp_io_thread)( DECLARE_THREAD(page_comp_io_thread)(
/*==========================================*/ /*================================*/
void * arg) void * arg)
{ {
thread_sync_t *ppc_io = ((thread_sync_t *)arg); thread_sync_t *ppc_io = ((thread_sync_t *)arg);
...@@ -1824,34 +1688,6 @@ DECLARE_THREAD(page_comp_io_thread)( ...@@ -1824,34 +1688,6 @@ DECLARE_THREAD(page_comp_io_thread)(
OS_THREAD_DUMMY_RETURN; OS_THREAD_DUMMY_RETURN;
} }
int print_queue_wrk_itm(opq_t *q)
{
#if UNIV_DEBUG
wrk_t *wi = NULL;
if(!q) {
fprintf(stderr, "queue NULL\n");
return -1;
}
if(!q->head || !q->tail) {
assert(!(((q->tail==NULL) && (q->head!=NULL)) && ((q->tail != NULL) && (q->head == NULL))));
fprintf(stderr, "queue empty (h:%p t:%p)\n", q->head, q->tail);
return 0;
}
pthread_mutex_lock(&q->mtx);
for(wi = q->head; (wi != NULL) ; wi = wi->next) {
//fprintf(stderr, "- [%p] %p %lu %luus [%ld] >%p\n",
// wi, wi->buf_pool, wi->result, wi->t_usec, wi->id_usr, wi->next);
fprintf(stderr, "- [%p] [%s] >%p\n",
wi, (wi->id_usr == -1)?"free":"Busy", wi->next);
}
pthread_mutex_unlock(&q->mtx);
#endif
return(0);
}
int print_wrk_list(wrk_t *wi_list) int print_wrk_list(wrk_t *wi_list)
{ {
wrk_t *wi = wi_list; wrk_t *wi = wi_list;
...@@ -1871,111 +1707,9 @@ int print_wrk_list(wrk_t *wi_list) ...@@ -1871,111 +1707,9 @@ int print_wrk_list(wrk_t *wi_list)
return 0; return 0;
} }
int pgcomp_handler(wrk_t *w_list)
{
int ret=0;
opq_t *wrk_q=NULL, *comp_q=NULL;
wrk_q=&wq;
comp_q=&cq;
pthread_mutex_lock(&wrk_q->mtx);
/* setup work queue here.. */
wrk_q->flag = Q_EMPTY;
pthread_mutex_unlock(&wrk_q->mtx);
ret = q_insert_wrk_list(wrk_q, w_list);
if(ret != 0) {
fprintf(stderr, "%s():work-queue setup FAILED wq:%p w_list:%p \n",
__FUNCTION__, &wq, w_list);
return -1;
}
retry_submit:
pthread_mutex_lock(&wrk_q->mtx);
/* setup work queue here.. */
wrk_q->flag = Q_INITIALIZED;
pthread_mutex_unlock(&wrk_q->mtx);
pthread_mutex_lock(&comp_q->mtx);
if(0 != set_done_cnt_flag(0)) {
fprintf(stderr, "FAILED %s:%d\n", __FILE__, __LINE__);
pthread_mutex_unlock(&comp_q->mtx);
return -1;
}
comp_q->flag = Q_PROCESS;
pthread_mutex_unlock(&comp_q->mtx);
/* if threads are waiting request them to start */
pthread_mutex_lock(&wrk_q->mtx);
wrk_q->flag = Q_PROCESS;
pthread_cond_broadcast(&wrk_q->cv);
pthread_mutex_unlock(&wrk_q->mtx);
/* Wait on all worker-threads to complete */
pthread_mutex_lock(&comp_q->mtx);
if (comp_q->flag != Q_DONE) {
do {
pthread_cond_wait(&comp_q->cv, &comp_q->mtx);
if(comp_q->flag != Q_DONE) {
fprintf(stderr, "[1] cv wait on CQ failed flag:%d cnt:%lu\n",
comp_q->flag, done_cnt_flag);
if (done_cnt_flag != srv_buf_pool_instances) {
fprintf(stderr, "[2] cv wait on CQ failed flag:%d cnt:%lu\n",
comp_q->flag, done_cnt_flag);
fprintf(stderr, "============\n");
print_wrk_list(w_list);
fprintf(stderr, "============\n");
}
continue;
} else if (done_cnt_flag != srv_buf_pool_instances) {
fprintf(stderr, "[3]cv wait on CQ failed flag:%d cnt:%lu\n",
comp_q->flag, done_cnt_flag);
fprintf(stderr, "============\n");
print_wrk_list(w_list);
fprintf(stderr, "============\n");
comp_q->flag = Q_INITIALIZED;
pthread_mutex_unlock(&comp_q->mtx);
goto retry_submit;
assert(!done_cnt_flag);
continue;
}
assert(done_cnt_flag == srv_buf_pool_instances);
if ((comp_q->flag == Q_DONE) &&
(done_cnt_flag == srv_buf_pool_instances)) {
break;
}
} while((comp_q->flag == Q_INITIALIZED) &&
(done_cnt_flag != srv_buf_pool_instances));
} else {
fprintf(stderr, "[4] cv wait on CQ failed flag:%d cnt:%lu\n",
comp_q->flag, done_cnt_flag);
if (!done_cnt_flag) {
fprintf(stderr, "============\n");
print_wrk_list(w_list);
fprintf(stderr, "============\n");
comp_q->flag = Q_INITIALIZED;
pthread_mutex_unlock(&comp_q->mtx);
goto retry_submit;
assert(!done_cnt_flag);
}
assert(done_cnt_flag == srv_buf_pool_instances);
}
pthread_mutex_unlock(&comp_q->mtx);
pthread_mutex_lock(&wrk_q->mtx);
wrk_q->flag = Q_DONE;
pthread_mutex_unlock(&wrk_q->mtx);
return 0;
}
/******************************************************************//** /******************************************************************//**
@return a dummy parameter*/ @return a dummy parameter*/
int pgcomp_handler_init(int num_threads, int wrk_cnt, opq_t *wq, opq_t *cq) int pgcomp_handler_init(int num_threads, int wrk_cnt, ib_wqueue_t *wq, ib_wqueue_t *wr_cq, ib_wqueue_t *rd_cq)
{ {
int i=0; int i=0;
...@@ -1984,20 +1718,17 @@ int pgcomp_handler_init(int num_threads, int wrk_cnt, opq_t *wq, opq_t *cq) ...@@ -1984,20 +1718,17 @@ int pgcomp_handler_init(int num_threads, int wrk_cnt, opq_t *wq, opq_t *cq)
return -1; return -1;
} }
if(!wq || !cq) { if(!wq || !wr_cq || !rd_cq) {
fprintf(stderr, "%s() FAILED wq:%p cq:%p\n", __FUNCTION__, wq, cq); fprintf(stderr, "%s() FAILED wq:%p write-cq:%p read-cq:%p\n",
__FUNCTION__, wq, wr_cq, rd_cq);
return -1; return -1;
} }
/* work-item setup */ /* work-item setup */
setup_wrk_itm(wrk_cnt); setup_wrk_itm(wrk_cnt);
/* wq & cq setup */
init_queue(wq);
init_queue(cq);
/* Mark each of the thread sync entires */ /* Mark each of the thread sync entires */
for(i=0; i < PGCOMP_MAX_WORKER; i++) { for(i=0; i < MTFLUSH_MAX_WORKER; i++) {
pc_sync[i].wthread_id = i; pc_sync[i].wthread_id = i;
} }
...@@ -2005,85 +1736,71 @@ int pgcomp_handler_init(int num_threads, int wrk_cnt, opq_t *wq, opq_t *cq) ...@@ -2005,85 +1736,71 @@ int pgcomp_handler_init(int num_threads, int wrk_cnt, opq_t *wq, opq_t *cq)
for(i=0; i < num_threads; i++) { for(i=0; i < num_threads; i++) {
pc_sync[i].wthread_id = i; pc_sync[i].wthread_id = i;
pc_sync[i].wq = wq; pc_sync[i].wq = wq;
pc_sync[i].cq = cq; pc_sync[i].wr_cq = wr_cq;
pc_sync[i].rd_cq = rd_cq;
os_thread_create(page_comp_io_thread, ((void *)(pc_sync + i)), os_thread_create(page_comp_io_thread, ((void *)(pc_sync + i)),
thread_ids + START_PGCOMP_CNT + i); thread_ids + START_OLD_THREAD_CNT + i);
//pc_sync[i].wthread = thread_ids[START_PGCOMP_CNT + i]; pc_sync[i].wthread = (START_OLD_THREAD_CNT + i);
pc_sync[i].wthread = (START_PGCOMP_CNT + i);
pc_sync[i].wt_status = WTHR_INITIALIZED; pc_sync[i].wt_status = WTHR_INITIALIZED;
} }
set_check_done_flag_count(wrk_cnt);
set_pgcomp_wrk_init_done(); set_pgcomp_wrk_init_done();
fprintf(stderr, "%s() Worker-Threads created..\n", __FUNCTION__);
return 0; return 0;
} }
int wrk_thread_stat(thread_sync_t *wthr, unsigned int num_threads) int wrk_thread_stat(thread_sync_t *wthr, unsigned int num_threads)
{ {
long stat_tot=0; ulong stat_tot=0;
unsigned int i=0; ulint i=0;
for(i=0; i< num_threads;i++) { for(i=0; i<num_threads;i++) {
stat_tot+=wthr[i].stat_universal_num_processed; stat_tot+=wthr[i].stat_universal_num_processed;
fprintf(stderr, "[%d] stat [%lu]\n", wthr[i].wthread_id, fprintf(stderr, "[%d] stat [%lu]\n", wthr[i].wthread_id,
wthr[i].stat_universal_num_processed); wthr[i].stat_universal_num_processed);
} }
fprintf(stderr, "Stat-Total:%lu\n", stat_tot); fprintf(stderr, "Stat-Total:%lu\n", stat_tot);
return (0);
} }
int reset_wrk_itm(int items) int reset_wrk_itm(int items)
{ {
int i; int i;
pthread_mutex_lock(&wq.mtx);
wq.head = wq.tail = NULL;
pthread_mutex_unlock(&wq.mtx);
pthread_mutex_lock(&cq.mtx);
for(i=0;i<items; i++) { for(i=0;i<items; i++) {
work_items[i].id_usr = -1; work_items[i].id_usr = -1;
} }
cq.head = cq.tail = NULL;
pthread_mutex_unlock(&cq.mtx);
return 0; return 0;
} }
int pgcomp_flush_work_items(int buf_pool_inst, int *per_pool_pages_flushed, int pgcomp_flush_work_items(int buf_pool_inst, int *per_pool_pages_flushed,
int flush_type, int min_n, unsigned long long lsn_limit) enum buf_flush flush_type, int min_n, lsn_t lsn_limit)
{ {
int ret=0, i=0; int ret=0, i=0;
wrk_t *done_wi;
pthread_mutex_lock(&wq.mtx);
pthread_mutex_lock(&cq.mtx);
assert(wq.head == NULL);
assert(wq.tail == NULL);
if(cq.head) {
print_wrk_list(cq.head);
}
assert(cq.head == NULL);
assert(cq.tail == NULL);
for(i=0;i<buf_pool_inst; i++) { for(i=0;i<buf_pool_inst; i++) {
work_items[i].buf_pool = buf_pool_from_array(i); work_items[i].tsk = MT_WRK_WRITE;
work_items[i].flush_type = flush_type; work_items[i].rd.page_pool = NULL;
work_items[i].min = min_n; work_items[i].wr.buf_pool = buf_pool_from_array(i);
work_items[i].lsn_limit = lsn_limit; work_items[i].wr.flush_type = (enum buf_flush)flush_type;
work_items[i].wr.min = min_n;
work_items[i].wr.lsn_limit = lsn_limit;
work_items[i].id_usr = -1; work_items[i].id_usr = -1;
work_items[i].next = &work_items[(i+1)%buf_pool_inst]; work_items[i].next = &work_items[(i+1)%buf_pool_inst];
work_items[i].wi_status = WRK_ITEM_SET; work_items[i].wi_status = WRK_ITEM_SET;
} }
work_items[i-1].next=NULL; work_items[i-1].next=NULL;
pthread_mutex_unlock(&cq.mtx); for(i=0;i<buf_pool_inst; i++) {
pthread_mutex_unlock(&wq.mtx); ib_wqueue_add(wq, (void *)(&work_items[i]), heap_allocated);
}
pgcomp_handler(work_items); /* wait on the completion to arrive */
for(i=0;i<buf_pool_inst; i++) {
done_wi = (wrk_t *)ib_wqueue_wait(wr_cq);
//fprintf(stderr, "%s: queue-wait DONE\n", __FUNCTION__);
ut_ad(done_wi != NULL);
}
pthread_mutex_lock(&wq.mtx);
pthread_mutex_lock(&cq.mtx);
/* collect data/results total pages flushed */ /* collect data/results total pages flushed */
for(i=0; i<buf_pool_inst; i++) { for(i=0; i<buf_pool_inst; i++) {
if(work_items[i].result == -1) { if(work_items[i].result == -1) {
...@@ -2092,26 +1809,13 @@ int pgcomp_flush_work_items(int buf_pool_inst, int *per_pool_pages_flushed, ...@@ -2092,26 +1809,13 @@ int pgcomp_flush_work_items(int buf_pool_inst, int *per_pool_pages_flushed,
} else { } else {
per_pool_pages_flushed[i] = work_items[i].result; per_pool_pages_flushed[i] = work_items[i].result;
} }
if((work_items[i].id_usr == -1) && (work_items[i].wi_status == WRK_ITEM_SET )) { if((work_items[i].id_usr == -1) &&
fprintf(stderr, "**Set/Unused work_item[%d] flush_type=%d\n", i, work_items[i].flush_type); (work_items[i].wi_status == WRK_ITEM_SET )) {
assert(0); fprintf(stderr, "**Set/Unused work_item[%d] flush_type=%d\n", i, work_items[i].wr.flush_type);
//assert(0);
} }
} }
//wrk_thread_stat(pc_sync, pgc_n_threads);
wq.flag = cq.flag = Q_INITIALIZED;
pthread_mutex_unlock(&cq.mtx);
pthread_mutex_unlock(&wq.mtx);
#if UNIV_DEBUG
/* Print work-list stats */
fprintf(stderr, "==wq== [DONE]\n");
print_wrk_list(wq.head);
fprintf(stderr, "==cq== [DONE]\n");
print_wrk_list(cq.head);
fprintf(stderr, "==worker-thread-stats==\n");
wrk_thread_stat(pc_sync, pgc_n_threads);
#endif
/* clear up work-queue for next flush */ /* clear up work-queue for next flush */
reset_wrk_itm(buf_pool_inst); reset_wrk_itm(buf_pool_inst);
...@@ -3276,15 +2980,25 @@ files_checked: ...@@ -3276,15 +2980,25 @@ files_checked:
if (!srv_read_only_mode) { if (!srv_read_only_mode) {
/* JAN: TODO: */ /* JAN: TODO: */
if (srv_buf_pool_instances <= PGCOMP_MAX_WORKER) { if (srv_buf_pool_instances <= MTFLUSH_MAX_WORKER) {
pgc_n_threads = srv_buf_pool_instances; srv_mtflush_threads = srv_buf_pool_instances;
} }
/* else we default to 8 worker-threads */ /* else we default to 8 worker-threads */
pgcomp_handler_init(pgc_n_threads, srv_buf_pool_instances, &wq, &cq); heap_allocated = mem_heap_create(0);
/* JAN: TODO: END */ ut_a(heap_allocated != NULL);
wq = ib_wqueue_create();
wr_cq = ib_wqueue_create();
rd_cq = ib_wqueue_create();
pgcomp_init();
pgcomp_handler_init(srv_mtflush_threads,
srv_buf_pool_instances,
wq, wr_cq, rd_cq);
#if UNIV_DEBUG #if UNIV_DEBUG
fprintf(stderr, "%s:%d buf-pool-instances:%lu\n", __FILE__, __LINE__, srv_buf_pool_instances); fprintf(stderr, "%s:%d buf-pool-instances:%lu\n", __FILE__, __LINE__, srv_buf_pool_instances);
#endif #endif
/* JAN: TODO: END */
os_thread_create(buf_flush_page_cleaner_thread, NULL, NULL); os_thread_create(buf_flush_page_cleaner_thread, NULL, NULL);
} }
...@@ -3549,6 +3263,14 @@ innobase_shutdown_for_mysql(void) ...@@ -3549,6 +3263,14 @@ innobase_shutdown_for_mysql(void)
logs_empty_and_mark_files_at_shutdown() and should have logs_empty_and_mark_files_at_shutdown() and should have
already quit or is quitting right now. */ already quit or is quitting right now. */
/* g. Exit the multi threaded flush threads */
page_comp_io_thread_exit();
#ifdef UNIV_DEBUG
fprintf(stderr, "%s:%d os_thread_count:%lu \n", __FUNCTION__, __LINE__, os_thread_count);
#endif
os_mutex_enter(os_sync_mutex); os_mutex_enter(os_sync_mutex);
if (os_thread_count == 0) { if (os_thread_count == 0) {
......
...@@ -46,6 +46,7 @@ Created 11/11/1995 Heikki Tuuri ...@@ -46,6 +46,7 @@ Created 11/11/1995 Heikki Tuuri
#include "ibuf0ibuf.h" #include "ibuf0ibuf.h"
#include "log0log.h" #include "log0log.h"
#include "os0file.h" #include "os0file.h"
#include "os0sync.h"
#include "trx0sys.h" #include "trx0sys.h"
#include "srv0mon.h" #include "srv0mon.h"
#include "mysql/plugin.h" #include "mysql/plugin.h"
...@@ -2013,11 +2014,16 @@ buf_flush_LRU( ...@@ -2013,11 +2014,16 @@ buf_flush_LRU(
/* JAN: TODO: */ /* JAN: TODO: */
/*******************************************************************//**/ /*******************************************************************//**/
extern int is_pgcomp_wrk_init_done(void); extern int is_pgcomp_wrk_init_done(void);
extern int pgcomp_flush_work_items(int buf_pool_inst, int *pages_flushed, extern int pgcomp_flush_work_items(
int flush_type, int min_n, unsigned long long lsn_limit); int buf_pool_inst,
int *pages_flushed,
buf_flush_t flush_type,
int min_n,
lsn_t lsn_limit);
#define MT_COMP_WATER_MARK 50 #define MT_COMP_WATER_MARK 50
#ifdef UNIV_DEBUG
#include <time.h> #include <time.h>
int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_time) int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_time)
{ {
...@@ -2038,8 +2044,15 @@ int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_t ...@@ -2038,8 +2044,15 @@ int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_t
return 0; return 0;
} }
#endif
static os_fast_mutex_t pgcomp_mtx;
void pgcomp_init(void)
{
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &pgcomp_mtx);
}
static pthread_mutex_t pgcomp_mtx = PTHREAD_MUTEX_INITIALIZER;
/*******************************************************************//** /*******************************************************************//**
Multi-threaded version of buf_flush_list Multi-threaded version of buf_flush_list
*/ */
...@@ -2062,8 +2075,10 @@ pgcomp_buf_flush_list( ...@@ -2062,8 +2075,10 @@ pgcomp_buf_flush_list(
{ {
ulint i; ulint i;
bool success = true; bool success = true;
#ifdef UNIV_DEBUG
struct timeval p_start_time, p_end_time, d_time; struct timeval p_start_time, p_end_time, d_time;
flush_counters_t n; #endif
int cnt_flush[MTFLUSH_MAX_WORKER];
if (n_processed) { if (n_processed) {
*n_processed = 0; *n_processed = 0;
...@@ -2081,16 +2096,11 @@ pgcomp_buf_flush_list( ...@@ -2081,16 +2096,11 @@ pgcomp_buf_flush_list(
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
gettimeofday(&p_start_time, 0x0); gettimeofday(&p_start_time, 0x0);
#endif #endif
if(is_pgcomp_wrk_init_done() && (min_n > MT_COMP_WATER_MARK)) { os_fast_mutex_lock(&pgcomp_mtx);
int cnt_flush[32];
//stack_trace();
pthread_mutex_lock(&pgcomp_mtx);
//gettimeofday(&p_start_time, 0x0);
//fprintf(stderr, "Calling into wrk-pgcomp [min:%lu]", min_n);
pgcomp_flush_work_items(srv_buf_pool_instances, pgcomp_flush_work_items(srv_buf_pool_instances,
cnt_flush, BUF_FLUSH_LIST, cnt_flush, BUF_FLUSH_LIST,
min_n, lsn_limit); min_n, lsn_limit);
os_fast_mutex_unlock(&pgcomp_mtx);
for (i = 0; i < srv_buf_pool_instances; i++) { for (i = 0; i < srv_buf_pool_instances; i++) {
if (n_processed) { if (n_processed) {
...@@ -2102,69 +2112,13 @@ pgcomp_buf_flush_list( ...@@ -2102,69 +2112,13 @@ pgcomp_buf_flush_list(
MONITOR_FLUSH_BATCH_COUNT, MONITOR_FLUSH_BATCH_COUNT,
MONITOR_FLUSH_BATCH_PAGES, MONITOR_FLUSH_BATCH_PAGES,
cnt_flush[i]); cnt_flush[i]);
} }
} }
pthread_mutex_unlock(&pgcomp_mtx);
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
gettimeofday(&p_end_time, 0x0); gettimeofday(&p_end_time, 0x0);
timediff(&p_end_time, &p_start_time, &d_time); timediff(&p_end_time, &p_start_time, &d_time);
fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu %llu usec]\n", ( fprintf(stderr, "%s: [1] [*n_processed: (min:%lu)%lu %llu usec]\n",
min_n * srv_buf_pool_instances), *n_processed, __FUNCTION__, (min_n * srv_buf_pool_instances), *n_processed,
(unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000)));
#endif
return(success);
}
/* Flush to lsn_limit in all buffer pool instances */
for (i = 0; i < srv_buf_pool_instances; i++) {
buf_pool_t* buf_pool;
buf_pool = buf_pool_from_array(i);
if (!buf_flush_start(buf_pool, BUF_FLUSH_LIST)) {
/* We have two choices here. If lsn_limit was
specified then skipping an instance of buffer
pool means we cannot guarantee that all pages
up to lsn_limit has been flushed. We can
return right now with failure or we can try
to flush remaining buffer pools up to the
lsn_limit. We attempt to flush other buffer
pools based on the assumption that it will
help in the retry which will follow the
failure. */
success = false;
continue;
}
buf_flush_batch(
buf_pool, BUF_FLUSH_LIST, min_n, lsn_limit, false, &n);
buf_flush_end(buf_pool, BUF_FLUSH_LIST);
buf_flush_common(BUF_FLUSH_LIST, n.flushed);
if (n_processed) {
*n_processed += n.flushed;
}
if (n.flushed) {
MONITOR_INC_VALUE_CUMULATIVE(
MONITOR_FLUSH_BATCH_TOTAL_PAGE,
MONITOR_FLUSH_BATCH_COUNT,
MONITOR_FLUSH_BATCH_PAGES,
n.flushed);
}
}
#ifdef UNIV_DEBUG
gettimeofday(&p_end_time, 0x0);
timediff(&p_end_time, &p_start_time, &d_time);
fprintf(stderr, "[2] [*n_processed: (min:%lu)%lu %llu usec]\n", (
min_n * srv_buf_pool_instances), *n_processed,
(unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000))); (unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000)));
#endif #endif
return(success); return(success);
...@@ -2416,18 +2370,21 @@ ulint ...@@ -2416,18 +2370,21 @@ ulint
pgcomp_buf_flush_LRU_tail(void) pgcomp_buf_flush_LRU_tail(void)
/*====================*/ /*====================*/
{ {
#ifdef UNIV_DEBUG
struct timeval p_start_time, p_end_time, d_time; struct timeval p_start_time, p_end_time, d_time;
#endif
ulint total_flushed=0, i=0; ulint total_flushed=0, i=0;
int cnt_flush[32]; int cnt_flush[32];
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
gettimeofday(&p_start_time, 0x0); gettimeofday(&p_start_time, 0x0);
#endif #endif
assert(is_pgcomp_wrk_init_done()); ut_ad(is_pgcomp_wrk_init_done());
pthread_mutex_lock(&pgcomp_mtx); os_fast_mutex_lock(&pgcomp_mtx);
pgcomp_flush_work_items(srv_buf_pool_instances, pgcomp_flush_work_items(srv_buf_pool_instances,
cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0); cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0);
os_fast_mutex_unlock(&pgcomp_mtx);
for (i = 0; i < srv_buf_pool_instances; i++) { for (i = 0; i < srv_buf_pool_instances; i++) {
if (cnt_flush[i]) { if (cnt_flush[i]) {
...@@ -2441,9 +2398,7 @@ pgcomp_buf_flush_LRU_tail(void) ...@@ -2441,9 +2398,7 @@ pgcomp_buf_flush_LRU_tail(void)
} }
} }
pthread_mutex_unlock(&pgcomp_mtx); #if UNIV_DEBUG
#ifdef UNIV_DEBUG
gettimeofday(&p_end_time, 0x0); gettimeofday(&p_end_time, 0x0);
timediff(&p_end_time, &p_start_time, &d_time); timediff(&p_end_time, &p_start_time, &d_time);
...@@ -2454,9 +2409,8 @@ pgcomp_buf_flush_LRU_tail(void) ...@@ -2454,9 +2409,8 @@ pgcomp_buf_flush_LRU_tail(void)
return(total_flushed); return(total_flushed);
} }
/* JAN: TODO: END: */
/* JAN: TODO: END: */
/*********************************************************************//** /*********************************************************************//**
Clears up tail of the LRU lists: Clears up tail of the LRU lists:
* Put replaceable pages at the tail of LRU to the free list * Put replaceable pages at the tail of LRU to the free list
......
...@@ -1323,7 +1323,6 @@ fil_space_create( ...@@ -1323,7 +1323,6 @@ fil_space_create(
DBUG_EXECUTE_IF("fil_space_create_failure", return(false);); DBUG_EXECUTE_IF("fil_space_create_failure", return(false););
ut_a(fil_system); ut_a(fil_system);
ut_a(fsp_flags_is_valid(flags));
/* Look for a matching tablespace and if found free it. */ /* Look for a matching tablespace and if found free it. */
do { do {
...@@ -4989,21 +4988,42 @@ retry: ...@@ -4989,21 +4988,42 @@ retry:
#ifdef HAVE_POSIX_FALLOCATE #ifdef HAVE_POSIX_FALLOCATE
if (srv_use_posix_fallocate) { if (srv_use_posix_fallocate) {
ulint n_pages = size_after_extend;
success = os_file_set_size(node->name, node->handle, n_pages * page_size);
/* Temporal solution: In directFS using atomic writes
we must use posix_fallocate to extend the file because
pwrite past end of file fails but when compression is
used the file pages must be physically initialized with
zeroes, thus after file extend with posix_fallocate
we still write empty pages to file. */
if (success &&
srv_use_atomic_writes &&
srv_compress_pages) {
goto extend_file;
}
success = os_file_set_size(node->name, node->handle,
(size_after_extend
- file_start_page_no) * page_size);
mutex_enter(&fil_system->mutex); mutex_enter(&fil_system->mutex);
if (success) { if (success) {
node->size += (size_after_extend - start_page_no); node->size += n_pages;
space->size += (size_after_extend - start_page_no); space->size += n_pages;
os_has_said_disk_full = FALSE; os_has_said_disk_full = FALSE;
} }
node->being_extended = FALSE;
/* If posix_fallocate was used to extent the file space
we need to complete the io. Because no actual writes were
dispatched read operation is enough here. Without this
there will be assertion at shutdown indicating that
all IO is not completed. */
fil_node_complete_io(node, fil_system, OS_FILE_READ);
goto complete_io; goto complete_io;
} }
#endif #endif
extend_file:
/* Extend at most 64 pages at a time */ /* Extend at most 64 pages at a time */
buf_size = ut_min(64, size_after_extend - start_page_no) * page_size; buf_size = ut_min(64, size_after_extend - start_page_no) * page_size;
buf2 = static_cast<byte*>(mem_alloc(buf_size + page_size)); buf2 = static_cast<byte*>(mem_alloc(buf_size + page_size));
...@@ -5057,24 +5077,11 @@ retry: ...@@ -5057,24 +5077,11 @@ retry:
space->size += pages_added; space->size += pages_added;
node->size += pages_added; node->size += pages_added;
node->being_extended = FALSE;
#ifdef HAVE_POSIX_FALLOCATE
complete_io:
/* If posix_fallocate was used to extent the file space
we need to complete the io. Because no actual writes were
dispatched read operation is enough here. Without this
there will be assertion at shutdown indicating that
all IO is not completed. */
if (srv_use_posix_fallocate) {
fil_node_complete_io(node, fil_system, OS_FILE_READ);
} else {
fil_node_complete_io(node, fil_system, OS_FILE_WRITE);
}
#else
fil_node_complete_io(node, fil_system, OS_FILE_WRITE); fil_node_complete_io(node, fil_system, OS_FILE_WRITE);
#endif
complete_io:
node->being_extended = FALSE;
*actual_size = space->size; *actual_size = space->size;
#ifndef UNIV_HOTBACKUP #ifndef UNIV_HOTBACKUP
......
...@@ -873,7 +873,6 @@ dict_tf_set( ...@@ -873,7 +873,6 @@ dict_tf_set(
(awrites == ATOMIC_WRITES_DEFAULT && srv_use_atomic_writes )) { (awrites == ATOMIC_WRITES_DEFAULT && srv_use_atomic_writes )) {
*flags |= (1 << DICT_TF_POS_ATOMIC_BLOBS); *flags |= (1 << DICT_TF_POS_ATOMIC_BLOBS);
} }
} }
/********************************************************************//** /********************************************************************//**
......
...@@ -277,6 +277,10 @@ extern my_bool srv_use_atomic_writes; ...@@ -277,6 +277,10 @@ extern my_bool srv_use_atomic_writes;
/* If this flag IS TRUE, then we use lz4 to compress/decompress pages */ /* If this flag IS TRUE, then we use lz4 to compress/decompress pages */
extern my_bool srv_use_lz4; extern my_bool srv_use_lz4;
/* Number of flush threads */
#define MTFLUSH_MAX_WORKER 64
extern ulint srv_mtflush_threads;
/** Server undo tablespaces directory, can be absolute path. */ /** Server undo tablespaces directory, can be absolute path. */
extern char* srv_undo_dir; extern char* srv_undo_dir;
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved. Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2008, Google Inc. Copyright (c) 2008, Google Inc.
Copyright (c) 2009, Percona Inc. Copyright (c) 2009, Percona Inc.
Copyright (c) 2013, SkySQL Ab. All Rights Reserved. Copyright (c) 2013, 2014, SkySQL Ab. All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted by Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described Google, Inc. Those modifications are gratefully acknowledged and are described
...@@ -131,14 +131,9 @@ static os_file_t files[1000]; ...@@ -131,14 +131,9 @@ static os_file_t files[1000];
/** io_handler_thread parameters for thread identification */ /** io_handler_thread parameters for thread identification */
static ulint n[SRV_MAX_N_IO_THREADS + 6]; static ulint n[SRV_MAX_N_IO_THREADS + 6];
/** io_handler_thread identifiers, 32 is the maximum number of purge threads */ /** io_handler_thread identifiers, 32 is the maximum number of purge threads */
/* /** 6 is the ? */
static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 #define START_OLD_THREAD_CNT (SRV_MAX_N_IO_THREADS + 6 + SRV_MAX_N_PURGE_THREADS)
/ + SRV_MAX_N_PURGE_THREADS]; static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 + SRV_MAX_N_PURGE_THREADS + MTFLUSH_MAX_WORKER];
*/
/** pgcomp_thread are 16 total */
#define START_PGCOMP_CNT (SRV_MAX_N_IO_THREADS + 6 + SRV_MAX_N_PURGE_THREADS)
#define PGCOMP_MAX_WORKER 16
static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 + SRV_MAX_N_PURGE_THREADS + PGCOMP_MAX_WORKER];
/** We use this mutex to test the return value of pthread_mutex_trylock /** We use this mutex to test the return value of pthread_mutex_trylock
on successful locking. HP-UX does NOT return 0, though Linux et al do. */ on successful locking. HP-UX does NOT return 0, though Linux et al do. */
...@@ -1519,17 +1514,27 @@ extern int timediff(struct timeval *g_time, struct timeval *s_time, struct timev ...@@ -1519,17 +1514,27 @@ extern int timediff(struct timeval *g_time, struct timeval *s_time, struct timev
extern ibool buf_flush_start(buf_pool_t* buf_pool, buf_flush_t flush_type); extern ibool buf_flush_start(buf_pool_t* buf_pool, buf_flush_t flush_type);
extern void buf_flush_end(buf_pool_t* buf_pool, buf_flush_t flush_type); extern void buf_flush_end(buf_pool_t* buf_pool, buf_flush_t flush_type);
extern void buf_flush_common(buf_flush_t flush_type, ulint page_count); extern void buf_flush_common(buf_flush_t flush_type, ulint page_count);
extern ulint buf_flush_batch(buf_pool_t* buf_pool, buf_flush_t flush_type, ulint min_n, lsn_t lsn_limit, bool limited_lru_scan, flush_counters_t*); extern ulint buf_flush_batch(buf_pool_t* buf_pool, buf_flush_t flush_type, ulint min_n, lsn_t lsn_limit, bool limited_lru_scan,
flush_counters_t* n);
extern void pgcomp_init(void);
typedef enum wrk_status { typedef enum wrk_status {
WRK_ITEM_SET=0, WRK_ITEM_SET=0, // wrk-item is set
WRK_ITEM_START=1, WRK_ITEM_START=1, // processing of wrk-item has started
WRK_ITEM_DONE=2, WRK_ITEM_DONE=2, // processing is done usually set to SUCCESS/FAILED
WRK_ITEM_SUCCESS=2, WRK_ITEM_SUCCESS=2, // Success processing the wrk-item
WRK_ITEM_FAILED=3, WRK_ITEM_FAILED=3, // status of failed
WRK_ITEM_EXIT=4,
WRK_ITEM_STATUS_UNDEFINED WRK_ITEM_STATUS_UNDEFINED
} wrk_status_t; } wrk_status_t;
typedef enum mt_wrk_tsk {
MT_WRK_NONE=0, // Exit queue-wait
MT_WRK_WRITE=1, // Flush operation
MT_WRK_READ=2, // Decompress operation
MT_WRK_UNDEFINED
} mt_wrk_tsk_t;
typedef enum wthr_status { typedef enum wthr_status {
WTHR_NOT_INIT=0, WTHR_NOT_INIT=0,
WTHR_INITIALIZED=1, WTHR_INITIALIZED=1,
...@@ -1540,16 +1545,24 @@ typedef enum wthr_status { ...@@ -1540,16 +1545,24 @@ typedef enum wthr_status {
WTHR_STATUS_UNDEFINED WTHR_STATUS_UNDEFINED
} wthr_status_t; } wthr_status_t;
typedef struct wr_tsk {
buf_pool_t *buf_pool; // buffer-pool instance
buf_flush_t flush_type; // flush-type for buffer-pool flush operation
ulint min; //minimum number of pages requested to be flushed
lsn_t lsn_limit;//lsn limit for the buffer-pool flush operation
} wr_tsk_t;
typedef struct rd_tsk {
void *page_pool; //list of pages to decompress;
} rd_tsk_t;
typedef struct wrk_itm typedef struct wrk_itm
{ {
/****************************/ mt_wrk_tsk_t tsk;
/* Need to group into struct*/ /* based on task-type one of the entries wr_tsk/rd_tsk will be used */
buf_pool_t* buf_pool; //buffer-pool instance wr_tsk_t wr; //flush page list
int flush_type; //flush-type for buffer-pool flush operation rd_tsk_t rd; //decompress page list
int min; //minimum number of pages requested to be flushed
unsigned long long lsn_limit; //lsn limit for the buffer-pool flush operation
/****************************/
unsigned long result; //flush pages count unsigned long result; //flush pages count
unsigned long t_usec; //time-taken in usec unsigned long t_usec; //time-taken in usec
long id_usr; //thread-id currently working long id_usr; //thread-id currently working
...@@ -1557,52 +1570,25 @@ typedef struct wrk_itm ...@@ -1557,52 +1570,25 @@ typedef struct wrk_itm
struct wrk_itm *next; struct wrk_itm *next;
} wrk_t; } wrk_t;
typedef enum op_q_status {
Q_NOT_INIT=0,
Q_EMPTY=1,
Q_INITIALIZED=2,
Q_PROCESS=3,
Q_DONE=4,
Q_ERROR=5,
Q_STATUS_UNDEFINED
} q_status_t;
typedef struct op_queue
{
pthread_mutex_t mtx;
pthread_cond_t cv;
q_status_t flag;
wrk_t *head;
wrk_t *tail;
} opq_t;
opq_t wq, cq;
typedef struct thread_sync typedef struct thread_sync
{ {
int wthread_id; int wthread_id;
pthread_t wthread; os_thread_t wthread;
opq_t *wq; ib_wqueue_t *wq; // work Queue
opq_t *cq; ib_wqueue_t *wr_cq;// Write Completion Queue
wthr_status_t wt_status; ib_wqueue_t *rd_cq; // Read Completion Queue
wthr_status_t wt_status; // Worker Thread status
unsigned long stat_universal_num_processed; unsigned long stat_universal_num_processed;
unsigned long stat_cycle_num_processed; unsigned long stat_cycle_num_processed;
} thread_sync_t; } thread_sync_t;
/* Global XXX:DD needs to be cleaned */ /* Global XXX:DD needs to be cleaned */
int exit_flag; ib_wqueue_t *wq=NULL, *wr_cq=NULL, *rd_cq=NULL;
ulint check_wrk_done_count; mem_heap_t *heap_allocated=NULL;
static ulint done_cnt_flag; thread_sync_t pc_sync[MTFLUSH_MAX_WORKER];
static int pgc_n_threads = 8; static wrk_t work_items[MTFLUSH_MAX_WORKER];
thread_sync_t pc_sync[PGCOMP_MAX_WORKER];
static wrk_t work_items[PGCOMP_MAX_WORKER];
static int pgcomp_wrk_initialized = -1; static int pgcomp_wrk_initialized = -1;
ulint srv_mtflush_threads = 0;
int set_check_done_flag_count(int cnt)
{
return(check_wrk_done_count = cnt);
}
int set_pgcomp_wrk_init_done(void) int set_pgcomp_wrk_init_done(void)
{ {
...@@ -1615,83 +1601,14 @@ int is_pgcomp_wrk_init_done(void) ...@@ -1615,83 +1601,14 @@ int is_pgcomp_wrk_init_done(void)
return(pgcomp_wrk_initialized == 1); return(pgcomp_wrk_initialized == 1);
} }
ulint set_done_cnt_flag(ulint val)
{
/*
* Assumption: The thread calling into set_done_cnt_flag
* needs to have "cq.mtx" acquired, else not safe.
*/
done_cnt_flag = val;
return done_cnt_flag;
}
ulint cv_done_inc_flag_sig(thread_sync_t * ppc)
{
pthread_mutex_lock(&ppc->cq->mtx);
ppc->stat_universal_num_processed++;
ppc->stat_cycle_num_processed++;
done_cnt_flag++;
if(!(done_cnt_flag <= check_wrk_done_count)) {
fprintf(stderr, "ERROR: done_cnt:%lu check_wrk_done_count:%lu\n",
done_cnt_flag, check_wrk_done_count);
}
assert(done_cnt_flag <= check_wrk_done_count);
pthread_mutex_unlock(&ppc->cq->mtx);
if(done_cnt_flag == check_wrk_done_count) {
ppc->wq->flag = Q_DONE;
pthread_mutex_lock(&ppc->cq->mtx);
ppc->cq->flag = Q_DONE;
pthread_cond_signal(&ppc->cq->cv);
pthread_mutex_unlock(&ppc->cq->mtx);
}
return(done_cnt_flag);
}
int q_remove_wrk(opq_t *q, wrk_t **wi)
{
int ret = 0;
if(!wi || !q) {
return -1;
}
pthread_mutex_lock(&q->mtx);
assert(!((q->tail == NULL) && (q->head != NULL)));
assert(!((q->tail != NULL) && (q->head == NULL)));
/* get the first in the list*/
*wi = q->head;
if(q->head) {
ret = 0;
q->head = q->head->next;
(*wi)->next = NULL;
if(!q->head) {
q->tail = NULL;
}
} else {
q->tail = NULL;
ret = 1; /* indicating remove from queue failed */
}
pthread_mutex_unlock(&q->mtx);
return (ret);
}
int is_busy_wrk_itm(wrk_t *wi)
{
if(!wi) {
return -1;
}
return(!(wi->id_usr == -1));
}
int setup_wrk_itm(int items) int setup_wrk_itm(int items)
{ {
int i; int i;
for(i=0; i<items; i++) { for(i=0; i<items; i++) {
work_items[i].buf_pool = NULL; work_items[i].rd.page_pool = NULL;
work_items[i].result = 0; work_items[i].wr.buf_pool = NULL;
work_items[i].t_usec = 0; work_items[i].t_usec = 0;
work_items[i].result = 0;
work_items[i].id_usr = -1; work_items[i].id_usr = -1;
work_items[i].wi_status = WRK_ITEM_STATUS_UNDEFINED; work_items[i].wi_status = WRK_ITEM_STATUS_UNDEFINED;
work_items[i].next = &work_items[(i+1)%items]; work_items[i].next = &work_items[(i+1)%items];
...@@ -1701,82 +1618,26 @@ int setup_wrk_itm(int items) ...@@ -1701,82 +1618,26 @@ int setup_wrk_itm(int items)
return 0; return 0;
} }
int init_queue(opq_t *q) int flush_pool_instance(wrk_t *wi)
{
if(!q) {
return -1;
}
/* Initialize Queue mutex and CV */
pthread_mutex_init(&q->mtx, NULL);
pthread_cond_init(&q->cv, NULL);
q->flag = Q_INITIALIZED;
q->head = q->tail = NULL;
return 0;
}
#if 0
int drain_cq(opq_t *cq, int items)
{ {
int i=0; flush_counters_t n;
#ifdef UNIV_DEBUG
if(!cq) { struct timeval p_start_time, p_end_time, d_time;
return -1;
}
pthread_mutex_lock(&cq->mtx);
for(i=0; i<items; i++) {
work_items[i].result=0;
work_items[i].t_usec = 0;
work_items[i].id_usr = -1;
}
cq->head = cq->tail = NULL;
pthread_mutex_unlock(&cq->mtx);
return 0;
}
#endif #endif
int q_insert_wrk_list(opq_t *q, wrk_t *w_list) if (!wi) {
{ fprintf(stderr, "work item invalid wi:%p\n", wi);
if((!q) || (!w_list)) {
fprintf(stderr, "insert failed q:%p w:%p\n", q, w_list);
return -1; return -1;
} }
pthread_mutex_lock(&q->mtx); if (!wi->wr.buf_pool) {
fprintf(stderr, "work-item wi->buf_pool:%p [likely thread exit]\n",
assert(!((q->tail == NULL) && (q->head != NULL))); wi->wr.buf_pool);
assert(!((q->tail != NULL) && (q->head == NULL)));
/* list is empty */
if(!q->tail) {
q->head = q->tail = w_list;
} else {
/* added the first of the node to list */
assert(q->head != NULL);
q->tail->next = w_list;
}
/* move tail to the last node */
while(q->tail->next) {
q->tail = q->tail->next;
}
pthread_mutex_unlock(&q->mtx);
return 0;
}
int flush_pool_instance(wrk_t *wi)
{
struct timeval p_start_time, p_end_time, d_time;
flush_counters_t n;
if(!wi) {
fprintf(stderr, "work item invalid wi:%p\n", wi);
return -1; return -1;
} }
wi->t_usec = 0; wi->t_usec = 0;
if (!buf_flush_start(wi->buf_pool, (buf_flush_t)wi->flush_type)) { if (!buf_flush_start(wi->wr.buf_pool, wi->wr.flush_type)) {
/* We have two choices here. If lsn_limit was /* We have two choices here. If lsn_limit was
specified then skipping an instance of buffer specified then skipping an instance of buffer
pool means we cannot guarantee that all pages pool means we cannot guarantee that all pages
...@@ -1788,7 +1649,7 @@ int flush_pool_instance(wrk_t *wi) ...@@ -1788,7 +1649,7 @@ int flush_pool_instance(wrk_t *wi)
help in the retry which will follow the help in the retry which will follow the
failure. */ failure. */
fprintf(stderr, "flush_start Failed, flush_type:%d\n", fprintf(stderr, "flush_start Failed, flush_type:%d\n",
(buf_flush_t)wi->flush_type); wi->wr.flush_type);
return -1; return -1;
} }
...@@ -1797,32 +1658,28 @@ int flush_pool_instance(wrk_t *wi) ...@@ -1797,32 +1658,28 @@ int flush_pool_instance(wrk_t *wi)
gettimeofday(&p_start_time, 0x0); gettimeofday(&p_start_time, 0x0);
#endif #endif
if((buf_flush_t)wi->flush_type == BUF_FLUSH_LRU) { if (wi->wr.flush_type == BUF_FLUSH_LRU) {
/* srv_LRU_scan_depth can be arbitrarily large value. /* srv_LRU_scan_depth can be arbitrarily large value.
* We cap it with current LRU size. * We cap it with current LRU size.
*/ */
buf_pool_mutex_enter(wi->buf_pool); buf_pool_mutex_enter(wi->wr.buf_pool);
wi->min = UT_LIST_GET_LEN(wi->buf_pool->LRU); wi->wr.min = UT_LIST_GET_LEN(wi->wr.buf_pool->LRU);
buf_pool_mutex_exit(wi->buf_pool); buf_pool_mutex_exit(wi->wr.buf_pool);
wi->min = ut_min(srv_LRU_scan_depth,wi->min); wi->wr.min = ut_min(srv_LRU_scan_depth,wi->wr.min);
} }
buf_flush_batch(wi->buf_pool, wi->result = buf_flush_batch(wi->wr.buf_pool,
(buf_flush_t)wi->flush_type, wi->wr.flush_type,
wi->min, wi->lsn_limit, false, &n); wi->wr.min, wi->wr.lsn_limit,
false, &n);
wi->result = n.flushed;
buf_flush_end(wi->buf_pool, (buf_flush_t)wi->flush_type); buf_flush_end(wi->wr.buf_pool, wi->wr.flush_type);
buf_flush_common((buf_flush_t)wi->flush_type, wi->result); buf_flush_common(wi->wr.flush_type, wi->result);
#ifdef UNIV_DEBUG
gettimeofday(&p_end_time, 0x0); gettimeofday(&p_end_time, 0x0);
timediff(&p_end_time, &p_start_time, &d_time); timediff(&p_end_time, &p_start_time, &d_time);
wi->t_usec = (unsigned long)(d_time.tv_usec+(d_time.tv_sec*1000000)); wi->t_usec = (unsigned long)(d_time.tv_usec+(d_time.tv_sec*1000000));
#endif
return 0; return 0;
} }
...@@ -1831,68 +1688,75 @@ int service_page_comp_io(thread_sync_t * ppc) ...@@ -1831,68 +1688,75 @@ int service_page_comp_io(thread_sync_t * ppc)
wrk_t *wi = NULL; wrk_t *wi = NULL;
int ret=0; int ret=0;
pthread_mutex_lock(&ppc->wq->mtx);
do{
ppc->wt_status = WTHR_SIG_WAITING; ppc->wt_status = WTHR_SIG_WAITING;
ret = pthread_cond_wait(&ppc->wq->cv, &ppc->wq->mtx); wi = (wrk_t *)ib_wqueue_wait(ppc->wq);
if (wi) {
ppc->wt_status = WTHR_RUNNING; ppc->wt_status = WTHR_RUNNING;
if(ret == ETIMEDOUT) {
fprintf(stderr, "ERROR ETIMEDOUT cnt_flag:[%lu] ret:%d\n",
done_cnt_flag, ret);
} else if(ret == EINVAL || ret == EPERM) {
fprintf(stderr, "ERROR EINVAL/EPERM cnt_flag:[%lu] ret:%d\n",
done_cnt_flag, ret);
}
if(ppc->wq->flag == Q_PROCESS) {
break;
} else { } else {
pthread_mutex_unlock(&ppc->wq->mtx); fprintf(stderr, "%s:%d work-item is NULL\n", __FILE__, __LINE__);
return -1;
}
} while (ppc->wq->flag == Q_PROCESS && ret == 0);
pthread_mutex_unlock(&ppc->wq->mtx);
while (ppc->cq->flag == Q_PROCESS) {
wi = NULL;
/* Get the work item */
if (0 != (ret = q_remove_wrk(ppc->wq, &wi))) {
ppc->wt_status = WTHR_NO_WORK; ppc->wt_status = WTHR_NO_WORK;
return -1; return (0);
} }
assert(ret==0);
assert(wi != NULL); assert(wi != NULL);
assert(0 == is_busy_wrk_itm(wi));
assert(wi->id_usr == -1);
wi->id_usr = ppc->wthread; wi->id_usr = ppc->wthread;
wi->wi_status = WRK_ITEM_START;
switch(wi->tsk) {
case MT_WRK_NONE:
assert(wi->wi_status == WRK_ITEM_EXIT);
wi->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(ppc->wr_cq, wi, heap_allocated);
break;
case MT_WRK_WRITE:
wi->wi_status = WRK_ITEM_START;
/* Process work item */ /* Process work item */
if(0 != (ret = flush_pool_instance(wi))) { if (0 != (ret = flush_pool_instance(wi))) {
fprintf(stderr, "FLUSH op failed ret:%d\n", ret); fprintf(stderr, "FLUSH op failed ret:%d\n", ret);
wi->wi_status = WRK_ITEM_FAILED; wi->wi_status = WRK_ITEM_FAILED;
} }
ret = q_insert_wrk_list(ppc->cq, wi);
assert(0==ret);
assert(check_wrk_done_count >= done_cnt_flag);
wi->wi_status = WRK_ITEM_SUCCESS; wi->wi_status = WRK_ITEM_SUCCESS;
if(check_wrk_done_count == cv_done_inc_flag_sig(ppc)) { ib_wqueue_add(ppc->wr_cq, wi, heap_allocated);
break; break;
case MT_WRK_READ:
/* Need to also handle the read case */
assert(0);
/* completed task get added to rd_cq */
/* wi->wi_status = WRK_ITEM_SUCCESS;
ib_wqueue_add(ppc->rd_cq, wi, heap_allocated);*/
break;
default:
/* None other than Write/Read handling planned */
assert(0);
} }
}
ppc->wt_status = WTHR_NO_WORK;
return(0); return(0);
} }
void page_comp_io_thread_exit()
{
ulint i;
fprintf(stderr, "signal page_comp_io_threads to exit [%lu]\n", srv_buf_pool_instances);
for (i=0; i<srv_buf_pool_instances; i++) {
work_items[i].wr.buf_pool = NULL;
work_items[i].rd.page_pool = NULL;
work_items[i].tsk = MT_WRK_NONE;
work_items[i].wi_status = WRK_ITEM_EXIT;
ib_wqueue_add(wq, (void *)&work_items[i], heap_allocated);
}
}
/******************************************************************//** /******************************************************************//**
@return a dummy parameter*/ @return a dummy parameter*/
extern "C" UNIV_INTERN extern "C" UNIV_INTERN
os_thread_ret_t os_thread_ret_t
DECLARE_THREAD(page_comp_io_thread)( DECLARE_THREAD(page_comp_io_thread)(
/*==========================================*/ /*================================*/
void * arg) void * arg)
{ {
thread_sync_t *ppc_io = ((thread_sync_t *)arg); thread_sync_t *ppc_io = ((thread_sync_t *)arg);
...@@ -1905,34 +1769,6 @@ DECLARE_THREAD(page_comp_io_thread)( ...@@ -1905,34 +1769,6 @@ DECLARE_THREAD(page_comp_io_thread)(
OS_THREAD_DUMMY_RETURN; OS_THREAD_DUMMY_RETURN;
} }
int print_queue_wrk_itm(opq_t *q)
{
#if UNIV_DEBUG
wrk_t *wi = NULL;
if(!q) {
fprintf(stderr, "queue NULL\n");
return -1;
}
if(!q->head || !q->tail) {
assert(!(((q->tail==NULL) && (q->head!=NULL)) && ((q->tail != NULL) && (q->head == NULL))));
fprintf(stderr, "queue empty (h:%p t:%p)\n", q->head, q->tail);
return 0;
}
pthread_mutex_lock(&q->mtx);
for(wi = q->head; (wi != NULL) ; wi = wi->next) {
//fprintf(stderr, "- [%p] %p %lu %luus [%ld] >%p\n",
// wi, wi->buf_pool, wi->result, wi->t_usec, wi->id_usr, wi->next);
fprintf(stderr, "- [%p] [%s] >%p\n",
wi, (wi->id_usr == -1)?"free":"Busy", wi->next);
}
pthread_mutex_unlock(&q->mtx);
#endif
return(0);
}
int print_wrk_list(wrk_t *wi_list) int print_wrk_list(wrk_t *wi_list)
{ {
wrk_t *wi = wi_list; wrk_t *wi = wi_list;
...@@ -1952,111 +1788,9 @@ int print_wrk_list(wrk_t *wi_list) ...@@ -1952,111 +1788,9 @@ int print_wrk_list(wrk_t *wi_list)
return 0; return 0;
} }
int pgcomp_handler(wrk_t *w_list)
{
int ret=0;
opq_t *wrk_q=NULL, *comp_q=NULL;
wrk_q=&wq;
comp_q=&cq;
pthread_mutex_lock(&wrk_q->mtx);
/* setup work queue here.. */
wrk_q->flag = Q_EMPTY;
pthread_mutex_unlock(&wrk_q->mtx);
ret = q_insert_wrk_list(wrk_q, w_list);
if(ret != 0) {
fprintf(stderr, "%s():work-queue setup FAILED wq:%p w_list:%p \n",
__FUNCTION__, &wq, w_list);
return -1;
}
retry_submit:
pthread_mutex_lock(&wrk_q->mtx);
/* setup work queue here.. */
wrk_q->flag = Q_INITIALIZED;
pthread_mutex_unlock(&wrk_q->mtx);
pthread_mutex_lock(&comp_q->mtx);
if(0 != set_done_cnt_flag(0)) {
fprintf(stderr, "FAILED %s:%d\n", __FILE__, __LINE__);
pthread_mutex_unlock(&comp_q->mtx);
return -1;
}
comp_q->flag = Q_PROCESS;
pthread_mutex_unlock(&comp_q->mtx);
/* if threads are waiting request them to start */
pthread_mutex_lock(&wrk_q->mtx);
wrk_q->flag = Q_PROCESS;
pthread_cond_broadcast(&wrk_q->cv);
pthread_mutex_unlock(&wrk_q->mtx);
/* Wait on all worker-threads to complete */
pthread_mutex_lock(&comp_q->mtx);
if (comp_q->flag != Q_DONE) {
do {
pthread_cond_wait(&comp_q->cv, &comp_q->mtx);
if(comp_q->flag != Q_DONE) {
fprintf(stderr, "[1] cv wait on CQ failed flag:%d cnt:%lu\n",
comp_q->flag, done_cnt_flag);
if (done_cnt_flag != srv_buf_pool_instances) {
fprintf(stderr, "[2] cv wait on CQ failed flag:%d cnt:%lu\n",
comp_q->flag, done_cnt_flag);
fprintf(stderr, "============\n");
print_wrk_list(w_list);
fprintf(stderr, "============\n");
}
continue;
} else if (done_cnt_flag != srv_buf_pool_instances) {
fprintf(stderr, "[3]cv wait on CQ failed flag:%d cnt:%lu\n",
comp_q->flag, done_cnt_flag);
fprintf(stderr, "============\n");
print_wrk_list(w_list);
fprintf(stderr, "============\n");
comp_q->flag = Q_INITIALIZED;
pthread_mutex_unlock(&comp_q->mtx);
goto retry_submit;
assert(!done_cnt_flag);
continue;
}
assert(done_cnt_flag == srv_buf_pool_instances);
if ((comp_q->flag == Q_DONE) &&
(done_cnt_flag == srv_buf_pool_instances)) {
break;
}
} while((comp_q->flag == Q_INITIALIZED) &&
(done_cnt_flag != srv_buf_pool_instances));
} else {
fprintf(stderr, "[4] cv wait on CQ failed flag:%d cnt:%lu\n",
comp_q->flag, done_cnt_flag);
if (!done_cnt_flag) {
fprintf(stderr, "============\n");
print_wrk_list(w_list);
fprintf(stderr, "============\n");
comp_q->flag = Q_INITIALIZED;
pthread_mutex_unlock(&comp_q->mtx);
goto retry_submit;
assert(!done_cnt_flag);
}
assert(done_cnt_flag == srv_buf_pool_instances);
}
pthread_mutex_unlock(&comp_q->mtx);
pthread_mutex_lock(&wrk_q->mtx);
wrk_q->flag = Q_DONE;
pthread_mutex_unlock(&wrk_q->mtx);
return 0;
}
/******************************************************************//** /******************************************************************//**
@return a dummy parameter*/ @return a dummy parameter*/
int pgcomp_handler_init(int num_threads, int wrk_cnt, opq_t *wq, opq_t *cq) int pgcomp_handler_init(int num_threads, int wrk_cnt, ib_wqueue_t *wq, ib_wqueue_t *wr_cq, ib_wqueue_t *rd_cq)
{ {
int i=0; int i=0;
...@@ -2065,20 +1799,17 @@ int pgcomp_handler_init(int num_threads, int wrk_cnt, opq_t *wq, opq_t *cq) ...@@ -2065,20 +1799,17 @@ int pgcomp_handler_init(int num_threads, int wrk_cnt, opq_t *wq, opq_t *cq)
return -1; return -1;
} }
if(!wq || !cq) { if(!wq || !wr_cq || !rd_cq) {
fprintf(stderr, "%s() FAILED wq:%p cq:%p\n", __FUNCTION__, wq, cq); fprintf(stderr, "%s() FAILED wq:%p write-cq:%p read-cq:%p\n",
__FUNCTION__, wq, wr_cq, rd_cq);
return -1; return -1;
} }
/* work-item setup */ /* work-item setup */
setup_wrk_itm(wrk_cnt); setup_wrk_itm(wrk_cnt);
/* wq & cq setup */
init_queue(wq);
init_queue(cq);
/* Mark each of the thread sync entires */ /* Mark each of the thread sync entires */
for(i=0; i < PGCOMP_MAX_WORKER; i++) { for(i=0; i < MTFLUSH_MAX_WORKER; i++) {
pc_sync[i].wthread_id = i; pc_sync[i].wthread_id = i;
} }
...@@ -2086,85 +1817,71 @@ int pgcomp_handler_init(int num_threads, int wrk_cnt, opq_t *wq, opq_t *cq) ...@@ -2086,85 +1817,71 @@ int pgcomp_handler_init(int num_threads, int wrk_cnt, opq_t *wq, opq_t *cq)
for(i=0; i < num_threads; i++) { for(i=0; i < num_threads; i++) {
pc_sync[i].wthread_id = i; pc_sync[i].wthread_id = i;
pc_sync[i].wq = wq; pc_sync[i].wq = wq;
pc_sync[i].cq = cq; pc_sync[i].wr_cq = wr_cq;
pc_sync[i].rd_cq = rd_cq;
os_thread_create(page_comp_io_thread, ((void *)(pc_sync + i)), os_thread_create(page_comp_io_thread, ((void *)(pc_sync + i)),
thread_ids + START_PGCOMP_CNT + i); thread_ids + START_OLD_THREAD_CNT + i);
//pc_sync[i].wthread = thread_ids[START_PGCOMP_CNT + i]; pc_sync[i].wthread = (START_OLD_THREAD_CNT + i);
pc_sync[i].wthread = (START_PGCOMP_CNT + i);
pc_sync[i].wt_status = WTHR_INITIALIZED; pc_sync[i].wt_status = WTHR_INITIALIZED;
} }
set_check_done_flag_count(wrk_cnt);
set_pgcomp_wrk_init_done(); set_pgcomp_wrk_init_done();
fprintf(stderr, "%s() Worker-Threads created..\n", __FUNCTION__);
return 0; return 0;
} }
int wrk_thread_stat(thread_sync_t *wthr, unsigned int num_threads) int wrk_thread_stat(thread_sync_t *wthr, unsigned int num_threads)
{ {
long stat_tot=0; ulong stat_tot=0;
unsigned int i=0; ulint i=0;
for(i=0; i< num_threads;i++) { for(i=0; i<num_threads;i++) {
stat_tot+=wthr[i].stat_universal_num_processed; stat_tot+=wthr[i].stat_universal_num_processed;
fprintf(stderr, "[%d] stat [%lu]\n", wthr[i].wthread_id, fprintf(stderr, "[%d] stat [%lu]\n", wthr[i].wthread_id,
wthr[i].stat_universal_num_processed); wthr[i].stat_universal_num_processed);
} }
fprintf(stderr, "Stat-Total:%lu\n", stat_tot); fprintf(stderr, "Stat-Total:%lu\n", stat_tot);
return (0);
} }
int reset_wrk_itm(int items) int reset_wrk_itm(int items)
{ {
int i; int i;
pthread_mutex_lock(&wq.mtx);
wq.head = wq.tail = NULL;
pthread_mutex_unlock(&wq.mtx);
pthread_mutex_lock(&cq.mtx);
for(i=0;i<items; i++) { for(i=0;i<items; i++) {
work_items[i].id_usr = -1; work_items[i].id_usr = -1;
} }
cq.head = cq.tail = NULL;
pthread_mutex_unlock(&cq.mtx);
return 0; return 0;
} }
int pgcomp_flush_work_items(int buf_pool_inst, int *per_pool_pages_flushed, int pgcomp_flush_work_items(int buf_pool_inst, int *per_pool_pages_flushed,
int flush_type, int min_n, unsigned long long lsn_limit) buf_flush_t flush_type, int min_n, lsn_t lsn_limit)
{ {
int ret=0, i=0; int ret=0, i=0;
wrk_t *done_wi;
pthread_mutex_lock(&wq.mtx);
pthread_mutex_lock(&cq.mtx);
assert(wq.head == NULL);
assert(wq.tail == NULL);
if(cq.head) {
print_wrk_list(cq.head);
}
assert(cq.head == NULL);
assert(cq.tail == NULL);
for(i=0;i<buf_pool_inst; i++) { for(i=0;i<buf_pool_inst; i++) {
work_items[i].buf_pool = buf_pool_from_array(i); work_items[i].tsk = MT_WRK_WRITE;
work_items[i].flush_type = flush_type; work_items[i].rd.page_pool = NULL;
work_items[i].min = min_n; work_items[i].wr.buf_pool = buf_pool_from_array(i);
work_items[i].lsn_limit = lsn_limit; work_items[i].wr.flush_type = flush_type;
work_items[i].wr.min = min_n;
work_items[i].wr.lsn_limit = lsn_limit;
work_items[i].id_usr = -1; work_items[i].id_usr = -1;
work_items[i].next = &work_items[(i+1)%buf_pool_inst]; work_items[i].next = &work_items[(i+1)%buf_pool_inst];
work_items[i].wi_status = WRK_ITEM_SET; work_items[i].wi_status = WRK_ITEM_SET;
} }
work_items[i-1].next=NULL; work_items[i-1].next=NULL;
pthread_mutex_unlock(&cq.mtx); for(i=0;i<buf_pool_inst; i++) {
pthread_mutex_unlock(&wq.mtx); ib_wqueue_add(wq, (void *)(&work_items[i]), heap_allocated);
}
pgcomp_handler(work_items); /* wait on the completion to arrive */
for(i=0;i<buf_pool_inst; i++) {
done_wi = (wrk_t *)ib_wqueue_wait(wr_cq);
//fprintf(stderr, "%s: queue-wait DONE\n", __FUNCTION__);
ut_ad(done_wi != NULL);
}
pthread_mutex_lock(&wq.mtx);
pthread_mutex_lock(&cq.mtx);
/* collect data/results total pages flushed */ /* collect data/results total pages flushed */
for(i=0; i<buf_pool_inst; i++) { for(i=0; i<buf_pool_inst; i++) {
if(work_items[i].result == -1) { if(work_items[i].result == -1) {
...@@ -2173,26 +1890,13 @@ int pgcomp_flush_work_items(int buf_pool_inst, int *per_pool_pages_flushed, ...@@ -2173,26 +1890,13 @@ int pgcomp_flush_work_items(int buf_pool_inst, int *per_pool_pages_flushed,
} else { } else {
per_pool_pages_flushed[i] = work_items[i].result; per_pool_pages_flushed[i] = work_items[i].result;
} }
if((work_items[i].id_usr == -1) && (work_items[i].wi_status == WRK_ITEM_SET )) { if((work_items[i].id_usr == -1) &&
fprintf(stderr, "**Set/Unused work_item[%d] flush_type=%d\n", i, work_items[i].flush_type); (work_items[i].wi_status == WRK_ITEM_SET )) {
assert(0); fprintf(stderr, "**Set/Unused work_item[%d] flush_type=%d\n", i, work_items[i].wr.flush_type);
//assert(0);
} }
} }
//wrk_thread_stat(pc_sync, pgc_n_threads);
wq.flag = cq.flag = Q_INITIALIZED;
pthread_mutex_unlock(&cq.mtx);
pthread_mutex_unlock(&wq.mtx);
#if UNIV_DEBUG
/* Print work-list stats */
fprintf(stderr, "==wq== [DONE]\n");
print_wrk_list(wq.head);
fprintf(stderr, "==cq== [DONE]\n");
print_wrk_list(cq.head);
fprintf(stderr, "==worker-thread-stats==\n");
wrk_thread_stat(pc_sync, pgc_n_threads);
#endif
/* clear up work-queue for next flush */ /* clear up work-queue for next flush */
reset_wrk_itm(buf_pool_inst); reset_wrk_itm(buf_pool_inst);
...@@ -3409,15 +3113,24 @@ files_checked: ...@@ -3409,15 +3113,24 @@ files_checked:
if (!srv_read_only_mode) { if (!srv_read_only_mode) {
/* JAN: TODO: */ /* JAN: TODO: */
if (srv_buf_pool_instances <= PGCOMP_MAX_WORKER) { if (srv_buf_pool_instances <= MTFLUSH_MAX_WORKER) {
pgc_n_threads = srv_buf_pool_instances; srv_mtflush_threads = srv_buf_pool_instances;
} }
/* else we default to 8 worker-threads */ heap_allocated = mem_heap_create(0);
pgcomp_handler_init(pgc_n_threads, srv_buf_pool_instances, &wq, &cq); ut_a(heap_allocated != NULL);
/* JAN: TODO: END */
wq = ib_wqueue_create();
wr_cq = ib_wqueue_create();
rd_cq = ib_wqueue_create();
pgcomp_init();
pgcomp_handler_init(srv_mtflush_threads,
srv_buf_pool_instances,
wq, wr_cq, rd_cq);
#if UNIV_DEBUG #if UNIV_DEBUG
fprintf(stderr, "%s:%d buf-pool-instances:%lu\n", __FILE__, __LINE__, srv_buf_pool_instances); fprintf(stderr, "%s:%d buf-pool-instances:%lu\n", __FILE__, __LINE__, srv_buf_pool_instances);
#endif #endif
/* JAN: TODO: END */
os_thread_create(buf_flush_page_cleaner_thread, NULL, NULL); os_thread_create(buf_flush_page_cleaner_thread, NULL, NULL);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment