Commit 26ba2551 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

merge the recovery changes found by barry and rich during a code review to main closes[t:2231]

git-svn-id: file:///svn/toku/tokudb@16298 c7de825b-a66e-492c-adef-691d508d4ae1
parent a58906fe
......@@ -20,45 +20,53 @@ int tokudb_recovery_trace = 0; // turn on recovery tracing, d
#define TOKUDB_RECOVERY_PROGRESS_TIME 15
struct scan_state {
enum { SS_INIT = 1, SS_BACKWARD_SAW_CKPT_END, SS_BACKWARD_SAW_CKPT, SS_FORWARD_SAW_CKPT } ss;
enum {
BACKWARD_NEWER_CHECKPOINT_END = 1,
BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END,
BACKWARD_OLDER_CHECKPOINT_BEGIN,
FORWARD_OLDER_CHECKPOINT_BEGIN,
FORWARD_BETWEEN_CHECKPOINT_BEGIN_END,
FORWARD_NEWER_CHECKPOINT_END,
} ss;
LSN checkpoint_lsn;
int n_live_txns;
TXNID min_live_txn;
TXNID oldest_txnid;
};
static void scan_state_init(struct scan_state *ss) {
ss->ss = SS_INIT; ss->checkpoint_lsn = ZERO_LSN; ss->n_live_txns = 0; ss->min_live_txn = 0;
ss->ss = BACKWARD_NEWER_CHECKPOINT_END; ss->checkpoint_lsn = ZERO_LSN; ss->n_live_txns = 0; ss->oldest_txnid = 0;
}
static const char *scan_states[] = {
"?", "init", "bw_checkpoint_end", "bw_checkpoint_begin", "fw_checkpoint",
static const char *scan_state_strings[] = {
"?", "bw_newer", "bw_between", "bw_older", "fw_older", "fw_between", "fw_newer",
};
static const char *scan_state(struct scan_state *ss) {
return scan_states[ss->ss];
static const char *scan_state_string(struct scan_state *ss) {
assert(BACKWARD_NEWER_CHECKPOINT_END <= ss->ss && ss->ss <= FORWARD_NEWER_CHECKPOINT_END);
return scan_state_strings[ss->ss];
}
// File map tuple
struct file_map_tuple {
FILENUM filenum;
BRT brt;
char *fname;
char *iname;
};
static void file_map_tuple_init(struct file_map_tuple *tuple, FILENUM filenum, BRT brt, char *fname) {
static void file_map_tuple_init(struct file_map_tuple *tuple, FILENUM filenum, BRT brt, char *iname) {
tuple->filenum = filenum;
tuple->brt = brt;
tuple->fname = fname;
tuple->iname = iname;
}
static void file_map_tuple_destroy(struct file_map_tuple *tuple) {
if (tuple->fname) {
toku_free(tuple->fname);
tuple->fname = NULL;
if (tuple->iname) {
toku_free(tuple->iname);
tuple->iname = NULL;
}
}
// Map filenum to brt, fname
// Map filenum to brt
struct file_map {
OMT filenums;
};
......@@ -121,10 +129,10 @@ static int file_map_h(OMTVALUE omtv, void *v) {
return 0;
}
static int file_map_insert (struct file_map *fmap, FILENUM fnum, BRT brt, char *fname) {
static int file_map_insert (struct file_map *fmap, FILENUM fnum, BRT brt, char *iname) {
struct file_map_tuple *tuple = toku_malloc(sizeof (struct file_map_tuple));
assert(tuple);
file_map_tuple_init(tuple, fnum, brt, fname);
file_map_tuple_init(tuple, fnum, brt, iname);
int r = toku_omt_insert(fmap->filenums, tuple, file_map_h, &fnum, NULL);
return r;
}
......@@ -199,7 +207,7 @@ static void recover_env_cleanup (RECOVER_ENV renv, BOOL recovery_succeeded) {
}
static const char *recover_state(RECOVER_ENV renv) {
return scan_state(&renv->ss);
return scan_state_string(&renv->ss);
}
// Null function supplied to transaction commit and abort
......@@ -207,87 +215,6 @@ static void recover_yield(voidfp UU(f), void *UU(extra)) {
// nothing
}
static int toku_recover_commit (struct logtype_commit *l, RECOVER_ENV renv) {
int r;
// find the transaction by transaction id
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->txnid, &txn);
assert(r == 0);
if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); // cannot happen after checkpoint begin
return 0;
}
// commit the transaction
r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, l->lsn);
assert(r == 0);
// close the transaction
toku_txn_close_txn(txn);
return 0;
}
static int toku_recover_backward_commit (struct logtype_commit *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) {
int r;
// find the transaction by transaction id
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->txnid, &txn);
assert(r == 0);
if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); // cannot happen after checkpoint begin
return 0;
}
// abort the transaction
r = toku_txn_abort_with_lsn(txn, recover_yield, NULL, l->lsn);
assert(r == 0);
// close the transaction
toku_txn_close_txn(txn);
return 0;
}
static int toku_recover_backward_xabort (struct logtype_xabort *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static void create_dir_from_file (const char *fname) {
char *tmp=toku_strdup(fname);
char ch;
for (int i=0; (ch=fname[i]); i++) {
//
// TODO: this may fail in windows, double check the absolute path names
// and '/' as the directory delimiter or something
//
if (ch=='/') {
if (i>0) {
tmp[i]=0;
mode_t oldu = umask(0);
int r = toku_os_mkdir(tmp, S_IRWXU);
if (r!=0 && errno!=EEXIST) {
fprintf(stderr, "Tokudb recovery %s:%d error: %s\n", __FUNCTION__, __LINE__, strerror(errno));
}
assert (r == 0 || (errno==EEXIST));
umask(oldu);
tmp[i]=ch;
}
}
}
toku_free(tmp);
}
static int
abort_on_upgrade(DB* UU(pdb),
u_int32_t UU(old_version), const DBT *UU(old_descriptor), const DBT *UU(old_key), const DBT *UU(old_val),
......@@ -296,30 +223,13 @@ abort_on_upgrade(DB* UU(pdb),
return ENOSYS;
}
// Open the file if it is not already open. If it is already open, then do nothing.
static int internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags, int mode, char *fixedfname, FILENUM filenum, u_int32_t treeflags, u_int32_t descriptor_version, BYTESTRING* descriptor, int recovery_force_fcreate, TOKUTXN txn) {
static int internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, BOOL must_create, int mode, BYTESTRING *bs_iname, FILENUM filenum, u_int32_t treeflags,
u_int32_t descriptor_version, BYTESTRING* descriptor, int recovery_force_fcreate, TOKUTXN txn) {
int r;
char *iname = fixup_fname(bs_iname);
// already open
struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, filenum, &tuple);
if (r == 0) {
assert(strcmp(tuple->fname, fixedfname) == 0);
toku_free(fixedfname);
return 0;
}
if (flags & O_TRUNC) {
// maybe unlink
r = unlink(fixedfname);
if (r != 0 && errno != ENOENT) {
fprintf(stderr, "Tokudb recovery %s:%d unlink %d\n", __FUNCTION__, __LINE__, errno);
return r;
}
}
BRT brt=0;
BRT brt = NULL;
r = toku_brt_create(&brt);
assert(r == 0);
......@@ -345,38 +255,37 @@ static int internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags,
r = toku_brt_set_descriptor(brt, descriptor_version, &descriptor_dbt, abort_on_upgrade);
if (r!=0) goto close_brt;
}
int is_create = (flags & O_CREAT) != 0;
int only_create = is_create && (flags & O_TRUNC) != 0;
r = toku_brt_open_recovery(brt, fixedfname, fixedfname, is_create, only_create, renv->ct, txn, fake_db, recovery_force_fcreate);
r = toku_brt_open_recovery(brt, iname, iname, must_create, must_create, renv->ct, txn, fake_db, recovery_force_fcreate);
if (r != 0) {
close_brt:;
close_brt:
;
//Note: If brt_open fails, then close_brt will NOT write a header to disk.
//No need to provide lsn
int rr = toku_close_brt(brt, NULL); assert(rr == 0);
toku_free(fixedfname);
toku_free(iname);
toku_free(fake_db); //Free memory allocated for the fake db.
if (r==ENOENT) //Not an error to simply be missing.
r = 0;
return r;
}
file_map_insert(&renv->fmap, filenum, brt, fixedfname);
file_map_insert(&renv->fmap, filenum, brt, iname);
return 0;
}
static int toku_recover_fopen (struct logtype_fopen *l, RECOVER_ENV renv) {
char *fixedfname = fixup_fname(&l->iname);
return internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0, NULL);
}
static int
maybe_do_fclose_during_recover_backward(RECOVER_ENV renv, FILENUM filenum) {
maybe_do_fclose_during_recover_backward(RECOVER_ENV renv, FILENUM filenum, BYTESTRING *bs_iname) {
// close the tree
struct file_map_tuple *tuple = NULL;
int r = file_map_find(&renv->fmap, filenum, &tuple);
if (r == 0) {
char *iname = fixup_fname(bs_iname);
assert(strcmp(tuple->iname, iname) == 0);
toku_free(iname);
struct scan_state *ss = &renv->ss;
assert(ss->ss == SS_BACKWARD_SAW_CKPT);
assert(ss->ss == BACKWARD_OLDER_CHECKPOINT_BEGIN);
//Must keep existing lsn.
//The only way this should be dirty, is if its doing a file-format upgrade.
//If not dirty, header will not be written.
......@@ -389,42 +298,93 @@ maybe_do_fclose_during_recover_backward(RECOVER_ENV renv, FILENUM filenum) {
return 0;
}
static int toku_recover_backward_fopen (struct logtype_fopen *l, RECOVER_ENV renv) {
int r = maybe_do_fclose_during_recover_backward(renv, l->filenum);
assert(r==0);
return 0;
}
// fcreate is like fopen except that the file must be created. Also creates the dir if needed.
// fcreate is like fopen except that the file must be created.
static int toku_recover_fcreate (struct logtype_fcreate *l, RECOVER_ENV renv) {
char *fixedfname = fixup_fname(&l->iname);
create_dir_from_file(fixedfname);
struct scan_state *ss = &renv->ss;
int flags;
int r;
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->txnid, &txn);
assert(r == 0);
if (ss->ss == SS_BACKWARD_SAW_CKPT) {
//Treat as fopen
flags = 0;
}
else {
// assert that filenum is closed
struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->filenum, &tuple);
assert(r != 0);
BOOL must_create;
if (ss->ss == FORWARD_OLDER_CHECKPOINT_BEGIN) {
must_create = FALSE; // do not create file if it does not exist
} else {
assert(txn != NULL); //Straddle txns cannot happen after checkpoint
assert(ss->ss == SS_FORWARD_SAW_CKPT);
flags = O_CREAT|O_TRUNC;
assert(ss->ss == FORWARD_BETWEEN_CHECKPOINT_BEGIN_END || ss->ss == FORWARD_NEWER_CHECKPOINT_END);
must_create = TRUE;
// maybe unlink
char *iname = fixup_fname(&l->iname);
r = unlink(iname);
if (r != 0 && errno != ENOENT) {
fprintf(stderr, "Tokudb recovery %s:%d unlink %s %d\n", __FUNCTION__, __LINE__, iname, errno);
toku_free(iname);
return r;
}
toku_free(iname);
}
return internal_toku_recover_fopen_or_fcreate(renv, flags, l->mode, fixedfname, l->filenum, l->treeflags, l->descriptor_version, &l->descriptor, 1, txn);
r = internal_toku_recover_fopen_or_fcreate(renv, must_create, l->mode, &l->iname, l->filenum, l->treeflags, l->descriptor_version, &l->descriptor, 1, txn);
return r;
}
static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER_ENV UU(renv)) {
int r = maybe_do_fclose_during_recover_backward(renv, l->filenum);
static int toku_recover_backward_fcreate (struct logtype_fcreate *l, RECOVER_ENV renv) {
int r = maybe_do_fclose_during_recover_backward(renv, l->filenum, &l->iname);
assert(r==0);
return 0;
}
static int toku_recover_fopen (struct logtype_fopen *l, RECOVER_ENV renv) {
// check if the file is already open by backward scan of fassociate
struct file_map_tuple *tuple = NULL;
char *iname = fixup_fname(&l->iname);
int r = file_map_find(&renv->fmap, l->filenum, &tuple);
if (r == 0) {
// file is already opened by fassociate
assert(strcmp(tuple->iname, iname) == 0);
} else {
// file is not open, open it
r = internal_toku_recover_fopen_or_fcreate(renv, FALSE, 0, &l->iname, l->filenum, l->treeflags, 0, NULL, 0, NULL);
}
toku_free(iname);
return r;
}
static int toku_recover_backward_fopen (struct logtype_fopen *l, RECOVER_ENV renv) {
int r = maybe_do_fclose_during_recover_backward(renv, l->filenum, &l->iname);
assert(r==0);
return 0;
}
static int toku_recover_fclose (struct logtype_fclose *l, RECOVER_ENV renv) {
struct file_map_tuple *tuple = NULL;
int r = file_map_find(&renv->fmap, l->filenum, &tuple);
if (r == 0) {
char *iname = fixup_fname(&l->iname);
assert(strcmp(tuple->iname, iname) == 0);
toku_free(iname);
DB *fake_db = tuple->brt->db; //Need to free the fake db that was malloced
r = toku_close_brt_lsn(tuple->brt, 0, TRUE, l->lsn);
assert(r == 0);
toku_free(fake_db); //Must free the DB after the brt is closed
file_map_remove(&renv->fmap, l->filenum);
}
return 0;
}
static int toku_recover_backward_fclose (struct logtype_fclose *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
// fdelete is a transactional file delete.
static int toku_recover_fdelete (struct logtype_fdelete *l, RECOVER_ENV renv) {
TOKUTXN txn = NULL;
......@@ -432,7 +392,7 @@ static int toku_recover_fdelete (struct logtype_fdelete *l, RECOVER_ENV renv) {
assert(r == 0);
if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN); //cannot happen after checkpoint begin
return 0;
}
char *fixediname = fixup_fname(&l->iname);
......@@ -442,6 +402,8 @@ static int toku_recover_fdelete (struct logtype_fdelete *l, RECOVER_ENV renv) {
if (r==-1 && errno==ENOENT)
goto cleanup;
}
// txn exists and file exists, so create fdelete rollback entry
DBT iname_dbt;
toku_fill_dbt(&iname_dbt, fixediname, strlen(fixediname)+1);
r = toku_brt_remove_on_commit(txn, &iname_dbt, &iname_dbt);
......@@ -456,7 +418,7 @@ static int toku_recover_backward_fdelete (struct logtype_fdelete *UU(l), RECOVER
return 0;
}
static int toku_recover_enq_insert (struct logtype_enq_insert *l, RECOVER_ENV renv) {
static int toku_recover_tablelock_on_empty_table(struct logtype_tablelock_on_empty_table *l, RECOVER_ENV renv) {
struct file_map_tuple *tuple = NULL;
int r = file_map_find(&renv->fmap, l->filenum, &tuple);
if (r!=0) {
......@@ -466,66 +428,67 @@ static int toku_recover_enq_insert (struct logtype_enq_insert *l, RECOVER_ENV re
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->xid, &txn);
assert(r == 0);
if (txn == NULL) {
if (txn != NULL) {
r = toku_brt_note_table_lock(tuple->brt, txn);
assert(r == 0);
} else {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
return 0;
assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN); //cannot happen after checkpoint begin
}
DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, l->key.data, l->key.len);
toku_fill_dbt(&valdbt, l->value.data, l->value.len);
r = toku_brt_maybe_insert(tuple->brt, &keydbt, &valdbt, txn, TRUE, l->lsn);
assert(r == 0);
return 0;
}
static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), RECOVER_ENV UU(renv)) {
static int toku_recover_backward_tablelock_on_empty_table(struct logtype_tablelock_on_empty_table *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static int toku_recover_tablelock_on_empty_table(struct logtype_tablelock_on_empty_table *l, RECOVER_ENV renv) {
static int toku_recover_enq_insert (struct logtype_enq_insert *l, RECOVER_ENV renv) {
int r;
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->xid, &txn);
assert(r == 0);
if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN); //cannot happen after checkpoint begin
return 0;
}
struct file_map_tuple *tuple = NULL;
int r = file_map_find(&renv->fmap, l->filenum, &tuple);
r = file_map_find(&renv->fmap, l->filenum, &tuple);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return 0;
}
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->xid, &txn);
DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, l->key.data, l->key.len);
toku_fill_dbt(&valdbt, l->value.data, l->value.len);
r = toku_brt_maybe_insert(tuple->brt, &keydbt, &valdbt, txn, TRUE, l->lsn);
assert(r == 0);
if (txn != NULL) {
r = toku_brt_note_table_lock(tuple->brt, txn);
assert(r == 0);
}
else {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
}
return 0;
}
static int toku_recover_backward_tablelock_on_empty_table(struct logtype_tablelock_on_empty_table *UU(l), RECOVER_ENV UU(renv)) {
static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static int toku_recover_enq_delete_both (struct logtype_enq_delete_both *l, RECOVER_ENV renv) {
struct file_map_tuple *tuple = NULL;
int r = file_map_find(&renv->fmap, l->filenum, &tuple);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return 0;
}
int r;
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->xid, &txn);
assert(r == 0);
if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN); //cannot happen after checkpoint begin
return 0;
}
struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->filenum, &tuple);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return 0;
}
DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, l->key.data, l->key.len);
toku_fill_dbt(&valdbt, l->value.data, l->value.len);
......@@ -541,20 +504,21 @@ static int toku_recover_backward_enq_delete_both (struct logtype_enq_delete_both
}
static int toku_recover_enq_delete_any (struct logtype_enq_delete_any *l, RECOVER_ENV renv) {
struct file_map_tuple *tuple = NULL;
int r = file_map_find(&renv->fmap, l->filenum, &tuple);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return 0;
}
int r;
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->xid, &txn);
assert(r == 0);
if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN); //cannot happen after checkpoint begin
return 0;
}
struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->filenum, &tuple);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return 0;
}
DBT keydbt;
toku_fill_dbt(&keydbt, l->key.data, l->key.len);
r = toku_brt_maybe_delete(tuple->brt, &keydbt, txn, TRUE, l->lsn);
......@@ -568,138 +532,171 @@ static int toku_recover_backward_enq_delete_any (struct logtype_enq_delete_any *
return 0;
}
static int toku_recover_fclose (struct logtype_fclose *l, RECOVER_ENV UU(renv)) {
struct file_map_tuple *tuple = NULL;
int r = file_map_find(&renv->fmap, l->filenum, &tuple);
if (r == 0) {
char *fixedfname = fixup_fname(&l->iname);
assert(strcmp(tuple->fname, fixedfname) == 0);
toku_free(fixedfname);
DB *fake_db = tuple->brt->db; //Need to free the fake db that was malloced
r = toku_close_brt_lsn(tuple->brt, 0, TRUE, l->lsn);
assert(r == 0);
toku_free(fake_db); //Must free the DB after the brt is closed
file_map_remove(&renv->fmap, l->filenum);
}
return 0;
}
static int toku_recover_backward_fclose (struct logtype_fclose *UU(l), RECOVER_ENV UU(renv)) {
// NO-OP
return 0;
}
static int toku_recover_begin_checkpoint (struct logtype_begin_checkpoint *UU(l), RECOVER_ENV UU(renv)) {
struct scan_state *ss = &renv->ss;
switch (ss->ss) {
case SS_BACKWARD_SAW_CKPT:
assert(l->lsn.lsn <= ss->checkpoint_lsn.lsn);
if (l->lsn.lsn == ss->checkpoint_lsn.lsn)
ss->ss = SS_FORWARD_SAW_CKPT;
return 0; // ignore it
case SS_FORWARD_SAW_CKPT:
assert(l->lsn.lsn > ss->checkpoint_lsn.lsn);
return 0; // ignore it
case SS_INIT:
static int toku_recover_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) {
switch (renv->ss.ss) {
case FORWARD_OLDER_CHECKPOINT_BEGIN:
assert(l->lsn.lsn <= renv->ss.checkpoint_lsn.lsn);
if (l->lsn.lsn == renv->ss.checkpoint_lsn.lsn)
renv->ss.ss = FORWARD_BETWEEN_CHECKPOINT_BEGIN_END;
return 0;
case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
assert(0);
return 0;
case FORWARD_NEWER_CHECKPOINT_END:
assert(l->lsn.lsn > renv->ss.checkpoint_lsn.lsn);
return 0; // ignore it (log only has a begin checkpoint)
default:
break;
}
fprintf(stderr, "Tokudb recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)ss->ss);
fprintf(stderr, "Tokudb recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)renv->ss.ss);
abort();
// nothing
return 0;
}
static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) {
struct scan_state *ss = &renv->ss;
switch (ss->ss) {
case SS_INIT:
return 0; // incomplete checkpoint
case SS_BACKWARD_SAW_CKPT_END:
assert(ss->checkpoint_lsn.lsn == l->lsn.lsn);
ss->ss = SS_BACKWARD_SAW_CKPT;
if (ss->n_live_txns==0) {
switch (renv->ss.ss) {
case BACKWARD_OLDER_CHECKPOINT_BEGIN:
assert(l->lsn.lsn < renv->ss.checkpoint_lsn.lsn);
return 0; // ignore it
case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END:
assert(renv->ss.checkpoint_lsn.lsn == l->lsn.lsn);
if (renv->ss.n_live_txns==0) {
renv->ss.ss = FORWARD_OLDER_CHECKPOINT_BEGIN;
renv->goforward = TRUE;
time_t tnow = time(NULL);
fprintf(stderr, "%.24s Tokudb recovery turning around at begin checkpoint %"PRIu64"\n", ctime(&tnow), l->lsn.lsn);
renv->goforward = TRUE;
return 0;
} else {
renv->ss.ss = BACKWARD_OLDER_CHECKPOINT_BEGIN;
time_t tnow = time(NULL);
fprintf(stderr, "%.24s Tokudb recovery begin checkpoint at %"PRIu64" looking for %"PRIu64". Scanning backwards through %"PRIu64" log entries.\n", ctime(&tnow), l->lsn.lsn, ss->min_live_txn, l->lsn.lsn - ss->min_live_txn);
return 0;
fprintf(stderr, "%.24s Tokudb recovery begin checkpoint at %"PRIu64" looking for %"PRIu64". Scanning backwards through %"PRIu64" log entries.\n", ctime(&tnow), l->lsn.lsn, renv->ss.oldest_txnid, l->lsn.lsn - renv->ss.oldest_txnid);
}
case SS_BACKWARD_SAW_CKPT:
return 0; // ignore it
return 0;
case BACKWARD_NEWER_CHECKPOINT_END:
return 0; // incomplete checkpoint
default:
break;
}
fprintf(stderr, "Tokudb recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)ss->ss);
fprintf(stderr, "Tokudb recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)renv->ss.ss);
abort();
}
static int toku_recover_end_checkpoint (struct logtype_end_checkpoint *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
static int toku_recover_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV renv) {
switch (renv->ss.ss) {
case FORWARD_OLDER_CHECKPOINT_BEGIN:
assert(l->lsn.lsn < renv->ss.checkpoint_lsn.lsn);
return 0;
case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
assert(l->txnid == renv->ss.checkpoint_lsn.lsn);
renv->ss.ss = FORWARD_NEWER_CHECKPOINT_END;
return 0;
case FORWARD_NEWER_CHECKPOINT_END:
assert(0);
return 0;
default:
assert(0);
return 0;
}
}
static int toku_recover_backward_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV renv) {
struct scan_state *ss = &renv->ss;
switch (ss->ss) {
case SS_INIT:
ss->ss = SS_BACKWARD_SAW_CKPT_END;
ss->checkpoint_lsn.lsn = l->txnid;
switch (renv->ss.ss) {
case BACKWARD_OLDER_CHECKPOINT_BEGIN:
return 0;
case SS_BACKWARD_SAW_CKPT_END:
case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END:
fprintf(stderr, "Tokudb recovery %s:%d Should not see two end_checkpoint log entries without an intervening begin_checkpoint\n", __FILE__, __LINE__);
abort();
case SS_BACKWARD_SAW_CKPT:
case BACKWARD_NEWER_CHECKPOINT_END:
renv->ss.ss = BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END;
renv->ss.checkpoint_lsn.lsn = l->txnid;
return 0;
default:
break;
}
fprintf(stderr, "Tokudb recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)ss->ss);
fprintf(stderr, "Tokudb recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)renv->ss.ss);
abort();
}
static int toku_recover_fassociate (struct logtype_fassociate *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
static int toku_recover_fassociate (struct logtype_fassociate *l, RECOVER_ENV renv) {
switch (renv->ss.ss) {
case FORWARD_OLDER_CHECKPOINT_BEGIN:
return 0;
case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
case FORWARD_NEWER_CHECKPOINT_END: {
struct file_map_tuple *tuple = NULL;
int r = file_map_find(&renv->fmap, l->filenum, &tuple);
if (r == 0) {
// assert that the filenum maps to the correct iname
char *fname = fixup_fname(&l->iname);
assert(strcmp(fname, tuple->iname) == 0);
toku_free(fname);
}
return 0;
}
default:
assert(0);
return 0;
}
}
static int toku_recover_backward_fassociate (struct logtype_fassociate *l, RECOVER_ENV renv) {
int r = 0;
struct scan_state *ss = &renv->ss;
if (ss->ss == SS_BACKWARD_SAW_CKPT_END) {
char *fixedfname = fixup_fname(&l->iname);
r = internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0, NULL);
switch (renv->ss.ss) {
case BACKWARD_OLDER_CHECKPOINT_BEGIN:
return 0;
case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END: {
// assert that the filenum is not open
struct file_map_tuple *tuple = NULL;
int r = file_map_find(&renv->fmap, l->filenum, &tuple);
assert(r != 0);
// open it
r = internal_toku_recover_fopen_or_fcreate(renv, FALSE, 0, &l->iname, l->filenum, l->treeflags, 0, NULL, 0, NULL);
return r;
}
case BACKWARD_NEWER_CHECKPOINT_END:
return 0;
default:
assert(0);
return 0;
}
return r;
}
static int toku_recover_xstillopen (struct logtype_xstillopen *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
switch (renv->ss.ss) {
case FORWARD_OLDER_CHECKPOINT_BEGIN:
return 0;
case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
case FORWARD_NEWER_CHECKPOINT_END: {
// assert that the transaction exists
TOKUTXN txn = NULL;
int r = toku_txnid2txn(renv->logger, l->txnid, &txn);
assert(r == 0 && txn != NULL);
return 0;
}
default:
assert(0);
return 0;
}
}
static int toku_recover_backward_xstillopen (struct logtype_xstillopen *l, RECOVER_ENV renv) {
struct scan_state *ss = &renv->ss;
switch (ss->ss) {
case SS_INIT:
return 0; // ignore live txns from incomplete checkpoint
case SS_BACKWARD_SAW_CKPT_END:
if (ss->n_live_txns == 0)
ss->min_live_txn = l->txnid;
else if (toku_txnid_older(l->txnid, ss->min_live_txn))
ss->min_live_txn = l->txnid;
ss->n_live_txns++;
return 0;
case SS_BACKWARD_SAW_CKPT:
switch (renv->ss.ss) {
case BACKWARD_OLDER_CHECKPOINT_BEGIN:
return 0; // ignore live txns from older checkpoints
case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END:
assert(l->txnid < renv->ss.checkpoint_lsn.lsn);
if (renv->ss.n_live_txns == 0)
renv->ss.oldest_txnid = l->txnid;
else if (toku_txnid_older(l->txnid, renv->ss.oldest_txnid))
renv->ss.oldest_txnid = l->txnid;
renv->ss.n_live_txns++;
return 0;
case BACKWARD_NEWER_CHECKPOINT_END:
return 0; // ignore live txns from incomplete checkpoint
default:
break;
}
fprintf(stderr, "Tokudb recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)ss->ss);
fprintf(stderr, "Tokudb recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)renv->ss.ss);
abort();
}
......@@ -713,7 +710,7 @@ static int toku_recover_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) {
assert(r == 0);
if (parent == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN); //cannot happen after checkpoint begin
return 0;
}
}
......@@ -728,23 +725,24 @@ static int toku_recover_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) {
static int toku_recover_backward_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) {
struct scan_state *ss = &renv->ss;
switch (ss->ss) {
case SS_INIT:
return 0; // ignore txns that began after checkpoint
case SS_BACKWARD_SAW_CKPT_END:
return 0; // ignore txns that began during the checkpoint
case SS_BACKWARD_SAW_CKPT:
case BACKWARD_OLDER_CHECKPOINT_BEGIN:
assert(ss->n_live_txns > 0); // the only thing we are doing here is looking for a live txn, so there better be one
assert(ss->min_live_txn <= l->lsn.lsn); //Did not pass it.
if (ss->min_live_txn >= l->lsn.lsn) {
if (tokudb_recovery_trace)
fprintf(stderr, "Tokudb recovery turning around at xbegin %" PRIu64 " (%s)\n", l->lsn.lsn, recover_state(renv));
assert(ss->oldest_txnid <= l->lsn.lsn); //Did not pass it.
if (ss->oldest_txnid == l->lsn.lsn) {
renv->goforward = TRUE;
return 0;
renv->ss.ss = FORWARD_OLDER_CHECKPOINT_BEGIN;
time_t tnow = time(NULL);
fprintf(stderr, "%.24s Tokudb recovery turning around at xbegin %" PRIu64 " live txns=%d (%s)\n", ctime(&tnow), l->lsn.lsn, renv->ss.n_live_txns, recover_state(renv));
} else {
if (tokudb_recovery_trace)
fprintf(stderr, "Tokudb recovery scanning back at xbegin %" PRIu64 " looking for %" PRIu64 " (%s)\n", l->lsn.lsn, ss->min_live_txn, recover_state(renv));
return 0;
fprintf(stderr, "Tokudb recovery scanning back at xbegin %" PRIu64 " looking for %" PRIu64 " (%s)\n", l->lsn.lsn, ss->oldest_txnid, recover_state(renv));
}
return 0;
case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END:
assert(l->lsn.lsn > renv->ss.checkpoint_lsn.lsn);
return 0; // ignore txns that began during the checkpoint
case BACKWARD_NEWER_CHECKPOINT_END:
return 0; // ignore txns that began after checkpoint
default:
break;
}
......@@ -752,6 +750,62 @@ static int toku_recover_backward_xbegin (struct logtype_xbegin *l, RECOVER_ENV r
abort();
}
static int toku_recover_commit (struct logtype_commit *l, RECOVER_ENV renv) {
int r;
// find the transaction by transaction id
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->txnid, &txn);
assert(r == 0);
if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN); // cannot happen after checkpoint begin
return 0;
}
// commit the transaction
r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, l->lsn);
assert(r == 0);
// close the transaction
toku_txn_close_txn(txn);
return 0;
}
static int toku_recover_backward_commit (struct logtype_commit *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) {
int r;
// find the transaction by transaction id
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->txnid, &txn);
assert(r == 0);
if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN); // cannot happen after checkpoint begin
return 0;
}
// abort the transaction
r = toku_txn_abort_with_lsn(txn, recover_yield, NULL, l->lsn);
assert(r == 0);
// close the transaction
toku_txn_close_txn(txn);
return 0;
}
static int toku_recover_backward_xabort (struct logtype_xabort *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static int toku_recover_comment (struct logtype_comment *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
......@@ -914,7 +968,6 @@ static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_di
rr = DB_RUNRECOVERY; goto errorexit;
}
// TODO use logcursor->invalidate()
r = toku_logcursor_destroy(&logcursor);
assert(r == 0);
......@@ -933,6 +986,8 @@ static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_di
time_t tlast = tnow;
fprintf(stderr, "%.24s Tokudb recovery scanning backward from %"PRIu64"\n", ctime(&tnow), lastlsn.lsn);
for (unsigned i=0; 1; i++) {
// get the previous log entry (first time gets the last one)
le = NULL;
r = toku_logcursor_prev(logcursor, &le);
if (tokudb_recovery_trace)
......@@ -943,6 +998,8 @@ static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_di
}
break;
}
// trace progress
if ((i % 1000) == 0) {
tnow = time(NULL);
if (tnow - tlast >= TOKUDB_RECOVERY_PROGRESS_TIME) {
......@@ -951,6 +1008,11 @@ static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_di
tlast = tnow;
}
}
// dispatch the log entry handler
assert(renv->ss.ss == BACKWARD_OLDER_CHECKPOINT_BEGIN ||
renv->ss.ss == BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END ||
renv->ss.ss == BACKWARD_NEWER_CHECKPOINT_END);
logtype_dispatch_assign(le, toku_recover_backward_, r, renv);
if (tokudb_recovery_trace)
recover_trace_le(__FUNCTION__, __LINE__, r, le);
......@@ -959,17 +1021,8 @@ static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_di
fprintf(stderr, "DB_RUNRECOVERY: %s:%d r=%d\n", __FUNCTION__, __LINE__, r);
rr = DB_RUNRECOVERY; goto errorexit;
}
if (renv->goforward) {
logtype_dispatch_assign(le, toku_recover_, r, renv);
if (tokudb_recovery_trace)
recover_trace_le(__FUNCTION__, __LINE__, r, le);
if (r != 0) {
if (tokudb_recovery_trace)
fprintf(stderr, "DB_RUNRECOVERY: %s:%d r=%d\n", __FUNCTION__, __LINE__, r);
rr = DB_RUNRECOVERY; goto errorexit;
}
if (renv->goforward)
break;
}
}
// run first callback
......@@ -977,23 +1030,14 @@ static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_di
recover_callback_fx(recover_callback_args);
// scan forwards
if (le)
thislsn = toku_log_entry_get_lsn(le);
else
thislsn = ZERO_LSN;
assert(le);
thislsn = toku_log_entry_get_lsn(le);
tnow = time(NULL);
fprintf(stderr, "%.24s Tokudb recovery scanning forward to %"PRIu64" from %"PRIu64" left %"PRIu64" (%s)\n", ctime(&tnow), lastlsn.lsn, thislsn.lsn, lastlsn.lsn - thislsn.lsn, recover_state(renv));
fprintf(stderr, "%.24s Tokudb recovery starts scanning forward to %"PRIu64" from %"PRIu64" left %"PRIu64" (%s)\n", ctime(&tnow), lastlsn.lsn, thislsn.lsn, lastlsn.lsn - thislsn.lsn, recover_state(renv));
for (unsigned i=0; 1; i++) {
le = NULL;
r = toku_logcursor_next(logcursor, &le);
if (tokudb_recovery_trace)
recover_trace_le(__FUNCTION__, __LINE__, r, le);
if (r != 0) {
if (r == DB_RUNRECOVERY) {
rr = DB_RUNRECOVERY; goto errorexit;
}
break;
}
// trace progress
if ((i % 1000) == 0) {
tnow = time(NULL);
if (tnow - tlast >= TOKUDB_RECOVERY_PROGRESS_TIME) {
......@@ -1002,6 +1046,11 @@ static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_di
tlast = tnow;
}
}
// dispatch the log entry handler (first time calls the forward handler for the log entry at the turnaround
assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN ||
renv->ss.ss == FORWARD_BETWEEN_CHECKPOINT_BEGIN_END ||
renv->ss.ss == FORWARD_NEWER_CHECKPOINT_END);
logtype_dispatch_assign(le, toku_recover_, r, renv);
if (tokudb_recovery_trace)
recover_trace_le(__FUNCTION__, __LINE__, r, le);
......@@ -1010,11 +1059,26 @@ static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_di
fprintf(stderr, "DB_RUNRECOVERY: %s:%d r=%d\n", __FUNCTION__, __LINE__, r);
rr = DB_RUNRECOVERY; goto errorexit;
}
// get the next log entry
le = NULL;
r = toku_logcursor_next(logcursor, &le);
if (tokudb_recovery_trace)
recover_trace_le(__FUNCTION__, __LINE__, r, le);
if (r != 0) {
if (r == DB_RUNRECOVERY) {
rr = DB_RUNRECOVERY; goto errorexit;
}
break;
}
}
// verify the final recovery state
assert(renv->ss.ss == FORWARD_NEWER_CHECKPOINT_END);
r = toku_logcursor_destroy(&logcursor);
assert(r == 0);
// run second callback
if (recover_callback2_fx)
recover_callback2_fx(recover_callback2_args);
......@@ -1028,7 +1092,6 @@ static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_di
tnow = time(NULL);
fprintf(stderr, "%.24s Tokudb recovery aborting %"PRIu32" live transaction%s\n", ctime(&tnow), n, n > 1 ? "s" : "");
}
recover_abort_live_txns(renv);
// close the open dictionaries
......@@ -1146,8 +1209,6 @@ int tokudb_recover(const char *env_dir, const char *log_dir, brt_compare_func bt
return rr;
}
// Return 0 if recovery log exists, ENOENT if log is missing
int
tokudb_recover_log_exists(const char * log_dir) {
......
// test recovery of a clean shutdown
#include "test.h"
#include "includes.h"
......
......@@ -17,6 +17,8 @@ run_test(void) {
r = toku_logger_open(TESTDIR, logger); assert(r == 0);
LSN beginlsn;
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn.lsn, 0); assert(r == 0);
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0); assert(r == 0);
r = toku_logger_close(&logger); assert(r == 0);
// run recovery
......
......@@ -17,6 +17,10 @@ run_test(void) {
TOKULOGGER logger;
r = toku_logger_create(&logger); assert(r == 0);
r = toku_logger_open(TESTDIR, logger); assert(r == 0);
LSN beginlsn;
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn.lsn, 0); assert(r == 0);
BYTESTRING iname = { strlen("missing_tokudb_file"), "missing_tokudb_file" };
FILENUM filenum = {42};
uint32_t treeflags = 0;
......
// test recovery of "hello" comments
#include "test.h"
#include "includes.h"
......@@ -16,7 +18,13 @@ run_test(void) {
r = toku_logger_create(&logger); assert(r == 0);
r = toku_logger_open(TESTDIR, logger); assert(r == 0);
BYTESTRING hello = { strlen("hello"), "hello" };
r = toku_log_comment(logger, NULL, TRUE, 0, hello);
r = toku_log_comment(logger, NULL, TRUE, 0, hello); assert(r == 0);
LSN beginlsn;
r = toku_log_begin_checkpoint(logger, &beginlsn, TRUE, 0); assert(r == 0);
r = toku_log_end_checkpoint(logger, NULL, TRUE, beginlsn.lsn, 0); assert(r == 0);
r = toku_log_comment(logger, NULL, TRUE, 0, hello); assert(r == 0);
BYTESTRING there = { strlen("there"), "there" };
r = toku_log_comment(logger, NULL, TRUE, 0, there); assert(r == 0);
r = toku_logger_close(&logger); assert(r == 0);
// run recovery
......
// test recovery with no data directory
#include "test.h"
#include "includes.h"
......
// test recovery with no log
#include "test.h"
#include "includes.h"
......
// test recovery with a NULL log directory
#include "test.h"
#include "includes.h"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment