Commit 178ac5b7 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:4750], address cr comments of checkpoint variability

git-svn-id: file:///svn/toku/tokudb@42100 c7de825b-a66e-492c-adef-691d508d4ae1
parent e9f6d557
...@@ -658,7 +658,7 @@ static void brtnode_clone_partitions(BRTNODE node, BRTNODE cloned_node) { ...@@ -658,7 +658,7 @@ static void brtnode_clone_partitions(BRTNODE node, BRTNODE cloned_node) {
BP_STATE(cloned_node,i) = PT_AVAIL; BP_STATE(cloned_node,i) = PT_AVAIL;
BP_WORKDONE(cloned_node, i) = BP_WORKDONE(node, i); BP_WORKDONE(cloned_node, i) = BP_WORKDONE(node, i);
if (node->height == 0) { if (node->height == 0) {
set_BLB(cloned_node, i,toku_clone_bn(BLB(node,i))); set_BLB(cloned_node, i, toku_clone_bn(BLB(node,i)));
} }
else { else {
set_BNC(cloned_node, i, toku_clone_nl(BNC(node,i))); set_BNC(cloned_node, i, toku_clone_nl(BNC(node,i)));
......
...@@ -46,7 +46,6 @@ static u_int64_t cachetable_misstime; // time spent waiting for disk read ...@@ -46,7 +46,6 @@ static u_int64_t cachetable_misstime; // time spent waiting for disk read
static u_int64_t cachetable_waittime; // time spent waiting for another thread to release lock (e.g. prefetch, writing) static u_int64_t cachetable_waittime; // time spent waiting for another thread to release lock (e.g. prefetch, writing)
static u_int64_t cachetable_wait_reading; // how many times does get_and_pin() wait for a node to be read? static u_int64_t cachetable_wait_reading; // how many times does get_and_pin() wait for a node to be read?
static u_int64_t cachetable_wait_writing; // how many times does get_and_pin() wait for a node to be written? static u_int64_t cachetable_wait_writing; // how many times does get_and_pin() wait for a node to be written?
static u_int64_t cachetable_wait_checkpoint; // number of times get_and_pin waits for a node to be written for a checkpoint
static u_int64_t cachetable_puts; // how many times has a newly created node been put into the cachetable? static u_int64_t cachetable_puts; // how many times has a newly created node been put into the cachetable?
static u_int64_t cachetable_prefetches; // how many times has a block been prefetched into the cachetable? static u_int64_t cachetable_prefetches; // how many times has a block been prefetched into the cachetable?
static u_int64_t cachetable_maybe_get_and_pins; // how many times has maybe_get_and_pin(_clean) been called? static u_int64_t cachetable_maybe_get_and_pins; // how many times has maybe_get_and_pin(_clean) been called?
...@@ -76,7 +75,6 @@ status_init(void) { ...@@ -76,7 +75,6 @@ status_init(void) {
STATUS_INIT(CT_WAITTIME, UINT64, "wait time"); STATUS_INIT(CT_WAITTIME, UINT64, "wait time");
STATUS_INIT(CT_WAIT_READING, UINT64, "wait reading"); STATUS_INIT(CT_WAIT_READING, UINT64, "wait reading");
STATUS_INIT(CT_WAIT_WRITING, UINT64, "wait writing"); STATUS_INIT(CT_WAIT_WRITING, UINT64, "wait writing");
STATUS_INIT(CT_WAIT_CHECKPOINT, UINT64, "wait checkpoint");
STATUS_INIT(CT_PUTS, UINT64, "puts (new nodes created)"); STATUS_INIT(CT_PUTS, UINT64, "puts (new nodes created)");
STATUS_INIT(CT_PREFETCHES, UINT64, "prefetches"); STATUS_INIT(CT_PREFETCHES, UINT64, "prefetches");
STATUS_INIT(CT_MAYBE_GET_AND_PINS, UINT64, "maybe_get_and_pin"); STATUS_INIT(CT_MAYBE_GET_AND_PINS, UINT64, "maybe_get_and_pin");
...@@ -117,10 +115,10 @@ typedef struct ctpair *PAIR; ...@@ -117,10 +115,10 @@ typedef struct ctpair *PAIR;
struct ctpair { struct ctpair {
CACHEFILE cachefile; CACHEFILE cachefile;
CACHEKEY key; CACHEKEY key;
void* value_data; void* value_data; // data used by client threads, BRTNODEs and ROLLBACK_LOG_NODEs
void* cloned_value_data; void* cloned_value_data; // cloned copy of value_data used for checkpointing
long cloned_value_size; long cloned_value_size; // size of cloned_value_data, used for accounting of ct->size_current
void* disk_data; void* disk_data; // data used to fetch/flush value_data to and from disk.
PAIR_ATTR attr; PAIR_ATTR attr;
// //
...@@ -155,8 +153,8 @@ struct ctpair { ...@@ -155,8 +153,8 @@ struct ctpair {
PAIR pending_next; PAIR pending_next;
PAIR pending_prev; PAIR pending_prev;
struct nb_mutex value_nb_mutex; // single writer struct nb_mutex value_nb_mutex; // single writer, protects value_data
struct nb_mutex disk_nb_mutex; // single writer struct nb_mutex disk_nb_mutex; // single writer, protects disk_data, is used for writing cloned nodes for checkpoint
struct workqueue *cq; // writers sometimes return ctpair's using this queue struct workqueue *cq; // writers sometimes return ctpair's using this queue
struct workitem asyncwork; // work item for the worker threads struct workitem asyncwork; // work item for the worker threads
struct workitem checkpoint_asyncwork; // work item for the worker threads struct workitem checkpoint_asyncwork; // work item for the worker threads
...@@ -276,7 +274,6 @@ toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) { ...@@ -276,7 +274,6 @@ toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
STATUS_VALUE(CT_WAITTIME) = cachetable_waittime; STATUS_VALUE(CT_WAITTIME) = cachetable_waittime;
STATUS_VALUE(CT_WAIT_READING) = cachetable_wait_reading; STATUS_VALUE(CT_WAIT_READING) = cachetable_wait_reading;
STATUS_VALUE(CT_WAIT_WRITING) = cachetable_wait_writing; STATUS_VALUE(CT_WAIT_WRITING) = cachetable_wait_writing;
STATUS_VALUE(CT_WAIT_CHECKPOINT) = cachetable_wait_checkpoint;
STATUS_VALUE(CT_PUTS) = cachetable_puts; STATUS_VALUE(CT_PUTS) = cachetable_puts;
STATUS_VALUE(CT_PREFETCHES) = cachetable_prefetches; STATUS_VALUE(CT_PREFETCHES) = cachetable_prefetches;
STATUS_VALUE(CT_MAYBE_GET_AND_PINS) = cachetable_maybe_get_and_pins; STATUS_VALUE(CT_MAYBE_GET_AND_PINS) = cachetable_maybe_get_and_pins;
...@@ -1939,14 +1936,19 @@ checkpoint_cloned_pair_on_writer_thread(CACHETABLE ct, PAIR p) { ...@@ -1939,14 +1936,19 @@ checkpoint_cloned_pair_on_writer_thread(CACHETABLE ct, PAIR p) {
} }
//
// Given a PAIR p with the value_nb_mutex altready held, do the following:
// - If the PAIR needs to be written out to disk for checkpoint:
// - If the PAIR is cloneable, clone the PAIR and place the work
// of writing the PAIR on a background thread.
// - If the PAIR is not cloneable, write the PAIR to disk for checkpoint
// on the current thread
//
static void static void
write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p) write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p)
{ {
if (p->dirty && p->checkpoint_pending) { if (p->dirty && p->checkpoint_pending) {
if (p->clone_callback) { if (p->clone_callback) {
// I think it is safe to grab the disk_nb_mutex after
// cloning the pair, but doing it before just to be safe,
// even though the act of cloning does not touch disk_data
nb_mutex_write_lock(&p->disk_nb_mutex, ct->mutex); nb_mutex_write_lock(&p->disk_nb_mutex, ct->mutex);
assert(!p->cloned_value_data); assert(!p->cloned_value_data);
clone_pair(ct, p); clone_pair(ct, p);
...@@ -1993,9 +1995,6 @@ write_pair_for_checkpoint_thread (CACHETABLE ct, PAIR p) ...@@ -1993,9 +1995,6 @@ write_pair_for_checkpoint_thread (CACHETABLE ct, PAIR p)
nb_mutex_write_lock(&p->value_nb_mutex, ct->mutex); // grab an exclusive lock on the pair nb_mutex_write_lock(&p->value_nb_mutex, ct->mutex); // grab an exclusive lock on the pair
if (p->dirty && p->checkpoint_pending) { if (p->dirty && p->checkpoint_pending) {
if (p->clone_callback) { if (p->clone_callback) {
// I think it is safe to grab the disk_nb_mutex after
// cloning the pair, but doing it before just to be safe,
// even though the act of cloning does not touch disk_data
nb_mutex_write_lock(&p->disk_nb_mutex, ct->mutex); nb_mutex_write_lock(&p->disk_nb_mutex, ct->mutex);
assert(!p->cloned_value_data); assert(!p->cloned_value_data);
clone_pair(ct, p); clone_pair(ct, p);
...@@ -2086,7 +2085,6 @@ static void checkpoint_dependent_pairs( ...@@ -2086,7 +2085,6 @@ static void checkpoint_dependent_pairs(
// and if the pair is pending a checkpoint, it needs to be written out // and if the pair is pending a checkpoint, it needs to be written out
if (dependent_dirty[i]) curr_dep_pair->dirty = CACHETABLE_DIRTY; if (dependent_dirty[i]) curr_dep_pair->dirty = CACHETABLE_DIRTY;
if (curr_dep_pair->checkpoint_pending) { if (curr_dep_pair->checkpoint_pending) {
//cachetable_wait_checkpoint++;
write_locked_pair_for_checkpoint(ct, curr_dep_pair); write_locked_pair_for_checkpoint(ct, curr_dep_pair);
} }
} }
...@@ -2421,7 +2419,6 @@ static void checkpoint_pair_and_dependent_pairs( ...@@ -2421,7 +2419,6 @@ static void checkpoint_pair_and_dependent_pairs(
// the cachetable lock. // the cachetable lock.
// //
if (p->checkpoint_pending) { if (p->checkpoint_pending) {
//cachetable_wait_checkpoint++;
write_locked_pair_for_checkpoint(ct, p); write_locked_pair_for_checkpoint(ct, p);
} }
...@@ -2737,8 +2734,8 @@ int toku_cachetable_get_and_pin_nonblocking ( ...@@ -2737,8 +2734,8 @@ int toku_cachetable_get_and_pin_nonblocking (
if (p->key.b==key.b && p->cachefile==cf) { if (p->key.b==key.b && p->cachefile==cf) {
// //
// In Doofenshmirts, we keep the root to leaf path pinned // In Doofenshmirtz, we keep the root to leaf path pinned
// as we perform a quiry on a dictionary at any given time. // as we perform a query on a dictionary at any given time.
// This implies that only ONE query client can ever be // This implies that only ONE query client can ever be
// in get_and_pin_nonblocking for this dictionary. // in get_and_pin_nonblocking for this dictionary.
// So, if there is a write lock grabbed // So, if there is a write lock grabbed
...@@ -3758,27 +3755,24 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) { ...@@ -3758,27 +3755,24 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
printf("*****************************\n"); printf("*****************************\n");
} }
rwlock_write_unlock(&ct->pending_lock); rwlock_write_unlock(&ct->pending_lock);
if (0 && (npending > 0 || ct->checkpoint_num_files > 0 || ct->checkpoint_num_txns > 0)) {
fprintf(stderr, "%s:%d pending=%u %u files=%u txns=%u\n", __FUNCTION__, __LINE__, npending, ct->n_in_table, ct->checkpoint_num_files, ct->checkpoint_num_txns);
}
//begin_checkpoint_userdata must be called AFTER all the pairs are marked as pending. //begin_checkpoint_userdata must be called AFTER all the pairs are marked as pending.
//Once marked as pending, we own write locks on the pairs, which means the writer threads can't conflict. //Once marked as pending, we own write locks on the pairs, which means the writer threads can't conflict.
{ {
CACHEFILE cf; CACHEFILE cf;
cachefiles_lock(ct); cachefiles_lock(ct);
for (cf = ct->cachefiles_in_checkpoint; cf; cf=cf->next_in_checkpoint) { for (cf = ct->cachefiles_in_checkpoint; cf; cf=cf->next_in_checkpoint) {
if (cf->begin_checkpoint_userdata) { if (cf->begin_checkpoint_userdata) {
assert(cf->checkpoint_state == CS_NOT_IN_PROGRESS); assert(cf->checkpoint_state == CS_NOT_IN_PROGRESS);
int r = cf->begin_checkpoint_userdata(ct->lsn_of_checkpoint_in_progress, cf->userdata); int r = cf->begin_checkpoint_userdata(ct->lsn_of_checkpoint_in_progress, cf->userdata);
assert(r==0); assert(r==0);
cf->checkpoint_state = CS_CALLED_BEGIN_CHECKPOINT; cf->checkpoint_state = CS_CALLED_BEGIN_CHECKPOINT;
} }
} }
cachefiles_unlock(ct); cachefiles_unlock(ct);
} }
ct->checkpoint_is_beginning = FALSE; // clear before releasing cachetable lock ct->checkpoint_is_beginning = FALSE; // clear before releasing cachetable lock
cachetable_unlock(ct); cachetable_unlock(ct);
} }
return 0; return 0;
} }
...@@ -4291,7 +4285,6 @@ toku_cachetable_helgrind_ignore(void) { ...@@ -4291,7 +4285,6 @@ toku_cachetable_helgrind_ignore(void) {
VALGRIND_HG_DISABLE_CHECKING(&cachetable_waittime, sizeof cachetable_waittime); VALGRIND_HG_DISABLE_CHECKING(&cachetable_waittime, sizeof cachetable_waittime);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_wait_reading, sizeof cachetable_wait_reading); VALGRIND_HG_DISABLE_CHECKING(&cachetable_wait_reading, sizeof cachetable_wait_reading);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_wait_writing, sizeof cachetable_wait_writing); VALGRIND_HG_DISABLE_CHECKING(&cachetable_wait_writing, sizeof cachetable_wait_writing);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_wait_checkpoint, sizeof cachetable_wait_checkpoint);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_puts, sizeof cachetable_puts); VALGRIND_HG_DISABLE_CHECKING(&cachetable_puts, sizeof cachetable_puts);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_prefetches, sizeof cachetable_prefetches); VALGRIND_HG_DISABLE_CHECKING(&cachetable_prefetches, sizeof cachetable_prefetches);
VALGRIND_HG_DISABLE_CHECKING(&cachetable_maybe_get_and_pins, sizeof cachetable_maybe_get_and_pins); VALGRIND_HG_DISABLE_CHECKING(&cachetable_maybe_get_and_pins, sizeof cachetable_maybe_get_and_pins);
......
...@@ -491,7 +491,6 @@ typedef enum { ...@@ -491,7 +491,6 @@ typedef enum {
CT_WAITTIME, // how many usec spent waiting for another thread to release cache line CT_WAITTIME, // how many usec spent waiting for another thread to release cache line
CT_WAIT_READING, CT_WAIT_READING,
CT_WAIT_WRITING, CT_WAIT_WRITING,
CT_WAIT_CHECKPOINT, // number of times get_and_pin waits for a node to be written for a checkpoint
CT_PUTS, // how many times has a newly created node been put into the cachetable? CT_PUTS, // how many times has a newly created node been put into the cachetable?
CT_PREFETCHES, // how many times has a block been prefetched into the cachetable? CT_PREFETCHES, // how many times has a block been prefetched into the cachetable?
CT_MAYBE_GET_AND_PINS, // how many times has maybe_get_and_pin(_clean) been called? CT_MAYBE_GET_AND_PINS, // how many times has maybe_get_and_pin(_clean) been called?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment