Commit dfd26d86 authored by Rich Prohaska's avatar Rich Prohaska

serialize pivotflags

git-svn-id: file:///svn/tokudb@608 c7de825b-a66e-492c-adef-691d508d4ae1
parent db85f45b
......@@ -44,6 +44,7 @@ struct brtnode {
However, in the absense of duplicate keys, child 1's keys *are* > childkeys[0]. */
unsigned int childkeylens[TREE_FANOUT];
unsigned int totalchildkeylens;
unsigned char pivotflags[TREE_FANOUT];
DISKOFF children[TREE_FANOUT+1]; /* unused if height==0 */ /* Note: The last element of these arrays is used only temporarily while splitting a node. */
HASHTABLE htables[TREE_FANOUT+1];
unsigned int n_bytes_in_hashtable[TREE_FANOUT+1]; /* how many bytes are in each hashtable (including overheads) */
......@@ -95,7 +96,7 @@ struct brt {
/* serialization code */
void serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node);
int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int nodesize, int (*)(DB *, const DBT*, const DBT*));
int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int flags, int nodesize, int (*bt_compare)(DB *, const DBT*, const DBT*), int (*dup_compare)(DB *, const DBT *, const DBT *));
unsigned int serialize_brtnode_size(BRTNODE node); /* How much space will it take? */
int keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
......
......@@ -29,6 +29,7 @@ void test_serialize(void) {
sn.u.n.childkeys[0] = hello_string = toku_strdup("hello");
sn.u.n.childkeylens[0] = 6;
sn.u.n.totalchildkeylens = 6;
sn.u.n.pivotflags[0] = 42;
sn.u.n.children[0] = sn.nodesize*30;
sn.u.n.children[1] = sn.nodesize*35;
sn.u.n.child_subtree_fingerprints[0] = random();
......@@ -42,7 +43,7 @@ void test_serialize(void) {
serialize_brtnode_to(fd, sn.nodesize*20, sn.nodesize, &sn); assert(r==0);
r = deserialize_brtnode_from(fd, nodesize*20, &dn, nodesize, 0);
r = deserialize_brtnode_from(fd, nodesize*20, &dn, 0, nodesize, 0, 0);
assert(r==0);
assert(dn->thisnodename==nodesize*20);
......@@ -54,6 +55,7 @@ void test_serialize(void) {
assert(strcmp(dn->u.n.childkeys[0], "hello")==0);
assert(dn->u.n.childkeylens[0]==6);
assert(dn->u.n.totalchildkeylens==6);
assert(dn->u.n.pivotflags[0]==42);
assert(dn->u.n.children[0]==nodesize*30);
assert(dn->u.n.children[1]==nodesize*35);
{
......
......@@ -31,6 +31,7 @@ static unsigned int serialize_brtnode_size_slow(BRTNODE node) {
size+=4; /* subtree fingerprint. */
for (i=0; i<node->u.n.n_children-1; i++) {
size+=4;
size+=1; /* pivotflags */
csize+=node->u.n.childkeylens[i];
}
for (i=0; i<node->u.n.n_children; i++) {
......@@ -68,7 +69,7 @@ unsigned int serialize_brtnode_size (BRTNODE node) {
if (node->height>0) {
result+=4; /* n_children */
result+=4; /* subtree fingerpirnt */
result+=4*(node->u.n.n_children-1); /* key lengths */
result+=(4+1)*(node->u.n.n_children-1); /* key lengths + pivotflags*/
result+=node->u.n.totalchildkeylens; /* the lengths of the pivot keys, without their key lengths. */
result+=(8+4+4)*(node->u.n.n_children); /* For each child, a child offset, a count for the number of hash table entries, and the subtree fingerprint. */
result+=node->u.n.n_bytes_in_hashtables;
......@@ -122,6 +123,8 @@ void serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node) {
wbuf_int(&w, node->u.n.child_subtree_fingerprints[i]);
}
//printf("%s:%d w.ndone=%d\n", __FILE__, __LINE__, w.ndone);
for (i=0; i<node->u.n.n_children-1; i++)
wbuf_char(&w, node->u.n.pivotflags[i]);
for (i=0; i<node->u.n.n_children-1; i++) {
wbuf_bytes(&w, node->u.n.childkeys[i], node->u.n.childkeylens[i]);
//printf("%s:%d w.ndone=%d (childkeylen[%d]=%d\n", __FILE__, __LINE__, w.ndone, i, node->childkeylens[i]);
......@@ -178,8 +181,9 @@ void serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node) {
toku_free(buf);
}
int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int nodesize,
int (*compare_fun)(DB *, const DBT *, const DBT *)) {
int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int flags, int nodesize,
int (*bt_compare)(DB *, const DBT *, const DBT *),
int (*dup_compare)(DB *, const DBT *, const DBT *)) {
TAGMALLOC(BRTNODE, result);
struct rbuf rc;
int i;
......@@ -272,6 +276,8 @@ int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int nodesiz
result->u.n.child_subtree_fingerprints[i]= childfp;
check_subtree_fingerprint += childfp;
}
for (i=0; i<result->u.n.n_children-1; i++)
result->u.n.pivotflags[i] = rbuf_char(&rc);
for (i=0; i<result->u.n.n_children-1; i++) {
bytevec childkeyptr;
rbuf_bytes(&rc, &childkeyptr, &result->u.n.childkeylens[i]); /* Returns a pointer into the rbuf. */
......@@ -335,11 +341,13 @@ int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int nodesiz
} else {
int n_in_buf = rbuf_int(&rc);
result->u.l.n_bytes_in_buffer = 0;
r=pma_create(&result->u.l.buffer, compare_fun, nodesize);
r=pma_create(&result->u.l.buffer, bt_compare, nodesize);
if (r!=0) {
if (0) { died_21: pma_free(&result->u.l.buffer); }
goto died1;
}
pma_set_dup_mode(result->u.l.buffer, flags);
if (flags & DB_DUPSORT) pma_set_dup_compare(result->u.l.buffer, dup_compare);
//printf("%s:%d r PMA= %p\n", __FILE__, __LINE__, result->u.l.buffer);
#define BRT_USE_PMA_BULK_INSERT 1
#if BRT_USE_PMA_BULK_INSERT
......
......@@ -176,7 +176,8 @@ void brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *brtnod
int brtnode_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **brtnode_pv, long *sizep, void*extraargs, LSN *written_lsn) {
BRT t =(BRT)extraargs;
BRTNODE *result=(BRTNODE*)brtnode_pv;
int r = deserialize_brtnode_from(cachefile_fd(cachefile), nodename, result, t->nodesize, t->compare_fun);
int r = deserialize_brtnode_from(cachefile_fd(cachefile), nodename, result, t->flags, t->nodesize,
t->compare_fun, t->dup_compare);
if (r == 0)
*sizep = brtnode_size(*result);
*written_lsn = (*result)->lsn;
......@@ -449,6 +450,7 @@ void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT
for (i=0; i<n_children_in_a-1; i++) {
A->u.n.childkeys[i] = node->u.n.childkeys[i];
A->u.n.childkeylens[i] = node->u.n.childkeylens[i];
A->u.n.pivotflags[i] = node->u.n.pivotflags[i];
A->u.n.totalchildkeylens += node->u.n.childkeylens[i];
node->u.n.totalchildkeylens -= node->u.n.childkeylens[i];
node->u.n.childkeys[i] = 0;
......@@ -456,12 +458,14 @@ void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT
}
splitk->data = (void*)(node->u.n.childkeys[n_children_in_a-1]);
splitk->size = node->u.n.childkeylens[n_children_in_a-1];
splitk->flags = node->u.n.pivotflags[n_children_in_a-1];
node->u.n.totalchildkeylens -= node->u.n.childkeylens[n_children_in_a-1];
node->u.n.childkeys[n_children_in_a-1]=0;
node->u.n.childkeylens[n_children_in_a-1]=0;
for (i=n_children_in_a; i<node->u.n.n_children-1; i++) {
B->u.n.childkeys[i-n_children_in_a] = node->u.n.childkeys[i];
B->u.n.childkeylens[i-n_children_in_a] = node->u.n.childkeylens[i];
B->u.n.pivotflags[i-n_children_in_a] = node->u.n.pivotflags[i];
B->u.n.totalchildkeylens += node->u.n.childkeylens[i];
node->u.n.totalchildkeylens -= node->u.n.childkeylens[i];
node->u.n.childkeys[i] = 0;
......@@ -661,9 +665,11 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
for (cnum=node->u.n.n_children-1; cnum>childnum; cnum--) {
node->u.n.childkeys[cnum] = node->u.n.childkeys[cnum-1];
node->u.n.childkeylens[cnum] = node->u.n.childkeylens[cnum-1];
node->u.n.pivotflags[cnum] = node->u.n.pivotflags[cnum-1];
}
node->u.n.childkeys[childnum]= (char*)childsplitk->data;
node->u.n.childkeylens[childnum]= childsplitk->size;
node->u.n.pivotflags[childnum] = childsplitk->flags;
node->u.n.totalchildkeylens += childsplitk->size;
node->u.n.n_children++;
......@@ -1452,6 +1458,7 @@ int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKE
//printf("%s:%d Splitkey=%p %s\n", __FILE__, __LINE__, splitkey, splitkey);
newroot->u.n.childkeys[0] = splitk.data;
newroot->u.n.childkeylens[0] = splitk.size;
newroot->u.n.pivotflags[0] = splitk.flags;
newroot->u.n.totalchildkeylens=splitk.size;
newroot->u.n.children[0]=nodea->thisnodename;
newroot->u.n.children[1]=nodeb->thisnodename;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment