Commit cd9e69ec authored by Monty's avatar Monty

Cleaned up code for setting slave_parallel_mode

Now this works the same way as all other multi source variables.
parent 0ee879ff
...@@ -4,7 +4,11 @@ SELECT @@slave_parallel_mode; ...@@ -4,7 +4,11 @@ SELECT @@slave_parallel_mode;
@@slave_parallel_mode @@slave_parallel_mode
domain domain
SELECT @@m1.slave_parallel_mode; SELECT @@m1.slave_parallel_mode;
ERROR HY000: There is no master connection 'm1' @@m1.slave_parallel_mode
NULL
Warnings:
Warning 1617 There is no master connection 'm1'
Warning 1617 There is no master connection 'm1'
CHANGE MASTER TO master_host='127.0.0.1', master_port=3310, master_user='root'; CHANGE MASTER TO master_host='127.0.0.1', master_port=3310, master_user='root';
SELECT @@``.slave_parallel_mode; SELECT @@``.slave_parallel_mode;
@@``.slave_parallel_mode @@``.slave_parallel_mode
...@@ -14,9 +18,14 @@ SELECT @@slave_parallel_mode; ...@@ -14,9 +18,14 @@ SELECT @@slave_parallel_mode;
domain domain
Parallel_Mode = 'domain' Parallel_Mode = 'domain'
SELECT @@m2.slave_parallel_mode; SELECT @@m2.slave_parallel_mode;
ERROR HY000: There is no master connection 'm2' @@m2.slave_parallel_mode
NULL
Warnings:
Warning 1617 There is no master connection 'm2'
Warning 1617 There is no master connection 'm2'
SET GLOBAL m2.slave_parallel_mode = ''; SET GLOBAL m2.slave_parallel_mode = '';
ERROR HY000: There is no master connection 'm2' Warnings:
Warning 1617 There is no master connection 'm2'
CHANGE MASTER 'm1' TO master_host='127.0.0.1', master_port=3311, master_user='root'; CHANGE MASTER 'm1' TO master_host='127.0.0.1', master_port=3311, master_user='root';
SELECT @@m1.slave_parallel_mode; SELECT @@m1.slave_parallel_mode;
@@m1.slave_parallel_mode @@m1.slave_parallel_mode
......
...@@ -6,7 +6,6 @@ ...@@ -6,7 +6,6 @@
SET SESSION slave_parallel_mode= ''; SET SESSION slave_parallel_mode= '';
SELECT @@slave_parallel_mode; SELECT @@slave_parallel_mode;
--error WARN_NO_MASTER_INFO
SELECT @@m1.slave_parallel_mode; SELECT @@m1.slave_parallel_mode;
CHANGE MASTER TO master_host='127.0.0.1', master_port=3310, master_user='root'; CHANGE MASTER TO master_host='127.0.0.1', master_port=3310, master_user='root';
...@@ -15,9 +14,7 @@ SELECT @@slave_parallel_mode; ...@@ -15,9 +14,7 @@ SELECT @@slave_parallel_mode;
--let $status_items= Parallel_Mode --let $status_items= Parallel_Mode
--source include/show_slave_status.inc --source include/show_slave_status.inc
--error WARN_NO_MASTER_INFO
SELECT @@m2.slave_parallel_mode; SELECT @@m2.slave_parallel_mode;
--error WARN_NO_MASTER_INFO
SET GLOBAL m2.slave_parallel_mode = ''; SET GLOBAL m2.slave_parallel_mode = '';
CHANGE MASTER 'm1' TO master_host='127.0.0.1', master_port=3311, master_user='root'; CHANGE MASTER 'm1' TO master_host='127.0.0.1', master_port=3311, master_user='root';
...@@ -33,6 +30,5 @@ SELECT @@slave_parallel_mode; ...@@ -33,6 +30,5 @@ SELECT @@slave_parallel_mode;
SET default_master_connection= ''; SET default_master_connection= '';
SELECT @@slave_parallel_mode; SELECT @@slave_parallel_mode;
RESET SLAVE ALL; RESET SLAVE ALL;
RESET SLAVE 'm1' ALL; RESET SLAVE 'm1' ALL;
...@@ -2120,7 +2120,6 @@ void clean_up(bool print_message) ...@@ -2120,7 +2120,6 @@ void clean_up(bool print_message)
free_all_rpl_filters(); free_all_rpl_filters();
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
end_slave_list(); end_slave_list();
mi_cmdline_destroy();
#endif #endif
my_uuid_end(); my_uuid_end();
delete binlog_filter; delete binlog_filter;
...@@ -8849,6 +8848,12 @@ mysqld_get_one_option(int optid, const struct my_option *opt, char *argument) ...@@ -8849,6 +8848,12 @@ mysqld_get_one_option(int optid, const struct my_option *opt, char *argument)
cur_rpl_filter->add_db_rewrite(key, val); cur_rpl_filter->add_db_rewrite(key, val);
break; break;
} }
case (int)OPT_SLAVE_PARALLEL_MODE:
{
/* Store latest mode for Master::Info */
cur_rpl_filter->set_parallel_mode(opt_slave_parallel_mode);
break;
}
case (int)OPT_BINLOG_IGNORE_DB: case (int)OPT_BINLOG_IGNORE_DB:
{ {
...@@ -9094,6 +9099,7 @@ mysql_getopt_value(const char *name, uint length, ...@@ -9094,6 +9099,7 @@ mysql_getopt_value(const char *name, uint length,
return (uchar**) &key_cache->changed_blocks_hash_size; return (uchar**) &key_cache->changed_blocks_hash_size;
} }
} }
#ifdef HAVE_REPLICATION
case OPT_REPLICATE_DO_DB: case OPT_REPLICATE_DO_DB:
case OPT_REPLICATE_DO_TABLE: case OPT_REPLICATE_DO_TABLE:
case OPT_REPLICATE_IGNORE_DB: case OPT_REPLICATE_IGNORE_DB:
...@@ -9101,6 +9107,7 @@ mysql_getopt_value(const char *name, uint length, ...@@ -9101,6 +9107,7 @@ mysql_getopt_value(const char *name, uint length,
case OPT_REPLICATE_WILD_DO_TABLE: case OPT_REPLICATE_WILD_DO_TABLE:
case OPT_REPLICATE_WILD_IGNORE_TABLE: case OPT_REPLICATE_WILD_IGNORE_TABLE:
case OPT_REPLICATE_REWRITE_DB: case OPT_REPLICATE_REWRITE_DB:
case OPT_SLAVE_PARALLEL_MODE:
{ {
/* Store current filter for mysqld_get_one_option() */ /* Store current filter for mysqld_get_one_option() */
if (!(cur_rpl_filter= get_or_create_rpl_filter(name, length))) if (!(cur_rpl_filter= get_or_create_rpl_filter(name, length)))
...@@ -9108,24 +9115,15 @@ mysql_getopt_value(const char *name, uint length, ...@@ -9108,24 +9115,15 @@ mysql_getopt_value(const char *name, uint length,
if (error) if (error)
*error= EXIT_OUT_OF_MEMORY; *error= EXIT_OUT_OF_MEMORY;
} }
return 0; if (option->id == OPT_SLAVE_PARALLEL_MODE)
}
#ifdef HAVE_REPLICATION
case OPT_SLAVE_PARALLEL_MODE:
{
ulonglong *ptr;
LEX_STRING connection_name;
if (!length)
return &opt_slave_parallel_mode;
connection_name.str= const_cast<char *>(name);
connection_name.length= length;
if (mi_slave_parallel_mode_ptr(&connection_name, &ptr, true))
{ {
if (error) /*
*error= EXIT_OUT_OF_MEMORY; Ensure parallel_mode variable is shown in --help. The other
return NULL; variables are not easily printable here.
*/
return (char**) &opt_slave_parallel_mode;
} }
return ptr; return 0;
} }
#endif #endif
} }
......
...@@ -60,6 +60,16 @@ typedef Bitmap<((MAX_INDEXES+7)/8*8)> key_map; /* Used for finding keys */ ...@@ -60,6 +60,16 @@ typedef Bitmap<((MAX_INDEXES+7)/8*8)> key_map; /* Used for finding keys */
#define OPT_SESSION SHOW_OPT_SESSION #define OPT_SESSION SHOW_OPT_SESSION
#define OPT_GLOBAL SHOW_OPT_GLOBAL #define OPT_GLOBAL SHOW_OPT_GLOBAL
/*
Bit masks for the values in --slave-parallel-mode.
Note that these values cannot be changed - they are stored in master.info,
so need to be possible to read back in a different version of the server.
*/
#define SLAVE_PARALLEL_DOMAIN (1ULL << 0)
#define SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT (1ULL << 1)
#define SLAVE_PARALLEL_TRX (1ULL << 2)
#define SLAVE_PARALLEL_WAITING (1ULL << 3)
/* Function prototypes */ /* Function prototypes */
void kill_mysql(void); void kill_mysql(void);
void close_connection(THD *thd, uint sql_errno= 0); void close_connection(THD *thd, uint sql_errno= 0);
......
...@@ -24,7 +24,9 @@ ...@@ -24,7 +24,9 @@
#define TABLE_RULE_ARR_SIZE 16 #define TABLE_RULE_ARR_SIZE 16
Rpl_filter::Rpl_filter() : Rpl_filter::Rpl_filter() :
table_rules_on(0), do_table_inited(0), ignore_table_inited(0), parallel_mode(SLAVE_PARALLEL_DOMAIN | SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT),
table_rules_on(0),
do_table_inited(0), ignore_table_inited(0),
wild_do_table_inited(0), wild_ignore_table_inited(0) wild_do_table_inited(0), wild_ignore_table_inited(0)
{ {
do_db.empty(); do_db.empty();
......
...@@ -76,6 +76,16 @@ public: ...@@ -76,6 +76,16 @@ public:
int set_do_db(const char* db_spec); int set_do_db(const char* db_spec);
int set_ignore_db(const char* db_spec); int set_ignore_db(const char* db_spec);
void set_parallel_mode(ulonglong mode)
{
parallel_mode= mode;
}
/* Return given parallel mode or if one is not given, the default mode */
int get_parallel_mode()
{
return parallel_mode;
}
void add_db_rewrite(const char* from_db, const char* to_db); void add_db_rewrite(const char* from_db, const char* to_db);
/* Getters - to get information about current rules */ /* Getters - to get information about current rules */
...@@ -97,7 +107,6 @@ public: ...@@ -97,7 +107,6 @@ public:
void get_ignore_db(String* str); void get_ignore_db(String* str);
private: private:
bool table_rules_on;
void init_table_rule_hash(HASH* h, bool* h_inited); void init_table_rule_hash(HASH* h, bool* h_inited);
void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited); void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited);
...@@ -128,7 +137,9 @@ private: ...@@ -128,7 +137,9 @@ private:
HASH ignore_table; HASH ignore_table;
DYNAMIC_ARRAY wild_do_table; DYNAMIC_ARRAY wild_do_table;
DYNAMIC_ARRAY wild_ignore_table; DYNAMIC_ARRAY wild_ignore_table;
ulonglong parallel_mode;
bool table_rules_on;
bool do_table_inited; bool do_table_inited;
bool ignore_table_inited; bool ignore_table_inited;
bool wild_do_table_inited; bool wild_do_table_inited;
......
...@@ -64,15 +64,16 @@ Master_info::Master_info(LEX_STRING *connection_name_arg, ...@@ -64,15 +64,16 @@ Master_info::Master_info(LEX_STRING *connection_name_arg,
connection_name.length+1); connection_name.length+1);
my_casedn_str(system_charset_info, cmp_connection_name.str); my_casedn_str(system_charset_info, cmp_connection_name.str);
} }
/* When MySQL restarted, all Rpl_filter settings which aren't in the my.cnf /*
* will lose. So if you want a setting will not lose after restarting, you When MySQL restarted, all Rpl_filter settings which aren't in the my.cnf
* should add them into my.cnf will be lost. If you want to lose a setting after restart, you
* */ should add them into my.cnf
*/
rpl_filter= get_or_create_rpl_filter(connection_name.str, rpl_filter= get_or_create_rpl_filter(connection_name.str,
connection_name.length); connection_name.length);
copy_filter_setting(rpl_filter, global_rpl_filter); copy_filter_setting(rpl_filter, global_rpl_filter);
mi_slave_parallel_mode_lookup(&connection_name, &parallel_mode); parallel_mode= rpl_filter->get_parallel_mode();
my_init_dynamic_array(&ignore_server_ids, my_init_dynamic_array(&ignore_server_ids,
sizeof(global_system_variables.server_id), 16, 16, sizeof(global_system_variables.server_id), 16, 16,
...@@ -178,7 +179,6 @@ void init_master_log_pos(Master_info* mi) ...@@ -178,7 +179,6 @@ void init_master_log_pos(Master_info* mi)
mi->events_queued_since_last_gtid= 0; mi->events_queued_since_last_gtid= 0;
mi->gtid_reconnect_event_skip_count= 0; mi->gtid_reconnect_event_skip_count= 0;
mi->gtid_event_seen= false; mi->gtid_event_seen= false;
mi_slave_parallel_mode_lookup(&mi->connection_name, &mi->parallel_mode);
/* Intentionally init ssl_verify_server_cert to 0, no option available */ /* Intentionally init ssl_verify_server_cert to 0, no option available */
mi->ssl_verify_server_cert= 0; mi->ssl_verify_server_cert= 0;
...@@ -1716,165 +1716,4 @@ void prot_store_ids(THD *thd, DYNAMIC_ARRAY *ids) ...@@ -1716,165 +1716,4 @@ void prot_store_ids(THD *thd, DYNAMIC_ARRAY *ids)
return; return;
} }
/*
We need to handle per-Master_info command line options special.
Because when command line options are parsed, we do not yet have any
Master_info objects.
*/
static HASH mi_cmdline_hash;
static bool mi_cmdline_hash_inited= false;
struct mi_cmdline_entry {
LEX_STRING cmp_connection_name;
ulonglong parallel_mode;
};
static uchar *
get_key_cmdline(const uchar *ptr, size_t *length_p,
my_bool unused __attribute__((unused)))
{
mi_cmdline_entry *entry= (mi_cmdline_entry *)ptr;
*length_p= entry->cmp_connection_name.length;
return (uchar *)entry->cmp_connection_name.str;
}
static void
free_key_cmdline(void *ptr)
{
mi_cmdline_entry *entry= (mi_cmdline_entry *)ptr;
my_free(entry->cmp_connection_name.str);
my_free(entry);
}
int
mi_cmdline_init()
{
if (mi_cmdline_hash_inited)
return 0;
if (my_hash_init(&mi_cmdline_hash, system_charset_info,
MAX_REPLICATION_THREAD, 0, 0, get_key_cmdline,
free_key_cmdline, HASH_UNIQUE))
{
sql_print_error("Initializing Master_info command line option hash table failed");
return 1;
}
mi_cmdline_hash_inited= true;
return 0;
}
void
mi_cmdline_destroy()
{
if (mi_cmdline_hash_inited)
my_hash_free(&mi_cmdline_hash);
mi_cmdline_hash_inited= false;
}
static mi_cmdline_entry *
mi_cmdline_entry_get(LEX_STRING *connection_name, bool create_if_missing)
{
LEX_STRING cmp_connection_name;
mi_cmdline_entry *entry;
if (!mi_cmdline_hash_inited &&
(!create_if_missing || mi_cmdline_init()))
return NULL;
/* Create a lowercase key for hash lookup. */
cmp_connection_name.length= connection_name->length;
if (!(cmp_connection_name.str= (char *)my_malloc(connection_name->length+1,
MYF(MY_WME))))
{
if (create_if_missing)
my_error(ER_OUTOFMEMORY, MYF(0), connection_name->length+1);
return NULL;
}
memcpy(cmp_connection_name.str, connection_name->str,
connection_name->length + 1);
my_casedn_str(system_charset_info, cmp_connection_name.str);
if ((entry= (mi_cmdline_entry *)
my_hash_search(&mi_cmdline_hash, (uchar *)cmp_connection_name.str,
cmp_connection_name.length)) ||
!create_if_missing)
{
my_free(cmp_connection_name.str);
return entry;
}
if (!(entry= (mi_cmdline_entry *)my_malloc(sizeof(*entry), MYF(MY_WME))))
{
my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*entry));
my_free(cmp_connection_name.str);
return NULL;
}
entry->cmp_connection_name= cmp_connection_name;
entry->parallel_mode= opt_slave_parallel_mode;
if (my_hash_insert(&mi_cmdline_hash, (uchar *)entry))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
my_free(entry);
my_free(cmp_connection_name.str);
return NULL;
}
return entry;
}
/*
Look up a command line value for slave_parallel_mode. The value is returned
in *out_mode. If no command line value was given for this particular
connection name, the default value in opt_slave_parallel_mode is used.
*/
void
mi_slave_parallel_mode_lookup(LEX_STRING *connection_name, ulonglong *out_mode)
{
mi_cmdline_entry *entry;
if (!mi_cmdline_hash_inited ||
!(entry= mi_cmdline_entry_get(connection_name, false)))
*out_mode= opt_slave_parallel_mode;
else
*out_mode= entry->parallel_mode;
}
/*
Get a pointer to the location holding the value of the slave_parallel_mode
command line option for the given connection name. The pointer is returned
in *out_mode_ptr
If create_if_missing is true, then a new entry will be created if one did
not already exists. If false, then NULL will be returned in *out_mode_ptr if
an entry does not exist.
Returns 1 on error, 0 if ok.
*/
int
mi_slave_parallel_mode_ptr(LEX_STRING *connection_name,
ulonglong **out_mode_ptr, bool create_if_missing)
{
mi_cmdline_entry *entry;
*out_mode_ptr= NULL;
if (!create_if_missing && !mi_cmdline_hash_inited)
return 0;
entry= mi_cmdline_entry_get(connection_name, create_if_missing);
if (!entry)
{
if (create_if_missing)
return 1;
else
return 0;
}
*out_mode_ptr= &entry->parallel_mode;
return 0;
}
#endif /* HAVE_REPLICATION */ #endif /* HAVE_REPLICATION */
...@@ -16,18 +16,6 @@ ...@@ -16,18 +16,6 @@
#ifndef RPL_MI_H #ifndef RPL_MI_H
#define RPL_MI_H #define RPL_MI_H
/*
Bit masks for the values in --slave-parallel-mode.
Note that these values cannot be changed - they are stored in master.info,
so need to be possible to read back in a different version of the server.
*/
#define SLAVE_PARALLEL_DOMAIN (1ULL << 0)
#define SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT (1ULL << 1)
#define SLAVE_PARALLEL_TRX (1ULL << 2)
#define SLAVE_PARALLEL_WAITING (1ULL << 3)
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
#include "rpl_rli.h" #include "rpl_rli.h"
...@@ -383,14 +371,5 @@ void create_logfile_name_with_suffix(char *res_file_name, size_t length, ...@@ -383,14 +371,5 @@ void create_logfile_name_with_suffix(char *res_file_name, size_t length,
uchar *get_key_master_info(Master_info *mi, size_t *length, uchar *get_key_master_info(Master_info *mi, size_t *length,
my_bool not_used __attribute__((unused))); my_bool not_used __attribute__((unused)));
void free_key_master_info(Master_info *mi); void free_key_master_info(Master_info *mi);
int mi_cmdline_init();
void mi_cmdline_destroy();
void mi_slave_parallel_mode_lookup(LEX_STRING *connection_name,
ulonglong *out_mode);
int mi_slave_parallel_mode_ptr(LEX_STRING *connection_name,
ulonglong **out_mode_ptr,
bool create_if_missing);
#endif /* HAVE_REPLICATION */ #endif /* HAVE_REPLICATION */
#endif /* RPL_MI_H */ #endif /* RPL_MI_H */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment