Commit 082ae050 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

[t:3610] Get rid of a bunch of {{{toku_cacheable_get_and_pin}}} calls in favor...

[t:3610] Get rid of a bunch of {{{toku_cacheable_get_and_pin}}} calls in favor of the {{{toku_pin_brtnode}}} functions. Refs #3683.

git-svn-id: file:///svn/toku/tokudb@32712 c7de825b-a66e-492c-adef-691d508d4ae1
parent aff41cf3
...@@ -319,7 +319,7 @@ void toku_pin_brtnode_holding_lock (BRT brt, BLOCKNUM blocknum, u_int32_t fullha ...@@ -319,7 +319,7 @@ void toku_pin_brtnode_holding_lock (BRT brt, BLOCKNUM blocknum, u_int32_t fullha
} }
static inline void static inline void
toku_verify_estimates (BRT t, BRTNODE node); toku_verify_estimates (BRT t, BRTNODE node, ANCESTORS, struct pivot_bounds const * const bounds);
void toku_unpin_brtnode (BRT brt, BRTNODE node) void toku_unpin_brtnode (BRT brt, BRTNODE node)
// Effect: Unpin a brt node. // Effect: Unpin a brt node.
...@@ -416,9 +416,29 @@ fixup_child_estimates (BRTNODE node, int childnum_of_node, BRTNODE child, BOOL d ...@@ -416,9 +416,29 @@ fixup_child_estimates (BRTNODE node, int childnum_of_node, BRTNODE child, BOOL d
} }
} }
static struct kv_pair const *prepivotkey (BRTNODE node, int childnum, struct kv_pair const * const lower_bound_exclusive) {
if (childnum==0)
return lower_bound_exclusive;
else {
return node->childkeys[childnum-1];
}
}
static struct kv_pair const *postpivotkey (BRTNODE node, int childnum, struct kv_pair const * const upper_bound_inclusive) {
if (childnum+1 == node->n_children)
return upper_bound_inclusive;
else {
return node->childkeys[childnum];
}
}
static struct pivot_bounds next_pivot_keys (BRTNODE node, int childnum, struct pivot_bounds const * const old_pb) {
struct pivot_bounds pb = {.lower_bound_exclusive = prepivotkey(node, childnum, old_pb->lower_bound_exclusive),
.upper_bound_inclusive = postpivotkey(node, childnum, old_pb->upper_bound_inclusive)};
return pb;
}
static inline void static inline void
toku_verify_estimates (BRT t, BRTNODE node) { toku_verify_estimates (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds) {
int childnum; int childnum;
for (childnum=0; childnum<node->n_children; childnum++) { for (childnum=0; childnum<node->n_children; childnum++) {
// we'll just do this estimate // we'll just do this estimate
...@@ -426,27 +446,14 @@ toku_verify_estimates (BRT t, BRTNODE node) { ...@@ -426,27 +446,14 @@ toku_verify_estimates (BRT t, BRTNODE node) {
// can only check the state of available partitions // can only check the state of available partitions
if (BP_STATE(node, childnum) == PT_AVAIL) { if (BP_STATE(node, childnum) == PT_AVAIL) {
if (node->height > 0) { if (node->height > 0) {
struct ancestors next_ancestors = {node, childnum, ancestors};
const struct pivot_bounds next_bounds = next_pivot_keys(node, childnum, bounds);
BLOCKNUM childblocknum = BP_BLOCKNUM(node, childnum); BLOCKNUM childblocknum = BP_BLOCKNUM(node, childnum);
u_int32_t fullhash = compute_child_fullhash(t->cf, node, childnum); u_int32_t fullhash = compute_child_fullhash(t->cf, node, childnum);
void *childnode_v; BRTNODE childnode;
struct brtnode_fetch_extra bfe; struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, t->h); fill_bfe_for_full_read(&bfe, t->h);
int r = toku_cachetable_get_and_pin( toku_pin_brtnode_holding_lock(t, childblocknum, fullhash, &next_ancestors, &next_bounds, &bfe, &childnode);
t->cf,
childblocknum,
fullhash,
&childnode_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
&bfe,
t->h
);
assert_zero(r);
BRTNODE childnode = childnode_v;
for (int i=0; i<childnode->n_children; i++) { for (int i=0; i<childnode->n_children; i++) {
child_estimate += BP_SUBTREE_EST(childnode, i).ndata; child_estimate += BP_SUBTREE_EST(childnode, i).ndata;
} }
...@@ -993,27 +1000,6 @@ init_childkey(BRTNODE node, int childnum, struct kv_pair *pivotkey, size_t pivot ...@@ -993,27 +1000,6 @@ init_childkey(BRTNODE node, int childnum, struct kv_pair *pivotkey, size_t pivot
node->totalchildkeylens += pivotkeysize; node->totalchildkeylens += pivotkeysize;
} }
static struct kv_pair const *prepivotkey (BRTNODE node, int childnum, struct kv_pair const * const lower_bound_exclusive) {
if (childnum==0)
return lower_bound_exclusive;
else {
return node->childkeys[childnum-1];
}
}
static struct kv_pair const *postpivotkey (BRTNODE node, int childnum, struct kv_pair const * const upper_bound_inclusive) {
if (childnum+1 == node->n_children)
return upper_bound_inclusive;
else {
return node->childkeys[childnum];
}
}
static struct pivot_bounds next_pivot_keys (BRTNODE node, int childnum, struct pivot_bounds const * const old_pb) {
struct pivot_bounds pb = {.lower_bound_exclusive = prepivotkey(node, childnum, old_pb->lower_bound_exclusive),
.upper_bound_inclusive = postpivotkey(node, childnum, old_pb->upper_bound_inclusive)};
return pb;
}
// Used only by test programs: append a child node to a parent node // Used only by test programs: append a child node to a parent node
void void
toku_brt_nonleaf_append_child(BRTNODE node, BRTNODE child, struct kv_pair *pivotkey, size_t pivotkeysize) { toku_brt_nonleaf_append_child(BRTNODE node, BRTNODE child, struct kv_pair *pivotkey, size_t pivotkeysize) {
...@@ -1446,7 +1432,7 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -1446,7 +1432,7 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
} }
static void static void
brt_split_child (BRT t, BRTNODE node, int childnum, BOOL *did_react) brt_split_child (BRT t, BRTNODE node, int childnum, BOOL *did_react, ANCESTORS ancestors, struct pivot_bounds const * const bounds)
{ {
if (0) { if (0) {
printf("%s:%d Node %" PRId64 "->u.n.n_children=%d estimates=", __FILE__, __LINE__, node->thisnodename.b, node->n_children); printf("%s:%d Node %" PRId64 "->u.n.n_children=%d estimates=", __FILE__, __LINE__, node->thisnodename.b, node->n_children);
...@@ -1458,24 +1444,16 @@ brt_split_child (BRT t, BRTNODE node, int childnum, BOOL *did_react) ...@@ -1458,24 +1444,16 @@ brt_split_child (BRT t, BRTNODE node, int childnum, BOOL *did_react)
BRTNODE child; BRTNODE child;
assert(BNC_NBYTESINBUF(node, childnum)==0); // require that the buffer for this child is empty assert(BNC_NBYTESINBUF(node, childnum)==0); // require that the buffer for this child is empty
{ {
void *childnode_v;
// For now, don't use toku_pin_brtnode since we aren't yet prepared to deal with the TRY_AGAIN, and we don't have to apply all the messages above to do this split operation. // For now, don't use toku_pin_brtnode since we aren't yet prepared to deal with the TRY_AGAIN, and we don't have to apply all the messages above to do this split operation.
struct ancestors next_ancestors = {node, childnum, ancestors};
const struct pivot_bounds next_bounds = next_pivot_keys(node, childnum, bounds);
struct brtnode_fetch_extra bfe; struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, t->h); fill_bfe_for_full_read(&bfe, t->h);
int r = toku_cachetable_get_and_pin(t->cf, toku_pin_brtnode_holding_lock(t,
BP_BLOCKNUM(node, childnum), BP_BLOCKNUM(node, childnum),
compute_child_fullhash(t->cf, node, childnum), compute_child_fullhash(t->cf, node, childnum),
&childnode_v, &next_ancestors, &next_bounds, &bfe,
NULL, &child);
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
&bfe,
t->h);
assert(r==0);
child = childnode_v;
assert(child->thisnodename.b!=0); assert(child->thisnodename.b!=0);
VERIFY_NODE(t,child); VERIFY_NODE(t,child);
} }
...@@ -2417,47 +2395,20 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_react, ...@@ -2417,47 +2395,20 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_react,
BRTNODE childa, childb; BRTNODE childa, childb;
{ {
void *childnode_v;
u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnuma); u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnuma);
struct ancestors next_ancestors = {node, childnuma, ancestors};
const struct pivot_bounds next_bounds = next_pivot_keys(node, childnuma, bounds);
struct brtnode_fetch_extra bfe; struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, t->h); fill_bfe_for_full_read(&bfe, t->h);
int r = toku_cachetable_get_and_pin( toku_pin_brtnode_holding_lock(t, BP_BLOCKNUM(node, childnuma), childfullhash, &next_ancestors, &next_bounds, &bfe, &childa);
t->cf,
BP_BLOCKNUM(node, childnuma),
childfullhash,
&childnode_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
&bfe,
t->h
);
assert(r==0);
childa = childnode_v;
} }
{ {
void *childnode_v;
u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnumb); u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnumb);
struct ancestors next_ancestors = {node, childnumb, ancestors};
const struct pivot_bounds next_bounds = next_pivot_keys(node, childnumb, bounds);
struct brtnode_fetch_extra bfe; struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, t->h); fill_bfe_for_full_read(&bfe, t->h);
int r = toku_cachetable_get_and_pin( toku_pin_brtnode_holding_lock(t, BP_BLOCKNUM(node, childnumb), childfullhash, &next_ancestors, &next_bounds, &bfe, &childb);
t->cf,
BP_BLOCKNUM(node, childnumb),
childfullhash, &childnode_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
&bfe,
t->h
);
assert(r==0);
childb = childnode_v;
} }
// now we have both children pinned in main memory. // now we have both children pinned in main memory.
...@@ -2521,7 +2472,7 @@ brt_handle_maybe_reactive_child(BRT t, BRTNODE node, int childnum, enum reactivi ...@@ -2521,7 +2472,7 @@ brt_handle_maybe_reactive_child(BRT t, BRTNODE node, int childnum, enum reactivi
*did_react = FALSE; *did_react = FALSE;
return; return;
case RE_FISSIBLE: case RE_FISSIBLE:
brt_split_child(t, node, childnum, did_react); brt_split_child(t, node, childnum, did_react, ancestors, bounds);
return; return;
case RE_FUSIBLE: case RE_FUSIBLE:
brt_merge_child(t, node, childnum, did_react, ancestors, bounds); brt_merge_child(t, node, childnum, did_react, ancestors, bounds);
...@@ -2990,7 +2941,7 @@ static void apply_cmd_to_in_memory_non_root_leaves ( ...@@ -2990,7 +2941,7 @@ static void apply_cmd_to_in_memory_non_root_leaves (
) )
{ {
void *node_v; void *node_v;
int r = toku_cachetable_get_and_pin_if_in_memory(t->cf, nodenum, fullhash, &node_v); int r = toku_cachetable_get_and_pin_if_in_memory(t->cf, nodenum, fullhash, &node_v); // this one doesn't need to use the toku_pin_brtnode function because it doesn't bring anything in, so it cannot create a non-up-to-date leaf node.
if (r) { goto exit; } if (r) { goto exit; }
BRTNODE node = node_v; BRTNODE node = node_v;
...@@ -5056,7 +5007,6 @@ maybe_flush_pinned_node(BRT t, BRTNODE node, int childnum, BRTNODE child) { ...@@ -5056,7 +5007,6 @@ maybe_flush_pinned_node(BRT t, BRTNODE node, int childnum, BRTNODE child) {
FIFO fifo = BNC_BUFFER(node,childnum); FIFO fifo = BNC_BUFFER(node,childnum);
if (child->height==0) { if (child->height==0) {
// The child is a leaf node. // The child is a leaf node.
assert_leaf_up_to_date(child);
bytevec key, val; bytevec key, val;
ITEMLEN keylen, vallen; ITEMLEN keylen, vallen;
u_int32_t type; u_int32_t type;
...@@ -5077,6 +5027,8 @@ maybe_flush_pinned_node(BRT t, BRTNODE node, int childnum, BRTNODE child) { ...@@ -5077,6 +5027,8 @@ maybe_flush_pinned_node(BRT t, BRTNODE node, int childnum, BRTNODE child) {
node->dirty=TRUE; node->dirty=TRUE;
child->dirty=TRUE; child->dirty=TRUE;
fixup_child_estimates(node, childnum, child, TRUE); fixup_child_estimates(node, childnum, child, TRUE);
for (int i=0; i<child->n_children; i++)
BLB_SOFTCOPYISUPTODATE(child,i)=true;
} else { } else {
bytevec key,val; bytevec key,val;
ITEMLEN keylen, vallen; ITEMLEN keylen, vallen;
...@@ -6080,30 +6032,17 @@ toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN txn) { ...@@ -6080,30 +6032,17 @@ toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN txn) {
static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename, static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename,
u_int32_t fullhash, DBT *key, u_int64_t *less, u_int64_t *equal, u_int64_t *greater) { u_int32_t fullhash, DBT *key, u_int64_t *less, u_int64_t *equal, u_int64_t *greater,
ANCESTORS ancestors, struct pivot_bounds const * const bounds) {
BRTNODE node; BRTNODE node;
{ {
void *node_v;
//assert(fullhash == toku_cachetable_hash(brt->cf, nodename)); //assert(fullhash == toku_cachetable_hash(brt->cf, nodename));
struct brtnode_fetch_extra bfe; struct brtnode_fetch_extra bfe;
// TODO: (Zardosht) change this // TODO: (Zardosht) change this
fill_bfe_for_full_read(&bfe, brt->h); fill_bfe_for_full_read(&bfe, brt->h);
int rr = toku_cachetable_get_and_pin( toku_pin_brtnode_holding_lock(brt, nodename, fullhash,
brt->cf, ancestors, bounds, &bfe,
nodename, &node);
fullhash,
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
&bfe,
brt->h
);
assert_zero(rr);
node = node_v;
assert(node->fullhash==fullhash); assert(node->fullhash==fullhash);
} }
int n_keys = node->n_children-1; int n_keys = node->n_children-1;
...@@ -6130,8 +6069,11 @@ static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename, ...@@ -6130,8 +6069,11 @@ static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename,
} else { } else {
// nextcomp>=0 and prevcomp<=0, so something in the subtree could match // nextcomp>=0 and prevcomp<=0, so something in the subtree could match
// but they are not both zero, so it's not the whole subtree, so we need to recurse // but they are not both zero, so it's not the whole subtree, so we need to recurse
struct ancestors next_ancestors = {node, i, ancestors};
const struct pivot_bounds next_bounds = next_pivot_keys(node, i, bounds);
if (node->height > 0) { if (node->height > 0) {
toku_brt_keyrange_internal(brt, BP_BLOCKNUM(node, i), compute_child_fullhash(brt->cf, node, i), key, less, equal, greater); toku_brt_keyrange_internal(brt, BP_BLOCKNUM(node, i), compute_child_fullhash(brt->cf, node, i), key, less, equal, greater,
&next_ancestors, &next_bounds);
} }
else { else {
struct cmd_leafval_heaviside_extra be = {brt, key}; struct cmd_leafval_heaviside_extra be = {brt, key};
...@@ -6155,7 +6097,8 @@ int toku_brt_keyrange (BRT brt, DBT *key, u_int64_t *less, u_int64_t *equal, u ...@@ -6155,7 +6097,8 @@ int toku_brt_keyrange (BRT brt, DBT *key, u_int64_t *less, u_int64_t *equal, u
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash); CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
*less = *equal = *greater = 0; *less = *equal = *greater = 0;
toku_brt_keyrange_internal (brt, *rootp, fullhash, key, less, equal, greater); toku_brt_keyrange_internal (brt, *rootp, fullhash, key, less, equal, greater,
(ANCESTORS)NULL, &infinite_bounds);
return 0; return 0;
} }
...@@ -6173,25 +6116,10 @@ int toku_brt_stat64 (BRT brt, TOKUTXN UU(txn), struct brtstat64_s *s) { ...@@ -6173,25 +6116,10 @@ int toku_brt_stat64 (BRT brt, TOKUTXN UU(txn), struct brtstat64_s *s) {
u_int32_t fullhash; u_int32_t fullhash;
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash); CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
CACHEKEY root = *rootp; CACHEKEY root = *rootp;
void *node_v;
struct brtnode_fetch_extra bfe; struct brtnode_fetch_extra bfe;
fill_bfe_for_min_read(&bfe, brt->h); fill_bfe_for_min_read(&bfe, brt->h);
int r = toku_cachetable_get_and_pin( BRTNODE node;
brt->cf, toku_pin_brtnode_holding_lock(brt, root, fullhash, (ANCESTORS)NULL, &infinite_bounds, &bfe, &node);
root,
fullhash,
&node_v,
NULL,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
&bfe,
brt->h
);
if (r!=0) return r;
BRTNODE node = node_v;
s->nkeys = s->ndata = s->dsize = 0; s->nkeys = s->ndata = s->dsize = 0;
int i; int i;
...@@ -6202,7 +6130,7 @@ int toku_brt_stat64 (BRT brt, TOKUTXN UU(txn), struct brtstat64_s *s) { ...@@ -6202,7 +6130,7 @@ int toku_brt_stat64 (BRT brt, TOKUTXN UU(txn), struct brtstat64_s *s) {
s->dsize += se->dsize; s->dsize += se->dsize;
} }
r = toku_cachetable_unpin(brt->cf, root, fullhash, CACHETABLE_CLEAN, 0); int r = toku_cachetable_unpin(brt->cf, root, fullhash, CACHETABLE_CLEAN, 0);
if (r!=0) return r; if (r!=0) return r;
return 0; return 0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment