Commit 3ab29cbd authored by Yoni Fogel's avatar Yoni Fogel

Addresses #2155 refs[t:2155] Renamed variables,

Added missing state transaction (turn around on checkpoint begin with no transactions)
Added error checking for unlink (anything but 0/ENOENT is real error)

Changes due to code review with Barry

git-svn-id: file:///svn/toku/tokudb@15848 c7de825b-a66e-492c-adef-691d508d4ae1
parent 9bc0fed8
......@@ -16,15 +16,15 @@ int toku_recover_trace = 0;
#define VERIFY_COUNTS(n) ((void)0)
#endif
struct backward_scan_state {
enum backward_state { BS_INIT, BS_SAW_CKPT_END, BS_SAW_CKPT, BS_FORWARD_SAW_CKPT_BEGIN } bs;
struct scan_state {
enum { SS_INIT, SS_BACKWARD_SAW_CKPT_END, SS_BACKWARD_SAW_CKPT, SS_FORWARD_SAW_CKPT } ss;
LSN checkpoint_lsn;
int n_live_txns;
TXNID min_live_txn;
};
static void backward_scan_state_init(struct backward_scan_state *bs) {
bs->bs = BS_INIT; bs->checkpoint_lsn = ZERO_LSN; bs->n_live_txns = 0; bs->min_live_txn = 0;
static void scan_state_init(struct scan_state *ss) {
ss->ss = SS_INIT; ss->checkpoint_lsn = ZERO_LSN; ss->n_live_txns = 0; ss->min_live_txn = 0;
}
// File map tuple
......@@ -143,7 +143,7 @@ struct recover_env {
TOKULOGGER logger;
brt_compare_func bt_compare;
brt_compare_func dup_compare;
struct backward_scan_state bs;
struct scan_state ss;
struct file_map fmap;
BOOL goforward;
};
......@@ -289,8 +289,10 @@ static int internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags,
if (flags & O_TRUNC) {
// maybe unlink
r = unlink(fixedfname);
if (r != 0)
if (r != 0 && errno != ENOENT) {
printf("%s:%d unlink %d\n", __FUNCTION__, __LINE__, errno);
return r;
}
}
BRT brt=0;
......@@ -319,7 +321,9 @@ static int internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags,
r = toku_brt_set_descriptor(brt, descriptor_version, &descriptor_dbt, abort_on_upgrade);
if (r!=0) goto close_brt;
}
r = toku_brt_open_recovery(brt, fixedfname, fixedfname, (flags & O_CREAT) != 0, FALSE, renv->ct, NULL, fake_db, recovery_force_fcreate);
int is_create = (flags & O_CREAT) != 0;
int only_create = is_create && (flags & O_TRUNC) != 0;
r = toku_brt_open_recovery(brt, fixedfname, fixedfname, is_create, only_create, renv->ct, NULL, fake_db, recovery_force_fcreate);
if (r != 0) {
close_brt:;
//Note: If brt_open fails, then close_brt will NOT write a header to disk.
......@@ -343,7 +347,7 @@ static int toku_recover_fopen (struct logtype_fopen *l, RECOVER_ENV renv) {
static int
maybe_do_fclose_during_recover_backward(RECOVER_ENV renv, FILENUM filenum) {
if (renv->bs.bs == BS_SAW_CKPT_END) {
if (renv->ss.ss == SS_BACKWARD_SAW_CKPT_END) {
// close the tree
struct file_map_tuple *tuple = NULL;
int r = file_map_find(&renv->fmap, filenum, &tuple);
......@@ -371,14 +375,14 @@ static int toku_recover_backward_fopen (struct logtype_fopen *l, RECOVER_ENV ren
static int toku_recover_fcreate (struct logtype_fcreate *l, RECOVER_ENV renv) {
char *fixedfname = fixup_fname(&l->iname);
create_dir_from_file(fixedfname);
struct backward_scan_state *bs = &renv->bs;
struct scan_state *ss = &renv->ss;
int flags;
if (bs->bs == BS_SAW_CKPT) {
if (ss->ss == SS_BACKWARD_SAW_CKPT) {
//Treat as fopen
flags = 0;
}
else {
assert(bs->bs == BS_FORWARD_SAW_CKPT_BEGIN);
assert(ss->ss == SS_FORWARD_SAW_CKPT);
flags = O_CREAT|O_TRUNC;
}
return internal_toku_recover_fopen_or_fcreate(renv, flags, l->mode, fixedfname, l->filenum, l->treeflags, l->descriptor_version, &l->descriptor, 1);
......@@ -535,7 +539,7 @@ static int toku_recover_fclose (struct logtype_fclose *l, RECOVER_ENV UU(renv))
}
static int toku_recover_backward_fclose (struct logtype_fclose *l, RECOVER_ENV renv) {
if (renv->bs.bs == BS_SAW_CKPT) {
if (renv->ss.ss == SS_BACKWARD_SAW_CKPT) {
// tree open
char *fixedfname = fixup_fname(&l->iname);
internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0);
......@@ -544,47 +548,48 @@ static int toku_recover_backward_fclose (struct logtype_fclose *l, RECOVER_ENV r
}
static int toku_recover_begin_checkpoint (struct logtype_begin_checkpoint *UU(l), RECOVER_ENV UU(renv)) {
struct backward_scan_state *bs = &renv->bs;
switch (bs->bs) {
case BS_SAW_CKPT:
assert(l->lsn.lsn <= bs->checkpoint_lsn.lsn);
if (l->lsn.lsn == bs->checkpoint_lsn.lsn)
bs->bs = BS_FORWARD_SAW_CKPT_BEGIN;
struct scan_state *ss = &renv->ss;
switch (ss->ss) {
case SS_BACKWARD_SAW_CKPT:
assert(l->lsn.lsn <= ss->checkpoint_lsn.lsn);
if (l->lsn.lsn == ss->checkpoint_lsn.lsn)
ss->ss = SS_FORWARD_SAW_CKPT;
return 0; // ignore it
case BS_FORWARD_SAW_CKPT_BEGIN:
assert(l->lsn.lsn > bs->checkpoint_lsn.lsn);
case SS_FORWARD_SAW_CKPT:
assert(l->lsn.lsn > ss->checkpoint_lsn.lsn);
return 0; // ignore it
default:
break;
}
fprintf(stderr, "%s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)bs->bs);
fprintf(stderr, "%s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)ss->ss);
abort();
// nothing
return 0;
}
static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) {
struct backward_scan_state *bs = &renv->bs;
switch (bs->bs) {
case BS_INIT:
struct scan_state *ss = &renv->ss;
switch (ss->ss) {
case SS_INIT:
return 0; // incomplete checkpoint
case BS_SAW_CKPT_END:
assert(bs->checkpoint_lsn.lsn == l->lsn.lsn);
bs->bs = BS_SAW_CKPT;
if (bs->n_live_txns==0) {
case SS_BACKWARD_SAW_CKPT_END:
assert(ss->checkpoint_lsn.lsn == l->lsn.lsn);
ss->ss = SS_BACKWARD_SAW_CKPT;
if (ss->n_live_txns==0) {
fprintf(stderr, "Tokudb recovery turning around at begin checkpoint %"PRIu64"\n", l->lsn.lsn);
renv->goforward = TRUE;
ss->ss = SS_FORWARD_SAW_CKPT;
return 0;
} else {
fprintf(stderr, "Tokudb recovery begin checkpoint at %"PRIu64" looking for %"PRIu64"\n", l->lsn.lsn, bs->min_live_txn);
fprintf(stderr, "Tokudb recovery begin checkpoint at %"PRIu64" looking for %"PRIu64"\n", l->lsn.lsn, ss->min_live_txn);
return 0;
}
case BS_SAW_CKPT:
case SS_BACKWARD_SAW_CKPT:
return 0; // ignore it
default:
break;
}
fprintf(stderr, "%s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)bs->bs);
fprintf(stderr, "%s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)ss->ss);
abort();
}
......@@ -594,21 +599,21 @@ static int toku_recover_end_checkpoint (struct logtype_end_checkpoint *UU(l), RE
}
static int toku_recover_backward_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV renv) {
struct backward_scan_state *bs = &renv->bs;
switch (bs->bs) {
case BS_INIT:
bs->bs = BS_SAW_CKPT_END;
bs->checkpoint_lsn.lsn = l->txnid;
struct scan_state *ss = &renv->ss;
switch (ss->ss) {
case SS_INIT:
ss->ss = SS_BACKWARD_SAW_CKPT_END;
ss->checkpoint_lsn.lsn = l->txnid;
return 0;
case BS_SAW_CKPT_END:
case SS_BACKWARD_SAW_CKPT_END:
fprintf(stderr, "%s:%d Should not see two end_checkpoint log entries without an intervening begin_checkpoint\n", __FILE__, __LINE__);
abort();
case BS_SAW_CKPT:
case SS_BACKWARD_SAW_CKPT:
return 0;
default:
break;
}
fprintf(stderr, "%s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)bs->bs);
fprintf(stderr, "%s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)ss->ss);
abort();
}
......@@ -628,23 +633,23 @@ static int toku_recover_xstillopen (struct logtype_xstillopen *UU(l), RECOVER_EN
}
static int toku_recover_backward_xstillopen (struct logtype_xstillopen *l, RECOVER_ENV renv) {
struct backward_scan_state *bs = &renv->bs;
switch (bs->bs) {
case BS_INIT:
struct scan_state *ss = &renv->ss;
switch (ss->ss) {
case SS_INIT:
return 0; // ignore live txns from incomplete checkpoint
case BS_SAW_CKPT_END:
if (bs->n_live_txns == 0)
bs->min_live_txn = l->txnid;
else if (toku_txnid_older(l->txnid, bs->min_live_txn))
bs->min_live_txn = l->txnid;
bs->n_live_txns++;
case SS_BACKWARD_SAW_CKPT_END:
if (ss->n_live_txns == 0)
ss->min_live_txn = l->txnid;
else if (toku_txnid_older(l->txnid, ss->min_live_txn))
ss->min_live_txn = l->txnid;
ss->n_live_txns++;
return 0;
case BS_SAW_CKPT:
case SS_BACKWARD_SAW_CKPT:
return 0; // ignore live txns from older checkpoints
default:
break;
}
fprintf(stderr, "%s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)bs->bs);
fprintf(stderr, "%s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)ss->ss);
abort();
}
......@@ -668,29 +673,29 @@ static int toku_recover_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) {
}
static int toku_recover_backward_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) {
struct backward_scan_state *bs = &renv->bs;
switch (bs->bs) {
case BS_INIT:
struct scan_state *ss = &renv->ss;
switch (ss->ss) {
case SS_INIT:
return 0; // ignore txns that began after checkpoint
case BS_SAW_CKPT_END:
case SS_BACKWARD_SAW_CKPT_END:
return 0; // ignore txns that began during the checkpoint
case BS_SAW_CKPT:
assert(bs->n_live_txns > 0); // the only thing we are doing here is looking for a live txn, so there better be one
case SS_BACKWARD_SAW_CKPT:
assert(ss->n_live_txns > 0); // the only thing we are doing here is looking for a live txn, so there better be one
// If we got to the min, return nonzero
if (bs->min_live_txn >= l->lsn.lsn) {
if (ss->min_live_txn >= l->lsn.lsn) {
if (toku_recover_trace)
fprintf(stderr, "Tokudb recovery turning around at xbegin %" PRIu64 "\n", l->lsn.lsn);
renv->goforward = TRUE;
return 0;
} else {
if (toku_recover_trace)
fprintf(stderr, "Tokudb recovery scanning back at xbegin %" PRIu64 " (looking for %" PRIu64 ")\n", l->lsn.lsn, bs->min_live_txn);
fprintf(stderr, "Tokudb recovery scanning back at xbegin %" PRIu64 " (looking for %" PRIu64 ")\n", l->lsn.lsn, ss->min_live_txn);
return 0;
}
default:
break;
}
fprintf(stderr, "%s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)bs->bs);
fprintf(stderr, "%s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)ss->ss);
abort();
}
......@@ -861,7 +866,7 @@ static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_di
// scan backwards
fprintf(stderr, "Tokudb recovery scanning backward from %"PRIu64"\n", lastlsn.lsn);
backward_scan_state_init(&renv->bs);
scan_state_init(&renv->ss);
while (1) {
le = NULL;
r = toku_logcursor_prev(logcursor, &le);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment