Commit d6d4c401 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Improve recovery. Addresses #27.

git-svn-id: file:///svn/tokudb@1417 c7de825b-a66e-492c-adef-691d508d4ae1
parent 98ff03a3
......@@ -8,7 +8,7 @@
# GCOV_FLAGS = -fprofile-arcs -ftest-coverage
# PROF_FLAGS = -pg
OPTFLAGS = -O2
# OPTFLAGS = -O2
ifeq ($(VERBOSE),2)
VERBVERBOSE=-v
......
......@@ -28,18 +28,17 @@ static CACHETABLE ct;
static struct cf_pair {
FILENUM filenum;
CACHEFILE cf;
BRT brt;
BRT brt; // set to zero on an fopen, but filled in when an fheader is seen.
} *cf_pairs;
static int n_cf_pairs=0, max_cf_pairs=0;
static DB * const null_db=0;
static int find_cachefile (FILENUM fnum, CACHEFILE *cf, BRT *brt) {
static int find_cachefile (FILENUM fnum, struct cf_pair **cf_pair) {
int i;
for (i=0; i<n_cf_pairs; i++) {
if (fnum.fileid==cf_pairs[i].filenum.fileid) {
*cf = cf_pairs[i].cf;
*brt = cf_pairs[i].brt;
*cf_pair = cf_pairs+i;
return 0;
}
}
......@@ -67,9 +66,8 @@ static void toku_recover_fcreate (struct logtype_fcreate *c) {
toku_free(c->fname.data);
}
static void toku_recover_fheader (struct logtype_fheader *c) {
CACHEFILE cf;
BRT brt;
int r = find_cachefile(c->filenum, &cf, &brt);
struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair);
assert(r==0);
struct brt_header *MALLOC(h);
assert(h);
......@@ -84,14 +82,30 @@ static void toku_recover_fheader (struct logtype_fheader *c) {
} else {
assert(0);
}
toku_cachetable_put(cf, 0, h, 0, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
toku_cachetable_put(pair->cf, 0, h, 0, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
if (pair->brt) {
free(pair->brt->h);
} else {
MALLOC(pair->brt);
pair->brt->cf = pair->cf;
pair->brt->database_name = 0; // Special case, we don't know or care what the database name is for recovery.
pair->brt->cursors_head = pair->brt->cursors_tail = 0;
pair->brt->compare_fun = 0;
pair->brt->dup_compare = 0;
pair->brt->db = 0;
pair->brt->skey = pair->brt->sval = 0;
}
pair->brt->h = h;
pair->brt->nodesize = h->nodesize;
pair->brt->flags = h->nodesize;
r = toku_unpin_brt_header(pair->brt);
assert(r==0);
}
static void toku_recover_newbrtnode (struct logtype_newbrtnode *c) {
int r;
CACHEFILE cf;
BRT brt;
r = find_cachefile(c->filenum, &cf, &brt);
struct cf_pair *pair;
r = find_cachefile(c->filenum, &pair);
assert(r==0);
TAGMALLOC(BRTNODE, n);
n->nodesize = c->nodesize;
......@@ -113,11 +127,11 @@ static void toku_recover_newbrtnode (struct logtype_newbrtnode *c) {
abort();
}
// Now put it in the cachetable
toku_cachetable_put(cf, c->diskoff, n, toku_serialize_brtnode_size(n), toku_brtnode_flush_callback, toku_brtnode_fetch_callback, 0);
toku_cachetable_put(pair->cf, c->diskoff, n, toku_serialize_brtnode_size(n), toku_brtnode_flush_callback, toku_brtnode_fetch_callback, 0);
VERIFY_COUNTS(n);
r = toku_cachetable_unpin(cf, c->diskoff, 1, toku_serialize_brtnode_size(n));
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(n));
assert(r==0);
}
static void toku_recover_fopen (struct logtype_fopen *c) {
......@@ -140,17 +154,18 @@ static void toku_recover_fopen (struct logtype_fopen *c) {
}
cf_pairs[n_cf_pairs-1].filenum = c->filenum;
cf_pairs[n_cf_pairs-1].cf = cf;
cf_pairs[n_cf_pairs-1].brt = 0;
toku_free(fname);
toku_free(c->fname.data);
}
static void toku_recover_insertinleaf (struct logtype_insertinleaf *c) {
CACHEFILE cf;
BRT brt;
int r = find_cachefile(c->filenum, &cf, &brt);
struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair);
assert(r==0);
void *node_v;
r = toku_cachetable_get_and_pin(cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
assert(pair->brt);
r = toku_cachetable_get_and_pin(pair->cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->height==0);
......@@ -163,19 +178,19 @@ static void toku_recover_insertinleaf (struct logtype_insertinleaf *c) {
VERIFY_COUNTS(node);
r = toku_cachetable_unpin(cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
toku_free(c->key.data);
toku_free(c->data.data);
}
static void toku_recover_resizepma (struct logtype_resizepma *c) {
CACHEFILE cf;
BRT brt;
int r = find_cachefile(c->filenum, &cf, &brt);
struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair);
assert(r==0);
void *node_v;
r = toku_cachetable_get_and_pin (cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
assert(pair->brt);
r = toku_cachetable_get_and_pin (pair->cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->height==0);
......@@ -184,17 +199,17 @@ static void toku_recover_resizepma (struct logtype_resizepma *c) {
VERIFY_COUNTS(node);
r = toku_cachetable_unpin(cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
}
static void toku_recover_pmadistribute (struct logtype_pmadistribute *c) {
CACHEFILE cf;
BRT brt;
int r = find_cachefile(c->filenum, &cf, &brt);
struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair);
assert(r==0);
void *node_v;
r = toku_cachetable_get_and_pin(cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
assert(pair->brt);
r = toku_cachetable_get_and_pin(pair->cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->height==0);
......@@ -210,7 +225,7 @@ static void toku_recover_pmadistribute (struct logtype_pmadistribute *c) {
VERIFY_COUNTS(node);
r = toku_cachetable_unpin(cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
toku_free(c->fromto.array);
......@@ -237,8 +252,20 @@ int main (int argc, char *argv[]) {
assert(r==0 && version==0);
while ((r = toku_log_fread(f, &le))==0) {
//printf("%lld: Got cmd %c\n", le.u.commit.lsn.lsn, le.cmd);
entrycount++;
static int prevcount=-1;
logtype_dispatch(le, toku_recover_);
if (0) {
int thiscount = 0;
int cfi;
for (cfi=0; cfi<n_cf_pairs; cfi++) {
thiscount+=toku_cachefile_count_pinned(cf_pairs[cfi].cf, 0);
}
if (thiscount!=prevcount) {
printf("%d: Count becomes %d\n", entrycount, thiscount);
prevcount=thiscount;
}
}
entrycount++;
}
if (r!=EOF) {
if (r==DB_BADFORMAT) {
......@@ -252,8 +279,11 @@ int main (int argc, char *argv[]) {
fclose(f);
}
for (i=0; i<n_cf_pairs; i++) {
r = toku_cachefile_close(&cf_pairs[i].cf);
assert(r==0);
if (cf_pairs[i].brt) {
r = toku_close_brt(cf_pairs[i].brt);
//r = toku_cachefile_close(&cf_pairs[i].cf);
assert(r==0);
}
}
toku_free(cf_pairs);
r = toku_cachetable_close(&ct);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment