Commit b9623f06 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:3724], [t:3717], [t:3727], merge fixes to main

git-svn-id: file:///svn/toku/tokudb@32829 c7de825b-a66e-492c-adef-691d508d4ae1
parent 16406440
......@@ -136,10 +136,11 @@ struct brtnode_nonleaf_childinfo {
struct brtnode_leaf_basement_node {
uint32_t optimized_for_upgrade; // version number to which this leaf has been optimized, zero if never optimized for upgrade
BOOL soft_copy_is_up_to_date; // the data in the OMT reflects the softcopy state.
OMT buffer;
unsigned int n_bytes_in_buffer; /* How many bytes to represent the OMT (including the per-key overheads, but not including the overheads for the node. */
unsigned int seqinsert; /* number of sequential inserts to this leaf */
MSN max_msn_applied;
DSN max_dsn_applied; // max deserialization sequence number applied
};
#define PT_INVALID 0
......@@ -204,8 +205,8 @@ struct __attribute__((__packed__)) brtnode_partition {
};
struct brtnode {
MSN max_msn_applied_to_node_in_memory; // max msn that has been applied to this node (for root node, this is max msn for the tree)
MSN max_msn_applied_to_node_on_disk; // same as above, but for data on disk, only meaningful if node is clean
MSN max_msn_applied_to_node_on_disk; // max_msn_applied that will be written to disk
DSN dsn; // deserialization sequence number
unsigned int nodesize;
unsigned int flags;
BLOCKNUM thisnodename; // Which block number is this node?
......@@ -303,7 +304,8 @@ static inline void set_BSB(BRTNODE node, int i, SUB_BLOCK sb) {
// leaf node macros
#define BLB_OPTIMIZEDFORUPGRADE(node,i) (BLB(node,i)->optimized_for_upgrade)
#define BLB_SOFTCOPYISUPTODATE(node,i) (BLB(node,i)->soft_copy_is_up_to_date)
#define BLB_MAX_MSN_APPLIED(node,i) (BLB(node,i)->max_msn_applied)
#define BLB_MAX_DSN_APPLIED(node,i) (BLB(node,i)->max_dsn_applied)
#define BLB_BUFFER(node,i) (BLB(node,i)->buffer)
#define BLB_NBYTESINBUF(node,i) (BLB(node,i)->n_bytes_in_buffer)
#define BLB_SEQINSERT(node,i) (BLB(node,i)->seqinsert)
......@@ -394,6 +396,8 @@ struct brt {
int (*close_db)(DB*, u_int32_t);
u_int32_t close_flags;
DSN curr_dsn;
struct toku_list live_brt_link;
struct toku_list zombie_brt_link;
};
......@@ -521,6 +525,11 @@ void toku_create_new_brtnode (BRT t, BRTNODE *result, int height, int n_children
void toku_initialize_empty_brtnode (BRTNODE n, BLOCKNUM nodename, int height, int num_children,
int layout_version, unsigned int nodesize, unsigned int flags);
int toku_pin_brtnode_if_clean(
BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
ANCESTORS ancestors, struct pivot_bounds const * const bounds,
BRTNODE *node_p
);
int toku_pin_brtnode (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
UNLOCKERS unlockers,
ANCESTORS ancestors, struct pivot_bounds const * const pbounds,
......
......@@ -384,7 +384,7 @@ serialize_brtnode_info(
struct wbuf wb;
wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size);
wbuf_MSN(&wb, node->max_msn_applied_to_node_in_memory);
wbuf_MSN(&wb, node->max_msn_applied_to_node_on_disk);
wbuf_nocrc_uint(&wb, node->nodesize);
wbuf_nocrc_uint(&wb, node->flags);
wbuf_nocrc_int (&wb, node->height);
......@@ -478,6 +478,7 @@ static void
rebalance_brtnode_leaf(BRTNODE node)
{
assert(node->height == 0);
assert(node->dirty);
// first create an array of OMTVALUE's that store all the data
u_int32_t num_le = 0;
for (int i = 0; i < node->n_children; i++) {
......@@ -525,6 +526,16 @@ rebalance_brtnode_leaf(BRTNODE node)
u_int32_t tmp_optimized_for_upgrade = BLB_OPTIMIZEDFORUPGRADE(node, node->n_children-1);
u_int32_t tmp_seqinsert = BLB_SEQINSERT(node, node->n_children-1);
MSN max_msn = MIN_MSN;
DSN min_dsn = MAX_DSN;
for (int i = 0; i < node->n_children; i++) {
DSN curr_dsn = BLB_MAX_DSN_APPLIED(node,i);
MSN curr_msn = BLB_MAX_MSN_APPLIED(node,i);
min_dsn = (curr_dsn < min_dsn) ? curr_dsn : min_dsn;
max_msn = (curr_msn.msn > max_msn.msn) ? curr_msn : max_msn;
}
// Now destroy the old stuff;
toku_destroy_brtnode_internals(node);
......@@ -537,7 +548,7 @@ rebalance_brtnode_leaf(BRTNODE node)
node->n_children = num_children;
XMALLOC_N(num_children, node->bp);
for (int i = 0; i < num_children; i++) {
set_BLB(node, i, toku_create_empty_bn());
set_BLB(node, i, toku_create_empty_bn());
}
// now we start to fill in the data
......@@ -582,7 +593,11 @@ rebalance_brtnode_leaf(BRTNODE node)
BP_STATE(node,i) = PT_AVAIL;
BP_TOUCH_CLOCK(node,i);
BLB_MAX_DSN_APPLIED(node,i) = min_dsn;
BLB_MAX_MSN_APPLIED(node,i) = max_msn;
}
node->max_msn_applied_to_node_on_disk = max_msn;
// now the subtree estimates
toku_brt_leaf_reset_calc_leaf_stats(node);
......@@ -723,7 +738,6 @@ toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_h
//printf("%s:%d wrote %d bytes for %lld size=%lld\n", __FILE__, __LINE__, w.ndone, off, size);
toku_free(compressed_buf);
node->dirty = 0; // See #1957. Must set the node to be clean after serializing it so that it doesn't get written again on the next checkpoint or eviction.
node->max_msn_applied_to_node_on_disk = node->max_msn_applied_to_node_in_memory;
return 0;
}
......@@ -799,7 +813,8 @@ BASEMENTNODE toku_create_empty_bn(void) {
BASEMENTNODE toku_create_empty_bn_no_buffer(void) {
BASEMENTNODE XMALLOC(bn);
bn->soft_copy_is_up_to_date = TRUE;
bn->max_dsn_applied = 0;
bn->max_msn_applied.msn = 0;
bn->buffer = NULL;
bn->n_bytes_in_buffer = 0;
bn->seqinsert = 0;
......@@ -924,8 +939,9 @@ deserialize_brtnode_info(
struct rbuf rb = {.buf = NULL, .size = 0, .ndone = 0};
rbuf_init(&rb, sb->uncompressed_ptr, data_size);
node->dsn = INVALID_DSN;
node->max_msn_applied_to_node_on_disk = rbuf_msn(&rb);
node->max_msn_applied_to_node_in_memory = node->max_msn_applied_to_node_on_disk;
node->nodesize = rbuf_int(&rb);
node->flags = rbuf_int(&rb);
node->height = rbuf_int(&rb);
......@@ -988,6 +1004,8 @@ static void
setup_available_brtnode_partition(BRTNODE node, int i) {
if (node->height == 0) {
set_BLB(node, i, toku_create_empty_bn());
BLB_MAX_MSN_APPLIED(node,i) = node->max_msn_applied_to_node_on_disk;
BLB_MAX_DSN_APPLIED(node,i) = 0;
}
else {
set_BNC(node, i, toku_create_empty_nl());
......@@ -1054,7 +1072,7 @@ deserialize_brtnode_partition(
unsigned char ch = rbuf_char(&rb);
assert(ch == BRTNODE_PARTITION_OMT_LEAVES);
BLB_OPTIMIZEDFORUPGRADE(node, index) = rbuf_int(&rb);
BLB_SOFTCOPYISUPTODATE(node, index) = FALSE;
// dont need to set max_dsn_applied because creation of basement node set it to correct value
BLB_SEQINSERT(node, index) = 0;
u_int32_t num_entries = rbuf_int(&rb);
OMTVALUE *XMALLOC_N(num_entries, array);
......@@ -1246,6 +1264,9 @@ toku_deserialize_bp_from_disk(BRTNODE node, int childnum, int fd, struct brtnode
read_and_decompress_sub_block(&rb, &curr_sb);
// at this point, sb->uncompressed_ptr stores the serialized node partition
deserialize_brtnode_partition(&curr_sb, node, childnum);
if (node->height == 0) {
toku_brt_bn_reset_stats(node, childnum);
}
toku_free(curr_sb.uncompressed_ptr);
toku_free(raw_block);
}
......@@ -1269,6 +1290,9 @@ toku_deserialize_bp_from_compressed(BRTNODE node, int childnum) {
curr_sb->compressed_size
);
deserialize_brtnode_partition(curr_sb, node, childnum);
if (node->height == 0) {
toku_brt_bn_reset_stats(node, childnum);
}
toku_free(curr_sb->uncompressed_ptr);
toku_free(curr_sb->compressed_ptr);
toku_free(curr_sb);
......
......@@ -134,7 +134,7 @@ toku_verify_brtnode (BRT brt,
//printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node = node_v;
toku_assert_entire_node_in_memory(node);
thismsn = node->max_msn_applied_to_node_in_memory;
thismsn = node->max_msn_applied_to_node_on_disk;
if (rootmsn.msn == ZERO_MSN.msn) {
assert(parentmsn.msn == ZERO_MSN.msn);
rootmsn = thismsn;
......
......@@ -144,6 +144,15 @@ toku_assert_entire_node_in_memory(BRTNODE node) {
}
}
//
// MUST be called with the ydb lock held
//
static void
set_new_DSN_for_node(BRTNODE node, BRT t) {
node->dsn = t->curr_dsn;
t->curr_dsn++;
}
static u_int32_t
get_leaf_num_entries(BRTNODE node) {
u_int32_t result = 0;
......@@ -268,6 +277,30 @@ static void maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTO
static long brtnode_memory_size (BRTNODE node);
int toku_pin_brtnode_if_clean(
BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
ANCESTORS ancestors, struct pivot_bounds const * const bounds,
BRTNODE *node_p
)
{
void *node_v;
int r = toku_cachetable_get_and_pin_if_in_memory(
brt->cf,
blocknum,
fullhash,
&node_v
); // this one doesn't need to use the toku_pin_brtnode function because it doesn't bring anything in, so it cannot create a non-up-to-date leaf node.
if (r==0) {
BRTNODE node = node_v;
if (node->dsn == INVALID_DSN) {
set_new_DSN_for_node(node, brt);
}
maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds);
*node_p = node;
}
return r;
}
int toku_pin_brtnode (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
UNLOCKERS unlockers,
ANCESTORS ancestors, struct pivot_bounds const * const bounds,
......@@ -290,6 +323,9 @@ int toku_pin_brtnode (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
unlockers);
if (r==0) {
BRTNODE node = node_v;
if (node->dsn == INVALID_DSN) {
set_new_DSN_for_node(node, brt);
}
maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds);
*node_p = node;
// printf("%*sPin %ld\n", 8-node->height, "", blocknum.b);
......@@ -321,6 +357,9 @@ void toku_pin_brtnode_holding_lock (BRT brt, BLOCKNUM blocknum, u_int32_t fullha
);
assert(r==0);
BRTNODE node = node_v;
if (node->dsn == INVALID_DSN) {
set_new_DSN_for_node(node, brt);
}
maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds);
*node_p = node;
}
......@@ -360,17 +399,23 @@ calc_leaf_stats (OMT buffer) {
return e;
}
void
toku_brt_bn_reset_stats(BRTNODE node, int childnum)
{
// basement node may be evicted, so only update stats if the basement node
// is fully in memory
// TODO: (Zardosht) for row cache, figure out a better way to do this
if (BP_STATE(node,childnum) == PT_AVAIL) {
BP_SUBTREE_EST(node,childnum) = calc_leaf_stats(BLB_BUFFER(node, childnum));
}
}
void
toku_brt_leaf_reset_calc_leaf_stats(BRTNODE node) {
invariant(node->height==0);
int i = 0;
for (i = 0; i < node->n_children; i++) {
// basement node may be evicted, so only update stats if the basement node
// is fully in memory
// TODO: (Zardosht) for row cache, figure out a better way to do this
if (BP_STATE(node,i) == PT_AVAIL) {
node->bp[i].subtree_estimates = calc_leaf_stats(BLB_BUFFER(node, i));
}
toku_brt_bn_reset_stats(node,i);
}
}
......@@ -890,7 +935,7 @@ toku_initialize_empty_brtnode (BRTNODE n, BLOCKNUM nodename, int height, int num
assert(height >= 0);
n->max_msn_applied_to_node_on_disk = MIN_MSN; // correct value for root node, harmless for others
n->max_msn_applied_to_node_in_memory = MIN_MSN; // correct value for root node, harmless for others
n->dsn = INVALID_DSN; // the owner of the node should take responsibility for properly setting this
n->nodesize = nodesize;
n->flags = flags;
n->thisnodename = nodename;
......@@ -951,11 +996,12 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r
fixup_child_estimates(newroot, 0, nodea, TRUE);
fixup_child_estimates(newroot, 1, nodeb, TRUE);
{
MSN msna = nodea->max_msn_applied_to_node_in_memory;
MSN msnb = nodeb->max_msn_applied_to_node_in_memory;
MSN msna = nodea->max_msn_applied_to_node_on_disk;
MSN msnb = nodeb->max_msn_applied_to_node_on_disk;
invariant(msna.msn == msnb.msn);
newroot->max_msn_applied_to_node_in_memory = msna;
newroot->max_msn_applied_to_node_on_disk = msna;
}
newroot->dsn = (nodea->dsn > nodeb->dsn) ? nodea->dsn : nodeb->dsn;
BP_STATE(newroot,0) = PT_AVAIL;
BP_STATE(newroot,1) = PT_AVAIL;
newroot->dirty = 1;
......@@ -981,6 +1027,7 @@ toku_create_new_brtnode (BRT t, BRTNODE *result, int height, int n_children) {
BRTNODE XMALLOC(n);
toku_initialize_empty_brtnode(n, name, height, n_children, t->h->layout_version, t->h->nodesize, t->flags);
assert(n->nodesize > 0);
set_new_DSN_for_node(n, t);
u_int32_t fullhash = toku_cachetable_hash(t->cf, n->thisnodename);
n->fullhash = fullhash;
......@@ -1128,12 +1175,13 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
// Effect: Split a leaf node.
{
BRTNODE B;
DSN dsn = node->dsn;
//printf("%s:%d splitting leaf %" PRIu64 " which is size %u (targetsize = %u)\n", __FILE__, __LINE__, node->thisnodename.b, toku_serialize_brtnode_size(node), node->nodesize);
assert(node->height==0);
assert(node->nodesize>0);
toku_assert_entire_node_in_memory(node);
MSN max_msn_applied_to_node = node->max_msn_applied_to_node_in_memory;
MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk;
//printf("%s:%d A is at %lld\n", __FILE__, __LINE__, A->thisnodename);
//printf("%s:%d B is at %lld nodesize=%d\n", __FILE__, __LINE__, B->thisnodename, B->nodesize);
......@@ -1215,6 +1263,8 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
);
BLB_NBYTESINBUF(node, split_node) -= diff_size;
BLB_NBYTESINBUF(B, 0) += diff_size;
BLB_MAX_DSN_APPLIED(B,0) = BLB_MAX_DSN_APPLIED(node, split_node);
BLB_MAX_MSN_APPLIED(B,0) = BLB_MAX_MSN_APPLIED(node, split_node);
subtract_estimates(&BP_SUBTREE_EST(node,split_node), &se_diff);
add_estimates(&BP_SUBTREE_EST(B,0), &se_diff);
......@@ -1255,8 +1305,11 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
splitk->flags=0;
}
node->max_msn_applied_to_node_in_memory = max_msn_applied_to_node;
B ->max_msn_applied_to_node_in_memory = max_msn_applied_to_node;
node->max_msn_applied_to_node_on_disk= max_msn_applied_to_node;
B ->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
node->dsn = dsn;
B->dsn = dsn;
node->dirty = 1;
B->dirty = 1;
......@@ -1285,7 +1338,8 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
int old_n_children = node->n_children;
int n_children_in_a = old_n_children/2;
int n_children_in_b = old_n_children-n_children_in_a;
MSN max_msn_applied_to_node = node->max_msn_applied_to_node_in_memory;
MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk;
DSN dsn = node->dsn;
BRTNODE B;
assert(node->height>0);
assert(node->n_children>=2); // Otherwise, how do we split? We need at least two children to split. */
......@@ -1333,8 +1387,11 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
}
node->max_msn_applied_to_node_in_memory = max_msn_applied_to_node;
B ->max_msn_applied_to_node_in_memory = max_msn_applied_to_node;
node->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
B ->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
node->dsn = dsn;
B->dsn = dsn;
node->dirty = 1;
B ->dirty = 1;
......@@ -1750,6 +1807,13 @@ brt_leaf_put_cmd (
LEAFENTRY storeddata;
OMTVALUE storeddatav=NULL;
if (cmd->msn.msn <= bn->max_msn_applied.msn) {
// TODO3514 add accountability counter here
return;
}
else {
bn->max_msn_applied = cmd->msn;
}
u_int32_t omt_size;
int r;
......@@ -2119,8 +2183,8 @@ brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd)
//
{
MSN cmd_msn = cmd->msn;
invariant(cmd_msn.msn > node->max_msn_applied_to_node_in_memory.msn);
node->max_msn_applied_to_node_in_memory = cmd_msn;
invariant(cmd_msn.msn > node->max_msn_applied_to_node_on_disk.msn);
node->max_msn_applied_to_node_on_disk = cmd_msn;
//TODO: Accessing type directly
switch (cmd->type) {
......@@ -2320,19 +2384,21 @@ maybe_merge_pinned_nodes (BRTNODE parent, int childnum_of_parent, struct kv_pair
// splitk (OUT): If the two nodes did not get merged, the new pivot key between the two nodes.
{
MSN msn_max;
DSN dsn_max;
assert(a->height == b->height);
toku_assert_entire_node_in_memory(parent);
toku_assert_entire_node_in_memory(a);
toku_assert_entire_node_in_memory(b);
parent->dirty = 1; // just to make sure
{
MSN msna = a->max_msn_applied_to_node_in_memory;
MSN msnb = b->max_msn_applied_to_node_in_memory;
MSN msna = a->max_msn_applied_to_node_on_disk;
MSN msnb = b->max_msn_applied_to_node_on_disk;
msn_max = (msna.msn > msnb.msn) ? msna : msnb;
if (a->height > 0) {
invariant(msn_max.msn <= parent->max_msn_applied_to_node_in_memory.msn); // parent msn must be >= children's msn
invariant(msn_max.msn <= parent->max_msn_applied_to_node_on_disk.msn); // parent msn must be >= children's msn
}
}
dsn_max = (a->dsn > b->dsn) ? a->dsn : b->dsn;
if (a->height == 0) {
maybe_merge_pinned_leaf_nodes(parent, childnum_of_parent, a, b, parent_splitk, did_merge, did_rebalance, splitk);
} else {
......@@ -2341,8 +2407,10 @@ maybe_merge_pinned_nodes (BRTNODE parent, int childnum_of_parent, struct kv_pair
if (*did_merge || *did_rebalance) {
// accurate for leaf nodes because all msgs above have been applied,
// accurate for non-leaf nodes because buffer immediately above each node has been flushed
a->max_msn_applied_to_node_in_memory = msn_max;
b->max_msn_applied_to_node_in_memory = msn_max;
a->max_msn_applied_to_node_on_disk = msn_max;
b->max_msn_applied_to_node_on_disk = msn_max;
a->dsn = dsn_max;
b->dsn = dsn_max;
}
}
......@@ -2546,7 +2614,7 @@ static void assert_leaf_up_to_date(BRTNODE node) {
assert(node->height == 0);
toku_assert_entire_node_in_memory(node);
for (int i=0; i < node->n_children; i++) {
assert(BLB_SOFTCOPYISUPTODATE(node, i));
assert(BLB_MAX_DSN_APPLIED(node, i) >= MIN_DSN);
}
}
......@@ -2763,13 +2831,6 @@ brtnode_nonleaf_put_cmd_at_root (BRT t, BRTNODE node, BRT_MSG cmd)
void toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, bool *made_change, uint64_t *workdone) {
VERIFY_NODE(t, node);
// ignore messages that have already been applied to this leaf
if (cmd->msn.msn <= node->max_msn_applied_to_node_in_memory.msn) {
// TODO3514 add accountability counter here
return;
}
else {
node->max_msn_applied_to_node_in_memory = cmd->msn;
}
if (brt_msg_applies_once(cmd)) {
unsigned int childnum = toku_brtnode_which_child(node, cmd->u.id.key, t);
......@@ -2834,7 +2895,10 @@ static void push_something_at_root (BRT brt, BRTNODE *nodep, BRT_MSG cmd)
uint64_t workdone_ignore = 0; // ignore workdone for root-leaf node
// not up to date, which means the get_and_pin actually fetched it into memory.
toku_apply_cmd_to_leaf(brt, node, cmd, &made_dirty, &workdone_ignore);
if (made_dirty) node->dirty = 1;
node->dirty = 1;
MSN cmd_msn = cmd->msn;
invariant(cmd_msn.msn > node->max_msn_applied_to_node_on_disk.msn);
node->max_msn_applied_to_node_on_disk = cmd_msn;
} else {
brtnode_nonleaf_put_cmd_at_root(brt, node, cmd);
//if (should_split) printf("%s:%d Pushed something simple, should_split=1\n", __FILE__, __LINE__);
......@@ -2876,6 +2940,8 @@ static void apply_cmd_to_in_memory_non_root_leaves (
BRT_MSG cmd,
BRTNODE parent,
int parents_childnum,
ANCESTORS ancestors,
struct pivot_bounds const * const bounds,
uint64_t * workdone
);
......@@ -2885,22 +2951,28 @@ static void apply_cmd_to_in_memory_non_root_leaves_starting_at_node (BRT t,
BOOL is_root,
BRTNODE parent,
int parents_childnum,
ANCESTORS ancestors,
struct pivot_bounds const * const bounds,
uint64_t * workdone) {
// internal node
if (node->height>0) {
if (brt_msg_applies_once(cmd)) {
unsigned int childnum = toku_brtnode_which_child(node, cmd->u.id.key, t);
struct ancestors next_ancestors = {node, childnum, ancestors};
const struct pivot_bounds next_bounds = next_pivot_keys(node, childnum, bounds);
u_int32_t child_fullhash = compute_child_fullhash(t->cf, node, childnum);
if (is_root) // record workdone in root only, if not root then this is a recursive call so just pass along pointer
workdone = &(BP_WORKDONE(node,childnum));
apply_cmd_to_in_memory_non_root_leaves(t, BP_BLOCKNUM(node, childnum), child_fullhash, cmd, node, childnum, workdone);
apply_cmd_to_in_memory_non_root_leaves(t, BP_BLOCKNUM(node, childnum), child_fullhash, cmd, node, childnum, &next_ancestors, &next_bounds, workdone);
}
else if (brt_msg_applies_all(cmd)) {
for (int childnum=0; childnum<node->n_children; childnum++) {
struct ancestors next_ancestors = {node, childnum, ancestors};
const struct pivot_bounds next_bounds = next_pivot_keys(node, childnum, bounds);
u_int32_t child_fullhash = compute_child_fullhash(t->cf, node, childnum);
if (is_root)
workdone = &(BP_WORKDONE(node,childnum));
apply_cmd_to_in_memory_non_root_leaves(t, BP_BLOCKNUM(node, childnum), child_fullhash, cmd, node, childnum, workdone);
apply_cmd_to_in_memory_non_root_leaves(t, BP_BLOCKNUM(node, childnum), child_fullhash, cmd, node, childnum, &next_ancestors, &next_bounds, workdone);
}
}
}
......@@ -2926,15 +2998,24 @@ static void apply_cmd_to_in_memory_non_root_leaves (
BRT_MSG cmd,
BRTNODE parent,
int parents_childnum,
ANCESTORS ancestors,
struct pivot_bounds const * const bounds,
uint64_t * workdone
)
{
void *node_v;
int r = toku_cachetable_get_and_pin_if_in_memory(t->cf, nodenum, fullhash, &node_v); // this one doesn't need to use the toku_pin_brtnode function because it doesn't bring anything in, so it cannot create a non-up-to-date leaf node.
BRTNODE node = NULL;
int r = toku_pin_brtnode_if_clean(
t,
nodenum,
fullhash,
ancestors,
bounds,
&node
);
if (r) { goto exit; }
BRTNODE node = node_v;
apply_cmd_to_in_memory_non_root_leaves_starting_at_node(t, node, cmd, FALSE, parent, parents_childnum, workdone);
apply_cmd_to_in_memory_non_root_leaves_starting_at_node(t, node, cmd, FALSE, parent, parents_childnum, ancestors, bounds, workdone);
toku_unpin_brtnode(t, node);
exit:
......@@ -2968,7 +3049,7 @@ toku_brt_root_put_cmd (BRT brt, BRT_MSG_S * cmd)
fill_bfe_for_full_read(&bfe, brt->h);
toku_pin_brtnode_holding_lock(brt, *rootp, fullhash, NULL, &infinite_bounds, &bfe, &node);
toku_assert_entire_node_in_memory(node);
cmd->msn.msn = node->max_msn_applied_to_node_in_memory.msn + 1;
cmd->msn.msn = node->max_msn_applied_to_node_on_disk.msn + 1;
// Note, the lower level function that filters messages based on msn,
// (brt_leaf_put_cmd() or brt_nonleaf_put_cmd()) will capture the msn and
// store it in the relevant node, including the root node. This is how the
......@@ -2980,10 +3061,9 @@ toku_brt_root_put_cmd (BRT brt, BRT_MSG_S * cmd)
push_something_at_root(brt, &node, cmd);
// verify that msn of latest message was captured in root node (push_something_at_root() did not release ydb lock)
invariant(cmd->msn.msn == node->max_msn_applied_to_node_in_memory.msn);
invariant(cmd->msn.msn == node->max_msn_applied_to_node_on_disk.msn);
if (node->height > 0) {
apply_cmd_to_in_memory_non_root_leaves_starting_at_node(brt, node, cmd, TRUE, NULL, -1, NULL);
apply_cmd_to_in_memory_non_root_leaves_starting_at_node(brt, node, cmd, TRUE, NULL, -1, (ANCESTORS)NULL, &infinite_bounds, NULL);
if (nonleaf_node_is_gorged(node)) {
// No need for a loop here. We only inserted one message, so flushing a single child suffices.
flush_some_child(brt, node, TRUE, TRUE,
......@@ -3426,6 +3506,7 @@ static int setup_initial_brt_root_node (BRT t, BLOCKNUM blocknum) {
BRTNODE XMALLOC(node);
toku_initialize_empty_brtnode(node, blocknum, 0, 1, t->h->layout_version, t->h->nodesize, t->flags);
BP_STATE(node,0) = PT_AVAIL;
set_new_DSN_for_node(node, t);
u_int32_t fullhash = toku_cachetable_hash(t->cf, blocknum);
node->fullhash = fullhash;
......@@ -4569,6 +4650,7 @@ int toku_brt_create(BRT *brt_ptr) {
brt->nodesize = BRT_DEFAULT_NODE_SIZE;
brt->compare_fun = toku_builtin_compare_fun;
brt->update_fun = NULL;
brt->curr_dsn = 1; // start at 1, as 0 is reserved for basement nodes
int r = toku_omt_create(&brt->txns);
if (r!=0) { toku_free(brt); return r; }
*brt_ptr = brt;
......@@ -4912,7 +4994,6 @@ apply_buffer_messages_to_basement_node (
SUBTREE_EST se,
BRTNODE ancestor,
int childnum,
MSN min_applied_msn,
struct pivot_bounds const * const bounds
)
// Effect: For each messages in ANCESTOR that is between lower_bound_exclusive (exclusive) and upper_bound_inclusive (inclusive), apply the message to the node.
......@@ -4946,7 +5027,7 @@ apply_buffer_messages_to_basement_node (
({
DBT hk;
toku_fill_dbt(&hk, key, keylen);
if (msn.msn > min_applied_msn.msn && (!msg_type_has_key(type) || key_is_in_leaf_range(t, &hk, lbe_ptr, ubi_ptr))) {
if ((!msg_type_has_key(type) || key_is_in_leaf_range(t, &hk, lbe_ptr, ubi_ptr))) {
DBT hv;
BRT_MSG_S brtcmd = { (enum brt_msg_type)type, msn, xids, .u.id = {&hk,
toku_fill_dbt(&hv, val, vallen)} };
......@@ -5086,6 +5167,24 @@ apply_ancestors_messages_to_leafnode_and_maybe_flush (BRT t, BASEMENTNODE bm, SU
}
*/
static BOOL
partition_requires_msg_application(BRTNODE node, int childnum, ANCESTORS ancestors) {
BOOL requires_msg_application = FALSE;
if (BP_STATE(node,childnum) != PT_AVAIL) return FALSE;
for (
ANCESTORS curr_ancestors = ancestors;
curr_ancestors;
curr_ancestors = curr_ancestors->next
)
{
if (curr_ancestors->node->dsn > BLB_MAX_DSN_APPLIED(node,childnum)) {
requires_msg_application = TRUE;
break;
}
}
return requires_msg_application;
}
static void
maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds)
// Effect:
......@@ -5101,7 +5200,13 @@ maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors
// need to apply messages to each basement node
// TODO: (Zardosht) cilkify this, watch out for setting of max_msn_applied_to_node
for (int i = 0; i < node->n_children; i++) {
if (BP_STATE(node,i) != PT_AVAIL || BLB_SOFTCOPYISUPTODATE(node, i)) {
BOOL requires_msg_application = partition_requires_msg_application(
node,
i,
ancestors
);
if (!requires_msg_application) {
continue;
}
update_stats = TRUE;
......@@ -5118,15 +5223,13 @@ maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors
curr_se,
curr_ancestors->node,
curr_ancestors->childnum,
node->max_msn_applied_to_node_on_disk,
&curr_bounds
);
if (curr_ancestors->node->max_msn_applied_to_node_in_memory.msn > node->max_msn_applied_to_node_in_memory.msn) {
node->max_msn_applied_to_node_in_memory = curr_ancestors->node->max_msn_applied_to_node_in_memory;
}
curr_bn->max_dsn_applied = (curr_ancestors->node->dsn > curr_bn->max_dsn_applied)
? curr_ancestors->node->dsn
: curr_bn->max_dsn_applied;
curr_ancestors= curr_ancestors->next;
}
BLB_SOFTCOPYISUPTODATE(node, i) = TRUE;
}
// Must update the leaf estimates. Might as well use the estimates from the soft copy (even if they make it out to disk), since they are
......@@ -5161,7 +5264,7 @@ brt_search_basement_node(
BRT_CURSOR brtcursor
)
{
assert(bn->soft_copy_is_up_to_date);
assert(bn->max_dsn_applied >= MIN_DSN);
// Now we have to convert from brt_search_t to the heaviside function with a direction. What a pain...
......
......@@ -292,6 +292,7 @@ BOOL toku_brt_is_empty_fast (BRT brt) __attribute__ ((warn_unused_result));
BOOL toku_brt_is_recovery_logging_suppressed (BRT) __attribute__ ((warn_unused_result));
void toku_brt_bn_reset_stats(BRTNODE node, int childnum);
void toku_brt_leaf_reset_calc_leaf_stats(BRTNODE node);
int toku_brt_strerror_r(int error, char *buf, size_t buflen);
......
......@@ -57,9 +57,14 @@ typedef struct __toku_lsn { u_int64_t lsn; } LSN;
* Make the MSN be a struct instead of an integer so that we get better type checking. */
typedef struct __toku_msn { u_int64_t msn; } MSN;
#define ZERO_MSN ((MSN){0}) // dummy used for message construction, to be filled in when msg is applied to tree
#define MIN_MSN ((MSN){(u_int64_t)1<<32}) // first 2**32 values reserved for messages created before Dr. No (for upgrade)
#define MIN_MSN ((MSN){(u_int64_t)1000*1000*1000}) // first 1B values reserved for messages created before Dr. No (for upgrade)
#define MAX_MSN ((MSN){UINT64_MAX})
typedef int64_t DSN; // DESERIALIZATION sequence number
#define INVALID_DSN -1
#define MIN_DSN 0
#define MAX_DSN INT64_MAX
/* At the brt layer, a FILENUM uniquely identifies an open file.
* At the ydb layer, a DICTIONARY_ID uniquely identifies an open dictionary.
* With the introduction of the loader (ticket 2216), it is possible for the file that holds
......
......@@ -146,7 +146,6 @@ test_serialize_leaf_with_large_pivots(enum brtnode_verify_type bft) {
int fd = open(__FILE__ ".brt", O_RDWR|O_CREAT|O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO); assert(fd >= 0);
sn.max_msn_applied_to_node_on_disk.msn = 0;
sn.max_msn_applied_to_node_in_memory.msn = 0;
sn.nodesize = 4*(1<<20);
sn.flags = 0x11223344;
sn.thisnodename.b = 20;
......@@ -154,6 +153,7 @@ test_serialize_leaf_with_large_pivots(enum brtnode_verify_type bft) {
sn.layout_version_original = BRT_LAYOUT_VERSION;
sn.height = 0;
sn.n_children = nrows;
sn.dirty = 1;
LEAFENTRY les[nrows];
{
......@@ -267,7 +267,6 @@ test_serialize_leaf_with_many_rows(enum brtnode_verify_type bft) {
int fd = open(__FILE__ ".brt", O_RDWR|O_CREAT|O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO); assert(fd >= 0);
sn.max_msn_applied_to_node_on_disk.msn = 0;
sn.max_msn_applied_to_node_in_memory.msn = 0;
sn.nodesize = 4*(1<<20);
sn.flags = 0x11223344;
sn.thisnodename.b = 20;
......@@ -275,6 +274,7 @@ test_serialize_leaf_with_many_rows(enum brtnode_verify_type bft) {
sn.layout_version_original = BRT_LAYOUT_VERSION;
sn.height = 0;
sn.n_children = 1;
sn.dirty = 1;
LEAFENTRY les[nrows];
{
......@@ -382,7 +382,6 @@ test_serialize_leaf_with_large_rows(enum brtnode_verify_type bft) {
int fd = open(__FILE__ ".brt", O_RDWR|O_CREAT|O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO); assert(fd >= 0);
sn.max_msn_applied_to_node_on_disk.msn = 0;
sn.max_msn_applied_to_node_in_memory.msn = 0;
sn.nodesize = 4*(1<<20);
sn.flags = 0x11223344;
sn.thisnodename.b = 20;
......@@ -390,6 +389,7 @@ test_serialize_leaf_with_large_rows(enum brtnode_verify_type bft) {
sn.layout_version_original = BRT_LAYOUT_VERSION;
sn.height = 0;
sn.n_children = 1;
sn.dirty = 1;
LEAFENTRY les[7];
{
......@@ -503,7 +503,6 @@ test_serialize_leaf_with_empty_basement_nodes(enum brtnode_verify_type bft) {
int r;
sn.max_msn_applied_to_node_on_disk.msn = 0;
sn.max_msn_applied_to_node_in_memory.msn = 0;
sn.nodesize = nodesize;
sn.flags = 0x11223344;
sn.thisnodename.b = 20;
......@@ -511,6 +510,7 @@ test_serialize_leaf_with_empty_basement_nodes(enum brtnode_verify_type bft) {
sn.layout_version_original = BRT_LAYOUT_VERSION;
sn.height = 0;
sn.n_children = 7;
sn.dirty = 1;
LEAFENTRY elts[3];
elts[0] = le_malloc("a", "aval");
elts[1] = le_malloc("b", "bval");
......@@ -628,7 +628,6 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum brtnode_verify_type
int r;
sn.max_msn_applied_to_node_on_disk.msn = 0;
sn.max_msn_applied_to_node_in_memory.msn = 0;
sn.nodesize = nodesize;
sn.flags = 0x11223344;
sn.thisnodename.b = 20;
......@@ -636,6 +635,7 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum brtnode_verify_type
sn.layout_version_original = BRT_LAYOUT_VERSION;
sn.height = 0;
sn.n_children = 4;
sn.dirty = 1;
MALLOC_N(sn.n_children, sn.bp);
MALLOC_N(sn.n_children-1, sn.childkeys);
sn.childkeys[0] = kv_pair_malloc("A", 2, 0, 0);
......@@ -737,7 +737,6 @@ test_serialize_leaf(enum brtnode_verify_type bft) {
int r;
sn.max_msn_applied_to_node_on_disk.msn = 0;
sn.max_msn_applied_to_node_in_memory.msn = 0;
sn.nodesize = nodesize;
sn.flags = 0x11223344;
sn.thisnodename.b = 20;
......@@ -745,6 +744,7 @@ test_serialize_leaf(enum brtnode_verify_type bft) {
sn.layout_version_original = BRT_LAYOUT_VERSION;
sn.height = 0;
sn.n_children = 2;
sn.dirty = 1;
LEAFENTRY elts[3];
elts[0] = le_malloc("a", "aval");
elts[1] = le_malloc("b", "bval");
......@@ -862,10 +862,9 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) {
// source_brt.fd=fd;
sn.max_msn_applied_to_node_on_disk.msn = 0;
sn.max_msn_applied_to_node_in_memory.msn = 0;
char *hello_string;
sn.max_msn_applied_to_node_on_disk.msn = TESTMSNDSKVAL;
sn.max_msn_applied_to_node_in_memory.msn = TESTMSNMEMVAL;
//sn.max_msn_applied_to_node_in_memory.msn = TESTMSNMEMVAL;
sn.nodesize = nodesize;
sn.flags = 0x11223344;
sn.thisnodename.b = 20;
......@@ -873,6 +872,7 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) {
sn.layout_version_original = BRT_LAYOUT_VERSION;
sn.height = 1;
sn.n_children = 2;
sn.dirty = 1;
hello_string = toku_strdup("hello");
MALLOC_N(2, sn.bp);
MALLOC_N(1, sn.childkeys);
......@@ -938,14 +938,14 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) {
r = toku_serialize_brtnode_to(fd, make_blocknum(20), &sn, brt->h, 1, 1, FALSE);
assert(r==0);
assert(sn.max_msn_applied_to_node_on_disk.msn == TESTMSNMEMVAL);
assert(sn.max_msn_applied_to_node_in_memory.msn == TESTMSNMEMVAL);
//assert(sn.max_msn_applied_to_node_on_disk.msn == TESTMSNMEMVAL);
//assert(sn.max_msn_applied_to_node_in_memory.msn == TESTMSNMEMVAL);
setup_dn(bft, fd, brt_h, &dn);
assert(dn->thisnodename.b==20);
assert(dn->max_msn_applied_to_node_on_disk.msn == TESTMSNMEMVAL);
assert(dn->max_msn_applied_to_node_in_memory.msn == TESTMSNMEMVAL);
//assert(dn->max_msn_applied_to_node_on_disk.msn == TESTMSNMEMVAL);
//assert(dn->max_msn_applied_to_node_in_memory.msn == TESTMSNMEMVAL);
assert(dn->layout_version ==BRT_LAYOUT_VERSION);
assert(dn->layout_version_original ==BRT_LAYOUT_VERSION);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment