Commit d8682d10 authored by Sujatha's avatar Sujatha

MDEV-20220: Merge 5.7 P_S replication table 'replication_applier_status_by_worker

Fix:
===
Iterate through rpl_parallel_thread_pool and display slave worker thread
specific information as part of 'replication_applier_status_by_worker' table.

---------------------------------------------------------------------------------
|Column Name:           |        Description:                                   |
|-------------------------------------------------------------------------------|
|                       |                                                       |
|THREAD_ID              | Thread_Id as displayed in 'performance_schema.threads'|
|                       | table for thread with name                            |
|                       | 'thread/sql/rpl_parallel_thread'                      |
|                       |                                                       |
|                       | THREAD_ID will be NULL when worker threads are stopped|
|                       | due to error/force stop                               |
|                       |                                                       |
|SERVICE_STATE          | Thread is running or not                              |
|                       |                                                       |
|LAST_SEEN_TRANSACTION  | Last GTID executed by worker                          |
|                       |                                                       |
|LAST_ERROR_NUMBER      | Last Error that occurred on a particular worker       |
|                       |                                                       |
|LAST_ERROR_MESSAGE     | Last error specific message                           |
|                       |                                                       |
|LAST_ERROR_TIMESTAMP   | Time stamp of last error                              |
|                       |                                                       |
|WORKER_IDLE_TIME       | Total idle time in seconds that the worker thread has |
|                       | spent waiting for work from SQL thread                |
|                       |                                                       |
|LAST_TRANS_RETRY_COUNT | Total number of retries attempted by last transaction |
---------------------------------------------------------------------------------


In case STOP SLAVE is executed worker threads will be gone, hence worker
threads will be unavailable. Querying the table at this stage will give empty
rows. To address this case when worker threads are about to stop, due to an
error or forced stop, create a backup pool and preserve the data which is
relevant to populate performance schema table. Clear the backup pool upon
slave start.
parent c3d4e571
......@@ -29,9 +29,7 @@ threads_mysql_freebsd:
transaction_gtid: needs to be updated for MariaDB gtids
ddl_replication_applier_status_by_worker: todo
ddl_replication_connection_status: todo
dml_replication_applier_status_by_worker: todo
dml_replication_connection_status: todo
dml_replication_group_member_stats: todo?
......
......@@ -2,7 +2,7 @@ ALTER TABLE performance_schema.replication_applier_status_by_worker
add column foo integer;
ERROR 42000: Access denied for user 'root'@'localhost' to database 'performance_schema'
TRUNCATE TABLE performance_schema.replication_applier_status_by_worker;
ERROR HY000: Invalid performance_schema usage.
ERROR HY000: Invalid performance_schema usage
ALTER TABLE performance_schema.replication_applier_status_by_worker
ADD INDEX test_index(worker_id);
ERROR 42000: Access denied for user 'root'@'localhost' to database 'performance_schema'
......
......@@ -12,78 +12,81 @@ Warning 1287 '<select expression> INTO <destination>;' is deprecated and will be
# For each table in the performance schema, attempt HANDLER...OPEN,
# which should fail with an error 1031, ER_ILLEGAL_HA.
#
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=80;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=81;
HANDLER performance_schema.user_variables_by_thread OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`user_variables_by_thread` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=79;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=80;
HANDLER performance_schema.users OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`users` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=78;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=79;
HANDLER performance_schema.threads OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`threads` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=77;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=78;
HANDLER performance_schema.table_lock_waits_summary_by_table OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`table_lock_waits_summary_by_table` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=76;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=77;
HANDLER performance_schema.table_io_waits_summary_by_table OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`table_io_waits_summary_by_table` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=75;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=76;
HANDLER performance_schema.table_io_waits_summary_by_index_usage OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`table_io_waits_summary_by_index_usage` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=74;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=75;
HANDLER performance_schema.table_handles OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`table_handles` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=73;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=74;
HANDLER performance_schema.status_by_user OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`status_by_user` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=72;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=73;
HANDLER performance_schema.status_by_thread OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`status_by_thread` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=71;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=72;
HANDLER performance_schema.status_by_host OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`status_by_host` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=70;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=71;
HANDLER performance_schema.status_by_account OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`status_by_account` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=69;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=70;
HANDLER performance_schema.socket_summary_by_instance OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`socket_summary_by_instance` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=68;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=69;
HANDLER performance_schema.socket_summary_by_event_name OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`socket_summary_by_event_name` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=67;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=68;
HANDLER performance_schema.socket_instances OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`socket_instances` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=66;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=67;
HANDLER performance_schema.setup_timers OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`setup_timers` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=65;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=66;
HANDLER performance_schema.setup_objects OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`setup_objects` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=64;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=65;
HANDLER performance_schema.setup_instruments OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`setup_instruments` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=63;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=64;
HANDLER performance_schema.setup_consumers OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`setup_consumers` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=62;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=63;
HANDLER performance_schema.setup_actors OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`setup_actors` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=61;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=62;
HANDLER performance_schema.session_status OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`session_status` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=60;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=61;
HANDLER performance_schema.session_connect_attrs OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`session_connect_attrs` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=59;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=60;
HANDLER performance_schema.session_account_connect_attrs OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`session_account_connect_attrs` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=58;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=59;
HANDLER performance_schema.rwlock_instances OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`rwlock_instances` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=57;
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=58;
HANDLER performance_schema.replication_connection_configuration OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`replication_connection_configuration` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=57;
HANDLER performance_schema.replication_applier_status_by_worker OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`replication_applier_status_by_worker` doesn't have this option
SELECT TABLE_NAME INTO @table_name FROM table_list WHERE id=56;
HANDLER performance_schema.replication_applier_status_by_coordinator OPEN;
ERROR HY000: Storage engine PERFORMANCE_SCHEMA of the table `performance_schema`.`replication_applier_status_by_coordinator` doesn't have this option
......
......@@ -59,6 +59,7 @@ performance_schema prepared_statements_instances def
performance_schema replication_applier_configuration def
performance_schema replication_applier_status def
performance_schema replication_applier_status_by_coordinator def
performance_schema replication_applier_status_by_worker def
performance_schema replication_connection_configuration def
performance_schema rwlock_instances def
performance_schema session_account_connect_attrs def
......@@ -144,6 +145,7 @@ prepared_statements_instances BASE TABLE PERFORMANCE_SCHEMA
replication_applier_configuration BASE TABLE PERFORMANCE_SCHEMA
replication_applier_status BASE TABLE PERFORMANCE_SCHEMA
replication_applier_status_by_coordinator BASE TABLE PERFORMANCE_SCHEMA
replication_applier_status_by_worker BASE TABLE PERFORMANCE_SCHEMA
replication_connection_configuration BASE TABLE PERFORMANCE_SCHEMA
rwlock_instances BASE TABLE PERFORMANCE_SCHEMA
session_account_connect_attrs BASE TABLE PERFORMANCE_SCHEMA
......@@ -229,6 +231,7 @@ prepared_statements_instances 10 Dynamic
replication_applier_configuration 10 Fixed
replication_applier_status 10 Fixed
replication_applier_status_by_coordinator 10 Dynamic
replication_applier_status_by_worker 10 Dynamic
replication_connection_configuration 10 Dynamic
rwlock_instances 10 Dynamic
session_account_connect_attrs 10 Dynamic
......@@ -314,6 +317,7 @@ prepared_statements_instances 0
replication_applier_configuration 0
replication_applier_status 0
replication_applier_status_by_coordinator 0
replication_applier_status_by_worker 0
replication_connection_configuration 0
rwlock_instances 0
session_account_connect_attrs 0
......@@ -406,6 +410,7 @@ prepared_statements_instances 0 0
replication_applier_configuration 0 0
replication_applier_status 0 0
replication_applier_status_by_coordinator 0 0
replication_applier_status_by_worker 0 0
replication_connection_configuration 0 0
rwlock_instances 0 0
session_account_connect_attrs 0 0
......@@ -491,6 +496,7 @@ prepared_statements_instances 0 0 NULL
replication_applier_configuration 0 0 NULL
replication_applier_status 0 0 NULL
replication_applier_status_by_coordinator 0 0 NULL
replication_applier_status_by_worker 0 0 NULL
replication_connection_configuration 0 0 NULL
rwlock_instances 0 0 NULL
session_account_connect_attrs 0 0 NULL
......@@ -576,6 +582,7 @@ prepared_statements_instances NULL NULL NULL
replication_applier_configuration NULL NULL NULL
replication_applier_status NULL NULL NULL
replication_applier_status_by_coordinator NULL NULL NULL
replication_applier_status_by_worker NULL NULL NULL
replication_connection_configuration NULL NULL NULL
rwlock_instances NULL NULL NULL
session_account_connect_attrs NULL NULL NULL
......@@ -661,6 +668,7 @@ prepared_statements_instances utf8_general_ci NULL
replication_applier_configuration utf8_general_ci NULL
replication_applier_status utf8_general_ci NULL
replication_applier_status_by_coordinator utf8_general_ci NULL
replication_applier_status_by_worker utf8_general_ci NULL
replication_connection_configuration utf8_general_ci NULL
rwlock_instances utf8_general_ci NULL
session_account_connect_attrs utf8_bin NULL
......@@ -746,6 +754,7 @@ prepared_statements_instances
replication_applier_configuration
replication_applier_status
replication_applier_status_by_coordinator
replication_applier_status_by_worker
replication_connection_configuration
rwlock_instances
session_account_connect_attrs
......@@ -831,6 +840,7 @@ prepared_statements_instances
replication_applier_configuration
replication_applier_status
replication_applier_status_by_coordinator
replication_applier_status_by_worker
replication_connection_configuration
rwlock_instances
session_account_connect_attrs
......
......@@ -63,6 +63,7 @@ prepared_statements_instances
replication_applier_configuration
replication_applier_status
replication_applier_status_by_coordinator
replication_applier_status_by_worker
replication_connection_configuration
rwlock_instances
session_account_connect_attrs
......
......@@ -865,6 +865,14 @@ def performance_schema replication_applier_status_by_coordinator SERVICE_STATE 3
def performance_schema replication_applier_status_by_coordinator LAST_ERROR_NUMBER 4 NULL NO int NULL NULL 10 0 NULL NULL NULL int(11) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator LAST_ERROR_MESSAGE 5 NULL NO varchar 1024 3072 NULL NULL NULL utf8 utf8_general_ci varchar(1024) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator LAST_ERROR_TIMESTAMP 6 current_timestamp() NO timestamp NULL NULL NULL NULL 0 NULL NULL timestamp on update current_timestamp() select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_worker THREAD_ID 1 NULL YES bigint NULL NULL 20 0 NULL NULL NULL bigint(20) unsigned select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_worker SERVICE_STATE 2 NULL NO enum 3 9 NULL NULL NULL utf8 utf8_general_ci enum('ON','OFF') select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_worker LAST_SEEN_TRANSACTION 3 NULL NO char 57 171 NULL NULL NULL utf8 utf8_general_ci char(57) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_worker LAST_ERROR_NUMBER 4 NULL NO int NULL NULL 10 0 NULL NULL NULL int(11) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_worker LAST_ERROR_MESSAGE 5 NULL NO varchar 1024 3072 NULL NULL NULL utf8 utf8_general_ci varchar(1024) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_worker LAST_ERROR_TIMESTAMP 6 current_timestamp() NO timestamp NULL NULL NULL NULL 0 NULL NULL timestamp on update current_timestamp() select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_worker WORKER_IDLE_TIME 7 NULL NO bigint NULL NULL 20 0 NULL NULL NULL bigint(20) unsigned select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_worker LAST_TRANS_RETRY_COUNT 8 NULL NO int NULL NULL 10 0 NULL NULL NULL int(11) select,insert,update,references NEVER NULL
def performance_schema replication_connection_configuration CHANNEL_NAME 1 NULL NO char 64 192 NULL NULL NULL utf8 utf8_general_ci char(64) select,insert,update,references NEVER NULL
def performance_schema replication_connection_configuration HOST 2 NULL NO char 60 180 NULL NULL NULL utf8 utf8_bin char(60) select,insert,update,references NEVER NULL
def performance_schema replication_connection_configuration PORT 3 NULL NO int NULL NULL 10 0 NULL NULL NULL int(11) select,insert,update,references NEVER NULL
......
......@@ -49,8 +49,7 @@ SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100";
include/start_slave.inc
SET GLOBAL debug_dbug=@old_dbug;
retries
1
include/assert.inc [Performance Schema retries should match with actual retries]
SELECT * FROM t1 ORDER BY a;
a b
1 2
......@@ -82,8 +81,7 @@ SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100,rpl_parallel_simulate_double_temp_err_gtid_0_x_100";
include/start_slave.inc
SET GLOBAL debug_dbug=@old_dbug;
retries
2
include/assert.inc [Performance Schema retries should match with actual retries]
SELECT * FROM t1 ORDER BY a;
a b
1 3
......@@ -129,6 +127,7 @@ include/wait_for_slave_sql_error.inc [errno=1213]
SET GLOBAL debug_dbug=@old_dbug;
retries
10
include/assert.inc [Performance Schema retries should match with actual retries]
SELECT * FROM t1 ORDER BY a;
a b
1 3
......@@ -179,8 +178,7 @@ SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100";
include/start_slave.inc
SET GLOBAL debug_dbug=@old_dbug;
retries
1
include/assert.inc [Performance Schema retries should match with actual retries]
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
a b
10 4
......@@ -224,8 +222,7 @@ SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_xid";
include/start_slave.inc
SET GLOBAL debug_dbug=@old_dbug;
retries
1
include/assert.inc [Performance Schema retries should match with actual retries]
SELECT * FROM t1 WHERE a >= 100 ORDER BY a;
a b
100 0
......@@ -322,8 +319,7 @@ SET debug_sync='now WAIT_FOR t3_waiting';
SET debug_sync='now SIGNAL t1_start';
SET GLOBAL debug_dbug=@old_dbug;
SET debug_sync='RESET';
retries
1
include/assert.inc [Performance Schema retries should match with actual retries]
SELECT * FROM t3 ORDER BY a;
a b
1 NULL
......
include/master-slave.inc
[connection master]
call mtr.add_suppression("Error 'Table 'test.t' doesn't exist' on query.");
include/assert.inc [On master, the table should return an empty set.]
# Setup MTS and perform testing on a fresh slave.
include/sync_slave_sql_with_master.inc
connection slave;
include/stop_slave.inc
set @save_slave_parallel_workers= @@global.slave_parallel_workers;
SET @@global.slave_parallel_workers=1;
set @save_slave_transaction_retries= @@global.slave_transaction_retries;
include/start_slave.inc
include/assert.inc [thread_name should should indicate worker thread.]
include/assert.inc [Service_State should be "ON" on a fresh slave server.]
include/assert.inc [Last_Seen_Transaction should show "" if no transaction applierd]
connection master;
CREATE TABLE t1 (a INT);
DROP TABLE t1;
connection slave;
include/assert.inc [Last_Seen_Transaction should show "0-1-3".]
include/assert.inc [Value returned by SSS and PS table for Last_Error_Number should be same.]
include/assert.inc [Value returned by SSS and PS table for Last_Error_Message should both be empty.]
include/assert.inc [Value returned by PS table for Last_Error_Timestamp should be 0000-00-00 00:00:00.]
# Introduce an error in the worker thread and check for the correctness
# of error number, message and timestamp fields.
connection master;
use test;
create table t(a int primary key);
connection slave;
drop table t;
connection master;
insert into t values(1);
connection slave;
include/wait_for_slave_sql_error.inc [errno=1146]
# Extract the error related fields from SSS and PS table and compare
# them for correctness.
include/assert.inc [Value returned by SSS and PS table for Last_Error_Number should be same.]
Last_Error_Message
Error 'Table 'test.t' doesn't exist' on query. Default database: 'test'. Query: 'insert into t values(1)'
# Verify that the error fields are preserved after STOP SLAVE.
# 1. Verify that thread_id changes to NULL and service_state to "off" on
# STOP SLAVE.
include/assert.inc [After STOP SLAVE, thread_id should be NULL]
include/assert.inc [So, Service_State after STOP SLAVE should be "OFF".]
select Last_Seen_Transaction from performance_schema.replication_applier_status_by_worker;
Last_Seen_Transaction 0-1-5
# 2. Extract the worker_id and the error related fields from SSS and PS
# table and compare them. These fields should preserve their values.
include/assert.inc [Value returned by SSS and PS table for Last_Error_Number should be same.]
Last_Error_Message
Error 'Table 'test.t' doesn't exist' on query. Default database: 'test'. Query: 'insert into t values(1)'
STOP SLAVE;
RESET SLAVE;
connection master;
DROP TABLE t;
RESET MASTER;
# Cleanup.
connection slave;
set @@global.slave_parallel_workers= @save_slave_parallel_workers;
set @@global.slave_transaction_retries= @save_slave_transaction_retries;
include/start_slave.inc
include/rpl_end.inc
......@@ -68,9 +68,13 @@ let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', V
--sync_with_master
SET GLOBAL debug_dbug=@old_dbug;
let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
--disable_query_log
eval SELECT $new_retry - $old_retry AS retries;
--enable_query_log
let $ps_value= query_get_value(select last_trans_retry_count from
performance_schema.replication_applier_status_by_worker where
last_trans_retry_count > 0, last_trans_retry_count, 1);
let $assert_text= Performance Schema retries should match with actual retries;
let $assert_cond= "$ps_value" = $new_retry - $old_retry;
source include/assert.inc;
SELECT * FROM t1 ORDER BY a;
......@@ -100,9 +104,12 @@ let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', V
--sync_with_master
SET GLOBAL debug_dbug=@old_dbug;
let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
--disable_query_log
eval SELECT $new_retry - $old_retry AS retries;
--enable_query_log
let $ps_value= query_get_value(select last_trans_retry_count from
performance_schema.replication_applier_status_by_worker where
last_trans_retry_count > 0, last_trans_retry_count, 1);
let $assert_text= Performance Schema retries should match with actual retries;
let $assert_cond= "$ps_value" = $new_retry - $old_retry;
source include/assert.inc;
SELECT * FROM t1 ORDER BY a;
......@@ -142,6 +149,12 @@ let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', V
--disable_query_log
eval SELECT $new_retry - $old_retry AS retries;
--enable_query_log
let $ps_value= query_get_value(select last_trans_retry_count from
performance_schema.replication_applier_status_by_worker where
last_trans_retry_count > 0, last_trans_retry_count, 1);
let $assert_text= Performance Schema retries should match with actual retries;
let $assert_cond= "$ps_value" = $new_retry - $old_retry;
source include/assert.inc;
SELECT * FROM t1 ORDER BY a;
STOP SLAVE IO_THREAD;
......@@ -187,9 +200,12 @@ let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', V
--sync_with_master
SET GLOBAL debug_dbug=@old_dbug;
let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
--disable_query_log
eval SELECT $new_retry - $old_retry AS retries;
--enable_query_log
let $ps_value= query_get_value(select last_trans_retry_count from
performance_schema.replication_applier_status_by_worker where
last_trans_retry_count > 0, last_trans_retry_count, 1);
let $assert_text= Performance Schema retries should match with actual retries;
let $assert_cond= "$ps_value" = $new_retry - $old_retry;
source include/assert.inc;
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
SELECT a, LENGTH(b) FROM t2 ORDER BY a;
......@@ -235,9 +251,12 @@ let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', V
--sync_with_master
SET GLOBAL debug_dbug=@old_dbug;
let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
--disable_query_log
eval SELECT $new_retry - $old_retry AS retries;
--enable_query_log
let $ps_value= query_get_value(select last_trans_retry_count from
performance_schema.replication_applier_status_by_worker where
last_trans_retry_count > 0, last_trans_retry_count, 1);
let $assert_text= Performance Schema retries should match with actual retries;
let $assert_cond= "$ps_value" = $new_retry - $old_retry;
source include/assert.inc;
SELECT * FROM t1 WHERE a >= 100 ORDER BY a;
# Stop the SQL thread. When the bug was there to give the incorrect relay log
......@@ -362,9 +381,12 @@ SET debug_sync='now SIGNAL t1_start';
SET GLOBAL debug_dbug=@old_dbug;
SET debug_sync='RESET';
let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
--disable_query_log
eval SELECT $new_retry - $old_retry >= 1 AS retries;
--enable_query_log
let $ps_value= query_get_value(select last_trans_retry_count from
performance_schema.replication_applier_status_by_worker where
last_trans_retry_count > 0, last_trans_retry_count, 1);
let $assert_text= Performance Schema retries should match with actual retries;
let $assert_cond= "$ps_value" = $new_retry - $old_retry;
source include/assert.inc;
SELECT * FROM t3 ORDER BY a;
......
# ==== Purpose ====
#
# This test script serves as the functionality testing for the table
# performance_schema.replication_applier_status_by_worker. Test
# for ddl and dml operations is a part of the perfschema suite.
# The ddl/dml tests are named:
# 1) ddl_replication_applier_status_by_worker.test and
# 2) dml_replication_applier_status_by_worker.test.
#
# This test script does the following:
# - Verify that SELECT works for every field in the table.
# - The SELECT per field produces an output similar to the corresponding field
# in SHOW SLAVE STATUS(SSS), if there is one.
# - If there is no matching field in SSS, we resort to other method of testing
# those fields.
# - We perform all the testing on connection "slave". On master, the table
# returns an empty set.
#
# The follwing scenarios are tested in this test script:
#
# - Test each field on a fresh replication setup.
# - Introduce error in worker thread and check for the correctness of error
# error number, message and timestamp.
# - Verify that, the change in values are correctly shown by the table.
# - Verify that the values are preserved after STOP SLAVE.
# - Set up replication in gtid-mode=on and test 'Last_Seen_Transaction' field.
# - Verify that the value in 'Last_Seen_Transaction' field is preserved after
# STOP SLAVE.
#
# ==== Related Bugs and Worklogs ====
#
# MDEV-20220: Merge 5.7 P_S replication table 'replication_applier_status_by_worker
#
--source include/have_binlog_format_mixed.inc
--source include/master-slave.inc
call mtr.add_suppression("Error 'Table 'test.t' doesn't exist' on query.");
let $assert_text= On master, the table should return an empty set.;
let $assert_cond= count(*) = 0 from performance_schema.replication_applier_status_by_worker;
source include/assert.inc;
--echo
--echo # Setup MTS and perform testing on a fresh slave.
--echo
--source include/sync_slave_sql_with_master.inc
--connection slave
source include/stop_slave.inc;
set @save_slave_parallel_workers= @@global.slave_parallel_workers;
SET @@global.slave_parallel_workers=1;
# to avoid warnings
set @save_slave_transaction_retries= @@global.slave_transaction_retries;
source include/start_slave.inc;
# To verify the correctness of thread_id field, we check for the name of
# the thread.
let $thread_name= `select name from performance_schema.threads where thread_id= (select Thread_Id from performance_schema.replication_applier_status_by_worker)`;
let $assert_text= thread_name should should indicate worker thread.;
let $assert_cond= "$thread_name" = "thread/sql/rpl_parallel_thread";
source include/assert.inc;
let $ps_value= query_get_value(select Service_State from performance_schema.replication_applier_status_by_worker, Service_State, 1);
let $assert_text= Service_State should be "ON" on a fresh slave server.;
let $assert_cond= "$ps_value"= "ON";
source include/assert.inc;
let $ps_value= query_get_value(select Last_Seen_Transaction from performance_schema.replication_applier_status_by_worker, Last_Seen_Transaction, 1);
let $assert_text= Last_Seen_Transaction should show "" if no transaction applierd;
let $assert_cond= "$ps_value" = "";
source include/assert.inc;
--connection master
CREATE TABLE t1 (a INT);
DROP TABLE t1;
--sync_slave_with_master
let $ps_value= query_get_value(select Last_Seen_Transaction from performance_schema.replication_applier_status_by_worker, Last_Seen_Transaction, 1);
let $assert_text= Last_Seen_Transaction should show "0-1-3".;
let $assert_cond= "$ps_value" = "0-1-3";
source include/assert.inc;
let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1);
let $ps_value= query_get_value(select Last_Error_Number from performance_schema.replication_applier_status_by_worker, Last_Error_Number, 1);
let $assert_text= Value returned by SSS and PS table for Last_Error_Number should be same.;
let $assert_cond= "$sss_value" = "$ps_value";
source include/assert.inc;
let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Error, 1);
let $ps_value= query_get_value(select Last_Error_Message from performance_schema.replication_applier_status_by_worker, Last_Error_Message, 1);
let $assert_text= Value returned by SSS and PS table for Last_Error_Message should both be empty.;
let $assert_cond= "$sss_value" = "$ps_value";
source include/assert.inc;
let $ps_value= query_get_value(select Last_Error_Timestamp from performance_schema.replication_applier_status_by_worker, Last_Error_Timestamp, 1);
let $assert_text= Value returned by PS table for Last_Error_Timestamp should be 0000-00-00 00:00:00.;
let $assert_cond= "$ps_value" = "0000-00-00 00:00:00";
source include/assert.inc;
--echo
--echo # Introduce an error in the worker thread and check for the correctness
--echo # of error number, message and timestamp fields.
--echo
# Cause an error in Worker thread.
# 1) Create a table 't' at master, replicate at slave.
# 2) Drop table 't' at slave only.
# 3) Insert a value in table 't' on master and replicate on slave.
# Since slave doesnt have table 't' anymore, worker thread will report an error.
--connection master
use test;
create table t(a int primary key);
sync_slave_with_master;
drop table t;
--connection master
insert into t values(1);
--connection slave
let $slave_sql_errno=1146;
source include/wait_for_slave_sql_error.inc;
--echo
--echo # Extract the error related fields from SSS and PS table and compare
--echo # them for correctness.
--echo
let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1);
let $ps_value= query_get_value(select Last_Error_Number from performance_schema.replication_applier_status_by_worker, Last_Error_Number, 1);
let $assert_text= Value returned by SSS and PS table for Last_Error_Number should be same.;
let $assert_cond= "$sss_value" = "$ps_value";
source include/assert.inc;
--disable_query_log
--replace_regex /master-bin.[0-9]+/FILENAME/ /end_log_pos [0-9]+/end_log_pos POSITION/
select Last_Error_Message from performance_schema.replication_applier_status_by_worker;
--enable_query_log
--echo
--echo # Verify that the error fields are preserved after STOP SLAVE.
--echo
--echo
--echo # 1. Verify that thread_id changes to NULL and service_state to "off" on
--echo # STOP SLAVE.
--echo
let $ps_value= query_get_value(select thread_id from performance_schema.replication_applier_status_by_worker, thread_id, 1);
let $assert_text= After STOP SLAVE, thread_id should be NULL;
let $assert_cond= "$ps_value" = "NULL";
source include/assert.inc;
let $ps_value= query_get_value(select service_state from performance_schema.replication_applier_status_by_worker, service_state, 1);
let $assert_text= So, Service_State after STOP SLAVE should be "OFF".;
let $assert_cond= "$ps_value"= "OFF";
source include/assert.inc;
query_vertical select Last_Seen_Transaction from performance_schema.replication_applier_status_by_worker;
--echo
--echo # 2. Extract the worker_id and the error related fields from SSS and PS
--echo # table and compare them. These fields should preserve their values.
--echo
let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1);
let $ps_value= query_get_value(select Last_Error_Number from performance_schema.replication_applier_status_by_worker, Last_Error_Number, 1);
let $assert_text= Value returned by SSS and PS table for Last_Error_Number should be same.;
let $assert_cond= "$sss_value" = "$ps_value";
source include/assert.inc;
--disable_query_log
--replace_regex /master-bin.[0-9]+/FILENAME/ /end_log_pos [0-9]+/end_log_pos POSITION/
select Last_Error_Message from performance_schema.replication_applier_status_by_worker;
--enable_query_log
# The timestamp format is slightly different in SSS and PS.
# SSS => YYMMDD HH:MM:SS
# PS => YYYY-MM-DD HH:MM:SS
# To match the two, we get rid of hyphons from PS output and first two digits
# the year field so that it can be matched directly.
#--- TODO: Can we include Last_SQL_Error_Timestamp as part of SSS
#let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Error_Timestamp, 1);
#let $ps_value= query_get_value(select Last_Error_Timestamp from performance_schema.replication_applier_status_by_worker, Last_Error_Timestamp, 1);
#let $ps_value_without_hyphons= `SELECT REPLACE("$ps_value", '-', '')`;
#let $ps_value_in_sss_format= `select substring("$ps_value_without_hyphons", 3)`;
#let $assert_text= Value returned by SSS and PS table for Last_Error_Timestamp should be same.;
#let $assert_cond= "$sss_value" = "$ps_value_in_sss_format";
#source include/assert.inc;
STOP SLAVE;
RESET SLAVE;
--connection master
DROP TABLE t;
RESET MASTER;
--echo
--echo # Cleanup.
--echo
--connection slave
set @@global.slave_parallel_workers= @save_slave_parallel_workers;
set @@global.slave_transaction_retries= @save_slave_transaction_retries;
source include/start_slave.inc;
source include/rpl_end.inc;
......@@ -798,6 +798,7 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
mysql_mutex_lock(&rli->data_lock);
++rli->retried_trans;
++rpt->last_trans_retry_count;
statistic_increment(slave_retried_transactions, LOCK_status);
mysql_mutex_unlock(&rli->data_lock);
......@@ -1098,6 +1099,11 @@ handle_rpl_parallel_thread(void *arg)
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->thd= thd;
PSI_thread *psi= PSI_CALL_get_thread();
PSI_CALL_set_thread_os_id(psi);
PSI_CALL_set_thread_THD(psi, thd);
PSI_CALL_set_thread_id(psi, thd->thread_id);
rpt->thd->set_psi(psi);
while (rpt->delay_start)
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
......@@ -1119,6 +1125,7 @@ handle_rpl_parallel_thread(void *arg)
uint wait_count= 0;
rpl_parallel_thread::queued_event *qev, *next_qev;
rpt->start_time_tracker();
thd->ENTER_COND(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
&stage_waiting_for_work_from_sql_thread, &old_stage);
/*
......@@ -1142,6 +1149,7 @@ handle_rpl_parallel_thread(void *arg)
}
rpt->dequeue1(events);
thd->EXIT_COND(&old_stage);
rpt->add_to_worker_idle_time_and_reset();
more_events:
for (qev= events; qev; qev= next_qev)
......@@ -1187,6 +1195,8 @@ handle_rpl_parallel_thread(void *arg)
/* Handle a new event group, which will be initiated by a GTID event. */
if ((event_type= qev->ev->get_type_code()) == GTID_EVENT)
{
rpt->last_seen_gtid= rgi->current_gtid;
rpt->last_trans_retry_count= 0;
bool did_enter_cond= false;
PSI_stage_info old_stage;
......@@ -1736,6 +1746,7 @@ int
rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
{
int rc= 0;
struct pool_bkp_for_pfs* bkp= &pool->pfs_bkp;
if ((rc= pool_mark_busy(pool, current_thd)))
return rc; // killed
......@@ -1745,6 +1756,23 @@ rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
pool_mark_not_busy(pool);
rc= rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads,
0);
if (!rc)
{
if (pool->count)
{
if (bkp->inited)
{
if (bkp->count != pool->count)
{
bkp->destroy();
bkp->init(pool->count);
}
}
else
bkp->init(pool->count);
}
}
}
else
{
......@@ -2003,7 +2031,8 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
: threads(0), free_list(0), count(0), inited(false), busy(false)
: threads(0), free_list(0), count(0), inited(false), busy(false),
pfs_bkp{0, false, NULL}
{
}
......@@ -2034,6 +2063,7 @@ void
rpl_parallel_thread_pool::destroy()
{
deactivate();
pfs_bkp.destroy();
destroy_cond_mutex();
}
......@@ -2102,6 +2132,33 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt)
mysql_mutex_unlock(&LOCK_rpl_thread_pool);
}
void
rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli)
{
if (pfs_bkp.inited)
{
for(uint i=0; i<count;i++)
{
rpl_parallel_thread *rpt, *pfs_rpt;
rpt= threads[i];
pfs_rpt= pfs_bkp.rpl_thread_arr[i];
pfs_rpt->thd= rpt->thd;
pfs_rpt->last_seen_gtid= rpt->last_seen_gtid;
if (rli->err_thread_id && rpt->thd->thread_id == rli->err_thread_id)
{
pfs_rpt->last_error_number= rli->last_error().number;
strmake(pfs_rpt->last_error_message,
rli->last_error().message, sizeof(rli->last_error().message));
pfs_rpt->last_error_timestamp= rli->last_error().skr*1000000;
}
else
pfs_rpt->last_error_number= 0;
pfs_rpt->running= false;
pfs_rpt->worker_idle_time= rpt->get_worker_idle_time();
pfs_rpt->last_trans_retry_count= rpt->last_trans_retry_count;
}
}
}
/*
Obtain a worker thread that we can queue an event to.
......@@ -2370,6 +2427,7 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
STRING_WITH_LEN("now SIGNAL wait_for_done_waiting"));
};);
global_rpl_thread_pool.copy_pool_for_pfs(rli);
for (i= 0; i < domain_hash.records; ++i)
{
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
......
......@@ -7,11 +7,13 @@
struct rpl_parallel;
struct rpl_parallel_entry;
struct rpl_parallel_thread_pool;
extern struct rpl_parallel_thread_pool pool_bkp_for_pfs;
class Relay_log_info;
struct inuse_relaylog;
/*
Structure used to keep track of the parallel replication of a batch of
event-groups that group-committed together on the master.
......@@ -160,7 +162,34 @@ struct rpl_parallel_thread {
/* These keep track of batch update of inuse_relaylog refcounts. */
inuse_relaylog *accumulated_ir_last;
uint64 accumulated_ir_count;
ulonglong worker_idle_time;
ulonglong gco_wait_time;
ulonglong start_time;
rpl_gtid last_seen_gtid;
int last_error_number;
char last_error_message[MAX_SLAVE_ERRMSG];
ulong last_trans_retry_count;
ulonglong last_error_timestamp;
void start_time_tracker()
{
start_time= microsecond_interval_timer();
}
ulonglong compute_time_lapsed()
{
return (ulonglong)((microsecond_interval_timer() - start_time) / 1000000.0);
}
void add_to_worker_idle_time_and_reset()
{
worker_idle_time+= compute_time_lapsed();
start_time=0;
}
ulonglong get_worker_idle_time()
{
if (start_time)
return compute_time_lapsed();
else
return worker_idle_time;
}
void enqueue(queued_event *qev)
{
if (last_in_queue)
......@@ -226,6 +255,37 @@ struct rpl_parallel_thread {
void inuse_relaylog_refcount_update();
};
struct pool_bkp_for_pfs{
uint32 count;
bool inited;
struct rpl_parallel_thread **rpl_thread_arr;
void init(uint32 thd_count)
{
DBUG_ASSERT(thd_count);
rpl_thread_arr= (rpl_parallel_thread **)
my_malloc(PSI_INSTRUMENT_ME,
thd_count * sizeof(rpl_parallel_thread*),
MYF(0));
for (uint i=0; i<thd_count; i++)
rpl_thread_arr[i]= (rpl_parallel_thread *)
my_malloc(PSI_INSTRUMENT_ME, sizeof(rpl_parallel_thread),
MYF(0));
count= thd_count;
inited= true;
}
void destroy()
{
if (inited)
{
for (uint i=0; i<count; i++)
my_free(rpl_thread_arr[i]);
my_free(rpl_thread_arr);
rpl_thread_arr= NULL;
}
}
};
struct rpl_parallel_thread_pool {
struct rpl_parallel_thread **threads;
......@@ -240,8 +300,10 @@ struct rpl_parallel_thread_pool {
is in progress.
*/
bool busy;
struct pool_bkp_for_pfs pfs_bkp;
rpl_parallel_thread_pool();
void copy_pool_for_pfs(Relay_log_info *rli);
int init(uint32 size);
void destroy();
void deactivate();
......
......@@ -51,6 +51,7 @@ Slave_reporting_capability::report(loglevel level, int err_code,
pbuff= m_last_error.message;
pbuffsize= sizeof(m_last_error.message);
m_last_error.number = err_code;
m_last_error.update_timestamp();
report_function= sql_print_error;
break;
case WARNING_LEVEL:
......@@ -69,6 +70,7 @@ Slave_reporting_capability::report(loglevel level, int err_code,
mysql_mutex_unlock(&err_lock);
va_end(args);
err_thread_id= current_thd->thread_id;
/* If the msg string ends with '.', do not add a ',' it would be ugly */
report_function("%s %s: %s%s %s%sInternal MariaDB error code: %d",
......
......@@ -41,6 +41,7 @@ class Slave_reporting_capability
@param thread_name Printable name of the slave thread that is reporting.
*/
Slave_reporting_capability(char const *thread_name);
mutable my_thread_id err_thread_id;
/**
Writes a message and, if it's an error message, to Last_Error
......@@ -81,12 +82,35 @@ class Slave_reporting_capability
{
number= 0;
message[0]= '\0';
timestamp[0]= '\0';
}
void update_timestamp()
{
struct tm tm_tmp;
struct tm *start;
skr= my_time(0);
localtime_r(&skr, &tm_tmp);
start=&tm_tmp;
sprintf(timestamp, "%02d%02d%02d %02d:%02d:%02d",
start->tm_year % 100,
start->tm_mon+1,
start->tm_mday,
start->tm_hour,
start->tm_min,
start->tm_sec);
timestamp[15]= '\0';
}
/** Error code */
uint32 number;
/** Error message */
char message[MAX_SLAVE_ERRMSG];
/** Error timestamp as string */
char timestamp[64];
/** Error timestamp as time_t variable. Used in performance_schema */
time_t skr;
};
Error const& last_error() const { return m_last_error; }
......@@ -99,7 +123,6 @@ class Slave_reporting_capability
mutable Error m_last_error;
char const *const m_thread_name;
// not implemented
Slave_reporting_capability(const Slave_reporting_capability& rhs);
Slave_reporting_capability& operator=(const Slave_reporting_capability& rhs);
......
......@@ -271,7 +271,7 @@ table_replication_connection_configuration.cc
table_replication_applier_configuration.cc
table_replication_applier_status.cc
table_replication_applier_status_by_coordinator.cc
#table_replication_applier_status_by_worker.cc
table_replication_applier_status_by_worker.cc
#table_replication_group_member_stats.cc
)
......
......@@ -317,7 +317,7 @@ static PFS_engine_table_share *all_shares[]=
&table_replication_applier_configuration::m_share,
&table_replication_applier_status::m_share,
&table_replication_applier_status_by_coordinator::m_share,
//&table_replication_applier_status_by_worker::m_share,
&table_replication_applier_status_by_worker::m_share,
//&table_replication_group_member_stats::m_share,
#endif
......
......@@ -33,12 +33,10 @@
#include "pfs_instr_class.h"
#include "pfs_instr.h"
#include "slave.h"
//#include "rpl_info.h"
#include "rpl_rli.h"
#include "rpl_mi.h"
#include "sql_parse.h"
//#include "rpl_rli_pdb.h"
//#include "rpl_msr.h" /*Multi source replication */
#include "rpl_parallel.h"
THR_LOCK table_replication_applier_status_by_worker::m_table_lock;
......@@ -54,14 +52,14 @@ table_replication_applier_status_by_worker::m_share=
sizeof(pos_t), /* ref length */
&m_table_lock,
{ C_STRING_WITH_LEN("CREATE TABLE replication_applier_status_by_worker("
"CHANNEL_NAME CHAR(64) collate utf8_general_ci not null,"
"WORKER_ID BIGINT UNSIGNED not null,"
"THREAD_ID BIGINT UNSIGNED,"
"SERVICE_STATE ENUM('ON','OFF') not null,"
"LAST_SEEN_TRANSACTION CHAR(57) not null,"
"LAST_ERROR_NUMBER INTEGER not null,"
"LAST_ERROR_MESSAGE VARCHAR(1024) not null,"
"LAST_ERROR_TIMESTAMP TIMESTAMP(0) not null)") },
"LAST_ERROR_TIMESTAMP TIMESTAMP(0) not null,"
"WORKER_IDLE_TIME BIGINT UNSIGNED not null,"
"LAST_TRANS_RETRY_COUNT INTEGER not null)") },
false /* perpetual */
};
......@@ -88,130 +86,99 @@ void table_replication_applier_status_by_worker::reset_position(void)
ha_rows table_replication_applier_status_by_worker::get_row_count()
{
/*
Return an estimate, number of master info's multipled by worker threads
*/
return master_info_index->master_info_hash.records*32;
return opt_slave_parallel_threads;
}
int table_replication_applier_status_by_worker::rnd_next(void)
{
Slave_worker *worker;
Master_info *mi;
size_t wc;
mysql_mutex_lock(&LOCK_active_mi);
for (m_pos.set_at(&m_next_pos);
m_pos.has_more_channels(master_info_index->master_info_hash.records);
m_pos.next_channel())
{
mi= (Master_info *)my_hash_element(&master_info_index->master_info_hash, m_pos.m_index_1);
if (mi && mi->host[0])
if (global_rpl_thread_pool.inited && global_rpl_thread_pool.count)
{
wc= mi->rli->get_worker_count();
if (wc == 0)
rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
uint worker_count= pool->count;
for (m_pos.set_at(&m_next_pos);
m_pos.has_more_workers(worker_count);
m_pos.next_worker())
{
/* Single Thread Slave */
make_row(mi);
m_next_pos.set_channel_after(&m_pos);
channel_map.unlock();
rpl_parallel_thread *rpt= pool->threads[m_pos.m_index];
make_row(rpt);
m_next_pos.set_after(&m_pos);
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
return 0;
}
for (; m_pos.m_index_2 < wc; m_pos.next_worker())
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
}
else
{
/* Multi Thread Slave */
worker = mi->rli->get_worker(m_pos.m_index_2);
if (worker)
struct pool_bkp_for_pfs *bkp_pool= &global_rpl_thread_pool.pfs_bkp;
if (bkp_pool->inited && bkp_pool->count)
{
for (m_pos.set_at(&m_next_pos);
m_pos.has_more_workers(bkp_pool->count);
m_pos.next_worker())
{
make_row(worker);
rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index];
make_row(rpt);
m_next_pos.set_after(&m_pos);
channel_map.unlock();
return 0;
}
}
}
}
mysql_mutex_unlock(&LOCK_active_mi);
return HA_ERR_END_OF_FILE;
}
int table_replication_applier_status_by_worker::rnd_pos(const void *pos)
{
Slave_worker *worker;
Master_info *mi;
int res= HA_ERR_RECORD_DELETED;
size_t wc;
set_position(pos);
mysql_mutex_lock(&LOCK_active_mi);
mi= (Master_info *)my_hash_element(&master_info_index->master_info_hash, m_pos.m_index_1);
if (!mi || !mi->host[0])
goto end;
wc = mi->rli->get_worker_count();
if (wc == 0)
if (global_rpl_thread_pool.inited && global_rpl_thread_pool.count)
{
rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
if(m_pos.m_index < pool->count)
{
/* Single Thread Slave */
make_row(mi);
res=0;
rpl_parallel_thread *rpt= pool->threads[m_pos.m_index];
make_row(rpt);
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
res= 0;
}
}
else
{
/* Multi Thread Slave */
if (m_pos.m_index_2 < wc)
{
worker = mi->rli->get_worker(m_pos.m_index_2);
if (worker != NULL)
struct pool_bkp_for_pfs *bkp_pool= &global_rpl_thread_pool.pfs_bkp;
if (bkp_pool->inited && bkp_pool->count && m_pos.m_index < bkp_pool->count)
{
make_row(worker);
res=0;
}
rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index];
make_row(rpt);
res= 0;
}
}
end:
mysql_mutex_unlock(&LOCK_active_mi);
return res;
}
/**
Function to display SQL Thread's status as part of
'replication_applier_status_by_worker' in single threaded slave mode.
Function to display slave worker thread specific information
@param[in] Master_info
@param[in] rpl_parallel_thread
@retval void
*/
void table_replication_applier_status_by_worker::make_row(Master_info *mi)
void table_replication_applier_status_by_worker::make_row(rpl_parallel_thread *rpt)
{
m_row_exists= false;
m_row.worker_id= 0;
char buf[10+1+10+1+20+1];
String str(buf, sizeof(buf), system_charset_info);
bool first= true;
m_row.thread_id= 0;
str.length(0);
rpl_gtid gtid= rpt->last_seen_gtid;
DBUG_ASSERT(mi != NULL);
DBUG_ASSERT(mi->rli != NULL);
mysql_mutex_lock(&mi->rli->data_lock);
m_row.channel_name_length= strlen(mi->get_channel());
memcpy(m_row.channel_name, (char*)mi->get_channel(), m_row.channel_name_length);
m_row_exists= false;
if (mi->rli->slave_running)
if (rpt->running)
{
PSI_thread *psi= thd_get_psi(mi->rli->info_thd);
PSI_thread *psi= thd_get_psi(rpt->thd);
PFS_thread *pfs= reinterpret_cast<PFS_thread *> (psi);
if(pfs)
{
......@@ -224,140 +191,45 @@ void table_replication_applier_status_by_worker::make_row(Master_info *mi)
else
m_row.thread_id_is_null= true;
if (mi->rli->slave_running)
m_row.service_state= PS_RPL_YES;
else
m_row.service_state= PS_RPL_NO;
if (mi->rli->currently_executing_gtid.type == GTID_GROUP)
{
global_sid_lock->rdlock();
m_row.last_seen_transaction_length=
mi->rli->currently_executing_gtid.to_string(global_sid_map,
m_row.last_seen_transaction);
global_sid_lock->unlock();
}
else if (mi->rli->currently_executing_gtid.type == ANONYMOUS_GROUP)
if ((gtid.seq_no > 0 &&
!rpl_slave_state_tostring_helper(&str, &gtid, &first)))
{
m_row.last_seen_transaction_length=
mi->rli->currently_executing_gtid.to_string((rpl_sid *)NULL,
m_row.last_seen_transaction);
strmake(m_row.last_seen_transaction,str.ptr(), str.length());
m_row.last_seen_transaction_length= str.length();
}
else
{
/*
For SQL thread currently_executing_gtid, type is set to
AUTOMATIC_GROUP when the SQL thread is not executing any
transaction. For this case, the field should be empty.
*/
DBUG_ASSERT(mi->rli->currently_executing_gtid.type == AUTOMATIC_GROUP);
m_row.last_seen_transaction_length= 0;
memcpy(m_row.last_seen_transaction, "", 1);
}
mysql_mutex_lock(&mi->rli->err_lock);
m_row.last_error_number= (long int) mi->rli->last_error().number;
m_row.last_error_message_length= 0;
m_row.last_error_timestamp= 0;
/** if error, set error message and timestamp */
if (m_row.last_error_number)
{
char *temp_store= (char*) mi->rli->last_error().message;
m_row.last_error_message_length= strlen(temp_store);
memcpy(m_row.last_error_message, temp_store,
m_row.last_error_message_length);
/** time in millisecond since epoch */
m_row.last_error_timestamp= (ulonglong)mi->rli->last_error().skr*1000000;
}
mysql_mutex_unlock(&mi->rli->err_lock);
mysql_mutex_unlock(&mi->rli->data_lock);
m_row_exists= true;
}
void table_replication_applier_status_by_worker::make_row(Slave_worker *w)
{
m_row_exists= false;
m_row.worker_id= w->get_internal_id();
m_row.thread_id= 0;
m_row.channel_name_length= strlen(w->get_channel());
memcpy(m_row.channel_name, (char*)w->get_channel(), m_row.channel_name_length);
mysql_mutex_lock(&w->jobs_lock);
if (w->running_status == Slave_worker::RUNNING)
{
PSI_thread *psi= thd_get_psi(w->info_thd);
PFS_thread *pfs= reinterpret_cast<PFS_thread *> (psi);
if(pfs)
{
m_row.thread_id= pfs->m_thread_internal_id;
m_row.thread_id_is_null= false;
}
else /* no instrumentation found */
m_row.thread_id_is_null= true;
}
else
m_row.thread_id_is_null= true;
if (w->running_status == Slave_worker::RUNNING)
if (rpt->running)
m_row.service_state= PS_RPL_YES;
else
m_row.service_state= PS_RPL_NO;
m_row.last_error_number= (unsigned int) w->last_error().number;
if (w->currently_executing_gtid.type == GTID_GROUP)
{
global_sid_lock->rdlock();
m_row.last_seen_transaction_length=
w->currently_executing_gtid.to_string(global_sid_map,
m_row.last_seen_transaction);
global_sid_lock->unlock();
}
else if (w->currently_executing_gtid.type == ANONYMOUS_GROUP)
{
m_row.last_seen_transaction_length=
w->currently_executing_gtid.to_string((rpl_sid *)NULL,
m_row.last_seen_transaction);
}
else
{
/*
For worker->currently_executing_gtid, type is set to
AUTOMATIC_GROUP when the worker is not executing any
transaction. For this case, the field should be empty.
*/
DBUG_ASSERT(w->currently_executing_gtid.type == AUTOMATIC_GROUP);
m_row.last_seen_transaction_length= 0;
memcpy(m_row.last_seen_transaction, "", 1);
}
m_row.last_error_number= (unsigned int) w->last_error().number;
m_row.last_error_number= 0;
m_row.last_error_message_length= 0;
m_row.last_error_timestamp= 0;
/** if error, set error message and timestamp */
if (m_row.last_error_number)
if (rpt->last_error_number)
{
char * temp_store= (char*)w->last_error().message;
m_row.last_error_message_length= strlen(temp_store);
memcpy(m_row.last_error_message, w->last_error().message,
m_row.last_error_number= rpt->last_error_number;
char* temp_store= (char*)rpt->last_error_message;
m_row.last_error_message_length= (uint)strlen(temp_store);
strmake(m_row.last_error_message, rpt->last_error_message,
m_row.last_error_message_length);
/** time in millisecond since epoch */
m_row.last_error_timestamp= (ulonglong)w->last_error().skr*1000000;
m_row.last_error_timestamp= rpt->last_error_timestamp;
}
mysql_mutex_unlock(&w->jobs_lock);
m_row.last_trans_retry_count= rpt->last_trans_retry_count;
if (rpt->running)
m_row.worker_idle_time= rpt->get_worker_idle_time();
else
m_row.worker_idle_time= rpt->worker_idle_time;
m_row_exists= true;
}
int table_replication_applier_status_by_worker
::read_row_values(TABLE *table, unsigned char *buf, Field **fields,
bool read_all)
......@@ -376,33 +248,33 @@ int table_replication_applier_status_by_worker
{
switch(f->field_index)
{
case 0: /** channel_name */
set_field_char_utf8(f, m_row.channel_name, m_row.channel_name_length);
break;
case 1: /*worker_id*/
set_field_ulonglong(f, m_row.worker_id);
break;
case 2: /*thread_id*/
case 0: /*thread_id*/
if(m_row.thread_id_is_null)
f->set_null();
else
set_field_ulonglong(f, m_row.thread_id);
break;
case 3: /*service_state*/
case 1: /*service_state*/
set_field_enum(f, m_row.service_state);
break;
case 4: /*last_seen_transaction*/
case 2: /*last_seen_transaction*/
set_field_char_utf8(f, m_row.last_seen_transaction, m_row.last_seen_transaction_length);
break;
case 5: /*last_error_number*/
case 3: /*last_error_number*/
set_field_ulong(f, m_row.last_error_number);
break;
case 6: /*last_error_message*/
case 4: /*last_error_message*/
set_field_varchar_utf8(f, m_row.last_error_message, m_row.last_error_message_length);
break;
case 7: /*last_error_timestamp*/
case 5: /*last_error_timestamp*/
set_field_timestamp(f, m_row.last_error_timestamp);
break;
case 6: /*worker_idle_time*/
set_field_ulonglong(f, m_row.worker_idle_time);
break;
case 7: /*last_trans_retry_count*/
set_field_ulong(f, m_row.last_trans_retry_count);
break;
default:
DBUG_ASSERT(false);
}
......
......@@ -34,13 +34,8 @@
#include "pfs_engine_table.h"
#include "rpl_mi.h"
#include "mysql_com.h"
//#include "rpl_rli_pdb.h"
//#include "rpl_msr.h"
//#include "rpl_info.h" /*CHANNEL_NAME_LENGTH*/
#include "my_thread.h"
class Slave_worker;
class Master_info;
#include "rpl_parallel.h"
/**
@addtogroup Performance_schema_tables
......@@ -62,14 +57,6 @@ enum enum_rpl_yes_no {
*/
struct st_row_worker {
char channel_name[CHANNEL_NAME_LENGTH];
uint channel_name_length;
/*
worker_id is added to the table because thread is killed at STOP SLAVE
but the status needs to show up, so worker_id is used as a permanent
identifier.
*/
ulonglong worker_id;
ulonglong thread_id;
uint thread_id_is_null;
enum_rpl_yes_no service_state;
......@@ -79,46 +66,31 @@ struct st_row_worker {
char last_error_message[MAX_SLAVE_ERRMSG];
uint last_error_message_length;
ulonglong last_error_timestamp;
ulonglong worker_idle_time;
ulong last_trans_retry_count;
};
/**
Position in table replication_applier_status_by_worker.
Index 1 for replication channel.
Index 2 for worker:
- position [0] is for Single Thread Slave (Master_info)
- position [1] .. [N] is for Multi Thread Slave (Slave_worker)
We have global replication thread pool.
*/
struct pos_replication_applier_status_by_worker : public PFS_double_index
struct pos_replication_applier_status_by_worker : public PFS_simple_index
{
pos_replication_applier_status_by_worker() : PFS_double_index(0, 0)
pos_replication_applier_status_by_worker() : PFS_simple_index(0)
{}
inline void reset(void)
{
m_index_1= 0;
m_index_2= 0;
}
inline bool has_more_channels(uint num)
{ return (m_index_1 < num); }
inline void next_channel(void)
{
m_index_1++;
m_index_2= 0;
m_index= 0;
}
inline void next_worker()
{
m_index_2++;
}
inline bool has_more_workers(uint num)
{ return (m_index < num); }
inline void
set_channel_after(const pos_replication_applier_status_by_worker *other)
inline void next_worker(void)
{
m_index_1 = other->m_index_1 + 1;
m_index_2 = 0;
m_index++;
}
};
......@@ -129,13 +101,11 @@ class table_replication_applier_status_by_worker: public PFS_engine_table
typedef pos_replication_applier_status_by_worker pos_t;
private:
void make_row(Slave_worker *);
/*
Master_info to construct a row to display SQL Thread's status
information in STS mode
*/
void make_row(Master_info *);
void make_row(rpl_parallel_thread *);
/** Table share lock. */
static THR_LOCK m_table_lock;
/** Fields definition. */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment