Commit 64da224f authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#4455 add txn create and start to the tokutxn API. this allows a txn to be...

#4455 add txn create and start to the tokutxn API.  this allows a txn to be created without holding any locks refs[t:4455]

git-svn-id: file:///svn/toku/tokudb@40438 c7de825b-a66e-492c-adef-691d508d4ae1
parent b78ea157
...@@ -474,7 +474,7 @@ recover_transaction(TOKUTXN *txnp, TXNID xid, TXNID parentxid, TOKULOGGER logger ...@@ -474,7 +474,7 @@ recover_transaction(TOKUTXN *txnp, TXNID xid, TXNID parentxid, TOKULOGGER logger
assert(r == 0); assert(r == 0);
assert(txn==NULL); assert(txn==NULL);
} }
r = toku_txn_begin_with_xid(parent, &txn, logger, xid, TXN_SNAPSHOT_NONE); r = toku_txn_begin_with_xid(parent, &txn, logger, xid, TXN_SNAPSHOT_NONE, NULL);
assert(r == 0); assert(r == 0);
if (txnp) *txnp = txn; if (txnp) *txnp = txn;
return 0; return 0;
......
...@@ -24,7 +24,7 @@ test_main(int argc, const char *argv[]) { ...@@ -24,7 +24,7 @@ test_main(int argc, const char *argv[]) {
int r; int r;
r = toku_txn_ignore_init(txn); CKERR(r); toku_txn_ignore_init(txn);
FILENUM f1 = {1}; FILENUM f1 = {1};
FILENUM f2 = {2}; FILENUM f2 = {2};
......
...@@ -58,7 +58,6 @@ toku_txn_get_status(TOKULOGGER logger, TXN_STATUS s) { ...@@ -58,7 +58,6 @@ toku_txn_get_status(TOKULOGGER logger, TXN_STATUS s) {
*s = txn_status; *s = txn_status;
} }
int int
toku_txn_begin_txn ( toku_txn_begin_txn (
DB_TXN *container_db_txn, DB_TXN *container_db_txn,
...@@ -68,17 +67,23 @@ toku_txn_begin_txn ( ...@@ -68,17 +67,23 @@ toku_txn_begin_txn (
TXN_SNAPSHOT_TYPE snapshot_type TXN_SNAPSHOT_TYPE snapshot_type
) )
{ {
int r; int r = toku_txn_begin_with_xid(parent_tokutxn, tokutxn, logger, TXNID_NONE, snapshot_type, container_db_txn);
r = toku_txn_begin_with_xid(parent_tokutxn, return r;
tokutxn, logger, }
0,
snapshot_type int
); toku_txn_begin_with_xid (
if (r == 0) { TOKUTXN parent_tokutxn,
// container_db_txn set here, not in helper function toku_txn_begin_with_xid() TOKUTXN *tokutxn,
// because helper function is used by recovery, which does not have DB_TXN TOKULOGGER logger,
(*tokutxn)->container_db_txn = container_db_txn; // internal struct points to container TXNID xid,
} TXN_SNAPSHOT_TYPE snapshot_type,
DB_TXN *container_db_txn
)
{
int r = toku_txn_create_txn(tokutxn, parent_tokutxn, logger, xid, snapshot_type, container_db_txn);
if (r == 0)
r = toku_txn_start_txn(*tokutxn);
return r; return r;
} }
...@@ -96,7 +101,6 @@ fill_xids (OMTVALUE xev, u_int32_t idx, void *varray) { ...@@ -96,7 +101,6 @@ fill_xids (OMTVALUE xev, u_int32_t idx, void *varray) {
return 0; return 0;
} }
// Create list of root transactions that were live when this txn began. // Create list of root transactions that were live when this txn began.
static int static int
setup_live_root_txn_list(TOKUTXN txn) { setup_live_root_txn_list(TOKUTXN txn) {
...@@ -130,7 +134,6 @@ snapshot_txnids_note_txn(TOKUTXN txn) { ...@@ -130,7 +134,6 @@ snapshot_txnids_note_txn(TOKUTXN txn) {
return r; return r;
} }
// If live txn is not in reverse live list, then add it. // If live txn is not in reverse live list, then add it.
// If live txn is in reverse live list, update it by setting second xid in pair to new txn that is being started. // If live txn is in reverse live list, update it by setting second xid in pair to new txn that is being started.
static int static int
...@@ -163,7 +166,6 @@ live_list_reverse_note_txn_start_iter(OMTVALUE live_xidv, u_int32_t UU(index), v ...@@ -163,7 +166,6 @@ live_list_reverse_note_txn_start_iter(OMTVALUE live_xidv, u_int32_t UU(index), v
return r; return r;
} }
// Maintain the reverse live list. The reverse live list is a list of xid pairs. The first xid in the pair // Maintain the reverse live list. The reverse live list is a list of xid pairs. The first xid in the pair
// is a txn that was live when some txn began, and the second xid in the pair is the newest still-live xid to // is a txn that was live when some txn began, and the second xid in the pair is the newest still-live xid to
// have that first xid in its live list. (The first xid may be closed, it only needed to be live when the // have that first xid in its live list. (The first xid may be closed, it only needed to be live when the
...@@ -180,12 +182,14 @@ live_list_reverse_note_txn_start(TOKUTXN txn) { ...@@ -180,12 +182,14 @@ live_list_reverse_note_txn_start(TOKUTXN txn) {
return r; return r;
} }
int toku_txn_begin_with_xid ( int
TOKUTXN parent_tokutxn, toku_txn_create_txn (
TOKUTXN *tokutxn, TOKUTXN *tokutxn,
TOKUTXN parent_tokutxn,
TOKULOGGER logger, TOKULOGGER logger,
TXNID xid, TXNID xid,
TXN_SNAPSHOT_TYPE snapshot_type TXN_SNAPSHOT_TYPE snapshot_type,
DB_TXN *container_db_txn
) )
{ {
if (logger->is_panicked) return EINVAL; if (logger->is_panicked) return EINVAL;
...@@ -194,26 +198,13 @@ int toku_txn_begin_with_xid ( ...@@ -194,26 +198,13 @@ int toku_txn_begin_with_xid (
} }
assert(logger->rollback_cachefile); assert(logger->rollback_cachefile);
TOKUTXN MALLOC(result); TOKUTXN MALLOC(result);
if (result==0) if (result == 0)
return errno; return errno;
int r;
LSN first_lsn;
result->starttime = time(NULL); // getting timestamp in seconds is a cheap call result->starttime = time(NULL); // getting timestamp in seconds is a cheap call
if (xid == 0) { int r;
r = toku_log_xbegin(logger, &first_lsn, 0, parent_tokutxn ? parent_tokutxn->txnid64 : 0);
if (r!=0) goto died;
} else
first_lsn.lsn = xid;
r = toku_omt_create(&result->open_brts); r = toku_omt_create(&result->open_brts);
if (r!=0) goto died; if (r!=0) goto died;
result->txnid64 = first_lsn.lsn;
XIDS parent_xids;
if (parent_tokutxn==NULL)
parent_xids = xids_get_root_xids();
else
parent_xids = parent_tokutxn->xids;
if ((r=xids_create_child(parent_xids, &result->xids, result->txnid64)))
goto died;
result->logger = logger; result->logger = logger;
result->parent = parent_tokutxn; result->parent = parent_tokutxn;
result->num_rollentries = 0; result->num_rollentries = 0;
...@@ -230,19 +221,69 @@ int toku_txn_begin_with_xid ( ...@@ -230,19 +221,69 @@ int toku_txn_begin_with_xid (
result->pinned_inprogress_rollback_log = NULL; result->pinned_inprogress_rollback_log = NULL;
result->snapshot_type = snapshot_type; result->snapshot_type = snapshot_type;
result->snapshot_txnid64 = TXNID_NONE; result->snapshot_txnid64 = TXNID_NONE;
result->container_db_txn = container_db_txn;
result->rollentry_raw_count = 0;
result->force_fsync_on_commit = FALSE;
result->recovered_from_checkpoint = FALSE;
toku_list_init(&result->checkpoint_before_commit);
result->state = TOKUTXN_LIVE;
result->do_fsync = FALSE;
toku_txn_ignore_init(result); // 2954
result->txnid64 = xid;
result->xids = NULL;
*tokutxn = result;
STATUS_VALUE(TXN_BEGIN)++;
STATUS_VALUE(TXN_NUM_OPEN)++;
if (STATUS_VALUE(TXN_NUM_OPEN) > STATUS_VALUE(TXN_MAX_OPEN))
STATUS_VALUE(TXN_MAX_OPEN) = STATUS_VALUE(TXN_NUM_OPEN);
if (garbage_collection_debug) {
verify_snapshot_system(logger);
}
return 0;
died:
// TODO memory leak
toku_logger_panic(logger, r);
return r;
}
int
toku_txn_start_txn(TOKUTXN txn) {
TOKULOGGER logger = txn->logger;
TOKUTXN parent = txn->parent;
int r;
if (txn->txnid64 == TXNID_NONE) {
LSN first_lsn;
r = toku_log_xbegin(logger, &first_lsn, 0, parent ? parent->txnid64 : 0);
if (r!=0) goto died;
txn->txnid64 = first_lsn.lsn;
}
XIDS parent_xids;
if (parent == NULL)
parent_xids = xids_get_root_xids();
else
parent_xids = parent->xids;
if ((r = xids_create_child(parent_xids, &txn->xids, txn->txnid64)))
goto died;
if (toku_omt_size(logger->live_txns) == 0) { if (toku_omt_size(logger->live_txns) == 0) {
assert(logger->oldest_living_xid == TXNID_NONE_LIVING); assert(logger->oldest_living_xid == TXNID_NONE_LIVING);
logger->oldest_living_xid = result->txnid64; logger->oldest_living_xid = txn->txnid64;
logger->oldest_living_starttime = result->starttime; logger->oldest_living_starttime = txn->starttime;
} }
assert(logger->oldest_living_xid <= result->txnid64); assert(logger->oldest_living_xid <= txn->txnid64);
r = toku_pthread_mutex_lock(&logger->txn_list_lock); assert_zero(r); r = toku_pthread_mutex_lock(&logger->txn_list_lock); assert_zero(r);
{ {
//Add txn to list (omt) of live transactions //Add txn to list (omt) of live transactions
//We know it is the newest one. //We know it is the newest one.
r = toku_omt_insert_at(logger->live_txns, result, toku_omt_size(logger->live_txns)); r = toku_omt_insert_at(logger->live_txns, txn, toku_omt_size(logger->live_txns));
if (r!=0) goto died; if (r!=0) goto died;
// //
...@@ -261,34 +302,34 @@ int toku_txn_begin_with_xid ( ...@@ -261,34 +302,34 @@ int toku_txn_begin_with_xid (
// //
// add ancestor information, and maintain global live root txn list // add ancestor information, and maintain global live root txn list
if (parent_tokutxn==NULL) { if (parent == NULL) {
//Add txn to list (omt) of live root txns //Add txn to list (omt) of live root txns
r = toku_omt_insert_at(logger->live_root_txns, result, toku_omt_size(logger->live_root_txns)); //We know it is the newest one. r = toku_omt_insert_at(logger->live_root_txns, txn, toku_omt_size(logger->live_root_txns)); //We know it is the newest one.
if (r!=0) goto died; if (r!=0) goto died;
result->ancestor_txnid64 = result->txnid64; txn->ancestor_txnid64 = txn->txnid64;
} }
else { else {
result->ancestor_txnid64 = result->parent->ancestor_txnid64; txn->ancestor_txnid64 = parent->ancestor_txnid64;
} }
// setup information for snapshot reads // setup information for snapshot reads
if (snapshot_type != TXN_SNAPSHOT_NONE) { if (txn->snapshot_type != TXN_SNAPSHOT_NONE) {
// in this case, either this is a root level transaction that needs its live list setup, or it // in this case, either this is a root level transaction that needs its live list setup, or it
// is a child transaction that specifically asked for its own snapshot // is a child transaction that specifically asked for its own snapshot
if (parent_tokutxn==NULL || snapshot_type == TXN_SNAPSHOT_CHILD) { if (parent == NULL || txn->snapshot_type == TXN_SNAPSHOT_CHILD) {
r = setup_live_root_txn_list(result); r = setup_live_root_txn_list(txn);
assert_zero(r); assert_zero(r);
result->snapshot_txnid64 = result->txnid64; txn->snapshot_txnid64 = txn->txnid64;
r = snapshot_txnids_note_txn(result); r = snapshot_txnids_note_txn(txn);
assert_zero(r); assert_zero(r);
r = live_list_reverse_note_txn_start(result); r = live_list_reverse_note_txn_start(txn);
assert_zero(r); assert_zero(r);
} }
// in this case, it is a child transaction that specified its snapshot to be that // in this case, it is a child transaction that specified its snapshot to be that
// of the root transaction // of the root transaction
else if (snapshot_type == TXN_SNAPSHOT_ROOT) { else if (txn->snapshot_type == TXN_SNAPSHOT_ROOT) {
result->live_root_txn_list = result->parent->live_root_txn_list; txn->live_root_txn_list = parent->live_root_txn_list;
result->snapshot_txnid64 = result->parent->snapshot_txnid64; txn->snapshot_txnid64 = parent->snapshot_txnid64;
} }
else { else {
assert(FALSE); assert(FALSE);
...@@ -296,26 +337,6 @@ int toku_txn_begin_with_xid ( ...@@ -296,26 +337,6 @@ int toku_txn_begin_with_xid (
} }
} }
r = toku_pthread_mutex_unlock(&logger->txn_list_lock); assert_zero(r); r = toku_pthread_mutex_unlock(&logger->txn_list_lock); assert_zero(r);
result->rollentry_raw_count = 0;
result->force_fsync_on_commit = FALSE;
result->recovered_from_checkpoint = FALSE;
toku_list_init(&result->checkpoint_before_commit);
result->state = TOKUTXN_LIVE;
result->do_fsync = FALSE;
// 2954
r = toku_txn_ignore_init(result);
if (r != 0) goto died;
*tokutxn = result;
STATUS_VALUE(TXN_BEGIN)++;
STATUS_VALUE(TXN_NUM_OPEN)++;
if (STATUS_VALUE(TXN_NUM_OPEN) > STATUS_VALUE(TXN_MAX_OPEN))
STATUS_VALUE(TXN_MAX_OPEN) = STATUS_VALUE(TXN_NUM_OPEN);
if (garbage_collection_debug) {
verify_snapshot_system(logger);
}
return 0; return 0;
died: died:
...@@ -489,11 +510,11 @@ void toku_txn_get_fsync_info(TOKUTXN ttxn, BOOL* do_fsync, LSN* do_fsync_lsn) { ...@@ -489,11 +510,11 @@ void toku_txn_get_fsync_info(TOKUTXN ttxn, BOOL* do_fsync, LSN* do_fsync_lsn) {
} }
void toku_txn_close_txn(TOKUTXN txn) { void toku_txn_close_txn(TOKUTXN txn) {
toku_txn_rollback_txn(txn); toku_txn_complete_txn(txn);
toku_txn_destroy_txn(txn); toku_txn_destroy_txn(txn);
} }
void toku_txn_rollback_txn(TOKUTXN txn) { void toku_txn_complete_txn(TOKUTXN txn) {
toku_rollback_txn_close(txn); toku_rollback_txn_close(txn);
} }
...@@ -501,6 +522,7 @@ void toku_txn_destroy_txn(TOKUTXN txn) { ...@@ -501,6 +522,7 @@ void toku_txn_destroy_txn(TOKUTXN txn) {
if (garbage_collection_debug) if (garbage_collection_debug)
verify_snapshot_system(txn->logger); verify_snapshot_system(txn->logger);
if (txn->open_brts)
toku_omt_destroy(&txn->open_brts); toku_omt_destroy(&txn->open_brts);
xids_destroy(&txn->xids); xids_destroy(&txn->xids);
toku_txn_ignore_free(txn); // 2954 toku_txn_ignore_free(txn); // 2954
...@@ -685,20 +707,16 @@ verify_snapshot_system(TOKULOGGER logger) { ...@@ -685,20 +707,16 @@ verify_snapshot_system(TOKULOGGER logger) {
// ENOMEM if can't alloc memory // ENOMEM if can't alloc memory
// EINVAL if txn = NULL // EINVAL if txn = NULL
// -1 on other errors // -1 on other errors
int toku_txn_ignore_init(TOKUTXN txn) void toku_txn_ignore_init(TOKUTXN txn) {
{ assert(txn);
if ( !txn ) return EINVAL;
TXN_IGNORE txni = &(txn->ignore_errors); TXN_IGNORE txni = &(txn->ignore_errors);
txni->fns_allocated = 0; txni->fns_allocated = 0;
txni->filenums.num = 0; txni->filenums.num = 0;
txni->filenums.filenums = NULL; txni->filenums.filenums = NULL;
return 0;
} }
void toku_txn_ignore_free(TOKUTXN txn) void toku_txn_ignore_free(TOKUTXN txn) {
{ assert(txn);
TXN_IGNORE txni = &(txn->ignore_errors); TXN_IGNORE txni = &(txn->ignore_errors);
toku_free(txni->filenums.filenums); toku_free(txni->filenums.filenums);
txni->filenums.num = 0; txni->filenums.num = 0;
...@@ -710,9 +728,8 @@ void toku_txn_ignore_free(TOKUTXN txn) ...@@ -710,9 +728,8 @@ void toku_txn_ignore_free(TOKUTXN txn)
// ENOMEM if can't alloc memory // ENOMEM if can't alloc memory
// EINVAL if txn = NULL // EINVAL if txn = NULL
// -1 on other errors // -1 on other errors
int toku_txn_ignore_add(TOKUTXN txn, FILENUM filenum) int toku_txn_ignore_add(TOKUTXN txn, FILENUM filenum) {
{ assert(txn);
if ( !txn ) return EINVAL;
// check for dups // check for dups
if ( toku_txn_ignore_contains(txn, filenum) == 0 ) return 0; if ( toku_txn_ignore_contains(txn, filenum) == 0 ) return 0;
// alloc more space if needed // alloc more space if needed
...@@ -741,9 +758,8 @@ int toku_txn_ignore_add(TOKUTXN txn, FILENUM filenum) ...@@ -741,9 +758,8 @@ int toku_txn_ignore_add(TOKUTXN txn, FILENUM filenum)
// EINVAL if txn = NULL // EINVAL if txn = NULL
// -1 on other errors // -1 on other errors
// THIS FUNCTION IS NOT USED IN FUNCTIONAL CODE, BUT IS USEFUL FOR TESTING // THIS FUNCTION IS NOT USED IN FUNCTIONAL CODE, BUT IS USEFUL FOR TESTING
int toku_txn_ignore_remove(TOKUTXN txn, FILENUM filenum) int toku_txn_ignore_remove(TOKUTXN txn, FILENUM filenum) {
{ assert(txn);
if ( !txn ) return EINVAL;
TXN_IGNORE txni = &(txn->ignore_errors); TXN_IGNORE txni = &(txn->ignore_errors);
int found_fn = 0; int found_fn = 0;
if ( txni->filenums.num == 0 ) return ENOENT; if ( txni->filenums.num == 0 ) return ENOENT;
...@@ -767,9 +783,8 @@ int toku_txn_ignore_remove(TOKUTXN txn, FILENUM filenum) ...@@ -767,9 +783,8 @@ int toku_txn_ignore_remove(TOKUTXN txn, FILENUM filenum)
// ENOENT if not found // ENOENT if not found
// EINVAL if txn = NULL // EINVAL if txn = NULL
// -1 on other errors // -1 on other errors
int toku_txn_ignore_contains(TOKUTXN txn, FILENUM filenum) int toku_txn_ignore_contains(TOKUTXN txn, FILENUM filenum) {
{ assert(txn);
if ( !txn ) return EINVAL;
TXN_IGNORE txni = &(txn->ignore_errors); TXN_IGNORE txni = &(txn->ignore_errors);
for(uint32_t i=0; i<txni->filenums.num; i++) { for(uint32_t i=0; i<txni->filenums.num; i++) {
if ( txni->filenums.filenums[i].fileid == filenum.fileid ) { if ( txni->filenums.filenums[i].fileid == filenum.fileid ) {
......
...@@ -26,9 +26,16 @@ int toku_txn_begin_with_xid ( ...@@ -26,9 +26,16 @@ int toku_txn_begin_with_xid (
TOKUTXN *tokutxn, TOKUTXN *tokutxn,
TOKULOGGER logger, TOKULOGGER logger,
TXNID xid, TXNID xid,
TXN_SNAPSHOT_TYPE snapshot_type TXN_SNAPSHOT_TYPE snapshot_type,
DB_TXN *container_db_txn
); );
// Allocate and initialize a txn
int toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXNID xid, TXN_SNAPSHOT_TYPE snapshot_type, DB_TXN *container_db_txn);
// Assign a txnid. Log the txn begin in the recovery log. Initialize the txn live lists.
int toku_txn_start_txn(TOKUTXN txn);
int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info); int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info);
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv,
...@@ -50,11 +57,11 @@ int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync, ...@@ -50,11 +57,11 @@ int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync,
void toku_txn_get_fsync_info(TOKUTXN ttxn, BOOL* do_fsync, LSN* do_fsync_lsn); void toku_txn_get_fsync_info(TOKUTXN ttxn, BOOL* do_fsync, LSN* do_fsync_lsn);
// Rollback and destroy a txn // Complete and destroy a txn
void toku_txn_close_txn(TOKUTXN txn); void toku_txn_close_txn(TOKUTXN txn);
// Remove the txn from any live txn lists // Remove a txn from any live txn lists
void toku_txn_rollback_txn(TOKUTXN txn); void toku_txn_complete_txn(TOKUTXN txn);
// Free the memory of a txn // Free the memory of a txn
void toku_txn_destroy_txn(TOKUTXN txn); void toku_txn_destroy_txn(TOKUTXN txn);
...@@ -73,7 +80,6 @@ BOOL toku_txnid_newer(TXNID a, TXNID b); ...@@ -73,7 +80,6 @@ BOOL toku_txnid_newer(TXNID a, TXNID b);
// Force fsync on commit // Force fsync on commit
void toku_txn_force_fsync_on_commit(TOKUTXN txn); void toku_txn_force_fsync_on_commit(TOKUTXN txn);
typedef enum { typedef enum {
TXN_BEGIN, // total number of transactions begun (does not include recovered txns) TXN_BEGIN, // total number of transactions begun (does not include recovered txns)
TXN_COMMIT, // successful commits TXN_COMMIT, // successful commits
...@@ -108,7 +114,7 @@ typedef struct tokutxn_filenum_ignore_errors { ...@@ -108,7 +114,7 @@ typedef struct tokutxn_filenum_ignore_errors {
FILENUMS filenums; FILENUMS filenums;
} TXN_IGNORE_S, *TXN_IGNORE; } TXN_IGNORE_S, *TXN_IGNORE;
int toku_txn_ignore_init(TOKUTXN txn); void toku_txn_ignore_init(TOKUTXN txn);
void toku_txn_ignore_free(TOKUTXN txn); void toku_txn_ignore_free(TOKUTXN txn);
int toku_txn_ignore_add(TOKUTXN txn, FILENUM filenum); int toku_txn_ignore_add(TOKUTXN txn, FILENUM filenum);
int toku_txn_ignore_remove(TOKUTXN txn, FILENUM filenum); int toku_txn_ignore_remove(TOKUTXN txn, FILENUM filenum);
......
...@@ -939,7 +939,7 @@ toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) { ...@@ -939,7 +939,7 @@ toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) {
DB_TXN *txn=NULL; DB_TXN *txn=NULL;
if (using_txns) { if (using_txns) {
r = toku_txn_begin_internal(env, 0, &txn, 0, 1, true); r = toku_txn_begin(env, 0, &txn, 0, 1, true);
assert_zero(r); assert_zero(r);
} }
...@@ -2380,7 +2380,7 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) { ...@@ -2380,7 +2380,7 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) {
result->set_errcall = toku_env_set_errcall; result->set_errcall = toku_env_set_errcall;
result->set_errfile = toku_env_set_errfile; result->set_errfile = toku_env_set_errfile;
result->set_errpfx = toku_env_set_errpfx; result->set_errpfx = toku_env_set_errpfx;
result->txn_begin = toku_txn_begin; result->txn_begin = locked_txn_begin;
MALLOC(result->i); MALLOC(result->i);
if (result->i == 0) { r = ENOMEM; goto cleanup; } if (result->i == 0) { r = ENOMEM; goto cleanup; }
...@@ -2653,7 +2653,7 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna ...@@ -2653,7 +2653,7 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna
DB_TXN *child = NULL; DB_TXN *child = NULL;
// begin child (unless transactionless) // begin child (unless transactionless)
if (using_txns) { if (using_txns) {
r = toku_txn_begin_internal(env, txn, &child, DB_TXN_NOSYNC, 1, true); r = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1, true);
assert_zero(r); assert_zero(r);
} }
...@@ -2757,7 +2757,7 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam ...@@ -2757,7 +2757,7 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam
DB_TXN *child = NULL; DB_TXN *child = NULL;
// begin child (unless transactionless) // begin child (unless transactionless)
if (using_txns) { if (using_txns) {
r = toku_txn_begin_internal(env, txn, &child, DB_TXN_NOSYNC, 1, true); r = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1, true);
assert_zero(r); assert_zero(r);
} }
......
...@@ -305,7 +305,7 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP ...@@ -305,7 +305,7 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP
DB_TXN *child = NULL; DB_TXN *child = NULL;
// begin child (unless transactionless) // begin child (unless transactionless)
if (using_txns) { if (using_txns) {
r = toku_txn_begin_internal(db->dbenv, txn, &child, DB_TXN_NOSYNC, 1, true); r = toku_txn_begin(db->dbenv, txn, &child, DB_TXN_NOSYNC, 1, true);
assert(r==0); assert(r==0);
} }
...@@ -641,7 +641,7 @@ toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL UU(just_lock)) { ...@@ -641,7 +641,7 @@ toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL UU(just_lock)) {
{ {
// begin child // begin child
int rt = toku_txn_begin_internal(env, txn, &child, DB_TXN_NOSYNC, 1, true); int rt = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1, true);
assert(rt==0); assert(rt==0);
} }
...@@ -1090,7 +1090,7 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname ...@@ -1090,7 +1090,7 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname
// begin child (unless transactionless) // begin child (unless transactionless)
if (using_txns) { if (using_txns) {
rval = toku_txn_begin_internal(env, txn, &child, DB_TXN_NOSYNC, 1, true); rval = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1, true);
assert(rval == 0); assert(rval == 0);
xid = toku_txn_get_txnid(db_txn_struct_i(child)->tokutxn); xid = toku_txn_get_txnid(db_txn_struct_i(child)->tokutxn);
} }
......
...@@ -57,7 +57,7 @@ toku_db_construct_autotxn(DB* db, DB_TXN **txn, BOOL* changed, BOOL force_auto_c ...@@ -57,7 +57,7 @@ toku_db_construct_autotxn(DB* db, DB_TXN **txn, BOOL* changed, BOOL force_auto_c
} }
BOOL nosync = (BOOL)(!force_auto_commit && !(env->i->open_flags & DB_AUTO_COMMIT)); BOOL nosync = (BOOL)(!force_auto_commit && !(env->i->open_flags & DB_AUTO_COMMIT));
u_int32_t txn_flags = DB_TXN_NOWAIT | (nosync ? DB_TXN_NOSYNC : 0); u_int32_t txn_flags = DB_TXN_NOWAIT | (nosync ? DB_TXN_NOSYNC : 0);
int r = toku_txn_begin_internal(env, NULL, txn, txn_flags, 1, holds_ydb_lock); int r = toku_txn_begin(env, NULL, txn, txn_flags, 1, holds_ydb_lock);
if (r!=0) return r; if (r!=0) return r;
*changed = TRUE; *changed = TRUE;
return 0; return 0;
......
...@@ -139,7 +139,7 @@ toku_txn_commit_only(DB_TXN * txn, u_int32_t flags, ...@@ -139,7 +139,7 @@ toku_txn_commit_only(DB_TXN * txn, u_int32_t flags,
// in the test_stress tests. // in the test_stress tests.
// //
toku_txn_get_fsync_info(ttxn, &do_fsync, &do_fsync_lsn); toku_txn_get_fsync_info(ttxn, &do_fsync, &do_fsync_lsn);
toku_txn_rollback_txn(ttxn); toku_txn_complete_txn(ttxn);
toku_txn_maybe_fsync_log(logger, do_fsync_lsn, do_fsync, ydb_yield, NULL); toku_txn_maybe_fsync_log(logger, do_fsync_lsn, do_fsync, ydb_yield, NULL);
//Promote list to parent (dbs that must close before abort) //Promote list to parent (dbs that must close before abort)
...@@ -210,7 +210,7 @@ toku_txn_abort_only(DB_TXN * txn, ...@@ -210,7 +210,7 @@ toku_txn_abort_only(DB_TXN * txn,
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
assert_zero(r); assert_zero(r);
r = toku_txn_release_locks(txn); r = toku_txn_release_locks(txn);
toku_txn_rollback_txn(db_txn_struct_i(txn)->tokutxn); toku_txn_complete_txn(db_txn_struct_i(txn)->tokutxn);
return r; return r;
} }
...@@ -226,8 +226,8 @@ toku_txn_abort(DB_TXN * txn, ...@@ -226,8 +226,8 @@ toku_txn_abort(DB_TXN * txn,
// Create a new transaction. // Create a new transaction.
// Called without holding the ydb lock. // Called without holding the ydb lock.
int int
toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) { locked_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
int r = toku_txn_begin_internal(env, stxn, txn, flags, 0, false); int r = toku_txn_begin(env, stxn, txn, flags, 0, false);
return r; return r;
} }
...@@ -306,13 +306,16 @@ locked_txn_abort(DB_TXN *txn) { ...@@ -306,13 +306,16 @@ locked_txn_abort(DB_TXN *txn) {
} }
int int
toku_txn_begin_internal(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool internal, bool holds_ydb_lock) { toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool internal, bool holds_ydb_lock) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, stxn); //Cannot create child while child already exists. HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, stxn); //Cannot create child while child already exists.
if (!toku_logger_is_open(env->i->logger)) return toku_ydb_do_error(env, EINVAL, "Environment does not have logging enabled\n"); if (!toku_logger_is_open(env->i->logger))
if (!(env->i->open_flags & DB_INIT_TXN)) return toku_ydb_do_error(env, EINVAL, "Environment does not have transactions enabled\n"); return toku_ydb_do_error(env, EINVAL, "Environment does not have logging enabled\n");
if (!(env->i->open_flags & DB_INIT_TXN))
return toku_ydb_do_error(env, EINVAL, "Environment does not have transactions enabled\n");
u_int32_t txn_flags = 0; u_int32_t txn_flags = 0;
txn_flags |= DB_TXN_NOWAIT; //We do not support blocking locks. txn_flags |= DB_TXN_NOWAIT; //We do not support blocking locks. RFP remove this?
TOKU_ISOLATION child_isolation = TOKU_ISO_SERIALIZABLE; TOKU_ISOLATION child_isolation = TOKU_ISO_SERIALIZABLE;
u_int32_t iso_flags = flags & DB_ISOLATION_FLAGS; u_int32_t iso_flags = flags & DB_ISOLATION_FLAGS;
if (!(iso_flags == 0 || if (!(iso_flags == 0 ||
...@@ -420,7 +423,6 @@ toku_txn_begin_internal(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t fla ...@@ -420,7 +423,6 @@ toku_txn_begin_internal(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t fla
// created. // created.
int r = 0; int r = 0;
//r = toku_logger_txn_begin(stxn ? db_txn_struct_i(stxn)->tokutxn : 0, &db_txn_struct_i(result)->tokutxn, env->i->logger);
TXN_SNAPSHOT_TYPE snapshot_type; TXN_SNAPSHOT_TYPE snapshot_type;
switch(db_txn_struct_i(result)->iso){ switch(db_txn_struct_i(result)->iso){
case(TOKU_ISO_SNAPSHOT): case(TOKU_ISO_SNAPSHOT):
...@@ -439,14 +441,20 @@ toku_txn_begin_internal(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t fla ...@@ -439,14 +441,20 @@ toku_txn_begin_internal(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t fla
break; break;
} }
} }
if (!holds_ydb_lock) toku_ydb_lock(); r = toku_txn_create_txn(&db_txn_struct_i(result)->tokutxn,
r = toku_txn_begin_txn(result,
stxn ? db_txn_struct_i(stxn)->tokutxn : 0, stxn ? db_txn_struct_i(stxn)->tokutxn : 0,
&db_txn_struct_i(result)->tokutxn,
env->i->logger, env->i->logger,
snapshot_type TXNID_NONE,
snapshot_type,
result
); );
if (!holds_ydb_lock) toku_ydb_unlock(); if (r != 0)
return r;
if (!holds_ydb_lock)
toku_ydb_lock();
r = toku_txn_start_txn(db_txn_struct_i(result)->tokutxn);
if (!holds_ydb_lock)
toku_ydb_unlock();
if (r != 0) if (r != 0)
return r; return r;
......
...@@ -7,8 +7,8 @@ ...@@ -7,8 +7,8 @@
extern "C" { extern "C" {
#endif #endif
int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags); int locked_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags);
int toku_txn_begin_internal(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool internal, bool holds_ydb_lock); int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool internal, bool holds_ydb_lock);
int toku_txn_commit(DB_TXN * txn, u_int32_t flags, TXN_PROGRESS_POLL_FUNCTION, void*, bool release_multi_operation_client_lock); int toku_txn_commit(DB_TXN * txn, u_int32_t flags, TXN_PROGRESS_POLL_FUNCTION, void*, bool release_multi_operation_client_lock);
int toku_txn_abort(DB_TXN * txn, TXN_PROGRESS_POLL_FUNCTION, void*, bool release_multi_operation_client_lock); int toku_txn_abort(DB_TXN * txn, TXN_PROGRESS_POLL_FUNCTION, void*, bool release_multi_operation_client_lock);
int locked_txn_commit(DB_TXN *txn, u_int32_t flags); int locked_txn_commit(DB_TXN *txn, u_int32_t flags);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment