Commit d4b6b7ef, authored by Leif Walsh, committed by Yoni Fogel

[t:3889] [t:3890] merging IndexedAndVersionedBuffers to mainline

git-svn-id: file:///svn/toku/tokudb@34348 c7de825b-a66e-492c-adef-691d508d4ae1
parent 87b48841
......@@ -229,11 +229,19 @@ toku_fifo_entry_key_msn_cmp(void *extrap, const void *ap, const void *bp);
// data of an available partition of a nonleaf brtnode
struct brtnode_nonleaf_childinfo {
FIFO buffer;
OMT broadcast_buffer;
OMT message_tree;
OMT broadcast_list;
OMT fresh_message_tree;
OMT stale_message_tree;
unsigned int n_bytes_in_buffer; /* How many bytes are in each buffer (including overheads for the disk-representation) */
};
// Accessors and operations on a nonleaf child's message buffer (NONLEAF_CHILDINFO).

// Returns bnc->n_bytes_in_buffer.
unsigned int toku_bnc_nbytesinbuf(NONLEAF_CHILDINFO bnc);
// Returns the number of entries in bnc's FIFO (toku_fifo_n_entries of bnc->buffer).
int toku_bnc_n_entries(NONLEAF_CHILDINFO bnc);
// Returns sizeof(*bnc) plus the memory sizes reported for the FIFO and for the
// fresh, stale and broadcast OMTs.
long toku_bnc_memory_size(NONLEAF_CHILDINFO bnc);
// Enqueues the message in bnc's FIFO and records its FIFO offset:
//  - message types that apply once go into the fresh tree (is_fresh) or the stale tree (!is_fresh),
//    positioned using cmp/cmp_extra;
//  - message types that apply to all keys, or do nothing, are appended to the broadcast list.
// Also adds the message's size to n_bytes_in_buffer.  Failures are asserted, so the
// value returned is 0.
int toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, MSN msn, XIDS xids, bool is_fresh, DB *cmp_extra, brt_compare_func cmp);
// Resets n_bytes_in_buffer to 0, empties the FIFO, and replaces the fresh, stale and
// broadcast OMTs with newly created empty ones.
void toku_bnc_empty(NONLEAF_CHILDINFO bnc);
// Pushes every message in bnc's FIFO into child (with each message's is_fresh flag).
// Asserts that the FIFO is non-empty.  Does not empty bnc itself; returns 0.
int toku_bnc_flush_to_child(BRT t, NONLEAF_CHILDINFO bnc, BRTNODE child);
// data of an available partition of a leaf brtnode
struct brtnode_leaf_basement_node {
uint32_t optimized_for_upgrade; // version number to which this leaf has been optimized, zero if never optimized for upgrade
......@@ -241,6 +249,7 @@ struct brtnode_leaf_basement_node {
unsigned int n_bytes_in_buffer; /* How many bytes to represent the OMT (including the per-key overheads, but not including the overheads for the node. */
unsigned int seqinsert; /* number of sequential inserts to this leaf */
MSN max_msn_applied; // max message sequence number applied
bool stale_ancestor_messages_applied;
};
#define PT_INVALID 0
......@@ -401,12 +410,6 @@ static inline void set_BSB(BRTNODE node, int i, SUB_BLOCK sb) {
p->u.subblock = sb;
}
// macros for brtnode_nonleaf_childinfo
#define BNC_BUFFER(node,i) (BNC(node,i)->buffer)
#define BNC_BROADCAST_BUFFER(node,i) (BNC(node,i)->broadcast_buffer)
#define BNC_MESSAGE_TREE(node, i) (BNC(node,i)->message_tree)
#define BNC_NBYTESINBUF(node,i) (BNC(node,i)->n_bytes_in_buffer)
// brtnode leaf basementnode macros,
#define BLB_OPTIMIZEDFORUPGRADE(node,i) (BLB(node,i)->optimized_for_upgrade)
#define BLB_MAX_MSN_APPLIED(node,i) (BLB(node,i)->max_msn_applied)
......@@ -549,7 +552,7 @@ void toku_assert_entire_node_in_memory(BRTNODE node);
void toku_brt_nonleaf_append_child(BRTNODE node, BRTNODE child, struct kv_pair *pivotkey, size_t pivotkeysize);
// append a cmd to a nonleaf node child buffer
void toku_brt_append_to_child_buffer(BRT brt, BRTNODE node, int childnum, int type, MSN msn, XIDS xids, const DBT *key, const DBT *val);
void toku_brt_append_to_child_buffer(BRT brt, BRTNODE node, int childnum, int type, MSN msn, XIDS xids, bool is_fresh, const DBT *key, const DBT *val);
#if 1
#define DEADBEEF ((void*)0xDEADBEEF)
......
......@@ -238,7 +238,7 @@ serialize_brtnode_partition_size (BRTNODE node, int i)
result++; // Byte that states what the partition is
if (node->height > 0) {
result += 4; // size of bytes in buffer table
result += BNC_NBYTESINBUF(node, i);
result += toku_bnc_nbytesinbuf(BNC(node, i));
}
else {
result += 4; // n_entries in buffer table
......@@ -252,6 +252,25 @@ serialize_brtnode_partition_size (BRTNODE node, int i)
#define BRTNODE_PARTITION_OMT_LEAVES 0xaa
#define BRTNODE_PARTITION_FIFO_MSG 0xbb
// Serialize one nonleaf child buffer into wb.
// Layout written: the BRTNODE_PARTITION_FIFO_MSG tag byte, the number of
// entries, then for each entry visited by FIFO_ITERATE its
// (type, msn, xids, key, value).  The per-entry is_fresh flag is not written.
static void
serialize_nonleaf_childinfo(NONLEAF_CHILDINFO bnc, struct wbuf *wb)
{
    // tag byte that identifies this partition as a message FIFO
    wbuf_nocrc_char(wb, (unsigned char) BRTNODE_PARTITION_FIFO_MSG);

    // entry count first, then the entries themselves
    const int n_msgs = toku_bnc_n_entries(bnc);
    wbuf_nocrc_int(wb, n_msgs);

    FIFO_ITERATE(
        bnc->buffer, key, keylen, val, vallen, type, msn, xids, UU(is_fresh),
        {
            // the type is stored in a single byte, so it must fit in one
            invariant((int)type>=0 && type<256);
            wbuf_nocrc_char(wb, (unsigned char)type);
            wbuf_MSN(wb, msn);
            wbuf_nocrc_xids(wb, xids);
            wbuf_nocrc_bytes(wb, key, keylen);
            wbuf_nocrc_bytes(wb, val, vallen);
        });
}
//
// Serialize the i'th partition of node into sb
// For leaf nodes, this would be the i'th basement node
......@@ -270,19 +289,7 @@ serialize_brtnode_partition(BRTNODE node, int i, struct sub_block *sb) {
wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size);
if (node->height > 0) {
// TODO: (Zardosht) possibly exit early if there are no messages
unsigned char ch = BRTNODE_PARTITION_FIFO_MSG;
wbuf_nocrc_char(&wb, ch);
// serialize the FIFO, first the number of entries, then the elements
wbuf_nocrc_int(&wb, toku_fifo_n_entries(BNC_BUFFER(node,i)));
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, msn, xids,
{
invariant((int)type>=0 && type<256);
wbuf_nocrc_char(&wb, (unsigned char)type);
wbuf_MSN(&wb, msn);
wbuf_nocrc_xids(&wb, xids);
wbuf_nocrc_bytes(&wb, key, keylen);
wbuf_nocrc_bytes(&wb, data, datalen);
});
serialize_nonleaf_childinfo(BNC(node, i), &wb);
}
else {
unsigned char ch = BRTNODE_PARTITION_OMT_LEAVES;
......@@ -857,7 +864,7 @@ toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_h
}
static void
deserialize_child_buffer(BRTNODE node, int cnum, struct rbuf *rbuf,
deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf,
DB *cmp_extra, brt_compare_func cmp) {
int r;
int n_bytes_in_buffer = 0;
......@@ -896,7 +903,7 @@ deserialize_child_buffer(BRTNODE node, int cnum, struct rbuf *rbuf,
} else {
dest = NULL;
}
r = toku_fifo_enq(BNC_BUFFER(node, cnum), key, keylen, val, vallen, type, msn, xids, dest); /* Copies the data into the fifo */
r = toku_fifo_enq(bnc->buffer, key, keylen, val, vallen, type, msn, xids, true, dest); /* Copies the data into the fifo */
lazy_assert_zero(r);
n_bytes_in_buffer += keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids);
//printf("Inserted\n");
......@@ -905,18 +912,17 @@ deserialize_child_buffer(BRTNODE node, int cnum, struct rbuf *rbuf,
invariant(rbuf->ndone == rbuf->size);
if (cmp) {
struct toku_fifo_entry_key_msn_cmp_extra extra = { .cmp_extra = cmp_extra, .cmp = cmp, .fifo = BNC_BUFFER(node, cnum) };
struct toku_fifo_entry_key_msn_cmp_extra extra = { .cmp_extra = cmp_extra, .cmp = cmp, .fifo = bnc->buffer };
r = mergesort_r(offsets, noffsets, sizeof offsets[0], &extra, toku_fifo_entry_key_msn_cmp);
assert_zero(r);
toku_omt_destroy(&BNC_MESSAGE_TREE(node, cnum));
r = toku_omt_create_steal_sorted_array(&BNC_MESSAGE_TREE(node, cnum), &offsets, noffsets, n_in_this_buffer);
toku_omt_destroy(&bnc->fresh_message_tree);
r = toku_omt_create_steal_sorted_array(&bnc->fresh_message_tree, &offsets, noffsets, n_in_this_buffer);
assert_zero(r);
toku_omt_destroy(&BNC_BROADCAST_BUFFER(node, cnum));
r = toku_omt_create_steal_sorted_array(&BNC_BROADCAST_BUFFER(node, cnum), &broadcast_offsets, nbroadcast_offsets, n_in_this_buffer);
toku_omt_destroy(&bnc->broadcast_list);
r = toku_omt_create_steal_sorted_array(&bnc->broadcast_list, &broadcast_offsets, nbroadcast_offsets, n_in_this_buffer);
assert_zero(r);
}
BNC_NBYTESINBUF(node, cnum) = n_bytes_in_buffer;
BP_WORKDONE(node, cnum) = 0;
bnc->n_bytes_in_buffer = n_bytes_in_buffer;
}
// dump a buffer to stderr
......@@ -970,18 +976,17 @@ BASEMENTNODE toku_create_empty_bn_no_buffer(void) {
bn->n_bytes_in_buffer = 0;
bn->seqinsert = 0;
bn->optimized_for_upgrade = 0;
bn->stale_ancestor_messages_applied = false;
return bn;
}
NONLEAF_CHILDINFO toku_create_empty_nl(void) {
NONLEAF_CHILDINFO XMALLOC(cn);
cn->n_bytes_in_buffer = 0;
int r = toku_fifo_create(&cn->buffer);
assert_zero(r);
r = toku_omt_create(&cn->message_tree);
assert_zero(r);
r = toku_omt_create(&cn->broadcast_buffer);
assert_zero(r);
int r = toku_fifo_create(&cn->buffer); assert_zero(r);
r = toku_omt_create(&cn->fresh_message_tree); assert_zero(r);
r = toku_omt_create(&cn->stale_message_tree); assert_zero(r);
r = toku_omt_create(&cn->broadcast_list); assert_zero(r);
return cn;
}
......@@ -997,8 +1002,9 @@ void destroy_basement_node (BASEMENTNODE bn)
void destroy_nonleaf_childinfo (NONLEAF_CHILDINFO nl)
{
toku_fifo_free(&nl->buffer);
toku_omt_destroy(&nl->message_tree);
toku_omt_destroy(&nl->broadcast_buffer);
toku_omt_destroy(&nl->fresh_message_tree);
toku_omt_destroy(&nl->stale_message_tree);
toku_omt_destroy(&nl->broadcast_list);
toku_free(nl);
}
......@@ -1233,7 +1239,8 @@ deserialize_brtnode_partition(
if (node->height > 0) {
unsigned char ch = rbuf_char(&rb);
assert(ch == BRTNODE_PARTITION_FIFO_MSG);
deserialize_child_buffer(node, index, &rb, cmp_extra, cmp);
deserialize_child_buffer(BNC(node, index), &rb, cmp_extra, cmp);
BP_WORKDONE(node, index) = 0;
}
else {
unsigned char ch = rbuf_char(&rb);
......
......@@ -171,6 +171,13 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
return 0;
}
// Test-only key comparator: orders two keys the way strcmp() orders
// NUL-terminated strings, but never reads beyond the length recorded in each
// DBT.  The end of a key's buffer is treated as an implicit terminator, so
// keys whose size does not include a trailing NUL still compare safely.
// Returns <0, 0 or >0; the first parameter (DB handle) is unused.
static int
testhelper_string_key_cmp(DB *UU(e), const DBT *a, const DBT *b)
{
    const unsigned char *s = a->data;
    const unsigned char *t = b->data;
    const size_t slen = a->size;
    const size_t tlen = b->size;
    for (size_t i = 0; ; i++) {
        // past the recorded size, behave as if a '\0' were stored there
        const unsigned char cs = (i < slen) ? s[i] : '\0';
        const unsigned char ct = (i < tlen) ? t[i] : '\0';
        if (cs != ct) {
            return (cs < ct) ? -1 : 1;
        }
        if (cs == '\0') {
            // both keys ended (really or implicitly) at the same position
            return 0;
        }
    }
}
int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_type cmdtype, char *key, int keylen, char *val, int vallen) {
void *node_v;
int r;
......@@ -205,14 +212,12 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_t
XIDS xids_0 = xids_get_root_xids();
MSN msn = next_dummymsn();
r = toku_fifo_enq(BNC_BUFFER(node, childnum), key, keylen, val, vallen, cmdtype, msn, xids_0, NULL);
r = toku_bnc_insert_msg(BNC(node, childnum), key, keylen, val, vallen, cmdtype, msn, xids_0, true, NULL, testhelper_string_key_cmp);
assert(r==0);
// Hack to get the test working. The problem is that this test
// is directly queueing something in a FIFO instead of
// using brt APIs.
node->max_msn_applied_to_node_on_disk = msn;
int sizediff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids_0);
BNC_NBYTESINBUF(node, childnum) += sizediff;
node->dirty = 1;
toku_unpin_brtnode(brt, node_v);
......
......@@ -172,7 +172,8 @@ toku_verify_brtnode (BRT brt,
if (node->height > 0) {
MSN lastmsn = ZERO_MSN;
// Verify that messages in the buffers are in the right place.
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, msn, xid,
NONLEAF_CHILDINFO bnc = BNC(node, i);
FIFO_ITERATE(bnc->buffer, key, keylen, data, datalen, type, msn, xid, UU(is_fresh),
{
int r = verify_msg_in_child_buffer(brt, type, msn, key, keylen, data, datalen, xid,
curr_less_pivot,
......
......@@ -199,6 +199,12 @@ get_node_reactivity (BRTNODE node) {
return get_nonleaf_reactivity(node);
}
// Accessor for the byte count tracked in a nonleaf child's buffer
// (the n_bytes_in_buffer field).
unsigned int
toku_bnc_nbytesinbuf(NONLEAF_CHILDINFO bnc)
{
    const unsigned int nbytes = bnc->n_bytes_in_buffer;
    return nbytes;
}
// return TRUE if the size of the buffers plus the amount of work done is large enough. (But return false if there is nothing to be flushed (the buffers empty)).
static bool
nonleaf_node_is_gorged (BRTNODE node) {
......@@ -218,7 +224,7 @@ nonleaf_node_is_gorged (BRTNODE node) {
size += BP_WORKDONE(node, child);
}
for (int child = 0; child < node->n_children; ++child) {
if (BNC_NBYTESINBUF(node, child) > 0) {
if (toku_bnc_nbytesinbuf(BNC(node, child)) > 0) {
buffers_are_empty = FALSE;
break;
}
......@@ -228,7 +234,7 @@ nonleaf_node_is_gorged (BRTNODE node) {
(!buffers_are_empty));
}
static void brtnode_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd);
static void brtnode_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd, bool is_fresh);
static void
......@@ -400,6 +406,12 @@ brt_leaf_check_leaf_stats (BRTNODE node)
// assert(node->u.l.leaf_stats.exact);
}
// Number of messages currently held in the FIFO of a nonleaf child's buffer.
int
toku_bnc_n_entries(NONLEAF_CHILDINFO bnc)
{
    FIFO msgs = bnc->buffer;
    return toku_fifo_n_entries(msgs);
}
// This should be done incrementally in most cases.
static void
fixup_child_estimates (BRTNODE node, int childnum_of_node, BRTNODE child, BOOL dirty_it)
......@@ -421,7 +433,7 @@ fixup_child_estimates (BRTNODE node, int childnum_of_node, BRTNODE child, BOOL d
if (!child_se->exact) estimates.exact = FALSE;
if (child->height>0) {
if (BP_STATE(child,i) != PT_AVAIL ||
toku_fifo_n_entries(BNC_BUFFER(child,i))!=0)
toku_bnc_n_entries(BNC(child,i))!=0)
{
estimates.exact=FALSE;
}
......@@ -493,17 +505,21 @@ fetch_from_buf (OMT omt, u_int32_t idx) {
return (LEAFENTRY)v;
}
// Memory footprint of a nonleaf child's buffer: the struct itself, its FIFO,
// and the three OMTs (fresh, stale, broadcast) that index the FIFO.
long
toku_bnc_memory_size(NONLEAF_CHILDINFO bnc)
{
    long total = sizeof(*bnc);
    total += toku_fifo_memory_size(bnc->buffer);
    total += toku_omt_memory_size(bnc->fresh_message_tree);
    total += toku_omt_memory_size(bnc->stale_message_tree);
    total += toku_omt_memory_size(bnc->broadcast_list);
    return total;
}
static long
get_avail_internal_node_partition_size(BRTNODE node, int i)
{
long retval = 0;
assert(node->height > 0);
NONLEAF_CHILDINFO childinfo = BNC(node, i);
retval += sizeof(*childinfo);
retval += toku_fifo_memory_size(BNC_BUFFER(node, i));
retval += toku_omt_memory_size(BNC_BROADCAST_BUFFER(node, i));
retval += toku_omt_memory_size(BNC_MESSAGE_TREE(node, i));
return retval;
return toku_bnc_memory_size(BNC(node, i));
}
static long
......@@ -1605,7 +1621,7 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
toku_assert_entire_node_in_memory(node);
toku_assert_entire_node_in_memory(childa);
toku_assert_entire_node_in_memory(childb);
int old_count = BNC_NBYTESINBUF(node, childnum);
int old_count = toku_bnc_nbytesinbuf(BNC(node, childnum));
assert(old_count==0);
int cnum;
WHEN_NOT_GCOV(
......@@ -1689,7 +1705,7 @@ brt_split_child (BRT t, BRTNODE node, int childnum, BOOL *did_react, ANCESTORS a
}
assert(node->height>0);
BRTNODE child;
assert(BNC_NBYTESINBUF(node, childnum)==0); // require that the buffer for this child is empty
assert(toku_bnc_nbytesinbuf(BNC(node, childnum))==0); // require that the buffer for this child is empty
{
// For now, don't use toku_pin_brtnode since we aren't yet prepared to deal with the TRY_AGAIN, and we don't have to apply all the messages above to do this split operation.
struct ancestors next_ancestors = {node, childnum, ancestors};
......@@ -2218,36 +2234,44 @@ toku_fifo_entry_key_msn_cmp(void *extrap, const void *ap, const void *bp)
extra->cmp_extra, extra->cmp);
}
// append a cmd to a nonleaf node's child buffer
// should be static, but used by test programs
void
toku_brt_append_to_child_buffer(BRT brt, BRTNODE node, int childnum, int type, MSN msn, XIDS xids, const DBT *key, const DBT *val) {
assert(BP_STATE(node,childnum) == PT_AVAIL);
int diff = key->size + val->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids);
int
toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, MSN msn, XIDS xids, bool is_fresh, DB *cmp_extra, brt_compare_func cmp)
{
int diff = keylen + datalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids);
long offset;
int r = toku_fifo_enq(BNC_BUFFER(node, childnum), key->data, key->size, val->data, val->size, type, msn, xids, &offset);
assert_zero(r);
int r = toku_fifo_enq(bnc->buffer, key, keylen, data, datalen, type, msn, xids, is_fresh, &offset); assert_zero(r);
enum brt_msg_type etype = (enum brt_msg_type) type;
if (brt_msg_type_applies_once(etype)) {
struct toku_fifo_entry_key_msn_heaviside_extra extra = { .cmp_extra = brt->db, .cmp = brt->compare_fun, .fifo = BNC_BUFFER(node, childnum), .key = key->data, .keylen = key->size, .msn = msn };
r = toku_omt_insert(BNC_MESSAGE_TREE(node, childnum), (OMTVALUE) offset, toku_fifo_entry_key_msn_heaviside, &extra, NULL);
assert_zero(r);
struct toku_fifo_entry_key_msn_heaviside_extra extra = { .cmp_extra = cmp_extra, .cmp = cmp, .fifo = bnc->buffer, .key = key, .keylen = keylen, .msn = msn };
if (is_fresh) {
r = toku_omt_insert(bnc->fresh_message_tree, (OMTVALUE) offset, toku_fifo_entry_key_msn_heaviside, &extra, NULL); assert_zero(r);
} else {
r = toku_omt_insert(bnc->stale_message_tree, (OMTVALUE) offset, toku_fifo_entry_key_msn_heaviside, &extra, NULL); assert_zero(r);
}
} else if (brt_msg_type_applies_all(etype) || brt_msg_type_does_nothing(etype)) {
u_int32_t idx = toku_omt_size(BNC_BROADCAST_BUFFER(node, childnum));
r = toku_omt_insert_at(BNC_BROADCAST_BUFFER(node, childnum), (OMTVALUE) offset, idx);
assert_zero(r);
u_int32_t idx = toku_omt_size(bnc->broadcast_list);
r = toku_omt_insert_at(bnc->broadcast_list, (OMTVALUE) offset, idx); assert_zero(r);
} else {
assert(FALSE);
}
BNC_NBYTESINBUF(node, childnum) += diff;
bnc->n_bytes_in_buffer += diff;
return r;
}
// Append a message to the buffer of child number `childnum` of a nonleaf node
// and mark the node dirty.  The child's partition must be in the PT_AVAIL state.
// key/val supply the message payload; brt supplies the comparison function and
// its extra argument used to index the message.
// Should be static, but it is used by test programs.
void
toku_brt_append_to_child_buffer(BRT brt, BRTNODE node, int childnum, int type, MSN msn, XIDS xids, bool is_fresh, const DBT *key, const DBT *val) {
    assert(BP_STATE(node,childnum) == PT_AVAIL);

    NONLEAF_CHILDINFO bnc = BNC(node, childnum);
    int r = toku_bnc_insert_msg(bnc,
                                key->data, key->size,
                                val->data, val->size,
                                type, msn, xids, is_fresh,
                                brt->db, brt->compare_fun);
    assert_zero(r);

    node->dirty = 1;
}
static void brt_nonleaf_cmd_once_to_child (BRT brt, BRTNODE node, unsigned int childnum, BRT_MSG cmd)
static void brt_nonleaf_cmd_once_to_child (BRT brt, BRTNODE node, unsigned int childnum, BRT_MSG cmd, bool is_fresh)
// Previously we had passive aggressive promotion, but that causes a lot of I/O at the checkpoint. So now we are just putting it in the buffer here.
// Also we don't worry about the node getting overfull here. It's the caller's problem.
{
toku_brt_append_to_child_buffer(brt, node, childnum, cmd->type, cmd->msn, cmd->xids, cmd->u.id.key, cmd->u.id.val);
toku_brt_append_to_child_buffer(brt, node, childnum, cmd->type, cmd->msn, cmd->xids, is_fresh, cmd->u.id.key, cmd->u.id.val);
}
/* find the leftmost child that may contain the key */
......@@ -2307,7 +2331,7 @@ unsigned int toku_brtnode_which_child(BRTNODE node, const DBT *k,
#endif
}
static void brt_nonleaf_cmd_once (BRT t, BRTNODE node, BRT_MSG cmd)
static void brt_nonleaf_cmd_once(BRT t, BRTNODE node, BRT_MSG cmd, bool is_fresh)
// Effect: Insert a message into a nonleaf. We may put it into a child, possibly causing the child to become reactive.
// We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do.
// The re_array[i] gets set to reactivity of any modified child.
......@@ -2317,18 +2341,18 @@ static void brt_nonleaf_cmd_once (BRT t, BRTNODE node, BRT_MSG cmd)
//TODO: accesses key, val directly
unsigned int childnum = toku_brtnode_which_child(node, cmd->u.id.key, t->db, t->compare_fun);
brt_nonleaf_cmd_once_to_child (t, node, childnum, cmd);
brt_nonleaf_cmd_once_to_child (t, node, childnum, cmd, is_fresh);
}
static void
brt_nonleaf_cmd_all (BRT t, BRTNODE node, BRT_MSG cmd)
brt_nonleaf_cmd_all (BRT t, BRTNODE node, BRT_MSG cmd, bool is_fresh)
// Effect: Put the cmd into a nonleaf node. We put it into all children, possibly causing the children to become reactive.
// We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do.
// The re_array[i] gets set to the reactivity of any modified child i. (And there may be several such children.)
{
int i;
for (i = 0; i < node->n_children; i++) {
brt_nonleaf_cmd_once_to_child(t, node, i, cmd);
brt_nonleaf_cmd_once_to_child(t, node, i, cmd, is_fresh);
}
}
......@@ -2351,7 +2375,7 @@ brt_msg_does_nothing(BRT_MSG cmd)
}
static void
brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd)
brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd, bool is_fresh)
// Effect: Put the cmd into a nonleaf node. We may put it into a child, possibly causing the child to become reactive.
// We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do.
// The re_array[i] gets set to the reactivity of any modified child i. (And there may be several such children.)
......@@ -2369,7 +2393,7 @@ brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd)
case BRT_ABORT_ANY:
case BRT_COMMIT_ANY:
case BRT_UPDATE:
brt_nonleaf_cmd_once(t, node, cmd);
brt_nonleaf_cmd_once(t, node, cmd, is_fresh);
return;
case BRT_COMMIT_BROADCAST_ALL:
case BRT_COMMIT_BROADCAST_TXN:
......@@ -2377,7 +2401,7 @@ brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd)
case BRT_OPTIMIZE:
case BRT_OPTIMIZE_FOR_UPGRADE:
case BRT_UPDATE_BROADCAST_ALL:
brt_nonleaf_cmd_all (t, node, cmd); // send message to all children
brt_nonleaf_cmd_all (t, node, cmd, is_fresh); // send message to all children
return;
case BRT_NONE:
return;
......@@ -2609,11 +2633,11 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_react,
const struct pivot_bounds next_bounds_a = next_pivot_keys(node, childnuma, bounds);
const struct pivot_bounds next_bounds_b = next_pivot_keys(node, childnumb, bounds);
if (toku_fifo_n_entries(BNC_BUFFER(node,childnuma))>0) {
if (toku_bnc_n_entries(BNC(node,childnuma))>0) {
enum reactivity ignore;
flush_this_child(t, node, childnuma, &ignore, FALSE, FALSE, ancestors, &next_bounds_a);
}
if (toku_fifo_n_entries(BNC_BUFFER(node,childnumb))>0) {
if (toku_bnc_n_entries(BNC(node,childnumb))>0) {
enum reactivity ignore;
flush_this_child(t, node, childnumb, &ignore, FALSE, FALSE, ancestors, &next_bounds_b);
}
......@@ -2738,20 +2762,21 @@ brt_handle_maybe_reactive_root (BRT brt, CACHEKEY *rootp, BRTNODE *nodep) {
static void find_heaviest_child (BRTNODE node, int *childnum) {
int max_child = 0;
int max_weight = BNC_NBYTESINBUF(node, 0) + BP_WORKDONE(node, 0);
int max_weight = toku_bnc_nbytesinbuf(BNC(node, 0)) + BP_WORKDONE(node, 0);
int i;
if (0) printf("%s:%d weights: %d", __FILE__, __LINE__, max_weight);
assert(node->n_children>0);
for (i=1; i<node->n_children; i++) {
if (BP_WORKDONE(node,i))
assert (BNC_NBYTESINBUF(node,i));
int this_weight = BNC_NBYTESINBUF(node,i) + BP_WORKDONE(node,i);;
if (0) printf(" %d", this_weight);
if (max_weight < this_weight) {
max_child = i;
max_weight = this_weight;
}
if (BP_WORKDONE(node,i)) {
assert(toku_bnc_nbytesinbuf(BNC(node,i)) > 0);
}
int this_weight = toku_bnc_nbytesinbuf(BNC(node,i)) + BP_WORKDONE(node,i);;
if (0) printf(" %d", this_weight);
if (max_weight < this_weight) {
max_child = i;
max_weight = this_weight;
}
}
*childnum = max_child;
if (0) printf("\n");
......@@ -2772,13 +2797,13 @@ flush_some_child (BRT t, BRTNODE node, BOOL is_first_flush, BOOL flush_recursive
toku_assert_entire_node_in_memory(node);
int childnum;
find_heaviest_child(node, &childnum);
assert(toku_fifo_n_entries(BNC_BUFFER(node, childnum))>0);
assert(toku_bnc_n_entries(BNC(node, childnum))>0);
enum reactivity child_re = RE_STABLE;
flush_this_child (t, node, childnum, &child_re, is_first_flush, flush_recursively,
ancestors, bounds);
flush_this_child(t, node, childnum, &child_re, is_first_flush, flush_recursively,
ancestors, bounds);
BOOL did_react;
brt_handle_maybe_reactive_child(t, node, childnum, child_re, &did_react,
ancestors, bounds);
ancestors, bounds);
}
static void assert_leaf_up_to_date(BRTNODE node) {
......@@ -2786,6 +2811,35 @@ static void assert_leaf_up_to_date(BRTNODE node) {
toku_assert_entire_node_in_memory(node);
}
// Reset a nonleaf child's buffer to the empty state: zero the byte count,
// empty the FIFO, and replace each of the three OMTs with a fresh empty one
// (their values are offsets into the FIFO that was just emptied).
void
toku_bnc_empty(NONLEAF_CHILDINFO bnc)
{
    int r;

    // drop the messages and the byte count that tracks them
    bnc->n_bytes_in_buffer = 0;
    toku_fifo_empty(bnc->buffer);
    toku_fifo_size_is_stabilized(bnc->buffer);

    // rebuild the fresh-message index
    toku_omt_destroy(&bnc->fresh_message_tree);
    r = toku_omt_create(&bnc->fresh_message_tree);
    resource_assert_zero(r);

    // rebuild the stale-message index
    toku_omt_destroy(&bnc->stale_message_tree);
    r = toku_omt_create(&bnc->stale_message_tree);
    resource_assert_zero(r);

    // rebuild the broadcast list
    toku_omt_destroy(&bnc->broadcast_list);
    r = toku_omt_create(&bnc->broadcast_list);
    resource_assert_zero(r);
}
// Push every message buffered in bnc down into `child`.
// Each FIFO entry is wrapped in a BRT_MSG_S and handed to brtnode_put_cmd()
// together with the entry's is_fresh flag.  The buffer must be non-empty.
// The buffer itself is left untouched here.  Always returns 0.
int
toku_bnc_flush_to_child(BRT t, NONLEAF_CHILDINFO bnc, BRTNODE child)
{
    assert(toku_fifo_n_entries(bnc->buffer)>0);
    FIFO_ITERATE(
        bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh,
        ({
            // DBTs that point at the key and value bytes of this entry
            DBT key_dbt;
            DBT val_dbt;
            BRT_MSG_S msg = { (enum brt_msg_type)type, msn, xids,
                              .u.id = { toku_fill_dbt(&key_dbt, key, keylen),
                                        toku_fill_dbt(&val_dbt, val, vallen) } };
            brtnode_put_cmd(t, child, &msg, is_fresh);
        }));
    return 0;
}
static void
flush_this_child (BRT t, BRTNODE node, int childnum, enum reactivity *child_re, BOOL is_first_flush, BOOL flush_recursively,
ANCESTORS ancestors, struct pivot_bounds const * const bounds)
......@@ -2795,6 +2849,7 @@ flush_this_child (BRT t, BRTNODE node, int childnum, enum reactivity *child_re,
// we are allowed to flush only one child.
// For this version, flush_this_child cannot release the lock during I/O, but it does need the ancestor information so that it can apply messages when a page comes in.
{
int r;
toku_assert_entire_node_in_memory(node);
struct ancestors next_ancestors = {node, childnum, ancestors};
const struct pivot_bounds next_bounds = next_pivot_keys(node, childnum, bounds);
......@@ -2811,83 +2866,21 @@ flush_this_child (BRT t, BRTNODE node, int childnum, enum reactivity *child_re,
assert(child->thisnodename.b!=0);
VERIFY_NODE(t, child);
FIFO fifo = BNC_BUFFER(node,childnum);
int r;
if (child->height==0) {
// The child is a leaf node.
assert_leaf_up_to_date(child); // The child has all the messages applied to it.
// We've arranged that the path from the root to this child is empty, except for the childnum fifo in node.
// We must empty the fifo, and arrange for the child to be written to disk, and then mark it as clean and up-to-date.
bytevec key, val;
ITEMLEN keylen, vallen;
u_int32_t type;
MSN msn;
XIDS xids;
while(0==toku_fifo_peek(fifo, &key, &keylen, &val, &vallen, &type, &msn, &xids)) {
int n_bytes_removed = (keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids));
r = toku_fifo_deq(fifo);
assert(r==0);
BNC_NBYTESINBUF(node, childnum) -= n_bytes_removed;
}
toku_fifo_size_is_stabilized(fifo);
invariant(BNC_NBYTESINBUF(node, childnum) == 0);
toku_omt_destroy(&BNC_MESSAGE_TREE(node, childnum));
r = toku_omt_create(&BNC_MESSAGE_TREE(node, childnum)); resource_assert_zero(r);
toku_omt_destroy(&BNC_BROADCAST_BUFFER(node, childnum));
r = toku_omt_create(&BNC_BROADCAST_BUFFER(node, childnum)); resource_assert_zero(r);
BP_WORKDONE(node, childnum) = 0; // this buffer is drained, no work has been done by its contents
node->dirty=TRUE;
child->dirty=TRUE;
fixup_child_estimates(node, childnum, child, TRUE);
*child_re = get_node_reactivity(child);
toku_unpin_brtnode(t, child);
toku_bnc_empty(BNC(node, childnum));
} else {
bytevec key,val;
ITEMLEN keylen, vallen;
//printf("%s:%d Try random_pick, weight=%d \n", __FILE__, __LINE__, BNC_NBYTESINBUF(node, childnum));
assert(toku_fifo_n_entries(fifo)>0);
u_int32_t type;
MSN msn;
XIDS xids;
while(0==toku_fifo_peek(fifo, &key, &keylen, &val, &vallen, &type, &msn, &xids)) {
DBT hk,hv;
//TODO: Factor out (into a function) conversion of fifo_entry to message
BRT_MSG_S brtcmd = { (enum brt_msg_type)type, msn, xids, .u.id= {toku_fill_dbt(&hk, key, keylen),
toku_fill_dbt(&hv, val, vallen)} };
int n_bytes_removed = (hk.size + hv.size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids));
//printf("%s:%d random_picked\n", __FILE__, __LINE__);
brtnode_put_cmd (t, child, &brtcmd);
//printf("%s:%d %d=push_a_brt_cmd_down=(); child_did_split=%d (weight=%d)\n", __FILE__, __LINE__, r, child_did_split, BNC_NBYTESINBUF(node, childnum));
{
r = toku_fifo_deq(fifo);
//printf("%s:%d deleted status=%d\n", __FILE__, __LINE__, r);
assert(r==0);
}
BNC_NBYTESINBUF(node, childnum) -= n_bytes_removed;
node->dirty = 1;
}
toku_fifo_size_is_stabilized(fifo);
invariant(BNC_NBYTESINBUF(node, childnum) == 0);
toku_omt_destroy(&BNC_MESSAGE_TREE(node, childnum));
r = toku_omt_create(&BNC_MESSAGE_TREE(node, childnum)); resource_assert_zero(r);
toku_omt_destroy(&BNC_BROADCAST_BUFFER(node, childnum));
r = toku_omt_create(&BNC_BROADCAST_BUFFER(node, childnum)); resource_assert_zero(r);
BP_WORKDONE(node, childnum) = 0; // this buffer is drained, no work has been done by its contents
if (0) printf("%s:%d done random picking\n", __FILE__, __LINE__);
r = toku_bnc_flush_to_child(t, BNC(node, childnum), child); assert_zero(r);
toku_bnc_empty(BNC(node, childnum));
}
BP_WORKDONE(node, childnum) = 0; // this buffer is drained, no work has been done by its contents
node->dirty = TRUE;
child->dirty = TRUE;
if (child->height > 0) {
// Having pushed all that stuff to a child, do we need to flush the child? We may have to flush it many times if there were lots of messages that just got pushed down.
// If we were to only flush one child, we could possibly end up with a very big node after a while.
// This repeated flushing can cause some inserts to take a long time (possibly walking all over the tree).
......@@ -2901,11 +2894,11 @@ flush_this_child (BRT t, BRTNODE node, int childnum, enum reactivity *child_re,
n_flushed++;
}
}
fixup_child_estimates(node, childnum, child, TRUE);
// Now it's possible that the child needs to be merged or split.
*child_re = get_node_reactivity(child);
toku_unpin_brtnode(t, child);
}
fixup_child_estimates(node, childnum, child, TRUE);
*child_re = get_node_reactivity(child);
toku_unpin_brtnode(t, child);
}
......@@ -2971,7 +2964,7 @@ flush_this_height1_child (BRT t, BRTNODE node, int childnum, BRTNODE child)
static void
brtnode_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd)
brtnode_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd, bool is_fresh)
// Effect: Push CMD into the subtree rooted at NODE.
// If NODE is a leaf, then
// put CMD into leaf, applying it to the leafentries
......@@ -2988,7 +2981,7 @@ brtnode_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd)
// Do nothing
} else {
brt_nonleaf_put_cmd(t, node, cmd);
brt_nonleaf_put_cmd(t, node, cmd, is_fresh);
}
}
......@@ -2996,15 +2989,15 @@ static const struct pivot_bounds infinite_bounds = {.lower_bound_exclusive=NULL,
.upper_bound_inclusive=NULL};
static void
brtnode_nonleaf_put_cmd_at_root (BRT t, BRTNODE node, BRT_MSG cmd)
brtnode_nonleaf_put_cmd_at_root (BRT t, BRTNODE node, BRT_MSG cmd)
// Effect: Push CMD into the subtree rooted at nonleaf NODE, and indicate whether as a result NODE should split or should merge.
// Push the cmd in the relevant child's (or children's) FIFOs.
// Push the cmd in the relevant child's (or children's) FIFOs.
// The node may get too full or something. It's the caller's job to fix that up.
// Requires: node is not a leaf.
{
assert(node->height>0);
toku_assert_entire_node_in_memory(node);
brt_nonleaf_put_cmd(t, node, cmd);
brt_nonleaf_put_cmd(t, node, cmd, true);
}
......@@ -4993,14 +4986,6 @@ is_le_val_del(LEAFENTRY le, BRT_CURSOR brtcursor) {
return rval;
}
static BOOL
key_is_in_leaf_range (BRT t, const DBT *key, DBT const * const lower_bound_exclusive, DBT const * const upper_bound_inclusive) {
return
((lower_bound_exclusive == NULL) || (t->compare_fun(t->db, lower_bound_exclusive, key) < 0))
&&
((upper_bound_inclusive == NULL) || (t->compare_fun(t->db, key, upper_bound_inclusive) <= 0));
}
static const DBT zero_dbt = {0,0,0,0};
static void search_save_bound (brt_search_t *search, DBT *pivot) {
......@@ -5029,25 +5014,19 @@ static BOOL search_pivot_is_bounded (brt_search_t *search, DB *cmp_extra, brt_co
}
}
static BOOL msg_type_has_key (enum brt_msg_type m) {
switch (m) {
case BRT_NONE:
case BRT_COMMIT_BROADCAST_ALL:
case BRT_COMMIT_BROADCAST_TXN:
case BRT_ABORT_BROADCAST_TXN:
case BRT_OPTIMIZE:
case BRT_OPTIMIZE_FOR_UPGRADE:
case BRT_UPDATE_BROADCAST_ALL:
return FALSE;
case BRT_INSERT:
case BRT_DELETE_ANY:
case BRT_ABORT_ANY:
case BRT_COMMIT_ANY:
case BRT_INSERT_NO_OVERWRITE:
case BRT_UPDATE:
return TRUE;
}
assert(0);
// Mark the message stored at FIFO offset `v` as stale and index it in
// bnc->stale_message_tree, ordered with brt's comparison function.
// The entry is only copied into the stale tree here; removing it from the
// fresh tree happens later, because this runs during an iteration over the
// fresh tree.  The insert is asserted to succeed, so the return value is 0.
static int
move_to_stale(OMTVALUE v, u_int32_t UU(idx), BRT brt, NONLEAF_CHILDINFO bnc)
{
    const long offset = (long) v;
    struct fifo_entry *entry = (struct fifo_entry *) toku_fifo_get_entry(bnc->buffer, offset);

    // flip the flag on the entry itself before indexing it as stale
    entry->is_fresh = false;

    DBT keydbt;
    DBT *key = fill_dbt_for_fifo_entry(&keydbt, entry);
    struct toku_fifo_entry_key_msn_heaviside_extra heaviside_extra = {
        .cmp_extra = brt->db,
        .cmp = brt->compare_fun,
        .fifo = bnc->buffer,
        .key = key->data,
        .keylen = key->size,
        .msn = entry->msn
    };

    int r = toku_omt_insert(bnc->stale_message_tree, (OMTVALUE) offset,
                            toku_fifo_entry_key_msn_heaviside, &heaviside_extra, NULL);
    assert_zero(r);
    return r;
}
struct store_fifo_offset_extra {
......@@ -5065,6 +5044,21 @@ store_fifo_offset(OMTVALUE v, u_int32_t UU(idx), void *extrap)
return 0;
}
// Extra argument for store_fifo_offset_and_move_to_stale(): bundles what
// store_fifo_offset() needs with what move_to_stale() needs.
struct store_fifo_offset_and_move_to_stale_extra {
BRT brt;                                   // passed to move_to_stale(), which uses its db and compare_fun
struct store_fifo_offset_extra *sfo_extra; // forwarded unchanged to store_fifo_offset()
NONLEAF_CHILDINFO bnc;                     // child buffer whose entry is being moved to stale
};
// Iteration callback that chains two steps for the value V: first
// store_fifo_offset with the wrapped sfo_extra, then move_to_stale with the
// wrapped brt and bnc.  EXTRAP must point to a
// struct store_fifo_offset_and_move_to_stale_extra.
//
// Both steps are asserted to succeed; the result of the second is returned.
static int
store_fifo_offset_and_move_to_stale(OMTVALUE v, u_int32_t idx, void *extrap)
{
    struct store_fifo_offset_and_move_to_stale_extra *args = extrap;

    int r = store_fifo_offset(v, idx, args->sfo_extra);
    assert_zero(r);

    r = move_to_stale(v, idx, args->brt, args->bnc);
    assert_zero(r);

    return r;
}
static int
fifo_offset_msn_cmp(void *extrap, const void *va, const void *vb)
{
......@@ -5077,7 +5071,7 @@ fifo_offset_msn_cmp(void *extrap, const void *va, const void *vb)
}
static void
do_brt_leaf_put_cmd(BRT t, BASEMENTNODE bn, SUBTREE_EST se, BRTNODE ancestor, int childnum, DBT *lbe_ptr, DBT *ubi_ptr, MSN *max_msn_applied, const struct fifo_entry *entry)
do_brt_leaf_put_cmd(BRT t, BASEMENTNODE bn, SUBTREE_EST se, BRTNODE ancestor, int childnum, MSN *max_msn_applied, const struct fifo_entry *entry)
{
ITEMLEN keylen = entry->keylen;
ITEMLEN vallen = entry->vallen;
......@@ -5089,7 +5083,6 @@ do_brt_leaf_put_cmd(BRT t, BASEMENTNODE bn, SUBTREE_EST se, BRTNODE ancestor, in
DBT hk;
toku_fill_dbt(&hk, key, keylen);
assert(!msg_type_has_key(type) || key_is_in_leaf_range(t, &hk, lbe_ptr, ubi_ptr));
DBT hv;
BRT_MSG_S brtcmd = { type, msn, xids, .u.id = { &hk, toku_fill_dbt(&hv, val, vallen) } };
bool made_change;
......@@ -5112,8 +5105,6 @@ struct iterate_do_brt_leaf_put_cmd_extra {
SUBTREE_EST se;
BRTNODE ancestor;
int childnum;
DBT *lbe_ptr;
DBT *ubi_ptr;
MSN *max_msn_applied;
};
......@@ -5122,123 +5113,149 @@ iterate_do_brt_leaf_put_cmd(OMTVALUE v, u_int32_t UU(idx), void *extrap)
{
struct iterate_do_brt_leaf_put_cmd_extra *e = extrap;
const long offset = (long) v;
const struct fifo_entry *entry = toku_fifo_get_entry(BNC_BUFFER(e->ancestor, e->childnum), offset);
do_brt_leaf_put_cmd(e->t, e->bn, e->se, e->ancestor, e->childnum, e->lbe_ptr, e->ubi_ptr, e->max_msn_applied, entry);
NONLEAF_CHILDINFO bnc = BNC(e->ancestor, e->childnum);
const struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offset);
do_brt_leaf_put_cmd(e->t, e->bn, e->se, e->ancestor, e->childnum, e->max_msn_applied, entry);
return 0;
}
// Argument bundle for iterate_do_brt_leaf_put_cmd_and_move_to_stale, which
// receives it through the iteration callback's void* extra parameter.
struct iterate_do_brt_leaf_put_cmd_and_move_to_stale_extra {
BRT brt;                                              // forwarded to move_to_stale
struct iterate_do_brt_leaf_put_cmd_extra *iter_extra; // forwarded to iterate_do_brt_leaf_put_cmd
NONLEAF_CHILDINFO bnc;                                // forwarded to move_to_stale
};
static int
apply_buffer_messages_to_basement_node (
BRT t,
BASEMENTNODE bn,
SUBTREE_EST se,
BRTNODE ancestor,
int childnum,
struct pivot_bounds const * const bounds
iterate_do_brt_leaf_put_cmd_and_move_to_stale(OMTVALUE v, u_int32_t idx, void *extrap)
{
struct iterate_do_brt_leaf_put_cmd_and_move_to_stale_extra *e = extrap;
int r = iterate_do_brt_leaf_put_cmd(v, idx, e->iter_extra); assert_zero(r);
r = move_to_stale(v, idx, e->brt, e->bnc); assert_zero(r);
return r;
}
static void
bnc_find_iterate_bounds(
DB *cmp_extra,
brt_compare_func cmp,
OMT message_tree,
FIFO buffer,
struct pivot_bounds const * const bounds,
u_int32_t *lbe,
u_int32_t *ubi
)
// Effect: For each message in ANCESTOR that is between lower_bound_exclusive (exclusive) and upper_bound_inclusive (inclusive), apply the message to the node.
// In ANCESTOR, the relevant messages are all in the buffer for child number CHILDNUM.
// Treat the bounds as minus or plus infinity respectively if they are NULL.
// Do not mark the node as dirty (preserve previous state of 'dirty' bit).
{
assert(0 <= childnum && childnum < ancestor->n_children);
int r = 0;
MSN max_msn_applied = MIN_MSN;
u_int32_t lbe, ubi;
DBT lbedbt, ubidbt; // lbe is lower bound exclusive, ubi is upper bound inclusive
DBT *lbe_ptr, *ubi_ptr;
if (bounds->lower_bound_exclusive) {
struct toku_fifo_entry_key_msn_heaviside_extra lbe_extra = {
.cmp_extra = t->db, .cmp = t->compare_fun,
.fifo = BNC_BUFFER(ancestor, childnum),
.cmp_extra = cmp_extra, .cmp = cmp,
.fifo = buffer,
.key = kv_pair_key((struct kv_pair *) bounds->lower_bound_exclusive),
.keylen = kv_pair_keylen((struct kv_pair *) bounds->lower_bound_exclusive),
.msn = MAX_MSN };
// TODO: get this value and compare it with ubi to see if we even
// need to continue
OMTVALUE found_lb;
r = toku_omt_find(BNC_MESSAGE_TREE(ancestor, childnum),
toku_fifo_entry_key_msn_heaviside, &lbe_extra,
+1, &found_lb, &lbe);
r = toku_omt_find(message_tree, toku_fifo_entry_key_msn_heaviside,
&lbe_extra, +1, &found_lb, lbe);
if (r == DB_NOTFOUND) {
// no relevant data, we're done
if (toku_omt_size(BNC_BROADCAST_BUFFER(ancestor, childnum)) == 0) {
return 0;
} else {
lbe = 0;
lbe_ptr = NULL;
ubi = 0;
ubi_ptr = NULL;
goto just_apply_broadcast_messages;
}
*lbe = 0;
*ubi = 0;
return;
}
if (bounds->upper_bound_inclusive) {
DBT ubidbt_tmp = kv_pair_key_to_dbt((struct kv_pair *) bounds->upper_bound_inclusive);
const long offset = (long) found_lb;
DBT found_lbedbt;
fill_dbt_for_fifo_entry(&found_lbedbt, toku_fifo_get_entry(BNC_BUFFER(ancestor, childnum), offset));
int c = t->compare_fun(t->db, &found_lbedbt, &ubidbt_tmp);
fill_dbt_for_fifo_entry(&found_lbedbt, toku_fifo_get_entry(buffer, offset));
int c = cmp(cmp_extra, &found_lbedbt, &ubidbt_tmp);
if (c > 0) {
if (toku_omt_size(BNC_BROADCAST_BUFFER(ancestor, childnum)) == 0) {
return 0;
} else {
lbe = 0;
lbe_ptr = NULL;
ubi = 0;
ubi_ptr = NULL;
goto just_apply_broadcast_messages;
}
// no relevant data, we're done
*lbe = 0;
*ubi = 0;
return;
}
}
lbedbt = kv_pair_key_to_dbt((struct kv_pair *) bounds->lower_bound_exclusive);
lbe_ptr = &lbedbt;
} else {
lbe = 0;
lbe_ptr = NULL;
*lbe = 0;
}
if (bounds->upper_bound_inclusive) {
struct toku_fifo_entry_key_msn_heaviside_extra ubi_extra = {
.cmp_extra = t->db, .cmp = t->compare_fun,
.fifo = BNC_BUFFER(ancestor, childnum),
.cmp_extra = cmp_extra, .cmp = cmp,
.fifo = buffer,
.key = kv_pair_key((struct kv_pair *) bounds->upper_bound_inclusive),
.keylen = kv_pair_keylen((struct kv_pair *) bounds->upper_bound_inclusive),
.msn = MAX_MSN };
r = toku_omt_find(BNC_MESSAGE_TREE(ancestor, childnum),
toku_fifo_entry_key_msn_heaviside, &ubi_extra,
+1, NULL, &ubi);
r = toku_omt_find(message_tree, toku_fifo_entry_key_msn_heaviside,
&ubi_extra, +1, NULL, ubi);
if (r == DB_NOTFOUND) {
ubi = toku_omt_size(BNC_MESSAGE_TREE(ancestor, childnum));
*ubi = toku_omt_size(message_tree);
}
ubidbt = kv_pair_key_to_dbt((struct kv_pair *) bounds->upper_bound_inclusive);
ubi_ptr = &ubidbt;
} else {
ubi = toku_omt_size(BNC_MESSAGE_TREE(ancestor, childnum));
ubi_ptr = NULL;
*ubi = toku_omt_size(message_tree);
}
}
static int
bnc_apply_messages_to_basement_node(
BRT t,
BASEMENTNODE bn,
SUBTREE_EST se,
BRTNODE ancestor,
int childnum,
struct pivot_bounds const * const bounds
)
// Effect: For each message in ANCESTOR that is between lower_bound_exclusive (exclusive) and upper_bound_inclusive (inclusive), apply the message to the node.
// In ANCESTOR, the relevant messages are all in the buffer for child number CHILDNUM.
// Treat the bounds as minus or plus infinity respectively if they are NULL.
// Do not mark the node as dirty (preserve previous state of 'dirty' bit).
{
int r;
NONLEAF_CHILDINFO bnc = BNC(ancestor, childnum);
u_int32_t stale_lbe, stale_ubi;
if (!bn->stale_ancestor_messages_applied) {
bnc_find_iterate_bounds(t->db, t->compare_fun, bnc->stale_message_tree, bnc->buffer, bounds, &stale_lbe, &stale_ubi);
} else {
stale_lbe = 0;
stale_ubi = 0;
}
u_int32_t fresh_lbe, fresh_ubi;
bnc_find_iterate_bounds(t->db, t->compare_fun, bnc->fresh_message_tree, bnc->buffer, bounds, &fresh_lbe, &fresh_ubi);
just_apply_broadcast_messages:
if (toku_omt_size(BNC_BROADCAST_BUFFER(ancestor, childnum)) > 0) {
const int buffer_size = ubi - lbe + toku_omt_size(BNC_BROADCAST_BUFFER(ancestor, childnum));
MSN max_msn_applied = MIN_MSN;
if (toku_omt_size(bnc->broadcast_list) > 0) {
const int buffer_size = (stale_ubi - stale_lbe) + (fresh_ubi - fresh_lbe) + toku_omt_size(bnc->broadcast_list);
long *MALLOC_N(buffer_size, offsets);
struct store_fifo_offset_extra sfo_extra = { .offsets = offsets, .i = 0 };
r = toku_omt_iterate_on_range(BNC_MESSAGE_TREE(ancestor, childnum), lbe, ubi, store_fifo_offset, &sfo_extra); assert_zero(r);
r = toku_omt_iterate(BNC_BROADCAST_BUFFER(ancestor, childnum), store_fifo_offset, &sfo_extra); assert_zero(r);
if (!bn->stale_ancestor_messages_applied) {
r = toku_omt_iterate_on_range(bnc->stale_message_tree, stale_lbe, stale_ubi, store_fifo_offset, &sfo_extra); assert_zero(r);
}
struct store_fifo_offset_and_move_to_stale_extra sfoamts_extra = { .brt = t, .sfo_extra = &sfo_extra, .bnc = bnc };
r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbe, fresh_ubi, store_fifo_offset_and_move_to_stale, &sfoamts_extra); assert_zero(r);
r = toku_omt_iterate(bnc->broadcast_list, store_fifo_offset, &sfo_extra); assert_zero(r);
invariant(sfo_extra.i == buffer_size);
r = mergesort_r(offsets, buffer_size, sizeof offsets[0], BNC_BUFFER(ancestor, childnum), fifo_offset_msn_cmp); assert_zero(r);
assert(BP_STATE(ancestor, childnum) == PT_AVAIL);
r = mergesort_r(offsets, buffer_size, sizeof offsets[0], bnc->buffer, fifo_offset_msn_cmp); assert_zero(r);
for (int i = 0; i < buffer_size; ++i) {
const struct fifo_entry *entry = toku_fifo_get_entry(BNC_BUFFER(ancestor, childnum), offsets[i]);
do_brt_leaf_put_cmd(t, bn, se, ancestor, childnum, lbe_ptr, ubi_ptr, &max_msn_applied, entry);
const struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offsets[i]);
do_brt_leaf_put_cmd(t, bn, se, ancestor, childnum, &max_msn_applied, entry);
}
toku_free(offsets);
} else {
assert(BP_STATE(ancestor, childnum) == PT_AVAIL);
struct iterate_do_brt_leaf_put_cmd_extra iter_extra = { .t = t, .bn = bn, .se = se, .ancestor = ancestor, .childnum = childnum, .lbe_ptr = lbe_ptr, .ubi_ptr = ubi_ptr, .max_msn_applied = &max_msn_applied };
r = toku_omt_iterate_on_range(BNC_MESSAGE_TREE(ancestor, childnum), lbe, ubi, iterate_do_brt_leaf_put_cmd, &iter_extra);
assert_zero(r);
struct iterate_do_brt_leaf_put_cmd_extra iter_extra = { .t = t, .bn = bn, .se = se, .ancestor = ancestor, .childnum = childnum, .max_msn_applied = &max_msn_applied };
if (!bn->stale_ancestor_messages_applied) {
r = toku_omt_iterate_on_range(bnc->stale_message_tree, stale_lbe, stale_ubi, iterate_do_brt_leaf_put_cmd, &iter_extra); assert_zero(r);
}
struct iterate_do_brt_leaf_put_cmd_and_move_to_stale_extra iter_amts_extra = { .brt = t, .iter_extra = &iter_extra, .bnc = bnc };
r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbe, fresh_ubi, iterate_do_brt_leaf_put_cmd_and_move_to_stale, &iter_amts_extra); assert_zero(r);
}
// we can't delete things inside move_to_stale because that happens
// inside an iteration, instead we have to delete from fresh after
for (unsigned int i = 0; i < fresh_ubi - fresh_lbe; ++i) {
r = toku_omt_delete_at(bnc->fresh_message_tree, fresh_lbe); assert_zero(r);
}
if (max_msn_applied.msn > bn->max_msn_applied.msn) {
bn->max_msn_applied = max_msn_applied;
......@@ -5391,7 +5408,8 @@ maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors
for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
height++;
if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > curr_bn->max_msn_applied.msn) {
apply_buffer_messages_to_basement_node(
assert(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
bnc_apply_messages_to_basement_node(
t,
curr_bn,
curr_se,
......@@ -5405,6 +5423,7 @@ maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors
update_stats = TRUE;
}
}
curr_bn->stale_ancestor_messages_applied = true;
}
// Must update the leaf estimates. Might as well use the estimates from the soft copy (even if they make it out to disk), since they are
// the best estimates we have.
......@@ -6490,8 +6509,9 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, struct kv_
}
fprintf(file, "\n");
if (node->height > 0) {
fprintf(file, "%*schild %d buffered (%d entries):", depth+1, "", i, toku_fifo_n_entries(BNC_BUFFER(node,i)));
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, msn, xids,
NONLEAF_CHILDINFO bnc = BNC(node, i);
fprintf(file, "%*schild %d buffered (%d entries):", depth+1, "", i, toku_bnc_n_entries(bnc));
FIFO_ITERATE(bnc->buffer, key, keylen, data, datalen, type, msn, xids, UU(is_fresh),
{
data=data; datalen=datalen; keylen=keylen;
fprintf(file, "%*s xid=%"PRIu64" %u (type=%d) msn=0x%"PRIu64"\n", depth+2, "", xids_get_innermost_xid(xids), (unsigned)toku_dtoh32(*(int*)key), type, msn.msn);
......@@ -6760,7 +6780,7 @@ toku_brt_get_fragmentation(BRT brt, TOKU_DB_FRAGMENTATION report) {
static BOOL is_empty_fast_iter (BRT brt, BRTNODE node) {
if (node->height > 0) {
for (int childnum=0; childnum<node->n_children; childnum++) {
if (BNC_NBYTESINBUF(node, childnum) != 0) {
if (toku_bnc_nbytesinbuf(BNC(node, childnum)) != 0) {
return 0; // it's not empty if there are bytes in buffers
}
BRTNODE childnode;
......
......@@ -163,13 +163,14 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) {
for (int i=0; i<n->n_children; i++) {
if (n->height > 0) {
printf(" child %d: %" PRId64 "\n", i, BP_BLOCKNUM(n, i).b);
unsigned int n_bytes = BNC_NBYTESINBUF(n, i);
int n_entries = toku_fifo_n_entries(BNC_BUFFER(n, i));
NONLEAF_CHILDINFO bnc = BNC(n, i);
unsigned int n_bytes = toku_bnc_nbytesinbuf(bnc);
int n_entries = toku_bnc_n_entries(bnc);
if (n_bytes > 0 || n_entries > 0) {
printf(" buffer contains %u bytes (%d items)\n", n_bytes, n_entries);
}
if (dump_data) {
FIFO_ITERATE(BNC_BUFFER(n,i), key, keylen, data, datalen, typ, msn, xids,
FIFO_ITERATE(bnc->buffer, key, keylen, data, datalen, typ, msn, xids, UU(is_fresh),
{
printf(" msn=%"PRIu64" (0x%"PRIx64") ", msn.msn, msn.msn);
printf(" TYPE=");
......
......@@ -69,7 +69,7 @@ void toku_fifo_size_hint(FIFO fifo, size_t size) {
}
}
int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *data, unsigned int datalen, int type, MSN msn, XIDS xids, long *dest) {
int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *data, unsigned int datalen, int type, MSN msn, XIDS xids, bool is_fresh, long *dest) {
int need_space_here = sizeof(struct fifo_entry)
+ keylen + datalen
+ xids_get_size(xids)
......@@ -103,8 +103,9 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d
}
struct fifo_entry *entry = (struct fifo_entry *)(fifo->memory + fifo->memory_start + fifo->memory_used);
entry->type = (unsigned char)type;
entry->msn = msn;
entry->msn = msn;
xids_cpy(&entry->xids_s, xids);
entry->is_fresh = is_fresh;
entry->keylen = keylen;
unsigned char *e_key = xids_get_end_of_array(&entry->xids_s);
memcpy(e_key, key, keylen);
......@@ -119,12 +120,12 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d
return 0;
}
int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_MSG cmd, long *dest) {
return toku_fifo_enq(fifo, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->type, cmd->msn, cmd->xids, dest);
int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_MSG cmd, bool is_fresh, long *dest) {
return toku_fifo_enq(fifo, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->type, cmd->msn, cmd->xids, is_fresh, dest);
}
/* peek at the head (the oldest entry) of the fifo */
int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, u_int32_t *type, MSN *msn, XIDS *xids) {
int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, u_int32_t *type, MSN *msn, XIDS *xids, bool *is_fresh) {
struct fifo_entry *entry = fifo_peek(fifo);
if (entry == 0) return -1;
unsigned char *e_key = xids_get_end_of_array(&entry->xids_s);
......@@ -135,6 +136,7 @@ int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data,
*type = entry->type;
*msn = entry->msn;
*xids = &entry->xids_s;
*is_fresh = entry->is_fresh;
return 0;
}
......@@ -166,6 +168,13 @@ int toku_fifo_deq(FIFO fifo) {
return 0;
}
// Reset the fifo to hold no entries without releasing its memory: the item
// count and the used-byte count are both set to zero.
// Requires that the fifo's memory_start is 0 (asserted).
// Always returns 0.
int toku_fifo_empty(FIFO fifo) {
    assert(fifo->memory_start == 0);

    fifo->n_items_in_fifo = 0;
    fifo->memory_used = 0;

    return 0;
}
int toku_fifo_iterate_internal_start(FIFO fifo) { return fifo->memory_start; }
int toku_fifo_iterate_internal_has_more(FIFO fifo, int off) { return off < fifo->memory_start + fifo->memory_used; }
int toku_fifo_iterate_internal_next(FIFO fifo, int off) {
......@@ -176,10 +185,10 @@ struct fifo_entry * toku_fifo_iterate_internal_get_entry(FIFO fifo, int off) {
return (struct fifo_entry *)(fifo->memory + off);
}
void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, MSN msn, XIDS xids, void*), void *arg) {
void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, MSN msn, XIDS xids, bool is_fresh, void*), void *arg) {
FIFO_ITERATE(fifo,
key, keylen, data, datalen, type, msn, xids,
f(key,keylen,data,datalen,type,msn,xids, arg));
key, keylen, data, datalen, type, msn, xids, is_fresh,
f(key,keylen,data,datalen,type,msn,xids,is_fresh, arg));
}
void toku_fifo_size_is_stabilized(FIFO fifo) {
......
......@@ -21,6 +21,7 @@ struct __attribute__((__packed__)) fifo_entry {
unsigned int keylen;
unsigned int vallen;
unsigned char type;
bool is_fresh;
MSN msn;
XIDS_S xids_s;
};
......@@ -44,23 +45,25 @@ void toku_fifo_size_is_stabilized(FIFO);
int toku_fifo_n_entries(FIFO);
int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_MSG cmd, long *dest);
int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_MSG cmd, bool is_fresh, long *dest);
int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, MSN msn, XIDS xids, long *dest);
int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, MSN msn, XIDS xids, bool is_fresh, long *dest);
int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, MSN *msn, XIDS *xids);
int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, MSN *msn, XIDS *xids, bool *is_fresh);
// int toku_fifo_peek_cmdstruct (FIFO, BRT_MSG, DBT*, DBT*); // fill in the BRT_MSG, using the two DBTs for the DBT part.
int toku_fifo_deq(FIFO);
int toku_fifo_deq(FIFO); // we cannot deq items anymore, since their offsets are indexed.
// THIS ONLY REMAINS FOR TESTING, DO NOT USE IT IN CODE
int toku_fifo_empty(FIFO); // don't deallocate the memory for the fifo
unsigned long toku_fifo_memory_size(FIFO); // return how much memory the fifo uses.
//These two are problematic, since I don't want to malloc() the bytevecs, but dequeueing the fifo frees the memory.
//int toku_fifo_peek_deq (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, TXNID *xid);
//int toku_fifo_peek_deq_cmdstruct (FIFO, BRT_MSG, DBT*, DBT*); // fill in the BRT_MSG, using the two DBTs for the DBT part.
void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, MSN msn, XIDS xids, void*), void*);
void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, MSN msn, XIDS xids, bool is_fresh, void*), void*);
#define FIFO_ITERATE(fifo,keyvar,keylenvar,datavar,datalenvar,typevar,msnvar,xidsvar,body) ({ \
#define FIFO_ITERATE(fifo,keyvar,keylenvar,datavar,datalenvar,typevar,msnvar,xidsvar,is_freshvar,body) ({ \
for (int fifo_iterate_off = toku_fifo_iterate_internal_start(fifo); \
toku_fifo_iterate_internal_has_more(fifo, fifo_iterate_off); \
fifo_iterate_off = toku_fifo_iterate_internal_next(fifo, fifo_iterate_off)) { \
......@@ -72,6 +75,7 @@ void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,I
XIDS xidsvar = &e->xids_s; \
bytevec keyvar = xids_get_end_of_array(xidsvar); \
bytevec datavar = (const u_int8_t*)keyvar + e->keylen; \
bool is_freshvar = e->is_fresh; \
body; \
} })
......
......@@ -210,6 +210,7 @@ test_serialize_nonleaf(int valsize, int nelts, double entropy) {
int nperchild = nelts / 8;
for (int ck = 0; ck < sn.n_children; ++ck) {
long k;
NONLEAF_CHILDINFO bnc = BNC(&sn, ck);
for (long i = 0; i < nperchild; ++i) {
k = ck * nperchild + i;
char buf[valsize];
......@@ -219,9 +220,10 @@ test_serialize_nonleaf(int valsize, int nelts, double entropy) {
c += sizeof(int);
}
memset(&buf[c], 0, valsize - c);
r = toku_fifo_enq(BNC_BUFFER(&sn,ck), &k, sizeof k, buf, sizeof buf, BRT_NONE, next_dummymsn(), xids_123, NULL); assert(r==0);
r = toku_bnc_insert_msg(bnc, &k, sizeof k, buf, valsize, BRT_NONE, next_dummymsn(), xids_123, true, NULL, long_key_cmp); assert_zero(r);
}
BNC_NBYTESINBUF(&sn, ck) = nperchild*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+(sizeof k)+valsize+xids_get_serialize_size(xids_123));
bnc->n_bytes_in_buffer = nperchild*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+(sizeof k)+valsize+xids_get_serialize_size(xids_123));
if (ck < 7) {
sn.childkeys[ck] = kv_pair_malloc(&k, sizeof k, 0, 0);
sn.totalchildkeylens += sizeof k;
......
......@@ -1112,11 +1112,11 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) {
r = xids_create_child(xids_123, &xids_234, (TXNID)234);
CKERR(r);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "a", 2, "aval", 5, BRT_NONE, next_dummymsn(), xids_0, NULL); assert(r==0);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "b", 2, "bval", 5, BRT_NONE, next_dummymsn(), xids_123, NULL); assert(r==0);
r = toku_fifo_enq(BNC_BUFFER(&sn,1), "x", 2, "xval", 5, BRT_NONE, next_dummymsn(), xids_234, NULL); assert(r==0);
BNC_NBYTESINBUF(&sn, 0) = 2*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5) + xids_get_serialize_size(xids_0) + xids_get_serialize_size(xids_123);
BNC_NBYTESINBUF(&sn, 1) = 1*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5) + xids_get_serialize_size(xids_234);
r = toku_bnc_insert_msg(BNC(&sn, 0), "a", 2, "aval", 5, BRT_NONE, next_dummymsn(), xids_0, true, NULL, string_key_cmp); assert_zero(r);
r = toku_bnc_insert_msg(BNC(&sn, 0), "b", 2, "bval", 5, BRT_NONE, next_dummymsn(), xids_123, true, NULL, string_key_cmp); assert_zero(r);
r = toku_bnc_insert_msg(BNC(&sn, 1), "x", 2, "xval", 5, BRT_NONE, next_dummymsn(), xids_234, true, NULL, string_key_cmp); assert_zero(r);
BNC(&sn, 0)->n_bytes_in_buffer = 2*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5) + xids_get_serialize_size(xids_0) + xids_get_serialize_size(xids_123);
BNC(&sn, 1)->n_bytes_in_buffer = 1*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5) + xids_get_serialize_size(xids_234);
//Cleanup:
xids_destroy(&xids_0);
xids_destroy(&xids_123);
......@@ -1165,10 +1165,10 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) {
assert(BP_BLOCKNUM(dn,0).b==30);
assert(BP_BLOCKNUM(dn,1).b==35);
FIFO src_fifo_1 = BNC_BUFFER(&sn, 0);
FIFO src_fifo_2 = BNC_BUFFER(&sn, 1);
FIFO dest_fifo_1 = BNC_BUFFER(dn, 0);
FIFO dest_fifo_2 = BNC_BUFFER(dn, 1);
FIFO src_fifo_1 = BNC(&sn, 0)->buffer;
FIFO src_fifo_2 = BNC(&sn, 1)->buffer;
FIFO dest_fifo_1 = BNC(dn, 0)->buffer;
FIFO dest_fifo_2 = BNC(dn, 1)->buffer;
bytevec src_key,src_val, dest_key, dest_val;
ITEMLEN src_keylen, src_vallen;
u_int32_t src_type;
......@@ -1178,9 +1178,11 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) {
u_int32_t dest_type;
MSN dest_msn;
XIDS dest_xids;
r = toku_fifo_peek(src_fifo_1, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids);
bool src_is_fresh;
bool dest_is_fresh;
r = toku_fifo_peek(src_fifo_1, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids, &src_is_fresh);
assert(r==0);
r = toku_fifo_peek(dest_fifo_1, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids);
r = toku_fifo_peek(dest_fifo_1, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids, &dest_is_fresh);
assert(r==0);
assert(src_keylen == dest_keylen);
assert(src_keylen == 2);
......@@ -1192,13 +1194,14 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) {
assert(strcmp(dest_key, "a") == 0);
assert(strcmp(src_val, "aval") == 0);
assert(strcmp(dest_val, "aval") == 0);
assert(dest_is_fresh);
r = toku_fifo_deq(src_fifo_1);
assert(r==0);
r = toku_fifo_deq(dest_fifo_1);
assert(r==0);
r = toku_fifo_peek(src_fifo_1, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids);
r = toku_fifo_peek(src_fifo_1, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids, &src_is_fresh);
assert(r==0);
r = toku_fifo_peek(dest_fifo_1, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids);
r = toku_fifo_peek(dest_fifo_1, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids, &dest_is_fresh);
assert(r==0);
assert(src_keylen == dest_keylen);
assert(src_keylen == 2);
......@@ -1210,18 +1213,19 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) {
assert(strcmp(dest_key, "b") == 0);
assert(strcmp(src_val, "bval") == 0);
assert(strcmp(dest_val, "bval") == 0);
assert(dest_is_fresh);
r = toku_fifo_deq(src_fifo_1);
assert(r==0);
r = toku_fifo_deq(dest_fifo_1);
assert(r==0);
r = toku_fifo_peek(src_fifo_1, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids);
r = toku_fifo_peek(src_fifo_1, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids, &src_is_fresh);
assert(r!=0);
r = toku_fifo_peek(dest_fifo_1, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids);
r = toku_fifo_peek(dest_fifo_1, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids, &dest_is_fresh);
assert(r!=0);
r = toku_fifo_peek(src_fifo_2, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids);
r = toku_fifo_peek(src_fifo_2, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids, &src_is_fresh);
assert(r==0);
r = toku_fifo_peek(dest_fifo_2, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids);
r = toku_fifo_peek(dest_fifo_2, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids, &dest_is_fresh);
assert(r==0);
assert(src_keylen == dest_keylen);
assert(src_keylen == 2);
......@@ -1233,13 +1237,14 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) {
assert(strcmp(dest_key, "x") == 0);
assert(strcmp(src_val, "xval") == 0);
assert(strcmp(dest_val, "xval") == 0);
assert(dest_is_fresh);
r = toku_fifo_deq(src_fifo_2);
assert(r==0);
r = toku_fifo_deq(dest_fifo_2);
assert(r==0);
r = toku_fifo_peek(src_fifo_2, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids);
r = toku_fifo_peek(src_fifo_2, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids, &src_is_fresh);
assert(r!=0);
r = toku_fifo_peek(dest_fifo_2, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids);
r = toku_fifo_peek(dest_fifo_2, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids, &dest_is_fresh);
assert(r!=0);
......
......@@ -57,12 +57,12 @@ test_fifo_enq (int n) {
MSN msn = next_dummymsn();
if (startmsn.msn == ZERO_MSN.msn)
startmsn = msn;
r = toku_fifo_enq(f, thekey, thekeylen, theval, thevallen, i, msn, xids, NULL); assert(r == 0);
r = toku_fifo_enq(f, thekey, thekeylen, theval, thevallen, i, msn, xids, true, NULL); assert(r == 0);
xids_destroy(&xids);
}
int i = 0;
FIFO_ITERATE(f, key, keylen, val, vallen, type, msn, xids, {
FIFO_ITERATE(f, key, keylen, val, vallen, type, msn, xids, UU(is_fresh), {
if (verbose) printf("checkit %d %d %"PRIu64"\n", i, type, msn.msn);
assert(msn.msn == startmsn.msn + i);
buildkey(i);
......
......@@ -64,7 +64,7 @@ insert_into_child_buffer(BRT brt, BRTNODE node, int childnum, int minkey, int ma
unsigned int key = htonl(val);
DBT thekey; toku_fill_dbt(&thekey, &key, sizeof key);
DBT theval; toku_fill_dbt(&theval, &val, sizeof val);
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), true, &thekey, &theval);
node->max_msn_applied_to_node_on_disk = msn;
}
}
......
......@@ -44,7 +44,7 @@ static void test_3748 (void) {
if (startmsn.msn == ZERO_MSN.msn)
startmsn = msn;
r = toku_fifo_enq(f, thekey, thekeylen, theval, thevallen, i, msn, xids, NULL); assert(r == 0);
r = toku_fifo_enq(f, thekey, thekeylen, theval, thevallen, i, msn, xids, true, NULL); assert(r == 0);
xids_destroy(&xids);
}
for (int i=N/10; i<N; i++) {
......
......@@ -68,7 +68,7 @@ insert_into_child_buffer(BRT brt, BRTNODE node, int childnum, int minkey, int ma
unsigned int key = htonl(val);
DBT thekey; toku_fill_dbt(&thekey, &key, sizeof key);
DBT theval; toku_fill_dbt(&theval, &val, sizeof val);
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), true, &thekey, &theval);
// Create bad tree (don't do following):
// node->max_msn_applied_to_node = msn;
......
......@@ -53,7 +53,7 @@ insert_into_child_buffer(BRT brt, BRTNODE node, int childnum, int minkey, int ma
DBT thekey; toku_fill_dbt(&thekey, &key, sizeof key);
DBT theval; toku_fill_dbt(&theval, &val, sizeof val);
MSN msn = next_dummymsn();
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), true, &thekey, &theval);
}
}
......
......@@ -53,7 +53,7 @@ insert_into_child_buffer(BRT brt, BRTNODE node, int childnum, int minkey, int ma
DBT thekey; toku_fill_dbt(&thekey, &key, sizeof key);
DBT theval; toku_fill_dbt(&theval, &val, sizeof val);
MSN msn = next_dummymsn();
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), true, &thekey, &theval);
}
}
......
......@@ -56,7 +56,7 @@ insert_into_child_buffer(BRT brt, BRTNODE node, int childnum, int minkey, int ma
DBT thekey; toku_fill_dbt(&thekey, &key, sizeof key);
DBT theval; toku_fill_dbt(&theval, &val, sizeof val);
MSN msn = next_dummymsn();
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), true, &thekey, &theval);
}
}
......
......@@ -53,7 +53,7 @@ insert_into_child_buffer(BRT brt, BRTNODE node, int childnum, int minkey, int ma
DBT thekey; toku_fill_dbt(&thekey, &key, sizeof key);
DBT theval; toku_fill_dbt(&theval, &val, sizeof val);
MSN msn = next_dummymsn();
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), &thekey, &theval);
toku_brt_append_to_child_buffer(brt, node, childnum, BRT_INSERT, msn, xids_get_root_xids(), true, &thekey, &theval);
}
}
......
......@@ -38,14 +38,13 @@ NONSTANDARD_SRCS= \
$(LOADER_SRCS) \
FAIL3312C_SRCS = filesize.c test_update_broadcast_stress.c test3529.c
FAIL3389_SRCS = insert-dup-prelock.c loader-cleanup-test.c loader-dup-test.c loader-stress-del.c loader-stress-test.c loader-tpch-load.c preload-db.c preload-db-nested.c
#SRCS = $(sort $(wildcard *.c))
# To patch out upgrade tests, replace line above with line below,
# and patch out loader-stress-test3 (see below)
#SRCS = $(sort $(filter-out $(TRANSPARENT_UPGRADE_SRCS),$(wildcard *.c)))
# To patch out tests failing on 3312c branch (including upgrade tests), use this:
SRCS = $(sort $(filter-out $(TRANSPARENT_UPGRADE_SRCS),$(filter-out $(FAIL3389_SRCS),$(filter-out $(FAIL3312C_SRCS),$(wildcard *.c)))))
SRCS = $(sort $(filter-out $(TRANSPARENT_UPGRADE_SRCS),$(filter-out $(FAIL3312C_SRCS),$(wildcard *.c))))
#end
......@@ -392,7 +391,7 @@ EXTRA_TDB_TESTS = \
endif
RECOVER_TESTS = $(patsubst %.c,%.abortrecover,$(RECOVER_SRCS))
LOADER_TESTS = $(patsubst %.c,%.loader,$(filter-out $(FAIL3389_SRCS),$(LOADER_SRCS)))
LOADER_TESTS = $(patsubst %.c,%.loader,$(LOADER_SRCS))
ifeq ($(OS_CHOICE),windows)
RECOVER_TESTS = $(patsubst %.c,%.abortrecover,$(filter-out $(patsubst %,%.c,$(WINDOWS_DONTRUN_TESTS)),$(RECOVER_SRCS)))
LOADER_TESTS = $(patsubst %.c,%.loader,$(filter-out $(patsubst %,%.c,$(WINDOWS_DONTRUN_TESTS)),$(LOADER_SRCS)))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment