Commit 67b645b1 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Reorganize how rollback and recovery are organized into files. Fixes #253.

git-svn-id: file:///svn/tokudb@1589 c7de825b-a66e-492c-adef-691d508d4ae1
parent 987de2be
...@@ -66,11 +66,13 @@ BINS = $(REGRESSION_TESTS) \ ...@@ -66,11 +66,13 @@ BINS = $(REGRESSION_TESTS) \
tdb_logprint: LDFLAGS+=-lz tdb_logprint: LDFLAGS+=-lz
tdb_logprint.o: log-internal.h brttypes.h yerror.h log.h kv-pair.h tdb_logprint.o: log-internal.h brttypes.h yerror.h log.h kv-pair.h
tdb_logprint: log_code.o memory.o log.o brt-serialize.o hashtable.o pma.o ybt.o fingerprint.o mempool.o primes.o tdb_logprint: log_code.o memory.o log.o brt-serialize.o hashtable.o pma.o ybt.o fingerprint.o mempool.o primes.o roll.o brt.o cachetable.o brt-verify.o key.o
recover: LDFLAGS+=-lz recover: LDFLAGS+=-lz
recover.o: log_header.h log-internal.h log.h yerror.h brttypes.h kv-pair.h memory.h key.h recover.o: log_header.h log-internal.h log.h yerror.h brttypes.h kv-pair.h memory.h key.h
recover: recover.o log_code.o memory.o log.o brt-serialize.o hashtable.o pma.o ybt.o fingerprint.o mempool.o primes.o cachetable.o brt.o brt-verify.o key.o recover: recover.o log_code.o memory.o log.o brt-serialize.o hashtable.o pma.o ybt.o fingerprint.o mempool.o primes.o cachetable.o brt.o brt-verify.o key.o roll.o
roll.o: log_header.h log-internal.h log.h yerror.h brttypes.h kv-pair.h memory.h key.h cachetable.h
log_code.o: log_header.h wbuf.h log-internal.h log_code.o: log_header.h wbuf.h log-internal.h
log_code.c log_header.h: logformat log_code.c log_header.h: logformat
...@@ -107,13 +109,13 @@ pma-test benchmark-test brt-test brt-serialize-test: LDFLAGS+=-lz ...@@ -107,13 +109,13 @@ pma-test benchmark-test brt-test brt-serialize-test: LDFLAGS+=-lz
BRT_INTERNAL_H_INCLUDES = brt-internal.h cachetable.h hashtable.h pma.h brt.h brttypes.h yerror.h ybt.h log.h ../include/db.h kv-pair.h memory.h crc.h BRT_INTERNAL_H_INCLUDES = brt-internal.h cachetable.h hashtable.h pma.h brt.h brttypes.h yerror.h ybt.h log.h ../include/db.h kv-pair.h memory.h crc.h
key.o: brttypes.h key.h key.o: brttypes.h key.h
pma-test.o: $(BRT_INTERNAL_H_INCLUDES) pma-internal.h pma.h list.h mempool.h pma-test.o: $(BRT_INTERNAL_H_INCLUDES) pma-internal.h pma.h list.h mempool.h
pma-test: pma.o memory.o key.o ybt.o log.o mempool.o fingerprint.o brt-serialize.o hashtable.o primes.o log_code.o pma-test: pma.o memory.o key.o ybt.o log.o mempool.o fingerprint.o brt-serialize.o hashtable.o primes.o log_code.o roll.o brt.o cachetable.o brt-verify.o
pma.o: pma.h yerror.h pma-internal.h memory.h key.h ybt.h brttypes.h log.h ../include/db.h log_header.h pma.o: pma.h yerror.h pma-internal.h memory.h key.h ybt.h brttypes.h log.h ../include/db.h log_header.h
ybt.o: ybt.h brttypes.h ../include/db.h ybt.o: ybt.h brttypes.h ../include/db.h
ybt-test: ybt-test.o ybt.o memory.o ybt-test: ybt-test.o ybt.o memory.o
ybt-test.o: ybt.h ../include/db.h ybt-test.o: ybt.h ../include/db.h
cachetable.o: cachetable.h hashfun.h memory.h cachetable.o: cachetable.h hashfun.h memory.h
brt-test: ybt.o brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o ybt.o key.o primes.o log.o mempool.o brt-verify.o fingerprint.o log_code.o brt-test: ybt.o brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o ybt.o key.o primes.o log.o mempool.o brt-verify.o fingerprint.o log_code.o roll.o
log.o: log_header.h log-internal.h log.h wbuf.h crc.h brttypes.h $(BRT_INTERNAL_H_INCLUDES) log.o: log_header.h log-internal.h log.h wbuf.h crc.h brttypes.h $(BRT_INTERNAL_H_INCLUDES)
brt-test.o brt.o: brt.h ../include/db.h hashtable.h pma.h brttypes.h cachetable.h memory.h brt-test.o brt.o: brt.h ../include/db.h hashtable.h pma.h brttypes.h cachetable.h memory.h
brt-serialize-test.o: $(BRT_INTERNAL_H_INCLUDES) brt-serialize-test.o: $(BRT_INTERNAL_H_INCLUDES)
...@@ -129,7 +131,7 @@ log-test: log.o memory.o ...@@ -129,7 +131,7 @@ log-test: log.o memory.o
brt-verify.o: $(BRT_INTERNAL_H_INCLUDES) brt-verify.o: $(BRT_INTERNAL_H_INCLUDES)
fingerprint.o: $(BRT_INTERNAL_H_INCLUDES) fingerprint.o: $(BRT_INTERNAL_H_INCLUDES)
brt-serialize-test: brt-serialize-test.o brt-serialize.o memory.o hashtable.o pma.o key.o ybt.o brt.o cachetable.o primes.o log.o mempool.o brt-verify.o fingerprint.o log_code.o brt-serialize-test: brt-serialize-test.o brt-serialize.o memory.o hashtable.o pma.o key.o ybt.o brt.o cachetable.o primes.o log.o mempool.o brt-verify.o fingerprint.o log_code.o roll.o
test_toku_malloc_plain_free: memory.o test_toku_malloc_plain_free: memory.o
...@@ -139,7 +141,7 @@ cachetable-test: cachetable.o memory.o cachetable-test.o primes.o ...@@ -139,7 +141,7 @@ cachetable-test: cachetable.o memory.o cachetable-test.o primes.o
cachetable-test2.o: cachetable.h memory.h cachetable-test2.o: cachetable.h memory.h
cachetable-test2: cachetable.o memory.o cachetable-test2.o primes.o cachetable-test2: cachetable.o memory.o cachetable-test2.o primes.o
benchmark-test: benchmark-test.o ybt.o memory.o brt.o pma.o cachetable.o key.o hashtable.o brt-serialize.o primes.o log.o mempool.o brt-verify.o fingerprint.o log_code.o benchmark-test: benchmark-test.o ybt.o memory.o brt.o pma.o cachetable.o key.o hashtable.o brt-serialize.o primes.o log.o mempool.o brt-verify.o fingerprint.o log_code.o roll.o
benchmark-test.o: brt.h ../include/db.h benchmark-test.o: brt.h ../include/db.h
checko2: checko2:
......
...@@ -666,16 +666,6 @@ int toku_abort_logentry_commit (struct logtype_commit *le __attribute__((__unuse ...@@ -666,16 +666,6 @@ int toku_abort_logentry_commit (struct logtype_commit *le __attribute__((__unuse
return EINVAL; return EINVAL;
} }
#define ABORTIT { le=le; txn=txn; abort(); }
int toku_abort_logentry_delete (struct logtype_delete *le, TOKUTXN txn) ABORTIT
int toku_abort_logentry_fcreate (struct logtype_fcreate *le, TOKUTXN txn) ABORTIT
int toku_abort_logentry_fheader (struct logtype_fheader *le, TOKUTXN txn) ABORTIT
int toku_abort_logentry_newbrtnode (struct logtype_newbrtnode *le, TOKUTXN txn) ABORTIT
int toku_abort_logentry_fopen (struct logtype_fopen *le, TOKUTXN txn) ABORTIT
int toku_abort_logentry_insertinleaf (struct logtype_insertinleaf *le, TOKUTXN txn) ABORTIT
int toku_abort_logentry_resizepma (struct logtype_resizepma *le, TOKUTXN txn) ABORTIT
int toku_abort_logentry_pmadistribute (struct logtype_pmadistribute *le, TOKUTXN txn) ABORTIT
int toku_logger_abort(TOKUTXN txn) { int toku_logger_abort(TOKUTXN txn) {
// Must undo everything. Must undo it all in reverse order. // Must undo everything. Must undo it all in reverse order.
// Build the reverse list // Build the reverse list
...@@ -688,7 +678,7 @@ int toku_logger_abort(TOKUTXN txn) { ...@@ -688,7 +678,7 @@ int toku_logger_abort(TOKUTXN txn) {
} }
for (item=txn->newest_logentry; item; item=item->tmp) { for (item=txn->newest_logentry; item; item=item->tmp) {
int r; int r;
logtype_dispatch_assign(item, toku_abort_logentry_, r, txn); logtype_dispatch_assign(item, toku_rollback_, r, txn);
if (r!=0) return r; if (r!=0) return r;
} }
return 0; return 0;
......
...@@ -131,4 +131,7 @@ static inline void toku_free_LOGGEDBRTHEADER(LOGGEDBRTHEADER val) { ...@@ -131,4 +131,7 @@ static inline void toku_free_LOGGEDBRTHEADER(LOGGEDBRTHEADER val) {
toku_free(val.u.many.roots); toku_free(val.u.many.roots);
} }
int toku_recover_init(void);
void toku_recover_cleanup(void);
#endif #endif
...@@ -132,6 +132,8 @@ void generate_log_struct (void) { ...@@ -132,6 +132,8 @@ void generate_log_struct (void) {
fprintf(hf, " %-16s crc;\n", "u_int32_t"); fprintf(hf, " %-16s crc;\n", "u_int32_t");
fprintf(hf, " %-16s len;\n", "u_int32_t"); fprintf(hf, " %-16s len;\n", "u_int32_t");
fprintf(hf, "};\n"); fprintf(hf, "};\n");
fprintf(hf, "void toku_recover_%s (struct logtype_%s *);\n", lt->name, lt->name);
fprintf(hf, "int toku_rollback_%s (struct logtype_%s *, TOKUTXN);\n", lt->name, lt->name);
})); }));
fprintf(hf, "struct log_entry {\n"); fprintf(hf, "struct log_entry {\n");
fprintf(hf, " enum lt_cmd cmd;\n"); fprintf(hf, " enum lt_cmd cmd;\n");
......
...@@ -11,227 +11,12 @@ ...@@ -11,227 +11,12 @@
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
#include <inttypes.h>
#include "log_header.h" #include "log_header.h"
#include "log-internal.h" #include "log-internal.h"
#include "cachetable.h" #include "cachetable.h"
#include "key.h" #include "key.h"
//#define DO_VERIFY_COUNTS
#ifdef DO_VERIFY_COUNTS
#define VERIFY_COUNTS(n) toku_verify_counts(n)
#else
#define VERIFY_COUNTS(n) ((void)0)
#endif
static CACHETABLE ct;
static struct cf_pair {
FILENUM filenum;
CACHEFILE cf;
BRT brt; // set to zero on an fopen, but filled in when an fheader is seen.
} *cf_pairs;
static int n_cf_pairs=0, max_cf_pairs=0;
static DB * const null_db=0;
static int find_cachefile (FILENUM fnum, struct cf_pair **cf_pair) {
int i;
for (i=0; i<n_cf_pairs; i++) {
if (fnum.fileid==cf_pairs[i].filenum.fileid) {
*cf_pair = cf_pairs+i;
return 0;
}
}
return 1;
}
static char *fixup_fname(BYTESTRING *f) {
assert(f->len>0);
char *fname = toku_malloc(f->len+1);
memcpy(fname, f->data, f->len);
fname[f->len]=0;
return fname;
}
static void toku_recover_commit (struct logtype_commit *c) {
c=c; // !!! We need to do something, but for now we assume everything commits and so we don't have do anything to remember what commited and how to unroll.
}
static void toku_recover_delete (struct logtype_delete *c) {c=c;fprintf(stderr, "%s:%d\n", __FILE__, __LINE__); abort(); }
static void toku_recover_fcreate (struct logtype_fcreate *c) {
char *fname = fixup_fname(&c->fname);
int fd = creat(fname, c->mode);
assert(fd>=0);
toku_free(fname);
toku_free(c->fname.data);
}
static void toku_recover_fheader (struct logtype_fheader *c) {
struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair);
assert(r==0);
struct brt_header *MALLOC(h);
assert(h);
h->dirty=0;
h->flags = c->header.flags;
h->nodesize = c->header.nodesize;
h->freelist = c->header.freelist;
h->unused_memory = c->header.unused_memory;
h->n_named_roots = c->header.n_named_roots;
if ((signed)c->header.n_named_roots==-1) {
h->unnamed_root = c->header.u.one.root;
} else {
assert(0);
}
toku_cachetable_put(pair->cf, 0, h, 0, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
if (pair->brt) {
free(pair->brt->h);
} else {
MALLOC(pair->brt);
pair->brt->cf = pair->cf;
pair->brt->database_name = 0; // Special case, we don't know or care what the database name is for recovery.
pair->brt->cursors_head = pair->brt->cursors_tail = 0;
pair->brt->compare_fun = 0;
pair->brt->dup_compare = 0;
pair->brt->db = 0;
pair->brt->skey = pair->brt->sval = 0;
}
pair->brt->h = h;
pair->brt->nodesize = h->nodesize;
pair->brt->flags = h->nodesize;
r = toku_unpin_brt_header(pair->brt);
assert(r==0);
}
static void toku_recover_newbrtnode (struct logtype_newbrtnode *c) {
int r;
struct cf_pair *pair;
r = find_cachefile(c->filenum, &pair);
assert(r==0);
TAGMALLOC(BRTNODE, n);
n->nodesize = c->nodesize;
n->thisnodename = c->diskoff;
n->log_lsn = n->disk_lsn = c->lsn; printf("%s:%d %p->disk_lsn=%"PRId64"\n", __FILE__, __LINE__, n, n->disk_lsn.lsn);
n->layout_version = 1;
n->height = c->height;
n->rand4fingerprint = c->rand4fingerprint;
n->flags = c->is_dup_sort ? TOKU_DB_DUPSORT : 0; // Don't have TOKU_DB_DUP ???
n->local_fingerprint = 0; // nothing there yet
n->dirty = 1;
if (c->height==0) {
r=toku_pma_create(&n->u.l.buffer, toku_dont_call_this_compare_fun, null_db, c->filenum, c->nodesize);
assert(r==0);
n->u.l.n_bytes_in_buffer=0;
} else {
fprintf(stderr, "%s:%d\n", __FILE__, __LINE__);
abort();
}
// Now put it in the cachetable
toku_cachetable_put(pair->cf, c->diskoff, n, toku_serialize_brtnode_size(n), toku_brtnode_flush_callback, toku_brtnode_fetch_callback, 0);
VERIFY_COUNTS(n);
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(n));
assert(r==0);
}
static void toku_recover_fopen (struct logtype_fopen *c) {
char *fname = fixup_fname(&c->fname);
CACHEFILE cf;
int fd = open(fname, O_RDWR, 0);
assert(fd>=0);
int r = toku_cachetable_openfd(&cf, ct, fd);
assert(r==0);
if (max_cf_pairs==0) {
n_cf_pairs=1;
max_cf_pairs=2;
MALLOC_N(max_cf_pairs, cf_pairs);
} else {
if (n_cf_pairs>=max_cf_pairs) {
max_cf_pairs*=2;
cf_pairs = toku_realloc(cf_pairs, max_cf_pairs*sizeof(*cf_pairs));
}
n_cf_pairs++;
}
cf_pairs[n_cf_pairs-1].filenum = c->filenum;
cf_pairs[n_cf_pairs-1].cf = cf;
cf_pairs[n_cf_pairs-1].brt = 0;
toku_free(fname);
toku_free(c->fname.data);
}
static void toku_recover_insertinleaf (struct logtype_insertinleaf *c) {
struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair);
assert(r==0);
void *node_v;
assert(pair->brt);
r = toku_cachetable_get_and_pin(pair->cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->height==0);
VERIFY_COUNTS(node);
DBT key,data;
r = toku_pma_set_at_index(node->u.l.buffer, c->pmaidx, toku_fill_dbt(&key, c->key.data, c->key.len), toku_fill_dbt(&data, c->data.data, c->data.len));
assert(r==0);
node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len,c->data.data, c->data.len);
node->u.l.n_bytes_in_buffer += PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + c->key.len + c->data.len;
VERIFY_COUNTS(node);
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
toku_free(c->key.data);
toku_free(c->data.data);
}
// a newbrtnode should have been done before this
static void toku_recover_resizepma (struct logtype_resizepma *c) {
struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair);
assert(r==0);
void *node_v;
assert(pair->brt);
r = toku_cachetable_get_and_pin (pair->cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->height==0);
r = toku_resize_pma_exactly (node->u.l.buffer, c->oldsize, c->newsize);
assert(r==0);
VERIFY_COUNTS(node);
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
}
static void toku_recover_pmadistribute (struct logtype_pmadistribute *c) {
struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair);
assert(r==0);
void *node_v;
assert(pair->brt);
r = toku_cachetable_get_and_pin(pair->cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->height==0);
{
unsigned int i;
for (i=0; i<c->fromto.size; i++) {
assert(c->fromto.array[i].a < toku_pma_index_limit(node->u.l.buffer));
assert(c->fromto.array[i].b < toku_pma_index_limit(node->u.l.buffer));
}
}
r = toku_pma_move_indices (node->u.l.buffer, c->fromto);
// The bytes in bufer and fingerprint shouldn't change
VERIFY_COUNTS(node);
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
toku_free(c->fromto.array);
}
int main (int argc, char *argv[]) { int main (int argc, char *argv[]) {
const char *dir; const char *dir;
int r; int r;
...@@ -243,7 +28,7 @@ int main (int argc, char *argv[]) { ...@@ -243,7 +28,7 @@ int main (int argc, char *argv[]) {
r = toku_logger_find_logfiles(dir, &n_logfiles, &logfiles); r = toku_logger_find_logfiles(dir, &n_logfiles, &logfiles);
if (r!=0) exit(1); if (r!=0) exit(1);
int i; int i;
r = toku_create_cachetable(&ct, 1<<25, (LSN){0}, 0); toku_recover_init();
for (i=0; i<n_logfiles; i++) { for (i=0; i<n_logfiles; i++) {
//fprintf(stderr, "Opening %s\n", logfiles[i]); //fprintf(stderr, "Opening %s\n", logfiles[i]);
FILE *f = fopen(logfiles[i], "r"); FILE *f = fopen(logfiles[i], "r");
...@@ -253,19 +38,7 @@ int main (int argc, char *argv[]) { ...@@ -253,19 +38,7 @@ int main (int argc, char *argv[]) {
assert(r==0 && version==0); assert(r==0 && version==0);
while ((r = toku_log_fread(f, &le))==0) { while ((r = toku_log_fread(f, &le))==0) {
//printf("%lld: Got cmd %c\n", le.u.commit.lsn.lsn, le.cmd); //printf("%lld: Got cmd %c\n", le.u.commit.lsn.lsn, le.cmd);
static int prevcount=-1;
logtype_dispatch(&le, toku_recover_); logtype_dispatch(&le, toku_recover_);
if (0) {
int thiscount = 0;
int cfi;
for (cfi=0; cfi<n_cf_pairs; cfi++) {
thiscount+=toku_cachefile_count_pinned(cf_pairs[cfi].cf, 0);
}
if (thiscount!=prevcount) {
printf("%d: Count becomes %d\n", entrycount, thiscount);
prevcount=thiscount;
}
}
entrycount++; entrycount++;
} }
if (r!=EOF) { if (r!=EOF) {
...@@ -279,16 +52,7 @@ int main (int argc, char *argv[]) { ...@@ -279,16 +52,7 @@ int main (int argc, char *argv[]) {
} }
fclose(f); fclose(f);
} }
for (i=0; i<n_cf_pairs; i++) { toku_recover_cleanup();
if (cf_pairs[i].brt) {
r = toku_close_brt(cf_pairs[i].brt);
//r = toku_cachefile_close(&cf_pairs[i].cf);
assert(r==0);
}
}
toku_free(cf_pairs);
r = toku_cachetable_close(&ct);
assert(r==0);
for (i=0; i<n_logfiles; i++) { for (i=0; i<n_logfiles; i++) {
toku_free(logfiles[i]); toku_free(logfiles[i]);
} }
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
/* rollback and rollforward routines. */
#include <stdlib.h>
#include <inttypes.h>
#include "log_header.h"
#include "log-internal.h"
#include "cachetable.h"
#include "key.h"
//#define DO_VERIFY_COUNTS
#ifdef DO_VERIFY_COUNTS
#define VERIFY_COUNTS(n) toku_verify_counts(n)
#else
#define VERIFY_COUNTS(n) ((void)0)
#endif
static DB * const null_db=0;
// These data structures really should be part of a recovery data structure. Recovery could be multithreaded (on different environments...) But this is OK since recovery can only happen in one
static CACHETABLE ct;
static struct cf_pair {
FILENUM filenum;
CACHEFILE cf;
BRT brt; // set to zero on an fopen, but filled in when an fheader is seen.
} *cf_pairs;
static int n_cf_pairs=0, max_cf_pairs=0;;
int toku_recover_init (void) {
int r = toku_create_cachetable(&ct, 1<<25, (LSN){0}, 0);
return r;
}
void toku_recover_cleanup (void) {
int i;
for (i=0; i<n_cf_pairs; i++) {
if (cf_pairs[i].brt) {
int r = toku_close_brt(cf_pairs[i].brt);
//r = toku_cachefile_close(&cf_pairs[i].cf);
assert(r==0);
}
}
toku_free(cf_pairs);
{
int r = toku_cachetable_close(&ct);
assert(r==0);
}
}
int toku_recover_note_cachefile (FILENUM fnum, CACHEFILE cf) {
if (max_cf_pairs==0) {
n_cf_pairs=1;
max_cf_pairs=2;
MALLOC_N(max_cf_pairs, cf_pairs);
if (cf_pairs==0) return errno;
} else {
if (n_cf_pairs>=max_cf_pairs) {
max_cf_pairs*=2;
cf_pairs = toku_realloc(cf_pairs, max_cf_pairs*sizeof(*cf_pairs));
}
n_cf_pairs++;
}
cf_pairs[n_cf_pairs-1].filenum = fnum;
cf_pairs[n_cf_pairs-1].cf = cf;
cf_pairs[n_cf_pairs-1].brt = 0;
return 0;
}
static int find_cachefile (FILENUM fnum, struct cf_pair **cf_pair) {
int i;
for (i=0; i<n_cf_pairs; i++) {
if (fnum.fileid==cf_pairs[i].filenum.fileid) {
*cf_pair = cf_pairs+i;
return 0;
}
}
return 1;
}
static char *fixup_fname(BYTESTRING *f) {
assert(f->len>0);
char *fname = toku_malloc(f->len+1);
memcpy(fname, f->data, f->len);
fname[f->len]=0;
return fname;
}
void toku_recover_commit (struct logtype_commit *c) {
c=c; // !!! We need to do something, but for now we assume everything commits and so we don't have do anything to remember what commited and how to unroll.
}
// Rolling back a commit doesn't make much sense. It won't happen during an abort. But it doesn't hurt to ignore it.
int toku_rollback_commit (struct logtype_commit *le __attribute__((__unused__)), TOKUTXN txn __attribute__((__unused__))) {
return 0;
}
#define ABORTIT { le=le; txn=txn; abort(); }
int toku_rollback_delete (struct logtype_delete *le, TOKUTXN txn) ABORTIT
void toku_recover_delete (struct logtype_delete *c) {c=c;fprintf(stderr, "%s:%d\n", __FILE__, __LINE__); abort(); }
void toku_recover_fcreate (struct logtype_fcreate *c) {
char *fname = fixup_fname(&c->fname);
int fd = creat(fname, c->mode);
assert(fd>=0);
toku_free(fname);
toku_free(c->fname.data);
}
void toku_recover_fheader (struct logtype_fheader *c) {
struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair);
assert(r==0);
struct brt_header *MALLOC(h);
assert(h);
h->dirty=0;
h->flags = c->header.flags;
h->nodesize = c->header.nodesize;
h->freelist = c->header.freelist;
h->unused_memory = c->header.unused_memory;
h->n_named_roots = c->header.n_named_roots;
if ((signed)c->header.n_named_roots==-1) {
h->unnamed_root = c->header.u.one.root;
} else {
assert(0);
}
toku_cachetable_put(pair->cf, 0, h, 0, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
if (pair->brt) {
free(pair->brt->h);
} else {
MALLOC(pair->brt);
pair->brt->cf = pair->cf;
pair->brt->database_name = 0; // Special case, we don't know or care what the database name is for recovery.
pair->brt->cursors_head = pair->brt->cursors_tail = 0;
pair->brt->compare_fun = 0;
pair->brt->dup_compare = 0;
pair->brt->db = 0;
pair->brt->skey = pair->brt->sval = 0;
}
pair->brt->h = h;
pair->brt->nodesize = h->nodesize;
pair->brt->flags = h->nodesize;
r = toku_unpin_brt_header(pair->brt);
assert(r==0);
}
void toku_recover_newbrtnode (struct logtype_newbrtnode *c) {
int r;
struct cf_pair *pair;
r = find_cachefile(c->filenum, &pair);
assert(r==0);
TAGMALLOC(BRTNODE, n);
n->nodesize = c->nodesize;
n->thisnodename = c->diskoff;
n->log_lsn = n->disk_lsn = c->lsn; printf("%s:%d %p->disk_lsn=%"PRId64"\n", __FILE__, __LINE__, n, n->disk_lsn.lsn);
n->layout_version = 1;
n->height = c->height;
n->rand4fingerprint = c->rand4fingerprint;
n->flags = c->is_dup_sort ? TOKU_DB_DUPSORT : 0; // Don't have TOKU_DB_DUP ???
n->local_fingerprint = 0; // nothing there yet
n->dirty = 1;
if (c->height==0) {
r=toku_pma_create(&n->u.l.buffer, toku_dont_call_this_compare_fun, null_db, c->filenum, c->nodesize);
assert(r==0);
n->u.l.n_bytes_in_buffer=0;
} else {
fprintf(stderr, "%s:%d\n", __FILE__, __LINE__);
abort();
}
// Now put it in the cachetable
toku_cachetable_put(pair->cf, c->diskoff, n, toku_serialize_brtnode_size(n), toku_brtnode_flush_callback, toku_brtnode_fetch_callback, 0);
VERIFY_COUNTS(n);
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(n));
assert(r==0);
}
void toku_recover_fopen (struct logtype_fopen *c) {
char *fname = fixup_fname(&c->fname);
CACHEFILE cf;
int fd = open(fname, O_RDWR, 0);
assert(fd>=0);
int r = toku_cachetable_openfd(&cf, ct, fd);
assert(r==0);
toku_recover_note_cachefile(c->filenum, cf);
toku_free(fname);
toku_free(c->fname.data);
}
void toku_recover_insertinleaf (struct logtype_insertinleaf *c) {
struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair);
assert(r==0);
void *node_v;
assert(pair->brt);
r = toku_cachetable_get_and_pin(pair->cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->height==0);
VERIFY_COUNTS(node);
DBT key,data;
r = toku_pma_set_at_index(node->u.l.buffer, c->pmaidx, toku_fill_dbt(&key, c->key.data, c->key.len), toku_fill_dbt(&data, c->data.data, c->data.len));
assert(r==0);
node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len,c->data.data, c->data.len);
node->u.l.n_bytes_in_buffer += PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + c->key.len + c->data.len;
VERIFY_COUNTS(node);
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
toku_free(c->key.data);
toku_free(c->data.data);
}
// a newbrtnode should have been done before this
void toku_recover_resizepma (struct logtype_resizepma *c) {
struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair);
assert(r==0);
void *node_v;
assert(pair->brt);
r = toku_cachetable_get_and_pin (pair->cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->height==0);
r = toku_resize_pma_exactly (node->u.l.buffer, c->oldsize, c->newsize);
assert(r==0);
VERIFY_COUNTS(node);
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
}
void toku_recover_pmadistribute (struct logtype_pmadistribute *c) {
struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair);
assert(r==0);
void *node_v;
assert(pair->brt);
r = toku_cachetable_get_and_pin(pair->cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->height==0);
{
unsigned int i;
for (i=0; i<c->fromto.size; i++) {
assert(c->fromto.array[i].a < toku_pma_index_limit(node->u.l.buffer));
assert(c->fromto.array[i].b < toku_pma_index_limit(node->u.l.buffer));
}
}
r = toku_pma_move_indices (node->u.l.buffer, c->fromto);
// The bytes in bufer and fingerprint shouldn't change
VERIFY_COUNTS(node);
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
toku_free(c->fromto.array);
}
int toku_rollback_fcreate (struct logtype_fcreate *le, TOKUTXN txn) ABORTIT
int toku_rollback_fheader (struct logtype_fheader *le, TOKUTXN txn) ABORTIT
int toku_rollback_newbrtnode (struct logtype_newbrtnode *le, TOKUTXN txn) ABORTIT
int toku_rollback_fopen (struct logtype_fopen *le, TOKUTXN txn) ABORTIT
int toku_rollback_insertinleaf (struct logtype_insertinleaf *le, TOKUTXN txn) ABORTIT
int toku_rollback_resizepma (struct logtype_resizepma *le, TOKUTXN txn) ABORTIT
int toku_rollback_pmadistribute (struct logtype_pmadistribute *le, TOKUTXN txn) ABORTIT
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment