Commit e667ae91 authored by Yoni Fogel's avatar Yoni Fogel

closes[t:2483] Fix recovery bug in loader.

checkpoint_lsn written to headers made by loader are now the fsynced 'load' log entry's lsn instead of MAX_UINT64

git-svn-id: file:///svn/toku/tokudb@19136 c7de825b-a66e-492c-adef-691d508d4ae1
parent 90125eb7
......@@ -2658,7 +2658,7 @@ txn_note_doing_work(TOKUTXN txn) {
int
toku_brt_load_recovery(TOKUTXN txn, char const * old_iname, char const * new_iname, int do_fsync, int do_log) {
toku_brt_load_recovery(TOKUTXN txn, char const * old_iname, char const * new_iname, int do_fsync, int do_log, LSN *load_lsn) {
int r = 0;
assert(txn);
toku_txn_force_fsync_on_commit(txn); //If the txn commits, the commit MUST be in the log
......@@ -2673,18 +2673,18 @@ toku_brt_load_recovery(TOKUTXN txn, char const * old_iname, char const * new_ina
r = toku_logger_save_rollback_load(txn, old_iname_bs, new_iname_bs);
if (r==0 && do_log && logger) {
TXNID xid = toku_txn_get_txnid(txn);
r = toku_log_load(logger, (LSN*)NULL, do_fsync, xid, old_iname_bs, new_iname_bs);
r = toku_log_load(logger, load_lsn, do_fsync, xid, old_iname_bs, new_iname_bs);
}
return r;
}
int
toku_brt_load(BRT brt, TOKUTXN txn, char const * new_iname, int do_fsync) {
toku_brt_load(BRT brt, TOKUTXN txn, char const * new_iname, int do_fsync, LSN *load_lsn) {
int r = 0;
char const * old_iname = toku_cachefile_fname_in_env(brt->cf);
int do_log = 1;
r = toku_brt_load_recovery(txn, old_iname, new_iname, do_fsync, do_log);
r = toku_brt_load_recovery(txn, old_iname, new_iname, do_fsync, do_log, load_lsn);
return r;
}
......
......@@ -66,8 +66,8 @@ int toku_brt_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn);
// Effect: Insert a key and data pair into a brt if the oplsn is newer than the brt lsn. This function is called during recovery.
// Returns 0 if successful
int toku_brt_maybe_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, int do_logging, enum brt_msg_type type);
int toku_brt_load_recovery(TOKUTXN txn, char const * old_iname, char const * new_iname, int do_fsync, int do_log);
int toku_brt_load(BRT brt, TOKUTXN txn, char const * new_iname, int do_fsync);
int toku_brt_load_recovery(TOKUTXN txn, char const * old_iname, char const * new_iname, int do_fsync, int do_log, LSN *load_lsn);
int toku_brt_load(BRT brt, TOKUTXN txn, char const * new_iname, int do_fsync, LSN *get_lsn);
int toku_brt_log_put_multiple (TOKUTXN txn, BRT src_brt, BRT *brts, int num_brts, const DBT *key, const DBT *val);
int toku_brt_log_del_multiple (TOKUTXN txn, BRT src_brt, BRT *brts, int num_brts, const DBT *key, const DBT *val);
......
......@@ -55,6 +55,7 @@ struct brtloader_s {
void *poll_extra;
int user_said_stop; // 0 if the poll_function always returned zero. If it ever returns nonzero, then store that value here.
LSN load_lsn; //LSN of the fsynced 'load' log entry. Write this LSN (as checkpoint_lsn) in brt headers made by this loader.
};
/* These data structures are used for manipulating a collection of rows in main memory. */
......
......@@ -172,7 +172,8 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
const struct descriptor *descriptors[/*N*/],
const char *new_fnames_in_env[/*N*/],
brt_compare_func bt_compare_functions[/*N*/],
const char *temp_file_template)
const char *temp_file_template,
LSN load_lsn)
/* Effect: called by DB_ENV->create_loader to create a brt loader.
* Arguments:
* blp Return the brt loader here.
......@@ -218,6 +219,7 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
bl->n_rows = 0;
bl->progress = 0;
bl->load_lsn = load_lsn;
*blp = bl;
return 0;
......@@ -1275,10 +1277,10 @@ static int write_translation_table (struct dbout *out, long long *off_of_transla
}
static void write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk, BLOCKNUM root_blocknum_on_disk) {
static void write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk, BLOCKNUM root_blocknum_on_disk, LSN load_lsn) {
struct brt_header h = {.layout_version = BRT_LAYOUT_VERSION,
.checkpoint_count = 1,
.checkpoint_lsn = (LSN){0xFFFFFFFFFFFFFFFF}, // (max_uint_long means that this doesn't need any kind of recovery
.checkpoint_lsn = load_lsn, // Nothing is logged after the load.
.nodesize = nodesize,
.root = root_blocknum_on_disk,
.flags = 0,
......@@ -1658,7 +1660,7 @@ int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct d
long long off_of_translation;
int r = write_translation_table(&out, &off_of_translation);
assert(r==0);
write_header(&out, off_of_translation, (out.n_translations+1)*16+4, root_block);
write_header(&out, off_of_translation, (out.n_translations+1)*16+4, root_block, bl->load_lsn);
if (key.data) toku_free(key.data);
if (val.data) toku_free(val.data);
if (out.translation) toku_free(out.translation);
......
......@@ -14,7 +14,8 @@ int toku_brt_loader_open (BRTLOADER *bl,
const struct descriptor *descriptors[/*N*/],
const char * new_fnames_in_env[/*N*/],
brt_compare_func bt_compare_functions[/*N*/],
const char *temp_file_template);
const char *temp_file_template,
LSN load_lsn);
int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val);
int toku_brt_loader_close (BRTLOADER bl,
void (*error_callback)(DB *, int which_db, int err, DBT *key, DBT *val, void *extra), void *error_callback_extra,
......
......@@ -1000,7 +1000,7 @@ static int toku_recover_load(struct logtype_load *UU(l), RECOVER_ENV UU(renv)) {
char *old_iname = fixup_fname(&l->old_iname);
char *new_iname = fixup_fname(&l->new_iname);
r = toku_brt_load_recovery(txn, old_iname, new_iname, 0, 0);
r = toku_brt_load_recovery(txn, old_iname, new_iname, 0, 0, (LSN*)NULL);
assert(r==0);
toku_free(old_iname);
......
......@@ -197,7 +197,8 @@ int toku_loader_create_loader(DB_ENV *env,
}
loader->i->ekeys = NULL;
loader->i->evals = NULL;
r = locked_ydb_load_inames (env, txn, N, dbs, new_inames_in_env);
LSN load_lsn;
r = locked_ydb_load_inames (env, txn, N, dbs, new_inames_in_env, &load_lsn);
if ( r!=0 ) {
toku_free(descriptors);
free_loader(loader);
......@@ -212,7 +213,8 @@ int toku_loader_create_loader(DB_ENV *env,
descriptors,
(const char **)new_inames_in_env,
compare_functions,
loader->i->temp_file_template);
loader->i->temp_file_template,
load_lsn);
loader->i->inames_in_env = new_inames_in_env;
toku_free(descriptors);
}
......
......@@ -5538,7 +5538,7 @@ env_get_iname(DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) {
// It is the caller's responsibility to free them.
// Return 0 on success (could fail if write lock not available).
int
ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_inames_in_env[N]) {
ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_inames_in_env[N], LSN *load_lsn) {
int rval;
int i;
......@@ -5575,11 +5575,15 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname
if (!rval && using_txns) {
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
int do_fsync = 0;
LSN *get_lsn = NULL;
for (i = 0; i < N; i++) {
BRT brt = dbs[i]->i->brt;
//Fsync is necessary for the last one only.
if (i==N-1) do_fsync = 1; //We only need a single fsync of logs.
rval = toku_brt_load(brt, ttxn, new_inames_in_env[i], do_fsync);
if (i==N-1) {
do_fsync = 1; //We only need a single fsync of logs.
get_lsn = load_lsn; //Set pointer to capture the last lsn.
}
rval = toku_brt_load(brt, ttxn, new_inames_in_env[i], do_fsync, get_lsn);
if (rval) break;
}
}
......@@ -5606,9 +5610,9 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname
}
int
locked_ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_inames_in_env[N]) {
locked_ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_inames_in_env[N], LSN *load_lsn) {
toku_ydb_lock();
int r = ydb_load_inames(env, txn, N, dbs, new_inames_in_env);
int r = ydb_load_inames(env, txn, N, dbs, new_inames_in_env, load_lsn);
toku_ydb_unlock();
return r;
}
......
......@@ -23,14 +23,16 @@ int ydb_load_inames(DB_ENV * env,
DB_TXN * txn,
int N,
DB * dbs[/*N*/],
/*out*/ char * new_inames_in_env[N]);
/*out*/ char * new_inames_in_env[N],
LSN *load_lsn);
// Wrapper to ydb_load_inames if you are not holding the ydb lock.
int locked_ydb_load_inames(DB_ENV * env,
DB_TXN * txn,
int N,
DB * dbs[/*N*/],
/*out*/ char * new_inames_in_env[N]);
/*out*/ char * new_inames_in_env[N],
LSN *load_lsn);
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment