Commit a48c1b89 authored by Daniel Black's avatar Daniel Black

MDEV-24670 memory pressure - eventfd rather than pipe

Eventfds have a simplier interface and are one file
descriptor rather than two.

Reuse the patten of the accepting socket connections
by testing for abort after a poll returns. This way
the same event descriptor can be used for Quit
and debugging trigger.

Also correct the registration of mem pressure file
descriptors.
parent f2bd662f
...@@ -744,6 +744,7 @@ bool buf_page_is_corrupted(bool check_lsn, const byte *read_buf, ...@@ -744,6 +744,7 @@ bool buf_page_is_corrupted(bool check_lsn, const byte *read_buf,
#ifdef __linux__ #ifdef __linux__
#include <poll.h> #include <poll.h>
#include <sys/eventfd.h>
#include <fstream> #include <fstream>
/** Memory Pressure /** Memory Pressure
...@@ -752,14 +753,16 @@ based off https://www.kernel.org/doc/html/latest/accounting/psi.html#pressure-in ...@@ -752,14 +753,16 @@ based off https://www.kernel.org/doc/html/latest/accounting/psi.html#pressure-in
and https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html#memory */ and https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html#memory */
class mem_pressure class mem_pressure
{ {
/* triggers + pipe */ /* triggers + eventfd */
struct pollfd m_fds[3]; struct pollfd m_fds[3];
nfds_t m_num_fds; nfds_t m_num_fds;
int m_pipe[2]= {-1, -1}; int m_event_fd= -1;
Atomic_relaxed<bool> m_abort= false;
std::thread m_thd; std::thread m_thd;
/* mem pressure garbarge collection resticted to interval */ /* mem pressure garbage collection restricted to interval */
static constexpr ulonglong max_interval_us= 60*1000000; static constexpr ulonglong max_interval_us= 60*1000000;
public: public:
mem_pressure() : m_num_fds(0) {} mem_pressure() : m_num_fds(0) {}
...@@ -796,7 +799,7 @@ class mem_pressure ...@@ -796,7 +799,7 @@ class mem_pressure
shutdown(); shutdown();
return false; return false;
} }
my_register_filename(m_pipe[0], memcgroup.c_str(), FILE_BY_OPEN, 0, MYF(0)); my_register_filename(m_fds[m_num_fds].fd, memcgroup.c_str(), FILE_BY_OPEN, 0, MYF(0));
ssize_t slen= strlen(*trig); ssize_t slen= strlen(*trig);
if (write(m_fds[m_num_fds].fd, *trig, slen) < slen) if (write(m_fds[m_num_fds].fd, *trig, slen) < slen)
{ {
...@@ -810,15 +813,14 @@ class mem_pressure ...@@ -810,15 +813,14 @@ class mem_pressure
if (m_num_fds < 1) if (m_num_fds < 1)
return false; return false;
if (pipe2(m_pipe, O_CLOEXEC | O_DIRECT)) if ((m_event_fd= eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK)) == -1)
{ {
sql_print_warning("InnoDB: No memory pressure - can't create pipe"); sql_print_warning("InnoDB: No memory pressure - can't create eventfd");
shutdown(); shutdown();
return false; return false;
} }
my_register_filename(m_pipe[0], "mem_pressure_read", FILE_BY_DUP, 0, MYF(0)); my_register_filename(m_event_fd, "mem_pressure_eventfd", FILE_BY_DUP, 0, MYF(0));
my_register_filename(m_pipe[1], "mem_pressure_write", FILE_BY_DUP, 0, MYF(0)); m_fds[m_num_fds].fd= m_event_fd;
m_fds[m_num_fds].fd= m_pipe[0];
m_fds[m_num_fds].events= POLLIN; m_fds[m_num_fds].events= POLLIN;
m_num_fds++; m_num_fds++;
m_thd= std::thread(pressure_routine, this); m_thd= std::thread(pressure_routine, this);
...@@ -827,18 +829,13 @@ class mem_pressure ...@@ -827,18 +829,13 @@ class mem_pressure
void shutdown() void shutdown()
{ {
/* m_event_fd is in this list */
while (m_num_fds) while (m_num_fds)
{ {
m_num_fds--; m_num_fds--;
my_close(m_fds[m_num_fds].fd, MYF(MY_WME)); my_close(m_fds[m_num_fds].fd, MYF(MY_WME));
m_fds[m_num_fds].fd= -1; m_fds[m_num_fds].fd= -1;
} }
/* note m_pipe[0] closed in above loop */
if (m_pipe[1] >= 0)
{
my_close(m_pipe[1], MYF(MY_WME));
m_pipe[1]= -1;
}
} }
static void pressure_routine(mem_pressure *m); static void pressure_routine(mem_pressure *m);
...@@ -846,8 +843,8 @@ class mem_pressure ...@@ -846,8 +843,8 @@ class mem_pressure
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
void trigger_collection() void trigger_collection()
{ {
const char c= 'G'; uint64_t u= 1;
if (m_pipe[1] >=0 && write(m_pipe[1], &c, 1) < 1) if (m_event_fd >=0 && write(m_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t))
sql_print_information("InnoDB: (Debug) Failed to trigger memory pressure"); sql_print_information("InnoDB: (Debug) Failed to trigger memory pressure");
else /* assumed failed to meet intialization criteria, so trigger directy */ else /* assumed failed to meet intialization criteria, so trigger directy */
buf_pool.garbage_collect(); buf_pool.garbage_collect();
...@@ -856,8 +853,9 @@ class mem_pressure ...@@ -856,8 +853,9 @@ class mem_pressure
void quit() void quit()
{ {
const char c= 'Q'; uint64_t u= 1;
if (m_num_fds && write(m_pipe[1], &c, 1) < 1) m_abort= true;
if (write(m_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t))
sql_print_warning("InnoDB: Failed to write memory pressure quit message"); sql_print_warning("InnoDB: Failed to write memory pressure quit message");
} }
...@@ -896,7 +894,7 @@ void mem_pressure::pressure_routine(mem_pressure *m) ...@@ -896,7 +894,7 @@ void mem_pressure::pressure_routine(mem_pressure *m)
} }
ulonglong last= microsecond_interval_timer() - max_interval_us; ulonglong last= microsecond_interval_timer() - max_interval_us;
while (1) while (!m->m_abort)
{ {
if (poll(&m->m_fds[0], m->m_num_fds, -1) < 0) if (poll(&m->m_fds[0], m->m_num_fds, -1) < 0)
{ {
...@@ -909,6 +907,9 @@ void mem_pressure::pressure_routine(mem_pressure *m) ...@@ -909,6 +907,9 @@ void mem_pressure::pressure_routine(mem_pressure *m)
break; break;
} }
} }
if (!m->m_abort)
break;
for (pollfd &p : st_::span<pollfd>(m->m_fds, m->m_num_fds)) for (pollfd &p : st_::span<pollfd>(m->m_fds, m->m_num_fds))
{ {
if (p.revents & POLLPRI) if (p.revents & POLLPRI)
...@@ -921,23 +922,17 @@ void mem_pressure::pressure_routine(mem_pressure *m) ...@@ -921,23 +922,17 @@ void mem_pressure::pressure_routine(mem_pressure *m)
} }
} }
#ifdef UNIV_DEBUG
if (p.revents & POLLIN) if (p.revents & POLLIN)
{ {
char c= '\0'; uint64_t u;
/* signal to quit */ /* we haven't aborted, so this must be a debug trigger */
if (read(p.fd, &c, 1) >=0) if (read(p.fd, &u, sizeof(u)) >=0)
switch (c) { buf_pool.garbage_collect();
case 'Q':
goto shutdown;
#ifdef UNIV_DEBUG
case 'G':
buf_pool.garbage_collect();
#endif
}
} }
#endif
} }
} }
shutdown:
m->shutdown(); m->shutdown();
my_thread_end(); my_thread_end();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment