Commit 4233ec1e authored by John Esmet, committed by Yoni Fogel

refs #5559 merge 5559 to main


git-svn-id: file:///svn/toku/tokudb@50812 c7de825b-a66e-492c-adef-691d508d4ae1
parent b1d3e831
......@@ -33,7 +33,7 @@ typedef struct flusher_advice FLUSHER_ADVICE;
typedef int (*FA_PICK_CHILD)(FT h, FTNODE parent, void* extra);
/**
* Decide whether to call `flush_some_child` on the child if it is
* Decide whether to call `toku_ft_flush_some_child` on the child if it is
* stable and a nonleaf node.
*
* Flusher threads: yes if child is gorged
......@@ -76,9 +76,9 @@ typedef bool (*FA_SHOULD_DESTROY_BN)(void* extra);
/**
* Update `ft_flusher_status` in whatever way necessary. Called once
* by `flush_some_child` right before choosing what to do next (split,
* by `toku_ft_flush_some_child` right before choosing what to do next (split,
* merge, recurse), with the number of nodes that were dirtied by this
* execution of `flush_some_child`.
* execution of `toku_ft_flush_some_child`.
*/
typedef void (*FA_UPDATE_STATUS)(FTNODE child, int dirtied, void* extra);
......@@ -109,17 +109,6 @@ struct flusher_advice {
void* extra; // parameter passed into callbacks
};
// FIXME all of these need the toku prefix
//
// how about:
//
// toku_ftnode_flush_some_child()
// toku_fa_flusher_advice_init()
// toku_fa_always_recursively_flush()
// toku_fa_dont_destroy_basement_nodes()
// toku_fa_default_merge_child()
// toku_fa_default_pick_child_after_split()
void
flusher_advice_init(
struct flusher_advice *fa,
......@@ -132,11 +121,11 @@ flusher_advice_init(
void* extra
);
void
flush_some_child(
FT h,
void toku_ft_flush_some_child(
FT ft,
FTNODE parent,
struct flusher_advice *fa);
struct flusher_advice *fa
);
bool
always_recursively_flush(FTNODE child, void* extra);
......
......@@ -288,7 +288,7 @@ flt_update_status(FTNODE child,
{
struct flush_status_update_extra *fste = (struct flush_status_update_extra *) extra;
update_flush_status(child, fste->cascades);
// If `flush_some_child` decides to recurse after this, we'll need
// If `toku_ft_flush_some_child` decides to recurse after this, we'll need
// cascades to increase. If not it doesn't matter.
fste->cascades++;
}
......@@ -420,7 +420,7 @@ ct_maybe_merge_child(struct flusher_advice *fa,
(void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_STARTED), 1);
(void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);
flush_some_child(h, root_node, &new_fa);
toku_ft_flush_some_child(h, root_node, &new_fa);
(void) toku_sync_fetch_and_sub(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);
......@@ -436,7 +436,7 @@ ct_update_status(FTNODE child,
struct flush_status_update_extra* fste = (struct flush_status_update_extra *) extra;
update_flush_status(child, fste->cascades);
STATUS_VALUE(FT_FLUSHER_CLEANER_NODES_DIRTIED) += dirtied;
// Incrementing this in case `flush_some_child` decides to recurse.
// Incrementing this in case `toku_ft_flush_some_child` decides to recurse.
fste->cascades++;
}
......@@ -719,6 +719,23 @@ move_leafentries(
}
}
static void ftnode_finalize_split(FTNODE node, FTNODE B, MSN max_msn_applied_to_node) {
// Effect: Finalizes a split by updating MSN and xid bookkeeping and dirtying both nodes
toku_assert_entire_node_in_memory(node);
toku_assert_entire_node_in_memory(B);
verify_all_in_mempool(node);
verify_all_in_mempool(B);
node->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
B->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
// The new node in the split inherits the oldest known reference xid
B->oldest_known_referenced_xid = node->oldest_known_referenced_xid;
node->dirty = 1;
B->dirty = 1;
}
void
ftleaf_split(
FT h,
......@@ -914,18 +931,9 @@ ftleaf_split(
REALLOC_N(num_children_in_node-1, node->childkeys);
}
verify_all_in_mempool(node);
verify_all_in_mempool(B);
node->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
B->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
node->dirty = 1;
B->dirty = 1;
ftnode_finalize_split(node, B, max_msn_applied_to_node);
*nodea = node;
*nodeb = B;
} // end of ftleaf_split()
void
......@@ -989,15 +997,7 @@ ft_nonleaf_split(
REALLOC_N(n_children_in_a-1, node->childkeys);
}
node->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
B->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
node->dirty = 1;
B->dirty = 1;
toku_assert_entire_node_in_memory(node);
toku_assert_entire_node_in_memory(B);
//VERIFY_NODE(t,node);
//VERIFY_NODE(t,B);
ftnode_finalize_split(node, B, max_msn_applied_to_node);
*nodea = node;
*nodeb = B;
}
......@@ -1050,12 +1050,12 @@ ft_split_child(
if (picked_child == childnum ||
(picked_child < 0 && nodea->height > 0 && fa->should_recursively_flush(nodea, fa->extra))) {
toku_unpin_ftnode_off_client_thread(h, nodeb);
flush_some_child(h, nodea, fa);
toku_ft_flush_some_child(h, nodea, fa);
}
else if (picked_child == childnum + 1 ||
(picked_child < 0 && nodeb->height > 0 && fa->should_recursively_flush(nodeb, fa->extra))) {
toku_unpin_ftnode_off_client_thread(h, nodea);
flush_some_child(h, nodeb, fa);
toku_ft_flush_some_child(h, nodeb, fa);
}
else {
toku_unpin_ftnode_off_client_thread(h, nodea);
......@@ -1063,6 +1063,21 @@ ft_split_child(
}
}
static void bring_node_fully_into_memory(FTNODE node, FT ft) {
if (!is_entire_node_in_memory(node)) {
struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, ft);
toku_cachetable_pf_pinned_pair(
node,
toku_ftnode_pf_callback,
&bfe,
ft->cf,
node->thisnodename,
toku_cachetable_hash(ft->cf, node->thisnodename)
);
}
}
static void
flush_this_child(
FT h,
......@@ -1090,8 +1105,9 @@ flush_this_child(
NONLEAF_CHILDINFO bnc = BNC(node, childnum);
set_BNC(node, childnum, toku_create_empty_nl());
// now we have a bnc to flush to the child
toku_bnc_flush_to_child(h, bnc, child);
// now we have a bnc to flush to the child. pass down the parent's
// oldest known referenced xid as we flush down to the child.
toku_bnc_flush_to_child(h, bnc, child, node->oldest_known_referenced_xid);
destroy_nonleaf_childinfo(bnc);
}
......@@ -1487,18 +1503,18 @@ ft_merge_child(
toku_unpin_ftnode_off_client_thread(h, childb);
}
if (childa->height > 0 && fa->should_recursively_flush(childa, fa->extra)) {
flush_some_child(h, childa, fa);
toku_ft_flush_some_child(h, childa, fa);
}
else {
toku_unpin_ftnode_off_client_thread(h, childa);
}
}
void
flush_some_child(
FT h,
static void ft_flush_some_child_with_xid(
FT ft,
FTNODE parent,
struct flusher_advice *fa)
struct flusher_advice *fa,
TXNID oldest_referenced_xid)
// Effect: This function does the following:
// - Pick a child of parent (the heaviest child),
// - flush from parent to child,
......@@ -1514,27 +1530,27 @@ flush_some_child(
toku_assert_entire_node_in_memory(parent);
// pick the child we want to flush to
int childnum = fa->pick_child(h, parent, fa->extra);
int childnum = fa->pick_child(ft, parent, fa->extra);
// for test
call_flusher_thread_callback(flt_flush_before_child_pin);
// get the child into memory
BLOCKNUM targetchild = BP_BLOCKNUM(parent, childnum);
toku_verify_blocknum_allocated(h->blocktable, targetchild);
uint32_t childfullhash = compute_child_fullhash(h->cf, parent, childnum);
toku_verify_blocknum_allocated(ft->blocktable, targetchild);
uint32_t childfullhash = compute_child_fullhash(ft->cf, parent, childnum);
FTNODE child;
struct ftnode_fetch_extra bfe;
// Note that we don't read the entire node into memory yet.
// The idea is to do the minimum amount of work before releasing the parent lock
fill_bfe_for_min_read(&bfe, h);
toku_pin_ftnode_off_client_thread(h, targetchild, childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &parent, &child);
fill_bfe_for_min_read(&bfe, ft);
toku_pin_ftnode_off_client_thread(ft, targetchild, childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &parent, &child);
// for test
call_flusher_thread_callback(ft_flush_aflter_child_pin);
if (fa->should_destroy_basement_nodes(fa)) {
maybe_destroy_child_blbs(parent, child, h);
maybe_destroy_child_blbs(parent, child, ft);
}
//Note that at this point, we don't have the entire child in.
......@@ -1567,7 +1583,7 @@ flush_some_child(
// reactive, we can unpin the parent
//
if (!may_child_be_reactive) {
toku_unpin_ftnode_off_client_thread(h, parent);
toku_unpin_ftnode_off_client_thread(ft, parent);
parent = NULL;
}
......@@ -1575,7 +1591,7 @@ flush_some_child(
// now, if necessary, read/decompress the rest of child into memory,
// so that we can proceed and apply the flush
//
bring_node_fully_into_memory(child, h);
bring_node_fully_into_memory(child, ft);
// It is possible after reading in the entire child,
// that we now know that the child is not reactive
......@@ -1583,9 +1599,9 @@ flush_some_child(
// we won't be splitting/merging child
// and we have already replaced the bnc
// for the root with a fresh one
enum reactivity child_re = get_node_reactivity(child, h->h->nodesize);
enum reactivity child_re = get_node_reactivity(child, ft->h->nodesize);
if (parent && child_re == RE_STABLE) {
toku_unpin_ftnode_off_client_thread(h, parent);
toku_unpin_ftnode_off_client_thread(ft, parent);
parent = NULL;
}
......@@ -1601,9 +1617,10 @@ flush_some_child(
}
// do the actual flush
toku_bnc_flush_to_child(
h,
ft,
bnc,
child
child,
oldest_referenced_xid
);
destroy_nonleaf_childinfo(bnc);
}
......@@ -1612,7 +1629,7 @@ flush_some_child(
// let's get the reactivity of the child again,
// it is possible that the flush got rid of some values
// and now the parent is no longer reactive
child_re = get_node_reactivity(child, h->h->nodesize);
child_re = get_node_reactivity(child, ft->h->nodesize);
// if the parent has been unpinned above, then
// this is our only option, even if the child is not stable
// if the child is not stable, we'll handle it the next
......@@ -1623,17 +1640,17 @@ flush_some_child(
)
{
if (parent) {
toku_unpin_ftnode_off_client_thread(h, parent);
toku_unpin_ftnode_off_client_thread(ft, parent);
parent = NULL;
}
//
// it is the responsibility of flush_some_child to unpin child
// it is the responsibility of ft_flush_some_child_with_xid to unpin child
//
if (child->height > 0 && fa->should_recursively_flush(child, fa->extra)) {
flush_some_child(h, child, fa);
ft_flush_some_child_with_xid(ft, child, fa, oldest_referenced_xid);
}
else {
toku_unpin_ftnode_off_client_thread(h, child);
toku_unpin_ftnode_off_client_thread(ft, child);
}
}
else if (child_re == RE_FISSIBLE) {
......@@ -1642,7 +1659,7 @@ flush_some_child(
// parent and child as it sees fit
//
paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent
ft_split_child(h, parent, childnum, child, SPLIT_EVENLY, fa);
ft_split_child(ft, parent, childnum, child, SPLIT_EVENLY, fa);
}
else if (child_re == RE_FUSIBLE) {
//
......@@ -1650,13 +1667,20 @@ flush_some_child(
// parent and child as it sees fit
//
paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent
fa->maybe_merge_child(fa, h, parent, childnum, child, fa->extra);
fa->maybe_merge_child(fa, ft, parent, childnum, child, fa->extra);
}
else {
abort();
}
}
void toku_ft_flush_some_child(FT ft, FTNODE parent, struct flusher_advice *fa) {
// Vanilla flush_some_child flushes from parent to child without
// providing a meaningful oldest_referenced_xid. No simple garbage
// collection is performed.
return ft_flush_some_child_with_xid(ft, parent, fa, TXNID_NONE);
}
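// Note: background flusher threads reach ft_flush_some_child_with_xid() via
// flush_node_fun() with the oldest referenced xid captured when the flush was
// scheduled, so leaf flushes can run simple garbage collection; the cleaner
// thread and HOT optimize go through toku_ft_flush_some_child(), which passes
// TXNID_NONE and therefore skips it.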
static void
update_cleaner_status(
FTNODE node,
......@@ -1782,12 +1806,12 @@ toku_ftnode_cleaner_callback(
int childnum = find_heaviest_child(node);
update_cleaner_status(node, childnum);
// Either flush_some_child will unlock the node, or we do it here.
// Either toku_ft_flush_some_child will unlock the node, or we do it here.
if (toku_bnc_nbytesinbuf(BNC(node, childnum)) > 0) {
struct flusher_advice fa;
struct flush_status_update_extra fste;
ct_flusher_advice_init(&fa, &fste, h->h->nodesize);
flush_some_child(h, node, &fa);
toku_ft_flush_some_child(h, node, &fa);
} else {
toku_unpin_ftnode_off_client_thread(h, node);
}
......@@ -1798,6 +1822,7 @@ struct flusher_extra {
FT h;
FTNODE node;
NONLEAF_CHILDINFO bnc;
TXNID oldest_referenced_xid;
};
//
......@@ -1834,16 +1859,17 @@ static void flush_node_fun(void *fe_v)
toku_bnc_flush_to_child(
fe->h,
fe->bnc,
fe->node
fe->node,
fe->oldest_referenced_xid
);
destroy_nonleaf_childinfo(fe->bnc);
// after the flush has completed, now check to see if the node needs flushing
// If so, call flush_some_child on the node, and it is the responsibility
// of flush_some_child to unlock the node
// otherwise, we unlock the node here.
// If so, call ft_flush_some_child_with_xid on the node (because this flush intends to
// pass a meaningful oldest referenced xid for simple garbage collection), and it is the
// responsibility of the flush to unlock the node. Otherwise, we unlock it here.
if (fe->node->height > 0 && toku_ft_nonleaf_is_gorged(fe->node, fe->h->h->nodesize)) {
flush_some_child(fe->h, fe->node, &fa);
ft_flush_some_child_with_xid(fe->h, fe->node, &fa, fe->oldest_referenced_xid);
}
else {
toku_unpin_ftnode_off_client_thread(fe->h,fe->node);
......@@ -1853,8 +1879,8 @@ static void flush_node_fun(void *fe_v)
// In this case, we were just passed a node with no
// bnc, which means we are tasked with flushing some
// buffer in the node.
// It is the responsibility of flush_some_child to unlock the node
flush_some_child(fe->h, fe->node, &fa);
// It is the responsibility of ft_flush_some_child_with_xid to unlock the node
ft_flush_some_child_with_xid(fe->h, fe->node, &fa, fe->oldest_referenced_xid);
}
remove_background_job_from_cf(fe->h->cf);
toku_free(fe);
......@@ -1864,12 +1890,14 @@ static void
place_node_and_bnc_on_background_thread(
FT h,
FTNODE node,
NONLEAF_CHILDINFO bnc)
NONLEAF_CHILDINFO bnc,
TXNID oldest_referenced_xid)
{
struct flusher_extra *XMALLOC(fe);
fe->h = h;
fe->node = node;
fe->bnc = bnc;
fe->oldest_referenced_xid = oldest_referenced_xid;
cachefile_kibbutz_enq(h->cf, flush_node_fun, fe);
}
......@@ -1886,9 +1914,9 @@ place_node_and_bnc_on_background_thread(
// child needs to be split/merged), then we place the parent on the background thread.
// The parent will be unlocked on the background thread
//
void
flush_node_on_background_thread(FT h, FTNODE parent)
void toku_ft_flush_node_on_background_thread(FT h, FTNODE parent)
{
TXNID oldest_known_referenced_xid = parent->oldest_known_referenced_xid;
//
// first let's see if we can detach buffer on client thread
// and pick the child we want to flush to
......@@ -1903,9 +1931,9 @@ flush_node_on_background_thread(FT h, FTNODE parent)
int r = toku_maybe_pin_ftnode_clean(h, BP_BLOCKNUM(parent, childnum), childfullhash, PL_WRITE_EXPENSIVE, &child);
if (r != 0) {
// In this case, we could not lock the child, so just place the parent on the background thread
// In the callback, we will use flush_some_child, which checks to
// In the callback, we will use toku_ft_flush_some_child, which checks to
// see if we should blow away the old basement nodes.
place_node_and_bnc_on_background_thread(h, parent, NULL);
place_node_and_bnc_on_background_thread(h, parent, NULL, oldest_known_referenced_xid);
}
else {
//
......@@ -1934,7 +1962,7 @@ flush_node_on_background_thread(FT h, FTNODE parent)
// so, because we know for sure the child is not
// reactive, we can unpin the parent
//
place_node_and_bnc_on_background_thread(h, child, bnc);
place_node_and_bnc_on_background_thread(h, child, bnc, oldest_known_referenced_xid);
toku_unpin_ftnode(h, parent);
}
else {
......@@ -1944,7 +1972,7 @@ flush_node_on_background_thread(FT h, FTNODE parent)
toku_unpin_ftnode(h, child);
// Again, we'll have the parent on the background thread, so
// we don't need to destroy the basement nodes yet.
place_node_and_bnc_on_background_thread(h, parent, NULL);
place_node_and_bnc_on_background_thread(h, parent, NULL, oldest_known_referenced_xid);
}
}
}
......
......@@ -65,11 +65,11 @@ toku_flusher_thread_set_callback(
/**
* Puts a workitem on the flusher thread queue, scheduling the node to be
* flushed by flush_some_child.
* flushed by toku_ft_flush_some_child.
*/
void
flush_node_on_background_thread(
FT h,
toku_ft_flush_node_on_background_thread(
FT ft,
FTNODE parent
);
......
......@@ -141,7 +141,7 @@ hot_update_flusher_keys(FTNODE parent,
}
}
// Picks which child flush_some_child will use for flushing and
// Picks which child toku_ft_flush_some_child will use for flushing and
// recursion.
static int
hot_pick_child(FT h,
......@@ -308,7 +308,7 @@ toku_ft_hot_optimize(FT_HANDLE brt,
// This should recurse to the bottom of the tree and then
// return.
if (root->height > 0) {
flush_some_child(brt->ft, root, &advice);
toku_ft_flush_some_child(brt->ft, root, &advice);
} else {
// Since there are no children to flush, we should abort
// the HOT call.
......@@ -318,7 +318,7 @@ toku_ft_hot_optimize(FT_HANDLE brt,
// Set the highest pivot key seen here, since the parent may
// be unlocked and NULL'd later in our caller:
// flush_some_child().
// toku_ft_flush_some_child().
hot_set_highest_key(&flusher);
// This is where we determine if the traversal is finished or
......
......@@ -137,7 +137,7 @@ long toku_bnc_memory_size(NONLEAF_CHILDINFO bnc);
long toku_bnc_memory_used(NONLEAF_CHILDINFO bnc);
void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, DESCRIPTOR desc, ft_compare_func cmp);
void toku_bnc_empty(NONLEAF_CHILDINFO bnc);
void toku_bnc_flush_to_child(FT h, NONLEAF_CHILDINFO bnc, FTNODE child);
void toku_bnc_flush_to_child(FT h, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID oldest_referenced_xid);
bool toku_bnc_should_promote(FT ft, NONLEAF_CHILDINFO bnc) __attribute__((const, nonnull));
bool toku_ft_nonleaf_is_gorged(FTNODE node, uint32_t nodesize);
......@@ -246,6 +246,17 @@ struct ftnode {
unsigned int totalchildkeylens;
DBT *childkeys; /* Pivot keys. Child 0's keys are <= childkeys[0]. Child 1's keys are <= childkeys[1].
Child 1's keys are > childkeys[0]. */
// What's the oldest referenced xid that this node knows about? The real oldest
// referenced xid might be younger, but this is our best estimate. We use it
// as a heuristic to transition provisional mvcc entries from provisional to
// committed (from implicitly committed to really committed).
//
// A better heuristic would be the oldest live txnid, but we use this since it
// still works well most of the time, and it's readily available on the inject
// code path.
TXNID oldest_known_referenced_xid;
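// How this field moves around (per the rest of this change): it starts at
// TXNID_NONE in toku_initialize_empty_ftnode(), is only ever raised on
// message inject (see inject_message_in_locked_node()), is inherited by the
// new sibling in ftnode_finalize_split() and by clones in
// toku_ftnode_clone_callback(), and is serialized to disk starting with
// FT_LAYOUT_VERSION_22.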
// array of size n_children, consisting of ftnode partitions
// each one is associated with a child
// for internal nodes, the ith partition corresponds to the ith message buffer
......@@ -606,8 +617,6 @@ void toku_destroy_ftnode_internals(FTNODE node);
void toku_ftnode_free (FTNODE *node);
bool is_entire_node_in_memory(FTNODE node);
void toku_assert_entire_node_in_memory(FTNODE node);
// FIXME needs toku prefix
void bring_node_fully_into_memory(FTNODE node, FT h);
// append a child node to a parent node
void toku_ft_nonleaf_append_child(FTNODE node, FTNODE child, const DBT *pivotkey);
......@@ -1092,7 +1101,6 @@ toku_ft_leaf_apply_cmd (
FTNODE node,
int target_childnum,
FT_MSG cmd,
TXNID oldest_referenced_xid,
uint64_t *workdone,
STAT64INFO stats_to_update
);
......@@ -1107,7 +1115,6 @@ toku_ft_node_put_cmd (
FT_MSG cmd,
bool is_fresh,
size_t flow_deltas[],
TXNID oldest_referenced_xid,
STAT64INFO stats_to_update
);
......
......@@ -16,7 +16,7 @@ to insert a message at the root
- capture the next msn of the root node and assign it to the message
- split the root if it needs to be split
- insert the message into the root buffer
- if the root is too full, then flush_some_child() of the root on a flusher thread
- if the root is too full, then toku_ft_flush_some_child() of the root on a flusher thread
flusher functions use an advice struct which provides some functions to
call that tell it what to do based on the context of the flush. see ft-flusher.h
......@@ -27,7 +27,7 @@ to flush some child, given a parent and some advice
- flush the buffer to the child
- if the child has stable reactivity and
advice->should_recursively_flush() is true, then
flush_some_child() of the child
toku_ft_flush_some_child() of the child
- otherwise split the child if it needs to be split
- otherwise maybe merge the child if it needs to be merged
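A rough sketch of that loop, using the internal names from this change
(pinning, status updates, and the re-check of reactivity after the flush are
omitted; this is a simplification, not the real control flow):

    static void flush_some_child_sketch(FT ft, FTNODE parent,
                                        struct flusher_advice *fa,
                                        TXNID oldest_referenced_xid) {
        int childnum = fa->pick_child(ft, parent, fa->extra);
        FTNODE child = ...;  // pin the child with a minimal read
        if (toku_bnc_nbytesinbuf(BNC(parent, childnum)) > 0) {
            // detach the buffer, replace it with an empty one, flush it down
            NONLEAF_CHILDINFO bnc = BNC(parent, childnum);
            set_BNC(parent, childnum, toku_create_empty_nl());
            toku_bnc_flush_to_child(ft, bnc, child, oldest_referenced_xid);
            destroy_nonleaf_childinfo(bnc);
        }
        enum reactivity re = get_node_reactivity(child, ft->h->nodesize);
        if (re == RE_STABLE) {
            if (child->height > 0 && fa->should_recursively_flush(child, fa->extra)) {
                flush_some_child_sketch(ft, child, fa, oldest_referenced_xid);
            }
        } else if (re == RE_FISSIBLE) {
            ft_split_child(ft, parent, childnum, child, SPLIT_EVENLY, fa);
        } else {  // RE_FUSIBLE
            fa->maybe_merge_child(fa, ft, parent, childnum, child, fa->extra);
        }
    }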
......@@ -125,11 +125,13 @@ basement nodes, bulk fetch, and partial fetch:
#include "log-internal.h"
#include "sub_block.h"
#include "txn_manager.h"
#include "ule.h"
#include "leafentry.h"
#include "xids.h"
#include <toku_race_tools.h>
#include <portability/toku_atomic.h>
#include <util/mempool.h>
#include <util/partitioned_counter.h>
#include <util/rwlock.h>
......@@ -752,6 +754,7 @@ void toku_ftnode_clone_callback(
rebalance_ftnode_leaf(node, ft->h->basementnodesize);
}
cloned_node->oldest_known_referenced_xid = node->oldest_known_referenced_xid;
cloned_node->max_msn_applied_to_node_on_disk = node->max_msn_applied_to_node_on_disk;
cloned_node->flags = node->flags;
cloned_node->thisnodename = node->thisnodename;
......@@ -1374,6 +1377,7 @@ toku_initialize_empty_ftnode (FTNODE n, BLOCKNUM nodename, int height, int num_c
n->childkeys = 0;
n->bp = 0;
n->n_children = num_children;
n->oldest_known_referenced_xid = TXNID_NONE;
if (num_children > 0) {
XMALLOC_N(num_children-1, n->childkeys);
......@@ -1548,13 +1552,10 @@ toku_ft_bn_apply_cmd_once (
if (le)
oldsize = leafentry_memsize(le);
// apply_msg_to_leafentry() may call mempool_malloc_from_omt() to allocate more space.
// toku_le_apply_msg() may call mempool_malloc_from_omt() to allocate more space.
// That means le is guaranteed to not cause a sigsegv but it may point to a mempool that is
// no longer in use. We'll have to release the old mempool later.
{
int r = apply_msg_to_leafentry(cmd, le, oldest_referenced_xid, &newsize, &new_le, &bn->buffer, &bn->buffer_mempool, &maybe_free, &numbytes_delta);
invariant(r==0);
}
toku_le_apply_msg(cmd, le, oldest_referenced_xid, &newsize, &new_le, &bn->buffer, &bn->buffer_mempool, &maybe_free, &numbytes_delta);
if (new_le) {
paranoid_invariant(newsize == leafentry_disksize(new_le));
......@@ -1734,7 +1735,7 @@ toku_ft_bn_apply_cmd (
DESCRIPTOR desc,
BASEMENTNODE bn,
FT_MSG cmd,
TXNID oldest_referenced_xid,
TXNID oldest_known_referenced_xid,
uint64_t *workdone,
STAT64INFO stats_to_update
)
......@@ -1776,7 +1777,7 @@ toku_ft_bn_apply_cmd (
assert_zero(r);
CAST_FROM_VOIDP(storeddata, storeddatav);
}
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update);
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_known_referenced_xid, workdone, stats_to_update);
// if the insertion point is within a window of the right edge of
// the leaf then it is sequential
......@@ -1804,7 +1805,7 @@ toku_ft_bn_apply_cmd (
if (r == DB_NOTFOUND) break;
assert_zero(r);
CAST_FROM_VOIDP(storeddata, storeddatav);
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update);
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_known_referenced_xid, workdone, stats_to_update);
break;
}
......@@ -1820,7 +1821,7 @@ toku_ft_bn_apply_cmd (
CAST_FROM_VOIDP(storeddata, storeddatav);
int deleted = 0;
if (!le_is_clean(storeddata)) { //If already clean, nothing to do.
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update);
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_known_referenced_xid, workdone, stats_to_update);
uint32_t new_omt_size = toku_omt_size(bn->buffer);
if (new_omt_size != omt_size) {
paranoid_invariant(new_omt_size+1 == omt_size);
......@@ -1846,7 +1847,7 @@ toku_ft_bn_apply_cmd (
CAST_FROM_VOIDP(storeddata, storeddatav);
int deleted = 0;
if (le_has_xids(storeddata, cmd->xids)) {
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update);
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_known_referenced_xid, workdone, stats_to_update);
uint32_t new_omt_size = toku_omt_size(bn->buffer);
if (new_omt_size != omt_size) {
paranoid_invariant(new_omt_size+1 == omt_size);
......@@ -1867,10 +1868,10 @@ toku_ft_bn_apply_cmd (
r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be,
&storeddatav, &idx);
if (r==DB_NOTFOUND) {
r = do_update(update_fun, desc, bn, cmd, idx, NULL, oldest_referenced_xid, workdone, stats_to_update);
r = do_update(update_fun, desc, bn, cmd, idx, NULL, oldest_known_referenced_xid, workdone, stats_to_update);
} else if (r==0) {
CAST_FROM_VOIDP(storeddata, storeddatav);
r = do_update(update_fun, desc, bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update);
r = do_update(update_fun, desc, bn, cmd, idx, storeddata, oldest_known_referenced_xid, workdone, stats_to_update);
} // otherwise, a worse error, just return it
break;
}
......@@ -1882,7 +1883,7 @@ toku_ft_bn_apply_cmd (
r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
assert_zero(r);
CAST_FROM_VOIDP(storeddata, storeddatav);
r = do_update(update_fun, desc, bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update);
r = do_update(update_fun, desc, bn, cmd, idx, storeddata, oldest_known_referenced_xid, workdone, stats_to_update);
assert_zero(r);
if (num_leafentries_before == toku_omt_size(bn->buffer)) {
......@@ -2147,13 +2148,18 @@ ft_basement_node_gc_once(BASEMENTNODE bn,
const xid_omt_t &snapshot_xids,
const rx_omt_t &referenced_xids,
const xid_omt_t &live_root_txns,
TXNID oldest_known_referenced_xid,
STAT64INFO_S * delta)
{
paranoid_invariant(leaf_entry);
// There is no point in running GC if there is only one committed
// leaf entry.
if (leaf_entry->type != LE_MVCC || leaf_entry->u.mvcc.num_cxrs <= 1) { // MAKE ACCESSOR
// Don't run garbage collection on non-mvcc leaf entries.
if (leaf_entry->type != LE_MVCC) {
goto exit;
}
// Don't run garbage collection if this leafentry decides it's not worth it.
if (!toku_le_worth_running_garbage_collection(leaf_entry, oldest_known_referenced_xid)) {
goto exit;
}
......@@ -2172,7 +2178,7 @@ ft_basement_node_gc_once(BASEMENTNODE bn,
// Cache the size of the leaf entry.
oldsize = leafentry_memsize(leaf_entry);
garbage_collect_leafentry(leaf_entry,
toku_le_garbage_collect(leaf_entry,
&new_leaf_entry,
&newsize,
&bn->buffer,
......@@ -2180,7 +2186,8 @@ ft_basement_node_gc_once(BASEMENTNODE bn,
&maybe_free,
snapshot_xids,
referenced_xids,
live_root_txns);
live_root_txns,
oldest_known_referenced_xid);
// These will represent the number of bytes and rows changed as
// part of the garbage collection.
......@@ -2225,6 +2232,7 @@ basement_node_gc_all_les(BASEMENTNODE bn,
const xid_omt_t &snapshot_xids,
const rx_omt_t &referenced_xids,
const xid_omt_t &live_root_txns,
TXNID oldest_known_referenced_xid,
STAT64INFO_S * delta)
{
int r = 0;
......@@ -2236,7 +2244,7 @@ basement_node_gc_all_les(BASEMENTNODE bn,
r = toku_omt_fetch(bn->buffer, index, &storedatav);
assert_zero(r);
CAST_FROM_VOIDP(leaf_entry, storedatav);
ft_basement_node_gc_once(bn, index, leaf_entry, snapshot_xids, referenced_xids, live_root_txns, delta);
ft_basement_node_gc_once(bn, index, leaf_entry, snapshot_xids, referenced_xids, live_root_txns, oldest_known_referenced_xid, delta);
// Check if the leaf entry was deleted or not.
if (num_leafentries_before == toku_omt_size(bn->buffer)) {
++index;
......@@ -2244,13 +2252,14 @@ basement_node_gc_all_les(BASEMENTNODE bn,
}
}
// Garbage collect all leaf entries.
// Garbage collect all leaf entries in all basement nodes.
static void
ft_leaf_gc_all_les(FTNODE node,
FT h,
FT ft,
const xid_omt_t &snapshot_xids,
const rx_omt_t &referenced_xids,
const xid_omt_t &live_root_txns)
const xid_omt_t &live_root_txns,
TXNID oldest_known_referenced_xid)
{
toku_assert_entire_node_in_memory(node);
paranoid_invariant_zero(node->height);
......@@ -2261,15 +2270,16 @@ ft_leaf_gc_all_les(FTNODE node,
STAT64INFO_S delta;
delta.numrows = 0;
delta.numbytes = 0;
basement_node_gc_all_les(bn, snapshot_xids, referenced_xids, live_root_txns, &delta);
toku_ft_update_stats(&h->in_memory_stats, delta);
basement_node_gc_all_les(bn, snapshot_xids, referenced_xids, live_root_txns, oldest_known_referenced_xid, &delta);
toku_ft_update_stats(&ft->in_memory_stats, delta);
}
}
void toku_bnc_flush_to_child(
FT h,
FT ft,
NONLEAF_CHILDINFO bnc,
FTNODE child
FTNODE child,
TXNID oldest_known_referenced_xid
)
{
paranoid_invariant(bnc);
......@@ -2293,25 +2303,25 @@ void toku_bnc_flush_to_child(
flow_deltas[1] = FIFO_CURRENT_ENTRY_MEMSIZE;
}
toku_ft_node_put_cmd(
h->compare_fun,
h->update_fun,
&h->cmp_descriptor,
ft->compare_fun,
ft->update_fun,
&ft->cmp_descriptor,
child,
-1,
&ftcmd,
is_fresh,
flow_deltas,
TXNID_NONE,
&stats_delta
);
remaining_memsize -= FIFO_CURRENT_ENTRY_MEMSIZE;
}));
invariant(remaining_memsize == 0);
if (stats_delta.numbytes || stats_delta.numrows) {
toku_ft_update_stats(&h->in_memory_stats, stats_delta);
toku_ft_update_stats(&ft->in_memory_stats, stats_delta);
}
// Run garbage collection, if the child is a leaf node.
TOKULOGGER logger = toku_cachefile_logger(h->cf);
TOKULOGGER logger = toku_cachefile_logger(ft->cf);
if (child->height == 0 && logger) {
xid_omt_t snapshot_txnids;
rx_omt_t referenced_xids;
......@@ -2326,8 +2336,15 @@ void toku_bnc_flush_to_child(
STATUS_INC(FT_MSG_BYTES_OUT, buffsize);
// may be misleading if there's a broadcast message in there
STATUS_INC(FT_MSG_BYTES_CURR, -buffsize);
// Perform the garbage collection.
ft_leaf_gc_all_les(child, h, snapshot_txnids, referenced_xids, live_root_txns);
// Perform garbage collection. Provide a full snapshot of the transaction
// system plus the oldest known referenced xid that could have had messages
// applied to this leaf.
//
// Using the oldest xid in either the referenced_xids or live_root_txns
// snapshots is not sufficient, because there could be something older that is neither
// live nor referenced, but instead aborted somewhere above us as a message in the tree.
ft_leaf_gc_all_les(child, ft, snapshot_txnids, referenced_xids, live_root_txns, oldest_known_referenced_xid);
// Free the OMT's we used for garbage collecting.
snapshot_txnids.destroy();
......@@ -2342,22 +2359,6 @@ bool toku_bnc_should_promote(FT ft, NONLEAF_CHILDINFO bnc) {
return bnc->flow[0] >= flow_threshold || bnc->flow[1] >= flow_threshold;
}
void bring_node_fully_into_memory(FTNODE node, FT h)
{
if (!is_entire_node_in_memory(node)) {
struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, h);
toku_cachetable_pf_pinned_pair(
node,
toku_ftnode_pf_callback,
&bfe,
h->cf,
node->thisnodename,
toku_cachetable_hash(h->cf, node->thisnodename)
);
}
}
void
toku_ft_node_put_cmd (
ft_compare_func compare_fun,
......@@ -2368,7 +2369,6 @@ toku_ft_node_put_cmd (
FT_MSG cmd,
bool is_fresh,
size_t flow_deltas[],
TXNID oldest_referenced_xid,
STAT64INFO stats_to_update
)
// Effect: Push CMD into the subtree rooted at NODE.
......@@ -2385,7 +2385,7 @@ toku_ft_node_put_cmd (
// and instead defer to these functions
//
if (node->height==0) {
toku_ft_leaf_apply_cmd(compare_fun, update_fun, desc, node, target_childnum, cmd, oldest_referenced_xid, nullptr, stats_to_update);
toku_ft_leaf_apply_cmd(compare_fun, update_fun, desc, node, target_childnum, cmd, nullptr, stats_to_update);
} else {
ft_nonleaf_put_cmd(compare_fun, desc, node, target_childnum, cmd, is_fresh, flow_deltas);
}
......@@ -2405,7 +2405,6 @@ void toku_ft_leaf_apply_cmd(
FTNODE node,
int target_childnum, // which child to inject to, or -1 if unknown
FT_MSG cmd,
TXNID oldest_referenced_xid,
uint64_t *workdone,
STAT64INFO stats_to_update
)
......@@ -2438,6 +2437,10 @@ void toku_ft_leaf_apply_cmd(
node->max_msn_applied_to_node_on_disk = cmd_msn;
}
// Pass the oldest possible live xid value to each basement node
// when we apply messages to them.
TXNID oldest_known_referenced_xid = node->oldest_known_referenced_xid;
if (ft_msg_applies_once(cmd)) {
unsigned int childnum = (target_childnum >= 0
? target_childnum
......@@ -2450,7 +2453,7 @@ void toku_ft_leaf_apply_cmd(
desc,
bn,
cmd,
oldest_referenced_xid,
oldest_known_referenced_xid,
workdone,
stats_to_update);
} else {
......@@ -2466,7 +2469,7 @@ void toku_ft_leaf_apply_cmd(
desc,
BLB(node, childnum),
cmd,
oldest_referenced_xid,
oldest_known_referenced_xid,
workdone,
stats_to_update);
} else {
......@@ -2495,6 +2498,14 @@ static void inject_message_in_locked_node(
// otherwise.
invariant(toku_ctpair_is_write_locked(node->ct_pair));
toku_assert_entire_node_in_memory(node);
// Update the oldest known referenced xid for this node if it is younger
// than the one currently known. Otherwise, it's better to keep the heuristic
// we have and ignore this one.
if (oldest_referenced_xid >= node->oldest_known_referenced_xid) {
node->oldest_known_referenced_xid = oldest_referenced_xid;
}
// Get the MSN from the header. Now that we have a write lock on the
// node we're injecting into, we know no other thread will get an MSN
// after us and get that message into our subtree before us.
......@@ -2510,7 +2521,6 @@ static void inject_message_in_locked_node(
cmd,
true,
flow_deltas,
oldest_referenced_xid,
&stats_delta
);
if (stats_delta.numbytes || stats_delta.numrows) {
......@@ -2538,10 +2548,10 @@ static void inject_message_in_locked_node(
// verify that msn of latest message was captured in root node
paranoid_invariant(cmd->msn.msn == node->max_msn_applied_to_node_on_disk.msn);
// if we call flush_some_child, then that function unpins the root
// if we call toku_ft_flush_some_child, then that function unpins the root
// otherwise, we unpin ourselves
if (node->height > 0 && toku_ft_nonleaf_is_gorged(node, ft->h->nodesize)) {
flush_node_on_background_thread(ft, node);
toku_ft_flush_node_on_background_thread(ft, node);
}
else {
toku_unpin_ftnode(ft, node);
......
......@@ -379,6 +379,7 @@ serialize_ft_min_size (uint32_t version) {
size_t size = 0;
switch(version) {
case FT_LAYOUT_VERSION_22:
case FT_LAYOUT_VERSION_21:
size += sizeof(MSN); // max_msn_in_ft
case FT_LAYOUT_VERSION_20:
......
......@@ -146,7 +146,6 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE brt, BLOCKNUM blocknum, const char
&cmd,
true,
zero_flow_deltas,
TXNID_NONE,
NULL
);
......
......@@ -912,3 +912,64 @@ void toku_ft_get_compression_method(FT ft, enum toku_compression_method *methodp
void toku_ft_set_blackhole(FT_HANDLE ft_handle) {
ft_handle->ft->blackhole = true;
}
struct garbage_helper_extra {
FT ft;
size_t total_space;
size_t used_space;
};
static int
garbage_leafentry_helper(OMTVALUE v, uint32_t UU(idx), void *extra) {
struct garbage_helper_extra *CAST_FROM_VOIDP(info, extra);
LEAFENTRY CAST_FROM_VOIDP(le, v);
info->total_space += leafentry_disksize(le);
info->used_space += LE_CLEAN_MEMSIZE(le_latest_keylen(le), le_latest_vallen(le));
return 0;
}
static int
garbage_helper(BLOCKNUM blocknum, int64_t UU(size), int64_t UU(address), void *extra) {
struct garbage_helper_extra *CAST_FROM_VOIDP(info, extra);
FTNODE node;
FTNODE_DISK_DATA ndd;
struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, info->ft);
int fd = toku_cachefile_get_fd(info->ft->cf);
int r = toku_deserialize_ftnode_from(fd, blocknum, 0, &node, &ndd, &bfe);
if (r != 0) {
goto no_node;
}
if (node->height > 0) {
goto exit;
}
for (int i = 0; i < node->n_children; ++i) {
BASEMENTNODE bn = BLB(node, i);
r = toku_omt_iterate(bn->buffer, garbage_leafentry_helper, info);
if (r != 0) {
goto exit;
}
}
exit:
toku_ftnode_free(&node);
toku_free(ndd);
no_node:
return r;
}
void toku_ft_get_garbage(FT ft, uint64_t *total_space, uint64_t *used_space) {
// Effect: Iterates the FT's blocktable and calculates the total and used space for leaf blocks.
// Note: It is ok to call this function concurrently with reads/writes to the table since
// the blocktable lock is held, which means no new allocations or file writes can occur.
invariant_notnull(total_space);
invariant_notnull(used_space);
struct garbage_helper_extra info = {
.ft = ft,
.total_space = 0,
.used_space = 0
};
toku_blocktable_iterate(ft->blocktable, TRANSLATION_CHECKPOINTED, garbage_helper, &info, true, true);
*total_space = info.total_space;
*used_space = info.used_space;
}
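// A caller sketch, mirroring the dump_garbage_stats() rewrite later in this
// diff; the gap between the two outputs is an estimate of MVCC garbage:
//
//     uint64_t total_space, used_space;
//     toku_ft_get_garbage(ft, &total_space, &used_space);
//     uint64_t garbage_bytes = total_space - used_space;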
......@@ -104,4 +104,8 @@ void toku_node_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p);
// mark the ft as a blackhole. any message injections will be a no op.
void toku_ft_set_blackhole(FT_HANDLE ft_handle);
// Effect: Calculates the total space and used space for a FT's leaf data.
// The difference between the two is MVCC garbage.
void toku_ft_get_garbage(FT ft, uint64_t *total_space, uint64_t *used_space);
#endif
......@@ -30,6 +30,7 @@ enum ft_layout_version_e {
// last_xid to shutdown
FT_LAYOUT_VERSION_21 = 21, // Ming: Add max_msn_in_ft to header,
// Removed log suppression logentry
FT_LAYOUT_VERSION_22 = 22, // Ming: Add oldest known referenced xid to each ftnode, for better garbage collection
FT_NEXT_VERSION, // the version after the current version
FT_LAYOUT_VERSION = FT_NEXT_VERSION-1, // A hack so I don't have to change this line.
FT_LAYOUT_MIN_SUPPORTED_VERSION = FT_LAYOUT_VERSION_13, // Minimum version supported
......
......@@ -367,6 +367,7 @@ serialize_ftnode_info_size(FTNODE node)
retval += 4; // nodesize
retval += 4; // flags
retval += 4; // height;
retval += 8; // oldest_known_referenced_xid
retval += node->totalchildkeylens; // total length of pivots
retval += (node->n_children-1)*4; // encode length of each pivot
if (node->height > 0) {
......@@ -390,6 +391,8 @@ static void serialize_ftnode_info(FTNODE node,
wbuf_nocrc_uint(&wb, 0); // write a dummy value for where node->nodesize used to be
wbuf_nocrc_uint(&wb, node->flags);
wbuf_nocrc_int (&wb, node->height);
wbuf_TXNID(&wb, node->oldest_known_referenced_xid);
// pivot information
for (int i = 0; i < node->n_children-1; i++) {
wbuf_nocrc_bytes(&wb, node->childkeys[i].data, node->childkeys[i].size);
......@@ -1264,6 +1267,9 @@ deserialize_ftnode_info(
if (node->layout_version_read_from_disk < FT_LAYOUT_VERSION_19) {
(void) rbuf_int(&rb); // optimized_for_upgrade
}
if (node->layout_version_read_from_disk >= FT_LAYOUT_VERSION_22) {
rbuf_TXNID(&rb, &node->oldest_known_referenced_xid);
}
// now create the basement nodes or childinfos, depending on whether this is a
// leaf node or internal node
......@@ -1505,6 +1511,17 @@ check_and_copy_compressed_sub_block_worker(struct rbuf curr_rbuf, struct sub_blo
return r;
}
static FTNODE alloc_ftnode_for_deserialize(uint32_t fullhash, BLOCKNUM blocknum) {
// Effect: Allocate an FTNODE and fill in the values that are not read from the rbuf.
FTNODE XMALLOC(node);
node->fullhash = fullhash;
node->thisnodename = blocknum;
node->dirty = 0;
node->bp = nullptr;
node->oldest_known_referenced_xid = TXNID_NONE;
return node;
}
static int
deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode,
FTNODE_DISK_DATA* ndd,
......@@ -1518,13 +1535,7 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode,
// Return 0 if it worked. If something goes wrong (including that we are looking at some old data format that doesn't have partitions) then return nonzero.
{
int r = 0;
FTNODE XMALLOC(node);
// fill in values that are known and not stored in rb
node->fullhash = fullhash;
node->thisnodename = blocknum;
node->dirty = 0;
node->bp = NULL; // fill this in so we can free without a leak.
FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum);
if (rb->size < 24) {
// TODO: What error do we return here?
......@@ -2171,15 +2182,10 @@ deserialize_ftnode_from_rbuf(
// Effect: deserializes a ftnode that is in rb (with pointer of rb just past the magic) into a FTNODE.
{
int r = 0;
FTNODE XMALLOC(node);
struct sub_block sb_node_info;
// fill in values that are known and not stored in rb
node->fullhash = fullhash;
node->thisnodename = blocknum;
node->dirty = 0;
FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum);
// now start reading from rbuf
// first thing we do is read the header information
bytevec magic;
rbuf_literal_bytes(rb, &magic, 8);
......
......@@ -280,61 +280,14 @@ dump_nodesizes(int f, FT h) {
printf("leafsizes: %" PRIu64 "\n", info.leafsizes);
}
typedef struct {
int f;
FT h;
size_t total_space;
size_t used_space;
} garbage_help_extra;
static int
garbage_leafentry_helper(OMTVALUE v, uint32_t UU(idx), void *extra) {
garbage_help_extra *CAST_FROM_VOIDP(info, extra);
LEAFENTRY CAST_FROM_VOIDP(le, v);
info->total_space += leafentry_disksize(le);
info->used_space += LE_CLEAN_MEMSIZE(le_latest_keylen(le), le_latest_vallen(le));
return 0;
}
static int
garbage_helper(BLOCKNUM b, int64_t UU(size), int64_t UU(address), void *extra) {
garbage_help_extra *CAST_FROM_VOIDP(info, extra);
FTNODE n;
FTNODE_DISK_DATA ndd = NULL;
struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, info->h);
int r = toku_deserialize_ftnode_from(info->f, b, 0, &n, &ndd, &bfe);
if (r != 0) {
goto no_node;
}
if (n->height > 0) {
goto exit;
}
for (int i = 0; i < n->n_children; ++i) {
BASEMENTNODE bn = BLB(n, i);
r = toku_omt_iterate(bn->buffer, garbage_leafentry_helper, info);
if (r != 0) {
goto exit;
}
}
exit:
toku_ftnode_free(&n);
toku_free(ndd);
no_node:
return r;
}
static void
dump_garbage_stats(int f, FT h) {
garbage_help_extra info;
memset(&info, 0, sizeof info);
info.f = f;
info.h = h;
toku_blocktable_iterate(h->blocktable, TRANSLATION_CHECKPOINTED,
garbage_helper, &info, true, true);
printf("total_size: %zu\n", info.total_space);
printf("used_size: %zu\n", info.used_space);
dump_garbage_stats(int f, FT ft) {
invariant(f == toku_cachefile_get_fd(ft->cf));
uint64_t total_space = 0;
uint64_t used_space = 0;
toku_ft_get_garbage(ft, &total_space, &used_space);
printf("total_size: %zu\n", total_space);
printf("used_size: %zu\n", used_space);
}
static uint32_t
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ifndef LEAFENTRY_H
#define LEAFENTRY_H
#ifndef TOKU_LEAFENTRY_H
#define TOKU_LEAFENTRY_H
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
......@@ -10,11 +12,12 @@
#include <util/mempool.h>
#include "txn_manager.h"
#include "rbuf.h"
#include "x1764.h"
#include "omt.h"
#if 0
/*
Memory format of packed leaf entry
CONSTANTS:
num_uxrs
......@@ -35,10 +38,7 @@
Run-time-constants
key[]
val[]
#endif
#if TOKU_WINDOWS
#pragma pack(push, 1)
#endif
*/
//
// enum of possible values for LEAFENTRY->type field
......@@ -94,9 +94,6 @@ struct __attribute__ ((__packed__)) leafentry {
};
static_assert(10 == sizeof(leafentry), "leafentry size is wrong");
static_assert(5 == __builtin_offsetof(leafentry, u), "union is in the wrong place");
#if TOKU_WINDOWS
#pragma pack(pop)
#endif
#define LE_CLEAN_MEMSIZE(_keylen, _vallen) \
(sizeof(((LEAFENTRY)NULL)->type) /* type */ \
......@@ -123,6 +120,10 @@ static_assert(5 == __builtin_offsetof(leafentry, u), "union is in the wrong plac
typedef struct leafentry *LEAFENTRY;
typedef struct leafentry_13 *LEAFENTRY_13;
//
// TODO: consistency among names is very poor.
//
size_t leafentry_memsize (LEAFENTRY le); // the size of a leafentry in memory.
size_t leafentry_disksize (LEAFENTRY le); // this is the same as logsizeof_LEAFENTRY. The size of a leafentry on disk.
void wbuf_LEAFENTRY(struct wbuf *w, LEAFENTRY le);
......@@ -144,19 +145,6 @@ void* le_key_and_len (LEAFENTRY le, uint32_t *len);
uint64_t le_outermost_uncommitted_xid (LEAFENTRY le);
void
le_committed_mvcc(uint8_t *key, uint32_t keylen,
uint8_t *val, uint32_t vallen,
TXNID xid,
void (*bytes)(struct dbuf *dbuf, const void *bytes, int nbytes),
struct dbuf *d);
void
le_clean(uint8_t *key, uint32_t keylen,
uint8_t *val, uint32_t vallen,
void (*bytes)(struct dbuf *dbuf, const void *bytes, int nbytes),
struct dbuf *d);
//Callback contract:
// Function checks to see if id is accepted by context.
// Returns:
......@@ -169,9 +157,9 @@ int le_iterate_is_del(LEAFENTRY le, LE_ITERATE_CALLBACK f, bool *is_empty, TOKUT
int le_iterate_val(LEAFENTRY le, LE_ITERATE_CALLBACK f, void** valpp, uint32_t *vallenp, TOKUTXN context);
size_t
leafentry_disksize_13(LEAFENTRY_13 le);
int
toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry, // NULL if there was no stored data.
size_t *new_leafentry_memorysize,
......@@ -179,7 +167,28 @@ toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry, // NULL if there was no stored
OMT *omtp,
struct mempool *mp);
void toku_le_apply_msg(FT_MSG msg,
LEAFENTRY old_leafentry, // NULL if there was no stored data.
TXNID oldest_referenced_xid,
size_t *new_leafentry_memorysize,
LEAFENTRY *new_leafentry_p,
OMT *omtp,
struct mempool *mp,
void **maybe_free,
int64_t * numbytes_delta_p);
bool toku_le_worth_running_garbage_collection(LEAFENTRY le, TXNID oldest_known_referenced_xid);
#endif
void toku_le_garbage_collect(LEAFENTRY old_leaf_entry,
LEAFENTRY *new_leaf_entry,
size_t *new_leaf_entry_memory_size,
OMT *omtp,
struct mempool *mp,
void **maybe_free,
const xid_omt_t &snapshot_xids,
const rx_omt_t &referenced_xids,
const xid_omt_t &live_root_txns,
TXNID oldest_known_referenced_xid);
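// Sketch of the intended call pattern for the two GC entry points above,
// mirroring ft_basement_node_gc_once() earlier in this diff (local names are
// illustrative):
//
//     if (le->type == LE_MVCC &&
//         toku_le_worth_running_garbage_collection(le, oldest_known_referenced_xid)) {
//         toku_le_garbage_collect(le, &new_le, &new_le_memsize, &omtp, &mp, &maybe_free,
//                                 snapshot_xids, referenced_xids, live_root_txns,
//                                 oldest_known_referenced_xid);
//     }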
#endif /* TOKU_LEAFENTRY_H */
......@@ -48,7 +48,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
brt->ft->h->max_msn_in_ft = msn;
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd, TXNID_NONE, nullptr, nullptr);
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd, nullptr, nullptr);
{
int r = toku_ft_lookup(brt, &thekey, lookup_checkf, &pair);
assert(r==0);
......@@ -56,7 +56,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
}
FT_MSG_S badcmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval }} };
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &badcmd, TXNID_NONE, nullptr, nullptr);
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &badcmd, nullptr, nullptr);
// message should be rejected for duplicate msn, row should still have original val
{
......@@ -69,7 +69,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
msn = next_dummymsn();
brt->ft->h->max_msn_in_ft = msn;
FT_MSG_S cmd2 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &val2 }} };
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd2, TXNID_NONE, nullptr, nullptr);
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd2, nullptr, nullptr);
// message should be accepted, val should have new value
{
......@@ -81,7 +81,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
// now verify that message with lesser (older) msn is rejected
msn.msn = msn.msn - 10;
FT_MSG_S cmd3 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval } }};
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd3, TXNID_NONE, nullptr, nullptr);
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd3, nullptr, nullptr);
// message should be rejected, val should still have value in pair2
{
......
......@@ -123,8 +123,7 @@ insert_random_message_to_bn(FT_HANDLE t, BASEMENTNODE blb, LEAFENTRY *save, XIDS
msg.u.id.val = valdbt;
size_t memsize;
int64_t numbytes;
int r = apply_msg_to_leafentry(&msg, NULL, TXNID_NONE, &memsize, save, NULL, NULL, NULL, &numbytes);
assert_zero(r);
toku_le_apply_msg(&msg, NULL, TXNID_NONE, &memsize, save, NULL, NULL, NULL, &numbytes);
toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb, &msg, TXNID_NONE, NULL, NULL);
if (msn.msn > blb->max_msn_applied.msn) {
blb->max_msn_applied = msn;
......@@ -164,8 +163,7 @@ insert_same_message_to_bns(FT_HANDLE t, BASEMENTNODE blb1, BASEMENTNODE blb2, LE
msg.u.id.val = valdbt;
size_t memsize;
int64_t numbytes;
int r = apply_msg_to_leafentry(&msg, NULL, TXNID_NONE, &memsize, save, NULL, NULL, NULL, &numbytes);
assert_zero(r);
toku_le_apply_msg(&msg, NULL, TXNID_NONE, &memsize, save, NULL, NULL, NULL, &numbytes);
toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb1, &msg, TXNID_NONE, NULL, NULL);
if (msn.msn > blb1->max_msn_applied.msn) {
blb1->max_msn_applied = msn;
......@@ -274,7 +272,7 @@ flush_to_internal(FT_HANDLE t) {
set_BNC(child, 0, child_bnc);
BP_STATE(child, 0) = PT_AVAIL;
toku_bnc_flush_to_child(t->ft, parent_bnc, child);
toku_bnc_flush_to_child(t->ft, parent_bnc, child, TXNID_NONE);
int parent_messages_present[num_parent_messages];
int child_messages_present[num_child_messages];
......@@ -409,7 +407,7 @@ flush_to_internal_multiple(FT_HANDLE t) {
}
}
toku_bnc_flush_to_child(t->ft, parent_bnc, child);
toku_bnc_flush_to_child(t->ft, parent_bnc, child, TXNID_NONE);
int total_messages = 0;
for (i = 0; i < 8; ++i) {
......@@ -580,7 +578,7 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) {
if (make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) {
if (!parent_messages_is_fresh[i]) {
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], TXNID_NONE, NULL, NULL);
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], NULL, NULL);
}
}
for (i = 0; i < 8; ++i) {
......@@ -601,7 +599,7 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) {
}
if (use_flush) {
toku_bnc_flush_to_child(t->ft, parent_bnc, child);
toku_bnc_flush_to_child(t->ft, parent_bnc, child, TXNID_NONE);
destroy_nonleaf_childinfo(parent_bnc);
} else {
FTNODE XMALLOC(parentnode);
......@@ -803,7 +801,7 @@ flush_to_leaf_with_keyrange(FT_HANDLE t, bool make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) {
if (dummy_cmp(NULL, parent_messages[i]->u.id.key, &childkeys[7]) <= 0 &&
!parent_messages_is_fresh[i]) {
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], TXNID_NONE, NULL, NULL);
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], NULL, NULL);
}
}
for (i = 0; i < 8; ++i) {
......@@ -995,8 +993,8 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) {
if (make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) {
if (!parent_messages_is_fresh[i]) {
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child1, -1, parent_messages[i], TXNID_NONE, NULL, NULL);
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child2, -1, parent_messages[i], TXNID_NONE, NULL, NULL);
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child1, -1, parent_messages[i], NULL, NULL);
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child2, -1, parent_messages[i], NULL, NULL);
}
}
for (i = 0; i < 8; ++i) {
......@@ -1010,7 +1008,7 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) {
}
}
toku_bnc_flush_to_child(t->ft, parent_bnc, child1);
toku_bnc_flush_to_child(t->ft, parent_bnc, child1, TXNID_NONE);
FTNODE XMALLOC(parentnode);
BLOCKNUM parentblocknum = { 17 };
......
......@@ -26,7 +26,7 @@ bool checkpoint_callback_called;
toku_pthread_t checkpoint_tid;
// callback functions for flush_some_child
// callback functions for toku_ft_flush_some_child
static bool
dont_destroy_bn(void* UU(extra))
{
......@@ -160,7 +160,7 @@ doit (bool after_child_pin) {
assert(toku_bnc_nbytesinbuf(BNC(node, 0)) > 0);
// do the flush
flush_some_child(t->ft, node, &fa);
toku_ft_flush_some_child(t->ft, node, &fa);
assert(checkpoint_callback_called);
// now let's pin the root again and make sure it is flushed
......
......@@ -26,7 +26,7 @@ bool checkpoint_callback_called;
toku_pthread_t checkpoint_tid;
// callback functions for flush_some_child
// callback functions for toku_ft_flush_some_child
static bool
dont_destroy_bn(void* UU(extra))
{
......@@ -177,7 +177,7 @@ doit (int state) {
assert(node->n_children == 2);
// do the flush
flush_some_child(t->ft, node, &fa);
toku_ft_flush_some_child(t->ft, node, &fa);
assert(checkpoint_callback_called);
// now let's pin the root again and make sure it has merged
......
......@@ -26,7 +26,7 @@ bool checkpoint_callback_called;
toku_pthread_t checkpoint_tid;
// callback functions for flush_some_child
// callback functions for toku_ft_flush_some_child
static bool
dont_destroy_bn(void* UU(extra))
{
......@@ -197,7 +197,7 @@ doit (int state) {
assert(node->n_children == 2);
// do the flush
flush_some_child(t->ft, node, &fa);
toku_ft_flush_some_child(t->ft, node, &fa);
assert(checkpoint_callback_called);
// now let's pin the root again and make sure it has rebalanced
......
......@@ -26,7 +26,7 @@ bool checkpoint_callback_called;
toku_pthread_t checkpoint_tid;
// callback functions for flush_some_child
// callback functions for toku_ft_flush_some_child
static bool
dont_destroy_bn(void* UU(extra))
{
......@@ -173,7 +173,7 @@ doit (bool after_split) {
assert(node->n_children == 1);
// do the flush
flush_some_child(t->ft, node, &fa);
toku_ft_flush_some_child(t->ft, node, &fa);
assert(checkpoint_callback_called);
// now let's pin the root again and make sure it has split
......
......@@ -396,14 +396,13 @@ test_le_apply(ULE ule_initial, FT_MSG msg, ULE ule_expected) {
size_t result_memsize;
int64_t ignoreme;
r = apply_msg_to_leafentry(msg,
toku_le_apply_msg(msg,
le_initial,
TXNID_NONE,
&result_memsize,
&le_result,
NULL,
NULL, NULL, &ignoreme);
CKERR(r);
if (le_result)
le_verify_accessors(le_result, ule_expected, result_memsize);
......@@ -702,6 +701,111 @@ test_le_apply_messages(void) {
test_le_committed_apply();
}
static bool ule_worth_running_garbage_collection(ULE ule, TXNID oldest_known_referenced_xid) {
LEAFENTRY le;
size_t initial_memsize;
int r = le_pack(ule, &initial_memsize, &le, nullptr, nullptr, nullptr); CKERR(r);
invariant_notnull(le);
bool worth_running = toku_le_worth_running_garbage_collection(le, oldest_known_referenced_xid);
toku_free(le);
return worth_running;
}
static void test_le_garbage_collection_birdie(void) {
DBT key;
DBT val;
ULE_S ule_initial;
ULE_S ule_expected;
uint8_t keybuf[MAX_SIZE];
uint32_t keysize=8;
uint8_t valbuf[MAX_SIZE];
uint32_t valsize=8;
ule_initial.uxrs = ule_initial.uxrs_static;
ule_expected.uxrs = ule_expected.uxrs_static;
memset(&key, 0, sizeof(key));
memset(&val, 0, sizeof(val));
bool do_garbage_collect;
fillrandom(keybuf, keysize);
fillrandom(valbuf, valsize);
//
// Test garbage collection "worth-doing" heuristic
//
// Garbage collection should not be worth doing on a clean leafentry.
ule_initial.num_cuxrs = 1;
ule_initial.num_puxrs = 0;
ule_initial.uxrs[0].xid = TXNID_NONE;
ule_initial.uxrs[0].type = XR_INSERT;
do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
invariant(!do_garbage_collect);
// It is worth doing when there is more than one committed entry
ule_initial.num_cuxrs = 2;
ule_initial.num_puxrs = 1;
ule_initial.uxrs[1].xid = 500;
do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
invariant(do_garbage_collect);
// It is not worth doing when there is one of each and the
// provisional entry is newer than the oldest known referenced xid
ule_initial.num_cuxrs = 1;
ule_initial.num_puxrs = 1;
ule_initial.uxrs[1].xid = 1500;
do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
invariant(!do_garbage_collect);
ule_initial.uxrs[1].xid = 200;
do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
invariant(!do_garbage_collect);
// It is not worth doing when there is only one committed entry,
// multiple provisional entries, but the outermost entry is newer.
ule_initial.num_cuxrs = 1;
ule_initial.num_puxrs = 3;
ule_initial.uxrs[1].xid = 201;
ule_initial.uxrs[2].xid = 206;
ule_initial.uxrs[3].xid = 215;
do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
invariant(!do_garbage_collect);
// It is worth doing when the above scenario has an outermost entry
// older than the oldest known, even if its children seem newer.
// These children must have committed, because the parent is no longer live.
ule_initial.num_cuxrs = 1;
ule_initial.num_puxrs = 3;
ule_initial.uxrs[1].xid = 190;
ule_initial.uxrs[2].xid = 206;
ule_initial.uxrs[3].xid = 215;
do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
invariant(do_garbage_collect);
// It is worth doing when there is more than one committed entry,
// even if a provisional entry exists that is newer than the
// oldest known referenced xid
ule_initial.num_cuxrs = 2;
ule_initial.num_puxrs = 1;
ule_initial.uxrs[1].xid = 499;
ule_initial.uxrs[2].xid = 500;
do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
invariant(do_garbage_collect);
// It is worth doing when there is one of each, and the provisional
// entry is older than the oldest known referenced xid
ule_initial.num_cuxrs = 1;
ule_initial.num_puxrs = 1;
ule_initial.uxrs[1].xid = 199;
do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
invariant(do_garbage_collect);
// It is definitely worth doing when the above case is true
// and there is more than one provisional entry.
ule_initial.num_cuxrs = 1;
ule_initial.num_puxrs = 2;
ule_initial.uxrs[1].xid = 150;
ule_initial.uxrs[2].xid = 175;
do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
invariant(do_garbage_collect);
}
static void test_le_optimize(void) {
FT_MSG_S msg;
......@@ -900,6 +1004,7 @@ test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute_
test_le_pack();
test_le_apply_messages();
test_le_optimize();
test_le_garbage_collection_birdie();
destroy_xids();
return 0;
}
......
......@@ -162,7 +162,7 @@ doit (void) {
assert_zero(r);
// now with setup done, start the test
// test that if flush_some_child properly honors
// test that toku_ft_flush_some_child properly honors
// what we say and flushes the child we pick
FTNODE node = NULL;
toku_pin_node_with_min_bfe(&node, node_internal, t);
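// Hedged sketch of the mechanism under test (the callback name is an
// assumption; `curr_child_to_flush` and `num_flushes_called` are the test's
// globals seen below): the test's flusher_advice returns a test-controlled
// child index, so toku_ft_flush_some_child must flush exactly that child.
//
//   static int test_pick_child(FT UU(ft), FTNODE UU(parent), void* UU(extra)) {
//       return curr_child_to_flush;  // set by the test before each flush
//   }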
......@@ -185,7 +185,7 @@ doit (void) {
);
curr_child_to_flush = 0;
num_flushes_called = 0;
flush_some_child(t->ft, node, &fa);
toku_ft_flush_some_child(t->ft, node, &fa);
assert(num_flushes_called == 1);
toku_pin_node_with_min_bfe(&node, node_internal, t);
......@@ -203,7 +203,7 @@ doit (void) {
assert(!node->dirty);
curr_child_to_flush = 1;
num_flushes_called = 0;
flush_some_child(t->ft, node, &fa);
toku_ft_flush_some_child(t->ft, node, &fa);
assert(num_flushes_called == 1);
toku_pin_node_with_min_bfe(&node, node_internal, t);
......@@ -221,7 +221,7 @@ doit (void) {
assert(!node->dirty);
curr_child_to_flush = 0;
num_flushes_called = 0;
flush_some_child(t->ft, node, &fa);
toku_ft_flush_some_child(t->ft, node, &fa);
assert(num_flushes_called == 1);
toku_pin_node_with_min_bfe(&node, node_internal, t);
......@@ -250,7 +250,7 @@ doit (void) {
toku_assert_entire_node_in_memory(node); // entire root is in memory
curr_child_to_flush = i;
num_flushes_called = 0;
flush_some_child(t->ft, node, &fa);
toku_ft_flush_some_child(t->ft, node, &fa);
assert(num_flushes_called == 2);
toku_pin_node_with_min_bfe(&node, node_internal, t);
......@@ -296,7 +296,7 @@ doit (void) {
toku_assert_entire_node_in_memory(node); // entire root is in memory
curr_child_to_flush = 0;
num_flushes_called = 0;
flush_some_child(t->ft, node, &fa);
toku_ft_flush_some_child(t->ft, node, &fa);
assert(num_flushes_called == 2);
r = toku_close_ft_handle_nolsn(t, 0); assert(r==0);
......
......@@ -88,15 +88,13 @@ void toku_ule_free(ULEHANDLE ule_p) {
toku_free(ule_p);
}
///////////////////////////////////////////////////////////////////////////////////
//
// Question: Can any software outside this file modify or read a leafentry?
// If so, is it worthwhile to put it all here?
//
// There are two entries, one each for modification and query:
// apply_msg_to_leafentry() performs all inserts/deletes/aborts
// toku_le_apply_msg() performs all inserts/deletes/aborts
//
//
//
......@@ -122,6 +120,7 @@ static void msg_init_empty_ule(ULE ule, FT_MSG msg);
static void msg_modify_ule(ULE ule, FT_MSG msg);
static void ule_init_empty_ule(ULE ule, uint32_t keylen, void * keyp);
static void ule_do_implicit_promotions(ULE ule, XIDS xids);
static void ule_try_promote_provisional_outermost(ULE ule, TXNID oldest_possible_live_xid);
static void ule_promote_provisional_innermost_to_index(ULE ule, uint32_t index);
static void ule_promote_provisional_innermost_to_committed(ULE ule);
static void ule_apply_insert(ULE ule, XIDS xids, uint32_t vallen, void * valp);
......@@ -165,8 +164,6 @@ le_malloc(OMT *omtp, struct mempool *mp, size_t size, void **maybe_free)
return rval;
}
/////////////////////////////////////////////////////////////////////
// Garbage collection related functions
//
......@@ -215,7 +212,7 @@ xid_reads_committed_xid(TXNID tl1, TXNID xc, const xid_omt_t &snapshot_txnids, c
// so we get rid of them.
//
static void
simple_garbage_collection(ULE ule, TXNID oldest_referenced_xid) {
ule_simple_garbage_collection(ULE ule, TXNID oldest_referenced_xid) {
uint32_t curr_index = 0;
uint32_t num_entries;
if (ule->num_cuxrs == 1 || oldest_referenced_xid == TXNID_NONE) {
......@@ -244,7 +241,7 @@ done:;
}
static void
garbage_collection(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &referenced_xids, const xid_omt_t &live_root_txns) {
ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &referenced_xids, const xid_omt_t &live_root_txns) {
if (ule->num_cuxrs == 1) goto done;
// will fail if too many num_cuxrs
bool necessary_static[MAX_TRANSACTION_RECORDS];
......@@ -340,7 +337,6 @@ garbage_collection(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &refe
done:;
}
/////////////////////////////////////////////////////////////////////////////////
// This is the big enchilada. (Bring Tums.) Note that this level of abstraction
// has no knowledge of the inner structure of either leafentry or msg. It makes
......@@ -353,8 +349,8 @@ done:;
// If the leafentry is destroyed it sets *new_leafentry_p to NULL.
// Otherwise the new_leafentry_p points at the new leaf entry.
// This function cannot fail; le_pack's result is asserted to be zero.
int
apply_msg_to_leafentry(FT_MSG msg, // message to apply to leafentry
void
toku_le_apply_msg(FT_MSG msg, // message to apply to leafentry
LEAFENTRY old_leafentry, // NULL if there was no stored data.
TXNID oldest_referenced_xid,
size_t *new_leafentry_memorysize,
......@@ -364,31 +360,49 @@ apply_msg_to_leafentry(FT_MSG msg, // message to apply to leafentry
void **maybe_free,
int64_t * numbytes_delta_p) { // change in total size of key and val, not including any overhead
ULE_S ule;
int rval;
int64_t oldnumbytes = 0;
int64_t newnumbytes = 0;
if (old_leafentry == NULL) // if leafentry does not exist ...
msg_init_empty_ule(&ule, msg); // ... create empty unpacked leaf entry
else {
if (old_leafentry == NULL) {
msg_init_empty_ule(&ule, msg);
} else {
le_unpack(&ule, old_leafentry); // otherwise unpack leafentry
oldnumbytes = ule_get_innermost_numbytes(&ule);
}
msg_modify_ule(&ule, msg); // modify unpacked leafentry
simple_garbage_collection(&ule, oldest_referenced_xid);
rval = le_pack(&ule, // create packed leafentry
ule_simple_garbage_collection(&ule, oldest_referenced_xid);
int rval = le_pack(&ule, // create packed leafentry
new_leafentry_memorysize,
new_leafentry_p,
omtp,
mp,
maybe_free);
if (new_leafentry_p)
invariant_zero(rval);
if (new_leafentry_p) {
newnumbytes = ule_get_innermost_numbytes(&ule);
}
*numbytes_delta_p = newnumbytes - oldnumbytes;
ule_cleanup(&ule);
return rval;
}
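// Hedged usage sketch (mirrors the unit-test call shape earlier in this
// commit): apply a message with no OMT/mempool bookkeeping and read back the
// size delta. `msg` and `old_le` are assumed to exist at the call site.
//
//   size_t new_memsize;
//   LEAFENTRY new_le;
//   int64_t numbytes_delta;
//   toku_le_apply_msg(msg, old_le, TXNID_NONE,
//                     &new_memsize, &new_le,
//                     NULL, NULL, NULL, &numbytes_delta);
//   // new_le == NULL means the message destroyed the leafentry.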
bool toku_le_worth_running_garbage_collection(LEAFENTRY le, TXNID oldest_known_referenced_xid) {
// Effect: Quickly determines if it's worth trying to run garbage collection on a leafentry
// Return: True if it makes sense to try garbage collection, false otherwise.
// Rationale: Garbage collection is likely to clean up under two circumstances:
// 1.) There are multiple committed entries. Some may never be read by new txns.
// 2.) There is only one committed entry, but the outermost provisional entry
// is older than the oldest known referenced xid, so it must have committed.
// Therefore we can promote it to committed and get rid of the old committed entry.
if (le->type != LE_MVCC) {
return false;
}
if (le->u.mvcc.num_cxrs > 1) {
return true;
} else {
paranoid_invariant(le->u.mvcc.num_cxrs == 1);
}
return le->u.mvcc.num_pxrs > 0 && le_outermost_uncommitted_xid(le) < oldest_known_referenced_xid;
}
// Garbage collect one leaf entry, using the given OMT's.
// Parameters:
......@@ -408,8 +422,8 @@ apply_msg_to_leafentry(FT_MSG msg, // message to apply to leafentry
// -- referenced_xids : list of in memory active transactions.
// NOTE: it is not a good idea to garbage collect a leaf
// entry with only one committed value.
int
garbage_collect_leafentry(LEAFENTRY old_leaf_entry,
void
toku_le_garbage_collect(LEAFENTRY old_leaf_entry,
LEAFENTRY *new_leaf_entry,
size_t *new_leaf_entry_memory_size,
OMT *omtp,
......@@ -417,12 +431,22 @@ garbage_collect_leafentry(LEAFENTRY old_leaf_entry,
void **maybe_free,
const xid_omt_t &snapshot_xids,
const rx_omt_t &referenced_xids,
const xid_omt_t &live_root_txns) {
int r = 0;
const xid_omt_t &live_root_txns,
TXNID oldest_known_referenced_xid) {
ULE_S ule;
le_unpack(&ule, old_leaf_entry);
garbage_collection(&ule, snapshot_xids, referenced_xids, live_root_txns);
r = le_pack(&ule,
// Before running garbage collection, try to promote the outermost provisional
// entries to committed if its xid is older than the oldest possible live xid.
//
// The oldest known referenced xid is a lower bound on the oldest possible
// live xid, so we use that. It's usually close enough to get rid of most
// garbage in leafentries.
TXNID oldest_possible_live_xid = oldest_known_referenced_xid;
ule_try_promote_provisional_outermost(&ule, oldest_possible_live_xid);
ule_garbage_collect(&ule, snapshot_xids, referenced_xids, live_root_txns);
int r = le_pack(&ule,
new_leaf_entry_memory_size,
new_leaf_entry,
omtp,
......@@ -430,7 +454,6 @@ garbage_collect_leafentry(LEAFENTRY old_leaf_entry,
maybe_free);
assert(r == 0);
ule_cleanup(&ule);
return r;
}
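// A minimal caller sketch, assuming the cheap worth-running check gates the
// full pass. This helper is hypothetical (the real call sites live elsewhere
// in the tree), but every call below matches a signature in this file.
static void
toku_le_maybe_garbage_collect(LEAFENTRY old_le,
                              LEAFENTRY *new_le,
                              size_t *new_le_memsize,
                              OMT *omtp,
                              struct mempool *mp,
                              void **maybe_free,
                              const xid_omt_t &snapshot_xids,
                              const rx_omt_t &referenced_xids,
                              const xid_omt_t &live_root_txns,
                              TXNID oldest_known_referenced_xid) {
    // Cheap test first: skip the expensive OMT-based pass when it cannot
    // reclaim anything.
    if (toku_le_worth_running_garbage_collection(old_le, oldest_known_referenced_xid)) {
        toku_le_garbage_collect(old_le, new_le, new_le_memsize, omtp, mp, maybe_free,
                                snapshot_xids, referenced_xids, live_root_txns,
                                oldest_known_referenced_xid);
    }
}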
/////////////////////////////////////////////////////////////////////////////////
......@@ -1529,6 +1552,15 @@ ule_promote_provisional_innermost_to_committed(ULE ule) {
}
}
static void
ule_try_promote_provisional_outermost(ULE ule, TXNID oldest_possible_live_xid) {
// Effect: If the provisional stack's outermost xid is older than the given
// oldest possible live xid, promote the innermost provisional record to committed.
if (ule->num_puxrs > 0 && ule_get_xid(ule, ule->num_cuxrs) < oldest_possible_live_xid) {
ule_promote_provisional_innermost_to_committed(ule);
}
}
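// Worked example (hedged, mirroring the "birdie" test above): with
// num_cuxrs == 1 and provisional xids {150, 175}, an oldest_possible_live_xid
// of 200 means the outermost provisional xid 150 (at index num_cuxrs) cannot
// belong to a live transaction, so the whole provisional stack must have
// committed and the innermost value is promoted to committed. With
// provisional xids {201, 206, 215} nothing is promoted, because xid 201 may
// still be live.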
// Purpose is to promote the value (and type) of the innermost transaction
// record to the uxr at the specified index (keeping the txnid of the uxr at
// specified index.)
......
......@@ -45,31 +45,4 @@ TXNID uxr_get_txnid(UXRHANDLE uxr);
//1 does much slower debugging
#define GARBAGE_COLLECTION_DEBUG 0
void fast_msg_to_leafentry(
FT_MSG msg, // message to apply to leafentry
size_t *new_leafentry_memorysize,
size_t *new_leafentry_disksize,
LEAFENTRY *new_leafentry_p) ;
int apply_msg_to_leafentry(FT_MSG msg,
LEAFENTRY old_leafentry, // NULL if there was no stored data.
TXNID oldest_referenced_xid,
size_t *new_leafentry_memorysize,
LEAFENTRY *new_leafentry_p,
OMT *omtp,
struct mempool *mp,
void **maybe_free,
int64_t * numbytes_delta_p);
int garbage_collect_leafentry(LEAFENTRY old_leaf_entry,
LEAFENTRY *new_leaf_entry,
size_t *new_leaf_entry_memory_size,
OMT *omtp,
struct mempool *mp,
void **maybe_free,
const xid_omt_t &snapshot_xids,
const rx_omt_t &referenced_xids,
const xid_omt_t &live_root_txns);
#endif // TOKU_ULE_H
......@@ -740,6 +740,9 @@ static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, vo
} else {
rand_key_i[0] = arg->thread_idx;
}
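// Hedged note: the modulo below keeps this random put's key within the
// bounded element range the test was configured with.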
if (arg->cli->num_elements > 0 && arg->bounded_element_range) {
rand_key_key[0] = rand_key_key[0] % arg->cli->num_elements;
}
fill_zeroed_array(valbuf, arg->cli->val_size, arg->random_data, arg->cli->compressibility);
DBT key, val;
dbt_init(&key, &rand_key_b, sizeof rand_key_b);
......