Commit 6561bd1a authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1510 Final merge into main. Delete #1510 branch.

svn merge -r 11048:11110 ../tokudb.1510

git-svn-id: file:///svn/toku/tokudb@11112 c7de825b-a66e-492c-adef-691d508d4ae1
parent 1ed0d30d
......@@ -6,11 +6,12 @@ TOKUROOT=./
include $(TOKUROOT)toku_include/Makefile.include
default: build
SRCDIRS = $(OS_CHOICE) newbrt src cxx utils db-benchmark-test db-benchmark-test-cxx
SRCDIRS = $(OS_CHOICE) newbrt src/range_tree src/lock_tree src cxx utils db-benchmark-test db-benchmark-test-cxx
BUILDDIRS = $(SRCDIRS) man/texi
newbrt.dir: $(OS_CHOICE).dir
src.dir: newbrt.dir
src/lock_tree.dir: src/range_tree.dir
src.dir: newbrt.dir src/lock_tree.dir
cxx.dir: src.dir
db-benchmark-test.dir: src.dir
db-benchmark-test-cxx.dir: cxx.dir
......
......@@ -122,7 +122,13 @@ typedef enum {
#endif
struct __toku_db_env {
struct __toku_db_env_internal *i;
void* __toku_dummy0[8];
int (*checkpointing_postpone) (DB_ENV*) /* Use for 'rename table' or any other operation that must be disjoint from a checkpoint */;
int (*checkpointing_resume) (DB_ENV*) /* Alert tokudb 'postpone' is no longer necessary */;
int (*checkpointing_begin_atomic_operation) (DB_ENV*) /* Begin a set of operations (that must be atomic as far as checkpoints are concerned). i.e. inserting into every index in one table */;
int (*checkpointing_end_atomic_operation) (DB_ENV*) /* End a set of operations (that must be atomic as far as checkpoints are concerned). */;
int (*set_default_bt_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (key) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
int (*set_default_dup_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (val) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
void* __toku_dummy0[2];
void *app_private; /* 32-bit offset=36 size=4, 64=bit offset=72 size=8 */
void* __toku_dummy1[27];
char __toku_dummy2[64];
......
......@@ -124,7 +124,13 @@ typedef enum {
#endif
struct __toku_db_env {
struct __toku_db_env_internal *i;
void* __toku_dummy0[10];
int (*checkpointing_postpone) (DB_ENV*) /* Use for 'rename table' or any other operation that must be disjoint from a checkpoint */;
int (*checkpointing_resume) (DB_ENV*) /* Alert tokudb 'postpone' is no longer necessary */;
int (*checkpointing_begin_atomic_operation) (DB_ENV*) /* Begin a set of operations (that must be atomic as far as checkpoints are concerned). i.e. inserting into every index in one table */;
int (*checkpointing_end_atomic_operation) (DB_ENV*) /* End a set of operations (that must be atomic as far as checkpoints are concerned). */;
int (*set_default_bt_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (key) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
int (*set_default_dup_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (val) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
void* __toku_dummy0[4];
void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */
void* __toku_dummy1[25];
char __toku_dummy2[96];
......
......@@ -125,7 +125,13 @@ typedef enum {
#endif
struct __toku_db_env {
struct __toku_db_env_internal *i;
void* __toku_dummy0[10];
int (*checkpointing_postpone) (DB_ENV*) /* Use for 'rename table' or any other operation that must be disjoint from a checkpoint */;
int (*checkpointing_resume) (DB_ENV*) /* Alert tokudb 'postpone' is no longer necessary */;
int (*checkpointing_begin_atomic_operation) (DB_ENV*) /* Begin a set of operations (that must be atomic as far as checkpoints are concerned). i.e. inserting into every index in one table */;
int (*checkpointing_end_atomic_operation) (DB_ENV*) /* End a set of operations (that must be atomic as far as checkpoints are concerned). */;
int (*set_default_bt_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (key) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
int (*set_default_dup_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (val) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
void* __toku_dummy0[4];
void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */
void* __toku_dummy1[40];
char __toku_dummy2[128];
......
......@@ -125,7 +125,13 @@ typedef enum {
#endif
struct __toku_db_env {
struct __toku_db_env_internal *i;
void* __toku_dummy0[12];
int (*checkpointing_postpone) (DB_ENV*) /* Use for 'rename table' or any other operation that must be disjoint from a checkpoint */;
int (*checkpointing_resume) (DB_ENV*) /* Alert tokudb 'postpone' is no longer necessary */;
int (*checkpointing_begin_atomic_operation) (DB_ENV*) /* Begin a set of operations (that must be atomic as far as checkpoints are concerned). i.e. inserting into every index in one table */;
int (*checkpointing_end_atomic_operation) (DB_ENV*) /* End a set of operations (that must be atomic as far as checkpoints are concerned). */;
int (*set_default_bt_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (key) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
int (*set_default_dup_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (val) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
void* __toku_dummy0[6];
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
void* __toku_dummy1[38];
char __toku_dummy2[128];
......
......@@ -127,7 +127,13 @@ typedef enum {
#endif
struct __toku_db_env {
struct __toku_db_env_internal *i;
void* __toku_dummy0[12];
int (*checkpointing_postpone) (DB_ENV*) /* Use for 'rename table' or any other operation that must be disjoint from a checkpoint */;
int (*checkpointing_resume) (DB_ENV*) /* Alert tokudb 'postpone' is no longer necessary */;
int (*checkpointing_begin_atomic_operation) (DB_ENV*) /* Begin a set of operations (that must be atomic as far as checkpoints are concerned). i.e. inserting into every index in one table */;
int (*checkpointing_end_atomic_operation) (DB_ENV*) /* End a set of operations (that must be atomic as far as checkpoints are concerned). */;
int (*set_default_bt_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (key) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
int (*set_default_dup_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (val) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
void* __toku_dummy0[6];
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
void* __toku_dummy1[39];
char __toku_dummy2[144];
......
......@@ -321,7 +321,17 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
// Don't produce db_btree_stat records.
//print_struct("db_btree_stat", 0, db_btree_stat_fields32, db_btree_stat_fields64, sizeof(db_btree_stat_fields32)/sizeof(db_btree_stat_fields32[0]), 0);
assert(sizeof(db_env_fields32)==sizeof(db_env_fields64));
print_struct("db_env", 1, db_env_fields32, db_env_fields64, sizeof(db_env_fields32)/sizeof(db_env_fields32[0]), 0);
{
const char *extra[]={
"int (*checkpointing_postpone) (DB_ENV*) /* Use for 'rename table' or any other operation that must be disjoint from a checkpoint */",
"int (*checkpointing_resume) (DB_ENV*) /* Alert tokudb 'postpone' is no longer necessary */",
"int (*checkpointing_begin_atomic_operation) (DB_ENV*) /* Begin a set of operations (that must be atomic as far as checkpoints are concerned). i.e. inserting into every index in one table */",
"int (*checkpointing_end_atomic_operation) (DB_ENV*) /* End a set of operations (that must be atomic as far as checkpoints are concerned). */",
"int (*set_default_bt_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (key) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */",
"int (*set_default_dup_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (val) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */",
NULL};
print_struct("db_env", 1, db_env_fields32, db_env_fields64, sizeof(db_env_fields32)/sizeof(db_env_fields32[0]), extra);
}
assert(sizeof(db_key_range_fields32)==sizeof(db_key_range_fields64));
print_struct("db_key_range", 0, db_key_range_fields32, db_key_range_fields64, sizeof(db_key_range_fields32)/sizeof(db_key_range_fields32[0]), 0);
......
......@@ -116,11 +116,11 @@ static void test_db_exceptions (void) {
}
{
Db db2(&env, 0);
TC(db2.open(0, FNAME, "sub.db", DB_BTREE, DB_CREATE, 0777), EINVAL); // sub DB cannot exist
TC(db2.open(0, FNAME, "sub.db", DB_BTREE, DB_CREATE, 0777), ENOTDIR); // sub DB cannot exist
}
{
Db db2(&env, 0);
TC(db2.open(0, FNAME, "sub.db", DB_BTREE, 0, 0777), EINVAL); // sub DB cannot exist withou DB_CREATE
TC(db2.open(0, FNAME, "sub.db", DB_BTREE, 0, 0777), ENOTDIR); // sub DB cannot exist withou DB_CREATE
}
{
Dbc *curs;
......
......@@ -127,7 +127,13 @@ typedef enum {
#endif
struct __toku_db_env {
struct __toku_db_env_internal *i;
void* __toku_dummy0[12];
int (*checkpointing_postpone) (DB_ENV*) /* Use for 'rename table' or any other operation that must be disjoint from a checkpoint */;
int (*checkpointing_resume) (DB_ENV*) /* Alert tokudb 'postpone' is no longer necessary */;
int (*checkpointing_begin_atomic_operation) (DB_ENV*) /* Begin a set of operations (that must be atomic as far as checkpoints are concerned). i.e. inserting into every index in one table */;
int (*checkpointing_end_atomic_operation) (DB_ENV*) /* End a set of operations (that must be atomic as far as checkpoints are concerned). */;
int (*set_default_bt_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (key) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
int (*set_default_dup_compare) (DB_ENV*,int (*bt_compare) (DB *, const DBT *, const DBT *)) /* Set default (val) comparison function for all DBs in this environment. Required for RECOVERY since you cannot open the DBs manually. */;
void* __toku_dummy0[6];
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
void* __toku_dummy1[39];
char __toku_dummy2[144];
......
......@@ -39,6 +39,7 @@ struct translation { //This is the BTT (block translation table)
enum {RESERVED_BLOCKNUM_NULL=0,
RESERVED_BLOCKNUM_TRANSLATION=1,
RESERVED_BLOCKNUM_FIFO=2,
RESERVED_BLOCKNUM_DESCRIPTOR=3,
RESERVED_BLOCKNUMS};
static const BLOCKNUM freelist_null = {-1}; // in a freelist, this indicates end of list
......@@ -97,6 +98,24 @@ brtheader_set_dirty(struct brt_header *h, BOOL for_checkpoint){
}
}
static void
maybe_truncate_cachefile(BLOCK_TABLE bt, struct brt_header *h, u_int64_t size_needed_before) {
assert(bt->is_locked);
u_int64_t new_size_needed = block_allocator_allocated_limit(bt->block_allocator);
//Save a call to toku_os_get_file_size (kernel call) if unlikely to be useful.
if (new_size_needed < size_needed_before)
toku_maybe_truncate_cachefile(h->cf, new_size_needed);
}
void
toku_maybe_truncate_cachefile_on_open(BLOCK_TABLE bt, struct brt_header *h) {
lock_for_blocktable(bt);
u_int64_t size_needed = block_allocator_allocated_limit(bt->block_allocator);
toku_maybe_truncate_cachefile(h->cf, size_needed);
unlock_for_blocktable(bt);
}
static void
copy_translation(struct translation * dst, struct translation * src, enum translation_type newtype) {
......@@ -182,9 +201,10 @@ PRNTF("free", i, pair->size, pair->u.diskoff, bt);
// move inprogress to checkpoint (resetting type)
// inprogress = NULL
void
toku_block_translation_note_end_checkpoint (BLOCK_TABLE bt) {
toku_block_translation_note_end_checkpoint (BLOCK_TABLE bt, struct brt_header *h) {
// Free unused blocks
lock_for_blocktable(bt);
u_int64_t allocated_limit_at_start = block_allocator_allocated_limit(bt->block_allocator);
assert(bt->inprogress.block_translation);
if (bt->checkpoint_skipped || bt->checkpoint_failed) {
cleanup_failed_checkpoint(bt);
......@@ -210,6 +230,7 @@ PRNTF("free", i, pair->size, pair->u.diskoff, bt);
bt->checkpointed = bt->inprogress;
bt->checkpointed.type = TRANSLATION_CHECKPOINTED;
memset(&bt->inprogress, 0, sizeof(bt->inprogress));
maybe_truncate_cachefile(bt, h, allocated_limit_at_start);
end:
unlock_for_blocktable(bt);
}
......@@ -317,12 +338,11 @@ translation_prevents_freeing(struct translation *t, BLOCKNUM b, struct block_tra
}
static void
blocknum_realloc_on_disk_unlocked (BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF *offset, struct brt_header * h, BOOL for_checkpoint) {
blocknum_realloc_on_disk_internal (BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF *offset, struct brt_header * h, BOOL for_checkpoint) {
assert(bt->is_locked);
brtheader_set_dirty(h, for_checkpoint);
struct translation *t = &bt->current;
verify_valid_freeable_blocknum(t, b);
struct block_translation_pair old_pair = t->block_translation[b.b];
PRNTF("old", b.b, old_pair.size, old_pair.u.diskoff, bt);
//Free the old block if it is not still in use by the checkpoint in progress or the previous checkpoint
......@@ -346,13 +366,14 @@ PRNTF("New", b.b, t->block_translation[b.b].size, t->block_translation[b.b].u.di
assert(b.b < bt->inprogress.length_of_array);
bt->inprogress.block_translation[b.b] = t->block_translation[b.b];
}
}
void
toku_blocknum_realloc_on_disk (BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF *offset, struct brt_header * h, BOOL for_checkpoint) {
lock_for_blocktable(bt);
blocknum_realloc_on_disk_unlocked(bt, b, size, offset, h, for_checkpoint);
struct translation *t = &bt->current;
verify_valid_freeable_blocknum(t, b);
blocknum_realloc_on_disk_internal(bt, b, size, offset, h, for_checkpoint);
unlock_for_blocktable(bt);
}
......@@ -446,13 +467,19 @@ toku_serialize_translation_to_wbuf_unlocked(BLOCK_TABLE bt, struct wbuf *w,
// Perhaps rename: purpose is get disk address of a block, given its blocknum (blockid?)
void
toku_translate_blocknum_to_offset_size(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF *offset, DISKOFF *size) {
lock_for_blocktable(bt);
static void
translate_blocknum_to_offset_size_unlocked(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF *offset, DISKOFF *size) {
struct translation *t = &bt->current;
verify_valid_blocknum(t, b);
if (offset) *offset = t->block_translation[b.b].u.diskoff;
if (size) *size = t->block_translation[b.b].size;
}
// Perhaps rename: purpose is get disk address of a block, given its blocknum (blockid?)
void
toku_translate_blocknum_to_offset_size(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF *offset, DISKOFF *size) {
lock_for_blocktable(bt);
translate_blocknum_to_offset_size_unlocked(bt, b, offset, size);
unlock_for_blocktable(bt);
}
......@@ -468,12 +495,20 @@ maybe_expand_translation (struct translation *t) {
u_int64_t i;
for (i = t->length_of_array; i < new_length; i++) {
t->block_translation[i].u.next_free_blocknum = freelist_null;
t->block_translation[i].size = size_is_free;
t->block_translation[i].size = size_is_free;
}
t->length_of_array = new_length;
}
}
void
toku_allocate_blocknum_for_descriptor(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h) {
lock_for_blocktable(bt);
*res = make_blocknum(RESERVED_BLOCKNUM_DESCRIPTOR);
brtheader_set_dirty(h, FALSE);
unlock_for_blocktable(bt);
}
void
toku_allocate_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h) {
assert(bt->is_locked);
......@@ -549,16 +584,17 @@ toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *bp, struct brt_header * h) {
void
toku_block_translation_truncate_unlocked(BLOCK_TABLE bt, struct brt_header *h) {
assert(bt->is_locked);
u_int64_t allocated_limit_at_start = block_allocator_allocated_limit(bt->block_allocator);
brtheader_set_dirty(h, FALSE);
//Free all used blocks except descriptor
BLOCKNUM keep_only = h->descriptor.b;
//Free all regular/data blocks (non reserved)
//Meta data is stored in reserved blocks
struct translation *t = &bt->current;
int64_t i;
for (i=RESERVED_BLOCKNUMS; i<t->smallest_never_used_blocknum.b; i++) {
if (i==keep_only.b) continue;
BLOCKNUM b = make_blocknum(i);
if (t->block_translation[i].size > 0) free_blocknum_unlocked(bt, &b, h);
if (t->block_translation[i].size >= 0) free_blocknum_unlocked(bt, &b, h);
}
maybe_truncate_cachefile(bt, h, allocated_limit_at_start);
}
//Verify there are no free blocks.
......@@ -669,8 +705,8 @@ blocktable_create_internal (void) {
static void
translation_default(struct translation *t) { // destination into which to create a default translation
t->type = TRANSLATION_CHECKPOINTED;
t->smallest_never_used_blocknum = make_blocknum(RESERVED_BLOCKNUMS);
t->length_of_array = t->smallest_never_used_blocknum.b + 1; //Enough to store the root
t->smallest_never_used_blocknum = make_blocknum(RESERVED_BLOCKNUMS);
t->length_of_array = t->smallest_never_used_blocknum.b;
t->blocknum_freelist_head = freelist_null;
XMALLOC_N(t->length_of_array, t->block_translation);
int64_t i;
......@@ -856,3 +892,20 @@ toku_blocktable_internal_fragmentation (BLOCK_TABLE bt, int64_t *total_sizep, in
if (used_sizep) *used_sizep = info.used_space;
}
void
toku_realloc_descriptor_on_disk(BLOCK_TABLE bt, DISKOFF size, DISKOFF *offset, struct brt_header * h) {
lock_for_blocktable(bt);
BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_DESCRIPTOR);
blocknum_realloc_on_disk_internal(bt, b, size, offset, h, FALSE);
unlock_for_blocktable(bt);
}
void
toku_get_descriptor_offset_size(BLOCK_TABLE bt, DISKOFF *offset, DISKOFF *size) {
lock_for_blocktable(bt);
BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_DESCRIPTOR);
translate_blocknum_to_offset_size_unlocked(bt, b, offset, size);
unlock_for_blocktable(bt);
}
......@@ -22,10 +22,11 @@ void toku_block_lock_for_multiple_operations(BLOCK_TABLE bt);
void toku_block_unlock_for_multiple_operations(BLOCK_TABLE bt);
void toku_block_translation_note_start_checkpoint_unlocked(BLOCK_TABLE bt);
void toku_block_translation_note_end_checkpoint(BLOCK_TABLE bt);
void toku_block_translation_note_end_checkpoint(BLOCK_TABLE bt, struct brt_header *h);
void toku_block_translation_note_failed_checkpoint(BLOCK_TABLE bt);
void toku_block_translation_note_skipped_checkpoint(BLOCK_TABLE bt);
void toku_block_translation_truncate_unlocked(BLOCK_TABLE bt, struct brt_header *h);
void toku_maybe_truncate_cachefile_on_open(BLOCK_TABLE bt, struct brt_header *h);
//Blocknums
void toku_allocate_blocknum(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_header * h);
......@@ -35,6 +36,8 @@ void toku_verify_blocknum_allocated(BLOCK_TABLE bt, BLOCKNUM b);
void toku_block_verify_no_free_blocknums(BLOCK_TABLE bt);
void toku_realloc_fifo_on_disk_unlocked (BLOCK_TABLE, DISKOFF size, DISKOFF *offset);
void toku_get_fifo_offset_on_disk(BLOCK_TABLE bt, DISKOFF *offset);
void toku_realloc_descriptor_on_disk(BLOCK_TABLE bt, DISKOFF size, DISKOFF *offset, struct brt_header * h);
void toku_get_descriptor_offset_size(BLOCK_TABLE bt, DISKOFF *offset, DISKOFF *size);
//Blocks and Blocknums
void toku_blocknum_realloc_on_disk(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF *offset, struct brt_header * h, BOOL for_checkpoint);
......
......@@ -151,14 +151,10 @@ struct remembered_hash {
enum brtheader_type {BRTHEADER_CURRENT=1, BRTHEADER_CHECKPOINT_INPROGRESS};
struct descriptor {
struct simple_dbt sdbt;
BLOCKNUM b;
};
struct brt_header {
enum brtheader_type type;
struct brt_header * checkpoint_header;
CACHEFILE cf;
u_int64_t checkpoint_count; // Free-running counter incremented once per checkpoint (toggling LSB).
// LSB indicates which header location is used on disk so this
// counter is effectively a boolean which alternates with each checkpoint.
......@@ -172,7 +168,7 @@ struct brt_header {
BLOCKNUM root; // roots of the dictionary
struct remembered_hash root_hash; // hash of the root offset.
unsigned int flags;
struct descriptor descriptor;
struct simple_dbt descriptor;
FIFO fifo; // all the abort and commit commands. If the header gets flushed to disk, we write the fifo contents beyond the unused_memory.
......
......@@ -23,35 +23,6 @@ static inline u_int64_t alignup (u_int64_t a, u_int64_t b) {
return ((a+b-1)/b)*b;
}
int
maybe_preallocate_in_file (int fd, u_int64_t size)
// Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MB whichever is less.
// Return 0 on success, otherwise an error number.
{
int64_t file_size;
{
int r = toku_os_get_file_size(fd, &file_size);
assert(r==0);
}
assert(file_size >= 0);
if ((u_int64_t)file_size < size) {
const int N = umin64(size, 16<<20); // Double the size of the file, or add 16MB, whichever is less.
char *MALLOC_N(N, wbuf);
memset(wbuf, 0, N);
toku_off_t start_write = alignup(file_size, 4096);
assert(start_write >= file_size);
ssize_t r = toku_os_pwrite(fd, wbuf, N, start_write);
if (r==-1) {
int e=errno; // must save errno before calling toku_free.
toku_free(wbuf);
return e;
}
toku_free(wbuf);
assert(r==N); // We don't handle short writes properly, which is the case where 0<= r < N.
}
return 0;
}
// This mutex protects pwrite from running in parallel, and also protects modifications to the block allocator.
static toku_pthread_mutex_t pwrite_mutex = TOKU_PTHREAD_MUTEX_INITIALIZER;
static int pwrite_is_locked=0;
......@@ -79,6 +50,76 @@ unlock_for_pwrite (void) {
assert(r==0);
}
enum {FILE_CHANGE_INCREMENT = (16<<20)};
void
toku_maybe_truncate_cachefile (CACHEFILE cf, u_int64_t size_used)
// Effect: If file size >= SIZE+32MiB, reduce file size.
// (32 instead of 16.. hysteresis).
// Return 0 on success, otherwise an error number.
{
//Check file size before taking pwrite lock to reduce likelihood of taking
//the lock needlessly.
//Check file size after taking lock to avoid race conditions.
int64_t file_size;
{
int r = toku_os_get_file_size(toku_cachefile_fd(cf), &file_size);
if (r!=0 && toku_cachefile_is_dev_null(cf)) goto done;
assert(r==0);
assert(file_size >= 0);
}
// If file space is overallocated by at least 32M
if ((u_int64_t)file_size >= size_used + (2*FILE_CHANGE_INCREMENT)) {
lock_for_pwrite();
{
int r = toku_os_get_file_size(toku_cachefile_fd(cf), &file_size);
if (r!=0 && toku_cachefile_is_dev_null(cf)) goto cleanup;
assert(r==0);
assert(file_size >= 0);
}
if ((u_int64_t)file_size >= size_used + (2*FILE_CHANGE_INCREMENT)) {
toku_off_t new_size = alignup(file_size, (2*FILE_CHANGE_INCREMENT)); //Truncate to new size_used.
assert(new_size < file_size);
int r = toku_cachefile_truncate(cf, new_size);
assert(r==0);
}
cleanup:
unlock_for_pwrite();
}
done:
return;
}
int
maybe_preallocate_in_file (int fd, u_int64_t size)
// Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MiB whichever is less.
// Return 0 on success, otherwise an error number.
{
int64_t file_size;
{
int r = toku_os_get_file_size(fd, &file_size);
assert(r==0);
}
assert(file_size >= 0);
if ((u_int64_t)file_size < size) {
const int N = umin64(size, FILE_CHANGE_INCREMENT); // Double the size of the file, or add 16MiB, whichever is less.
char *MALLOC_N(N, wbuf);
memset(wbuf, 0, N);
toku_off_t start_write = alignup(file_size, 4096);
assert(start_write >= file_size);
ssize_t r = toku_os_pwrite(fd, wbuf, N, start_write);
if (r==-1) {
int e=errno; // must save errno before calling toku_free.
toku_free(wbuf);
return e;
}
toku_free(wbuf);
assert(r==N); // We don't handle short writes properly, which is the case where 0<= r < N.
}
return 0;
}
static int
toku_pwrite_extend (int fd, const void *buf, size_t count, toku_off_t offset, ssize_t *num_wrote)
// requires that the pwrite has been locked
......@@ -476,8 +517,6 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
//write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone);
int r;
{
lock_for_pwrite();
//TODO: #1463 START (might not be the entire range
// If the node has never been written, then write the whole buffer, including the zeros
assert(blocknum.b>=0);
//printf("%s:%d h=%p\n", __FILE__, __LINE__, h);
......@@ -491,13 +530,13 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
toku_blocknum_realloc_on_disk(h->blocktable, blocknum, n_to_write, &offset,
h, for_checkpoint);
ssize_t n_wrote;
lock_for_pwrite();
r=toku_pwrite_extend(fd, compressed_buf, n_to_write, offset, &n_wrote);
if (r) {
// fprintf(stderr, "%s:%d: Error writing data to file. errno=%d (%s)\n", __FILE__, __LINE__, r, strerror(r));
} else {
r=0;
}
//TODO: #1463 END
unlock_for_pwrite();
}
......@@ -929,18 +968,11 @@ int toku_serialize_brt_header_size (struct brt_header *UU(h)) {
);
size+=(+8 // diskoff
+4 // flags
+8 // blocknum of descriptor
);
assert(size <= BLOCK_ALLOCATOR_HEADER_RESERVE);
return size;
}
static void
serialize_descriptor_to_wbuf(struct wbuf *wb, struct descriptor *desc) {
wbuf_BLOCKNUM (wb, desc->b);
//descriptor contents are written to disk on opening brt
}
int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h, DISKOFF translation_location_on_disk, DISKOFF translation_size_on_disk) {
unsigned int size = toku_serialize_brt_header_size (h); // !!! seems silly to recompute the size when the caller knew it. Do we really need the size?
wbuf_literal_bytes(wbuf, "tokudata", 8);
......@@ -956,7 +988,6 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h,
wbuf_DISKOFF(wbuf, translation_size_on_disk);
wbuf_BLOCKNUM(wbuf, h->root);
wbuf_int (wbuf, h->flags);
serialize_descriptor_to_wbuf(wbuf, &h->descriptor);
u_int32_t checksum = x1764_finish(&wbuf->checksum);
wbuf_int(wbuf, checksum);
assert(wbuf->ndone<=wbuf->size);
......@@ -967,7 +998,6 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
int rr = 0;
if (h->panic) return h->panic;
assert(h->type==BRTHEADER_CHECKPOINT_INPROGRESS);
lock_for_pwrite();
toku_block_lock_for_multiple_operations(h->blocktable);
struct wbuf w_translation;
int64_t size_translation;
......@@ -991,6 +1021,7 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
}
toku_block_unlock_for_multiple_operations(h->blocktable);
char *writing_what;
lock_for_pwrite();
{
//Actual Write translation table
ssize_t nwrote;
......@@ -1033,6 +1064,9 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
return rr;
}
//Descriptor is written to disk during toku_brt_open iff we have a new (or changed)
//descriptor.
//Descriptors are NOT written during the header checkpoint process.
int
toku_serialize_descriptor_contents_to_fd(int fd, DBT *desc, DISKOFF offset) {
int r;
......@@ -1057,17 +1091,14 @@ toku_serialize_descriptor_contents_to_fd(int fd, DBT *desc, DISKOFF offset) {
}
static void
deserialize_descriptor_from(int fd, struct rbuf *rb, struct brt_header *h, struct descriptor *desc) {
deserialize_descriptor_from(int fd, struct brt_header *h, struct simple_dbt *desc) {
DISKOFF offset;
DISKOFF size;
toku_get_descriptor_offset_size(h->blocktable, &offset, &size);
memset(desc, 0, sizeof(*desc));
desc->b = rbuf_blocknum(rb);
if (desc->b.b > 0) {
DISKOFF offset;
DISKOFF size;
toku_translate_blocknum_to_offset_size(h->blocktable, desc->b, &offset, &size);
if (size > 0) {
assert(size>=4); //4 for checksum
{
unsigned char *XMALLOC_N(size, dbuf);
{
lock_for_pwrite();
......@@ -1082,8 +1113,8 @@ deserialize_descriptor_from(int fd, struct rbuf *rb, struct brt_header *h, struc
u_int32_t stored_x1764 = toku_dtoh32(*(int*)(dbuf + size-4));
assert(x1764 == stored_x1764);
}
desc->sdbt.len = size-4;
desc->sdbt.data = dbuf; //Uses 4 extra bytes, but fast.
desc->len = size-4;
desc->data = dbuf; //Uses 4 extra bytes, but fast.
}
}
}
......@@ -1099,7 +1130,7 @@ deserialize_brtheader (int fd, struct rbuf *rb, struct brt_header **brth) {
struct rbuf rc = *rb;
memset(rb, 0, sizeof(*rb));
struct brt_header *MALLOC(h);
struct brt_header *CALLOC(h);
if (h==0) return errno;
int ret=-1;
if (0) { died1: toku_free(h); return ret; }
......@@ -1135,19 +1166,19 @@ deserialize_brtheader (int fd, struct rbuf *rb, struct brt_header **brth) {
ssize_t r = pread(fd, tbuf, translation_size_on_disk, translation_address_on_disk);
assert(r==translation_size_on_disk);
}
unlock_for_pwrite();
// Create table and read in data.
toku_blocktable_create_from_buffer(&h->blocktable,
translation_address_on_disk,
translation_size_on_disk,
tbuf);
unlock_for_pwrite();
toku_free(tbuf);
}
h->root = rbuf_blocknum(&rc);
h->root_hash.valid = FALSE;
h->flags = rbuf_int(&rc);
deserialize_descriptor_from(fd, &rc, h, &h->descriptor);
deserialize_descriptor_from(fd, h, &h->descriptor);
(void)rbuf_int(&rc); //Read in checksum and ignore (already verified).
if (rc.ndone!=rc.size) {ret = EINVAL; goto died1;}
toku_free(rc.buf);
......
......@@ -617,7 +617,7 @@ brtheader_destroy(struct brt_header *h) {
else {
assert(h->type == BRTHEADER_CURRENT);
toku_blocktable_destroy(&h->blocktable);
if (h->descriptor.sdbt.data) toku_free(h->descriptor.sdbt.data);
if (h->descriptor.data) toku_free(h->descriptor.data);
}
}
......@@ -2861,6 +2861,8 @@ static int
brt_init_header_partial (BRT t) {
int r;
t->h->flags = t->flags;
if (t->h->cf!=NULL) assert(t->h->cf == t->cf);
t->h->cf = t->cf;
t->h->nodesize=t->nodesize;
compute_and_fill_remembered_hash(t);
......@@ -2950,6 +2952,7 @@ int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header
struct brt_header *h;
int r = toku_deserialize_brtheader_from(toku_cachefile_fd(cf), &h);
if (r!=0) return r;
h->cf = cf;
h->root_put_counter = global_root_put_counter++;
toku_cachefile_set_userdata(cf, (void*)h, toku_brtheader_close, toku_brtheader_checkpoint, toku_brtheader_begin_checkpoint, toku_brtheader_end_checkpoint);
*header = h;
......@@ -3029,25 +3032,27 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, int is_cre
}
}
if (t->did_set_descriptor) {
if (t->h->descriptor.sdbt.len!=t->temp_descriptor.size ||
memcmp(t->h->descriptor.sdbt.data, t->temp_descriptor.data, t->temp_descriptor.size)) {
if (t->h->descriptor.b.b <= 0) toku_allocate_blocknum(t->h->blocktable, &t->h->descriptor.b, t->h);
if (t->h->descriptor.len!=t->temp_descriptor.size ||
memcmp(t->h->descriptor.data, t->temp_descriptor.data, t->temp_descriptor.size)) {
DISKOFF offset;
//4 for checksum
toku_blocknum_realloc_on_disk(t->h->blocktable, t->h->descriptor.b, t->temp_descriptor.size+4, &offset, t->h, FALSE);
toku_realloc_descriptor_on_disk(t->h->blocktable, t->temp_descriptor.size+4, &offset, t->h);
r = toku_serialize_descriptor_contents_to_fd(toku_cachefile_fd(t->cf), &t->temp_descriptor, offset);
if (r!=0) goto died_after_read_and_pin;
if (t->h->descriptor.sdbt.data) toku_free(t->h->descriptor.sdbt.data);
t->h->descriptor.sdbt.data = t->temp_descriptor.data;
t->h->descriptor.sdbt.len = t->temp_descriptor.size;
if (t->h->descriptor.data) toku_free(t->h->descriptor.data);
t->h->descriptor.data = t->temp_descriptor.data;
t->h->descriptor.len = t->temp_descriptor.size;
t->temp_descriptor.data = NULL;
}
t->did_set_descriptor = 0;
}
if (t->db) {
toku_fill_dbt(&t->db->descriptor, t->h->descriptor.sdbt.data, t->h->descriptor.sdbt.len);
toku_fill_dbt(&t->db->descriptor, t->h->descriptor.data, t->h->descriptor.len);
}
assert(t->h);
//Opening a brt may restore to previous checkpoint. Truncate if necessary.
toku_maybe_truncate_cachefile_on_open(t->h->blocktable, t->h);
WHEN_BRTTRACE(fprintf(stderr, "BRTTRACE -> %p\n", t));
return 0;
}
......@@ -3179,10 +3184,10 @@ toku_brtheader_end_checkpoint (CACHEFILE cachefile, void *header_v) {
else
h->checkpoint_count++; // checkpoint succeeded, next checkpoint will save to alternate header location
}
toku_block_translation_note_end_checkpoint(h->blocktable);
brtheader_free(h->checkpoint_header);
h->checkpoint_header = NULL;
toku_block_translation_note_end_checkpoint(h->blocktable, h);
}
if (h->checkpoint_header) brtheader_free(h->checkpoint_header);
h->checkpoint_header = NULL;
return r;
}
......@@ -4695,18 +4700,19 @@ int toku_dump_brt (FILE *f, BRT brt) {
int toku_brt_truncate (BRT brt) {
int r;
// flush the cached tree blocks
// flush the cached tree blocks and remove all related pairs from the cachetable
r = toku_brt_flush(brt);
// TODO log the truncate?
toku_block_lock_for_multiple_operations(brt->h->blocktable);
if (r==0) {
// reinit the header
//Free all data blocknums and associated disk space (if not held on to by checkpoint)
toku_block_translation_truncate_unlocked(brt->h->blocktable, brt->h);
//Assign blocknum for root block, also dirty the header
toku_allocate_blocknum_unlocked(brt->h->blocktable, &brt->h->root, brt->h);
// reinit the header
brtheader_partial_destroy(brt->h);
r = brt_init_header_partial(brt);
}
......
......@@ -138,6 +138,9 @@ void toku_brt_destroy(void);
void toku_pwrite_lock_init(void);
void toku_pwrite_lock_destroy(void);
void toku_maybe_truncate_cachefile (CACHEFILE cf, u_int64_t size_used);
// Effect: truncate file if overallocated by at least 32MiB
int maybe_preallocate_in_file (int fd, u_int64_t size);
// Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MB whichever is less.
......
......@@ -58,6 +58,13 @@ enum ctpair_state {
CTPAIR_WRITING = 3, // being written from memory
};
/* The workqueue pointer cq is set in:
* cachetable_complete_write_pair() cq is cleared, called from many paths, cachetable lock is held during this function
* cachetable_flush_cachefile() called during close and truncate, cachetable lock is held during this function
* toku_cachetable_unpin_and_remove() called during node merge, cachetable lock is held during this function
*
*/
typedef struct ctpair *PAIR;
struct ctpair {
enum typ_tag tag;
......@@ -315,11 +322,20 @@ int toku_cachefile_fd (CACHEFILE cf) {
return cf->fd;
}
int toku_cachefile_truncate0 (CACHEFILE cf) {
BOOL
toku_cachefile_is_dev_null (CACHEFILE cf) {
return cf->fname==NULL;
}
int
toku_cachefile_truncate (CACHEFILE cf, toku_off_t new_size) {
int r;
r = ftruncate(cf->fd, 0);
if (r != 0)
r = errno;
if (cf->fname==NULL) r = 0; //Don't truncate /dev/null
else {
r = ftruncate(cf->fd, new_size);
if (r != 0)
r = errno;
}
return r;
}
......@@ -868,12 +884,6 @@ write_pair_for_checkpoint (CACHETABLE ct, PAIR p)
// this is essentially a flush_and_maybe_remove except that
// we already have p->rwlock and we just do the write in our own thread.
assert(p->dirty); // it must be dirty if its pending.
#if 0
// TODO: Determine if this is legal, and/or required. Commented out for now
// I believe if it has a queue, removing it it will break whatever's waiting for it.
// p->cq = 0; // I don't want any delay, just do it.
#endif
p->state = CTPAIR_WRITING; //most of this code should run only if NOT ALREADY CTPAIR_WRITING
assert(ct->size_writing>=0);
ct->size_writing += p->size;
......@@ -1186,8 +1196,6 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
// work queue.
unsigned i;
//THIS LOOP IS NOT THREAD SAFE! Has race condition since flush_and_maybe_remove releases cachetable lock
unsigned num_pairs = 0;
unsigned list_size = 16;
PAIR *list = NULL;
......@@ -1335,7 +1343,7 @@ log_open_txn (TOKULOGGER logger, TOKUTXN txn, void *UU(v))
int
toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
// Requires: All three checkpoint-relevant locks must be held (see src/checkpoint.c).
// Requires: All three checkpoint-relevant locks must be held (see checkpoint.c).
// Algorithm: Write a checkpoint record to the log, noting the LSN of that record.
// Use the begin_checkpoint callback to take necessary snapshots (header, btt)
// Mark every dirty node as "pending." ("Pending" means that the node must be
......@@ -1431,7 +1439,7 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
int
toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string) {
// Requires: The big checkpoint lock must be held (see src/checkpoint.c).
// Requires: The big checkpoint lock must be held (see checkpoint.c).
// Algorithm: Write all pending nodes to disk
// Use checkpoint callback to write snapshot information to disk (header, btt)
// Use end_checkpoint callback to fsync dictionary and log, and to free unused blocks
......@@ -1670,7 +1678,7 @@ int
toku_cachefile_fsync(CACHEFILE cf) {
int r;
if (cf->fname==0) r = 0; //Don't fsync /dev/null
if (cf->fname==NULL) r = 0; //Don't fsync /dev/null
else r = fsync(cf->fd);
return r;
}
......
......@@ -177,8 +177,10 @@ int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname);
int toku_cachefile_redirect_nullfd (CACHEFILE cf);
// Truncate a cachefile
// Effect: set the cachefile size to 0
int toku_cachefile_truncate0 (CACHEFILE cf);
int toku_cachefile_truncate (CACHEFILE cf, toku_off_t new_size);
//has it been redirected to dev null?
BOOL toku_cachefile_is_dev_null (CACHEFILE cf);
// Return the logger associated with the cachefile
TOKULOGGER toku_cachefile_logger (CACHEFILE);
......
......@@ -10,7 +10,7 @@
* The purpose of this file is to implement the high-level logic for
* taking a checkpoint.
*
* There are three locks used for taking a checkpoint. They are:
* There are three locks used for taking a checkpoint. They are listed below.
*
* NOTE: The reader-writer locks may be held by either multiple clients
* or the checkpoint function. (The checkpoint function has the role
......@@ -32,11 +32,10 @@
* (the checkpoint function is non-re-entrant), and to prevent certain operations
* that should not happen during a checkpoint.
* The following operations must take the checkpoint_safe lock:
* - open a dictionary
* - close a dictionary
* - delete a dictionary
* - truncate a dictionary
* - rename a dictionary
* The application can use this lock to disable checkpointing during other sensitive
* operations, such as making a backup copy of the database.
*
* - ydb_big_lock
* This is the existing lock used to serialize all access to tokudb.
......@@ -44,15 +43,13 @@
* to set all the "pending" bits and to create the checkpoint-in-progress versions
* of the header and translation table (btt).
*
* Once the "pending" bits are set and the snapshots are take of the header and btt,
* Once the "pending" bits are set and the snapshots are taken of the header and btt,
* most normal database operations are permitted to resume.
*
*
*
*****/
#include <stdio.h>
#include "toku_portability.h"
#include "brttypes.h"
#include "cachetable.h"
......@@ -170,8 +167,6 @@ int
toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string) {
int r;
printf("Yipes, checkpoint is being tested\n");
assert(initialized);
multi_operation_checkpoint_lock();
checkpoint_safe_checkpoint_lock();
......
......@@ -6,13 +6,10 @@
#ident "$Id$"
// TODO: $1510 Need callback mechanism so newbrt layer can get/release ydb lock.
/******
*
* NOTE: atomic operation lock is highest level lock
* checkpoint_safe lock is next level lock
* NOTE: multi_operation_lock is highest level lock
* checkpoint_safe_lock is next level lock
* ydb_big_lock is next level lock
*
* Locks must always be taken in this sequence (highest level first).
......@@ -20,13 +17,12 @@
*/
/****** TODO: ydb layer should be taking this lock
/******
* Client code must hold the checkpoint_safe lock during the following operations:
* - delete a dictionary
* - truncate a dictionary
* - rename a dictionary
* - abort a transaction that created a dictionary (abort causes dictionary delete)
* - abort a transaction that had a table lock on an empty table (abort causes dictionary truncate)
* - delete a dictionary via DB->remove
* - delete a dictionary via DB_TXN->abort(txn) (where txn created a dictionary)
* - rename a dictionary //TODO: Handlerton rename needs to take this
* //TODO: Handlerton rename needs to be recoded for transaction recovery
*****/
void toku_checkpoint_safe_client_lock(void);
......@@ -35,9 +31,8 @@ void toku_checkpoint_safe_client_unlock(void);
/****** TODO: rename these functions as something like begin_atomic_operation and end_atomic_operation
* these may need ydb wrappers
* These functions are called from the handlerton level.
/******
* These functions are called from the ydb level.
* Client code must hold the multi_operation lock during the following operations:
* - insertion into multiple indexes
* - replace into (simultaneous delete/insert on a single key)
......
......@@ -408,13 +408,13 @@ void toku_logger_txn_close (TOKUTXN txn) {
return;
}
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, void (*yield)(void*), void*yieldv) {
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv) {
int r=0;
rolltype_dispatch_assign(item, toku_commit_, r, txn, yield, yieldv);
return r;
}
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, void (*yield)(void*), void*yieldv) {
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv) {
int r=0;
rolltype_dispatch_assign(item, toku_rollback_, r, txn, yield, yieldv);
if (r!=0) return r;
......@@ -426,7 +426,7 @@ static int note_brt_used_in_parent_txn(OMTVALUE brtv, u_int32_t UU(index), void*
}
// Doesn't close the txn, just performs the commit operations.
int toku_logger_commit (TOKUTXN txn, int nosync, void(*yield)(void*yieldv), void*yieldv) {
int toku_logger_commit (TOKUTXN txn, int nosync, YIELDF yield, void*yieldv) {
// printf("%s:%d committing\n", __FILE__, __LINE__);
// panic handled in log_commit
int r = toku_log_commit(txn->logger, (LSN*)0, (txn->parent==0) && !nosync, txn->txnid64); // exits holding neither of the tokulogger locks.
......@@ -497,7 +497,7 @@ int toku_logger_commit (TOKUTXN txn, int nosync, void(*yield)(void*yieldv), void
r = toku_commit_rollback_item(txn, item, yield, yieldv);
if (r!=0) return r;
count++;
if (count%2 == 0) yield(yieldv);
if (count%2 == 0) yield(NULL, yieldv);
}
}
......@@ -866,7 +866,7 @@ toku_abort_logentry_commit (struct logtype_commit *le __attribute__((__unused__)
#endif
// Doesn't close the txn, just performs the abort operations.
int toku_logger_abort(TOKUTXN txn, void (*yield)(void*), void*yieldv) {
int toku_logger_abort(TOKUTXN txn, YIELDF yield, void*yieldv) {
//printf("%s:%d aborting\n", __FILE__, __LINE__);
// Must undo everything. Must undo it all in reverse order.
// Build the reverse list
......@@ -883,7 +883,7 @@ int toku_logger_abort(TOKUTXN txn, void (*yield)(void*), void*yieldv) {
int r = toku_abort_rollback_item(txn, item, yield, yieldv);
if (r!=0) return r;
count++;
if (count%2 == 0) yield(yieldv);
if (count%2 == 0) yield(NULL, yieldv);
}
}
list_remove(&txn->live_txns_link);
......
......@@ -40,11 +40,13 @@ int toku_logger_set_lg_max (TOKULOGGER logger, u_int32_t);
int toku_logger_get_lg_max (TOKULOGGER logger, u_int32_t *);
int toku_logger_set_lg_bsize(TOKULOGGER, u_int32_t);
typedef void(*voidfp)(void);
typedef void(*YIELDF)(voidfp, void*);
// Doesn't close the txn, just performs the commit operations.
int toku_logger_commit (TOKUTXN txn, int no_sync, void(*yield)(void*yield_v), void*yield_v);
int toku_logger_commit (TOKUTXN txn, int no_sync, YIELDF yield, void*yield_v);
// Doesn't close the txn, just performs the abort operations.
int toku_logger_abort(TOKUTXN, void(*/*yield*/)(void*), void*/*yield_v*/);
int toku_logger_abort(TOKUTXN, YIELDF, void*/*yield_v*/);
// Closes a txn. Call after commiting or aborting.
void toku_logger_txn_close (TOKUTXN);
......@@ -157,16 +159,16 @@ int toku_maybe_spill_rollbacks (TOKUTXN txn);
struct roll_entry;
int toku_rollback_fileentries (int fd,
TOKUTXN txn,
void (*yield)(void*yieldv),
YIELDF yield,
void * yieldv);
int toku_commit_fileentries (int fd,
TOKUTXN txn,
void (*yield)(void*yieldv),
YIELDF yield,
void * yieldv);
// do the commit items. Call yield(yield_v) once in a while.
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, void(*yield)(void*yield_v), void*yield_v);
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, void(*yield)(void*yield_v), void*yield_v);
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yield_v);
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yield_v);
int toku_txn_note_brt (TOKUTXN txn, BRT brt);
int toku_txn_note_close_brt (BRT brt);
......
......@@ -225,10 +225,10 @@ generate_log_struct (void) {
fprintf(hf, "};\n");
fprintf(hf, "int toku_rollback_%s (", lt->name);
DO_FIELDS(ft, lt, fprintf(hf, "%s %s,", ft->type, ft->name));
fprintf(hf, "TOKUTXN txn, void(*yield)(void*yield_v), void*yield_v);\n");
fprintf(hf, "TOKUTXN txn, YIELDF yield, void*yield_v);\n");
fprintf(hf, "int toku_commit_%s (", lt->name);
DO_FIELDS(ft, lt, fprintf(hf, "%s %s,", ft->type, ft->name));
fprintf(hf, "TOKUTXN txn, void(*yield)(void*yield_v), void*yield_v);\n");
fprintf(hf, "TOKUTXN txn, YIELDF yield, void*yield_v);\n");
});
fprintf(hf, "struct log_entry {\n");
fprintf(hf, " enum lt_cmd cmd;\n");
......
......@@ -4,6 +4,7 @@
/* rollback and rollforward routines. */
#include "includes.h"
#include "checkpoint.h"
// these flags control whether or not we send commit messages for
// various operations
......@@ -11,8 +12,6 @@
#define TOKU_DO_COMMIT_CMD_DELETE 1
#define TOKU_DO_COMMIT_CMD_DELETE_BOTH 1
typedef void (*YIELDF)(void*);
int
toku_commit_fcreate (TXNID UU(xid),
FILENUM UU(filenum),
......@@ -29,9 +28,10 @@ toku_rollback_fcreate (TXNID UU(xid),
FILENUM filenum,
BYTESTRING bs_fname,
TOKUTXN txn,
YIELDF UU(yield),
void* UU(yield_v))
YIELDF yield,
void* yield_v)
{
yield(toku_checkpoint_safe_client_lock, yield_v);
char *fname = fixup_fname(&bs_fname);
char *directory = txn->logger->directory;
int full_len=strlen(fname)+strlen(directory)+2;
......@@ -48,6 +48,7 @@ toku_rollback_fcreate (TXNID UU(xid),
r = unlink(full_fname);
assert(r==0);
toku_free(fname);
toku_checkpoint_safe_client_unlock();
return 0;
}
......@@ -218,7 +219,7 @@ toku_commit_fileentries (int fd,
if (r!=0) goto finish;
memarena_clear(ma);
count++;
if (count%2==0) yield(yieldv);
if (count%2==0) yield(NULL, yieldv);
}
finish:
{ int r2 = close_bread_without_closing_fd(f); assert(r2==0); }
......@@ -245,7 +246,7 @@ toku_rollback_fileentries (int fd,
if (r!=0) goto finish;
memarena_clear(ma);
count++;
if (count%2==0) yield(yieldv);
if (count%2==0) yield(NULL, yieldv);
}
finish:
{ int r2 = close_bread_without_closing_fd(f); assert(r2==0); }
......
......@@ -83,6 +83,7 @@ BDB_DONTRUN_TESTS = \
test_db_descriptor \
test_db_descriptor_named_db \
test_heaviside_straddle_1622 \
test_dbremove_old \
#\ ends prev line
ifeq ($(OS_CHOICE),windows)
......
......@@ -58,6 +58,8 @@ struct __toku_db_env_internal {
char *tmp_dir;
char *lg_dir;
char **data_dirs;
int (*bt_compare) (DB *, const DBT *, const DBT *);
int (*dup_compare) (DB *, const DBT *, const DBT *);
u_int32_t n_data_dirs;
//void (*noticecall)(DB_ENV *, db_notices);
unsigned long cachetable_size;
......
......@@ -29,6 +29,7 @@ const char *toku_copyright_string = "Copyright (c) 2007, 2008 Tokutek Inc. All
#include "memory.h"
#include "dlmalloc.h"
#include "checkpoint.h"
#include "key.h"
#ifdef TOKUTRACE
#define DB_ENV_CREATE_FUN db_env_create_toku10
......@@ -785,6 +786,80 @@ static int locked_env_txn_stat(DB_ENV * env, DB_TXN_STAT ** statp, u_int32_t fla
toku_ydb_lock(); int r = toku_env_txn_stat(env, statp, flags); toku_ydb_unlock(); return r;
}
static int
env_checkpointing_postpone(DB_ENV * env) {
HANDLE_PANICKED_ENV(env);
int r = 0;
if (!env_opened(env)) r = EINVAL;
else toku_checkpoint_safe_client_lock();
return r;
}
static int
env_checkpointing_resume(DB_ENV * env) {
HANDLE_PANICKED_ENV(env);
int r = 0;
if (!env_opened(env)) r = EINVAL;
else toku_checkpoint_safe_client_unlock();
return r;
}
static int
env_checkpointing_begin_atomic_operation(DB_ENV * env) {
HANDLE_PANICKED_ENV(env);
int r = 0;
if (!env_opened(env)) r = EINVAL;
else toku_multi_operation_client_lock();
return r;
}
static int
env_checkpointing_end_atomic_operation(DB_ENV * env) {
HANDLE_PANICKED_ENV(env);
int r = 0;
if (!env_opened(env)) r = EINVAL;
else toku_multi_operation_client_unlock();
return r;
}
static int
env_set_default_dup_compare(DB_ENV * env, int (*dup_compare) (DB *, const DBT *, const DBT *)) {
HANDLE_PANICKED_ENV(env);
int r = 0;
if (env_opened(env)) r = EINVAL;
else {
env->i->dup_compare = dup_compare;
}
return r;
}
static int
locked_env_set_default_dup_compare(DB_ENV * env, int (*dup_compare) (DB *, const DBT *, const DBT *)) {
toku_ydb_lock();
int r = env_set_default_dup_compare(env, dup_compare);
toku_ydb_unlock();
return r;
}
static int
env_set_default_bt_compare(DB_ENV * env, int (*bt_compare) (DB *, const DBT *, const DBT *)) {
HANDLE_PANICKED_ENV(env);
int r = 0;
if (env_opened(env)) r = EINVAL;
else {
env->i->bt_compare = bt_compare;
}
return r;
}
static int
locked_env_set_default_bt_compare(DB_ENV * env, int (*bt_compare) (DB *, const DBT *, const DBT *)) {
toku_ydb_lock();
int r = env_set_default_bt_compare(env, bt_compare);
toku_ydb_unlock();
return r;
}
static int locked_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags);
static int toku_db_lt_panic(DB* db, int r);
......@@ -802,6 +877,12 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) {
if (result == 0) { r = ENOMEM; goto cleanup; }
memset(result, 0, sizeof *result);
result->err = (void (*)(const DB_ENV * env, int error, const char *fmt, ...)) toku_locked_env_err;
result->set_default_bt_compare = locked_env_set_default_bt_compare;
result->set_default_dup_compare = locked_env_set_default_dup_compare;
result->checkpointing_postpone = env_checkpointing_postpone;
result->checkpointing_resume = env_checkpointing_resume;
result->checkpointing_begin_atomic_operation = env_checkpointing_begin_atomic_operation;
result->checkpointing_end_atomic_operation = env_checkpointing_end_atomic_operation;
result->open = locked_env_open;
result->close = locked_env_close;
result->txn_checkpoint = toku_env_txn_checkpoint;
......@@ -835,6 +916,8 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) {
MALLOC(result->i);
if (result->i == 0) { r = ENOMEM; goto cleanup; }
memset(result->i, 0, sizeof *result->i);
result->i->bt_compare = toku_default_compare_fun;
result->i->dup_compare = toku_default_compare_fun;
result->i->is_panicked=0;
result->i->panic_string = 0;
result->i->ref_count = 1;
......@@ -904,8 +987,9 @@ static int toku_txn_release_locks(DB_TXN* txn) {
// Yield the lock so someone else can work, and then reacquire the lock.
// Useful while processing commit or rollback logs, to allow others to access the system.
static void ydb_yield (void *UU(v)) {
static void ydb_yield (voidfp f, void *UU(v)) {
toku_ydb_unlock();
if (f) f();
toku_ydb_lock();
}
......@@ -3064,6 +3148,7 @@ static int toku_db_remove(DB * db, const char *fname, const char *dbname, u_int3
static int
toku_db_remove_subdb(DB * db, const char *fname, const char *dbname, u_int32_t flags) {
BOOL need_close = TRUE;
char *directory_name = NULL;
int r = find_db_file(db->dbenv, fname, &directory_name);
if (r!=0) goto cleanup;
......@@ -3076,10 +3161,15 @@ toku_db_remove_subdb(DB * db, const char *fname, const char *dbname, u_int32_t f
int bytes = snprintf(subdb_full_name, sizeof(subdb_full_name), "%s/%s", fname, dbname);
assert(bytes==(int)sizeof(subdb_full_name)-1);
const char *null_subdbname = NULL;
need_close = FALSE;
r = toku_db_remove(db, subdb_full_name, null_subdbname, flags);
}
cleanup:
if (need_close) {
int r2 = toku_db_close(db, 0);
if (r==0) r = r2;
}
if (directory_name) toku_free(directory_name);
return r;
}
......@@ -3144,6 +3234,7 @@ cleanup:
TODO: Verify the DB file is not in use (either a single db file or
a file with multi-databases).
TODO: Check the other directories in the environment for the file
TODO: Alert the BRT layer (for logging/recovery purposes)
*/
static int toku_db_rename(DB * db, const char *namea, const char *nameb, const char *namec, u_int32_t flags) {
HANDLE_PANICKED_DB(db);
......@@ -3438,11 +3529,21 @@ static int locked_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t
}
static int locked_db_remove(DB * db, const char *fname, const char *dbname, u_int32_t flags) {
toku_ydb_lock(); int r = toku_db_remove(db, fname, dbname, flags); toku_ydb_unlock(); return r;
toku_checkpoint_safe_client_lock();
toku_ydb_lock();
int r = toku_db_remove(db, fname, dbname, flags);
toku_ydb_unlock();
toku_checkpoint_safe_client_unlock();
return r;
}
static int locked_db_rename(DB * db, const char *namea, const char *nameb, const char *namec, u_int32_t flags) {
toku_ydb_lock(); int r = toku_db_rename(db, namea, nameb, namec, flags); toku_ydb_unlock(); return r;
toku_checkpoint_safe_client_lock();
toku_ydb_lock();
int r = toku_db_rename(db, namea, nameb, namec, flags);
toku_ydb_unlock();
toku_checkpoint_safe_client_unlock();
return r;
}
static int locked_db_set_bt_compare(DB * db, int (*bt_compare) (DB *, const DBT *, const DBT *)) {
......@@ -3493,8 +3594,7 @@ static const DBT* toku_db_dbt_neg_infty(void) {
}
static int locked_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32_t flags) {
toku_ydb_lock(); int r = toku_db_truncate(db, txn, row_count, flags); toku_ydb_unlock(); return r\
;
toku_ydb_lock(); int r = toku_db_truncate(db, txn, row_count, flags); toku_ydb_unlock(); return r;
}
static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
......@@ -3577,6 +3677,10 @@ static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
env_unref(env);
return ENOMEM;
}
r = toku_brt_set_bt_compare(result->i->brt, env->i->bt_compare);
assert(r==0);
r = toku_brt_set_dup_compare(result->i->brt, env->i->dup_compare);
assert(r==0);
ydb_add_ref();
*db = result;
return 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment