Commit 800921a7 authored by Zardosht Kasheff's avatar Zardosht Kasheff

refs #61,

 - have closed cachefiles not immedietely free pairs, but set them to the side
 - leave freeing of pairs to the evictor and/or shutdown
 - should a cachefile be reopened before all pairs are freed,
   the pairs belonging to that cachefile are reintegrated into the cachetable
parent dfb30afc
......@@ -380,7 +380,10 @@ class pair_list {
toku_pthread_rwlock_t m_pending_lock_cheap;
void init();
void destroy();
void evict(PAIR pair);
void evict_completely(PAIR pair);
void evict_from_cachetable(PAIR pair);
void evict_from_cachefile(PAIR pair);
void add_to_cachetable_only(PAIR p);
void put(PAIR pair);
PAIR find_pair(CACHEFILE file, CACHEKEY key, uint32_t hash);
void pending_pairs_remove (PAIR p);
......@@ -404,7 +407,6 @@ class pair_list {
private:
void pair_remove (PAIR p);
void cf_pairs_remove (PAIR p);
void remove_from_hash_chain(PAIR p);
void add_to_cf_list (PAIR p);
void add_to_clock (PAIR p);
......@@ -426,16 +428,25 @@ class cachefile_list {
int cachefile_of_iname_in_env(const char *iname_in_env, CACHEFILE *cf);
int cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf);
void add_cf_unlocked(CACHEFILE newcf);
void add_stale_cf(CACHEFILE newcf);
void remove_cf(CACHEFILE cf);
void remove_stale_cf_unlocked(CACHEFILE cf);
FILENUM reserve_filenum();
uint32_t get_new_hash_id_unlocked();
CACHEFILE find_cachefile_unlocked(struct fileid* fileid);
CACHEFILE find_stale_cachefile_unlocked(struct fileid* fileid);
void verify_unused_filenum(FILENUM filenum);
bool evict_some_stale_pair(evictor* ev);
void free_stale_data(evictor* ev);
// access to these fields are protected by the lock
CACHEFILE m_active_head; // head of CACHEFILEs that are active
CACHEFILE m_stale_head; // head of CACHEFILEs that are stale
CACHEFILE m_stale_tail; // tail of CACHEFILEs that are stale
FILENUM m_next_filenum_to_use;
uint32_t m_next_hash_id_to_use;
toku_pthread_rwlock_t m_lock; // this field is publoc so we are still POD
private:
CACHEFILE find_cachefile_in_list_unlocked(CACHEFILE start, struct fileid* fileid);
};
......@@ -500,7 +511,7 @@ const int EVICTION_PERIOD = 1;
//
class evictor {
public:
void init(long _size_limit, pair_list* _pl, KIBBUTZ _kibbutz, uint32_t eviction_period);
void init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, KIBBUTZ _kibbutz, uint32_t eviction_period);
void destroy();
void add_pair_attr(PAIR_ATTR attr);
void remove_pair_attr(PAIR_ATTR attr);
......@@ -533,6 +544,7 @@ class evictor {
int64_t unsafe_read_size_evicting(void) const;
pair_list* m_pl;
cachefile_list* m_cf_list;
int64_t m_size_current; // the sum of the sizes of the pairs in the cachetable
// changes to these two values are protected
// by ev_thread_lock
......
This diff is collapsed.
......@@ -164,10 +164,10 @@ int toku_cachetable_openf(CACHEFILE *,CACHETABLE, const char *fname_in_env, int
// Bind a file to a new cachefile object.
int toku_cachetable_openfd(CACHEFILE *,CACHETABLE, int fd,
const char *fname_relative_to_env);
const char *fname_relative_to_env);
int toku_cachetable_openfd_with_filenum (CACHEFILE *,CACHETABLE, int fd,
const char *fname_in_env,
FILENUM filenum);
const char *fname_in_env,
FILENUM filenum, bool* was_open);
// reserve a unique filenum
FILENUM toku_cachetable_reserve_filenum(CACHETABLE ct);
......
......@@ -3612,6 +3612,7 @@ ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only
FILENUM reserved_filenum;
reserved_filenum = use_filenum;
fname_in_cwd = toku_cachetable_get_fname_in_cwd(cachetable, fname_in_env);
bool was_already_open;
{
int fd = -1;
r = ft_open_file(fname_in_cwd, &fd);
......@@ -3631,13 +3632,12 @@ ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only
if (r) { goto exit; }
}
if (r) { goto exit; }
r=toku_cachetable_openfd_with_filenum(&cf, cachetable, fd, fname_in_env, reserved_filenum);
r=toku_cachetable_openfd_with_filenum(&cf, cachetable, fd, fname_in_env, reserved_filenum, &was_already_open);
if (r) { goto exit; }
}
assert(ft_h->options.nodesize>0);
bool was_already_open;
if (is_create) {
r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft, &was_already_open);
r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft);
if (r==TOKUDB_DICTIONARY_NO_HEADER) {
toku_ft_create(&ft, &ft_h->options, cf, txn);
}
......@@ -3653,7 +3653,7 @@ ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only
// so it is ok for toku_read_ft_and_store_in_cachefile to have read
// the header via toku_read_ft_and_store_in_cachefile
} else {
r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft, &was_already_open);
r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft);
if (r) { goto exit; }
}
if (!ft_h->did_set_flags) {
......
......@@ -464,7 +464,7 @@ void toku_ft_create(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) {
}
// TODO: (Zardosht) get rid of brt parameter
int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_acceptable_lsn, FT *header, bool* was_open)
int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_acceptable_lsn, FT *header)
// If the cachefile already has the header, then just get it.
// If the cachefile has not been initialized, then don't modify anything.
// max_acceptable_lsn is the latest acceptable checkpointed version of the file.
......@@ -473,13 +473,11 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac
FT h;
if ((h = (FT) toku_cachefile_get_userdata(cf))!=0) {
*header = h;
*was_open = true;
assert(brt->options.update_fun == h->update_fun);
assert(brt->options.compare_fun == h->compare_fun);
return 0;
}
}
*was_open = false;
FT h = nullptr;
int r;
{
......
......@@ -112,7 +112,7 @@ void toku_ft_release_reflock(FT ft);
void toku_ft_create(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn);
void toku_ft_free (FT h);
int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_acceptable_lsn, FT *header, bool* was_open);
int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_acceptable_lsn, FT *header);
void toku_ft_note_ft_handle_open(FT ft, FT_HANDLE live);
bool toku_ft_needed_unlocked(FT ft);
......
......@@ -230,7 +230,7 @@ void checkpointer_test::test_pending_bits() {
m_cp.turn_on_pending_bits();
assert(p.checkpoint_pending);
m_cp.m_list->evict(&p);
m_cp.m_list->evict_completely(&p);
//
// 3. Many hash chain entries.
......@@ -251,7 +251,7 @@ void checkpointer_test::test_pending_bits() {
uint32_t full_hash = toku_cachetable_hash(&cf, key);
PAIR pp = m_cp.m_list->find_pair(&cf, key, full_hash);
assert(pp);
m_cp.m_list->evict(pp);
m_cp.m_list->evict_completely(pp);
}
ctbl.list.destroy();
......@@ -389,7 +389,7 @@ void checkpointer_test::test_end_checkpoint() {
uint32_t full_hash = toku_cachetable_hash(&cf, key);
PAIR pp = m_cp.m_list->find_pair(&cf, key, full_hash);
assert(pp);
m_cp.m_list->evict(pp);
m_cp.m_list->evict_completely(pp);
}
m_cp.destroy();
ctbl.list.destroy();
......
......@@ -94,6 +94,7 @@ class evictor_unit_test {
public:
evictor m_ev;
pair_list m_pl;
cachefile_list m_cf_list;
KIBBUTZ m_kb;
void init();
void destroy();
......@@ -111,13 +112,16 @@ class evictor_unit_test {
// initialize this class to run tests
void evictor_unit_test::init() {
ZERO_STRUCT(m_pl);
ZERO_STRUCT(m_cf_list);
m_pl.init();
m_cf_list.init();
m_kb = toku_kibbutz_create(1);
}
// destroy class after tests have run
void evictor_unit_test::destroy() {
m_pl.destroy();
m_cf_list.destroy();
toku_kibbutz_destroy(m_kb);
}
......@@ -125,6 +129,7 @@ void evictor_unit_test::destroy() {
void evictor_unit_test::verify_ev_init(long limit) {
assert(m_ev.m_kibbutz == m_kb);
assert(m_ev.m_pl == &m_pl);
assert(m_ev.m_cf_list == &m_cf_list);
assert(m_ev.m_low_size_watermark == limit);
assert(m_ev.m_num_sleepers == 0);
assert(m_ev.m_run_thread == true);
......@@ -161,7 +166,7 @@ void evictor_unit_test::verify_ev_counts() {
long limit = 10;
long expected_m_size_reserved = limit/4;
ZERO_STRUCT(m_ev);
m_ev.init(limit, &m_pl, m_kb, 0);
m_ev.init(limit, &m_pl, &m_cf_list, m_kb, 0);
this->verify_ev_init(limit);
m_ev.add_to_size_current(1);
......@@ -227,7 +232,7 @@ void evictor_unit_test::verify_ev_m_size_reserved() {
long limit = 400;
long expected_m_size_reserved = 100; //limit/4
ZERO_STRUCT(m_ev);
m_ev.init(limit, &m_pl, m_kb, 0);
m_ev.init(limit, &m_pl, &m_cf_list, m_kb, 0);
this->verify_ev_init(limit);
assert(m_ev.m_size_reserved == expected_m_size_reserved);
m_ev.m_num_eviction_thread_runs = 0;
......@@ -250,7 +255,7 @@ void evictor_unit_test::verify_ev_m_size_reserved() {
void evictor_unit_test::verify_ev_handling_cache_pressure() {
long limit = 400;
ZERO_STRUCT(m_ev);
m_ev.init(limit, &m_pl, m_kb, 0);
m_ev.init(limit, &m_pl, &m_cf_list, m_kb, 0);
this->verify_ev_init(limit);
m_ev.m_low_size_watermark = 400;
m_ev.m_low_size_hysteresis = 400;
......
......@@ -213,6 +213,8 @@ cachetable_test (void) {
// close and reopen cachefile so we can do some simple prefetch tests
toku_cachefile_close(&f1, false, ZERO_LSN);
toku_cachetable_close(&ct);
toku_cachetable_create(&ct, test_limit, ZERO_LSN, NULL_LOGGER);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
//
// verify that a prefetch of the node will succeed
......
This diff is collapsed.
......@@ -116,6 +116,27 @@ PATENT RIGHTS GRANT:
// update operations.
//
static int remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int r;
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB* db = arg->dbp[db_index];
r = (db)->close(db, 0); CKERR(r);
char name[30];
ZERO_ARRAY(name);
get_ith_table_name(name, sizeof(name), db_index);
r = arg->env->dbremove(arg->env, null_txn, name, nullptr, 0);
CKERR(r);
r = db_create(&(arg->dbp[db_index]), arg->env, 0);
assert(r == 0);
// TODO: Need to call before_db_open_hook() and after_db_open_hook()
r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, nullptr, DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
return 0;
}
static void
stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
//
......
......@@ -1635,27 +1635,6 @@ get_ith_table_name(char *buf, size_t len, int i) {
DB_TXN * const null_txn = 0;
static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int r;
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB* db = arg->dbp[db_index];
r = (db)->close(db, 0); CKERR(r);
char name[30];
ZERO_ARRAY(name);
get_ith_table_name(name, sizeof(name), db_index);
r = arg->env->dbremove(arg->env, null_txn, name, nullptr, 0);
CKERR(r);
r = db_create(&(arg->dbp[db_index]), arg->env, 0);
assert(r == 0);
// TODO: Need to call before_db_open_hook() and after_db_open_hook()
r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, nullptr, DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
return 0;
}
// For each line of engine status output, look for lines that contain substrings
// that match any of the strings in the pattern string. The pattern string contains
// 0 or more strings separated by the '|' character, kind of like a regex.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment