Commit 1dc9085f authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Improve logging on node splits. Addresses #27.

git-svn-id: file:///svn/tokudb@1970 c7de825b-a66e-492c-adef-691d508d4ae1
parent 5ee687ab
......@@ -371,7 +371,7 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
* The splitter key is key number n_children_in_a */
int i;
for (i=n_children_in_a; i<old_n_children; i++) {
for (i=0; i<old_n_children; i++) {
int targchild = i-n_children_in_a;
int r = toku_fifo_create(&B->u.n.buffers[targchild]);
if (r!=0) return r;
......@@ -422,14 +422,15 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
verify_local_fingerprint_nonleaf(node);
}
// Delete a child, removing it's fingerprint, and also the preceeding pivot key
// Delete a child, removing it's fingerprint, and also the preceeding pivot key. The child number must be > 0
{
BYTESTRING bs = { .len = kv_pair_keylen(node->u.n.childkeys[i-1]),
.data = kv_pair_key(node->u.n.childkeys[i-1]) };
assert(i>0);
r = toku_log_delchild(txn, txnid, fnum, node->thisnodename, n_children_in_a, thischilddiskoff, BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, i), bs);
if (r!=0) return r;
if (i>n_children_in_a) {
r = toku_log_setpivot(txn, txnid, fnum, B->thisnodename, targchild, bs);
r = toku_log_setpivot(txn, txnid, fnum, B->thisnodename, targchild-1, bs);
if (r!=0) return r;
B->u.n.childkeys[targchild-1] = node->u.n.childkeys[i-1];
B->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]);
......@@ -615,9 +616,8 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, cnum) = BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, cnum-1);
node->u.n.n_bytes_in_buffer[cnum] = node->u.n.n_bytes_in_buffer[cnum-1];
}
r = toku_log_addchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), node->thisnodename, childnum, childb->thisnodename, 0);
r = toku_log_setchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), node->thisnodename, childnum, BRTNODE_CHILD_DISKOFF(node, childnum+1), childb->thisnodename);
BRTNODE_CHILD_DISKOFF(node, childnum) = childa->thisnodename;
r = toku_log_addchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), node->thisnodename, childnum+1, childb->thisnodename, 0);
assert(BRTNODE_CHILD_DISKOFF(node, childnum)==childa->thisnodename);
BRTNODE_CHILD_DISKOFF(node, childnum+1) = childb->thisnodename;
fixup_child_fingerprint(node, childnum, childa, t, txn);
fixup_child_fingerprint(node, childnum+1, childb, t, txn);
......@@ -631,6 +631,12 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_cmd(type, skey, skeylen, sval, svallen));
// Slide the keys over
{
BYTESTRING bs = { .len = childsplitk->size,
.data = childsplitk->data };
r = toku_log_setpivot(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), node->thisnodename, childnum, bs);
if (r!=0) return r;
}
for (cnum=node->u.n.n_children-1; cnum>childnum; cnum--) {
node->u.n.childkeys[cnum] = node->u.n.childkeys[cnum-1];
}
......
......@@ -145,7 +145,7 @@ int toku_logger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes) {
logger->n_in_file += logger->n_in_buf+nbytes;
logger->n_in_buf=0;
if (logger->n_in_file > 100<<20) {
fprintf(stderr, "%s:%d closing logfile\n", __FILE__, __LINE__);
//fprintf(stderr, "%s:%d closing logfile\n", __FILE__, __LINE__);
r = close(logger->fd);
if (r!=0) return errno;
logger->fd=-1;
......
......@@ -240,15 +240,14 @@ void toku_recover_addchild (struct logtype_addchild *le) {
for (i=node->u.n.n_children; i>le->childnum; i--) {
node->u.n.childinfos[i]=node->u.n.childinfos[i-1];
BRTNODE_CHILD_DISKOFF(node,i) = BRTNODE_CHILD_DISKOFF(node, i-1);
node->u.n.buffers[i] = node->u.n.buffers[i-1];
node->u.n.n_bytes_in_buffer[i] = node->u.n.n_bytes_in_buffer[i-1];
if (i!=(unsigned int)node->u.n.n_children) {
node->u.n.childkeys [i]= node->u.n.childkeys [i-1];
}
node->u.n.buffers[i] = node->u.n.buffers[i-1];
node->u.n.n_bytes_in_buffer[i] = node->u.n.n_bytes_in_buffer[i-1];
assert(i>=2);
node->u.n.childkeys [i-1] = node->u.n.childkeys [i-2];
}
node->u.n.childinfos[le->childnum].subtree_fingerprint = 0;
node->u.n.childkeys [le->childnum] = 0;
node->u.n.childinfos[le->childnum].subtree_fingerprint = le->childfingerprint;
node->u.n.children [le->childnum] = le->child;
node->u.n.childkeys [le->childnum-1] = 0;
int r= toku_fifo_create(&node->u.n.buffers[le->childnum]); assert(r==0);
node->u.n.n_bytes_in_buffer[le->childnum] = 0;
node->u.n.n_children++;
......@@ -258,7 +257,44 @@ void toku_recover_addchild (struct logtype_addchild *le) {
}
int toku_rollback_delchild (struct logtype_delchild * le, TOKUTXN txn) ABORTIT
void toku_recover_delchild (struct logtype_delchild *le) { le=le; assert(0); }
void toku_recover_delchild (struct logtype_delchild *le) {
struct cf_pair *pair;
int r = find_cachefile(le->filenum, &pair);
assert(r==0);
void *node_v;
assert(pair->brt);
r = toku_cachetable_get_and_pin(pair->cf, le->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
BRTNODE node = node_v;
assert(node->height>0);
u_int32_t childnum = le->childnum;
assert(childnum < (unsigned)node->u.n.n_children);
assert(node->u.n.childinfos[childnum].subtree_fingerprint == le->childfingerprint);
assert(node->u.n.children[childnum]==le->child);
assert(toku_fifo_n_entries(node->u.n.buffers[childnum])==0);
assert(node->u.n.n_bytes_in_buffer[childnum]==0);
assert(node->u.n.n_children>2); // Must be at least two children.
u_int32_t i;
assert(childnum>0);
node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(pair->brt, node->u.n.childkeys[childnum-1]);
toku_free((void*)node->u.n.childkeys[childnum-1]);
toku_fifo_free(&node->u.n.buffers[childnum]);
for (i=childnum+1; i<(unsigned)node->u.n.n_children; i++) {
node->u.n.childinfos[i-1] = node->u.n.childinfos[i];
BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, i-1) = BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, i);
BRTNODE_CHILD_DISKOFF(node, i-1) = BRTNODE_CHILD_DISKOFF(node, i);
node->u.n.buffers[i-1] = node->u.n.buffers[i];
node->u.n.n_bytes_in_buffer[i-1] = node->u.n.n_bytes_in_buffer[i];
node->u.n.childkeys[i-2] = node->u.n.childkeys[i-1];
}
node->u.n.n_children--;
node->log_lsn = le->lsn;
r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
toku_free(le->pivotkey.data);
}
int toku_rollback_setchild (struct logtype_setchild *le, TOKUTXN txn) ABORTIT
void toku_recover_setchild (struct logtype_setchild *le) {
......@@ -288,7 +324,10 @@ void toku_recover_setpivot (struct logtype_setpivot *le) {
assert(r==0);
BRTNODE node = node_v;
assert(node->height>0);
node->u.n.childkeys[le->childnum] = kv_pair_malloc(le->pivotkey.data, le->pivotkey.len, 0, 0);
struct kv_pair *new_pivot = kv_pair_malloc(le->pivotkey.data, le->pivotkey.len, 0, 0);
node->u.n.childkeys[le->childnum] = new_pivot;
node->u.n.totalchildkeylens += toku_brt_pivot_key_len(pair->brt, node->u.n.childkeys[le->childnum]);
node->log_lsn = le->lsn;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment