Commit b01cbe48 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

one more edit from code review today: move fresh messages to stale after

all application is done


git-svn-id: file:///svn/toku/tokudb@38942 c7de825b-a66e-492c-adef-691d508d4ae1
parent 960bf17f
......@@ -4305,19 +4305,23 @@ static BOOL search_pivot_is_bounded (brt_search_t *search, DESCRIPTOR desc, brt_
}
}
struct copy_to_stale_extra {
BRT brt;
NONLEAF_CHILDINFO bnc;
};
static int
move_to_stale(OMTVALUE v, u_int32_t UU(idx), BRT brt, NONLEAF_CHILDINFO bnc)
copy_to_stale(OMTVALUE v, u_int32_t UU(idx), void *extrap)
{
// We actually only copy to stale, and then delete messages out of
// fresh later on, because we call this during an iteration over
// fresh, and aren't allowed to modify fresh.
struct copy_to_stale_extra *extra = extrap;
const long offset = (long) v;
struct fifo_entry *entry = (struct fifo_entry *) toku_fifo_get_entry(bnc->buffer, offset);
struct fifo_entry *entry = (struct fifo_entry *) toku_fifo_get_entry(extra->bnc->buffer, offset);
entry->is_fresh = false;
DBT keydbt;
DBT *key = fill_dbt_for_fifo_entry(&keydbt, entry);
struct toku_fifo_entry_key_msn_heaviside_extra heaviside_extra = { .desc = &brt->h->descriptor, .cmp = brt->compare_fun, .fifo = bnc->buffer, .key = key->data, .keylen = key->size, .msn = entry->msn };
int r = toku_omt_insert(bnc->stale_message_tree, (OMTVALUE) offset, toku_fifo_entry_key_msn_heaviside, &heaviside_extra, NULL); assert_zero(r);
struct toku_fifo_entry_key_msn_heaviside_extra heaviside_extra = { .desc = &extra->brt->h->descriptor, .cmp = extra->brt->compare_fun, .fifo = extra->bnc->buffer, .key = key->data, .keylen = key->size, .msn = entry->msn };
int r = toku_omt_insert(extra->bnc->stale_message_tree, (OMTVALUE) offset, toku_fifo_entry_key_msn_heaviside, &heaviside_extra, NULL);
assert_zero(r);
return r;
}
......@@ -4336,21 +4340,6 @@ store_fifo_offset(OMTVALUE v, u_int32_t UU(idx), void *extrap)
return 0;
}
struct store_fifo_offset_and_move_to_stale_extra {
BRT brt;
struct store_fifo_offset_extra *sfo_extra;
NONLEAF_CHILDINFO bnc;
};
static int
store_fifo_offset_and_move_to_stale(OMTVALUE v, u_int32_t idx, void *extrap)
{
struct store_fifo_offset_and_move_to_stale_extra *extra = extrap;
int r = store_fifo_offset(v, idx, extra->sfo_extra); assert_zero(r);
r = move_to_stale(v, idx, extra->brt, extra->bnc); assert_zero(r);
return r;
}
/**
* Given pointers to offsets within a FIFO where we can find messages,
* figure out the MSN of each message, and compare those MSNs. Returns 1,
......@@ -4426,21 +4415,6 @@ iterate_do_brt_leaf_put_cmd(OMTVALUE v, u_int32_t UU(idx), void *extrap)
return 0;
}
struct iterate_do_brt_leaf_put_cmd_and_move_to_stale_extra {
BRT brt;
struct iterate_do_brt_leaf_put_cmd_extra *iter_extra;
NONLEAF_CHILDINFO bnc;
};
static int
iterate_do_brt_leaf_put_cmd_and_move_to_stale(OMTVALUE v, u_int32_t idx, void *extrap)
{
struct iterate_do_brt_leaf_put_cmd_and_move_to_stale_extra *e = extrap;
int r = iterate_do_brt_leaf_put_cmd(v, idx, e->iter_extra); assert_zero(r);
r = move_to_stale(v, idx, e->brt, e->bnc); assert_zero(r);
return r;
}
/**
* Given the bounds of the basement node to which we will apply messages,
* find the indexes within message_tree which contain the range of
......@@ -4600,10 +4574,8 @@ bnc_apply_messages_to_basement_node(
r = toku_omt_iterate_on_range(bnc->stale_message_tree, stale_lbi, stale_ube, store_fifo_offset, &sfo_extra);
assert_zero(r);
// Then store fresh offsets, and move the messages we store to
// the stale tree.
struct store_fifo_offset_and_move_to_stale_extra sfoamts_extra = { .brt = t, .sfo_extra = &sfo_extra, .bnc = bnc };
r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, store_fifo_offset_and_move_to_stale, &sfoamts_extra);
// Then store fresh offsets
r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, store_fifo_offset, &sfo_extra);
assert_zero(r);
// Store offsets of all broadcast messages.
......@@ -4623,13 +4595,10 @@ bnc_apply_messages_to_basement_node(
toku_free(offsets);
} else if (stale_lbi == stale_ube) {
// No stale messages to apply, we just apply fresh messages. We
// also move those messages to the stale tree.
// No stale messages to apply, we just apply fresh messages.
struct iterate_do_brt_leaf_put_cmd_extra iter_extra = { .t = t, .leafnode = leafnode, .bn = bn, .ancestor = ancestor, .childnum = childnum, .snapshot_txnids = snapshot_txnids, .live_list_reverse = live_list_reverse };
struct iterate_do_brt_leaf_put_cmd_and_move_to_stale_extra iter_amts_extra = { .brt = t, .iter_extra = &iter_extra, .bnc = bnc };
r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, iterate_do_brt_leaf_put_cmd_and_move_to_stale, &iter_amts_extra);
r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, iterate_do_brt_leaf_put_cmd, &iter_extra);
assert_zero(r);
} else if (fresh_lbi == fresh_ube) {
// No fresh messages to apply, we just apply stale messages.
......@@ -4640,9 +4609,7 @@ bnc_apply_messages_to_basement_node(
assert_zero(r);
} else {
// We have stale and fresh messages but no broadcasts. We can
// iterate over both OMTs together. We'll build up a list of
// offsets in the fresh OMT that need to be moved to stale later.
long *XMALLOC_N(fresh_ube - fresh_lbi, fresh_offsets_to_move);
// iterate over both OMTs together.
// For the loop, we'll keep the indices into both the fresh and
// stale trees, and also the OMTVALUE at those indices.
......@@ -4675,10 +4642,8 @@ bnc_apply_messages_to_basement_node(
}
} else if (c > 0) {
// The fresh message we're pointing to is smaller. We'll
// store its offset (so we can move it into the stale tree
// later), apply it, then get the next fresh message into
// fresh_i and fresh_v.
fresh_offsets_to_move[fresh_i - fresh_lbi] = fresh_offset;
// apply it, then get the next fresh message into fresh_i
// and fresh_v.
const struct fifo_entry *fresh_entry = toku_fifo_get_entry(bnc->buffer, fresh_offset);
do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, snapshot_txnids, live_list_reverse, fresh_entry);
fresh_i++;
......@@ -4704,11 +4669,9 @@ bnc_apply_messages_to_basement_node(
}
}
// Apply (and store offsets of) the rest of the fresh messages, if
// any exist
// Apply the rest of the fresh messages, if any exist
while (fresh_i < fresh_ube) {
const long fresh_offset = (long) fresh_v;
fresh_offsets_to_move[fresh_i - fresh_lbi] = fresh_offset;
const struct fifo_entry *fresh_entry = toku_fifo_get_entry(bnc->buffer, fresh_offset);
do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, snapshot_txnids, live_list_reverse, fresh_entry);
fresh_i++;
......@@ -4717,18 +4680,13 @@ bnc_apply_messages_to_basement_node(
assert_zero(r);
}
}
// Now move all the fresh messages we collected above to the stale
// tree
for (u_int32_t i = 0; i < fresh_ube - fresh_lbi; ++i) {
r = move_to_stale((OMTVALUE) fresh_offsets_to_move[i], i + fresh_lbi, t, bnc);
assert_zero(r);
}
toku_free(fresh_offsets_to_move);
}
// We can't delete things out of the fresh tree inside move_to_stale
// because that happens while we're still looking at the fresh tree.
// Instead we have to delete from the fresh tree after we're done
// looking at it.
// We can't delete things out of the fresh tree inside the above
// procedures because we're still looking at the fresh tree. Instead
// we have to move messages after we're done looking at it.
struct copy_to_stale_extra cts_extra = { .brt = t, .bnc = bnc };
r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, copy_to_stale, &cts_extra);
assert_zero(r);
for (u_int32_t ube = fresh_ube; fresh_lbi < ube; --ube) {
// When we delete the message at the fresh_lbi index, everything
// to the right moves down one spot, including the offset at ube.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment