Commit 64cdcbc2 authored by Zardosht Kasheff's avatar Zardosht Kasheff

refs #61,

 - have closed cachefiles not immediately free pairs, but set them aside
 - leave freeing of pairs to the evictor and/or shutdown
 - should a cachefile be reopened before all pairs are freed,
   the pairs belonging to that cachefile are reintegrated into the cachetable
parent 013bea40
...@@ -380,7 +380,10 @@ class pair_list { ...@@ -380,7 +380,10 @@ class pair_list {
toku_pthread_rwlock_t m_pending_lock_cheap; toku_pthread_rwlock_t m_pending_lock_cheap;
void init(); void init();
void destroy(); void destroy();
void evict(PAIR pair); void evict_completely(PAIR pair);
void evict_from_cachetable(PAIR pair);
void evict_from_cachefile(PAIR pair);
void add_to_cachetable_only(PAIR p);
void put(PAIR pair); void put(PAIR pair);
PAIR find_pair(CACHEFILE file, CACHEKEY key, uint32_t hash); PAIR find_pair(CACHEFILE file, CACHEKEY key, uint32_t hash);
void pending_pairs_remove (PAIR p); void pending_pairs_remove (PAIR p);
...@@ -404,7 +407,6 @@ class pair_list { ...@@ -404,7 +407,6 @@ class pair_list {
private: private:
void pair_remove (PAIR p); void pair_remove (PAIR p);
void cf_pairs_remove (PAIR p);
void remove_from_hash_chain(PAIR p); void remove_from_hash_chain(PAIR p);
void add_to_cf_list (PAIR p); void add_to_cf_list (PAIR p);
void add_to_clock (PAIR p); void add_to_clock (PAIR p);
...@@ -426,16 +428,25 @@ class cachefile_list { ...@@ -426,16 +428,25 @@ class cachefile_list {
int cachefile_of_iname_in_env(const char *iname_in_env, CACHEFILE *cf); int cachefile_of_iname_in_env(const char *iname_in_env, CACHEFILE *cf);
int cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf); int cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf);
void add_cf_unlocked(CACHEFILE newcf); void add_cf_unlocked(CACHEFILE newcf);
void add_stale_cf(CACHEFILE newcf);
void remove_cf(CACHEFILE cf); void remove_cf(CACHEFILE cf);
void remove_stale_cf_unlocked(CACHEFILE cf);
FILENUM reserve_filenum(); FILENUM reserve_filenum();
uint32_t get_new_hash_id_unlocked(); uint32_t get_new_hash_id_unlocked();
CACHEFILE find_cachefile_unlocked(struct fileid* fileid); CACHEFILE find_cachefile_unlocked(struct fileid* fileid);
CACHEFILE find_stale_cachefile_unlocked(struct fileid* fileid);
void verify_unused_filenum(FILENUM filenum); void verify_unused_filenum(FILENUM filenum);
bool evict_some_stale_pair(evictor* ev);
void free_stale_data(evictor* ev);
// access to these fields are protected by the lock // access to these fields are protected by the lock
CACHEFILE m_active_head; // head of CACHEFILEs that are active CACHEFILE m_active_head; // head of CACHEFILEs that are active
CACHEFILE m_stale_head; // head of CACHEFILEs that are stale
CACHEFILE m_stale_tail; // tail of CACHEFILEs that are stale
FILENUM m_next_filenum_to_use; FILENUM m_next_filenum_to_use;
uint32_t m_next_hash_id_to_use; uint32_t m_next_hash_id_to_use;
toku_pthread_rwlock_t m_lock; // this field is public so we are still POD toku_pthread_rwlock_t m_lock; // this field is public so we are still POD
private:
CACHEFILE find_cachefile_in_list_unlocked(CACHEFILE start, struct fileid* fileid);
}; };
...@@ -500,7 +511,7 @@ const int EVICTION_PERIOD = 1; ...@@ -500,7 +511,7 @@ const int EVICTION_PERIOD = 1;
// //
class evictor { class evictor {
public: public:
void init(long _size_limit, pair_list* _pl, KIBBUTZ _kibbutz, uint32_t eviction_period); void init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, KIBBUTZ _kibbutz, uint32_t eviction_period);
void destroy(); void destroy();
void add_pair_attr(PAIR_ATTR attr); void add_pair_attr(PAIR_ATTR attr);
void remove_pair_attr(PAIR_ATTR attr); void remove_pair_attr(PAIR_ATTR attr);
...@@ -533,6 +544,7 @@ class evictor { ...@@ -533,6 +544,7 @@ class evictor {
int64_t unsafe_read_size_evicting(void) const; int64_t unsafe_read_size_evicting(void) const;
pair_list* m_pl; pair_list* m_pl;
cachefile_list* m_cf_list;
int64_t m_size_current; // the sum of the sizes of the pairs in the cachetable int64_t m_size_current; // the sum of the sizes of the pairs in the cachetable
// changes to these two values are protected // changes to these two values are protected
// by ev_thread_lock // by ev_thread_lock
......
This diff is collapsed.
...@@ -164,10 +164,10 @@ int toku_cachetable_openf(CACHEFILE *,CACHETABLE, const char *fname_in_env, int ...@@ -164,10 +164,10 @@ int toku_cachetable_openf(CACHEFILE *,CACHETABLE, const char *fname_in_env, int
// Bind a file to a new cachefile object. // Bind a file to a new cachefile object.
int toku_cachetable_openfd(CACHEFILE *,CACHETABLE, int fd, int toku_cachetable_openfd(CACHEFILE *,CACHETABLE, int fd,
const char *fname_relative_to_env); const char *fname_relative_to_env);
int toku_cachetable_openfd_with_filenum (CACHEFILE *,CACHETABLE, int fd, int toku_cachetable_openfd_with_filenum (CACHEFILE *,CACHETABLE, int fd,
const char *fname_in_env, const char *fname_in_env,
FILENUM filenum); FILENUM filenum, bool* was_open);
// reserve a unique filenum // reserve a unique filenum
FILENUM toku_cachetable_reserve_filenum(CACHETABLE ct); FILENUM toku_cachetable_reserve_filenum(CACHETABLE ct);
......
...@@ -3612,6 +3612,7 @@ ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only ...@@ -3612,6 +3612,7 @@ ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only
FILENUM reserved_filenum; FILENUM reserved_filenum;
reserved_filenum = use_filenum; reserved_filenum = use_filenum;
fname_in_cwd = toku_cachetable_get_fname_in_cwd(cachetable, fname_in_env); fname_in_cwd = toku_cachetable_get_fname_in_cwd(cachetable, fname_in_env);
bool was_already_open;
{ {
int fd = -1; int fd = -1;
r = ft_open_file(fname_in_cwd, &fd); r = ft_open_file(fname_in_cwd, &fd);
...@@ -3631,13 +3632,12 @@ ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only ...@@ -3631,13 +3632,12 @@ ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only
if (r) { goto exit; } if (r) { goto exit; }
} }
if (r) { goto exit; } if (r) { goto exit; }
r=toku_cachetable_openfd_with_filenum(&cf, cachetable, fd, fname_in_env, reserved_filenum); r=toku_cachetable_openfd_with_filenum(&cf, cachetable, fd, fname_in_env, reserved_filenum, &was_already_open);
if (r) { goto exit; } if (r) { goto exit; }
} }
assert(ft_h->options.nodesize>0); assert(ft_h->options.nodesize>0);
bool was_already_open;
if (is_create) { if (is_create) {
r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft, &was_already_open); r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft);
if (r==TOKUDB_DICTIONARY_NO_HEADER) { if (r==TOKUDB_DICTIONARY_NO_HEADER) {
toku_ft_create(&ft, &ft_h->options, cf, txn); toku_ft_create(&ft, &ft_h->options, cf, txn);
} }
...@@ -3653,7 +3653,7 @@ ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only ...@@ -3653,7 +3653,7 @@ ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only
// so it is ok for toku_read_ft_and_store_in_cachefile to have read // so it is ok for toku_read_ft_and_store_in_cachefile to have read
// the header via toku_read_ft_and_store_in_cachefile // the header via toku_read_ft_and_store_in_cachefile
} else { } else {
r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft, &was_already_open); r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft);
if (r) { goto exit; } if (r) { goto exit; }
} }
if (!ft_h->did_set_flags) { if (!ft_h->did_set_flags) {
......
...@@ -464,7 +464,7 @@ void toku_ft_create(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) { ...@@ -464,7 +464,7 @@ void toku_ft_create(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) {
} }
// TODO: (Zardosht) get rid of brt parameter // TODO: (Zardosht) get rid of brt parameter
int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_acceptable_lsn, FT *header, bool* was_open) int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_acceptable_lsn, FT *header)
// If the cachefile already has the header, then just get it. // If the cachefile already has the header, then just get it.
// If the cachefile has not been initialized, then don't modify anything. // If the cachefile has not been initialized, then don't modify anything.
// max_acceptable_lsn is the latest acceptable checkpointed version of the file. // max_acceptable_lsn is the latest acceptable checkpointed version of the file.
...@@ -473,13 +473,11 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac ...@@ -473,13 +473,11 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac
FT h; FT h;
if ((h = (FT) toku_cachefile_get_userdata(cf))!=0) { if ((h = (FT) toku_cachefile_get_userdata(cf))!=0) {
*header = h; *header = h;
*was_open = true;
assert(brt->options.update_fun == h->update_fun); assert(brt->options.update_fun == h->update_fun);
assert(brt->options.compare_fun == h->compare_fun); assert(brt->options.compare_fun == h->compare_fun);
return 0; return 0;
} }
} }
*was_open = false;
FT h = nullptr; FT h = nullptr;
int r; int r;
{ {
......
...@@ -112,7 +112,7 @@ void toku_ft_release_reflock(FT ft); ...@@ -112,7 +112,7 @@ void toku_ft_release_reflock(FT ft);
void toku_ft_create(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn); void toku_ft_create(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn);
void toku_ft_free (FT h); void toku_ft_free (FT h);
int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_acceptable_lsn, FT *header, bool* was_open); int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_acceptable_lsn, FT *header);
void toku_ft_note_ft_handle_open(FT ft, FT_HANDLE live); void toku_ft_note_ft_handle_open(FT ft, FT_HANDLE live);
bool toku_ft_needed_unlocked(FT ft); bool toku_ft_needed_unlocked(FT ft);
......
...@@ -230,7 +230,7 @@ void checkpointer_test::test_pending_bits() { ...@@ -230,7 +230,7 @@ void checkpointer_test::test_pending_bits() {
m_cp.turn_on_pending_bits(); m_cp.turn_on_pending_bits();
assert(p.checkpoint_pending); assert(p.checkpoint_pending);
m_cp.m_list->evict(&p); m_cp.m_list->evict_completely(&p);
// //
// 3. Many hash chain entries. // 3. Many hash chain entries.
...@@ -251,7 +251,7 @@ void checkpointer_test::test_pending_bits() { ...@@ -251,7 +251,7 @@ void checkpointer_test::test_pending_bits() {
uint32_t full_hash = toku_cachetable_hash(&cf, key); uint32_t full_hash = toku_cachetable_hash(&cf, key);
PAIR pp = m_cp.m_list->find_pair(&cf, key, full_hash); PAIR pp = m_cp.m_list->find_pair(&cf, key, full_hash);
assert(pp); assert(pp);
m_cp.m_list->evict(pp); m_cp.m_list->evict_completely(pp);
} }
ctbl.list.destroy(); ctbl.list.destroy();
...@@ -389,7 +389,7 @@ void checkpointer_test::test_end_checkpoint() { ...@@ -389,7 +389,7 @@ void checkpointer_test::test_end_checkpoint() {
uint32_t full_hash = toku_cachetable_hash(&cf, key); uint32_t full_hash = toku_cachetable_hash(&cf, key);
PAIR pp = m_cp.m_list->find_pair(&cf, key, full_hash); PAIR pp = m_cp.m_list->find_pair(&cf, key, full_hash);
assert(pp); assert(pp);
m_cp.m_list->evict(pp); m_cp.m_list->evict_completely(pp);
} }
m_cp.destroy(); m_cp.destroy();
ctbl.list.destroy(); ctbl.list.destroy();
......
...@@ -94,6 +94,7 @@ class evictor_unit_test { ...@@ -94,6 +94,7 @@ class evictor_unit_test {
public: public:
evictor m_ev; evictor m_ev;
pair_list m_pl; pair_list m_pl;
cachefile_list m_cf_list;
KIBBUTZ m_kb; KIBBUTZ m_kb;
void init(); void init();
void destroy(); void destroy();
...@@ -111,13 +112,16 @@ class evictor_unit_test { ...@@ -111,13 +112,16 @@ class evictor_unit_test {
// initialize this class to run tests // initialize this class to run tests
void evictor_unit_test::init() { void evictor_unit_test::init() {
ZERO_STRUCT(m_pl); ZERO_STRUCT(m_pl);
ZERO_STRUCT(m_cf_list);
m_pl.init(); m_pl.init();
m_cf_list.init();
m_kb = toku_kibbutz_create(1); m_kb = toku_kibbutz_create(1);
} }
// destroy class after tests have run // destroy class after tests have run
void evictor_unit_test::destroy() { void evictor_unit_test::destroy() {
m_pl.destroy(); m_pl.destroy();
m_cf_list.destroy();
toku_kibbutz_destroy(m_kb); toku_kibbutz_destroy(m_kb);
} }
...@@ -125,6 +129,7 @@ void evictor_unit_test::destroy() { ...@@ -125,6 +129,7 @@ void evictor_unit_test::destroy() {
void evictor_unit_test::verify_ev_init(long limit) { void evictor_unit_test::verify_ev_init(long limit) {
assert(m_ev.m_kibbutz == m_kb); assert(m_ev.m_kibbutz == m_kb);
assert(m_ev.m_pl == &m_pl); assert(m_ev.m_pl == &m_pl);
assert(m_ev.m_cf_list == &m_cf_list);
assert(m_ev.m_low_size_watermark == limit); assert(m_ev.m_low_size_watermark == limit);
assert(m_ev.m_num_sleepers == 0); assert(m_ev.m_num_sleepers == 0);
assert(m_ev.m_run_thread == true); assert(m_ev.m_run_thread == true);
...@@ -161,7 +166,7 @@ void evictor_unit_test::verify_ev_counts() { ...@@ -161,7 +166,7 @@ void evictor_unit_test::verify_ev_counts() {
long limit = 10; long limit = 10;
long expected_m_size_reserved = limit/4; long expected_m_size_reserved = limit/4;
ZERO_STRUCT(m_ev); ZERO_STRUCT(m_ev);
m_ev.init(limit, &m_pl, m_kb, 0); m_ev.init(limit, &m_pl, &m_cf_list, m_kb, 0);
this->verify_ev_init(limit); this->verify_ev_init(limit);
m_ev.add_to_size_current(1); m_ev.add_to_size_current(1);
...@@ -227,7 +232,7 @@ void evictor_unit_test::verify_ev_m_size_reserved() { ...@@ -227,7 +232,7 @@ void evictor_unit_test::verify_ev_m_size_reserved() {
long limit = 400; long limit = 400;
long expected_m_size_reserved = 100; //limit/4 long expected_m_size_reserved = 100; //limit/4
ZERO_STRUCT(m_ev); ZERO_STRUCT(m_ev);
m_ev.init(limit, &m_pl, m_kb, 0); m_ev.init(limit, &m_pl, &m_cf_list, m_kb, 0);
this->verify_ev_init(limit); this->verify_ev_init(limit);
assert(m_ev.m_size_reserved == expected_m_size_reserved); assert(m_ev.m_size_reserved == expected_m_size_reserved);
m_ev.m_num_eviction_thread_runs = 0; m_ev.m_num_eviction_thread_runs = 0;
...@@ -250,7 +255,7 @@ void evictor_unit_test::verify_ev_m_size_reserved() { ...@@ -250,7 +255,7 @@ void evictor_unit_test::verify_ev_m_size_reserved() {
void evictor_unit_test::verify_ev_handling_cache_pressure() { void evictor_unit_test::verify_ev_handling_cache_pressure() {
long limit = 400; long limit = 400;
ZERO_STRUCT(m_ev); ZERO_STRUCT(m_ev);
m_ev.init(limit, &m_pl, m_kb, 0); m_ev.init(limit, &m_pl, &m_cf_list, m_kb, 0);
this->verify_ev_init(limit); this->verify_ev_init(limit);
m_ev.m_low_size_watermark = 400; m_ev.m_low_size_watermark = 400;
m_ev.m_low_size_hysteresis = 400; m_ev.m_low_size_hysteresis = 400;
......
...@@ -213,6 +213,8 @@ cachetable_test (void) { ...@@ -213,6 +213,8 @@ cachetable_test (void) {
// close and reopen cachefile so we can do some simple prefetch tests // close and reopen cachefile so we can do some simple prefetch tests
toku_cachefile_close(&f1, false, ZERO_LSN); toku_cachefile_close(&f1, false, ZERO_LSN);
toku_cachetable_close(&ct);
toku_cachetable_create(&ct, test_limit, ZERO_LSN, NULL_LOGGER);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0); r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
// //
// verify that a prefetch of the node will succeed // verify that a prefetch of the node will succeed
......
This diff is collapsed.
...@@ -116,6 +116,27 @@ PATENT RIGHTS GRANT: ...@@ -116,6 +116,27 @@ PATENT RIGHTS GRANT:
// update operations. // update operations.
// //
// Stress-test operation: closes one of the test's DB handles, removes that
// dictionary from the environment, then creates and opens a new DB under the
// same name and stores the new handle back in the same arg->dbp slot.
//
// The slot is chosen as myrandom_r(arg->random_data) modulo
// arg->cli->num_DBs. The txn, operation_extra and stats_extra parameters are
// unused (wrapped in UU()).
//
// Returns 0; the return code of every step is checked with CKERR or assert.
static int remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
    const int idx = myrandom_r(arg->random_data) % arg->cli->num_DBs;
    DB **slot = &arg->dbp[idx];

    // Close the currently open handle before removing the dictionary.
    int r = (*slot)->close(*slot, 0);
    CKERR(r);

    // Build the name of the idx-th table; the same name is used for the
    // removal and for the re-creation below.
    char name[30];
    ZERO_ARRAY(name);
    get_ith_table_name(name, sizeof(name), idx);

    r = arg->env->dbremove(arg->env, null_txn, name, nullptr, 0);
    CKERR(r);

    // Create a fresh handle directly into the slot the old handle occupied.
    r = db_create(slot, arg->env, 0);
    assert(r == 0);
    // TODO: Need to call before_db_open_hook() and after_db_open_hook()
    r = (*slot)->open(*slot, null_txn, name, nullptr, DB_BTREE, DB_CREATE, 0666);
    assert(r == 0);
    return 0;
}
static void static void
stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
// //
......
...@@ -1635,27 +1635,6 @@ get_ith_table_name(char *buf, size_t len, int i) { ...@@ -1635,27 +1635,6 @@ get_ith_table_name(char *buf, size_t len, int i) {
DB_TXN * const null_txn = 0; DB_TXN * const null_txn = 0;
// Stress-test operation (this is the copy the commit removes from this
// location): closes one DB handle chosen via myrandom_r, removes that
// dictionary from the environment, and re-creates and re-opens a DB with the
// same name in the same arg->dbp slot.
// txn, operation_extra and stats_extra are unused. Returns 0; each step's
// return code is checked with CKERR or assert.
static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int r;
// Choose which DB slot to operate on.
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB* db = arg->dbp[db_index];
// Close the existing handle before removing the dictionary.
r = (db)->close(db, 0); CKERR(r);
char name[30];
ZERO_ARRAY(name);
// The same table name is used for both the removal and the re-creation.
get_ith_table_name(name, sizeof(name), db_index);
r = arg->env->dbremove(arg->env, null_txn, name, nullptr, 0);
CKERR(r);
// Store the new handle in the slot the closed handle occupied.
r = db_create(&(arg->dbp[db_index]), arg->env, 0);
assert(r == 0);
// TODO: Need to call before_db_open_hook() and after_db_open_hook()
r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, nullptr, DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
return 0;
}
// For each line of engine status output, look for lines that contain substrings // For each line of engine status output, look for lines that contain substrings
// that match any of the strings in the pattern string. The pattern string contains // that match any of the strings in the pattern string. The pattern string contains
// 0 or more strings separated by the '|' character, kind of like a regex. // 0 or more strings separated by the '|' character, kind of like a regex.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment