Commit 8192256d authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

[t:4749] [t:4878] [t:4929] [t:4947] merging these changes to main.


git-svn-id: file:///svn/toku/tokudb@44058 c7de825b-a66e-492c-adef-691d508d4ae1
parent 971862bb
...@@ -19,8 +19,8 @@ ...@@ -19,8 +19,8 @@
extern "C" { extern "C" {
#endif #endif
typedef void(*voidfp)(void *thunk); // typedef void(*voidfp)(void *thunk);
typedef void(*YIELDF)(voidfp, void *fpthunk, void *yieldthunk); // typedef void(*YIELDF)(voidfp, void *fpthunk, void *yieldthunk);
struct roll_entry; struct roll_entry;
......
...@@ -298,10 +298,10 @@ generate_log_struct (void) { ...@@ -298,10 +298,10 @@ generate_log_struct (void) {
fprintf(hf, "};\n"); fprintf(hf, "};\n");
fprintf(hf, "int toku_rollback_%s (", lt->name); fprintf(hf, "int toku_rollback_%s (", lt->name);
DO_FIELDS(field_type, lt, fprintf(hf, "%s %s,", field_type->type, field_type->name)); DO_FIELDS(field_type, lt, fprintf(hf, "%s %s,", field_type->type, field_type->name));
fprintf(hf, "TOKUTXN txn, YIELDF yield, void*yield_v, LSN oplsn);\n"); fprintf(hf, "TOKUTXN txn, LSN oplsn);\n");
fprintf(hf, "int toku_commit_%s (", lt->name); fprintf(hf, "int toku_commit_%s (", lt->name);
DO_FIELDS(field_type, lt, fprintf(hf, "%s %s,", field_type->type, field_type->name)); DO_FIELDS(field_type, lt, fprintf(hf, "%s %s,", field_type->type, field_type->name));
fprintf(hf, "TOKUTXN txn, YIELDF yield, void*yield_v, LSN oplsn);\n"); fprintf(hf, "TOKUTXN txn, LSN oplsn);\n");
}); });
fprintf(hf, "struct log_entry {\n"); fprintf(hf, "struct log_entry {\n");
fprintf(hf, " enum lt_cmd cmd;\n"); fprintf(hf, " enum lt_cmd cmd;\n");
......
...@@ -260,12 +260,6 @@ static const char *recover_state(RECOVER_ENV renv) { ...@@ -260,12 +260,6 @@ static const char *recover_state(RECOVER_ENV renv) {
return scan_state_string(&renv->ss); return scan_state_string(&renv->ss);
} }
// function supplied to transaction commit and abort
// No yielding is necessary, but it must call the f function if provided.
static void recover_yield(voidfp f, void *fpthunk, void *UU(yieldthunk)) {
if (f) f(fpthunk);
}
// Open the file if it is not already open. If it is already open, then do nothing. // Open the file if it is not already open. If it is already open, then do nothing.
static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, BOOL must_create, int UU(mode), BYTESTRING *bs_iname, FILENUM filenum, u_int32_t treeflags, static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, BOOL must_create, int UU(mode), BYTESTRING *bs_iname, FILENUM filenum, u_int32_t treeflags,
TOKUTXN txn, uint32_t nodesize, uint32_t basementnodesize, enum toku_compression_method compression_method, LSN max_acceptable_lsn) { TOKUTXN txn, uint32_t nodesize, uint32_t basementnodesize, enum toku_compression_method compression_method, LSN max_acceptable_lsn) {
...@@ -663,7 +657,7 @@ static int toku_recover_xcommit (struct logtype_xcommit *l, RECOVER_ENV renv) { ...@@ -663,7 +657,7 @@ static int toku_recover_xcommit (struct logtype_xcommit *l, RECOVER_ENV renv) {
assert(txn!=NULL); assert(txn!=NULL);
// commit the transaction // commit the transaction
r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, l->lsn, r = toku_txn_commit_with_lsn(txn, TRUE, l->lsn,
NULL, NULL); NULL, NULL);
assert(r == 0); assert(r == 0);
...@@ -711,7 +705,7 @@ static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) { ...@@ -711,7 +705,7 @@ static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) {
assert(txn!=NULL); assert(txn!=NULL);
// abort the transaction // abort the transaction
r = toku_txn_abort_with_lsn(txn, recover_yield, NULL, l->lsn, NULL, NULL); r = toku_txn_abort_with_lsn(txn, l->lsn, NULL, NULL);
assert(r == 0); assert(r == 0);
// close the transaction // close the transaction
...@@ -1269,7 +1263,7 @@ static void recover_abort_live_txns(RECOVER_ENV renv) { ...@@ -1269,7 +1263,7 @@ static void recover_abort_live_txns(RECOVER_ENV renv) {
int r = find_an_unprepared_txn(renv, &txn); int r = find_an_unprepared_txn(renv, &txn);
if (r==0) { if (r==0) {
// abort the transaction // abort the transaction
r = toku_txn_abort_txn(txn, recover_yield, NULL, NULL, NULL); r = toku_txn_abort_txn(txn, NULL, NULL);
assert(r == 0); assert(r == 0);
// close the transaction // close the transaction
......
...@@ -28,8 +28,6 @@ ...@@ -28,8 +28,6 @@
int int
toku_commit_fdelete (FILENUM filenum, toku_commit_fdelete (FILENUM filenum,
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield),
void *UU(yield_v),
LSN UU(oplsn)) //oplsn is the lsn of the commit LSN UU(oplsn)) //oplsn is the lsn of the commit
{ {
int r; int r;
...@@ -64,9 +62,6 @@ toku_commit_fdelete (FILENUM filenum, ...@@ -64,9 +62,6 @@ toku_commit_fdelete (FILENUM filenum,
// after processing rollback entries. As a result, we may be unlinking a file // after processing rollback entries. As a result, we may be unlinking a file
// here as part of a transactoin that may abort if we do not fsync the log. // here as part of a transactoin that may abort if we do not fsync the log.
// So, we fsync the log here. // So, we fsync the log here.
//
// Because committing fdeletes should be a rare operation, we do not bother
// yielding the ydb lock before performing the fsync.
if (txn->logger) { if (txn->logger) {
r = toku_logger_fsync_if_lsn_not_fsynced(txn->logger, txn->do_fsync_lsn); r = toku_logger_fsync_if_lsn_not_fsynced(txn->logger, txn->do_fsync_lsn);
assert_zero(r); assert_zero(r);
...@@ -90,8 +85,6 @@ done: ...@@ -90,8 +85,6 @@ done:
int int
toku_rollback_fdelete (FILENUM UU(filenum), toku_rollback_fdelete (FILENUM UU(filenum),
TOKUTXN UU(txn), TOKUTXN UU(txn),
YIELDF UU(yield),
void* UU(yield_v),
LSN UU(oplsn)) //oplsn is the lsn of the abort LSN UU(oplsn)) //oplsn is the lsn of the abort
{ {
//Rolling back an fdelete is an no-op. //Rolling back an fdelete is an no-op.
...@@ -102,8 +95,6 @@ int ...@@ -102,8 +95,6 @@ int
toku_commit_fcreate (FILENUM UU(filenum), toku_commit_fcreate (FILENUM UU(filenum),
BYTESTRING UU(bs_fname), BYTESTRING UU(bs_fname),
TOKUTXN UU(txn), TOKUTXN UU(txn),
YIELDF UU(yield),
void *UU(yield_v),
LSN UU(oplsn)) LSN UU(oplsn))
{ {
return 0; return 0;
...@@ -113,8 +104,6 @@ int ...@@ -113,8 +104,6 @@ int
toku_rollback_fcreate (FILENUM filenum, toku_rollback_fcreate (FILENUM filenum,
BYTESTRING UU(bs_fname), BYTESTRING UU(bs_fname),
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield),
void* UU(yield_v),
LSN UU(oplsn)) LSN UU(oplsn))
{ {
int r; int r;
...@@ -219,7 +208,7 @@ static int do_nothing_with_filenum(TOKUTXN UU(txn), FILENUM UU(filenum)) { ...@@ -219,7 +208,7 @@ static int do_nothing_with_filenum(TOKUTXN UU(txn), FILENUM UU(filenum)) {
} }
int toku_commit_cmdinsert (FILENUM filenum, BYTESTRING UU(key), TOKUTXN txn, YIELDF UU(yield), void *UU(yieldv), LSN UU(oplsn)) { int toku_commit_cmdinsert (FILENUM filenum, BYTESTRING UU(key), TOKUTXN txn, LSN UU(oplsn)) {
#if TOKU_DO_COMMIT_CMD_INSERT #if TOKU_DO_COMMIT_CMD_INSERT
return do_insertion (FT_COMMIT_ANY, filenum, key, 0, txn, oplsn, FALSE); return do_insertion (FT_COMMIT_ANY, filenum, key, 0, txn, oplsn, FALSE);
#else #else
...@@ -231,8 +220,6 @@ int ...@@ -231,8 +220,6 @@ int
toku_rollback_cmdinsert (FILENUM filenum, toku_rollback_cmdinsert (FILENUM filenum,
BYTESTRING key, BYTESTRING key,
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv),
LSN oplsn) LSN oplsn)
{ {
return do_insertion (FT_ABORT_ANY, filenum, key, 0, txn, oplsn, FALSE); return do_insertion (FT_ABORT_ANY, filenum, key, 0, txn, oplsn, FALSE);
...@@ -242,8 +229,6 @@ int ...@@ -242,8 +229,6 @@ int
toku_commit_cmdupdate(FILENUM filenum, toku_commit_cmdupdate(FILENUM filenum,
BYTESTRING key, BYTESTRING key,
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv),
LSN oplsn) LSN oplsn)
{ {
return do_insertion(FT_COMMIT_ANY, filenum, key, 0, txn, oplsn, FALSE); return do_insertion(FT_COMMIT_ANY, filenum, key, 0, txn, oplsn, FALSE);
...@@ -253,8 +238,6 @@ int ...@@ -253,8 +238,6 @@ int
toku_rollback_cmdupdate(FILENUM filenum, toku_rollback_cmdupdate(FILENUM filenum,
BYTESTRING key, BYTESTRING key,
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv),
LSN oplsn) LSN oplsn)
{ {
return do_insertion(FT_ABORT_ANY, filenum, key, 0, txn, oplsn, FALSE); return do_insertion(FT_ABORT_ANY, filenum, key, 0, txn, oplsn, FALSE);
...@@ -264,8 +247,6 @@ int ...@@ -264,8 +247,6 @@ int
toku_commit_cmdupdatebroadcast(FILENUM filenum, toku_commit_cmdupdatebroadcast(FILENUM filenum,
BOOL is_resetting_op, BOOL is_resetting_op,
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv),
LSN oplsn) LSN oplsn)
{ {
// if is_resetting_op, reset root_xid_that_created in // if is_resetting_op, reset root_xid_that_created in
...@@ -282,8 +263,6 @@ int ...@@ -282,8 +263,6 @@ int
toku_rollback_cmdupdatebroadcast(FILENUM filenum, toku_rollback_cmdupdatebroadcast(FILENUM filenum,
BOOL UU(is_resetting_op), BOOL UU(is_resetting_op),
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv),
LSN oplsn) LSN oplsn)
{ {
BYTESTRING nullkey = { 0, NULL }; BYTESTRING nullkey = { 0, NULL };
...@@ -294,8 +273,6 @@ int ...@@ -294,8 +273,6 @@ int
toku_commit_cmddelete (FILENUM filenum, toku_commit_cmddelete (FILENUM filenum,
BYTESTRING key, BYTESTRING key,
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv),
LSN oplsn) LSN oplsn)
{ {
#if TOKU_DO_COMMIT_CMD_DELETE #if TOKU_DO_COMMIT_CMD_DELETE
...@@ -310,8 +287,6 @@ int ...@@ -310,8 +287,6 @@ int
toku_rollback_cmddelete (FILENUM filenum, toku_rollback_cmddelete (FILENUM filenum,
BYTESTRING key, BYTESTRING key,
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv),
LSN oplsn) LSN oplsn)
{ {
return do_insertion (FT_ABORT_ANY, filenum, key, 0, txn, oplsn, FALSE); return do_insertion (FT_ABORT_ANY, filenum, key, 0, txn, oplsn, FALSE);
...@@ -325,13 +300,10 @@ toku_apply_rollinclude (TXNID xid, ...@@ -325,13 +300,10 @@ toku_apply_rollinclude (TXNID xid,
BLOCKNUM spilled_tail, BLOCKNUM spilled_tail,
uint32_t spilled_tail_hash, uint32_t spilled_tail_hash,
TOKUTXN txn, TOKUTXN txn,
YIELDF yield,
void * yieldv,
LSN oplsn, LSN oplsn,
apply_rollback_item func) { apply_rollback_item func) {
int r = 0; int r = 0;
struct roll_entry *item; struct roll_entry *item;
int count=0;
BLOCKNUM next_log = spilled_tail; BLOCKNUM next_log = spilled_tail;
uint32_t next_log_hash = spilled_tail_hash; uint32_t next_log_hash = spilled_tail_hash;
...@@ -350,10 +322,8 @@ toku_apply_rollinclude (TXNID xid, ...@@ -350,10 +322,8 @@ toku_apply_rollinclude (TXNID xid,
while ((item=log->newest_logentry)) { while ((item=log->newest_logentry)) {
log->newest_logentry = item->prev; log->newest_logentry = item->prev;
r = func(txn, item, yield, yieldv, oplsn); r = func(txn, item, oplsn);
if (r!=0) return r; if (r!=0) return r;
count++;
if (count%2 == 0) yield(NULL, NULL, yieldv);
} }
if (next_log.b == spilled_head.b) { if (next_log.b == spilled_head.b) {
assert(!found_head); assert(!found_head);
...@@ -386,14 +356,12 @@ toku_commit_rollinclude (TXNID xid, ...@@ -386,14 +356,12 @@ toku_commit_rollinclude (TXNID xid,
BLOCKNUM spilled_tail, BLOCKNUM spilled_tail,
uint32_t spilled_tail_hash, uint32_t spilled_tail_hash,
TOKUTXN txn, TOKUTXN txn,
YIELDF yield,
void * yieldv,
LSN oplsn) { LSN oplsn) {
int r; int r;
r = toku_apply_rollinclude(xid, num_nodes, r = toku_apply_rollinclude(xid, num_nodes,
spilled_head, spilled_head_hash, spilled_head, spilled_head_hash,
spilled_tail, spilled_tail_hash, spilled_tail, spilled_tail_hash,
txn, yield, yieldv, oplsn, txn, oplsn,
toku_commit_rollback_item); toku_commit_rollback_item);
return r; return r;
} }
...@@ -406,14 +374,12 @@ toku_rollback_rollinclude (TXNID xid, ...@@ -406,14 +374,12 @@ toku_rollback_rollinclude (TXNID xid,
BLOCKNUM spilled_tail, BLOCKNUM spilled_tail,
uint32_t spilled_tail_hash, uint32_t spilled_tail_hash,
TOKUTXN txn, TOKUTXN txn,
YIELDF yield,
void * yieldv,
LSN oplsn) { LSN oplsn) {
int r; int r;
r = toku_apply_rollinclude(xid, num_nodes, r = toku_apply_rollinclude(xid, num_nodes,
spilled_head, spilled_head_hash, spilled_head, spilled_head_hash,
spilled_tail, spilled_tail_hash, spilled_tail, spilled_tail_hash,
txn, yield, yieldv, oplsn, txn, oplsn,
toku_abort_rollback_item); toku_abort_rollback_item);
return r; return r;
} }
...@@ -422,8 +388,6 @@ int ...@@ -422,8 +388,6 @@ int
toku_commit_load (FILENUM old_filenum, toku_commit_load (FILENUM old_filenum,
BYTESTRING UU(new_iname), BYTESTRING UU(new_iname),
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield),
void *UU(yield_v),
LSN UU(oplsn)) LSN UU(oplsn))
{ {
int r; int r;
...@@ -452,9 +416,6 @@ toku_commit_load (FILENUM old_filenum, ...@@ -452,9 +416,6 @@ toku_commit_load (FILENUM old_filenum,
// after processing rollback entries. As a result, we may be unlinking a file // after processing rollback entries. As a result, we may be unlinking a file
// here as part of a transactoin that may abort if we do not fsync the log. // here as part of a transactoin that may abort if we do not fsync the log.
// So, we fsync the log here. // So, we fsync the log here.
//
// Because committing fdeletes should be a rare operation, we do not bother
// yielding the ydb lock before performing the fsync.
if (txn->logger) { if (txn->logger) {
r = toku_logger_fsync_if_lsn_not_fsynced(txn->logger, txn->do_fsync_lsn); r = toku_logger_fsync_if_lsn_not_fsynced(txn->logger, txn->do_fsync_lsn);
lazy_assert(r == 0); lazy_assert(r == 0);
...@@ -469,8 +430,6 @@ int ...@@ -469,8 +430,6 @@ int
toku_rollback_load (FILENUM UU(old_filenum), toku_rollback_load (FILENUM UU(old_filenum),
BYTESTRING new_iname, BYTESTRING new_iname,
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield),
void *UU(yield_v),
LSN UU(oplsn)) LSN UU(oplsn))
{ {
int r; int r;
...@@ -501,8 +460,6 @@ toku_rollback_load (FILENUM UU(old_filenum), ...@@ -501,8 +460,6 @@ toku_rollback_load (FILENUM UU(old_filenum),
int int
toku_commit_hot_index (FILENUMS UU(hot_index_filenums), toku_commit_hot_index (FILENUMS UU(hot_index_filenums),
TOKUTXN UU(txn), TOKUTXN UU(txn),
YIELDF UU(yield),
void * UU(yield_v),
LSN UU(oplsn)) LSN UU(oplsn))
{ {
// nothing // nothing
...@@ -512,8 +469,6 @@ toku_commit_hot_index (FILENUMS UU(hot_index_filenums), ...@@ -512,8 +469,6 @@ toku_commit_hot_index (FILENUMS UU(hot_index_filenums),
int int
toku_rollback_hot_index (FILENUMS UU(hot_index_filenums), toku_rollback_hot_index (FILENUMS UU(hot_index_filenums),
TOKUTXN UU(txn), TOKUTXN UU(txn),
YIELDF UU(yield),
void * UU(yield_v),
LSN UU(oplsn)) LSN UU(oplsn))
{ {
return 0; return 0;
...@@ -523,8 +478,6 @@ int ...@@ -523,8 +478,6 @@ int
toku_commit_dictionary_redirect (FILENUM UU(old_filenum), toku_commit_dictionary_redirect (FILENUM UU(old_filenum),
FILENUM UU(new_filenum), FILENUM UU(new_filenum),
TOKUTXN UU(txn), TOKUTXN UU(txn),
YIELDF UU(yield),
void * UU(yield_v),
LSN UU(oplsn)) //oplsn is the lsn of the commit LSN UU(oplsn)) //oplsn is the lsn of the commit
{ {
//Redirect only has meaning during normal operation (NOT during recovery). //Redirect only has meaning during normal operation (NOT during recovery).
...@@ -538,8 +491,6 @@ int ...@@ -538,8 +491,6 @@ int
toku_rollback_dictionary_redirect (FILENUM old_filenum, toku_rollback_dictionary_redirect (FILENUM old_filenum,
FILENUM new_filenum, FILENUM new_filenum,
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield),
void * UU(yield_v),
LSN UU(oplsn)) //oplsn is the lsn of the abort LSN UU(oplsn)) //oplsn is the lsn of the abort
{ {
int r = 0; int r = 0;
...@@ -566,8 +517,6 @@ int ...@@ -566,8 +517,6 @@ int
toku_commit_change_fdescriptor(FILENUM filenum, toku_commit_change_fdescriptor(FILENUM filenum,
BYTESTRING UU(old_descriptor), BYTESTRING UU(old_descriptor),
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv),
LSN UU(oplsn)) LSN UU(oplsn))
{ {
return do_nothing_with_filenum(txn, filenum); return do_nothing_with_filenum(txn, filenum);
...@@ -577,8 +526,6 @@ int ...@@ -577,8 +526,6 @@ int
toku_rollback_change_fdescriptor(FILENUM filenum, toku_rollback_change_fdescriptor(FILENUM filenum,
BYTESTRING old_descriptor, BYTESTRING old_descriptor,
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv),
LSN UU(oplsn)) LSN UU(oplsn))
{ {
CACHEFILE cf; CACHEFILE cf;
......
...@@ -18,9 +18,9 @@ poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_che ...@@ -18,9 +18,9 @@ poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_che
} }
} }
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn) { int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) {
int r=0; int r=0;
rolltype_dispatch_assign(item, toku_commit_, r, txn, yield, yieldv, lsn); rolltype_dispatch_assign(item, toku_commit_, r, txn, lsn);
txn->num_rollentries_processed++; txn->num_rollentries_processed++;
if (txn->num_rollentries_processed % 1024 == 0) { if (txn->num_rollentries_processed % 1024 == 0) {
poll_txn_progress_function(txn, TRUE, FALSE); poll_txn_progress_function(txn, TRUE, FALSE);
...@@ -28,9 +28,9 @@ int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yiel ...@@ -28,9 +28,9 @@ int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yiel
return r; return r;
} }
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn) { int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) {
int r=0; int r=0;
rolltype_dispatch_assign(item, toku_rollback_, r, txn, yield, yieldv, lsn); rolltype_dispatch_assign(item, toku_rollback_, r, txn, lsn);
txn->num_rollentries_processed++; txn->num_rollentries_processed++;
if (txn->num_rollentries_processed % 1024 == 0) { if (txn->num_rollentries_processed % 1024 == 0) {
poll_txn_progress_function(txn, FALSE, FALSE); poll_txn_progress_function(txn, FALSE, FALSE);
...@@ -68,14 +68,13 @@ void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log) { ...@@ -68,14 +68,13 @@ void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
} }
static int static int
apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn, apply_txn (TOKUTXN txn, LSN lsn,
apply_rollback_item func) { apply_rollback_item func) {
int r = 0; int r = 0;
// do the commit/abort calls and free everything // do the commit/abort calls and free everything
// we do the commit/abort calls in reverse order too. // we do the commit/abort calls in reverse order too.
struct roll_entry *item; struct roll_entry *item;
//printf("%s:%d abort\n", __FILE__, __LINE__); //printf("%s:%d abort\n", __FILE__, __LINE__);
int count=0;
BLOCKNUM next_log = ROLLBACK_NONE; BLOCKNUM next_log = ROLLBACK_NONE;
uint32_t next_log_hash = 0; uint32_t next_log_hash = 0;
...@@ -105,18 +104,8 @@ apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn, ...@@ -105,18 +104,8 @@ apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn,
if (func) { if (func) {
while ((item=log->newest_logentry)) { while ((item=log->newest_logentry)) {
log->newest_logentry = item->prev; log->newest_logentry = item->prev;
r = func(txn, item, yield, yieldv, lsn); r = func(txn, item, lsn);
if (r!=0) return r; if (r!=0) return r;
count++;
// We occassionally yield here to prevent transactions
// from hogging the log. This yield will allow other
// threads to grab the ydb lock. However, we don't
// want any transaction doing more than one log
// operation to always yield the ydb lock, as it must
// wait for the ydb lock to be released to proceed.
if (count % 8 == 0) {
yield(NULL, NULL, yieldv);
}
} }
} }
if (next_log.b == txn->spilled_rollback_head.b) { if (next_log.b == txn->spilled_rollback_head.b) {
...@@ -197,7 +186,7 @@ static int note_ft_used_in_txns_parent(OMTVALUE hv, u_int32_t UU(index), void*tx ...@@ -197,7 +186,7 @@ static int note_ft_used_in_txns_parent(OMTVALUE hv, u_int32_t UU(index), void*tx
//Commit each entry in the rollback log. //Commit each entry in the rollback log.
//If the transaction has a parent, it just promotes its information to its parent. //If the transaction has a parent, it just promotes its information to its parent.
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) { int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
int r=0; int r=0;
if (txn->parent!=0) { if (txn->parent!=0) {
// First we must put a rollinclude entry into the parent if we spilled // First we must put a rollinclude entry into the parent if we spilled
...@@ -275,16 +264,16 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) { ...@@ -275,16 +264,16 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit; txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit;
txn->parent->num_rollentries += txn->num_rollentries; txn->parent->num_rollentries += txn->num_rollentries;
} else { } else {
r = apply_txn(txn, yield, yieldv, lsn, toku_commit_rollback_item); r = apply_txn(txn, lsn, toku_commit_rollback_item);
assert(r==0); assert(r==0);
} }
return r; return r;
} }
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) { int toku_rollback_abort(TOKUTXN txn, LSN lsn) {
int r; int r;
r = apply_txn(txn, yield, yieldv, lsn, toku_abort_rollback_item); r = apply_txn(txn, lsn, toku_abort_rollback_item);
assert(r==0); assert(r==0);
return r; return r;
} }
......
...@@ -14,8 +14,8 @@ extern "C" { ...@@ -14,8 +14,8 @@ extern "C" {
#endif #endif
void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint); void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint);
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn); int toku_rollback_commit(TOKUTXN txn, LSN lsn);
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn); int toku_rollback_abort(TOKUTXN txn, LSN lsn);
// these functions assert internally that they succeed // these functions assert internally that they succeed
...@@ -38,9 +38,9 @@ void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE lo ...@@ -38,9 +38,9 @@ void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE lo
// unpin and rmove a rollback log from the cachetable // unpin and rmove a rollback log from the cachetable
void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log); void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log);
typedef int(*apply_rollback_item)(TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn); typedef int(*apply_rollback_item)(TOKUTXN txn, struct roll_entry *item, LSN lsn);
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn); int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn);
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn); int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn);
void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size); void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size);
void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len); void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len);
......
...@@ -13,11 +13,6 @@ ...@@ -13,11 +13,6 @@
#define TESTDIR __SRCFILE__ ".dir" #define TESTDIR __SRCFILE__ ".dir"
#define FILENAME "test0.ft_handle" #define FILENAME "test0.ft_handle"
static void do_yield (voidfp f, void *fv, void *UU(v)) {
if (f) f(fv);
}
static void test_it (int N) { static void test_it (int N) {
FT_HANDLE brt; FT_HANDLE brt;
int r; int r;
...@@ -42,7 +37,7 @@ static void test_it (int N) { ...@@ -42,7 +37,7 @@ static void test_it (int N) {
r = toku_open_ft_handle(FILENAME, 1, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r); r = toku_open_ft_handle(FILENAME, 1, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r);
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
r = toku_checkpoint(ct, logger, NULL, NULL, NULL, NULL, CLIENT_CHECKPOINT); CKERR(r); r = toku_checkpoint(ct, logger, NULL, NULL, NULL, NULL, CLIENT_CHECKPOINT); CKERR(r);
...@@ -52,7 +47,7 @@ static void test_it (int N) { ...@@ -52,7 +47,7 @@ static void test_it (int N) {
for (int i=0; i<N; i++) { for (int i=0; i<N; i++) {
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r); r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r); r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r);
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r); r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
...@@ -63,7 +58,7 @@ static void test_it (int N) { ...@@ -63,7 +58,7 @@ static void test_it (int N) {
memset(val, 'v', sizeof(val)); memset(val, 'v', sizeof(val));
val[sizeof(val)-1]=0; val[sizeof(val)-1]=0;
r = toku_ft_insert(brt, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), txn); r = toku_ft_insert(brt, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), txn);
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -75,7 +70,7 @@ static void test_it (int N) { ...@@ -75,7 +70,7 @@ static void test_it (int N) {
for (int i=0; i<N; i++) { for (int i=0; i<N; i++) {
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r); r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r); r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r);
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r); r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
...@@ -90,7 +85,7 @@ static void test_it (int N) { ...@@ -90,7 +85,7 @@ static void test_it (int N) {
assert(!is_empty); assert(!is_empty);
} }
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -101,7 +96,7 @@ static void test_it (int N) { ...@@ -101,7 +96,7 @@ static void test_it (int N) {
} }
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r); r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r); r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r);
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
if (0) { if (0) {
......
...@@ -17,12 +17,6 @@ static int test_ft_cursor_keycompare(DB *desc __attribute__((unused)), const DBT ...@@ -17,12 +17,6 @@ static int test_ft_cursor_keycompare(DB *desc __attribute__((unused)), const DBT
return toku_keycompare(a->data, a->size, b->data, b->size); return toku_keycompare(a->data, a->size, b->data, b->size);
} }
static void
txn_yield(voidfp UU(f), void *UU(fv), void *UU(v)) {
if (f)
f(fv);
}
// create a tree and populate it with n rows // create a tree and populate it with n rows
static void static void
create_populate_tree(const char *logdir, const char *fname, int n) { create_populate_tree(const char *logdir, const char *fname, int n) {
...@@ -49,7 +43,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -49,7 +43,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
error = toku_open_ft_handle(fname, 1, &brt, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, test_ft_cursor_keycompare); error = toku_open_ft_handle(fname, 1, &brt, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, test_ft_cursor_keycompare);
assert(error == 0); assert(error == 0);
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -69,7 +63,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -69,7 +63,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
assert(error == 0); assert(error == 0);
} }
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -115,7 +109,7 @@ test_provdel(const char *logdir, const char *fname, int n) { ...@@ -115,7 +109,7 @@ test_provdel(const char *logdir, const char *fname, int n) {
error = toku_open_ft_handle(fname, 1, &brt, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, test_ft_cursor_keycompare); error = toku_open_ft_handle(fname, 1, &brt, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, test_ft_cursor_keycompare);
assert(error == 0); assert(error == 0);
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -166,11 +160,11 @@ test_provdel(const char *logdir, const char *fname, int n) { ...@@ -166,11 +160,11 @@ test_provdel(const char *logdir, const char *fname, int n) {
error = le_cursor_close(cursor); error = le_cursor_close(cursor);
assert(error == 0); assert(error == 0);
error = toku_txn_commit_txn(cursortxn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(cursortxn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(cursortxn); toku_txn_close_txn(cursortxn);
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
......
...@@ -21,12 +21,6 @@ test_keycompare(DB* UU(desc), const DBT *a, const DBT *b) { ...@@ -21,12 +21,6 @@ test_keycompare(DB* UU(desc), const DBT *a, const DBT *b) {
return toku_keycompare(a->data, a->size, b->data, b->size); return toku_keycompare(a->data, a->size, b->data, b->size);
} }
static void
txn_yield(voidfp UU(f), void *UU(fv), void *UU(v)) {
if (f)
f(fv);
}
// create a tree and populate it with n rows // create a tree and populate it with n rows
static void static void
create_populate_tree(const char *logdir, const char *fname, int n) { create_populate_tree(const char *logdir, const char *fname, int n) {
...@@ -53,7 +47,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -53,7 +47,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
error = toku_open_ft_handle(fname, 1, &brt, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, test_keycompare); error = toku_open_ft_handle(fname, 1, &brt, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, test_keycompare);
assert(error == 0); assert(error == 0);
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -73,7 +67,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -73,7 +67,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
assert(error == 0); assert(error == 0);
} }
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
......
...@@ -17,12 +17,6 @@ static int test_ft_cursor_keycompare(DB *db __attribute__((unused)), const DBT * ...@@ -17,12 +17,6 @@ static int test_ft_cursor_keycompare(DB *db __attribute__((unused)), const DBT *
return toku_keycompare(a->data, a->size, b->data, b->size); return toku_keycompare(a->data, a->size, b->data, b->size);
} }
static void
txn_yield(voidfp UU(f), void *UU(fv), void *UU(v)) {
if (f)
f(fv);
}
// create a tree and populate it with n rows // create a tree and populate it with n rows
static void static void
create_populate_tree(const char *logdir, const char *fname, int n) { create_populate_tree(const char *logdir, const char *fname, int n) {
...@@ -49,7 +43,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -49,7 +43,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
error = toku_open_ft_handle(fname, 1, &brt, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, test_ft_cursor_keycompare); error = toku_open_ft_handle(fname, 1, &brt, 1<<12, 1<<9, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, test_ft_cursor_keycompare);
assert(error == 0); assert(error == 0);
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -69,7 +63,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -69,7 +63,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
assert(error == 0); assert(error == 0);
} }
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, NULL, NULL);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
......
...@@ -182,12 +182,12 @@ toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info) { ...@@ -182,12 +182,12 @@ toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info) {
return 0; return 0;
} }
int toku_txn_commit_txn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, int toku_txn_commit_txn(TOKUTXN txn, int nosync,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
// Effect: Doesn't close the txn, just performs the commit operations. // Effect: Doesn't close the txn, just performs the commit operations.
// If release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken) // If release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{ {
return toku_txn_commit_with_lsn(txn, nosync, yield, yieldv, ZERO_LSN, return toku_txn_commit_with_lsn(txn, nosync, ZERO_LSN,
poll, poll_extra); poll, poll_extra);
} }
...@@ -206,7 +206,7 @@ BOOL toku_txn_requires_checkpoint(TOKUTXN txn) { ...@@ -206,7 +206,7 @@ BOOL toku_txn_requires_checkpoint(TOKUTXN txn) {
return (!txn->parent && txn->checkpoint_needed_before_commit); return (!txn->parent && txn->checkpoint_needed_before_commit);
} }
//Called during a yield (ydb lock NOT held). //TODO(yoni): inline this function manually
static void static void
log_xcommit(void *thunk) { log_xcommit(void *thunk) {
struct xcommit_info *info = thunk; struct xcommit_info *info = thunk;
...@@ -214,7 +214,7 @@ log_xcommit(void *thunk) { ...@@ -214,7 +214,7 @@ log_xcommit(void *thunk) {
info->r = toku_log_xcommit(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64); // exits holding neither of the tokulogger locks. info->r = toku_log_xcommit(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64); // exits holding neither of the tokulogger locks.
} }
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn, int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
// Effect: Among other things: if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken) // Effect: Among other things: if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{ {
...@@ -245,21 +245,21 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv ...@@ -245,21 +245,21 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv
r = info.r; r = info.r;
} }
if (r==0) { if (r==0) {
r = toku_rollback_commit(txn, yield, yieldv, oplsn); r = toku_rollback_commit(txn, oplsn);
STATUS_VALUE(TXN_COMMIT)++; STATUS_VALUE(TXN_COMMIT)++;
} }
return r; return r;
} }
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv, int toku_txn_abort_txn(TOKUTXN txn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
// Effect: Doesn't close the txn, just performs the abort operations. // Effect: Doesn't close the txn, just performs the abort operations.
// If release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken) // If release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{ {
return toku_txn_abort_with_lsn(txn, yield, yieldv, ZERO_LSN, poll, poll_extra); return toku_txn_abort_with_lsn(txn, ZERO_LSN, poll, poll_extra);
} }
int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn, int toku_txn_abort_with_lsn(TOKUTXN txn, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
// Effect: Ammong other things, if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken) // Effect: Ammong other things, if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{ {
...@@ -271,7 +271,7 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn, ...@@ -271,7 +271,7 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
txn->do_fsync = FALSE; txn->do_fsync = FALSE;
r = toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64); r = toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64);
if (r==0) { if (r==0) {
r = toku_rollback_abort(txn, yield, yieldv, oplsn); r = toku_rollback_abort(txn, oplsn);
STATUS_VALUE(TXN_ABORT)++; STATUS_VALUE(TXN_ABORT)++;
} }
return r; return r;
...@@ -320,11 +320,12 @@ static void do_txn_fsync_log(void *thunk) { ...@@ -320,11 +320,12 @@ static void do_txn_fsync_log(void *thunk) {
info->r = toku_logger_fsync_if_lsn_not_fsynced(info->logger, info->do_fsync_lsn); info->r = toku_logger_fsync_if_lsn_not_fsynced(info->logger, info->do_fsync_lsn);
} }
int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync, YIELDF yield, void *yieldv) { int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync) {
int r = 0; int r = 0;
if (logger && do_fsync) { if (logger && do_fsync) {
struct txn_fsync_log_info info = { .logger = logger, .do_fsync_lsn = do_fsync_lsn }; struct txn_fsync_log_info info = { .logger = logger, .do_fsync_lsn = do_fsync_lsn };
yield(do_txn_fsync_log, &info, yieldv); //TODO(yoni): inline do_txn_fsync_log here
do_txn_fsync_log(&info);
r = info.r; r = info.r;
} }
return r; return r;
......
...@@ -37,15 +37,15 @@ int toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXN ...@@ -37,15 +37,15 @@ int toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXN
int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info); int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info);
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, int toku_txn_commit_txn (TOKUTXN txn, int nosync,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
BOOL toku_txn_requires_checkpoint(TOKUTXN txn); BOOL toku_txn_requires_checkpoint(TOKUTXN txn);
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn, int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv, int toku_txn_abort_txn(TOKUTXN txn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn, int toku_txn_abort_with_lsn(TOKUTXN txn, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
int toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xid) __attribute__((warn_unused_result)); int toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xid) __attribute__((warn_unused_result));
...@@ -54,7 +54,7 @@ int toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xid) __attribute__((warn_unu ...@@ -54,7 +54,7 @@ int toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xid) __attribute__((warn_unu
void toku_txn_get_prepared_xa_xid (TOKUTXN, TOKU_XA_XID *); void toku_txn_get_prepared_xa_xid (TOKUTXN, TOKU_XA_XID *);
// Effect: Fill in the XID information for a transaction. The caller allocates the XID and the function fills in values. // Effect: Fill in the XID information for a transaction. The caller allocates the XID and the function fills in values.
int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync, YIELDF yield, void *yieldv); int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync);
void toku_txn_get_fsync_info(TOKUTXN ttxn, BOOL* do_fsync, LSN* do_fsync_lsn); void toku_txn_get_fsync_info(TOKUTXN ttxn, BOOL* do_fsync, LSN* do_fsync_lsn);
......
...@@ -254,7 +254,7 @@ int toku_loader_create_loader(DB_ENV *env, ...@@ -254,7 +254,7 @@ int toku_loader_create_loader(DB_ENV *env,
loader->i->ekeys = NULL; loader->i->ekeys = NULL;
loader->i->evals = NULL; loader->i->evals = NULL;
LSN load_lsn; LSN load_lsn;
r = ydb_load_inames(env, txn, N, dbs, new_inames_in_env, &load_lsn, use_ft_loader); r = locked_load_inames(env, txn, N, dbs, new_inames_in_env, &load_lsn, use_ft_loader);
if ( r!=0 ) { if ( r!=0 ) {
toku_free(new_inames_in_env); toku_free(new_inames_in_env);
toku_free(brts); toku_free(brts);
......
...@@ -73,6 +73,7 @@ struct __toku_db_env_internal { ...@@ -73,6 +73,7 @@ struct __toku_db_env_internal {
DB *directory; // Maps dnames to inames DB *directory; // Maps dnames to inames
DB *persistent_environment; // Stores environment settings, can be used for upgrade DB *persistent_environment; // Stores environment settings, can be used for upgrade
OMT open_dbs; // Stores open db handles, sorted first by dname and then by numerical value of pointer to the db (arbitrarily assigned memory location) OMT open_dbs; // Stores open db handles, sorted first by dname and then by numerical value of pointer to the db (arbitrarily assigned memory location)
toku_mutex_t open_dbs_lock; // lock that protects the OMT of open dbs.
char *real_data_dir; // data dir used when the env is opened (relative to cwd, or absolute with leading /) char *real_data_dir; // data dir used when the env is opened (relative to cwd, or absolute with leading /)
char *real_log_dir; // log dir used when the env is opened (relative to cwd, or absolute with leading /) char *real_log_dir; // log dir used when the env is opened (relative to cwd, or absolute with leading /)
......
...@@ -31,7 +31,6 @@ const char *toku_copyright_string = "Copyright (c) 2007-2009 Tokutek Inc. All r ...@@ -31,7 +31,6 @@ const char *toku_copyright_string = "Copyright (c) 2007-2009 Tokutek Inc. All r
#include <ft/key.h> #include <ft/key.h>
#include "loader.h" #include "loader.h"
#include "indexer.h" #include "indexer.h"
#include "ydb_load.h"
#include <ft/ftloader.h> #include <ft/ftloader.h>
#include <ft/log_header.h> #include <ft/log_header.h>
#include <ft/ft.h> #include <ft/ft.h>
...@@ -396,7 +395,6 @@ static void keep_cachetable_callback (DB_ENV *env, CACHETABLE cachetable) ...@@ -396,7 +395,6 @@ static void keep_cachetable_callback (DB_ENV *env, CACHETABLE cachetable)
static int static int
ydb_do_recovery (DB_ENV *env) { ydb_do_recovery (DB_ENV *env) {
assert(env->i->real_log_dir); assert(env->i->real_log_dir);
toku_ydb_unlock();
int r = tokudb_recover(env, int r = tokudb_recover(env,
toku_keep_prepared_txn_callback, toku_keep_prepared_txn_callback,
keep_cachetable_callback, keep_cachetable_callback,
...@@ -405,7 +403,6 @@ ydb_do_recovery (DB_ENV *env) { ...@@ -405,7 +403,6 @@ ydb_do_recovery (DB_ENV *env) {
env->i->update_function, env->i->update_function,
env->i->generate_row_for_put, env->i->generate_row_for_del, env->i->generate_row_for_put, env->i->generate_row_for_del,
env->i->cachetable_size); env->i->cachetable_size);
toku_ydb_lock();
return r; return r;
} }
...@@ -726,9 +723,7 @@ static int ...@@ -726,9 +723,7 @@ static int
ydb_maybe_upgrade_env (DB_ENV *env, LSN * last_lsn_of_clean_shutdown_read_from_log, BOOL * upgrade_in_progress) { ydb_maybe_upgrade_env (DB_ENV *env, LSN * last_lsn_of_clean_shutdown_read_from_log, BOOL * upgrade_in_progress) {
int r = 0; int r = 0;
if (env->i->open_flags & DB_INIT_TXN && env->i->open_flags & DB_INIT_LOG) { if (env->i->open_flags & DB_INIT_TXN && env->i->open_flags & DB_INIT_LOG) {
toku_ydb_unlock();
r = toku_maybe_upgrade_log(env->i->dir, env->i->real_log_dir, last_lsn_of_clean_shutdown_read_from_log, upgrade_in_progress); r = toku_maybe_upgrade_log(env->i->dir, env->i->real_log_dir, last_lsn_of_clean_shutdown_read_from_log, upgrade_in_progress);
toku_ydb_lock();
} }
return r; return r;
} }
...@@ -949,7 +944,7 @@ toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) { ...@@ -949,7 +944,7 @@ toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) {
DB_TXN *txn=NULL; DB_TXN *txn=NULL;
if (using_txns) { if (using_txns) {
r = toku_txn_begin(env, 0, &txn, 0, 1, true); r = locked_txn_begin(env, 0, &txn, 0);
assert_zero(r); assert_zero(r);
} }
...@@ -997,13 +992,11 @@ toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) { ...@@ -997,13 +992,11 @@ toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) {
assert_zero(r); assert_zero(r);
} }
if (using_txns) { if (using_txns) {
r = toku_txn_commit(txn, 0, NULL, NULL, false); r = locked_txn_commit(txn, 0);
assert_zero(r); assert_zero(r);
} }
toku_ydb_unlock();
r = toku_checkpoint(env->i->cachetable, env->i->logger, NULL, NULL, NULL, NULL, STARTUP_CHECKPOINT); r = toku_checkpoint(env->i->cachetable, env->i->logger, NULL, NULL, NULL, NULL, STARTUP_CHECKPOINT);
assert_zero(r); assert_zero(r);
toku_ydb_lock();
env_fs_poller(env); // get the file system state at startup env_fs_poller(env); // get the file system state at startup
env_fs_init_minicron(env); env_fs_init_minicron(env);
cleanup: cleanup:
...@@ -1064,7 +1057,6 @@ toku_env_close(DB_ENV * env, u_int32_t flags) { ...@@ -1064,7 +1057,6 @@ toku_env_close(DB_ENV * env, u_int32_t flags) {
} }
} }
if (env->i->cachetable) { if (env->i->cachetable) {
toku_ydb_unlock(); // ydb lock must not be held when shutting down minicron
toku_cachetable_minicron_shutdown(env->i->cachetable); toku_cachetable_minicron_shutdown(env->i->cachetable);
if (env->i->logger) { if (env->i->logger) {
r = toku_checkpoint(env->i->cachetable, env->i->logger, NULL, NULL, NULL, NULL, SHUTDOWN_CHECKPOINT); r = toku_checkpoint(env->i->cachetable, env->i->logger, NULL, NULL, NULL, NULL, SHUTDOWN_CHECKPOINT);
...@@ -1093,7 +1085,6 @@ toku_env_close(DB_ENV * env, u_int32_t flags) { ...@@ -1093,7 +1085,6 @@ toku_env_close(DB_ENV * env, u_int32_t flags) {
goto panic_and_quit_early; goto panic_and_quit_early;
} }
} }
toku_ydb_lock();
r=toku_cachetable_close(&env->i->cachetable); r=toku_cachetable_close(&env->i->cachetable);
if (r) { if (r) {
err_msg = "Cannot close environment (cachetable close error)\n"; err_msg = "Cannot close environment (cachetable close error)\n";
...@@ -1136,6 +1127,7 @@ toku_env_close(DB_ENV * env, u_int32_t flags) { ...@@ -1136,6 +1127,7 @@ toku_env_close(DB_ENV * env, u_int32_t flags) {
toku_free(env->i->real_tmp_dir); toku_free(env->i->real_tmp_dir);
if (env->i->open_dbs) if (env->i->open_dbs)
toku_omt_destroy(&env->i->open_dbs); toku_omt_destroy(&env->i->open_dbs);
toku_mutex_destroy(&env->i->open_dbs_lock);
if (env->i->dir) if (env->i->dir)
toku_free(env->i->dir); toku_free(env->i->dir);
//Immediately before freeing internal environment unlock the directories //Immediately before freeing internal environment unlock the directories
...@@ -1189,21 +1181,59 @@ toku_env_set_cachesize(DB_ENV * env, u_int32_t gbytes, u_int32_t bytes, int ncac ...@@ -1189,21 +1181,59 @@ toku_env_set_cachesize(DB_ENV * env, u_int32_t gbytes, u_int32_t bytes, int ncac
static int static int
locked_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, u_int32_t flags) { locked_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, u_int32_t flags) {
toku_multi_operation_client_lock(); //Cannot begin checkpoint int ret, r;
toku_ydb_lock(); HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
int r = toku_env_dbremove(env, txn, fname, dbname, flags);
toku_ydb_unlock(); DB_TXN *child_txn = NULL;
toku_multi_operation_client_unlock(); //Can now begin checkpoint int using_txns = env->i->open_flags & DB_INIT_TXN;
if (using_txns) {
ret = locked_txn_begin(env, txn, &child_txn, DB_TXN_NOSYNC);
invariant_zero(ret);
}
// cannot begin a checkpoint
toku_multi_operation_client_lock();
r = toku_env_dbremove(env, child_txn, fname, dbname, flags);
toku_multi_operation_client_unlock();
if (using_txns) {
if (r == 0) { // commit
ret = locked_txn_commit(child_txn, DB_TXN_NOSYNC);
invariant_zero(ret);
} else {
ret = locked_txn_abort(child_txn);
invariant_zero(ret);
}
}
return r; return r;
} }
static int static int
locked_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, u_int32_t flags) { locked_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, u_int32_t flags) {
toku_multi_operation_client_lock(); //Cannot begin checkpoint int ret, r;
toku_ydb_lock(); HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
int r = toku_env_dbrename(env, txn, fname, dbname, newname, flags);
toku_ydb_unlock(); DB_TXN *child_txn = NULL;
toku_multi_operation_client_unlock(); //Can now begin checkpoint int using_txns = env->i->open_flags & DB_INIT_TXN;
if (using_txns) {
ret = locked_txn_begin(env, txn, &child_txn, DB_TXN_NOSYNC);
invariant_zero(ret);
}
// cannot begin a checkpoint
toku_multi_operation_client_lock();
r = toku_env_dbrename(env, child_txn, fname, dbname, newname, flags);
toku_multi_operation_client_unlock();
if (using_txns) {
if (r == 0) {
ret = locked_txn_commit(child_txn, DB_TXN_NOSYNC);
invariant_zero(ret);
} else {
ret = locked_txn_abort(child_txn);
invariant_zero(ret);
}
}
return r; return r;
} }
...@@ -1454,16 +1484,6 @@ toku_env_txn_stat(DB_ENV * env, DB_TXN_STAT ** UU(statp), u_int32_t UU(flags)) { ...@@ -1454,16 +1484,6 @@ toku_env_txn_stat(DB_ENV * env, DB_TXN_STAT ** UU(statp), u_int32_t UU(flags)) {
return 1; return 1;
} }
static int
locked_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) {
toku_ydb_lock(); int r = toku_env_open(env, home, flags, mode); toku_ydb_unlock(); return r;
}
static int
locked_env_close(DB_ENV * env, u_int32_t flags) {
toku_ydb_lock(); int r = toku_env_close(env, flags); toku_ydb_unlock(); return r;
}
static int static int
toku_env_txn_xa_recover (DB_ENV *env, TOKU_XA_XID xids[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) { toku_env_txn_xa_recover (DB_ENV *env, TOKU_XA_XID xids[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) {
struct tokulogger_preplist *MALLOC_N(count,preps); struct tokulogger_preplist *MALLOC_N(count,preps);
...@@ -2391,8 +2411,6 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) { ...@@ -2391,8 +2411,6 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) {
SENV(cleaner_get_period); SENV(cleaner_get_period);
SENV(cleaner_set_iterations); SENV(cleaner_set_iterations);
SENV(cleaner_get_iterations); SENV(cleaner_get_iterations);
SENV(open);
SENV(close);
SENV(txn_recover); SENV(txn_recover);
SENV(txn_xa_recover); SENV(txn_xa_recover);
SENV(get_txn_from_xid); SENV(get_txn_from_xid);
...@@ -2432,6 +2450,8 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) { ...@@ -2432,6 +2450,8 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) {
result->update_multiple = env_update_multiple; result->update_multiple = env_update_multiple;
// unlocked methods // unlocked methods
result->open = toku_env_open;
result->close = toku_env_close;
result->txn_checkpoint = toku_env_txn_checkpoint; result->txn_checkpoint = toku_env_txn_checkpoint;
result->checkpointing_postpone = env_checkpointing_postpone; result->checkpointing_postpone = env_checkpointing_postpone;
result->checkpointing_resume = env_checkpointing_resume; result->checkpointing_resume = env_checkpointing_resume;
...@@ -2471,6 +2491,7 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) { ...@@ -2471,6 +2491,7 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) {
assert(result->i->ltm); assert(result->i->ltm);
r = toku_omt_create(&result->i->open_dbs); r = toku_omt_create(&result->i->open_dbs);
toku_mutex_init(&result->i->open_dbs_lock, NULL);
assert_zero(r); assert_zero(r);
assert(result->i->open_dbs); assert(result->i->open_dbs);
...@@ -2488,9 +2509,7 @@ cleanup: ...@@ -2488,9 +2509,7 @@ cleanup:
int int
DB_ENV_CREATE_FUN (DB_ENV ** envp, u_int32_t flags) { DB_ENV_CREATE_FUN (DB_ENV ** envp, u_int32_t flags) {
toku_ydb_lock();
int r = toku_env_create(envp, flags); int r = toku_env_create(envp, flags);
toku_ydb_unlock();
return r; return r;
} }
...@@ -2523,7 +2542,8 @@ find_db_by_db (OMTVALUE v, void *dbv) { ...@@ -2523,7 +2542,8 @@ find_db_by_db (OMTVALUE v, void *dbv) {
// Tell env that there is a new db handle (with non-unique dname in db->i-dname) // Tell env that there is a new db handle (with non-unique dname in db->i-dname)
void void
env_note_db_opened(DB_ENV *env, DB *db) { env_note_db_opened(DB_ENV *env, DB *db) {
assert(db->i->dname); // internal (non-user) dictionary has no dname toku_mutex_lock(&env->i->open_dbs_lock);
assert(db->i->dname); // internal (non-user) dictionary has no dname
int r; int r;
OMTVALUE dbv; OMTVALUE dbv;
uint32_t idx; uint32_t idx;
...@@ -2535,14 +2555,15 @@ env_note_db_opened(DB_ENV *env, DB *db) { ...@@ -2535,14 +2555,15 @@ env_note_db_opened(DB_ENV *env, DB *db) {
assert(r==DB_NOTFOUND); //Must not already be there. assert(r==DB_NOTFOUND); //Must not already be there.
r = toku_omt_insert_at(env->i->open_dbs, db, idx); r = toku_omt_insert_at(env->i->open_dbs, db, idx);
assert_zero(r); assert_zero(r);
toku_mutex_unlock(&env->i->open_dbs_lock);
} }
void
env_note_db_closed(DB_ENV *env, DB *db)
// Effect: Tell the DB_ENV that the DB is no longer in use by the user of the API. The DB may still be in use by the fractal tree internals. // Effect: Tell the DB_ENV that the DB is no longer in use by the user of the API. The DB may still be in use by the fractal tree internals.
{ void
assert(db->i->dname); env_note_db_closed(DB_ENV *env, DB *db) {
assert(toku_omt_size(env->i->open_dbs)); toku_mutex_lock(&env->i->open_dbs_lock);
assert(db->i->dname); // internal (non-user) dictionary has no dname
assert(toku_omt_size(env->i->open_dbs) > 0);
int r; int r;
OMTVALUE dbv; OMTVALUE dbv;
uint32_t idx; uint32_t idx;
...@@ -2553,6 +2574,7 @@ env_note_db_closed(DB_ENV *env, DB *db) ...@@ -2553,6 +2574,7 @@ env_note_db_closed(DB_ENV *env, DB *db)
r = toku_omt_delete_at(env->i->open_dbs, idx); r = toku_omt_delete_at(env->i->open_dbs, idx);
STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = toku_omt_size(env->i->open_dbs); STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = toku_omt_size(env->i->open_dbs);
assert_zero(r); assert_zero(r);
toku_mutex_unlock(&env->i->open_dbs_lock);
} }
static int static int
...@@ -2572,6 +2594,7 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) { ...@@ -2572,6 +2594,7 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) {
BOOL rval; BOOL rval;
OMTVALUE dbv; OMTVALUE dbv;
uint32_t idx; uint32_t idx;
toku_mutex_lock(&env->i->open_dbs_lock);
r = toku_omt_find_zero(env->i->open_dbs, find_open_db_by_dname, (void*)dname, &dbv, &idx); r = toku_omt_find_zero(env->i->open_dbs, find_open_db_by_dname, (void*)dname, &dbv, &idx);
if (r==0) { if (r==0) {
DB *db = dbv; DB *db = dbv;
...@@ -2582,6 +2605,7 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) { ...@@ -2582,6 +2605,7 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) {
assert(r==DB_NOTFOUND); assert(r==DB_NOTFOUND);
rval = FALSE; rval = FALSE;
} }
toku_mutex_unlock(&env->i->open_dbs_lock);
return rval; return rval;
} }
...@@ -2618,8 +2642,7 @@ finalize_file_removal(DICTIONARY_ID dict_id, void * extra) { ...@@ -2618,8 +2642,7 @@ finalize_file_removal(DICTIONARY_ID dict_id, void * extra) {
// returns: true if we could open, lock, and close a dictionary // returns: true if we could open, lock, and close a dictionary
// with the given dname, false otherwise. // with the given dname, false otherwise.
static bool static bool
can_acquire_table_lock(DB_ENV *env, DB_TXN *txn, const char *iname_in_env) can_acquire_table_lock(DB_ENV *env, DB_TXN *txn, const char *iname_in_env) {
{
int r; int r;
bool got_lock = false; bool got_lock = false;
DB *db; DB *db;
...@@ -2627,9 +2650,6 @@ can_acquire_table_lock(DB_ENV *env, DB_TXN *txn, const char *iname_in_env) ...@@ -2627,9 +2650,6 @@ can_acquire_table_lock(DB_ENV *env, DB_TXN *txn, const char *iname_in_env)
r = toku_db_create(&db, env, 0); r = toku_db_create(&db, env, 0);
assert_zero(r); assert_zero(r);
r = db_open_iname(db, txn, iname_in_env, 0, 0); r = db_open_iname(db, txn, iname_in_env, 0, 0);
if (r != 0) {
printf("%s had db_open_iname return %d\n", __FUNCTION__, r);
}
assert_zero(r); assert_zero(r);
r = toku_db_pre_acquire_table_lock(db, txn); r = toku_db_pre_acquire_table_lock(db, txn);
if (r == 0) { if (r == 0) {
...@@ -2647,43 +2667,37 @@ int ...@@ -2647,43 +2667,37 @@ int
toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, u_int32_t flags) { toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, u_int32_t flags) {
int r; int r;
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn); if (!env_opened(env) || flags != 0) {
if (!env_opened(env)) return EINVAL; return EINVAL;
if (dbname!=NULL) }
if (dbname != NULL) {
// env_dbremove_subdb() converts (fname, dbname) to dname
return env_dbremove_subdb(env, txn, fname, dbname, flags); return env_dbremove_subdb(env, txn, fname, dbname, flags);
// env_dbremove_subdb() converts (fname, dbname) to dname }
const char * dname = fname; const char * dname = fname;
assert(dbname == NULL); assert(dbname == NULL);
if (flags!=0) return EINVAL;
// We check for an open db here as a "fast path" to error. // We check for an open db here as a "fast path" to error.
// We'll need to check again below to be sure. // We'll need to check again below to be sure.
if (env_is_db_with_dname_open(env, dname)) if (env_is_db_with_dname_open(env, dname)) {
return toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n"); return toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n");
}
DBT dname_dbt; DBT dname_dbt;
DBT iname_dbt; DBT iname_dbt;
toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1); toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1);
init_dbt_realloc(&iname_dbt); // sets iname_dbt.data = NULL init_dbt_realloc(&iname_dbt); // sets iname_dbt.data = NULL
int using_txns = env->i->open_flags & DB_INIT_TXN;
DB_TXN *child = NULL;
// begin child (unless transactionless)
if (using_txns) {
r = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1, true);
assert_zero(r);
}
// get iname // get iname
r = toku_db_get(env->i->directory, child, &dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname r = toku_db_get(env->i->directory, txn, &dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname
char *iname = iname_dbt.data; char *iname = iname_dbt.data;
DB *db = NULL; DB *db = NULL;
if (r == DB_NOTFOUND) { if (r == DB_NOTFOUND) {
r = ENOENT; r = ENOENT;
} else if (r == 0) { } else if (r == 0) {
// remove (dname,iname) from directory // remove (dname,iname) from directory
r = toku_db_del(env->i->directory, child, &dname_dbt, DB_DELETE_ANY, TRUE); r = toku_db_del(env->i->directory, txn, &dname_dbt, DB_DELETE_ANY, TRUE);
if (r != 0) { if (r != 0) {
goto exit; goto exit;
} }
...@@ -2691,7 +2705,7 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna ...@@ -2691,7 +2705,7 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna
assert_zero(r); assert_zero(r);
r = db_open_iname(db, txn, iname, 0, 0); r = db_open_iname(db, txn, iname, 0, 0);
assert_zero(r); assert_zero(r);
if (using_txns) { if (txn) {
// Now that we have a writelock on dname, verify that there are still no handles open. (to prevent race conditions) // Now that we have a writelock on dname, verify that there are still no handles open. (to prevent race conditions)
if (env_is_db_with_dname_open(env, dname)) { if (env_is_db_with_dname_open(env, dname)) {
r = toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n"); r = toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n");
...@@ -2707,11 +2721,11 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna ...@@ -2707,11 +2721,11 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna
// otherwise, we're okay in marking this ft as remove on // otherwise, we're okay in marking this ft as remove on
// commit. no new handles can open for this dictionary // commit. no new handles can open for this dictionary
// because the txn has directory write locks on the dname // because the txn has directory write locks on the dname
if (toku_db_pre_acquire_table_lock(db, child) != 0) { if (toku_db_pre_acquire_table_lock(db, txn) != 0) {
r = DB_LOCK_NOTGRANTED; r = DB_LOCK_NOTGRANTED;
} else { } else {
// The ft will be removed when the txn commits // The ft will be removed when the txn commits
r = toku_ft_remove_on_commit(db->i->ft_handle, db_txn_struct_i(child)->tokutxn); r = toku_ft_remove_on_commit(db->i->ft_handle, db_txn_struct_i(txn)->tokutxn);
assert_zero(r); assert_zero(r);
} }
} }
...@@ -2726,17 +2740,6 @@ exit: ...@@ -2726,17 +2740,6 @@ exit:
int ret = toku_db_close(db); int ret = toku_db_close(db);
assert(ret == 0); assert(ret == 0);
} }
if (using_txns) {
// close txn
if (r == 0) { // commit
r = toku_txn_commit(child, DB_TXN_NOSYNC, NULL, NULL, false);
invariant(r==0); // TODO panic
}
else { // abort
int r2 = toku_txn_abort(child, NULL, NULL, false);
invariant(r2==0); // TODO panic
}
}
if (iname) { if (iname) {
toku_free(iname); toku_free(iname);
} }
...@@ -2769,22 +2772,25 @@ int ...@@ -2769,22 +2772,25 @@ int
toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, u_int32_t flags) { toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, u_int32_t flags) {
int r; int r;
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn); if (!env_opened(env) || flags != 0) {
if (!env_opened(env)) return EINVAL; return EINVAL;
if (dbname!=NULL) }
if (dbname != NULL) {
// env_dbrename_subdb() converts (fname, dbname) to dname and (fname, newname) to newdname
return env_dbrename_subdb(env, txn, fname, dbname, newname, flags); return env_dbrename_subdb(env, txn, fname, dbname, newname, flags);
// env_dbrename_subdb() converts (fname, dbname) to dname and (fname, newname) to newdname }
const char * dname = fname; const char * dname = fname;
assert(dbname == NULL); assert(dbname == NULL);
if (flags != 0) return EINVAL;
// We check for open dnames for the old and new name as a "fast path" to error. // We check for open dnames for the old and new name as a "fast path" to error.
// We will need to check these again later. // We will need to check these again later.
if (env_is_db_with_dname_open(env, dname)) if (env_is_db_with_dname_open(env, dname)) {
return toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n"); return toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n");
if (env_is_db_with_dname_open(env, newname)) }
if (env_is_db_with_dname_open(env, newname)) {
return toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary; Dictionary with target name has an open handle.\n"); return toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary; Dictionary with target name has an open handle.\n");
}
DBT old_dname_dbt; DBT old_dname_dbt;
DBT new_dname_dbt; DBT new_dname_dbt;
...@@ -2793,30 +2799,22 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam ...@@ -2793,30 +2799,22 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam
toku_fill_dbt(&new_dname_dbt, newname, strlen(newname)+1); toku_fill_dbt(&new_dname_dbt, newname, strlen(newname)+1);
init_dbt_realloc(&iname_dbt); // sets iname_dbt.data = NULL init_dbt_realloc(&iname_dbt); // sets iname_dbt.data = NULL
int using_txns = env->i->open_flags & DB_INIT_TXN;
DB_TXN *child = NULL;
// begin child (unless transactionless)
if (using_txns) {
r = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1, true);
assert_zero(r);
}
// get iname // get iname
r = toku_db_get(env->i->directory, child, &old_dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname r = toku_db_get(env->i->directory, txn, &old_dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname
char *iname = iname_dbt.data; char *iname = iname_dbt.data;
if (r == DB_NOTFOUND) { if (r == DB_NOTFOUND) {
r = ENOENT; r = ENOENT;
} else if (r == 0) { } else if (r == 0) {
// verify that newname does not already exist // verify that newname does not already exist
r = db_getf_set(env->i->directory, child, DB_SERIALIZABLE, &new_dname_dbt, ydb_getf_do_nothing, NULL); r = db_getf_set(env->i->directory, txn, DB_SERIALIZABLE, &new_dname_dbt, ydb_getf_do_nothing, NULL);
if (r == 0) { if (r == 0) {
r = EEXIST; r = EEXIST;
} }
else if (r == DB_NOTFOUND) { else if (r == DB_NOTFOUND) {
// remove old (dname,iname) and insert (newname,iname) in directory // remove old (dname,iname) and insert (newname,iname) in directory
r = toku_db_del(env->i->directory, child, &old_dname_dbt, DB_DELETE_ANY, TRUE); r = toku_db_del(env->i->directory, txn, &old_dname_dbt, DB_DELETE_ANY, TRUE);
if (r != 0) { goto exit; } if (r != 0) { goto exit; }
r = toku_db_put(env->i->directory, child, &new_dname_dbt, &iname_dbt, 0, TRUE); r = toku_db_put(env->i->directory, txn, &new_dname_dbt, &iname_dbt, 0, TRUE);
if (r != 0) { goto exit; } if (r != 0) { goto exit; }
//Now that we have writelocks on both dnames, verify that there are still no handles open. (to prevent race conditions) //Now that we have writelocks on both dnames, verify that there are still no handles open. (to prevent race conditions)
...@@ -2839,7 +2837,7 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam ...@@ -2839,7 +2837,7 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam
// otherwise, we're okay in marking this ft as remove on // otherwise, we're okay in marking this ft as remove on
// commit. no new handles can open for this dictionary // commit. no new handles can open for this dictionary
// because the txn has directory write locks on the dname // because the txn has directory write locks on the dname
if (!can_acquire_table_lock(env, child, iname)) { if (txn && !can_acquire_table_lock(env, txn, iname)) {
r = DB_LOCK_NOTGRANTED; r = DB_LOCK_NOTGRANTED;
} }
// We don't do anything at the ft or cachetable layer for rename. // We don't do anything at the ft or cachetable layer for rename.
...@@ -2848,17 +2846,6 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam ...@@ -2848,17 +2846,6 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam
} }
exit: exit:
if (using_txns) {
// close txn
if (r == 0) { // commit
r = toku_txn_commit(child, DB_TXN_NOSYNC, NULL, NULL, false);
invariant(r==0); // TODO panic
}
else { // abort
int r2 = toku_txn_abort(child, NULL, NULL, false);
invariant(r2==0); // TODO panic
}
}
if (iname) { if (iname) {
toku_free(iname); toku_free(iname);
} }
...@@ -2867,9 +2854,7 @@ exit: ...@@ -2867,9 +2854,7 @@ exit:
int int
DB_CREATE_FUN (DB ** db, DB_ENV * env, u_int32_t flags) { DB_CREATE_FUN (DB ** db, DB_ENV * env, u_int32_t flags) {
toku_ydb_lock();
int r = toku_db_create(db, env, flags); int r = toku_db_create(db, env, flags);
toku_ydb_unlock();
return r; return r;
} }
......
...@@ -143,7 +143,6 @@ toku_db_close(DB * db) { ...@@ -143,7 +143,6 @@ toku_db_close(DB * db) {
return r; return r;
} }
/////////// ///////////
//db_getf_XXX is equivalent to c_getf_XXX, without a persistent cursor //db_getf_XXX is equivalent to c_getf_XXX, without a persistent cursor
...@@ -229,18 +228,17 @@ db_open_subdb(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTY ...@@ -229,18 +228,17 @@ db_open_subdb(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTY
static int static int
toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) { toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) {
HANDLE_PANICKED_DB(db); HANDLE_PANICKED_DB(db);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn); if (dbname != NULL) {
if (dbname!=NULL)
return db_open_subdb(db, txn, fname, dbname, dbtype, flags, mode); return db_open_subdb(db, txn, fname, dbname, dbtype, flags, mode);
}
// at this point fname is the dname // at this point fname is the dname
//This code ONLY supports single-db files. //This code ONLY supports single-db files.
assert(dbname==NULL); assert(dbname == NULL);
const char * dname = fname; // db_open_subdb() converts (fname, dbname) to dname const char * dname = fname; // db_open_subdb() converts (fname, dbname) to dname
////////////////////////////// do some level of parameter checking. ////////////////////////////// do some level of parameter checking.
u_int32_t unused_flags = flags; u_int32_t unused_flags = flags;
int using_txns = db->dbenv->i->open_flags & DB_INIT_TXN;
int r; int r;
if (dbtype!=DB_BTREE && dbtype!=DB_UNKNOWN) return EINVAL; if (dbtype!=DB_BTREE && dbtype!=DB_UNKNOWN) return EINVAL;
int is_db_excl = flags & DB_EXCL; unused_flags&=~DB_EXCL; int is_db_excl = flags & DB_EXCL; unused_flags&=~DB_EXCL;
...@@ -252,7 +250,6 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP ...@@ -252,7 +250,6 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP
unused_flags&=~DB_READ_COMMITTED; unused_flags&=~DB_READ_COMMITTED;
unused_flags&=~DB_SERIALIZABLE; unused_flags&=~DB_SERIALIZABLE;
if (unused_flags & ~DB_THREAD) return EINVAL; // unknown flags if (unused_flags & ~DB_THREAD) return EINVAL; // unknown flags
if (is_db_excl && !is_db_create) return EINVAL; if (is_db_excl && !is_db_create) return EINVAL;
if (dbtype==DB_UNKNOWN && is_db_excl) return EINVAL; if (dbtype==DB_UNKNOWN && is_db_excl) return EINVAL;
...@@ -262,16 +259,11 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP ...@@ -262,16 +259,11 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP
if (r != 0) if (r != 0)
return r; return r;
if (db_opened(db)) if (db_opened(db)) {
return EINVAL; /* It was already open. */ // it was already open
////////////////////////////// return EINVAL;
DB_TXN *child = NULL;
// begin child (unless transactionless)
if (using_txns) {
r = toku_txn_begin(db->dbenv, txn, &child, DB_TXN_NOSYNC, 1, true);
assert(r==0);
} }
//////////////////////////////
// convert dname to iname // convert dname to iname
// - look up dname, get iname // - look up dname, get iname
...@@ -280,21 +272,20 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP ...@@ -280,21 +272,20 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP
DBT iname_dbt; // holds iname_in_env DBT iname_dbt; // holds iname_in_env
toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1); toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1);
init_dbt_realloc(&iname_dbt); // sets iname_dbt.data = NULL init_dbt_realloc(&iname_dbt); // sets iname_dbt.data = NULL
r = toku_db_get(db->dbenv->i->directory, child, &dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname r = toku_db_get(db->dbenv->i->directory, txn, &dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname
char *iname = iname_dbt.data; char *iname = iname_dbt.data;
if (r==DB_NOTFOUND && !is_db_create) if (r == DB_NOTFOUND && !is_db_create) {
r = ENOENT; r = ENOENT;
else if (r==0 && is_db_excl) { } else if (r==0 && is_db_excl) {
r = EEXIST; r = EEXIST;
} } else if (r == DB_NOTFOUND) {
else if (r==DB_NOTFOUND) {
char hint[strlen(dname) + 1]; char hint[strlen(dname) + 1];
// create iname and make entry in directory // create iname and make entry in directory
u_int64_t id = 0; u_int64_t id = 0;
if (using_txns) { if (txn) {
id = toku_txn_get_txnid(db_txn_struct_i(child)->tokutxn); id = toku_txn_get_txnid(db_txn_struct_i(txn)->tokutxn);
} }
create_iname_hint(dname, hint); create_iname_hint(dname, hint);
iname = create_iname(db->dbenv, id, hint, NULL, -1); // allocated memory for iname iname = create_iname(db->dbenv, id, hint, NULL, -1); // allocated memory for iname
...@@ -305,33 +296,21 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP ...@@ -305,33 +296,21 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP
// directory read lock is grabbed in toku_db_get above // directory read lock is grabbed in toku_db_get above
// //
u_int32_t put_flags = 0 | ((is_db_hot_index) ? DB_PRELOCKED_WRITE : 0); u_int32_t put_flags = 0 | ((is_db_hot_index) ? DB_PRELOCKED_WRITE : 0);
r = toku_db_put(db->dbenv->i->directory, child, &dname_dbt, &iname_dbt, put_flags, TRUE); r = toku_db_put(db->dbenv->i->directory, txn, &dname_dbt, &iname_dbt, put_flags, TRUE);
} }
// we now have an iname // we now have an iname
if (r == 0) { if (r == 0) {
r = db_open_iname(db, child, iname, flags, mode); r = db_open_iname(db, txn, iname, flags, mode);
if (r==0) { if (r == 0) {
db->i->dname = toku_xstrdup(dname); db->i->dname = toku_xstrdup(dname);
env_note_db_opened(db->dbenv, db); // tell env that a new db handle is open (using dname) env_note_db_opened(db->dbenv, db); // tell env that a new db handle is open (using dname)
} }
} }
// free string holding iname if (iname) {
if (iname) toku_free(iname); toku_free(iname);
if (using_txns) {
// close txn
if (r == 0) { // commit
r = toku_txn_commit(child, DB_TXN_NOSYNC, NULL, NULL, false);
invariant(r==0); // TODO panic
}
else { // abort
int r2 = toku_txn_abort(child, NULL, NULL, false);
invariant(r2==0); // TODO panic
}
} }
return r; return r;
} }
...@@ -621,9 +600,10 @@ toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) { ...@@ -621,9 +600,10 @@ toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
static int static int
locked_db_close(DB * db, u_int32_t UU(flags)) { locked_db_close(DB * db, u_int32_t UU(flags)) {
toku_ydb_lock(); // cannot begin a checkpoint
toku_multi_operation_client_lock();
int r = toku_db_close(db); int r = toku_db_close(db);
toku_ydb_unlock(); toku_multi_operation_client_unlock();
return r; return r;
} }
...@@ -649,11 +629,31 @@ autotxn_db_getf_set (DB *db, DB_TXN *txn, u_int32_t flags, DBT *key, YDB_CALLBAC ...@@ -649,11 +629,31 @@ autotxn_db_getf_set (DB *db, DB_TXN *txn, u_int32_t flags, DBT *key, YDB_CALLBAC
static int static int
locked_db_open(DB *db, DB_TXN *txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) { locked_db_open(DB *db, DB_TXN *txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) {
toku_multi_operation_client_lock(); //Cannot begin checkpoint int ret, r;
toku_ydb_lock(); HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
int r = toku_db_open(db, txn, fname, dbname, dbtype, flags & ~DB_AUTO_COMMIT, mode);
toku_ydb_unlock(); DB_ENV *env = db->dbenv;
toku_multi_operation_client_unlock(); //Can now begin checkpoint DB_TXN *child_txn = NULL;
int using_txns = env->i->open_flags & DB_INIT_TXN;
if (using_txns) {
ret = locked_txn_begin(env, txn, &child_txn, DB_TXN_NOSYNC);
invariant_zero(ret);
}
// cannot begin a checkpoint
toku_multi_operation_client_lock();
r = toku_db_open(db, child_txn, fname, dbname, dbtype, flags & ~DB_AUTO_COMMIT, mode);
toku_multi_operation_client_unlock();
if (using_txns) {
if (r == 0) {
ret = locked_txn_commit(child_txn, DB_TXN_NOSYNC);
invariant_zero(ret);
} else {
ret = locked_txn_abort(child_txn);
invariant_zero(ret);
}
}
return r; return r;
} }
...@@ -871,11 +871,7 @@ toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) { ...@@ -871,11 +871,7 @@ toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
return 0; return 0;
} }
// When the loader is created, it makes this call (toku_env_load_inames).
/* Following functions (ydb_load_xxx()) are used by loader:
*/
// When the loader is created, it makes this call.
// For each dictionary to be loaded, replace old iname in directory // For each dictionary to be loaded, replace old iname in directory
// with a newly generated iname. This will also take a write lock // with a newly generated iname. This will also take a write lock
// on the directory entries. The write lock will be released when // on the directory entries. The write lock will be released when
...@@ -887,33 +883,29 @@ toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) { ...@@ -887,33 +883,29 @@ toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
// If "mark_as_loader" is true, then include a mark in the iname // If "mark_as_loader" is true, then include a mark in the iname
// to indicate that the file is created by the brt loader. // to indicate that the file is created by the brt loader.
// Return 0 on success (could fail if write lock not available). // Return 0 on success (could fail if write lock not available).
int static int
ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_inames_in_env[N], LSN *load_lsn, BOOL mark_as_loader) { load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_inames_in_env[N], LSN *load_lsn, BOOL mark_as_loader) {
int rval; int rval = 0;
int i; int i;
int using_txns = env->i->open_flags & DB_INIT_TXN;
DB_TXN * child = NULL;
TXNID xid = 0; TXNID xid = 0;
DBT dname_dbt; // holds dname DBT dname_dbt; // holds dname
DBT iname_dbt; // holds new iname DBT iname_dbt; // holds new iname
char * mark; char * mark;
if (mark_as_loader) if (mark_as_loader) {
mark = "B"; mark = "B";
else } else {
mark = "P"; mark = "P";
}
for (i=0; i<N; i++) { for (i=0; i<N; i++) {
new_inames_in_env[i] = NULL; new_inames_in_env[i] = NULL;
} }
// begin child (unless transactionless) if (txn) {
if (using_txns) { xid = toku_txn_get_txnid(db_txn_struct_i(txn)->tokutxn);
rval = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1, true);
assert(rval == 0);
xid = toku_txn_get_txnid(db_txn_struct_i(child)->tokutxn);
} }
for (i = 0; i < N; i++) { for (i = 0; i < N; i++) {
char * dname = dbs[i]->i->dname; char * dname = dbs[i]->i->dname;
...@@ -924,12 +916,12 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname ...@@ -924,12 +916,12 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname
char * new_iname = create_iname(env, xid, hint, mark, i); // allocates memory for iname_in_env char * new_iname = create_iname(env, xid, hint, mark, i); // allocates memory for iname_in_env
new_inames_in_env[i] = new_iname; new_inames_in_env[i] = new_iname;
toku_fill_dbt(&iname_dbt, new_iname, strlen(new_iname) + 1); // iname_in_env goes in directory toku_fill_dbt(&iname_dbt, new_iname, strlen(new_iname) + 1); // iname_in_env goes in directory
rval = toku_db_put(env->i->directory, child, &dname_dbt, &iname_dbt, 0, TRUE); rval = toku_db_put(env->i->directory, txn, &dname_dbt, &iname_dbt, 0, TRUE);
if (rval) break; if (rval) break;
} }
// Generate load log entries. // Generate load log entries.
if (!rval && using_txns) { if (!rval && txn) {
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn; TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
int do_fsync = 0; int do_fsync = 0;
LSN *get_lsn = NULL; LSN *get_lsn = NULL;
...@@ -944,17 +936,33 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname ...@@ -944,17 +936,33 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname
if (rval) break; if (rval) break;
} }
} }
return rval;
}
int
locked_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_inames_in_env[N], LSN *load_lsn, BOOL mark_as_loader) {
int ret, r;
DB_TXN *child_txn = NULL;
int using_txns = env->i->open_flags & DB_INIT_TXN;
if (using_txns) {
ret = locked_txn_begin(env, txn, &child_txn, DB_TXN_NOSYNC);
invariant_zero(ret);
}
// cannot begin a checkpoint
toku_multi_operation_client_lock();
r = load_inames(env, child_txn, N, dbs, new_inames_in_env, load_lsn, mark_as_loader);
toku_multi_operation_client_unlock();
if (using_txns) { if (using_txns) {
// close txn if (r == 0) {
if (rval == 0) { // all well so far, commit child ret = locked_txn_commit(child_txn, DB_TXN_NOSYNC);
rval = toku_txn_commit(child, DB_TXN_NOSYNC, NULL, NULL, false); invariant_zero(ret);
assert(rval==0); } else {
} ret = locked_txn_abort(child_txn);
else { // abort child invariant_zero(ret);
int r2 = toku_txn_abort(child, NULL, NULL, false); for (int i = 0; i < N; i++) {
assert(r2==0);
for (i=0; i<N; i++) {
if (new_inames_in_env[i]) { if (new_inames_in_env[i]) {
toku_free(new_inames_in_env[i]); toku_free(new_inames_in_env[i]);
new_inames_in_env[i] = NULL; new_inames_in_env[i] = NULL;
...@@ -962,10 +970,11 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname ...@@ -962,10 +970,11 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname
} }
} }
} }
return r;
return rval;
} }
#undef STATUS_VALUE #undef STATUS_VALUE
#include <valgrind/helgrind.h> #include <valgrind/helgrind.h>
......
...@@ -8,8 +8,6 @@ ...@@ -8,8 +8,6 @@
/* ydb functions used by loader /* ydb functions used by loader
*/ */
// When the loader is created, it makes this call. // When the loader is created, it makes this call.
// For each dictionary to be loaded, replace old iname in directory // For each dictionary to be loaded, replace old iname in directory
// with a newly generated iname. This will also take a write lock // with a newly generated iname. This will also take a write lock
...@@ -22,13 +20,12 @@ ...@@ -22,13 +20,12 @@
// If "mark_as_loader" is true, then include a mark in the iname // If "mark_as_loader" is true, then include a mark in the iname
// to indicate that the file is created by the brt loader. // to indicate that the file is created by the brt loader.
// Return 0 on success (could fail if write lock not available). // Return 0 on success (could fail if write lock not available).
int ydb_load_inames(DB_ENV * env, int locked_load_inames(DB_ENV * env,
DB_TXN * txn, DB_TXN * txn,
int N, int N,
DB * dbs[/*N*/], DB * dbs[N],
/*out*/ char * new_inames_in_env[N], char * new_inames_in_env[N], /* out */
LSN *load_lsn, LSN *load_lsn,
BOOL mark_as_loader); BOOL mark_as_loader);
#endif #endif
...@@ -13,6 +13,11 @@ ...@@ -13,6 +13,11 @@
#include <valgrind/helgrind.h> #include <valgrind/helgrind.h>
#include "ft/txn_manager.h" #include "ft/txn_manager.h"
static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags);
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags, TXN_PROGRESS_POLL_FUNCTION poll,
void *poll_extra, bool release_multi_operation_client_lock);
static int toku_txn_abort(DB_TXN * txn, TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
static int static int
toku_txn_release_locks(DB_TXN* txn) { toku_txn_release_locks(DB_TXN* txn) {
assert(txn); assert(txn);
...@@ -40,16 +45,6 @@ toku_txn_release_locks(DB_TXN* txn) { ...@@ -40,16 +45,6 @@ toku_txn_release_locks(DB_TXN* txn) {
return r; return r;
} }
// Yield the lock so someone else can work, and then reacquire the lock.
// Useful while processing commit or rollback logs, to allow others to access the system.
static void
ydb_yield (voidfp f, void *fv, void *UU(v)) {
toku_ydb_unlock();
if (f)
f(fv);
toku_ydb_lock();
}
static void static void
toku_txn_destroy(DB_TXN *txn) { toku_txn_destroy(DB_TXN *txn) {
(void) __sync_fetch_and_sub(&txn->mgrp->i->open_txns, 1); (void) __sync_fetch_and_sub(&txn->mgrp->i->open_txns, 1);
...@@ -64,9 +59,8 @@ toku_txn_destroy(DB_TXN *txn) { ...@@ -64,9 +59,8 @@ toku_txn_destroy(DB_TXN *txn) {
static int static int
toku_txn_commit_only(DB_TXN * txn, u_int32_t flags, toku_txn_commit_only(DB_TXN * txn, u_int32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra, TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
bool release_multi_operation_client_lock) { bool release_multi_operation_client_lock) {
if (!txn) return EINVAL;
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
//Recursively kill off children //Recursively kill off children
if (db_txn_struct_i(txn)->child) { if (db_txn_struct_i(txn)->child) {
...@@ -96,12 +90,10 @@ toku_txn_commit_only(DB_TXN * txn, u_int32_t flags, ...@@ -96,12 +90,10 @@ toku_txn_commit_only(DB_TXN * txn, u_int32_t flags,
int r; int r;
if (flags!=0) { if (flags!=0) {
// frees the tokutxn // frees the tokutxn
// Calls ydb_yield(NULL) occasionally r = toku_txn_abort_txn(db_txn_struct_i(txn)->tokutxn, poll, poll_extra);
r = toku_txn_abort_txn(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL, poll, poll_extra);
} else { } else {
// frees the tokutxn // frees the tokutxn
// Calls ydb_yield(NULL) occasionally r = toku_txn_commit_txn(db_txn_struct_i(txn)->tokutxn, nosync,
r = toku_txn_commit_txn(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL,
poll, poll_extra); poll, poll_extra);
} }
if (r!=0 && !toku_env_is_panicked(txn->mgrp)) { if (r!=0 && !toku_env_is_panicked(txn->mgrp)) {
...@@ -139,8 +131,7 @@ toku_txn_commit_only(DB_TXN * txn, u_int32_t flags, ...@@ -139,8 +131,7 @@ toku_txn_commit_only(DB_TXN * txn, u_int32_t flags,
// modified by both this transaction and B, it'll read B's value, even // modified by both this transaction and B, it'll read B's value, even
// though it cannot read this transaction's value, which comes below // though it cannot read this transaction's value, which comes below
// B's value on the leafentry's stack. This behavior is incorrect. // B's value on the leafentry's stack. This behavior is incorrect.
// All of this happens while the ydb lock is yielded. This causes a failure // This causes a failure in the test_stress tests.
// in the test_stress tests.
// //
toku_txn_get_fsync_info(ttxn, &do_fsync, &do_fsync_lsn); toku_txn_get_fsync_info(ttxn, &do_fsync, &do_fsync_lsn);
toku_txn_complete_txn(ttxn); toku_txn_complete_txn(ttxn);
...@@ -154,7 +145,7 @@ toku_txn_commit_only(DB_TXN * txn, u_int32_t flags, ...@@ -154,7 +145,7 @@ toku_txn_commit_only(DB_TXN * txn, u_int32_t flags,
if (release_multi_operation_client_lock) { if (release_multi_operation_client_lock) {
toku_multi_operation_client_unlock(); toku_multi_operation_client_unlock();
} }
toku_txn_maybe_fsync_log(logger, do_fsync_lsn, do_fsync, ydb_yield, NULL); toku_txn_maybe_fsync_log(logger, do_fsync_lsn, do_fsync);
//Promote list to parent (dbs that must close before abort) //Promote list to parent (dbs that must close before abort)
if (txn->parent) { if (txn->parent) {
...@@ -175,9 +166,9 @@ toku_txn_commit_only(DB_TXN * txn, u_int32_t flags, ...@@ -175,9 +166,9 @@ toku_txn_commit_only(DB_TXN * txn, u_int32_t flags,
return r; return r;
} }
int static int
toku_txn_commit(DB_TXN * txn, u_int32_t flags, toku_txn_commit(DB_TXN * txn, u_int32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra, TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
bool release_multi_operation_client_lock) { bool release_multi_operation_client_lock) {
int r = toku_txn_commit_only(txn, flags, poll, poll_extra, release_multi_operation_client_lock); int r = toku_txn_commit_only(txn, flags, poll, poll_extra, release_multi_operation_client_lock);
toku_txn_destroy(txn); toku_txn_destroy(txn);
...@@ -194,8 +185,7 @@ toku_txn_id(DB_TXN * txn) { ...@@ -194,8 +185,7 @@ toku_txn_id(DB_TXN * txn) {
static int static int
toku_txn_abort_only(DB_TXN * txn, toku_txn_abort_only(DB_TXN * txn,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra, TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) {
bool release_multi_operation_client_lock) {
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
//Recursively kill off children (abort or commit are both correct, commit is cheaper) //Recursively kill off children (abort or commit are both correct, commit is cheaper)
if (db_txn_struct_i(txn)->child) { if (db_txn_struct_i(txn)->child) {
...@@ -217,7 +207,7 @@ toku_txn_abort_only(DB_TXN * txn, ...@@ -217,7 +207,7 @@ toku_txn_abort_only(DB_TXN * txn,
//All dbs that must close before abort, must now be closed //All dbs that must close before abort, must now be closed
assert(toku_list_empty(&db_txn_struct_i(txn)->dbs_that_must_close_before_abort)); assert(toku_list_empty(&db_txn_struct_i(txn)->dbs_that_must_close_before_abort));
int r = toku_txn_abort_txn(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL, poll, poll_extra); int r = toku_txn_abort_txn(db_txn_struct_i(txn)->tokutxn, poll, poll_extra);
if (r!=0 && !toku_env_is_panicked(txn->mgrp)) { if (r!=0 && !toku_env_is_panicked(txn->mgrp)) {
env_panic(txn->mgrp, r, "Error during abort.\n"); env_panic(txn->mgrp, r, "Error during abort.\n");
} }
...@@ -225,13 +215,6 @@ toku_txn_abort_only(DB_TXN * txn, ...@@ -225,13 +215,6 @@ toku_txn_abort_only(DB_TXN * txn,
assert_zero(r); assert_zero(r);
r = toku_txn_release_locks(txn); r = toku_txn_release_locks(txn);
toku_txn_complete_txn(db_txn_struct_i(txn)->tokutxn); toku_txn_complete_txn(db_txn_struct_i(txn)->tokutxn);
// this lock must be released after toku_txn_complete_txn because
// this lock must be held until the references to the open FTs is released
// begin checkpoint logs these associations, so we must be protect
// the changing of these associations with checkpointing
if (release_multi_operation_client_lock) {
toku_multi_operation_client_unlock();
}
return r; return r;
} }
...@@ -257,7 +240,7 @@ toku_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) { ...@@ -257,7 +240,7 @@ toku_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) {
LSN do_fsync_lsn; LSN do_fsync_lsn;
bool do_fsync; bool do_fsync;
toku_txn_get_fsync_info(ttxn, &do_fsync, &do_fsync_lsn); toku_txn_get_fsync_info(ttxn, &do_fsync, &do_fsync_lsn);
toku_txn_maybe_fsync_log(logger, do_fsync_lsn, do_fsync, ydb_yield, NULL); toku_txn_maybe_fsync_log(logger, do_fsync_lsn, do_fsync);
return r; return r;
} }
...@@ -272,28 +255,26 @@ toku_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) { ...@@ -272,28 +255,26 @@ toku_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) {
return toku_txn_xa_prepare(txn, &xid); return toku_txn_xa_prepare(txn, &xid);
} }
int static int
toku_txn_abort(DB_TXN * txn, toku_txn_abort(DB_TXN * txn,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra, TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) {
bool release_multi_operation_client_lock) { int r = toku_txn_abort_only(txn, poll, poll_extra);
int r = toku_txn_abort_only(txn, poll, poll_extra, release_multi_operation_client_lock);
toku_txn_destroy(txn); toku_txn_destroy(txn);
return r; return r;
} }
// Create a new transaction. // Create a new transaction.
// Called without holding the ydb lock.
int int
locked_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) { locked_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
int r = toku_txn_begin(env, stxn, txn, flags, 0, false); toku_multi_operation_client_lock();
int r = toku_txn_begin(env, stxn, txn, flags);
toku_multi_operation_client_unlock();
return r; return r;
} }
static u_int32_t static u_int32_t
locked_txn_id(DB_TXN *txn) { locked_txn_id(DB_TXN *txn) {
toku_ydb_lock();
u_int32_t r = toku_txn_id(txn); u_int32_t r = toku_txn_id(txn);
toku_ydb_unlock();
return r; return r;
} }
...@@ -305,9 +286,7 @@ toku_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) { ...@@ -305,9 +286,7 @@ toku_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) {
static int static int
locked_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) { locked_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) {
toku_ydb_lock();
int r = toku_txn_txn_stat(txn, txn_stat); int r = toku_txn_txn_stat(txn, txn_stat);
toku_ydb_unlock();
return r; return r;
} }
...@@ -318,22 +297,21 @@ locked_txn_commit_with_progress(DB_TXN *txn, u_int32_t flags, ...@@ -318,22 +297,21 @@ locked_txn_commit_with_progress(DB_TXN *txn, u_int32_t flags,
if (toku_txn_requires_checkpoint(ttxn)) { if (toku_txn_requires_checkpoint(ttxn)) {
toku_checkpoint(txn->mgrp->i->cachetable, txn->mgrp->i->logger, NULL, NULL, NULL, NULL, TXN_COMMIT_CHECKPOINT); toku_checkpoint(txn->mgrp->i->cachetable, txn->mgrp->i->logger, NULL, NULL, NULL, NULL, TXN_COMMIT_CHECKPOINT);
} }
toku_multi_operation_client_lock(); //Cannot checkpoint during a commit. // cannot begin a checkpoint. the multi operation lock is taken here,
toku_ydb_lock(); // but released in toku_txn_commit_only. this way, we don't hold it
int r = toku_txn_commit_only(txn, flags, poll, poll_extra, true); // the final 'true' says to release the multi_operation_client_lock // while we fsync the log.
toku_ydb_unlock(); toku_multi_operation_client_lock();
toku_txn_destroy(txn); int r = toku_txn_commit(txn, flags, poll, poll_extra, true);
return r; return r;
} }
static int static int
locked_txn_abort_with_progress(DB_TXN *txn, locked_txn_abort_with_progress(DB_TXN *txn,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
toku_multi_operation_client_lock(); //Cannot checkpoint during an abort. // cannot begin a checkpoint
toku_ydb_lock(); toku_multi_operation_client_lock();
int r = toku_txn_abort_only(txn, poll, poll_extra, true); // the final 'true' says to release the multi_operation_client_lock int r = toku_txn_abort(txn, poll, poll_extra);
toku_ydb_unlock(); toku_multi_operation_client_unlock();
toku_txn_destroy(txn);
return r; return r;
} }
...@@ -351,16 +329,22 @@ locked_txn_abort(DB_TXN *txn) { ...@@ -351,16 +329,22 @@ locked_txn_abort(DB_TXN *txn) {
static int static int
locked_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) { locked_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) {
toku_ydb_lock(); int r = toku_txn_prepare (txn, gid); toku_ydb_unlock(); return r; toku_multi_operation_client_lock();
int r = toku_txn_prepare(txn, gid);
toku_multi_operation_client_unlock();
return r;
} }
static int static int
locked_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) { locked_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) {
toku_ydb_lock(); int r = toku_txn_xa_prepare (txn, xid); toku_ydb_unlock(); return r; toku_multi_operation_client_lock();
int r = toku_txn_xa_prepare(txn, xid);
toku_multi_operation_client_unlock();
return r;
} }
int static int
toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool internal, bool holds_ydb_lock) { toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, stxn); //Cannot create child while child already exists. HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, stxn); //Cannot create child while child already exists.
if (!toku_logger_is_open(env->i->logger)) if (!toku_logger_is_open(env->i->logger))
...@@ -388,7 +372,7 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool ...@@ -388,7 +372,7 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool
} }
flags &= ~iso_flags; flags &= ~iso_flags;
if (internal && stxn) { if (stxn) {
child_isolation = db_txn_struct_i(stxn)->iso; child_isolation = db_txn_struct_i(stxn)->iso;
} }
else { else {
...@@ -504,18 +488,14 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool ...@@ -504,18 +488,14 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool
snapshot_type, snapshot_type,
result result
); );
if (r != 0) if (r != 0) {
toku_free(result);
return r; return r;
if (!holds_ydb_lock) {
toku_ydb_lock();
} }
toku_txn_manager_start_txn( toku_txn_manager_start_txn(
toku_logger_get_txn_manager(env->i->logger), toku_logger_get_txn_manager(env->i->logger),
db_txn_struct_i(result)->tokutxn db_txn_struct_i(result)->tokutxn
); );
if (!holds_ydb_lock) {
toku_ydb_unlock();
}
//Add to the list of children for the parent. //Add to the list of children for the parent.
if (result->parent) { if (result->parent) {
......
...@@ -9,13 +9,17 @@ ...@@ -9,13 +9,17 @@
extern "C" { extern "C" {
#endif #endif
// begin, commit, and abort use the multi operation lock
// internally to synchronize with begin checkpoint. callers
// should not hold the multi operation lock.
int locked_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags); int locked_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags);
int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool internal, bool holds_ydb_lock);
int toku_txn_commit(DB_TXN * txn, u_int32_t flags, TXN_PROGRESS_POLL_FUNCTION, void*, bool release_multi_operation_client_lock);
int toku_txn_abort(DB_TXN * txn, TXN_PROGRESS_POLL_FUNCTION, void*, bool release_multi_operation_client_lock);
int locked_txn_commit(DB_TXN *txn, u_int32_t flags); int locked_txn_commit(DB_TXN *txn, u_int32_t flags);
int locked_txn_abort(DB_TXN *txn); int locked_txn_abort(DB_TXN *txn);
void toku_keep_prepared_txn_callback (DB_ENV *env, TOKUTXN tokutxn);
void toku_keep_prepared_txn_callback(DB_ENV *env, TOKUTXN tokutxn);
#if defined(__cplusplus) #if defined(__cplusplus)
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment