Commit ca7781d9 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

refs #5378 merge to main

git-svn-id: file:///svn/toku/tokudb@47027 c7de825b-a66e-492c-adef-691d508d4ae1
parent b2c3633e
...@@ -136,6 +136,8 @@ toku_pin_ftnode_batched( ...@@ -136,6 +136,8 @@ toku_pin_ftnode_batched(
{ {
void *node_v; void *node_v;
*msgs_applied = false; *msgs_applied = false;
pair_lock_type needed_lock_type = lock_type;
try_again_for_write_lock:
int r = toku_cachetable_get_and_pin_nonblocking_batched( int r = toku_cachetable_get_and_pin_nonblocking_batched(
brt->ft->cf, brt->ft->cf,
blocknum, blocknum,
...@@ -146,16 +148,32 @@ toku_pin_ftnode_batched( ...@@ -146,16 +148,32 @@ toku_pin_ftnode_batched(
toku_ftnode_fetch_callback, toku_ftnode_fetch_callback,
toku_ftnode_pf_req_callback, toku_ftnode_pf_req_callback,
toku_ftnode_pf_callback, toku_ftnode_pf_callback,
lock_type, needed_lock_type,
bfe, //read_extraargs bfe, //read_extraargs
unlockers); unlockers);
if (r==0) { if (r==0) {
FTNODE node = (FTNODE) node_v; FTNODE node = static_cast<FTNODE>(node_v);
MSN max_msn_in_path;
bool needs_ancestors_messages = false;
if (apply_ancestor_messages && node->height == 0) {
needs_ancestors_messages = toku_ft_leaf_needs_ancestors_messages(brt->ft, node, ancestors, bounds, &max_msn_in_path);
if (needs_ancestors_messages && needed_lock_type == PL_READ) {
toku_unpin_ftnode_read_only(brt, node);
needed_lock_type = PL_WRITE_CHEAP;
goto try_again_for_write_lock;
}
}
if (end_batch_on_success) { if (end_batch_on_success) {
toku_cachetable_end_batched_pin(brt->ft->cf); toku_cachetable_end_batched_pin(brt->ft->cf);
} }
if (apply_ancestor_messages && node->height == 0) { if (apply_ancestor_messages && node->height == 0) {
toku_apply_ancestors_messages_to_node(brt, node, ancestors, bounds, msgs_applied); if (needs_ancestors_messages) {
invariant(needed_lock_type != PL_READ);
toku_apply_ancestors_messages_to_node(brt, node, ancestors, bounds, msgs_applied);
} else {
toku_ft_bn_update_max_msn(node, max_msn_in_path);
}
invariant(needed_lock_type != PL_READ || !*msgs_applied);
} }
if ((lock_type != PL_READ) && node->height > 0) { if ((lock_type != PL_READ) && node->height > 0) {
toku_move_ftnode_messages_to_stale(brt->ft, node); toku_move_ftnode_messages_to_stale(brt->ft, node);
......
...@@ -821,6 +821,10 @@ struct pivot_bounds { ...@@ -821,6 +821,10 @@ struct pivot_bounds {
__attribute__((nonnull)) __attribute__((nonnull))
void toku_move_ftnode_messages_to_stale(FT ft, FTNODE node); void toku_move_ftnode_messages_to_stale(FT ft, FTNODE node);
void toku_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, bool* msgs_applied); void toku_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, bool* msgs_applied);
__attribute__((nonnull))
bool toku_ft_leaf_needs_ancestors_messages(FT ft, FTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, MSN *const max_msn_in_path);
__attribute__((nonnull))
void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied);
int int
toku_ft_search_which_child( toku_ft_search_which_child(
......
...@@ -4117,6 +4117,96 @@ toku_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ances ...@@ -4117,6 +4117,96 @@ toku_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ances
VERIFY_NODE(t, node); VERIFY_NODE(t, node);
} }
bool toku_ft_leaf_needs_ancestors_messages(FT ft, FTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, MSN *const max_msn_in_path)
// Effect: Determine whether there are messages in a node's ancestors
// which must be applied to it. These messages are in the correct
// keyrange for any available basement nodes, and are in nodes with the
// correct max_msn_applied_to_node_on_disk.
// Notes:
// This is an approximate query.
// Output:
// max_msn_in_path: max of "max_msn_applied_to_node_on_disk" over
// ancestors. This is used later to update basement nodes'
// max_msn_applied values in case we don't do the full algorithm.
// Returns:
// true if there may be some such messages
// false only if there are definitely no such messages
// Rationale:
// When we pin a node with a read lock, we want to quickly determine if
// we should exchange it for a write lock in preparation for applying
// messages. If there are no messages, we don't need the write lock.
{
invariant(node->height == 0);
MSN max_msn_applied = ZERO_MSN;
bool needs_ancestors_messages = false;
for (int i = 0; i < node->n_children; ++i) {
if (BP_STATE(node, i) != PT_AVAIL) { continue; }
BASEMENTNODE bn = BLB(node, i);
struct pivot_bounds curr_bounds = next_pivot_keys(node, i, bounds);
for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > bn->max_msn_applied.msn) {
assert(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
NONLEAF_CHILDINFO bnc = BNC(curr_ancestors->node, curr_ancestors->childnum);
if (bnc->broadcast_list.size() > 0) {
needs_ancestors_messages = true;
goto cleanup;
}
if (!bn->stale_ancestor_messages_applied) {
uint32_t stale_lbi, stale_ube;
find_bounds_within_message_tree(&ft->cmp_descriptor,
ft->compare_fun,
bnc->stale_message_tree,
bnc->buffer,
&curr_bounds,
&stale_lbi,
&stale_ube);
if (stale_lbi < stale_ube) {
needs_ancestors_messages = true;
goto cleanup;
}
}
uint32_t fresh_lbi, fresh_ube;
find_bounds_within_message_tree(&ft->cmp_descriptor,
ft->compare_fun,
bnc->fresh_message_tree,
bnc->buffer,
&curr_bounds,
&fresh_lbi,
&fresh_ube);
if (fresh_lbi < fresh_ube) {
needs_ancestors_messages = true;
goto cleanup;
}
if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > max_msn_applied.msn) {
max_msn_applied = curr_ancestors->node->max_msn_applied_to_node_on_disk;
}
}
}
}
*max_msn_in_path = max_msn_applied;
cleanup:
return needs_ancestors_messages;
}
void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied) {
invariant(node->height == 0);
// At this point, we aren't going to run toku_apply_... but that
// doesn't mean max_msn_applied doesn't need to be updated.
// This function runs in a shared access context, so to silence tools
// like DRD, we use a CAS and ignore the result.
// Any threads trying to update these basement nodes should be
// updating them to the same thing (since they all have a read lock on
// the same root-to-leaf path) so this is safe.
for (int i = 0; i < node->n_children; ++i) {
if (BP_STATE(node, i) != PT_AVAIL) { continue; }
BASEMENTNODE bn = BLB(node, i);
// Remember, this all happens in the context of a read lock.
if (max_msn_applied.msn > bn->max_msn_applied.msn) {
(void) __sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
}
}
}
struct copy_to_stale_extra { struct copy_to_stale_extra {
FT ft; FT ft;
NONLEAF_CHILDINFO bnc; NONLEAF_CHILDINFO bnc;
...@@ -4393,12 +4483,11 @@ ft_search_child(FT_HANDLE brt, FTNODE node, int childnum, ft_search_t *search, F ...@@ -4393,12 +4483,11 @@ ft_search_child(FT_HANDLE brt, FTNODE node, int childnum, ft_search_t *search, F
); );
bool msgs_applied = false; bool msgs_applied = false;
{ {
pair_lock_type lock_type = (node->height == 1) ? PL_WRITE_CHEAP : PL_READ;
int rr = toku_pin_ftnode_batched(brt, childblocknum, fullhash, int rr = toku_pin_ftnode_batched(brt, childblocknum, fullhash,
unlockers, unlockers,
&next_ancestors, bounds, &next_ancestors, bounds,
&bfe, &bfe,
lock_type, // may_modify_node true iff child is leaf PL_READ, // we try to get a read lock, but we may upgrade to a write lock on a leaf for message application.
true, true,
(node->height == 1), // end_batch_on_success true iff child is a leaf (node->height == 1), // end_batch_on_success true iff child is a leaf
&childnode, &childnode,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment