Commit 9c560a21 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

comment functions below maybe_apply_ancestors_messages_to_node


git-svn-id: file:///svn/toku/tokudb@38877 c7de825b-a66e-492c-adef-691d508d4ae1
parent 285e5c37
......@@ -1752,6 +1752,9 @@ toku_fifo_entry_key_msn_cmp(void *extrap, const void *ap, const void *bp)
int
toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, MSN msn, XIDS xids, bool is_fresh, DESCRIPTOR desc, brt_compare_func cmp)
// Effect: Enqueue the message represented by the parameters into the
// bnc's buffer, and put it in either the fresh or stale message tree,
// or the broadcast list.
{
int diff = keylen + datalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids);
long offset;
......@@ -4299,8 +4302,9 @@ static BOOL search_pivot_is_bounded (brt_search_t *search, DESCRIPTOR desc, brt_
static int
move_to_stale(OMTVALUE v, u_int32_t UU(idx), BRT brt, NONLEAF_CHILDINFO bnc)
{
// we actually only copy to stale, and then delete messages out of
// fresh later on, because we call this during an iteration over fresh
// We actually only copy to stale, and then delete messages out of
// fresh later on, because we call this during an iteration over
// fresh, and aren't allowed to modify fresh.
const long offset = (long) v;
struct fifo_entry *entry = (struct fifo_entry *) toku_fifo_get_entry(bnc->buffer, offset);
entry->is_fresh = false;
......@@ -4442,9 +4446,9 @@ bnc_find_iterate_bounds(
.key = kv_pair_key((struct kv_pair *) bounds->lower_bound_exclusive),
.keylen = kv_pair_keylen((struct kv_pair *) bounds->lower_bound_exclusive),
.msn = MAX_MSN };
// TODO: get this value and compare it with ube to see if we even
// need to continue
OMTVALUE found_lb;
// We use direction=+1 to convert lower bound exclusive to lower
// bound inclusive
r = toku_omt_find(message_tree, toku_fifo_entry_key_msn_heaviside,
&lbi_extra, +1, &found_lb, lbi);
if (r == DB_NOTFOUND) {
......@@ -4454,6 +4458,8 @@ bnc_find_iterate_bounds(
return;
}
if (bounds->upper_bound_inclusive) {
// Check if what we found for lbi is greater than the upper
// bound inclusive that we have. If so, the range is empty.
DBT ubidbt_tmp = kv_pair_key_to_dbt((struct kv_pair *) bounds->upper_bound_inclusive);
const long offset = (long) found_lb;
DBT found_lbidbt;
......@@ -4469,6 +4475,7 @@ bnc_find_iterate_bounds(
}
}
} else {
// No lower bound given, it's negative infinity.
*lbi = 0;
}
if (bounds->upper_bound_inclusive) {
......@@ -4478,12 +4485,17 @@ bnc_find_iterate_bounds(
.key = kv_pair_key((struct kv_pair *) bounds->upper_bound_inclusive),
.keylen = kv_pair_keylen((struct kv_pair *) bounds->upper_bound_inclusive),
.msn = MAX_MSN };
// We use direction=+1 to convert upper bound inclusive to upper
// bound exclusive
r = toku_omt_find(message_tree, toku_fifo_entry_key_msn_heaviside,
&ube_extra, +1, NULL, ube);
if (r == DB_NOTFOUND) {
// Couldn't find one, it must be bigger than everything in our
// buffer, so we include everything
*ube = toku_omt_size(message_tree);
}
} else {
// No upper bound given, it's positive infinity.
*ube = toku_omt_size(message_tree);
}
}
......@@ -4504,6 +4516,9 @@ bnc_apply_messages_to_basement_node(
{
int r;
NONLEAF_CHILDINFO bnc = BNC(ancestor, childnum);
// Determine the offsets between which we need to apply messages from
// this buffer
u_int32_t stale_lbi, stale_ube;
if (!bn->stale_ancestor_messages_applied) {
bnc_find_iterate_bounds(&t->h->descriptor, t->compare_fun, bnc->stale_message_tree, bnc->buffer, bounds, &stale_lbi, &stale_ube);
......@@ -4517,20 +4532,41 @@ bnc_apply_messages_to_basement_node(
TOKULOGGER logger = toku_cachefile_logger(t->cf);
OMT snapshot_txnids = logger ? logger->snapshot_txnids : NULL;
OMT live_list_reverse = logger ? logger->live_list_reverse : NULL;
// The following process will keep track of the max msn of any message
// we apply so that we can update it in the basement node at the end
MSN max_msn_applied = MIN_MSN;
// We now know where all the messages we must apply are, so one of the
// following 4 cases will do the application, depending on which of
// the lists contains relevant messages
if (toku_omt_size(bnc->broadcast_list) > 0) {
const int buffer_size = (stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) + toku_omt_size(bnc->broadcast_list);
// We have some broadcasts, which don't have keys, so we grab all
// the relevant messages and sort them by MSN.
const int buffer_size = ((stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) + toku_omt_size(bnc->broadcast_list));
long *XMALLOC_N(buffer_size, offsets);
struct store_fifo_offset_extra sfo_extra = { .offsets = offsets, .i = 0 };
if (!bn->stale_ancestor_messages_applied) {
r = toku_omt_iterate_on_range(bnc->stale_message_tree, stale_lbi, stale_ube, store_fifo_offset, &sfo_extra); assert_zero(r);
// If we must apply stale messages, store their offsets
r = toku_omt_iterate_on_range(bnc->stale_message_tree, stale_lbi, stale_ube, store_fifo_offset, &sfo_extra);
assert_zero(r);
}
// Store fresh offsets, and move the messages we store to stale
struct store_fifo_offset_and_move_to_stale_extra sfoamts_extra = { .brt = t, .sfo_extra = &sfo_extra, .bnc = bnc };
r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, store_fifo_offset_and_move_to_stale, &sfoamts_extra); assert_zero(r);
r = toku_omt_iterate(bnc->broadcast_list, store_fifo_offset, &sfo_extra); assert_zero(r);
r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, store_fifo_offset_and_move_to_stale, &sfoamts_extra);
assert_zero(r);
// Store offsets of broadcast messages
r = toku_omt_iterate(bnc->broadcast_list, store_fifo_offset, &sfo_extra);
assert_zero(r);
invariant(sfo_extra.i == buffer_size);
r = mergesort_r(offsets, buffer_size, sizeof offsets[0], bnc->buffer, fifo_offset_msn_cmp); assert_zero(r);
// Sort by MSN.
r = mergesort_r(offsets, buffer_size, sizeof offsets[0], bnc->buffer, fifo_offset_msn_cmp);
assert_zero(r);
// Apply the messages in MSN order
for (int i = 0; i < buffer_size; ++i) {
const struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offsets[i]);
do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, snapshot_txnids, live_list_reverse, &max_msn_applied, entry);
......@@ -4538,19 +4574,31 @@ bnc_apply_messages_to_basement_node(
toku_free(offsets);
} else if (stale_lbi == stale_ube) {
// No stale messages to apply, we just apply fresh messages
struct iterate_do_brt_leaf_put_cmd_extra iter_extra = { .t = t, .leafnode = leafnode, .bn = bn, .ancestor = ancestor, .childnum = childnum, .snapshot_txnids = snapshot_txnids, .live_list_reverse = live_list_reverse, .max_msn_applied = &max_msn_applied };
struct iterate_do_brt_leaf_put_cmd_and_move_to_stale_extra iter_amts_extra = { .brt = t, .iter_extra = &iter_extra, .bnc = bnc };
r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, iterate_do_brt_leaf_put_cmd_and_move_to_stale, &iter_amts_extra); assert_zero(r);
r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, iterate_do_brt_leaf_put_cmd_and_move_to_stale, &iter_amts_extra);
assert_zero(r);
} else if (fresh_lbi == fresh_ube) {
// No fresh messages to apply, we just apply stale messages
struct iterate_do_brt_leaf_put_cmd_extra iter_extra = { .t = t, .leafnode = leafnode, .bn = bn, .ancestor = ancestor, .childnum = childnum, .snapshot_txnids = snapshot_txnids, .live_list_reverse = live_list_reverse, .max_msn_applied = &max_msn_applied };
r = toku_omt_iterate_on_range(bnc->stale_message_tree, stale_lbi, stale_ube, iterate_do_brt_leaf_put_cmd, &iter_extra); assert_zero(r);
r = toku_omt_iterate_on_range(bnc->stale_message_tree, stale_lbi, stale_ube, iterate_do_brt_leaf_put_cmd, &iter_extra);
assert_zero(r);
} else {
// We have stale and fresh messages but no broadcasts. We can
// iterate over both OMTs together. We'll build up a list of
// offsets in the fresh OMT that need to be moved to stale later.
long *XMALLOC_N(fresh_ube - fresh_lbi, fresh_offsets_to_move);
u_int32_t stale_i = stale_lbi, fresh_i = fresh_lbi;
OMTVALUE stale_v, fresh_v;
r = toku_omt_fetch(bnc->stale_message_tree, stale_i, &stale_v); assert_zero(r);
r = toku_omt_fetch(bnc->fresh_message_tree, fresh_i, &fresh_v); assert_zero(r);
r = toku_omt_fetch(bnc->stale_message_tree, stale_i, &stale_v);
assert_zero(r);
r = toku_omt_fetch(bnc->fresh_message_tree, fresh_i, &fresh_v);
assert_zero(r);
struct toku_fifo_entry_key_msn_cmp_extra extra = { .desc= &t->h->descriptor, .cmp = t->compare_fun, .fifo = bnc->buffer };
// Iterate over both lists, applying the smaller (in (key, msn)
// order) message at each step
while (stale_i < stale_ube && fresh_i < fresh_ube) {
const long stale_offset = (long) stale_v;
const long fresh_offset = (long) fresh_v;
......@@ -4575,6 +4623,7 @@ bnc_apply_messages_to_basement_node(
assert(false);
}
}
// Apply the rest of the stale messages, if any exist
while (stale_i < stale_ube) {
const long stale_offset = (long) stale_v;
const struct fifo_entry *stale_entry = toku_fifo_get_entry(bnc->buffer, stale_offset);
......@@ -4584,6 +4633,7 @@ bnc_apply_messages_to_basement_node(
r = toku_omt_fetch(bnc->stale_message_tree, stale_i, &stale_v); assert_zero(r);
}
}
// Apply the rest of the fresh messages, if any exist
while (fresh_i < fresh_ube) {
const long fresh_offset = (long) fresh_v;
fresh_offsets_to_move[fresh_i - fresh_lbi] = fresh_offset;
......@@ -4594,8 +4644,10 @@ bnc_apply_messages_to_basement_node(
r = toku_omt_fetch(bnc->fresh_message_tree, fresh_i, &fresh_v); assert_zero(r);
}
}
// Now move all the fresh messages we collected to stale
for (u_int32_t i = 0; i < fresh_ube - fresh_lbi; ++i) {
r = move_to_stale((OMTVALUE) fresh_offsets_to_move[i], i + fresh_lbi, t, bnc); assert_zero(r);
r = move_to_stale((OMTVALUE) fresh_offsets_to_move[i], i + fresh_lbi, t, bnc);
assert_zero(r);
}
toku_free(fresh_offsets_to_move);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment