Commit eeaef728 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Add XID (Addresses #242), and also fix the problem where deletes were sneaking...

Add XID (Addresses #242), and also fix the problem where deletes were sneaking around inserts (Fixes #332.)

git-svn-id: file:///svn/tokudb@2098 c7de825b-a66e-492c-adef-691d508d4ae1
parent 54fe4c77
......@@ -8,7 +8,7 @@
# GCOV_FLAGS = -fprofile-arcs -ftest-coverage
# PROF_FLAGS = -pg
OPTFLAGS = -O2
# OPTFLAGS = -O2
ifeq ($(VERBOSE),2)
VERBVERBOSE=-v
......
......@@ -16,7 +16,9 @@
enum { TREE_FANOUT = BRT_FANOUT };
enum { KEY_VALUE_OVERHEAD = 8 }; /* Must store the two lengths. */
enum { PMA_ITEM_OVERHEAD = 4 };
enum { BRT_CMD_OVERHEAD = 1 };
enum { BRT_CMD_OVERHEAD = (1 // the type
+ 8) // the xid
};
enum { BRT_DEFAULT_NODE_SIZE = 1 << 20 };
struct nodeheader_in_file {
......@@ -56,7 +58,7 @@ struct brtnode {
// When we checkpoint: Create a checkpoint record, and cause every dirty node to be written to disk. The new checkpoint record is *not* incorporated into the disk_lsn of the written nodes.
// While we are checkpointing, someone may modify a dirty node that has not yet been written. In that case, when we unpin the node, we make the new copy (because the disk_lsn<checkpoint_lsn), just as we would usually.
//
int layout_version; // What version of the data structure?
int layout_version; // What version of the data structure? (version 2 adds the xid to the brt cmds)
int height; /* height is always >= 0. 0 for leaf, >0 for nonleaf. */
u_int32_t rand4fingerprint;
u_int32_t local_fingerprint; /* For leaves this is everything in the buffer. For nonleaves, this is everything in the buffers, but does not include child subtree fingerprints. */
......@@ -159,6 +161,7 @@ enum brt_cmd_type {
/* tree commands */
struct brt_cmd {
enum brt_cmd_type type;
TXNID xid;
union {
/* insert or delete */
struct brt_cmd_insert_delete {
......@@ -185,8 +188,8 @@ extern CACHEKEY* toku_calculate_root_offset_pointer (BRT brt);
static const BRTNODE null_brtnode=0;
extern u_int32_t toku_calccrc32_kvpair (const void *key, int keylen, const void *val, int vallen);
extern u_int32_t toku_calccrc32_cmd (int type, const void *key, int keylen, const void *val, int vallen);
extern u_int32_t toku_calccrc32_cmdstruct (BRT_CMD_S *cmd);
extern u_int32_t toku_calccrc32_cmd (int type, TXNID xid, const void *key, int keylen, const void *val, int vallen);
extern u_int32_t toku_calccrc32_cmdstruct (BRT_CMD cmd);
// How long is the pivot key?
unsigned int toku_brt_pivot_key_len (BRT, struct kv_pair *); // Given the tree
......
......@@ -26,7 +26,7 @@ static void test_serialize(void) {
sn.thisnodename = sn.nodesize*20;
sn.disk_lsn.lsn = 789;
sn.log_lsn.lsn = 123456;
sn.layout_version = 1;
sn.layout_version = 2;
sn.height = 1;
sn.rand4fingerprint = randval;
sn.local_fingerprint = 0;
......@@ -40,9 +40,9 @@ static void test_serialize(void) {
BNC_SUBTREE_FINGERPRINT(&sn, 1) = random();
r = toku_fifo_create(&BNC_BUFFER(&sn,0)); assert(r==0);
r = toku_fifo_create(&BNC_BUFFER(&sn,1)); assert(r==0);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "a", 2, "aval", 5, BRT_NONE); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, "a", 2, "aval", 5);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "b", 2, "bval", 5, BRT_NONE); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, "b", 2, "bval", 5);
r = toku_fifo_enq(BNC_BUFFER(&sn,1), "x", 2, "xval", 5, BRT_NONE); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, "x", 2, "xval", 5);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "a", 2, "aval", 5, BRT_NONE, (TXNID)0); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, (TXNID)0, "a", 2, "aval", 5);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "b", 2, "bval", 5, BRT_NONE, (TXNID)123); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, (TXNID)123, "b", 2, "bval", 5);
r = toku_fifo_enq(BNC_BUFFER(&sn,1), "x", 2, "xval", 5, BRT_NONE, (TXNID)234); assert(r==0); sn.local_fingerprint += randval*toku_calccrc32_cmd(BRT_NONE, (TXNID)234, "x", 2, "xval", 5);
BNC_NBYTESINBUF(&sn, 0) = 2*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5);
BNC_NBYTESINBUF(&sn, 1) = 1*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5);
{
......@@ -59,7 +59,7 @@ static void test_serialize(void) {
assert(dn->thisnodename==nodesize*20);
assert(dn->disk_lsn.lsn==123456);
assert(dn->layout_version ==1);
assert(dn->layout_version ==2);
assert(dn->height == 1);
assert(dn->rand4fingerprint==randval);
assert(dn->u.n.n_children==2);
......
......@@ -49,7 +49,7 @@ static unsigned int toku_serialize_brtnode_size_slow(BRTNODE node) {
FIFO_ITERATE(BNC_BUFFER(node,i),
key __attribute__((__unused__)), keylen,
data __attribute__((__unused__)), datalen,
type __attribute__((__unused__)),
type __attribute__((__unused__)), xid __attribute__((__unused__)),
(hsize+=BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+keylen+datalen));
}
assert(hsize==node->u.n.n_bytes_in_buffers);
......@@ -154,12 +154,13 @@ void toku_serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node)
for (i=0; i< n_buffers; i++) {
//printf("%s:%d p%d=%p n_entries=%d\n", __FILE__, __LINE__, i, node->mdicts[i], mdict_n_entries(node->mdicts[i]));
wbuf_int(&w, toku_fifo_n_entries(BNC_BUFFER(node,i)));
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type,
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
({
wbuf_char(&w, type);
wbuf_TXNID(&w, xid);
wbuf_bytes(&w, key, keylen);
wbuf_bytes(&w, data, datalen);
check_local_fingerprint+=node->rand4fingerprint*toku_calccrc32_cmd(type, key, keylen, data, datalen);
check_local_fingerprint+=node->rand4fingerprint*toku_calccrc32_cmd(type, xid, key, keylen, data, datalen);
}));
}
//printf("%s:%d check_local_fingerprint=%8x\n", __FILE__, __LINE__, check_local_fingerprint);
......@@ -257,7 +258,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl
}
}
result->layout_version = rbuf_int(&rc);
if (result->layout_version!=1) {
if (result->layout_version!=2) {
r=DB_BADFORMAT;
goto died1;
}
......@@ -337,17 +338,17 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl
//printf("%d in hash\n", n_in_hash);
for (i=0; i<n_in_this_hash; i++) {
int diff;
int type;
bytevec key; ITEMLEN keylen;
bytevec val; ITEMLEN vallen;
toku_verify_counts(result);
type = rbuf_char(&rc);
int type = rbuf_char(&rc);
TXNID xid = rbuf_ulonglong(&rc);
rbuf_bytes(&rc, &key, &keylen); /* Returns a pointer into the rbuf. */
rbuf_bytes(&rc, &val, &vallen);
check_local_fingerprint += result->rand4fingerprint * toku_calccrc32_cmd(type, key, keylen, val, vallen);
check_local_fingerprint += result->rand4fingerprint * toku_calccrc32_cmd(type, xid, key, keylen, val, vallen);
//printf("Found %s,%s\n", (char*)key, (char*)val);
{
r=toku_fifo_enq(BNC_BUFFER(result, cnum), key, keylen, val, vallen, type); /* Copies the data into the hash table. */
r=toku_fifo_enq(BNC_BUFFER(result, cnum), key, keylen, val, vallen, type, xid); /* Copies the data into the hash table. */
if (r!=0) { goto died_12; }
}
diff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
......
......@@ -2287,6 +2287,7 @@ static void test_brt_delete() {
test_brt_delete_cursor_first(100); toku_memory_check_all_free();
test_brt_delete_cursor_first(500); toku_memory_check_all_free();
test_brt_delete_cursor_first(10000); toku_memory_check_all_free();
test_insert_delete_lookup(2); toku_memory_check_all_free();
test_insert_delete_lookup(512); toku_memory_check_all_free();
}
......@@ -2831,6 +2832,7 @@ static void brt_blackbox_test (void) {
int main (int argc , const char *argv[]) {
default_parse_args(argc, argv);
brt_blackbox_test();
toku_malloc_cleanup();
if (verbose) printf("test ok\n");
......
......@@ -24,9 +24,9 @@ static void verify_local_fingerprint (BRTNODE node) {
int i;
if (node->height>0) {
for (i=0; i<node->u.n.n_children; i++)
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type,
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
({
fp += node->rand4fingerprint * toku_calccrc32_cmd(type, key, keylen, data, datalen);
fp += node->rand4fingerprint * toku_calccrc32_cmd(type, xid, key, keylen, data, datalen);
}));
assert(fp==node->local_fingerprint);
} else {
......@@ -69,6 +69,7 @@ int toku_verify_brtnode (BRT brt, DISKOFF off, bytevec lorange, ITEMLEN lolen, b
bytevec data __attribute__((__unused__)),
unsigned int datalen __attribute__((__unused__)),
int type __attribute__((__unused__)),
TXNID xid __attribute__((__unused__)),
void *ignore __attribute__((__unused__))) {
if (thislorange) assert(toku_keycompare(thislorange,thislolen,key,keylen)<0);
if (thishirange && toku_keycompare(key,keylen,thishirange,thishilen)>0) {
......
This diff is collapsed.
......@@ -46,10 +46,10 @@ void test_fifo_enq(int n) {
for (i=0; i<n; i++) {
buildkey(i);
buildval(i);
r = toku_fifo_enq(f, thekey, thekeylen, theval, thevallen, i); assert(r == 0);
r = toku_fifo_enq(f, thekey, thekeylen, theval, thevallen, i, (TXNID)i); assert(r == 0);
}
void checkit(bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen, int type, void *arg) {
void checkit(bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen, int type, TXNID xid, void *arg) {
if (verbose) printf("checkit %d %d\n", i, type);
assert(arg == 0);
buildkey(i);
......@@ -57,6 +57,7 @@ void test_fifo_enq(int n) {
assert((int) keylen == thekeylen); assert(memcmp(key, thekey, keylen) == 0);
assert((int) vallen == thevallen); assert(memcmp(val, theval, vallen) == 0);
assert(i % 256 == type);
assert((TXNID)i==xid);
i += 1;
}
......
......@@ -65,10 +65,11 @@ int toku_fifo_n_entries(FIFO fifo) {
return fifo->n;
}
int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *data, unsigned int datalen, int type) {
int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *data, unsigned int datalen, int type, TXNID xid) {
struct fifo_entry *entry = toku_malloc(sizeof (struct fifo_entry) + keylen + datalen);
if (entry == 0) return ENOMEM;
entry->type = type;
entry->xid = xid;
entry->keylen = keylen;
memcpy(entry->key, key, keylen);
entry->vallen = datalen;
......@@ -78,7 +79,7 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d
}
/* peek at the head (the oldest entry) of the fifo */
int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, int *type) {
int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, int *type, TXNID *xid) {
struct fifo_entry *entry = fifo_peek(fifo);
if (entry == 0) return -1;
*key = entry->key;
......@@ -86,6 +87,7 @@ int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data,
*data = entry->key + entry->keylen;
*datalen = entry->vallen;
*type = entry->type;
*xid = entry->xid;
return 0;
}
......@@ -96,10 +98,10 @@ int toku_fifo_deq(FIFO fifo) {
return 0;
}
void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, void*), void *arg) {
void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, TXNID xid, void*), void *arg) {
struct fifo_entry *entry;
for (entry = fifo_peek(fifo); entry; entry = entry->next)
f(entry->key, entry->keylen, entry->key + entry->keylen, entry->vallen, entry->type, arg);
f(entry->key, entry->keylen, entry->key + entry->keylen, entry->vallen, entry->type, entry->xid, arg);
}
......
......@@ -5,6 +5,7 @@ struct fifo_entry {
unsigned int keylen;
unsigned int vallen;
unsigned char type;
TXNID xid;
unsigned char key[];
};
......@@ -18,20 +19,21 @@ typedef struct fifo *FIFO;
int toku_fifo_create(FIFO *);
void toku_fifo_free(FIFO *);
int toku_fifo_n_entries(FIFO);
int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type);
int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, int *type);
int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, TXNID xid);
int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, int *type, TXNID *xid);
int toku_fifo_deq(FIFO);
int toku_fifo_peek_deq (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, int *type);
void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, void*), void*);
void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, TXNID xid, void*), void*);
#define FIFO_ITERATE(fifo,keyvar,keylenvar,datavar,datalenvar,typevar,body) ({ \
#define FIFO_ITERATE(fifo,keyvar,keylenvar,datavar,datalenvar,typevar,xidvar,body) ({ \
struct fifo_entry *entry; \
for (entry = fifo->head; entry; entry = entry->next) { \
unsigned int keylenvar = entry->keylen; \
void *keyvar = entry->key; \
unsigned int datalenvar = entry->vallen; \
void *datavar = entry->key + entry->keylen; \
unsigned int typevar = entry->type; \
enum brt_cmd_type typevar = entry->type; \
TXNID xidvar = entry->xid; \
body; \
} \
})
......
......@@ -21,16 +21,20 @@ u_int32_t toku_calccrc32_kvpair (const void *key, int keylen, const void *val, i
return toku_calc_more_crc32_kvpair(toku_null_crc, key, keylen, val, vallen);
}
u_int32_t toku_calccrc32_cmd (int type, const void *key, int keylen, const void *val, int vallen) {
u_int32_t toku_calccrc32_cmd (int type, TXNID xid, const void *key, int keylen, const void *val, int vallen) {
unsigned char type_c = type;
return toku_calc_more_crc32_kvpair(toku_crc32(toku_null_crc,
unsigned int a = htonl(xid>>32);
unsigned int b = htonl(xid&0xffffffff);
return toku_calc_more_crc32_kvpair(toku_crc32(toku_crc32(toku_crc32(toku_null_crc,
&type_c, 1),
&a, 4),
&b, 4),
key, keylen, val, vallen);
}
u_int32_t toku_calccrc32_cmdstruct (BRT_CMD cmd) {
if (cmd->type <= BRT_DELETE_BOTH)
return toku_calccrc32_cmd (cmd->type, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size);
return toku_calccrc32_cmd (cmd->type, cmd->xid, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size);
else
assert(0); /* Should not have come here. */
}
......@@ -171,7 +171,7 @@ void toku_recover_newbrtnode (struct logtype_newbrtnode *c) {
n->thisnodename = c->diskoff;
n->log_lsn = n->disk_lsn = c->lsn;
//printf("%s:%d %p->disk_lsn=%"PRId64"\n", __FILE__, __LINE__, n, n->disk_lsn.lsn);
n->layout_version = 1;
n->layout_version = 2;
n->height = c->height;
n->rand4fingerprint = c->rand4fingerprint;
n->flags = c->is_dup_sort ? TOKU_DB_DUPSORT : 0; // Don't have TOKU_DB_DUP ???
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment