Commit 9fd0c7c3 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:3740], fix bug

git-svn-id: file:///svn/toku/tokudb@32939 c7de825b-a66e-492c-adef-691d508d4ae1
parent 47f65e17
...@@ -660,8 +660,7 @@ brt_leaf_apply_cmd_once ( ...@@ -660,8 +660,7 @@ brt_leaf_apply_cmd_once (
uint64_t *workdonep uint64_t *workdonep
); );
void void toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, bool *made_change, ANCESTORS ancestors, uint64_t *workdone);
toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, bool *made_change, uint64_t *workdonep);
void toku_reset_root_xid_that_created(BRT brt, TXNID new_root_xid_that_created); void toku_reset_root_xid_that_created(BRT brt, TXNID new_root_xid_that_created);
// Reset the root_xid_that_created field to the given value. // Reset the root_xid_that_created field to the given value.
......
...@@ -278,30 +278,6 @@ static void maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTO ...@@ -278,30 +278,6 @@ static void maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTO
static long brtnode_memory_size (BRTNODE node); static long brtnode_memory_size (BRTNODE node);
int toku_pin_brtnode_if_clean(
BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
ANCESTORS ancestors, struct pivot_bounds const * const bounds,
BRTNODE *node_p
)
{
void *node_v;
int r = toku_cachetable_get_and_pin_if_in_memory(
brt->cf,
blocknum,
fullhash,
&node_v
); // this one doesn't need to use the toku_pin_brtnode function because it doesn't bring anything in, so it cannot create a non-up-to-date leaf node.
if (r==0) {
BRTNODE node = node_v;
if (node->dsn.dsn == INVALID_DSN.dsn) {
set_new_DSN_for_node(node, brt);
}
maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds);
*node_p = node;
}
return r;
}
int toku_pin_brtnode (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash, int toku_pin_brtnode (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
UNLOCKERS unlockers, UNLOCKERS unlockers,
ANCESTORS ancestors, struct pivot_bounds const * const bounds, ANCESTORS ancestors, struct pivot_bounds const * const bounds,
...@@ -2846,17 +2822,40 @@ brtnode_nonleaf_put_cmd_at_root (BRT t, BRTNODE node, BRT_MSG cmd) ...@@ -2846,17 +2822,40 @@ brtnode_nonleaf_put_cmd_at_root (BRT t, BRTNODE node, BRT_MSG cmd)
brt_nonleaf_put_cmd(t, node, cmd); brt_nonleaf_put_cmd(t, node, cmd);
} }
static BOOL
partition_requires_msg_application(BRTNODE node, int childnum, ANCESTORS ancestors) {
BOOL requires_msg_application = FALSE;
if (BP_STATE(node,childnum) != PT_AVAIL) return FALSE;
for (
ANCESTORS curr_ancestors = ancestors;
curr_ancestors;
curr_ancestors = curr_ancestors->next
)
{
if (curr_ancestors->node->dsn.dsn > BLB_MAX_DSN_APPLIED(node,childnum).dsn) {
requires_msg_application = TRUE;
break;
}
}
return requires_msg_application;
}
// Effect: applies the cmd to the leaf if the appropriate basement node is in memory. // Effect: applies the cmd to the leaf if the appropriate basement node is in memory.
// If the appropriate basement node is not in memory, then nothing gets applied // If the appropriate basement node is not in memory, then nothing gets applied
// If the appropriate basement node must be in memory, it is the caller's responsibility to ensure // If the appropriate basement node must be in memory, it is the caller's responsibility to ensure
// that it is // that it is
void toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, bool *made_change, uint64_t *workdone) { void toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, bool *made_change, ANCESTORS ancestors, uint64_t *workdone) {
VERIFY_NODE(t, node); VERIFY_NODE(t, node);
// ignore messages that have already been applied to this leaf // ignore messages that have already been applied to this leaf
if (brt_msg_applies_once(cmd)) { if (brt_msg_applies_once(cmd)) {
unsigned int childnum = toku_brtnode_which_child(node, cmd->u.id.key, t); unsigned int childnum = toku_brtnode_which_child(node, cmd->u.id.key, t);
if (BP_STATE(node,childnum) == PT_AVAIL) { BOOL req_msg_app = partition_requires_msg_application(node, childnum, ancestors);
// only apply the message if we have an available basement node that is up to date
// we know it is up to date if partition_requires_msg_application returns FALSE
if (BP_STATE(node,childnum) == PT_AVAIL && !req_msg_app) {
brt_leaf_put_cmd(t, brt_leaf_put_cmd(t,
BLB(node, childnum), BLB(node, childnum),
&BP_SUBTREE_EST(node, childnum), &BP_SUBTREE_EST(node, childnum),
...@@ -2869,7 +2868,10 @@ void toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, bool *made_change, ...@@ -2869,7 +2868,10 @@ void toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, bool *made_change,
else if (brt_msg_applies_all(cmd)) { else if (brt_msg_applies_all(cmd)) {
bool bn_made_change = false; bool bn_made_change = false;
for (int childnum=0; childnum<node->n_children; childnum++) { for (int childnum=0; childnum<node->n_children; childnum++) {
if (BP_STATE(node,childnum) == PT_AVAIL) { BOOL req_msg_app = partition_requires_msg_application(node, childnum, ancestors);
// only apply the message if we have an available basement node that is up to date
// we know it is up to date if partition_requires_msg_application returns FALSE
if (BP_STATE(node,childnum) == PT_AVAIL && !req_msg_app) {
brt_leaf_put_cmd( brt_leaf_put_cmd(
t, t,
BLB(node, childnum), BLB(node, childnum),
...@@ -2916,7 +2918,7 @@ static void push_something_at_root (BRT brt, BRTNODE *nodep, BRT_MSG cmd) ...@@ -2916,7 +2918,7 @@ static void push_something_at_root (BRT brt, BRTNODE *nodep, BRT_MSG cmd)
bool made_dirty = 0; bool made_dirty = 0;
uint64_t workdone_ignore = 0; // ignore workdone for root-leaf node uint64_t workdone_ignore = 0; // ignore workdone for root-leaf node
// not up to date, which means the get_and_pin actually fetched it into memory. // not up to date, which means the get_and_pin actually fetched it into memory.
toku_apply_cmd_to_leaf(brt, node, cmd, &made_dirty, &workdone_ignore); toku_apply_cmd_to_leaf(brt, node, cmd, &made_dirty, NULL, &workdone_ignore);
node->dirty = 1; node->dirty = 1;
MSN cmd_msn = cmd->msn; MSN cmd_msn = cmd->msn;
invariant(cmd_msn.msn > node->max_msn_applied_to_node_on_disk.msn); invariant(cmd_msn.msn > node->max_msn_applied_to_node_on_disk.msn);
...@@ -2955,6 +2957,7 @@ static u_int32_t get_roothash (BRT brt) { ...@@ -2955,6 +2957,7 @@ static u_int32_t get_roothash (BRT brt) {
return rh->fullhash; return rh->fullhash;
} }
static void apply_cmd_to_in_memory_non_root_leaves ( static void apply_cmd_to_in_memory_non_root_leaves (
BRT t, BRT t,
CACHEKEY nodenum, CACHEKEY nodenum,
...@@ -3002,7 +3005,7 @@ static void apply_cmd_to_in_memory_non_root_leaves_starting_at_node (BRT t, ...@@ -3002,7 +3005,7 @@ static void apply_cmd_to_in_memory_non_root_leaves_starting_at_node (BRT t,
else { else {
invariant(!is_root); invariant(!is_root);
bool made_change; bool made_change;
toku_apply_cmd_to_leaf(t, node, cmd, &made_change, workdone); toku_apply_cmd_to_leaf(t, node, cmd, &made_change, ancestors, workdone);
} }
if (parent) { if (parent) {
...@@ -3025,18 +3028,19 @@ static void apply_cmd_to_in_memory_non_root_leaves ( ...@@ -3025,18 +3028,19 @@ static void apply_cmd_to_in_memory_non_root_leaves (
uint64_t * workdone uint64_t * workdone
) )
{ {
BRTNODE node = NULL; BRTNODE node = NULL;
int r = toku_pin_brtnode_if_clean( void *node_v;
t,
nodenum, int r = toku_cachetable_get_and_pin_if_in_memory(
fullhash, t->cf,
ancestors, nodenum,
bounds, fullhash,
&node &node_v
); );
if (r) { goto exit; } if (r) { goto exit; }
node = node_v;
apply_cmd_to_in_memory_non_root_leaves_starting_at_node(t, node, cmd, FALSE, parent, parents_childnum, ancestors, bounds, workdone); apply_cmd_to_in_memory_non_root_leaves_starting_at_node(t, node, cmd, FALSE, parent, parents_childnum, ancestors, bounds, workdone);
toku_unpin_brtnode(t, node); toku_unpin_brtnode(t, node);
...@@ -5190,24 +5194,6 @@ apply_ancestors_messages_to_leafnode_and_maybe_flush (BRT t, BASEMENTNODE bm, SU ...@@ -5190,24 +5194,6 @@ apply_ancestors_messages_to_leafnode_and_maybe_flush (BRT t, BASEMENTNODE bm, SU
} }
*/ */
static BOOL
partition_requires_msg_application(BRTNODE node, int childnum, ANCESTORS ancestors) {
BOOL requires_msg_application = FALSE;
if (BP_STATE(node,childnum) != PT_AVAIL) return FALSE;
for (
ANCESTORS curr_ancestors = ancestors;
curr_ancestors;
curr_ancestors = curr_ancestors->next
)
{
if (curr_ancestors->node->dsn.dsn > BLB_MAX_DSN_APPLIED(node,childnum).dsn) {
requires_msg_application = TRUE;
break;
}
}
return requires_msg_application;
}
static void static void
maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds) maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds)
// Effect: // Effect:
......
...@@ -46,7 +46,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size ...@@ -46,7 +46,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
bool made_change; bool made_change;
u_int64_t workdone=0; u_int64_t workdone=0;
toku_apply_cmd_to_leaf(brt, leafnode, &cmd, &made_change, &workdone); toku_apply_cmd_to_leaf(brt, leafnode, &cmd, &made_change, NULL, &workdone);
{ {
int r = toku_brt_lookup(brt, &thekey, lookup_checkf, &pair); int r = toku_brt_lookup(brt, &thekey, lookup_checkf, &pair);
assert(r==0); assert(r==0);
...@@ -54,7 +54,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size ...@@ -54,7 +54,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
} }
BRT_MSG_S badcmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &badval } }; BRT_MSG_S badcmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &badval } };
toku_apply_cmd_to_leaf(brt, leafnode, &badcmd, &made_change, &workdone); toku_apply_cmd_to_leaf(brt, leafnode, &badcmd, &made_change, NULL, &workdone);
// message should be rejected for duplicate msn, row should still have original val // message should be rejected for duplicate msn, row should still have original val
...@@ -67,7 +67,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size ...@@ -67,7 +67,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
// now verify that message with proper msn gets through // now verify that message with proper msn gets through
msn = next_dummymsn(); msn = next_dummymsn();
BRT_MSG_S cmd2 = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &val2 } }; BRT_MSG_S cmd2 = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &val2 } };
toku_apply_cmd_to_leaf(brt, leafnode, &cmd2, &made_change, &workdone); toku_apply_cmd_to_leaf(brt, leafnode, &cmd2, &made_change, NULL, &workdone);
// message should be accepted, val should have new value // message should be accepted, val should have new value
{ {
...@@ -80,7 +80,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size ...@@ -80,7 +80,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
// now verify that message with lesser (older) msn is rejected // now verify that message with lesser (older) msn is rejected
msn.msn = msn.msn - 10; msn.msn = msn.msn - 10;
BRT_MSG_S cmd3 = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &badval } }; BRT_MSG_S cmd3 = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &badval } };
toku_apply_cmd_to_leaf(brt, leafnode, &cmd3, &made_change, &workdone); toku_apply_cmd_to_leaf(brt, leafnode, &cmd3, &made_change, NULL, &workdone);
// message should be rejected, val should still have value in pair2 // message should be rejected, val should still have value in pair2
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment