Commit c4d15d67 authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

MDEV-24341 reduce or eliminate waits for Innodb redo log group commit in foreground threads

parent 3f871b33
......@@ -640,8 +640,20 @@ net_real_write(NET *net,const uchar *packet, size_t len)
my_bool net_blocking = vio_is_blocking(net->vio);
DBUG_ENTER("net_real_write");
#if defined(MYSQL_SERVER) && defined(USE_QUERY_CACHE)
query_cache_insert(net->thd, (char*) packet, len, net->pkt_nr);
#if defined(MYSQL_SERVER)
THD *thd= (THD *)net->thd;
#if defined(USE_QUERY_CACHE)
query_cache_insert(thd, (char*) packet, len, net->pkt_nr);
#endif
if (likely(thd))
{
/*
Wait until pending operations (currently it is engine
asynchronous group commit) are finished before replying
to the client, to keep durability promise.
*/
thd->async_state.wait_for_pending_ops();
}
#endif
if (unlikely(net->error == 2))
......
......@@ -40,6 +40,8 @@ struct scheduler_functions
void (*thd_wait_end)(THD *thd);
void (*post_kill_notification)(THD *thd);
void (*end)(void);
/** resume previous unfinished command (threadpool only)*/
void (*thd_resume)(THD* thd);
};
......
......@@ -682,7 +682,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
m_stmt_da(&main_da),
tdc_hash_pins(0),
xid_hash_pins(0),
m_tmp_tables_locked(false)
m_tmp_tables_locked(false),
async_state()
#ifdef HAVE_REPLICATION
,
current_linfo(0),
......@@ -4947,6 +4948,56 @@ void reset_thd(MYSQL_THD thd)
free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC));
}
/**
This function can be used by storage engine
to indicate a start of an async operation.
This asynchronous is such operation needs to be
finished before we write response to the client
.
An example of this operation is Innodb's asynchronous
group commit. Server needs to wait for the end of it
before writing response to client, to provide durability
guarantees, in other words, server can't send OK packet
before modified data is durable in redo log.
*/
extern "C" MYSQL_THD thd_increment_pending_ops(void)
{
THD *thd = current_thd;
if (!thd)
return NULL;
thd->async_state.inc_pending_ops();
return thd;
}
/**
This function can be used by plugin/engine to indicate
end of async operation (such as end of group commit
write flush)
@param thd THD
*/
extern "C" void thd_decrement_pending_ops(MYSQL_THD thd)
{
DBUG_ASSERT(thd);
thd_async_state::enum_async_state state;
if (thd->async_state.dec_pending_ops(&state) == 0)
{
switch(state)
{
case thd_async_state::enum_async_state::SUSPENDED:
DBUG_ASSERT(thd->scheduler->thd_resume);
thd->scheduler->thd_resume(thd);
break;
case thd_async_state::enum_async_state::NONE:
break;
default:
DBUG_ASSERT(0);
}
}
}
unsigned long long thd_get_query_id(const MYSQL_THD thd)
{
return((unsigned long long)thd->query_id);
......
......@@ -2308,6 +2308,152 @@ struct THD_count
~THD_count() { thread_count--; }
};
/**
Support structure for asynchronous group commit
(or generally any asynchronous operation that needs
to finish before server writes response to client)
*/
struct thd_async_state
{
enum class enum_async_state
{
NONE,
SUSPENDED, /* do_command() did not finish, and needs to be resumed */
RESUMED /* do_command() is resumed*/
};
enum_async_state m_state{enum_async_state::NONE};
/* Stuff we need to resume do_command where we finished last time*/
enum enum_server_command m_command{COM_SLEEP};
LEX_STRING m_packet{};
mysql_mutex_t m_mtx;
mysql_cond_t m_cond;
/** Pending counter*/
Atomic_counter<int> m_pending_ops=0;
#ifndef DBUG_OFF
/* Checks */
pthread_t m_dbg_thread;
#endif
thd_async_state()
{
mysql_mutex_init(PSI_NOT_INSTRUMENTED, &m_mtx, 0);
mysql_cond_init(PSI_INSTRUMENT_ME, &m_cond, 0);
}
/*
Currently only used with threadpool, one can "suspend" and "resume" a THD.
Suspend only means leaving do_command earlier, after saving some state.
Resume is continuing suspended THD's do_command(), from where it finished last time.
*/
bool try_suspend()
{
bool ret;
mysql_mutex_lock(&m_mtx);
DBUG_ASSERT(m_state == enum_async_state::NONE);
DBUG_ASSERT(m_pending_ops >= 0);
if(m_pending_ops)
{
ret=true;
m_state= enum_async_state::SUSPENDED;
}
else
{
/*
If there is no pending operations, can't suspend, since
nobody can resume it.
*/
ret=false;
}
mysql_mutex_unlock(&m_mtx);
return ret;
}
~thd_async_state()
{
wait_for_pending_ops();
mysql_mutex_destroy(&m_mtx);
mysql_cond_destroy(&m_cond);
}
/*
Increment pending asynchronous operations.
The client response may not be written if
this count > 0.
So, without threadpool query needs to wait for
the operations to finish.
With threadpool, THD can be suspended and resumed
when this counter goes to 0.
*/
void inc_pending_ops()
{
mysql_mutex_lock(&m_mtx);
#ifndef DBUG_OFF
/*
Check that increments are always done by the same thread.
*/
if (!m_pending_ops)
m_dbg_thread= pthread_self();
else
DBUG_ASSERT(pthread_equal(pthread_self(),m_dbg_thread));
#endif
m_pending_ops++;
mysql_mutex_unlock(&m_mtx);
}
int dec_pending_ops(enum_async_state* state)
{
int ret;
mysql_mutex_lock(&m_mtx);
ret= --m_pending_ops;
if (!ret)
mysql_cond_signal(&m_cond);
*state = m_state;
mysql_mutex_unlock(&m_mtx);
return ret;
}
/*
This is used for "dirty" reading pending ops,
when dirty read is OK.
*/
int pending_ops()
{
return m_pending_ops;
}
/* Wait for pending operations to finish.*/
void wait_for_pending_ops()
{
/*
It is fine to read m_pending_ops and compare it with 0,
without mutex protection.
The value is only incremented by the current thread, and will
be decremented by another one, thus "dirty" may show positive number
when it is really 0, but this is not a problem, and the only
bad thing from that will be rechecking under mutex.
*/
if (!pending_ops())
return;
mysql_mutex_lock(&m_mtx);
DBUG_ASSERT(m_pending_ops >= 0);
while (m_pending_ops)
mysql_cond_wait(&m_cond, &m_mtx);
mysql_mutex_unlock(&m_mtx);
}
};
extern "C" MYSQL_THD thd_increment_pending_ops(void);
extern "C" void thd_decrement_pending_ops(MYSQL_THD);
/**
@class THD
......@@ -5024,6 +5170,7 @@ class THD: public THD_count, /* this must be first */
}
public:
thd_async_state async_state;
#ifdef HAVE_REPLICATION
/*
If we do a purge of binary logs, log index info of the threads
......
......@@ -1168,25 +1168,50 @@ static enum enum_server_command fetch_command(THD *thd, char *packet)
/**
Read one command from connection and execute it (query or simple command).
This function is called in loop from thread function.
This function is to be used by different schedulers (one-thread-per-connection,
pool-of-threads)
For profiling to work, it must never be called recursively.
@param thd - client connection context
@param blocking - wait for command to finish.
if true, will wait for outstanding operations (e.g group commit) to finish,
before returning. otherwise, it might return DISPATCH_COMMAND_WOULDBLOCK,
in this case another do_command() needs to be executed to finish the current
command.
@retval
DISPATCH_COMMAND_SUCCESS - success
@retval
0 success
DISPATCH_COMMAND_ERROR request of THD shutdown (see dispatch_command() description)
@retval
1 request of thread shutdown (see dispatch_command() description)
DISPATCH_COMMAND_WOULDBLOCK - need to wait for asyncronous operations
to finish, only returned if blocking=false,
*/
bool do_command(THD *thd)
dispatch_command_return do_command(THD *thd, bool blocking)
{
bool return_value;
dispatch_command_return return_value;
char *packet= 0;
ulong packet_length;
NET *net= &thd->net;
enum enum_server_command command;
DBUG_ENTER("do_command");
DBUG_ASSERT(!thd->async_state.pending_ops());
if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
{
/*
Resuming previously suspended command.
Restore the state
*/
command = thd->async_state.m_command;
packet = thd->async_state.m_packet.str;
packet_length = (ulong)thd->async_state.m_packet.length;
goto resume;
}
/*
indicator of uninitialized lex => normal flow of errors handling
(see my_message_sql)
......@@ -1253,12 +1278,12 @@ bool do_command(THD *thd)
if (net->error != 3)
{
return_value= TRUE; // We have to close it.
return_value= DISPATCH_COMMAND_ERROR; // We have to close it.
goto out;
}
net->error= 0;
return_value= FALSE;
return_value= DISPATCH_COMMAND_SUCCESS;
goto out;
}
......@@ -1325,7 +1350,7 @@ bool do_command(THD *thd)
MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
thd->m_statement_psi= NULL;
thd->m_digest= NULL;
return_value= FALSE;
return_value= DISPATCH_COMMAND_SUCCESS;
wsrep_after_command_before_result(thd);
goto out;
......@@ -1351,7 +1376,7 @@ bool do_command(THD *thd)
thd->m_statement_psi= NULL;
thd->m_digest= NULL;
return_value= FALSE;
return_value= DISPATCH_COMMAND_SUCCESS;
wsrep_after_command_before_result(thd);
goto out;
}
......@@ -1362,8 +1387,18 @@ bool do_command(THD *thd)
DBUG_ASSERT(packet_length);
DBUG_ASSERT(!thd->apc_target.is_enabled());
resume:
return_value= dispatch_command(command, thd, packet+1,
(uint) (packet_length-1));
(uint) (packet_length-1), blocking);
if (return_value == DISPATCH_COMMAND_WOULDBLOCK)
{
/* Save current state, and resume later.*/
thd->async_state.m_command= command;
thd->async_state.m_packet={packet,packet_length};
DBUG_RETURN(return_value);
}
DBUG_ASSERT(!thd->apc_target.is_enabled());
out:
......@@ -1522,8 +1557,8 @@ class Silence_all_errors : public Internal_error_handler
1 request of thread shutdown, i. e. if command is
COM_QUIT/COM_SHUTDOWN
*/
bool dispatch_command(enum enum_server_command command, THD *thd,
char* packet, uint packet_length)
dispatch_command_return dispatch_command(enum enum_server_command command, THD *thd,
char* packet, uint packet_length, bool blocking)
{
NET *net= &thd->net;
bool error= 0;
......@@ -1535,6 +1570,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
"<?>")));
bool drop_more_results= 0;
if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
{
thd->async_state.m_state = thd_async_state::enum_async_state::NONE;
goto resume;
}
/* keep it withing 1 byte */
compile_time_assert(COM_END == 255);
......@@ -2265,6 +2306,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
general_log_print(thd, command, NullS);
my_eof(thd);
break;
case COM_SLEEP:
case COM_CONNECT: // Impossible here
case COM_TIME: // Impossible from client
......@@ -2278,7 +2320,18 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
}
dispatch_end:
do_end_of_statement= true;
/*
For the threadpool i.e if non-blocking call, if not all async operations
are finished, return without cleanup. The cleanup will be done on
later, when command execution is resumed.
*/
if (!blocking && !error && thd->async_state.pending_ops())
{
DBUG_RETURN(DISPATCH_COMMAND_WOULDBLOCK);
}
resume:
#ifdef WITH_WSREP
/*
Next test should really be WSREP(thd), but that causes a failure when doing
......@@ -2382,7 +2435,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
/* Check that some variables are reset properly */
DBUG_ASSERT(thd->abort_on_warning == 0);
thd->lex->restore_set_statement_var();
DBUG_RETURN(error);
DBUG_RETURN(error?DISPATCH_COMMAND_ERROR: DISPATCH_COMMAND_SUCCESS);
}
static bool slow_filter_masked(THD *thd, ulonglong mask)
......
......@@ -100,9 +100,16 @@ bool multi_delete_set_locks_and_link_aux_tables(LEX *lex);
void create_table_set_open_action_and_adjust_tables(LEX *lex);
int bootstrap(MYSQL_FILE *file);
int mysql_execute_command(THD *thd);
bool do_command(THD *thd);
bool dispatch_command(enum enum_server_command command, THD *thd,
char* packet, uint packet_length);
enum dispatch_command_return
{
DISPATCH_COMMAND_SUCCESS=0,
DISPATCH_COMMAND_ERROR= 1,
DISPATCH_COMMAND_WOULDBLOCK= 2
};
dispatch_command_return do_command(THD *thd, bool blocking = true);
dispatch_command_return dispatch_command(enum enum_server_command command, THD *thd,
char* packet, uint packet_length, bool blocking = true);
void log_slow_statement(THD *thd);
bool append_file_to_dir(THD *thd, const char **filename_ptr,
const LEX_CSTRING *table_name);
......
......@@ -133,6 +133,7 @@ struct TP_pool
virtual int set_stall_limit(uint){ return 0; }
virtual int get_thread_count() { return tp_stats.num_worker_threads; }
virtual int get_idle_thread_count(){ return 0; }
virtual void resume(TP_connection* c)=0;
};
#ifdef _WIN32
......@@ -146,6 +147,7 @@ struct TP_pool_win:TP_pool
virtual void add(TP_connection *);
virtual int set_max_threads(uint);
virtual int set_min_threads(uint);
void resume(TP_connection *c);
};
#endif
......@@ -159,6 +161,7 @@ struct TP_pool_generic :TP_pool
virtual int set_pool_size(uint);
virtual int set_stall_limit(uint);
virtual int get_idle_thread_count();
void resume(TP_connection* c);
};
#endif /* HAVE_POOL_OF_THREADS */
......@@ -23,6 +23,8 @@
#include <sql_audit.h>
#include <debug_sync.h>
#include <threadpool.h>
#include <sql_class.h>
#include <sql_parse.h>
#ifdef WITH_WSREP
#include "wsrep_trans_observer.h"
......@@ -51,7 +53,7 @@ TP_STATISTICS tp_stats;
static void threadpool_remove_connection(THD *thd);
static int threadpool_process_request(THD *thd);
static dispatch_command_return threadpool_process_request(THD *thd);
static THD* threadpool_add_connection(CONNECT *connect, TP_connection *c);
extern bool do_command(THD*);
......@@ -195,10 +197,30 @@ void tp_callback(TP_connection *c)
}
c->connect= 0;
}
else if (threadpool_process_request(thd))
else
{
/* QUIT or an error occurred. */
goto error;
retry:
switch(threadpool_process_request(thd))
{
case DISPATCH_COMMAND_WOULDBLOCK:
if (!thd->async_state.try_suspend())
{
/*
All async operations finished meanwhile, thus nobody is will wake up
this THD. Therefore, we'll resume "manually" here.
*/
thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
goto retry;
}
worker_context.restore();
return;
case DISPATCH_COMMAND_ERROR:
/* QUIT or an error occurred. */
goto error;
case DISPATCH_COMMAND_SUCCESS:
break;
}
thd->async_state.m_state= thd_async_state::enum_async_state::NONE;
}
/* Set priority */
......@@ -331,10 +353,13 @@ static bool has_unread_data(THD* thd)
/**
Process a single client request or a single batch.
*/
static int threadpool_process_request(THD *thd)
static dispatch_command_return threadpool_process_request(THD *thd)
{
int retval= 0;
dispatch_command_return retval= DISPATCH_COMMAND_SUCCESS;
thread_attach(thd);
if(thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
goto resume;
if (thd->killed >= KILL_CONNECTION)
{
......@@ -342,7 +367,7 @@ static int threadpool_process_request(THD *thd)
killed flag was set by timeout handler
or KILL command. Return error.
*/
retval= 1;
retval= DISPATCH_COMMAND_ERROR;
if(thd->killed == KILL_WAIT_TIMEOUT)
handle_wait_timeout(thd);
goto end;
......@@ -365,19 +390,27 @@ static int threadpool_process_request(THD *thd)
if (mysql_audit_release_required(thd))
mysql_audit_release(thd);
if ((retval= do_command(thd)) != 0)
goto end;
resume:
retval= do_command(thd, false);
switch(retval)
{
case DISPATCH_COMMAND_WOULDBLOCK:
case DISPATCH_COMMAND_ERROR:
goto end;
case DISPATCH_COMMAND_SUCCESS:
break;
}
if (!thd_is_connection_alive(thd))
{
retval= 1;
retval=DISPATCH_COMMAND_ERROR;
goto end;
}
set_thd_idle(thd);
if (!has_unread_data(thd))
{
{
/* More info on this debug sync is in sql_parse.cc*/
DEBUG_SYNC(thd, "before_do_command_net_read");
goto end;
......@@ -527,6 +560,15 @@ static void tp_post_kill_notification(THD *thd)
post_kill_notification(thd);
}
/* Resume previously suspended THD */
static void tp_resume(THD* thd)
{
DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::SUSPENDED);
thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
TP_connection* c = get_TP_connection(thd);
pool->resume(c);
}
static scheduler_functions tp_scheduler_functions=
{
0, // max_threads
......@@ -537,7 +579,8 @@ static scheduler_functions tp_scheduler_functions=
tp_wait_begin, // thd_wait_begin
tp_wait_end, // thd_wait_end
tp_post_kill_notification, // post kill notification
tp_end // end
tp_end, // end
tp_resume
};
void pool_of_threads_scheduler(struct scheduler_functions *func,
......
......@@ -1327,7 +1327,10 @@ void TP_pool_generic::add(TP_connection *c)
DBUG_VOID_RETURN;
}
void TP_pool_generic::resume(TP_connection* c)
{
add(c);
}
/**
MySQL scheduler callback: wait begin
......
......@@ -125,6 +125,12 @@ void TP_pool_win::add(TP_connection *c)
}
}
void TP_pool_win::resume(TP_connection* c)
{
DBUG_ASSERT(c->state == TP_STATE_RUNNING);
SubmitThreadpoolWork(((TP_connection_win*)c)->work);
}
#define CHECK_ALLOC_ERROR(op) \
do \
{ \
......@@ -438,3 +444,4 @@ TP_connection *TP_pool_win::new_connection(CONNECT *connect)
}
return c;
}
......@@ -103,15 +103,21 @@ bool
log_set_capacity(ulonglong file_size)
MY_ATTRIBUTE((warn_unused_result));
/** Ensure that the log has been written to the log file up to a given
/**
Ensure that the log has been written to the log file up to a given
log entry (such as that of a transaction commit). Start a new write, or
wait and check if an already running write is covering the request.
@param[in] lsn log sequence number that should be
included in the redo log file write
@param[in] flush_to_disk whether the written log should also
be flushed to the file system
@param[in] rotate_key whether to rotate the encryption key */
void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key = false);
@param[in] rotate_key whether to rotate the encryption key
@param[in] cb completion callback. If not NULL, the callback will be called
whenever lsn is written or flushed.
*/
struct completion_callback;
void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key = false,
const completion_callback* cb=nullptr);
/** write to the log file up to the last log entry.
@param[in] sync whether we want the written log
......
......@@ -771,6 +771,7 @@ bool log_write_lock_own()
}
#endif
/** Ensure that the log has been written to the log file up to a given
log entry (such as that of a transaction commit). Start a new write, or
wait and check if an already running write is covering the request.
......@@ -779,7 +780,8 @@ included in the redo log file write
@param[in] flush_to_disk whether the written log should also
be flushed to the file system
@param[in] rotate_key whether to rotate the encryption key */
void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key)
void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key,
const completion_callback *callback)
{
ut_ad(!srv_read_only_mode);
ut_ad(!rotate_key || flush_to_disk);
......@@ -788,16 +790,18 @@ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key)
{
/* Recovery is running and no operations on the log files are
allowed yet (the variable name .._no_ibuf_.. is misleading) */
ut_a(!callback);
return;
}
if (flush_to_disk &&
flush_lock.acquire(lsn) != group_commit_lock::ACQUIRED)
flush_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED)
{
return;
}
if (write_lock.acquire(lsn) == group_commit_lock::ACQUIRED)
if (write_lock.acquire(lsn, flush_to_disk?0:callback) ==
group_commit_lock::ACQUIRED)
{
mysql_mutex_lock(&log_sys.mutex);
lsn_t write_lsn= log_sys.get_lsn();
......
......@@ -77,6 +77,7 @@ Note that if write operation is very fast, a) or b) can be fine as alternative.
#include <log0types.h>
#include "log0sync.h"
#include <mysql/service_thd_wait.h>
#include <sql_class.h>
/**
Helper class , used in group commit lock.
......@@ -158,10 +159,10 @@ void binary_semaphore::wake()
/* A thread helper structure, used in group commit lock below*/
struct group_commit_waiter_t
{
lsn_t m_value;
binary_semaphore m_sema;
group_commit_waiter_t* m_next;
group_commit_waiter_t() :m_value(), m_sema(), m_next() {}
lsn_t m_value=0;
binary_semaphore m_sema{};
group_commit_waiter_t* m_next{};
bool m_group_commit_leader=false;
};
group_commit_lock::group_commit_lock() :
......@@ -188,7 +189,13 @@ void group_commit_lock::set_pending(group_commit_lock::value_type num)
const unsigned int MAX_SPINS = 1; /** max spins in acquire */
thread_local group_commit_waiter_t thread_local_waiter;
group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
static inline void do_completion_callback(const completion_callback* cb)
{
if (cb)
cb->m_callback(cb->m_param);
}
group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, const completion_callback *callback)
{
unsigned int spins = MAX_SPINS;
......@@ -197,6 +204,7 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
if (num <= value())
{
/* No need to wait.*/
do_completion_callback(callback);
return lock_return_code::EXPIRED;
}
......@@ -212,17 +220,23 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
}
thread_local_waiter.m_value = num;
thread_local_waiter.m_group_commit_leader= false;
std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
while (num > value())
{
lk.lock();
/* Re-read current value after acquiring the lock*/
if (num <= value())
if (num <= value() &&
(!thread_local_waiter.m_group_commit_leader || m_lock))
{
lk.unlock();
do_completion_callback(callback);
thread_local_waiter.m_group_commit_leader=false;
return lock_return_code::EXPIRED;
}
thread_local_waiter.m_group_commit_leader= false;
if (!m_lock)
{
/* Take the lock, become group commit leader.*/
......@@ -230,9 +244,21 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
#ifndef DBUG_OFF
m_owner_id = std::this_thread::get_id();
#endif
if (callback)
m_pending_callbacks.push_back({num,*callback});
return lock_return_code::ACQUIRED;
}
if (callback && m_waiters_list)
{
/*
We need to have at least one waiter,
so it can become the new group commit leader.
*/
m_pending_callbacks.push_back({num, *callback});
return lock_return_code::CALLBACK_QUEUED;
}
/* Add yourself to waiters list.*/
thread_local_waiter.m_next = m_waiters_list;
m_waiters_list = &thread_local_waiter;
......@@ -244,11 +270,15 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
thd_wait_end(0);
}
do_completion_callback(callback);
return lock_return_code::EXPIRED;
}
void group_commit_lock::release(value_type num)
{
completion_callback callbacks[1000];
size_t callback_count = 0;
std::unique_lock<std::mutex> lk(m_mtx);
m_lock = false;
......@@ -262,12 +292,21 @@ void group_commit_lock::release(value_type num)
*/
group_commit_waiter_t* cur, * prev, * next;
group_commit_waiter_t* wakeup_list = nullptr;
int extra_wake = 0;
for (auto& c : m_pending_callbacks)
{
if (c.first <= num)
{
if (callback_count < array_elements(callbacks))
callbacks[callback_count++] = c.second;
else
c.second.m_callback(c.second.m_param);
}
}
for (prev= nullptr, cur= m_waiters_list; cur; cur= next)
{
next= cur->m_next;
if (cur->m_value <= num || extra_wake++ == 0)
if (cur->m_value <= num)
{
/* Move current waiter to wakeup_list*/
......@@ -291,8 +330,43 @@ void group_commit_lock::release(value_type num)
prev= cur;
}
}
auto it= std::remove_if(
m_pending_callbacks.begin(), m_pending_callbacks.end(),
[num](const std::pair<value_type,completion_callback> &c) { return c.first <= num; });
m_pending_callbacks.erase(it, m_pending_callbacks.end());
if (m_pending_callbacks.size() || m_waiters_list)
{
/*
Ensure that after this thread released the lock,
there is a new group commit leader
We take this from waiters list or wakeup list. It
might look like a spurious wake, but in fact we just
ensure the waiter do not wait for eternity.
*/
if (!m_waiters_list && !wakeup_list)
abort(); /* Assert, also in release version */
if (m_waiters_list)
{
/* Move one waiter to wakeup list */
auto e= m_waiters_list;
m_waiters_list= m_waiters_list->m_next;
e->m_next= wakeup_list;
e->m_group_commit_leader= true;
wakeup_list = e;
}
else //if (wakeup_list)
wakeup_list->m_group_commit_leader=true;
}
lk.unlock();
for (size_t i = 0; i < callback_count; i++)
callbacks[i].m_callback(callbacks[i].m_param);
for (cur= wakeup_list; cur; cur= next)
{
next= cur->m_next;
......
......@@ -18,8 +18,14 @@ this program; if not, write to the Free Software Foundation, Inc.,
#include <atomic>
#include <thread>
#include <log0types.h>
#include <vector>
struct group_commit_waiter_t;
struct completion_callback
{
void (*m_callback)(void*);
void* m_param;
};
/**
Special synchronization primitive, which is helpful for
......@@ -63,14 +69,17 @@ class group_commit_lock
std::atomic<value_type> m_pending_value;
bool m_lock;
group_commit_waiter_t* m_waiters_list;
std::vector<std::pair<value_type,completion_callback>> m_pending_callbacks;
public:
group_commit_lock();
enum lock_return_code
{
ACQUIRED,
EXPIRED
EXPIRED,
CALLBACK_QUEUED
};
lock_return_code acquire(value_type num);
lock_return_code acquire(value_type num, const completion_callback *cb);
void release(value_type num);
value_type value() const;
value_type pending() const;
......
......@@ -23,6 +23,7 @@ The transaction
Created 3/26/1996 Heikki Tuuri
*******************************************************/
#define MYSQL_SERVER
#include "trx0trx.h"
......@@ -1165,35 +1166,49 @@ trx_finalize_for_fts(
trx->fts_trx = NULL;
}
/**********************************************************************//**
If required, flushes the log to disk based on the value of
innodb_flush_log_at_trx_commit. */
static
void
trx_flush_log_if_needed_low(
/*========================*/
lsn_t lsn) /*!< in: lsn up to which logs are to be
flushed. */
extern "C" MYSQL_THD thd_increment_pending_ops();
extern "C" void thd_decrement_pending_ops(MYSQL_THD);
#include "../log/log0sync.h"
/*
If required, initiates write and optionally flush of the log to
disk
@param[in] lsn - lsn up to which logs are to be flushed.
@param[in] trx_state - if trx_state is PREPARED, the function will
also wait for the flush to complete.
*/
static void trx_flush_log_if_needed_low(lsn_t lsn, trx_state_t trx_state)
{
bool flush = srv_file_flush_method != SRV_NOSYNC;
if (!srv_flush_log_at_trx_commit)
return;
switch (srv_flush_log_at_trx_commit) {
case 3:
case 2:
/* Write the log but do not flush it to disk */
flush = false;
/* fall through */
case 1:
/* Write the log and optionally flush it to disk */
log_write_up_to(lsn, flush);
srv_inc_activity_count();
return;
case 0:
/* Do nothing */
return;
}
if (log_sys.get_flushed_lsn() > lsn)
return;
ut_error;
bool flush= srv_file_flush_method != SRV_NOSYNC &&
srv_flush_log_at_trx_commit == 1;
if (trx_state == TRX_STATE_PREPARED)
{
/* XA, which is used with binlog as well.
Be conservative, use synchronous wait.*/
log_write_up_to(lsn, flush);
return;
}
completion_callback cb;
if ((cb.m_param = thd_increment_pending_ops()))
{
cb.m_callback = (void (*)(void *)) thd_decrement_pending_ops;
log_write_up_to(lsn, flush, false, &cb);
}
else
{
/* No THD, synchronous write */
log_write_up_to(lsn, flush);
}
}
/**********************************************************************//**
......@@ -1208,7 +1223,7 @@ trx_flush_log_if_needed(
trx_t* trx) /*!< in/out: transaction */
{
trx->op_info = "flushing log";
trx_flush_log_if_needed_low(lsn);
trx_flush_log_if_needed_low(lsn,trx->state);
trx->op_info = "";
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment