Commit 4234fe52 authored by Zardosht Kasheff, committed by Yoni Fogel

refs #3634, separate ct lock from workqueue lock, remove usage of workqueues

git-svn-id: file:///svn/toku/tokudb@45658 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3c8cb48a
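The diff applies two mechanical transformations across the cachetable. First, jobs that used to be packaged as a struct workitem embedded in the PAIR and pushed onto a workqueue are now handed to a kibbutz as a bare callback plus argument. A condensed before/after sketch of the enqueue pattern, assembled from the call sites in this diff (not a verbatim excerpt):

    // Before: each PAIR carried its own work item, initialized and
    // enqueued on the cachetable-wide workqueue.
    WORKITEM wi = &p->asyncwork;
    workitem_init(wi, cachetable_evicter, p);
    workqueue_enq(&ct->wq, wi, 0);

    // After: the kibbutz takes the callback and its argument directly,
    // so struct ctpair can drop its asyncwork/checkpoint_asyncwork fields.
    toku_kibbutz_enq(ct->ct_kibbutz, cachetable_evicter, p);

Second, the cachetable stops borrowing the workqueue's lock and owns its mutex outright; that change is sketched after the cachetable_lock/cachetable_unlock hunk below.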
@@ -115,8 +115,6 @@ struct ctpair {
     struct nb_mutex value_nb_mutex; // single writer, protects value_data
     struct nb_mutex disk_nb_mutex; // single writer, protects disk_data, is used for writing cloned nodes for checkpoint
-    struct workitem asyncwork; // work item for the worker threads
-    struct workitem checkpoint_asyncwork; // work item for the worker threads
     struct toku_list next_for_cachefile; // link in the cachefile list
 };
@@ -154,13 +152,11 @@ struct cachetable {
     int64_t size_evicting; // the sum of the sizes of the pairs being written
     int64_t size_max; // high water mark of size_current (max value size_current ever had)
     TOKULOGGER logger;
-    toku_mutex_t *mutex; // coarse lock that protects the cachetable, the cachefiles, and the pairs
-    struct workqueue wq; // async work queue
-    THREADPOOL threadpool; // pool of worker threads
-    struct workqueue checkpoint_wq;
-    THREADPOOL checkpoint_threadpool;
-    KIBBUTZ kibbutz; // another pool of worker threads and jobs to do asynchronously.
+    toku_mutex_t mutex; // coarse lock that protects the cachetable, the cachefiles, and the pairs
+    KIBBUTZ client_kibbutz; // pool of worker threads and jobs to do asynchronously for the client.
+    KIBBUTZ ct_kibbutz; // pool of worker threads and jobs to do asynchronously for the cachetable
+    KIBBUTZ checkpointing_kibbutz; // small pool for checkpointing cloned pairs
     LSN lsn_of_checkpoint_in_progress;
     // Variables used to detect threadsafety bugs are declared volatile to prevent compiler from using thread-local cache.
@@ -231,14 +227,14 @@ toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
 // verifying that we just incremented it with the matching BEGIN macro.
 #define END_CRITICAL_REGION {invariant(ct->checkpoint_prohibited > 0); __sync_fetch_and_sub(&ct->checkpoint_prohibited, 1);}
-// Lock the cachetable. Used for a variety of purposes. TODO: like what?
+// Lock the cachetable. Used for a variety of purposes.
 static inline void cachetable_lock(CACHETABLE ct __attribute__((unused))) {
-    toku_mutex_lock(ct->mutex);
+    toku_mutex_lock(&ct->mutex);
 }
 // Unlock the cachetable
 static inline void cachetable_unlock(CACHETABLE ct __attribute__((unused))) {
-    toku_mutex_unlock(ct->mutex);
+    toku_mutex_unlock(&ct->mutex);
 }
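This hunk shows the second transformation: ct->mutex changes from a toku_mutex_t* into an embedded toku_mutex_t, so every caller now takes its address. A condensed view, combining the declaration from the struct hunk above with the initialization from toku_create_cachetable below (illustrative, not a verbatim excerpt):

    // Before: the cachetable's lock was a pointer borrowed from its workqueue.
    toku_mutex_t *mutex;                     // member of struct cachetable
    ct->mutex = workqueue_lock_ref(&ct->wq); // alias the workqueue's lock
    toku_mutex_lock(ct->mutex);

    // After: the cachetable owns the lock, so the workqueue can go away.
    toku_mutex_t mutex;                      // embedded member
    toku_mutex_init(&ct->mutex, NULL);       // done in toku_create_cachetable
    toku_mutex_lock(&ct->mutex);

The same pointer-to-address substitution accounts for the many one-line nb_mutex_lock, rwlock, and toku_cond_wait changes in the rest of the diff.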
@@ -247,7 +243,7 @@ static inline void cachetable_unlock(CACHETABLE ct __attribute__((unused))) {
 // Wait for cache table space to become available
 static inline void cachetable_wait_write(CACHETABLE ct) {
     // if we're writing more than half the data in the cachetable
     while (2*ct->size_evicting > ct->size_current) {
-        toku_cond_wait(&ct->flow_control_cond, ct->mutex);
+        toku_cond_wait(&ct->flow_control_cond, &ct->mutex);
     }
 }
@@ -311,7 +307,7 @@ void cachefile_kibbutz_enq (CACHEFILE cf, void (*f)(void*), void *extra)
     // at a time when the manager is accepting background jobs, otherwise
     // the client is screwing up
     assert_zero(r);
-    toku_kibbutz_enq(cf->cachetable->kibbutz, f, extra);
+    toku_kibbutz_enq(cf->cachetable->client_kibbutz, f, extra);
 }
 static int
@@ -394,11 +390,13 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_l
     ct->size_limit = size_limit;
     ct->size_reserved = unreservable_memory(size_limit);
     ct->logger = logger;
-    toku_init_workers(&ct->wq, &ct->threadpool, 1);
-    toku_init_workers(&ct->checkpoint_wq, &ct->checkpoint_threadpool, 8);
-    ct->mutex = workqueue_lock_ref(&ct->wq);
-    ct->kibbutz = toku_kibbutz_create(toku_os_get_number_active_processors());
+    toku_mutex_init(&ct->mutex, NULL);
+    int num_processors = toku_os_get_number_active_processors();
+    ct->client_kibbutz = toku_kibbutz_create(num_processors);
+    ct->ct_kibbutz = toku_kibbutz_create(2*num_processors);
+    int checkpointing_nworkers = (num_processors/4) ? num_processors/4 : 1;
+    ct->checkpointing_kibbutz = toku_kibbutz_create(checkpointing_nworkers);
     toku_minicron_setup(&ct->checkpointer, 0, checkpoint_thread, ct); // default is no checkpointing
     toku_minicron_setup(&ct->cleaner, 0, toku_cleaner_thread, ct); // default is no cleaner, for now
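To make the new pool sizing concrete: with toku_os_get_number_active_processors() returning 8 (an example value, not from the commit), the hunk above creates an 8-thread client_kibbutz, a 16-thread ct_kibbutz, and a 2-thread checkpointing_kibbutz; the ternary keeps the checkpointing pool at one worker when fewer than 4 processors are available:

    int num_processors = 8;                    // example value
    // client_kibbutz:        num_processors   ==  8 threads
    // ct_kibbutz:            2*num_processors == 16 threads
    // checkpointing_kibbutz: num_processors/4 ==  2 threads
    //                        (falls back to 1 when num_processors < 4)
    int checkpointing_nworkers = (num_processors/4) ? num_processors/4 : 1;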
@@ -585,10 +583,9 @@ int toku_cachetable_openf (CACHEFILE *cfptr, CACHETABLE ct, const char *fname_in
     return r;
 }
-void toku_cachefile_get_workqueue_load (CACHEFILE cf, int *n_in_queue, int *n_threads) {
-    CACHETABLE ct = cf->cachetable;
-    *n_in_queue = workqueue_n_in_queue(&ct->wq, 1);
-    *n_threads = toku_thread_pool_get_current_threads(ct->threadpool);
+void toku_cachefile_get_workqueue_load (CACHEFILE UU(cf), int *n_in_queue, int *n_threads) {
+    *n_in_queue = 0;
+    *n_threads = 0;
 }
 //Test-only function
@@ -999,7 +996,7 @@ static void cachetable_only_write_locked_data(
 static void cachetable_write_locked_pair(CACHETABLE ct, PAIR p) {
     PAIR_ATTR old_attr = p->attr;
     PAIR_ATTR new_attr = p->attr;
-    rwlock_read_lock(&ct->pending_lock, ct->mutex);
+    rwlock_read_lock(&ct->pending_lock, &ct->mutex);
     BOOL for_checkpoint = p->checkpoint_pending;
     p->checkpoint_pending = FALSE;
     // grabbing the disk_nb_mutex here ensures that
@@ -1007,7 +1004,7 @@ static void cachetable_write_locked_pair(CACHETABLE ct, PAIR p) {
     // if we grab the disk_nb_mutex inside the if clause,
     // then we may try to evict a PAIR that is in the process
     // of having its clone be written out
-    nb_mutex_lock(&p->disk_nb_mutex, ct->mutex);
+    nb_mutex_lock(&p->disk_nb_mutex, &ct->mutex);
     // make sure that assumption about cloned_value_data is true
     // if we have grabbed the disk_nb_mutex, then that means that
     // there should be no cloned value data
@@ -1059,8 +1056,8 @@ static void cachetable_evict_pair(CACHETABLE ct, PAIR p) {
 }
 // Worker thread function to writes and evicts a pair from memory to its cachefile
-static void cachetable_evicter(WORKITEM wi) {
-    PAIR p = (PAIR) workitem_arg(wi);
+static void cachetable_evicter(void* extra) {
+    PAIR p = (PAIR)extra;
     CACHEFILE cf = p->cachefile;
     CACHETABLE ct = cf->cachetable;
     cachetable_lock(ct);
@@ -1080,7 +1077,7 @@ static void try_evict_pair(CACHETABLE ct, PAIR p) {
     // must check for before we grab the write lock because we may
     // be trying to evict something this thread is trying to read
     if (!nb_mutex_users(&p->value_nb_mutex)) {
-        nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
+        nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
         assert(ct->size_evicting >= 0);
         ct->size_evicting += p->attr.size;
@@ -1096,10 +1093,7 @@ static void try_evict_pair(CACHETABLE ct, PAIR p) {
             bjm_remove_background_job(cf->bjm);
         }
         else {
-            WORKITEM wi = &p->asyncwork;
-            //responsibility of cachetable_evicter to remove background job
-            workitem_init(wi, cachetable_evicter, p);
-            workqueue_enq(&ct->wq, wi, 0);
+            toku_kibbutz_enq(ct->ct_kibbutz, cachetable_evicter, p);
         }
     }
 }
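Every worker function in the diff undergoes the same signature conversion as cachetable_evicter above: the workqueue convention handed the job its WORKITEM, from which it recovered the argument with workitem_arg(); the kibbutz convention is a plain void (*)(void*), matching the f parameter of cachefile_kibbutz_enq. A side-by-side sketch (the _old/_new suffixes are illustrative only, and the bodies are elided):

    // Workqueue-style worker: unpack the argument from the work item.
    static void cachetable_evicter_old(WORKITEM wi) {
        PAIR p = (PAIR) workitem_arg(wi);
        // ... write and evict p ...
    }

    // Kibbutz-style worker: the argument arrives directly.
    static void cachetable_evicter_new(void* extra) {
        PAIR p = (PAIR) extra;
        // ... write and evict p ...
    }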
@@ -1121,8 +1115,8 @@ static void do_partial_eviction(CACHETABLE ct, PAIR p) {
     nb_mutex_unlock(&p->value_nb_mutex);
 }
-static void cachetable_partial_eviction(WORKITEM wi) {
-    PAIR p = (PAIR) workitem_arg(wi);
+static void cachetable_partial_eviction(void* extra) {
+    PAIR p = (PAIR)extra;
     CACHEFILE cf = p->cachefile;
     CACHETABLE ct = cf->cachetable;
     cachetable_lock(ct);
@@ -1147,7 +1141,7 @@ static bool run_eviction_on_pair(PAIR curr_in_clock, CACHETABLE ct) {
     if (curr_in_clock->count > 0) {
         curr_in_clock->count--;
         // call the partial eviction callback
-        nb_mutex_lock(&curr_in_clock->value_nb_mutex, ct->mutex);
+        nb_mutex_lock(&curr_in_clock->value_nb_mutex, &ct->mutex);
         void *value = curr_in_clock->value_data;
         void* disk_data = curr_in_clock->disk_data;
@@ -1172,10 +1166,7 @@ static bool run_eviction_on_pair(PAIR curr_in_clock, CACHETABLE ct) {
         if (bytes_freed_estimate > 0) {
             curr_in_clock->size_evicting_estimate = bytes_freed_estimate;
             ct->size_evicting += bytes_freed_estimate;
-            WORKITEM wi = &curr_in_clock->asyncwork;
-            // responsibility of cachetable_partial_eviction to remove background job
-            workitem_init(wi, cachetable_partial_eviction, curr_in_clock);
-            workqueue_enq(&ct->wq, wi, 0);
+            toku_kibbutz_enq(ct->ct_kibbutz, cachetable_partial_eviction, curr_in_clock);
         }
         else {
             nb_mutex_unlock(&curr_in_clock->value_nb_mutex);
@@ -1347,7 +1338,7 @@ static int cachetable_put_internal(
         CACHETABLE_DIRTY
     );
     assert(p);
-    nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
+    nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
     //note_hash_count(count);
     return 0;
 }
@@ -1405,8 +1396,8 @@ clone_pair(CACHETABLE ct, PAIR p) {
     ct->size_current += p->cloned_value_size;
 }
-static void checkpoint_cloned_pair(WORKITEM wi) {
-    PAIR p = (PAIR) workitem_arg(wi);
+static void checkpoint_cloned_pair(void* extra) {
+    PAIR p = (PAIR)extra;
     CACHETABLE ct = p->cachefile->cachetable;
     cachetable_lock(ct);
     PAIR_ATTR new_attr;
@@ -1427,9 +1418,7 @@ static void checkpoint_cloned_pair(WORKITEM wi) {
 static void
 checkpoint_cloned_pair_on_writer_thread(CACHETABLE ct, PAIR p) {
-    WORKITEM wi = &p->checkpoint_asyncwork;
-    workitem_init(wi, checkpoint_cloned_pair, p);
-    workqueue_enq(&ct->checkpoint_wq, wi, 1);
+    toku_kibbutz_enq(ct->checkpointing_kibbutz, checkpoint_cloned_pair, p);
 }
@@ -1446,7 +1435,7 @@ write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p)
 {
     if (p->dirty && p->checkpoint_pending) {
         if (p->clone_callback) {
-            nb_mutex_lock(&p->disk_nb_mutex, ct->mutex);
+            nb_mutex_lock(&p->disk_nb_mutex, &ct->mutex);
             assert(!p->cloned_value_data);
             clone_pair(ct, p);
             assert(p->cloned_value_data);
@@ -1489,10 +1478,10 @@ write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p)
 static void
 write_pair_for_checkpoint_thread (CACHETABLE ct, PAIR p)
 {
-    nb_mutex_lock(&p->value_nb_mutex, ct->mutex); // grab an exclusive lock on the pair
+    nb_mutex_lock(&p->value_nb_mutex, &ct->mutex); // grab an exclusive lock on the pair
     if (p->dirty && p->checkpoint_pending) {
         if (p->clone_callback) {
-            nb_mutex_lock(&p->disk_nb_mutex, ct->mutex);
+            nb_mutex_lock(&p->disk_nb_mutex, &ct->mutex);
             assert(!p->cloned_value_data);
             clone_pair(ct, p);
             assert(p->cloned_value_data);
@@ -1702,7 +1691,7 @@ do_partial_fetch(
     // so we do a sanity check here.
     assert(!p->dirty);
-    nb_mutex_lock(&p->disk_nb_mutex, ct->mutex);
+    nb_mutex_lock(&p->disk_nb_mutex, &ct->mutex);
     cachetable_unlock(ct);
     int r = pf_callback(p->value_data, p->disk_data, read_extraargs, cachefile->fd, &new_attr);
     lazy_assert_zero(r);
@@ -1726,18 +1715,19 @@ void toku_cachetable_pf_pinned_pair(
 {
     PAIR_ATTR attr;
     PAIR p = NULL;
-    cachetable_lock(cf->cachetable);
+    CACHETABLE ct = cf->cachetable;
+    cachetable_lock(ct);
     int r = cachetable_get_pair(cf, key, fullhash, &p);
     assert_zero(r);
     assert(p->value_data == value);
     assert(nb_mutex_writers(&p->value_nb_mutex));
-    nb_mutex_lock(&p->disk_nb_mutex, cf->cachetable->mutex);
+    nb_mutex_lock(&p->disk_nb_mutex, &ct->mutex);
     int fd = cf->fd;
-    cachetable_unlock(cf->cachetable);
+    cachetable_unlock(ct);
     pf_callback(value, p->disk_data, read_extraargs, fd, &attr);
-    cachetable_lock(cf->cachetable);
+    cachetable_lock(ct);
     nb_mutex_unlock(&p->disk_nb_mutex);
-    cachetable_unlock(cf->cachetable);
+    cachetable_unlock(ct);
 }
@@ -1802,7 +1792,7 @@ static void cachetable_fetch_pair(
     // FIXME this should be enum cachetable_dirty, right?
     int dirty = 0;
-    nb_mutex_lock(&p->disk_nb_mutex, ct->mutex);
+    nb_mutex_lock(&p->disk_nb_mutex, &ct->mutex);
     cachetable_unlock(ct);
     int r;
@@ -1907,7 +1897,7 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
     for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
         if (p->key.b==key.b && p->cachefile==cachefile) {
             // still have the cachetable lock
-            nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
+            nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
             pair_touch(p);
             if (may_modify_value) {
                 checkpoint_pair_and_dependent_pairs(
@@ -1962,7 +1952,7 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
             CACHETABLE_CLEAN
         );
         assert(p);
-        nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
+        nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
         if (may_modify_value) {
             checkpoint_pair_and_dependent_pairs(
                 ct,
@@ -2014,7 +2004,7 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3
             nb_mutex_users(&p->value_nb_mutex) == 0
         ) {
             // because nb_mutex_users is 0, this is fast
-            nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
+            nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
             *value = p->value_data;
             pair_touch(p);
             r = 0;
@@ -2040,7 +2030,7 @@ int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key,
             nb_mutex_users(&p->value_nb_mutex) == 0
         ) {
             // because nb_mutex_users is 0, this is fast
-            nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
+            nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
             *value = p->value_data;
             r = 0;
         }
@@ -2145,7 +2135,7 @@ int toku_cachetable_get_and_pin_nonblocking (
         if (!nb_mutex_writers(&p->value_nb_mutex) &&
             (!may_modify_value || resolve_checkpointing_fast(p)))
         {
-            nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
+            nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
             if (may_modify_value && p->checkpoint_pending) {
                 write_locked_pair_for_checkpoint(ct, p);
             }
@@ -2178,7 +2168,7 @@ int toku_cachetable_get_and_pin_nonblocking (
         else {
             run_unlockers(unlockers); // The contract says the unlockers are run with the ct lock being held.
             // Now wait for the I/O to occur.
-            nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
+            nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
             if (may_modify_value && p->checkpoint_pending) {
                 write_locked_pair_for_checkpoint(ct, p);
             }
@@ -2202,7 +2192,7 @@ int toku_cachetable_get_and_pin_nonblocking (
             CACHETABLE_CLEAN
         );
         assert(p);
-        nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
+        nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
         run_unlockers(unlockers); // we hold the ct mutex.
         u_int64_t t0 = get_tnow();
         cachetable_fetch_pair(ct, cf, p, fetch_callback, read_extraargs, FALSE);
@@ -2225,8 +2215,8 @@ struct cachefile_partial_prefetch_args {
 };
 // Worker thread function to read a pair from a cachefile to memory
-static void cachetable_reader(WORKITEM wi) {
-    struct cachefile_prefetch_args* cpargs = (struct cachefile_prefetch_args *) workitem_arg(wi);
+static void cachetable_reader(void* extra) {
+    struct cachefile_prefetch_args* cpargs = (struct cachefile_prefetch_args*)extra;
     CACHEFILE cf = cpargs->p->cachefile;
     CACHETABLE ct = cf->cachetable;
     cachetable_lock(ct);
@@ -2243,8 +2233,8 @@ static void cachetable_reader(WORKITEM wi) {
     toku_free(cpargs);
 }
-static void cachetable_partial_reader(WORKITEM wi) {
-    struct cachefile_partial_prefetch_args *cpargs = (struct cachefile_partial_prefetch_args *) workitem_arg(wi);
+static void cachetable_partial_reader(void* extra) {
+    struct cachefile_partial_prefetch_args *cpargs = (struct cachefile_partial_prefetch_args*)extra;
     CACHEFILE cf = cpargs->p->cachefile;
     CACHETABLE ct = cf->cachetable;
     cachetable_lock(ct);
@@ -2294,20 +2284,19 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
             CACHETABLE_CLEAN
         );
         assert(p);
-        nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
+        nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
         struct cachefile_prefetch_args *MALLOC(cpargs);
         cpargs->p = p;
         cpargs->fetch_callback = fetch_callback;
         cpargs->read_extraargs = read_extraargs;
-        workitem_init(&p->asyncwork, cachetable_reader, cpargs);
-        workqueue_enq(&ct->wq, &p->asyncwork, 0);
+        toku_kibbutz_enq(ct->ct_kibbutz, cachetable_reader, cpargs);
         if (doing_prefetch) {
             *doing_prefetch = TRUE;
         }
     }
     else if (nb_mutex_users(&p->value_nb_mutex)==0) {
         // nobody else is using the node, so we should go ahead and prefetch
-        nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
+        nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
         BOOL partial_fetch_required = pf_req_callback(p->value_data, read_extraargs);
         if (partial_fetch_required) {
@@ -2317,8 +2306,7 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
             cpargs->p = p;
             cpargs->pf_callback = pf_callback;
             cpargs->read_extraargs = read_extraargs;
-            workitem_init(&p->asyncwork, cachetable_partial_reader, cpargs);
-            workqueue_enq(&ct->wq, &p->asyncwork, 0);
+            toku_kibbutz_enq(ct->ct_kibbutz, cachetable_partial_reader, cpargs);
             if (doing_prefetch) {
                 *doing_prefetch = TRUE;
             }
@@ -2407,8 +2395,8 @@ struct pair_flush_for_close{
     BACKGROUND_JOB_MANAGER bjm;
 };
-static void cachetable_flush_pair_for_close(WORKITEM wi) {
-    struct pair_flush_for_close *args = cast_to_typeof(args) workitem_arg(wi);
+static void cachetable_flush_pair_for_close(void* extra) {
+    struct pair_flush_for_close *args = (struct pair_flush_for_close*) extra;
     PAIR p = args->p;
     CACHEFILE cf = p->cachefile;
     CACHETABLE ct = cf->cachetable;
@@ -2498,9 +2486,7 @@ static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
             struct pair_flush_for_close *XMALLOC(args);
             args->p = p;
             args->bjm = bjm;
-            workitem_init(&p->asyncwork, cachetable_flush_pair_for_close, args);
-            workqueue_enq(&ct->wq, &p->asyncwork, 0);
+            toku_kibbutz_enq(ct->ct_kibbutz, cachetable_flush_pair_for_close, args);
         }
     }
     cachetable_unlock(ct);
@@ -2561,9 +2547,9 @@ toku_cachetable_close (CACHETABLE *ctp) {
     assert(ct->size_evicting == 0);
     rwlock_destroy(&ct->pending_lock);
     cachetable_unlock(ct);
-    toku_destroy_workers(&ct->wq, &ct->threadpool);
-    toku_destroy_workers(&ct->checkpoint_wq, &ct->checkpoint_threadpool);
-    toku_kibbutz_destroy(ct->kibbutz);
+    toku_kibbutz_destroy(ct->client_kibbutz);
+    toku_kibbutz_destroy(ct->ct_kibbutz);
+    toku_kibbutz_destroy(ct->checkpointing_kibbutz);
     bjm_destroy(ct->checkpoint_clones_bjm);
     toku_cond_destroy(&ct->flow_control_cond);
     toku_free(ct->table);
@@ -2592,7 +2578,7 @@ int toku_cachetable_unpin_and_remove (
     assert(nb_mutex_writers(&p->value_nb_mutex));
     // grab disk_nb_mutex to ensure any background thread writing
     // out a cloned value completes
-    nb_mutex_lock(&p->disk_nb_mutex, ct->mutex);
+    nb_mutex_lock(&p->disk_nb_mutex, &ct->mutex);
     assert(p->cloned_value_data == NULL);
     //
@@ -2684,7 +2670,7 @@ int toku_cachetable_unpin_and_remove (
         toku_cond_init(&cond, NULL);
         nb_mutex_wait_for_users(
             &p->value_nb_mutex,
-            ct->mutex,
+            &ct->mutex,
             &cond
         );
         toku_cond_destroy(&cond);
@@ -2898,7 +2884,7 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
     // updated. This is not correct. The result is that the checkpoint does not include the proper
     // state of this PAIR.
     //
-    rwlock_write_lock(&ct->pending_lock, ct->mutex);
+    rwlock_write_lock(&ct->pending_lock, &ct->mutex);
     ct->checkpoint_is_beginning = TRUE; // detect threadsafety bugs, must set checkpoint_is_beginning ...
     invariant(ct->checkpoint_prohibited == 0); // ... before testing checkpoint_prohibited
     bjm_reset(ct->checkpoint_clones_bjm);
@@ -3350,7 +3336,7 @@ toku_cleaner_thread (void *cachetable_v)
             cachetable_unlock(ct);
             continue;
         }
-        nb_mutex_lock(&best_pair->value_nb_mutex, ct->mutex);
+        nb_mutex_lock(&best_pair->value_nb_mutex, &ct->mutex);
         // verify a key assumption.
        assert(cleaner_thread_rate_pair(best_pair) > 0);
         if (best_pair->checkpoint_pending) {
...