Commit 1d8c2993 authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

close[t:4715] merging 4715 to main. at a high level, ft files are removed by...

close[t:4715] merging 4715 to main. at a high level, ft files are removed by marking OPEN ft/cf's as unlink on close, so when the last reference goes away, the file is unlinked. we log this bit during a checkpoint's fassociate to help with aborting a hot index in the future (so it isn't stricly necessary right now).


git-svn-id: file:///svn/toku/tokudb@44003 c7de825b-a66e-492c-adef-691d508d4ae1
parent 6de3fadd
...@@ -96,7 +96,6 @@ ft_set_dirty(FT ft, BOOL for_checkpoint){ ...@@ -96,7 +96,6 @@ ft_set_dirty(FT ft, BOOL for_checkpoint){
} }
} }
//fd is protected (must be holding fdlock)
static void static void
maybe_truncate_cachefile(BLOCK_TABLE bt, int fd, FT h, u_int64_t size_needed_before) { maybe_truncate_cachefile(BLOCK_TABLE bt, int fd, FT h, u_int64_t size_needed_before) {
assert(toku_mutex_is_locked(&bt->mutex)); assert(toku_mutex_is_locked(&bt->mutex));
...@@ -106,7 +105,6 @@ maybe_truncate_cachefile(BLOCK_TABLE bt, int fd, FT h, u_int64_t size_needed_bef ...@@ -106,7 +105,6 @@ maybe_truncate_cachefile(BLOCK_TABLE bt, int fd, FT h, u_int64_t size_needed_bef
toku_maybe_truncate_cachefile(h->cf, fd, new_size_needed); toku_maybe_truncate_cachefile(h->cf, fd, new_size_needed);
} }
//fd is protected (must be holding fdlock)
void void
toku_maybe_truncate_cachefile_on_open(BLOCK_TABLE bt, int fd, FT h) { toku_maybe_truncate_cachefile_on_open(BLOCK_TABLE bt, int fd, FT h) {
lock_for_blocktable(bt); lock_for_blocktable(bt);
...@@ -254,7 +252,6 @@ PRNTF("free", i, pair->size, pair->u.diskoff, bt); ...@@ -254,7 +252,6 @@ PRNTF("free", i, pair->size, pair->u.diskoff, bt);
// free (offset,len) from checkpoint // free (offset,len) from checkpoint
// move inprogress to checkpoint (resetting type) // move inprogress to checkpoint (resetting type)
// inprogress = NULL // inprogress = NULL
//fd is protected (must be holding fdlock)
void void
toku_block_translation_note_end_checkpoint (BLOCK_TABLE bt, int fd, FT h) { toku_block_translation_note_end_checkpoint (BLOCK_TABLE bt, int fd, FT h) {
// Free unused blocks // Free unused blocks
......
This diff is collapsed.
...@@ -413,32 +413,27 @@ int toku_cachefile_flush (CACHEFILE); ...@@ -413,32 +413,27 @@ int toku_cachefile_flush (CACHEFILE);
// Get the file descriptor associated with the cachefile // Get the file descriptor associated with the cachefile
// Return the file descriptor // Return the file descriptor
// Grabs a read lock protecting the fd // Grabs a read lock protecting the fd
int toku_cachefile_get_and_pin_fd (CACHEFILE); int toku_cachefile_get_fd (CACHEFILE);
// Get the iname (within the environment) associated with the cachefile // Get the iname (within the environment) associated with the cachefile
// Return the filename // Return the filename
char * toku_cachefile_fname_in_env (CACHEFILE cf); char * toku_cachefile_fname_in_env (CACHEFILE cf);
// Releases the read lock (taken by toku_cachefile_get_and_pin_fd) protecting the fd
void toku_cachefile_unpin_fd (CACHEFILE);
// For test programs only. // For test programs only.
// Set the cachefile's fd and fname. // Set the cachefile's fd and fname.
// Effect: Bind the cachefile to a new fd and fname. The old fd is closed. // Effect: Bind the cachefile to a new fd and fname. The old fd is closed.
// Returns: 0 if success, otherwise an error number // Returns: 0 if success, otherwise an error number
int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname_relative_to_env); int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname_relative_to_env);
// Equivalent to toku_cachefile_set_fd to /dev/null but without // Make it so when the cachefile closes, the underlying file is unlinked
// closing the user data. void toku_cachefile_unlink_on_close(CACHEFILE cf);
int toku_cachefile_redirect_nullfd (CACHEFILE cf);
// is this cachefile marked as unlink on close?
bool toku_cachefile_is_unlink_on_close(CACHEFILE cf);
// Truncate a cachefile // Truncate a cachefile
int toku_cachefile_truncate (CACHEFILE cf, toku_off_t new_size); int toku_cachefile_truncate (CACHEFILE cf, toku_off_t new_size);
//has it been redirected to dev null?
//Must have called toku_cachefile_get_and_pin_fd to hold a lock around this function
BOOL toku_cachefile_is_dev_null_unlocked (CACHEFILE cf);
// Return the logger associated with the cachefile // Return the logger associated with the cachefile
TOKULOGGER toku_cachefile_logger (CACHEFILE); TOKULOGGER toku_cachefile_logger (CACHEFILE);
......
...@@ -856,9 +856,6 @@ toku_verify_ftnode (FT_HANDLE brt, ...@@ -856,9 +856,6 @@ toku_verify_ftnode (FT_HANDLE brt,
int toku_db_badformat(void) __attribute__((__warn_unused_result__)); int toku_db_badformat(void) __attribute__((__warn_unused_result__));
int toku_ft_remove_on_commit(TOKUTXN child, DBT* iname_dbt_p) __attribute__((__warn_unused_result__));
int toku_ft_remove_now(CACHETABLE ct, DBT* iname_dbt_p) __attribute__((__warn_unused_result__));
typedef enum { typedef enum {
FT_UPGRADE_FOOTPRINT = 0, FT_UPGRADE_FOOTPRINT = 0,
FT_UPGRADE_STATUS_NUM_ROWS FT_UPGRADE_STATUS_NUM_ROWS
......
...@@ -697,7 +697,6 @@ void toku_ftnode_clone_callback( ...@@ -697,7 +697,6 @@ void toku_ftnode_clone_callback(
} }
//fd is protected (must be holding fdlock)
void toku_ftnode_flush_callback ( void toku_ftnode_flush_callback (
CACHEFILE cachefile, CACHEFILE cachefile,
int fd, int fd,
...@@ -765,7 +764,6 @@ toku_ft_status_update_pivot_fetch_reason(struct ftnode_fetch_extra *bfe) ...@@ -765,7 +764,6 @@ toku_ft_status_update_pivot_fetch_reason(struct ftnode_fetch_extra *bfe)
} }
} }
//fd is protected (must be holding fdlock)
int toku_ftnode_fetch_callback (CACHEFILE UU(cachefile), int fd, BLOCKNUM nodename, u_int32_t fullhash, int toku_ftnode_fetch_callback (CACHEFILE UU(cachefile), int fd, BLOCKNUM nodename, u_int32_t fullhash,
void **ftnode_pv, void** disk_data, PAIR_ATTR *sizep, int *dirtyp, void *extraargs) { void **ftnode_pv, void** disk_data, PAIR_ATTR *sizep, int *dirtyp, void *extraargs) {
assert(extraargs); assert(extraargs);
...@@ -2568,19 +2566,18 @@ int toku_ft_insert (FT_HANDLE brt, DBT *key, DBT *val, TOKUTXN txn) { ...@@ -2568,19 +2566,18 @@ int toku_ft_insert (FT_HANDLE brt, DBT *key, DBT *val, TOKUTXN txn) {
} }
int int
toku_ft_load_recovery(TOKUTXN txn, char const * old_iname, char const * new_iname, int do_fsync, int do_log, LSN *load_lsn) { toku_ft_load_recovery(TOKUTXN txn, FILENUM old_filenum, char const * new_iname, int do_fsync, int do_log, LSN *load_lsn) {
int r = 0; int r = 0;
assert(txn); assert(txn);
toku_txn_force_fsync_on_commit(txn); //If the txn commits, the commit MUST be in the log toku_txn_force_fsync_on_commit(txn); //If the txn commits, the commit MUST be in the log
//before the (old) file is actually unlinked //before the (old) file is actually unlinked
TOKULOGGER logger = toku_txn_logger(txn); TOKULOGGER logger = toku_txn_logger(txn);
BYTESTRING old_iname_bs = {.len=strlen(old_iname), .data=(char*)old_iname};
BYTESTRING new_iname_bs = {.len=strlen(new_iname), .data=(char*)new_iname}; BYTESTRING new_iname_bs = {.len=strlen(new_iname), .data=(char*)new_iname};
r = toku_logger_save_rollback_load(txn, &old_iname_bs, &new_iname_bs); r = toku_logger_save_rollback_load(txn, old_filenum, &new_iname_bs);
if (r==0 && do_log && logger) { if (r==0 && do_log && logger) {
TXNID xid = toku_txn_get_txnid(txn); TXNID xid = toku_txn_get_txnid(txn);
r = toku_log_load(logger, load_lsn, do_fsync, xid, old_iname_bs, new_iname_bs); r = toku_log_load(logger, load_lsn, do_fsync, xid, old_filenum, new_iname_bs);
} }
return r; return r;
} }
...@@ -2637,9 +2634,9 @@ toku_ft_optimize (FT_HANDLE brt) { ...@@ -2637,9 +2634,9 @@ toku_ft_optimize (FT_HANDLE brt) {
int int
toku_ft_load(FT_HANDLE brt, TOKUTXN txn, char const * new_iname, int do_fsync, LSN *load_lsn) { toku_ft_load(FT_HANDLE brt, TOKUTXN txn, char const * new_iname, int do_fsync, LSN *load_lsn) {
int r = 0; int r = 0;
char const * old_iname = toku_cachefile_fname_in_env(brt->ft->cf); FILENUM old_filenum = toku_cachefile_filenum(brt->ft->cf);
int do_log = 1; int do_log = 1;
r = toku_ft_load_recovery(txn, old_iname, new_iname, do_fsync, do_log, load_lsn); r = toku_ft_load_recovery(txn, old_filenum, new_iname, do_fsync, do_log, load_lsn);
return r; return r;
} }
...@@ -3126,13 +3123,12 @@ toku_ft_change_descriptor( ...@@ -3126,13 +3123,12 @@ toku_ft_change_descriptor(
// write new_descriptor to header // write new_descriptor to header
new_d.dbt = *new_descriptor; new_d.dbt = *new_descriptor;
fd = toku_cachefile_get_and_pin_fd (t->ft->cf); fd = toku_cachefile_get_fd (t->ft->cf);
r = toku_update_descriptor(t->ft, &new_d, fd); r = toku_update_descriptor(t->ft, &new_d, fd);
// very infrequent operation, worth precise threadsafe count // very infrequent operation, worth precise threadsafe count
if (r == 0) { if (r == 0) {
STATUS_VALUE(FT_DESCRIPTOR_SET)++; STATUS_VALUE(FT_DESCRIPTOR_SET)++;
} }
toku_cachefile_unpin_fd(t->ft->cf);
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
if (update_cmp_descriptor) { if (update_cmp_descriptor) {
...@@ -3279,9 +3275,8 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr ...@@ -3279,9 +3275,8 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
//Opening a brt may restore to previous checkpoint. Truncate if necessary. //Opening a brt may restore to previous checkpoint. Truncate if necessary.
{ {
int fd = toku_cachefile_get_and_pin_fd (ft->cf); int fd = toku_cachefile_get_fd (ft->cf);
toku_maybe_truncate_cachefile_on_open(ft->blocktable, fd, ft); toku_maybe_truncate_cachefile_on_open(ft->blocktable, fd, ft);
toku_cachefile_unpin_fd(ft->cf);
} }
r = 0; r = 0;
...@@ -5494,91 +5489,74 @@ int toku_ft_handle_set_panic(FT_HANDLE brt, int panic, char *panic_string) { ...@@ -5494,91 +5489,74 @@ int toku_ft_handle_set_panic(FT_HANDLE brt, int panic, char *panic_string) {
return toku_ft_set_panic(brt->ft, panic, panic_string); return toku_ft_set_panic(brt->ft, panic, panic_string);
} }
#if 0
int toku_logger_save_rollback_fdelete (TOKUTXN txn, u_int8_t file_was_open, FILENUM filenum, BYTESTRING iname)
int toku_logger_log_fdelete (TOKUTXN txn, const char *fname, FILENUM filenum, u_int8_t was_open)
#endif
// Prepare to remove a dictionary from the database when this transaction is committed: // Prepare to remove a dictionary from the database when this transaction is committed:
// - if cachetable has file open, mark it as in use so that cf remains valid until we're done
// - mark transaction as NEED fsync on commit // - mark transaction as NEED fsync on commit
// - make entry in rollback log // - make entry in rollback log
// - make fdelete entry in recovery log // - make fdelete entry in recovery log
int toku_ft_remove_on_commit(TOKUTXN txn, DBT* iname_in_env_dbt_p) { //
assert(txn); // Effect: when the txn commits, the ft's cachefile will be marked as unlink
// on close. see toku_commit_fdelete and how unlink on close works
// in toku_cachefile_close();
// Requires: serialized with begin checkpoint
// this does not need to take the open close lock because
// 1.) the ft/cf cannot go away because we have a live handle.
// 2.) we're not setting the unlink on close bit _here_. that
// happens on txn commit (as the name suggests).
// 3.) we're already holding the multi operation lock to
// synchronize with begin checkpoint.
// Contract: the iname of the ft should never be reused.
int
toku_ft_remove_on_commit(FT_HANDLE handle, TOKUTXN txn) {
int r; int r;
const char *iname_in_env = iname_in_env_dbt_p->data; CACHEFILE cf;
CACHEFILE cf = NULL;
u_int8_t was_open = 0;
FILENUM filenum = {0};
r = toku_cachefile_of_iname_in_env(txn->logger->ct, iname_in_env, &cf); assert(txn);
if (r == 0) { cf = handle->ft->cf;
was_open = TRUE; FT ft = toku_cachefile_get_userdata(cf);
filenum = toku_cachefile_filenum(cf);
FT h = toku_cachefile_get_userdata(cf);
r = toku_txn_note_ft(txn, h);
if (r!=0) return r;
}
else {
assert(r==ENOENT);
}
toku_txn_force_fsync_on_commit(txn); // If the txn commits, the commit MUST be in the log // TODO: toku_txn_note_ft should return void
// before the file is actually unlinked // Assert success here because note_ft also asserts success internally.
{ r = toku_txn_note_ft(txn, ft);
BYTESTRING iname_in_env_bs = { .len=strlen(iname_in_env), .data = (char*)iname_in_env }; assert(r == 0);
// make entry in rollback log
r = toku_logger_save_rollback_fdelete(txn, was_open, filenum, &iname_in_env_bs); // If the txn commits, the commit MUST be in the log before the file is actually unlinked
assert_zero(r); //On error we would need to remove the CF reference, which is complicated. toku_txn_force_fsync_on_commit(txn);
} // make entry in rollback log
if (r==0) FILENUM filenum = toku_cachefile_filenum(cf);
// make entry in recovery log r = toku_logger_save_rollback_fdelete(txn, filenum);
r = toku_logger_log_fdelete(txn, iname_in_env); assert_zero(r);
// make entry in recovery log
r = toku_logger_log_fdelete(txn, filenum);
return r; return r;
} }
// Non-transactional version of fdelete
// Non-transaction version of fdelete //
int toku_ft_remove_now(CACHETABLE ct, DBT* iname_in_env_dbt_p) { // Effect: The ft file is unlinked when the handle closes and it's ft is not
int r; // pinned by checkpoint. see toku_remove_ft_ref() and how unlink on
const char *iname_in_env = iname_in_env_dbt_p->data; // close works in toku_cachefile_close();
// Requires: serialized with begin checkpoint
void
toku_ft_remove(FT_HANDLE handle) {
CACHEFILE cf; CACHEFILE cf;
r = toku_cachefile_of_iname_in_env(ct, iname_in_env, &cf); cf = handle->ft->cf;
if (r == 0) { toku_cachefile_unlink_on_close(cf);
r = toku_cachefile_redirect_nullfd(cf);
assert_zero(r);
}
else
assert(r==ENOENT);
char *iname_in_cwd = toku_cachetable_get_fname_in_cwd(ct, iname_in_env_dbt_p->data);
r = unlink(iname_in_cwd); // we need a pathname relative to cwd
assert_zero(r);
toku_free(iname_in_cwd);
return r;
} }
int int
toku_ft_get_fragmentation(FT_HANDLE brt, TOKU_DB_FRAGMENTATION report) { toku_ft_get_fragmentation(FT_HANDLE brt, TOKU_DB_FRAGMENTATION report) {
int r; int r;
int fd = toku_cachefile_get_and_pin_fd(brt->ft->cf); int fd = toku_cachefile_get_fd(brt->ft->cf);
toku_ft_lock(brt->ft); toku_ft_lock(brt->ft);
int64_t file_size; int64_t file_size;
if (toku_cachefile_is_dev_null_unlocked(brt->ft->cf)) r = toku_os_get_file_size(fd, &file_size);
r = EINVAL;
else
r = toku_os_get_file_size(fd, &file_size);
if (r==0) { if (r==0) {
report->file_size_bytes = file_size; report->file_size_bytes = file_size;
toku_block_table_get_fragmentation_unlocked(brt->ft->blocktable, report); toku_block_table_get_fragmentation_unlocked(brt->ft->blocktable, report);
} }
toku_ft_unlock(brt->ft); toku_ft_unlock(brt->ft);
toku_cachefile_unpin_fd(brt->ft->cf);
return r; return r;
} }
......
...@@ -133,7 +133,7 @@ int toku_ft_maybe_update(FT_HANDLE brt, const DBT *key, const DBT *update_functi ...@@ -133,7 +133,7 @@ int toku_ft_maybe_update(FT_HANDLE brt, const DBT *key, const DBT *update_functi
// Returns 0 if successful // Returns 0 if successful
int toku_ft_maybe_update_broadcast(FT_HANDLE brt, const DBT *update_function_extra, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, BOOL do_logging, BOOL is_resetting_op) __attribute__ ((warn_unused_result)); int toku_ft_maybe_update_broadcast(FT_HANDLE brt, const DBT *update_function_extra, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, BOOL do_logging, BOOL is_resetting_op) __attribute__ ((warn_unused_result));
int toku_ft_load_recovery(TOKUTXN txn, char const * old_iname, char const * new_iname, int do_fsync, int do_log, LSN *load_lsn) __attribute__ ((warn_unused_result)); int toku_ft_load_recovery(TOKUTXN txn, FILENUM old_filenum, char const * new_iname, int do_fsync, int do_log, LSN *load_lsn) __attribute__ ((warn_unused_result));
int toku_ft_load(FT_HANDLE brt, TOKUTXN txn, char const * new_iname, int do_fsync, LSN *get_lsn) __attribute__ ((warn_unused_result)); int toku_ft_load(FT_HANDLE brt, TOKUTXN txn, char const * new_iname, int do_fsync, LSN *get_lsn) __attribute__ ((warn_unused_result));
// 2954 // 2954
int toku_ft_hot_index_recovery(TOKUTXN txn, FILENUMS filenums, int do_fsync, int do_log, LSN *hot_index_lsn); int toku_ft_hot_index_recovery(TOKUTXN txn, FILENUMS filenums, int do_fsync, int do_log, LSN *hot_index_lsn);
......
...@@ -122,8 +122,9 @@ ft_log_fassociate_during_checkpoint (CACHEFILE cf, void *header_v) { ...@@ -122,8 +122,9 @@ ft_log_fassociate_during_checkpoint (CACHEFILE cf, void *header_v) {
BYTESTRING bs = { strlen(fname_in_env), // don't include the NUL BYTESTRING bs = { strlen(fname_in_env), // don't include the NUL
fname_in_env }; fname_in_env };
TOKULOGGER logger = toku_cachefile_logger(cf); TOKULOGGER logger = toku_cachefile_logger(cf);
FILENUM filenum = toku_cachefile_filenum (cf); FILENUM filenum = toku_cachefile_filenum(cf);
int r = toku_log_fassociate(logger, NULL, 0, filenum, ft->h->flags, bs); bool unlink_on_close = toku_cachefile_is_unlink_on_close(cf);
int r = toku_log_fassociate(logger, NULL, 0, filenum, ft->h->flags, bs, unlink_on_close);
return r; return r;
} }
...@@ -508,7 +509,7 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac ...@@ -508,7 +509,7 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac
FT h; FT h;
int r; int r;
{ {
int fd = toku_cachefile_get_and_pin_fd (cf); int fd = toku_cachefile_get_fd(cf);
enum deserialize_error_code e = toku_deserialize_ft_from(fd, max_acceptable_lsn, &h); enum deserialize_error_code e = toku_deserialize_ft_from(fd, max_acceptable_lsn, &h);
if (e == DS_XSUM_FAIL) { if (e == DS_XSUM_FAIL) {
fprintf(stderr, "Checksum failure while reading header in file %s.\n", toku_cachefile_fname_in_env(cf)); fprintf(stderr, "Checksum failure while reading header in file %s.\n", toku_cachefile_fname_in_env(cf));
...@@ -520,7 +521,6 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac ...@@ -520,7 +521,6 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac
} else { } else {
assert(false); assert(false);
} }
toku_cachefile_unpin_fd(cf);
} }
if (r!=0) return r; if (r!=0) return r;
h->cf = cf; h->cf = cf;
......
...@@ -14,8 +14,16 @@ ...@@ -14,8 +14,16 @@
#include "ft-search.h" #include "ft-search.h"
#include "compress.h" #include "compress.h"
void toku_ft_suppress_rollbacks(FT h, TOKUTXN txn); // remove a ft, transactionless.
// if the ft is being checkpointed, it will be removed after checkpoint.
void toku_ft_remove(FT_HANDLE handle);
// remove a ft using the given txn. when the txn commits, the ft is removed.
// if the ft is being checkpointed, it will be removed after checkpoint.
int toku_ft_remove_on_commit(FT_HANDLE handle, TOKUTXN txn) __attribute__((__warn_unused_result__));
//Effect: suppresses rollback logs //Effect: suppresses rollback logs
void toku_ft_suppress_rollbacks(FT h, TOKUTXN txn);
void toku_ft_init_treelock(FT h); void toku_ft_init_treelock(FT h);
void toku_ft_destroy_treelock(FT h); void toku_ft_destroy_treelock(FT h);
......
...@@ -90,7 +90,6 @@ alignup64(u_int64_t a, u_int64_t b) { ...@@ -90,7 +90,6 @@ alignup64(u_int64_t a, u_int64_t b) {
//Race condition if ydb lock is split. //Race condition if ydb lock is split.
//Ydb lock is held when this function is called. //Ydb lock is held when this function is called.
//Not going to truncate and delete (redirect to devnull) at same time. //Not going to truncate and delete (redirect to devnull) at same time.
//Must be holding a read or write lock on fdlock (fd is protected)
void void
toku_maybe_truncate_cachefile (CACHEFILE cf, int fd, u_int64_t size_used) toku_maybe_truncate_cachefile (CACHEFILE cf, int fd, u_int64_t size_used)
// Effect: If file size >= SIZE+32MiB, reduce file size. // Effect: If file size >= SIZE+32MiB, reduce file size.
...@@ -101,7 +100,6 @@ toku_maybe_truncate_cachefile (CACHEFILE cf, int fd, u_int64_t size_used) ...@@ -101,7 +100,6 @@ toku_maybe_truncate_cachefile (CACHEFILE cf, int fd, u_int64_t size_used)
//the pwrite lock needlessly. //the pwrite lock needlessly.
//Check file size after taking lock to avoid race conditions. //Check file size after taking lock to avoid race conditions.
int64_t file_size; int64_t file_size;
if (toku_cachefile_is_dev_null_unlocked(cf)) goto done;
{ {
int r = toku_os_get_file_size(fd, &file_size); int r = toku_os_get_file_size(fd, &file_size);
lazy_assert_zero(r); lazy_assert_zero(r);
...@@ -123,7 +121,6 @@ toku_maybe_truncate_cachefile (CACHEFILE cf, int fd, u_int64_t size_used) ...@@ -123,7 +121,6 @@ toku_maybe_truncate_cachefile (CACHEFILE cf, int fd, u_int64_t size_used)
} }
toku_unlock_for_pwrite(); toku_unlock_for_pwrite();
} }
done:
return; return;
} }
......
...@@ -44,9 +44,7 @@ struct logtype { ...@@ -44,9 +44,7 @@ struct logtype {
const struct logtype rollbacks[] = { const struct logtype rollbacks[] = {
//TODO: #2037 Add dname //TODO: #2037 Add dname
{"fdelete", 'U', FA{{"u_int8_t", "file_was_open", 0}, {"fdelete", 'U', FA{{"FILENUM", "filenum", 0},
{"FILENUM", "filenum", 0},
{"BYTESTRING", "iname", 0},
NULLFIELD}}, NULLFIELD}},
//TODO: #2037 Add dname //TODO: #2037 Add dname
{"fcreate", 'F', FA{{"FILENUM", "filenum", 0}, {"fcreate", 'F', FA{{"FILENUM", "filenum", 0},
...@@ -68,7 +66,7 @@ const struct logtype rollbacks[] = { ...@@ -68,7 +66,7 @@ const struct logtype rollbacks[] = {
{"BLOCKNUM", "spilled_tail", 0}, {"BLOCKNUM", "spilled_tail", 0},
{"u_int32_t", "spilled_tail_hash", 0}, {"u_int32_t", "spilled_tail_hash", 0},
NULLFIELD}}, NULLFIELD}},
{"load", 'l', FA{{"BYTESTRING", "old_iname", 0}, {"load", 'l', FA{{"FILENUM", "old_filenum", 0},
{"BYTESTRING", "new_iname", 0}, {"BYTESTRING", "new_iname", 0},
NULLFIELD}}, NULLFIELD}},
// #2954 // #2954
...@@ -104,6 +102,7 @@ const struct logtype logtypes[] = { ...@@ -104,6 +102,7 @@ const struct logtype logtypes[] = {
{"fassociate", 'f', FA{{"FILENUM", "filenum", 0}, {"fassociate", 'f', FA{{"FILENUM", "filenum", 0},
{"u_int32_t", "treeflags", 0}, {"u_int32_t", "treeflags", 0},
{"BYTESTRING", "iname", 0}, // pathname of file {"BYTESTRING", "iname", 0}, // pathname of file
{"u_int8_t", "unlink_on_close", 0},
NULLFIELD}}, NULLFIELD}},
//We do not use a TXNINFO struct since recovery log has //We do not use a TXNINFO struct since recovery log has
//FILENUMS and TOKUTXN has FTs (for open_fts) //FILENUMS and TOKUTXN has FTs (for open_fts)
...@@ -159,7 +158,7 @@ const struct logtype logtypes[] = { ...@@ -159,7 +158,7 @@ const struct logtype logtypes[] = {
NULLFIELD}}, NULLFIELD}},
//TODO: #2037 Add dname //TODO: #2037 Add dname
{"fdelete", 'U', FA{{"TXNID", "xid", 0}, {"fdelete", 'U', FA{{"TXNID", "xid", 0},
{"BYTESTRING", "iname", 0}, {"FILENUM", "filenum", 0},
NULLFIELD}}, NULLFIELD}},
{"enq_insert", 'I', FA{{"FILENUM", "filenum", 0}, {"enq_insert", 'I', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0}, {"TXNID", "xid", 0},
...@@ -193,7 +192,7 @@ const struct logtype logtypes[] = { ...@@ -193,7 +192,7 @@ const struct logtype logtypes[] = {
{"shutdown", 'Q', FA{{"u_int64_t", "timestamp", 0}, {"shutdown", 'Q', FA{{"u_int64_t", "timestamp", 0},
NULLFIELD}}, NULLFIELD}},
{"load", 'l', FA{{"TXNID", "xid", 0}, {"load", 'l', FA{{"TXNID", "xid", 0},
{"BYTESTRING", "old_iname", 0}, {"FILENUM", "old_filenum", 0},
{"BYTESTRING", "new_iname", 0}, {"BYTESTRING", "new_iname", 0},
NULLFIELD}}, NULLFIELD}},
// #2954 // #2954
......
...@@ -860,13 +860,12 @@ int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, FILENUM filenum, u_ ...@@ -860,13 +860,12 @@ int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, FILENUM filenum, u_
} }
// fname is the iname // We only do fdelete on open ft's, so we pass the filenum here
int toku_logger_log_fdelete (TOKUTXN txn, const char *fname) { int toku_logger_log_fdelete (TOKUTXN txn, FILENUM filenum) {
if (txn==0) return 0; if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL; if (txn->logger->is_panicked) return EINVAL;
BYTESTRING bs = { .len=strlen(fname), .data = (char *) fname };
//No fsync. //No fsync.
int r = toku_log_fdelete (txn->logger, (LSN*)0, 0, toku_txn_get_txnid(txn), bs); int r = toku_log_fdelete (txn->logger, (LSN*)0, 0, toku_txn_get_txnid(txn), filenum);
return r; return r;
} }
......
...@@ -61,7 +61,7 @@ int toku_logger_restart(TOKULOGGER logger, LSN lastlsn); ...@@ -61,7 +61,7 @@ int toku_logger_restart(TOKULOGGER logger, LSN lastlsn);
int toku_logger_maybe_trim_log(TOKULOGGER logger, LSN oldest_open_lsn); int toku_logger_maybe_trim_log(TOKULOGGER logger, LSN oldest_open_lsn);
int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, FILENUM filenum, u_int32_t mode, u_int32_t flags, u_int32_t nodesize, u_int32_t basementnodesize, enum toku_compression_method compression_method); int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, FILENUM filenum, u_int32_t mode, u_int32_t flags, u_int32_t nodesize, u_int32_t basementnodesize, enum toku_compression_method compression_method);
int toku_logger_log_fdelete (TOKUTXN txn, const char *fname); int toku_logger_log_fdelete (TOKUTXN txn, FILENUM filenum);
int toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum, uint32_t treeflags); int toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum, uint32_t treeflags);
int toku_fread_u_int8_t (FILE *f, u_int8_t *v, struct x1764 *mm, u_int32_t *len); int toku_fread_u_int8_t (FILE *f, u_int8_t *v, struct x1764 *mm, u_int32_t *len);
......
...@@ -438,6 +438,12 @@ static int toku_recover_fassociate (struct logtype_fassociate *l, RECOVER_ENV re ...@@ -438,6 +438,12 @@ static int toku_recover_fassociate (struct logtype_fassociate *l, RECOVER_ENV re
assert(r==0); assert(r==0);
} }
} }
// try to open the file again and if we get it, restore
// the unlink on close bit.
int ret = file_map_find(&renv->fmap, l->filenum, &tuple);
if (ret == 0 && l->unlink_on_close) {
toku_cachefile_unlink_on_close(tuple->ft_handle->ft->cf);
}
break; break;
case FORWARD_NEWER_CHECKPOINT_END: case FORWARD_NEWER_CHECKPOINT_END:
if (r == 0) { //IF it is open if (r == 0) { //IF it is open
...@@ -451,6 +457,7 @@ static int toku_recover_fassociate (struct logtype_fassociate *l, RECOVER_ENV re ...@@ -451,6 +457,7 @@ static int toku_recover_fassociate (struct logtype_fassociate *l, RECOVER_ENV re
return 0; return 0;
} }
toku_free(fname); toku_free(fname);
return r; return r;
} }
...@@ -850,23 +857,16 @@ static int toku_recover_fdelete (struct logtype_fdelete *l, RECOVER_ENV renv) { ...@@ -850,23 +857,16 @@ static int toku_recover_fdelete (struct logtype_fdelete *l, RECOVER_ENV renv) {
int r = toku_txnid2txn(renv->logger, l->xid, &txn); int r = toku_txnid2txn(renv->logger, l->xid, &txn);
assert(r == 0); assert(r == 0);
assert(txn != NULL); assert(txn != NULL);
char *fixediname = fixup_fname(&l->iname);
{ //Only if it exists // if the forward scan in recovery found this file and opened it, we
toku_struct_stat buf; // need to mark the txn to remove the ft on commit. if the file was
r = toku_stat(fixediname, &buf); // not found and not opened, we don't need to do anything - the ft
if (r==0) { // is already gone, so we're happy.
// txn exists and file exists, so create fdelete rollback entry struct file_map_tuple *tuple;
DBT iname_dbt; r = file_map_find(&renv->fmap, l->filenum, &tuple);
toku_fill_dbt(&iname_dbt, fixediname, strlen(fixediname)+1); if (r == 0) {
r = toku_ft_remove_on_commit(txn, &iname_dbt); r = toku_ft_remove_on_commit(tuple->ft_handle, txn);
assert(r==0);
}
else {
assert(errno==ENOENT);
}
} }
toku_free(fixediname);
return 0; return 0;
} }
...@@ -1141,13 +1141,11 @@ static int toku_recover_load(struct logtype_load *UU(l), RECOVER_ENV UU(renv)) { ...@@ -1141,13 +1141,11 @@ static int toku_recover_load(struct logtype_load *UU(l), RECOVER_ENV UU(renv)) {
r = toku_txnid2txn(renv->logger, l->xid, &txn); r = toku_txnid2txn(renv->logger, l->xid, &txn);
assert(r == 0); assert(r == 0);
assert(txn!=NULL); assert(txn!=NULL);
char *old_iname = fixup_fname(&l->old_iname);
char *new_iname = fixup_fname(&l->new_iname); char *new_iname = fixup_fname(&l->new_iname);
r = toku_ft_load_recovery(txn, old_iname, new_iname, 0, 0, (LSN*)NULL); r = toku_ft_load_recovery(txn, l->old_filenum, new_iname, 0, 0, (LSN*)NULL);
assert(r==0); assert(r==0);
toku_free(old_iname);
toku_free(new_iname); toku_free(new_iname);
return 0; return 0;
} }
......
This diff is collapsed.
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: cachetable-simple-verify.c 36689 2011-11-07 22:08:05Z zardosht $"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
CACHEFILE f1;
static void
flush (CACHEFILE f __attribute__((__unused__)),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
void *v __attribute__((__unused__)),
void** UU(dd),
void *e __attribute__((__unused__)),
PAIR_ATTR s __attribute__((__unused__)),
PAIR_ATTR* new_size __attribute__((__unused__)),
BOOL w __attribute__((__unused__)),
BOOL keep __attribute__((__unused__)),
BOOL c __attribute__((__unused__)),
BOOL UU(is_clone)
) {
/* Do nothing */
if (verbose) { printf("FLUSH: %d\n", (int)k.b); }
//usleep (5*1024*1024);
PAIR_ATTR attr = make_pair_attr(8);
attr.cache_pressure_size = 8;
*new_size = attr;
if (w) {
assert(c);
}
}
static int
cleaner_callback(
void* UU(ftnode_pv),
BLOCKNUM UU(blocknum),
u_int32_t UU(fullhash),
void* UU(extraargs)
)
{
assert(FALSE);
return 0;
}
static void
cachetable_test (void) {
const int test_limit = 12;
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __SRCFILE__ "test1.dat";
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
//void* v2;
long s1;
//long s2;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
wc.flush_callback = flush;
wc.cleaner_callback = cleaner_callback;
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, TRUE, NULL);
PAIR_ATTR attr = make_pair_attr(8);
attr.cache_pressure_size = 8;
r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, attr);
// test that once we have redirected to /dev/null,
// cleaner callback is NOT called
r = toku_cachefile_redirect_nullfd(f1);
assert_zero(r);
toku_cleaner_thread(ct);
toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0);
r = toku_cachetable_close(&ct); lazy_assert_zero(r);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
cachetable_test();
return 0;
}
...@@ -16,23 +16,20 @@ cachetable_fd_test (void) { ...@@ -16,23 +16,20 @@ cachetable_fd_test (void) {
CACHEFILE cf; CACHEFILE cf;
r = toku_cachetable_openf(&cf, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0); r = toku_cachetable_openf(&cf, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
int fd1 = toku_cachefile_get_and_pin_fd(cf); assert(fd1 >= 0); int fd1 = toku_cachefile_get_fd(cf); assert(fd1 >= 0);
toku_cachefile_unpin_fd(cf);
// test set to good fd succeeds // test set to good fd succeeds
char fname2[] = __SRCFILE__ "test2.data"; char fname2[] = __SRCFILE__ "test2.data";
unlink(fname2); unlink(fname2);
int fd2 = open(fname2, O_RDWR | O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(fd2 >= 0 && fd1 != fd2); int fd2 = open(fname2, O_RDWR | O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(fd2 >= 0 && fd1 != fd2);
r = toku_cachefile_set_fd(cf, fd2, fname2); assert(r == 0); r = toku_cachefile_set_fd(cf, fd2, fname2); assert(r == 0);
assert(toku_cachefile_get_and_pin_fd(cf) == fd2); assert(toku_cachefile_get_fd(cf) == fd2);
toku_cachefile_unpin_fd(cf);
// test set to bogus fd fails // test set to bogus fd fails
int fd3 = open(DEV_NULL_FILE, O_RDWR); assert(fd3 >= 0); int fd3 = open(DEV_NULL_FILE, O_RDWR); assert(fd3 >= 0);
r = close(fd3); assert(r == 0); r = close(fd3); assert(r == 0);
r = toku_cachefile_set_fd(cf, fd3, DEV_NULL_FILE); assert(r != 0); r = toku_cachefile_set_fd(cf, fd3, DEV_NULL_FILE); assert(r != 0);
assert(toku_cachefile_get_and_pin_fd(cf) == fd2); assert(toku_cachefile_get_fd(cf) == fd2);
toku_cachefile_unpin_fd(cf);
// test the filenum functions // test the filenum functions
FILENUM fn = toku_cachefile_filenum(cf); FILENUM fn = toku_cachefile_filenum(cf);
......
...@@ -26,8 +26,7 @@ static void f_flush (CACHEFILE f, ...@@ -26,8 +26,7 @@ static void f_flush (CACHEFILE f,
) { ) {
assert(size.size==BLOCKSIZE); assert(size.size==BLOCKSIZE);
if (write_me) { if (write_me) {
toku_os_full_pwrite(toku_cachefile_get_and_pin_fd(f), value, BLOCKSIZE, key.b); toku_os_full_pwrite(toku_cachefile_get_fd(f), value, BLOCKSIZE, key.b);
toku_cachefile_unpin_fd(f);
} }
if (!keep_me) { if (!keep_me) {
toku_free(value); toku_free(value);
...@@ -44,8 +43,7 @@ static int f_fetch (CACHEFILE f, ...@@ -44,8 +43,7 @@ static int f_fetch (CACHEFILE f,
int *dirtyp, int *dirtyp,
void*extraargs __attribute__((__unused__))) { void*extraargs __attribute__((__unused__))) {
void *buf = toku_malloc(BLOCKSIZE); void *buf = toku_malloc(BLOCKSIZE);
int r = pread(toku_cachefile_get_and_pin_fd(f), buf, BLOCKSIZE, key.b); int r = pread(toku_cachefile_get_fd(f), buf, BLOCKSIZE, key.b);
toku_cachefile_unpin_fd(f);
assert(r==BLOCKSIZE); assert(r==BLOCKSIZE);
*value = buf; *value = buf;
*sizep = make_pair_attr(BLOCKSIZE); *sizep = make_pair_attr(BLOCKSIZE);
......
...@@ -202,10 +202,10 @@ int create_logfiles() { ...@@ -202,10 +202,10 @@ int create_logfiles() {
//begin_checkpoint 'x': lsn=9 timestamp=1251309957584197 crc=cd067878 len=29 //begin_checkpoint 'x': lsn=9 timestamp=1251309957584197 crc=cd067878 len=29
r = toku_log_begin_checkpoint(logger, &lsn, NO_FSYNC, 1251309957584197); assert(r==0); cp_txnid = lsn.lsn; r = toku_log_begin_checkpoint(logger, &lsn, NO_FSYNC, 1251309957584197); assert(r==0); cp_txnid = lsn.lsn;
//fassociate 'f': lsn=11 filenum=1 fname={len=4 data="b.db"} crc=a7126035 len=33 //fassociate 'f': lsn=11 filenum=1 fname={len=4 data="b.db"} crc=a7126035 len=33
r = toku_log_fassociate(logger, &lsn, NO_FSYNC, fn_bname, 0, bs_bname); assert(r==0); r = toku_log_fassociate(logger, &lsn, NO_FSYNC, fn_bname, 0, bs_bname, 0); assert(r==0);
num_fassociate++; num_fassociate++;
//fassociate 'f': lsn=12 filenum=0 fname={len=4 data="a.db"} crc=a70c5f35 len=33 //fassociate 'f': lsn=12 filenum=0 fname={len=4 data="a.db"} crc=a70c5f35 len=33
r = toku_log_fassociate(logger, &lsn, NO_FSYNC, fn_aname, 0, bs_aname); assert(r==0); r = toku_log_fassociate(logger, &lsn, NO_FSYNC, fn_aname, 0, bs_aname, 0); assert(r==0);
num_fassociate++; num_fassociate++;
//xstillopen 's': lsn=10 txnid=7 parent=0 crc=00061816 len=37 <- obsolete //xstillopen 's': lsn=10 txnid=7 parent=0 crc=00061816 len=37 <- obsolete
{ {
......
...@@ -23,7 +23,6 @@ const char *toku_copyright_string = "Copyright (c) 2007-2009 Tokutek Inc. All r ...@@ -23,7 +23,6 @@ const char *toku_copyright_string = "Copyright (c) 2007-2009 Tokutek Inc. All r
#include "toku_assert.h" #include "toku_assert.h"
#include "ydb.h" #include "ydb.h"
#include "ydb-internal.h" #include "ydb-internal.h"
#include <ft/ft-internal.h>
#include <ft/ft-flusher.h> #include <ft/ft-flusher.h>
#include <ft/cachetable.h> #include <ft/cachetable.h>
#include <ft/log.h> #include <ft/log.h>
...@@ -795,21 +794,21 @@ toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) { ...@@ -795,21 +794,21 @@ toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) {
{ {
BOOL made_new_home = FALSE; BOOL made_new_home = FALSE;
char* new_home = NULL; char* new_home = NULL;
toku_struct_stat buf; toku_struct_stat buf;
if (strlen(home) > 1 && home[strlen(home)-1] == '\\') { if (strlen(home) > 1 && home[strlen(home)-1] == '\\') {
new_home = toku_malloc(strlen(home)); new_home = toku_malloc(strlen(home));
memcpy(new_home, home, strlen(home)); memcpy(new_home, home, strlen(home));
new_home[strlen(home) - 1] = 0; new_home[strlen(home) - 1] = 0;
made_new_home = TRUE; made_new_home = TRUE;
} }
r = toku_stat(made_new_home? new_home : home, &buf); r = toku_stat(made_new_home? new_home : home, &buf);
if (made_new_home) { if (made_new_home) {
toku_free(new_home); toku_free(new_home);
} }
if (r!=0) { if (r!=0) {
r = toku_ydb_do_error(env, errno, "Error from toku_stat(\"%s\",...)\n", home); r = toku_ydb_do_error(env, errno, "Error from toku_stat(\"%s\",...)\n", home);
goto cleanup; goto cleanup;
} }
} }
unused_flags &= ~DB_PRIVATE; unused_flags &= ~DB_PRIVATE;
...@@ -2649,6 +2648,8 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna ...@@ -2649,6 +2648,8 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna
assert(dbname == NULL); assert(dbname == NULL);
if (flags!=0) return EINVAL; if (flags!=0) return EINVAL;
// We check for an open db here as a "fast path" to error.
// We'll need to check again below to be sure.
if (env_is_db_with_dname_open(env, dname)) if (env_is_db_with_dname_open(env, dname))
return toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n"); return toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n");
...@@ -2668,41 +2669,54 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna ...@@ -2668,41 +2669,54 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna
// get iname // get iname
r = toku_db_get(env->i->directory, child, &dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname r = toku_db_get(env->i->directory, child, &dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname
char *iname = iname_dbt.data; char *iname = iname_dbt.data;
if (r==DB_NOTFOUND) DB *db = NULL;
if (r == DB_NOTFOUND) {
r = ENOENT; r = ENOENT;
else if (r==0) { } else if (r == 0) {
// remove (dname,iname) from directory // remove (dname,iname) from directory
r = toku_db_del(env->i->directory, child, &dname_dbt, DB_DELETE_ANY, TRUE); r = toku_db_del(env->i->directory, child, &dname_dbt, DB_DELETE_ANY, TRUE);
if (r == 0) { if (r != 0) {
if (using_txns) { goto exit;
// this writes an fdelete to the transaction's rollback log. }
// it is removed if the child txn aborts after any error case below r = toku_db_create(&db, env, 0);
r = toku_ft_remove_on_commit(db_txn_struct_i(child)->tokutxn, &iname_dbt); assert_zero(r);
assert_zero(r); r = db_open_iname(db, txn, iname, 0, 0);
//Now that we have a writelock on dname, verify that there are still no handles open. (to prevent race conditions) assert_zero(r);
if (r==0 && env_is_db_with_dname_open(env, dname)) if (using_txns) {
r = toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n"); // Now that we have a writelock on dname, verify that there are still no handles open. (to prevent race conditions)
if (r==0) { if (env_is_db_with_dname_open(env, dname)) {
// we know a live db handle does not exist. r = toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n");
// goto exit;
// if the lock tree still exists, try to get a full table
// lock. if we can't get it, then some txn still needs
// the ft and we should return lock not granted.
// otherwise, we're okay in marking this brt as remove on
// commit. no new handles can open for this dictionary
// because the txn has directory write locks on the dname
if (!can_acquire_table_lock(env, child, iname)) {
r = DB_LOCK_NOTGRANTED;
}
}
} }
else { // we know a live db handle does not exist.
r = toku_ft_remove_now(env->i->cachetable, &iname_dbt); //
// use the internally opened db to try and get a table lock
//
// if we can't get it, then some txn needs the ft and we
// should return lock not granted.
//
// otherwise, we're okay in marking this ft as remove on
// commit. no new handles can open for this dictionary
// because the txn has directory write locks on the dname
if (toku_db_pre_acquire_table_lock(db, child) != 0) {
r = DB_LOCK_NOTGRANTED;
} else {
// The ft will be removed when the txn commits
r = toku_ft_remove_on_commit(db->i->ft_handle, db_txn_struct_i(child)->tokutxn);
assert_zero(r); assert_zero(r);
} }
} }
else {
// Remove the ft without a txn
toku_ft_remove(db->i->ft_handle);
}
} }
exit:
if (db) {
int ret = toku_db_close(db);
assert(ret == 0);
}
if (using_txns) { if (using_txns) {
// close txn // close txn
if (r == 0) { // commit if (r == 0) { // commit
...@@ -2714,13 +2728,12 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna ...@@ -2714,13 +2728,12 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna
invariant(r2==0); // TODO panic invariant(r2==0); // TODO panic
} }
} }
if (iname) {
if (iname) toku_free(iname); toku_free(iname);
}
return r; return r;
} }
static int static int
env_dbrename_subdb(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, u_int32_t flags) { env_dbrename_subdb(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, u_int32_t flags) {
int r; int r;
...@@ -2756,7 +2769,9 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam ...@@ -2756,7 +2769,9 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam
const char * dname = fname; const char * dname = fname;
assert(dbname == NULL); assert(dbname == NULL);
if (flags!=0) return EINVAL; if (flags != 0) return EINVAL;
// We check for open dnames for the old and new name as a "fast path" to error.
// We will need to check these again later.
if (env_is_db_with_dname_open(env, dname)) if (env_is_db_with_dname_open(env, dname))
return toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n"); return toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n");
if (env_is_db_with_dname_open(env, newname)) if (env_is_db_with_dname_open(env, newname))
...@@ -2777,12 +2792,12 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam ...@@ -2777,12 +2792,12 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam
assert_zero(r); assert_zero(r);
} }
char *iname; // get iname
r = toku_db_get(env->i->directory, child, &old_dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname r = toku_db_get(env->i->directory, child, &old_dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname
iname = iname_dbt.data; char *iname = iname_dbt.data;
if (r==DB_NOTFOUND) if (r == DB_NOTFOUND) {
r = ENOENT; r = ENOENT;
else if (r==0) { } else if (r == 0) {
// verify that newname does not already exist // verify that newname does not already exist
r = db_getf_set(env->i->directory, child, DB_SERIALIZABLE, &new_dname_dbt, ydb_getf_do_nothing, NULL); r = db_getf_set(env->i->directory, child, DB_SERIALIZABLE, &new_dname_dbt, ydb_getf_do_nothing, NULL);
if (r == 0) { if (r == 0) {
...@@ -2791,27 +2806,39 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam ...@@ -2791,27 +2806,39 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam
else if (r == DB_NOTFOUND) { else if (r == DB_NOTFOUND) {
// remove old (dname,iname) and insert (newname,iname) in directory // remove old (dname,iname) and insert (newname,iname) in directory
r = toku_db_del(env->i->directory, child, &old_dname_dbt, DB_DELETE_ANY, TRUE); r = toku_db_del(env->i->directory, child, &old_dname_dbt, DB_DELETE_ANY, TRUE);
if (r == 0) if (r != 0) { goto exit; }
r = toku_db_put(env->i->directory, child, &new_dname_dbt, &iname_dbt, 0, TRUE); r = toku_db_put(env->i->directory, child, &new_dname_dbt, &iname_dbt, 0, TRUE);
if (r != 0) { goto exit; }
//Now that we have writelocks on both dnames, verify that there are still no handles open. (to prevent race conditions) //Now that we have writelocks on both dnames, verify that there are still no handles open. (to prevent race conditions)
if (r==0 && env_is_db_with_dname_open(env, dname)) if (env_is_db_with_dname_open(env, dname)) {
r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n"); r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n");
if (r == 0) { goto exit;
// we know a live db handle for the old name does not exist.
//
// if the lock tree still exists, try to get a full table
// lock. if we can't get it, then some txn still references
// this dictionary, so we can't proceed.
if (!can_acquire_table_lock(env, child, iname)) {
r = DB_LOCK_NOTGRANTED;
}
} }
if (r==0 && env_is_db_with_dname_open(env, newname)) { if (env_is_db_with_dname_open(env, newname)) {
r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary; Dictionary with target name has an open handle.\n"); r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary; Dictionary with target name has an open handle.\n");
goto exit;
}
// we know a live db handle does not exist.
//
// use the internally opened db to try and get a table lock
//
// if we can't get it, then some txn needs the ft and we
// should return lock not granted.
//
// otherwise, we're okay in marking this ft as remove on
// commit. no new handles can open for this dictionary
// because the txn has directory write locks on the dname
if (!can_acquire_table_lock(env, child, iname)) {
r = DB_LOCK_NOTGRANTED;
} }
// We don't do anything at the ft or cachetable layer for rename.
// We just update entries in the environment's directory.
} }
} }
exit:
if (using_txns) { if (using_txns) {
// close txn // close txn
if (r == 0) { // commit if (r == 0) { // commit
...@@ -2823,11 +2850,10 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam ...@@ -2823,11 +2850,10 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam
invariant(r2==0); // TODO panic invariant(r2==0); // TODO panic
} }
} }
if (iname) {
if (iname) toku_free(iname); toku_free(iname);
}
return r; return r;
} }
int int
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment