Commit cd9e6af2 authored by Rich Prohaska's avatar Rich Prohaska

brt cursor set and set range features

git-svn-id: file:///svn/tokudb@290 c7de825b-a66e-492c-adef-691d508d4ae1
parent 99d87cf6
...@@ -1410,6 +1410,147 @@ void test_multiple_brt_cursors(int n) { ...@@ -1410,6 +1410,147 @@ void test_multiple_brt_cursors(int n) {
assert(r==0); assert(r==0);
} }
void test_brt_cursor_set(int n, int cursor_op) {
printf("test_brt_cursor_set:%d %d\n", n, cursor_op);
int r;
char fname[]="testbrt.brt";
CACHETABLE ct;
BRT brt;
BRT_CURSOR cursor;
unlink(fname);
r = brt_create_cachetable(&ct, 0);
assert(r==0);
r = open_brt(fname, 0, 1, &brt, 1<<12, ct, default_compare_fun);
assert(r==0);
int i;
DBT key, val;
int k, v;
/* insert keys 0, 10, 20 .. 10*(n-1) */
for (i=0; i<n; i++) {
k = htonl(10*i);
v = 10*i;
fill_dbt(&key, &k, sizeof k);
fill_dbt(&val, &v, sizeof v);
r = brt_insert(brt, &key, &val, 0);
assert(r == 0);
}
r = brt_cursor(brt, &cursor);
assert(r==0);
/* set cursor to random keys in set { 0, 10, 20, .. 10*(n-1) } */
for (i=0; i<n; i++) {
int vv;
v = 10*(random() % n);
k = htonl(v);
fill_dbt(&key, &k, sizeof k);
init_dbt(&val); val.flags = DB_DBT_MALLOC;
r = brt_c_get(cursor, &key, &val, cursor_op);
assert(r == 0);
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == v);
toku_free(val.data);
}
/* try to set cursor to keys not in the tree, all should fail */
for (i=0; i<10*n; i++) {
if (i % 10 == 0)
continue;
k = htonl(i);
fill_dbt(&key, &k, sizeof k);
init_dbt(&val); val.flags = DB_DBT_MALLOC;
r = brt_c_get(cursor, &key, &val, DB_SET);
assert(r == DB_NOTFOUND);
}
r = brt_cursor_close(cursor);
assert(r==0);
r = close_brt(brt);
assert(r==0);
r = cachetable_close(&ct);
assert(r==0);
}
void test_brt_cursor_set_range(int n) {
printf("test_brt_cursor_set_range:%d\n", n);
int r;
char fname[]="testbrt.brt";
CACHETABLE ct;
BRT brt;
BRT_CURSOR cursor;
unlink(fname);
r = brt_create_cachetable(&ct, 0);
assert(r==0);
r = open_brt(fname, 0, 1, &brt, 1<<12, ct, default_compare_fun);
assert(r==0);
int i;
DBT key, val;
int k, v;
/* insert keys 0, 10, 20 .. 10*(n-1) */
int max_key = 10*(n-1);
for (i=0; i<n; i++) {
k = htonl(10*i);
v = 10*i;
fill_dbt(&key, &k, sizeof k);
fill_dbt(&val, &v, sizeof v);
r = brt_insert(brt, &key, &val, 0);
assert(r == 0);
}
r = brt_cursor(brt, &cursor);
assert(r==0);
/* pick random keys v in 0 <= v < 10*n, the cursor should point
to the smallest key in the tree that is >= v */
for (i=0; i<n; i++) {
int vv;
v = random() % (10*n);
k = htonl(v);
fill_dbt(&key, &k, sizeof k);
init_dbt(&val); val.flags = DB_DBT_MALLOC;
r = brt_c_get(cursor, &key, &val, DB_SET_RANGE);
if (v > max_key)
/* there is no smallest key if v > the max key */
assert(r == DB_NOTFOUND);
else {
assert(r == 0);
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == (((v+9)/10)*10));
toku_free(val.data);
}
}
r = brt_cursor_close(cursor);
assert(r==0);
r = close_brt(brt);
assert(r==0);
r = cachetable_close(&ct);
assert(r==0);
}
int test_brt_cursor_inc = 1000;
int test_brt_cursor_limit = 10000;
void test_brt_cursor() { void test_brt_cursor() {
int n; int n;
...@@ -1417,30 +1558,36 @@ void test_brt_cursor() { ...@@ -1417,30 +1558,36 @@ void test_brt_cursor() {
test_multiple_brt_cursors(2); test_multiple_brt_cursors(2);
test_multiple_brt_cursors(3); test_multiple_brt_cursors(3);
if (1) for (n=0; n<10000; n += 100) { for (n=0; n<test_brt_cursor_limit; n += test_brt_cursor_inc) {
test_brt_cursor_first(n); memory_check_all_free(); test_brt_cursor_first(n); memory_check_all_free();
} }
if (1) for (n=0; n<10000; n += 100) { for (n=0; n<test_brt_cursor_limit; n += test_brt_cursor_inc) {
test_brt_cursor_rfirst(n); memory_check_all_free(); test_brt_cursor_rfirst(n); memory_check_all_free();
} }
if (1) for (n=0; n<10000; n += 100) { for (n=0; n<test_brt_cursor_limit; n += test_brt_cursor_inc) {
test_brt_cursor_walk(n); memory_check_all_free(); test_brt_cursor_walk(n); memory_check_all_free();
} }
if (1) for (n=0; n<10000; n += 100) { for (n=0; n<test_brt_cursor_limit; n += test_brt_cursor_inc) {
test_brt_cursor_last(n); memory_check_all_free(); test_brt_cursor_last(n); memory_check_all_free();
} }
if (1) for (n=0; n<10000; n += 100) { for (n=0; n<test_brt_cursor_limit; n += test_brt_cursor_inc) {
test_brt_cursor_first_last(n); memory_check_all_free(); test_brt_cursor_first_last(n); memory_check_all_free();
} }
if (1) for (n=0; n<10000; n += 100) { for (n=0; n<test_brt_cursor_limit; n += test_brt_cursor_inc) {
test_brt_cursor_split(n); memory_check_all_free(); test_brt_cursor_split(n); memory_check_all_free();
} }
if (1) for (n=0; n<10000; n += 100) { for (n=0; n<test_brt_cursor_limit; n += test_brt_cursor_inc) {
test_brt_cursor_rand(n); memory_check_all_free(); test_brt_cursor_rand(n); memory_check_all_free();
} }
if (1) for (n=0; n<10000; n += 100) { for (n=0; n<test_brt_cursor_limit; n += test_brt_cursor_inc) {
test_brt_cursor_rwalk(n); memory_check_all_free(); test_brt_cursor_rwalk(n); memory_check_all_free();
} }
test_brt_cursor_set(1000, DB_SET); memory_check_all_free();
test_brt_cursor_set(10000, DB_SET); memory_check_all_free();
test_brt_cursor_set(1000, DB_SET_RANGE); memory_check_all_free();
test_brt_cursor_set_range(1000); memory_check_all_free();
test_brt_cursor_set_range(10000); memory_check_all_free();
} }
void test_large_kv(int bsize, int ksize, int vsize) { void test_large_kv(int bsize, int ksize, int vsize) {
......
...@@ -1318,14 +1318,12 @@ int brt_lookup_node (BRT brt, diskoff off, DBT *k, DBT *v, DB *db) { ...@@ -1318,14 +1318,12 @@ int brt_lookup_node (BRT brt, diskoff off, DBT *k, DBT *v, DB *db) {
void *node_v; void *node_v;
int r = cachetable_get_and_pin(brt->cf, off, &node_v, int r = cachetable_get_and_pin(brt->cf, off, &node_v,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize); brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize);
BRTNODE node;
int childnum;
if (r!=0) if (r!=0)
return r; return r;
node=node_v; BRTNODE node = node_v;
// Leaves have a single mdict, where the data is found. int childnum;
if (node->height==0) { if (node->height==0) {
result = pma_lookup(node->u.l.buffer, k, v, db); result = pma_lookup(node->u.l.buffer, k, v, db);
//printf("%s:%d looked up something, got answerlen=%d\n", __FILE__, __LINE__, answerlen); //printf("%s:%d looked up something, got answerlen=%d\n", __FILE__, __LINE__, answerlen);
...@@ -1634,7 +1632,7 @@ void brt_flush_child(BRT t, BRTNODE node, int childnum, BRT_CURSOR cursor) { ...@@ -1634,7 +1632,7 @@ void brt_flush_child(BRT t, BRTNODE node, int childnum, BRT_CURSOR cursor) {
CACHEKEY *rootp = calculate_root_offset_pointer(t); CACHEKEY *rootp = calculate_root_offset_pointer(t);
r = brt_init_new_root(t, childa, childb, child_splitk, rootp); r = brt_init_new_root(t, childa, childb, child_splitk, rootp);
assert(r == 0); assert(r == 0);
r = cachetable_unpin(t->cf, *rootp, 0); r = cachetable_unpin(t->cf, *rootp, 1);
assert(r == 0); assert(r == 0);
} else { } else {
BRTNODE upnode; BRTNODE upnode;
...@@ -1812,7 +1810,7 @@ void brt_cursor_nonleaf_expand(BRT_CURSOR cursor, BRT t __attribute__((unused)), ...@@ -1812,7 +1810,7 @@ void brt_cursor_nonleaf_expand(BRT_CURSOR cursor, BRT t __attribute__((unused)),
cursor->pathcnum[i] = newchildnum; cursor->pathcnum[i] = newchildnum;
return; return;
} }
if (i == cursor->path_len-1 && cursor->op == DB_PREV) { if (i == cursor->path_len-1 && (cursor->op == DB_PREV || cursor->op == DB_LAST)) {
goto setnewchild; goto setnewchild;
} }
if (i+1 < cursor->path_len) { if (i+1 < cursor->path_len) {
...@@ -2168,6 +2166,121 @@ int brtcurs_set_position_prev (BRT_CURSOR cursor) { ...@@ -2168,6 +2166,121 @@ int brtcurs_set_position_prev (BRT_CURSOR cursor) {
return 0; return 0;
} }
int brtcurs_set_key(BRT_CURSOR cursor, diskoff off, DBT *key, DB *db) {
BRT brt = cursor->brt;
void *node_v;
int r;
r = cachetable_get_and_pin(brt->cf, off, &node_v, brtnode_flush_callback,
brtnode_fetch_callback, (void*)(long)brt->h->nodesize);
if (r != 0)
return r;
BRTNODE node = node_v;
int childnum;
if (node->height > 0) {
cursor->path_len += 1;
for (;;) {
childnum = brtnode_which_child(node, key, brt, db);
cursor->path[cursor->path_len-1] = node;
cursor->pathcnum[cursor->path_len-1] = childnum;
brt_node_add_cursor(node, childnum, cursor);
int more = node->u.n.n_bytes_in_hashtable[childnum];
if (more > 0) {
brt_flush_child(cursor->brt, node, childnum, cursor);
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
/* the node may have split. search the node keys again */
continue;
}
break;
}
r = brtcurs_set_key(cursor, node->u.n.children[childnum], key, db);
if (r != 0)
brt_node_remove_cursor(node, childnum, cursor);
} else {
cursor->path_len += 1;
cursor->path[cursor->path_len-1] = node;
r = pma_cursor(node->u.l.buffer, &cursor->pmacurs);
if (r == 0) {
r = pma_cursor_set_key(cursor->pmacurs, key, db);
if (r != 0) {
int rr = pma_cursor_free(&cursor->pmacurs);
assert(rr == 0);
}
}
}
if (r != 0) {
cursor->path_len -= 1;
cachetable_unpin(brt->cf, off, 0);
}
return r;
}
int brtcurs_set_range(BRT_CURSOR cursor, diskoff off, DBT *key, DB *db) {
BRT brt = cursor->brt;
void *node_v;
int r;
r = cachetable_get_and_pin(brt->cf, off, &node_v, brtnode_flush_callback,
brtnode_fetch_callback, (void*)(long)brt->h->nodesize);
if (r != 0)
return r;
BRTNODE node = node_v;
int childnum;
if (node->height > 0) {
cursor->path_len += 1;
/* select a subtree by key */
childnum = brtnode_which_child(node, key, brt, db);
next_child:
for (;;) {
cursor->path[cursor->path_len-1] = node;
cursor->pathcnum[cursor->path_len-1] = childnum;
brt_node_add_cursor(node, childnum, cursor);
int more = node->u.n.n_bytes_in_hashtable[childnum];
if (more > 0) {
brt_flush_child(cursor->brt, node, childnum, cursor);
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
continue;
}
break;
}
r = brtcurs_set_range(cursor, node->u.n.children[childnum], key, db);
if (r != 0) {
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
/* no key in the child subtree is >= key, need to search the next child */
childnum += 1;
if (0) printf("set_range %d %d\n", childnum, node->u.n.n_children);
if (childnum < node->u.n.n_children)
goto next_child;
}
} else {
cursor->path_len += 1;
cursor->path[cursor->path_len-1] = node;
r = pma_cursor(node->u.l.buffer, &cursor->pmacurs);
if (r == 0) {
r = pma_cursor_set_range(cursor->pmacurs, key, db);
if (r != 0) {
int rr = pma_cursor_free(&cursor->pmacurs);
assert(rr == 0);
}
}
}
if (r != 0) {
cursor->path_len -= 1;
cachetable_unpin(brt->cf, off, 0);
}
return r;
}
static int unpin_cursor (BRT_CURSOR cursor) { static int unpin_cursor (BRT_CURSOR cursor) {
BRT brt=cursor->brt; BRT brt=cursor->brt;
int i; int i;
...@@ -2252,6 +2365,23 @@ int brt_c_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags) { ...@@ -2252,6 +2365,23 @@ int brt_c_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags) {
r = pma_cget_current(cursor->pmacurs, kbt, vbt); if (r!=0) goto died0; r = pma_cget_current(cursor->pmacurs, kbt, vbt); if (r!=0) goto died0;
if (r == 0) assert_cursor_path(cursor); if (r == 0) assert_cursor_path(cursor);
break; break;
case DB_SET:
r = unpin_cursor(cursor);
assert(r == 0);
r = brtcurs_set_key(cursor, *rootp, kbt, 0);
if (r != 0) goto died0;
r = pma_cget_current(cursor->pmacurs, kbt, vbt);
if (r != 0) goto died0;
break;
case DB_SET_RANGE:
r = unpin_cursor(cursor);
assert(r == 0);
r = brtcurs_set_range(cursor, *rootp, kbt, 0);
if (r != 0) goto died0;
r = pma_cget_current(cursor->pmacurs, kbt, vbt);
if (r != 0) goto died0;
break;
break;
default: default:
fprintf(stderr, "%s:%d c_get(...,%d) not ready\n", __FILE__, __LINE__, flags); fprintf(stderr, "%s:%d c_get(...,%d) not ready\n", __FILE__, __LINE__, flags);
abort(); abort();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment