Commit 609388fc authored by Seppo Jaakola's avatar Seppo Jaakola

Merged changes from lp:codership-mysql up to rev 3743

-r3725..3737
-r3738..3740
-r3741..3743
parent e0015163
...@@ -8130,19 +8130,20 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) ...@@ -8130,19 +8130,20 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS; thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
/* A small test to verify that objects have consistent types */ /* A small test to verify that objects have consistent types */
DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS)); DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0)) if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
{ {
uint actual_error= thd->stmt_da->sql_errno();
#ifdef WITH_WSREP #ifdef WITH_WSREP
uint actual_error= ER_SERVER_SHUTDOWN; if (WSREP(thd))
if (WSREP(thd) && !thd->is_fatal_error)
{ {
sql_print_information("WSREP, BF applier interrupted in log_event.cc"); WSREP_WARN("BF applier failed to open_and_lock_tables: %u, fatal: %d "
"wsrep = (exec_mode: %d conflict_state: %d seqno: %lld)",
thd->stmt_da->sql_errno(),
thd->is_fatal_error,
thd->wsrep_exec_mode,
thd->wsrep_conflict_state,
(long long)thd->wsrep_trx_seqno);
} }
else
actual_error= thd->stmt_da->sql_errno();
#else
uint actual_error= thd->stmt_da->sql_errno();
#endif #endif
if (thd->is_slave_error || thd->is_fatal_error) if (thd->is_slave_error || thd->is_fatal_error)
{ {
...@@ -10878,6 +10879,8 @@ Format_description_log_event *wsrep_format_desc; // TODO: free them at the end ...@@ -10878,6 +10879,8 @@ Format_description_log_event *wsrep_format_desc; // TODO: free them at the end
At the end (*buf) is shitfed to point to the following event or NULL and At the end (*buf) is shitfed to point to the following event or NULL and
(*buf_len) will be changed to account just being read bytes of the 1st event. (*buf_len) will be changed to account just being read bytes of the 1st event.
*/ */
#define WSREP_MAX_ALLOWED_PACKET 1024*1024*1024 // current protocol max
Log_event* wsrep_read_log_event( Log_event* wsrep_read_log_event(
char **arg_buf, size_t *arg_buf_len, char **arg_buf, size_t *arg_buf_len,
const Format_description_log_event *description_event) const Format_description_log_event *description_event)
...@@ -10889,12 +10892,8 @@ Log_event* wsrep_read_log_event( ...@@ -10889,12 +10892,8 @@ Log_event* wsrep_read_log_event(
char *buf= (*arg_buf); char *buf= (*arg_buf);
const char *error= 0; const char *error= 0;
Log_event *res= 0; Log_event *res= 0;
#ifndef max_allowed_packet
THD *thd=current_thd;
uint max_allowed_packet= thd ? thd->variables.max_allowed_packet : ~(ulong)0;
#endif
if (data_len > max_allowed_packet) if (data_len > WSREP_MAX_ALLOWED_PACKET)
{ {
error = "Event too big"; error = "Event too big";
goto err; goto err;
......
...@@ -100,11 +100,14 @@ bool Alter_table_statement::execute(THD *thd) ...@@ -100,11 +100,14 @@ bool Alter_table_statement::execute(THD *thd)
thd->enable_slow_log= opt_log_slow_admin_statements; thd->enable_slow_log= opt_log_slow_admin_statements;
#ifdef WITH_WSREP #ifdef WITH_WSREP
TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl); TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl);
if ((!thd->is_current_stmt_binlog_format_row() || if ((!thd->is_current_stmt_binlog_format_row() ||
!find_temporary_table(thd, first_table)) && !find_temporary_table(thd, first_table)) &&
wsrep_to_isolation_begin(thd, first_table->db, first_table->table_name)) wsrep_to_isolation_begin(thd,
lex->name.str ? select_lex->db : NULL,
lex->name.str ? lex->name.str : NULL,
first_table))
{ {
WSREP_WARN("ALTER TABLE isolation failure"); WSREP_WARN("ALTER TABLE isolation failure");
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
......
...@@ -62,6 +62,7 @@ ...@@ -62,6 +62,7 @@
#ifdef WITH_WSREP #ifdef WITH_WSREP
#include "wsrep_mysqld.h" #include "wsrep_mysqld.h"
#endif // WITH_WSREP #endif // WITH_WSREP
bool bool
...@@ -5077,6 +5078,24 @@ bool open_tables(THD *thd, TABLE_LIST **start, uint *counter, uint flags, ...@@ -5077,6 +5078,24 @@ bool open_tables(THD *thd, TABLE_LIST **start, uint *counter, uint flags,
} }
} }
} }
#ifdef WITH_WSREP
#define WSREP_TO_ISOLATION_BEGIN(db_, table_, table_list_) \
if (WSREP(thd) && wsrep_to_isolation_begin(thd, db_, table_, table_list_)) goto err;
if ((thd->lex->sql_command== SQLCOM_INSERT ||
thd->lex->sql_command== SQLCOM_INSERT_SELECT ||
thd->lex->sql_command== SQLCOM_REPLACE ||
thd->lex->sql_command== SQLCOM_REPLACE_SELECT ||
thd->lex->sql_command== SQLCOM_UPDATE ||
thd->lex->sql_command== SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command== SQLCOM_LOAD ||
thd->lex->sql_command== SQLCOM_DELETE) &&
wsrep_replicate_myisam &&
(*start)->table && (*start)->table->file->ht->db_type == DB_TYPE_MYISAM)
{
WSREP_TO_ISOLATION_BEGIN(NULL, NULL, (*start));
}
#endif
err: err:
#ifdef WITH_WSREP #ifdef WITH_WSREP
......
...@@ -100,9 +100,6 @@ int get_or_create_user_conn(THD *thd, const char *user, ...@@ -100,9 +100,6 @@ int get_or_create_user_conn(THD *thd, const char *user,
} }
thd->user_connect=uc; thd->user_connect=uc;
uc->connections++; uc->connections++;
#ifdef WITH_WSREP
thd->wsrep_client_thread= 1;
#endif /* WITH_WSREP */
end: end:
mysql_mutex_unlock(&LOCK_user_conn); mysql_mutex_unlock(&LOCK_user_conn);
return return_val; return return_val;
...@@ -1203,6 +1200,9 @@ bool thd_prepare_connection(THD *thd) ...@@ -1203,6 +1200,9 @@ bool thd_prepare_connection(THD *thd)
(char *) thd->security_ctx->host_or_ip); (char *) thd->security_ctx->host_or_ip);
prepare_new_connection_state(thd); prepare_new_connection_state(thd);
#ifdef WITH_WSREP
thd->wsrep_client_thread= 1;
#endif /* WITH_WSREP */
return FALSE; return FALSE;
} }
......
This diff is collapsed.
...@@ -521,7 +521,7 @@ bool Truncate_statement::execute(THD *thd) ...@@ -521,7 +521,7 @@ bool Truncate_statement::execute(THD *thd)
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (WSREP(thd) && wsrep_to_isolation_begin(thd, if (WSREP(thd) && wsrep_to_isolation_begin(thd,
first_table->db, first_table->db,
first_table->table_name)) first_table->table_name, NULL))
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
if (! (res= truncate_table(thd, first_table))) if (! (res= truncate_table(thd, first_table)))
......
...@@ -3779,6 +3779,9 @@ static Sys_var_mybool Sys_wsrep_recover_datadir( ...@@ -3779,6 +3779,9 @@ static Sys_var_mybool Sys_wsrep_recover_datadir(
READ_ONLY GLOBAL_VAR(wsrep_recovery), READ_ONLY GLOBAL_VAR(wsrep_recovery),
CMD_LINE(OPT_ARG, OPT_WSREP_RECOVER), DEFAULT(FALSE)); CMD_LINE(OPT_ARG, OPT_WSREP_RECOVER), DEFAULT(FALSE));
static Sys_var_mybool Sys_wsrep_replicate_myisam(
"wsrep_replicate_myisam", "To enable myisam replication",
GLOBAL_VAR(wsrep_replicate_myisam), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
......
...@@ -158,7 +158,7 @@ bool trans_begin(THD *thd, uint flags) ...@@ -158,7 +158,7 @@ bool trans_begin(THD *thd, uint flags)
#ifdef WITH_WSREP #ifdef WITH_WSREP
thd->wsrep_PA_safe= true; thd->wsrep_PA_safe= true;
if (thd->wsrep_client_thread && wsrep_causal_wait(thd)) if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd))
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include <mysqld.h> #include <mysqld.h>
#include "sql_base.h" #include "sql_base.h"
#include "rpl_filter.h"
#include <sql_class.h> #include <sql_class.h>
#include "wsrep_mysqld.h" #include "wsrep_mysqld.h"
#include "wsrep_priv.h" #include "wsrep_priv.h"
...@@ -174,6 +175,7 @@ int wsrep_commit(handlerton *hton, THD *thd, bool all) ...@@ -174,6 +175,7 @@ int wsrep_commit(handlerton *hton, THD *thd, bool all)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
extern Rpl_filter* binlog_filter;
extern my_bool opt_log_slave_updates; extern my_bool opt_log_slave_updates;
enum wsrep_trx_status enum wsrep_trx_status
wsrep_run_wsrep_commit( wsrep_run_wsrep_commit(
...@@ -238,7 +240,8 @@ wsrep_run_wsrep_commit( ...@@ -238,7 +240,8 @@ wsrep_run_wsrep_commit(
while (wsrep_replaying > 0 && while (wsrep_replaying > 0 &&
thd->wsrep_conflict_state == NO_CONFLICT && thd->wsrep_conflict_state == NO_CONFLICT &&
thd->killed == NOT_KILLED && thd->killed == NOT_KILLED &&
!shutdown_in_progress) { !shutdown_in_progress)
{
mysql_mutex_unlock(&LOCK_wsrep_replaying); mysql_mutex_unlock(&LOCK_wsrep_replaying);
mysql_mutex_unlock(&thd->LOCK_wsrep_thd); mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
...@@ -278,7 +281,8 @@ wsrep_run_wsrep_commit( ...@@ -278,7 +281,8 @@ wsrep_run_wsrep_commit(
WSREP_DEBUG("innobase_commit abort after replaying wait %s", WSREP_DEBUG("innobase_commit abort after replaying wait %s",
(thd->query()) ? thd->query() : "void"); (thd->query()) ? thd->query() : "void");
DBUG_RETURN(WSREP_TRX_ROLLBACK); DBUG_RETURN(WSREP_TRX_ROLLBACK);
} thd->wsrep_query_state = QUERY_COMMITTING; }
thd->wsrep_query_state = QUERY_COMMITTING;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd); mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
cache = get_trans_log(thd); cache = get_trans_log(thd);
...@@ -296,8 +300,18 @@ wsrep_run_wsrep_commit( ...@@ -296,8 +300,18 @@ wsrep_run_wsrep_commit(
{ {
mysql_mutex_lock(&thd->LOCK_wsrep_thd); mysql_mutex_lock(&thd->LOCK_wsrep_thd);
thd->wsrep_exec_mode = LOCAL_COMMIT; thd->wsrep_exec_mode = LOCAL_COMMIT;
WSREP_DEBUG("empty rbr buffer, query: %s", thd->query());
mysql_mutex_unlock(&thd->LOCK_wsrep_thd); mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
if (thd->stmt_da->is_ok() &&
thd->stmt_da->affected_rows() > 0 &&
!binlog_filter->is_on())
{
WSREP_WARN("empty rbr buffer, query: %s, affected rows: %llu",
thd->query(), thd->stmt_da->affected_rows());
}
else
{
WSREP_DEBUG("empty rbr buffer, query: %s", thd->query());
}
DBUG_RETURN(WSREP_TRX_OK); DBUG_RETURN(WSREP_TRX_OK);
} }
if (!rcode) { if (!rcode) {
......
...@@ -48,6 +48,7 @@ my_bool wsrep_certify_nonPK = 1; // certify, even when no primary key ...@@ -48,6 +48,7 @@ my_bool wsrep_certify_nonPK = 1; // certify, even when no primary key
long wsrep_max_protocol_version = 1; // maximum protocol version to use long wsrep_max_protocol_version = 1; // maximum protocol version to use
ulong wsrep_forced_binlog_format = BINLOG_FORMAT_UNSPEC; ulong wsrep_forced_binlog_format = BINLOG_FORMAT_UNSPEC;
my_bool wsrep_recovery = 0; // recovery my_bool wsrep_recovery = 0; // recovery
my_bool wsrep_replicate_myisam = 0; // enable myisam replication
/* /*
* End configuration options * End configuration options
...@@ -456,9 +457,25 @@ int wsrep_init() ...@@ -456,9 +457,25 @@ int wsrep_init()
} }
} }
char node_addr[256] = {0, };
if (!wsrep_node_address || !strcmp(wsrep_node_address, ""))
{
size_t node_addr_max= sizeof(node_addr);
size_t ret= default_ip(node_addr, node_addr_max);
if (!(ret > 0 && ret < node_addr_max))
{
WSREP_WARN("Failed to autoguess base node address");
node_addr[0]= 0;
}
}
else if (wsrep_node_address)
{
strncpy(node_addr, wsrep_node_address, sizeof(node_addr) - 1);
}
wsrep_args.data_dir = wsrep_data_home_dir; wsrep_args.data_dir = wsrep_data_home_dir;
wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : ""; wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : "";
wsrep_args.node_address = (wsrep_node_address) ? wsrep_node_address : ""; wsrep_args.node_address = node_addr;
wsrep_args.node_incoming = wsrep_node_incoming_address; wsrep_args.node_incoming = wsrep_node_incoming_address;
wsrep_args.options = (wsrep_provider_options) ? wsrep_args.options = (wsrep_provider_options) ?
wsrep_provider_options : ""; wsrep_provider_options : "";
...@@ -629,7 +646,8 @@ bool ...@@ -629,7 +646,8 @@ bool
wsrep_causal_wait (THD* thd) wsrep_causal_wait (THD* thd)
{ {
if (thd->variables.wsrep_causal_reads && thd->variables.wsrep_on && if (thd->variables.wsrep_causal_reads && thd->variables.wsrep_on &&
!thd->in_active_multi_stmt_transaction()) !thd->in_active_multi_stmt_transaction() &&
thd->wsrep_conflict_state != REPLAYING)
{ {
// This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0 // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
// TODO: modify to check if thd has locked any rows. // TODO: modify to check if thd has locked any rows.
...@@ -667,10 +685,42 @@ wsrep_causal_wait (THD* thd) ...@@ -667,10 +685,42 @@ wsrep_causal_wait (THD* thd)
return false; return false;
} }
bool wsrep_prepare_key_for_isolation(const char* db, /*
const char* table, * Helpers to deal with TOI key arrays
wsrep_key_part_t* key, */
size_t* key_len) typedef struct wsrep_key_arr
{
wsrep_key_t* keys;
size_t keys_len;
} wsrep_key_arr_t;
static void wsrep_keys_free(wsrep_key_arr_t* key_arr)
{
for (size_t i= 0; i < key_arr->keys_len; ++i)
{
my_free((wsrep_key_part_t*)key_arr->keys[i].key_parts);
}
my_free(key_arr->keys);
key_arr->keys= 0;
key_arr->keys_len= 0;
}
/*!
* @param db Database string
* @param table Table string
* @param key Array of wsrep_key_t
* @param key_len In: number of elements in key array, Out: number of
* elements populated
*
* @return true if preparation was successful, otherwise false.
*/
static bool wsrep_prepare_key_for_isolation(const char* db,
const char* table,
wsrep_key_part_t* key,
size_t* key_len)
{ {
if (*key_len < 2) return false; if (*key_len < 2) return false;
...@@ -707,6 +757,89 @@ bool wsrep_prepare_key_for_isolation(const char* db, ...@@ -707,6 +757,89 @@ bool wsrep_prepare_key_for_isolation(const char* db,
return true; return true;
} }
/* Prepare key list from db/table and table_list */
static bool wsrep_prepare_keys_for_isolation(THD* thd,
const char* db,
const char* table,
const TABLE_LIST* table_list,
wsrep_key_arr_t* ka)
{
ka->keys= 0;
ka->keys_len= 0;
extern TABLE* find_temporary_table(THD*, const TABLE_LIST*);
if (db || table)
{
TABLE_LIST tmp_table;
bzero((char*) &tmp_table,sizeof(tmp_table));
tmp_table.table_name= (char*)db;
tmp_table.db= (char*)table;
if (!table || !find_temporary_table(thd, &tmp_table))
{
if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0))))
{
sql_print_error("Can't allocate memory for key_array");
goto err;
}
ka->keys_len= 1;
if (!(ka->keys[0].key_parts= (wsrep_key_part_t*)
my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0))))
{
sql_print_error("Can't allocate memory for key_parts");
goto err;
}
ka->keys[0].key_parts_len= 2;
if (!wsrep_prepare_key_for_isolation(
db, table,
(wsrep_key_part_t*)ka->keys[0].key_parts,
&ka->keys[0].key_parts_len))
{
sql_print_error("Preparing keys for isolation failed");
goto err;
}
}
}
for (const TABLE_LIST* table= table_list; table; table= table->next_global)
{
if (!find_temporary_table(thd, table))
{
wsrep_key_t* tmp;
tmp= (wsrep_key_t*)my_realloc(
ka->keys, (ka->keys_len + 1) * sizeof(wsrep_key_t), MYF(0));
if (!tmp)
{
sql_print_error("Can't allocate memory for key_array");
goto err;
}
ka->keys= tmp;
if (!(ka->keys[ka->keys_len].key_parts= (wsrep_key_part_t*)
my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0))))
{
sql_print_error("Can't allocate memory for key_parts");
goto err;
}
ka->keys[ka->keys_len].key_parts_len= 2;
++ka->keys_len;
if (!wsrep_prepare_key_for_isolation(
table->db, table->table_name,
(wsrep_key_part_t*)ka->keys[ka->keys_len - 1].key_parts,
&ka->keys[ka->keys_len - 1].key_parts_len))
{
sql_print_error("Preparing keys for isolation failed");
goto err;
}
}
}
return true;
err:
wsrep_keys_free(ka);
return false;
}
bool wsrep_prepare_key_for_innodb(const uchar* cache_key, bool wsrep_prepare_key_for_innodb(const uchar* cache_key,
size_t cache_key_len, size_t cache_key_len,
const uchar* row_id, const uchar* row_id,
...@@ -842,15 +975,14 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len) ...@@ -842,15 +975,14 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len)
return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len); return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len);
} }
static int wsrep_TOI_begin(THD *thd, char *db_, char *table_) static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
const TABLE_LIST* table_list)
{ {
wsrep_status_t ret(WSREP_WARNING); wsrep_status_t ret(WSREP_WARNING);
uchar* buf(0); uchar* buf(0);
uint buf_len(0); uint buf_len(0);
int buf_err; int buf_err;
wsrep_key_part_t wkey_part[2];
wsrep_key_t wkey = {wkey_part, 2};
WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
thd->wsrep_exec_mode, thd->query() ); thd->wsrep_exec_mode, thd->query() );
switch (thd->lex->sql_command) switch (thd->lex->sql_command)
...@@ -874,17 +1006,18 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_) ...@@ -874,17 +1006,18 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_)
break; break;
} }
wsrep_key_arr_t key_arr= {0, 0};
if (!buf_err && if (!buf_err &&
wsrep_prepare_key_for_isolation(db_, table_, wkey_part, wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr)&&
&wkey.key_parts_len) &&
WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id, WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id,
&wkey, 1, key_arr.keys, key_arr.keys_len,
buf, buf_len, buf, buf_len,
&thd->wsrep_trx_seqno))) &thd->wsrep_trx_seqno)))
{ {
thd->wsrep_exec_mode= TOTAL_ORDER; thd->wsrep_exec_mode= TOTAL_ORDER;
wsrep_to_isolation++; wsrep_to_isolation++;
if (buf) my_free(buf); if (buf) my_free(buf);
wsrep_keys_free(&key_arr);
WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)thd->wsrep_trx_seqno, WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)thd->wsrep_trx_seqno,
thd->wsrep_exec_mode); thd->wsrep_exec_mode);
} }
...@@ -896,6 +1029,7 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_) ...@@ -896,6 +1029,7 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_)
my_error(ER_LOCK_DEADLOCK, MYF(0), "WSREP replication failed. Check " my_error(ER_LOCK_DEADLOCK, MYF(0), "WSREP replication failed. Check "
"your wsrep connection state and retry the query."); "your wsrep connection state and retry the query.");
if (buf) my_free(buf); if (buf) my_free(buf);
wsrep_keys_free(&key_arr);
return -1; return -1;
} }
return 0; return 0;
...@@ -959,13 +1093,15 @@ static void wsrep_RSU_end(THD *thd) ...@@ -959,13 +1093,15 @@ static void wsrep_RSU_end(THD *thd)
return; return;
} }
int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_) int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
const TABLE_LIST* table_list)
{ {
int ret= 0; int ret= 0;
if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE) if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE)
{ {
switch (wsrep_OSU_method_options) { switch (wsrep_OSU_method_options) {
case WSREP_OSU_TOI: ret = wsrep_TOI_begin(thd, db_, table_); break; case WSREP_OSU_TOI: ret = wsrep_TOI_begin(thd, db_, table_,
table_list); break;
case WSREP_OSU_RSU: ret = wsrep_RSU_begin(thd, db_, table_); break; case WSREP_OSU_RSU: ret = wsrep_RSU_begin(thd, db_, table_); break;
} }
if (!ret) if (!ret)
...@@ -1046,7 +1182,7 @@ wsrep_grant_mdl_exception(MDL_context *requestor_ctx, ...@@ -1046,7 +1182,7 @@ wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
} }
else else
{ {
WSREP_MDL_LOG(INFO, "MDL conflict -> BF abort", request_thd, granted_thd); WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", request_thd, granted_thd);
mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1); wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1);
return FALSE; return FALSE;
......
...@@ -60,6 +60,7 @@ extern long wsrep_protocol_version; ...@@ -60,6 +60,7 @@ extern long wsrep_protocol_version;
extern ulong wsrep_forced_binlog_format; extern ulong wsrep_forced_binlog_format;
extern ulong wsrep_OSU_method_options; extern ulong wsrep_OSU_method_options;
extern my_bool wsrep_recovery; extern my_bool wsrep_recovery;
extern my_bool wsrep_replicate_myisam;
enum enum_wsrep_OSU_method { WSREP_OSU_TOI, WSREP_OSU_RSU }; enum enum_wsrep_OSU_method { WSREP_OSU_TOI, WSREP_OSU_RSU };
...@@ -162,6 +163,9 @@ extern wsrep_seqno_t wsrep_locked_seqno; ...@@ -162,6 +163,9 @@ extern wsrep_seqno_t wsrep_locked_seqno;
#define WSREP(thd) \ #define WSREP(thd) \
(WSREP_ON && (thd && thd->variables.wsrep_on)) (WSREP_ON && (thd && thd->variables.wsrep_on))
#define WSREP_CLIENT(thd) \
(WSREP(thd) && thd->wsrep_client_thread)
#define WSREP_EMULATE_BINLOG(thd) \ #define WSREP_EMULATE_BINLOG(thd) \
(WSREP(thd) && wsrep_emulate_bin_log) (WSREP(thd) && wsrep_emulate_bin_log)
...@@ -206,20 +210,6 @@ class Ha_trx_info; ...@@ -206,20 +210,6 @@ class Ha_trx_info;
struct THD_TRANS; struct THD_TRANS;
void wsrep_register_hton(THD* thd, bool all); void wsrep_register_hton(THD* thd, bool all);
/*!
* @param db Database string
* @param table Table string
* @param key Array of wsrep_key_t
* @param key_len In: number of elements in key array, Out: number of
* elements populated
*
* @return true if preparation was successful, otherwise false.
*/
bool wsrep_prepare_key_for_isolation(const char* db,
const char* table,
wsrep_key_part_t* key,
size_t *key_len);
void wsrep_replication_process(THD *thd); void wsrep_replication_process(THD *thd);
void wsrep_rollback_process(THD *thd); void wsrep_rollback_process(THD *thd);
void wsrep_brute_force_killer(THD *thd); void wsrep_brute_force_killer(THD *thd);
...@@ -274,7 +264,9 @@ extern PSI_cond_key key_COND_wsrep_rollback; ...@@ -274,7 +264,9 @@ extern PSI_cond_key key_COND_wsrep_rollback;
extern PSI_mutex_key key_LOCK_wsrep_replaying; extern PSI_mutex_key key_LOCK_wsrep_replaying;
extern PSI_cond_key key_COND_wsrep_replaying; extern PSI_cond_key key_COND_wsrep_replaying;
int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_); struct TABLE_LIST;
int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
const TABLE_LIST* table_list);
void wsrep_to_isolation_end(THD *thd); void wsrep_to_isolation_end(THD *thd);
void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow*); void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow*);
......
...@@ -332,7 +332,7 @@ size_t default_ip (char* buf, size_t buf_len) ...@@ -332,7 +332,7 @@ size_t default_ip (char* buf, size_t buf_len)
"awk '{ print $2 }' | awk -F : '{ print $2 }'"; "awk '{ print $2 }' | awk -F : '{ print $2 }'";
#elif defined(__sun__) #elif defined(__sun__)
const char cmd[] = "/sbin/ifconfig -a | " const char cmd[] = "/sbin/ifconfig -a | "
"grep -m1 -1 -E 'net[0-9]:' | tail -n 1 | awk '{ print $2 }'"; "/usr/gnu/bin/grep -m1 -1 -E 'net[0-9]:' | tail -n 1 | awk '{ print $2 }'";
#else #else
char *cmd; char *cmd;
#error "OS not supported" #error "OS not supported"
......
...@@ -6945,11 +6945,6 @@ wsrep_append_foreign_key( ...@@ -6945,11 +6945,6 @@ wsrep_append_foreign_key(
byte key[WSREP_MAX_SUPPORTED_KEY_LENGTH+1]; byte key[WSREP_MAX_SUPPORTED_KEY_LENGTH+1];
ulint len = WSREP_MAX_SUPPORTED_KEY_LENGTH; ulint len = WSREP_MAX_SUPPORTED_KEY_LENGTH;
if (!dict_index_is_clust(clust_index)) {
WSREP_ERROR("clustered index not passed for FK append");
return DB_ERROR;
}
key[0] = '\0'; key[0] = '\0';
rcode = wsrep_rec_get_primary_key( rcode = wsrep_rec_get_primary_key(
&key[1], &len, clust_rec, clust_index); &key[1], &len, clust_rec, clust_index);
......
...@@ -1790,25 +1790,23 @@ wsrep_rec_get_primary_key( ...@@ -1790,25 +1790,23 @@ wsrep_rec_get_primary_key(
uint key_parts; uint key_parts;
mem_heap_t* heap = NULL; mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE]; ulint offsets_[REC_OFFS_NORMAL_SIZE];
const ulint* offsets;
ut_ad(index); ut_ad(index);
key_parts = dict_index_get_n_unique_in_tree(index);
*offsets_ = (sizeof offsets_) / sizeof *offsets_;
rec_get_offsets(rec, index, offsets_, ULINT_UNDEFINED, &heap); rec_offs_init(offsets_);
if (UNIV_LIKELY_NULL(heap)) { offsets = rec_get_offsets(rec, index, offsets_, ULINT_UNDEFINED, &heap);
mem_heap_free(heap);
}
ut_ad(rec_offs_validate(rec, NULL, offsets_)); ut_ad(rec_offs_validate(rec, NULL, offsets));
ut_ad(rec); ut_ad(rec);
key_parts = dict_index_get_n_unique_in_tree(index);
for (i = 0; i < key_parts; i++) { for (i = 0; i < key_parts; i++) {
dict_field_t* field = dict_index_get_nth_field(index, i); dict_field_t* field = dict_index_get_nth_field(index, i);
const dict_col_t* col = dict_field_get_col(field); const dict_col_t* col = dict_field_get_col(field);
data = rec_get_nth_field(rec, offsets_, i, &len); data = rec_get_nth_field(rec, offsets, i, &len);
if (key_len + len > ((col->prtype & DATA_NOT_NULL) ? if (key_len + len > ((col->prtype & DATA_NOT_NULL) ?
*buf_len : *buf_len - 1)) { *buf_len : *buf_len - 1)) {
fprintf (stderr, fprintf (stderr,
...@@ -1836,11 +1834,19 @@ wsrep_rec_get_primary_key( ...@@ -1836,11 +1834,19 @@ wsrep_rec_get_primary_key(
} }
} }
rec_validate(rec, offsets_); rec_validate(rec, offsets);
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
*buf_len = key_len; *buf_len = key_len;
return DB_SUCCESS; return DB_SUCCESS;
err_out: err_out:
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
return DB_ERROR; return DB_ERROR;
} }
#endif // WITH_WSREP #endif // WITH_WSREP
...@@ -7589,11 +7589,6 @@ wsrep_append_foreign_key( ...@@ -7589,11 +7589,6 @@ wsrep_append_foreign_key(
byte key[WSREP_MAX_SUPPORTED_KEY_LENGTH+1]; byte key[WSREP_MAX_SUPPORTED_KEY_LENGTH+1];
ulint len = WSREP_MAX_SUPPORTED_KEY_LENGTH; ulint len = WSREP_MAX_SUPPORTED_KEY_LENGTH;
if (!dict_index_is_clust(clust_index)) {
WSREP_ERROR("clustered index not passed for FK append");
return DB_ERROR;
}
key[0] = '\0'; key[0] = '\0';
rcode = wsrep_rec_get_primary_key( rcode = wsrep_rec_get_primary_key(
&key[1], &len, clust_rec, clust_index); &key[1], &len, clust_rec, clust_index);
......
...@@ -1790,25 +1790,23 @@ wsrep_rec_get_primary_key( ...@@ -1790,25 +1790,23 @@ wsrep_rec_get_primary_key(
uint key_parts; uint key_parts;
mem_heap_t* heap = NULL; mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE]; ulint offsets_[REC_OFFS_NORMAL_SIZE];
const ulint* offsets;
ut_ad(index); ut_ad(index);
key_parts = dict_index_get_n_unique_in_tree(index);
*offsets_ = (sizeof offsets_) / sizeof *offsets_;
rec_get_offsets(rec, index, offsets_, ULINT_UNDEFINED, &heap); rec_offs_init(offsets_);
if (UNIV_LIKELY_NULL(heap)) { offsets = rec_get_offsets(rec, index, offsets_, ULINT_UNDEFINED, &heap);
mem_heap_free(heap);
}
ut_ad(rec_offs_validate(rec, NULL, offsets_)); ut_ad(rec_offs_validate(rec, NULL, offsets));
ut_ad(rec); ut_ad(rec);
key_parts = dict_index_get_n_unique_in_tree(index);
for (i = 0; i < key_parts; i++) { for (i = 0; i < key_parts; i++) {
dict_field_t* field = dict_index_get_nth_field(index, i); dict_field_t* field = dict_index_get_nth_field(index, i);
const dict_col_t* col = dict_field_get_col(field); const dict_col_t* col = dict_field_get_col(field);
data = rec_get_nth_field(rec, offsets_, i, &len); data = rec_get_nth_field(rec, offsets, i, &len);
if (key_len + len > ((col->prtype & DATA_NOT_NULL) ? if (key_len + len > ((col->prtype & DATA_NOT_NULL) ?
*buf_len : *buf_len - 1)) { *buf_len : *buf_len - 1)) {
fprintf (stderr, fprintf (stderr,
...@@ -1836,11 +1834,19 @@ wsrep_rec_get_primary_key( ...@@ -1836,11 +1834,19 @@ wsrep_rec_get_primary_key(
} }
} }
rec_validate(rec, offsets_); rec_validate(rec, offsets);
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
*buf_len = key_len; *buf_len = key_len;
return DB_SUCCESS; return DB_SUCCESS;
err_out: err_out:
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
return DB_ERROR; return DB_ERROR;
} }
#endif // WITH_WSREP #endif // WITH_WSREP
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment