Commit 18c2c68f authored by Yoni Fogel's avatar Yoni Fogel

Addresses #2164 refs[t:2164] Zombie locking contract now enforced in rename/remove

Checkpoint holds entire brt open (as zombie if necessary) instead of just holding open the cachefile.

git-svn-id: file:///svn/toku/tokudb.2037b@15740 c7de825b-a66e-492c-adef-691d508d4ae1
parent a4f65b11
...@@ -196,8 +196,9 @@ struct brt { ...@@ -196,8 +196,9 @@ struct brt {
DB *db; // To pass to the compare fun, and close once transactions are done. DB *db; // To pass to the compare fun, and close once transactions are done.
OMT txns; // transactions that are using this OMT (note that the transaction checks the cf also) OMT txns; // transactions that are using this OMT (note that the transaction checks the cf also)
int pinned_by_checkpoint; //Keep this brt around for checkpoint, like a transaction
int was_closed; //True when this brt was closed, but is being kept around for transactions. int was_closed; //True when this brt was closed, but is being kept around for transactions (or checkpoint).
int (*close_db)(DB*, u_int32_t); int (*close_db)(DB*, u_int32_t);
u_int32_t close_flags; u_int32_t close_flags;
......
...@@ -2867,6 +2867,9 @@ brtheader_log_fassociate_during_checkpoint (CACHEFILE cf, void *header_v) { ...@@ -2867,6 +2867,9 @@ brtheader_log_fassociate_during_checkpoint (CACHEFILE cf, void *header_v) {
} }
static int brtheader_note_pin_by_checkpoint (CACHEFILE cachefile, void *header_v);
static int brtheader_note_unpin_by_checkpoint (CACHEFILE cachefile, void *header_v);
static int static int
brt_init_header_partial (BRT t) { brt_init_header_partial (BRT t) {
int r; int r;
...@@ -2904,7 +2907,15 @@ brt_init_header_partial (BRT t) { ...@@ -2904,7 +2907,15 @@ brt_init_header_partial (BRT t) {
BLOCKNUM root = t->h->root; BLOCKNUM root = t->h->root;
if ((r=setup_initial_brt_root_node(t, root))!=0) { return r; } if ((r=setup_initial_brt_root_node(t, root))!=0) { return r; }
//printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0); //printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0);
toku_cachefile_set_userdata(t->cf, t->h, brtheader_log_fassociate_during_checkpoint, toku_brtheader_close, toku_brtheader_checkpoint, toku_brtheader_begin_checkpoint, toku_brtheader_end_checkpoint); toku_cachefile_set_userdata(t->cf,
t->h,
brtheader_log_fassociate_during_checkpoint,
toku_brtheader_close,
toku_brtheader_checkpoint,
toku_brtheader_begin_checkpoint,
toku_brtheader_end_checkpoint,
brtheader_note_pin_by_checkpoint,
brtheader_note_unpin_by_checkpoint);
return r; return r;
} }
...@@ -2968,7 +2979,15 @@ int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header ...@@ -2968,7 +2979,15 @@ int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header
if (r!=0) return r; if (r!=0) return r;
h->cf = cf; h->cf = cf;
h->root_put_counter = global_root_put_counter++; h->root_put_counter = global_root_put_counter++;
toku_cachefile_set_userdata(cf, (void*)h, brtheader_log_fassociate_during_checkpoint, toku_brtheader_close, toku_brtheader_checkpoint, toku_brtheader_begin_checkpoint, toku_brtheader_end_checkpoint); toku_cachefile_set_userdata(cf,
(void*)h,
brtheader_log_fassociate_during_checkpoint,
toku_brtheader_close,
toku_brtheader_checkpoint,
toku_brtheader_begin_checkpoint,
toku_brtheader_end_checkpoint,
brtheader_note_pin_by_checkpoint,
brtheader_note_unpin_by_checkpoint);
*header = h; *header = h;
return 0; return 0;
} }
...@@ -3247,6 +3266,84 @@ toku_brtheader_begin_checkpoint (CACHEFILE UU(cachefile), LSN checkpoint_lsn, vo ...@@ -3247,6 +3266,84 @@ toku_brtheader_begin_checkpoint (CACHEFILE UU(cachefile), LSN checkpoint_lsn, vo
return r; return r;
} }
int
toku_brt_zombie_needed(BRT zombie) {
return toku_omt_size(zombie->txns) != 0 || zombie->pinned_by_checkpoint;
}
//Must be protected by ydb lock.
//Is only called by checkpoint begin, which holds it
static int
brtheader_note_pin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v)
{
//Set arbitrary brt (for given header) as pinned by checkpoint.
//Only one can be pinned (only one checkpoint at a time), but not worth verifying.
struct brt_header *h = header_v;
BRT brt_to_pin;
if (!toku_list_empty(&h->live_brts)) {
brt_to_pin = toku_list_struct(toku_list_head(&h->live_brts), struct brt, live_brt_link);
}
else {
//Header exists, so at least one brt must. No live means at least one zombie.
assert(!toku_list_empty(&h->zombie_brts));
brt_to_pin = toku_list_struct(toku_list_head(&h->zombie_brts), struct brt, zombie_brt_link);
}
assert(!brt_to_pin->pinned_by_checkpoint);
brt_to_pin->pinned_by_checkpoint = 1;
return 0;
}
//Must be protected by ydb lock.
//Called by end_checkpoint, which grabs ydb lock around note_unpin
static int
brtheader_note_unpin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v)
{
//Must find which brt for this header is pinned, and unpin it.
//Once found, we might have to close it if it was user closed and no txns touch it.
//
//HOW do you loop through a 'list'????
struct brt_header *h = header_v;
BRT brt_to_unpin = NULL;
if (!toku_list_empty(&h->live_brts)) {
struct toku_list *list;
for (list = h->live_brts.next; list != &h->live_brts; list = list->next) {
BRT candidate;
candidate = toku_list_struct(list, struct brt, live_brt_link);
if (candidate->pinned_by_checkpoint) {
brt_to_unpin = candidate;
break;
}
}
}
if (!brt_to_unpin) {
//Header exists, something is pinned, so exactly one zombie must be pinned
assert(!toku_list_empty(&h->zombie_brts));
struct toku_list *list;
for (list = h->zombie_brts.next; list != &h->zombie_brts; list = list->next) {
BRT candidate;
candidate = toku_list_struct(list, struct brt, zombie_brt_link);
if (candidate->pinned_by_checkpoint) {
brt_to_unpin = candidate;
break;
}
}
}
assert(brt_to_unpin);
assert(brt_to_unpin->pinned_by_checkpoint);
brt_to_unpin->pinned_by_checkpoint = 0; //Unpin
int r = 0;
//Close if necessary
if (brt_to_unpin->was_closed && !toku_brt_zombie_needed(brt_to_unpin)) {
//Close immediately.
assert(brt_to_unpin->close_db);
r = brt_to_unpin->close_db(brt_to_unpin->db, brt_to_unpin->close_flags);
}
return r;
}
// Write checkpoint-in-progress versions of header and translation to disk (really to OS internal buffer). // Write checkpoint-in-progress versions of header and translation to disk (really to OS internal buffer).
int int
toku_brtheader_checkpoint (CACHEFILE cachefile, void *header_v) toku_brtheader_checkpoint (CACHEFILE cachefile, void *header_v)
...@@ -3379,7 +3476,7 @@ toku_brt_db_delay_closed (BRT zombie, DB* db, int (*close_db)(DB*, u_int32_t), u ...@@ -3379,7 +3476,7 @@ toku_brt_db_delay_closed (BRT zombie, DB* db, int (*close_db)(DB*, u_int32_t), u
zombie->close_flags = close_flags; zombie->close_flags = close_flags;
zombie->was_closed = 1; zombie->was_closed = 1;
if (!zombie->db) zombie->db = db; if (!zombie->db) zombie->db = db;
if (toku_omt_size(zombie->txns) == 0) { if (!toku_brt_zombie_needed(zombie)) {
//Close immediately. //Close immediately.
r = zombie->close_db(zombie->db, zombie->close_flags); r = zombie->close_db(zombie->db, zombie->close_flags);
} }
...@@ -3408,7 +3505,7 @@ toku_brt_db_delay_closed (BRT zombie, DB* db, int (*close_db)(DB*, u_int32_t), u ...@@ -3408,7 +3505,7 @@ toku_brt_db_delay_closed (BRT zombie, DB* db, int (*close_db)(DB*, u_int32_t), u
} }
int toku_close_brt_lsn (BRT brt, char **error_string, BOOL oplsn_valid, LSN oplsn) { int toku_close_brt_lsn (BRT brt, char **error_string, BOOL oplsn_valid, LSN oplsn) {
assert(toku_omt_size(brt->txns)==0); assert(!toku_brt_zombie_needed(brt));
int r; int r;
while (!toku_list_empty(&brt->cursors)) { while (!toku_list_empty(&brt->cursors)) {
BRT_CURSOR c = toku_list_struct(toku_list_pop(&brt->cursors), struct brt_cursor, cursors_link); BRT_CURSOR c = toku_list_struct(toku_list_pop(&brt->cursors), struct brt_cursor, cursors_link);
...@@ -5102,12 +5199,24 @@ int toku_brt_remove_on_commit(TOKUTXN txn, DBT* iname_dbt_p, DBT* iname_within_c ...@@ -5102,12 +5199,24 @@ int toku_brt_remove_on_commit(TOKUTXN txn, DBT* iname_dbt_p, DBT* iname_within_c
CACHEFILE cf = NULL; CACHEFILE cf = NULL;
u_int8_t was_open = 0; u_int8_t was_open = 0;
FILENUM filenum = {0}; FILENUM filenum = {0};
//We need to hold a reference (to cf) for an fdelete because brt might not be open (and only cf is open).
//Normal txn operations grab reference to brt instead. r = toku_cachefile_of_iname(txn->logger->ct, iname, &cf);
r = toku_cachefile_of_iname_and_add_reference(txn->logger->ct, iname, &cf);
if (r == 0) { if (r == 0) {
was_open = TRUE; was_open = TRUE;
filenum = toku_cachefile_filenum(cf); filenum = toku_cachefile_filenum(cf);
struct brt_header *h = toku_cachefile_get_userdata(cf);
BRT brt;
//Any arbitrary brt of that header is fine.
if (!toku_list_empty(&h->live_brts)) {
brt = toku_list_struct(toku_list_head(&h->live_brts), struct brt, live_brt_link);
}
else {
//Header exists, so at least one brt must. No live means at least one zombie.
assert(!toku_list_empty(&h->zombie_brts));
brt = toku_list_struct(toku_list_head(&h->zombie_brts), struct brt, zombie_brt_link);
}
r = toku_txn_note_brt(txn, brt);
if (r!=0) return r;
} }
else else
assert(r==ENOENT); assert(r==ENOENT);
...@@ -5136,13 +5245,10 @@ int toku_brt_remove_now(CACHETABLE ct, DBT* iname_dbt_p, DBT* iname_within_cwd_d ...@@ -5136,13 +5245,10 @@ int toku_brt_remove_now(CACHETABLE ct, DBT* iname_dbt_p, DBT* iname_within_cwd_d
int r; int r;
const char *iname = iname_dbt_p->data; const char *iname = iname_dbt_p->data;
CACHEFILE cf; CACHEFILE cf;
r = toku_cachefile_of_iname_and_add_reference(ct, iname, &cf); r = toku_cachefile_of_iname(ct, iname, &cf);
if (r == 0) { if (r == 0) {
char *error_string = NULL;
r = toku_cachefile_redirect_nullfd(cf); r = toku_cachefile_redirect_nullfd(cf);
assert(r==0); assert(r==0);
r = toku_cachefile_close(&cf, &error_string, FALSE, ZERO_LSN);
assert(r==0);
} }
else else
assert(r==ENOENT); assert(r==ENOENT);
......
...@@ -193,6 +193,8 @@ int maybe_preallocate_in_file (int fd, u_int64_t size); ...@@ -193,6 +193,8 @@ int maybe_preallocate_in_file (int fd, u_int64_t size);
int toku_brt_note_table_lock (BRT brt, TOKUTXN txn); int toku_brt_note_table_lock (BRT brt, TOKUTXN txn);
// Effect: Record the fact that the BRT has a table lock (and thus no other txn will modify it until this txn completes. As a result, we can limit the amount of information in the rollback data structure. // Effect: Record the fact that the BRT has a table lock (and thus no other txn will modify it until this txn completes. As a result, we can limit the amount of information in the rollback data structure.
int toku_brt_zombie_needed (BRT brt);
//TODO: #1485 once we have multiple main threads, restore this code, analyze performance. //TODO: #1485 once we have multiple main threads, restore this code, analyze performance.
#ifndef TOKU_MULTIPLE_MAIN_THREADS #ifndef TOKU_MULTIPLE_MAIN_THREADS
#define TOKU_MULTIPLE_MAIN_THREADS 0 #define TOKU_MULTIPLE_MAIN_THREADS 0
......
...@@ -228,6 +228,8 @@ struct cachefile { ...@@ -228,6 +228,8 @@ struct cachefile {
int (*begin_checkpoint_userdata)(CACHEFILE cf, LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function. int (*begin_checkpoint_userdata)(CACHEFILE cf, LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function.
int (*checkpoint_userdata)(CACHEFILE cf, void *userdata); // when checkpointing a cachefile, call this function. int (*checkpoint_userdata)(CACHEFILE cf, void *userdata); // when checkpointing a cachefile, call this function.
int (*end_checkpoint_userdata)(CACHEFILE cf, void *userdata); // after checkpointing cachefiles call this function. int (*end_checkpoint_userdata)(CACHEFILE cf, void *userdata); // after checkpointing cachefiles call this function.
int (*note_pin_by_checkpoint)(CACHEFILE cf, void *userdata); // add a reference to the userdata to prevent it from being removed from memory
int (*note_unpin_by_checkpoint)(CACHEFILE cf, void *userdata); // add a reference to the userdata to prevent it from being removed from memory
toku_pthread_cond_t openfd_wait; // openfd must wait until file is fully closed (purged from cachetable) if file is opened and closed simultaneously toku_pthread_cond_t openfd_wait; // openfd must wait until file is fully closed (purged from cachetable) if file is opened and closed simultaneously
toku_pthread_cond_t closefd_wait; // toku_cachefile_of_iname_and_add_reference() must wait until file is fully closed (purged from cachetable) if run while file is being closed. toku_pthread_cond_t closefd_wait; // toku_cachefile_of_iname_and_add_reference() must wait until file is fully closed (purged from cachetable) if run while file is being closed.
u_int32_t closefd_waiting; // Number of threads waiting on closefd_wait (0 or 1, error otherwise). u_int32_t closefd_waiting; // Number of threads waiting on closefd_wait (0 or 1, error otherwise).
...@@ -240,15 +242,10 @@ checkpoint_thread (void *cachetable_v) ...@@ -240,15 +242,10 @@ checkpoint_thread (void *cachetable_v)
// If someone sets the checkpoint_shutdown boolean , then this thread exits. // If someone sets the checkpoint_shutdown boolean , then this thread exits.
// This thread notices those changes by waiting on a condition variable. // This thread notices those changes by waiting on a condition variable.
{ {
char *error_string;
CACHETABLE ct = cachetable_v; CACHETABLE ct = cachetable_v;
int r = toku_checkpoint(ct, ct->logger, &error_string, NULL, NULL, NULL, NULL); int r = toku_checkpoint(ct, ct->logger, NULL, NULL, NULL, NULL);
if (r) { if (r) {
if (error_string) { fprintf(stderr, "%s:%d Got error %d while doing checkpoint\n", __FILE__, __LINE__, r);
fprintf(stderr, "%s:%d Got error %d while doing: %s\n", __FILE__, __LINE__, r, error_string);
} else {
fprintf(stderr, "%s:%d Got error %d while doing checkpoint\n", __FILE__, __LINE__, r);
}
abort(); // Don't quite know what to do with these errors. abort(); // Don't quite know what to do with these errors.
} }
return r; return r;
...@@ -297,9 +294,9 @@ cachefile_refup (CACHEFILE cf) { ...@@ -297,9 +294,9 @@ cachefile_refup (CACHEFILE cf) {
// the close has finished. // the close has finished.
// Once the close has finished, there must not be a cachefile with that name // Once the close has finished, there must not be a cachefile with that name
// in the cachetable. // in the cachetable.
int toku_cachefile_of_iname_and_add_reference (CACHETABLE ct, const char *iname, CACHEFILE *cf) { int toku_cachefile_of_iname (CACHETABLE ct, const char *iname, CACHEFILE *cf) {
BOOL restarted = FALSE; BOOL restarted = FALSE;
cachetable_lock(ct); cachefiles_lock(ct);
CACHEFILE extant; CACHEFILE extant;
int r; int r;
restart: restart:
...@@ -317,13 +314,12 @@ restart: ...@@ -317,13 +314,12 @@ restart:
restarted = TRUE; restarted = TRUE;
goto restart; //Restart and verify that it is not found in the second loop. goto restart; //Restart and verify that it is not found in the second loop.
} }
cachefile_refup(extant);
*cf = extant; *cf = extant;
r = 0; r = 0;
break; break;
} }
} }
cachetable_unlock(ct); cachefiles_unlock(ct);
return r; return r;
} }
...@@ -1852,7 +1848,8 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) { ...@@ -1852,7 +1848,8 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
assert(cf->refcount>0); //Must have a reference if not closing. assert(cf->refcount>0); //Must have a reference if not closing.
//Incremement reference count of cachefile because we're using it for the checkpoint. //Incremement reference count of cachefile because we're using it for the checkpoint.
//This will prevent closing during the checkpoint. //This will prevent closing during the checkpoint.
cachefile_refup(cf); int r = cf->note_pin_by_checkpoint(cf, cf->userdata);
assert(r==0);
cf->next_in_checkpoint = ct->cachefiles_in_checkpoint; cf->next_in_checkpoint = ct->cachefiles_in_checkpoint;
ct->cachefiles_in_checkpoint = cf; ct->cachefiles_in_checkpoint = cf;
cf->for_checkpoint = TRUE; cf->for_checkpoint = TRUE;
...@@ -1933,7 +1930,9 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) { ...@@ -1933,7 +1930,9 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
int int
toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string, void (*testcallback_f)(void*), void * testextra) { toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger,
void (*ydb_lock)(void), void (*ydb_unlock)(void),
void (*testcallback_f)(void*), void * testextra) {
// Requires: The big checkpoint lock must be held (see checkpoint.c). // Requires: The big checkpoint lock must be held (see checkpoint.c).
// Algorithm: Write all pending nodes to disk // Algorithm: Write all pending nodes to disk
// Use checkpoint callback to write snapshot information to disk (header, btt) // Use checkpoint callback to write snapshot information to disk (header, btt)
...@@ -1995,7 +1994,9 @@ toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_st ...@@ -1995,7 +1994,9 @@ toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_st
ct->cachefiles_in_checkpoint = cf->next_in_checkpoint; ct->cachefiles_in_checkpoint = cf->next_in_checkpoint;
cf->next_in_checkpoint = NULL; cf->next_in_checkpoint = NULL;
cf->for_checkpoint = FALSE; cf->for_checkpoint = FALSE;
int r = toku_cachefile_close(&cf, error_string, FALSE, ZERO_LSN); ydb_lock();
int r = cf->note_unpin_by_checkpoint(cf, cf->userdata);
ydb_unlock();
if (r!=0) { if (r!=0) {
retval = r; retval = r;
goto panic; goto panic;
...@@ -2160,13 +2161,17 @@ toku_cachefile_set_userdata (CACHEFILE cf, ...@@ -2160,13 +2161,17 @@ toku_cachefile_set_userdata (CACHEFILE cf,
int (*close_userdata)(CACHEFILE, void*, char**, BOOL, LSN), int (*close_userdata)(CACHEFILE, void*, char**, BOOL, LSN),
int (*checkpoint_userdata)(CACHEFILE, void*), int (*checkpoint_userdata)(CACHEFILE, void*),
int (*begin_checkpoint_userdata)(CACHEFILE, LSN, void*), int (*begin_checkpoint_userdata)(CACHEFILE, LSN, void*),
int (*end_checkpoint_userdata)(CACHEFILE, void*)) { int (*end_checkpoint_userdata)(CACHEFILE, void*),
int (*note_pin_by_checkpoint)(CACHEFILE, void*),
int (*note_unpin_by_checkpoint)(CACHEFILE, void*)) {
cf->userdata = userdata; cf->userdata = userdata;
cf->log_fassociate_during_checkpoint = log_fassociate_during_checkpoint; cf->log_fassociate_during_checkpoint = log_fassociate_during_checkpoint;
cf->close_userdata = close_userdata; cf->close_userdata = close_userdata;
cf->checkpoint_userdata = checkpoint_userdata; cf->checkpoint_userdata = checkpoint_userdata;
cf->begin_checkpoint_userdata = begin_checkpoint_userdata; cf->begin_checkpoint_userdata = begin_checkpoint_userdata;
cf->end_checkpoint_userdata = end_checkpoint_userdata; cf->end_checkpoint_userdata = end_checkpoint_userdata;
cf->note_pin_by_checkpoint = note_pin_by_checkpoint;
cf->note_unpin_by_checkpoint = note_unpin_by_checkpoint;
} }
void *toku_cachefile_get_userdata(CACHEFILE cf) { void *toku_cachefile_get_userdata(CACHEFILE cf) {
......
...@@ -49,11 +49,13 @@ int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf); ...@@ -49,11 +49,13 @@ int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf);
// What is the cachefile that goes with a particular iname? // What is the cachefile that goes with a particular iname?
// During a transaction, we cannot reuse an iname. // During a transaction, we cannot reuse an iname.
int toku_cachefile_of_iname_and_add_reference (CACHETABLE ct, const char *iname, CACHEFILE *cf); int toku_cachefile_of_iname (CACHETABLE ct, const char *iname, CACHEFILE *cf);
// TODO: #1510 Add comments on how these behave // TODO: #1510 Add comments on how these behave
int toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER); int toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER);
int toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string, void (*testcallback_f)(void*), void * testextra); int toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger,
void (*ydb_lock)(void), void (*ydb_unlock)(void),
void (*testcallback_f)(void*), void * testextra);
// Shuts down checkpoint thread // Shuts down checkpoint thread
// Requires no locks be held that are taken by the checkpoint function // Requires no locks be held that are taken by the checkpoint function
...@@ -102,7 +104,14 @@ typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, CACHEKEY key, void *value, ...@@ -102,7 +104,14 @@ typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, CACHEKEY key, void *value,
// associated with the key are returned. // associated with the key are returned.
typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, void *extraargs); typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, void *extraargs);
void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, int (*log_fassociate_during_checkpoint)(CACHEFILE, void*), int (*close_userdata)(CACHEFILE, void*, char **/*error_string*/, BOOL, LSN), int (*checkpoint_userdata)(CACHEFILE, void*), int (*begin_checkpoint_userdata)(CACHEFILE, LSN, void*), int (*end_checkpoint_userdata)(CACHEFILE, void*)); void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata,
int (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
int (*close_userdata)(CACHEFILE, void*, char **/*error_string*/, BOOL, LSN),
int (*checkpoint_userdata)(CACHEFILE, void*),
int (*begin_checkpoint_userdata)(CACHEFILE, LSN, void*),
int (*end_checkpoint_userdata)(CACHEFILE, void*),
int (*note_pin_by_checkpoint)(CACHEFILE, void*),
int (*note_unpin_by_checkpoint)(CACHEFILE, void*));
// Effect: Store some cachefile-specific user data. When the last reference to a cachefile is closed, we call close_userdata(). // Effect: Store some cachefile-specific user data. When the last reference to a cachefile is closed, we call close_userdata().
// Before starting a checkpoint, we call checkpoint_prepare_userdata(). // Before starting a checkpoint, we call checkpoint_prepare_userdata().
// When the cachefile needs to be checkpointed, we call checkpoint_userdata(). // When the cachefile needs to be checkpointed, we call checkpoint_userdata().
......
...@@ -202,7 +202,7 @@ toku_checkpoint_destroy(void) { ...@@ -202,7 +202,7 @@ toku_checkpoint_destroy(void) {
// Take a checkpoint of all currently open dictionaries // Take a checkpoint of all currently open dictionaries
int int
toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string, toku_checkpoint(CACHETABLE ct, TOKULOGGER logger,
void (*callback_f)(void*), void * extra, void (*callback_f)(void*), void * extra,
void (*callback2_f)(void*), void * extra2) { void (*callback2_f)(void*), void * extra2) {
int r; int r;
...@@ -227,7 +227,7 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string, ...@@ -227,7 +227,7 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string,
if (r==0) { if (r==0) {
if (callback_f) if (callback_f)
callback_f(extra); // callback is called with checkpoint_safe_lock still held callback_f(extra); // callback is called with checkpoint_safe_lock still held
r = toku_cachetable_end_checkpoint(ct, logger, error_string, callback2_f, extra2); r = toku_cachetable_end_checkpoint(ct, logger, ydb_lock, ydb_unlock, callback2_f, extra2);
} }
if (r==0 && logger) { if (r==0 && logger) {
LSN trim_lsn = (oldest_live_lsn.lsn < logger->checkpoint_lsn.lsn) ? oldest_live_lsn : logger->checkpoint_lsn; LSN trim_lsn = (oldest_live_lsn.lsn < logger->checkpoint_lsn.lsn) ? oldest_live_lsn : logger->checkpoint_lsn;
......
...@@ -54,7 +54,7 @@ int toku_checkpoint_destroy(void); ...@@ -54,7 +54,7 @@ int toku_checkpoint_destroy(void);
// Take a checkpoint of all currently open dictionaries // Take a checkpoint of all currently open dictionaries
// Callbacks are called during checkpoint procedure while checkpoint_safe lock is still held. // Callbacks are called during checkpoint procedure while checkpoint_safe lock is still held.
// Callbacks are primarily intended for use in testing. // Callbacks are primarily intended for use in testing.
int toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string, int toku_checkpoint(CACHETABLE ct, TOKULOGGER logger,
void (*callback_f)(void*), void * extra, void (*callback_f)(void*), void * extra,
void (*callback2_f)(void*), void * extra2); void (*callback2_f)(void*), void * extra2);
......
...@@ -893,7 +893,7 @@ static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_di ...@@ -893,7 +893,7 @@ static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_di
assert(r == 0); assert(r == 0);
// checkpoint // checkpoint
r = toku_checkpoint(renv->ct, renv->logger, NULL, NULL, NULL, NULL, NULL); r = toku_checkpoint(renv->ct, renv->logger, NULL, NULL, NULL, NULL);
assert(r == 0); assert(r == 0);
r = chdir(org_wd); r = chdir(org_wd);
......
...@@ -17,7 +17,7 @@ toku_commit_fdelete (u_int8_t file_was_open, ...@@ -17,7 +17,7 @@ toku_commit_fdelete (u_int8_t file_was_open,
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield), YIELDF UU(yield),
void *UU(yield_v), void *UU(yield_v),
LSN oplsn) //oplsn is the lsn of the commit LSN UU(oplsn)) //oplsn is the lsn of the commit
{ {
//TODO: #2037 verify the file is (user) closed //TODO: #2037 verify the file is (user) closed
char *fname = fixup_fname(&bs_fname); char *fname = fixup_fname(&bs_fname);
...@@ -35,9 +35,6 @@ toku_commit_fdelete (u_int8_t file_was_open, ...@@ -35,9 +35,6 @@ toku_commit_fdelete (u_int8_t file_was_open,
} }
r = toku_cachefile_redirect_nullfd(cf); r = toku_cachefile_redirect_nullfd(cf);
assert(r==0); assert(r==0);
char *error_string = NULL;
r = toku_cachefile_close(&cf, &error_string, TRUE, oplsn);
assert(r==0);
} }
r = unlink(fname); // pathname relative to cwd r = unlink(fname); // pathname relative to cwd
assert(r==0); assert(r==0);
...@@ -52,22 +49,10 @@ toku_rollback_fdelete (u_int8_t UU(file_was_open), ...@@ -52,22 +49,10 @@ toku_rollback_fdelete (u_int8_t UU(file_was_open),
TOKUTXN UU(txn), TOKUTXN UU(txn),
YIELDF UU(yield), YIELDF UU(yield),
void* UU(yield_v), void* UU(yield_v),
LSN oplsn) //oplsn is the lsn of the abort LSN UU(oplsn)) //oplsn is the lsn of the abort
{ {
//TODO: #2037 verify the file is (user) closed //Rolling back an fdelete is an no-op.
//Rolling back an fdelete is (almost) a no-op. return 0;
//If the rollback entry is holding a reference to the cachefile, remove the reference.
int r = 0;
if (file_was_open) {
CACHEFILE cf;
r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf);
assert(r == 0);
char *error_string = NULL;
// decrement refcount that was incremented in toku_brt_remove_on_commit()
r = toku_cachefile_close(&cf, &error_string, TRUE, oplsn);
assert(r==0);
}
return r;
} }
int int
......
...@@ -361,6 +361,13 @@ static int swap_brt (OMTVALUE txnv, u_int32_t UU(idx), void *extra) { ...@@ -361,6 +361,13 @@ static int swap_brt (OMTVALUE txnv, u_int32_t UU(idx), void *extra) {
} }
int toku_txn_note_swap_brt (BRT live, BRT zombie) { int toku_txn_note_swap_brt (BRT live, BRT zombie) {
if (zombie->pinned_by_checkpoint) {
//Swap checkpoint responsibility.
assert(!live->pinned_by_checkpoint);
live->pinned_by_checkpoint = 1;
zombie->pinned_by_checkpoint = 0;
}
struct swap_brt_extra swap = {.live = live, .zombie = zombie}; struct swap_brt_extra swap = {.live = live, .zombie = zombie};
int r = toku_omt_iterate(zombie->txns, swap_brt, &swap); int r = toku_omt_iterate(zombie->txns, swap_brt, &swap);
assert(r==0); assert(r==0);
...@@ -368,6 +375,7 @@ int toku_txn_note_swap_brt (BRT live, BRT zombie) { ...@@ -368,6 +375,7 @@ int toku_txn_note_swap_brt (BRT live, BRT zombie) {
//Close immediately. //Close immediately.
assert(zombie->close_db); assert(zombie->close_db);
assert(!toku_brt_zombie_needed(zombie));
r = zombie->close_db(zombie->db, zombie->close_flags); r = zombie->close_db(zombie->db, zombie->close_flags);
return r; return r;
} }
...@@ -406,7 +414,7 @@ static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv) { ...@@ -406,7 +414,7 @@ static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv) {
if (txn->txnid64==brt->h->txnid_that_created_or_locked_when_empty) { if (txn->txnid64==brt->h->txnid_that_created_or_locked_when_empty) {
brt->h->txnid_that_created_or_locked_when_empty = 0; brt->h->txnid_that_created_or_locked_when_empty = 0;
} }
if (toku_omt_size(brt->txns)==0 && brt->was_closed) { if (!toku_brt_zombie_needed(brt) && brt->was_closed) {
//Close immediately. //Close immediately.
assert(brt->close_db); assert(brt->close_db);
r = brt->close_db(brt->db, brt->close_flags); r = brt->close_db(brt->db, brt->close_flags);
......
...@@ -87,7 +87,7 @@ do_update (void *UU(ignore)) ...@@ -87,7 +87,7 @@ do_update (void *UU(ignore))
static void* static void*
do_checkpoint (void *UU(v)) do_checkpoint (void *UU(v))
{ {
int r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL, NULL); int r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL);
assert(r == 0); assert(r == 0);
return 0; return 0;
} }
...@@ -98,6 +98,10 @@ do_checkpoint (void *UU(v)) ...@@ -98,6 +98,10 @@ do_checkpoint (void *UU(v))
// make sure that the stuff that was checkpointed includes only the old versions // make sure that the stuff that was checkpointed includes only the old versions
// then do a flush and make sure the new items are written // then do a flush and make sure the new items are written
static int dummy_pin_unpin(CACHEFILE UU(cfu), void* UU(v)) {
return 0;
}
static void checkpoint_pending(void) { static void checkpoint_pending(void) {
if (verbose) printf("%s:%d n=%d\n", __FUNCTION__, __LINE__, N); if (verbose) printf("%s:%d n=%d\n", __FUNCTION__, __LINE__, N);
const int test_limit = N; const int test_limit = N;
...@@ -106,6 +110,8 @@ static void checkpoint_pending(void) { ...@@ -106,6 +110,8 @@ static void checkpoint_pending(void) {
char fname1[] = __FILE__ "test1.dat"; char fname1[] = __FILE__ "test1.dat";
unlink(fname1); unlink(fname1);
r = toku_cachetable_openf(&cf, ct, fname1, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0); r = toku_cachetable_openf(&cf, ct, fname1, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
toku_cachefile_set_userdata(cf, NULL, NULL, NULL, NULL, NULL, NULL,
dummy_pin_unpin, dummy_pin_unpin);
// Insert items into the cachetable. All dirty. // Insert items into the cachetable. All dirty.
int i; int i;
...@@ -136,14 +142,14 @@ static void checkpoint_pending(void) { ...@@ -136,14 +142,14 @@ static void checkpoint_pending(void) {
//printf("E43\n"); //printf("E43\n");
n_flush = n_write_me = n_keep_me = n_fetch = 0; expect_value = 43; n_flush = n_write_me = n_keep_me = n_fetch = 0; expect_value = 43;
r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL, NULL); r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL);
assert(r == 0); assert(r == 0);
assert(n_flush == N && n_write_me == N && n_keep_me == N); assert(n_flush == N && n_write_me == N && n_keep_me == N);
// a subsequent checkpoint should cause no flushes, or writes since all of the items are clean // a subsequent checkpoint should cause no flushes, or writes since all of the items are clean
n_flush = n_write_me = n_keep_me = n_fetch = 0; n_flush = n_write_me = n_keep_me = n_fetch = 0;
r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL, NULL); r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL);
assert(r == 0); assert(r == 0);
assert(n_flush == 0 && n_write_me == 0 && n_keep_me == 0); assert(n_flush == 0 && n_write_me == 0 && n_keep_me == 0);
......
...@@ -46,6 +46,10 @@ static void checkpoint_callback2(void * extra) { ...@@ -46,6 +46,10 @@ static void checkpoint_callback2(void * extra) {
// put n items into the cachetable, maybe mark them dirty, do a checkpoint, and // put n items into the cachetable, maybe mark them dirty, do a checkpoint, and
// verify that all of the items have been written and are clean. // verify that all of the items have been written and are clean.
static int dummy_pin_unpin(CACHEFILE UU(cfu), void* UU(v)) {
return 0;
}
static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) { static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) {
if (verbose) printf("%s:%d n=%d dirty=%d\n", __FUNCTION__, __LINE__, n, (int) dirty); if (verbose) printf("%s:%d n=%d dirty=%d\n", __FUNCTION__, __LINE__, n, (int) dirty);
const int test_limit = n; const int test_limit = n;
...@@ -56,6 +60,8 @@ static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) { ...@@ -56,6 +60,8 @@ static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) {
unlink(fname1); unlink(fname1);
CACHEFILE f1; CACHEFILE f1;
r = toku_cachetable_openf(&f1, ct, fname1, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0); r = toku_cachetable_openf(&f1, ct, fname1, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
toku_cachefile_set_userdata(f1, NULL, NULL, NULL, NULL, NULL, NULL,
dummy_pin_unpin, dummy_pin_unpin);
// insert items into the cachetable. all should be dirty // insert items into the cachetable. all should be dirty
int i; int i;
...@@ -84,7 +90,7 @@ static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) { ...@@ -84,7 +90,7 @@ static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) {
// all items should be kept in the cachetable // all items should be kept in the cachetable
n_flush = n_write_me = n_keep_me = n_fetch = 0; n_flush = n_write_me = n_keep_me = n_fetch = 0;
r = toku_checkpoint(ct, NULL, NULL, checkpoint_callback, &callback_was_called, checkpoint_callback2, &callback2_was_called); r = toku_checkpoint(ct, NULL, checkpoint_callback, &callback_was_called, checkpoint_callback2, &callback2_was_called);
assert(r == 0); assert(r == 0);
assert(callback_was_called != 0); assert(callback_was_called != 0);
assert(callback2_was_called != 0); assert(callback2_was_called != 0);
...@@ -116,7 +122,7 @@ static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) { ...@@ -116,7 +122,7 @@ static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) {
n_flush = n_write_me = n_keep_me = n_fetch = 0; n_flush = n_write_me = n_keep_me = n_fetch = 0;
r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL, NULL); r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL);
assert(r == 0); assert(r == 0);
assert(n_flush == 0 && n_write_me == 0 && n_keep_me == 0); assert(n_flush == 0 && n_write_me == 0 && n_keep_me == 0);
......
...@@ -31,6 +31,9 @@ static int fetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, void **value, l ...@@ -31,6 +31,9 @@ static int fetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, void **value, l
return 0; return 0;
} }
static int dummy_pin_unpin(CACHEFILE UU(cfu), void* UU(v)) {
return 0;
}
// put n items into the cachetable, maybe mark them dirty, do a checkpoint, and // put n items into the cachetable, maybe mark them dirty, do a checkpoint, and
// verify that all of the items have been written and are clean. // verify that all of the items have been written and are clean.
static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dirty) { static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dirty) {
...@@ -43,6 +46,8 @@ static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dir ...@@ -43,6 +46,8 @@ static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dir
unlink(fname1); unlink(fname1);
CACHEFILE f1; CACHEFILE f1;
r = toku_cachetable_openf(&f1, ct, fname1, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0); r = toku_cachetable_openf(&f1, ct, fname1, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
toku_cachefile_set_userdata(f1, NULL, NULL, NULL, NULL, NULL, NULL,
dummy_pin_unpin, dummy_pin_unpin);
// prefetch block n+1. this will take 10 seconds. // prefetch block n+1. this will take 10 seconds.
{ {
...@@ -79,7 +84,7 @@ static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dir ...@@ -79,7 +84,7 @@ static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dir
// all items should be kept in the cachetable // all items should be kept in the cachetable
n_flush = n_write_me = n_keep_me = n_fetch = 0; n_flush = n_write_me = n_keep_me = n_fetch = 0;
r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL, NULL); r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL);
assert(r == 0); assert(r == 0);
assert(n_flush == n && n_write_me == n && n_keep_me == n); assert(n_flush == n && n_write_me == n && n_keep_me == n);
...@@ -108,7 +113,7 @@ static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dir ...@@ -108,7 +113,7 @@ static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dir
// a subsequent checkpoint should cause no flushes, or writes since all of the items are clean // a subsequent checkpoint should cause no flushes, or writes since all of the items are clean
n_flush = n_write_me = n_keep_me = n_fetch = 0; n_flush = n_write_me = n_keep_me = n_fetch = 0;
r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL, NULL); r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL);
assert(r == 0); assert(r == 0);
assert(n_flush == 0 && n_write_me == 0 && n_keep_me == 0); assert(n_flush == 0 && n_write_me == 0 && n_keep_me == 0);
......
...@@ -29,6 +29,7 @@ struct __toku_db_internal { ...@@ -29,6 +29,7 @@ struct __toku_db_internal {
BOOL key_compare_was_set; // true if a comparison function was provided before call to db->open() (if false, use environment's comparison function) BOOL key_compare_was_set; // true if a comparison function was provided before call to db->open() (if false, use environment's comparison function)
BOOL val_compare_was_set; BOOL val_compare_was_set;
char *dname; // dname is constant for this handle (handle must be closed before file is renamed) char *dname; // dname is constant for this handle (handle must be closed before file is renamed)
BOOL is_zombie; // True if DB->close has been called on this DB
struct toku_list dbs_that_must_close_before_abort; struct toku_list dbs_that_must_close_before_abort;
}; };
......
...@@ -569,7 +569,7 @@ static int toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mo ...@@ -569,7 +569,7 @@ static int toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mo
assert(r==0);//For Now assert(r==0);//For Now
} }
toku_ydb_unlock(); toku_ydb_unlock();
r = toku_checkpoint(env->i->cachetable, env->i->logger, NULL, NULL, NULL, NULL, NULL); r = toku_checkpoint(env->i->cachetable, env->i->logger, NULL, NULL, NULL, NULL);
assert(r==0);//For Now assert(r==0);//For Now
toku_ydb_lock(); toku_ydb_lock();
return 0; return 0;
...@@ -614,7 +614,7 @@ static int toku_env_close(DB_ENV * env, u_int32_t flags) { ...@@ -614,7 +614,7 @@ static int toku_env_close(DB_ENV * env, u_int32_t flags) {
if ( flags && DB_CLOSE_DONT_TRIM_LOG ) { if ( flags && DB_CLOSE_DONT_TRIM_LOG ) {
toku_logger_trim_log_files(env->i->logger, FALSE); toku_logger_trim_log_files(env->i->logger, FALSE);
} }
r = toku_checkpoint(env->i->cachetable, env->i->logger, NULL, NULL, NULL, NULL, NULL); r = toku_checkpoint(env->i->cachetable, env->i->logger, NULL, NULL, NULL, NULL);
if (r) { if (r) {
toku_ydb_do_error(env, r, "Cannot close environment (error during checkpoint)\n"); toku_ydb_do_error(env, r, "Cannot close environment (error during checkpoint)\n");
goto panic_and_quit_early; goto panic_and_quit_early;
...@@ -887,19 +887,13 @@ static void (*checkpoint_callback2_f)(void*) = NULL; ...@@ -887,19 +887,13 @@ static void (*checkpoint_callback2_f)(void*) = NULL;
static void * checkpoint_callback2_extra = NULL; static void * checkpoint_callback2_extra = NULL;
static int toku_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte __attribute__((__unused__)), u_int32_t min __attribute__((__unused__)), u_int32_t flags __attribute__((__unused__))) { static int toku_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte __attribute__((__unused__)), u_int32_t min __attribute__((__unused__)), u_int32_t flags __attribute__((__unused__))) {
char *error_string = NULL; int r = toku_checkpoint(env->i->cachetable, env->i->logger,
int r = toku_checkpoint(env->i->cachetable, env->i->logger, &error_string,
checkpoint_callback_f, checkpoint_callback_extra, checkpoint_callback_f, checkpoint_callback_extra,
checkpoint_callback2_f, checkpoint_callback2_extra); checkpoint_callback2_f, checkpoint_callback2_extra);
if (r) { if (r) {
env->i->is_panicked = r; // Panicking the whole environment may be overkill, but I'm not sure what else to do. env->i->is_panicked = r; // Panicking the whole environment may be overkill, but I'm not sure what else to do.
env->i->panic_string = error_string; env->i->panic_string = toku_strdup("checkpoint error");
if (error_string) { toku_ydb_do_error(env, r, "Checkpoint\n");
toku_ydb_do_error(env, r, "%s\n", error_string);
} else {
toku_ydb_do_error(env, r, "Checkpoint\n");
}
error_string=NULL;
} }
return r; return r;
} }
...@@ -1646,8 +1640,14 @@ int log_compare(const DB_LSN * a, const DB_LSN * b) { ...@@ -1646,8 +1640,14 @@ int log_compare(const DB_LSN * a, const DB_LSN * b) {
return 0; return 0;
} }
static void env_note_zombie_db_closed(DB_ENV *env, DB *db);
static int static int
db_close_before_brt(DB *db, u_int32_t UU(flags)) { db_close_before_brt(DB *db, u_int32_t UU(flags)) {
if (db_opened(db) && db->i->dname) {
// internal (non-user) dictionary has no dname
env_note_zombie_db_closed(db->dbenv, db); // tell env that this db is no longer a zombie (it is completely closed)
}
char *error_string = 0; char *error_string = 0;
int r1 = toku_close_brt(db->i->brt, &error_string); int r1 = toku_close_brt(db->i->brt, &error_string);
if (r1) { if (r1) {
...@@ -1693,10 +1693,16 @@ find_db_by_db (OMTVALUE v, void *dbv) { ...@@ -1693,10 +1693,16 @@ find_db_by_db (OMTVALUE v, void *dbv) {
DB *db = v; // DB* that is stored in the omt DB *db = v; // DB* that is stored in the omt
DB *dbfind = dbv; // extra, to be compared to v DB *dbfind = dbv; // extra, to be compared to v
int cmp; int cmp;
cmp = strcmp(db->i->dname, dbfind->i->dname); const char *dname = db->i->dname;
const char *dnamefind = dbfind->i->dname;
cmp = strcmp(dname, dnamefind);
if (cmp != 0) return cmp;
int is_zombie = db->i->is_zombie != 0;
int is_zombiefind = dbfind->i->is_zombie != 0;
cmp = is_zombie - is_zombiefind;
if (cmp != 0) return cmp; if (cmp != 0) return cmp;
if (db < dbfind) return -1; if (db < dbfind) return -1;
if (db > dbfind) return 1; if (db > dbfind) return 1;
return 0; return 0;
} }
...@@ -1704,6 +1710,7 @@ find_db_by_db (OMTVALUE v, void *dbv) { ...@@ -1704,6 +1710,7 @@ find_db_by_db (OMTVALUE v, void *dbv) {
static void static void
env_note_db_opened(DB_ENV *env, DB *db) { env_note_db_opened(DB_ENV *env, DB *db) {
assert(db->i->dname); // internal (non-user) dictionary has no dname assert(db->i->dname); // internal (non-user) dictionary has no dname
assert(!db->i->is_zombie);
int r; int r;
OMTVALUE dbv; OMTVALUE dbv;
uint32_t idx; uint32_t idx;
...@@ -1716,6 +1723,35 @@ env_note_db_opened(DB_ENV *env, DB *db) { ...@@ -1716,6 +1723,35 @@ env_note_db_opened(DB_ENV *env, DB *db) {
static void static void
env_note_db_closed(DB_ENV *env, DB *db) { env_note_db_closed(DB_ENV *env, DB *db) {
assert(db->i->dname); assert(db->i->dname);
assert(!db->i->is_zombie);
int r;
OMTVALUE dbv;
uint32_t idx;
r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx, NULL);
assert(r==0); //Must already be there.
assert((DB*)dbv == db);
r = toku_omt_delete_at(env->i->open_dbs, idx);
assert(r==0);
}
// Tell env that there is a new db handle (with non-unique dname in db->i-dname)
static void
env_note_zombie_db(DB_ENV *env, DB *db) {
assert(db->i->dname); // internal (non-user) dictionary has no dname
assert(db->i->is_zombie);
int r;
OMTVALUE dbv;
uint32_t idx;
r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx, NULL);
assert(r==DB_NOTFOUND); //Must not already be there.
r = toku_omt_insert_at(env->i->open_dbs, db, idx);
assert(r==0);
}
static void
env_note_zombie_db_closed(DB_ENV *env, DB *db) {
assert(db->i->dname);
assert(db->i->is_zombie);
int r; int r;
OMTVALUE dbv; OMTVALUE dbv;
uint32_t idx; uint32_t idx;
...@@ -1727,11 +1763,31 @@ env_note_db_closed(DB_ENV *env, DB *db) { ...@@ -1727,11 +1763,31 @@ env_note_db_closed(DB_ENV *env, DB *db) {
} }
static int static int
find_db_by_dname (OMTVALUE v, void *dnamev) { find_zombie_db_by_dname (OMTVALUE v, void *dnamev) {
DB *db = v; DB *db = v; // DB* that is stored in the omt
int cmp;
const char *dname = db->i->dname;
const char *dnamefind = dnamev;
cmp = strcmp(dname, dnamefind);
if (cmp != 0) return cmp;
int is_zombie = db->i->is_zombie != 0;
int is_zombiefind = 1;
cmp = is_zombie - is_zombiefind;
return cmp;
}
static int
find_open_db_by_dname (OMTVALUE v, void *dnamev) {
DB *db = v; // DB* that is stored in the omt
int cmp;
const char *dname = db->i->dname; const char *dname = db->i->dname;
const char *dnamefind = dnamev; const char *dnamefind = dnamev;
return strcmp(dname, dnamefind); cmp = strcmp(dname, dnamefind);
if (cmp != 0) return cmp;
int is_zombie = db->i->is_zombie != 0;
int is_zombiefind = 0;
cmp = is_zombie - is_zombiefind;
return cmp;
} }
// return true if there is any db open with the given dname // return true if there is any db open with the given dname
...@@ -1741,10 +1797,11 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) { ...@@ -1741,10 +1797,11 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) {
BOOL rval; BOOL rval;
OMTVALUE dbv; OMTVALUE dbv;
uint32_t idx; uint32_t idx;
r = toku_omt_find_zero(env->i->open_dbs, find_db_by_dname, (void*)dname, &dbv, &idx, NULL); r = toku_omt_find_zero(env->i->open_dbs, find_open_db_by_dname, (void*)dname, &dbv, &idx, NULL);
if (r==0) { if (r==0) {
DB *db = dbv; DB *db = dbv;
assert(strcmp(dname, db->i->dname) == 0); assert(strcmp(dname, db->i->dname) == 0);
assert(!db->i->is_zombie);
rval = TRUE; rval = TRUE;
} }
else { else {
...@@ -1754,9 +1811,36 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) { ...@@ -1754,9 +1811,36 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) {
return rval; return rval;
} }
// return true if there is any db open with the given dname
static DB*
env_get_zombie_db_with_dname(DB_ENV *env, const char *dname) {
int r;
DB* rval;
OMTVALUE dbv;
uint32_t idx;
r = toku_omt_find_zero(env->i->open_dbs, find_zombie_db_by_dname, (void*)dname, &dbv, &idx, NULL);
if (r==0) {
DB *db = dbv;
assert(db);
assert(strcmp(dname, db->i->dname) == 0);
assert(db->i->is_zombie);
rval = db;
}
else {
assert(r==DB_NOTFOUND);
rval = NULL;
}
return rval;
}
static int toku_db_close(DB * db, u_int32_t flags) { static int toku_db_close(DB * db, u_int32_t flags) {
if (db_opened(db) && db->i->dname) // internal (non-user) dictionary has no dname if (db_opened(db) && db->i->dname) {
// internal (non-user) dictionary has no dname
env_note_db_closed(db->dbenv, db); // tell env that this db is no longer in use by the user of this api (user-closed, may still be in use by fractal tree internals) env_note_db_closed(db->dbenv, db); // tell env that this db is no longer in use by the user of this api (user-closed, may still be in use by fractal tree internals)
db->i->is_zombie = TRUE;
env_note_zombie_db(db->dbenv, db); // tell env that this db is a zombie
}
//Remove from transaction's list of 'must close' if necessary. //Remove from transaction's list of 'must close' if necessary.
if (!toku_list_empty(&db->i->dbs_that_must_close_before_abort)) if (!toku_list_empty(&db->i->dbs_that_must_close_before_abort))
toku_list_remove(&db->i->dbs_that_must_close_before_abort); toku_list_remove(&db->i->dbs_that_must_close_before_abort);
...@@ -3946,6 +4030,8 @@ finalize_file_removal(int fd, void * extra) { ...@@ -3946,6 +4030,8 @@ finalize_file_removal(int fd, void * extra) {
} }
} }
static int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn);
static int static int
toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, u_int32_t flags) { toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, u_int32_t flags) {
int r; int r;
...@@ -3996,6 +4082,13 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna ...@@ -3996,6 +4082,13 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna
//Now that we have a writelock on dname, verify that there are still no handles open. (to prevent race conditions) //Now that we have a writelock on dname, verify that there are still no handles open. (to prevent race conditions)
if (r==0 && env_is_db_with_dname_open(env, dname)) if (r==0 && env_is_db_with_dname_open(env, dname))
r = toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n"); r = toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n");
if (r==0) {
DB* zombie = env_get_zombie_db_with_dname(env, dname);
if (zombie)
r = toku_db_pre_acquire_table_lock(zombie, child);
if (r!=0)
toku_ydb_do_error(env, r, "Cannot remove dictionary.\n");
}
} }
else { else {
r = toku_brt_remove_now(env->i->cachetable, &iname_dbt, &iname_within_cwd_dbt); r = toku_brt_remove_now(env->i->cachetable, &iname_dbt, &iname_within_cwd_dbt);
...@@ -4106,8 +4199,22 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam ...@@ -4106,8 +4199,22 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam
//Now that we have writelocks on both dnames, verify that there are still no handles open. (to prevent race conditions) //Now that we have writelocks on both dnames, verify that there are still no handles open. (to prevent race conditions)
if (r==0 && env_is_db_with_dname_open(env, dname)) if (r==0 && env_is_db_with_dname_open(env, dname))
r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n"); r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n");
if (r==0) {
DB* zombie = env_get_zombie_db_with_dname(env, dname);
if (zombie)
r = toku_db_pre_acquire_table_lock(zombie, child);
if (r!=0)
toku_ydb_do_error(env, r, "Cannot rename dictionary.\n");
}
if (r==0 && env_is_db_with_dname_open(env, newname)) if (r==0 && env_is_db_with_dname_open(env, newname))
r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary; Dictionary with target name has an open handle.\n"); r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary; Dictionary with target name has an open handle.\n");
if (r==0) {
DB* zombie = env_get_zombie_db_with_dname(env, newname);
if (zombie)
r = toku_db_pre_acquire_table_lock(zombie, child);
if (r!=0)
toku_ydb_do_error(env, r, "Cannot rename dictionary.\n");
}
} }
} }
......
...@@ -64,6 +64,7 @@ static inline struct toku_list *toku_list_pop_head(struct toku_list *head) { ...@@ -64,6 +64,7 @@ static inline struct toku_list *toku_list_pop_head(struct toku_list *head) {
return toku_list; return toku_list;
} }
//What does this do?
static inline void toku_list_move(struct toku_list *newhead, struct toku_list *oldhead) { static inline void toku_list_move(struct toku_list *newhead, struct toku_list *oldhead) {
struct toku_list *first = oldhead->next; struct toku_list *first = oldhead->next;
struct toku_list *last = oldhead->prev; struct toku_list *last = oldhead->prev;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment