Commit 0b0470b9 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge the simplified logging back to the main line, and get rid of the...

Merge the simplified logging back to the main line, and get rid of the tokudb.1493a branch. Closes #1493.

git-svn-id: file:///svn/toku/tokudb@9719 c7de825b-a66e-492c-adef-691d508d4ae1
parent ddda4cae
......@@ -53,7 +53,7 @@ $(DBCXX):
cd ..;make
clean:
rm -rf $(TARGETS) *.gcno *.gcda *.gcov *.db dir.test.db.assoc3 test_reverse_compare_fun.cpp.dir
rm -rf $(TARGETS) *.gcno *.gcda *.gcov *.db dir.test.db.assoc3 test_reverse_compare_fun.cpp.dir *.tdb.clean
check_test1: test1
$(VGRIND) ./$< $(SUMMARIZE_CMD)
......
......@@ -241,7 +241,7 @@ toku_block_get_offset_size(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF *offset, DISKOFF
}
int
toku_allocate_diskblocknumber(BLOCK_TABLE bt, BLOCKNUM *res, int *dirty, TOKULOGGER UU(logger)) {
toku_allocate_diskblocknumber(BLOCK_TABLE bt, BLOCKNUM *res, int *dirty) {
lock_for_blocktable();
BLOCKNUM result;
if (bt->free_blocks.b == diskoff_is_null) {
......@@ -265,7 +265,7 @@ toku_allocate_diskblocknumber(BLOCK_TABLE bt, BLOCKNUM *res, int *dirty, TOKULOG
int
toku_free_diskblocknumber(BLOCK_TABLE bt, BLOCKNUM *b, int *dirty, TOKULOGGER UU(logger))
toku_free_diskblocknumber(BLOCK_TABLE bt, BLOCKNUM *b, int *dirty)
// Effect: Free a diskblock
// Watch out for the case where the disk block was never yet written to disk
{
......
......@@ -20,8 +20,8 @@ void toku_block_free(BLOCK_TABLE bt, u_int64_t offset);
DISKOFF toku_block_get_offset(BLOCK_TABLE bt, BLOCKNUM b);
DISKOFF toku_block_get_size(BLOCK_TABLE bt, BLOCKNUM b);
void toku_block_get_offset_size(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF *offset, DISKOFF *size);
int toku_allocate_diskblocknumber(BLOCK_TABLE bt, BLOCKNUM *res, int *dirty, TOKULOGGER logger);
int toku_free_diskblocknumber(BLOCK_TABLE bt, BLOCKNUM *b, int *dirty, TOKULOGGER logger);
int toku_allocate_diskblocknumber(BLOCK_TABLE bt, BLOCKNUM *res, int *dirty);
int toku_free_diskblocknumber(BLOCK_TABLE bt, BLOCKNUM *b, int *dirty);
void toku_verify_diskblocknumber_allocated(BLOCK_TABLE bt, BLOCKNUM b);
void toku_block_verify_no_free_blocks(BLOCK_TABLE bt);
u_int64_t toku_block_allocator_allocated_limit(BLOCK_TABLE bt);
......
......@@ -236,7 +236,7 @@ struct brt_cursor {
};
// logs the memory allocation, but not the creation of the new node
int toku_create_new_brtnode (BRT t, BRTNODE *result, int height, TOKULOGGER logger);
int toku_create_new_brtnode (BRT t, BRTNODE *result, int height);
int toku_unpin_brtnode (BRT brt, BRTNODE node);
unsigned int toku_brtnode_which_child (BRTNODE node , DBT *k, DBT *d, BRT t);
......
......@@ -4,7 +4,7 @@ int toku_testsetup_leaf(BRT brt, BLOCKNUM *blocknum) {
BRTNODE node;
int r = toku_read_brt_header_and_store_in_cachefile(brt->cf, &brt->h);
if (r!=0) return r;
toku_create_new_brtnode(brt, &node, 0, (TOKULOGGER)0);
toku_create_new_brtnode(brt, &node, 0);
*blocknum = node->thisnodename;
r = toku_unpin_brtnode(brt, node);
......@@ -18,7 +18,7 @@ int toku_testsetup_nonleaf (BRT brt, int height, BLOCKNUM *blocknum, int n_child
assert(n_children<=BRT_FANOUT);
int r = toku_read_brt_header_and_store_in_cachefile(brt->cf, &brt->h);
if (r!=0) return r;
toku_create_new_brtnode(brt, &node, height, (TOKULOGGER)0);
toku_create_new_brtnode(brt, &node, height);
node->u.n.n_children=n_children;
MALLOC_N(n_children+1, node->u.n.childinfos);
MALLOC_N(n_children, node->u.n.childkeys);
......
......@@ -213,10 +213,10 @@ nonleaf_node_is_gorged (BRTNODE node) {
}
static int
brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum reactivity *re, BOOL *did_io);
brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, enum reactivity *re, BOOL *did_io);
static int
flush_this_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, enum reactivity *child_re, BOOL *did_io);
flush_this_child (BRT t, BRTNODE node, int childnum, enum reactivity *child_re, BOOL *did_io);
int toku_brt_debug_mode = 0;
......@@ -255,7 +255,7 @@ static u_int32_t compute_child_fullhash (CACHEFILE cf, BRTNODE node, int childnu
}
static void
fixup_child_fingerprint (BRTNODE node, int childnum_of_node, BRTNODE child, BRT UU(brt), TOKULOGGER UU(logger))
fixup_child_fingerprint (BRTNODE node, int childnum_of_node, BRTNODE child)
// Effect: Sum the child fingerprint (and leafentry estimates) and store them in NODE.
// Parameters:
// node The node to modify
......@@ -494,21 +494,6 @@ brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck)
return cmp;
}
static int log_and_save_brtenq(TOKULOGGER logger, BRT t, BRTNODE node, int childnum, TXNID xid, int type, const char *key, int keylen, const char *data, int datalen, u_int32_t *fingerprint) {
BYTESTRING keybs = {.len=keylen, .data=(char*)key};
BYTESTRING databs = {.len=datalen, .data=(char*)data};
u_int32_t old_fingerprint = *fingerprint;
u_int32_t fdiff=node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen);
u_int32_t new_fingerprint = old_fingerprint + fdiff;
//printf("%s:%d node=%lld fingerprint old=%08x new=%08x diff=%08x xid=%lld\n", __FILE__, __LINE__, node->thisnodename, old_fingerprint, new_fingerprint, fdiff, (long long)xid);
*fingerprint = new_fingerprint;
if (t->txnid_that_created_or_locked_when_empty != xid) {
int r = toku_log_brtenq(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, xid, type, keybs, databs);
if (r!=0) return r;
}
return 0;
}
static int
verify_in_mempool (OMTVALUE lev, u_int32_t UU(idx), void *vmp)
{
......@@ -666,11 +651,10 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r
TAGMALLOC(BRTNODE, newroot);
int r;
int new_height = nodea->height+1;
int new_nodesize = brt->h->nodesize;
BLOCKNUM newroot_diskoff;
r = toku_allocate_diskblocknumber(brt->h->blocktable,
&newroot_diskoff,
&brt->h->dirty, logger);
&brt->h->dirty);
assert(r==0);
assert(newroot);
newroot->ever_been_written = 0;
......@@ -706,20 +690,8 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r
BNC_SUBTREE_LEAFENTRY_ESTIMATE(newroot, 1)=0;
verify_local_fingerprint_nonleaf(nodea);
verify_local_fingerprint_nonleaf(nodeb);
r=toku_log_newbrtnode(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, new_height, new_nodesize, (unsigned char)((brt->flags&TOKU_DB_DUPSORT)!=0), newroot->rand4fingerprint);
if (r!=0) return r;
r=toku_log_addchild(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, nodea->thisnodename, 0);
if (r!=0) return r;
r=toku_log_addchild(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 1, nodeb->thisnodename, 0);
if (r!=0) return r;
fixup_child_fingerprint(newroot, 0, nodea, brt, logger);
fixup_child_fingerprint(newroot, 1, nodeb, brt, logger);
{
BYTESTRING bs = { .len = kv_pair_keylen(newroot->u.n.childkeys[0]),
.data = kv_pair_key(newroot->u.n.childkeys[0]) };
r=toku_log_setpivot(logger, &newroot->log_lsn, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs);
if (r!=0) return r;
}
fixup_child_fingerprint(newroot, 0, nodea);
fixup_child_fingerprint(newroot, 1, nodeb);
r = toku_unpin_brtnode(brt, nodea);
if (r!=0) return r;
r = toku_unpin_brtnode(brt, nodeb);
......@@ -734,11 +706,11 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r
}
// logs the memory allocation, but not the creation of the new node
int toku_create_new_brtnode (BRT t, BRTNODE *result, int height, TOKULOGGER logger) {
int toku_create_new_brtnode (BRT t, BRTNODE *result, int height) {
TAGMALLOC(BRTNODE, n);
int r;
BLOCKNUM name;
r = toku_allocate_diskblocknumber(t->h->blocktable, &name, &t->h->dirty, logger);
r = toku_allocate_diskblocknumber(t->h->blocktable, &name, &t->h->dirty);
assert(r==0);
assert(n);
assert(t->h->nodesize>0);
......@@ -767,7 +739,7 @@ fill_buf (OMTVALUE lev, u_int32_t idx, void *varray)
}
static int
brtleaf_split (TOKULOGGER logger, FILENUM filenum, BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk)
brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk)
// Effect: Split a leaf node.
{
BRTNODE B;
......@@ -777,7 +749,7 @@ brtleaf_split (TOKULOGGER logger, FILENUM filenum, BRT t, BRTNODE node, BRTNODE
assert(node->height==0);
assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
toku_create_new_brtnode(t, &B, 0, logger);
toku_create_new_brtnode(t, &B, 0);
assert(B->nodesize>0);
assert(node->nodesize>0);
//printf("%s:%d A is at %lld\n", __FILE__, __LINE__, A->thisnodename);
......@@ -845,13 +817,6 @@ brtleaf_split (TOKULOGGER logger, FILENUM filenum, BRT t, BRTNODE node, BRTNODE
toku_omt_destroy(&old_omt);
}
LSN lsn={0};
r = toku_log_leafsplit(logger, &lsn, 0, filenum, node->thisnodename, B->thisnodename, n_leafentries, break_at, node->nodesize, B->rand4fingerprint, (u_int8_t)((t->flags&TOKU_DB_DUPSORT)!=0));
if (logger) {
node->log_lsn = lsn;
B->log_lsn = lsn;
}
//toku_verify_gpma(node->u.l.buffer);
//toku_verify_gpma(B->u.l.buffer);
if (splitk) {
......@@ -889,7 +854,7 @@ brtleaf_split (TOKULOGGER logger, FILENUM filenum, BRT t, BRTNODE node, BRTNODE
}
static int
brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, TOKULOGGER logger)
brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk)
// Effect: node must be a node-leaf node. It is split into two nodes, and the fanout is split between them.
// Sets splitk->data pointer to a malloc'd value
// Sets nodea, and nodeb to the two new nodes.
......@@ -899,11 +864,10 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
int n_children_in_a = old_n_children/2;
int n_children_in_b = old_n_children-n_children_in_a;
BRTNODE B;
FILENUM fnum = toku_cachefile_filenum(t->cf);
assert(node->height>0);
assert(node->u.n.n_children>=2); // Otherwise, how do we split? We need at least two children to split. */
assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
toku_create_new_brtnode(t, &B, node->height, logger);
toku_create_new_brtnode(t, &B, node->height);
MALLOC_N(n_children_in_b+1, B->u.n.childinfos);
MALLOC_N(n_children_in_b, B->u.n.childkeys);
B->u.n.n_children =n_children_in_b;
......@@ -942,10 +906,6 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
BNC_HAVE_FULLHASH(B,targchild) = BNC_HAVE_FULLHASH(node,i);
BNC_FULLHASH(B,targchild) = BNC_FULLHASH(node, i);
int r = toku_log_addchild(logger, (LSN*)0, 0, fnum, B->thisnodename, targchild, thischildblocknum, BNC_SUBTREE_FINGERPRINT(node, i));
if (r!=0) return r;
while (1) {
bytevec key, data;
unsigned int keylen, datalen;
......@@ -957,13 +917,8 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
u_int32_t old_from_fingerprint = node->local_fingerprint;
u_int32_t delta = toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen);
u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta;
if (r!=0) return r;
if (t->txnid_that_created_or_locked_when_empty != xid) {
r = toku_log_brtdeq(logger, &node->log_lsn, 0, fnum, node->thisnodename, n_children_in_a);
if (r!=0) return r;
}
r = log_and_save_brtenq(logger, t, B, targchild, xid, type, key, keylen, data, datalen, &B->local_fingerprint);
r = toku_fifo_enq(to_htab, key, keylen, data, datalen, type, xid);
B->local_fingerprint += B->rand4fingerprint*delta;
int r = toku_fifo_enq(to_htab, key, keylen, data, datalen, type, xid);
if (r!=0) return r;
toku_fifo_deq(from_htab);
// key and data will no longer be valid
......@@ -979,14 +934,8 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
// Delete a child, removing it's fingerprint, and also the preceeding pivot key. The child number must be > 0
{
BYTESTRING bs = { .len = kv_pair_keylen(node->u.n.childkeys[i-1]),
.data = kv_pair_key(node->u.n.childkeys[i-1]) };
assert(i>0);
r = toku_log_delchild(logger, (LSN*)0, 0, fnum, node->thisnodename, n_children_in_a, thischildblocknum, BNC_SUBTREE_FINGERPRINT(node, i), bs);
if (r!=0) return r;
if (i>n_children_in_a) {
r = toku_log_setpivot(logger, (LSN*)0, 0, fnum, B->thisnodename, targchild-1, bs);
if (r!=0) return r;
B->u.n.childkeys[targchild-1] = node->u.n.childkeys[i-1];
B->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]);
node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]);
......@@ -1044,8 +993,8 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
static int
handle_split_of_child (BRT t, BRTNODE node, int childnum,
BRTNODE childa, BRTNODE childb,
DBT *splitk, /* the data in the childsplitk is alloc'd and is consumed by this call. */
TOKULOGGER logger)
DBT *splitk /* the data in the childsplitk is alloc'd and is consumed by this call. */
)
{
assert(node->height>0);
assert(0 <= childnum && childnum < node->u.n.n_children);
......@@ -1077,7 +1026,6 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
for (cnum=node->u.n.n_children; cnum>childnum+1; cnum--) {
node->u.n.childinfos[cnum] = node->u.n.childinfos[cnum-1];
}
r = toku_log_addchild(logger, (LSN*)0, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum+1, childb->thisnodename, 0);
node->u.n.n_children++;
assert(BNC_BLOCKNUM(node, childnum).b==childa->thisnodename.b); // use the same child
......@@ -1087,8 +1035,8 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
// BNC_SUBTREE_FINGERPRINT(node, childnum)=0; // leave the subtreefingerprint alone for the child, so we can log the change
BNC_SUBTREE_FINGERPRINT (node, childnum+1)=0;
BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, childnum+1)=0;
fixup_child_fingerprint(node, childnum, childa, t, logger);
fixup_child_fingerprint(node, childnum+1, childb, t, logger);
fixup_child_fingerprint(node, childnum, childa);
fixup_child_fingerprint(node, childnum+1, childb);
r=toku_fifo_create(&BNC_BUFFER(node,childnum+1)); assert(r==0);
verify_local_fingerprint_nonleaf(node); // The fingerprint hasn't changed and everhything is still there.
r=toku_fifo_create(&BNC_BUFFER(node,childnum)); assert(r==0); // ??? SHould handle this error case
......@@ -1102,10 +1050,6 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
// Slide the keys over
{
struct kv_pair *pivot = splitk->data;
BYTESTRING bs = { .len = splitk->size,
.data = kv_pair_key(pivot) };
r = toku_log_setpivot(logger, (LSN*)0, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, bs);
if (r!=0) return r;
for (cnum=node->u.n.n_children-2; cnum>childnum; cnum--) {
node->u.n.childkeys[cnum] = node->u.n.childkeys[cnum-1];
......@@ -1148,7 +1092,7 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
}
static int
brt_split_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, BOOL *did_react)
brt_split_child (BRT t, BRTNODE node, int childnum, BOOL *did_react)
{
if (0) {
printf("%s:%d Node %" PRId64 "->u.n.n_children=%d estimates=", __FILE__, __LINE__, node->thisnodename.b, node->u.n.n_children);
......@@ -1162,7 +1106,7 @@ brt_split_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, BOOL *did
// I don't think this can happen, but it's easy to handle. Flush the child, and if no longer fissible, then return.
enum reactivity re = RE_STABLE;
BOOL did_io = FALSE;
int r = flush_this_child(t, node, childnum, logger, &re, &did_io);
int r = flush_this_child(t, node, childnum, &re, &did_io);
if (r != 0) return r;
if (re != RE_FISSIBLE) return 0;
}
......@@ -1188,18 +1132,18 @@ brt_split_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, BOOL *did
DBT splitk;
// printf("%s:%d node %" PRIu64 "->u.n.n_children=%d height=%d\n", __FILE__, __LINE__, node->thisnodename.b, node->u.n.n_children, node->height);
if (child->height==0) {
int r = brtleaf_split(logger, toku_cachefile_filenum(t->cf), t, child, &nodea, &nodeb, &splitk);
int r = brtleaf_split(t, child, &nodea, &nodeb, &splitk);
assert(r==0); // REMOVE LATER
if (r!=0) return r;
} else {
int r = brt_nonleaf_split(t, child, &nodea, &nodeb, &splitk, logger);
int r = brt_nonleaf_split(t, child, &nodea, &nodeb, &splitk);
assert(r==0); // REMOVE LATER
if (r!=0) return r;
}
// printf("%s:%d child did split\n", __FILE__, __LINE__);
*did_react = TRUE;
{
int r = handle_split_of_child (t, node, childnum, nodea, nodeb, &splitk, logger);
int r = handle_split_of_child (t, node, childnum, nodea, nodeb, &splitk);
if (0) {
printf("%s:%d Node %" PRId64 "->u.n.n_children=%d estimates=", __FILE__, __LINE__, node->thisnodename.b, node->u.n.n_children);
int i;
......@@ -1473,13 +1417,12 @@ apply_cmd_to_leaf (BRT_CMD cmd,
}
static int
brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
brt_leaf_apply_cmd_once (BRTNODE node, BRT_CMD cmd,
u_int32_t idx, LEAFENTRY le)
// Effect: Apply cmd to leafentry
// idx is the location where it goes
// le is old leafentry
{
FILENUM filenum = toku_cachefile_filenum(t->cf);
u_int32_t newlen=0, newdisksize=0;
LEAFENTRY new_le=0;
void *maybe_free = 0;
......@@ -1499,10 +1442,6 @@ brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
//printf(" got "); print_leafentry(stdout, new_le); printf("\n");
if (le && new_le) {
if (t->txnid_that_created_or_locked_when_empty != cmd->xid) {
if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) goto return_r;
if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, new_le))) goto return_r;
}
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(le);
node->local_fingerprint -= node->rand4fingerprint * toku_le_crc(le);
......@@ -1522,10 +1461,6 @@ brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
if (le) {
// It's there, note that it's gone and remove it from the mempool
if (t->txnid_that_created_or_locked_when_empty != cmd->xid) {
if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) goto return_r;
}
if ((r = toku_omt_delete_at(node->u.l.buffer, idx))) goto return_r;
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(le);
......@@ -1537,10 +1472,6 @@ brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
if (new_le) {
if ((r = toku_omt_insert_at(node->u.l.buffer, new_le, idx))) goto return_r;
if (t->txnid_that_created_or_locked_when_empty != cmd->xid) {
if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, new_le))) goto return_r;
}
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize;
node->local_fingerprint += node->rand4fingerprint*toku_le_crc(new_le);
}
......@@ -1555,7 +1486,7 @@ brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
}
static int
brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
enum reactivity *re /*OUT*/
)
// Effect: Put a cmd into a leaf.
......@@ -1601,7 +1532,7 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
storeddata=storeddatav;
}
r = brt_leaf_apply_cmd_once(t, node, cmd, logger, idx, storeddata);
r = brt_leaf_apply_cmd_once(node, cmd, idx, storeddata);
if (r!=0) return r;
// if the insertion point is within a window of the right edge of
......@@ -1635,7 +1566,7 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
VERIFY_NODE(t, node);
//static int count=0; count++;
r = brt_leaf_apply_cmd_once(t, node, cmd, logger, idx, storeddata);
r = brt_leaf_apply_cmd_once(node, cmd, idx, storeddata);
if (r!=0) return r;
VERIFY_NODE(t, node);
......@@ -1656,7 +1587,7 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
int vallen = le_any_vallen(storeddata);
void *save_val = toku_memdup(le_any_val(storeddata), vallen);
r = brt_leaf_apply_cmd_once(t, node, cmd, logger, idx, storeddata);
r = brt_leaf_apply_cmd_once(node, cmd, idx, storeddata);
if (r!=0) return r;
// Now we must find the next one.
......@@ -1692,7 +1623,7 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
return 0;
}
static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int childnum, BRT_CMD cmd, TOKULOGGER logger,
static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int childnum, BRT_CMD cmd,
enum reactivity re_array[], BOOL *did_io)
{
......@@ -1713,8 +1644,8 @@ static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int chil
verify_local_fingerprint_nonleaf(child);
r = brtnode_put_cmd (t, child, cmd, logger, &re_array[childnum], did_io);
fixup_child_fingerprint(node, childnum, child, t, logger);
r = brtnode_put_cmd (t, child, cmd, &re_array[childnum], did_io);
fixup_child_fingerprint(node, childnum, child);
VERIFY_NODE(t, node);
verify_local_fingerprint_nonleaf(child);
......@@ -1734,10 +1665,9 @@ static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int chil
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
int r = log_and_save_brtenq(logger, t, node, childnum, cmd->xid, type, k->data, k->size, v->data, v->size, &node->local_fingerprint);
if (r!=0) return r;
node->local_fingerprint += node->rand4fingerprint * toku_calc_fingerprint_cmd(type, cmd->xid, k->data, k->size, v->data, v->size);
int diff = k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
r=toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type, cmd->xid);
int r=toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type, cmd->xid);
assert(r==0);
node->u.n.n_bytes_in_buffers += diff;
BNC_NBYTESINBUF(node, childnum) += diff;
......@@ -1773,7 +1703,7 @@ unsigned int toku_brtnode_which_child (BRTNODE node , DBT *k, DBT *d, BRT t) {
#endif
}
static int brt_nonleaf_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
static int brt_nonleaf_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd,
enum reactivity re_array[], BOOL *did_io)
// Effect: Insert a message into a nonleaf. We may put it into a child, possibly causing the child to become reactive.
// We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do.
......@@ -1786,7 +1716,7 @@ static int brt_nonleaf_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER lo
/* find the right subtree */
unsigned int childnum = toku_brtnode_which_child(node, cmd->u.id.key, cmd->u.id.val, t);
int r = brt_nonleaf_cmd_once_to_child (t, node, childnum, cmd, logger, re_array, did_io);
int r = brt_nonleaf_cmd_once_to_child (t, node, childnum, cmd, re_array, did_io);
verify_local_fingerprint_nonleaf(node);
......@@ -1794,7 +1724,7 @@ static int brt_nonleaf_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER lo
}
static int
brt_nonleaf_cmd_many (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
brt_nonleaf_cmd_many (BRT t, BRTNODE node, BRT_CMD cmd,
enum reactivity re_array[], BOOL *did_io)
// Effect: Put the cmd into a nonleaf node. We may put it into several children, possibly causing the children to become reactive.
// We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do.
......@@ -1832,7 +1762,7 @@ brt_nonleaf_cmd_many (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
/* Append the cmd to the appropriate child buffer. */
int childnum = sendchild[i];
r = brt_nonleaf_cmd_once_to_child(t, node, childnum, cmd, logger, re_array, did_io);
r = brt_nonleaf_cmd_once_to_child(t, node, childnum, cmd, re_array, did_io);
if (r!=0) goto return_r;
}
r=0;
......@@ -1841,7 +1771,7 @@ brt_nonleaf_cmd_many (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
}
static int
brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
enum reactivity re_array[], BOOL *did_io)
// Effect: Put the cmd into a nonleaf node. We may put it into a child, possibly causing the child to become reactive.
// We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do.
......@@ -1857,12 +1787,12 @@ brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
case BRT_ABORT_BOTH:
case BRT_COMMIT_BOTH:
do_once:
return brt_nonleaf_cmd_once(t, node, cmd, logger, re_array, did_io);
return brt_nonleaf_cmd_once(t, node, cmd, re_array, did_io);
case BRT_DELETE_ANY:
case BRT_ABORT_ANY:
case BRT_COMMIT_ANY:
if (0 == (node->flags & TOKU_DB_DUPSORT)) goto do_once; // nondupsort delete_any is just do once.
return brt_nonleaf_cmd_many(t, node, cmd, logger, re_array, did_io);
return brt_nonleaf_cmd_many(t, node, cmd, re_array, did_io);
case BRT_NONE:
break;
}
......@@ -1959,13 +1889,12 @@ balance_leaf_nodes (BRTNODE a, BRTNODE b, struct kv_pair **splitk)
static int
maybe_merge_pinned_leaf_nodes (BRT t, BRTNODE a, BRTNODE b, struct kv_pair *parent_splitk, TOKULOGGER logger, BOOL *did_merge, struct kv_pair **splitk)
maybe_merge_pinned_leaf_nodes (BRTNODE a, BRTNODE b, struct kv_pair *parent_splitk, BOOL *did_merge, struct kv_pair **splitk)
// Effect: Either merge a and b into one one node (merge them into a) and set *did_merge = TRUE. (We do this if the resulting node is not fissible)
// or distribute the leafentries evenly between a and b. (If a and be are already evenly distributed, we may do nothing.)
{
unsigned int sizea = toku_serialize_brtnode_size(a);
unsigned int sizeb = toku_serialize_brtnode_size(b);
t=t; logger=logger;
if ((sizea + sizeb)*4 > (a->nodesize*3)) {
// the combined size is more than 3/4 of a node, so don't merge them.
*did_merge = FALSE;
......@@ -1990,7 +1919,6 @@ static int
maybe_merge_pinned_nonleaf_nodes (BRT t,
BRTNODE parent, int childnum_of_parent, struct kv_pair *parent_splitk,
BRTNODE a, BRTNODE b,
TOKULOGGER logger,
BOOL *did_merge,
struct kv_pair **splitk)
{
......@@ -2032,7 +1960,7 @@ maybe_merge_pinned_nonleaf_nodes (BRT t,
a->dirty = 1;
b->dirty = 1;
fixup_child_fingerprint(parent, childnum_of_parent, a, t, logger);
fixup_child_fingerprint(parent, childnum_of_parent, a);
// abort(); // don't forget to reuse blocknums
*did_merge = TRUE;
*splitk = NULL;
......@@ -2043,7 +1971,7 @@ maybe_merge_pinned_nonleaf_nodes (BRT t,
static int
maybe_merge_pinned_nodes (BRT t,
BRTNODE parent, int childnum_of_parent, struct kv_pair *parent_splitk,
BRTNODE a, BRTNODE b, TOKULOGGER logger, BOOL *did_merge, struct kv_pair **splitk)
BRTNODE a, BRTNODE b, BOOL *did_merge, struct kv_pair **splitk)
// Effect: either merge a and b into one node (merge them into a) and set *did_merge = TRUE. (We do this if the resulting node is not fissible)
// or distribute a and b evenly and set *did_merge = FALSE (If a and be are already evenly distributed, we may do nothing.)
// If we distribute:
......@@ -2065,16 +1993,16 @@ maybe_merge_pinned_nodes (BRT t,
verify_local_fingerprint_nonleaf(a);
parent->dirty = 1; // just to make sure
if (a->height == 0) {
return maybe_merge_pinned_leaf_nodes(t, a, b, parent_splitk, logger, did_merge, splitk);
return maybe_merge_pinned_leaf_nodes(a, b, parent_splitk, did_merge, splitk);
} else {
int r = maybe_merge_pinned_nonleaf_nodes(t, parent, childnum_of_parent, parent_splitk, a, b, logger, did_merge, splitk);
int r = maybe_merge_pinned_nonleaf_nodes(t, parent, childnum_of_parent, parent_splitk, a, b, did_merge, splitk);
verify_local_fingerprint_nonleaf(a);
return r;
}
}
static int
brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKULOGGER logger, BOOL *did_react)
brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, BOOL *did_react)
{
verify_local_fingerprint_nonleaf(node);
if (node->u.n.n_children < 2) return 0; // if no siblings, we are merged as best we can.
......@@ -2095,12 +2023,12 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL
if (toku_fifo_n_entries(BNC_BUFFER(node,childnuma))>0) {
enum reactivity re = RE_STABLE;
int r = flush_this_child(t, node, childnuma, logger, &re, did_io);
int r = flush_this_child(t, node, childnuma, &re, did_io);
if (r!=0) return r;
}
if (toku_fifo_n_entries(BNC_BUFFER(node,childnumb))>0) {
enum reactivity re = RE_STABLE;
int r = flush_this_child(t, node, childnumb, logger, &re, did_io);
int r = flush_this_child(t, node, childnumb, &re, did_io);
if (r!=0) return r;
}
......@@ -2137,7 +2065,7 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL
struct kv_pair *old_split_key = node->u.n.childkeys[childnuma];
unsigned int deleted_size = toku_brt_pivot_key_len(t, old_split_key);
verify_local_fingerprint_nonleaf(childa);
r = maybe_merge_pinned_nodes(t, node, childnuma, node->u.n.childkeys[childnuma], childa, childb, logger, &did_merge, &splitk_kvpair);
r = maybe_merge_pinned_nodes(t, node, childnuma, node->u.n.childkeys[childnuma], childa, childb, &did_merge, &splitk_kvpair);
verify_local_fingerprint_nonleaf(childa);
if (childa->height>0) { int i; for (i=0; i+1<childa->u.n.n_children; i++) assert(childa->u.n.childkeys[i]); }
//(toku_verify_counts(childa), toku_verify_estimates(t,childa));
......@@ -2160,7 +2088,7 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL
&node->u.n.childkeys[childnuma+1],
(node->u.n.n_children-childnumb)*sizeof(node->u.n.childkeys[0]));
REALLOC_N(node->u.n.n_children-1, node->u.n.childkeys);
fixup_child_fingerprint(node, childnuma, childa, t, logger);
fixup_child_fingerprint(node, childnuma, childa);
assert(node->u.n.childinfos[childnuma].blocknum.b == childa->thisnodename.b);
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childa);
......@@ -2186,7 +2114,7 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL
BLOCKNUM bn = childb->thisnodename;
rrb = toku_cachetable_unpin_and_remove(t->cf, bn);
rrb1 = toku_free_diskblocknumber(t->h->blocktable, &bn,
&t->h->dirty, logger);
&t->h->dirty);
} else {
rrb = toku_unpin_brtnode(t, childb);
}
......@@ -2200,15 +2128,15 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL
}
static int
brt_handle_maybe_reactive_child(BRT t, BRTNODE node, int childnum, enum reactivity re, BOOL *did_io, TOKULOGGER logger, BOOL *did_react) {
brt_handle_maybe_reactive_child(BRT t, BRTNODE node, int childnum, enum reactivity re, BOOL *did_io, BOOL *did_react) {
switch (re) {
case RE_STABLE:
*did_react = FALSE;
return 0;
case RE_FISSIBLE:
return brt_split_child(t, node, childnum, logger, did_react);
return brt_split_child(t, node, childnum, did_react);
case RE_FUSIBLE:
return brt_merge_child(t, node, childnum, did_io, logger, did_react);
return brt_merge_child(t, node, childnum, did_io, did_react);
}
abort(); return 0; // this cannot happen
}
......@@ -2225,10 +2153,10 @@ brt_handle_maybe_reactive_child_at_root (BRT brt, CACHEKEY *rootp, BRTNODE *node
BRTNODE nodea,nodeb;
DBT splitk;
if (node->height==0) {
int r = brtleaf_split(logger, toku_cachefile_filenum(brt->cf), brt, node, &nodea, &nodeb, &splitk);
int r = brtleaf_split(brt, node, &nodea, &nodeb, &splitk);
if (r!=0) return r;
} else {
int r = brt_nonleaf_split(brt, node, &nodea, &nodeb, &splitk, logger);
int r = brt_nonleaf_split(brt, node, &nodea, &nodeb, &splitk);
if (r!=0) return r;
}
return brt_init_new_root(brt, nodea, nodeb, splitk, rootp, logger, nodep);
......@@ -2260,7 +2188,7 @@ static void find_heaviest_child (BRTNODE node, int *childnum) {
}
static int
flush_this_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, enum reactivity *child_re, BOOL *did_io)
flush_this_child (BRT t, BRTNODE node, int childnum, enum reactivity *child_re, BOOL *did_io)
// Effect: Push everything in the CHILDNUMth buffer of node down into the child.
// The child could end up reactive, and this function doesn't fix that.
{
......@@ -2299,7 +2227,7 @@ flush_this_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, enum rea
u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta;
//printf("%s:%d random_picked\n", __FILE__, __LINE__);
r = brtnode_put_cmd (t, child, &brtcmd, logger, child_re, did_io);
r = brtnode_put_cmd (t, child, &brtcmd, child_re, did_io);
//printf("%s:%d %d=push_a_brt_cmd_down=(); child_did_split=%d (weight=%d)\n", __FILE__, __LINE__, r, child_did_split, BNC_NBYTESINBUF(node, childnum));
if (r!=0) goto return_r;
......@@ -2318,7 +2246,7 @@ flush_this_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, enum rea
}
verify_local_fingerprint_nonleaf(node);
return_r:
fixup_child_fingerprint(node, childnum, child, t, logger);
fixup_child_fingerprint(node, childnum, child);
{
int rr=toku_unpin_brtnode(t, child);
if (rr!=0) return rr;
......@@ -2328,18 +2256,18 @@ flush_this_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, enum rea
}
static int
flush_some_child (BRT t, BRTNODE node, TOKULOGGER logger, enum reactivity re_array[], BOOL *did_io)
flush_some_child (BRT t, BRTNODE node, enum reactivity re_array[], BOOL *did_io)
{
assert(node->height>0);
int childnum;
find_heaviest_child(node, &childnum);
assert(toku_fifo_n_entries(BNC_BUFFER(node, childnum))>0);
return flush_this_child (t, node, childnum, logger, &re_array[childnum], did_io);
return flush_this_child (t, node, childnum, &re_array[childnum], did_io);
}
static int
brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum reactivity *re, BOOL *did_io)
brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, enum reactivity *re, BOOL *did_io)
// Effect: Push CMD into the subtree rooted at NODE, and indicate whether as a result NODE should split or should merge.
// If NODE is a leaf, then
// put CMD into leaf, applying it to the leafentries
......@@ -2353,12 +2281,12 @@ brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum react
verify_local_fingerprint_nonleaf(node);
if (node->height==0) {
return brt_leaf_put_cmd(t, node, cmd, logger, re);
return brt_leaf_put_cmd(t, node, cmd, re);
} else {
verify_local_fingerprint_nonleaf(node);
enum reactivity child_re[node->u.n.n_children];
{ int i; for (i=0; i<node->u.n.n_children; i++) child_re[i]=RE_STABLE; }
int r = brt_nonleaf_put_cmd(t, node, cmd, logger, child_re, did_io);
int r = brt_nonleaf_put_cmd(t, node, cmd, child_re, did_io);
if (r!=0) goto return_r;
verify_local_fingerprint_nonleaf(node);
// Now we may have overfilled node. So we'll flush the heaviest child until we are happy.
......@@ -2366,7 +2294,7 @@ brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum react
&& nonleaf_node_is_gorged(node) // Don't flush if the node is small enough.
&& (node->u.n.n_bytes_in_buffers > 0) // Don't try to flush if everything is flushed.
) {
r = flush_some_child(t, node, logger, child_re, did_io);
r = flush_some_child(t, node, child_re, did_io);
if (r!=0) goto return_r;
}
// Now all those children may need fixing.
......@@ -2376,7 +2304,7 @@ brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum react
for (i=0; i<original_n_children; i++) {
int childnum = original_n_children - 1 -i;
BOOL did_react; // ignore the result.
r = brt_handle_maybe_reactive_child(t, node, childnum, child_re[childnum], did_io, logger, &did_react);
r = brt_handle_maybe_reactive_child(t, node, childnum, child_re[childnum], did_io, &did_react);
if (r!=0) break;
if (*did_io) break;
}
......@@ -2409,7 +2337,7 @@ static int push_something_at_root (BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT
enum reactivity re = RE_STABLE;
BOOL did_io = FALSE;
{
int r = brtnode_put_cmd(brt, node, cmd, logger, &re, &did_io);
int r = brtnode_put_cmd(brt, node, cmd, &re, &did_io);
verify_local_fingerprint_nonleaf(node);
if (r!=0) return r;
//if (should_split) printf("%s:%d Pushed something simple, should_split=1\n", __FILE__, __LINE__);
......@@ -2643,7 +2571,7 @@ int toku_open_brt (const char *fname, const char *dbname, int is_create, BRT *ne
return r;
}
static int setup_initial_brt_root_node (BRT t, BLOCKNUM blocknum, TOKULOGGER logger) {
static int setup_initial_brt_root_node (BRT t, BLOCKNUM blocknum) {
int r;
TAGMALLOC(BRTNODE, node);
assert(node);
......@@ -2666,7 +2594,6 @@ static int setup_initial_brt_root_node (BRT t, BLOCKNUM blocknum, TOKULOGGER log
return r;
}
// verify_local_fingerprint_nonleaf(node);
toku_log_newbrtnode(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), blocknum, 0, t->h->nodesize, (unsigned char)((t->flags&TOKU_DB_DUPSORT)!=0), node->rand4fingerprint);
r = toku_unpin_brtnode(t, node);
if (r!=0) {
toku_free(node);
......@@ -2732,7 +2659,7 @@ static int brt_init_header(BRT t, TOKUTXN txn) {
}
if ((r=toku_log_fheader(toku_txn_logger(txn), (LSN*)0, 0, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), lh))) { return r; }
}
if ((r=setup_initial_brt_root_node(t, root, toku_txn_logger(txn)))!=0) { return r; }
if ((r=setup_initial_brt_root_node(t, root))!=0) { return r; }
//printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0);
toku_block_verify_no_free_blocks(t->h->blocktable);
toku_cachefile_set_userdata(t->cf, t->h, toku_brtheader_close, toku_brtheader_checkpoint);
......@@ -2886,11 +2813,11 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
t->h->n_named_roots++;
if ((t->h->names[t->h->n_named_roots-1] = toku_strdup(dbname)) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; }
//printf("%s:%d t=%p\n", __FILE__, __LINE__, t);
r = toku_allocate_diskblocknumber(t->h->blocktable, &t->h->roots[t->h->n_named_roots-1], &t->h->dirty, toku_txn_logger(txn));
r = toku_allocate_diskblocknumber(t->h->blocktable, &t->h->roots[t->h->n_named_roots-1], &t->h->dirty);
if (r!=0) goto died_after_read_and_pin;
t->h->dirty = 1;
compute_and_fill_remembered_hash(t, t->h->n_named_roots-1);
if ((r=setup_initial_brt_root_node(t, t->h->roots[t->h->n_named_roots-1], toku_txn_logger(txn)))!=0) goto died_after_read_and_pin;
if ((r=setup_initial_brt_root_node(t, t->h->roots[t->h->n_named_roots-1]))!=0) goto died_after_read_and_pin;
}
} else {
if ((r = toku_read_brt_header_and_store_in_cachefile(t->cf, &t->h))!=0) goto died_after_open;
......@@ -3297,7 +3224,7 @@ brt_cursor_update(BRT_CURSOR brtcursor) {
// This is a bottom layer of the search functions.
static int
brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_STRADDLE_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *re, BOOL *doprefetch, TOKULOGGER logger, BRT_CURSOR brtcursor)
brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_STRADDLE_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *re, BOOL *doprefetch, BRT_CURSOR brtcursor)
{
// Now we have to convert from brt_search_t to the heaviside function with a direction. What a pain...
......@@ -3347,7 +3274,7 @@ brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_STRADD
DBT key, val;
BRT_CMD_S brtcmd = { BRT_COMMIT_BOTH, xid, .u.id= {toku_fill_dbt(&key, le_latest_key(le), le_latest_keylen(le)),
toku_fill_dbt(&val, le_latest_val(le), le_latest_vallen(le))} };
r = brt_leaf_apply_cmd_once(brt, node, &brtcmd, logger, idx, le);
r = brt_leaf_apply_cmd_once(node, &brtcmd, idx, le);
assert(r == 0);
}
if (idx>=toku_omt_size(node->u.l.buffer)) return DB_NOTFOUND;
......@@ -3400,7 +3327,7 @@ got_a_good_value:
}
static int
brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_STRADDLE_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *re, BOOL *doprefetch, TOKULOGGER logger, BRT_CURSOR brtcursor);
brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_STRADDLE_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *re, BOOL *doprefetch, BRT_CURSOR brtcursor);
// the number of nodes to prefetch
#define TOKU_DO_PREFETCH 1
......@@ -3430,7 +3357,7 @@ brt_node_maybe_prefetch(BRT brt, BRTNODE node, int childnum, BRT_CURSOR brtcurso
/* search in a node's child */
static int
brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_GET_STRADDLE_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *parent_re, BOOL *doprefetch, TOKULOGGER logger, BRT_CURSOR brtcursor, BOOL *did_react)
brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_GET_STRADDLE_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *parent_re, BOOL *doprefetch, BRT_CURSOR brtcursor, BOOL *did_react)
// Effect: Search in a node's child.
// If we change the shape, set *did_react = TRUE. Else set *did_react = FALSE.
{
......@@ -3441,7 +3368,7 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
if (BNC_NBYTESINBUF(node, childnum) > 0) {
BOOL did_io = FALSE;
enum reactivity child_re = RE_STABLE;
int rr = flush_this_child(brt, node, childnum, logger, &child_re, &did_io);
int rr = flush_this_child(brt, node, childnum, &child_re, &did_io);
assert(rr == 0);
/* push down may cause the child to be overfull, but that's OK. We'll search the child anyway, and recompute the ractivity. */
}
......@@ -3458,7 +3385,7 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childnode);
enum reactivity child_re = RE_STABLE;
int r = brt_search_node(brt, childnode, search, getf, getf_v, &child_re, doprefetch, logger, brtcursor);
int r = brt_search_node(brt, childnode, search, getf, getf_v, &child_re, doprefetch, brtcursor);
// Even if r is reactive, we want to handle the maybe reactive child.
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(childnode);
......@@ -3476,7 +3403,7 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
{
BOOL did_io = FALSE;
int rr = brt_handle_maybe_reactive_child(brt, node, childnum, child_re, &did_io, logger, did_react);
int rr = brt_handle_maybe_reactive_child(brt, node, childnum, child_re, &did_io, did_react);
if (rr!=0) r = rr; // if we got an error, then return rr. Else we will return the r from brt_search_node().
}
......@@ -3488,7 +3415,7 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
}
static int
brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_STRADDLE_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *re, BOOL *doprefetch, TOKULOGGER logger, BRT_CURSOR brtcursor)
brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_STRADDLE_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *re, BOOL *doprefetch, BRT_CURSOR brtcursor)
{
int count=0;
again:
......@@ -3514,7 +3441,7 @@ brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_STR
brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) {
BOOL did_change_shape = FALSE;
verify_local_fingerprint_nonleaf(node);
int r = brt_search_child(brt, node, child[c], search, getf, getf_v, re, doprefetch, logger, brtcursor, &did_change_shape);
int r = brt_search_child(brt, node, child[c], search, getf, getf_v, re, doprefetch, brtcursor, &did_change_shape);
assert(r != EAGAIN);
if (r == 0) return r;
if (did_change_shape) goto again;
......@@ -3524,18 +3451,18 @@ brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_STR
/* check the first (left) or last (right) node if nothing has been found */
BOOL did_change_shape; // ignore this
verify_local_fingerprint_nonleaf(node);
return brt_search_child(brt, node, child[c], search, getf, getf_v, re, doprefetch, logger, brtcursor, &did_change_shape);
return brt_search_child(brt, node, child[c], search, getf, getf_v, re, doprefetch, brtcursor, &did_change_shape);
}
}
static int
brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_STRADDLE_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *re, BOOL *doprefetch, TOKULOGGER logger, BRT_CURSOR brtcursor)
brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_STRADDLE_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *re, BOOL *doprefetch, BRT_CURSOR brtcursor)
{
verify_local_fingerprint_nonleaf(node);
if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, getf, getf_v, re, doprefetch, logger, brtcursor);
return brt_search_nonleaf_node(brt, node, search, getf, getf_v, re, doprefetch, brtcursor);
else {
return brt_search_leaf_node(brt, node, search, getf, getf_v, re, doprefetch, logger, brtcursor);
return brt_search_leaf_node(brt, node, search, getf, getf_v, re, doprefetch, brtcursor);
}
}
......@@ -3577,7 +3504,7 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_STRADDLE_CALLBACK_FUNCTI
enum reactivity re = RE_STABLE;
BOOL doprefetch = FALSE;
//static int counter = 0; counter++;
r = brt_search_node(brt, node, search, getf, getf_v, &re, &doprefetch, logger, brtcursor);
r = brt_search_node(brt, node, search, getf, getf_v, &re, &doprefetch, brtcursor);
if (r!=0) goto return_r;
r = brt_handle_maybe_reactive_child_at_root(brt, rootp, &node, re, logger);
......
......@@ -106,13 +106,6 @@ const struct logtype logtypes[] = {
{"FILENUM", "filenum", 0},
{"LOGGEDBRTHEADER", "header", 0},
NULLFIELD}},
{"newbrtnode", 'N', FA{{"FILENUM", "filenum", 0},
{"BLOCKNUM", "blocknum", 0},
{"u_int32_t", "height", 0},
{"u_int32_t", "nodesize", 0},
{"u_int8_t", "is_dup_sort", 0},
{"u_int32_t", "rand4fingerprint", "%08x"},
NULLFIELD}},
{"changeunnamedroot", 'u', FA{{"FILENUM", "filenum", 0},
{"BLOCKNUM", "oldroot", 0},
{"BLOCKNUM", "newroot", 0},
......@@ -126,30 +119,6 @@ const struct logtype logtypes[] = {
{"BLOCKNUM", "oldunused", 0},
{"BLOCKNUM", "newunused", 0},
NULLFIELD}},
{"addchild", 'c', FA{{"FILENUM", "filenum", 0},
{"BLOCKNUM", "blocknum", 0},
{"u_int32_t", "childnum", 0}, // children scoot over
{"BLOCKNUM", "child", 0},
{"u_int32_t", "childfingerprint", "%08x"},
NULLFIELD}},
{"delchild", 'r', FA{{"FILENUM", "filenum", 0},
{"BLOCKNUM", "blocknum", 0},
{"u_int32_t", "childnum", 0}, // children scoot over
{"BLOCKNUM", "child", 0},
{"u_int32_t", "childfingerprint", "%08x"},
{"BYTESTRING", "pivotkey", 0},
NULLFIELD}},
{"setchild", 'i', FA{{"FILENUM", "filenum", 0},
{"BLOCKNUM", "blocknum", 0},
{"u_int32_t", "childnum", 0},
{"BLOCKNUM", "oldchild", 0},
{"BLOCKNUM", "newchild", 0},
NULLFIELD}},
{"setpivot", 'k', FA{{"FILENUM", "filenum", 0},
{"BLOCKNUM", "blocknum", 0},
{"u_int32_t", "childnum", 0},
{"BYTESTRING", "pivotkey", 0},
NULLFIELD}},
{"fopen", 'O', FA{{"TXNID", "txnid", 0},
{"BYTESTRING", "fname", 0},
{"FILENUM", "filenum", 0},
......@@ -160,32 +129,6 @@ const struct logtype logtypes[] = {
{"cfclose", 'o', FA{{"BYTESTRING", "fname", 0}, // cfclose is logged when a cachefile actually closes ("cfclose" means cache file close)
{"FILENUM", "filenum", 0},
NULLFIELD}},
// Note that brtdeq and brtenq don't name the new size or fingerprint. We can calculate them properly.
{"brtdeq", 'U', FA{{"FILENUM", "filenum", 0},
{"BLOCKNUM", "blocknum", 0},
{"u_int32_t", "childnum", 0},
NULLFIELD}},
{"brtenq", 'Q', FA{{"FILENUM", "filenum", 0},
{"BLOCKNUM", "blocknum", 0},
{"u_int32_t", "childnum", 0},
{"TXNID", "xid", 0},
{"u_int32_t", "typ", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
NULLFIELD}},
// {"insertinleaf", 'I', FA{{"TXNID", "txnid", 0},
// {"FILENUM", "filenum", 0},
// {"BLOCKNUM", "blocknum", 0},
// {"u_int32_t", "pmaidx", 0},
// {"BYTESTRING", "key", 0},
// {"BYTESTRING", "data", 0},
// NULLFIELD}},
// {"replaceleafentry", 'L', FA{{"FILENUM", "filenum", 0},
// {"BLOCKNUM", "blocknum", 0},
// {"u_int32_t", "pmaidx", 0},
// {"LEAFENTRY", "oldleafentry", 0},
// {"LEAFENTRY", "newleafentry", 0},
// NULLFIELD}},
{"enqrootentry", 'a', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"u_int32_t", "typ", 0},
......@@ -194,24 +137,6 @@ const struct logtype logtypes[] = {
NULLFIELD}},
{"deqrootentry", 'A', FA{{"FILENUM", "filenum", 0},
NULLFIELD}},
{"insertleafentry", 'I', FA{{"FILENUM", "filenum", 0},
{"BLOCKNUM", "blocknum", 0},
{"u_int32_t", "idx", 0},
{"LEAFENTRY", "newleafentry", 0},
NULLFIELD}},
{"deleteleafentry", 'D', FA{{"FILENUM", "filenum", 0},
{"BLOCKNUM", "blocknum", 0},
{"u_int32_t", "idx", 0},
NULLFIELD}},
{"leafsplit", 's', FA{{"FILENUM", "filenum", 0}, // log the creation of a new node by splitting stuff out of an old node
{"BLOCKNUM", "old_blocknum", 0},
{"BLOCKNUM", "new_blocknum", 0},
{"u_int32_t", "old_n", 0},
{"u_int32_t", "split_at", 0},
{"u_int32_t", "new_nodesize", 0},
{"u_int32_t", "new_rand4", "%08x"},
{"u_int8_t", "is_dupsort", 0},
NULLFIELD}},
{0,0,FA{NULLFIELD}}
};
......
......@@ -170,69 +170,6 @@ static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,L
toku_cachefile_set_userdata(pair->cf, pair->brt->h, toku_brtheader_close, toku_brtheader_checkpoint);
}
static void
toku_recover_newbrtnode (LSN lsn, FILENUM filenum, BLOCKNUM blocknum,u_int32_t height,u_int32_t nodesize,u_int8_t is_dup_sort,u_int32_t rand4fingerprint) {
int r;
struct cf_pair *pair = NULL;
r = find_cachefile(filenum, &pair);
assert(r==0);
TAGMALLOC(BRTNODE, n);
n->nodesize = nodesize;
n->thisnodename = blocknum;
n->log_lsn = n->disk_lsn = lsn;
//printf("%s:%d %p->disk_lsn=%"PRId64"\n", __FILE__, __LINE__, n, n->disk_lsn.lsn);
n->layout_version = BRT_LAYOUT_VERSION;
n->height = height;
n->rand4fingerprint = rand4fingerprint;
n->flags = is_dup_sort ? TOKU_DB_DUPSORT : 0; // Don't have TOKU_DB_DUP ???
n->local_fingerprint = 0; // nothing there yet
n->dirty = 1;
if (height==0) {
r=toku_omt_create(&n->u.l.buffer);
assert(r==0);
n->u.l.n_bytes_in_buffer=0;
{
u_int32_t mpsize = n->nodesize + n->nodesize/4;
void *mp = toku_malloc(mpsize);
assert(mp);
toku_mempool_init(&n->u.l.buffer_mempool, mp, mpsize);
}
} else {
n->u.n.n_children = 0;
n->u.n.totalchildkeylens = 0;
n->u.n.n_bytes_in_buffers = 0;
MALLOC_N(3,n->u.n.childinfos);
MALLOC_N(2,n->u.n.childkeys);
}
// Now put it in the cachetable
u_int32_t fullhash = toku_cachetable_hash(pair->cf, blocknum);
n->fullhash = fullhash;
toku_cachetable_put(pair->cf, blocknum, fullhash, n, toku_serialize_brtnode_size(n), toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt->h);
VERIFY_COUNTS(n);
n->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, blocknum, fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(n));
assert(r==0);
}
static void recover_setup_node (FILENUM filenum, BLOCKNUM blocknum, CACHEFILE *cf, BRTNODE *resultnode) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
assert(r==0);
assert(pair->brt);
void *node_v;
u_int32_t fullhash = toku_cachetable_hash(pair->cf, blocknum);
r = toku_cachetable_get_and_pin(pair->cf, blocknum, fullhash,
&node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(fullhash==node->fullhash);
*resultnode = node;
*cf = pair->cf;
}
static void toku_recover_deqrootentry (LSN lsn __attribute__((__unused__)), FILENUM filenum) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
......@@ -276,187 +213,6 @@ toku_recover_enqrootentry (LSN lsn __attribute__((__unused__)), FILENUM filenum,
toku_free(val.data);
}
static void
toku_recover_brtdeq (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t childnum) {
CACHEFILE cf;
BRTNODE node;
int r;
recover_setup_node(filenum, blocknum, &cf, &node);
assert(node->height>0);
//printf("deq: %lld expected_old_fingerprint=%08x actual=%08x new=%08x\n", diskoff, oldfingerprint, node->local_fingerprint, newfingerprint);
bytevec actual_key=0, actual_data=0;
ITEMLEN actual_keylen=0, actual_datalen=0;
u_int32_t actual_type=0;
TXNID actual_xid=0;
assert(childnum<(u_int32_t)node->u.n.n_children);
r = toku_fifo_peek(BNC_BUFFER(node, childnum), &actual_key, &actual_keylen, &actual_data, &actual_datalen, &actual_type, &actual_xid);
assert(r==0);
u_int32_t sizediff = actual_keylen + actual_datalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
node->local_fingerprint -= node->rand4fingerprint * toku_calc_fingerprint_cmd(actual_type, actual_xid, actual_key, actual_keylen, actual_data, actual_datalen);
node->log_lsn = lsn;
node->u.n.n_bytes_in_buffers -= sizediff;
BNC_NBYTESINBUF(node, childnum) -= sizediff;
r = toku_fifo_deq(BNC_BUFFER(node, childnum)); // don't deq till were' done looking at the data.
r = toku_cachetable_unpin(cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0);
}
static void
toku_recover_brtenq (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t childnum, TXNID xid, u_int32_t typ, BYTESTRING key, BYTESTRING data) {
CACHEFILE cf;
BRTNODE node;
int r;
recover_setup_node(filenum, blocknum, &cf, &node);
assert(node->height>0);
//printf("enq: %lld expected_old_fingerprint=%08x actual=%08x new=%08x\n", blocknum, oldfingerprint, node->local_fingerprint, newfingerprint);
r = toku_fifo_enq(BNC_BUFFER(node, childnum), key.data, key.len, data.data, data.len, typ, xid);
assert(r==0);
node->local_fingerprint += node->rand4fingerprint * toku_calc_fingerprint_cmd(typ, xid, key.data, key.len, data.data, data.len);
node->log_lsn = lsn;
u_int32_t sizediff = key.len + data.len + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
r = toku_cachetable_unpin(cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0);
node->u.n.n_bytes_in_buffers += sizediff;
BNC_NBYTESINBUF(node, childnum) += sizediff;
toku_free(key.data);
toku_free(data.data);
}
static void
toku_recover_addchild (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t childnum, BLOCKNUM child, u_int32_t childfingerprint) {
CACHEFILE cf;
BRTNODE node;
recover_setup_node(filenum, blocknum, &cf, &node);
assert(node->height>0);
assert(childnum <= (unsigned)node->u.n.n_children);
unsigned int i;
REALLOC_N(node->u.n.n_children+1, node->u.n.childinfos);
REALLOC_N(node->u.n.n_children, node->u.n.childkeys);
for (i=node->u.n.n_children; i>childnum; i--) {
node->u.n.childinfos[i]=node->u.n.childinfos[i-1];
BNC_NBYTESINBUF(node,i) = BNC_NBYTESINBUF(node,i-1);
assert(i>=2);
node->u.n.childkeys [i-1] = node->u.n.childkeys [i-2];
}
if (childnum>0) {
node->u.n.childkeys [childnum-1] = 0;
}
BNC_BLOCKNUM(node, childnum) = child;
BNC_SUBTREE_FINGERPRINT(node, childnum) = childfingerprint;
BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, childnum) = 0;
int r= toku_fifo_create(&BNC_BUFFER(node, childnum)); assert(r==0);
BNC_NBYTESINBUF(node, childnum) = 0;
node->u.n.n_children++;
node->log_lsn = lsn;
r = toku_cachetable_unpin(cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0);
}
static void
toku_recover_delchild (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t childnum, BLOCKNUM child, u_int32_t childfingerprint, BYTESTRING pivotkey) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
assert(r==0);
void *node_v;
assert(pair->brt);
u_int32_t fullhash = toku_cachetable_hash(pair->cf, blocknum);
r = toku_cachetable_get_and_pin(pair->cf, blocknum, fullhash, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->height>0);
assert(node->fullhash==fullhash);
assert(childnum < (unsigned)node->u.n.n_children);
assert(node->u.n.childinfos[childnum].subtree_fingerprint == childfingerprint);
assert(BNC_BLOCKNUM(node, childnum).b==child.b);
assert(toku_fifo_n_entries(BNC_BUFFER(node,childnum))==0);
assert(BNC_NBYTESINBUF(node,childnum)==0);
assert(node->u.n.n_children>2); // Must be at least two children.
u_int32_t i;
assert(childnum>0);
node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(pair->brt, node->u.n.childkeys[childnum-1]);
toku_free((void*)node->u.n.childkeys[childnum-1]);
toku_fifo_free(&BNC_BUFFER(node,childnum));
for (i=childnum+1; i<(unsigned)node->u.n.n_children; i++) {
node->u.n.childinfos[i-1] = node->u.n.childinfos[i];
BNC_NBYTESINBUF(node,i-1) = BNC_NBYTESINBUF(node,i);
node->u.n.childkeys[i-2] = node->u.n.childkeys[i-1];
}
node->u.n.n_children--;
node->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0);
toku_free(pivotkey.data);
}
static void
toku_recover_setchild (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t childnum, BLOCKNUM UU(oldchild), BLOCKNUM newchild) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
assert(r==0);
void *node_v;
assert(pair->brt);
u_int32_t fullhash = toku_cachetable_hash(pair->cf, blocknum);
r = toku_cachetable_get_and_pin(pair->cf, blocknum, fullhash, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->fullhash == fullhash);
assert(node->height>0);
assert(childnum < (unsigned)node->u.n.n_children);
BNC_BLOCKNUM(node, childnum) = newchild;
node->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0);
}
static void
toku_recover_setpivot (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t childnum, BYTESTRING pivotkey) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
assert(r==0);
void *node_v;
assert(pair->brt);
u_int32_t fullhash = toku_cachetable_hash(pair->cf, blocknum);
r = toku_cachetable_get_and_pin(pair->cf, blocknum, fullhash, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->fullhash==fullhash);
assert(node->height>0);
struct kv_pair *new_pivot = kv_pair_malloc(pivotkey.data, pivotkey.len, 0, 0);
node->u.n.childkeys[childnum] = new_pivot;
node->u.n.totalchildkeylens += toku_brt_pivot_key_len(pair->brt, node->u.n.childkeys[childnum]);
node->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0);
toku_free(pivotkey.data);
}
#if 0
static void
toku_recover_changechildfingerprint (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t childnum, u_int32_t UU(oldfingerprint), u_int32_t newfingerprint) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
assert(r==0);
void *node_v;
assert(pair->brt);
u_int32_t fullhash = toku_cachetable_hash(pair->cf, blocknum);
r = toku_cachetable_get_and_pin(pair->cf, blocknum, fullhash, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->fullhash == fullhash);
assert(node->height>0);
assert((signed)childnum <= node->u.n.n_children); // we allow the childnum to be one too large.
BNC_SUBTREE_FINGERPRINT(node, childnum) = newfingerprint;
node->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0);
}
#endif
static void
toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM filenum) {
char *fixedfname = fixup_fname(&fname);
......@@ -507,165 +263,6 @@ toku_recover_cfclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum) {
toku_free_BYTESTRING(fname);
}
static int fill_buf (OMTVALUE lev, u_int32_t idx, void *varray) {
LEAFENTRY le=lev;
LEAFENTRY *array=varray;
array[idx]=le;
return 0;
}
// The memory for the new node should have already been allocated.
static void
toku_recover_leafsplit (LSN lsn, FILENUM filenum, BLOCKNUM old_blocknum, BLOCKNUM new_blocknum, u_int32_t old_n, u_int32_t new_n, u_int32_t new_node_size, u_int32_t new_rand4, u_int8_t is_dup_sort) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
void *nodeA_v;
assert(pair->brt);
u_int32_t oldn_fullhash = toku_cachetable_hash(pair->cf, old_blocknum);
r = toku_cachetable_get_and_pin(pair->cf, old_blocknum, oldn_fullhash, &nodeA_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE oldn = nodeA_v;
assert(oldn->fullhash==oldn_fullhash);
assert(oldn->height==0);
TAGMALLOC(BRTNODE, newn);
assert(newn);
//printf("%s:%d leafsplit %p (%lld) %p (%lld)\n", __FILE__, __LINE__, oldn, old_blocknum, newn, new_blocknum);
newn->fullhash = toku_cachetable_hash(pair->cf, new_blocknum);
newn->nodesize = new_node_size;
newn->thisnodename = new_blocknum;
newn->log_lsn = newn->disk_lsn = lsn;
//printf("%s:%d %p->disk_lsn=%"PRId64"\n", __FILE__, __LINE__, n, n->disk_lsn.lsn);
newn->layout_version = BRT_LAYOUT_VERSION;
newn->height = 0;
newn->rand4fingerprint = new_rand4;
newn->flags = is_dup_sort ? TOKU_DB_DUPSORT : 0; // Don't have TOKU_DB_DUP ???
newn->dirty = 1;
{
u_int32_t mpsize = newn->nodesize + newn->nodesize/4;
void *mp = toku_malloc(mpsize);
assert(mp);
toku_mempool_init(&newn->u.l.buffer_mempool, mp, mpsize);
}
assert(toku_omt_size(oldn->u.l.buffer)==old_n);
u_int32_t n_leafentries = old_n;
OMTVALUE *MALLOC_N(n_leafentries, leafentries);
assert(leafentries);
toku_omt_iterate(oldn->u.l.buffer, fill_buf, leafentries);
{
u_int32_t i;
u_int32_t new_fp = 0, new_size = 0;
for (i=new_n; i<n_leafentries; i++) {
LEAFENTRY oldle = leafentries[i];
LEAFENTRY newle = toku_mempool_malloc(&newn->u.l.buffer_mempool, leafentry_memsize(oldle), 1);
assert(newle);
new_fp += toku_le_crc(oldle);
new_size += OMT_ITEM_OVERHEAD + leafentry_disksize(oldle);
memcpy(newle, oldle, leafentry_memsize(oldle));
toku_mempool_mfree(&oldn->u.l.buffer_mempool, oldle, leafentry_memsize(oldle));
leafentries[i] = newle;
}
toku_omt_destroy(&oldn->u.l.buffer);
r = toku_omt_create_from_sorted_array(&newn->u.l.buffer, leafentries+new_n, n_leafentries-new_n);
assert(r==0);
newn->u.l.n_bytes_in_buffer = new_size;
newn->local_fingerprint = newn->rand4fingerprint * new_fp;
}
{
u_int32_t i;
u_int32_t old_fp = 0, old_size = 0;
for (i=0; i<new_n; i++) {
LEAFENTRY oldle = leafentries[i];
old_fp += toku_le_crc(oldle);
old_size += OMT_ITEM_OVERHEAD + leafentry_disksize(oldle);
}
r = toku_omt_create_from_sorted_array(&oldn->u.l.buffer, leafentries, new_n);
oldn->u.l.n_bytes_in_buffer = old_size;
oldn->local_fingerprint = oldn->rand4fingerprint * old_fp;
}
toku_free(leafentries);
//r = toku_omt_split_at(oldn->u.l.buffer, &newn->u.l.buffer, new_n);
toku_verify_all_in_mempool(oldn); toku_verify_counts(oldn);
toku_verify_all_in_mempool(newn); toku_verify_counts(newn);
toku_cachetable_put(pair->cf, new_blocknum, newn->fullhash,
newn, toku_serialize_brtnode_size(newn), toku_brtnode_flush_callback, toku_brtnode_fetch_callback, 0);
newn->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, new_blocknum, newn->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(newn));
assert(r==0);
oldn->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, old_blocknum, oldn->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(oldn));
assert(r==0);
}
static void
toku_recover_insertleafentry (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t idx, LEAFENTRY newleafentry) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
assert(r==0);
void *node_v;
assert(pair->brt);
u_int32_t fullhash = toku_cachetable_hash(pair->cf, blocknum);
r = toku_cachetable_get_and_pin(pair->cf, blocknum, fullhash, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->fullhash==fullhash);
assert(node->height==0);
VERIFY_COUNTS(node);
node->log_lsn = lsn;
{
int memsize = leafentry_memsize(newleafentry);
void *mem = mempool_malloc_from_omt(node->u.l.buffer, &node->u.l.buffer_mempool, memsize, 0);
assert(mem);
memcpy(mem, newleafentry, memsize);
r = toku_omt_insert_at(node->u.l.buffer, mem, idx);
assert(r==0);
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + leafentry_disksize(newleafentry);
node->local_fingerprint += node->rand4fingerprint * toku_le_crc(newleafentry);
}
r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0);
toku_free_LEAFENTRY(newleafentry);
}
static void
toku_recover_deleteleafentry (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t idx) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
assert(r==0);
void *node_v;
assert(pair->brt);
u_int32_t fullhash = toku_cachetable_hash(pair->cf, blocknum);
r = toku_cachetable_get_and_pin(pair->cf, blocknum, fullhash, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->fullhash==fullhash);
assert(node->height==0);
VERIFY_COUNTS(node);
node->log_lsn = lsn;
{
OMTVALUE data = 0;
r=toku_omt_fetch(node->u.l.buffer, idx, &data, NULL);
assert(r==0);
LEAFENTRY oldleafentry=data;
u_int32_t len = leafentry_memsize(oldleafentry);
assert(memcmp(oldleafentry, data, len)==0);
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(oldleafentry);
node->local_fingerprint -= node->rand4fingerprint * toku_le_crc(oldleafentry);
toku_mempool_mfree(&node->u.l.buffer_mempool, oldleafentry, len);
r = toku_omt_delete_at(node->u.l.buffer, idx);
assert(r==0);
}
r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0);
}
static void
toku_recover_changeunnamedroot (LSN UU(lsn), FILENUM filenum, BLOCKNUM UU(oldroot), BLOCKNUM newroot) {
struct cf_pair *pair = NULL;
......
......@@ -49,6 +49,11 @@ BDB_DONTRUN_TESTS = \
helgrind2 \
helgrind3 \
test1426 \
test_logflush \
test_txn_abort8 \
test_txn_abort9 \
test_txn_close_open_commit \
test_txn_commit8 \
#\ ends prev line
BDB_TESTS = $(patsubst %.c,%.bdb$(BINSUF),$(filter-out $(patsubst %,%.c,$(BDB_DONTRUN_TESTS)),$(SRCS)))
......
......@@ -27,7 +27,7 @@ test_main (int argc, const char *argv[]) {
r=txn->commit(txn, 0); CKERR(r);
int i;
for (i=0; i<200; i++) {
for (i=0; i<400; i++) {
DBT key,data;
char hello[30],there[30];
snprintf(hello, sizeof(hello), "hello%d", i);
......
......@@ -181,5 +181,5 @@ test_dupsort:
#if $(DIFF) -q <(echo "foo") <(echo "foo") > /dev/null; then echo yes; else echo no; fi
clean:
rm -rf *.so *.o $(UTILS) $(BDB_UTILS) $(STATIC_UTILS) *.temp *.gcno *.gcda *.gcov
rm -rf *.so *.o $(UTILS) $(BDB_UTILS) $(STATIC_UTILS) *.temp *.gcno *.gcda *.gcov *.temp.clean
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment