Commit 7353497d authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

merge recovery key compare function to main. close[t:1966]

git-svn-id: file:///svn/toku/tokudb@14218 c7de825b-a66e-492c-adef-691d508d4ae1
parent d73e7c40
......@@ -41,8 +41,10 @@ int toku_brt_set_descriptor (BRT t, u_int32_t version, const DBT* descriptor, to
int toku_brt_get_flags(BRT, unsigned int *flags);
int toku_brt_set_nodesize(BRT, unsigned int nodesize);
int toku_brt_get_nodesize(BRT, unsigned int *nodesize);
int toku_brt_set_bt_compare(BRT, int (*bt_compare)(DB *, const DBT*, const DBT*));
int toku_brt_set_dup_compare(BRT, int (*dup_compare)(DB *, const DBT*, const DBT*));
int toku_brt_set_bt_compare(BRT, brt_compare_func);
int toku_brt_set_dup_compare(BRT, brt_compare_func);
int brt_set_cachetable(BRT, CACHETABLE);
int toku_brt_open(BRT, const char *fname, const char *fname_in_env, int is_create, int only_create, CACHETABLE ct, TOKUTXN txn, DB *db);
......
......@@ -115,7 +115,7 @@ typedef enum brt_cmd_type brt_msg_type;
// This structure supports nested transactions, and obsoletes brt_cmd.
typedef struct brt_cmd BRT_MSG_S, *BRT_MSG;
typedef int (*brt_compare_func)(DB *, const DBT *, const DBT *);
#define UU(x) x __attribute__((__unused__))
......
......@@ -265,12 +265,12 @@ generate_dispatch (void) {
});
fprintf(hf, " default: assert(0);} } while (0)\n");
fprintf(hf, "#define logtype_dispatch_args(s, funprefix) do { switch((s)->cmd) {\\\n");
fprintf(hf, "#define logtype_dispatch_args(s, funprefix, ...) do { switch((s)->cmd) {\\\n");
DO_LOGTYPES(lt,
{
fprintf(hf, " case LT_%s: funprefix ## %s ((s)->u.%s.lsn", lt->name, lt->name, lt->name);
DO_FIELDS(ft, lt, fprintf(hf, ",(s)->u.%s.%s", lt->name, ft->name));
fprintf(hf, "); break;\\\n");
fprintf(hf, ", __VA_ARGS__); break;\\\n");
});
fprintf(hf, " }} while (0)\n");
}
......
......@@ -16,58 +16,88 @@ static int toku_recover_trace = 0;
#define VERIFY_COUNTS(n) ((void)0)
#endif
// These data structures really should be part of a recovery data structure. Recovery could be multithreaded (on different environments...) But this is OK since recovery can only happen in one
static CACHETABLE recover_ct;
static TOKULOGGER recover_logger;
struct backward_scan_state {
enum backward_state { BS_INIT, BS_SAW_CKPT_END, BS_SAW_CKPT } bs;
LSN checkpoint_lsn;
int n_live_txns;
TXNID min_live_txn;
};
static void backward_scan_state_init(struct backward_scan_state *bs) {
bs->bs = BS_INIT; bs->checkpoint_lsn = ZERO_LSN; bs->n_live_txns = 0; bs->min_live_txn = 0;
}
// Map filenum to brt
// TODO why can't we use the cachetable to find by filenum?
static struct cf_pair {
FILENUM filenum;
CACHEFILE cf;
BRT brt; // set to zero on an fopen, but filled in when an fheader is seen.
} *cf_pairs;
static int n_cf_pairs=0, max_cf_pairs=0;
struct filenum_map {
struct cf_pair {
FILENUM filenum;
CACHEFILE cf;
BRT brt; // set to zero on an fopen, but filled in when an fheader is seen.
} *cf_pairs;
int n_cf_pairs, max_cf_pairs;
};
static void filenum_map_init(struct filenum_map *fmap) {
fmap->cf_pairs = NULL;
fmap->n_cf_pairs = fmap->max_cf_pairs = 0;
}
struct recover_env {
CACHETABLE ct;
TOKULOGGER logger;
brt_compare_func bt_compare;
brt_compare_func dup_compare;
struct backward_scan_state bs;
struct filenum_map fmap;
};
typedef struct recover_env *RECOVER_ENV;
int toku_recover_init (void) {
int recover_env_init (RECOVER_ENV env, brt_compare_func bt_compare, brt_compare_func dup_compare) {
int r;
r = toku_create_cachetable(&recover_ct, 1<<25, (LSN){0}, 0);
r = toku_create_cachetable(&env->ct, 1<<25, (LSN){0}, 0);
assert(r == 0);
r = toku_logger_create(&recover_logger);
r = toku_logger_create(&env->logger);
assert(r == 0);
toku_logger_write_log_files(recover_logger, FALSE);
toku_logger_set_cachetable(recover_logger, recover_ct);
toku_logger_write_log_files(env->logger, FALSE);
toku_logger_set_cachetable(env->logger, env->ct);
env->bt_compare = bt_compare;
env->dup_compare = dup_compare;
filenum_map_init(&env->fmap);
if (toku_recover_trace)
printf("%s:%d\n", __FUNCTION__, __LINE__);
return r;
}
static void toku_recover_close_dictionaries(void) {
static void recover_close_dictionaries(struct filenum_map *fmap) {
int r;
int i;
for (i=0; i<n_cf_pairs; i++) {
if (cf_pairs[i].brt) {
r = toku_close_brt(cf_pairs[i].brt, 0, 0);
for (i=0; i<fmap->n_cf_pairs; i++) {
if (fmap->cf_pairs[i].brt) {
r = toku_close_brt(fmap->cf_pairs[i].brt, 0, 0);
//r = toku_cachefile_close(&cf_pairs[i].cf);
assert(r == 0);
}
}
n_cf_pairs = 0;
toku_free(cf_pairs);
cf_pairs = NULL;
fmap->n_cf_pairs = 0;
if (fmap->cf_pairs) {
toku_free(fmap->cf_pairs);
fmap->cf_pairs = NULL;
}
}
void toku_recover_cleanup (void) {
void recover_env_cleanup (RECOVER_ENV env) {
int r;
if (cf_pairs)
toku_recover_close_dictionaries();
recover_close_dictionaries(&env->fmap);
r = toku_logger_close(&recover_logger);
r = toku_logger_close(&env->logger);
assert(r == 0);
r = toku_cachetable_close(&recover_ct);
r = toku_cachetable_close(&env->ct);
assert(r == 0);
if (toku_recover_trace)
......@@ -79,22 +109,12 @@ static void recover_yield(voidfp UU(f), void *UU(extra)) {
// nothing
}
enum backward_state { BS_INIT, BS_SAW_CKPT_END, BS_SAW_CKPT };
struct backward_scan_state {
enum backward_state bs;
LSN checkpoint_lsn;
int n_live_txns;
TXNID min_live_txn;
};
static struct backward_scan_state initial_bss = {BS_INIT,{0},0,0};
static void toku_recover_commit (LSN UU(lsn), TXNID xid) {
static void toku_recover_commit (LSN UU(lsn), TXNID xid, RECOVER_ENV env) {
int r;
// find the transaction by transaction id
TOKUTXN txn;
r = toku_txnid2txn(recover_logger, xid, &txn);
r = toku_txnid2txn(env->logger, xid, &txn);
assert(r == 0);
// commit the transaction
......@@ -105,17 +125,17 @@ static void toku_recover_commit (LSN UU(lsn), TXNID xid) {
toku_txn_close_txn(txn);
}
static int toku_recover_backward_commit (struct logtype_commit *UU(l), struct backward_scan_state *UU(bs)) {
static int toku_recover_backward_commit (struct logtype_commit *UU(l), RECOVER_ENV UU(env)) {
// nothing
return 0;
}
static void toku_recover_xabort (LSN UU(lsn), TXNID xid) {
static void toku_recover_xabort (LSN UU(lsn), TXNID xid, RECOVER_ENV env) {
int r;
// find the transaction by transaction id
TOKUTXN txn;
r = toku_txnid2txn(recover_logger, xid, &txn);
r = toku_txnid2txn(env->logger, xid, &txn);
assert(r == 0);
// abort the transaction
......@@ -126,7 +146,7 @@ static void toku_recover_xabort (LSN UU(lsn), TXNID xid) {
toku_txn_close_txn(txn);
}
static int toku_recover_backward_xabort (struct logtype_xabort *UU(l), struct backward_scan_state *UU(bs)) {
static int toku_recover_backward_xabort (struct logtype_xabort *UU(l), RECOVER_ENV UU(env)) {
// nothing
return 0;
}
......@@ -157,31 +177,31 @@ static void create_dir_from_file (const char *fname) {
toku_free(tmp);
}
static int toku_recover_note_cachefile (FILENUM fnum, CACHEFILE cf, BRT brt) {
if (max_cf_pairs==0) {
n_cf_pairs=1;
max_cf_pairs=2;
MALLOC_N(max_cf_pairs, cf_pairs);
if (cf_pairs==0) return errno;
static int toku_recover_note_cachefile (struct filenum_map *fmap, FILENUM fnum, CACHEFILE cf, BRT brt) {
if (fmap->max_cf_pairs==0) {
fmap->n_cf_pairs=1;
fmap->max_cf_pairs=2;
MALLOC_N(fmap->max_cf_pairs, fmap->cf_pairs);
if (fmap->cf_pairs==0) return errno;
} else {
if (n_cf_pairs>=max_cf_pairs) {
cf_pairs = toku_realloc(cf_pairs, 2*max_cf_pairs*sizeof(*cf_pairs));
assert(cf_pairs);
max_cf_pairs*=2;
if (fmap->n_cf_pairs >= fmap->max_cf_pairs) {
fmap->cf_pairs = toku_realloc(fmap->cf_pairs, 2*fmap->max_cf_pairs*sizeof(struct cf_pair));
assert(fmap->cf_pairs);
fmap->max_cf_pairs*=2;
}
n_cf_pairs++;
fmap->n_cf_pairs++;
}
cf_pairs[n_cf_pairs-1].filenum = fnum;
cf_pairs[n_cf_pairs-1].cf = cf;
cf_pairs[n_cf_pairs-1].brt = brt;
fmap->cf_pairs[fmap->n_cf_pairs-1].filenum = fnum;
fmap->cf_pairs[fmap->n_cf_pairs-1].cf = cf;
fmap->cf_pairs[fmap->n_cf_pairs-1].brt = brt;
return 0;
}
static int find_cachefile (FILENUM fnum, struct cf_pair **cf_pair) {
static int find_cachefile (struct filenum_map *fmap, FILENUM fnum, struct cf_pair **cf_pair) {
int i;
for (i=0; i<n_cf_pairs; i++) {
if (fnum.fileid==cf_pairs[i].filenum.fileid) {
*cf_pair = cf_pairs+i;
for (i=0; i<fmap->n_cf_pairs; i++) {
if (fnum.fileid==fmap->cf_pairs[i].filenum.fileid) {
*cf_pair = fmap->cf_pairs+i;
return 0;
}
}
......@@ -189,10 +209,10 @@ static int find_cachefile (FILENUM fnum, struct cf_pair **cf_pair) {
}
// Open the file if it is not already open. If it is already open, then do nothing.
static void internal_toku_recover_fopen_or_fcreate (int flags, int mode, char *fixedfname, FILENUM filenum, u_int32_t treeflags) {
static void internal_toku_recover_fopen_or_fcreate (RECOVER_ENV env, int flags, int mode, char *fixedfname, FILENUM filenum, u_int32_t treeflags) {
{
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
int r = find_cachefile(&env->fmap, filenum, &pair);
if (0==r) {
toku_free(fixedfname);
return;
......@@ -208,59 +228,65 @@ static void internal_toku_recover_fopen_or_fcreate (int flags, int mode, char *f
__FILE__, __LINE__, fixedfname, wd, errno, strerror(errno));
}
assert(fd>=0);
BRT brt=0;
int r = toku_brt_create(&brt);
assert(r == 0);
// create tree with treeflags, otherwise use the treeflags from the tree
if (flags & O_CREAT)
toku_brt_set_flags(brt, treeflags);
// set the key compare functions
if (env->bt_compare)
toku_brt_set_bt_compare(brt, env->bt_compare);
if (env->dup_compare)
toku_brt_set_dup_compare(brt, env->dup_compare);
brt->fname = fixedfname;
brt->h=0;
// we'll need to set these to the right comparison function, or do without them.
// TODO get default compare functions from the environment
brt->compare_fun = toku_default_compare_fun;
brt->dup_compare = toku_default_compare_fun;
brt->db = 0;
r = toku_cachetable_openfd(&cf, recover_ct, fd, fixedfname);
r = toku_cachetable_openfd(&cf, env->ct, fd, fixedfname);
assert(r == 0);
brt->cf=cf;
r = toku_read_brt_header_and_store_in_cachefile(brt->cf, &brt->h);
if (r==TOKUDB_DICTIONARY_NO_HEADER) {
r = toku_brt_alloc_init_header(brt);
}
toku_recover_note_cachefile(filenum, cf, brt);
toku_recover_note_cachefile(&env->fmap, filenum, cf, brt);
}
static void toku_recover_fopen (LSN UU(lsn), TXNID UU(xid), BYTESTRING fname, FILENUM filenum) {
static void toku_recover_fopen (LSN UU(lsn), TXNID UU(xid), BYTESTRING fname, FILENUM filenum, RECOVER_ENV env) {
char *fixedfname = fixup_fname(&fname);
internal_toku_recover_fopen_or_fcreate(0, 0, fixedfname, filenum, 0);
internal_toku_recover_fopen_or_fcreate(env, 0, 0, fixedfname, filenum, 0);
}
static int toku_recover_backward_fopen (struct logtype_fopen *UU(l), struct backward_scan_state *UU(bs)) {
static int toku_recover_backward_fopen (struct logtype_fopen *UU(l), RECOVER_ENV UU(env)) {
return 0;
}
// fcreate is like fopen except that the file must be created. Also creates the dir if needed.
static void toku_recover_fcreate (LSN UU(lsn), TXNID UU(xid), FILENUM filenum, BYTESTRING fname, u_int32_t mode, u_int32_t treeflags) {
static void toku_recover_fcreate (LSN UU(lsn), TXNID UU(xid), FILENUM filenum, BYTESTRING fname, u_int32_t mode, u_int32_t treeflags, RECOVER_ENV env) {
char *fixedfname = fixup_fname(&fname);
create_dir_from_file(fixedfname);
internal_toku_recover_fopen_or_fcreate(O_CREAT|O_TRUNC, mode, fixedfname, filenum, treeflags);
internal_toku_recover_fopen_or_fcreate(env, O_CREAT|O_TRUNC, mode, fixedfname, filenum, treeflags);
}
static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), struct backward_scan_state *UU(bs)) {
static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER_ENV UU(env)) {
// nothing
return 0;
}
static void toku_recover_enq_insert (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val) {
static void toku_recover_enq_insert (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV env) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
int r = find_cachefile(&env->fmap, filenum, &pair);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
// TODO compare file LSN with this XID
TOKUTXN txn;
r = toku_txnid2txn(recover_logger, xid, &txn);
r = toku_txnid2txn(env->logger, xid, &txn);
assert(r == 0);
DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, key.data, key.len);
......@@ -269,21 +295,21 @@ static void toku_recover_enq_insert (LSN lsn __attribute__((__unused__)), FILENU
assert(r == 0);
}
static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), struct backward_scan_state *UU(bs)) {
static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), RECOVER_ENV UU(env)) {
// nothing
return 0;
}
static void toku_recover_enq_delete_both (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val) {
static void toku_recover_enq_delete_both (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV env) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
int r = find_cachefile(&env->fmap, filenum, &pair);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
// TODO compare file LSN with this XID
TOKUTXN txn;
r = toku_txnid2txn(recover_logger, xid, &txn);
r = toku_txnid2txn(env->logger, xid, &txn);
assert(r == 0);
DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, key.data, key.len);
......@@ -292,21 +318,21 @@ static void toku_recover_enq_delete_both (LSN lsn __attribute__((__unused__)), F
assert(r == 0);
}
static int toku_recover_backward_enq_delete_both (struct logtype_enq_delete_both *UU(l), struct backward_scan_state *UU(bs)) {
static int toku_recover_backward_enq_delete_both (struct logtype_enq_delete_both *UU(l), RECOVER_ENV UU(env)) {
// nothing
return 0;
}
static void toku_recover_enq_delete_any (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING UU(val)) {
static void toku_recover_enq_delete_any (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING UU(val), RECOVER_ENV env) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
int r = find_cachefile(&env->fmap, filenum, &pair);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
// TODO compare file LSN with this XID
TOKUTXN txn;
r = toku_txnid2txn(recover_logger, xid, &txn);
r = toku_txnid2txn(env->logger, xid, &txn);
assert(r == 0);
DBT keydbt;
toku_fill_dbt(&keydbt, key.data, key.len);
......@@ -314,30 +340,31 @@ static void toku_recover_enq_delete_any (LSN lsn __attribute__((__unused__)), FI
assert(r == 0);
}
static int toku_recover_backward_enq_delete_any (struct logtype_enq_delete_any *UU(l), struct backward_scan_state *UU(bs)) {
static int toku_recover_backward_enq_delete_any (struct logtype_enq_delete_any *UU(l), RECOVER_ENV UU(env)) {
// nothing
return 0;
}
static void toku_recover_fclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum) {
static void toku_recover_fclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum, RECOVER_ENV UU(env)) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
int r = find_cachefile(&env->fmap, filenum, &pair);
assert(r == 0);
r = toku_close_brt(pair->brt, 0, 0);
assert(r == 0);
pair->brt=0;
}
static int toku_recover_backward_fclose (struct logtype_fclose *UU(l), struct backward_scan_state *UU(bs)) {
static int toku_recover_backward_fclose (struct logtype_fclose *UU(l), RECOVER_ENV UU(env)) {
return 0;
}
static int toku_recover_begin_checkpoint (LSN UU(lsn), u_int64_t UU(timestamp)) {
static int toku_recover_begin_checkpoint (LSN UU(lsn), u_int64_t UU(timestamp), RECOVER_ENV UU(env)) {
// nothing
return 0;
}
static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoint *l, struct backward_scan_state *bs) {
static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV env) {
struct backward_scan_state *bs = &env->bs;
switch (bs->bs) {
case BS_INIT:
return 0; // incomplete checkpoint
......@@ -356,12 +383,13 @@ static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoi
abort();
}
static int toku_recover_end_checkpoint (LSN UU(lsn), TXNID UU(xid), u_int64_t UU(timestamp)) {
static int toku_recover_end_checkpoint (LSN UU(lsn), TXNID UU(xid), u_int64_t UU(timestamp), RECOVER_ENV UU(env)) {
// nothing
return 0;
}
static int toku_recover_backward_end_checkpoint (struct logtype_end_checkpoint *l, struct backward_scan_state *bs) {
static int toku_recover_backward_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV env) {
struct backward_scan_state *bs = &env->bs;
switch (bs->bs) {
case BS_INIT:
bs->bs = BS_SAW_CKPT_END;
......@@ -377,23 +405,24 @@ static int toku_recover_backward_end_checkpoint (struct logtype_end_checkpoint *
abort();
}
static int toku_recover_fassociate (LSN UU(lsn), FILENUM UU(filenum), BYTESTRING UU(fname)) {
static int toku_recover_fassociate (LSN UU(lsn), FILENUM UU(filenum), BYTESTRING UU(fname), RECOVER_ENV UU(env)) {
// nothing
return 0;
}
static int toku_recover_backward_fassociate (struct logtype_fassociate *l, struct backward_scan_state *UU(bs)) {
static int toku_recover_backward_fassociate (struct logtype_fassociate *l, RECOVER_ENV env) {
char *fixedfname = fixup_fname(&l->fname);
internal_toku_recover_fopen_or_fcreate(0, 0, fixedfname, l->filenum, 0);
internal_toku_recover_fopen_or_fcreate(env, 0, 0, fixedfname, l->filenum, 0);
return 0;
}
static int toku_recover_xstillopen (LSN UU(lsn), TXNID UU(xid), TXNID UU(parent)) {
static int toku_recover_xstillopen (LSN UU(lsn), TXNID UU(xid), TXNID UU(parent), RECOVER_ENV UU(env)) {
// nothing
return 0;
}
static int toku_recover_backward_xstillopen (struct logtype_xstillopen *l, struct backward_scan_state *bs) {
static int toku_recover_backward_xstillopen (struct logtype_xstillopen *l, RECOVER_ENV env) {
struct backward_scan_state *bs = &env->bs;
switch (bs->bs) {
case BS_INIT:
return 0; // ignore live txns from incomplete checkpoint
......@@ -412,22 +441,23 @@ static int toku_recover_backward_xstillopen (struct logtype_xstillopen *l, struc
abort();
}
static int toku_recover_xbegin (LSN lsn, TXNID parent_xid) {
static int toku_recover_xbegin (LSN lsn, TXNID parent_xid, RECOVER_ENV env) {
int r;
// lookup the parent
TOKUTXN parent = NULL;
r = toku_txnid2txn(recover_logger, parent_xid, &parent);
r = toku_txnid2txn(env->logger, parent_xid, &parent);
assert(r == 0);
// create a transaction and bind it to the transaction id
TOKUTXN txn = NULL;
r = toku_txn_begin_with_xid(parent, &txn, recover_logger, lsn.lsn);
r = toku_txn_begin_with_xid(parent, &txn, env->logger, lsn.lsn);
assert(r == 0);
return 0;
}
static int toku_recover_backward_xbegin (struct logtype_xbegin *l, struct backward_scan_state *bs) {
static int toku_recover_backward_xbegin (struct logtype_xbegin *l, RECOVER_ENV env) {
struct backward_scan_state *bs = &env->bs;
switch (bs->bs) {
case BS_INIT:
return 0; // ignore txns that began after checkpoint
......@@ -448,22 +478,22 @@ static int toku_recover_backward_xbegin (struct logtype_xbegin *l, struct backwa
abort();
}
static int toku_recover_timestamp (LSN UU(lsn), u_int64_t UU(timestamp), BYTESTRING UU(comment)) {
static int toku_recover_timestamp (LSN UU(lsn), u_int64_t UU(timestamp), BYTESTRING UU(comment), RECOVER_ENV UU(env)) {
// nothing
return 0;
}
static int toku_recover_backward_timestamp (struct logtype_timestamp *UU(l), struct backward_scan_state *UU(bs)) {
static int toku_recover_backward_timestamp (struct logtype_timestamp *UU(l), RECOVER_ENV UU(env)) {
// nothing
return 0;
}
static int toku_recover_shutdown (LSN UU(lsn), u_int64_t UU(timestamp)) {
static int toku_recover_shutdown (LSN UU(lsn), u_int64_t UU(timestamp), RECOVER_ENV UU(env)) {
// nothing
return 0;
}
static int toku_recover_backward_shutdown (struct logtype_shutdown *UU(l), struct backward_scan_state *UU(bs)) {
static int toku_recover_backward_shutdown (struct logtype_shutdown *UU(l), RECOVER_ENV UU(env)) {
// nothing
return 0;
}
......@@ -518,7 +548,7 @@ int tokudb_needs_recovery(const char *log_dir) {
if (r != 0) {
needs_recovery = TRUE; goto exit;
}
if (le->cmd == LT_shutdown) {
if (le->cmd == LT_shutdown || le->cmd == LT_timestamp) {
r = toku_logcursor_prev(logcursor, &le);
if (r != 0) {
needs_recovery = TRUE; goto exit;
......@@ -577,7 +607,7 @@ static void abort_live_txn(void *v, void *UU(extra)) {
toku_txn_close_txn(txn);
}
static int really_do_recovery(const char *data_dir, const char *log_dir) {
static int do_recovery(RECOVER_ENV env, const char *data_dir, const char *log_dir) {
int r;
char org_wd[1000];
......@@ -587,12 +617,10 @@ static int really_do_recovery(const char *data_dir, const char *log_dir) {
//printf("%s:%d org_wd=\"%s\"\n", __FILE__, __LINE__, org_wd);
}
toku_recover_init();
r = toku_logger_open(log_dir, recover_logger);
r = toku_logger_open(log_dir, env->logger);
assert(r == 0);
LSN lastlsn = toku_logger_last_lsn(recover_logger);
LSN lastlsn = toku_logger_last_lsn(env->logger);
TOKULOGCURSOR logcursor;
r = toku_logcursor_create(&logcursor, log_dir);
......@@ -604,7 +632,7 @@ static int really_do_recovery(const char *data_dir, const char *log_dir) {
struct log_entry *le;
// scan backwards
struct backward_scan_state bs = initial_bss;
backward_scan_state_init(&env->bs);
while (1) {
le = NULL;
r = toku_logcursor_prev(logcursor, &le);
......@@ -612,11 +640,11 @@ static int really_do_recovery(const char *data_dir, const char *log_dir) {
printf("%s:%d r=%d cmd=%c\n", __FUNCTION__, __LINE__, r, le ? le->cmd : '?');
if (r != 0)
break;
logtype_dispatch_assign(le, toku_recover_backward_, r, &bs);
logtype_dispatch_assign(le, toku_recover_backward_, r, env);
if (r != 0) {
if (toku_recover_trace)
printf("%s:%d r=%d cmd=%c\n", __FUNCTION__, __LINE__, r, le ? le->cmd : '?');
logtype_dispatch_args(le, toku_recover_);
logtype_dispatch_args(le, toku_recover_, env);
break;
}
}
......@@ -629,49 +657,47 @@ static int really_do_recovery(const char *data_dir, const char *log_dir) {
printf("%s:%d r=%d cmd=%c\n", __FUNCTION__, __LINE__, r, le ? le->cmd : '?');
if (r != 0)
break;
logtype_dispatch_args(le, toku_recover_);
logtype_dispatch_args(le, toku_recover_, env);
}
r = toku_logcursor_destroy(&logcursor);
assert(r == 0);
// restart logging
toku_logger_restart(recover_logger, lastlsn);
toku_logger_restart(env->logger, lastlsn);
// abort all of the remaining live transactions in reverse transaction id order
struct varray *live_txns = NULL;
r = varray_create(&live_txns, 1);
assert(r == 0);
toku_omt_iterate(recover_logger->live_txns, append_live_txn, live_txns);
toku_omt_iterate(env->logger->live_txns, append_live_txn, live_txns);
varray_sort(live_txns, compare_txn);
varray_iterate(live_txns, abort_live_txn, NULL);
varray_destroy(&live_txns);
// close the open dictionaries
toku_recover_close_dictionaries();
recover_close_dictionaries(&env->fmap);
// write a recovery log entry
BYTESTRING recover_comment = { strlen("recover"), "recover" };
r = toku_log_timestamp(recover_logger, NULL, TRUE, 0, recover_comment);
r = toku_log_timestamp(env->logger, NULL, TRUE, 0, recover_comment);
assert(r == 0);
// checkpoint
// TODO: checkpoint locks needed here?
r = toku_cachetable_begin_checkpoint(recover_ct, recover_logger);
r = toku_cachetable_begin_checkpoint(env->ct, env->logger);
assert(r == 0);
// TODO: what about the error_string?
r = toku_cachetable_end_checkpoint(recover_ct, recover_logger, NULL);
r = toku_cachetable_end_checkpoint(env->ct, env->logger, NULL);
assert(r == 0);
toku_recover_cleanup();
r = chdir(org_wd);
assert(r == 0);
return 0;
}
int tokudb_recover(const char *data_dir, const char *log_dir) {
int tokudb_recover(const char *data_dir, const char *log_dir, brt_compare_func bt_compare, brt_compare_func dup_compare) {
int r;
int lockfd;
......@@ -695,8 +721,15 @@ int tokudb_recover(const char *data_dir, const char *log_dir) {
}
int rr = 0;
if (tokudb_needs_recovery(log_dir))
rr = really_do_recovery(data_dir, log_dir);
if (tokudb_needs_recovery(log_dir)) {
struct recover_env renv;
r = recover_env_init(&renv, bt_compare, dup_compare);
assert(r == 0);
rr = do_recovery(&renv, data_dir, log_dir);
recover_env_cleanup(&renv);
}
r = toku_os_unlock_file(lockfd);
if (r != 0) return errno;
......
......@@ -14,11 +14,9 @@
#include "bread.h"
#include "x1764.h"
int toku_recover_init(void);
void toku_recover_cleanup(void);
int tokudb_recover(const char *datadir, const char *logdir);
// Run tokudb recovery from the log
// Returns: 0 if successfull
int tokudb_recover(const char *datadir, const char *logdir, brt_compare_func bt_compare, brt_compare_func dup_compare);
// Effect: Check the tokudb logs to determine whether or not we need to run recovery.
// If the log is empty or if there is a clean shutdown at the end of the log, then we
......
......@@ -35,7 +35,7 @@ int recovery_main (int argc, const char *argv[]) {
return(1);
}
int r=tokudb_recover(data_dir, log_dir);
int r = tokudb_recover(data_dir, log_dir, 0, 0);
if (r!=0) {
fprintf(stderr, "Recovery failed\n");
return(1);
......
#include <sys/stat.h>
#include <sys/wait.h>
#include "test.h"
const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN |DB_RECOVER;
char *namea="a.db";
char *nameb="b.db";
static void run_test (BOOL do_commit, BOOL do_abort) {
int r;
system("rm -rf " ENVDIR);
toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
DB_ENV *env;
DB *dba, *dbb;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = db_create(&dba, env, 0); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = env->txn_checkpoint(env, 0, 0, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
{
DBT a={.data="a", .size=2};
DBT b={.data="b", .size=2};
r = dba->put(dba, txn, &a, &b, 0); CKERR(r);
r = dbb->put(dbb, txn, &b, &a, 0); CKERR(r);
}
//printf("opened\n");
if (do_commit) {
r = txn->commit(txn, 0); CKERR(r);
} else if (do_abort) {
r = txn->abort(txn); CKERR(r);
// force an fsync of the log
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
}
//printf("shutdown\n");
abort();
}
#if 0
static void run_recover (BOOL did_commit) {
DB_ENV *env;
DB *dba, *dbb;
int r;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = db_create(&dba, env, 0); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
r = dba->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
DBT aa={.size=0}, ab={.size=0};
DBT ba={.size=0}, bb={.size=0};
DB_TXN *txn;
DBC *ca,*cb;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = dba->cursor(dba, txn, &ca, 0); CKERR(r);
r = dbb->cursor(dbb, txn, &cb, 0); CKERR(r);
int ra = ca->c_get(ca, &aa, &ab, DB_FIRST); CKERR(r);
int rb = cb->c_get(cb, &ba, &bb, DB_FIRST); CKERR(r);
if (did_commit) {
assert(ra==0);
assert(rb==0);
// verify key-value pairs
assert(aa.size==2);
assert(ab.size==2);
assert(ba.size==2);
assert(bb.size==2);
const char a[2] = "a";
const char b[2] = "b";
assert(memcmp(aa.data, &a, 2)==0);
assert(memcmp(ab.data, &b, 2)==0);
assert(memcmp(ab.data, &b, 2)==0);
assert(memcmp(bb.data, &a, 2)==0);
// make sure no other entries in DB
assert(ca->c_get(ca, &aa, &ab, DB_NEXT) == DB_NOTFOUND);
assert(cb->c_get(cb, &ba, &bb, DB_NEXT) == DB_NOTFOUND);
fprintf(stderr, "Both verified. Yay!\n");
} else {
// It wasn't committed (it also wasn't aborted), but a checkpoint happened.
assert(ra==DB_NOTFOUND);
assert(rb==DB_NOTFOUND);
fprintf(stderr, "Neither present. Yay!\n");
}
r = ca->c_close(ca); CKERR(r);
r = cb->c_close(cb); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
r = dba->close(dba, 0); CKERR(r);
r = dbb->close(dbb, 0); CKERR(r);
r = env->close(env, 0); CKERR(r);
}
#endif
static void run_recover_only (void) {
DB_ENV *env;
int r;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = env->close(env, 0); CKERR(r);
}
static void run_no_recover (void) {
DB_ENV *env;
int r;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags & ~DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = env->close(env, 0); CKERR(r);
}
const char *cmd;
BOOL do_commit=FALSE, do_abort=FALSE, do_explicit_abort=FALSE, do_recover_committed=FALSE, do_recover_aborted=FALSE, do_recover_only=FALSE, do_no_recover = FALSE;
static void test_parse_args (int argc, char *argv[]) {
int resultcode;
cmd = argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0], "-v") == 0) {
verbose++;
} else if (strcmp(argv[0],"-q")==0) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "--commit")==0) {
do_commit=TRUE;
} else if (strcmp(argv[0], "--abort")==0) {
do_abort=TRUE;
} else if (strcmp(argv[0], "--explicit-abort")==0) {
do_explicit_abort=TRUE;
} else if (strcmp(argv[0], "--recover-committed")==0) {
do_recover_committed=TRUE;
} else if (strcmp(argv[0], "--recover-aborted")==0) {
do_recover_aborted=TRUE;
} else if (strcmp(argv[0], "--recover-only") == 0) {
do_recover_only=TRUE;
} else if (strcmp(argv[0], "--no-recover") == 0) {
do_no_recover=TRUE;
} else if (strcmp(argv[0], "-h")==0) {
resultcode=0;
do_usage:
fprintf(stderr, "Usage:\n%s [-v|-q]* [-h] {--commit | --abort | --explicit-abort | --recover-committed | --recover-aborted } \n", cmd);
exit(resultcode);
} else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1;
goto do_usage;
}
argc--;
argv++;
}
{
int n_specified=0;
if (do_commit) n_specified++;
if (do_abort) n_specified++;
if (do_explicit_abort) n_specified++;
if (do_recover_committed) n_specified++;
if (do_recover_aborted) n_specified++;
if (do_recover_only) n_specified++;
if (do_no_recover) n_specified++;
if (n_specified>1) {
printf("Specify only one of --commit or --abort or --recover-committed or --recover-aborted\n");
resultcode=1;
goto do_usage;
}
}
}
int test_main (int argc, char *argv[]) {
test_parse_args(argc, argv);
if (do_commit) {
run_test(TRUE, FALSE);
} else if (do_abort) {
run_test(FALSE, TRUE);
} else if (do_recover_only) {
run_recover_only();
} else if (do_no_recover) {
run_no_recover();
}
return 0;
}
#include <sys/stat.h>
#include "test.h"
const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN |DB_RECOVER;
char *namea="a.db";
char *nameb="b.db";
static void run_test (BOOL do_commit, BOOL do_abort) {
int r;
system("rm -rf " ENVDIR);
toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
DB_ENV *env;
DB *dba, *dbb;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = db_create(&dba, env, 0); CKERR(r);
// r = dba->set_flags(dba, DB_DUPSORT); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = env->txn_checkpoint(env, 0, 0, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
r = dbb->set_flags(dbb, DB_DUPSORT); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
{
DBT a={.data="a", .size=2};
DBT b={.data="b", .size=2};
r = dba->put(dba, txn, &a, &b, DB_YESOVERWRITE); CKERR(r);
r = dbb->put(dbb, txn, &b, &a, DB_YESOVERWRITE); CKERR(r);
}
//printf("opened\n");
if (do_commit) {
r = txn->commit(txn, 0); CKERR(r);
} else if (do_abort) {
r = txn->abort(txn); CKERR(r);
// force an fsync of the log
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
}
//printf("shutdown\n");
abort();
}
#if 0
static void run_recover (BOOL did_commit) {
DB_ENV *env;
DB *dba, *dbb;
int r;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = db_create(&dba, env, 0); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
r = dba->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
DBT aa={.size=0}, ab={.size=0};
DBT ba={.size=0}, bb={.size=0};
DB_TXN *txn;
DBC *ca,*cb;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = dba->cursor(dba, txn, &ca, 0); CKERR(r);
r = dbb->cursor(dbb, txn, &cb, 0); CKERR(r);
int ra = ca->c_get(ca, &aa, &ab, DB_FIRST); CKERR(r);
int rb = cb->c_get(cb, &ba, &bb, DB_FIRST); CKERR(r);
if (did_commit) {
assert(ra==0);
assert(rb==0);
// verify key-value pairs
assert(aa.size==2);
assert(ab.size==2);
assert(ba.size==2);
assert(bb.size==2);
const char a[2] = "a";
const char b[2] = "b";
assert(memcmp(aa.data, &a, 2)==0);
assert(memcmp(ab.data, &b, 2)==0);
assert(memcmp(ab.data, &b, 2)==0);
assert(memcmp(bb.data, &a, 2)==0);
// make sure no other entries in DB
assert(ca->c_get(ca, &aa, &ab, DB_NEXT) == DB_NOTFOUND);
assert(cb->c_get(cb, &ba, &bb, DB_NEXT) == DB_NOTFOUND);
fprintf(stderr, "Both verified. Yay!\n");
} else {
// It wasn't committed (it also wasn't aborted), but a checkpoint happened.
assert(ra==DB_NOTFOUND);
assert(rb==DB_NOTFOUND);
fprintf(stderr, "Neither present. Yay!\n");
}
r = ca->c_close(ca); CKERR(r);
r = cb->c_close(cb); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
r = dba->close(dba, 0); CKERR(r);
r = dbb->close(dbb, 0); CKERR(r);
r = env->close(env, 0); CKERR(r);
}
#endif
static void run_recover_only (void) {
DB_ENV *env;
int r;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = env->close(env, 0); CKERR(r);
}
static void run_no_recover (void) {
DB_ENV *env;
int r;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags & ~DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = env->close(env, 0); CKERR(r);
}
const char *cmd;
BOOL do_commit=FALSE, do_abort=FALSE, do_explicit_abort=FALSE, do_recover_committed=FALSE, do_recover_aborted=FALSE, do_recover_only=FALSE, do_no_recover = FALSE;
static void test_parse_args (int argc, char *argv[]) {
int resultcode;
cmd = argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0], "-v") == 0) {
verbose++;
} else if (strcmp(argv[0],"-q")==0) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "--commit")==0) {
do_commit=TRUE;
} else if (strcmp(argv[0], "--abort")==0) {
do_abort=TRUE;
} else if (strcmp(argv[0], "--explicit-abort")==0) {
do_explicit_abort=TRUE;
} else if (strcmp(argv[0], "--recover-committed")==0) {
do_recover_committed=TRUE;
} else if (strcmp(argv[0], "--recover-aborted")==0) {
do_recover_aborted=TRUE;
} else if (strcmp(argv[0], "--recover-only") == 0) {
do_recover_only=TRUE;
} else if (strcmp(argv[0], "--no-recover") == 0) {
do_no_recover=TRUE;
} else if (strcmp(argv[0], "-h")==0) {
resultcode=0;
do_usage:
fprintf(stderr, "Usage:\n%s [-v|-q]* [-h] {--commit | --abort | --explicit-abort | --recover-committed | --recover-aborted } \n", cmd);
exit(resultcode);
} else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1;
goto do_usage;
}
argc--;
argv++;
}
{
int n_specified=0;
if (do_commit) n_specified++;
if (do_abort) n_specified++;
if (do_explicit_abort) n_specified++;
if (do_recover_committed) n_specified++;
if (do_recover_aborted) n_specified++;
if (do_recover_only) n_specified++;
if (do_no_recover) n_specified++;
if (n_specified>1) {
printf("Specify only one of --commit or --abort or --recover-committed or --recover-aborted\n");
resultcode=1;
goto do_usage;
}
}
}
int test_main (int argc, char *argv[]) {
test_parse_args(argc, argv);
if (do_commit) {
run_test(TRUE, FALSE);
} else if (do_abort) {
run_test(FALSE, TRUE);
} else if (do_recover_only) {
run_recover_only();
} else if (do_no_recover) {
run_no_recover();
}
return 0;
}
......@@ -302,7 +302,7 @@ static int do_recovery (DB_ENV *env) {
} else {
logdir = toku_strdup(env->i->dir);
}
int r = tokudb_recover(datadir, logdir);
int r = tokudb_recover(datadir, logdir, env->i->bt_compare, env->i->dup_compare);
toku_free(logdir);
return r;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment