Commit 71777355 authored by Marko Mäkelä's avatar Marko Mäkelä

Release early, release often

trx_t::rollback_low(): On a partial rollback of an insert into an
empty table, release the exclusive table lock, because the transaction
will no longer have had any impact on that table.

lock_table_x_unlock(): Release an exclusive table lock.
parent 53d2c751
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES(0);
BEGIN;
DELETE FROM t1;
INSERT INTO t2 VALUES(1),(1);
ERROR 23000: Duplicate entry '1' for key 'PRIMARY'
connect con1,localhost,root,,;
BEGIN;
SELECT * FROM t2 LOCK IN SHARE MODE;
a
connection default;
SET innodb_lock_wait_timeout=1;
INSERT INTO t2 VALUES(2);
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
disconnect con1;
INSERT INTO t2 VALUES(3);
COMMIT;
SELECT * FROM t1;
a
SELECT * FROM t2;
a
3
DROP TABLE t1, t2;
--source include/have_innodb.inc
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES(0);
BEGIN;
DELETE FROM t1;
--error ER_DUP_ENTRY
INSERT INTO t2 VALUES(1),(1);
connect (con1,localhost,root,,);
BEGIN;
SELECT * FROM t2 LOCK IN SHARE MODE;
connection default;
SET innodb_lock_wait_timeout=1;
--error ER_LOCK_WAIT_TIMEOUT
INSERT INTO t2 VALUES(2);
disconnect con1;
INSERT INTO t2 VALUES(3);
COMMIT;
SELECT * FROM t1;
SELECT * FROM t2;
DROP TABLE t1, t2;
......@@ -405,11 +405,16 @@ lock_table_ix_resurrect(
dict_table_t* table, /*!< in/out: table */
trx_t* trx); /*!< in/out: transaction */
/** Creates a table X lock object for a resurrected transaction.
@param table table to be X-locked
@param trx transaction */
void
lock_table_x_resurrect(dict_table_t *table, trx_t *trx);
/** Create a table X lock object for a resurrected TRX_UNDO_EMPTY transaction.
@param table table to be X-locked
@param trx transaction */
void lock_table_x_resurrect(dict_table_t *table, trx_t *trx);
/** Release a table X lock after rolling back an insert into an empty table
(which was covered by a TRX_UNDO_EMPTY record).
@param table table to be X-unlocked
@param trx transaction */
void lock_table_x_unlock(dict_table_t *table, trx_t *trx);
/** Sets a lock on a table based on the given mode.
@param[in] table table to lock
......
......@@ -568,6 +568,10 @@ class trx_mod_table_time_t
covered by a TRX_UNDO_EMPTY record (for the first statement to
insert into an empty table) */
static constexpr undo_no_t BULK= 1ULL << 63;
/** Flag in 'first' to indicate that some operations were
covered by a TRX_UNDO_EMPTY record (for the first statement to
insert into an empty table) */
static constexpr undo_no_t WAS_BULK= 1ULL << 63;
/** First modification of the table, possibly ORed with BULK */
undo_no_t first;
......@@ -583,7 +587,7 @@ class trx_mod_table_time_t
@param rows number of modified rows so far
@return whether the object is valid */
bool valid(undo_no_t rows= NONE) const
{ auto f= first & ~BULK; return f <= first_versioned && f <= rows; }
{ auto f= first & LIMIT; return f <= first_versioned && f <= rows; }
#endif /* UNIV_DEBUG */
/** @return if versioned columns were modified */
bool is_versioned() const { return first_versioned != NONE; }
......@@ -598,13 +602,15 @@ class trx_mod_table_time_t
}
/** Notify the start of a bulk insert operation */
void start_bulk_insert() { first|= BULK; }
void start_bulk_insert() { first|= BULK | WAS_BULK; }
/** Notify the end of a bulk insert operation */
void end_bulk_insert() { first&= ~BULK; }
/** @return whether an insert is covered by TRX_UNDO_EMPTY record */
bool is_bulk_insert() const { return first & BULK; }
/** @return whether an insert was covered by TRX_UNDO_EMPTY record */
bool was_bulk_insert() const { return first & WAS_BULK; }
/** Invoked after partial rollback
@param limit number of surviving modified rows (trx_t::undo_no)
......@@ -612,7 +618,7 @@ class trx_mod_table_time_t
bool rollback(undo_no_t limit)
{
ut_ad(valid());
if ((~BULK & first) >= limit)
if ((LIMIT & first) >= limit)
return true;
if (first_versioned < limit)
first_versioned= NONE;
......
......@@ -3523,8 +3523,10 @@ lock_table_ix_resurrect(
mutex->wr_unlock();
}
void
lock_table_x_resurrect(dict_table_t *table,trx_t *trx)
/** Create a table X lock object for a resurrected TRX_UNDO_EMPTY transaction.
@param table table to be X-locked
@param trx transaction */
void lock_table_x_resurrect(dict_table_t *table, trx_t *trx)
{
ut_ad(trx->is_recovered);
if (lock_table_has(trx, table, LOCK_X))
......@@ -3608,6 +3610,32 @@ lock_table_dequeue(
}
}
/** Release a table X lock after rolling back an insert into an empty table
(which was covered by a TRX_UNDO_EMPTY record).
@param table table to be X-unlocked
@param trx transaction */
void lock_table_x_unlock(dict_table_t *table, trx_t *trx)
{
ut_ad(!trx->is_recovered);
lock_sys.mutex_lock();
for (lock_t*& lock : trx->lock.table_locks)
{
if (lock && lock->trx == trx && lock->type_mode == (LOCK_TABLE | LOCK_X))
{
ut_ad(!lock_get_wait(lock));
lock_table_dequeue(lock);
lock= nullptr;
goto func_exit;
}
}
ut_ad("lock not found" == 0);
func_exit:
lock_sys.mutex_unlock();
}
/** Sets a lock on a table based on the given mode.
@param[in] table table to lock
@param[in,out] trx transaction
......
......@@ -137,7 +137,11 @@ inline void trx_t::rollback_low(trx_savept_t *savept)
trx_mod_tables_t::iterator j= i++;
ut_ad(j->second.valid());
if (j->second.rollback(limit))
{
if (j->second.was_bulk_insert() && !j->first->is_temporary())
lock_table_x_unlock(j->first, this);
mod_tables.erase(j);
}
}
lock.que_state= TRX_QUE_RUNNING;
MONITOR_INC(MONITOR_TRX_ROLLBACK_SAVEPOINT);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment