Commit 7e429177 authored by Rich Prohaska's avatar Rich Prohaska

tree cursors that can walk the tree

git-svn-id: file:///svn/tokudb@170 c7de825b-a66e-492c-adef-691d508d4ae1
parent 0bf72887
......@@ -36,7 +36,8 @@ struct brtnode {
HASHTABLE htables[TREE_FANOUT+1];
unsigned int n_bytes_in_hashtable[TREE_FANOUT+1]; /* how many bytes are in each hashtable (including overheads) */
unsigned int n_bytes_in_hashtables;
} n;
unsigned int n_cursors[TREE_FANOUT+1];
} n;
struct leaf {
PMA buffer;
unsigned int n_bytes_in_buffer;
......@@ -92,3 +93,47 @@ void brtnode_free (BRTNODE *node);
#define DEADBEEF ((void*)0xDEADBEEFDEADBEEF)
#endif
#define CURSOR_PATHLEN_LIMIT 256
struct brt_cursor {
BRT brt;
int path_len; /* -1 if the cursor points nowhere. */
BRTNODE path[CURSOR_PATHLEN_LIMIT]; /* Include the leaf (last). These are all pinned. */
int pathcnum[CURSOR_PATHLEN_LIMIT]; /* which child did we descend to from here? */
PMA_CURSOR pmacurs; /* The cursor into the leaf. NULL if the cursor doesn't exist. */
BRT_CURSOR prev,next;
};
/* print the cursor path */
void brt_cursor_print(BRT_CURSOR cursor);
/* is the cursor path empty? */
static inline int brt_cursor_path_empty(BRT_CURSOR cursor) {
return cursor->path_len == 0;
}
/*is the cursor path full? */
static inline int brt_cursor_path_full(BRT_CURSOR cursor) {
return cursor->path_len == CURSOR_PATHLEN_LIMIT;
}
static inline int brt_cursor_active(BRT_CURSOR cursor) {
return cursor->path_len > 0;
}
/* brt has a new root. add the root to this cursor. */
void brt_cursor_new_root(BRT_CURSOR cursor, BRT t, BRTNODE newroot, BRTNODE left, BRTNODE right);
/* a brt leaf has split. modify this cursor if it includes the old node in its path. */
void brt_cursor_leaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right);
/* a brt internal node has expanded. modify this cursor if it includes the old node in its path. */
void brt_cursor_nonleaf_expand(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, int childnum, BRTNODE left, BRTNODE right);
/* a brt internal node has split. modify this cursor if it includes the old node in its path. */
void brt_cursor_nonleaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right);
void brt_update_cursors_new_root(BRT t, BRTNODE newroot, BRTNODE left, BRTNODE right);
void brt_update_cursors_leaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right);
void brt_update_cursors_nonleaf_expand(BRT t, BRTNODE oldnode, int childnum, BRTNODE left, BRTNODE right);
void brt_update_cursors_nonleaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right);
......@@ -176,8 +176,16 @@ int deserialize_brtnode_from (int fd, diskoff off, BRTNODE *brtnode, int nodesiz
//printf("height==%d\n", result->height);
if (result->height>0) {
result->u.n.totalchildkeylens=0;
for (i=0; i<TREE_FANOUT; i++) { result->u.n.childkeys[i]=0; result->u.n.childkeylens[i]=0; }
for (i=0; i<TREE_FANOUT+1; i++) { result->u.n.children[i]=0; result->u.n.htables[i]=0; result->u.n.n_bytes_in_hashtable[i]=0; }
for (i=0; i<TREE_FANOUT; i++) {
result->u.n.childkeys[i]=0;
result->u.n.childkeylens[i]=0;
}
for (i=0; i<TREE_FANOUT+1; i++) {
result->u.n.children[i]=0;
result->u.n.htables[i]=0;
result->u.n.n_bytes_in_hashtable[i]=0;
result->u.n.n_cursors[i]=0;
}
result->u.n.n_children = rbuf_int(&rc);
//printf("n_children=%d\n", result->n_children);
assert(result->u.n.n_children>=0 && result->u.n.n_children<=TREE_FANOUT);
......
......@@ -809,7 +809,386 @@ static void test_wrongendian_compare (int wrong_p, unsigned int N) {
memory_check_all_free();
}
void assert_tree_first(BRT brt, long long firstv __attribute__((unused)), long long lastv __attribute__((unused))) {
BRT_CURSOR cursor;
int r;
DBT kbt, vbt;
long long v;
r = brt_cursor(brt, &cursor);
assert(r==0);
#if 1
printf("first key: ");
init_dbt(&kbt); kbt.flags = DB_DBT_MALLOC;
init_dbt(&vbt); vbt.flags = DB_DBT_MALLOC;
r = brt_c_get(cursor, &kbt, &vbt, DB_FIRST);
assert(r == 0);
printf("%s ", (char*)kbt.data);
assert(vbt.size == sizeof v);
memcpy(&v, vbt.data, vbt.size);
assert(v == firstv);
toku_free(kbt.data);
toku_free(vbt.data);
printf("\n");
#endif
#if 0
printf("last key:");
init_dbt(&kbt); kbt.flags = DB_DBT_MALLOC;
init_dbt(&vbt); vbt.flags = DB_DBT_MALLOC;
r = brt_c_get(cursor, &kbt, &vbt, DB_LAST);
assert(r == 0);
printf("%s ", (char*)kbt.data);
assert(vbt.size == sizeof v);
memcpy(&v, vbt.data, vbt.size);
assert(v == lastv);
toku_free(kbt.data);
toku_free(vbt.data);
printf("\n");
#endif
r = brt_cursor_close(cursor);
assert(r==0);
}
void test_brt_cursor_first(int n) {
const char *fname="testbrt.brt";
CACHETABLE ct;
BRT brt;
int r;
int i;
printf("test_brt_cursor_first:%d\n", n);
unlink(fname);
r = brt_create_cachetable(&ct, 0);
assert(r==0);
r = open_brt(fname, 0, 1, &brt, 1<<12, ct, default_compare_fun);
assert(r==0);
/* insert a bunch of kv pairs */
for (i=0; i<n; i++) {
char key[8]; long long v;
DBT kbt, vbt;
snprintf(key, sizeof key, "%4.4d", i);
fill_dbt(&kbt, key, strlen(key)+1);
v = i;
fill_dbt(&vbt, &v, sizeof v);
r = brt_insert(brt, &kbt, &vbt, 0);
assert(r==0);
}
assert_tree_first(brt, 0, n-1);
r = close_brt(brt);
assert(r==0);
r = cachetable_close(&ct);
assert(r==0);
}
void test_brt_cursor_rfirst(int n) {
const char *fname="testbrt.brt";
CACHETABLE ct;
BRT brt;
int r;
int i;
printf("test_brt_cursor_rfirst:%d\n", n);
unlink(fname);
r = brt_create_cachetable(&ct, 0);
assert(r==0);
r = open_brt(fname, 0, 1, &brt, 1<<12, ct, default_compare_fun);
assert(r==0);
/* insert a bunch of kv pairs */
for (i=n-1; i>=0; i--) {
char key[8]; long long v;
DBT kbt, vbt;
snprintf(key, sizeof key, "%4.4d", i);
fill_dbt(&kbt, key, strlen(key)+1);
v = i;
fill_dbt(&vbt, &v, sizeof v);
r = brt_insert(brt, &kbt, &vbt, 0);
assert(r==0);
}
assert_tree_first(brt, 0, n-1);
r = close_brt(brt);
assert(r==0);
r = cachetable_close(&ct);
assert(r==0);
}
void assert_tree_walk(BRT brt, int n) {
BRT_CURSOR cursor;
int i;
int r;
r = brt_cursor(brt, &cursor);
assert(r==0);
printf("key: ");
for (i=0; ; i++) {
DBT kbt, vbt;
long long v;
init_dbt(&kbt); kbt.flags = DB_DBT_MALLOC;
init_dbt(&vbt); vbt.flags = DB_DBT_MALLOC;
r = brt_c_get(cursor, &kbt, &vbt, DB_NEXT);
if (r != 0)
break;
printf("%s ", (char*)kbt.data);
assert(vbt.size == sizeof v);
memcpy(&v, vbt.data, vbt.size);
assert(v == i);
toku_free(kbt.data);
toku_free(vbt.data);
}
printf("\n");
assert(i == n);
r = brt_cursor_close(cursor);
assert(r==0);
}
void test_brt_cursor_walk(int n) {
const char *fname="testbrt.brt";
CACHETABLE ct;
BRT brt;
int r;
int i;
printf("test_brt_cursor_walk:%d\n", n);
unlink(fname);
r = brt_create_cachetable(&ct, 0);
assert(r==0);
r = open_brt(fname, 0, 1, &brt, 1<<12, ct, default_compare_fun);
assert(r==0);
/* insert a bunch of kv pairs */
for (i=0; i<n; i++) {
char key[8]; long long v;
DBT kbt, vbt;
snprintf(key, sizeof key, "%4.4d", i);
fill_dbt(&kbt, key, strlen(key)+1);
v = i;
fill_dbt(&vbt, &v, sizeof v);
r = brt_insert(brt, &kbt, &vbt, 0);
assert(r==0);
}
/* walk the tree */
assert_tree_walk(brt, n);
r = close_brt(brt);
assert(r==0);
r = cachetable_close(&ct);
assert(r==0);
}
void assert_tree_walk_inorder(BRT brt, int n) {
BRT_CURSOR cursor;
int i;
int r;
char *prevkey;
r = brt_cursor(brt, &cursor);
assert(r==0);
prevkey = 0;
printf("key: ");
for (i=0; ; i++) {
DBT kbt, vbt;
long long v;
init_dbt(&kbt); kbt.flags = DB_DBT_MALLOC;
init_dbt(&vbt); vbt.flags = DB_DBT_MALLOC;
r = brt_c_get(cursor, &kbt, &vbt, DB_NEXT);
if (r != 0)
break;
printf("%s ", (char*)kbt.data);
assert(vbt.size == sizeof v);
memcpy(&v, vbt.data, vbt.size);
if (i != 0) {
assert(strcmp(prevkey, kbt.data) < 0);
toku_free(prevkey);
}
prevkey = kbt.data;
toku_free(vbt.data);
}
if (prevkey) toku_free(prevkey);
printf("\n");
assert(i == n);
r = brt_cursor_close(cursor);
assert(r==0);
}
void test_brt_cursor_rand(int n) {
const char *fname="testbrt.brt";
CACHETABLE ct;
BRT brt;
int r;
int i;
printf("test_brt_cursor_rand:%d\n", n);
unlink(fname);
r = brt_create_cachetable(&ct, 0);
assert(r==0);
r = open_brt(fname, 0, 1, &brt, 1<<12, ct, default_compare_fun);
assert(r==0);
/* insert a bunch of kv pairs */
for (i=0; i<n; i++) {
char key[8]; long long v;
DBT kbt, vbt;
for (;;) {
v = ((long long) random() << 32) + random();
snprintf(key, sizeof key, "%lld", v);
fill_dbt(&kbt, key, strlen(key)+1);
v = i;
fill_dbt(&vbt, &v, sizeof v);
r = brt_lookup(brt, &kbt, &vbt, 0);
if (r == 0) {
printf("dup");
continue;
}
r = brt_insert(brt, &kbt, &vbt, 0);
assert(r==0);
break;
}
}
/* walk the tree */
assert_tree_walk_inorder(brt, n);
r = close_brt(brt);
assert(r==0);
r = cachetable_close(&ct);
assert(r==0);
}
void test_brt_cursor_split(int n) {
const char *fname="testbrt.brt";
CACHETABLE ct;
BRT brt;
BRT_CURSOR cursor;
int r;
int keyseqnum;
int i;
DBT kbt, vbt;
printf("test_brt_cursor_split:%d\n", n);
unlink(fname);
r = brt_create_cachetable(&ct, 0);
assert(r==0);
r = open_brt(fname, 0, 1, &brt, 1<<12, ct, default_compare_fun);
assert(r==0);
/* insert a bunch of kv pairs */
for (keyseqnum=0; keyseqnum < n/2; keyseqnum++) {
char key[8]; long long v;
snprintf(key, sizeof key, "%4.4d", keyseqnum);
fill_dbt(&kbt, key, strlen(key)+1);
v = keyseqnum;
fill_dbt(&vbt, &v, sizeof v);
r = brt_insert(brt, &kbt, &vbt, 0);
assert(r==0);
}
r = brt_cursor(brt, &cursor);
assert(r==0);
printf("key: ");
for (i=0; i<n/2; i++) {
init_dbt(&kbt); kbt.flags = DB_DBT_MALLOC;
init_dbt(&vbt); vbt.flags = DB_DBT_MALLOC;
r = brt_c_get(cursor, &kbt, &vbt, DB_NEXT);
assert(r==0);
printf("%s ", (char*)kbt.data);
toku_free(kbt.data);
toku_free(vbt.data);
}
printf("\n");
for (; keyseqnum<n; keyseqnum++) {
char key[8]; long long v;
snprintf(key, sizeof key, "%4.4d", keyseqnum);
fill_dbt(&kbt, key, strlen(key)+1);
v = keyseqnum;
fill_dbt(&vbt, &v, sizeof v);
r = brt_insert(brt, &kbt, &vbt, 0);
assert(r==0);
}
printf("key: ");
for (;;) {
init_dbt(&kbt); kbt.flags = DB_DBT_MALLOC;
init_dbt(&vbt); vbt.flags = DB_DBT_MALLOC;
r = brt_c_get(cursor, &kbt, &vbt, DB_NEXT);
if (r != 0)
break;
printf("%s ", (char*)kbt.data);
toku_free(kbt.data);
toku_free(vbt.data);
}
printf("\n");
r = brt_cursor_close(cursor);
assert(r==0);
r = close_brt(brt);
assert(r==0);
r = cachetable_close(&ct);
assert(r==0);
}
void test_brt_cursor() {
int n;
if (1) for (n=100; n<10000; n += 100) {
test_brt_cursor_first(n); memory_check_all_free();
}
if (1) for (n=100; n<10000; n += 100) {
test_brt_cursor_rfirst(n); memory_check_all_free();
}
if (1) for (n=100; n<10000; n += 100) {
test_brt_cursor_walk(n); memory_check_all_free();
}
if (1) for (n=100; n<10000; n += 100) {
test_brt_cursor_split(n); memory_check_all_free();
}
if (1) for (n=100; n<10000; n += 100) {
test_brt_cursor_rand(n); memory_check_all_free();
}
}
static void brt_blackbox_test (void) {
test_brt_cursor();
test_wrongendian_compare(0, 2); memory_check_all_free();
test_wrongendian_compare(1, 2); memory_check_all_free();
test_wrongendian_compare(1, 257); memory_check_all_free();
......
......@@ -50,6 +50,7 @@ void brtnode_free (BRTNODE *nodep) {
if (node->u.n.htables[i]) {
toku_hashtable_free(&node->u.n.htables[i]);
}
assert(node->u.n.n_cursors[i] == 0);
}
} else {
if (node->u.l.buffer) // The buffer may have been freed already, in some cases.
......@@ -213,6 +214,7 @@ static void initialize_brtnode (BRT t, BRTNODE n, diskoff nodename, int height)
n->u.n.children[i] = 0;
n->u.n.htables[i] = 0;
n->u.n.n_bytes_in_hashtable[i] = 0;
n->u.n.n_cursors[i] = 0;
}
n->u.n.n_bytes_in_hashtables = 0;
} else {
......@@ -254,6 +256,7 @@ void delete_node (BRT t, BRTNODE node) {
toku_hashtable_free(&node->u.n.htables[i]);
}
node->u.n.n_bytes_in_hashtable[0]=0;
assert(node->u.n.n_cursors[i] == 0);
}
node->u.n.n_bytes_in_hashtables = 0;
node->u.n.totalchildkeylens=0;
......@@ -336,6 +339,7 @@ int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
assert(node->height>0 || node->u.l.buffer!=0);
/* Remove it from the cache table, and free its storage. */
//printf("%s:%d old pma = %p\n", __FILE__, __LINE__, node->u.l.buffer);
brt_update_cursors_leaf_split(t, node, A, B);
delete_node(t, node);
*nodea = A;
......@@ -420,6 +424,7 @@ void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT
/* Remove it from the cache table, and free its storage. */
//printf("%s:%d removing %lld\n", __FILE__, __LINE__, node->thisnodename);
brt_update_cursors_nonleaf_split(t, node, A, B);
delete_node(t, node);
assert(serialize_brtnode_size(A)<A->nodesize);
assert(serialize_brtnode_size(B)<B->nodesize);
......@@ -589,6 +594,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
void *app_private,
DB *db) {
assert(node->height>0);
assert(0 <= childnum && childnum < node->u.n.n_children);
HASHTABLE old_h = node->u.n.htables[childnum];
int old_count = node->u.n.n_bytes_in_hashtable[childnum];
int cnum;
......@@ -608,6 +614,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
node->u.n.children[cnum] = node->u.n.children[cnum-1];
node->u.n.htables[cnum] = node->u.n.htables[cnum-1];
node->u.n.n_bytes_in_hashtable[cnum] = node->u.n.n_bytes_in_hashtable[cnum-1];
node->u.n.n_cursors[cnum] = node->u.n.n_cursors[cnum-1];
}
node->u.n.children[childnum] = childa->thisnodename;
node->u.n.children[childnum+1] = childb->thisnodename;
......@@ -625,6 +632,8 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
node->u.n.totalchildkeylens += childsplitk->size;
node->u.n.n_children++;
brt_update_cursors_nonleaf_expand(t, node, childnum, childa, childb);
if (brt_debug_mode) {
int i;
printf("%s:%d splitkeys:", __FILE__, __LINE__);
......@@ -864,8 +873,47 @@ static int brt_nonleaf_insert (BRT t, BRTNODE node, DBT *k, DBT *v,
DB *db) {
bytevec olddata;
ITEMLEN olddatalen;
unsigned int childnum = brtnode_which_child(node, k, t, db);
int found = !toku_hash_find(node->u.n.htables[childnum], k->data, k->size, &olddata, &olddatalen);
unsigned int childnum;
int found;
childnum = brtnode_which_child(node, k, t, db);
/* non-buffering mode when cursors are open on this child */
if (node->u.n.n_cursors[childnum] > 0) {
int r;
void *child_v;
BRTNODE child;
int child_did_split;
BRTNODE childa, childb;
DBT childsplitk;
assert(node->u.n.n_bytes_in_hashtable[childnum] == 0);
*did_split = 0;
r = cachetable_get_and_pin(t->cf, node->u.n.children[childnum], &child_v,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)t->h->nodesize);
assert(r == 0);
child = child_v;
r = brtnode_insert(t, child, k, v,
&child_did_split, &childa, &childb, &childsplitk, 0, db);
assert(r == 0);
if (child_did_split) {
if (0) printf("brt_nonleaf_insert child_split %p\n", child);
r = handle_split_of_child(t, node, childnum,
childa, childb, &childsplitk,
did_split, nodea, nodeb, splitk,
k->app_private, db);
assert(r == 0);
} else {
r = cachetable_unpin(t->cf, child->thisnodename, 0);
assert(r == 0);
}
return r;
}
found = !toku_hash_find(node->u.n.htables[childnum], k->data, k->size, &olddata, &olddatalen);
if (0) { // It is faster to do this, except on yobiduck where things grind to a halt.
void *child_v;
......@@ -1142,6 +1190,34 @@ CACHEKEY* calculate_root_offset_pointer (BRT brt) {
abort();
}
int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *rootp) {
BRTNODE MALLOC(newroot);
int r;
diskoff newroot_diskoff=malloc_diskblock(brt, brt->h->nodesize);
assert(newroot);
*rootp=newroot_diskoff;
brt->h->dirty=1;
// printf("new_root %lld\n", newroot_diskoff);
initialize_brtnode (brt, newroot, newroot_diskoff, nodea->height+1);
newroot->u.n.n_children=2;
//printf("%s:%d Splitkey=%p %s\n", __FILE__, __LINE__, splitkey, splitkey);
newroot->u.n.childkeys[0] = splitk.data;
newroot->u.n.childkeylens[0] = splitk.size;
newroot->u.n.totalchildkeylens=splitk.size;
newroot->u.n.children[0]=nodea->thisnodename;
newroot->u.n.children[1]=nodeb->thisnodename;
r=toku_hashtable_create(&newroot->u.n.htables[0]); if (r!=0) return r;
r=toku_hashtable_create(&newroot->u.n.htables[1]); if (r!=0) return r;
verify_counts(newroot);
r=cachetable_unpin(brt->cf, nodea->thisnodename, 1); if (r!=0) return r;
r=cachetable_unpin(brt->cf, nodeb->thisnodename, 1); if (r!=0) return r;
//printf("%s:%d put %lld\n", __FILE__, __LINE__, brt->root);
cachetable_put(brt->cf, newroot_diskoff, newroot,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize);
brt_update_cursors_new_root(brt, newroot, nodea, nodeb);
return 0;
}
int brt_insert (BRT brt, DBT *k, DBT *v, DB* db) {
void *node_v;
BRTNODE node;
......@@ -1175,28 +1251,9 @@ int brt_insert (BRT brt, DBT *k, DBT *v, DB* db) {
assert(nodeb->nodesize>0);
}
if (did_split) {
/* We must cope. */
BRTNODE MALLOC(newroot);
diskoff newroot_diskoff=malloc_diskblock(brt, brt->h->nodesize);
assert(newroot);
*rootp=newroot_diskoff;
brt->h->dirty=1;
initialize_brtnode (brt, newroot, newroot_diskoff, nodea->height+1);
newroot->u.n.n_children=2;
//printf("%s:%d Splitkey=%p %s\n", __FILE__, __LINE__, splitkey, splitkey);
newroot->u.n.childkeys[0] = splitk.data;
newroot->u.n.childkeylens[0] = splitk.size;
newroot->u.n.totalchildkeylens=splitk.size;
newroot->u.n.children[0]=nodea->thisnodename;
newroot->u.n.children[1]=nodeb->thisnodename;
r=toku_hashtable_create(&newroot->u.n.htables[0]); if (r!=0) return r;
r=toku_hashtable_create(&newroot->u.n.htables[1]); if (r!=0) return r;
verify_counts(newroot);
r=cachetable_unpin(brt->cf, nodea->thisnodename, 1); if (r!=0) return r;
r=cachetable_unpin(brt->cf, nodeb->thisnodename, 1); if (r!=0) return r;
//printf("%s:%d put %lld\n", __FILE__, __LINE__, brt->root);
cachetable_put(brt->cf, newroot_diskoff, newroot,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize);
r = brt_init_new_root(brt, nodea, nodeb, splitk, rootp);
if (r != 0)
return r;
} else {
if (node->height>0)
assert(node->u.n.n_children<=TREE_FANOUT);
......@@ -1473,16 +1530,276 @@ int brtnode_flush_child (BRT brt, BRTNODE node, int cnum) {
abort(); /* Algorithm: For each key in the cnum'th mdict, insert it to the childnode. It may cause a split. */
}
#define CURSOR_PATHLEN_LIMIT 256
struct brt_cursor {
BRT brt;
int path_len; /* -1 if the cursor points nowhere. */
BRTNODE path[CURSOR_PATHLEN_LIMIT]; /* Include the leaf (last). These are all pinned. */
int pathcnum[CURSOR_PATHLEN_LIMIT]; /* which child did we descend to from here? */
PMA_CURSOR pmacurs; /* The cursor into the leaf. NULL if the cursor doesn't exist. */
BRT_CURSOR prev,next;
};
static int unpin_cursor (BRT_CURSOR cursor);
/*
* Flush the buffer for a child of a node. Handle child splits by extending the
* current node and splitting when full. Follow the cursor path up the tree
* until the root has been reached or the current node has not split. If the
* root is reached, then create a new root for the tree.
*/
void brt_flush_child(BRT t, BRTNODE node, int childnum, BRT_CURSOR cursor) {
int r;
int child_did_split;
BRTNODE childa, childb;
DBT child_splitk;
#if 0
printf("brt_flush_child %lld %d\n", node->thisnodename, childnum);
brt_cursor_print(cursor);
#endif
init_dbt(&child_splitk);
r = push_some_kvpairs_down(t, node, childnum,
&child_did_split, &childa, &childb, &child_splitk, 0, 0, 0);
assert(r == 0);
#if 0
printf("brt_flush_child done %lld %d\n", node->thisnodename, childnum);
brt_cursor_print(cursor);
#endif
if (child_did_split) {
int i;
for (i=cursor->path_len-1; i >= 0; i--) {
if (cursor->path[i] == childa || cursor->path[i] == childb)
break;
}
assert(i == cursor->path_len-1);
while (child_did_split) {
child_did_split = 0;
#if 0
printf("child_did_split %lld %lld\n", childa->thisnodename, childb->thisnodename);
#endif
if (i == 0) {
CACHEKEY *rootp = calculate_root_offset_pointer(t);
r = brt_init_new_root(t, childa, childb, child_splitk, rootp);
assert(r == 0);
r = cachetable_unpin(t->cf, *rootp, 0);
assert(r == 0);
} else {
BRTNODE upnode;
assert(i > 0);
i = i-1;
upnode = cursor->path[i];
childnum = cursor->pathcnum[i];
r = handle_split_of_child(t, upnode, childnum,
childa, childb, &child_splitk,
&child_did_split, &childa, &childb, &child_splitk,
0, 0);
assert(r == 0);
}
}
}
}
/*
* Add a cursor to child of a node. Increment the cursor count on the child. Flush the buffer associated with the child.
*/
void brt_node_add_cursor(BRTNODE node, int childnum, BRT_CURSOR cursor) {
if (node->height > 0) {
if (0) printf("brt_node_add_cursor %lld %d %p\n", node->thisnodename, childnum, cursor);
node->u.n.n_cursors[childnum] += 1;
#if 0
if (node->u.n.n_bytes_in_hashtable[childnum] > 0) {
if (0) printf("hashtable not empty %d\n", node->u.n.n_bytes_in_hashtable[childnum]);
brt_node_flush_child(cursor->brt, node, childnum, cursor);
}
#endif
}
}
/*
* Remove a cursor from the child of a node. Decrement the cursor count on the child.
*/
void brt_node_remove_cursor(BRTNODE node, int childnum, BRT_CURSOR cursor __attribute__((unused))) {
if (node->height > 0) {
if (0) printf("brt_node_remove_cursor %lld %d %p\n", node->thisnodename, childnum, cursor);
assert(node->u.n.n_cursors[childnum] > 0);
node->u.n.n_cursors[childnum] -= 1;
}
}
void brt_update_cursors_new_root(BRT t, BRTNODE newroot, BRTNODE left, BRTNODE right) {
BRT_CURSOR cursor;
if (0) printf("brt_update_cursors_new_root %lld %lld %lld\n", newroot->thisnodename,
left->thisnodename, right->thisnodename);
for (cursor = t->cursors_head; cursor; cursor = cursor->next) {
if (brt_cursor_active(cursor)) {
brt_cursor_new_root(cursor, t, newroot, left, right);
}
}
}
void brt_update_cursors_leaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right) {
BRT_CURSOR cursor;
if (0) printf("brt_update_cursors_leaf_split %lld %lld %lld\n", oldnode->thisnodename,
left->thisnodename, right->thisnodename);
for (cursor = t->cursors_head; cursor; cursor = cursor->next) {
if (brt_cursor_active(cursor)) {
brt_cursor_leaf_split(cursor, t, oldnode, left, right);
}
}
}
void brt_update_cursors_nonleaf_expand(BRT t, BRTNODE node, int childnum, BRTNODE left, BRTNODE right) {
BRT_CURSOR cursor;
if (0) printf("brt_update_cursors_nonleaf_expand %lld h=%d c=%d nc=%d %lld %lld\n", node->thisnodename, node->height, childnum,
node->u.n.n_children, left->thisnodename, right->thisnodename);
for (cursor = t->cursors_head; cursor; cursor = cursor->next) {
if (brt_cursor_active(cursor)) {
brt_cursor_nonleaf_expand(cursor, t, node, childnum, left, right);
}
}
}
void brt_update_cursors_nonleaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right) {
BRT_CURSOR cursor;
if (0) printf("brt_update_cursors_nonleaf_split %lld %lld %lld\n", oldnode->thisnodename,
left->thisnodename, right->thisnodename);
for (cursor = t->cursors_head; cursor; cursor = cursor->next) {
if (brt_cursor_active(cursor)) {
brt_cursor_nonleaf_split(cursor, t, oldnode, left, right);
}
}
}
void brt_cursor_new_root(BRT_CURSOR cursor, BRT t, BRTNODE newroot, BRTNODE left, BRTNODE right) {
int i;
int childnum;
int r;
void *v;
assert(!brt_cursor_path_full(cursor));
if (0) printf("brt_cursor_new_root %p %lld newroot %lld\n", cursor, cursor->path[0]->thisnodename, newroot->thisnodename);
assert(cursor->path[0] == left || cursor->path[0] == right);
/* make room for the newroot at the path base */
for (i=cursor->path_len; i>0; i--) {
cursor->path[i] = cursor->path[i-1];
cursor->pathcnum[i] = cursor->pathcnum[i-1];
}
cursor->path_len++;
/* shift the newroot */
cursor->path[0] = newroot;
childnum = cursor->path[1] == left ? 0 : 1;
cursor->pathcnum[0] = childnum;
r = cachetable_maybe_get_and_pin(t->cf, newroot->thisnodename, &v);
assert(r == 0 && v == newroot);
brt_node_add_cursor(newroot, childnum, cursor);
}
void brt_cursor_leaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right) {
int r;
BRTNODE newnode;
PMA pma;
void *v;
assert(oldnode->height == 0);
if (cursor->path[cursor->path_len-1] == oldnode) {
assert(left->height == 0 && right->height == 0);
r = pma_cursor_get_pma(cursor->pmacurs, &pma);
assert(r == 0);
if (pma == left->u.l.buffer)
newnode = left;
else if (pma == right->u.l.buffer)
newnode = right;
else
newnode = 0;
assert(newnode);
if (0) printf("brt_cursor_leaf_split %p oldnode %lld newnode %lld\n", cursor,
oldnode->thisnodename, newnode->thisnodename);
r = cachetable_unpin(t->cf, oldnode->thisnodename, 0);
assert(r == 0);
r = cachetable_maybe_get_and_pin(t->cf, newnode->thisnodename, &v);
assert(r == 0 && v == newnode);
cursor->path[cursor->path_len-1] = newnode;
}
}
void brt_cursor_nonleaf_expand(BRT_CURSOR cursor, BRT t __attribute__((unused)), BRTNODE node, int childnum, BRTNODE left, BRTNODE right) {
int i;
int oldchildnum, newchildnum;
assert(node->height > 0);
#if 0
i = cursor->path_len - node->height - 1;
if (i < 0)
i = cursor->path_len - 1;
if (i >= 0 && cursor->path[i] == node) {
#else
if (0) brt_cursor_print(cursor);
for (i = 0; i < cursor->path_len; i++)
if (cursor->path[i] == node)
break;
if (i < cursor->path_len) {
#endif
if (cursor->pathcnum[i] < childnum)
return;
if (cursor->pathcnum[i] > childnum) {
setnewchild:
oldchildnum = cursor->pathcnum[i];
newchildnum = oldchildnum + 1;
brt_node_remove_cursor(node, oldchildnum, cursor);
brt_node_add_cursor(node, newchildnum, cursor);
cursor->pathcnum[i] = newchildnum;
} else if (i+1 < cursor->path_len) {
assert(cursor->path[i+1] == left || cursor->path[i+1] == right);
if (cursor->path[i+1] == right) {
goto setnewchild;
}
}
}
}
void brt_cursor_nonleaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right) {
int i;
BRTNODE newnode;
int r;
void *v;
int childnum;
assert(oldnode->height > 0 && left->height > 0 && right->height > 0);
#if 0
i = cursor->path_len - oldnode->height - 1;
if (i < 0)
i = cursor->path_len - 1;
if (i >= 0 && cursor->path[i] == oldnode) {
#else
for (i = 0; i < cursor->path_len; i++)
if (cursor->path[i] == oldnode)
break;
if (i < cursor->path_len) {
#endif
childnum = cursor->pathcnum[i];
brt_node_remove_cursor(oldnode, childnum, cursor);
if (childnum < left->u.n.n_children) {
newnode = left;
} else {
newnode = right;
childnum -= left->u.n.n_children;
}
if (0) printf("brt_cursor_nonleaf_split %p oldnode %lld newnode %lld\n",
cursor, oldnode->thisnodename, newnode->thisnodename);
r = cachetable_unpin(t->cf, oldnode->thisnodename, 0);
assert(r == 0);
r = cachetable_maybe_get_and_pin(t->cf, newnode->thisnodename, &v);
assert(r == 0 && v == newnode);
brt_node_add_cursor(newnode, childnum, cursor);
cursor->path[i] = newnode;
cursor->pathcnum[i] = childnum;
}
}
int brt_cursor (BRT brt, BRT_CURSOR*cursor) {
BRT_CURSOR MALLOC(result);
......@@ -1501,8 +1818,11 @@ int brt_cursor (BRT brt, BRT_CURSOR*cursor) {
brt->cursors_head = result;
*cursor = result;
return 0;
}
static int unpin_cursor(BRT_CURSOR);
int brt_cursor_close (BRT_CURSOR curs) {
BRT brt = curs->brt;
int r=unpin_cursor(curs);
......@@ -1526,9 +1846,28 @@ int brt_cursor_close (BRT_CURSOR curs) {
return r;
}
/*
* Print the path of a cursor
*/
void brt_cursor_print(BRT_CURSOR cursor) {
int i;
printf("cursor %p: ", cursor);
for (i=0; i<cursor->path_len; i++) {
printf("%lld", cursor->path[i]->thisnodename);
if (cursor->path[i]->height > 0)
printf(",%d ", cursor->pathcnum[i]);
else
printf(" ");
}
printf("\n");
}
int brtcurs_set_position_last (BRT_CURSOR cursor, diskoff off) {
BRT brt=cursor->brt;
void *node_v;
int more;
int r = cachetable_get_and_pin(brt->cf, off, &node_v,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize);
if (r!=0) {
......@@ -1542,19 +1881,29 @@ int brtcurs_set_position_last (BRT_CURSOR cursor, diskoff off) {
int childnum = node->u.n.n_children-1;
try_prev_child:
cursor->pathcnum[cursor->path_len-1] = childnum;
r=brtcurs_set_position_last (cursor, node->u.n.children[childnum]);
if (r==DB_NOTFOUND) {
brt_node_add_cursor(node, childnum, cursor);
for (;;) {
more = node->u.n.n_bytes_in_hashtable[childnum];
if (more == 0)
break;
brt_flush_child(cursor->brt, node, childnum, cursor);
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
}
r=brtcurs_set_position_last (cursor, node->u.n.children[childnum]);
if (r == 0)
return 0;
assert(node == cursor->path[cursor->path_len-1]);
brt_node_remove_cursor(node, childnum, cursor);
if (r==DB_NOTFOUND) {
if (childnum>0) {
childnum--;
goto try_prev_child;
}
}
if (r!=0) {
/* we ran out of children without finding anything, or had some other trouble. */
cursor->path_len--;
goto died0;
}
return 0;
/* we ran out of children without finding anything, or had some other trouble. */
cursor->path_len--;
goto died0;
} else {
r=pma_cursor(node->u.l.buffer, &cursor->pmacurs);
if (r!=0) {
......@@ -1571,6 +1920,8 @@ int brtcurs_set_position_last (BRT_CURSOR cursor, diskoff off) {
int brtcurs_set_position_first (BRT_CURSOR cursor, diskoff off) {
BRT brt=cursor->brt;
void *node_v;
int more;
int r = cachetable_get_and_pin(brt->cf, off, &node_v,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize);
if (r!=0) {
......@@ -1584,19 +1935,30 @@ int brtcurs_set_position_first (BRT_CURSOR cursor, diskoff off) {
int childnum = 0;
try_next_child:
cursor->pathcnum[cursor->path_len-1] = childnum;
r=brtcurs_set_position_first (cursor, node->u.n.children[childnum]);
if (r==DB_NOTFOUND) {
brt_node_add_cursor(node, childnum, cursor);
for (;;) {
more = node->u.n.n_bytes_in_hashtable[childnum];
if (more == 0)
break;
brt_flush_child(cursor->brt, node, childnum, cursor);
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
}
r=brtcurs_set_position_first (cursor, node->u.n.children[childnum]);
if (r == 0)
return r;
assert(node == cursor->path[cursor->path_len-1]);
brt_node_remove_cursor(node, childnum, cursor);
if (r==DB_NOTFOUND) {
if (childnum+1<node->u.n.n_children) {
childnum++;
goto try_next_child;
}
}
if (r!=0) {
/* we ran out of children without finding anything, or had some other trouble. */
cursor->path_len--;
goto died0;
}
return 0;
/* we ran out of children without finding anything, or had some other trouble. */
cursor->path_len--;
goto died0;
} else {
r=pma_cursor(node->u.l.buffer, &cursor->pmacurs);
if (r!=0) {
......@@ -1610,15 +1972,62 @@ int brtcurs_set_position_first (BRT_CURSOR cursor, diskoff off) {
}
}
int brtcurs_set_position_next2(BRT_CURSOR cursor) {
BRTNODE node;
int childnum;
int r;
int more;
assert(cursor->path_len > 0);
/* pop the node and childnum from the cursor path */
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
cursor->path_len -= 1;
cachetable_unpin(cursor->brt->cf, node->thisnodename, 0);
if (brt_cursor_path_empty(cursor))
return DB_NOTFOUND;
/* set position first in the next right tree */
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
assert(node->height > 0);
brt_node_remove_cursor(node, childnum, cursor);
childnum += 1;
while (childnum < node->u.n.n_children) {
cursor->pathcnum[cursor->path_len-1] = childnum;
brt_node_add_cursor(node, childnum, cursor);
for (;;) {
more = node->u.n.n_bytes_in_hashtable[childnum];
if (more == 0)
break;
brt_flush_child(cursor->brt, node, childnum, cursor);
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
}
r = brtcurs_set_position_first(cursor, node->u.n.children[childnum]);
if (r == 0)
return 0;
assert(node == cursor->path[cursor->path_len-1]);
brt_node_remove_cursor(node, childnum, cursor);
childnum += 1;
}
return brtcurs_set_position_next2(cursor);
}
/* requires that the cursor is initialized. */
int brtcurs_set_position_next (BRT_CURSOR cursor) {
int r = pma_cursor_set_position_next(cursor->pmacurs);
if (r==DB_NOTFOUND) {
/* We fell off the end of the pma. */
if (cursor->path_len==1) return DB_NOTFOUND;
fprintf(stderr, "Need to deal with falling off the end of the pma in a cursor\n");
/* Part of the trickyness is we need to leave the cursor pointing at the current (possibly deleted) value if there is no next value. */
abort();
/* Part of the trickyness is we need to leave the cursor pointing at the current (possibly deleted) value if there is no next value. */
r = pma_cursor_free(&cursor->pmacurs);
assert(r == 0);
return brtcurs_set_position_next2(cursor);
}
return 0;
}
......@@ -1628,6 +2037,8 @@ static int unpin_cursor (BRT_CURSOR cursor) {
int i;
int r=0;
for (i=0; i<cursor->path_len; i++) {
BRTNODE node = cursor->path[i];
brt_node_remove_cursor(node, cursor->pathcnum[i], cursor);
int r2 = cachetable_unpin(brt->cf, cursor->path[i]->thisnodename, 0);
if (r==0) r=r2;
}
......@@ -1661,7 +2072,8 @@ int brt_c_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags) {
do_db_first:
r=unpin_cursor(cursor); if (r!=0) goto died0;
r=brtcurs_set_position_first(cursor, *rootp); if (r!=0) goto died0;
r=pma_cget_current(cursor->pmacurs, kbt, vbt);
// brt_cursor_print(cursor);
r=pma_cget_current(cursor->pmacurs, kbt, vbt);
break;
case DB_NEXT:
if (cursor->path_len<=0) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment