Commit 4ed966c1 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:3740], fix bug

git-svn-id: file:///svn/toku/tokudb@32939 c7de825b-a66e-492c-adef-691d508d4ae1
parent 1dd96b44
......@@ -660,8 +660,7 @@ brt_leaf_apply_cmd_once (
uint64_t *workdonep
);
void
toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, bool *made_change, uint64_t *workdonep);
void toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, bool *made_change, ANCESTORS ancestors, uint64_t *workdone);
void toku_reset_root_xid_that_created(BRT brt, TXNID new_root_xid_that_created);
// Reset the root_xid_that_created field to the given value.
......
......@@ -278,30 +278,6 @@ static void maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTO
static long brtnode_memory_size (BRTNODE node);
int toku_pin_brtnode_if_clean(
BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
ANCESTORS ancestors, struct pivot_bounds const * const bounds,
BRTNODE *node_p
)
{
void *node_v;
int r = toku_cachetable_get_and_pin_if_in_memory(
brt->cf,
blocknum,
fullhash,
&node_v
); // this one doesn't need to use the toku_pin_brtnode function because it doesn't bring anything in, so it cannot create a non-up-to-date leaf node.
if (r==0) {
BRTNODE node = node_v;
if (node->dsn.dsn == INVALID_DSN.dsn) {
set_new_DSN_for_node(node, brt);
}
maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds);
*node_p = node;
}
return r;
}
int toku_pin_brtnode (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
UNLOCKERS unlockers,
ANCESTORS ancestors, struct pivot_bounds const * const bounds,
......@@ -2846,17 +2822,40 @@ brtnode_nonleaf_put_cmd_at_root (BRT t, BRTNODE node, BRT_MSG cmd)
brt_nonleaf_put_cmd(t, node, cmd);
}
static BOOL
partition_requires_msg_application(BRTNODE node, int childnum, ANCESTORS ancestors) {
BOOL requires_msg_application = FALSE;
if (BP_STATE(node,childnum) != PT_AVAIL) return FALSE;
for (
ANCESTORS curr_ancestors = ancestors;
curr_ancestors;
curr_ancestors = curr_ancestors->next
)
{
if (curr_ancestors->node->dsn.dsn > BLB_MAX_DSN_APPLIED(node,childnum).dsn) {
requires_msg_application = TRUE;
break;
}
}
return requires_msg_application;
}
// Effect: applies the cmd to the leaf if the appropriate basement node is in memory.
// If the appropriate basement node is not in memory, then nothing gets applied
// If the appropriate basement node must be in memory, it is the caller's responsibility to ensure
// that it is
void toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, bool *made_change, uint64_t *workdone) {
void toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, bool *made_change, ANCESTORS ancestors, uint64_t *workdone) {
VERIFY_NODE(t, node);
// ignore messages that have already been applied to this leaf
if (brt_msg_applies_once(cmd)) {
unsigned int childnum = toku_brtnode_which_child(node, cmd->u.id.key, t);
if (BP_STATE(node,childnum) == PT_AVAIL) {
BOOL req_msg_app = partition_requires_msg_application(node, childnum, ancestors);
// only apply the message if we have an available basement node that is up to date
// we know it is up to date if partition_requires_msg_application returns FALSE
if (BP_STATE(node,childnum) == PT_AVAIL && !req_msg_app) {
brt_leaf_put_cmd(t,
BLB(node, childnum),
&BP_SUBTREE_EST(node, childnum),
......@@ -2869,7 +2868,10 @@ void toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, bool *made_change,
else if (brt_msg_applies_all(cmd)) {
bool bn_made_change = false;
for (int childnum=0; childnum<node->n_children; childnum++) {
if (BP_STATE(node,childnum) == PT_AVAIL) {
BOOL req_msg_app = partition_requires_msg_application(node, childnum, ancestors);
// only apply the message if we have an available basement node that is up to date
// we know it is up to date if partition_requires_msg_application returns FALSE
if (BP_STATE(node,childnum) == PT_AVAIL && !req_msg_app) {
brt_leaf_put_cmd(
t,
BLB(node, childnum),
......@@ -2916,7 +2918,7 @@ static void push_something_at_root (BRT brt, BRTNODE *nodep, BRT_MSG cmd)
bool made_dirty = 0;
uint64_t workdone_ignore = 0; // ignore workdone for root-leaf node
// not up to date, which means the get_and_pin actually fetched it into memory.
toku_apply_cmd_to_leaf(brt, node, cmd, &made_dirty, &workdone_ignore);
toku_apply_cmd_to_leaf(brt, node, cmd, &made_dirty, NULL, &workdone_ignore);
node->dirty = 1;
MSN cmd_msn = cmd->msn;
invariant(cmd_msn.msn > node->max_msn_applied_to_node_on_disk.msn);
......@@ -2955,6 +2957,7 @@ static u_int32_t get_roothash (BRT brt) {
return rh->fullhash;
}
static void apply_cmd_to_in_memory_non_root_leaves (
BRT t,
CACHEKEY nodenum,
......@@ -3002,7 +3005,7 @@ static void apply_cmd_to_in_memory_non_root_leaves_starting_at_node (BRT t,
else {
invariant(!is_root);
bool made_change;
toku_apply_cmd_to_leaf(t, node, cmd, &made_change, workdone);
toku_apply_cmd_to_leaf(t, node, cmd, &made_change, ancestors, workdone);
}
if (parent) {
......@@ -3026,17 +3029,18 @@ static void apply_cmd_to_in_memory_non_root_leaves (
)
{
BRTNODE node = NULL;
int r = toku_pin_brtnode_if_clean(
t,
void *node_v;
int r = toku_cachetable_get_and_pin_if_in_memory(
t->cf,
nodenum,
fullhash,
ancestors,
bounds,
&node
&node_v
);
if (r) { goto exit; }
node = node_v;
apply_cmd_to_in_memory_non_root_leaves_starting_at_node(t, node, cmd, FALSE, parent, parents_childnum, ancestors, bounds, workdone);
toku_unpin_brtnode(t, node);
......@@ -5190,24 +5194,6 @@ apply_ancestors_messages_to_leafnode_and_maybe_flush (BRT t, BASEMENTNODE bm, SU
}
*/
static BOOL
partition_requires_msg_application(BRTNODE node, int childnum, ANCESTORS ancestors) {
BOOL requires_msg_application = FALSE;
if (BP_STATE(node,childnum) != PT_AVAIL) return FALSE;
for (
ANCESTORS curr_ancestors = ancestors;
curr_ancestors;
curr_ancestors = curr_ancestors->next
)
{
if (curr_ancestors->node->dsn.dsn > BLB_MAX_DSN_APPLIED(node,childnum).dsn) {
requires_msg_application = TRUE;
break;
}
}
return requires_msg_application;
}
static void
maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds)
// Effect:
......
......@@ -46,7 +46,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
bool made_change;
u_int64_t workdone=0;
toku_apply_cmd_to_leaf(brt, leafnode, &cmd, &made_change, &workdone);
toku_apply_cmd_to_leaf(brt, leafnode, &cmd, &made_change, NULL, &workdone);
{
int r = toku_brt_lookup(brt, &thekey, lookup_checkf, &pair);
assert(r==0);
......@@ -54,7 +54,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
}
BRT_MSG_S badcmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &badval } };
toku_apply_cmd_to_leaf(brt, leafnode, &badcmd, &made_change, &workdone);
toku_apply_cmd_to_leaf(brt, leafnode, &badcmd, &made_change, NULL, &workdone);
// message should be rejected for duplicate msn, row should still have original val
......@@ -67,7 +67,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
// now verify that message with proper msn gets through
msn = next_dummymsn();
BRT_MSG_S cmd2 = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &val2 } };
toku_apply_cmd_to_leaf(brt, leafnode, &cmd2, &made_change, &workdone);
toku_apply_cmd_to_leaf(brt, leafnode, &cmd2, &made_change, NULL, &workdone);
// message should be accepted, val should have new value
{
......@@ -80,7 +80,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
// now verify that message with lesser (older) msn is rejected
msn.msn = msn.msn - 10;
BRT_MSG_S cmd3 = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &badval } };
toku_apply_cmd_to_leaf(brt, leafnode, &cmd3, &made_change, &workdone);
toku_apply_cmd_to_leaf(brt, leafnode, &cmd3, &made_change, NULL, &workdone);
// message should be rejected, val should still have value in pair2
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment