Commit d51a373a authored by Sujatha

MDEV-16437: merge 5.7 P_S replication instrumentation and tables

Merge 'replication_applier_status_by_coordinator' table.

This table captures the SQL_THREAD status in both single-threaded and
multi-threaded slave configurations. When multi-source replication is enabled,
this table displays the SQL_THREAD status of each source.

Replaced the following column:
  - CHANNEL_NAME with CONNECTION_NAME
Added the following new columns:
  - LAST_SEEN_TRANSACTION
  - LAST_TRANS_RETRY_COUNT
parent 5582a45a
......@@ -179,6 +179,13 @@ Last_SQL_Errno = '0'
Slave_heartbeat_period = '60.000'
Slave_heartbeat_period = '60.000'
#
#
# MDEV:16437: merge 5.7 P_S replication instrumentation and tables
#
SELECT * FROM performance_schema.replication_applier_status_by_coordinator;
CONNECTION_NAME THREAD_ID SERVICE_STATE LAST_SEEN_TRANSACTION LAST_ERROR_NUMBER LAST_ERROR_MESSAGE LAST_ERROR_TIMESTAMP LAST_TRANS_RETRY_COUNT
master1 # ON 0-1-7 0 0000-00-00 00:00:00 0
# ON 0-2-4 0 0000-00-00 00:00:00 0
select * from db1.t1;
i f1
1 one
......
......@@ -179,6 +179,13 @@ Last_SQL_Errno = '0'
Slave_heartbeat_period = '60.000'
Slave_heartbeat_period = '60.000'
#
#
# MDEV:16437: merge 5.7 P_S replication instrumentation and tables
#
SELECT * FROM performance_schema.replication_applier_status_by_coordinator;
CONNECTION_NAME THREAD_ID SERVICE_STATE LAST_SEEN_TRANSACTION LAST_ERROR_NUMBER LAST_ERROR_MESSAGE LAST_ERROR_TIMESTAMP LAST_TRANS_RETRY_COUNT
master1 # ON 0-1-7 0 0000-00-00 00:00:00 0
# ON 0-2-4 0 0000-00-00 00:00:00 0
select * from db1.t1;
i f1
1 one
......
......@@ -29,6 +29,13 @@ CONNECTION_NAME HOST PORT USER USING_GTID SSL_ALLOWED SSL_CA_FILE SSL_CA_PATH SS
slave2 # # root NO NO NO 60 86400 60.000
slave1 # # root NO NO NO 60 86400 60.000
start all slaves;
#
# MDEV:16437: merge 5.7 P_S replication instrumentation and tables
#
select * from performance_schema.replication_applier_status_by_coordinator;
CONNECTION_NAME THREAD_ID SERVICE_STATE LAST_SEEN_TRANSACTION LAST_ERROR_NUMBER LAST_ERROR_MESSAGE LAST_ERROR_TIMESTAMP LAST_TRANS_RETRY_COUNT
slave2 # ON 0 0000-00-00 00:00:00 0
slave1 # ON 0 0000-00-00 00:00:00 0
stop slave 'slave1';
show slave 'slave1' status;
Slave_IO_State
......
......@@ -57,6 +57,12 @@ select * from performance_schema.replication_connection_configuration;
# Ensure that start all slaves doesn't do anything as all slaves are started
start all slaves;
--echo #
--echo # MDEV:16437: merge 5.7 P_S replication instrumentation and tables
--echo #
--replace_column 2 #
select * from performance_schema.replication_applier_status_by_coordinator;
stop slave 'slave1';
--replace_result $SERVER_MYPORT_1 MYPORT_1 $SERVER_MYPORT_2 MYPORT_2 $read_master_log_pos <read_master_log_pos> $relay_log_pos <relay_log_pos> $relay_log_space1 <relay_log_space1> $relay_log_space2 <relay_log_space2>
......
......@@ -859,12 +859,14 @@ def performance_schema replication_applier_status CHANNEL_NAME 1 NULL NO char 64
def performance_schema replication_applier_status SERVICE_STATE 2 NULL NO enum 3 9 NULL NULL NULL utf8 utf8_general_ci enum('ON','OFF') select,insert,update,references NEVER NULL
def performance_schema replication_applier_status REMAINING_DELAY 3 NULL YES int NULL NULL 10 0 NULL NULL NULL int(10) unsigned select,insert,update,references NEVER NULL
def performance_schema replication_applier_status COUNT_TRANSACTIONS_RETRIES 4 NULL NO bigint NULL NULL 20 0 NULL NULL NULL bigint(20) unsigned select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator CHANNEL_NAME 1 NULL NO char 64 192 NULL NULL NULL utf8 utf8_general_ci char(64) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator CONNECTION_NAME 1 NULL NO varchar 256 768 NULL NULL NULL utf8 utf8_general_ci varchar(256) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator THREAD_ID 2 NULL YES bigint NULL NULL 20 0 NULL NULL NULL bigint(20) unsigned select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator SERVICE_STATE 3 NULL NO enum 3 9 NULL NULL NULL utf8 utf8_general_ci enum('ON','OFF') select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator LAST_ERROR_NUMBER 4 NULL NO int NULL NULL 10 0 NULL NULL NULL int(11) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator LAST_ERROR_MESSAGE 5 NULL NO varchar 1024 3072 NULL NULL NULL utf8 utf8_general_ci varchar(1024) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator LAST_ERROR_TIMESTAMP 6 current_timestamp() NO timestamp NULL NULL NULL NULL 0 NULL NULL timestamp on update current_timestamp() select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator LAST_SEEN_TRANSACTION 4 NULL NO char 57 171 NULL NULL NULL utf8 utf8_general_ci char(57) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator LAST_ERROR_NUMBER 5 NULL NO int NULL NULL 10 0 NULL NULL NULL int(11) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator LAST_ERROR_MESSAGE 6 NULL NO varchar 1024 3072 NULL NULL NULL utf8 utf8_general_ci varchar(1024) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator LAST_ERROR_TIMESTAMP 7 current_timestamp() NO timestamp NULL NULL NULL NULL 0 NULL NULL timestamp on update current_timestamp() select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_coordinator LAST_TRANS_RETRY_COUNT 8 NULL NO int NULL NULL 10 0 NULL NULL NULL int(11) select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_worker THREAD_ID 1 NULL YES bigint NULL NULL 20 0 NULL NULL NULL bigint(20) unsigned select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_worker SERVICE_STATE 2 NULL NO enum 3 9 NULL NULL NULL utf8 utf8_general_ci enum('ON','OFF') select,insert,update,references NEVER NULL
def performance_schema replication_applier_status_by_worker LAST_SEEN_TRANSACTION 3 NULL NO char 57 171 NULL NULL NULL utf8 utf8_general_ci char(57) select,insert,update,references NEVER NULL
......
......@@ -263,6 +263,12 @@ let $condition= = 'Waiting for master to send event';
--let $all_slaves_status=
--echo #
--echo #
--echo # MDEV:16437: merge 5.7 P_S replication instrumentation and tables
--echo #
--replace_column 2 #
SELECT * FROM performance_schema.replication_applier_status_by_coordinator;
--sorted_result
select * from db1.t1;
......
include/master-slave.inc
[connection master]
call mtr.add_suppression("Error 'Table 'test.t' doesn't exist' on query.");
include/assert.inc [On master, the table should return an empty set.]
connection slave;
include/wait_for_slave_param.inc [Slave_SQL_Running_State]
# Testing on fresh slave.
include/assert.inc [thread_name should should indicate sql thread.]
include/assert.inc [SSS shows Slave_IO_Running as "Yes". So, Service_State from this PS table should be "ON".]
include/assert.inc [Value returned by SSS and PS table for Last_Error_Number should be same.]
include/assert.inc [Value returned by SSS and PS table for Last_Error_Message should be same.]
include/assert.inc [Value returned by SSS and PS table for Last_Error_Timestamp should be 0000-00-00 00:00:00.]
include/assert.inc [Last_trans_retry_count should be 0.]
# Cause an error in the SQL thread and check for the correctness of
# values in error number, message and timestamp fields.
connection master;
use test;
create table t(a int primary key);
include/sync_slave_sql_with_master.inc
drop table t;
connection master;
insert into t values(1);
connection slave;
include/wait_for_slave_sql_error.inc [errno=1146]
# Extract the error related fields from SSS and PS table and compare
# them for correctness.
include/assert.inc [Value returned by SSS and PS table for Last_Error_Number should be same.]
Value returned by SSS and PS table for Last_Error_Message is same.
# Verify that the error fields are preserved after STOP SLAVE, thread_id
# changes to NULL and service_state changes to "Off".
# 1. Verify that thread_id changes to NULL and service_state to "off" on
# STOP SLAVE.
include/assert.inc [After STOP SLAVE, thread_id should be NULL]
include/assert.inc [SSS shows Slave_SQL_Running as "No". So, Service_State from this PS table should be "OFF".]
# 2. Extract the error related fields from SSS and PS table and compare
# them. These fields should preserve their values.
include/assert.inc [Value returned by SSS and PS table for Last_Error_Number should be same.]
Value returned by SSS and PS table for Last_Error_Message is same.
connection master;
drop table t;
reset master;
connection slave;
include/stop_slave.inc
reset slave;
reset master;
set @saved_slave_trans_retry_interval= @@GLOBAL.slave_transaction_retry_interval;
set global slave_transaction_retry_interval=1;
include/start_slave.inc
#
# Test Last_Trans_Retry_Count value.
#
connection master;
create table t1 (f int primary key) engine=innodb;
insert into t1 values (10);
connection slave;
connect slave2,127.0.0.1,root,,test,$SLAVE_MYPORT,;
connection slave2;
begin;
update t1 set f=40 where f=10;
connection master;
begin;
update t1 set f=60 where f=10;
commit;
connection slave;
connection slave2;
rollback;
include/assert.inc [Value returned by PS table for Last_Trans_Retry_Count should be > 0.]
connection master;
drop table t1;
connection slave;
disconnect slave2;
set global slave_transaction_retry_interval=@saved_slave_trans_retry_interval;
include/stop_slave.inc
# Restarting servers and setting up MTS now. Since, SQL thread and
# coordinator are the same and follow same code path, we can skip
# testing for coordinator thread in all scenarios. Testing for one
# scenario is enough.
include/rpl_restart_server.inc [server_number=1]
include/rpl_restart_server.inc [server_number=2]
connection slave;
change master to
master_host='127.0.0.1',
master_port=MASTER_MYPORT,
master_user='root';
SET @save.slave_parallel_workers=@@global.slave_parallel_workers;
SET @@global.slave_parallel_workers=1;
set @save.slave_transaction_retries= @@global.slave_transaction_retries;
include/start_slave.inc
include/wait_for_slave_param.inc [Slave_SQL_Running_State]
include/assert.inc [thread_name should should indicate sql thread.]
include/assert.inc [SSS shows Slave_SQL_Running as "Yes". So, Service_State from this PS table should be "ON".]
include/assert.inc [Value returned by SSS and PS table for Last_Error_Number should be same.]
# Cleanup.
include/stop_slave.inc
set @@global.slave_parallel_workers= @save.slave_parallel_workers;
set @@global.slave_transaction_retries= @save.slave_transaction_retries;
include/start_slave.inc
include/rpl_end.inc
......@@ -61,7 +61,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery, const char* thread_name)
gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0),
slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
sql_delay(0), sql_delay_end(0),
last_trans_retry_count(0), sql_delay(0), sql_delay_end(0),
until_relay_log_names_defer(false),
m_flags(0)
{
......@@ -87,6 +87,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery, const char* thread_name)
max_relay_log_size= global_system_variables.max_relay_log_size;
bzero((char*) &info_file, sizeof(info_file));
bzero((char*) &cache_buf, sizeof(cache_buf));
bzero(&last_seen_gtid, sizeof(last_seen_gtid));
mysql_mutex_init(key_relay_log_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_relay_log_info_data_lock,
&data_lock, MY_MUTEX_INIT_FAST);
......
......@@ -539,7 +539,8 @@ class Relay_log_info : public Slave_reporting_capability
int32 get_sql_delay() { return sql_delay; }
void set_sql_delay(int32 _sql_delay) { sql_delay= _sql_delay; }
time_t get_sql_delay_end() { return sql_delay_end; }
rpl_gtid last_seen_gtid;
ulong last_trans_retry_count;
private:
......
......@@ -4450,6 +4450,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
DBUG_RETURN(1);
}
rli->last_seen_gtid= serial_rgi->current_gtid;
rli->last_trans_retry_count= serial_rgi->trans_retries;
if (opt_gtid_ignore_duplicates &&
rli->mi->using_gtid != Master_info::USE_GTID_NO)
{
......@@ -5460,6 +5462,7 @@ pthread_handler_t handle_slave_sql(void *arg)
serial_rgi->gtid_sub_id= 0;
serial_rgi->gtid_pending= false;
rli->last_seen_gtid= serial_rgi->current_gtid;
if (mi->using_gtid != Master_info::USE_GTID_NO && mi->using_parallel() &&
rli->restart_gtid_pos.count() > 0)
{
......
......@@ -55,12 +55,14 @@ table_replication_applier_status_by_coordinator::m_share=
sizeof(pos_t), /* ref length */
&m_table_lock,
{ C_STRING_WITH_LEN("CREATE TABLE replication_applier_status_by_coordinator("
"CHANNEL_NAME CHAR(64) collate utf8_general_ci not null,"
"CONNECTION_NAME VARCHAR(256) collate utf8_general_ci not null,"
"THREAD_ID BIGINT UNSIGNED,"
"SERVICE_STATE ENUM('ON','OFF') not null,"
"LAST_SEEN_TRANSACTION CHAR(57) not null,"
"LAST_ERROR_NUMBER INTEGER not null,"
"LAST_ERROR_MESSAGE VARCHAR(1024) not null,"
"LAST_ERROR_TIMESTAMP TIMESTAMP(0) not null)") },
"LAST_ERROR_TIMESTAMP TIMESTAMP(0) not null,"
"LAST_TRANS_RETRY_COUNT INTEGER not null)") },
false /* perpetual */
};
......@@ -104,15 +106,7 @@ int table_replication_applier_status_by_coordinator::rnd_next(void)
{
mi= (Master_info *)my_hash_element(&master_info_index->master_info_hash, m_pos.m_index);
/*
Construct and display SQL Thread's (Coordinator) information in
'replication_applier_status_by_coordinator' table only in the case of
multi threaded slave mode. Code should do nothing in the case of single
threaded slave mode. In case of single threaded slave mode SQL Thread's
status will be reported as part of
'replication_applier_status_by_worker' table.
*/
if (mi && mi->host[0] && /*mi->rli.get_worker_count() > */ 0)
if (mi && mi->host[0])
{
make_row(mi);
m_next_pos.set_after(&m_pos);
......@@ -147,13 +141,20 @@ int table_replication_applier_status_by_coordinator::rnd_pos(const void *pos)
void table_replication_applier_status_by_coordinator::make_row(Master_info *mi)
{
m_row_exists= false;
rpl_gtid gtid;
char buf[10+1+10+1+20+1];
String str(buf, sizeof(buf), system_charset_info);
bool first= true;
str.length(0);
DBUG_ASSERT(mi != NULL);
mysql_mutex_lock(&mi->rli.data_lock);
m_row.channel_name_length= static_cast<uint>(mi->connection_name.length);
memcpy(m_row.channel_name, mi->connection_name.str, m_row.channel_name_length);
gtid= mi->rli.last_seen_gtid;
m_row.connection_name_length= static_cast<uint>(mi->connection_name.length);
memcpy(m_row.connection_name, mi->connection_name.str, m_row.connection_name_length);
if (mi->rli.slave_running)
{
......@@ -175,6 +176,18 @@ void table_replication_applier_status_by_coordinator::make_row(Master_info *mi)
else
m_row.service_state= PS_RPL_NO;
if ((gtid.seq_no > 0 &&
!rpl_slave_state_tostring_helper(&str, &gtid, &first)))
{
strmake(m_row.last_seen_transaction,str.ptr(), str.length());
m_row.last_seen_transaction_length= str.length();
}
else
{
m_row.last_seen_transaction_length= 0;
memcpy(m_row.last_seen_transaction, "", 1);
}
mysql_mutex_lock(&mi->rli.err_lock);
m_row.last_error_number= (long int) mi->rli.last_error().number;
......@@ -190,10 +203,11 @@ void table_replication_applier_status_by_coordinator::make_row(Master_info *mi)
m_row.last_error_message_length);
/** time in millisecond since epoch */
m_row.last_error_timestamp= 0;//(ulonglong)mi->rli.last_error().skr*1000000;
m_row.last_error_timestamp= (ulonglong)mi->rli.last_error().skr*1000000;
}
mysql_mutex_unlock(&mi->rli.err_lock);
m_row.last_trans_retry_count= (ulong)mi->rli.last_trans_retry_count;
mysql_mutex_unlock(&mi->rli.data_lock);
m_row_exists= true;
......@@ -217,8 +231,8 @@ int table_replication_applier_status_by_coordinator
{
switch(f->field_index)
{
case 0: /* channel_name */
set_field_char_utf8(f, m_row.channel_name, m_row.channel_name_length);
case 0: /* connection_name */
set_field_varchar_utf8(f, m_row.connection_name, m_row.connection_name_length);
break;
case 1: /*thread_id*/
if (!m_row.thread_id_is_null)
......@@ -229,16 +243,23 @@ int table_replication_applier_status_by_coordinator
case 2: /*service_state*/
set_field_enum(f, m_row.service_state);
break;
case 3: /*last_error_number*/
case 3: /*last_seen_transaction*/
set_field_char_utf8(f, m_row.last_seen_transaction, m_row.last_seen_transaction_length);
break;
case 4: /*last_error_number*/
set_field_ulong(f, m_row.last_error_number);
break;
case 4: /*last_error_message*/
case 5: /*last_error_message*/
set_field_varchar_utf8(f, m_row.last_error_message,
m_row.last_error_message_length);
break;
case 5: /*last_error_timestamp*/
case 6: /*last_error_timestamp*/
set_field_timestamp(f, m_row.last_error_timestamp);
break;
case 7: /*last_trans_retry_count*/
set_field_ulong(f, m_row.last_trans_retry_count);
break;
default:
DBUG_ASSERT(false);
}
......
......@@ -34,8 +34,6 @@
#include "pfs_engine_table.h"
#include "rpl_mi.h"
#include "mysql_com.h"
//#include "rpl_msr.h"
//#include "rpl_info.h" /*CHANNEL_NAME_LENGTH*/
#include "my_thread.h"
class Master_info;
......@@ -59,15 +57,18 @@ enum enum_rpl_yes_no {
additional length field denoted by <field_name>_length.
*/
struct st_row_coordinator {
char channel_name[CHANNEL_NAME_LENGTH];
uint channel_name_length;
char connection_name[CHANNEL_NAME_LENGTH];
uint connection_name_length;
ulonglong thread_id;
bool thread_id_is_null;
enum_rpl_yes_no service_state;
char last_seen_transaction[GTID_MAX_STR_LENGTH + 1];
uint last_seen_transaction_length;
uint last_error_number;
char last_error_message[MAX_SLAVE_ERRMSG];
uint last_error_message_length;
ulonglong last_error_timestamp;
ulong last_trans_retry_count;
};
/** Table PERFORMANCE_SCHEMA.replication_applier_status_by_coordinator */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment