Commit b7fc2c89 authored by Marko Mäkelä

Refactor recv_sys_t::recs_t into page_recv_t

page_recv_t: Replaces recv_sys_t::recs_t.
page_recv_t::state: Not private, even though accessors such as
is_being_processed() exist.

page_recv_t::log: A singly-linked list of log_rec_t* with an STL-style
iterator interface and the custom operations trim() and append().
The list members (head and tail) are private.
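
For illustration, a minimal self-contained sketch of that list shape follows.
The names mirror the patch, but the stack allocation and the main() driver are
assumptions made for this example; in the patch the nodes are allocated from
recv_sys.heap, and trim()'s caller erases the whole page entry once the list
becomes empty (so the patch need not reset tail there).

#include <cassert>
#include <cstdint>

typedef uint64_t lsn_t;

// Intrusive singly-linked list node, as in the patch.
struct log_rec_t
{
  explicit log_rec_t(lsn_t lsn) : next(nullptr), lsn(lsn) {}
  log_rec_t *next;  // next record for the same page
  const lsn_t lsn;  // mtr_t::commit_lsn() of the mini-transaction
};

// Sketch of page_recv_t::log: head/tail are private; all access goes
// through append(), trim(), and the STL-style iterators.
class recs_t
{
  log_rec_t *head= nullptr;
  log_rec_t *tail= nullptr;
public:
  // Append a log snippet; records arrive in nondecreasing LSN order.
  void append(log_rec_t *recs)
  {
    if (tail) tail->next= recs; else head= recs;
    tail= recs;
  }
  // Discard records below start_lsn; return whether all were trimmed.
  // (Unlike the patch, this resets tail so the sketch stays reusable.)
  bool trim(lsn_t start_lsn)
  {
    while (head && head->lsn < start_lsn) head= head->next;
    if (head) return false;
    tail= nullptr;
    return true;
  }
  const log_rec_t *last() const { return tail; }
  bool empty() const { return !head; }
  class iterator
  {
    log_rec_t *cur;
  public:
    explicit iterator(log_rec_t *rec) : cur(rec) {}
    log_rec_t *operator*() const { return cur; }
    iterator &operator++() { cur= cur->next; return *this; }
    bool operator!=(const iterator &i) const { return cur != i.cur; }
  };
  iterator begin() { return iterator(head); }
  iterator end() { return iterator(nullptr); }
};

int main()
{
  recs_t log;
  log_rec_t a(10), b(20), c(30);
  log.append(&a); log.append(&b); log.append(&c);
  assert(!log.trim(15));            // the record at LSN 10 is dropped
  assert((*log.begin())->lsn == 20);
  assert(log.trim(100));            // everything below LSN 100 goes
  assert(log.empty());
}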

recv_t::data_t: Replaces recv_data_t.

recv_t::data: Remove the pointer indirection for the first log chunk,
and copy the first chunk directly after the record. Adjust the
definition of RECV_DATA_BLOCK_SIZE accordingly.
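
A minimal sketch of the resulting chunked layout: the first chunk of the
record body lives directly behind the record header, and only overflow chunks
need separately allocated data_t blocks. The malloc-based allocation and the
64-byte CHUNK limit are assumptions made for this example (the patch allocates
from recv_sys.heap and derives RECV_DATA_BLOCK_SIZE from MEM_MAX_ALLOC_IN_BUF).

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <cstring>

static const uint32_t CHUNK= 64;  // stand-in for RECV_DATA_BLOCK_SIZE

struct data_t { data_t *next; };  // overflow chunk header; bytes follow

struct rec_t
{
  uint32_t len;  // total record body length
  data_t data;   // first chunk header; its bytes follow rec_t itself
};

// Store body[0..len): first chunk inline after rec_t, rest chained.
static rec_t *store(const uint8_t *body, uint32_t len)
{
  uint32_t chunk= std::min(len, CHUNK);
  rec_t *rec= static_cast<rec_t*>(std::malloc(sizeof(rec_t) + chunk));
  rec->len= len;
  std::memcpy(rec + 1, body, chunk);   // inline first chunk
  data_t **prev= &rec->data.next;
  while (len != chunk)                 // copy any overflow chunks
  {
    body+= chunk;
    len-= chunk;
    chunk= std::min(len, CHUNK);
    data_t *d= static_cast<data_t*>(std::malloc(sizeof(data_t) + chunk));
    std::memcpy(d + 1, body, chunk);
    *prev= d;
    prev= &d->next;
  }
  *prev= nullptr;                      // terminate the chain
  return rec;
}

// Reassemble the body, as recv_data_copy_to_buf() does in the patch.
static void copy_out(uint8_t *buf, const rec_t *rec)
{
  const data_t *d= &rec->data;         // first chunk is inline
  for (uint32_t len= rec->len; len; d= d->next)
  {
    const uint32_t l= std::min(len, CHUNK);
    std::memcpy(buf, reinterpret_cast<const uint8_t*>(d + 1), l);
    buf+= l;
    len-= l;
  }
}

int main()
{
  uint8_t body[150], out[150];
  for (unsigned i= 0; i < sizeof body; i++) body[i]= uint8_t(i);
  rec_t *rec= store(body, sizeof body);
  copy_out(out, rec);
  return std::memcmp(body, out, sizeof body) != 0;
}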
parent 2aa1f77e
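
For reference, the caller-side pattern: recv_sys_t::trim() (later in this
diff) walks the per-space map entries and erases a page's entry entirely when
page_recv_t::recs_t::trim() reports that the list became empty. A condensed
sketch of that erase-while-iterating loop, with page_id_t reduced to an
integer key and page_recv_t reduced to a single record's LSN for brevity:

#include <cassert>
#include <cstdint>
#include <map>

typedef uint64_t lsn_t;

// Stand-in for page_recv_t with exactly one buffered record.
struct page_recv_t
{
  lsn_t rec_lsn;
  // true if nothing at or above start_lsn remains buffered
  bool trim(lsn_t start_lsn) const { return rec_lsn < start_lsn; }
};

int main()
{
  std::map<uint32_t, page_recv_t> pages{{1, {10}}, {2, {30}}};
  const lsn_t lsn= 20;
  // Mirror of recv_sys_t::trim(): advance the iterator before a
  // possible erase(), so the loop survives the removal.
  for (auto p= pages.begin(); p != pages.end();)
  {
    auto r= p++;
    if (r->second.trim(lsn))
      pages.erase(r);
  }
  assert(pages.size() == 1 && pages.begin()->first == 2);
}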
@@ -133,32 +133,18 @@ extern void (*log_file_op)(ulint space_id, const byte* flags,
const byte* name, ulint len,
const byte* new_name, ulint new_len);
/** Block of log record data */
struct recv_data_t{
recv_data_t* next; /*!< pointer to the next block or NULL */
/*!< the log record data is stored physically
immediately after this struct, max amount
RECV_DATA_BLOCK_SIZE bytes of it */
};
/** Stored redo log record */
struct log_rec_t
{
log_rec_t(lsn_t lsn) : next(NULL), lsn(lsn) {}
log_rec_t()= delete;
log_rec_t(const log_rec_t&)= delete;
log_rec_t &operator=(const log_rec_t&)= delete;
/** Stored log record struct */
struct recv_t{
/** next record */
recv_t* next;
/** log record body length in bytes */
uint32_t len;
/** log record type */
mlog_id_t type;
recv_data_t* data; /*!< chain of blocks containing the log record
body */
lsn_t start_lsn;/*!< start lsn of the log segment written by
the mtr which generated this log record: NOTE
that this is not necessarily the start lsn of
this log record */
lsn_t end_lsn;/*!< end lsn of the log segment written by
the mtr which generated this log record: NOTE
that this is not necessarily the end lsn of
this log record */
log_rec_t *next;
/** mtr_t::commit_lsn() of the mini-transaction */
const lsn_t lsn;
};
struct recv_dblwr_t {
@@ -180,6 +166,68 @@ struct recv_dblwr_t {
list pages;
};
/** the recovery state and buffered records for a page */
struct page_recv_t
{
/** Recovery state */
enum
{
/** not yet processed */
RECV_NOT_PROCESSED,
/** not processed; the page will be reinitialized */
RECV_WILL_NOT_READ,
/** page is being read */
RECV_BEING_READ,
/** log records are being applied on the page */
RECV_BEING_PROCESSED
} state= RECV_NOT_PROCESSED;
/** log records for a page */
class recs_t
{
/** The first log record */
log_rec_t *head= NULL;
/** The last log record */
log_rec_t *tail= NULL;
public:
/** Append a redo log snippet for the page
@param recs log snippet */
void append(log_rec_t* recs)
{
if (tail)
tail->next= recs;
else
head= recs;
tail= recs;
}
/** Trim old log records for a page
@param start_lsn oldest log sequence number to preserve
@return whether the entire log was trimmed */
inline bool trim(lsn_t start_lsn);
/** @return the last log snippet */
const log_rec_t* last() const { return tail; }
class iterator
{
log_rec_t *cur;
public:
iterator(log_rec_t* rec) : cur(rec) {}
log_rec_t* operator*() const { return cur; }
iterator &operator++() { cur= cur->next; return *this; }
bool operator!=(const iterator& i) const { return cur != i.cur; }
};
iterator begin() { return head; }
iterator end() { return NULL; }
bool empty() const { ut_ad(!head == !tail); return !head; }
inline void clear();
} log;
/** Ignore any earlier redo log records for this page. */
inline void will_not_read();
/** @return whether the log records for the page are being processed */
bool is_being_processed() const { return state == RECV_BEING_PROCESSED; }
};
/** Recovery system data structure */
struct recv_sys_t{
ib_mutex_t mutex; /*!< mutex protecting the fields apply_log_recs,
@@ -236,29 +284,10 @@ struct recv_sys_t{
mem_heap_t* heap; /*!< memory heap of log records and file
addresses*/
/** buffered records waiting to be applied to a page */
struct recs_t
{
/** Recovery state */
enum {
/** not yet processed */
RECV_NOT_PROCESSED,
/** not processed; the page will be reinitialized */
RECV_WILL_NOT_READ,
/** page is being read */
RECV_BEING_READ,
/** log records are being applied on the page */
RECV_BEING_PROCESSED
} state;
/** First log record */
recv_t* log;
/** Last log record */
recv_t* last;
};
using map = std::map<const page_id_t, recs_t,
using map = std::map<const page_id_t, page_recv_t,
std::less<const page_id_t>,
ut_allocator<std::pair<const page_id_t,recs_t>>>;
ut_allocator
<std::pair<const page_id_t, page_recv_t>>>;
/** buffered records waiting to be applied to pages */
map pages;
@@ -58,7 +58,7 @@ Created 9/20/1997 Heikki Tuuri
/** Log records are stored in the hash table in chunks at most of this size;
this must be less than srv_page_size as it is stored in the buffer pool */
#define RECV_DATA_BLOCK_SIZE (MEM_MAX_ALLOC_IN_BUF - sizeof(recv_data_t) - REDZONE_SIZE)
#define RECV_DATA_BLOCK_SIZE (MEM_MAX_ALLOC_IN_BUF - sizeof(recv_t) - REDZONE_SIZE)
/** Read-ahead area in applying log records to file pages */
#define RECV_READ_AHEAD_AREA 32U
@@ -121,6 +121,32 @@ mysql_pfs_key_t recv_writer_thread_key;
/** Is recv_writer_thread active? */
bool recv_writer_thread_active;
/** Stored physiological log record with byte-oriented start/end LSN */
struct recv_t : public log_rec_t
{
recv_t(uint32_t len, mlog_id_t type, lsn_t start_lsn, lsn_t end_lsn) :
log_rec_t(end_lsn), len(len), type(type), start_lsn(start_lsn),
data({NULL})
{}
/** log record body length in bytes */
const uint32_t len;
/** log record type */
const mlog_id_t type;
/** start LSN of the mini-transaction (not necessarily of this record) */
const lsn_t start_lsn;
/** log record */
struct data_t
{
/** pointer to the next chunk, or NULL for the last chunk. The
log record data (at most RECV_DATA_BLOCK_SIZE bytes per chunk)
is stored immediately after this field. */
data_t *next;
} data;
};
#ifndef DBUG_OFF
/** Return string name of the redo log record type.
@param[in] type record log record enum
@@ -313,20 +339,8 @@ inline void recv_sys_t::trim(const page_id_t page_id, lsn_t lsn)
ut_ad(mutex_own(&mutex));
for (recv_sys_t::map::iterator p = pages.lower_bound(page_id);
p != pages.end() && p->first.space() == page_id.space();) {
for (recv_t** prev = &p->second.log;;) {
if (!*prev || (*prev)->start_lsn >= lsn) {
break;
}
DBUG_LOG("ib_log", "Discarding "
<< get_mlog_string((*prev)->type)
<< " for " << p->first << " at "
<< (*prev)->start_lsn);
*prev = (*prev)->next;
}
recv_sys_t::map::iterator r = p++;
if (!r->second.log) {
if (r->second.log.trim(lsn)) {
pages.erase(r);
}
}
@@ -1720,81 +1734,105 @@ inline void recv_sys_t::add(mlog_id_t type, const page_id_t page_id,
ut_ad(type != MLOG_INDEX_LOAD);
ut_ad(type != MLOG_TRUNCATE);
std::pair<map::iterator, bool> p = pages.insert(
map::value_type(page_id, recs_t{recs_t::RECV_NOT_PROCESSED,
NULL, NULL}));
recv_sys_t::recs_t& recs = p.first->second;
ut_ad(p.second == !recs.log);
ut_ad(p.second == !recs.last);
recv_data_t** prev_field;
std::pair<map::iterator, bool> p = pages.emplace(
map::value_type(page_id, page_recv_t()));
page_recv_t& recs = p.first->second;
ut_ad(p.second == recs.log.empty());
switch (type) {
case MLOG_INIT_FILE_PAGE2:
case MLOG_ZIP_PAGE_COMPRESS:
case MLOG_INIT_FREE_PAGE:
/* Ignore any earlier redo log records for this page. */
ut_ad(recs.state == recs_t::RECV_NOT_PROCESSED
|| recs.state == recs_t::RECV_WILL_NOT_READ);
recs.state = recs_t::RECV_WILL_NOT_READ;
recs.will_not_read();
mlog_init.add(page_id, lsn);
recs.last = NULL;
/* fall through */
default:
recv_t** prev = recs.last ? &recs.last->next : &recs.log;
*prev = recs.last = new (mem_heap_alloc(heap, sizeof(recv_t)))
recv_t{NULL, uint32_t(rec_end - body), type, NULL,
lsn, end_lsn};
prev_field = &(*prev)->data;
break;
}
/* Store the log record body in chunks of less than srv_page_size:
heap grows into the buffer pool, and bigger chunks could not
be allocated */
uint32_t len = uint32_t(rec_end - body);
const uint32_t chunk_limit = static_cast<uint32_t>(
RECV_DATA_BLOCK_SIZE);
uint32_t chunk_len = std::min(len, chunk_limit);
while (rec_end > body) {
ulint rec_len = ulint(rec_end - body);
recv_t* recv = new
(mem_heap_alloc(heap, sizeof(recv_t) + chunk_len))
recv_t(len, type, lsn, end_lsn);
memcpy(recv + 1, body, chunk_len);
recs.log.append(recv);
if (rec_len > RECV_DATA_BLOCK_SIZE) {
rec_len = RECV_DATA_BLOCK_SIZE;
if (UNIV_LIKELY(len == chunk_len)) {
return;
}
recv_data_t* recv_data = static_cast<recv_data_t*>(
mem_heap_alloc(heap, sizeof(recv_data_t) + rec_len));
recv_t::data_t** prev_field = &recv->data.next;
do {
body += chunk_len;
len -= chunk_len;
chunk_len = std::min(len, chunk_limit);
recv_t::data_t* recv_data = static_cast<recv_t::data_t*>
(mem_heap_alloc(heap, sizeof(recv_t::data_t)
+ chunk_len));
*prev_field = recv_data;
memcpy(recv_data + 1, body, rec_len);
memcpy(recv_data + 1, body, chunk_len);
prev_field = &recv_data->next;
} while (len != chunk_len);
body += rec_len;
*prev_field = NULL;
}
/** Trim old log records for a page
@param start_lsn oldest log sequence number to preserve
@return whether the entire log was trimmed */
inline bool page_recv_t::recs_t::trim(lsn_t start_lsn)
{
for (log_rec_t** prev= &head;; *prev= (*prev)->next)
{
if (!*prev) return true;
if ((*prev)->lsn >= start_lsn) return false;
}
}
*prev_field = NULL;
inline void page_recv_t::recs_t::clear()
{
head= tail= NULL;
}
/** Ignore any earlier redo log records for this page. */
inline void page_recv_t::will_not_read()
{
ut_ad(state == RECV_NOT_PROCESSED || state == RECV_WILL_NOT_READ);
state= RECV_WILL_NOT_READ;
log.clear();
}
/*********************************************************************//**
Copies the log record body from recv to buf. */
static
static ATTRIBUTE_COLD
void
recv_data_copy_to_buf(
/*==================*/
byte* buf, /*!< in: buffer of length at least recv->len */
const recv_t& recv) /*!< in: log record */
{
const recv_data_t* recv_data = recv.data;
const recv_t::data_t* recv_data = &recv.data;
ulint len = recv.len;
const ulint chunk_limit = static_cast<ulint>(RECV_DATA_BLOCK_SIZE);
do {
const ulint part_len = std::min<ulint>(len,
RECV_DATA_BLOCK_SIZE);
memcpy(buf, &reinterpret_cast<const byte*>(recv_data)[
sizeof(recv_data_t)],
part_len);
const ulint l = std::min(len, chunk_limit);
memcpy(buf, reinterpret_cast<const byte*>(recv_data + 1), l);
recv_data = recv_data->next;
buf += part_len;
len -= part_len;
buf += l;
len -= l;
} while (len);
}
@@ -1817,7 +1855,7 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
ut_ad(!init || init->created);
ut_ad(!init || init->lsn);
ut_ad(block->page.id == p->first);
ut_ad(p->second.state != recv_sys_t::recs_t::RECV_BEING_PROCESSED);
ut_ad(!p->second.is_being_processed());
if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
ib::info() << "Applying log to page " << block->page.id;
@@ -1825,7 +1863,8 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
DBUG_LOG("ib_log", "Applying log to page " << block->page.id);
p->second.state = recv_sys_t::recs_t::RECV_BEING_PROCESSED;
p->second.state = page_recv_t::RECV_BEING_PROCESSED;
mutex_exit(&recv_sys.mutex);
page = block->frame;
@@ -1842,14 +1881,16 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
lsn_t start_lsn = 0, end_lsn = 0;
ut_d(lsn_t recv_start_lsn = 0);
const lsn_t init_lsn = init ? init->lsn : 0;
const ulint chunk_limit = static_cast<ulint>(RECV_DATA_BLOCK_SIZE);
for (const recv_t* recv = p->second.log; recv; recv = recv->next) {
for (const log_rec_t* l : p->second.log) {
const recv_t* recv = static_cast<const recv_t*>(l);
ut_ad(recv->start_lsn);
ut_ad(recv->end_lsn);
ut_ad(recv->lsn);
ut_ad(recv_start_lsn < recv->start_lsn);
ut_d(recv_start_lsn = recv->start_lsn);
ut_ad(end_lsn <= recv->end_lsn);
end_lsn = recv->end_lsn;
ut_ad(end_lsn <= recv->lsn);
end_lsn = recv->lsn;
ut_ad(end_lsn <= log_sys.log.scanned_lsn);
if (recv->start_lsn < page_lsn) {
@@ -1886,15 +1927,16 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
byte* buf;
if (recv->len > RECV_DATA_BLOCK_SIZE) {
if (UNIV_UNLIKELY(recv->len > chunk_limit)) {
/* We have to copy the record body to
a separate buffer */
buf = static_cast<byte*>
(ut_malloc_nokey(recv->len));
recv_data_copy_to_buf(buf, *recv);
} else {
buf = reinterpret_cast<byte*>(recv->data)
+ sizeof *recv->data;
buf = const_cast<byte*>
(reinterpret_cast<const byte*>
(&recv->data + 1));
}
recv_parse_or_apply_log_rec_body(
@@ -1913,7 +1955,7 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
end_lsn);
}
if (recv->len > RECV_DATA_BLOCK_SIZE) {
if (UNIV_UNLIKELY(recv->len > chunk_limit)) {
ut_free(buf);
}
}
@@ -1952,7 +1994,7 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
recv_max_page_lsn = page_lsn;
}
ut_ad(p->second.state == recv_sys_t::recs_t::RECV_BEING_PROCESSED);
ut_ad(p->second.is_being_processed());
ut_ad(!recv_sys.pages.empty());
recv_sys.pages.erase(p);
@@ -1998,8 +2040,7 @@ void recv_recover_page(buf_page_t* bpage)
if (recv_sys.apply_log_recs) {
recv_sys_t::map::iterator p = recv_sys.pages.find(bpage->id);
if (p != recv_sys.pages.end()
&& p->second.state
!= recv_sys_t::recs_t::RECV_BEING_PROCESSED) {
&& !p->second.is_being_processed()) {
recv_recover_page(block, mtr, p);
goto func_exit;
}
@@ -2027,9 +2068,9 @@ static void recv_read_in_area(page_id_t page_id)
i != recv_sys.pages.end()
&& i->first.space() == page_id.space()
&& i->first.page_no() < up_limit; i++) {
if (i->second.state == recv_sys_t::recs_t::RECV_NOT_PROCESSED
if (i->second.state == page_recv_t::RECV_NOT_PROCESSED
&& !buf_page_peek(i->first)) {
i->second.state = recv_sys_t::recs_t::RECV_BEING_READ;
i->second.state = page_recv_t::RECV_BEING_READ;
*p++ = i->first.page_no();
}
}
@@ -2112,15 +2153,15 @@ void recv_apply_hashed_log_recs(bool last_batch)
for (recv_sys_t::map::iterator p = recv_sys.pages.begin();
p != recv_sys.pages.end();) {
const page_id_t page_id = p->first;
recv_sys_t::recs_t& recs = p->second;
ut_ad(recs.log);
page_recv_t& recs = p->second;
ut_ad(!recs.log.empty());
switch (recs.state) {
case recv_sys_t::recs_t::RECV_BEING_READ:
case recv_sys_t::recs_t::RECV_BEING_PROCESSED:
case page_recv_t::RECV_BEING_READ:
case page_recv_t::RECV_BEING_PROCESSED:
p++;
continue;
case recv_sys_t::recs_t::RECV_NOT_PROCESSED:
case page_recv_t::RECV_NOT_PROCESSED:
apply:
mtr.start();
mtr.set_log_mode(MTR_LOG_NONE);
@@ -2137,9 +2178,9 @@ void recv_apply_hashed_log_recs(bool last_batch)
recv_read_in_area(page_id);
}
break;
case recv_sys_t::recs_t::RECV_WILL_NOT_READ:
case page_recv_t::RECV_WILL_NOT_READ:
mlog_init_t::init& i = mlog_init.last(page_id);
const lsn_t end_lsn = recs.last->end_lsn;
const lsn_t end_lsn = recs.log.last()->lsn;
if (end_lsn < i.lsn) {
DBUG_LOG("ib_log", "skip log for page "
<< page_id
@@ -2160,8 +2201,7 @@
if (space->enable_lsn) {
do_read:
space->release_for_io();
recs.state = recv_sys_t::recs_t::
RECV_NOT_PROCESSED;
recs.state = page_recv_t::RECV_NOT_PROCESSED;
goto apply;
}
@@ -3228,7 +3268,7 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace)
for (recv_sys_t::map::iterator p = recv_sys.pages.begin();
p != recv_sys.pages.end();) {
ut_ad(p->second.log);
ut_ad(!p->second.log.empty());
const ulint space = p->first.space();
if (is_predefined_tablespace(space)) {
next: