Commit 43eea285 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:4889], move code for completing a txn to txn.c from rollback.c

git-svn-id: file:///svn/toku/tokudb@43587 c7de825b-a66e-492c-adef-691d508d4ae1
parent 132eb1cf
...@@ -164,189 +164,6 @@ toku_find_pair_by_xid (OMTVALUE v, void *xidv) { ...@@ -164,189 +164,6 @@ toku_find_pair_by_xid (OMTVALUE v, void *xidv) {
return 0; return 0;
} }
// For each xid on the closing txn's live list, find the corresponding entry in the reverse live list.
// There must be one.
// If the second xid in the pair is not the xid of the closing transaction, then the second xid must be newer
// than the closing txn, and there is nothing to be done (except to assert the invariant).
// If the second xid in the pair is the xid of the closing transaction, then we need to find the next oldest
// txn. If the live_xid is in the live list of the next oldest txn, then set the next oldest txn as the
// second xid in the pair, otherwise delete the entry from the reverse live list.
static int
live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), void*txnv) {
TOKUTXN txn = txnv;
TXNID xid = txn->txnid64; // xid of txn that is closing
TXNID live_xid = (TXNID)live_xidv; // xid on closing txn's live list
OMTVALUE pairv;
XID_PAIR pair;
uint32_t idx;
int r;
OMT reverse = txn->logger->live_list_reverse;
r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, (void *)live_xid, &pairv, &idx);
invariant(r==0);
pair = pairv;
invariant(pair->xid1 == live_xid); //sanity check
if (pair->xid2 == xid) {
//There is a record that needs to be either deleted or updated
TXNID olderxid;
OMTVALUE olderv;
uint32_t olderidx;
OMT snapshot = txn->logger->snapshot_txnids;
BOOL should_delete = TRUE;
// find the youngest txn in snapshot that is older than xid
r = toku_omt_find(snapshot, toku_find_xid_by_xid, (OMTVALUE) xid, -1, &olderv, &olderidx);
if (r==0) {
//There is an older txn
olderxid = (TXNID) olderv;
invariant(olderxid < xid);
if (olderxid >= live_xid) {
//older txn is new enough, we need to update.
pair->xid2 = olderxid;
should_delete = FALSE;
}
}
else {
invariant(r==DB_NOTFOUND);
}
if (should_delete) {
//Delete record
toku_free(pair);
r = toku_omt_delete_at(reverse, idx);
invariant(r==0);
}
}
else {
invariant(pair->xid2 > xid);
}
return r;
}
// When txn ends, update reverse live list. To do that, examine each txn in this (closing) txn's live list.
static int
live_list_reverse_note_txn_end(TOKUTXN txn) {
int r;
r = toku_omt_iterate(txn->live_root_txn_list, live_list_reverse_note_txn_end_iter, txn);
invariant(r==0);
return r;
}
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
static int find_xid (OMTVALUE v, void *txnv) {
TOKUTXN txn = v;
TOKUTXN txnfind = txnv;
if (txn->txnid64<txnfind->txnid64) return -1;
if (txn->txnid64>txnfind->txnid64) return +1;
return 0;
}
static int remove_txn (OMTVALUE hv, u_int32_t UU(idx), void *txnv)
// Effect: This function is called on every open BRT that a transaction used.
// This function removes the transaction from that BRT.
{
struct brt_header* h = hv;
TOKUTXN txn = txnv;
if (txn->txnid64==h->txnid_that_created_or_locked_when_empty) {
h->txnid_that_created_or_locked_when_empty = TXNID_NONE;
h->root_that_created_or_locked_when_empty = TXNID_NONE;
}
if (txn->txnid64==h->txnid_that_suppressed_recovery_logs) {
h->txnid_that_suppressed_recovery_logs = TXNID_NONE;
}
toku_brtheader_remove_txn_ref(h, txn);
return 0;
}
// for every BRT in txn, remove it.
static void note_txn_closing (TOKUTXN txn) {
toku_omt_iterate(txn->open_brt_headers, remove_txn, txn);
}
void toku_rollback_txn_close (TOKUTXN txn) {
assert(txn->spilled_rollback_head.b == ROLLBACK_NONE.b);
assert(txn->spilled_rollback_tail.b == ROLLBACK_NONE.b);
assert(txn->current_rollback.b == ROLLBACK_NONE.b);
int r;
TOKULOGGER logger = txn->logger;
r = toku_pthread_mutex_lock(&logger->txn_list_lock); assert_zero(r);
{
{
//Remove txn from list (omt) of live transactions
OMTVALUE txnagain;
u_int32_t idx;
r = toku_omt_find_zero(logger->live_txns, find_xid, txn, &txnagain, &idx);
assert(r==0);
assert(txn==txnagain);
r = toku_omt_delete_at(logger->live_txns, idx);
assert(r==0);
}
if (txn->parent==NULL) {
OMTVALUE v;
u_int32_t idx;
//Remove txn from list of live root txns
r = toku_omt_find_zero(logger->live_root_txns, toku_find_xid_by_xid, (OMTVALUE)txn->txnid64, &v, &idx);
assert(r==0);
TXNID xid = (TXNID) v;
invariant(xid == txn->txnid64);
r = toku_omt_delete_at(logger->live_root_txns, idx);
assert(r==0);
}
//
// if this txn created a snapshot, make necessary modifications to list of snapshot txnids and live_list_reverse
// the order of operations is important. We first remove the txnid from the list of snapshot txnids. This is
// necessary because root snapshot transactions are in their own live lists. If we do not remove
// the txnid from the snapshot txnid list first, then when we go to make the modifications to
// live_list_reverse, we have trouble. We end up never removing (id, id) from live_list_reverse
//
if (txn->snapshot_type != TXN_SNAPSHOT_NONE && (txn->parent==NULL || txn->snapshot_type == TXN_SNAPSHOT_CHILD)) {
{
u_int32_t idx;
OMTVALUE v;
//Free memory used for snapshot_txnids
r = toku_omt_find_zero(logger->snapshot_txnids, toku_find_xid_by_xid, (OMTVALUE) txn->txnid64, &v, &idx);
invariant(r==0);
TXNID xid = (TXNID) v;
invariant(xid == txn->txnid64);
r = toku_omt_delete_at(logger->snapshot_txnids, idx);
invariant(r==0);
}
live_list_reverse_note_txn_end(txn);
{
//Free memory used for live root txns local list
invariant(toku_omt_size(txn->live_root_txn_list) > 0);
toku_omt_destroy(&txn->live_root_txn_list);
}
}
}
r = toku_pthread_mutex_unlock(&logger->txn_list_lock); assert_zero(r);
assert(logger->oldest_living_xid <= txn->txnid64);
if (txn->txnid64 == logger->oldest_living_xid) {
OMTVALUE oldest_txnv;
r = toku_omt_fetch(logger->live_txns, 0, &oldest_txnv);
if (r==0) {
TOKUTXN oldest_txn = oldest_txnv;
assert(oldest_txn != txn); // We just removed it
assert(oldest_txn->txnid64 > logger->oldest_living_xid); //Must be newer than the previous oldest
logger->oldest_living_xid = oldest_txn->txnid64;
logger->oldest_living_starttime = oldest_txn->starttime;
}
else {
//No living transactions
assert(r==EINVAL);
logger->oldest_living_xid = TXNID_NONE_LIVING;
logger->oldest_living_starttime = 0;
}
}
note_txn_closing(txn);
}
void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size) { void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size) {
return malloc_in_memarena(log->rollentry_arena, size); return malloc_in_memarena(log->rollentry_arena, size);
} }
......
...@@ -14,7 +14,6 @@ extern "C" { ...@@ -14,7 +14,6 @@ extern "C" {
void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint); void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint);
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn); int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn);
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn); int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn);
void toku_rollback_txn_close (TOKUTXN txn);
// these functions assert internally that they succeed // these functions assert internally that they succeed
......
...@@ -581,8 +581,186 @@ void toku_txn_close_txn(TOKUTXN txn) { ...@@ -581,8 +581,186 @@ void toku_txn_close_txn(TOKUTXN txn) {
toku_txn_destroy_txn(txn); toku_txn_destroy_txn(txn);
} }
// For each xid on the closing txn's live list, find the corresponding entry in the reverse live list.
// There must be one.
// If the second xid in the pair is not the xid of the closing transaction, then the second xid must be newer
// than the closing txn, and there is nothing to be done (except to assert the invariant).
// If the second xid in the pair is the xid of the closing transaction, then we need to find the next oldest
// txn. If the live_xid is in the live list of the next oldest txn, then set the next oldest txn as the
// second xid in the pair, otherwise delete the entry from the reverse live list.
static int
live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), void*txnv) {
TOKUTXN txn = txnv;
TXNID xid = txn->txnid64; // xid of txn that is closing
TXNID live_xid = (TXNID)live_xidv; // xid on closing txn's live list
OMTVALUE pairv;
XID_PAIR pair;
uint32_t idx;
int r;
OMT reverse = txn->logger->live_list_reverse;
r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, (void *)live_xid, &pairv, &idx);
invariant(r==0);
pair = pairv;
invariant(pair->xid1 == live_xid); //sanity check
if (pair->xid2 == xid) {
//There is a record that needs to be either deleted or updated
TXNID olderxid;
OMTVALUE olderv;
uint32_t olderidx;
OMT snapshot = txn->logger->snapshot_txnids;
BOOL should_delete = TRUE;
// find the youngest txn in snapshot that is older than xid
r = toku_omt_find(snapshot, toku_find_xid_by_xid, (OMTVALUE) xid, -1, &olderv, &olderidx);
if (r==0) {
//There is an older txn
olderxid = (TXNID) olderv;
invariant(olderxid < xid);
if (olderxid >= live_xid) {
//older txn is new enough, we need to update.
pair->xid2 = olderxid;
should_delete = FALSE;
}
}
else {
invariant(r==DB_NOTFOUND);
}
if (should_delete) {
//Delete record
toku_free(pair);
r = toku_omt_delete_at(reverse, idx);
invariant(r==0);
}
}
else {
invariant(pair->xid2 > xid);
}
return r;
}
// When txn ends, update reverse live list. To do that, examine each txn in this (closing) txn's live list.
static int
live_list_reverse_note_txn_end(TOKUTXN txn) {
int r;
r = toku_omt_iterate(txn->live_root_txn_list, live_list_reverse_note_txn_end_iter, txn);
invariant(r==0);
return r;
}
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
static int find_xid (OMTVALUE v, void *txnv) {
TOKUTXN txn = v;
TOKUTXN txnfind = txnv;
if (txn->txnid64<txnfind->txnid64) return -1;
if (txn->txnid64>txnfind->txnid64) return +1;
return 0;
}
static int remove_txn (OMTVALUE hv, u_int32_t UU(idx), void *txnv)
// Effect: This function is called on every open BRT that a transaction used.
// This function removes the transaction from that BRT.
{
struct brt_header* h = hv;
TOKUTXN txn = txnv;
if (txn->txnid64==h->txnid_that_created_or_locked_when_empty) {
h->txnid_that_created_or_locked_when_empty = TXNID_NONE;
h->root_that_created_or_locked_when_empty = TXNID_NONE;
}
if (txn->txnid64==h->txnid_that_suppressed_recovery_logs) {
h->txnid_that_suppressed_recovery_logs = TXNID_NONE;
}
toku_brtheader_remove_txn_ref(h, txn);
return 0;
}
// for every BRT in txn, remove it.
static void note_txn_closing (TOKUTXN txn) {
toku_omt_iterate(txn->open_brt_headers, remove_txn, txn);
}
void toku_txn_complete_txn(TOKUTXN txn) { void toku_txn_complete_txn(TOKUTXN txn) {
toku_rollback_txn_close(txn); assert(txn->spilled_rollback_head.b == ROLLBACK_NONE.b);
assert(txn->spilled_rollback_tail.b == ROLLBACK_NONE.b);
assert(txn->current_rollback.b == ROLLBACK_NONE.b);
int r;
TOKULOGGER logger = txn->logger;
r = toku_pthread_mutex_lock(&logger->txn_list_lock); assert_zero(r);
{
{
//Remove txn from list (omt) of live transactions
OMTVALUE txnagain;
u_int32_t idx;
r = toku_omt_find_zero(logger->live_txns, find_xid, txn, &txnagain, &idx);
assert(r==0);
assert(txn==txnagain);
r = toku_omt_delete_at(logger->live_txns, idx);
assert(r==0);
}
if (txn->parent==NULL) {
OMTVALUE v;
u_int32_t idx;
//Remove txn from list of live root txns
r = toku_omt_find_zero(logger->live_root_txns, toku_find_xid_by_xid, (OMTVALUE)txn->txnid64, &v, &idx);
assert(r==0);
TXNID xid = (TXNID) v;
invariant(xid == txn->txnid64);
r = toku_omt_delete_at(logger->live_root_txns, idx);
assert(r==0);
}
//
// if this txn created a snapshot, make necessary modifications to list of snapshot txnids and live_list_reverse
// the order of operations is important. We first remove the txnid from the list of snapshot txnids. This is
// necessary because root snapshot transactions are in their own live lists. If we do not remove
// the txnid from the snapshot txnid list first, then when we go to make the modifications to
// live_list_reverse, we have trouble. We end up never removing (id, id) from live_list_reverse
//
if (txn->snapshot_type != TXN_SNAPSHOT_NONE && (txn->parent==NULL || txn->snapshot_type == TXN_SNAPSHOT_CHILD)) {
{
u_int32_t idx;
OMTVALUE v;
//Free memory used for snapshot_txnids
r = toku_omt_find_zero(logger->snapshot_txnids, toku_find_xid_by_xid, (OMTVALUE) txn->txnid64, &v, &idx);
invariant(r==0);
TXNID xid = (TXNID) v;
invariant(xid == txn->txnid64);
r = toku_omt_delete_at(logger->snapshot_txnids, idx);
invariant(r==0);
}
live_list_reverse_note_txn_end(txn);
{
//Free memory used for live root txns local list
invariant(toku_omt_size(txn->live_root_txn_list) > 0);
toku_omt_destroy(&txn->live_root_txn_list);
}
}
}
r = toku_pthread_mutex_unlock(&logger->txn_list_lock); assert_zero(r);
assert(logger->oldest_living_xid <= txn->txnid64);
if (txn->txnid64 == logger->oldest_living_xid) {
OMTVALUE oldest_txnv;
r = toku_omt_fetch(logger->live_txns, 0, &oldest_txnv);
if (r==0) {
TOKUTXN oldest_txn = oldest_txnv;
assert(oldest_txn != txn); // We just removed it
assert(oldest_txn->txnid64 > logger->oldest_living_xid); //Must be newer than the previous oldest
logger->oldest_living_xid = oldest_txn->txnid64;
logger->oldest_living_starttime = oldest_txn->starttime;
}
else {
//No living transactions
assert(r==EINVAL);
logger->oldest_living_xid = TXNID_NONE_LIVING;
logger->oldest_living_starttime = 0;
}
}
note_txn_closing(txn);
} }
void toku_txn_destroy_txn(TOKUTXN txn) { void toku_txn_destroy_txn(TOKUTXN txn) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment