Commit 51fb39bf authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-19586: Restore pointer indirection for recv_t::data

Something was wrong with the removal of pointer indirection,
because on 32-bit Windows we got crash recovery failures.
Curiously, WITH_ASAN of a 32-bit debug build did not notice anything.

This reverts a part of commit b7fc2c89.

We have a 2MiB recv_sys.buf for the initial buffering. The minimum size
of log_sys.buf would be 16MiB, and that buffer should be practically
unused during recovery. If the buffer pool size is measured in gigabytes,
it would indeed make sense to use the buffer pool for the recovered log
records, perhaps after we have run out of log_sys.buf.

FIXME: allow the entire buf_block_t::frame to be used for buffered
log records, and remove MEM_HEAP_FOR_RECV_SYS. For example, use
buf_page_t::list or buf_page_t::LRU for keeping track of memory that
was allocated for recovery? Most members of buf_block_t
(except buf_block_t::frame) are unused when
block->page.state == BUF_BLOCK_MEMORY.
parent f89436b5
......@@ -58,7 +58,7 @@ Created 9/20/1997 Heikki Tuuri
/** Log records are stored in the hash table in chunks at most of this size;
this must be less than srv_page_size as it is stored in the buffer pool */
#define RECV_DATA_BLOCK_SIZE (MEM_MAX_ALLOC_IN_BUF - sizeof(recv_t) - REDZONE_SIZE)
#define RECV_DATA_BLOCK_SIZE (MEM_MAX_ALLOC_IN_BUF - sizeof(recv_t::data_t) - REDZONE_SIZE)
/** Read-ahead area in applying log records to file pages */
#define RECV_READ_AHEAD_AREA 32U
......@@ -131,16 +131,10 @@ struct recv_t : public log_rec_t
@param type redo log record chunk
@param start_lsn start LSN of the mini-transaction
@param end_lsn end LSN of the mini-transaction
@param body redo log record body
@param body_len redo log record length, in bytes
*/
recv_t(uint32_t len, mlog_id_t type, lsn_t start_lsn, lsn_t end_lsn,
const void* body, size_t body_len) :
log_rec_t(end_lsn), len(len), type(type), start_lsn(start_lsn),
data()
{
memcpy(reinterpret_cast<void*>(this + 1), body, body_len);
}
recv_t(uint32_t len, mlog_id_t type, lsn_t start_lsn, lsn_t end_lsn) :
log_rec_t(end_lsn), len(len), type(type), start_lsn(start_lsn), data(NULL)
{}
/** log record body length in bytes */
const uint32_t len;
......@@ -172,7 +166,7 @@ struct recv_t : public log_rec_t
@param d log snippet
*/
void append(data_t *d) { ut_ad(!next); ut_ad(!d->next); next= d; }
} data;
}* data;
};
......@@ -1759,8 +1753,8 @@ inline void recv_sys_t::add(mlog_id_t type, const page_id_t page_id,
ut_ad(type != MLOG_INDEX_LOAD);
ut_ad(type != MLOG_TRUNCATE);
std::pair<map::iterator, bool> p = pages.insert(map::value_type
(page_id, page_recv_t()));
std::pair<map::iterator, bool> p= pages.insert(map::value_type
(page_id, page_recv_t()));
page_recv_t& recs= p.first->second;
ut_ad(p.second == recs.log.empty());
......@@ -1778,27 +1772,26 @@ inline void recv_sys_t::add(mlog_id_t type, const page_id_t page_id,
heap grows into the buffer pool. */
uint32_t len= uint32_t(rec_end - body);
const uint32_t chunk_limit= static_cast<uint32_t>(RECV_DATA_BLOCK_SIZE);
uint32_t l= std::min(len, chunk_limit);
recv_t* recv= new (mem_heap_alloc(heap, sizeof(recv_t) + l))
recv_t(len, type, lsn, end_lsn, body, l);
recv_t *recv= new (mem_heap_alloc(heap, sizeof(recv_t)))
recv_t(len, type, lsn, end_lsn);
recs.log.append(recv);
if (UNIV_LIKELY(len == l))
return;
recv_t::data_t* prev= &recv->data;
for (recv_t::data_t *prev= NULL;;) {
const uint32_t l= std::min(len, chunk_limit);
recv_t::data_t *d= new (mem_heap_alloc(heap, sizeof(recv_t::data_t) + l))
recv_t::data_t(body, l);
if (prev)
prev->append(d);
else
recv->data= d;
prev= d;
do {
body+= l;
len-= l;
l = std::min(len, chunk_limit);
recv_t::data_t* d= new (mem_heap_alloc(heap, sizeof(recv_t::data_t) + l))
recv_t::data_t(body, l);
prev->append(d);
prev= d;
} while (len != l);
if (!len)
break;
}
}
/** Trim old log records for a page
......@@ -1837,7 +1830,7 @@ recv_data_copy_to_buf(
byte* buf, /*!< in: buffer of length at least recv->len */
const recv_t& recv) /*!< in: log record */
{
const recv_t::data_t* recv_data = &recv.data;
const recv_t::data_t* recv_data = recv.data;
ulint len = recv.len;
const ulint chunk_limit = static_cast<ulint>(RECV_DATA_BLOCK_SIZE);
......@@ -1940,21 +1933,22 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
<< " page " << block->page.id);
byte* buf;
const byte* recs;
if (UNIV_UNLIKELY(recv->len > chunk_limit)) {
/* We have to copy the record body to
a separate buffer */
buf = static_cast<byte*>
recs = buf = static_cast<byte*>
(ut_malloc_nokey(recv->len));
recv_data_copy_to_buf(buf, *recv);
} else {
buf = const_cast<byte*>
(reinterpret_cast<const byte*>
(&recv->data + 1));
buf = NULL;
recs = reinterpret_cast<const byte*>
(recv->data + 1);
}
recv_parse_or_apply_log_rec_body(
recv->type, buf, buf + recv->len,
recv->type, recs, recs + recv->len,
block->page.id.space(),
block->page.id.page_no(), true, block, &mtr);
......@@ -1969,9 +1963,7 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
end_lsn);
}
if (UNIV_UNLIKELY(recv->len > chunk_limit)) {
ut_free(buf);
}
ut_free(buf);
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment