Commit 357a9c74 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

newbrt tests pass with compression. Addresses #1131.

git-svn-id: file:///svn/tokudb.1131@5952 c7de825b-a66e-492c-adef-691d508d4ae1
parent e5b695ae
...@@ -14,9 +14,12 @@ ...@@ -14,9 +14,12 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <zlib.h> #include <zlib.h>
// Don't include the compressed data size or the uncompressed data size.
static const int brtnode_header_overhead = (8+ // magic "tokunode" or "tokuleaf" static const int brtnode_header_overhead = (8+ // magic "tokunode" or "tokuleaf"
4+ // nodesize 4+ // nodesize
8+ // checkpoint number 8+ // checkpoint number
4+ // target node size
4+ // compressed data size 4+ // compressed data size
4+ // uncompressed data size 4+ // uncompressed data size
4+ // flags 4+ // flags
...@@ -69,6 +72,7 @@ static unsigned int toku_serialize_brtnode_size_slow (BRTNODE node) { ...@@ -69,6 +72,7 @@ static unsigned int toku_serialize_brtnode_size_slow (BRTNODE node) {
} }
} }
// This is the size of the uncompressed data, including the uncompressed header, and including the 4 bytes for the information about how big is the compressed version, and how big is the uncompressed version.
unsigned int toku_serialize_brtnode_size (BRTNODE node) { unsigned int toku_serialize_brtnode_size (BRTNODE node) {
unsigned int result =brtnode_header_overhead; unsigned int result =brtnode_header_overhead;
assert(sizeof(off_t)==8); assert(sizeof(off_t)==8);
...@@ -99,22 +103,19 @@ int wbufwriteleafentry (OMTVALUE lev, u_int32_t UU(idx), void *v) { ...@@ -99,22 +103,19 @@ int wbufwriteleafentry (OMTVALUE lev, u_int32_t UU(idx), void *v) {
return 0; return 0;
} }
const int uncompressed_header_len = (8 // tokuleaf or tokunode const int uncompressed_magic_len = (8 // tokuleaf or tokunode
+4 // version +4 // version
+8 // lsn +8 // lsn
+4 // compressed_len );
const int compression_header_len = (4 // compressed_len
+4); // uncompressed_len +4); // uncompressed_len
void toku_serialize_brtnode_to (int fd, DISKOFF off, BRTNODE node) { void toku_serialize_brtnode_to (int fd, DISKOFF off, BRTNODE node) {
//printf("%s:%d serializing\n", __FILE__, __LINE__); //printf("%s:%d serializing\n", __FILE__, __LINE__);
struct wbuf w; struct wbuf w;
int i; int i;
unsigned int calculated_size = toku_serialize_brtnode_size(node); unsigned int calculated_size = toku_serialize_brtnode_size(node) - 8; // don't include the compressed or uncompressed sizes
#if 0
if (calculated_size!=toku_serialize_brtnode_size(node)) {
//printf("Sizes don't match: %d %d\n", calculated_size, toku_serialize_brtnode_size(node));
}
#endif
//assert(calculated_size<=size); //assert(calculated_size<=size);
//char buf[size]; //char buf[size];
char *MALLOC_N(calculated_size, buf); char *MALLOC_N(calculated_size, buf);
...@@ -128,6 +129,7 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, BRTNODE node) { ...@@ -128,6 +129,7 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, BRTNODE node) {
wbuf_int(&w, BRT_LAYOUT_VERSION); wbuf_int(&w, BRT_LAYOUT_VERSION);
wbuf_ulonglong(&w, node->log_lsn.lsn); wbuf_ulonglong(&w, node->log_lsn.lsn);
//printf("%s:%d %lld.calculated_size=%d\n", __FILE__, __LINE__, off, calculated_size); //printf("%s:%d %lld.calculated_size=%d\n", __FILE__, __LINE__, off, calculated_size);
wbuf_uint(&w, node->nodesize);
wbuf_uint(&w, node->flags); wbuf_uint(&w, node->flags);
wbuf_int(&w, node->height); wbuf_int(&w, node->height);
//printf("%s:%d %lld rand=%08x sum=%08x height=%d\n", __FILE__, __LINE__, node->thisnodename, node->rand4fingerprint, node->subtree_fingerprint, node->height); //printf("%s:%d %lld rand=%08x sum=%08x height=%d\n", __FILE__, __LINE__, node->thisnodename, node->rand4fingerprint, node->subtree_fingerprint, node->height);
...@@ -214,24 +216,29 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, BRTNODE node) { ...@@ -214,24 +216,29 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, BRTNODE node) {
// uncompressed_len(4)[which includes only the compressed data, not the header] // uncompressed_len(4)[which includes only the compressed data, not the header]
// The first part of the data is uncompressed // The first part of the data is uncompressed
uLongf uncompressed_len = calculated_size-uncompressed_header_len; uLongf uncompressed_len = calculated_size-uncompressed_magic_len;
uLongf compressed_len= compressBound(uncompressed_len); uLongf compressed_len= compressBound(uncompressed_len);
char *MALLOC_N(compressed_len+uncompressed_header_len, compressed_buf); char *MALLOC_N(compressed_len+uncompressed_magic_len+compression_header_len, compressed_buf);
memcpy(compressed_buf, buf, uncompressed_header_len); memcpy(compressed_buf, buf, uncompressed_magic_len);
if (0) printf("First 4 bytes before compressing data are %02x%02x%02x%02x\n",
buf[uncompressed_magic_len], buf[uncompressed_magic_len+1],
buf[uncompressed_magic_len+2], buf[uncompressed_magic_len+3]);
{ {
int r = compress(((Bytef*)compressed_buf)+uncompressed_header_len, &compressed_len, int r = compress(((Bytef*)compressed_buf)+uncompressed_magic_len + compression_header_len, &compressed_len,
((Bytef*)buf)+uncompressed_header_len, calculated_size-uncompressed_header_len); ((Bytef*)buf)+uncompressed_magic_len, calculated_size-uncompressed_magic_len);
assert(r==Z_OK); assert(r==Z_OK);
} }
((int32_t*)(compressed_buf+uncompressed_header_len))[-2] = htonl(compressed_len); if (0) printf("Size before compressing %d, after compression %ld\n", calculated_size-uncompressed_magic_len, compressed_len);
((int32_t*)(compressed_buf+uncompressed_header_len))[-1] = htonl(uncompressed_len);
((int32_t*)(compressed_buf+uncompressed_magic_len))[0] = htonl(compressed_len);
((int32_t*)(compressed_buf+uncompressed_magic_len))[1] = htonl(uncompressed_len);
//write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone); //write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone);
{ {
// If the node has never been written, then write the whole buffer, including the zeros // If the node has never been written, then write the whole buffer, including the zeros
size_t n_to_write = uncompressed_header_len + compressed_len; size_t n_to_write = uncompressed_magic_len + compression_header_len + compressed_len;
ssize_t r=pwrite(fd, compressed_buf, n_to_write, off); ssize_t r=pwrite(fd, compressed_buf, n_to_write, off);
if (r<0) printf("r=%ld errno=%d\n", (long)r, errno); if (r<0) printf("r=%ld errno=%d\n", (long)r, errno);
assert(r==(ssize_t)n_to_write); assert(r==(ssize_t)n_to_write);
...@@ -254,46 +261,53 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTN ...@@ -254,46 +261,53 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTN
return r; return r;
} }
result->ever_been_written = 1; result->ever_been_written = 1;
char uncompressed_header[uncompressed_header_len]; char uncompressed_header[uncompressed_magic_len + compression_header_len];
u_int32_t compressed_size; u_int32_t compressed_size;
u_int32_t uncompressed_size; u_int32_t uncompressed_size;
{ {
// get the compressed size // get the compressed size
r = pread(fd, uncompressed_header, uncompressed_header_len, off); r = pread(fd, uncompressed_header, sizeof(uncompressed_header), off);
//printf("%s:%d r=%d the datasize=%d\n", __FILE__, __LINE__, r, ntohl(datasize_n)); //printf("%s:%d r=%d the datasize=%d\n", __FILE__, __LINE__, r, ntohl(datasize_n));
if (r!=uncompressed_header_len) { if (r!=(int)sizeof(uncompressed_header)) {
if (r==-1) r=errno; if (r==-1) r=errno;
else r = DB_BADFORMAT; else r = DB_BADFORMAT;
goto died0; goto died0;
} }
compressed_size = ntohl(*(u_int32_t*)(&uncompressed_header[8+4+8])); compressed_size = ntohl(*(u_int32_t*)(&uncompressed_header[uncompressed_magic_len]));
if (compressed_size<=0 || compressed_size>(1<<30)) { r = DB_BADFORMAT; goto died0; } if (compressed_size<=0 || compressed_size>(1<<30)) { r = DB_BADFORMAT; goto died0; }
uncompressed_size = ntohl(*(u_int32_t*)(&uncompressed_header[8+4+8+4])); uncompressed_size = ntohl(*(u_int32_t*)(&uncompressed_header[uncompressed_magic_len+4]));
if (uncompressed_size<=0 || uncompressed_size>(1<<30)) { r = DB_BADFORMAT; goto died0; } if (uncompressed_size<=0 || uncompressed_size>(1<<30)) { r = DB_BADFORMAT; goto died0; }
if (0) printf("Compressed size = %d, uncompressed size=%d\n", compressed_size, uncompressed_size);
} }
rc.buf=toku_malloc(uncompressed_size + uncompressed_header_len-8);
assert(rc.buf);
unsigned char *MALLOC_N(compressed_size, compressed_data); unsigned char *MALLOC_N(compressed_size, compressed_data);
assert(compressed_data); assert(compressed_data);
{ {
ssize_t rlen=pread(fd, compressed_data, compressed_size, off+uncompressed_header_len); ssize_t rlen=pread(fd, compressed_data, compressed_size, off+uncompressed_magic_len + compression_header_len);
//printf("%s:%d pread->%d datasize=%d\n", __FILE__, __LINE__, r, datasize); //printf("%s:%d pread->%d datasize=%d\n", __FILE__, __LINE__, r, datasize);
assert((size_t)rlen==compressed_size); assert((size_t)rlen==compressed_size);
//printf("Got %d %d %d %d\n", rc.buf[0], rc.buf[1], rc.buf[2], rc.buf[3]); //printf("Got %d %d %d %d\n", rc.buf[0], rc.buf[1], rc.buf[2], rc.buf[3]);
} }
rc.size=uncompressed_size + uncompressed_header_len; rc.size= uncompressed_size + uncompressed_magic_len;
assert(rc.size>0); assert(rc.size>0);
memcpy(rc.buf, uncompressed_header, uncompressed_header_len-8);
rc.buf=toku_malloc(rc.size);
assert(rc.buf);
memcpy(rc.buf, uncompressed_header, uncompressed_magic_len);
{ {
uLongf destlen = uncompressed_size; uLongf destlen = uncompressed_size;
r = uncompress(rc.buf+uncompressed_header_len-8, &destlen, r = uncompress(rc.buf+uncompressed_magic_len, &destlen,
compressed_data, compressed_size); compressed_data, compressed_size);
assert(destlen==uncompressed_size); assert(destlen==uncompressed_size);
assert(r==Z_OK); assert(r==Z_OK);
} }
if (0) printf("First 4 bytes of uncompressed data are %02x%02x%02x%02x\n",
rc.buf[uncompressed_magic_len], rc.buf[uncompressed_magic_len+1],
rc.buf[uncompressed_magic_len+2], rc.buf[uncompressed_magic_len+3]);
toku_free(compressed_data); toku_free(compressed_data);
rc.ndone=0; rc.ndone=0;
//printf("Deserializing %lld datasize=%d\n", off, datasize); //printf("Deserializing %lld datasize=%d\n", off, datasize);
...@@ -317,6 +331,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTN ...@@ -317,6 +331,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTN
ok_layout_version: ; ok_layout_version: ;
} }
result->disk_lsn.lsn = rbuf_ulonglong(&rc); result->disk_lsn.lsn = rbuf_ulonglong(&rc);
result->nodesize = rbuf_int(&rc);
result->log_lsn = result->disk_lsn; result->log_lsn = result->disk_lsn;
result->thisnodename = off; result->thisnodename = off;
...@@ -417,7 +432,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTN ...@@ -417,7 +432,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTN
result->u.l.seqinsert = 0; result->u.l.seqinsert = 0;
//printf("%s:%d r PMA= %p\n", __FILE__, __LINE__, result->u.l.buffer); //printf("%s:%d r PMA= %p\n", __FILE__, __LINE__, result->u.l.buffer);
toku_mempool_init(&result->u.l.buffer_mempool, rc.buf, uncompressed_size + uncompressed_header_len-8); toku_mempool_init(&result->u.l.buffer_mempool, rc.buf, uncompressed_size + uncompressed_magic_len);
u_int32_t actual_sum = 0; u_int32_t actual_sum = 0;
u_int32_t start_of_data = rc.ndone; u_int32_t start_of_data = rc.ndone;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment