Commit 62482b48 authored by Rich Prohaska's avatar Rich Prohaska

use kv-pair's to contain the pivot key and data. closes #37

git-svn-id: file:///svn/tokudb@770 c7de825b-a66e-492c-adef-691d508d4ae1
parent 4750f488
......@@ -41,7 +41,7 @@ struct brtnode {
// Don't actually store the subree fingerprint in the in-memory data structure.
int n_children; /* if n_children==TREE_FANOUT+1 then the tree needs to be rebalanced. */
u_int32_t child_subtree_fingerprints[TREE_FANOUT+1];
bytevec childkeys[TREE_FANOUT]; /* Pivot keys. Child 0's keys are <= childkeys[0]. Child 1's keys are <= childkeys[1].
struct kv_pair *childkeys[TREE_FANOUT]; /* Pivot keys. Child 0's keys are <= childkeys[0]. Child 1's keys are <= childkeys[1].
Note: It is possible that Child 1's keys are == to child 0's key's, so it is
not necessarily true that child 1's keys are > childkeys[0].
However, in the absense of duplicate keys, child 1's keys *are* > childkeys[0]. */
......
......@@ -28,7 +28,8 @@ void test_serialize(void) {
sn.rand4fingerprint = randval;
sn.local_fingerprint = 0;
sn.u.n.n_children = 2;
sn.u.n.childkeys[0] = hello_string = toku_strdup("hello");
hello_string = toku_strdup("hello");
sn.u.n.childkeys[0] = kv_pair_malloc(hello_string, 6, 0, 0);
sn.u.n.childkeylens[0] = 6;
sn.u.n.totalchildkeylens = 6;
sn.u.n.pivotflags[0] = 42;
......@@ -54,7 +55,7 @@ void test_serialize(void) {
assert(dn->height == 1);
assert(dn->rand4fingerprint==randval);
assert(dn->u.n.n_children==2);
assert(strcmp(dn->u.n.childkeys[0], "hello")==0);
assert(strcmp(kv_pair_key(dn->u.n.childkeys[0]), "hello")==0);
assert(dn->u.n.childkeylens[0]==6);
assert(dn->u.n.totalchildkeylens==6);
assert(dn->u.n.pivotflags[0]==42);
......@@ -89,6 +90,7 @@ void test_serialize(void) {
}
brtnode_free(&dn);
kv_pair_free(sn.u.n.childkeys[0]);
toku_free(hello_string);
toku_hashtable_free(&sn.u.n.htables[0]);
toku_hashtable_free(&sn.u.n.htables[1]);
......
......@@ -32,6 +32,7 @@ static unsigned int toku_serialize_brtnode_size_slow(BRTNODE node) {
size+=4; /* subtree fingerprint. */
for (i=0; i<node->u.n.n_children-1; i++) {
size+=4;
if (node->flags & TOKU_DB_DUPSORT) size += 4;
size+=1; /* pivotflags */
csize+=node->u.n.childkeylens[i];
}
......@@ -71,6 +72,7 @@ unsigned int toku_serialize_brtnode_size (BRTNODE node) {
result+=4; /* n_children */
result+=4; /* subtree fingerpirnt */
result+=(4+1)*(node->u.n.n_children-1); /* key lengths + pivotflags*/
if (node->flags & TOKU_DB_DUPSORT) result += 4*(node->u.n.n_children-1); /* data lengths */
result+=node->u.n.totalchildkeylens; /* the lengths of the pivot keys, without their key lengths. */
result+=(8+4+4)*(node->u.n.n_children); /* For each child, a child offset, a count for the number of hash table entries, and the subtree fingerprint. */
result+=node->u.n.n_bytes_in_hashtables;
......@@ -129,14 +131,11 @@ void toku_serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node)
wbuf_char(&w, node->u.n.pivotflags[i]);
for (i=0; i<node->u.n.n_children-1; i++) {
if (node->flags & TOKU_DB_DUPSORT) {
int keylen = node->u.n.childkeylens[i];
int datalen;
memcpy(&datalen, node->u.n.childkeys[i], sizeof datalen);
keylen = keylen - 4 - datalen;
wbuf_bytes(&w, node->u.n.childkeys[i] + 4, keylen);
wbuf_bytes(&w, node->u.n.childkeys[i] + 4 + keylen, datalen);
} else
wbuf_bytes(&w, node->u.n.childkeys[i], node->u.n.childkeylens[i]);
wbuf_bytes(&w, kv_pair_key(node->u.n.childkeys[i]), kv_pair_keylen(node->u.n.childkeys[i]));
wbuf_bytes(&w, kv_pair_val(node->u.n.childkeys[i]), kv_pair_vallen(node->u.n.childkeys[i]));
} else {
wbuf_bytes(&w, kv_pair_key(node->u.n.childkeys[i]), node->u.n.childkeylens[i]);
}
//printf("%s:%d w.ndone=%d (childkeylen[%d]=%d\n", __FILE__, __LINE__, w.ndone, i, node->childkeylens[i]);
}
for (i=0; i<node->u.n.n_children; i++) {
......@@ -297,17 +296,12 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl
unsigned int keylen, datalen;
rbuf_bytes(&rc, &keyptr, &keylen);
rbuf_bytes(&rc, &dataptr, &datalen);
result->u.n.childkeylens[i] = keylen + datalen + 4;
void *vp = toku_malloc(result->u.n.childkeylens[i]);
result->u.n.childkeys[i] = vp;
memcpy(vp, &datalen, sizeof datalen);
memcpy(vp + 4, keyptr, keylen);
memcpy(vp + 4 + keylen, dataptr, datalen);
result->u.n.childkeylens[i] = keylen + datalen;
result->u.n.childkeys[i] = kv_pair_malloc(keyptr, keylen, dataptr, datalen);
} else {
bytevec childkeyptr;
rbuf_bytes(&rc, &childkeyptr, &result->u.n.childkeylens[i]); /* Returns a pointer into the rbuf. */
result->u.n.childkeys[i] = memdup(childkeyptr, result->u.n.childkeylens[i]);
result->u.n.childkeys[i] = kv_pair_malloc((void*)childkeyptr, result->u.n.childkeylens[i], 0, 0);
}
//printf(" key %d length=%d data=%s\n", i, result->childkeylens[i], result->childkeys[i]);
result->u.n.totalchildkeylens+=result->u.n.childkeylens[i];
......
......@@ -75,14 +75,14 @@ int toku_verify_brtnode (BRT brt, DISKOFF off, bytevec lorange, ITEMLEN lolen, b
thislorange=lorange;
thislolen =lolen;
} else {
thislorange=node->u.n.childkeys[i-1];
thislorange=kv_pair_key(node->u.n.childkeys[i-1]);
thislolen =node->u.n.childkeylens[i-1];
}
if (node->u.n.n_children==0 || i+1>=node->u.n.n_children) {
thishirange=hirange;
thishilen =hilen;
} else {
thishirange=node->u.n.childkeys[i];
thishirange=kv_pair_key(node->u.n.childkeys[i]);
thishilen =node->u.n.childkeylens[i];
}
{
......@@ -102,14 +102,14 @@ int toku_verify_brtnode (BRT brt, DISKOFF off, bytevec lorange, ITEMLEN lolen, b
}
for (i=0; i<node->u.n.n_children; i++) {
if (i>0) {
if (lorange) assert(toku_keycompare(lorange,lolen, node->u.n.childkeys[i-1], node->u.n.childkeylens[i-1])<0);
if (hirange) assert(toku_keycompare(node->u.n.childkeys[i-1], node->u.n.childkeylens[i-1], hirange, hilen)<=0);
if (lorange) assert(toku_keycompare(lorange,lolen, kv_pair_key(node->u.n.childkeys[i-1]), node->u.n.childkeylens[i-1])<0);
if (hirange) assert(toku_keycompare(kv_pair_key(node->u.n.childkeys[i-1]), node->u.n.childkeylens[i-1], hirange, hilen)<=0);
}
if (recurse) {
result|=toku_verify_brtnode(brt, node->u.n.children[i],
(i==0) ? lorange : node->u.n.childkeys[i-1],
(i==0) ? lorange : kv_pair_key(node->u.n.childkeys[i-1]),
(i==0) ? lolen : node->u.n.childkeylens[i-1],
(i==node->u.n.n_children-1) ? hirange : node->u.n.childkeys[i],
(i==node->u.n.n_children-1) ? hirange : kv_pair_key(node->u.n.childkeys[i]),
(i==node->u.n.n_children-1) ? hilen : node->u.n.childkeylens[i],
recurse,
node);
......
......@@ -119,20 +119,16 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE
static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck, unsigned int cl) {
int cmp;
DBT mydbt;
struct kv_pair *kv = (struct kv_pair *) ck;
if (brt->flags & TOKU_DB_DUPSORT) {
bytevec k; unsigned int kl;
bytevec d; unsigned int dl;
assert(cl >= sizeof dl);
memcpy(&dl, ck, sizeof dl);
assert(cl >= dl - (sizeof dl));
kl = cl - dl - (sizeof dl);
k = ck + (sizeof dl);
d = ck + (sizeof dl) + kl;
cmp = brt->compare_fun(brt->db, key, fill_dbt(&mydbt, k, kl));
assert(kv_pair_keylen(kv) + kv_pair_vallen(kv) == cl);
cmp = brt->compare_fun(brt->db, key, fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv)));
if (cmp == 0 && data != 0)
cmp = brt->dup_compare(brt->db, data, fill_dbt(&mydbt, d, dl));
} else
cmp = brt->compare_fun(brt->db, key, fill_dbt(&mydbt, ck, cl));
cmp = brt->dup_compare(brt->db, data, fill_dbt(&mydbt, kv_pair_val(kv), kv_pair_vallen(kv)));
} else {
assert(kv_pair_keylen(kv) == cl);
cmp = brt->compare_fun(brt->db, key, fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv)));
}
return cmp;
}
......@@ -694,7 +690,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
node->u.n.childkeylens[cnum] = node->u.n.childkeylens[cnum-1];
node->u.n.pivotflags[cnum] = node->u.n.pivotflags[cnum-1];
}
node->u.n.childkeys[childnum]= (char*)childsplitk->data;
node->u.n.childkeys[childnum]= (void*)childsplitk->data;
node->u.n.childkeylens[childnum]= childsplitk->size;
node->u.n.pivotflags[childnum] = childsplitk->flags;
node->u.n.totalchildkeylens += childsplitk->size;
......
......@@ -9,8 +9,8 @@
* key is right after the length fields and the value is right after the key.
*/
struct kv_pair {
int keylen;
int vallen;
unsigned int keylen;
unsigned int vallen;
char key[];
};
......@@ -19,15 +19,14 @@ static inline int kv_pair_size(struct kv_pair *pair) {
return sizeof (struct kv_pair) + pair->keylen + pair->vallen;
}
static inline void kv_pair_init(struct kv_pair *pair, void *key, int keylen, void *val, int vallen) {
static inline void kv_pair_init(struct kv_pair *pair, const void *key, unsigned int keylen, const void *val, unsigned int vallen) {
pair->keylen = keylen;
memcpy(pair->key, key, keylen);
pair->vallen = vallen;
memcpy(pair->key + keylen, val, vallen);
}
static inline struct kv_pair *kv_pair_malloc(void *key, int keylen, void *val, int vallen) {
static inline struct kv_pair *kv_pair_malloc(const void *key, unsigned int keylen, const void *val, unsigned int vallen) {
struct kv_pair *pair = toku_malloc(sizeof (struct kv_pair) + keylen + vallen);
if (pair)
kv_pair_init(pair, key, keylen, val, vallen);
......@@ -35,7 +34,7 @@ static inline struct kv_pair *kv_pair_malloc(void *key, int keylen, void *val, i
}
/* replace the val, keep the same key */
static inline struct kv_pair *kv_pair_realloc_same_key(struct kv_pair *p, void *newval, int newvallen) {
static inline struct kv_pair *kv_pair_realloc_same_key(struct kv_pair *p, void *newval, unsigned int newvallen) {
struct kv_pair *pair = toku_realloc(p, sizeof (struct kv_pair) + p->keylen + newvallen);
if (pair) {
pair->vallen = newvallen;
......@@ -51,50 +50,25 @@ static inline void kv_pair_free(struct kv_pair *pair) {
static inline void *kv_pair_key(struct kv_pair *pair) {
return pair->key;
}
static inline const void *kv_pair_key_const(const struct kv_pair *pair) {
return pair->key;
}
static inline int kv_pair_keylen(struct kv_pair *pair) {
static inline unsigned int kv_pair_keylen(struct kv_pair *pair) {
return pair->keylen;
}
static inline void *kv_pair_val(struct kv_pair *pair) {
return pair->key + pair->keylen;
}
static inline const void *kv_pair_val_const(const struct kv_pair *pair) {
return pair->key + pair->keylen;
}
static inline int kv_pair_vallen(struct kv_pair *pair) {
static inline unsigned int kv_pair_vallen(struct kv_pair *pair) {
return pair->vallen;
}
/* use the low bit to indicate an inuse pair that is deleted */
static inline int kv_pair_inuse(struct kv_pair *pair) {
return pair != 0;
}
static inline int kv_pair_deleted(struct kv_pair *pair) {
return ((long) pair & 1) != 0;
}
static inline int kv_pair_valid(struct kv_pair *pair) {
return kv_pair_inuse(pair) && !kv_pair_deleted(pair);
}
static inline struct kv_pair *kv_pair_set_deleted(struct kv_pair *pair) {
return (struct kv_pair *) ((long) pair | 1);
}
static inline struct kv_pair *kv_pair_ptr(struct kv_pair *pair) {
return (struct kv_pair *) ((long) pair & ~1);
}
struct kv_pair_tag {
struct kv_pair *pair;
int oldtag, newtag;
};
#endif
......@@ -69,6 +69,32 @@ static int __pma_compare_kv(PMA pma, struct kv_pair *a, struct kv_pair *b);
/**************************** end of static functions forward declarations. *********************/
/* use the low bit in the kv_pair pointer to indicate an inuse pair that is deleted */
static inline int kv_pair_inuse(struct kv_pair *pair) {
return pair != 0;
}
static inline int kv_pair_deleted(struct kv_pair *pair) {
return ((long) pair & 1) != 0;
}
static inline int kv_pair_valid(struct kv_pair *pair) {
return kv_pair_inuse(pair) && !kv_pair_deleted(pair);
}
static inline struct kv_pair *kv_pair_set_deleted(struct kv_pair *pair) {
return (struct kv_pair *) ((long) pair | 1);
}
static inline struct kv_pair *kv_pair_ptr(struct kv_pair *pair) {
return (struct kv_pair *) ((long) pair & ~1);
}
struct kv_pair_tag {
struct kv_pair *pair;
int oldtag, newtag;
};
#ifndef PMA_USE_MEMPOOL
#define PMA_USE_MEMPOOL 1
......@@ -1394,16 +1420,11 @@ int toku_pma_split(PMA origpma, unsigned int *origpma_size, DBT *splitk,
if (splitk) {
struct kv_pair *a = pairs[spliti-1].pair;
if (origpma->dup_mode & TOKU_DB_DUPSORT) {
int kl = kv_pair_keylen(a);
int vl = kv_pair_vallen(a);
splitk->size = (sizeof vl) + kl + vl;
splitk->data = toku_malloc(splitk->size);
memcpy(splitk->data, &vl, sizeof vl);
memcpy(splitk->data + (sizeof vl), kv_pair_key(a), kl);
memcpy(splitk->data + (sizeof vl) + kl, kv_pair_val(a), vl);
splitk->data = kv_pair_malloc(kv_pair_key(a), kv_pair_keylen(a), kv_pair_val(a), kv_pair_vallen(a));
splitk->size = kv_pair_keylen(a) + kv_pair_vallen(a);
} else {
splitk->data = kv_pair_malloc(kv_pair_key(a), kv_pair_keylen(a), 0, 0);
splitk->size = kv_pair_keylen(a);
splitk->data = memdup(kv_pair_key(a), splitk->size);
}
splitk->flags = BRT_PIVOT_PRESENT_L;
if (spliti < npairs && __pma_compare_kv(origpma, a, pairs[spliti].pair) == 0) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment