Commit ea9cd982 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Put the parent pointers into every brtnode

git-svn-id: file:///svn/tokudb@388 c7de825b-a66e-492c-adef-691d508d4ae1
parent bbb3a7c5
...@@ -24,6 +24,7 @@ struct brtnode { ...@@ -24,6 +24,7 @@ struct brtnode {
enum typ_tag tag; enum typ_tag tag;
unsigned int nodesize; unsigned int nodesize;
diskoff thisnodename; diskoff thisnodename;
BRTNODE parent_brtnode; /* Invariant: The parent of an in-memory node must be in main memory. This is so we can find and update the down pointer when we change the diskoff of a node. */
int height; /* height is always >= 0. 0 for leaf, >0 for nonleaf. */ int height; /* height is always >= 0. 0 for leaf, >0 for nonleaf. */
int dirty; int dirty;
union node { union node {
......
...@@ -35,6 +35,8 @@ ...@@ -35,6 +35,8 @@
#include <stdio.h> #include <stdio.h>
#include <errno.h> #include <errno.h>
const BRTNODE null_brtnode=0;
extern long long n_items_malloced; extern long long n_items_malloced;
/* Frees a node, including all the stuff in the hash table. */ /* Frees a node, including all the stuff in the hash table. */
...@@ -69,6 +71,37 @@ long brtnode_size(BRTNODE node) { ...@@ -69,6 +71,37 @@ long brtnode_size(BRTNODE node) {
return size; return size;
} }
void fix_up_parent_pointers_of_children (BRT t, BRTNODE node) {
int i;
assert(node->height>0);
for (i=0; i<node->u.n.n_children; i++) {
void *v;
int r = cachetable_maybe_get_and_pin(t->cf, node->u.n.children[i], &v);
if (r==0) {
BRTNODE child = v;
//printf("%s:%d pin %p\n", __FILE__, __LINE__, v);
child->parent_brtnode = node;
r=cachetable_unpin(t->cf, node->u.n.children[i], 0);
}
}
}
void fix_up_parent_pointers_of_children_now_that_parent_is_gone (CACHEFILE cf, BRTNODE node) {
int i;
if (node->height==0) return;
for (i=0; i<node->u.n.n_children; i++) {
void *v;
int r = cachetable_maybe_get_and_pin(cf, node->u.n.children[i], &v);
if (r==0) {
BRTNODE child = v;
//printf("%s:%d pin %p\n", __FILE__, __LINE__, v);
child->parent_brtnode = 0;
r=cachetable_unpin(cf, node->u.n.children[i], 0);
}
}
}
void brtnode_flush_callback (CACHEFILE cachefile, diskoff nodename, void *brtnode_v, long size __attribute((unused)), int write_me, int keep_me) { void brtnode_flush_callback (CACHEFILE cachefile, diskoff nodename, void *brtnode_v, long size __attribute((unused)), int write_me, int keep_me) {
BRTNODE brtnode = brtnode_v; BRTNODE brtnode = brtnode_v;
if (0) { if (0) {
...@@ -76,7 +109,28 @@ void brtnode_flush_callback (CACHEFILE cachefile, diskoff nodename, void *brtnod ...@@ -76,7 +109,28 @@ void brtnode_flush_callback (CACHEFILE cachefile, diskoff nodename, void *brtnod
if (brtnode->height==0) printf(" pma=%p", brtnode->u.l.buffer); if (brtnode->height==0) printf(" pma=%p", brtnode->u.l.buffer);
printf("\n"); printf("\n");
} }
fix_up_parent_pointers_of_children_now_that_parent_is_gone(cachefile, brtnode);
assert(brtnode->thisnodename==nodename); assert(brtnode->thisnodename==nodename);
{
BRTNODE parent = brtnode->parent_brtnode;
//printf("%s:%d Looking at %p (offset=%lld) tag=%d parent=%p height=%d\n", __FILE__, __LINE__, brtnode, nodename, brtnode->tag, parent, brtnode->height);
if (parent!=0) {
/* make sure we are one of the children of the parent. */
int i;
//int pheight=0;//parent->height;
//int nc = 0;//parent->u.n.n_children;
//printf("%s:%d parent height=%d has %d children: The first few are", __FILE__, __LINE__, pheight, nc);
assert(parent->u.n.n_children<=TREE_FANOUT+1);
for (i=0; i<parent->u.n.n_children; i++) {
//printf(" %lld\n", parent->u.n.children[i]);
if (parent->u.n.children[i]==nodename) goto ok;
}
printf("%s:%d Whoops, the parent of %p (%p) isn't right\n", __FILE__, __LINE__, brtnode, parent);
assert(0);
ok: ;
//printf("\n");
}
}
//printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]); //printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]);
if (write_me) { if (write_me) {
serialize_brtnode_to(cachefile_fd(cachefile), brtnode->thisnodename, brtnode->nodesize, brtnode); serialize_brtnode_to(cachefile_fd(cachefile), brtnode->thisnodename, brtnode->nodesize, brtnode);
...@@ -94,6 +148,8 @@ int brtnode_fetch_callback (CACHEFILE cachefile, diskoff nodename, void **brtnod ...@@ -94,6 +148,8 @@ int brtnode_fetch_callback (CACHEFILE cachefile, diskoff nodename, void **brtnod
int r = deserialize_brtnode_from(cachefile_fd(cachefile), nodename, result, nodesize); int r = deserialize_brtnode_from(cachefile_fd(cachefile), nodename, result, nodesize);
if (r == 0) if (r == 0)
*sizep = brtnode_size(*result); *sizep = brtnode_size(*result);
//(*result)->parent_brtnode = 0; /* Don't know it right now. */
//printf("%s:%d installed %p (offset=%lld)\n", __FILE__, __LINE__, *result, nodename);
return r; return r;
} }
...@@ -211,7 +267,7 @@ static void initialize_brtnode (BRT t, BRTNODE n, diskoff nodename, int height) ...@@ -211,7 +267,7 @@ static void initialize_brtnode (BRT t, BRTNODE n, diskoff nodename, int height)
} }
} }
static void create_new_brtnode (BRT t, BRTNODE *result, int height) { static void create_new_brtnode (BRT t, BRTNODE *result, int height, BRTNODE parent_brtnode) {
TAGMALLOC(BRTNODE, n); TAGMALLOC(BRTNODE, n);
int r; int r;
diskoff name = malloc_diskblock(t, t->h->nodesize); diskoff name = malloc_diskblock(t, t->h->nodesize);
...@@ -221,8 +277,10 @@ static void create_new_brtnode (BRT t, BRTNODE *result, int height) { ...@@ -221,8 +277,10 @@ static void create_new_brtnode (BRT t, BRTNODE *result, int height) {
initialize_brtnode(t, n, name, height); initialize_brtnode(t, n, name, height);
*result = n; *result = n;
assert(n->nodesize>0); assert(n->nodesize>0);
n->parent_brtnode = parent_brtnode;
//printf("%s:%d putting %p (%lld) parent=%p\n", __FILE__, __LINE__, n, n->thisnodename, parent_brtnode);
r=cachetable_put_size(t->cf, n->thisnodename, n, brtnode_size(n), r=cachetable_put_size(t->cf, n->thisnodename, n, brtnode_size(n),
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)t->h->nodesize); brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)t->h->nodesize);
assert(r==0); assert(r==0);
} }
...@@ -276,8 +334,8 @@ int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl ...@@ -276,8 +334,8 @@ int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
BRTNODE A,B; BRTNODE A,B;
assert(node->height==0); assert(node->height==0);
assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */ assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
create_new_brtnode(t, &A, 0); create_new_brtnode(t, &A, 0, node->parent_brtnode);
create_new_brtnode(t, &B, 0); create_new_brtnode(t, &B, 0, node->parent_brtnode);
//printf("%s:%d A PMA= %p\n", __FILE__, __LINE__, A->u.l.buffer); //printf("%s:%d A PMA= %p\n", __FILE__, __LINE__, A->u.l.buffer);
//printf("%s:%d B PMA= %p\n", __FILE__, __LINE__, A->u.l.buffer); //printf("%s:%d B PMA= %p\n", __FILE__, __LINE__, A->u.l.buffer);
assert(A->nodesize>0); assert(A->nodesize>0);
...@@ -341,10 +399,11 @@ void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT ...@@ -341,10 +399,11 @@ void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT
assert(node->height>0); assert(node->height>0);
assert(node->u.n.n_children>=2); // Otherwise, how do we split? We need at least two children to split. */ assert(node->u.n.n_children>=2); // Otherwise, how do we split? We need at least two children to split. */
assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */ assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
create_new_brtnode(t, &A, node->height); create_new_brtnode(t, &A, node->height, node->parent_brtnode);
create_new_brtnode(t, &B, node->height); create_new_brtnode(t, &B, node->height, node->parent_brtnode);
A->u.n.n_children=n_children_in_a; A->u.n.n_children=n_children_in_a;
B->u.n.n_children=node->u.n.n_children-n_children_in_a; B->u.n.n_children=node->u.n.n_children-n_children_in_a;
//printf("%s:%d %p (%lld) becomes %p and %p\n", __FILE__, __LINE__, node, node->thisnodename, A, B);
//printf("%s:%d A is at %lld\n", __FILE__, __LINE__, A->thisnodename); //printf("%s:%d A is at %lld\n", __FILE__, __LINE__, A->thisnodename);
{ {
/* The first n_children_in_a go into node a. /* The first n_children_in_a go into node a.
...@@ -392,6 +451,10 @@ void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT ...@@ -392,6 +451,10 @@ void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT
node->u.n.childkeylens[i] = 0; node->u.n.childkeylens[i] = 0;
} }
assert(node->u.n.totalchildkeylens==0); assert(node->u.n.totalchildkeylens==0);
fix_up_parent_pointers_of_children(t, A);
fix_up_parent_pointers_of_children(t, B);
} }
{ {
...@@ -644,7 +707,9 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum, ...@@ -644,7 +707,9 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
r = cachetable_get_and_pin(t->cf, targetchild, &childnode_v, r = cachetable_get_and_pin(t->cf, targetchild, &childnode_v,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)t->h->nodesize); brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)t->h->nodesize);
if (r!=0) return r; if (r!=0) return r;
//printf("%s:%d pin %p\n", __FILE__, __LINE__, childnode_v);
child=childnode_v; child=childnode_v;
child->parent_brtnode = node;
verify_counts(child); verify_counts(child);
//printf("%s:%d height=%d n_bytes_in_hashtable = {%d, %d, %d, ...}\n", __FILE__, __LINE__, child->height, child->n_bytes_in_hashtable[0], child->n_bytes_in_hashtable[1], child->n_bytes_in_hashtable[2]); //printf("%s:%d height=%d n_bytes_in_hashtable = {%d, %d, %d, ...}\n", __FILE__, __LINE__, child->height, child->n_bytes_in_hashtable[0], child->n_bytes_in_hashtable[1], child->n_bytes_in_hashtable[2]);
if (child->height>0 && child->u.n.n_children>0) assert(child->u.n.children[child->u.n.n_children-1]!=0); if (child->height>0 && child->u.n.n_children>0) assert(child->u.n.children[child->u.n.n_children-1]!=0);
...@@ -865,6 +930,7 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -865,6 +930,7 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd,
return r; return r;
child = child_v; child = child_v;
child->parent_brtnode = node;
child_did_split = 0; child_did_split = 0;
r = brtnode_put_cmd(t, child, cmd, r = brtnode_put_cmd(t, child, cmd,
...@@ -921,42 +987,6 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd, ...@@ -921,42 +987,6 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
found = !toku_hash_find(node->u.n.htables[childnum], k->data, k->size, &olddata, &olddatalen, &type); found = !toku_hash_find(node->u.n.htables[childnum], k->data, k->size, &olddata, &olddatalen, &type);
if (0) { // It is faster to do this, except on yobiduck where things grind to a halt.
void *child_v;
if (node->height>0 &&
0 == cachetable_maybe_get_and_pin(t->cf, node->u.n.children[childnum], &child_v)) {
/* If the child is in memory, then go ahead and put it in the child. */
BRTNODE child = child_v;
if (found) {
int diff = k->size + olddatalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
int r = toku_hash_delete(node->u.n.htables[childnum], k->data, k->size);
assert(r==0);
node->u.n.n_bytes_in_hashtables -= diff;
node->u.n.n_bytes_in_hashtable[childnum] -= diff;
brtnode_set_dirty(node);
}
{
int child_did_split;
BRTNODE childa, childb;
DBT childsplitk;
int r = brtnode_put_cmd(t, child, cmd,
&child_did_split, &childa, &childb, &childsplitk, 0, txn);
if (r!=0) return r;
if (child_did_split) {
r=handle_split_of_child(t, node, childnum,
childa, childb, &childsplitk,
did_split, nodea, nodeb, splitk,
k->app_private, db, txn);
if (r!=0) return r;
} else {
cachetable_unpin_size(t->cf, child->thisnodename, child->dirty, brtnode_size(child));
*did_split = 0;
}
}
return 0;
}
}
if (debug) printf("%s:%d %*sDoing hash_insert\n", __FILE__, __LINE__, debug, ""); if (debug) printf("%s:%d %*sDoing hash_insert\n", __FILE__, __LINE__, debug, "");
verify_counts(node); verify_counts(node);
if (found) { if (found) {
...@@ -1041,12 +1071,14 @@ static int setup_brt_root_node (BRT t, diskoff offset) { ...@@ -1041,12 +1071,14 @@ static int setup_brt_root_node (BRT t, diskoff offset) {
initialize_brtnode(t, node, initialize_brtnode(t, node,
offset, /* the location is one nodesize offset from 0. */ offset, /* the location is one nodesize offset from 0. */
0); 0);
node->parent_brtnode=0;
if (0) { if (0) {
printf("%s:%d for tree %p node %p mdict_create--> %p\n", __FILE__, __LINE__, t, node, node->u.l.buffer); printf("%s:%d for tree %p node %p mdict_create--> %p\n", __FILE__, __LINE__, t, node, node->u.l.buffer);
printf("%s:%d put root at %lld\n", __FILE__, __LINE__, offset); printf("%s:%d put root at %lld\n", __FILE__, __LINE__, offset);
} }
//printf("%s:%d putting %p (%lld)\n", __FILE__, __LINE__, node, node->thisnodename);
r=cachetable_put_size(t->cf, offset, node, brtnode_size(node), r=cachetable_put_size(t->cf, offset, node, brtnode_size(node),
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)t->h->nodesize); brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)t->h->nodesize);
if (r!=0) { if (r!=0) {
toku_free(node); toku_free(node);
return r; return r;
...@@ -1217,6 +1249,7 @@ int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKE ...@@ -1217,6 +1249,7 @@ int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKE
brt->h->dirty=1; brt->h->dirty=1;
// printf("new_root %lld\n", newroot_diskoff); // printf("new_root %lld\n", newroot_diskoff);
initialize_brtnode (brt, newroot, newroot_diskoff, nodea->height+1); initialize_brtnode (brt, newroot, newroot_diskoff, nodea->height+1);
newroot->parent_brtnode=0;
newroot->u.n.n_children=2; newroot->u.n.n_children=2;
//printf("%s:%d Splitkey=%p %s\n", __FILE__, __LINE__, splitkey, splitkey); //printf("%s:%d Splitkey=%p %s\n", __FILE__, __LINE__, splitkey, splitkey);
newroot->u.n.childkeys[0] = splitk.data; newroot->u.n.childkeys[0] = splitk.data;
...@@ -1258,7 +1291,9 @@ int brt_root_put_cmd(BRT brt, BRT_CMD *cmd, TOKUTXN txn) { ...@@ -1258,7 +1291,9 @@ int brt_root_put_cmd(BRT brt, BRT_CMD *cmd, TOKUTXN txn) {
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize))) { brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize))) {
goto died0; goto died0;
} }
//printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v; node=node_v;
node->parent_brtnode = 0;
if (debug) printf("%s:%d node inserting\n", __FILE__, __LINE__); if (debug) printf("%s:%d node inserting\n", __FILE__, __LINE__);
did_split = 0; did_split = 0;
result = brtnode_put_cmd(brt, node, cmd, result = brtnode_put_cmd(brt, node, cmd,
...@@ -1301,7 +1336,7 @@ int brt_insert (BRT brt, DBT *key, DBT *val, DB* db, TOKUTXN txn) { ...@@ -1301,7 +1336,7 @@ int brt_insert (BRT brt, DBT *key, DBT *val, DB* db, TOKUTXN txn) {
return r; return r;
} }
int brt_lookup_node (BRT brt, diskoff off, DBT *k, DBT *v, DB *db) { int brt_lookup_node (BRT brt, diskoff off, DBT *k, DBT *v, DB *db, BRTNODE parent_brtnode) {
int result; int result;
void *node_v; void *node_v;
int r = cachetable_get_and_pin(brt->cf, off, &node_v, int r = cachetable_get_and_pin(brt->cf, off, &node_v,
...@@ -1312,6 +1347,10 @@ int brt_lookup_node (BRT brt, diskoff off, DBT *k, DBT *v, DB *db) { ...@@ -1312,6 +1347,10 @@ int brt_lookup_node (BRT brt, diskoff off, DBT *k, DBT *v, DB *db) {
BRTNODE node = node_v; BRTNODE node = node_v;
int childnum; int childnum;
//printf("%s:%d pin %p height=%d children=%d\n", __FILE__, __LINE__, node_v, node->height, node->u.n.n_children);
node->parent_brtnode = parent_brtnode;
if (node->height==0) { if (node->height==0) {
result = pma_lookup(node->u.l.buffer, k, v, db); result = pma_lookup(node->u.l.buffer, k, v, db);
//printf("%s:%d looked up something, got answerlen=%d\n", __FILE__, __LINE__, answerlen); //printf("%s:%d looked up something, got answerlen=%d\n", __FILE__, __LINE__, answerlen);
...@@ -1343,7 +1382,7 @@ int brt_lookup_node (BRT brt, diskoff off, DBT *k, DBT *v, DB *db) { ...@@ -1343,7 +1382,7 @@ int brt_lookup_node (BRT brt, diskoff off, DBT *k, DBT *v, DB *db) {
} }
} }
result = brt_lookup_node(brt, node->u.n.children[childnum], k, v, db); result = brt_lookup_node(brt, node->u.n.children[childnum], k, v, db, node);
r = cachetable_unpin_size(brt->cf, off, 0, 0); r = cachetable_unpin_size(brt->cf, off, 0, 0);
assert(r == 0); assert(r == 0);
return result; return result;
...@@ -1362,7 +1401,7 @@ int brt_lookup (BRT brt, DBT *k, DBT *v, DB *db) { ...@@ -1362,7 +1401,7 @@ int brt_lookup (BRT brt, DBT *k, DBT *v, DB *db) {
return r; return r;
} }
rootp = calculate_root_offset_pointer(brt); rootp = calculate_root_offset_pointer(brt);
if ((r = brt_lookup_node(brt, *rootp, k, v, db))) { if ((r = brt_lookup_node(brt, *rootp, k, v, db, 0))) {
// printf("%s:%d\n", __FILE__, __LINE__); // printf("%s:%d\n", __FILE__, __LINE__);
goto died0; goto died0;
} }
...@@ -1387,17 +1426,19 @@ int brt_delete(BRT brt, DBT *key, DB *db) { ...@@ -1387,17 +1426,19 @@ int brt_delete(BRT brt, DBT *key, DB *db) {
return r; return r;
} }
int verify_brtnode (BRT brt, diskoff off, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, int recurse); int verify_brtnode (BRT brt, diskoff off, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, int recurse, BRTNODE parent_brtnode);
int dump_brtnode (BRT brt, diskoff off, int depth, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen) { int dump_brtnode (BRT brt, diskoff off, int depth, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, BRTNODE parent_brtnode) {
int result=0; int result=0;
BRTNODE node; BRTNODE node;
void *node_v; void *node_v;
int r = cachetable_get_and_pin(brt->cf, off, &node_v, int r = cachetable_get_and_pin(brt->cf, off, &node_v,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize); brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize);
assert(r==0); assert(r==0);
printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v; node=node_v;
result=verify_brtnode(brt, off, lorange, lolen, hirange, hilen, 0); node->parent_brtnode = parent_brtnode;
result=verify_brtnode(brt, off, lorange, lolen, hirange, hilen, 0, parent_brtnode);
printf("%*sNode=%p\n", depth, "", node); printf("%*sNode=%p\n", depth, "", node);
if (node->height>0) { if (node->height>0) {
printf("%*sNode %lld nodesize=%d height=%d n_children=%d n_bytes_in_hashtables=%d keyrange=%s %s\n", printf("%*sNode %lld nodesize=%d height=%d n_children=%d n_bytes_in_hashtables=%d keyrange=%s %s\n",
...@@ -1423,7 +1464,8 @@ int dump_brtnode (BRT brt, diskoff off, int depth, bytevec lorange, ITEMLEN lole ...@@ -1423,7 +1464,8 @@ int dump_brtnode (BRT brt, diskoff off, int depth, bytevec lorange, ITEMLEN lole
(i==0) ? lorange : node->u.n.childkeys[i-1], (i==0) ? lorange : node->u.n.childkeys[i-1],
(i==0) ? lolen : node->u.n.childkeylens[i-1], (i==0) ? lolen : node->u.n.childkeylens[i-1],
(i==node->u.n.n_children-1) ? hirange : node->u.n.childkeys[i], (i==node->u.n.n_children-1) ? hirange : node->u.n.childkeys[i],
(i==node->u.n.n_children-1) ? hilen : node->u.n.childkeylens[i] (i==node->u.n.n_children-1) ? hilen : node->u.n.childkeylens[i],
node
); );
} }
} }
...@@ -1448,12 +1490,12 @@ int dump_brt (BRT brt) { ...@@ -1448,12 +1490,12 @@ int dump_brt (BRT brt) {
} }
rootp = calculate_root_offset_pointer(brt); rootp = calculate_root_offset_pointer(brt);
printf("split_count=%d\n", split_count); printf("split_count=%d\n", split_count);
if ((r = dump_brtnode(brt, *rootp, 0, 0, 0, 0, 0))) goto died0; if ((r = dump_brtnode(brt, *rootp, 0, 0, 0, 0, 0, null_brtnode))) goto died0;
if ((r = unpin_brt_header(brt))!=0) return r; if ((r = unpin_brt_header(brt))!=0) return r;
return 0; return 0;
} }
int show_brtnode_blocknumbers (BRT brt, diskoff off) { int show_brtnode_blocknumbers (BRT brt, diskoff off, BRTNODE parent_brtnode) {
BRTNODE node; BRTNODE node;
void *node_v; void *node_v;
int i,r; int i,r;
...@@ -1463,11 +1505,13 @@ int show_brtnode_blocknumbers (BRT brt, diskoff off) { ...@@ -1463,11 +1505,13 @@ int show_brtnode_blocknumbers (BRT brt, diskoff off) {
if (0) { died0: cachetable_unpin_size(brt->cf, off, 0, 0); } if (0) { died0: cachetable_unpin_size(brt->cf, off, 0, 0); }
return r; return r;
} }
printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v; node=node_v;
node->parent_brtnode = parent_brtnode;
printf(" %lld", off/brt->h->nodesize); printf(" %lld", off/brt->h->nodesize);
if (node->height>0) { if (node->height>0) {
for (i=0; i<node->u.n.n_children; i++) { for (i=0; i<node->u.n.n_children; i++) {
if ((r=show_brtnode_blocknumbers(brt, node->u.n.children[i]))) goto died0; if ((r=show_brtnode_blocknumbers(brt, node->u.n.children[i], node))) goto died0;
} }
} }
r = cachetable_unpin_size(brt->cf, off, 0, 0); r = cachetable_unpin_size(brt->cf, off, 0, 0);
...@@ -1483,13 +1527,13 @@ int show_brt_blocknumbers (BRT brt) { ...@@ -1483,13 +1527,13 @@ int show_brt_blocknumbers (BRT brt) {
} }
rootp = calculate_root_offset_pointer(brt); rootp = calculate_root_offset_pointer(brt);
printf("BRT %p has blocks:", brt); printf("BRT %p has blocks:", brt);
if ((r=show_brtnode_blocknumbers (brt, *rootp))) goto died0; if ((r=show_brtnode_blocknumbers (brt, *rootp, 0))) goto died0;
printf("\n"); printf("\n");
if ((r = unpin_brt_header(brt))!=0) return r; if ((r = unpin_brt_header(brt))!=0) return r;
return 0; return 0;
} }
int verify_brtnode (BRT brt, diskoff off, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, int recurse) { int verify_brtnode (BRT brt, diskoff off, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, int recurse, BRTNODE parent_brtnode) {
int result=0; int result=0;
BRTNODE node; BRTNODE node;
void *node_v; void *node_v;
...@@ -1497,7 +1541,9 @@ int verify_brtnode (BRT brt, diskoff off, bytevec lorange, ITEMLEN lolen, byteve ...@@ -1497,7 +1541,9 @@ int verify_brtnode (BRT brt, diskoff off, bytevec lorange, ITEMLEN lolen, byteve
if ((r = cachetable_get_and_pin(brt->cf, off, &node_v, if ((r = cachetable_get_and_pin(brt->cf, off, &node_v,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize))) brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize)))
return r; return r;
//printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v; node=node_v;
node->parent_brtnode = parent_brtnode;
if (node->height>0) { if (node->height>0) {
int i; int i;
for (i=0; i< node->u.n.n_children-1; i++) { for (i=0; i< node->u.n.n_children-1; i++) {
...@@ -1543,7 +1589,8 @@ int verify_brtnode (BRT brt, diskoff off, bytevec lorange, ITEMLEN lolen, byteve ...@@ -1543,7 +1589,8 @@ int verify_brtnode (BRT brt, diskoff off, bytevec lorange, ITEMLEN lolen, byteve
(i==0) ? lolen : node->u.n.childkeylens[i-1], (i==0) ? lolen : node->u.n.childkeylens[i-1],
(i==node->u.n.n_children-1) ? hirange : node->u.n.childkeys[i], (i==node->u.n.n_children-1) ? hirange : node->u.n.childkeys[i],
(i==node->u.n.n_children-1) ? hilen : node->u.n.childkeylens[i], (i==node->u.n.n_children-1) ? hilen : node->u.n.childkeylens[i],
recurse); recurse,
node);
} }
} }
} }
...@@ -1559,7 +1606,7 @@ int verify_brt (BRT brt) { ...@@ -1559,7 +1606,7 @@ int verify_brt (BRT brt) {
return r; return r;
} }
rootp = calculate_root_offset_pointer(brt); rootp = calculate_root_offset_pointer(brt);
if ((r=verify_brtnode(brt, *rootp, 0, 0, 0, 0, 1))) goto died0; if ((r=verify_brtnode(brt, *rootp, 0, 0, 0, 0, 1, null_brtnode))) goto died0;
if ((r = unpin_brt_header(brt))!=0) return r; if ((r = unpin_brt_header(brt))!=0) return r;
return 0; return 0;
} }
...@@ -1895,7 +1942,7 @@ void brt_cursor_print(BRT_CURSOR cursor) { ...@@ -1895,7 +1942,7 @@ void brt_cursor_print(BRT_CURSOR cursor) {
printf("\n"); printf("\n");
} }
int brtcurs_set_position_last (BRT_CURSOR cursor, diskoff off, DB *db, TOKUTXN txn) { int brtcurs_set_position_last (BRT_CURSOR cursor, diskoff off, DB *db, TOKUTXN txn, BRTNODE parent_brtnode) {
BRT brt=cursor->brt; BRT brt=cursor->brt;
void *node_v; void *node_v;
...@@ -1906,6 +1953,7 @@ int brtcurs_set_position_last (BRT_CURSOR cursor, diskoff off, DB *db, TOKUTXN t ...@@ -1906,6 +1953,7 @@ int brtcurs_set_position_last (BRT_CURSOR cursor, diskoff off, DB *db, TOKUTXN t
return r; return r;
} }
BRTNODE node = node_v; BRTNODE node = node_v;
node->parent_brtnode = parent_brtnode;
assert(cursor->path_len<CURSOR_PATHLEN_LIMIT); assert(cursor->path_len<CURSOR_PATHLEN_LIMIT);
cursor->path[cursor->path_len++] = node; cursor->path[cursor->path_len++] = node;
if (node->height>0) { if (node->height>0) {
...@@ -1929,7 +1977,7 @@ int brtcurs_set_position_last (BRT_CURSOR cursor, diskoff off, DB *db, TOKUTXN t ...@@ -1929,7 +1977,7 @@ int brtcurs_set_position_last (BRT_CURSOR cursor, diskoff off, DB *db, TOKUTXN t
brt_node_remove_cursor(node, childnum, cursor); brt_node_remove_cursor(node, childnum, cursor);
goto try_last_child; goto try_last_child;
} }
r=brtcurs_set_position_last (cursor, node->u.n.children[childnum], db, txn); r=brtcurs_set_position_last (cursor, node->u.n.children[childnum], db, txn, node);
if (r == 0) if (r == 0)
return 0; return 0;
assert(node == cursor->path[cursor->path_len-1]); assert(node == cursor->path[cursor->path_len-1]);
...@@ -1956,7 +2004,7 @@ int brtcurs_set_position_last (BRT_CURSOR cursor, diskoff off, DB *db, TOKUTXN t ...@@ -1956,7 +2004,7 @@ int brtcurs_set_position_last (BRT_CURSOR cursor, diskoff off, DB *db, TOKUTXN t
} }
} }
int brtcurs_set_position_first (BRT_CURSOR cursor, diskoff off, DB *db, TOKUTXN txn) { int brtcurs_set_position_first (BRT_CURSOR cursor, diskoff off, DB *db, TOKUTXN txn, BRTNODE parent_brtnode) {
BRT brt=cursor->brt; BRT brt=cursor->brt;
void *node_v; void *node_v;
...@@ -1967,6 +2015,7 @@ int brtcurs_set_position_first (BRT_CURSOR cursor, diskoff off, DB *db, TOKUTXN ...@@ -1967,6 +2015,7 @@ int brtcurs_set_position_first (BRT_CURSOR cursor, diskoff off, DB *db, TOKUTXN
return r; return r;
} }
BRTNODE node = node_v; BRTNODE node = node_v;
node->parent_brtnode = parent_brtnode;
assert(cursor->path_len<CURSOR_PATHLEN_LIMIT); assert(cursor->path_len<CURSOR_PATHLEN_LIMIT);
cursor->path[cursor->path_len++] = node; cursor->path[cursor->path_len++] = node;
if (node->height>0) { if (node->height>0) {
...@@ -1990,7 +2039,7 @@ int brtcurs_set_position_first (BRT_CURSOR cursor, diskoff off, DB *db, TOKUTXN ...@@ -1990,7 +2039,7 @@ int brtcurs_set_position_first (BRT_CURSOR cursor, diskoff off, DB *db, TOKUTXN
brt_node_remove_cursor(node, childnum, cursor); brt_node_remove_cursor(node, childnum, cursor);
goto try_first_child; goto try_first_child;
} }
r=brtcurs_set_position_first (cursor, node->u.n.children[childnum], db, txn); r=brtcurs_set_position_first (cursor, node->u.n.children[childnum], db, txn, node);
if (r == 0) if (r == 0)
return r; return r;
assert(node == cursor->path[cursor->path_len-1]); assert(node == cursor->path[cursor->path_len-1]);
...@@ -2053,7 +2102,7 @@ int brtcurs_set_position_next2(BRT_CURSOR cursor, DB *db, TOKUTXN txn) { ...@@ -2053,7 +2102,7 @@ int brtcurs_set_position_next2(BRT_CURSOR cursor, DB *db, TOKUTXN txn) {
node = cursor->path[cursor->path_len-1]; node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1]; childnum = cursor->pathcnum[cursor->path_len-1];
} }
r = brtcurs_set_position_first(cursor, node->u.n.children[childnum], db, txn); r = brtcurs_set_position_first(cursor, node->u.n.children[childnum], db, txn, node);
if (r == 0) if (r == 0)
return 0; return 0;
assert(node == cursor->path[cursor->path_len-1]); assert(node == cursor->path[cursor->path_len-1]);
...@@ -2113,7 +2162,7 @@ int brtcurs_set_position_prev2(BRT_CURSOR cursor, DB *db, TOKUTXN txn) { ...@@ -2113,7 +2162,7 @@ int brtcurs_set_position_prev2(BRT_CURSOR cursor, DB *db, TOKUTXN txn) {
node = cursor->path[cursor->path_len-1]; node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1]; childnum = cursor->pathcnum[cursor->path_len-1];
} }
r = brtcurs_set_position_last(cursor, node->u.n.children[childnum], 0, txn); r = brtcurs_set_position_last(cursor, node->u.n.children[childnum], db, txn, node);
if (r == 0) if (r == 0)
return 0; return 0;
assert(node == cursor->path[cursor->path_len-1]); assert(node == cursor->path[cursor->path_len-1]);
...@@ -2136,7 +2185,7 @@ int brtcurs_set_position_prev (BRT_CURSOR cursor, DB *db, TOKUTXN txn) { ...@@ -2136,7 +2185,7 @@ int brtcurs_set_position_prev (BRT_CURSOR cursor, DB *db, TOKUTXN txn) {
return 0; return 0;
} }
int brtcurs_set_key(BRT_CURSOR cursor, diskoff off, DBT *key, DBT *val, int flag, DB *db, TOKUTXN txn) { int brtcurs_set_key(BRT_CURSOR cursor, diskoff off, DBT *key, DBT *val, int flag, DB *db, TOKUTXN txn, BRTNODE parent_brtnode) {
BRT brt = cursor->brt; BRT brt = cursor->brt;
void *node_v; void *node_v;
int r; int r;
...@@ -2147,6 +2196,7 @@ int brtcurs_set_key(BRT_CURSOR cursor, diskoff off, DBT *key, DBT *val, int flag ...@@ -2147,6 +2196,7 @@ int brtcurs_set_key(BRT_CURSOR cursor, diskoff off, DBT *key, DBT *val, int flag
BRTNODE node = node_v; BRTNODE node = node_v;
int childnum; int childnum;
node->parent_brtnode = parent_brtnode;
if (node->height > 0) { if (node->height > 0) {
cursor->path_len += 1; cursor->path_len += 1;
...@@ -2166,7 +2216,7 @@ int brtcurs_set_key(BRT_CURSOR cursor, diskoff off, DBT *key, DBT *val, int flag ...@@ -2166,7 +2216,7 @@ int brtcurs_set_key(BRT_CURSOR cursor, diskoff off, DBT *key, DBT *val, int flag
} }
break; break;
} }
r = brtcurs_set_key(cursor, node->u.n.children[childnum], key, val, flag, db, txn); r = brtcurs_set_key(cursor, node->u.n.children[childnum], key, val, flag, db, txn, node);
if (r != 0) if (r != 0)
brt_node_remove_cursor(node, childnum, cursor); brt_node_remove_cursor(node, childnum, cursor);
} else { } else {
...@@ -2196,7 +2246,7 @@ int brtcurs_set_key(BRT_CURSOR cursor, diskoff off, DBT *key, DBT *val, int flag ...@@ -2196,7 +2246,7 @@ int brtcurs_set_key(BRT_CURSOR cursor, diskoff off, DBT *key, DBT *val, int flag
return r; return r;
} }
int brtcurs_set_range(BRT_CURSOR cursor, diskoff off, DBT *key, DB *db, TOKUTXN txn) { int brtcurs_set_range(BRT_CURSOR cursor, diskoff off, DBT *key, DB *db, TOKUTXN txn, BRTNODE parent_brtnode) {
BRT brt = cursor->brt; BRT brt = cursor->brt;
void *node_v; void *node_v;
int r; int r;
...@@ -2207,6 +2257,7 @@ int brtcurs_set_range(BRT_CURSOR cursor, diskoff off, DBT *key, DB *db, TOKUTXN ...@@ -2207,6 +2257,7 @@ int brtcurs_set_range(BRT_CURSOR cursor, diskoff off, DBT *key, DB *db, TOKUTXN
BRTNODE node = node_v; BRTNODE node = node_v;
int childnum; int childnum;
node->parent_brtnode = parent_brtnode;
if (node->height > 0) { if (node->height > 0) {
cursor->path_len += 1; cursor->path_len += 1;
...@@ -2227,7 +2278,7 @@ int brtcurs_set_range(BRT_CURSOR cursor, diskoff off, DBT *key, DB *db, TOKUTXN ...@@ -2227,7 +2278,7 @@ int brtcurs_set_range(BRT_CURSOR cursor, diskoff off, DBT *key, DB *db, TOKUTXN
} }
break; break;
} }
r = brtcurs_set_range(cursor, node->u.n.children[childnum], key, db, txn); r = brtcurs_set_range(cursor, node->u.n.children[childnum], key, db, txn, node);
if (r != 0) { if (r != 0) {
node = cursor->path[cursor->path_len-1]; node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1]; childnum = cursor->pathcnum[cursor->path_len-1];
...@@ -2316,7 +2367,7 @@ int brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, DB *db, TO ...@@ -2316,7 +2367,7 @@ int brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, DB *db, TO
do_db_last: do_db_last:
r=unpin_cursor(cursor); if (r!=0) goto died0; r=unpin_cursor(cursor); if (r!=0) goto died0;
assert(cursor->pmacurs == 0); assert(cursor->pmacurs == 0);
r=brtcurs_set_position_last(cursor, *rootp, db, txn); if (r!=0) goto died0; r=brtcurs_set_position_last(cursor, *rootp, db, txn, null_brtnode); if (r!=0) goto died0;
r=pma_cursor_get_current(cursor->pmacurs, kbt, vbt); r=pma_cursor_get_current(cursor->pmacurs, kbt, vbt);
if (r == 0) assert_cursor_path(cursor); if (r == 0) assert_cursor_path(cursor);
break; break;
...@@ -2324,7 +2375,7 @@ int brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, DB *db, TO ...@@ -2324,7 +2375,7 @@ int brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, DB *db, TO
do_db_first: do_db_first:
r=unpin_cursor(cursor); if (r!=0) goto died0; r=unpin_cursor(cursor); if (r!=0) goto died0;
assert(cursor->pmacurs == 0); assert(cursor->pmacurs == 0);
r=brtcurs_set_position_first(cursor, *rootp, db, txn); if (r!=0) goto died0; r=brtcurs_set_position_first(cursor, *rootp, db, txn, null_brtnode); if (r!=0) goto died0;
r=pma_cursor_get_current(cursor->pmacurs, kbt, vbt); r=pma_cursor_get_current(cursor->pmacurs, kbt, vbt);
if (r == 0) assert_cursor_path(cursor); if (r == 0) assert_cursor_path(cursor);
break; break;
...@@ -2345,7 +2396,7 @@ int brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, DB *db, TO ...@@ -2345,7 +2396,7 @@ int brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, DB *db, TO
case DB_SET: case DB_SET:
r = unpin_cursor(cursor); r = unpin_cursor(cursor);
assert(r == 0); assert(r == 0);
r = brtcurs_set_key(cursor, *rootp, kbt, vbt, DB_SET, db, txn); r = brtcurs_set_key(cursor, *rootp, kbt, vbt, DB_SET, db, txn, null_brtnode);
if (r != 0) goto died0; if (r != 0) goto died0;
r = pma_cursor_get_current(cursor->pmacurs, kbt, vbt); r = pma_cursor_get_current(cursor->pmacurs, kbt, vbt);
if (r != 0) goto died0; if (r != 0) goto died0;
...@@ -2353,13 +2404,13 @@ int brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, DB *db, TO ...@@ -2353,13 +2404,13 @@ int brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, DB *db, TO
case DB_GET_BOTH: case DB_GET_BOTH:
r = unpin_cursor(cursor); r = unpin_cursor(cursor);
assert(r == 0); assert(r == 0);
r = brtcurs_set_key(cursor, *rootp, kbt, vbt, DB_GET_BOTH, db, txn); r = brtcurs_set_key(cursor, *rootp, kbt, vbt, DB_GET_BOTH, db, txn, null_brtnode);
if (r != 0) goto died0; if (r != 0) goto died0;
break; break;
case DB_SET_RANGE: case DB_SET_RANGE:
r = unpin_cursor(cursor); r = unpin_cursor(cursor);
assert(r == 0); assert(r == 0);
r = brtcurs_set_range(cursor, *rootp, kbt, db, txn); r = brtcurs_set_range(cursor, *rootp, kbt, db, txn, null_brtnode);
if (r != 0) goto died0; if (r != 0) goto died0;
r = pma_cursor_get_current(cursor->pmacurs, kbt, vbt); r = pma_cursor_get_current(cursor->pmacurs, kbt, vbt);
if (r != 0) goto died0; if (r != 0) goto died0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment