Commit 0de3be8c authored by Marko Mäkelä

MDEV-30671 InnoDB undo log truncation fails to wait for purge of history

It is not safe to invoke trx_purge_free_segment() or execute
innodb_undo_log_truncate=ON before all undo log records in
the rollback segment have been processed.

A prominent failure that would occur due to premature freeing of
undo log pages is that trx_undo_get_undo_rec() would crash when
trying to copy an undo log record to fetch the previous version
of a record.

If trx_undo_get_undo_rec() was not invoked in the unlucky time frame,
then the symptom would be that some committed transaction history is
never removed. This would be detected by CHECK TABLE...EXTENDED that
was implemented in commit ab019010.
Such a garbage collection leak should be possible even when using
innodb_undo_log_truncate=OFF, just involving trx_purge_free_segment().

trx_rseg_t::needs_purge: Change the type from Boolean to a transaction
identifier, noting the most recent non-purged transaction, or 0 if
everything has been purged. On transaction start, we initialize this
to 1 more than the transaction start ID. On recovery, the field may be
adjusted to the transaction end ID (TRX_UNDO_TRX_NO) if it is larger.

The field TRX_UNDO_NEEDS_PURGE becomes write-only; it is only read by
some debug assertions that validate the value. The field reflects the old
inaccurate Boolean field trx_rseg_t::needs_purge.

trx_undo_mem_create_at_db_start(), trx_undo_lists_init(),
trx_rseg_mem_restore(): Remove the parameter max_trx_id.
Instead, store the maximum in trx_rseg_t::needs_purge,
where trx_rseg_array_init() will find it.

trx_purge_free_segment(): Continuously hold a lock on
trx_rseg_t to prevent any concurrent allocation of undo log.

trx_purge_truncate_rseg_history(): Only invoke trx_purge_free_segment()
if the rollback segment is empty and there are no pending transactions
associated with it.

trx_purge_truncate_history(): Only proceed with innodb_undo_log_truncate=ON
if trx_rseg_t::needs_purge indicates that all history has been purged.

Tested by: Matthias Leich
parent d3f35aa4
SET @save_frequency=@@GLOBAL.innodb_purge_rseg_truncate_frequency;
SET @save_dbug=@@GLOBAL.debug_dbug;
SET GLOBAL innodb_purge_rseg_truncate_frequency=1;
CREATE TABLE t1(f1 INT NOT NULL, f2 int not null, CREATE TABLE t1(f1 INT NOT NULL, f2 int not null,
f3 int generated always as (f2 * 2) VIRTUAL, f3 int generated always as (f2 * 2) VIRTUAL,
primary key(f1), INDEX (f3))ENGINE=InnoDB; primary key(f1), INDEX (f3))ENGINE=InnoDB;
connect con1,localhost,root,,,; connect con1,localhost,root,,,;
InnoDB 0 transactions not purged
START TRANSACTION WITH CONSISTENT SNAPSHOT; START TRANSACTION WITH CONSISTENT SNAPSHOT;
connection default; connection default;
INSERT INTO t1(f1, f2) VALUES(1,2); INSERT INTO t1(f1, f2) VALUES(1,2);
...@@ -18,5 +22,6 @@ commit; ...@@ -18,5 +22,6 @@ commit;
disconnect con1; disconnect con1;
disconnect con2; disconnect con2;
connection default; connection default;
set global debug_dbug=default; SET GLOBAL innodb_purge_rseg_truncate_frequency=@save_frequency;
SET GLOBAL debug_dbug=@save_dbug;
DROP TABLE t1; DROP TABLE t1;
--source include/have_innodb.inc --source include/have_innodb.inc
--source include/have_debug.inc --source include/have_debug.inc
SET @save_frequency=@@GLOBAL.innodb_purge_rseg_truncate_frequency;
SET @save_dbug=@@GLOBAL.debug_dbug;
SET GLOBAL innodb_purge_rseg_truncate_frequency=1;
CREATE TABLE t1(f1 INT NOT NULL, f2 int not null, CREATE TABLE t1(f1 INT NOT NULL, f2 int not null,
f3 int generated always as (f2 * 2) VIRTUAL, f3 int generated always as (f2 * 2) VIRTUAL,
primary key(f1), INDEX (f3))ENGINE=InnoDB; primary key(f1), INDEX (f3))ENGINE=InnoDB;
connect(con1,localhost,root,,,); connect(con1,localhost,root,,,);
--source ../innodb/include/wait_all_purged.inc
START TRANSACTION WITH CONSISTENT SNAPSHOT; START TRANSACTION WITH CONSISTENT SNAPSHOT;
connection default; connection default;
...@@ -26,5 +31,6 @@ commit; ...@@ -26,5 +31,6 @@ commit;
disconnect con1; disconnect con1;
disconnect con2; disconnect con2;
connection default; connection default;
set global debug_dbug=default; SET GLOBAL innodb_purge_rseg_truncate_frequency=@save_frequency;
SET GLOBAL debug_dbug=@save_dbug;
DROP TABLE t1; DROP TABLE t1;
SET @save_freq=@@GLOBAL.innodb_purge_rseg_truncate_frequency;
SET GLOBAL innodb_purge_rseg_truncate_frequency=1;
CREATE TABLE t (a int PRIMARY KEY, b int NOT NULL UNIQUE) engine = InnoDB; CREATE TABLE t (a int PRIMARY KEY, b int NOT NULL UNIQUE) engine = InnoDB;
InnoDB 0 transactions not purged
connect prevent_purge,localhost,root,,; connect prevent_purge,localhost,root,,;
start transaction with consistent snapshot; start transaction with consistent snapshot;
connect con_del_1,localhost,root,,; connect con_del_1,localhost,root,,;
...@@ -34,3 +37,4 @@ disconnect con_del_2; ...@@ -34,3 +37,4 @@ disconnect con_del_2;
connection default; connection default;
SET DEBUG_SYNC = 'RESET'; SET DEBUG_SYNC = 'RESET';
DROP TABLE t; DROP TABLE t;
SET GLOBAL innodb_purge_rseg_truncate_frequency=@save_freq;
...@@ -7,6 +7,7 @@ SET GLOBAL innodb_purge_rseg_truncate_frequency = 1; ...@@ -7,6 +7,7 @@ SET GLOBAL innodb_purge_rseg_truncate_frequency = 1;
SET GLOBAL innodb_purge_rseg_truncate_frequency = 1; SET GLOBAL innodb_purge_rseg_truncate_frequency = 1;
CREATE TABLE t1(a INT PRIMARY KEY, b INT NOT NULL) CREATE TABLE t1(a INT PRIMARY KEY, b INT NOT NULL)
ROW_FORMAT=REDUNDANT ENGINE=InnoDB; ROW_FORMAT=REDUNDANT ENGINE=InnoDB;
InnoDB 0 transactions not purged
connect prevent_purge,localhost,root; connect prevent_purge,localhost,root;
START TRANSACTION WITH CONSISTENT SNAPSHOT; START TRANSACTION WITH CONSISTENT SNAPSHOT;
connection default; connection default;
...@@ -19,7 +20,11 @@ UPDATE t1 SET b=4 WHERE a=3; ...@@ -19,7 +20,11 @@ UPDATE t1 SET b=4 WHERE a=3;
disconnect prevent_purge; disconnect prevent_purge;
connection default; connection default;
InnoDB 0 transactions not purged InnoDB 0 transactions not purged
connection con1;
ROLLBACK;
disconnect con1; disconnect con1;
connection default;
InnoDB 0 transactions not purged
FLUSH TABLE t1 FOR EXPORT; FLUSH TABLE t1 FOR EXPORT;
Clustered index root page contents: Clustered index root page contents:
N_RECS=3; LEVEL=0 N_RECS=3; LEVEL=0
......
...@@ -3,6 +3,7 @@ SET GLOBAL innodb_purge_rseg_truncate_frequency=1; ...@@ -3,6 +3,7 @@ SET GLOBAL innodb_purge_rseg_truncate_frequency=1;
CREATE TABLE t1(id INT PRIMARY key, val VARCHAR(16000)) ENGINE=InnoDB; CREATE TABLE t1(id INT PRIMARY key, val VARCHAR(16000)) ENGINE=InnoDB;
INSERT INTO t1 (id,val) SELECT 2*seq,'x' FROM seq_0_to_1023; INSERT INTO t1 (id,val) SELECT 2*seq,'x' FROM seq_0_to_1023;
connect con1,localhost,root,,; connect con1,localhost,root,,;
InnoDB 0 transactions not purged
START TRANSACTION WITH CONSISTENT SNAPSHOT; START TRANSACTION WITH CONSISTENT SNAPSHOT;
connection default; connection default;
DELETE FROM t1 WHERE id=1788; DELETE FROM t1 WHERE id=1788;
......
...@@ -9,12 +9,10 @@ SET GLOBAL innodb_purge_rseg_truncate_frequency= 1; ...@@ -9,12 +9,10 @@ SET GLOBAL innodb_purge_rseg_truncate_frequency= 1;
CREATE PROCEDURE insert_n(start int, end int) CREATE PROCEDURE insert_n(start int, end int)
BEGIN BEGIN
DECLARE i INT DEFAULT start; DECLARE i INT DEFAULT start;
START TRANSACTION;
WHILE i <= end do WHILE i <= end do
INSERT INTO t1 VALUES (1, 2, 3) ON DUPLICATE KEY UPDATE c = i; INSERT INTO t1 VALUES (1, 2, 3) ON DUPLICATE KEY UPDATE c = i;
SET i = i + 1; SET i = i + 1;
END WHILE; END WHILE;
COMMIT;
END~~ END~~
CREATE FUNCTION num_pages_get() CREATE FUNCTION num_pages_get()
RETURNS INT RETURNS INT
...@@ -29,7 +27,8 @@ END~~ ...@@ -29,7 +27,8 @@ END~~
# Create a table with one record in it and start an RR transaction # Create a table with one record in it and start an RR transaction
# #
CREATE TABLE t1 (a INT, b INT, c INT, PRIMARY KEY(a,b), KEY (b,c)) CREATE TABLE t1 (a INT, b INT, c INT, PRIMARY KEY(a,b), KEY (b,c))
ENGINE=InnoDB; ENGINE=InnoDB STATS_PERSISTENT=0;
InnoDB 0 transactions not purged
BEGIN; BEGIN;
SELECT * FROM t1; SELECT * FROM t1;
a b c a b c
...@@ -38,20 +37,24 @@ a b c ...@@ -38,20 +37,24 @@ a b c
# #
connect con2, localhost, root,,; connect con2, localhost, root,,;
connection con2; connection con2;
BEGIN;
INSERT INTO t1 VALUES (1, 2, 3) ON DUPLICATE KEY UPDATE c = NULL; INSERT INTO t1 VALUES (1, 2, 3) ON DUPLICATE KEY UPDATE c = NULL;
CALL insert_n(1, 50);; CALL insert_n(1, 50);;
connect con3, localhost, root,,; connect con3, localhost, root,,;
connection con3; connection con3;
BEGIN;
CALL insert_n(51, 100);; CALL insert_n(51, 100);;
connection con2; connection con2;
COMMIT;
connection con3; connection con3;
INSERT INTO t1 VALUES (1, 2, 1) ON DUPLICATE KEY UPDATE c = NULL; INSERT INTO t1 VALUES (1, 2, 1) ON DUPLICATE KEY UPDATE c = NULL;
COMMIT;
connection default; connection default;
# #
# Connect to default and record how many pages were accessed # Connect to default and record how many pages were accessed
# when selecting the record using the secondary key. # when selecting the record using the secondary key.
# #
InnoDB 4 transactions not purged InnoDB 2 transactions not purged
SET @num_pages_1 = num_pages_get(); SET @num_pages_1 = num_pages_get();
SELECT * FROM t1 force index (b); SELECT * FROM t1 force index (b);
a b c a b c
......
...@@ -3,8 +3,11 @@ ...@@ -3,8 +3,11 @@
source include/have_debug.inc; source include/have_debug.inc;
source include/have_debug_sync.inc; source include/have_debug_sync.inc;
SET @save_freq=@@GLOBAL.innodb_purge_rseg_truncate_frequency;
SET GLOBAL innodb_purge_rseg_truncate_frequency=1;
CREATE TABLE t (a int PRIMARY KEY, b int NOT NULL UNIQUE) engine = InnoDB; CREATE TABLE t (a int PRIMARY KEY, b int NOT NULL UNIQUE) engine = InnoDB;
--source include/wait_all_purged.inc
--connect(prevent_purge,localhost,root,,) --connect(prevent_purge,localhost,root,,)
start transaction with consistent snapshot; start transaction with consistent snapshot;
...@@ -80,4 +83,5 @@ INSERT INTO t VALUES(30, 20); ...@@ -80,4 +83,5 @@ INSERT INTO t VALUES(30, 20);
SET DEBUG_SYNC = 'RESET'; SET DEBUG_SYNC = 'RESET';
DROP TABLE t; DROP TABLE t;
SET GLOBAL innodb_purge_rseg_truncate_frequency=@save_freq;
--source include/wait_until_count_sessions.inc --source include/wait_until_count_sessions.inc
...@@ -14,6 +14,7 @@ SET GLOBAL innodb_purge_rseg_truncate_frequency = 1; ...@@ -14,6 +14,7 @@ SET GLOBAL innodb_purge_rseg_truncate_frequency = 1;
CREATE TABLE t1(a INT PRIMARY KEY, b INT NOT NULL) CREATE TABLE t1(a INT PRIMARY KEY, b INT NOT NULL)
ROW_FORMAT=REDUNDANT ENGINE=InnoDB; ROW_FORMAT=REDUNDANT ENGINE=InnoDB;
--source include/wait_all_purged.inc
--connect (prevent_purge,localhost,root) --connect (prevent_purge,localhost,root)
START TRANSACTION WITH CONSISTENT SNAPSHOT; START TRANSACTION WITH CONSISTENT SNAPSHOT;
...@@ -33,7 +34,12 @@ UPDATE t1 SET b=4 WHERE a=3; ...@@ -33,7 +34,12 @@ UPDATE t1 SET b=4 WHERE a=3;
# Initiate a full purge, which should reset the DB_TRX_ID except for a=3. # Initiate a full purge, which should reset the DB_TRX_ID except for a=3.
--source include/wait_all_purged.inc --source include/wait_all_purged.inc
# Initiate a ROLLBACK of the update, which should reset the DB_TRX_ID for a=3. # Initiate a ROLLBACK of the update, which should reset the DB_TRX_ID for a=3.
--connection con1
ROLLBACK;
--disconnect con1 --disconnect con1
--connection default
# Reset the DB_TRX_ID for the hidden ADD COLUMN metadata record.
--source include/wait_all_purged.inc
FLUSH TABLE t1 FOR EXPORT; FLUSH TABLE t1 FOR EXPORT;
# The following is based on innodb.table_flags: # The following is based on innodb.table_flags:
......
...@@ -9,6 +9,7 @@ CREATE TABLE t1(id INT PRIMARY key, val VARCHAR(16000)) ENGINE=InnoDB; ...@@ -9,6 +9,7 @@ CREATE TABLE t1(id INT PRIMARY key, val VARCHAR(16000)) ENGINE=InnoDB;
INSERT INTO t1 (id,val) SELECT 2*seq,'x' FROM seq_0_to_1023; INSERT INTO t1 (id,val) SELECT 2*seq,'x' FROM seq_0_to_1023;
connect(con1,localhost,root,,); connect(con1,localhost,root,,);
source include/wait_all_purged.inc;
# Prevent purge. # Prevent purge.
START TRANSACTION WITH CONSISTENT SNAPSHOT; START TRANSACTION WITH CONSISTENT SNAPSHOT;
connection default; connection default;
......
...@@ -13,12 +13,10 @@ DELIMITER ~~; ...@@ -13,12 +13,10 @@ DELIMITER ~~;
CREATE PROCEDURE insert_n(start int, end int) CREATE PROCEDURE insert_n(start int, end int)
BEGIN BEGIN
DECLARE i INT DEFAULT start; DECLARE i INT DEFAULT start;
START TRANSACTION;
WHILE i <= end do WHILE i <= end do
INSERT INTO t1 VALUES (1, 2, 3) ON DUPLICATE KEY UPDATE c = i; INSERT INTO t1 VALUES (1, 2, 3) ON DUPLICATE KEY UPDATE c = i;
SET i = i + 1; SET i = i + 1;
END WHILE; END WHILE;
COMMIT;
END~~ END~~
CREATE FUNCTION num_pages_get() CREATE FUNCTION num_pages_get()
...@@ -36,7 +34,8 @@ DELIMITER ;~~ ...@@ -36,7 +34,8 @@ DELIMITER ;~~
--echo # Create a table with one record in it and start an RR transaction --echo # Create a table with one record in it and start an RR transaction
--echo # --echo #
CREATE TABLE t1 (a INT, b INT, c INT, PRIMARY KEY(a,b), KEY (b,c)) CREATE TABLE t1 (a INT, b INT, c INT, PRIMARY KEY(a,b), KEY (b,c))
ENGINE=InnoDB; ENGINE=InnoDB STATS_PERSISTENT=0;
--source include/wait_all_purged.inc
BEGIN; BEGIN;
SELECT * FROM t1; SELECT * FROM t1;
...@@ -45,18 +44,22 @@ SELECT * FROM t1; ...@@ -45,18 +44,22 @@ SELECT * FROM t1;
--echo # --echo #
connect (con2, localhost, root,,); connect (con2, localhost, root,,);
connection con2; connection con2;
BEGIN;
INSERT INTO t1 VALUES (1, 2, 3) ON DUPLICATE KEY UPDATE c = NULL; INSERT INTO t1 VALUES (1, 2, 3) ON DUPLICATE KEY UPDATE c = NULL;
--send CALL insert_n(1, 50); --send CALL insert_n(1, 50);
connect (con3, localhost, root,,); connect (con3, localhost, root,,);
connection con3; connection con3;
BEGIN;
--send CALL insert_n(51, 100); --send CALL insert_n(51, 100);
connection con2; connection con2;
reap; reap;
COMMIT;
connection con3; connection con3;
reap; reap;
INSERT INTO t1 VALUES (1, 2, 1) ON DUPLICATE KEY UPDATE c = NULL; INSERT INTO t1 VALUES (1, 2, 1) ON DUPLICATE KEY UPDATE c = NULL;
COMMIT;
connection default; connection default;
...@@ -64,7 +67,7 @@ connection default; ...@@ -64,7 +67,7 @@ connection default;
--echo # Connect to default and record how many pages were accessed --echo # Connect to default and record how many pages were accessed
--echo # when selecting the record using the secondary key. --echo # when selecting the record using the secondary key.
--echo # --echo #
--let $wait_all_purged=4 --let $wait_all_purged=2
--source include/wait_all_purged.inc --source include/wait_all_purged.inc
SET @num_pages_1 = num_pages_get(); SET @num_pages_1 = num_pages_get();
SELECT * FROM t1 force index (b); SELECT * FROM t1 force index (b);
......
...@@ -128,14 +128,18 @@ struct trx_rseg_t { ...@@ -128,14 +128,18 @@ struct trx_rseg_t {
/** trx_t::no | last_offset << 48 */ /** trx_t::no | last_offset << 48 */
uint64_t last_commit_and_offset; uint64_t last_commit_and_offset;
/** Whether the log segment needs purge */ /** Last known transaction that has not been purged yet,
bool needs_purge; or 0 if everything has been purged. */
trx_id_t needs_purge;
/** Reference counter to track rseg allocated transactions. */
/** Number of active (non-committed) transactions associated with
an is_persistent() rollback segment. Needed for protecting
trx->rsegs.m_redo.rseg assignments
before trx->rsegs.m_redo.undo has been assigned. */
ulint trx_ref_count; ulint trx_ref_count;
/** If true, then skip allocating this rseg as it reside in /** whether undo log truncation was initiated, and transactions
UNDO-tablespace marked for truncate. */ cannot be allocated in this is_persistent() rollback segment */
bool skip_allocation; bool skip_allocation;
/** @return the commit ID of the last committed transaction */ /** @return the commit ID of the last committed transaction */
......
...@@ -258,12 +258,10 @@ trx_undo_free_at_shutdown(trx_t *trx); ...@@ -258,12 +258,10 @@ trx_undo_free_at_shutdown(trx_t *trx);
@param[in,out] rseg rollback segment @param[in,out] rseg rollback segment
@param[in] id rollback segment slot @param[in] id rollback segment slot
@param[in] page_no undo log segment page number @param[in] page_no undo log segment page number
@param[in,out] max_trx_id the largest observed transaction ID
@return the undo log @return the undo log
@retval nullptr on error */ @retval nullptr on error */
trx_undo_t * trx_undo_t *
trx_undo_mem_create_at_db_start(trx_rseg_t *rseg, ulint id, uint32_t page_no, trx_undo_mem_create_at_db_start(trx_rseg_t *rseg, ulint id, uint32_t page_no);
trx_id_t &max_trx_id);
#endif /* !UNIV_INNOCHECKSUM */ #endif /* !UNIV_INNOCHECKSUM */
...@@ -407,6 +405,8 @@ or 0 if the transaction has not been committed */ ...@@ -407,6 +405,8 @@ or 0 if the transaction has not been committed */
/** Before MariaDB 10.3.1, when purge did not reset DB_TRX_ID of /** Before MariaDB 10.3.1, when purge did not reset DB_TRX_ID of
surviving user records, this used to be called TRX_UNDO_DEL_MARKS. surviving user records, this used to be called TRX_UNDO_DEL_MARKS.
This field is redundant; it is only being read by some debug assertions.
The value 1 indicates that purge needs to process the undo log segment. The value 1 indicates that purge needs to process the undo log segment.
The value 0 indicates that all of it has been processed, and The value 0 indicates that all of it has been processed, and
trx_purge_free_segment() has been invoked, so the log is not safe to access. trx_purge_free_segment() has been invoked, so the log is not safe to access.
......
...@@ -221,6 +221,7 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr) ...@@ -221,6 +221,7 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr)
trx_ulogf_t* undo_header = undo_page->frame + undo->hdr_offset; trx_ulogf_t* undo_header = undo_page->frame + undo->hdr_offset;
ut_ad(mach_read_from_2(undo_header + TRX_UNDO_NEEDS_PURGE) <= 1); ut_ad(mach_read_from_2(undo_header + TRX_UNDO_NEEDS_PURGE) <= 1);
ut_ad(rseg->needs_purge > trx->id);
if (UNIV_UNLIKELY(mach_read_from_4(TRX_RSEG + TRX_RSEG_FORMAT if (UNIV_UNLIKELY(mach_read_from_4(TRX_RSEG + TRX_RSEG_FORMAT
+ rseg_header->frame))) { + rseg_header->frame))) {
...@@ -309,7 +310,6 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr) ...@@ -309,7 +310,6 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr)
rseg->last_page_no = undo->hdr_page_no; rseg->last_page_no = undo->hdr_page_no;
rseg->set_last_commit(undo->hdr_offset, rseg->set_last_commit(undo->hdr_offset,
trx->rw_trx_hash_element->no); trx->rw_trx_hash_element->no);
rseg->needs_purge = true;
} }
trx_sys.rseg_history_len++; trx_sys.rseg_history_len++;
...@@ -339,16 +339,15 @@ static void trx_purge_remove_log_hdr(buf_block_t *rseg, buf_block_t* log, ...@@ -339,16 +339,15 @@ static void trx_purge_remove_log_hdr(buf_block_t *rseg, buf_block_t* log,
} }
/** Free an undo log segment, and remove the header from the history list. /** Free an undo log segment, and remove the header from the history list.
@param[in,out] mtr mini-transaction
@param[in,out] rseg rollback segment @param[in,out] rseg rollback segment
@param[in] hdr_addr file address of log_hdr */ @param[in] hdr_addr file address of log_hdr */
static static
void void trx_purge_free_segment(mtr_t &mtr, trx_rseg_t* rseg, fil_addr_t hdr_addr)
trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr)
{ {
mtr_t mtr; mtr.commit();
mtr.start(); mtr.start();
mutex_enter(&rseg->mutex); ut_ad(mutex_own(&rseg->mutex));
buf_block_t* rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr); buf_block_t* rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr);
buf_block_t* block = trx_undo_page_get( buf_block_t* block = trx_undo_page_get(
...@@ -365,13 +364,9 @@ trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr) ...@@ -365,13 +364,9 @@ trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr)
while (!fseg_free_step_not_header( while (!fseg_free_step_not_header(
TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER
+ block->frame, &mtr)) { + block->frame, &mtr)) {
mutex_exit(&rseg->mutex);
mtr.commit(); mtr.commit();
mtr.start(); mtr.start();
mutex_enter(&rseg->mutex);
rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr); rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr);
block = trx_undo_page_get( block = trx_undo_page_get(
...@@ -410,10 +405,6 @@ trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr) ...@@ -410,10 +405,6 @@ trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr)
ut_ad(rseg->curr_size >= seg_size); ut_ad(rseg->curr_size >= seg_size);
rseg->curr_size -= seg_size; rseg->curr_size -= seg_size;
mutex_exit(&(rseg->mutex));
mtr_commit(&mtr);
} }
/** Remove unnecessary history data from a rollback segment. /** Remove unnecessary history data from a rollback segment.
...@@ -431,8 +422,6 @@ trx_purge_truncate_rseg_history( ...@@ -431,8 +422,6 @@ trx_purge_truncate_rseg_history(
trx_id_t undo_trx_no; trx_id_t undo_trx_no;
mtr.start(); mtr.start();
ut_ad(rseg.is_persistent());
mutex_enter(&rseg.mutex);
buf_block_t* rseg_hdr = trx_rsegf_get(rseg.space, rseg.page_no, &mtr); buf_block_t* rseg_hdr = trx_rsegf_get(rseg.space, rseg.page_no, &mtr);
...@@ -444,7 +433,6 @@ trx_purge_truncate_rseg_history( ...@@ -444,7 +433,6 @@ trx_purge_truncate_rseg_history(
loop: loop:
if (hdr_addr.page == FIL_NULL) { if (hdr_addr.page == FIL_NULL) {
func_exit: func_exit:
mutex_exit(&rseg.mutex);
mtr.commit(); mtr.commit();
return; return;
} }
...@@ -470,30 +458,26 @@ trx_purge_truncate_rseg_history( ...@@ -470,30 +458,26 @@ trx_purge_truncate_rseg_history(
prev_hdr_addr.boffset = static_cast<uint16_t>(prev_hdr_addr.boffset prev_hdr_addr.boffset = static_cast<uint16_t>(prev_hdr_addr.boffset
- TRX_UNDO_HISTORY_NODE); - TRX_UNDO_HISTORY_NODE);
if (mach_read_from_2(TRX_UNDO_SEG_HDR + TRX_UNDO_STATE + block->frame) if (!rseg.trx_ref_count
&& rseg.needs_purge <= (purge_sys.head.trx_no
? purge_sys.head.trx_no
: purge_sys.tail.trx_no)
&& mach_read_from_2(TRX_UNDO_SEG_HDR + TRX_UNDO_STATE
+ block->frame)
== TRX_UNDO_TO_PURGE == TRX_UNDO_TO_PURGE
&& !mach_read_from_2(block->frame + hdr_addr.boffset && !mach_read_from_2(block->frame + hdr_addr.boffset
+ TRX_UNDO_NEXT_LOG)) { + TRX_UNDO_NEXT_LOG)) {
/* We can free the whole log segment.
/* We can free the whole log segment */ This will call trx_purge_remove_log_hdr(). */
trx_purge_free_segment(mtr, &rseg, hdr_addr);
mutex_exit(&rseg.mutex);
mtr.commit();
/* calls the trx_purge_remove_log_hdr()
inside trx_purge_free_segment(). */
trx_purge_free_segment(&rseg, hdr_addr);
} else { } else {
/* Remove the log hdr from the rseg history. */ /* Remove the log hdr from the rseg history. */
trx_purge_remove_log_hdr(rseg_hdr, block, hdr_addr.boffset, trx_purge_remove_log_hdr(rseg_hdr, block, hdr_addr.boffset,
&mtr); &mtr);
mutex_exit(&rseg.mutex);
mtr.commit();
} }
mtr.commit();
mtr.start(); mtr.start();
mutex_enter(&rseg.mutex);
rseg_hdr = trx_rsegf_get(rseg.space, rseg.page_no, &mtr); rseg_hdr = trx_rsegf_get(rseg.space, rseg.page_no, &mtr);
...@@ -568,7 +552,10 @@ static void trx_purge_truncate_history() ...@@ -568,7 +552,10 @@ static void trx_purge_truncate_history()
if (trx_rseg_t *rseg= trx_sys.rseg_array[i]) if (trx_rseg_t *rseg= trx_sys.rseg_array[i])
{ {
ut_ad(rseg->id == i); ut_ad(rseg->id == i);
ut_ad(rseg->is_persistent());
mutex_enter(&rseg->mutex);
trx_purge_truncate_rseg_history(*rseg, head); trx_purge_truncate_rseg_history(*rseg, head);
mutex_exit(&rseg->mutex);
} }
} }
...@@ -611,34 +598,31 @@ static void trx_purge_truncate_history() ...@@ -611,34 +598,31 @@ static void trx_purge_truncate_history()
DBUG_LOG("undo", "marking for truncate: " << file->name); DBUG_LOG("undo", "marking for truncate: " << file->name);
for (ulint i= 0; i < TRX_SYS_N_RSEGS; ++i)
if (trx_rseg_t *rseg= trx_sys.rseg_array[i])
if (rseg->space == &space)
/* Once set, this rseg will not be allocated to subsequent
transactions, but we will wait for existing active
transactions to finish. */
rseg->skip_allocation= true;
for (ulint i= 0; i < TRX_SYS_N_RSEGS; ++i) for (ulint i= 0; i < TRX_SYS_N_RSEGS; ++i)
{ {
trx_rseg_t *rseg= trx_sys.rseg_array[i]; trx_rseg_t *rseg= trx_sys.rseg_array[i];
if (!rseg || rseg->space != &space) if (!rseg || rseg->space != &space)
continue; continue;
mutex_enter(&rseg->mutex);
ut_ad(rseg->skip_allocation);
ut_ad(rseg->is_persistent()); ut_ad(rseg->is_persistent());
if (rseg->trx_ref_count)
mutex_enter(&rseg->mutex);
/* Once set, this rseg will not be allocated to subsequent
transactions, but we will wait for existing active
transactions to finish and to be purged. */
rseg->skip_allocation = true;
if (rseg->trx_ref_count || rseg->needs_purge > head.trx_no)
{ {
not_free: not_free:
mutex_exit(&rseg->mutex); mutex_exit(&rseg->mutex);
return; return;
} }
if (rseg->curr_size != 1) ut_ad(UT_LIST_GET_LEN(rseg->undo_list) == 0);
{
/* Check if all segments are cached and safe to remove. */ /* Check if all segments are cached and safe to remove. */
ulint cached= 0; ulint cached= 0;
for (trx_undo_t *undo= UT_LIST_GET_FIRST(rseg->undo_cached); undo;
for (const trx_undo_t *undo= UT_LIST_GET_FIRST(rseg->undo_cached); undo;
undo= UT_LIST_GET_NEXT(undo_list, undo)) undo= UT_LIST_GET_NEXT(undo_list, undo))
{ {
if (head.trx_no < undo->trx_id) if (head.trx_no < undo->trx_id)
...@@ -651,7 +635,6 @@ static void trx_purge_truncate_history() ...@@ -651,7 +635,6 @@ static void trx_purge_truncate_history()
if (rseg->curr_size > cached + 1) if (rseg->curr_size > cached + 1)
goto not_free; goto not_free;
}
mutex_exit(&rseg->mutex); mutex_exit(&rseg->mutex);
} }
...@@ -753,6 +736,8 @@ static void trx_purge_truncate_history() ...@@ -753,6 +736,8 @@ static void trx_purge_truncate_history()
ut_ad(rseg->id == i); ut_ad(rseg->id == i);
ut_ad(rseg->is_persistent()); ut_ad(rseg->is_persistent());
ut_ad(!rseg->trx_ref_count);
ut_ad(rseg->needs_purge <= head.trx_no);
ut_d(const auto old_page= rseg->page_no); ut_d(const auto old_page= rseg->page_no);
buf_block_t *rblock= trx_rseg_header_create(&space, i, buf_block_t *rblock= trx_rseg_header_create(&space, i,
...@@ -775,9 +760,6 @@ static void trx_purge_truncate_history() ...@@ -775,9 +760,6 @@ static void trx_purge_truncate_history()
ut_free(undo); ut_free(undo);
} }
UT_LIST_INIT(rseg->undo_list, &trx_undo_t::undo_list);
UT_LIST_INIT(rseg->undo_cached, &trx_undo_t::undo_list);
/* These were written by trx_rseg_header_create(). */ /* These were written by trx_rseg_header_create(). */
ut_ad(!mach_read_from_4(TRX_RSEG + TRX_RSEG_FORMAT + rblock->frame)); ut_ad(!mach_read_from_4(TRX_RSEG + TRX_RSEG_FORMAT + rblock->frame));
ut_ad(!mach_read_from_4(TRX_RSEG + TRX_RSEG_HISTORY_SIZE + ut_ad(!mach_read_from_4(TRX_RSEG + TRX_RSEG_HISTORY_SIZE +
...@@ -786,9 +768,10 @@ static void trx_purge_truncate_history() ...@@ -786,9 +768,10 @@ static void trx_purge_truncate_history()
the rseg header */ the rseg header */
rseg->curr_size= 1; rseg->curr_size= 1;
rseg->trx_ref_count= 0; rseg->trx_ref_count= 0;
rseg->needs_purge= 0;
rseg->skip_allocation= false;
rseg->last_page_no= FIL_NULL; rseg->last_page_no= FIL_NULL;
rseg->last_commit_and_offset= 0; rseg->last_commit_and_offset= 0;
rseg->needs_purge= false;
} }
mtr.commit_shrink(space); mtr.commit_shrink(space);
...@@ -812,17 +795,6 @@ static void trx_purge_truncate_history() ...@@ -812,17 +795,6 @@ static void trx_purge_truncate_history()
log_buffer_flush_to_disk(); log_buffer_flush_to_disk();
DBUG_SUICIDE();); DBUG_SUICIDE(););
for (ulint i= 0; i < TRX_SYS_N_RSEGS; ++i)
{
if (trx_rseg_t *rseg= trx_sys.rseg_array[i])
{
ut_ad(rseg->id == i);
ut_ad(rseg->is_persistent());
if (rseg->space == &space)
rseg->skip_allocation= false;
}
}
ib::info() << "Truncated " << file->name; ib::info() << "Truncated " << file->name;
purge_sys.truncate.last= purge_sys.truncate.current; purge_sys.truncate.last= purge_sys.truncate.current;
ut_ad(&space == purge_sys.truncate.current); ut_ad(&space == purge_sys.truncate.current);
...@@ -891,7 +863,6 @@ static void trx_purge_rseg_get_next_history_log( ...@@ -891,7 +863,6 @@ static void trx_purge_rseg_get_next_history_log(
trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO); trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
ut_ad(mach_read_from_2(log_hdr + TRX_UNDO_NEEDS_PURGE) <= 1); ut_ad(mach_read_from_2(log_hdr + TRX_UNDO_NEEDS_PURGE) <= 1);
const byte needs_purge = log_hdr[TRX_UNDO_NEEDS_PURGE + 1];
mtr.commit(); mtr.commit();
...@@ -899,7 +870,6 @@ static void trx_purge_rseg_get_next_history_log( ...@@ -899,7 +870,6 @@ static void trx_purge_rseg_get_next_history_log(
purge_sys.rseg->last_page_no = prev_log_addr.page; purge_sys.rseg->last_page_no = prev_log_addr.page;
purge_sys.rseg->set_last_commit(prev_log_addr.boffset, trx_no); purge_sys.rseg->set_last_commit(prev_log_addr.boffset, trx_no);
purge_sys.rseg->needs_purge = needs_purge != 0;
/* Purge can also produce events, however these are already ordered /* Purge can also produce events, however these are already ordered
in the rollback segment and any user generated event will be greater in the rollback segment and any user generated event will be greater
......
...@@ -421,10 +421,9 @@ trx_rseg_mem_create(ulint id, fil_space_t* space, uint32_t page_no) ...@@ -421,10 +421,9 @@ trx_rseg_mem_create(ulint id, fil_space_t* space, uint32_t page_no)
/** Read the undo log lists. /** Read the undo log lists.
@param[in,out] rseg rollback segment @param[in,out] rseg rollback segment
@param[in,out] max_trx_id maximum observed transaction identifier
@param[in] rseg_header rollback segment header @param[in] rseg_header rollback segment header
@return error code */ @return error code */
static dberr_t trx_undo_lists_init(trx_rseg_t *rseg, trx_id_t &max_trx_id, static dberr_t trx_undo_lists_init(trx_rseg_t *rseg,
const buf_block_t *rseg_header) const buf_block_t *rseg_header)
{ {
ut_ad(srv_force_recovery < SRV_FORCE_NO_UNDO_LOG_SCAN); ut_ad(srv_force_recovery < SRV_FORCE_NO_UNDO_LOG_SCAN);
...@@ -434,8 +433,8 @@ static dberr_t trx_undo_lists_init(trx_rseg_t *rseg, trx_id_t &max_trx_id, ...@@ -434,8 +433,8 @@ static dberr_t trx_undo_lists_init(trx_rseg_t *rseg, trx_id_t &max_trx_id,
uint32_t page_no= trx_rsegf_get_nth_undo(rseg_header, i); uint32_t page_no= trx_rsegf_get_nth_undo(rseg_header, i);
if (page_no != FIL_NULL) if (page_no != FIL_NULL)
{ {
const trx_undo_t *undo= trx_undo_mem_create_at_db_start(rseg, i, page_no, const trx_undo_t *undo=
max_trx_id); trx_undo_mem_create_at_db_start(rseg, i, page_no);
if (!undo) if (!undo)
return DB_CORRUPTION; return DB_CORRUPTION;
rseg->curr_size+= undo->size; rseg->curr_size+= undo->size;
...@@ -448,11 +447,9 @@ static dberr_t trx_undo_lists_init(trx_rseg_t *rseg, trx_id_t &max_trx_id, ...@@ -448,11 +447,9 @@ static dberr_t trx_undo_lists_init(trx_rseg_t *rseg, trx_id_t &max_trx_id,
/** Restore the state of a persistent rollback segment. /** Restore the state of a persistent rollback segment.
@param[in,out] rseg persistent rollback segment @param[in,out] rseg persistent rollback segment
@param[in,out] max_trx_id maximum observed transaction identifier
@param[in,out] mtr mini-transaction @param[in,out] mtr mini-transaction
@return error code */ @return error code */
static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, trx_id_t &max_trx_id, static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, mtr_t *mtr)
mtr_t *mtr)
{ {
buf_block_t* rseg_hdr = trx_rsegf_get_new( buf_block_t* rseg_hdr = trx_rsegf_get_new(
rseg->space->id, rseg->page_no, mtr); rseg->space->id, rseg->page_no, mtr);
...@@ -460,9 +457,8 @@ static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, trx_id_t &max_trx_id, ...@@ -460,9 +457,8 @@ static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, trx_id_t &max_trx_id,
if (!mach_read_from_4(TRX_RSEG + TRX_RSEG_FORMAT + rseg_hdr->frame)) { if (!mach_read_from_4(TRX_RSEG + TRX_RSEG_FORMAT + rseg_hdr->frame)) {
trx_id_t id = mach_read_from_8(TRX_RSEG + TRX_RSEG_MAX_TRX_ID trx_id_t id = mach_read_from_8(TRX_RSEG + TRX_RSEG_MAX_TRX_ID
+ rseg_hdr->frame); + rseg_hdr->frame);
if (id > rseg->needs_purge) {
if (id > max_trx_id) { rseg->needs_purge = id;
max_trx_id = id;
} }
const byte* binlog_name = TRX_RSEG + TRX_RSEG_BINLOG_NAME const byte* binlog_name = TRX_RSEG + TRX_RSEG_BINLOG_NAME
...@@ -505,7 +501,7 @@ static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, trx_id_t &max_trx_id, ...@@ -505,7 +501,7 @@ static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, trx_id_t &max_trx_id,
rseg->curr_size = mach_read_from_4(TRX_RSEG + TRX_RSEG_HISTORY_SIZE rseg->curr_size = mach_read_from_4(TRX_RSEG + TRX_RSEG_HISTORY_SIZE
+ rseg_hdr->frame) + rseg_hdr->frame)
+ 1; + 1;
if (dberr_t err = trx_undo_lists_init(rseg, max_trx_id, rseg_hdr)) { if (dberr_t err = trx_undo_lists_init(rseg, rseg_hdr)) {
return err; return err;
} }
...@@ -524,23 +520,20 @@ static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, trx_id_t &max_trx_id, ...@@ -524,23 +520,20 @@ static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, trx_id_t &max_trx_id,
const buf_block_t* block = trx_undo_page_get( const buf_block_t* block = trx_undo_page_get(
page_id_t(rseg->space->id, node_addr.page), mtr); page_id_t(rseg->space->id, node_addr.page), mtr);
trx_id_t id = mach_read_from_8(block->frame + node_addr.boffset trx_id_t trx_id, id;
trx_id = mach_read_from_8(block->frame + node_addr.boffset
+ TRX_UNDO_TRX_ID); + TRX_UNDO_TRX_ID);
if (id > max_trx_id) {
max_trx_id = id;
}
id = mach_read_from_8(block->frame + node_addr.boffset id = mach_read_from_8(block->frame + node_addr.boffset
+ TRX_UNDO_TRX_NO); + TRX_UNDO_TRX_NO);
if (id > max_trx_id) { trx_id = std::max(trx_id, id);
max_trx_id = id;
if (trx_id > rseg->needs_purge) {
rseg->needs_purge = trx_id;
} }
rseg->set_last_commit(node_addr.boffset, id); rseg->set_last_commit(node_addr.boffset, id);
unsigned purge = mach_read_from_2(block->frame ut_ad(mach_read_from_2(block->frame + node_addr.boffset
+ node_addr.boffset + TRX_UNDO_NEEDS_PURGE) <= 1);
+ TRX_UNDO_NEEDS_PURGE);
ut_ad(purge <= 1);
rseg->needs_purge = purge != 0;
if (rseg->last_page_no != FIL_NULL) { if (rseg->last_page_no != FIL_NULL) {
...@@ -618,9 +611,12 @@ dberr_t trx_rseg_array_init() ...@@ -618,9 +611,12 @@ dberr_t trx_rseg_array_init()
ut_ad(rseg->id == rseg_id); ut_ad(rseg->id == rseg_id);
ut_ad(!trx_sys.rseg_array[rseg_id]); ut_ad(!trx_sys.rseg_array[rseg_id]);
trx_sys.rseg_array[rseg_id] = rseg; trx_sys.rseg_array[rseg_id] = rseg;
if ((err = trx_rseg_mem_restore( err = trx_rseg_mem_restore(rseg, &mtr);
rseg, max_trx_id, &mtr)) if (rseg->needs_purge > max_trx_id) {
!= DB_SUCCESS) { max_trx_id = rseg->needs_purge;
}
if (err != DB_SUCCESS) {
mtr.commit(); mtr.commit();
break; break;
} }
......
...@@ -665,6 +665,7 @@ static void trx_resurrect(trx_undo_t *undo, trx_rseg_t *rseg, ...@@ -665,6 +665,7 @@ static void trx_resurrect(trx_undo_t *undo, trx_rseg_t *rseg,
uint64_t *rows_to_undo) uint64_t *rows_to_undo)
{ {
trx_state_t state; trx_state_t state;
ut_ad(rseg->needs_purge >= undo->trx_id);
/* /*
This is single-threaded startup code, we do not need the This is single-threaded startup code, we do not need the
protection of trx->mutex here. protection of trx->mutex here.
...@@ -688,6 +689,7 @@ static void trx_resurrect(trx_undo_t *undo, trx_rseg_t *rseg, ...@@ -688,6 +689,7 @@ static void trx_resurrect(trx_undo_t *undo, trx_rseg_t *rseg,
return; return;
} }
++rseg->trx_ref_count;
trx_t *trx= trx_create(); trx_t *trx= trx_create();
trx->state= state; trx->state= state;
ut_d(trx->start_file= __FILE__); ut_d(trx->start_file= __FILE__);
...@@ -696,12 +698,6 @@ static void trx_resurrect(trx_undo_t *undo, trx_rseg_t *rseg, ...@@ -696,12 +698,6 @@ static void trx_resurrect(trx_undo_t *undo, trx_rseg_t *rseg,
trx->rsegs.m_redo.undo= undo; trx->rsegs.m_redo.undo= undo;
trx->undo_no= undo->top_undo_no + 1; trx->undo_no= undo->top_undo_no + 1;
trx->rsegs.m_redo.rseg= rseg; trx->rsegs.m_redo.rseg= rseg;
/*
For transactions with active data will not have rseg size = 1
or will not qualify for purge limit criteria. So it is safe to increment
this trx_ref_count w/o mutex protection.
*/
++trx->rsegs.m_redo.rseg->trx_ref_count;
*trx->xid= undo->xid; *trx->xid= undo->xid;
trx->id= undo->trx_id; trx->id= undo->trx_id;
trx->is_recovered= true; trx->is_recovered= true;
...@@ -776,7 +772,8 @@ dberr_t trx_lists_init_at_db_start() ...@@ -776,7 +772,8 @@ dberr_t trx_lists_init_at_db_start()
ut_ad(trx->start_time == start_time); ut_ad(trx->start_time == start_time);
ut_ad(trx->is_recovered); ut_ad(trx->is_recovered);
ut_ad(trx->rsegs.m_redo.rseg == rseg); ut_ad(trx->rsegs.m_redo.rseg == rseg);
ut_ad(trx->rsegs.m_redo.rseg->trx_ref_count); ut_ad(rseg->trx_ref_count);
ut_ad(rseg->needs_purge);
trx->rsegs.m_redo.undo = undo; trx->rsegs.m_redo.undo = undo;
if (undo->top_undo_no >= trx->undo_no) { if (undo->top_undo_no >= trx->undo_no) {
...@@ -808,20 +805,18 @@ dberr_t trx_lists_init_at_db_start() ...@@ -808,20 +805,18 @@ dberr_t trx_lists_init_at_db_start()
/** Assign a persistent rollback segment in a round-robin fashion, /** Assign a persistent rollback segment in a round-robin fashion,
evenly distributed between 0 and innodb_undo_logs-1 evenly distributed between 0 and innodb_undo_logs-1
@return persistent rollback segment @param trx transaction */
@retval NULL if innodb_read_only */ static void trx_assign_rseg_low(trx_t *trx)
static trx_rseg_t* trx_assign_rseg_low()
{ {
if (high_level_read_only) { ut_ad(!trx->rsegs.m_redo.rseg);
ut_ad(!srv_available_undo_logs);
return(NULL);
}
ut_ad(srv_available_undo_logs == TRX_SYS_N_RSEGS); ut_ad(srv_available_undo_logs == TRX_SYS_N_RSEGS);
/* The first slot is always assigned to the system tablespace. */ /* The first slot is always assigned to the system tablespace. */
ut_ad(trx_sys.rseg_array[0]->space == fil_system.sys_space); ut_ad(trx_sys.rseg_array[0]->space == fil_system.sys_space);
trx_sys.register_rw(trx);
ut_ad(trx->id);
/* Choose a rollback segment evenly distributed between 0 and /* Choose a rollback segment evenly distributed between 0 and
innodb_undo_logs-1 in a round-robin fashion, skipping those innodb_undo_logs-1 in a round-robin fashion, skipping those
undo tablespaces that are scheduled for truncation. */ undo tablespaces that are scheduled for truncation. */
...@@ -835,7 +830,7 @@ static trx_rseg_t* trx_assign_rseg_low() ...@@ -835,7 +830,7 @@ static trx_rseg_t* trx_assign_rseg_low()
bool look_for_rollover = false; bool look_for_rollover = false;
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
bool allocated = false; bool skip_allocation;
do { do {
for (;;) { for (;;) {
...@@ -879,20 +874,18 @@ static trx_rseg_t* trx_assign_rseg_low() ...@@ -879,20 +874,18 @@ static trx_rseg_t* trx_assign_rseg_low()
break; break;
} }
/* By now we have only selected the rseg but not marked it
allocated. By marking it allocated we are ensuring that it will
never be selected for UNDO truncate purge. */
mutex_enter(&rseg->mutex); mutex_enter(&rseg->mutex);
if (!rseg->skip_allocation) { ut_ad(rseg->is_persistent());
rseg->trx_ref_count++; skip_allocation = rseg->skip_allocation;
allocated = true; if (!skip_allocation) {
/* Ensure that the allocation remains valid until
trx_undo_reuse_cached() is invoked. */
++rseg->trx_ref_count;
} }
mutex_exit(&rseg->mutex); mutex_exit(&rseg->mutex);
} while (!allocated); } while (skip_allocation);
ut_ad(rseg->trx_ref_count > 0); trx->rsegs.m_redo.rseg = rseg;
ut_ad(rseg->is_persistent());
return(rseg);
} }
/** Assign a rollback segment for modifying temporary tables. /** Assign a rollback segment for modifying temporary tables.
...@@ -976,15 +969,11 @@ trx_start_low( ...@@ -976,15 +969,11 @@ trx_start_low(
if (!trx->read_only if (!trx->read_only
&& (trx->mysql_thd == 0 || read_write || trx->ddl)) { && (trx->mysql_thd == 0 || read_write || trx->ddl)) {
/* Temporary rseg is assigned only if the transaction /* Temporary rseg is assigned only if the transaction
updates a temporary table */ updates a temporary table */
trx->rsegs.m_redo.rseg = trx_assign_rseg_low(); if (!high_level_read_only) {
ut_ad(trx->rsegs.m_redo.rseg != 0 trx_assign_rseg_low(trx);
|| srv_read_only_mode }
|| srv_force_recovery >= SRV_FORCE_NO_TRX_UNDO);
trx_sys.register_rw(trx);
} else { } else {
if (!trx->is_autocommit_non_locking()) { if (!trx->is_autocommit_non_locking()) {
...@@ -1081,25 +1070,22 @@ trx_write_serialisation_history( ...@@ -1081,25 +1070,22 @@ trx_write_serialisation_history(
trx_undo_t*& undo = trx->rsegs.m_redo.undo; trx_undo_t*& undo = trx->rsegs.m_redo.undo;
if (!undo) {
return;
}
ut_ad(!trx->read_only); ut_ad(!trx->read_only);
ut_ad(!undo || undo->rseg == rseg);
mutex_enter(&rseg->mutex); mutex_enter(&rseg->mutex);
ut_ad(rseg->trx_ref_count);
--rseg->trx_ref_count;
/* Assign the transaction serialisation number and add any /* Assign the transaction serialisation number and add any
undo log to the purge queue. */ undo log to the purge queue. */
trx_serialise(trx);
if (undo) { if (undo) {
ut_ad(undo->rseg == rseg);
trx_serialise(trx);
UT_LIST_REMOVE(rseg->undo_list, undo); UT_LIST_REMOVE(rseg->undo_list, undo);
trx_purge_add_undo_to_history(trx, undo, mtr); trx_purge_add_undo_to_history(trx, undo, mtr);
MONITOR_INC(MONITOR_TRX_COMMIT_UNDO);
} }
mutex_exit(&rseg->mutex); mutex_exit(&rseg->mutex);
MONITOR_INC(MONITOR_TRX_COMMIT_UNDO);
} }
/******************************************************************** /********************************************************************
...@@ -1401,16 +1387,6 @@ inline void trx_t::commit_in_memory(const mtr_t *mtr) ...@@ -1401,16 +1387,6 @@ inline void trx_t::commit_in_memory(const mtr_t *mtr)
ut_ad(!rsegs.m_noredo.undo); ut_ad(!rsegs.m_noredo.undo);
/* Only after trx_undo_commit_cleanup() it is safe to release
our rseg reference. */
if (trx_rseg_t *rseg= rsegs.m_redo.rseg)
{
mutex_enter(&rseg->mutex);
ut_ad(rseg->trx_ref_count > 0);
--rseg->trx_ref_count;
mutex_exit(&rseg->mutex);
}
/* Free all savepoints, starting from the first. */ /* Free all savepoints, starting from the first. */
trx_named_savept_t *savep= UT_LIST_GET_FIRST(trx_savepoints); trx_named_savept_t *savep= UT_LIST_GET_FIRST(trx_savepoints);
...@@ -1491,6 +1467,15 @@ void trx_t::commit_low(mtr_t *mtr) ...@@ -1491,6 +1467,15 @@ void trx_t::commit_low(mtr_t *mtr)
mtr->commit(); mtr->commit();
} }
else if (trx_rseg_t *rseg= rsegs.m_redo.rseg)
{
ut_ad(id);
ut_ad(!rsegs.m_redo.undo);
mutex_enter(&rseg->mutex);
--rseg->trx_ref_count;
mutex_exit(&rseg->mutex);
}
#ifndef DBUG_OFF #ifndef DBUG_OFF
if (debug_sync) if (debug_sync)
DEBUG_SYNC_C("before_trx_state_committed_in_memory"); DEBUG_SYNC_C("before_trx_state_committed_in_memory");
...@@ -2295,10 +2280,7 @@ trx_set_rw_mode( ...@@ -2295,10 +2280,7 @@ trx_set_rw_mode(
return; return;
} }
trx->rsegs.m_redo.rseg = trx_assign_rseg_low(); trx_assign_rseg_low(trx);
ut_ad(trx->rsegs.m_redo.rseg != 0);
trx_sys.register_rw(trx);
/* So that we can see our own changes. */ /* So that we can see our own changes. */
if (trx->read_view.is_open()) { if (trx->read_view.is_open()) {
......
...@@ -837,12 +837,10 @@ static void trx_undo_seg_free(const trx_undo_t *undo) ...@@ -837,12 +837,10 @@ static void trx_undo_seg_free(const trx_undo_t *undo)
@param[in,out] rseg rollback segment @param[in,out] rseg rollback segment
@param[in] id rollback segment slot @param[in] id rollback segment slot
@param[in] page_no undo log segment page number @param[in] page_no undo log segment page number
@param[in,out] max_trx_id the largest observed transaction ID
@return the undo log @return the undo log
@retval nullptr on error */ @retval nullptr on error */
trx_undo_t * trx_undo_t *
trx_undo_mem_create_at_db_start(trx_rseg_t *rseg, ulint id, uint32_t page_no, trx_undo_mem_create_at_db_start(trx_rseg_t *rseg, ulint id, uint32_t page_no)
trx_id_t &max_trx_id)
{ {
mtr_t mtr; mtr_t mtr;
XID xid; XID xid;
...@@ -876,10 +874,20 @@ trx_undo_mem_create_at_db_start(trx_rseg_t *rseg, ulint id, uint32_t page_no, ...@@ -876,10 +874,20 @@ trx_undo_mem_create_at_db_start(trx_rseg_t *rseg, ulint id, uint32_t page_no,
const trx_ulogf_t* const undo_header = block->frame + offset; const trx_ulogf_t* const undo_header = block->frame + offset;
uint16_t state = mach_read_from_2(TRX_UNDO_SEG_HDR + TRX_UNDO_STATE uint16_t state = mach_read_from_2(TRX_UNDO_SEG_HDR + TRX_UNDO_STATE
+ block->frame); + block->frame);
const trx_id_t trx_id= mach_read_from_8(undo_header + TRX_UNDO_TRX_ID);
if (trx_id >> 48) {
sql_print_error("InnoDB: corrupted TRX_ID %llx", trx_id);
goto corrupted;
}
/* We will increment rseg->needs_purge, like trx_undo_reuse_cached()
would do it, to avoid trouble on rollback or XA COMMIT. */
trx_id_t trx_no = trx_id + 1;
switch (state) { switch (state) {
case TRX_UNDO_ACTIVE: case TRX_UNDO_ACTIVE:
case TRX_UNDO_PREPARED: case TRX_UNDO_PREPARED:
if (UNIV_LIKELY(type != 1)) { if (UNIV_LIKELY(type != 1)) {
trx_no = trx_id + 1;
break; break;
} }
sql_print_error("InnoDB: upgrade from older version than" sql_print_error("InnoDB: upgrade from older version than"
...@@ -902,13 +910,14 @@ trx_undo_mem_create_at_db_start(trx_rseg_t *rseg, ulint id, uint32_t page_no, ...@@ -902,13 +910,14 @@ trx_undo_mem_create_at_db_start(trx_rseg_t *rseg, ulint id, uint32_t page_no,
goto corrupted_type; goto corrupted_type;
} }
read_trx_no: read_trx_no:
trx_id_t id = mach_read_from_8(TRX_UNDO_TRX_NO + undo_header); trx_no = mach_read_from_8(TRX_UNDO_TRX_NO + undo_header);
if (id >> 48) { if (trx_no >> 48) {
sql_print_error("InnoDB: corrupted TRX_NO %llx", id); sql_print_error("InnoDB: corrupted TRX_NO %llx",
trx_no);
goto corrupted; goto corrupted;
} }
if (id > max_trx_id) { if (trx_no < trx_id) {
max_trx_id = id; trx_no = trx_id;
} }
} }
...@@ -921,16 +930,10 @@ trx_undo_mem_create_at_db_start(trx_rseg_t *rseg, ulint id, uint32_t page_no, ...@@ -921,16 +930,10 @@ trx_undo_mem_create_at_db_start(trx_rseg_t *rseg, ulint id, uint32_t page_no,
xid.null(); xid.null();
} }
trx_id_t trx_id = mach_read_from_8(undo_header + TRX_UNDO_TRX_ID);
if (trx_id >> 48) {
sql_print_error("InnoDB: corrupted TRX_ID %llx", trx_id);
goto corrupted;
}
if (trx_id > max_trx_id) {
max_trx_id = trx_id;
}
mutex_enter(&rseg->mutex); mutex_enter(&rseg->mutex);
if (trx_no > rseg->needs_purge) {
rseg->needs_purge = trx_no;
}
trx_undo_t* undo = trx_undo_mem_create( trx_undo_t* undo = trx_undo_mem_create(
rseg, id, trx_id, &xid, page_no, offset); rseg, id, trx_id, &xid, page_no, offset);
mutex_exit(&rseg->mutex); mutex_exit(&rseg->mutex);
...@@ -1128,6 +1131,22 @@ trx_undo_reuse_cached(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** pundo, ...@@ -1128,6 +1131,22 @@ trx_undo_reuse_cached(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** pundo,
{ {
ut_ad(mutex_own(&rseg->mutex)); ut_ad(mutex_own(&rseg->mutex));
if (rseg->is_persistent()) {
ut_ad(rseg->trx_ref_count);
if (rseg->needs_purge <= trx->id) {
/* trx_purge_truncate_history() compares
rseg->needs_purge <= head.trx_no
so we need to compensate for that.
The rseg->needs_purge after crash
recovery would be at least trx->id + 1,
because that is the minimum possible value
assigned by trx_serialise() on commit. */
rseg->needs_purge = trx->id + 1;
}
} else {
ut_ad(!rseg->trx_ref_count);
}
trx_undo_t* undo = UT_LIST_GET_FIRST(rseg->undo_cached); trx_undo_t* undo = UT_LIST_GET_FIRST(rseg->undo_cached);
if (!undo) { if (!undo) {
return NULL; return NULL;
...@@ -1236,10 +1255,8 @@ buf_block_t* ...@@ -1236,10 +1255,8 @@ buf_block_t*
trx_undo_assign_low(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** undo, trx_undo_assign_low(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** undo,
dberr_t* err, mtr_t* mtr) dberr_t* err, mtr_t* mtr)
{ {
const bool is_temp __attribute__((unused)) = rseg == trx->rsegs.m_noredo.rseg; ut_d(const bool is_temp = rseg == trx->rsegs.m_noredo.rseg);
ut_ad(is_temp || rseg == trx->rsegs.m_redo.rseg);
ut_ad(rseg == trx->rsegs.m_redo.rseg
|| rseg == trx->rsegs.m_noredo.rseg);
ut_ad(undo == (is_temp ut_ad(undo == (is_temp
? &trx->rsegs.m_noredo.undo ? &trx->rsegs.m_noredo.undo
: &trx->rsegs.m_redo.undo)); : &trx->rsegs.m_redo.undo));
...@@ -1259,7 +1276,6 @@ trx_undo_assign_low(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** undo, ...@@ -1259,7 +1276,6 @@ trx_undo_assign_low(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** undo,
); );
mutex_enter(&rseg->mutex); mutex_enter(&rseg->mutex);
buf_block_t* block = trx_undo_reuse_cached(trx, rseg, undo, mtr); buf_block_t* block = trx_undo_reuse_cached(trx, rseg, undo, mtr);
if (!block) { if (!block) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment