Commit b6b8dd84 authored by Yoni Fogel's avatar Yoni Fogel

[t:4905] closes #4905 Merge 4905b branch to main (removes pwrite lock)

git-svn-id: file:///svn/toku/tokudb@44315 c7de825b-a66e-492c-adef-691d508d4ae1
parent aa88bd3f
......@@ -80,7 +80,7 @@ set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -g3 -ggdb -O0")
set_property(DIRECTORY APPEND PROPERTY COMPILE_DEFINITIONS_DEBUG FORTIFY_SOURCE=2)
## set extra release flags, we overwrite this because the default passes -DNDEBUG and we don't want that
set(CMAKE_C_FLAGS_RELEASE "-O3")
set(CMAKE_C_FLAGS_RELEASE "-g3 -ggdb -O3")
## check how to do inter-procedural optimization
check_c_compiler_flag(-flto HAVE_CC_FLAG_FLTO)
......
......@@ -40,7 +40,6 @@ set(FT_SOURCES
ft_node-serialize.c
ft-node-deserialize.c
ft-ops.c
ft-pwrite.c
ft-serialize.c
ft-test-helpers.c
ft-verify.c
......
......@@ -14,6 +14,7 @@
#include "block_allocator.h"
#include "rbuf.h"
#include "wbuf.h"
#include "nonblocking_mutex.h"
//When the translation (btt) is stored on disk:
// In Header:
......@@ -71,8 +72,10 @@ struct block_table {
// Note: This is *allocation* not *translation*. The block_allocator is unaware of which blocks are used for which translation, but simply allocates and deallocates blocks.
BLOCK_ALLOCATOR block_allocator;
toku_mutex_t mutex;
BOOL checkpoint_skipped;
BOOL checkpoint_failed;
struct nb_mutex safe_file_size_lock;
bool checkpoint_skipped;
bool checkpoint_failed;
uint64_t safe_file_size;
};
//forward decls
......@@ -97,19 +100,34 @@ ft_set_dirty(FT ft, BOOL for_checkpoint){
}
static void
maybe_truncate_cachefile(BLOCK_TABLE bt, int fd, FT h, u_int64_t size_needed_before) {
maybe_truncate_file(BLOCK_TABLE bt, int fd, u_int64_t size_needed_before) {
assert(toku_mutex_is_locked(&bt->mutex));
u_int64_t new_size_needed = block_allocator_allocated_limit(bt->block_allocator);
//Save a call to toku_os_get_file_size (kernel call) if unlikely to be useful.
if (new_size_needed < size_needed_before)
toku_maybe_truncate_cachefile(h->cf, fd, new_size_needed);
if (new_size_needed < size_needed_before && new_size_needed < bt->safe_file_size) {
nb_mutex_lock(&bt->safe_file_size_lock, &bt->mutex);
// Must hold safe_file_size_lock to change safe_file_size.
if (new_size_needed < bt->safe_file_size) {
int64_t safe_file_size_before = bt->safe_file_size;
// Not safe to use the 'to-be-truncated' portion until truncate is done.
bt->safe_file_size = new_size_needed;
unlock_for_blocktable(bt);
uint64_t size_after;
toku_maybe_truncate_file(fd, new_size_needed, safe_file_size_before, &size_after);
lock_for_blocktable(bt);
bt->safe_file_size = size_after;
}
nb_mutex_unlock(&bt->safe_file_size_lock);
}
}
void
toku_maybe_truncate_cachefile_on_open(BLOCK_TABLE bt, int fd, FT h) {
toku_maybe_truncate_file_on_open(BLOCK_TABLE bt, int fd) {
lock_for_blocktable(bt);
u_int64_t size_needed = block_allocator_allocated_limit(bt->block_allocator);
toku_maybe_truncate_cachefile(h->cf, fd, size_needed);
maybe_truncate_file(bt, fd, bt->safe_file_size);
unlock_for_blocktable(bt);
}
......@@ -255,7 +273,7 @@ PRNTF("free", i, pair->size, pair->u.diskoff, bt);
// move inprogress to checkpoint (resetting type)
// inprogress = NULL
void
toku_block_translation_note_end_checkpoint (BLOCK_TABLE bt, int fd, FT h) {
toku_block_translation_note_end_checkpoint (BLOCK_TABLE bt, int fd) {
// Free unused blocks
lock_for_blocktable(bt);
u_int64_t allocated_limit_at_start = block_allocator_allocated_limit(bt->block_allocator);
......@@ -284,7 +302,7 @@ PRNTF("free", i, pair->size, pair->u.diskoff, bt);
bt->checkpointed = bt->inprogress;
bt->checkpointed.type = TRANSLATION_CHECKPOINTED;
memset(&bt->inprogress, 0, sizeof(bt->inprogress));
maybe_truncate_cachefile(bt, fd, h, allocated_limit_at_start);
maybe_truncate_file(bt, fd, allocated_limit_at_start);
end:
unlock_for_blocktable(bt);
}
......@@ -333,14 +351,14 @@ unlock_for_blocktable (BLOCK_TABLE bt) {
}
void
toku_ft_lock (FT h) {
BLOCK_TABLE bt = h->blocktable;
toku_ft_lock (FT ft) {
BLOCK_TABLE bt = ft->blocktable;
lock_for_blocktable(bt);
}
void
toku_ft_unlock (FT h) {
BLOCK_TABLE bt = h->blocktable;
toku_ft_unlock (FT ft) {
BLOCK_TABLE bt = ft->blocktable;
assert(toku_mutex_is_locked(&bt->mutex));
unlock_for_blocktable(bt);
}
......@@ -384,9 +402,9 @@ translation_prevents_freeing(struct translation *t, BLOCKNUM b, struct block_tra
}
static void
blocknum_realloc_on_disk_internal (BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF *offset, FT h, BOOL for_checkpoint) {
blocknum_realloc_on_disk_internal (BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF *offset, FT ft, BOOL for_checkpoint) {
assert(toku_mutex_is_locked(&bt->mutex));
ft_set_dirty(h, for_checkpoint);
ft_set_dirty(ft, for_checkpoint);
struct translation *t = &bt->current;
struct block_translation_pair old_pair = t->block_translation[b.b];
......@@ -415,12 +433,34 @@ PRNTF("New", b.b, t->block_translation[b.b].size, t->block_translation[b.b].u.di
}
}
static void
ensure_safe_write_unlocked(BLOCK_TABLE bt, int fd, DISKOFF block_size, DISKOFF block_offset) {
// Requires: holding bt->mutex
uint64_t size_needed = block_size + block_offset;
if (size_needed > bt->safe_file_size) {
// Must hold safe_file_size_lock to change safe_file_size.
nb_mutex_lock(&bt->safe_file_size_lock, &bt->mutex);
if (size_needed > bt->safe_file_size) {
unlock_for_blocktable(bt);
int64_t size_after;
toku_maybe_preallocate_in_file(fd, size_needed, bt->safe_file_size, &size_after);
lock_for_blocktable(bt);
bt->safe_file_size = size_after;
}
nb_mutex_unlock(&bt->safe_file_size_lock);
}
}
void
toku_blocknum_realloc_on_disk (BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF *offset, FT h, BOOL for_checkpoint) {
toku_blocknum_realloc_on_disk (BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF *offset, FT ft, int fd, BOOL for_checkpoint) {
lock_for_blocktable(bt);
struct translation *t = &bt->current;
verify_valid_freeable_blocknum(t, b);
blocknum_realloc_on_disk_internal(bt, b, size, offset, h, for_checkpoint);
blocknum_realloc_on_disk_internal(bt, b, size, offset, ft, for_checkpoint);
ensure_safe_write_unlocked(bt, fd, size, *offset);
unlock_for_blocktable(bt);
}
......@@ -448,7 +488,7 @@ PRNTF("blokAllokator", 1L, size, offset, bt);
//Fills wbuf with bt
//A clean shutdown runs checkpoint start so that current and inprogress are copies.
void
toku_serialize_translation_to_wbuf(BLOCK_TABLE bt, struct wbuf *w,
toku_serialize_translation_to_wbuf(BLOCK_TABLE bt, int fd, struct wbuf *w,
int64_t *address, int64_t *size) {
lock_for_blocktable(bt);
struct translation *t = &bt->inprogress;
......@@ -477,6 +517,8 @@ toku_serialize_translation_to_wbuf(BLOCK_TABLE bt, struct wbuf *w,
wbuf_int(w, checksum);
*address = t->block_translation[b.b].u.diskoff;
*size = t->block_translation[b.b].size;
ensure_safe_write_unlocked(bt, fd, *size, *address);
unlock_for_blocktable(bt);
}
......@@ -517,7 +559,7 @@ maybe_expand_translation (struct translation *t) {
}
void
toku_allocate_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *res, FT h) {
toku_allocate_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *res, FT ft) {
assert(toku_mutex_is_locked(&bt->mutex));
BLOCKNUM result;
struct translation * t = &bt->current;
......@@ -539,13 +581,13 @@ toku_allocate_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *res, FT h) {
t->block_translation[result.b].size = 0;
verify_valid_freeable_blocknum(t, result);
*res = result;
ft_set_dirty(h, FALSE);
ft_set_dirty(ft, FALSE);
}
void
toku_allocate_blocknum(BLOCK_TABLE bt, BLOCKNUM *res, FT h) {
toku_allocate_blocknum(BLOCK_TABLE bt, BLOCKNUM *res, FT ft) {
lock_for_blocktable(bt);
toku_allocate_blocknum_unlocked(bt, res, h);
toku_allocate_blocknum_unlocked(bt, res, ft);
unlock_for_blocktable(bt);
}
......@@ -563,7 +605,7 @@ free_blocknum_in_translation(struct translation *t, BLOCKNUM b)
}
static void
free_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *bp, FT h, BOOL for_checkpoint) {
free_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *bp, FT ft, BOOL for_checkpoint) {
// Effect: Free a blocknum.
// If the blocknum holds the only reference to a block on disk, free that block
assert(toku_mutex_is_locked(&bt->mutex));
......@@ -574,7 +616,7 @@ free_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *bp, FT h, BOOL for_checkpoint)
free_blocknum_in_translation(&bt->current, b);
if (for_checkpoint) {
assert(h->checkpoint_header->type == FT_CHECKPOINT_INPROGRESS);
assert(ft->checkpoint_header->type == FT_CHECKPOINT_INPROGRESS);
free_blocknum_in_translation(&bt->inprogress, b);
}
......@@ -590,13 +632,13 @@ PRNTF("free_blocknum_free", b.b, old_pair.size, old_pair.u.diskoff, bt);
}
}
else assert(old_pair.size==0 && old_pair.u.diskoff == diskoff_unused);
ft_set_dirty(h, for_checkpoint);
ft_set_dirty(ft, for_checkpoint);
}
void
toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *bp, FT h, BOOL for_checkpoint) {
toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *bp, FT ft, BOOL for_checkpoint) {
lock_for_blocktable(bt);
free_blocknum_unlocked(bt, bp, h, for_checkpoint);
free_blocknum_unlocked(bt, bp, ft, for_checkpoint);
unlock_for_blocktable(bt);
}
......@@ -689,6 +731,7 @@ toku_blocktable_destroy(BLOCK_TABLE *btp) {
destroy_block_allocator(&bt->block_allocator);
blocktable_lock_destroy(bt);
nb_mutex_destroy(&bt->safe_file_size_lock);
toku_free(bt);
}
......@@ -696,9 +739,9 @@ toku_blocktable_destroy(BLOCK_TABLE *btp) {
static BLOCK_TABLE
blocktable_create_internal (void) {
// Effect: Fill it in, including the translation table, which is uninitialized
BLOCK_TABLE XMALLOC(bt);
memset(bt, 0, sizeof(*bt));
BLOCK_TABLE XCALLOC(bt);
blocktable_lock_init(bt);
nb_mutex_init(&bt->safe_file_size_lock);
//There are two headers, so we reserve space for two.
u_int64_t reserve_per_header = BLOCK_ALLOCATOR_HEADER_RESERVE;
......@@ -806,7 +849,8 @@ blocktable_note_translation (BLOCK_ALLOCATOR allocator, struct translation *t) {
// place and then setting current (which is never stored on disk) for current use.
// The translation_buffer has translation only, we create the rest of the block_table.
enum deserialize_error_code
toku_blocktable_create_from_buffer(BLOCK_TABLE *btp,
toku_blocktable_create_from_buffer(int fd,
BLOCK_TABLE *btp,
DISKOFF location_on_disk, //Location of translation_buffer
DISKOFF size_on_disk,
unsigned char *translation_buffer) {
......@@ -818,6 +862,13 @@ toku_blocktable_create_from_buffer(BLOCK_TABLE *btp,
blocktable_note_translation(bt->block_allocator, &bt->checkpointed);
// we just filled in checkpointed, now copy it to current.
copy_translation(&bt->current, &bt->checkpointed, TRANSLATION_CURRENT);
int64_t file_size;
int r = toku_os_get_file_size(fd, &file_size);
lazy_assert_zero(r);
invariant(file_size >= 0);
bt->safe_file_size = file_size;
*btp = bt;
exit:
return e;
......@@ -893,16 +944,18 @@ toku_blocktable_internal_fragmentation (BLOCK_TABLE bt, int64_t *total_sizep, in
}
void
toku_realloc_descriptor_on_disk_unlocked(BLOCK_TABLE bt, DISKOFF size, DISKOFF *offset, FT h) {
toku_realloc_descriptor_on_disk_unlocked(BLOCK_TABLE bt, DISKOFF size, DISKOFF *offset, FT ft) {
assert(toku_mutex_is_locked(&bt->mutex));
BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_DESCRIPTOR);
blocknum_realloc_on_disk_internal(bt, b, size, offset, h, FALSE);
blocknum_realloc_on_disk_internal(bt, b, size, offset, ft, FALSE);
}
void
toku_realloc_descriptor_on_disk(BLOCK_TABLE bt, DISKOFF size, DISKOFF *offset, FT h) {
toku_realloc_descriptor_on_disk(BLOCK_TABLE bt, DISKOFF size, DISKOFF *offset, FT ft, int fd) {
lock_for_blocktable(bt);
toku_realloc_descriptor_on_disk_unlocked(bt, size, offset, h);
toku_realloc_descriptor_on_disk_unlocked(bt, size, offset, ft);
ensure_safe_write_unlocked(bt, fd, size, *offset);
unlock_for_blocktable(bt);
}
......
......@@ -24,17 +24,17 @@ struct block_translation_pair {
};
void toku_blocktable_create_new(BLOCK_TABLE *btp);
enum deserialize_error_code toku_blocktable_create_from_buffer(BLOCK_TABLE *btp, DISKOFF location_on_disk, DISKOFF size_on_disk, unsigned char *translation_buffer);
enum deserialize_error_code toku_blocktable_create_from_buffer(int fd, BLOCK_TABLE *btp, DISKOFF location_on_disk, DISKOFF size_on_disk, unsigned char *translation_buffer);
void toku_blocktable_destroy(BLOCK_TABLE *btp);
void toku_ft_lock(FT h);
void toku_ft_unlock(FT h);
void toku_block_translation_note_start_checkpoint_unlocked(BLOCK_TABLE bt);
void toku_block_translation_note_end_checkpoint(BLOCK_TABLE bt, int fd, FT h);
void toku_block_translation_note_end_checkpoint(BLOCK_TABLE bt, int fd);
void toku_block_translation_note_failed_checkpoint(BLOCK_TABLE bt);
void toku_block_translation_note_skipped_checkpoint(BLOCK_TABLE bt);
void toku_maybe_truncate_cachefile_on_open(BLOCK_TABLE bt, int fd, FT h);
void toku_maybe_truncate_file_on_open(BLOCK_TABLE bt, int fd);
//Blocknums
void toku_allocate_blocknum(BLOCK_TABLE bt, BLOCKNUM *res, FT h);
......@@ -43,16 +43,16 @@ void toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *b, FT h, BOOL for_checkpoint);
void toku_verify_blocknum_allocated(BLOCK_TABLE bt, BLOCKNUM b);
void toku_block_verify_no_data_blocks_except_root_unlocked(BLOCK_TABLE bt, BLOCKNUM root);
void toku_block_verify_no_free_blocknums(BLOCK_TABLE bt);
void toku_realloc_descriptor_on_disk(BLOCK_TABLE bt, DISKOFF size, DISKOFF *offset, FT h);
void toku_realloc_descriptor_on_disk(BLOCK_TABLE bt, DISKOFF size, DISKOFF *offset, FT h, int fd);
void toku_realloc_descriptor_on_disk_unlocked(BLOCK_TABLE bt, DISKOFF size, DISKOFF *offset, FT h);
void toku_get_descriptor_offset_size(BLOCK_TABLE bt, DISKOFF *offset, DISKOFF *size);
//Blocks and Blocknums
void toku_blocknum_realloc_on_disk(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF *offset, FT h, BOOL for_checkpoint);
void toku_blocknum_realloc_on_disk(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF *offset, FT ft, int fd, BOOL for_checkpoint);
void toku_translate_blocknum_to_offset_size(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF *offset, DISKOFF *size);
//Serialization
void toku_serialize_translation_to_wbuf(BLOCK_TABLE bt, struct wbuf *w, int64_t *address, int64_t *size);
void toku_serialize_translation_to_wbuf(BLOCK_TABLE bt, int fd, struct wbuf *w, int64_t *address, int64_t *size);
void toku_block_table_swap_for_redirect(BLOCK_TABLE old_bt, BLOCK_TABLE new_bt);
......
......@@ -756,16 +756,6 @@ toku_cachefile_get_fd (CACHEFILE cf) {
return cf->fd;
}
int
toku_cachefile_truncate (CACHEFILE cf, toku_off_t new_size) {
int r;
r = ftruncate(cf->fd, new_size);
if (r != 0) {
r = errno;
}
return r;
}
static CACHEFILE remove_cf_from_list_locked (CACHEFILE cf, CACHEFILE list) {
if (list==0) return 0;
else if (list==cf) {
......@@ -1188,7 +1178,7 @@ static void cachetable_write_locked_pair(CACHETABLE ct, PAIR p) {
cachetable_change_pair_attr(ct, old_attr, new_attr);
}
}
nb_mutex_write_unlock(&p->disk_nb_mutex);
nb_mutex_unlock(&p->disk_nb_mutex);
// the pair is no longer dirty once written
p->dirty = CACHETABLE_CLEAN;
......@@ -1202,7 +1192,7 @@ static void cachetable_write_locked_pair(CACHETABLE ct, PAIR p) {
static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remove, BOOL* destroyed) {
p->cq = 0;
nb_mutex_write_unlock(&p->value_nb_mutex);
nb_mutex_unlock(&p->value_nb_mutex);
if (do_remove) {
cachetable_maybe_remove_and_free_pair(ct, p, destroyed);
}
......@@ -1338,7 +1328,7 @@ static void do_partial_eviction(CACHETABLE ct, PAIR p) {
workqueue_enq(p->cq, &p->asyncwork, 1);
}
else {
nb_mutex_write_unlock(&p->value_nb_mutex);
nb_mutex_unlock(&p->value_nb_mutex);
}
}
......@@ -1437,7 +1427,7 @@ static void maybe_flush_some (CACHETABLE ct, long size) {
// set up a completion queue.
// So, a completion queue cannot exist
assert(!curr_in_clock->cq);
nb_mutex_write_unlock(&curr_in_clock->value_nb_mutex);
nb_mutex_unlock(&curr_in_clock->value_nb_mutex);
}
}
else {
......@@ -1643,7 +1633,7 @@ static void checkpoint_cloned_pair(WORKITEM wi) {
&new_attr,
TRUE //is_clone
);
nb_mutex_write_unlock(&p->disk_nb_mutex);
nb_mutex_unlock(&p->disk_nb_mutex);
ct->n_checkpoint_clones_running--;
if (ct->n_checkpoint_clones_running == 0) {
toku_cond_broadcast(&ct->clones_background_wait);
......@@ -1737,7 +1727,7 @@ write_pair_for_checkpoint_thread (CACHETABLE ct, PAIR p)
// now release value_nb_mutex, before we write the PAIR out
// so that the PAIR is available to client threads
nb_mutex_write_unlock(&p->value_nb_mutex); // didn't call cachetable_write_pair so we have to unlock it ourselves.
nb_mutex_unlock(&p->value_nb_mutex); // didn't call cachetable_write_pair so we have to unlock it ourselves.
if (p->clone_callback) {
// note that pending lock is not needed here because
// we KNOW we are in the middle of a checkpoint
......@@ -1750,7 +1740,7 @@ write_pair_for_checkpoint_thread (CACHETABLE ct, PAIR p)
&attr,
TRUE //is_clone
);
nb_mutex_write_unlock(&p->disk_nb_mutex);
nb_mutex_unlock(&p->disk_nb_mutex);
}
}
else {
......@@ -1767,7 +1757,7 @@ write_pair_for_checkpoint_thread (CACHETABLE ct, PAIR p)
workqueue_enq(p->cq, &p->asyncwork, 1);
}
else {
nb_mutex_write_unlock(&p->value_nb_mutex);
nb_mutex_unlock(&p->value_nb_mutex);
}
}
}
......@@ -1950,7 +1940,7 @@ do_partial_fetch(
p->attr = new_attr;
cachetable_change_pair_attr(ct, old_attr, new_attr);
p->state = CTPAIR_IDLE;
nb_mutex_write_unlock(&p->disk_nb_mutex);
nb_mutex_unlock(&p->disk_nb_mutex);
if (keep_pair_locked) {
// if the caller wants the pair to remain locked
// that means the caller requests continued
......@@ -1964,7 +1954,7 @@ do_partial_fetch(
workqueue_enq(p->cq, &p->asyncwork, 1);
}
else {
nb_mutex_write_unlock(&p->value_nb_mutex);
nb_mutex_unlock(&p->value_nb_mutex);
}
}
}
......@@ -1990,7 +1980,7 @@ void toku_cachetable_pf_pinned_pair(
cachetable_unlock(cf->cachetable);
pf_callback(value, p->disk_data, read_extraargs, fd, &attr);
cachetable_lock(cf->cachetable);
nb_mutex_write_unlock(&p->disk_nb_mutex);
nb_mutex_unlock(&p->disk_nb_mutex);
cachetable_unlock(cf->cachetable);
}
......@@ -2077,7 +2067,7 @@ static void cachetable_fetch_pair(
p->attr = attr;
cachetable_add_pair_attr(ct, attr);
p->state = CTPAIR_IDLE;
nb_mutex_write_unlock(&p->disk_nb_mutex);
nb_mutex_unlock(&p->disk_nb_mutex);
if (keep_pair_locked) {
// if the caller wants the pair to remain locked
// that means the caller requests continued
......@@ -2091,7 +2081,7 @@ static void cachetable_fetch_pair(
workqueue_enq(p->cq, &p->asyncwork, 1);
}
else {
nb_mutex_write_unlock(&p->value_nb_mutex);
nb_mutex_unlock(&p->value_nb_mutex);
}
}
if (0) printf("%s:%d %"PRId64" complete\n", __FUNCTION__, __LINE__, key.b);
......@@ -2374,7 +2364,7 @@ cachetable_unpin_internal(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash,
// So, we should assert that a completion queue does not
// exist
assert(!p->cq);
nb_mutex_write_unlock(&p->value_nb_mutex);
nb_mutex_unlock(&p->value_nb_mutex);
if (dirty) p->dirty = CACHETABLE_DIRTY;
if (attr.is_valid) {
PAIR_ATTR old_attr = p->attr;
......@@ -2528,7 +2518,7 @@ int toku_cachetable_get_and_pin_nonblocking (
workqueue_enq(p->cq, &p->asyncwork, 1);
}
else {
nb_mutex_write_unlock(&p->value_nb_mutex);
nb_mutex_unlock(&p->value_nb_mutex);
}
cachetable_unlock(ct);
return TOKUDB_TRY_AGAIN;
......@@ -2686,7 +2676,7 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
// sanity check, we already have an assert
// before locking the PAIR
assert(!p->cq);
nb_mutex_write_unlock(&p->value_nb_mutex);
nb_mutex_unlock(&p->value_nb_mutex);
}
}
cachetable_unlock(ct);
......@@ -2987,7 +2977,7 @@ static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
PAIR p = workitem_arg(wi);
p->cq = 0;
//Some other thread owned the lock, but transferred ownership to the thread executing this function
nb_mutex_write_unlock(&p->value_nb_mutex); //Release the lock, no one has a pin, per our assumptions above.
nb_mutex_unlock(&p->value_nb_mutex); //Release the lock, no one has a pin, per our assumptions above.
BOOL destroyed;
cachetable_maybe_remove_and_free_pair(ct, p, &destroyed);
}
......@@ -3113,8 +3103,8 @@ int toku_cachetable_unpin_and_remove (
// we must not have a completion queue
// lying around, as we may create one now
assert(!p->cq);
nb_mutex_write_unlock(&p->value_nb_mutex);
nb_mutex_write_unlock(&p->disk_nb_mutex);
nb_mutex_unlock(&p->value_nb_mutex);
nb_mutex_unlock(&p->disk_nb_mutex);
//
// As of Dr. Noga, only these threads may be
// blocked waiting to lock this PAIR:
......@@ -3193,7 +3183,7 @@ int toku_cachetable_unpin_and_remove (
// make sure that our assumption is valid.
assert(!p->checkpoint_pending);
assert(p->attr.cache_pressure_size == 0);
nb_mutex_write_unlock(&p->value_nb_mutex);
nb_mutex_unlock(&p->value_nb_mutex);
// Because we assume it is just the checkpoint thread
// that may have been blocked (as argued above),
// it is safe to simply remove the PAIR from the
......@@ -3924,7 +3914,7 @@ toku_cleaner_thread (void *cachetable_v)
// don't need to unlock it if the cleaner callback is called.
if (!cleaner_callback_called) {
assert(!best_pair->cq);
nb_mutex_write_unlock(&best_pair->value_nb_mutex);
nb_mutex_unlock(&best_pair->value_nb_mutex);
}
// We need to make sure the cachefile sticks around so a close
// can't come destroy it. That's the purpose of this
......
......@@ -430,9 +430,6 @@ void toku_cachefile_unlink_on_close(CACHEFILE cf);
// is this cachefile marked as unlink on close?
bool toku_cachefile_is_unlink_on_close(CACHEFILE cf);
// Truncate a cachefile
int toku_cachefile_truncate (CACHEFILE cf, toku_off_t new_size);
// Return the logger associated with the cachefile
TOKULOGGER toku_cachefile_logger (CACHEFILE);
......
......@@ -3262,7 +3262,7 @@ ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only
//Opening a brt may restore to previous checkpoint. Truncate if necessary.
{
int fd = toku_cachefile_get_fd (ft->cf);
toku_maybe_truncate_cachefile_on_open(ft->blocktable, fd, ft);
toku_maybe_truncate_file_on_open(ft->blocktable, fd);
}
r = 0;
......
......@@ -245,11 +245,12 @@ void toku_ft_layer_destroy(void);
void toku_ft_serialize_layer_init(void);
void toku_ft_serialize_layer_destroy(void);
void toku_maybe_truncate_cachefile (CACHEFILE cf, int fd, u_int64_t size_used);
void toku_maybe_truncate_file (int fd, uint64_t size_used, uint64_t expected_size, uint64_t *new_size);
// Effect: truncate file if overallocated by at least 32MiB
int maybe_preallocate_in_file (int fd, u_int64_t size) __attribute__ ((warn_unused_result));
// Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MB whichever is less.
void toku_maybe_preallocate_in_file (int fd, int64_t size, int64_t expected_size, int64_t *new_size);
// Effect: make the file bigger by either doubling it or growing by 16MiB whichever is less, until it is at least size
// Return 0 on success, otherwise an error number.
void toku_ft_suppress_recovery_logs (FT_HANDLE brt, TOKUTXN txn);
// Effect: suppresses recovery logs
......
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: ft-serialize.c 43686 2012-05-18 23:21:00Z leifwalsh $"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
#include "sort.h"
#include "threadpool.h"
#include "ft-pwrite.h"
#include <compress.h>
//TODO(fizzfaldt): determine if this is necessary AT ALL and try to delete
// This mutex protects pwrite from running in parallel, and also protects modifications to the block allocator.
static toku_mutex_t pwrite_mutex = { PTHREAD_MUTEX_INITIALIZER };
static int pwrite_is_locked=0;
void
toku_lock_for_pwrite(void) {
// Locks the pwrite_mutex.
toku_mutex_lock(&pwrite_mutex);
pwrite_is_locked = 1;
}
void
toku_unlock_for_pwrite(void) {
pwrite_is_locked = 0;
toku_mutex_unlock(&pwrite_mutex);
}
void
toku_full_pwrite_extend(int fd, const void *buf, size_t count, toku_off_t offset)
// requires that the pwrite has been locked
// On failure, this does not return (an assertion fails or something).
{
invariant(pwrite_is_locked);
{
int r = maybe_preallocate_in_file(fd, offset+count);
lazy_assert_zero(r);
}
toku_os_full_pwrite(fd, buf, count, offset);
}
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: ft-serialize.c 43686 2012-05-18 23:21:00Z leifwalsh $"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#ifndef FT_PWRITE_H
#define FT_PWRITE_H
void toku_lock_for_pwrite(void);
void toku_unlock_for_pwrite(void);
void toku_full_pwrite_extend(int fd, const void *buf, size_t count, toku_off_t offset);
#endif
......@@ -7,7 +7,6 @@
#include "includes.h"
#include "sort.h"
#include "threadpool.h"
#include "ft-pwrite.h"
#include <compress.h>
#if defined(HAVE_CILK)
......@@ -62,10 +61,8 @@ toku_serialize_descriptor_contents_to_fd(int fd, const DESCRIPTOR desc, DISKOFF
}
lazy_assert(w.ndone==w.size);
{
toku_lock_for_pwrite();
//Actual Write translation table
toku_full_pwrite_extend(fd, w.buf, size, offset);
toku_unlock_for_pwrite();
toku_os_full_pwrite(fd, w.buf, size, offset);
}
toku_free(w.buf);
return r;
......@@ -106,10 +103,8 @@ deserialize_descriptor_from(int fd, BLOCK_TABLE bt, DESCRIPTOR desc, int layout_
{
XMALLOC_N(size, dbuf);
{
toku_lock_for_pwrite();
ssize_t r = toku_os_pread(fd, dbuf, size, offset);
lazy_assert(r==size);
toku_unlock_for_pwrite();
}
{
// check the checksum
......@@ -199,7 +194,6 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version)
//Load translation table
{
toku_lock_for_pwrite();
unsigned char *XMALLOC_N(translation_size_on_disk, tbuf);
{
// This cast is messed up in 32-bits if the block translation
......@@ -209,9 +203,9 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version)
translation_address_on_disk);
lazy_assert(readsz == translation_size_on_disk);
}
toku_unlock_for_pwrite();
// Create table and read in data.
e = toku_blocktable_create_from_buffer(&ft->blocktable,
e = toku_blocktable_create_from_buffer(fd,
&ft->blocktable,
translation_address_on_disk,
translation_size_on_disk,
tbuf);
......@@ -712,7 +706,7 @@ int toku_serialize_ft_to (int fd, FT_HEADER h, BLOCK_TABLE blocktable, CACHEFILE
int64_t address_translation;
{
//Must serialize translation first, to get address,size for header.
toku_serialize_translation_to_wbuf(blocktable, &w_translation,
toku_serialize_translation_to_wbuf(blocktable, fd, &w_translation,
&address_translation,
&size_translation);
lazy_assert(size_translation==w_translation.size);
......@@ -727,10 +721,9 @@ int toku_serialize_ft_to (int fd, FT_HEADER h, BLOCK_TABLE blocktable, CACHEFILE
}
lazy_assert(w_main.ndone==size_main);
}
toku_lock_for_pwrite();
{
//Actual Write translation table
toku_full_pwrite_extend(fd, w_translation.buf,
toku_os_full_pwrite(fd, w_translation.buf,
size_translation, address_translation);
}
{
......@@ -751,11 +744,10 @@ int toku_serialize_ft_to (int fd, FT_HEADER h, BLOCK_TABLE blocktable, CACHEFILE
// Beginning (0) or BLOCK_ALLOCATOR_HEADER_RESERVE
toku_off_t main_offset;
main_offset = (h->checkpoint_count & 0x1) ? 0 : BLOCK_ALLOCATOR_HEADER_RESERVE;
toku_full_pwrite_extend(fd, w_main.buf, w_main.ndone, main_offset);
toku_os_full_pwrite(fd, w_main.buf, w_main.ndone, main_offset);
}
}
toku_free(w_main.buf);
toku_free(w_translation.buf);
toku_unlock_for_pwrite();
return rr;
}
......@@ -251,7 +251,7 @@ ft_end_checkpoint (CACHEFILE UU(cachefile), int fd, void *header_v) {
int r = ft->panic;
if (r==0) {
assert(ft->h->type == FT_CURRENT);
toku_block_translation_note_end_checkpoint(ft->blocktable, fd, ft);
toku_block_translation_note_end_checkpoint(ft->blocktable, fd);
}
if (ft->checkpoint_header) { // could be NULL only if panic was true at begin_checkpoint
toku_free(ft->checkpoint_header);
......@@ -939,7 +939,7 @@ toku_update_descriptor(FT h, DESCRIPTOR d, int fd)
int r = 0;
DISKOFF offset;
// 4 for checksum
toku_realloc_descriptor_on_disk(h->blocktable, toku_serialize_descriptor_size(d)+4, &offset, h);
toku_realloc_descriptor_on_disk(h->blocktable, toku_serialize_descriptor_size(d)+4, &offset, h, fd);
r = toku_serialize_descriptor_contents_to_fd(fd, d, offset);
if (r) {
goto cleanup;
......
......@@ -7,7 +7,6 @@
#include "includes.h"
#include "sort.h"
#include "threadpool.h"
#include "ft-pwrite.h"
#include <compress.h>
#if defined(HAVE_CILK)
......@@ -80,60 +79,54 @@ toku_ft_serialize_layer_destroy(void) {
enum {FILE_CHANGE_INCREMENT = (16<<20)};
static inline u_int64_t
alignup64(u_int64_t a, u_int64_t b) {
static inline uint64_t
alignup64(uint64_t a, uint64_t b) {
return ((a+b-1)/b)*b;
}
//Race condition if ydb lock is split.
//Ydb lock is held when this function is called.
//Not going to truncate and delete (redirect to devnull) at same time.
// safe_file_size_lock must be held.
void
toku_maybe_truncate_cachefile (CACHEFILE cf, int fd, u_int64_t size_used)
toku_maybe_truncate_file (int fd, uint64_t size_used, uint64_t expected_size, uint64_t *new_sizep)
// Effect: If file size >= SIZE+32MiB, reduce file size.
// (32 instead of 16.. hysteresis).
// Return 0 on success, otherwise an error number.
{
//Check file size before taking pwrite lock to reduce likelihood of taking
//the pwrite lock needlessly.
//Check file size after taking lock to avoid race conditions.
int64_t file_size;
{
int r = toku_os_get_file_size(fd, &file_size);
lazy_assert_zero(r);
invariant(file_size >= 0);
}
invariant(expected_size == (uint64_t)file_size);
// If file space is overallocated by at least 32M
if ((u_int64_t)file_size >= size_used + (2*FILE_CHANGE_INCREMENT)) {
toku_lock_for_pwrite();
{
int r = toku_os_get_file_size(fd, &file_size);
lazy_assert_zero(r);
invariant(file_size >= 0);
}
if ((u_int64_t)file_size >= size_used + (2*FILE_CHANGE_INCREMENT)) {
toku_off_t new_size = alignup64(size_used, (2*FILE_CHANGE_INCREMENT)); //Truncate to new size_used.
invariant(new_size < file_size);
int r = toku_cachefile_truncate(cf, new_size);
lazy_assert_zero(r);
}
toku_unlock_for_pwrite();
if ((uint64_t)file_size >= size_used + (2*FILE_CHANGE_INCREMENT)) {
toku_off_t new_size = alignup64(size_used, (2*FILE_CHANGE_INCREMENT)); //Truncate to new size_used.
invariant(new_size < file_size);
invariant(new_size >= 0);
int r = ftruncate(fd, new_size);
lazy_assert_zero(r);
*new_sizep = new_size;
}
else {
*new_sizep = file_size;
}
return;
}
static u_int64_t
umin64(u_int64_t a, u_int64_t b) {
static int64_t
min64(int64_t a, int64_t b) {
if (a<b) return a;
return b;
}
int
maybe_preallocate_in_file (int fd, u_int64_t size)
// Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MiB whichever is less.
void
toku_maybe_preallocate_in_file (int fd, int64_t size, int64_t expected_size, int64_t *new_size)
// Effect: make the file bigger by either doubling it or growing by 16MiB whichever is less, until it is at least size
// Return 0 on success, otherwise an error number.
{
int64_t file_size;
//TODO(yoni): Allow variable stripe_width (perhaps from ft) for larger raids
const uint64_t stripe_width = 4096;
{
int r = toku_os_get_file_size(fd, &file_size);
if (r != 0) { // debug #2463
......@@ -143,16 +136,28 @@ maybe_preallocate_in_file (int fd, u_int64_t size)
lazy_assert_zero(r);
}
invariant(file_size >= 0);
if ((u_int64_t)file_size < size) {
const int N = umin64(size, FILE_CHANGE_INCREMENT); // Double the size of the file, or add 16MiB, whichever is less.
char *MALLOC_N(N, wbuf);
memset(wbuf, 0, N);
toku_off_t start_write = alignup64(file_size, 4096);
invariant(start_write >= file_size);
toku_os_full_pwrite(fd, wbuf, N, start_write);
toku_free(wbuf);
invariant(expected_size == file_size);
// We want to double the size of the file, or add 16MiB, whichever is less.
// We emulate calling this function repeatedly until it satisfies the request.
int64_t to_write = 0;
if (file_size == 0) {
// Prevent infinite loop by starting with stripe_width as a base case.
to_write = stripe_width;
}
while (file_size + to_write < size) {
to_write += alignup64(min64(file_size + to_write, FILE_CHANGE_INCREMENT), stripe_width);
}
if (to_write > 0) {
char *XCALLOC_N(to_write, wbuf);
toku_off_t start_write = alignup64(file_size, stripe_width);
invariant(start_write >= file_size);
toku_os_full_pwrite(fd, wbuf, to_write, start_write);
toku_free(wbuf);
*new_size = start_write + to_write;
}
else {
*new_size = file_size;
}
return 0;
}
// Don't include the sub_block header
......@@ -897,10 +902,8 @@ toku_serialize_ftnode_to (int fd, BLOCKNUM blocknum, FTNODE node, FTNODE_DISK_DA
DISKOFF offset;
toku_blocknum_realloc_on_disk(h->blocktable, blocknum, n_to_write, &offset,
h, for_checkpoint); //dirties h
toku_lock_for_pwrite();
toku_full_pwrite_extend(fd, compressed_buf, n_to_write, offset);
toku_unlock_for_pwrite();
h, fd, for_checkpoint); //dirties h
toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
}
//printf("%s:%d wrote %d bytes for %lld size=%lld\n", __FILE__, __LINE__, w.ndone, off, size);
......@@ -914,8 +917,8 @@ deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf,
DESCRIPTOR desc, ft_compare_func cmp) {
int r;
int n_in_this_buffer = rbuf_int(rbuf);
void **fresh_offsets, **stale_offsets;
void **broadcast_offsets;
void **fresh_offsets = NULL, **stale_offsets = NULL;
void **broadcast_offsets = NULL;
int nfresh = 0, nstale = 0;
int nbroadcast_offsets = 0;
if (cmp) {
......@@ -1781,8 +1784,8 @@ deserialize_and_upgrade_internal_node(FTNODE node,
NONLEAF_CHILDINFO bnc = BNC(node, i);
int n_in_this_buffer = rbuf_int(rb);
void **fresh_offsets;
void **broadcast_offsets;
void **fresh_offsets = NULL;
void **broadcast_offsets = NULL;
int nfresh = 0;
int nbroadcast_offsets = 0;
......@@ -2639,10 +2642,8 @@ toku_serialize_rollback_log_to (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE log
lazy_assert(blocknum.b>=0);
DISKOFF offset;
toku_blocknum_realloc_on_disk(h->blocktable, blocknum, n_to_write, &offset,
h, for_checkpoint); //dirties h
toku_lock_for_pwrite();
toku_full_pwrite_extend(fd, compressed_buf, n_to_write, offset);
toku_unlock_for_pwrite();
h, fd, for_checkpoint); //dirties h
toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
}
toku_free(compressed_buf);
log->dirty = 0; // See #1957. Must set the node to be clean after serializing it so that it doesn't get written again on the next checkpoint or eviction.
......
......@@ -52,7 +52,7 @@ static inline void nb_mutex_lock(NB_MUTEX nb_mutex,
// release a write lock
// expects: mutex is locked
static inline void nb_mutex_write_unlock(NB_MUTEX nb_mutex) {
static inline void nb_mutex_unlock(NB_MUTEX nb_mutex) {
rwlock_write_unlock(&nb_mutex->lock);
}
......
......@@ -35,13 +35,13 @@ test_fifo_enq (int n) {
// this was a function but icc cant handle it
#define buildkey(len) { \
thekeylen = len; \
thekeylen = len+1; \
thekey = toku_realloc(thekey, thekeylen); \
memset(thekey, len, thekeylen); \
}
#define buildval(len) { \
thevallen = len+1; \
thevallen = len+2; \
theval = toku_realloc(theval, thevallen); \
memset(theval, ~len, thevallen); \
}
......
......@@ -341,6 +341,7 @@ test_prefetching(void) {
brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20
BLOCKNUM b = make_blocknum(0);
while (b.b < 20) {
......@@ -351,7 +352,7 @@ test_prefetching(void) {
{
DISKOFF offset;
DISKOFF size;
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, FALSE);
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, fd, FALSE);
assert(offset==BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_translate_blocknum_to_offset_size(brt_h->blocktable, b, &offset, &size);
......
......@@ -284,6 +284,7 @@ test_serialize_nonleaf(void) {
brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20
BLOCKNUM b = make_blocknum(0);
while (b.b < 20) {
......@@ -294,7 +295,7 @@ test_serialize_nonleaf(void) {
{
DISKOFF offset;
DISKOFF size;
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, FALSE);
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, fd, FALSE);
assert(offset==BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_translate_blocknum_to_offset_size(brt_h->blocktable, b, &offset, &size);
......@@ -375,6 +376,7 @@ test_serialize_leaf(void) {
brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20
BLOCKNUM b = make_blocknum(0);
while (b.b < 20) {
......@@ -385,7 +387,7 @@ test_serialize_leaf(void) {
{
DISKOFF offset;
DISKOFF size;
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, FALSE);
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, fd, FALSE);
assert(offset==BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_translate_blocknum_to_offset_size(brt_h->blocktable, b, &offset, &size);
......
......@@ -57,7 +57,7 @@ test_serialize_leaf(int valsize, int nelts, double entropy) {
const int nodesize = (1<<22);
struct ftnode sn, *dn;
int fd = open(__SRCFILE__ ".ft_handle", O_RDWR|O_CREAT|O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO); assert(fd >= 0);
int fd = open(__SRCFILE__ ".ft", O_RDWR|O_CREAT|O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO); assert(fd >= 0);
int r;
......@@ -116,6 +116,7 @@ test_serialize_leaf(int valsize, int nelts, double entropy) {
brt_h->compare_fun = long_key_cmp;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20
BLOCKNUM b = make_blocknum(0);
while (b.b < 20) {
......@@ -126,7 +127,7 @@ test_serialize_leaf(int valsize, int nelts, double entropy) {
{
DISKOFF offset;
DISKOFF size;
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, FALSE);
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, fd, FALSE);
assert(offset==BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_translate_blocknum_to_offset_size(brt_h->blocktable, b, &offset, &size);
......@@ -186,7 +187,7 @@ test_serialize_nonleaf(int valsize, int nelts, double entropy) {
const int nodesize = (1<<22);
struct ftnode sn, *dn;
int fd = open(__SRCFILE__ ".ft_handle", O_RDWR|O_CREAT|O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO); assert(fd >= 0);
int fd = open(__SRCFILE__ ".ft", O_RDWR|O_CREAT|O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO); assert(fd >= 0);
int r;
......@@ -254,6 +255,7 @@ test_serialize_nonleaf(int valsize, int nelts, double entropy) {
brt_h->compare_fun = long_key_cmp;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20
BLOCKNUM b = make_blocknum(0);
while (b.b < 20) {
......@@ -264,7 +266,7 @@ test_serialize_nonleaf(int valsize, int nelts, double entropy) {
{
DISKOFF offset;
DISKOFF size;
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, FALSE);
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, fd, FALSE);
assert(offset==BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_translate_blocknum_to_offset_size(brt_h->blocktable, b, &offset, &size);
......
......@@ -261,6 +261,8 @@ test_serialize_leaf_check_msn(enum ftnode_verify_type bft, BOOL do_clone) {
brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20
BLOCKNUM b = make_blocknum(0);
while (b.b < 20) {
......@@ -271,7 +273,7 @@ test_serialize_leaf_check_msn(enum ftnode_verify_type bft, BOOL do_clone) {
{
DISKOFF offset;
DISKOFF size;
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, FALSE);
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, fd, FALSE);
assert(offset==BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_translate_blocknum_to_offset_size(brt_h->blocktable, b, &offset, &size);
......@@ -408,6 +410,7 @@ test_serialize_leaf_with_large_pivots(enum ftnode_verify_type bft, BOOL do_clone
brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20
BLOCKNUM b = make_blocknum(0);
while (b.b < 20) {
......@@ -418,7 +421,7 @@ test_serialize_leaf_with_large_pivots(enum ftnode_verify_type bft, BOOL do_clone
{
DISKOFF offset;
DISKOFF size;
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, FALSE);
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, fd, FALSE);
assert(offset==BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_translate_blocknum_to_offset_size(brt_h->blocktable, b, &offset, &size);
......@@ -552,6 +555,7 @@ test_serialize_leaf_with_many_rows(enum ftnode_verify_type bft, BOOL do_clone) {
brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20
BLOCKNUM b = make_blocknum(0);
while (b.b < 20) {
......@@ -562,7 +566,7 @@ test_serialize_leaf_with_many_rows(enum ftnode_verify_type bft, BOOL do_clone) {
{
DISKOFF offset;
DISKOFF size;
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, FALSE);
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, fd, FALSE);
assert(offset==BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_translate_blocknum_to_offset_size(brt_h->blocktable, b, &offset, &size);
......@@ -701,6 +705,7 @@ test_serialize_leaf_with_large_rows(enum ftnode_verify_type bft, BOOL do_clone)
brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20
BLOCKNUM b = make_blocknum(0);
while (b.b < 20) {
......@@ -711,7 +716,7 @@ test_serialize_leaf_with_large_rows(enum ftnode_verify_type bft, BOOL do_clone)
{
DISKOFF offset;
DISKOFF size;
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, FALSE);
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, fd, FALSE);
assert(offset==BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_translate_blocknum_to_offset_size(brt_h->blocktable, b, &offset, &size);
......@@ -866,6 +871,7 @@ test_serialize_leaf_with_empty_basement_nodes(enum ftnode_verify_type bft, BOOL
brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20
BLOCKNUM b = make_blocknum(0);
while (b.b < 20) {
......@@ -876,7 +882,7 @@ test_serialize_leaf_with_empty_basement_nodes(enum ftnode_verify_type bft, BOOL
{
DISKOFF offset;
DISKOFF size;
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, FALSE);
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, fd, FALSE);
assert(offset==BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_translate_blocknum_to_offset_size(brt_h->blocktable, b, &offset, &size);
......@@ -995,6 +1001,7 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum ftnode_verify_type b
brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20
BLOCKNUM b = make_blocknum(0);
while (b.b < 20) {
......@@ -1005,7 +1012,7 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum ftnode_verify_type b
{
DISKOFF offset;
DISKOFF size;
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, FALSE);
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, fd, FALSE);
assert(offset==BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_translate_blocknum_to_offset_size(brt_h->blocktable, b, &offset, &size);
......@@ -1129,6 +1136,7 @@ test_serialize_leaf(enum ftnode_verify_type bft, BOOL do_clone) {
brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20
BLOCKNUM b = make_blocknum(0);
while (b.b < 20) {
......@@ -1139,7 +1147,7 @@ test_serialize_leaf(enum ftnode_verify_type bft, BOOL do_clone) {
{
DISKOFF offset;
DISKOFF size;
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, FALSE);
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, fd, FALSE);
assert(offset==BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_translate_blocknum_to_offset_size(brt_h->blocktable, b, &offset, &size);
......@@ -1276,6 +1284,7 @@ test_serialize_nonleaf(enum ftnode_verify_type bft, BOOL do_clone) {
brt_h->panic = 0; brt_h->panic_string = 0;
toku_ft_init_treelock(brt_h);
toku_blocktable_create_new(&brt_h->blocktable);
{ int r_truncate = ftruncate(fd, 0); CKERR(r_truncate); }
//Want to use block #20
BLOCKNUM b = make_blocknum(0);
while (b.b < 20) {
......@@ -1286,7 +1295,7 @@ test_serialize_nonleaf(enum ftnode_verify_type bft, BOOL do_clone) {
{
DISKOFF offset;
DISKOFF size;
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, FALSE);
toku_blocknum_realloc_on_disk(brt_h->blocktable, b, 100, &offset, brt_h, fd, FALSE);
assert(offset==BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE);
toku_translate_blocknum_to_offset_size(brt_h->blocktable, b, &offset, &size);
......
......@@ -387,7 +387,8 @@ int test_main (int argc, const char *argv[]) {
char deletecmd[templen];
int n = snprintf(deletecmd, templen, "rm -rf %s", directory);
assert(n>0 && n<templen);
system(deletecmd);
r = system(deletecmd);
CKERR(r);
}
return 0;
......
......@@ -16,7 +16,8 @@
static void test_it (int N) {
FT_HANDLE brt;
int r;
system("rm -rf " TESTDIR);
r = system("rm -rf " TESTDIR);
CKERR(r);
r = toku_os_mkdir(TESTDIR, S_IRWXU); CKERR(r);
TOKULOGGER logger;
......
......@@ -39,8 +39,9 @@ test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute_
assert(r==0);
}
{
int r = maybe_preallocate_in_file(fd, 1000);
assert(r==0);
int64_t size_after;
toku_maybe_preallocate_in_file(fd, 1000, file_size, &size_after);
assert(size_after == file_size);
}
int64_t file_size2;
{
......
......@@ -331,6 +331,7 @@ int toku_txn_manager_start_txn(
}
if (xid == TXNID_NONE) {
LSN first_lsn;
invariant(logger);
r = toku_log_xbegin(logger, &first_lsn, 0, parent ? parent->txnid64 : 0);
assert_zero(r);
xid = first_lsn.lsn;
......
......@@ -9,6 +9,12 @@ if(BUILD_TESTING)
set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES dir.${src}.test)
endforeach(src)
include(CheckCCompilerFlag)
check_c_compiler_flag(-Wno-unused-result HAVE_WNO_UNUSED_RESULT)
if (HAVE_WNO_UNUSED_RESULT)
set_property(SOURCE try-leak-lost.c APPEND PROPERTY COMPILE_FLAGS -Wno-unused-result)
endif ()
foreach(src ${srcs})
get_filename_component(test ${src} NAME_WE)
......
../../windows/tests/test-fsync.c
\ No newline at end of file
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#include "test.h"
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include "toku_time.h"
int verbose = 0;
static void
create_files(int N, int fds[N]) {
int r;
int i;
char name[30];
for (i = 0; i < N; i++) {
snprintf(name, sizeof(name), "%d", i);
fds[i] = open(name, O_CREAT|O_WRONLY, 0644);
if (fds[i] < 0) {
r = errno;
CKERR(r);
}
}
}
static void
write_to_files(int N, int bytes, int fds[N]) {
char junk[bytes];
int i;
for (i = 0; i < bytes; i++) {
junk[i] = random() & 0xFF;
}
int r;
for (i = 0; i < N; i++) {
r = toku_os_write(fds[i], junk, bytes);
CKERR(r);
}
}
static void
time_many_fsyncs_one_file(int N, int bytes, int fds[N]) {
if (verbose>1) {
printf("Starting %s\n", __FUNCTION__);
fflush(stdout);
}
struct timeval begin;
struct timeval after_first;
struct timeval end;
write_to_files(1, bytes, fds);
if (verbose>1) {
printf("Done writing to os buffers\n");
fflush(stdout);
}
int i;
int r;
r = gettimeofday(&begin, NULL);
CKERR(r);
r = fsync(fds[0]);
CKERR(r);
r = gettimeofday(&after_first, NULL);
CKERR(r);
for (i = 0; i < N; i++) {
r = fsync(fds[0]);
CKERR(r);
}
r = gettimeofday(&end, NULL);
CKERR(r);
if (verbose) {
printf("Fsyncing one file %d times:\n"
"\tFirst fsync took: [%f] seconds\n"
"\tRemaining %d fsyncs took additional: [%f] seconds\n"
"\tTotal time [%f] seconds\n",
N + 1,
toku_tdiff(&after_first, &begin),
N,
toku_tdiff(&end, &after_first),
toku_tdiff(&end, &begin));
fflush(stdout);
}
}
static void
time_fsyncs_many_files(int N, int bytes, int fds[N]) {
if (verbose>1) {
printf("Starting %s\n", __FUNCTION__);
fflush(stdout);
}
write_to_files(N, bytes, fds);
if (verbose>1) {
printf("Done writing to os buffers\n");
fflush(stdout);
}
struct timeval begin;
struct timeval after_first;
struct timeval end;
int i;
int r;
r = gettimeofday(&begin, NULL);
CKERR(r);
for (i = 0; i < N; i++) {
r = fsync(fds[i]);
CKERR(r);
if (i==0) {
r = gettimeofday(&after_first, NULL);
CKERR(r);
}
if (verbose>2) {
printf("Done fsyncing %d\n", i);
fflush(stdout);
}
}
r = gettimeofday(&end, NULL);
CKERR(r);
if (verbose) {
printf("Fsyncing %d files:\n"
"\tFirst fsync took: [%f] seconds\n"
"\tRemaining %d fsyncs took additional: [%f] seconds\n"
"\tTotal time [%f] seconds\n",
N,
toku_tdiff(&after_first, &begin),
N-1,
toku_tdiff(&end, &after_first),
toku_tdiff(&end, &begin));
fflush(stdout);
}
}
#if !TOKU_WINDOWS
//sync() does not appear to have an analogue on windows.
static void
time_sync_fsyncs_many_files(int N, int bytes, int fds[N]) {
if (verbose>1) {
printf("Starting %s\n", __FUNCTION__);
fflush(stdout);
}
//TODO: timing
write_to_files(N, bytes, fds);
if (verbose>1) {
printf("Done writing to os buffers\n");
fflush(stdout);
}
int i;
int r;
struct timeval begin;
struct timeval after_sync;
struct timeval end;
r = gettimeofday(&begin, NULL);
CKERR(r);
sync();
r = gettimeofday(&after_sync, NULL);
CKERR(r);
if (verbose>1) {
printf("Done with sync()\n");
fflush(stdout);
}
for (i = 0; i < N; i++) {
r = fsync(fds[i]);
CKERR(r);
if (verbose>2) {
printf("Done fsyncing %d\n", i);
fflush(stdout);
}
}
r = gettimeofday(&end, NULL);
CKERR(r);
if (verbose) {
printf("sync() then fsyncing %d files:\n"
"\tsync() took: [%f] seconds\n"
"\tRemaining %d fsyncs took additional: [%f] seconds\n"
"\tTotal time [%f] seconds\n",
N,
toku_tdiff(&after_sync, &begin),
N,
toku_tdiff(&end, &after_sync),
toku_tdiff(&end, &begin));
fflush(stdout);
}
}
#endif
int test_main(int argc, char *const argv[]) {
int i;
int r;
int N = 1000;
int bytes = 4096;
for (i=1; i<argc; i++) {
if (strcmp(argv[i], "-v") == 0) {
if (verbose < 0) verbose = 0;
verbose++;
continue;
}
if (strcmp(argv[i], "-q") == 0) {
verbose = 0;
continue;
}
if (strcmp(argv[i], "-b") == 0) {
i++;
if (i>=argc) exit(1);
bytes = atoi(argv[i]);
if (bytes <= 0) exit(1);
continue;
}
if (strcmp(argv[i], "-n") == 0) {
i++;
if (i>=argc) exit(1);
N = atoi(argv[i]);
if (N <= 0) exit(1);
continue;
}
}
r = system("rm -rf " ENVDIR);
CKERR(r);
r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
CKERR(r);
r = chdir(ENVDIR);
CKERR(r);
int fds[N];
create_files(N, fds);
time_many_fsyncs_one_file(N, bytes, fds);
time_fsyncs_many_files(N, bytes, fds);
#if !TOKU_WINDOWS
time_sync_fsyncs_many_files(N, bytes, fds);
#endif
return 0;
}
......@@ -67,7 +67,8 @@ int test_main(int argc, char * const argv[]) {
const int size = 10+strlen(env_dir);
char cmd[size];
snprintf(cmd, size, "rm -rf %s", env_dir);
system(cmd);
int r = system(cmd);
CKERR(r);
}
CHK(toku_os_mkdir(env_dir, S_IRWXU+S_IRWXG+S_IRWXO));
......
......@@ -188,7 +188,8 @@ unlink_dir (const char *dir) {
int len = strlen(dir)+100;
char cmd[len];
snprintf(cmd, len, "rm -rf %s", dir);
system(cmd);
int r = system(cmd);
CKERR(r);
}
int
......
......@@ -13,7 +13,7 @@ static void setup_env (void) {
const int len = strlen(envdir)+100;
char cmd[len];
snprintf(cmd, len, "rm -rf %s", envdir);
system(cmd);
{int r = system(cmd); CKERR(r); }
{int r = toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); }
{int r = db_env_create(&env, 0); CKERR(r); }
//env->set_errfile(env, stderr);
......
......@@ -9,7 +9,8 @@ static void clean_env (const char *envdir) {
const int len = strlen(envdir)+100;
char cmd[len];
snprintf(cmd, len, "rm -rf %s", envdir);
system(cmd);
int r = system(cmd);
CKERR(r);
CKERR(toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO));
}
......
......@@ -9,7 +9,8 @@ static void clean_env (const char *envdir) {
const int len = strlen(envdir)+100;
char cmd[len];
snprintf(cmd, len, "rm -rf %s", envdir);
system(cmd);
int r = system(cmd);
CKERR(r);
CKERR(toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO));
}
......
......@@ -9,7 +9,8 @@ static void clean_env (const char *envdir) {
const int len = strlen(envdir)+100;
char cmd[len];
snprintf(cmd, len, "rm -rf %s", envdir);
system(cmd);
int r = system(cmd);
CKERR(r);
CKERR(toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO));
}
......
......@@ -9,7 +9,8 @@ static void clean_env (const char *envdir) {
const int len = strlen(envdir)+100;
char cmd[len];
snprintf(cmd, len, "rm -rf %s", envdir);
system(cmd);
int r = system(cmd);
CKERR(r);
CKERR(toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO));
}
......
......@@ -105,7 +105,8 @@ int test_main(int argc, char * const argv[]) {
const int size = 10+strlen(env_dir);
char cmd[size];
snprintf(cmd, size, "rm -rf %s", env_dir);
system(cmd);
int r = system(cmd);
CKERR(r);
}
CHK(toku_os_mkdir(env_dir, S_IRWXU+S_IRWXG+S_IRWXO));
const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE;
......
......@@ -39,7 +39,7 @@ initialize_values (void) {
}
u_int32_t len = random() % MAX_SIZE;
fillrandom(keybuf, len);
dbt_init(&vals[nest_level], &keybuf[0], len);
dbt_init(&key, &keybuf[0], len);
}
......
......@@ -61,7 +61,7 @@ initialize_values (void) {
}
u_int32_t len = random() % MAX_SIZE;
fillrandom(keybuf, len);
dbt_init(&vals[nest_level], &keybuf[0], len);
dbt_init(&key, &keybuf[0], len);
}
......
......@@ -68,7 +68,7 @@ initialize_values (void) {
}
u_int32_t len = random() % MAX_SIZE;
fillrandom(keybuf, len);
dbt_init(&vals[nest_level], &keybuf[0], len);
dbt_init(&key, &keybuf[0], len);
}
......
......@@ -72,7 +72,7 @@ initialize_values (void) {
}
u_int32_t len = random() % MAX_SIZE;
fillrandom(keybuf, len);
dbt_init(&vals[nest_level], &keybuf[0], len);
dbt_init(&key, &keybuf[0], len);
fillrandom(junkvalbuf, MAX_SIZE-1);
dbt_init(&junkval, &junkvalbuf[0], MAX_SIZE-1);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment