Commit 18173ddf authored by Nirbhay Choubey's avatar Nirbhay Choubey

MDEV-9162 : MariaDB Galera Cluster memory leak on async slave node

As galera node (slave) received query log events from an async
replication master, it partially wrote the updates made to replication
state table (mysql.gtid_slave_pos) to galera transaction writeset post
TOI. As a result, the transaction handle, thus created within galera,
was never freed/purged as the corresponding trx did not commit.
Thus, it kept piling up for every query log event and was only reclaimed
upon server shutdown when the transaction map object got destructed.
Fixed by making sure that updates in replication slave state table
are not written to galera transaction writeset and thus, not replicated
to other nodes.
parent 4437f516
...@@ -574,6 +574,14 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, ...@@ -574,6 +574,14 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
if ((err= gtid_check_rpl_slave_state_table(table))) if ((err= gtid_check_rpl_slave_state_table(table)))
goto end; goto end;
#ifdef WITH_WSREP
/*
Updates in slave state table should not be appended to galera transaction
writeset.
*/
thd->wsrep_skip_append_keys= true;
#endif
if (!in_transaction) if (!in_transaction)
{ {
DBUG_PRINT("info", ("resetting OPTION_BEGIN")); DBUG_PRINT("info", ("resetting OPTION_BEGIN"));
...@@ -687,6 +695,10 @@ IF_DBUG(dbug_break:, ) ...@@ -687,6 +695,10 @@ IF_DBUG(dbug_break:, )
end: end:
#ifdef WITH_WSREP
thd->wsrep_skip_append_keys= false;
#endif
if (table_opened) if (table_opened)
{ {
if (err || (err= ha_commit_trans(thd, FALSE))) if (err || (err= ha_commit_trans(thd, FALSE)))
......
...@@ -962,6 +962,10 @@ extern "C" int wsrep_thd_retry_counter(THD *thd) ...@@ -962,6 +962,10 @@ extern "C" int wsrep_thd_retry_counter(THD *thd)
{ {
return(thd->wsrep_retry_counter); return(thd->wsrep_retry_counter);
} }
extern "C" bool wsrep_thd_skip_append_keys(THD *thd)
{
return thd->wsrep_skip_append_keys;
}
extern int extern int
wsrep_trx_order_before(void *thd1, void *thd2) wsrep_trx_order_before(void *thd1, void *thd2)
...@@ -1083,7 +1087,8 @@ THD::THD() ...@@ -1083,7 +1087,8 @@ THD::THD()
wsrep_po_cnt(0), wsrep_po_cnt(0),
wsrep_po_in_trans(FALSE), wsrep_po_in_trans(FALSE),
wsrep_apply_format(0), wsrep_apply_format(0),
wsrep_apply_toi(false) wsrep_apply_toi(false),
wsrep_skip_append_keys(false)
#endif #endif
{ {
ulong tmp; ulong tmp;
......
...@@ -3863,6 +3863,7 @@ public: ...@@ -3863,6 +3863,7 @@ public:
#endif /* GTID_SUPPORT */ #endif /* GTID_SUPPORT */
void* wsrep_apply_format; void* wsrep_apply_format;
bool wsrep_apply_toi; /* applier processing in TOI */ bool wsrep_apply_toi; /* applier processing in TOI */
bool wsrep_skip_append_keys;
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
}; };
......
...@@ -180,7 +180,7 @@ extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd); ...@@ -180,7 +180,7 @@ extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd);
extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id); extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id);
extern "C" void wsrep_thd_awake(THD *thd, my_bool signal); extern "C" void wsrep_thd_awake(THD *thd, my_bool signal);
extern "C" int wsrep_thd_retry_counter(THD *thd); extern "C" int wsrep_thd_retry_counter(THD *thd);
extern "C" bool wsrep_thd_skip_append_keys(THD *thd);
extern void wsrep_close_client_connections(my_bool wait_to_end); extern void wsrep_close_client_connections(my_bool wait_to_end);
extern int wsrep_wait_committing_connections_close(int wait_time); extern int wsrep_wait_committing_connections_close(int wait_time);
...@@ -195,7 +195,6 @@ extern bool wsrep_start_replication(); ...@@ -195,7 +195,6 @@ extern bool wsrep_start_replication();
extern bool wsrep_sync_wait (THD* thd, uint mask = WSREP_SYNC_WAIT_BEFORE_READ); extern bool wsrep_sync_wait (THD* thd, uint mask = WSREP_SYNC_WAIT_BEFORE_READ);
extern int wsrep_check_opts (int argc, char* const* argv); extern int wsrep_check_opts (int argc, char* const* argv);
extern void wsrep_prepend_PATH (const char* path); extern void wsrep_prepend_PATH (const char* path);
/* some inline functions are defined in wsrep_mysqld_inl.h */
/* Other global variables */ /* Other global variables */
extern wsrep_seqno_t wsrep_locked_seqno; extern wsrep_seqno_t wsrep_locked_seqno;
......
...@@ -7760,7 +7760,8 @@ report_error: ...@@ -7760,7 +7760,8 @@ report_error:
if (!error_result && if (!error_result &&
wsrep_thd_exec_mode(user_thd) == LOCAL_STATE && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
wsrep_on(user_thd) && wsrep_on(user_thd) &&
!wsrep_consistency_check(user_thd)) !wsrep_consistency_check(user_thd) &&
!wsrep_thd_skip_append_keys(user_thd))
{ {
if (wsrep_append_keys(user_thd, false, record, NULL)) if (wsrep_append_keys(user_thd, false, record, NULL))
{ {
...@@ -8278,10 +8279,11 @@ func_exit: ...@@ -8278,10 +8279,11 @@ func_exit:
innobase_active_small(); innobase_active_small();
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (error == DB_SUCCESS && if (error == DB_SUCCESS &&
wsrep_thd_exec_mode(user_thd) == LOCAL_STATE && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
wsrep_on(user_thd)) { wsrep_on(user_thd) &&
!wsrep_thd_skip_append_keys(user_thd))
{
DBUG_PRINT("wsrep", ("update row key")); DBUG_PRINT("wsrep", ("update row key"));
if (wsrep_append_keys(user_thd, false, old_row, new_row)) { if (wsrep_append_keys(user_thd, false, old_row, new_row)) {
...@@ -8343,10 +8345,11 @@ ha_innobase::delete_row( ...@@ -8343,10 +8345,11 @@ ha_innobase::delete_row(
innobase_active_small(); innobase_active_small();
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (error == DB_SUCCESS && if (error == DB_SUCCESS &&
wsrep_thd_exec_mode(user_thd) == LOCAL_STATE && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
wsrep_on(user_thd)) { wsrep_on(user_thd) &&
!wsrep_thd_skip_append_keys(user_thd))
{
if (wsrep_append_keys(user_thd, false, record, NULL)) { if (wsrep_append_keys(user_thd, false, record, NULL)) {
DBUG_PRINT("wsrep", ("delete fail")); DBUG_PRINT("wsrep", ("delete fail"));
error = (dberr_t)HA_ERR_INTERNAL_ERROR; error = (dberr_t)HA_ERR_INTERNAL_ERROR;
......
...@@ -8381,7 +8381,8 @@ report_error: ...@@ -8381,7 +8381,8 @@ report_error:
if (!error_result && if (!error_result &&
wsrep_thd_exec_mode(user_thd) == LOCAL_STATE && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
wsrep_on(user_thd) && wsrep_on(user_thd) &&
!wsrep_consistency_check(user_thd)) !wsrep_consistency_check(user_thd) &&
!wsrep_thd_skip_append_keys(user_thd))
{ {
if (wsrep_append_keys(user_thd, false, record, NULL)) if (wsrep_append_keys(user_thd, false, record, NULL))
{ {
...@@ -8911,10 +8912,11 @@ func_exit: ...@@ -8911,10 +8912,11 @@ func_exit:
innobase_active_small(); innobase_active_small();
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (error == DB_SUCCESS && if (error == DB_SUCCESS &&
wsrep_thd_exec_mode(user_thd) == LOCAL_STATE && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
wsrep_on(user_thd)) { wsrep_on(user_thd) &&
!wsrep_thd_skip_append_keys(user_thd))
{
DBUG_PRINT("wsrep", ("update row key")); DBUG_PRINT("wsrep", ("update row key"));
if (wsrep_append_keys(user_thd, false, old_row, new_row)) { if (wsrep_append_keys(user_thd, false, old_row, new_row)) {
...@@ -8990,9 +8992,11 @@ ha_innobase::delete_row( ...@@ -8990,9 +8992,11 @@ ha_innobase::delete_row(
innobase_active_small(); innobase_active_small();
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (error == DB_SUCCESS && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE && if (error == DB_SUCCESS &&
wsrep_on(user_thd)) { wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
wsrep_on(user_thd) &&
!wsrep_thd_skip_append_keys(user_thd))
{
if (wsrep_append_keys(user_thd, false, record, NULL)) { if (wsrep_append_keys(user_thd, false, record, NULL)) {
DBUG_PRINT("wsrep", ("delete fail")); DBUG_PRINT("wsrep", ("delete fail"));
error = DB_ERROR; error = DB_ERROR;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment