Commit fa5638a0 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Improving logging for db-benchmark-test. Addresses #27.

git-svn-id: file:///svn/tokudb@1951 c7de825b-a66e-492c-adef-691d508d4ae1
parent d087e755
......@@ -119,6 +119,7 @@ void toku_serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node)
//printf("%s:%d local_fingerprint=%8x\n", __FILE__, __LINE__, node->local_fingerprint);
//printf("%s:%d w.ndone=%d n_children=%d\n", __FILE__, __LINE__, w.ndone, node->n_children);
if (node->height>0) {
assert(node->u.n.n_children>0);
// Local fingerprint is not actually stored while in main memory. Must calculate it.
// Subtract the child fingerprints from the subtree fingerprint to get the local fingerprint.
{
......
......@@ -39,7 +39,7 @@
extern long long n_items_malloced;
static int malloc_diskblock (DISKOFF *res, BRT brt, int size, TOKUTXN);
//static void verify_local_fingerprint_nonleaf (BRTNODE node);
static void verify_local_fingerprint_nonleaf (BRTNODE node);
/* Frees a node, including all the stuff in the hash table. */
void toku_brtnode_free (BRTNODE *nodep) {
......@@ -92,6 +92,9 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE
// We only call this function if we have reason to believe that the child's fingerprint did change.
BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node,childnum_of_node)=sum;
node->dirty=1;
if (toku_txn_get_last_lsn(txn).lsn >= 917435 && toku_txn_get_last_lsn(txn).lsn < 917439) {
printf("%s:%d changing fingerprint\n", __FILE__, __LINE__);
}
toku_log_changechildfingerprint(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), node->thisnodename, childnum_of_node, old_fingerprint, sum);
toku_update_brtnode_lsn(node, txn);
}
......@@ -305,30 +308,6 @@ static void create_new_brtnode (BRT t, BRTNODE *result, int height, TOKUTXN txn)
toku_update_brtnode_lsn(n, txn);
}
static void delete_node (BRT t, BRTNODE node) {
int i;
assert(node->height>=0);
if (node->height==0) {
if (node->u.l.buffer) {
toku_pma_free(&node->u.l.buffer);
}
node->u.l.n_bytes_in_buffer=0;
} else {
for (i=0; i<node->u.n.n_children; i++) {
if (node->u.n.buffers[i]) {
toku_fifo_free(&node->u.n.buffers[i]);
}
node->u.n.n_bytes_in_buffer[0]=0;
}
node->u.n.n_bytes_in_buffers = 0;
node->u.n.totalchildkeylens=0;
node->u.n.n_children=0;
node->height=0;
node->u.l.buffer=0; /* It's a leaf now (height==0) so set the buffer to NULL. */
}
toku_cachetable_remove(t->cf, node->thisnodename, 0); /* Don't write it back to disk. */
}
static int insert_to_buffer_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v, int type) {
unsigned int n_bytes_added = BRT_CMD_OVERHEAD + KEY_VALUE_OVERHEAD + k->size + v->size;
int r = toku_fifo_enq(node->u.n.buffers[childnum], k->data, k->size, v->data, v->size, type);
......@@ -371,25 +350,19 @@ static int brtleaf_split (TOKUTXN txn, FILENUM filenum, BRT t, BRTNODE node, BRT
return 0;
}
static void brt_update_fingerprint_when_moving_hashtable (BRTNODE oldnode, BRTNODE newnode, FIFO table_being_moved) {
u_int32_t sum = 0;
FIFO_ITERATE(table_being_moved, key, keylen, data, datalen, type,
sum += toku_calccrc32_cmd(type, key, keylen, data, datalen));
oldnode->local_fingerprint -= oldnode->rand4fingerprint * sum;
newnode->local_fingerprint += newnode->rand4fingerprint * sum;
}
/* Side effect: sets splitk->data pointer to a malloc'd value */
static void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, TOKUTXN txn) {
int n_children_in_a = node->u.n.n_children/2;
BRTNODE A,B;
static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, TOKUTXN txn) {
int old_n_children = node->u.n.n_children;
int n_children_in_a = old_n_children/2;
int n_children_in_b = old_n_children-n_children_in_a;
BRTNODE B;
TXNID txnid = toku_txn_get_txnid(txn);
FILENUM fnum = toku_cachefile_filenum(t->cf);
assert(node->height>0);
assert(node->u.n.n_children>=2); // Otherwise, how do we split? We need at least two children to split. */
assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
create_new_brtnode(t, &A, node->height, txn);
create_new_brtnode(t, &B, node->height, txn);
A->u.n.n_children=n_children_in_a;
B->u.n.n_children=node->u.n.n_children-n_children_in_a;
B->u.n.n_children =n_children_in_b;
//printf("%s:%d %p (%lld) becomes %p and %p\n", __FILE__, __LINE__, node, node->thisnodename, A, B);
//printf("%s:%d A is at %lld\n", __FILE__, __LINE__, A->thisnodename);
{
......@@ -397,73 +370,102 @@ static void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nod
* That means that the first n_children_in_a-1 keys go into node a.
* The splitter key is key number n_children_in_a */
int i;
for (i=0; i<n_children_in_a; i++) {
FIFO htab = node->u.n.buffers[i];
BRTNODE_CHILD_DISKOFF(A, i) = BRTNODE_CHILD_DISKOFF(node, i);
A->u.n.buffers[i] = htab;
A->u.n.n_bytes_in_buffers += (A->u.n.n_bytes_in_buffer[i] = node->u.n.n_bytes_in_buffer[i]);
BRTNODE_CHILD_SUBTREE_FINGERPRINTS(A, i) = BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, i);
node->u.n.buffers[i] = 0;
node->u.n.n_bytes_in_buffers -= node->u.n.n_bytes_in_buffer[i];
node->u.n.n_bytes_in_buffer[i] = 0;
brt_update_fingerprint_when_moving_hashtable(node, A, htab);
for (i=n_children_in_a; i<old_n_children; i++) {
int targchild = i-n_children_in_a;
int r = toku_fifo_create(&B->u.n.buffers[targchild]);
if (r!=0) return r;
}
for (i=n_children_in_a; i<node->u.n.n_children; i++) {
for (i=n_children_in_a; i<old_n_children; i++) {
int targchild = i-n_children_in_a;
FIFO htab = node->u.n.buffers[i];
BRTNODE_CHILD_DISKOFF(B, targchild) = BRTNODE_CHILD_DISKOFF(node, i);
B->u.n.buffers[targchild] = htab;
B->u.n.n_bytes_in_buffers += (B->u.n.n_bytes_in_buffer[targchild] = node->u.n.n_bytes_in_buffer[i]);
BRTNODE_CHILD_SUBTREE_FINGERPRINTS(B, targchild) = BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, i);
FIFO from_htab = node->u.n.buffers[i];
FIFO to_htab = B ->u.n.buffers[targchild];
DISKOFF thischilddiskoff = BRTNODE_CHILD_DISKOFF(node, i);
node->u.n.buffers[i] = 0;
node->u.n.n_bytes_in_buffers -= node->u.n.n_bytes_in_buffer[i];
node->u.n.n_bytes_in_buffer[i] = 0;
BRTNODE_CHILD_DISKOFF(B, targchild) = thischilddiskoff;
brt_update_fingerprint_when_moving_hashtable(node, B, htab);
int r = toku_log_addchild(txn, txnid, fnum, B->thisnodename, targchild, thischilddiskoff, BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, i));
if (r!=0) return r;
while (1) {
bytevec key, data;
unsigned int keylen, datalen;
int type;
int fr = toku_fifo_peek(from_htab, &key, &keylen, &data, &datalen, &type);
if (fr!=0) break;
int n_bytes_moved = keylen+datalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
BYTESTRING keybs = { .len = keylen, .data = (char*)key };
BYTESTRING databs = { .len = datalen, .data = (char*)data };
u_int32_t old_from_fingerprint = node->local_fingerprint;
u_int32_t old_to_fingerprint = B->local_fingerprint;
u_int32_t delta = toku_calccrc32_cmd(type, key, keylen, data, datalen);
u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta;
u_int32_t new_to_fingerprint = old_to_fingerprint + B->rand4fingerprint *delta;
if (r!=0) return r;
r = toku_log_brtdeq(txn, txnid, fnum, node->thisnodename, n_children_in_a, type, keybs, databs, old_from_fingerprint, new_from_fingerprint);
if (r!=0) return r;
r = toku_log_brtenq(txn, txnid, fnum, B->thisnodename, targchild, type, keybs, databs, old_to_fingerprint, new_to_fingerprint);
r = toku_fifo_enq(to_htab, key, keylen, data, datalen, type);
if (r!=0) return r;
toku_fifo_deq(from_htab);
// key and data will no longer be valid
node->local_fingerprint = new_from_fingerprint;
B->local_fingerprint = new_to_fingerprint;
B->u.n.n_bytes_in_buffers += n_bytes_moved;
B->u.n.n_bytes_in_buffer[targchild] += n_bytes_moved;
node->u.n.n_bytes_in_buffers -= n_bytes_moved;
node->u.n.n_bytes_in_buffer[i] -= n_bytes_moved;
verify_local_fingerprint_nonleaf(B);
verify_local_fingerprint_nonleaf(node);
}
for (i=0; i<n_children_in_a-1; i++) {
A->u.n.childkeys[i] = node->u.n.childkeys[i];
A->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[i]);
node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, node->u.n.childkeys[i]);
node->u.n.childkeys[i] = 0;
// Delete a child, removing it's fingerprint, and also the preceeding pivot key
{
BYTESTRING bs = { .len = kv_pair_keylen(node->u.n.childkeys[i-1]),
.data = kv_pair_key(node->u.n.childkeys[i-1]) };
r = toku_log_delchild(txn, txnid, fnum, node->thisnodename, n_children_in_a, thischilddiskoff, BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, i), bs);
if (r!=0) return r;
if (i>n_children_in_a) {
r = toku_log_setpivot(txn, txnid, fnum, B->thisnodename, targchild, bs);
if (r!=0) return r;
B->u.n.childkeys[targchild-1] = node->u.n.childkeys[i-1];
B->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]);
node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]);
node->u.n.childkeys[i-1] = 0;
}
splitk->data = (void*)(node->u.n.childkeys[n_children_in_a-1]);
splitk->size = toku_brt_pivot_key_len(t, node->u.n.childkeys[n_children_in_a-1]);
node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, node->u.n.childkeys[n_children_in_a-1]);
node->u.n.childkeys[n_children_in_a-1]=0;
for (i=n_children_in_a; i<node->u.n.n_children-1; i++) {
B->u.n.childkeys[i-n_children_in_a] = node->u.n.childkeys[i];
B->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[i]);
node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, node->u.n.childkeys[i]);
node->u.n.childkeys[i] = 0;
}
assert(node->u.n.totalchildkeylens==0);
BRTNODE_CHILD_DISKOFF(node, i) = 0;
BRTNODE_CHILD_SUBTREE_FINGERPRINTS(B, targchild) = BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, i);
BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, i) = 0;
//verify_local_fingerprint_nonleaf(A);
//verify_local_fingerprint_nonleaf(B);
assert(node->u.n.n_bytes_in_buffer[i] == 0);
}
{
int i;
for (i=0; i<TREE_FANOUT+1; i++) {
assert(node->u.n.buffers[i]==0);
assert(node->u.n.n_bytes_in_buffer[i]==0);
// Drop the n_children now (not earlier) so that we can do the fingerprint verification at any time.
node->u.n.n_children=n_children_in_a;
for (i=n_children_in_a; i<old_n_children; i++) {
toku_fifo_free(&node->u.n.buffers[i]);
}
assert(node->u.n.n_bytes_in_buffers==0);
splitk->data = (void*)(node->u.n.childkeys[n_children_in_a-1]);
splitk->size = toku_brt_pivot_key_len(t, node->u.n.childkeys[n_children_in_a-1]);
node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, node->u.n.childkeys[n_children_in_a-1]);
node->u.n.childkeys[n_children_in_a-1]=0;
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(B);
}
/* The buffer is all divied up between them, since just moved the buffers over. */
*nodea = A;
*nodea = node;
*nodeb = B;
/* Remove it from the cache table, and free its storage. */
//printf("%s:%d removing %lld\n", __FILE__, __LINE__, node->thisnodename);
delete_node(t, node);
assert(toku_serialize_brtnode_size(A)<A->nodesize);
assert(toku_serialize_brtnode_size(node)<node->nodesize);
assert(toku_serialize_brtnode_size(B)<B->nodesize);
return 0;
}
static void find_heaviest_child (BRTNODE node, int *childnum) {
......@@ -613,6 +615,8 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, cnum) = BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, cnum-1);
node->u.n.n_bytes_in_buffer[cnum] = node->u.n.n_bytes_in_buffer[cnum-1];
}
r = toku_log_addchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), node->thisnodename, childnum, childb->thisnodename, 0);
r = toku_log_setchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), node->thisnodename, childnum, BRTNODE_CHILD_DISKOFF(node, childnum+1), childb->thisnodename);
BRTNODE_CHILD_DISKOFF(node, childnum) = childa->thisnodename;
BRTNODE_CHILD_DISKOFF(node, childnum+1) = childb->thisnodename;
fixup_child_fingerprint(node, childnum, childa, t, txn);
......@@ -663,9 +667,19 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
if (type == BRT_DELETE) {
int r2 = push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childb, &brtcmd, childnum+1, txn);
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
if (r2!=0) return r2;
if (r2!=0) {
// In this case we must put things from old_h into the new buffers.
// This code is wrong, so I'll abort.
abort();
return r2;
}
}
if (r!=0) {
// In this case we must put things from old_h into the new buffers.
// This code is wrong, so I'll abort.
abort();
return r;
}
if (r!=0) return r;
}));
toku_fifo_free(&old_h);
......@@ -685,7 +699,8 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
if (node->u.n.n_children>TREE_FANOUT) {
//printf("%s:%d about to split having pushed %d out of %d keys\n", __FILE__, __LINE__, i, n_pairs);
brt_nonleaf_split(t, node, nodea, nodeb, splitk, txn);
r=brt_nonleaf_split(t, node, nodea, nodeb, splitk, txn);
if (r!=0) return r;
//printf("%s:%d did split\n", __FILE__, __LINE__);
split_count++;
*did_split=1;
......@@ -1148,17 +1163,17 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
}
//static void verify_local_fingerprint_nonleaf (BRTNODE node) {
// u_int32_t fp=0;
// int i;
// if (node->height==0) return;
// for (i=0; i<node->u.n.n_children; i++)
// FIFO_ITERATE(node->u.n.htables[i], key, keylen, data, datalen, type,
// ({
// fp += node->rand4fingerprint * toku_calccrc32_cmd(type, key, keylen, data, datalen);
// }));
// assert(fp==node->local_fingerprint);
//}
static void verify_local_fingerprint_nonleaf (BRTNODE node) {
u_int32_t fp=0;
int i;
if (node->height==0) return;
for (i=0; i<node->u.n.n_children; i++)
FIFO_ITERATE(node->u.n.buffers[i], key, keylen, data, datalen, type,
({
fp += node->rand4fingerprint * toku_calccrc32_cmd(type, key, keylen, data, datalen);
}));
assert(fp==node->local_fingerprint);
}
static int brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
......@@ -1231,7 +1246,9 @@ static int setup_brt_root_node (BRT t, DISKOFF offset, TOKUTXN txn) {
toku_free(node);
return r;
}
//printf("%s:%d created %lld\n", __FILE__, __LINE__, node->thisnodename);
if (node->thisnodename==20971520) {
printf("%s:%d created %lld\n", __FILE__, __LINE__, node->thisnodename);
}
toku_verify_counts(node);
// verify_local_fingerprint_nonleaf(node);
toku_log_newbrtnode(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint);
......@@ -1583,13 +1600,9 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
//verify_local_fingerprint_nonleaf(nodeb);
r=toku_log_newbrtnode(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, new_height, new_nodesize, (brt->flags&TOKU_DB_DUPSORT)!=0, newroot->rand4fingerprint);
if (r!=0) return r;
r=toku_log_addchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 0);
if (r!=0) return r;
r=toku_log_addchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 1);
if (r!=0) return r;
r=toku_log_setchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, nodea->thisnodename);
r=toku_log_addchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, nodea->thisnodename, 0);
if (r!=0) return r;
r=toku_log_setchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 1, nodeb->thisnodename);
r=toku_log_addchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 1, nodeb->thisnodename, 0);
if (r!=0) return r;
fixup_child_fingerprint(newroot, 0, nodea, brt, txn);
fixup_child_fingerprint(newroot, 1, nodeb, brt, txn);
......
......@@ -77,7 +77,7 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d
return 0;
}
/* peek at the head of the fifo */
/* peek at the head (the oldest entry) of the fifo */
int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, int *type) {
struct fifo_entry *entry = fifo_peek(fifo);
if (entry == 0) return -1;
......@@ -91,7 +91,7 @@ int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data,
int toku_fifo_deq(FIFO fifo) {
struct fifo_entry *entry = fifo_deq(fifo);
if (entry == 0) return ENOMEM;
if (entry == 0) return -1; // if entry is 0 then it was an empty fifo
toku_free_n(entry, fifo_entry_size(entry));
return 0;
}
......
......@@ -564,6 +564,7 @@ TXNID toku_txn_get_txnid (TOKUTXN txn) {
}
LSN toku_txn_get_last_lsn (TOKUTXN txn) {
if (txn==0) return (LSN){0};
return txn->last_lsn;
}
......
......@@ -88,12 +88,23 @@ const struct logtype logtypes[] = {
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0}, // children scoot over
{"DISKOFF", "child", 0},
{"u_int32_t", "childfingerprint", "%08x"},
NULLFIELD}},
{"delchild", 'r', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0}, // children scoot over
{"DISKOFF", "child", 0},
{"u_int32_t", "childfingerprint", "%08x"},
{"BYTESTRING", "pivotkey", 0},
NULLFIELD}},
{"setchild", 'i', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0},
{"DISKOFF", "child", 0},
{"DISKOFF", "oldchild", 0},
{"DISKOFF", "newchild", 0},
NULLFIELD}},
{"setpivot", 'k', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
......@@ -105,6 +116,26 @@ const struct logtype logtypes[] = {
{"BYTESTRING", "fname", 0},
{"FILENUM", "filenum", 0},
NULLFIELD}},
{"brtdeq", 'U', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0},
{"u_int32_t", "typ", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
{"u_int32_t", "oldfingerprint", "%08x"},
{"u_int32_t", "newfingerprint", "%08x"},
NULLFIELD}},
{"brtenq", 'Q', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0},
{"u_int32_t", "typ", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
{"u_int32_t", "oldfingerprint", "%08x"},
{"u_int32_t", "newfingerprint", "%08x"},
NULLFIELD}},
{"insertinleaf", 'I', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
......
......@@ -714,6 +714,7 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK
const struct kv_pair *pair = pma->pairs[idx];
const BYTESTRING key = { pair->keylen, (char*)kv_pair_key_const(pair) };
const BYTESTRING data = { pair->vallen, (char*)kv_pair_val_const(pair) };
if (toku_txn_get_last_lsn(txn).lsn>=3836455 && toku_txn_get_last_lsn(txn).lsn<=3836460) printf("%s:%d inserting\n", __FILE__, __LINE__);
int r = toku_log_insertinleaf (txn, toku_txn_get_txnid(txn), pma->filenum, diskoff, idx, key, data);
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
return r;
......@@ -844,8 +845,10 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
//printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size);
int r;
int found;
int traceit = toku_txn_get_last_lsn(txn).lsn>=3836455 && toku_txn_get_last_lsn(txn).lsn<=3836460;
unsigned int idx = pma_search(pma, k, pma->dup_mode & TOKU_DB_DUPSORT ? v : 0, 0, pma->N, &found);
if (found) {
if (traceit) printf("%s:%d Already present at lsn %ld\n", __FILE__, __LINE__, toku_txn_get_last_lsn(txn).lsn);
struct kv_pair *kv = pma->pairs[idx];
*replaced_v_size = kv->vallen;
*fingerprint -= rand4fingerprint*toku_calccrc32_kvpair(kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
......@@ -868,6 +871,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
}
if (kv_pair_inuse(pma->pairs[idx])) {
unsigned int newidx;
if (traceit) printf("%s:%d inuse\n", __FILE__, __LINE__);
r = toku_pmainternal_make_space_at (txn, filenum, diskoff, pma, idx, &newidx, node_lsn); /* returns the new idx. */
if (r!=0) return r;
idx=newidx;
......@@ -884,6 +888,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
const struct kv_pair *pair = pma->pairs[idx];
const BYTESTRING key = { pair->keylen, (char*)kv_pair_key_const(pair) };
const BYTESTRING data = { pair->vallen, (char*)kv_pair_val_const(pair) };
if (traceit) printf("%s:%d inserting\n", __FILE__, __LINE__);
r = toku_log_insertinleaf (txn, toku_txn_get_txnid(txn), pma->filenum, diskoff, idx, key, data);
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
}
......
......@@ -222,6 +222,13 @@ static void recover_setup_node (FILENUM filenum, DISKOFF diskoff, CACHEFILE *cf,
*cf = pair->cf;
}
int toku_rollback_brtdeq (struct logtype_brtdeq * le, TOKUTXN txn) ABORTIT
void toku_recover_brtdeq (struct logtype_brtdeq *le) { le=le; assert(0); }
int toku_rollback_brtenq (struct logtype_brtenq * le, TOKUTXN txn) ABORTIT
void toku_recover_brtenq (struct logtype_brtenq *le) { le=le; assert(0); }
int toku_rollback_addchild (struct logtype_addchild *le, TOKUTXN txn) ABORTIT
void toku_recover_addchild (struct logtype_addchild *le) {
CACHEFILE cf;
......@@ -241,7 +248,7 @@ void toku_recover_addchild (struct logtype_addchild *le) {
}
node->u.n.childinfos[le->childnum].subtree_fingerprint = 0;
node->u.n.childkeys [le->childnum] = 0;
node->u.n.children [le->childnum] = -1;
node->u.n.children [le->childnum] = le->child;
int r= toku_fifo_create(&node->u.n.buffers[le->childnum]); assert(r==0);
node->u.n.n_bytes_in_buffer[le->childnum] = 0;
node->u.n.n_children++;
......@@ -250,6 +257,9 @@ void toku_recover_addchild (struct logtype_addchild *le) {
assert(r==0);
}
int toku_rollback_delchild (struct logtype_delchild * le, TOKUTXN txn) ABORTIT
void toku_recover_delchild (struct logtype_delchild *le) { le=le; assert(0); }
int toku_rollback_setchild (struct logtype_setchild *le, TOKUTXN txn) ABORTIT
void toku_recover_setchild (struct logtype_setchild *le) {
struct cf_pair *pair;
......@@ -262,7 +272,7 @@ void toku_recover_setchild (struct logtype_setchild *le) {
BRTNODE node = node_v;
assert(node->height>0);
assert(le->childnum < (unsigned)node->u.n.n_children);
node->u.n.children[le->childnum] = le->child;
node->u.n.children[le->childnum] = le->newchild;
node->log_lsn = le->lsn;
r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
......@@ -298,6 +308,7 @@ void toku_recover_changechildfingerprint (struct logtype_changechildfingerprint
assert(r==0);
BRTNODE node = node_v;
assert(node->height>0);
assert((signed)le->childnum < node->u.n.n_children);
BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, le->childnum) = le->newfingerprint;
node->log_lsn = le->lsn;
r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node));
......@@ -380,13 +391,11 @@ void toku_recover_deleteinleaf (struct logtype_deleteinleaf *c) {
BRTNODE node = node_v;
assert(node->height==0);
VERIFY_COUNTS(node);
r = toku_cachetable_get_and_pin(pair->cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
assert(r==0);
r = toku_pma_clear_at_index(node->u.l.buffer, c->pmaidx);
assert (r==0);
node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len,c->data.data, c->data.len);
node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + c->key.len + c->data.len;
VERIFY_COUNTS(node);
node->log_lsn = c->lsn;
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment