Commit f99b058a authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1924 refs[t:1924] Clean up deserialize brtnode in preparation for upgrade logic

git-svn-id: file:///svn/toku/tokudb@13911 c7de825b-a66e-492c-adef-691d508d4ae1
parent 14e1f586
...@@ -109,3 +109,20 @@ deserialize_brtheader_10 (int fd, struct rbuf *rb, brt_header_10 **brth) { ...@@ -109,3 +109,20 @@ deserialize_brtheader_10 (int fd, struct rbuf *rb, brt_header_10 **brth) {
*brth = h; *brth = h;
return 0; return 0;
} }
static int
decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum) {
int r = decompress_brtnode_from_raw_block_into_rbuf(raw_block, rb, blocknum);
return r;
}
static int
deserialize_brtnode_from_rbuf_10 (BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h, struct rbuf *rb) {
blocknum = blocknum;
fullhash = fullhash;
brtnode = brtnode;
h = h;
rb = rb;
assert(FALSE);
}
...@@ -10,4 +10,8 @@ static int deserialize_brtheader_10 (int fd, struct rbuf *rb, brt_header_10 **br ...@@ -10,4 +10,8 @@ static int deserialize_brtheader_10 (int fd, struct rbuf *rb, brt_header_10 **br
static int upgrade_brtheader_10_11 (brt_header_10 **brth_10, brt_header_11 **brth_11); static int upgrade_brtheader_10_11 (brt_header_10 **brth_10, brt_header_11 **brth_11);
static int decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum);
static int deserialize_brtnode_from_rbuf_10 (BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h, struct rbuf *rb);
#endif #endif
...@@ -617,189 +617,52 @@ static inline void do_toku_trace(const char *cp, int len) { ...@@ -617,189 +617,52 @@ static inline void do_toku_trace(const char *cp, int len) {
static void deserialize_descriptor_from_rbuf(struct rbuf *rb, struct descriptor *desc, BOOL temporary); static void deserialize_descriptor_from_rbuf(struct rbuf *rb, struct descriptor *desc, BOOL temporary);
int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h) { static int
if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b); deserialize_brtnode_nonleaf_from_rbuf (BRTNODE result, bytevec magic, struct rbuf *rb) {
if (h->panic) return h->panic;
#if DO_TOKU_TRACE
if (toku_trace_fd == -1)
toku_trace_fd = open("/dev/null", O_WRONLY);
toku_trace("deserial start");
#endif
// get the file offset and block size for the block
DISKOFF offset, size;
toku_translate_blocknum_to_offset_size(h->blocktable, blocknum, &offset, &size);
TAGMALLOC(BRTNODE, result);
struct rbuf rc;
int i;
int r; int r;
if (result==0) { int i;
r=errno;
if (0) { died0: toku_free(result); }
return r;
}
result->desc = &h->descriptor;
result->ever_been_written = 1;
unsigned char *XMALLOC_N(size, compressed_block);
// read the compressed block
ssize_t rlen = pread(fd, compressed_block, size, offset);
assert((DISKOFF)rlen == size);
// get the layout_version
unsigned char *uncompressed_header = compressed_block;
int layout_version = toku_dtoh32(*(uint32_t*)(uncompressed_header+uncompressed_version_offset));
//TODO: #1924 Deal with old versions here.
//TODO: #1924 either assert we're not too old, or return some form of
// checksum error?
// get the number of compressed sub blocks
int n_sub_blocks;
int compression_header_offset;
{
n_sub_blocks = toku_dtoh32(*(u_int32_t*)(&uncompressed_header[uncompressed_magic_len]));
compression_header_offset = uncompressed_magic_len + 4;
}
assert(0 < n_sub_blocks);
// verify the sizes of the compressed sub blocks
if (0 && n_sub_blocks != 1) printf("%s:%d %d\n", __FUNCTION__, __LINE__, n_sub_blocks);
struct sub_block_sizes sub_block_sizes[n_sub_blocks];
for (i=0; i<n_sub_blocks; i++) {
u_int32_t compressed_size = toku_dtoh32(*(u_int32_t*)(&uncompressed_header[compression_header_offset+8*i]));
if (compressed_size<=0 || compressed_size>(1<<30)) { r = toku_db_badformat(); goto died0; }
u_int32_t uncompressed_size = toku_dtoh32(*(u_int32_t*)(&uncompressed_header[compression_header_offset+8*i+4]));
if (0) printf("Block %" PRId64 " Compressed size = %u, uncompressed size=%u\n", blocknum.b, compressed_size, uncompressed_size);
if (uncompressed_size<=0 || uncompressed_size>(1<<30)) { r = toku_db_badformat(); goto died0; }
sub_block_sizes[i].compressed_size = compressed_size;
sub_block_sizes[i].uncompressed_size = uncompressed_size;
}
unsigned char *compressed_data = compressed_block + uncompressed_magic_len + get_compression_header_size(layout_version, n_sub_blocks);
size_t uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block_sizes);
rc.size= uncompressed_magic_len + uncompressed_size;
assert(rc.size>0);
rc.buf=toku_malloc(rc.size);
assert(rc.buf);
// construct the uncompressed block from the header and compressed sub blocks
memcpy(rc.buf, uncompressed_header, uncompressed_magic_len);
// decompress the sub blocks
unsigned char *uncompressed_data = rc.buf+uncompressed_magic_len;
struct decompress_work decompress_work[n_sub_blocks];
for (i=0; i<n_sub_blocks; i++) {
init_decompress_work(&decompress_work[i], compressed_data, sub_block_sizes[i].compressed_size, uncompressed_data, sub_block_sizes[i].uncompressed_size);
if (i>0) {
#if DO_DECOMPRESS_WORKER
start_decompress_work(&decompress_work[i]);
#else
do_decompress_work(&decompress_work[i]);
#endif
}
uncompressed_data += sub_block_sizes[i].uncompressed_size;
compressed_data += sub_block_sizes[i].compressed_size;
}
do_decompress_work(&decompress_work[0]);
#if DO_DECOMPRESS_WORKER
for (i=1; i<n_sub_blocks; i++)
wait_decompress_work(&decompress_work[i]);
#endif
toku_trace("decompress done");
if (0) printf("First 4 bytes of uncompressed data are %02x%02x%02x%02x\n",
rc.buf[uncompressed_magic_len], rc.buf[uncompressed_magic_len+1],
rc.buf[uncompressed_magic_len+2], rc.buf[uncompressed_magic_len+3]);
toku_free(compressed_block);
// deserialize the uncompressed block if (memcmp(magic, "tokunode", 8)!=0) {
rc.ndone=0;
//printf("Deserializing %lld datasize=%d\n", off, datasize);
{
bytevec tmp;
rbuf_literal_bytes(&rc, &tmp, 8);
if (memcmp(tmp, "tokuleaf", 8)!=0
&& memcmp(tmp, "tokunode", 8)!=0) {
r = toku_db_badformat(); r = toku_db_badformat();
return r; return r;
} }
}
result->layout_version = rbuf_int(&rc);
{
switch (result->layout_version) {
//TODO: #1924
case BRT_LAYOUT_VERSION: goto ok_layout_version;
// Don't support older versions.
}
r=toku_db_badformat();
return r;
ok_layout_version: ;
}
result->disk_lsn.lsn = rbuf_ulonglong(&rc);
{
//Restrict scope for now since we do not support upgrades.
struct descriptor desc;
//desc.dbt.data is TEMPORARY. Will be unusable when the rc buffer is freed.
deserialize_descriptor_from_rbuf(&rc, &desc, TRUE);
assert(desc.version == result->desc->version); //We do not yet support upgrading the dbts.
}
result->nodesize = rbuf_int(&rc);
result->log_lsn = result->disk_lsn;
result->thisnodename = blocknum;
result->flags = rbuf_int(&rc);
result->height = rbuf_int(&rc);
result->rand4fingerprint = rbuf_int(&rc);
result->local_fingerprint = rbuf_int(&rc);
// printf("%s:%d read %08x\n", __FILE__, __LINE__, result->local_fingerprint);
result->dirty = 0;
result->fullhash = fullhash;
//printf("height==%d\n", result->height);
if (result->height>0) {
result->u.n.totalchildkeylens=0; result->u.n.totalchildkeylens=0;
u_int32_t subtree_fingerprint = rbuf_int(&rc); u_int32_t subtree_fingerprint = rbuf_int(rb);
u_int32_t check_subtree_fingerprint = 0; u_int32_t check_subtree_fingerprint = 0;
result->u.n.n_children = rbuf_int(&rc); result->u.n.n_children = rbuf_int(rb);
MALLOC_N(result->u.n.n_children+1, result->u.n.childinfos); MALLOC_N(result->u.n.n_children+1, result->u.n.childinfos);
MALLOC_N(result->u.n.n_children, result->u.n.childkeys); MALLOC_N(result->u.n.n_children, result->u.n.childkeys);
//printf("n_children=%d\n", result->n_children); //printf("n_children=%d\n", result->n_children);
assert(result->u.n.n_children>=0); assert(result->u.n.n_children>=0);
for (i=0; i<result->u.n.n_children; i++) { for (i=0; i<result->u.n.n_children; i++) {
u_int32_t childfp = rbuf_int(&rc); u_int32_t childfp = rbuf_int(rb);
BNC_SUBTREE_FINGERPRINT(result, i)= childfp; BNC_SUBTREE_FINGERPRINT(result, i)= childfp;
check_subtree_fingerprint += childfp; check_subtree_fingerprint += childfp;
struct subtree_estimates *se = &(BNC_SUBTREE_ESTIMATES(result, i)); struct subtree_estimates *se = &(BNC_SUBTREE_ESTIMATES(result, i));
se->nkeys = rbuf_ulonglong(&rc); se->nkeys = rbuf_ulonglong(rb);
se->ndata = rbuf_ulonglong(&rc); se->ndata = rbuf_ulonglong(rb);
se->dsize = rbuf_ulonglong(&rc); se->dsize = rbuf_ulonglong(rb);
se->exact = (BOOL) (rbuf_char(&rc) != 0); se->exact = (BOOL) (rbuf_char(rb) != 0);
} }
for (i=0; i<result->u.n.n_children-1; i++) { for (i=0; i<result->u.n.n_children-1; i++) {
if (result->flags & TOKU_DB_DUPSORT) { if (result->flags & TOKU_DB_DUPSORT) {
bytevec keyptr, dataptr; bytevec keyptr, dataptr;
unsigned int keylen, datalen; unsigned int keylen, datalen;
rbuf_bytes(&rc, &keyptr, &keylen); rbuf_bytes(rb, &keyptr, &keylen);
rbuf_bytes(&rc, &dataptr, &datalen); rbuf_bytes(rb, &dataptr, &datalen);
result->u.n.childkeys[i] = kv_pair_malloc(keyptr, keylen, dataptr, datalen); result->u.n.childkeys[i] = kv_pair_malloc(keyptr, keylen, dataptr, datalen);
} else { } else {
bytevec childkeyptr; bytevec childkeyptr;
unsigned int cklen; unsigned int cklen;
rbuf_bytes(&rc, &childkeyptr, &cklen); /* Returns a pointer into the rbuf. */ rbuf_bytes(rb, &childkeyptr, &cklen); /* Returns a pointer into the rbuf. */
result->u.n.childkeys[i] = kv_pair_malloc((void*)childkeyptr, cklen, 0, 0); result->u.n.childkeys[i] = kv_pair_malloc((void*)childkeyptr, cklen, 0, 0);
} }
//printf(" key %d length=%d data=%s\n", i, result->childkeylens[i], result->childkeys[i]); //printf(" key %d length=%d data=%s\n", i, result->childkeylens[i], result->childkeys[i]);
result->u.n.totalchildkeylens+=toku_brtnode_pivot_key_len(result, result->u.n.childkeys[i]); result->u.n.totalchildkeylens+=toku_brtnode_pivot_key_len(result, result->u.n.childkeys[i]);
} }
for (i=0; i<result->u.n.n_children; i++) { for (i=0; i<result->u.n.n_children; i++) {
BNC_BLOCKNUM(result,i) = rbuf_blocknum(&rc); BNC_BLOCKNUM(result,i) = rbuf_blocknum(rb);
BNC_HAVE_FULLHASH(result, i) = FALSE; BNC_HAVE_FULLHASH(result, i) = FALSE;
BNC_NBYTESINBUF(result,i) = 0; BNC_NBYTESINBUF(result,i) = 0;
//printf("Child %d at %lld\n", i, result->children[i]); //printf("Child %d at %lld\n", i, result->children[i]);
...@@ -809,7 +672,7 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash ...@@ -809,7 +672,7 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash
r=toku_fifo_create(&BNC_BUFFER(result,i)); r=toku_fifo_create(&BNC_BUFFER(result,i));
if (r!=0) { if (r!=0) {
int j; int j;
if (0) { died_12: j=result->u.n.n_bytes_in_buffers; } if (0) { died_1: j=result->u.n.n_bytes_in_buffers; }
for (j=0; j<i; j++) toku_fifo_free(&BNC_BUFFER(result,j)); for (j=0; j<i; j++) toku_fifo_free(&BNC_BUFFER(result,j));
return toku_db_badformat(); return toku_db_badformat();
} }
...@@ -818,23 +681,23 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash ...@@ -818,23 +681,23 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash
int cnum; int cnum;
u_int32_t check_local_fingerprint = 0; u_int32_t check_local_fingerprint = 0;
for (cnum=0; cnum<result->u.n.n_children; cnum++) { for (cnum=0; cnum<result->u.n.n_children; cnum++) {
int n_in_this_hash = rbuf_int(&rc); int n_in_this_hash = rbuf_int(rb);
//printf("%d in hash\n", n_in_hash); //printf("%d in hash\n", n_in_hash);
for (i=0; i<n_in_this_hash; i++) { for (i=0; i<n_in_this_hash; i++) {
int diff; int diff;
bytevec key; ITEMLEN keylen; bytevec key; ITEMLEN keylen;
bytevec val; ITEMLEN vallen; bytevec val; ITEMLEN vallen;
//toku_verify_counts(result); //toku_verify_counts(result);
int type = rbuf_char(&rc); int type = rbuf_char(rb);
XIDS xids; XIDS xids;
xids_create_from_buffer(&rc, &xids); xids_create_from_buffer(rb, &xids);
rbuf_bytes(&rc, &key, &keylen); /* Returns a pointer into the rbuf. */ rbuf_bytes(rb, &key, &keylen); /* Returns a pointer into the rbuf. */
rbuf_bytes(&rc, &val, &vallen); rbuf_bytes(rb, &val, &vallen);
check_local_fingerprint += result->rand4fingerprint * toku_calc_fingerprint_cmd(type, xids, key, keylen, val, vallen); check_local_fingerprint += result->rand4fingerprint * toku_calc_fingerprint_cmd(type, xids, key, keylen, val, vallen);
//printf("Found %s,%s\n", (char*)key, (char*)val); //printf("Found %s,%s\n", (char*)key, (char*)val);
{ {
r=toku_fifo_enq(BNC_BUFFER(result, cnum), key, keylen, val, vallen, type, xids); /* Copies the data into the hash table. */ r=toku_fifo_enq(BNC_BUFFER(result, cnum), key, keylen, val, vallen, type, xids); /* Copies the data into the hash table. */
if (r!=0) { goto died_12; } if (r!=0) { goto died_1; }
} }
diff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids); diff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids);
result->u.n.n_bytes_in_buffers += diff; result->u.n.n_bytes_in_buffers += diff;
...@@ -852,87 +715,328 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash ...@@ -852,87 +715,328 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash
return toku_db_badformat(); return toku_db_badformat();
} }
} }
} else { (void)rbuf_int(rb); //Ignore the crc (already verified).
result->u.l.leaf_stats.nkeys = rbuf_ulonglong(&rc); if (rb->ndone != rb->size) { //Verify we read exactly the entire block.
result->u.l.leaf_stats.ndata = rbuf_ulonglong(&rc); r = toku_db_badformat(); goto died_1;
result->u.l.leaf_stats.dsize = rbuf_ulonglong(&rc); }
return 0;
}
static int
deserialize_brtnode_leaf_from_rbuf (BRTNODE result, bytevec magic, struct rbuf *rb) {
int r;
int i;
if (memcmp(magic, "tokuleaf", 8)!=0) {
r = toku_db_badformat();
return r;
}
result->u.l.leaf_stats.nkeys = rbuf_ulonglong(rb);
result->u.l.leaf_stats.ndata = rbuf_ulonglong(rb);
result->u.l.leaf_stats.dsize = rbuf_ulonglong(rb);
result->u.l.leaf_stats.exact = TRUE; result->u.l.leaf_stats.exact = TRUE;
int n_in_buf = rbuf_int(&rc); int n_in_buf = rbuf_int(rb);
result->u.l.n_bytes_in_buffer = 0; result->u.l.n_bytes_in_buffer = 0;
result->u.l.seqinsert = 0; result->u.l.seqinsert = 0;
//printf("%s:%d r PMA= %p\n", __FILE__, __LINE__, result->u.l.buffer); //printf("%s:%d r PMA= %p\n", __FILE__, __LINE__, result->u.l.buffer);
toku_mempool_init(&result->u.l.buffer_mempool, rc.buf, uncompressed_size + uncompressed_magic_len); toku_mempool_init(&result->u.l.buffer_mempool, rb->buf, rb->size);
u_int32_t actual_sum = 0; u_int32_t actual_sum = 0;
u_int32_t start_of_data = rc.ndone; u_int32_t start_of_data = rb->ndone;
OMTVALUE *MALLOC_N(n_in_buf, array); OMTVALUE *MALLOC_N(n_in_buf, array);
for (i=0; i<n_in_buf; i++) { for (i=0; i<n_in_buf; i++) {
LEAFENTRY le = (LEAFENTRY)(&rc.buf[rc.ndone]); LEAFENTRY le = (LEAFENTRY)(&rb->buf[rb->ndone]);
u_int32_t disksize = leafentry_disksize(le); u_int32_t disksize = leafentry_disksize(le);
rc.ndone += disksize; rb->ndone += disksize;
assert(rc.ndone<=rc.size); assert(rb->ndone<=rb->size);
array[i]=(OMTVALUE)le; array[i]=(OMTVALUE)le;
actual_sum += x1764_memory(le, disksize); actual_sum += x1764_memory(le, disksize);
} }
toku_trace("fill array"); toku_trace("fill array");
u_int32_t end_of_data = rc.ndone; u_int32_t end_of_data = rb->ndone;
result->u.l.n_bytes_in_buffer += end_of_data-start_of_data + n_in_buf*OMT_ITEM_OVERHEAD; result->u.l.n_bytes_in_buffer += end_of_data-start_of_data + n_in_buf*OMT_ITEM_OVERHEAD;
actual_sum *= result->rand4fingerprint; actual_sum *= result->rand4fingerprint;
r = toku_omt_create_steal_sorted_array(&result->u.l.buffer, &array, n_in_buf, n_in_buf); r = toku_omt_create_steal_sorted_array(&result->u.l.buffer, &array, n_in_buf, n_in_buf);
toku_trace("create omt"); toku_trace("create omt");
if (r!=0) { if (r!=0) {
toku_free(array); toku_free(array);
if (0) { died_21: toku_omt_destroy(&result->u.l.buffer); } r = toku_db_badformat();
return toku_db_badformat(); if (0) { died_1: toku_omt_destroy(&result->u.l.buffer); }
return r;
} }
assert(array==NULL); assert(array==NULL);
r = toku_leaflock_borrow(&result->u.l.leaflock);
if (r!=0) goto died_21;
result->u.l.buffer_mempool.frag_size = start_of_data; result->u.l.buffer_mempool.frag_size = start_of_data;
result->u.l.buffer_mempool.free_offset = end_of_data; result->u.l.buffer_mempool.free_offset = end_of_data;
if (r!=0) goto died_21; if (r!=0) goto died_1;
if (actual_sum!=result->local_fingerprint) { if (actual_sum!=result->local_fingerprint) {
//fprintf(stderr, "%s:%d Corrupted checksum stored=%08x rand=%08x actual=%08x height=%d n_keys=%d\n", __FILE__, __LINE__, result->rand4fingerprint, result->local_fingerprint, actual_sum, result->height, n_in_buf); //fprintf(stderr, "%s:%d Corrupted checksum stored=%08x rand=%08x actual=%08x height=%d n_keys=%d\n", __FILE__, __LINE__, result->rand4fingerprint, result->local_fingerprint, actual_sum, result->height, n_in_buf);
return toku_db_badformat(); r = toku_db_badformat();
// goto died_21; goto died_1;
} else { } else {
//fprintf(stderr, "%s:%d Good checksum=%08x height=%d\n", __FILE__, __LINE__, actual_sum, result->height); //fprintf(stderr, "%s:%d Good checksum=%08x height=%d\n", __FILE__, __LINE__, actual_sum, result->height);
} }
//toku_verify_counts(result); //toku_verify_counts(result);
(void)rbuf_int(rb); //Ignore the crc (already verified).
if (rb->ndone != rb->size) { //Verify we read exactly the entire block.
r = toku_db_badformat(); goto died_1;
}
r = toku_leaflock_borrow(&result->u.l.leaflock);
if (r!=0) goto died_1;
rb->buf = NULL; //Buffer was used for node's mempool.
return 0;
}
static int
deserialize_brtnode_from_rbuf (BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h, struct rbuf *rb) {
TAGMALLOC(BRTNODE, result);
int r;
if (result==0) {
r=errno;
if (0) { died0: toku_free(result); }
return r;
} }
result->desc = &h->descriptor;
result->ever_been_written = 1;
//printf("Deserializing %lld datasize=%d\n", off, datasize);
bytevec magic;
rbuf_literal_bytes(rb, &magic, 8);
result->layout_version = rbuf_int(rb);
assert(result->layout_version == BRT_LAYOUT_VERSION);
result->disk_lsn.lsn = rbuf_ulonglong(rb);
{ {
unsigned int n_read_so_far = rc.ndone; //Restrict scope for now since we do not support upgrades.
if (n_read_so_far+4!=rc.size) { struct descriptor desc;
r = toku_db_badformat(); goto died_21; //desc.dbt.data is TEMPORARY. Will be unusable when the rc buffer is freed.
deserialize_descriptor_from_rbuf(rb, &desc, TRUE);
assert(desc.version == result->desc->version); //We do not yet support upgrading the dbts.
}
result->nodesize = rbuf_int(rb);
result->log_lsn = result->disk_lsn;
result->thisnodename = blocknum;
result->flags = rbuf_int(rb);
result->height = rbuf_int(rb);
result->rand4fingerprint = rbuf_int(rb);
result->local_fingerprint = rbuf_int(rb);
// printf("%s:%d read %08x\n", __FILE__, __LINE__, result->local_fingerprint);
result->dirty = 0;
result->fullhash = fullhash;
//printf("height==%d\n", result->height);
if (result->height>0)
r = deserialize_brtnode_nonleaf_from_rbuf(result, magic, rb);
else
r = deserialize_brtnode_leaf_from_rbuf(result, magic, rb);
if (r!=0) goto died0;
//printf("%s:%d Ok got %lld n_children=%d\n", __FILE__, __LINE__, result->thisnodename, result->n_children);
if (result->height>0) {
// For height==0 we used the buf inside the OMT
toku_free(rb->buf);
rb->buf = NULL;
} }
toku_trace("deserial done");
*brtnode = result;
//toku_verify_counts(result);
return 0;
}
static int
verify_decompressed_brtnode_checksum (struct rbuf *rb) {
int r = 0;
if (rb->size >= 4) {
uint32_t verify_size = rb->size - 4; //Not counting the checksum
toku_trace("x1764 start"); toku_trace("x1764 start");
uint32_t crc = x1764_memory(rc.buf, n_read_so_far); uint32_t crc = x1764_memory(rb->buf, verify_size);
toku_trace("x1764"); toku_trace("x1764");
uint32_t storedcrc = rbuf_int(&rc);
uint32_t *crcp = (uint32_t*)(((uint8_t*)rb->buf) + verify_size);
uint32_t storedcrc = toku_dtoh32(*crcp);
if (crc!=storedcrc) { if (crc!=storedcrc) {
printf("Bad CRC\n"); printf("Bad CRC\n");
printf("%s:%d crc=%08x stored=%08x\n", __FILE__, __LINE__, crc, storedcrc); printf("%s:%d crc=%08x stored=%08x\n", __FILE__, __LINE__, crc, storedcrc);
assert(0);//this is wrong!!!
r = toku_db_badformat(); r = toku_db_badformat();
goto died_21;
} }
} }
//printf("%s:%d Ok got %lld n_children=%d\n", __FILE__, __LINE__, result->thisnodename, result->n_children); else r = toku_db_badformat();
if (result->height>0) { return r;
// For height==0 we used the buf inside the OMT }
toku_free(rc.buf);
static int
decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum) {
int r;
int i;
// get the number of compressed sub blocks
int n_sub_blocks;
int compression_header_offset;
{
n_sub_blocks = toku_dtoh32(*(u_int32_t*)(&raw_block[uncompressed_magic_len]));
compression_header_offset = uncompressed_magic_len + 4;
} }
toku_trace("deserial done"); assert(0 < n_sub_blocks);
*brtnode = result;
//toku_verify_counts(result); // verify the sizes of the compressed sub blocks
return 0; if (0 && n_sub_blocks != 1) printf("%s:%d %d\n", __FUNCTION__, __LINE__, n_sub_blocks);
struct sub_block_sizes sub_block_sizes[n_sub_blocks];
for (i=0; i<n_sub_blocks; i++) {
u_int32_t compressed_size = toku_dtoh32(*(u_int32_t*)(&raw_block[compression_header_offset+8*i]));
if (compressed_size<=0 || compressed_size>(1<<30)) { r = toku_db_badformat(); return r; }
u_int32_t uncompressed_size = toku_dtoh32(*(u_int32_t*)(&raw_block[compression_header_offset+8*i+4]));
if (0) printf("Block %" PRId64 " Compressed size = %u, uncompressed size=%u\n", blocknum.b, compressed_size, uncompressed_size);
if (uncompressed_size<=0 || uncompressed_size>(1<<30)) { r = toku_db_badformat(); return r; }
sub_block_sizes[i].compressed_size = compressed_size;
sub_block_sizes[i].uncompressed_size = uncompressed_size;
}
unsigned char *compressed_data = raw_block + uncompressed_magic_len + get_compression_header_size(BRT_LAYOUT_VERSION, n_sub_blocks);
size_t uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block_sizes);
rb->size= uncompressed_magic_len + uncompressed_size;
assert(rb->size>0);
rb->buf=toku_xmalloc(rb->size);
// construct the uncompressed block from the header and compressed sub blocks
memcpy(rb->buf, raw_block, uncompressed_magic_len);
// decompress the sub blocks
unsigned char *uncompressed_data = rb->buf+uncompressed_magic_len;
struct decompress_work decompress_work[n_sub_blocks];
for (i=0; i<n_sub_blocks; i++) {
init_decompress_work(&decompress_work[i], compressed_data, sub_block_sizes[i].compressed_size, uncompressed_data, sub_block_sizes[i].uncompressed_size);
if (i>0) {
#if DO_DECOMPRESS_WORKER
start_decompress_work(&decompress_work[i]);
#else
do_decompress_work(&decompress_work[i]);
#endif
}
uncompressed_data += sub_block_sizes[i].uncompressed_size;
compressed_data += sub_block_sizes[i].compressed_size;
}
do_decompress_work(&decompress_work[0]);
#if DO_DECOMPRESS_WORKER
for (i=1; i<n_sub_blocks; i++)
wait_decompress_work(&decompress_work[i]);
#endif
toku_trace("decompress done");
if (0) printf("First 4 bytes of uncompressed data are %02x%02x%02x%02x\n",
rb->buf[uncompressed_magic_len], rb->buf[uncompressed_magic_len+1],
rb->buf[uncompressed_magic_len+2], rb->buf[uncompressed_magic_len+3]);
rb->ndone=0;
r = verify_decompressed_brtnode_checksum(rb);
return r;
} }
static int
decompress_brtnode_from_raw_block_into_rbuf_versioned(u_int32_t version, u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum) {
int r;
switch (version) {
case BRT_LAYOUT_VERSION_10:
r = decompress_brtnode_from_raw_block_into_rbuf_10(raw_block, rb, blocknum);
break;
case BRT_LAYOUT_VERSION:
r = decompress_brtnode_from_raw_block_into_rbuf(raw_block, rb, blocknum);
break;
default:
assert(FALSE);
}
return r;
}
static int
deserialize_brtnode_from_rbuf_versioned (u_int32_t version, BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h, struct rbuf *rb) {
int r;
switch (version) {
case BRT_LAYOUT_VERSION_10:
r = deserialize_brtnode_from_rbuf_10(blocknum, fullhash, brtnode, h, rb);
break;
case BRT_LAYOUT_VERSION:
r = deserialize_brtnode_from_rbuf(blocknum, fullhash, brtnode, h, rb);
break;
default:
assert(FALSE);
}
return r;
}
// Read brt node from file into struct. Perform version upgrade if necessary.
int
toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h) {
int r;
struct rbuf rb = {.buf = NULL, .size = 0, .ndone = 0};
if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b);
if (h->panic) return h->panic;
#if DO_TOKU_TRACE
if (toku_trace_fd == -1)
toku_trace_fd = open("/dev/null", O_WRONLY);
toku_trace("deserial start");
#endif
// get the file offset and block size for the block
DISKOFF offset, size;
toku_translate_blocknum_to_offset_size(h->blocktable, blocknum, &offset, &size);
unsigned char *XMALLOC_N(size, raw_block);
{
// read the (partially compressed) block
ssize_t rlen = pread(fd, raw_block, size, offset);
assert((DISKOFF)rlen == size);
}
// get the layout_version
int layout_version;
{
u_int8_t *magic = raw_block + uncompressed_magic_offset;
if (memcmp(magic, "tokuleaf", 8)!=0 && memcmp(magic, "tokunode", 8)!=0) {
r = toku_db_badformat();
goto cleanup;
}
u_int8_t *version = raw_block + uncompressed_version_offset;
layout_version = toku_dtoh32(*(uint32_t*)version);
if (layout_version < BRT_LAYOUT_MIN_SUPPORTED_VERSION || layout_version > BRT_LAYOUT_VERSION) {
r = toku_db_badformat();
goto cleanup;
}
}
//TODO: #1924 verify some form of checksum during 'decompression'???
r = decompress_brtnode_from_raw_block_into_rbuf_versioned(layout_version, raw_block, &rb, blocknum);
if (r!=0) goto cleanup;
//TODO: #1924 verify some form of checksum during 'deserialization'???
r = deserialize_brtnode_from_rbuf_versioned(layout_version, blocknum, fullhash, brtnode, h, &rb);
cleanup:
if (rb.buf) toku_free(rb.buf);
if (raw_block) toku_free(raw_block);
return r;
}
// ################
struct sum_info { struct sum_info {
unsigned int dsum; unsigned int dsum;
unsigned int msum; unsigned int msum;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment