Commit 2adbbe6d authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

fix 1172. we can not use the brt in the block serialize functions as the brt...

fix 1172. we can not use the brt in the block serialize functions as the brt may not be valid.  addresses #1172

git-svn-id: file:///svn/tokudb.1131b+1080a@6272 c7de825b-a66e-492c-adef-691d508d4ae1
parent bf7ed9d9
......@@ -178,7 +178,7 @@ struct brt {
};
/* serialization code */
void toku_serialize_brtnode_to(int fd, BLOCKNUM, BRTNODE node, BRT brt);
void toku_serialize_brtnode_to(int fd, BLOCKNUM, BRTNODE node, struct brt_header *h);
int toku_deserialize_brtnode_from (int fd, BLOCKNUM off, u_int32_t /*fullhash*/, BRTNODE *brtnode, struct brt_header *h);
unsigned int toku_serialize_brtnode_size(BRTNODE node); /* How much space will it take? */
int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
......
......@@ -183,7 +183,7 @@ const int uncompressed_magic_len = (8 // tokuleaf or tokunode
const int compression_header_len = (4 // compressed_len
+4); // uncompressed_len
void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, BRT brt) {
void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_header *h) {
//printf("%s:%d serializing\n", __FILE__, __LINE__);
struct wbuf w;
int i;
......@@ -313,33 +313,33 @@ void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, BRT brt
lock_for_pwrite();
// If the node has never been written, then write the whole buffer, including the zeros
assert(blocknum.b>=0);
//printf("%s:%d brt=%p\n", __FILE__, __LINE__, brt);
//printf("%s:%d translated_blocknum_limit=%lu blocknum.b=%lu\n", __FILE__, __LINE__, brt->h->translated_blocknum_limit, blocknum.b);
//printf("%s:%d allocator=%p\n", __FILE__, __LINE__, brt->h->block_allocator);
//printf("%s:%d bt=%p\n", __FILE__, __LINE__, brt->h->block_translation);
if (brt->h->translated_blocknum_limit <= (u_int64_t)blocknum.b) {
if (brt->h->block_translation == 0) assert(brt->h->translated_blocknum_limit==0);
//printf("%s:%d h=%p\n", __FILE__, __LINE__, h);
//printf("%s:%d translated_blocknum_limit=%lu blocknum.b=%lu\n", __FILE__, __LINE__, h->translated_blocknum_limit, blocknum.b);
//printf("%s:%d allocator=%p\n", __FILE__, __LINE__, h->block_allocator);
//printf("%s:%d bt=%p\n", __FILE__, __LINE__, h->block_translation);
if (h->translated_blocknum_limit <= (u_int64_t)blocknum.b) {
if (h->block_translation == 0) assert(h->translated_blocknum_limit==0);
u_int64_t new_limit = blocknum.b + 1;
u_int64_t old_limit = brt->h->translated_blocknum_limit;
u_int64_t old_limit = h->translated_blocknum_limit;
u_int64_t j;
XREALLOC_N(new_limit, brt->h->block_translation);
XREALLOC_N(new_limit, h->block_translation);
for (j=old_limit; j<new_limit; j++) {
brt->h->block_translation[j].diskoff = 0;
brt->h->block_translation[j].size = 0;
h->block_translation[j].diskoff = 0;
h->block_translation[j].size = 0;
}
brt->h->translated_blocknum_limit = new_limit;
h->translated_blocknum_limit = new_limit;
}
if (brt->h->block_translation[blocknum.b].size > 0) {
block_allocator_free_block(brt->h->block_allocator, brt->h->block_translation[blocknum.b].diskoff);
brt->h->block_translation[blocknum.b].diskoff = 0;
brt->h->block_translation[blocknum.b].size = 0;
if (h->block_translation[blocknum.b].size > 0) {
block_allocator_free_block(h->block_allocator, h->block_translation[blocknum.b].diskoff);
h->block_translation[blocknum.b].diskoff = 0;
h->block_translation[blocknum.b].size = 0;
}
brt->h->dirty = 1; // Allocating a block dirties the header.
h->dirty = 1; // Allocating a block dirties the header.
size_t n_to_write = uncompressed_magic_len + compression_header_len + compressed_len;
u_int64_t offset;
block_allocator_alloc_block(brt->h->block_allocator, n_to_write, &offset);
brt->h->block_translation[blocknum.b].diskoff = offset;
brt->h->block_translation[blocknum.b].size = n_to_write;
block_allocator_alloc_block(h->block_allocator, n_to_write, &offset);
h->block_translation[blocknum.b].diskoff = offset;
h->block_translation[blocknum.b].size = n_to_write;
ssize_t r=toku_pwrite(fd, compressed_buf, n_to_write, offset);
if (r<0) printf("r=%ld errno=%d\n", (long)r, errno);
assert(r==(ssize_t)n_to_write);
......
......@@ -162,7 +162,7 @@ static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck) {
}
void toku_brtnode_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size __attribute((unused)), BOOL write_me, BOOL keep_me, LSN modified_lsn __attribute__((__unused__)) , BOOL rename_p __attribute__((__unused__))) {
BRT brt = extraargs;
struct brt_header *h = extraargs;
BRTNODE brtnode = brtnode_v;
// if ((write_me || keep_me) && (brtnode->height==0)) {
// toku_pma_verify_fingerprint(brtnode->u.l.buffer, brtnode->rand4fingerprint, brtnode->subtree_fingerprint);
......@@ -176,7 +176,7 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void *
assert(brtnode->thisnodename.b==nodename.b);
//printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]);
if (write_me) {
toku_serialize_brtnode_to(toku_cachefile_fd(cachefile), brtnode->thisnodename, brtnode, brt);
toku_serialize_brtnode_to(toku_cachefile_fd(cachefile), brtnode->thisnodename, brtnode, h);
}
//printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]);
if (!keep_me) {
......@@ -187,9 +187,9 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void *
int toku_brtnode_fetch_callback (CACHEFILE cachefile, BLOCKNUM nodename, u_int32_t fullhash, void **brtnode_pv, long *sizep, void*extraargs, LSN *written_lsn) {
assert(extraargs);
BRT brt = extraargs;
struct brt_header *h = extraargs;
BRTNODE *result=(BRTNODE*)brtnode_pv;
int r = toku_deserialize_brtnode_from(toku_cachefile_fd(cachefile), nodename, fullhash, result, brt->h);
int r = toku_deserialize_brtnode_from(toku_cachefile_fd(cachefile), nodename, fullhash, result, h);
if (r == 0) {
*sizep = brtnode_memory_size(*result);
*written_lsn = (*result)->disk_lsn;
......@@ -370,7 +370,7 @@ int toku_create_new_brtnode (BRT t, BRTNODE *result, int height, TOKULOGGER logg
n->fullhash = fullhash;
r=toku_cachetable_put(t->cf, n->thisnodename, fullhash,
n, brtnode_memory_size(n),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t);
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h);
assert(r==0);
return 0;
}
......@@ -1071,7 +1071,7 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
assert(targetchild.b>=0 && targetchild.b<t->h->unused_blocks.b); // This assertion could fail in a concurrent setting since another process might have bumped unused memory.
u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnum);
r = toku_cachetable_get_and_pin(t->cf, targetchild, childfullhash, &childnode_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t);
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h);
if (r!=0) return r;
//printf("%s:%d pin %p\n", __FILE__, __LINE__, childnode_v);
child=childnode_v;
......@@ -1738,7 +1738,7 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD cmd,
r = toku_cachetable_maybe_get_and_pin(t->cf, childblocknum, fullhash, &child_v);
else
r = toku_cachetable_get_and_pin(t->cf, childblocknum, fullhash, &child_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t);
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h);
if (r != 0)
return r;
......@@ -2011,7 +2011,7 @@ static int setup_initial_brt_root_node (BRT t, BLOCKNUM blocknum, TOKULOGGER log
node->fullhash = fullhash;
r=toku_cachetable_put(t->cf, blocknum, fullhash,
node, brtnode_memory_size(node),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t);
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h);
if (r!=0) {
toku_free(node);
return r;
......@@ -2535,7 +2535,7 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
u_int32_t fullhash = toku_cachetable_hash(brt->cf, newroot_diskoff);
newroot->fullhash = fullhash;
toku_cachetable_put(brt->cf, newroot_diskoff, fullhash, newroot, brtnode_memory_size(newroot),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h);
*newrootp = newroot;
return 0;
}
......@@ -2594,7 +2594,7 @@ int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) {
rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
//assert(fullhash==toku_cachetable_hash(brt->cf, *rootp));
if ((r=toku_cachetable_get_and_pin(brt->cf, *rootp, fullhash, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt))) {
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h))) {
return r;
}
//printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
......@@ -2690,7 +2690,7 @@ int toku_dump_brtnode (BRT brt, BLOCKNUM blocknum, int depth, bytevec lorange, I
u_int32_t fullhash = toku_cachetable_hash(brt->cf, blocknum);
int r = toku_cachetable_get_and_pin(brt->cf, blocknum, fullhash,
&node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h);
assert(r==0);
printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v;
......@@ -2754,7 +2754,7 @@ static int show_brtnode_blocknumbers (BRT brt, DISKOFF off) {
int i,r;
assert(off%brt->h->nodesize==0);
if ((r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt))) {
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h))) {
if (0) { died0: toku_cachetable_unpin(brt->cf, off, 0, 0); }
return r;
}
......@@ -2816,7 +2816,7 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
void *node_v;
BLOCKNUM childblocknum = BNC_BLOCKNUM(node,childnum);
u_int32_t fullhash = compute_child_fullhash(brt->cf, node, childnum);
rr = toku_cachetable_get_and_pin(brt->cf, childblocknum, fullhash, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
rr = toku_cachetable_get_and_pin(brt->cf, childblocknum, fullhash, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h);
assert(rr == 0);
for (;;) {
......@@ -3014,7 +3014,7 @@ int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOK
//assert(fullhash == toku_cachetable_hash(brt->cf, *rootp));
rr = toku_cachetable_get_and_pin(brt->cf, *rootp, fullhash,
&node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
&node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h);
assert(rr == 0);
BRTNODE node = node_v;
......@@ -3724,7 +3724,7 @@ static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename, u_int32_t fu
void *node_v;
//assert(fullhash == toku_cachetable_hash(brt->cf, nodename));
int rr = toku_cachetable_get_and_pin(brt->cf, nodename, fullhash,
&node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
&node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h);
assert(rr == 0);
node = node_v;
assert(node->fullhash==fullhash);
......@@ -3822,7 +3822,7 @@ int toku_brt_height_of_root(BRT brt, int *height) {
void *node_v;
//assert(fullhash == toku_cachetable_hash(brt->cf, *rootp));
if ((r=toku_cachetable_get_and_pin(brt->cf, *rootp, fullhash, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt))) {
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt->h))) {
return r;
}
BRTNODE node = node_v;
......
......@@ -69,7 +69,7 @@ static void test_serialize(void) {
assert(b==4096);
}
toku_serialize_brtnode_to(fd, make_blocknum(20), &sn, brt); assert(r==0);
toku_serialize_brtnode_to(fd, make_blocknum(20), &sn, brt->h); assert(r==0);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, brt_h);
assert(r==0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment