Commit 01466adc authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

MDEV-32841 Provide Innodb async IO statistics

Provide some statistics about asynchronous IO reads and writes:
 - number of pending operations
 - number of completion callbacks that are currently being executed
 - number of completion callbacks that are currently queued
   (due to restriction on number of IO threads)
 - total number of IOs finished
 - total time to wait for free IO slot
 - total number of completions that were queued.

Also revert tpool InnoDB perfschema instrumentation (MDEV-31048)
That instrumentation of cache mutex did not bring any revelation (
the mutex is taken for a couple of instructions), and made it impossible
to use tpool outside of the server (e.g in mariadbimport/dump)
parent 374783c3
...@@ -5,6 +5,18 @@ AND variable_name NOT IN ...@@ -5,6 +5,18 @@ AND variable_name NOT IN
'INNODB_MEM_ADAPTIVE_HASH', 'INNODB_MEM_ADAPTIVE_HASH',
'INNODB_BUFFERED_AIO_SUBMITTED','INNODB_BUFFER_POOL_PAGES_LATCHED'); 'INNODB_BUFFERED_AIO_SUBMITTED','INNODB_BUFFER_POOL_PAGES_LATCHED');
variable_name variable_name
INNODB_ASYNC_READS_PENDING
INNODB_ASYNC_READS_TASKS_RUNNING
INNODB_ASYNC_READS_TOTAL_COUNT
INNODB_ASYNC_READS_TOTAL_ENQUEUES
INNODB_ASYNC_READS_QUEUE_SIZE
INNODB_ASYNC_READS_WAIT_SLOT_SEC
INNODB_ASYNC_WRITES_PENDING
INNODB_ASYNC_WRITES_TASKS_RUNNING
INNODB_ASYNC_WRITES_TOTAL_COUNT
INNODB_ASYNC_WRITES_TOTAL_ENQUEUES
INNODB_ASYNC_WRITES_QUEUE_SIZE
INNODB_ASYNC_WRITES_WAIT_SLOT_SEC
INNODB_BACKGROUND_LOG_SYNC INNODB_BACKGROUND_LOG_SYNC
INNODB_BUFFER_POOL_DUMP_STATUS INNODB_BUFFER_POOL_DUMP_STATUS
INNODB_BUFFER_POOL_LOAD_STATUS INNODB_BUFFER_POOL_LOAD_STATUS
......
...@@ -526,7 +526,6 @@ mysql_pfs_key_t trx_pool_manager_mutex_key; ...@@ -526,7 +526,6 @@ mysql_pfs_key_t trx_pool_manager_mutex_key;
mysql_pfs_key_t lock_wait_mutex_key; mysql_pfs_key_t lock_wait_mutex_key;
mysql_pfs_key_t trx_sys_mutex_key; mysql_pfs_key_t trx_sys_mutex_key;
mysql_pfs_key_t srv_threads_mutex_key; mysql_pfs_key_t srv_threads_mutex_key;
mysql_pfs_key_t tpool_cache_mutex_key;
/* all_innodb_mutexes array contains mutexes that are /* all_innodb_mutexes array contains mutexes that are
performance schema instrumented if "UNIV_PFS_MUTEX" performance schema instrumented if "UNIV_PFS_MUTEX"
...@@ -558,7 +557,6 @@ static PSI_mutex_info all_innodb_mutexes[] = { ...@@ -558,7 +557,6 @@ static PSI_mutex_info all_innodb_mutexes[] = {
PSI_KEY(rtr_match_mutex), PSI_KEY(rtr_match_mutex),
PSI_KEY(rtr_path_mutex), PSI_KEY(rtr_path_mutex),
PSI_KEY(trx_sys_mutex), PSI_KEY(trx_sys_mutex),
PSI_KEY(tpool_cache_mutex),
}; };
# endif /* UNIV_PFS_MUTEX */ # endif /* UNIV_PFS_MUTEX */
...@@ -886,6 +884,32 @@ static SHOW_VAR innodb_status_variables[]= { ...@@ -886,6 +884,32 @@ static SHOW_VAR innodb_status_variables[]= {
{"adaptive_hash_non_hash_searches", {"adaptive_hash_non_hash_searches",
&export_vars.innodb_ahi_miss, SHOW_SIZE_T}, &export_vars.innodb_ahi_miss, SHOW_SIZE_T},
#endif #endif
{"async_reads_pending",
&export_vars.async_read_stats.pending_ops, SHOW_SIZE_T},
{"async_reads_tasks_running",
&export_vars.async_read_stats.completion_stats.tasks_running, SHOW_SIZE_T},
{"async_reads_total_count",
&export_vars.async_read_stats.completion_stats.total_tasks_executed,SHOW_ULONGLONG},
{"async_reads_total_enqueues",
&export_vars.async_read_stats.completion_stats.total_tasks_enqueued,SHOW_ULONGLONG},
{"async_reads_queue_size",
&export_vars.async_read_stats.completion_stats.queue_size, SHOW_SIZE_T},
{"async_reads_wait_slot_sec",
&export_vars.async_read_stats.slot_wait_time_sec, SHOW_DOUBLE},
{"async_writes_pending",
&export_vars.async_write_stats.pending_ops,SHOW_SIZE_T},
{"async_writes_tasks_running",
&export_vars.async_write_stats.completion_stats.tasks_running, SHOW_SIZE_T},
{"async_writes_total_count",
&export_vars.async_write_stats.completion_stats.total_tasks_executed, SHOW_ULONGLONG},
{"async_writes_total_enqueues",
&export_vars.async_write_stats.completion_stats.total_tasks_enqueued, SHOW_ULONGLONG},
{"async_writes_queue_size",
&export_vars.async_write_stats.completion_stats.queue_size, SHOW_SIZE_T},
{"async_writes_wait_slot_sec",
&export_vars.async_write_stats.slot_wait_time_sec, SHOW_DOUBLE},
{"background_log_sync", &srv_log_writes_and_flush, SHOW_SIZE_T}, {"background_log_sync", &srv_log_writes_and_flush, SHOW_SIZE_T},
{"buffer_pool_dump_status", {"buffer_pool_dump_status",
(char*) &export_vars.innodb_buffer_pool_dump_status, SHOW_CHAR}, (char*) &export_vars.innodb_buffer_pool_dump_status, SHOW_CHAR},
......
...@@ -1187,4 +1187,44 @@ inline bool is_absolute_path(const char *path) ...@@ -1187,4 +1187,44 @@ inline bool is_absolute_path(const char *path)
#include "os0file.inl" #include "os0file.inl"
/**
Structure used for async io statistics
There is one instance of this structure for each operation type
(read or write)
*/
struct innodb_async_io_stats_t
{
/**
Current of submitted and not yet finished IOs.
IO is considered finished when it finished in the OS
*and* the completion callback has been called
*/
size_t pending_ops;
/**
Time, in seconds, spent waiting for a slot to become
available. There is a limited number of slots for async IO
operations. If all slots are in use, the IO submission has
to wait.
*/
double slot_wait_time_sec;
/**
Information related to IO completion callbacks.
- number of tasks currently running (<= innodb_read/write_io_threads)
- total number of tasks that have been completed
- current task queue size . Queueing happens if running tasks is
maxed out (equal to innodb_read/write_io_threads)
- total number of tasks that have been queued
*/
tpool::group_stats completion_stats;
};
/**
Statistics for asynchronous I/O
@param[in] op operation - aio_opcode::AIO_PREAD or aio_opcode::AIO_PWRITE
@param[in] stats - structure to fill
*/
extern void innodb_io_slots_stats(tpool::aio_opcode op,
innodb_async_io_stats_t *stats);
#endif /* os0file_h */ #endif /* os0file_h */
...@@ -573,6 +573,8 @@ struct export_var_t{ ...@@ -573,6 +573,8 @@ struct export_var_t{
ulint innodb_ahi_hit; ulint innodb_ahi_hit;
ulint innodb_ahi_miss; ulint innodb_ahi_miss;
#endif /* BTR_CUR_HASH_ADAPT */ #endif /* BTR_CUR_HASH_ADAPT */
innodb_async_io_stats_t async_read_stats;
innodb_async_io_stats_t async_write_stats;
char innodb_buffer_pool_dump_status[OS_FILE_MAX_PATH + 128];/*!< Buf pool dump status */ char innodb_buffer_pool_dump_status[OS_FILE_MAX_PATH + 128];/*!< Buf pool dump status */
char innodb_buffer_pool_load_status[OS_FILE_MAX_PATH + 128];/*!< Buf pool load status */ char innodb_buffer_pool_load_status[OS_FILE_MAX_PATH + 128];/*!< Buf pool load status */
char innodb_buffer_pool_resize_status[512];/*!< Buf pool resize status */ char innodb_buffer_pool_resize_status[512];/*!< Buf pool resize status */
......
...@@ -80,7 +80,7 @@ Created 10/21/1995 Heikki Tuuri ...@@ -80,7 +80,7 @@ Created 10/21/1995 Heikki Tuuri
class io_slots class io_slots
{ {
private: private:
tpool::cache<tpool::aiocb> m_cache; tpool::cache<tpool::aiocb, true> m_cache;
tpool::task_group m_group; tpool::task_group m_group;
int m_max_aio; int m_max_aio;
public: public:
...@@ -106,9 +106,9 @@ class io_slots ...@@ -106,9 +106,9 @@ class io_slots
} }
/* Wait for completions of all AIO operations */ /* Wait for completions of all AIO operations */
void wait(mysql_mutex_t &m) void wait(std::unique_lock<std::mutex> &lk)
{ {
m_cache.wait(m); m_cache.wait(lk);
} }
void wait() void wait()
...@@ -121,6 +121,11 @@ class io_slots ...@@ -121,6 +121,11 @@ class io_slots
return m_cache.pos(); return m_cache.pos();
} }
std::chrono::duration<float> wait_time()
{
return m_cache.wait_time();
}
tpool::task_group* get_task_group() tpool::task_group* get_task_group()
{ {
return &m_group; return &m_group;
...@@ -131,7 +136,7 @@ class io_slots ...@@ -131,7 +136,7 @@ class io_slots
wait(); wait();
} }
mysql_mutex_t& mutex() std::mutex &mutex()
{ {
return m_cache.mutex(); return m_cache.mutex();
} }
...@@ -152,6 +157,22 @@ class io_slots ...@@ -152,6 +157,22 @@ class io_slots
static io_slots *read_slots; static io_slots *read_slots;
static io_slots *write_slots; static io_slots *write_slots;
/**
Statistics for asynchronous I/O
@param[in] op operation type (aio_opcode::AIO_PREAD or aio_opcode::AIO_PWRITE)
@param[in] stats pointer to the structure to fill
*/
void innodb_io_slots_stats(tpool::aio_opcode op, innodb_async_io_stats_t *stats)
{
io_slots *slots= op == tpool::aio_opcode::AIO_PREAD? read_slots : write_slots;
stats->pending_ops = slots->pending_io_count();
stats->slot_wait_time_sec=
std::chrono::duration_cast<std::chrono::duration<float>>(
slots->wait_time()).count();
slots->task_group().get_stats(&stats->completion_stats);
}
/** Number of retries for partial I/O's */ /** Number of retries for partial I/O's */
constexpr ulint NUM_RETRIES_ON_PARTIAL_IO = 10; constexpr ulint NUM_RETRIES_ON_PARTIAL_IO = 10;
...@@ -3623,9 +3644,8 @@ more concurrent threads via thread_group setting. ...@@ -3623,9 +3644,8 @@ more concurrent threads via thread_group setting.
int os_aio_resize(ulint n_reader_threads, ulint n_writer_threads) int os_aio_resize(ulint n_reader_threads, ulint n_writer_threads)
{ {
/* Lock the slots, and wait until all current IOs finish.*/ /* Lock the slots, and wait until all current IOs finish.*/
auto &lk_read= read_slots->mutex(), &lk_write= write_slots->mutex(); std::unique_lock<std::mutex> lk_read(read_slots->mutex()),
mysql_mutex_lock(&lk_read); lk_write(write_slots->mutex());
mysql_mutex_lock(&lk_write);
read_slots->wait(lk_read); read_slots->wait(lk_read);
write_slots->wait(lk_write); write_slots->wait(lk_write);
...@@ -3653,9 +3673,6 @@ int os_aio_resize(ulint n_reader_threads, ulint n_writer_threads) ...@@ -3653,9 +3673,6 @@ int os_aio_resize(ulint n_reader_threads, ulint n_writer_threads)
read_slots->resize(max_read_events, static_cast<int>(n_reader_threads)); read_slots->resize(max_read_events, static_cast<int>(n_reader_threads));
write_slots->resize(max_write_events, static_cast<int>(n_writer_threads)); write_slots->resize(max_write_events, static_cast<int>(n_writer_threads));
} }
mysql_mutex_unlock(&lk_read);
mysql_mutex_unlock(&lk_write);
return ret; return ret;
} }
...@@ -3693,10 +3710,8 @@ void os_aio_wait_until_no_pending_writes(bool declare) ...@@ -3693,10 +3710,8 @@ void os_aio_wait_until_no_pending_writes(bool declare)
/** @return number of pending reads */ /** @return number of pending reads */
size_t os_aio_pending_reads() size_t os_aio_pending_reads()
{ {
mysql_mutex_lock(&read_slots->mutex()); std::lock_guard<std::mutex> lock(read_slots->mutex());
size_t pending= read_slots->pending_io_count(); return read_slots->pending_io_count();
mysql_mutex_unlock(&read_slots->mutex());
return pending;
} }
/** @return approximate number of pending reads */ /** @return approximate number of pending reads */
...@@ -3708,10 +3723,8 @@ size_t os_aio_pending_reads_approx() ...@@ -3708,10 +3723,8 @@ size_t os_aio_pending_reads_approx()
/** @return number of pending writes */ /** @return number of pending writes */
size_t os_aio_pending_writes() size_t os_aio_pending_writes()
{ {
mysql_mutex_lock(&write_slots->mutex()); std::lock_guard<std::mutex> lock(write_slots->mutex());
size_t pending= write_slots->pending_io_count(); return write_slots->pending_io_count();
mysql_mutex_unlock(&write_slots->mutex());
return pending;
} }
/** Wait until all pending asynchronous reads have completed. /** Wait until all pending asynchronous reads have completed.
......
...@@ -813,6 +813,9 @@ srv_printf_innodb_monitor( ...@@ -813,6 +813,9 @@ srv_printf_innodb_monitor(
return(ret); return(ret);
} }
void innodb_io_slots_stats(tpool::aio_opcode op,
innodb_async_io_stats_t *stats);
/******************************************************************//** /******************************************************************//**
Function to pass InnoDB status variables to MySQL */ Function to pass InnoDB status variables to MySQL */
void void
...@@ -824,6 +827,10 @@ srv_export_innodb_status(void) ...@@ -824,6 +827,10 @@ srv_export_innodb_status(void)
if (!srv_read_only_mode) { if (!srv_read_only_mode) {
fil_crypt_total_stat(&crypt_stat); fil_crypt_total_stat(&crypt_stat);
} }
innodb_io_slots_stats(tpool::aio_opcode::AIO_PREAD,
&export_vars.async_read_stats);
innodb_io_slots_stats(tpool::aio_opcode::AIO_PWRITE,
&export_vars.async_write_stats);
#ifdef BTR_CUR_HASH_ADAPT #ifdef BTR_CUR_HASH_ADAPT
export_vars.innodb_ahi_hit = btr_cur_n_sea; export_vars.innodb_ahi_hit = btr_cur_n_sea;
......
...@@ -42,6 +42,8 @@ namespace tpool ...@@ -42,6 +42,8 @@ namespace tpool
: :
m_queue(8), m_queue(8),
m_mtx(), m_mtx(),
m_total_tasks(0),
m_total_enqueues(0),
m_tasks_running(), m_tasks_running(),
m_max_concurrent_tasks(max_concurrency), m_max_concurrent_tasks(max_concurrency),
m_enable_task_release(enable_task_release) m_enable_task_release(enable_task_release)
...@@ -59,6 +61,7 @@ namespace tpool ...@@ -59,6 +61,7 @@ namespace tpool
{ {
/* Queue for later execution by another thread.*/ /* Queue for later execution by another thread.*/
m_queue.push(t); m_queue.push(t);
m_total_enqueues++;
return; return;
} }
m_tasks_running++; m_tasks_running++;
...@@ -72,7 +75,7 @@ namespace tpool ...@@ -72,7 +75,7 @@ namespace tpool
t->release(); t->release();
} }
lk.lock(); lk.lock();
m_total_tasks++;
if (m_queue.empty()) if (m_queue.empty())
break; break;
t = m_queue.front(); t = m_queue.front();
...@@ -96,6 +99,15 @@ namespace tpool ...@@ -96,6 +99,15 @@ namespace tpool
} }
} }
void task_group::get_stats(group_stats *stats)
{
std::lock_guard<std::mutex> lk(m_mtx);
stats->tasks_running= m_tasks_running;
stats->queue_size= m_queue.size();
stats->total_tasks_executed= m_total_tasks;
stats->total_tasks_enqueued= m_total_enqueues;
}
task_group::~task_group() task_group::~task_group()
{ {
std::unique_lock<std::mutex> lk(m_mtx); std::unique_lock<std::mutex> lk(m_mtx);
......
...@@ -57,6 +57,18 @@ typedef void (*callback_func)(void *); ...@@ -57,6 +57,18 @@ typedef void (*callback_func)(void *);
typedef void (*callback_func_np)(void); typedef void (*callback_func_np)(void);
class task; class task;
struct group_stats
{
/** Current number of running tasks*/
size_t tasks_running;
/** Current number of tasks in the queue*/
size_t queue_size;
/** Total number of tasks executed */
unsigned long long total_tasks_executed;
/** Total number of tasks enqueued */
unsigned long long total_tasks_enqueued;
};
/** A class that can be used e.g. for /** A class that can be used e.g. for
restricting concurrency for specific class of tasks. */ restricting concurrency for specific class of tasks. */
...@@ -66,6 +78,8 @@ class task_group ...@@ -66,6 +78,8 @@ class task_group
circular_queue<task*> m_queue; circular_queue<task*> m_queue;
std::mutex m_mtx; std::mutex m_mtx;
std::condition_variable m_cv; std::condition_variable m_cv;
unsigned long long m_total_tasks;
unsigned long long m_total_enqueues;
unsigned int m_tasks_running; unsigned int m_tasks_running;
unsigned int m_max_concurrent_tasks; unsigned int m_max_concurrent_tasks;
const bool m_enable_task_release; const bool m_enable_task_release;
...@@ -75,6 +89,7 @@ class task_group ...@@ -75,6 +89,7 @@ class task_group
void set_max_tasks(unsigned int max_concurrent_tasks); void set_max_tasks(unsigned int max_concurrent_tasks);
void execute(task* t); void execute(task* t);
void cancel_pending(task *t); void cancel_pending(task *t);
void get_stats(group_stats* stats);
~task_group(); ~task_group();
}; };
......
...@@ -20,7 +20,9 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ ...@@ -20,7 +20,9 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#include <stack> #include <stack>
#include <assert.h> #include <assert.h>
#include <algorithm> #include <algorithm>
#include <chrono>
#include <condition_variable>
#include <mutex>
/* Suppress TSAN warnings, that we believe are not critical. */ /* Suppress TSAN warnings, that we believe are not critical. */
#if defined(__has_feature) #if defined(__has_feature)
#define TPOOL_HAS_FEATURE(...) __has_feature(__VA_ARGS__) #define TPOOL_HAS_FEATURE(...) __has_feature(__VA_ARGS__)
...@@ -36,10 +38,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ ...@@ -36,10 +38,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#define TPOOL_SUPPRESS_TSAN #define TPOOL_SUPPRESS_TSAN
#endif #endif
#ifdef HAVE_PSI_INTERFACE
typedef unsigned int mysql_pfs_key_t;
extern mysql_pfs_key_t tpool_cache_mutex_key;
#endif
namespace tpool namespace tpool
{ {
...@@ -56,16 +54,16 @@ namespace tpool ...@@ -56,16 +54,16 @@ namespace tpool
We assume that put() will only put back the elements that We assume that put() will only put back the elements that
were retrieved previously with get(). were retrieved previously with get().
*/ */
template<typename T> class cache template<typename T, bool timed=false> class cache
{ {
/** Protects updates of m_pos and m_cache members */ /** Protects updates of m_pos and m_cache members */
mysql_mutex_t m_mtx; std::mutex m_mtx;
/** /**
Notify waiting threads about "cache full" or "cache not empty" conditions Notify waiting threads about "cache full" or "cache not empty" conditions
@see get() and wait() @see get() and wait()
*/ */
pthread_cond_t m_cv; std::condition_variable m_cv;
/** Cached items vector.Does not change after construction */ /** Cached items vector.Does not change after construction */
std::vector<T> m_base; std::vector<T> m_base;
...@@ -84,6 +82,12 @@ template<typename T> class cache ...@@ -84,6 +82,12 @@ template<typename T> class cache
/** Current cache size. Protected by m_mtx*/ /** Current cache size. Protected by m_mtx*/
size_t m_pos; size_t m_pos;
/**
Total time spent waiting on entries in cache, inside get()
Only valid if timed template parameter is true.
*/
std::chrono::duration<float> m_wait_time;
private: private:
inline size_t capacity() inline size_t capacity()
...@@ -107,27 +111,33 @@ template<typename T> class cache ...@@ -107,27 +111,33 @@ template<typename T> class cache
return m_pos == capacity(); return m_pos == capacity();
} }
/**
Wait on condition. Instrumented (wait time is recorded),
if timed template parameter is true.
*/
void condition_wait(std::unique_lock<std::mutex>& lock)
{
if (timed)
{
auto start= std::chrono::high_resolution_clock::now();
m_cv.wait(lock);
m_wait_time+= std::chrono::high_resolution_clock::now() - start;
}
else
m_cv.wait(lock);
}
public: public:
/** /**
Constructor Constructor
@param size - maximum number of items in cache @param size - maximum number of items in cache
*/ */
cache(size_t size) : m_base(size), m_cache(size), cache(size_t size) :m_mtx(), m_cv(), m_base(size), m_cache(size),
m_waiters(), m_pos(0) m_waiters(), m_pos(0), m_wait_time()
{ {
mysql_mutex_init(tpool_cache_mutex_key, &m_mtx, nullptr);
pthread_cond_init(&m_cv, nullptr);
for(size_t i= 0 ; i < size; i++) for(size_t i= 0 ; i < size; i++)
m_cache[i]= &m_base[i]; m_cache[i]= &m_base[i];
} }
~cache()
{
mysql_mutex_destroy(&m_mtx);
pthread_cond_destroy(&m_cv);
}
/** /**
Retrieve an item from cache. Waits for free item, if cache is Retrieve an item from cache. Waits for free item, if cache is
currently empty. currently empty.
...@@ -135,17 +145,16 @@ template<typename T> class cache ...@@ -135,17 +145,16 @@ template<typename T> class cache
*/ */
T* get() T* get()
{ {
mysql_mutex_lock(&m_mtx); std::unique_lock<std::mutex> lock(m_mtx);
while (is_empty()) while (is_empty())
my_cond_wait(&m_cv, &m_mtx.m_mutex); condition_wait(lock);
assert(m_pos < capacity()); assert(m_pos < capacity());
// return last element // return last element
T *t= m_cache[m_pos++]; T *t= m_cache[m_pos++];
mysql_mutex_unlock(&m_mtx);
return t; return t;
} }
mysql_mutex_t &mutex() { return m_mtx; } std::mutex &mutex() { return m_mtx; }
/** /**
Put back an element to cache. Put back an element to cache.
...@@ -153,15 +162,14 @@ template<typename T> class cache ...@@ -153,15 +162,14 @@ template<typename T> class cache
*/ */
void put(T *ele) void put(T *ele)
{ {
mysql_mutex_lock(&m_mtx); std::unique_lock<std::mutex> lock(m_mtx);
assert(!is_full()); assert(!is_full());
const bool was_empty= is_empty(); const bool was_empty= is_empty();
// put element to the logical end of the array // put element to the logical end of the array
m_cache[--m_pos] = ele; m_cache[--m_pos] = ele;
if (was_empty || (is_full() && m_waiters)) if (was_empty || (is_full() && m_waiters))
pthread_cond_broadcast(&m_cv); m_cv.notify_all();
mysql_mutex_unlock(&m_mtx);
} }
/** Check if pointer represents cached element */ /** Check if pointer represents cached element */
...@@ -172,22 +180,20 @@ template<typename T> class cache ...@@ -172,22 +180,20 @@ template<typename T> class cache
} }
/** Wait until cache is full /** Wait until cache is full
@param m cache mutex (locked) */ @param lock */
void wait(mysql_mutex_t &m) void wait(std::unique_lock<std::mutex> &lk)
{ {
mysql_mutex_assert_owner(&m);
m_waiters++; m_waiters++;
while (!is_full()) while (!is_full())
my_cond_wait(&m_cv, &m.m_mutex); m_cv.wait(lk);
m_waiters--; m_waiters--;
} }
/* Wait until cache is full.*/ /* Wait until cache is full.*/
void wait() void wait()
{ {
mysql_mutex_lock(&m_mtx); std::unique_lock<std::mutex> lock(m_mtx);
wait(m_mtx); wait(lock);
mysql_mutex_unlock(&m_mtx);
} }
/** /**
...@@ -199,9 +205,13 @@ template<typename T> class cache ...@@ -199,9 +205,13 @@ template<typename T> class cache
return m_pos; return m_pos;
} }
TPOOL_SUPPRESS_TSAN std::chrono::duration<float> wait_time()
{
return m_wait_time;
}
void resize(size_t count) void resize(size_t count)
{ {
mysql_mutex_assert_owner(&m_mtx);
assert(is_full()); assert(is_full());
m_base.resize(count); m_base.resize(count);
m_cache.resize(count); m_cache.resize(count);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment