Commit 8ff534e1 authored by Rich Prohaska's avatar Rich Prohaska

use kv-pair's to contain the pivot key and data. closes #37

git-svn-id: file:///svn/tokudb@770 c7de825b-a66e-492c-adef-691d508d4ae1
parent 2a8344c0
...@@ -41,7 +41,7 @@ struct brtnode { ...@@ -41,7 +41,7 @@ struct brtnode {
// Don't actually store the subree fingerprint in the in-memory data structure. // Don't actually store the subree fingerprint in the in-memory data structure.
int n_children; /* if n_children==TREE_FANOUT+1 then the tree needs to be rebalanced. */ int n_children; /* if n_children==TREE_FANOUT+1 then the tree needs to be rebalanced. */
u_int32_t child_subtree_fingerprints[TREE_FANOUT+1]; u_int32_t child_subtree_fingerprints[TREE_FANOUT+1];
bytevec childkeys[TREE_FANOUT]; /* Pivot keys. Child 0's keys are <= childkeys[0]. Child 1's keys are <= childkeys[1]. struct kv_pair *childkeys[TREE_FANOUT]; /* Pivot keys. Child 0's keys are <= childkeys[0]. Child 1's keys are <= childkeys[1].
Note: It is possible that Child 1's keys are == to child 0's key's, so it is Note: It is possible that Child 1's keys are == to child 0's key's, so it is
not necessarily true that child 1's keys are > childkeys[0]. not necessarily true that child 1's keys are > childkeys[0].
However, in the absense of duplicate keys, child 1's keys *are* > childkeys[0]. */ However, in the absense of duplicate keys, child 1's keys *are* > childkeys[0]. */
......
...@@ -28,7 +28,8 @@ void test_serialize(void) { ...@@ -28,7 +28,8 @@ void test_serialize(void) {
sn.rand4fingerprint = randval; sn.rand4fingerprint = randval;
sn.local_fingerprint = 0; sn.local_fingerprint = 0;
sn.u.n.n_children = 2; sn.u.n.n_children = 2;
sn.u.n.childkeys[0] = hello_string = toku_strdup("hello"); hello_string = toku_strdup("hello");
sn.u.n.childkeys[0] = kv_pair_malloc(hello_string, 6, 0, 0);
sn.u.n.childkeylens[0] = 6; sn.u.n.childkeylens[0] = 6;
sn.u.n.totalchildkeylens = 6; sn.u.n.totalchildkeylens = 6;
sn.u.n.pivotflags[0] = 42; sn.u.n.pivotflags[0] = 42;
...@@ -54,7 +55,7 @@ void test_serialize(void) { ...@@ -54,7 +55,7 @@ void test_serialize(void) {
assert(dn->height == 1); assert(dn->height == 1);
assert(dn->rand4fingerprint==randval); assert(dn->rand4fingerprint==randval);
assert(dn->u.n.n_children==2); assert(dn->u.n.n_children==2);
assert(strcmp(dn->u.n.childkeys[0], "hello")==0); assert(strcmp(kv_pair_key(dn->u.n.childkeys[0]), "hello")==0);
assert(dn->u.n.childkeylens[0]==6); assert(dn->u.n.childkeylens[0]==6);
assert(dn->u.n.totalchildkeylens==6); assert(dn->u.n.totalchildkeylens==6);
assert(dn->u.n.pivotflags[0]==42); assert(dn->u.n.pivotflags[0]==42);
...@@ -89,6 +90,7 @@ void test_serialize(void) { ...@@ -89,6 +90,7 @@ void test_serialize(void) {
} }
brtnode_free(&dn); brtnode_free(&dn);
kv_pair_free(sn.u.n.childkeys[0]);
toku_free(hello_string); toku_free(hello_string);
toku_hashtable_free(&sn.u.n.htables[0]); toku_hashtable_free(&sn.u.n.htables[0]);
toku_hashtable_free(&sn.u.n.htables[1]); toku_hashtable_free(&sn.u.n.htables[1]);
......
...@@ -32,6 +32,7 @@ static unsigned int toku_serialize_brtnode_size_slow(BRTNODE node) { ...@@ -32,6 +32,7 @@ static unsigned int toku_serialize_brtnode_size_slow(BRTNODE node) {
size+=4; /* subtree fingerprint. */ size+=4; /* subtree fingerprint. */
for (i=0; i<node->u.n.n_children-1; i++) { for (i=0; i<node->u.n.n_children-1; i++) {
size+=4; size+=4;
if (node->flags & TOKU_DB_DUPSORT) size += 4;
size+=1; /* pivotflags */ size+=1; /* pivotflags */
csize+=node->u.n.childkeylens[i]; csize+=node->u.n.childkeylens[i];
} }
...@@ -71,6 +72,7 @@ unsigned int toku_serialize_brtnode_size (BRTNODE node) { ...@@ -71,6 +72,7 @@ unsigned int toku_serialize_brtnode_size (BRTNODE node) {
result+=4; /* n_children */ result+=4; /* n_children */
result+=4; /* subtree fingerpirnt */ result+=4; /* subtree fingerpirnt */
result+=(4+1)*(node->u.n.n_children-1); /* key lengths + pivotflags*/ result+=(4+1)*(node->u.n.n_children-1); /* key lengths + pivotflags*/
if (node->flags & TOKU_DB_DUPSORT) result += 4*(node->u.n.n_children-1); /* data lengths */
result+=node->u.n.totalchildkeylens; /* the lengths of the pivot keys, without their key lengths. */ result+=node->u.n.totalchildkeylens; /* the lengths of the pivot keys, without their key lengths. */
result+=(8+4+4)*(node->u.n.n_children); /* For each child, a child offset, a count for the number of hash table entries, and the subtree fingerprint. */ result+=(8+4+4)*(node->u.n.n_children); /* For each child, a child offset, a count for the number of hash table entries, and the subtree fingerprint. */
result+=node->u.n.n_bytes_in_hashtables; result+=node->u.n.n_bytes_in_hashtables;
...@@ -129,14 +131,11 @@ void toku_serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node) ...@@ -129,14 +131,11 @@ void toku_serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node)
wbuf_char(&w, node->u.n.pivotflags[i]); wbuf_char(&w, node->u.n.pivotflags[i]);
for (i=0; i<node->u.n.n_children-1; i++) { for (i=0; i<node->u.n.n_children-1; i++) {
if (node->flags & TOKU_DB_DUPSORT) { if (node->flags & TOKU_DB_DUPSORT) {
int keylen = node->u.n.childkeylens[i]; wbuf_bytes(&w, kv_pair_key(node->u.n.childkeys[i]), kv_pair_keylen(node->u.n.childkeys[i]));
int datalen; wbuf_bytes(&w, kv_pair_val(node->u.n.childkeys[i]), kv_pair_vallen(node->u.n.childkeys[i]));
memcpy(&datalen, node->u.n.childkeys[i], sizeof datalen); } else {
keylen = keylen - 4 - datalen; wbuf_bytes(&w, kv_pair_key(node->u.n.childkeys[i]), node->u.n.childkeylens[i]);
wbuf_bytes(&w, node->u.n.childkeys[i] + 4, keylen); }
wbuf_bytes(&w, node->u.n.childkeys[i] + 4 + keylen, datalen);
} else
wbuf_bytes(&w, node->u.n.childkeys[i], node->u.n.childkeylens[i]);
//printf("%s:%d w.ndone=%d (childkeylen[%d]=%d\n", __FILE__, __LINE__, w.ndone, i, node->childkeylens[i]); //printf("%s:%d w.ndone=%d (childkeylen[%d]=%d\n", __FILE__, __LINE__, w.ndone, i, node->childkeylens[i]);
} }
for (i=0; i<node->u.n.n_children; i++) { for (i=0; i<node->u.n.n_children; i++) {
...@@ -297,17 +296,12 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl ...@@ -297,17 +296,12 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl
unsigned int keylen, datalen; unsigned int keylen, datalen;
rbuf_bytes(&rc, &keyptr, &keylen); rbuf_bytes(&rc, &keyptr, &keylen);
rbuf_bytes(&rc, &dataptr, &datalen); rbuf_bytes(&rc, &dataptr, &datalen);
result->u.n.childkeylens[i] = keylen + datalen + 4; result->u.n.childkeylens[i] = keylen + datalen;
void *vp = toku_malloc(result->u.n.childkeylens[i]); result->u.n.childkeys[i] = kv_pair_malloc(keyptr, keylen, dataptr, datalen);
result->u.n.childkeys[i] = vp;
memcpy(vp, &datalen, sizeof datalen);
memcpy(vp + 4, keyptr, keylen);
memcpy(vp + 4 + keylen, dataptr, datalen);
} else { } else {
bytevec childkeyptr; bytevec childkeyptr;
rbuf_bytes(&rc, &childkeyptr, &result->u.n.childkeylens[i]); /* Returns a pointer into the rbuf. */ rbuf_bytes(&rc, &childkeyptr, &result->u.n.childkeylens[i]); /* Returns a pointer into the rbuf. */
result->u.n.childkeys[i] = memdup(childkeyptr, result->u.n.childkeylens[i]); result->u.n.childkeys[i] = kv_pair_malloc((void*)childkeyptr, result->u.n.childkeylens[i], 0, 0);
} }
//printf(" key %d length=%d data=%s\n", i, result->childkeylens[i], result->childkeys[i]); //printf(" key %d length=%d data=%s\n", i, result->childkeylens[i], result->childkeys[i]);
result->u.n.totalchildkeylens+=result->u.n.childkeylens[i]; result->u.n.totalchildkeylens+=result->u.n.childkeylens[i];
......
...@@ -75,14 +75,14 @@ int toku_verify_brtnode (BRT brt, DISKOFF off, bytevec lorange, ITEMLEN lolen, b ...@@ -75,14 +75,14 @@ int toku_verify_brtnode (BRT brt, DISKOFF off, bytevec lorange, ITEMLEN lolen, b
thislorange=lorange; thislorange=lorange;
thislolen =lolen; thislolen =lolen;
} else { } else {
thislorange=node->u.n.childkeys[i-1]; thislorange=kv_pair_key(node->u.n.childkeys[i-1]);
thislolen =node->u.n.childkeylens[i-1]; thislolen =node->u.n.childkeylens[i-1];
} }
if (node->u.n.n_children==0 || i+1>=node->u.n.n_children) { if (node->u.n.n_children==0 || i+1>=node->u.n.n_children) {
thishirange=hirange; thishirange=hirange;
thishilen =hilen; thishilen =hilen;
} else { } else {
thishirange=node->u.n.childkeys[i]; thishirange=kv_pair_key(node->u.n.childkeys[i]);
thishilen =node->u.n.childkeylens[i]; thishilen =node->u.n.childkeylens[i];
} }
{ {
...@@ -102,14 +102,14 @@ int toku_verify_brtnode (BRT brt, DISKOFF off, bytevec lorange, ITEMLEN lolen, b ...@@ -102,14 +102,14 @@ int toku_verify_brtnode (BRT brt, DISKOFF off, bytevec lorange, ITEMLEN lolen, b
} }
for (i=0; i<node->u.n.n_children; i++) { for (i=0; i<node->u.n.n_children; i++) {
if (i>0) { if (i>0) {
if (lorange) assert(toku_keycompare(lorange,lolen, node->u.n.childkeys[i-1], node->u.n.childkeylens[i-1])<0); if (lorange) assert(toku_keycompare(lorange,lolen, kv_pair_key(node->u.n.childkeys[i-1]), node->u.n.childkeylens[i-1])<0);
if (hirange) assert(toku_keycompare(node->u.n.childkeys[i-1], node->u.n.childkeylens[i-1], hirange, hilen)<=0); if (hirange) assert(toku_keycompare(kv_pair_key(node->u.n.childkeys[i-1]), node->u.n.childkeylens[i-1], hirange, hilen)<=0);
} }
if (recurse) { if (recurse) {
result|=toku_verify_brtnode(brt, node->u.n.children[i], result|=toku_verify_brtnode(brt, node->u.n.children[i],
(i==0) ? lorange : node->u.n.childkeys[i-1], (i==0) ? lorange : kv_pair_key(node->u.n.childkeys[i-1]),
(i==0) ? lolen : node->u.n.childkeylens[i-1], (i==0) ? lolen : node->u.n.childkeylens[i-1],
(i==node->u.n.n_children-1) ? hirange : node->u.n.childkeys[i], (i==node->u.n.n_children-1) ? hirange : kv_pair_key(node->u.n.childkeys[i]),
(i==node->u.n.n_children-1) ? hilen : node->u.n.childkeylens[i], (i==node->u.n.n_children-1) ? hilen : node->u.n.childkeylens[i],
recurse, recurse,
node); node);
......
...@@ -119,20 +119,16 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE ...@@ -119,20 +119,16 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE
static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck, unsigned int cl) { static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck, unsigned int cl) {
int cmp; int cmp;
DBT mydbt; DBT mydbt;
struct kv_pair *kv = (struct kv_pair *) ck;
if (brt->flags & TOKU_DB_DUPSORT) { if (brt->flags & TOKU_DB_DUPSORT) {
bytevec k; unsigned int kl; assert(kv_pair_keylen(kv) + kv_pair_vallen(kv) == cl);
bytevec d; unsigned int dl; cmp = brt->compare_fun(brt->db, key, fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv)));
assert(cl >= sizeof dl);
memcpy(&dl, ck, sizeof dl);
assert(cl >= dl - (sizeof dl));
kl = cl - dl - (sizeof dl);
k = ck + (sizeof dl);
d = ck + (sizeof dl) + kl;
cmp = brt->compare_fun(brt->db, key, fill_dbt(&mydbt, k, kl));
if (cmp == 0 && data != 0) if (cmp == 0 && data != 0)
cmp = brt->dup_compare(brt->db, data, fill_dbt(&mydbt, d, dl)); cmp = brt->dup_compare(brt->db, data, fill_dbt(&mydbt, kv_pair_val(kv), kv_pair_vallen(kv)));
} else } else {
cmp = brt->compare_fun(brt->db, key, fill_dbt(&mydbt, ck, cl)); assert(kv_pair_keylen(kv) == cl);
cmp = brt->compare_fun(brt->db, key, fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv)));
}
return cmp; return cmp;
} }
...@@ -694,7 +690,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -694,7 +690,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
node->u.n.childkeylens[cnum] = node->u.n.childkeylens[cnum-1]; node->u.n.childkeylens[cnum] = node->u.n.childkeylens[cnum-1];
node->u.n.pivotflags[cnum] = node->u.n.pivotflags[cnum-1]; node->u.n.pivotflags[cnum] = node->u.n.pivotflags[cnum-1];
} }
node->u.n.childkeys[childnum]= (char*)childsplitk->data; node->u.n.childkeys[childnum]= (void*)childsplitk->data;
node->u.n.childkeylens[childnum]= childsplitk->size; node->u.n.childkeylens[childnum]= childsplitk->size;
node->u.n.pivotflags[childnum] = childsplitk->flags; node->u.n.pivotflags[childnum] = childsplitk->flags;
node->u.n.totalchildkeylens += childsplitk->size; node->u.n.totalchildkeylens += childsplitk->size;
......
...@@ -9,8 +9,8 @@ ...@@ -9,8 +9,8 @@
* key is right after the length fields and the value is right after the key. * key is right after the length fields and the value is right after the key.
*/ */
struct kv_pair { struct kv_pair {
int keylen; unsigned int keylen;
int vallen; unsigned int vallen;
char key[]; char key[];
}; };
...@@ -19,15 +19,14 @@ static inline int kv_pair_size(struct kv_pair *pair) { ...@@ -19,15 +19,14 @@ static inline int kv_pair_size(struct kv_pair *pair) {
return sizeof (struct kv_pair) + pair->keylen + pair->vallen; return sizeof (struct kv_pair) + pair->keylen + pair->vallen;
} }
static inline void kv_pair_init(struct kv_pair *pair, void *key, int keylen, void *val, int vallen) { static inline void kv_pair_init(struct kv_pair *pair, const void *key, unsigned int keylen, const void *val, unsigned int vallen) {
pair->keylen = keylen; pair->keylen = keylen;
memcpy(pair->key, key, keylen); memcpy(pair->key, key, keylen);
pair->vallen = vallen; pair->vallen = vallen;
memcpy(pair->key + keylen, val, vallen); memcpy(pair->key + keylen, val, vallen);
} }
static inline struct kv_pair *kv_pair_malloc(void *key, int keylen, void *val, int vallen) { static inline struct kv_pair *kv_pair_malloc(const void *key, unsigned int keylen, const void *val, unsigned int vallen) {
struct kv_pair *pair = toku_malloc(sizeof (struct kv_pair) + keylen + vallen); struct kv_pair *pair = toku_malloc(sizeof (struct kv_pair) + keylen + vallen);
if (pair) if (pair)
kv_pair_init(pair, key, keylen, val, vallen); kv_pair_init(pair, key, keylen, val, vallen);
...@@ -35,7 +34,7 @@ static inline struct kv_pair *kv_pair_malloc(void *key, int keylen, void *val, i ...@@ -35,7 +34,7 @@ static inline struct kv_pair *kv_pair_malloc(void *key, int keylen, void *val, i
} }
/* replace the val, keep the same key */ /* replace the val, keep the same key */
static inline struct kv_pair *kv_pair_realloc_same_key(struct kv_pair *p, void *newval, int newvallen) { static inline struct kv_pair *kv_pair_realloc_same_key(struct kv_pair *p, void *newval, unsigned int newvallen) {
struct kv_pair *pair = toku_realloc(p, sizeof (struct kv_pair) + p->keylen + newvallen); struct kv_pair *pair = toku_realloc(p, sizeof (struct kv_pair) + p->keylen + newvallen);
if (pair) { if (pair) {
pair->vallen = newvallen; pair->vallen = newvallen;
...@@ -51,50 +50,25 @@ static inline void kv_pair_free(struct kv_pair *pair) { ...@@ -51,50 +50,25 @@ static inline void kv_pair_free(struct kv_pair *pair) {
static inline void *kv_pair_key(struct kv_pair *pair) { static inline void *kv_pair_key(struct kv_pair *pair) {
return pair->key; return pair->key;
} }
static inline const void *kv_pair_key_const(const struct kv_pair *pair) { static inline const void *kv_pair_key_const(const struct kv_pair *pair) {
return pair->key; return pair->key;
} }
static inline unsigned int kv_pair_keylen(struct kv_pair *pair) {
static inline int kv_pair_keylen(struct kv_pair *pair) {
return pair->keylen; return pair->keylen;
} }
static inline void *kv_pair_val(struct kv_pair *pair) { static inline void *kv_pair_val(struct kv_pair *pair) {
return pair->key + pair->keylen; return pair->key + pair->keylen;
} }
static inline const void *kv_pair_val_const(const struct kv_pair *pair) { static inline const void *kv_pair_val_const(const struct kv_pair *pair) {
return pair->key + pair->keylen; return pair->key + pair->keylen;
} }
static inline int kv_pair_vallen(struct kv_pair *pair) { static inline unsigned int kv_pair_vallen(struct kv_pair *pair) {
return pair->vallen; return pair->vallen;
} }
/* use the low bit to indicate an inuse pair that is deleted */
static inline int kv_pair_inuse(struct kv_pair *pair) {
return pair != 0;
}
static inline int kv_pair_deleted(struct kv_pair *pair) {
return ((long) pair & 1) != 0;
}
static inline int kv_pair_valid(struct kv_pair *pair) {
return kv_pair_inuse(pair) && !kv_pair_deleted(pair);
}
static inline struct kv_pair *kv_pair_set_deleted(struct kv_pair *pair) {
return (struct kv_pair *) ((long) pair | 1);
}
static inline struct kv_pair *kv_pair_ptr(struct kv_pair *pair) {
return (struct kv_pair *) ((long) pair & ~1);
}
struct kv_pair_tag {
struct kv_pair *pair;
int oldtag, newtag;
};
#endif #endif
...@@ -69,6 +69,32 @@ static int __pma_compare_kv(PMA pma, struct kv_pair *a, struct kv_pair *b); ...@@ -69,6 +69,32 @@ static int __pma_compare_kv(PMA pma, struct kv_pair *a, struct kv_pair *b);
/**************************** end of static functions forward declarations. *********************/ /**************************** end of static functions forward declarations. *********************/
/* use the low bit in the kv_pair pointer to indicate an inuse pair that is deleted */
static inline int kv_pair_inuse(struct kv_pair *pair) {
return pair != 0;
}
static inline int kv_pair_deleted(struct kv_pair *pair) {
return ((long) pair & 1) != 0;
}
static inline int kv_pair_valid(struct kv_pair *pair) {
return kv_pair_inuse(pair) && !kv_pair_deleted(pair);
}
static inline struct kv_pair *kv_pair_set_deleted(struct kv_pair *pair) {
return (struct kv_pair *) ((long) pair | 1);
}
static inline struct kv_pair *kv_pair_ptr(struct kv_pair *pair) {
return (struct kv_pair *) ((long) pair & ~1);
}
struct kv_pair_tag {
struct kv_pair *pair;
int oldtag, newtag;
};
#ifndef PMA_USE_MEMPOOL #ifndef PMA_USE_MEMPOOL
#define PMA_USE_MEMPOOL 1 #define PMA_USE_MEMPOOL 1
...@@ -1394,16 +1420,11 @@ int toku_pma_split(PMA origpma, unsigned int *origpma_size, DBT *splitk, ...@@ -1394,16 +1420,11 @@ int toku_pma_split(PMA origpma, unsigned int *origpma_size, DBT *splitk,
if (splitk) { if (splitk) {
struct kv_pair *a = pairs[spliti-1].pair; struct kv_pair *a = pairs[spliti-1].pair;
if (origpma->dup_mode & TOKU_DB_DUPSORT) { if (origpma->dup_mode & TOKU_DB_DUPSORT) {
int kl = kv_pair_keylen(a); splitk->data = kv_pair_malloc(kv_pair_key(a), kv_pair_keylen(a), kv_pair_val(a), kv_pair_vallen(a));
int vl = kv_pair_vallen(a); splitk->size = kv_pair_keylen(a) + kv_pair_vallen(a);
splitk->size = (sizeof vl) + kl + vl;
splitk->data = toku_malloc(splitk->size);
memcpy(splitk->data, &vl, sizeof vl);
memcpy(splitk->data + (sizeof vl), kv_pair_key(a), kl);
memcpy(splitk->data + (sizeof vl) + kl, kv_pair_val(a), vl);
} else { } else {
splitk->data = kv_pair_malloc(kv_pair_key(a), kv_pair_keylen(a), 0, 0);
splitk->size = kv_pair_keylen(a); splitk->size = kv_pair_keylen(a);
splitk->data = memdup(kv_pair_key(a), splitk->size);
} }
splitk->flags = BRT_PIVOT_PRESENT_L; splitk->flags = BRT_PIVOT_PRESENT_L;
if (spliti < npairs && __pma_compare_kv(origpma, a, pairs[spliti].pair) == 0) { if (spliti < npairs && __pma_compare_kv(origpma, a, pairs[spliti].pair) == 0) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment