Commit e4a27ac8 authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

refs #5500 logger fsync does not fail


git-svn-id: file:///svn/toku/tokudb@48233 c7de825b-a66e-492c-adef-691d508d4ae1
parent c7606997
......@@ -226,17 +226,7 @@ toku_block_translation_note_start_checkpoint_unlocked (BLOCK_TABLE bt) {
//Debugging function
#define PRNTF(str, b, siz, ad, bt)
void
toku_block_translation_note_failed_checkpoint (BLOCK_TABLE bt) {
lock_for_blocktable(bt);
assert(bt->inprogress.block_translation);
bt->checkpoint_failed = true;
unlock_for_blocktable(bt);
}
void
toku_block_translation_note_skipped_checkpoint (BLOCK_TABLE bt) {
void toku_block_translation_note_skipped_checkpoint (BLOCK_TABLE bt) {
//Purpose, alert block translation that the checkpoint was skipped, e.x. for a non-dirty header
lock_for_blocktable(bt);
assert(bt->inprogress.block_translation);
......
......@@ -29,7 +29,6 @@ void toku_ft_unlock(FT h);
void toku_block_translation_note_start_checkpoint_unlocked(BLOCK_TABLE bt);
void toku_block_translation_note_end_checkpoint(BLOCK_TABLE bt, int fd);
void toku_block_translation_note_failed_checkpoint(BLOCK_TABLE bt);
void toku_block_translation_note_skipped_checkpoint(BLOCK_TABLE bt);
void toku_maybe_truncate_file_on_open(BLOCK_TABLE bt, int fd);
......
......@@ -3113,8 +3113,7 @@ ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only
if (is_create) {
r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft, &was_already_open);
if (r==TOKUDB_DICTIONARY_NO_HEADER) {
r = toku_create_new_ft(&ft, &ft_h->options, cf, txn);
if (r) { goto exit; }
toku_ft_create(&ft, &ft_h->options, cf, txn);
}
else if (r!=0) {
goto exit;
......
......@@ -168,7 +168,6 @@ static int
ft_checkpoint (CACHEFILE cf, int fd, void *header_v) {
FT ft = (FT) header_v;
FT_HEADER ch = ft->checkpoint_header;
int r = 0;
//printf("%s:%d allocated_limit=%lu writing queue to %lu\n", __FILE__, __LINE__,
// block_allocator_allocated_limit(h->block_allocator), h->unused_blocks.b*h->nodesize);
assert(ch);
......@@ -176,8 +175,7 @@ ft_checkpoint (CACHEFILE cf, int fd, void *header_v) {
if (ch->dirty) { // this is only place this bit is tested (in checkpoint_header)
TOKULOGGER logger = toku_cachefile_logger(cf);
if (logger) {
r = toku_logger_fsync_if_lsn_not_fsynced(logger, ch->checkpoint_lsn);
if (r!=0) goto handle_error;
toku_logger_fsync_if_lsn_not_fsynced(logger, ch->checkpoint_lsn);
}
uint64_t now = (uint64_t) time(NULL);
ft->h->time_of_last_modification = now;
......@@ -197,12 +195,8 @@ ft_checkpoint (CACHEFILE cf, int fd, void *header_v) {
else {
toku_block_translation_note_skipped_checkpoint(ft->blocktable);
}
if (0) {
handle_error:
toku_block_translation_note_failed_checkpoint(ft->blocktable);
}
return r;
// TODO can't fail
return 0;
}
// maps to cf->end_checkpoint_userdata
......@@ -228,7 +222,6 @@ ft_close(CACHEFILE cachefile, int fd, void *header_v, bool oplsn_valid, LSN opls
// We already have exclusive access to this field already, so skip the locking.
// This should already never fail.
invariant(!toku_ft_needed_unlocked(ft));
int r = 0;
assert(ft->cf == cachefile);
TOKULOGGER logger = toku_cachefile_logger(cachefile);
LSN lsn = ZERO_LSN;
......@@ -237,9 +230,10 @@ ft_close(CACHEFILE cachefile, int fd, void *header_v, bool oplsn_valid, LSN opls
//Use recovery-specified lsn
lsn = oplsn;
//Recovery cannot reduce lsn of a header.
if (lsn.lsn < ft->h->checkpoint_lsn.lsn)
if (lsn.lsn < ft->h->checkpoint_lsn.lsn) {
lsn = ft->h->checkpoint_lsn;
}
}
else {
//Get LSN from logger
lsn = ZERO_LSN; // if there is no logger, we use zero for the lsn
......@@ -256,12 +250,15 @@ ft_close(CACHEFILE cachefile, int fd, void *header_v, bool oplsn_valid, LSN opls
assert(logger->rollback_cachefile != cachefile);
}
ft_begin_checkpoint(lsn, header_v);
r = ft_checkpoint(cachefile, fd, ft);
// TODO: can't fail
int r = ft_checkpoint(cachefile, fd, ft);
invariant(r == 0);
ft_end_checkpoint(cachefile, fd, header_v);
assert(!ft->h->dirty); // dirty bit should be cleared by begin_checkpoint and never set again (because we're closing the dictionary)
}
toku_ft_free(ft);
return r;
// TODO: can't fail
return 0;
}
// maps to cf->note_pin_by_checkpoint
......@@ -277,10 +274,8 @@ static void ft_note_pin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v)
toku_ft_release_reflock(ft);
}
static void
unpin_by_checkpoint_callback(FT ft, void *extra)
// Requires: the reflock is held.
{
static void unpin_by_checkpoint_callback(FT ft, void *extra) {
invariant(extra == NULL);
invariant(ft->pinned_by_checkpoint);
ft->pinned_by_checkpoint = false;
......@@ -303,9 +298,8 @@ void toku_node_save_ct_pair(void *value_data, PAIR p) {
node->ct_pair = p;
}
// TODO: can't fail
static int setup_initial_ft_root_node (FT ft, BLOCKNUM blocknum) {
FTNODE XMALLOC(node);
static void setup_initial_ft_root_node(FT ft, BLOCKNUM blocknum) {
FTNODE XCALLOC(node);
toku_initialize_empty_ftnode(node, blocknum, 0, 1, ft->h->layout_version, ft->h->flags);
BP_STATE(node,0) = PT_AVAIL;
......@@ -316,13 +310,12 @@ static int setup_initial_ft_root_node (FT ft, BLOCKNUM blocknum) {
get_write_callbacks_for_node(ft),
toku_node_save_ct_pair);
toku_unpin_ftnode(ft, node);
return 0;
}
static int
ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) {
static void ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) {
// fake, prevent unnecessary upgrade logic
ft->layout_version_read_from_disk = FT_LAYOUT_VERSION;
ft->checkpoint_header = NULL;
ft->layout_version_read_from_disk = FT_LAYOUT_VERSION; // fake, prevent unnecessary upgrade logic
toku_list_init(&ft->live_ft_handles);
......@@ -335,12 +328,7 @@ ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) {
ft->cf = cf;
ft->in_memory_stats = ZEROSTATS;
int r;
r = setup_initial_ft_root_node(ft, ft->h->root_blocknum);
if (r != 0) {
goto exit;
}
//printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, ft, 0);
setup_initial_ft_root_node(ft, ft->h->root_blocknum);
toku_cachefile_set_userdata(ft->cf,
ft,
ft_log_fassociate_during_checkpoint,
......@@ -353,14 +341,11 @@ ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) {
ft_note_unpin_by_checkpoint);
toku_block_verify_no_free_blocknums(ft->blocktable);
r = 0;
exit:
return r;
}
static FT_HEADER
ft_header_new(FT_OPTIONS options, BLOCKNUM root_blocknum, TXNID root_xid_that_created)
ft_header_create(FT_OPTIONS options, BLOCKNUM root_blocknum, TXNID root_xid_that_created)
{
uint64_t now = (uint64_t) time(NULL);
struct ft_header h = {
......@@ -393,40 +378,24 @@ ft_header_new(FT_OPTIONS options, BLOCKNUM root_blocknum, TXNID root_xid_that_cr
}
// allocate and initialize a fractal tree.
// t->ft->cf is not set to anything. TODO(leif): I don't think that's true
int
toku_create_new_ft(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) {
int r;
void toku_ft_create(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) {
invariant(ftp);
FT XCALLOC(ft);
memset(&ft->descriptor, 0, sizeof(ft->descriptor));
memset(&ft->cmp_descriptor, 0, sizeof(ft->cmp_descriptor));
ft->h = ft_header_new(options, make_blocknum(0), (txn ? txn->ancestor_txnid64 : TXNID_NONE));
ft->h = ft_header_create(options, make_blocknum(0), (txn ? txn->ancestor_txnid64 : TXNID_NONE));
toku_ft_init_reflock(ft);
// Assign blocknum for root block, also dirty the header
toku_blocktable_create_new(&ft->blocktable);
//Assign blocknum for root block, also dirty the header
toku_allocate_blocknum(ft->blocktable, &ft->h->root_blocknum, ft);
r = ft_init(ft, options, cf);
if (r != 0) {
goto exit;
}
ft_init(ft, options, cf);
*ftp = ft;
r = 0;
exit:
if (r != 0) {
if (ft) {
toku_free(ft);
ft = NULL;
}
return r;
}
return r;
}
// TODO: (Zardosht) get rid of brt parameter
......@@ -584,7 +553,7 @@ toku_ft_init(FT ft,
.compare_fun = NULL,
.update_fun = NULL
};
ft->h = ft_header_new(&options, root_blocknum_on_disk, root_xid_that_created);
ft->h = ft_header_create(&options, root_blocknum_on_disk, root_xid_that_created);
ft->h->checkpoint_count = 1;
ft->h->checkpoint_lsn = checkpoint_lsn;
}
......
......@@ -28,7 +28,7 @@ void toku_ft_destroy_reflock(FT ft);
void toku_ft_grab_reflock(FT ft);
void toku_ft_release_reflock(FT ft);
int toku_create_new_ft(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn);
void toku_ft_create(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn);
void toku_ft_free (FT h);
int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_acceptable_lsn, FT *header, bool* was_open);
......
......@@ -462,29 +462,24 @@ toku_logger_make_space_in_inbuf (TOKULOGGER logger, int n_bytes_needed)
release_output(logger, fsynced_lsn);
}
int toku_logger_fsync (TOKULOGGER logger)
void toku_logger_fsync (TOKULOGGER logger)
// Effect: This is the exported fsync used by ydb.c for env_log_flush. Group commit doesn't have to work.
// Entry: Holds no locks
// Exit: Holds no locks
// Implementation note: Acquire the output condition lock, then the output permission, then release the output condition lock, then get the input lock.
// Then release everything.
// TODO: can't fail
{
ml_lock(&logger->input_lock);
logger->input_lock_ctr++;
toku_logger_maybe_fsync(logger, logger->inbuf.max_lsn_in_buf, true);
return 0;
}
// TODO: can't fail
int
toku_logger_fsync_if_lsn_not_fsynced (TOKULOGGER logger, LSN lsn) {
void toku_logger_fsync_if_lsn_not_fsynced (TOKULOGGER logger, LSN lsn) {
if (logger->write_log_files) {
ml_lock(&logger->input_lock);
logger->input_lock_ctr++;
toku_logger_maybe_fsync(logger, lsn, true);
}
return 0;
}
int toku_logger_is_open(TOKULOGGER logger) {
......
......@@ -29,8 +29,8 @@ int toku_logger_open_rollback(TOKULOGGER logger, CACHETABLE cachetable, bool cre
int toku_logger_close_rollback(TOKULOGGER logger);
bool toku_logger_rollback_is_open (TOKULOGGER); // return true iff the rollback is open.
int toku_logger_fsync (TOKULOGGER logger);
int toku_logger_fsync_if_lsn_not_fsynced(TOKULOGGER logger, LSN lsn);
void toku_logger_fsync (TOKULOGGER logger);
void toku_logger_fsync_if_lsn_not_fsynced(TOKULOGGER logger, LSN lsn);
int toku_logger_is_open(TOKULOGGER logger);
void toku_logger_set_cachetable (TOKULOGGER logger, CACHETABLE ct);
int toku_logger_set_lg_max(TOKULOGGER logger, uint32_t lg_max);
......
......@@ -56,8 +56,7 @@ toku_commit_fdelete (FILENUM filenum,
// here as part of a transactoin that may abort if we do not fsync the log.
// So, we fsync the log here.
if (txn->logger) {
r = toku_logger_fsync_if_lsn_not_fsynced(txn->logger, txn->do_fsync_lsn);
assert_zero(r);
toku_logger_fsync_if_lsn_not_fsynced(txn->logger, txn->do_fsync_lsn);
}
// Mark the cachefile as unlink on close. There are two ways for close
......@@ -404,10 +403,11 @@ toku_commit_load (FILENUM old_filenum,
// here as part of a transactoin that may abort if we do not fsync the log.
// So, we fsync the log here.
if (txn->logger) {
r = toku_logger_fsync_if_lsn_not_fsynced(txn->logger, txn->do_fsync_lsn);
lazy_assert(r == 0);
toku_logger_fsync_if_lsn_not_fsynced(txn->logger, txn->do_fsync_lsn);
}
// TODO: Zardosht
// Explain why this condition is valid, because I forget.
if (!toku_cachefile_is_unlink_on_close(old_cf)) {
toku_cachefile_unlink_on_close(old_cf);
}
......
......@@ -45,7 +45,7 @@ test_main (int argc __attribute__((__unused__)),
logger->lsn.lsn++;
logger->inbuf.max_lsn_in_buf = logger->lsn;
ml_unlock(&logger->input_lock);
r = toku_logger_fsync(logger); assert(r == 0);
toku_logger_fsync(logger);
}
r = toku_logger_close(&logger);
assert(r == 0);
......
......@@ -360,12 +360,10 @@ int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist prepl
);
}
int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, bool do_fsync) {
int r = 0;
void toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, bool do_fsync) {
if (logger && do_fsync) {
r = toku_logger_fsync_if_lsn_not_fsynced(logger, do_fsync_lsn);
toku_logger_fsync_if_lsn_not_fsynced(logger, do_fsync_lsn);
}
return r;
}
void toku_txn_get_fsync_info(TOKUTXN ttxn, bool* do_fsync, LSN* do_fsync_lsn) {
......
......@@ -59,7 +59,7 @@ void toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xid);
void toku_txn_get_prepared_xa_xid (TOKUTXN, TOKU_XA_XID *);
// Effect: Fill in the XID information for a transaction. The caller allocates the XID and the function fills in values.
int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, bool do_fsync);
void toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, bool do_fsync);
void toku_txn_get_fsync_info(TOKUTXN ttxn, bool* do_fsync, LSN* do_fsync_lsn);
......
......@@ -1000,7 +1000,9 @@ env_close(DB_ENV * env, uint32_t flags) {
// if panicked, or if any open transactions, or any open dbs, then do nothing.
if (toku_env_is_panicked(env)) goto panic_and_quit_early;
if (toku_env_is_panicked(env)) {
goto panic_and_quit_early;
}
if (env->i->open_txns != 0) {
err_msg = "Cannot close environment due to open transactions\n";
r = toku_ydb_do_error(env, EINVAL, "%s", err_msg);
......@@ -1013,7 +1015,6 @@ env_close(DB_ENV * env, uint32_t flags) {
goto panic_and_quit_early;
}
}
{
if (env->i->persistent_environment) {
r = toku_db_close(env->i->persistent_environment);
if (r) {
......@@ -1030,7 +1031,6 @@ env_close(DB_ENV * env, uint32_t flags) {
goto panic_and_quit_early;
}
}
}
if (env->i->cachetable) {
toku_cachetable_minicron_shutdown(env->i->cachetable);
if (env->i->logger) {
......@@ -1059,7 +1059,7 @@ env_close(DB_ENV * env, uint32_t flags) {
toku_cachetable_close(&env->i->cachetable);
}
if (env->i->logger) {
r=toku_logger_close(&env->i->logger);
r = toku_logger_close(&env->i->logger);
if (r) {
err_msg = "Cannot close environment (logger close error)\n";
env->i->logger = NULL;
......@@ -1069,10 +1069,11 @@ env_close(DB_ENV * env, uint32_t flags) {
}
// Even if nothing else went wrong, but we were panicked, then raise an error.
// But if something else went wrong then raise that error (above)
if (toku_env_is_panicked(env))
if (toku_env_is_panicked(env)) {
goto panic_and_quit_early;
else
assert(env->i->panic_string==0);
} else {
assert(env->i->panic_string == 0);
}
env_fs_destroy(env);
if (env->i->ltm) {
......@@ -1093,17 +1094,17 @@ env_close(DB_ENV * env, uint32_t flags) {
toku_free(env->i->real_tmp_dir);
if (env->i->open_dbs)
toku_omt_destroy(&env->i->open_dbs);
toku_mutex_destroy(&env->i->open_dbs_lock);
if (env->i->dir)
toku_free(env->i->dir);
//Immediately before freeing internal environment unlock the directories
toku_mutex_destroy(&env->i->open_dbs_lock);
// Immediately before freeing internal environment unlock the directories
unlock_single_process(env);
toku_free(env->i);
env->i = NULL;
toku_free(env);
env = NULL;
if (flags!=0)
if (flags != 0) {
r = EINVAL;
}
return r;
panic_and_quit_early:
......@@ -1128,19 +1129,23 @@ env_log_archive(DB_ENV * env, char **list[], uint32_t flags) {
static int
env_log_flush(DB_ENV * env, const DB_LSN * lsn __attribute__((__unused__))) {
HANDLE_PANICKED_ENV(env);
// We just flush everything. MySQL uses lsn==0 which means flush everything. For anyone else using the log, it is correct to flush too much, so we are OK.
return toku_logger_fsync(env->i->logger);
// We just flush everything. MySQL uses lsn == 0 which means flush everything.
// For anyone else using the log, it is correct to flush too much, so we are OK.
toku_logger_fsync(env->i->logger);
return 0;
}
static int
env_set_cachesize(DB_ENV * env, uint32_t gbytes, uint32_t bytes, int ncache) {
HANDLE_PANICKED_ENV(env);
if (ncache != 1)
if (ncache != 1) {
return EINVAL;
}
uint64_t cs64 = ((uint64_t) gbytes << 30) + bytes;
unsigned long cs = cs64;
if (cs64 > cs)
if (cs64 > cs) {
return EINVAL;
}
env->i->cachetable_size = cs;
return 0;
}
......@@ -1163,7 +1168,7 @@ locked_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *db
toku_multi_operation_client_unlock();
if (using_txns) {
if (r == 0) { // commit
if (r == 0) {
ret = locked_txn_commit(child_txn, 0);
lazy_assert_zero(ret);
} else {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment