Commit a0eeedd5 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

refs #5700, merge to main

git-svn-id: file:///svn/toku/tokudb@50517 c7de825b-a66e-492c-adef-691d508d4ae1
parent bb07e095
...@@ -913,7 +913,7 @@ int toku_cmd_leafval_heaviside (OMTVALUE leafentry, void *extra) ...@@ -913,7 +913,7 @@ int toku_cmd_leafval_heaviside (OMTVALUE leafentry, void *extra)
__attribute__((__warn_unused_result__)); __attribute__((__warn_unused_result__));
// toku_ft_root_put_cmd() accepts non-constant cmd because this is where we set the msn // toku_ft_root_put_cmd() accepts non-constant cmd because this is where we set the msn
void toku_ft_root_put_cmd(FT h, FT_MSG_S * cmd); void toku_ft_root_put_cmd(FT h, FT_MSG_S * cmd, TXNID oldest_referenced_xid);
void *mempool_malloc_from_omt(OMT *omtp, struct mempool *mp, size_t size, void **maybe_free); void *mempool_malloc_from_omt(OMT *omtp, struct mempool *mp, size_t size, void **maybe_free);
// Effect: Allocate a new object of size SIZE in MP. If MP runs out of space, allocate new a new mempool space, and copy all the items // Effect: Allocate a new object of size SIZE in MP. If MP runs out of space, allocate new a new mempool space, and copy all the items
...@@ -1073,6 +1073,7 @@ toku_ft_bn_apply_cmd_once ( ...@@ -1073,6 +1073,7 @@ toku_ft_bn_apply_cmd_once (
const FT_MSG cmd, const FT_MSG cmd,
uint32_t idx, uint32_t idx,
LEAFENTRY le, LEAFENTRY le,
TXNID oldest_referenced_xid,
uint64_t *workdonep, uint64_t *workdonep,
STAT64INFO stats_to_update STAT64INFO stats_to_update
); );
...@@ -1084,6 +1085,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1084,6 +1085,7 @@ toku_ft_bn_apply_cmd (
DESCRIPTOR desc, DESCRIPTOR desc,
BASEMENTNODE bn, BASEMENTNODE bn,
FT_MSG cmd, FT_MSG cmd,
TXNID oldest_referenced_xid,
uint64_t *workdone, uint64_t *workdone,
STAT64INFO stats_to_update STAT64INFO stats_to_update
); );
...@@ -1096,6 +1098,7 @@ toku_ft_leaf_apply_cmd ( ...@@ -1096,6 +1098,7 @@ toku_ft_leaf_apply_cmd (
FTNODE node, FTNODE node,
int target_childnum, int target_childnum,
FT_MSG cmd, FT_MSG cmd,
TXNID oldest_referenced_xid,
uint64_t *workdone, uint64_t *workdone,
STAT64INFO stats_to_update STAT64INFO stats_to_update
); );
...@@ -1110,6 +1113,7 @@ toku_ft_node_put_cmd ( ...@@ -1110,6 +1113,7 @@ toku_ft_node_put_cmd (
FT_MSG cmd, FT_MSG cmd,
bool is_fresh, bool is_fresh,
size_t flow_deltas[], size_t flow_deltas[],
TXNID oldest_referenced_xid,
STAT64INFO stats_to_update STAT64INFO stats_to_update
); );
......
...@@ -1522,6 +1522,7 @@ toku_ft_bn_apply_cmd_once ( ...@@ -1522,6 +1522,7 @@ toku_ft_bn_apply_cmd_once (
const FT_MSG cmd, const FT_MSG cmd,
uint32_t idx, uint32_t idx,
LEAFENTRY le, LEAFENTRY le,
TXNID oldest_referenced_xid,
uint64_t *workdone, uint64_t *workdone,
STAT64INFO stats_to_update STAT64INFO stats_to_update
) )
...@@ -1543,7 +1544,7 @@ toku_ft_bn_apply_cmd_once ( ...@@ -1543,7 +1544,7 @@ toku_ft_bn_apply_cmd_once (
// That means le is guaranteed to not cause a sigsegv but it may point to a mempool that is // That means le is guaranteed to not cause a sigsegv but it may point to a mempool that is
// no longer in use. We'll have to release the old mempool later. // no longer in use. We'll have to release the old mempool later.
{ {
int r = apply_msg_to_leafentry(cmd, le, &newsize, &new_le, &bn->buffer, &bn->buffer_mempool, &maybe_free, &numbytes_delta); int r = apply_msg_to_leafentry(cmd, le, oldest_referenced_xid, &newsize, &new_le, &bn->buffer, &bn->buffer_mempool, &maybe_free, &numbytes_delta);
invariant(r==0); invariant(r==0);
} }
...@@ -1614,6 +1615,7 @@ struct setval_extra_s { ...@@ -1614,6 +1615,7 @@ struct setval_extra_s {
const DBT *key; const DBT *key;
uint32_t idx; uint32_t idx;
LEAFENTRY le; LEAFENTRY le;
TXNID oldest_referenced_xid;
uint64_t * workdone; // set by toku_ft_bn_apply_cmd_once() uint64_t * workdone; // set by toku_ft_bn_apply_cmd_once()
STAT64INFO stats_to_update; STAT64INFO stats_to_update;
}; };
...@@ -1646,6 +1648,7 @@ static void setval_fun (const DBT *new_val, void *svextra_v) { ...@@ -1646,6 +1648,7 @@ static void setval_fun (const DBT *new_val, void *svextra_v) {
} }
toku_ft_bn_apply_cmd_once(svextra->bn, &msg, toku_ft_bn_apply_cmd_once(svextra->bn, &msg,
svextra->idx, svextra->le, svextra->idx, svextra->le,
svextra->oldest_referenced_xid,
svextra->workdone, svextra->stats_to_update); svextra->workdone, svextra->stats_to_update);
svextra->setval_r = 0; svextra->setval_r = 0;
} }
...@@ -1657,6 +1660,7 @@ static void setval_fun (const DBT *new_val, void *svextra_v) { ...@@ -1657,6 +1660,7 @@ static void setval_fun (const DBT *new_val, void *svextra_v) {
// the original msn seems cleaner and it preserves accountability at a lower layer. // the original msn seems cleaner and it preserves accountability at a lower layer.
static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn, FT_MSG cmd, uint32_t idx, static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn, FT_MSG cmd, uint32_t idx,
LEAFENTRY le, LEAFENTRY le,
TXNID oldest_referenced_xid,
uint64_t * workdone, uint64_t * workdone,
STAT64INFO stats_to_update) { STAT64INFO stats_to_update) {
LEAFENTRY le_for_update; LEAFENTRY le_for_update;
...@@ -1700,7 +1704,7 @@ static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn ...@@ -1700,7 +1704,7 @@ static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn
} }
struct setval_extra_s setval_extra = {setval_tag, false, 0, bn, cmd->msn, cmd->xids, struct setval_extra_s setval_extra = {setval_tag, false, 0, bn, cmd->msn, cmd->xids,
keyp, idx, le_for_update, workdone, stats_to_update}; keyp, idx, le_for_update, oldest_referenced_xid, workdone, stats_to_update};
// call handlerton's brt->update_fun(), which passes setval_extra to setval_fun() // call handlerton's brt->update_fun(), which passes setval_extra to setval_fun()
FAKE_DB(db, desc); FAKE_DB(db, desc);
int r = update_fun( int r = update_fun(
...@@ -1723,6 +1727,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1723,6 +1727,7 @@ toku_ft_bn_apply_cmd (
DESCRIPTOR desc, DESCRIPTOR desc,
BASEMENTNODE bn, BASEMENTNODE bn,
FT_MSG cmd, FT_MSG cmd,
TXNID oldest_referenced_xid,
uint64_t *workdone, uint64_t *workdone,
STAT64INFO stats_to_update STAT64INFO stats_to_update
) )
...@@ -1764,7 +1769,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1764,7 +1769,7 @@ toku_ft_bn_apply_cmd (
assert_zero(r); assert_zero(r);
CAST_FROM_VOIDP(storeddata, storeddatav); CAST_FROM_VOIDP(storeddata, storeddatav);
} }
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, workdone, stats_to_update); toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update);
// if the insertion point is within a window of the right edge of // if the insertion point is within a window of the right edge of
// the leaf then it is sequential // the leaf then it is sequential
...@@ -1796,7 +1801,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1796,7 +1801,7 @@ toku_ft_bn_apply_cmd (
while (1) { while (1) {
uint32_t num_leafentries_before = toku_omt_size(bn->buffer); uint32_t num_leafentries_before = toku_omt_size(bn->buffer);
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, workdone, stats_to_update); toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update);
{ {
// Now we must find the next leafentry. // Now we must find the next leafentry.
...@@ -1842,7 +1847,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1842,7 +1847,7 @@ toku_ft_bn_apply_cmd (
CAST_FROM_VOIDP(storeddata, storeddatav); CAST_FROM_VOIDP(storeddata, storeddatav);
int deleted = 0; int deleted = 0;
if (!le_is_clean(storeddata)) { //If already clean, nothing to do. if (!le_is_clean(storeddata)) { //If already clean, nothing to do.
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, workdone, stats_to_update); toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update);
uint32_t new_omt_size = toku_omt_size(bn->buffer); uint32_t new_omt_size = toku_omt_size(bn->buffer);
if (new_omt_size != omt_size) { if (new_omt_size != omt_size) {
paranoid_invariant(new_omt_size+1 == omt_size); paranoid_invariant(new_omt_size+1 == omt_size);
...@@ -1868,7 +1873,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1868,7 +1873,7 @@ toku_ft_bn_apply_cmd (
CAST_FROM_VOIDP(storeddata, storeddatav); CAST_FROM_VOIDP(storeddata, storeddatav);
int deleted = 0; int deleted = 0;
if (le_has_xids(storeddata, cmd->xids)) { if (le_has_xids(storeddata, cmd->xids)) {
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, workdone, stats_to_update); toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update);
uint32_t new_omt_size = toku_omt_size(bn->buffer); uint32_t new_omt_size = toku_omt_size(bn->buffer);
if (new_omt_size != omt_size) { if (new_omt_size != omt_size) {
paranoid_invariant(new_omt_size+1 == omt_size); paranoid_invariant(new_omt_size+1 == omt_size);
...@@ -1889,10 +1894,10 @@ toku_ft_bn_apply_cmd ( ...@@ -1889,10 +1894,10 @@ toku_ft_bn_apply_cmd (
r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be, r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be,
&storeddatav, &idx); &storeddatav, &idx);
if (r==DB_NOTFOUND) { if (r==DB_NOTFOUND) {
r = do_update(update_fun, desc, bn, cmd, idx, NULL, workdone, stats_to_update); r = do_update(update_fun, desc, bn, cmd, idx, NULL, oldest_referenced_xid, workdone, stats_to_update);
} else if (r==0) { } else if (r==0) {
CAST_FROM_VOIDP(storeddata, storeddatav); CAST_FROM_VOIDP(storeddata, storeddatav);
r = do_update(update_fun, desc, bn, cmd, idx, storeddata, workdone, stats_to_update); r = do_update(update_fun, desc, bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update);
} // otherwise, a worse error, just return it } // otherwise, a worse error, just return it
break; break;
} }
...@@ -1904,7 +1909,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1904,7 +1909,7 @@ toku_ft_bn_apply_cmd (
r = toku_omt_fetch(bn->buffer, idx, &storeddatav); r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
assert_zero(r); assert_zero(r);
CAST_FROM_VOIDP(storeddata, storeddatav); CAST_FROM_VOIDP(storeddata, storeddatav);
r = do_update(update_fun, desc, bn, cmd, idx, storeddata, workdone, stats_to_update); r = do_update(update_fun, desc, bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update);
assert_zero(r); assert_zero(r);
if (num_leafentries_before == toku_omt_size(bn->buffer)) { if (num_leafentries_before == toku_omt_size(bn->buffer)) {
...@@ -2323,6 +2328,7 @@ void toku_bnc_flush_to_child( ...@@ -2323,6 +2328,7 @@ void toku_bnc_flush_to_child(
&ftcmd, &ftcmd,
is_fresh, is_fresh,
flow_deltas, flow_deltas,
TXNID_NONE,
&stats_delta &stats_delta
); );
remaining_memsize -= FIFO_CURRENT_ENTRY_MEMSIZE; remaining_memsize -= FIFO_CURRENT_ENTRY_MEMSIZE;
...@@ -2389,6 +2395,7 @@ toku_ft_node_put_cmd ( ...@@ -2389,6 +2395,7 @@ toku_ft_node_put_cmd (
FT_MSG cmd, FT_MSG cmd,
bool is_fresh, bool is_fresh,
size_t flow_deltas[], size_t flow_deltas[],
TXNID oldest_referenced_xid,
STAT64INFO stats_to_update STAT64INFO stats_to_update
) )
// Effect: Push CMD into the subtree rooted at NODE. // Effect: Push CMD into the subtree rooted at NODE.
...@@ -2405,7 +2412,7 @@ toku_ft_node_put_cmd ( ...@@ -2405,7 +2412,7 @@ toku_ft_node_put_cmd (
// and instead defer to these functions // and instead defer to these functions
// //
if (node->height==0) { if (node->height==0) {
toku_ft_leaf_apply_cmd(compare_fun, update_fun, desc, node, target_childnum, cmd, nullptr, stats_to_update); toku_ft_leaf_apply_cmd(compare_fun, update_fun, desc, node, target_childnum, cmd, oldest_referenced_xid, nullptr, stats_to_update);
} else { } else {
ft_nonleaf_put_cmd(compare_fun, desc, node, target_childnum, cmd, is_fresh, flow_deltas); ft_nonleaf_put_cmd(compare_fun, desc, node, target_childnum, cmd, is_fresh, flow_deltas);
} }
...@@ -2425,6 +2432,7 @@ void toku_ft_leaf_apply_cmd( ...@@ -2425,6 +2432,7 @@ void toku_ft_leaf_apply_cmd(
FTNODE node, FTNODE node,
int target_childnum, // which child to inject to, or -1 if unknown int target_childnum, // which child to inject to, or -1 if unknown
FT_MSG cmd, FT_MSG cmd,
TXNID oldest_referenced_xid,
uint64_t *workdone, uint64_t *workdone,
STAT64INFO stats_to_update STAT64INFO stats_to_update
) )
...@@ -2469,6 +2477,7 @@ void toku_ft_leaf_apply_cmd( ...@@ -2469,6 +2477,7 @@ void toku_ft_leaf_apply_cmd(
desc, desc,
bn, bn,
cmd, cmd,
oldest_referenced_xid,
workdone, workdone,
stats_to_update); stats_to_update);
} else { } else {
...@@ -2484,6 +2493,7 @@ void toku_ft_leaf_apply_cmd( ...@@ -2484,6 +2493,7 @@ void toku_ft_leaf_apply_cmd(
desc, desc,
BLB(node, childnum), BLB(node, childnum),
cmd, cmd,
oldest_referenced_xid,
workdone, workdone,
stats_to_update); stats_to_update);
} else { } else {
...@@ -2497,7 +2507,15 @@ void toku_ft_leaf_apply_cmd( ...@@ -2497,7 +2507,15 @@ void toku_ft_leaf_apply_cmd(
VERIFY_NODE(t, node); VERIFY_NODE(t, node);
} }
static void inject_message_in_locked_node(FT ft, FTNODE node, int childnum, FT_MSG_S *cmd, size_t flow_deltas[]) { static void inject_message_in_locked_node(
FT ft,
FTNODE node,
int childnum,
FT_MSG_S *cmd,
size_t flow_deltas[],
TXNID oldest_referenced_xid
)
{
// No guarantee that we're the writer, but oh well. // No guarantee that we're the writer, but oh well.
// TODO(leif): Implement "do I have the lock or is it someone else?" // TODO(leif): Implement "do I have the lock or is it someone else?"
// check in frwlock. Should be possible with TOKU_PTHREAD_DEBUG, nop // check in frwlock. Should be possible with TOKU_PTHREAD_DEBUG, nop
...@@ -2519,6 +2537,7 @@ static void inject_message_in_locked_node(FT ft, FTNODE node, int childnum, FT_M ...@@ -2519,6 +2537,7 @@ static void inject_message_in_locked_node(FT ft, FTNODE node, int childnum, FT_M
cmd, cmd,
true, true,
flow_deltas, flow_deltas,
oldest_referenced_xid,
&stats_delta &stats_delta
); );
if (stats_delta.numbytes || stats_delta.numrows) { if (stats_delta.numbytes || stats_delta.numrows) {
...@@ -2677,7 +2696,7 @@ static bool process_maybe_reactive_child(FT ft, FTNODE parent, FTNODE child, int ...@@ -2677,7 +2696,7 @@ static bool process_maybe_reactive_child(FT ft, FTNODE parent, FTNODE child, int
abort(); abort();
} }
static void inject_message_at_this_blocknum(FT ft, CACHEKEY cachekey, uint32_t fullhash, FT_MSG_S *cmd, size_t flow_deltas[]) static void inject_message_at_this_blocknum(FT ft, CACHEKEY cachekey, uint32_t fullhash, FT_MSG_S *cmd, size_t flow_deltas[], TXNID oldest_referenced_xid)
// Effect: // Effect:
// Inject cmd into the node at this blocknum (cachekey). // Inject cmd into the node at this blocknum (cachekey).
// Gets a write lock on the node for you. // Gets a write lock on the node for you.
...@@ -2689,7 +2708,7 @@ static void inject_message_at_this_blocknum(FT ft, CACHEKEY cachekey, uint32_t f ...@@ -2689,7 +2708,7 @@ static void inject_message_at_this_blocknum(FT ft, CACHEKEY cachekey, uint32_t f
toku_assert_entire_node_in_memory(node); toku_assert_entire_node_in_memory(node);
paranoid_invariant(node->fullhash==fullhash); paranoid_invariant(node->fullhash==fullhash);
ft_verify_flags(ft, node); ft_verify_flags(ft, node);
inject_message_in_locked_node(ft, node, -1, cmd, flow_deltas); inject_message_in_locked_node(ft, node, -1, cmd, flow_deltas, oldest_referenced_xid);
} }
__attribute__((const)) __attribute__((const))
...@@ -2702,7 +2721,17 @@ static inline bool should_inject_in_node(seqinsert_loc loc, int height, int dept ...@@ -2702,7 +2721,17 @@ static inline bool should_inject_in_node(seqinsert_loc loc, int height, int dept
return (height == 0 || (loc == NEITHER_EXTREME && (height <= 1 || depth >= 2))); return (height == 0 || (loc == NEITHER_EXTREME && (height <= 1 || depth >= 2)));
} }
static void push_something_in_subtree(FT ft, FTNODE subtree_root, int target_childnum, FT_MSG_S *cmd, size_t flow_deltas[], int depth, seqinsert_loc loc, bool just_did_split_or_merge) static void push_something_in_subtree(
FT ft,
FTNODE subtree_root,
int target_childnum,
FT_MSG_S *cmd,
size_t flow_deltas[],
TXNID oldest_referenced_xid,
int depth,
seqinsert_loc loc,
bool just_did_split_or_merge
)
// Effects: // Effects:
// Assign cmd an MSN from ft->h. // Assign cmd an MSN from ft->h.
// Put cmd in the subtree rooted at node. Due to promotion the message may not be injected directly in this node. // Put cmd in the subtree rooted at node. Due to promotion the message may not be injected directly in this node.
...@@ -2739,7 +2768,7 @@ static void push_something_in_subtree(FT ft, FTNODE subtree_root, int target_chi ...@@ -2739,7 +2768,7 @@ static void push_something_in_subtree(FT ft, FTNODE subtree_root, int target_chi
default: default:
STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_GT3, 1); break; STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_GT3, 1); break;
} }
inject_message_in_locked_node(ft, subtree_root, target_childnum, cmd, flow_deltas); inject_message_in_locked_node(ft, subtree_root, target_childnum, cmd, flow_deltas, oldest_referenced_xid);
} else { } else {
int r; int r;
int childnum; int childnum;
...@@ -2828,13 +2857,13 @@ static void push_something_in_subtree(FT ft, FTNODE subtree_root, int target_chi ...@@ -2828,13 +2857,13 @@ static void push_something_in_subtree(FT ft, FTNODE subtree_root, int target_chi
struct ftnode_fetch_extra bfe; struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, ft); // should be fully in memory, we just split it fill_bfe_for_full_read(&bfe, ft); // should be fully in memory, we just split it
toku_pin_ftnode_off_client_thread_batched(ft, subtree_root_blocknum, subtree_root_fullhash, &bfe, PL_READ, 0, nullptr, &newparent); toku_pin_ftnode_off_client_thread_batched(ft, subtree_root_blocknum, subtree_root_fullhash, &bfe, PL_READ, 0, nullptr, &newparent);
push_something_in_subtree(ft, newparent, -1, cmd, flow_deltas, depth, loc, true); push_something_in_subtree(ft, newparent, -1, cmd, flow_deltas, oldest_referenced_xid, depth, loc, true);
return; return;
} }
} }
if (next_loc != NEITHER_EXTREME || child->dirty || toku_bnc_should_promote(ft, bnc)) { if (next_loc != NEITHER_EXTREME || child->dirty || toku_bnc_should_promote(ft, bnc)) {
push_something_in_subtree(ft, child, -1, cmd, flow_deltas, depth + 1, next_loc, false); push_something_in_subtree(ft, child, -1, cmd, flow_deltas, oldest_referenced_xid, depth + 1, next_loc, false);
toku_sync_fetch_and_add(&bnc->flow[0], flow_deltas[0]); toku_sync_fetch_and_add(&bnc->flow[0], flow_deltas[0]);
// The recursive call unpinned the child, but // The recursive call unpinned the child, but
// we're responsible for unpinning subtree_root. // we're responsible for unpinning subtree_root.
...@@ -2870,12 +2899,12 @@ static void push_something_in_subtree(FT ft, FTNODE subtree_root, int target_chi ...@@ -2870,12 +2899,12 @@ static void push_something_in_subtree(FT ft, FTNODE subtree_root, int target_chi
default: default:
STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_GT3, 1); break; STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_GT3, 1); break;
} }
inject_message_at_this_blocknum(ft, subtree_root_blocknum, subtree_root_fullhash, cmd, flow_deltas); inject_message_at_this_blocknum(ft, subtree_root_blocknum, subtree_root_fullhash, cmd, flow_deltas, oldest_referenced_xid);
} }
} }
} }
void toku_ft_root_put_cmd(FT ft, FT_MSG_S *cmd) void toku_ft_root_put_cmd(FT ft, FT_MSG_S *cmd, TXNID oldest_referenced_xid)
// Effect: // Effect:
// - assign msn to cmd and update msn in the header // - assign msn to cmd and update msn in the header
// - push the cmd into the ft // - push the cmd into the ft
...@@ -2975,22 +3004,22 @@ void toku_ft_root_put_cmd(FT ft, FT_MSG_S *cmd) ...@@ -2975,22 +3004,22 @@ void toku_ft_root_put_cmd(FT ft, FT_MSG_S *cmd)
// If the root's a leaf or we're injecting a broadcast, drop the read lock and inject here. // If the root's a leaf or we're injecting a broadcast, drop the read lock and inject here.
toku_unpin_ftnode_read_only(ft, node); toku_unpin_ftnode_read_only(ft, node);
STATUS_INC(FT_PRO_NUM_ROOT_H0_INJECT, 1); STATUS_INC(FT_PRO_NUM_ROOT_H0_INJECT, 1);
inject_message_at_this_blocknum(ft, root_key, fullhash, cmd, flow_deltas); inject_message_at_this_blocknum(ft, root_key, fullhash, cmd, flow_deltas, oldest_referenced_xid);
} else if (node->height > 1) { } else if (node->height > 1) {
// If the root's above height 1, we are definitely eligible for promotion. // If the root's above height 1, we are definitely eligible for promotion.
push_something_in_subtree(ft, node, -1, cmd, flow_deltas, 0, LEFT_EXTREME | RIGHT_EXTREME, false); push_something_in_subtree(ft, node, -1, cmd, flow_deltas, oldest_referenced_xid, 0, LEFT_EXTREME | RIGHT_EXTREME, false);
} else { } else {
// The root's height 1. We may be eligible for promotion here. // The root's height 1. We may be eligible for promotion here.
// On the extremes, we want to promote, in the middle, we don't. // On the extremes, we want to promote, in the middle, we don't.
int childnum = toku_ftnode_which_child(node, cmd->u.id.key, &ft->cmp_descriptor, ft->compare_fun); int childnum = toku_ftnode_which_child(node, cmd->u.id.key, &ft->cmp_descriptor, ft->compare_fun);
if (childnum == 0 || childnum == node->n_children - 1) { if (childnum == 0 || childnum == node->n_children - 1) {
// On the extremes, promote. We know which childnum we're going to, so pass that down too. // On the extremes, promote. We know which childnum we're going to, so pass that down too.
push_something_in_subtree(ft, node, childnum, cmd, flow_deltas, 0, LEFT_EXTREME | RIGHT_EXTREME, false); push_something_in_subtree(ft, node, childnum, cmd, flow_deltas, oldest_referenced_xid, 0, LEFT_EXTREME | RIGHT_EXTREME, false);
} else { } else {
// At height 1 in the middle, don't promote, drop the read lock and inject here. // At height 1 in the middle, don't promote, drop the read lock and inject here.
toku_unpin_ftnode_read_only(ft, node); toku_unpin_ftnode_read_only(ft, node);
STATUS_INC(FT_PRO_NUM_ROOT_H1_INJECT, 1); STATUS_INC(FT_PRO_NUM_ROOT_H1_INJECT, 1);
inject_message_at_this_blocknum(ft, root_key, fullhash, cmd, flow_deltas); inject_message_at_this_blocknum(ft, root_key, fullhash, cmd, flow_deltas, oldest_referenced_xid);
} }
} }
} }
...@@ -3053,7 +3082,7 @@ void toku_ft_optimize (FT_HANDLE brt) { ...@@ -3053,7 +3082,7 @@ void toku_ft_optimize (FT_HANDLE brt) {
toku_init_dbt(&key); toku_init_dbt(&key);
toku_init_dbt(&val); toku_init_dbt(&val);
FT_MSG_S ftcmd = { FT_OPTIMIZE, ZERO_MSN, message_xids, .u = { .id = {&key,&val} } }; FT_MSG_S ftcmd = { FT_OPTIMIZE, ZERO_MSN, message_xids, .u = { .id = {&key,&val} } };
toku_ft_root_put_cmd(brt->ft, &ftcmd); toku_ft_root_put_cmd(brt->ft, &ftcmd, TXNID_NONE);
xids_destroy(&message_xids); xids_destroy(&message_xids);
} }
} }
...@@ -3127,7 +3156,7 @@ void toku_ft_maybe_insert (FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool ...@@ -3127,7 +3156,7 @@ void toku_ft_maybe_insert (FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool
if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) { if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
// do nothing // do nothing
} else { } else {
toku_ft_send_insert(ft_h, key, val, message_xids, type); toku_ft_send_insert(ft_h, key, val, message_xids, type, txn->oldest_referenced_xid);
} }
} }
...@@ -3136,7 +3165,7 @@ ft_send_update_msg(FT_HANDLE brt, FT_MSG_S *msg, TOKUTXN txn) { ...@@ -3136,7 +3165,7 @@ ft_send_update_msg(FT_HANDLE brt, FT_MSG_S *msg, TOKUTXN txn) {
msg->xids = (txn msg->xids = (txn
? toku_txn_get_xids(txn) ? toku_txn_get_xids(txn)
: xids_get_root_xids()); : xids_get_root_xids());
toku_ft_root_put_cmd(brt->ft, msg); toku_ft_root_put_cmd(brt->ft, msg, txn->oldest_referenced_xid);
} }
void toku_ft_maybe_update(FT_HANDLE ft_h, const DBT *key, const DBT *update_function_extra, void toku_ft_maybe_update(FT_HANDLE ft_h, const DBT *key, const DBT *update_function_extra,
...@@ -3205,15 +3234,15 @@ void toku_ft_maybe_update_broadcast(FT_HANDLE ft_h, const DBT *update_function_e ...@@ -3205,15 +3234,15 @@ void toku_ft_maybe_update_broadcast(FT_HANDLE ft_h, const DBT *update_function_e
} }
} }
void toku_ft_send_insert(FT_HANDLE brt, DBT *key, DBT *val, XIDS xids, enum ft_msg_type type) { void toku_ft_send_insert(FT_HANDLE brt, DBT *key, DBT *val, XIDS xids, enum ft_msg_type type, TXNID oldest_referenced_xid) {
FT_MSG_S ftcmd = { type, ZERO_MSN, xids, .u = { .id = { key, val } } }; FT_MSG_S ftcmd = { type, ZERO_MSN, xids, .u = { .id = { key, val } } };
toku_ft_root_put_cmd(brt->ft, &ftcmd); toku_ft_root_put_cmd(brt->ft, &ftcmd, oldest_referenced_xid);
} }
void toku_ft_send_commit_any(FT_HANDLE brt, DBT *key, XIDS xids) { void toku_ft_send_commit_any(FT_HANDLE brt, DBT *key, XIDS xids, TXNID oldest_referenced_xid) {
DBT val; DBT val;
FT_MSG_S ftcmd = { FT_COMMIT_ANY, ZERO_MSN, xids, .u = { .id = { key, toku_init_dbt(&val) } } }; FT_MSG_S ftcmd = { FT_COMMIT_ANY, ZERO_MSN, xids, .u = { .id = { key, toku_init_dbt(&val) } } };
toku_ft_root_put_cmd(brt->ft, &ftcmd); toku_ft_root_put_cmd(brt->ft, &ftcmd, oldest_referenced_xid);
} }
void toku_ft_delete(FT_HANDLE brt, DBT *key, TOKUTXN txn) { void toku_ft_delete(FT_HANDLE brt, DBT *key, TOKUTXN txn) {
...@@ -3269,14 +3298,14 @@ void toku_ft_maybe_delete(FT_HANDLE ft_h, DBT *key, TOKUTXN txn, bool oplsn_vali ...@@ -3269,14 +3298,14 @@ void toku_ft_maybe_delete(FT_HANDLE ft_h, DBT *key, TOKUTXN txn, bool oplsn_vali
if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) { if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
// do nothing // do nothing
} else { } else {
toku_ft_send_delete(ft_h, key, message_xids); toku_ft_send_delete(ft_h, key, message_xids, txn->oldest_referenced_xid);
} }
} }
void toku_ft_send_delete(FT_HANDLE brt, DBT *key, XIDS xids) { void toku_ft_send_delete(FT_HANDLE brt, DBT *key, XIDS xids, TXNID oldest_referenced_xid) {
DBT val; toku_init_dbt(&val); DBT val; toku_init_dbt(&val);
FT_MSG_S ftcmd = { FT_DELETE_ANY, ZERO_MSN, xids, .u = { .id = { key, &val } } }; FT_MSG_S ftcmd = { FT_DELETE_ANY, ZERO_MSN, xids, .u = { .id = { key, &val } } };
toku_ft_root_put_cmd(brt->ft, &ftcmd); toku_ft_root_put_cmd(brt->ft, &ftcmd, oldest_referenced_xid);
} }
/* mempool support */ /* mempool support */
...@@ -4143,6 +4172,7 @@ do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, struct fifo_entry *entry, uint64_t ...@@ -4143,6 +4172,7 @@ do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, struct fifo_entry *entry, uint64_t
&t->ft->cmp_descriptor, &t->ft->cmp_descriptor,
bn, bn,
&ftcmd, &ftcmd,
TXNID_NONE,
workdone, workdone,
stats_to_update stats_to_update
); );
......
...@@ -154,9 +154,9 @@ void toku_ft_delete (FT_HANDLE brt, DBT *k, TOKUTXN txn); ...@@ -154,9 +154,9 @@ void toku_ft_delete (FT_HANDLE brt, DBT *k, TOKUTXN txn);
// Effect: Delete a key from a brt if the oplsn is newer than the brt lsn. This function is called during recovery. // Effect: Delete a key from a brt if the oplsn is newer than the brt lsn. This function is called during recovery.
void toku_ft_maybe_delete (FT_HANDLE brt, DBT *k, TOKUTXN txn, bool oplsn_valid, LSN oplsn, bool do_logging); void toku_ft_maybe_delete (FT_HANDLE brt, DBT *k, TOKUTXN txn, bool oplsn_valid, LSN oplsn, bool do_logging);
void toku_ft_send_insert(FT_HANDLE brt, DBT *key, DBT *val, XIDS xids, enum ft_msg_type type); void toku_ft_send_insert(FT_HANDLE brt, DBT *key, DBT *val, XIDS xids, enum ft_msg_type type, TXNID oldest_referenced_xid);
void toku_ft_send_delete(FT_HANDLE brt, DBT *key, XIDS xids); void toku_ft_send_delete(FT_HANDLE brt, DBT *key, XIDS xids, TXNID oldest_referenced_xid);
void toku_ft_send_commit_any(FT_HANDLE brt, DBT *key, XIDS xids); void toku_ft_send_commit_any(FT_HANDLE brt, DBT *key, XIDS xids, TXNID oldest_referenced_xids);
int toku_close_ft_handle_nolsn (FT_HANDLE, char **error_string) __attribute__ ((warn_unused_result)); int toku_close_ft_handle_nolsn (FT_HANDLE, char **error_string) __attribute__ ((warn_unused_result));
......
...@@ -146,6 +146,7 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE brt, BLOCKNUM blocknum, const char ...@@ -146,6 +146,7 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE brt, BLOCKNUM blocknum, const char
&cmd, &cmd,
true, true,
zero_flow_deltas, zero_flow_deltas,
TXNID_NONE,
NULL NULL
); );
......
...@@ -2705,7 +2705,7 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int ...@@ -2705,7 +2705,7 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int
.xids = lbuf->xids, .xids = lbuf->xids,
.u = { .id = { &thekey, &theval } } }; .u = { .id = { &thekey, &theval } } };
uint64_t workdone=0; uint64_t workdone=0;
toku_ft_bn_apply_cmd_once(BLB(leafnode,0), &cmd, idx, NULL, &workdone, stats_to_update); toku_ft_bn_apply_cmd_once(BLB(leafnode,0), &cmd, idx, NULL, TXNID_NONE, &workdone, stats_to_update);
} }
static int write_literal(struct dbout *out, void*data, size_t len) { static int write_literal(struct dbout *out, void*data, size_t len) {
......
...@@ -152,6 +152,7 @@ struct tokutxn { ...@@ -152,6 +152,7 @@ struct tokutxn {
DB_TXN *container_db_txn; // reference to DB_TXN that contains this tokutxn DB_TXN *container_db_txn; // reference to DB_TXN that contains this tokutxn
xid_omt_t *live_root_txn_list; // the root txns live when the root ancestor (self if a root) started. xid_omt_t *live_root_txn_list; // the root txns live when the root ancestor (self if a root) started.
XIDS xids; // Represents the xid list XIDS xids; // Represents the xid list
TXNID oldest_referenced_xid;
bool begin_was_logged; bool begin_was_logged;
// These are not read until a commit, prepare, or abort starts, and // These are not read until a commit, prepare, or abort starts, and
......
...@@ -177,7 +177,7 @@ static int do_insertion (enum ft_msg_type type, FILENUM filenum, BYTESTRING key, ...@@ -177,7 +177,7 @@ static int do_insertion (enum ft_msg_type type, FILENUM filenum, BYTESTRING key,
? toku_fill_dbt(&data_dbt, data->data, data->len) ? toku_fill_dbt(&data_dbt, data->data, data->len)
: toku_init_dbt(&data_dbt) } } }; : toku_init_dbt(&data_dbt) } } };
toku_ft_root_put_cmd(h, &ftcmd); toku_ft_root_put_cmd(h, &ftcmd, txn->oldest_referenced_xid);
if (reset_root_xid_that_created) { if (reset_root_xid_that_created) {
TXNID new_root_xid_that_created = xids_get_outermost_xid(xids); TXNID new_root_xid_that_created = xids_get_outermost_xid(xids);
toku_reset_root_xid_that_created(h, new_root_xid_that_created); toku_reset_root_xid_that_created(h, new_root_xid_that_created);
......
...@@ -40,7 +40,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) ...@@ -40,7 +40,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node // apply an insert to the leaf node
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u = {.id = { &thekey, &theval }} }; FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u = {.id = { &thekey, &theval }} };
toku_ft_bn_apply_cmd_once(BLB(leafnode,0), &cmd, idx, NULL, NULL, NULL); toku_ft_bn_apply_cmd_once(BLB(leafnode,0), &cmd, idx, NULL, TXNID_NONE, NULL, NULL);
leafnode->max_msn_applied_to_node_on_disk = msn; leafnode->max_msn_applied_to_node_on_disk = msn;
......
...@@ -48,7 +48,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va ...@@ -48,7 +48,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
brt->ft->h->max_msn_in_ft = msn; brt->ft->h->max_msn_in_ft = msn;
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} }; FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd, nullptr, nullptr); toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd, TXNID_NONE, nullptr, nullptr);
{ {
int r = toku_ft_lookup(brt, &thekey, lookup_checkf, &pair); int r = toku_ft_lookup(brt, &thekey, lookup_checkf, &pair);
assert(r==0); assert(r==0);
...@@ -56,7 +56,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va ...@@ -56,7 +56,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
} }
FT_MSG_S badcmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval }} }; FT_MSG_S badcmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval }} };
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &badcmd, nullptr, nullptr); toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &badcmd, TXNID_NONE, nullptr, nullptr);
// message should be rejected for duplicate msn, row should still have original val // message should be rejected for duplicate msn, row should still have original val
{ {
...@@ -69,7 +69,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va ...@@ -69,7 +69,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
msn = next_dummymsn(); msn = next_dummymsn();
brt->ft->h->max_msn_in_ft = msn; brt->ft->h->max_msn_in_ft = msn;
FT_MSG_S cmd2 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &val2 }} }; FT_MSG_S cmd2 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &val2 }} };
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd2, nullptr, nullptr); toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd2, TXNID_NONE, nullptr, nullptr);
// message should be accepted, val should have new value // message should be accepted, val should have new value
{ {
...@@ -81,7 +81,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va ...@@ -81,7 +81,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
// now verify that message with lesser (older) msn is rejected // now verify that message with lesser (older) msn is rejected
msn.msn = msn.msn - 10; msn.msn = msn.msn - 10;
FT_MSG_S cmd3 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval } }}; FT_MSG_S cmd3 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval } }};
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd3, nullptr, nullptr); toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd3, TXNID_NONE, nullptr, nullptr);
// message should be rejected, val should still have value in pair2 // message should be rejected, val should still have value in pair2
{ {
......
...@@ -123,9 +123,9 @@ insert_random_message_to_bn(FT_HANDLE t, BASEMENTNODE blb, LEAFENTRY *save, XIDS ...@@ -123,9 +123,9 @@ insert_random_message_to_bn(FT_HANDLE t, BASEMENTNODE blb, LEAFENTRY *save, XIDS
msg.u.id.val = valdbt; msg.u.id.val = valdbt;
size_t memsize; size_t memsize;
int64_t numbytes; int64_t numbytes;
int r = apply_msg_to_leafentry(&msg, NULL, &memsize, save, NULL, NULL, NULL, &numbytes); int r = apply_msg_to_leafentry(&msg, NULL, TXNID_NONE, &memsize, save, NULL, NULL, NULL, &numbytes);
assert_zero(r); assert_zero(r);
toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb, &msg, NULL, NULL); toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb, &msg, TXNID_NONE, NULL, NULL);
if (msn.msn > blb->max_msn_applied.msn) { if (msn.msn > blb->max_msn_applied.msn) {
blb->max_msn_applied = msn; blb->max_msn_applied = msn;
} }
...@@ -164,13 +164,13 @@ insert_same_message_to_bns(FT_HANDLE t, BASEMENTNODE blb1, BASEMENTNODE blb2, LE ...@@ -164,13 +164,13 @@ insert_same_message_to_bns(FT_HANDLE t, BASEMENTNODE blb1, BASEMENTNODE blb2, LE
msg.u.id.val = valdbt; msg.u.id.val = valdbt;
size_t memsize; size_t memsize;
int64_t numbytes; int64_t numbytes;
int r = apply_msg_to_leafentry(&msg, NULL, &memsize, save, NULL, NULL, NULL, &numbytes); int r = apply_msg_to_leafentry(&msg, NULL, TXNID_NONE, &memsize, save, NULL, NULL, NULL, &numbytes);
assert_zero(r); assert_zero(r);
toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb1, &msg, NULL, NULL); toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb1, &msg, TXNID_NONE, NULL, NULL);
if (msn.msn > blb1->max_msn_applied.msn) { if (msn.msn > blb1->max_msn_applied.msn) {
blb1->max_msn_applied = msn; blb1->max_msn_applied = msn;
} }
toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb2, &msg, NULL, NULL); toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb2, &msg, TXNID_NONE, NULL, NULL);
if (msn.msn > blb2->max_msn_applied.msn) { if (msn.msn > blb2->max_msn_applied.msn) {
blb2->max_msn_applied = msn; blb2->max_msn_applied = msn;
} }
...@@ -580,7 +580,7 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) { ...@@ -580,7 +580,7 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) {
if (make_leaf_up_to_date) { if (make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) { for (i = 0; i < num_parent_messages; ++i) {
if (!parent_messages_is_fresh[i]) { if (!parent_messages_is_fresh[i]) {
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], NULL, NULL); toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], TXNID_NONE, NULL, NULL);
} }
} }
for (i = 0; i < 8; ++i) { for (i = 0; i < 8; ++i) {
...@@ -803,7 +803,7 @@ flush_to_leaf_with_keyrange(FT_HANDLE t, bool make_leaf_up_to_date) { ...@@ -803,7 +803,7 @@ flush_to_leaf_with_keyrange(FT_HANDLE t, bool make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) { for (i = 0; i < num_parent_messages; ++i) {
if (dummy_cmp(NULL, parent_messages[i]->u.id.key, &childkeys[7]) <= 0 && if (dummy_cmp(NULL, parent_messages[i]->u.id.key, &childkeys[7]) <= 0 &&
!parent_messages_is_fresh[i]) { !parent_messages_is_fresh[i]) {
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], NULL, NULL); toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], TXNID_NONE, NULL, NULL);
} }
} }
for (i = 0; i < 8; ++i) { for (i = 0; i < 8; ++i) {
...@@ -995,8 +995,8 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) { ...@@ -995,8 +995,8 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) {
if (make_leaf_up_to_date) { if (make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) { for (i = 0; i < num_parent_messages; ++i) {
if (!parent_messages_is_fresh[i]) { if (!parent_messages_is_fresh[i]) {
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child1, -1, parent_messages[i], NULL, NULL); toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child1, -1, parent_messages[i], TXNID_NONE, NULL, NULL);
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child2, -1, parent_messages[i], NULL, NULL); toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child2, -1, parent_messages[i], TXNID_NONE, NULL, NULL);
} }
} }
for (i = 0; i < 8; ++i) { for (i = 0; i < 8; ++i) {
......
...@@ -398,6 +398,7 @@ test_le_apply(ULE ule_initial, FT_MSG msg, ULE ule_expected) { ...@@ -398,6 +398,7 @@ test_le_apply(ULE ule_initial, FT_MSG msg, ULE ule_expected) {
int64_t ignoreme; int64_t ignoreme;
r = apply_msg_to_leafentry(msg, r = apply_msg_to_leafentry(msg,
le_initial, le_initial,
TXNID_NONE,
&result_memsize, &result_memsize,
&le_result, &le_result,
NULL, NULL,
......
...@@ -43,7 +43,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) ...@@ -43,7 +43,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node // apply an insert to the leaf node
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} }; FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, NULL, NULL); toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, TXNID_NONE, NULL, NULL);
// Create bad tree (don't do following): // Create bad tree (don't do following):
// leafnode->max_msn_applied_to_node = msn; // leafnode->max_msn_applied_to_node = msn;
......
...@@ -31,7 +31,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) ...@@ -31,7 +31,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} }; FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, NULL, NULL); toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, TXNID_NONE, NULL, NULL);
// dont forget to dirty the node // dont forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -32,7 +32,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) ...@@ -32,7 +32,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} }; FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, NULL, NULL); toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, TXNID_NONE, NULL, NULL);
// dont forget to dirty the node // dont forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -31,7 +31,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) ...@@ -31,7 +31,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} }; FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, NULL, NULL); toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, TXNID_NONE, NULL, NULL);
// dont forget to dirty the node // dont forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -32,7 +32,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) ...@@ -32,7 +32,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} }; FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
toku_ft_bn_apply_cmd_once(BLB(leafnode,0), &cmd, idx, NULL, NULL, NULL); toku_ft_bn_apply_cmd_once(BLB(leafnode,0), &cmd, idx, NULL, TXNID_NONE, NULL, NULL);
// dont forget to dirty the node // dont forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -34,7 +34,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) ...@@ -34,7 +34,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} }; FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, NULL, NULL); toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, TXNID_NONE, NULL, NULL);
// dont forget to dirty the node // dont forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -31,7 +31,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) ...@@ -31,7 +31,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} }; FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, NULL, NULL); toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, TXNID_NONE, NULL, NULL);
// dont forget to dirty the node // dont forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -151,6 +151,7 @@ void toku_txn_create_txn ( ...@@ -151,6 +151,7 @@ void toku_txn_create_txn (
.container_db_txn = container_db_txn, .container_db_txn = container_db_txn,
.live_root_txn_list = nullptr, .live_root_txn_list = nullptr,
.xids = xids, .xids = xids,
.oldest_referenced_xid = TXNID_NONE,
.begin_was_logged = false, .begin_was_logged = false,
.do_fsync = false, .do_fsync = false,
.force_fsync_on_commit = false, .force_fsync_on_commit = false,
......
...@@ -255,6 +255,23 @@ max_xid(TXNID a, TXNID b) { ...@@ -255,6 +255,23 @@ max_xid(TXNID a, TXNID b) {
return a < b ? b : a; return a < b ? b : a;
} }
static TXNID get_oldest_referenced_xid_unlocked(TXN_MANAGER txn_manager) {
TXNID oldest_referenced_xid = TXNID_NONE_LIVING;
int r = txn_manager->live_root_txns.fetch(0, &oldest_referenced_xid);
// this function should only be called when we know there is at least
// one live transaction
invariant_zero(r);
struct referenced_xid_tuple* tuple;
if (txn_manager->referenced_xids.size() > 0) {
r = txn_manager->referenced_xids.fetch(0, &tuple);
if (r == 0 && tuple->begin_id < oldest_referenced_xid) {
oldest_referenced_xid = tuple->begin_id;
}
}
return oldest_referenced_xid;
}
int toku_txn_manager_start_txn( int toku_txn_manager_start_txn(
TOKUTXN *txnp, TOKUTXN *txnp,
TXN_MANAGER txn_manager, TXN_MANAGER txn_manager,
...@@ -318,6 +335,7 @@ int toku_txn_manager_start_txn( ...@@ -318,6 +335,7 @@ int toku_txn_manager_start_txn(
lazy_assert_zero(r); lazy_assert_zero(r);
} }
{ {
// //
// maintain the data structures necessary for MVCC: // maintain the data structures necessary for MVCC:
...@@ -342,6 +360,7 @@ int toku_txn_manager_start_txn( ...@@ -342,6 +360,7 @@ int toku_txn_manager_start_txn(
} }
r = txn_manager->live_root_txns.insert_at(txn->txnid64, idx); r = txn_manager->live_root_txns.insert_at(txn->txnid64, idx);
} }
txn->oldest_referenced_xid = get_oldest_referenced_xid_unlocked(txn_manager);
// setup information for snapshot reads // setup information for snapshot reads
if (txn->snapshot_type != TXN_SNAPSHOT_NONE) { if (txn->snapshot_type != TXN_SNAPSHOT_NONE) {
......
...@@ -207,6 +207,42 @@ xid_reads_committed_xid(TXNID tl1, TXNID xc, const xid_omt_t &snapshot_txnids, c ...@@ -207,6 +207,42 @@ xid_reads_committed_xid(TXNID tl1, TXNID xc, const xid_omt_t &snapshot_txnids, c
return rval; return rval;
} }
//
// This function does some simple garbage collection given a TXNID known
// to be the oldest referenced xid, that is, the oldest xid in any live list.
// We find the youngest entry in the stack with an xid less
// than oldest_referenced_xid. All elements below this entry are garbage,
// so we get rid of them.
//
static void
simple_garbage_collection(ULE ule, TXNID oldest_referenced_xid) {
uint32_t curr_index = 0;
uint32_t num_entries;
if (ule->num_cuxrs == 1 || oldest_referenced_xid == TXNID_NONE) {
goto done;
}
// starting at the top of the committed stack, find the first
// uxr with a txnid that is less than oldest_referenced_xid
for (uint32_t i = 0; i < ule->num_cuxrs; i++) {
curr_index = ule->num_cuxrs - i - 1;
if (ule->uxrs[curr_index].xid < oldest_referenced_xid) {
break;
}
}
// curr_index is now set to the youngest uxr older than oldest_referenced_xid
if (curr_index == 0) {
goto done;
}
// now get rid of the entries below curr_index
num_entries = ule->num_cuxrs + ule->num_puxrs - curr_index;
memmove(&ule->uxrs[0], &ule->uxrs[curr_index], num_entries * sizeof(ule->uxrs[0]));
ule->uxrs[0].xid = TXNID_NONE; //New 'bottom of stack' loses its TXNID
ule->num_cuxrs -= curr_index;
done:;
}
static void static void
garbage_collection(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &referenced_xids, const xid_omt_t &live_root_txns) { garbage_collection(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &referenced_xids, const xid_omt_t &live_root_txns) {
if (ule->num_cuxrs == 1) goto done; if (ule->num_cuxrs == 1) goto done;
...@@ -320,6 +356,7 @@ done:; ...@@ -320,6 +356,7 @@ done:;
int int
apply_msg_to_leafentry(FT_MSG msg, // message to apply to leafentry apply_msg_to_leafentry(FT_MSG msg, // message to apply to leafentry
LEAFENTRY old_leafentry, // NULL if there was no stored data. LEAFENTRY old_leafentry, // NULL if there was no stored data.
TXNID oldest_referenced_xid,
size_t *new_leafentry_memorysize, size_t *new_leafentry_memorysize,
LEAFENTRY *new_leafentry_p, LEAFENTRY *new_leafentry_p,
OMT *omtp, OMT *omtp,
...@@ -338,6 +375,7 @@ apply_msg_to_leafentry(FT_MSG msg, // message to apply to leafentry ...@@ -338,6 +375,7 @@ apply_msg_to_leafentry(FT_MSG msg, // message to apply to leafentry
oldnumbytes = ule_get_innermost_numbytes(&ule); oldnumbytes = ule_get_innermost_numbytes(&ule);
} }
msg_modify_ule(&ule, msg); // modify unpacked leafentry msg_modify_ule(&ule, msg); // modify unpacked leafentry
simple_garbage_collection(&ule, oldest_referenced_xid);
rval = le_pack(&ule, // create packed leafentry rval = le_pack(&ule, // create packed leafentry
new_leafentry_memorysize, new_leafentry_memorysize,
new_leafentry_p, new_leafentry_p,
......
...@@ -54,6 +54,7 @@ void fast_msg_to_leafentry( ...@@ -54,6 +54,7 @@ void fast_msg_to_leafentry(
int apply_msg_to_leafentry(FT_MSG msg, int apply_msg_to_leafentry(FT_MSG msg,
LEAFENTRY old_leafentry, // NULL if there was no stored data. LEAFENTRY old_leafentry, // NULL if there was no stored data.
TXNID oldest_referenced_xid,
size_t *new_leafentry_memorysize, size_t *new_leafentry_memorysize,
LEAFENTRY *new_leafentry_p, LEAFENTRY *new_leafentry_p,
OMT *omtp, OMT *omtp,
......
...@@ -509,7 +509,7 @@ indexer_ft_delete_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xi ...@@ -509,7 +509,7 @@ indexer_ft_delete_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xi
result = toku_ydb_check_avail_fs_space(indexer->i->env); result = toku_ydb_check_avail_fs_space(indexer->i->env);
if (result == 0) { if (result == 0) {
toku_multi_operation_client_lock(); toku_multi_operation_client_lock();
toku_ft_send_delete(db_struct_i(hotdb)->ft_handle, hotkey, xids); toku_ft_send_delete(db_struct_i(hotdb)->ft_handle, hotkey, xids, TXNID_NONE);
toku_multi_operation_client_unlock(); toku_multi_operation_client_unlock();
} }
} }
...@@ -549,7 +549,7 @@ indexer_ft_insert_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *ho ...@@ -549,7 +549,7 @@ indexer_ft_insert_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *ho
result = toku_ydb_check_avail_fs_space(indexer->i->env); result = toku_ydb_check_avail_fs_space(indexer->i->env);
if (result == 0) { if (result == 0) {
toku_multi_operation_client_lock(); toku_multi_operation_client_lock();
toku_ft_send_insert(db_struct_i(hotdb)->ft_handle, hotkey, hotval, xids, FT_INSERT); toku_ft_send_insert(db_struct_i(hotdb)->ft_handle, hotkey, hotval, xids, FT_INSERT, TXNID_NONE);
toku_multi_operation_client_unlock(); toku_multi_operation_client_unlock();
} }
} }
...@@ -570,7 +570,7 @@ indexer_ft_commit(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids) { ...@@ -570,7 +570,7 @@ indexer_ft_commit(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids) {
} else { } else {
result = toku_ydb_check_avail_fs_space(indexer->i->env); result = toku_ydb_check_avail_fs_space(indexer->i->env);
if (result == 0) if (result == 0)
toku_ft_send_commit_any(db_struct_i(hotdb)->ft_handle, hotkey, xids); toku_ft_send_commit_any(db_struct_i(hotdb)->ft_handle, hotkey, xids, TXNID_NONE);
} }
} }
return result; return result;
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: test_cursor_2.cc 45903 2012-07-19 13:06:39Z leifwalsh $"
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#include "test.h"
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <memory.h>
#include <errno.h>
#include <sys/stat.h>
#include <db.h>
//
// This test ensures that we can do many updates to a single key when the dictionary
// is just that key.
//
static void
run_test (void) {
DB_ENV * env;
DB *db;
const char * const fname = "test.updates_single_key.ft_handle";
int r;
r = db_env_create(&env, 0); assert(r == 0);
env->set_errfile(env, stderr);
// no need to run with logging, so DB_INIT_LOG not passed in
r = env->open(env, ENVDIR, DB_CREATE | DB_PRIVATE | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = db_create(&db, env, 0); assert(r == 0);
db->set_errfile(db,stderr); // Turn off those annoying errors
r = db->open(db, NULL, fname, "main", DB_BTREE, DB_CREATE, 0666); assert(r == 0);
int i;
for (i=0; i<1000000; i++) {
int k = 1;
int v = i;
DBT key, val;
DB_TXN* txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
// want this test to go as fast as possible, so no need to use the lock tree
// we just care to see that #5700 is behaving better, that some garbage collection is happening
r = db->put(db, txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), DB_PRELOCKED_WRITE);
txn->commit(txn, DB_TXN_NOSYNC);
CKERR(r);
}
r = db->close(db, 0); assert(r == 0);
r = env->close(env, 0); assert(r == 0);
}
int
test_main(int argc, char *const argv[]) {
parse_args(argc, argv);
int r;
r = system("rm -rf " ENVDIR);
CKERR(r);
toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
run_test();
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment