Commit 1eb7167b authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

comments added from code review today


git-svn-id: file:///svn/toku/tokudb@38883 c7de825b-a66e-492c-adef-691d508d4ae1
parent c5c0e52d
...@@ -1717,7 +1717,13 @@ key_msn_cmp(const DBT *a, const DBT *b, const MSN amsn, const MSN bmsn, ...@@ -1717,7 +1717,13 @@ key_msn_cmp(const DBT *a, const DBT *b, const MSN amsn, const MSN bmsn,
FAKE_DB(db, tmpdesc, descriptor); FAKE_DB(db, tmpdesc, descriptor);
int r = key_cmp(&db, a, b); int r = key_cmp(&db, a, b);
if (r == 0) { if (r == 0) {
r = (amsn.msn > bmsn.msn) - (amsn.msn < bmsn.msn); if (amsn.msn > bmsn.msn) {
r = +1;
} else if (amsn.msn < bmsn.msn) {
r = -1;
} else {
r = 0;
}
} }
return r; return r;
} }
...@@ -4345,6 +4351,11 @@ store_fifo_offset_and_move_to_stale(OMTVALUE v, u_int32_t idx, void *extrap) ...@@ -4345,6 +4351,11 @@ store_fifo_offset_and_move_to_stale(OMTVALUE v, u_int32_t idx, void *extrap)
return r; return r;
} }
/**
* Given pointers to offsets within a FIFO where we can find messages,
* figure out the MSN of each message, and compare those MSNs. Returns 1,
* 0, or -1 if a is larger than, equal to, or smaller than b.
*/
static int static int
fifo_offset_msn_cmp(void *extrap, const void *va, const void *vb) fifo_offset_msn_cmp(void *extrap, const void *va, const void *vb)
{ {
...@@ -4353,9 +4364,20 @@ fifo_offset_msn_cmp(void *extrap, const void *va, const void *vb) ...@@ -4353,9 +4364,20 @@ fifo_offset_msn_cmp(void *extrap, const void *va, const void *vb)
const long *bo = vb; const long *bo = vb;
const struct fifo_entry *a = toku_fifo_get_entry(fifo, *ao); const struct fifo_entry *a = toku_fifo_get_entry(fifo, *ao);
const struct fifo_entry *b = toku_fifo_get_entry(fifo, *bo); const struct fifo_entry *b = toku_fifo_get_entry(fifo, *bo);
return (a->msn.msn > b->msn.msn) - (a->msn.msn < b->msn.msn); if (a->msn.msn > b->msn.msn) {
return +1;
}
if (a->msn.msn < b->msn.msn) {
return -1;
}
return 0;
} }
/**
* Given a fifo_entry, either decompose it into its parameters and call
* brt_leaf_put_cmd, or discard it, based on its MSN and the MSN of the
* basement node.
*/
static void static void
do_brt_leaf_put_cmd(BRT t, BRTNODE leafnode, BASEMENTNODE bn, BRTNODE ancestor, int childnum, OMT snapshot_txnids, OMT live_list_reverse, MSN *max_msn_applied, const struct fifo_entry *entry) do_brt_leaf_put_cmd(BRT t, BRTNODE leafnode, BASEMENTNODE bn, BRTNODE ancestor, int childnum, OMT snapshot_txnids, OMT live_list_reverse, MSN *max_msn_applied, const struct fifo_entry *entry)
{ {
...@@ -4372,9 +4394,10 @@ do_brt_leaf_put_cmd(BRT t, BRTNODE leafnode, BASEMENTNODE bn, BRTNODE ancestor, ...@@ -4372,9 +4394,10 @@ do_brt_leaf_put_cmd(BRT t, BRTNODE leafnode, BASEMENTNODE bn, BRTNODE ancestor,
DBT hv; DBT hv;
BRT_MSG_S brtcmd = { type, msn, xids, .u.id = { &hk, toku_fill_dbt(&hv, val, vallen) } }; BRT_MSG_S brtcmd = { type, msn, xids, .u.id = { &hk, toku_fill_dbt(&hv, val, vallen) } };
bool made_change; bool made_change;
// the messages are in (key,msn) order so all the messages for one key // The messages are being iterated over in (key,msn) order or just in
// in one buffer are in ascending msn order, so it's ok that we don't // msn order, so all the messages for one key, from one buffer, are in
// update the basement node's msn until the end // ascending msn order. So it's ok that we don't update the basement
// node's msn until the end.
if (brtcmd.msn.msn > bn->max_msn_applied.msn) { if (brtcmd.msn.msn > bn->max_msn_applied.msn) {
if (brtcmd.msn.msn > max_msn_applied->msn) { if (brtcmd.msn.msn > max_msn_applied->msn) {
*max_msn_applied = brtcmd.msn; *max_msn_applied = brtcmd.msn;
...@@ -4422,24 +4445,40 @@ iterate_do_brt_leaf_put_cmd_and_move_to_stale(OMTVALUE v, u_int32_t idx, void *e ...@@ -4422,24 +4445,40 @@ iterate_do_brt_leaf_put_cmd_and_move_to_stale(OMTVALUE v, u_int32_t idx, void *e
return r; return r;
} }
/**
* Given the bounds of the basement node to which we will apply messages,
* find the indexes within message_tree which contain the range of
* relevant messages.
*
* The message tree contains offsets into the buffer, where messages are
* found. The pivot_bounds are the lower bound exclusive and upper bound
* inclusive, because they come from pivot keys in the tree. We want OMT
* indices, which must have the lower bound be inclusive and the upper
* bound exclusive. We will get these by telling toku_omt_find to look
* for something strictly bigger than each of our pivot bounds.
*
* Outputs the OMT indices in lbi (lower bound inclusive) and ube (upper
* bound exclusive).
*/
static void static void
bnc_find_iterate_bounds( find_bounds_within_message_tree(
DESCRIPTOR desc, DESCRIPTOR desc, /// used for cmp
brt_compare_func cmp, brt_compare_func cmp, /// used to compare keys
OMT message_tree, OMT message_tree, /// tree holding FIFO offsets, in which we want to look for indices
FIFO buffer, FIFO buffer, /// buffer in which messages are found
struct pivot_bounds const * const bounds, struct pivot_bounds const * const bounds, /// key bounds within the basement node we're applying messages to
u_int32_t *lbi, u_int32_t *lbi, /// (output) "lower bound inclusive" (index into message_tree)
u_int32_t *ube u_int32_t *ube /// (output) "upper bound exclusive" (index into message_tree)
) )
{ {
int r = 0; int r = 0;
// The bounds given to us are of the form (lbe,ubi] but the omt is
// going to iterate over [left,right) (see toku_omt_iterate_on_range),
// so we're going to convert it to [lbe,ubi) through an application of
// heaviside functions.
if (bounds->lower_bound_exclusive) { if (bounds->lower_bound_exclusive) {
// By setting msn to MAX_MSN and by using direction of +1, we will
// get the first message greater than (in (key, msn) order) any
// message (with any msn) with the key lower_bound_exclusive.
// This will be a message we want to try applying, so it is the
// "lower bound inclusive" within the message_tree.
struct toku_fifo_entry_key_msn_heaviside_extra lbi_extra = { struct toku_fifo_entry_key_msn_heaviside_extra lbi_extra = {
.desc = desc, .cmp = cmp, .desc = desc, .cmp = cmp,
.fifo = buffer, .fifo = buffer,
...@@ -4447,55 +4486,64 @@ bnc_find_iterate_bounds( ...@@ -4447,55 +4486,64 @@ bnc_find_iterate_bounds(
.keylen = kv_pair_keylen((struct kv_pair *) bounds->lower_bound_exclusive), .keylen = kv_pair_keylen((struct kv_pair *) bounds->lower_bound_exclusive),
.msn = MAX_MSN }; .msn = MAX_MSN };
OMTVALUE found_lb; OMTVALUE found_lb;
// We use direction=+1 to convert lower bound exclusive to lower
// bound inclusive
r = toku_omt_find(message_tree, toku_fifo_entry_key_msn_heaviside, r = toku_omt_find(message_tree, toku_fifo_entry_key_msn_heaviside,
&lbi_extra, +1, &found_lb, lbi); &lbi_extra, +1, &found_lb, lbi);
if (r == DB_NOTFOUND) { if (r == DB_NOTFOUND) {
// no relevant data, we're done // There is no relevant data (the lower bound is bigger than
// any message in this tree), so we have no range and we're
// done.
*lbi = 0; *lbi = 0;
*ube = 0; *ube = 0;
return; return;
} }
if (bounds->upper_bound_inclusive) { if (bounds->upper_bound_inclusive) {
// Check if what we found for lbi is greater than the upper // Check if what we found for lbi is greater than the upper
// bound inclusive that we have. If so, the range is empty. // bound inclusive that we have. If so, there are no relevant
// messages between these bounds.
DBT ubidbt_tmp = kv_pair_key_to_dbt((struct kv_pair *) bounds->upper_bound_inclusive); DBT ubidbt_tmp = kv_pair_key_to_dbt((struct kv_pair *) bounds->upper_bound_inclusive);
const long offset = (long) found_lb; const long offset = (long) found_lb;
DBT found_lbidbt; DBT found_lbidbt;
fill_dbt_for_fifo_entry(&found_lbidbt, toku_fifo_get_entry(buffer, offset)); fill_dbt_for_fifo_entry(&found_lbidbt, toku_fifo_get_entry(buffer, offset));
FAKE_DB(db, tmpdesc, desc); FAKE_DB(db, tmpdesc, desc);
int c = cmp(&db, &found_lbidbt, &ubidbt_tmp); int c = cmp(&db, &found_lbidbt, &ubidbt_tmp);
// These DBTs really are both inclusive so we need strict inequality. // These DBTs really are both inclusive bounds, so we need
// strict inequality in order to determine that there's
// nothing between them. If they're equal, then we actually
// need to apply the message pointed to by lbi, and also
// anything with the same key but a bigger msn.
if (c > 0) { if (c > 0) {
// no relevant data, we're done
*lbi = 0; *lbi = 0;
*ube = 0; *ube = 0;
return; return;
} }
} }
} else { } else {
// No lower bound given, it's negative infinity. // No lower bound given, it's negative infinity, so we start at
// the first message in the OMT.
*lbi = 0; *lbi = 0;
} }
if (bounds->upper_bound_inclusive) { if (bounds->upper_bound_inclusive) {
// Again, we use an msn of MAX_MSN and a direction of +1 to get
// the first thing bigger than the upper_bound_inclusive key.
// This is therefore the smallest thing we don't want to apply,
// and toku_omt_iterate_on_range will not examine it.
struct toku_fifo_entry_key_msn_heaviside_extra ube_extra = { struct toku_fifo_entry_key_msn_heaviside_extra ube_extra = {
.desc = desc, .cmp = cmp, .desc = desc, .cmp = cmp,
.fifo = buffer, .fifo = buffer,
.key = kv_pair_key((struct kv_pair *) bounds->upper_bound_inclusive), .key = kv_pair_key((struct kv_pair *) bounds->upper_bound_inclusive),
.keylen = kv_pair_keylen((struct kv_pair *) bounds->upper_bound_inclusive), .keylen = kv_pair_keylen((struct kv_pair *) bounds->upper_bound_inclusive),
.msn = MAX_MSN }; .msn = MAX_MSN };
// We use direction=+1 to convert upper bound inclusive to upper
// bound exclusive
r = toku_omt_find(message_tree, toku_fifo_entry_key_msn_heaviside, r = toku_omt_find(message_tree, toku_fifo_entry_key_msn_heaviside,
&ube_extra, +1, NULL, ube); &ube_extra, +1, NULL, ube);
if (r == DB_NOTFOUND) { if (r == DB_NOTFOUND) {
// Couldn't find one, it must be bigger than everything in our // Couldn't find anything in the buffer bigger than our key,
// buffer, so we include everything // so we need to look at everything up to the end of
// message_tree.
*ube = toku_omt_size(message_tree); *ube = toku_omt_size(message_tree);
} }
} else { } else {
// No upper bound given, it's positive infinity. // No upper bound given, it's positive infinity, so we need to go
// through the end of the OMT.
*ube = toku_omt_size(message_tree); *ube = toku_omt_size(message_tree);
} }
} }
...@@ -4517,47 +4565,49 @@ bnc_apply_messages_to_basement_node( ...@@ -4517,47 +4565,49 @@ bnc_apply_messages_to_basement_node(
int r; int r;
NONLEAF_CHILDINFO bnc = BNC(ancestor, childnum); NONLEAF_CHILDINFO bnc = BNC(ancestor, childnum);
// Determine the offsets between which we need to apply messages from // Determine the offsets in the message trees between which we need to
// this buffer // apply messages from this buffer
u_int32_t stale_lbi, stale_ube; u_int32_t stale_lbi, stale_ube;
if (!bn->stale_ancestor_messages_applied) { if (!bn->stale_ancestor_messages_applied) {
bnc_find_iterate_bounds(&t->h->descriptor, t->compare_fun, bnc->stale_message_tree, bnc->buffer, bounds, &stale_lbi, &stale_ube); find_bounds_within_message_tree(&t->h->descriptor, t->compare_fun, bnc->stale_message_tree, bnc->buffer, bounds, &stale_lbi, &stale_ube);
} else { } else {
stale_lbi = 0; stale_lbi = 0;
stale_ube = 0; stale_ube = 0;
} }
u_int32_t fresh_lbi, fresh_ube; u_int32_t fresh_lbi, fresh_ube;
bnc_find_iterate_bounds(&t->h->descriptor, t->compare_fun, bnc->fresh_message_tree, bnc->buffer, bounds, &fresh_lbi, &fresh_ube); find_bounds_within_message_tree(&t->h->descriptor, t->compare_fun, bnc->fresh_message_tree, bnc->buffer, bounds, &fresh_lbi, &fresh_ube);
TOKULOGGER logger = toku_cachefile_logger(t->cf); TOKULOGGER logger = toku_cachefile_logger(t->cf);
OMT snapshot_txnids = logger ? logger->snapshot_txnids : NULL; OMT snapshot_txnids = logger ? logger->snapshot_txnids : NULL;
OMT live_list_reverse = logger ? logger->live_list_reverse : NULL; OMT live_list_reverse = logger ? logger->live_list_reverse : NULL;
// The following process will keep track of the max msn of any message // The following process will keep track of the max msn of any message
// we apply so that we can update it in the basement node at the end // we apply so that we can update it in the basement node at the end.
MSN max_msn_applied = MIN_MSN; MSN max_msn_applied = MIN_MSN;
// We now know where all the messages we must apply are, so one of the // We now know where all the messages we must apply are, so one of the
// following 4 cases will do the application, depending on which of // following 4 cases will do the application, depending on which of
// the lists contains relevant messages // the lists contains relevant messages.
if (toku_omt_size(bnc->broadcast_list) > 0) { if (toku_omt_size(bnc->broadcast_list) > 0) {
// We have some broadcasts, which don't have keys, so we grab all // We have some broadcasts, which don't have keys, so we grab all
// the relevant messages and sort them by MSN. // the relevant messages' offsets and sort them by MSN, then apply
// them in MSN order.
const int buffer_size = ((stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) + toku_omt_size(bnc->broadcast_list)); const int buffer_size = ((stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) + toku_omt_size(bnc->broadcast_list));
long *XMALLOC_N(buffer_size, offsets); long *XMALLOC_N(buffer_size, offsets);
struct store_fifo_offset_extra sfo_extra = { .offsets = offsets, .i = 0 }; struct store_fifo_offset_extra sfo_extra = { .offsets = offsets, .i = 0 };
if (!bn->stale_ancestor_messages_applied) { if (!bn->stale_ancestor_messages_applied) {
// If we must apply stale messages, store their offsets // If we must apply stale messages, store their offsets.
r = toku_omt_iterate_on_range(bnc->stale_message_tree, stale_lbi, stale_ube, store_fifo_offset, &sfo_extra); r = toku_omt_iterate_on_range(bnc->stale_message_tree, stale_lbi, stale_ube, store_fifo_offset, &sfo_extra);
assert_zero(r); assert_zero(r);
} }
// Store fresh offsets, and move the messages we store to stale // Then store fresh offsets, and move the messages we store to
// the stale tree.
struct store_fifo_offset_and_move_to_stale_extra sfoamts_extra = { .brt = t, .sfo_extra = &sfo_extra, .bnc = bnc }; struct store_fifo_offset_and_move_to_stale_extra sfoamts_extra = { .brt = t, .sfo_extra = &sfo_extra, .bnc = bnc };
r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, store_fifo_offset_and_move_to_stale, &sfoamts_extra); r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, store_fifo_offset_and_move_to_stale, &sfoamts_extra);
assert_zero(r); assert_zero(r);
// Store offsets of broadcast messages // Store offsets of all broadcast messages.
r = toku_omt_iterate(bnc->broadcast_list, store_fifo_offset, &sfo_extra); r = toku_omt_iterate(bnc->broadcast_list, store_fifo_offset, &sfo_extra);
assert_zero(r); assert_zero(r);
invariant(sfo_extra.i == buffer_size); invariant(sfo_extra.i == buffer_size);
...@@ -4566,7 +4616,7 @@ bnc_apply_messages_to_basement_node( ...@@ -4566,7 +4616,7 @@ bnc_apply_messages_to_basement_node(
r = mergesort_r(offsets, buffer_size, sizeof offsets[0], bnc->buffer, fifo_offset_msn_cmp); r = mergesort_r(offsets, buffer_size, sizeof offsets[0], bnc->buffer, fifo_offset_msn_cmp);
assert_zero(r); assert_zero(r);
// Apply the messages in MSN order // Apply the messages in MSN order.
for (int i = 0; i < buffer_size; ++i) { for (int i = 0; i < buffer_size; ++i) {
const struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offsets[i]); const struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offsets[i]);
do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, snapshot_txnids, live_list_reverse, &max_msn_applied, entry); do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, snapshot_txnids, live_list_reverse, &max_msn_applied, entry);
...@@ -4574,14 +4624,19 @@ bnc_apply_messages_to_basement_node( ...@@ -4574,14 +4624,19 @@ bnc_apply_messages_to_basement_node(
toku_free(offsets); toku_free(offsets);
} else if (stale_lbi == stale_ube) { } else if (stale_lbi == stale_ube) {
// No stale messages to apply, we just apply fresh messages // No stale messages to apply, we just apply fresh messages. We
// also move those messages to the stale tree.
struct iterate_do_brt_leaf_put_cmd_extra iter_extra = { .t = t, .leafnode = leafnode, .bn = bn, .ancestor = ancestor, .childnum = childnum, .snapshot_txnids = snapshot_txnids, .live_list_reverse = live_list_reverse, .max_msn_applied = &max_msn_applied }; struct iterate_do_brt_leaf_put_cmd_extra iter_extra = { .t = t, .leafnode = leafnode, .bn = bn, .ancestor = ancestor, .childnum = childnum, .snapshot_txnids = snapshot_txnids, .live_list_reverse = live_list_reverse, .max_msn_applied = &max_msn_applied };
struct iterate_do_brt_leaf_put_cmd_and_move_to_stale_extra iter_amts_extra = { .brt = t, .iter_extra = &iter_extra, .bnc = bnc }; struct iterate_do_brt_leaf_put_cmd_and_move_to_stale_extra iter_amts_extra = { .brt = t, .iter_extra = &iter_extra, .bnc = bnc };
r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, iterate_do_brt_leaf_put_cmd_and_move_to_stale, &iter_amts_extra); r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, iterate_do_brt_leaf_put_cmd_and_move_to_stale, &iter_amts_extra);
assert_zero(r); assert_zero(r);
} else if (fresh_lbi == fresh_ube) { } else if (fresh_lbi == fresh_ube) {
// No fresh messages to apply, we just apply stale messages // No fresh messages to apply, we just apply stale messages.
struct iterate_do_brt_leaf_put_cmd_extra iter_extra = { .t = t, .leafnode = leafnode, .bn = bn, .ancestor = ancestor, .childnum = childnum, .snapshot_txnids = snapshot_txnids, .live_list_reverse = live_list_reverse, .max_msn_applied = &max_msn_applied }; struct iterate_do_brt_leaf_put_cmd_extra iter_extra = { .t = t, .leafnode = leafnode, .bn = bn, .ancestor = ancestor, .childnum = childnum, .snapshot_txnids = snapshot_txnids, .live_list_reverse = live_list_reverse, .max_msn_applied = &max_msn_applied };
r = toku_omt_iterate_on_range(bnc->stale_message_tree, stale_lbi, stale_ube, iterate_do_brt_leaf_put_cmd, &iter_extra); r = toku_omt_iterate_on_range(bnc->stale_message_tree, stale_lbi, stale_ube, iterate_do_brt_leaf_put_cmd, &iter_extra);
assert_zero(r); assert_zero(r);
} else { } else {
...@@ -4590,13 +4645,18 @@ bnc_apply_messages_to_basement_node( ...@@ -4590,13 +4645,18 @@ bnc_apply_messages_to_basement_node(
// offsets in the fresh OMT that need to be moved to stale later. // offsets in the fresh OMT that need to be moved to stale later.
long *XMALLOC_N(fresh_ube - fresh_lbi, fresh_offsets_to_move); long *XMALLOC_N(fresh_ube - fresh_lbi, fresh_offsets_to_move);
// For the loop, we'll keep the indices into both the fresh and
// stale trees, and also the OMTVALUE at those indices.
u_int32_t stale_i = stale_lbi, fresh_i = fresh_lbi; u_int32_t stale_i = stale_lbi, fresh_i = fresh_lbi;
OMTVALUE stale_v, fresh_v; OMTVALUE stale_v, fresh_v;
r = toku_omt_fetch(bnc->stale_message_tree, stale_i, &stale_v); r = toku_omt_fetch(bnc->stale_message_tree, stale_i, &stale_v);
assert_zero(r); assert_zero(r);
r = toku_omt_fetch(bnc->fresh_message_tree, fresh_i, &fresh_v); r = toku_omt_fetch(bnc->fresh_message_tree, fresh_i, &fresh_v);
assert_zero(r); assert_zero(r);
// This comparison extra struct won't change during iteration.
struct toku_fifo_entry_key_msn_cmp_extra extra = { .desc= &t->h->descriptor, .cmp = t->compare_fun, .fifo = bnc->buffer }; struct toku_fifo_entry_key_msn_cmp_extra extra = { .desc= &t->h->descriptor, .cmp = t->compare_fun, .fifo = bnc->buffer };
// Iterate over both lists, applying the smaller (in (key, msn) // Iterate over both lists, applying the smaller (in (key, msn)
// order) message at each step // order) message at each step
while (stale_i < stale_ube && fresh_i < fresh_ube) { while (stale_i < stale_ube && fresh_i < fresh_ube) {
...@@ -4604,25 +4664,35 @@ bnc_apply_messages_to_basement_node( ...@@ -4604,25 +4664,35 @@ bnc_apply_messages_to_basement_node(
const long fresh_offset = (long) fresh_v; const long fresh_offset = (long) fresh_v;
int c = toku_fifo_entry_key_msn_cmp(&extra, &stale_offset, &fresh_offset); int c = toku_fifo_entry_key_msn_cmp(&extra, &stale_offset, &fresh_offset);
if (c < 0) { if (c < 0) {
// The stale message we're pointing to is smaller. We'll
// apply it, then get the next stale message into stale_i
// and stale_v.
const struct fifo_entry *stale_entry = toku_fifo_get_entry(bnc->buffer, stale_offset); const struct fifo_entry *stale_entry = toku_fifo_get_entry(bnc->buffer, stale_offset);
do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, snapshot_txnids, live_list_reverse, &max_msn_applied, stale_entry); do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, snapshot_txnids, live_list_reverse, &max_msn_applied, stale_entry);
stale_i++; stale_i++;
if (stale_i != stale_ube) { if (stale_i != stale_ube) {
r = toku_omt_fetch(bnc->stale_message_tree, stale_i, &stale_v); assert_zero(r); r = toku_omt_fetch(bnc->stale_message_tree, stale_i, &stale_v);
assert_zero(r);
} }
} else if (c > 0) { } else if (c > 0) {
// The fresh message we're pointing to is smaller. We'll
// store its offset (so we can move it into the stale tree
// later), apply it, then get the next fresh message into
// fresh_i and fresh_v.
fresh_offsets_to_move[fresh_i - fresh_lbi] = fresh_offset; fresh_offsets_to_move[fresh_i - fresh_lbi] = fresh_offset;
const struct fifo_entry *fresh_entry = toku_fifo_get_entry(bnc->buffer, fresh_offset); const struct fifo_entry *fresh_entry = toku_fifo_get_entry(bnc->buffer, fresh_offset);
do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, snapshot_txnids, live_list_reverse, &max_msn_applied, fresh_entry); do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, snapshot_txnids, live_list_reverse, &max_msn_applied, fresh_entry);
fresh_i++; fresh_i++;
if (fresh_i != fresh_ube) { if (fresh_i != fresh_ube) {
r = toku_omt_fetch(bnc->fresh_message_tree, fresh_i, &fresh_v); assert_zero(r); r = toku_omt_fetch(bnc->fresh_message_tree, fresh_i, &fresh_v);
assert_zero(r);
} }
} else { } else {
// there is a message in both trees // There is a message in both trees. This should not happen.
assert(false); assert(false);
} }
} }
// Apply the rest of the stale messages, if any exist // Apply the rest of the stale messages, if any exist
while (stale_i < stale_ube) { while (stale_i < stale_ube) {
const long stale_offset = (long) stale_v; const long stale_offset = (long) stale_v;
...@@ -4630,10 +4700,13 @@ bnc_apply_messages_to_basement_node( ...@@ -4630,10 +4700,13 @@ bnc_apply_messages_to_basement_node(
do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, snapshot_txnids, live_list_reverse, &max_msn_applied, stale_entry); do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, snapshot_txnids, live_list_reverse, &max_msn_applied, stale_entry);
stale_i++; stale_i++;
if (stale_i != stale_ube) { if (stale_i != stale_ube) {
r = toku_omt_fetch(bnc->stale_message_tree, stale_i, &stale_v); assert_zero(r); r = toku_omt_fetch(bnc->stale_message_tree, stale_i, &stale_v);
assert_zero(r);
} }
} }
// Apply the rest of the fresh messages, if any exist
// Apply (and store offsets of) the rest of the fresh messages, if
// any exist
while (fresh_i < fresh_ube) { while (fresh_i < fresh_ube) {
const long fresh_offset = (long) fresh_v; const long fresh_offset = (long) fresh_v;
fresh_offsets_to_move[fresh_i - fresh_lbi] = fresh_offset; fresh_offsets_to_move[fresh_i - fresh_lbi] = fresh_offset;
...@@ -4641,23 +4714,27 @@ bnc_apply_messages_to_basement_node( ...@@ -4641,23 +4714,27 @@ bnc_apply_messages_to_basement_node(
do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, snapshot_txnids, live_list_reverse, &max_msn_applied, fresh_entry); do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, snapshot_txnids, live_list_reverse, &max_msn_applied, fresh_entry);
fresh_i++; fresh_i++;
if (fresh_i != fresh_ube) { if (fresh_i != fresh_ube) {
r = toku_omt_fetch(bnc->fresh_message_tree, fresh_i, &fresh_v); assert_zero(r); r = toku_omt_fetch(bnc->fresh_message_tree, fresh_i, &fresh_v);
assert_zero(r);
} }
} }
// Now move all the fresh messages we collected to stale // Now move all the fresh messages we collected above to the stale
// tree
for (u_int32_t i = 0; i < fresh_ube - fresh_lbi; ++i) { for (u_int32_t i = 0; i < fresh_ube - fresh_lbi; ++i) {
r = move_to_stale((OMTVALUE) fresh_offsets_to_move[i], i + fresh_lbi, t, bnc); r = move_to_stale((OMTVALUE) fresh_offsets_to_move[i], i + fresh_lbi, t, bnc);
assert_zero(r); assert_zero(r);
} }
toku_free(fresh_offsets_to_move); toku_free(fresh_offsets_to_move);
} }
// we can't delete things inside move_to_stale because that happens // We can't delete things out of the fresh tree inside move_to_stale
// inside an iteration, instead we have to delete from fresh after // because that happens while we're still looking at the fresh tree.
// Instead we have to delete from the fresh tree after we're done
// looking at it.
for (u_int32_t ube = fresh_ube; fresh_lbi < ube; --ube) { for (u_int32_t ube = fresh_ube; fresh_lbi < ube; --ube) {
r = toku_omt_delete_at(bnc->fresh_message_tree, fresh_lbi); assert_zero(r); // When we delete the message at the fresh_lbi index, everything
} // to the right moves down one spot, including the offset at ube.
if (ancestor->max_msn_applied_to_node_on_disk.msn > bn->max_msn_applied.msn) { r = toku_omt_delete_at(bnc->fresh_message_tree, fresh_lbi);
bn->max_msn_applied = ancestor->max_msn_applied_to_node_on_disk; assert_zero(r);
} }
return r; return r;
} }
...@@ -4665,9 +4742,13 @@ bnc_apply_messages_to_basement_node( ...@@ -4665,9 +4742,13 @@ bnc_apply_messages_to_basement_node(
void void
maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds) maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds)
// Effect: // Effect:
// Bring a leaf node up-to-date according to all the messages in the ancestors. // Bring a leaf node up-to-date according to all the messages in the ancestors.
// If the leaf node is already up-to-date then do nothing. // If the leaf node is already up-to-date then do nothing.
// If the leaf node is not already up-to-date, then record the work done for that leaf in each ancestor. // If the leaf node is not already up-to-date, then record the work done
// for that leaf in each ancestor.
// Requires:
// This is being called when pinning a leaf node for the query path.
// The entire root-to-leaf path is pinned and appears in the ancestors list.
{ {
VERIFY_NODE(t, node); VERIFY_NODE(t, node);
if (node->height > 0) { goto exit; } if (node->height > 0) { goto exit; }
...@@ -4678,12 +4759,10 @@ maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors ...@@ -4678,12 +4759,10 @@ maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors
// allows the cleaner thread to just pick an internal node and flush it // allows the cleaner thread to just pick an internal node and flush it
// as opposed to being forced to start from the root. // as opposed to being forced to start from the root.
for (int i = 0; i < node->n_children; i++) { for (int i = 0; i < node->n_children; i++) {
int height = 0;
if (BP_STATE(node, i) != PT_AVAIL) { continue; } if (BP_STATE(node, i) != PT_AVAIL) { continue; }
BASEMENTNODE curr_bn = BLB(node, i); BASEMENTNODE curr_bn = BLB(node, i);
struct pivot_bounds curr_bounds = next_pivot_keys(node, i, bounds); struct pivot_bounds curr_bounds = next_pivot_keys(node, i, bounds);
for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) { for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
height++;
if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > curr_bn->max_msn_applied.msn) { if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > curr_bn->max_msn_applied.msn) {
assert(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL); assert(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
bnc_apply_messages_to_basement_node( bnc_apply_messages_to_basement_node(
...@@ -4694,11 +4773,16 @@ maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors ...@@ -4694,11 +4773,16 @@ maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors
curr_ancestors->childnum, curr_ancestors->childnum,
&curr_bounds &curr_bounds
); );
// we don't want to check this node again if the next time // We don't want to check this ancestor node again if the
// we query it, the msn hasn't changed. // next time we query it, the msn hasn't changed.
curr_bn->max_msn_applied = curr_ancestors->node->max_msn_applied_to_node_on_disk; curr_bn->max_msn_applied = curr_ancestors->node->max_msn_applied_to_node_on_disk;
} }
} }
// At this point, we know all the stale messages above this
// basement node have been applied, and any new messages will be
// fresh, so we don't need to look at stale messages for this
// basement node, unless it gets evicted (and this field becomes
// false when it's read in again).
curr_bn->stale_ancestor_messages_applied = true; curr_bn->stale_ancestor_messages_applied = true;
} }
exit: exit:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment