Commit 2b4183f1 authored by Seppo Jaakola's avatar Seppo Jaakola

bzr merge -r3890..3891 lp:codership-mysql/5.5

parent 9129c8f1
......@@ -50,6 +50,7 @@ IF(WITH_WSREP)
wsrep_sst.cc
wsrep_utils.cc
wsrep_var.cc
wsrep_thd.cc
)
SET(WSREP_LIB wsrep)
ENDIF()
......
......@@ -24,6 +24,7 @@
#include <mysql/psi/mysql_stage.h>
#ifdef WITH_WSREP
#include "wsrep_mysqld.h"
#include "wsrep_thd.h"
extern "C" my_thread_id wsrep_thd_thread_id(THD *thd);
extern "C" char *wsrep_thd_query(THD *thd);
void sql_print_information(const char *format, ...)
......
......@@ -73,6 +73,8 @@
#include "debug_sync.h"
#ifdef WITH_WSREP
#include "wsrep_mysqld.h"
#include "wsrep_thd.h"
#include "wsrep_sst.h"
ulong wsrep_running_threads = 0; // # of currently running wsrep threads
#endif
#include "sql_callback.h"
......@@ -741,7 +743,7 @@ mysql_mutex_t LOCK_wsrep_slave_threads;
mysql_mutex_t LOCK_wsrep_desync;
int wsrep_replaying= 0;
static void wsrep_close_threads(THD* thd);
#endif
#endif /* WITH_WSREP */
/* replication parameters, if master_host is not NULL, we are a slave */
uint report_port= 0;
......@@ -5230,42 +5232,6 @@ pthread_handler_t start_wsrep_THD(void *arg)
return(NULL);
}
void wsrep_create_rollbacker()
{
if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
{
pthread_t hThread;
/* create rollbacker */
if (pthread_create( &hThread, &connection_attrib,
start_wsrep_THD, (void*)wsrep_rollback_process))
WSREP_WARN("Can't create thread to manage wsrep rollback");
}
}
void wsrep_create_appliers(long threads)
{
if (!wsrep_connected)
{
/* see wsrep_replication_start() for the logic */
if (wsrep_cluster_address && strlen(wsrep_cluster_address) &&
wsrep_provider && strcasecmp(wsrep_provider, "none"))
{
WSREP_ERROR("Trying to launch slave threads before creating "
"connection at '%s'", wsrep_cluster_address);
assert(0);
}
return;
}
long wsrep_threads=0;
pthread_t hThread;
while (wsrep_threads++ < threads) {
if (pthread_create(
&hThread, &connection_attrib,
start_wsrep_THD, (void*)wsrep_replication_process))
WSREP_WARN("Can't create thread to manage wsrep replication");
}
}
/**/
static bool abort_replicated(THD *thd)
{
......
......@@ -240,7 +240,7 @@ extern PSI_mutex_key key_PAGE_lock, key_LOCK_sync, key_LOCK_active,
#ifdef WITH_WSREP
extern PSI_mutex_key key_LOCK_wsrep_thd;
extern PSI_cond_key key_COND_wsrep_thd;
#endif /* HAVE_MMAP */
#endif /* HAVE_WSREP */
#ifdef HAVE_OPENSSL
extern PSI_mutex_key key_LOCK_des_key_file;
......@@ -580,8 +580,8 @@ enum options_mysqld
OPT_WSREP_START_POSITION,
OPT_WSREP_SST_AUTH,
OPT_WSREP_RECOVER,
#endif
OPT_which_is_always_the_last
#endif /* WITH_WSREP */
};
#endif
......@@ -724,5 +724,9 @@ extern uint internal_tmp_table_max_key_segments;
extern uint volatile global_disable_checkpoint;
extern my_bool opt_help;
#ifdef WITH_WSREP
#include "my_pthread.h"
pthread_handler_t start_wsrep_THD(void*);
#endif /* WITH_WSREP */
#endif /* MYSQLD_INCLUDED */
......@@ -63,7 +63,7 @@
#ifdef WITH_WSREP
#include "wsrep_mysqld.h"
#include "wsrep_thd.h"
#endif // WITH_WSREP
bool
......
......@@ -105,7 +105,7 @@
#ifdef WITH_WSREP
#include "wsrep_mysqld.h"
static void wsrep_client_rollback(THD *thd);
#include "wsrep_thd.h"
static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
Parser_state *parser_state);
#endif /* WITH_WSREP */
......@@ -6571,90 +6571,6 @@ void mysql_init_multi_delete(LEX *lex)
}
#ifdef WITH_WSREP
static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow*);
static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow*);
void wsrep_replay_transaction(THD *thd)
{
/* checking if BF trx must be replayed */
if (thd->wsrep_conflict_state== MUST_REPLAY)
{
if (thd->wsrep_exec_mode!= REPL_RECV)
{
if (thd->get_stmt_da()->is_sent())
{
WSREP_ERROR("replay issue, thd has reported status already");
}
thd->get_stmt_da()->reset_diagnostics_area();
thd->wsrep_conflict_state= REPLAYING;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
mysql_reset_thd_for_next_command(thd, opt_userstat_running);
thd->killed= NOT_KILLED;
close_thread_tables(thd);
if (thd->locked_tables_mode && thd->lock)
{
WSREP_DEBUG("releasing table lock for replaying (%ld)",
thd->thread_id);
thd->locked_tables_list.unlock_locked_tables(thd);
thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
}
thd->mdl_context.release_transactional_locks();
thd_proc_info(thd, "wsrep replaying trx");
WSREP_DEBUG("replay trx: %s %lld",
thd->query() ? thd->query() : "void",
(long long)wsrep_thd_trx_seqno(thd));
struct wsrep_thd_shadow shadow;
wsrep_prepare_bf_thd(thd, &shadow);
int rcode = wsrep->replay_trx(wsrep,
&thd->wsrep_ws_handle,
(void *)thd);
wsrep_return_from_bf_mode(thd, &shadow);
if (thd->wsrep_conflict_state!= REPLAYING)
WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state );
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
switch (rcode)
{
case WSREP_OK:
thd->wsrep_conflict_state= NO_CONFLICT;
wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
WSREP_DEBUG("trx_replay successful for: %ld %llu",
thd->thread_id, (long long)thd->real_id);
break;
case WSREP_TRX_FAIL:
if (thd->stmt_da->is_sent)
{
WSREP_ERROR("replay failed, thd has reported status");
}
else
{
WSREP_DEBUG("replay failed, rolling back");
my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
}
thd->wsrep_conflict_state= ABORTED;
wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle);
break;
default:
WSREP_ERROR("trx_replay failed for: %d, query: %s",
rcode, thd->query() ? thd->query() : "void");
/* we're now in inconsistent state, must abort */
unireg_abort(1);
break;
}
mysql_mutex_lock(&LOCK_wsrep_replaying);
wsrep_replaying--;
WSREP_DEBUG("replaying decreased: %d, thd: %lu",
wsrep_replaying, thd->thread_id);
mysql_cond_broadcast(&COND_wsrep_replaying);
mysql_mutex_unlock(&LOCK_wsrep_replaying);
}
}
}
static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
Parser_state *parser_state)
{
......@@ -8597,611 +8513,6 @@ LEX_USER *create_definer(THD *thd, LEX_STRING *user_name, LEX_STRING *host_name)
return definer;
}
#ifdef WITH_WSREP
/* must have (&thd->LOCK_wsrep_thd) */
static void wsrep_client_rollback(THD *thd)
{
WSREP_DEBUG("client rollback due to BF abort for (%ld), query: %s",
thd->thread_id, thd->query());
thd->wsrep_conflict_state= ABORTING;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
trans_rollback(thd);
if (thd->locked_tables_mode && thd->lock)
{
WSREP_DEBUG("unlocking tables for BF abort (%ld)", thd->thread_id);
thd->locked_tables_list.unlock_locked_tables(thd);
thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
}
if (thd->global_read_lock.is_acquired())
{
WSREP_DEBUG("unlocking GRL for BF abort (%ld)", thd->thread_id);
thd->global_read_lock.unlock_global_read_lock(thd);
}
/* Release transactional metadata locks. */
thd->mdl_context.release_transactional_locks();
/* release explicit MDL locks */
thd->mdl_context.release_explicit_locks();
if (thd->get_binlog_table_maps())
{
WSREP_DEBUG("clearing binlog table map for BF abort (%ld)", thd->thread_id);
thd->clear_binlog_table_maps();
}
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
thd->wsrep_conflict_state= ABORTED;
thd->wsrep_bf_thd = NULL;
}
static enum wsrep_status wsrep_apply_sql(
THD *thd, const char *sql, size_t sql_len, time_t timeval, uint32 randseed)
{
int error;
enum wsrep_status ret_code= WSREP_OK;
DBUG_ENTER("wsrep_bf_execute_cb");
thd->wsrep_exec_mode= REPL_RECV;
thd->net.vio= 0;
thd->start_time= timeval;
thd->wsrep_rand= randseed;
thd->variables.option_bits |= OPTION_NOT_AUTOCOMMIT;
DBUG_PRINT("wsrep", ("SQL: %s", sql));
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
thd->wsrep_query_state= QUERY_EXEC;
/* preserve replaying mode */
if (thd->wsrep_conflict_state!= REPLAYING)
thd->wsrep_conflict_state= NO_CONFLICT;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
if ((error= dispatch_command(COM_QUERY, thd, (char*)sql, sql_len))) {
WSREP_WARN("BF SQL apply failed: %d, %lld",
thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno);
DBUG_RETURN(WSREP_FATAL);
}
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
if (thd->wsrep_conflict_state!= NO_CONFLICT &&
thd->wsrep_conflict_state!= REPLAYING) {
ret_code= WSREP_FATAL;
WSREP_DEBUG("BF thd ending, with: %d, %lld",
thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno);
}
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
assert(thd->wsrep_exec_mode== REPL_RECV);
DBUG_RETURN(ret_code);
}
void wsrep_write_rbr_buf(
THD *thd, const void* rbr_buf, size_t buf_len)
{
char filename[PATH_MAX]= {0};
int len= snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld.log",
wsrep_data_home_dir, thd->thread_id,
(long long)thd->wsrep_trx_seqno);
if (len >= PATH_MAX)
{
WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
return;
}
FILE *of= fopen(filename, "wb");
if (of)
{
fwrite (rbr_buf, buf_len, 1, of);
fclose(of);
}
else
{
WSREP_ERROR("Failed to open file '%s': %d (%s)",
filename, errno, strerror(errno));
}
}
static inline wsrep_status_t wsrep_apply_rbr(
THD *thd, const uchar *rbr_buf, size_t buf_len)
{
char *buf= (char *)rbr_buf;
int rcode= 0;
int event= 1;
Format_description_log_event *description_event = wsrep_format_desc;
DBUG_ENTER("wsrep_apply_rbr");
if (thd->killed == KILL_CONNECTION)
{
WSREP_INFO("applier has been aborted, skipping apply_rbr: %lld",
(long long) thd->wsrep_trx_seqno);
DBUG_RETURN(WSREP_FATAL);
}
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
thd->wsrep_query_state= QUERY_EXEC;
if (thd->wsrep_conflict_state!= REPLAYING)
thd->wsrep_conflict_state= NO_CONFLICT;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld",
(long long) thd->wsrep_trx_seqno);
if ((rcode= trans_begin(thd)))
WSREP_WARN("begin for rbr apply failed: %lld, code: %d",
(long long) thd->wsrep_trx_seqno, rcode);
while(buf_len)
{
int exec_res;
int error = 0;
Log_event* ev= wsrep_read_log_event(&buf, &buf_len, description_event);
if (!ev)
{
WSREP_ERROR("applier could not read binlog event, seqno: %lld, len: %ld",
(long long)thd->wsrep_trx_seqno, buf_len);
rcode= 1;
goto error;
}
switch (ev->get_type_code()) {
case WRITE_ROWS_EVENT:
case UPDATE_ROWS_EVENT:
case DELETE_ROWS_EVENT:
DBUG_ASSERT(buf_len != 0 ||
((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F));
break;
case FORMAT_DESCRIPTION_EVENT:
description_event = (Format_description_log_event *)ev;
break;
default:
break;
}
thd->variables.server_id = ev->server_id; // use the original server id for logging
thd->set_time(); // time the query
wsrep_xid_init(&thd->transaction.xid_state.xid,
wsrep_cluster_uuid(),
thd->wsrep_trx_seqno);
thd->lex->current_select= 0;
if (!ev->when)
ev->when = time(NULL);
ev->thd = thd;
exec_res = ev->apply_event(thd->wsrep_rli);
DBUG_PRINT("info", ("exec_event result: %d", exec_res));
if (exec_res)
{
WSREP_WARN("RBR event %d %s apply warning: %d, %lld",
event, ev->get_type_str(), exec_res, (long long) thd->wsrep_trx_seqno);
rcode= exec_res;
/* stop processing for the first error */
delete ev;
goto error;
}
event++;
if (thd->wsrep_conflict_state!= NO_CONFLICT &&
thd->wsrep_conflict_state!= REPLAYING)
WSREP_WARN("conflict state after RBR event applying: %d, %lld",
thd->wsrep_query_state, (long long)thd->wsrep_trx_seqno);
if (thd->wsrep_conflict_state == MUST_ABORT) {
WSREP_WARN("RBR event apply failed, rolling back: %lld",
(long long) thd->wsrep_trx_seqno);
trans_rollback(thd);
thd->locked_tables_list.unlock_locked_tables(thd);
/* Release transactional metadata locks. */
thd->mdl_context.release_transactional_locks();
thd->wsrep_conflict_state= NO_CONFLICT;
DBUG_RETURN(WSREP_FATAL);
}
if ((ev->get_type_code() == WRITE_ROWS_EVENT ||
ev->get_type_code() == UPDATE_ROWS_EVENT ||
ev->get_type_code() == DELETE_ROWS_EVENT) &&
((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F))
{
thd->wsrep_rli->cleanup_context(thd, 0);
if (error == 0)
{
thd->clear_error();
}
else
WSREP_ERROR("Error in %s event: commit of row events failed: %lld",
ev->get_type_str(), (long long)thd->wsrep_trx_seqno);
}
if (description_event != ev)
delete ev;
}
error:
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
thd->wsrep_query_state= QUERY_IDLE;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
assert(thd->wsrep_exec_mode== REPL_RECV);
if (thd->killed == KILL_CONNECTION)
WSREP_INFO("applier aborted: %lld", (long long)thd->wsrep_trx_seqno);
if (rcode) DBUG_RETURN(WSREP_FATAL);
DBUG_RETURN(WSREP_OK);
}
wsrep_status_t wsrep_apply_cb(void* const ctx,
const void* const buf, size_t const buf_len,
wsrep_seqno_t const global_seqno)
{
THD* const thd((THD*)ctx);
thd->wsrep_trx_seqno= global_seqno;
#ifdef WSREP_PROC_INFO
snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"applying write set %lld: %p, %zu",
(long long)thd->wsrep_trx_seqno, buf, buf_len);
thd_proc_info(thd, thd->wsrep_info);
#else
thd_proc_info(thd, "applying write set");
#endif /* WSREP_PROC_INFO */
wsrep_status_t const rcode(wsrep_apply_rbr(thd, (const uchar*)buf, buf_len));
#ifdef WSREP_PROC_INFO
snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"applied write set %lld", (long long)thd->wsrep_trx_seqno);
thd_proc_info(thd, thd->wsrep_info);
#else
thd_proc_info(thd, "applied write set");
#endif /* WSREP_PROC_INFO */
if (WSREP_OK != rcode) wsrep_write_rbr_buf(thd, buf, buf_len);
TABLE *tmp;
while ((tmp = thd->temporary_tables))
{
WSREP_DEBUG("Applier %lu, has temporary tables: %s.%s",
thd->thread_id,
(tmp->s) ? tmp->s->db.str : "void",
(tmp->s) ? tmp->s->table_name.str : "void");
close_temporary_table(thd, tmp, 1, 1);
}
return rcode;
}
#if DELETE // this does not work in 5.5
/* a common wrapper for end_trans() function - to put all necessary stuff */
static inline wsrep_status_t
wsrep_end_trans (THD* const thd, enum enum_mysql_completiontype const end)
{
if (0 == end_trans(thd, end))
{
return WSREP_OK;
}
else
{
return WSREP_FATAL;
}
}
#endif
wsrep_status_t wsrep_commit(THD* const thd, wsrep_seqno_t const global_seqno)
{
#ifdef WSREP_PROC_INFO
snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"committing %lld", (long long)thd->wsrep_trx_seqno);
thd_proc_info(thd, thd->wsrep_info);
#else
thd_proc_info(thd, "committing");
#endif /* WSREP_PROC_INFO */
wsrep_status_t const rcode(wsrep_apply_sql(thd, "COMMIT", 6, 0, 0));
// wsrep_status_t const rcode(wsrep_end_trans (thd, COMMIT));
#ifdef WSREP_PROC_INFO
snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"committed %lld", (long long)thd->wsrep_trx_seqno);
thd_proc_info(thd, thd->wsrep_info);
#else
thd_proc_info(thd, "committed");
#endif /* WSREP_PROC_INFO */
if (WSREP_OK == rcode)
{
// TODO: mark snapshot with global_seqno.
}
return rcode;
}
wsrep_status_t wsrep_rollback(THD* const thd, wsrep_seqno_t const global_seqno)
{
#ifdef WSREP_PROC_INFO
snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"rolling back %lld", (long long)thd->wsrep_trx_seqno);
thd_proc_info(thd, thd->wsrep_info);
#else
thd_proc_info(thd, "rolling back");
#endif /* WSREP_PROC_INFO */
wsrep_status_t const rcode(wsrep_apply_sql(thd, "ROLLBACK", 8, 0, 0));
// wsrep_status_t const rcode(wsrep_end_trans (thd, ROLLBACK));
#ifdef WSREP_PROC_INFO
snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"rolled back %lld", (long long)thd->wsrep_trx_seqno);
thd_proc_info(thd, thd->wsrep_info);
#else
thd_proc_info(thd, "rolled back");
#endif /* WSREP_PROC_INFO */
return rcode;
}
wsrep_status_t wsrep_commit_cb(void* const ctx,
wsrep_seqno_t const global_seqno,
bool const commit)
{
THD* const thd((THD*)ctx);
assert(global_seqno == thd->wsrep_trx_seqno);
if (commit)
return wsrep_commit(thd, global_seqno);
else
return wsrep_rollback(thd, global_seqno);
}
Relay_log_info* wsrep_relay_log_init(const char* log_fname)
{
Relay_log_info* rli= new Relay_log_info(false);
LEX_STRING conn = {"wsrep",5};
/*
* problem is that mariaDB requires master info for rli, and wsrep replication
* really should not have it. Allocating empty mi here just for the sake of
* getting rpl_filter pointer initialized for mi, rpl_filter will be needed in
* several places
*/
rli->mi= new Master_info(&conn, false);
rli->no_storage= true;
if (!rli->relay_log.description_event_for_exec)
{
rli->relay_log.description_event_for_exec=
new Format_description_log_event(4);
}
rli->sql_thd= current_thd;
return rli;
}
static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow)
{
shadow->options = thd->variables.option_bits;
shadow->wsrep_exec_mode = thd->wsrep_exec_mode;
shadow->vio = thd->net.vio;
if (opt_log_slave_updates)
thd->variables.option_bits|= OPTION_BIN_LOG;
else
thd->variables.option_bits&= ~(OPTION_BIN_LOG);
if (!thd->wsrep_rli) thd->wsrep_rli= wsrep_relay_log_init("wsrep_relay");
thd->wsrep_exec_mode= REPL_RECV;
thd->net.vio= 0;
thd->clear_error();
thd->variables.option_bits|= OPTION_NOT_AUTOCOMMIT;
shadow->tx_isolation = thd->variables.tx_isolation;
thd->variables.tx_isolation = ISO_READ_COMMITTED;
thd->tx_isolation = ISO_READ_COMMITTED;
}
static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
{
thd->variables.option_bits = shadow->options;
thd->wsrep_exec_mode = shadow->wsrep_exec_mode;
thd->net.vio = shadow->vio;
thd->variables.tx_isolation = shadow->tx_isolation;
}
void wsrep_replication_process(THD *thd)
{
int rcode;
DBUG_ENTER("wsrep_replication_process");
struct wsrep_thd_shadow shadow;
wsrep_prepare_bf_thd(thd, &shadow);
rcode = wsrep->recv(wsrep, (void *)thd);
DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode));
WSREP_INFO("applier thread exiting (code:%d)", rcode);
switch (rcode) {
case WSREP_OK:
case WSREP_NOT_IMPLEMENTED:
case WSREP_CONN_FAIL:
/* provider does not support slave operations / disconnected from group,
* just close applier thread */
break;
case WSREP_NODE_FAIL:
/* data inconsistency => SST is needed */
/* Note: we cannot just blindly restart replication here,
* SST might require server restart if storage engines must be
* initialized after SST */
WSREP_ERROR("node consistency compromised, aborting");
wsrep_kill_mysql(thd);
break;
case WSREP_WARNING:
case WSREP_TRX_FAIL:
case WSREP_TRX_MISSING:
/* these suggests a bug in provider code */
WSREP_WARN("bad return from recv() call: %d", rcode);
/* fall through to node shutdown */
case WSREP_FATAL:
/* Cluster connectivity is lost.
*
* If applier was killed on purpose (KILL_CONNECTION), we
* avoid mysql shutdown. This is because the killer will then handle
* shutdown processing (or replication restarting)
*/
if (thd->killed != KILL_CONNECTION)
{
wsrep_kill_mysql(thd);
}
break;
}
mysql_mutex_lock(&LOCK_thread_count);
wsrep_close_applier(thd);
mysql_cond_broadcast(&COND_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
if (thd->temporary_tables)
{
WSREP_DEBUG("Applier %lu, has temporary tables at exit", thd->thread_id);
}
wsrep_return_from_bf_mode(thd, &shadow);
DBUG_VOID_RETURN;
}
void wsrep_rollback_process(THD *thd)
{
DBUG_ENTER("wsrep_rollback_process");
mysql_mutex_lock(&LOCK_wsrep_rollback);
wsrep_aborting_thd= NULL;
while (thd->killed == NOT_KILLED) {
thd_proc_info(thd, "wsrep aborter idle");
thd->mysys_var->current_mutex= &LOCK_wsrep_rollback;
thd->mysys_var->current_cond= &COND_wsrep_rollback;
mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback);
WSREP_DEBUG("WSREP rollback thread wakes for signal");
mysql_mutex_lock(&thd->mysys_var->mutex);
thd_proc_info(thd, "wsrep aborter active");
thd->mysys_var->current_mutex= 0;
thd->mysys_var->current_cond= 0;
mysql_mutex_unlock(&thd->mysys_var->mutex);
/* check for false alarms */
if (!wsrep_aborting_thd)
{
WSREP_DEBUG("WSREP rollback thread has empty abort queue");
}
/* process all entries in the queue */
while (wsrep_aborting_thd) {
THD *aborting;
wsrep_aborting_thd_t next = wsrep_aborting_thd->next;
aborting = wsrep_aborting_thd->aborting_thd;
my_free(wsrep_aborting_thd);
wsrep_aborting_thd= next;
/*
* must release mutex, appliers my want to add more
* aborting thds in our work queue, while we rollback
*/
mysql_mutex_unlock(&LOCK_wsrep_rollback);
mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
if (aborting->wsrep_conflict_state== ABORTED)
{
WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d",
(long long)aborting->real_id,
aborting->wsrep_conflict_state);
mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
mysql_mutex_lock(&LOCK_wsrep_rollback);
continue;
}
aborting->wsrep_conflict_state= ABORTING;
mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
aborting->store_globals();
mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
wsrep_client_rollback(aborting);
WSREP_DEBUG("WSREP rollbacker aborted thd: (%lu %llu)",
aborting->thread_id, (long long)aborting->real_id);
mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
mysql_mutex_lock(&LOCK_wsrep_rollback);
}
}
mysql_mutex_unlock(&LOCK_wsrep_rollback);
sql_print_information("WSREP: rollbacker thread exiting");
DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
DBUG_VOID_RETURN;
}
extern
int wsrep_thd_is_brute_force(void *thd_ptr)
{
if (thd_ptr) {
switch (((THD *)thd_ptr)->wsrep_exec_mode) {
case LOCAL_STATE:
{
if (((THD *)thd_ptr)->wsrep_conflict_state== REPLAYING)
{
return 1;
}
return 0;
}
case REPL_RECV: return 1;
case TOTAL_ORDER: return 2;
case LOCAL_COMMIT: return 3;
}
}
return 0;
}
extern "C"
int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
{
THD *victim_thd = (THD *) victim_thd_ptr;
THD *bf_thd = (THD *) bf_thd_ptr;
DBUG_ENTER("wsrep_abort_thd");
if ( (WSREP(bf_thd) ||
( (WSREP_ON || wsrep_OSU_method_options == WSREP_OSU_RSU) &&
bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) &&
victim_thd)
{
WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
(long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
ha_wsrep_abort_transaction(bf_thd, victim_thd, signal);
}
else
{
WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd);
}
DBUG_RETURN(1);
}
extern "C"
int wsrep_thd_in_locking_session(void *thd_ptr)
{
if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) {
return 1;
}
return 0;
}
#endif
/**
Retuns information about user or current user.
......
......@@ -4063,7 +4063,8 @@ static Sys_var_tz Sys_time_zone(
DEFAULT(&default_tz), NO_MUTEX_GUARD, IN_BINLOG);
#ifdef WITH_WSREP
#include "wsrep_mysqld.h"
#include "wsrep_var.h"
#include "wsrep_sst.h"
static Sys_var_charptr Sys_wsrep_provider(
"wsrep_provider", "Path to replication provider library",
......@@ -4245,7 +4246,6 @@ static Sys_var_mybool Sys_wsrep_causal_reads(
"wsrep_causal_reads", "Enable \"strictly synchronous\" semantics for read operations",
SESSION_VAR(wsrep_causal_reads),
CMD_LINE(OPT_ARG), DEFAULT(FALSE));
// ON_UPDATE(wsrep_causal_reads_update));
static const char *wsrep_OSU_method_names[]= { "TOI", "RSU", NullS };
static Sys_var_enum Sys_wsrep_OSU_method(
......
......@@ -17,6 +17,10 @@
#include <sql_class.h>
#include <sql_parse.h>
#include "wsrep_priv.h"
#include "wsrep_thd.h"
#include "wsrep_sst.h"
#include "wsrep_utils.h"
#include "wsrep_var.h"
#include <cstdio>
#include <cstdlib>
#include "log_event.h"
......@@ -88,7 +92,6 @@ const char* wsrep_provider_version = provider_version;
const char* wsrep_provider_vendor = provider_vendor;
/* End wsrep status variables */
wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED;
wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED;
wsp::node_status local_status;
......@@ -99,14 +102,7 @@ long wsrep_protocol_version = 2;
// if there was no state gap on receiving first view event.
static my_bool wsrep_startup = TRUE;
// action execute callback
extern wsrep_status_t wsrep_apply_cb(void *ctx,
const void* buf, size_t buf_len,
wsrep_seqno_t global_seqno);
extern wsrep_status_t wsrep_commit_cb (void *ctx,
wsrep_seqno_t global_seqno,
bool commit);
/* wsrep callbacks */
static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
switch (level) {
......
......@@ -79,11 +79,6 @@ extern ulong wsrep_retry_autocommit;
extern my_bool wsrep_auto_increment_control;
extern my_bool wsrep_drupal_282555_workaround;
extern my_bool wsrep_incremental_data_collection;
extern const char* wsrep_sst_method;
extern const char* wsrep_sst_receive_address;
extern char* wsrep_sst_auth;
extern const char* wsrep_sst_donor;
extern my_bool wsrep_sst_donor_rejects_queries;
extern const char* wsrep_start_position;
extern long long wsrep_max_ws_size;
extern long wsrep_max_ws_rows;
......@@ -117,71 +112,21 @@ extern const char* wsrep_provider_vendor;
extern int wsrep_show_status(THD *thd, SHOW_VAR *var, char *buff);
extern void wsrep_free_status(THD *thd);
#define WSREP_SST_ADDRESS_AUTO "AUTO"
#define WSREP_NODE_INCOMING_AUTO "AUTO"
// MySQL variables funcs
#define CHECK_ARGS (sys_var *self, THD* thd, set_var *var)
#define UPDATE_ARGS (sys_var *self, THD* thd, enum_var_type type)
#define DEFAULT_ARGS (THD* thd, enum_var_type var_type)
#define INIT_ARGS (const char* opt)
extern int wsrep_init_vars();
extern void wsrep_provider_init (const char* provider);
extern void wsrep_start_position_init (const char* position);
extern void wsrep_sst_auth_init (const char* auth);
extern bool wsrep_on_update UPDATE_ARGS;
extern void wsrep_causal_reads_update UPDATE_ARGS;
extern bool wsrep_start_position_check CHECK_ARGS;
extern bool wsrep_start_position_update UPDATE_ARGS;
extern void wsrep_start_position_init INIT_ARGS;
extern bool wsrep_provider_check CHECK_ARGS;
extern bool wsrep_provider_update UPDATE_ARGS;
extern void wsrep_provider_init INIT_ARGS;
extern bool wsrep_provider_options_check CHECK_ARGS;
extern bool wsrep_provider_options_update UPDATE_ARGS;
extern void wsrep_provider_options_init INIT_ARGS;
extern bool wsrep_cluster_address_check CHECK_ARGS;
extern bool wsrep_cluster_address_update UPDATE_ARGS;
extern void wsrep_cluster_address_init INIT_ARGS;
extern bool wsrep_cluster_name_check CHECK_ARGS;
extern bool wsrep_cluster_name_update UPDATE_ARGS;
extern bool wsrep_node_name_check CHECK_ARGS;
extern bool wsrep_node_name_update UPDATE_ARGS;
extern bool wsrep_node_address_check CHECK_ARGS;
extern bool wsrep_node_address_update UPDATE_ARGS;
extern void wsrep_node_address_init INIT_ARGS;
extern bool wsrep_sst_method_check CHECK_ARGS;
extern bool wsrep_sst_method_update UPDATE_ARGS;
extern void wsrep_sst_method_init INIT_ARGS;
extern bool wsrep_sst_receive_address_check CHECK_ARGS;
extern bool wsrep_sst_receive_address_update UPDATE_ARGS;
extern bool wsrep_sst_auth_check CHECK_ARGS;
extern bool wsrep_sst_auth_update UPDATE_ARGS;
extern void wsrep_sst_auth_init INIT_ARGS;
extern bool wsrep_sst_donor_check CHECK_ARGS;
extern bool wsrep_sst_donor_update UPDATE_ARGS;
extern bool wsrep_slave_threads_check CHECK_ARGS;
extern bool wsrep_slave_threads_update UPDATE_ARGS;
extern bool wsrep_desync_check CHECK_ARGS;
extern bool wsrep_desync_update UPDATE_ARGS;
extern bool wsrep_before_SE(); // initialize wsrep before storage
// engines (true) or after (false)
extern int wsrep_init();
extern void wsrep_deinit();
extern void wsrep_recover();
extern bool wsrep_before_SE(); // initialize wsrep before storage
// engines (true) or after (false)
/* wsrep initialization sequence at startup
* @param before wsrep_before_SE() value */
extern void wsrep_init_startup(bool before);
......@@ -215,17 +160,11 @@ extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal);
/* wsrep initialization sequence at startup
* @param first wsrep_before_SE() value */
extern void wsrep_init_startup(bool before);
extern void wsrep_close_client_connections(my_bool wait_to_end);
extern int wsrep_wait_committing_connections_close(int wait_time);
extern void wsrep_close_applier(THD *thd);
extern void wsrep_wait_appliers_close(THD *thd);
extern void wsrep_close_applier_threads(int count);
extern void wsrep_create_appliers(long threads = wsrep_slave_threads);
extern void wsrep_create_rollbacker();
extern void wsrep_kill_mysql(THD *thd);
/* new defines */
......@@ -286,18 +225,6 @@ extern wsrep_seqno_t wsrep_locked_seqno;
if (victim_thd) WSREP_LOG_CONFLICT_THD(victim_thd, "Victim thread"); \
}
/*! Synchronizes applier thread start with init thread */
extern void wsrep_sst_grab();
/*! Init thread waits for SST completion */
extern bool wsrep_sst_wait();
/*! Signals wsrep that initialization is complete, writesets can be applied */
extern void wsrep_sst_continue();
extern void wsrep_SE_init_grab(); /*! grab init critical section */
extern void wsrep_SE_init_wait(); /*! wait for SE init to complete */
extern void wsrep_SE_init_done(); /*! signal that SE init is complte */
extern void wsrep_SE_initialized(); /*! mark SE initialization complete */
extern void wsrep_ready_wait();
enum wsrep_trx_status {
......@@ -311,17 +238,10 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all);
class Ha_trx_info;
struct THD_TRANS;
void wsrep_register_hton(THD* thd, bool all);
void wsrep_replication_process(THD *thd);
void wsrep_rollback_process(THD *thd);
void wsrep_brute_force_killer(THD *thd);
int wsrep_hire_brute_force_killer(THD *thd, uint64_t trx_id);
extern "C" bool wsrep_consistency_check(void *thd_ptr);
//extern "C" int wsrep_thd_is_brute_force(void *thd_ptr);
extern int wsrep_thd_is_brute_force(void *thd_ptr);
extern "C" int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr,
my_bool signal);
extern "C" int wsrep_thd_in_locking_session(void *thd_ptr);
/* this is visible for client build so that innodb plugin gets this */
typedef struct wsrep_aborting_thd {
......
......@@ -15,6 +15,7 @@
#include <mysqld.h>
#include "wsrep_priv.h"
#include "wsrep_utils.h"
const char* wsrep_notify_cmd="";
......@@ -64,7 +65,7 @@ void wsrep_notify_status (wsrep_member_status_t status,
{
char uuid_str[40];
wsrep_uuid_print (&view->uuid, uuid_str, sizeof(uuid_str));
wsrep_uuid_print (&view->state_id.uuid, uuid_str, sizeof(uuid_str));
cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
" --uuid %s", uuid_str);
......
/* Copyright 2010 Codership Oy <http://www.codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <mysqld.h>
#include "wsrep_priv.h"
const char* wsrep_notify_cmd="";
static const char* _status_str(wsrep_member_status_t status)
{
switch (status)
{
case WSREP_MEMBER_UNDEFINED: return "Undefined";
case WSREP_MEMBER_JOINER: return "Joiner";
case WSREP_MEMBER_DONOR: return "Donor";
case WSREP_MEMBER_JOINED: return "Joined";
case WSREP_MEMBER_SYNCED: return "Synced";
default: return "Error(?)";
}
}
void wsrep_notify_status (wsrep_member_status_t status,
const wsrep_view_info_t* view)
{
if (!wsrep_notify_cmd || 0 == strlen(wsrep_notify_cmd))
{
WSREP_INFO("wsrep_notify_cmd is not defined, skipping notification.");
return;
}
char cmd_buf[1 << 16]; // this can be long
long cmd_len = sizeof(cmd_buf) - 1;
char* cmd_ptr = cmd_buf;
long cmd_off = 0;
cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, "%s",
wsrep_notify_cmd);
if (status >= WSREP_MEMBER_UNDEFINED && status < WSREP_MEMBER_ERROR)
{
cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, " --status %s",
_status_str(status));
}
else
{
/* here we preserve provider error codes */
cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
" --status 'Error(%d)'", status);
}
if (0 != view)
{
char uuid_str[40];
wsrep_uuid_print (&view->uuid, uuid_str, sizeof(uuid_str));
cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
" --uuid %s", uuid_str);
cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
" --primary %s", view->view >= 0 ? "yes" : "no");
cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
" --index %d", view->my_idx);
cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, " --members");
for (int i = 0; i < view->memb_num; i++)
{
wsrep_uuid_print (&view->members[i].id, uuid_str, sizeof(uuid_str));
cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
"%c%s/%s/%s", i > 0 ? ',' : ' ',
uuid_str, view->members[i].name,
view->members[i].incoming);
}
}
if (cmd_off == cmd_len)
{
WSREP_ERROR("Notification buffer too short (%ld). Aborting notification.",
cmd_len);
return;
}
wsp::process p(cmd_ptr, "r");
p.wait();
int err = p.error();
if (err)
{
WSREP_ERROR("Notification command failed: %d (%s): \"%s\"",
err, strerror(err), cmd_ptr);
}
}
......@@ -26,208 +26,30 @@
#include <pthread.h>
#include <cstdio>
extern void wsrep_ready_set (my_bool x);
void wsrep_ready_set (my_bool x);
extern ssize_t wsrep_sst_prepare (void** msg);
extern int wsrep_sst_donate_cb (void* app_ctx,
ssize_t wsrep_sst_prepare (void** msg);
wsrep_cb_status wsrep_sst_donate_cb (void* app_ctx,
void* recv_ctx,
const void* msg, size_t msg_len,
const wsrep_uuid_t* current_uuid,
wsrep_seqno_t current_seqno,
const wsrep_gtid_t* state_id,
const char* state, size_t state_len,
bool bypass);
extern size_t guess_ip (char* buf, size_t buf_len);
extern size_t guess_address(char* buf, size_t buf_len);
extern wsrep_uuid_t local_uuid;
extern wsrep_seqno_t local_seqno;
// a helper function
void wsrep_sst_received(wsrep_t*, const wsrep_uuid_t*, wsrep_seqno_t,
const void*, size_t);
/*! SST thread signals init thread about sst completion */
extern void wsrep_sst_complete(const wsrep_uuid_t* uuid, wsrep_seqno_t, bool);
void wsrep_sst_complete(const wsrep_uuid_t*, wsrep_seqno_t, bool);
extern void wsrep_notify_status (wsrep_member_status_t new_status,
const wsrep_view_info_t* view = 0);
namespace wsp {
class node_status
{
public:
node_status() : status(WSREP_MEMBER_UNDEFINED) {}
void set(wsrep_member_status_t new_status,
const wsrep_view_info_t* view = 0)
{
if (status != new_status || 0 != view)
{
wsrep_notify_status(new_status, view);
status = new_status;
}
}
wsrep_member_status_t get() const { return status; }
private:
wsrep_member_status_t status;
};
} /* namespace wsp */
extern wsp::node_status local_status;
namespace wsp {
/* A small class to run external programs. */
class process
{
private:
const char* const str_;
FILE* io_;
int err_;
pid_t pid_;
public:
/*! @arg type is a pointer to a null-terminated string which must contain
either the letter 'r' for reading or the letter 'w' for writing.
*/
process (const char* cmd, const char* type);
~process ();
FILE* pipe () { return io_; }
int error() { return err_; }
int wait ();
const char* cmd() { return str_; }
};
#ifdef REMOVED
class lock
{
pthread_mutex_t* const mtx_;
public:
lock (pthread_mutex_t* mtx) : mtx_(mtx)
{
int err = pthread_mutex_lock (mtx_);
if (err)
{
WSREP_ERROR("Mutex lock failed: %s", strerror(err));
abort();
}
}
virtual ~lock ()
{
int err = pthread_mutex_unlock (mtx_);
if (err)
{
WSREP_ERROR("Mutex unlock failed: %s", strerror(err));
abort();
}
}
inline void wait (pthread_cond_t* cond)
{
pthread_cond_wait (cond, mtx_);
}
private:
lock (const lock&);
lock& operator=(const lock&);
};
class monitor
{
int mutable refcnt;
pthread_mutex_t mutable mtx;
pthread_cond_t mutable cond;
public:
monitor() : refcnt(0)
{
pthread_mutex_init (&mtx, NULL);
pthread_cond_init (&cond, NULL);
}
~monitor()
{
pthread_mutex_destroy (&mtx);
pthread_cond_destroy (&cond);
}
void enter() const
{
lock l(&mtx);
while (refcnt)
{
l.wait(&cond);
}
refcnt++;
}
void leave() const
{
lock l(&mtx);
refcnt--;
if (refcnt == 0)
{
pthread_cond_signal (&cond);
}
}
private:
monitor (const monitor&);
monitor& operator= (const monitor&);
};
class critical
{
const monitor& mon;
public:
critical(const monitor& m) : mon(m) { mon.enter(); }
~critical() { mon.leave(); }
private:
critical (const critical&);
critical& operator= (const critical&);
};
#endif
class thd
{
class thd_init
{
public:
thd_init() { my_thread_init(); }
~thd_init() { my_thread_end(); }
}
init;
thd (const thd&);
thd& operator= (const thd&);
public:
thd(my_bool wsrep_on);
~thd();
THD* const ptr;
};
/* binlog-related stuff */
int wsrep_write_cache(IO_CACHE *cache, uchar **buf, size_t *buf_len);
class string
{
public:
string() : string_(0) {}
void set(char* str) { if (string_) free (string_); string_ = str; }
~string() { set (0); }
private:
char* string_;
};
} // namespace wsrep
#endif /* WSREP_PRIV_H */
/* Copyright 2010 Codership Oy <http://www.codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
//! @file declares symbols private to wsrep integration layer
#ifndef WSREP_PRIV_H
#define WSREP_PRIV_H
#include "wsrep_mysqld.h"
#include "../wsrep/wsrep_api.h"
#include <log.h>
#include <pthread.h>
#include <cstdio>
extern void wsrep_ready_set (my_bool x);
extern ssize_t wsrep_sst_prepare (void** msg);
extern int wsrep_sst_donate_cb (void* app_ctx,
void* recv_ctx,
const void* msg, size_t msg_len,
const wsrep_uuid_t* current_uuid,
wsrep_seqno_t current_seqno,
const char* state, size_t state_len,
bool bypass);
extern size_t guess_ip (char* buf, size_t buf_len);
extern size_t guess_address(char* buf, size_t buf_len);
extern wsrep_uuid_t local_uuid;
extern wsrep_seqno_t local_seqno;
/*! SST thread signals init thread about sst completion */
extern void wsrep_sst_complete(const wsrep_uuid_t* uuid, wsrep_seqno_t, bool);
extern void wsrep_notify_status (wsrep_member_status_t new_status,
const wsrep_view_info_t* view = 0);
namespace wsp {
class node_status
{
public:
node_status() : status(WSREP_MEMBER_UNDEFINED) {}
void set(wsrep_member_status_t new_status,
const wsrep_view_info_t* view = 0)
{
if (status != new_status || 0 != view)
{
wsrep_notify_status(new_status, view);
status = new_status;
}
}
wsrep_member_status_t get() const { return status; }
private:
wsrep_member_status_t status;
};
} /* namespace wsp */
extern wsp::node_status local_status;
namespace wsp {
/* A small class to run external programs. */
class process
{
private:
const char* const str_;
FILE* io_;
int err_;
pid_t pid_;
public:
/*! @arg type is a pointer to a null-terminated string which must contain
either the letter 'r' for reading or the letter 'w' for writing.
*/
process (const char* cmd, const char* type);
~process ();
FILE* pipe () { return io_; }
int error() { return err_; }
int wait ();
const char* cmd() { return str_; }
};
#ifdef REMOVED
class lock
{
pthread_mutex_t* const mtx_;
public:
lock (pthread_mutex_t* mtx) : mtx_(mtx)
{
int err = pthread_mutex_lock (mtx_);
if (err)
{
WSREP_ERROR("Mutex lock failed: %s", strerror(err));
abort();
}
}
virtual ~lock ()
{
int err = pthread_mutex_unlock (mtx_);
if (err)
{
WSREP_ERROR("Mutex unlock failed: %s", strerror(err));
abort();
}
}
inline void wait (pthread_cond_t* cond)
{
pthread_cond_wait (cond, mtx_);
}
private:
lock (const lock&);
lock& operator=(const lock&);
};
class monitor
{
int mutable refcnt;
pthread_mutex_t mutable mtx;
pthread_cond_t mutable cond;
public:
monitor() : refcnt(0)
{
pthread_mutex_init (&mtx, NULL);
pthread_cond_init (&cond, NULL);
}
~monitor()
{
pthread_mutex_destroy (&mtx);
pthread_cond_destroy (&cond);
}
void enter() const
{
lock l(&mtx);
while (refcnt)
{
l.wait(&cond);
}
refcnt++;
}
void leave() const
{
lock l(&mtx);
refcnt--;
if (refcnt == 0)
{
pthread_cond_signal (&cond);
}
}
private:
monitor (const monitor&);
monitor& operator= (const monitor&);
};
class critical
{
const monitor& mon;
public:
critical(const monitor& m) : mon(m) { mon.enter(); }
~critical() { mon.leave(); }
private:
critical (const critical&);
critical& operator= (const critical&);
};
#endif
class thd
{
class thd_init
{
public:
thd_init() { my_thread_init(); }
~thd_init() { my_thread_end(); }
}
init;
thd (const thd&);
thd& operator= (const thd&);
public:
thd(my_bool wsrep_on);
~thd();
THD* const ptr;
};
class string
{
public:
string() : string_(0) {}
void set(char* str) { if (string_) free (string_); string_ = str; }
~string() { set (0); }
private:
char* string_;
};
} // namespace wsrep
#endif /* WSREP_PRIV_H */
......@@ -13,6 +13,8 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "wsrep_sst.h"
#include <mysqld.h>
#include <sql_class.h>
#include <set_var.h>
......@@ -20,6 +22,7 @@
#include <sql_reload.h>
#include <sql_parse.h>
#include "wsrep_priv.h"
#include "wsrep_utils.h"
#include <cstdio>
#include <cstdlib>
......@@ -58,7 +61,6 @@ const char* wsrep_sst_donor = "";
// container for real auth string
static const char* sst_auth_real = NULL;
my_bool wsrep_sst_donor_rejects_queries = FALSE;
bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var)
......
/* Copyright (C) 2013 Codership Oy <info@codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
#ifndef WSREP_SST_H
#define WSREP_SST_H
#include <mysql.h> // my_bool
/* system variables */
extern const char* wsrep_sst_method;
extern const char* wsrep_sst_receive_address;
extern const char* wsrep_sst_donor;
extern char* wsrep_sst_auth;
extern my_bool wsrep_sst_donor_rejects_queries;
/*! Synchronizes applier thread start with init thread */
extern void wsrep_sst_grab();
/*! Init thread waits for SST completion */
extern bool wsrep_sst_wait();
/*! Signals wsrep that initialization is complete, writesets can be applied */
extern void wsrep_sst_continue();
extern void wsrep_SE_init_grab(); /*! grab init critical section */
extern void wsrep_SE_init_wait(); /*! wait for SE init to complete */
extern void wsrep_SE_init_done(); /*! signal that SE init is complte */
extern void wsrep_SE_initialized(); /*! mark SE initialization complete */
#endif /* WSREP_SST_H */
/* Copyright (C) 2013 Codership Oy <info@codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
#include "wsrep_thd.h"
#include "transaction.h"
#include "rpl_rli.h"
#include "log_event.h"
#include "sql_parse.h"
#include "slave.h" // opt_log_slave_updates
#include "sql_base.h" // close_thread_tables()
#include "mysqld.h" // start_wsrep_THD();
/* must have (&thd->LOCK_wsrep_thd) */
void wsrep_client_rollback(THD *thd)
{
WSREP_DEBUG("client rollback due to BF abort for (%ld), query: %s",
thd->thread_id, thd->query());
thd->wsrep_conflict_state= ABORTING;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
trans_rollback(thd);
if (thd->locked_tables_mode && thd->lock)
{
WSREP_DEBUG("unlocking tables for BF abort (%ld)", thd->thread_id);
thd->locked_tables_list.unlock_locked_tables(thd);
thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
}
if (thd->global_read_lock.is_acquired())
{
WSREP_DEBUG("unlocking GRL for BF abort (%ld)", thd->thread_id);
thd->global_read_lock.unlock_global_read_lock(thd);
}
/* Release transactional metadata locks. */
thd->mdl_context.release_transactional_locks();
/* release explicit MDL locks */
thd->mdl_context.release_explicit_locks();
if (thd->get_binlog_table_maps())
{
WSREP_DEBUG("clearing binlog table map for BF abort (%ld)", thd->thread_id);
thd->clear_binlog_table_maps();
}
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
thd->wsrep_conflict_state= ABORTED;
}
static Relay_log_info* wsrep_relay_log_init(const char* log_fname)
{
Relay_log_info* rli= new Relay_log_info(false);
rli->no_storage= true;
if (!rli->relay_log.description_event_for_exec)
{
rli->relay_log.description_event_for_exec=
new Format_description_log_event(4);
}
rli->sql_thd= current_thd;
return rli;
}
static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow)
{
shadow->options = thd->variables.option_bits;
shadow->wsrep_exec_mode = thd->wsrep_exec_mode;
shadow->vio = thd->net.vio;
if (opt_log_slave_updates)
thd->variables.option_bits|= OPTION_BIN_LOG;
else
thd->variables.option_bits&= ~(OPTION_BIN_LOG);
if (!thd->wsrep_rli) thd->wsrep_rli= wsrep_relay_log_init("wsrep_relay");
thd->wsrep_exec_mode= REPL_RECV;
thd->net.vio= 0;
thd->clear_error();
thd->variables.option_bits|= OPTION_NOT_AUTOCOMMIT;
shadow->tx_isolation = thd->variables.tx_isolation;
thd->variables.tx_isolation = ISO_READ_COMMITTED;
thd->tx_isolation = ISO_READ_COMMITTED;
}
static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
{
thd->variables.option_bits = shadow->options;
thd->wsrep_exec_mode = shadow->wsrep_exec_mode;
thd->net.vio = shadow->vio;
thd->variables.tx_isolation = shadow->tx_isolation;
}
void wsrep_replay_transaction(THD *thd)
{
/* checking if BF trx must be replayed */
if (thd->wsrep_conflict_state== MUST_REPLAY) {
if (thd->wsrep_exec_mode!= REPL_RECV) {
if (thd->stmt_da->is_sent)
{
WSREP_ERROR("replay issue, thd has reported status already");
}
thd->stmt_da->reset_diagnostics_area();
thd->wsrep_conflict_state= REPLAYING;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
mysql_reset_thd_for_next_command(thd);
thd->killed= THD::NOT_KILLED;
close_thread_tables(thd);
if (thd->locked_tables_mode && thd->lock)
{
WSREP_DEBUG("releasing table lock for replaying (%ld)",
thd->thread_id);
thd->locked_tables_list.unlock_locked_tables(thd);
thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
}
thd->mdl_context.release_transactional_locks();
thd_proc_info(thd, "wsrep replaying trx");
WSREP_DEBUG("replay trx: %s %lld",
thd->query() ? thd->query() : "void",
(long long)wsrep_thd_trx_seqno(thd));
struct wsrep_thd_shadow shadow;
wsrep_prepare_bf_thd(thd, &shadow);
int rcode = wsrep->replay_trx(wsrep,
&thd->wsrep_ws_handle,
(void *)thd);
wsrep_return_from_bf_mode(thd, &shadow);
if (thd->wsrep_conflict_state!= REPLAYING)
WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state );
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
switch (rcode)
{
case WSREP_OK:
thd->wsrep_conflict_state= NO_CONFLICT;
wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
WSREP_DEBUG("trx_replay successful for: %ld %llu",
thd->thread_id, (long long)thd->real_id);
if (thd->stmt_da->is_sent)
{
WSREP_WARN("replay ok, thd has reported status");
}
else
{
my_ok(thd);
}
break;
case WSREP_TRX_FAIL:
if (thd->stmt_da->is_sent)
{
WSREP_ERROR("replay failed, thd has reported status");
}
else
{
WSREP_DEBUG("replay failed, rolling back");
my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
}
thd->wsrep_conflict_state= ABORTED;
wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle);
break;
default:
WSREP_ERROR("trx_replay failed for: %d, query: %s",
rcode, thd->query() ? thd->query() : "void");
/* we're now in inconsistent state, must abort */
unireg_abort(1);
break;
}
mysql_mutex_lock(&LOCK_wsrep_replaying);
wsrep_replaying--;
WSREP_DEBUG("replaying decreased: %d, thd: %lu",
wsrep_replaying, thd->thread_id);
mysql_cond_broadcast(&COND_wsrep_replaying);
mysql_mutex_unlock(&LOCK_wsrep_replaying);
}
}
}
static void wsrep_replication_process(THD *thd)
{
int rcode;
DBUG_ENTER("wsrep_replication_process");
struct wsrep_thd_shadow shadow;
wsrep_prepare_bf_thd(thd, &shadow);
rcode = wsrep->recv(wsrep, (void *)thd);
DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode));
WSREP_INFO("applier thread exiting (code:%d)", rcode);
switch (rcode) {
case WSREP_OK:
case WSREP_NOT_IMPLEMENTED:
case WSREP_CONN_FAIL:
/* provider does not support slave operations / disconnected from group,
* just close applier thread */
break;
case WSREP_NODE_FAIL:
/* data inconsistency => SST is needed */
/* Note: we cannot just blindly restart replication here,
* SST might require server restart if storage engines must be
* initialized after SST */
WSREP_ERROR("node consistency compromised, aborting");
wsrep_kill_mysql(thd);
break;
case WSREP_WARNING:
case WSREP_TRX_FAIL:
case WSREP_TRX_MISSING:
/* these suggests a bug in provider code */
WSREP_WARN("bad return from recv() call: %d", rcode);
/* fall through to node shutdown */
case WSREP_FATAL:
/* Cluster connectivity is lost.
*
* If applier was killed on purpose (KILL_CONNECTION), we
* avoid mysql shutdown. This is because the killer will then handle
* shutdown processing (or replication restarting)
*/
if (thd->killed != THD::KILL_CONNECTION)
{
wsrep_kill_mysql(thd);
}
break;
}
mysql_mutex_lock(&LOCK_thread_count);
wsrep_close_applier(thd);
mysql_cond_broadcast(&COND_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
if (thd->temporary_tables)
{
WSREP_DEBUG("Applier %lu, has temporary tables at exit", thd->thread_id);
}
wsrep_return_from_bf_mode(thd, &shadow);
DBUG_VOID_RETURN;
}
void wsrep_create_appliers(long threads)
{
if (!wsrep_connected)
{
/* see wsrep_replication_start() for the logic */
if (wsrep_cluster_address && strlen(wsrep_cluster_address) &&
wsrep_provider && strcasecmp(wsrep_provider, "none"))
{
WSREP_ERROR("Trying to launch slave threads before creating "
"connection at '%s'", wsrep_cluster_address);
assert(0);
}
return;
}
long wsrep_threads=0;
pthread_t hThread;
while (wsrep_threads++ < threads) {
if (pthread_create(
&hThread, &connection_attrib,
start_wsrep_THD, (void*)wsrep_replication_process))
WSREP_WARN("Can't create thread to manage wsrep replication");
}
}
static void wsrep_rollback_process(THD *thd)
{
DBUG_ENTER("wsrep_rollback_process");
mysql_mutex_lock(&LOCK_wsrep_rollback);
wsrep_aborting_thd= NULL;
while (thd->killed == THD::NOT_KILLED) {
thd_proc_info(thd, "wsrep aborter idle");
thd->mysys_var->current_mutex= &LOCK_wsrep_rollback;
thd->mysys_var->current_cond= &COND_wsrep_rollback;
mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback);
WSREP_DEBUG("WSREP rollback thread wakes for signal");
mysql_mutex_lock(&thd->mysys_var->mutex);
thd_proc_info(thd, "wsrep aborter active");
thd->mysys_var->current_mutex= 0;
thd->mysys_var->current_cond= 0;
mysql_mutex_unlock(&thd->mysys_var->mutex);
/* check for false alarms */
if (!wsrep_aborting_thd)
{
WSREP_DEBUG("WSREP rollback thread has empty abort queue");
}
/* process all entries in the queue */
while (wsrep_aborting_thd) {
THD *aborting;
wsrep_aborting_thd_t next = wsrep_aborting_thd->next;
aborting = wsrep_aborting_thd->aborting_thd;
my_free(wsrep_aborting_thd);
wsrep_aborting_thd= next;
/*
* must release mutex, appliers my want to add more
* aborting thds in our work queue, while we rollback
*/
mysql_mutex_unlock(&LOCK_wsrep_rollback);
mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
if (aborting->wsrep_conflict_state== ABORTED)
{
WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d",
(long long)aborting->real_id,
aborting->wsrep_conflict_state);
mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
mysql_mutex_lock(&LOCK_wsrep_rollback);
continue;
}
aborting->wsrep_conflict_state= ABORTING;
mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
aborting->store_globals();
mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
wsrep_client_rollback(aborting);
WSREP_DEBUG("WSREP rollbacker aborted thd: (%lu %llu)",
aborting->thread_id, (long long)aborting->real_id);
mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
mysql_mutex_lock(&LOCK_wsrep_rollback);
}
}
mysql_mutex_unlock(&LOCK_wsrep_rollback);
sql_print_information("WSREP: rollbacker thread exiting");
DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
DBUG_VOID_RETURN;
}
void wsrep_create_rollbacker()
{
if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
{
pthread_t hThread;
/* create rollbacker */
if (pthread_create( &hThread, &connection_attrib,
start_wsrep_THD, (void*)wsrep_rollback_process))
WSREP_WARN("Can't create thread to manage wsrep rollback");
}
}
extern "C"
int wsrep_thd_is_brute_force(void *thd_ptr)
{
if (thd_ptr) {
switch (((THD *)thd_ptr)->wsrep_exec_mode) {
case LOCAL_STATE:
{
if (((THD *)thd_ptr)->wsrep_conflict_state== REPLAYING)
{
return 1;
}
return 0;
}
case REPL_RECV: return 1;
case TOTAL_ORDER: return 2;
case LOCAL_COMMIT: return 3;
}
}
return 0;
}
extern "C"
int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
{
THD *victim_thd = (THD *) victim_thd_ptr;
THD *bf_thd = (THD *) bf_thd_ptr;
DBUG_ENTER("wsrep_abort_thd");
if ( (WSREP(bf_thd) ||
( (WSREP_ON || wsrep_OSU_method_options == WSREP_OSU_RSU) &&
bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) &&
victim_thd)
{
WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
(long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
ha_wsrep_abort_transaction(bf_thd, victim_thd, signal);
}
else
{
WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd);
}
DBUG_RETURN(1);
}
extern "C"
int wsrep_thd_in_locking_session(void *thd_ptr)
{
if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) {
return 1;
}
return 0;
}
/* Copyright (C) 2013 Codership Oy <info@codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
#ifndef WSREP_THD_H
#define WSREP_THD_H
#include "sql_class.h"
void wsrep_client_rollback(THD *thd);
void wsrep_replay_transaction(THD *thd);
void wsrep_create_appliers(long threads);
void wsrep_create_rollbacker();
extern "C" int wsrep_thd_is_brute_force(void *thd_ptr);
extern "C" int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr,
my_bool signal);
extern "C" int wsrep_thd_in_locking_session(void *thd_ptr);
#endif /* WSREP_THD_H */
......@@ -20,6 +20,10 @@
#define _GNU_SOURCE // POSIX_SPAWN_USEVFORK flag
#endif
#include "wsrep_utils.h"
#include "wsrep_mysqld.h"
//#include "wsrep_api.h"
//#include "wsrep_priv.h"
#include <spawn.h> // posix_spawn()
#include <unistd.h> // pipe()
#include <errno.h> // errno
......
/* Copyright (C) 2013 Codership Oy <info@codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
#ifndef WSREP_UTILS_H
#define WSREP_UTILS_H
#include "wsrep_priv.h"
unsigned int wsrep_check_ip (const char* addr);
size_t wsrep_guess_ip (char* buf, size_t buf_len);
size_t wsrep_guess_address(char* buf, size_t buf_len);
namespace wsp {
class node_status
{
public:
node_status() : status(WSREP_MEMBER_UNDEFINED) {}
void set(wsrep_member_status_t new_status,
const wsrep_view_info_t* view = 0)
{
if (status != new_status || 0 != view)
{
wsrep_notify_status(new_status, view);
status = new_status;
}
}
wsrep_member_status_t get() const { return status; }
private:
wsrep_member_status_t status;
};
} /* namespace wsp */
extern wsp::node_status local_status;
namespace wsp {
/* A small class to run external programs. */
class process
{
private:
const char* const str_;
FILE* io_;
int err_;
pid_t pid_;
public:
/*! @arg type is a pointer to a null-terminated string which must contain
either the letter 'r' for reading or the letter 'w' for writing.
*/
process (const char* cmd, const char* type);
~process ();
FILE* pipe () { return io_; }
int error() { return err_; }
int wait ();
const char* cmd() { return str_; }
};
class thd
{
class thd_init
{
public:
thd_init() { my_thread_init(); }
~thd_init() { my_thread_end(); }
}
init;
thd (const thd&);
thd& operator= (const thd&);
public:
thd(my_bool wsrep_on);
~thd();
THD* const ptr;
};
class string
{
public:
string() : string_(0) {}
void set(char* str) { if (string_) free (string_); string_ = str; }
~string() { set (0); }
private:
char* string_;
};
#ifdef REMOVED
class lock
{
pthread_mutex_t* const mtx_;
public:
lock (pthread_mutex_t* mtx) : mtx_(mtx)
{
int err = pthread_mutex_lock (mtx_);
if (err)
{
WSREP_ERROR("Mutex lock failed: %s", strerror(err));
abort();
}
}
virtual ~lock ()
{
int err = pthread_mutex_unlock (mtx_);
if (err)
{
WSREP_ERROR("Mutex unlock failed: %s", strerror(err));
abort();
}
}
inline void wait (pthread_cond_t* cond)
{
pthread_cond_wait (cond, mtx_);
}
private:
lock (const lock&);
lock& operator=(const lock&);
};
class monitor
{
int mutable refcnt;
pthread_mutex_t mutable mtx;
pthread_cond_t mutable cond;
public:
monitor() : refcnt(0)
{
pthread_mutex_init (&mtx, NULL);
pthread_cond_init (&cond, NULL);
}
~monitor()
{
pthread_mutex_destroy (&mtx);
pthread_cond_destroy (&cond);
}
void enter() const
{
lock l(&mtx);
while (refcnt)
{
l.wait(&cond);
}
refcnt++;
}
void leave() const
{
lock l(&mtx);
refcnt--;
if (refcnt == 0)
{
pthread_cond_signal (&cond);
}
}
private:
monitor (const monitor&);
monitor& operator= (const monitor&);
};
class critical
{
const monitor& mon;
public:
critical(const monitor& m) : mon(m) { mon.enter(); }
~critical() { mon.leave(); }
private:
critical (const critical&);
critical& operator= (const critical&);
};
#endif
} // namespace wsrep
#endif /* WSREP_UTILS_H */
......@@ -13,12 +13,15 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "wsrep_var.h"
#include <mysqld.h>
#include <sql_class.h>
#include <sql_plugin.h>
#include <set_var.h>
#include <sql_acl.h>
#include "wsrep_priv.h"
#include "wsrep_thd.h"
#include <my_dir.h>
#include <cstdio>
#include <cstdlib>
......
/* Copyright (C) 2013 Codership Oy <info@codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
#ifndef WSREP_VAR_H
#define WSREP_VAR_H
#define WSREP_NODE_INCOMING_AUTO "AUTO"
// MySQL variables funcs
#include "sql_priv.h"
class sys_var;
class set_var;
class THD;
#define CHECK_ARGS (sys_var *self, THD* thd, set_var *var)
#define UPDATE_ARGS (sys_var *self, THD* thd, enum_var_type type)
#define DEFAULT_ARGS (THD* thd, enum_var_type var_type)
#define INIT_ARGS (const char* opt)
extern bool wsrep_on_update UPDATE_ARGS;
extern void wsrep_causal_reads_update UPDATE_ARGS;
extern bool wsrep_start_position_check CHECK_ARGS;
extern bool wsrep_start_position_update UPDATE_ARGS;
extern void wsrep_start_position_init INIT_ARGS;
extern bool wsrep_provider_check CHECK_ARGS;
extern bool wsrep_provider_update UPDATE_ARGS;
extern void wsrep_provider_init INIT_ARGS;
extern bool wsrep_provider_options_check CHECK_ARGS;
extern bool wsrep_provider_options_update UPDATE_ARGS;
extern void wsrep_provider_options_init INIT_ARGS;
extern bool wsrep_cluster_address_check CHECK_ARGS;
extern bool wsrep_cluster_address_update UPDATE_ARGS;
extern void wsrep_cluster_address_init INIT_ARGS;
extern bool wsrep_cluster_name_check CHECK_ARGS;
extern bool wsrep_cluster_name_update UPDATE_ARGS;
extern bool wsrep_node_name_check CHECK_ARGS;
extern bool wsrep_node_name_update UPDATE_ARGS;
extern bool wsrep_node_address_check CHECK_ARGS;
extern bool wsrep_node_address_update UPDATE_ARGS;
extern void wsrep_node_address_init INIT_ARGS;
extern bool wsrep_sst_method_check CHECK_ARGS;
extern bool wsrep_sst_method_update UPDATE_ARGS;
extern void wsrep_sst_method_init INIT_ARGS;
extern bool wsrep_sst_receive_address_check CHECK_ARGS;
extern bool wsrep_sst_receive_address_update UPDATE_ARGS;
extern bool wsrep_sst_auth_check CHECK_ARGS;
extern bool wsrep_sst_auth_update UPDATE_ARGS;
extern void wsrep_sst_auth_init INIT_ARGS;
extern bool wsrep_sst_donor_check CHECK_ARGS;
extern bool wsrep_sst_donor_update UPDATE_ARGS;
extern bool wsrep_slave_threads_check CHECK_ARGS;
extern bool wsrep_slave_threads_update UPDATE_ARGS;
extern bool wsrep_desync_check CHECK_ARGS;
extern bool wsrep_desync_update UPDATE_ARGS;
#endif /* WSREP_VAR_H */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment