Commit 7e32e7c7 authored by Yoni Fogel's avatar Yoni Fogel

Refs Tokutek/ft-index#46 Minor optimizations, added some todos for code review. WILL NOT COMPILE

parent 8d45bea3
Notes during 2014-01-08 Leif/Yoni
-Should verify (dmt?omt?bndata?) crash or return error on failed verify
Replace dmt_functor with implicit interface only. Instead of (for data type x) requiring the name to be dmt_functor<x> just pass the writer's class name into the dmt's template as a new parameter.
Replace dmt_functor<default> with comments explaining the "interface"
See wiki:
......@@ -98,11 +98,23 @@ namespace toku {
template<typename dmtdata_t, typename dmtdataout_t>
void dmt<dmtdata_t, dmtdataout_t>::create(void) {
this->values_same_size = true;
this->value_length = 0;
this->is_array = true;
this->d.a.num_values = 0;
//TODO: maybe allocate enough space for something by default?
// We may be relying on not needing to allocate space the first time (due to limited time spent while a lock is held)
* Note: create_from_sorted_memory_of_fixed_size_elements does not take ownership of 'mem'.
* Owner is still responsible for freeing it.
* While in the OMT a similar function would steal ownership, this doesn't make sense for the DMT because
* we (usually) have to add padding for alignment (mem has all of the elements PACKED).
* Also all current uses (as of Jan 12, 2014) of this function would require mallocing a new array
* in order to allow stealing.
template<typename dmtdata_t, typename dmtdataout_t>
void dmt<dmtdata_t, dmtdataout_t>::create_from_sorted_memory_of_fixed_size_elements(
const void *mem,
......@@ -112,20 +124,21 @@ void dmt<dmtdata_t, dmtdataout_t>::create_from_sorted_memory_of_fixed_size_eleme
this->values_same_size = true;
this->value_length = fixed_value_length;
this->is_array = true;
this->d.a.start_idx = 0;
this->d.a.num_values = numvalues;
const uint8_t pad_bytes = get_fixed_length_alignment_overhead();
uint32_t aligned_memsize = mem_length + numvalues * pad_bytes;
toku_mempool_construct(&this->mp, aligned_memsize);
if (aligned_memsize > 0) {
paranoid_invariant(numvalues > 0);
void *ptr = toku_mempool_malloc(&this->mp, aligned_memsize, 1);
uint8_t *CAST_FROM_VOIDP(dest, ptr);
const uint8_t *CAST_FROM_VOIDP(src, mem);
uint8_t * const CAST_FROM_VOIDP(dest, ptr);
const uint8_t * const CAST_FROM_VOIDP(src, mem);
if (pad_bytes == 0) {
paranoid_invariant(aligned_memsize == mem_length);
memcpy(dest, src, aligned_memsize);
} else {
// TODO(leif): check what vectorizes best: multiplying like this or adding to offsets
const uint32_t fixed_len = this->value_length;
const uint32_t fixed_aligned_len = align(this->value_length);
paranoid_invariant(this->d.a.num_values*fixed_len == mem_length);
......@@ -136,25 +149,6 @@ void dmt<dmtdata_t, dmtdataout_t>::create_from_sorted_memory_of_fixed_size_eleme
template<typename dmtdata_t, typename dmtdataout_t>
void dmt<dmtdata_t, dmtdataout_t>::create_no_array(void) {
template<typename dmtdata_t, typename dmtdataout_t>
void dmt<dmtdata_t, dmtdataout_t>::create_internal_no_alloc(bool as_tree) {
this->values_same_size = true;
this->value_length = 0;
this->is_array = !as_tree;
if (as_tree) {
} else {
this->d.a.start_idx = 0;
this->d.a.num_values = 0;
template<typename dmtdata_t, typename dmtdataout_t>
void dmt<dmtdata_t, dmtdataout_t>::clone(const dmt &src) {
*this = src;
......@@ -164,11 +158,12 @@ void dmt<dmtdata_t, dmtdataout_t>::clone(const dmt &src) {
template<typename dmtdata_t, typename dmtdataout_t>
void dmt<dmtdata_t, dmtdataout_t>::clear(void) {
this->is_array = true;
this->d.a.start_idx = 0;
this->d.a.num_values = 0;
this->values_same_size = true; // Reset state
this->value_length = 0;
//TODO(leif): Note that this can mess with our memory_footprint calculation (we may touch past what is marked as 'used' in the mempool)
// One 'fix' is for mempool to also track what was touched, and reset() shouldn't reset that, though realloc() might.
template<typename dmtdata_t, typename dmtdataout_t>
......@@ -220,28 +215,17 @@ int dmt<dmtdata_t, dmtdataout_t>::insert_at(const dmtdatain_t &value, const uint
if (idx > this->size()) { return EINVAL; }
bool same_size = this->values_same_size && (this->size() == 0 || value.get_dmtdatain_t_size() == this->value_length);
if (same_size) {
if (this->is_array) {
if (idx == this->d.a.num_values) {
return this->insert_at_array_end<true>(value);
#if 0
//TODO: enable if we support delete_at with array
if (idx == 0 && this->d.a.start_idx > 0) {
paranoid_invariant(false); // Should not be possible (yet)
return this->insert_at_array_beginning(value);
if (this->is_array) {
if (same_size && idx == this->d.a.num_values) {
return this->insert_at_array_end<true>(value);
// Is a tree.
if (!same_size) {
this->values_same_size = false;
// Is a tree.
subtree *rebalance_subtree = nullptr;
......@@ -263,27 +247,13 @@ int dmt<dmtdata_t, dmtdataout_t>::insert_at_array_end(const dmtdatain_t& value_i
paranoid_invariant(this->value_length == value_in.get_dmtdatain_t_size());
if (with_resize) {
dmtdata_t *dest = this->alloc_array_value_end();
return 0;
template<typename dmtdata_t, typename dmtdataout_t>
int dmt<dmtdata_t, dmtdataout_t>::insert_at_array_beginning(const dmtdatain_t& value_in) {
invariant(false); //TODO: enable this later
paranoid_invariant(this->d.a.num_values > 0);
//TODO: when deleting last element, should set start_idx to 0
this->maybe_resize_array(+1); // +1 or 0? Depends on how memory management works
dmtdata_t *dest = this->alloc_array_value_beginning();
return 0;
template<typename dmtdata_t, typename dmtdataout_t>
dmtdata_t * dmt<dmtdata_t, dmtdataout_t>::alloc_array_value_end(void) {
......@@ -298,55 +268,37 @@ dmtdata_t * dmt<dmtdata_t, dmtdataout_t>::alloc_array_value_end(void) {
return n;
template<typename dmtdata_t, typename dmtdataout_t>
dmtdata_t * dmt<dmtdata_t, dmtdataout_t>::alloc_array_value_beginning(void) {
invariant(false); //TODO: enable this later
paranoid_invariant(this->d.a.start_idx > 0);
const uint32_t real_idx = --this->d.a.start_idx;
//TODO: figure out how to keep mempool correct here.. do we free during delete_at (begin)? If so how do we re'malloc' from beginning? Alternatively never free from beginning?
return get_array_value_internal(&this->mp, real_idx);
template<typename dmtdata_t, typename dmtdataout_t>
dmtdata_t * dmt<dmtdata_t, dmtdataout_t>::get_array_value(const uint32_t idx) const {
//TODO: verify initial create always set is_array and values_same_size
paranoid_invariant(idx < this->d.a.num_values);
const uint32_t real_idx = idx + this->d.a.start_idx;
return get_array_value_internal(&this->mp, real_idx);
return get_array_value_internal(&this->mp, idx);
template<typename dmtdata_t, typename dmtdataout_t>
dmtdata_t * dmt<dmtdata_t, dmtdataout_t>::get_array_value_internal(const struct mempool *mempool, const uint32_t real_idx) const {
void* ptr = toku_mempool_get_pointer_from_base_and_offset(mempool, real_idx * align(this->value_length));
dmtdata_t * dmt<dmtdata_t, dmtdataout_t>::get_array_value_internal(const struct mempool *mempool, const uint32_t idx) const {
void* ptr = toku_mempool_get_pointer_from_base_and_offset(mempool, idx * align(this->value_length));
dmtdata_t *CAST_FROM_VOIDP(value, ptr);
return value;
//TODO(leif) write microbenchmarks to compare growth factor. Note: growth factor here is actually 2.5 because of mempool_construct
template<typename dmtdata_t, typename dmtdataout_t>
void dmt<dmtdata_t, dmtdataout_t>::maybe_resize_array(const int change) {
paranoid_invariant(change == -1 || change == 1);
paranoid_invariant(change == 1); //TODO: go over change == -1.. may or may not be ok
bool space_available = change < 0 || toku_mempool_get_free_space(&this->mp) >= align(this->value_length);
void dmt<dmtdata_t, dmtdataout_t>::maybe_resize_array_for_insert(void) {
bool space_available = toku_mempool_get_free_space(&this->mp) >= align(this->value_length);
const uint32_t n = this->d.a.num_values + change;
const uint32_t new_n = n<=2 ? 4 : 2*n;
const uint32_t new_space = align(this->value_length) * new_n;
bool too_much_space = new_space <= toku_mempool_get_size(&this->mp) / 2;
if (!space_available) {
const uint32_t n = this->d.a.num_values + 1;
const uint32_t new_n = n <=2 ? 4 : 2*n;
const uint32_t new_space = align(this->value_length) * new_n;
if (!space_available || too_much_space) {
struct mempool new_kvspace;
toku_mempool_construct(&new_kvspace, new_space);
size_t copy_bytes = this->d.a.num_values * align(this->value_length);
invariant(copy_bytes + align(this->value_length) <= new_space);
paranoid_invariant(copy_bytes <= toku_mempool_get_used_space(&this->mp));
// Copy over to new mempool
if (this->d.a.num_values > 0) {
void* dest = toku_mempool_malloc(&new_kvspace, copy_bytes, 1);
......@@ -355,7 +307,6 @@ void dmt<dmtdata_t, dmtdataout_t>::maybe_resize_array(const int change) {
this->mp = new_kvspace;
this->d.a.start_idx = 0;
......@@ -378,16 +329,16 @@ void dmt<dmtdata_t, dmtdataout_t>::convert_from_tree_to_array(void) {
const uint32_t num_values = this->size();
node_idx *tmp_array;
node_offset *tmp_array;
bool malloced = false;
tmp_array = alloc_temp_node_idxs(num_values);
tmp_array = alloc_temp_node_offsets(num_values);
if (!tmp_array) {
malloced = true;
XMALLOC_N(num_values, tmp_array);
this->fill_array_with_subtree_idxs(tmp_array, this->d.t.root);
this->fill_array_with_subtree_offsets(tmp_array, this->d.t.root);
struct mempool new_mp = this->mp;
struct mempool new_mp;
const uint32_t fixed_len = this->value_length;
const uint32_t fixed_aligned_len = align(this->value_length);
size_t mem_needed = num_values * fixed_aligned_len;
......@@ -401,7 +352,6 @@ void dmt<dmtdata_t, dmtdataout_t>::convert_from_tree_to_array(void) {
this->mp = new_mp;
this->is_array = true;
this->d.a.start_idx = 0;
this->d.a.num_values = num_values;
if (malloced) toku_free(tmp_array);
......@@ -414,12 +364,10 @@ void dmt<dmtdata_t, dmtdataout_t>::convert_from_array_to_tree(void) {
//save array-format information to locals
const uint32_t num_values = this->d.a.num_values;
const uint32_t offset = this->d.a.start_idx;
paranoid_invariant_zero(offset); //TODO: remove this
node_idx *tmp_array;
node_offset *tmp_array;
bool malloced = false;
tmp_array = alloc_temp_node_idxs(num_values);
tmp_array = alloc_temp_node_offsets(num_values);
if (!tmp_array) {
malloced = true;
XMALLOC_N(num_values, tmp_array);
......@@ -430,11 +378,11 @@ void dmt<dmtdata_t, dmtdataout_t>::convert_from_array_to_tree(void) {
toku_mempool_construct(&this->mp, mem_needed);
for (uint32_t i = 0; i < num_values; i++) {
dmtdatain_t functor(this->value_length, get_array_value_internal(&old_mp, i+offset));
dmtdatain_t functor(this->value_length, get_array_value_internal(&old_mp, i));
tmp_array[i] = node_malloc_and_set_value(functor);
this->is_array = false;
this->rebuild_subtree_from_idxs(&this->d.t.root, tmp_array, num_values);
this->rebuild_subtree_from_offsets(&this->d.t.root, tmp_array, num_values);
if (malloced) toku_free(tmp_array);
......@@ -450,20 +398,14 @@ int dmt<dmtdata_t, dmtdataout_t>::delete_at(const uint32_t idx) {
return 0;
if (this->is_array) {
//TODO: support array delete
//TODO: If/when we implement array delete, we must update verify() (and others?) w.r.t. mempool fragmentation
if (this->is_array) {
} else {
subtree *rebalance_subtree = nullptr;
this->delete_internal(&this->d.t.root, idx, nullptr, &rebalance_subtree);
if (rebalance_subtree != nullptr) {
subtree *rebalance_subtree = nullptr;
this->delete_internal(&this->d.t.root, idx, nullptr, &rebalance_subtree);
if (rebalance_subtree != nullptr) {
return 0;
......@@ -488,6 +430,7 @@ int dmt<dmtdata_t, dmtdataout_t>::iterate_on_range(const uint32_t left, const ui
return this->iterate_internal<iterate_extra_t, f>(left, right, this->d.t.root, 0, iterate_extra);
//TODO(yoni) determine where this is used and if it should crash or return an error on bad verify
template<typename dmtdata_t, typename dmtdataout_t>
void dmt<dmtdata_t, dmtdataout_t>::verify(void) const {
uint32_t num_values = this->size();
......@@ -504,7 +447,7 @@ void dmt<dmtdata_t, dmtdataout_t>::verify(void) const {
invariant(pool_used == num_values * align(this->value_length));
// Array form must have 0 fragmentation in mempool.
invariant(pool_frag == 0); //TODO: if/when we implement array delete this invariant may need to change.
invariant(pool_frag == 0);
} else {
if (this->values_same_size) {
// We know exactly how much memory should be used.
......@@ -624,13 +567,11 @@ size_t dmt<dmtdata_t, dmtdataout_t>::memory_size(void) {
template<typename dmtdata_t, typename dmtdataout_t>
dmt_node_templated<dmtdata_t> & dmt<dmtdata_t, dmtdataout_t>::get_node(const subtree &subtree) const {
return get_node(subtree.get_index());
return get_node(subtree.get_offset());
template<typename dmtdata_t, typename dmtdataout_t>
dmt_node_templated<dmtdata_t> & dmt<dmtdata_t, dmtdataout_t>::get_node(const node_idx offset) const {
//TODO: implement
//Need to decide what to do with regards to cnode/dnode
dmt_node_templated<dmtdata_t> & dmt<dmtdata_t, dmtdataout_t>::get_node(const node_offset offset) const {
void* ptr = toku_mempool_get_pointer_from_base_and_offset(&this->mp, offset);
dmt_node *CAST_FROM_VOIDP(node, ptr);
return *node;
......@@ -643,7 +584,7 @@ void dmt<dmtdata_t, dmtdataout_t>::node_set_value(dmt_node * n, const dmtdatain_
template<typename dmtdata_t, typename dmtdataout_t>
node_idx dmt<dmtdata_t, dmtdataout_t>::node_malloc_and_set_value(const dmtdatain_t &value) {
node_offset dmt<dmtdata_t, dmtdataout_t>::node_malloc_and_set_value(const dmtdatain_t &value) {
size_t val_size = value.get_dmtdatain_t_size();
size_t size_to_alloc = __builtin_offsetof(dmt_node, value) + val_size;
size_to_alloc = align(size_to_alloc);
......@@ -676,29 +617,31 @@ void dmt<dmtdata_t, dmtdataout_t>::maybe_resize_tree(const dmtdatain_t * value)
const ssize_t need_size = curr_used + add_size;
paranoid_invariant(need_size <= UINT32_MAX);
//TODO(leif) consider different growth rates
const ssize_t new_size = 2*need_size;
paranoid_invariant(new_size <= UINT32_MAX);
//const uint32_t num_nodes = this->nweight(this->d.t.root);
if ((curr_capacity / 2 >= new_size) || // Way too much allocated
(curr_free < add_size)) { // No room in mempool
// Copy all memory and reconstruct dmt in new mempool.
struct mempool new_kvspace;
struct mempool old_kvspace;
toku_mempool_construct(&new_kvspace, new_size);
if (curr_free < add_size && toku_mempool_get_frag_size(&this->mp) == 0) {
// TODO(yoni) or TODO(leif) consider doing this not just when frag size is zero, but also when it is a small percentage of the total mempool size
// Offsets remain the same in the new mempool so we can just realloc.
toku_mempool_realloc_larger(&this->mp, new_size);
} else if (!this->d.t.root.is_null()) {
struct mempool new_kvspace;
toku_mempool_construct(&new_kvspace, new_size);
if (!this->d.t.root.is_null()) {
const dmt_node &n = get_node(this->d.t.root);
node_idx *tmp_array;
node_offset *tmp_array;
bool malloced = false;
tmp_array = alloc_temp_node_idxs(n.weight);
tmp_array = alloc_temp_node_offsets(n.weight);
if (!tmp_array) {
malloced = true;
XMALLOC_N(n.weight, tmp_array);
this->fill_array_with_subtree_idxs(tmp_array, this->d.t.root);
for (node_idx i = 0; i < n.weight; i++) {
this->fill_array_with_subtree_offsets(tmp_array, this->d.t.root);
for (node_offset i = 0; i < n.weight; i++) {
dmt_node &node = get_node(tmp_array[i]);
const size_t bytes_to_copy = __builtin_offsetof(dmt_node, value) + node.value_length;
const size_t bytes_to_alloc = align(bytes_to_copy);
......@@ -707,15 +650,14 @@ void dmt<dmtdata_t, dmtdataout_t>::maybe_resize_tree(const dmtdatain_t * value)
tmp_array[i] = toku_mempool_get_offset_from_pointer_and_base(&new_kvspace, newdata);
old_kvspace = this->mp;
struct mempool old_kvspace = this->mp;
this->mp = new_kvspace;
this->rebuild_subtree_from_idxs(&this->d.t.root, tmp_array, n.weight);
this->rebuild_subtree_from_offsets(&this->d.t.root, tmp_array, n.weight);
if (malloced) toku_free(tmp_array);
else {
} else {
this->mp = new_kvspace;
toku_mempool_construct(&this->mp, new_size);
......@@ -737,12 +679,12 @@ template<typename dmtdata_t, typename dmtdataout_t>
void dmt<dmtdata_t, dmtdataout_t>::insert_internal(subtree *const subtreep, const dmtdatain_t &value, const uint32_t idx, subtree **const rebalance_subtree) {
if (subtreep->is_null()) {
const node_idx newidx = this->node_malloc_and_set_value(value);
dmt_node &newnode = get_node(newidx);
const node_offset newoffset = this->node_malloc_and_set_value(value);
dmt_node &newnode = get_node(newoffset);
newnode.weight = 1;
} else {
dmt_node &n = get_node(*subtreep);
......@@ -777,12 +719,13 @@ void dmt<dmtdata_t, dmtdataout_t>::delete_internal(subtree *const subtreep, cons
} else if (idx == leftweight) {
// Found the correct index.
if (n.left.is_null()) {
// Delete n and let parent point to n.right
subtree ptr_this = *subtreep;
*subtreep = n.right;
subtree to_free;
if (subtree_replace != nullptr) {
// Swap self with the other node.
// Swap self with the other node. Taking over all responsibility.
to_free = *subtree_replace;
dmt_node &ancestor = get_node(*subtree_replace);
if (*rebalance_subtree == &ancestor.right) {
......@@ -801,6 +744,7 @@ void dmt<dmtdata_t, dmtdataout_t>::delete_internal(subtree *const subtreep, cons
// Delete n and let parent point to n.left
subtree to_free = *subtreep;
*subtreep = n.left;
paranoid_invariant_null(subtree_replace); // To be recursive, we're looking for index 0. n is index > 0 here.
} else {
......@@ -913,36 +857,37 @@ void dmt<dmtdata_t, dmtdataout_t>::fetch_internal(const subtree &subtree, const
template<typename dmtdata_t, typename dmtdataout_t>
void dmt<dmtdata_t, dmtdataout_t>::fill_array_with_subtree_idxs(node_idx *const array, const subtree &subtree) const {
void dmt<dmtdata_t, dmtdataout_t>::fill_array_with_subtree_offsets(node_offset *const array, const subtree &subtree) const {
if (!subtree.is_null()) {
const dmt_node &tree = get_node(subtree);
this->fill_array_with_subtree_idxs(&array[0], tree.left);
array[this->nweight(tree.left)] = subtree.get_index();
this->fill_array_with_subtree_idxs(&array[this->nweight(tree.left) + 1], tree.right);
this->fill_array_with_subtree_offsets(&array[0], tree.left);
array[this->nweight(tree.left)] = subtree.get_offset();
this->fill_array_with_subtree_offsets(&array[this->nweight(tree.left) + 1], tree.right);
template<typename dmtdata_t, typename dmtdataout_t>
void dmt<dmtdata_t, dmtdataout_t>::rebuild_subtree_from_idxs(subtree *const subtree, const node_idx *const idxs, const uint32_t numvalues) {
void dmt<dmtdata_t, dmtdataout_t>::rebuild_subtree_from_offsets(subtree *const subtree, const node_offset *const offsets, const uint32_t numvalues) {
if (numvalues==0) {
} else {
uint32_t halfway = numvalues/2;
dmt_node &newnode = get_node(idxs[halfway]);
dmt_node &newnode = get_node(offsets[halfway]);
newnode.weight = numvalues;
// value is already in there.
this->rebuild_subtree_from_idxs(&newnode.left, &idxs[0], halfway);
this->rebuild_subtree_from_idxs(&newnode.right, &idxs[halfway+1], numvalues-(halfway+1));
this->rebuild_subtree_from_offsets(&newnode.left, &offsets[0], halfway);
this->rebuild_subtree_from_offsets(&newnode.right, &offsets[halfway+1], numvalues-(halfway+1));
//TODO(leif): Note that this can mess with our memory_footprint calculation (we may touch past what is marked as 'used' in the mempool)
template<typename dmtdata_t, typename dmtdataout_t>
node_idx* dmt<dmtdata_t, dmtdataout_t>::alloc_temp_node_idxs(uint32_t num_idxs) {
size_t mem_needed = num_idxs * sizeof(node_idx);
node_offset* dmt<dmtdata_t, dmtdataout_t>::alloc_temp_node_offsets(uint32_t num_offsets) {
size_t mem_needed = num_offsets * sizeof(node_offset);
size_t mem_free;
mem_free = toku_mempool_get_free_space(&this->mp);
node_idx* CAST_FROM_VOIDP(tmp, toku_mempool_get_next_free_ptr(&this->mp));
node_offset* CAST_FROM_VOIDP(tmp, toku_mempool_get_next_free_ptr(&this->mp));
if (mem_free >= mem_needed) {
return tmp;
......@@ -951,36 +896,28 @@ node_idx* dmt<dmtdata_t, dmtdataout_t>::alloc_temp_node_idxs(uint32_t num_idxs)
template<typename dmtdata_t, typename dmtdataout_t>
void dmt<dmtdata_t, dmtdataout_t>::rebalance(subtree *const subtree) {
(void) subtree;
node_idx idx = subtree->get_index();
//TODO: restore optimization, maybe only if values_same_size
#if 0
if (!dynamic && idx==this->d.t.root.get_index()) {
//Try to convert to an array.
//If this fails, (malloc) nothing will have changed.
//In the failure case we continue on to the standard rebalance
if (supports_marks) {
} else {
const dmt_node &n = get_node(idx);
node_idx *tmp_array;
bool malloced = false;
tmp_array = alloc_temp_node_idxs(n.weight);
if (!tmp_array) {
malloced = true;
XMALLOC_N(n.weight, tmp_array);
this->fill_array_with_subtree_idxs(tmp_array, *subtree);
this->rebuild_subtree_from_idxs(subtree, tmp_array, n.weight);
if (malloced) toku_free(tmp_array);
#if 0
// There is a possible "optimization" here:
// if (this->values_same_size && subtree == &this->d.t.root) {
// this->convert_from_tree_to_array();
// return;
// }
// but we don't want to do it because it involves actually copying values around
// as opposed to stopping in the middle of rebalancing (like in the OMT)
node_offset offset = subtree->get_offset();
const dmt_node &n = get_node(offset);
node_offset *tmp_array;
bool malloced = false;
tmp_array = alloc_temp_node_offsets(n.weight);
if (!tmp_array) {
malloced = true;
XMALLOC_N(n.weight, tmp_array);
this->fill_array_with_subtree_offsets(tmp_array, *subtree);
this->rebuild_subtree_from_offsets(subtree, tmp_array, n.weight);
if (malloced) toku_free(tmp_array);
template<typename dmtdata_t, typename dmtdataout_t>
......@@ -1207,22 +1144,22 @@ bool dmt<dmtdata_t, dmtdataout_t>::value_length_is_fixed(void) const {
template<typename dmtdata_t, typename dmtdataout_t>
void dmt<dmtdata_t, dmtdataout_t>::serialize_values(uint32_t expected_unpadded_memory, struct wbuf *wb) const {
const uint8_t pad_bytes = get_fixed_length_alignment_overhead();
const uint32_t fixed_len = this->value_length;
const uint32_t fixed_aligned_len = align(this->value_length);
paranoid_invariant(expected_unpadded_memory == this->d.a.num_values * this->value_length);
paranoid_invariant(toku_mempool_get_used_space(&this->mp) >=
expected_unpadded_memory + pad_bytes * this->d.a.num_values +
this->d.a.start_idx * fixed_aligned_len);
expected_unpadded_memory + pad_bytes * this->d.a.num_values);
if (this->d.a.num_values == 0) {
// Nothing to serialize
} else if (pad_bytes == 0) {
// Basically a memcpy
wbuf_nocrc_literal_bytes(wb, get_array_value(0), expected_unpadded_memory);
} else {
uint8_t* dest = wbuf_nocrc_reserve_literal_bytes(wb, expected_unpadded_memory);
uint8_t* src = reinterpret_cast<uint8_t*>(get_array_value(0));
paranoid_invariant(this->d.a.num_values*fixed_len == expected_unpadded_memory);
uint8_t* const dest = wbuf_nocrc_reserve_literal_bytes(wb, expected_unpadded_memory);
const uint8_t* const src = reinterpret_cast<uint8_t*>(get_array_value(0));
//TODO(leif) maybe look at vectorization here
for (uint32_t i = 0; i < this->d.a.num_values; i++) {
memcpy(&dest[i*fixed_len], &src[i*fixed_aligned_len], fixed_len);
......@@ -1233,9 +1170,10 @@ template<typename dmtdata_t, typename dmtdataout_t>
void dmt<dmtdata_t, dmtdataout_t>::builder::create(uint32_t _max_values, uint32_t _max_value_bytes) {
this->max_values = _max_values;
this->max_value_bytes = _max_value_bytes;
this->temp_valid = true;
this->sorted_nodes = nullptr;
this->sorted_node_offsets = nullptr;
// Include enough space for alignment padding
size_t initial_space = (ALIGNMENT - 1) * _max_values + _max_value_bytes;
......@@ -1251,8 +1189,8 @@ void dmt<dmtdata_t, dmtdataout_t>::builder::append(const dmtdatain_t &value) {
if (this->temp.is_array) {
// Convert to tree format
XMALLOC_N(this->max_values, this->sorted_nodes);
// Convert to tree format (without weights and linkage)
XMALLOC_N(this->max_values, this->sorted_node_offsets);
// Include enough space for alignment padding
size_t mem_needed = (ALIGNMENT - 1 + __builtin_offsetof(dmt_node, value)) * max_values + max_value_bytes;
......@@ -1261,19 +1199,18 @@ void dmt<dmtdata_t, dmtdataout_t>::builder::append(const dmtdatain_t &value) {
const uint32_t num_values = this->temp.d.a.num_values;
toku_mempool_construct(&this->, mem_needed);
// Copy over and get node_idxs
// Copy over and get node_offsets
for (uint32_t i = 0; i < num_values; i++) {
dmtdatain_t functor(this->temp.value_length, this->temp.get_array_value_internal(&old_mp, i));
this->sorted_nodes[i] = this->temp.node_malloc_and_set_value(functor);
this->sorted_node_offsets[i] = this->temp.node_malloc_and_set_value(functor);
this->temp.is_array = false;
this->temp.values_same_size = false;
this->temp.value_length = 0;
this->temp.values_same_size = false;
// Insert dynamic.
this->sorted_nodes[this->temp.d.a.num_values++] = this->temp.node_malloc_and_set_value(value);
this->sorted_node_offsets[this->temp.d.a.num_values++] = this->temp.node_malloc_and_set_value(value);
template<typename dmtdata_t, typename dmtdataout_t>
......@@ -1286,22 +1223,30 @@ template<typename dmtdata_t, typename dmtdataout_t>
void dmt<dmtdata_t, dmtdataout_t>::builder::build(dmt<dmtdata_t, dmtdataout_t> *dest) {
//NOTE: Always use d.a.num_values for size because we have not yet created root.
invariant(this->temp.d.a.num_values == this->max_values); // Optionally make it <=
// Memory invariant is taken care of incrementally
invariant(this->temp.d.a.num_values <= this->max_values);
// Memory invariant is taken care of incrementally (during append())
if (!this->temp.is_array) {
this->temp.rebuild_subtree_from_idxs(&this->temp.d.t.root, this->sorted_nodes, this->temp.d.a.num_values);
this->sorted_nodes = nullptr;
size_t used = toku_mempool_get_used_space(&this->;
size_t allocated = toku_mempool_get_size(&this->;
size_t max_allowed = used + used / 4;
size_t footprint = toku_mempool_footprint(&this->;
if (allocated > max_allowed && footprint > max_allowed) {
this->temp.rebuild_subtree_from_offsets(&this->temp.d.t.root, this->sorted_node_offsets, this->temp.d.a.num_values);
this->sorted_node_offsets = nullptr;
const size_t used = toku_mempool_get_used_space(&this->;
const size_t allocated = toku_mempool_get_size(&this->;
// We want to use no more than (about) the actual used space + 25% overhead for mempool growth.
// When we know the elements are fixed-length, we use the better dmt constructor.
// In practice, as of Jan 2014, we use the builder in two cases:
// - When we know the elements are not fixed-length.
// - During upgrade of a pre version 25 basement node.
// During upgrade, we will probably wildly overallocate because we don't account for the values that aren't stored in the dmt, so here we want to shrink the mempool.
// When we know the elements are not fixed-length, we still know how much memory they occupy in total, modulo alignment, so we want to allow for mempool overhead and worst-case alignment overhead, and not shrink the mempool.
const size_t max_allowed = used + (ALIGNMENT-1) * this->temp.size();
const size_t max_allowed_with_mempool_overhead = max_allowed + max_allowed / 4;
//TODO(leif): get footprint calculation correct (under jemalloc) and add some form of footprint constraint
if (allocated > max_allowed_with_mempool_overhead) {
// Reallocate smaller mempool to save memory
struct mempool new_mp;
......@@ -98,7 +98,7 @@ PATENT RIGHTS GRANT:
#include <vector>
namespace toku {
typedef uint32_t node_idx;
typedef uint32_t node_offset;
......@@ -170,14 +170,14 @@ class subtree {
inline bool is_null(void) const {
return NODE_NULL == this->get_index();
return NODE_NULL == this->get_offset();
inline node_idx get_index(void) const {
inline node_offset get_offset(void) const {
return m_index;
inline void set_index(node_idx index) {
inline void set_offset(node_offset index) {
paranoid_invariant(index != NODE_NULL);
m_index = index;
......@@ -231,7 +231,7 @@ class dmt {
uint32_t max_values;
uint32_t max_value_bytes;
node_idx *sorted_nodes;
node_offset *sorted_node_offsets;
bool temp_valid;
dmt<dmtdata_t, dmtdataout_t> temp;
......@@ -242,13 +242,6 @@ class dmt {
void create(void);
* Effect: Create an empty DMT with no internal allocated space.
* Performance: constant time.
* Rationale: In some cases we need a valid dmt but don't want to malloc.
void create_no_array(void);
* Effect: Create a DMT containing values. The number of values is in numvalues.
* Requires: this has not been created yet
......@@ -517,7 +510,6 @@ class dmt {
struct dmt_array {
uint32_t start_idx;
uint32_t num_values;
......@@ -525,6 +517,7 @@ class dmt {
subtree root;
// TODO(yoni): comment relationship between values_same_size, num values, value_length, note about tree still keeping track (until lost once)
bool values_same_size;
uint32_t value_length;
struct mempool mp;
......@@ -538,21 +531,19 @@ class dmt {
void verify_internal(const subtree &subtree, std::vector<bool> *touched) const;
void create_internal_no_alloc(bool as_tree);
dmt_node & get_node(const subtree &subtree) const;
dmt_node & get_node(const node_idx offset) const;
dmt_node & get_node(const node_offset offset) const;
uint32_t nweight(const subtree &subtree) const;
node_idx node_malloc_and_set_value(const dmtdatain_t &value);
node_offset node_malloc_and_set_value(const dmtdatain_t &value);
void node_set_value(dmt_node *n, const dmtdatain_t &value);
void node_free(const subtree &st);
void maybe_resize_array(const int change);
void maybe_resize_array_for_insert(void);
void convert_to_tree(void);
......@@ -566,15 +557,11 @@ class dmt {
template<bool with_resize>
int insert_at_array_end(const dmtdatain_t& value_in);
int insert_at_array_beginning(const dmtdatain_t& value_in);
dmtdata_t * alloc_array_value_end(void);
dmtdata_t * alloc_array_value_beginning(void);
dmtdata_t * get_array_value(const uint32_t idx) const;
dmtdata_t * get_array_value_internal(const struct mempool *mempool, const uint32_t real_idx) const;
dmtdata_t * get_array_value_internal(const struct mempool *mempool, const uint32_t idx) const;
void convert_from_array_to_tree(void);
......@@ -610,10 +597,10 @@ class dmt {
void fetch_internal(const subtree &subtree, const uint32_t i, uint32_t *const value_len, dmtdataout_t *const value) const;
void fill_array_with_subtree_idxs(node_idx *const array, const subtree &subtree) const;
void fill_array_with_subtree_offsets(node_offset *const array, const subtree &subtree) const;
void rebuild_subtree_from_idxs(subtree *const subtree, const node_idx *const idxs, const uint32_t numvalues);
void rebuild_subtree_from_offsets(subtree *const subtree, const node_offset *const offsets, const uint32_t numvalues);
void rebalance(subtree *const subtree);
......@@ -654,7 +641,7 @@ class dmt {
int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
int find_internal_minus(const subtree &subtree, const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const;
node_idx* alloc_temp_node_idxs(uint32_t num_idxs);
node_offset* alloc_temp_node_offsets(uint32_t num_idxs);
uint32_t align(const uint32_t x) const;
......@@ -142,6 +142,11 @@ void toku_mempool_construct(struct mempool *mp, size_t data_size) {
void toku_mempool_reset(struct mempool *mp) {
mp->free_offset = 0;
mp->frag_size = 0;
void toku_mempool_realloc_larger(struct mempool *mp, size_t data_size) {
invariant(data_size > mp->free_offset);
size_t mpsize = data_size + (data_size/4); // allow 1/4 room for expansion (would be wasted if read-only)
......@@ -123,6 +123,10 @@ void toku_mempool_init(struct mempool *mp, void *base, size_t free_offset, size_
void toku_mempool_construct(struct mempool *mp, size_t data_size);
/* treat mempool as if it has just been created; ignore any frag and start allocating from beginning again.
void toku_mempool_reset(struct mempool *mp);
/* reallocate memory for construct mempool
void toku_mempool_realloc_larger(struct mempool *mp, size_t data_size);
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment