Commit 429ef45c authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

new block format with a header checksum and sub block checksums. merge -c...

new block format with a header checksum and sub block checksums.  merge -c 18916:head from 2351.ptq to main refs[t:2351]

git-svn-id: file:///svn/toku/tokudb@18961 c7de825b-a66e-492c-adef-691d508d4ae1
parent 40938e66
......@@ -542,6 +542,29 @@ static void *decompress_worker_10(void *arg) {
#endif
static int
verify_decompressed_brtnode_checksum (struct rbuf *rb) {
int r = 0;
if (rb->size >= 4) {
uint32_t verify_size = rb->size - 4; //Not counting the checksum
toku_trace("x1764");
uint32_t crc = x1764_memory(rb->buf, verify_size);
toku_trace("x1764 done");
uint32_t *crcp = (uint32_t*)(((uint8_t*)rb->buf) + verify_size);
uint32_t storedcrc = toku_dtoh32(*crcp);
if (crc!=storedcrc) {
printf("Bad CRC\n");
printf("%s:%d crc=%08x stored=%08x\n", __FILE__, __LINE__, crc, storedcrc);
r = toku_db_badformat();
}
}
else r = toku_db_badformat();
return r;
}
static int
decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum) {
int r;
......
......@@ -164,18 +164,21 @@ toku_full_pwrite_extend (int fd, const void *buf, size_t count, toku_off_t offse
toku_os_full_pwrite(fd, buf, count, offset);
}
// Don't include the compression header
// Don't include the sub_block header
// Overhead calculated in same order fields are written to wbuf
static const int brtnode_header_overhead = (8+ // magic "tokunode" or "tokuleaf"
enum {
node_header_overhead = (8+ // magic "tokunode" or "tokuleaf"
4+ // layout_version
4+ // layout_version_original
0+ // descriptor (variable, not counted here)
4), // layout_version_original
extended_node_header_overhead = (0+ // descriptor (variable, not counted here)
4+ // nodesize
4+ // flags
4+ // height
4+ // random for fingerprint
4+ // localfingerprint
4); // crc32 at the end
4), // localfingerprint
};
static int
addupsize (OMTVALUE lev, u_int32_t UU(idx), void *vp) {
......@@ -187,7 +190,7 @@ addupsize (OMTVALUE lev, u_int32_t UU(idx), void *vp) {
static unsigned int
toku_serialize_brtnode_size_slow (BRTNODE node) {
unsigned int size=brtnode_header_overhead;
unsigned int size = node_header_overhead + extended_node_header_overhead;
size += toku_serialize_descriptor_size(node->desc);
if (node->height>0) {
unsigned int hsize=0;
......@@ -228,7 +231,7 @@ toku_serialize_brtnode_size_slow (BRTNODE node) {
// This is the size of the uncompressed data, not including the compression headers
unsigned int
toku_serialize_brtnode_size (BRTNODE node) {
unsigned int result =brtnode_header_overhead;
unsigned int result = node_header_overhead + extended_node_header_overhead;
assert(sizeof(toku_off_t)==8);
result += toku_serialize_descriptor_size(node->desc);
if (node->height>0) {
......@@ -253,20 +256,12 @@ toku_serialize_brtnode_size (BRTNODE node) {
return result;
}
enum { uncompressed_magic_len = (8 // tokuleaf or tokunode
+4 // layout version
+4 // layout version original
)
};
// uncompressed header offsets
enum {
uncompressed_magic_offset = 0,
uncompressed_version_offset = 8,
};
struct sub_block {
void *uncompressed_ptr;
u_int32_t uncompressed_size;
......@@ -281,7 +276,7 @@ struct sub_block {
struct stored_sub_block {
u_int32_t uncompressed_size;
u_int32_t compressed_size;
// u_int32_t xsum;
u_int32_t xsum;
};
static void
......@@ -395,7 +390,7 @@ compress_sub_block(struct sub_block *sub_block) {
sub_block->compressed_size = real_compressed_len; // replace the compressed size estimate with the real size
// checksum it
// sub_block->xsum = x1764_memory(sub_block->compressed_ptr, sub_block->compressed_size);
sub_block->xsum = x1764_memory(sub_block->compressed_ptr, sub_block->compressed_size);
}
static void *
......@@ -467,25 +462,24 @@ compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *un
static void
serialize_node_header(BRTNODE node, struct wbuf *wbuf) {
wbuf_literal_bytes(wbuf, "toku", 4);
if (node->height==0)
wbuf_literal_bytes(wbuf, "leaf", 4);
if (node->height == 0)
wbuf_nocrc_literal_bytes(wbuf, "tokuleaf", 8);
else
wbuf_literal_bytes(wbuf, "node", 4);
wbuf_nocrc_literal_bytes(wbuf, "tokunode", 8);
assert(node->layout_version == BRT_LAYOUT_VERSION);
wbuf_int(wbuf, node->layout_version);
wbuf_int(wbuf, node->layout_version_original);
wbuf_nocrc_int(wbuf, node->layout_version);
wbuf_nocrc_int(wbuf, node->layout_version_original);
// serialize the descriptor
toku_serialize_descriptor_contents_to_wbuf(wbuf, node->desc);
//printf("%s:%d %lld.calculated_size=%d\n", __FILE__, __LINE__, off, calculated_size);
wbuf_uint(wbuf, node->nodesize);
wbuf_uint(wbuf, node->flags);
wbuf_int(wbuf, node->height);
wbuf_nocrc_uint(wbuf, node->nodesize);
wbuf_nocrc_uint(wbuf, node->flags);
wbuf_nocrc_int(wbuf, node->height);
//printf("%s:%d %lld rand=%08x sum=%08x height=%d\n", __FILE__, __LINE__, node->thisnodename, node->rand4fingerprint, node->subtree_fingerprint, node->height);
wbuf_uint(wbuf, node->rand4fingerprint);
wbuf_uint(wbuf, node->local_fingerprint);
wbuf_nocrc_uint(wbuf, node->rand4fingerprint);
wbuf_nocrc_uint(wbuf, node->local_fingerprint);
//printf("%s:%d wrote %08x for node %lld\n", __FILE__, __LINE__, node->local_fingerprint, (long long)node->thisnodename);
//printf("%s:%d local_fingerprint=%8x\n", __FILE__, __LINE__, node->local_fingerprint);
//printf("%s:%d w.ndone=%d n_children=%d\n", __FILE__, __LINE__, w.ndone, node->n_children);
......@@ -502,29 +496,29 @@ serialize_nonleaf(BRTNODE node, int n_sub_blocks, struct sub_block sub_block[],
for (int i = 0; i < node->u.n.n_children; i++) {
subtree_fingerprint += BNC_SUBTREE_FINGERPRINT(node, i);
}
wbuf_uint(wbuf, subtree_fingerprint);
wbuf_nocrc_uint(wbuf, subtree_fingerprint);
}
wbuf_int(wbuf, node->u.n.n_children);
wbuf_nocrc_int(wbuf, node->u.n.n_children);
for (int i = 0; i < node->u.n.n_children; i++) {
wbuf_uint(wbuf, BNC_SUBTREE_FINGERPRINT(node, i));
wbuf_nocrc_uint(wbuf, BNC_SUBTREE_FINGERPRINT(node, i));
struct subtree_estimates *se = &(BNC_SUBTREE_ESTIMATES(node, i));
wbuf_ulonglong(wbuf, se->nkeys);
wbuf_ulonglong(wbuf, se->ndata);
wbuf_ulonglong(wbuf, se->dsize);
wbuf_char (wbuf, (char)se->exact);
wbuf_nocrc_ulonglong(wbuf, se->nkeys);
wbuf_nocrc_ulonglong(wbuf, se->ndata);
wbuf_nocrc_ulonglong(wbuf, se->dsize);
wbuf_nocrc_char (wbuf, (char)se->exact);
}
//printf("%s:%d w.ndone=%d\n", __FILE__, __LINE__, w.ndone);
for (int i = 0; i < node->u.n.n_children-1; i++) {
if (node->flags & TOKU_DB_DUPSORT) {
wbuf_bytes(wbuf, kv_pair_key(node->u.n.childkeys[i]), kv_pair_keylen(node->u.n.childkeys[i]));
wbuf_bytes(wbuf, kv_pair_val(node->u.n.childkeys[i]), kv_pair_vallen(node->u.n.childkeys[i]));
wbuf_nocrc_bytes(wbuf, kv_pair_key(node->u.n.childkeys[i]), kv_pair_keylen(node->u.n.childkeys[i]));
wbuf_nocrc_bytes(wbuf, kv_pair_val(node->u.n.childkeys[i]), kv_pair_vallen(node->u.n.childkeys[i]));
} else {
wbuf_bytes(wbuf, kv_pair_key(node->u.n.childkeys[i]), toku_brtnode_pivot_key_len(node, node->u.n.childkeys[i]));
wbuf_nocrc_bytes(wbuf, kv_pair_key(node->u.n.childkeys[i]), toku_brtnode_pivot_key_len(node, node->u.n.childkeys[i]));
}
//printf("%s:%d w.ndone=%d (childkeylen[%d]=%d\n", __FILE__, __LINE__, w.ndone, i, node->childkeylens[i]);
}
for (int i = 0; i < node->u.n.n_children; i++) {
wbuf_BLOCKNUM(wbuf, BNC_BLOCKNUM(node,i));
wbuf_nocrc_BLOCKNUM(wbuf, BNC_BLOCKNUM(node,i));
//printf("%s:%d w.ndone=%d\n", __FILE__, __LINE__, w.ndone);
}
......@@ -556,14 +550,14 @@ serialize_nonleaf(BRTNODE node, int n_sub_blocks, struct sub_block sub_block[],
for (int i = 0; i < n_buffers; i++) {
//printf("%s:%d p%d=%p n_entries=%d\n", __FILE__, __LINE__, i, node->mdicts[i], mdict_n_entries(node->mdicts[i]));
// assert(child_buffer_map[i].offset == wbuf_get_woffset(wbuf));
wbuf_int(wbuf, toku_fifo_n_entries(BNC_BUFFER(node,i)));
wbuf_nocrc_int(wbuf, toku_fifo_n_entries(BNC_BUFFER(node,i)));
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xids,
{
assert(type>=0 && type<256);
wbuf_char(wbuf, (unsigned char)type);
wbuf_xids(wbuf, xids);
wbuf_bytes(wbuf, key, keylen);
wbuf_bytes(wbuf, data, datalen);
wbuf_nocrc_char(wbuf, (unsigned char)type);
wbuf_nocrc_xids(wbuf, xids);
wbuf_nocrc_bytes(wbuf, key, keylen);
wbuf_nocrc_bytes(wbuf, data, datalen);
check_local_fingerprint+=node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xids, key, keylen, data, datalen);
});
}
......@@ -577,16 +571,16 @@ static int
wbufwriteleafentry (OMTVALUE lev, u_int32_t UU(idx), void *v) {
LEAFENTRY le=lev;
struct wbuf *thisw=v;
wbuf_LEAFENTRY(thisw, le);
wbuf_nocrc_LEAFENTRY(thisw, le);
return 0;
}
static void
serialize_leaf(BRTNODE node, int n_sub_blocks, struct sub_block sub_block[], struct wbuf *wbuf) {
// serialize the leaf stats
wbuf_ulonglong(wbuf, node->u.l.leaf_stats.nkeys);
wbuf_ulonglong(wbuf, node->u.l.leaf_stats.ndata);
wbuf_ulonglong(wbuf, node->u.l.leaf_stats.dsize);
wbuf_nocrc_ulonglong(wbuf, node->u.l.leaf_stats.nkeys);
wbuf_nocrc_ulonglong(wbuf, node->u.l.leaf_stats.ndata);
wbuf_nocrc_ulonglong(wbuf, node->u.l.leaf_stats.dsize);
#if 0
// RFP partition the leaf elements. for now, 1 partition
......@@ -615,7 +609,7 @@ serialize_leaf(BRTNODE node, int n_sub_blocks, struct sub_block sub_block[], str
#endif
// serialize the leaf entries
wbuf_uint(wbuf, toku_omt_size(node->u.l.buffer));
wbuf_nocrc_uint(wbuf, toku_omt_size(node->u.l.buffer));
toku_omt_iterate(node->u.l.buffer, wbufwriteleafentry, wbuf);
}
......@@ -631,29 +625,19 @@ serialize_node(BRTNODE node, char *buf, size_t calculated_size, int n_sub_blocks
else
serialize_leaf(node, n_sub_blocks, sub_block, &wb);
assert(wb.ndone <= wb.size);
#ifdef CRC_ATEND
wbuf_int(&w, crc32(toku_null_crc, wb.buf, wb.ndone));
#endif
#ifdef CRC_INCR
{
u_int32_t checksum = x1764_finish(&wb.checksum);
wbuf_uint(&wb, checksum);
}
#endif
assert(wb.ndone == wb.size);
assert(calculated_size==wb.ndone);
}
int
toku_serialize_brtnode_to_memory (BRTNODE node, int n_workitems __attribute__((__unused__)), int n_threads __attribute__((__unused__)),
/*out*/ size_t *n_bytes_to_write,
/*out*/ char **bytes_to_write) {
toku_serialize_brtnode_to_memory (BRTNODE node, int UU(n_workitems), int UU(n_threads), /*out*/ size_t *n_bytes_to_write, /*out*/ char **bytes_to_write) {
// get the size of the serialized node
unsigned int calculated_size = toku_serialize_brtnode_size(node);
// choose sub block parameters
int n_sub_blocks, sub_block_size;
size_t data_size = calculated_size - uncompressed_magic_len;
size_t data_size = calculated_size - node_header_overhead;
choose_sub_block_size(data_size, max_sub_blocks, &sub_block_size, &n_sub_blocks);
assert(0 < n_sub_blocks && n_sub_blocks <= max_sub_blocks);
assert(sub_block_size > 0);
......@@ -675,36 +659,39 @@ toku_serialize_brtnode_to_memory (BRTNODE node, int n_workitems __attribute__((_
// allocate space for the compressed buf
size_t compressed_len = get_sum_compressed_size_bound(n_sub_blocks, sub_block);
size_t compression_header_len = sub_block_header_size(n_sub_blocks);
char *MALLOC_N(compressed_len+uncompressed_magic_len+compression_header_len, compressed_buf);
size_t sub_block_header_len = sub_block_header_size(n_sub_blocks);
size_t header_len = node_header_overhead + sub_block_header_len + sizeof (uint32_t); // node + sub_block + checksum
char *MALLOC_N(header_len + compressed_len, compressed_buf);
// copy the header
memcpy(compressed_buf, buf, uncompressed_magic_len);
memcpy(compressed_buf, buf, node_header_overhead);
if (0) printf("First 4 bytes before compressing data are %02x%02x%02x%02x\n",
buf[uncompressed_magic_len], buf[uncompressed_magic_len+1],
buf[uncompressed_magic_len+2], buf[uncompressed_magic_len+3]);
buf[node_header_overhead], buf[node_header_overhead+1],
buf[node_header_overhead+2], buf[node_header_overhead+3]);
// compress all of the sub blocks
char *uncompressed_ptr = buf + uncompressed_magic_len;
char *compressed_base_ptr = compressed_buf + uncompressed_magic_len + compression_header_len;
char *compressed_ptr = compressed_base_ptr;
char *uncompressed_ptr = buf + node_header_overhead;
char *compressed_ptr = compressed_buf + header_len;
compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr);
//if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %"PRIu64"\n", blocknum.b, calculated_size-uncompressed_magic_len, (uint64_t) compressed_len);
//if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %"PRIu64"\n", blocknum.b, calculated_size-node_header_overhead, (uint64_t) compressed_len);
// serialize the sub block header
uint32_t *compressed_header_ptr = (uint32_t *)(compressed_buf + uncompressed_magic_len);
*compressed_header_ptr++ = toku_htod32(n_sub_blocks);
uint32_t *ptr = (uint32_t *)(compressed_buf + node_header_overhead);
*ptr++ = toku_htod32(n_sub_blocks);
for (int i=0; i<n_sub_blocks; i++) {
compressed_header_ptr[0] = toku_htod32(sub_block[i].compressed_size);
compressed_header_ptr[1] = toku_htod32(sub_block[i].uncompressed_size);
// RFP xsum
compressed_header_ptr += 2;
ptr[0] = toku_htod32(sub_block[i].compressed_size);
ptr[1] = toku_htod32(sub_block[i].uncompressed_size);
ptr[2] = toku_htod32(sub_block[i].xsum);
ptr += 3;
}
// RFP compute the header checksum and serialize it
// compute the header checksum and serialize it
uint32_t header_length = (void *)ptr - (void *) compressed_buf;
uint32_t xsum = x1764_memory(compressed_buf, header_length);
*ptr = toku_htod32(xsum);
*n_bytes_to_write = uncompressed_magic_len + compression_header_len + compressed_len;
*n_bytes_to_write = header_len + compressed_len;
*bytes_to_write = compressed_buf;
toku_free(buf);
......@@ -847,7 +834,7 @@ deserialize_brtnode_nonleaf_from_rbuf (BRTNODE result, bytevec magic, struct rbu
return toku_db_badformat();
}
}
(void)rbuf_int(rb); //Ignore the crc (already verified).
// RFP REMOVE (void)rbuf_int(rb); //Ignore the crc (already verified).
if (rb->ndone != rb->size) { //Verify we read exactly the entire block.
r = toku_db_badformat(); goto died_1;
}
......@@ -914,7 +901,6 @@ deserialize_brtnode_leaf_from_rbuf (BRTNODE result, bytevec magic, struct rbuf *
//toku_verify_counts(result);
(void)rbuf_int(rb); //Ignore the crc (already verified).
if (rb->ndone != rb->size) { //Verify we read exactly the entire block.
r = toku_db_badformat(); goto died_1;
}
......@@ -983,29 +969,6 @@ deserialize_brtnode_from_rbuf (BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *b
return 0;
}
static int
verify_decompressed_brtnode_checksum (struct rbuf *rb) {
int r = 0;
if (rb->size >= 4) {
uint32_t verify_size = rb->size - 4; //Not counting the checksum
toku_trace("x1764");
uint32_t crc = x1764_memory(rb->buf, verify_size);
toku_trace("x1764 done");
uint32_t *crcp = (uint32_t*)(((uint8_t*)rb->buf) + verify_size);
uint32_t storedcrc = toku_dtoh32(*crcp);
if (crc!=storedcrc) {
printf("Bad CRC\n");
printf("%s:%d crc=%08x stored=%08x\n", __FILE__, __LINE__, crc, storedcrc);
r = toku_db_badformat();
}
}
else r = toku_db_badformat();
return r;
}
#include "workset.h"
struct decompress_work {
......@@ -1036,9 +999,8 @@ decompress_work_init(struct decompress_work *dw,
static int
decompress_sub_block(void *compress_ptr, u_int32_t compress_size, void *uncompress_ptr, u_int32_t uncompress_size, u_int32_t expected_xsum) {
// verify checksum
// u_int32_t xsum = x1764_memory(compress_ptr, compress_size);
// assert(xsum == expected_sum);
expected_xsum = expected_xsum;
u_int32_t xsum = x1764_memory(compress_ptr, compress_size);
assert(xsum == expected_xsum);
// decompress
uLongf destlen = uncompress_size;
......@@ -1109,52 +1071,58 @@ decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb
// get the number of compressed sub blocks
int n_sub_blocks;
int compression_header_offset;
{
n_sub_blocks = toku_dtoh32(*(u_int32_t*)(&raw_block[uncompressed_magic_len]));
compression_header_offset = uncompressed_magic_len + 4;
}
assert(0 < n_sub_blocks);
n_sub_blocks = toku_dtoh32(*(u_int32_t*)(&raw_block[node_header_overhead]));
// verify the sizes of the compressed sub blocks
if (0 && n_sub_blocks != 1) printf("%s:%d %d\n", __FUNCTION__, __LINE__, n_sub_blocks);
// verify the number of sub blocks
assert(0 <= n_sub_blocks && n_sub_blocks <= max_sub_blocks);
// deserialize the sub block header
struct sub_block sub_block[n_sub_blocks];
for (int i=0; i<n_sub_blocks; i++) {
u_int32_t compressed_size = toku_dtoh32(*(u_int32_t*)(&raw_block[compression_header_offset+8*i]));
u_int32_t *sub_block_header = (u_int32_t *) &raw_block[node_header_overhead+4];
for (int i = 0; i < n_sub_blocks; i++) {
sub_block_init(&sub_block[i]);
sub_block[i].compressed_size = toku_dtoh32(sub_block_header[0]);
sub_block[i].uncompressed_size = toku_dtoh32(sub_block_header[1]);
sub_block[i].xsum = toku_dtoh32(sub_block_header[2]);
sub_block_header += 3;
}
// verify sub block sizes
for (int i = 0; i < n_sub_blocks; i++) {
u_int32_t compressed_size = sub_block[i].compressed_size;
if (compressed_size<=0 || compressed_size>(1<<30)) { r = toku_db_badformat(); return r; }
u_int32_t uncompressed_size = toku_dtoh32(*(u_int32_t*)(&raw_block[compression_header_offset+8*i+4]));
u_int32_t uncompressed_size = sub_block[i].uncompressed_size;
if (0) printf("Block %" PRId64 " Compressed size = %u, uncompressed size=%u\n", blocknum.b, compressed_size, uncompressed_size);
if (uncompressed_size<=0 || uncompressed_size>(1<<30)) { r = toku_db_badformat(); return r; }
sub_block[i].compressed_size = compressed_size;
sub_block[i].uncompressed_size = uncompressed_size;
}
unsigned char *compressed_data = raw_block + uncompressed_magic_len + sub_block_header_size(n_sub_blocks);
// sum up the uncompressed size of the sub blocks
size_t uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block);
rb->size= uncompressed_magic_len + uncompressed_size;
assert(rb->size>0);
rb->buf=toku_xmalloc(rb->size);
// allocate the uncompressed buffer
size_t size = node_header_overhead + uncompressed_size;
unsigned char *buf = toku_xmalloc(size);
assert(buf);
rbuf_init(rb, buf, size);
// copy the uncompressed node header to the uncompressed buffer
memcpy(rb->buf, raw_block, node_header_overhead);
// construct the uncompressed block from the header and compressed sub blocks
memcpy(rb->buf, raw_block, uncompressed_magic_len);
// point at the start of the compressed data (past the node header, the sub block header, and the header checksum)
unsigned char *compressed_data = raw_block + node_header_overhead + sub_block_header_size(n_sub_blocks) + sizeof (u_int32_t);
unsigned char *uncompressed_data = rb->buf+uncompressed_magic_len;
// point at the start of the uncompressed data
unsigned char *uncompressed_data = rb->buf + node_header_overhead;
// decompress all the compressed sub blocks into the uncompressed buffer
decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data);
toku_trace("decompress done");
if (0) printf("First 4 bytes of uncompressed data are %02x%02x%02x%02x\n",
rb->buf[uncompressed_magic_len], rb->buf[uncompressed_magic_len+1],
rb->buf[uncompressed_magic_len+2], rb->buf[uncompressed_magic_len+3]);
rb->ndone=0;
r = verify_decompressed_brtnode_checksum(rb);
return r;
return 0;
}
static int
......@@ -1242,11 +1210,17 @@ toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, BR
}
}
//TODO: #1924 verify some form of checksum during 'decompression'???
// verify the header checksum
u_int32_t n_sub_blocks = toku_dtoh32(*(u_int32_t *)(raw_block + node_header_overhead));
u_int32_t header_length = node_header_overhead + sub_block_header_size(n_sub_blocks);
assert(header_length <= size);
u_int32_t xsum = x1764_memory(raw_block, header_length);
u_int32_t stored_xsum = toku_dtoh32(*(u_int32_t *)(raw_block + header_length));
assert(xsum == stored_xsum);
r = decompress_brtnode_from_raw_block_into_rbuf_versioned(layout_version, raw_block, &rb, blocknum);
if (r!=0) goto cleanup;
//TODO: #1924 verify some form of checksum during 'deserialization'???
r = deserialize_brtnode_from_rbuf_versioned(layout_version, blocknum, fullhash, brtnode, h, &rb);
toku_trace("deserial done");
......
......@@ -201,7 +201,7 @@ get_unaligned_uint32(unsigned char *p) {
return *(u_int32_t *)p;
}
#define SUB_BLOCK_XSUM 0
#define SUB_BLOCK_XSUM 1
struct sub_block {
u_int32_t compressed_size;
......@@ -277,8 +277,10 @@ dump_block(int f, BLOCKNUM blocknum, struct brt_header *h) {
unsigned char *vp = toku_malloc(size);
u_int64_t r = pread(f, vp, size, offset);
if (r == (u_int64_t)size)
if (r == (u_int64_t)size) {
printf("%.8s layout_version=%u %u\n", vp, get_unaligned_uint32(vp+8), get_unaligned_uint32(vp+12));
verify_block(vp, size);
}
toku_free(vp);
}
......
......@@ -1172,9 +1172,6 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
putbuf_int64_at(&lbuf->dbuf, lbuf->dsize_p, lbuf->dsize);
putbuf_int32_at(&lbuf->dbuf, lbuf->n_in_buf_p, lbuf->n_in_buf);
u_int32_t checksum = x1764_memory(lbuf->dbuf.buf, lbuf->dbuf.off);
putbuf_int32(&lbuf->dbuf, checksum);
//print_bytestring(lbuf->dbuf.buf, lbuf->dbuf.off, 200);
int n_uncompressed_bytes_at_beginning = (8 // tokuleaf
......@@ -1184,30 +1181,42 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
int n_extra_bytes_for_compression = (+4 // n_sub blocks
+4 // compressed size
+4 // compressed size
+4 // sub block checksum
+4 // header checksum
);
int header_len = n_uncompressed_bytes_at_beginning + n_extra_bytes_for_compression;
int compression_level = 5;
int uncompressed_len = lbuf->dbuf.off - n_uncompressed_bytes_at_beginning;
int bound = compressBound(uncompressed_len);
unsigned char *MALLOC_N(bound + n_uncompressed_bytes_at_beginning + n_extra_bytes_for_compression, compressed_buf);
unsigned char *MALLOC_N(header_len + bound, compressed_buf);
uLongf real_compressed_len = bound;
{
int r = compress2((Bytef*)(compressed_buf + n_uncompressed_bytes_at_beginning + n_extra_bytes_for_compression), &real_compressed_len,
int r = compress2((Bytef*)(compressed_buf + header_len), &real_compressed_len,
(Bytef*)(lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning), uncompressed_len,
compression_level);
assert(r==Z_OK);
}
// checksum the sub block
u_int32_t xsum0 = x1764_memory(compressed_buf + header_len, real_compressed_len);
memcpy(compressed_buf, lbuf->dbuf.buf, n_uncompressed_bytes_at_beginning);
int compressed_len = real_compressed_len;
int n_compressed_blocks = 1;
memcpy(compressed_buf+16, &n_compressed_blocks, 4);
memcpy(compressed_buf+20, &compressed_len, 4);
memcpy(compressed_buf+24, &uncompressed_len, 4);
memcpy(compressed_buf+28, &xsum0, 4);
// compute the header checksum and serialize it
u_int32_t header_xsum = x1764_memory(compressed_buf, header_len - sizeof (u_int32_t));
memcpy(compressed_buf+32, &header_xsum, 4);
//#ifndef CILK_STUB
// ttable_and_write_lock->lock();
//#endif
long long off_of_leaf = out->current_off;
int size = real_compressed_len + n_uncompressed_bytes_at_beginning + n_extra_bytes_for_compression;
int size = real_compressed_len + header_len;
if (0) {
fprintf(stderr, "uncompressed buf size=%d (amount of data compressed)\n", uncompressed_len);
fprintf(stderr, "compressed buf size=%lu, off=%lld\n", real_compressed_len, off_of_leaf);
......
......@@ -13,3 +13,6 @@ void wbuf_LEAFENTRY(struct wbuf *w, LEAFENTRY le) {
wbuf_literal_bytes(w, le, leafentry_disksize(le));
}
void wbuf_nocrc_LEAFENTRY(struct wbuf *w, LEAFENTRY le) {
wbuf_nocrc_literal_bytes(w, le, leafentry_disksize(le));
}
......@@ -88,6 +88,7 @@ u_int32_t toku_le_crc(LEAFENTRY v);
size_t leafentry_memsize (LEAFENTRY le); // the size of a leafentry in memory.
size_t leafentry_disksize (LEAFENTRY le); // this is the same as logsizeof_LEAFENTRY. The size of a leafentry on disk.
void wbuf_LEAFENTRY(struct wbuf *w, LEAFENTRY le);
void wbuf_nocrc_LEAFENTRY(struct wbuf *w, LEAFENTRY le);
int print_leafentry (FILE *outf, LEAFENTRY v); // Print a leafentry out in human-readable form.
int le_is_provdel(LEAFENTRY le); // Return true if it is a provisional delete.
......
......@@ -18,6 +18,12 @@ struct rbuf {
unsigned int ndone;
};
static inline void rbuf_init(struct rbuf *r, unsigned char *buf, unsigned int size) {
r->buf = buf;
r->size = size;
r->ndone = 0;
}
static inline unsigned int rbuf_char (struct rbuf *r) {
assert(r->ndone<r->size);
return r->buf[r->ndone++];
......
......@@ -21,7 +21,7 @@ struct wbuf {
unsigned char *buf;
unsigned int size;
unsigned int ndone;
struct x1764 checksum; // The checksumx state
struct x1764 checksum; // The checksum state
};
static inline void wbuf_nocrc_init (struct wbuf *w, void *buf, DISKOFF size) {
......@@ -115,7 +115,6 @@ static void wbuf_nocrc_bytes (struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes
wbuf_nocrc_literal_bytes(w, bytes_bv, nbytes);
}
static void wbuf_bytes (struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes) {
wbuf_uint(w, nbytes);
wbuf_literal_bytes(w, bytes_bv, nbytes);
......@@ -126,7 +125,6 @@ static void wbuf_nocrc_ulonglong (struct wbuf *w, u_int64_t ull) {
wbuf_nocrc_uint(w, (u_int32_t)(ull&0xFFFFFFFF));
}
static void wbuf_ulonglong (struct wbuf *w, u_int64_t ull) {
wbuf_uint(w, (u_int32_t)(ull>>32));
wbuf_uint(w, (u_int32_t)(ull&0xFFFFFFFF));
......@@ -164,10 +162,14 @@ static inline void wbuf_u_int32_t (struct wbuf *w, u_int32_t v) {
static inline void wbuf_DISKOFF (struct wbuf *w, DISKOFF off) {
wbuf_ulonglong(w, (u_int64_t)off);
}
static inline void wbuf_BLOCKNUM (struct wbuf *w, BLOCKNUM b) {
wbuf_ulonglong(w, b.b);
}
static inline void wbuf_nocrc_BLOCKNUM (struct wbuf *w, BLOCKNUM b) {
wbuf_nocrc_ulonglong(w, b.b);
}
static inline void wbuf_nocrc_TXNID (struct wbuf *w, TXNID tid) {
wbuf_nocrc_ulonglong(w, tid);
......
......@@ -209,6 +209,14 @@ void wbuf_xids(struct wbuf *wb, XIDS xids) {
}
}
void wbuf_nocrc_xids(struct wbuf *wb, XIDS xids) {
wbuf_nocrc_char(wb, (unsigned char)xids->num_stored_xids);
u_int8_t index;
for (index = 0; index < xids->num_stored_xids; index++) {
wbuf_nocrc_TXNID(wb, xids->ids[index]);
}
}
void
xids_fprintf(FILE* fp, XIDS xids) {
u_int8_t index;
......
......@@ -50,6 +50,7 @@ void toku_calc_more_murmur_xids (struct x1764 *mm, XIDS xids);
unsigned char *xids_get_end_of_array(XIDS xids);
void wbuf_xids(struct wbuf *wb, XIDS xids);
void wbuf_nocrc_xids(struct wbuf *wb, XIDS xids);
void xids_fprintf(FILE* fp, XIDS xids);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment