Commit f2df37d6 authored by John Esmet's avatar John Esmet Committed by John Esmet

FT-93 Add a class for pivot bounds, remove the assumption that pivot

keys must come from a DBT stored in the ftnode by adding
ftnode_pivot_keys::fill_dbt()
parent 211027e5
......@@ -209,7 +209,7 @@ toku_pin_ftnode_for_query(
uint32_t fullhash,
UNLOCKERS unlockers,
ANCESTORS ancestors,
const PIVOT_BOUNDS bounds,
const pivot_bounds &bounds,
FTNODE_FETCH_EXTRA bfe,
bool apply_ancestor_messages, // this bool is probably temporary, for #3972, once we know how range query estimates work, will revisit this
FTNODE *node_p,
......
......@@ -147,7 +147,7 @@ toku_pin_ftnode_for_query(
uint32_t fullhash,
UNLOCKERS unlockers,
ANCESTORS ancestors,
const PIVOT_BOUNDS pbounds,
const pivot_bounds &bounds,
FTNODE_FETCH_EXTRA bfe,
bool apply_ancestor_messages, // this bool is probably temporary, for #3972, once we know how range query estimates work, will revisit this
FTNODE *node_p,
......
......@@ -468,7 +468,7 @@ ct_maybe_merge_child(struct flusher_advice *fa,
ctme.is_last_child = false;
pivot_to_save = childnum;
}
toku_clone_dbt(&ctme.target_key, *parent->pivotkeys.get_pivot(pivot_to_save));
toku_clone_dbt(&ctme.target_key, parent->pivotkeys.get_pivot(pivot_to_save));
// at this point, ctme is properly setup, now we can do the merge
struct flusher_advice new_fa;
......@@ -580,7 +580,7 @@ handle_split_of_child(
if (toku_ft_debug_mode) {
printf("%s:%d Child %d splitting on %s\n", __FILE__, __LINE__, childnum, (char*)splitk->data);
printf("%s:%d oldsplitkeys:", __FILE__, __LINE__);
for(int i = 0; i < node->n_children - 1; i++) printf(" %s", (char *) node->pivotkeys.get_pivot(i)->data);
for(int i = 0; i < node->n_children - 1; i++) printf(" %s", (char *) node->pivotkeys.get_pivot(i).data);
printf("\n");
}
)
......@@ -631,7 +631,7 @@ handle_split_of_child(
WHEN_NOT_GCOV(
if (toku_ft_debug_mode) {
printf("%s:%d splitkeys:", __FILE__, __LINE__);
for (int i = 0; i < node->n_children - 2; i++) printf(" %s", (char *) node->pivotkeys.get_pivot(i)->data);
for (int i = 0; i < node->n_children - 2; i++) printf(" %s", (char *) node->pivotkeys.get_pivot(i).data);
printf("\n");
}
)
......@@ -937,7 +937,7 @@ ftleaf_split(
int split_idx = num_left_bns - (split_on_boundary ? 0 : 1);
node->pivotkeys.split_at(split_idx, &B->pivotkeys);
if (split_on_boundary && num_left_bns < node->n_children && splitk) {
toku_copyref_dbt(splitk, *node->pivotkeys.get_pivot(num_left_bns - 1));
toku_copyref_dbt(splitk, node->pivotkeys.get_pivot(num_left_bns - 1));
} else if (splitk) {
bn_data* bd = BLB_DATA(node, num_left_bns - 1);
uint32_t keylen;
......@@ -997,7 +997,7 @@ ft_nonleaf_split(
// the split key for our parent is the rightmost pivot key in node
node->pivotkeys.split_at(n_children_in_a, &B->pivotkeys);
toku_clone_dbt(splitk, *node->pivotkeys.get_pivot(n_children_in_a - 1));
toku_clone_dbt(splitk, node->pivotkeys.get_pivot(n_children_in_a - 1));
node->pivotkeys.delete_at(n_children_in_a - 1);
node->n_children = n_children_in_a;
......@@ -1408,8 +1408,8 @@ ft_merge_child(
{
DBT splitk;
toku_init_dbt(&splitk);
const DBT *old_split_key = node->pivotkeys.get_pivot(childnuma);
maybe_merge_pinned_nodes(node, old_split_key, childa, childb, &did_merge, &did_rebalance, &splitk, ft->h->nodesize);
const DBT old_split_key = node->pivotkeys.get_pivot(childnuma);
maybe_merge_pinned_nodes(node, &old_split_key, childa, childb, &did_merge, &did_rebalance, &splitk, ft->h->nodesize);
//toku_verify_estimates(t,childa);
// the tree did react if a merge (did_merge) or rebalance (new spkit key) occurred
*did_react = (bool)(did_merge || did_rebalance);
......
......@@ -199,7 +199,7 @@ hot_update_flusher_keys(FTNODE parent,
// child node.
if (childnum < (parent->n_children - 1)) {
toku_destroy_dbt(&flusher->max_current_key);
toku_clone_dbt(&flusher->max_current_key, *parent->pivotkeys.get_pivot(childnum));
toku_clone_dbt(&flusher->max_current_key, parent->pivotkeys.get_pivot(childnum));
}
}
......
......@@ -461,15 +461,26 @@ void fill_bfe_for_prefetch(struct ftnode_fetch_extra *bfe, FT ft, struct ft_curs
void destroy_bfe_for_prefetch(struct ftnode_fetch_extra *bfe);
struct pivot_bounds {
const DBT * const lower_bound_exclusive;
const DBT * const upper_bound_inclusive; // NULL to indicate negative or positive infinity (which are in practice exclusive since there are no transfinite keys in messages).
};
typedef struct pivot_bounds const * const PIVOT_BOUNDS;
class pivot_bounds {
public:
pivot_bounds(const DBT &lbe_dbt, const DBT &ubi_dbt);
pivot_bounds next_bounds(FTNODE node, int childnum) const;
const DBT *lbe() const;
const DBT *ubi() const;
const DBT *prepivotkey (FTNODE node, int childnum, const DBT * const lower_bound_exclusive);
const DBT *postpivotkey (FTNODE node, int childnum, const DBT * const upper_bound_inclusive);
struct pivot_bounds next_pivot_keys (FTNODE node, int childnum, struct pivot_bounds const * const old_pb);
static pivot_bounds infinite_bounds();
private:
DBT _prepivotkey(FTNODE node, int childnum, const DBT &lbe_dbt) const;
DBT _postpivotkey(FTNODE node, int childnum, const DBT &ubi_dbt) const;
// if toku_dbt_is_empty() is true for either bound, then it represents
// negative or positive infinity (which are exclusive in practice)
const DBT _lower_bound_exclusive;
const DBT _upper_bound_inclusive;
};
bool
toku_bfe_wants_child_available (struct ftnode_fetch_extra* bfe, int childnum);
......
......@@ -445,28 +445,55 @@ uint32_t compute_child_fullhash (CACHEFILE cf, FTNODE node, int childnum) {
return toku_cachetable_hash(cf, BP_BLOCKNUM(node, childnum));
}
const DBT *prepivotkey (FTNODE node, int childnum, const DBT * const lower_bound_exclusive) {
if (childnum==0)
return lower_bound_exclusive;
else {
//
// pivot bounds
// TODO: move me to ft/node.cc?
//
// Construct bounds from a lower (exclusive) and an upper (inclusive) bound key.
// The DBT structs are stored by value; the key memory they point to is not
// copied, so it must outlive this pivot_bounds — TODO confirm callers ensure this.
pivot_bounds::pivot_bounds(const DBT &lbe_dbt, const DBT &ubi_dbt) :
_lower_bound_exclusive(lbe_dbt), _upper_bound_inclusive(ubi_dbt) {
}
// Returns bounds covering the entire key space. Negative and positive
// infinity are both encoded as an empty DBT.
pivot_bounds pivot_bounds::infinite_bounds() {
    DBT empty_key;
    toku_init_dbt(&empty_key);
    // an empty dbt is the representation of infinity
    invariant(toku_dbt_is_empty(&empty_key));
    return pivot_bounds(empty_key, empty_key);
}
// Accessor: the lower bound, exclusive (an empty dbt means negative infinity).
const DBT *pivot_bounds::lbe() const {
return &_lower_bound_exclusive;
}
// Accessor: the upper bound, inclusive (an empty dbt means positive infinity).
const DBT *pivot_bounds::ubi() const {
return &_upper_bound_inclusive;
}
// Lower (exclusive) bound for node's childnum'th child: for the leftmost
// child this is the parent's own lower bound, otherwise it is the pivot
// immediately to the child's left.
DBT pivot_bounds::_prepivotkey(FTNODE node, int childnum, const DBT &lbe_dbt) const {
    if (childnum > 0) {
        return node->pivotkeys.get_pivot(childnum - 1);
    }
    return lbe_dbt;
}
const DBT *postpivotkey (FTNODE node, int childnum, const DBT * const upper_bound_inclusive) {
if (childnum+1 == node->n_children)
return upper_bound_inclusive;
else {
// Upper (inclusive) bound for node's childnum'th child: for the rightmost
// child this is the parent's own upper bound, otherwise it is the pivot
// immediately to the child's right.
DBT pivot_bounds::_postpivotkey(FTNODE node, int childnum, const DBT &ubi_dbt) const {
    if (childnum + 1 != node->n_children) {
        return node->pivotkeys.get_pivot(childnum);
    }
    return ubi_dbt;
}
struct pivot_bounds next_pivot_keys (FTNODE node, int childnum, struct pivot_bounds const * const old_pb) {
struct pivot_bounds pb = {.lower_bound_exclusive = prepivotkey(node, childnum, old_pb->lower_bound_exclusive),
.upper_bound_inclusive = postpivotkey(node, childnum, old_pb->upper_bound_inclusive)};
return pb;
// Compute the bounds that apply to node's childnum'th child, narrowing this
// object's bounds by the pivots adjacent to that child.
pivot_bounds pivot_bounds::next_bounds(FTNODE node, int childnum) const {
    const DBT lower = _prepivotkey(node, childnum, _lower_bound_exclusive);
    const DBT upper = _postpivotkey(node, childnum, _upper_bound_inclusive);
    return pivot_bounds(lower, upper);
}
////////////////////////////////////////////////////////////////////////////////
static long get_avail_internal_node_partition_size(FTNODE node, int i) {
paranoid_invariant(node->height > 0);
return toku_bnc_memory_size(BNC(node, i));
......@@ -3443,7 +3470,7 @@ ft_search_node (
FT_CURSOR ftcursor,
UNLOCKERS unlockers,
ANCESTORS,
struct pivot_bounds const * const bounds,
const pivot_bounds &bounds,
bool can_bulk_fetch
);
......@@ -3540,7 +3567,7 @@ unlock_ftnode_fun (void *v) {
/* search in a node's child */
static int
ft_search_child(FT_HANDLE ft_handle, FTNODE node, int childnum, ft_search *search, FT_GET_CALLBACK_FUNCTION getf, void *getf_v, bool *doprefetch, FT_CURSOR ftcursor, UNLOCKERS unlockers,
ANCESTORS ancestors, struct pivot_bounds const * const bounds, bool can_bulk_fetch)
ANCESTORS ancestors, const pivot_bounds &bounds, bool can_bulk_fetch)
// Effect: Search in a node's child. Searches are read-only now (at least as far as the hardcopy is concerned).
{
struct ancestors next_ancestors = {node, childnum, ancestors};
......@@ -3620,7 +3647,7 @@ ft_search_child(FT_HANDLE ft_handle, FTNODE node, int childnum, ft_search *searc
static inline int
search_which_child_cmp_with_bound(const toku::comparator &cmp, FTNODE node, int childnum,
ft_search *search, DBT *dbt) {
return cmp(toku_copyref_dbt(dbt, *node->pivotkeys.get_pivot(childnum)), &search->pivot_bound);
return cmp(toku_copyref_dbt(dbt, node->pivotkeys.get_pivot(childnum)), &search->pivot_bound);
}
int
......@@ -3634,7 +3661,7 @@ toku_ft_search_which_child(const toku::comparator &cmp, FTNODE node, ft_search *
int mi;
while (lo < hi) {
mi = (lo + hi) / 2;
toku_copyref_dbt(&pivotkey, *node->pivotkeys.get_pivot(mi));
node->pivotkeys.fill_pivot(mi, &pivotkey);
// search->compare is really strange, and only works well with a
// linear search, it makes binary search a pita.
//
......@@ -3690,7 +3717,7 @@ maybe_search_save_bound(
int p = (search->direction == FT_SEARCH_LEFT) ? child_searched : child_searched - 1;
if (p >= 0 && p < node->n_children-1) {
toku_destroy_dbt(&search->pivot_bound);
toku_clone_dbt(&search->pivot_bound, *node->pivotkeys.get_pivot(p));
toku_clone_dbt(&search->pivot_bound, node->pivotkeys.get_pivot(p));
}
}
......@@ -3725,7 +3752,7 @@ ft_search_node(
FT_CURSOR ftcursor,
UNLOCKERS unlockers,
ANCESTORS ancestors,
struct pivot_bounds const * const bounds,
const pivot_bounds &bounds,
bool can_bulk_fetch
)
{
......@@ -3737,7 +3764,7 @@ ft_search_node(
// At this point, we must have the necessary partition available to continue the search
//
assert(BP_STATE(node,child_to_search) == PT_AVAIL);
const struct pivot_bounds next_bounds = next_pivot_keys(node, child_to_search, bounds);
const pivot_bounds next_bounds = bounds.next_bounds(node, child_to_search);
if (node->height > 0) {
r = ft_search_child(
ft_handle,
......@@ -3750,7 +3777,7 @@ ft_search_node(
ftcursor,
unlockers,
ancestors,
&next_bounds,
next_bounds,
can_bulk_fetch
);
}
......@@ -3779,12 +3806,8 @@ ft_search_node(
// we have a new pivotkey
if (node->height == 0) {
// when we run off the end of a basement, try to lock the range up to the pivot. solves #3529
const DBT *pivot = nullptr;
if (search->direction == FT_SEARCH_LEFT) {
pivot = next_bounds.upper_bound_inclusive; // left -> right
} else {
pivot = next_bounds.lower_bound_exclusive; // right -> left
}
const DBT *pivot = search->direction == FT_SEARCH_LEFT ? next_bounds.ubi() : // left -> right
next_bounds.lbe(); // right -> left
if (pivot != nullptr) {
int rr = getf(pivot->size, pivot->data, 0, nullptr, getf_v, true);
if (rr != 0) {
......@@ -3812,11 +3835,6 @@ ft_search_node(
return r;
}
static const struct pivot_bounds infinite_bounds = {
.lower_bound_exclusive = nullptr,
.upper_bound_inclusive = nullptr,
};
int toku_ft_search(FT_HANDLE ft_handle, ft_search *search, FT_GET_CALLBACK_FUNCTION getf, void *getf_v, FT_CURSOR ftcursor, bool can_bulk_fetch)
// Effect: Perform a search. Associate cursor with a leaf if possible.
// All searches are performed through this function.
......@@ -3894,7 +3912,7 @@ try_again:
{
bool doprefetch = false;
//static int counter = 0; counter++;
r = ft_search_node(ft_handle, node, search, bfe.child_to_read, getf, getf_v, &doprefetch, ftcursor, &unlockers, (ANCESTORS)NULL, &infinite_bounds, can_bulk_fetch);
r = ft_search_node(ft_handle, node, search, bfe.child_to_read, getf, getf_v, &doprefetch, ftcursor, &unlockers, (ANCESTORS)NULL, pivot_bounds::infinite_bounds(), can_bulk_fetch);
if (r==TOKUDB_TRY_AGAIN) {
// there are two cases where we get TOKUDB_TRY_AGAIN
// case 1 is when some later call to toku_pin_ftnode returned
......@@ -4048,7 +4066,7 @@ toku_ft_keysrange_internal (FT_HANDLE ft_handle, FTNODE node,
uint64_t estimated_num_rows,
struct ftnode_fetch_extra *min_bfe, // set up to read a minimal read.
struct ftnode_fetch_extra *match_bfe, // set up to read a basement node iff both keys in it
struct unlockers *unlockers, ANCESTORS ancestors, struct pivot_bounds const * const bounds)
struct unlockers *unlockers, ANCESTORS ancestors, const pivot_bounds &bounds)
// Implementation note: Assign values to less, equal, and greater, and then on the way out (returning up the stack) we add more values in.
{
int r = 0;
......@@ -4096,11 +4114,11 @@ toku_ft_keysrange_internal (FT_HANDLE ft_handle, FTNODE node,
struct unlock_ftnode_extra unlock_extra = {ft_handle,childnode,false};
struct unlockers next_unlockers = {true, unlock_ftnode_fun, (void*)&unlock_extra, unlockers};
const struct pivot_bounds next_bounds = next_pivot_keys(node, left_child_number, bounds);
const struct pivot_bounds next_bounds = bounds.next_bounds(node, left_child_number);
r = toku_ft_keysrange_internal(ft_handle, childnode, key_left, key_right, child_may_find_right,
less, equal_left, middle, equal_right, greater, single_basement_node,
rows_per_child, min_bfe, match_bfe, &next_unlockers, &next_ancestors, &next_bounds);
rows_per_child, min_bfe, match_bfe, &next_unlockers, &next_ancestors, next_bounds);
if (r != TOKUDB_TRY_AGAIN) {
assert_zero(r);
......@@ -4179,7 +4197,7 @@ try_again:
r = toku_ft_keysrange_internal (ft_handle, node, key_left, key_right, true,
&less, &equal_left, &middle, &equal_right, &greater,
&single_basement_node, numrows,
&min_bfe, &match_bfe, &unlockers, (ANCESTORS)NULL, &infinite_bounds);
&min_bfe, &match_bfe, &unlockers, (ANCESTORS)NULL, pivot_bounds::infinite_bounds());
assert(r == 0 || r == TOKUDB_TRY_AGAIN);
if (r == TOKUDB_TRY_AGAIN) {
assert(!unlockers.locked);
......@@ -4195,7 +4213,7 @@ try_again:
r = toku_ft_keysrange_internal (ft_handle, node, key_right, nullptr, false,
&less2, &equal_left2, &middle2, &equal_right2, &greater2,
&ignore, numrows,
&min_bfe, &match_bfe, &unlockers, (ANCESTORS)nullptr, &infinite_bounds);
&min_bfe, &match_bfe, &unlockers, (ANCESTORS)nullptr, pivot_bounds::infinite_bounds());
assert(r == 0 || r == TOKUDB_TRY_AGAIN);
if (r == TOKUDB_TRY_AGAIN) {
assert(!unlockers.locked);
......@@ -4282,9 +4300,9 @@ static int get_key_after_bytes_in_basementnode(FT ft, BASEMENTNODE bn, const DBT
return r;
}
static int get_key_after_bytes_in_subtree(FT_HANDLE ft_h, FT ft, FTNODE node, UNLOCKERS unlockers, ANCESTORS ancestors, PIVOT_BOUNDS bounds, FTNODE_FETCH_EXTRA bfe, ft_search *search, uint64_t subtree_bytes, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped);
static int get_key_after_bytes_in_subtree(FT_HANDLE ft_h, FT ft, FTNODE node, UNLOCKERS unlockers, ANCESTORS ancestors, const pivot_bounds &bounds, FTNODE_FETCH_EXTRA bfe, ft_search *search, uint64_t subtree_bytes, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped);
static int get_key_after_bytes_in_child(FT_HANDLE ft_h, FT ft, FTNODE node, UNLOCKERS unlockers, ANCESTORS ancestors, PIVOT_BOUNDS bounds, FTNODE_FETCH_EXTRA bfe, ft_search *search, int childnum, uint64_t subtree_bytes, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped) {
static int get_key_after_bytes_in_child(FT_HANDLE ft_h, FT ft, FTNODE node, UNLOCKERS unlockers, ANCESTORS ancestors, const pivot_bounds &bounds, FTNODE_FETCH_EXTRA bfe, ft_search *search, int childnum, uint64_t subtree_bytes, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped) {
int r;
struct ancestors next_ancestors = {node, childnum, ancestors};
BLOCKNUM childblocknum = BP_BLOCKNUM(node, childnum);
......@@ -4299,11 +4317,11 @@ static int get_key_after_bytes_in_child(FT_HANDLE ft_h, FT ft, FTNODE node, UNLO
assert_zero(r);
struct unlock_ftnode_extra unlock_extra = {ft_h, child, false};
struct unlockers next_unlockers = {true, unlock_ftnode_fun, (void *) &unlock_extra, unlockers};
const struct pivot_bounds next_bounds = next_pivot_keys(node, childnum, bounds);
return get_key_after_bytes_in_subtree(ft_h, ft, child, &next_unlockers, &next_ancestors, &next_bounds, bfe, search, subtree_bytes, start_key, skip_len, callback, cb_extra, skipped);
const pivot_bounds next_bounds = bounds.next_bounds(node, childnum);
return get_key_after_bytes_in_subtree(ft_h, ft, child, &next_unlockers, &next_ancestors, next_bounds, bfe, search, subtree_bytes, start_key, skip_len, callback, cb_extra, skipped);
}
static int get_key_after_bytes_in_subtree(FT_HANDLE ft_h, FT ft, FTNODE node, UNLOCKERS unlockers, ANCESTORS ancestors, PIVOT_BOUNDS bounds, FTNODE_FETCH_EXTRA bfe, ft_search *search, uint64_t subtree_bytes, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped) {
static int get_key_after_bytes_in_subtree(FT_HANDLE ft_h, FT ft, FTNODE node, UNLOCKERS unlockers, ANCESTORS ancestors, const pivot_bounds &bounds, FTNODE_FETCH_EXTRA bfe, ft_search *search, uint64_t subtree_bytes, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped) {
int r;
int childnum = toku_ft_search_which_child(ft->cmp, node, search);
const uint64_t child_subtree_bytes = subtree_bytes / node->n_children;
......@@ -4321,7 +4339,8 @@ static int get_key_after_bytes_in_subtree(FT_HANDLE ft_h, FT ft, FTNODE node, UN
} else {
*skipped += child_subtree_bytes;
if (*skipped >= skip_len && i < node->n_children - 1) {
callback(node->pivotkeys.get_pivot(i), *skipped, cb_extra);
DBT pivot;
callback(node->pivotkeys.fill_pivot(i, &pivot), *skipped, cb_extra);
r = 0;
}
// Otherwise, r is still DB_NOTFOUND. If this is the last
......@@ -4389,7 +4408,7 @@ int toku_ft_get_key_after_bytes(FT_HANDLE ft_h, const DBT *start_key, uint64_t s
numbytes = 0;
}
uint64_t skipped = 0;
r = get_key_after_bytes_in_subtree(ft_h, ft, root, &unlockers, nullptr, &infinite_bounds, &bfe, &search, (uint64_t) numbytes, start_key, skip_len, callback, cb_extra, &skipped);
r = get_key_after_bytes_in_subtree(ft_h, ft, root, &unlockers, nullptr, pivot_bounds::infinite_bounds(), &bfe, &search, (uint64_t) numbytes, start_key, skip_len, callback, cb_extra, &skipped);
assert(!unlockers.locked);
if (r != TOKUDB_TRY_AGAIN) {
if (r == DB_NOTFOUND) {
......@@ -4450,7 +4469,7 @@ toku_dump_ftnode (FILE *file, FT_HANDLE ft_handle, BLOCKNUM blocknum, int depth,
int i;
for (i=0; i+1< node->n_children; i++) {
fprintf(file, "%*spivotkey %d =", depth+1, "", i);
toku_print_BYTESTRING(file, node->pivotkeys.get_pivot(i)->size, (char *) node->pivotkeys.get_pivot(i)->data);
toku_print_BYTESTRING(file, node->pivotkeys.get_pivot(i).size, (char *) node->pivotkeys.get_pivot(i).data);
fprintf(file, "\n");
}
for (i=0; i< node->n_children; i++) {
......@@ -4492,12 +4511,13 @@ toku_dump_ftnode (FILE *file, FT_HANDLE ft_handle, BLOCKNUM blocknum, int depth,
for (i=0; i<node->n_children; i++) {
fprintf(file, "%*schild %d\n", depth, "", i);
if (i>0) {
char *CAST_FROM_VOIDP(key, node->pivotkeys.get_pivot(i - 1)->data);
fprintf(file, "%*spivot %d len=%u %u\n", depth+1, "", i-1, node->pivotkeys.get_pivot(i - 1)->size, (unsigned)toku_dtoh32(*(int*)key));
char *CAST_FROM_VOIDP(key, node->pivotkeys.get_pivot(i - 1).data);
fprintf(file, "%*spivot %d len=%u %u\n", depth+1, "", i-1, node->pivotkeys.get_pivot(i - 1).size, (unsigned)toku_dtoh32(*(int*)key));
}
DBT x, y;
toku_dump_ftnode(file, ft_handle, BP_BLOCKNUM(node, i), depth+4,
(i==0) ? lorange : node->pivotkeys.get_pivot(i - 1),
(i==node->n_children-1) ? hirange : node->pivotkeys.get_pivot(i));
(i==0) ? lorange : node->pivotkeys.fill_pivot(i - 1, &x),
(i==node->n_children-1) ? hirange : node->pivotkeys.fill_pivot(i, &y));
}
}
}
......
......@@ -158,7 +158,8 @@ get_ith_key_dbt (BASEMENTNODE bn, int i) {
#define VERIFY_ASSERTION(predicate, i, string) ({ \
if(!(predicate)) { \
if (verbose) { \
(void) verbose; \
if (true) { \
fprintf(stderr, "%s:%d: Looking at child %d of block %" PRId64 ": %s\n", __FILE__, __LINE__, i, blocknum.b, string); \
} \
result = TOKUDB_NEEDS_REPAIR; \
......@@ -398,24 +399,27 @@ toku_verify_ftnode_internal(FT_HANDLE ft_handle,
}
// Verify that all the pivot keys are in order.
for (int i = 0; i < node->n_children-2; i++) {
int compare = compare_pairs(ft_handle, node->pivotkeys.get_pivot(i), node->pivotkeys.get_pivot(i + 1));
DBT x, y;
int compare = compare_pairs(ft_handle, node->pivotkeys.fill_pivot(i, &x), node->pivotkeys.fill_pivot(i + 1, &y));
VERIFY_ASSERTION(compare < 0, i, "Value is >= the next value");
}
// Verify that all the pivot keys are lesser_pivot < pivot <= greatereq_pivot
for (int i = 0; i < node->n_children-1; i++) {
DBT x;
if (lesser_pivot) {
int compare = compare_pairs(ft_handle, lesser_pivot, node->pivotkeys.get_pivot(i));
int compare = compare_pairs(ft_handle, lesser_pivot, node->pivotkeys.fill_pivot(i, &x));
VERIFY_ASSERTION(compare < 0, i, "Pivot is >= the lower-bound pivot");
}
if (greatereq_pivot) {
int compare = compare_pairs(ft_handle, greatereq_pivot, node->pivotkeys.get_pivot(i));
int compare = compare_pairs(ft_handle, greatereq_pivot, node->pivotkeys.fill_pivot(i, &x));
VERIFY_ASSERTION(compare >= 0, i, "Pivot is < the upper-bound pivot");
}
}
for (int i = 0; i < node->n_children; i++) {
const DBT *curr_less_pivot = (i==0) ? lesser_pivot : node->pivotkeys.get_pivot(i - 1);
const DBT *curr_geq_pivot = (i==node->n_children-1) ? greatereq_pivot : node->pivotkeys.get_pivot(i);
DBT x, y;
const DBT *curr_less_pivot = (i==0) ? lesser_pivot : node->pivotkeys.fill_pivot(i - 1, &x);
const DBT *curr_geq_pivot = (i==node->n_children-1) ? greatereq_pivot : node->pivotkeys.fill_pivot(i, &y);
if (node->height > 0) {
NONLEAF_CHILDINFO bnc = BNC(node, i);
// Verify that messages in the buffers are in the right place.
......@@ -518,14 +522,15 @@ toku_verify_ftnode (FT_HANDLE ft_handle,
for (int i = 0; i < node->n_children; i++) {
FTNODE child_node;
toku_get_node_for_verify(BP_BLOCKNUM(node, i), ft_handle, &child_node);
DBT x, y;
int r = toku_verify_ftnode(ft_handle, rootmsn,
(toku_bnc_n_entries(BNC(node, i)) > 0
? this_msn
: parentmsn_with_messages),
messages_exist_above || toku_bnc_n_entries(BNC(node, i)) > 0,
child_node, node->height-1,
(i==0) ? lesser_pivot : node->pivotkeys.get_pivot(i - 1),
(i==node->n_children-1) ? greatereq_pivot : node->pivotkeys.get_pivot(i),
(i==0) ? lesser_pivot : node->pivotkeys.fill_pivot(i - 1, &x),
(i==node->n_children-1) ? greatereq_pivot : node->pivotkeys.fill_pivot(i, &y),
progress_callback, progress_extra,
recurse, verbose, keep_going_on_failure);
if (r) {
......
......@@ -100,137 +100,350 @@ PATENT RIGHTS GRANT:
void ftnode_pivot_keys::create_empty() {
_num_pivots = 0;
_total_size = 0;
_keys = nullptr;
_fixed_keys = nullptr;
_fixed_keylen = 0;
_dbt_keys = nullptr;
}
void ftnode_pivot_keys::create_from_dbts(const DBT *keys, int n) {
create_empty();
_num_pivots = n;
_total_size = 0;
XMALLOC_N(_num_pivots, _keys);
// see if every key has the same length
bool keys_same_size = true;
for (int i = 1; i < _num_pivots; i++) {
if (keys[i].size != keys[i - 1].size) {
keys_same_size = false;
break;
}
}
if (keys_same_size && _num_pivots > 0) {
// if so, store pivots in a tightly packed array of fixed length keys
_fixed_keylen = keys[0].size;
_total_size = _fixed_keylen * _num_pivots;
XMALLOC_N(_total_size, _fixed_keys);
for (int i = 0; i < _num_pivots; i++) {
invariant(keys[i].size == _fixed_keylen);
memcpy(_fixed_key(i), keys[i].data, _fixed_keylen);
}
} else {
// otherwise we'll just store the pivots in an array of dbts
XMALLOC_N(_num_pivots, _dbt_keys);
for (int i = 0; i < _num_pivots; i++) {
size_t size = keys[i].size;
toku_memdup_dbt(&_keys[i], keys[i].data, size);
toku_memdup_dbt(&_dbt_keys[i], keys[i].data, size);
_total_size += size;
}
}
}
// effect: create pivot keys by copying a packed array of n fixed-size keys,
//         each exactly fixed_keylen bytes, laid out contiguously in fixedkeys
void ftnode_pivot_keys::_create_from_fixed_keys(const char *fixedkeys, size_t fixed_keylen, int n) {
create_empty();
_num_pivots = n;
_fixed_keylen = fixed_keylen;
_total_size = _fixed_keylen * _num_pivots;
// duplicate the whole packed buffer in one shot
XMEMDUP_N(_fixed_keys, fixedkeys, _total_size);
}
// effect: create pivot keys as a clone of an existing set of pivotkeys
void ftnode_pivot_keys::create_from_pivot_keys(const ftnode_pivot_keys &pivotkeys) {
create_from_dbts(pivotkeys._keys, pivotkeys._num_pivots);
if (pivotkeys._fixed_format()) {
_create_from_fixed_keys(pivotkeys._fixed_keys, pivotkeys._fixed_keylen, pivotkeys._num_pivots);
} else {
create_from_dbts(pivotkeys._dbt_keys, pivotkeys._num_pivots);
}
}
void ftnode_pivot_keys::destroy() {
if (_keys != nullptr) {
if (_dbt_keys != nullptr) {
for (int i = 0; i < _num_pivots; i++) {
toku_destroy_dbt(&_keys[i]);
toku_destroy_dbt(&_dbt_keys[i]);
}
toku_free(_dbt_keys);
_dbt_keys = nullptr;
}
toku_free(_keys);
if (_fixed_keys != nullptr) {
toku_free(_fixed_keys);
_fixed_keys = nullptr;
}
_keys = nullptr;
_fixed_keylen = 0;
_num_pivots = 0;
_total_size = 0;
}
// effect: switch the internal representation from an array of dbts to a
//         tightly packed array of fixed length keys.
// requires: currently in dbt format, and every key has the same size
//           (enforced by the per-key invariant below)
void ftnode_pivot_keys::_convert_to_fixed_format() {
invariant(!_fixed_format());
// convert to a tightly packed array of fixed length keys
_fixed_keylen = _dbt_keys[0].size;
_total_size = _fixed_keylen * _num_pivots;
XMALLOC_N(_total_size, _fixed_keys);
for (int i = 0; i < _num_pivots; i++) {
invariant(_dbt_keys[i].size == _fixed_keylen);
memcpy(_fixed_key(i), _dbt_keys[i].data, _fixed_keylen);
}
// destroy the dbt array format
for (int i = 0; i < _num_pivots; i++) {
toku_destroy_dbt(&_dbt_keys[i]);
}
toku_free(_dbt_keys);
_dbt_keys = nullptr;
invariant(_fixed_format());
}
// effect: switch the internal representation from the packed fixed-size key
//         buffer to an array of individually-allocated dbts.
// requires: currently in fixed format
void ftnode_pivot_keys::_convert_to_dbt_format() {
invariant(_fixed_format());
// convert to an array of dbts
// (_dbt_keys is null in fixed format, so XREALLOC_N acts as a fresh alloc)
XREALLOC_N(_num_pivots, _dbt_keys);
for (int i = 0; i < _num_pivots; i++) {
toku_memdup_dbt(&_dbt_keys[i], _fixed_key(i), _fixed_keylen);
}
// destroy the fixed key format
toku_free(_fixed_keys);
_fixed_keys = nullptr;
_fixed_keylen = 0;
invariant(!_fixed_format());
}
void ftnode_pivot_keys::deserialize_from_rbuf(struct rbuf *rb, int n) {
XMALLOC_N(n, _keys);
_num_pivots = n;
_total_size = 0;
_fixed_keys = nullptr;
_fixed_keylen = 0;
_dbt_keys = nullptr;
XMALLOC_N(_num_pivots, _dbt_keys);
bool keys_same_size = true;
for (int i = 0; i < _num_pivots; i++) {
bytevec pivotkeyptr;
uint32_t size;
rbuf_bytes(rb, &pivotkeyptr, &size);
toku_memdup_dbt(&_keys[i], pivotkeyptr, size);
toku_memdup_dbt(&_dbt_keys[i], pivotkeyptr, size);
_total_size += size;
if (i > 0 && keys_same_size && _dbt_keys[i].size != _dbt_keys[i - 1].size) {
// not all keys are the same size, we'll stick to the dbt array format
keys_same_size = false;
}
}
if (keys_same_size && _num_pivots > 0) {
_convert_to_fixed_format();
}
}
// effect: return the i'th pivot key as a DBT struct (by value).
// note: the returned DBT's data pointer aliases this object's internal
//       storage in both formats, so it is invalidated by any mutation
//       of the pivot keys.
DBT ftnode_pivot_keys::get_pivot(int i) const {
paranoid_invariant(i < _num_pivots);
if (_fixed_format()) {
paranoid_invariant(i * _fixed_keylen < _total_size);
DBT dbt;
toku_fill_dbt(&dbt, _fixed_key(i), _fixed_keylen);
return dbt;
} else {
return _dbt_keys[i];
}
}
const DBT *ftnode_pivot_keys::get_pivot(int i) const {
DBT *ftnode_pivot_keys::fill_pivot(int i, DBT *dbt) const {
paranoid_invariant(i < _num_pivots);
return &_keys[i];
if (_fixed_format()) {
toku_fill_dbt(dbt, _fixed_key(i), _fixed_keylen);
} else {
toku_copyref_dbt(dbt, _dbt_keys[i]);
}
return dbt;
}
void ftnode_pivot_keys::_add_key(const DBT *key, int i) {
toku_clone_dbt(&_keys[i], *key);
_total_size += _keys[i].size;
void ftnode_pivot_keys::_add_key_dbt(const DBT *key, int i) {
toku_clone_dbt(&_dbt_keys[i], *key);
_total_size += _dbt_keys[i].size;
}
void ftnode_pivot_keys::_destroy_key(int i) {
invariant(_total_size >= _keys[i].size);
_total_size -= _keys[i].size;
toku_destroy_dbt(&_keys[i]);
void ftnode_pivot_keys::_destroy_key_dbt(int i) {
invariant(_total_size >= _dbt_keys[i].size);
_total_size -= _dbt_keys[i].size;
toku_destroy_dbt(&_dbt_keys[i]);
}
// effect: insert a clone of key at index i in the dbt array representation.
// note: the caller (insert_at) is responsible for bumping _num_pivots;
//       _add_key_dbt accounts for the key's size in _total_size.
void ftnode_pivot_keys::_insert_at_dbt(const DBT *key, int i) {
// make space for a new pivot, slide existing keys to the right
REALLOC_N(_num_pivots + 1, _dbt_keys);
memmove(&_dbt_keys[i + 1], &_dbt_keys[i], (_num_pivots - i) * sizeof(DBT));
_add_key_dbt(key, i);
}
// effect: insert a copy of key at index i in the packed fixed-size buffer.
// requires: key->size == _fixed_keylen (insert_at converts to dbt format
//           first when the sizes differ, so only matching keys reach here)
// note: the caller (insert_at) bumps _num_pivots afterward.
void ftnode_pivot_keys::_insert_at_fixed(const DBT *key, int i) {
REALLOC_N((_num_pivots + 1) * _fixed_keylen, _fixed_keys);
// slide keys at index >= i right by one slot, then drop the new key in
memmove(_fixed_key(i + 1), _fixed_key(i), (_num_pivots - i) * _fixed_keylen);
memcpy(_fixed_key(i), key->data, _fixed_keylen);
_total_size += _fixed_keylen;
}
void ftnode_pivot_keys::insert_at(const DBT *key, int i) {
invariant(i <= _num_pivots); // it's ok to insert at the end, so we check <= n
// make space for a new pivot, slide existing keys to the right
REALLOC_N(_num_pivots + 1, _keys);
memmove(&_keys[i + 1], &_keys[i], (_num_pivots - i) * sizeof(DBT));
// if the new key doesn't have the same size, we can't be in fixed format
if (_fixed_format() && key->size != _fixed_keylen) {
_convert_to_dbt_format();
}
if (_fixed_format()) {
_insert_at_fixed(key, i);
} else {
_insert_at_dbt(key, i);
}
_num_pivots++;
_add_key(key, i);
invariant(total_size() > 0);
}
void ftnode_pivot_keys::append(const ftnode_pivot_keys &pivotkeys) {
REALLOC_N(_num_pivots + pivotkeys._num_pivots, _keys);
void ftnode_pivot_keys::_append_dbt(const ftnode_pivot_keys &pivotkeys) {
REALLOC_N(_num_pivots + pivotkeys._num_pivots, _dbt_keys);
bool other_fixed = pivotkeys._fixed_format();
for (int i = 0; i < pivotkeys._num_pivots; i++) {
const DBT *key = &pivotkeys._keys[i];
toku_memdup_dbt(&_keys[_num_pivots + i], key->data, key->size);
toku_memdup_dbt(&_dbt_keys[_num_pivots + i],
other_fixed ? pivotkeys._fixed_key(i) :
pivotkeys._dbt_keys[i].data,
other_fixed ? pivotkeys._fixed_keylen :
pivotkeys._dbt_keys[i].size);
}
}
// effect: append another set of pivot keys when we are in fixed format.
//         if the other set is fixed format with the same key length, the
//         packed buffers are concatenated directly; otherwise we fall back
//         to the dbt representation.
// note: the caller (append) adjusts _num_pivots and _total_size.
void ftnode_pivot_keys::_append_fixed(const ftnode_pivot_keys &pivotkeys) {
if (pivotkeys._fixed_format() && pivotkeys._fixed_keylen == _fixed_keylen) {
// other pivotkeys have the same fixed keylen
REALLOC_N((_num_pivots + pivotkeys._num_pivots) * _fixed_keylen, _fixed_keys);
memcpy(_fixed_key(_num_pivots), pivotkeys._fixed_keys, pivotkeys._total_size);
} else {
// must convert to dbt format, other pivotkeys have different length'd keys
_convert_to_dbt_format();
_append_dbt(pivotkeys);
}
}
// effect: append a deep copy of another set of pivot keys to this one,
//         dispatching on our current representation. the other set is
//         not modified.
void ftnode_pivot_keys::append(const ftnode_pivot_keys &pivotkeys) {
if (_fixed_format()) {
_append_fixed(pivotkeys);
} else {
_append_dbt(pivotkeys);
}
// the _append_* helpers copy the key bytes; account for them here
_num_pivots += pivotkeys._num_pivots;
_total_size += pivotkeys._total_size;
}
// effect: replace the i'th key in the dbt array with a clone of key,
//         freeing the old key's memory and keeping _total_size consistent.
void ftnode_pivot_keys::_replace_at_dbt(const DBT *key, int i) {
_destroy_key_dbt(i);
_add_key_dbt(key, i);
}
// effect: replace the i'th key in the packed fixed-size buffer. an
//         equal-size key is overwritten in place (no _total_size change);
//         a different-size key forces conversion to the dbt representation.
void ftnode_pivot_keys::_replace_at_fixed(const DBT *key, int i) {
if (key->size == _fixed_keylen) {
memcpy(_fixed_key(i), key->data, _fixed_keylen);
} else {
// must convert to dbt format, replacement key has different length
_convert_to_dbt_format();
_replace_at_dbt(key, i);
}
}
void ftnode_pivot_keys::replace_at(const DBT *key, int i) {
if (i < _num_pivots) {
_destroy_key(i);
_add_key(key, i);
if (_fixed_format()) {
_replace_at_fixed(key, i);
} else {
_replace_at_dbt(key, i);
}
} else {
invariant(i == _num_pivots); // appending to the end is ok
insert_at(key, i);
}
invariant(total_size() > 0);
}
// effect: remove the i'th key from the packed fixed-size buffer by sliding
//         the keys after it left by one slot. the buffer itself is not
//         shrunk here; the caller (delete_at) decrements _num_pivots.
void ftnode_pivot_keys::_delete_at_fixed(int i) {
memmove(_fixed_key(i), _fixed_key(i + 1), (_num_pivots - 1 - i) * _fixed_keylen);
_total_size -= _fixed_keylen;
}
// effect: remove the i'th key from the dbt array: free it, slide later
//         keys left, and shrink the array by one slot.
// note: the caller (delete_at) decrements _num_pivots.
void ftnode_pivot_keys::_delete_at_dbt(int i) {
// slide over existing keys, then shrink down to size
_destroy_key_dbt(i);
memmove(&_dbt_keys[i], &_dbt_keys[i + 1], (_num_pivots - 1 - i) * sizeof(DBT));
REALLOC_N(_num_pivots - 1, _dbt_keys);
}
void ftnode_pivot_keys::delete_at(int i) {
invariant(i < _num_pivots);
_destroy_key(i);
// slide over existing keys
memmove(&_keys[i], &_keys[i + 1], (_num_pivots - 1 - i) * sizeof(DBT));
if (_fixed_format()) {
_delete_at_fixed(i);
} else {
_delete_at_dbt(i);
}
// shrink down to the new size
_num_pivots--;
REALLOC_N(_num_pivots, _keys);
}
void ftnode_pivot_keys::split_at(int i, ftnode_pivot_keys *other) {
if (i < _num_pivots) {
other->create_from_dbts(&_keys[i], _num_pivots - i);
void ftnode_pivot_keys::_split_at_fixed(int i, ftnode_pivot_keys *other) {
// recreate the other set of pivots from index >= i
other->_create_from_fixed_keys(_fixed_key(i), _fixed_keylen, _num_pivots - i);
// shrink down to size
_total_size = i * _fixed_keylen;
REALLOC_N(_total_size, _fixed_keys);
}
// effect: move pivots with index >= i into *other, dbt format.
void ftnode_pivot_keys::_split_at_dbt(int i, ftnode_pivot_keys *other) {
    // recreate the other set of pivots from index >= i
    other->create_from_dbts(&_dbt_keys[i], _num_pivots - i);
    // destroy everything greater, shrink down to size.
    // (only _destroy_key_dbt is called per key — the previous code also
    // invoked the removed _destroy_key(), double-destroying each key.)
    for (int k = i; k < _num_pivots; k++) {
        _destroy_key_dbt(k);
    }
    REALLOC_N(i, _dbt_keys);
}
// effect: keeps pivot keys [0, i) and moves keys with index >= i into
//         *other. a no-op if i is past the last pivot.
// requires: *other is empty (size == 0)
void ftnode_pivot_keys::split_at(int i, ftnode_pivot_keys *other) {
    if (i < _num_pivots) {
        // The helpers shrink their own storage and maintain _total_size;
        // only the pivot count is updated here. (A stale
        // `REALLOC_N(_num_pivots, _keys)` on the removed `_keys` member
        // has been dropped.)
        if (_fixed_format()) {
            _split_at_fixed(i, other);
        } else {
            _split_at_dbt(i, other);
        }
        _num_pivots = i;
    }
}
// effect: serialize pivot keys to a wbuf, in pivot order.
// requires: wbuf has at least total_size() bytes available.
void ftnode_pivot_keys::serialize_to_wbuf(struct wbuf *wb) const {
    const bool fixed = _fixed_format();
    size_t bytes_out = 0;
    for (int i = 0; i < _num_pivots; i++) {
        // Pick the i'th key's bytes out of whichever representation we use.
        const void *key_data;
        size_t key_size;
        if (fixed) {
            key_data = _fixed_key(i);
            key_size = _fixed_keylen;
        } else {
            key_data = _dbt_keys[i].data;
            key_size = _dbt_keys[i].size;
        }
        invariant(key_size);
        wbuf_nocrc_bytes(wb, key_data, key_size);
        bytes_out += key_size;
    }
    // Cross-check the serialized byte count against our accounting.
    invariant(bytes_out == _total_size);
}
// returns: the number of pivot keys
int ftnode_pivot_keys::num_pivots() const {
    // if we have fixed size keys, the number of pivots should be consistent
    paranoid_invariant(_fixed_keys == nullptr || (_total_size == _fixed_keylen * _num_pivots));
    return _num_pivots;
}

// returns: the sum of the key sizes of each pivot
size_t ftnode_pivot_keys::total_size() const {
    // if we have fixed size keys, the total size should be consistent
    paranoid_invariant(_fixed_keys == nullptr || (_total_size == _fixed_keylen * _num_pivots));
    return _total_size;
}

// NOTE(review): a stale second definition of serialize_to_wbuf (the old,
// dbt-only version) followed here. It duplicated the definition above —
// a redefinition error — and has been removed.
// Effect: Fill in N as an empty ftnode.
// TODO: Rename toku_ftnode_create
void toku_initialize_empty_ftnode(FTNODE n, BLOCKNUM blocknum, int height, int num_children, int layout_version, unsigned int flags) {
......@@ -465,20 +678,20 @@ find_bounds_within_message_tree(
const toku::comparator &cmp,
const find_bounds_omt_t &message_tree, /// tree holding message buffer offsets, in which we want to look for indices
message_buffer *msg_buffer, /// message buffer in which messages are found
struct pivot_bounds const * const bounds, /// key bounds within the basement node we're applying messages to
const pivot_bounds &bounds, /// key bounds within the basement node we're applying messages to
uint32_t *lbi, /// (output) "lower bound inclusive" (index into message_tree)
uint32_t *ube /// (output) "upper bound exclusive" (index into message_tree)
)
{
int r = 0;
if (bounds->lower_bound_exclusive) {
if (!toku_dbt_is_empty(bounds.lbe())) {
// By setting msn to MAX_MSN and by using direction of +1, we will
// get the first message greater than (in (key, msn) order) any
// message (with any msn) with the key lower_bound_exclusive.
// This will be a message we want to try applying, so it is the
// "lower bound inclusive" within the message_tree.
struct toku_msg_buffer_key_msn_heaviside_extra lbi_extra(cmp, msg_buffer, bounds->lower_bound_exclusive, MAX_MSN);
struct toku_msg_buffer_key_msn_heaviside_extra lbi_extra(cmp, msg_buffer, bounds.lbe(), MAX_MSN);
int32_t found_lb;
r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(lbi_extra, +1, &found_lb, lbi);
if (r == DB_NOTFOUND) {
......@@ -489,11 +702,11 @@ find_bounds_within_message_tree(
*ube = 0;
return;
}
if (bounds->upper_bound_inclusive) {
if (!toku_dbt_is_empty(bounds.ubi())) {
// Check if what we found for lbi is greater than the upper
// bound inclusive that we have. If so, there are no relevant
// messages between these bounds.
const DBT *ubi = bounds->upper_bound_inclusive;
const DBT *ubi = bounds.ubi();
const int32_t offset = found_lb;
DBT found_lbidbt;
msg_buffer->get_message_key_msn(offset, &found_lbidbt, nullptr);
......@@ -514,12 +727,12 @@ find_bounds_within_message_tree(
// the first message in the OMT.
*lbi = 0;
}
if (bounds->upper_bound_inclusive) {
if (!toku_dbt_is_empty(bounds.ubi())) {
// Again, we use an msn of MAX_MSN and a direction of +1 to get
// the first thing bigger than the upper_bound_inclusive key.
// This is therefore the smallest thing we don't want to apply,
// and omt::iterate_on_range will not examine it.
struct toku_msg_buffer_key_msn_heaviside_extra ube_extra(cmp, msg_buffer, bounds->upper_bound_inclusive, MAX_MSN);
struct toku_msg_buffer_key_msn_heaviside_extra ube_extra(cmp, msg_buffer, bounds.ubi(), MAX_MSN);
r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(ube_extra, +1, nullptr, ube);
if (r == DB_NOTFOUND) {
// Couldn't find anything in the buffer bigger than our key,
......@@ -547,7 +760,7 @@ bnc_apply_messages_to_basement_node(
BASEMENTNODE bn, // where to apply messages
FTNODE ancestor, // the ancestor node where we can find messages to apply
int childnum, // which child buffer of ancestor contains messages we want
struct pivot_bounds const * const bounds, // contains pivot key bounds of this basement node
const pivot_bounds &bounds, // contains pivot key bounds of this basement node
txn_gc_info *gc_info,
bool* msgs_applied
)
......@@ -641,13 +854,13 @@ apply_ancestors_messages_to_bn(
FTNODE node,
int childnum,
ANCESTORS ancestors,
struct pivot_bounds const * const bounds,
const pivot_bounds &bounds,
txn_gc_info *gc_info,
bool* msgs_applied
)
{
BASEMENTNODE curr_bn = BLB(node, childnum);
struct pivot_bounds curr_bounds = next_pivot_keys(node, childnum, bounds);
const pivot_bounds curr_bounds = bounds.next_bounds(node, childnum);
for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > curr_bn->max_msn_applied.msn) {
paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
......@@ -656,7 +869,7 @@ apply_ancestors_messages_to_bn(
curr_bn,
curr_ancestors->node,
curr_ancestors->childnum,
&curr_bounds,
curr_bounds,
gc_info,
msgs_applied
);
......@@ -678,7 +891,7 @@ toku_apply_ancestors_messages_to_node (
FT_HANDLE t,
FTNODE node,
ANCESTORS ancestors,
struct pivot_bounds const * const bounds,
const pivot_bounds &bounds,
bool* msgs_applied,
int child_to_read
)
......@@ -741,13 +954,13 @@ static bool bn_needs_ancestors_messages(
FT ft,
FTNODE node,
int childnum,
struct pivot_bounds const * const bounds,
const pivot_bounds &bounds,
ANCESTORS ancestors,
MSN* max_msn_applied
)
{
BASEMENTNODE bn = BLB(node, childnum);
struct pivot_bounds curr_bounds = next_pivot_keys(node, childnum, bounds);
const pivot_bounds curr_bounds = bounds.next_bounds(node, childnum);
bool needs_ancestors_messages = false;
for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > bn->max_msn_applied.msn) {
......@@ -762,7 +975,7 @@ static bool bn_needs_ancestors_messages(
find_bounds_within_message_tree(ft->cmp,
bnc->stale_message_tree,
&bnc->msg_buffer,
&curr_bounds,
curr_bounds,
&stale_lbi,
&stale_ube);
if (stale_lbi < stale_ube) {
......@@ -774,7 +987,7 @@ static bool bn_needs_ancestors_messages(
find_bounds_within_message_tree(ft->cmp,
bnc->fresh_message_tree,
&bnc->msg_buffer,
&curr_bounds,
curr_bounds,
&fresh_lbi,
&fresh_ube);
if (fresh_lbi < fresh_ube) {
......@@ -794,7 +1007,7 @@ bool toku_ft_leaf_needs_ancestors_messages(
FT ft,
FTNODE node,
ANCESTORS ancestors,
struct pivot_bounds const * const bounds,
const pivot_bounds &bounds,
MSN *const max_msn_in_path,
int child_to_read
)
......@@ -1767,9 +1980,11 @@ int toku_ftnode_which_child(FTNODE node, const DBT *k, const toku::comparator &c
// a funny case of no pivots
if (node->n_children <= 1) return 0;
DBT pivot;
// check the last key to optimize seq insertions
int n = node->n_children-1;
int c = ft_compare_pivot(cmp, k, node->pivotkeys.get_pivot(n - 1));
int c = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(n - 1, &pivot));
if (c > 0) return n;
// binary search the pivots
......@@ -1778,7 +1993,7 @@ int toku_ftnode_which_child(FTNODE node, const DBT *k, const toku::comparator &c
int mi;
while (lo < hi) {
mi = (lo + hi) / 2;
c = ft_compare_pivot(cmp, k, node->pivotkeys.get_pivot(mi));
c = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(mi, &pivot));
if (c > 0) {
lo = mi+1;
continue;
......@@ -1794,12 +2009,13 @@ int toku_ftnode_which_child(FTNODE node, const DBT *k, const toku::comparator &c
// Used for HOT.
int toku_ftnode_hot_next_child(FTNODE node, const DBT *k, const toku::comparator &cmp) {
DBT pivot;
int low = 0;
int hi = node->n_children - 1;
int mi;
while (low < hi) {
mi = (low + hi) / 2;
int r = ft_compare_pivot(cmp, k, node->pivotkeys.get_pivot(mi));
int r = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(mi, &pivot));
if (r > 0) {
low = mi + 1;
} else if (r < 0) {
......
......@@ -106,7 +106,7 @@ public:
void create_empty();
// effect: create pivot keys by copying the given DBT array
void create_from_dbts(const DBT *keys, int num_pivots);
void create_from_dbts(const DBT *keys, int n);
// effect: create pivot keys as a clone of an existing set of pivotkeys
void create_from_pivot_keys(const ftnode_pivot_keys &pivotkeys);
......@@ -114,10 +114,14 @@ public:
void destroy();
// effect: deserialize pivot keys previously serialized by serialize_to_wbuf()
void deserialize_from_rbuf(struct rbuf *rb, int num_pivots);
void deserialize_from_rbuf(struct rbuf *rb, int n);
// returns: unowned DBT representing the i'th pivot key
const DBT *get_pivot(int i) const;
DBT get_pivot(int i) const;
// effect: fills a DBT with the i'th pivot key
// returns: the given dbt
DBT *fill_pivot(int i, DBT *dbt) const;
// effect: insert a pivot into the i'th position, shifting others to the right
void insert_at(const DBT *key, int i);
......@@ -136,21 +140,59 @@ public:
// requires: *other is empty (size == 0)
void split_at(int i, ftnode_pivot_keys *other);
// effect: serialize pivot keys to a wbuf
// requires: wbuf has at least ftnode_pivot_keys::total_size() bytes available
void serialize_to_wbuf(struct wbuf *wb) const;
int num_pivots() const;
// return: the sum of the keys sizes of each pivot
size_t total_size() const;
// effect: serialize pivot keys to a wbuf
// requires: wbuf has at least ftnode_pivot_keys::total_size() bytes available
void serialize_to_wbuf(struct wbuf *wb) const;
private:
// adds/destroys keys at a certain index, maintaining _total_size, but not _num_pivots
void _add_key(const DBT *key, int i);
void _destroy_key(int i);
// effect: create pivot keys, in fixed key format, by copying the given key array
void _create_from_fixed_keys(const char *fixedkeys, size_t fixed_keylen, int n);
// returns: pointer to the i'th key inside the packed fixed-size key array
// requires: _fixed_format() — otherwise _fixed_keys is null
char *_fixed_key(int i) const {
    return &_fixed_keys[i * _fixed_keylen];
}
// returns: true if pivots are stored in the packed fixed-size key format
//          (otherwise they are stored as an array of dbts)
bool _fixed_format() const {
    return _fixed_keys != nullptr;
}
void sanity_check() const;
void _insert_at_dbt(const DBT *key, int i);
void _append_dbt(const ftnode_pivot_keys &pivotkeys);
void _replace_at_dbt(const DBT *key, int i);
void _delete_at_dbt(int i);
void _split_at_dbt(int i, ftnode_pivot_keys *other);
void _insert_at_fixed(const DBT *key, int i);
void _append_fixed(const ftnode_pivot_keys &pivotkeys);
void _replace_at_fixed(const DBT *key, int i);
void _delete_at_fixed(int i);
void _split_at_fixed(int i, ftnode_pivot_keys *other);
// adds/destroys keys at a certain index (in dbt format),
// maintaining _total_size, but not _num_pivots
void _add_key_dbt(const DBT *key, int i);
void _destroy_key_dbt(int i);
// conversions to and from packed key array format
void _convert_to_dbt_format();
void _convert_to_fixed_format();
// If every key is _fixed_keylen long, then _fixed_key is a
// packed array of keys..
char *_fixed_keys;
size_t _fixed_keylen;
// ..otherwise _fixed_keys is null and we store an array of dbts,
// each representing a key. this is simpler but less cache-efficient.
DBT *_dbt_keys;
DBT *_keys;
int _num_pivots;
size_t _total_size;
};
......@@ -482,12 +524,13 @@ void toku_ft_bnc_move_messages_to_stale(FT ft, NONLEAF_CHILDINFO bnc);
void toku_move_ftnode_messages_to_stale(FT ft, FTNODE node);
// TODO: Should ft_handle just be FT?
class pivot_bounds;
void toku_apply_ancestors_messages_to_node(FT_HANDLE t, FTNODE node, ANCESTORS ancestors,
struct pivot_bounds const *const bounds,
const pivot_bounds &bounds,
bool *msgs_applied, int child_to_read);
bool toku_ft_leaf_needs_ancestors_messages(FT ft, FTNODE node, ANCESTORS ancestors,
struct pivot_bounds const *const bounds,
const pivot_bounds &bounds,
MSN *const max_msn_in_path, int child_to_read);
void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied, int child_to_read);
......
......@@ -360,7 +360,7 @@ test_serialize_leaf_check_msn(enum ftnode_verify_type bft, bool do_clone) {
assert(leafentry_memsize(curr_le) == leafentry_memsize(elts[last_i].le));
assert(memcmp(curr_le, elts[last_i].le, leafentry_memsize(curr_le)) == 0);
if (bn < npartitions-1) {
assert(strcmp((char*)dn->pivotkeys.get_pivot(bn)->data, elts[last_i].keyp) <= 0);
assert(strcmp((char*)dn->pivotkeys.get_pivot(bn).data, elts[last_i].keyp) <= 0);
}
// TODO for later, get a key comparison here as well
last_i++;
......@@ -495,7 +495,7 @@ test_serialize_leaf_with_large_pivots(enum ftnode_verify_type bft, bool do_clone
assert(leafentry_memsize(curr_le) == leafentry_memsize(les[last_i].le));
assert(memcmp(curr_le, les[last_i].le, leafentry_memsize(curr_le)) == 0);
if (bn < npartitions-1) {
assert(strcmp((char*)dn->pivotkeys.get_pivot(bn)->data, les[last_i].keyp) <= 0);
assert(strcmp((char*)dn->pivotkeys.get_pivot(bn).data, les[last_i].keyp) <= 0);
}
// TODO for later, get a key comparison here as well
last_i++;
......@@ -618,7 +618,7 @@ test_serialize_leaf_with_many_rows(enum ftnode_verify_type bft, bool do_clone) {
assert(leafentry_memsize(curr_le) == leafentry_memsize(les[last_i].le));
assert(memcmp(curr_le, les[last_i].le, leafentry_memsize(curr_le)) == 0);
if (bn < npartitions-1) {
uint32_t *CAST_FROM_VOIDP(pivot, dn->pivotkeys.get_pivot(bn)->data);
uint32_t *CAST_FROM_VOIDP(pivot, dn->pivotkeys.get_pivot(bn).data);
void* tmp = les[last_i].keyp;
uint32_t *CAST_FROM_VOIDP(item, tmp);
assert(*pivot >= *item);
......@@ -759,7 +759,7 @@ test_serialize_leaf_with_large_rows(enum ftnode_verify_type bft, bool do_clone)
assert(leafentry_memsize(curr_le) == leafentry_memsize(les[last_i].le));
assert(memcmp(curr_le, les[last_i].le, leafentry_memsize(curr_le)) == 0);
if (bn < npartitions-1) {
assert(strcmp((char*)dn->pivotkeys.get_pivot(bn)->data, (char*)(les[last_i].keyp)) <= 0);
assert(strcmp((char*)dn->pivotkeys.get_pivot(bn).data, (char*)(les[last_i].keyp)) <= 0);
}
// TODO for later, get a key comparison here as well
last_i++;
......@@ -888,7 +888,7 @@ test_serialize_leaf_with_empty_basement_nodes(enum ftnode_verify_type bft, bool
assert(leafentry_memsize(curr_le) == leafentry_memsize(elts[last_i].le));
assert(memcmp(curr_le, elts[last_i].le, leafentry_memsize(curr_le)) == 0);
if (bn < npartitions-1) {
assert(strcmp((char*)dn->pivotkeys.get_pivot(bn)->data, (char*)(elts[last_i].keyp)) <= 0);
assert(strcmp((char*)dn->pivotkeys.get_pivot(bn).data, (char*)(elts[last_i].keyp)) <= 0);
}
// TODO for later, get a key comparison here as well
last_i++;
......@@ -1107,8 +1107,8 @@ test_serialize_nonleaf(enum ftnode_verify_type bft, bool do_clone) {
assert(dn->layout_version_read_from_disk ==FT_LAYOUT_VERSION);
assert(dn->height == 1);
assert(dn->n_children==2);
assert(strcmp((char*)dn->pivotkeys.get_pivot(0)->data, "hello")==0);
assert(dn->pivotkeys.get_pivot(0)->size==6);
assert(strcmp((char*)dn->pivotkeys.get_pivot(0).data, "hello")==0);
assert(dn->pivotkeys.get_pivot(0).size==6);
assert(BP_BLOCKNUM(dn,0).b==30);
assert(BP_BLOCKNUM(dn,1).b==35);
......
......@@ -737,9 +737,8 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) {
BP_STATE(parentnode, 0) = PT_AVAIL;
parentnode->max_msn_applied_to_node_on_disk = max_parent_msn;
struct ancestors ancestors = { .node = parentnode, .childnum = 0, .next = NULL };
const struct pivot_bounds infinite_bounds = { .lower_bound_exclusive = NULL, .upper_bound_inclusive = NULL };
bool msgs_applied;
toku_apply_ancestors_messages_to_node(t, child, &ancestors, &infinite_bounds, &msgs_applied, -1);
toku_apply_ancestors_messages_to_node(t, child, &ancestors, pivot_bounds::infinite_bounds(), &msgs_applied, -1);
struct checkit_fn {
int operator()(FT_MSG UU(msg), bool is_fresh) {
......@@ -962,12 +961,11 @@ flush_to_leaf_with_keyrange(FT_HANDLE t, bool make_leaf_up_to_date) {
parentnode->max_msn_applied_to_node_on_disk = max_parent_msn;
struct ancestors ancestors = { .node = parentnode, .childnum = 0, .next = NULL };
DBT lbe, ubi;
const struct pivot_bounds bounds = {
.lower_bound_exclusive = toku_init_dbt(&lbe),
.upper_bound_inclusive = toku_clone_dbt(&ubi, childkeys[7])
};
toku_init_dbt(&lbe);
toku_clone_dbt(&ubi, childkeys[7]);
const pivot_bounds bounds(lbe, ubi);
bool msgs_applied;
toku_apply_ancestors_messages_to_node(t, child, &ancestors, &bounds, &msgs_applied, -1);
toku_apply_ancestors_messages_to_node(t, child, &ancestors, bounds, &msgs_applied, -1);
struct checkit_fn {
DBT *childkeys;
......@@ -1162,9 +1160,8 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) {
BP_STATE(parentnode, 0) = PT_AVAIL;
parentnode->max_msn_applied_to_node_on_disk = max_parent_msn;
struct ancestors ancestors = { .node = parentnode, .childnum = 0, .next = NULL };
const struct pivot_bounds infinite_bounds = { .lower_bound_exclusive = NULL, .upper_bound_inclusive = NULL };
bool msgs_applied;
toku_apply_ancestors_messages_to_node(t, child2, &ancestors, &infinite_bounds, &msgs_applied, -1);
toku_apply_ancestors_messages_to_node(t, child2, &ancestors, pivot_bounds::infinite_bounds(), &msgs_applied, -1);
struct checkit_fn {
int operator()(FT_MSG UU(msg), bool is_fresh) {
......
......@@ -260,11 +260,11 @@ static void dump_node(int fd, BLOCKNUM blocknum, FT ft) {
printf(" pivots:\n");
for (int i=0; i<n->n_children-1; i++) {
const DBT *piv = n->pivotkeys.get_pivot(i);
const DBT piv = n->pivotkeys.get_pivot(i);
printf(" pivot %2d:", i);
if (n->flags)
printf(" flags=%x ", n->flags);
print_item(piv->data, piv->size);
print_item(piv.data, piv.size);
printf("\n");
}
printf(" children:\n");
......
......@@ -317,6 +317,12 @@ bool toku_dbt_is_infinite(const DBT *dbt) {
return dbt == toku_dbt_positive_infinity() || dbt == toku_dbt_negative_infinity();
}
// returns: true if the given dbt has no data (ie: dbt->data == nullptr)
bool toku_dbt_is_empty(const DBT *dbt) {
    const bool has_data = dbt->data != nullptr;
    // can't have a null data field with a non-zero size
    paranoid_invariant(has_data || dbt->size == 0);
    return !has_data;
}
int toku_dbt_infinite_compare(const DBT *a, const DBT *b) {
if (a == b) {
return 0;
......
......@@ -129,6 +129,9 @@ const DBT *toku_dbt_negative_infinity(void);
// returns: true if the given dbt is either positive or negative infinity
bool toku_dbt_is_infinite(const DBT *dbt);
// returns: true if the given dbt has no data (ie: dbt->data == nullptr)
bool toku_dbt_is_empty(const DBT *dbt);
// effect: compares two potentially infinity-valued dbts
// requires: at least one is infinite (assert otherwise)
int toku_dbt_infinite_compare(const DBT *a, const DBT *b);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment