Commit 616ba2fb authored by Yoni Fogel's avatar Yoni Fogel

refs #5222 Implement block cloning for rollback log nodes

git-svn-id: file:///svn/toku/tokudb@47585 c7de825b-a66e-492c-adef-691d508d4ae1
parent 36d9dc7b
......@@ -525,9 +525,9 @@ int toku_serialize_ftnode_to_memory (FTNODE node,
/*out*/ size_t *n_bytes_to_write,
/*out*/ char **bytes_to_write);
int toku_serialize_ftnode_to(int fd, BLOCKNUM, FTNODE node, FTNODE_DISK_DATA* ndd, bool do_rebalancing, FT h, bool for_checkpoint);
int toku_serialize_rollback_log_to (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE log,
FT h,
bool for_checkpoint);
int toku_serialize_rollback_log_to (int fd, ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized_log, bool is_serialized,
FT h, bool for_checkpoint);
void toku_serialize_rollback_log_to_memory_uncompressed(ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized);
int toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, uint32_t fullhash, ROLLBACK_LOG_NODE *logp, FT h);
int toku_deserialize_bp_from_disk(FTNODE node, FTNODE_DISK_DATA ndd, int childnum, int fd, struct ftnode_fetch_extra* bfe);
int toku_deserialize_bp_from_compressed(FTNODE node, int childnum, DESCRIPTOR desc, ft_compare_func cmp);
......
......@@ -2583,48 +2583,53 @@ serialize_uncompressed_block_to_memory(char * uncompressed_buf,
}
static int
toku_serialize_rollback_log_to_memory (ROLLBACK_LOG_NODE log,
enum toku_compression_method method,
/*out*/ size_t *n_bytes_to_write,
/*out*/ char **bytes_to_write) {
void
toku_serialize_rollback_log_to_memory_uncompressed(ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized) {
// get the size of the serialized node
size_t calculated_size = serialize_rollback_log_size(log);
serialized->len = calculated_size;
serialized->n_sub_blocks = 0;
// choose sub block parameters
int n_sub_blocks = 0, sub_block_size = 0;
int sub_block_size = 0;
size_t data_size = calculated_size - node_header_overhead;
choose_sub_block_size(data_size, max_sub_blocks, &sub_block_size, &n_sub_blocks);
lazy_assert(0 < n_sub_blocks && n_sub_blocks <= max_sub_blocks);
choose_sub_block_size(data_size, max_sub_blocks, &sub_block_size, &serialized->n_sub_blocks);
lazy_assert(0 < serialized->n_sub_blocks && serialized->n_sub_blocks <= max_sub_blocks);
lazy_assert(sub_block_size > 0);
// set the initial sub block size for all of the sub blocks
struct sub_block sub_block[n_sub_blocks];
for (int i = 0; i < n_sub_blocks; i++)
sub_block_init(&sub_block[i]);
set_all_sub_block_sizes(data_size, sub_block_size, n_sub_blocks, sub_block);
for (int i = 0; i < serialized->n_sub_blocks; i++)
sub_block_init(&serialized->sub_block[i]);
set_all_sub_block_sizes(data_size, sub_block_size, serialized->n_sub_blocks, serialized->sub_block);
// allocate space for the serialized node
char *XMALLOC_N(calculated_size, buf);
XMALLOC_N(calculated_size, serialized->data);
// serialize the node into buf
serialize_rollback_log_node_to_buf(log, buf, calculated_size, n_sub_blocks, sub_block);
//Compress and malloc buffer to write
int result = serialize_uncompressed_block_to_memory(buf, n_sub_blocks, sub_block, method,
n_bytes_to_write, bytes_to_write);
toku_free(buf);
return result;
serialize_rollback_log_node_to_buf(log, serialized->data, calculated_size, serialized->n_sub_blocks, serialized->sub_block);
serialized->blocknum = log->blocknum;
}
int
toku_serialize_rollback_log_to (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE log,
FT h,
bool for_checkpoint) {
toku_serialize_rollback_log_to (int fd, ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized_log, bool is_serialized,
FT h, bool for_checkpoint) {
size_t n_to_write;
char *compressed_buf;
struct serialized_rollback_log_node serialized_local;
if (is_serialized) {
invariant_null(log);
} else {
invariant_null(serialized_log);
serialized_log = &serialized_local;
toku_serialize_rollback_log_to_memory_uncompressed(log, serialized_log);
}
BLOCKNUM blocknum = serialized_log->blocknum;
{
int r = toku_serialize_rollback_log_to_memory(log, h->h->compression_method, &n_to_write, &compressed_buf);
//Compress and malloc buffer to write
int r = serialize_uncompressed_block_to_memory(serialized_log->data,
serialized_log->n_sub_blocks, serialized_log->sub_block,
h->h->compression_method, &n_to_write, &compressed_buf);
if (r!=0) return r;
}
......@@ -2636,7 +2641,10 @@ toku_serialize_rollback_log_to (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE log
toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
}
toku_free(compressed_buf);
log->dirty = 0; // See #1957. Must set the node to be clean after serializing it so that it doesn't get written again on the next checkpoint or eviction.
if (!is_serialized) {
toku_static_serialized_rollback_log_destroy(&serialized_local);
log->dirty = 0; // See #1957. Must set the node to be clean after serializing it so that it doesn't get written again on the next checkpoint or eviction.
}
return 0;
}
......
......@@ -252,6 +252,7 @@ typedef void (*remove_ft_ref_callback)(FT, void*);
typedef struct memarena *MEMARENA;
typedef struct rollback_log_node *ROLLBACK_LOG_NODE;
typedef struct serialized_rollback_log_node *SERIALIZED_ROLLBACK_LOG_NODE;
//
// Types of snapshots that can be taken by a tokutxn
......
......@@ -26,16 +26,24 @@ rollback_log_destroy(ROLLBACK_LOG_NODE log) {
// On success return nbytes.
void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname,
void *rollback_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size,
bool write_me, bool keep_me, bool for_checkpoint, bool UU(is_clone)) {
bool write_me, bool keep_me, bool for_checkpoint, bool is_clone) {
int r;
ROLLBACK_LOG_NODE CAST_FROM_VOIDP(log, rollback_v);
ROLLBACK_LOG_NODE log = nullptr;
SERIALIZED_ROLLBACK_LOG_NODE serialized = nullptr;
if (is_clone) {
CAST_FROM_VOIDP(serialized, rollback_v);
invariant(serialized->blocknum.b == logname.b);
}
else {
CAST_FROM_VOIDP(log, rollback_v);
invariant(log->blocknum.b == logname.b);
}
FT CAST_FROM_VOIDP(h, extraargs);
assert(log->blocknum.b==logname.b);
if (write_me && !h->panic) {
assert(h->cf == cachefile);
r = toku_serialize_rollback_log_to(fd, log->blocknum, log, h, for_checkpoint);
r = toku_serialize_rollback_log_to(fd, log, serialized, is_clone, h, for_checkpoint);
if (r) {
if (h->panic==0) {
char *e = strerror(r);
......@@ -49,7 +57,12 @@ void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname
}
*new_size = size;
if (!keep_me) {
rollback_log_destroy(log);
if (is_clone) {
toku_serialized_rollback_log_destroy(serialized);
}
else {
rollback_log_destroy(log);
}
}
}
......@@ -119,3 +132,20 @@ int toku_rollback_cleaner_callback (
return 0;
}
void toku_rollback_clone_callback(
void* value_data,
void** cloned_value_data,
PAIR_ATTR* new_attr,
bool UU(for_checkpoint),
void* UU(write_extraargs)
)
{
ROLLBACK_LOG_NODE CAST_FROM_VOIDP(log, value_data);
SERIALIZED_ROLLBACK_LOG_NODE XMALLOC(serialized);
toku_serialize_rollback_log_to_memory_uncompressed(log, serialized);
new_attr->is_valid = false;
*cloned_value_data = serialized;
}
......@@ -28,6 +28,8 @@ int toku_rollback_pe_callback (
) ;
bool toku_rollback_pf_req_callback(void* UU(ftnode_pv), void* UU(read_extraargs)) ;
int toku_rollback_pf_callback(void* UU(ftnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* UU(sizep));
void toku_rollback_clone_callback(void* value_data, void** cloned_value_data, PAIR_ATTR* new_attr, bool for_checkpoint, void* write_extraargs);
int toku_rollback_cleaner_callback (
void* UU(ftnode_pv),
BLOCKNUM UU(blocknum),
......@@ -41,7 +43,7 @@ static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_rollback_log(FT
wc.pe_est_callback = toku_rollback_pe_est_callback;
wc.pe_callback = toku_rollback_pe_callback;
wc.cleaner_callback = toku_rollback_cleaner_callback;
wc.clone_callback = NULL;
wc.clone_callback = toku_rollback_clone_callback;
wc.write_extraargs = h;
return wc;
}
......
......@@ -8,6 +8,7 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "omt.h"
#include "sub_block.h"
void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint);
......@@ -81,4 +82,24 @@ struct rollback_log_node {
PAIR ct_pair;
};
struct serialized_rollback_log_node {
char *data;
uint32_t len;
int n_sub_blocks;
BLOCKNUM blocknum;
struct sub_block sub_block[max_sub_blocks];
};
static inline void
toku_static_serialized_rollback_log_destroy(SERIALIZED_ROLLBACK_LOG_NODE log) {
toku_free(log->data);
}
static inline void
toku_serialized_rollback_log_destroy(SERIALIZED_ROLLBACK_LOG_NODE log) {
toku_static_serialized_rollback_log_destroy(log);
toku_free(log);
}
#endif // TOKU_ROLLBACK_H
......@@ -7,7 +7,7 @@
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <compress.h>
#include "compress.h"
#include "fttypes.h"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment