Commit c1b0e91b authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

new block format with a header checksum and sub block checksums. merge -c...

new block format with a header checksum and sub block checksums.  merge -c 18916:head from 2351.ptq to main refs[t:2351]

git-svn-id: file:///svn/toku/tokudb@18961 c7de825b-a66e-492c-adef-691d508d4ae1
parent 654d3a08
...@@ -542,6 +542,29 @@ static void *decompress_worker_10(void *arg) { ...@@ -542,6 +542,29 @@ static void *decompress_worker_10(void *arg) {
#endif #endif
static int
verify_decompressed_brtnode_checksum (struct rbuf *rb) {
int r = 0;
if (rb->size >= 4) {
uint32_t verify_size = rb->size - 4; //Not counting the checksum
toku_trace("x1764");
uint32_t crc = x1764_memory(rb->buf, verify_size);
toku_trace("x1764 done");
uint32_t *crcp = (uint32_t*)(((uint8_t*)rb->buf) + verify_size);
uint32_t storedcrc = toku_dtoh32(*crcp);
if (crc!=storedcrc) {
printf("Bad CRC\n");
printf("%s:%d crc=%08x stored=%08x\n", __FILE__, __LINE__, crc, storedcrc);
r = toku_db_badformat();
}
}
else r = toku_db_badformat();
return r;
}
static int static int
decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum) { decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum) {
int r; int r;
......
This diff is collapsed.
...@@ -201,7 +201,7 @@ get_unaligned_uint32(unsigned char *p) { ...@@ -201,7 +201,7 @@ get_unaligned_uint32(unsigned char *p) {
return *(u_int32_t *)p; return *(u_int32_t *)p;
} }
#define SUB_BLOCK_XSUM 0 #define SUB_BLOCK_XSUM 1
struct sub_block { struct sub_block {
u_int32_t compressed_size; u_int32_t compressed_size;
...@@ -277,8 +277,10 @@ dump_block(int f, BLOCKNUM blocknum, struct brt_header *h) { ...@@ -277,8 +277,10 @@ dump_block(int f, BLOCKNUM blocknum, struct brt_header *h) {
unsigned char *vp = toku_malloc(size); unsigned char *vp = toku_malloc(size);
u_int64_t r = pread(f, vp, size, offset); u_int64_t r = pread(f, vp, size, offset);
if (r == (u_int64_t)size) if (r == (u_int64_t)size) {
printf("%.8s layout_version=%u %u\n", vp, get_unaligned_uint32(vp+8), get_unaligned_uint32(vp+12));
verify_block(vp, size); verify_block(vp, size);
}
toku_free(vp); toku_free(vp);
} }
......
...@@ -1172,9 +1172,6 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr ...@@ -1172,9 +1172,6 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
putbuf_int64_at(&lbuf->dbuf, lbuf->dsize_p, lbuf->dsize); putbuf_int64_at(&lbuf->dbuf, lbuf->dsize_p, lbuf->dsize);
putbuf_int32_at(&lbuf->dbuf, lbuf->n_in_buf_p, lbuf->n_in_buf); putbuf_int32_at(&lbuf->dbuf, lbuf->n_in_buf_p, lbuf->n_in_buf);
u_int32_t checksum = x1764_memory(lbuf->dbuf.buf, lbuf->dbuf.off);
putbuf_int32(&lbuf->dbuf, checksum);
//print_bytestring(lbuf->dbuf.buf, lbuf->dbuf.off, 200); //print_bytestring(lbuf->dbuf.buf, lbuf->dbuf.off, 200);
int n_uncompressed_bytes_at_beginning = (8 // tokuleaf int n_uncompressed_bytes_at_beginning = (8 // tokuleaf
...@@ -1184,30 +1181,42 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr ...@@ -1184,30 +1181,42 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
int n_extra_bytes_for_compression = (+4 // n_sub blocks int n_extra_bytes_for_compression = (+4 // n_sub blocks
+4 // compressed size +4 // compressed size
+4 // compressed size +4 // compressed size
+4 // sub block checksum
+4 // header checksum
); );
int header_len = n_uncompressed_bytes_at_beginning + n_extra_bytes_for_compression;
int compression_level = 5; int compression_level = 5;
int uncompressed_len = lbuf->dbuf.off - n_uncompressed_bytes_at_beginning; int uncompressed_len = lbuf->dbuf.off - n_uncompressed_bytes_at_beginning;
int bound = compressBound(uncompressed_len); int bound = compressBound(uncompressed_len);
unsigned char *MALLOC_N(bound + n_uncompressed_bytes_at_beginning + n_extra_bytes_for_compression, compressed_buf); unsigned char *MALLOC_N(header_len + bound, compressed_buf);
uLongf real_compressed_len = bound; uLongf real_compressed_len = bound;
{ {
int r = compress2((Bytef*)(compressed_buf + n_uncompressed_bytes_at_beginning + n_extra_bytes_for_compression), &real_compressed_len, int r = compress2((Bytef*)(compressed_buf + header_len), &real_compressed_len,
(Bytef*)(lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning), uncompressed_len, (Bytef*)(lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning), uncompressed_len,
compression_level); compression_level);
assert(r==Z_OK); assert(r==Z_OK);
} }
// checksum the sub block
u_int32_t xsum0 = x1764_memory(compressed_buf + header_len, real_compressed_len);
memcpy(compressed_buf, lbuf->dbuf.buf, n_uncompressed_bytes_at_beginning); memcpy(compressed_buf, lbuf->dbuf.buf, n_uncompressed_bytes_at_beginning);
int compressed_len = real_compressed_len; int compressed_len = real_compressed_len;
int n_compressed_blocks = 1; int n_compressed_blocks = 1;
memcpy(compressed_buf+16, &n_compressed_blocks, 4); memcpy(compressed_buf+16, &n_compressed_blocks, 4);
memcpy(compressed_buf+20, &compressed_len, 4); memcpy(compressed_buf+20, &compressed_len, 4);
memcpy(compressed_buf+24, &uncompressed_len, 4); memcpy(compressed_buf+24, &uncompressed_len, 4);
memcpy(compressed_buf+28, &xsum0, 4);
// compute the header checksum and serialize it
u_int32_t header_xsum = x1764_memory(compressed_buf, header_len - sizeof (u_int32_t));
memcpy(compressed_buf+32, &header_xsum, 4);
//#ifndef CILK_STUB //#ifndef CILK_STUB
// ttable_and_write_lock->lock(); // ttable_and_write_lock->lock();
//#endif //#endif
long long off_of_leaf = out->current_off; long long off_of_leaf = out->current_off;
int size = real_compressed_len + n_uncompressed_bytes_at_beginning + n_extra_bytes_for_compression; int size = real_compressed_len + header_len;
if (0) { if (0) {
fprintf(stderr, "uncompressed buf size=%d (amount of data compressed)\n", uncompressed_len); fprintf(stderr, "uncompressed buf size=%d (amount of data compressed)\n", uncompressed_len);
fprintf(stderr, "compressed buf size=%lu, off=%lld\n", real_compressed_len, off_of_leaf); fprintf(stderr, "compressed buf size=%lu, off=%lld\n", real_compressed_len, off_of_leaf);
......
...@@ -13,3 +13,6 @@ void wbuf_LEAFENTRY(struct wbuf *w, LEAFENTRY le) { ...@@ -13,3 +13,6 @@ void wbuf_LEAFENTRY(struct wbuf *w, LEAFENTRY le) {
wbuf_literal_bytes(w, le, leafentry_disksize(le)); wbuf_literal_bytes(w, le, leafentry_disksize(le));
} }
void wbuf_nocrc_LEAFENTRY(struct wbuf *w, LEAFENTRY le) {
wbuf_nocrc_literal_bytes(w, le, leafentry_disksize(le));
}
...@@ -88,6 +88,7 @@ u_int32_t toku_le_crc(LEAFENTRY v); ...@@ -88,6 +88,7 @@ u_int32_t toku_le_crc(LEAFENTRY v);
size_t leafentry_memsize (LEAFENTRY le); // the size of a leafentry in memory. size_t leafentry_memsize (LEAFENTRY le); // the size of a leafentry in memory.
size_t leafentry_disksize (LEAFENTRY le); // this is the same as logsizeof_LEAFENTRY. The size of a leafentry on disk. size_t leafentry_disksize (LEAFENTRY le); // this is the same as logsizeof_LEAFENTRY. The size of a leafentry on disk.
void wbuf_LEAFENTRY(struct wbuf *w, LEAFENTRY le); void wbuf_LEAFENTRY(struct wbuf *w, LEAFENTRY le);
void wbuf_nocrc_LEAFENTRY(struct wbuf *w, LEAFENTRY le);
int print_leafentry (FILE *outf, LEAFENTRY v); // Print a leafentry out in human-readable form. int print_leafentry (FILE *outf, LEAFENTRY v); // Print a leafentry out in human-readable form.
int le_is_provdel(LEAFENTRY le); // Return true if it is a provisional delete. int le_is_provdel(LEAFENTRY le); // Return true if it is a provisional delete.
......
...@@ -18,6 +18,12 @@ struct rbuf { ...@@ -18,6 +18,12 @@ struct rbuf {
unsigned int ndone; unsigned int ndone;
}; };
static inline void rbuf_init(struct rbuf *r, unsigned char *buf, unsigned int size) {
r->buf = buf;
r->size = size;
r->ndone = 0;
}
static inline unsigned int rbuf_char (struct rbuf *r) { static inline unsigned int rbuf_char (struct rbuf *r) {
assert(r->ndone<r->size); assert(r->ndone<r->size);
return r->buf[r->ndone++]; return r->buf[r->ndone++];
......
...@@ -21,7 +21,7 @@ struct wbuf { ...@@ -21,7 +21,7 @@ struct wbuf {
unsigned char *buf; unsigned char *buf;
unsigned int size; unsigned int size;
unsigned int ndone; unsigned int ndone;
struct x1764 checksum; // The checksumx state struct x1764 checksum; // The checksum state
}; };
static inline void wbuf_nocrc_init (struct wbuf *w, void *buf, DISKOFF size) { static inline void wbuf_nocrc_init (struct wbuf *w, void *buf, DISKOFF size) {
...@@ -115,7 +115,6 @@ static void wbuf_nocrc_bytes (struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes ...@@ -115,7 +115,6 @@ static void wbuf_nocrc_bytes (struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes
wbuf_nocrc_literal_bytes(w, bytes_bv, nbytes); wbuf_nocrc_literal_bytes(w, bytes_bv, nbytes);
} }
static void wbuf_bytes (struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes) { static void wbuf_bytes (struct wbuf *w, bytevec bytes_bv, u_int32_t nbytes) {
wbuf_uint(w, nbytes); wbuf_uint(w, nbytes);
wbuf_literal_bytes(w, bytes_bv, nbytes); wbuf_literal_bytes(w, bytes_bv, nbytes);
...@@ -126,7 +125,6 @@ static void wbuf_nocrc_ulonglong (struct wbuf *w, u_int64_t ull) { ...@@ -126,7 +125,6 @@ static void wbuf_nocrc_ulonglong (struct wbuf *w, u_int64_t ull) {
wbuf_nocrc_uint(w, (u_int32_t)(ull&0xFFFFFFFF)); wbuf_nocrc_uint(w, (u_int32_t)(ull&0xFFFFFFFF));
} }
static void wbuf_ulonglong (struct wbuf *w, u_int64_t ull) { static void wbuf_ulonglong (struct wbuf *w, u_int64_t ull) {
wbuf_uint(w, (u_int32_t)(ull>>32)); wbuf_uint(w, (u_int32_t)(ull>>32));
wbuf_uint(w, (u_int32_t)(ull&0xFFFFFFFF)); wbuf_uint(w, (u_int32_t)(ull&0xFFFFFFFF));
...@@ -164,10 +162,14 @@ static inline void wbuf_u_int32_t (struct wbuf *w, u_int32_t v) { ...@@ -164,10 +162,14 @@ static inline void wbuf_u_int32_t (struct wbuf *w, u_int32_t v) {
static inline void wbuf_DISKOFF (struct wbuf *w, DISKOFF off) { static inline void wbuf_DISKOFF (struct wbuf *w, DISKOFF off) {
wbuf_ulonglong(w, (u_int64_t)off); wbuf_ulonglong(w, (u_int64_t)off);
} }
static inline void wbuf_BLOCKNUM (struct wbuf *w, BLOCKNUM b) { static inline void wbuf_BLOCKNUM (struct wbuf *w, BLOCKNUM b) {
wbuf_ulonglong(w, b.b); wbuf_ulonglong(w, b.b);
} }
static inline void wbuf_nocrc_BLOCKNUM (struct wbuf *w, BLOCKNUM b) {
wbuf_nocrc_ulonglong(w, b.b);
}
static inline void wbuf_nocrc_TXNID (struct wbuf *w, TXNID tid) { static inline void wbuf_nocrc_TXNID (struct wbuf *w, TXNID tid) {
wbuf_nocrc_ulonglong(w, tid); wbuf_nocrc_ulonglong(w, tid);
......
...@@ -209,6 +209,14 @@ void wbuf_xids(struct wbuf *wb, XIDS xids) { ...@@ -209,6 +209,14 @@ void wbuf_xids(struct wbuf *wb, XIDS xids) {
} }
} }
void wbuf_nocrc_xids(struct wbuf *wb, XIDS xids) {
wbuf_nocrc_char(wb, (unsigned char)xids->num_stored_xids);
u_int8_t index;
for (index = 0; index < xids->num_stored_xids; index++) {
wbuf_nocrc_TXNID(wb, xids->ids[index]);
}
}
void void
xids_fprintf(FILE* fp, XIDS xids) { xids_fprintf(FILE* fp, XIDS xids) {
u_int8_t index; u_int8_t index;
......
...@@ -50,6 +50,7 @@ void toku_calc_more_murmur_xids (struct x1764 *mm, XIDS xids); ...@@ -50,6 +50,7 @@ void toku_calc_more_murmur_xids (struct x1764 *mm, XIDS xids);
unsigned char *xids_get_end_of_array(XIDS xids); unsigned char *xids_get_end_of_array(XIDS xids);
void wbuf_xids(struct wbuf *wb, XIDS xids); void wbuf_xids(struct wbuf *wb, XIDS xids);
void wbuf_nocrc_xids(struct wbuf *wb, XIDS xids);
void xids_fprintf(FILE* fp, XIDS xids); void xids_fprintf(FILE* fp, XIDS xids);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment