Commit 5bcd43db authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Delete command must be duplicated. Fixes #201.

git-svn-id: file:///svn/tokudb@2132 c7de825b-a66e-492c-adef-691d508d4ae1
parent 59a5db9b
...@@ -100,6 +100,7 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE ...@@ -100,6 +100,7 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE
toku_update_brtnode_lsn(node, txn); toku_update_brtnode_lsn(node, txn);
} }
// If you pass in data==0 then it only compares the key, not the data (even if is a DUPSORT database)
static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck) { static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck) {
int cmp; int cmp;
DBT mydbt; DBT mydbt;
...@@ -660,38 +661,53 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -660,38 +661,53 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
BRT_CMD_S brtcmd = { type, xid, .u.id= {toku_fill_dbt(&skd, skey, skeylen), BRT_CMD_S brtcmd = { type, xid, .u.id= {toku_fill_dbt(&skd, skey, skeylen),
toku_fill_dbt(&svd, sval, svallen)} }; toku_fill_dbt(&svd, sval, svallen)} };
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb); //verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
int tochildnum; int pusha = 0, pushb = 0;
BRTNODE tochild;
switch (type) { switch (type) {
case BRT_INSERT: case BRT_INSERT:
case BRT_DELETE_BOTH: case BRT_DELETE_BOTH:
case BRT_DELETE: case BRT_DELETE:
//case BRT_DELETE: if (type!=BRT_DELETE || 0==(t->flags&TOKU_DB_DUPSORT)) {
{ // If it's an INSERT or DELETE_BOTH or there are no duplicates then we just put the command into one subtree
int cmp = brt_compare_pivot(t, &skd, &svd, childsplitk->data); int cmp = brt_compare_pivot(t, &skd, &svd, childsplitk->data);
if (cmp > 0) { if (cmp <= 0) pusha = 1;
tochildnum = childnum+1; tochild = childb; else pushb = 1;
} else {
assert(type==BRT_DELETE && t->flags&TOKU_DB_DUPSORT);
// It is a DELETE and it's a DUPSORT database, in which case if the comparison function comes up 0 we must write the command to both children. (See #201)
int cmp = brt_compare_pivot(t, &skd, 0, childsplitk->data);
if (cmp<=0) pusha=1;
if (cmp>=0) pushb=1; // Could be that both pusha and pushb are set
}
if (pusha) {
// If we already have something in the buffer, we must add the new command to the buffer so that commands don't get out of order.
if (toku_fifo_n_entries(BNC_BUFFER(node,childnum))==0) {
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childa, &brtcmd, childnum, txn);
} else { } else {
tochildnum = childnum; tochild = childa; r=insert_to_buffer_in_nonleaf(node, childnum, &skd, &svd, type, xid);
} }
} }
if (pushb) {
// If we already have something in the buffer, we must add the new command to the buffer so that commands don't get out of order.
if (toku_fifo_n_entries(BNC_BUFFER(node,childnum+1))==0) {
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childb, &brtcmd, childnum+1, txn);
} else {
r=insert_to_buffer_in_nonleaf(node, childnum+1, &skd, &svd, type, xid);
}
}
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
if (r!=0) printf("r=%d\n", r);
assert(r==0);
goto ok; goto ok;
case BRT_NONE: case BRT_NONE:
// Don't have to do anything in this case, can just drop the command // Don't have to do anything in this case, can just drop the command
break; goto ok;
} }
printf("Bad type %d\n", type); // Don't use default: because I want a compiler warning if I forget a enum case, and I want a runtime error if the type isn't one of the expected ones. printf("Bad type %d\n", type); // Don't use default: because I want a compiler warning if I forget a enum case, and I want a runtime error if the type isn't one of the expected ones.
assert(0); assert(0);
ok: ok: /*nothing*/;
// If we already have something in the buffer, we must add the new command to the buffer so that commands don't get out of order.
if (toku_fifo_n_entries(BNC_BUFFER(node,tochildnum))==0) {
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, tochild, &brtcmd, tochildnum, txn);
} else {
r=insert_to_buffer_in_nonleaf(node, tochildnum, &skd, &svd, type, xid);
}
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
if (r!=0) printf("r=%d\n", r);
assert(r==0);
})); }));
toku_fifo_free(&old_h); toku_fifo_free(&old_h);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment