Commit baf1831f authored by Zardosht Kasheff, committed by Yoni Fogel

refs #5560, merge to main

git-svn-id: file:///svn/toku/tokudb@49128 c7de825b-a66e-492c-adef-691d508d4ae1
parent 1f2665b7
@@ -61,6 +61,7 @@ set(FT_SOURCES
   rollback
   rollback-apply
   rollback-ct-callbacks
+  rollback_log_node_cache
   roll
   sub_block
   txn
...
This diff is collapsed.
@@ -5,7 +5,7 @@
 #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
 #include <toku_portability.h>
 #include "ft-internal.h" // ugly but pragmatic, need access to dirty bits while holding translation lock
 #include "fttypes.h"
 #include "block_table.h"
 #include "memory.h"
@@ -145,8 +145,8 @@ copy_translation(struct translation * dst, struct translation * src, enum transl
     dst->length_of_array = dst->smallest_never_used_blocknum.b;
     XMALLOC_N(dst->length_of_array, dst->block_translation);
     memcpy(dst->block_translation,
            src->block_translation,
            dst->length_of_array * sizeof(*dst->block_translation));
     //New version of btt is not yet stored on disk.
     dst->block_translation[RESERVED_BLOCKNUM_TRANSLATION].size = 0;
     dst->block_translation[RESERVED_BLOCKNUM_TRANSLATION].u.diskoff = diskoff_unused;
@@ -241,7 +241,7 @@ cleanup_failed_checkpoint (BLOCK_TABLE bt) {
     for (i = 0; i < t->length_of_array; i++) {
         struct block_translation_pair *pair = &t->block_translation[i];
         if (pair->size > 0 &&
             !translation_prevents_freeing(&bt->current, make_blocknum(i), pair) &&
             !translation_prevents_freeing(&bt->checkpointed, make_blocknum(i), pair)) {
             PRNTF("free", i, pair->size, pair->u.diskoff, bt);
@@ -368,7 +368,7 @@ static int64_t
 calculate_size_on_disk (struct translation *t) {
     int64_t r = (8 + // smallest_never_used_blocknum
                  8 + // blocknum_freelist_head
                  t->smallest_never_used_blocknum.b * 16 + // Array
                  4); // 4 for checksum
     return r;
 }
@@ -400,18 +400,21 @@ PRNTF("Freed", b.b, old_pair.size, old_pair.u.diskoff, bt);
         block_allocator_free_block(bt->block_allocator, old_pair.u.diskoff);
     }
-    uint64_t allocator_offset;
-    //Allocate a new block
-    block_allocator_alloc_block(bt->block_allocator, size, &allocator_offset);
+    uint64_t allocator_offset = diskoff_unused;
+    t->block_translation[b.b].size = size;
+    if (size > 0) {
+        // Allocate a new block if the size is greater than 0,
+        // if the size is just 0, offset will be set to diskoff_unused
+        block_allocator_alloc_block(bt->block_allocator, size, &allocator_offset);
+    }
     t->block_translation[b.b].u.diskoff = allocator_offset;
-    t->block_translation[b.b].size = size;
     *offset = allocator_offset;
 PRNTF("New", b.b, t->block_translation[b.b].size, t->block_translation[b.b].u.diskoff, bt);
     //Update inprogress btt if appropriate (if called because Pending bit is set).
     if (for_checkpoint) {
         assert(b.b < bt->inprogress.length_of_array);
         bt->inprogress.block_translation[b.b] = t->block_translation[b.b];
     }
 }
@@ -630,15 +633,36 @@ toku_block_verify_no_free_blocknums(BLOCK_TABLE bt) {
     assert(bt->current.blocknum_freelist_head.b == freelist_null.b);
 }

+// Frees blocknums that have a size of 0 and unused diskoff
+// Currently used for eliminating unused cached rollback log nodes
+void
+toku_free_unused_blocknums(BLOCK_TABLE bt, BLOCKNUM root) {
+    lock_for_blocktable(bt);
+    int64_t smallest = bt->current.smallest_never_used_blocknum.b;
+    for (int64_t i=RESERVED_BLOCKNUMS; i < smallest; i++) {
+        if (i == root.b) {
+            continue;
+        }
+        BLOCKNUM b = make_blocknum(i);
+        if (bt->current.block_translation[b.b].size == 0) {
+            invariant(bt->current.block_translation[b.b].u.diskoff == diskoff_unused);
+            free_blocknum_in_translation(&bt->current, b);
+        }
+    }
+    unlock_for_blocktable(bt);
+}
+
 //Verify there are no data blocks except root.
 void
-toku_block_verify_no_data_blocks_except_root_unlocked(BLOCK_TABLE bt, BLOCKNUM root) {
+toku_block_verify_no_data_blocks_except_root(BLOCK_TABLE bt, BLOCKNUM root) {
     lock_for_blocktable(bt);
-    //Relies on checkpoint having used optimize_translation
     assert(root.b >= RESERVED_BLOCKNUMS);
-    assert(bt->current.smallest_never_used_blocknum.b == root.b + 1);
-    int64_t i;
-    for (i=RESERVED_BLOCKNUMS; i < root.b; i++) {
+    int64_t smallest = bt->current.smallest_never_used_blocknum.b;
+    for (int64_t i=RESERVED_BLOCKNUMS; i < smallest; i++) {
+        if (i == root.b) {
+            continue;
+        }
         BLOCKNUM b = make_blocknum(i);
         assert(bt->current.block_translation[b.b].size == size_is_free);
     }
...
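The net effect of the block_table.cc hunks: a blocknum written with size 0 no longer consumes allocator space; it is marked with the diskoff_unused sentinel, and toku_free_unused_blocknums() can later return it to the freelist. A minimal standalone sketch of that rule, with illustrative names (the bump allocator, `realloc_on_disk`, and the sentinel value are stand-ins, not the TokuFT API):

```cpp
#include <cassert>
#include <cstdint>

static const int64_t diskoff_unused_ = -1;   // assumed sentinel value

struct translation_pair { int64_t size; int64_t diskoff; };

// Mirrors the new rule in toku_blocknum_realloc_on_disk: only sizes > 0
// take space from the allocator; size-0 blocks get the unused sentinel.
static void realloc_on_disk(translation_pair &p, int64_t size, int64_t &cursor) {
    int64_t offset = diskoff_unused_;
    p.size = size;
    if (size > 0) {
        offset = cursor;        // stand-in for block_allocator_alloc_block()
        cursor += size;
    }
    p.diskoff = offset;
}

int main() {
    int64_t cursor = 0;
    translation_pair p;
    realloc_on_disk(p, 0, cursor);
    assert(p.diskoff == diskoff_unused_ && cursor == 0);  // no disk space used
    realloc_on_disk(p, 4096, cursor);
    assert(p.diskoff == 0 && cursor == 4096);
    return 0;
}
```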
@@ -37,7 +37,8 @@ void toku_allocate_blocknum(BLOCK_TABLE bt, BLOCKNUM *res, FT h);
 void toku_allocate_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *res, FT h);
 void toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *b, FT h, bool for_checkpoint);
 void toku_verify_blocknum_allocated(BLOCK_TABLE bt, BLOCKNUM b);
-void toku_block_verify_no_data_blocks_except_root_unlocked(BLOCK_TABLE bt, BLOCKNUM root);
+void toku_block_verify_no_data_blocks_except_root(BLOCK_TABLE bt, BLOCKNUM root);
+void toku_free_unused_blocknums(BLOCK_TABLE bt, BLOCKNUM root);
 void toku_block_verify_no_free_blocknums(BLOCK_TABLE bt);
 void toku_realloc_descriptor_on_disk(BLOCK_TABLE bt, DISKOFF size, DISKOFF *offset, FT h, int fd);
 void toku_realloc_descriptor_on_disk_unlocked(BLOCK_TABLE bt, DISKOFF size, DISKOFF *offset, FT h);
...
@@ -31,7 +31,6 @@
 // so they are still easily available to the debugger and to save lots of typing.
 static uint64_t cachetable_miss;
 static uint64_t cachetable_misstime;    // time spent waiting for disk read
-static uint64_t cachetable_puts;        // how many times has a newly created node been put into the cachetable?
 static uint64_t cachetable_prefetches;  // how many times has a block been prefetched into the cachetable?
 static uint64_t cachetable_evictions;
 static uint64_t cleaner_executions;     // number of times the cleaner thread's loop has executed
@@ -53,7 +52,6 @@ status_init(void) {
     STATUS_INIT(CT_MISS, UINT64, "miss");
     STATUS_INIT(CT_MISSTIME, UINT64, "miss time");
-    STATUS_INIT(CT_PUTS, UINT64, "puts (new nodes created)");
     STATUS_INIT(CT_PREFETCHES, UINT64, "prefetches");
     STATUS_INIT(CT_SIZE_CURRENT, UINT64, "size current");
     STATUS_INIT(CT_SIZE_LIMIT, UINT64, "size limit");
@@ -107,7 +105,6 @@ toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
     }
     STATUS_VALUE(CT_MISS) = cachetable_miss;
     STATUS_VALUE(CT_MISSTIME) = cachetable_misstime;
-    STATUS_VALUE(CT_PUTS) = cachetable_puts;
     STATUS_VALUE(CT_PREFETCHES) = cachetable_prefetches;
     STATUS_VALUE(CT_EVICTIONS) = cachetable_evictions;
     STATUS_VALUE(CT_CLEANER_EXECUTIONS) = cleaner_executions;
@@ -780,9 +777,9 @@ void pair_init(PAIR p,
 // Requires pair list's write lock to be held on entry.
 // On exit, get pair with mutex held
 //
 static PAIR cachetable_insert_at(CACHETABLE ct,
                                  CACHEFILE cachefile, CACHEKEY key, void *value,
                                  uint32_t fullhash,
                                  PAIR_ATTR attr,
                                  CACHETABLE_WRITE_CALLBACK write_callback,
                                  enum cachetable_dirty dirty) {
@@ -798,13 +795,20 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
               fullhash,
               write_callback,
               &ct->ev,
-              &ct->list);
+              &ct->list
+              );
     ct->list.put(p);
     ct->ev.add_pair_attr(attr);
     return p;
 }
+
+static void cachetable_insert_pair_at(CACHETABLE ct, PAIR p, PAIR_ATTR attr) {
+    ct->list.put(p);
+    ct->ev.add_pair_attr(attr);
+}
+
 // has ct locked on entry
 // This function MUST NOT release and reacquire the cachetable lock
 // Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior.
@@ -813,41 +817,27 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
 //
 static void cachetable_put_internal(
     CACHEFILE cachefile,
-    CACHEKEY key,
-    uint32_t fullhash,
+    PAIR p,
     void *value,
     PAIR_ATTR attr,
-    CACHETABLE_WRITE_CALLBACK write_callback,
     CACHETABLE_PUT_CALLBACK put_callback
     )
 {
     CACHETABLE ct = cachefile->cachetable;
-    PAIR p = ct->list.find_pair(cachefile, key, fullhash);
-    invariant_null(p);
-    // flushing could change the table size, but wont' change the fullhash
-    cachetable_puts++;
-    p = cachetable_insert_at(
-        ct,
-        cachefile,
-        key,
-        value,
-        fullhash,
-        attr,
-        write_callback,
-        CACHETABLE_DIRTY
-        );
-    invariant_notnull(p);
-    pair_lock(p);
-    p->value_rwlock.write_lock(true);
-    pair_unlock(p);
-    //note_hash_count(count);
+    //
+    // TODO: (Zardosht), make code run in debug only
+    //
+    //PAIR dummy_p = ct->list.find_pair(cachefile, key, fullhash);
+    //invariant_null(dummy_p);
+    cachetable_insert_pair_at(ct, p, attr);
     invariant_notnull(put_callback);
     put_callback(value, p);
 }
 // Pair mutex (p->mutex) is may or may not be held on entry,
 // Holding the pair mutex on entry is not important
 // for performance or corrrectness
 // Pair is pinned on entry
 static void
@@ -1061,10 +1051,41 @@ static void get_pairs(
     }
 }

+// does NOT include the actual key and fullhash we eventually want
+// a helper function for the two cachetable_put functions below
+static inline PAIR malloc_and_init_pair(
+    CACHEFILE cachefile,
+    void *value,
+    PAIR_ATTR attr,
+    CACHETABLE_WRITE_CALLBACK write_callback
+    )
+{
+    CACHETABLE ct = cachefile->cachetable;
+    CACHEKEY dummy_key = {0};
+    uint32_t dummy_fullhash = 0;
+    PAIR XMALLOC(p);
+    memset(p, 0, sizeof *p);
+    pair_init(p,
+        cachefile,
+        dummy_key,
+        value,
+        attr,
+        CACHETABLE_DIRTY,
+        dummy_fullhash,
+        write_callback,
+        &ct->ev,
+        &ct->list
+        );
+    pair_lock(p);
+    p->value_rwlock.write_lock(true);
+    pair_unlock(p);
+    return p;
+}
+
 void toku_cachetable_put_with_dep_pairs(
     CACHEFILE cachefile,
     CACHETABLE_GET_KEY_AND_FULLHASH get_key_and_fullhash,
     void *value,
     PAIR_ATTR attr,
     CACHETABLE_WRITE_CALLBACK write_callback,
     void *get_key_and_fullhash_extra,
@@ -1088,15 +1109,18 @@ void toku_cachetable_put_with_dep_pairs(
     if (ct->ev.should_client_wake_eviction_thread()) {
         ct->ev.signal_eviction_thread();
     }
+    PAIR p = malloc_and_init_pair(cachefile, value, attr, write_callback);
     ct->list.write_list_lock();
     get_key_and_fullhash(key, fullhash, get_key_and_fullhash_extra);
+    p->key.b = key->b;
+    p->fullhash = *fullhash;
     cachetable_put_internal(
         cachefile,
-        *key,
-        *fullhash,
+        p,
         value,
         attr,
-        write_callback,
         put_callback
         );
     PAIR dependent_pairs[num_dependent_pairs];
@@ -1141,14 +1165,16 @@ void toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, v
     if (ct->ev.should_client_wake_eviction_thread()) {
         ct->ev.signal_eviction_thread();
     }
+    PAIR p = malloc_and_init_pair(cachefile, value, attr, write_callback);
     ct->list.write_list_lock();
+    p->key.b = key.b;
+    p->fullhash = fullhash;
     cachetable_put_internal(
         cachefile,
-        key,
-        fullhash,
+        p,
         value,
         attr,
-        write_callback,
         put_callback
         );
     ct->list.write_list_unlock();
@@ -4475,7 +4501,6 @@ void
 toku_cachetable_helgrind_ignore(void) {
     TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_miss, sizeof cachetable_miss);
     TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_misstime, sizeof cachetable_misstime);
-    TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_puts, sizeof cachetable_puts);
     TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_prefetches, sizeof cachetable_prefetches);
     TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_evictions, sizeof cachetable_evictions);
     TOKU_VALGRIND_HG_DISABLE_CHECKING(&cleaner_executions, sizeof cleaner_executions);
...
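The cachetable refactor above is about lock scope: malloc_and_init_pair() does the allocation, pair_init, and value_rwlock write-lock before ct->list.write_list_lock() is taken, and cachetable_insert_pair_at() then does only the list insertion under the lock. A compilable sketch of that ordering, using std::mutex and std::vector as stand-ins for the real pair list (none of these names are the cachetable API):

```cpp
#include <mutex>
#include <vector>

struct Pair { long key = 0; unsigned fullhash = 0; };

struct Cachetable {
    std::mutex list_lock;      // stands in for ct->list.write_list_lock()
    std::vector<Pair*> list;   // stands in for the hashed pair list
};

static Pair *malloc_and_init_pair_sketch() {
    // the real helper also wires up value/attr/write callbacks and
    // write-locks p->value_rwlock; omitted here
    return new Pair();
}

static void put_sketch(Cachetable &ct, long key, unsigned fullhash) {
    Pair *p = malloc_and_init_pair_sketch();   // heavy work, lock not held
    std::lock_guard<std::mutex> lg(ct.list_lock);
    p->key = key;                              // key/fullhash filled in under
    p->fullhash = fullhash;                    // the lock, as in the diff
    ct.list.push_back(p);                      // cachetable_insert_pair_at()
}

int main() {
    Cachetable ct;
    put_sketch(ct, 42, 0xdeadbeefu);
    return ct.list.size() == 1 ? 0 : 1;
}
```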
@@ -528,7 +528,6 @@ uint64_t toku_cachefile_size(CACHEFILE cf);
 typedef enum {
     CT_MISS = 0,
     CT_MISSTIME,       // how many usec spent waiting for disk read because of cache miss
-    CT_PUTS,           // how many times has a newly created node been put into the cachetable?
     CT_PREFETCHES,     // how many times has a block been prefetched into the cachetable?
     CT_SIZE_CURRENT,   // the sum of the sizes of the nodes represented in the cachetable
     CT_SIZE_LIMIT,     // the limit to the sum of the node sizes
...
@@ -2064,6 +2064,7 @@ deserialize_and_upgrade_leaf_node(FTNODE node,
 static int
 read_and_decompress_block_from_fd_into_rbuf(int fd, BLOCKNUM blocknum,
+                                            DISKOFF offset, DISKOFF size,
                                             FT h,
                                             struct rbuf *rb,
                                             /* out */ int *layout_version_p);
@@ -2085,9 +2086,17 @@ deserialize_and_upgrade_ftnode(FTNODE node,
     // I. First we need to de-compress the entire node, only then can
     // we read the different sub-sections.
+    // get the file offset and block size for the block
+    DISKOFF offset, size;
+    toku_translate_blocknum_to_offset_size(bfe->h->blocktable,
+                                           blocknum,
+                                           &offset,
+                                           &size);
     struct rbuf rb;
     r = read_and_decompress_block_from_fd_into_rbuf(fd,
                                                     blocknum,
+                                                    offset,
+                                                    size,
                                                     bfe->h,
                                                     &rb,
                                                     &version);
@@ -2838,15 +2847,13 @@ decompress_from_raw_block_into_rbuf_versioned(uint32_t version, uint8_t *raw_blo
 static int
 read_and_decompress_block_from_fd_into_rbuf(int fd, BLOCKNUM blocknum,
+                                            DISKOFF offset, DISKOFF size,
                                             FT h,
                                             struct rbuf *rb,
                                             /* out */ int *layout_version_p) {
     int r = 0;
     if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b);
-    // get the file offset and block size for the block
-    DISKOFF offset, size;
-    toku_translate_blocknum_to_offset_size(h->blocktable, blocknum, &offset, &size);
     uint8_t *XMALLOC_N(size, raw_block);
     {
         // read the (partially compressed) block
@@ -2903,12 +2910,26 @@ int
 toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, uint32_t fullhash,
                                     ROLLBACK_LOG_NODE *logp, FT h) {
     toku_trace("deserial start");
+    int layout_version = 0;
     int r;
     struct rbuf rb = {.buf = NULL, .size = 0, .ndone = 0};
-    int layout_version = 0;
-    r = read_and_decompress_block_from_fd_into_rbuf(fd, blocknum, h, &rb, &layout_version);
+    // get the file offset and block size for the block
+    DISKOFF offset, size;
+    toku_translate_blocknum_to_offset_size(h->blocktable, blocknum, &offset, &size);
+    // if the size is 0, then the blocknum is unused
+    if (size == 0) {
+        // blocknum is unused, just create an empty one and get out
+        ROLLBACK_LOG_NODE XMALLOC(log);
+        rollback_empty_log_init(log);
+        log->blocknum.b = blocknum.b;
+        log->hash = fullhash;
+        r = 0;
+        *logp = log;
+        goto cleanup;
+    }
+    r = read_and_decompress_block_from_fd_into_rbuf(fd, blocknum, offset, size, h, &rb, &layout_version);
     if (r!=0) goto cleanup;
     {
...
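Sketch of the reshaped read path for rollback nodes (hypothetical simplified types; `translate` stands in for toku_translate_blocknum_to_offset_size): the blocknum is translated before any I/O, and size 0 short-circuits into a freshly initialized empty node, so unused cached blocknums never touch the disk:

```cpp
#include <cstdint>

struct RollbackNode { int64_t blocknum; uint32_t hash; bool empty; };

// stand-in for toku_translate_blocknum_to_offset_size(); in this sketch
// every blocknum reports size 0, i.e. allocated but never written
static void translate(int64_t blocknum, int64_t *offset, int64_t *size) {
    (void) blocknum;
    *offset = -1;
    *size = 0;
}

static int deserialize_sketch(int64_t blocknum, uint32_t fullhash, RollbackNode **out) {
    int64_t offset, size;
    translate(blocknum, &offset, &size);
    if (size == 0) {
        // unused blocknum: hand back an empty node, no read, no decompress
        *out = new RollbackNode{blocknum, fullhash, true};
        return 0;
    }
    // ... otherwise read [offset, offset+size) and decompress as before ...
    return -1;
}

int main() {
    RollbackNode *n = nullptr;
    return deserialize_sketch(7, 0u, &n);   // takes the size-0 fast path
}
```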
@@ -20,6 +20,7 @@
 #include "txn_manager.h"
 #include <portability/toku_pthread.h>
 #include <util/omt.h>
+#include "rollback_log_node_cache.h"

 using namespace toku;
 // Locking for the logger
@@ -93,6 +94,7 @@ struct tokulogger {
     void (*remove_finalize_callback) (DICTIONARY_ID, void*);  // ydb-level callback to be called when a transaction that ...
     void * remove_finalize_callback_extra;                    // ... deletes a file is committed or when one that creates a file is aborted.
     CACHEFILE rollback_cachefile;
+    rollback_log_node_cache rollback_cache;
     TXN_MANAGER txn_manager;
 };
...
@@ -6,6 +6,7 @@
 #include "includes.h"
 #include "txn_manager.h"
+#include "rollback_log_node_cache.h"

 static const int log_format_version=TOKU_LOG_VERSION;
@@ -176,19 +177,29 @@ bool toku_logger_rollback_is_open (TOKULOGGER logger) {
     return logger->rollback_cachefile != NULL;
 }

+#define MAX_CACHED_ROLLBACK_NODES 4096
+
+void
+toku_logger_initialize_rollback_cache(TOKULOGGER logger, FT ft) {
+    toku_free_unused_blocknums(ft->blocktable, ft->h->root_blocknum);
+    logger->rollback_cache.init(MAX_CACHED_ROLLBACK_NODES);
+}
+
 int
 toku_logger_open_rollback(TOKULOGGER logger, CACHETABLE cachetable, bool create) {
     assert(logger->is_open);
     assert(!logger->rollback_cachefile);
     FT_HANDLE t = NULL;   // Note, there is no DB associated with this BRT.
     toku_ft_handle_create(&t);
     int r = toku_ft_handle_open(t, ROLLBACK_CACHEFILE_NAME, create, create, cachetable, NULL_TXN);
     assert_zero(r);
     logger->rollback_cachefile = t->ft->cf;
+    toku_logger_initialize_rollback_cache(logger, t->ft);
     //Verify it is empty
     //Must have no data blocks (rollback logs or otherwise).
-    toku_block_verify_no_data_blocks_except_root_unlocked(t->ft->blocktable, t->ft->h->root_blocknum);
+    toku_block_verify_no_data_blocks_except_root(t->ft->blocktable, t->ft->h->root_blocknum);
     bool is_empty;
     is_empty = toku_ft_is_empty_fast(t);
     assert(is_empty);
@@ -205,11 +216,13 @@ void toku_logger_close_rollback(TOKULOGGER logger) {
     if (cf) {
         FT_HANDLE ft_to_close;
         {   //Find "brt"
+            logger->rollback_cache.destroy();
             FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
             //Verify it is safe to close it.
             assert(!ft->h->dirty);  //Must not be dirty.
+            toku_free_unused_blocknums(ft->blocktable, ft->h->root_blocknum);
             //Must have no data blocks (rollback logs or otherwise).
-            toku_block_verify_no_data_blocks_except_root_unlocked(ft->blocktable, ft->h->root_blocknum);
+            toku_block_verify_no_data_blocks_except_root(ft->blocktable, ft->h->root_blocknum);
             assert(!ft->h->dirty);
             ft_to_close = toku_ft_get_only_existing_ft_handle(ft);
             {
...
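For a sense of scale: MAX_CACHED_ROLLBACK_NODES bounds only the cache's bookkeeping arrays, since the cached nodes themselves continue to live in the cachetable. Assuming an 8-byte BLOCKNUM and a 4-byte hash per slot (the layout the new class suggests), the fixed cost per logger works out to about 48 KiB:

```cpp
#include <cstdint>
#include <cstdio>

int main() {
    const uint32_t max_nodes = 4096;   // MAX_CACHED_ROLLBACK_NODES
    // assumed slot sizes: BLOCKNUM wraps an int64_t, hashes are uint32_t
    const size_t bytes = max_nodes * (sizeof(int64_t) + sizeof(uint32_t));
    printf("rollback cache bookkeeping: %zu bytes\n", bytes);   // 49152
    return 0;
}
```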
@@ -25,6 +25,7 @@ int toku_logger_open (const char *directory, TOKULOGGER logger);
 int toku_logger_open_with_last_xid(const char *directory, TOKULOGGER logger, TXNID last_xid);
 void toku_logger_shutdown(TOKULOGGER logger);
 int toku_logger_close(TOKULOGGER *loggerp);
+void toku_logger_initialize_rollback_cache(TOKULOGGER logger, FT ft);
 int toku_logger_open_rollback(TOKULOGGER logger, CACHETABLE cachetable, bool create);
 void toku_logger_close_rollback(TOKULOGGER logger);
 bool toku_logger_rollback_is_open (TOKULOGGER);  // return true iff the rollback is open.
...
@@ -33,8 +33,8 @@ void memarena_clear (MEMARENA ma) {
     // Free the other bufs.
     int i;
     for (i=0; i<ma->n_other_bufs; i++) {
         toku_free(ma->other_bufs[i]);
         ma->other_bufs[i]=0;
     }
     ma->n_other_bufs=0;
     // But reuse the main buffer
@@ -54,25 +54,25 @@ round_to_page (size_t size) {
 void* malloc_in_memarena (MEMARENA ma, size_t size) {
     if (ma->buf_size < ma->buf_used + size) {
         // The existing block isn't big enough.
         // Add the block to the vector of blocks.
         if (ma->buf) {
             int old_n = ma->n_other_bufs;
             REALLOC_N(old_n+1, ma->other_bufs);
             assert(ma->other_bufs);
             ma->other_bufs[old_n]=ma->buf;
             ma->n_other_bufs = old_n+1;
             ma->size_of_other_bufs += ma->buf_size;
         }
         // Make a new one
         {
             size_t new_size = 2*ma->buf_size;
             if (new_size<size) new_size=size;
             new_size=round_to_page(new_size);   // at least size, but round to the next page size
             XMALLOC_N(new_size, ma->buf);
             ma->buf_used = 0;
             ma->buf_size = new_size;
         }
     }
     // allocate in the existing block.
     char *result=ma->buf+ma->buf_used;
@@ -89,12 +89,12 @@ void *memarena_memdup (MEMARENA ma, const void *v, size_t len) {
 void memarena_close(MEMARENA *map) {
     MEMARENA ma=*map;
     if (ma->buf) {
         toku_free(ma->buf);
         ma->buf=0;
     }
     int i;
     for (i=0; i<ma->n_other_bufs; i++) {
         toku_free(ma->other_bufs[i]);
     }
     if (ma->other_bufs) toku_free(ma->other_bufs);
     ma->other_bufs=0;
@@ -116,15 +116,15 @@ void memarena_move_buffers(MEMARENA dest, MEMARENA source) {
     REALLOC_N(dest->n_other_bufs + source->n_other_bufs + 1, other_bufs);
 #if TOKU_WINDOWS_32
     if (other_bufs == 0) {
         char **new_other_bufs;
         printf("_CrtCheckMemory:%d\n", _CrtCheckMemory());
         printf("Z: move_counter:%d dest:%p %p %d source:%p %p %d errno:%d\n",
                move_counter,
                dest, dest->other_bufs, dest->n_other_bufs,
                source, source->other_bufs, source->n_other_bufs,
                errno);
         new_other_bufs = toku_malloc((dest->n_other_bufs + source->n_other_bufs + 1)*sizeof (char **));
         printf("new_other_bufs=%p errno=%d\n", new_other_bufs, errno);
     }
 #endif
@@ -134,7 +134,7 @@ void memarena_move_buffers(MEMARENA dest, MEMARENA source) {
     assert(other_bufs);
     dest->other_bufs = other_bufs;
     for (i=0; i<source->n_other_bufs; i++) {
         dest->other_bufs[dest->n_other_bufs++] = source->other_bufs[i];
     }
     dest->other_bufs[dest->n_other_bufs++] = source->buf;
     source->n_other_bufs = 0;
...
@@ -420,6 +420,7 @@ static int toku_recover_fassociate (struct logtype_fassociate *l, RECOVER_ENV re
         toku_ft_handle_create(&t);
         r = toku_ft_handle_open_recovery(t, ROLLBACK_CACHEFILE_NAME, false, false, renv->ct, (TOKUTXN)NULL, l->filenum, max_acceptable_lsn);
         renv->logger->rollback_cachefile = t->ft->cf;
+        toku_logger_initialize_rollback_cache(renv->logger, t->ft);
     } else {
         r = internal_recover_fopen_or_fcreate(renv, false, 0, &l->iname, l->filenum, l->treeflags, NULL, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, max_acceptable_lsn);
         assert(r==0);
...
@@ -122,7 +122,18 @@ apply_txn(TOKUTXN txn, LSN lsn, apply_rollback_item func) {
                 txn->roll_info.spilled_rollback_head_hash = next_log_hash;
             }
         }
-        toku_rollback_log_unpin_and_remove(txn, log);
+        bool give_back = false;
+        // each txn tries to give back at most one rollback log node
+        // to the cache.
+        if (next_log.b == ROLLBACK_NONE.b) {
+            give_back = txn->logger->rollback_cache.give_rollback_log_node(
+                txn,
+                log
+                );
+        }
+        if (!give_back) {
+            toku_rollback_log_unpin_and_remove(txn, log);
+        }
     }
     return r;
 }
@@ -183,7 +194,17 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
             // If there are no bytes to move, then just leave things alone, and let the memory be reclaimed on txn is closed.
             memarena_move_buffers(parent_log->rollentry_arena, child_log->rollentry_arena);
         }
-        toku_rollback_log_unpin_and_remove(txn, child_log);
+        // each txn tries to give back at most one rollback log node
+        // to the cache. All other rollback log nodes for this child
+        // transaction are included in the parent's rollback log,
+        // so this is the only node we can give back to the cache
+        bool give_back = txn->logger->rollback_cache.give_rollback_log_node(
+            txn,
+            child_log
+            );
+        if (!give_back) {
+            toku_rollback_log_unpin_and_remove(txn, child_log);
+        }
         txn->roll_info.current_rollback = ROLLBACK_NONE;
         txn->roll_info.current_rollback_hash = 0;
...
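Both call sites above follow the same give-back-or-remove pattern: offer the node to the cache, and only pay for the full unpin-and-remove (which frees the blocknum) when the cache is full. A self-contained sketch with stand-in types:

```cpp
struct Node { };

struct Cache {
    unsigned avail = 0;
    unsigned cap = 4096;
    bool give(Node *) {                  // models give_rollback_log_node()
        if (avail >= cap) return false;  // full: caller keeps ownership
        avail++;                         // cache now owns the emptied node
        return true;
    }
};

static void unpin_and_remove(Node *n) { delete n; }  // models the old path

static void release(Cache &cache, Node *n) {
    if (!cache.give(n)) {
        unpin_and_remove(n);             // fall back exactly as the diff does
    }
}

int main() {
    Cache c;
    release(c, new Node());
    return 0;
}
```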
@@ -14,37 +14,58 @@
 #include "rollback.h"

+// Address used as a sentinel. Otherwise unused.
+static struct serialized_rollback_log_node cloned_rollback;
+
 // Cleanup the rollback memory
 static void
 rollback_log_destroy(ROLLBACK_LOG_NODE log) {
-    memarena_close(&log->rollentry_arena);
+    make_rollback_log_empty(log);
     toku_free(log);
 }

-// Write something out.  Keep trying even if partial writes occur.
-// On error: Return negative with errno set.
-// On success return nbytes.
-void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname,
-                                   void *rollback_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size,
-                                   bool write_me, bool keep_me, bool for_checkpoint, bool is_clone) {
-    ROLLBACK_LOG_NODE log = nullptr;
-    SERIALIZED_ROLLBACK_LOG_NODE serialized = nullptr;
-    if (is_clone) {
-        CAST_FROM_VOIDP(serialized, rollback_v);
-        invariant(serialized->blocknum.b == logname.b);
-    }
-    else {
-        CAST_FROM_VOIDP(log, rollback_v);
-        invariant(log->blocknum.b == logname.b);
-    }
-    FT CAST_FROM_VOIDP(h, extraargs);
+// flush an unused log to disk, by allocating a size 0 blocknum in
+// the blocktable
+static void
+toku_rollback_flush_unused_log(
+    ROLLBACK_LOG_NODE log,
+    BLOCKNUM logname,
+    int fd,
+    FT ft,
+    bool write_me,
+    bool keep_me,
+    bool for_checkpoint,
+    bool is_clone
+    )
+{
+    if (write_me) {
+        DISKOFF offset;
+        toku_blocknum_realloc_on_disk(ft->blocktable, logname, 0, &offset,
+                                      ft, fd, for_checkpoint);
+    }
+    if (!keep_me && !is_clone) {
+        toku_free(log);
+    }
+}
+
+// flush a used log to disk by serializing and writing the node out
+static void
+toku_rollback_flush_used_log (
+    ROLLBACK_LOG_NODE log,
+    SERIALIZED_ROLLBACK_LOG_NODE serialized,
+    int fd,
+    FT ft,
+    bool write_me,
+    bool keep_me,
+    bool for_checkpoint,
+    bool is_clone
+    )
+{
     if (write_me) {
-        assert(h->cf == cachefile);
-        int r = toku_serialize_rollback_log_to(fd, log, serialized, is_clone, h, for_checkpoint);
+        int r = toku_serialize_rollback_log_to(fd, log, serialized, is_clone, ft, for_checkpoint);
         assert(r == 0);
     }
-    *new_size = size;
     if (!keep_me) {
         if (is_clone) {
             toku_serialized_rollback_log_destroy(serialized);
@@ -55,12 +76,69 @@ void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname
     }
 }

+// Write something out.  Keep trying even if partial writes occur.
+// On error: Return negative with errno set.
+// On success return nbytes.
+void toku_rollback_flush_callback (
+    CACHEFILE UU(cachefile),
+    int fd,
+    BLOCKNUM logname,
+    void *rollback_v,
+    void** UU(disk_data),
+    void *extraargs,
+    PAIR_ATTR size,
+    PAIR_ATTR* new_size,
+    bool write_me,
+    bool keep_me,
+    bool for_checkpoint,
+    bool is_clone
+    )
+{
+    ROLLBACK_LOG_NODE log = nullptr;
+    SERIALIZED_ROLLBACK_LOG_NODE serialized = nullptr;
+    bool is_unused = false;
+    if (is_clone) {
+        is_unused = (rollback_v == &cloned_rollback);
+        CAST_FROM_VOIDP(serialized, rollback_v);
+    }
+    else {
+        CAST_FROM_VOIDP(log, rollback_v);
+        is_unused = rollback_log_is_unused(log);
+    }
+    *new_size = size;
+    FT ft;
+    CAST_FROM_VOIDP(ft, extraargs);
+    if (is_unused) {
+        toku_rollback_flush_unused_log(
+            log,
+            logname,
+            fd,
+            ft,
+            write_me,
+            keep_me,
+            for_checkpoint,
+            is_clone
+            );
+    }
+    else {
+        toku_rollback_flush_used_log(
+            log,
+            serialized,
+            fd,
+            ft,
+            write_me,
+            keep_me,
+            for_checkpoint,
+            is_clone
+            );
+    }
+}
 int toku_rollback_fetch_callback (CACHEFILE cachefile, PAIR p, int fd, BLOCKNUM logname, uint32_t fullhash,
                                   void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs) {
     int r;
     FT CAST_FROM_VOIDP(h, extraargs);
     assert(h->cf == cachefile);
     ROLLBACK_LOG_NODE *result = (ROLLBACK_LOG_NODE*)rollback_pv;
     r = toku_deserialize_rollback_log_from(fd, logname, fullhash, result, h);
     if (r==0) {
@@ -130,11 +208,15 @@ void toku_rollback_clone_callback(
     )
 {
     ROLLBACK_LOG_NODE CAST_FROM_VOIDP(log, value_data);
-    SERIALIZED_ROLLBACK_LOG_NODE XMALLOC(serialized);
-    toku_serialize_rollback_log_to_memory_uncompressed(log, serialized);
+    SERIALIZED_ROLLBACK_LOG_NODE serialized = nullptr;
+    if (!rollback_log_is_unused(log)) {
+        XMALLOC(serialized);
+        toku_serialize_rollback_log_to_memory_uncompressed(log, serialized);
+        *cloned_value_data = serialized;
+    }
+    else {
+        *cloned_value_data = &cloned_rollback;
+    }
     new_attr->is_valid = false;
-    *cloned_value_data = serialized;
 }
@@ -6,18 +6,18 @@
 #include "includes.h"
 #include "rollback-ct-callbacks.h"
+#include <inttypes.h>

 static void rollback_unpin_remove_callback(CACHEKEY* cachekey, bool for_checkpoint, void* extra) {
     FT CAST_FROM_VOIDP(h, extra);
     toku_free_blocknum(
         h->blocktable,
         cachekey,
         h,
         for_checkpoint
         );
 }

 void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
     int r;
     CACHEFILE cf = txn->logger->rollback_cachefile;
@@ -46,19 +46,21 @@ void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len)
 static inline PAIR_ATTR make_rollback_pair_attr(long size) {
     PAIR_ATTR result={
         .size = size,
         .nonleaf_size = 0,
         .leaf_size = 0,
         .rollback_size = size,
         .cache_pressure_size = 0,
         .is_valid = true
     };
     return result;
 }

 PAIR_ATTR
 rollback_memory_size(ROLLBACK_LOG_NODE log) {
     size_t size = sizeof(*log);
-    size += memarena_total_memory_size(log->rollentry_arena);
+    if (log->rollentry_arena) {
+        size += memarena_total_memory_size(log->rollentry_arena);
+    }
     return make_rollback_pair_attr(size);
 }
@@ -67,37 +69,78 @@ static void toku_rollback_node_save_ct_pair(void *value_data, PAIR p) {
     log->ct_pair = p;
 }

-// create and pin a new rollback log node. chain it to the other rollback nodes
-// by providing a previous blocknum/ hash and assigning the new rollback log
-// node the next sequence number
-static void rollback_log_create (TOKUTXN txn, BLOCKNUM previous, uint32_t previous_hash, ROLLBACK_LOG_NODE *result) {
-    ROLLBACK_LOG_NODE MALLOC(log);
-    assert(log);
-    CACHEFILE cf = txn->logger->rollback_cachefile;
-    FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf));
+//
+// initializes an empty rollback log node
+// Does not touch the blocknum or hash, that is the
+// responsibility of the caller
+//
+void rollback_empty_log_init(ROLLBACK_LOG_NODE log) {
+    // Having a txnid set to TXNID_NONE is how we determine if the
+    // rollback log node is empty or in use.
+    log->txnid = TXNID_NONE;
     log->layout_version = FT_LAYOUT_VERSION;
     log->layout_version_original = FT_LAYOUT_VERSION;
     log->layout_version_read_from_disk = FT_LAYOUT_VERSION;
     log->dirty = true;
+    log->sequence = 0;
+    log->previous = {0};
+    log->previous_hash = 0;
+    log->oldest_logentry = NULL;
+    log->newest_logentry = NULL;
+    log->rollentry_arena = NULL;
+    log->rollentry_resident_bytecount = 0;
+}
+
+static void rollback_initialize_for_txn(
+    ROLLBACK_LOG_NODE log,
+    TOKUTXN txn,
+    BLOCKNUM previous,
+    uint32_t previous_hash
+    )
+{
     log->txnid = txn->txnid64;
     log->sequence = txn->roll_info.num_rollback_nodes++;
-    toku_allocate_blocknum(h->blocktable, &log->blocknum, h);
-    log->hash = toku_cachetable_hash(cf, log->blocknum);
     log->previous = previous;
     log->previous_hash = previous_hash;
     log->oldest_logentry = NULL;
     log->newest_logentry = NULL;
     log->rollentry_arena = memarena_create();
     log->rollentry_resident_bytecount = 0;
+}
+
+void make_rollback_log_empty(ROLLBACK_LOG_NODE log) {
+    memarena_close(&log->rollentry_arena);
+    rollback_empty_log_init(log);
+}
+
+// create and pin a new rollback log node. chain it to the other rollback nodes
+// by providing a previous blocknum/ hash and assigning the new rollback log
+// node the next sequence number
+static void rollback_log_create (
+    TOKUTXN txn,
+    BLOCKNUM previous,
+    uint32_t previous_hash,
+    ROLLBACK_LOG_NODE *result
+    )
+{
+    ROLLBACK_LOG_NODE XMALLOC(log);
+    rollback_empty_log_init(log);
+    CACHEFILE cf = txn->logger->rollback_cachefile;
+    FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
+    rollback_initialize_for_txn(log, txn, previous, previous_hash);
+    toku_allocate_blocknum(ft->blocktable, &log->blocknum, ft);
+    log->hash = toku_cachetable_hash(ft->cf, log->blocknum);
     *result = log;
     toku_cachetable_put(cf, log->blocknum, log->hash,
                         log, rollback_memory_size(log),
-                        get_write_callbacks_for_rollback_log(h),
+                        get_write_callbacks_for_rollback_log(ft),
                         toku_rollback_node_save_ct_pair);
     txn->roll_info.current_rollback = log->blocknum;
     txn->roll_info.current_rollback_hash = log->hash;
 }
@@ -211,26 +254,50 @@ void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash
         toku_rollback_fetch_callback,
         toku_rollback_pf_req_callback,
         toku_rollback_pf_callback,
-        PL_WRITE_EXPENSIVE, // lock_type
+        PL_WRITE_CHEAP, // lock_type
         h,
         0, NULL, NULL, NULL, NULL
         );
     assert(r == 0);
     ROLLBACK_LOG_NODE CAST_FROM_VOIDP(pinned_log, value);
     assert(pinned_log->blocknum.b == blocknum.b);
+    assert(pinned_log->hash == hash);
     *log = pinned_log;
 }

 void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE *log) {
-    ROLLBACK_LOG_NODE pinned_log;
+    ROLLBACK_LOG_NODE pinned_log = NULL;
     invariant(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING); // hot indexing may call this function for prepared transactions
     if (txn_has_current_rollback_log(txn)) {
         toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, txn->roll_info.current_rollback_hash, &pinned_log);
         toku_rollback_verify_contents(pinned_log, txn->txnid64, txn->roll_info.num_rollback_nodes - 1);
     } else {
-        // create a new log for this transaction to use.
-        // this call asserts success internally
-        rollback_log_create(txn, txn->roll_info.spilled_rollback_tail, txn->roll_info.spilled_rollback_tail_hash, &pinned_log);
+        // For each transaction, we try to acquire the first rollback log
+        // from the rollback log node cache, so that we avoid
+        // putting something new into the cachetable. However,
+        // if transaction has spilled rollbacks, that means we
+        // have already done a lot of work for this transaction,
+        // and subsequent rollback log nodes are created
+        // and put into the cachetable. The idea is for
+        // transactions that don't do a lot of work to (hopefully)
+        // get a rollback log node from a cache, as opposed to
+        // taking the more expensive route of creating a new one.
+        if (!txn_has_spilled_rollback_logs(txn)) {
+            txn->logger->rollback_cache.get_rollback_log_node(txn, &pinned_log);
+            if (pinned_log != NULL) {
+                rollback_initialize_for_txn(
+                    pinned_log,
+                    txn,
+                    txn->roll_info.spilled_rollback_tail,
+                    txn->roll_info.spilled_rollback_tail_hash
+                    );
+                txn->roll_info.current_rollback = pinned_log->blocknum;
+                txn->roll_info.current_rollback_hash = pinned_log->hash;
+            }
+        }
+        if (pinned_log == NULL) {
+            rollback_log_create(txn, txn->roll_info.spilled_rollback_tail, txn->roll_info.spilled_rollback_tail_hash, &pinned_log);
+        }
     }
     assert(pinned_log->txnid == txn->txnid64);
     assert(pinned_log->blocknum.b != ROLLBACK_NONE.b);
...
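The rollback.cc changes hinge on txnid == TXNID_NONE as the discriminator between an empty, reusable node and one owned by a transaction: rollback_empty_log_init() parks a node in the unused state, rollback_initialize_for_txn() claims it. A self-contained sketch of that two-phase lifecycle (fields simplified; the TXNID_NONE value here is an assumption):

```cpp
#include <cassert>
#include <cstdint>

static const uint64_t TXNID_NONE_ = 0;   // assumed value of TXNID_NONE

struct LogNode { uint64_t txnid; void *arena; };

static void empty_init(LogNode &n) {     // cf. rollback_empty_log_init()
    n.txnid = TXNID_NONE_;
    n.arena = nullptr;                   // no memarena while cached
}

static void init_for_txn(LogNode &n, uint64_t txnid) {
    n.txnid = txnid;                     // cf. rollback_initialize_for_txn()
    n.arena = &n;                        // placeholder for memarena_create()
}

static bool is_unused(const LogNode &n) {   // cf. rollback_log_is_unused()
    return n.txnid == TXNID_NONE_;
}

int main() {
    LogNode n;
    empty_init(n);
    assert(is_unused(n));      // eligible to sit in the cache
    init_for_txn(n, 17);
    assert(!is_unused(n));     // now owned by txn 17
    return 0;
}
```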
@@ -101,5 +101,12 @@ toku_serialized_rollback_log_destroy(SERIALIZED_ROLLBACK_LOG_NODE log) {
     toku_free(log);
 }

+void rollback_empty_log_init(ROLLBACK_LOG_NODE log);
+void make_rollback_log_empty(ROLLBACK_LOG_NODE log);
+
+static inline bool rollback_log_is_unused(ROLLBACK_LOG_NODE log) {
+    return (log->txnid == TXNID_NONE);
+}
+
 #endif // TOKU_ROLLBACK_H
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: rollback.cc 49033 2012-10-17 18:48:30Z zardosht $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
#include "rollback_log_node_cache.h"
void rollback_log_node_cache::init (uint32_t max_num_avail_nodes) {
    XMALLOC_N(max_num_avail_nodes, m_avail_blocknums);
    XMALLOC_N(max_num_avail_nodes, m_hashes);
    m_max_num_avail = max_num_avail_nodes;
    m_first = 0;
    m_num_avail = 0;
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP);
    toku_mutex_init(&m_mutex, &attr);
}
void rollback_log_node_cache::destroy() {
    toku_mutex_destroy(&m_mutex);
    toku_free(m_avail_blocknums);
    toku_free(m_hashes);
}
// returns true if rollback log node was successfully added,
// false otherwise
bool rollback_log_node_cache::give_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE log){
    bool retval = false;
    toku_mutex_lock(&m_mutex);
    if (m_num_avail < m_max_num_avail) {
        retval = true;
        uint32_t index = m_first + m_num_avail;
        if (index >= m_max_num_avail) {
            index -= m_max_num_avail;
        }
        m_avail_blocknums[index].b = log->blocknum.b;
        m_hashes[index] = log->hash;
        m_num_avail++;
    }
    toku_mutex_unlock(&m_mutex);
    //
    // now unpin the rollback log node
    //
    if (retval) {
        make_rollback_log_empty(log);
        toku_rollback_log_unpin(txn, log);
    }
    return retval;
}
// if a rollback log node is available, will set log to it,
// otherwise, will set log to NULL and caller is on his own
// for getting a rollback log node
void rollback_log_node_cache::get_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE* log){
    BLOCKNUM b = ROLLBACK_NONE;
    uint32_t hash;
    toku_mutex_lock(&m_mutex);
    if (m_num_avail > 0) {
        b.b = m_avail_blocknums[m_first].b;
        hash = m_hashes[m_first];
        m_num_avail--;
        if (++m_first >= m_max_num_avail) {
            m_first = 0;
        }
    }
    toku_mutex_unlock(&m_mutex);
    if (b.b != ROLLBACK_NONE.b) {
        toku_get_and_pin_rollback_log(txn, b, hash, log);
        invariant(rollback_log_is_unused(*log));
    } else {
        *log = NULL;
    }
}
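A standalone model of the ring buffer implemented above (fixed capacity, FIFO, locking omitted), useful for checking the index arithmetic: the write slot is m_first + m_num_avail reduced into [0, capacity), and the read slot is m_first with the same wrap-around.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

class ring {
    std::vector<int64_t> slots;
    uint32_t first = 0, avail = 0, cap;
public:
    explicit ring(uint32_t capacity) : slots(capacity), cap(capacity) {}
    bool give(int64_t b) {               // cf. give_rollback_log_node()
        if (avail >= cap) return false;  // full: reject
        uint32_t i = first + avail;
        if (i >= cap) i -= cap;          // wrap into [0, cap)
        slots[i] = b;
        avail++;
        return true;
    }
    bool get(int64_t *b) {               // cf. get_rollback_log_node()
        if (avail == 0) return false;    // empty: caller creates a node
        *b = slots[first];
        avail--;
        if (++first >= cap) first = 0;
        return true;
    }
};

int main() {
    ring r(2);
    assert(r.give(10) && r.give(11) && !r.give(12));   // full at capacity
    int64_t b;
    assert(r.get(&b) && b == 10);                      // FIFO order
    assert(r.give(12));                                // slot reuse wraps
    assert(r.get(&b) && b == 11);
    assert(r.get(&b) && b == 12 && !r.get(&b));
    return 0;
}
```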
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ifndef TOKU_ROLLBACK_LOG_NODE_CACHE_H
#define TOKU_ROLLBACK_LOG_NODE_CACHE_H
#ident "$Id: rollback.h 49033 2012-10-17 18:48:30Z zardosht $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
class rollback_log_node_cache {
public:
    void init (uint32_t max_num_avail_nodes);
    void destroy();
    // returns true if rollback log node was successfully added,
    // false otherwise
    bool give_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE log);
    // if a rollback log node is available, will set log to it,
    // otherwise, will set log to NULL and caller is on his own
    // for getting a rollback log node
    void get_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE* log);

private:
    BLOCKNUM* m_avail_blocknums;
    uint32_t* m_hashes;
    uint32_t m_first;
    uint32_t m_num_avail;
    uint32_t m_max_num_avail;
    toku_mutex_t m_mutex;
};
ENSURE_POD(rollback_log_node_cache);
#endif // TOKU_ROLLBACK_LOG_NODE_CACHE_H
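One design note on the class shape: the cache is embedded by value in struct tokulogger (a C-style struct), so it keeps an explicit init()/destroy() lifecycle instead of a constructor and destructor, and ENSURE_POD presumably asserts at compile time that it stays plain-old-data. A sketch of the same guard with a standard type trait:

```cpp
#include <cstdint>
#include <type_traits>

struct pod_cache {
    int64_t *m_slots;
    uint32_t m_n;
    void init(uint32_t n);   // explicit lifecycle instead of ctor/dtor
    void destroy();
};

// would fail to compile if someone added a constructor, destructor,
// or virtual function to pod_cache
static_assert(std::is_pod<pod_cache>::value,
              "must stay POD to live by value inside a C-style struct");

int main() { return 0; }
```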