Commit 6058f92f authored by Marko Mäkelä's avatar Marko Mäkelä

Simplify undo log access during InnoDB startup

trx_rseg_mem_restore(): Update the max_trx_id from the undo log pages.

trx_sys_init_at_db_start(): Remove; merge with trx_lists_init_at_db_start().

trx_undo_lists_init(): Move to the only calling module, trx0rseg.cc.

trx_undo_mem_create_at_db_start(): Declare globally. Return the number
of pages.
parent d24229ba
......@@ -58,9 +58,6 @@ trx_sys_hdr_page(const page_id_t& page_id)
&& page_id.page_no() == TRX_SYS_PAGE_NO);
}
/** Initialize the transaction system main-memory data structures. */
void trx_sys_init_at_db_start();
/*****************************************************************//**
Creates and initializes the transaction system at the database creation. */
void
......
......@@ -210,15 +210,6 @@ trx_undo_truncate_start(
ulint hdr_page_no,
ulint hdr_offset,
undo_no_t limit);
/********************************************************************//**
Initializes the undo log lists for a rollback segment memory copy.
This function is only called when the database is started or a new
rollback segment created.
@return the combined size of undo log segments in pages */
ulint
trx_undo_lists_init(
/*================*/
trx_rseg_t* rseg); /*!< in: rollback segment memory object */
/** Assign an undo log for a persistent transaction.
A new undo log is created or a cached undo log reused.
@param[in,out] trx transaction
......@@ -326,6 +317,13 @@ trx_undo_parse_page_header(
const byte* end_ptr,
page_t* page,
mtr_t* mtr);
/** Read an undo log when starting up the database.
@param[in,out] rseg rollback segment
@param[in] id rollback segment slot
@param[in] page_no undo log segment page number
@return size of the undo log in pages */
ulint
trx_undo_mem_create_at_db_start(trx_rseg_t* rseg, ulint id, ulint page_no);
/************************************************************************
Frees an undo log memory copy. */
void
......
......@@ -2173,7 +2173,7 @@ innobase_start_or_create_for_mysql()
All the remaining rollback segments will be created later,
after the double write buffer has been created. */
trx_sys_create_sys_pages();
trx_sys_init_at_db_start();
trx_lists_init_at_db_start();
err = dict_create();
......@@ -2234,7 +2234,7 @@ innobase_start_or_create_for_mysql()
}
/* This must precede
recv_apply_hashed_log_recs(true). */
trx_sys_init_at_db_start();
trx_lists_init_at_db_start();
break;
case SRV_OPERATION_RESTORE_DELTA:
case SRV_OPERATION_BACKUP:
......@@ -2325,7 +2325,7 @@ innobase_start_or_create_for_mysql()
}
/* recv_recovery_from_checkpoint_finish needs trx lists which
are initialized in trx_sys_init_at_db_start(). */
are initialized in trx_lists_init_at_db_start(). */
recv_recovery_from_checkpoint_finish();
......
......@@ -168,49 +168,71 @@ trx_rseg_mem_create(ulint id, ulint space, ulint page_no)
return(rseg);
}
/** Restore the state of a persistent rollback segment.
@param[in,out] rseg persistent rollback segment
@param[in,out] mtr mini-transaction */
/** Read the undo log lists.
@param[in,out] rseg rollback segment
@param[in] rseg_header rollback segment header
@param[in,out] mtr mini-transaction
@return the combined size of undo log segments in pages */
static
void
trx_rseg_mem_restore(trx_rseg_t* rseg, mtr_t* mtr)
ulint
trx_undo_lists_init(trx_rseg_t* rseg, const trx_rsegf_t* rseg_header,
mtr_t* mtr)
{
ulint len;
fil_addr_t node_addr;
trx_rsegf_t* rseg_header;
trx_ulogf_t* undo_log_hdr;
ulint sum_of_undo_sizes;
ut_ad(srv_force_recovery < SRV_FORCE_NO_UNDO_LOG_SCAN);
rseg_header = trx_rsegf_get_new(rseg->space, rseg->page_no, mtr);
ulint size = 0;
rseg->max_size = mtr_read_ulint(
rseg_header + TRX_RSEG_MAX_SIZE, MLOG_4BYTES, mtr);
for (ulint i = 0; i < TRX_RSEG_N_SLOTS; i++) {
ulint page_no = trx_rsegf_get_nth_undo(rseg_header, i);
if (page_no != FIL_NULL) {
size += trx_undo_mem_create_at_db_start(
rseg, i, page_no);
MONITOR_INC(MONITOR_NUM_UNDO_SLOT_USED);
}
}
/* Initialize the undo log lists according to the rseg header */
return(size);
}
sum_of_undo_sizes = trx_undo_lists_init(rseg);
/** Restore the state of a persistent rollback segment.
@param[in,out] rseg persistent rollback segment
@param[in,out] max_trx_id maximum observed transaction identifier
@param[in,out] mtr mini-transaction */
static
void
trx_rseg_mem_restore(trx_rseg_t* rseg, trx_id_t& max_trx_id, mtr_t* mtr)
{
const trx_rsegf_t* rseg_header = trx_rsegf_get_new(
rseg->space, rseg->page_no, mtr);
rseg->max_size = mach_read_from_4(rseg_header + TRX_RSEG_MAX_SIZE);
rseg->curr_size = mtr_read_ulint(
rseg_header + TRX_RSEG_HISTORY_SIZE, MLOG_4BYTES, mtr)
+ 1 + sum_of_undo_sizes;
/* Initialize the undo log lists according to the rseg header */
len = flst_get_len(rseg_header + TRX_RSEG_HISTORY);
rseg->curr_size = mach_read_from_4(rseg_header + TRX_RSEG_HISTORY_SIZE)
+ 1 + trx_undo_lists_init(rseg, rseg_header, mtr);
if (len > 0) {
if (ulint len = flst_get_len(rseg_header + TRX_RSEG_HISTORY)) {
my_atomic_addlint(&trx_sys.rseg_history_len, len);
node_addr = trx_purge_get_log_from_hist(
fil_addr_t node_addr = trx_purge_get_log_from_hist(
flst_get_last(rseg_header + TRX_RSEG_HISTORY, mtr));
rseg->last_page_no = node_addr.page;
rseg->last_offset = node_addr.boffset;
undo_log_hdr = trx_undo_page_get(
const trx_ulogf_t* undo_log_hdr = trx_undo_page_get(
page_id_t(rseg->space, node_addr.page), mtr)
+ node_addr.boffset;
rseg->last_trx_no = mach_read_from_8(
undo_log_hdr + TRX_UNDO_TRX_NO);
trx_id_t id = mach_read_from_8(undo_log_hdr + TRX_UNDO_TRX_ID);
if (id > max_trx_id) {
max_trx_id = id;
}
id = mach_read_from_8(undo_log_hdr + TRX_UNDO_TRX_NO);
rseg->last_trx_no = id;
if (id > max_trx_id) {
max_trx_id = id;
}
unsigned purge = mach_read_from_2(
undo_log_hdr + TRX_UNDO_NEEDS_PURGE);
ut_ad(purge <= 1);
......@@ -233,11 +255,30 @@ trx_rseg_mem_restore(trx_rseg_t* rseg, mtr_t* mtr)
void
trx_rseg_array_init()
{
mtr_t mtr;
trx_id_t max_trx_id = 0;
for (ulint rseg_id = 0; rseg_id < TRX_SYS_N_RSEGS; rseg_id++) {
mtr_t mtr;
mtr.start();
if (const buf_block_t* sys = trx_sysf_get(&mtr, false)) {
if (rseg_id == 0) {
/* VERY important: after the database
is started, max_trx_id value is
divisible by TRX_SYS_TRX_ID_WRITE_MARGIN,
and the first call of
trx_sys.get_new_trx_id() will invoke
flush_max_trx_id()! Thus trx id values
will not overlap when the database is
repeatedly started! */
max_trx_id = 2 * TRX_SYS_TRX_ID_WRITE_MARGIN
+ ut_uint64_align_up(
mach_read_from_8(
TRX_SYS
+ TRX_SYS_TRX_ID_STORE
+ sys->frame),
TRX_SYS_TRX_ID_WRITE_MARGIN);
}
const uint32_t page_no = trx_sysf_rseg_get_page_no(
sys, rseg_id);
if (page_no != FIL_NULL) {
......@@ -249,12 +290,14 @@ trx_rseg_array_init()
ut_ad(rseg->id == rseg_id);
ut_ad(!trx_sys.rseg_array[rseg_id]);
trx_sys.rseg_array[rseg_id] = rseg;
trx_rseg_mem_restore(rseg, &mtr);
trx_rseg_mem_restore(rseg, max_trx_id, &mtr);
}
}
mtr.commit();
}
trx_sys.init_max_trx_id(max_trx_id);
}
/** Create a persistent rollback segment.
......
......@@ -415,39 +415,6 @@ trx_sysf_create(
ut_a(page_no == FSP_FIRST_RSEG_PAGE_NO);
}
/** Initialize the transaction system main-memory data structures. */
void
trx_sys_init_at_db_start()
{
/* VERY important: after the database is started, max_trx_id value is
divisible by TRX_SYS_TRX_ID_WRITE_MARGIN, and the 'if' in
trx_sys.get_new_trx_id will evaluate to TRUE when the function
is first time called, and the value for trx id will be written
to the disk-based header! Thus trx id values will not overlap when
the database is repeatedly started! */
mtr_t mtr;
mtr.start();
buf_block_t* block = trx_sysf_get(&mtr);
trx_id_t max_trx_id = block
? 2 * TRX_SYS_TRX_ID_WRITE_MARGIN
+ ut_uint64_align_up(mach_read_from_8(TRX_SYS
+ TRX_SYS_TRX_ID_STORE
+ block->frame),
TRX_SYS_TRX_ID_WRITE_MARGIN)
: 0;
trx_sys.init_max_trx_id(max_trx_id);
mtr.commit();
trx_dummy_sess = sess_open();
trx_lists_init_at_db_start();
trx_sys.mvcc.clone_oldest_view(&purge_sys->view);
}
/** Create the instance */
void
trx_sys_t::create()
......
......@@ -895,22 +895,23 @@ static void trx_resurrect(trx_undo_t *undo, trx_rseg_t *rseg,
void
trx_lists_init_at_db_start()
{
uint64_t rows_to_undo = 0;
ut_a(srv_is_being_started);
ut_ad(!srv_was_started);
ut_ad(!purge_sys);
purge_sys = UT_NEW_NOKEY(purge_sys_t());
ut_ad(!trx_dummy_sess);
if (srv_force_recovery >= SRV_FORCE_NO_UNDO_LOG_SCAN) {
return;
}
trx_dummy_sess = sess_open();
purge_sys = UT_NEW_NOKEY(purge_sys_t());
trx_rseg_array_init();
/* Look from the rollback segments if there exist undo logs for
transactions. */
const ib_time_t start_time = ut_time();
const ib_time_t start_time = ut_time();
uint64_t rows_to_undo = 0;
for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
trx_undo_t* undo;
......@@ -979,6 +980,7 @@ trx_lists_init_at_db_start()
}
std::sort(trx_sys.rw_trx_ids.begin(), trx_sys.rw_trx_ids.end());
trx_sys.mvcc.clone_oldest_view(&purge_sys->view);
}
/** Assign a persistent rollback segment in a round-robin fashion,
......
......@@ -684,10 +684,7 @@ trx_undo_write_xid(
Read X/Open XA Transaction Identification (XID) from undo log header */
static
void
trx_undo_read_xid(
/*==============*/
trx_ulogf_t* log_hdr,/*!< in: undo log header */
XID* xid) /*!< out: X/Open XA Transaction Identification */
trx_undo_read_xid(const trx_ulogf_t* log_hdr, XID* xid)
{
xid->formatID=static_cast<long>(mach_read_from_4(
log_hdr + TRX_UNDO_XA_FORMAT));
......@@ -1102,66 +1099,52 @@ trx_undo_seg_free(
/*========== UNDO LOG MEMORY COPY INITIALIZATION =====================*/
/********************************************************************//**
Creates and initializes an undo log memory object according to the values
in the header in file, when the database is started. The memory object is
inserted in the appropriate list of rseg.
@return own: the undo log memory object */
static
trx_undo_t*
trx_undo_mem_create_at_db_start(
/*============================*/
trx_rseg_t* rseg, /*!< in: rollback segment memory object */
ulint id, /*!< in: slot index within rseg */
ulint page_no,/*!< in: undo log segment page number */
mtr_t* mtr) /*!< in: mtr */
/** Read an undo log when starting up the database.
@param[in,out] rseg rollback segment
@param[in] id rollback segment slot
@param[in] page_no undo log segment page number
@return size of the undo log in pages */
ulint
trx_undo_mem_create_at_db_start(trx_rseg_t* rseg, ulint id, ulint page_no)
{
page_t* undo_page;
trx_usegf_t* seg_header;
trx_ulogf_t* undo_header;
trx_undo_t* undo;
ulint state;
trx_id_t trx_id;
ulint offset;
mtr_t mtr;
XID xid;
ut_a(id < TRX_RSEG_N_SLOTS);
ut_ad(id < TRX_RSEG_N_SLOTS);
undo_page = trx_undo_page_get(page_id_t(rseg->space, page_no), mtr);
mtr.start();
const page_t* undo_page = trx_undo_page_get(
page_id_t(rseg->space, page_no), &mtr);
const ulint type = mach_read_from_2(
TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_TYPE + undo_page);
ut_ad(type == 0 || type == TRX_UNDO_INSERT || type == TRX_UNDO_UPDATE);
seg_header = undo_page + TRX_UNDO_SEG_HDR;
state = mach_read_from_2(seg_header + TRX_UNDO_STATE);
offset = mach_read_from_2(seg_header + TRX_UNDO_LAST_LOG);
undo_header = undo_page + offset;
trx_id = mach_read_from_8(undo_header + TRX_UNDO_TRX_ID);
uint state = mach_read_from_2(TRX_UNDO_SEG_HDR + TRX_UNDO_STATE
+ undo_page);
uint offset = mach_read_from_2(TRX_UNDO_SEG_HDR + TRX_UNDO_LAST_LOG
+ undo_page);
const bool xid_exists = mtr_read_ulint(
undo_header + TRX_UNDO_XID_EXISTS, MLOG_1BYTE, mtr);
const trx_ulogf_t* undo_header = undo_page + offset;
/* Read X/Open XA transaction identification if it exists, or
set it to NULL. */
xid.null();
if (xid_exists) {
if (undo_header[TRX_UNDO_XID_EXISTS]) {
trx_undo_read_xid(undo_header, &xid);
} else {
xid.null();
}
mutex_enter(&(rseg->mutex));
undo = trx_undo_mem_create(rseg, id, trx_id, &xid, page_no, offset);
mutex_exit(&(rseg->mutex));
undo->dict_operation = mtr_read_ulint(
undo_header + TRX_UNDO_DICT_TRANS, MLOG_1BYTE, mtr);
mutex_enter(&rseg->mutex);
trx_undo_t* undo = trx_undo_mem_create(
rseg, id, mach_read_from_8(undo_header + TRX_UNDO_TRX_ID),
&xid, page_no, offset);
mutex_exit(&rseg->mutex);
undo->dict_operation = undo_header[TRX_UNDO_DICT_TRANS];
undo->table_id = mach_read_from_8(undo_header + TRX_UNDO_TABLE_ID);
undo->size = flst_get_len(seg_header + TRX_UNDO_PAGE_LIST);
undo->size = flst_get_len(TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST
+ undo_page);
if (UNIV_UNLIKELY(state == TRX_UNDO_TO_FREE)) {
/* This is an old-format insert_undo log segment that
......@@ -1170,13 +1153,14 @@ trx_undo_mem_create_at_db_start(
state = TRX_UNDO_TO_PURGE;
} else {
fil_addr_t last_addr = flst_get_last(
seg_header + TRX_UNDO_PAGE_LIST, mtr);
TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST + undo_page,
&mtr);
undo->last_page_no = last_addr.page;
undo->top_page_no = last_addr.page;
page_t* last_page = trx_undo_page_get(
page_id_t(rseg->space, undo->last_page_no), mtr);
page_id_t(rseg->space, undo->last_page_no), &mtr);
const trx_undo_rec_t* rec = trx_undo_page_get_last_rec(
last_page, page_no, offset);
......@@ -1199,61 +1183,8 @@ trx_undo_mem_create_at_db_start(
MONITOR_INC(MONITOR_NUM_UNDO_SLOT_CACHED);
}
return(undo);
}
/********************************************************************//**
Initializes the undo log lists for a rollback segment memory copy. This
function is only called when the database is started or a new rollback
segment is created.
@return the combined size of undo log segments in pages */
ulint
trx_undo_lists_init(
/*================*/
trx_rseg_t* rseg) /*!< in: rollback segment memory object */
{
ulint size = 0;
trx_rsegf_t* rseg_header;
ulint i;
mtr_t mtr;
mtr_start(&mtr);
rseg_header = trx_rsegf_get_new(rseg->space, rseg->page_no, &mtr);
for (i = 0; i < TRX_RSEG_N_SLOTS; i++) {
uint32_t page_no = trx_rsegf_get_nth_undo(rseg_header, i);
/* In forced recovery: try to avoid operations which look
at database pages; undo logs are rapidly changing data, and
the probability that they are in an inconsistent state is
high */
if (page_no != FIL_NULL
&& srv_force_recovery < SRV_FORCE_NO_UNDO_LOG_SCAN) {
trx_undo_t* undo;
undo = trx_undo_mem_create_at_db_start(
rseg, i, page_no, &mtr);
size += undo->size;
mtr_commit(&mtr);
mtr_start(&mtr);
rseg_header = trx_rsegf_get(
rseg->space, rseg->page_no, &mtr);
/* Found a used slot */
MONITOR_INC(MONITOR_NUM_UNDO_SLOT_USED);
}
}
mtr_commit(&mtr);
return(size);
mtr.commit();
return undo->size;
}
/********************************************************************//**
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment