Commit ff579f3a authored by Barry Perlman's avatar Barry Perlman Committed by Yoni Fogel

[t:4283] Closes #4283 Detect begin checkpoint in critical regions.

git-svn-id: file:///svn/toku/tokudb@37926 c7de825b-a66e-492c-adef-691d508d4ae1
parent 08b4add1
...@@ -169,6 +169,8 @@ struct cachetable { ...@@ -169,6 +169,8 @@ struct cachetable {
KIBBUTZ kibbutz; // another pool of worker threads and jobs to do asynchronously. KIBBUTZ kibbutz; // another pool of worker threads and jobs to do asynchronously.
LSN lsn_of_checkpoint_in_progress; LSN lsn_of_checkpoint_in_progress;
uint64_t checkpoint_sequence_number; // just a free-running sequence number, used for detecting threadsafety bugs
uint64_t checkpoint_prohibited; // nonzero when checkpoints are prohibited, used for detecting threadsafety bugs
u_int32_t checkpoint_num_files; // how many cachefiles are in the checkpoint u_int32_t checkpoint_num_files; // how many cachefiles are in the checkpoint
u_int32_t checkpoint_num_txns; // how many transactions are in the checkpoint u_int32_t checkpoint_num_txns; // how many transactions are in the checkpoint
PAIR pending_head; // list of pairs marked with checkpoint_pending PAIR pending_head; // list of pairs marked with checkpoint_pending
...@@ -196,6 +198,18 @@ struct cachetable { ...@@ -196,6 +198,18 @@ struct cachetable {
int64_t size_cachepressure; int64_t size_cachepressure;
}; };
// Code bracketed with {BEGIN_CRITICAL_REGION; ... END_CRITICAL_REGION;} macros
// are critical regions in which a checkpoint is not permitted to begin.
#define BEGIN_CRITICAL_REGION {uint64_t cp_seqnum = ct->checkpoint_sequence_number; \
__sync_fetch_and_add(&ct->checkpoint_prohibited, 1);
#define END_CRITICAL_REGION invariant(cp_seqnum == ct->checkpoint_sequence_number); \
invariant(ct->checkpoint_prohibited > 0); \
__sync_fetch_and_sub(&ct->checkpoint_prohibited, 1); }
// Lock the cachetable // Lock the cachetable
static inline void cachefiles_lock(CACHETABLE ct) { static inline void cachefiles_lock(CACHETABLE ct) {
int r = toku_pthread_mutex_lock(&ct->cachefiles_mutex); resource_assert_zero(r); int r = toku_pthread_mutex_lock(&ct->cachefiles_mutex); resource_assert_zero(r);
...@@ -1840,6 +1854,7 @@ void toku_checkpoint_pairs( ...@@ -1840,6 +1854,7 @@ void toku_checkpoint_pairs(
); );
} }
int toku_cachetable_put_with_dep_pairs( int toku_cachetable_put_with_dep_pairs(
CACHEFILE cachefile, CACHEFILE cachefile,
CACHETABLE_GET_KEY_AND_FULLHASH get_key_and_fullhash, CACHETABLE_GET_KEY_AND_FULLHASH get_key_and_fullhash,
...@@ -1879,8 +1894,12 @@ int toku_cachetable_put_with_dep_pairs( ...@@ -1879,8 +1894,12 @@ int toku_cachetable_put_with_dep_pairs(
// cachetable_put_internal does not release the cachetable lock // cachetable_put_internal does not release the cachetable lock
// //
cachetable_wait_write(ct); cachetable_wait_write(ct);
int rval;
{
BEGIN_CRITICAL_REGION;
get_key_and_fullhash(key, fullhash, get_key_and_fullhash_extra); get_key_and_fullhash(key, fullhash, get_key_and_fullhash_extra);
int r = cachetable_put_internal( rval = cachetable_put_internal(
cachefile, cachefile,
*key, *key,
*fullhash, *fullhash,
...@@ -1906,8 +1925,10 @@ int toku_cachetable_put_with_dep_pairs( ...@@ -1906,8 +1925,10 @@ int toku_cachetable_put_with_dep_pairs(
dependent_dirty dependent_dirty
); );
END_CRITICAL_REGION;
}
cachetable_unlock(ct); cachetable_unlock(ct);
return r; return rval;
} }
...@@ -2176,8 +2197,12 @@ got_value: ...@@ -2176,8 +2197,12 @@ got_value:
*value = p->value; *value = p->value;
if (sizep) *sizep = p->attr.size; if (sizep) *sizep = p->attr.size;
{
BEGIN_CRITICAL_REGION;
// //
// A checkpoint cannot begin while we are checking dependent pairs or pending bits. // A checkpoint must not begin while we are checking dependent pairs or pending bits.
// Here is why. // Here is why.
// //
// Now that we have all of the locks on the pairs we // Now that we have all of the locks on the pairs we
...@@ -2211,6 +2236,9 @@ got_value: ...@@ -2211,6 +2236,9 @@ got_value:
dependent_dirty dependent_dirty
); );
END_CRITICAL_REGION;
}
r = maybe_flush_some(ct, 0); r = maybe_flush_some(ct, 0);
cachetable_unlock(ct); cachetable_unlock(ct);
WHEN_TRACE_CT(printf("%s:%d did fetch: cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value)); WHEN_TRACE_CT(printf("%s:%d did fetch: cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value));
...@@ -3111,6 +3139,8 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) { ...@@ -3111,6 +3139,8 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
assert(r==0); assert(r==0);
} }
cachetable_lock(ct); cachetable_lock(ct);
invariant(ct->checkpoint_prohibited == 0); // detect threadsafety bugs
ct->checkpoint_sequence_number++;
//Initialize accountability counters //Initialize accountability counters
ct->checkpoint_num_files = 0; ct->checkpoint_num_files = 0;
ct->checkpoint_num_txns = 0; ct->checkpoint_num_txns = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment