Commit 6d19bad1 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Fixes many problems in {{{test_log10}}}. There's a memory leak and the...

Fixes many problems in {{{test_log10}}}.  There's a memory leak and the recovered files still differ, however.  Addresses #548.

git-svn-id: file:///svn/tokudb@2913 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7a4157a4
...@@ -193,7 +193,8 @@ void toku_serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node) ...@@ -193,7 +193,8 @@ void toku_serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node)
assert((size_t)r==w.ndone); assert((size_t)r==w.ndone);
} }
//printf("%s:%d w.done=%d r=%d\n", __FILE__, __LINE__, w.ndone, r); if (calculated_size!=w.ndone)
printf("%s:%d w.done=%d calculated_size=%d\n", __FILE__, __LINE__, w.ndone, calculated_size);
assert(calculated_size==w.ndone); assert(calculated_size==w.ndone);
//printf("%s:%d wrote %d bytes for %lld size=%lld\n", __FILE__, __LINE__, w.ndone, off, size); //printf("%s:%d wrote %d bytes for %lld size=%lld\n", __FILE__, __LINE__, w.ndone, off, size);
......
...@@ -305,7 +305,9 @@ static int insert_to_buffer_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT ...@@ -305,7 +305,9 @@ static int insert_to_buffer_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT
unsigned int n_bytes_added = BRT_CMD_OVERHEAD + KEY_VALUE_OVERHEAD + k->size + v->size; unsigned int n_bytes_added = BRT_CMD_OVERHEAD + KEY_VALUE_OVERHEAD + k->size + v->size;
int r = toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type, xid); int r = toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type, xid);
if (r!=0) return r; if (r!=0) return r;
// printf("%s:%d fingerprint %08x -> ", __FILE__, __LINE__, node->local_fingerprint);
node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_cmd(type, xid, k->data, k->size, v->data, v->size); node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_cmd(type, xid, k->data, k->size, v->data, v->size);
// printf(" %08x\n", node->local_fingerprint);
BNC_NBYTESINBUF(node,childnum) += n_bytes_added; BNC_NBYTESINBUF(node,childnum) += n_bytes_added;
node->u.n.n_bytes_in_buffers += n_bytes_added; node->u.n.n_bytes_in_buffers += n_bytes_added;
node->dirty = 1; node->dirty = 1;
...@@ -582,8 +584,9 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum ...@@ -582,8 +584,9 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum
node->dirty = 1; node->dirty = 1;
} }
if (*child_did_split) { if (*child_did_split) {
fixup_child_fingerprint(node, childnum, *childa, t, logger); // Don't try to fix these up.
fixup_child_fingerprint(node, childnum+1, *childb, t, logger); //fixup_child_fingerprint(node, childnum, *childa, t, logger);
//fixup_child_fingerprint(node, childnum+1, *childb, t, logger);
} else { } else {
fixup_child_fingerprint(node, childnum, child, t, logger); fixup_child_fingerprint(node, childnum, child, t, logger);
} }
...@@ -635,20 +638,33 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -635,20 +638,33 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
node->u.n.childinfos[cnum] = node->u.n.childinfos[cnum-1]; node->u.n.childinfos[cnum] = node->u.n.childinfos[cnum-1];
} }
r = toku_log_addchild(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum+1, childb->thisnodename, 0); r = toku_log_addchild(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum+1, childb->thisnodename, 0);
assert(BNC_DISKOFF(node, childnum)==childa->thisnodename); node->u.n.n_children++;
assert(BNC_DISKOFF(node, childnum)==childa->thisnodename); // use the same child
BNC_DISKOFF(node, childnum+1) = childb->thisnodename; BNC_DISKOFF(node, childnum+1) = childb->thisnodename;
BNC_SUBTREE_FINGERPRINT(node, childnum)=0; // BNC_SUBTREE_FINGERPRINT(node, childnum)=0; // leave the subtreefingerprint alone for the child, so we can log the change
BNC_SUBTREE_FINGERPRINT(node, childnum+1)=0; BNC_SUBTREE_FINGERPRINT(node, childnum+1)=0;
fixup_child_fingerprint(node, childnum, childa, t, logger); fixup_child_fingerprint(node, childnum, childa, t, logger);
fixup_child_fingerprint(node, childnum+1, childb, t, logger); fixup_child_fingerprint(node, childnum+1, childb, t, logger);
r=toku_fifo_create(&BNC_BUFFER(node,childnum)); assert(r==0); // ??? SHould handle this error case
r=toku_fifo_create(&BNC_BUFFER(node,childnum+1)); assert(r==0); r=toku_fifo_create(&BNC_BUFFER(node,childnum+1)); assert(r==0);
//verify_local_fingerprint_nonleaf(node); // The fingerprint hasn't changed and everhything is still there.
r=toku_fifo_create(&BNC_BUFFER(node,childnum)); assert(r==0); // ??? SHould handle this error case
BNC_NBYTESINBUF(node, childnum) = 0; BNC_NBYTESINBUF(node, childnum) = 0;
BNC_NBYTESINBUF(node, childnum+1) = 0; BNC_NBYTESINBUF(node, childnum+1) = 0;
// Remove all the cmds from the local fingerprint. Some may get added in again when we try to push to the child. // Remove all the cmds from the local fingerprint. Some may get added in again when we try to push to the child.
FIFO_ITERATE(old_h, skey, skeylen, sval, svallen, type, xid, FIFO_ITERATE(old_h, skey, skeylen, sval, svallen, type, xid,
node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_cmd(type, xid, skey, skeylen, sval, svallen)); ({
BYTESTRING keybs = { .len = skeylen, .data = (char*)skey };
BYTESTRING databs = { .len = svallen, .data = (char*)sval };
u_int32_t old_fingerprint = node->local_fingerprint;
u_int32_t new_fingerprint = old_fingerprint - node->rand4fingerprint*toku_calccrc32_cmd(type, xid, skey, skeylen, sval, svallen);
r = toku_log_brtdeq(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum,
xid, type, keybs, databs, old_fingerprint, new_fingerprint);
node->local_fingerprint = new_fingerprint;
}));
//verify_local_fingerprint_nonleaf(node);
// Slide the keys over // Slide the keys over
{ {
...@@ -658,7 +674,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -658,7 +674,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
r = toku_log_setpivot(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, bs); r = toku_log_setpivot(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, bs);
if (r!=0) return r; if (r!=0) return r;
for (cnum=node->u.n.n_children-1; cnum>childnum; cnum--) { for (cnum=node->u.n.n_children-2; cnum>childnum; cnum--) {
node->u.n.childkeys[cnum] = node->u.n.childkeys[cnum-1]; node->u.n.childkeys[cnum] = node->u.n.childkeys[cnum-1];
} }
//if (logger) assert((t->flags&TOKU_DB_DUPSORT)==0); // the setpivot is wrong for TOKU_DB_DUPSORT, so recovery will be broken. //if (logger) assert((t->flags&TOKU_DB_DUPSORT)==0); // the setpivot is wrong for TOKU_DB_DUPSORT, so recovery will be broken.
...@@ -666,15 +682,15 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -666,15 +682,15 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
node->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, pivot); node->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, pivot);
} }
node->u.n.n_children++;
if (toku_brt_debug_mode) { if (toku_brt_debug_mode) {
int i; int i;
printf("%s:%d splitkeys:", __FILE__, __LINE__); printf("%s:%d splitkeys:", __FILE__, __LINE__);
for(i=0; i<node->u.n.n_children-1; i++) printf(" %s", (char*)node->u.n.childkeys[i]); for(i=0; i<node->u.n.n_children-2; i++) printf(" %s", (char*)node->u.n.childkeys[i]);
printf("\n"); printf("\n");
} }
//verify_local_fingerprint_nonleaf(node);
node->u.n.n_bytes_in_buffers -= old_count; /* By default, they are all removed. We might add them back in. */ node->u.n.n_bytes_in_buffers -= old_count; /* By default, they are all removed. We might add them back in. */
/* Keep pushing to the children, but not if the children would require a pushdown */ /* Keep pushing to the children, but not if the children would require a pushdown */
FIFO_ITERATE(old_h, skey, skeylen, sval, svallen, type, xid, ({ FIFO_ITERATE(old_h, skey, skeylen, sval, svallen, type, xid, ({
......
...@@ -119,11 +119,19 @@ int toku_fifo_deq(FIFO fifo) { ...@@ -119,11 +119,19 @@ int toku_fifo_deq(FIFO fifo) {
return 0; return 0;
} }
int toku_fifo_peek_deq (FIFO fifo, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, TXNID *xid) { // fill in the BRT_CMD, using the two DBTs for the DBT part.
int r= toku_fifo_peek(fifo, key, keylen, data, datalen, type, xid); //int toku_fifo_peek_deq_cmdstruct (FIFO fifo, BRT_CMD cmd, DBT*key, DBT*data) {
if (r==0) return toku_fifo_deq(fifo); // int r = toku_fifo_peek_cmdstruct(fifo, cmd, key, data);
else return r; // if (r!=0) return r;
} // return toku_fifo_deq(fifo);
//}
//int toku_fifo_peek_deq (FIFO fifo, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, TXNID *xid) {
// int r= toku_fifo_peek(fifo, key, keylen, data, datalen, type, xid);
// if (r==0) return toku_fifo_deq(fifo);
// else return r;
//}
void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, TXNID xid, void*), void *arg) { void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, TXNID xid, void*), void *arg) {
......
...@@ -26,8 +26,9 @@ int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEM ...@@ -26,8 +26,9 @@ int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEM
int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, TXNID *xid); int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, TXNID *xid);
int toku_fifo_peek_cmdstruct (FIFO, BRT_CMD, DBT*, DBT*); // fill in the BRT_CMD, using the two DBTs for the DBT part. int toku_fifo_peek_cmdstruct (FIFO, BRT_CMD, DBT*, DBT*); // fill in the BRT_CMD, using the two DBTs for the DBT part.
int toku_fifo_deq(FIFO); int toku_fifo_deq(FIFO);
int toku_fifo_peek_deq (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, TXNID *xid); //These two are problematic, since I don't want to malloc() the bytevecs, but dequeueing the fifo frees the memory.
int toku_fifo_peek_deq_cmdstruct (FIFO, BRT_CMD, DBT*, DBT*); // fill in the BRT_CMD, using the two DBTs for the DBT part. //int toku_fifo_peek_deq (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, TXNID *xid);
//int toku_fifo_peek_deq_cmdstruct (FIFO, BRT_CMD, DBT*, DBT*); // fill in the BRT_CMD, using the two DBTs for the DBT part.
void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, TXNID xid, void*), void*); void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, TXNID xid, void*), void*);
#define FIFO_ITERATE(fifo,keyvar,keylenvar,datavar,datalenvar,typevar,xidvar,body) ({ \ #define FIFO_ITERATE(fifo,keyvar,keylenvar,datavar,datalenvar,typevar,xidvar,body) ({ \
......
...@@ -192,13 +192,13 @@ void toku_recover_brtdeq (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t c ...@@ -192,13 +192,13 @@ void toku_recover_brtdeq (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t c
int r; int r;
recover_setup_node(filenum, diskoff, &cf, &node); recover_setup_node(filenum, diskoff, &cf, &node);
assert(node->height>0); assert(node->height>0);
//printf("deq: expected_old_fingerprint=%08x actual=%08x\n", oldfingerprint, node->local_fingerprint); //printf("deq: %lld expected_old_fingerprint=%08x actual=%08x new=%08x\n", diskoff, oldfingerprint, node->local_fingerprint, newfingerprint);
assert(node->local_fingerprint==oldfingerprint); assert(node->local_fingerprint==oldfingerprint);
bytevec actual_key, actual_data; bytevec actual_key, actual_data;
ITEMLEN actual_keylen, actual_datalen; ITEMLEN actual_keylen, actual_datalen;
u_int32_t actual_type; u_int32_t actual_type;
TXNID actual_xid; TXNID actual_xid;
r = toku_fifo_peek_deq(BNC_BUFFER(node, childnum), &actual_key, &actual_keylen, &actual_data, &actual_datalen, &actual_type, &actual_xid); r = toku_fifo_peek(BNC_BUFFER(node, childnum), &actual_key, &actual_keylen, &actual_data, &actual_datalen, &actual_type, &actual_xid);
assert(r==0); assert(r==0);
assert(actual_keylen==(ITEMLEN)key.len); assert(actual_keylen==(ITEMLEN)key.len);
assert(memcmp(actual_key, key.data, actual_keylen)==0); assert(memcmp(actual_key, key.data, actual_keylen)==0);
...@@ -206,8 +206,12 @@ void toku_recover_brtdeq (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t c ...@@ -206,8 +206,12 @@ void toku_recover_brtdeq (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t c
assert(memcmp(actual_data, data.data, actual_datalen)==0); assert(memcmp(actual_data, data.data, actual_datalen)==0);
assert(actual_type==typ); assert(actual_type==typ);
assert(actual_xid==xid); assert(actual_xid==xid);
u_int32_t sizediff = key.len + data.len + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
node->local_fingerprint = newfingerprint; node->local_fingerprint = newfingerprint;
node->log_lsn = lsn; node->log_lsn = lsn;
node->u.n.n_bytes_in_buffers -= sizediff;
BNC_NBYTESINBUF(node, childnum) -= sizediff;
r = toku_fifo_deq(BNC_BUFFER(node, childnum)); // don't deq till were' done looking at the data.
r = toku_cachetable_unpin(cf, diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(cf, diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
toku_free(key.data); toku_free(key.data);
...@@ -220,14 +224,17 @@ void toku_recover_brtenq (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t c ...@@ -220,14 +224,17 @@ void toku_recover_brtenq (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t c
int r; int r;
recover_setup_node(filenum, diskoff, &cf, &node); recover_setup_node(filenum, diskoff, &cf, &node);
assert(node->height>0); assert(node->height>0);
//printf("enq: expected_old_fingerprint=%08x actual=%08x\n", oldfingerprint, node->local_fingerprint); //printf("enq: %lld expected_old_fingerprint=%08x actual=%08x new=%08x\n", diskoff, oldfingerprint, node->local_fingerprint, newfingerprint);
assert(node->local_fingerprint==oldfingerprint); assert(node->local_fingerprint==oldfingerprint);
r = toku_fifo_enq(BNC_BUFFER(node, childnum), key.data, key.len, data.data, data.len, typ, xid); r = toku_fifo_enq(BNC_BUFFER(node, childnum), key.data, key.len, data.data, data.len, typ, xid);
assert(r==0); assert(r==0);
node->local_fingerprint = newfingerprint; node->local_fingerprint = newfingerprint;
node->log_lsn = lsn; node->log_lsn = lsn;
u_int32_t sizediff = key.len + data.len + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
r = toku_cachetable_unpin(cf, diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(cf, diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
node->u.n.n_bytes_in_buffers += sizediff;
BNC_NBYTESINBUF(node, childnum) += sizediff;
toku_free(key.data); toku_free(key.data);
toku_free(data.data); toku_free(data.data);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment