Commit 70642871 authored by Sujatha's avatar Sujatha

MDEV-16437: merge 5.7 P_S replication instrumentation and tables

Merge 'replication_applier_status_by_coordinator' table.

This table captures SQL_THREAD status in case of both single threaded and
multi threaded slave configuration. When multi_source replication is enabled
this table will display each source specific SQL_THREAD status.

Added new columns for:
 - LAST_SEEN_TRANSACTION
 - LAST_TRANS_RETRY_COUNT
parent 2674365c
...@@ -179,6 +179,26 @@ Last_SQL_Errno = '0' ...@@ -179,6 +179,26 @@ Last_SQL_Errno = '0'
Slave_heartbeat_period = '60.000' Slave_heartbeat_period = '60.000'
Slave_heartbeat_period = '60.000' Slave_heartbeat_period = '60.000'
# #
#
# MDEV:16437: merge 5.7 P_S replication instrumentation and tables
#
SELECT * FROM performance_schema.replication_applier_status_by_coordinator;
CHANNEL_NAME master1
THREAD_ID #
SERVICE_STATE ON
LAST_ERROR_NUMBER 0
LAST_ERROR_MESSAGE
LAST_ERROR_TIMESTAMP 0000-00-00 00:00:00
LAST_SEEN_TRANSACTION 0-1-7
LAST_TRANS_RETRY_COUNT 0
CHANNEL_NAME
THREAD_ID #
SERVICE_STATE ON
LAST_ERROR_NUMBER 0
LAST_ERROR_MESSAGE
LAST_ERROR_TIMESTAMP 0000-00-00 00:00:00
LAST_SEEN_TRANSACTION 0-2-4
LAST_TRANS_RETRY_COUNT 0
select * from db1.t1; select * from db1.t1;
i f1 i f1
1 one 1 one
......
...@@ -179,6 +179,26 @@ Last_SQL_Errno = '0' ...@@ -179,6 +179,26 @@ Last_SQL_Errno = '0'
Slave_heartbeat_period = '60.000' Slave_heartbeat_period = '60.000'
Slave_heartbeat_period = '60.000' Slave_heartbeat_period = '60.000'
# #
#
# MDEV:16437: merge 5.7 P_S replication instrumentation and tables
#
SELECT * FROM performance_schema.replication_applier_status_by_coordinator;
CHANNEL_NAME master1
THREAD_ID #
SERVICE_STATE ON
LAST_ERROR_NUMBER 0
LAST_ERROR_MESSAGE
LAST_ERROR_TIMESTAMP 0000-00-00 00:00:00
LAST_SEEN_TRANSACTION 0-1-7
LAST_TRANS_RETRY_COUNT 0
CHANNEL_NAME
THREAD_ID #
SERVICE_STATE ON
LAST_ERROR_NUMBER 0
LAST_ERROR_MESSAGE
LAST_ERROR_TIMESTAMP 0000-00-00 00:00:00
LAST_SEEN_TRANSACTION 0-2-4
LAST_TRANS_RETRY_COUNT 0
select * from db1.t1; select * from db1.t1;
i f1 i f1
1 one 1 one
......
...@@ -185,6 +185,26 @@ IGNORE_SERVER_IDS ...@@ -185,6 +185,26 @@ IGNORE_SERVER_IDS
REPL_DO_DOMAIN_IDS REPL_DO_DOMAIN_IDS
REPL_IGNORE_DOMAIN_IDS REPL_IGNORE_DOMAIN_IDS
start all slaves; start all slaves;
#
# MDEV:16437: merge 5.7 P_S replication instrumentation and tables
#
select * from performance_schema.replication_applier_status_by_coordinator;
CHANNEL_NAME slave2
THREAD_ID #
SERVICE_STATE ON
LAST_ERROR_NUMBER 0
LAST_ERROR_MESSAGE
LAST_ERROR_TIMESTAMP 0000-00-00 00:00:00
LAST_SEEN_TRANSACTION
LAST_TRANS_RETRY_COUNT 0
CHANNEL_NAME slave1
THREAD_ID #
SERVICE_STATE ON
LAST_ERROR_NUMBER 0
LAST_ERROR_MESSAGE
LAST_ERROR_TIMESTAMP 0000-00-00 00:00:00
LAST_SEEN_TRANSACTION
LAST_TRANS_RETRY_COUNT 0
stop slave 'slave1'; stop slave 'slave1';
show slave 'slave1' status; show slave 'slave1' status;
Slave_IO_State Slave_IO_State
......
...@@ -58,6 +58,12 @@ query_vertical select * from performance_schema.replication_connection_configura ...@@ -58,6 +58,12 @@ query_vertical select * from performance_schema.replication_connection_configura
# Ensure that start all slaves doesn't do anything as all slaves are started # Ensure that start all slaves doesn't do anything as all slaves are started
start all slaves; start all slaves;
--echo #
--echo # MDEV:16437: merge 5.7 P_S replication instrumentation and tables
--echo #
--replace_column 2 #
query_vertical select * from performance_schema.replication_applier_status_by_coordinator;
stop slave 'slave1'; stop slave 'slave1';
--replace_result $SERVER_MYPORT_1 MYPORT_1 $SERVER_MYPORT_2 MYPORT_2 $read_master_log_pos <read_master_log_pos> $relay_log_pos <relay_log_pos> $relay_log_space1 <relay_log_space1> $relay_log_space2 <relay_log_space2> --replace_result $SERVER_MYPORT_1 MYPORT_1 $SERVER_MYPORT_2 MYPORT_2 $read_master_log_pos <read_master_log_pos> $relay_log_pos <relay_log_pos> $relay_log_space1 <relay_log_space1> $relay_log_space2 <relay_log_space2>
......
...@@ -859,12 +859,14 @@ def performance_schema replication_applier_status CHANNEL_NAME 1 NULL NO char 64 ...@@ -859,12 +859,14 @@ def performance_schema replication_applier_status CHANNEL_NAME 1 NULL NO char 64
def performance_schema replication_applier_status SERVICE_STATE 2 NULL NO enum 3 9 NULL NULL NULL utf8 utf8_general_ci enum('ON','OFF') select,insert,update,references NEVER NULL def performance_schema replication_applier_status SERVICE_STATE 2 NULL NO enum 3 9 NULL NULL NULL utf8 utf8_general_ci enum('ON','OFF') select,insert,update,references NEVER NULL
def performance_schema replication_applier_status REMAINING_DELAY 3 NULL YES int NULL NULL 10 0 NULL NULL NULL int(10) unsigned select,insert,update,references NEVER NULL def performance_schema replication_applier_status REMAINING_DELAY 3 NULL YES int NULL NULL 10 0 NULL NULL NULL int(10) unsigned select,insert,update,references NEVER NULL
def performance_schema replication_applier_status COUNT_TRANSACTIONS_RETRIES 4 NULL NO bigint NULL NULL 20 0 NULL NULL NULL bigint(20) unsigned select,insert,update,references NEVER NULL def performance_schema replication_applier_status COUNT_TRANSACTIONS_RETRIES 4 NULL NO bigint NULL NULL 20 0 NULL NULL NULL bigint(20) unsigned select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator CHANNEL_NAME 1 NULL NO char 64 192 NULL NULL NULL utf8 utf8_general_ci char(64) select,insert,update,references NEVER NULL def performance_schema replication_applier_status_by_coordinator CHANNEL_NAME 1 NULL NO varchar 256 768 NULL NULL NULL utf8 utf8_general_ci varchar(256) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator THREAD_ID 2 NULL YES bigint NULL NULL 20 0 NULL NULL NULL bigint(20) unsigned select,insert,update,references NEVER NULL def performance_schema replication_applier_status_by_coordinator THREAD_ID 2 NULL YES bigint NULL NULL 20 0 NULL NULL NULL bigint(20) unsigned select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator SERVICE_STATE 3 NULL NO enum 3 9 NULL NULL NULL utf8 utf8_general_ci enum('ON','OFF') select,insert,update,references NEVER NULL def performance_schema replication_applier_status_by_coordinator SERVICE_STATE 3 NULL NO enum 3 9 NULL NULL NULL utf8 utf8_general_ci enum('ON','OFF') select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator LAST_ERROR_NUMBER 4 NULL NO int NULL NULL 10 0 NULL NULL NULL int(11) select,insert,update,references NEVER NULL def performance_schema replication_applier_status_by_coordinator LAST_ERROR_NUMBER 4 NULL NO int NULL NULL 10 0 NULL NULL NULL int(11) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator LAST_ERROR_MESSAGE 5 NULL NO varchar 1024 3072 NULL NULL NULL utf8 utf8_general_ci varchar(1024) select,insert,update,references NEVER NULL def performance_schema replication_applier_status_by_coordinator LAST_ERROR_MESSAGE 5 NULL NO varchar 1024 3072 NULL NULL NULL utf8 utf8_general_ci varchar(1024) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator LAST_ERROR_TIMESTAMP 6 current_timestamp() NO timestamp NULL NULL NULL NULL 0 NULL NULL timestamp on update current_timestamp() select,insert,update,references NEVER NULL def performance_schema replication_applier_status_by_coordinator LAST_ERROR_TIMESTAMP 6 current_timestamp() NO timestamp NULL NULL NULL NULL 0 NULL NULL timestamp on update current_timestamp() select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator LAST_SEEN_TRANSACTION 7 NULL NO char 57 171 NULL NULL NULL utf8 utf8_general_ci char(57) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator LAST_TRANS_RETRY_COUNT 8 NULL NO int NULL NULL 10 0 NULL NULL NULL int(11) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_worker CHANNEL_NAME 1 NULL NO varchar 256 768 NULL NULL NULL utf8 utf8_general_ci varchar(256) select,insert,update,references NEVER NULL def performance_schema replication_applier_status_by_worker CHANNEL_NAME 1 NULL NO varchar 256 768 NULL NULL NULL utf8 utf8_general_ci varchar(256) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_worker THREAD_ID 2 NULL YES bigint NULL NULL 20 0 NULL NULL NULL bigint(20) unsigned select,insert,update,references NEVER NULL def performance_schema replication_applier_status_by_worker THREAD_ID 2 NULL YES bigint NULL NULL 20 0 NULL NULL NULL bigint(20) unsigned select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_worker SERVICE_STATE 3 NULL NO enum 3 9 NULL NULL NULL utf8 utf8_general_ci enum('ON','OFF') select,insert,update,references NEVER NULL def performance_schema replication_applier_status_by_worker SERVICE_STATE 3 NULL NO enum 3 9 NULL NULL NULL utf8 utf8_general_ci enum('ON','OFF') select,insert,update,references NEVER NULL
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
--source include/not_embedded.inc --source include/not_embedded.inc
--source include/have_innodb.inc --source include/have_innodb.inc
--source include/have_perfschema.inc
--source include/binlog_start_pos.inc --source include/binlog_start_pos.inc
--let $rpl_server_count= 0 --let $rpl_server_count= 0
...@@ -263,6 +264,12 @@ let $condition= = 'Waiting for master to send event'; ...@@ -263,6 +264,12 @@ let $condition= = 'Waiting for master to send event';
--let $all_slaves_status= --let $all_slaves_status=
--echo # --echo #
--echo #
--echo # MDEV:16437: merge 5.7 P_S replication instrumentation and tables
--echo #
--replace_column 2 #
query_vertical SELECT * FROM performance_schema.replication_applier_status_by_coordinator;
--sorted_result --sorted_result
select * from db1.t1; select * from db1.t1;
......
include/master-slave.inc
[connection master]
call mtr.add_suppression("Error 'Table 'test.t' doesn't exist' on query.");
include/assert.inc [On master, the table should return an empty set.]
connection slave;
include/wait_for_slave_param.inc [Slave_SQL_Running_State]
# Testing on fresh slave.
include/assert.inc [thread_name should should indicate sql thread.]
include/assert.inc [SSS shows Slave_IO_Running as "Yes". So, Service_State from this PS table should be "ON".]
include/assert.inc [Value returned by SSS and PS table for Last_Error_Number should be same.]
include/assert.inc [Value returned by SSS and PS table for Last_Error_Message should be same.]
include/assert.inc [Value returned by SSS and PS table for Last_Error_Timestamp should be 0000-00-00 00:00:00.]
include/assert.inc [Last_trans_retry_count should be 0.]
# Cause an error in the SQL thread and check for the correctness of
# values in error number, message and timestamp fields.
connection master;
use test;
create table t(a int primary key);
include/sync_slave_sql_with_master.inc
drop table t;
connection master;
insert into t values(1);
connection slave;
include/wait_for_slave_sql_error.inc [errno=1146]
# Extract the error related fields from SSS and PS table and compare
# them for correctness.
include/assert.inc [Value returned by SSS and PS table for Last_Error_Number should be same.]
Value returned by SSS and PS table for Last_Error_Message is same.
# Verify that the error fields are preserved after STOP SLAVE, thread_id
# changes to NULL and service_state changes to "Off".
# 1. Verify that thread_id changes to NULL and service_state to "off" on
# STOP SLAVE.
include/assert.inc [After STOP SLAVE, thread_id should be NULL]
include/assert.inc [SSS shows Slave_SQL_Running as "No". So, Service_State from this PS table should be "OFF".]
# 2. Extract the error related fields from SSS and PS table and compare
# them. These fields should preserve their values.
include/assert.inc [Value returned by SSS and PS table for Last_Error_Number should be same.]
Value returned by SSS and PS table for Last_Error_Message is same.
connection master;
drop table t;
reset master;
connection slave;
include/stop_slave.inc
reset slave;
reset master;
set @saved_slave_trans_retry_interval= @@GLOBAL.slave_transaction_retry_interval;
set global slave_transaction_retry_interval=1;
include/start_slave.inc
#
# Test Last_Trans_Retry_Count value.
#
connection master;
create table t1 (f int primary key) engine=innodb;
insert into t1 values (10);
connection slave;
connect slave2,127.0.0.1,root,,test,$SLAVE_MYPORT,;
connection slave2;
begin;
update t1 set f=40 where f=10;
connection master;
begin;
update t1 set f=60 where f=10;
commit;
connection slave;
connection slave2;
rollback;
include/assert.inc [Value returned by PS table for Last_Trans_Retry_Count should be > 0.]
connection master;
drop table t1;
connection slave;
disconnect slave2;
set global slave_transaction_retry_interval=@saved_slave_trans_retry_interval;
include/stop_slave.inc
# Restarting servers and setting up MTS now. Since, SQL thread and
# coordinator are the same and follow same code path, we can skip
# testing for coordinator thread in all scenarios. Testing for one
# scenario is enough.
include/rpl_restart_server.inc [server_number=1]
include/rpl_restart_server.inc [server_number=2]
connection slave;
change master to
master_host='127.0.0.1',
master_port=MASTER_MYPORT,
master_user='root';
SET @save.slave_parallel_workers=@@global.slave_parallel_workers;
SET @@global.slave_parallel_workers=1;
set @save.slave_transaction_retries= @@global.slave_transaction_retries;
include/start_slave.inc
include/wait_for_slave_param.inc [Slave_SQL_Running_State]
include/assert.inc [thread_name should should indicate sql thread.]
include/assert.inc [SSS shows Slave_SQL_Running as "Yes". So, Service_State from this PS table should be "ON".]
include/assert.inc [Value returned by SSS and PS table for Last_Error_Number should be same.]
# Cleanup.
include/stop_slave.inc
set @@global.slave_parallel_workers= @save.slave_parallel_workers;
set @@global.slave_transaction_retries= @save.slave_transaction_retries;
include/start_slave.inc
include/rpl_end.inc
# ==== Purpose ====
#
# This test script serves as the functionality testing for the table
# performance_schema.replication_applier_status_by_coordinator. Test
# for ddl and dml operations is a part of the perfschema suite.
# The ddl/dml tests are named:
# 1) ddl_replication_connection_configuration.test and
# 2) dml_replication_connection_configuration.test.
#
# This test script does the following:
# - Verify that SELECT works for every field in the table.
# - The SELECT per field produces an output similar to the corresponding field
# in SHOW SLAVE STATUS(SSS), if there is one.
# - If there is no matching field in SSS, we resort to other method of testing
# those fields.
# - We perform all the testing on connection "slave". On master, the table
# returns an empty set.
#
# The follwing scenarios are tested in this test script:
#
# - Test each field in STS on a fresh replication setup.
# - Change configuration parameters using CHANGE MASTER TO and verify that
# these changes are seen in SELECTs from PS table.
# - Verify that, the change in values are correctly shown by the table.
# - Verify that the values are preserved after STOP SLAVE, thread_id
# changes to NULL and service_state changes to "Off".
# - A priliminary test for Multi-threaded slave(MTS) mode.
#
# ==== Reference ====
#
# MDEV:16437: merge 5.7 P_S replication instrumentation and tables
#
--source include/have_innodb.inc
--source include/have_binlog_format_mixed.inc
--source include/have_perfschema.inc
--source include/master-slave.inc
call mtr.add_suppression("Error 'Table 'test.t' doesn't exist' on query.");
let $assert_text= On master, the table should return an empty set.;
let $assert_cond= count(*) = 0 from performance_schema.replication_applier_status_by_coordinator;
source include/assert.inc;
--connection slave
--let $slave_param= Slave_SQL_Running_State
--let $slave_param_value= Slave has read all relay log; waiting for more updates
source include/wait_for_slave_param.inc;
--echo
--echo # Testing on fresh slave.
--echo
# To verify the correctness of thread_id field, we check for the name of
# the thread.
let $thread_name= `select name from performance_schema.threads where thread_id= (select Thread_Id from performance_schema.replication_applier_status_by_coordinator)`;
let $assert_text= thread_name should should indicate sql thread.;
let $assert_cond= "$thread_name" = "thread/sql/slave_sql";
source include/assert.inc;
let $sss_value= query_get_value(SHOW SLAVE STATUS, Slave_SQL_Running, 1);
let $ps_value= query_get_value(select Service_State from performance_schema.replication_applier_status_by_coordinator, Service_State, 1);
let $assert_text= SSS shows Slave_IO_Running as "Yes". So, Service_State from this PS table should be "ON".;
let $assert_cond= "$sss_value" = "Yes" AND "$ps_value"= "ON";
source include/assert.inc;
let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1);
let $ps_value= query_get_value(select Last_Error_Number from performance_schema.replication_applier_status_by_coordinator, Last_Error_Number, 1);
let $assert_text= Value returned by SSS and PS table for Last_Error_Number should be same.;
let $assert_cond= "$sss_value" = "$ps_value";
source include/assert.inc;
let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Error, 1);
let $ps_value= query_get_value(select Last_Error_Message from performance_schema.replication_applier_status_by_coordinator, Last_Error_Message, 1);
let $assert_text= Value returned by SSS and PS table for Last_Error_Message should be same.;
let $assert_cond= "$sss_value" = "$ps_value";
source include/assert.inc;
let $ps_value= query_get_value(select Last_Error_Timestamp from performance_schema.replication_applier_status_by_coordinator, Last_Error_Timestamp, 1);
let $assert_text= Value returned by SSS and PS table for Last_Error_Timestamp should be 0000-00-00 00:00:00.;
let $assert_cond= "$ps_value" = "0000-00-00 00:00:00";
source include/assert.inc;
let $ps_value= query_get_value(select Last_Trans_Retry_Count from performance_schema.replication_applier_status_by_coordinator, Last_Trans_Retry_Count, 1);
let $assert_text= Last_trans_retry_count should be 0.;
let $assert_cond= "$ps_value"= 0;
source include/assert.inc;
--echo
--echo # Cause an error in the SQL thread and check for the correctness of
--echo # values in error number, message and timestamp fields.
--echo
# Cause an error in SQL thread.
# 1) Ceate a table 't' at master, replicate at slave.
# 2) Drop table 't' at slave only.
# 3) Insert a value in table 't' on master and replicate on slave.
# Since slave doesnt have table 't' anymore, SQL thread will show an error.
--connection master
use test;
create table t(a int primary key);
--source include/sync_slave_sql_with_master.inc
drop table t;
--connection master
insert into t values(1);
--connection slave
let $slave_sql_errno=1146;
source include/wait_for_slave_sql_error.inc;
--echo
--echo # Extract the error related fields from SSS and PS table and compare
--echo # them for correctness.
--echo
let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1);
let $ps_value= query_get_value(select Last_Error_Number from performance_schema.replication_applier_status_by_coordinator, Last_Error_Number, 1);
let $assert_text= Value returned by SSS and PS table for Last_Error_Number should be same.;
let $assert_cond= "$sss_value" = "$ps_value";
source include/assert.inc;
# Availability of special characters like single quote and backtick character
# makes it difficult use the assert.inc or mysql functionstrcmp().
# So, the equality of error messages is checked using the below perl code.
let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Error, 1);
let $ps_value= query_get_value(select Last_Error_Message from performance_schema.replication_applier_status_by_coordinator, Last_Error_Message, 1);
let PS_VALUE= $ps_value;
let SSS_VALUE= $sss_value;
perl;
use strict;
my $ps_value= $ENV{'PS_VALUE'};
my $sss_value= $ENV{'SSS_VALUE'};
if ($ps_value eq $sss_value)
{
print "Value returned by SSS and PS table for Last_Error_Message is same.\n";
}
else
{
print "Value returned by SSS and PS table for Last_Error_Message is NOT same\n";
}
EOF
--echo
--echo # Verify that the error fields are preserved after STOP SLAVE, thread_id
--echo # changes to NULL and service_state changes to "Off".
--echo
--echo
--echo # 1. Verify that thread_id changes to NULL and service_state to "off" on
--echo # STOP SLAVE.
--echo
let $ps_value= query_get_value(select thread_id from performance_schema.replication_applier_status_by_coordinator, thread_id, 1);
let $assert_text= After STOP SLAVE, thread_id should be NULL;
let $assert_cond= "$ps_value" = "NULL";
source include/assert.inc;
let $sss_value= query_get_value(SHOW SLAVE STATUS, Slave_SQL_Running, 1);
let $ps_value= query_get_value(select Service_State from performance_schema.replication_applier_status_by_coordinator, Service_State, 1);
let $assert_text= SSS shows Slave_SQL_Running as "No". So, Service_State from this PS table should be "OFF".;
let $assert_cond= "$sss_value" = "No" AND "$ps_value"= "OFF";
source include/assert.inc;
--echo
--echo # 2. Extract the error related fields from SSS and PS table and compare
--echo # them. These fields should preserve their values.
--echo
let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1);
let $ps_value= query_get_value(select Last_Error_Number from performance_schema.replication_applier_status_by_coordinator, Last_Error_Number, 1);
let $assert_text= Value returned by SSS and PS table for Last_Error_Number should be same.;
let $assert_cond= "$sss_value" = "$ps_value";
source include/assert.inc;
# Availability of special characters like single quote and backtick character
# makes it difficult use the assert.inc or mysql functionstrcmp().
# So, the equality of error messages is checked using the below perl code.
let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Error, 1);
let $ps_value= query_get_value(select Last_Error_Message from performance_schema.replication_applier_status_by_coordinator, Last_Error_Message, 1);
let PS_VALUE= $ps_value;
let SSS_VALUE= $sss_value;
perl;
use strict;
my $ps_value= $ENV{'PS_VALUE'};
my $sss_value= $ENV{'SSS_VALUE'};
if ($ps_value eq $sss_value)
{
print "Value returned by SSS and PS table for Last_Error_Message is same.\n";
}
else
{
print "Value returned by SSS and PS table for Last_Error_Message is NOT same\n";
}
EOF
# End of perl code for testing the error message.
--connection master
drop table t;
reset master;
--connection slave
--source include/stop_slave.inc
reset slave;
reset master;
set @saved_slave_trans_retry_interval= @@GLOBAL.slave_transaction_retry_interval;
set global slave_transaction_retry_interval=1;
--source include/start_slave.inc
# End of perl code for testing error message.
--echo #
--echo # Test Last_Trans_Retry_Count value.
--echo #
--connection master
create table t1 (f int primary key) engine=innodb;
insert into t1 values (10);
--sync_slave_with_master
connect (slave2,127.0.0.1,root,,test,$SLAVE_MYPORT,);
--connection slave2
begin;
update t1 set f=40 where f=10;
--connection master
begin;
update t1 set f=60 where f=10;
commit;
--connection slave
--let $wait_condition= SELECT COUNT(*) > 0 FROM information_schema.processlist WHERE Info = "update t1 set f=60 where f=10"
--source include/wait_condition.inc
sleep 4 ;
--connection slave2
rollback;
let $ps_value= query_get_value(select Last_Trans_Retry_Count from performance_schema.replication_applier_status_by_coordinator, Last_Trans_Retry_Count, 1);
let $assert_text= Value returned by PS table for Last_Trans_Retry_Count should be > 0.;
let $assert_cond= "$ps_value" > 0;
source include/assert.inc;
--connection master
drop table t1;
--sync_slave_with_master
--disconnect slave2
set global slave_transaction_retry_interval=@saved_slave_trans_retry_interval;
source include/stop_slave.inc;
--echo
--echo # Restarting servers and setting up MTS now. Since, SQL thread and
--echo # coordinator are the same and follow same code path, we can skip
--echo # testing for coordinator thread in all scenarios. Testing for one
--echo # scenario is enough.
--echo
--let $rpl_server_number= 1
--source include/rpl_restart_server.inc
--let $rpl_server_number= 2
--source include/rpl_restart_server.inc
--connection slave
replace_result $MASTER_MYPORT MASTER_MYPORT;
replace_column 2 ###;
eval change master to
master_host='127.0.0.1',
master_port=$MASTER_MYPORT,
master_user='root';
SET @save.slave_parallel_workers=@@global.slave_parallel_workers;
SET @@global.slave_parallel_workers=1;
# to avoid warnings
set @save.slave_transaction_retries= @@global.slave_transaction_retries;
source include/start_slave.inc;
--let $slave_param= Slave_SQL_Running_State
--let $slave_param_value= Slave has read all relay log; waiting for more updates
source include/wait_for_slave_param.inc;
# To verify the correctness of thread_id field, we check for the name of
# the thread.
let $thread_name= `select name from performance_schema.threads where thread_id= (select Thread_Id from performance_schema.replication_applier_status_by_coordinator)`;
let $assert_text= thread_name should should indicate sql thread.;
let $assert_cond= "$thread_name" = "thread/sql/slave_sql";
source include/assert.inc;
let $sss_value= query_get_value(SHOW SLAVE STATUS, Slave_SQL_Running, 1);
let $ps_value= query_get_value(select Service_State from performance_schema.replication_applier_status_by_coordinator, Service_State, 1);
let $assert_text= SSS shows Slave_SQL_Running as "Yes". So, Service_State from this PS table should be "ON".;
let $assert_cond= "$sss_value" = "Yes" AND "$ps_value"= "ON";
source include/assert.inc;
let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1);
let $ps_value= query_get_value(select Last_Error_Number from performance_schema.replication_applier_status_by_coordinator, Last_Error_Number, 1);
let $assert_text= Value returned by SSS and PS table for Last_Error_Number should be same.;
let $assert_cond= "$sss_value" = "$ps_value";
source include/assert.inc;
--echo
--echo # Cleanup.
--echo
source include/stop_slave.inc;
set @@global.slave_parallel_workers= @save.slave_parallel_workers;
set @@global.slave_transaction_retries= @save.slave_transaction_retries;
source include/start_slave.inc;
source include/rpl_end.inc;
...@@ -61,7 +61,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery, const char* thread_name) ...@@ -61,7 +61,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery, const char* thread_name)
gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0), gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0),
slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE), slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0), until_log_pos(0), retried_trans(0), executed_entries(0),
sql_delay(0), sql_delay_end(0), last_trans_retry_count(0), sql_delay(0), sql_delay_end(0),
until_relay_log_names_defer(false), until_relay_log_names_defer(false),
m_flags(0) m_flags(0)
{ {
...@@ -87,6 +87,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery, const char* thread_name) ...@@ -87,6 +87,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery, const char* thread_name)
max_relay_log_size= global_system_variables.max_relay_log_size; max_relay_log_size= global_system_variables.max_relay_log_size;
bzero((char*) &info_file, sizeof(info_file)); bzero((char*) &info_file, sizeof(info_file));
bzero((char*) &cache_buf, sizeof(cache_buf)); bzero((char*) &cache_buf, sizeof(cache_buf));
bzero(&last_seen_gtid, sizeof(last_seen_gtid));
mysql_mutex_init(key_relay_log_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_relay_log_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_relay_log_info_data_lock, mysql_mutex_init(key_relay_log_info_data_lock,
&data_lock, MY_MUTEX_INIT_FAST); &data_lock, MY_MUTEX_INIT_FAST);
......
...@@ -539,7 +539,8 @@ class Relay_log_info : public Slave_reporting_capability ...@@ -539,7 +539,8 @@ class Relay_log_info : public Slave_reporting_capability
int32 get_sql_delay() { return sql_delay; } int32 get_sql_delay() { return sql_delay; }
void set_sql_delay(int32 _sql_delay) { sql_delay= _sql_delay; } void set_sql_delay(int32 _sql_delay) { sql_delay= _sql_delay; }
time_t get_sql_delay_end() { return sql_delay_end; } time_t get_sql_delay_end() { return sql_delay_end; }
rpl_gtid last_seen_gtid;
ulong last_trans_retry_count;
private: private:
......
...@@ -4297,6 +4297,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, ...@@ -4297,6 +4297,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
DBUG_RETURN(1); DBUG_RETURN(1);
} }
rli->last_seen_gtid= serial_rgi->current_gtid;
rli->last_trans_retry_count= serial_rgi->trans_retries;
if (opt_gtid_ignore_duplicates && if (opt_gtid_ignore_duplicates &&
rli->mi->using_gtid != Master_info::USE_GTID_NO) rli->mi->using_gtid != Master_info::USE_GTID_NO)
{ {
...@@ -5305,6 +5307,7 @@ pthread_handler_t handle_slave_sql(void *arg) ...@@ -5305,6 +5307,7 @@ pthread_handler_t handle_slave_sql(void *arg)
serial_rgi->gtid_sub_id= 0; serial_rgi->gtid_sub_id= 0;
serial_rgi->gtid_pending= false; serial_rgi->gtid_pending= false;
rli->last_seen_gtid= serial_rgi->current_gtid;
if (mi->using_gtid != Master_info::USE_GTID_NO && mi->using_parallel() && if (mi->using_gtid != Master_info::USE_GTID_NO && mi->using_parallel() &&
rli->restart_gtid_pos.count() > 0) rli->restart_gtid_pos.count() > 0)
{ {
......
...@@ -55,12 +55,14 @@ table_replication_applier_status_by_coordinator::m_share= ...@@ -55,12 +55,14 @@ table_replication_applier_status_by_coordinator::m_share=
sizeof(pos_t), /* ref length */ sizeof(pos_t), /* ref length */
&m_table_lock, &m_table_lock,
{ C_STRING_WITH_LEN("CREATE TABLE replication_applier_status_by_coordinator(" { C_STRING_WITH_LEN("CREATE TABLE replication_applier_status_by_coordinator("
"CHANNEL_NAME CHAR(64) collate utf8_general_ci not null," "CHANNEL_NAME VARCHAR(256) collate utf8_general_ci not null,"
"THREAD_ID BIGINT UNSIGNED," "THREAD_ID BIGINT UNSIGNED,"
"SERVICE_STATE ENUM('ON','OFF') not null," "SERVICE_STATE ENUM('ON','OFF') not null,"
"LAST_ERROR_NUMBER INTEGER not null," "LAST_ERROR_NUMBER INTEGER not null,"
"LAST_ERROR_MESSAGE VARCHAR(1024) not null," "LAST_ERROR_MESSAGE VARCHAR(1024) not null,"
"LAST_ERROR_TIMESTAMP TIMESTAMP(0) not null)") }, "LAST_ERROR_TIMESTAMP TIMESTAMP(0) not null,"
"LAST_SEEN_TRANSACTION CHAR(57) not null,"
"LAST_TRANS_RETRY_COUNT INTEGER not null)") },
false /* perpetual */ false /* perpetual */
}; };
...@@ -104,15 +106,7 @@ int table_replication_applier_status_by_coordinator::rnd_next(void) ...@@ -104,15 +106,7 @@ int table_replication_applier_status_by_coordinator::rnd_next(void)
{ {
mi= (Master_info *)my_hash_element(&master_info_index->master_info_hash, m_pos.m_index); mi= (Master_info *)my_hash_element(&master_info_index->master_info_hash, m_pos.m_index);
/* if (mi && mi->host[0])
Construct and display SQL Thread's (Coordinator) information in
'replication_applier_status_by_coordinator' table only in the case of
multi threaded slave mode. Code should do nothing in the case of single
threaded slave mode. In case of single threaded slave mode SQL Thread's
status will be reported as part of
'replication_applier_status_by_worker' table.
*/
if (mi && mi->host[0] && /*mi->rli.get_worker_count() > */ 0)
{ {
make_row(mi); make_row(mi);
m_next_pos.set_after(&m_pos); m_next_pos.set_after(&m_pos);
...@@ -147,11 +141,15 @@ int table_replication_applier_status_by_coordinator::rnd_pos(const void *pos) ...@@ -147,11 +141,15 @@ int table_replication_applier_status_by_coordinator::rnd_pos(const void *pos)
void table_replication_applier_status_by_coordinator::make_row(Master_info *mi) void table_replication_applier_status_by_coordinator::make_row(Master_info *mi)
{ {
m_row_exists= false; m_row_exists= false;
rpl_gtid gtid;
StringBuffer<10+1+10+1+20+1> str;
bool first= true;
DBUG_ASSERT(mi != NULL); DBUG_ASSERT(mi != NULL);
mysql_mutex_lock(&mi->rli.data_lock); mysql_mutex_lock(&mi->rli.data_lock);
gtid= mi->rli.last_seen_gtid;
m_row.channel_name_length= static_cast<uint>(mi->connection_name.length); m_row.channel_name_length= static_cast<uint>(mi->connection_name.length);
memcpy(m_row.channel_name, mi->connection_name.str, m_row.channel_name_length); memcpy(m_row.channel_name, mi->connection_name.str, m_row.channel_name_length);
...@@ -175,6 +173,18 @@ void table_replication_applier_status_by_coordinator::make_row(Master_info *mi) ...@@ -175,6 +173,18 @@ void table_replication_applier_status_by_coordinator::make_row(Master_info *mi)
else else
m_row.service_state= PS_RPL_NO; m_row.service_state= PS_RPL_NO;
if ((gtid.seq_no > 0 &&
!rpl_slave_state_tostring_helper(&str, &gtid, &first)))
{
strmake(m_row.last_seen_transaction,str.ptr(), str.length());
m_row.last_seen_transaction_length= str.length();
}
else
{
m_row.last_seen_transaction_length= 0;
memcpy(m_row.last_seen_transaction, "", 1);
}
mysql_mutex_lock(&mi->rli.err_lock); mysql_mutex_lock(&mi->rli.err_lock);
m_row.last_error_number= (long int) mi->rli.last_error().number; m_row.last_error_number= (long int) mi->rli.last_error().number;
...@@ -190,10 +200,11 @@ void table_replication_applier_status_by_coordinator::make_row(Master_info *mi) ...@@ -190,10 +200,11 @@ void table_replication_applier_status_by_coordinator::make_row(Master_info *mi)
m_row.last_error_message_length); m_row.last_error_message_length);
/** time in millisecond since epoch */ /** time in millisecond since epoch */
m_row.last_error_timestamp= 0;//(ulonglong)mi->rli.last_error().skr*1000000; m_row.last_error_timestamp= (ulonglong)mi->rli.last_error().skr*1000000;
} }
mysql_mutex_unlock(&mi->rli.err_lock); mysql_mutex_unlock(&mi->rli.err_lock);
m_row.last_trans_retry_count= (ulong)mi->rli.last_trans_retry_count;
mysql_mutex_unlock(&mi->rli.data_lock); mysql_mutex_unlock(&mi->rli.data_lock);
m_row_exists= true; m_row_exists= true;
...@@ -218,7 +229,8 @@ int table_replication_applier_status_by_coordinator ...@@ -218,7 +229,8 @@ int table_replication_applier_status_by_coordinator
switch(f->field_index) switch(f->field_index)
{ {
case 0: /* channel_name */ case 0: /* channel_name */
set_field_char_utf8(f, m_row.channel_name, m_row.channel_name_length); set_field_varchar_utf8(f, m_row.channel_name,
m_row.channel_name_length);
break; break;
case 1: /*thread_id*/ case 1: /*thread_id*/
if (!m_row.thread_id_is_null) if (!m_row.thread_id_is_null)
...@@ -239,6 +251,14 @@ int table_replication_applier_status_by_coordinator ...@@ -239,6 +251,14 @@ int table_replication_applier_status_by_coordinator
case 5: /*last_error_timestamp*/ case 5: /*last_error_timestamp*/
set_field_timestamp(f, m_row.last_error_timestamp); set_field_timestamp(f, m_row.last_error_timestamp);
break; break;
case 6: /*last_seen_transaction*/
set_field_char_utf8(f, m_row.last_seen_transaction,
m_row.last_seen_transaction_length);
break;
case 7: /*last_trans_retry_count*/
set_field_ulong(f, m_row.last_trans_retry_count);
break;
default: default:
DBUG_ASSERT(false); DBUG_ASSERT(false);
} }
......
...@@ -34,8 +34,6 @@ ...@@ -34,8 +34,6 @@
#include "pfs_engine_table.h" #include "pfs_engine_table.h"
#include "rpl_mi.h" #include "rpl_mi.h"
#include "mysql_com.h" #include "mysql_com.h"
//#include "rpl_msr.h"
//#include "rpl_info.h" /*CHANNEL_NAME_LENGTH*/
#include "my_thread.h" #include "my_thread.h"
class Master_info; class Master_info;
...@@ -68,6 +66,9 @@ struct st_row_coordinator { ...@@ -68,6 +66,9 @@ struct st_row_coordinator {
char last_error_message[MAX_SLAVE_ERRMSG]; char last_error_message[MAX_SLAVE_ERRMSG];
uint last_error_message_length; uint last_error_message_length;
ulonglong last_error_timestamp; ulonglong last_error_timestamp;
char last_seen_transaction[GTID_MAX_STR_LENGTH + 1];
uint last_seen_transaction_length;
ulong last_trans_retry_count;
}; };
/** Table PERFORMANCE_SCHEMA.replication_applier_status_by_coordinator */ /** Table PERFORMANCE_SCHEMA.replication_applier_status_by_coordinator */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment