Commit ddec990a authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Make each rollback code take a list of arguments rather than a single...

Make each rollback code take a list of arguments rather than a single logentry.  Makes it easier to notice that I've got unused variables.  Addresses #27.

git-svn-id: file:///svn/tokudb@2206 c7de825b-a66e-492c-adef-691d508d4ae1
parent a30a89e7
...@@ -205,7 +205,9 @@ void generate_log_struct (void) { ...@@ -205,7 +205,9 @@ void generate_log_struct (void) {
fprintf(hf, " %-16s crc;\n", "u_int32_t"); fprintf(hf, " %-16s crc;\n", "u_int32_t");
fprintf(hf, " %-16s len;\n", "u_int32_t"); fprintf(hf, " %-16s len;\n", "u_int32_t");
fprintf(hf, "};\n"); fprintf(hf, "};\n");
fprintf(hf, "void toku_recover_%s (struct logtype_%s *);\n", lt->name, lt->name); fprintf(hf, "void toku_recover_%s (LSN lsn", lt->name);
DO_FIELDS(ft, lt, fprintf(hf, ", %s %s", ft->type, ft->name));
fprintf(hf, ");\n");
fprintf(hf, "int toku_rollback_%s (struct logtype_%s *, TOKUTXN);\n", lt->name, lt->name); fprintf(hf, "int toku_rollback_%s (struct logtype_%s *, TOKUTXN);\n", lt->name, lt->name);
})); }));
fprintf(hf, "struct log_entry {\n"); fprintf(hf, "struct log_entry {\n");
...@@ -227,6 +229,14 @@ void generate_dispatch (void) { ...@@ -227,6 +229,14 @@ void generate_dispatch (void) {
DO_LOGTYPES(lt, fprintf(hf, " case LT_%s: var = funprefix ## %s (&(s)->u.%s, ## args); break;\\\n", lt->name, lt->name, lt->name)); DO_LOGTYPES(lt, fprintf(hf, " case LT_%s: var = funprefix ## %s (&(s)->u.%s, ## args); break;\\\n", lt->name, lt->name, lt->name));
fprintf(hf, " }})\n"); fprintf(hf, " }})\n");
fprintf(hf, "#define logtype_dispatch_args(s, funprefix) ({ switch((s)->cmd) {\\\n");
DO_LOGTYPES(lt,
({
fprintf(hf, " case LT_%s: funprefix ## %s ((s)->u.%s.lsn", lt->name, lt->name, lt->name);
DO_FIELDS(ft, lt, fprintf(hf, ",(s)->u.%s.%s", lt->name, ft->name));
fprintf(hf, "); break;\\\n");
}));
fprintf(hf, " }})\n");
} }
void generate_log_free(void) { void generate_log_free(void) {
......
...@@ -37,7 +37,7 @@ int main (int argc, char *argv[]) { ...@@ -37,7 +37,7 @@ int main (int argc, char *argv[]) {
assert(r==0 && version==0); assert(r==0 && version==0);
while ((r = toku_log_fread(f, &le))==0) { while ((r = toku_log_fread(f, &le))==0) {
//printf("%lld: Got cmd %c\n", le.u.commit.lsn.lsn, le.cmd); //printf("%lld: Got cmd %c\n", le.u.commit.lsn.lsn, le.cmd);
logtype_dispatch(&le, toku_recover_); logtype_dispatch_args(&le, toku_recover_);
entrycount++; entrycount++;
} }
if (r!=EOF) { if (r!=EOF) {
......
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
#define VERIFY_COUNTS(n) ((void)0) #define VERIFY_COUNTS(n) ((void)0)
#endif #endif
#define UU(x) x __attribute__((__unused__))
static DB * const null_db=0; static DB * const null_db=0;
// These data structures really should be part of a recovery data structure. Recovery could be multithreaded (on different environments...) But this is OK since recovery can only happen in one // These data structures really should be part of a recovery data structure. Recovery could be multithreaded (on different environments...) But this is OK since recovery can only happen in one
...@@ -89,8 +91,7 @@ static char *fixup_fname(BYTESTRING *f) { ...@@ -89,8 +91,7 @@ static char *fixup_fname(BYTESTRING *f) {
} }
void toku_recover_commit (struct logtype_commit *c) { void toku_recover_commit (LSN UU(lsn), TXNID UU(txnid)) {
c=c; // !!! We need to do something, but for now we assume everything commits and so we don't have do anything to remember what commited and how to unroll.
} }
// Rolling back a commit doesn't make much sense. It won't happen during an abort. But it doesn't hurt to ignore it. // Rolling back a commit doesn't make much sense. It won't happen during an abort. But it doesn't hurt to ignore it.
...@@ -100,14 +101,14 @@ int toku_rollback_commit (struct logtype_commit *le __attribute__((__unused__)), ...@@ -100,14 +101,14 @@ int toku_rollback_commit (struct logtype_commit *le __attribute__((__unused__)),
#define ABORTIT { le=le; txn=txn; fprintf(stderr, "%s:%d (%s) not ready to go\n", __FILE__, __LINE__, __func__); abort(); } #define ABORTIT { le=le; txn=txn; fprintf(stderr, "%s:%d (%s) not ready to go\n", __FILE__, __LINE__, __func__); abort(); }
int toku_rollback_delete (struct logtype_delete *le, TOKUTXN txn) ABORTIT int toku_rollback_delete (struct logtype_delete *le, TOKUTXN txn) ABORTIT
void toku_recover_delete (struct logtype_delete *c) {c=c;fprintf(stderr, "%s:%d\n", __FILE__, __LINE__); abort(); } void toku_recover_delete (LSN UU(lsn), FILENUM UU(filenum),DISKOFF UU(diskoff),BYTESTRING UU(key),BYTESTRING UU(data)) { assert(0); }
void toku_recover_fcreate (struct logtype_fcreate *c) { void toku_recover_fcreate (LSN UU(lsn), TXNID UU(txnid),BYTESTRING fname,u_int32_t mode) {
char *fname = fixup_fname(&c->fname); char *fixed_fname = fixup_fname(&fname);
int fd = creat(fname, c->mode); int fd = creat(fixed_fname, mode);
assert(fd>=0); assert(fd>=0);
toku_free(fname); toku_free(fixed_fname);
toku_free(c->fname.data); toku_free_BYTESTRING(fname);
} }
int toku_rollback_fcreate (struct logtype_fcreate *le, TOKUTXN txn __attribute__((__unused__))) { int toku_rollback_fcreate (struct logtype_fcreate *le, TOKUTXN txn __attribute__((__unused__))) {
...@@ -124,20 +125,20 @@ int toku_rollback_fcreate (struct logtype_fcreate *le, TOKUTXN txn __attribute__ ...@@ -124,20 +125,20 @@ int toku_rollback_fcreate (struct logtype_fcreate *le, TOKUTXN txn __attribute__
} }
void toku_recover_fheader (struct logtype_fheader *c) { void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,LOGGEDBRTHEADER header) {
struct cf_pair *pair; struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair); int r = find_cachefile(filenum, &pair);
assert(r==0); assert(r==0);
struct brt_header *MALLOC(h); struct brt_header *MALLOC(h);
assert(h); assert(h);
h->dirty=0; h->dirty=0;
h->flags = c->header.flags; h->flags = header.flags;
h->nodesize = c->header.nodesize; h->nodesize = header.nodesize;
h->freelist = c->header.freelist; h->freelist = header.freelist;
h->unused_memory = c->header.unused_memory; h->unused_memory = header.unused_memory;
h->n_named_roots = c->header.n_named_roots; h->n_named_roots = header.n_named_roots;
if ((signed)c->header.n_named_roots==-1) { if ((signed)header.n_named_roots==-1) {
h->unnamed_root = c->header.u.one.root; h->unnamed_root = header.u.one.root;
} else { } else {
assert(0); assert(0);
} }
...@@ -161,24 +162,24 @@ void toku_recover_fheader (struct logtype_fheader *c) { ...@@ -161,24 +162,24 @@ void toku_recover_fheader (struct logtype_fheader *c) {
assert(r==0); assert(r==0);
} }
void toku_recover_newbrtnode (struct logtype_newbrtnode *c) { void toku_recover_newbrtnode (LSN lsn, FILENUM filenum,DISKOFF diskoff,u_int32_t height,u_int32_t nodesize,u_int8_t is_dup_sort,u_int32_t rand4fingerprint) {
int r; int r;
struct cf_pair *pair; struct cf_pair *pair;
r = find_cachefile(c->filenum, &pair); r = find_cachefile(filenum, &pair);
assert(r==0); assert(r==0);
TAGMALLOC(BRTNODE, n); TAGMALLOC(BRTNODE, n);
n->nodesize = c->nodesize; n->nodesize = nodesize;
n->thisnodename = c->diskoff; n->thisnodename = diskoff;
n->log_lsn = n->disk_lsn = c->lsn; n->log_lsn = n->disk_lsn = lsn;
//printf("%s:%d %p->disk_lsn=%"PRId64"\n", __FILE__, __LINE__, n, n->disk_lsn.lsn); //printf("%s:%d %p->disk_lsn=%"PRId64"\n", __FILE__, __LINE__, n, n->disk_lsn.lsn);
n->layout_version = 2; n->layout_version = 2;
n->height = c->height; n->height = height;
n->rand4fingerprint = c->rand4fingerprint; n->rand4fingerprint = rand4fingerprint;
n->flags = c->is_dup_sort ? TOKU_DB_DUPSORT : 0; // Don't have TOKU_DB_DUP ??? n->flags = is_dup_sort ? TOKU_DB_DUPSORT : 0; // Don't have TOKU_DB_DUP ???
n->local_fingerprint = 0; // nothing there yet n->local_fingerprint = 0; // nothing there yet
n->dirty = 1; n->dirty = 1;
if (c->height==0) { if (height==0) {
r=toku_pma_create(&n->u.l.buffer, toku_dont_call_this_compare_fun, null_db, c->filenum, c->nodesize); r=toku_pma_create(&n->u.l.buffer, toku_dont_call_this_compare_fun, null_db, filenum, nodesize);
assert(r==0); assert(r==0);
n->u.l.n_bytes_in_buffer=0; n->u.l.n_bytes_in_buffer=0;
} else { } else {
...@@ -190,12 +191,12 @@ void toku_recover_newbrtnode (struct logtype_newbrtnode *c) { ...@@ -190,12 +191,12 @@ void toku_recover_newbrtnode (struct logtype_newbrtnode *c) {
BNC_NBYTESINBUF(n, i) = 0; BNC_NBYTESINBUF(n, i) = 0;
} }
// Now put it in the cachetable // Now put it in the cachetable
toku_cachetable_put(pair->cf, c->diskoff, n, toku_serialize_brtnode_size(n), toku_brtnode_flush_callback, toku_brtnode_fetch_callback, 0); toku_cachetable_put(pair->cf, diskoff, n, toku_serialize_brtnode_size(n), toku_brtnode_flush_callback, toku_brtnode_fetch_callback, 0);
VERIFY_COUNTS(n); VERIFY_COUNTS(n);
n->log_lsn = c->lsn; n->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(n)); r = toku_cachetable_unpin(pair->cf, diskoff, 1, toku_serialize_brtnode_size(n));
assert(r==0); assert(r==0);
} }
...@@ -223,55 +224,53 @@ static void recover_setup_node (FILENUM filenum, DISKOFF diskoff, CACHEFILE *cf, ...@@ -223,55 +224,53 @@ static void recover_setup_node (FILENUM filenum, DISKOFF diskoff, CACHEFILE *cf,
} }
int toku_rollback_brtdeq (struct logtype_brtdeq * le, TOKUTXN txn) ABORTIT int toku_rollback_brtdeq (struct logtype_brtdeq * le, TOKUTXN txn) ABORTIT
void toku_recover_brtdeq (struct logtype_brtdeq *le) { le=le; assert(0); } void toku_recover_brtdeq (LSN UU(lsn), FILENUM UU(filenum), DISKOFF UU(diskoff), u_int32_t UU(childnum), TXNID UU(xid), u_int32_t UU(typ), BYTESTRING UU(key), BYTESTRING UU(data), u_int32_t UU(oldfingerprint), u_int32_t UU(newfingerprint)) { assert(0); }
int toku_rollback_brtenq (struct logtype_brtenq * le, TOKUTXN txn) ABORTIT int toku_rollback_brtenq (struct logtype_brtenq * le, TOKUTXN txn) ABORTIT
void toku_recover_brtenq (struct logtype_brtenq *le) { le=le; assert(0); } void toku_recover_brtenq (LSN UU(lsn), FILENUM UU(filenum), DISKOFF UU(diskoff), u_int32_t UU(childnum), TXNID UU(xid), u_int32_t UU(typ), BYTESTRING UU(key), BYTESTRING UU(data), u_int32_t UU(oldfingerprint), u_int32_t UU(newfingerprint)) { assert(0); }
int toku_rollback_addchild (struct logtype_addchild *le, TOKUTXN txn) ABORTIT int toku_rollback_addchild (struct logtype_addchild *le, TOKUTXN txn) ABORTIT
void toku_recover_addchild (struct logtype_addchild *le) { void toku_recover_addchild (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, DISKOFF child, u_int32_t childfingerprint) {
CACHEFILE cf; CACHEFILE cf;
BRTNODE node; BRTNODE node;
recover_setup_node(le->filenum, le->diskoff, &cf, &node); recover_setup_node(filenum, diskoff, &cf, &node);
assert(node->height>0); assert(node->height>0);
assert(le->childnum <= (unsigned)node->u.n.n_children); assert(childnum <= (unsigned)node->u.n.n_children);
unsigned int i; unsigned int i;
for (i=node->u.n.n_children; i>le->childnum; i--) { for (i=node->u.n.n_children; i>childnum; i--) {
node->u.n.childinfos[i]=node->u.n.childinfos[i-1]; node->u.n.childinfos[i]=node->u.n.childinfos[i-1];
BNC_NBYTESINBUF(node,i) = BNC_NBYTESINBUF(node,i-1); BNC_NBYTESINBUF(node,i) = BNC_NBYTESINBUF(node,i-1);
assert(i>=2); assert(i>=2);
node->u.n.childkeys [i-1] = node->u.n.childkeys [i-2]; node->u.n.childkeys [i-1] = node->u.n.childkeys [i-2];
} }
if (le->childnum>0) { if (childnum>0) {
node->u.n.childkeys [le->childnum-1] = 0; node->u.n.childkeys [childnum-1] = 0;
} }
BNC_DISKOFF(node, le->childnum) = le->child; BNC_DISKOFF(node, childnum) = child;
BNC_SUBTREE_FINGERPRINT(node, le->childnum) = le->childfingerprint; BNC_SUBTREE_FINGERPRINT(node, childnum) = childfingerprint;
int r= toku_fifo_create(&BNC_BUFFER(node, le->childnum)); assert(r==0); int r= toku_fifo_create(&BNC_BUFFER(node, childnum)); assert(r==0);
BNC_NBYTESINBUF(node, le->childnum) = 0; BNC_NBYTESINBUF(node, childnum) = 0;
node->u.n.n_children++; node->u.n.n_children++;
node->log_lsn = le->lsn; node->log_lsn = lsn;
r = toku_cachetable_unpin(cf, le->diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(cf, diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
} }
int toku_rollback_delchild (struct logtype_delchild * le, TOKUTXN txn) ABORTIT int toku_rollback_delchild (struct logtype_delchild * le, TOKUTXN txn) ABORTIT
void toku_recover_delchild (struct logtype_delchild *le) { void toku_recover_delchild (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, DISKOFF child, u_int32_t childfingerprint, BYTESTRING pivotkey) {
struct cf_pair *pair; struct cf_pair *pair;
int r = find_cachefile(le->filenum, &pair); int r = find_cachefile(filenum, &pair);
assert(r==0); assert(r==0);
void *node_v; void *node_v;
assert(pair->brt); assert(pair->brt);
r = toku_cachetable_get_and_pin(pair->cf, le->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt); r = toku_cachetable_get_and_pin(pair->cf, diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0); assert(r==0);
BRTNODE node = node_v; BRTNODE node = node_v;
assert(node->height>0); assert(node->height>0);
u_int32_t childnum = le->childnum;
assert(childnum < (unsigned)node->u.n.n_children); assert(childnum < (unsigned)node->u.n.n_children);
assert(node->u.n.childinfos[childnum].subtree_fingerprint == le->childfingerprint); assert(node->u.n.childinfos[childnum].subtree_fingerprint == childfingerprint);
assert(BNC_DISKOFF(node, childnum)==le->child); assert(BNC_DISKOFF(node, childnum)==child);
assert(toku_fifo_n_entries(BNC_BUFFER(node,childnum))==0); assert(toku_fifo_n_entries(BNC_BUFFER(node,childnum))==0);
assert(BNC_NBYTESINBUF(node,childnum)==0); assert(BNC_NBYTESINBUF(node,childnum)==0);
assert(node->u.n.n_children>2); // Must be at least two children. assert(node->u.n.n_children>2); // Must be at least two children.
...@@ -287,82 +286,82 @@ void toku_recover_delchild (struct logtype_delchild *le) { ...@@ -287,82 +286,82 @@ void toku_recover_delchild (struct logtype_delchild *le) {
} }
node->u.n.n_children--; node->u.n.n_children--;
node->log_lsn = le->lsn; node->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
toku_free(le->pivotkey.data); toku_free(pivotkey.data);
} }
int toku_rollback_setchild (struct logtype_setchild *le, TOKUTXN txn) ABORTIT int toku_rollback_setchild (struct logtype_setchild *le, TOKUTXN txn) ABORTIT
void toku_recover_setchild (struct logtype_setchild *le) { void toku_recover_setchild (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, DISKOFF UU(oldchild), DISKOFF newchild) {
struct cf_pair *pair; struct cf_pair *pair;
int r = find_cachefile(le->filenum, &pair); int r = find_cachefile(filenum, &pair);
assert(r==0); assert(r==0);
void *node_v; void *node_v;
assert(pair->brt); assert(pair->brt);
r = toku_cachetable_get_and_pin(pair->cf, le->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt); r = toku_cachetable_get_and_pin(pair->cf, diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0); assert(r==0);
BRTNODE node = node_v; BRTNODE node = node_v;
assert(node->height>0); assert(node->height>0);
assert(le->childnum < (unsigned)node->u.n.n_children); assert(childnum < (unsigned)node->u.n.n_children);
BNC_DISKOFF(node, le->childnum) = le->newchild; BNC_DISKOFF(node, childnum) = newchild;
node->log_lsn = le->lsn; node->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
} }
int toku_rollback_setpivot (struct logtype_setpivot *le, TOKUTXN txn) ABORTIT int toku_rollback_setpivot (struct logtype_setpivot *le, TOKUTXN txn) ABORTIT
void toku_recover_setpivot (struct logtype_setpivot *le) { void toku_recover_setpivot (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, BYTESTRING pivotkey) {
struct cf_pair *pair; struct cf_pair *pair;
int r = find_cachefile(le->filenum, &pair); int r = find_cachefile(filenum, &pair);
assert(r==0); assert(r==0);
void *node_v; void *node_v;
assert(pair->brt); assert(pair->brt);
r = toku_cachetable_get_and_pin(pair->cf, le->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt); r = toku_cachetable_get_and_pin(pair->cf, diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0); assert(r==0);
BRTNODE node = node_v; BRTNODE node = node_v;
assert(node->height>0); assert(node->height>0);
struct kv_pair *new_pivot = kv_pair_malloc(le->pivotkey.data, le->pivotkey.len, 0, 0); struct kv_pair *new_pivot = kv_pair_malloc(pivotkey.data, pivotkey.len, 0, 0);
node->u.n.childkeys[le->childnum] = new_pivot; node->u.n.childkeys[childnum] = new_pivot;
node->u.n.totalchildkeylens += toku_brt_pivot_key_len(pair->brt, node->u.n.childkeys[le->childnum]); node->u.n.totalchildkeylens += toku_brt_pivot_key_len(pair->brt, node->u.n.childkeys[childnum]);
node->log_lsn = le->lsn; node->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
toku_free(le->pivotkey.data); toku_free(pivotkey.data);
} }
void toku_recover_changechildfingerprint (struct logtype_changechildfingerprint *le) { void toku_recover_changechildfingerprint (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, u_int32_t UU(oldfingerprint), u_int32_t newfingerprint) {
struct cf_pair *pair; struct cf_pair *pair;
int r = find_cachefile(le->filenum, &pair); int r = find_cachefile(filenum, &pair);
assert(r==0); assert(r==0);
void *node_v; void *node_v;
assert(pair->brt); assert(pair->brt);
r = toku_cachetable_get_and_pin(pair->cf, le->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt); r = toku_cachetable_get_and_pin(pair->cf, diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0); assert(r==0);
BRTNODE node = node_v; BRTNODE node = node_v;
assert(node->height>0); assert(node->height>0);
assert((signed)le->childnum < node->u.n.n_children); assert((signed)childnum < node->u.n.n_children);
BNC_SUBTREE_FINGERPRINT(node, le->childnum) = le->newfingerprint; BNC_SUBTREE_FINGERPRINT(node, childnum) = newfingerprint;
node->log_lsn = le->lsn; node->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
} }
int toku_rollback_changechildfingerprint (struct logtype_changechildfingerprint *le, TOKUTXN txn) ABORTIT int toku_rollback_changechildfingerprint (struct logtype_changechildfingerprint *le, TOKUTXN txn) ABORTIT
void toku_recover_fopen (struct logtype_fopen *c) { void toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM filenum) {
char *fname = fixup_fname(&c->fname); char *fixedfname = fixup_fname(&fname);
CACHEFILE cf; CACHEFILE cf;
int fd = open(fname, O_RDWR, 0); int fd = open(fixedfname, O_RDWR, 0);
assert(fd>=0); assert(fd>=0);
int r = toku_cachetable_openfd(&cf, ct, fd); int r = toku_cachetable_openfd(&cf, ct, fd);
assert(r==0); assert(r==0);
toku_recover_note_cachefile(c->filenum, cf); toku_recover_note_cachefile(filenum, cf);
toku_free(fname); toku_free(fixedfname);
toku_free(c->fname.data); toku_free_BYTESTRING(fname);
} }
int toku_rollback_fopen (struct logtype_fopen *le __attribute__((__unused__)), TOKUTXN txn __attribute__((__unused__))) { int toku_rollback_fopen (struct logtype_fopen *le __attribute__((__unused__)), TOKUTXN txn __attribute__((__unused__))) {
...@@ -370,30 +369,30 @@ int toku_rollback_fopen (struct logtype_fopen *le __attribute__((__unused__)), ...@@ -370,30 +369,30 @@ int toku_rollback_fopen (struct logtype_fopen *le __attribute__((__unused__)),
return 0; return 0;
} }
void toku_recover_insertinleaf (struct logtype_insertinleaf *c) { void toku_recover_insertinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKOFF diskoff, u_int32_t pmaidx, BYTESTRING keybs, BYTESTRING databs) {
struct cf_pair *pair; struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair); int r = find_cachefile(filenum, &pair);
assert(r==0); assert(r==0);
void *node_v; void *node_v;
assert(pair->brt); assert(pair->brt);
r = toku_cachetable_get_and_pin(pair->cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt); r = toku_cachetable_get_and_pin(pair->cf, diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0); assert(r==0);
BRTNODE node = node_v; BRTNODE node = node_v;
assert(node->height==0); assert(node->height==0);
VERIFY_COUNTS(node); VERIFY_COUNTS(node);
DBT key,data; DBT key,data;
r = toku_pma_set_at_index(node->u.l.buffer, c->pmaidx, toku_fill_dbt(&key, c->key.data, c->key.len), toku_fill_dbt(&data, c->data.data, c->data.len)); r = toku_pma_set_at_index(node->u.l.buffer, pmaidx, toku_fill_dbt(&key, keybs.data, keybs.len), toku_fill_dbt(&data, databs.data, databs.len));
assert(r==0); assert(r==0);
node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len,c->data.data, c->data.len); node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_kvpair(keybs.data, keybs.len, databs.data, databs.len);
node->u.l.n_bytes_in_buffer += PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + c->key.len + c->data.len; node->u.l.n_bytes_in_buffer += PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + keybs.len + databs.len;
VERIFY_COUNTS(node); VERIFY_COUNTS(node);
node->log_lsn = c->lsn; node->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
toku_free(c->key.data); toku_free_BYTESTRING(keybs);
toku_free(c->data.data); toku_free_BYTESTRING(databs);
} }
int toku_rollback_insertinleaf (struct logtype_insertinleaf *c, TOKUTXN txn) { int toku_rollback_insertinleaf (struct logtype_insertinleaf *c, TOKUTXN txn) {
...@@ -416,27 +415,27 @@ int toku_rollback_insertinleaf (struct logtype_insertinleaf *c, TOKUTXN txn) { ...@@ -416,27 +415,27 @@ int toku_rollback_insertinleaf (struct logtype_insertinleaf *c, TOKUTXN txn) {
} }
void toku_recover_deleteinleaf (struct logtype_deleteinleaf *c) { void toku_recover_deleteinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKOFF diskoff, u_int32_t pmaidx, BYTESTRING keybs, BYTESTRING databs) {
struct cf_pair *pair; struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair); int r = find_cachefile(filenum, &pair);
assert(r==0); assert(r==0);
void *node_v; void *node_v;
assert(pair->brt); assert(pair->brt);
r = toku_cachetable_get_and_pin(pair->cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt); r = toku_cachetable_get_and_pin(pair->cf, diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0); assert(r==0);
BRTNODE node = node_v; BRTNODE node = node_v;
assert(node->height==0); assert(node->height==0);
VERIFY_COUNTS(node); VERIFY_COUNTS(node);
r = toku_pma_clear_at_index(node->u.l.buffer, c->pmaidx); r = toku_pma_clear_at_index(node->u.l.buffer, pmaidx);
assert (r==0); assert (r==0);
node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len,c->data.data, c->data.len); node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_kvpair(keybs.data, keybs.len, databs.data, databs.len);
node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + c->key.len + c->data.len; node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + keybs.len + databs.len;
VERIFY_COUNTS(node); VERIFY_COUNTS(node);
node->log_lsn = c->lsn; node->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
toku_free(c->key.data); toku_free_BYTESTRING(keybs);
toku_free(c->data.data); toku_free_BYTESTRING(databs);
} }
int toku_rollback_deleteinleaf (struct logtype_deleteinleaf *c, TOKUTXN txn) { int toku_rollback_deleteinleaf (struct logtype_deleteinleaf *c, TOKUTXN txn) {
...@@ -460,46 +459,46 @@ int toku_rollback_deleteinleaf (struct logtype_deleteinleaf *c, TOKUTXN txn) { ...@@ -460,46 +459,46 @@ int toku_rollback_deleteinleaf (struct logtype_deleteinleaf *c, TOKUTXN txn) {
} }
// a newbrtnode should have been done before this // a newbrtnode should have been done before this
void toku_recover_resizepma (struct logtype_resizepma *c) { void toku_recover_resizepma (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t oldsize, u_int32_t newsize) {
struct cf_pair *pair; struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair); int r = find_cachefile(filenum, &pair);
assert(r==0); assert(r==0);
void *node_v; void *node_v;
assert(pair->brt); assert(pair->brt);
r = toku_cachetable_get_and_pin (pair->cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt); r = toku_cachetable_get_and_pin (pair->cf, diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0); assert(r==0);
BRTNODE node = node_v; BRTNODE node = node_v;
assert(node->height==0); assert(node->height==0);
r = toku_resize_pma_exactly (node->u.l.buffer, c->oldsize, c->newsize); r = toku_resize_pma_exactly (node->u.l.buffer, oldsize, newsize);
assert(r==0); assert(r==0);
VERIFY_COUNTS(node); VERIFY_COUNTS(node);
node->log_lsn = c->lsn; node->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
} }
void toku_recover_pmadistribute (struct logtype_pmadistribute *c) { void toku_recover_pmadistribute (LSN lsn, FILENUM filenum, DISKOFF old_diskoff, DISKOFF new_diskoff, INTPAIRARRAY fromto) {
struct cf_pair *pair; struct cf_pair *pair;
int r = find_cachefile(c->filenum, &pair); int r = find_cachefile(filenum, &pair);
assert(r==0); assert(r==0);
void *node_va, *node_vb; void *node_va, *node_vb;
assert(pair->brt); assert(pair->brt);
r = toku_cachetable_get_and_pin(pair->cf, c->old_diskoff, &node_va, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt); r = toku_cachetable_get_and_pin(pair->cf, old_diskoff, &node_va, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0); assert(r==0);
r = toku_cachetable_get_and_pin(pair->cf, c->new_diskoff, &node_vb, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt); r = toku_cachetable_get_and_pin(pair->cf, new_diskoff, &node_vb, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0); assert(r==0);
BRTNODE nodea = node_va; assert(nodea->height==0); BRTNODE nodea = node_va; assert(nodea->height==0);
BRTNODE nodeb = node_vb; assert(nodeb->height==0); BRTNODE nodeb = node_vb; assert(nodeb->height==0);
{ {
unsigned int i; unsigned int i;
for (i=0; i<c->fromto.size; i++) { for (i=0; i<fromto.size; i++) {
assert(c->fromto.array[i].a < toku_pma_index_limit(nodea->u.l.buffer)); assert(fromto.array[i].a < toku_pma_index_limit(nodea->u.l.buffer));
assert(c->fromto.array[i].b < toku_pma_index_limit(nodeb->u.l.buffer)); assert(fromto.array[i].b < toku_pma_index_limit(nodeb->u.l.buffer));
} }
} }
r = toku_pma_move_indices (nodea->u.l.buffer, nodeb->u.l.buffer, c->fromto, r = toku_pma_move_indices (nodea->u.l.buffer, nodeb->u.l.buffer, fromto,
nodea->rand4fingerprint, &nodea->local_fingerprint, nodea->rand4fingerprint, &nodea->local_fingerprint,
nodeb->rand4fingerprint, &nodeb->local_fingerprint, nodeb->rand4fingerprint, &nodeb->local_fingerprint,
&nodea->u.l.n_bytes_in_buffer, &nodeb->u.l.n_bytes_in_buffer &nodea->u.l.n_bytes_in_buffer, &nodeb->u.l.n_bytes_in_buffer
...@@ -509,15 +508,15 @@ void toku_recover_pmadistribute (struct logtype_pmadistribute *c) { ...@@ -509,15 +508,15 @@ void toku_recover_pmadistribute (struct logtype_pmadistribute *c) {
VERIFY_COUNTS(nodea); VERIFY_COUNTS(nodea);
VERIFY_COUNTS(nodeb); VERIFY_COUNTS(nodeb);
nodea->log_lsn = c->lsn; nodea->log_lsn = lsn;
nodeb->log_lsn = c->lsn; nodeb->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, c->old_diskoff, 1, toku_serialize_brtnode_size(nodea)); r = toku_cachetable_unpin(pair->cf, old_diskoff, 1, toku_serialize_brtnode_size(nodea));
assert(r==0); assert(r==0);
r = toku_cachetable_unpin(pair->cf, c->new_diskoff, 1, toku_serialize_brtnode_size(nodeb)); r = toku_cachetable_unpin(pair->cf, new_diskoff, 1, toku_serialize_brtnode_size(nodeb));
assert(r==0); assert(r==0);
toku_free(c->fromto.array); toku_free_INTPAIRARRAY(fromto);
} }
int toku_rollback_pmadistribute (struct logtype_pmadistribute *le, TOKUTXN txn) { int toku_rollback_pmadistribute (struct logtype_pmadistribute *le, TOKUTXN txn) {
...@@ -558,28 +557,28 @@ int toku_rollback_pmadistribute (struct logtype_pmadistribute *le, TOKUTXN txn) ...@@ -558,28 +557,28 @@ int toku_rollback_pmadistribute (struct logtype_pmadistribute *le, TOKUTXN txn)
int toku_rollback_fheader (struct logtype_fheader *le, TOKUTXN txn) ABORTIT int toku_rollback_fheader (struct logtype_fheader *le, TOKUTXN txn) ABORTIT
int toku_rollback_resizepma (struct logtype_resizepma *le, TOKUTXN txn) ABORTIT int toku_rollback_resizepma (struct logtype_resizepma *le, TOKUTXN txn) ABORTIT
void toku_recover_changeunnamedroot (struct logtype_changeunnamedroot *le) { void toku_recover_changeunnamedroot (LSN UU(lsn), FILENUM filenum, DISKOFF UU(oldroot), DISKOFF newroot) {
struct cf_pair *pair; struct cf_pair *pair;
int r = find_cachefile(le->filenum, &pair); int r = find_cachefile(filenum, &pair);
assert(r==0); assert(r==0);
assert(pair->brt); assert(pair->brt);
r = toku_read_and_pin_brt_header(pair->cf, &pair->brt->h); r = toku_read_and_pin_brt_header(pair->cf, &pair->brt->h);
assert(r==0); assert(r==0);
pair->brt->h->unnamed_root = le->newroot; pair->brt->h->unnamed_root = newroot;
r = toku_unpin_brt_header(pair->brt); r = toku_unpin_brt_header(pair->brt);
} }
void toku_recover_changenamedroot (struct logtype_changenamedroot *le) { le=le; assert(0); } void toku_recover_changenamedroot (LSN UU(lsn), FILENUM UU(filenum), BYTESTRING UU(name), DISKOFF UU(oldroot), DISKOFF UU(newroot)) { assert(0); }
int toku_rollback_changeunnamedroot (struct logtype_changeunnamedroot *le, TOKUTXN txn) ABORTIT int toku_rollback_changeunnamedroot (struct logtype_changeunnamedroot *le, TOKUTXN txn) ABORTIT
int toku_rollback_changenamedroot (struct logtype_changenamedroot *le, TOKUTXN txn) ABORTIT int toku_rollback_changenamedroot (struct logtype_changenamedroot *le, TOKUTXN txn) ABORTIT
void toku_recover_changeunusedmemory (struct logtype_changeunusedmemory *le) { void toku_recover_changeunusedmemory (LSN UU(lsn), FILENUM filenum, DISKOFF UU(oldunused), DISKOFF newunused) {
struct cf_pair *pair; struct cf_pair *pair;
int r = find_cachefile(le->filenum, &pair); int r = find_cachefile(filenum, &pair);
assert(r==0); assert(r==0);
assert(pair->brt); assert(pair->brt);
r = toku_read_and_pin_brt_header(pair->cf, &pair->brt->h); r = toku_read_and_pin_brt_header(pair->cf, &pair->brt->h);
assert(r==0); assert(r==0);
pair->brt->h->unused_memory = le->newunused; pair->brt->h->unused_memory = newunused;
r = toku_unpin_brt_header(pair->brt); r = toku_unpin_brt_header(pair->brt);
} }
int toku_rollback_changeunusedmemory (struct logtype_changeunusedmemory *le, TOKUTXN txn) ABORTIT int toku_rollback_changeunusedmemory (struct logtype_changeunusedmemory *le, TOKUTXN txn) ABORTIT
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment