Commit cbc159bf authored by Rich Prohaska's avatar Rich Prohaska

add the txn parameter to the brt_search function. addresses #316

git-svn-id: file:///svn/tokudb@1940 c7de825b-a66e-492c-adef-691d508d4ae1
parent dd061396
......@@ -1843,15 +1843,15 @@ static inline void brt_split_init(BRT_SPLIT *split) {
toku_init_dbt(&split->splitk);
}
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split);
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKUTXN txn);
/* search in a node's child */
static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split) {
static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKUTXN txn) {
int r, rr;
/* if the child's buffer is not empty then try to empty it */
if (node->u.n.n_bytes_in_buffer[childnum] > 0) {
rr = push_some_brt_cmds_down(brt, node, childnum, &split->did_split, &split->nodea, &split->nodeb, &split->splitk, 0, 0);
rr = push_some_brt_cmds_down(brt, node, childnum, &split->did_split, &split->nodea, &split->nodeb, &split->splitk, 0, txn);
assert(rr == 0);
/* push down may cause a child split, so childnum may not be appropriate, and the node itself may split, so retry */
return EAGAIN;
......@@ -1864,11 +1864,11 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
for (;;) {
BRTNODE childnode = node_v;
BRT_SPLIT childsplit; brt_split_init(&childsplit);
r = brt_search_node(brt, childnode, search, newkey, newval, &childsplit);
r = brt_search_node(brt, childnode, search, newkey, newval, &childsplit, txn);
if (childsplit.did_split) {
rr = handle_split_of_child(brt, node, childnum, childsplit.nodea, childsplit.nodeb, &childsplit.splitk,
&split->did_split, &split->nodea, &split->nodeb, &split->splitk, 0);
&split->did_split, &split->nodea, &split->nodeb, &split->splitk, txn);
assert(rr == 0);
break;
} else {
......@@ -1883,7 +1883,7 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
return r;
}
static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split) {
static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKUTXN txn) {
int r = DB_NOTFOUND;
int c;
......@@ -1901,7 +1901,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search,
if (search->compare(search,
toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)),
brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) {
r = brt_search_child(brt, node, child[c], search, newkey, newval, split);
r = brt_search_child(brt, node, child[c], search, newkey, newval, split, txn);
if (r == 0 || r == EAGAIN)
break;
}
......@@ -1909,26 +1909,26 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search,
/* check the first (left) or last (right) node if nothing has been found */
if (r == DB_NOTFOUND && c == node->u.n.n_children-1)
r = brt_search_child(brt, node, child[c], search, newkey, newval, split);
r = brt_search_child(brt, node, child[c], search, newkey, newval, split, txn);
return r;
}
static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split) {
brt = brt; split = split;
static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKUTXN txn) {
brt = brt; split = split; txn = txn;
PMA pma = node->u.l.buffer;
int r = toku_pma_search(pma, search, newkey, newval);
return r;
}
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split) {
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKUTXN txn) {
if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, newkey, newval, split);
return brt_search_nonleaf_node(brt, node, search, newkey, newval, split, txn);
else
return brt_search_leaf_node(brt, node, search, newkey, newval, split);
return brt_search_leaf_node(brt, node, search, newkey, newval, split, txn);
}
int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval) {
int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKUTXN txn) {
int r, rr;
rr = toku_read_and_pin_brt_header(brt->cf, &brt->h);
......@@ -1937,27 +1937,28 @@ int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval) {
CACHEKEY *rootp;
rootp = toku_calculate_root_offset_pointer(brt);
for (;;) {
void *node_v;
rr = toku_cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
assert(rr == 0);
void *node_v;
rr = toku_cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
assert(rr == 0);
BRTNODE node = node_v;
BRTNODE node = node_v;
for (;;) {
BRT_SPLIT split; brt_split_init(&split);
r = brt_search_node(brt, node, search, newkey, newval, &split);
r = brt_search_node(brt, node, search, newkey, newval, &split, txn);
if (split.did_split) {
rr = brt_init_new_root(brt, split.nodea, split.nodeb, split.splitk, rootp, 0, &node);
assert(rr == 0);
}
rr = unpin_brtnode(brt, node);
assert(rr == 0);
if (r != EAGAIN)
if (r != EAGAIN)
break;
}
rr = unpin_brtnode(brt, node);
assert(rr == 0);
rr = toku_unpin_brt_header(brt);
assert(rr == 0);
......@@ -2033,7 +2034,7 @@ static int brt_cursor_compare_set(brt_search_t *search, DBT *x, DBT *y) {
return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return min xy: kv <= xy */
}
static int brt_cursor_current(BRT_CURSOR cursor, int get_flags, DBT *outkey, DBT *outval) {
static int brt_cursor_current(BRT_CURSOR cursor, int get_flags, DBT *outkey, DBT *outval, TOKUTXN txn) {
if (brt_cursor_not_set(cursor))
return EINVAL;
if ((get_flags & 256) == 0) {
......@@ -2041,7 +2042,7 @@ static int brt_cursor_current(BRT_CURSOR cursor, int get_flags, DBT *outkey, DBT
DBT newval; toku_init_dbt(&newval);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
int r = toku_brt_search(cursor->brt, &search, &newkey, &newval);
int r = toku_brt_search(cursor->brt, &search, &newkey, &newval, txn);
if (r != 0 || compare_kv_xy(cursor->brt, &cursor->key, &cursor->val, &newkey, &newval) != 0)
return DB_KEYEMPTY;
}
......@@ -2049,11 +2050,11 @@ static int brt_cursor_current(BRT_CURSOR cursor, int get_flags, DBT *outkey, DBT
}
/* search for the first kv pair that matches the search object */
static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval) {
static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKUTXN txn) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval);
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, txn);
if (r == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
r = brt_cursor_copyout(cursor, outkey, outval);
......@@ -2064,11 +2065,11 @@ static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outke
}
/* search for the kv pair that matches the search object and is equal to kv */
static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval) {
static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKUTXN txn) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval);
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, txn);
if (r == 0) {
if (compare_kv_xy(cursor->brt, search->k, search->v, &newkey, &newval) == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
......@@ -2082,11 +2083,11 @@ static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, D
}
/* search for the kv pair that matches the search object and is equal to k */
static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval) {
static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKUTXN txn) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval);
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, txn);
if (r == 0) {
if (compare_k_x(cursor->brt, search->k, &newkey) == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
......@@ -2104,14 +2105,14 @@ static int brt_cursor_compare_one(brt_search_t *search, DBT *x, DBT *y) {
return 1;
}
static int brt_cursor_first(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
static int brt_cursor_first(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval);
return brt_cursor_search(cursor, &search, outkey, outval, txn);
}
static int brt_cursor_last(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
static int brt_cursor_last(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_RIGHT, 0, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval);
return brt_cursor_search(cursor, &search, outkey, outval, txn);
}
static int brt_cursor_compare_next(brt_search_t *search, DBT *x, DBT *y) {
......@@ -2119,9 +2120,9 @@ static int brt_cursor_compare_next(brt_search_t *search, DBT *x, DBT *y) {
return compare_kv_xy(brt, search->k, search->v, x, y) < 0; /* return min xy: kv < xy */
}
static int brt_cursor_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
static int brt_cursor_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval);
return brt_cursor_search(cursor, &search, outkey, outval, txn);
}
static int brt_cursor_compare_next_nodup(brt_search_t *search, DBT *x, DBT *y) {
......@@ -2129,9 +2130,9 @@ static int brt_cursor_compare_next_nodup(brt_search_t *search, DBT *x, DBT *y) {
return compare_k_x(brt, search->k, x) < 0; /* return min x: k < x */
}
static int brt_cursor_next_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
static int brt_cursor_next_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_nodup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval);
return brt_cursor_search(cursor, &search, outkey, outval, txn);
}
static int brt_cursor_compare_next_dup(brt_search_t *search, DBT *x, DBT *y) {
......@@ -2143,9 +2144,9 @@ static int brt_cursor_compare_next_dup(brt_search_t *search, DBT *x, DBT *y) {
return keycmp == 0 && compare_v_y(brt, search->v, y) < 0; /* return min xy: k <= x && v < y */
}
static int brt_cursor_next_dup(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
static int brt_cursor_next_dup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_dup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, txn);
}
static int brt_cursor_compare_get_both_range(brt_search_t *search, DBT *x, DBT *y) {
......@@ -2157,9 +2158,9 @@ static int brt_cursor_compare_get_both_range(brt_search_t *search, DBT *x, DBT *
return keycmp == 0 && compare_v_y(brt, search->v, y) <= 0; /* return min xy: k <= x && v <= y */
}
static int brt_cursor_get_both_range(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval) {
static int brt_cursor_get_both_range(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_get_both_range, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, txn);
}
static int brt_cursor_compare_prev(brt_search_t *search, DBT *x, DBT *y) {
......@@ -2167,9 +2168,9 @@ static int brt_cursor_compare_prev(brt_search_t *search, DBT *x, DBT *y) {
return compare_kv_xy(brt, search->k, search->v, x, y) > 0; /* return max xy: kv > xy */
}
static int brt_cursor_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
static int brt_cursor_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval);
return brt_cursor_search(cursor, &search, outkey, outval, txn);
}
static int brt_cursor_compare_prev_nodup(brt_search_t *search, DBT *x, DBT *y) {
......@@ -2177,9 +2178,9 @@ static int brt_cursor_compare_prev_nodup(brt_search_t *search, DBT *x, DBT *y) {
return compare_k_x(brt, search->k, x) > 0; /* return max x: k > x */
}
static int brt_cursor_prev_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
static int brt_cursor_prev_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev_nodup, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval);
return brt_cursor_search(cursor, &search, outkey, outval, txn);
}
#ifdef DB_PREV_DUP
......@@ -2193,9 +2194,9 @@ static int brt_cursor_compare_prev_dup(brt_search_t *search, DBT *x, DBT *y) {
return keycmp == 0 && compare_v_y(brt, search->v, y) > 0; /* return max xy: k >= x && v > y */
}
static int brt_cursor_prev_dup(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
static int brt_cursor_prev_dup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev_dup, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, txn);
}
#endif
......@@ -2205,14 +2206,14 @@ static int brt_cursor_compare_set_range(brt_search_t *search, DBT *x, DBT *y) {
return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return kv <= xy */
}
static int brt_cursor_set(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval) {
static int brt_cursor_set(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search_eq_kv_xy(cursor, &search, outkey, outval);
return brt_cursor_search_eq_kv_xy(cursor, &search, outkey, outval, txn);
}
static int brt_cursor_set_range(BRT_CURSOR cursor, DBT *key, DBT *outkey, DBT *outval) {
static int brt_cursor_set_range(BRT_CURSOR cursor, DBT *key, DBT *outkey, DBT *outval, TOKUTXN txn) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval);
return brt_cursor_search(cursor, &search, outkey, outval, txn);
}
int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, int get_flags, TOKUTXN txn) {
......@@ -2226,63 +2227,63 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, int get_flags, T
switch (get_flags) {
case DB_CURRENT:
case DB_CURRENT+256:
r = brt_cursor_current(cursor, get_flags, key, val);
r = brt_cursor_current(cursor, get_flags, key, val, txn);
break;
case DB_FIRST:
r = brt_cursor_first(cursor, key, val);
r = brt_cursor_first(cursor, key, val, txn);
break;
case DB_LAST:
r = brt_cursor_last(cursor, key, val);
r = brt_cursor_last(cursor, key, val, txn);
break;
case DB_NEXT:
if (brt_cursor_not_set(cursor))
r = brt_cursor_first(cursor, key, val);
r = brt_cursor_first(cursor, key, val, txn);
else
r = brt_cursor_next(cursor, key, val);
r = brt_cursor_next(cursor, key, val, txn);
break;
case DB_NEXT_DUP:
if (brt_cursor_not_set(cursor))
r = EINVAL;
else
r = brt_cursor_next_dup(cursor, key, val);
r = brt_cursor_next_dup(cursor, key, val, txn);
break;
case DB_NEXT_NODUP:
if (brt_cursor_not_set(cursor))
r = brt_cursor_first(cursor, key, val);
r = brt_cursor_first(cursor, key, val, txn);
else
r = brt_cursor_next_nodup(cursor, key, val);
r = brt_cursor_next_nodup(cursor, key, val, txn);
break;
case DB_PREV:
if (brt_cursor_not_set(cursor))
r = brt_cursor_last(cursor, key, val);
r = brt_cursor_last(cursor, key, val, txn);
else
r = brt_cursor_prev(cursor, key, val);
r = brt_cursor_prev(cursor, key, val, txn);
break;
#ifdef DB_PREV_DUP
case DB_PREV_DUP:
if (brt_cursor_not_set(cursor))
r = EINVAL;
else
r = brt_cursor_prev_dup(cursor, key, val);
r = brt_cursor_prev_dup(cursor, key, val, txn);
break;
#endif
case DB_PREV_NODUP:
if (brt_cursor_not_set(cursor))
r = brt_cursor_last(cursor, key, val);
r = brt_cursor_last(cursor, key, val, txn);
else
r = brt_cursor_prev_nodup(cursor, key, val);
r = brt_cursor_prev_nodup(cursor, key, val, txn);
break;
case DB_SET:
r = brt_cursor_set(cursor, key, 0, 0, val);
r = brt_cursor_set(cursor, key, 0, 0, val, txn);
break;
case DB_SET_RANGE:
r = brt_cursor_set_range(cursor, key, key, val);
r = brt_cursor_set_range(cursor, key, key, val, txn);
break;
case DB_GET_BOTH:
r = brt_cursor_set(cursor, key, val, 0, 0);
r = brt_cursor_set(cursor, key, val, 0, 0, txn);
break;
case DB_GET_BOTH_RANGE:
r = brt_cursor_get_both_range(cursor, key, val, 0, val);
r = brt_cursor_get_both_range(cursor, key, val, 0, val, txn);
break;
default:
r = EINVAL;
......@@ -2298,7 +2299,7 @@ int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags) {
return EINVAL;
int r = 0;
if (!(flags & DB_DELETE_ANY))
r = brt_cursor_current(cursor, DB_CURRENT, 0, 0);
r = brt_cursor_current(cursor, DB_CURRENT, 0, 0, 0);
if (r == 0)
r = toku_brt_delete_both(cursor->brt, &cursor->key, &cursor->val);
return r;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment