Commit 48245c84 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1670

Serialize descriptor with each brtnode (leaf and nonleaf) 
Deserialize descriptor with each brtnode (to be thrown away). 
    Don't malloc+memdup the descriptor for brtnodes, since that would waste time.




git-svn-id: file:///svn/toku/tokudb@11200 c7de825b-a66e-492c-adef-691d508d4ae1
parent 1b96373a
...@@ -82,6 +82,7 @@ typedef struct brtnode *BRTNODE; ...@@ -82,6 +82,7 @@ typedef struct brtnode *BRTNODE;
/* Internal nodes. */ /* Internal nodes. */
struct brtnode { struct brtnode {
enum typ_tag tag; enum typ_tag tag;
struct descriptor *desc;
unsigned int nodesize; unsigned int nodesize;
int ever_been_written; int ever_been_written;
unsigned int flags; unsigned int flags;
......
...@@ -167,6 +167,7 @@ addupsize (OMTVALUE lev, u_int32_t UU(idx), void *vp) { ...@@ -167,6 +167,7 @@ addupsize (OMTVALUE lev, u_int32_t UU(idx), void *vp) {
static unsigned int toku_serialize_brtnode_size_slow (BRTNODE node) { static unsigned int toku_serialize_brtnode_size_slow (BRTNODE node) {
unsigned int size=brtnode_header_overhead; unsigned int size=brtnode_header_overhead;
size += toku_serialize_descriptor_size(node->desc);
if (node->height>0) { if (node->height>0) {
unsigned int hsize=0; unsigned int hsize=0;
unsigned int csize=0; unsigned int csize=0;
...@@ -207,6 +208,7 @@ static unsigned int toku_serialize_brtnode_size_slow (BRTNODE node) { ...@@ -207,6 +208,7 @@ static unsigned int toku_serialize_brtnode_size_slow (BRTNODE node) {
unsigned int toku_serialize_brtnode_size (BRTNODE node) { unsigned int toku_serialize_brtnode_size (BRTNODE node) {
unsigned int result =brtnode_header_overhead; unsigned int result =brtnode_header_overhead;
assert(sizeof(toku_off_t)==8); assert(sizeof(toku_off_t)==8);
result += toku_serialize_descriptor_size(node->desc);
if (node->height>0) { if (node->height>0) {
result+=4; /* subtree fingerpirnt */ result+=4; /* subtree fingerpirnt */
result+=4; /* n_children */ result+=4; /* n_children */
...@@ -327,6 +329,8 @@ static size_t get_sum_uncompressed_size(int n, struct sub_block_sizes sizes[]) { ...@@ -327,6 +329,8 @@ static size_t get_sum_uncompressed_size(int n, struct sub_block_sizes sizes[]) {
static inline void ignore_int (int UU(ignore_me)) {} static inline void ignore_int (int UU(ignore_me)) {}
static void serialize_descriptor_contents_to_wbuf(struct wbuf *wb, struct descriptor *desc);
int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_header *h, int n_workitems, int n_threads, BOOL for_checkpoint) { int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_header *h, int n_workitems, int n_threads, BOOL for_checkpoint) {
struct wbuf w; struct wbuf w;
int i; int i;
...@@ -347,6 +351,8 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b ...@@ -347,6 +351,8 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
assert(node->layout_version == BRT_LAYOUT_VERSION_9 || node->layout_version == BRT_LAYOUT_VERSION); assert(node->layout_version == BRT_LAYOUT_VERSION_9 || node->layout_version == BRT_LAYOUT_VERSION);
wbuf_int(&w, node->layout_version); wbuf_int(&w, node->layout_version);
wbuf_ulonglong(&w, node->log_lsn.lsn); wbuf_ulonglong(&w, node->log_lsn.lsn);
assert(node->desc == &h->descriptor);
serialize_descriptor_contents_to_wbuf(&w, node->desc);
//printf("%s:%d %lld.calculated_size=%d\n", __FILE__, __LINE__, off, calculated_size); //printf("%s:%d %lld.calculated_size=%d\n", __FILE__, __LINE__, off, calculated_size);
wbuf_uint(&w, node->nodesize); wbuf_uint(&w, node->nodesize);
wbuf_uint(&w, node->flags); wbuf_uint(&w, node->flags);
...@@ -606,6 +612,8 @@ static inline void do_toku_trace(const char *cp, int len) { ...@@ -606,6 +612,8 @@ static inline void do_toku_trace(const char *cp, int len) {
#define toku_trace(a) #define toku_trace(a)
#endif #endif
static void deserialize_descriptor_from_rbuf(struct rbuf *rb, struct descriptor *desc, BOOL temporary);
int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h) { int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h) {
if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b); if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b);
if (h->panic) return h->panic; if (h->panic) return h->panic;
...@@ -628,6 +636,7 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash ...@@ -628,6 +636,7 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash
if (0) { died0: toku_free(result); } if (0) { died0: toku_free(result); }
return r; return r;
} }
result->desc = &h->descriptor;
result->ever_been_written = 1; result->ever_been_written = 1;
unsigned char *MALLOC_N(size, compressed_block); unsigned char *MALLOC_N(size, compressed_block);
...@@ -731,6 +740,13 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash ...@@ -731,6 +740,13 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash
ok_layout_version: ; ok_layout_version: ;
} }
result->disk_lsn.lsn = rbuf_ulonglong(&rc); result->disk_lsn.lsn = rbuf_ulonglong(&rc);
{
//Restrict scope for now since we do not support upgrades.
struct descriptor desc;
//desc.dbt.data is TEMPORARY. Will be unusable when the rc buffer is freed.
deserialize_descriptor_from_rbuf(&rc, &desc, TRUE);
assert(desc.version == result->desc->version); //We do not yet support upgrading the dbts.
}
result->nodesize = rbuf_int(&rc); result->nodesize = rbuf_int(&rc);
result->log_lsn = result->disk_lsn; result->log_lsn = result->disk_lsn;
...@@ -1108,19 +1124,22 @@ toku_serialize_descriptor_contents_to_fd(int fd, struct descriptor *desc, DISKOF ...@@ -1108,19 +1124,22 @@ toku_serialize_descriptor_contents_to_fd(int fd, struct descriptor *desc, DISKOF
} }
static void static void
deserialize_descriptor_from_rbuf(struct rbuf *rb, struct descriptor *desc) { deserialize_descriptor_from_rbuf(struct rbuf *rb, struct descriptor *desc, BOOL temporary) {
desc->version = rbuf_int(rb); desc->version = rbuf_int(rb);
u_int32_t size; u_int32_t size;
bytevec data; bytevec data;
rbuf_bytes(rb, &data, &size); rbuf_bytes(rb, &data, &size);
bytevec data_copy; bytevec data_copy = data;;
if (size>0) if (size>0) {
if (!temporary) {
data_copy = toku_memdup(data, size); //Cannot keep the reference from rbuf. Must copy. data_copy = toku_memdup(data, size); //Cannot keep the reference from rbuf. Must copy.
assert(data_copy);
}
}
else { else {
assert(size==0); assert(size==0);
data_copy = NULL; data_copy = NULL;
} }
assert(data_copy);
toku_fill_dbt(&desc->dbt, data_copy, size); toku_fill_dbt(&desc->dbt, data_copy, size);
if (desc->version==0) assert(desc->dbt.size==0); if (desc->version==0) assert(desc->dbt.size==0);
} }
...@@ -1150,7 +1169,8 @@ deserialize_descriptor_from(int fd, struct brt_header *h, struct descriptor *des ...@@ -1150,7 +1169,8 @@ deserialize_descriptor_from(int fd, struct brt_header *h, struct descriptor *des
} }
{ {
struct rbuf rb = {.buf = dbuf, .size = size, .ndone = 0}; struct rbuf rb = {.buf = dbuf, .size = size, .ndone = 0};
deserialize_descriptor_from_rbuf(&rb, desc); //Not temporary; must have a toku_memdup'd copy.
deserialize_descriptor_from_rbuf(&rb, desc, FALSE);
} }
assert(toku_serialize_descriptor_size(desc)+4 == size); assert(toku_serialize_descriptor_size(desc)+4 == size);
toku_free(dbuf); toku_free(dbuf);
......
...@@ -651,6 +651,7 @@ initialize_empty_brtnode (BRT t, BRTNODE n, BLOCKNUM nodename, int height) ...@@ -651,6 +651,7 @@ initialize_empty_brtnode (BRT t, BRTNODE n, BLOCKNUM nodename, int height)
// Effect: Fill in N as an empty brtnode. // Effect: Fill in N as an empty brtnode.
{ {
n->tag = TYP_BRTNODE; n->tag = TYP_BRTNODE;
n->desc = &t->h->descriptor;
n->nodesize = t->h->nodesize; n->nodesize = t->h->nodesize;
n->flags = t->flags; n->flags = t->flags;
n->thisnodename = nodename; n->thisnodename = nodename;
......
...@@ -54,7 +54,7 @@ static void test_serialize(void) { ...@@ -54,7 +54,7 @@ static void test_serialize(void) {
sn.u.n.n_bytes_in_buffers = 3*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5); sn.u.n.n_bytes_in_buffers = 3*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5);
struct brt *XMALLOC(brt); struct brt *XMALLOC(brt);
struct brt_header *XMALLOC(brt_h); struct brt_header *XCALLOC(brt_h);
brt->h = brt_h; brt->h = brt_h;
brt_h->type = BRTHEADER_CURRENT; brt_h->type = BRTHEADER_CURRENT;
brt_h->panic = 0; brt_h->panic_string = 0; brt_h->panic = 0; brt_h->panic_string = 0;
...@@ -77,6 +77,7 @@ static void test_serialize(void) { ...@@ -77,6 +77,7 @@ static void test_serialize(void) {
assert(size == 100); assert(size == 100);
} }
sn.desc = &brt->h->descriptor;
r = toku_serialize_brtnode_to(fd, make_blocknum(20), &sn, brt->h, 1, 1, FALSE); r = toku_serialize_brtnode_to(fd, make_blocknum(20), &sn, brt->h, 1, 1, FALSE);
assert(r==0); assert(r==0);
......
...@@ -202,6 +202,9 @@ runtest(void) { ...@@ -202,6 +202,9 @@ runtest(void) {
open_db(order[i]); open_db(order[i]);
test_insert(i); test_insert(i);
close_db(); close_db();
open_db(-1);
test_insert(i);
close_db();
open_db(order[i]); open_db(order[i]);
test_insert(i); test_insert(i);
close_db(); close_db();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment