Commit 407c0ebe authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Log the fingerprints correctly. Addresses #27.

git-svn-id: file:///svn/tokudb@1820 c7de825b-a66e-492c-adef-691d508d4ae1
parent 09bbe6dd
...@@ -49,8 +49,15 @@ struct brtnode { ...@@ -49,8 +49,15 @@ struct brtnode {
unsigned int nodesize; unsigned int nodesize;
unsigned int flags; unsigned int flags;
DISKOFF thisnodename; // The size of the node allocated on disk. Not all is necessarily in use. DISKOFF thisnodename; // The size of the node allocated on disk. Not all is necessarily in use.
LSN disk_lsn; // The LSN as of the most recent version on disk. // These two LSNs are used to decide when to make a copy of a node instead of overwriting it.
LSN log_lsn; // The LSN as of the most recent log write. // In the TOKULOGGER is a field called checkpoint_lsn which is the lsn of the most recent checkpoint
LSN disk_lsn; // The LSN as of the most recent version on disk. (Updated by brt-serialize) This lsn is saved in the node.
LSN log_lsn; // The LSN of the youngest log entry that affects the current in-memory state. The log write may not have actually made it to disk. This lsn is not saved in disk (since the two lsns are the same for any node not in main memory.)
// The checkpointing works as follows:
// When we unpin a node: if it is dirty and disk_lsn<checkpoint_lsn then we need to make a new copy.
// When we checkpoint: Create a checkpoint record, and cause every dirty node to be written to disk. The new checkpoint record is *not* incorporated into the disk_lsn of the written nodes.
// While we are checkpointing, someone may modify a dirty node that has not yet been written. In that case, when we unpin the node, we make the new copy (because the disk_lsn<checkpoint_lsn), just as we would usually.
//
int layout_version; // What version of the data structure? int layout_version; // What version of the data structure?
int height; /* height is always >= 0. 0 for leaf, >0 for nonleaf. */ int height; /* height is always >= 0. 0 for leaf, >0 for nonleaf. */
u_int32_t rand4fingerprint; u_int32_t rand4fingerprint;
......
...@@ -79,35 +79,8 @@ static void brt_update_cursors_leaf_split(BRT t, BRTNODE oldnode, BRTNODE newnod ...@@ -79,35 +79,8 @@ static void brt_update_cursors_leaf_split(BRT t, BRTNODE oldnode, BRTNODE newnod
static void brt_update_cursors_nonleaf_expand(BRT t, BRTNODE oldnode, int childnum, BRTNODE left, BRTNODE right, struct kv_pair *splitk); static void brt_update_cursors_nonleaf_expand(BRT t, BRTNODE oldnode, int childnum, BRTNODE left, BRTNODE right, struct kv_pair *splitk);
static void brt_update_cursors_nonleaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right); static void brt_update_cursors_nonleaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right);
static void fix_up_parent_pointers_of_children (BRT t, BRTNODE node) { static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE child, BRT brt, TOKUTXN txn) {
int i; u_int32_t old_fingerprint = BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node,childnum_of_node);
assert(node->height>0);
for (i=0; i<node->u.n.n_children; i++) {
void *v;
int r = toku_cachetable_maybe_get_and_pin(t->cf, BRTNODE_CHILD_DISKOFF(node, i), &v);
if (r==0) {
BRTNODE child = v;
//printf("%s:%d pin %p\n", __FILE__, __LINE__, v);
r=toku_cachetable_unpin(t->cf, BRTNODE_CHILD_DISKOFF(node, i), child->dirty, brtnode_size(child));
}
}
}
static void fix_up_parent_pointers_of_children_now_that_parent_is_gone (CACHEFILE cf, BRTNODE node) {
int i;
if (node->height==0) return;
for (i=0; i<node->u.n.n_children; i++) {
void *v;
int r = toku_cachetable_maybe_get_and_pin(cf, BRTNODE_CHILD_DISKOFF(node, i), &v);
if (r==0) {
BRTNODE child = v;
//printf("%s:%d pin %p\n", __FILE__, __LINE__, v);
r=toku_cachetable_unpin(cf, BRTNODE_CHILD_DISKOFF(node, i), child->dirty, brtnode_size(child));
}
}
}
static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE child) {
u_int32_t sum = child->local_fingerprint; u_int32_t sum = child->local_fingerprint;
if (child->height>0) { if (child->height>0) {
int i; int i;
...@@ -119,6 +92,7 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE ...@@ -119,6 +92,7 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE
// We only call this function if we have reason to believe that the child's fingerprint did change. // We only call this function if we have reason to believe that the child's fingerprint did change.
BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node,childnum_of_node)=sum; BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node,childnum_of_node)=sum;
node->dirty=1; node->dirty=1;
toku_log_changechildfingerprint(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), node->thisnodename, childnum_of_node, old_fingerprint, sum);
} }
static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck) { static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck) {
...@@ -147,7 +121,6 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *b ...@@ -147,7 +121,6 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *b
printf("\n"); printf("\n");
} }
//if (modified_lsn.lsn > brtnode->lsn.lsn) brtnode->lsn=modified_lsn; //if (modified_lsn.lsn > brtnode->lsn.lsn) brtnode->lsn=modified_lsn;
fix_up_parent_pointers_of_children_now_that_parent_is_gone(cachefile, brtnode);
assert(brtnode->thisnodename==nodename); assert(brtnode->thisnodename==nodename);
//printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]); //printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]);
if (write_me) { if (write_me) {
...@@ -165,9 +138,10 @@ int toku_brtnode_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **b ...@@ -165,9 +138,10 @@ int toku_brtnode_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **b
BRTNODE *result=(BRTNODE*)brtnode_pv; BRTNODE *result=(BRTNODE*)brtnode_pv;
int r = toku_deserialize_brtnode_from(toku_cachefile_fd(cachefile), nodename, result, t->flags, t->nodesize, int r = toku_deserialize_brtnode_from(toku_cachefile_fd(cachefile), nodename, result, t->flags, t->nodesize,
t->compare_fun, t->dup_compare, t->db, toku_cachefile_filenum(t->cf)); t->compare_fun, t->dup_compare, t->db, toku_cachefile_filenum(t->cf));
if (r == 0) if (r == 0) {
*sizep = brtnode_size(*result); *sizep = brtnode_size(*result);
*written_lsn = (*result)->disk_lsn; *written_lsn = (*result)->disk_lsn;
}
//(*result)->parent_brtnode = 0; /* Don't know it right now. */ //(*result)->parent_brtnode = 0; /* Don't know it right now. */
//printf("%s:%d installed %p (offset=%lld)\n", __FILE__, __LINE__, *result, nodename); //printf("%s:%d installed %p (offset=%lld)\n", __FILE__, __LINE__, *result, nodename);
return r; return r;
...@@ -217,7 +191,14 @@ int toku_unpin_brt_header (BRT brt) { ...@@ -217,7 +191,14 @@ int toku_unpin_brt_header (BRT brt) {
brt->h=0; brt->h=0;
return r; return r;
} }
static int unpin_brtnode (BRT brt, BRTNODE node, TOKUTXN txn) {
if (node->dirty && txn) {
// For now just update the log_lsn. Later we'll have to deal with the checksums.
node->log_lsn = toku_txn_get_last_lsn(txn);
//if (node->log_lsn.lsn>33320) printf("%s:%d node%lld lsn=%lld\n", __FILE__, __LINE__, node->thisnodename, node->log_lsn.lsn);
}
return toku_cachetable_unpin(brt->cf, node->thisnodename, node->dirty, brtnode_size(node));
}
typedef struct kvpair { typedef struct kvpair {
bytevec key; bytevec key;
...@@ -462,9 +443,6 @@ static void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nod ...@@ -462,9 +443,6 @@ static void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nod
} }
assert(node->u.n.totalchildkeylens==0); assert(node->u.n.totalchildkeylens==0);
fix_up_parent_pointers_of_children(t, A);
fix_up_parent_pointers_of_children(t, B);
//verify_local_fingerprint_nonleaf(A); //verify_local_fingerprint_nonleaf(A);
//verify_local_fingerprint_nonleaf(B); //verify_local_fingerprint_nonleaf(B);
} }
...@@ -549,7 +527,7 @@ static int push_brt_cmd_down_only_if_it_wont_push_more_else_put_here (BRT t, BRT ...@@ -549,7 +527,7 @@ static int push_brt_cmd_down_only_if_it_wont_push_more_else_put_here (BRT t, BRT
} else { } else {
r=insert_to_buffer_in_nonleaf(node, childnum_of_node, k, v, cmd->type); r=insert_to_buffer_in_nonleaf(node, childnum_of_node, k, v, cmd->type);
} }
fixup_child_fingerprint(node, childnum_of_node, child); fixup_child_fingerprint(node, childnum_of_node, child, t, txn);
return r; return r;
} }
...@@ -585,10 +563,10 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum ...@@ -585,10 +563,10 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum
node->dirty = 1; node->dirty = 1;
} }
if (*child_did_split) { if (*child_did_split) {
fixup_child_fingerprint(node, childnum, *childa); fixup_child_fingerprint(node, childnum, *childa, t, txn);
fixup_child_fingerprint(node, childnum+1, *childb); fixup_child_fingerprint(node, childnum+1, *childb, t, txn);
} else { } else {
fixup_child_fingerprint(node, childnum, child); fixup_child_fingerprint(node, childnum, child, t, txn);
} }
return 0; return 0;
} }
...@@ -641,8 +619,8 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -641,8 +619,8 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
BRTNODE_CHILD_DISKOFF(node, childnum) = childa->thisnodename; BRTNODE_CHILD_DISKOFF(node, childnum) = childa->thisnodename;
BRTNODE_CHILD_DISKOFF(node, childnum+1) = childb->thisnodename; BRTNODE_CHILD_DISKOFF(node, childnum+1) = childb->thisnodename;
node->u.n.n_cursors[childnum+1] = 0; node->u.n.n_cursors[childnum+1] = 0;
fixup_child_fingerprint(node, childnum, childa); fixup_child_fingerprint(node, childnum, childa, t, txn);
fixup_child_fingerprint(node, childnum+1, childb); fixup_child_fingerprint(node, childnum+1, childb, t, txn);
r=toku_fifo_create(&node->u.n.buffers[childnum]); assert(r==0); // ??? SHould handle this error case r=toku_fifo_create(&node->u.n.buffers[childnum]); assert(r==0); // ??? SHould handle this error case
r=toku_fifo_create(&node->u.n.buffers[childnum+1]); assert(r==0); r=toku_fifo_create(&node->u.n.buffers[childnum+1]); assert(r==0);
node->u.n.n_bytes_in_buffer[childnum] = 0; node->u.n.n_bytes_in_buffer[childnum] = 0;
...@@ -706,9 +684,9 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -706,9 +684,9 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
toku_verify_counts(childa); toku_verify_counts(childa);
toku_verify_counts(childb); toku_verify_counts(childb);
r=toku_cachetable_unpin(t->cf, childa->thisnodename, childa->dirty, brtnode_size(childa)); r=unpin_brtnode(t, childa, txn);
assert(r==0); assert(r==0);
r=toku_cachetable_unpin(t->cf, childb->thisnodename, childb->dirty, brtnode_size(childb)); r=unpin_brtnode(t, childb, txn);
assert(r==0); assert(r==0);
if (node->u.n.n_children>TREE_FANOUT) { if (node->u.n.n_children>TREE_FANOUT) {
...@@ -826,7 +804,7 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum, ...@@ -826,7 +804,7 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
if (debug) printf("%s:%d %*sdone push_some_brt_cmds_down, unpinning %lld\n", __FILE__, __LINE__, debug, "", targetchild); if (debug) printf("%s:%d %*sdone push_some_brt_cmds_down, unpinning %lld\n", __FILE__, __LINE__, debug, "", targetchild);
assert(toku_serialize_brtnode_size(node)<=node->nodesize); assert(toku_serialize_brtnode_size(node)<=node->nodesize);
//verify_local_fingerprint_nonleaf(node); //verify_local_fingerprint_nonleaf(node);
r=toku_cachetable_unpin(t->cf, targetchild, child->dirty, brtnode_size(child)); r=unpin_brtnode(t, child, txn);
if (r!=0) return r; if (r!=0) return r;
*did_split=0; *did_split=0;
return 0; return 0;
...@@ -992,7 +970,7 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -992,7 +970,7 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD *cmd,
&child_did_split, &childa, &childb, &childsplitk, debug, txn); &child_did_split, &childa, &childb, &childsplitk, debug, txn);
if (r != 0) { if (r != 0) {
/* putting to the child failed for some reason, so unpin the child and return the error code */ /* putting to the child failed for some reason, so unpin the child and return the error code */
int rr = toku_cachetable_unpin(t->cf, child->thisnodename, child->dirty, brtnode_size(child)); int rr = unpin_brtnode(t, child, txn);
assert(rr == 0); assert(rr == 0);
return r; return r;
} }
...@@ -1006,8 +984,8 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -1006,8 +984,8 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD *cmd,
assert(r == 0); assert(r == 0);
} else { } else {
//verify_local_fingerprint_nonleaf(child); //verify_local_fingerprint_nonleaf(child);
fixup_child_fingerprint(node, childnum, child); fixup_child_fingerprint(node, childnum, child, t, txn);
int rr = toku_cachetable_unpin(t->cf, child->thisnodename, child->dirty, brtnode_size(child)); int rr = unpin_brtnode(t, child, txn);
assert(rr == 0); assert(rr == 0);
} }
return r; return r;
...@@ -1279,7 +1257,7 @@ static int setup_brt_root_node (BRT t, DISKOFF offset, TOKUTXN txn) { ...@@ -1279,7 +1257,7 @@ static int setup_brt_root_node (BRT t, DISKOFF offset, TOKUTXN txn) {
node->log_lsn = toku_txn_get_last_lsn(txn); node->log_lsn = toku_txn_get_last_lsn(txn);
//fprintf(stderr, "%s:%d last lsn=%" PRId64 "\n", __FILE__, __LINE__, node->log_lsn.lsn); //fprintf(stderr, "%s:%d last lsn=%" PRId64 "\n", __FILE__, __LINE__, node->log_lsn.lsn);
} }
r=toku_cachetable_unpin(t->cf, node->thisnodename, node->dirty, brtnode_size(node)); r=unpin_brtnode(t, node, txn);
if (r!=0) { if (r!=0) {
toku_free(node); toku_free(node);
return r; return r;
...@@ -1591,7 +1569,7 @@ CACHEKEY* toku_calculate_root_offset_pointer (BRT brt) { ...@@ -1591,7 +1569,7 @@ CACHEKEY* toku_calculate_root_offset_pointer (BRT brt) {
abort(); abort();
} }
static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *rootp, TOKUTXN txn) { static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *rootp, TOKUTXN txn, BRTNODE *newrootp) {
TAGMALLOC(BRTNODE, newroot); TAGMALLOC(BRTNODE, newroot);
int r; int r;
int new_height = nodea->height+1; int new_height = nodea->height+1;
...@@ -1618,8 +1596,6 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, ...@@ -1618,8 +1596,6 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
newroot->u.n.totalchildkeylens=splitk.size; newroot->u.n.totalchildkeylens=splitk.size;
newroot->u.n.children[0]=nodea->thisnodename; newroot->u.n.children[0]=nodea->thisnodename;
newroot->u.n.children[1]=nodeb->thisnodename; newroot->u.n.children[1]=nodeb->thisnodename;
fixup_child_fingerprint(newroot, 0, nodea);
fixup_child_fingerprint(newroot, 1, nodeb);
r=toku_fifo_create(&newroot->u.n.buffers[0]); if (r!=0) return r; r=toku_fifo_create(&newroot->u.n.buffers[0]); if (r!=0) return r;
r=toku_fifo_create(&newroot->u.n.buffers[1]); if (r!=0) return r; r=toku_fifo_create(&newroot->u.n.buffers[1]); if (r!=0) return r;
toku_verify_counts(newroot); toku_verify_counts(newroot);
...@@ -1635,6 +1611,8 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, ...@@ -1635,6 +1611,8 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
if (r!=0) return r; if (r!=0) return r;
r=toku_log_setchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 1, nodeb->thisnodename); r=toku_log_setchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 1, nodeb->thisnodename);
if (r!=0) return r; if (r!=0) return r;
fixup_child_fingerprint(newroot, 0, nodea, brt, txn);
fixup_child_fingerprint(newroot, 1, nodeb, brt, txn);
{ {
BYTESTRING bs; BYTESTRING bs;
bs.len = splitk.size; bs.len = splitk.size;
...@@ -1642,14 +1620,15 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, ...@@ -1642,14 +1620,15 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
r=toku_log_setpivot(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs); r=toku_log_setpivot(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs);
if (r!=0) return r; if (r!=0) return r;
} }
r=toku_cachetable_unpin(brt->cf, nodea->thisnodename, nodea->dirty, brtnode_size(nodea)); r=unpin_brtnode(brt, nodea, txn);
if (r!=0) return r; if (r!=0) return r;
r=toku_cachetable_unpin(brt->cf, nodeb->thisnodename, nodeb->dirty, brtnode_size(nodeb)); r=unpin_brtnode(brt, nodeb, txn);
if (r!=0) return r; if (r!=0) return r;
//printf("%s:%d put %lld\n", __FILE__, __LINE__, brt->root); //printf("%s:%d put %lld\n", __FILE__, __LINE__, brt->root);
toku_cachetable_put(brt->cf, newroot_diskoff, newroot, brtnode_size(newroot), toku_cachetable_put(brt->cf, newroot_diskoff, newroot, brtnode_size(newroot),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt); toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
brt_update_cursors_new_root(brt, newroot, nodea, nodeb); brt_update_cursors_new_root(brt, newroot, nodea, nodeb);
*newrootp = newroot;
return 0; return 0;
} }
...@@ -1683,25 +1662,20 @@ static int brt_root_put_cmd(BRT brt, BRT_CMD *cmd, TOKUTXN txn) { ...@@ -1683,25 +1662,20 @@ static int brt_root_put_cmd(BRT brt, BRT_CMD *cmd, TOKUTXN txn) {
txn); txn);
if (debug) printf("%s:%d did_insert\n", __FILE__, __LINE__); if (debug) printf("%s:%d did_insert\n", __FILE__, __LINE__);
if (did_split) { if (did_split) {
// node is unpinned, so now we have to proceed to update the root with a new node.
//printf("%s:%d did_split=%d nodeb=%p nodeb->thisnodename=%lld nodeb->nodesize=%d\n", __FILE__, __LINE__, did_split, nodeb, nodeb->thisnodename, nodeb->nodesize); //printf("%s:%d did_split=%d nodeb=%p nodeb->thisnodename=%lld nodeb->nodesize=%d\n", __FILE__, __LINE__, did_split, nodeb, nodeb->thisnodename, nodeb->nodesize);
//printf("Did split, splitkey=%s\n", splitkey); //printf("Did split, splitkey=%s\n", splitkey);
if (nodeb->height>0) assert(nodeb->u.n.children[nodeb->u.n.n_children-1]!=0); if (nodeb->height>0) assert(nodeb->u.n.children[nodeb->u.n.n_children-1]!=0);
assert(nodeb->nodesize>0); assert(nodeb->nodesize>0);
} r = brt_init_new_root(brt, nodea, nodeb, splitk, rootp, txn, &node);
int dirty;
long size;
if (did_split) {
r = brt_init_new_root(brt, nodea, nodeb, splitk, rootp, txn);
assert(r == 0); assert(r == 0);
dirty = 1;
size = 0;
} else { } else {
if (node->height>0) if (node->height>0)
assert(node->u.n.n_children<=TREE_FANOUT); assert(node->u.n.n_children<=TREE_FANOUT);
dirty = node->dirty;
size = brtnode_size(node);
} }
toku_cachetable_unpin(brt->cf, *rootp, dirty, size); r = unpin_brtnode(brt, node, txn);
assert(r==0);
r = toku_unpin_brt_header(brt); r = toku_unpin_brt_header(brt);
assert(r == 0); assert(r == 0);
//assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable)); //assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable));
...@@ -1907,9 +1881,10 @@ static void brt_flush_child(BRT t, BRTNODE node, int childnum, BRT_CURSOR cursor ...@@ -1907,9 +1881,10 @@ static void brt_flush_child(BRT t, BRTNODE node, int childnum, BRT_CURSOR cursor
if (0) printf("child_did_split %lld %lld\n", childa->thisnodename, childb->thisnodename); if (0) printf("child_did_split %lld %lld\n", childa->thisnodename, childb->thisnodename);
if (i == 0) { if (i == 0) {
CACHEKEY *rootp = toku_calculate_root_offset_pointer(t); CACHEKEY *rootp = toku_calculate_root_offset_pointer(t);
r = brt_init_new_root(t, childa, childb, child_splitk, rootp, txn); BRTNODE newnode;
r = brt_init_new_root(t, childa, childb, child_splitk, rootp, txn, &newnode);
assert(r == 0); assert(r == 0);
r = toku_cachetable_unpin(t->cf, *rootp, CACHETABLE_DIRTY, 0); r = unpin_brtnode(t, newnode, txn);
assert(r == 0); assert(r == 0);
} else { } else {
BRTNODE upnode; BRTNODE upnode;
......
...@@ -52,7 +52,7 @@ void dump_node (int f, DISKOFF off, struct brt_header *h) { ...@@ -52,7 +52,7 @@ void dump_node (int f, DISKOFF off, struct brt_header *h) {
printf(" flags =%u\n", n->flags); printf(" flags =%u\n", n->flags);
printf(" thisnodename=%lld\n", n->thisnodename); printf(" thisnodename=%lld\n", n->thisnodename);
printf(" disk_lsn =%lld\n", n->disk_lsn.lsn); printf(" disk_lsn =%lld\n", n->disk_lsn.lsn);
printf(" log_lsn =%lld\n", n->log_lsn.lsn); //printf(" log_lsn =%lld\n", n->log_lsn.lsn); // The log_lsn is a memory-only value.
printf(" height =%d\n", n->height); printf(" height =%d\n", n->height);
printf(" rand4fp =%08x\n", n->rand4fingerprint); printf(" rand4fp =%08x\n", n->rand4fingerprint);
printf(" localfp =%08x\n", n->local_fingerprint); printf(" localfp =%08x\n", n->local_fingerprint);
......
...@@ -415,8 +415,9 @@ int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, void**value, ...@@ -415,8 +415,9 @@ int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, void**value,
int r; int r;
LSN written_lsn; LSN written_lsn;
WHEN_TRACE_CT(printf("%s:%d CT: fetch_callback(%lld...)\n", __FILE__, __LINE__, key)); WHEN_TRACE_CT(printf("%s:%d CT: fetch_callback(%lld...)\n", __FILE__, __LINE__, key));
if ((r=fetch_callback(cachefile, key, &toku_value, &size, extraargs, &written_lsn))) if ((r=fetch_callback(cachefile, key, &toku_value, &size, extraargs, &written_lsn))) {
return r; return r;
}
cachetable_insert_at(cachefile, hashit(t,key), key, toku_value, size, flush_callback, fetch_callback, extraargs, 0, written_lsn); cachetable_insert_at(cachefile, hashit(t,key), key, toku_value, size, flush_callback, fetch_callback, extraargs, 0, written_lsn);
*value = toku_value; *value = toku_value;
if (sizep) if (sizep)
......
...@@ -59,7 +59,14 @@ const struct logtype logtypes[] = { ...@@ -59,7 +59,14 @@ const struct logtype logtypes[] = {
{"u_int32_t", "height", 0}, {"u_int32_t", "height", 0},
{"u_int32_t", "nodesize", 0}, {"u_int32_t", "nodesize", 0},
{"u_int8_t", "is_dup_sort", 0}, {"u_int8_t", "is_dup_sort", 0},
{"u_int32_t", "rand4fingerprint", 0}, {"u_int32_t", "rand4fingerprint", "%08x"},
NULLFIELD}},
{"changechildfingerprint", 'f', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0},
{"u_int32_t", "oldfingerprint", "%08x"},
{"u_int32_t", "newfingerprint", "%08x"},
NULLFIELD}}, NULLFIELD}},
{"changeunnamedroot", 'u', FA{{"TXNID", "txnid", 0}, {"changeunnamedroot", 'u', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0}, {"FILENUM", "filenum", 0},
......
...@@ -169,7 +169,8 @@ void toku_recover_newbrtnode (struct logtype_newbrtnode *c) { ...@@ -169,7 +169,8 @@ void toku_recover_newbrtnode (struct logtype_newbrtnode *c) {
TAGMALLOC(BRTNODE, n); TAGMALLOC(BRTNODE, n);
n->nodesize = c->nodesize; n->nodesize = c->nodesize;
n->thisnodename = c->diskoff; n->thisnodename = c->diskoff;
n->log_lsn = n->disk_lsn = c->lsn; printf("%s:%d %p->disk_lsn=%"PRId64"\n", __FILE__, __LINE__, n, n->disk_lsn.lsn); n->log_lsn = n->disk_lsn = c->lsn;
//printf("%s:%d %p->disk_lsn=%"PRId64"\n", __FILE__, __LINE__, n, n->disk_lsn.lsn);
n->layout_version = 1; n->layout_version = 1;
n->height = c->height; n->height = c->height;
n->rand4fingerprint = c->rand4fingerprint; n->rand4fingerprint = c->rand4fingerprint;
...@@ -285,6 +286,23 @@ void toku_recover_setpivot (struct logtype_setpivot *le) { ...@@ -285,6 +286,23 @@ void toku_recover_setpivot (struct logtype_setpivot *le) {
toku_free(le->pivotkey.data); toku_free(le->pivotkey.data);
} }
void toku_recover_changechildfingerprint (struct logtype_changechildfingerprint *le) {
struct cf_pair *pair;
int r = find_cachefile(le->filenum, &pair);
assert(r==0);
void *node_v;
assert(pair->brt);
r = toku_cachetable_get_and_pin(pair->cf, le->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->height>0);
BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, le->childnum) = le->newfingerprint;
r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
}
int toku_rollback_changechildfingerprint (struct logtype_changechildfingerprint *le, TOKUTXN txn) ABORTIT
void toku_recover_fopen (struct logtype_fopen *c) { void toku_recover_fopen (struct logtype_fopen *c) {
char *fname = fixup_fname(&c->fname); char *fname = fixup_fname(&c->fname);
CACHEFILE cf; CACHEFILE cf;
...@@ -465,3 +483,4 @@ void toku_recover_changeunusedmemory (struct logtype_changeunusedmemory *le) { ...@@ -465,3 +483,4 @@ void toku_recover_changeunusedmemory (struct logtype_changeunusedmemory *le) {
r = toku_unpin_brt_header(pair->brt); r = toku_unpin_brt_header(pair->brt);
} }
int toku_rollback_changeunusedmemory (struct logtype_changeunusedmemory *le, TOKUTXN txn) ABORTIT int toku_rollback_changeunusedmemory (struct logtype_changeunusedmemory *le, TOKUTXN txn) ABORTIT
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment