Commit 71a5cdc4 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

recovery bind's to filenum's [t:1998]

git-svn-id: file:///svn/toku/tokudb@14425 c7de825b-a66e-492c-adef-691d508d4ae1
parent 39b46845
......@@ -188,8 +188,10 @@ struct brt {
unsigned int nodesize;
unsigned int flags;
unsigned int did_set_flags;
unsigned int did_set_descriptor;
BOOL did_set_flags;
BOOL did_set_descriptor;
BOOL did_set_filenum;
FILENUM filenum;
struct descriptor temp_descriptor;
toku_dbt_upgradef dbt_userformat_upgrade;
int (*compare_fun)(DB*,const DBT*,const DBT*);
......
......@@ -2943,7 +2943,6 @@ brtheader_note_brt_open(BRT live) {
}
int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, DB *db) {
int r;
BOOL txn_created = FALSE;
......@@ -2965,7 +2964,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, int is_cre
BOOL did_create = FALSE;
r = brt_open_file(t, fname, is_create, &fd, &did_create);
if (r != 0) goto died00;
r=toku_cachetable_openfd(&t->cf, cachetable, fd, fname_in_env);
r=toku_cachetable_openfd_with_filenum(&t->cf, cachetable, fd, fname_in_env, t->did_set_filenum, t->filenum);
if (r != 0) goto died00;
if (did_create) {
mode_t mode = S_IRWXU|S_IRWXG|S_IRWXO;
......@@ -3098,6 +3097,12 @@ int toku_brt_set_dup_compare(BRT brt, int (*dup_compare)(DB *, const DBT*, const
return 0;
}
int toku_brt_set_filenum(BRT brt, FILENUM filenum) {
brt->did_set_filenum = TRUE;
brt->filenum = filenum;
return 0;
}
int toku_brt_create_cachetable(CACHETABLE *ct, long cachesize, LSN initial_lsn, TOKULOGGER logger) {
if (cachesize == 0)
cachesize = 128*1024*1024;
......@@ -3302,8 +3307,9 @@ int toku_brt_create(BRT *brt_ptr) {
list_init(&brt->zombie_brt_link);
list_init(&brt->cursors);
brt->flags = 0;
brt->did_set_flags = 0;
brt->did_set_descriptor = 0;
brt->did_set_flags = FALSE;
brt->did_set_descriptor = FALSE;
brt->did_set_filenum = FALSE;
brt->nodesize = BRT_DEFAULT_NODE_SIZE;
brt->compare_fun = toku_default_compare_fun;
brt->dup_compare = toku_default_compare_fun;
......
......@@ -45,6 +45,8 @@ int toku_brt_get_nodesize(BRT, unsigned int *nodesize);
int toku_brt_set_bt_compare(BRT, brt_compare_func);
int toku_brt_set_dup_compare(BRT, brt_compare_func);
int toku_brt_set_filenum(BRT brt, FILENUM filenum);
int brt_set_cachetable(BRT, CACHETABLE);
int toku_brt_open(BRT, const char *fname, const char *fname_in_env, int is_create, int only_create, CACHETABLE ct, TOKUTXN txn, DB *db);
......@@ -54,28 +56,28 @@ int toku_brt_broadcast_commit_all (BRT brt);
int toku_brt_lookup (BRT brt, DBT *k, DBT *v, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v);
// Effect: Insert a key and data pair into a brt
// Returns 0 if successfull
// Returns 0 if successful
int toku_brt_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn);
// Effect: Insert a key and data pair into a brt if the oplsn is newer than the brt lsn. This function is called during recovery.
// Returns 0 if successfull
// Returns 0 if successful
int toku_brt_maybe_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn, LSN oplsn);
// Effect: Delete a key from a brt
// Returns 0 if successfull
// Returns 0 if successful
int toku_brt_delete (BRT brt, DBT *k, TOKUTXN txn);
// Effect: Delete a key from a brt if the oplsn is newer than the brt lsn. This function is called during recovery.
// Returns 0 if successfull
// Returns 0 if successful
int toku_brt_maybe_delete (BRT brt, DBT *k, TOKUTXN txn, LSN oplsn);
// Effect: Delete a pair only if both k and v are equal according to the comparison function.
// Returns 0 if successfull
// Returns 0 if successful
int toku_brt_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN txn);
// Effect: Delete a pair only if both k and v are equal according to the comparison function and the
// oplsn is newer than the brt lsn. This function is called by recovery.
// Returns 0 if successfull
// Returns 0 if successful
int toku_brt_maybe_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN txn, LSN oplsn);
int toku_brt_db_delay_closed (BRT brt, DB* db, int (*close_db)(DB*, u_int32_t), u_int32_t close_flags);
......
......@@ -258,8 +258,7 @@ int toku_cachefile_of_filenum (CACHETABLE ct, FILENUM filenum, CACHEFILE *cf) {
static FILENUM next_filenum_to_use={0};
static void cachefile_init_filenum(CACHEFILE cf, int fd, const char *fname_relative_to_env, struct fileid fileid) \
{
static void cachefile_init_filenum(CACHEFILE cf, int fd, const char *fname_relative_to_env, struct fileid fileid) {
cf->fd = fd;
cf->fileid = fileid;
cf->fname_relative_to_env = fname_relative_to_env ? toku_strdup(fname_relative_to_env) : 0;
......@@ -275,6 +274,10 @@ cachefile_refup (CACHEFILE cf) {
// If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile.
int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char *fname_relative_to_env) {
return toku_cachetable_openfd_with_filenum(cfptr, ct, fd, fname_relative_to_env, FALSE, next_filenum_to_use);
}
int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char *fname_relative_to_env, BOOL with_filenum, FILENUM filenum) {
int r;
CACHEFILE extant;
struct fileid fileid;
......@@ -305,19 +308,32 @@ int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char
goto exit;
}
}
//File is not open. Make a new cachefile.
try_again:
for (extant = ct->cachefiles; extant; extant=extant->next) {
if (next_filenum_to_use.fileid==extant->filenum.fileid) {
next_filenum_to_use.fileid++;
goto try_again;
}
if (with_filenum) {
// verify that filenum is not in use
for (extant = ct->cachefiles; extant; extant=extant->next) {
if (filenum.fileid == extant->filenum.fileid) {
r = EEXIST;
goto exit;
}
}
} else {
// find an unused fileid and use it
try_again:
for (extant = ct->cachefiles; extant; extant=extant->next) {
if (next_filenum_to_use.fileid==extant->filenum.fileid) {
next_filenum_to_use.fileid++;
goto try_again;
}
}
}
{
// create a new cachefile entry in the cachetable
CACHEFILE XCALLOC(newcf);
newcf->cachetable = ct;
newcf->filenum.fileid = next_filenum_to_use.fileid++;
newcf->filenum.fileid = with_filenum ? filenum.fileid : next_filenum_to_use.fileid++;
cachefile_init_filenum(newcf, fd, fname_relative_to_env, fileid);
newcf->refcount = 1;
newcf->next = ct->cachefiles;
......
......@@ -15,8 +15,6 @@
extern int STRADDLE_HACK_INSIDE_CALLBACK;
#endif
// Maintain a cache mapping from cachekeys to values (void*)
// Some of the keys can be pinned. Don't pin too many or for too long.
// If the cachetable is too full, it will call the flush_callback() function with the key, the value, and the otherargs
......@@ -32,6 +30,8 @@ extern int STRADDLE_HACK_INSIDE_CALLBACK;
typedef BLOCKNUM CACHEKEY;
// cachetable operations
int toku_create_cachetable(CACHETABLE */*result*/, long size_limit, LSN initial_lsn, TOKULOGGER);
// Create a new cachetable.
// Effects: a new cachetable is created and initialized.
......@@ -49,10 +49,6 @@ int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf);
int toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER);
int toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string, void (*testcallback_f)(void*), void * testextra);
// Does an fsync of a cachefile.
// Handles the case where cf points to /dev/null
int toku_cachefile_fsync(CACHEFILE cf);
// Shuts down checkpoint thread
// Requires no locks be held that are taken by the checkpoint function
void toku_cachetable_minicron_shutdown(CACHETABLE ct);
......@@ -67,13 +63,20 @@ int toku_cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, const
// Bind a file to a new cachefile object.
int toku_cachetable_openfd (CACHEFILE *,CACHETABLE, int /*fd*/, const char *fname_relative_to_env /*(used for logging)*/);
int toku_cachetable_openfd_with_filenum (CACHEFILE *,CACHETABLE, int /*fd*/, const char *fname_relative_to_env, BOOL with_filenum, FILENUM filenum);
// Get access to the asynchronous work queue
// Returns: a pointer to the work queue
WORKQUEUE toku_cachetable_get_workqueue (CACHETABLE);
// cachefile operations
void toku_cachefile_get_workqueue_load (CACHEFILE, int *n_in_queue, int *n_threads);
// Does an fsync of a cachefile.
// Handles the case where cf points to /dev/null
int toku_cachefile_fsync(CACHEFILE cf);
// The flush callback is called when a key value pair is being written to storage and possibly removed from the cachetable.
// When write_me is true, the value should be written to storage.
// When keep_me is false, the value should be freed.
......
......@@ -105,33 +105,33 @@ struct recover_env {
};
typedef struct recover_env *RECOVER_ENV;
static int recover_env_init (RECOVER_ENV env, brt_compare_func bt_compare, brt_compare_func dup_compare) {
static int recover_env_init (RECOVER_ENV renv, brt_compare_func bt_compare, brt_compare_func dup_compare) {
int r;
r = toku_create_cachetable(&env->ct, 1<<25, (LSN){0}, 0);
r = toku_create_cachetable(&renv->ct, 1<<25, (LSN){0}, 0);
assert(r == 0);
r = toku_logger_create(&env->logger);
r = toku_logger_create(&renv->logger);
assert(r == 0);
toku_logger_write_log_files(env->logger, FALSE);
toku_logger_set_cachetable(env->logger, env->ct);
env->bt_compare = bt_compare;
env->dup_compare = dup_compare;
file_map_init(&env->fmap);
toku_logger_write_log_files(renv->logger, FALSE);
toku_logger_set_cachetable(renv->logger, renv->ct);
renv->bt_compare = bt_compare;
renv->dup_compare = dup_compare;
file_map_init(&renv->fmap);
if (toku_recover_trace)
printf("%s:%d\n", __FUNCTION__, __LINE__);
return r;
}
static void recover_env_cleanup (RECOVER_ENV env) {
static void recover_env_cleanup (RECOVER_ENV renv) {
int r;
file_map_close_dictionaries(&env->fmap);
file_map_close_dictionaries(&renv->fmap);
r = toku_logger_close(&env->logger);
r = toku_logger_close(&renv->logger);
assert(r == 0);
r = toku_cachetable_close(&env->ct);
r = toku_cachetable_close(&renv->ct);
assert(r == 0);
if (toku_recover_trace)
......@@ -143,12 +143,12 @@ static void recover_yield(voidfp UU(f), void *UU(extra)) {
// nothing
}
static void toku_recover_commit (LSN lsn, TXNID xid, RECOVER_ENV env) {
static void toku_recover_commit (LSN lsn, TXNID xid, RECOVER_ENV renv) {
int r;
// find the transaction by transaction id
TOKUTXN txn;
r = toku_txnid2txn(env->logger, xid, &txn);
r = toku_txnid2txn(renv->logger, xid, &txn);
assert(r == 0);
// commit the transaction
......@@ -159,17 +159,17 @@ static void toku_recover_commit (LSN lsn, TXNID xid, RECOVER_ENV env) {
toku_txn_close_txn(txn);
}
static int toku_recover_backward_commit (struct logtype_commit *UU(l), RECOVER_ENV UU(env)) {
static int toku_recover_backward_commit (struct logtype_commit *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static void toku_recover_xabort (LSN lsn, TXNID xid, RECOVER_ENV env) {
static void toku_recover_xabort (LSN lsn, TXNID xid, RECOVER_ENV renv) {
int r;
// find the transaction by transaction id
TOKUTXN txn;
r = toku_txnid2txn(env->logger, xid, &txn);
r = toku_txnid2txn(renv->logger, xid, &txn);
assert(r == 0);
// abort the transaction
......@@ -180,7 +180,7 @@ static void toku_recover_xabort (LSN lsn, TXNID xid, RECOVER_ENV env) {
toku_txn_close_txn(txn);
}
static int toku_recover_backward_xabort (struct logtype_xabort *UU(l), RECOVER_ENV UU(env)) {
static int toku_recover_backward_xabort (struct logtype_xabort *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
......@@ -211,12 +211,12 @@ static void create_dir_from_file (const char *fname) {
}
// Open the file if it is not already open. If it is already open, then do nothing.
static void internal_toku_recover_fopen_or_fcreate (RECOVER_ENV env, int flags, int mode, char *fixedfname, FILENUM filenum, u_int32_t treeflags) {
static void internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags, int mode, char *fixedfname, FILENUM filenum, u_int32_t treeflags) {
int r;
// already open
struct cf_pair *pair = NULL;
r = find_cachefile(&env->fmap, filenum, &pair);
r = find_cachefile(&renv->fmap, filenum, &pair);
if (r == 0) {
toku_free(fixedfname);
return;
......@@ -238,32 +238,35 @@ static void internal_toku_recover_fopen_or_fcreate (RECOVER_ENV env, int flags,
toku_brt_set_flags(brt, treeflags);
// set the key compare functions
if (env->bt_compare)
toku_brt_set_bt_compare(brt, env->bt_compare);
if (env->dup_compare)
toku_brt_set_dup_compare(brt, env->dup_compare);
if (renv->bt_compare)
toku_brt_set_bt_compare(brt, renv->bt_compare);
if (renv->dup_compare)
toku_brt_set_dup_compare(brt, renv->dup_compare);
// bind to filenum when opened
toku_brt_set_filenum(brt, filenum);
// TODO mode
mode = mode;
r = toku_brt_open(brt, fixedfname, fixedfname, (flags & O_CREAT) != 0, FALSE, env->ct, NULL, NULL);
r = toku_brt_open(brt, fixedfname, fixedfname, (flags & O_CREAT) != 0, FALSE, renv->ct, NULL, NULL);
assert(r == 0);
toku_free(fixedfname);
file_map_add(&env->fmap, filenum, NULL, brt);
file_map_add(&renv->fmap, filenum, NULL, brt);
}
static void toku_recover_fopen (LSN UU(lsn), TXNID UU(xid), BYTESTRING fname, FILENUM filenum, RECOVER_ENV env) {
static void toku_recover_fopen (LSN UU(lsn), TXNID UU(xid), BYTESTRING fname, FILENUM filenum, RECOVER_ENV renv) {
char *fixedfname = fixup_fname(&fname);
internal_toku_recover_fopen_or_fcreate(env, 0, 0, fixedfname, filenum, 0);
internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, filenum, 0);
}
static int toku_recover_backward_fopen (struct logtype_fopen *l, RECOVER_ENV env) {
if (env->bs.bs == BS_SAW_CKPT_END) {
static int toku_recover_backward_fopen (struct logtype_fopen *l, RECOVER_ENV renv) {
if (renv->bs.bs == BS_SAW_CKPT_END) {
// close the tree
struct cf_pair *pair = NULL;
int r = find_cachefile(&env->fmap, l->filenum, &pair);
int r = find_cachefile(&renv->fmap, l->filenum, &pair);
if (r == 0) {
r = toku_close_brt(pair->brt, 0, 0);
assert(r == 0);
......@@ -274,27 +277,27 @@ static int toku_recover_backward_fopen (struct logtype_fopen *l, RECOVER_ENV env
}
// fcreate is like fopen except that the file must be created. Also creates the dir if needed.
static void toku_recover_fcreate (LSN UU(lsn), TXNID UU(xid), FILENUM filenum, BYTESTRING fname, u_int32_t mode, u_int32_t treeflags, RECOVER_ENV env) {
static void toku_recover_fcreate (LSN UU(lsn), TXNID UU(xid), FILENUM filenum, BYTESTRING fname, u_int32_t mode, u_int32_t treeflags, RECOVER_ENV renv) {
char *fixedfname = fixup_fname(&fname);
create_dir_from_file(fixedfname);
internal_toku_recover_fopen_or_fcreate(env, O_CREAT|O_TRUNC, mode, fixedfname, filenum, treeflags);
internal_toku_recover_fopen_or_fcreate(renv, O_CREAT|O_TRUNC, mode, fixedfname, filenum, treeflags);
}
static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER_ENV UU(env)) {
static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static void toku_recover_enq_insert (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV env) {
static void toku_recover_enq_insert (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV renv) {
struct cf_pair *pair = NULL;
int r = find_cachefile(&env->fmap, filenum, &pair);
int r = find_cachefile(&renv->fmap, filenum, &pair);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
TOKUTXN txn;
r = toku_txnid2txn(env->logger, xid, &txn);
r = toku_txnid2txn(renv->logger, xid, &txn);
assert(r == 0);
DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, key.data, key.len);
......@@ -303,21 +306,21 @@ static void toku_recover_enq_insert (LSN lsn, FILENUM filenum, TXNID xid, BYTEST
assert(r == 0);
}
static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), RECOVER_ENV UU(env)) {
static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static void toku_recover_enq_delete_both (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV env) {
static void toku_recover_enq_delete_both (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV renv) {
struct cf_pair *pair = NULL;
int r = find_cachefile(&env->fmap, filenum, &pair);
int r = find_cachefile(&renv->fmap, filenum, &pair);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
TOKUTXN txn;
r = toku_txnid2txn(env->logger, xid, &txn);
r = toku_txnid2txn(renv->logger, xid, &txn);
assert(r == 0);
DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, key.data, key.len);
......@@ -326,21 +329,21 @@ static void toku_recover_enq_delete_both (LSN lsn, FILENUM filenum, TXNID xid, B
assert(r == 0);
}
static int toku_recover_backward_enq_delete_both (struct logtype_enq_delete_both *UU(l), RECOVER_ENV UU(env)) {
static int toku_recover_backward_enq_delete_both (struct logtype_enq_delete_both *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static void toku_recover_enq_delete_any (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, RECOVER_ENV env) {
static void toku_recover_enq_delete_any (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, RECOVER_ENV renv) {
struct cf_pair *pair = NULL;
int r = find_cachefile(&env->fmap, filenum, &pair);
int r = find_cachefile(&renv->fmap, filenum, &pair);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
TOKUTXN txn;
r = toku_txnid2txn(env->logger, xid, &txn);
r = toku_txnid2txn(renv->logger, xid, &txn);
assert(r == 0);
DBT keydbt;
toku_fill_dbt(&keydbt, key.data, key.len);
......@@ -348,14 +351,14 @@ static void toku_recover_enq_delete_any (LSN lsn, FILENUM filenum, TXNID xid, BY
assert(r == 0);
}
static int toku_recover_backward_enq_delete_any (struct logtype_enq_delete_any *UU(l), RECOVER_ENV UU(env)) {
static int toku_recover_backward_enq_delete_any (struct logtype_enq_delete_any *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static void toku_recover_fclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum, RECOVER_ENV UU(env)) {
static void toku_recover_fclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum, RECOVER_ENV UU(renv)) {
struct cf_pair *pair = NULL;
int r = find_cachefile(&env->fmap, filenum, &pair);
int r = find_cachefile(&renv->fmap, filenum, &pair);
if (r == 0) {
r = toku_close_brt(pair->brt, 0, 0);
assert(r == 0);
......@@ -363,22 +366,22 @@ static void toku_recover_fclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM file
}
}
static int toku_recover_backward_fclose (struct logtype_fclose *l, RECOVER_ENV env) {
if (env->bs.bs == BS_SAW_CKPT) {
static int toku_recover_backward_fclose (struct logtype_fclose *l, RECOVER_ENV renv) {
if (renv->bs.bs == BS_SAW_CKPT) {
// tree open
char *fixedfname = fixup_fname(&l->fname);
internal_toku_recover_fopen_or_fcreate(env, 0, 0, fixedfname, l->filenum, 0);
internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, 0);
}
return 0;
}
static int toku_recover_begin_checkpoint (LSN UU(lsn), u_int64_t UU(timestamp), RECOVER_ENV UU(env)) {
static int toku_recover_begin_checkpoint (LSN UU(lsn), u_int64_t UU(timestamp), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV env) {
struct backward_scan_state *bs = &env->bs;
static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) {
struct backward_scan_state *bs = &renv->bs;
switch (bs->bs) {
case BS_INIT:
return 0; // incomplete checkpoint
......@@ -397,13 +400,13 @@ static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoi
abort();
}
static int toku_recover_end_checkpoint (LSN UU(lsn), TXNID UU(xid), u_int64_t UU(timestamp), RECOVER_ENV UU(env)) {
static int toku_recover_end_checkpoint (LSN UU(lsn), TXNID UU(xid), u_int64_t UU(timestamp), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static int toku_recover_backward_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV env) {
struct backward_scan_state *bs = &env->bs;
static int toku_recover_backward_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV renv) {
struct backward_scan_state *bs = &renv->bs;
switch (bs->bs) {
case BS_INIT:
bs->bs = BS_SAW_CKPT_END;
......@@ -419,24 +422,24 @@ static int toku_recover_backward_end_checkpoint (struct logtype_end_checkpoint *
abort();
}
static int toku_recover_fassociate (LSN UU(lsn), FILENUM UU(filenum), BYTESTRING UU(fname), RECOVER_ENV UU(env)) {
static int toku_recover_fassociate (LSN UU(lsn), FILENUM UU(filenum), BYTESTRING UU(fname), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static int toku_recover_backward_fassociate (struct logtype_fassociate *l, RECOVER_ENV env) {
static int toku_recover_backward_fassociate (struct logtype_fassociate *l, RECOVER_ENV renv) {
char *fixedfname = fixup_fname(&l->fname);
internal_toku_recover_fopen_or_fcreate(env, 0, 0, fixedfname, l->filenum, 0);
internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, 0);
return 0;
}
static int toku_recover_xstillopen (LSN UU(lsn), TXNID UU(xid), TXNID UU(parent), RECOVER_ENV UU(env)) {
static int toku_recover_xstillopen (LSN UU(lsn), TXNID UU(xid), TXNID UU(parent), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static int toku_recover_backward_xstillopen (struct logtype_xstillopen *l, RECOVER_ENV env) {
struct backward_scan_state *bs = &env->bs;
static int toku_recover_backward_xstillopen (struct logtype_xstillopen *l, RECOVER_ENV renv) {
struct backward_scan_state *bs = &renv->bs;
switch (bs->bs) {
case BS_INIT:
return 0; // ignore live txns from incomplete checkpoint
......@@ -454,23 +457,23 @@ static int toku_recover_backward_xstillopen (struct logtype_xstillopen *l, RECOV
abort();
}
static int toku_recover_xbegin (LSN lsn, TXNID parent_xid, RECOVER_ENV env) {
static int toku_recover_xbegin (LSN lsn, TXNID parent_xid, RECOVER_ENV renv) {
int r;
// lookup the parent
TOKUTXN parent = NULL;
r = toku_txnid2txn(env->logger, parent_xid, &parent);
r = toku_txnid2txn(renv->logger, parent_xid, &parent);
assert(r == 0);
// create a transaction and bind it to the transaction id
TOKUTXN txn = NULL;
r = toku_txn_begin_with_xid(parent, &txn, env->logger, lsn.lsn);
r = toku_txn_begin_with_xid(parent, &txn, renv->logger, lsn.lsn);
assert(r == 0);
return 0;
}
static int toku_recover_backward_xbegin (struct logtype_xbegin *l, RECOVER_ENV env) {
struct backward_scan_state *bs = &env->bs;
static int toku_recover_backward_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) {
struct backward_scan_state *bs = &renv->bs;
switch (bs->bs) {
case BS_INIT:
return 0; // ignore txns that began after checkpoint
......@@ -491,22 +494,22 @@ static int toku_recover_backward_xbegin (struct logtype_xbegin *l, RECOVER_ENV e
abort();
}
static int toku_recover_timestamp (LSN UU(lsn), u_int64_t UU(timestamp), BYTESTRING UU(comment), RECOVER_ENV UU(env)) {
static int toku_recover_timestamp (LSN UU(lsn), u_int64_t UU(timestamp), BYTESTRING UU(comment), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static int toku_recover_backward_timestamp (struct logtype_timestamp *UU(l), RECOVER_ENV UU(env)) {
static int toku_recover_backward_timestamp (struct logtype_timestamp *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static int toku_recover_shutdown (LSN UU(lsn), u_int64_t UU(timestamp), RECOVER_ENV UU(env)) {
static int toku_recover_shutdown (LSN UU(lsn), u_int64_t UU(timestamp), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static int toku_recover_backward_shutdown (struct logtype_shutdown *UU(l), RECOVER_ENV UU(env)) {
static int toku_recover_backward_shutdown (struct logtype_shutdown *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
......@@ -622,7 +625,7 @@ static void abort_live_txn(void *v, void *UU(extra)) {
toku_txn_close_txn(txn);
}
static int do_recovery(RECOVER_ENV env, const char *data_dir, const char *log_dir) {
static int do_recovery(RECOVER_ENV renv, const char *data_dir, const char *log_dir) {
int r;
int rr = 0;
TOKULOGCURSOR logcursor = NULL;
......@@ -635,10 +638,10 @@ static int do_recovery(RECOVER_ENV env, const char *data_dir, const char *log_di
//printf("%s:%d org_wd=\"%s\"\n", __FILE__, __LINE__, org_wd);
}
r = toku_logger_open(log_dir, env->logger);
r = toku_logger_open(log_dir, renv->logger);
assert(r == 0);
LSN lastlsn = toku_logger_last_lsn(env->logger);
LSN lastlsn = toku_logger_last_lsn(renv->logger);
// there must be at least one log entry
r = toku_logcursor_create(&logcursor, log_dir);
......@@ -663,7 +666,7 @@ static int do_recovery(RECOVER_ENV env, const char *data_dir, const char *log_di
}
// scan backwards
backward_scan_state_init(&env->bs);
backward_scan_state_init(&renv->bs);
while (1) {
le = NULL;
r = toku_logcursor_prev(logcursor, &le);
......@@ -675,11 +678,11 @@ static int do_recovery(RECOVER_ENV env, const char *data_dir, const char *log_di
}
break;
}
logtype_dispatch_assign(le, toku_recover_backward_, r, env);
logtype_dispatch_assign(le, toku_recover_backward_, r, renv);
if (r != 0) {
if (toku_recover_trace)
printf("%s:%d r=%d cmd=%c\n", __FUNCTION__, __LINE__, r, le ? le->cmd : '?');
logtype_dispatch_args(le, toku_recover_, env);
logtype_dispatch_args(le, toku_recover_, renv);
break;
}
}
......@@ -696,38 +699,38 @@ static int do_recovery(RECOVER_ENV env, const char *data_dir, const char *log_di
}
break;
}
logtype_dispatch_args(le, toku_recover_, env);
logtype_dispatch_args(le, toku_recover_, renv);
}
r = toku_logcursor_destroy(&logcursor);
assert(r == 0);
// restart logging
toku_logger_restart(env->logger, lastlsn);
toku_logger_restart(renv->logger, lastlsn);
// abort all of the remaining live transactions in reverse transaction id order
struct varray *live_txns = NULL;
r = varray_create(&live_txns, 1);
assert(r == 0);
toku_omt_iterate(env->logger->live_txns, append_live_txn, live_txns);
toku_omt_iterate(renv->logger->live_txns, append_live_txn, live_txns);
varray_sort(live_txns, compare_txn);
varray_iterate(live_txns, abort_live_txn, NULL);
varray_destroy(&live_txns);
// close the open dictionaries
file_map_close_dictionaries(&env->fmap);
file_map_close_dictionaries(&renv->fmap);
// write a recovery log entry
BYTESTRING recover_comment = { strlen("recover"), "recover" };
r = toku_log_timestamp(env->logger, NULL, TRUE, 0, recover_comment);
r = toku_log_timestamp(renv->logger, NULL, TRUE, 0, recover_comment);
assert(r == 0);
// checkpoint
// TODO: checkpoint locks needed here?
r = toku_cachetable_begin_checkpoint(env->ct, env->logger);
r = toku_cachetable_begin_checkpoint(renv->ct, renv->logger);
assert(r == 0);
// TODO: what about the error_string?
r = toku_cachetable_end_checkpoint(env->ct, env->logger, NULL, NULL, NULL);
r = toku_cachetable_end_checkpoint(renv->ct, renv->logger, NULL, NULL, NULL);
assert(r == 0);
r = chdir(org_wd);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment