Commit e9e22dd4 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

fix some recovery tests close[t:2005] close[t:2010]

git-svn-id: file:///svn/toku/tokudb@14546 c7de825b-a66e-492c-adef-691d508d4ae1
parent ed6f49cf
...@@ -40,16 +40,16 @@ check: $(TARGETS) $(RUNTARGETS); ...@@ -40,16 +40,16 @@ check: $(TARGETS) $(RUNTARGETS);
ifeq ($(VGRIND),) ifeq ($(VGRIND),)
./$< $(SUMMARIZE_CMD) ./$< $(SUMMARIZE_CMD)
else else
$(VGRIND) --error-exitcode=1 --quiet --leak-check=full --log-file=$<.check.valgrind ./$<; \ $(VGRIND) --error-exitcode=1 --quiet --leak-check=full --log-file=$<.check.valgrind ./$< >$<.check.output 2>&1; \
if [ $$? = 0 ] ; then \ if [ $$? = 0 ] ; then \
lines=`cat $<.check.valgrind | wc -l`; \ lines=`cat $<.check.valgrind | wc -l`; \
if [ $$lines -ne 0 ] ; then cat $<.check.valgrind; test 0 = 1; fi \ if [ $$lines -ne 0 ] ; then cat $<.check.valgrind; test 0 = 1; fi \
else \ else \
test 0 = 1; \ cat $<.check.valgrind; test 0 = 1; \
fi \ fi \
$(SUMMARIZE_CMD) $(SUMMARIZE_CMD)
endif endif
clean: clean:
rm -rf $(TARGETS) *.check.valgrind pwrite4g.data testdir rm -rf $(TARGETS) *.check.output *.check.valgrind pwrite4g.data testdir
...@@ -2593,10 +2593,10 @@ toku_brt_broadcast_commit_all (BRT brt) ...@@ -2593,10 +2593,10 @@ toku_brt_broadcast_commit_all (BRT brt)
// Effect: Insert the key-val pair into brt. // Effect: Insert the key-val pair into brt.
int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) { int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
return toku_brt_maybe_insert(brt, key, val, txn, ZERO_LSN); return toku_brt_maybe_insert(brt, key, val, txn, FALSE, ZERO_LSN);
} }
int toku_brt_maybe_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn, LSN oplsn) { int toku_brt_maybe_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn) {
int r = 0; int r = 0;
XIDS message_xids; XIDS message_xids;
TXNID xid = toku_txn_get_txnid(txn); TXNID xid = toku_txn_get_txnid(txn);
...@@ -2628,7 +2628,7 @@ int toku_brt_maybe_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn, LSN oplsn) ...@@ -2628,7 +2628,7 @@ int toku_brt_maybe_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn, LSN oplsn)
} }
LSN treelsn = toku_brt_checkpoint_lsn(brt); LSN treelsn = toku_brt_checkpoint_lsn(brt);
if (oplsn.lsn != 0 && oplsn.lsn <= treelsn.lsn) { if (oplsn_valid && oplsn.lsn <= treelsn.lsn) {
r = 0; r = 0;
} else { } else {
BRT_MSG_S brtcmd = { BRT_INSERT, message_xids, .u.id={key,val}}; BRT_MSG_S brtcmd = { BRT_INSERT, message_xids, .u.id={key,val}};
...@@ -2638,10 +2638,10 @@ int toku_brt_maybe_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn, LSN oplsn) ...@@ -2638,10 +2638,10 @@ int toku_brt_maybe_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn, LSN oplsn)
} }
int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) { int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
return toku_brt_maybe_delete(brt, key, txn, ZERO_LSN); return toku_brt_maybe_delete(brt, key, txn, FALSE, ZERO_LSN);
} }
int toku_brt_maybe_delete(BRT brt, DBT *key, TOKUTXN txn, LSN oplsn) { int toku_brt_maybe_delete(BRT brt, DBT *key, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn) {
int r; int r;
XIDS message_xids; XIDS message_xids;
TXNID xid = toku_txn_get_txnid(txn); TXNID xid = toku_txn_get_txnid(txn);
...@@ -2666,7 +2666,7 @@ int toku_brt_maybe_delete(BRT brt, DBT *key, TOKUTXN txn, LSN oplsn) { ...@@ -2666,7 +2666,7 @@ int toku_brt_maybe_delete(BRT brt, DBT *key, TOKUTXN txn, LSN oplsn) {
} }
LSN treelsn = toku_brt_checkpoint_lsn(brt); LSN treelsn = toku_brt_checkpoint_lsn(brt);
if (oplsn.lsn != 0 && oplsn.lsn <= treelsn.lsn) { if (oplsn_valid && oplsn.lsn <= treelsn.lsn) {
r = 0; r = 0;
} else { } else {
DBT val; DBT val;
...@@ -4531,10 +4531,10 @@ toku_brt_lookup (BRT brt, DBT *k, DBT *v, BRT_GET_CALLBACK_FUNCTION getf, void * ...@@ -4531,10 +4531,10 @@ toku_brt_lookup (BRT brt, DBT *k, DBT *v, BRT_GET_CALLBACK_FUNCTION getf, void *
/* ********************************* delete **************************************/ /* ********************************* delete **************************************/
int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) { int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
return toku_brt_maybe_delete_both(brt, key, val, txn, ZERO_LSN); return toku_brt_maybe_delete_both(brt, key, val, txn, FALSE, ZERO_LSN);
} }
int toku_brt_maybe_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn, LSN oplsn) { int toku_brt_maybe_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn) {
//{ unsigned i; printf("del %p keylen=%d key={", brt->db, key->size); for(i=0; i<key->size; i++) printf("%d,", ((char*)key->data)[i]); printf("} datalen=%d data={", val->size); for(i=0; i<val->size; i++) printf("%d,", ((char*)val->data)[i]); printf("}\n"); } //{ unsigned i; printf("del %p keylen=%d key={", brt->db, key->size); for(i=0; i<key->size; i++) printf("%d,", ((char*)key->data)[i]); printf("} datalen=%d data={", val->size); for(i=0; i<val->size; i++) printf("%d,", ((char*)val->data)[i]); printf("}\n"); }
int r; int r;
XIDS message_xids; XIDS message_xids;
...@@ -4562,7 +4562,7 @@ int toku_brt_maybe_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn, LSN opl ...@@ -4562,7 +4562,7 @@ int toku_brt_maybe_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn, LSN opl
} }
LSN treelsn = toku_brt_checkpoint_lsn(brt); LSN treelsn = toku_brt_checkpoint_lsn(brt);
if (oplsn.lsn != 0 && oplsn.lsn <= treelsn.lsn) { if (oplsn_valid && oplsn.lsn <= treelsn.lsn) {
r = 0; r = 0;
} else { } else {
BRT_MSG_S brtcmd = { BRT_DELETE_BOTH, message_xids, .u.id={key,val}}; BRT_MSG_S brtcmd = { BRT_DELETE_BOTH, message_xids, .u.id={key,val}};
......
...@@ -61,7 +61,7 @@ int toku_brt_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn); ...@@ -61,7 +61,7 @@ int toku_brt_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn);
// Effect: Insert a key and data pair into a brt if the oplsn is newer than the brt lsn. This function is called during recovery. // Effect: Insert a key and data pair into a brt if the oplsn is newer than the brt lsn. This function is called during recovery.
// Returns 0 if successful // Returns 0 if successful
int toku_brt_maybe_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn, LSN oplsn); int toku_brt_maybe_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn);
// Effect: Delete a key from a brt // Effect: Delete a key from a brt
// Returns 0 if successful // Returns 0 if successful
...@@ -69,7 +69,7 @@ int toku_brt_delete (BRT brt, DBT *k, TOKUTXN txn); ...@@ -69,7 +69,7 @@ int toku_brt_delete (BRT brt, DBT *k, TOKUTXN txn);
// Effect: Delete a key from a brt if the oplsn is newer than the brt lsn. This function is called during recovery. // Effect: Delete a key from a brt if the oplsn is newer than the brt lsn. This function is called during recovery.
// Returns 0 if successful // Returns 0 if successful
int toku_brt_maybe_delete (BRT brt, DBT *k, TOKUTXN txn, LSN oplsn); int toku_brt_maybe_delete (BRT brt, DBT *k, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn);
// Effect: Delete a pair only if both k and v are equal according to the comparison function. // Effect: Delete a pair only if both k and v are equal according to the comparison function.
// Returns 0 if successful // Returns 0 if successful
...@@ -78,7 +78,7 @@ int toku_brt_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN txn); ...@@ -78,7 +78,7 @@ int toku_brt_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN txn);
// Effect: Delete a pair only if both k and v are equal according to the comparison function and the // Effect: Delete a pair only if both k and v are equal according to the comparison function and the
// oplsn is newer than the brt lsn. This function is called by recovery. // oplsn is newer than the brt lsn. This function is called by recovery.
// Returns 0 if successful // Returns 0 if successful
int toku_brt_maybe_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN txn, LSN oplsn); int toku_brt_maybe_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn);
int toku_brt_db_delay_closed (BRT brt, DB* db, int (*close_db)(DB*, u_int32_t), u_int32_t close_flags); int toku_brt_db_delay_closed (BRT brt, DB* db, int (*close_db)(DB*, u_int32_t), u_int32_t close_flags);
int toku_close_brt (BRT, TOKULOGGER, char **error_string); int toku_close_brt (BRT, TOKULOGGER, char **error_string);
......
...@@ -5,7 +5,6 @@ ...@@ -5,7 +5,6 @@
#include "includes.h" #include "includes.h"
#include "log_header.h" #include "log_header.h"
#include "varray.h"
#include "checkpoint.h" #include "checkpoint.h"
static int toku_recover_trace = 0; static int toku_recover_trace = 0;
...@@ -28,83 +27,101 @@ static void backward_scan_state_init(struct backward_scan_state *bs) { ...@@ -28,83 +27,101 @@ static void backward_scan_state_init(struct backward_scan_state *bs) {
bs->bs = BS_INIT; bs->checkpoint_lsn = ZERO_LSN; bs->n_live_txns = 0; bs->min_live_txn = 0; bs->bs = BS_INIT; bs->checkpoint_lsn = ZERO_LSN; bs->n_live_txns = 0; bs->min_live_txn = 0;
} }
// Map filenum to brt // File map tuple
// TODO why can't we use the cachetable to find by filenum? struct file_map_tuple {
// TODO O(n) time for linear search. should we use an OMT?
struct file_map {
struct cf_tuple {
FILENUM filenum; FILENUM filenum;
CACHEFILE cf; BRT brt;
BRT brt; // set to zero on an fopen, but filled in when an fheader is seen.
char *fname; char *fname;
} *cf_tuples; };
int n_cf_tuples, max_cf_tuples;
static void file_map_tuple_init(struct file_map_tuple *tuple, FILENUM filenum, BRT brt, char *fname) {
tuple->filenum = filenum;
tuple->brt = brt;
tuple->fname = fname;
}
static void file_map_tuple_destroy(struct file_map_tuple *tuple) {
if (tuple->fname) {
toku_free(tuple->fname);
tuple->fname = NULL;
}
}
// Map filenum to brt, fname
// TODO why can't we use the cachetable to find by filenum?
struct file_map {
OMT filenums;
}; };
static void file_map_init(struct file_map *fmap) { static void file_map_init(struct file_map *fmap) {
fmap->cf_tuples = NULL; int r = toku_omt_create(&fmap->filenums);
fmap->n_cf_tuples = fmap->max_cf_tuples = 0; assert(r == 0);
}
static void file_map_destroy(struct file_map *fmap) {
toku_omt_destroy(&fmap->filenums);
} }
static void file_map_close_dictionaries(struct file_map *fmap) { static void file_map_close_dictionaries(struct file_map *fmap) {
int r; int r;
for (int i=0; i<fmap->n_cf_tuples; i++) { while (1) {
struct cf_tuple *tuple = &fmap->cf_tuples[i]; u_int32_t n = toku_omt_size(fmap->filenums);
if (tuple->brt) { if (n == 0)
break;
OMTVALUE v;
r = toku_omt_fetch(fmap->filenums, n-1, &v, NULL);
assert(r == 0);
r = toku_omt_delete_at(fmap->filenums, n-1);
assert(r == 0);
struct file_map_tuple *tuple = v;
assert(tuple->brt);
r = toku_close_brt(tuple->brt, 0, 0); r = toku_close_brt(tuple->brt, 0, 0);
//r = toku_cachefile_close(&cf_tuples[i].cf);
assert(r == 0); assert(r == 0);
} file_map_tuple_destroy(tuple);
if (tuple->fname) { toku_free(tuple);
toku_free(tuple->fname);
tuple->fname = NULL;
}
}
fmap->n_cf_tuples = fmap->max_cf_tuples = 0;
if (fmap->cf_tuples) {
toku_free(fmap->cf_tuples);
fmap->cf_tuples = NULL;
} }
} }
static int file_map_add (struct file_map *fmap, FILENUM fnum, CACHEFILE cf, BRT brt, char *fname) { static int file_map_h(OMTVALUE omtv, void *v) {
if (fmap->cf_tuples == NULL) { struct file_map_tuple *a = omtv;
fmap->max_cf_tuples = 1; FILENUM *b = v;
MALLOC_N(fmap->max_cf_tuples, fmap->cf_tuples); if (a->filenum.fileid < b->fileid) return -1;
if (fmap->cf_tuples == NULL) { if (a->filenum.fileid > b->fileid) return +1;
fmap->max_cf_tuples = 0;
return errno;
}
fmap->n_cf_tuples=1;
} else {
if (fmap->n_cf_tuples >= fmap->max_cf_tuples) {
struct cf_tuple *new_cf_tuples = toku_realloc(fmap->cf_tuples, 2*fmap->max_cf_tuples*sizeof(struct cf_tuple));
if (new_cf_tuples == NULL)
return errno;
fmap->cf_tuples = new_cf_tuples;
fmap->max_cf_tuples *= 2;
}
fmap->n_cf_tuples++;
}
struct cf_tuple *tuple = &fmap->cf_tuples[fmap->n_cf_tuples-1];
tuple->filenum = fnum;
tuple->cf = cf;
tuple->brt = brt;
tuple->fname = fname;
return 0; return 0;
} }
static int find_cachefile (struct file_map *fmap, FILENUM fnum, struct cf_tuple **cf_tuple) { static int file_map_insert (struct file_map *fmap, FILENUM fnum, BRT brt, char *fname) {
for (int i=0; i<fmap->n_cf_tuples; i++) { struct file_map_tuple *tuple = toku_malloc(sizeof (struct file_map_tuple));
if (fnum.fileid==fmap->cf_tuples[i].filenum.fileid) { assert(tuple);
*cf_tuple = &fmap->cf_tuples[i]; file_map_tuple_init(tuple, fnum, brt, fname);
return 0; int r = toku_omt_insert(fmap->filenums, tuple, file_map_h, &fnum, NULL);
return r;
}
static void file_map_remove(struct file_map *fmap, FILENUM fnum) {
OMTVALUE v; u_int32_t idx;
int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx, NULL);
if (r == 0) {
struct file_map_tuple *tuple = v;
r = toku_omt_delete_at(fmap->filenums, idx);
file_map_tuple_destroy(tuple);
toku_free(tuple);
} }
}
static int file_map_find(struct file_map *fmap, FILENUM fnum, struct file_map_tuple **file_map_tuple) {
OMTVALUE v; u_int32_t idx;
int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx, NULL);
if (r == 0) {
struct file_map_tuple *tuple = v;
assert(tuple->filenum.fileid == fnum.fileid);
*file_map_tuple = tuple;
} }
return 1; return r;
} }
// The recovery environment
struct recover_env { struct recover_env {
CACHETABLE ct; CACHETABLE ct;
TOKULOGGER logger; TOKULOGGER logger;
...@@ -137,6 +154,7 @@ static void recover_env_cleanup (RECOVER_ENV renv) { ...@@ -137,6 +154,7 @@ static void recover_env_cleanup (RECOVER_ENV renv) {
int r; int r;
file_map_close_dictionaries(&renv->fmap); file_map_close_dictionaries(&renv->fmap);
file_map_destroy(&renv->fmap);
r = toku_logger_close(&renv->logger); r = toku_logger_close(&renv->logger);
assert(r == 0); assert(r == 0);
...@@ -159,7 +177,9 @@ static void toku_recover_commit (LSN lsn, TXNID xid, RECOVER_ENV renv) { ...@@ -159,7 +177,9 @@ static void toku_recover_commit (LSN lsn, TXNID xid, RECOVER_ENV renv) {
// find the transaction by transaction id // find the transaction by transaction id
TOKUTXN txn = NULL; TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, xid, &txn); r = toku_txnid2txn(renv->logger, xid, &txn);
assert(r == 0 && txn); assert(r == 0);
if (txn == NULL)
return;
// commit the transaction // commit the transaction
r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, lsn); r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, lsn);
...@@ -180,7 +200,9 @@ static void toku_recover_xabort (LSN lsn, TXNID xid, RECOVER_ENV renv) { ...@@ -180,7 +200,9 @@ static void toku_recover_xabort (LSN lsn, TXNID xid, RECOVER_ENV renv) {
// find the transaction by transaction id // find the transaction by transaction id
TOKUTXN txn = NULL; TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, xid, &txn); r = toku_txnid2txn(renv->logger, xid, &txn);
assert(r == 0 && txn); assert(r == 0);
if (txn == NULL)
return;
// abort the transaction // abort the transaction
r = toku_txn_abort_with_lsn(txn, recover_yield, NULL, lsn); r = toku_txn_abort_with_lsn(txn, recover_yield, NULL, lsn);
...@@ -230,8 +252,8 @@ static void internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags, ...@@ -230,8 +252,8 @@ static void internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags,
int r; int r;
// already open // already open
struct cf_tuple *tuple = NULL; struct file_map_tuple *tuple = NULL;
r = find_cachefile(&renv->fmap, filenum, &tuple); r = file_map_find(&renv->fmap, filenum, &tuple);
if (r == 0) { if (r == 0) {
assert(strcmp(tuple->fname, fixedfname) == 0); assert(strcmp(tuple->fname, fixedfname) == 0);
toku_free(fixedfname); toku_free(fixedfname);
...@@ -268,7 +290,7 @@ static void internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags, ...@@ -268,7 +290,7 @@ static void internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags,
r = toku_brt_open(brt, fixedfname, fixedfname, (flags & O_CREAT) != 0, FALSE, renv->ct, NULL, NULL); r = toku_brt_open(brt, fixedfname, fixedfname, (flags & O_CREAT) != 0, FALSE, renv->ct, NULL, NULL);
assert(r == 0); assert(r == 0);
file_map_add(&renv->fmap, filenum, NULL, brt, fixedfname); file_map_insert(&renv->fmap, filenum, brt, fixedfname);
} }
static void toku_recover_fopen (LSN UU(lsn), TXNID UU(xid), BYTESTRING fname, FILENUM filenum, RECOVER_ENV renv) { static void toku_recover_fopen (LSN UU(lsn), TXNID UU(xid), BYTESTRING fname, FILENUM filenum, RECOVER_ENV renv) {
...@@ -279,12 +301,12 @@ static void toku_recover_fopen (LSN UU(lsn), TXNID UU(xid), BYTESTRING fname, FI ...@@ -279,12 +301,12 @@ static void toku_recover_fopen (LSN UU(lsn), TXNID UU(xid), BYTESTRING fname, FI
static int toku_recover_backward_fopen (struct logtype_fopen *l, RECOVER_ENV renv) { static int toku_recover_backward_fopen (struct logtype_fopen *l, RECOVER_ENV renv) {
if (renv->bs.bs == BS_SAW_CKPT_END) { if (renv->bs.bs == BS_SAW_CKPT_END) {
// close the tree // close the tree
struct cf_tuple *tuple = NULL; struct file_map_tuple *tuple = NULL;
int r = find_cachefile(&renv->fmap, l->filenum, &tuple); int r = file_map_find(&renv->fmap, l->filenum, &tuple);
if (r == 0) { if (r == 0) {
r = toku_close_brt(tuple->brt, 0, 0); r = toku_close_brt(tuple->brt, 0, 0);
assert(r == 0); assert(r == 0);
tuple->brt=0; file_map_remove(&renv->fmap, l->filenum);
} }
} }
return 0; return 0;
...@@ -303,36 +325,38 @@ static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER ...@@ -303,36 +325,38 @@ static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER
} }
static void toku_recover_enq_insert (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV renv) { static void toku_recover_enq_insert (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV renv) {
struct cf_tuple *tuple = NULL; struct file_map_tuple *tuple = NULL;
int r = find_cachefile(&renv->fmap, filenum, &tuple); int r = file_map_find(&renv->fmap, filenum, &tuple);
if (r!=0) { if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything. // if we didn't find a cachefile, then we don't have to do anything.
return; return;
} }
TOKUTXN txn = NULL; TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, xid, &txn); r = toku_txnid2txn(renv->logger, xid, &txn);
assert(r == 0 && txn); assert(r == 0);
if (txn == NULL)
return;
DBT keydbt, valdbt; DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, key.data, key.len); toku_fill_dbt(&keydbt, key.data, key.len);
toku_fill_dbt(&valdbt, val.data, val.len); toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_maybe_insert(tuple->brt, &keydbt, &valdbt, txn, lsn); r = toku_brt_maybe_insert(tuple->brt, &keydbt, &valdbt, txn, TRUE, lsn);
assert(r == 0); assert(r == 0);
} }
static int toku_recover_tablelock_on_empty_table(LSN UU(lsn), FILENUM filenum, TXNID xid, RECOVER_ENV renv) { static int toku_recover_tablelock_on_empty_table(LSN UU(lsn), FILENUM filenum, TXNID xid, RECOVER_ENV renv) {
struct cf_tuple *tuple = NULL; struct file_map_tuple *tuple = NULL;
int r = find_cachefile(&renv->fmap, filenum, &tuple); int r = file_map_find(&renv->fmap, filenum, &tuple);
if (r!=0) { if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything. // if we didn't find a cachefile, then we don't have to do anything.
return 0; return 0;
} }
TOKUTXN txn = NULL;
TOKUTXN txn;
r = toku_txnid2txn(renv->logger, xid, &txn); r = toku_txnid2txn(renv->logger, xid, &txn);
assert(r == 0 && txn); assert(r == 0);
if (txn != NULL) {
r = toku_brt_note_table_lock(tuple->brt, txn); r = toku_brt_note_table_lock(tuple->brt, txn);
assert(r == 0); assert(r == 0);
}
return 0; return 0;
} }
...@@ -342,20 +366,21 @@ static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), R ...@@ -342,20 +366,21 @@ static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), R
} }
static void toku_recover_enq_delete_both (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV renv) { static void toku_recover_enq_delete_both (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV renv) {
struct cf_tuple *tuple = NULL; struct file_map_tuple *tuple = NULL;
int r = find_cachefile(&renv->fmap, filenum, &tuple); int r = file_map_find(&renv->fmap, filenum, &tuple);
if (r!=0) { if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything. // if we didn't find a cachefile, then we don't have to do anything.
return; return;
} }
TOKUTXN txn = NULL; TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, xid, &txn); r = toku_txnid2txn(renv->logger, xid, &txn);
assert(r == 0 && txn); assert(r == 0);
if (txn == NULL)
return;
DBT keydbt, valdbt; DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, key.data, key.len); toku_fill_dbt(&keydbt, key.data, key.len);
toku_fill_dbt(&valdbt, val.data, val.len); toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_maybe_delete_both(tuple->brt, &keydbt, &valdbt, txn, lsn); r = toku_brt_maybe_delete_both(tuple->brt, &keydbt, &valdbt, txn, TRUE, lsn);
assert(r == 0); assert(r == 0);
} }
...@@ -365,19 +390,20 @@ static int toku_recover_backward_enq_delete_both (struct logtype_enq_delete_both ...@@ -365,19 +390,20 @@ static int toku_recover_backward_enq_delete_both (struct logtype_enq_delete_both
} }
static void toku_recover_enq_delete_any (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, RECOVER_ENV renv) { static void toku_recover_enq_delete_any (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, RECOVER_ENV renv) {
struct cf_tuple *tuple = NULL; struct file_map_tuple *tuple = NULL;
int r = find_cachefile(&renv->fmap, filenum, &tuple); int r = file_map_find(&renv->fmap, filenum, &tuple);
if (r!=0) { if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything. // if we didn't find a cachefile, then we don't have to do anything.
return; return;
} }
TOKUTXN txn = NULL; TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, xid, &txn); r = toku_txnid2txn(renv->logger, xid, &txn);
assert(r == 0 && txn); assert(r == 0);
if (txn == NULL)
return;
DBT keydbt; DBT keydbt;
toku_fill_dbt(&keydbt, key.data, key.len); toku_fill_dbt(&keydbt, key.data, key.len);
r = toku_brt_maybe_delete(tuple->brt, &keydbt, txn, lsn); r = toku_brt_maybe_delete(tuple->brt, &keydbt, txn, TRUE,lsn);
assert(r == 0); assert(r == 0);
} }
...@@ -387,15 +413,15 @@ static int toku_recover_backward_enq_delete_any (struct logtype_enq_delete_any * ...@@ -387,15 +413,15 @@ static int toku_recover_backward_enq_delete_any (struct logtype_enq_delete_any *
} }
static void toku_recover_fclose (LSN UU(lsn), BYTESTRING fname, FILENUM filenum, RECOVER_ENV UU(renv)) { static void toku_recover_fclose (LSN UU(lsn), BYTESTRING fname, FILENUM filenum, RECOVER_ENV UU(renv)) {
struct cf_tuple *tuple = NULL; struct file_map_tuple *tuple = NULL;
int r = find_cachefile(&renv->fmap, filenum, &tuple); int r = file_map_find(&renv->fmap, filenum, &tuple);
if (r == 0) { if (r == 0) {
char *fixedfname = fixup_fname(&fname); char *fixedfname = fixup_fname(&fname);
assert(strcmp(tuple->fname, fixedfname) == 0); assert(strcmp(tuple->fname, fixedfname) == 0);
toku_free(fixedfname); toku_free(fixedfname);
r = toku_close_brt(tuple->brt, 0, 0); r = toku_close_brt(tuple->brt, 0, 0);
assert(r == 0); assert(r == 0);
tuple->brt=0; file_map_remove(&renv->fmap, filenum);
} }
} }
...@@ -627,37 +653,26 @@ int tokudb_needs_recovery(const char *log_dir, BOOL ignore_log_empty) { ...@@ -627,37 +653,26 @@ int tokudb_needs_recovery(const char *log_dir, BOOL ignore_log_empty) {
return needs_recovery; return needs_recovery;
} }
// Sort the transactions in reverse order of transaction id // abort all of the remaining live transactions in descending transaction id order
static int compare_txn(const void *a, const void *b) { static void recover_abort_live_txns(RECOVER_ENV renv) {
TOKUTXN atxn = (TOKUTXN) * (void **) a; int r;
TOKUTXN btxn = (TOKUTXN) * (void **) b; while (1) {
// TODO this is wrong. we want if (older(atxn, btxn)) return -1 u_int32_t n_live_txns = toku_omt_size(renv->logger->live_txns);
if (toku_txnid_eq(atxn->txnid64, btxn->txnid64)) if (n_live_txns == 0)
return 0; break;
if (toku_txnid_older(atxn->txnid64, btxn->txnid64)) OMTVALUE v;
return +1; r = toku_omt_fetch(renv->logger->live_txns, n_live_txns-1, &v, NULL);
else if (r != 0)
return -1; break;
}
// Append a transaction to the set of live transactions
static int append_live_txn(OMTVALUE v, u_int32_t UU(i), void *extra) {
TOKUTXN txn = (TOKUTXN) v;
struct varray *live_txns = (struct varray *) extra;
varray_append(live_txns, txn);
return 0;
}
// Abort a live transaction
static void abort_live_txn(void *v, void *UU(extra)) {
TOKUTXN txn = (TOKUTXN) v; TOKUTXN txn = (TOKUTXN) v;
// abort the transaction // abort the transaction
int r = toku_txn_abort_txn(txn, recover_yield, NULL); r = toku_txn_abort_txn(txn, recover_yield, NULL);
assert(r == 0); assert(r == 0);
// close the transaction // close the transaction
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
}
} }
static int do_recovery(RECOVER_ENV renv, const char *data_dir, const char *log_dir) { static int do_recovery(RECOVER_ENV renv, const char *data_dir, const char *log_dir) {
...@@ -676,6 +691,7 @@ static int do_recovery(RECOVER_ENV renv, const char *data_dir, const char *log_d ...@@ -676,6 +691,7 @@ static int do_recovery(RECOVER_ENV renv, const char *data_dir, const char *log_d
r = toku_logger_open(log_dir, renv->logger); r = toku_logger_open(log_dir, renv->logger);
assert(r == 0); assert(r == 0);
// grab the last LSN so that it can be restored when the log is restarted
LSN lastlsn = toku_logger_last_lsn(renv->logger); LSN lastlsn = toku_logger_last_lsn(renv->logger);
// there must be at least one log entry // there must be at least one log entry
...@@ -743,14 +759,8 @@ static int do_recovery(RECOVER_ENV renv, const char *data_dir, const char *log_d ...@@ -743,14 +759,8 @@ static int do_recovery(RECOVER_ENV renv, const char *data_dir, const char *log_d
// restart logging // restart logging
toku_logger_restart(renv->logger, lastlsn); toku_logger_restart(renv->logger, lastlsn);
// abort all of the remaining live transactions in reverse transaction id order // abort the live transactions
struct varray *live_txns = NULL; recover_abort_live_txns(renv);
r = varray_create(&live_txns, 1);
assert(r == 0);
toku_omt_iterate(renv->logger->live_txns, append_live_txn, live_txns);
varray_sort(live_txns, compare_txn);
varray_iterate(live_txns, abort_live_txn, NULL);
varray_destroy(&live_txns);
// close the open dictionaries // close the open dictionaries
file_map_close_dictionaries(&renv->fmap); file_map_close_dictionaries(&renv->fmap);
......
...@@ -169,7 +169,7 @@ else ...@@ -169,7 +169,7 @@ else
lines=`cat $<.check.valgrind | wc -l`; \ lines=`cat $<.check.valgrind | wc -l`; \
if [ $$lines -ne 0 ] ; then cat $<.check.valgrind; test 0 = 1; fi \ if [ $$lines -ne 0 ] ; then cat $<.check.valgrind; test 0 = 1; fi \
else \ else \
test 0 = 1; \ cat $<.check.valgrind; test 0 = 1; \
fi \ fi \
$(SUMMARIZE_CMD) $(SUMMARIZE_CMD)
endif endif
......
...@@ -316,14 +316,14 @@ ifeq ($(VGRIND),) ...@@ -316,14 +316,14 @@ ifeq ($(VGRIND),)
else else
./$< --test >$<.check.output 2>&1; \ ./$< --test >$<.check.output 2>&1; \
if [ $$? -ne 134 ] ; then \ if [ $$? -ne 134 ] ; then \
test 0 = 1; \ cat $<.check.output; test 0 = 1; \
else \ else \
valgrind --quiet --error-exitcode=1 --leak-check=full --log-file=$<.check.valgrind ./$< --recover >>$<.check.output 2>&1; \ valgrind --quiet --error-exitcode=1 --leak-check=full --log-file=$<.check.valgrind ./$< --recover >>$<.check.output 2>&1; \
if [ $$? -ne 0 ] ; then \ if [ $$? -ne 0 ] ; then \
test 0 = 1; \ cat $<.check.valgrind; test 0 = 1; \
else \ else \
lines=`cat $<.check.valgrind | wc -l`; \ lines=`cat $<.check.valgrind | wc -l`; \
if [ $$lines -ne 0 ] ; then test 0 = 1; fi; \ if [ $$lines -ne 0 ] ; then cat $<.check.valgrind; test 0 = 1; fi; \
fi \ fi \
fi \ fi \
$(MAYBEINVERTER) $(SUMMARIZE_CMD) $(MAYBEINVERTER) $(SUMMARIZE_CMD)
......
...@@ -70,7 +70,7 @@ test_db_thread (void) { ...@@ -70,7 +70,7 @@ test_db_thread (void) {
DB_ENV *env; DB_ENV *env;
r = db_env_create(&env, 0); assert(r == 0); r = db_env_create(&env, 0); assert(r == 0);
r = env->open(env, ".", DB_CREATE+DB_PRIVATE+DB_INIT_MPOOL, 0); assert(r == 0); r = env->open(env, ".", DB_CREATE+DB_PRIVATE+DB_INIT_MPOOL+DB_THREAD, 0); assert(r == 0);
DB *db; DB *db;
r = db_create(&db, env, 0); assert(r == 0); r = db_create(&db, env, 0); assert(r == 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment