Commit dca487a7 authored by He Zhenxing's avatar He Zhenxing

Backporting WL#4398 WL#1720

Backporting BUG#44058 BUG#42244 BUG#45672 BUG#45673
Backporting BUG#45819 BUG#45973 BUG#39012
parent 8e481c7c
[MYSQL] [MYSQL]
post_commit_to = "commits@lists.mysql.com" post_commit_to = "commits@lists.mysql.com"
post_push_to = "commits@lists.mysql.com" post_push_to = "commits@lists.mysql.com"
tree_name = "mysql-5.1" tree_name = "mysql-5.1-rep-semisync"
...@@ -16,6 +16,11 @@ ...@@ -16,6 +16,11 @@
#ifndef _my_plugin_h #ifndef _my_plugin_h
#define _my_plugin_h #define _my_plugin_h
/* size_t */
#include <stdlib.h>
typedef struct st_mysql MYSQL;
/* /*
On Windows, exports from DLL need to be declared On Windows, exports from DLL need to be declared
...@@ -75,7 +80,8 @@ typedef struct st_mysql_xid MYSQL_XID; ...@@ -75,7 +80,8 @@ typedef struct st_mysql_xid MYSQL_XID;
#define MYSQL_FTPARSER_PLUGIN 2 /* Full-text parser plugin */ #define MYSQL_FTPARSER_PLUGIN 2 /* Full-text parser plugin */
#define MYSQL_DAEMON_PLUGIN 3 /* The daemon/raw plugin type */ #define MYSQL_DAEMON_PLUGIN 3 /* The daemon/raw plugin type */
#define MYSQL_INFORMATION_SCHEMA_PLUGIN 4 /* The I_S plugin type */ #define MYSQL_INFORMATION_SCHEMA_PLUGIN 4 /* The I_S plugin type */
#define MYSQL_MAX_PLUGIN_TYPE_NUM 5 /* The number of plugin types */ #define MYSQL_REPLICATION_PLUGIN 5 /* The replication plugin type */
#define MYSQL_MAX_PLUGIN_TYPE_NUM 6 /* The number of plugin types */
/* We use the following strings to define licenses for plugins */ /* We use the following strings to define licenses for plugins */
#define PLUGIN_LICENSE_PROPRIETARY 0 #define PLUGIN_LICENSE_PROPRIETARY 0
...@@ -650,6 +656,17 @@ struct st_mysql_information_schema ...@@ -650,6 +656,17 @@ struct st_mysql_information_schema
int interface_version; int interface_version;
}; };
/*
API for Replication plugin. (MYSQL_REPLICATION_PLUGIN)
*/
#define MYSQL_REPLICATION_INTERFACE_VERSION 0x0100
/**
Replication plugin descriptor
*/
struct Mysql_replication {
int interface_version;
};
/* /*
st_mysql_value struct for reading values from mysqld. st_mysql_value struct for reading values from mysqld.
...@@ -801,6 +818,64 @@ void mysql_query_cache_invalidate4(MYSQL_THD thd, ...@@ -801,6 +818,64 @@ void mysql_query_cache_invalidate4(MYSQL_THD thd,
const char *key, unsigned int key_length, const char *key, unsigned int key_length,
int using_trx); int using_trx);
/**
Get the value of user variable as an integer.
This function will return the value of variable @a name as an
integer. If the original value of the variable is not an integer,
the value will be converted into an integer.
@param name user variable name
@param value pointer to return the value
@param null_value if not NULL, the function will set it to true if
the value of variable is null, set to false if not
@retval 0 Success
@retval 1 Variable not found
*/
int get_user_var_int(const char *name,
long long int *value, int *null_value);
/**
Get the value of user variable as a double precision float number.
This function will return the value of variable @a name as real
number. If the original value of the variable is not a real number,
the value will be converted into a real number.
@param name user variable name
@param value pointer to return the value
@param null_value if not NULL, the function will set it to true if
the value of variable is null, set to false if not
@retval 0 Success
@retval 1 Variable not found
*/
int get_user_var_real(const char *name,
double *value, int *null_value);
/**
Get the value of user variable as a string.
This function will return the value of variable @a name as
string. If the original value of the variable is not a string,
the value will be converted into a string.
@param name user variable name
@param value pointer to the value buffer
@param len length of the value buffer
@param precision precision of the value if it is a float number
@param null_value if not NULL, the function will set it to true if
the value of variable is null, set to false if not
@retval 0 Success
@retval 1 Variable not found
*/
int get_user_var_str(const char *name,
char *value, unsigned long len,
unsigned int precision, int *null_value);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
#include <stdlib.h>
typedef struct st_mysql MYSQL;
struct st_mysql_lex_string struct st_mysql_lex_string
{ {
char *str; char *str;
...@@ -105,6 +107,9 @@ struct st_mysql_information_schema ...@@ -105,6 +107,9 @@ struct st_mysql_information_schema
{ {
int interface_version; int interface_version;
}; };
struct Mysql_replication {
int interface_version;
};
struct st_mysql_value struct st_mysql_value
{ {
int (*value_type)(struct st_mysql_value *); int (*value_type)(struct st_mysql_value *);
...@@ -137,3 +142,10 @@ void thd_get_xid(const void* thd, MYSQL_XID *xid); ...@@ -137,3 +142,10 @@ void thd_get_xid(const void* thd, MYSQL_XID *xid);
void mysql_query_cache_invalidate4(void* thd, void mysql_query_cache_invalidate4(void* thd,
const char *key, unsigned int key_length, const char *key, unsigned int key_length,
int using_trx); int using_trx);
int get_user_var_int(const char *name,
long long int *value, int *null_value);
int get_user_var_real(const char *name,
double *value, int *null_value);
int get_user_var_str(const char *name,
char *value, unsigned long len,
unsigned int precision, int *null_value);
...@@ -139,6 +139,7 @@ SET(LIBMYSQLD_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc ...@@ -139,6 +139,7 @@ SET(LIBMYSQLD_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc
../sql/time.cc ../sql/tztime.cc ../sql/uniques.cc ../sql/unireg.cc ../sql/time.cc ../sql/tztime.cc ../sql/uniques.cc ../sql/unireg.cc
../sql/partition_info.cc ../sql/sql_connect.cc ../sql/partition_info.cc ../sql/sql_connect.cc
../sql/scheduler.cc ../sql/event_parse_data.cc ../sql/scheduler.cc ../sql/event_parse_data.cc
../sql/rpl_handler.cc
${GEN_SOURCES} ${GEN_SOURCES}
${LIB_SOURCES}) ${LIB_SOURCES})
......
...@@ -76,7 +76,8 @@ sqlsources = derror.cc field.cc field_conv.cc strfunc.cc filesort.cc \ ...@@ -76,7 +76,8 @@ sqlsources = derror.cc field.cc field_conv.cc strfunc.cc filesort.cc \
rpl_filter.cc sql_partition.cc sql_builtin.cc sql_plugin.cc \ rpl_filter.cc sql_partition.cc sql_builtin.cc sql_plugin.cc \
sql_tablespace.cc \ sql_tablespace.cc \
rpl_injector.cc my_user.c partition_info.cc \ rpl_injector.cc my_user.c partition_info.cc \
sql_servers.cc event_parse_data.cc sql_servers.cc event_parse_data.cc \
rpl_handler.cc
libmysqld_int_a_SOURCES= $(libmysqld_sources) libmysqld_int_a_SOURCES= $(libmysqld_sources)
nodist_libmysqld_int_a_SOURCES= $(libmysqlsources) $(sqlsources) nodist_libmysqld_int_a_SOURCES= $(libmysqlsources) $(sqlsources)
......
...@@ -1815,6 +1815,26 @@ sub environment_setup { ...@@ -1815,6 +1815,26 @@ sub environment_setup {
$ENV{'EXAMPLE_PLUGIN_LOAD'}="--plugin_load=;EXAMPLE=".$plugin_filename.";"; $ENV{'EXAMPLE_PLUGIN_LOAD'}="--plugin_load=;EXAMPLE=".$plugin_filename.";";
} }
# --------------------------------------------------------------------------
# Add the path where mysqld will find semisync plugins
# --------------------------------------------------------------------------
my $lib_semisync_master_plugin=
mtr_file_exists("$basedir/plugin/semisync/.libs/libsemisync_master.so");
my $lib_semisync_slave_plugin=
mtr_file_exists("$basedir/plugin/semisync/.libs/libsemisync_slave.so");
if ($lib_semisync_master_plugin && $lib_semisync_slave_plugin)
{
$ENV{'SEMISYNC_MASTER_PLUGIN'}= basename($lib_semisync_master_plugin);
$ENV{'SEMISYNC_SLAVE_PLUGIN'}= basename($lib_semisync_slave_plugin);
$ENV{'SEMISYNC_PLUGIN_OPT'}= "--plugin-dir=".dirname($lib_semisync_master_plugin);
}
else
{
$ENV{'SEMISYNC_MASTER_PLUGIN'}= "";
$ENV{'SEMISYNC_SLAVE_PLUGIN'}= "";
$ENV{'SEMISYNC_PLUGIN_OPT'}="";
}
# ---------------------------------------------------- # ----------------------------------------------------
# Add the path where mysqld will find mypluglib.so # Add the path where mysqld will find mypluglib.so
# ---------------------------------------------------- # ----------------------------------------------------
......
...@@ -6,6 +6,7 @@ grant replication slave on *.* to replicate@localhost identified by 'aaaaaaaaaaa ...@@ -6,6 +6,7 @@ grant replication slave on *.* to replicate@localhost identified by 'aaaaaaaaaaa
grant replication slave on *.* to replicate@127.0.0.1 identified by 'aaaaaaaaaaaaaaab'; grant replication slave on *.* to replicate@127.0.0.1 identified by 'aaaaaaaaaaaaaaab';
connection slave; connection slave;
start slave; start slave;
source include/wait_for_slave_to_start.inc;
connection master; connection master;
--disable_warnings --disable_warnings
drop table if exists t1; drop table if exists t1;
......
# Copyright (C) 2006 MySQL AB
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
## Makefile.am for semi-synchronous replication
pkgplugindir = $(pkglibdir)/plugin
INCLUDES = -I$(top_srcdir)/include \
-I$(top_srcdir)/sql \
-I$(srcdir)
noinst_HEADERS = semisync.h semisync_master.h semisync_slave.h
pkgplugin_LTLIBRARIES = libsemisync_master.la libsemisync_slave.la
libsemisync_master_la_LDFLAGS = -module
libsemisync_master_la_CXXFLAGS= $(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
libsemisync_master_la_CFLAGS = $(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
libsemisync_master_la_SOURCES = semisync.cc semisync_master.cc semisync_master_plugin.cc
libsemisync_slave_la_LDFLAGS = -module
libsemisync_slave_la_CXXFLAGS= $(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
libsemisync_slave_la_CFLAGS = $(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
libsemisync_slave_la_SOURCES = semisync.cc semisync_slave.cc semisync_slave_plugin.cc
# configure.in for semi-synchronous replication
AC_INIT(mysql-semi-sync-plugin, 0.2)
AM_INIT_AUTOMAKE
AC_DISABLE_STATIC
AC_PROG_LIBTOOL
AC_CONFIG_FILES([Makefile])
AC_OUTPUT
MYSQL_PLUGIN(semisync,[Semi-synchronous Replication Plugin],
[Semi-synchronous replication plugin.])
MYSQL_PLUGIN_DYNAMIC(semisync, [libsemisync_master.la libsemisync_slave.la])
/* Copyright (C) 2007 Google Inc.
Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "semisync.h"
const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef;
const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01;
const unsigned long Trace::kTraceGeneral = 0x0001;
const unsigned long Trace::kTraceDetail = 0x0010;
const unsigned long Trace::kTraceNetWait = 0x0020;
const unsigned long Trace::kTraceFunction = 0x0040;
const char ReplSemiSyncBase::kSyncHeader[2] =
{ReplSemiSyncBase::kPacketMagicNum, 0};
/* Copyright (C) 2007 Google Inc.
Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#ifndef SEMISYNC_H
#define SEMISYNC_H
#include <stdint.h>
#include <string.h>
#include <assert.h>
#include <sys/time.h>
#include <time.h>
#include <stdio.h>
#include <pthread.h>
#include <mysql.h>
typedef uint32_t uint32;
typedef unsigned long long my_off_t;
#define FN_REFLEN 512 /* Max length of full path-name */
void sql_print_error(const char *format, ...);
void sql_print_warning(const char *format, ...);
void sql_print_information(const char *format, ...);
extern unsigned long max_connections;
#define MYSQL_SERVER
#define HAVE_REPLICATION
#include <my_global.h>
#include <my_pthread.h>
#include <mysql/plugin.h>
#include <replication.h>
typedef struct st_mysql_show_var SHOW_VAR;
typedef struct st_mysql_sys_var SYS_VAR;
/**
This class is used to trace function calls and other process
information
*/
class Trace {
public:
static const unsigned long kTraceFunction;
static const unsigned long kTraceGeneral;
static const unsigned long kTraceDetail;
static const unsigned long kTraceNetWait;
unsigned long trace_level_; /* the level for tracing */
inline void function_enter(const char *func_name)
{
if (trace_level_ & kTraceFunction)
sql_print_information("---> %s enter", func_name);
}
inline int function_exit(const char *func_name, int exit_code)
{
if (trace_level_ & kTraceFunction)
sql_print_information("<--- %s exit (%d)", func_name, exit_code);
return exit_code;
}
Trace()
:trace_level_(0L)
{}
Trace(unsigned long trace_level)
:trace_level_(trace_level)
{}
};
/**
Base class for semi-sync master and slave classes
*/
class ReplSemiSyncBase
:public Trace {
public:
static const char kSyncHeader[2]; /* three byte packet header */
/* Constants in network packet header. */
static const unsigned char kPacketMagicNum;
static const unsigned char kPacketFlagSync;
};
#endif /* SEMISYNC_H */
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
/* Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "semisync_slave.h"
char rpl_semi_sync_slave_enabled;
unsigned long rpl_semi_sync_slave_status= 0;
unsigned long rpl_semi_sync_slave_trace_level;
int ReplSemiSyncSlave::initObject()
{
int result= 0;
const char *kWho = "ReplSemiSyncSlave::initObject";
if (init_done_)
{
fprintf(stderr, "%s called twice\n", kWho);
return 1;
}
init_done_ = true;
/* References to the parameter works after set_options(). */
setSlaveEnabled(rpl_semi_sync_slave_enabled);
setTraceLevel(rpl_semi_sync_slave_trace_level);
return result;
}
int ReplSemiSyncSlave::slaveReplyConnect()
{
if (!mysql_reply && !(mysql_reply= rpl_connect_master(NULL)))
{
sql_print_error("Semisync slave connect to master for reply failed");
return 1;
}
return 0;
}
int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
unsigned long total_len,
bool *need_reply,
const char **payload,
unsigned long *payload_len)
{
const char *kWho = "ReplSemiSyncSlave::slaveReadSyncHeader";
int read_res = 0;
function_enter(kWho);
if ((unsigned char)(header[0]) == kPacketMagicNum)
{
*need_reply = (header[1] & kPacketFlagSync);
*payload_len = total_len - 2;
*payload = header + 2;
if (trace_level_ & kTraceDetail)
sql_print_information("%s: reply - %d", kWho, *need_reply);
}
else
{
sql_print_error("Missing magic number for semi-sync packet, packet "
"len: %lu", total_len);
read_res = -1;
}
return function_exit(kWho, read_res);
}
int ReplSemiSyncSlave::slaveStart(Binlog_relay_IO_param *param)
{
bool semi_sync= getSlaveEnabled();
sql_print_information("Slave I/O thread: Start %s replication to\
master '%s@%s:%d' in log '%s' at position %lu",
semi_sync ? "semi-sync" : "asynchronous",
param->user, param->host, param->port,
param->master_log_name[0] ? param->master_log_name : "FIRST",
(unsigned long)param->master_log_pos);
if (semi_sync && !rpl_semi_sync_slave_status)
rpl_semi_sync_slave_status= 1;
return 0;
}
int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
{
if (rpl_semi_sync_slave_status)
rpl_semi_sync_slave_status= 0;
if (mysql_reply)
mysql_close(mysql_reply);
mysql_reply= 0;
return 0;
}
int ReplSemiSyncSlave::slaveReply(const char *log_name, my_off_t log_pos)
{
char query[FN_REFLEN + 100];
sprintf(query, "SET SESSION rpl_semi_sync_master_reply_log_file_pos='%llu:%s'",
(unsigned long long)log_pos, log_name);
if (mysql_real_query(mysql_reply, query, strlen(query)))
{
sql_print_error("Set 'rpl_semi_sync_master_reply_log_file_pos' on master failed");
mysql_free_result(mysql_store_result(mysql_reply));
mysql_close(mysql_reply);
mysql_reply= 0;
return 1;
}
mysql_free_result(mysql_store_result(mysql_reply));
return 0;
}
/* Copyright (C) 2006 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#ifndef SEMISYNC_SLAVE_H
#define SEMISYNC_SLAVE_H
#include "semisync.h"
/**
The extension class for the slave of semi-synchronous replication
*/
class ReplSemiSyncSlave
:public ReplSemiSyncBase {
public:
ReplSemiSyncSlave()
:slave_enabled_(false)
{}
~ReplSemiSyncSlave() {}
void setTraceLevel(unsigned long trace_level) {
trace_level_ = trace_level;
}
/* Initialize this class after MySQL parameters are initialized. this
* function should be called once at bootstrap time.
*/
int initObject();
bool getSlaveEnabled() {
return slave_enabled_;
}
void setSlaveEnabled(bool enabled) {
slave_enabled_ = enabled;
}
/* A slave reads the semi-sync packet header and separate the metadata
* from the payload data.
*
* Input:
* header - (IN) packet header pointer
* total_len - (IN) total packet length: metadata + payload
* need_reply - (IN) whether the master is waiting for the reply
* payload - (IN) payload: the replication event
* payload_len - (IN) payload length
*
* Return:
* 0: success; -1 or otherwise: error
*/
int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
const char **payload, unsigned long *payload_len);
/* A slave replies to the master indicating its replication process. It
* indicates that the slave has received all events before the specified
* binlog position.
*
* Input:
* log_name - (IN) the reply point's binlog file name
* log_pos - (IN) the reply point's binlog file offset
*
* Return:
* 0: success; -1 or otherwise: error
*/
int slaveReply(const char *log_name, my_off_t log_pos);
/*
Connect to master for sending reply
*/
int slaveReplyConnect();
int slaveStart(Binlog_relay_IO_param *param);
int slaveStop(Binlog_relay_IO_param *param);
private:
/* True when initObject has been called */
bool init_done_;
bool slave_enabled_; /* semi-sycn is enabled on the slave */
MYSQL *mysql_reply; /* connection to send reply */
};
/* System and status variables for the slave component */
extern char rpl_semi_sync_slave_enabled;
extern unsigned long rpl_semi_sync_slave_trace_level;
extern unsigned long rpl_semi_sync_slave_status;
#endif /* SEMISYNC_SLAVE_H */
/* Copyright (C) 2007 Google Inc.
Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "semisync_slave.h"
ReplSemiSyncSlave repl_semisync;
/*
indicate whether or not the slave should send a reply to the master.
This is set to true in repl_semi_slave_read_event if the current
event read is the last event of a transaction. And the value is
checked in repl_semi_slave_queue_event.
*/
bool semi_sync_need_reply= false;
int repl_semi_reset_slave(Binlog_relay_IO_param *param)
{
// TODO: reset semi-sync slave status here
return 0;
}
int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
uint32 flags)
{
MYSQL *mysql= param->mysql;
MYSQL_RES *res= 0;
MYSQL_ROW row;
const char *query;
if (!repl_semisync.getSlaveEnabled())
return 0;
/*
Create the connection that is used to send slave ACK replies to
master
*/
if (repl_semisync.slaveReplyConnect())
return 1;
/* Check if master server has semi-sync plugin installed */
query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
if (mysql_real_query(mysql, query, strlen(query)) ||
!(res= mysql_store_result(mysql)))
{
mysql_free_result(mysql_store_result(mysql));
sql_print_error("Execution failed on master: %s", query);
return 1;
}
row= mysql_fetch_row(res);
if (!row || strcmp(row[1], "ON"))
{
/* Master does not support or not configured semi-sync */
sql_print_warning("Master server does not support or not configured semi-sync replication, fallback to asynchronous");
rpl_semi_sync_slave_status= 0;
return 0;
}
/*
Tell master dump thread that we want to do semi-sync
replication
*/
query= "SET @rpl_semi_sync_slave= 1";
if (mysql_real_query(mysql, query, strlen(query)))
{
sql_print_error("Set 'rpl_semi_sync_slave=1' on master failed");
mysql_free_result(mysql_store_result(mysql));
return 1;
}
mysql_free_result(mysql_store_result(mysql));
rpl_semi_sync_slave_status= 1;
return 0;
}
int repl_semi_slave_read_event(Binlog_relay_IO_param *param,
const char *packet, unsigned long len,
const char **event_buf, unsigned long *event_len)
{
if (rpl_semi_sync_slave_status)
return repl_semisync.slaveReadSyncHeader(packet, len,
&semi_sync_need_reply,
event_buf, event_len);
*event_buf= packet;
*event_len= len;
return 0;
}
int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
const char *event_buf,
unsigned long event_len,
uint32 flags)
{
if (rpl_semi_sync_slave_status && semi_sync_need_reply)
return repl_semisync.slaveReply(param->master_log_name,
param->master_log_pos);
return 0;
}
int repl_semi_slave_io_start(Binlog_relay_IO_param *param)
{
return repl_semisync.slaveStart(param);
}
int repl_semi_slave_io_end(Binlog_relay_IO_param *param)
{
return repl_semisync.slaveStop(param);
}
static void fix_rpl_semi_sync_slave_enabled(MYSQL_THD thd,
SYS_VAR *var,
void *ptr,
const void *val)
{
*(char *)ptr= *(char *)val;
repl_semisync.setSlaveEnabled(rpl_semi_sync_slave_enabled != 0);
return;
}
static void fix_rpl_semi_sync_trace_level(MYSQL_THD thd,
SYS_VAR *var,
void *ptr,
const void *val)
{
*(unsigned long *)ptr= *(unsigned long *)val;
repl_semisync.setTraceLevel(rpl_semi_sync_slave_trace_level);
return;
}
/* plugin system variables */
static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_slave_enabled,
PLUGIN_VAR_OPCMDARG,
"Enable semi-synchronous replication slave (disabled by default). ",
NULL, // check
&fix_rpl_semi_sync_slave_enabled, // update
0);
static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_slave_trace_level,
PLUGIN_VAR_OPCMDARG,
"The tracing level for semi-sync replication.",
NULL, // check
&fix_rpl_semi_sync_trace_level, // update
32, 0, ~0L, 1);
static SYS_VAR* semi_sync_slave_system_vars[]= {
MYSQL_SYSVAR(enabled),
MYSQL_SYSVAR(trace_level),
NULL,
};
/* plugin status variables */
static SHOW_VAR semi_sync_slave_status_vars[]= {
{"Rpl_semi_sync_slave_status",
(char*) &rpl_semi_sync_slave_status, SHOW_BOOL},
{NULL, NULL, SHOW_BOOL},
};
Binlog_relay_IO_observer relay_io_observer = {
sizeof(Binlog_relay_IO_observer), // len
repl_semi_slave_io_start, // start
repl_semi_slave_io_end, // stop
repl_semi_slave_request_dump, // request_transmit
repl_semi_slave_read_event, // after_read_event
repl_semi_slave_queue_event, // after_queue_event
repl_semi_reset_slave, // reset
};
static int semi_sync_slave_plugin_init(void *p)
{
if (repl_semisync.initObject())
return 1;
if (register_binlog_relay_io_observer(&relay_io_observer, p))
return 1;
return 0;
}
static int semi_sync_slave_plugin_deinit(void *p)
{
if (unregister_binlog_relay_io_observer(&relay_io_observer, p))
return 1;
return 0;
}
struct Mysql_replication semi_sync_slave_plugin= {
MYSQL_REPLICATION_INTERFACE_VERSION
};
/*
Plugin library descriptor
*/
mysql_declare_plugin(semi_sync_slave)
{
MYSQL_REPLICATION_PLUGIN,
&semi_sync_slave_plugin,
"rpl_semi_sync_slave",
"He Zhenxing",
"Semi-synchronous replication slave",
PLUGIN_LICENSE_GPL,
semi_sync_slave_plugin_init, /* Plugin Init */
semi_sync_slave_plugin_deinit, /* Plugin Deinit */
0x0100 /* 1.0 */,
semi_sync_slave_status_vars, /* status variables */
semi_sync_slave_system_vars, /* system variables */
NULL /* config options */
}
mysql_declare_plugin_end;
...@@ -75,6 +75,7 @@ SET (SQL_SOURCE ...@@ -75,6 +75,7 @@ SET (SQL_SOURCE
rpl_rli.cc rpl_mi.cc sql_servers.cc rpl_rli.cc rpl_mi.cc sql_servers.cc
sql_connect.cc scheduler.cc sql_connect.cc scheduler.cc
sql_profile.cc event_parse_data.cc sql_profile.cc event_parse_data.cc
rpl_handler.cc
${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc ${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc
${PROJECT_SOURCE_DIR}/sql/sql_yacc.h ${PROJECT_SOURCE_DIR}/sql/sql_yacc.h
${PROJECT_SOURCE_DIR}/include/mysqld_error.h ${PROJECT_SOURCE_DIR}/include/mysqld_error.h
......
...@@ -76,7 +76,8 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \ ...@@ -76,7 +76,8 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
sql_plugin.h authors.h event_parse_data.h \ sql_plugin.h authors.h event_parse_data.h \
event_data_objects.h event_scheduler.h \ event_data_objects.h event_scheduler.h \
sql_partition.h partition_info.h partition_element.h \ sql_partition.h partition_info.h partition_element.h \
contributors.h sql_servers.h contributors.h sql_servers.h \
rpl_handler.h replication.h
mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
item.cc item_sum.cc item_buff.cc item_func.cc \ item.cc item_sum.cc item_buff.cc item_func.cc \
...@@ -120,7 +121,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ ...@@ -120,7 +121,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
event_queue.cc event_db_repository.cc events.cc \ event_queue.cc event_db_repository.cc events.cc \
sql_plugin.cc sql_binlog.cc \ sql_plugin.cc sql_binlog.cc \
sql_builtin.cc sql_tablespace.cc partition_info.cc \ sql_builtin.cc sql_tablespace.cc partition_info.cc \
sql_servers.cc event_parse_data.cc sql_servers.cc event_parse_data.cc \
rpl_handler.cc
nodist_mysqld_SOURCES = mini_client_errors.c pack.c client.c my_time.c my_user.c nodist_mysqld_SOURCES = mini_client_errors.c pack.c client.c my_time.c my_user.c
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#endif #endif
#include "mysql_priv.h" #include "mysql_priv.h"
#include "rpl_handler.h"
#include "rpl_filter.h" #include "rpl_filter.h"
#include <myisampack.h> #include <myisampack.h>
#include <errno.h> #include <errno.h>
...@@ -221,6 +222,8 @@ handlerton *ha_checktype(THD *thd, enum legacy_db_type database_type, ...@@ -221,6 +222,8 @@ handlerton *ha_checktype(THD *thd, enum legacy_db_type database_type,
return NULL; return NULL;
} }
RUN_HOOK(transaction, after_rollback, (thd, FALSE));
switch (database_type) { switch (database_type) {
#ifndef NO_HASH #ifndef NO_HASH
case DB_TYPE_HASH: case DB_TYPE_HASH:
...@@ -1190,6 +1193,7 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1190,6 +1193,7 @@ int ha_commit_trans(THD *thd, bool all)
if (cookie) if (cookie)
tc_log->unlog(cookie, xid); tc_log->unlog(cookie, xid);
DBUG_EXECUTE_IF("crash_commit_after", abort();); DBUG_EXECUTE_IF("crash_commit_after", abort(););
RUN_HOOK(transaction, after_commit, (thd, FALSE));
end: end:
if (rw_trans) if (rw_trans)
start_waiting_global_read_lock(thd); start_waiting_global_read_lock(thd);
...@@ -1337,6 +1341,7 @@ int ha_rollback_trans(THD *thd, bool all) ...@@ -1337,6 +1341,7 @@ int ha_rollback_trans(THD *thd, bool all)
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN, push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_WARNING_NOT_COMPLETE_ROLLBACK, ER_WARNING_NOT_COMPLETE_ROLLBACK,
ER(ER_WARNING_NOT_COMPLETE_ROLLBACK)); ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
RUN_HOOK(transaction, after_rollback, (thd, FALSE));
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -1371,7 +1376,14 @@ int ha_autocommit_or_rollback(THD *thd, int error) ...@@ -1371,7 +1376,14 @@ int ha_autocommit_or_rollback(THD *thd, int error)
thd->variables.tx_isolation=thd->session_tx_isolation; thd->variables.tx_isolation=thd->session_tx_isolation;
} }
else
#endif #endif
{
if (!error)
RUN_HOOK(transaction, after_commit, (thd, FALSE));
else
RUN_HOOK(transaction, after_rollback, (thd, FALSE));
}
DBUG_RETURN(error); DBUG_RETURN(error);
} }
......
...@@ -38,6 +38,7 @@ ...@@ -38,6 +38,7 @@
#endif #endif
#include <mysql/plugin.h> #include <mysql/plugin.h>
#include "rpl_handler.h"
/* max size of the log message */ /* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024 #define MAX_LOG_BUFFER_SIZE 1024
...@@ -3683,9 +3684,11 @@ bool MYSQL_BIN_LOG::appendv(const char* buf, uint len,...) ...@@ -3683,9 +3684,11 @@ bool MYSQL_BIN_LOG::appendv(const char* buf, uint len,...)
} }
bool MYSQL_BIN_LOG::flush_and_sync() bool MYSQL_BIN_LOG::flush_and_sync(bool *synced)
{ {
int err=0, fd=log_file.file; int err=0, fd=log_file.file;
if (synced)
*synced= 0;
safe_mutex_assert_owner(&LOCK_log); safe_mutex_assert_owner(&LOCK_log);
if (flush_io_cache(&log_file)) if (flush_io_cache(&log_file))
return 1; return 1;
...@@ -3693,6 +3696,8 @@ bool MYSQL_BIN_LOG::flush_and_sync() ...@@ -3693,6 +3696,8 @@ bool MYSQL_BIN_LOG::flush_and_sync()
{ {
sync_binlog_counter= 0; sync_binlog_counter= 0;
err=my_sync(fd, MYF(MY_WME)); err=my_sync(fd, MYF(MY_WME));
if (synced)
*synced= 1;
} }
return err; return err;
} }
...@@ -3983,7 +3988,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, ...@@ -3983,7 +3988,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
if (file == &log_file) if (file == &log_file)
{ {
error= flush_and_sync(); error= flush_and_sync(0);
if (!error) if (!error)
{ {
signal_update(); signal_update();
...@@ -4169,8 +4174,16 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) ...@@ -4169,8 +4174,16 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
if (file == &log_file) // we are writing to the real log (disk) if (file == &log_file) // we are writing to the real log (disk)
{ {
if (flush_and_sync()) bool synced= 0;
if (flush_and_sync(&synced))
goto err; goto err;
if (RUN_HOOK(binlog_storage, after_flush,
(thd, log_file_name, file->pos_in_file, synced))) {
sql_print_error("Failed to run 'after_flush' hooks");
goto err;
}
signal_update(); signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
} }
...@@ -4425,7 +4438,7 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log) ...@@ -4425,7 +4438,7 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log)
DBUG_ASSERT(carry == 0); DBUG_ASSERT(carry == 0);
if (sync_log) if (sync_log)
flush_and_sync(); flush_and_sync(0);
return 0; // All OK return 0; // All OK
} }
...@@ -4472,7 +4485,7 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock) ...@@ -4472,7 +4485,7 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock)
ev.write(&log_file); ev.write(&log_file);
if (lock) if (lock)
{ {
if (!error && !(error= flush_and_sync())) if (!error && !(error= flush_and_sync(0)))
{ {
signal_update(); signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
...@@ -4560,7 +4573,8 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, ...@@ -4560,7 +4573,8 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event,
if (incident && write_incident(thd, FALSE)) if (incident && write_incident(thd, FALSE))
goto err; goto err;
if (flush_and_sync()) bool synced= 0;
if (flush_and_sync(&synced))
goto err; goto err;
DBUG_EXECUTE_IF("half_binlogged_transaction", abort();); DBUG_EXECUTE_IF("half_binlogged_transaction", abort(););
if (cache->error) // Error on read if (cache->error) // Error on read
...@@ -4569,6 +4583,15 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, ...@@ -4569,6 +4583,15 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event,
write_error=1; // Don't give more errors write_error=1; // Don't give more errors
goto err; goto err;
} }
if (RUN_HOOK(binlog_storage, after_flush,
(thd, log_file_name, log_file.pos_in_file, synced)))
{
sql_print_error("Failed to run 'after_flush' hooks");
write_error=1;
goto err;
}
signal_update(); signal_update();
} }
......
...@@ -378,7 +378,21 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG ...@@ -378,7 +378,21 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
bool is_active(const char* log_file_name); bool is_active(const char* log_file_name);
int update_log_index(LOG_INFO* linfo, bool need_update_threads); int update_log_index(LOG_INFO* linfo, bool need_update_threads);
void rotate_and_purge(uint flags); void rotate_and_purge(uint flags);
bool flush_and_sync();
/**
Flush binlog cache and synchronize to disk.
This function flushes events in binlog cache to binary log file,
it will do synchronizing according to the setting of system
variable 'sync_binlog'. If file is synchronized, @c synced will
be set to 1, otherwise 0.
@param[out] synced if not NULL, set to 1 if file is synchronized, otherwise 0
@retval 0 Success
@retval other Failure
*/
bool flush_and_sync(bool *synced);
int purge_logs(const char *to_log, bool included, int purge_logs(const char *to_log, bool included,
bool need_mutex, bool need_update_threads, bool need_mutex, bool need_update_threads,
ulonglong *decrease_log_space); ulonglong *decrease_log_space);
......
...@@ -31,6 +31,8 @@ ...@@ -31,6 +31,8 @@
#include "rpl_injector.h" #include "rpl_injector.h"
#include "rpl_handler.h"
#ifdef HAVE_SYS_PRCTL_H #ifdef HAVE_SYS_PRCTL_H
#include <sys/prctl.h> #include <sys/prctl.h>
#endif #endif
...@@ -1284,6 +1286,7 @@ void clean_up(bool print_message) ...@@ -1284,6 +1286,7 @@ void clean_up(bool print_message)
ha_end(); ha_end();
if (tc_log) if (tc_log)
tc_log->close(); tc_log->close();
delegates_destroy();
xid_cache_free(); xid_cache_free();
delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache); delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache);
multi_keycache_free(); multi_keycache_free();
...@@ -3760,6 +3763,13 @@ static int init_server_components() ...@@ -3760,6 +3763,13 @@ static int init_server_components()
unireg_abort(1); unireg_abort(1);
} }
/* initialize delegates for extension observers */
if (delegates_init())
{
sql_print_error("Initialize extension delegates failed");
unireg_abort(1);
}
/* need to configure logging before initializing storage engines */ /* need to configure logging before initializing storage engines */
if (opt_update_log) if (opt_update_log)
{ {
......
This diff is collapsed.
This diff is collapsed.
/* Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef RPL_HANDLER_H
#define RPL_HANDLER_H
#include "mysql_priv.h"
#include "rpl_mi.h"
#include "rpl_rli.h"
#include "sql_plugin.h"
#include "replication.h"
class Observer_info {
public:
void *observer;
st_plugin_int *plugin_int;
plugin_ref plugin;
Observer_info(void *ob, st_plugin_int *p)
:observer(ob), plugin_int(p)
{
plugin= plugin_int_to_ref(plugin_int);
}
};
class Delegate {
public:
typedef List<Observer_info> Observer_info_list;
typedef List_iterator<Observer_info> Observer_info_iterator;
int add_observer(void *observer, st_plugin_int *plugin)
{
int ret= FALSE;
if (!inited)
return TRUE;
write_lock();
Observer_info_iterator iter(observer_info_list);
Observer_info *info= iter++;
while (info && info->observer != observer)
info= iter++;
if (!info)
{
info= new Observer_info(observer, plugin);
if (!info || observer_info_list.push_back(info, &memroot))
ret= TRUE;
}
else
ret= TRUE;
unlock();
return ret;
}
int remove_observer(void *observer, st_plugin_int *plugin)
{
int ret= FALSE;
if (!inited)
return TRUE;
write_lock();
Observer_info_iterator iter(observer_info_list);
Observer_info *info= iter++;
while (info && info->observer != observer)
info= iter++;
if (info)
iter.remove();
else
ret= TRUE;
unlock();
return ret;
}
inline Observer_info_iterator observer_info_iter()
{
return Observer_info_iterator(observer_info_list);
}
inline bool is_empty()
{
return observer_info_list.is_empty();
}
inline int read_lock()
{
if (!inited)
return TRUE;
return rw_rdlock(&lock);
}
inline int write_lock()
{
if (!inited)
return TRUE;
return rw_wrlock(&lock);
}
inline int unlock()
{
if (!inited)
return TRUE;
return rw_unlock(&lock);
}
inline bool is_inited()
{
return inited;
}
Delegate()
{
inited= FALSE;
if (my_rwlock_init(&lock, NULL))
return;
init_sql_alloc(&memroot, 1024, 0);
inited= TRUE;
}
~Delegate()
{
inited= FALSE;
rwlock_destroy(&lock);
free_root(&memroot, MYF(0));
}
private:
Observer_info_list observer_info_list;
rw_lock_t lock;
MEM_ROOT memroot;
bool inited;
};
class Trans_delegate
:public Delegate {
public:
typedef Trans_observer Observer;
int before_commit(THD *thd, bool all);
int before_rollback(THD *thd, bool all);
int after_commit(THD *thd, bool all);
int after_rollback(THD *thd, bool all);
};
class Binlog_storage_delegate
:public Delegate {
public:
typedef Binlog_storage_observer Observer;
int after_flush(THD *thd, const char *log_file,
my_off_t log_pos, bool synced);
};
#ifdef HAVE_REPLICATION
class Binlog_transmit_delegate
:public Delegate {
public:
typedef Binlog_transmit_observer Observer;
int transmit_start(THD *thd, ushort flags,
const char *log_file, my_off_t log_pos);
int transmit_stop(THD *thd, ushort flags);
int reserve_header(THD *thd, ushort flags, String *packet);
int before_send_event(THD *thd, ushort flags,
String *packet, const
char *log_file, my_off_t log_pos );
int after_send_event(THD *thd, ushort flags,
String *packet);
int after_reset_master(THD *thd, ushort flags);
};
class Binlog_relay_IO_delegate
:public Delegate {
public:
typedef Binlog_relay_IO_observer Observer;
int thread_start(THD *thd, Master_info *mi);
int thread_stop(THD *thd, Master_info *mi);
int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
int after_read_event(THD *thd, Master_info *mi,
const char *packet, ulong len,
const char **event_buf, ulong *event_len);
int after_queue_event(THD *thd, Master_info *mi,
const char *event_buf, ulong event_len,
bool synced);
int after_reset_slave(THD *thd, Master_info *mi);
private:
void init_param(Binlog_relay_IO_param *param, Master_info *mi);
};
#endif /* HAVE_REPLICATION */
int delegates_init();
void delegates_destroy();
extern Trans_delegate *transaction_delegate;
extern Binlog_storage_delegate *binlog_storage_delegate;
#ifdef HAVE_REPLICATION
extern Binlog_transmit_delegate *binlog_transmit_delegate;
extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
#endif /* HAVE_REPLICATION */
/*
if there is no observers in the delegate, we can return 0
immediately.
*/
#define RUN_HOOK(group, hook, args) \
(group ##_delegate->is_empty() ? \
0 : group ##_delegate->hook args)
#endif /* RPL_HANDLER_H */
...@@ -40,6 +40,7 @@ ...@@ -40,6 +40,7 @@
#include <errmsg.h> #include <errmsg.h>
#include <mysqld_error.h> #include <mysqld_error.h>
#include <mysys_err.h> #include <mysys_err.h>
#include "rpl_handler.h"
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
...@@ -69,6 +70,8 @@ ulonglong relay_log_space_limit = 0; ...@@ -69,6 +70,8 @@ ulonglong relay_log_space_limit = 0;
int disconnect_slave_event_count = 0, abort_slave_event_count = 0; int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
int events_till_abort = -1; int events_till_abort = -1;
static pthread_key(Master_info*, RPL_MASTER_INFO);
enum enum_slave_reconnect_actions enum enum_slave_reconnect_actions
{ {
SLAVE_RECON_ACT_REG= 0, SLAVE_RECON_ACT_REG= 0,
...@@ -231,6 +234,10 @@ int init_slave() ...@@ -231,6 +234,10 @@ int init_slave()
TODO: re-write this to interate through the list of files TODO: re-write this to interate through the list of files
for multi-master for multi-master
*/ */
if (pthread_key_create(&RPL_MASTER_INFO, NULL))
goto err;
active_mi= new Master_info; active_mi= new Master_info;
/* /*
...@@ -1868,17 +1875,22 @@ static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed, ...@@ -1868,17 +1875,22 @@ static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
} }
static int request_dump(MYSQL* mysql, Master_info* mi, static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
bool *suppress_warnings) bool *suppress_warnings)
{ {
uchar buf[FN_REFLEN + 10]; uchar buf[FN_REFLEN + 10];
int len; int len;
int binlog_flags = 0; // for now ushort binlog_flags = 0; // for now
char* logname = mi->master_log_name; char* logname = mi->master_log_name;
DBUG_ENTER("request_dump"); DBUG_ENTER("request_dump");
*suppress_warnings= FALSE; *suppress_warnings= FALSE;
if (RUN_HOOK(binlog_relay_io,
before_request_transmit,
(thd, mi, binlog_flags)))
DBUG_RETURN(1);
// TODO if big log files: Change next to int8store() // TODO if big log files: Change next to int8store()
int4store(buf, (ulong) mi->master_log_pos); int4store(buf, (ulong) mi->master_log_pos);
int2store(buf + 4, binlog_flags); int2store(buf + 4, binlog_flags);
...@@ -2532,6 +2544,16 @@ pthread_handler_t handle_slave_io(void *arg) ...@@ -2532,6 +2544,16 @@ pthread_handler_t handle_slave_io(void *arg)
mi->master_log_name, mi->master_log_name,
llstr(mi->master_log_pos,llbuff))); llstr(mi->master_log_pos,llbuff)));
/* This must be called before run any binlog_relay_io hooks */
my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi);
if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
ER(ER_SLAVE_FATAL_ERROR), "Failed to run 'thread_start' hook");
goto err;
}
if (!(mi->mysql = mysql = mysql_init(NULL))) if (!(mi->mysql = mysql = mysql_init(NULL)))
{ {
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
...@@ -2621,7 +2643,7 @@ pthread_handler_t handle_slave_io(void *arg) ...@@ -2621,7 +2643,7 @@ pthread_handler_t handle_slave_io(void *arg)
while (!io_slave_killed(thd,mi)) while (!io_slave_killed(thd,mi))
{ {
thd_proc_info(thd, "Requesting binlog dump"); thd_proc_info(thd, "Requesting binlog dump");
if (request_dump(mysql, mi, &suppress_warnings)) if (request_dump(thd, mysql, mi, &suppress_warnings))
{ {
sql_print_error("Failed on request_dump()"); sql_print_error("Failed on request_dump()");
if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \ if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
...@@ -2641,6 +2663,7 @@ requesting master dump") || ...@@ -2641,6 +2663,7 @@ requesting master dump") ||
goto err; goto err;
goto connected; goto connected;
}); });
const char *event_buf;
DBUG_ASSERT(mi->last_error().number == 0); DBUG_ASSERT(mi->last_error().number == 0);
while (!io_slave_killed(thd,mi)) while (!io_slave_killed(thd,mi))
...@@ -2697,14 +2720,37 @@ Stopping slave I/O thread due to out-of-memory error from master"); ...@@ -2697,14 +2720,37 @@ Stopping slave I/O thread due to out-of-memory error from master");
retry_count=0; // ok event, reset retry counter retry_count=0; // ok event, reset retry counter
thd_proc_info(thd, "Queueing master event to the relay log"); thd_proc_info(thd, "Queueing master event to the relay log");
if (queue_event(mi,(const char*)mysql->net.read_pos + 1, event_buf= (const char*)mysql->net.read_pos + 1;
event_len)) if (RUN_HOOK(binlog_relay_io, after_read_event,
(thd, mi,(const char*)mysql->net.read_pos + 1,
event_len, &event_buf, &event_len)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
ER(ER_SLAVE_FATAL_ERROR),
"Failed to run 'after_read_event' hook");
goto err;
}
/* XXX: 'synced' should be updated by queue_event to indicate
whether event has been synced to disk */
bool synced= 0;
if (queue_event(mi, event_buf, event_len))
{ {
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE), ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"could not queue event from master"); "could not queue event from master");
goto err; goto err;
} }
if (RUN_HOOK(binlog_relay_io, after_queue_event,
(thd, mi, event_buf, event_len, synced)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
ER(ER_SLAVE_FATAL_ERROR),
"Failed to run 'after_queue_event' hook");
goto err;
}
if (flush_master_info(mi, 1)) if (flush_master_info(mi, 1))
{ {
sql_print_error("Failed to flush master info file"); sql_print_error("Failed to flush master info file");
...@@ -2750,6 +2796,7 @@ log space"); ...@@ -2750,6 +2796,7 @@ log space");
// print the current replication position // print the current replication position
sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s", sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
thd->set_query(NULL, 0); thd->set_query(NULL, 0);
thd->reset_db(NULL, 0); thd->reset_db(NULL, 0);
if (mysql) if (mysql)
...@@ -3906,6 +3953,71 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi, ...@@ -3906,6 +3953,71 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
} }
MYSQL *rpl_connect_master(MYSQL *mysql)
{
THD *thd= current_thd;
Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO);
if (!mi)
{
sql_print_error("'rpl_connect_master' must be called in slave I/O thread context.");
return NULL;
}
bool allocated= false;
if (!mysql)
{
if(!(mysql= mysql_init(NULL)))
{
sql_print_error("rpl_connect_master: failed in mysql_init()");
return NULL;
}
allocated= true;
}
/*
XXX: copied from connect_to_master, this function should not
change the slave status, so we cannot use connect_to_master
directly
TODO: make this part a seperate function to eliminate duplication
*/
mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
#ifdef HAVE_OPENSSL
if (mi->ssl)
{
mysql_ssl_set(mysql,
mi->ssl_key[0]?mi->ssl_key:0,
mi->ssl_cert[0]?mi->ssl_cert:0,
mi->ssl_ca[0]?mi->ssl_ca:0,
mi->ssl_capath[0]?mi->ssl_capath:0,
mi->ssl_cipher[0]?mi->ssl_cipher:0);
mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
&mi->ssl_verify_server_cert);
}
#endif
mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
/* This one is not strictly needed but we have it here for completeness */
mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
if (io_slave_killed(thd, mi)
|| !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0))
{
if (!io_slave_killed(thd, mi))
sql_print_error("rpl_connect_master: error connecting to master: %s (server_error: %d)",
mysql_error(mysql), mysql_errno(mysql));
if (allocated)
mysql_close(mysql); // this will free the object
return NULL;
}
return mysql;
}
/* /*
Store the file and position where the execute-slave thread are in the Store the file and position where the execute-slave thread are in the
relay log. relay log.
......
...@@ -277,6 +277,42 @@ const char *set_thd_proc_info(THD *thd, const char *info, ...@@ -277,6 +277,42 @@ const char *set_thd_proc_info(THD *thd, const char *info,
return old_info; return old_info;
} }
extern "C"
const char* thd_enter_cond(MYSQL_THD thd, pthread_cond_t *cond,
pthread_mutex_t *mutex, const char *msg)
{
if (!thd)
thd= current_thd;
const char* old_msg = thd->proc_info;
safe_mutex_assert_owner(mutex);
thd->mysys_var->current_mutex = mutex;
thd->mysys_var->current_cond = cond;
thd->proc_info = msg;
return old_msg;
}
extern "C"
void thd_exit_cond(MYSQL_THD thd, const char *old_msg)
{
if (!thd)
thd= current_thd;
/*
Putting the mutex unlock in thd_exit_cond() ensures that
mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
locked (if that would not be the case, you'll get a deadlock if someone
does a THD::awake() on you).
*/
pthread_mutex_unlock(thd->mysys_var->current_mutex);
pthread_mutex_lock(&thd->mysys_var->mutex);
thd->mysys_var->current_mutex = 0;
thd->mysys_var->current_cond = 0;
thd->proc_info = old_msg;
pthread_mutex_unlock(&thd->mysys_var->mutex);
return;
}
extern "C" extern "C"
void **thd_ha_data(const THD *thd, const struct handlerton *hton) void **thd_ha_data(const THD *thd, const struct handlerton *hton)
{ {
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "log.h" #include "log.h"
#include "rpl_tblmap.h" #include "rpl_tblmap.h"
#include "replication.h"
/** /**
An interface that is used to take an action when An interface that is used to take an action when
...@@ -1940,27 +1941,11 @@ class THD :public Statement, ...@@ -1940,27 +1941,11 @@ class THD :public Statement,
inline const char* enter_cond(pthread_cond_t *cond, pthread_mutex_t* mutex, inline const char* enter_cond(pthread_cond_t *cond, pthread_mutex_t* mutex,
const char* msg) const char* msg)
{ {
const char* old_msg = proc_info; return thd_enter_cond(this, cond, mutex, msg);
safe_mutex_assert_owner(mutex);
mysys_var->current_mutex = mutex;
mysys_var->current_cond = cond;
proc_info = msg;
return old_msg;
} }
inline void exit_cond(const char* old_msg) inline void exit_cond(const char* old_msg)
{ {
/* thd_exit_cond(this, old_msg);
Putting the mutex unlock in exit_cond() ensures that
mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
locked (if that would not be the case, you'll get a deadlock if someone
does a THD::awake() on you).
*/
pthread_mutex_unlock(mysys_var->current_mutex);
pthread_mutex_lock(&mysys_var->mutex);
mysys_var->current_mutex = 0;
mysys_var->current_cond = 0;
proc_info = old_msg;
pthread_mutex_unlock(&mysys_var->mutex);
} }
inline time_t query_start() { query_start_used=1; return start_time; } inline time_t query_start() { query_start_used=1; return start_time; }
inline void set_time() inline void set_time()
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include <m_ctype.h> #include <m_ctype.h>
#include <myisam.h> #include <myisam.h>
#include <my_dir.h> #include <my_dir.h>
#include "rpl_handler.h"
#include "sp_head.h" #include "sp_head.h"
#include "sp.h" #include "sp.h"
......
...@@ -19,14 +19,6 @@ ...@@ -19,14 +19,6 @@
#define REPORT_TO_LOG 1 #define REPORT_TO_LOG 1
#define REPORT_TO_USER 2 #define REPORT_TO_USER 2
#ifdef DBUG_OFF
#define plugin_ref_to_int(A) A
#define plugin_int_to_ref(A) A
#else
#define plugin_ref_to_int(A) (A ? A[0] : NULL)
#define plugin_int_to_ref(A) &(A)
#endif
extern struct st_mysql_plugin *mysqld_builtins[]; extern struct st_mysql_plugin *mysqld_builtins[];
/** /**
...@@ -54,7 +46,8 @@ const LEX_STRING plugin_type_names[MYSQL_MAX_PLUGIN_TYPE_NUM]= ...@@ -54,7 +46,8 @@ const LEX_STRING plugin_type_names[MYSQL_MAX_PLUGIN_TYPE_NUM]=
{ C_STRING_WITH_LEN("STORAGE ENGINE") }, { C_STRING_WITH_LEN("STORAGE ENGINE") },
{ C_STRING_WITH_LEN("FTPARSER") }, { C_STRING_WITH_LEN("FTPARSER") },
{ C_STRING_WITH_LEN("DAEMON") }, { C_STRING_WITH_LEN("DAEMON") },
{ C_STRING_WITH_LEN("INFORMATION SCHEMA") } { C_STRING_WITH_LEN("INFORMATION SCHEMA") },
{ C_STRING_WITH_LEN("REPLICATION") },
}; };
extern int initialize_schema_table(st_plugin_int *plugin); extern int initialize_schema_table(st_plugin_int *plugin);
...@@ -93,7 +86,8 @@ static int min_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]= ...@@ -93,7 +86,8 @@ static int min_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
MYSQL_HANDLERTON_INTERFACE_VERSION, MYSQL_HANDLERTON_INTERFACE_VERSION,
MYSQL_FTPARSER_INTERFACE_VERSION, MYSQL_FTPARSER_INTERFACE_VERSION,
MYSQL_DAEMON_INTERFACE_VERSION, MYSQL_DAEMON_INTERFACE_VERSION,
MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
MYSQL_REPLICATION_INTERFACE_VERSION,
}; };
static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]= static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
{ {
...@@ -101,7 +95,8 @@ static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]= ...@@ -101,7 +95,8 @@ static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
MYSQL_HANDLERTON_INTERFACE_VERSION, MYSQL_HANDLERTON_INTERFACE_VERSION,
MYSQL_FTPARSER_INTERFACE_VERSION, MYSQL_FTPARSER_INTERFACE_VERSION,
MYSQL_DAEMON_INTERFACE_VERSION, MYSQL_DAEMON_INTERFACE_VERSION,
MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
MYSQL_REPLICATION_INTERFACE_VERSION,
}; };
static bool initialized= 0; static bool initialized= 0;
......
...@@ -18,6 +18,14 @@ ...@@ -18,6 +18,14 @@
class sys_var; class sys_var;
#ifdef DBUG_OFF
#define plugin_ref_to_int(A) A
#define plugin_int_to_ref(A) A
#else
#define plugin_ref_to_int(A) (A ? A[0] : NULL)
#define plugin_int_to_ref(A) &(A)
#endif
/* /*
the following flags are valid for plugin_init() the following flags are valid for plugin_init()
*/ */
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment