Commit 50324ce6 authored by Marko Mäkelä

MDEV-21351 Replace recv_sys.heap with list of buf_block_t

InnoDB crash recovery used a special type of mem_heap_t that
allocated its backing store from the buffer pool. That incurred
significant overhead, underutilized memory, and limited the maximum
contiguous allocation size of a log record.

recv_sys_t::blocks: A linked list of buf_block_t that are allocated
by buf_block_alloc() for redo log records. Replaces recv_sys_t::heap.
We repurpose buf_block_t::unzip_LRU for linking the elements.
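
As an illustration, the linkage can be pictured with the following
standalone sketch (hypothetical simplified types, not the actual InnoDB
declarations). A block that recv_sys owns is never on the real
unzip_LRU list at the same time, so that intrusive node is free to
borrow:

    /* Minimal sketch, assuming stand-in types; block, list_node and
    block_list are hypothetical, not InnoDB source. */
    struct block;

    struct list_node {
        block *prev = nullptr;
        block *next = nullptr;
    };

    struct block {
        unsigned char frame[16384]; /* page-sized backing store */
        list_node unzip_LRU;        /* reused as the recv_sys list link */
    };

    struct block_list {             /* UT_LIST-style base node */
        block *first = nullptr;
        void add_first(block *b) {
            b->unzip_LRU.next = first;
            if (first) first->unzip_LRU.prev = b;
            first = b;
        }
    };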

recv_sys_t::max_log_blocks: Renamed from recv_n_pool_free_frames.

recv_sys_t::max_blocks(): Accessor for max_log_blocks.

recv_sys_t::alloc(): Allocate memory from the current recv_sys_t::blocks
element, or allocate another block. In debug builds, the various free()
member functions must be invoked, because we repurpose
buf_page_t::buf_fix_count for tracking allocations.
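
A rough sketch of the allocation scheme follows (hypothetical names;
the committed code keeps the free offset in buf_block_t::modify_clock,
obtains blocks from buf_block_alloc(), and caps the list at
max_log_blocks):

    /* Minimal sketch, not InnoDB source: bump allocation inside the
    newest block, with a counter standing in for buf_page_t::buf_fix_count
    so that debug builds can check that every record was freed. */
    #include <cstddef>
    #include <vector>

    struct log_block {
        static const std::size_t SIZE = 16384; /* one buffer pool page */
        unsigned char frame[SIZE];
        std::size_t free_offset = 0; /* like modify_clock in this patch */
        unsigned fix_count = 0;      /* stands in for buf_fix_count */
    };

    struct log_block_list {
        std::vector<log_block*> blocks; /* newest block at the back */

        unsigned char *alloc(std::size_t len) {
            if (blocks.empty()
                || blocks.back()->free_offset + len > log_block::SIZE)
                blocks.push_back(new log_block); /* start a fresh block */
            log_block *b = blocks.back();
            unsigned char *p = b->frame + b->free_offset;
            b->free_offset += len;
            ++b->fix_count;          /* debug: one more live record */
            return p;
        }

        void free(const unsigned char *p) {
            for (log_block *b : blocks) /* debug: find the owning block */
                if (p >= b->frame && p < b->frame + log_block::SIZE) {
                    --b->fix_count;  /* memory is reclaimed in bulk later */
                    return;
                }
        }
    };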

recv_sys_t::free_corrupted_page(): Renamed from recv_recover_corrupt_page()

recv_sys_t::is_memory_exhausted(): Renamed from recv_sys_heap_check()

recv_sys_t::pages and its elements are allocated directly by the
system memory allocator.
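
For orientation, a simplified form of the container is sketched below
(the committed declaration in log0recv.h additionally names an explicit
comparator and allocator type):

    /* Simplified sketch of recv_sys_t::pages, not the exact committed
    declaration. Being an ordered std::map rather than a hash table, it
    visits buffered records in page_id order, and its nodes come from
    the system memory allocator instead of the removed recv_sys.heap. */
    #include <map>

    struct page_id_t {
        unsigned space, page_no;
        bool operator<(const page_id_t &other) const {
            return space < other.space
                || (space == other.space && page_no < other.page_no);
        }
    };

    struct page_recv_t { /* buffered redo log records for one page */ };

    using map = std::map<page_id_t, page_recv_t>;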

recv_parse_log_recs(): Remove the parameter available_memory.

We rename the 'store_to_hash' variables to 'store', because
recv_sys.pages is not actually a hash table.

This is joint work with Thirunarayanan Balathandayuthapani.
parent a983b244
@@ -4,7 +4,7 @@ MariaBackup: hot backup tool for InnoDB
Originally Created 3/3/2009 Yasufumi Kinoshita
Written by Alexey Kopytov, Aleksandr Kuzminsky, Stewart Smith, Vadim Tkachenko,
Yasufumi Kinoshita, Ignacio Nin and Baron Schwartz.
(c) 2017, 2019, MariaDB Corporation.
(c) 2017, 2020, MariaDB Corporation.
Portions written by Marko Mäkelä.
This program is free software; you can redistribute it and/or modify
@@ -2680,7 +2680,7 @@ static lsn_t xtrabackup_copy_log(lsn_t start_lsn, lsn_t end_lsn, bool last)
store_t store = STORE_NO;
if (more_data && recv_parse_log_recs(0, &store, 0, false)) {
if (more_data && recv_parse_log_recs(0, &store, false)) {
msg("Error: copying the log failed");
......
@@ -5984,7 +5984,7 @@ buf_page_io_complete(buf_page_t* bpage, bool dblwr, bool evict)
buf_corrupt_page_release(bpage, space);
if (recv_recovery_is_on()) {
recv_recover_corrupt_page(corrupt_page_id);
recv_sys.free_corrupted_page(corrupt_page_id);
}
space->release_for_io();
......
/*****************************************************************************
Copyright (c) 1995, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2015, 2019, MariaDB Corporation.
Copyright (c) 2015, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -766,7 +766,7 @@ buf_read_recv_pages(
ulint count = 0;
buf_pool = buf_pool_get(cur_page_id);
while (buf_pool->n_pend_reads >= recv_n_pool_free_frames / 2) {
while (buf_pool->n_pend_reads >= recv_sys.max_blocks() / 2) {
os_thread_sleep(10000);
......
/*****************************************************************************
Copyright (c) 1997, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2019, MariaDB Corporation.
Copyright (c) 2017, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -47,11 +47,6 @@ dberr_t
recv_find_max_checkpoint(ulint* max_field)
MY_ATTRIBUTE((nonnull, warn_unused_result));
/** Remove records for a corrupted page.
This function should called when srv_force_recovery > 0.
@param[in] page_id page id of the corrupted page */
ATTRIBUTE_COLD void recv_recover_corrupt_page(page_id_t page_id);
/** Apply any buffered redo log to a page that was just read from a data file.
@param[in,out] bpage buffer pool page */
ATTRIBUTE_COLD void recv_recover_page(buf_page_t* bpage);
@@ -106,14 +101,12 @@ bool recv_sys_add_to_parsing_buf(const byte* log_block, lsn_t scanned_lsn);
to wait merging to file pages.
@param[in] checkpoint_lsn the LSN of the latest checkpoint
@param[in] store whether to store page operations
@param[in] available_memory memory to read the redo logs
@param[in] apply whether to apply the records
@return whether MLOG_CHECKPOINT record was seen the first time,
or corruption was noticed */
bool recv_parse_log_recs(
lsn_t checkpoint_lsn,
store_t* store,
ulint available_memory,
bool apply);
/** Moves the parsing buffer data left to the buffer start */
@@ -223,6 +216,10 @@ struct page_recv_t
iterator end() { return NULL; }
bool empty() const { ut_ad(!head == !tail); return !head; }
inline void clear();
#ifdef UNIV_DEBUG
/** Declare the records as freed; @see recv_sys_t::alloc() */
inline void free() const;
#endif
} log;
/** Ignore any earlier redo log records for this page. */
@@ -284,8 +281,6 @@ struct recv_sys_t{
record, or 0 if none was parsed */
/** the time when progress was last reported */
time_t progress_time;
mem_heap_t* heap; /*!< memory heap of log records and file
addresses*/
using map = std::map<const page_id_t, page_recv_t,
std::less<const page_id_t>,
@@ -314,6 +309,26 @@ struct recv_sys_t{
/** Last added LSN to pages. */
lsn_t last_stored_lsn;
private:
/** Maximum number of buffer pool blocks to allocate for redo log records */
ulint max_log_blocks;
/** Base node of the redo block list (up to max_log_blocks)
List elements are linked via buf_block_t::unzip_LRU. */
UT_LIST_BASE_NODE_T(buf_block_t) blocks;
public:
/** @return the maximum number of buffer pool blocks for log records */
ulint max_blocks() const { return max_log_blocks; }
/** Check whether the number of read redo log blocks exceeds the maximum.
Store last_stored_lsn if the recovery is not in the last phase.
@param[in,out] store whether to store page operations
@return whether the memory is exhausted */
inline bool is_memory_exhausted(store_t *store);
#ifdef UNIV_DEBUG
/** whether all redo log in the current batch has been applied */
bool after_apply= false;
#endif
/** Initialize the redo log recovery subsystem. */
void create();
@@ -352,6 +367,32 @@ struct recv_sys_t{
progress_time = time;
return true;
}
/** Get the memory block for storing recv_t and redo log data
@param[in] len length of the data to be stored
@param[in] store_recv whether to store recv_t object
@return pointer to len bytes of memory (never NULL) */
inline byte *alloc(size_t len, bool store_recv= false);
#ifdef UNIV_DEBUG
private:
/** Find the buffer pool block that is storing a redo log record.
@param[in] data pointer to buffer returned by alloc()
@return redo list element */
inline buf_block_t *find_block(const void *data) const;
public:
/** Declare a redo log record freed from a buffer pool block.
@param[in] data pointer to buffer returned by alloc() */
inline void free(const void *data) const;
#endif
/** @return the free length of the latest alloc() block, in bytes */
inline size_t get_free_len() const;
/** Remove records for a corrupted page.
This function should only be called when innodb_force_recovery is set.
@param page_id corrupted page identifier */
ATTRIBUTE_COLD void free_corrupted_page(page_id_t page_id);
};
/** The recovery system */
@@ -392,10 +433,4 @@ times! */
roll-forward */
#define RECV_SCAN_SIZE (4U << srv_page_size_shift)
/** This many frames must be left free in the buffer pool when we scan
the log and store the scanned log records in the buffer pool: we will
use these free frames to read in pages when we start applying the
log records to the database. */
extern ulint recv_n_pool_free_frames;
#endif
/*****************************************************************************
Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2019, MariaDB Corporation.
Copyright (c) 2017, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -59,7 +59,6 @@ buffer pool; the latter method is used for very big heaps */
/** Different type of heaps in terms of which datastructure is using them */
#define MEM_HEAP_FOR_BTR_SEARCH (MEM_HEAP_BTR_SEARCH | MEM_HEAP_BUFFER)
#define MEM_HEAP_FOR_PAGE_HASH (MEM_HEAP_DYNAMIC)
#define MEM_HEAP_FOR_RECV_SYS (MEM_HEAP_BUFFER)
#define MEM_HEAP_FOR_LOCK_HEAP (MEM_HEAP_BUFFER)
/** The following start size is used for the first block in the memory heap if
......
@@ -2,7 +2,7 @@
Copyright (c) 1997, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2012, Facebook Inc.
Copyright (c) 2013, 2019, MariaDB Corporation.
Copyright (c) 2013, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -55,10 +55,6 @@ Created 9/20/1997 Heikki Tuuri
#include "srv0srv.h"
#include "srv0start.h"
/** Log records are stored in the hash table in chunks at most of this size;
this must be less than srv_page_size as it is stored in the buffer pool */
#define RECV_DATA_BLOCK_SIZE (MEM_MAX_ALLOC_IN_BUF - sizeof(recv_t::data_t) - REDZONE_SIZE)
/** Read-ahead area in applying log records to file pages */
#define RECV_READ_AHEAD_AREA 32U
@@ -100,14 +96,6 @@ static ulint recv_previous_parsed_rec_offset;
/** The 'multi' flag of the previous parsed redo log record */
static ulint recv_previous_parsed_rec_is_multi;
/** This many frames must be left free in the buffer pool when we scan
the log and store the scanned log records in the buffer pool: we will
use these free frames to read in pages when we start applying the
log records to the database.
This is the default value. If the actual size of the buffer pool is
larger than 10 MB we'll set this value to 512. */
ulint recv_n_pool_free_frames;
/** The maximum lsn we see for a page during the recovery process. If this
is bigger than the lsn we are able to scan up to, that is an indication that
the recovery failed and the database may be corrupt. */
@@ -145,8 +133,7 @@ struct recv_t : public log_rec_t
struct data_t
{
/** pointer to the next chunk, or NULL for the last chunk. The
log record data (at most RECV_DATA_BLOCK_SIZE bytes per chunk)
is stored immediately after this field. */
log record data is stored immediately after this field. */
data_t *next= NULL;
data_t() {}
@@ -165,6 +152,16 @@ struct recv_t : public log_rec_t
@param d log snippet
*/
void append(data_t *d) { ut_ad(!next); ut_ad(!d->next); next= d; }
#ifdef UNIV_DEBUG
/** Declare the record freed in the buffer pool */
void free()
{
data_t *recv_data= this;
do
recv_sys.free(recv_data);
while ((recv_data= recv_data->next));
}
#endif
}* data;
};
@@ -689,11 +686,6 @@ void recv_sys_t::close()
dblwr.pages.clear();
pages.clear();
if (heap) {
mem_heap_free(heap);
heap = NULL;
}
if (flush_start) {
os_event_destroy(flush_start);
}
@@ -800,8 +792,6 @@ void recv_sys_t::create()
mutex_create(LATCH_ID_RECV_SYS, &mutex);
mutex_create(LATCH_ID_RECV_WRITER, &writer_mutex);
heap = mem_heap_create_typed(256, MEM_HEAP_FOR_RECV_SYS);
if (!srv_read_only_mode) {
flush_start = os_event_create(0);
flush_end = os_event_create(0);
@@ -811,8 +801,7 @@ void recv_sys_t::create()
apply_log_recs = false;
apply_batch_on = false;
recv_n_pool_free_frames = buf_pool_get_n_pages() / 3;
max_log_blocks = buf_pool_get_n_pages() / 3;
buf = static_cast<byte*>(ut_malloc_dontdump(RECV_PARSING_BUF_SIZE));
buf_size = RECV_PARSING_BUF_SIZE;
len = 0;
@@ -830,14 +819,28 @@ void recv_sys_t::create()
memset(truncated_undo_spaces, 0, sizeof truncated_undo_spaces);
last_stored_lsn = 0;
UT_LIST_INIT(blocks, &buf_block_t::unzip_LRU);
}
/** Clear a fully processed set of stored redo log records. */
inline void recv_sys_t::clear()
{
ut_ad(mutex_own(&mutex));
pages.clear();
mem_heap_empty(heap);
ut_ad(mutex_own(&mutex));
apply_log_recs= false;
apply_batch_on= false;
pages.clear();
for (buf_block_t *block= UT_LIST_GET_LAST(blocks); block; )
{
buf_block_t *prev_block= UT_LIST_GET_PREV(unzip_LRU, block);
ut_ad(buf_block_get_state(block) == BUF_BLOCK_MEMORY);
/* Check buf_fix_count after applying all buffered redo log records */
ut_ad(!after_apply || !block->page.buf_fix_count);
UT_LIST_REMOVE(blocks, block);
ut_d(block->page.buf_fix_count= 0);
buf_block_free(block);
block= prev_block;
}
}
/** Free most recovery data structures. */
@@ -848,11 +851,9 @@ void recv_sys_t::debug_free()
mutex_enter(&mutex);
pages.clear();
mem_heap_free(heap);
ut_free_dodump(buf, buf_size);
buf = NULL;
heap = NULL;
/* wake page cleaner up to progress */
if (!srv_read_only_mode) {
@@ -865,6 +866,62 @@ void recv_sys_t::debug_free()
mutex_exit(&mutex);
}
inline size_t recv_sys_t::get_free_len() const
{
if (UT_LIST_GET_LEN(blocks) == 0)
return 0;
return srv_page_size -
static_cast<size_t>(UT_LIST_GET_FIRST(blocks)->modify_clock);
}
inline byte* recv_sys_t::alloc(size_t len, bool store_recv)
{
ut_ad(mutex_own(&mutex));
ut_ad(len);
ut_ad(len <= srv_page_size);
buf_block_t *block= UT_LIST_GET_FIRST(blocks);
if (UNIV_UNLIKELY(!block))
{
create_block:
block= buf_block_alloc(nullptr);
block->modify_clock= len;
UT_LIST_ADD_FIRST(blocks, block);
return block->frame;
}
size_t free_offset= static_cast<size_t>(block->modify_clock);
ut_ad(free_offset <= srv_page_size);
if (store_recv &&
free_offset + len + sizeof(recv_t::data) + 1 > srv_page_size)
goto create_block;
if (free_offset + len > srv_page_size)
goto create_block;
block->modify_clock= free_offset + len;
return block->frame + free_offset;
}
#ifdef UNIV_DEBUG
inline buf_block_t *recv_sys_t::find_block(const void* data) const
{
data= page_align(data);
for (buf_block_t *block= UT_LIST_GET_LAST(blocks);
block; block = UT_LIST_GET_PREV(unzip_LRU, block))
if (block->frame == data)
return block;
ut_ad(0);
return nullptr;
}
inline void recv_sys_t::free(const void *data) const
{
find_block(data)->unfix();
}
#endif
/** Read a log segment to log_sys.buf.
@param[in,out] start_lsn in: read area start,
out: the last read valid lsn
@@ -1758,17 +1815,18 @@ inline void recv_sys_t::add(mlog_id_t type, const page_id_t page_id,
/* Store the log record body in limited-size chunks, because the
heap grows into the buffer pool. */
uint32_t len= uint32_t(rec_end - body);
const uint32_t chunk_limit= static_cast<uint32_t>(RECV_DATA_BLOCK_SIZE);
size_t len= static_cast<size_t>(rec_end - body);
recv_t *recv= new (mem_heap_alloc(heap, sizeof(recv_t)))
recv_t(len, type, lsn, end_lsn);
recv_t *recv= new (alloc(sizeof *recv, true))
recv_t(static_cast<uint32_t>(len), type, lsn, end_lsn);
recs.log.append(recv);
for (recv_t::data_t *prev= NULL;;) {
const uint32_t l= std::min(len, chunk_limit);
recv_t::data_t *d= new (mem_heap_alloc(heap, sizeof(recv_t::data_t) + l))
for (recv_t::data_t *prev= nullptr;;)
{
const size_t l= std::min(len, get_free_len() - sizeof(recv_t::data));
recv_t::data_t *d= new (alloc(sizeof(recv_t::data) + l))
recv_t::data_t(body, l);
ut_d(find_block(d)->fix());
if (prev)
prev->append(d);
else
@@ -1787,16 +1845,26 @@ inline void recv_sys_t::add(mlog_id_t type, const page_id_t page_id,
@return whether the entire log was trimmed */
inline bool page_recv_t::recs_t::trim(lsn_t start_lsn)
{
for (log_rec_t** prev= &head;; *prev= (*prev)->next)
for (log_rec_t** prev= &head; *prev; *prev= (*prev)->next)
{
if (!*prev) return true;
if ((*prev)->lsn >= start_lsn) return false;
ut_d(static_cast<const recv_t*>(*prev)->data->free());
}
return true;
}
#ifdef UNIV_DEBUG
inline void page_recv_t::recs_t::free() const
{
for (const log_rec_t *l= head; l; l= l->next)
static_cast<const recv_t*>(l)->data->free();
}
#endif
inline void page_recv_t::recs_t::clear()
{
head= tail= NULL;
ut_d(free());
head= tail= nullptr;
}
@@ -1820,9 +1888,10 @@ recv_data_copy_to_buf(
{
const recv_t::data_t* recv_data = recv.data;
ulint len = recv.len;
const ulint chunk_limit = static_cast<ulint>(RECV_DATA_BLOCK_SIZE);
do {
ulint offset = page_offset(recv_data + 1);
const ulint chunk_limit = (srv_page_size - offset);
const ulint l = std::min(len, chunk_limit);
memcpy(buf, reinterpret_cast<const byte*>(recv_data + 1), l);
recv_data = recv_data->next;
@@ -1870,7 +1939,6 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
lsn_t start_lsn = 0, end_lsn = 0;
ut_d(lsn_t recv_start_lsn = 0);
const lsn_t init_lsn = init ? init->lsn : 0;
const ulint chunk_limit = static_cast<ulint>(RECV_DATA_BLOCK_SIZE);
for (const log_rec_t* l : p->second.log) {
ut_ad(l->lsn);
@@ -1915,10 +1983,11 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
<< " len " << recv->len
<< " page " << block->page.id);
ulint data_offset = page_offset(recv->data + 1);
byte* buf;
const byte* recs;
if (UNIV_UNLIKELY(recv->len > chunk_limit)) {
if (srv_page_size - data_offset < recv->len) {
/* We have to copy the record body to
a separate buffer */
recs = buf = static_cast<byte*>
@@ -1933,6 +2002,7 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
recv_parse_or_apply_log_rec_body(
recv->type, recs, recs + recv->len,
block->page.id, true, block, &mtr);
ut_free(buf);
end_lsn = recv->start_lsn + recv->len;
@@ -1946,6 +2016,8 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
end_lsn);
}
}
ut_d(recv->data->free(););
}
#ifdef UNIV_ZIP_DEBUG
@@ -1994,13 +2066,18 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
}
/** Remove records for a corrupted page.
This function should called when srv_force_recovery > 0.
@param[in] page_id page id of the corrupted page */
void recv_recover_corrupt_page(page_id_t page_id)
This function should only be called when innodb_force_recovery is set.
@param page_id corrupted page identifier */
ATTRIBUTE_COLD void recv_sys_t::free_corrupted_page(page_id_t page_id)
{
mutex_enter(&recv_sys.mutex);
recv_sys.pages.erase(page_id);
mutex_exit(&recv_sys.mutex);
mutex_enter(&mutex);
#ifdef UNIV_DEBUG
map::const_iterator p= pages.find(page_id);
if (p != pages.end())
p->second.log.free();
#endif
pages.erase(page_id);
mutex_exit(&mutex);
}
/** Apply any buffered redo log to a page that was just read from a data file.
@@ -2174,6 +2251,7 @@ void recv_apply_hashed_log_recs(bool last_batch)
<< " < " << i.lsn);
ignore:
recv_sys_t::map::iterator r = p++;
ut_d(r->second.log.free());
recv_sys.pages.erase(r);
continue;
}
@@ -2296,9 +2374,7 @@ void recv_apply_hashed_log_recs(bool last_batch)
mlog_init.mark_ibuf_exist(mtr);
}
recv_sys.apply_log_recs = false;
recv_sys.apply_batch_on = false;
ut_d(recv_sys.after_apply= true;);
recv_sys.clear();
mutex_exit(&recv_sys.mutex);
@@ -2493,39 +2569,31 @@ recv_mlog_index_load(ulint space_id, ulint page_no, lsn_t lsn)
}
}
/** Check whether read redo log memory exceeds the available memory
of buffer pool. Store last_stored_lsn if it is not in last phase
@param[in] store whether to store page operations
@param[in] available_mem Available memory in buffer pool to
read redo logs. */
static bool recv_sys_heap_check(store_t* store, ulint available_mem)
/** Check whether the number of read redo log blocks exceeds the maximum.
Store last_stored_lsn if the recovery is not in the last phase.
@param[in,out] store whether to store page operations
@return whether the memory is exhausted */
inline bool recv_sys_t::is_memory_exhausted(store_t *store)
{
if (*store != STORE_NO && mem_heap_get_size(recv_sys.heap) >= available_mem)
{
if (*store == STORE_YES)
recv_sys.last_stored_lsn= recv_sys.recovered_lsn;
*store= STORE_NO;
DBUG_PRINT("ib_log",("Ran out of memory and last "
"stored lsn " LSN_PF " last stored offset "
ULINTPF "\n",
recv_sys.recovered_lsn, recv_sys.recovered_offset));
return true;
}
return false;
if (*store == STORE_NO || UT_LIST_GET_LEN(blocks) < max_log_blocks)
return false;
if (*store == STORE_YES)
last_stored_lsn= recovered_lsn;
*store= STORE_NO;
DBUG_PRINT("ib_log",("Ran out of memory and last stored lsn " LSN_PF
" last stored offset " ULINTPF "\n",
recovered_lsn, recovered_offset));
return true;
}
/** Parse log records from a buffer and optionally store them to a
hash table to wait merging to file pages.
@param[in] checkpoint_lsn the LSN of the latest checkpoint
@param[in] store whether to store page operations
@param[in] available_mem memory to read the redo logs
@param[in] apply whether to apply the records
@return whether MLOG_CHECKPOINT record was seen the first time,
or corruption was noticed */
bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t* store,
ulint available_mem, bool apply)
bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t* store, bool apply)
{
bool single_rec;
ulint len;
@@ -2551,7 +2619,7 @@ bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t* store,
/* Check for memory overflow and ignore the parsing of remaining
redo log records if InnoDB ran out of memory */
if (recv_sys_heap_check(store, available_mem) && last_phase) {
if (recv_sys.is_memory_exhausted(store) && last_phase) {
return false;
}
@@ -2925,13 +2993,11 @@ void recv_sys_justify_left_parsing_buf()
/** Scan redo log from a buffer and stores new log data to the parsing buffer.
Parse and hash the log records if new data found.
Apply log records automatically when the hash table becomes full.
@param[in] available_mem we let the hash table of recs to
grow to this size, at the maximum
@param[in,out] store_to_hash whether the records should be
stored to the hash table; this is
@param[in,out] store whether the records should be
stored into recv_sys.pages; this is
reset if just debug checking is
needed, or when the available_mem
runs out
needed, or when the num_max_blocks in
recv_sys runs out
@param[in] log_block log segment
@param[in] checkpoint_lsn latest checkpoint LSN
@param[in] start_lsn buffer start LSN
@@ -2941,8 +3007,7 @@ Apply log records automatically when the hash table becomes full.
@param[out] group_scanned_lsn scanning succeeded upto this lsn
@return true if not able to scan any more in this log group */
static bool recv_scan_log_recs(
ulint available_mem,
store_t* store_to_hash,
store_t* store,
const byte* log_block,
lsn_t checkpoint_lsn,
lsn_t start_lsn,
@@ -2956,7 +3021,7 @@ static bool recv_scan_log_recs(
bool more_data = false;
bool apply = recv_sys.mlog_checkpoint_lsn != 0;
ulint recv_parsing_buf_size = RECV_PARSING_BUF_SIZE;
const bool last_phase = (*store_to_hash == STORE_IF_EXISTS);
const bool last_phase = (*store == STORE_IF_EXISTS);
ut_ad(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
ut_ad(end_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
ut_ad(end_lsn >= start_lsn + OS_FILE_LOG_BLOCK_SIZE);
@@ -3096,9 +3161,7 @@ static bool recv_scan_log_recs(
if (more_data && !recv_sys.found_corrupt_log) {
/* Try to parse more log records */
if (recv_parse_log_recs(checkpoint_lsn,
store_to_hash, available_mem,
apply)) {
if (recv_parse_log_recs(checkpoint_lsn, store, apply)) {
ut_ad(recv_sys.found_corrupt_log
|| recv_sys.found_corrupt_fs
|| recv_sys.mlog_checkpoint_lsn
@@ -3107,7 +3170,7 @@ static bool recv_scan_log_recs(
goto func_exit;
}
recv_sys_heap_check(store_to_hash, available_mem);
recv_sys.is_memory_exhausted(store);
if (recv_sys.recovered_offset > recv_parsing_buf_size / 4) {
/* Move parsing buffer data to the buffer start */
@@ -3116,7 +3179,7 @@ static bool recv_scan_log_recs(
/* Need to re-parse the redo log which're stored
in recv_sys.buf */
if (last_phase && *store_to_hash == STORE_NO) {
if (last_phase && *store == STORE_NO) {
finished = false;
}
}
@@ -3162,18 +3225,16 @@ recv_group_scan_log_recs(
lsn_t start_lsn;
lsn_t end_lsn;
store_t store_to_hash = recv_sys.mlog_checkpoint_lsn == 0
store_t store = recv_sys.mlog_checkpoint_lsn == 0
? STORE_NO : (last_phase ? STORE_IF_EXISTS : STORE_YES);
ulint available_mem = srv_page_size
* (buf_pool_get_n_pages()
- (recv_n_pool_free_frames * srv_buf_pool_instances));
log_sys.log.scanned_lsn = end_lsn = *contiguous_lsn =
ut_uint64_align_down(*contiguous_lsn, OS_FILE_LOG_BLOCK_SIZE);
ut_d(recv_sys.after_apply = last_phase);
do {
if (last_phase && store_to_hash == STORE_NO) {
store_to_hash = STORE_IF_EXISTS;
if (last_phase && store == STORE_NO) {
store = STORE_IF_EXISTS;
recv_apply_hashed_log_recs(false);
/* Rescan the redo logs from last stored lsn */
end_lsn = recv_sys.recovered_lsn;
@@ -3184,11 +3245,9 @@ recv_group_scan_log_recs(
end_lsn = start_lsn;
log_sys.log.read_log_seg(&end_lsn, start_lsn + RECV_SCAN_SIZE);
} while (end_lsn != start_lsn
&& !recv_scan_log_recs(
available_mem, &store_to_hash, log_sys.buf,
checkpoint_lsn,
start_lsn, end_lsn,
contiguous_lsn, &log_sys.log.scanned_lsn));
&& !recv_scan_log_recs(&store, log_sys.buf, checkpoint_lsn,
start_lsn, end_lsn, contiguous_lsn,
&log_sys.log.scanned_lsn));
if (recv_sys.found_corrupt_log || recv_sys.found_corrupt_fs) {
DBUG_RETURN(false);
@@ -3198,7 +3257,7 @@ recv_group_scan_log_recs(
last_phase ? "rescan" : "scan",
log_sys.log.scanned_lsn));
DBUG_RETURN(store_to_hash == STORE_NO);
DBUG_RETURN(store == STORE_NO);
}
/** Report a missing tablespace for which page-redo log exists.
@@ -3274,6 +3333,7 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace)
/* fall through */
case file_name_t::DELETED:
recv_sys_t::map::iterator r = p++;
ut_d(r->second.log.free(););
recv_sys.pages.erase(r);
continue;
}
......