MDEV-21351 Replace recv_sys.heap with list of buf_block_t

- Added a debug assert to unfix the block after applying the redo log
- Added store_recv in recv_sys_t::alloc() to make sure that
recv_t and recv_t::data is in the same block
- Added the assert which check buf_fix_count is 0 after applying
all redo log records.
parent 4f47daf0
...@@ -221,12 +221,16 @@ struct page_recv_t ...@@ -221,12 +221,16 @@ struct page_recv_t
iterator end() { return NULL; } iterator end() { return NULL; }
bool empty() const { ut_ad(!head == !tail); return !head; } bool empty() const { ut_ad(!head == !tail); return !head; }
inline void clear(); inline void clear();
#ifdef UNIV_DEBUG
inline void unfix();
#endif
} log; } log;
/** Ignore any earlier redo log records for this page. */ /** Ignore any earlier redo log records for this page. */
inline void will_not_read(); inline void will_not_read();
/** @return whether the log records for the page are being processed */ /** @return whether the log records for the page are being processed */
bool is_being_processed() const { return state == RECV_BEING_PROCESSED; } bool is_being_processed() const { return state == RECV_BEING_PROCESSED; }
}; };
/** Recovery system data structure */ /** Recovery system data structure */
...@@ -318,6 +322,9 @@ struct recv_sys_t{ ...@@ -318,6 +322,9 @@ struct recv_sys_t{
List elements are linked via buf_block_t::unzip_LRU. */ List elements are linked via buf_block_t::unzip_LRU. */
UT_LIST_BASE_NODE_T(buf_block_t) redo_list; UT_LIST_BASE_NODE_T(buf_block_t) redo_list;
#ifdef UNIV_DEBUG
bool after_apply= false;
#endif
/** Initialize the redo log recovery subsystem. */ /** Initialize the redo log recovery subsystem. */
void create(); void create();
...@@ -359,8 +366,9 @@ struct recv_sys_t{ ...@@ -359,8 +366,9 @@ struct recv_sys_t{
/** Get the memory block for storing recv_t and redo log data /** Get the memory block for storing recv_t and redo log data
@param[in] len length of the data to be stored @param[in] len length of the data to be stored
@param[in] store_recv whether to store recv_t object
@return pointer to len bytes of memory (never NULL) */ @return pointer to len bytes of memory (never NULL) */
inline byte *alloc(uint32_t len); inline byte *alloc(uint32_t len, bool store_recv=false);
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
/** Find the redo_list element corresponding to a redo log record. /** Find the redo_list element corresponding to a redo log record.
...@@ -370,7 +378,7 @@ struct recv_sys_t{ ...@@ -370,7 +378,7 @@ struct recv_sys_t{
#endif #endif
/** @return the free length of the latest alloc() block, in bytes */ /** @return the free length of the latest alloc() block, in bytes */
inline ulong get_free_len() const; inline ulint get_free_len() const;
}; };
/** The recovery system */ /** The recovery system */
......
...@@ -152,6 +152,18 @@ struct recv_t : public log_rec_t ...@@ -152,6 +152,18 @@ struct recv_t : public log_rec_t
@param d log snippet @param d log snippet
*/ */
void append(data_t *d) { ut_ad(!next); ut_ad(!d->next); next= d; } void append(data_t *d) { ut_ad(!next); ut_ad(!d->next); next= d; }
#ifdef UNIV_DEBUG
/** Unfix all the block which contains the data */
void unfix()
{
data_t* recv_data= this;
while(recv_data)
{
recv_sys.find_block(recv_data)->unfix();
recv_data= recv_data->next;
}
}
#endif
}* data; }* data;
}; };
...@@ -826,8 +838,11 @@ inline void recv_sys_t::clear() ...@@ -826,8 +838,11 @@ inline void recv_sys_t::clear()
{ {
prev_block= UT_LIST_GET_PREV(unzip_LRU, block); prev_block= UT_LIST_GET_PREV(unzip_LRU, block);
ut_ad(buf_block_get_state(block) == BUF_BLOCK_MEMORY); ut_ad(buf_block_get_state(block) == BUF_BLOCK_MEMORY);
ut_ad(block->page.buf_fix_count == 0); /* Check buf_fix_count after applying all buffered redo log records */
ut_ad(!recv_sys.after_apply
|| block->page.buf_fix_count == 0);
UT_LIST_REMOVE(redo_list, block); UT_LIST_REMOVE(redo_list, block);
ut_d(block->page.buf_fix_count= 0;);
buf_block_free(block); buf_block_free(block);
} }
} }
...@@ -855,15 +870,16 @@ void recv_sys_t::debug_free() ...@@ -855,15 +870,16 @@ void recv_sys_t::debug_free()
mutex_exit(&mutex); mutex_exit(&mutex);
} }
inline ulong recv_sys_t::get_free_len() const inline ulint recv_sys_t::get_free_len() const
{ {
if (UT_LIST_GET_LEN(redo_list) == 0) if (UT_LIST_GET_LEN(redo_list) == 0)
return 0; return 0;
return srv_page_size - UT_LIST_GET_FIRST(redo_list)->modify_clock; return srv_page_size -
static_cast<ulint>(UT_LIST_GET_FIRST(redo_list)->modify_clock);
} }
inline byte* recv_sys_t::alloc(uint32_t len) inline byte* recv_sys_t::alloc(uint32_t len,bool store_recv)
{ {
ut_ad(mutex_own(&mutex)); ut_ad(mutex_own(&mutex));
ut_ad(len); ut_ad(len);
...@@ -881,8 +897,12 @@ inline byte* recv_sys_t::alloc(uint32_t len) ...@@ -881,8 +897,12 @@ inline byte* recv_sys_t::alloc(uint32_t len)
uint64_t free_offset= block->modify_clock; uint64_t free_offset= block->modify_clock;
ut_ad(free_offset <= srv_page_size); ut_ad(free_offset <= srv_page_size);
free_offset+= len;
if (free_offset > srv_page_size) if (store_recv
&& (free_offset + len + sizeof(recv_t::data) + 1) > srv_page_size)
goto create_block;
if (free_offset + len > srv_page_size)
goto create_block; goto create_block;
block->modify_clock+= len; block->modify_clock+= len;
return block->frame + free_offset; return block->frame + free_offset;
...@@ -1795,7 +1815,8 @@ inline void recv_sys_t::add(mlog_id_t type, const page_id_t page_id, ...@@ -1795,7 +1815,8 @@ inline void recv_sys_t::add(mlog_id_t type, const page_id_t page_id,
heap grows into the buffer pool. */ heap grows into the buffer pool. */
uint32_t len= uint32_t(rec_end - body); uint32_t len= uint32_t(rec_end - body);
recv_t *recv= new (alloc(sizeof(recv_t))) recv_t(len, type, lsn, end_lsn); recv_t *recv= new (alloc(sizeof(recv_t), true))
recv_t(len, type, lsn, end_lsn);
recs.log.append(recv); recs.log.append(recv);
for (recv_t::data_t *prev= nullptr;;) { for (recv_t::data_t *prev= nullptr;;) {
...@@ -1829,8 +1850,22 @@ inline bool page_recv_t::recs_t::trim(lsn_t start_lsn) ...@@ -1829,8 +1850,22 @@ inline bool page_recv_t::recs_t::trim(lsn_t start_lsn)
} }
} }
#ifdef UNIV_DEBUG
inline void page_recv_t::recs_t::unfix()
{
log_rec_t *prev= head;
while (prev)
{
const recv_t *recv= static_cast<const recv_t*>(prev);
recv->data->unfix();
prev= prev->next;
}
}
#endif
inline void page_recv_t::recs_t::clear() inline void page_recv_t::recs_t::clear()
{ {
ut_d(unfix());
head= tail= NULL; head= tail= NULL;
} }
...@@ -1861,12 +1896,6 @@ recv_data_copy_to_buf( ...@@ -1861,12 +1896,6 @@ recv_data_copy_to_buf(
const ulint chunk_limit = (srv_page_size - offset); const ulint chunk_limit = (srv_page_size - offset);
const ulint l = std::min(len, chunk_limit); const ulint l = std::min(len, chunk_limit);
memcpy(buf, reinterpret_cast<const byte*>(recv_data + 1), l); memcpy(buf, reinterpret_cast<const byte*>(recv_data + 1), l);
#ifdef UNIV_DEBUG
if ((ulint(recv.data) ^ ulint(recv_data))
& (srv_page_size - 1)) {
recv_sys.find_block(recv_data)->unfix();
}
#endif
recv_data = recv_data->next; recv_data = recv_data->next;
buf += l; buf += l;
len -= l; len -= l;
...@@ -1976,7 +2005,6 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr, ...@@ -1976,7 +2005,6 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
recv->type, recs, recs + recv->len, recv->type, recs, recs + recv->len,
block->page.id, true, block, &mtr); block->page.id, true, block, &mtr);
ut_d(recv_sys.find_block(recv)->unfix());
ut_free(buf); ut_free(buf);
end_lsn = recv->start_lsn + recv->len; end_lsn = recv->start_lsn + recv->len;
...@@ -1990,6 +2018,8 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr, ...@@ -1990,6 +2018,8 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
end_lsn); end_lsn);
} }
} }
ut_d(recv->data->unfix(););
} }
#ifdef UNIV_ZIP_DEBUG #ifdef UNIV_ZIP_DEBUG
...@@ -2043,6 +2073,12 @@ This function should called when srv_force_recovery > 0. ...@@ -2043,6 +2073,12 @@ This function should called when srv_force_recovery > 0.
void recv_recover_corrupt_page(page_id_t page_id) void recv_recover_corrupt_page(page_id_t page_id)
{ {
mutex_enter(&recv_sys.mutex); mutex_enter(&recv_sys.mutex);
#ifdef UNIV_DEBUG
recv_sys_t::map::iterator p = recv_sys.pages.find(page_id);
if (p != recv_sys.pages.end()) {
p->second.log.unfix();
}
#endif
recv_sys.pages.erase(page_id); recv_sys.pages.erase(page_id);
mutex_exit(&recv_sys.mutex); mutex_exit(&recv_sys.mutex);
} }
...@@ -2340,6 +2376,7 @@ void recv_apply_hashed_log_recs(bool last_batch) ...@@ -2340,6 +2376,7 @@ void recv_apply_hashed_log_recs(bool last_batch)
mlog_init.mark_ibuf_exist(mtr); mlog_init.mark_ibuf_exist(mtr);
} }
ut_d(recv_sys.after_apply= true;);
recv_sys.clear(); recv_sys.clear();
mutex_exit(&recv_sys.mutex); mutex_exit(&recv_sys.mutex);
...@@ -3201,6 +3238,7 @@ recv_group_scan_log_recs( ...@@ -3201,6 +3238,7 @@ recv_group_scan_log_recs(
log_sys.log.scanned_lsn = end_lsn = *contiguous_lsn = log_sys.log.scanned_lsn = end_lsn = *contiguous_lsn =
ut_uint64_align_down(*contiguous_lsn, OS_FILE_LOG_BLOCK_SIZE); ut_uint64_align_down(*contiguous_lsn, OS_FILE_LOG_BLOCK_SIZE);
ut_d(recv_sys.after_apply = last_phase);
do { do {
if (last_phase && store_to_hash == STORE_NO) { if (last_phase && store_to_hash == STORE_NO) {
...@@ -3305,6 +3343,7 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace) ...@@ -3305,6 +3343,7 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace)
/* fall through */ /* fall through */
case file_name_t::DELETED: case file_name_t::DELETED:
recv_sys_t::map::iterator r = p++; recv_sys_t::map::iterator r = p++;
ut_d(r->second.log.unfix(););
recv_sys.pages.erase(r); recv_sys.pages.erase(r);
continue; continue;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment