Commit 2379d061 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

[t:3564] Merge in barry's stuff to the main line. {{{svn merge -r32637:32693...

[t:3564] Merge in barry's stuff to the main line. {{{svn merge -r32637:32693 ../tokudb.3564b+3312}}} Refs #3564.

git-svn-id: file:///svn/toku/tokudb@32694 c7de825b-a66e-492c-adef-691d508d4ae1
parent 55579798
......@@ -50,7 +50,7 @@ enum { BUFFER_HEADER_SIZE = (4 // height//
+ TREE_FANOUT * 8 // children
) };
struct subtree_estimates {
struct __attribute__((__packed__)) subtree_estimates {
// estimate number of rows in the tree by counting the number of rows
// in the leaves. The stuff in the internal nodes is likely to be off O(1).
u_int64_t nkeys; // number of distinct keys (obsolete with removal of dupsort, but not worth removing)
......@@ -140,6 +140,7 @@ struct brtnode_leaf_basement_node {
OMT buffer;
unsigned int n_bytes_in_buffer; /* How many bytes to represent the OMT (including the per-key overheads, but not including the overheads for the node. */
unsigned int seqinsert; /* number of sequential inserts to this leaf */
MSN max_msn_applied;
};
#define PT_INVALID 0
......@@ -147,8 +148,25 @@ struct brtnode_leaf_basement_node {
#define PT_COMPRESSED 2
#define PT_AVAIL 3
enum brtnode_child_tag {
BCT_INVALID = 0,
BCT_NULL,
BCT_SUBBLOCK,
BCT_LEAF,
BCT_NONLEAF
};
typedef struct __attribute__((__packed__)) brtnode_child_pointer {
u_int8_t tag;
union {
struct sub_block *subblock;
struct brtnode_nonleaf_childinfo *nonleaf;
struct brtnode_leaf_basement_node *leaf;
} u;
} BRTNODE_CHILD_POINTER;
// a brtnode partition represents
struct brtnode_partition {
struct __attribute__((__packed__)) brtnode_partition {
BLOCKNUM blocknum;
BOOL have_fullhash; // do we have the full hash?
u_int32_t fullhash; // the fullhash of the child
......@@ -176,44 +194,15 @@ struct brtnode_partition {
// a struct brtnode_nonleaf_childinfo for internal nodes,
// a struct brtnode_leaf_basement_node for leaf nodes
//
void* ptr;
struct brtnode_child_pointer ptr;
// clock count used to for pe_callback to determine if a node should be evicted or not
// for now, saturating the count at 1
u_int8_t clock_count;
};
// brtnode partition macros
#define BP_BLOCKNUM(node,i) ((node)->bp[i].blocknum)
#define BP_HAVE_FULLHASH(node,i) ((node)->bp[i].have_fullhash)
#define BP_FULLHASH(node,i) ((node)->bp[i].fullhash)
#define BP_STATE(node,i) ((node)->bp[i].state)
#define BP_OFFSET(node,i) ((node)->bp[i].offset)
#define BP_SUBTREE_EST(node,i) ((node)->bp[i].subtree_estimates)
//
// macros for managing a node's clock
// Should be managed by brt.c, NOT by serialize/deserialize
//
#define BP_TOUCH_CLOCK(node, i) ((node)->bp[i].clock_count = 1)
#define BP_SWEEP_CLOCK(node, i) ((node)->bp[i].clock_count = 0)
#define BP_SHOULD_EVICT(node, i) ((node)->bp[i].clock_count == 0)
// not crazy about having these two here, one is for the case where we create new
// nodes, such as in splits and creating new roots, and the other is for when
// we are deserializing a node and not all bp's are touched
#define BP_INIT_TOUCHED_CLOCK(node, i) ((node)->bp[i].clock_count = 1)
#define BP_INIT_UNTOUCHED_CLOCK(node, i) ((node)->bp[i].clock_count = 0)
// internal node macros
#define BNC_BUFFER(node,i) (((struct brtnode_nonleaf_childinfo*)((node)->bp[i].ptr))->buffer)
#define BNC_NBYTESINBUF(node,i) (((struct brtnode_nonleaf_childinfo*)((node)->bp[i].ptr))->n_bytes_in_buffer)
// leaf node macros
#define BLB_OPTIMIZEDFORUPGRADE(node,i) (((struct brtnode_leaf_basement_node*)((node)->bp[i].ptr))->optimized_for_upgrade)
#define BLB_SOFTCOPYISUPTODATE(node,i) (((struct brtnode_leaf_basement_node*)((node)->bp[i].ptr))->soft_copy_is_up_to_date)
#define BLB_BUFFER(node,i) (((struct brtnode_leaf_basement_node*)((node)->bp[i].ptr))->buffer)
#define BLB_NBYTESINBUF(node,i) (((struct brtnode_leaf_basement_node*)((node)->bp[i].ptr))->n_bytes_in_buffer)
#define BLB_SEQINSERT(node,i) (((struct brtnode_leaf_basement_node*)((node)->bp[i].ptr))->seqinsert)
// How many bytes worth of work was performed by messages in each buffer.
uint64_t workdone;
};
struct brtnode {
MSN max_msn_applied_to_node_in_memory; // max msn that has been applied to this node (for root node, this is max msn for the tree)
......@@ -241,6 +230,86 @@ struct brtnode {
struct brtnode_partition *bp;
};
// brtnode partition macros
#define BP_BLOCKNUM(node,i) ((node)->bp[i].blocknum)
#define BP_HAVE_FULLHASH(node,i) ((node)->bp[i].have_fullhash)
#define BP_FULLHASH(node,i) ((node)->bp[i].fullhash)
#define BP_STATE(node,i) ((node)->bp[i].state)
#define BP_OFFSET(node,i) ((node)->bp[i].offset)
#define BP_SUBTREE_EST(node,i) ((node)->bp[i].subtree_estimates)
#define BP_WORKDONE(node, i)((node)->bp[i].workdone)
//
// macros for managing a node's clock
// Should be managed by brt.c, NOT by serialize/deserialize
//
#define BP_TOUCH_CLOCK(node, i) ((node)->bp[i].clock_count = 1)
#define BP_SWEEP_CLOCK(node, i) ((node)->bp[i].clock_count = 0)
#define BP_SHOULD_EVICT(node, i) ((node)->bp[i].clock_count == 0)
// not crazy about having these two here, one is for the case where we create new
// nodes, such as in splits and creating new roots, and the other is for when
// we are deserializing a node and not all bp's are touched
#define BP_INIT_TOUCHED_CLOCK(node, i) ((node)->bp[i].clock_count = 1)
#define BP_INIT_UNTOUCHED_CLOCK(node, i) ((node)->bp[i].clock_count = 0)
// internal node macros
static inline void set_BNULL(BRTNODE node, int i) {
assert(0<=i && i<node->n_children);
node->bp[i].ptr.tag = BCT_NULL;
}
static inline bool is_BNULL (BRTNODE node, int i) {
assert(0<=i && i<node->n_children);
return node->bp[i].ptr.tag == BCT_NULL;
}
static inline NONLEAF_CHILDINFO BNC(BRTNODE node, int i) {
assert(0<=i && i<node->n_children);
BRTNODE_CHILD_POINTER p = node->bp[i].ptr;
assert(p.tag==BCT_NONLEAF);
return p.u.nonleaf;
}
static inline void set_BNC(BRTNODE node, int i, NONLEAF_CHILDINFO nl) {
assert(0<=i && i<node->n_children);
BRTNODE_CHILD_POINTER *p = &node->bp[i].ptr;
p->tag = BCT_NONLEAF;
p->u.nonleaf = nl;
}
static inline BASEMENTNODE BLB(BRTNODE node, int i) {
assert(0<=i && i<node->n_children);
BRTNODE_CHILD_POINTER p = node->bp[i].ptr;
assert(p.tag==BCT_LEAF);
return p.u.leaf;
}
static inline void set_BLB(BRTNODE node, int i, BASEMENTNODE bn) {
assert(0<=i && i<node->n_children);
BRTNODE_CHILD_POINTER *p = &node->bp[i].ptr;
p->tag = BCT_LEAF;
p->u.leaf = bn;
}
static inline SUB_BLOCK BSB(BRTNODE node, int i) {
assert(0<=i && i<node->n_children);
BRTNODE_CHILD_POINTER p = node->bp[i].ptr;
assert(p.tag==BCT_SUBBLOCK);
return p.u.subblock;
}
static inline void set_BSB(BRTNODE node, int i, SUB_BLOCK sb) {
assert(0<=i && i<node->n_children);
BRTNODE_CHILD_POINTER *p = &node->bp[i].ptr;
p->tag = BCT_SUBBLOCK;
p->u.subblock = sb;
}
#define BNC_BUFFER(node,i) (BNC(node,i)->buffer)
#define BNC_NBYTESINBUF(node,i) (BNC(node,i)->n_bytes_in_buffer)
#define BNC_NBYTESINBUF(node,i) (BNC(node,i)->n_bytes_in_buffer)
// leaf node macros
#define BLB_OPTIMIZEDFORUPGRADE(node,i) (BLB(node,i)->optimized_for_upgrade)
#define BLB_SOFTCOPYISUPTODATE(node,i) (BLB(node,i)->soft_copy_is_up_to_date)
#define BLB_BUFFER(node,i) (BLB(node,i)->buffer)
#define BLB_NBYTESINBUF(node,i) (BLB(node,i)->n_bytes_in_buffer)
#define BLB_SEQINSERT(node,i) (BLB(node,i)->seqinsert)
/* pivot flags (must fit in 8 bits) */
enum {
BRT_PIVOT_TRUNC = 4,
......@@ -354,7 +423,11 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *, struct brt_header *h, int6
int toku_deserialize_brtheader_from (int fd, LSN max_acceptable_lsn, struct brt_header **brth);
int toku_serialize_descriptor_contents_to_fd(int fd, const DESCRIPTOR desc, DISKOFF offset);
void toku_serialize_descriptor_contents_to_wbuf(struct wbuf *wb, const DESCRIPTOR desc);
void toku_setup_empty_bn(BASEMENTNODE bn);
BASEMENTNODE toku_create_empty_bn(void);
BASEMENTNODE toku_create_empty_bn_no_buffer(void); // create a basement node with a null buffer.
NONLEAF_CHILDINFO toku_create_empty_nl(void);
void destroy_basement_node (BASEMENTNODE bn);
void destroy_nonleaf_childinfo (NONLEAF_CHILDINFO nl);
void toku_destroy_brtnode_internals(BRTNODE node);
void toku_brtnode_free (BRTNODE *node);
void toku_assert_entire_node_in_memory(BRTNODE node);
......@@ -420,9 +493,9 @@ struct brt_cursor {
typedef struct ancestors *ANCESTORS;
struct ancestors {
BRTNODE node;
int childnum; // which buffer holds our ancestors.
ANCESTORS next;
BRTNODE node; // This is the root node if next is NULL.
int childnum; // which buffer holds messages destined to the node whose ancestors this list represents.
ANCESTORS next; // Parent of this node (so next->node.(next->childnum) refers to this node).
};
struct pivot_bounds {
struct kv_pair const * const lower_bound_exclusive;
......@@ -536,11 +609,13 @@ brt_leaf_apply_cmd_once (
const BRT_MSG cmd,
u_int32_t idx,
LEAFENTRY le,
TOKULOGGER logger
TOKULOGGER logger,
uint64_t *workdonep
);
void brt_leaf_put_cmd (BRT t, BASEMENTNODE bn, SUBTREE_EST se, BRT_MSG cmd, bool *made_change, uint64_t *workdonep);
void
toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, int *made_change);
toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, bool *made_change, uint64_t *workdonep);
void toku_reset_root_xid_that_created(BRT brt, TXNID new_root_xid_that_created);
// Reset the root_xid_that_created field to the given value.
......
......@@ -372,8 +372,8 @@ serialize_brtnode_info_size(BRTNODE node)
static void
serialize_brtnode_info(
BRTNODE node,
struct sub_block *sb_parts,
struct sub_block *sb // output
SUB_BLOCK sb_parts,
SUB_BLOCK sb // output
)
{
assert(sb->uncompressed_size == 0);
......@@ -537,10 +537,7 @@ rebalance_brtnode_leaf(BRTNODE node)
node->n_children = num_children;
XMALLOC_N(num_children, node->bp);
for (int i = 0; i < num_children; i++) {
node->bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node));
BASEMENTNODE bn = (BASEMENTNODE)node->bp[i].ptr;
memset(bn, 0, sizeof(struct brtnode_leaf_basement_node));
toku_setup_empty_bn(bn);
set_BLB(node, i, toku_create_empty_bn());
}
// now we start to fill in the data
......@@ -615,7 +612,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node,
struct sub_block sb[npartitions];
struct sub_block sb_node_info;
for (int i = 0; i < npartitions; i++) {
sub_block_init(&sb[i]);
sub_block_init(&sb[i]);;
}
sub_block_init(&sb_node_info);
......@@ -753,6 +750,7 @@ deserialize_child_buffer(BRTNODE node, int cnum, struct rbuf *rbuf) {
invariant(rbuf->ndone == rbuf->size);
BNC_NBYTESINBUF(node, cnum) = n_bytes_in_buffer;
BP_WORKDONE(node, cnum) = 0;
}
// dump a buffer to stderr
......@@ -791,14 +789,46 @@ dump_bad_block(unsigned char *vp, u_int64_t size) {
////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////
void toku_setup_empty_bn(BASEMENTNODE bn) {
bn->soft_copy_is_up_to_date = TRUE;
BASEMENTNODE toku_create_empty_bn(void) {
BASEMENTNODE bn = toku_create_empty_bn_no_buffer();
int r;
r = toku_omt_create(&bn->buffer);
assert_zero(r);
return bn;
}
BASEMENTNODE toku_create_empty_bn_no_buffer(void) {
BASEMENTNODE XMALLOC(bn);
bn->soft_copy_is_up_to_date = TRUE;
bn->buffer = NULL;
bn->n_bytes_in_buffer = 0;
bn->seqinsert = 0;
bn->optimized_for_upgrade = 0;
bn->max_msn_applied = ZERO_MSN;
return bn;
}
NONLEAF_CHILDINFO toku_create_empty_nl(void) {
NONLEAF_CHILDINFO XMALLOC(cn);
cn->n_bytes_in_buffer = 0;
int r = toku_fifo_create(&cn->buffer);
assert(r==0);
return cn;
}
void destroy_basement_node (BASEMENTNODE bn)
{
// The buffer may have been freed already, in some cases.
if (bn->buffer) {
toku_omt_destroy(&bn->buffer);
}
toku_free(bn);
}
void destroy_nonleaf_childinfo (NONLEAF_CHILDINFO nl)
{
toku_fifo_free(&nl->buffer);
toku_free(nl);
}
//
......@@ -939,6 +969,7 @@ deserialize_brtnode_info(
BP_BLOCKNUM(node,i) = rbuf_blocknum(&rb);
BP_HAVE_FULLHASH(node, i) = FALSE;
BP_FULLHASH(node,i) = 0;
BP_WORKDONE(node, i) = 0;
}
}
......@@ -957,14 +988,10 @@ deserialize_brtnode_info(
static void
setup_available_brtnode_partition(BRTNODE node, int i) {
if (node->height == 0) {
node->bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node));
BASEMENTNODE bn = (BASEMENTNODE)node->bp[i].ptr;
toku_setup_empty_bn(bn);
set_BLB(node, i, toku_create_empty_bn());
}
else {
node->bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_nonleaf_childinfo));
int r = toku_fifo_create(&BNC_BUFFER(node,i));
assert(r == 0);
set_BNC(node, i, toku_create_empty_nl());
}
}
......@@ -995,8 +1022,7 @@ setup_brtnode_partitions(BRTNODE node, struct brtnode_fetch_extra* bfe) {
BP_TOUCH_CLOCK(node,i);
}
else if (BP_STATE(node,i) == PT_COMPRESSED) {
node->bp[i].ptr = toku_xmalloc(sizeof(struct sub_block));
sub_block_init((struct sub_block*)node->bp[i].ptr);
set_BSB(node, i, sub_block_creat());
}
else {
assert(FALSE);
......@@ -1131,7 +1157,7 @@ deserialize_brtnode_from_rbuf(
rbuf_init(&curr_rbuf, rb->buf + rb->ndone + curr_offset, curr_size);
struct sub_block curr_sb;
sub_block_init(&curr_sb);
sub_block_init(&curr_sb);
//
// now we are at the point where we have:
......@@ -1158,7 +1184,7 @@ deserialize_brtnode_from_rbuf(
// case where we leave the partition in the compressed state
else if (BP_STATE(node,i) == PT_COMPRESSED) {
read_compressed_sub_block(&curr_rbuf, &curr_sb);
struct sub_block* bp_sb = (struct sub_block*)node->bp[i].ptr;
SUB_BLOCK bp_sb = BSB(node, i);
bp_sb->compressed_size = curr_sb.compressed_size;
bp_sb->uncompressed_size = curr_sb.uncompressed_size;
bp_sb->compressed_ptr = toku_xmalloc(bp_sb->compressed_size);
......@@ -1168,7 +1194,6 @@ deserialize_brtnode_from_rbuf(
bp_sb->compressed_size
);
}
}
*brtnode = node;
r = 0;
......@@ -1182,7 +1207,7 @@ cleanup:
void
toku_deserialize_bp_from_disk(BRTNODE node, int childnum, int fd, struct brtnode_fetch_extra* bfe) {
assert(BP_STATE(node,childnum) == PT_ON_DISK);
assert(node->bp[childnum].ptr == NULL);
assert(node->bp[childnum].ptr.tag == BCT_NULL);
//
// setup the partition
......@@ -1229,7 +1254,7 @@ toku_deserialize_bp_from_disk(BRTNODE node, int childnum, int fd, struct brtnode
void
toku_deserialize_bp_from_compressed(BRTNODE node, int childnum) {
assert(BP_STATE(node, childnum) == PT_COMPRESSED);
struct sub_block* curr_sb = (struct sub_block*)node->bp[childnum].ptr;
SUB_BLOCK curr_sb = BSB(node, childnum);
assert(curr_sb->uncompressed_ptr == NULL);
curr_sb->uncompressed_ptr = toku_xmalloc(curr_sb->uncompressed_size);
......@@ -2012,7 +2037,7 @@ serialize_rollback_log_node_to_buf(ROLLBACK_LOG_NODE log, char *buf, size_t calc
static int
serialize_uncompressed_block_to_memory(char * uncompressed_buf,
int n_sub_blocks,
struct sub_block sub_block[n_sub_blocks],
struct sub_block sub_block[/*n_sub_blocks*/],
/*out*/ size_t *n_bytes_to_write,
/*out*/ char **bytes_to_write) {
// allocate space for the compressed uncompressed_buf
......
......@@ -182,7 +182,7 @@ toku_verify_brtnode (BRT brt,
});
}
else {
BASEMENTNODE bn = (BASEMENTNODE)node->bp[i].ptr;
BASEMENTNODE bn = BLB(node, i);
for (u_int32_t j = 0; j < toku_omt_size(bn->buffer); j++) {
VERIFY_ASSERTION((rootmsn.msn >= thismsn.msn), 0, "leaf may have latest msn, but cannot be greater than root msn");
LEAFENTRY le = get_ith_leafentry(bn, j);
......
This diff is collapsed.
......@@ -2759,7 +2759,8 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int
DBT thekey = { .data = key, .size = keylen };
DBT theval = { .data = val, .size = vallen };
BRT_MSG_S cmd = { BRT_INSERT, ZERO_MSN, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL);
uint64_t workdone=0;
brt_leaf_apply_cmd_once(BLB(leafnode,0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, &workdone);
}
static int write_literal(struct dbout *out, void*data, size_t len) {
......@@ -2994,11 +2995,14 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
totalchildkeylens += kv_pair_keylen(childkey);
}
node->totalchildkeylens = totalchildkeylens;
for (int i = 0; i < n_children; i++) {
BP_SUBTREE_EST(node, i) = subtree_info[i].subtree_estimates;
BP_BLOCKNUM(node, i) = make_blocknum(subtree_info[i].block);
BP_STATE(node, i) = PT_AVAIL;
XMALLOC_N(n_children, node->bp);
for (int i=0; i<n_children; i++) {
set_BNC(node, i, toku_create_empty_nl());
BP_BLOCKNUM(node,i)= make_blocknum(subtree_info[i].block);
BP_SUBTREE_EST(node,i) = subtree_info[i].subtree_estimates;
BP_HAVE_FULLHASH(node,i) = FALSE;
BP_FULLHASH(node,i) = 0;
BP_STATE(node,i) = PT_AVAIL;
}
if (result == 0) {
......@@ -3029,11 +3033,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
toku_free(node->childkeys[i]);
}
for (int i=0; i<n_children; i++) {
if (BNC_BUFFER(node, i)) {
toku_fifo_free(&BNC_BUFFER(node, i));
BNC_BUFFER(node, i) = NULL;
}
toku_free(node->bp[i].ptr);
destroy_nonleaf_childinfo(BNC(node,i));
}
toku_free(pivots);
toku_free(node->bp);
......
......@@ -22,6 +22,7 @@ typedef struct brt *BRT;
typedef struct brtnode *BRTNODE;
typedef struct brtnode_leaf_basement_node *BASEMENTNODE;
typedef struct brtnode_nonleaf_childinfo *NONLEAF_CHILDINFO;
typedef struct sub_block *SUB_BLOCK;
typedef struct subtree_estimates *SUBTREE_EST;
struct brt_header;
struct wbuf;
......
......@@ -13,9 +13,14 @@
#include "threadpool.h"
#include "sub_block.h"
#include "compress.h"
#include "memory.h"
void
sub_block_init(struct sub_block *sub_block) {
SUB_BLOCK sub_block_creat(void) {
SUB_BLOCK XMALLOC(sb);
sub_block_init(sb);
return sb;
}
void sub_block_init(SUB_BLOCK sub_block) {
sub_block->uncompressed_ptr = 0;
sub_block->uncompressed_size = 0;
......@@ -25,7 +30,7 @@ sub_block_init(struct sub_block *sub_block) {
sub_block->xsum = 0;
}
// get the size of the compression header
size_t
sub_block_header_size(int n_sub_blocks) {
......@@ -204,6 +209,8 @@ compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *un
char *compressed_base_ptr = compressed_ptr;
size_t compressed_len;
// This is a complex way to write a parallel loop. Cilk would be better.
if (n_sub_blocks == 1) {
// single sub-block
sub_block[0].uncompressed_ptr = uncompressed_ptr;
......
......@@ -6,6 +6,7 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <compress.h>
#include "brttypes.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
......@@ -37,8 +38,8 @@ struct stored_sub_block {
u_int32_t xsum;
};
void
sub_block_init(struct sub_block *sub_block);
void sub_block_init(SUB_BLOCK);
SUB_BLOCK sub_block_creat(void);
// get the size of the compression header
size_t
......
......@@ -112,11 +112,11 @@ setup_dn(enum brtnode_verify_type bft, int fd, struct brt_header *brt_h, BRTNODE
for (int i = 0; i < (*dn)->n_children; i++) {
if ((*dn)->height == 0) {
assert(BP_STATE(*dn,i) == PT_ON_DISK);
assert((*dn)->bp[i].ptr == NULL);
assert(is_BNULL(*dn, i));
}
else {
assert(BP_STATE(*dn,i) == PT_COMPRESSED);
assert((*dn)->bp[i].ptr != NULL);
assert(is_BNULL(*dn, i));
}
}
}
......@@ -175,11 +175,8 @@ test_serialize_leaf_with_large_pivots(enum brtnode_verify_type bft) {
BP_SUBTREE_EST(&sn,i).nkeys = random() + (((long long) random())<<32);
BP_SUBTREE_EST(&sn,i).dsize = random() + (((long long) random())<<32);
BP_SUBTREE_EST(&sn,i).exact = (BOOL)(random()%2 != 0);
sn.bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node));
r = toku_omt_create(&BLB_BUFFER(&sn, i)); assert(r==0);
set_BLB(&sn, i, toku_create_empty_bn());
BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION;
BLB_SOFTCOPYISUPTODATE(&sn, i) = TRUE;
BLB_SEQINSERT(&sn, i) = 0;
}
for (int i = 0; i < nrows; ++i) {
r = toku_omt_insert(BLB_BUFFER(&sn, i), les[i], omt_cmp, les[i], NULL); assert(r==0);
......@@ -245,15 +242,12 @@ test_serialize_leaf_with_large_pivots(enum brtnode_verify_type bft) {
for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]);
}
for (int i = 0; i < sn.n_children; ++i) {
toku_omt_destroy(&BLB_BUFFER(&sn, i));
}
for (int i = 0; i < nrows; ++i) {
toku_free(les[i]);
}
toku_free(sn.childkeys);
for (int i = 0; i < sn.n_children; i++) {
toku_free(sn.bp[i].ptr);
destroy_basement_node(BLB(&sn, i));
}
toku_free(sn.bp);
......@@ -299,11 +293,9 @@ test_serialize_leaf_with_many_rows(enum brtnode_verify_type bft) {
BP_SUBTREE_EST(&sn,i).nkeys = random() + (((long long) random())<<32);
BP_SUBTREE_EST(&sn,i).dsize = random() + (((long long) random())<<32);
BP_SUBTREE_EST(&sn,i).exact = (BOOL)(random()%2 != 0);
sn.bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node));
set_BLB(&sn, i, toku_create_empty_bn());
r = toku_omt_create(&BLB_BUFFER(&sn, i)); assert(r==0);
BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION;
BLB_SOFTCOPYISUPTODATE(&sn, i) = TRUE;
BLB_SEQINSERT(&sn, i) = 0;
}
BLB_NBYTESINBUF(&sn, 0) = 0;
for (int i = 0; i < nrows; ++i) {
......@@ -366,14 +358,11 @@ test_serialize_leaf_with_many_rows(enum brtnode_verify_type bft) {
for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]);
}
for (int i = 0; i < sn.n_children; ++i) {
toku_omt_destroy(&BLB_BUFFER(&sn, i));
}
for (int i = 0; i < nrows; ++i) {
toku_free(les[i]);
}
for (int i = 0; i < sn.n_children; i++) {
toku_free(sn.bp[i].ptr);
destroy_basement_node(BLB(&sn, i));
}
toku_free(sn.bp);
toku_free(sn.childkeys);
......@@ -425,11 +414,8 @@ test_serialize_leaf_with_large_rows(enum brtnode_verify_type bft) {
BP_SUBTREE_EST(&sn,i).nkeys = random() + (((long long) random())<<32);
BP_SUBTREE_EST(&sn,i).dsize = random() + (((long long) random())<<32);
BP_SUBTREE_EST(&sn,i).exact = (BOOL)(random()%2 != 0);
sn.bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node));
r = toku_omt_create(&BLB_BUFFER(&sn, i)); assert(r==0);
set_BLB(&sn, i, toku_create_empty_bn());
BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION;
BLB_SOFTCOPYISUPTODATE(&sn, i) = TRUE;
BLB_SEQINSERT(&sn, i) = 0;
}
BLB_NBYTESINBUF(&sn, 0) = 0;
for (int i = 0; i < 7; ++i) {
......@@ -492,14 +478,11 @@ test_serialize_leaf_with_large_rows(enum brtnode_verify_type bft) {
for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]);
}
for (int i = 0; i < sn.n_children; ++i) {
toku_omt_destroy(&BLB_BUFFER(&sn, i));
}
for (int i = 0; i < 7; ++i) {
toku_free(les[i]);
}
for (int i = 0; i < sn.n_children; i++) {
toku_free(sn.bp[i].ptr);
destroy_basement_node(BLB(&sn, i));
}
toku_free(sn.bp);
toku_free(sn.childkeys);
......@@ -549,10 +532,8 @@ test_serialize_leaf_with_empty_basement_nodes(enum brtnode_verify_type bft) {
BP_SUBTREE_EST(&sn,i).nkeys = random() + (((long long)random())<<32);
BP_SUBTREE_EST(&sn,i).dsize = random() + (((long long)random())<<32);
BP_SUBTREE_EST(&sn,i).exact = (BOOL)(random()%2 != 0);
sn.bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node));
r = toku_omt_create(&BLB_BUFFER(&sn, i)); assert(r==0);
set_BLB(&sn, i, toku_create_empty_bn());
BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION;
BLB_SOFTCOPYISUPTODATE(&sn, i) = TRUE;
BLB_SEQINSERT(&sn, i) = 0;
}
r = toku_omt_insert(BLB_BUFFER(&sn, 1), elts[0], omt_cmp, elts[0], NULL); assert(r==0);
......@@ -622,14 +603,11 @@ test_serialize_leaf_with_empty_basement_nodes(enum brtnode_verify_type bft) {
for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]);
}
for (int i = 0; i < sn.n_children; ++i) {
toku_omt_destroy(&BLB_BUFFER(&sn, i));
}
for (int i = 0; i < 3; ++i) {
toku_free(elts[i]);
}
for (int i = 0; i < sn.n_children; i++) {
toku_free(sn.bp[i].ptr);
destroy_basement_node(BLB(&sn, i));
}
toku_free(sn.bp);
toku_free(sn.childkeys);
......@@ -672,11 +650,8 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum brtnode_verify_type
BP_SUBTREE_EST(&sn,i).nkeys = random() + (((long long)random())<<32);
BP_SUBTREE_EST(&sn,i).dsize = random() + (((long long)random())<<32);
BP_SUBTREE_EST(&sn,i).exact = (BOOL)(random()%2 != 0);
sn.bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node));
r = toku_omt_create(&BLB_BUFFER(&sn, i)); assert(r==0);
set_BLB(&sn, i, toku_create_empty_bn());
BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION;
BLB_SOFTCOPYISUPTODATE(&sn, i) = TRUE;
BLB_SEQINSERT(&sn, i) = 0;
}
BLB_NBYTESINBUF(&sn, 0) = 0*(KEY_VALUE_OVERHEAD+2+5) + toku_omt_size(BLB_BUFFER(&sn, 0));
BLB_NBYTESINBUF(&sn, 1) = 0*(KEY_VALUE_OVERHEAD+2+5) + toku_omt_size(BLB_BUFFER(&sn, 1));
......@@ -739,11 +714,8 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum brtnode_verify_type
for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]);
}
for (int i = 0; i < sn.n_children; ++i) {
toku_omt_destroy(&BLB_BUFFER(&sn, i));
}
for (int i = 0; i < sn.n_children; i++) {
toku_free(sn.bp[i].ptr);
destroy_basement_node(BLB(&sn, i));
}
toku_free(sn.bp);
toku_free(sn.childkeys);
......@@ -793,10 +765,8 @@ test_serialize_leaf(enum brtnode_verify_type bft) {
BP_SUBTREE_EST(&sn,1).exact = (BOOL)(random()%2 != 0);
BP_STATE(&sn,0) = PT_AVAIL;
BP_STATE(&sn,1) = PT_AVAIL;
sn.bp[0].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node));
sn.bp[1].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node));
r = toku_omt_create(&BLB_BUFFER(&sn, 0)); assert(r==0);
r = toku_omt_create(&BLB_BUFFER(&sn, 1)); assert(r==0);
set_BLB(&sn, 0, toku_create_empty_bn());
set_BLB(&sn, 1, toku_create_empty_bn());
r = toku_omt_insert(BLB_BUFFER(&sn, 0), elts[0], omt_cmp, elts[0], NULL); assert(r==0);
r = toku_omt_insert(BLB_BUFFER(&sn, 0), elts[1], omt_cmp, elts[1], NULL); assert(r==0);
r = toku_omt_insert(BLB_BUFFER(&sn, 1), elts[2], omt_cmp, elts[2], NULL); assert(r==0);
......@@ -804,8 +774,6 @@ test_serialize_leaf(enum brtnode_verify_type bft) {
BLB_NBYTESINBUF(&sn, 1) = 1*(KEY_VALUE_OVERHEAD+2+5) + toku_omt_size(BLB_BUFFER(&sn, 1));
for (int i = 0; i < 2; ++i) {
BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION;
BLB_SOFTCOPYISUPTODATE(&sn, i) = TRUE;
BLB_SEQINSERT(&sn, i) = 0;
}
struct brt *XMALLOC(brt);
......@@ -867,14 +835,11 @@ test_serialize_leaf(enum brtnode_verify_type bft) {
for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]);
}
for (int i = 0; i < sn.n_children; ++i) {
toku_omt_destroy(&BLB_BUFFER(&sn, i));
}
for (int i = 0; i < 3; ++i) {
toku_free(elts[i]);
}
for (int i = 0; i < sn.n_children; i++) {
toku_free(sn.bp[i].ptr);
destroy_basement_node(BLB(&sn, i));
}
toku_free(sn.bp);
toku_free(sn.childkeys);
......@@ -927,10 +892,8 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) {
BP_SUBTREE_EST(&sn,1).exact = (BOOL)(random()%2 != 0);
BP_STATE(&sn,0) = PT_AVAIL;
BP_STATE(&sn,1) = PT_AVAIL;
sn.bp[0].ptr = toku_xmalloc(sizeof(struct brtnode_nonleaf_childinfo));
sn.bp[1].ptr = toku_xmalloc(sizeof(struct brtnode_nonleaf_childinfo));
r = toku_fifo_create(&BNC_BUFFER(&sn,0)); assert(r==0);
r = toku_fifo_create(&BNC_BUFFER(&sn,1)); assert(r==0);
set_BNC(&sn, 0, toku_create_empty_nl());
set_BNC(&sn, 1, toku_create_empty_nl());
//Create XIDS
XIDS xids_0 = xids_get_root_xids();
XIDS xids_123;
......@@ -1000,10 +963,8 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) {
kv_pair_free(sn.childkeys[0]);
toku_free(hello_string);
toku_fifo_free(&BNC_BUFFER(&sn,0));
toku_fifo_free(&BNC_BUFFER(&sn,1));
toku_free(sn.bp[0].ptr);
toku_free(sn.bp[1].ptr);
destroy_nonleaf_childinfo(BNC(&sn, 0));
destroy_nonleaf_childinfo(BNC(&sn, 1));
toku_free(sn.bp);
toku_free(sn.childkeys);
......
......@@ -30,7 +30,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node
MSN msn = next_dummymsn();
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL);
brt_leaf_apply_cmd_once(BLB(leafnode, 0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -37,7 +37,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL);
brt_leaf_apply_cmd_once(BLB(leafnode,0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
leafnode->max_msn_applied_to_node_on_disk = msn;
leafnode->max_msn_applied_to_node_in_memory = msn;
......
......@@ -11,6 +11,10 @@
// - inject message with old msn, verify that row still has value2 (verify cmd.msn < node.max_msn is rejected)
// TODO:
// - verify that no work is done by messages that should be ignored (via workdone arg to brt_leaf_put_cmd())
// - maybe get counter of messages ignored for old msn (once the counter is implemented in brt.c)
#include "brt-internal.h"
#include "includes.h"
#include "test.h"
......@@ -40,8 +44,9 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
MSN msn = next_dummymsn();
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
int made_change;
toku_apply_cmd_to_leaf(brt, leafnode, &cmd, &made_change);
bool made_change;
u_int64_t workdone=0;
toku_apply_cmd_to_leaf(brt, leafnode, &cmd, &made_change, &workdone);
{
int r = toku_brt_lookup(brt, &thekey, lookup_checkf, &pair);
assert(r==0);
......@@ -49,7 +54,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
}
BRT_MSG_S badcmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &badval } };
toku_apply_cmd_to_leaf(brt, leafnode, &badcmd, &made_change);
toku_apply_cmd_to_leaf(brt, leafnode, &badcmd, &made_change, &workdone);
// message should be rejected for duplicate msn, row should still have original val
......@@ -62,7 +67,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
// now verify that message with proper msn gets through
msn = next_dummymsn();
BRT_MSG_S cmd2 = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &val2 } };
toku_apply_cmd_to_leaf(brt, leafnode, &cmd2, &made_change);
toku_apply_cmd_to_leaf(brt, leafnode, &cmd2, &made_change, &workdone);
// message should be accepted, val should have new value
{
......@@ -75,7 +80,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
// now verify that message with lesser (older) msn is rejected
msn.msn = msn.msn - 10;
BRT_MSG_S cmd3 = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &badval } };
toku_apply_cmd_to_leaf(brt, leafnode, &cmd3, &made_change);
toku_apply_cmd_to_leaf(brt, leafnode, &cmd3, &made_change, &workdone);
// message should be rejected, val should still have value in pair2
{
......
......@@ -35,7 +35,8 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_
if (verbose)
printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, sub_block_size, n_sub_blocks);
struct sub_block sub_blocks[n_sub_blocks];
struct sub_block *sub_blocks[n_sub_blocks];
for (int i=0; i<n_sub_blocks; i++) sub_blocks[i] = sub_block_create();
set_all_sub_block_sizes(total_size, sub_block_size, n_sub_blocks, sub_blocks);
size_t cbuf_size_bound = get_sum_compressed_size_bound(n_sub_blocks, sub_blocks);
......@@ -50,13 +51,13 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_
for (int xidx = 0; xidx < n_sub_blocks; xidx++) {
// corrupt a checksum
sub_blocks[xidx].xsum += 1;
sub_blocks[xidx]->xsum += 1;
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores, pool);
assert(r != 0);
// reset the checksums
sub_blocks[xidx].xsum -= 1;
sub_blocks[xidx]->xsum -= 1;
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores, pool);
assert(r == 0);
......@@ -77,7 +78,7 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_
assert(r == 0);
assert(memcmp(buf, ubuf, total_size) == 0);
}
for (int i=0; i<n_sub_blocks; i++) sub_block_destroy(sub_blocks[i]);
toku_free(ubuf);
toku_free(cbuf);
}
......
......@@ -23,7 +23,8 @@ test_sub_block_compression(void *buf, int total_size, int my_max_sub_blocks, int
if (verbose)
printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, sub_block_size, n_sub_blocks);
struct sub_block sub_blocks[n_sub_blocks];
struct sub_block *sub_blocks[n_sub_blocks];
for (int i=0; i<n_sub_blocks; i++) sub_blocks[i] = sub_block_create();
set_all_sub_block_sizes(total_size, sub_block_size, n_sub_blocks, sub_blocks);
size_t cbuf_size_bound = get_sum_compressed_size_bound(n_sub_blocks, sub_blocks);
......@@ -41,6 +42,7 @@ test_sub_block_compression(void *buf, int total_size, int my_max_sub_blocks, int
assert(memcmp(buf, ubuf, total_size) == 0);
for (int i=0; i<n_sub_blocks; i++) sub_block_destroy(sub_blocks[i]);
toku_free(ubuf);
toku_free(cbuf);
}
......
......@@ -40,7 +40,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL);
brt_leaf_apply_cmd_once(BLB(leafnode, 0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
// Create bad tree (don't do following):
// leafnode->max_msn_applied_to_node = msn;
......
......@@ -28,7 +28,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node
MSN msn = next_dummymsn();
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL);
brt_leaf_apply_cmd_once(BLB(leafnode, 0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -29,7 +29,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node
MSN msn = next_dummymsn();
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL);
brt_leaf_apply_cmd_once(BLB(leafnode, 0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -28,7 +28,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node
MSN msn = next_dummymsn();
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL);
brt_leaf_apply_cmd_once(BLB(leafnode, 0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -29,7 +29,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node
MSN msn = next_dummymsn();
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL);
brt_leaf_apply_cmd_once(BLB(leafnode,0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -29,7 +29,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node
MSN msn = next_dummymsn();
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL);
brt_leaf_apply_cmd_once(BLB(leafnode, 0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -28,7 +28,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node
MSN msn = next_dummymsn();
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL);
brt_leaf_apply_cmd_once(BLB(leafnode, 0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment