Commit 9fd9a26e authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

refs #4634, make flow control depend on a cachetable cond variable and not the workqueue

git-svn-id: file:///svn/toku/tokudb@45651 c7de825b-a66e-492c-adef-691d508d4ae1
parent d843fcc1
...@@ -190,14 +190,17 @@ struct cachetable { ...@@ -190,14 +190,17 @@ struct cachetable {
// variable used by the checkpoint thread to know // variable used by the checkpoint thread to know
// when all work induced by cloning on client threads is done // when all work induced by cloning on client threads is done
BACKGROUND_JOB_MANAGER checkpoint_clones_bjm; BACKGROUND_JOB_MANAGER checkpoint_clones_bjm;
// temporary, for handling flow control
toku_cond_t flow_control_cond;
}; };
void void
toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) { toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
if (!ct_status.initialized) if (!ct_status.initialized) {
status_init(); status_init();
}
STATUS_VALUE(CT_MISS) = cachetable_miss; STATUS_VALUE(CT_MISS) = cachetable_miss;
STATUS_VALUE(CT_MISSTIME) = cachetable_misstime; STATUS_VALUE(CT_MISSTIME) = cachetable_misstime;
STATUS_VALUE(CT_PUTS) = cachetable_puts; STATUS_VALUE(CT_PUTS) = cachetable_puts;
...@@ -214,7 +217,6 @@ toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) { ...@@ -214,7 +217,6 @@ toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
STATUS_VALUE(CT_CLEANER_EXECUTIONS) = cleaner_executions; STATUS_VALUE(CT_CLEANER_EXECUTIONS) = cleaner_executions;
STATUS_VALUE(CT_CLEANER_PERIOD) = toku_get_cleaner_period_unlocked(ct); STATUS_VALUE(CT_CLEANER_PERIOD) = toku_get_cleaner_period_unlocked(ct);
STATUS_VALUE(CT_CLEANER_ITERATIONS) = toku_get_cleaner_iterations_unlocked(ct); STATUS_VALUE(CT_CLEANER_ITERATIONS) = toku_get_cleaner_iterations_unlocked(ct);
*statp = ct_status; *statp = ct_status;
} }
...@@ -245,7 +247,13 @@ static inline void cachetable_unlock(CACHETABLE ct __attribute__((unused))) { ...@@ -245,7 +247,13 @@ static inline void cachetable_unlock(CACHETABLE ct __attribute__((unused))) {
static inline void cachetable_wait_write(CACHETABLE ct) { static inline void cachetable_wait_write(CACHETABLE ct) {
// if we're writing more than half the data in the cachetable // if we're writing more than half the data in the cachetable
while (2*ct->size_evicting > ct->size_current) { while (2*ct->size_evicting > ct->size_current) {
workqueue_wait_write(&ct->wq, 0); toku_cond_wait(&ct->flow_control_cond, ct->mutex);
}
}
static inline void cachetable_wakeup_write(CACHETABLE ct) {
if (8*ct->size_evicting <= ct->size_current) {
toku_cond_broadcast(&ct->flow_control_cond);
} }
} }
...@@ -397,6 +405,7 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_l ...@@ -397,6 +405,7 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_l
ct->cleaner_iterations = 1; // default is one iteration ct->cleaner_iterations = 1; // default is one iteration
ct->env_dir = toku_xstrdup("."); ct->env_dir = toku_xstrdup(".");
bjm_init(&ct->checkpoint_clones_bjm); bjm_init(&ct->checkpoint_clones_bjm);
toku_cond_init(&ct->flow_control_cond, NULL);
*result = ct; *result = ct;
return 0; return 0;
} }
...@@ -1045,9 +1054,7 @@ static void cachetable_evict_pair(CACHETABLE ct, PAIR p) { ...@@ -1045,9 +1054,7 @@ static void cachetable_evict_pair(CACHETABLE ct, PAIR p) {
// 1/8 of the size of the cachetable // 1/8 of the size of the cachetable
ct->size_evicting -= old_size; ct->size_evicting -= old_size;
assert(ct->size_evicting >= 0); assert(ct->size_evicting >= 0);
if (8*ct->size_evicting <= ct->size_current) { cachetable_wakeup_write(ct);
workqueue_wakeup_write(&ct->wq, 0);
}
cachetable_complete_write_pair(ct, p); cachetable_complete_write_pair(ct, p);
} }
...@@ -1110,9 +1117,7 @@ static void do_partial_eviction(CACHETABLE ct, PAIR p) { ...@@ -1110,9 +1117,7 @@ static void do_partial_eviction(CACHETABLE ct, PAIR p) {
assert(ct->size_evicting >= p->size_evicting_estimate); assert(ct->size_evicting >= p->size_evicting_estimate);
ct->size_evicting -= p->size_evicting_estimate; ct->size_evicting -= p->size_evicting_estimate;
if (8*ct->size_evicting <= ct->size_current) { cachetable_wakeup_write(ct);
workqueue_wakeup_write(&ct->wq, 0);
}
nb_mutex_unlock(&p->value_nb_mutex); nb_mutex_unlock(&p->value_nb_mutex);
} }
...@@ -2560,6 +2565,7 @@ toku_cachetable_close (CACHETABLE *ctp) { ...@@ -2560,6 +2565,7 @@ toku_cachetable_close (CACHETABLE *ctp) {
toku_destroy_workers(&ct->checkpoint_wq, &ct->checkpoint_threadpool); toku_destroy_workers(&ct->checkpoint_wq, &ct->checkpoint_threadpool);
toku_kibbutz_destroy(ct->kibbutz); toku_kibbutz_destroy(ct->kibbutz);
bjm_destroy(ct->checkpoint_clones_bjm); bjm_destroy(ct->checkpoint_clones_bjm);
toku_cond_destroy(&ct->flow_control_cond);
toku_free(ct->table); toku_free(ct->table);
toku_free(ct->env_dir); toku_free(ct->env_dir);
toku_free(ct); toku_free(ct);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment