Commit e5e8f1a4 authored by He Zhenxing's avatar He Zhenxing

Auto merge 5.1-rep-semisync

parents a8a96593 48e98026
......@@ -29,13 +29,13 @@ set global rpl_semi_sync_master_enabled = 1;
show variables like 'rpl_semi_sync_master_enabled';
Variable_name Value
rpl_semi_sync_master_enabled ON
[ status of semi-sync on master should be OFF without any semi-sync slaves ]
[ status of semi-sync on master should be ON even without any semi-sync slaves ]
show status like 'Rpl_semi_sync_master_clients';
Variable_name Value
Rpl_semi_sync_master_clients 0
show status like 'Rpl_semi_sync_master_status';
Variable_name Value
Rpl_semi_sync_master_status OFF
Rpl_semi_sync_master_status ON
show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
Rpl_semi_sync_master_yes_tx 0
......@@ -81,7 +81,7 @@ Rpl_semi_sync_master_no_tx 0
show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
Rpl_semi_sync_master_yes_tx 0
create table t1(n int) engine = ENGINE_TYPE;
create table t1(a int) engine = ENGINE_TYPE;
[ master state after CREATE TABLE statement ]
show status like 'Rpl_semi_sync_master_status';
Variable_name Value
......@@ -92,6 +92,9 @@ Rpl_semi_sync_master_no_tx 0
show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
Rpl_semi_sync_master_yes_tx 1
select CONNECTIONS_NORMAL_SLAVE - CONNECTIONS_NORMAL_SLAVE as 'Should be 0';
Should be 0
0
[ insert records to table ]
[ master status after inserts ]
show status like 'Rpl_semi_sync_master_status';
......@@ -108,15 +111,19 @@ Rpl_semi_sync_master_yes_tx 301
show status like 'Rpl_semi_sync_slave_status';
Variable_name Value
Rpl_semi_sync_slave_status ON
select count(distinct n) from t1;
count(distinct n)
select count(distinct a) from t1;
count(distinct a)
300
select min(n) from t1;
min(n)
select min(a) from t1;
min(a)
1
select max(n) from t1;
max(n)
select max(a) from t1;
max(a)
300
#
# Test semi-sync master will switch OFF after one transacton
# timeout waiting for slave reply.
#
include/stop_slave.inc
[ on master ]
[ master status should be ON ]
......@@ -134,7 +141,16 @@ Variable_name Value
Rpl_semi_sync_master_clients 1
[ semi-sync replication of these transactions will fail ]
insert into t1 values (500);
delete from t1 where n < 500;
[ master status should be OFF ]
show status like 'Rpl_semi_sync_master_status';
Variable_name Value
Rpl_semi_sync_master_status OFF
show status like 'Rpl_semi_sync_master_no_tx';
Variable_name Value
Rpl_semi_sync_master_no_tx 1
show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
Rpl_semi_sync_master_yes_tx 301
insert into t1 values (100);
[ master status should be OFF ]
show status like 'Rpl_semi_sync_master_status';
......@@ -142,10 +158,13 @@ Variable_name Value
Rpl_semi_sync_master_status OFF
show status like 'Rpl_semi_sync_master_no_tx';
Variable_name Value
Rpl_semi_sync_master_no_tx 3
Rpl_semi_sync_master_no_tx 302
show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
Rpl_semi_sync_master_yes_tx 301
#
# Test semi-sync status on master will be ON again when slave catches up
#
[ on slave ]
[ slave status should be OFF ]
show status like 'Rpl_semi_sync_slave_status';
......@@ -156,33 +175,53 @@ include/start_slave.inc
show status like 'Rpl_semi_sync_slave_status';
Variable_name Value
Rpl_semi_sync_slave_status ON
select count(distinct n) from t1;
count(distinct n)
select count(distinct a) from t1;
count(distinct a)
2
select min(n) from t1;
min(n)
select min(a) from t1;
min(a)
100
select max(n) from t1;
max(n)
select max(a) from t1;
max(a)
500
[ on master ]
[ do something to activate semi-sync ]
drop table t1;
[ master status should be ON again ]
[ master status should be ON again after slave catches up ]
show status like 'Rpl_semi_sync_master_status';
Variable_name Value
Rpl_semi_sync_master_status ON
show status like 'Rpl_semi_sync_master_no_tx';
Variable_name Value
Rpl_semi_sync_master_no_tx 3
Rpl_semi_sync_master_no_tx 302
show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
Rpl_semi_sync_master_yes_tx 302
Rpl_semi_sync_master_yes_tx 301
show status like 'Rpl_semi_sync_master_clients';
Variable_name Value
Rpl_semi_sync_master_clients 1
#
# Test disable/enable master semi-sync on the fly.
#
drop table t1;
[ on slave ]
include/stop_slave.inc
#
# Flush status
#
[ Semi-sync master status variables before FLUSH STATUS ]
SHOW STATUS LIKE 'Rpl_semi_sync_master_no_tx';
Variable_name Value
Rpl_semi_sync_master_no_tx 302
SHOW STATUS LIKE 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
Rpl_semi_sync_master_yes_tx 302
FLUSH NO_WRITE_TO_BINLOG STATUS;
[ Semi-sync master status variables after FLUSH STATUS ]
SHOW STATUS LIKE 'Rpl_semi_sync_master_no_tx';
Variable_name Value
Rpl_semi_sync_master_no_tx 0
SHOW STATUS LIKE 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
Rpl_semi_sync_master_yes_tx 0
[ on master ]
show master logs;
Log_name master-bin.000001
......@@ -206,6 +245,9 @@ rpl_semi_sync_master_enabled ON
show status like 'Rpl_semi_sync_master_status';
Variable_name Value
Rpl_semi_sync_master_status ON
#
# Test RESET MASTER/SLAVE
#
[ on slave ]
include/start_slave.inc
[ on master ]
......@@ -313,7 +355,7 @@ Variable_name Value
Rpl_semi_sync_master_clients 0
show status like 'Rpl_semi_sync_master_status';
Variable_name Value
Rpl_semi_sync_master_status OFF
Rpl_semi_sync_master_status ON
set global rpl_semi_sync_master_enabled= 0;
[ on slave ]
SHOW VARIABLES LIKE 'rpl_semi_sync_slave_enabled';
......@@ -322,17 +364,17 @@ rpl_semi_sync_slave_enabled ON
include/start_slave.inc
[ on master ]
insert into t1 values (8);
[ master semi-sync clients should be 0, status should be OFF ]
[ master semi-sync clients should be 1, status should be OFF ]
show status like 'Rpl_semi_sync_master_clients';
Variable_name Value
Rpl_semi_sync_master_clients 0
Rpl_semi_sync_master_clients 1
show status like 'Rpl_semi_sync_master_status';
Variable_name Value
Rpl_semi_sync_master_status OFF
[ on slave ]
show status like 'Rpl_semi_sync_slave_status';
Variable_name Value
Rpl_semi_sync_slave_status OFF
Rpl_semi_sync_slave_status ON
include/stop_slave.inc
[ on master ]
set sql_log_bin=0;
......
......@@ -11,12 +11,17 @@ let $engine_type= InnoDB;
disable_query_log;
connection master;
call mtr.add_suppression("Timeout waiting for reply of binlog");
call mtr.add_suppression("Read semi-sync reply");
connection slave;
call mtr.add_suppression("Master server does not support");
# These will be removed after fix bug#45852
call mtr.add_suppression("Set 'rpl_semi_sync_master_reply_log_file_pos' on master failed");
call mtr.add_suppression("Slave I/O: Fatal error: Failed to run 'after_queue_event' hook, Error_code: 1593");
call mtr.add_suppression("Master server does not support semi-sync");
call mtr.add_suppression("Semi-sync slave .* reply");
enable_query_log;
connection master;
# After fix of BUG#45848, semi-sync slave should not create any extra
# connections on master, save the count of connections before start
# semi-sync slave for comparison below.
let $_connections_normal_slave= query_get_value(SHOW STATUS LIKE 'Threads_connected', Value, 1);
--echo #
--echo # Uninstall semi-sync plugins on master and slave
......@@ -69,7 +74,7 @@ echo [ enable semi-sync on master ];
set global rpl_semi_sync_master_enabled = 1;
show variables like 'rpl_semi_sync_master_enabled';
echo [ status of semi-sync on master should be OFF without any semi-sync slaves ];
echo [ status of semi-sync on master should be ON even without any semi-sync slaves ];
show status like 'Rpl_semi_sync_master_clients';
show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_yes_tx';
......@@ -150,13 +155,20 @@ show status like 'Rpl_semi_sync_master_no_tx';
show status like 'Rpl_semi_sync_master_yes_tx';
replace_result $engine_type ENGINE_TYPE;
eval create table t1(n int) engine = $engine_type;
eval create table t1(a int) engine = $engine_type;
echo [ master state after CREATE TABLE statement ];
show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx';
show status like 'Rpl_semi_sync_master_yes_tx';
# After fix of BUG#45848, semi-sync slave should not create any extra
# connections on master.
let $_connections_semisync_slave= query_get_value(SHOW STATUS LIKE 'Threads_connected', Value, 1);
replace_result $_connections_semisync_slave CONNECTIONS_SEMISYNC_SLAVE;
replace_result $_connections_normal_slave CONNECTIONS_NORMAL_SLAVE;
eval select $_connections_semisync_slave - $_connections_normal_slave as 'Should be 0';
let $i=300;
echo [ insert records to table ];
disable_query_log;
......@@ -178,10 +190,15 @@ echo [ on slave ];
echo [ slave status after replicated inserts ];
show status like 'Rpl_semi_sync_slave_status';
select count(distinct n) from t1;
select min(n) from t1;
select max(n) from t1;
select count(distinct a) from t1;
select min(a) from t1;
select max(a) from t1;
--echo #
--echo # Test semi-sync master will switch OFF after one transacton
--echo # timeout waiting for slave reply.
--echo #
connection slave;
source include/stop_slave.inc;
connection master;
......@@ -197,8 +214,11 @@ show status like 'Rpl_semi_sync_master_clients';
echo [ semi-sync replication of these transactions will fail ];
insert into t1 values (500);
delete from t1 where n < 500;
insert into t1 values (100);
# Wait for the semi-sync replication of this transaction to timeout
let $status_var= Rpl_semi_sync_master_status;
let $status_var_value= OFF;
source include/wait_for_status_var.inc;
# The second semi-sync check should be off because one transaction
# times out during waiting.
......@@ -207,6 +227,28 @@ show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx';
show status like 'Rpl_semi_sync_master_yes_tx';
# Semi-sync status on master is now OFF, so all these transactions
# will be replicated asynchronously.
let $i=300;
disable_query_log;
while ($i)
{
eval delete from t1 where a=$i;
dec $i;
}
enable_query_log;
insert into t1 values (100);
echo [ master status should be OFF ];
show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx';
show status like 'Rpl_semi_sync_master_yes_tx';
--echo #
--echo # Test semi-sync status on master will be ON again when slave catches up
--echo #
# Save the master position for later use.
save_master_pos;
......@@ -221,28 +263,44 @@ sync_with_master;
echo [ slave status should be ON ];
show status like 'Rpl_semi_sync_slave_status';
select count(distinct n) from t1;
select min(n) from t1;
select max(n) from t1;
select count(distinct a) from t1;
select min(a) from t1;
select max(a) from t1;
connection master;
echo [ on master ];
echo [ do something to activate semi-sync ];
drop table t1;
# The third semi-sync check should be on again.
echo [ master status should be ON again ];
# The master semi-sync status should be on again after slave catches up.
echo [ master status should be ON again after slave catches up ];
show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_no_tx';
show status like 'Rpl_semi_sync_master_yes_tx';
show status like 'Rpl_semi_sync_master_clients';
--echo #
--echo # Test disable/enable master semi-sync on the fly.
--echo #
drop table t1;
sync_slave_with_master;
echo [ on slave ];
source include/stop_slave.inc;
--echo #
--echo # Flush status
--echo #
connection master;
echo [ Semi-sync master status variables before FLUSH STATUS ];
SHOW STATUS LIKE 'Rpl_semi_sync_master_no_tx';
SHOW STATUS LIKE 'Rpl_semi_sync_master_yes_tx';
# Do not write the FLUSH STATUS to binlog, to make sure we'll get a
# clean status after this.
FLUSH NO_WRITE_TO_BINLOG STATUS;
echo [ Semi-sync master status variables after FLUSH STATUS ];
SHOW STATUS LIKE 'Rpl_semi_sync_master_no_tx';
SHOW STATUS LIKE 'Rpl_semi_sync_master_yes_tx';
connection master;
echo [ on master ];
......@@ -259,6 +317,10 @@ set global rpl_semi_sync_master_enabled=1;
show variables like 'rpl_semi_sync_master_enabled';
show status like 'Rpl_semi_sync_master_status';
--echo #
--echo # Test RESET MASTER/SLAVE
--echo #
connection slave;
echo [ on slave ];
......@@ -435,7 +497,10 @@ source include/start_slave.inc;
connection master;
echo [ on master ];
insert into t1 values (8);
echo [ master semi-sync clients should be 0, status should be OFF ];
let $status_var= Rpl_semi_sync_master_clients;
let $status_var_value= 1;
source include/wait_for_status_var.inc;
echo [ master semi-sync clients should be 1, status should be OFF ];
show status like 'Rpl_semi_sync_master_clients';
show status like 'Rpl_semi_sync_master_status';
sync_slave_with_master;
......@@ -512,6 +577,8 @@ source include/start_slave.inc;
connection master;
drop table t1;
sync_slave_with_master;
connection master;
drop user rpl@127.0.0.1;
flush privileges;
sync_slave_with_master;
......@@ -18,6 +18,7 @@
pkgplugindir = $(pkglibdir)/plugin
INCLUDES = -I$(top_srcdir)/include \
-I$(top_srcdir)/sql \
-I$(top_srcdir)/regex \
-I$(srcdir)
noinst_HEADERS = semisync.h semisync_master.h semisync_slave.h
......
......@@ -18,25 +18,9 @@
#ifndef SEMISYNC_H
#define SEMISYNC_H
#include <stdint.h>
#include <string.h>
#include <assert.h>
#include <sys/time.h>
#include <time.h>
#include <stdio.h>
#include <pthread.h>
#include <mysql.h>
typedef uint32_t uint32;
typedef unsigned long long my_off_t;
#define FN_REFLEN 512 /* Max length of full path-name */
void sql_print_error(const char *format, ...);
void sql_print_warning(const char *format, ...);
void sql_print_information(const char *format, ...);
extern unsigned long max_connections;
#define MYSQL_SERVER
#define HAVE_REPLICATION
#include <mysql_priv.h>
#include <my_global.h>
#include <my_pthread.h>
#include <mysql/plugin.h>
......@@ -92,4 +76,16 @@ public:
static const unsigned char kPacketFlagSync;
};
/* The layout of a semisync slave reply packet:
1 byte for the magic num
8 bytes for the binlog positon
n bytes for the binlog filename, terminated with a '\0'
*/
#define REPLY_MAGIC_NUM_LEN 1
#define REPLY_BINLOG_POS_LEN 8
#define REPLY_BINLOG_NAME_LEN (FN_REFLEN + 1)
#define REPLY_MAGIC_NUM_OFFSET 0
#define REPLY_BINLOG_POS_OFFSET (REPLY_MAGIC_NUM_OFFSET + REPLY_MAGIC_NUM_LEN)
#define REPLY_BINLOG_NAME_OFFSET (REPLY_BINLOG_POS_OFFSET + REPLY_BINLOG_POS_LEN)
#endif /* SEMISYNC_H */
......@@ -30,16 +30,17 @@ unsigned long rpl_semi_sync_master_yes_transactions = 0;
unsigned long rpl_semi_sync_master_no_transactions = 0;
unsigned long rpl_semi_sync_master_off_times = 0;
unsigned long rpl_semi_sync_master_timefunc_fails = 0;
unsigned long rpl_semi_sync_master_num_timeouts = 0;
unsigned long rpl_semi_sync_master_wait_timeouts = 0;
unsigned long rpl_semi_sync_master_wait_sessions = 0;
unsigned long rpl_semi_sync_master_back_wait_pos = 0;
unsigned long rpl_semi_sync_master_trx_wait_time = 0;
unsigned long rpl_semi_sync_master_wait_pos_backtraverse = 0;
unsigned long rpl_semi_sync_master_avg_trx_wait_time = 0;
unsigned long long rpl_semi_sync_master_trx_wait_num = 0;
unsigned long rpl_semi_sync_master_net_wait_time = 0;
unsigned long rpl_semi_sync_master_avg_net_wait_time = 0;
unsigned long long rpl_semi_sync_master_net_wait_num = 0;
unsigned long rpl_semi_sync_master_clients = 0;
unsigned long long rpl_semi_sync_master_net_wait_total_time = 0;
unsigned long long rpl_semi_sync_master_trx_wait_total_time = 0;
unsigned long long rpl_semi_sync_master_net_wait_time = 0;
unsigned long long rpl_semi_sync_master_trx_wait_time = 0;
char rpl_semi_sync_master_wait_no_slave = 1;
static int getWaitTime(const struct timeval& start_tv);
......@@ -379,16 +380,6 @@ ReplSemiSyncMaster::ReplSemiSyncMaster()
master_enabled_(false),
wait_timeout_(0L),
state_(0),
enabled_transactions_(0),
disabled_transactions_(0),
switched_off_times_(0),
timefunc_fails_(0),
wait_sessions_(0),
wait_backtraverse_(0),
total_trx_wait_num_(0),
total_trx_wait_time_(0),
total_net_wait_num_(0),
total_net_wait_time_(0),
max_transactions_(0L)
{
strcpy(reply_file_name_, "");
......@@ -535,6 +526,14 @@ void ReplSemiSyncMaster::remove_slave()
{
lock();
rpl_semi_sync_master_clients--;
/* If user has chosen not to wait if no semi-sync slave available
and the last semi-sync slave exits, turn off semi-sync on master
immediately.
*/
if (!rpl_semi_sync_master_wait_no_slave &&
rpl_semi_sync_master_clients == 0)
switch_off();
unlock();
}
......@@ -546,19 +545,6 @@ bool ReplSemiSyncMaster::is_semi_sync_slave()
return val;
}
int ReplSemiSyncMaster::reportReplyBinlog(const char *log_file_pos)
{
char log_name[FN_REFLEN];
char *endptr;
my_off_t log_pos= strtoull(log_file_pos, &endptr, 10);
if (!log_pos || !endptr || *endptr != ':' )
return 1;
endptr++; // skip the ':' seperator
strncpy(log_name, endptr, FN_REFLEN);
uint32 server_id= 0;
return reportReplyBinlog(server_id, log_name, log_pos);
}
int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
const char *log_file_name,
my_off_t log_file_pos)
......@@ -624,7 +610,7 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
log_file_name, (unsigned long)log_file_pos);
}
if (wait_sessions_ > 0)
if (rpl_semi_sync_master_wait_sessions > 0)
{
/* Let us check if some of the waiting threads doing a trx
* commit can now proceed.
......@@ -679,7 +665,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
"Waiting for semi-sync ACK from slave");
/* This is the real check inside the mutex. */
if (!getMasterEnabled() || !is_on() || !rpl_semi_sync_master_clients)
if (!getMasterEnabled() || !is_on())
goto l_end;
if (trace_level_ & kTraceDetail)
......@@ -691,17 +677,20 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
while (is_on())
{
int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
trx_wait_binlog_name, trx_wait_binlog_pos);
if (cmp >= 0)
if (reply_file_name_inited_)
{
/* We have already sent the relevant binlog to the slave: no need to
* wait here.
*/
if (trace_level_ & kTraceDetail)
sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
kWho, reply_file_name_, (unsigned long)reply_file_pos_);
break;
int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
trx_wait_binlog_name, trx_wait_binlog_pos);
if (cmp >= 0)
{
/* We have already sent the relevant binlog to the slave: no need to
* wait here.
*/
if (trace_level_ & kTraceDetail)
sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
kWho, reply_file_name_, (unsigned long)reply_file_pos_);
break;
}
}
/* Let us update the info about the minimum binlog position of waiting
......@@ -709,15 +698,15 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
*/
if (wait_file_name_inited_)
{
cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
wait_file_name_, wait_file_pos_);
int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
wait_file_name_, wait_file_pos_);
if (cmp <= 0)
{
/* This thd has a lower position, let's update the minimum info. */
strcpy(wait_file_name_, trx_wait_binlog_name);
wait_file_pos_ = trx_wait_binlog_pos;
wait_backtraverse_++;
rpl_semi_sync_master_wait_pos_backtraverse++;
if (trace_level_ & kTraceDetail)
sql_print_information("%s: move back wait position (%s, %lu),",
kWho, wait_file_name_, (unsigned long)wait_file_pos_);
......@@ -762,7 +751,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
* when replication has progressed far enough, we will release
* these waiting threads.
*/
wait_sessions_++;
rpl_semi_sync_master_wait_sessions++;
if (trace_level_ & kTraceDetail)
sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)",
......@@ -770,7 +759,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
wait_file_name_, (unsigned long)wait_file_pos_);
wait_result = cond_timewait(&abstime);
wait_sessions_--;
rpl_semi_sync_master_wait_sessions--;
if (wait_result != 0)
{
......@@ -779,7 +768,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
"semi-sync up to file %s, position %lu.",
trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
reply_file_name_, (unsigned long)reply_file_pos_);
total_wait_timeouts_++;
rpl_semi_sync_master_wait_timeouts++;
/* switch semi-sync off */
switch_off();
......@@ -798,12 +787,12 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
"wait position (%s, %lu)",
trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos);
}
timefunc_fails_++;
rpl_semi_sync_master_timefunc_fails++;
}
else
{
total_trx_wait_num_++;
total_trx_wait_time_ += wait_time;
rpl_semi_sync_master_trx_wait_num++;
rpl_semi_sync_master_trx_wait_time += wait_time;
}
}
}
......@@ -816,7 +805,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
"wait position (%s, %lu)",
trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos);
}
timefunc_fails_++;
rpl_semi_sync_master_timefunc_fails++;
/* switch semi-sync off */
switch_off();
......@@ -824,11 +813,18 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
}
l_end:
/*
At this point, the binlog file and position of this transaction
must have been removed from ActiveTranx.
*/
assert(!active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
trx_wait_binlog_pos));
/* Update the status counter. */
if (is_on() && rpl_semi_sync_master_clients)
enabled_transactions_++;
if (is_on())
rpl_semi_sync_master_yes_transactions++;
else
disabled_transactions_++;
rpl_semi_sync_master_no_transactions++;
/* The lock held will be released by thd_exit_cond, so no need to
call unlock() here */
......@@ -868,7 +864,7 @@ int ReplSemiSyncMaster::switch_off()
assert(active_tranxs_ != NULL);
result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
switched_off_times_++;
rpl_semi_sync_master_off_times++;
wait_file_name_inited_ = false;
reply_file_name_inited_ = false;
sql_print_information("Semi-sync replication switched OFF.");
......@@ -1045,7 +1041,9 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
* reserve the packet header.
*/
if (sync)
{
(packet)[2] = kPacketFlagSync;
}
return function_exit(kWho, 0);
}
......@@ -1089,7 +1087,7 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
commit_file_name_inited_ = true;
}
if (is_on() && rpl_semi_sync_master_clients)
if (is_on())
{
assert(active_tranxs_ != NULL);
if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
......@@ -1098,8 +1096,8 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
if insert tranx_node failed, print a warning message
and turn off semi-sync
*/
sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %ul",
log_file_name, log_file_pos);
sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
log_file_name, (ulong)log_file_pos);
switch_off();
}
}
......@@ -1110,6 +1108,113 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
return function_exit(kWho, result);
}
int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
const char *event_buf)
{
const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
const unsigned char *packet;
char log_file_name[FN_REFLEN];
my_off_t log_file_pos;
ulong packet_len;
int result = -1;
struct timeval start_tv;
int start_time_err= 0;
ulong trc_level = trace_level_;
function_enter(kWho);
assert((unsigned char)event_buf[1] == kPacketMagicNum);
if ((unsigned char)event_buf[2] != kPacketFlagSync)
{
/* current event does not require reply */
result = 0;
goto l_end;
}
if (trc_level & kTraceNetWait)
start_time_err = gettimeofday(&start_tv, 0);
/* We flush to make sure that the current event is sent to the network,
* instead of being buffered in the TCP/IP stack.
*/
if (net_flush(net))
{
sql_print_error("Semi-sync master failed on net_flush() "
"before waiting for slave reply");
goto l_end;
}
net_clear(net, 0);
if (trc_level & kTraceDetail)
sql_print_information("%s: Wait for replica's reply", kWho);
/* Wait for the network here. Though binlog dump thread can indefinitely wait
* here, transactions would not wait indefintely.
* Transactions wait on binlog replies detected by binlog dump threads. If
* binlog dump threads wait too long, transactions will timeout and continue.
*/
packet_len = my_net_read(net);
if (trc_level & kTraceNetWait)
{
if (start_time_err != 0)
{
sql_print_error("Semi-sync master wait for reply "
"gettimeofday fail to get start time");
rpl_semi_sync_master_timefunc_fails++;
}
else
{
int wait_time;
wait_time = getWaitTime(start_tv);
if (wait_time < 0)
{
sql_print_error("Semi-sync master wait for reply "
"gettimeofday fail to get wait time.");
rpl_semi_sync_master_timefunc_fails++;
}
else
{
rpl_semi_sync_master_net_wait_num++;
rpl_semi_sync_master_net_wait_time += wait_time;
}
}
}
if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET)
{
if (packet_len == packet_error)
sql_print_error("Read semi-sync reply network error: %s (errno: %d)",
net->last_error, net->last_errno);
else
sql_print_error("Read semi-sync reply length error: %s (errno: %d)",
net->last_error, net->last_errno);
goto l_end;
}
packet = net->read_pos;
if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
{
sql_print_error("Read semi-sync reply magic number error");
goto l_end;
}
log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
strcpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET);
if (trc_level & kTraceDetail)
sql_print_information("%s: Got reply (%s, %lu)",
kWho, log_file_name, (ulong)log_file_pos);
result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
l_end:
return function_exit(kWho, result);
}
int ReplSemiSyncMaster::resetMaster()
{
const char *kWho = "ReplSemiSyncMaster::resetMaster";
......@@ -1126,16 +1231,16 @@ int ReplSemiSyncMaster::resetMaster()
reply_file_name_inited_ = false;
commit_file_name_inited_ = false;
enabled_transactions_ = 0;
disabled_transactions_ = 0;
switched_off_times_ = 0;
timefunc_fails_ = 0;
wait_sessions_ = 0;
wait_backtraverse_ = 0;
total_trx_wait_num_ = 0;
total_trx_wait_time_ = 0;
total_net_wait_num_ = 0;
total_net_wait_time_ = 0;
rpl_semi_sync_master_yes_transactions = 0;
rpl_semi_sync_master_no_transactions = 0;
rpl_semi_sync_master_off_times = 0;
rpl_semi_sync_master_timefunc_fails = 0;
rpl_semi_sync_master_wait_sessions = 0;
rpl_semi_sync_master_wait_pos_backtraverse = 0;
rpl_semi_sync_master_trx_wait_num = 0;
rpl_semi_sync_master_trx_wait_time = 0;
rpl_semi_sync_master_net_wait_num = 0;
rpl_semi_sync_master_net_wait_time = 0;
unlock();
......@@ -1146,27 +1251,15 @@ void ReplSemiSyncMaster::setExportStats()
{
lock();
rpl_semi_sync_master_status = state_ && rpl_semi_sync_master_clients;
rpl_semi_sync_master_yes_transactions = enabled_transactions_;
rpl_semi_sync_master_no_transactions = disabled_transactions_;
rpl_semi_sync_master_off_times = switched_off_times_;
rpl_semi_sync_master_timefunc_fails = timefunc_fails_;
rpl_semi_sync_master_num_timeouts = total_wait_timeouts_;
rpl_semi_sync_master_wait_sessions = wait_sessions_;
rpl_semi_sync_master_back_wait_pos = wait_backtraverse_;
rpl_semi_sync_master_trx_wait_num = total_trx_wait_num_;
rpl_semi_sync_master_trx_wait_time =
((total_trx_wait_num_) ?
(unsigned long)((double)total_trx_wait_time_ /
((double)total_trx_wait_num_)) : 0);
rpl_semi_sync_master_net_wait_num = total_net_wait_num_;
rpl_semi_sync_master_net_wait_time =
((total_net_wait_num_) ?
(unsigned long)((double)total_net_wait_time_ /
((double)total_net_wait_num_)) : 0);
rpl_semi_sync_master_net_wait_total_time = total_net_wait_time_;
rpl_semi_sync_master_trx_wait_total_time = total_trx_wait_time_;
rpl_semi_sync_master_status = state_;
rpl_semi_sync_master_avg_trx_wait_time=
((rpl_semi_sync_master_trx_wait_num) ?
(unsigned long)((double)rpl_semi_sync_master_trx_wait_time /
((double)rpl_semi_sync_master_trx_wait_num)) : 0);
rpl_semi_sync_master_avg_net_wait_time=
((rpl_semi_sync_master_net_wait_num) ?
(unsigned long)((double)rpl_semi_sync_master_net_wait_time /
((double)rpl_semi_sync_master_net_wait_num)) : 0);
unlock();
}
......
......@@ -81,7 +81,7 @@ public:
/* Insert an active transaction node with the specified position.
*
* Return:
* 0: success; -1 or otherwise: error
* 0: success; non-zero: error
*/
int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
......@@ -91,7 +91,7 @@ public:
* list and the hash table will be reset to empty.
*
* Return:
* 0: success; -1 or otherwise: error
* 0: success; non-zero: error
*/
int clear_active_tranx_nodes(const char *log_file_name,
my_off_t log_file_pos);
......@@ -175,19 +175,7 @@ class ReplSemiSyncMaster
volatile bool master_enabled_; /* semi-sync is enabled on the master */
unsigned long wait_timeout_; /* timeout period(ms) during tranx wait */
/* All status variables. */
bool state_; /* whether semi-sync is switched */
unsigned long enabled_transactions_; /* semi-sync'ed tansactions */
unsigned long disabled_transactions_; /* non-semi-sync'ed tansactions */
unsigned long switched_off_times_; /* how many times are switched off? */
unsigned long timefunc_fails_; /* how many time function fails? */
unsigned long total_wait_timeouts_; /* total number of wait timeouts */
unsigned long wait_sessions_; /* how many sessions wait for replies? */
unsigned long wait_backtraverse_; /* wait position back traverses */
unsigned long long total_trx_wait_num_; /* total trx waits: non-timeout ones */
unsigned long long total_trx_wait_time_; /* total trx wait time: in us */
unsigned long long total_net_wait_num_; /* total network waits */
unsigned long long total_net_wait_time_; /* total network wait time */
/* The number of maximum active transactions. This should be the same as
* maximum connections because MySQL does not do connection sharing now.
......@@ -253,8 +241,6 @@ class ReplSemiSyncMaster
/* Is the slave servered by the thread requested semi-sync */
bool is_semi_sync_slave();
int reportReplyBinlog(const char *log_file_pos);
/* In semi-sync replication, reports up to which binlog position we have
* received replies from the slave indicating that it already get the events.
*
......@@ -265,7 +251,7 @@ class ReplSemiSyncMaster
* the replies from the slave
*
* Return:
* 0: success; -1 or otherwise: error
* 0: success; non-zero: error
*/
int reportReplyBinlog(uint32 server_id,
const char* log_file_name,
......@@ -284,7 +270,7 @@ class ReplSemiSyncMaster
* trx_wait_binlog_pos - (IN) ending position's file offset
*
* Return:
* 0: success; -1 or otherwise: error
* 0: success; non-zero: error
*/
int commitTrx(const char* trx_wait_binlog_name,
my_off_t trx_wait_binlog_pos);
......@@ -313,7 +299,7 @@ class ReplSemiSyncMaster
* server_id - (IN) master server id number
*
* Return:
* 0: success; -1 or otherwise: error
* 0: success; non-zero: error
*/
int updateSyncHeader(unsigned char *packet,
const char *log_file_name,
......@@ -330,10 +316,23 @@ class ReplSemiSyncMaster
* log_file_pos - (IN) transaction ending position's file offset
*
* Return:
* 0: success; -1 or otherwise: error
* 0: success; non-zero: error
*/
int writeTranxInBinlog(const char* log_file_name, my_off_t log_file_pos);
/* Read the slave's reply so that we know how much progress the slave makes
* on receive replication events.
*
* Input:
* net - (IN) the connection to master
* server_id - (IN) master server id number
* event_buf - (IN) pointer to the event packet
*
* Return:
* 0: success; non-zero: error
*/
int readSlaveReply(NET *net, uint32 server_id, const char *event_buf);
/* Export internal statistics for semi-sync replication. */
void setExportStats();
......@@ -345,22 +344,31 @@ class ReplSemiSyncMaster
/* System and status variables for the master component */
extern char rpl_semi_sync_master_enabled;
extern char rpl_semi_sync_master_status;
extern unsigned long rpl_semi_sync_master_clients;
extern unsigned long rpl_semi_sync_master_timeout;
extern unsigned long rpl_semi_sync_master_trace_level;
extern char rpl_semi_sync_master_status;
extern unsigned long rpl_semi_sync_master_yes_transactions;
extern unsigned long rpl_semi_sync_master_no_transactions;
extern unsigned long rpl_semi_sync_master_off_times;
extern unsigned long rpl_semi_sync_master_wait_timeouts;
extern unsigned long rpl_semi_sync_master_timefunc_fails;
extern unsigned long rpl_semi_sync_master_num_timeouts;
extern unsigned long rpl_semi_sync_master_wait_sessions;
extern unsigned long rpl_semi_sync_master_back_wait_pos;
extern unsigned long rpl_semi_sync_master_trx_wait_time;
extern unsigned long rpl_semi_sync_master_net_wait_time;
extern unsigned long rpl_semi_sync_master_wait_pos_backtraverse;
extern unsigned long rpl_semi_sync_master_avg_trx_wait_time;
extern unsigned long rpl_semi_sync_master_avg_net_wait_time;
extern unsigned long long rpl_semi_sync_master_net_wait_num;
extern unsigned long long rpl_semi_sync_master_trx_wait_num;
extern unsigned long long rpl_semi_sync_master_net_wait_total_time;
extern unsigned long long rpl_semi_sync_master_trx_wait_total_time;
extern unsigned long rpl_semi_sync_master_clients;
extern unsigned long long rpl_semi_sync_master_net_wait_time;
extern unsigned long long rpl_semi_sync_master_trx_wait_time;
/*
This indicates whether we should keep waiting if no semi-sync slave
is available.
0 : stop waiting if detected no avaialable semi-sync slave.
1 (default) : keep waiting until timeout even no available semi-sync slave.
*/
extern char rpl_semi_sync_master_wait_no_slave;
#endif /* SEMISYNC_MASTER_H */
......@@ -69,8 +69,16 @@ int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
bool semi_sync_slave= repl_semisync.is_semi_sync_slave();
if (semi_sync_slave)
{
/* One more semi-sync slave */
repl_semisync.add_slave();
/*
Let's assume this semi-sync slave has already received all
binlog events before the filename and position it requests.
*/
repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos);
}
sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
semi_sync_slave ? "semi-sync" : "asynchronous",
param->server_id, log_file, (unsigned long)log_pos);
......@@ -114,6 +122,18 @@ int repl_semi_before_send_event(Binlog_transmit_param *param,
int repl_semi_after_send_event(Binlog_transmit_param *param,
const char *event_buf, unsigned long len)
{
if (repl_semisync.is_semi_sync_slave())
{
THD *thd= current_thd;
/*
Possible errors in reading slave reply are ignored deliberately
because we do not want dump thread to quit on this. Error
messages are already reported.
*/
(void) repl_semisync.readSlaveReply(&thd->net,
param->server_id, event_buf);
thd->clear_error();
}
return 0;
}
......@@ -142,11 +162,6 @@ static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
void *ptr,
const void *val);
static void fix_rpl_semi_sync_master_reply_log_file_pos(MYSQL_THD thd,
SYS_VAR *var,
void *ptr,
const void *val);
static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled,
PLUGIN_VAR_OPCMDARG,
"Enable semi-synchronous replication master (disabled by default). ",
......@@ -161,6 +176,13 @@ static MYSQL_SYSVAR_ULONG(timeout, rpl_semi_sync_master_timeout,
fix_rpl_semi_sync_master_timeout, // update
10000, 0, ~0L, 1);
static MYSQL_SYSVAR_BOOL(wait_no_slave, rpl_semi_sync_master_wait_no_slave,
PLUGIN_VAR_OPCMDARG,
"Wait until timeout when no semi-synchronous replication slave available (enabled by default). ",
NULL, // check
NULL, // update
1);
static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
PLUGIN_VAR_OPCMDARG,
"The tracing level for semi-sync replication.",
......@@ -168,22 +190,11 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
&fix_rpl_semi_sync_master_trace_level, // update
32, 0, ~0L, 1);
/*
Use a SESSION instead of GLOBAL variable for slave to send reply to
avoid requiring SUPER privilege.
*/
static MYSQL_THDVAR_STR(reply_log_file_pos,
PLUGIN_VAR_NOCMDOPT,
"The log filename and position slave has queued to relay log.",
NULL, // check
&fix_rpl_semi_sync_master_reply_log_file_pos,
"");
static SYS_VAR* semi_sync_master_system_vars[]= {
MYSQL_SYSVAR(enabled),
MYSQL_SYSVAR(timeout),
MYSQL_SYSVAR(wait_no_slave),
MYSQL_SYSVAR(trace_level),
MYSQL_SYSVAR(reply_log_file_pos),
NULL,
};
......@@ -228,19 +239,6 @@ static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
return;
}
static void fix_rpl_semi_sync_master_reply_log_file_pos(MYSQL_THD thd,
SYS_VAR *var,
void *ptr,
const void *val)
{
const char *log_file_pos= *(char **)val;
if (repl_semisync.reportReplyBinlog(log_file_pos))
sql_print_error("report slave binlog reply failed.");
return;
}
Trans_observer trans_observer = {
sizeof(Trans_observer), // len
......@@ -278,45 +276,60 @@ Binlog_transmit_observer transmit_observer = {
return 0; \
}
DEF_SHOW_FUNC(clients, SHOW_LONG)
DEF_SHOW_FUNC(net_wait_time, SHOW_LONG)
DEF_SHOW_FUNC(net_wait_total_time, SHOW_LONGLONG)
DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
DEF_SHOW_FUNC(off_times, SHOW_LONG)
DEF_SHOW_FUNC(no_transactions, SHOW_LONG)
DEF_SHOW_FUNC(status, SHOW_BOOL)
DEF_SHOW_FUNC(timefunc_fails, SHOW_LONG)
DEF_SHOW_FUNC(trx_wait_time, SHOW_LONG)
DEF_SHOW_FUNC(trx_wait_total_time, SHOW_LONGLONG)
DEF_SHOW_FUNC(clients, SHOW_LONG)
DEF_SHOW_FUNC(trx_wait_time, SHOW_LONGLONG)
DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG)
DEF_SHOW_FUNC(back_wait_pos, SHOW_LONG)
DEF_SHOW_FUNC(wait_sessions, SHOW_LONG)
DEF_SHOW_FUNC(yes_transactions, SHOW_LONG)
DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
/* plugin status variables */
static SHOW_VAR semi_sync_master_status_vars[]= {
{"Rpl_semi_sync_master_clients", (char*) &SHOW_FNAME(clients), SHOW_FUNC},
{"Rpl_semi_sync_master_net_avg_wait_time",
(char*) &SHOW_FNAME(net_wait_time), SHOW_FUNC},
{"Rpl_semi_sync_master_net_wait_time",
(char*) &SHOW_FNAME(net_wait_total_time), SHOW_FUNC},
{"Rpl_semi_sync_master_net_waits", (char*) &SHOW_FNAME(net_wait_num), SHOW_FUNC},
{"Rpl_semi_sync_master_no_times", (char*) &SHOW_FNAME(off_times), SHOW_FUNC},
{"Rpl_semi_sync_master_no_tx", (char*) &SHOW_FNAME(no_transactions), SHOW_FUNC},
{"Rpl_semi_sync_master_status", (char*) &SHOW_FNAME(status), SHOW_FUNC},
{"Rpl_semi_sync_master_status",
(char*) &SHOW_FNAME(status),
SHOW_FUNC},
{"Rpl_semi_sync_master_clients",
(char*) &SHOW_FNAME(clients),
SHOW_FUNC},
{"Rpl_semi_sync_master_yes_tx",
(char*) &rpl_semi_sync_master_yes_transactions,
SHOW_LONG},
{"Rpl_semi_sync_master_no_tx",
(char*) &rpl_semi_sync_master_no_transactions,
SHOW_LONG},
{"Rpl_semi_sync_master_wait_sessions",
(char*) &rpl_semi_sync_master_wait_sessions,
SHOW_LONG},
{"Rpl_semi_sync_master_no_times",
(char*) &rpl_semi_sync_master_off_times,
SHOW_LONG},
{"Rpl_semi_sync_master_timefunc_failures",
(char*) &SHOW_FNAME(timefunc_fails), SHOW_FUNC},
{"Rpl_semi_sync_master_tx_avg_wait_time",
(char*) &SHOW_FNAME(trx_wait_time), SHOW_FUNC},
{"Rpl_semi_sync_master_tx_wait_time",
(char*) &SHOW_FNAME(trx_wait_total_time), SHOW_FUNC},
{"Rpl_semi_sync_master_tx_waits", (char*) &SHOW_FNAME(trx_wait_num), SHOW_FUNC},
(char*) &rpl_semi_sync_master_timefunc_fails,
SHOW_LONG},
{"Rpl_semi_sync_master_wait_pos_backtraverse",
(char*) &SHOW_FNAME(back_wait_pos), SHOW_FUNC},
{"Rpl_semi_sync_master_wait_sessions",
(char*) &SHOW_FNAME(wait_sessions), SHOW_FUNC},
{"Rpl_semi_sync_master_yes_tx", (char*) &SHOW_FNAME(yes_transactions), SHOW_FUNC},
(char*) &rpl_semi_sync_master_wait_pos_backtraverse,
SHOW_LONG},
{"Rpl_semi_sync_master_tx_wait_time",
(char*) &SHOW_FNAME(trx_wait_time),
SHOW_FUNC},
{"Rpl_semi_sync_master_tx_waits",
(char*) &SHOW_FNAME(trx_wait_num),
SHOW_FUNC},
{"Rpl_semi_sync_master_tx_avg_wait_time",
(char*) &SHOW_FNAME(avg_trx_wait_time),
SHOW_FUNC},
{"Rpl_semi_sync_master_net_wait_time",
(char*) &SHOW_FNAME(net_wait_time),
SHOW_FUNC},
{"Rpl_semi_sync_master_net_waits",
(char*) &SHOW_FNAME(net_wait_num),
SHOW_FUNC},
{"Rpl_semi_sync_master_net_avg_wait_time",
(char*) &SHOW_FNAME(avg_net_wait_time),
SHOW_FUNC},
{NULL, NULL, SHOW_LONG},
};
......
......@@ -39,16 +39,6 @@ int ReplSemiSyncSlave::initObject()
return result;
}
int ReplSemiSyncSlave::slaveReplyConnect()
{
if (!mysql_reply && !(mysql_reply= rpl_connect_master(NULL)))
{
sql_print_error("Semisync slave connect to master for reply failed");
return 1;
}
return 0;
}
int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
unsigned long total_len,
bool *need_reply,
......@@ -104,19 +94,45 @@ int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
return 0;
}
int ReplSemiSyncSlave::slaveReply(const char *log_name, my_off_t log_pos)
int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
const char *binlog_filename,
my_off_t binlog_filepos)
{
char query[FN_REFLEN + 100];
sprintf(query, "SET SESSION rpl_semi_sync_master_reply_log_file_pos='%llu:%s'",
(unsigned long long)log_pos, log_name);
if (mysql_real_query(mysql_reply, query, strlen(query)))
const char *kWho = "ReplSemiSyncSlave::slaveReply";
NET *net= &mysql->net;
uchar reply_buffer[REPLY_MAGIC_NUM_LEN
+ REPLY_BINLOG_POS_LEN
+ REPLY_BINLOG_NAME_LEN];
int reply_res, name_len = strlen(binlog_filename);
function_enter(kWho);
/* Prepare the buffer of the reply. */
reply_buffer[REPLY_MAGIC_NUM_OFFSET] = kPacketMagicNum;
int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos);
memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET,
binlog_filename,
name_len + 1 /* including trailing '\0' */);
if (trace_level_ & kTraceDetail)
sql_print_information("%s: reply (%s, %lu)", kWho,
binlog_filename, (ulong)binlog_filepos);
net_clear(net, 0);
/* Send the reply. */
reply_res = my_net_write(net, reply_buffer,
name_len + REPLY_BINLOG_NAME_OFFSET);
if (!reply_res)
{
sql_print_error("Set 'rpl_semi_sync_master_reply_log_file_pos' on master failed");
mysql_free_result(mysql_store_result(mysql_reply));
mysql_close(mysql_reply);
mysql_reply= 0;
return 1;
reply_res = net_flush(net);
if (reply_res)
sql_print_error("Semi-sync slave net_flush() reply failed");
}
mysql_free_result(mysql_store_result(mysql_reply));
return 0;
else
{
sql_print_error("Semi-sync slave send reply failed: %s (%d)",
net->last_error, net->last_errno);
}
return function_exit(kWho, reply_res);
}
......@@ -57,7 +57,7 @@ public:
* payload_len - (IN) payload length
*
* Return:
* 0: success; -1 or otherwise: error
* 0: success; non-zero: error
*/
int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
const char **payload, unsigned long *payload_len);
......@@ -67,19 +67,16 @@ public:
* binlog position.
*
* Input:
* log_name - (IN) the reply point's binlog file name
* log_pos - (IN) the reply point's binlog file offset
* mysql - (IN) the mysql network connection
* binlog_filename - (IN) the reply point's binlog file name
* binlog_filepos - (IN) the reply point's binlog file offset
*
* Return:
* 0: success; -1 or otherwise: error
* 0: success; non-zero: error
*/
int slaveReply(const char *log_name, my_off_t log_pos);
int slaveReply(MYSQL *mysql, const char *binlog_filename,
my_off_t binlog_filepos);
/*
Connect to master for sending reply
*/
int slaveReplyConnect();
int slaveStart(Binlog_relay_IO_param *param);
int slaveStop(Binlog_relay_IO_param *param);
......
......@@ -45,13 +45,6 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
if (!repl_semisync.getSlaveEnabled())
return 0;
/*
Create the connection that is used to send slave ACK replies to
master
*/
if (repl_semisync.slaveReplyConnect())
return 1;
/* Check if master server has semi-sync plugin installed */
query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
if (mysql_real_query(mysql, query, strlen(query)) ||
......@@ -63,10 +56,11 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
}
row= mysql_fetch_row(res);
if (!row || strcmp(row[1], "ON"))
if (!row)
{
/* Master does not support or not configured semi-sync */
sql_print_warning("Master server does not support or not configured semi-sync replication, fallback to asynchronous");
/* Master does not support semi-sync */
sql_print_warning("Master server does not support semi-sync, "
"fallback to asynchronous replication");
rpl_semi_sync_slave_status= 0;
return 0;
}
......@@ -106,8 +100,16 @@ int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
uint32 flags)
{
if (rpl_semi_sync_slave_status && semi_sync_need_reply)
return repl_semisync.slaveReply(param->master_log_name,
{
/*
We deliberately ignore the error in slaveReply, such error
should not cause the slave IO thread to stop, and the error
messages are already reported.
*/
(void) repl_semisync.slaveReply(param->mysql,
param->master_log_name,
param->master_log_pos);
}
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment