Commit 2e83023f authored by Zardosht Kasheff, committed by Yoni Fogel

closes #6068, merge to main

git-svn-id: file:///svn/toku/tokudb@53561 c7de825b-a66e-492c-adef-691d508d4ae1
parent 40c1bc80
@@ -96,6 +96,13 @@ class pair_list;
 struct cachefile {
     CACHEFILE next;
     CACHEFILE prev;
+    // these next two fields are protected by the cachetable's list lock
+    // and are maintained whenever a PAIR is added to or removed from
+    // the cachetable. As of Riddler, this linked list is used only to
+    // make cachetable_flush_cachefile more efficient
+    PAIR cf_head;
+    uint32_t num_pairs; // the number of PAIRs in the cachetable that belong to this cachefile
     bool for_checkpoint; //True if part of the in-progress checkpoint
     // If set and the cachefile closes, the file will be removed.
@@ -193,6 +200,15 @@ struct ctpair {
     // pending_next, pending_prev represent a non-circular doubly-linked list.
     PAIR pending_next;
     PAIR pending_prev;
+    // cf_next, cf_prev represent a non-circular doubly-linked list:
+    // the entries for the PAIRs belonging to one cachefile, protected
+    // by the list lock of the PAIR's pair_list. They make
+    // cachetable_flush_cachefile cheaper, so that we do not need
+    // to search the entire cachetable to find a particular cachefile's
+    // PAIRs
+    PAIR cf_next;
+    PAIR cf_prev;
 };
 //
@@ -297,6 +313,8 @@ public:
 private:
     void pair_remove (PAIR p);
+    void cf_pairs_remove (PAIR p);
+    void add_to_cf_list (PAIR p);
     void add_to_clock (PAIR p);
     PAIR remove_from_hash_chain (PAIR remove_me, PAIR list);
 };
...
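The cf_head/cf_next/cf_prev fields added above form a classic intrusive doubly-linked list, threaded through the PAIRs themselves, so that all PAIRs of one cachefile can be walked without scanning the whole cachetable. A minimal standalone sketch of the pattern, with illustrative stand-in names (File/Pair are not TokuDB's types):

    #include <cassert>
    #include <cstdint>

    struct Pair;

    struct File {                    // stand-in for struct cachefile
        Pair *head = nullptr;        // analogous to cf_head
        uint32_t num_pairs = 0;      // analogous to num_pairs
    };

    struct Pair {                    // stand-in for struct ctpair
        File *file = nullptr;        // analogous to ctpair::cachefile
        Pair *next = nullptr;        // analogous to cf_next
        Pair *prev = nullptr;        // analogous to cf_prev
        bool dirty = false;          // simplified dirty bit for the later sketches
    };

    // Push onto the front of the file's list, mirroring add_to_cf_list.
    static void file_list_add(Pair *p) {
        File *f = p->file;
        if (f->head) {
            f->head->prev = p;
        }
        p->next = f->head;
        p->prev = nullptr;
        f->head = p;
        f->num_pairs++;
    }

    // Unlink from anywhere in the list, mirroring cf_pairs_remove.
    static void file_list_remove(Pair *p) {
        File *f = p->file;
        if (p->next) {
            p->next->prev = p->prev;
        }
        if (p->prev) {
            p->prev->next = p->next;
        } else {
            assert(f->head == p);    // no prev means p is the head
            f->head = p->next;
        }
        p->next = p->prev = nullptr;
        f->num_pairs--;
    }

Because the links live inside the PAIR, add and remove are O(1) and allocation-free; the cost is two extra pointers per PAIR and the requirement (noted in the comments above) that both operations run under the pair_list's write list lock.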
@@ -803,6 +803,7 @@ void pair_init(PAIR p,
     p->clock_next = p->clock_prev = NULL;
     p->pending_next = p->pending_prev = NULL;
+    p->cf_next = p->cf_prev = NULL;
     p->hash_chain = NULL;
 }
@@ -2359,6 +2360,41 @@ static void cachetable_flush_pair_for_close(void* extra) {
     toku_free(args);
 }
+
+static void flush_pair_for_close_on_background_thread(
+    PAIR p,
+    BACKGROUND_JOB_MANAGER bjm,
+    CACHETABLE ct
+    )
+{
+    pair_lock(p);
+    assert(p->value_rwlock.users() == 0);
+    assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
+    assert(!p->cloned_value_data);
+    if (p->dirty == CACHETABLE_DIRTY) {
+        int r = bjm_add_background_job(bjm);
+        assert_zero(r);
+        struct pair_flush_for_close *XMALLOC(args);
+        args->p = p;
+        args->bjm = bjm;
+        toku_kibbutz_enq(ct->ct_kibbutz, cachetable_flush_pair_for_close, args);
+    }
+    pair_unlock(p);
+}
+
+static void remove_pair_for_close(PAIR p, CACHETABLE ct) {
+    pair_lock(p);
+    assert(p->value_rwlock.users() == 0);
+    assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
+    assert(!p->cloned_value_data);
+    assert(p->dirty == CACHETABLE_CLEAN);
+    assert(p->refcount == 0);
+    // TODO: maybe break up this function
+    // so that the write lock does not need to be held for the entire free
+    cachetable_maybe_remove_and_free_pair(&ct->list, &ct->ev, p);
+}
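flush_pair_for_close_on_background_thread uses the background job manager as a completion barrier: each dirty PAIR handed to the kibbutz registers one job, and the caller later blocks until every job has finished. Assuming the usual count-plus-condition-variable semantics for bjm_add_background_job / bjm_remove_background_job / bjm_wait_for_jobs_to_finish, the behavior this code depends on can be sketched as:

    #include <condition_variable>
    #include <mutex>

    // A minimal stand-in for BACKGROUND_JOB_MANAGER's counting behavior.
    class JobBarrier {
        std::mutex m;
        std::condition_variable cv;
        int outstanding = 0;
    public:
        void add_job() {                        // cf. bjm_add_background_job
            std::lock_guard<std::mutex> lk(m);
            outstanding++;
        }
        void job_done() {                       // cf. bjm_remove_background_job,
            std::lock_guard<std::mutex> lk(m);  // called by the worker after its flush
            if (--outstanding == 0) {
                cv.notify_all();
            }
        }
        void wait_for_jobs() {                  // cf. bjm_wait_for_jobs_to_finish
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [this] { return outstanding == 0; });
        }
    };

The real manager additionally refuses new jobs once shutdown has begun (hence the assert_zero on bjm_add_background_job's return value); the sketch models only the count-and-wait behavior that cachetable_flush_cachefile relies on.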
 // Flush (write to disk) all of the pairs that belong to a cachefile (or all pairs
 // if the cachefile is NULL).
 // Must be holding cachetable lock on entry.
@@ -2383,85 +2419,70 @@ static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
     // no jobs added to the kibbutz. This implies that the only work other
     // threads may be doing is work by the writer threads.
     //
-    unsigned i;
-    unsigned num_pairs = 0;
-    unsigned list_size = 256;
-    PAIR *list = NULL;
-    XMALLOC_N(list_size, list);
-    // because we use a linked list head we don't own
-    // (the checkpoint thread owns m_checkpoint_head),
-    // we grab the write list lock here
-    ct->list.write_list_lock();
-    PAIR p;
-    // Make a list of pairs that belong to this cachefile.
-    for (i = 0, p = ct->list.m_checkpoint_head;
-         i < ct->list.m_n_in_table;
-         i++, p = p->clock_next)
-    {
-        if (cf == 0 || p->cachefile == cf) {
-            if (num_pairs == list_size) {
-                list_size *= 2;
-                XREALLOC_N(list_size, list);
-            }
-            list[num_pairs++] = p;
-        }
-    }
-    ct->list.write_list_unlock();
     // first write out dirty PAIRs
     BACKGROUND_JOB_MANAGER bjm = NULL;
     bjm_init(&bjm);
-    for (i=0; i < num_pairs; i++) {
-        p = list[i];
-        pair_lock(p);
-        assert(p->value_rwlock.users() == 0);
-        assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
-        assert(!p->cloned_value_data);
-        if (p->dirty == CACHETABLE_DIRTY) {
-            int r = bjm_add_background_job(bjm);
-            assert_zero(r);
-            struct pair_flush_for_close *XMALLOC(args);
-            args->p = p;
-            args->bjm = bjm;
-            toku_kibbutz_enq(ct->ct_kibbutz, cachetable_flush_pair_for_close, args);
-        }
-        pair_unlock(p);
-    }
+    ct->list.write_list_lock(); // TODO: (Zardosht) verify that this lock is unnecessary to take here
+    PAIR p = NULL;
+    uint32_t i;
+    // write out dirty PAIRs
+    if (cf) {
+        for (i = 0, p = cf->cf_head;
+             i < cf->num_pairs;
+             i++, p = p->cf_next)
+        {
+            flush_pair_for_close_on_background_thread(p, bjm, ct);
+        }
+    }
+    else {
+        for (i = 0, p = ct->list.m_checkpoint_head;
+             i < ct->list.m_n_in_table;
+             i++, p = p->clock_next)
+        {
+            flush_pair_for_close_on_background_thread(p, bjm, ct);
+        }
+    }
+    ct->list.write_list_unlock();
     bjm_wait_for_jobs_to_finish(bjm);
     bjm_destroy(bjm);
-    // now get rid of everything
+    // now that everything is clean, get rid of everything
     ct->list.write_list_lock();
-    for (i=0; i < num_pairs; i++) {
-        p = list[i];
-        pair_lock(p);
-        assert(p->value_rwlock.users() == 0);
-        assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
-        assert(!p->cloned_value_data);
-        assert(p->dirty == CACHETABLE_CLEAN);
-        assert(p->refcount == 0);
-        // TODO: maybe break up this function
-        // so that the write lock does not need to be held for the entire free
-        cachetable_maybe_remove_and_free_pair(&ct->list, &ct->ev, p);
-    }
+    if (cf) {
+        while (cf->num_pairs > 0) {
+            p = cf->cf_head;
+            remove_pair_for_close(p, ct);
+        }
+    }
+    else {
+        while (ct->list.m_n_in_table > 0) {
+            p = ct->list.m_checkpoint_head;
+            remove_pair_for_close(p, ct);
+        }
+    }
     // assert here that the cachefile is flushed by walking the
     // pair_list and finding no PAIRs belonging to this cachefile
-    for (i = 0, p = ct->list.m_checkpoint_head;
-         i < ct->list.m_n_in_table;
-         i++, p = p->clock_next)
-    {
-        assert(p->cachefile != cf);
-    }
+#ifdef TOKU_DEBUG_PARANOID
+    if (cf) {
+        for (i = 0, p = ct->list.m_checkpoint_head;
+             i < ct->list.m_n_in_table;
+             i++, p = p->clock_next)
+        {
+            assert(p->cachefile != cf);
+        }
+    }
+#endif
     ct->list.write_list_unlock();
     if (cf) {
         bjm_reset(cf->bjm);
     }
-    toku_free(list);
 }
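The rewritten function is a two-phase close: phase one walks the cachefile's PAIR list and fans the dirty flushes out to worker threads, then waits on the barrier; phase two re-takes the write list lock and pops PAIRs off the head of the (now all-clean) list until it is empty. In miniature, using the File/Pair/JobBarrier sketches above, with std::thread standing in for the kibbutz:

    #include <thread>
    #include <vector>

    static void flush_file_for_close(File *f, JobBarrier &bjm) {
        std::vector<std::thread> workers;
        // Phase 1: schedule a background flush for every dirty PAIR.
        for (Pair *p = f->head; p != nullptr; p = p->next) {
            if (p->dirty) {
                bjm.add_job();
                workers.emplace_back([p, &bjm] {
                    // ... write p's value to disk here ...
                    p->dirty = false;
                    bjm.job_done();
                });
            }
        }
        bjm.wait_for_jobs();          // barrier: every PAIR is now clean
        for (std::thread &t : workers) {
            t.join();
        }
        // Phase 2: unlink and free from the head until the list is empty,
        // mirroring the while (cf->num_pairs > 0) loop in the diff.
        while (f->num_pairs > 0) {
            Pair *victim = f->head;
            file_list_remove(victim);
            delete victim;            // cf. cachetable_maybe_remove_and_free_pair
        }
    }

The key win over the removed code is visible in phase one: instead of building a temporary array of matching PAIRs via an O(n) scan of the whole table (with XMALLOC_N/XREALLOC_N growth along the way), the function now follows the per-cachefile links directly, so closing a small cachefile no longer pays for every PAIR in the cachetable.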
 /* Requires that no locks be held that are used by the checkpoint logic */
@@ -3225,6 +3246,7 @@ void pair_list::put(PAIR p) {
     assert(pp == NULL);
     this->add_to_clock(p);
+    this->add_to_cf_list(p);
     uint32_t h = p->fullhash & (m_table_size - 1);
     p->hash_chain = m_table[h];
     m_table[h] = p;
@@ -3238,6 +3260,7 @@ void pair_list::put(PAIR p) {
 void pair_list::evict(PAIR p) {
     this->pair_remove(p);
     this->pending_pairs_remove(p);
+    this->cf_pairs_remove(p);
     assert(m_n_in_table > 0);
     m_n_in_table--;
@@ -3306,6 +3329,23 @@ void pair_list::pending_pairs_remove (PAIR p) {
     p->pending_prev = p->pending_next = NULL;
 }
+
+// remove the PAIR from the linked list of PAIRs belonging
+// to the same cachefile
+void pair_list::cf_pairs_remove(PAIR p) {
+    CACHEFILE cf = p->cachefile;
+    if (p->cf_next) {
+        p->cf_next->cf_prev = p->cf_prev;
+    }
+    if (p->cf_prev) {
+        p->cf_prev->cf_next = p->cf_next;
+    }
+    else if (p->cachefile->cf_head == p) {
+        cf->cf_head = p->cf_next;
+    }
+    p->cf_prev = p->cf_next = NULL;
+    cf->num_pairs--;
+}
 // Returns a pair from the pair list, using the given
 // pair. If the pair cannot be found, null is returned.
@@ -3355,6 +3395,21 @@ void pair_list::add_to_clock (PAIR p) {
     }
 }
+
+// add the PAIR to the linked list of PAIRs belonging
+// to the same cachefile. This linked list is used
+// in cachetable_flush_cachefile.
+void pair_list::add_to_cf_list(PAIR p) {
+    CACHEFILE cf = p->cachefile;
+    if (cf->cf_head) {
+        cf->cf_head->cf_prev = p;
+    }
+    p->cf_next = cf->cf_head;
+    p->cf_prev = NULL;
+    cf->cf_head = p;
+    cf->num_pairs++;
+}
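A quick exercise of the sketch functions above, checking the invariants that put and evict maintain through these two methods: num_pairs always equals the length of the chain reachable from the head, and unlinking works from the head, middle, and tail alike:

    int main() {
        File f;
        Pair *a = new Pair{&f};
        Pair *b = new Pair{&f};
        Pair *c = new Pair{&f};
        file_list_add(a);                 // list: a
        file_list_add(b);                 // list: b -> a
        file_list_add(c);                 // list: c -> b -> a
        assert(f.num_pairs == 3 && f.head == c);
        file_list_remove(b);              // unlink from the middle
        assert(c->next == a && a->prev == c);
        file_list_remove(c);              // unlink the head
        assert(f.head == a && a->prev == nullptr);
        file_list_remove(a);              // unlink the last element
        assert(f.head == nullptr && f.num_pairs == 0);
        delete a; delete b; delete c;
        return 0;
    }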
 // test function
 //
 // grabs and releases write list lock
...