Commit 2b020676 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Make {{{src/tests/test_dup_search}}} work. Addresses #1195.

git-svn-id: file:///svn/toku/tokudb.1195@7620 c7de825b-a66e-492c-adef-691d508d4ae1
parent d0b92303
......@@ -280,17 +280,21 @@ fixup_child_fingerprint (BRTNODE node, int childnum_of_node, BRTNODE child, BRT
node->dirty=1;
}
static void
static inline void
verify_local_fingerprint_nonleaf (BRTNODE node)
{
if (0) {
static int count=0; count++;
u_int32_t fp=0;
int i;
if (node->height==0) return;
for (i=0; i<node->u.n.n_children; i++)
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
fp += node->rand4fingerprint * toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen);
fp += toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen);
);
fp *= node->rand4fingerprint;
assert(fp==node->local_fingerprint);
}
}
static inline void
......@@ -353,6 +357,7 @@ int toku_unpin_brtnode (BRT brt, BRTNODE node) {
// node->log_lsn = toku_txn_get_last_lsn(txn);
// //if (node->log_lsn.lsn>33320) printf("%s:%d node%lld lsn=%lld\n", __FILE__, __LINE__, node->thisnodename, node->log_lsn.lsn);
// }
verify_local_fingerprint_nonleaf(node);
VERIFY_NODE(brt,node);
return toku_cachetable_unpin(brt->cf, node->thisnodename, node->fullhash, node->dirty, brtnode_memory_size(node));
}
......@@ -638,8 +643,8 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r
BNC_SUBTREE_FINGERPRINT(newroot, 1)=0;
BNC_SUBTREE_LEAFENTRY_ESTIMATE(newroot, 0)=0;
BNC_SUBTREE_LEAFENTRY_ESTIMATE(newroot, 1)=0;
//verify_local_fingerprint_nonleaf(nodea);
//verify_local_fingerprint_nonleaf(nodeb);
verify_local_fingerprint_nonleaf(nodea);
verify_local_fingerprint_nonleaf(nodeb);
r=toku_log_newbrtnode(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, new_height, new_nodesize, (unsigned char)((brt->flags&TOKU_DB_DUPSORT)!=0), newroot->rand4fingerprint);
if (r!=0) return r;
r=toku_log_addchild(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, nodea->thisnodename, 0);
......@@ -860,7 +865,7 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
BNC_SUBTREE_LEAFENTRY_ESTIMATE(B,i)=0;
}
//verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(node);
for (i=n_children_in_a; i<old_n_children; i++) {
......@@ -949,8 +954,8 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
REALLOC_N(n_children_in_a+1, node->u.n.childinfos);
REALLOC_N(n_children_in_a, node->u.n.childkeys);
//verify_local_fingerprint_nonleaf(node);
//verify_local_fingerprint_nonleaf(B);
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(B);
}
node->dirty = 1;
......@@ -998,7 +1003,7 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
node->dirty = 1;
//verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(node);
XREALLOC_N(node->u.n.n_children+2, node->u.n.childinfos);
XREALLOC_N(node->u.n.n_children+1, node->u.n.childkeys);
......@@ -1021,14 +1026,14 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
fixup_child_fingerprint(node, childnum, childa, t, logger);
fixup_child_fingerprint(node, childnum+1, childb, t, logger);
r=toku_fifo_create(&BNC_BUFFER(node,childnum+1)); assert(r==0);
//verify_local_fingerprint_nonleaf(node); // The fingerprint hasn't changed and everhything is still there.
verify_local_fingerprint_nonleaf(node); // The fingerprint hasn't changed and everhything is still there.
r=toku_fifo_create(&BNC_BUFFER(node,childnum)); assert(r==0); // ??? SHould handle this error case
BNC_NBYTESINBUF(node, childnum) = 0;
BNC_NBYTESINBUF(node, childnum+1) = 0;
// Remove all the cmds from the local fingerprint. Some may get added in again when we try to push to the child.
//verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(node);
// Slide the keys over
{
......@@ -1055,16 +1060,16 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
}
)
//verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(node);
node->u.n.n_bytes_in_buffers -= old_count; /* By default, they are all removed. We might add them back in. */
/* Keep pushing to the children, but not if the children would require a pushdown */
toku_fifo_free(&old_h);
//verify_local_fingerprint_nonleaf(childa);
//verify_local_fingerprint_nonleaf(childb);
//verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childa);
verify_local_fingerprint_nonleaf(childb);
verify_local_fingerprint_nonleaf(node);
VERIFY_NODE(t, node);
VERIFY_NODE(t, childa);
......@@ -1604,6 +1609,8 @@ static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int chil
enum reactivity re_array[], BOOL *did_io)
{
verify_local_fingerprint_nonleaf(node);
// if the fifo is empty and the child is in main memory and the child isn't gorged, then put it in the child
if (BNC_NBYTESINBUF(node, childnum) == 0) {
BLOCKNUM childblocknum = BNC_BLOCKNUM(node, childnum);
......@@ -1617,11 +1624,19 @@ static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int chil
// The child is in main memory.
BRTNODE child = child_v;
verify_local_fingerprint_nonleaf(child);
r = brtnode_put_cmd (t, child, cmd, logger, &re_array[childnum], did_io);
fixup_child_fingerprint(node, childnum, child, t, logger);
VERIFY_NODE(t, node);
verify_local_fingerprint_nonleaf(child);
int rr = toku_unpin_brtnode(t, child);
assert(rr==0);
verify_local_fingerprint_nonleaf(node);
return r;
}
......@@ -1642,6 +1657,8 @@ static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int chil
node->dirty = 1;
}
verify_local_fingerprint_nonleaf(node);
return 0;
}
......@@ -1675,11 +1692,18 @@ static int brt_nonleaf_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER lo
// We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do.
// The re_array[i] gets set to reactivity of any modified child.
{
//verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(node);
/* find the right subtree */
unsigned int childnum = toku_brtnode_which_child(node, cmd->u.id.key, cmd->u.id.val, t);
return brt_nonleaf_cmd_once_to_child (t, node, childnum, cmd, logger, re_array, did_io);
int r = brt_nonleaf_cmd_once_to_child (t, node, childnum, cmd, logger, re_array, did_io);
verify_local_fingerprint_nonleaf(node);
return r;
}
static int
......@@ -1737,6 +1761,9 @@ brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
// The re_array[i] gets set to the reactivity of any modified child i. (And there may be several such children.)
//
{
verify_local_fingerprint_nonleaf(node);
switch (cmd->type) {
case BRT_INSERT:
case BRT_DELETE_BOTH:
......@@ -1871,6 +1898,7 @@ maybe_merge_pinned_nonleaf_nodes (BRT t,
BOOL *did_merge,
struct kv_pair **splitk)
{
verify_local_fingerprint_nonleaf(a);
assert(parent_splitk);
int old_n_children = a->u.n.n_children;
int new_n_children = old_n_children + b->u.n.n_children;
......@@ -1891,10 +1919,25 @@ maybe_merge_pinned_nonleaf_nodes (BRT t,
b->u.n.n_children = 0;
b->u.n.n_bytes_in_buffers = 0;
{
static int count=0; count++;
u_int32_t fp = 0;
int i;
for (i=0; i<a->u.n.n_children; i++)
FIFO_ITERATE(BNC_BUFFER(a,i), key, keylen, data, datalen, type, xid,
fp += toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen);
);
a->local_fingerprint = a->rand4fingerprint * fp;
//printf("%s:%d fp=%u\n", __FILE__, __LINE__, a->local_fingerprint);
verify_local_fingerprint_nonleaf(a);
}
b->local_fingerprint = 0;
fixup_child_fingerprint(parent, childnum_of_parent, a, t, logger);
// abort(); // don't forget to reuse blocknums
*did_merge = TRUE;
*splitk = NULL;
verify_local_fingerprint_nonleaf(a);
return 0;
}
......@@ -1920,17 +1963,21 @@ maybe_merge_pinned_nodes (BRT t,
// splitk (OUT): If the two nodes did not get merged, the new pivot key between the two nodes.
{
assert(a->height == b->height);
verify_local_fingerprint_nonleaf(a);
if (a->height == 0) {
toku_free(parent_splitk); // We don't need the parent_splitk any more. If we need a splitk, we'll malloc a new one.
return maybe_merge_pinned_leaf_nodes(t, a, b, logger, did_merge, splitk);
} else {
return maybe_merge_pinned_nonleaf_nodes(t, parent, childnum_of_parent, parent_splitk, a, b, logger, did_merge, splitk);
int r = maybe_merge_pinned_nonleaf_nodes(t, parent, childnum_of_parent, parent_splitk, a, b, logger, did_merge, splitk);
verify_local_fingerprint_nonleaf(a);
return r;
}
}
static int
brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKULOGGER logger, BOOL *did_react)
{
verify_local_fingerprint_nonleaf(node);
if (node->u.n.n_children < 2) return 0; // if no siblings, we are merged as best we can.
int childnuma,childnumb;
......@@ -1968,6 +2015,7 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h);
if (r!=0) return r;
childa = childnode_v;
verify_local_fingerprint_nonleaf(childa);
}
{
void *childnode_v;
......@@ -1989,7 +2037,9 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL
struct kv_pair *splitk_kvpair = 0;
struct kv_pair *old_split_key = node->u.n.childkeys[childnuma];
unsigned int deleted_size = toku_brt_pivot_key_len(t, old_split_key);
verify_local_fingerprint_nonleaf(childa);
r = maybe_merge_pinned_nodes(t, node, childnuma, node->u.n.childkeys[childnuma], childa, childb, logger, &did_merge, &splitk_kvpair);
verify_local_fingerprint_nonleaf(childa);
if (childa->height>0) { int i; for (i=0; i+1<childa->u.n.n_children; i++) assert(childa->u.n.childkeys[i]); }
//(toku_verify_counts(childa), toku_verify_estimates(t,childa));
*did_react = did_merge;
......@@ -1998,6 +2048,8 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL
node->u.n.totalchildkeylens -= deleted_size; // The key was free()'d inside the maybe_merge_pinned_nodes.
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childa);
if (did_merge) {
toku_fifo_free(&BNC_BUFFER(node, childnumb));
node->u.n.n_children--;
......@@ -2011,11 +2063,14 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL
REALLOC_N(node->u.n.n_children-1, node->u.n.childkeys);
fixup_child_fingerprint(node, childnuma, childa, t, logger);
assert(node->u.n.childinfos[childnuma].blocknum.b == childa->thisnodename.b);
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childa);
} else {
assert(splitk_kvpair);
// If we didn't merge the nodes, then we need the correct pivot.
node->u.n.childkeys[childnuma] = splitk_kvpair;
node->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[childnuma]);
verify_local_fingerprint_nonleaf(node);
}
}
return_r:
......@@ -2033,6 +2088,7 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL
if (rra) return rra;
if (rrb) return rrb;
}
verify_local_fingerprint_nonleaf(node);
return r;
}
......@@ -2151,7 +2207,7 @@ flush_this_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, enum rea
}
if (0) printf("%s:%d done random picking\n", __FILE__, __LINE__);
}
//verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(node);
return_r:
fixup_child_fingerprint(node, childnum, child, t, logger);
{
......@@ -2184,14 +2240,18 @@ brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum react
// If we perform I/O then set *did_io to true.
// If a nonleaf node becomes overfull then we will flush some child.
{
//verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(node);
if (node->height==0) {
return brt_leaf_put_cmd(t, node, cmd, logger, re);
} else {
verify_local_fingerprint_nonleaf(node);
enum reactivity child_re[node->u.n.n_children];
{ int i; for (i=0; i<node->u.n.n_children; i++) child_re[i]=RE_STABLE; }
int r = brt_nonleaf_put_cmd(t, node, cmd, logger, child_re, did_io);
if (r!=0) goto return_r;
verify_local_fingerprint_nonleaf(node);
// Now we may have overfilled node. So we'll flush the heaviest child until we are happy.
while (!*did_io // Don't flush if we've done I/O.
&& nonleaf_node_is_gorged(node) // Don't flush if the node is small enough.
......@@ -2239,13 +2299,18 @@ static int push_something_at_root (BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT
BOOL did_io = FALSE;
{
int r = brtnode_put_cmd(brt, node, cmd, logger, &re, &did_io);
verify_local_fingerprint_nonleaf(node);
if (r!=0) return r;
//if (should_split) printf("%s:%d Pushed something simple, should_split=1\n", __FILE__, __LINE__);
}
//printf("%s:%d should_split=%d node_size=%" PRIu64 "\n", __FILE__, __LINE__, should_split, brtnode_memory_size(node));
return brt_handle_maybe_reactive_child_at_root(brt, rootp, nodep, re, logger);
{
int r = brt_handle_maybe_reactive_child_at_root(brt, rootp, nodep, re, logger);
verify_local_fingerprint_nonleaf(*nodep);
return r;
}
}
static void compute_and_fill_remembered_hash (BRT brt, int rootnum) {
......@@ -2327,9 +2392,9 @@ int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger)
}
VERIFY_NODE(brt, node);
//verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(node);
if ((r = push_something_at_root(brt, &node, rootp, cmd, logger))) return r;
//verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(node);
r = toku_unpin_brtnode(brt, node);
assert(r == 0);
return 0;
......@@ -3165,6 +3230,8 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
// If we change the shape, set *did_react = TRUE. Else set *did_react = FALSE.
{
verify_local_fingerprint_nonleaf(node);
/* if the child's buffer is not empty then empty it */
if (BNC_NBYTESINBUF(node, childnum) > 0) {
BOOL did_io = FALSE;
......@@ -3183,9 +3250,13 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
}
BRTNODE childnode = node_v;
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childnode);
enum reactivity child_re = RE_STABLE;
int r = brt_search_node(brt, childnode, search, newkey, newval, &child_re, logger, omtcursor);
// Even if r is reactive, we want to handle the maybe reactive child.
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childnode);
{
BOOL did_io = FALSE;
int rr = brt_handle_maybe_reactive_child(brt, node, childnum, child_re, &did_io, logger, did_react);
......@@ -3193,16 +3264,22 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
}
{
int rr = toku_cachetable_unpin(brt->cf, childnode->thisnodename, childnode->fullhash, childnode->dirty, brtnode_memory_size(childnode));
int rr = toku_unpin_brtnode(brt, childnode);
if (rr!=0) r = rr;
}
*parent_re = get_nonleaf_reactivity(node);
verify_local_fingerprint_nonleaf(node);
return r;
}
static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, OMTCURSOR omtcursor) {
int count=0;
again:
count++;
verify_local_fingerprint_nonleaf(node);
{
int c;
......@@ -3222,6 +3299,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search,
toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)),
brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) {
BOOL did_change_shape;
verify_local_fingerprint_nonleaf(node);
int r = brt_search_child(brt, node, child[c], search, newkey, newval, re, logger, omtcursor, &did_change_shape);
assert(r != EAGAIN);
if (r == 0) return r;
......@@ -3231,6 +3309,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search,
/* check the first (left) or last (right) node if nothing has been found */
BOOL did_change_shape; // ignore this
verify_local_fingerprint_nonleaf(node);
return brt_search_child(brt, node, child[c], search, newkey, newval, re, logger, omtcursor, &did_change_shape);
}
}
......@@ -3238,6 +3317,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search,
static int
brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, enum reactivity *re, TOKULOGGER logger, OMTCURSOR omtcursor)
{
verify_local_fingerprint_nonleaf(node);
if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, newkey, newval, re, logger, omtcursor);
else {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment