Commit a091d6ac authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-26827 fixup: Do not duplicate io_slots::pending_io_count()

os_aio_pending_reads_approx(), os_aio_pending_reads(): Replaces
buf_pool.n_pend_reads.

os_aio_pending_writes(): Replaces buf_dblwr.pending_writes().

buf_dblwr_t::write_cond, buf_dblwr_t::writes_pending: Remove.
parent 5bada124
......@@ -1058,7 +1058,7 @@ dberr_t btr_cur_t::search_leaf(const dtuple_t *tuple, page_cur_mode_t mode,
if (lock_intention == BTR_INTENTION_DELETE)
{
compress_limit= BTR_CUR_PAGE_COMPRESS_LIMIT(index());
if (buf_pool.n_pend_reads &&
if (os_aio_pending_reads_approx() &&
trx_sys.history_size_approx() > BTR_CUR_FINE_HISTORY_LENGTH)
{
/* Most delete-intended operations are due to the purge of history.
......@@ -1843,7 +1843,7 @@ dberr_t btr_cur_t::open_leaf(bool first, dict_index_t *index,
{
compress_limit= BTR_CUR_PAGE_COMPRESS_LIMIT(index);
if (buf_pool.n_pend_reads &&
if (os_aio_pending_reads_approx() &&
trx_sys.history_size_approx() > BTR_CUR_FINE_HISTORY_LENGTH)
{
mtr_x_lock_index(index, mtr);
......
......@@ -3539,9 +3539,6 @@ dberr_t buf_page_t::read_complete(const fil_node_t &node)
ut_ad(zip_size() == node.space->zip_size());
ut_ad(!!zip.ssize == !!zip.data);
ut_d(auto n=) buf_pool.n_pend_reads--;
ut_ad(n > 0);
const byte *read_frame= zip.data ? zip.data : frame;
ut_ad(read_frame);
......@@ -3841,9 +3838,8 @@ void buf_pool_t::print()
<< ", modified database pages="
<< UT_LIST_GET_LEN(flush_list)
<< ", n pending decompressions=" << n_pend_unzip
<< ", n pending reads=" << n_pend_reads
<< ", n pending flush LRU=" << n_flush()
<< " list=" << buf_dblwr.pending_writes()
<< " list=" << os_aio_pending_writes()
<< ", pages made young=" << stat.n_pages_made_young
<< ", not young=" << stat.n_pages_not_made_young
<< ", pages read=" << stat.n_pages_read
......@@ -3956,11 +3952,11 @@ void buf_stats_get_pool_info(buf_pool_info_t *pool_info)
pool_info->n_pend_unzip = UT_LIST_GET_LEN(buf_pool.unzip_LRU);
pool_info->n_pend_reads = buf_pool.n_pend_reads;
pool_info->n_pend_reads = os_aio_pending_reads_approx();
pool_info->n_pending_flush_lru = buf_pool.n_flush();
pool_info->n_pending_flush_list = buf_dblwr.pending_writes();
pool_info->n_pending_flush_list = os_aio_pending_writes();
mysql_mutex_unlock(&buf_pool.flush_list_mutex);
current_time = time(NULL);
......
......@@ -53,7 +53,6 @@ void buf_dblwr_t::init()
active_slot= &slots[0];
mysql_mutex_init(buf_dblwr_mutex_key, &mutex, nullptr);
pthread_cond_init(&cond, nullptr);
pthread_cond_init(&write_cond, nullptr);
}
}
......@@ -469,7 +468,6 @@ void buf_dblwr_t::close()
ut_ad(!batch_running);
pthread_cond_destroy(&cond);
pthread_cond_destroy(&write_cond);
for (int i= 0; i < 2; i++)
{
aligned_free(slots[i].write_buf);
......@@ -481,38 +479,31 @@ void buf_dblwr_t::close()
}
/** Update the doublewrite buffer on write completion. */
void buf_dblwr_t::write_completed(bool with_doublewrite)
void buf_dblwr_t::write_completed()
{
ut_ad(this == &buf_dblwr);
ut_ad(!srv_read_only_mode);
mysql_mutex_lock(&mutex);
ut_ad(writes_pending);
if (!--writes_pending)
pthread_cond_broadcast(&write_cond);
ut_ad(is_created());
ut_ad(srv_use_doublewrite_buf);
ut_ad(batch_running);
slot *flush_slot= active_slot == &slots[0] ? &slots[1] : &slots[0];
ut_ad(flush_slot->reserved);
ut_ad(flush_slot->reserved <= flush_slot->first_free);
if (with_doublewrite)
if (!--flush_slot->reserved)
{
ut_ad(is_created());
ut_ad(srv_use_doublewrite_buf);
ut_ad(batch_running);
slot *flush_slot= active_slot == &slots[0] ? &slots[1] : &slots[0];
ut_ad(flush_slot->reserved);
ut_ad(flush_slot->reserved <= flush_slot->first_free);
if (!--flush_slot->reserved)
{
mysql_mutex_unlock(&mutex);
/* This will finish the batch. Sync data files to the disk. */
fil_flush_file_spaces();
mysql_mutex_lock(&mutex);
mysql_mutex_unlock(&mutex);
/* This will finish the batch. Sync data files to the disk. */
fil_flush_file_spaces();
mysql_mutex_lock(&mutex);
/* We can now reuse the doublewrite memory buffer: */
flush_slot->first_free= 0;
batch_running= false;
pthread_cond_broadcast(&cond);
}
/* We can now reuse the doublewrite memory buffer: */
flush_slot->first_free= 0;
batch_running= false;
pthread_cond_broadcast(&cond);
}
mysql_mutex_unlock(&mutex);
......@@ -756,7 +747,6 @@ void buf_dblwr_t::add_to_batch(const IORequest &request, size_t size)
const ulint buf_size= 2 * block_size();
mysql_mutex_lock(&mutex);
writes_pending++;
for (;;)
{
......
......@@ -246,7 +246,7 @@ void buf_flush_remove_pages(ulint id)
if (!deferred)
break;
buf_dblwr.wait_for_page_writes();
os_aio_wait_until_no_pending_writes();
}
}
......@@ -376,9 +376,9 @@ void buf_page_write_complete(const IORequest &request)
if (request.is_LRU())
{
const bool temp= bpage->oldest_modification() == 2;
if (!temp)
buf_dblwr.write_completed(state < buf_page_t::WRITE_FIX_REINIT &&
request.node->space->use_doublewrite());
if (!temp && state < buf_page_t::WRITE_FIX_REINIT &&
request.node->space->use_doublewrite())
buf_dblwr.write_completed();
/* We must hold buf_pool.mutex while releasing the block, so that
no other thread can access it before we have freed it. */
mysql_mutex_lock(&buf_pool.mutex);
......@@ -390,8 +390,9 @@ void buf_page_write_complete(const IORequest &request)
}
else
{
buf_dblwr.write_completed(state < buf_page_t::WRITE_FIX_REINIT &&
request.node->space->use_doublewrite());
if (state < buf_page_t::WRITE_FIX_REINIT &&
request.node->space->use_doublewrite())
buf_dblwr.write_completed();
bpage->write_complete(false);
}
}
......@@ -886,8 +887,6 @@ bool buf_page_t::flush(bool evict, fil_space_t *space)
if (lsn > log_sys.get_flushed_lsn())
log_write_up_to(lsn, true);
}
if (UNIV_LIKELY(space->purpose != FIL_TYPE_TEMPORARY))
buf_dblwr.add_unbuffered();
space->io(IORequest{type, this, slot}, physical_offset(), size,
write_frame, this);
}
......@@ -1853,7 +1852,7 @@ static void buf_flush_wait(lsn_t lsn)
break;
}
mysql_mutex_unlock(&buf_pool.flush_list_mutex);
buf_dblwr.wait_for_page_writes();
os_aio_wait_until_no_pending_writes();
mysql_mutex_lock(&buf_pool.flush_list_mutex);
}
}
......@@ -1875,8 +1874,6 @@ ATTRIBUTE_COLD void buf_flush_wait_flushed(lsn_t sync_lsn)
if (buf_pool.get_oldest_modification(sync_lsn) < sync_lsn)
{
MONITOR_INC(MONITOR_FLUSH_SYNC_WAITS);
thd_wait_begin(nullptr, THD_WAIT_DISKIO);
tpool::tpool_wait_begin();
#if 1 /* FIXME: remove this, and guarantee that the page cleaner serves us */
if (UNIV_UNLIKELY(!buf_page_cleaner_is_active))
......@@ -1891,7 +1888,7 @@ ATTRIBUTE_COLD void buf_flush_wait_flushed(lsn_t sync_lsn)
MONITOR_FLUSH_SYNC_COUNT,
MONITOR_FLUSH_SYNC_PAGES, n_pages);
}
buf_dblwr.wait_for_page_writes();
os_aio_wait_until_no_pending_writes();
mysql_mutex_lock(&buf_pool.flush_list_mutex);
}
while (buf_pool.get_oldest_modification(sync_lsn) < sync_lsn);
......@@ -1900,7 +1897,6 @@ ATTRIBUTE_COLD void buf_flush_wait_flushed(lsn_t sync_lsn)
#endif
buf_flush_wait(sync_lsn);
tpool::tpool_wait_end();
thd_wait_end(nullptr);
}
......@@ -2335,7 +2331,7 @@ static void buf_flush_page_cleaner()
last_activity_count= activity_count;
goto maybe_unemployed;
}
else if (buf_pool.page_cleaner_idle() && buf_pool.n_pend_reads == 0)
else if (buf_pool.page_cleaner_idle() && !os_aio_pending_reads())
{
/* reaching here means 3 things:
- last_activity_count == activity_count: suggesting server is idle
......@@ -2411,7 +2407,7 @@ static void buf_flush_page_cleaner()
mysql_mutex_lock(&buf_pool.flush_list_mutex);
buf_flush_wait_LRU_batch_end();
mysql_mutex_unlock(&buf_pool.flush_list_mutex);
buf_dblwr.wait_for_page_writes();
os_aio_wait_until_no_pending_writes();
}
mysql_mutex_lock(&buf_pool.flush_list_mutex);
......@@ -2461,15 +2457,7 @@ ATTRIBUTE_COLD void buf_flush_buffer_pool()
{
mysql_mutex_unlock(&buf_pool.flush_list_mutex);
buf_flush_list(srv_max_io_capacity);
if (const size_t pending= buf_dblwr.pending_writes())
{
timespec abstime;
service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
"Waiting to write %zu pages", pending);
set_timespec(abstime, INNODB_EXTEND_TIMEOUT_INTERVAL / 2);
buf_dblwr.wait_for_page_writes(abstime);
}
os_aio_wait_until_no_pending_writes();
mysql_mutex_lock(&buf_pool.flush_list_mutex);
service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
"Waiting to flush " ULINTPF " pages",
......@@ -2477,20 +2465,18 @@ ATTRIBUTE_COLD void buf_flush_buffer_pool()
}
mysql_mutex_unlock(&buf_pool.flush_list_mutex);
ut_ad(!buf_pool.any_io_pending());
ut_ad(!os_aio_pending_writes());
ut_ad(!os_aio_pending_reads());
}
/** Synchronously flush dirty blocks during recv_sys_t::apply().
NOTE: The calling thread is not allowed to hold any buffer page latches! */
void buf_flush_sync_batch(lsn_t lsn)
{
thd_wait_begin(nullptr, THD_WAIT_DISKIO);
tpool::tpool_wait_begin();
lsn= std::max(lsn, log_sys.get_lsn());
mysql_mutex_lock(&buf_pool.flush_list_mutex);
buf_flush_wait(lsn);
mysql_mutex_unlock(&buf_pool.flush_list_mutex);
tpool::tpool_wait_end();
thd_wait_end(nullptr);
}
/** Synchronously flush dirty blocks.
......
......@@ -227,12 +227,9 @@ static buf_page_t* buf_page_init_for_read(ulint mode, const page_id_t page_id,
}
buf_pool.stat.n_pages_read++;
mysql_mutex_unlock(&buf_pool.mutex);
buf_pool.n_pend_reads++;
goto func_exit_no_mutex;
func_exit:
mysql_mutex_unlock(&buf_pool.mutex);
func_exit_no_mutex:
if (mode == BUF_READ_IBUF_PAGES_ONLY)
ibuf_mtr_commit(&mtr);
......@@ -319,8 +316,6 @@ buf_read_page_low(
page_id.page_no() * len, len, dst, bpage);
if (UNIV_UNLIKELY(fio.err != DB_SUCCESS)) {
ut_d(auto n=) buf_pool.n_pend_reads--;
ut_ad(n > 0);
buf_pool.corrupted_evict(bpage, buf_page_t::READ_FIX);
} else if (sync) {
thd_wait_end(NULL);
......@@ -367,7 +362,8 @@ buf_read_ahead_random(const page_id_t page_id, ulint zip_size, bool ibuf)
read-ahead, as that could break the ibuf page access order */
return 0;
if (buf_pool.n_pend_reads > buf_pool.curr_size / BUF_READ_AHEAD_PEND_LIMIT)
if (os_aio_pending_reads_approx() >
buf_pool.curr_size / BUF_READ_AHEAD_PEND_LIMIT)
return 0;
fil_space_t* space= fil_space_t::get(page_id.space());
......@@ -519,7 +515,8 @@ buf_read_ahead_linear(const page_id_t page_id, ulint zip_size, bool ibuf)
/* No read-ahead to avoid thread deadlocks */
return 0;
if (buf_pool.n_pend_reads > buf_pool.curr_size / BUF_READ_AHEAD_PEND_LIMIT)
if (os_aio_pending_reads_approx() >
buf_pool.curr_size / BUF_READ_AHEAD_PEND_LIMIT)
return 0;
const uint32_t buf_read_ahead_area= buf_pool.read_ahead_area;
......@@ -689,18 +686,8 @@ void buf_read_recv_pages(ulint space_id, const uint32_t* page_nos, ulint n)
limit += buf_pool.chunks[j].size / 2;
}
for (ulint count = 0; buf_pool.n_pend_reads >= limit; ) {
std::this_thread::sleep_for(
std::chrono::milliseconds(10));
if (!(++count % 1000)) {
ib::error()
<< "Waited for " << count / 100
<< " seconds for "
<< buf_pool.n_pend_reads
<< " pending reads";
}
if (os_aio_pending_reads() >= limit) {
os_aio_wait_until_no_pending_reads();
}
space->reacquire();
......
......@@ -1693,8 +1693,6 @@ class buf_pool_t
/** map of block->frame to buf_block_t blocks that belong
to buf_buddy_alloc(); protected by buf_pool.mutex */
hash_table_t zip_hash;
/** number of pending read operations */
Atomic_counter<ulint> n_pend_reads;
Atomic_counter<ulint>
n_pend_unzip; /*!< number of pending decompressions */
......@@ -1721,7 +1719,7 @@ class buf_pool_t
/** flush_list size in bytes; protected by flush_list_mutex */
ulint flush_list_bytes;
/** possibly modified persistent pages (a subset of LRU);
buf_dblwr.pending_writes() is approximately COUNT(is_write_fixed()) */
os_aio_pending_writes() is approximately COUNT(is_write_fixed()) */
UT_LIST_BASE_NODE_T(buf_page_t) flush_list;
private:
static constexpr unsigned PAGE_CLEANER_IDLE= 1;
......@@ -1874,18 +1872,6 @@ class buf_pool_t
/** Reserve a buffer. */
buf_tmp_buffer_t *io_buf_reserve() { return io_buf.reserve(); }
/** @return whether any I/O is pending */
bool any_io_pending()
{
if (n_pend_reads)
return true;
mysql_mutex_lock(&flush_list_mutex);
const bool any_pending= page_cleaner_status > PAGE_CLEANER_IDLE ||
buf_dblwr.pending_writes();
mysql_mutex_unlock(&flush_list_mutex);
return any_pending;
}
private:
/** Remove a block from the flush list. */
inline void delete_from_flush_list_low(buf_page_t *bpage);
......
......@@ -72,10 +72,6 @@ class buf_dblwr_t
ulint writes_completed;
/** number of pages written by flush_buffered_writes_completed() */
ulint pages_written;
/** condition variable for !writes_pending */
pthread_cond_t write_cond;
/** number of pending page writes */
size_t writes_pending;
slot slots[2];
slot *active_slot;
......@@ -124,7 +120,7 @@ class buf_dblwr_t
void recover();
/** Update the doublewrite buffer on data page write completion. */
void write_completed(bool with_doublewrite);
void write_completed();
/** Flush possible buffered writes to persistent storage.
It is very important to call this function after a batch of writes has been
posted, and also when we may have to wait for a page latch!
......@@ -167,40 +163,6 @@ class buf_dblwr_t
my_cond_wait(&cond, &mutex.m_mutex);
mysql_mutex_unlock(&mutex);
}
/** Register an unbuffered page write */
void add_unbuffered()
{
mysql_mutex_lock(&mutex);
writes_pending++;
mysql_mutex_unlock(&mutex);
}
size_t pending_writes()
{
mysql_mutex_lock(&mutex);
const size_t pending{writes_pending};
mysql_mutex_unlock(&mutex);
return pending;
}
/** Wait for writes_pending to reach 0 */
void wait_for_page_writes()
{
mysql_mutex_lock(&mutex);
while (writes_pending)
my_cond_wait(&write_cond, &mutex.m_mutex);
mysql_mutex_unlock(&mutex);
}
/** Wait for writes_pending to reach 0 */
void wait_for_page_writes(const timespec &abstime)
{
mysql_mutex_lock(&mutex);
while (writes_pending)
my_cond_timedwait(&write_cond, &mutex.m_mutex, &abstime);
mysql_mutex_unlock(&mutex);
}
};
/** The doublewrite buffer */
......
......@@ -1059,6 +1059,13 @@ void os_aio_free();
@retval DB_IO_ERROR on I/O error */
dberr_t os_aio(const IORequest &type, void *buf, os_offset_t offset, size_t n);
/** @return number of pending reads */
size_t os_aio_pending_reads();
/** @return approximate number of pending reads */
size_t os_aio_pending_reads_approx();
/** @return number of pending writes */
size_t os_aio_pending_writes();
/** Wait until there are no pending asynchronous writes. */
void os_aio_wait_until_no_pending_writes();
......
......@@ -3384,7 +3384,7 @@ void recv_sys_t::apply(bool last_batch)
for (;;)
{
const bool empty= pages.empty();
if (empty && !buf_pool.n_pend_reads)
if (empty && !os_aio_pending_reads())
break;
if (!is_corrupt_fs() && !is_corrupt_log())
......@@ -3398,7 +3398,6 @@ void recv_sys_t::apply(bool last_batch)
{
mysql_mutex_unlock(&mutex);
os_aio_wait_until_no_pending_reads();
ut_ad(!buf_pool.n_pend_reads);
mysql_mutex_lock(&mutex);
ut_ad(pages.empty());
}
......
......@@ -131,6 +131,11 @@ class io_slots
{
wait();
}
std::mutex& mutex()
{
return m_cache.mutex();
}
};
static io_slots *read_slots;
......@@ -3660,6 +3665,26 @@ void os_aio_wait_until_no_pending_writes()
buf_dblwr.wait_flush_buffered_writes();
}
/** @return number of pending reads */
size_t os_aio_pending_reads()
{
std::unique_lock<std::mutex> lk(read_slots->mutex());
return read_slots->pending_io_count();
}
/** @return approximate number of pending reads */
size_t os_aio_pending_reads_approx()
{
return read_slots->pending_io_count();
}
/** @return number of pending writes */
size_t os_aio_pending_writes()
{
std::unique_lock<std::mutex> lk(write_slots->mutex());
return write_slots->pending_io_count();
}
/** Wait until all pending asynchronous reads have completed. */
void os_aio_wait_until_no_pending_reads()
{
......
......@@ -249,7 +249,11 @@ static dberr_t create_log_file(bool create_new_db, lsn_t lsn,
}
DBUG_PRINT("ib_log", ("After innodb_log_abort_6"));
DBUG_ASSERT(!buf_pool.any_io_pending());
ut_ad(!os_aio_pending_reads());
ut_ad(!os_aio_pending_writes());
ut_d(mysql_mutex_lock(&buf_pool.flush_list_mutex));
ut_ad(!buf_pool.get_oldest_modification(0));
ut_d(mysql_mutex_unlock(&buf_pool.flush_list_mutex));
DBUG_EXECUTE_IF("innodb_log_abort_7", return DB_ERROR;);
DBUG_PRINT("ib_log", ("After innodb_log_abort_7"));
......@@ -967,7 +971,11 @@ static lsn_t srv_prepare_to_delete_redo_log_file(bool old_exists)
}
ut_ad(flushed_lsn == log_sys.get_lsn());
ut_ad(!buf_pool.any_io_pending());
ut_ad(!os_aio_pending_reads());
ut_ad(!os_aio_pending_writes());
ut_d(mysql_mutex_lock(&buf_pool.flush_list_mutex));
ut_ad(!buf_pool.get_oldest_modification(0));
ut_d(mysql_mutex_unlock(&buf_pool.flush_list_mutex));
DBUG_RETURN(flushed_lsn);
}
......@@ -1601,7 +1609,11 @@ dberr_t srv_start(bool create_new_db)
ut_ad(srv_force_recovery <= SRV_FORCE_IGNORE_CORRUPT);
ut_ad(recv_no_log_write);
err = fil_write_flushed_lsn(log_sys.get_lsn());
DBUG_ASSERT(!buf_pool.any_io_pending());
ut_ad(!os_aio_pending_reads());
ut_ad(!os_aio_pending_writes());
ut_d(mysql_mutex_lock(&buf_pool.flush_list_mutex));
ut_ad(!buf_pool.get_oldest_modification(0));
ut_d(mysql_mutex_unlock(&buf_pool.flush_list_mutex));
log_sys.log.close_file();
if (err == DB_SUCCESS) {
bool trunc = srv_operation
......@@ -1645,7 +1657,11 @@ dberr_t srv_start(bool create_new_db)
threads until creating a log checkpoint at the
end of create_log_file(). */
ut_d(recv_no_log_write = true);
DBUG_ASSERT(!buf_pool.any_io_pending());
ut_ad(!os_aio_pending_reads());
ut_ad(!os_aio_pending_writes());
ut_d(mysql_mutex_lock(&buf_pool.flush_list_mutex));
ut_ad(!buf_pool.get_oldest_modification(0));
ut_d(mysql_mutex_unlock(&buf_pool.flush_list_mutex));
DBUG_EXECUTE_IF("innodb_log_abort_3",
return(srv_init_abort(DB_ERROR)););
......
......@@ -130,9 +130,12 @@ template<typename T> class cache
return m_cache[m_pos++];
}
/**
Put back an item to cache.
@param item - item to put back
std::mutex &mutex() { return m_mtx; }
/**
Put back an element to cache.
@param ele element to put back
*/
void put(T *ele)
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment