Commit 62da09d7 authored by Rich Prohaska's avatar Rich Prohaska

simplify. addresses #247 #250

git-svn-id: file:///svn/tokudb@1559 c7de825b-a66e-492c-adef-691d508d4ae1
parent f8f16912
...@@ -683,14 +683,8 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -683,14 +683,8 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
BRTNODE tochild = childa; BRTNODE tochild = childa;
if (type == BRT_INSERT || type == BRT_DELETE_BOTH) { if (type == BRT_INSERT || type == BRT_DELETE_BOTH) {
int cmp = brt_compare_pivot(t, &skd, &svd, childsplitk->data); int cmp = brt_compare_pivot(t, &skd, &svd, childsplitk->data);
if (cmp < 0) { if (cmp > 0) {
;
} else if (cmp > 0) {
tochildnum = childnum+1; tochild = childb; tochildnum = childnum+1; tochild = childb;
} else if (t->flags & TOKU_DB_DUP) {
if (node->u.n.pivotflags[childnum] & BRT_PIVOT_PRESENT_R) {
tochildnum = childnum+1; tochild = childb;
}
} }
} }
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, tochild, &brtcmd, tochildnum, txn); r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, tochild, &brtcmd, tochildnum, txn);
...@@ -955,38 +949,6 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -955,38 +949,6 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
} }
} }
/* find the rightmost child that the key/data will be inserted */
static unsigned int brtnode_right_child (BRTNODE node, DBT *k, DBT *data, BRT t) {
assert(node->height>0);
int maybe = -1; /* last pivot that matched the key */
int i;
for (i=node->u.n.n_children-2; i >= 0; i--) {
int cmp = brt_compare_pivot(t, k, data, node->u.n.childkeys[i]);
if (cmp < 0) {
continue;
} else if (cmp > 0) {
if (maybe != -1) goto foundkeymatch;
return i+1;
} else if (t->flags & TOKU_DB_DUP) {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R)
return i+1;
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L)
return i;
maybe = i;
} else
maybe = i;
}
maybe = 0;
foundkeymatch:
if (!(node->u.n.pivotflags[maybe] & BRT_PIVOT_PRESENT_L)) {
node->u.n.pivotflags[maybe] |= BRT_PIVOT_PRESENT_L;
node->dirty = 1;
}
return maybe;
}
/* find the leftmost child that may contain the key */ /* find the leftmost child that may contain the key */
static unsigned int brtnode_left_child (BRTNODE node , DBT *k, DBT *d, BRT t) { static unsigned int brtnode_left_child (BRTNODE node , DBT *k, DBT *d, BRT t) {
int i; int i;
...@@ -995,24 +957,19 @@ static unsigned int brtnode_left_child (BRTNODE node , DBT *k, DBT *d, BRT t) { ...@@ -995,24 +957,19 @@ static unsigned int brtnode_left_child (BRTNODE node , DBT *k, DBT *d, BRT t) {
int cmp = brt_compare_pivot(t, k, d, node->u.n.childkeys[i]); int cmp = brt_compare_pivot(t, k, d, node->u.n.childkeys[i]);
if (cmp > 0) continue; if (cmp > 0) continue;
if (cmp < 0) return i; if (cmp < 0) return i;
if (t->flags & TOKU_DB_DUP) {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L)
return i;
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R)
return i+1;
}
return i; return i;
} }
return node->u.n.n_children-1; return node->u.n.n_children-1;
} }
static inline unsigned int brtnode_which_child (BRTNODE node , DBT *k, BRT t) { static unsigned int brtnode_right_child (BRTNODE node, DBT *k, DBT *data, BRT t) {
return brtnode_left_child(node, k, 0, t); return brtnode_left_child(node, k, data, t);
} }
static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd, /* put a cmd into a nodes child */
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD *cmd,
DBT *splitk, int debug, TOKUTXN txn, int childnum, int maybe) { int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug, TOKUTXN txn, int childnum, int maybe) {
int r; int r;
void *child_v; void *child_v;
BRTNODE child; BRTNODE child;
...@@ -1060,26 +1017,16 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -1060,26 +1017,16 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd,
int toku_brt_do_push_cmd = 1; int toku_brt_do_push_cmd = 1;
static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD *cmd, /* put a cmd into a node at childnum */
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd,
DBT *splitk, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug, int debug, TOKUTXN txn, unsigned int childnum, int can_push, int *do_push_down) {
TOKUTXN txn) {
//verify_local_fingerprint_nonleaf(node); //verify_local_fingerprint_nonleaf(node);
unsigned int childnum;
int found;
int type = cmd->type;
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
childnum = brtnode_right_child(node, k, v, t);
//rfp printf("nonleaf_insert %d,%d -> %lld %d\n", htonl(*(int*)k->data), *(int*)v->data, node->thisnodename, childnum);
/* non-buffering mode when cursors are open on this child */ /* non-buffering mode when cursors are open on this child */
if (node->u.n.n_cursors[childnum] > 0) { if (node->u.n.n_cursors[childnum] > 0) {
assert(node->u.n.n_bytes_in_hashtable[childnum] == 0); assert(node->u.n.n_bytes_in_hashtable[childnum] == 0);
int r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 0); int r = brt_nonleaf_put_cmd_child_node(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 0);
//if (*did_split) { //if (*did_split) {
// verify_local_fingerprint_nonleaf(*nodea); // verify_local_fingerprint_nonleaf(*nodea);
// verify_local_fingerprint_nonleaf(*nodeb); // verify_local_fingerprint_nonleaf(*nodeb);
...@@ -1089,45 +1036,20 @@ static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -1089,45 +1036,20 @@ static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
return r; return r;
} }
//verify_local_fingerprint_nonleaf(node); /* try to push the cmd to the subtree if the buffer is empty and pushes are enabled */
{ if (node->u.n.n_bytes_in_hashtable[childnum] == 0 && can_push && toku_brt_do_push_cmd) {
int anytype; int r = brt_nonleaf_put_cmd_child_node(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 1);
bytevec olddata; if (r == 0)
ITEMLEN olddatalen;
found = !toku_hash_find(node->u.n.htables[childnum], k->data, k->size, &olddata, &olddatalen, &anytype);
//verify_local_fingerprint_nonleaf(node);
if (debug) printf("%s:%d %*sDoing hash_insert\n", __FILE__, __LINE__, debug, "");
toku_verify_counts(node);
if (found) {
if (!(t->flags & TOKU_DB_DUP)) {
//printf("%s:%d found and deleting\n", __FILE__, __LINE__);
node->local_fingerprint -= node->rand4fingerprint * toku_calccrc32_cmd(anytype, k->data, k->size, olddata, olddatalen);
int r = toku_hash_delete(node->u.n.htables[childnum], k->data, k->size);
/* Be careful, olddata is now invalid because of the delete. */
int diff = k->size + olddatalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
assert(r==0);
node->u.n.n_bytes_in_hashtables -= diff;
node->u.n.n_bytes_in_hashtable[childnum] -= diff;
node->dirty = 1;
//printf("%s:%d deleted %d bytes\n", __FILE__, __LINE__, diff);
found = 0;
}
}
}
//verify_local_fingerprint_nonleaf(node);
/* if the child is in the cache table then push the cmd to it
otherwise just put it into this node's buffer */
if (!found && toku_brt_do_push_cmd) {
int r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 1);
if (r == 0) {
//printf("%s:%d\n", __FILE__, __LINE__);
return r; return r;
}
} }
//verify_local_fingerprint_nonleaf(node); //verify_local_fingerprint_nonleaf(node);
/* append the cmd to the child buffer */
{ {
int type = cmd->type;
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
int diff = k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD; int diff = k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
int r=toku_hash_insert(node->u.n.htables[childnum], k->data, k->size, v->data, v->size, type); int r=toku_hash_insert(node->u.n.htables[childnum], k->data, k->size, v->data, v->size, type);
assert(r==0); assert(r==0);
...@@ -1136,124 +1058,52 @@ static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -1136,124 +1058,52 @@ static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
node->u.n.n_bytes_in_hashtable[childnum] += diff; node->u.n.n_bytes_in_hashtable[childnum] += diff;
node->dirty = 1; node->dirty = 1;
} }
if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, ""); *do_push_down = 1;
//verify_local_fingerprint_nonleaf(node);
int r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), txn);
if (r!=0) return r;
if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, "");
if (*did_split) {
assert(toku_serialize_brtnode_size(*nodea)<=(*nodea)->nodesize);
assert(toku_serialize_brtnode_size(*nodeb)<=(*nodeb)->nodesize);
assert((*nodea)->u.n.n_children>0);
assert((*nodeb)->u.n.n_children>0);
assert(BRTNODE_CHILD_DISKOFF(*nodea, (*nodea)->u.n.n_children-1)!=0);
assert(BRTNODE_CHILD_DISKOFF(*nodeb, (*nodeb)->u.n.n_children-1)!=0);
toku_verify_counts(*nodea);
toku_verify_counts(*nodeb);
} else {
assert(toku_serialize_brtnode_size(node)<=node->nodesize);
toku_verify_counts(node);
}
//if (*did_split) {
// verify_local_fingerprint_nonleaf(*nodea);
// verify_local_fingerprint_nonleaf(*nodeb);
//} else {
// verify_local_fingerprint_nonleaf(node);
//}
return 0; return 0;
} }
static int brt_nonleaf_delete_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd, static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug, TOKUTXN txn, unsigned int childnum) { int debug, TOKUTXN txn) {
//verify_local_fingerprint_nonleaf(node); //verify_local_fingerprint_nonleaf(node);
int found; unsigned int childnum;
int r;
int type = cmd->type;
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
/* non-buffering mode when cursors are open on this child */
if (node->u.n.n_cursors[childnum] > 0) {
assert(node->u.n.n_bytes_in_hashtable[childnum] == 0);
int r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 0);
//if (*did_split) {
// verify_local_fingerprint_nonleaf(*nodea);
// verify_local_fingerprint_nonleaf(*nodeb);
//} else {
// verify_local_fingerprint_nonleaf(node);
//}
return r;
}
//verify_local_fingerprint_nonleaf(node); /* find the right subtree */
{ childnum = brtnode_right_child(node, cmd->u.id.key, cmd->u.id.val, t);
int anytype;
bytevec olddata; /* put the cmd in the subtree */
ITEMLEN olddatalen; int do_push_down = 0;
found = !toku_hash_find(node->u.n.htables[childnum], k->data, k->size, &olddata, &olddatalen, &anytype); r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 1, &do_push_down);
if (r != 0) return r;
//verify_local_fingerprint_nonleaf(node);
if (debug) printf("%s:%d %*sDoing hash_insert\n", __FILE__, __LINE__, debug, ""); /* maybe push down */
toku_verify_counts(node); if (do_push_down) {
while (found) { if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, "");
//printf("%s:%d found and deleting\n", __FILE__, __LINE__); //verify_local_fingerprint_nonleaf(node);
node->local_fingerprint -= node->rand4fingerprint * toku_calccrc32_cmd(anytype, k->data, k->size, olddata, olddatalen); r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), txn);
int r = toku_hash_delete(node->u.n.htables[childnum], k->data, k->size); if (r!=0) return r;
/* Be careful, olddata is now invalid because of the delete. */ if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, "");
int diff = k->size + olddatalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD; if (*did_split) {
assert(r==0); assert(toku_serialize_brtnode_size(*nodea)<=(*nodea)->nodesize);
node->u.n.n_bytes_in_hashtables -= diff; assert(toku_serialize_brtnode_size(*nodeb)<=(*nodeb)->nodesize);
node->u.n.n_bytes_in_hashtable[childnum] -= diff; assert((*nodea)->u.n.n_children>0);
node->dirty = 1; assert((*nodeb)->u.n.n_children>0);
//printf("%s:%d deleted %d bytes\n", __FILE__, __LINE__, diff); assert(BRTNODE_CHILD_DISKOFF(*nodea, (*nodea)->u.n.n_children-1)!=0);
found = !toku_hash_find(node->u.n.htables[childnum], k->data, k->size, &olddata, &olddatalen, &anytype); assert(BRTNODE_CHILD_DISKOFF(*nodeb, (*nodeb)->u.n.n_children-1)!=0);
} toku_verify_counts(*nodea);
} toku_verify_counts(*nodeb);
//verify_local_fingerprint_nonleaf(node); } else {
/* if the child is in the cache table then push the cmd to it assert(toku_serialize_brtnode_size(node)<=node->nodesize);
otherwise just put it into this node's buffer */ toku_verify_counts(node);
if (toku_brt_do_push_cmd) { }
int r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 1); //if (*did_split) {
if (r == 0) { // verify_local_fingerprint_nonleaf(*nodea);
//printf("%s:%d\n", __FILE__, __LINE__); // verify_local_fingerprint_nonleaf(*nodeb);
return r; //} else {
} // verify_local_fingerprint_nonleaf(node);
} //}
//verify_local_fingerprint_nonleaf(node);
{
int diff = k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
int r=toku_hash_insert(node->u.n.htables[childnum], k->data, k->size, v->data, v->size, type);
assert(r==0);
node->local_fingerprint += node->rand4fingerprint * toku_calccrc32_cmd(type, k->data, k->size, v->data, v->size);
node->u.n.n_bytes_in_hashtables += diff;
node->u.n.n_bytes_in_hashtable[childnum] += diff;
node->dirty = 1;
}
if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, "");
//verify_local_fingerprint_nonleaf(node);
int r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), txn);
if (r!=0) return r;
if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, "");
if (*did_split) {
assert(toku_serialize_brtnode_size(*nodea)<=(*nodea)->nodesize);
assert(toku_serialize_brtnode_size(*nodeb)<=(*nodeb)->nodesize);
assert((*nodea)->u.n.n_children>0);
assert((*nodeb)->u.n.n_children>0);
assert(BRTNODE_CHILD_DISKOFF(*nodea,(*nodea)->u.n.n_children-1)!=0);
assert(BRTNODE_CHILD_DISKOFF(*nodeb,(*nodeb)->u.n.n_children-1)!=0);
toku_verify_counts(*nodea);
toku_verify_counts(*nodeb);
} else {
assert(toku_serialize_brtnode_size(node)<=node->nodesize);
toku_verify_counts(node);
} }
//if (*did_split) {
// verify_local_fingerprint_nonleaf(*nodea);
// verify_local_fingerprint_nonleaf(*nodeb);
//} else {
// verify_local_fingerprint_nonleaf(node);
//}
return 0; return 0;
} }
...@@ -1281,30 +1131,7 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -1281,30 +1131,7 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
} else if (t->flags & TOKU_DB_DUPSORT) { } else if (t->flags & TOKU_DB_DUPSORT) {
delchild_append(i); delchild_append(i);
delchild_append(i+1); delchild_append(i+1);
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L) {
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_L;
node->dirty = 1;
}
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R) {
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_R;
node->dirty = 1;
}
} else if (t->flags & TOKU_DB_DUP) {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L) {
delchild_append(i);
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_L;
node->dirty = 1;
}
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R) {
delchild_append(i+1);
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_R;
node->dirty = 1;
}
} else { } else {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L) {
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_L;
node->dirty = 1;
}
delchild_append(i); delchild_append(i);
break; break;
} }
...@@ -1314,19 +1141,39 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -1314,19 +1141,39 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
delchild_append(node->u.n.n_children-1); delchild_append(node->u.n.n_children-1);
/* issue the delete cmd to all of the children found previously */ /* issue the delete cmd to all of the children found previously */
int do_push_down = 0;
for (i=0; i<delidx; i++) { for (i=0; i<delidx; i++) {
r = brt_nonleaf_delete_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, delchild[i]); r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, delchild[i], delidx == 1, &do_push_down);
assert(r == 0); assert(r == 0);
} }
/* post condition: for all pk(i) == k -> assert pf(i) == 0 */ if (do_push_down) {
for (i=0; i < node->u.n.n_children-1; i++) { /* maybe push down */
int cmp = brt_compare_pivot(t, cmd->u.id.key, 0, node->u.n.childkeys[i]); if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, "");
//verify_local_fingerprint_nonleaf(node);
if (cmp == 0) r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), txn);
assert(node->u.n.pivotflags[i] == 0); if (r!=0) return r;
if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, "");
if (*did_split) {
assert(toku_serialize_brtnode_size(*nodea)<=(*nodea)->nodesize);
assert(toku_serialize_brtnode_size(*nodeb)<=(*nodeb)->nodesize);
assert((*nodea)->u.n.n_children>0);
assert((*nodeb)->u.n.n_children>0);
assert(BRTNODE_CHILD_DISKOFF(*nodea,(*nodea)->u.n.n_children-1)!=0);
assert(BRTNODE_CHILD_DISKOFF(*nodeb,(*nodeb)->u.n.n_children-1)!=0);
toku_verify_counts(*nodea);
toku_verify_counts(*nodeb);
} else {
assert(toku_serialize_brtnode_size(node)<=node->nodesize);
toku_verify_counts(node);
}
//if (*did_split) {
// verify_local_fingerprint_nonleaf(*nodea);
// verify_local_fingerprint_nonleaf(*nodeb);
//} else {
// verify_local_fingerprint_nonleaf(node);
//}
} }
return 0; return 0;
} }
static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd, static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
...@@ -1334,11 +1181,11 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -1334,11 +1181,11 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
DBT *splitk, DBT *splitk,
int debug, int debug,
TOKUTXN txn) { TOKUTXN txn) {
if (cmd->type == BRT_INSERT || cmd->type == BRT_DELETE_BOTH) if (cmd->type == BRT_INSERT || cmd->type == BRT_DELETE_BOTH) {
return brt_nonleaf_insert_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn); return brt_nonleaf_insert_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn);
else if (cmd->type == BRT_DELETE) } else if (cmd->type == BRT_DELETE) {
return brt_nonleaf_delete_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn); return brt_nonleaf_delete_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn);
else } else
return EINVAL; return EINVAL;
} }
...@@ -1769,7 +1616,7 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, ...@@ -1769,7 +1616,7 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
if (r!=0) return r; if (r!=0) return r;
//printf("%s:%d put %lld\n", __FILE__, __LINE__, brt->root); //printf("%s:%d put %lld\n", __FILE__, __LINE__, brt->root);
toku_cachetable_put(brt->cf, newroot_diskoff, newroot, brtnode_size(newroot), toku_cachetable_put(brt->cf, newroot_diskoff, newroot, brtnode_size(newroot),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt); toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
brt_update_cursors_new_root(brt, newroot, nodea, nodeb); brt_update_cursors_new_root(brt, newroot, nodea, nodeb);
return 0; return 0;
} }
...@@ -1840,99 +1687,7 @@ int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) { ...@@ -1840,99 +1687,7 @@ int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
return r; return r;
} }
#if 0
static int brt_lookup_node (BRT brt, DISKOFF off, DBT *k, DBT *v) {
int result;
void *node_v;
int r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0)
return r;
BRTNODE node = node_v;
assert(node->tag == TYP_BRTNODE);
int childnum;
//printf("%s:%d pin %p height=%d children=%d\n", __FILE__, __LINE__, node_v, node->height, node->u.n.n_children);
if (node->height==0) {
result = toku_pma_lookup(node->u.l.buffer, k, v);
//printf("%s:%d looked up something, got answerlen=%d\n", __FILE__, __LINE__, answerlen);
//verify_local_fingerprint_nonleaf(node);
r = toku_cachetable_unpin(brt->cf, off, 0, 0);
assert(r == 0);
return result;
}
childnum = brtnode_which_child(node, k, brt);
{
bytevec hanswer;
ITEMLEN hanswerlen;
int type;
if (toku_hash_find (node->u.n.htables[childnum], k->data, k->size, &hanswer, &hanswerlen, &type)==0) {
if (type == BRT_INSERT) {
if ((brt->flags & TOKU_DB_DUP)) {
result = brt_lookup_node(brt, BRTNODE_CHILD_DISKOFF(node, childnum), k, v);
if (result != 0) {
toku_dbt_set_value(v, hanswer, hanswerlen, &brt->sval);
result = 0;
}
} else {
//printf("Found %d bytes\n", *vallen);
toku_dbt_set_value(v, hanswer, hanswerlen, &brt->sval);
//printf("%s:%d Returning %p\n", __FILE__, __LINE__, v->data);
result = 0;
}
} else if (type == BRT_DELETE) {
if ((brt->flags & TOKU_DB_DUP) && toku_hash_find_idx (node->u.n.htables[childnum], k->data, k->size, 1, &hanswer, &hanswerlen, &type) == 0) {
assert(type == BRT_INSERT);
toku_dbt_set_value(v, hanswer, hanswerlen, &brt->sval);
result = 0;
} else
result = DB_NOTFOUND;
} else {
assert(0);
result = EINVAL;
}
//verify_local_fingerprint_nonleaf(node);
r = toku_cachetable_unpin(brt->cf, off, 0, 0);
assert(r == 0);
return result;
}
}
result = brt_lookup_node(brt, BRTNODE_CHILD_DISKOFF(node, childnum), k, v);
//verify_local_fingerprint_nonleaf(node);
r = toku_cachetable_unpin(brt->cf, off, 0, 0);
assert(r == 0);
return result;
}
#endif
int toku_brt_lookup (BRT brt, DBT *k, DBT *v) { int toku_brt_lookup (BRT brt, DBT *k, DBT *v) {
#if 0
int r;
CACHEKEY *rootp;
//assert(0==toku_cachefile_count_pinned(brt->cf, 1)); // That assertion isn't right. An open cursor could cause things to be pinned.
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
printf("%s:%d\n", __FILE__, __LINE__);
if (0) { died0: toku_unpin_brt_header(brt); }
// printf("%s:%d returning %d\n", __FILE__, __LINE__, r);
//assert(0==toku_cachefile_count_pinned(brt->cf, 1)); // That assertion isn't right. An open cursor could cause things to be pinned.
return r;
}
rootp = toku_calculate_root_offset_pointer(brt);
if ((r = brt_lookup_node(brt, *rootp, k, v))) {
// printf("%s:%d\n", __FILE__, __LINE__);
goto died0;
}
//printf("%s:%d r=%d", __FILE__, __LINE__, r); if (r==0) printf(" vallen=%d", *vallen); printf("\n");
if ((r = toku_unpin_brt_header(brt))!=0) return r;
//assert(0==toku_cachefile_count_pinned(brt->cf, 1)); // That assertion isn't right. An open cursor could cause things to be pinned.
return 0;
#else
int r, rr; int r, rr;
BRT_CURSOR cursor; BRT_CURSOR cursor;
...@@ -1945,9 +1700,7 @@ int toku_brt_lookup (BRT brt, DBT *k, DBT *v) { ...@@ -1945,9 +1700,7 @@ int toku_brt_lookup (BRT brt, DBT *k, DBT *v) {
rr = toku_brt_cursor_close(cursor); assert(rr == 0); rr = toku_brt_cursor_close(cursor); assert(rr == 0);
return r; return r;
#endif
} }
int toku_brt_delete(BRT brt, DBT *key) { int toku_brt_delete(BRT brt, DBT *key) {
int r; int r;
...@@ -2424,12 +2177,13 @@ static int brtcurs_set_position_last (BRT_CURSOR cursor, DISKOFF off, DBT *key, ...@@ -2424,12 +2177,13 @@ static int brtcurs_set_position_last (BRT_CURSOR cursor, DISKOFF off, DBT *key,
void *node_v; void *node_v;
int r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL, int r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt); toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) { if (r!=0) return r;
if (0) { died0: toku_cachetable_unpin(brt->cf, off, 1, 0); }
return r;
}
BRTNODE node = node_v; BRTNODE node = node_v;
if (0) {
died0: toku_cachetable_unpin(brt->cf, node->thisnodename, node->dirty, 0); return r;
}
assert(cursor->path_len<CURSOR_PATHLEN_LIMIT); assert(cursor->path_len<CURSOR_PATHLEN_LIMIT);
cursor->path[cursor->path_len++] = node; cursor->path[cursor->path_len++] = node;
if (node->height>0) { if (node->height>0) {
...@@ -2456,7 +2210,8 @@ static int brtcurs_set_position_last (BRT_CURSOR cursor, DISKOFF off, DBT *key, ...@@ -2456,7 +2210,8 @@ static int brtcurs_set_position_last (BRT_CURSOR cursor, DISKOFF off, DBT *key,
r=brtcurs_set_position_last (cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, txn); r=brtcurs_set_position_last (cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, txn);
if (r == 0) if (r == 0)
return 0; return 0;
assert(node == cursor->path[cursor->path_len-1]); node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor); brt_node_remove_cursor(node, childnum, cursor);
if (r==DB_NOTFOUND) { if (r==DB_NOTFOUND) {
if (childnum>0) { if (childnum>0) {
...@@ -2485,14 +2240,15 @@ static int brtcurs_set_position_first (BRT_CURSOR cursor, DISKOFF off, DBT *key, ...@@ -2485,14 +2240,15 @@ static int brtcurs_set_position_first (BRT_CURSOR cursor, DISKOFF off, DBT *key,
void *node_v; void *node_v;
int r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL, int r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt); toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) { if (r!=0) return r;
if (0) { died0: toku_cachetable_unpin(brt->cf, off, 1, 0); }
return r;
}
BRTNODE node = node_v; BRTNODE node = node_v;
assert(cursor->path_len<CURSOR_PATHLEN_LIMIT); assert(cursor->path_len<CURSOR_PATHLEN_LIMIT);
cursor->path[cursor->path_len++] = node; cursor->path[cursor->path_len++] = node;
if (0) {
died0: toku_cachetable_unpin(brt->cf, node->thisnodename, node->dirty, 0); return r;
}
if (node->height>0) { if (node->height>0) {
int childnum int childnum
; ;
...@@ -2517,7 +2273,8 @@ static int brtcurs_set_position_first (BRT_CURSOR cursor, DISKOFF off, DBT *key, ...@@ -2517,7 +2273,8 @@ static int brtcurs_set_position_first (BRT_CURSOR cursor, DISKOFF off, DBT *key,
r=brtcurs_set_position_first (cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, txn); r=brtcurs_set_position_first (cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, txn);
if (r == 0) if (r == 0)
return r; return r;
assert(node == cursor->path[cursor->path_len-1]); node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor); brt_node_remove_cursor(node, childnum, cursor);
if (r==DB_NOTFOUND) { if (r==DB_NOTFOUND) {
if (childnum+1<node->u.n.n_children) { if (childnum+1<node->u.n.n_children) {
...@@ -2581,7 +2338,8 @@ static int brtcurs_set_position_next2(BRT_CURSOR cursor, DBT *key, TOKUTXN txn) ...@@ -2581,7 +2338,8 @@ static int brtcurs_set_position_next2(BRT_CURSOR cursor, DBT *key, TOKUTXN txn)
r = brtcurs_set_position_first(cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, txn); r = brtcurs_set_position_first(cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, txn);
if (r == 0) if (r == 0)
return 0; return 0;
assert(node == cursor->path[cursor->path_len-1]); node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor); brt_node_remove_cursor(node, childnum, cursor);
childnum += 1; childnum += 1;
} }
...@@ -2642,7 +2400,8 @@ static int brtcurs_set_position_prev2(BRT_CURSOR cursor, DBT *key, TOKUTXN txn) ...@@ -2642,7 +2400,8 @@ static int brtcurs_set_position_prev2(BRT_CURSOR cursor, DBT *key, TOKUTXN txn)
r = brtcurs_set_position_last(cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, txn); r = brtcurs_set_position_last(cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, txn);
if (r == 0) if (r == 0)
return 0; return 0;
assert(node == cursor->path[cursor->path_len-1]); node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor); brt_node_remove_cursor(node, childnum, cursor);
childnum -= 1; childnum -= 1;
} }
...@@ -2929,76 +2688,6 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT ...@@ -2929,76 +2688,6 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT
return 0; return 0;
} }
/* clear the right present flag if the key matches the pivot key */
static int brt_node_maybe_clear_right_pivot_flag(BRTNODE node, DBT *key, DBT *val, int childnum, BRT t) {
int match = 0;
if (0 <= childnum) {
if (0 == brt_compare_pivot(t, key, val, node->u.n.childkeys[childnum])) {
assert(node->u.n.pivotflags[childnum] & BRT_PIVOT_PRESENT_R);
node->u.n.pivotflags[childnum] &= ~BRT_PIVOT_PRESENT_R;
node->dirty = 1;
match = 1;
}
}
return match;
}
/* clear the left present flag if the key matches the pivot key */
static int brt_node_maybe_clear_left_pivot_flag(BRTNODE node, DBT *key, DBT *val, int childnum, BRT t) {
int match = 0;
if (childnum < node->u.n.n_children - 1) {
if (0 == brt_compare_pivot(t, key, val, node->u.n.childkeys[childnum])) {
assert(node->u.n.pivotflags[childnum] & BRT_PIVOT_PRESENT_L);
node->u.n.pivotflags[childnum] &= ~BRT_PIVOT_PRESENT_L;
node->dirty = 1;
match = 1;
}
}
return match;
}
/* check if any subtree of this node contains the key */
static int brt_node_any_key_present(BRTNODE node, DBT *key, DBT *val, BRT t) {
int i;
for (i=0; i<node->u.n.n_children-1; i++)
if (0 == brt_compare_pivot(t, key, val, node->u.n.childkeys[i]) && (node->u.n.pivotflags[i] & (BRT_PIVOT_PRESENT_L + BRT_PIVOT_PRESENT_R)))
return 1;
return 0;
}
/* clear the key present flags in the nodes along the cursor path */
static void brt_cursor_maybe_clear_pivot_flags(BRT_CURSOR cursor) {
int r;
DBT key, *k;
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
k = &key;
DBT val, *v;
if (cursor->brt->flags & TOKU_DB_DUPSORT) {
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
v = &val;
} else
v = 0;
r = toku_pma_cursor_get_current(cursor->pmacurs, k, v, 1); assert(r == 0);
int i;
for (i = cursor->path_len - 2; i >= 0; i -= 1) {
BRTNODE node = cursor->path[i];
int childnum = cursor->pathcnum[i];
int match;
match = brt_node_maybe_clear_left_pivot_flag(node, k, v, childnum, cursor->brt);
match += brt_node_maybe_clear_right_pivot_flag(node, k, v, childnum-1, cursor->brt);
if (!match) break;
/* if matching keys in any subtrees of this node then we are done */
if (brt_node_any_key_present(node, k, v, cursor->brt)) break;
}
toku_free(k->data);
if (v) toku_free(v->data);
}
/* delete the key and value under the cursor */ /* delete the key and value under the cursor */
int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags __attribute__((__unused__))) { int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags __attribute__((__unused__))) {
int r; int r;
...@@ -3006,12 +2695,11 @@ int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags __attribute__((__unused_ ...@@ -3006,12 +2695,11 @@ int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags __attribute__((__unused_
if (cursor->path_len > 0) { if (cursor->path_len > 0) {
BRTNODE node = cursor->path[cursor->path_len-1]; BRTNODE node = cursor->path[cursor->path_len-1];
assert(node->height == 0); assert(node->height == 0);
int kvsize, lastmatch; int kvsize;
r = toku_pma_cursor_delete_under(cursor->pmacurs, &kvsize, node->rand4fingerprint, &node->local_fingerprint, &lastmatch); r = toku_pma_cursor_delete_under(cursor->pmacurs, &kvsize, node->rand4fingerprint, &node->local_fingerprint, 0);
if (r == 0) { if (r == 0) {
node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + kvsize; node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + kvsize;
node->dirty = 1; node->dirty = 1;
if (lastmatch) brt_cursor_maybe_clear_pivot_flags(cursor);
} }
} else } else
r = DB_NOTFOUND; r = DB_NOTFOUND;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment