Commit a49aabbd authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Log node splits better. Addresses #27.

git-svn-id: file:///svn/tokudb@1714 c7de825b-a66e-492c-adef-691d508d4ae1
parent aa66baec
......@@ -598,7 +598,7 @@ static int split_count=0;
/* NODE is a node with a child.
* childnum was split into two nodes childa, and childb.
* We must slide things around, & move things from the old table to the new tables.
* We also move things to the new children as much as we an without doing any pushdowns or splitting of the child.
* We also move things to the new children as much as we can without doing any pushdowns or splitting of the child.
* We must delete the old buffer (but the old child is already deleted.)
* We also unpin the new children.
*/
......@@ -1588,14 +1588,16 @@ CACHEKEY* toku_calculate_root_offset_pointer (BRT brt) {
abort();
}
static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *rootp) {
static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *rootp, TOKUTXN txn) {
TAGMALLOC(BRTNODE, newroot);
int r;
DISKOFF newroot_diskoff=malloc_diskblock(brt, brt->h->nodesize);
int new_height = nodea->height+1;
int new_nodesize = brt->h->nodesize;
DISKOFF newroot_diskoff=malloc_diskblock(brt, new_nodesize);
assert(newroot);
*rootp=newroot_diskoff;
brt->h->dirty=1;
initialize_brtnode (brt, newroot, newroot_diskoff, nodea->height+1);
initialize_brtnode (brt, newroot, newroot_diskoff, new_height);
//printf("new_root %lld %d %lld %lld\n", newroot_diskoff, newroot->height, nodea->thisnodename, nodeb->thisnodename);
newroot->u.n.n_children=2;
//printf("%s:%d Splitkey=%p %s\n", __FILE__, __LINE__, splitkey, splitkey);
......@@ -1610,6 +1612,19 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
toku_verify_counts(newroot);
//verify_local_fingerprint_nonleaf(nodea);
//verify_local_fingerprint_nonleaf(nodeb);
r=toku_log_newbrtnode(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, new_height, new_nodesize, (brt->flags&TOKU_DB_DUPSORT)!=0, newroot->rand4fingerprint);
if (r!=0) return r;
r=toku_log_insertchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, nodea->thisnodename);
if (r!=0) return r;
r=toku_log_insertchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 1, nodeb->thisnodename);
if (r!=0) return r;
{
BYTESTRING bs;
bs.len = splitk.size;
bs.data = splitk.data;
r=toku_log_insertpivot(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs);
if (r!=0) return r;
}
r=toku_cachetable_unpin(brt->cf, nodea->thisnodename, nodea->dirty, brtnode_size(nodea));
if (r!=0) return r;
r=toku_cachetable_unpin(brt->cf, nodeb->thisnodename, nodeb->dirty, brtnode_size(nodeb));
......@@ -1659,7 +1674,7 @@ static int brt_root_put_cmd(BRT brt, BRT_CMD *cmd, TOKUTXN txn) {
int dirty;
long size;
if (did_split) {
r = brt_init_new_root(brt, nodea, nodeb, splitk, rootp);
r = brt_init_new_root(brt, nodea, nodeb, splitk, rootp, txn);
assert(r == 0);
dirty = 1;
size = 0;
......@@ -1875,7 +1890,7 @@ static void brt_flush_child(BRT t, BRTNODE node, int childnum, BRT_CURSOR cursor
if (0) printf("child_did_split %lld %lld\n", childa->thisnodename, childb->thisnodename);
if (i == 0) {
CACHEKEY *rootp = toku_calculate_root_offset_pointer(t);
r = brt_init_new_root(t, childa, childb, child_splitk, rootp);
r = brt_init_new_root(t, childa, childb, child_splitk, rootp, txn);
assert(r == 0);
r = toku_cachetable_unpin(t->cf, *rootp, CACHETABLE_DIRTY, 0);
assert(r == 0);
......
......@@ -60,6 +60,18 @@ const struct logtype logtypes[] = {
{"u_int8_t", "is_dup_sort", 0},
{"u_int32_t", "rand4fingerprint", 0},
NULLFIELD}},
{"insertchild", 'i', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0},
{"DISKOFF", "child", 0},
NULLFIELD}},
{"insertpivot", 'k', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0},
{"BYTESTRING", "pivotkey", 0},
NULLFIELD}},
{"fopen", 'O', FA{{"TXNID", "txnid", 0},
{"BYTESTRING", "fname", 0},
{"FILENUM", "filenum", 0},
......
......@@ -180,8 +180,9 @@ void toku_recover_newbrtnode (struct logtype_newbrtnode *c) {
assert(r==0);
n->u.l.n_bytes_in_buffer=0;
} else {
fprintf(stderr, "%s:%d\n", __FILE__, __LINE__);
abort();
n->u.n.n_children = 0;
n->u.n.totalchildkeylens = 0;
n->u.n.n_bytes_in_buffers = 0;
}
// Now put it in the cachetable
toku_cachetable_put(pair->cf, c->diskoff, n, toku_serialize_brtnode_size(n), toku_brtnode_flush_callback, toku_brtnode_fetch_callback, 0);
......@@ -202,6 +203,11 @@ int toku_rollback_newbrtnode (struct logtype_newbrtnode *le, TOKUTXN txn) {
}
int toku_rollback_insertchild (struct logtype_insertchild *le, TOKUTXN txn) ABORTIT
void toku_recover_insertchild (struct logtype_insertchild *le) { le=le; abort(); }
int toku_rollback_insertpivot (struct logtype_insertpivot *le, TOKUTXN txn) ABORTIT
void toku_recover_insertpivot (struct logtype_insertpivot *le) { le=le; abort(); }
void toku_recover_fopen (struct logtype_fopen *c) {
char *fname = fixup_fname(&c->fname);
CACHEFILE cf;
......
......@@ -150,7 +150,7 @@ libs:
.PHONY: %.recover
all.recover: test_log2.recover test_log3.recover test_log4.recover test_log5.recover
%.recover: %.tdb
$(MAYBEATSIGN) cd ../../newbrt;make $(VERBQUIET) recover
$(MAYBEATSIGN) cd ../../newbrt;make $(VERBQUIET) recover;make $(VERBQUIET)
$(MAYBEATSIGN) $(VGRIND) ./$<
$(MAYBEATSIGN) rm -rf dir.$(patsubst %.tdb,%.c.tdb,$<).recover
$(MAYBEATSIGN) mkdir dir.$(patsubst %.tdb,%.c.tdb,$<).recover
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment