Commit 538a507a authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Make rollback do the right thing in some cases fo internal nodes. Addresses #556.

git-svn-id: file:///svn/tokudb@2955 c7de825b-a66e-492c-adef-691d508d4ae1
parent 6153661e
...@@ -52,6 +52,7 @@ REGRESSION_TESTS = \ ...@@ -52,6 +52,7 @@ REGRESSION_TESTS = \
cachetable-test \ cachetable-test \
cachetable-test2 \ cachetable-test2 \
fifo-test \ fifo-test \
fifo-test-exp \
test-brt-delete-both \ test-brt-delete-both \
brt-test \ brt-test \
brt-test3 \ brt-test3 \
...@@ -168,7 +169,7 @@ brt.o: $(BRT_INTERNAL_H_INCLUDES) key.h log_header.h ...@@ -168,7 +169,7 @@ brt.o: $(BRT_INTERNAL_H_INCLUDES) key.h log_header.h
fifo.o: fifo.h brttypes.h fifo.o: fifo.h brttypes.h
memory.o: memory.h memory.o: memory.h
primes.o: primes.h toku_assert.h primes.o: primes.h toku_assert.h
fifo-test: fifo.o memory.o toku_assert.o ybt.o fifo-test-exp fifo-test: fifo.o memory.o toku_assert.o ybt.o
brt-serialize.o: $(BRT_INTERNAL_H_INCLUDES) key.h wbuf.h rbuf.h brt-serialize.o: $(BRT_INTERNAL_H_INCLUDES) key.h wbuf.h rbuf.h
brt-bigtest: memory.o ybt.o brt.o pma.o cachetable.o key.o fifo.o brt-serialize.o brt-bigtest: memory.o ybt.o brt.o pma.o cachetable.o key.o fifo.o brt-serialize.o
brt-bigtest.o: brt.h ../include/db.h brt-bigtest.o: brt.h ../include/db.h
......
...@@ -191,4 +191,6 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, DISKOFF diskoff, enum brt_cmd_typ ...@@ -191,4 +191,6 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, DISKOFF diskoff, enum brt_cmd_typ
int toku_set_func_fsync (int (*fsync_function)(int)); int toku_set_func_fsync (int (*fsync_function)(int));
#endif #endif
...@@ -492,13 +492,14 @@ static int brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, ...@@ -492,13 +492,14 @@ static int brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *split, DBT *split,
int debug, int debug,
TOKULOGGER); TOKULOGGER, DISKOFFARRAY path_to_parent);
/* key is not in the buffer. Either put the key-value pair in the child, or put it in the node. */ /* key is not in the buffer. Either put the key-value pair in the child, or put it in the node. */
static int push_brt_cmd_down_only_if_it_wont_push_more_else_put_here (BRT t, BRTNODE node, BRTNODE child, static int push_brt_cmd_down_only_if_it_wont_push_more_else_put_here (BRT t, BRTNODE node, BRTNODE child,
BRT_CMD cmd, BRT_CMD cmd,
int childnum_of_node, int childnum_of_node,
TOKULOGGER logger) { TOKULOGGER logger,
DISKOFFARRAY path_to_parent) {
assert(node->height>0); /* Not a leaf. */ assert(node->height>0); /* Not a leaf. */
DBT *k = cmd->u.id.key; DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val; DBT *v = cmd->u.id.val;
...@@ -533,7 +534,8 @@ static int push_brt_cmd_down_only_if_it_wont_push_more_else_put_here (BRT t, BRT ...@@ -533,7 +534,8 @@ static int push_brt_cmd_down_only_if_it_wont_push_more_else_put_here (BRT t, BRT
r = brtnode_put_cmd(t, child, cmd, r = brtnode_put_cmd(t, child, cmd,
&again_split, &againa, &againb, &againk, &again_split, &againa, &againb, &againk,
0, 0,
logger); logger,
path_to_parent);
if (r!=0) return r; if (r!=0) return r;
assert(again_split==0); /* I only did the insert if I knew it wouldn't push down, and hence wouldn't split. */ assert(again_split==0); /* I only did the insert if I knew it wouldn't push down, and hence wouldn't split. */
} else { } else {
...@@ -547,7 +549,8 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum ...@@ -547,7 +549,8 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum
BRT_CMD cmd, BRT_CMD cmd,
int *child_did_split, BRTNODE *childa, BRTNODE *childb, int *child_did_split, BRTNODE *childa, BRTNODE *childb,
DBT *childsplitk, DBT *childsplitk,
TOKULOGGER logger) { TOKULOGGER logger,
DISKOFFARRAY path_to_parent) {
//if (debug) printf("%s:%d %*sinserting down\n", __FILE__, __LINE__, debug, ""); //if (debug) printf("%s:%d %*sinserting down\n", __FILE__, __LINE__, debug, "");
//printf("%s:%d hello!\n", __FILE__, __LINE__); //printf("%s:%d hello!\n", __FILE__, __LINE__);
assert(node->height>0); assert(node->height>0);
...@@ -555,7 +558,8 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum ...@@ -555,7 +558,8 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum
int r = brtnode_put_cmd(t, child, cmd, int r = brtnode_put_cmd(t, child, cmd,
child_did_split, childa, childb, childsplitk, child_did_split, childa, childb, childsplitk,
0, 0,
logger); logger,
path_to_parent);
if (r!=0) return r; if (r!=0) return r;
} }
...@@ -593,7 +597,7 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum ...@@ -593,7 +597,7 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum
return 0; return 0;
} }
static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int debug, TOKULOGGER logger); static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int debug, TOKULOGGER logger, DISKOFFARRAY path_to_parent);
static int split_count=0; static int split_count=0;
...@@ -609,7 +613,8 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -609,7 +613,8 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
DBT *childsplitk, /* the data in the childsplitk is alloc'd and is consumed by this call. */ DBT *childsplitk, /* the data in the childsplitk is alloc'd and is consumed by this call. */
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk, DBT *splitk,
TOKULOGGER logger) { TOKULOGGER logger,
DISKOFFARRAY path_to_parent) {
assert(node->height>0); assert(node->height>0);
assert(0 <= childnum && childnum < node->u.n.n_children); assert(0 <= childnum && childnum < node->u.n.n_children);
FIFO old_h = BNC_BUFFER(node,childnum); FIFO old_h = BNC_BUFFER(node,childnum);
...@@ -718,7 +723,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -718,7 +723,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
if (pusha) { if (pusha) {
// If we already have something in the buffer, we must add the new command to the buffer so that commands don't get out of order. // If we already have something in the buffer, we must add the new command to the buffer so that commands don't get out of order.
if (toku_fifo_n_entries(BNC_BUFFER(node,childnum))==0) { if (toku_fifo_n_entries(BNC_BUFFER(node,childnum))==0) {
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childa, &brtcmd, childnum, logger); r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childa, &brtcmd, childnum, logger, path_to_parent);
} else { } else {
r=insert_to_buffer_in_nonleaf(node, childnum, &skd, &svd, type, xid); r=insert_to_buffer_in_nonleaf(node, childnum, &skd, &svd, type, xid);
} }
...@@ -726,7 +731,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -726,7 +731,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
if (pushb) { if (pushb) {
// If we already have something in the buffer, we must add the new command to the buffer so that commands don't get out of order. // If we already have something in the buffer, we must add the new command to the buffer so that commands don't get out of order.
if (toku_fifo_n_entries(BNC_BUFFER(node,childnum+1))==0) { if (toku_fifo_n_entries(BNC_BUFFER(node,childnum+1))==0) {
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childb, &brtcmd, childnum+1, logger); r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childb, &brtcmd, childnum+1, logger, path_to_parent);
} else { } else {
r=insert_to_buffer_in_nonleaf(node, childnum+1, &skd, &svd, type, xid); r=insert_to_buffer_in_nonleaf(node, childnum+1, &skd, &svd, type, xid);
} }
...@@ -784,7 +789,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -784,7 +789,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
if (toku_serialize_brtnode_size(node) > node->nodesize) { if (toku_serialize_brtnode_size(node) > node->nodesize) {
/* lighten the node by pushing down its buffers. this may cause /* lighten the node by pushing down its buffers. this may cause
the current node to split and go away */ the current node to split and go away */
r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, 0, logger); r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, 0, logger, path_to_parent);
assert(r == 0); assert(r == 0);
} }
if (*did_split == 0) assert(toku_serialize_brtnode_size(node)<=node->nodesize); if (*did_split == 0) assert(toku_serialize_brtnode_size(node)<=node->nodesize);
...@@ -796,7 +801,8 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum, ...@@ -796,7 +801,8 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk, DBT *splitk,
int debug, int debug,
TOKULOGGER logger) { TOKULOGGER logger,
DISKOFFARRAY path_to_parent) {
void *childnode_v; void *childnode_v;
BRTNODE child; BRTNODE child;
int r; int r;
...@@ -844,7 +850,8 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum, ...@@ -844,7 +850,8 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
&brtcmd, &brtcmd,
&child_did_split, &childa, &childb, &child_did_split, &childa, &childb,
&childsplitk, &childsplitk,
logger); logger,
path_to_parent);
if (0){ if (0){
unsigned int sum=0; unsigned int sum=0;
...@@ -862,7 +869,8 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum, ...@@ -862,7 +869,8 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
r=handle_split_of_child (t, node, childnum, r=handle_split_of_child (t, node, childnum,
childa, childb, &childsplitk, childa, childb, &childsplitk,
did_split, nodea, nodeb, splitk, did_split, nodea, nodeb, splitk,
logger); logger,
path_to_parent);
//if (*did_split) { //if (*did_split) {
// verify_local_fingerprint_nonleaf(*nodea); // verify_local_fingerprint_nonleaf(*nodea);
// verify_local_fingerprint_nonleaf(*nodeb); // verify_local_fingerprint_nonleaf(*nodeb);
...@@ -885,7 +893,7 @@ static int debugp1 (int debug) { ...@@ -885,7 +893,7 @@ static int debugp1 (int debug) {
return debug ? debug+1 : 0; return debug ? debug+1 : 0;
} }
static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int debug, TOKULOGGER logger) static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int debug, TOKULOGGER logger, DISKOFFARRAY path_to_parent)
/* If the buffer is too full, then push down. Possibly the child will split. That may make us split. */ /* If the buffer is too full, then push down. Possibly the child will split. That may make us split. */
{ {
assert(node->height>0); assert(node->height>0);
...@@ -901,7 +909,7 @@ static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE ...@@ -901,7 +909,7 @@ static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE
find_heaviest_child(node, &childnum); find_heaviest_child(node, &childnum);
if (0) printf("%s:%d %*spush some down from %lld into %lld (child %d)\n", __FILE__, __LINE__, debug, "", node->thisnodename, BNC_DISKOFF(node, childnum), childnum); if (0) printf("%s:%d %*spush some down from %lld into %lld (child %d)\n", __FILE__, __LINE__, debug, "", node->thisnodename, BNC_DISKOFF(node, childnum), childnum);
assert(BNC_DISKOFF(node, childnum)!=0); assert(BNC_DISKOFF(node, childnum)!=0);
int r = push_some_brt_cmds_down(t, node, childnum, did_split, nodea, nodeb, splitk, debugp1(debug), logger); int r = push_some_brt_cmds_down(t, node, childnum, did_split, nodea, nodeb, splitk, debugp1(debug), logger, path_to_parent);
if (r!=0) return r; if (r!=0) return r;
assert(*did_split==0 || *did_split==1); assert(*did_split==0 || *did_split==1);
if (debug) printf("%s:%d %*sdid push_some_brt_cmds_down did_split=%d\n", __FILE__, __LINE__, debug, "", *did_split); if (debug) printf("%s:%d %*sdid push_some_brt_cmds_down did_split=%d\n", __FILE__, __LINE__, debug, "", *did_split);
...@@ -1020,7 +1028,8 @@ unsigned int toku_brtnode_which_child (BRTNODE node , DBT *k, DBT *d, BRT t) { ...@@ -1020,7 +1028,8 @@ unsigned int toku_brtnode_which_child (BRTNODE node , DBT *k, DBT *d, BRT t) {
/* put a cmd into a nodes child */ /* put a cmd into a nodes child */
static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD cmd, static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug, TOKULOGGER logger, int childnum, int maybe) { int debug, TOKULOGGER logger, int childnum, int maybe,
DISKOFFARRAY path_to_parent) {
int r; int r;
void *child_v; void *child_v;
BRTNODE child; BRTNODE child;
...@@ -1042,7 +1051,8 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD cmd, ...@@ -1042,7 +1051,8 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD cmd,
child_did_split = 0; child_did_split = 0;
r = brtnode_put_cmd(t, child, cmd, r = brtnode_put_cmd(t, child, cmd,
&child_did_split, &childa, &childb, &childsplitk, debug, logger); &child_did_split, &childa, &childb, &childsplitk, debug, logger,
path_to_parent);
if (r != 0) { if (r != 0) {
/* putting to the child failed for some reason, so unpin the child and return the error code */ /* putting to the child failed for some reason, so unpin the child and return the error code */
int rr = toku_unpin_brtnode(t, child); int rr = toku_unpin_brtnode(t, child);
...@@ -1055,7 +1065,8 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD cmd, ...@@ -1055,7 +1065,8 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD cmd,
r = handle_split_of_child(t, node, childnum, r = handle_split_of_child(t, node, childnum,
childa, childb, &childsplitk, childa, childb, &childsplitk,
did_split, nodea, nodeb, splitk, did_split, nodea, nodeb, splitk,
logger); logger,
path_to_parent);
assert(r == 0); assert(r == 0);
} else { } else {
//verify_local_fingerprint_nonleaf(child); //verify_local_fingerprint_nonleaf(child);
...@@ -1071,12 +1082,13 @@ int toku_brt_do_push_cmd = 1; ...@@ -1071,12 +1082,13 @@ int toku_brt_do_push_cmd = 1;
/* put a cmd into a node at childnum */ /* put a cmd into a node at childnum */
static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD cmd, static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug, TOKULOGGER logger, unsigned int childnum, int can_push, int *do_push_down) { int debug, TOKULOGGER logger, unsigned int childnum, int can_push, int *do_push_down,
DISKOFFARRAY path_to_parent) {
//verify_local_fingerprint_nonleaf(node); //verify_local_fingerprint_nonleaf(node);
/* try to push the cmd to the subtree if the buffer is empty and pushes are enabled */ /* try to push the cmd to the subtree if the buffer is empty and pushes are enabled */
if (BNC_NBYTESINBUF(node, childnum) == 0 && can_push && toku_brt_do_push_cmd) { if (BNC_NBYTESINBUF(node, childnum) == 0 && can_push && toku_brt_do_push_cmd) {
int r = brt_nonleaf_put_cmd_child_node(t, node, cmd, did_split, nodea, nodeb, splitk, debug, logger, childnum, 1); int r = brt_nonleaf_put_cmd_child_node(t, node, cmd, did_split, nodea, nodeb, splitk, debug, logger, childnum, 1, path_to_parent);
if (r == 0) if (r == 0)
return r; return r;
} }
...@@ -1094,6 +1106,15 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD cmd, ...@@ -1094,6 +1106,15 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD cmd,
u_int32_t newfingerprint = node->local_fingerprint + node->rand4fingerprint * toku_calccrc32_cmd(type, cmd->xid, k->data, k->size, v->data, v->size); u_int32_t newfingerprint = node->local_fingerprint + node->rand4fingerprint * toku_calccrc32_cmd(type, cmd->xid, k->data, k->size, v->data, v->size);
int r=toku_log_brtenq(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, cmd->xid, type, keybs, databs, node->local_fingerprint, newfingerprint); int r=toku_log_brtenq(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, cmd->xid, type, keybs, databs, node->local_fingerprint, newfingerprint);
assert(r==0); assert(r==0);
{
TOKUTXN txn;
if (0==toku_txnid2txn(logger,cmd->xid,&txn) && txn) {
DISKOFFARRAY path = path_to_parent;
path.array = toku_memdup(path.array, sizeof(path.array[0])*(1+path.len));
r=toku_logger_save_rollback_xactiontouchednonleaf(txn, toku_cachefile_filenum(t->cf), path, node->thisnodename);
if (r!=0) return r;
}
}
r=toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type, cmd->xid); r=toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type, cmd->xid);
assert(r==0); assert(r==0);
node->local_fingerprint = newfingerprint; node->local_fingerprint = newfingerprint;
...@@ -1107,7 +1128,7 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD cmd, ...@@ -1107,7 +1128,7 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD cmd,
static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD cmd, static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug, TOKULOGGER logger) { int debug, TOKULOGGER logger, DISKOFFARRAY path_to_parent) {
//verify_local_fingerprint_nonleaf(node); //verify_local_fingerprint_nonleaf(node);
unsigned int childnum; unsigned int childnum;
int r; int r;
...@@ -1117,14 +1138,14 @@ static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD cmd, ...@@ -1117,14 +1138,14 @@ static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
/* put the cmd in the subtree */ /* put the cmd in the subtree */
int do_push_down = 0; int do_push_down = 0;
r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, logger, childnum, 1, &do_push_down); r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, logger, childnum, 1, &do_push_down, path_to_parent);
if (r != 0) return r; if (r != 0) return r;
/* maybe push down */ /* maybe push down */
if (do_push_down) { if (do_push_down) {
if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, ""); if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, "");
//verify_local_fingerprint_nonleaf(node); //verify_local_fingerprint_nonleaf(node);
r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), logger); r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), logger, path_to_parent);
if (r!=0) return r; if (r!=0) return r;
if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, ""); if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, "");
if (*did_split) { if (*did_split) {
...@@ -1154,7 +1175,8 @@ static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD cmd, ...@@ -1154,7 +1175,8 @@ static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD cmd, static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug, int debug,
TOKULOGGER logger) { TOKULOGGER logger,
DISKOFFARRAY path_to_parent) {
int r; int r;
/* find all children that need a delete cmd */ /* find all children that need a delete cmd */
...@@ -1186,7 +1208,7 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD cmd, ...@@ -1186,7 +1208,7 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
/* issue the delete cmd to all of the children found previously */ /* issue the delete cmd to all of the children found previously */
int do_push_down = 0; int do_push_down = 0;
for (i=0; i<delidx; i++) { for (i=0; i<delidx; i++) {
r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, logger, delchild[i], delidx == 1, &do_push_down); r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, logger, delchild[i], delidx == 1, &do_push_down, path_to_parent);
assert(r == 0); assert(r == 0);
} }
...@@ -1194,7 +1216,7 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD cmd, ...@@ -1194,7 +1216,7 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
/* maybe push down */ /* maybe push down */
if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, ""); if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, "");
//verify_local_fingerprint_nonleaf(node); //verify_local_fingerprint_nonleaf(node);
r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), logger); r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), logger, path_to_parent);
if (r!=0) return r; if (r!=0) return r;
if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, ""); if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, "");
if (*did_split) { if (*did_split) {
...@@ -1223,11 +1245,12 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, ...@@ -1223,11 +1245,12 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk, DBT *splitk,
int debug, int debug,
TOKULOGGER logger) { TOKULOGGER logger,
DISKOFFARRAY path_to_parent) {
if (cmd->type == BRT_INSERT || cmd->type == BRT_DELETE_BOTH) { if (cmd->type == BRT_INSERT || cmd->type == BRT_DELETE_BOTH) {
return brt_nonleaf_insert_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, logger); return brt_nonleaf_insert_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, logger, path_to_parent);
} else if (cmd->type == BRT_DELETE) { } else if (cmd->type == BRT_DELETE) {
return brt_nonleaf_delete_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, logger); return brt_nonleaf_delete_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, logger, path_to_parent);
} else } else
return EINVAL; return EINVAL;
} }
...@@ -1248,7 +1271,8 @@ static void verify_local_fingerprint_nonleaf (BRTNODE node) { ...@@ -1248,7 +1271,8 @@ static void verify_local_fingerprint_nonleaf (BRTNODE node) {
static int brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, static int brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug, int debug,
TOKULOGGER logger) { TOKULOGGER logger,
DISKOFFARRAY path_to_parent) {
//static int counter=0; // FOO //static int counter=0; // FOO
//static int oldcounter=0; //static int oldcounter=0;
//int tmpcounter; //int tmpcounter;
...@@ -1263,7 +1287,7 @@ static int brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, ...@@ -1263,7 +1287,7 @@ static int brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
} else { } else {
r = brt_nonleaf_put_cmd(t, node, cmd, r = brt_nonleaf_put_cmd(t, node, cmd,
did_split, nodea, nodeb, splitk, did_split, nodea, nodeb, splitk,
debug, logger); debug, logger, path_to_parent);
} }
//oldcounter=tmpcounter; //oldcounter=tmpcounter;
// Watch out. If did_split then the original node is no longer allocated. // Watch out. If did_split then the original node is no longer allocated.
...@@ -1702,7 +1726,7 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, ...@@ -1702,7 +1726,7 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
return 0; return 0;
} }
static int brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) { static int brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger, DISKOFFARRAY path_to_parent) {
void *node_v; void *node_v;
BRTNODE node; BRTNODE node;
CACHEKEY *rootp; CACHEKEY *rootp;
...@@ -1729,7 +1753,8 @@ static int brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) { ...@@ -1729,7 +1753,8 @@ static int brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) {
result = brtnode_put_cmd(brt, node, cmd, result = brtnode_put_cmd(brt, node, cmd,
&did_split, &nodea, &nodeb, &splitk, &did_split, &nodea, &nodeb, &splitk,
debug, debug,
logger); logger,
path_to_parent);
if (debug) printf("%s:%d did_insert\n", __FILE__, __LINE__); if (debug) printf("%s:%d did_insert\n", __FILE__, __LINE__);
if (did_split) { if (did_split) {
// node is unpinned, so now we have to proceed to update the root with a new node. // node is unpinned, so now we have to proceed to update the root with a new node.
...@@ -1752,11 +1777,14 @@ static int brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) { ...@@ -1752,11 +1777,14 @@ static int brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) {
return result; return result;
} }
#define MAX_PATHLEN_TO_ROOT 40
int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) { int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
int r; int r;
BRT_CMD_S brtcmd = { BRT_INSERT, toku_txn_get_txnid(txn), .u.id={key,val}}; BRT_CMD_S brtcmd = { BRT_INSERT, toku_txn_get_txnid(txn), .u.id={key,val}};
DISKOFF path[MAX_PATHLEN_TO_ROOT];
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn)); DISKOFFARRAY path_to_parent = {0, path};
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn), path_to_parent);
return r; return r;
} }
...@@ -1779,14 +1807,18 @@ int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) { ...@@ -1779,14 +1807,18 @@ int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
int r; int r;
DBT val; DBT val;
BRT_CMD_S brtcmd = { BRT_DELETE, toku_txn_get_txnid(txn), .u.id={key, toku_init_dbt(&val)}}; BRT_CMD_S brtcmd = { BRT_DELETE, toku_txn_get_txnid(txn), .u.id={key, toku_init_dbt(&val)}};
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn)); DISKOFF path[MAX_PATHLEN_TO_ROOT];
DISKOFFARRAY path_to_parent = {0, path};
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn), path_to_parent);
return r; return r;
} }
int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) { int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
int r; int r;
BRT_CMD_S brtcmd = { BRT_DELETE_BOTH, toku_txn_get_txnid(txn), .u.id={key,val}}; BRT_CMD_S brtcmd = { BRT_DELETE_BOTH, toku_txn_get_txnid(txn), .u.id={key,val}};
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn)); DISKOFF path[MAX_PATHLEN_TO_ROOT];
DISKOFFARRAY path_to_parent = {0, path};
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn), path_to_parent);
return r; return r;
} }
...@@ -1924,15 +1956,15 @@ static inline void brt_split_init(BRT_SPLIT *split) { ...@@ -1924,15 +1956,15 @@ static inline void brt_split_init(BRT_SPLIT *split) {
toku_init_dbt(&split->splitk); toku_init_dbt(&split->splitk);
} }
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger); static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger, DISKOFFARRAY path_to_parent);
/* search in a node's child */ /* search in a node's child */
static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger) { static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger, DISKOFFARRAY path_to_parent) {
int r, rr; int r, rr;
/* if the child's buffer is not empty then try to empty it */ /* if the child's buffer is not empty then try to empty it */
if (BNC_NBYTESINBUF(node, childnum) > 0) { if (BNC_NBYTESINBUF(node, childnum) > 0) {
rr = push_some_brt_cmds_down(brt, node, childnum, &split->did_split, &split->nodea, &split->nodeb, &split->splitk, 0, logger); rr = push_some_brt_cmds_down(brt, node, childnum, &split->did_split, &split->nodea, &split->nodeb, &split->splitk, 0, logger, path_to_parent);
assert(rr == 0); assert(rr == 0);
/* push down may cause a child split, so childnum may not be appropriate, and the node itself may split, so retry */ /* push down may cause a child split, so childnum may not be appropriate, and the node itself may split, so retry */
return EAGAIN; return EAGAIN;
...@@ -1945,11 +1977,11 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s ...@@ -1945,11 +1977,11 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
for (;;) { for (;;) {
BRTNODE childnode = node_v; BRTNODE childnode = node_v;
BRT_SPLIT childsplit; brt_split_init(&childsplit); BRT_SPLIT childsplit; brt_split_init(&childsplit);
r = brt_search_node(brt, childnode, search, newkey, newval, &childsplit, logger); r = brt_search_node(brt, childnode, search, newkey, newval, &childsplit, logger, path_to_parent);
if (childsplit.did_split) { if (childsplit.did_split) {
rr = handle_split_of_child(brt, node, childnum, childsplit.nodea, childsplit.nodeb, &childsplit.splitk, rr = handle_split_of_child(brt, node, childnum, childsplit.nodea, childsplit.nodeb, &childsplit.splitk,
&split->did_split, &split->nodea, &split->nodeb, &split->splitk, logger); &split->did_split, &split->nodea, &split->nodeb, &split->splitk, logger, path_to_parent);
assert(rr == 0); assert(rr == 0);
break; break;
} else { } else {
...@@ -1964,7 +1996,7 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s ...@@ -1964,7 +1996,7 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
return r; return r;
} }
static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger) { static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger, DISKOFFARRAY path_to_parent) {
int r = DB_NOTFOUND; int r = DB_NOTFOUND;
int c; int c;
...@@ -1982,7 +2014,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, ...@@ -1982,7 +2014,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search,
if (search->compare(search, if (search->compare(search,
toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)), toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)),
brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) { brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) {
r = brt_search_child(brt, node, child[c], search, newkey, newval, split, logger); r = brt_search_child(brt, node, child[c], search, newkey, newval, split, logger, path_to_parent);
if (r == 0 || r == EAGAIN) if (r == 0 || r == EAGAIN)
break; break;
} }
...@@ -1990,7 +2022,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, ...@@ -1990,7 +2022,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search,
/* check the first (left) or last (right) node if nothing has been found */ /* check the first (left) or last (right) node if nothing has been found */
if (r == DB_NOTFOUND && c == node->u.n.n_children-1) if (r == DB_NOTFOUND && c == node->u.n.n_children-1)
r = brt_search_child(brt, node, child[c], search, newkey, newval, split, logger); r = brt_search_child(brt, node, child[c], search, newkey, newval, split, logger, path_to_parent);
return r; return r;
} }
...@@ -2001,9 +2033,9 @@ static int brt_search_leaf_node(BRTNODE node, brt_search_t *search, DBT *newkey, ...@@ -2001,9 +2033,9 @@ static int brt_search_leaf_node(BRTNODE node, brt_search_t *search, DBT *newkey,
return r; return r;
} }
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger) { static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger, DISKOFFARRAY path_to_parent) {
if (node->height > 0) if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, newkey, newval, split, logger); return brt_search_nonleaf_node(brt, node, search, newkey, newval, split, logger, path_to_parent);
else else
return brt_search_leaf_node(node, search, newkey, newval); return brt_search_leaf_node(node, search, newkey, newval);
} }
...@@ -2025,7 +2057,9 @@ int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOK ...@@ -2025,7 +2057,9 @@ int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOK
for (;;) { for (;;) {
BRT_SPLIT split; brt_split_init(&split); BRT_SPLIT split; brt_split_init(&split);
r = brt_search_node(brt, node, search, newkey, newval, &split, logger); DISKOFF path[MAX_PATHLEN_TO_ROOT];
DISKOFFARRAY path_to_parent = {0, path};
r = brt_search_node(brt, node, search, newkey, newval, &split, logger, path_to_parent);
if (split.did_split) { if (split.did_split) {
rr = brt_init_new_root(brt, split.nodea, split.nodeb, split.splitk, rootp, 0, &node); rr = brt_init_new_root(brt, split.nodea, split.nodeb, split.splitk, rootp, 0, &node);
...@@ -2412,3 +2446,35 @@ int toku_brt_height_of_root(BRT brt, int *height) { ...@@ -2412,3 +2446,35 @@ int toku_brt_height_of_root(BRT brt, int *height) {
r = toku_unpin_brt_header(brt); assert(r==0); r = toku_unpin_brt_header(brt); assert(r==0);
return 0; return 0;
} }
struct callpair {
BRTNODE node;
int childnum;
};
static int note_removal (bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen, int type, TXNID xid, void*cpairv) {
struct callpair *cpair = cpairv;
BRTNODE node = cpair->node;
int childnum = cpair->childnum;
u_int32_t old_fingerprint = node->local_fingerprint;
node->local_fingerprint = old_fingerprint = node->rand4fingerprint*toku_calccrc32_cmd(type, xid, key, keylen, data, datalen);
u_int32_t countdiff = keylen+datalen+KEY_VALUE_OVERHEAD+BRT_CMD_OVERHEAD;
BNC_NBYTESINBUF(node,childnum) -= countdiff;
node->u.n.n_bytes_in_buffers -= countdiff;
return 0;
}
int toku_brt_nonleaf_expunge_xaction(BRT brt, DISKOFF diskoff, TXNID xid) {
void *node_v;
int r = toku_cachetable_get_and_pin(brt->cf, diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) return r;
BRTNODE node = node_v;
int i;
r=0;
for (i=0; i<node->u.n.n_children; i++) {
struct callpair pair = { node, i };
int r3 = toku_fifo_expunge_xaction(BNC_BUFFER(node, i), xid, note_removal, &pair);
if (r==0) r=r3;
}
int r2 = toku_cachetable_unpin(brt->cf, diskoff, 1, toku_serialize_brtnode_size(node));
return r ? r : r2;
}
...@@ -66,4 +66,7 @@ int toku_brt_get_fd(BRT, int *); ...@@ -66,4 +66,7 @@ int toku_brt_get_fd(BRT, int *);
int toku_brt_height_of_root(BRT, int *height); // for an open brt, return the current height. int toku_brt_height_of_root(BRT, int *height); // for an open brt, return the current height.
// Special hack for recovery
int toku_brt_nonleaf_expunge_xaction(BRT brt, DISKOFF diskoff, TXNID xid);
#endif #endif
...@@ -27,6 +27,11 @@ typedef struct { ...@@ -27,6 +27,11 @@ typedef struct {
char *data; char *data;
} BYTESTRING; } BYTESTRING;
typedef struct {
int len;
DISKOFF *array;
} DISKOFFARRAY;
/* Make the LSN be a struct instead of an integer so that we get better type checking. */ /* Make the LSN be a struct instead of an integer so that we get better type checking. */
typedef struct __toku_lsn { u_int64_t lsn; } LSN; typedef struct __toku_lsn { u_int64_t lsn; } LSN;
#define ZERO_LSN ((LSN){0}) #define ZERO_LSN ((LSN){0})
......
/* Test the expunge method. */
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include "fifo.h"
#include "memory.h"
int count;
int callback (bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen, int type, TXNID xid, void *v) {
TXNID which=(long)v;
assert(xid==which);
int actual_row = count;
assert(strlen(key)+1==keylen);
assert(strlen(data)+1==datalen);
//printf("count=%d which=%ld deleting %s %s\n", count, (long)which, (char*)key, (char*)data);
switch (which) {
case 23: break;
case 24: actual_row++; break;
case 26: actual_row+=3;
}
switch (actual_row) {
case 0: assert(strcmp(key, "hello")==0); assert(strcmp(data, "thera")==0); assert(xid==23); assert(type==0); break;
case 1: assert(strcmp(key, "hello")==0); assert(strcmp(data, "therb")==0); assert(xid==24); assert(type==0); break;
case 2: assert(strcmp(key, "hell1")==0); assert(strcmp(data, "therc")==0); assert(xid==24); assert(type==1); break;
case 3: assert(strcmp(key, "hell1")==0); assert(strcmp(data, "therd")==0); assert(xid==26); assert(type==1); break;
default: assert(0);
}
count++;
return 0;
}
void doit (int which) {
int r;
FIFO f;
r = toku_fifo_create(&f); assert(r==0);
r = toku_fifo_enq(f, "hello", 6, "thera", 6, 0, 23); assert(r==0);
r = toku_fifo_enq(f, "hello", 6, "therb", 6, 0, 24); assert(r==0);
r = toku_fifo_enq(f, "hell1", 6, "therc", 6, 1, 24); assert(r==0);
r = toku_fifo_enq(f, "hell1", 6, "therd", 6, 1, 26); assert(r==0);
int i=0;
FIFO_ITERATE(f, k, kl, d, dl, t, x,
({
assert(strlen(k)+1==kl);
assert(strlen(d)+1==dl);
switch(i) {
case 0: assert(strcmp(k, "hello")==0); assert(strcmp(d, "thera")==0); assert(x==23); assert(t==0); i++; break;
case 1: assert(strcmp(k, "hello")==0); assert(strcmp(d, "therb")==0); assert(x==24); assert(t==0); i++; break;
case 2: assert(strcmp(k, "hell1")==0); assert(strcmp(d, "therc")==0); assert(x==24); assert(t==1); i++; break;
case 3: assert(strcmp(k, "hell1")==0); assert(strcmp(d, "therd")==0); assert(x==26); assert(t==1); i++; break;
default: assert(0);
}
}));
count=0;
r = toku_fifo_expunge_xaction(f, which, callback, (void*)(long)which);
switch (which) {
case 23: assert(count==1); break;
case 24: assert(count==2); break;
case 26: assert(count==1); break;
}
toku_fifo_free(&f);
toku_malloc_cleanup();
}
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
doit(23);
doit(24);
doit(26);
doit(27);
return 0;
}
...@@ -140,7 +140,20 @@ void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec d ...@@ -140,7 +140,20 @@ void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec d
f(entry->key, entry->keylen, entry->key + entry->keylen, entry->vallen, entry->type, entry->xid, arg); f(entry->key, entry->keylen, entry->key + entry->keylen, entry->vallen, entry->type, entry->xid, arg);
} }
int toku_fifo_expunge_xaction(FIFO fifo, TXNID xid, int (*callback_on_delete)(bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen, int type, TXNID xid, void*), void*arg) {
struct fifo_entry **prev=&fifo->head;
struct fifo_entry *entry;
while ((entry=*prev)) {
if (entry->xid==xid) {
// Must remove it.
int r = callback_on_delete(entry->key, entry->keylen, entry->key+entry->keylen, entry->vallen, entry->type, entry->xid, arg);
fifo->n--;
*prev=entry->next;
toku_free_n(entry, fifo_entry_size(entry));
if (r!=0) return r;
} else {
prev = &entry->next;
}
}
return 0;
}
...@@ -44,4 +44,6 @@ void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,I ...@@ -44,4 +44,6 @@ void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,I
} \ } \
}) })
int toku_fifo_expunge_xaction(FIFO fifo, TXNID xid, int (*callback_on_delete)(bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen, int type, TXNID xid, void*), void*arg);
#endif #endif
...@@ -112,6 +112,9 @@ static inline int toku_copy_BYTESTRING(BYTESTRING *target, BYTESTRING val) { ...@@ -112,6 +112,9 @@ static inline int toku_copy_BYTESTRING(BYTESTRING *target, BYTESTRING val) {
static inline void toku_free_BYTESTRING(BYTESTRING val) { static inline void toku_free_BYTESTRING(BYTESTRING val) {
toku_free(val.data); toku_free(val.data);
} }
static inline void toku_free_DISKOFFARRAY(DISKOFFARRAY val) {
toku_free(val.array);
}
static inline int toku_copy_LOGGEDBRTHEADER(LOGGEDBRTHEADER *target, LOGGEDBRTHEADER val) { static inline int toku_copy_LOGGEDBRTHEADER(LOGGEDBRTHEADER *target, LOGGEDBRTHEADER val) {
*target = val; *target = val;
......
...@@ -49,6 +49,10 @@ const struct logtype rollbacks[] = { ...@@ -49,6 +49,10 @@ const struct logtype rollbacks[] = {
{"BYTESTRING", "key", 0}, {"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0}, {"BYTESTRING", "data", 0},
NULLFIELD}}, NULLFIELD}},
{"xactiontouchednonleaf", 'n', FA{{"FILENUM", "filenum", 0},
{"DISKOFFARRAY", "parents", 0},
{"DISKOFF", "diskoff", 0},
NULLFIELD}},
{0,0,FA{NULLFIELD}} {0,0,FA{NULLFIELD}}
}; };
......
...@@ -62,3 +62,13 @@ int toku_rollback_deleteatleaf (FILENUM filenum, BYTESTRING key, BYTESTRING data ...@@ -62,3 +62,13 @@ int toku_rollback_deleteatleaf (FILENUM filenum, BYTESTRING key, BYTESTRING data
0); // Do the insertion unconditionally 0); // Do the insertion unconditionally
return r; return r;
} }
int toku_rollback_xactiontouchednonleaf(FILENUM filenum, DISKOFFARRAY array __attribute__((__unused__)), DISKOFF diskoff, TOKUTXN txn) {
CACHEFILE cf;
BRT brt;
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf, &brt);
assert(r==0);
r = toku_brt_nonleaf_expunge_xaction(brt, diskoff, txn->txnid64);
assert(r==0);
return 0;
}
...@@ -77,7 +77,7 @@ void do_test_abort2 (void) { ...@@ -77,7 +77,7 @@ void do_test_abort2 (void) {
insert(7, 1); insert(7, 1);
r=txn->abort(txn); CKERR(r); r=txn->abort(txn); CKERR(r);
// Don't do a query on "hello7", because that will force things out of the buffer. // Don't do a lookup on "hello7", because that will force things out of the buffer.
r=env->txn_begin(env, 0, &txn, 0); assert(r==0); r=env->txn_begin(env, 0, &txn, 0); assert(r==0);
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment