Commit 3e7ee942 authored by Zardosht Kasheff, committed by Yoni Fogel

[t:3627], merge last of milestone 3 to main

git-svn-id: file:///svn/toku/tokudb@32565 c7de825b-a66e-492c-adef-691d508d4ae1
parent 00623b4e
......@@ -80,6 +80,54 @@ add_estimates (struct subtree_estimates *a, struct subtree_estimates *b) {
a->dsize += b->dsize;
}
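// How much of a brtnode a fetch should bring into memory (summary of how the
// three types are used by the fill_bfe_* helpers and the call sites below):
//   brtnode_fetch_none   - no partitions, just the node skeleton (header,
//                          pivots, estimates), e.g. for toku_brt_stat64
//   brtnode_fetch_subset - only the partition(s) a given search needs
//   brtnode_fetch_all    - every partition, fully decompressed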
enum brtnode_fetch_type {
brtnode_fetch_none=1,
brtnode_fetch_subset,
brtnode_fetch_all
};
struct brtnode_fetch_extra {
enum brtnode_fetch_type type;
// needed for reading a node off disk
struct brt_header *h;
// used in the case where type == brtnode_fetch_subset
// parameters needed to find out which child needs to be decompressed (so it can be read)
brt_search_t* search;
BRT brt;
// this value is set during the fetch by toku_brtnode_fetch_callback or toku_brtnode_pf_req_callback;
// these callbacks need to evaluate it anyway, so we cache it here so the search code does not reevaluate it
int child_to_read;
};
static inline void fill_bfe_for_full_read(struct brtnode_fetch_extra *bfe, struct brt_header *h) {
bfe->type = brtnode_fetch_all;
bfe->h = h;
bfe->search = NULL;
bfe->brt = NULL;
bfe->child_to_read = -1;
}
static inline void fill_bfe_for_subset_read(
struct brtnode_fetch_extra *bfe,
struct brt_header *h,
BRT brt,
brt_search_t* search
)
{
bfe->type = brtnode_fetch_subset;
bfe->h = h;
bfe->search = search;
bfe->brt = brt;
bfe->child_to_read = -1;
}
static inline void fill_bfe_for_min_read(struct brtnode_fetch_extra *bfe, struct brt_header *h) {
bfe->type = brtnode_fetch_none;
bfe->h = h;
bfe->search = NULL;
bfe->brt = NULL;
bfe->child_to_read = -1;
}
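// Typical usage (sketch drawn from the call sites in this change): pick the
// helper matching how much of the node the operation needs, then pass the bfe
// down the pin/fetch path, e.g.
//
//   struct brtnode_fetch_extra bfe;
//   fill_bfe_for_full_read(&bfe, t->h);  // splits/merges touch every partition
//   toku_pin_brtnode_holding_lock(t, blocknum, fullhash, ancestors, bounds, &bfe, &node);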
struct brtnode_nonleaf_childinfo {
FIFO buffer;
......@@ -168,6 +216,7 @@ struct brtnode {
unsigned int totalchildkeylens;
struct kv_pair **childkeys; /* Pivot keys. Child 0's keys are <= childkeys[0]. Child 1's keys are <= childkeys[1].
Child 1's keys are > childkeys[0]. */
u_int32_t bp_offset; // offset on disk to where the partitions start
// array of brtnode partitions
// each one is associated with a child
// for internal nodes, the ith partition corresponds to the ith message buffer
......@@ -274,7 +323,8 @@ int toku_serialize_rollback_log_to (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE
struct brt_header *h, int n_workitems, int n_threads,
BOOL for_checkpoint);
int toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, ROLLBACK_LOG_NODE *logp, struct brt_header *h);
int toku_deserialize_brtnode_from (int fd, BLOCKNUM off, u_int32_t /*fullhash*/, BRTNODE *brtnode, struct brt_header *h);
void toku_deserialize_bp_from_compressed(BRTNODE node, int childnum);
int toku_deserialize_brtnode_from (int fd, BLOCKNUM off, u_int32_t /*fullhash*/, BRTNODE *brtnode, struct brtnode_fetch_extra* bfe);
unsigned int toku_serialize_brtnode_size(BRTNODE node); /* How much space will it take? */
int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
......@@ -361,15 +411,25 @@ struct pivot_bounds {
struct kv_pair const * const upper_bound_inclusive; // NULL to indicate negative or positive infinity (which are in practice exclusive since there are now transfinite keys in messages).
};
int
toku_brt_search_which_child(
BRT brt,
BRTNODE node,
brt_search_t *search
);
u_int8_t
toku_brtnode_partition_state (struct brtnode_fetch_extra* bfe, int childnum);
// logs the memory allocation, but not the creation of the new node
void toku_create_new_brtnode (BRT t, BRTNODE *result, int height, int n_children);
int toku_pin_brtnode (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
UNLOCKERS unlockers,
ANCESTORS ancestors, struct pivot_bounds const * const pbounds,
struct brtnode_fetch_extra *bfe,
BRTNODE *node_p)
__attribute__((__warn_unused_result__));
void toku_pin_brtnode_holding_lock (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
ANCESTORS ancestors, struct pivot_bounds const * const pbounds,
struct brtnode_fetch_extra *bfe,
BRTNODE *node_p);
void toku_unpin_brtnode (BRT brt, BRTNODE node);
unsigned int toku_brtnode_which_child (BRTNODE node , const DBT *k, BRT t)
......
......@@ -606,6 +606,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node,
char* data = NULL;
char* curr_ptr = NULL;
toku_assert_entire_node_in_memory(node);
if (node->height == 0) {
rebalance_brtnode_leaf(node);
......@@ -651,6 +652,11 @@ toku_serialize_brtnode_to_memory (BRTNODE node,
total_node_size += sb[i].compressed_size + 4;
}
//
// set the node bp_offset
//
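// bp_offset = the serialized node header, plus the compressed node info
// sub block and (mirroring the per-sub-block "+ 4" above) its 4-byte
// checksum; the partitions begin immediately after this point on disk
//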
node->bp_offset = serialize_node_header_size(node) + sb_node_info.compressed_size + 4;
data = toku_xmalloc(total_node_size);
curr_ptr = data;
// now create the final serialized node
......@@ -700,7 +706,6 @@ toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_h
size_t n_to_write;
char *compressed_buf = NULL;
{
toku_assert_entire_node_in_memory(node);
int r = toku_serialize_brtnode_to_memory (node, &n_to_write, &compressed_buf);
if (r!=0) return r;
}
......@@ -829,6 +834,10 @@ read_block_from_fd_into_rbuf(
return 0;
}
//
// read the compressed partition into the sub_block,
// validate the checksum of the compressed data
//
static void
read_compressed_sub_block(struct rbuf *rb, struct sub_block *sb)
{
......@@ -840,6 +849,13 @@ read_compressed_sub_block(struct rbuf *rb, struct sub_block *sb)
// let's check the checksum
u_int32_t actual_xsum = x1764_memory((char *)sb->compressed_ptr-8, 8+sb->compressed_size);
invariant(sb->xsum == actual_xsum);
}
static void
read_and_decompress_sub_block(struct rbuf *rb, struct sub_block *sb)
{
read_compressed_sub_block(rb, sb);
sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size);
assert(sb->uncompressed_ptr);
......@@ -867,7 +883,10 @@ verify_brtnode_sub_block (struct sub_block *sb)
// This function deserializes the data stored by serialize_brtnode_info
static void
deserialize_brtnode_info(struct sub_block *sb, BRTNODE node)
deserialize_brtnode_info(
struct sub_block *sb,
BRTNODE node
)
{
// sb_node_info->uncompressed_ptr stores the serialized node information
// this function puts that information into node
......@@ -891,23 +910,6 @@ deserialize_brtnode_info(struct sub_block *sb, BRTNODE node)
// leaf node or internal node
// now the subtree_estimates
XMALLOC_N(node->n_children, node->bp);
//
// setup memory needed for the node
//
for (int i = 0; i < node->n_children; i++) {
if (node->height == 0) {
node->bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node));
BASEMENTNODE bn = (BASEMENTNODE)node->bp[i].ptr;
toku_setup_empty_bn(bn);
}
else {
node->bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_nonleaf_childinfo));
int r = toku_fifo_create(&BNC_BUFFER(node,i));
assert(r == 0);
}
BP_STATE(node,i) = PT_AVAIL;
}
for (int i=0; i < node->n_children; i++) {
SUBTREE_EST curr_se = &BP_SUBTREE_EST(node,i);
curr_se->nkeys = rbuf_ulonglong(&rb);
......@@ -956,6 +958,54 @@ deserialize_brtnode_info(struct sub_block *sb, BRTNODE node)
}
}
static void
setup_available_brtnode_partition(BRTNODE node, int i) {
if (node->height == 0) {
node->bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node));
BASEMENTNODE bn = (BASEMENTNODE)node->bp[i].ptr;
toku_setup_empty_bn(bn);
}
else {
node->bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_nonleaf_childinfo));
int r = toku_fifo_create(&BNC_BUFFER(node,i));
assert(r == 0);
}
}
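// Note: the partition is only allocated *empty* here (an empty basement node
// or an empty message FIFO); its contents are filled in afterwards by
// deserialize_brtnode_partition or toku_deserialize_bp_from_compressed.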
static void
setup_brtnode_partitions(BRTNODE node, struct brtnode_fetch_extra* bfe) {
if (bfe->type == brtnode_fetch_subset) {
// we do not take into account prefetching yet
// as of now, if we need a subset, the only thing
// we can possibly require is a single basement node
// we find out what basement node the query cares about
// and check if it is available
assert(bfe->brt);
assert(bfe->search);
bfe->child_to_read = toku_brt_search_which_child(
bfe->brt,
node,
bfe->search
);
}
//
// setup memory needed for the node
//
for (int i = 0; i < node->n_children; i++) {
BP_STATE(node,i) = toku_brtnode_partition_state(bfe, i);
if (BP_STATE(node,i) == PT_AVAIL) {
setup_available_brtnode_partition(node, i);
}
else if (BP_STATE(node,i) == PT_COMPRESSED) {
node->bp[i].ptr = toku_xmalloc(sizeof(struct sub_block));
sub_block_init((struct sub_block*)node->bp[i].ptr);
}
else {
assert(FALSE);
}
}
}
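// Worked example (sketch): for a brtnode_fetch_subset read of a node with
// four children where the search maps to child 2, the loop above leaves the
// partition states as { PT_COMPRESSED, PT_COMPRESSED, PT_AVAIL, PT_COMPRESSED }:
// only bp[2] gets a basement node or FIFO; the others get a sub_block that
// will hold the still-compressed partition.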
static void
deserialize_brtnode_partition(
struct sub_block *sb,
......@@ -1013,6 +1063,7 @@ deserialize_brtnode_from_rbuf(
BRTNODE *brtnode,
BLOCKNUM blocknum,
u_int32_t fullhash,
struct brtnode_fetch_extra* bfe,
struct rbuf *rb
)
{
......@@ -1050,18 +1101,31 @@ deserialize_brtnode_from_rbuf(
//now we read and decompress the pivot and child information
sub_block_init(&sb_node_info);
read_compressed_sub_block(rb, &sb_node_info);
read_and_decompress_sub_block(rb, &sb_node_info);
// at this point, sb->uncompressed_ptr stores the serialized node info
deserialize_brtnode_info(&sb_node_info, node);
toku_free(sb_node_info.uncompressed_ptr);
//
// now that we have read and decompressed up until
// the start of the bp's, we can set the node->bp_offset
// so future partial fetches know where to get bp's
//
node->bp_offset = rb->ndone;
// now that the node info has been deserialized, we can proceed to deserialize
// the individual sub blocks
assert(bfe->type == brtnode_fetch_none || bfe->type == brtnode_fetch_subset || bfe->type == brtnode_fetch_all);
// setup the memory of the partitions
// for partitions being decompressed, create either FIFO or basement node
// for partitions staying compressed, create sub_block
setup_brtnode_partitions(node,bfe);
// TODO: (Zardosht) Cilkify this
for (int i = 0; i < node->n_children; i++) {
u_int32_t curr_offset = (i==0) ? 0 : BP_OFFSET(node,i-1);
u_int32_t curr_size = (i==0) ? BP_OFFSET(node,i) : (BP_OFFSET(node,i) - BP_OFFSET(node,i-1));
// the compressed, serialized partitions start at where rb is currently pointing,
// which would be rb->buf + rb->ndone
// we need to initialize curr_rbuf to point to this place
......@@ -1070,11 +1134,44 @@ deserialize_brtnode_from_rbuf(
struct sub_block curr_sb;
sub_block_init(&curr_sb);
read_compressed_sub_block(&curr_rbuf, &curr_sb);
//
// now we are at the point where we have:
// - read the entire compressed node off of disk,
// - decompressed the pivot and offset information,
// - have arrived at the individual partitions.
//
// Based on the information in bfe, we want to decompress a subset
// of the compressed partitions (possibly none, possibly all).
// The partitions that we want to decompress and make available
// to the node, we do, the rest we simply copy in compressed
// form into the node, and set the state of the partition to PT_COMPRESSED
//
// case where we read and decompress the partition
// setup_brtnode_partitions has already figured out what the state
// should be and set up the memory, so we are ready to use it
if (BP_STATE(node,i) == PT_AVAIL) {
read_and_decompress_sub_block(&curr_rbuf, &curr_sb);
// at this point, sb->uncompressed_ptr stores the serialized node partition
deserialize_brtnode_partition(&curr_sb, node, i);
toku_free(curr_sb.uncompressed_ptr);
}
// case where we leave the partition in the compressed state
else if (BP_STATE(node,i) == PT_COMPRESSED) {
read_compressed_sub_block(&curr_rbuf, &curr_sb);
struct sub_block* bp_sb = (struct sub_block*)node->bp[i].ptr;
bp_sb->compressed_size = curr_sb.compressed_size;
bp_sb->uncompressed_size = curr_sb.uncompressed_size;
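// curr_sb.compressed_ptr points into the rbuf, whose buffer does not
// survive deserialization, so the node keeps its own copy of the
// compressed partition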
bp_sb->compressed_ptr = toku_xmalloc(bp_sb->compressed_size);
memcpy(
bp_sb->compressed_ptr,
curr_sb.compressed_ptr,
bp_sb->compressed_size
);
}
}
*brtnode = node;
r = 0;
cleanup:
......@@ -1084,16 +1181,47 @@ cleanup:
return r;
}
// Take a brtnode partition that is in the compressed state, and make it available (PT_AVAIL)
void
toku_deserialize_bp_from_compressed(BRTNODE node, int childnum) {
assert(BP_STATE(node, childnum) == PT_COMPRESSED);
struct sub_block* curr_sb = (struct sub_block*)node->bp[childnum].ptr;
assert(curr_sb->uncompressed_ptr == NULL);
curr_sb->uncompressed_ptr = toku_xmalloc(curr_sb->uncompressed_size);
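// setup_available_brtnode_partition replaces node->bp[childnum].ptr with a
// fresh (empty) basement node or FIFO; curr_sb was saved off above so the
// compressed data can still be decompressed and freed below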
setup_available_brtnode_partition(node, childnum);
BP_STATE(node,childnum) = PT_AVAIL;
// decompress the sub_block
toku_decompress(
curr_sb->uncompressed_ptr,
curr_sb->uncompressed_size,
curr_sb->compressed_ptr,
curr_sb->compressed_size
);
deserialize_brtnode_partition(curr_sb, node, childnum);
toku_free(curr_sb->uncompressed_ptr);
toku_free(curr_sb->compressed_ptr);
toku_free(curr_sb);
}
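// (toku_brtnode_pf_callback relies on this function to upgrade any partition
// that a partial fetch wants in the PT_AVAIL state)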
// Read brt node from file into struct. Perform version upgrade if necessary.
int
toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash,
BRTNODE *brtnode, struct brt_header *h) {
toku_deserialize_brtnode_from (
int fd,
BLOCKNUM blocknum,
u_int32_t fullhash,
BRTNODE *brtnode,
struct brtnode_fetch_extra* bfe
)
{
toku_trace("deserial start");
int r;
struct rbuf rb = {.buf = NULL, .size = 0, .ndone = 0};
r = read_block_from_fd_into_rbuf(fd, blocknum, h, &rb);
r = read_block_from_fd_into_rbuf(fd, blocknum, bfe->h, &rb);
if (r != 0) { goto cleanup; }
bytevec magic;
......@@ -1104,7 +1232,7 @@ toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash,
goto cleanup;
}
r = deserialize_brtnode_from_rbuf(brtnode, blocknum, fullhash, &rb);
r = deserialize_brtnode_from_rbuf(brtnode, blocknum, fullhash, bfe, &rb);
if (r!=0) {
dump_bad_block(rb.buf,rb.size);
}
......
......@@ -77,6 +77,8 @@ int toku_testsetup_get_sersize(BRT brt, BLOCKNUM diskoff) // Return the size on
{
assert(testsetup_initialized);
void *node_v;
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt->h);
int r = toku_cachetable_get_and_pin(
brt->cf, diskoff,
toku_cachetable_hash(brt->cf, diskoff),
......@@ -87,7 +89,7 @@ int toku_testsetup_get_sersize(BRT brt, BLOCKNUM diskoff) // Return the size on
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
&bfe,
brt->h
);
assert(r==0);
......@@ -102,6 +104,8 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
assert(testsetup_initialized);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt->h);
r = toku_cachetable_get_and_pin(
brt->cf,
blocknum,
......@@ -113,7 +117,7 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
&bfe,
brt->h
);
if (r!=0) return r;
......@@ -169,6 +173,8 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_t
assert(testsetup_initialized);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt->h);
r = toku_cachetable_get_and_pin(
brt->cf,
blocknum,
......@@ -180,7 +186,7 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_t
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
&bfe,
brt->h
);
if (r!=0) return r;
......
......@@ -113,6 +113,8 @@ toku_verify_brtnode (BRT brt,
u_int32_t fullhash = toku_cachetable_hash(brt->cf, blocknum);
{
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt->h);
int r = toku_cachetable_get_and_pin(
brt->cf,
blocknum,
......@@ -124,7 +126,7 @@ toku_verify_brtnode (BRT brt,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
&bfe,
brt->h
);
assert_zero(r); // this is a bad failure if it happens.
......
......@@ -258,6 +258,7 @@ static long brtnode_memory_size (BRTNODE node);
int toku_pin_brtnode (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
UNLOCKERS unlockers,
ANCESTORS ancestors, struct pivot_bounds const * const bounds,
struct brtnode_fetch_extra *bfe,
BRTNODE *node_p) {
void *node_v;
int r = toku_cachetable_get_and_pin_nonblocking(
......@@ -271,8 +272,8 @@ int toku_pin_brtnode (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
brt->h,
bfe, //read_extraargs
brt->h, //write_extraargs
unlockers);
if (r==0) {
BRTNODE node = node_v;
......@@ -288,6 +289,7 @@ int toku_pin_brtnode (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
void toku_pin_brtnode_holding_lock (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
ANCESTORS ancestors, struct pivot_bounds const * const bounds,
struct brtnode_fetch_extra *bfe,
BRTNODE *node_p) {
void *node_v;
int r = toku_cachetable_get_and_pin(
......@@ -301,7 +303,7 @@ void toku_pin_brtnode_holding_lock (BRT brt, BLOCKNUM blocknum, u_int32_t fullha
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
bfe,
brt->h
);
assert(r==0);
......@@ -394,8 +396,11 @@ fixup_child_estimates (BRTNODE node, int childnum_of_node, BRTNODE child, BOOL d
estimates.dsize += child_se->dsize;
if (!child_se->exact) estimates.exact = FALSE;
if (child->height>0) {
// only execute this if the child's partition is available, as checked above
if (toku_fifo_n_entries(BNC_BUFFER(child,i))!=0) estimates.exact=FALSE;
if (BP_STATE(child,i) != PT_AVAIL ||
toku_fifo_n_entries(BNC_BUFFER(child,i))!=0)
{
estimates.exact=FALSE;
}
}
}
// We only call this function if we have reason to believe that the child changed.
......@@ -418,6 +423,8 @@ toku_verify_estimates (BRT t, BRTNODE node) {
BLOCKNUM childblocknum = BP_BLOCKNUM(node, childnum);
u_int32_t fullhash = compute_child_fullhash(t->cf, node, childnum);
void *childnode_v;
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, t->h);
int r = toku_cachetable_get_and_pin(
t->cf,
childblocknum,
......@@ -429,7 +436,7 @@ toku_verify_estimates (BRT t, BRTNODE node) {
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
t->h,
&bfe,
t->h
);
assert_zero(r);
......@@ -506,6 +513,20 @@ next_dict_id(void) {
return d;
}
u_int8_t
toku_brtnode_partition_state (struct brtnode_fetch_extra* bfe, int childnum)
{
if (bfe->type == brtnode_fetch_all ||
(bfe->type == brtnode_fetch_subset && bfe->child_to_read == childnum))
{
return PT_AVAIL;
}
else {
return PT_COMPRESSED;
}
}
//fd is protected (must be holding fdlock)
void toku_brtnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size __attribute__((unused)), BOOL write_me, BOOL keep_me, BOOL for_checkpoint) {
......@@ -538,14 +559,16 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename
//printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced);
}
//fd is protected (must be holding fdlock)
int toku_brtnode_fetch_callback (CACHEFILE UU(cachefile), int fd, BLOCKNUM nodename, u_int32_t fullhash,
void **brtnode_pv, long *sizep, int *dirtyp, void *extraargs) {
assert(extraargs);
assert(*brtnode_pv == NULL);
struct brt_header *h = extraargs;
struct brtnode_fetch_extra *bfe = (struct brtnode_fetch_extra *)extraargs;
BRTNODE *result=(BRTNODE*)brtnode_pv;
int r = toku_deserialize_brtnode_from(fd, nodename, fullhash, result, h);
// TODO: (Zardosht) pass in bfe to toku_deserialize_brtnode_from so it can do the right thing
int r = toku_deserialize_brtnode_from(fd, nodename, fullhash, result, bfe);
if (r == 0) {
*sizep = brtnode_memory_size(*result);
*dirtyp = (*result)->dirty;
......@@ -562,17 +585,71 @@ int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_
return 0;
}
// callback that says whether partially reading a node is necessary
// could have just used toku_brtnode_fetch_callback, but wanted to separate the two cases into separate functions
BOOL toku_brtnode_pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
BOOL toku_brtnode_pf_req_callback(void* brtnode_pv, void* read_extraargs) {
// placeholder for now
return FALSE;
BOOL retval = FALSE;
BRTNODE node = brtnode_pv;
struct brtnode_fetch_extra *bfe = (struct brtnode_fetch_extra *)read_extraargs;
if (bfe->type == brtnode_fetch_none) {
retval = FALSE;
}
else if (bfe->type == brtnode_fetch_all) {
retval = FALSE;
for (int i = 0; i < node->n_children; i++) {
if (BP_STATE(node,i) != PT_AVAIL) {
retval = TRUE;
break;
}
}
}
else if (bfe->type == brtnode_fetch_subset) {
// we do not take into account prefetching yet
// as of now, if we need a subset, the only thing
// we can possibly require is a single basement node
// we find out what basement node the query cares about
// and check if it is available
assert(bfe->brt);
assert(bfe->search);
bfe->child_to_read = toku_brt_search_which_child(
bfe->brt,
node,
bfe->search
);
retval = (BP_STATE(node,bfe->child_to_read) != PT_AVAIL);
}
else {
// we have a bug. The type should be known
assert(FALSE);
}
return retval;
}
// callback for partially reading a node
// could have just used toku_brtnode_fetch_callback, but wanted to separate the two cases into separate functions
int toku_brtnode_pf_callback(void* UU(brtnode_pv), void* UU(read_extraargs), long* UU(sizep)) {
assert(FALSE);
int toku_brtnode_pf_callback(void* brtnode_pv, void* read_extraargs, long* sizep) {
BRTNODE node = brtnode_pv;
struct brtnode_fetch_extra *bfe = (struct brtnode_fetch_extra *)read_extraargs;
// there must be a reason this is being called. If we get a garbage type or the type is brtnode_fetch_none,
// then something went wrong
assert((bfe->type == brtnode_fetch_subset) || (bfe->type == brtnode_fetch_all));
for (int i = 0; i < node->n_children; i++) {
if (BP_STATE(node,i) == PT_AVAIL) {
continue;
}
if (toku_brtnode_partition_state(bfe, i) == PT_AVAIL) {
assert(BP_STATE(node,i) == PT_COMPRESSED);
//
// decompress the subblock
//
toku_deserialize_bp_from_compressed(node, i);
}
}
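// report the node's new in-memory size so the cachetable can update its
// accounting (see the old_size/size bookkeeping added to
// toku_cachetable_get_and_pin)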
*sizep = brtnode_memory_size(node);
return 0;
}
......@@ -761,6 +838,7 @@ initialize_empty_brtnode (BRT t, BRTNODE n, BLOCKNUM nodename, int height, int n
n->childkeys=0;
n->bp = 0;
n->n_children = num_children;
n->bp_offset = 0;
if (num_children > 0) {
MALLOC_N(num_children-1, n->childkeys);
......@@ -1351,6 +1429,8 @@ brt_split_child (BRT t, BRTNODE node, int childnum, BOOL *did_react)
{
void *childnode_v;
// For now, don't use toku_pin_brtnode since we aren't yet prepared to deal with the TRY_AGAIN, and we don't have to apply all the messages above to do this split operation.
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, t->h);
int r = toku_cachetable_get_and_pin(t->cf,
BP_BLOCKNUM(node, childnum),
compute_child_fullhash(t->cf, node, childnum),
......@@ -1361,7 +1441,7 @@ brt_split_child (BRT t, BRTNODE node, int childnum, BOOL *did_react)
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
t->h,
&bfe,
t->h);
assert(r==0);
child = childnode_v;
......@@ -2271,6 +2351,8 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_react,
{
void *childnode_v;
u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnuma);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, t->h);
int r = toku_cachetable_get_and_pin(
t->cf,
BP_BLOCKNUM(node, childnuma),
......@@ -2282,7 +2364,7 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_react,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
t->h,
&bfe,
t->h
);
assert(r==0);
......@@ -2291,6 +2373,8 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_react,
{
void *childnode_v;
u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnumb);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, t->h);
int r = toku_cachetable_get_and_pin(
t->cf,
BP_BLOCKNUM(node, childnumb),
......@@ -2301,7 +2385,7 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_react,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
t->h,
&bfe,
t->h
);
assert(r==0);
......@@ -2474,7 +2558,9 @@ flush_this_child (BRT t, BRTNODE node, int childnum, enum reactivity *child_re,
toku_verify_blocknum_allocated(t->h->blocktable, targetchild);
u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnum);
BRTNODE child;
toku_pin_brtnode_holding_lock(t, targetchild, childfullhash, &next_ancestors, &next_bounds, &child); // get that child node in, and apply the ancestor messages if it's a leaf.
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, t->h);
toku_pin_brtnode_holding_lock(t, targetchild, childfullhash, &next_ancestors, &next_bounds, &bfe, &child); // get that child node in, and apply the ancestor messages if it's a leaf.
toku_assert_entire_node_in_memory(node);
assert(child->thisnodename.b!=0);
......@@ -2783,7 +2869,9 @@ toku_brt_root_put_cmd (BRT brt, BRT_MSG_S * cmd)
rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
// get the root node
toku_pin_brtnode_holding_lock(brt, *rootp, fullhash, NULL, &infinite_bounds, &node);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt->h);
toku_pin_brtnode_holding_lock(brt, *rootp, fullhash, NULL, &infinite_bounds, &bfe, &node);
toku_assert_entire_node_in_memory(node);
cmd->msn.msn = node->max_msn_applied_to_node_in_memory.msn + 1;
// Note, the lower level function that filters messages based on msn,
......@@ -4927,7 +5015,19 @@ got_a_good_value:
}
static int
brt_search_node (BRT brt, BRTNODE node, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, BOOL *doprefetch, BRT_CURSOR brtcursor, UNLOCKERS unlockers, ANCESTORS, struct pivot_bounds const * const bounds);
brt_search_node (
BRT brt,
BRTNODE node,
brt_search_t *search,
int child_to_search,
BRT_GET_CALLBACK_FUNCTION getf,
void *getf_v,
BOOL *doprefetch,
BRT_CURSOR brtcursor,
UNLOCKERS unlockers,
ANCESTORS,
struct pivot_bounds const * const bounds
);
// the number of nodes to prefetch
#define TOKU_DO_PREFETCH 0
......@@ -4991,10 +5091,19 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
BLOCKNUM childblocknum = BP_BLOCKNUM(node,childnum);
u_int32_t fullhash = compute_child_fullhash(brt->cf, node, childnum);
BRTNODE childnode;
struct brtnode_fetch_extra bfe;
fill_bfe_for_subset_read(
&bfe,
brt->h,
brt,
search
);
{
int rr = toku_pin_brtnode(brt, childblocknum, fullhash,
unlockers,
&next_ancestors, bounds,
&bfe,
&childnode);
if (rr==TOKUDB_TRY_AGAIN) return rr;
assert(rr==0);
......@@ -5003,7 +5112,7 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
struct unlock_brtnode_extra unlock_extra = {brt,childnode};
struct unlockers next_unlockers = {TRUE, unlock_brtnode_fun, (void*)&unlock_extra, unlockers};
int r = brt_search_node(brt, childnode, search, getf, getf_v, doprefetch, brtcursor, &next_unlockers, &next_ancestors, bounds);
int r = brt_search_node(brt, childnode, search, bfe.child_to_read, getf, getf_v, doprefetch, brtcursor, &next_unlockers, &next_ancestors, bounds);
if (r!=TOKUDB_TRY_AGAIN) {
// Even if r is reactive, we want to handle the maybe reactive child.
......@@ -5017,14 +5126,23 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
toku_unpin_brtnode(brt, childnode); // unpin the childnode before handling the reactive child (because that may make the childnode disappear.)
} else {
// try again.
assert(!next_unlockers.locked);
// there are two cases where we get TOKUDB_TRY_AGAIN
// case 1 is when some later call to toku_pin_brtnode returned
// that value and unpinned all the nodes anyway. case 2
// is when brt_search_node had to stop its search because
// some piece of a node that it needed was not in memory. In this case,
// the node was not unpinned, so we unpin it here
if (next_unlockers.locked) {
toku_unpin_brtnode(brt, childnode);
}
}
return r;
}
static int
brt_search_which_child(
int
toku_brt_search_which_child(
BRT brt,
BRTNODE node,
brt_search_t *search
......@@ -5076,6 +5194,7 @@ brt_search_node(
BRT brt,
BRTNODE node,
brt_search_t *search,
int child_to_search,
BRT_GET_CALLBACK_FUNCTION getf,
void *getf_v,
BOOL *doprefetch,
......@@ -5085,13 +5204,25 @@ brt_search_node(
struct pivot_bounds const * const bounds
)
{ int r = 0;
int child_to_search = brt_search_which_child(brt, node, search);
// assert that we got a valid child_to_search
assert(child_to_search >= 0 && child_to_search < node->n_children);
//
// At this point, we must have the necessary partition available to continue the search
//
assert(BP_STATE(node,child_to_search) == PT_AVAIL);
while (child_to_search >= 0 && child_to_search < node->n_children) {
//
// Normally, the child we want to use is available, as we checked
// before entering this while loop. However, if we pass through
// the loop once, getting DB_NOTFOUND for this first value
// of child_to_search, we enter the while loop again with a
// child_to_search that may not be in memory. If it is not,
// we need to return TOKUDB_TRY_AGAIN so the query can
// read the appropriate partition into memory
//
if (BP_STATE(node,child_to_search) != PT_AVAIL) {
return TOKUDB_TRY_AGAIN;
}
const struct pivot_bounds next_bounds = next_pivot_keys(node, child_to_search, bounds);
if (node->height > 0) {
r = brt_search_child(
......@@ -5150,7 +5281,6 @@ brt_search_node(
child_to_search--;
}
}
return r;
}
......@@ -5172,7 +5302,14 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf,
BRTNODE node;
toku_pin_brtnode_holding_lock(brt, *rootp, fullhash, NULL, &infinite_bounds, &node);
struct brtnode_fetch_extra bfe;
fill_bfe_for_subset_read(
&bfe,
brt->h,
brt,
search
);
toku_pin_brtnode_holding_lock(brt, *rootp, fullhash, NULL, &infinite_bounds, &bfe, &node);
struct unlock_brtnode_extra unlock_extra = {brt,node};
struct unlockers unlockers = {TRUE, unlock_brtnode_fun, (void*)&unlock_extra, (UNLOCKERS)NULL};
......@@ -5180,9 +5317,17 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf,
{
BOOL doprefetch = FALSE;
//static int counter = 0; counter++;
r = brt_search_node(brt, node, search, getf, getf_v, &doprefetch, brtcursor, &unlockers, (ANCESTORS)NULL, &infinite_bounds);
r = brt_search_node(brt, node, search, bfe.child_to_read, getf, getf_v, &doprefetch, brtcursor, &unlockers, (ANCESTORS)NULL, &infinite_bounds);
if (r==TOKUDB_TRY_AGAIN) {
assert(!unlockers.locked);
// there are two cases where we get TOKUDB_TRY_AGAIN
// case 1 is when some later call to toku_pin_brtnode returned
// that value and unpinned all the nodes anyway. case 2
// is when brt_search_node had to stop its search because
// some piece of a node that it needed was not in memory. In this case,
// the node was not unpinned, so we unpin it here
if (unlockers.locked) {
toku_unpin_brtnode(brt, node);
}
goto try_again;
} else {
assert(unlockers.locked);
......@@ -5658,6 +5803,9 @@ static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename,
{
void *node_v;
//assert(fullhash == toku_cachetable_hash(brt->cf, nodename));
struct brtnode_fetch_extra bfe;
// TODO: (Zardosht) change this
fill_bfe_for_full_read(&bfe, brt->h);
int rr = toku_cachetable_get_and_pin(
brt->cf,
nodename,
......@@ -5669,7 +5817,7 @@ static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
&bfe,
brt->h
);
assert_zero(rr);
......@@ -5744,6 +5892,8 @@ int toku_brt_stat64 (BRT brt, TOKUTXN UU(txn), struct brtstat64_s *s) {
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
CACHEKEY root = *rootp;
void *node_v;
struct brtnode_fetch_extra bfe;
fill_bfe_for_min_read(&bfe, brt->h);
int r = toku_cachetable_get_and_pin(
brt->cf,
root,
......@@ -5755,7 +5905,7 @@ int toku_brt_stat64 (BRT brt, TOKUTXN UU(txn), struct brtstat64_s *s) {
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
&bfe,
brt->h
);
if (r!=0) return r;
......@@ -5782,6 +5932,8 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, struct kv_
BRTNODE node;
void *node_v;
u_int32_t fullhash = toku_cachetable_hash(brt->cf, blocknum);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt->h);
int r = toku_cachetable_get_and_pin(
brt->cf,
blocknum,
......@@ -5793,7 +5945,7 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, struct kv_
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
&bfe,
brt->h
);
assert_zero(r);
......@@ -6097,6 +6249,8 @@ static BOOL is_empty_fast_iter (BRT brt, BRTNODE node) {
void *node_v;
BLOCKNUM childblocknum = BP_BLOCKNUM(node,childnum);
u_int32_t fullhash = compute_child_fullhash(brt->cf, node, childnum);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt->h);
int rr = toku_cachetable_get_and_pin(
brt->cf,
childblocknum,
......@@ -6108,7 +6262,7 @@ static BOOL is_empty_fast_iter (BRT brt, BRTNODE node) {
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
&bfe,
brt->h
);
assert(rr ==0);
......@@ -6140,6 +6294,8 @@ BOOL toku_brt_is_empty_fast (BRT brt)
//assert(fullhash == toku_cachetable_hash(brt->cf, *rootp));
{
void *node_v;
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt->h);
int rr = toku_cachetable_get_and_pin(
brt->cf,
*rootp,
......@@ -6151,7 +6307,7 @@ BOOL toku_brt_is_empty_fast (BRT brt)
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
&bfe,
brt->h
);
assert_zero(rr);
......
......@@ -118,7 +118,9 @@ print_le (OMTVALUE lev, u_int32_t UU(idx), void *UU(v)) {
static void
dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) {
BRTNODE n;
int r = toku_deserialize_brtnode_from (f, blocknum, 0 /*pass zero for hash, it doesn't matter*/, &n, h);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, h);
int r = toku_deserialize_brtnode_from (f, blocknum, 0 /*pass zero for hash, it doesn't matter*/, &n, &bfe);
assert(r==0);
assert(n!=0);
printf("brtnode\n");
......@@ -227,7 +229,9 @@ static int
fragmentation_helper(BLOCKNUM b, int64_t size, int64_t UU(address), void *extra) {
frag_help_extra *info = extra;
BRTNODE n;
int r = toku_deserialize_brtnode_from(info->f, b, 0 /*pass zero for hash, it doesn't matter*/, &n, info->h);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, info->h);
int r = toku_deserialize_brtnode_from(info->f, b, 0 /*pass zero for hash, it doesn't matter*/, &n, &bfe);
if (r==0) {
info->blocksizes += size;
if (n->height == 0) {
......@@ -266,14 +270,14 @@ get_unaligned_uint32(unsigned char *p) {
return *(u_int32_t *)p;
}
struct sub_block {
struct dump_sub_block {
u_int32_t compressed_size;
u_int32_t uncompressed_size;
u_int32_t xsum;
};
static void
sub_block_deserialize(struct sub_block *sb, unsigned char *sub_block_header) {
sub_block_deserialize(struct dump_sub_block *sb, unsigned char *sub_block_header) {
sb->compressed_size = toku_dtoh32(get_unaligned_uint32(sub_block_header+0));
sb->uncompressed_size = toku_dtoh32(get_unaligned_uint32(sub_block_header+4));
sb->xsum = toku_dtoh32(get_unaligned_uint32(sub_block_header+8));
......@@ -288,7 +292,7 @@ verify_block(unsigned char *cp, u_int64_t file_offset, u_int64_t size) {
unsigned char *sub_block_header = &cp[node_header];
u_int32_t n_sub_blocks = toku_dtoh32(get_unaligned_uint32(&sub_block_header[0]));
u_int32_t header_length = node_header + n_sub_blocks * sizeof (struct sub_block);
u_int32_t header_length = node_header + n_sub_blocks * sizeof (struct dump_sub_block);
header_length += sizeof (u_int32_t); // CRC
if (header_length > size) {
printf("header length too big: %u\n", header_length);
......@@ -302,11 +306,11 @@ verify_block(unsigned char *cp, u_int64_t file_offset, u_int64_t size) {
}
// deserialize the sub block header
struct sub_block sub_block[n_sub_blocks];
struct dump_sub_block sub_block[n_sub_blocks];
sub_block_header += sizeof (u_int32_t);
for (u_int32_t i = 0 ; i < n_sub_blocks; i++) {
sub_block_deserialize(&sub_block[i], sub_block_header);
sub_block_header += sizeof (struct sub_block);
sub_block_header += sizeof (struct dump_sub_block);
}
// verify the sub block header
......
......@@ -1544,8 +1544,12 @@ int toku_cachetable_get_and_pin (
cachetable_waittime += get_tnow() - t0;
}
t0 = get_tnow();
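// a partial fetch may grow the pair in memory, so account for the
// difference between its old and new size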
long old_size = p->size;
long size = 0;
int r = pf_callback(p->value, read_extraargs, &size);
p->size = size;
ct->size_current += size;
ct->size_current -= old_size;
lazy_assert_zero(r);
cachetable_waittime += get_tnow() - t0;
rwlock_write_unlock(&p->rwlock);
......@@ -1816,11 +1820,18 @@ int toku_cachetable_get_and_pin_nonblocking (
if (ct->ydb_unlock_callback) ct->ydb_unlock_callback();
// Now wait for the I/O to occur.
rwlock_write_lock(&p->rwlock, ct->mutex);
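// drop the cachetable lock while the (possibly long) partial fetch runs;
// the pair's write lock, taken above, keeps other users off this pair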
cachetable_unlock(ct);
long old_size = p->size;
long size = 0;
int r = pf_callback(p->value, read_extraargs, &size);
lazy_assert_zero(r);
cachetable_lock(ct);
p->size = size;
ct->size_current += size;
ct->size_current -= old_size;
rwlock_write_unlock(&p->rwlock);
cachetable_unlock(ct);
if (ct->ydb_lock_callback) ct->ydb_lock_callback();
return TOKUDB_TRY_AGAIN;
}
rwlock_read_lock(&p->rwlock, ct->mutex);
......
......@@ -160,7 +160,9 @@ test_serialize_leaf_with_large_pivots(void) {
r = toku_serialize_brtnode_to(fd, make_blocknum(20), &sn, brt->h, 1, 1, FALSE);
assert(r==0);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, brt_h);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt_h);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &bfe);
assert(r==0);
assert(dn->thisnodename.b==20);
......@@ -281,7 +283,9 @@ test_serialize_leaf_with_many_rows(void) {
r = toku_serialize_brtnode_to(fd, make_blocknum(20), &sn, brt->h, 1, 1, FALSE);
assert(r==0);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, brt_h);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt_h);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &bfe);
assert(r==0);
assert(dn->thisnodename.b==20);
......@@ -408,7 +412,9 @@ test_serialize_leaf_with_large_rows(void) {
r = toku_serialize_brtnode_to(fd, make_blocknum(20), &sn, brt->h, 1, 1, FALSE);
assert(r==0);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, brt_h);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt_h);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &bfe);
assert(r==0);
assert(dn->thisnodename.b==20);
......@@ -538,7 +544,9 @@ test_serialize_leaf_with_empty_basement_nodes(void) {
r = toku_serialize_brtnode_to(fd, make_blocknum(20), &sn, brt->h, 1, 1, FALSE);
assert(r==0);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, brt_h);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt_h);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &bfe);
assert(r==0);
assert(dn->thisnodename.b==20);
......@@ -667,7 +675,9 @@ test_serialize_leaf(void) {
r = toku_serialize_brtnode_to(fd, make_blocknum(20), &sn, brt->h, 1, 1, FALSE);
assert(r==0);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, brt_h);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt_h);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &bfe);
assert(r==0);
assert(dn->thisnodename.b==20);
......@@ -814,7 +824,9 @@ test_serialize_nonleaf(void) {
assert(sn.max_msn_applied_to_node_in_memory.msn == TESTMSNMEMVAL);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, brt_h);
struct brtnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt_h);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &bfe);
assert(r==0);
assert(dn->thisnodename.b==20);
......