Commit d6634e04 authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

[t:4749] [t:4878] [t:4929] [t:4947] merging these changes to main.


git-svn-id: file:///svn/toku/tokudb@44058 c7de825b-a66e-492c-adef-691d508d4ae1
parent df056272
...@@ -19,8 +19,8 @@ ...@@ -19,8 +19,8 @@
extern "C" { extern "C" {
#endif #endif
typedef void(*voidfp)(void *thunk); // typedef void(*voidfp)(void *thunk);
typedef void(*YIELDF)(voidfp, void *fpthunk, void *yieldthunk); // typedef void(*YIELDF)(voidfp, void *fpthunk, void *yieldthunk);
struct roll_entry; struct roll_entry;
......
...@@ -298,10 +298,10 @@ generate_log_struct (void) { ...@@ -298,10 +298,10 @@ generate_log_struct (void) {
fprintf(hf, "};\n"); fprintf(hf, "};\n");
fprintf(hf, "int toku_rollback_%s (", lt->name); fprintf(hf, "int toku_rollback_%s (", lt->name);
DO_FIELDS(field_type, lt, fprintf(hf, "%s %s,", field_type->type, field_type->name)); DO_FIELDS(field_type, lt, fprintf(hf, "%s %s,", field_type->type, field_type->name));
fprintf(hf, "TOKUTXN txn, YIELDF yield, void*yield_v, LSN oplsn);\n"); fprintf(hf, "TOKUTXN txn, LSN oplsn);\n");
fprintf(hf, "int toku_commit_%s (", lt->name); fprintf(hf, "int toku_commit_%s (", lt->name);
DO_FIELDS(field_type, lt, fprintf(hf, "%s %s,", field_type->type, field_type->name)); DO_FIELDS(field_type, lt, fprintf(hf, "%s %s,", field_type->type, field_type->name));
fprintf(hf, "TOKUTXN txn, YIELDF yield, void*yield_v, LSN oplsn);\n"); fprintf(hf, "TOKUTXN txn, LSN oplsn);\n");
}); });
fprintf(hf, "struct log_entry {\n"); fprintf(hf, "struct log_entry {\n");
fprintf(hf, " enum lt_cmd cmd;\n"); fprintf(hf, " enum lt_cmd cmd;\n");
......
...@@ -260,12 +260,6 @@ static const char *recover_state(RECOVER_ENV renv) { ...@@ -260,12 +260,6 @@ static const char *recover_state(RECOVER_ENV renv) {
return scan_state_string(&renv->ss); return scan_state_string(&renv->ss);
} }
// function supplied to transaction commit and abort
// No yielding is necessary, but it must call the f function if provided.
static void recover_yield(voidfp f, void *fpthunk, void *UU(yieldthunk)) {
if (f) f(fpthunk);
}
// Open the file if it is not already open. If it is already open, then do nothing. // Open the file if it is not already open. If it is already open, then do nothing.
static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, BOOL must_create, int UU(mode), BYTESTRING *bs_iname, FILENUM filenum, u_int32_t treeflags, static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, BOOL must_create, int UU(mode), BYTESTRING *bs_iname, FILENUM filenum, u_int32_t treeflags,
TOKUTXN txn, uint32_t nodesize, uint32_t basementnodesize, enum toku_compression_method compression_method, LSN max_acceptable_lsn) { TOKUTXN txn, uint32_t nodesize, uint32_t basementnodesize, enum toku_compression_method compression_method, LSN max_acceptable_lsn) {
...@@ -663,7 +657,7 @@ static int toku_recover_xcommit (struct logtype_xcommit *l, RECOVER_ENV renv) { ...@@ -663,7 +657,7 @@ static int toku_recover_xcommit (struct logtype_xcommit *l, RECOVER_ENV renv) {
assert(txn!=NULL); assert(txn!=NULL);
// commit the transaction // commit the transaction
r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, l->lsn, r = toku_txn_commit_with_lsn(txn, TRUE, l->lsn,
NULL, NULL); NULL, NULL);
assert(r == 0); assert(r == 0);
...@@ -711,7 +705,7 @@ static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) { ...@@ -711,7 +705,7 @@ static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) {
assert(txn!=NULL); assert(txn!=NULL);
// abort the transaction // abort the transaction
r = toku_txn_abort_with_lsn(txn, recover_yield, NULL, l->lsn, NULL, NULL); r = toku_txn_abort_with_lsn(txn, l->lsn, NULL, NULL);
assert(r == 0); assert(r == 0);
// close the transaction // close the transaction
...@@ -1269,7 +1263,7 @@ static void recover_abort_live_txns(RECOVER_ENV renv) { ...@@ -1269,7 +1263,7 @@ static void recover_abort_live_txns(RECOVER_ENV renv) {
int r = find_an_unprepared_txn(renv, &txn); int r = find_an_unprepared_txn(renv, &txn);
if (r==0) { if (r==0) {
// abort the transaction // abort the transaction
r = toku_txn_abort_txn(txn, recover_yield, NULL, NULL, NULL); r = toku_txn_abort_txn(txn, NULL, NULL);
assert(r == 0); assert(r == 0);
// close the transaction // close the transaction
......
This diff is collapsed.
...@@ -18,9 +18,9 @@ poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_che ...@@ -18,9 +18,9 @@ poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_che
} }
} }
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn) { int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) {
int r=0; int r=0;
rolltype_dispatch_assign(item, toku_commit_, r, txn, yield, yieldv, lsn); rolltype_dispatch_assign(item, toku_commit_, r, txn, lsn);
txn->num_rollentries_processed++; txn->num_rollentries_processed++;
if (txn->num_rollentries_processed % 1024 == 0) { if (txn->num_rollentries_processed % 1024 == 0) {
poll_txn_progress_function(txn, TRUE, FALSE); poll_txn_progress_function(txn, TRUE, FALSE);
...@@ -28,9 +28,9 @@ int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yiel ...@@ -28,9 +28,9 @@ int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yiel
return r; return r;
} }
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn) { int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) {
int r=0; int r=0;
rolltype_dispatch_assign(item, toku_rollback_, r, txn, yield, yieldv, lsn); rolltype_dispatch_assign(item, toku_rollback_, r, txn, lsn);
txn->num_rollentries_processed++; txn->num_rollentries_processed++;
if (txn->num_rollentries_processed % 1024 == 0) { if (txn->num_rollentries_processed % 1024 == 0) {
poll_txn_progress_function(txn, FALSE, FALSE); poll_txn_progress_function(txn, FALSE, FALSE);
...@@ -68,14 +68,13 @@ void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log) { ...@@ -68,14 +68,13 @@ void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
} }
static int static int
apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn, apply_txn (TOKUTXN txn, LSN lsn,
apply_rollback_item func) { apply_rollback_item func) {
int r = 0; int r = 0;
// do the commit/abort calls and free everything // do the commit/abort calls and free everything
// we do the commit/abort calls in reverse order too. // we do the commit/abort calls in reverse order too.
struct roll_entry *item; struct roll_entry *item;
//printf("%s:%d abort\n", __FILE__, __LINE__); //printf("%s:%d abort\n", __FILE__, __LINE__);
int count=0;
BLOCKNUM next_log = ROLLBACK_NONE; BLOCKNUM next_log = ROLLBACK_NONE;
uint32_t next_log_hash = 0; uint32_t next_log_hash = 0;
...@@ -105,18 +104,8 @@ apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn, ...@@ -105,18 +104,8 @@ apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn,
if (func) { if (func) {
while ((item=log->newest_logentry)) { while ((item=log->newest_logentry)) {
log->newest_logentry = item->prev; log->newest_logentry = item->prev;
r = func(txn, item, yield, yieldv, lsn); r = func(txn, item, lsn);
if (r!=0) return r; if (r!=0) return r;
count++;
// We occassionally yield here to prevent transactions
// from hogging the log. This yield will allow other
// threads to grab the ydb lock. However, we don't
// want any transaction doing more than one log
// operation to always yield the ydb lock, as it must
// wait for the ydb lock to be released to proceed.
if (count % 8 == 0) {
yield(NULL, NULL, yieldv);
}
} }
} }
if (next_log.b == txn->spilled_rollback_head.b) { if (next_log.b == txn->spilled_rollback_head.b) {
...@@ -197,7 +186,7 @@ static int note_ft_used_in_txns_parent(OMTVALUE hv, u_int32_t UU(index), void*tx ...@@ -197,7 +186,7 @@ static int note_ft_used_in_txns_parent(OMTVALUE hv, u_int32_t UU(index), void*tx
//Commit each entry in the rollback log. //Commit each entry in the rollback log.
//If the transaction has a parent, it just promotes its information to its parent. //If the transaction has a parent, it just promotes its information to its parent.
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) { int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
int r=0; int r=0;
if (txn->parent!=0) { if (txn->parent!=0) {
// First we must put a rollinclude entry into the parent if we spilled // First we must put a rollinclude entry into the parent if we spilled
...@@ -275,16 +264,16 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) { ...@@ -275,16 +264,16 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit; txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit;
txn->parent->num_rollentries += txn->num_rollentries; txn->parent->num_rollentries += txn->num_rollentries;
} else { } else {
r = apply_txn(txn, yield, yieldv, lsn, toku_commit_rollback_item); r = apply_txn(txn, lsn, toku_commit_rollback_item);
assert(r==0); assert(r==0);
} }
return r; return r;
} }
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) { int toku_rollback_abort(TOKUTXN txn, LSN lsn) {
int r; int r;
r = apply_txn(txn, yield, yieldv, lsn, toku_abort_rollback_item); r = apply_txn(txn, lsn, toku_abort_rollback_item);
assert(r==0); assert(r==0);
return r; return r;
} }
......
...@@ -14,8 +14,8 @@ extern "C" { ...@@ -14,8 +14,8 @@ extern "C" {
#endif #endif
void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint); void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint);
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn); int toku_rollback_commit(TOKUTXN txn, LSN lsn);
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn); int toku_rollback_abort(TOKUTXN txn, LSN lsn);
// these functions assert internally that they succeed // these functions assert internally that they succeed
...@@ -38,9 +38,9 @@ void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE lo ...@@ -38,9 +38,9 @@ void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE lo
// unpin and rmove a rollback log from the cachetable // unpin and rmove a rollback log from the cachetable
void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log); void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log);
typedef int(*apply_rollback_item)(TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn); typedef int(*apply_rollback_item)(TOKUTXN txn, struct roll_entry *item, LSN lsn);
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn); int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn);
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn); int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn);
void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size); void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size);
void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len); void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len);
......
...@@ -13,11 +13,6 @@ ...@@ -13,11 +13,6 @@
#define TESTDIR __SRCFILE__ ".dir" #define TESTDIR __SRCFILE__ ".dir"
#define FILENAME "test0.ft_handle" #define FILENAME "test0.ft_handle"
static void do_yield (voidfp f, void *fv, void *UU(v)) {
if (f) f(fv);
}
static void test_it (int N) { static void test_it (int N) {
FT_HANDLE brt; FT_HANDLE brt;
int r; int r;
...@@ -42,7 +37,7 @@ static void test_it (int N) { ...@@ -42,7 +37,7 @@ static void test_it (int N) {
r = toku_open_ft_handle(FILENAME, 1, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r); r = toku_open_ft_handle(FILENAME, 1, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r);
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
r = toku_checkpoint(ct, logger, NULL, NULL, NULL, NULL, CLIENT_CHECKPOINT); CKERR(r); r = toku_checkpoint(ct, logger, NULL, NULL, NULL, NULL, CLIENT_CHECKPOINT); CKERR(r);
...@@ -52,7 +47,7 @@ static void test_it (int N) { ...@@ -52,7 +47,7 @@ static void test_it (int N) {
for (int i=0; i<N; i++) { for (int i=0; i<N; i++) {
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r); r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r); r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r);
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r); r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
...@@ -63,7 +58,7 @@ static void test_it (int N) { ...@@ -63,7 +58,7 @@ static void test_it (int N) {
memset(val, 'v', sizeof(val)); memset(val, 'v', sizeof(val));
val[sizeof(val)-1]=0; val[sizeof(val)-1]=0;
r = toku_ft_insert(brt, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), txn); r = toku_ft_insert(brt, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), txn);
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -75,7 +70,7 @@ static void test_it (int N) { ...@@ -75,7 +70,7 @@ static void test_it (int N) {
for (int i=0; i<N; i++) { for (int i=0; i<N; i++) {
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r); r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r); r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r);
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r); r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
...@@ -90,7 +85,7 @@ static void test_it (int N) { ...@@ -90,7 +85,7 @@ static void test_it (int N) {
assert(!is_empty); assert(!is_empty);
} }
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -101,7 +96,7 @@ static void test_it (int N) { ...@@ -101,7 +96,7 @@ static void test_it (int N) {
} }
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r); r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r); r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r);
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
if (0) { if (0) {
......
...@@ -17,12 +17,6 @@ static int test_ft_cursor_keycompare(DB *desc __attribute__((unused)), const DBT ...@@ -17,12 +17,6 @@ static int test_ft_cursor_keycompare(DB *desc __attribute__((unused)), const DBT
return toku_keycompare(a->data, a->size, b->data, b->size); return toku_keycompare(a->data, a->size, b->data, b->size);
} }
static void
txn_yield(voidfp UU(f), void *UU(fv), void *UU(v)) {
if (f)
f(fv);
}
// create a tree and populate it with n rows // create a tree and populate it with n rows
static void static void
create_populate_tree(const char *logdir, const char *fname, int n) { create_populate_tree(const char *logdir, const char *fname, int n) {
...@@ -49,7 +43,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -49,7 +43,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
error = toku_open_ft_handle(fname, 1, &brt, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, test_ft_cursor_keycompare); error = toku_open_ft_handle(fname, 1, &brt, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, test_ft_cursor_keycompare);
assert(error == 0); assert(error == 0);
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -69,7 +63,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -69,7 +63,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
assert(error == 0); assert(error == 0);
} }
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -115,7 +109,7 @@ test_provdel(const char *logdir, const char *fname, int n) { ...@@ -115,7 +109,7 @@ test_provdel(const char *logdir, const char *fname, int n) {
error = toku_open_ft_handle(fname, 1, &brt, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, test_ft_cursor_keycompare); error = toku_open_ft_handle(fname, 1, &brt, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, test_ft_cursor_keycompare);
assert(error == 0); assert(error == 0);
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -166,11 +160,11 @@ test_provdel(const char *logdir, const char *fname, int n) { ...@@ -166,11 +160,11 @@ test_provdel(const char *logdir, const char *fname, int n) {
error = le_cursor_close(cursor); error = le_cursor_close(cursor);
assert(error == 0); assert(error == 0);
error = toku_txn_commit_txn(cursortxn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(cursortxn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(cursortxn); toku_txn_close_txn(cursortxn);
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
......
...@@ -21,12 +21,6 @@ test_keycompare(DB* UU(desc), const DBT *a, const DBT *b) { ...@@ -21,12 +21,6 @@ test_keycompare(DB* UU(desc), const DBT *a, const DBT *b) {
return toku_keycompare(a->data, a->size, b->data, b->size); return toku_keycompare(a->data, a->size, b->data, b->size);
} }
static void
txn_yield(voidfp UU(f), void *UU(fv), void *UU(v)) {
if (f)
f(fv);
}
// create a tree and populate it with n rows // create a tree and populate it with n rows
static void static void
create_populate_tree(const char *logdir, const char *fname, int n) { create_populate_tree(const char *logdir, const char *fname, int n) {
...@@ -53,7 +47,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -53,7 +47,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
error = toku_open_ft_handle(fname, 1, &brt, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, test_keycompare); error = toku_open_ft_handle(fname, 1, &brt, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, test_keycompare);
assert(error == 0); assert(error == 0);
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -73,7 +67,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -73,7 +67,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
assert(error == 0); assert(error == 0);
} }
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
......
...@@ -17,12 +17,6 @@ static int test_ft_cursor_keycompare(DB *db __attribute__((unused)), const DBT * ...@@ -17,12 +17,6 @@ static int test_ft_cursor_keycompare(DB *db __attribute__((unused)), const DBT *
return toku_keycompare(a->data, a->size, b->data, b->size); return toku_keycompare(a->data, a->size, b->data, b->size);
} }
static void
txn_yield(voidfp UU(f), void *UU(fv), void *UU(v)) {
if (f)
f(fv);
}
// create a tree and populate it with n rows // create a tree and populate it with n rows
static void static void
create_populate_tree(const char *logdir, const char *fname, int n) { create_populate_tree(const char *logdir, const char *fname, int n) {
...@@ -49,7 +43,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -49,7 +43,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
error = toku_open_ft_handle(fname, 1, &brt, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, test_ft_cursor_keycompare); error = toku_open_ft_handle(fname, 1, &brt, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, test_ft_cursor_keycompare);
assert(error == 0); assert(error == 0);
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -69,7 +63,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -69,7 +63,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
assert(error == 0); assert(error == 0);
} }
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
......
...@@ -182,12 +182,12 @@ toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info) { ...@@ -182,12 +182,12 @@ toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info) {
return 0; return 0;
} }
int toku_txn_commit_txn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, int toku_txn_commit_txn(TOKUTXN txn, int nosync,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
// Effect: Doesn't close the txn, just performs the commit operations. // Effect: Doesn't close the txn, just performs the commit operations.
// If release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken) // If release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{ {
return toku_txn_commit_with_lsn(txn, nosync, yield, yieldv, ZERO_LSN, return toku_txn_commit_with_lsn(txn, nosync, ZERO_LSN,
poll, poll_extra); poll, poll_extra);
} }
...@@ -206,7 +206,7 @@ BOOL toku_txn_requires_checkpoint(TOKUTXN txn) { ...@@ -206,7 +206,7 @@ BOOL toku_txn_requires_checkpoint(TOKUTXN txn) {
return (!txn->parent && txn->checkpoint_needed_before_commit); return (!txn->parent && txn->checkpoint_needed_before_commit);
} }
//Called during a yield (ydb lock NOT held). //TODO(yoni): inline this function manually
static void static void
log_xcommit(void *thunk) { log_xcommit(void *thunk) {
struct xcommit_info *info = thunk; struct xcommit_info *info = thunk;
...@@ -214,7 +214,7 @@ log_xcommit(void *thunk) { ...@@ -214,7 +214,7 @@ log_xcommit(void *thunk) {
info->r = toku_log_xcommit(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64); // exits holding neither of the tokulogger locks. info->r = toku_log_xcommit(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64); // exits holding neither of the tokulogger locks.
} }
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn, int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
// Effect: Among other things: if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken) // Effect: Among other things: if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{ {
...@@ -245,21 +245,21 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv ...@@ -245,21 +245,21 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv
r = info.r; r = info.r;
} }
if (r==0) { if (r==0) {
r = toku_rollback_commit(txn, yield, yieldv, oplsn); r = toku_rollback_commit(txn, oplsn);
STATUS_VALUE(TXN_COMMIT)++; STATUS_VALUE(TXN_COMMIT)++;
} }
return r; return r;
} }
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv, int toku_txn_abort_txn(TOKUTXN txn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
// Effect: Doesn't close the txn, just performs the abort operations. // Effect: Doesn't close the txn, just performs the abort operations.
// If release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken) // If release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{ {
return toku_txn_abort_with_lsn(txn, yield, yieldv, ZERO_LSN, poll, poll_extra); return toku_txn_abort_with_lsn(txn, ZERO_LSN, poll, poll_extra);
} }
int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn, int toku_txn_abort_with_lsn(TOKUTXN txn, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
// Effect: Ammong other things, if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken) // Effect: Ammong other things, if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{ {
...@@ -271,7 +271,7 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn, ...@@ -271,7 +271,7 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
txn->do_fsync = FALSE; txn->do_fsync = FALSE;
r = toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64); r = toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64);
if (r==0) { if (r==0) {
r = toku_rollback_abort(txn, yield, yieldv, oplsn); r = toku_rollback_abort(txn, oplsn);
STATUS_VALUE(TXN_ABORT)++; STATUS_VALUE(TXN_ABORT)++;
} }
return r; return r;
...@@ -320,11 +320,12 @@ static void do_txn_fsync_log(void *thunk) { ...@@ -320,11 +320,12 @@ static void do_txn_fsync_log(void *thunk) {
info->r = toku_logger_fsync_if_lsn_not_fsynced(info->logger, info->do_fsync_lsn); info->r = toku_logger_fsync_if_lsn_not_fsynced(info->logger, info->do_fsync_lsn);
} }
int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync, YIELDF yield, void *yieldv) { int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync) {
int r = 0; int r = 0;
if (logger && do_fsync) { if (logger && do_fsync) {
struct txn_fsync_log_info info = { .logger = logger, .do_fsync_lsn = do_fsync_lsn }; struct txn_fsync_log_info info = { .logger = logger, .do_fsync_lsn = do_fsync_lsn };
yield(do_txn_fsync_log, &info, yieldv); //TODO(yoni): inline do_txn_fsync_log here
do_txn_fsync_log(&info);
r = info.r; r = info.r;
} }
return r; return r;
......
...@@ -37,15 +37,15 @@ int toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXN ...@@ -37,15 +37,15 @@ int toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXN
int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info); int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info);
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, int toku_txn_commit_txn (TOKUTXN txn, int nosync,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
BOOL toku_txn_requires_checkpoint(TOKUTXN txn); BOOL toku_txn_requires_checkpoint(TOKUTXN txn);
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn, int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv, int toku_txn_abort_txn(TOKUTXN txn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn, int toku_txn_abort_with_lsn(TOKUTXN txn, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
int toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xid) __attribute__((warn_unused_result)); int toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xid) __attribute__((warn_unused_result));
...@@ -54,7 +54,7 @@ int toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xid) __attribute__((warn_unu ...@@ -54,7 +54,7 @@ int toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xid) __attribute__((warn_unu
void toku_txn_get_prepared_xa_xid (TOKUTXN, TOKU_XA_XID *); void toku_txn_get_prepared_xa_xid (TOKUTXN, TOKU_XA_XID *);
// Effect: Fill in the XID information for a transaction. The caller allocates the XID and the function fills in values. // Effect: Fill in the XID information for a transaction. The caller allocates the XID and the function fills in values.
int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync, YIELDF yield, void *yieldv); int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync);
void toku_txn_get_fsync_info(TOKUTXN ttxn, BOOL* do_fsync, LSN* do_fsync_lsn); void toku_txn_get_fsync_info(TOKUTXN ttxn, BOOL* do_fsync, LSN* do_fsync_lsn);
......
...@@ -254,7 +254,7 @@ int toku_loader_create_loader(DB_ENV *env, ...@@ -254,7 +254,7 @@ int toku_loader_create_loader(DB_ENV *env,
loader->i->ekeys = NULL; loader->i->ekeys = NULL;
loader->i->evals = NULL; loader->i->evals = NULL;
LSN load_lsn; LSN load_lsn;
r = ydb_load_inames(env, txn, N, dbs, new_inames_in_env, &load_lsn, use_ft_loader); r = locked_load_inames(env, txn, N, dbs, new_inames_in_env, &load_lsn, use_ft_loader);
if ( r!=0 ) { if ( r!=0 ) {
toku_free(new_inames_in_env); toku_free(new_inames_in_env);
toku_free(brts); toku_free(brts);
......
...@@ -73,6 +73,7 @@ struct __toku_db_env_internal { ...@@ -73,6 +73,7 @@ struct __toku_db_env_internal {
DB *directory; // Maps dnames to inames DB *directory; // Maps dnames to inames
DB *persistent_environment; // Stores environment settings, can be used for upgrade DB *persistent_environment; // Stores environment settings, can be used for upgrade
OMT open_dbs; // Stores open db handles, sorted first by dname and then by numerical value of pointer to the db (arbitrarily assigned memory location) OMT open_dbs; // Stores open db handles, sorted first by dname and then by numerical value of pointer to the db (arbitrarily assigned memory location)
toku_mutex_t open_dbs_lock; // lock that protects the OMT of open dbs.
char *real_data_dir; // data dir used when the env is opened (relative to cwd, or absolute with leading /) char *real_data_dir; // data dir used when the env is opened (relative to cwd, or absolute with leading /)
char *real_log_dir; // log dir used when the env is opened (relative to cwd, or absolute with leading /) char *real_log_dir; // log dir used when the env is opened (relative to cwd, or absolute with leading /)
......
This diff is collapsed.
This diff is collapsed.
...@@ -8,8 +8,6 @@ ...@@ -8,8 +8,6 @@
/* ydb functions used by loader /* ydb functions used by loader
*/ */
// When the loader is created, it makes this call. // When the loader is created, it makes this call.
// For each dictionary to be loaded, replace old iname in directory // For each dictionary to be loaded, replace old iname in directory
// with a newly generated iname. This will also take a write lock // with a newly generated iname. This will also take a write lock
...@@ -22,13 +20,12 @@ ...@@ -22,13 +20,12 @@
// If "mark_as_loader" is true, then include a mark in the iname // If "mark_as_loader" is true, then include a mark in the iname
// to indicate that the file is created by the brt loader. // to indicate that the file is created by the brt loader.
// Return 0 on success (could fail if write lock not available). // Return 0 on success (could fail if write lock not available).
int ydb_load_inames(DB_ENV * env, int locked_load_inames(DB_ENV * env,
DB_TXN * txn, DB_TXN * txn,
int N, int N,
DB * dbs[/*N*/], DB * dbs[N],
/*out*/ char * new_inames_in_env[N], char * new_inames_in_env[N], /* out */
LSN *load_lsn, LSN *load_lsn,
BOOL mark_as_loader); BOOL mark_as_loader);
#endif #endif
This diff is collapsed.
...@@ -9,13 +9,17 @@ ...@@ -9,13 +9,17 @@
extern "C" { extern "C" {
#endif #endif
// begin, commit, and abort use the multi operation lock
// internally to synchronize with begin checkpoint. callers
// should not hold the multi operation lock.
int locked_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags); int locked_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags);
int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool internal, bool holds_ydb_lock);
int toku_txn_commit(DB_TXN * txn, u_int32_t flags, TXN_PROGRESS_POLL_FUNCTION, void*, bool release_multi_operation_client_lock);
int toku_txn_abort(DB_TXN * txn, TXN_PROGRESS_POLL_FUNCTION, void*, bool release_multi_operation_client_lock);
int locked_txn_commit(DB_TXN *txn, u_int32_t flags); int locked_txn_commit(DB_TXN *txn, u_int32_t flags);
int locked_txn_abort(DB_TXN *txn); int locked_txn_abort(DB_TXN *txn);
void toku_keep_prepared_txn_callback (DB_ENV *env, TOKUTXN tokutxn);
void toku_keep_prepared_txn_callback(DB_ENV *env, TOKUTXN tokutxn);
#if defined(__cplusplus) #if defined(__cplusplus)
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment