Commit 8f1ee9c2 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

refs #5155 merge some work to main:

 - better delete marked algorithm
 - use marked omt for fresh message tree
 - little cleanup things


git-svn-id: file:///svn/toku/tokudb@46549 c7de825b-a66e-492c-adef-691d508d4ae1
parent 6a42c920
......@@ -57,7 +57,7 @@ static int next_power_of_two (int n) {
return r;
}
int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *data, unsigned int datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, long *dest) {
int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *data, unsigned int datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, int32_t *dest) {
int need_space_here = sizeof(struct fifo_entry)
+ keylen + datalen
+ xids_get_size(xids)
......@@ -134,7 +134,7 @@ DBT *fill_dbt_for_fifo_entry(DBT *dbt, const struct fifo_entry *entry) {
return toku_fill_dbt(dbt, xids_get_end_of_array((XIDS) &entry->xids_s), entry->keylen);
}
const struct fifo_entry *toku_fifo_get_entry(FIFO fifo, long off) {
struct fifo_entry *toku_fifo_get_entry(FIFO fifo, int off) {
return toku_fifo_iterate_internal_get_entry(fifo, off);
}
......
......@@ -55,7 +55,7 @@ void toku_fifo_free(FIFO *);
int toku_fifo_n_entries(FIFO);
int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, long *dest);
int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, int32_t *dest);
unsigned int toku_fifo_buffer_size_in_use (FIFO fifo);
unsigned long toku_fifo_memory_size_in_use(FIFO fifo); // return how much memory in the fifo holds useful data
......@@ -90,7 +90,7 @@ int toku_fifo_iterate_internal_next(FIFO fifo, int off);
struct fifo_entry * toku_fifo_iterate_internal_get_entry(FIFO fifo, int off);
DBT *fill_dbt_for_fifo_entry(DBT *dbt, const struct fifo_entry *entry);
const struct fifo_entry *toku_fifo_get_entry(FIFO fifo, long off);
struct fifo_entry *toku_fifo_get_entry(FIFO fifo, int off);
void toku_fifo_clone(FIFO orig_fifo, FIFO* cloned_fifo);
......
......@@ -149,8 +149,11 @@ toku_pin_ftnode(
unlockers);
if (r==0) {
FTNODE node = (FTNODE) node_v;
if (apply_ancestor_messages) {
maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds, msgs_applied);
if (apply_ancestor_messages && node->height == 0) {
toku_apply_ancestors_messages_to_node(brt, node, ancestors, bounds, msgs_applied);
}
if (may_modify_node && node->height > 0) {
toku_move_ftnode_messages_to_stale(brt->ft, node);
}
*node_p = node;
// printf("%*sPin %ld\n", 8-node->height, "", blocknum.b);
......@@ -204,9 +207,26 @@ toku_pin_ftnode_off_client_thread(
);
assert(r==0);
FTNODE node = (FTNODE) node_v;
if (may_modify_node && node->height > 0) {
toku_move_ftnode_messages_to_stale(h, node);
}
*node_p = node;
}
int toku_maybe_pin_ftnode_clean(FT ft, BLOCKNUM blocknum, uint32_t fullhash, FTNODE *nodep, bool may_modify_node) {
void *node_v;
int r = toku_cachetable_maybe_get_and_pin_clean(ft->cf, blocknum, fullhash, &node_v);
if (r != 0) {
goto cleanup;
}
CAST_FROM_VOIDP(*nodep, node_v);
if (may_modify_node && (*nodep)->height > 0) {
toku_move_ftnode_messages_to_stale(ft, *nodep);
}
cleanup:
return r;
}
void
toku_unpin_ftnode_off_client_thread(FT ft, FTNODE node)
{
......
......@@ -94,6 +94,12 @@ toku_pin_ftnode_off_client_thread(
FTNODE *node_p
);
/**
* This function may return a pinned ftnode to the caller, if pinning is cheap.
* If the node is already locked, or is pending a checkpoint, the node is not pinned and -1 is returned.
*/
int toku_maybe_pin_ftnode_clean(FT ft, BLOCKNUM blocknum, uint32_t fullhash, FTNODE *nodep, bool may_modify_node);
/**
* Effect: Unpin a brt node. Used for
* nodes that were pinned off client thread.
......
......@@ -1784,15 +1784,9 @@ flush_node_on_background_thread(FT h, FTNODE parent)
//
// see if we can pin the child
//
void *node_v;
FTNODE child;
uint32_t childfullhash = compute_child_fullhash(h->cf, parent, childnum);
int r = toku_cachetable_maybe_get_and_pin_clean (
h->cf,
BP_BLOCKNUM(parent,childnum),
childfullhash,
&node_v
);
int r = toku_maybe_pin_ftnode_clean(h, BP_BLOCKNUM(parent, childnum), childfullhash, &child, true);
if (r != 0) {
// In this case, we could not lock the child, so just place the parent on the background thread
// In the callback, we will use flush_some_child, which checks to
......@@ -1803,7 +1797,6 @@ flush_node_on_background_thread(FT h, FTNODE parent)
//
// successfully locked child
//
child = (FTNODE) node_v;
bool may_child_be_reactive = may_node_be_reactive(child);
if (!may_child_be_reactive) {
// We're going to unpin the parent, so before we do, we must
......
......@@ -104,7 +104,7 @@ struct toku_fifo_entry_key_msn_heaviside_extra {
// comparison function for inserting messages into a
// ftnode_nonleaf_childinfo's message_tree
int
toku_fifo_entry_key_msn_heaviside(const long &v, const struct toku_fifo_entry_key_msn_heaviside_extra &extra);
toku_fifo_entry_key_msn_heaviside(const int32_t &v, const struct toku_fifo_entry_key_msn_heaviside_extra &extra);
struct toku_fifo_entry_key_msn_cmp_extra {
DESCRIPTOR desc;
......@@ -114,15 +114,16 @@ struct toku_fifo_entry_key_msn_cmp_extra {
// same thing for qsort_r
int
toku_fifo_entry_key_msn_cmp(const struct toku_fifo_entry_key_msn_cmp_extra &extrap, const long &a, const long &b);
toku_fifo_entry_key_msn_cmp(const struct toku_fifo_entry_key_msn_cmp_extra &extrap, const int &a, const int &b);
typedef toku::omt<long> off_omt_t;
typedef toku::omt<int32_t> off_omt_t;
typedef toku::omt<int32_t, int32_t, true> marked_off_omt_t;
// data of an available partition of a nonleaf ftnode
struct ftnode_nonleaf_childinfo {
FIFO buffer;
off_omt_t broadcast_list;
off_omt_t fresh_message_tree;
marked_off_omt_t fresh_message_tree;
off_omt_t stale_message_tree;
};
......@@ -807,8 +808,9 @@ struct pivot_bounds {
const DBT * const upper_bound_inclusive; // NULL to indicate negative or positive infinity (which are in practice exclusive since there are now transfinite keys in messages).
};
// FIXME needs toku prefix
void maybe_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, bool* msgs_applied);
__attribute__((nonnull))
void toku_move_ftnode_messages_to_stale(FT ft, FTNODE node);
void toku_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, bool* msgs_applied);
int
toku_ft_search_which_child(
......
......@@ -1800,7 +1800,7 @@ key_msn_cmp(const DBT *a, const DBT *b, const MSN amsn, const MSN bmsn,
}
int
toku_fifo_entry_key_msn_heaviside(const long &offset, const struct toku_fifo_entry_key_msn_heaviside_extra &extra)
toku_fifo_entry_key_msn_heaviside(const int32_t &offset, const struct toku_fifo_entry_key_msn_heaviside_extra &extra)
{
const struct fifo_entry *query = toku_fifo_get_entry(extra.fifo, offset);
DBT qdbt;
......@@ -1811,7 +1811,7 @@ toku_fifo_entry_key_msn_heaviside(const long &offset, const struct toku_fifo_ent
}
int
toku_fifo_entry_key_msn_cmp(const struct toku_fifo_entry_key_msn_cmp_extra &extra, const long &ao, const long &bo)
toku_fifo_entry_key_msn_cmp(const struct toku_fifo_entry_key_msn_cmp_extra &extra, const int32_t &ao, const int32_t &bo)
{
const struct fifo_entry *a = toku_fifo_get_entry(extra.fifo, ao);
const struct fifo_entry *b = toku_fifo_get_entry(extra.fifo, bo);
......@@ -1830,7 +1830,7 @@ toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen, cons
//
// This is only exported for tests.
{
long offset;
int32_t offset;
int r = toku_fifo_enq(bnc->buffer, key, keylen, data, datalen, type, msn, xids, is_fresh, &offset);
assert_zero(r);
if (ft_msg_type_applies_once(type)) {
......@@ -2351,7 +2351,7 @@ void toku_ft_leaf_apply_cmd(
// Because toku_ft_leaf_apply_cmd is called with the intent of permanently
// applying a message to a leaf node (meaning the message is permanently applied
// and will be purged from the system after this call, as opposed to
// maybe_apply_ancestors_messages_to_node, which applies a message
// toku_apply_ancestors_messages_to_node, which applies a message
// for a query, but the message may still reside in the system and
// be reapplied later), we mark the node as dirty and
// take the opportunity to update node->max_msn_applied_to_node_on_disk.
......@@ -2362,7 +2362,7 @@ void toku_ft_leaf_apply_cmd(
// we cannot blindly update node->max_msn_applied_to_node_on_disk,
// we must check to see if the msn is greater that the one already stored,
// because the cmd may have already been applied earlier (via
// maybe_apply_ancestors_messages_to_node) to answer a query
// toku_apply_ancestors_messages_to_node) to answer a query
//
// This is why we handle node->max_msn_applied_to_node_on_disk both here
// and in ft_nonleaf_put_cmd, as opposed to in one location, toku_ft_node_put_cmd.
......@@ -3770,35 +3770,13 @@ static bool search_pivot_is_bounded (ft_search_t *search, DESCRIPTOR desc, ft_co
}
}
struct copy_to_stale_extra {
FT_HANDLE ft_handle;
NONLEAF_CHILDINFO bnc;
};
// template-only function, but must be extern
int copy_to_stale(const long &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra)
__attribute__((nonnull(3)));
int copy_to_stale(const long &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra)
{
struct fifo_entry *entry = (struct fifo_entry *) toku_fifo_get_entry(extra->bnc->buffer, offset);
entry->is_fresh = false;
DBT keydbt;
DBT *key = fill_dbt_for_fifo_entry(&keydbt, entry);
struct toku_fifo_entry_key_msn_heaviside_extra heaviside_extra = { .desc = &extra->ft_handle->ft->cmp_descriptor, .cmp = extra->ft_handle->ft->compare_fun, .fifo = extra->bnc->buffer, .key = key, .msn = entry->msn };
int r = extra->bnc->stale_message_tree.insert<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(offset, heaviside_extra, nullptr);
assert_zero(r);
return r;
}
struct store_fifo_offset_extra {
long *offsets;
int32_t *offsets;
int i;
};
// template-only function, but must be extern
int store_fifo_offset(const long &offset, const uint32_t UU(idx), struct store_fifo_offset_extra *const extra)
__attribute__((nonnull(3)));
int store_fifo_offset(const long &offset, const uint32_t UU(idx), struct store_fifo_offset_extra *const extra)
__attribute__((nonnull(3)))
static int store_fifo_offset(const int32_t &offset, const uint32_t UU(idx), struct store_fifo_offset_extra *const extra)
{
extra->offsets[extra->i] = offset;
extra->i++;
......@@ -3810,10 +3788,8 @@ int store_fifo_offset(const long &offset, const uint32_t UU(idx), struct store_f
* figure out the MSN of each message, and compare those MSNs. Returns 1,
* 0, or -1 if a is larger than, equal to, or smaller than b.
*/
// template-only function, but must be extern
int fifo_offset_msn_cmp(FIFO &fifo, const long &ao, const long &bo);
int
fifo_offset_msn_cmp(FIFO &fifo, const long &ao, const long &bo)
static int
fifo_offset_msn_cmp(FIFO &fifo, const int32_t &ao, const int32_t &bo)
{
const struct fifo_entry *a = toku_fifo_get_entry(fifo, ao);
const struct fifo_entry *b = toku_fifo_get_entry(fifo, bo);
......@@ -3832,7 +3808,7 @@ fifo_offset_msn_cmp(FIFO &fifo, const long &ao, const long &bo)
* basement node.
*/
static void
do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, FTNODE ancestor, int childnum, const struct fifo_entry *entry, STAT64INFO stats_to_update)
do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, FTNODE ancestor, int childnum, struct fifo_entry *entry, STAT64INFO stats_to_update)
{
// The messages are being iterated over in (key,msn) order or just in
// msn order, so all the messages for one key, from one buffer, are in
......@@ -3846,6 +3822,7 @@ do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, FTNODE ancestor, int childnum, con
const XIDS xids = (XIDS) &entry->xids_s;
bytevec key = xids_get_end_of_array(xids);
bytevec val = (uint8_t*)key + entry->keylen;
entry->is_fresh = false;
DBT hk;
toku_fill_dbt(&hk, key, keylen);
......@@ -3873,13 +3850,11 @@ struct iterate_do_bn_apply_cmd_extra {
STAT64INFO stats_to_update;
};
// template-only function, but must be extern
int iterate_do_bn_apply_cmd(const long &offset, const uint32_t UU(idx), struct iterate_do_bn_apply_cmd_extra *const e)
__attribute__((nonnull(3)));
int iterate_do_bn_apply_cmd(const long &offset, const uint32_t UU(idx), struct iterate_do_bn_apply_cmd_extra *const e)
__attribute__((nonnull(3)))
static int iterate_do_bn_apply_cmd(const int32_t &offset, const uint32_t UU(idx), struct iterate_do_bn_apply_cmd_extra *const e)
{
NONLEAF_CHILDINFO bnc = BNC(e->ancestor, e->childnum);
const struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offset);
struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offset);
do_bn_apply_cmd(e->t, e->bn, e->ancestor, e->childnum, entry, e->stats_to_update);
return 0;
}
......@@ -3899,11 +3874,12 @@ int iterate_do_bn_apply_cmd(const long &offset, const uint32_t UU(idx), struct i
* Outputs the OMT indices in lbi (lower bound inclusive) and ube (upper
* bound exclusive).
*/
template<typename find_bounds_omt_t>
static void
find_bounds_within_message_tree(
DESCRIPTOR desc, /// used for cmp
ft_compare_func cmp, /// used to compare keys
const off_omt_t &message_tree, /// tree holding FIFO offsets, in which we want to look for indices
const find_bounds_omt_t &message_tree, /// tree holding FIFO offsets, in which we want to look for indices
FIFO buffer, /// buffer in which messages are found
struct pivot_bounds const * const bounds, /// key bounds within the basement node we're applying messages to
uint32_t *lbi, /// (output) "lower bound inclusive" (index into message_tree)
......@@ -3918,13 +3894,15 @@ find_bounds_within_message_tree(
// message (with any msn) with the key lower_bound_exclusive.
// This will be a message we want to try applying, so it is the
// "lower bound inclusive" within the message_tree.
struct toku_fifo_entry_key_msn_heaviside_extra lbi_extra = {
.desc = desc, .cmp = cmp,
.fifo = buffer,
.key = bounds->lower_bound_exclusive,
.msn = MAX_MSN };
long found_lb;
r = message_tree.find<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(lbi_extra, +1, &found_lb, lbi);
struct toku_fifo_entry_key_msn_heaviside_extra lbi_extra;
ZERO_STRUCT(lbi_extra);
lbi_extra.desc = desc;
lbi_extra.cmp = cmp;
lbi_extra.fifo = buffer;
lbi_extra.key = bounds->lower_bound_exclusive;
lbi_extra.msn = MAX_MSN;
int32_t found_lb;
r = message_tree.template find<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(lbi_extra, +1, &found_lb, lbi);
if (r == DB_NOTFOUND) {
// There is no relevant data (the lower bound is bigger than
// any message in this tree), so we have no range and we're
......@@ -3938,7 +3916,7 @@ find_bounds_within_message_tree(
// bound inclusive that we have. If so, there are no relevant
// messages between these bounds.
const DBT *ubi = bounds->upper_bound_inclusive;
const long offset = (long) found_lb;
const int32_t offset = found_lb;
DBT found_lbidbt;
fill_dbt_for_fifo_entry(&found_lbidbt, toku_fifo_get_entry(buffer, offset));
FAKE_DB(db, desc);
......@@ -3964,12 +3942,14 @@ find_bounds_within_message_tree(
// the first thing bigger than the upper_bound_inclusive key.
// This is therefore the smallest thing we don't want to apply,
// and toku_omt_iterate_on_range will not examine it.
struct toku_fifo_entry_key_msn_heaviside_extra ube_extra = {
.desc = desc, .cmp = cmp,
.fifo = buffer,
.key = bounds->upper_bound_inclusive,
.msn = MAX_MSN };
r = message_tree.find<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(ube_extra, +1, nullptr, ube);
struct toku_fifo_entry_key_msn_heaviside_extra ube_extra;
ZERO_STRUCT(ube_extra);
ube_extra.desc = desc;
ube_extra.cmp = cmp;
ube_extra.fifo = buffer;
ube_extra.key = bounds->upper_bound_inclusive;
ube_extra.msn = MAX_MSN;
r = message_tree.template find<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(ube_extra, +1, nullptr, ube);
if (r == DB_NOTFOUND) {
// Couldn't find anything in the buffer bigger than our key,
// so we need to look at everything up to the end of
......@@ -3990,7 +3970,7 @@ find_bounds_within_message_tree(
* or plus infinity respectively if they are NULL. Do not mark the node
* as dirty (preserve previous state of 'dirty' bit).
*/
static int
static void
bnc_apply_messages_to_basement_node(
FT_HANDLE t, // used for comparison function
BASEMENTNODE bn, // where to apply messages
......@@ -4020,24 +4000,24 @@ bnc_apply_messages_to_basement_node(
// following 4 cases will do the application, depending on which of
// the lists contains relevant messages:
//
// 1. broadcast messages and anything else
// 1. broadcast messages and anything else, or a mix of fresh and stale
// 2. only fresh messages
// 3. only stale messages
// 4. fresh and stale messages but no broadcasts
if (bnc->broadcast_list.size() > 0) {
// We have some broadcasts, which don't have keys, so we grab all
if (bnc->broadcast_list.size() > 0 ||
(stale_lbi != stale_ube && fresh_lbi != fresh_ube)) {
// We have messages in multiple trees, so we grab all
// the relevant messages' offsets and sort them by MSN, then apply
// them in MSN order.
const int buffer_size = ((stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) + bnc->broadcast_list.size());
long *XMALLOC_N(buffer_size, offsets);
int32_t *XMALLOC_N(buffer_size, offsets);
struct store_fifo_offset_extra sfo_extra = { .offsets = offsets, .i = 0 };
// Populate offsets array with offsets to stale messages
r = bnc->stale_message_tree.iterate_on_range<struct store_fifo_offset_extra, store_fifo_offset>(stale_lbi, stale_ube, &sfo_extra);
assert_zero(r);
// Then store fresh offsets
r = bnc->fresh_message_tree.iterate_on_range<struct store_fifo_offset_extra, store_fifo_offset>(fresh_lbi, fresh_ube, &sfo_extra);
// Then store fresh offsets, and mark them to be moved to stale later.
r = bnc->fresh_message_tree.iterate_and_mark_range<struct store_fifo_offset_extra, store_fifo_offset>(fresh_lbi, fresh_ube, &sfo_extra);
assert_zero(r);
// Store offsets of all broadcast messages.
......@@ -4046,24 +4026,25 @@ bnc_apply_messages_to_basement_node(
invariant(sfo_extra.i == buffer_size);
// Sort by MSN.
r = toku::sort<long, FIFO, fifo_offset_msn_cmp>::mergesort_r(offsets, buffer_size, bnc->buffer);
r = toku::sort<int32_t, FIFO, fifo_offset_msn_cmp>::mergesort_r(offsets, buffer_size, bnc->buffer);
assert_zero(r);
// Apply the messages in MSN order.
for (int i = 0; i < buffer_size; ++i) {
*msgs_applied = true;
const struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offsets[i]);
struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offsets[i]);
do_bn_apply_cmd(t, bn, ancestor, childnum, entry, &stats_delta);
}
toku_free(offsets);
} else if (stale_lbi == stale_ube) {
// No stale messages to apply, we just apply fresh messages.
// No stale messages to apply, we just apply fresh messages, and mark them to be moved to stale later.
struct iterate_do_bn_apply_cmd_extra iter_extra = { .t = t, .bn = bn, .ancestor = ancestor, .childnum = childnum, .stats_to_update = &stats_delta};
if (fresh_ube - fresh_lbi > 0) *msgs_applied = true;
r = bnc->fresh_message_tree.iterate_on_range<struct iterate_do_bn_apply_cmd_extra, iterate_do_bn_apply_cmd>(fresh_lbi, fresh_ube, &iter_extra);
r = bnc->fresh_message_tree.iterate_and_mark_range<struct iterate_do_bn_apply_cmd_extra, iterate_do_bn_apply_cmd>(fresh_lbi, fresh_ube, &iter_extra);
assert_zero(r);
} else if (fresh_lbi == fresh_ube) {
} else {
invariant(fresh_lbi == fresh_ube);
// No fresh messages to apply, we just apply stale messages.
if (stale_ube - stale_lbi > 0) *msgs_applied = true;
......@@ -4071,82 +4052,6 @@ bnc_apply_messages_to_basement_node(
r = bnc->stale_message_tree.iterate_on_range<struct iterate_do_bn_apply_cmd_extra, iterate_do_bn_apply_cmd>(stale_lbi, stale_ube, &iter_extra);
assert_zero(r);
} else {
// We have stale and fresh messages but no broadcasts. We can
// iterate over both OMTs together.
// For the loop, we'll keep the indices into both the fresh and
// stale trees, and also the OMTVALUE at those indices.
uint32_t stale_i = stale_lbi, fresh_i = fresh_lbi;
long stale_offset, fresh_offset;
r = bnc->stale_message_tree.fetch(stale_i, &stale_offset);
assert_zero(r);
r = bnc->fresh_message_tree.fetch(fresh_i, &fresh_offset);
assert_zero(r);
// This comparison extra struct won't change during iteration.
struct toku_fifo_entry_key_msn_cmp_extra extra = { .desc= &t->ft->cmp_descriptor, .cmp = t->ft->compare_fun, .fifo = bnc->buffer };
// Iterate over both lists, applying the smaller (in (key, msn)
// order) message at each step
while (stale_i < stale_ube && fresh_i < fresh_ube) {
*msgs_applied = true;
int c = toku_fifo_entry_key_msn_cmp(extra, stale_offset, fresh_offset);
if (c < 0) {
// The stale message we're pointing to either has a
// smaller key than the fresh message, or has the same key
// but a smaller MSN. We'll apply it, then get the next
// stale message into stale_i and stale_v.
const struct fifo_entry *stale_entry = toku_fifo_get_entry(bnc->buffer, stale_offset);
do_bn_apply_cmd(t, bn, ancestor, childnum, stale_entry, &stats_delta);
stale_i++;
if (stale_i != stale_ube) {
invariant(stale_i < stale_ube);
r = bnc->stale_message_tree.fetch(stale_i, &stale_offset);
assert_zero(r);
}
} else if (c > 0) {
// The fresh message we're pointing to either has a
// smaller key than the stale message, or has the same key
// but a smaller MSN. We'll apply it, then get the next
// fresh message into fresh_i and fresh_v.
const struct fifo_entry *fresh_entry = toku_fifo_get_entry(bnc->buffer, fresh_offset);
do_bn_apply_cmd(t, bn, ancestor, childnum, fresh_entry, &stats_delta);
fresh_i++;
if (fresh_i != fresh_ube) {
invariant(fresh_i < fresh_ube);
r = bnc->fresh_message_tree.fetch(fresh_i, &fresh_offset);
assert_zero(r);
}
} else {
// We have found the same MSN in both trees. This means a
// single message showing up in both trees. This should
// not happen.
abort();
}
}
// Apply the rest of the stale messages, if any exist
while (stale_i < stale_ube) {
const struct fifo_entry *stale_entry = toku_fifo_get_entry(bnc->buffer, stale_offset);
do_bn_apply_cmd(t, bn, ancestor, childnum, stale_entry, &stats_delta);
stale_i++;
if (stale_i != stale_ube) {
r = bnc->stale_message_tree.fetch(stale_i, &stale_offset);
assert_zero(r);
}
}
// Apply the rest of the fresh messages, if any exist
while (fresh_i < fresh_ube) {
const struct fifo_entry *fresh_entry = toku_fifo_get_entry(bnc->buffer, fresh_offset);
do_bn_apply_cmd(t, bn, ancestor, childnum, fresh_entry, &stats_delta);
fresh_i++;
if (fresh_i != fresh_ube) {
r = bnc->fresh_message_tree.fetch(fresh_i, &fresh_offset);
assert_zero(r);
}
}
}
//
// update stats
......@@ -4154,23 +4059,12 @@ bnc_apply_messages_to_basement_node(
if (stats_delta.numbytes || stats_delta.numrows) {
toku_ft_update_stats(&t->ft->in_memory_stats, stats_delta);
}
// We can't delete things out of the fresh tree inside the above
// procedures because we're still looking at the fresh tree. Instead
// we have to move messages after we're done looking at it.
struct copy_to_stale_extra cts_extra = { .ft_handle = t, .bnc = bnc };
r = bnc->fresh_message_tree.iterate_on_range<struct copy_to_stale_extra, copy_to_stale>(fresh_lbi, fresh_ube, &cts_extra);
assert_zero(r);
for (uint32_t ube = fresh_ube; fresh_lbi < ube; --ube) {
// When we delete the message at the fresh_lbi index, everything
// to the right moves down one spot, including the offset at ube.
r = bnc->fresh_message_tree.delete_at(fresh_lbi);
assert_zero(r);
}
return r;
#if 0
#endif
}
void
maybe_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, bool* msgs_applied)
toku_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, bool* msgs_applied)
// Effect:
// Bring a leaf node up-to-date according to all the messages in the ancestors.
// If the leaf node is already up-to-date then do nothing.
......@@ -4181,7 +4075,7 @@ maybe_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ance
// The entire root-to-leaf path is pinned and appears in the ancestors list.
{
VERIFY_NODE(t, node);
if (node->height > 0) { goto exit; }
invariant(node->height == 0);
// know we are a leaf node
// An important invariant:
// We MUST bring every available basement node up to date.
......@@ -4215,10 +4109,46 @@ maybe_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ance
// false when it's read in again).
curr_bn->stale_ancestor_messages_applied = true;
}
exit:
VERIFY_NODE(t, node);
}
struct copy_to_stale_extra {
FT ft;
NONLEAF_CHILDINFO bnc;
};
__attribute__((nonnull(3)))
static int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra)
{
struct fifo_entry *entry = toku_fifo_get_entry(extra->bnc->buffer, offset);
DBT keydbt;
DBT *key = fill_dbt_for_fifo_entry(&keydbt, entry);
struct toku_fifo_entry_key_msn_heaviside_extra heaviside_extra = { .desc = &extra->ft->cmp_descriptor, .cmp = extra->ft->compare_fun, .fifo = extra->bnc->buffer, .key = key, .msn = entry->msn };
int r = extra->bnc->stale_message_tree.insert<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(offset, heaviside_extra, nullptr);
invariant_zero(r);
return 0;
}
__attribute__((nonnull))
void
toku_move_ftnode_messages_to_stale(FT ft, FTNODE node) {
invariant(node->height > 0);
// TODO: could be cilkified
for (int i = 0; i < node->n_children; ++i) {
if (BP_STATE(node, i) != PT_AVAIL) {
continue;
}
NONLEAF_CHILDINFO bnc = BNC(node, i);
// We can't delete things out of the fresh tree inside the above
// procedures because we're still looking at the fresh tree. Instead
// we have to move messages after we're done looking at it.
struct copy_to_stale_extra cts_extra = { .ft = ft, .bnc = bnc };
int r = bnc->fresh_message_tree.iterate_over_marked<struct copy_to_stale_extra, copy_to_stale>(&cts_extra);
invariant_zero(r);
bnc->fresh_message_tree.delete_all_marked();
}
}
static int
ft_cursor_shortcut (
FT_CURSOR cursor,
......
......@@ -106,9 +106,9 @@ struct count_msgs_extra {
};
// template-only function, but must be extern
int count_msgs(const long &offset, const uint32_t UU(idx), struct count_msgs_extra *const e)
int count_msgs(const int32_t &offset, const uint32_t UU(idx), struct count_msgs_extra *const e)
__attribute__((nonnull(3)));
int count_msgs(const long &offset, const uint32_t UU(idx), struct count_msgs_extra *const e)
int count_msgs(const int32_t &offset, const uint32_t UU(idx), struct count_msgs_extra *const e)
{
const struct fifo_entry *entry = toku_fifo_get_entry(e->fifo, offset);
if (entry->msn.msn == e->msn.msn) {
......@@ -128,9 +128,9 @@ struct verify_message_tree_extra {
};
// template-only function, but must be extern
int verify_message_tree(const long &offset, const uint32_t UU(idx), struct verify_message_tree_extra *const e)
int verify_message_tree(const int32_t &offset, const uint32_t UU(idx), struct verify_message_tree_extra *const e)
__attribute__((nonnull(3)));
int verify_message_tree(const long &offset, const uint32_t UU(idx), struct verify_message_tree_extra *const e)
int verify_message_tree(const int32_t &offset, const uint32_t UU(idx), struct verify_message_tree_extra *const e)
{
int verbose = e->verbose;
BLOCKNUM blocknum = e->blocknum;
......@@ -155,16 +155,21 @@ int verify_message_tree(const long &offset, const uint32_t UU(idx), struct verif
return result;
}
template<typename verify_omt_t>
static int
verify_sorted_by_key_msn(FT_HANDLE brt, FIFO fifo, const off_omt_t &mt) {
verify_sorted_by_key_msn(FT_HANDLE brt, FIFO fifo, const verify_omt_t &mt) {
int result = 0;
size_t last_offset = 0;
for (uint32_t i = 0; i < mt.size(); i++) {
long offset;
int32_t offset;
int r = mt.fetch(i, &offset);
assert_zero(r);
if (i > 0) {
struct toku_fifo_entry_key_msn_cmp_extra extra = { .desc = &brt->ft->cmp_descriptor, .cmp = brt->ft->compare_fun, .fifo = fifo };
struct toku_fifo_entry_key_msn_cmp_extra extra;
ZERO_STRUCT(extra);
extra.desc = &brt->ft->cmp_descriptor;
extra.cmp = brt->ft->compare_fun;
extra.fifo = fifo;
if (toku_fifo_entry_key_msn_cmp(extra, last_offset, offset) >= 0) {
result = TOKUDB_NEEDS_REPAIR;
break;
......@@ -175,12 +180,17 @@ verify_sorted_by_key_msn(FT_HANDLE brt, FIFO fifo, const off_omt_t &mt) {
return result;
}
template<typename count_omt_t>
static int
count_eq_key_msn(FT_HANDLE brt, FIFO fifo, const off_omt_t &mt, const DBT *key, MSN msn) {
struct toku_fifo_entry_key_msn_heaviside_extra extra = {
.desc = &brt->ft->cmp_descriptor, .cmp = brt->ft->compare_fun, .fifo = fifo, .key = key, .msn = msn
};
int r = mt.find_zero<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(extra, nullptr, nullptr);
count_eq_key_msn(FT_HANDLE brt, FIFO fifo, const count_omt_t &mt, const DBT *key, MSN msn) {
struct toku_fifo_entry_key_msn_heaviside_extra extra;
ZERO_STRUCT(extra);
extra.desc = &brt->ft->cmp_descriptor;
extra.cmp = brt->ft->compare_fun;
extra.fifo = fifo;
extra.key = key;
extra.msn = msn;
int r = mt.template find_zero<struct toku_fifo_entry_key_msn_heaviside_extra, toku_fifo_entry_key_msn_heaviside>(extra, nullptr, nullptr);
int count;
if (r == 0) {
count = 1;
......
......@@ -918,8 +918,8 @@ deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf,
DESCRIPTOR desc, ft_compare_func cmp) {
int r;
int n_in_this_buffer = rbuf_int(rbuf);
long *fresh_offsets = NULL, *stale_offsets = NULL;
long *broadcast_offsets = NULL;
int32_t *fresh_offsets = NULL, *stale_offsets = NULL;
int32_t *broadcast_offsets = NULL;
int nfresh = 0, nstale = 0;
int nbroadcast_offsets = 0;
if (cmp) {
......@@ -940,7 +940,7 @@ deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf,
rbuf_bytes(rbuf, &key, &keylen); /* Returns a pointer into the rbuf. */
rbuf_bytes(rbuf, &val, &vallen);
//printf("Found %s,%s\n", (char*)key, (char*)val);
long *dest;
int32_t *dest;
if (cmp) {
if (ft_msg_type_applies_once(type)) {
if (is_fresh) {
......@@ -968,11 +968,11 @@ deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf,
if (cmp) {
struct toku_fifo_entry_key_msn_cmp_extra extra = { .desc = desc, .cmp = cmp, .fifo = bnc->buffer };
r = toku::sort<long, const struct toku_fifo_entry_key_msn_cmp_extra, toku_fifo_entry_key_msn_cmp>::mergesort_r(fresh_offsets, nfresh, extra);
r = toku::sort<int32_t, const struct toku_fifo_entry_key_msn_cmp_extra, toku_fifo_entry_key_msn_cmp>::mergesort_r(fresh_offsets, nfresh, extra);
assert_zero(r);
bnc->fresh_message_tree.destroy();
bnc->fresh_message_tree.create_steal_sorted_array(&fresh_offsets, nfresh, n_in_this_buffer);
r = toku::sort<long, const struct toku_fifo_entry_key_msn_cmp_extra, toku_fifo_entry_key_msn_cmp>::mergesort_r(stale_offsets, nstale, extra);
r = toku::sort<int32_t, const struct toku_fifo_entry_key_msn_cmp_extra, toku_fifo_entry_key_msn_cmp>::mergesort_r(stale_offsets, nstale, extra);
assert_zero(r);
bnc->stale_message_tree.destroy();
bnc->stale_message_tree.create_steal_sorted_array(&stale_offsets, nstale, n_in_this_buffer);
......@@ -1790,8 +1790,8 @@ deserialize_and_upgrade_internal_node(FTNODE node,
NONLEAF_CHILDINFO bnc = BNC(node, i);
int n_in_this_buffer = rbuf_int(rb);
long *fresh_offsets = NULL;
long *broadcast_offsets = NULL;
int32_t *fresh_offsets = NULL;
int32_t *broadcast_offsets = NULL;
int nfresh = 0;
int nbroadcast_offsets = 0;
......@@ -1822,7 +1822,7 @@ deserialize_and_upgrade_internal_node(FTNODE node,
rbuf_bytes(rb, &val, &vallen);
// <CER> can we factor this out?
long *dest;
int32_t *dest;
if (bfe->h->compare_fun) {
if (ft_msg_type_applies_once(type)) {
dest = &fresh_offsets[nfresh];
......@@ -1858,7 +1858,7 @@ deserialize_and_upgrade_internal_node(FTNODE node,
struct toku_fifo_entry_key_msn_cmp_extra extra = { .desc = &bfe->h->cmp_descriptor,
.cmp = bfe->h->compare_fun,
.fifo = bnc->buffer };
typedef toku::sort<long, const struct toku_fifo_entry_key_msn_cmp_extra, toku_fifo_entry_key_msn_cmp> key_msn_sort;
typedef toku::sort<int32_t, const struct toku_fifo_entry_key_msn_cmp_extra, toku_fifo_entry_key_msn_cmp> key_msn_sort;
r = key_msn_sort::mergesort_r(fresh_offsets, nfresh, extra);
assert_zero(r);
bnc->fresh_message_tree.destroy();
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_portability.h>
#include <toku_assert.h>
#include <toku_time.h>
#include <memory.h>
#include "fttypes.h"
#include "log-internal.h"
#include "omt.h"
/*static int intcmp(OMTVALUE onev, void *twov) {
int64_t one = (int64_t) onev;
int64_t two = (int64_t) twov;
return two - one;
}*/
static int find_by_xid(OMTVALUE txnv, void *findidv) {
TOKUTXN txn = (TOKUTXN) txnv;
TXNID findid = (TXNID) findidv;
if (txn->txnid64 > findid) {
return 1;
}
if (txn->txnid64 < findid) {
return -1;
}
return 0;
}
static int txn_iter(OMTVALUE UU(txnv), uint32_t UU(idx), void *UU(v)) {
return 0;
}
const int NTXNS = 1<<23;
void runit(void)
{
{
srandom(0);
double inserttime = 0.0, querytime = 0.0, itertime = 0.0;
size_t overhead = 0;
for (int trial = 0; trial < 100; ++trial) {
OMT txn_omt;
toku_omt_create(&txn_omt);
TOKUTXN XMALLOC_N(NTXNS, txns);
for (int i = 0; i < NTXNS; ++i) {
TOKUTXN txn = &txns[i];
txn->txnid64 = ((random() << 32) | random());
}
tokutime_t t0 = get_tokutime();
for (int i = 0; i < NTXNS; ++i) {
TOKUTXN txn = &txns[i];
int r = toku_omt_insert(txn_omt, (OMTVALUE) txn, find_by_xid, (void *) txn->txnid64, NULL);
invariant_zero(r);
//invariant(r == 0 || r == DB_KEYEXIST);
}
tokutime_t t1 = get_tokutime();
for (int i = 0; i < NTXNS; ++i) {
TOKUTXN txn;
int r = toku_omt_find_zero(txn_omt, find_by_xid, (void *) txns[i].txnid64, (OMTVALUE *) &txn, NULL);
invariant_zero(r);
invariant(txn == &txns[i]);
}
tokutime_t t2 = get_tokutime();
toku_omt_iterate(txn_omt, txn_iter, NULL);
tokutime_t t3 = get_tokutime();
inserttime += tokutime_to_seconds(t1-t0);
querytime += tokutime_to_seconds(t2-t1);
itertime += tokutime_to_seconds(t3-t2);
if (overhead == 0) {
overhead = toku_omt_memory_size(txn_omt);
}
toku_omt_destroy(&txn_omt);
invariant_null(txn_omt);
toku_free(txns);
}
printf("inserts: %.03lf\nqueries: %.03lf\niterate: %.03lf\noverhead: %lu\n",
inserttime, querytime, itertime, overhead);
}
int64_t maxrss;
toku_os_get_max_rss(&maxrss);
printf("memused: %" PRId64 "\n", maxrss);
/* {
srand(0);
OMT int_omt;
toku_omt_create(&int_omt);
int64_t *XMALLOC_N(NTXNS, ints);
for (int i = 0; i < NTXNS; ++i) {
ints[i] = rand() >> 8;
}
tokutime_t t0 = get_tokutime();
for (int i = 0; i < NTXNS; ++i) {
//int r =
toku_omt_insert(int_omt, (OMTVALUE) ints[i], intcmp, (void *) ints[i], NULL);
//invariant(r == 0 || r == DB_KEYEXIST);
}
tokutime_t t1 = get_tokutime();
OMT clone;
toku_omt_clone_noptr(&clone, int_omt);
tokutime_t t2 = get_tokutime();
for (int i = 0; i < NTXNS; ++i) {
//int r =
toku_omt_find_zero(clone, intcmp, (void *) ints[i], NULL, NULL);
//invariant_zero(r);
}
tokutime_t t3 = get_tokutime();
printf("omtsize: %" PRIu32 "\ninserts: %.03lf\nqueries: %.03lf\n",
toku_omt_size(clone), tokutime_to_seconds(t1-t0), tokutime_to_seconds(t3-t2));
toku_omt_destroy(&int_omt);
invariant_null(int_omt);
toku_omt_destroy(&clone);
invariant_null(clone);
toku_free(ints);
}*/
}
int main(void)
{
runit();
return 0;
}
......@@ -29,8 +29,15 @@ void omt<omtdata_t, omtdataout_t, supports_marks>::create(void) {
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
void omt<omtdata_t, omtdataout_t, supports_marks>::create_no_array(void) {
static_assert(!supports_marks, "cannot create_no_array an omt that supports marks");
if (!supports_marks) {
this->create_internal_no_array(0);
} else {
this->is_array = false;
this->capacity = 0;
this->d.t.nodes = nullptr;
this->d.t.root.set_to_null();
this->d.t.free_idx = 0;
}
}
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
......@@ -57,6 +64,7 @@ void omt<omtdata_t, omtdataout_t, supports_marks>::create_steal_sorted_array(omt
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
int omt<omtdata_t, omtdataout_t, supports_marks>::split_at(omt *const newomt, const uint32_t idx) {
barf_if_marked(*this);
invariant_notnull(newomt);
if (idx > this->size()) { return EINVAL; }
this->convert_to_array();
......@@ -72,6 +80,7 @@ int omt<omtdata_t, omtdataout_t, supports_marks>::split_at(omt *const newomt, co
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
void omt<omtdata_t, omtdataout_t, supports_marks>::merge(omt *const leftomt, omt *const rightomt) {
barf_if_marked(*this);
invariant_notnull(leftomt);
invariant_notnull(rightomt);
const uint32_t leftsize = leftomt->size();
......@@ -113,6 +122,7 @@ void omt<omtdata_t, omtdataout_t, supports_marks>::merge(omt *const leftomt, omt
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
void omt<omtdata_t, omtdataout_t, supports_marks>::clone(const omt &src) {
barf_if_marked(*this);
this->create_internal(src.size());
if (src.is_array) {
memcpy(&this->d.a.values[0], &src.d.a.values[src.d.a.start_idx], src.d.a.num_values * (sizeof this->d.a.values[0]));
......@@ -179,9 +189,31 @@ int omt<omtdata_t, omtdataout_t, supports_marks>::insert(const omtdata_t &value,
return 0;
}
// The following 3 functions implement a static if for us.
template<typename omtdata_t, typename omtdataout_t>
static void barf_if_marked(const omt<omtdata_t, omtdataout_t, false> &UU(omt)) {
}
template<typename omtdata_t, typename omtdataout_t>
static void barf_if_marked(const omt<omtdata_t, omtdataout_t, true> &omt) {
invariant(!omt.has_marks());
}
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
bool omt<omtdata_t, omtdataout_t, supports_marks>::has_marks(void) const {
static_assert(supports_marks, "Does not support marks");
if (this->d.t.root.is_null()) {
return false;
}
const omt_node &node = this->d.t.nodes[this->d.t.root.get_index()];
return node.get_marks_below() || node.get_marked();
}
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
int omt<omtdata_t, omtdataout_t, supports_marks>::insert_at(const omtdata_t &value, const uint32_t idx) {
barf_if_marked(*this);
if (idx > this->size()) { return EINVAL; }
this->maybe_resize_or_convert(this->size() + 1);
if (this->is_array && idx != this->d.a.num_values &&
(idx != 0 || this->d.a.start_idx == 0)) {
......@@ -208,7 +240,9 @@ int omt<omtdata_t, omtdataout_t, supports_marks>::insert_at(const omtdata_t &val
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
int omt<omtdata_t, omtdataout_t, supports_marks>::set_at(const omtdata_t &value, const uint32_t idx) {
barf_if_marked(*this);
if (idx >= this->size()) { return EINVAL; }
if (this->is_array) {
this->set_at_internal_array(value, idx);
} else {
......@@ -219,7 +253,9 @@ int omt<omtdata_t, omtdataout_t, supports_marks>::set_at(const omtdata_t &value,
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
int omt<omtdata_t, omtdataout_t, supports_marks>::delete_at(const uint32_t idx) {
barf_if_marked(*this);
if (idx >= this->size()) { return EINVAL; }
this->maybe_resize_or_convert(this->size() - 1);
if (this->is_array && idx != 0 && idx != this->d.a.num_values - 1) {
this->convert_to_tree();
......@@ -253,6 +289,7 @@ template<typename iterate_extra_t,
int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
int omt<omtdata_t, omtdataout_t, supports_marks>::iterate_on_range(const uint32_t left, const uint32_t right, iterate_extra_t *const iterate_extra) const {
if (right > this->size()) { return EINVAL; }
if (left == right) { return 0; }
if (this->is_array) {
return this->iterate_internal_array<iterate_extra_t, f>(left, right, iterate_extra);
}
......@@ -280,40 +317,46 @@ int omt<omtdata_t, omtdataout_t, supports_marks>::iterate_over_marked(iterate_ex
return this->iterate_over_marked_internal<iterate_extra_t, f>(this->d.t.root, 0, iterate_extra);
}
struct to_delete_extra {
uint32_t num_indexes;
uint32_t *indexes;
};
static_assert(std::is_pod<to_delete_extra>::value, "not POD");
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
void omt<omtdata_t, omtdataout_t, supports_marks>::unmark(const subtree &subtree, const uint32_t index, uint32_t *const num_indexes, uint32_t *const indexes) {
if (subtree.is_null()) { return; }
omt_node &n = this->d.t.nodes[subtree.get_index()];
const uint32_t index_root = index + this->nweight(n.left);
// REQUIRED FOR SLOW VERSION OF DELETE ALL MARKED
template<typename omtdata_t>
static int log_marked_indexes(const omtdata_t &UU(value), const uint32_t index, struct to_delete_extra * const info) {
info->indexes[info->num_indexes++] = index;
return 0;
const bool below = n.get_marks_below();
if (below) {
this->unmark(n.left, index, num_indexes, indexes);
}
if (n.get_marked()) {
indexes[(*num_indexes)++] = index_root;
}
n.clear_stolen_bits();
if (below) {
this->unmark(n.right, index_root + 1, num_indexes, indexes);
}
}
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
void omt<omtdata_t, omtdataout_t, supports_marks>::delete_all_marked(void) {
static_assert(supports_marks, "does not support marks");
if (!this->has_marks()) {
return;
}
invariant(!this->is_array);
uint32_t marked_indexes[this->size()];
struct to_delete_extra extra = { 0, &marked_indexes[0] };
uint32_t num_indexes = 0;
this->iterate_over_marked<struct to_delete_extra, log_marked_indexes<omtdata_t> >(&extra);
// Remove all marks.
// We need to delete all the stolen bits before calling delete_at to prevent barfing.
this->unmark(this->d.t.root, 0, &num_indexes, marked_indexes);
for (uint32_t i = 0; i < extra.num_indexes; i++) {
for (uint32_t i = 0; i < num_indexes; i++) {
// Delete from left to right, shift by number already deleted.
// Alternative is delete from right to left.
int r = this->delete_at(extra.indexes[i] - i);
int r = this->delete_at(marked_indexes[i] - i);
lazy_assert_zero(r);
}
// Remove all marks. Remove even from 'freed' nodes. (probably not necessary because there is no free list)
const uint32_t num_nodes = this->capacity;
for (uint32_t i = 0; i < num_nodes; i++) {
omt_node &node = this->d.t.nodes[i];
node.clear_stolen_bits();
}
barf_if_marked(*this);
}
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
......@@ -783,14 +826,14 @@ int omt<omtdata_t, omtdataout_t, supports_marks>::iterate_over_marked_internal(c
}
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
void omt<omtdata_t, omtdataout_t, supports_marks>::fetch_internal_array(const uint32_t i, omtdataout_t *value) const {
void omt<omtdata_t, omtdataout_t, supports_marks>::fetch_internal_array(const uint32_t i, omtdataout_t *const value) const {
if (value != nullptr) {
copyout(value, &this->d.a.values[this->d.a.start_idx + i]);
}
}
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
void omt<omtdata_t, omtdataout_t, supports_marks>::fetch_internal(const subtree &subtree, const uint32_t i, omtdataout_t *value) const {
void omt<omtdata_t, omtdataout_t, supports_marks>::fetch_internal(const subtree &subtree, const uint32_t i, omtdataout_t *const value) const {
omt_node &n = this->d.t.nodes[subtree.get_index()];
const uint32_t leftweight = this->nweight(n.left);
if (i < leftweight) {
......@@ -888,7 +931,7 @@ void omt<omtdata_t, omtdataout_t, supports_marks>::copyout(omtdata_t **const out
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
template<typename omtcmp_t,
int (*h)(const omtdata_t &, const omtcmp_t &)>
int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_zero_array(const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const {
int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_zero_array(const omtcmp_t &extra, omtdataout_t *const value, uint32_t *const idxp) const {
invariant_notnull(idxp);
uint32_t min = this->d.a.start_idx;
uint32_t limit = this->d.a.start_idx + this->d.a.num_values;
......@@ -926,7 +969,7 @@ int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_zero_array(const
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
template<typename omtcmp_t,
int (*h)(const omtdata_t &, const omtcmp_t &)>
int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_zero(const subtree &subtree, const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const {
int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_zero(const subtree &subtree, const omtcmp_t &extra, omtdataout_t *const value, uint32_t *const idxp) const {
invariant_notnull(idxp);
if (subtree.is_null()) {
*idxp = 0;
......@@ -956,7 +999,7 @@ int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_zero(const subtr
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
template<typename omtcmp_t,
int (*h)(const omtdata_t &, const omtcmp_t &)>
int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_plus_array(const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const {
int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_plus_array(const omtcmp_t &extra, omtdataout_t *const value, uint32_t *const idxp) const {
invariant_notnull(idxp);
uint32_t min = this->d.a.start_idx;
uint32_t limit = this->d.a.start_idx + this->d.a.num_values;
......@@ -983,7 +1026,7 @@ int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_plus_array(const
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
template<typename omtcmp_t,
int (*h)(const omtdata_t &, const omtcmp_t &)>
int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_plus(const subtree &subtree, const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const {
int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_plus(const subtree &subtree, const omtcmp_t &extra, omtdataout_t *const value, uint32_t *const idxp) const {
invariant_notnull(idxp);
if (subtree.is_null()) {
return DB_NOTFOUND;
......@@ -1012,7 +1055,7 @@ int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_plus(const subtr
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
template<typename omtcmp_t,
int (*h)(const omtdata_t &, const omtcmp_t &)>
int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_minus_array(const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const {
int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_minus_array(const omtcmp_t &extra, omtdataout_t *const value, uint32_t *const idxp) const {
invariant_notnull(idxp);
uint32_t min = this->d.a.start_idx;
uint32_t limit = this->d.a.start_idx + this->d.a.num_values;
......@@ -1039,7 +1082,7 @@ int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_minus_array(cons
template<typename omtdata_t, typename omtdataout_t, bool supports_marks>
template<typename omtcmp_t,
int (*h)(const omtdata_t &, const omtcmp_t &)>
int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_minus(const subtree &subtree, const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const {
int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_minus(const subtree &subtree, const omtcmp_t &extra, omtdataout_t *const value, uint32_t *const idxp) const {
invariant_notnull(idxp);
if (subtree.is_null()) {
return DB_NOTFOUND;
......
......@@ -82,7 +82,7 @@ class subtree_templated {
}
inline bool is_null(void) const {
return NODE_NULL == get_index();
return NODE_NULL == this->get_index();
}
inline uint32_t get_index(void) const {
......@@ -108,11 +108,11 @@ class subtree_templated<true> {
public:
static const uint32_t NODE_NULL = INT32_MAX;
inline void set_to_null(void) {
set_index_internal(NODE_NULL);
this->set_index_internal(NODE_NULL);
}
inline bool is_null(void) const {
return NODE_NULL == get_index();
return NODE_NULL == this->get_index();
}
inline uint32_t get_index(void) const {
......@@ -121,7 +121,7 @@ class subtree_templated<true> {
inline void set_index(uint32_t index) {
invariant(index < NODE_NULL);
set_index_internal(index);
this->set_index_internal(index);
}
inline bool get_bit(void) const {
......@@ -173,7 +173,7 @@ class omt_node_templated<omtdata_t, true> {
inline void set_marks_below_bit(void) {
// This function can be called by multiple threads.
// Checking first reduces cache invalidation.
if (!get_marks_below()) {
if (!this->get_marks_below()) {
right.enable_bit();
}
}
......@@ -182,8 +182,8 @@ class omt_node_templated<omtdata_t, true> {
}
inline void clear_stolen_bits(void) {
unset_marked_bit();
unset_marks_below_bit();
this->unset_marked_bit();
this->unset_marks_below_bit();
}
} __attribute__((__packed__,aligned(4)));
......@@ -434,6 +434,12 @@ class omt {
*/
void verify_marks_consistent(void) const;
/**
* Effect: None
* Returns whether there are any marks in the tree.
*/
bool has_marks(void) const;
/**
* Effect: Iterate over the values of the omt, from left to right, calling f on each value.
* The first argument passed to f is a pointer to the value stored in the omt.
......@@ -460,7 +466,6 @@ class omt {
*/
int fetch(const uint32_t idx, omtdataout_t *const value) const;
/**
* Effect: Find the smallest i such that h(V_i, extra)>=0
* If there is such an i and h(V_i,extra)==0 then set *idxp=i, set *value = V_i, and return 0.
......@@ -577,6 +582,8 @@ class omt {
struct omt_tree t;
} d;
__attribute__((nonnull))
void unmark(const subtree &subtree, const uint32_t index, uint32_t *const num_indexes, uint32_t *const indexes);
void create_internal_no_array(const uint32_t new_capacity);
......@@ -611,6 +618,7 @@ class omt {
void set_at_internal(const subtree &subtree, const omtdata_t &value, const uint32_t idx);
__attribute__((nonnull(2,5)))
void delete_internal(subtree *const subtreep, const uint32_t idx, omt_node *const copyn, subtree **const rebalance_subtree);
template<typename iterate_extra_t,
......@@ -648,9 +656,9 @@ class omt {
uint32_t verify_marks_consistent_internal(const subtree &subtree, const bool allow_marks) const;
void fetch_internal_array(const uint32_t i, omtdataout_t *value) const;
void fetch_internal_array(const uint32_t i, omtdataout_t *const value) const;
void fetch_internal(const subtree &subtree, const uint32_t i, omtdataout_t *value) const;
void fetch_internal(const subtree &subtree, const uint32_t i, omtdataout_t *const value) const;
__attribute__((nonnull))
void fill_array_with_subtree_idxs(node_idx *const array, const subtree &subtree) const;
......@@ -675,27 +683,27 @@ class omt {
template<typename omtcmp_t,
int (*h)(const omtdata_t &, const omtcmp_t &)>
int find_internal_zero_array(const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const;
int find_internal_zero_array(const omtcmp_t &extra, omtdataout_t *const value, uint32_t *const idxp) const;
template<typename omtcmp_t,
int (*h)(const omtdata_t &, const omtcmp_t &)>
int find_internal_zero(const subtree &subtree, const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const;
int find_internal_zero(const subtree &subtree, const omtcmp_t &extra, omtdataout_t *const value, uint32_t *const idxp) const;
template<typename omtcmp_t,
int (*h)(const omtdata_t &, const omtcmp_t &)>
int find_internal_plus_array(const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const;
int find_internal_plus_array(const omtcmp_t &extra, omtdataout_t *const value, uint32_t *const idxp) const;
template<typename omtcmp_t,
int (*h)(const omtdata_t &, const omtcmp_t &)>
int find_internal_plus(const subtree &subtree, const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const;
int find_internal_plus(const subtree &subtree, const omtcmp_t &extra, omtdataout_t *const value, uint32_t *const idxp) const;
template<typename omtcmp_t,
int (*h)(const omtdata_t &, const omtcmp_t &)>
int find_internal_minus_array(const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const;
int find_internal_minus_array(const omtcmp_t &extra, omtdataout_t *const value, uint32_t *const idxp) const;
template<typename omtcmp_t,
int (*h)(const omtdata_t &, const omtcmp_t &)>
int find_internal_minus(const subtree &subtree, const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const;
int find_internal_minus(const subtree &subtree, const omtcmp_t &extra, omtdataout_t *const value, uint32_t *const idxp) const;
};
} // namespace toku
......
......@@ -20,11 +20,13 @@
namespace toku {
namespace test {
struct four_xids {
TXNID one, two, three, four;
};
inline int find_xid_one(const struct four_xids &xids, const TXNID &find) {
static inline int find_xid_one(const struct four_xids &xids, const TXNID &find) {
if (xids.one > find) {
return 1;
}
......@@ -34,7 +36,7 @@ namespace toku {
return 0;
}
inline int find_xid_two(const struct four_xids &xids, const TXNID &find) {
static inline int find_xid_two(const struct four_xids &xids, const TXNID &find) {
if (xids.two > find) {
return 1;
}
......@@ -44,7 +46,7 @@ namespace toku {
return 0;
}
inline int fx_iter(const struct four_xids &xids __attribute__((__unused__)), const uint32_t idx __attribute__((__unused__)), int &unused __attribute__((__unused__))) {
static inline int fx_iter(const struct four_xids &UU(xids), const uint32_t UU(idx), void *const UU(unused)) {
return 0;
}
......@@ -52,7 +54,7 @@ namespace toku {
static_assert(std::is_pod<fx_omt_t>::value, "fx_omt_t isn't POD");
static_assert(24 == sizeof(fx_omt_t), "fx_omt_t is bigger than 24 bytes");
inline int find_by_xid(const TOKUTXN &txn, const TXNID &findid) {
static inline int find_by_xid(const TOKUTXN &txn, const TXNID &findid) {
if (txn->txnid64 > findid) {
return 1;
}
......@@ -62,7 +64,7 @@ namespace toku {
return 0;
}
inline int txn_iter(const TOKUTXN &txn __attribute__((__unused__)), const uint32_t idx __attribute__((__unused__)), int &unused __attribute__((__unused__))) {
static inline int txn_iter(const TOKUTXN &UU(txn), const uint32_t UU(idx), void *const UU(unused)) {
return 0;
}
......@@ -72,7 +74,7 @@ namespace toku {
const int NTXNS = 1<<13;
void runit(void)
static void perftest(void)
{
if (0) {
srandom(0);
......@@ -80,8 +82,8 @@ namespace toku {
size_t overhead = 0;
for (int trial = 0; trial < 100; ++trial) {
fx_omt_t *fx_omt;
fx_omt_t::create(fx_omt);
fx_omt_t *XMALLOC(fx_omt);
fx_omt->create();
struct four_xids *XMALLOC_N(NTXNS, txns);
for (int i = 0; i < NTXNS; ++i) {
......@@ -103,8 +105,7 @@ namespace toku {
invariant(v != &txns[i]);
}
tokutime_t t2 = get_tokutime();
int unused = 0;
fx_omt->iterate<int, fx_iter>(unused);
fx_omt->iterate<void, fx_iter>(nullptr);
tokutime_t t3 = get_tokutime();
for (int i = 0; i < NTXNS; ++i) {
struct four_xids *v;
......@@ -121,7 +122,8 @@ namespace toku {
overhead = fx_omt->memory_size();
}
fx_omt_t::destroy(fx_omt);
fx_omt->destroy();
toku_free(fx_omt);
toku_free(txns);
}
......@@ -134,13 +136,14 @@ namespace toku {
size_t overhead = 0;
for (int trial = 0; trial < 100; ++trial) {
txn_omt_t *txn_omt;
txn_omt_t::create(txn_omt);
txn_omt_t *XMALLOC(txn_omt);
txn_omt->create();
TOKUTXN XMALLOC_N(NTXNS, txns);
for (int i = 0; i < NTXNS; ++i) {
TOKUTXN txn = &txns[i];
txn->txnid64 = ((random() << 32) | random());
// eww, sorry:
*(const_cast<TXNID *>(&txn->txnid64)) = ((random() << 32) | random());
}
tokutime_t t0 = get_tokutime();
for (int i = 0; i < NTXNS; ++i) {
......@@ -157,8 +160,7 @@ namespace toku {
invariant(txn == &txns[i]);
}
tokutime_t t2 = get_tokutime();
int unused = 0;
txn_omt->iterate<int, txn_iter>(unused);
txn_omt->iterate<void, txn_iter>(nullptr);
tokutime_t t3 = get_tokutime();
inserttime += tokutime_to_seconds(t1-t0);
......@@ -168,8 +170,8 @@ namespace toku {
overhead = txn_omt->memory_size();
}
txn_omt_t::destroy(txn_omt);
invariant_null(txn_omt);
txn_omt->destroy();
toku_free(txn_omt);
toku_free(txns);
}
......@@ -181,7 +183,7 @@ namespace toku {
printf("memused: %" PRId64 "\n", maxrss);
}
inline int intcmp(const int &a, const int &b) {
static inline int intcmp(const int &a, const int &b) {
if (a < b) {
return -1;
}
......@@ -194,8 +196,8 @@ namespace toku {
typedef omt<int> int_omt_t;
static int intiter_magic = 0xdeadbeef;
int intiter(const int &value __attribute__((__unused__)), const uint32_t idx __attribute__((__unused__)), int &extra) {
invariant(extra == intiter_magic);
static int intiter(const int &value __attribute__((__unused__)), const uint32_t idx __attribute__((__unused__)), int *const extra) {
invariant(*extra == intiter_magic);
return 0;
}
......@@ -203,17 +205,17 @@ namespace toku {
int count;
int last;
};
int intiter2(const int &value, const uint32_t idx __attribute__((__unused__)), struct intiter2extra &extra) {
extra.count++;
invariant(extra.last < value);
extra.last = value;
static int intiter2(const int &value, const uint32_t idx __attribute__((__unused__)), struct intiter2extra *const extra) {
extra->count++;
invariant(extra->last < value);
extra->last = value;
return 0;
}
void unittest(void) {
static void unittest(void) {
int_omt_t o;
int r;
o.init();
o.create();
invariant(o.size() == 0);
r = o.insert<int, intcmp>(1, 1, nullptr);
......@@ -234,11 +236,11 @@ namespace toku {
invariant(x == 2);
r = o.iterate<int, intiter>(intiter_magic);
r = o.iterate<int, intiter>(&intiter_magic);
invariant_zero(r);
struct intiter2extra e = {0, 0};
r = o.iterate_on_range<struct intiter2extra, intiter2>(0, 2, e);
r = o.iterate_on_range<struct intiter2extra, intiter2>(0, 2, &e);
invariant_zero(r);
invariant(e.count == 2);
invariant(e.last == 2);
......@@ -250,16 +252,16 @@ namespace toku {
invariant(o.size() == 2);
o.deinit();
o.destroy();
int *XMALLOC_N(4, intarray);
for (int i = 0; i < 4; ++i) {
intarray[i] = i + 1;
}
int_omt_t left, right;
left.init_steal_sorted_array(intarray, 4, 4);
left.create_steal_sorted_array(&intarray, 4, 4);
invariant_null(intarray);
right.init();
right.create();
r = right.insert<int, intcmp>(8, 8, nullptr);
invariant_zero(r);
r = right.insert<int, intcmp>(7, 7, nullptr);
......@@ -270,38 +272,25 @@ namespace toku {
invariant_zero(r);
int_omt_t combined;
combined.merge_init(left, right);
combined.merge(&left, &right);
invariant(combined.size() == 8);
invariant(left.size() == 0);
invariant(right.size() == 0);
struct intiter2extra e2 = {0, 0};
r = combined.iterate<struct intiter2extra, intiter2>(e2);
r = combined.iterate<struct intiter2extra, intiter2>(&e2);
invariant_zero(r);
invariant(e2.count == 8);
invariant(e2.last == 8);
combined.deinit();
omt<int *> intptr_omt;
intptr_omt.init();
int *ptrs[3];
for (int i = 0; i < 3; ++i) {
XMALLOC(ptrs[i]);
*(ptrs[i]) = i;
intptr_omt.insert_at(ptrs[i], i);
}
omt<int *> intptr_omt2;
intptr_omt2.deep_clone_init(intptr_omt);
intptr_omt.free_items();
intptr_omt.deinit();
intptr_omt2.free_items();
intptr_omt2.deinit();
combined.destroy();
}
};
} // end namespace test
} // end namespace toku
int main(void) {
toku::unittest();
toku::runit();
toku::test::unittest();
toku::test::perftest();
return 0;
}
......@@ -616,15 +616,15 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) {
struct ancestors ancestors = { .node = parentnode, .childnum = 0, .next = NULL };
const struct pivot_bounds infinite_bounds = { .lower_bound_exclusive = NULL, .upper_bound_inclusive = NULL };
bool msgs_applied;
maybe_apply_ancestors_messages_to_node(t, child, &ancestors, &infinite_bounds, &msgs_applied);
toku_apply_ancestors_messages_to_node(t, child, &ancestors, &infinite_bounds, &msgs_applied);
FIFO_ITERATE(parent_bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh,
{
key = key; keylen = keylen; val = val; vallen = vallen; type = type; msn = msn; xids = xids;
assert(!is_fresh);
});
assert(parent_bnc->fresh_message_tree.size() == 0);
assert(parent_bnc->stale_message_tree.size() == (uint32_t) num_parent_messages);
invariant(parent_bnc->fresh_message_tree.size() + parent_bnc->stale_message_tree.size()
== (uint32_t) num_parent_messages);
toku_ftnode_free(&parentnode);
}
......@@ -841,7 +841,7 @@ flush_to_leaf_with_keyrange(FT_HANDLE t, bool make_leaf_up_to_date) {
.upper_bound_inclusive = toku_clone_dbt(&ubi, childkeys[7])
};
bool msgs_applied;
maybe_apply_ancestors_messages_to_node(t, child, &ancestors, &bounds, &msgs_applied);
toku_apply_ancestors_messages_to_node(t, child, &ancestors, &bounds, &msgs_applied);
FIFO_ITERATE(parent_bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh,
{
......@@ -1024,15 +1024,15 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) {
struct ancestors ancestors = { .node = parentnode, .childnum = 0, .next = NULL };
const struct pivot_bounds infinite_bounds = { .lower_bound_exclusive = NULL, .upper_bound_inclusive = NULL };
bool msgs_applied;
maybe_apply_ancestors_messages_to_node(t, child2, &ancestors, &infinite_bounds, &msgs_applied);
toku_apply_ancestors_messages_to_node(t, child2, &ancestors, &infinite_bounds, &msgs_applied);
FIFO_ITERATE(parent_bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh,
{
key = key; keylen = keylen; val = val; vallen = vallen; type = type; msn = msn; xids = xids;
assert(!is_fresh);
});
assert(parent_bnc->fresh_message_tree.size() == 0);
assert(parent_bnc->stale_message_tree.size() == (uint32_t) num_parent_messages);
invariant(parent_bnc->fresh_message_tree.size() + parent_bnc->stale_message_tree.size()
== (uint32_t) num_parent_messages);
toku_ftnode_free(&parentnode);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment