Commit e5b695ae authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Did some decompression stuff. But it's broken. Addresses #1131.

git-svn-id: file:///svn/tokudb.1131@5951 c7de825b-a66e-492c-adef-691d508d4ae1
parent 6a9710a9
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
static const int brtnode_header_overhead = (8+ // magic "tokunode" or "tokuleaf" static const int brtnode_header_overhead = (8+ // magic "tokunode" or "tokuleaf"
4+ // nodesize 4+ // nodesize
8+ // checkpoint number 8+ // checkpoint number
4+ // block size 4+ // compressed data size
4+ // data size 4+ // uncompressed data size
4+ // flags 4+ // flags
4+ // height 4+ // height
4+ // random for fingerprint 4+ // random for fingerprint
...@@ -99,6 +99,12 @@ int wbufwriteleafentry (OMTVALUE lev, u_int32_t UU(idx), void *v) { ...@@ -99,6 +99,12 @@ int wbufwriteleafentry (OMTVALUE lev, u_int32_t UU(idx), void *v) {
return 0; return 0;
} }
const int uncompressed_header_len = (8 // tokuleaf or tokunode
+4 // version
+8 // lsn
+4 // compressed_len
+4); // uncompressed_len
void toku_serialize_brtnode_to (int fd, DISKOFF off, BRTNODE node) { void toku_serialize_brtnode_to (int fd, DISKOFF off, BRTNODE node) {
//printf("%s:%d serializing\n", __FILE__, __LINE__); //printf("%s:%d serializing\n", __FILE__, __LINE__);
struct wbuf w; struct wbuf w;
...@@ -111,8 +117,7 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, BRTNODE node) { ...@@ -111,8 +117,7 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, BRTNODE node) {
#endif #endif
//assert(calculated_size<=size); //assert(calculated_size<=size);
//char buf[size]; //char buf[size];
size_t n_to_write = node->ever_been_written ? calculated_size : node->nodesize; char *MALLOC_N(calculated_size, buf);
char *MALLOC_N(n_to_write, buf);
//toku_verify_counts(node); //toku_verify_counts(node);
//assert(size>0); //assert(size>0);
//printf("%s:%d serializing %lld w height=%d p0=%p\n", __FILE__, __LINE__, off, node->height, node->mdicts[0]); //printf("%s:%d serializing %lld w height=%d p0=%p\n", __FILE__, __LINE__, off, node->height, node->mdicts[0]);
...@@ -123,8 +128,6 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, BRTNODE node) { ...@@ -123,8 +128,6 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, BRTNODE node) {
wbuf_int(&w, BRT_LAYOUT_VERSION); wbuf_int(&w, BRT_LAYOUT_VERSION);
wbuf_ulonglong(&w, node->log_lsn.lsn); wbuf_ulonglong(&w, node->log_lsn.lsn);
//printf("%s:%d %lld.calculated_size=%d\n", __FILE__, __LINE__, off, calculated_size); //printf("%s:%d %lld.calculated_size=%d\n", __FILE__, __LINE__, off, calculated_size);
wbuf_uint(&w, calculated_size);
wbuf_int(&w, node->nodesize);
wbuf_uint(&w, node->flags); wbuf_uint(&w, node->flags);
wbuf_int(&w, node->height); wbuf_int(&w, node->height);
//printf("%s:%d %lld rand=%08x sum=%08x height=%d\n", __FILE__, __LINE__, node->thisnodename, node->rand4fingerprint, node->subtree_fingerprint, node->height); //printf("%s:%d %lld rand=%08x sum=%08x height=%d\n", __FILE__, __LINE__, node->thisnodename, node->rand4fingerprint, node->subtree_fingerprint, node->height);
...@@ -199,53 +202,51 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, BRTNODE node) { ...@@ -199,53 +202,51 @@ void toku_serialize_brtnode_to (int fd, DISKOFF off, BRTNODE node) {
} }
#endif #endif
char *MALLOC_N(node->nodesize, compressed_buf); if (calculated_size!=w.ndone)
printf("%s:%d w.done=%d calculated_size=%d\n", __FILE__, __LINE__, w.ndone, calculated_size);
assert(calculated_size==w.ndone);
// The uncompressed part of the header is
// tokuleaf(8),
// version(4),
// lsn(8),
// compressed_len(4),[which includes only the compressed data]
// uncompressed_len(4)[which includes only the compressed data, not the header]
// The first part of the data is uncompressed // The first part of the data is uncompressed
int first_part = (4+ // toku uLongf uncompressed_len = calculated_size-uncompressed_header_len;
4+ // leaf or node uLongf compressed_len= compressBound(uncompressed_len);
4+ // version char *MALLOC_N(compressed_len+uncompressed_header_len, compressed_buf);
8);// lsn
memcpy(compressed_buf, buf, first_part); memcpy(compressed_buf, buf, uncompressed_header_len);
uLongf compressed_len=node->nodesize-first_part-4;
{ {
int r = compress(((Bytef*)compressed_buf)+first_part+4, &compressed_len, int r = compress(((Bytef*)compressed_buf)+uncompressed_header_len, &compressed_len,
((Bytef*)buf)+first_part, calculated_size-first_part); ((Bytef*)buf)+uncompressed_header_len, calculated_size-uncompressed_header_len);
assert(r==Z_OK); assert(r==Z_OK);
} }
toku_free(compressed_buf);
*((int32_t*)(compressed_buf+first_part)) = htonl(compressed_len); ((int32_t*)(compressed_buf+uncompressed_header_len))[-2] = htonl(compressed_len);
int compressed_n_to_write; ((int32_t*)(compressed_buf+uncompressed_header_len))[-1] = htonl(uncompressed_len);
if (!node->ever_been_written) {
memset(compressed_buf+first_part+4+compressed_len, 0, (size_t)(node->nodesize-first_part-4-compressed_len)); // fill with zeros
compressed_n_to_write = node->nodesize;
} else {
compressed_n_to_write = first_part+4+compressed_len;
}
//write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone); //write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone);
{ {
// If the node has never been written, then write the whole buffer, including the zeros // If the node has never been written, then write the whole buffer, including the zeros
//size_t n_to_write = node->nodesize; size_t n_to_write = uncompressed_header_len + compressed_len;
ssize_t r=pwrite(fd, compressed_buf, compressed_n_to_write, off); ssize_t r=pwrite(fd, compressed_buf, n_to_write, off);
if (r<0) printf("r=%ld errno=%d\n", (long)r, errno); if (r<0) printf("r=%ld errno=%d\n", (long)r, errno);
assert(r==(ssize_t)n_to_write); assert(r==(ssize_t)n_to_write);
} }
if (calculated_size!=w.ndone)
printf("%s:%d w.done=%d calculated_size=%d\n", __FILE__, __LINE__, w.ndone, calculated_size);
assert(calculated_size==w.ndone);
//printf("%s:%d wrote %d bytes for %lld size=%lld\n", __FILE__, __LINE__, w.ndone, off, size); //printf("%s:%d wrote %d bytes for %lld size=%lld\n", __FILE__, __LINE__, w.ndone, off, size);
assert(w.ndone<=node->nodesize); assert(w.ndone<=node->nodesize);
toku_free(buf); toku_free(buf);
toku_free(compressed_buf);
} }
int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTNODE *brtnode) { int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTNODE *brtnode) {
TAGMALLOC(BRTNODE, result); TAGMALLOC(BRTNODE, result);
struct rbuf rc; struct rbuf rc;
int i; int i;
u_int32_t datasize;
int r; int r;
if (result==0) { if (result==0) {
r=errno; r=errno;
...@@ -253,46 +254,56 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTN ...@@ -253,46 +254,56 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTN
return r; return r;
} }
result->ever_been_written = 1; result->ever_been_written = 1;
char uncompressed_header[uncompressed_header_len];
u_int32_t compressed_size;
u_int32_t uncompressed_size;
{ {
u_int32_t datasize_n; // get the compressed size
r = pread(fd, &datasize_n, sizeof(datasize_n), off +8+4+8); r = pread(fd, uncompressed_header, uncompressed_header_len, off);
//printf("%s:%d r=%d the datasize=%d\n", __FILE__, __LINE__, r, ntohl(datasize_n)); //printf("%s:%d r=%d the datasize=%d\n", __FILE__, __LINE__, r, ntohl(datasize_n));
if (r!=sizeof(datasize_n)) { if (r!=uncompressed_header_len) {
if (r==-1) r=errno; if (r==-1) r=errno;
else r = DB_BADFORMAT; else r = DB_BADFORMAT;
goto died0; goto died0;
} }
datasize = ntohl(datasize_n); compressed_size = ntohl(*(u_int32_t*)(&uncompressed_header[8+4+8]));
if (datasize<=0 || datasize>(1<<30)) { r = DB_BADFORMAT; goto died0; } if (compressed_size<=0 || compressed_size>(1<<30)) { r = DB_BADFORMAT; goto died0; }
uncompressed_size = ntohl(*(u_int32_t*)(&uncompressed_header[8+4+8+4]));
if (uncompressed_size<=0 || uncompressed_size>(1<<30)) { r = DB_BADFORMAT; goto died0; }
} }
rc.buf=toku_malloc(datasize);
//printf("%s:%d errno=%d\n", __FILE__, __LINE__, errno); rc.buf=toku_malloc(uncompressed_size + uncompressed_header_len-8);
if (rc.buf==0) { assert(rc.buf);
r=errno; unsigned char *MALLOC_N(compressed_size, compressed_data);
if (0) { died1: toku_free(rc.buf); } assert(compressed_data);
goto died0;
}
rc.size=datasize;
assert(rc.size>0);
rc.ndone=0;
//printf("Deserializing %lld datasize=%d\n", off, datasize);
{ {
ssize_t rlen=pread(fd, rc.buf, datasize, off); ssize_t rlen=pread(fd, compressed_data, compressed_size, off+uncompressed_header_len);
//printf("%s:%d pread->%d datasize=%d\n", __FILE__, __LINE__, r, datasize); //printf("%s:%d pread->%d datasize=%d\n", __FILE__, __LINE__, r, datasize);
if ((size_t)rlen!=datasize) { assert((size_t)rlen==compressed_size);
//printf("%s:%d size messed up\n", __FILE__, __LINE__);
r=errno;
goto died1;
}
//printf("Got %d %d %d %d\n", rc.buf[0], rc.buf[1], rc.buf[2], rc.buf[3]); //printf("Got %d %d %d %d\n", rc.buf[0], rc.buf[1], rc.buf[2], rc.buf[3]);
} }
rc.size=uncompressed_size + uncompressed_header_len;
assert(rc.size>0);
memcpy(rc.buf, uncompressed_header, uncompressed_header_len-8);
{
uLongf destlen = uncompressed_size;
r = uncompress(rc.buf+uncompressed_header_len-8, &destlen,
compressed_data, compressed_size);
assert(destlen==uncompressed_size);
assert(r==Z_OK);
}
toku_free(compressed_data);
rc.ndone=0;
//printf("Deserializing %lld datasize=%d\n", off, datasize);
{ {
bytevec tmp; bytevec tmp;
rbuf_literal_bytes(&rc, &tmp, 8); rbuf_literal_bytes(&rc, &tmp, 8);
if (memcmp(tmp, "tokuleaf", 8)!=0 if (memcmp(tmp, "tokuleaf", 8)!=0
&& memcmp(tmp, "tokunode", 8)!=0) { && memcmp(tmp, "tokunode", 8)!=0) {
r = DB_BADFORMAT; r = DB_BADFORMAT;
goto died1; return r;
} }
} }
result->layout_version = rbuf_int(&rc); result->layout_version = rbuf_int(&rc);
...@@ -302,16 +313,12 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTN ...@@ -302,16 +313,12 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTN
// Don't support older versions. // Don't support older versions.
} }
r=DB_BADFORMAT; r=DB_BADFORMAT;
goto died1; return r;
ok_layout_version: ; ok_layout_version: ;
} }
result->disk_lsn.lsn = rbuf_ulonglong(&rc); result->disk_lsn.lsn = rbuf_ulonglong(&rc);
result->log_lsn = result->disk_lsn; result->log_lsn = result->disk_lsn;
{
unsigned int stored_size = rbuf_int(&rc);
if (stored_size!=datasize) { r=DB_BADFORMAT; goto died1; }
}
result->nodesize = rbuf_int(&rc);
result->thisnodename = off; result->thisnodename = off;
result->flags = rbuf_int(&rc); result->flags = rbuf_int(&rc);
result->height = rbuf_int(&rc); result->height = rbuf_int(&rc);
...@@ -365,7 +372,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTN ...@@ -365,7 +372,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTN
int j; int j;
if (0) { died_12: j=result->u.n.n_bytes_in_buffers; } if (0) { died_12: j=result->u.n.n_bytes_in_buffers; }
for (j=0; j<i; j++) toku_fifo_free(&BNC_BUFFER(result,j)); for (j=0; j<i; j++) toku_fifo_free(&BNC_BUFFER(result,j));
goto died1; return DB_BADFORMAT;
} }
} }
{ {
...@@ -410,7 +417,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTN ...@@ -410,7 +417,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTN
result->u.l.seqinsert = 0; result->u.l.seqinsert = 0;
//printf("%s:%d r PMA= %p\n", __FILE__, __LINE__, result->u.l.buffer); //printf("%s:%d r PMA= %p\n", __FILE__, __LINE__, result->u.l.buffer);
toku_mempool_init(&result->u.l.buffer_mempool, rc.buf, datasize); toku_mempool_init(&result->u.l.buffer_mempool, rc.buf, uncompressed_size + uncompressed_header_len-8);
u_int32_t actual_sum = 0; u_int32_t actual_sum = 0;
u_int32_t start_of_data = rc.ndone; u_int32_t start_of_data = rc.ndone;
...@@ -431,7 +438,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTN ...@@ -431,7 +438,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, u_int32_t fullhash, BRTN
toku_free(array); toku_free(array);
if (r!=0) { if (r!=0) {
if (0) { died_21: toku_omt_destroy(&result->u.l.buffer); } if (0) { died_21: toku_omt_destroy(&result->u.l.buffer); }
goto died1; return DB_BADFORMAT;
} }
result->u.l.buffer_mempool.frag_size = start_of_data; result->u.l.buffer_mempool.frag_size = start_of_data;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment