Commit 30ea63b7 authored by Vladislav Vaintroub

MDEV-21534 - Improve innodb redo log group commit performance

Introduce a special synchronization primitive, group_commit_lock,
for more efficient synchronization of redo log writing and flushing.

The goal is to reduce CPU consumption in log_write_up_to(), reduce
spurious wakeups, and improve throughput in write-intensive
benchmarks.
parent 607960c7
......@@ -203,9 +203,9 @@ IF(MSVC)
ENDIF()
ENDIF()
# Always link with socket library
STRING(APPEND CMAKE_C_STANDARD_LIBRARIES " ws2_32.lib")
STRING(APPEND CMAKE_CXX_STANDARD_LIBRARIES " ws2_32.lib")
# Always link with socket/synchronization libraries
STRING(APPEND CMAKE_C_STANDARD_LIBRARIES " ws2_32.lib synchronization.lib")
STRING(APPEND CMAKE_CXX_STANDARD_LIBRARIES " ws2_32.lib synchronization.lib")
# System checks
SET(SIGNAL_WITH_VIO_CLOSE 1) # Something that runtime team needs
......
......@@ -84,6 +84,7 @@ SET(INNOBASE_SOURCES
log/log0log.cc
log/log0recv.cc
log/log0crypt.cc
log/log0sync.cc
mem/mem0mem.cc
mtr/mtr0mtr.cc
os/os0file.cc
......
......@@ -577,8 +577,6 @@ struct log_t{
MY_ALIGNED(CACHE_LINE_SIZE)
LogSysMutex mutex; /*!< mutex protecting the log */
MY_ALIGNED(CACHE_LINE_SIZE)
LogSysMutex write_mutex; /*!< mutex protecting writing to log */
MY_ALIGNED(CACHE_LINE_SIZE)
FlushOrderMutex log_flush_order_mutex;/*!< mutex to serialize access to
the flush list when we are putting
dirty blocks in the list. The idea
......@@ -710,13 +708,7 @@ struct log_t{
AND flushed to disk */
std::atomic<size_t> pending_flushes; /*!< system calls in progress */
std::atomic<size_t> flushes; /*!< system calls counter */
ulint n_pending_flushes;/*!< number of currently
pending flushes; protected by
log_sys.mutex */
os_event_t flush_event; /*!< this event is in the reset state
when a flush is running;
os_event_set() and os_event_reset()
are protected by log_sys.mutex */
ulint n_log_ios; /*!< number of log i/os initiated thus
far */
ulint n_log_ios_old; /*!< number of log i/o's at the
......@@ -834,6 +826,9 @@ struct log_t{
/** Redo log system */
extern log_t log_sys;
#ifdef UNIV_DEBUG
extern bool log_write_lock_own();
#endif
/** Gets the log capacity. It is OK to read the value without
holding log_sys.mutex because it is constant.
......@@ -848,7 +843,7 @@ inline lsn_t log_t::file::calc_lsn_offset(lsn_t lsn) const
ut_ad(this == &log_sys.log);
/* The lsn parameters are updated while holding both the mutexes
and it is ok to have either of them while reading */
ut_ad(log_sys.mutex.is_owned() || log_sys.write_mutex.is_owned());
ut_ad(log_sys.mutex.is_owned() || log_write_lock_own());
const lsn_t size = capacity();
lsn_t l= lsn - this->lsn;
if (longlong(l) < 0) {
......@@ -862,12 +857,12 @@ inline lsn_t log_t::file::calc_lsn_offset(lsn_t lsn) const
}
inline void log_t::file::set_lsn(lsn_t a_lsn) {
ut_ad(log_sys.mutex.is_owned() || log_sys.write_mutex.is_owned());
ut_ad(log_sys.mutex.is_owned() || log_write_lock_own());
lsn = a_lsn;
}
inline void log_t::file::set_lsn_offset(lsn_t a_lsn) {
ut_ad(log_sys.mutex.is_owned() || log_sys.write_mutex.is_owned());
ut_ad(log_sys.mutex.is_owned() || log_write_lock_own());
ut_ad((lsn % OS_FILE_LOG_BLOCK_SIZE) == (a_lsn % OS_FILE_LOG_BLOCK_SIZE));
lsn_offset = a_lsn;
}
......@@ -888,32 +883,14 @@ inline void log_t::file::set_lsn_offset(lsn_t a_lsn) {
/** Test if log sys mutex is owned. */
#define log_mutex_own() mutex_own(&log_sys.mutex)
/** Test if log sys write mutex is owned. */
#define log_write_mutex_own() mutex_own(&log_sys.write_mutex)
/** Acquire the log sys mutex. */
#define log_mutex_enter() mutex_enter(&log_sys.mutex)
/** Acquire the log sys write mutex. */
#define log_write_mutex_enter() mutex_enter(&log_sys.write_mutex)
/** Acquire all the log sys mutexes. */
#define log_mutex_enter_all() do { \
mutex_enter(&log_sys.write_mutex); \
mutex_enter(&log_sys.mutex); \
} while (0)
/** Release the log sys mutex. */
#define log_mutex_exit() mutex_exit(&log_sys.mutex)
/** Release the log sys write mutex.*/
#define log_write_mutex_exit() mutex_exit(&log_sys.write_mutex)
/** Release all the log sys mutexes. */
#define log_mutex_exit_all() do { \
mutex_exit(&log_sys.mutex); \
mutex_exit(&log_sys.write_mutex); \
} while (0)
/* log scrubbing speed, in bytes/sec */
extern ulonglong innodb_scrub_log_speed;
......
......@@ -54,6 +54,7 @@ Created 12/9/1995 Heikki Tuuri
#include "srv0mon.h"
#include "sync0sync.h"
#include "buf0dump.h"
#include "log0sync.h"
/*
General philosophy of InnoDB redo-logs:
......@@ -509,7 +510,6 @@ void log_t::create()
m_initialised= true;
mutex_create(LATCH_ID_LOG_SYS, &mutex);
mutex_create(LATCH_ID_LOG_WRITE, &write_mutex);
mutex_create(LATCH_ID_LOG_FLUSH_ORDER, &log_flush_order_mutex);
/* Start the lsn from one log block from zero: this way every
......@@ -535,9 +535,6 @@ void log_t::create()
buf_next_to_write= 0;
write_lsn= lsn;
flushed_to_disk_lsn= 0;
n_pending_flushes= 0;
flush_event = os_event_create("log_flush_event");
os_event_set(flush_event);
n_log_ios= 0;
n_log_ios_old= 0;
log_capacity= 0;
......@@ -856,7 +853,7 @@ log_file_header_flush(
lsn_t start_lsn) /*!< in: log file data starts at this
lsn */
{
ut_ad(log_write_mutex_own());
ut_ad(log_write_lock_own());
ut_ad(!recv_no_log_write);
ut_ad(log_sys.log.format == log_t::FORMAT_10_5
|| log_sys.log.format == log_t::FORMAT_ENC_10_5);
......@@ -909,7 +906,7 @@ log_write_buf(
lsn_t next_offset;
ulint i;
ut_ad(log_write_mutex_own());
ut_ad(log_write_lock_own());
ut_ad(!recv_no_log_write);
ut_a(len % OS_FILE_LOG_BLOCK_SIZE == 0);
ut_a(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
......@@ -1002,20 +999,14 @@ log_write_buf(
and invoke log_mutex_enter(). */
static
void
log_write_flush_to_disk_low()
log_write_flush_to_disk_low(lsn_t lsn)
{
/* FIXME: This is not holding log_sys.mutex while
calling os_event_set()! */
ut_a(log_sys.n_pending_flushes == 1); /* No other threads here */
log_sys.log.flush_data_only();
log_mutex_enter();
log_sys.flushed_to_disk_lsn = log_sys.current_flush_lsn;
log_sys.n_pending_flushes--;
os_event_set(log_sys.flush_event);
ut_a(lsn >= log_sys.flushed_to_disk_lsn);
log_sys.flushed_to_disk_lsn = lsn;
log_mutex_exit();
}
/** Switch the log buffer in use, and copy the content of last block
......@@ -1026,7 +1017,7 @@ void
log_buffer_switch()
{
ut_ad(log_mutex_own());
ut_ad(log_write_mutex_own());
ut_ad(log_write_lock_own());
const byte* old_buf = log_sys.buf;
ulong area_end = ut_calc_align(
......@@ -1053,86 +1044,24 @@ log_buffer_switch()
log_sys.buf_next_to_write = log_sys.buf_free;
}
/** Ensure that the log has been written to the log file up to a given
log entry (such as that of a transaction commit). Start a new write, or
wait and check if an already running write is covering the request.
@param[in] lsn log sequence number that should be
included in the redo log file write
@param[in] flush_to_disk whether the written log should also
be flushed to the file system
@param[in] rotate_key whether to rotate the encryption key */
void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key)
{
#ifdef UNIV_DEBUG
ulint loop_count = 0;
#endif /* UNIV_DEBUG */
byte* write_buf;
lsn_t write_lsn;
ut_ad(!srv_read_only_mode);
ut_ad(!rotate_key || flush_to_disk);
if (recv_no_ibuf_operations) {
/* Recovery is running and no operations on the log file are
allowed yet (the variable name .._no_ibuf_.. is misleading) */
/**
Writes log buffer to disk
which is the "write" part of log_write_up_to().
return;
}
This function does not flush anything.
loop:
ut_ad(++loop_count < 128);
#if UNIV_WORD_SIZE > 7
/* We can do a dirty read of LSN. */
/* NOTE: Currently doesn't do dirty read for
(flush_to_disk == true) case, because the log_mutex
contention also works as the arbitrator for write-IO
(fsync) bandwidth between log file and data files. */
if (!flush_to_disk && log_sys.write_lsn >= lsn) {
return;
}
#endif
Note: the caller must have log_mutex locked, and this
mutex is released in the function.
log_write_mutex_enter();
*/
static void log_write(bool rotate_key)
{
ut_ad(log_mutex_own());
ut_ad(!recv_no_log_write);
lsn_t limit_lsn = flush_to_disk
? log_sys.flushed_to_disk_lsn
: log_sys.write_lsn;
if (limit_lsn >= lsn) {
log_write_mutex_exit();
return;
}
/* If it is a write call we should just go ahead and do it
as we checked that write_lsn is not where we'd like it to
be. If we have to flush as well then we check if there is a
pending flush and based on that we wait for it to finish
before proceeding further. */
if (flush_to_disk
&& (log_sys.n_pending_flushes > 0
|| !os_event_is_set(log_sys.flush_event))) {
/* Figure out if the current flush will do the job
for us. */
bool work_done = log_sys.current_flush_lsn >= lsn;
log_write_mutex_exit();
os_event_wait(log_sys.flush_event);
if (work_done) {
return;
} else {
goto loop;
}
}
log_mutex_enter();
if (!flush_to_disk
&& log_sys.buf_free == log_sys.buf_next_to_write) {
/* Nothing to write and no flush to disk requested */
log_mutex_exit_all();
lsn_t write_lsn;
if (log_sys.buf_free == log_sys.buf_next_to_write) {
/* Nothing to write */
log_mutex_exit();
return;
}
......@@ -1146,19 +1075,7 @@ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key)
DBUG_PRINT("ib_log", ("write " LSN_PF " to " LSN_PF,
log_sys.write_lsn,
log_sys.lsn));
if (flush_to_disk) {
log_sys.n_pending_flushes++;
log_sys.current_flush_lsn = log_sys.lsn;
os_event_reset(log_sys.flush_event);
if (log_sys.buf_free == log_sys.buf_next_to_write) {
/* Nothing to write, flush only */
log_mutex_exit_all();
log_write_flush_to_disk_low();
log_mutex_exit();
return;
}
}
start_offset = log_sys.buf_next_to_write;
end_offset = log_sys.buf_free;
......@@ -1175,7 +1092,7 @@ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key)
log_sys.next_checkpoint_no);
write_lsn = log_sys.lsn;
write_buf = log_sys.buf;
byte *write_buf = log_sys.buf;
log_buffer_switch();
......@@ -1209,8 +1126,7 @@ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key)
if (UNIV_UNLIKELY(srv_shutdown_state != SRV_SHUTDOWN_NONE)) {
service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
"InnoDB log write: "
LSN_PF "," LSN_PF,
log_sys.write_lsn, lsn);
LSN_PF, log_sys.write_lsn);
}
if (log_sys.is_encrypted()) {
......@@ -1230,16 +1146,76 @@ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key)
start_offset - area_start);
srv_stats.log_padded.add(pad_size);
log_sys.write_lsn = write_lsn;
if (log_sys.log.writes_are_durable())
log_sys.flushed_to_disk_lsn = write_lsn;
return;
}
log_write_mutex_exit();
static group_commit_lock write_lock;
static group_commit_lock flush_lock;
if (flush_to_disk) {
log_write_flush_to_disk_low();
ib_uint64_t flush_lsn = log_sys.flushed_to_disk_lsn;
log_mutex_exit();
#ifdef UNIV_DEBUG
bool log_write_lock_own()
{
return write_lock.is_owner();
}
#endif
innobase_mysql_log_notify(flush_lsn);
}
/** Ensure that the log has been written to the log file up to a given
log entry (such as that of a transaction commit). Start a new write, or
wait and check if an already running write is covering the request.
@param[in] lsn log sequence number that should be
included in the redo log file write
@param[in] flush_to_disk whether the written log should also
be flushed to the file system
@param[in] rotate_key whether to rotate the encryption key */
void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key)
{
ut_ad(!srv_read_only_mode);
ut_ad(!rotate_key || flush_to_disk);
if (recv_no_ibuf_operations)
{
/* Recovery is running and no operations on the log files are
allowed yet (the variable name .._no_ibuf_.. is misleading) */
return;
}
if (flush_to_disk &&
flush_lock.acquire(lsn) != group_commit_lock::ACQUIRED)
{
return;
}
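/* Past this point, this thread either became the flush leader
(flush_lock was ACQUIRED), or flushing was not requested. EXPIRED above
means the log was already flushed up to lsn, possibly by another thread
while this thread waited. */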
if (write_lock.acquire(lsn) == group_commit_lock::ACQUIRED)
{
log_mutex_enter();
auto write_lsn = log_sys.lsn;
write_lock.set_pending(write_lsn);
log_write(rotate_key);
ut_a(log_sys.write_lsn == write_lsn);
write_lock.release(write_lsn);
}
if (!flush_to_disk)
{
return;
}
/* Flush the highest written lsn.*/
auto flush_lsn = write_lock.value();
flush_lock.set_pending(flush_lsn);
if (!log_sys.log.writes_are_durable())
{
log_write_flush_to_disk_low(flush_lsn);
}
flush_lock.release(flush_lsn);
innobase_mysql_log_notify(flush_lsn);
}
/** write to the log file up to the last log entry.
......@@ -1270,8 +1246,7 @@ log_buffer_sync_in_background(
lsn = log_sys.lsn;
if (flush
&& log_sys.n_pending_flushes > 0
&& log_sys.current_flush_lsn >= lsn) {
&& log_sys.flushed_to_disk_lsn >= lsn) {
/* The write + flush will write enough */
log_mutex_exit();
return;
......@@ -1836,7 +1811,7 @@ logs_empty_and_mark_files_at_shutdown(void)
if (log_sys.is_initialised()) {
log_mutex_enter();
const ulint n_write = log_sys.n_pending_checkpoint_writes;
const ulint n_flush = log_sys.n_pending_flushes;
const ulint n_flush = log_sys.pending_flushes;
log_mutex_exit();
if (log_scrub_thread_active || n_write || n_flush) {
......@@ -2011,7 +1986,7 @@ log_print(
ULINTPF " pending log flushes, "
ULINTPF " pending chkp writes\n"
ULINTPF " log i/o's done, %.2f log i/o's/second\n",
log_sys.n_pending_flushes,
log_sys.pending_flushes.load(),
log_sys.n_pending_checkpoint_writes,
log_sys.n_log_ios,
static_cast<double>(
......@@ -2047,9 +2022,7 @@ void log_t::close()
ut_free_dodump(buf, srv_log_buffer_size * 2);
buf = NULL;
os_event_destroy(flush_event);
mutex_free(&mutex);
mutex_free(&write_mutex);
mutex_free(&log_flush_order_mutex);
if (!srv_read_only_mode && srv_scrub_log)
......
/*****************************************************************************
Copyright (c) 2020 MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
*****************************************************************************/
/*
The group commit synchronization used in log_write_up_to()
works as follows.
For simplicity, let's consider only the write operation; synchronization
of the flush operation works the same way.
Rules of the game:
A thread enters log_write_up_to() with the lsn of the current transaction.
1. If the last written lsn is greater than the wait lsn (another thread
already wrote the log buffer), then there is no need to do anything.
2. If no other thread is currently writing, write the log buffer, and
update the last written lsn.
3. Otherwise, wait, and go to step 1.
Synchronization can be done in different ways, e.g.
a) Simple mutex locking the entire check and write operation.
Disadvantage: threads that could continue after the last written lsn
is updated still wait.
b) Spinlock, with periodic checks for the last written lsn.
Fixes a) but burns CPU unnecessarily.
c) Mutex / condition variable combo.
The condition variable notifies (broadcasts to) all waiters whenever
the last written lsn changes.
Has the disadvantage of many spurious wakeups, stress on the OS
scheduler, and mutex contention (a sketch of this alternative follows
this comment).
d) Something else.
Make use of the waiter's lsn parameter, and only wake up the "right"
waiting threads.
We chose d). Even if the implementation is more complicated than the
alternatives, due to the need to maintain a list of waiters, it provides
the best performance. See the group_commit_lock implementation for details.
Note that if the write operation is very fast, a) or b) can be fine
alternatives.
*/
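/*
  For illustration only (not used in this file): a minimal sketch of
  alternative c), the mutex/condition variable combo. All names below are
  hypothetical. Every update of the last written lsn broadcasts to all
  waiters, so a waiter may wake up many times before its lsn is covered:

    static std::mutex m;
    static std::condition_variable cv;
    static lsn_t last_written_lsn= 0;

    void wait_up_to(lsn_t lsn)          // hypothetical waiter side
    {
      std::unique_lock<std::mutex> lk(m);
      while (last_written_lsn < lsn)
        cv.wait(lk);                    // may wake on every unrelated update
    }

    void publish(lsn_t lsn)             // hypothetical writer side
    {
      {
        std::lock_guard<std::mutex> lk(m);
        last_written_lsn= lsn;
      }
      cv.notify_all();                  // wakes all waiters, satisfied or not
    }
*/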
#ifdef _WIN32
#include <windows.h>
#endif
#ifdef __linux__
#include <linux/futex.h>
#include <sys/syscall.h>
#endif
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <my_cpu.h>
#include <log0types.h>
#include "log0sync.h"
/**
Helper class, used in the group commit lock.
Binary semaphore, or (the same thing) an auto-reset event.
Has a state (signalled or not), and provides 2 operations:
wait() and wake().
The implementation uses efficient locking primitives on Linux and
Windows, or a mutex/condition combo elsewhere.
*/
class binary_semaphore
{
public:
/** Wait until the semaphore becomes signalled, and atomically reset the
state to non-signalled. */
void wait();
/** signals the semaphore */
void wake();
private:
#if defined(__linux__) || defined (_WIN32)
std::atomic<int> m_signalled;
const std::memory_order mem_order = std::memory_order::memory_order_acq_rel;
public:
binary_semaphore() :m_signalled(0) {}
#else
std::mutex m_mtx{};
std::condition_variable m_cv{};
bool m_signalled = false;
#endif
};
#if defined (__linux__) || defined (_WIN32)
void binary_semaphore::wait()
{
for (;;)
{
if (m_signalled.exchange(0, mem_order) == 1)
{
break;
}
#ifdef _WIN32
int zero = 0;
WaitOnAddress(&m_signalled, &zero, sizeof(m_signalled), INFINITE);
#else
syscall(SYS_futex, &m_signalled, FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0);
#endif
}
}
void binary_semaphore::wake()
{
if (m_signalled.exchange(1, mem_order) == 0)
{
#ifdef _WIN32
WakeByAddressSingle(&m_signalled);
#else
syscall(SYS_futex, &m_signalled, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
#endif
}
}
#else
void binary_semaphore::wait()
{
std::unique_lock<std::mutex> lk(m_mtx);
while (!m_signalled)
m_cv.wait(lk);
m_signalled = false;
}
void binary_semaphore::wake()
{
std::unique_lock<std::mutex> lk(m_mtx);
m_signalled = true;
m_cv.notify_one();
}
#endif
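/*
  Usage sketch (illustrative only, not part of this file): one thread
  blocks in wait() until another calls wake(). Because the event
  auto-resets, a second wait() would block again until the next wake();
  a wake() that happens before wait() is not lost.

    binary_semaphore s;
    std::thread t([&] { s.wait(); });   // blocks until woken
    s.wake();                           // releases exactly one waiter
    t.join();
*/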
/* A per-thread helper structure, used in the group commit lock below. */
struct group_commit_waiter_t
{
lsn_t m_value;
binary_semaphore m_sema;
group_commit_waiter_t* m_next;
group_commit_waiter_t() :m_value(), m_sema(), m_next() {}
};
group_commit_lock::group_commit_lock() :
m_mtx(), m_value(0), m_pending_value(0), m_lock(false), m_waiters_list()
{
}
group_commit_lock::value_type group_commit_lock::value() const
{
return m_value.load(std::memory_order::memory_order_relaxed);
}
group_commit_lock::value_type group_commit_lock::pending() const
{
return m_pending_value.load(std::memory_order::memory_order_relaxed);
}
void group_commit_lock::set_pending(group_commit_lock::value_type num)
{
ut_a(num >= value());
m_pending_value.store(num, std::memory_order::memory_order_relaxed);
}
const unsigned int MAX_SPINS = 1; /** max spins in acquire */
thread_local group_commit_waiter_t thread_local_waiter;
group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
{
unsigned int spins = MAX_SPINS;
for(;;)
{
if (num <= value())
{
/* No need to wait.*/
return lock_return_code::EXPIRED;
}
if(spins-- == 0)
break;
if (num > pending())
{
/* Longer wait expected (longer than currently running operation),
don't spin.*/
break;
}
ut_delay(1);
}
thread_local_waiter.m_value = num;
std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
while (num > value())
{
lk.lock();
/* Re-read current value after acquiring the lock*/
if (num <= value())
{
return lock_return_code::EXPIRED;
}
if (!m_lock)
{
/* Take the lock, become group commit leader.*/
m_lock = true;
#ifndef DBUG_OFF
m_owner_id = std::this_thread::get_id();
#endif
return lock_return_code::ACQUIRED;
}
/* Add yourself to waiters list.*/
thread_local_waiter.m_next = m_waiters_list;
m_waiters_list = &thread_local_waiter;
lk.unlock();
/* Sleep until woken in release().*/
thread_local_waiter.m_sema.wait();
}
return lock_return_code::EXPIRED;
}
void group_commit_lock::release(value_type num)
{
std::unique_lock<std::mutex> lk(m_mtx);
m_lock = false;
/* Update current value. */
ut_a(num >= value());
m_value.store(num, std::memory_order_relaxed);
/*
Wake waiters for value <= current value.
Wake one more waiter, who will become the group commit lead.
*/
group_commit_waiter_t* cur, * prev, * next;
group_commit_waiter_t* wakeup_list = nullptr;
int extra_wake = 0;
for (cur = m_waiters_list; cur; cur = next)
{
next = cur->m_next;
if (cur->m_value <= num || extra_wake++ == 0)
{
/* Move current waiter to wakeup_list*/
if (cur == m_waiters_list)
{
/* Remove from the start of the list.*/
m_waiters_list = next;
}
else
{
/* Remove from the middle of the list.*/
prev->m_next = cur->m_next;
}
/* Append entry to the wakeup list.*/
cur->m_next = wakeup_list;
wakeup_list = cur;
}
else
{
prev = cur;
}
}
lk.unlock();
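/* Note: the semaphores are signalled after m_mtx is released; most woken
threads will see the updated value and leave acquire() without taking
the mutex again. */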
for (cur = wakeup_list; cur; cur = next)
{
next = cur->m_next;
cur->m_sema.wake();
}
}
#ifndef DBUG_OFF
bool group_commit_lock::is_owner()
{
return m_lock && std::this_thread::get_id() == m_owner_id;
}
#endif
/*****************************************************************************
Copyright (c) 2020 MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
*****************************************************************************/
#include <atomic>
#include <mutex>
#include <thread>
#include <log0types.h>
struct group_commit_waiter_t;
/**
Special synchronization primitive, which is helpful for
performing group commit.
It has a state consisting of
- locked (bool)
- current value (number). This value is always increasing.
- pending value (number). The current value can soon become this number.
This is only used for optimization, and does not have to be exact.
Operations supported on this semaphore:
1. acquire(num):
- waits until the current value reaches num, or until the lock is granted.
- returns EXPIRED if current_value >= num,
or ACQUIRED if current_value < num and the lock is granted.
2. release(num):
- releases the lock
- sets the new current value to max(num, current_value)
- releases some threads waiting in acquire()
3. value():
- reads the current value
4. pending():
- reads the pending value
5. set_pending(num):
- sets the pending value
A usage sketch follows the class definition below.
*/
class group_commit_lock
{
using value_type = lsn_t;
#ifndef DBUG_OFF
std::thread::id m_owner_id{};
#endif
std::mutex m_mtx;
std::atomic<value_type> m_value;
std::atomic<value_type> m_pending_value;
bool m_lock;
group_commit_waiter_t* m_waiters_list;
public:
group_commit_lock();
enum lock_return_code
{
ACQUIRED,
EXPIRED
};
lock_return_code acquire(value_type num);
void release(value_type num);
value_type value() const;
value_type pending() const;
void set_pending(value_type num);
#ifndef DBUG_OFF
bool is_owner();
#endif
};
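/*
  A hedged usage sketch, following the leader/follower pattern of
  log_write_up_to(). write_log_buffer() is a hypothetical helper that
  writes the log buffer and returns the end lsn of the write:

    static group_commit_lock write_lock;

    void write_up_to(lsn_t lsn)
    {
      if (write_lock.acquire(lsn) == group_commit_lock::ACQUIRED)
      {
        lsn_t end= write_log_buffer();  // hypothetical
        write_lock.release(end);        // wakes waiters with lsn <= end
      }
      // Otherwise EXPIRED: the log is already written up to lsn.
    }
*/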
......@@ -1976,9 +1976,8 @@ srv_mon_process_existing_counter(
break;
case MONITOR_PENDING_LOG_FLUSH:
mutex_enter(&log_sys.mutex);
value = static_cast<mon_type_t>(log_sys.n_pending_flushes);
mutex_exit(&log_sys.mutex);
value = static_cast<mon_type_t>(log_sys.pending_flushes);
break;
case MONITOR_PENDING_CHECKPOINT_WRITE:
......
......@@ -1037,11 +1037,12 @@ static lsn_t srv_prepare_to_delete_redo_log_file(bool old_exists)
}
srv_start_lsn = flushed_lsn;
/* Flush the old log file. */
bool do_flush_logs = flushed_lsn != log_sys.flushed_to_disk_lsn;
log_mutex_exit();
log_write_up_to(flushed_lsn, true);
if (do_flush_logs) {
log_write_up_to(flushed_lsn, false);
}
log_sys.log.flush_data_only();
ut_ad(flushed_lsn == log_get_lsn());
......