Commit b1fabf6c authored by Monty's avatar Monty

Performance improvements to test if WSREP if active

parent 93281221
......@@ -1291,7 +1291,7 @@ bool do_command(THD *thd)
Aborted by background rollbacker thread.
Handle error here and jump straight to out
*/
if (wsrep_before_command(thd))
if (unlikely(wsrep_service_started) && wsrep_before_command(thd))
{
thd->store_globals();
WSREP_LOG_THD(thd, "enter found BF aborted");
......@@ -1368,7 +1368,8 @@ bool do_command(THD *thd)
if (packet_length != packet_error)
{
/* there was a command to process, and before_command() has been called */
wsrep_after_command_after_result(thd);
if (unlikely(wsrep_service_started))
wsrep_after_command_after_result(thd);
}
#endif /* WITH_WSREP */
DBUG_RETURN(return_value);
......@@ -1659,14 +1660,20 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
{
thd->status_var.com_other++;
#ifdef WITH_WSREP
wsrep_after_command_ignore_result(thd);
wsrep_close(thd);
if (unlikely(wsrep_service_started))
{
wsrep_after_command_ignore_result(thd);
wsrep_close(thd);
}
#endif /* WITH_WSREP */
thd->change_user();
thd->clear_error(); // if errors from rollback
#ifdef WITH_WSREP
wsrep_open(thd);
wsrep_before_command(thd);
if (unlikely(wsrep_service_started))
{
wsrep_open(thd);
wsrep_before_command(thd);
}
#endif /* WITH_WSREP */
/* Restore original charset from client authentication packet.*/
if(thd->org_charset)
......@@ -1680,13 +1687,19 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
status_var_increment(thd->status_var.com_other);
#ifdef WITH_WSREP
wsrep_after_command_ignore_result(thd);
wsrep_close(thd);
if (unlikely(wsrep_service_started))
{
wsrep_after_command_ignore_result(thd);
wsrep_close(thd);
}
#endif /* WITH_WSREP */
thd->change_user();
#ifdef WITH_WSREP
wsrep_open(thd);
wsrep_before_command(thd);
if (unlikely(wsrep_service_started))
{
wsrep_open(thd);
wsrep_before_command(thd);
}
#endif /* WITH_WSREP */
thd->clear_error(); // if errors from rollback
......@@ -2376,46 +2389,52 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
}
dispatch_end:
do_end_of_statement= true;
#ifdef WITH_WSREP
/*
BF aborted before sending response back to client
Next test should really be WSREP(thd), but that causes a failure when doing
'set WSREP_ON=0'
*/
if (thd->killed == KILL_QUERY)
{
WSREP_DEBUG("THD is killed at dispatch_end");
}
wsrep_after_command_before_result(thd);
if (wsrep_current_error(thd) &&
!(command == COM_STMT_PREPARE ||
command == COM_STMT_FETCH ||
command == COM_STMT_SEND_LONG_DATA ||
command == COM_STMT_CLOSE
))
{
/* todo: Pass wsrep client state current error to override */
wsrep_override_error(thd, wsrep_current_error(thd),
wsrep_current_error_status(thd));
WSREP_LOG_THD(thd, "leave");
}
if (WSREP(thd))
if (unlikely(wsrep_service_started))
{
/*
MDEV-10812
In the case of COM_QUIT/COM_STMT_CLOSE thread status should be disabled.
BF aborted before sending response back to client
*/
DBUG_ASSERT((command != COM_QUIT && command != COM_STMT_CLOSE)
if (thd->killed == KILL_QUERY)
{
WSREP_DEBUG("THD is killed at dispatch_end");
}
wsrep_after_command_before_result(thd);
if (wsrep_current_error(thd) &&
!(command == COM_STMT_PREPARE ||
command == COM_STMT_FETCH ||
command == COM_STMT_SEND_LONG_DATA ||
command == COM_STMT_CLOSE
))
{
/* todo: Pass wsrep client state current error to override */
wsrep_override_error(thd, wsrep_current_error(thd),
wsrep_current_error_status(thd));
WSREP_LOG_THD(thd, "leave");
}
if (WSREP(thd))
{
/*
MDEV-10812
In the case of COM_QUIT/COM_STMT_CLOSE thread status should be disabled.
*/
DBUG_ASSERT((command != COM_QUIT && command != COM_STMT_CLOSE)
|| thd->get_stmt_da()->is_disabled());
DBUG_ASSERT(thd->wsrep_trx().state() != wsrep::transaction::s_replaying);
/* wsrep BF abort in query exec phase */
mysql_mutex_lock(&thd->LOCK_thd_kill);
do_end_of_statement= thd_is_connection_alive(thd);
mysql_mutex_unlock(&thd->LOCK_thd_kill);
DBUG_ASSERT(thd->wsrep_trx().state() != wsrep::transaction::s_replaying);
/* wsrep BF abort in query exec phase */
mysql_mutex_lock(&thd->LOCK_thd_kill);
do_end_of_statement= thd_is_connection_alive(thd);
mysql_mutex_unlock(&thd->LOCK_thd_kill);
}
}
else
do_end_of_statement= true;
#endif /* WITH_WSREP */
if (do_end_of_statement)
{
DBUG_ASSERT(thd->derived_tables == NULL &&
......
......@@ -99,6 +99,7 @@ my_bool wsrep_desync; // De(re)synchronize the node fr
my_bool wsrep_strict_ddl; // Reject DDL to
// effected tables not
// supporting Galera replication
bool wsrep_service_started; // If Galera was initialized
long wsrep_slave_threads; // No. of slave appliers threads
ulong wsrep_retry_autocommit; // Retry aborted autocommit trx
ulong wsrep_max_ws_size; // Max allowed ws (RBR buffer) size
......@@ -831,10 +832,6 @@ int wsrep_init()
return err;
}
global_system_variables.wsrep_on= 1;
WSREP_ON_= wsrep_provider && strcmp(wsrep_provider, WSREP_NONE);
if (wsrep_gtid_mode && opt_bin_log && !opt_log_slave_updates)
{
WSREP_ERROR("Option --log-slave-updates is required if "
......@@ -866,6 +863,11 @@ int wsrep_init()
return 1;
}
/* Now WSREP is fully initialized */
global_system_variables.wsrep_on= 1;
WSREP_ON_= wsrep_provider && strcmp(wsrep_provider, WSREP_NONE);
wsrep_service_started= 1;
wsrep_init_provider_status_variables();
wsrep_capabilities_export(Wsrep_server_state::instance().provider().capabilities(),
&wsrep_provider_capabilities);
......@@ -1162,18 +1164,21 @@ bool wsrep_start_replication()
bool wsrep_must_sync_wait (THD* thd, uint mask)
{
bool ret;
mysql_mutex_lock(&thd->LOCK_thd_data);
ret= (thd->variables.wsrep_sync_wait & mask) &&
thd->wsrep_client_thread &&
thd->variables.wsrep_on &&
!(thd->variables.wsrep_dirty_reads &&
!is_update_query(thd->lex->sql_command)) &&
!thd->in_active_multi_stmt_transaction() &&
thd->wsrep_trx().state() !=
wsrep::transaction::s_replaying &&
thd->wsrep_cs().sync_wait_gtid().is_undefined();
mysql_mutex_unlock(&thd->LOCK_thd_data);
bool ret= 0;
if (thd->variables.wsrep_on)
{
mysql_mutex_lock(&thd->LOCK_thd_data);
ret= (thd->variables.wsrep_sync_wait & mask) &&
thd->wsrep_client_thread &&
thd->variables.wsrep_on &&
!(thd->variables.wsrep_dirty_reads &&
!is_update_query(thd->lex->sql_command)) &&
!thd->in_active_multi_stmt_transaction() &&
thd->wsrep_trx().state() !=
wsrep::transaction::s_replaying &&
thd->wsrep_cs().sync_wait_gtid().is_undefined();
mysql_mutex_unlock(&thd->LOCK_thd_data);
}
return ret;
}
......
......@@ -186,6 +186,7 @@ void wsrep_recover_sr_from_storage(THD *);
// Other wsrep global variables
extern my_bool wsrep_inited; // whether wsrep is initialized ?
extern bool wsrep_service_started;
extern "C" void wsrep_fire_rollbacker(THD *thd);
extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment