Commit b0c5a7cb authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Make {{{src/tests/test_dup_search}}} work. Addresses #1195.

git-svn-id: file:///svn/toku/tokudb.1195@7620 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7466b31c
...@@ -280,17 +280,21 @@ fixup_child_fingerprint (BRTNODE node, int childnum_of_node, BRTNODE child, BRT ...@@ -280,17 +280,21 @@ fixup_child_fingerprint (BRTNODE node, int childnum_of_node, BRTNODE child, BRT
node->dirty=1; node->dirty=1;
} }
static void static inline void
verify_local_fingerprint_nonleaf (BRTNODE node) verify_local_fingerprint_nonleaf (BRTNODE node)
{ {
if (0) {
static int count=0; count++;
u_int32_t fp=0; u_int32_t fp=0;
int i; int i;
if (node->height==0) return; if (node->height==0) return;
for (i=0; i<node->u.n.n_children; i++) for (i=0; i<node->u.n.n_children; i++)
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid, FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
fp += node->rand4fingerprint * toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen); fp += toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen);
); );
fp *= node->rand4fingerprint;
assert(fp==node->local_fingerprint); assert(fp==node->local_fingerprint);
}
} }
static inline void static inline void
...@@ -353,6 +357,7 @@ int toku_unpin_brtnode (BRT brt, BRTNODE node) { ...@@ -353,6 +357,7 @@ int toku_unpin_brtnode (BRT brt, BRTNODE node) {
// node->log_lsn = toku_txn_get_last_lsn(txn); // node->log_lsn = toku_txn_get_last_lsn(txn);
// //if (node->log_lsn.lsn>33320) printf("%s:%d node%lld lsn=%lld\n", __FILE__, __LINE__, node->thisnodename, node->log_lsn.lsn); // //if (node->log_lsn.lsn>33320) printf("%s:%d node%lld lsn=%lld\n", __FILE__, __LINE__, node->thisnodename, node->log_lsn.lsn);
// } // }
verify_local_fingerprint_nonleaf(node);
VERIFY_NODE(brt,node); VERIFY_NODE(brt,node);
return toku_cachetable_unpin(brt->cf, node->thisnodename, node->fullhash, node->dirty, brtnode_memory_size(node)); return toku_cachetable_unpin(brt->cf, node->thisnodename, node->fullhash, node->dirty, brtnode_memory_size(node));
} }
...@@ -638,8 +643,8 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r ...@@ -638,8 +643,8 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r
BNC_SUBTREE_FINGERPRINT(newroot, 1)=0; BNC_SUBTREE_FINGERPRINT(newroot, 1)=0;
BNC_SUBTREE_LEAFENTRY_ESTIMATE(newroot, 0)=0; BNC_SUBTREE_LEAFENTRY_ESTIMATE(newroot, 0)=0;
BNC_SUBTREE_LEAFENTRY_ESTIMATE(newroot, 1)=0; BNC_SUBTREE_LEAFENTRY_ESTIMATE(newroot, 1)=0;
//verify_local_fingerprint_nonleaf(nodea); verify_local_fingerprint_nonleaf(nodea);
//verify_local_fingerprint_nonleaf(nodeb); verify_local_fingerprint_nonleaf(nodeb);
r=toku_log_newbrtnode(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, new_height, new_nodesize, (unsigned char)((brt->flags&TOKU_DB_DUPSORT)!=0), newroot->rand4fingerprint); r=toku_log_newbrtnode(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, new_height, new_nodesize, (unsigned char)((brt->flags&TOKU_DB_DUPSORT)!=0), newroot->rand4fingerprint);
if (r!=0) return r; if (r!=0) return r;
r=toku_log_addchild(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, nodea->thisnodename, 0); r=toku_log_addchild(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, nodea->thisnodename, 0);
...@@ -860,7 +865,7 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl ...@@ -860,7 +865,7 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
BNC_SUBTREE_LEAFENTRY_ESTIMATE(B,i)=0; BNC_SUBTREE_LEAFENTRY_ESTIMATE(B,i)=0;
} }
//verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
for (i=n_children_in_a; i<old_n_children; i++) { for (i=n_children_in_a; i<old_n_children; i++) {
...@@ -949,8 +954,8 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl ...@@ -949,8 +954,8 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
REALLOC_N(n_children_in_a+1, node->u.n.childinfos); REALLOC_N(n_children_in_a+1, node->u.n.childinfos);
REALLOC_N(n_children_in_a, node->u.n.childkeys); REALLOC_N(n_children_in_a, node->u.n.childkeys);
//verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
//verify_local_fingerprint_nonleaf(B); verify_local_fingerprint_nonleaf(B);
} }
node->dirty = 1; node->dirty = 1;
...@@ -998,7 +1003,7 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -998,7 +1003,7 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
node->dirty = 1; node->dirty = 1;
//verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
XREALLOC_N(node->u.n.n_children+2, node->u.n.childinfos); XREALLOC_N(node->u.n.n_children+2, node->u.n.childinfos);
XREALLOC_N(node->u.n.n_children+1, node->u.n.childkeys); XREALLOC_N(node->u.n.n_children+1, node->u.n.childkeys);
...@@ -1021,14 +1026,14 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -1021,14 +1026,14 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
fixup_child_fingerprint(node, childnum, childa, t, logger); fixup_child_fingerprint(node, childnum, childa, t, logger);
fixup_child_fingerprint(node, childnum+1, childb, t, logger); fixup_child_fingerprint(node, childnum+1, childb, t, logger);
r=toku_fifo_create(&BNC_BUFFER(node,childnum+1)); assert(r==0); r=toku_fifo_create(&BNC_BUFFER(node,childnum+1)); assert(r==0);
//verify_local_fingerprint_nonleaf(node); // The fingerprint hasn't changed and everhything is still there. verify_local_fingerprint_nonleaf(node); // The fingerprint hasn't changed and everhything is still there.
r=toku_fifo_create(&BNC_BUFFER(node,childnum)); assert(r==0); // ??? SHould handle this error case r=toku_fifo_create(&BNC_BUFFER(node,childnum)); assert(r==0); // ??? SHould handle this error case
BNC_NBYTESINBUF(node, childnum) = 0; BNC_NBYTESINBUF(node, childnum) = 0;
BNC_NBYTESINBUF(node, childnum+1) = 0; BNC_NBYTESINBUF(node, childnum+1) = 0;
// Remove all the cmds from the local fingerprint. Some may get added in again when we try to push to the child. // Remove all the cmds from the local fingerprint. Some may get added in again when we try to push to the child.
//verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
// Slide the keys over // Slide the keys over
{ {
...@@ -1055,16 +1060,16 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -1055,16 +1060,16 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
} }
) )
//verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
node->u.n.n_bytes_in_buffers -= old_count; /* By default, they are all removed. We might add them back in. */ node->u.n.n_bytes_in_buffers -= old_count; /* By default, they are all removed. We might add them back in. */
/* Keep pushing to the children, but not if the children would require a pushdown */ /* Keep pushing to the children, but not if the children would require a pushdown */
toku_fifo_free(&old_h); toku_fifo_free(&old_h);
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childa);
//verify_local_fingerprint_nonleaf(childb); verify_local_fingerprint_nonleaf(childb);
//verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
VERIFY_NODE(t, node); VERIFY_NODE(t, node);
VERIFY_NODE(t, childa); VERIFY_NODE(t, childa);
...@@ -1604,6 +1609,8 @@ static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int chil ...@@ -1604,6 +1609,8 @@ static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int chil
enum reactivity re_array[], BOOL *did_io) enum reactivity re_array[], BOOL *did_io)
{ {
verify_local_fingerprint_nonleaf(node);
// if the fifo is empty and the child is in main memory and the child isn't gorged, then put it in the child // if the fifo is empty and the child is in main memory and the child isn't gorged, then put it in the child
if (BNC_NBYTESINBUF(node, childnum) == 0) { if (BNC_NBYTESINBUF(node, childnum) == 0) {
BLOCKNUM childblocknum = BNC_BLOCKNUM(node, childnum); BLOCKNUM childblocknum = BNC_BLOCKNUM(node, childnum);
...@@ -1617,11 +1624,19 @@ static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int chil ...@@ -1617,11 +1624,19 @@ static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int chil
// The child is in main memory. // The child is in main memory.
BRTNODE child = child_v; BRTNODE child = child_v;
verify_local_fingerprint_nonleaf(child);
r = brtnode_put_cmd (t, child, cmd, logger, &re_array[childnum], did_io); r = brtnode_put_cmd (t, child, cmd, logger, &re_array[childnum], did_io);
fixup_child_fingerprint(node, childnum, child, t, logger); fixup_child_fingerprint(node, childnum, child, t, logger);
VERIFY_NODE(t, node); VERIFY_NODE(t, node);
verify_local_fingerprint_nonleaf(child);
int rr = toku_unpin_brtnode(t, child); int rr = toku_unpin_brtnode(t, child);
assert(rr==0); assert(rr==0);
verify_local_fingerprint_nonleaf(node);
return r; return r;
} }
...@@ -1642,6 +1657,8 @@ static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int chil ...@@ -1642,6 +1657,8 @@ static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int chil
node->dirty = 1; node->dirty = 1;
} }
verify_local_fingerprint_nonleaf(node);
return 0; return 0;
} }
...@@ -1675,11 +1692,18 @@ static int brt_nonleaf_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER lo ...@@ -1675,11 +1692,18 @@ static int brt_nonleaf_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER lo
// We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do. // We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do.
// The re_array[i] gets set to reactivity of any modified child. // The re_array[i] gets set to reactivity of any modified child.
{ {
//verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(node);
/* find the right subtree */ /* find the right subtree */
unsigned int childnum = toku_brtnode_which_child(node, cmd->u.id.key, cmd->u.id.val, t); unsigned int childnum = toku_brtnode_which_child(node, cmd->u.id.key, cmd->u.id.val, t);
return brt_nonleaf_cmd_once_to_child (t, node, childnum, cmd, logger, re_array, did_io); int r = brt_nonleaf_cmd_once_to_child (t, node, childnum, cmd, logger, re_array, did_io);
verify_local_fingerprint_nonleaf(node);
return r;
} }
static int static int
...@@ -1737,6 +1761,9 @@ brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, ...@@ -1737,6 +1761,9 @@ brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
// The re_array[i] gets set to the reactivity of any modified child i. (And there may be several such children.) // The re_array[i] gets set to the reactivity of any modified child i. (And there may be several such children.)
// //
{ {
verify_local_fingerprint_nonleaf(node);
switch (cmd->type) { switch (cmd->type) {
case BRT_INSERT: case BRT_INSERT:
case BRT_DELETE_BOTH: case BRT_DELETE_BOTH:
...@@ -1871,6 +1898,7 @@ maybe_merge_pinned_nonleaf_nodes (BRT t, ...@@ -1871,6 +1898,7 @@ maybe_merge_pinned_nonleaf_nodes (BRT t,
BOOL *did_merge, BOOL *did_merge,
struct kv_pair **splitk) struct kv_pair **splitk)
{ {
verify_local_fingerprint_nonleaf(a);
assert(parent_splitk); assert(parent_splitk);
int old_n_children = a->u.n.n_children; int old_n_children = a->u.n.n_children;
int new_n_children = old_n_children + b->u.n.n_children; int new_n_children = old_n_children + b->u.n.n_children;
...@@ -1891,10 +1919,25 @@ maybe_merge_pinned_nonleaf_nodes (BRT t, ...@@ -1891,10 +1919,25 @@ maybe_merge_pinned_nonleaf_nodes (BRT t,
b->u.n.n_children = 0; b->u.n.n_children = 0;
b->u.n.n_bytes_in_buffers = 0; b->u.n.n_bytes_in_buffers = 0;
{
static int count=0; count++;
u_int32_t fp = 0;
int i;
for (i=0; i<a->u.n.n_children; i++)
FIFO_ITERATE(BNC_BUFFER(a,i), key, keylen, data, datalen, type, xid,
fp += toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen);
);
a->local_fingerprint = a->rand4fingerprint * fp;
//printf("%s:%d fp=%u\n", __FILE__, __LINE__, a->local_fingerprint);
verify_local_fingerprint_nonleaf(a);
}
b->local_fingerprint = 0;
fixup_child_fingerprint(parent, childnum_of_parent, a, t, logger); fixup_child_fingerprint(parent, childnum_of_parent, a, t, logger);
// abort(); // don't forget to reuse blocknums // abort(); // don't forget to reuse blocknums
*did_merge = TRUE; *did_merge = TRUE;
*splitk = NULL; *splitk = NULL;
verify_local_fingerprint_nonleaf(a);
return 0; return 0;
} }
...@@ -1920,17 +1963,21 @@ maybe_merge_pinned_nodes (BRT t, ...@@ -1920,17 +1963,21 @@ maybe_merge_pinned_nodes (BRT t,
// splitk (OUT): If the two nodes did not get merged, the new pivot key between the two nodes. // splitk (OUT): If the two nodes did not get merged, the new pivot key between the two nodes.
{ {
assert(a->height == b->height); assert(a->height == b->height);
verify_local_fingerprint_nonleaf(a);
if (a->height == 0) { if (a->height == 0) {
toku_free(parent_splitk); // We don't need the parent_splitk any more. If we need a splitk, we'll malloc a new one. toku_free(parent_splitk); // We don't need the parent_splitk any more. If we need a splitk, we'll malloc a new one.
return maybe_merge_pinned_leaf_nodes(t, a, b, logger, did_merge, splitk); return maybe_merge_pinned_leaf_nodes(t, a, b, logger, did_merge, splitk);
} else { } else {
return maybe_merge_pinned_nonleaf_nodes(t, parent, childnum_of_parent, parent_splitk, a, b, logger, did_merge, splitk); int r = maybe_merge_pinned_nonleaf_nodes(t, parent, childnum_of_parent, parent_splitk, a, b, logger, did_merge, splitk);
verify_local_fingerprint_nonleaf(a);
return r;
} }
} }
static int static int
brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKULOGGER logger, BOOL *did_react) brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKULOGGER logger, BOOL *did_react)
{ {
verify_local_fingerprint_nonleaf(node);
if (node->u.n.n_children < 2) return 0; // if no siblings, we are merged as best we can. if (node->u.n.n_children < 2) return 0; // if no siblings, we are merged as best we can.
int childnuma,childnumb; int childnuma,childnumb;
...@@ -1968,6 +2015,7 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL ...@@ -1968,6 +2015,7 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h); toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h);
if (r!=0) return r; if (r!=0) return r;
childa = childnode_v; childa = childnode_v;
verify_local_fingerprint_nonleaf(childa);
} }
{ {
void *childnode_v; void *childnode_v;
...@@ -1989,7 +2037,9 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL ...@@ -1989,7 +2037,9 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL
struct kv_pair *splitk_kvpair = 0; struct kv_pair *splitk_kvpair = 0;
struct kv_pair *old_split_key = node->u.n.childkeys[childnuma]; struct kv_pair *old_split_key = node->u.n.childkeys[childnuma];
unsigned int deleted_size = toku_brt_pivot_key_len(t, old_split_key); unsigned int deleted_size = toku_brt_pivot_key_len(t, old_split_key);
verify_local_fingerprint_nonleaf(childa);
r = maybe_merge_pinned_nodes(t, node, childnuma, node->u.n.childkeys[childnuma], childa, childb, logger, &did_merge, &splitk_kvpair); r = maybe_merge_pinned_nodes(t, node, childnuma, node->u.n.childkeys[childnuma], childa, childb, logger, &did_merge, &splitk_kvpair);
verify_local_fingerprint_nonleaf(childa);
if (childa->height>0) { int i; for (i=0; i+1<childa->u.n.n_children; i++) assert(childa->u.n.childkeys[i]); } if (childa->height>0) { int i; for (i=0; i+1<childa->u.n.n_children; i++) assert(childa->u.n.childkeys[i]); }
//(toku_verify_counts(childa), toku_verify_estimates(t,childa)); //(toku_verify_counts(childa), toku_verify_estimates(t,childa));
*did_react = did_merge; *did_react = did_merge;
...@@ -1998,6 +2048,8 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL ...@@ -1998,6 +2048,8 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL
node->u.n.totalchildkeylens -= deleted_size; // The key was free()'d inside the maybe_merge_pinned_nodes. node->u.n.totalchildkeylens -= deleted_size; // The key was free()'d inside the maybe_merge_pinned_nodes.
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childa);
if (did_merge) { if (did_merge) {
toku_fifo_free(&BNC_BUFFER(node, childnumb)); toku_fifo_free(&BNC_BUFFER(node, childnumb));
node->u.n.n_children--; node->u.n.n_children--;
...@@ -2011,11 +2063,14 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL ...@@ -2011,11 +2063,14 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL
REALLOC_N(node->u.n.n_children-1, node->u.n.childkeys); REALLOC_N(node->u.n.n_children-1, node->u.n.childkeys);
fixup_child_fingerprint(node, childnuma, childa, t, logger); fixup_child_fingerprint(node, childnuma, childa, t, logger);
assert(node->u.n.childinfos[childnuma].blocknum.b == childa->thisnodename.b); assert(node->u.n.childinfos[childnuma].blocknum.b == childa->thisnodename.b);
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childa);
} else { } else {
assert(splitk_kvpair); assert(splitk_kvpair);
// If we didn't merge the nodes, then we need the correct pivot. // If we didn't merge the nodes, then we need the correct pivot.
node->u.n.childkeys[childnuma] = splitk_kvpair; node->u.n.childkeys[childnuma] = splitk_kvpair;
node->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[childnuma]); node->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[childnuma]);
verify_local_fingerprint_nonleaf(node);
} }
} }
return_r: return_r:
...@@ -2033,6 +2088,7 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL ...@@ -2033,6 +2088,7 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL
if (rra) return rra; if (rra) return rra;
if (rrb) return rrb; if (rrb) return rrb;
} }
verify_local_fingerprint_nonleaf(node);
return r; return r;
} }
...@@ -2151,7 +2207,7 @@ flush_this_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, enum rea ...@@ -2151,7 +2207,7 @@ flush_this_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, enum rea
} }
if (0) printf("%s:%d done random picking\n", __FILE__, __LINE__); if (0) printf("%s:%d done random picking\n", __FILE__, __LINE__);
} }
//verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
return_r: return_r:
fixup_child_fingerprint(node, childnum, child, t, logger); fixup_child_fingerprint(node, childnum, child, t, logger);
{ {
...@@ -2184,14 +2240,18 @@ brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum react ...@@ -2184,14 +2240,18 @@ brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum react
// If we perform I/O then set *did_io to true. // If we perform I/O then set *did_io to true.
// If a nonleaf node becomes overfull then we will flush some child. // If a nonleaf node becomes overfull then we will flush some child.
{ {
//verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(node);
if (node->height==0) { if (node->height==0) {
return brt_leaf_put_cmd(t, node, cmd, logger, re); return brt_leaf_put_cmd(t, node, cmd, logger, re);
} else { } else {
verify_local_fingerprint_nonleaf(node);
enum reactivity child_re[node->u.n.n_children]; enum reactivity child_re[node->u.n.n_children];
{ int i; for (i=0; i<node->u.n.n_children; i++) child_re[i]=RE_STABLE; } { int i; for (i=0; i<node->u.n.n_children; i++) child_re[i]=RE_STABLE; }
int r = brt_nonleaf_put_cmd(t, node, cmd, logger, child_re, did_io); int r = brt_nonleaf_put_cmd(t, node, cmd, logger, child_re, did_io);
if (r!=0) goto return_r; if (r!=0) goto return_r;
verify_local_fingerprint_nonleaf(node);
// Now we may have overfilled node. So we'll flush the heaviest child until we are happy. // Now we may have overfilled node. So we'll flush the heaviest child until we are happy.
while (!*did_io // Don't flush if we've done I/O. while (!*did_io // Don't flush if we've done I/O.
&& nonleaf_node_is_gorged(node) // Don't flush if the node is small enough. && nonleaf_node_is_gorged(node) // Don't flush if the node is small enough.
...@@ -2239,13 +2299,18 @@ static int push_something_at_root (BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT ...@@ -2239,13 +2299,18 @@ static int push_something_at_root (BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT
BOOL did_io = FALSE; BOOL did_io = FALSE;
{ {
int r = brtnode_put_cmd(brt, node, cmd, logger, &re, &did_io); int r = brtnode_put_cmd(brt, node, cmd, logger, &re, &did_io);
verify_local_fingerprint_nonleaf(node);
if (r!=0) return r; if (r!=0) return r;
//if (should_split) printf("%s:%d Pushed something simple, should_split=1\n", __FILE__, __LINE__); //if (should_split) printf("%s:%d Pushed something simple, should_split=1\n", __FILE__, __LINE__);
} }
//printf("%s:%d should_split=%d node_size=%" PRIu64 "\n", __FILE__, __LINE__, should_split, brtnode_memory_size(node)); //printf("%s:%d should_split=%d node_size=%" PRIu64 "\n", __FILE__, __LINE__, should_split, brtnode_memory_size(node));
return brt_handle_maybe_reactive_child_at_root(brt, rootp, nodep, re, logger); {
int r = brt_handle_maybe_reactive_child_at_root(brt, rootp, nodep, re, logger);
verify_local_fingerprint_nonleaf(*nodep);
return r;
}
} }
static void compute_and_fill_remembered_hash (BRT brt, int rootnum) { static void compute_and_fill_remembered_hash (BRT brt, int rootnum) {
...@@ -2327,9 +2392,9 @@ int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) ...@@ -2327,9 +2392,9 @@ int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger)
} }
VERIFY_NODE(brt, node); VERIFY_NODE(brt, node);
//verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
if ((r = push_something_at_root(brt, &node, rootp, cmd, logger))) return r; if ((r = push_something_at_root(brt, &node, rootp, cmd, logger))) return r;
//verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
r = toku_unpin_brtnode(brt, node); r = toku_unpin_brtnode(brt, node);
assert(r == 0); assert(r == 0);
return 0; return 0;
...@@ -3165,6 +3230,8 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s ...@@ -3165,6 +3230,8 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
// If we change the shape, set *did_react = TRUE. Else set *did_react = FALSE. // If we change the shape, set *did_react = TRUE. Else set *did_react = FALSE.
{ {
verify_local_fingerprint_nonleaf(node);
/* if the child's buffer is not empty then empty it */ /* if the child's buffer is not empty then empty it */
if (BNC_NBYTESINBUF(node, childnum) > 0) { if (BNC_NBYTESINBUF(node, childnum) > 0) {
BOOL did_io = FALSE; BOOL did_io = FALSE;
...@@ -3183,9 +3250,13 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s ...@@ -3183,9 +3250,13 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
} }
BRTNODE childnode = node_v; BRTNODE childnode = node_v;
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childnode);
enum reactivity child_re = RE_STABLE; enum reactivity child_re = RE_STABLE;
int r = brt_search_node(brt, childnode, search, newkey, newval, &child_re, logger, omtcursor); int r = brt_search_node(brt, childnode, search, newkey, newval, &child_re, logger, omtcursor);
// Even if r is reactive, we want to handle the maybe reactive child. // Even if r is reactive, we want to handle the maybe reactive child.
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childnode);
{ {
BOOL did_io = FALSE; BOOL did_io = FALSE;
int rr = brt_handle_maybe_reactive_child(brt, node, childnum, child_re, &did_io, logger, did_react); int rr = brt_handle_maybe_reactive_child(brt, node, childnum, child_re, &did_io, logger, did_react);
...@@ -3193,16 +3264,22 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s ...@@ -3193,16 +3264,22 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
} }
{ {
int rr = toku_cachetable_unpin(brt->cf, childnode->thisnodename, childnode->fullhash, childnode->dirty, brtnode_memory_size(childnode)); int rr = toku_unpin_brtnode(brt, childnode);
if (rr!=0) r = rr; if (rr!=0) r = rr;
} }
*parent_re = get_nonleaf_reactivity(node); *parent_re = get_nonleaf_reactivity(node);
verify_local_fingerprint_nonleaf(node);
return r; return r;
} }
static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, OMTCURSOR omtcursor) { static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, OMTCURSOR omtcursor) {
int count=0;
again: again:
count++;
verify_local_fingerprint_nonleaf(node);
{ {
int c; int c;
...@@ -3222,6 +3299,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, ...@@ -3222,6 +3299,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search,
toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)), toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)),
brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) { brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) {
BOOL did_change_shape; BOOL did_change_shape;
verify_local_fingerprint_nonleaf(node);
int r = brt_search_child(brt, node, child[c], search, newkey, newval, re, logger, omtcursor, &did_change_shape); int r = brt_search_child(brt, node, child[c], search, newkey, newval, re, logger, omtcursor, &did_change_shape);
assert(r != EAGAIN); assert(r != EAGAIN);
if (r == 0) return r; if (r == 0) return r;
...@@ -3231,6 +3309,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, ...@@ -3231,6 +3309,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search,
/* check the first (left) or last (right) node if nothing has been found */ /* check the first (left) or last (right) node if nothing has been found */
BOOL did_change_shape; // ignore this BOOL did_change_shape; // ignore this
verify_local_fingerprint_nonleaf(node);
return brt_search_child(brt, node, child[c], search, newkey, newval, re, logger, omtcursor, &did_change_shape); return brt_search_child(brt, node, child[c], search, newkey, newval, re, logger, omtcursor, &did_change_shape);
} }
} }
...@@ -3238,6 +3317,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, ...@@ -3238,6 +3317,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search,
static int static int
brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, OMTCURSOR omtcursor) brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, OMTCURSOR omtcursor)
{ {
verify_local_fingerprint_nonleaf(node);
if (node->height > 0) if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, newkey, newval, re, logger, omtcursor); return brt_search_nonleaf_node(brt, node, search, newkey, newval, re, logger, omtcursor);
else { else {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment