Commit 19abe79f authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-7145: Delayed replication, intermediate commit.

Initial merge of delayed replication from MySQL git.

The code from the initial push into MySQL is merged, and the
associated test case passes. A number of tasks are still pending:

1. Check full test suite run for any regressions or .result file updates.

2. Extend the feature to also work for parallel replication.

3. There are some todo-comments about future refactoring left from
MySQL, these should be located and merged on top.

4. There are some later related MySQL commits, these should be checked
and merged. These include:
    e134b9362ba0b750d6ac1b444780019622d14aa5
    b38f0f7857c073edfcc0a64675b7f7ede04be00f
    fd2b210383358fe7697f201e19ac9779879ba72a
    afc397376ec50e96b2918ee64e48baf4dda0d37d

5. The testcase from MySQL relies heavily on sleep and timing for
testing, and seems likely to sporadically fail on heavily loaded test
servers in buildbot or distro build farms.
Signed-off-by: default avatarKristian Nielsen <knielsen@knielsen-hq.org>
parent 50f19ca8
# ==== Purpose ====
#
# Auxiliary file used by rpl_delayed_slave.test. This assumes that an
# 'INSERT INTO t1...' query has been executed on the master. It does
# this:
#
# - After half the delay, check the status. It should be delaying and
# the query should not have executed.
#
# - After one and a half delay, check the status. It should not be
# delaying and the query should be executed.
#
# ==== Usage ====
#
# --source extra/rpl_tests/delayed_slave_wait_on_query.inc
connection master;
--echo [on slave]
--let $slave_timeout= $time1
--source include/sync_slave_io_with_master.inc
--echo # sleep 1*T
--sleep $time1
--echo # Expect query not executed and status is 'Waiting until MASTER_DELAY...'
SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
--source include/show_delayed_slave_state.inc
--echo # sleep 1*T
--sleep $time1
--echo # sync with master (with timeout 1*T)
--source include/sync_with_master.inc
--echo # Expect query executed and status is 'Has read all relay log...'
SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
--source include/show_delayed_slave_state.inc
--source include/check_slave_is_running.inc
......@@ -67,6 +67,9 @@ if ($tmp)
--echo Replicate_Do_Domain_Ids
--echo Replicate_Ignore_Domain_Ids
--echo Parallel_Mode conservative
--echo SQL_Delay 0
--echo SQL_Remaining_Delay NULL
--echo Slave_SQL_Running_State
}
if (!$tmp) {
# Note: after WL#5177, fields 13-18 shall not be filtered-out.
......
# ==== Purpose ====
#
# Display the delay state of the SQL thread.
#
# ==== Usage ====
#
# --let $verbose_delayed_slave_state= [0|1]
# --source extra/rpl_tests/show_delayed_slave_state.inc
#
# By default, the output is normalized so that it does not depend on
# exact timing or exact binlog positions. If
# $verbose_delayed_slave_state is set, then it outputs exact times and
# binlog positions. This can be useful for debugging.
--let $_delayed_slave_status= query_get_value(SHOW SLAVE STATUS, Slave_SQL_Running_State, 1)
--let $_delayed_slave_remaining_delay= query_get_value(SHOW SLAVE STATUS, SQL_Remaining_Delay, 1)
--let $_delayed_slave_qualitative_delay= `SELECT CASE WHEN "$_delayed_slave_remaining_delay" = "NULL" THEN "NULL" WHEN "$_delayed_slave_remaining_delay" = "0" THEN "0" ELSE "greater than zero" END`
--let $_delayed_slave_io_pos= query_get_value(SHOW SLAVE STATUS, Read_Master_Log_Pos, 1)
--let $_delayed_slave_sql_pos= query_get_value(SHOW SLAVE STATUS, Exec_Master_Log_Pos, 1)
--let $_delayed_slave_qualitative_log_pos= `SELECT IF($_delayed_slave_io_pos > $_delayed_slave_sql_pos, "behind", "in sync with")`
--echo Slave_SQL_Running_State='$_delayed_slave_status'; SQL_Remaining_Delay is $_delayed_slave_qualitative_delay; SQL thread is $_delayed_slave_qualitative_log_pos IO thread
if ($verbose_delayed_slave_state) {
--echo SQL_Remaining_Delay='$_delayed_slave_remaining_delay'; Read_master_log_pos='$_delayed_slave_io_pos'; Exec_Master_Log_Pos='$_delayed_slave_sql_pos'
}
# ==== Purpose ====
#
# This file does the same as the built-in command sync_with_master,
# but can be configured to use a custom timeout. This has the benefit
# that it accepts the same $slave_timeout and $master_connection
# parameters as wait_for_slave_param.inc
#
#
# ==== Usage ====
#
# --connection master
# --source include/save_master_pos.inc
# --connection slave
# --source include/sync_with_master.inc
#
# Parameters to this macro are $slave_timeout and
# $master_connection. See wait_for_slave_param.inc for
# descriptions.
--let $slave_param= Relay_Master_Log_File
--let $slave_param_value= $_master_file
--source include/wait_for_slave_param.inc
--let $slave_param= Exec_Master_Log_Pos
--let $slave_param_value= $_master_pos
--source include/wait_for_slave_param.inc
include/master-slave.inc
[connection master]
call mtr.add_suppression("Unsafe statement written to the binary log using statement format");
call mtr.add_suppression("Unsafe statement written to the binary log using statement format");
[on master]
CREATE TABLE t1 (a VARCHAR(100), b INT AUTO_INCREMENT PRIMARY KEY);
==== Normal setup ====
[on slave]
include/stop_slave.inc
# CHANGE MASTER TO MASTER_DELAY = 2*T
# Checking that delay is what we set it to
# Expect status to be ''
SELECT STATE FROM INFORMATION_SCHEMA.PROCESSLIST ORDER BY ID DESC LIMIT 1;
STATE
include/start_slave.inc
[on master]
INSERT INTO t1(a) VALUES ('normal setup');
[on slave]
include/sync_slave_io_with_master.inc
# sleep 1*T
# Expect query not executed and status is 'Waiting until MASTER_DELAY...'
SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
a b
Slave_SQL_Running_State='Waiting until MASTER_DELAY seconds after master executed event'; SQL_Remaining_Delay is greater than zero; SQL thread is behind IO thread
# sleep 1*T
# sync with master (with timeout 1*T)
include/wait_for_slave_param.inc [Relay_Master_Log_File]
include/wait_for_slave_param.inc [Exec_Master_Log_Pos]
# Expect query executed and status is 'Has read all relay log...'
SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
a b
normal setup 1
Slave_SQL_Running_State='Slave has read all relay log; waiting for the slave I/O thread to update it'; SQL_Remaining_Delay is NULL; SQL thread is in sync with IO thread
include/check_slave_is_running.inc
==== Slave lags "naturally" after master ====
[on master]
# CREATE FUNCTION delay_on_slave(time_units INT) RETURNS INT BEGIN IF @@GLOBAL.server_id = 2 THEN RETURN SLEEP(time_units * T); ELSE RETURN 0; END IF; END
INSERT INTO t1(a) SELECT delay_on_slave(3);
Warnings:
Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system variable that may have a different value on the slave.
Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system function that may return a different value on the slave.
INSERT INTO t1(a) VALUES ('slave is already lagging: this statement should execute immediately');
INSERT INTO t1(a) SELECT delay_on_slave(2);
Warnings:
Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system variable that may have a different value on the slave.
Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system function that may return a different value on the slave.
[on slave]
include/sync_slave_io_with_master.inc
# sleep 1*T
# Expect no query executed and status is 'Waiting until MASTER_DELAY...'
SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
a b
normal setup 1
Slave_SQL_Running_State='Waiting until MASTER_DELAY seconds after master executed event'; SQL_Remaining_Delay is greater than zero; SQL thread is behind IO thread
# wait for first query to execute
# sleep 1*T
# Expect second query executed and status is executing third query (i.e., 'User sleep')
SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
a b
slave is already lagging: this statement should execute immediately 3
Slave_SQL_Running_State='User sleep'; SQL_Remaining_Delay is NULL; SQL thread is behind IO thread
# sleep 2*T
# Expect query executed and status is 'Has read all relay log...'
SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
a b
0 4
Slave_SQL_Running_State='Slave has read all relay log; waiting for the slave I/O thread to update it'; SQL_Remaining_Delay is NULL; SQL thread is in sync with IO thread
==== Seconds_Behind_Master ====
# Bring slave to sync.
include/stop_slave.inc
CHANGE MASTER TO MASTER_DELAY = 0;
include/start_slave.inc
INSERT INTO t1(a) VALUES ('Syncing slave');
include/stop_slave.inc
# CHANGE MASTER TO MASTER_DELAY = 2*T
include/start_slave.inc
INSERT INTO t1(a) VALUES (delay_on_slave(1));
Warnings:
Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system variable that may have a different value on the slave.
Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system function that may return a different value on the slave.
# sleep 1*T
# sleep 1*T
==== STOP SLAVE and START SLAVE ====
include/stop_slave.inc
# CHANGE MASTER TO MASTER_DELAY = 3*T
include/start_slave.inc
# Checking that delay is what we set it to
[on master]
INSERT INTO t1(a) VALUES ('stop slave and start slave');
[on slave]
# sleep 1*T
SET @before_stop_slave= UNIX_TIMESTAMP();
include/stop_slave.inc
# STOP SLAVE finished in time.
# Expect query not executed and status is ''
SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
a b
0 6
Slave_SQL_Running_State=''; SQL_Remaining_Delay is NULL; SQL thread is behind IO thread
include/start_slave.inc
# START SLAVE finished in time.
[on slave]
include/sync_slave_io_with_master.inc
# sleep 1*T
# Expect query not executed and status is 'Waiting until MASTER_DELAY...'
SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
a b
0 6
Slave_SQL_Running_State='Waiting until MASTER_DELAY seconds after master executed event'; SQL_Remaining_Delay is greater than zero; SQL thread is behind IO thread
# sleep 1*T
# sync with master (with timeout 1*T)
include/wait_for_slave_param.inc [Relay_Master_Log_File]
include/wait_for_slave_param.inc [Exec_Master_Log_Pos]
# Expect query executed and status is 'Has read all relay log...'
SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
a b
stop slave and start slave 7
Slave_SQL_Running_State='Slave has read all relay log; waiting for the slave I/O thread to update it'; SQL_Remaining_Delay is NULL; SQL thread is in sync with IO thread
include/check_slave_is_running.inc
==== Change back to no delay ====
[on slave]
include/stop_slave.inc
CHANGE MASTER TO MASTER_DELAY = 0;
# Expect delay is 0.
SQL_Delay='0'
include/start_slave.inc
[on master]
INSERT INTO t1(a) VALUES ('change back to no delay');
[on slave]
include/sync_slave_io_with_master.inc
# sleep 1*T
# Expect query executed and status is 'Has read all relay log...'
SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
a b
change back to no delay 8
Slave_SQL_Running_State='Slave has read all relay log; waiting for the slave I/O thread to update it'; SQL_Remaining_Delay is NULL; SQL thread is in sync with IO thread
==== Reset delay with RESET SLAVE ====
include/stop_slave.inc
CHANGE MASTER TO MASTER_DELAY = 71;
include/start_slave.inc
# Expect delay is 71
SQL_Delay='71'
include/stop_slave.inc
RESET SLAVE;
[on master]
RESET MASTER;
[on slave]
include/start_slave.inc
# Expect delay is 0
SQL_Delay='0'
==== Set a bad value for the delay ====
include/stop_slave.inc
# Expect error for setting negative delay
CHANGE MASTER TO MASTER_DELAY = -1;
ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near '-1' at line 1
# Expect that it's ok to set delay of 2^31-1
CHANGE MASTER TO MASTER_DELAY = 2147483647;
# Expect error for setting delay between 2^31 and 2^32-1
CHANGE MASTER TO MASTER_DELAY = 2147483648;
ERROR HY000: The requested value 2147483648 for the master delay exceeds the maximum 2147483647
# Expect error for setting delay to nonsense
CHANGE MASTER TO MASTER_DELAY = blah;
ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near 'blah' at line 1
CHANGE MASTER TO MASTER_DELAY = 0;
include/start_slave.inc
==== Clean up ====
[on master]
DROP TABLE t1;
DROP FUNCTION delay_on_slave;
[on slave]
include/rpl_end.inc
This diff is collapsed.
......@@ -339,6 +339,7 @@ static SYMBOL symbols[] = {
{ "LOW_PRIORITY", SYM(LOW_PRIORITY)},
{ "MASTER", SYM(MASTER_SYM)},
{ "MASTER_CONNECT_RETRY", SYM(MASTER_CONNECT_RETRY_SYM)},
{ "MASTER_DELAY", SYM(MASTER_DELAY_SYM)},
{ "MASTER_GTID_POS", SYM(MASTER_GTID_POS_SYM)},
{ "MASTER_HOST", SYM(MASTER_HOST_SYM)},
{ "MASTER_LOG_FILE", SYM(MASTER_LOG_FILE_SYM)},
......
......@@ -4292,6 +4292,10 @@ void MYSQL_BIN_LOG::wait_for_last_checkpoint_event()
relay log.
IMPLEMENTATION
- You must hold rli->data_lock before calling this function, since
it writes group_relay_log_pos and similar fields of
Relay_log_info.
- Protects index file with LOCK_index
- Delete relevant relay log files
- Copy all file names after these ones to the front of the index file
......@@ -4305,7 +4309,7 @@ void MYSQL_BIN_LOG::wait_for_last_checkpoint_event()
read by the SQL slave thread are deleted).
@note
- This is only called from the slave-execute thread when it has read
- This is only called from the slave SQL thread when it has read
all commands from a relay log and want to switch to a new relay log.
- When this happens, we can be in an active transaction as
a transaction can span over two relay logs
......@@ -4336,6 +4340,8 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
DBUG_ASSERT(rli->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT);
DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name));
mysql_mutex_assert_owner(&rli->data_lock);
mysql_mutex_lock(&LOCK_index);
ir= rli->inuse_relaylog_list;
......
......@@ -966,6 +966,7 @@ int Log_event::do_update_pos(rpl_group_info *rgi)
Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Log_event::do_update_pos");
DBUG_ASSERT(!rli->belongs_to_client());
/*
rli is null when (as far as I (Guilhem) know) the caller is
Load_log_event::do_apply_event *and* that one is called from
......@@ -6400,6 +6401,9 @@ bool Rotate_log_event::write()
in a A -> B -> A setup.
The NOTES below is a wrong comment which will disappear when 4.1 is merged.
This must only be called from the Slave SQL thread, since it calls
flush_relay_log_info().
@retval
0 ok
*/
......@@ -8222,6 +8226,9 @@ void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
were we must do this cleaning is in
Start_log_event_v3::do_apply_event(), not here. Because if we come
here, the master was sane.
This must only be called from the Slave SQL thread, since it calls
flush_relay_log_info().
*/
int Stop_log_event::do_update_pos(rpl_group_info *rgi)
......
......@@ -18,7 +18,7 @@
#include "sql_priv.h"
#include <my_dir.h>
#include "rpl_mi.h"
#include "slave.h" // SLAVE_MAX_HEARTBEAT_PERIOD
#include "slave.h"
#include "strfunc.h"
#include "sql_repl.h"
......
This diff is collapsed.
......@@ -29,11 +29,6 @@ class Master_info;
class Rpl_filter;
enum {
LINES_IN_RELAY_LOG_INFO_WITH_DELAY= 5
};
/****************************************************************************
Replication SQL Thread
......@@ -78,11 +73,17 @@ class Relay_log_info : public Slave_reporting_capability
};
/*
If flag set, then rli does not store its state in any info file.
This is the case only when we execute BINLOG SQL commands inside
a client, non-replication thread.
The SQL thread owns one Relay_log_info, and each client that has
executed a BINLOG statement owns one Relay_log_info. This function
returns zero for the Relay_log_info object that belongs to the SQL
thread and nonzero for Relay_log_info objects that belong to
clients.
*/
bool no_storage;
inline bool belongs_to_client()
{
DBUG_ASSERT(sql_driver_thd);
return !sql_driver_thd->slave_thread;
}
/*
If true, events with the same server id should be replicated. This
......@@ -194,6 +195,11 @@ class Relay_log_info : public Slave_reporting_capability
relay log and finishing (commiting) on another relay log. Case which can
happen when, for example, the relay log gets rotated because of
max_binlog_size.
Note: group_relay_log_name, group_relay_log_pos must only be
written from the thread owning the Relay_log_info (SQL thread if
!belongs_to_client(); client thread executing BINLOG statement if
belongs_to_client()).
*/
char group_relay_log_name[FN_REFLEN];
ulonglong group_relay_log_pos;
......@@ -205,16 +211,17 @@ class Relay_log_info : public Slave_reporting_capability
*/
char future_event_master_log_name[FN_REFLEN];
#ifdef HAVE_valgrind
bool is_fake; /* Mark that this is a fake relay log info structure */
#endif
/*
Original log name and position of the group we're currently executing
(whose coordinates are group_relay_log_name/pos in the relay log)
in the master's binlog. These concern the *group*, because in the master's
binlog the log_pos that comes with each event is the position of the
beginning of the group.
Note: group_master_log_name, group_master_log_pos must only be
written from the thread owning the Relay_log_info (SQL thread if
!belongs_to_client(); client thread executing BINLOG statement if
belongs_to_client()).
*/
char group_master_log_name[FN_REFLEN];
volatile my_off_t group_master_log_pos;
......@@ -244,6 +251,15 @@ class Relay_log_info : public Slave_reporting_capability
bool sql_thread_caught_up;
void clear_until_condition();
/**
Reset the delay.
This is used by RESET SLAVE to clear the delay.
*/
void clear_sql_delay()
{
sql_delay= 0;
}
/*
Needed for problems when slave stops and we want to restart it
......@@ -474,8 +490,72 @@ class Relay_log_info : public Slave_reporting_capability
m_flags&= ~flag;
}
/**
Text used in THD::proc_info when the slave SQL thread is delaying.
*/
static const char *const state_delaying_string;
bool flush();
/**
Reads the relay_log.info file.
*/
int init(const char* info_filename);
/**
Indicate that a delay starts.
This does not actually sleep; it only sets the state of this
Relay_log_info object to delaying so that the correct state can be
reported by SHOW SLAVE STATUS and SHOW PROCESSLIST.
Requires rli->data_lock.
@param delay_end The time when the delay shall end.
*/
void start_sql_delay(time_t delay_end)
{
mysql_mutex_assert_owner(&data_lock);
sql_delay_end= delay_end;
thd_proc_info(sql_driver_thd, state_delaying_string);
}
int32 get_sql_delay() { return sql_delay; }
void set_sql_delay(time_t _sql_delay) { sql_delay= _sql_delay; }
time_t get_sql_delay_end() { return sql_delay_end; }
private:
/**
Delay slave SQL thread by this amount, compared to master (in
seconds). This is set with CHANGE MASTER TO MASTER_DELAY=X.
Guarded by data_lock. Initialized by the client thread executing
START SLAVE. Written by client threads executing CHANGE MASTER TO
MASTER_DELAY=X. Read by SQL thread and by client threads
executing SHOW SLAVE STATUS. Note: must not be written while the
slave SQL thread is running, since the SQL thread reads it without
a lock when executing flush_relay_log_info().
*/
int sql_delay;
/**
During a delay, specifies the point in time when the delay ends.
This is used for the SQL_Remaining_Delay column in SHOW SLAVE STATUS.
Guarded by data_lock. Written by the sql thread. Read by client
threads executing SHOW SLAVE STATUS.
*/
time_t sql_delay_end;
/*
Before the MASTER_DELAY parameter was added (WL#344),
relay_log.info had 4 lines. Now it has 5 lines.
*/
static const int LINES_IN_RELAY_LOG_INFO_WITH_DELAY= 5;
/*
Holds the state of the data in the relay log.
We need this to ensure that we are not in the middle of a
......@@ -874,7 +954,14 @@ class rpl_sql_thread_info
};
// Defined in rpl_rli.cc
/**
Reads the relay_log.info file.
@todo This is a wrapper around Relay_log_info::init(). It's only
kept for historical reasons. It would be good if we removed this
function and replaced all calls to it by calls to
Relay_log_info::init(). /SVEN
*/
int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
......
This diff is collapsed.
......@@ -17,6 +17,14 @@
#ifndef SLAVE_H
#define SLAVE_H
/**
MASTER_DELAY can be at most (1 << 31) - 1.
*/
#define MASTER_DELAY_MAX (0x7FFFFFFF)
#if INT_MAX < 0x7FFFFFFF
#error "don't support platforms where INT_MAX < 0x7FFFFFFF"
#endif
/**
@defgroup Replication Replication
@{
......@@ -102,12 +110,14 @@ int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f);
In Master_info: run_lock, data_lock
run_lock protects all information about the run state: slave_running, thd
and the existence of the I/O thread to stop/start it, you need this mutex).
and the existence of the I/O thread (to stop/start it, you need this mutex).
data_lock protects some moving members of the struct: counters (log name,
position) and relay log (MYSQL_BIN_LOG object).
In Relay_log_info: run_lock, data_lock
see Master_info
However, note that run_lock does not protect
Relay_log_info.run_state; that is protected by data_lock.
Order of acquisition: if you want to have LOCK_active_mi and a run_lock, you
must acquire LOCK_active_mi first.
......@@ -247,6 +257,12 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
int apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd,
struct rpl_group_info *rgi);
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val);
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
const char *default_val);
int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f);
pthread_handler_t handle_slave_io(void *arg);
void slave_output_error_info(rpl_group_info *rgi, THD *thd);
pthread_handler_t handle_slave_sql(void *arg);
......
......@@ -17,18 +17,96 @@
#include <my_global.h>
#include "sql_priv.h"
#include "sql_binlog.h"
#include "sql_parse.h" // check_global_access
#include "sql_acl.h" // *_ACL
#include "sql_parse.h"
#include "sql_acl.h"
#include "rpl_rli.h"
#include "base64.h"
#include "slave.h" // apply_event_and_update_pos
#include "log_event.h" // Format_description_log_event,
// EVENT_LEN_OFFSET,
// EVENT_TYPE_OFFSET,
// FORMAT_DESCRIPTION_LOG_EVENT,
// START_EVENT_V3,
// Log_event_type,
// Log_event
#include "slave.h"
#include "log_event.h"
/**
Check if the event type is allowed in a BINLOG statement.
@retval 0 if the event type is ok.
@retval 1 if the event type is not ok.
*/
static int check_event_type(int type, Relay_log_info *rli)
{
Format_description_log_event *fd_event=
rli->relay_log.description_event_for_exec;
/*
Convert event type id of certain old versions (see comment in
Format_description_log_event::Format_description_log_event(char*,...)).
*/
if (fd_event && fd_event->event_type_permutation)
{
IF_DBUG({
int new_type= fd_event->event_type_permutation[type];
DBUG_PRINT("info",
("converting event type %d to %d (%s)",
type, new_type,
Log_event::get_type_str((Log_event_type)new_type)));
},
(void)0);
type= fd_event->event_type_permutation[type];
}
switch (type)
{
case START_EVENT_V3:
case FORMAT_DESCRIPTION_EVENT:
/*
We need a preliminary FD event in order to parse the FD event,
if we don't already have one.
*/
if (!fd_event)
if (!(rli->relay_log.description_event_for_exec=
new Format_description_log_event(4)))
{
my_error(ER_OUTOFMEMORY, MYF(0), 1);
return 1;
}
/* It is always allowed to execute FD events. */
return 0;
case TABLE_MAP_EVENT:
case WRITE_ROWS_EVENT:
case UPDATE_ROWS_EVENT:
case DELETE_ROWS_EVENT:
case PRE_GA_WRITE_ROWS_EVENT:
case PRE_GA_UPDATE_ROWS_EVENT:
case PRE_GA_DELETE_ROWS_EVENT:
/*
Row events are only allowed if a Format_description_event has
already been seen.
*/
if (fd_event)
return 0;
else
{
my_error(ER_NO_FORMAT_DESCRIPTION_EVENT_BEFORE_BINLOG_STATEMENT,
MYF(0), Log_event::get_type_str((Log_event_type)type));
return 1;
}
break;
default:
/*
It is not meaningful to execute other events than row-events and
FD events. It would even be dangerous to execute Stop_log_event
and Rotate_log_event since they call flush_relay_log_info, which
is not allowed to call by other threads than the slave SQL
thread when the slave SQL thread is running.
*/
my_error(ER_ONLY_FD_AND_RBR_EVENTS_ALLOWED_IN_BINLOG_STATEMENT,
MYF(0), Log_event::get_type_str((Log_event_type)type));
return 1;
}
}
/**
Execute a BINLOG statement.
......@@ -73,31 +151,13 @@ void mysql_client_binlog_statement(THD* thd)
Allocation
*/
/*
If we do not have a Format_description_event, we create a dummy
one here. In this case, the first event we read must be a
Format_description_event.
*/
my_bool have_fd_event= TRUE;
int err;
Relay_log_info *rli;
rpl_group_info *rgi;
rli= thd->rli_fake;
if (!rli)
{
rli= thd->rli_fake= new Relay_log_info(FALSE);
#ifdef HAVE_valgrind
rli->is_fake= TRUE;
#endif
have_fd_event= FALSE;
}
if (rli && !rli->relay_log.description_event_for_exec)
{
rli->relay_log.description_event_for_exec=
new Format_description_log_event(4);
have_fd_event= FALSE;
}
if (!rli && (rli= thd->rli_fake= new Relay_log_info(FALSE)))
rli->sql_driver_thd= thd;
if (!(rgi= thd->rgi_fake))
rgi= thd->rgi_fake= new rpl_group_info(rli);
rgi->thd= thd;
......@@ -109,16 +169,13 @@ void mysql_client_binlog_statement(THD* thd)
/*
Out of memory check
*/
if (!(rli &&
rli->relay_log.description_event_for_exec &&
buf))
if (!(rli && buf))
{
my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); /* needed 1 bytes */
goto end;
}
rli->sql_driver_thd= thd;
rli->no_storage= TRUE;
DBUG_ASSERT(rli->belongs_to_client());
for (char const *strptr= thd->lex->comment.str ;
strptr < thd->lex->comment.str + thd->lex->comment.length ; )
......@@ -185,23 +242,8 @@ void mysql_client_binlog_statement(THD* thd)
DBUG_PRINT("info", ("event_len=%lu, bytes_decoded=%d",
event_len, bytes_decoded));
/*
If we have not seen any Format_description_event, then we must
see one; it is the only statement that can be read in base64
without a prior Format_description_event.
*/
if (!have_fd_event)
{
int type = (uchar)bufptr[EVENT_TYPE_OFFSET];
if (type == FORMAT_DESCRIPTION_EVENT || type == START_EVENT_V3)
have_fd_event= TRUE;
else
{
my_error(ER_NO_FORMAT_DESCRIPTION_EVENT_BEFORE_BINLOG_STATEMENT,
MYF(0), Log_event::get_type_str((Log_event_type)type));
goto end;
}
}
if (check_event_type(bufptr[EVENT_TYPE_OFFSET], rli))
goto end;
ev= Log_event::read_log_event(bufptr, event_len, &error,
rli->relay_log.description_event_for_exec,
......@@ -212,7 +254,7 @@ void mysql_client_binlog_statement(THD* thd)
{
/*
This could actually be an out-of-memory, but it is more likely
causes by a bad statement
caused by a bad statement
*/
my_error(ER_SYNTAX_ERROR, MYF(0));
goto end;
......
......@@ -4831,4 +4831,3 @@ void binlog_unsafe_map_init()
BINLOG_DIRECT_OFF & TRX_CACHE_NOT_EMPTY);
}
#endif
......@@ -240,11 +240,12 @@ struct LEX_MASTER_INFO
ulong server_id;
uint port, connect_retry;
float heartbeat_period;
int sql_delay;
/*
Enum is used for making it possible to detect if the user
changed variable or if it should be left at old value
*/
enum {LEX_MI_UNCHANGED, LEX_MI_DISABLE, LEX_MI_ENABLE}
enum {LEX_MI_UNCHANGED= 0, LEX_MI_DISABLE, LEX_MI_ENABLE}
ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt,
repl_do_domain_ids_opt, repl_ignore_domain_ids_opt;
enum {
......@@ -260,6 +261,7 @@ struct LEX_MASTER_INFO
sizeof(ulong), 0, 16, MYF(0));
my_init_dynamic_array(&repl_ignore_domain_ids,
sizeof(ulong), 0, 16, MYF(0));
sql_delay= -1;
}
void reset(bool is_change_master)
{
......@@ -280,6 +282,7 @@ struct LEX_MASTER_INFO
repl_ignore_domain_ids_opt= LEX_MI_UNCHANGED;
gtid_pos_str= null_lex_str;
use_gtid_opt= LEX_GTID_UNCHANGED;
sql_delay= -1;
}
};
......
......@@ -3319,6 +3319,7 @@ int reset_slave(THD *thd, Master_info* mi)
mi->clear_error();
mi->rli.clear_error();
mi->rli.clear_until_condition();
mi->rli.clear_sql_delay();
mi->rli.slave_skip_counter= 0;
// close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
......@@ -3630,6 +3631,9 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
if (lex_mi->sql_delay != -1)
mi->rli.set_sql_delay(lex_mi->sql_delay);
if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
mi->ssl_verify_server_cert=
(lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
......
......@@ -1341,6 +1341,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
%token LOOP_SYM
%token LOW_PRIORITY
%token MASTER_CONNECT_RETRY_SYM
%token MASTER_DELAY_SYM
%token MASTER_GTID_POS_SYM
%token MASTER_HOST_SYM
%token MASTER_LOG_FILE_SYM
......@@ -2255,6 +2256,16 @@ master_def:
{
Lex->mi.connect_retry = $3;
}
| MASTER_DELAY_SYM '=' ulong_num
{
if ($3 > MASTER_DELAY_MAX)
{
my_error(ER_MASTER_DELAY_VALUE_OUT_OF_RANGE, MYF(0),
$3, MASTER_DELAY_MAX);
}
else
Lex->mi.sql_delay = $3;
}
| MASTER_SSL_SYM '=' ulong_num
{
Lex->mi.ssl= $3 ?
......@@ -7660,6 +7671,7 @@ slave:
LEX *lex=Lex;
lex->sql_command = SQLCOM_SLAVE_ALL_START;
lex->type = 0;
/* If you change this code don't forget to update STOP SLAVE too */
}
{}
| STOP_SYM SLAVE optional_connection_name slave_thread_opts
......@@ -14099,6 +14111,7 @@ keyword_sp:
| MASTER_PASSWORD_SYM {}
| MASTER_SERVER_ID_SYM {}
| MASTER_CONNECT_RETRY_SYM {}
| MASTER_DELAY_SYM {}
| MASTER_SSL_SYM {}
| MASTER_SSL_CA_SYM {}
| MASTER_SSL_CAPATH_SYM {}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment