Commit 522627d9 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Log deq, but still not quite passing the test. Addresses #548.

git-svn-id: file:///svn/tokudb@2898 c7de825b-a66e-492c-adef-691d508d4ae1
parent 76597302
......@@ -387,7 +387,7 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
while (1) {
bytevec key, data;
unsigned int keylen, datalen;
int type;
u_int32_t type;
TXNID xid;
int fr = toku_fifo_peek(from_htab, &key, &keylen, &data, &datalen, &type, &xid);
if (fr!=0) break;
......@@ -560,7 +560,16 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
//if (debug) printf("%s:%d %*sinserted down child_did_split=%d\n", __FILE__, __LINE__, debug, "", child_did_split);
node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_cmdstruct(cmd);
u_int32_t old_fingerprint = node->local_fingerprint;
u_int32_t new_fingerprint = old_fingerprint - node->rand4fingerprint*toku_calccrc32_cmdstruct(cmd);
node->local_fingerprint = new_fingerprint;
{
BYTESTRING keybs = { .len=k->size, .data=(char*)k->data };
BYTESTRING databs = { .len=v->size, .data=(char*)v->data };
int r = toku_log_brtdeq(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum,
cmd->xid, cmd->type, keybs, databs, old_fingerprint, new_fingerprint);
assert(r==0);
}
{
int r = toku_fifo_deq(BNC_BUFFER(node,childnum));
//printf("%s:%d deleted status=%d\n", __FILE__, __LINE__, r);
......@@ -802,7 +811,7 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
ITEMLEN keylen, vallen;
//printf("%s:%d Try random_pick, weight=%d \n", __FILE__, __LINE__, BNC_NBYTESINBUF(node, childnum));
assert(toku_fifo_n_entries(BNC_BUFFER(node,childnum))>0);
int type;
u_int32_t type;
TXNID xid;
while(0==toku_fifo_peek(BNC_BUFFER(node,childnum), &key, &keylen, &val, &vallen, &type, &xid)) {
int child_did_split=0; BRTNODE childa, childb;
......@@ -1064,9 +1073,14 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD cmd,
DBT *v = cmd->u.id.val;
int diff = k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
int r=toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type, cmd->xid);
BYTESTRING keybs = { .len=k->size, .data=(char*)k->data };
BYTESTRING databs = { .len=v->size, .data=(char*)v->data };
u_int32_t newfingerprint = node->local_fingerprint + node->rand4fingerprint * toku_calccrc32_cmd(type, cmd->xid, k->data, k->size, v->data, v->size);
int r=toku_log_brtenq(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, cmd->xid, type, keybs, databs, node->local_fingerprint, newfingerprint);
assert(r==0);
r=toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type, cmd->xid);
assert(r==0);
node->local_fingerprint += node->rand4fingerprint * toku_calccrc32_cmd(type, cmd->xid, k->data, k->size, v->data, v->size);
node->local_fingerprint = newfingerprint;
node->u.n.n_bytes_in_buffers += diff;
BNC_NBYTESINBUF(node, childnum) += diff;
node->dirty = 1;
......
......@@ -84,7 +84,7 @@ int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_CMD cmd) {
}
/* peek at the head (the oldest entry) of the fifo */
int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, int *type, TXNID *xid) {
int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, u_int32_t *type, TXNID *xid) {
struct fifo_entry *entry = fifo_peek(fifo);
if (entry == 0) return -1;
*key = entry->key;
......@@ -98,7 +98,7 @@ int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data,
// fill in the BRT_CMD, using the two DBTs for the DBT part.
int toku_fifo_peek_cmdstruct (FIFO fifo, BRT_CMD cmd, DBT*key, DBT*data) {
int type;
u_int32_t type;
bytevec keyb,datab;
unsigned int keylen,datalen;
int r = toku_fifo_peek(fifo, &keyb, &keylen, &datab, &datalen, &type, &cmd->xid);
......@@ -119,6 +119,13 @@ int toku_fifo_deq(FIFO fifo) {
return 0;
}
int toku_fifo_peek_deq (FIFO fifo, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, TXNID *xid) {
int r= toku_fifo_peek(fifo, key, keylen, data, datalen, type, xid);
if (r==0) return toku_fifo_deq(fifo);
else return r;
}
void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, TXNID xid, void*), void *arg) {
struct fifo_entry *entry;
for (entry = fifo_peek(fifo); entry; entry = entry->next)
......
......@@ -23,10 +23,10 @@ void toku_fifo_free(FIFO *);
int toku_fifo_n_entries(FIFO);
int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_CMD cmd);
int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, TXNID xid);
int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, int *type, TXNID *xid);
int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, TXNID *xid);
int toku_fifo_peek_cmdstruct (FIFO, BRT_CMD, DBT*, DBT*); // fill in the BRT_CMD, using the two DBTs for the DBT part.
int toku_fifo_deq(FIFO);
int toku_fifo_peek_deq (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, int *type);
int toku_fifo_peek_deq (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, TXNID *xid);
int toku_fifo_peek_deq_cmdstruct (FIFO, BRT_CMD, DBT*, DBT*); // fill in the BRT_CMD, using the two DBTs for the DBT part.
void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, TXNID xid, void*), void*);
......
......@@ -186,9 +186,51 @@ static void recover_setup_node (FILENUM filenum, DISKOFF diskoff, CACHEFILE *cf,
*cf = pair->cf;
}
void toku_recover_brtdeq (LSN UU(lsn), FILENUM UU(filenum), DISKOFF UU(diskoff), u_int32_t UU(childnum), TXNID UU(xid), u_int32_t UU(typ), BYTESTRING UU(key), BYTESTRING UU(data), u_int32_t UU(oldfingerprint), u_int32_t UU(newfingerprint)) { assert(0); }
void toku_recover_brtdeq (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, TXNID xid, u_int32_t typ, BYTESTRING key, BYTESTRING data, u_int32_t oldfingerprint, u_int32_t newfingerprint) {
CACHEFILE cf;
BRTNODE node;
int r;
recover_setup_node(filenum, diskoff, &cf, &node);
assert(node->height>0);
printf("deq: expected_old_fingerprint=%08x actual=%08x\n", oldfingerprint, node->local_fingerprint);
assert(node->local_fingerprint==oldfingerprint);
bytevec actual_key, actual_data;
ITEMLEN actual_keylen, actual_datalen;
u_int32_t actual_type;
TXNID actual_xid;
r = toku_fifo_peek_deq(BNC_BUFFER(node, childnum), &actual_key, &actual_keylen, &actual_data, &actual_datalen, &actual_type, &actual_xid);
assert(r==0);
assert(actual_keylen==(ITEMLEN)key.len);
assert(memcmp(actual_key, key.data, actual_keylen)==0);
assert(actual_datalen=data.len);
assert(memcmp(actual_data, data.data, actual_datalen)==0);
assert(actual_type==typ);
assert(actual_xid==xid);
node->local_fingerprint = newfingerprint;
node->log_lsn = lsn;
r = toku_cachetable_unpin(cf, diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
toku_free(key.data);
toku_free(data.data);
}
void toku_recover_brtenq (LSN UU(lsn), FILENUM UU(filenum), DISKOFF UU(diskoff), u_int32_t UU(childnum), TXNID UU(xid), u_int32_t UU(typ), BYTESTRING UU(key), BYTESTRING UU(data), u_int32_t UU(oldfingerprint), u_int32_t UU(newfingerprint)) { assert(0); }
void toku_recover_brtenq (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, TXNID xid, u_int32_t typ, BYTESTRING key, BYTESTRING data, u_int32_t oldfingerprint, u_int32_t newfingerprint) {
CACHEFILE cf;
BRTNODE node;
int r;
recover_setup_node(filenum, diskoff, &cf, &node);
assert(node->height>0);
printf("enq: expected_old_fingerprint=%08x actual=%08x\n", oldfingerprint, node->local_fingerprint);
assert(node->local_fingerprint==oldfingerprint);
r = toku_fifo_enq(BNC_BUFFER(node, childnum), key.data, key.len, data.data, data.len, typ, xid);
assert(r==0);
node->local_fingerprint = newfingerprint;
node->log_lsn = lsn;
r = toku_cachetable_unpin(cf, diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
toku_free(key.data);
toku_free(data.data);
}
void toku_recover_addchild (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, DISKOFF child, u_int32_t childfingerprint) {
CACHEFILE cf;
......
......@@ -135,7 +135,7 @@ libs:
$(SETTOKUENV) cc -DENVDIR=\"dir.$<.tdb\" -DUSE_TDB -DIS_TDB=1 $(CFLAGS) $(TDB_CPPFLAGS) $(TDB_LOADLIBES) $< -o $@
.PHONY: %.recover
TLRECOVER = 2 3 4 5 6 7 8 9
TLRECOVER = 2 3 4 5 6 7 8 9 10
all.recover: $(patsubst %,test_log%.recover,$(TLRECOVER))
$(patsubst %,test_log%.tdbrun,$(TLRECOVER)):
@echo -n
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment