Commit 53b6dd9b authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Write the brt keyrange code. Untested. Addresses #764.

git-svn-id: file:///svn/tokudb@3852 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3efba711
......@@ -2551,14 +2551,10 @@ static inline void brt_split_init(BRT_SPLIT *split) {
toku_init_dbt(&split->splitk);
}
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval,
u_int64_t *est_left,
BRT_SPLIT *split, TOKULOGGER logger);
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger);
/* search in a node's child */
static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval,
u_int64_t *est_left, // estimate number of items to left.
BRT_SPLIT *split, TOKULOGGER logger) {
static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger) {
int r, rr;
/* if the child's buffer is not empty then try to empty it */
......@@ -2576,7 +2572,7 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
for (;;) {
BRTNODE childnode = node_v;
BRT_SPLIT childsplit; brt_split_init(&childsplit);
r = brt_search_node(brt, childnode, search, newkey, newval, est_left, &childsplit, logger);
r = brt_search_node(brt, childnode, search, newkey, newval, &childsplit, logger);
if (childsplit.did_split) {
rr = handle_split_of_child(brt, node, childnum, childsplit.nodea, childsplit.nodeb, &childsplit.splitk,
&split->did_split, &split->nodea, &split->nodeb, &split->splitk, logger);
......@@ -2594,7 +2590,7 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
return r;
}
static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, u_int64_t *est_left, BRT_SPLIT *split, TOKULOGGER logger) {
static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger) {
int r;
int c;
......@@ -2612,38 +2608,15 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search,
if (search->compare(search,
toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)),
brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) {
r = brt_search_child(brt, node, child[c], search, newkey, newval, est_left, split, logger);
if (r == 0) {
if (est_left) {
int i;
for (i=0; i<c; i++)
*est_left += BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, child[i]);
}
return r;
} else if (r == EAGAIN) {
r = brt_search_child(brt, node, child[c], search, newkey, newval, split, logger);
if (r == 0 || r == EAGAIN) {
return r;
}
}
}
/* check the first (left) or last (right) node if nothing has been found */
r = brt_search_child(brt, node, child[c], search, newkey, newval, est_left, split, logger);
if (r == 0) {
if (est_left) {
int i;
for (i=0; i+1<c; i++)
*est_left += BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, child[i]);
}
return r;
} else if (r == DB_NOTFOUND) {
if (est_left) {
int i;
for (i=0; i<c; i++)
*est_left += BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, child[i]);
}
return r;
}
return r;
return brt_search_child(brt, node, child[c], search, newkey, newval, split, logger);
}
int pair_leafval_bessel_le_committed (u_int32_t klen, void *kval,
......@@ -2691,7 +2664,7 @@ static int bessel_from_search_t (OMTVALUE lev, void *extra) {
LESWITCHCALL(leafval, pair_leafval_bessel, search);
}
static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, u_int64_t *est_left) {
static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval) {
// Now we have to convert from brt_search_t to the bessel function with a direction. What a pain...
int direction;
switch (search->direction) {
......@@ -2707,16 +2680,7 @@ static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT
search,
direction,
&datav, &idx);
if (r!=0) {
if (est_left) {
switch (search->direction) {
case BRT_SEARCH_LEFT: *est_left = toku_omt_size(node->u.l.buffer); return 0;
case BRT_SEARCH_RIGHT: *est_left = 0; return 0;
}
}
return r;
}
if (est_left) *est_left = idx;
if (r!=0) return r;
LEAFENTRY le = datav;
if (le_is_provdel(le)) {
......@@ -2757,23 +2721,20 @@ static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT
return 0;
}
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval,
u_int64_t *est_left,
BRT_SPLIT *split, TOKULOGGER logger) {
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger) {
if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, newkey, newval, est_left, split, logger);
return brt_search_nonleaf_node(brt, node, search, newkey, newval, split, logger);
else
return brt_search_leaf_node(brt, node, search, newkey, newval, est_left);
return brt_search_leaf_node(brt, node, search, newkey, newval);
}
int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, u_int64_t *est_left, TOKULOGGER logger) {
int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger) {
int r, rr;
rr = toku_read_and_pin_brt_header(brt->cf, &brt->h);
assert(rr == 0);
CACHEKEY *rootp;
rootp = toku_calculate_root_offset_pointer(brt);
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt);
void *node_v;
rr = toku_cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
......@@ -2794,7 +2755,7 @@ int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, u_i
for (;;) {
BRT_SPLIT split; brt_split_init(&split);
r = brt_search_node(brt, node, search, newkey, newval, est_left, &split, logger);
r = brt_search_node(brt, node, search, newkey, newval, &split, logger);
if (split.did_split) {
rr = brt_init_new_root(brt, split.nodea, split.nodeb, split.splitk, rootp, 0, &node);
......@@ -2930,7 +2891,7 @@ static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outva
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
r = toku_brt_search(cursor->brt, &search, &newkey, &newval, (u_int64_t*)0,logger);
r = toku_brt_search(cursor->brt, &search, &newkey, &newval, logger);
if (r != 0 || compare_kv_xy(cursor->brt, &cursor->key, &cursor->val, &newkey, &newval) != 0)
r = DB_KEYEMPTY;
dbt_cleanup(&newkey);
......@@ -2945,7 +2906,7 @@ static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outke
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, (u_int64_t*)0, logger);
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger);
if (r == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
r = brt_cursor_copyout(cursor, outkey, outval);
......@@ -2960,7 +2921,7 @@ static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, D
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, (u_int64_t*)0, logger);
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger);
if (r == 0) {
if (compare_kv_xy(cursor->brt, search->k, search->v, &newkey, &newval) == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
......@@ -2978,7 +2939,7 @@ static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, (u_int64_t*)0, logger);
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger);
if (r == 0) {
if (compare_k_x(cursor->brt, search->k, &newkey) == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
......@@ -3183,6 +3144,68 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, int get_flags, T
return r;
}
static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename, DBT *key, u_int64_t *less, u_int64_t *equal, u_int64_t *greater) {
BRTNODE node;
{
void *node_v;
int rr = toku_cachetable_get_and_pin(brt->cf, nodename, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
assert(rr == 0);
node = node_v;
}
if (node->height>0) {
int prevcomp=-1;
int i;
int comp;
for (i=0; i+1<node->u.n.n_children; i++) {
struct kv_pair *pivot = node->u.n.childkeys[i];
DBT dbt;
comp = brt->compare_fun(brt->db, toku_fill_dbt(&dbt, kv_pair_key(pivot), kv_pair_keylen(pivot)), key);
if (comp<0) {
*less += BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, i);
} else if (comp==0 && prevcomp==0) {
*equal += BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, i);
} else if (prevcomp>0) {
*greater += BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, i);
} else {
// prevcomp<=0 && comp>=0, but not both zero, so we have a subtree
toku_brt_keyrange_internal(brt, BNC_DISKOFF(node, i), key, less, equal, greater);
}
prevcomp=comp;
}
if (prevcomp<0) {
*greater += BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, i);
} else {
toku_brt_keyrange_internal(brt, BNC_DISKOFF(node, i), key, less, equal, greater);
}
} else {
BRT_CMD_S cmd = { BRT_INSERT, 0, .u.id={key,0}};
struct cmd_leafval_bessel_extra be = {brt, &cmd, 0};
u_int32_t idx;
int r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_bessel, &be, 0, &idx);
*less += idx;
*greater += toku_omt_size(node->u.l.buffer)-idx;
if (r==0) {
(*equal)++;
(*greater)--;
}
}
}
int toku_brt_keyrange (BRT brt, DBT *key, u_int64_t *less, u_int64_t *equal, u_int64_t *greater) {
{
int rr = toku_read_and_pin_brt_header(brt->cf, &brt->h);
assert(rr == 0);
}
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt);
*less = *equal = *greater = 0;
toku_brt_keyrange_internal (brt, *rootp, key, less, equal, greater);
{
int rr = toku_unpin_brt_header(brt);
assert(rr == 0);
}
return 0;
}
int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN txn) {
if ((flags & ~DB_DELETE_ANY) != 0)
return EINVAL;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment