Commit e37c1ac3 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #2218 refs[t:2218] Merge r1629 to main

git-svn-id: file:///svn/toku/tokudb@16130 c7de825b-a66e-492c-adef-691d508d4ae1
parent 84314a3a
...@@ -214,8 +214,11 @@ static int toku_recover_commit (struct logtype_commit *l, RECOVER_ENV renv) { ...@@ -214,8 +214,11 @@ static int toku_recover_commit (struct logtype_commit *l, RECOVER_ENV renv) {
TOKUTXN txn = NULL; TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->txnid, &txn); r = toku_txnid2txn(renv->logger, l->txnid, &txn);
assert(r == 0); assert(r == 0);
if (txn == NULL) if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); // cannot happen after checkpoint begin
return 0; return 0;
}
// commit the transaction // commit the transaction
r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, l->lsn); r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, l->lsn);
...@@ -239,8 +242,11 @@ static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) { ...@@ -239,8 +242,11 @@ static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) {
TOKUTXN txn = NULL; TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->txnid, &txn); r = toku_txnid2txn(renv->logger, l->txnid, &txn);
assert(r == 0); assert(r == 0);
if (txn == NULL) if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); // cannot happen after checkpoint begin
return 0; return 0;
}
// abort the transaction // abort the transaction
r = toku_txn_abort_with_lsn(txn, recover_yield, NULL, l->lsn); r = toku_txn_abort_with_lsn(txn, recover_yield, NULL, l->lsn);
...@@ -292,7 +298,7 @@ abort_on_upgrade(DB* UU(pdb), ...@@ -292,7 +298,7 @@ abort_on_upgrade(DB* UU(pdb),
// Open the file if it is not already open. If it is already open, then do nothing. // Open the file if it is not already open. If it is already open, then do nothing.
static int internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags, int mode, char *fixedfname, FILENUM filenum, u_int32_t treeflags, u_int32_t descriptor_version, BYTESTRING* descriptor, int recovery_force_fcreate) { static int internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags, int mode, char *fixedfname, FILENUM filenum, u_int32_t treeflags, u_int32_t descriptor_version, BYTESTRING* descriptor, int recovery_force_fcreate, TOKUTXN txn) {
int r; int r;
// already open // already open
...@@ -341,7 +347,7 @@ static int internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags, ...@@ -341,7 +347,7 @@ static int internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags,
} }
int is_create = (flags & O_CREAT) != 0; int is_create = (flags & O_CREAT) != 0;
int only_create = is_create && (flags & O_TRUNC) != 0; int only_create = is_create && (flags & O_TRUNC) != 0;
r = toku_brt_open_recovery(brt, fixedfname, fixedfname, is_create, only_create, renv->ct, NULL, fake_db, recovery_force_fcreate); r = toku_brt_open_recovery(brt, fixedfname, fixedfname, is_create, only_create, renv->ct, txn, fake_db, recovery_force_fcreate);
if (r != 0) { if (r != 0) {
close_brt:; close_brt:;
//Note: If brt_open fails, then close_brt will NOT write a header to disk. //Note: If brt_open fails, then close_brt will NOT write a header to disk.
...@@ -360,12 +366,11 @@ close_brt:; ...@@ -360,12 +366,11 @@ close_brt:;
static int toku_recover_fopen (struct logtype_fopen *l, RECOVER_ENV renv) { static int toku_recover_fopen (struct logtype_fopen *l, RECOVER_ENV renv) {
char *fixedfname = fixup_fname(&l->iname); char *fixedfname = fixup_fname(&l->iname);
return internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0); return internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0, NULL);
} }
static int static int
maybe_do_fclose_during_recover_backward(RECOVER_ENV renv, FILENUM filenum) { maybe_do_fclose_during_recover_backward(RECOVER_ENV renv, FILENUM filenum) {
if (renv->ss.ss == SS_BACKWARD_SAW_CKPT_END) {
// close the tree // close the tree
struct file_map_tuple *tuple = NULL; struct file_map_tuple *tuple = NULL;
int r = file_map_find(&renv->fmap, filenum, &tuple); int r = file_map_find(&renv->fmap, filenum, &tuple);
...@@ -379,7 +384,6 @@ maybe_do_fclose_during_recover_backward(RECOVER_ENV renv, FILENUM filenum) { ...@@ -379,7 +384,6 @@ maybe_do_fclose_during_recover_backward(RECOVER_ENV renv, FILENUM filenum) {
toku_free(fake_db); //Must free the DB after the brt is closed toku_free(fake_db); //Must free the DB after the brt is closed
file_map_remove(&renv->fmap, filenum); file_map_remove(&renv->fmap, filenum);
} }
}
return 0; return 0;
} }
...@@ -395,15 +399,22 @@ static int toku_recover_fcreate (struct logtype_fcreate *l, RECOVER_ENV renv) { ...@@ -395,15 +399,22 @@ static int toku_recover_fcreate (struct logtype_fcreate *l, RECOVER_ENV renv) {
create_dir_from_file(fixedfname); create_dir_from_file(fixedfname);
struct scan_state *ss = &renv->ss; struct scan_state *ss = &renv->ss;
int flags; int flags;
int r;
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->txnid, &txn);
assert(r == 0);
if (ss->ss == SS_BACKWARD_SAW_CKPT) { if (ss->ss == SS_BACKWARD_SAW_CKPT) {
//Treat as fopen //Treat as fopen
flags = 0; flags = 0;
} }
else { else {
assert(txn != NULL); //Straddle txns cannot happen after checkpoint
assert(ss->ss == SS_FORWARD_SAW_CKPT); assert(ss->ss == SS_FORWARD_SAW_CKPT);
flags = O_CREAT|O_TRUNC; flags = O_CREAT|O_TRUNC;
} }
return internal_toku_recover_fopen_or_fcreate(renv, flags, l->mode, fixedfname, l->filenum, l->treeflags, l->descriptor_version, &l->descriptor, 1); return internal_toku_recover_fopen_or_fcreate(renv, flags, l->mode, fixedfname, l->filenum, l->treeflags, l->descriptor_version, &l->descriptor, 1, txn);
} }
static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER_ENV UU(renv)) { static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER_ENV UU(renv)) {
...@@ -417,9 +428,11 @@ static int toku_recover_fdelete (struct logtype_fdelete *l, RECOVER_ENV renv) { ...@@ -417,9 +428,11 @@ static int toku_recover_fdelete (struct logtype_fdelete *l, RECOVER_ENV renv) {
TOKUTXN txn = NULL; TOKUTXN txn = NULL;
int r = toku_txnid2txn(renv->logger, l->txnid, &txn); int r = toku_txnid2txn(renv->logger, l->txnid, &txn);
assert(r == 0); assert(r == 0);
//TODO: #??? (insert ticket number) bug that could leave orphaned files here. if (txn == NULL) {
if (txn == NULL) //This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
return 0; return 0;
}
char *fixediname = fixup_fname(&l->iname); char *fixediname = fixup_fname(&l->iname);
{ //Skip if does not exist. { //Skip if does not exist.
toku_struct_stat buf; toku_struct_stat buf;
...@@ -451,8 +464,11 @@ static int toku_recover_enq_insert (struct logtype_enq_insert *l, RECOVER_ENV re ...@@ -451,8 +464,11 @@ static int toku_recover_enq_insert (struct logtype_enq_insert *l, RECOVER_ENV re
TOKUTXN txn = NULL; TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->xid, &txn); r = toku_txnid2txn(renv->logger, l->xid, &txn);
assert(r == 0); assert(r == 0);
if (txn == NULL) if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
return 0; return 0;
}
DBT keydbt, valdbt; DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, l->key.data, l->key.len); toku_fill_dbt(&keydbt, l->key.data, l->key.len);
toku_fill_dbt(&valdbt, l->value.data, l->value.len); toku_fill_dbt(&valdbt, l->value.data, l->value.len);
...@@ -481,6 +497,10 @@ static int toku_recover_tablelock_on_empty_table(struct logtype_tablelock_on_emp ...@@ -481,6 +497,10 @@ static int toku_recover_tablelock_on_empty_table(struct logtype_tablelock_on_emp
r = toku_brt_note_table_lock(tuple->brt, txn); r = toku_brt_note_table_lock(tuple->brt, txn);
assert(r == 0); assert(r == 0);
} }
else {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
}
return 0; return 0;
} }
...@@ -499,8 +519,11 @@ static int toku_recover_enq_delete_both (struct logtype_enq_delete_both *l, RECO ...@@ -499,8 +519,11 @@ static int toku_recover_enq_delete_both (struct logtype_enq_delete_both *l, RECO
TOKUTXN txn = NULL; TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->xid, &txn); r = toku_txnid2txn(renv->logger, l->xid, &txn);
assert(r == 0); assert(r == 0);
if (txn == NULL) if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
return 0; return 0;
}
DBT keydbt, valdbt; DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, l->key.data, l->key.len); toku_fill_dbt(&keydbt, l->key.data, l->key.len);
toku_fill_dbt(&valdbt, l->value.data, l->value.len); toku_fill_dbt(&valdbt, l->value.data, l->value.len);
...@@ -525,8 +548,11 @@ static int toku_recover_enq_delete_any (struct logtype_enq_delete_any *l, RECOVE ...@@ -525,8 +548,11 @@ static int toku_recover_enq_delete_any (struct logtype_enq_delete_any *l, RECOVE
TOKUTXN txn = NULL; TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->xid, &txn); r = toku_txnid2txn(renv->logger, l->xid, &txn);
assert(r == 0); assert(r == 0);
if (txn == NULL) if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
return 0; return 0;
}
DBT keydbt; DBT keydbt;
toku_fill_dbt(&keydbt, l->key.data, l->key.len); toku_fill_dbt(&keydbt, l->key.data, l->key.len);
r = toku_brt_maybe_delete(tuple->brt, &keydbt, txn, TRUE, l->lsn); r = toku_brt_maybe_delete(tuple->brt, &keydbt, txn, TRUE, l->lsn);
...@@ -557,11 +583,9 @@ static int toku_recover_fclose (struct logtype_fclose *l, RECOVER_ENV UU(renv)) ...@@ -557,11 +583,9 @@ static int toku_recover_fclose (struct logtype_fclose *l, RECOVER_ENV UU(renv))
} }
static int toku_recover_backward_fclose (struct logtype_fclose *l, RECOVER_ENV renv) { static int toku_recover_backward_fclose (struct logtype_fclose *l, RECOVER_ENV renv) {
if (renv->ss.ss == SS_BACKWARD_SAW_CKPT) {
// tree open // tree open
char *fixedfname = fixup_fname(&l->iname); char *fixedfname = fixup_fname(&l->iname);
internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0); internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0, NULL);
}
return 0; return 0;
} }
...@@ -602,7 +626,7 @@ static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoi ...@@ -602,7 +626,7 @@ static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoi
return 0; return 0;
} else { } else {
time_t tnow = time(NULL); time_t tnow = time(NULL);
fprintf(stderr, "%.24s Tokudb recovery begin checkpoint at %"PRIu64" looking for %"PRIu64"\n", ctime(&tnow), l->lsn.lsn, ss->min_live_txn); fprintf(stderr, "%.24s Tokudb recovery begin checkpoint at %"PRIu64" looking for %"PRIu64". Scanning backwards through %"PRIu64" log entries.\n", ctime(&tnow), l->lsn.lsn, ss->min_live_txn, l->lsn.lsn - ss->min_live_txn);
return 0; return 0;
} }
case SS_BACKWARD_SAW_CKPT: case SS_BACKWARD_SAW_CKPT:
...@@ -645,7 +669,7 @@ static int toku_recover_fassociate (struct logtype_fassociate *UU(l), RECOVER_EN ...@@ -645,7 +669,7 @@ static int toku_recover_fassociate (struct logtype_fassociate *UU(l), RECOVER_EN
static int toku_recover_backward_fassociate (struct logtype_fassociate *l, RECOVER_ENV renv) { static int toku_recover_backward_fassociate (struct logtype_fassociate *l, RECOVER_ENV renv) {
char *fixedfname = fixup_fname(&l->iname); char *fixedfname = fixup_fname(&l->iname);
return internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0); return internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0, NULL);
} }
static int toku_recover_xstillopen (struct logtype_xstillopen *UU(l), RECOVER_ENV UU(renv)) { static int toku_recover_xstillopen (struct logtype_xstillopen *UU(l), RECOVER_ENV UU(renv)) {
...@@ -682,9 +706,12 @@ static int toku_recover_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) { ...@@ -682,9 +706,12 @@ static int toku_recover_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) {
if (l->parenttxnid != 0) { if (l->parenttxnid != 0) {
r = toku_txnid2txn(renv->logger, l->parenttxnid, &parent); r = toku_txnid2txn(renv->logger, l->parenttxnid, &parent);
assert(r == 0); assert(r == 0);
if (parent == NULL) if (parent == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
return 0; return 0;
} }
}
// create a transaction and bind it to the transaction id // create a transaction and bind it to the transaction id
TOKUTXN txn = NULL; TOKUTXN txn = NULL;
...@@ -702,7 +729,7 @@ static int toku_recover_backward_xbegin (struct logtype_xbegin *l, RECOVER_ENV r ...@@ -702,7 +729,7 @@ static int toku_recover_backward_xbegin (struct logtype_xbegin *l, RECOVER_ENV r
return 0; // ignore txns that began during the checkpoint return 0; // ignore txns that began during the checkpoint
case SS_BACKWARD_SAW_CKPT: case SS_BACKWARD_SAW_CKPT:
assert(ss->n_live_txns > 0); // the only thing we are doing here is looking for a live txn, so there better be one assert(ss->n_live_txns > 0); // the only thing we are doing here is looking for a live txn, so there better be one
// If we got to the min, return nonzero assert(ss->min_live_txn <= l->lsn.lsn); //Did not pass it.
if (ss->min_live_txn >= l->lsn.lsn) { if (ss->min_live_txn >= l->lsn.lsn) {
if (tokudb_recovery_trace) if (tokudb_recovery_trace)
fprintf(stderr, "Tokudb recovery turning around at xbegin %" PRIu64 " (%s)\n", l->lsn.lsn, recover_state(renv)); fprintf(stderr, "Tokudb recovery turning around at xbegin %" PRIu64 " (%s)\n", l->lsn.lsn, recover_state(renv));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment