Commit ccc5899c authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

one more edit from code review today: move fresh messages to stale after

all application is done


git-svn-id: file:///svn/toku/tokudb@38942 c7de825b-a66e-492c-adef-691d508d4ae1
parent dd758c2e
...@@ -4305,19 +4305,23 @@ static BOOL search_pivot_is_bounded (brt_search_t *search, DESCRIPTOR desc, brt_ ...@@ -4305,19 +4305,23 @@ static BOOL search_pivot_is_bounded (brt_search_t *search, DESCRIPTOR desc, brt_
} }
} }
struct copy_to_stale_extra {
BRT brt;
NONLEAF_CHILDINFO bnc;
};
static int static int
move_to_stale(OMTVALUE v, u_int32_t UU(idx), BRT brt, NONLEAF_CHILDINFO bnc) copy_to_stale(OMTVALUE v, u_int32_t UU(idx), void *extrap)
{ {
// We actually only copy to stale, and then delete messages out of struct copy_to_stale_extra *extra = extrap;
// fresh later on, because we call this during an iteration over
// fresh, and aren't allowed to modify fresh.
const long offset = (long) v; const long offset = (long) v;
struct fifo_entry *entry = (struct fifo_entry *) toku_fifo_get_entry(bnc->buffer, offset); struct fifo_entry *entry = (struct fifo_entry *) toku_fifo_get_entry(extra->bnc->buffer, offset);
entry->is_fresh = false; entry->is_fresh = false;
DBT keydbt; DBT keydbt;
DBT *key = fill_dbt_for_fifo_entry(&keydbt, entry); DBT *key = fill_dbt_for_fifo_entry(&keydbt, entry);
struct toku_fifo_entry_key_msn_heaviside_extra heaviside_extra = { .desc = &brt->h->descriptor, .cmp = brt->compare_fun, .fifo = bnc->buffer, .key = key->data, .keylen = key->size, .msn = entry->msn }; struct toku_fifo_entry_key_msn_heaviside_extra heaviside_extra = { .desc = &extra->brt->h->descriptor, .cmp = extra->brt->compare_fun, .fifo = extra->bnc->buffer, .key = key->data, .keylen = key->size, .msn = entry->msn };
int r = toku_omt_insert(bnc->stale_message_tree, (OMTVALUE) offset, toku_fifo_entry_key_msn_heaviside, &heaviside_extra, NULL); assert_zero(r); int r = toku_omt_insert(extra->bnc->stale_message_tree, (OMTVALUE) offset, toku_fifo_entry_key_msn_heaviside, &heaviside_extra, NULL);
assert_zero(r);
return r; return r;
} }
...@@ -4336,21 +4340,6 @@ store_fifo_offset(OMTVALUE v, u_int32_t UU(idx), void *extrap) ...@@ -4336,21 +4340,6 @@ store_fifo_offset(OMTVALUE v, u_int32_t UU(idx), void *extrap)
return 0; return 0;
} }
struct store_fifo_offset_and_move_to_stale_extra {
BRT brt;
struct store_fifo_offset_extra *sfo_extra;
NONLEAF_CHILDINFO bnc;
};
static int
store_fifo_offset_and_move_to_stale(OMTVALUE v, u_int32_t idx, void *extrap)
{
struct store_fifo_offset_and_move_to_stale_extra *extra = extrap;
int r = store_fifo_offset(v, idx, extra->sfo_extra); assert_zero(r);
r = move_to_stale(v, idx, extra->brt, extra->bnc); assert_zero(r);
return r;
}
/** /**
* Given pointers to offsets within a FIFO where we can find messages, * Given pointers to offsets within a FIFO where we can find messages,
* figure out the MSN of each message, and compare those MSNs. Returns 1, * figure out the MSN of each message, and compare those MSNs. Returns 1,
...@@ -4426,21 +4415,6 @@ iterate_do_brt_leaf_put_cmd(OMTVALUE v, u_int32_t UU(idx), void *extrap) ...@@ -4426,21 +4415,6 @@ iterate_do_brt_leaf_put_cmd(OMTVALUE v, u_int32_t UU(idx), void *extrap)
return 0; return 0;
} }
struct iterate_do_brt_leaf_put_cmd_and_move_to_stale_extra {
BRT brt;
struct iterate_do_brt_leaf_put_cmd_extra *iter_extra;
NONLEAF_CHILDINFO bnc;
};
static int
iterate_do_brt_leaf_put_cmd_and_move_to_stale(OMTVALUE v, u_int32_t idx, void *extrap)
{
struct iterate_do_brt_leaf_put_cmd_and_move_to_stale_extra *e = extrap;
int r = iterate_do_brt_leaf_put_cmd(v, idx, e->iter_extra); assert_zero(r);
r = move_to_stale(v, idx, e->brt, e->bnc); assert_zero(r);
return r;
}
/** /**
* Given the bounds of the basement node to which we will apply messages, * Given the bounds of the basement node to which we will apply messages,
* find the indexes within message_tree which contain the range of * find the indexes within message_tree which contain the range of
...@@ -4600,10 +4574,8 @@ bnc_apply_messages_to_basement_node( ...@@ -4600,10 +4574,8 @@ bnc_apply_messages_to_basement_node(
r = toku_omt_iterate_on_range(bnc->stale_message_tree, stale_lbi, stale_ube, store_fifo_offset, &sfo_extra); r = toku_omt_iterate_on_range(bnc->stale_message_tree, stale_lbi, stale_ube, store_fifo_offset, &sfo_extra);
assert_zero(r); assert_zero(r);
// Then store fresh offsets, and move the messages we store to // Then store fresh offsets
// the stale tree. r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, store_fifo_offset, &sfo_extra);
struct store_fifo_offset_and_move_to_stale_extra sfoamts_extra = { .brt = t, .sfo_extra = &sfo_extra, .bnc = bnc };
r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, store_fifo_offset_and_move_to_stale, &sfoamts_extra);
assert_zero(r); assert_zero(r);
// Store offsets of all broadcast messages. // Store offsets of all broadcast messages.
...@@ -4623,13 +4595,10 @@ bnc_apply_messages_to_basement_node( ...@@ -4623,13 +4595,10 @@ bnc_apply_messages_to_basement_node(
toku_free(offsets); toku_free(offsets);
} else if (stale_lbi == stale_ube) { } else if (stale_lbi == stale_ube) {
// No stale messages to apply, we just apply fresh messages. We // No stale messages to apply, we just apply fresh messages.
// also move those messages to the stale tree.
struct iterate_do_brt_leaf_put_cmd_extra iter_extra = { .t = t, .leafnode = leafnode, .bn = bn, .ancestor = ancestor, .childnum = childnum, .snapshot_txnids = snapshot_txnids, .live_list_reverse = live_list_reverse }; struct iterate_do_brt_leaf_put_cmd_extra iter_extra = { .t = t, .leafnode = leafnode, .bn = bn, .ancestor = ancestor, .childnum = childnum, .snapshot_txnids = snapshot_txnids, .live_list_reverse = live_list_reverse };
struct iterate_do_brt_leaf_put_cmd_and_move_to_stale_extra iter_amts_extra = { .brt = t, .iter_extra = &iter_extra, .bnc = bnc };
r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, iterate_do_brt_leaf_put_cmd_and_move_to_stale, &iter_amts_extra); r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, iterate_do_brt_leaf_put_cmd, &iter_extra);
assert_zero(r); assert_zero(r);
} else if (fresh_lbi == fresh_ube) { } else if (fresh_lbi == fresh_ube) {
// No fresh messages to apply, we just apply stale messages. // No fresh messages to apply, we just apply stale messages.
...@@ -4640,9 +4609,7 @@ bnc_apply_messages_to_basement_node( ...@@ -4640,9 +4609,7 @@ bnc_apply_messages_to_basement_node(
assert_zero(r); assert_zero(r);
} else { } else {
// We have stale and fresh messages but no broadcasts. We can // We have stale and fresh messages but no broadcasts. We can
// iterate over both OMTs together. We'll build up a list of // iterate over both OMTs together.
// offsets in the fresh OMT that need to be moved to stale later.
long *XMALLOC_N(fresh_ube - fresh_lbi, fresh_offsets_to_move);
// For the loop, we'll keep the indices into both the fresh and // For the loop, we'll keep the indices into both the fresh and
// stale trees, and also the OMTVALUE at those indices. // stale trees, and also the OMTVALUE at those indices.
...@@ -4675,10 +4642,8 @@ bnc_apply_messages_to_basement_node( ...@@ -4675,10 +4642,8 @@ bnc_apply_messages_to_basement_node(
} }
} else if (c > 0) { } else if (c > 0) {
// The fresh message we're pointing to is smaller. We'll // The fresh message we're pointing to is smaller. We'll
// store its offset (so we can move it into the stale tree // apply it, then get the next fresh message into fresh_i
// later), apply it, then get the next fresh message into // and fresh_v.
// fresh_i and fresh_v.
fresh_offsets_to_move[fresh_i - fresh_lbi] = fresh_offset;
const struct fifo_entry *fresh_entry = toku_fifo_get_entry(bnc->buffer, fresh_offset); const struct fifo_entry *fresh_entry = toku_fifo_get_entry(bnc->buffer, fresh_offset);
do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, snapshot_txnids, live_list_reverse, fresh_entry); do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, snapshot_txnids, live_list_reverse, fresh_entry);
fresh_i++; fresh_i++;
...@@ -4704,11 +4669,9 @@ bnc_apply_messages_to_basement_node( ...@@ -4704,11 +4669,9 @@ bnc_apply_messages_to_basement_node(
} }
} }
// Apply (and store offsets of) the rest of the fresh messages, if // Apply the rest of the fresh messages, if any exist
// any exist
while (fresh_i < fresh_ube) { while (fresh_i < fresh_ube) {
const long fresh_offset = (long) fresh_v; const long fresh_offset = (long) fresh_v;
fresh_offsets_to_move[fresh_i - fresh_lbi] = fresh_offset;
const struct fifo_entry *fresh_entry = toku_fifo_get_entry(bnc->buffer, fresh_offset); const struct fifo_entry *fresh_entry = toku_fifo_get_entry(bnc->buffer, fresh_offset);
do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, snapshot_txnids, live_list_reverse, fresh_entry); do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, snapshot_txnids, live_list_reverse, fresh_entry);
fresh_i++; fresh_i++;
...@@ -4717,18 +4680,13 @@ bnc_apply_messages_to_basement_node( ...@@ -4717,18 +4680,13 @@ bnc_apply_messages_to_basement_node(
assert_zero(r); assert_zero(r);
} }
} }
// Now move all the fresh messages we collected above to the stale
// tree
for (u_int32_t i = 0; i < fresh_ube - fresh_lbi; ++i) {
r = move_to_stale((OMTVALUE) fresh_offsets_to_move[i], i + fresh_lbi, t, bnc);
assert_zero(r);
}
toku_free(fresh_offsets_to_move);
} }
// We can't delete things out of the fresh tree inside move_to_stale // We can't delete things out of the fresh tree inside the above
// because that happens while we're still looking at the fresh tree. // procedures because we're still looking at the fresh tree. Instead
// Instead we have to delete from the fresh tree after we're done // we have to move messages after we're done looking at it.
// looking at it. struct copy_to_stale_extra cts_extra = { .brt = t, .bnc = bnc };
r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, copy_to_stale, &cts_extra);
assert_zero(r);
for (u_int32_t ube = fresh_ube; fresh_lbi < ube; --ube) { for (u_int32_t ube = fresh_ube; fresh_lbi < ube; --ube) {
// When we delete the message at the fresh_lbi index, everything // When we delete the message at the fresh_lbi index, everything
// to the right moves down one spot, including the offset at ube. // to the right moves down one spot, including the offset at ube.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment