Commit 74d1cba1 authored by John Esmet's avatar John Esmet

FT-585 Move serialize and compression size calculations around so we can malloc

one large buffer to serialize a node instead of many smaller ones, which
should hopefully put less pressure on jemalloc during checkpoints etc.
parent a3036d69
...@@ -165,11 +165,12 @@ void toku_compress (enum toku_compression_method a, ...@@ -165,11 +165,12 @@ void toku_compress (enum toku_compression_method a,
assert(1 <= *destLen); assert(1 <= *destLen);
*destLen = 1; *destLen = 1;
} else { } else {
qlz_state_compress *XCALLOC(qsc); toku::scoped_calloc qsc_buf(sizeof(qlz_state_compress));
qlz_state_compress *qsc = reinterpret_cast<qlz_state_compress *>(qsc_buf.get());
size_t actual_destlen = qlz_compress(source, (char*)(dest+1), sourceLen, qsc); size_t actual_destlen = qlz_compress(source, (char*)(dest+1), sourceLen, qsc);
assert(actual_destlen +1 <= *destLen); assert(actual_destlen + 1 <= *destLen);
*destLen = actual_destlen+1; // add one for the rfc1950-style header byte. // add one for the rfc1950-style header byte.
toku_free(qsc); *destLen = actual_destlen + 1;
} }
// Fill in that first byte // Fill in that first byte
dest[0] = TOKU_QUICKLZ_METHOD + (QLZ_COMPRESSION_LEVEL << 4); dest[0] = TOKU_QUICKLZ_METHOD + (QLZ_COMPRESSION_LEVEL << 4);
......
...@@ -376,13 +376,11 @@ static void serialize_child_buffer(NONLEAF_CHILDINFO bnc, struct wbuf *wb) { ...@@ -376,13 +376,11 @@ static void serialize_child_buffer(NONLEAF_CHILDINFO bnc, struct wbuf *wb) {
// //
static void static void
serialize_ftnode_partition(FTNODE node, int i, struct sub_block *sb) { serialize_ftnode_partition(FTNODE node, int i, struct sub_block *sb) {
if (sb->uncompressed_ptr == NULL) { // Caller should have allocated memory.
assert(sb->uncompressed_size == 0); invariant_notnull(sb->uncompressed_ptr);
sb->uncompressed_size = serialize_ftnode_partition_size(node,i); invariant(sb->uncompressed_size > 0);
sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size); paranoid_invariant(sb->uncompressed_size == serialize_ftnode_partition_size(node, i));
} else {
assert(sb->uncompressed_size > 0);
}
// //
// Now put the data into sb->uncompressed_ptr // Now put the data into sb->uncompressed_ptr
// //
...@@ -413,10 +411,10 @@ serialize_ftnode_partition(FTNODE node, int i, struct sub_block *sb) { ...@@ -413,10 +411,10 @@ serialize_ftnode_partition(FTNODE node, int i, struct sub_block *sb) {
// //
static void static void
compress_ftnode_sub_block(struct sub_block *sb, enum toku_compression_method method) { compress_ftnode_sub_block(struct sub_block *sb, enum toku_compression_method method) {
assert(sb->compressed_ptr == NULL); invariant(sb->compressed_ptr != nullptr);
set_compressed_size_bound(sb, method); invariant(sb->compressed_size_bound > 0);
// add 8 extra bytes, 4 for compressed size, 4 for decompressed size paranoid_invariant(sb->compressed_size_bound == toku_compress_bound(method, sb->uncompressed_size));
sb->compressed_ptr = toku_xmalloc(sb->compressed_size_bound + 8);
// //
// This probably seems a bit complicated. Here is what is going on. // This probably seems a bit complicated. Here is what is going on.
// In TokuDB 5.0, sub_blocks were compressed and the compressed data // In TokuDB 5.0, sub_blocks were compressed and the compressed data
...@@ -482,13 +480,12 @@ serialize_ftnode_info_size(FTNODE node) ...@@ -482,13 +480,12 @@ serialize_ftnode_info_size(FTNODE node)
return retval; return retval;
} }
static void serialize_ftnode_info(FTNODE node, static void serialize_ftnode_info(FTNODE node, SUB_BLOCK sb) {
SUB_BLOCK sb // output // Memory must have been allocated by our caller.
) { invariant(sb->uncompressed_size > 0);
assert(sb->uncompressed_size == 0); invariant_notnull(sb->uncompressed_ptr);
assert(sb->uncompressed_ptr == NULL); paranoid_invariant(sb->uncompressed_size == serialize_ftnode_info_size(node));
sb->uncompressed_size = serialize_ftnode_info_size(node);
sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size);
struct wbuf wb; struct wbuf wb;
wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size); wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size);
...@@ -703,24 +700,40 @@ int toku_serialize_ftnode_to_memory(FTNODE node, ...@@ -703,24 +700,40 @@ int toku_serialize_ftnode_to_memory(FTNODE node,
// Each partition represents a compressed sub block // Each partition represents a compressed sub block
// For internal nodes, a sub block is a message buffer // For internal nodes, a sub block is a message buffer
// For leaf nodes, a sub block is a basement node // For leaf nodes, a sub block is a basement node
toku::scoped_malloc sb_buf(sizeof(struct sub_block) * npartitions); toku::scoped_calloc sb_buf(sizeof(struct sub_block) * npartitions);
struct sub_block *sb = reinterpret_cast<struct sub_block *>(sb_buf.get()); struct sub_block *sb = reinterpret_cast<struct sub_block *>(sb_buf.get());
XREALLOC_N(npartitions, *ndd); XREALLOC_N(npartitions, *ndd);
struct sub_block sb_node_info;
for (int i = 0; i < npartitions; i++) {
sub_block_init(&sb[i]);;
}
sub_block_init(&sb_node_info);
// //
// First, let's serialize and compress the individual sub blocks // First, let's serialize and compress the individual sub blocks
// //
struct serialize_times st;
memset(&st, 0, sizeof(st)); // determine how large our serialization and compression buffers need to be.
size_t serialize_buf_size = 0, compression_buf_size = 0;
for (int i = 0; i < node->n_children; i++) {
sb[i].uncompressed_size = serialize_ftnode_partition_size(node, i);
sb[i].compressed_size_bound = toku_compress_bound(compression_method, sb[i].uncompressed_size);
serialize_buf_size += sb[i].uncompressed_size;
compression_buf_size += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
}
// give each sub block a base pointer to enough buffer space for serialization and compression
toku::scoped_malloc serialize_buf(serialize_buf_size);
toku::scoped_malloc compression_buf(compression_buf_size);
for (size_t i = 0, uncompressed_offset = 0, compressed_offset = 0; i < (size_t) node->n_children; i++) {
sb[i].uncompressed_ptr = reinterpret_cast<char *>(serialize_buf.get()) + uncompressed_offset;
sb[i].compressed_ptr = reinterpret_cast<char *>(compression_buf.get()) + compressed_offset;
uncompressed_offset += sb[i].uncompressed_size;
compressed_offset += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
invariant(uncompressed_offset <= serialize_buf_size);
invariant(compressed_offset <= compression_buf_size);
}
// do the actual serialization now that we have buffer space
struct serialize_times st = { 0, 0 };
if (in_parallel) { if (in_parallel) {
serialize_and_compress_in_parallel(node, npartitions, compression_method, sb, &st); serialize_and_compress_in_parallel(node, npartitions, compression_method, sb, &st);
} } else {
else {
serialize_and_compress_serially(node, npartitions, compression_method, sb, &st); serialize_and_compress_serially(node, npartitions, compression_method, sb, &st);
} }
...@@ -728,16 +741,31 @@ int toku_serialize_ftnode_to_memory(FTNODE node, ...@@ -728,16 +741,31 @@ int toku_serialize_ftnode_to_memory(FTNODE node,
// Now lets create a sub-block that has the common node information, // Now lets create a sub-block that has the common node information,
// This does NOT include the header // This does NOT include the header
// //
// determine how large our serialization and copmression buffers need to be
struct sub_block sb_node_info;
sub_block_init(&sb_node_info);
size_t sb_node_info_uncompressed_size = serialize_ftnode_info_size(node);
size_t sb_node_info_compressed_size_bound = toku_compress_bound(compression_method, sb_node_info_uncompressed_size);
toku::scoped_malloc sb_node_info_uncompressed_buf(sb_node_info_uncompressed_size);
toku::scoped_malloc sb_node_info_compressed_buf(sb_node_info_compressed_size_bound + 8); // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
sb_node_info.uncompressed_size = sb_node_info_uncompressed_size;
sb_node_info.uncompressed_ptr = sb_node_info_uncompressed_buf.get();
sb_node_info.compressed_size_bound = sb_node_info_compressed_size_bound;
sb_node_info.compressed_ptr = sb_node_info_compressed_buf.get();
// do the actual serialization now that we have buffer space
serialize_and_compress_sb_node_info(node, &sb_node_info, compression_method, &st); serialize_and_compress_sb_node_info(node, &sb_node_info, compression_method, &st);
//
// At this point, we have compressed each of our pieces into individual sub_blocks,
// we can put the header and all the subblocks into a single buffer and return it.
//
// update the serialize times, ignore the header for simplicity. we captured all // update the serialize times, ignore the header for simplicity. we captured all
// of the partitions' serialize times so that's probably good enough. // of the partitions' serialize times so that's probably good enough.
toku_ft_status_update_serialize_times(node, st.serialize_time, st.compress_time); toku_ft_status_update_serialize_times(node, st.serialize_time, st.compress_time);
// now we have compressed each of our pieces into individual sub_blocks,
// we can put the header and all the subblocks into a single buffer
// and return it.
// The total size of the node is: // The total size of the node is:
// size of header + disk size of the n+1 sub_block's created above // size of header + disk size of the n+1 sub_block's created above
uint32_t total_node_size = (serialize_node_header_size(node) // uncompressed header uint32_t total_node_size = (serialize_node_header_size(node) // uncompressed header
...@@ -755,11 +783,10 @@ int toku_serialize_ftnode_to_memory(FTNODE node, ...@@ -755,11 +783,10 @@ int toku_serialize_ftnode_to_memory(FTNODE node,
total_uncompressed_size += sb[i].uncompressed_size + 4; total_uncompressed_size += sb[i].uncompressed_size + 4;
} }
// now create the final serialized node
uint32_t total_buffer_size = roundup_to_multiple(512, total_node_size); // make the buffer be 512 bytes. uint32_t total_buffer_size = roundup_to_multiple(512, total_node_size); // make the buffer be 512 bytes.
char *XMALLOC_N_ALIGNED(512, total_buffer_size, data); char *XMALLOC_N_ALIGNED(512, total_buffer_size, data);
char *curr_ptr = data; char *curr_ptr = data;
// now create the final serialized node
// write the header // write the header
struct wbuf wb; struct wbuf wb;
...@@ -783,28 +810,15 @@ int toku_serialize_ftnode_to_memory(FTNODE node, ...@@ -783,28 +810,15 @@ int toku_serialize_ftnode_to_memory(FTNODE node,
curr_ptr += sizeof(sb[i].xsum); curr_ptr += sizeof(sb[i].xsum);
} }
// Zero the rest of the buffer // Zero the rest of the buffer
for (uint32_t i=total_node_size; i<total_buffer_size; i++) { memset(data + total_node_size, 0, total_buffer_size - total_node_size);
data[i]=0;
}
assert(curr_ptr - data == total_node_size); assert(curr_ptr - data == total_node_size);
*bytes_to_write = data; *bytes_to_write = data;
*n_bytes_to_write = total_buffer_size; *n_bytes_to_write = total_buffer_size;
*n_uncompressed_bytes = total_uncompressed_size; *n_uncompressed_bytes = total_uncompressed_size;
// invariant(*n_bytes_to_write % 512 == 0);
// now that node has been serialized, go through sub_block's and free invariant(reinterpret_cast<unsigned long long>(*bytes_to_write) % 512 == 0);
// memory
//
toku_free(sb_node_info.compressed_ptr);
toku_free(sb_node_info.uncompressed_ptr);
for (int i = 0; i < npartitions; i++) {
toku_free(sb[i].compressed_ptr);
toku_free(sb[i].uncompressed_ptr);
}
assert(0 == (*n_bytes_to_write)%512);
assert(0 == ((unsigned long long)(*bytes_to_write))%512);
return 0; return 0;
} }
...@@ -1578,8 +1592,9 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode, ...@@ -1578,8 +1592,9 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode,
} }
// Now decompress the subblock // Now decompress the subblock
sb_node_info.uncompressed_ptr = toku_xmalloc(sb_node_info.uncompressed_size);
{ {
toku::scoped_malloc sb_node_info_buf(sb_node_info.uncompressed_size);
sb_node_info.uncompressed_ptr = sb_node_info_buf.get();
tokutime_t decompress_t0 = toku_time_now(); tokutime_t decompress_t0 = toku_time_now();
toku_decompress( toku_decompress(
(Bytef *) sb_node_info.uncompressed_ptr, (Bytef *) sb_node_info.uncompressed_ptr,
...@@ -1589,16 +1604,13 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode, ...@@ -1589,16 +1604,13 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode,
); );
tokutime_t decompress_t1 = toku_time_now(); tokutime_t decompress_t1 = toku_time_now();
decompress_time = decompress_t1 - decompress_t0; decompress_time = decompress_t1 - decompress_t0;
}
// at this point sb->uncompressed_ptr stores the serialized node info. // at this point sb->uncompressed_ptr stores the serialized node info.
r = deserialize_ftnode_info(&sb_node_info, node); r = deserialize_ftnode_info(&sb_node_info, node);
if (r != 0) { if (r != 0) {
goto cleanup; goto cleanup;
} }
}
toku_free(sb_node_info.uncompressed_ptr);
sb_node_info.uncompressed_ptr = NULL;
// Now we have the ftnode_info. We have a bunch more stuff in the // Now we have the ftnode_info. We have a bunch more stuff in the
// rbuf, so we might be able to store the compressed data for some // rbuf, so we might be able to store the compressed data for some
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment