Commit 43b9a313 authored by Rich Prohaska, committed by Yoni Fogel

merge 1489 to main. addresses #1489

git-svn-id: file:///svn/toku/tokudb@10639 c7de825b-a66e-492c-adef-691d508d4ae1
parent 03bad1a9
...@@ -54,6 +54,7 @@ int n_insertions_since_txn_began=0; ...@@ -54,6 +54,7 @@ int n_insertions_since_txn_began=0;
int env_open_flags = DB_CREATE|DB_PRIVATE|DB_INIT_MPOOL; int env_open_flags = DB_CREATE|DB_PRIVATE|DB_INIT_MPOOL;
u_int32_t put_flags = DB_YESOVERWRITE; u_int32_t put_flags = DB_YESOVERWRITE;
double compressibility = -1; // -1 means make it very compressible. 1 means use random bits everywhere. 2 means half the bits are random. double compressibility = -1; // -1 means make it very compressible. 1 means use random bits everywhere. 2 means half the bits are random.
int do_append = 0;
static void do_prelock(DB* db, DB_TXN* txn) { static void do_prelock(DB* db, DB_TXN* txn) {
if (prelock) { if (prelock) {
...@@ -80,15 +81,16 @@ DB_TXN *tid=0; ...@@ -80,15 +81,16 @@ DB_TXN *tid=0;
static void benchmark_setup (void) { static void benchmark_setup (void) {
int r; int r;
{ if (!do_append) {
char unlink_cmd[strlen(dbdir) + strlen("rm -rf ") + 1]; char unlink_cmd[strlen(dbdir) + strlen("rm -rf ") + 1];
snprintf(unlink_cmd, sizeof(unlink_cmd), "rm -rf %s", dbdir); snprintf(unlink_cmd, sizeof(unlink_cmd), "rm -rf %s", dbdir);
//printf("unlink_cmd=%s\n", unlink_cmd); //printf("unlink_cmd=%s\n", unlink_cmd);
system(unlink_cmd); system(unlink_cmd);
}
if (strcmp(dbdir, ".") != 0) { if (strcmp(dbdir, ".") != 0) {
r = toku_os_mkdir(dbdir,S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH); r = toku_os_mkdir(dbdir,S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
assert(r == 0); assert(r == 0);
}
} }
r = db_env_create(&dbenv, 0); r = db_env_create(&dbenv, 0);
...@@ -323,6 +325,7 @@ static int print_usage (const char *argv0) { ...@@ -323,6 +325,7 @@ static int print_usage (const char *argv0) {
fprintf(stderr, " --DB_INIT_LOCK (1|0) turn on or off the DB_INIT_LOCK env_open_flag\n"); fprintf(stderr, " --DB_INIT_LOCK (1|0) turn on or off the DB_INIT_LOCK env_open_flag\n");
fprintf(stderr, " --1514 do a point query for something not there at end. See #1514. (Requires --norandom)\n"); fprintf(stderr, " --1514 do a point query for something not there at end. See #1514. (Requires --norandom)\n");
fprintf(stderr, " --env DIR\n"); fprintf(stderr, " --env DIR\n");
fprintf(stderr, " --append append to an existing file\n");
fprintf(stderr, " n_iterations how many iterations (default %lld)\n", default_n_items/DEFAULT_ITEMS_TO_INSERT_PER_ITERATION); fprintf(stderr, " n_iterations how many iterations (default %lld)\n", default_n_items/DEFAULT_ITEMS_TO_INSERT_PER_ITERATION);
return 1; return 1;
...@@ -439,6 +442,11 @@ int main (int argc, const char *argv[]) { ...@@ -439,6 +442,11 @@ int main (int argc, const char *argv[]) {
} else if (strcmp(arg, "--prelockflag") == 0) { } else if (strcmp(arg, "--prelockflag") == 0) {
prelock=1; prelock=1;
prelockflag=1; prelockflag=1;
} else if (strcmp(arg, "--srandom") == 0) {
if (i+1 >= argc) return print_usage(argv[0]);
srandom(atoi(argv[++i]));
} else if (strcmp(arg, "--append") == 0) {
do_append = 1;
} else { } else {
return print_usage(argv[0]); return print_usage(argv[0]);
} }
......
...@@ -274,11 +274,12 @@ int toku_verify_brtnode (BRT brt, BLOCKNUM blocknum, bytevec lorange, ITEMLEN lo ...@@ -274,11 +274,12 @@ int toku_verify_brtnode (BRT brt, BLOCKNUM blocknum, bytevec lorange, ITEMLEN lo
enum brt_layout_version_e { enum brt_layout_version_e {
BRT_LAYOUT_VERSION_5 = 5, BRT_LAYOUT_VERSION_5 = 5,
BRT_LAYOUT_VERSION_6 = 6, // Diff from 5 to 6: Add leafentry_estimate BRT_LAYOUT_VERSION_6 = 6, // Diff from 5 to 6: Add leafentry_estimate
BRT_LAYOUT_VERSION_7 = 7, // Diff from 6 to 7: Add exact-bit to leafentry_estimate #818, add magic to header #22, add per-subdatase flags #333 BRT_LAYOUT_VERSION_7 = 7, // Diff from 6 to 7: Add exact-bit to leafentry_estimate #818, add magic to header #22, add per-subdatase flags #333
BRT_LAYOUT_VERSION_8 = 8, // Diff from 7 to 8: Use murmur instead of crc32. We are going to make a simplification and stop supporting version 7 and before. Current As of Beta 1.0.6 BRT_LAYOUT_VERSION_8 = 8, // Diff from 7 to 8: Use murmur instead of crc32. We are going to make a simplification and stop supporting version 7 and before. Current As of Beta 1.0.6
BRT_LAYOUT_VERSION_9 = 9, // Diff from 8 to 9: Variable-sized blocks and compression. BRT_LAYOUT_VERSION_9 = 9, // Diff from 8 to 9: Variable-sized blocks and compression.
BRT_ANTEULTIMATE_VERSION, // the version after the most recent version BRT_LAYOUT_VERSION_10 = 10, // Diff from 9 to 10: Variable number of compressed sub-blocks per block
BRT_ANTEULTIMATE_VERSION, // the version after the most recent version
BRT_LAYOUT_VERSION = BRT_ANTEULTIMATE_VERSION-1 // A hack so I don't have to change this line. BRT_LAYOUT_VERSION = BRT_ANTEULTIMATE_VERSION-1 // A hack so I don't have to change this line.
}; };
......
...@@ -105,14 +105,11 @@ toku_pwrite_extend (int fd, const void *buf, size_t count, toku_off_t offset, ss ...@@ -105,14 +105,11 @@ toku_pwrite_extend (int fd, const void *buf, size_t count, toku_off_t offset, ss
} }
} }
// Don't include the compressed data size or the uncompressed data size. // Don't include the compression header
static const int brtnode_header_overhead = (8+ // magic "tokunode" or "tokuleaf" static const int brtnode_header_overhead = (8+ // magic "tokunode" or "tokuleaf"
4+ // nodesize 4+ // nodesize
8+ // checkpoint number 8+ // checkpoint number
4+ // target node size 4+ // target node size
4+ // compressed data size
4+ // uncompressed data size
4+ // flags 4+ // flags
4+ // height 4+ // height
4+ // random for fingerprint 4+ // random for fingerprint
...@@ -166,7 +163,7 @@ static unsigned int toku_serialize_brtnode_size_slow (BRTNODE node) { ...@@ -166,7 +163,7 @@ static unsigned int toku_serialize_brtnode_size_slow (BRTNODE node) {
} }
} }
// This is the size of the uncompressed data, including the uncompressed header, and including the 4 bytes for the information about how big is the compressed version, and how big is the uncompressed version. // This is the size of the uncompressed data, not including the compression headers
unsigned int toku_serialize_brtnode_size (BRTNODE node) { unsigned int toku_serialize_brtnode_size (BRTNODE node) {
unsigned int result =brtnode_header_overhead; unsigned int result =brtnode_header_overhead;
assert(sizeof(toku_off_t)==8); assert(sizeof(toku_off_t)==8);
...@@ -202,18 +199,99 @@ wbufwriteleafentry (OMTVALUE lev, u_int32_t UU(idx), void *v) { ...@@ -202,18 +199,99 @@ wbufwriteleafentry (OMTVALUE lev, u_int32_t UU(idx), void *v) {
enum { uncompressed_magic_len = (8 // tokuleaf or tokunode enum { uncompressed_magic_len = (8 // tokuleaf or tokunode
+4 // version +4 // version
+8 // lsn +8 // lsn
) }; )
};
// Byte offsets of the fields of the uncompressed header that starts every
// serialized node: the 8 byte magic ("tokuleaf" or "tokunode"), the 4 byte
// layout version and the 8 byte lsn. Together these fields account for the
// 20 bytes counted by uncompressed_magic_len.
enum {
uncompressed_magic_offset = 0,
uncompressed_version_offset = 8,
uncompressed_lsn_offset = 12,
};
// One entry of the compression header: the sizes, in bytes, of a single
// sub block. The serializer writes each entry as two 32 bit values in
// network byte order, compressed size first.
struct sub_block_sizes {
u_int32_t compressed_size;    // size of the sub block as stored in the file
u_int32_t uncompressed_size;  // size of the sub block after decompression
};
// Round n up to the nearest multiple of alignment.
// The result is computed with a bit mask, so it is only a multiple of
// alignment when alignment is a power of two.
static inline int roundup2(int n, int alignment) {
    const int mask = alignment - 1;
    return (n + mask) & ~mask;
}
// Split totalsize bytes of uncompressed data into sub blocks.
//
// The number of sub blocks is chosen so that each one is around 1 meg, and
// it never exceeds maxn. Data smaller than 1 meg always gets exactly one sub
// block. Every sub block except the last has the same size, rounded up to a
// multiple of 256 bytes; the last one takes whatever is left.
//
// On return sizes[0 .. result-1] are filled in. uncompressed_size is the
// size of the sub block and compressed_size is compressBound() of that size.
// Returns the number of entries filled in.
static int get_sub_block_sizes(int totalsize, int maxn, struct sub_block_sizes sizes[]) {
    enum {
        one_meg = 1024*1024,
        size_alignment = 256,
        split_threshold = one_meg + one_meg/8   // keep adding sub blocks while they are this big
    };

    int count = totalsize / one_meg;
    int chunk;
    if (count != 0) {
        if (count > maxn)
            count = maxn;
        // grow the number of sub blocks until they are small enough or the
        // upper bound is reached
        for (;;) {
            chunk = roundup2(totalsize / count, size_alignment);
            if (count >= maxn || chunk < split_threshold)
                break;
            count++;
        }
    } else {
        // less than a meg: a single sub block holds everything
        count = 1;
        chunk = totalsize;
    }

    // all sub blocks but the last have the common size
    int filled = 0;
    while (filled < count - 1) {
        sizes[filled].uncompressed_size = chunk;
        sizes[filled].compressed_size = compressBound(chunk);
        totalsize -= chunk;
        filled++;
    }

    // the last sub block gets the remainder. it is dropped when rounding up
    // left nothing for it, unless it is the only sub block.
    if (filled == 0 || totalsize > 0) {
        sizes[filled].uncompressed_size = totalsize;
        sizes[filled].compressed_size = compressBound(totalsize);
        filled++;
    }
    return filled;
}
// Return the size in bytes of the compression header for n sub blocks.
// Every layout version stores one pair of sizes per sub block. Starting with
// layout version 10 the pairs are preceded by a 4 byte sub block count.
static size_t get_compression_header_size(int layout_version, int n) {
    size_t header_len = n * sizeof (struct sub_block_sizes);
    if (layout_version >= BRT_LAYOUT_VERSION_10)
        header_len += sizeof (u_int32_t);   // room for the sub block count
    return header_len;
}
// Add up the compressed_size fields of the first n entries of sizes.
// Returns 0 when n is not positive.
static size_t get_sum_compressed_size(int n, struct sub_block_sizes sizes[]) {
    size_t total = 0;
    int idx = 0;
    while (idx < n) {
        total += sizes[idx].compressed_size;
        idx++;
    }
    return total;
}
enum { compression_header_len = (4 // compressed_len
+4 // uncompressed_len // get the sum of the sub block uncompressed sizes
) }; static size_t get_sum_uncompressed_size(int n, struct sub_block_sizes sizes[]) {
int i;
size_t uncompressed_size = 0;
for (i=0; i<n; i++)
uncompressed_size += sizes[i].uncompressed_size;
return uncompressed_size;
}
static inline void ignore_int (int UU(ignore_me)) {} static inline void ignore_int (int UU(ignore_me)) {}
int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_header *h, int n_workitems, int n_threads) { int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_header *h, int n_workitems, int n_threads) {
struct wbuf w; struct wbuf w;
int i; int i;
unsigned int calculated_size = toku_serialize_brtnode_size(node) - 8; // don't include the compressed or uncompressed sizes
// serialize the node into buf
unsigned int calculated_size = toku_serialize_brtnode_size(node);
//printf("%s:%d serializing %" PRIu64 " size=%d\n", __FILE__, __LINE__, blocknum.b, calculated_size); //printf("%s:%d serializing %" PRIu64 " size=%d\n", __FILE__, __LINE__, blocknum.b, calculated_size);
//assert(calculated_size<=size); //assert(calculated_size<=size);
//char buf[size]; //char buf[size];
...@@ -225,7 +303,8 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b ...@@ -225,7 +303,8 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
wbuf_literal_bytes(&w, "toku", 4); wbuf_literal_bytes(&w, "toku", 4);
if (node->height==0) wbuf_literal_bytes(&w, "leaf", 4); if (node->height==0) wbuf_literal_bytes(&w, "leaf", 4);
else wbuf_literal_bytes(&w, "node", 4); else wbuf_literal_bytes(&w, "node", 4);
wbuf_int(&w, BRT_LAYOUT_VERSION); assert(node->layout_version == BRT_LAYOUT_VERSION_9 || node->layout_version == BRT_LAYOUT_VERSION);
wbuf_int(&w, node->layout_version);
wbuf_ulonglong(&w, node->log_lsn.lsn); wbuf_ulonglong(&w, node->log_lsn.lsn);
//printf("%s:%d %lld.calculated_size=%d\n", __FILE__, __LINE__, off, calculated_size); //printf("%s:%d %lld.calculated_size=%d\n", __FILE__, __LINE__, off, calculated_size);
wbuf_uint(&w, node->nodesize); wbuf_uint(&w, node->nodesize);
...@@ -312,42 +391,78 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b ...@@ -312,42 +391,78 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
// tokuleaf(8), // tokuleaf(8),
// version(4), // version(4),
// lsn(8), // lsn(8),
// compressed_len(4),[which includes only the compressed data] // n_sub_blocks(4), followed by n length pairs
// uncompressed_len(4)[which includes only the compressed data, not the header] // compressed_len(4)
// uncompressed_len(4)
// The first part of the data is uncompressed
uLongf uncompressed_len = calculated_size-uncompressed_magic_len; // select the number of sub blocks and their sizes.
uLongf compressed_len = compressBound(uncompressed_len); // impose an upper bound on the number of sub blocks.
int max_sub_blocks = 4;
if (node->layout_version < BRT_LAYOUT_VERSION_10)
max_sub_blocks = 1;
struct sub_block_sizes sub_block_sizes[max_sub_blocks];
int n_sub_blocks = get_sub_block_sizes(calculated_size-uncompressed_magic_len, max_sub_blocks, sub_block_sizes);
assert(0 < n_sub_blocks && n_sub_blocks <= max_sub_blocks);
if (0 && n_sub_blocks != 1) {
printf("%s:%d %d:", __FUNCTION__, __LINE__, n_sub_blocks);
for (i=0; i<n_sub_blocks; i++)
printf("%u ", sub_block_sizes[i].uncompressed_size);
printf("\n");
}
size_t compressed_len = get_sum_compressed_size(n_sub_blocks, sub_block_sizes);
size_t compression_header_len = get_compression_header_size(node->layout_version, n_sub_blocks);
char *MALLOC_N(compressed_len+uncompressed_magic_len+compression_header_len, compressed_buf); char *MALLOC_N(compressed_len+uncompressed_magic_len+compression_header_len, compressed_buf);
memcpy(compressed_buf, buf, uncompressed_magic_len); memcpy(compressed_buf, buf, uncompressed_magic_len);
if (0) printf("First 4 bytes before compressing data are %02x%02x%02x%02x\n", if (0) printf("First 4 bytes before compressing data are %02x%02x%02x%02x\n",
buf[uncompressed_magic_len], buf[uncompressed_magic_len+1], buf[uncompressed_magic_len], buf[uncompressed_magic_len+1],
buf[uncompressed_magic_len+2], buf[uncompressed_magic_len+3]); buf[uncompressed_magic_len+2], buf[uncompressed_magic_len+3]);
{
// TBD compress all of the sub blocks
char *uncompressed_ptr = buf + uncompressed_magic_len;
char *compressed_base_ptr = compressed_buf + uncompressed_magic_len + compression_header_len;
char *compressed_ptr = compressed_base_ptr;
for (i=0; i<n_sub_blocks; i++) {
uLongf uncompressed_len = sub_block_sizes[i].uncompressed_size;
uLongf real_compressed_len = sub_block_sizes[i].compressed_size;
{
#ifdef ADAPTIVE_COMPRESSION #ifdef ADAPTIVE_COMPRESSION
// Marketing has expressed concern that this algorithm will make customers go crazy. // Marketing has expressed concern that this algorithm will make customers go crazy.
int compression_level; int compression_level;
if (n_workitems <= n_threads) compression_level = 5; if (n_workitems <= n_threads) compression_level = 5;
else if (n_workitems <= 2*n_threads) compression_level = 4; else if (n_workitems <= 2*n_threads) compression_level = 4;
else if (n_workitems <= 3*n_threads) compression_level = 3; else if (n_workitems <= 3*n_threads) compression_level = 3;
else if (n_workitems <= 4*n_threads) compression_level = 2; else if (n_workitems <= 4*n_threads) compression_level = 2;
else compression_level = 1; else compression_level = 1;
#else #else
int compression_level = 5; int compression_level = 5;
ignore_int(n_workitems); ignore_int(n_threads); ignore_int(n_workitems); ignore_int(n_threads);
#endif #endif
//printf("compress(%d) n_workitems=%d n_threads=%d\n", compression_level, n_workitems, n_threads); //printf("compress(%d) n_workitems=%d n_threads=%d\n", compression_level, n_workitems, n_threads);
int r = compress2(((Bytef*)compressed_buf)+uncompressed_magic_len + compression_header_len, &compressed_len, int r = compress2((Bytef*)compressed_ptr, &real_compressed_len,
((Bytef*)buf)+uncompressed_magic_len, calculated_size-uncompressed_magic_len, (Bytef*)uncompressed_ptr, uncompressed_len,
compression_level); compression_level);
assert(r==Z_OK); assert(r==Z_OK);
sub_block_sizes[i].compressed_size = real_compressed_len; // replace the compressed size estimate with the real size
uncompressed_ptr += uncompressed_len; // update the uncompressed and compressed buffer pointers
compressed_ptr += real_compressed_len;
}
} }
compressed_len = compressed_ptr - compressed_base_ptr;
if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %lu\n", blocknum.b, calculated_size-uncompressed_magic_len, compressed_len); if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %lu\n", blocknum.b, calculated_size-uncompressed_magic_len, compressed_len);
((int32_t*)(compressed_buf+uncompressed_magic_len))[0] = toku_htonl(compressed_len); // write out the compression header
((int32_t*)(compressed_buf+uncompressed_magic_len))[1] = toku_htonl(uncompressed_len); uint32_t *compressed_header_ptr = (uint32_t *)(compressed_buf + uncompressed_magic_len);
if (node->layout_version >= BRT_LAYOUT_VERSION_10)
*compressed_header_ptr++ = toku_htonl(n_sub_blocks);
for (i=0; i<n_sub_blocks; i++) {
compressed_header_ptr[0] = toku_htonl(sub_block_sizes[i].compressed_size);
compressed_header_ptr[1] = toku_htonl(sub_block_sizes[i].uncompressed_size);
compressed_header_ptr += 2;
}
//write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone); //write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone);
int r; int r;
...@@ -383,9 +498,78 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b ...@@ -383,9 +498,78 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
return r; return r;
} }
// When nonzero, the deserializer hands every sub block after the first to
// its own thread and decompresses the first one itself. When zero, all sub
// blocks are decompressed in the calling thread.
#define DO_DECOMPRESS_WORKER 1
// Describes the decompression of one sub block: where the compressed bytes
// are, where the decompressed bytes go, and how big each side is.
struct decompress_work {
toku_pthread_t id;          // thread running this work, set when a worker is started
void *compress_ptr;         // start of the compressed sub block (input)
void *uncompress_ptr;       // where the decompressed sub block is written (output)
u_int32_t compress_size;    // number of compressed bytes at compress_ptr
u_int32_t uncompress_size;  // number of bytes the sub block must decompress to
};
// Fill in a decompress_work item.
//   compress_ptr, compress_size      the compressed sub block to read
//   uncompress_ptr, uncompress_size  the buffer to decompress into and the
//                                    size the sub block is expected to have
// The thread id is cleared; it is set later if a worker thread is started
// for this work.
static void init_decompress_work(struct decompress_work *w,
                                 void *compress_ptr, u_int32_t compress_size,
                                 void *uncompress_ptr, u_int32_t uncompress_size) {
    w->compress_ptr = compress_ptr;
    w->uncompress_ptr = uncompress_ptr;
    w->compress_size = compress_size;
    w->uncompress_size = uncompress_size;
    w->id = 0;
}
// Decompress one sub block with zlib's uncompress().
// Asserts that the sub block decompressed to exactly the size recorded in
// the work item and that zlib reported success.
static void do_decompress_work(struct decompress_work *w) {
    void *dst = w->uncompress_ptr;
    void *src = w->compress_ptr;
    // uncompress() takes the capacity of dst in destlen and replaces it with
    // the number of bytes actually produced
    uLongf destlen = w->uncompress_size;
    int r = uncompress(dst, &destlen, src, w->compress_size);
    assert(destlen==w->uncompress_size);
    assert(r==Z_OK);
}
#if DO_DECOMPRESS_WORKER

// Thread entry point: decompress the sub block described by arg, which is a
// struct decompress_work *. Returns arg unchanged.
static void *decompress_worker(void *arg) {
    do_decompress_work((struct decompress_work *) arg);
    return arg;
}

// Start a thread that decompresses the sub block described by w.
// The thread id is stored in w->id so that the work can be waited for.
static void start_decompress_work(struct decompress_work *w) {
    int r = toku_pthread_create(&w->id, NULL, decompress_worker, w);
    assert(r == 0);
}

// Wait for the thread started by start_decompress_work to finish.
static void wait_decompress_work(struct decompress_work *w) {
    void *ret;
    int r = toku_pthread_join(w->id, &ret);
    assert(r == 0);
}

#endif
// Optional trace markers for the deserializer.
// When DO_TOKU_TRACE is nonzero, toku_trace(a) writes the string a to
// toku_trace_fd with write(). When it is zero, toku_trace(a) expands to
// nothing, so the markers cost nothing.
// NOTE(review): the deserializer opens toku_trace_fd on /dev/null, so the
// markers are presumably meant to be seen in a system call trace — confirm.
#define DO_TOKU_TRACE 0
#if DO_TOKU_TRACE
// file descriptor the markers are written to; -1 until it has been opened
static int toku_trace_fd = -1;
// write len bytes starting at cp to the trace file descriptor.
// the result of write() is not checked.
static inline void do_toku_trace(const char *cp, int len) {
write(toku_trace_fd, cp, len);
}
#define toku_trace(a) do_toku_trace(a, strlen(a))
#else
#define toku_trace(a)
#endif
int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h) { int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h) {
if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b); if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b);
if (h->panic) return h->panic; if (h->panic) return h->panic;
#if DO_TOKU_TRACE
if (toku_trace_fd == -1)
toku_trace_fd = open("/dev/null", O_WRONLY);
toku_trace("deserial start");
#endif
// get the file offset and block size for the block
DISKOFF offset, size; DISKOFF offset, size;
toku_block_get_offset_size(h->blocktable, blocknum, &offset, &size); toku_block_get_offset_size(h->blocktable, blocknum, &offset, &size);
TAGMALLOC(BRTNODE, result); TAGMALLOC(BRTNODE, result);
...@@ -401,38 +585,83 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash ...@@ -401,38 +585,83 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash
unsigned char *MALLOC_N(size, compressed_block); unsigned char *MALLOC_N(size, compressed_block);
// read the compressed block
ssize_t rlen = pread(fd, compressed_block, size, offset); ssize_t rlen = pread(fd, compressed_block, size, offset);
assert((DISKOFF)rlen == size); assert((DISKOFF)rlen == size);
// get the layout_version
unsigned char *uncompressed_header = compressed_block; unsigned char *uncompressed_header = compressed_block;
u_int32_t compressed_size = toku_ntohl(*(u_int32_t*)(&uncompressed_header[uncompressed_magic_len])); int layout_version = toku_ntohl(*(uint32_t*)(uncompressed_header+uncompressed_version_offset));
if (compressed_size<=0 || compressed_size>(1<<30)) { r = toku_db_badformat(); goto died0; }
u_int32_t uncompressed_size = toku_ntohl(*(u_int32_t*)(&uncompressed_header[uncompressed_magic_len+4])); // get the number of compressed sub blocks
if (0) printf("Block %" PRId64 " Compressed size = %u, uncompressed size=%u\n", blocknum.b, compressed_size, uncompressed_size); int n_sub_blocks;
if (uncompressed_size<=0 || uncompressed_size>(1<<30)) { r = toku_db_badformat(); goto died0; } int compression_header_offset;
if (layout_version < BRT_LAYOUT_VERSION_10) {
n_sub_blocks = 1;
compression_header_offset = uncompressed_magic_len;
} else {
n_sub_blocks = toku_ntohl(*(u_int32_t*)(&uncompressed_header[uncompressed_magic_len]));
compression_header_offset = uncompressed_magic_len + 4;
}
assert(0 < n_sub_blocks);
// verify the sizes of the compressed sub blocks
if (0 && n_sub_blocks != 1) printf("%s:%d %d\n", __FUNCTION__, __LINE__, n_sub_blocks);
unsigned char *compressed_data = compressed_block + uncompressed_magic_len + compression_header_len; struct sub_block_sizes sub_block_sizes[n_sub_blocks];
for (i=0; i<n_sub_blocks; i++) {
u_int32_t compressed_size = toku_ntohl(*(u_int32_t*)(&uncompressed_header[compression_header_offset+8*i]));
if (compressed_size<=0 || compressed_size>(1<<30)) { r = toku_db_badformat(); goto died0; }
u_int32_t uncompressed_size = toku_ntohl(*(u_int32_t*)(&uncompressed_header[compression_header_offset+8*i+4]));
if (0) printf("Block %" PRId64 " Compressed size = %u, uncompressed size=%u\n", blocknum.b, compressed_size, uncompressed_size);
if (uncompressed_size<=0 || uncompressed_size>(1<<30)) { r = toku_db_badformat(); goto died0; }
sub_block_sizes[i].compressed_size = compressed_size;
sub_block_sizes[i].uncompressed_size = uncompressed_size;
}
rc.size= uncompressed_size + uncompressed_magic_len; unsigned char *compressed_data = compressed_block + uncompressed_magic_len + get_compression_header_size(layout_version, n_sub_blocks);
size_t uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block_sizes);
rc.size= uncompressed_magic_len + uncompressed_size;
assert(rc.size>0); assert(rc.size>0);
rc.buf=toku_malloc(rc.size); rc.buf=toku_malloc(rc.size);
assert(rc.buf); assert(rc.buf);
// construct the uncompressed block from the header and compressed sub blocks
memcpy(rc.buf, uncompressed_header, uncompressed_magic_len); memcpy(rc.buf, uncompressed_header, uncompressed_magic_len);
{
uLongf destlen = uncompressed_size; // decompress the sub blocks
r = uncompress(rc.buf+uncompressed_magic_len, &destlen, void *uncompressed_data = rc.buf+uncompressed_magic_len;
compressed_data, compressed_size); struct decompress_work decompress_work[n_sub_blocks];
assert(destlen==uncompressed_size);
assert(r==Z_OK); for (i=0; i<n_sub_blocks; i++) {
init_decompress_work(&decompress_work[i], compressed_data, sub_block_sizes[i].compressed_size, uncompressed_data, sub_block_sizes[i].uncompressed_size);
if (i>0) {
#if DO_DECOMPRESS_WORKER
start_decompress_work(&decompress_work[i]);
#else
do_decompress_work(&decompress_work[i]);
#endif
}
uncompressed_data += sub_block_sizes[i].uncompressed_size;
compressed_data += sub_block_sizes[i].compressed_size;
} }
do_decompress_work(&decompress_work[0]);
#if DO_DECOMPRESS_WORKER
for (i=1; i<n_sub_blocks; i++)
wait_decompress_work(&decompress_work[i]);
#endif
toku_trace("decompress done");
if (0) printf("First 4 bytes of uncompressed data are %02x%02x%02x%02x\n", if (0) printf("First 4 bytes of uncompressed data are %02x%02x%02x%02x\n",
rc.buf[uncompressed_magic_len], rc.buf[uncompressed_magic_len+1], rc.buf[uncompressed_magic_len], rc.buf[uncompressed_magic_len+1],
rc.buf[uncompressed_magic_len+2], rc.buf[uncompressed_magic_len+3]); rc.buf[uncompressed_magic_len+2], rc.buf[uncompressed_magic_len+3]);
toku_free(compressed_block); toku_free(compressed_block);
// deserialize the uncompressed block
rc.ndone=0; rc.ndone=0;
//printf("Deserializing %lld datasize=%d\n", off, datasize); //printf("Deserializing %lld datasize=%d\n", off, datasize);
{ {
...@@ -447,8 +676,10 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash ...@@ -447,8 +676,10 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash
result->layout_version = rbuf_int(&rc); result->layout_version = rbuf_int(&rc);
{ {
switch (result->layout_version) { switch (result->layout_version) {
case BRT_LAYOUT_VERSION_9: goto ok_layout_version; case BRT_LAYOUT_VERSION_10:
// Don't support older versions. case BRT_LAYOUT_VERSION_9:
goto ok_layout_version;
// Don't support older versions.
} }
r=toku_db_badformat(); r=toku_db_badformat();
return r; return r;
...@@ -570,10 +801,12 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash ...@@ -570,10 +801,12 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash
array[i]=(OMTVALUE)le; array[i]=(OMTVALUE)le;
actual_sum += x1764_memory(le, disksize); actual_sum += x1764_memory(le, disksize);
} }
toku_trace("fill array");
u_int32_t end_of_data = rc.ndone; u_int32_t end_of_data = rc.ndone;
result->u.l.n_bytes_in_buffer += end_of_data-start_of_data + n_in_buf*OMT_ITEM_OVERHEAD; result->u.l.n_bytes_in_buffer += end_of_data-start_of_data + n_in_buf*OMT_ITEM_OVERHEAD;
actual_sum *= result->rand4fingerprint; actual_sum *= result->rand4fingerprint;
r = toku_omt_create_steal_sorted_array(&result->u.l.buffer, &array, n_in_buf, n_in_buf); r = toku_omt_create_steal_sorted_array(&result->u.l.buffer, &array, n_in_buf, n_in_buf);
toku_trace("create omt");
if (r!=0) { if (r!=0) {
toku_free(array); toku_free(array);
if (0) { died_21: toku_omt_destroy(&result->u.l.buffer); } if (0) { died_21: toku_omt_destroy(&result->u.l.buffer); }
...@@ -602,7 +835,9 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash ...@@ -602,7 +835,9 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash
if (n_read_so_far+4!=rc.size) { if (n_read_so_far+4!=rc.size) {
r = toku_db_badformat(); goto died_21; r = toku_db_badformat(); goto died_21;
} }
toku_trace("x1764 start");
uint32_t crc = x1764_memory(rc.buf, n_read_so_far); uint32_t crc = x1764_memory(rc.buf, n_read_so_far);
toku_trace("x1764");
uint32_t storedcrc = rbuf_int(&rc); uint32_t storedcrc = rbuf_int(&rc);
if (crc!=storedcrc) { if (crc!=storedcrc) {
printf("Bad CRC\n"); printf("Bad CRC\n");
...@@ -617,6 +852,7 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash ...@@ -617,6 +852,7 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash
// For height==0 we used the buf inside the OMT // For height==0 we used the buf inside the OMT
toku_free(rc.buf); toku_free(rc.buf);
} }
toku_trace("deserial done");
*brtnode = result; *brtnode = result;
//toku_verify_counts(result); //toku_verify_counts(result);
return 0; return 0;
...@@ -695,7 +931,7 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h) ...@@ -695,7 +931,7 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h)
unsigned int size = toku_serialize_brt_header_size (h); // !!! seems silly to recompute the size when the caller knew it. Do we really need the size? unsigned int size = toku_serialize_brt_header_size (h); // !!! seems silly to recompute the size when the caller knew it. Do we really need the size?
wbuf_literal_bytes(wbuf, "tokudata", 8); wbuf_literal_bytes(wbuf, "tokudata", 8);
wbuf_int (wbuf, size); wbuf_int (wbuf, size);
wbuf_int (wbuf, BRT_LAYOUT_VERSION); wbuf_int (wbuf, h->layout_version);
wbuf_int (wbuf, h->nodesize); wbuf_int (wbuf, h->nodesize);
//TODO: Use 'prelocked/unlocked' versions to make this atomic //TODO: Use 'prelocked/unlocked' versions to make this atomic
//TODO: #1463 START //TODO: #1463 START
...@@ -810,7 +1046,7 @@ deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_header ** ...@@ -810,7 +1046,7 @@ deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_header **
h->panic_string = 0; h->panic_string = 0;
h->layout_version = rbuf_int(&rc); h->layout_version = rbuf_int(&rc);
h->nodesize = rbuf_int(&rc); h->nodesize = rbuf_int(&rc);
assert(h->layout_version==BRT_LAYOUT_VERSION_9); assert(h->layout_version==BRT_LAYOUT_VERSION_9 || h->layout_version==BRT_LAYOUT_VERSION_10);
BLOCKNUM free_blocks = rbuf_blocknum(&rc); BLOCKNUM free_blocks = rbuf_blocknum(&rc);
BLOCKNUM unused_blocks = rbuf_blocknum(&rc); BLOCKNUM unused_blocks = rbuf_blocknum(&rc);
h->n_named_roots = rbuf_int(&rc); h->n_named_roots = rbuf_int(&rc);
......
...@@ -607,7 +607,8 @@ initialize_empty_brtnode (BRT t, BRTNODE n, BLOCKNUM nodename, int height) ...@@ -607,7 +607,8 @@ initialize_empty_brtnode (BRT t, BRTNODE n, BLOCKNUM nodename, int height)
n->thisnodename = nodename; n->thisnodename = nodename;
n->disk_lsn.lsn = 0; // a new one can always be 0. n->disk_lsn.lsn = 0; // a new one can always be 0.
n->log_lsn = n->disk_lsn; n->log_lsn = n->disk_lsn;
n->layout_version = BRT_LAYOUT_VERSION; assert(t->h->layout_version != 0);
n->layout_version = t->h->layout_version;
n->height = height; n->height = height;
n->rand4fingerprint = random(); n->rand4fingerprint = random();
n->local_fingerprint = 0; n->local_fingerprint = 0;
...@@ -2728,6 +2729,8 @@ int toku_brt_alloc_init_header(BRT t, const char *dbname) { ...@@ -2728,6 +2729,8 @@ int toku_brt_alloc_init_header(BRT t, const char *dbname) {
return r; return r;
} }
t->h->layout_version = BRT_LAYOUT_VERSION;
if ((MALLOC_N(1, t->h->flags_array))==0) { r = errno; if (0) { died3: toku_free(t->h->flags_array); } goto died2; } if ((MALLOC_N(1, t->h->flags_array))==0) { r = errno; if (0) { died3: toku_free(t->h->flags_array); } goto died2; }
if (dbname) { if (dbname) {
......
...@@ -222,6 +222,30 @@ dump_fragmentation(int f, struct brt_header *h) { ...@@ -222,6 +222,30 @@ dump_fragmentation(int f, struct brt_header *h) {
printf("fragmentation: %.1f%%\n", 100. * ((double)fragsizes / (double)(fragsizes + blocksizes))); printf("fragmentation: %.1f%%\n", 100. * ((double)fragsizes / (double)(fragsizes + blocksizes)));
} }
// Print size bytes starting at vp as hexadecimal text on stdout.
// Each output line holds up to 32 bytes and starts with the decimal position
// of its first byte, counted from offset. Bytes are printed as two upper
// case hex digits, with a space after every 4 bytes. A newline is always
// printed at the end, so a dump whose size is a multiple of 32 ends with an
// empty line.
static void
hex_dump(unsigned char *vp, u_int64_t offset, u_int64_t size) {
    u_int64_t done = 0;   // number of bytes printed so far
    while (done < size) {
        if (done % 32 == 0)
            printf("%"PRIu64": ", offset+done);
        printf("%2.2X", vp[done]);
        done++;
        if (done % 4 == 0)
            printf(" ");
        if (done % 32 == 0)
            printf("\n");
    }
    printf("\n");
}
// Read size bytes at the given offset of the file descriptor f and print
// them with hex_dump.
// Nothing is dumped unless all size bytes could be read. Instead of failing
// silently, a message is printed when the buffer cannot be allocated or when
// the read fails or returns fewer bytes than requested.
static void
dump_file(int f, u_int64_t offset, u_int64_t size) {
    unsigned char *vp = toku_malloc(size);
    // a zero byte request may legitimately yield no buffer
    if (vp == NULL && size != 0) {
        printf("file: cannot allocate %"PRIu64" bytes\n", size);
        return;
    }
    // keep the result signed so that the -1 returned on error is not
    // mistaken for a huge byte count
    ssize_t r = pread(f, vp, size, offset);
    if (r >= 0 && (u_int64_t) r == size)
        hex_dump(vp, offset, size);
    else
        printf("file: cannot read %"PRIu64" bytes at offset %"PRIu64"\n", size, offset);
    toku_free(vp);
}
static void static void
readline (char *line, int maxline) { readline (char *line, int maxline) {
int i = 0; int i = 0;
...@@ -278,7 +302,7 @@ main (int argc, const char *argv[]) { ...@@ -278,7 +302,7 @@ main (int argc, const char *argv[]) {
readline(line, maxline); readline(line, maxline);
if (strcmp(line, "") == 0) if (strcmp(line, "") == 0)
break; break;
enum { maxfields = 2 }; const int maxfields = 4;
char *fields[maxfields]; char *fields[maxfields];
int nfields = split_fields(line, fields, maxfields); int nfields = split_fields(line, fields, maxfields);
if (nfields == 0) if (nfields == 0)
...@@ -298,6 +322,17 @@ main (int argc, const char *argv[]) { ...@@ -298,6 +322,17 @@ main (int argc, const char *argv[]) {
dump_block_translation(h, offset); dump_block_translation(h, offset);
} else if (strcmp(fields[0], "fragmentation") == 0) { } else if (strcmp(fields[0], "fragmentation") == 0) {
dump_fragmentation(f, h); dump_fragmentation(f, h);
} else if (strcmp(fields[0], "file") == 0 && nfields == 3) {
u_int64_t offset, size;
if (strncmp(fields[1], "0x", 2) == 0)
offset = strtoll(fields[1], NULL, 16);
else
offset = strtoll(fields[1], NULL, 10);
if (strncmp(fields[2], "0x", 2) == 0)
size = strtoll(fields[2], NULL, 16);
else
size = strtoll(fields[2], NULL, 10);
dump_file(f, offset, size);
} else if (strcmp(fields[0], "quit") == 0 || strcmp(fields[0], "q") == 0) { } else if (strcmp(fields[0], "quit") == 0 || strcmp(fields[0], "q") == 0) {
break; break;
} }
......
...@@ -36,6 +36,7 @@ REGRESSION_TESTS_RAW = \ ...@@ -36,6 +36,7 @@ REGRESSION_TESTS_RAW = \
block_allocator_test \ block_allocator_test \
bread-test \ bread-test \
brt-serialize-test \ brt-serialize-test \
brt-serialize-sub-block-test \
brt-test \ brt-test \
brt-test-cursor \ brt-test-cursor \
brt-test-cursor-2 \ brt-test-cursor-2 \
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
// Check that n rows survive being written to a brt file and read back.
//  - create a brt with 4 meg nodes and put n rows into it
//  - close the brt, which writes it to the file
//  - reopen the brt and walk a cursor over the rows, checking every key and
//    value and that exactly n rows are found
// Row i has the 4 byte key toku_htonl(i) and the 4 byte value i.
static void test_sub_block(int n) {
    if (verbose) printf("%s:%d %d\n", __FUNCTION__, __LINE__, n);

    const char fname[]= __FILE__ ".brt";
    const int nodesize = 4*1024*1024;

    TOKUTXN const null_txn = 0;
    DB * const null_db = 0;
    char * const null_dbname = 0;

    int error;
    CACHETABLE ct;
    BRT brt;
    int i;

    unlink_file_and_bit(fname);

    error = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER);
    assert(error == 0);

    error = toku_open_brt(fname, null_dbname, TRUE, &brt, nodesize, ct, null_txn, toku_default_compare_fun, null_db);
    assert(error == 0);

    // insert keys 0, 1, 2, .. (n-1)
    // NOTE(review): the keys are stored in network byte order, presumably so
    // that the default compare function orders them numerically — confirm.
    for (i=0; i<n; i++) {
        int k = toku_htonl(i);
        int v = i;
        DBT key, val;
        toku_fill_dbt(&key, &k, sizeof k);
        toku_fill_dbt(&val, &v, sizeof v);
        error = toku_brt_insert(brt, &key, &val, 0);
        assert(error == 0);
    }

    // write to the file
    error = toku_close_brt(brt, 0, 0);
    assert(error == 0);

    // verify the brt by walking a cursor through the rows
    error = toku_open_brt(fname, null_dbname, FALSE, &brt, nodesize, ct, null_txn, toku_default_compare_fun, null_db);
    assert(error == 0);

    BRT_CURSOR cursor;
    error = toku_brt_cursor(brt, &cursor);
    assert(error == 0);

    for (i=0; ; i++) {
        // build the expected key the same way the insert loop built it
        int k = toku_htonl(i);
        int v = i;
        struct check_pair pair = {sizeof k, &k, sizeof v, &v, 0};
        error = toku_brt_cursor_get(cursor, NULL, NULL, lookup_checkf, &pair, DB_NEXT, null_txn);
        if (error != 0) {
            // ran off the end: the callback must not have been called
            assert(pair.call_count==0);
            break;
        }
        assert(pair.call_count==1);
    }
    // the cursor must have returned exactly the n rows that were inserted
    assert(i == n);

    error = toku_brt_cursor_close(cursor);
    assert(error == 0);

    error = toku_close_brt(brt, 0, 0);
    assert(error == 0);

    error = toku_cachetable_close(&ct);
    assert(error == 0);
}
// Run test_sub_block with 1 row, with one row less than rowspermeg, and
// with 1 to 7 times rowspermeg rows.
// NOTE(review): rowspermeg assumes a row takes 32 bytes, so the larger runs
// are presumably meant to produce nodes with several sub blocks — confirm.
int test_main (int argc , const char *argv[]) {
    default_parse_args(argc, argv);

    enum {
        meg = 1024*1024,
        row = 32,
        rowspermeg = meg/row
    };

    test_sub_block(1);
    test_sub_block(rowspermeg-1);

    int megs = 1;
    while (megs < 8) {
        test_sub_block(rowspermeg*megs);
        megs++;
    }

    if (verbose) printf("test ok\n");
    return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment