Commit 9593cccf authored by Marko Mäkelä

MDEV-26055: Improve adaptive flushing

Adaptive flushing is enabled by setting innodb_max_dirty_pages_pct_lwm>0
(not default) and innodb_adaptive_flushing=ON (default).
There is also the parameter innodb_adaptive_flushing_lwm
(default: 10 per cent of the log capacity). It should enable some
adaptive flushing even when innodb_max_dirty_pages_pct_lwm=0.
That is not being changed here.
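
As a loose illustration of how the three parameters interact (a sketch
only; the real logic in page_cleaner_flush_pages_recommendation() also
factors in the rate of redo log generation):

  /* Sketch; the srv_* names are the real globals behind the innodb_*
     parameters, but the actual check is more involved. */
  static bool adaptive_flushing_applies(double dirty_pct, lsn_t age,
                                        lsn_t log_capacity)
  {
    if (!srv_adaptive_flushing)
      return false;
    /* innodb_adaptive_flushing_lwm: percentage of the log capacity */
    if (age >= log_capacity * srv_adaptive_flushing_lwm / 100)
      return true;
    return srv_max_dirty_pages_pct_lwm > 0
        && dirty_pct >= srv_max_dirty_pages_pct_lwm;
  }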

This idea was first presented by Inaam Rana several years ago,
and I discussed it with Jean-François Gagné at FOSDEM 2023.

buf_flush_page_cleaner(): When we are not near the log capacity limit
(neither buf_flush_async_lsn nor buf_flush_sync_lsn are set),
also try to move clean blocks from the buf_pool.LRU list to buf_pool.free
or initiate writes (but not the eviction) of dirty blocks, until
the remaining I/O capacity has been consumed.
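
In outline, the new idle step is as follows (a hedged sketch; the
hypothetical innodb_io_capacity_left() stands for the budget that the
real loop derives from innodb_io_capacity and the pages already
written during the current second):

  if (!buf_flush_sync_lsn && !buf_flush_async_lsn)
  {
    if (ulint budget= innodb_io_capacity_left()) /* hypothetical helper */
    {
      mysql_mutex_lock(&buf_pool.mutex);
      /* evict=false: dirty pages near the LRU tail are written out
         but remain cached; clean pages are moved to buf_pool.free */
      buf_flush_LRU(budget, false);
      mysql_mutex_unlock(&buf_pool.mutex);
      buf_dblwr.flush_buffered_writes();
    }
  }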

buf_flush_LRU_list_batch(): Add the parameter bool evict, to specify
whether dirty least recently used pages (from buf_pool.LRU) should
be evicted immediately after they have been written out. Callers outside
buf_flush_page_cleaner() will pass evict=true, to retain the existing
behaviour.

buf_do_LRU_batch(): Add the parameter bool evict.
Return counts of evicted and flushed pages.

buf_flush_LRU(): Add the parameter bool evict.
Assume that the caller holds buf_pool.mutex and
will invoke buf_dblwr.flush_buffered_writes() afterwards.

buf_flush_list_holding_mutex(): A low-level variant of buf_flush_list()
whose caller must hold buf_pool.mutex and invoke
buf_dblwr.flush_buffered_writes() afterwards.
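
Callers of both functions follow the pattern that is visible in the
buf_LRU_get_free_block() hunk below:

  mysql_mutex_lock(&buf_pool.mutex);
  if (!buf_flush_LRU(innodb_lru_flush_size, true))
    /* 0 means that a buf_pool.LRU batch was already running */
    MONITOR_INC(MONITOR_LRU_SINGLE_FLUSH_FAILURE_COUNT);
  mysql_mutex_unlock(&buf_pool.mutex);
  buf_dblwr.flush_buffered_writes();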

buf_flush_wait_batch_end_acquiring_mutex(): Remove. It is enough to have
buf_flush_wait_batch_end().

page_cleaner_flush_pages_recommendation(): Avoid some floating-point
arithmetic.

buf_flush_page(), buf_flush_check_neighbor(), buf_flush_check_neighbors(),
buf_flush_try_neighbors(): Rename the parameter "bool lru" to "bool evict".

buf_free_from_unzip_LRU_list_batch(): Remove the parameter.
Only actual page writes will contribute towards the limit.

buf_LRU_free_page(): Evict freed pages of temporary tables.

buf_pool.done_free: Broadcast whenever a block is freed
(and buf_pool.try_LRU_scan is set).
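
A thread that needs a free block can therefore sleep on the condition
variable instead of polling; roughly (a sketch, not the exact
buf_LRU_get_free_block() code):

  mysql_mutex_lock(&buf_pool.mutex);
  while (!UT_LIST_GET_LEN(buf_pool.free))
    my_cond_wait(&buf_pool.done_free, &buf_pool.mutex.m_mutex);
  /* now a block can be taken from buf_pool.free */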

buf_pool_t::io_buf_t::reserve(): Retry indefinitely.
During the test encryption.innochecksum we easily run out of
these buffers for PAGE_COMPRESSED or ENCRYPTED pages.

Tested by Matthias Leich and Axel Schwenke
parent 4105017a
@@ -408,7 +408,6 @@ static bool buf_page_decrypt_after_read(buf_page_t *bpage,
if (id.space() == SRV_TMP_SPACE_ID
&& innodb_encrypt_temporary_tables) {
slot = buf_pool.io_buf_reserve();
ut_a(slot);
slot->allocate();
bool ok = buf_tmp_page_decrypt(slot->crypt_buf, dst_frame);
slot->release();
@@ -431,7 +430,6 @@ static bool buf_page_decrypt_after_read(buf_page_t *bpage,
}
slot = buf_pool.io_buf_reserve();
ut_a(slot);
slot->allocate();
decompress_with_slot:
@@ -455,7 +453,6 @@ static bool buf_page_decrypt_after_read(buf_page_t *bpage,
}
slot = buf_pool.io_buf_reserve();
ut_a(slot);
slot->allocate();
/* decrypt using crypt_buf to dst_frame */
@@ -1293,6 +1290,41 @@ inline bool buf_pool_t::realloc(buf_block_t *block)
return(true); /* free_list was enough */
}
void buf_pool_t::io_buf_t::create(ulint n_slots)
{
this->n_slots= n_slots;
slots= static_cast<buf_tmp_buffer_t*>
(ut_malloc_nokey(n_slots * sizeof *slots));
memset((void*) slots, 0, n_slots * sizeof *slots);
}
void buf_pool_t::io_buf_t::close()
{
for (buf_tmp_buffer_t *s= slots, *e= slots + n_slots; s != e; s++)
{
aligned_free(s->crypt_buf);
aligned_free(s->comp_buf);
}
ut_free(slots);
slots= nullptr;
n_slots= 0;
}
buf_tmp_buffer_t *buf_pool_t::io_buf_t::reserve()
{
for (;;)
{
/* Try to acquire an unused slot. */
for (buf_tmp_buffer_t *s= slots, *e= slots + n_slots; s != e; s++)
if (s->acquire())
return s;
/* All slots were busy: wait for page writes to release theirs. */
os_aio_wait_until_no_pending_writes();
for (buf_tmp_buffer_t *s= slots, *e= slots + n_slots; s != e; s++)
if (s->acquire())
return s;
/* Still nothing: wait for reads as well, then retry indefinitely. */
os_aio_wait_until_no_pending_reads();
}
}
/** Sets the global variable that feeds MySQL's innodb_buffer_pool_resize_status
to the specified string. The format and the following parameters are the
same as the ones used for printf(3).
@@ -1359,21 +1391,23 @@ inline bool buf_pool_t::withdraw_blocks()
block = next_block;
}
mysql_mutex_unlock(&mutex);
/* reserve free_list length */
if (UT_LIST_GET_LEN(withdraw) < withdraw_target) {
buf_flush_LRU(
std::max<ulint>(withdraw_target
- UT_LIST_GET_LEN(withdraw),
srv_LRU_scan_depth));
buf_flush_wait_batch_end_acquiring_mutex(true);
srv_LRU_scan_depth),
true);
mysql_mutex_unlock(&buf_pool.mutex);
buf_dblwr.flush_buffered_writes();
mysql_mutex_lock(&buf_pool.mutex);
buf_flush_wait_batch_end(true);
}
/* relocate blocks/buddies in withdrawn area */
ulint count2 = 0;
mysql_mutex_lock(&mutex);
buf_pool_mutex_exit_forbid();
for (buf_page_t* bpage = UT_LIST_GET_FIRST(LRU), *next_bpage;
bpage; bpage = next_bpage) {
@@ -2380,11 +2414,6 @@ buf_page_get_low(
|| (rw_latch == RW_X_LATCH)
|| (rw_latch == RW_SX_LATCH)
|| (rw_latch == RW_NO_LATCH));
ut_ad(!allow_ibuf_merge
|| mode == BUF_GET
|| mode == BUF_GET_POSSIBLY_FREED
|| mode == BUF_GET_IF_IN_POOL
|| mode == BUF_GET_IF_IN_POOL_OR_WATCH);
if (err) {
*err = DB_SUCCESS;
@@ -2392,15 +2421,15 @@ buf_page_get_low(
#ifdef UNIV_DEBUG
switch (mode) {
case BUF_PEEK_IF_IN_POOL:
default:
ut_ad(!allow_ibuf_merge);
ut_ad(mode == BUF_PEEK_IF_IN_POOL);
break;
case BUF_GET_POSSIBLY_FREED:
case BUF_GET_IF_IN_POOL:
/* The caller may pass a dummy page size,
because it does not really matter. */
break;
default:
MY_ASSERT_UNREACHABLE();
case BUF_GET_POSSIBLY_FREED:
break;
case BUF_GET:
case BUF_GET_IF_IN_POOL_OR_WATCH:
ut_ad(!mtr->is_freeing_tree());
@@ -2547,11 +2576,12 @@ buf_page_get_low(
return nullptr;
}
} else if (mode != BUF_PEEK_IF_IN_POOL) {
} else if (!mtr) {
ut_ad(!block->page.oldest_modification());
mysql_mutex_lock(&buf_pool.mutex);
block->unfix();
free_unfixed_block:
if (!buf_LRU_free_page(&block->page, true)) {
ut_ad(0);
}
@@ -2667,20 +2697,19 @@ buf_page_get_low(
/* Decompress the page while not holding
buf_pool.mutex. */
auto ok = buf_zip_decompress(block, false);
block->page.read_unfix(state);
state = block->page.state();
block->page.lock.x_unlock();
const auto ok = buf_zip_decompress(block, false);
--buf_pool.n_pend_unzip;
if (!ok) {
/* FIXME: Evict the corrupted
ROW_FORMAT=COMPRESSED page! */
if (err) {
*err = DB_PAGE_CORRUPTED;
}
return nullptr;
mysql_mutex_lock(&buf_pool.mutex);
}
state = block->page.read_unfix(state);
block->page.lock.x_unlock();
if (!ok) {
goto free_unfixed_block;
}
}
@@ -2886,72 +2915,73 @@ buf_page_get_gen(
dberr_t* err,
bool allow_ibuf_merge)
{
if (buf_block_t *block= recv_sys.recover(page_id))
buf_block_t *block= recv_sys.recover(page_id);
if (UNIV_LIKELY(!block))
return buf_page_get_low(page_id, zip_size, rw_latch,
guess, mode, mtr, err, allow_ibuf_merge);
else if (UNIV_UNLIKELY(block == reinterpret_cast<buf_block_t*>(-1)))
{
if (UNIV_UNLIKELY(block == reinterpret_cast<buf_block_t*>(-1)))
{
corrupted:
if (err)
*err= DB_CORRUPTION;
return nullptr;
}
/* Recovery is a special case; we fix() before acquiring lock. */
auto s= block->page.fix();
ut_ad(s >= buf_page_t::FREED);
/* The block may be write-fixed at this point because we are not
holding a lock, but it must not be read-fixed. */
ut_ad(s < buf_page_t::READ_FIX || s >= buf_page_t::WRITE_FIX);
corrupted:
if (err)
*err= DB_SUCCESS;
const bool must_merge= allow_ibuf_merge &&
ibuf_page_exists(page_id, block->zip_size());
*err= DB_CORRUPTION;
return nullptr;
}
/* Recovery is a special case; we fix() before acquiring lock. */
auto s= block->page.fix();
ut_ad(s >= buf_page_t::FREED);
/* The block may be write-fixed at this point because we are not
holding a lock, but it must not be read-fixed. */
ut_ad(s < buf_page_t::READ_FIX || s >= buf_page_t::WRITE_FIX);
if (err)
*err= DB_SUCCESS;
const bool must_merge= allow_ibuf_merge &&
ibuf_page_exists(page_id, block->zip_size());
if (s < buf_page_t::UNFIXED)
{
got_freed_page:
ut_ad(mode == BUF_GET_POSSIBLY_FREED || mode == BUF_PEEK_IF_IN_POOL);
mysql_mutex_lock(&buf_pool.mutex);
block->page.unfix();
buf_LRU_free_page(&block->page, true);
mysql_mutex_unlock(&buf_pool.mutex);
goto corrupted;
}
else if (must_merge &&
fil_page_get_type(block->page.frame) == FIL_PAGE_INDEX &&
page_is_leaf(block->page.frame))
{
block->page.lock.x_lock();
s= block->page.state();
ut_ad(s > buf_page_t::FREED);
ut_ad(s < buf_page_t::READ_FIX);
if (s < buf_page_t::UNFIXED)
{
got_freed_page:
ut_ad(mode == BUF_GET_POSSIBLY_FREED || mode == BUF_PEEK_IF_IN_POOL);
block->page.unfix();
goto corrupted;
block->page.lock.x_unlock();
goto got_freed_page;
}
else if (must_merge &&
fil_page_get_type(block->page.frame) == FIL_PAGE_INDEX &&
page_is_leaf(block->page.frame))
else
{
block->page.lock.x_lock();
s= block->page.state();
ut_ad(s > buf_page_t::FREED);
ut_ad(s < buf_page_t::READ_FIX);
if (s < buf_page_t::UNFIXED)
if (block->page.is_ibuf_exist())
block->page.clear_ibuf_exist();
if (dberr_t e=
ibuf_merge_or_delete_for_page(block, page_id, block->zip_size()))
{
block->page.lock.x_unlock();
goto got_freed_page;
}
else
{
if (block->page.is_ibuf_exist())
block->page.clear_ibuf_exist();
if (dberr_t e=
ibuf_merge_or_delete_for_page(block, page_id, block->zip_size()))
{
if (err)
*err= e;
buf_pool.corrupted_evict(&block->page, s);
return nullptr;
}
if (err)
*err= e;
buf_pool.corrupted_evict(&block->page, s);
return nullptr;
}
}
if (rw_latch == RW_X_LATCH)
{
mtr->memo_push(block, MTR_MEMO_PAGE_X_FIX);
return block;
}
block->page.lock.x_unlock();
if (rw_latch == RW_X_LATCH)
{
mtr->memo_push(block, MTR_MEMO_PAGE_X_FIX);
return block;
}
mtr->page_lock(block, rw_latch);
return block;
block->page.lock.x_unlock();
}
return buf_page_get_low(page_id, zip_size, rw_latch,
guess, mode, mtr, err, allow_ibuf_merge);
mtr->page_lock(block, rw_latch);
return block;
}
/********************************************************************//**
@@ -402,6 +402,7 @@ buf_block_t *buf_LRU_get_free_block(bool have_mutex)
&& recv_sys.apply_log_recs) {
goto flush_lru;
});
get_mutex:
mysql_mutex_lock(&buf_pool.mutex);
got_mutex:
buf_LRU_check_size_of_non_data_objects();
@@ -490,15 +491,18 @@ buf_block_t *buf_LRU_get_free_block(bool have_mutex)
#ifndef DBUG_OFF
flush_lru:
#endif
if (!buf_flush_LRU(innodb_lru_flush_size)) {
mysql_mutex_lock(&buf_pool.mutex);
if (!buf_flush_LRU(innodb_lru_flush_size, true)) {
MONITOR_INC(MONITOR_LRU_SINGLE_FLUSH_FAILURE_COUNT);
++flush_failures;
}
n_iterations++;
mysql_mutex_lock(&buf_pool.mutex);
buf_pool.stat.LRU_waits++;
goto got_mutex;
mysql_mutex_unlock(&buf_pool.mutex);
buf_dblwr.flush_buffered_writes();
goto get_mutex;
}
/** Move the LRU_old pointer so that the length of the old blocks list
@@ -807,50 +811,57 @@ bool buf_LRU_free_page(buf_page_t *bpage, bool zip)
/* We cannot use transactional_lock_guard here,
because buf_buddy_relocate() in buf_buddy_free() could get stuck. */
hash_lock.lock();
lsn_t oldest_modification = bpage->oldest_modification_acquire();
const lsn_t oldest_modification = bpage->oldest_modification_acquire();
if (UNIV_UNLIKELY(!bpage->can_relocate())) {
/* Do not free buffer fixed and I/O-fixed blocks. */
goto func_exit;
}
if (oldest_modification == 1) {
switch (oldest_modification) {
case 2:
ut_ad(id.space() == SRV_TMP_SPACE_ID);
ut_ad(!bpage->zip.data);
if (!bpage->is_freed()) {
goto func_exit;
}
bpage->clear_oldest_modification();
break;
case 1:
mysql_mutex_lock(&buf_pool.flush_list_mutex);
oldest_modification = bpage->oldest_modification();
if (oldest_modification) {
ut_ad(oldest_modification == 1);
if (const lsn_t om = bpage->oldest_modification()) {
ut_ad(om == 1);
buf_pool.delete_from_flush_list(bpage);
}
mysql_mutex_unlock(&buf_pool.flush_list_mutex);
ut_ad(!bpage->oldest_modification());
oldest_modification = 0;
}
if (zip || !bpage->zip.data) {
/* This would completely free the block. */
/* Do not completely free dirty blocks. */
if (oldest_modification) {
goto func_exit;
/* fall through */
case 0:
if (zip || !bpage->zip.data || !bpage->frame) {
break;
}
} else if (oldest_modification && !bpage->frame) {
func_exit:
hash_lock.unlock();
return(false);
} else if (bpage->frame) {
relocate_compressed:
b = static_cast<buf_page_t*>(ut_zalloc_nokey(sizeof *b));
ut_a(b);
mysql_mutex_lock(&buf_pool.flush_list_mutex);
new (b) buf_page_t(*bpage);
b->frame = nullptr;
b->set_state(buf_page_t::UNFIXED + 1);
break;
default:
if (zip || !bpage->zip.data || !bpage->frame) {
/* This would completely free the block. */
/* Do not completely free dirty blocks. */
func_exit:
hash_lock.unlock();
return(false);
}
goto relocate_compressed;
}
mysql_mutex_assert_owner(&buf_pool.mutex);
DBUG_PRINT("ib_buf", ("free page %u:%u",
id.space(), id.page_no()));
DBUG_PRINT("ib_buf", ("free page %u:%u", id.space(), id.page_no()));
ut_ad(bpage->can_relocate());
@@ -724,13 +724,14 @@ class buf_page_t
ut_ad(s < REINIT);
}
void read_unfix(uint32_t s)
uint32_t read_unfix(uint32_t s)
{
ut_ad(lock.is_write_locked());
ut_ad(s == UNFIXED + 1 || s == IBUF_EXIST + 1 || s == REINIT + 1);
ut_d(auto old_state=) zip.fix.fetch_add(s - READ_FIX);
uint32_t old_state= zip.fix.fetch_add(s - READ_FIX);
ut_ad(old_state >= READ_FIX);
ut_ad(old_state < WRITE_FIX);
return old_state + (s - READ_FIX);
}
void set_freed(uint32_t prev_state, uint32_t count= 0)
@@ -782,10 +783,10 @@ class buf_page_t
inline void write_complete(bool temporary);
/** Write a flushable page to a file. buf_pool.mutex must be held.
@param lru true=buf_pool.LRU; false=buf_pool.flush_list
@param evict whether to evict the page on write completion
@param space tablespace
@return whether the page was flushed and buf_pool.mutex was released */
inline bool flush(bool lru, fil_space_t *space);
inline bool flush(bool evict, fil_space_t *space);
/** Notify that a page in a temporary tablespace has been modified. */
void set_temp_modified()
@@ -1546,9 +1547,6 @@ class buf_pool_t
/** broadcast when n_flush_list reaches 0; protected by mutex */
pthread_cond_t done_flush_list;
TPOOL_SUPPRESS_TSAN ulint n_flush_LRU() const { return n_flush_LRU_; }
TPOOL_SUPPRESS_TSAN ulint n_flush_list() const { return n_flush_list_; }
/** @name General fields */
/* @{ */
ulint curr_pool_size; /*!< Current pool size in bytes */
@@ -1755,7 +1753,7 @@ class buf_pool_t
last_activity_count= activity_count;
}
// n_flush_LRU() + n_flush_list()
// n_flush_LRU_ + n_flush_list_
// is approximately COUNT(is_write_fixed()) in flush_list
unsigned freed_page_clock;/*!< a sequence number used
@@ -1785,7 +1783,8 @@ class buf_pool_t
UT_LIST_BASE_NODE_T(buf_page_t) free;
/*!< base node of the free
block list */
/** signaled each time when the free list grows; protected by mutex */
/** signaled each time when the free list grows and
broadcast each time try_LRU_scan is set; protected by mutex */
pthread_cond_t done_free;
UT_LIST_BASE_NODE_T(buf_page_t) withdraw;
@@ -1851,9 +1850,9 @@ class buf_pool_t
return any_pending;
}
/** @return total amount of pending I/O */
ulint io_pending() const
TPOOL_SUPPRESS_TSAN ulint io_pending() const
{
return n_pend_reads + n_flush_LRU() + n_flush_list();
return n_pend_reads + n_flush_LRU_ + n_flush_list_;
}
private:
@@ -1886,34 +1885,12 @@ class buf_pool_t
/** array of slots */
buf_tmp_buffer_t *slots;
void create(ulint n_slots)
{
this->n_slots= n_slots;
slots= static_cast<buf_tmp_buffer_t*>
(ut_malloc_nokey(n_slots * sizeof *slots));
memset((void*) slots, 0, n_slots * sizeof *slots);
}
void create(ulint n_slots);
void close()
{
for (buf_tmp_buffer_t *s= slots, *e= slots + n_slots; s != e; s++)
{
aligned_free(s->crypt_buf);
aligned_free(s->comp_buf);
}
ut_free(slots);
slots= nullptr;
n_slots= 0;
}
void close();
/** Reserve a buffer */
buf_tmp_buffer_t *reserve()
{
for (buf_tmp_buffer_t *s= slots, *e= slots + n_slots; s != e; s++)
if (s->acquire())
return s;
return nullptr;
}
buf_tmp_buffer_t *reserve();
} io_buf;
/** whether resize() is in the critical path */
@@ -2002,7 +1979,10 @@ inline void buf_page_t::set_oldest_modification(lsn_t lsn)
/** Clear oldest_modification after removing from buf_pool.flush_list */
inline void buf_page_t::clear_oldest_modification()
{
mysql_mutex_assert_owner(&buf_pool.flush_list_mutex);
#ifdef SAFE_MUTEX
if (oldest_modification() != 2)
mysql_mutex_assert_owner(&buf_pool.flush_list_mutex);
#endif /* SAFE_MUTEX */
ut_d(const auto s= state());
ut_ad(s >= REMOVE_HASH);
ut_ad(oldest_modification());
@@ -86,11 +86,15 @@ buf_flush_init_for_writing(
bool buf_flush_list_space(fil_space_t *space, ulint *n_flushed= nullptr)
MY_ATTRIBUTE((warn_unused_result));
/** Write out dirty blocks from buf_pool.LRU.
/** Write out dirty blocks from buf_pool.LRU,
and move clean blocks to buf_pool.free.
The caller must invoke buf_dblwr.flush_buffered_writes()
after releasing buf_pool.mutex.
@param max_n wished maximum number of blocks flushed
@return the number of processed pages
@param evict whether to evict pages after flushing
@return evict ? number of processed pages : number of pages written
@retval 0 if a buf_pool.LRU batch is already running */
ulint buf_flush_LRU(ulint max_n);
ulint buf_flush_LRU(ulint max_n, bool evict);
/** Wait until a flush batch ends.
@param lru true=buf_pool.LRU; false=buf_pool.flush_list */
@@ -131,9 +135,6 @@ inline void buf_flush_note_modification(buf_block_t *b, lsn_t start, lsn_t end)
/** Initialize page_cleaner. */
ATTRIBUTE_COLD void buf_flush_page_cleaner_init();
/** Wait for pending flushes to complete. */
void buf_flush_wait_batch_end_acquiring_mutex(bool lru);
/** Flush the buffer pool on shutdown. */
ATTRIBUTE_COLD void buf_flush_buffer_pool();