repl-tests/test-repl-ts/repl-timestamp.master.reject

    this file needs to be deleted
sql/log_event.cc
    fixed warnings
sql/log_event.h
    fixed () #define bug
sql/mysqlbinlog.cc
    fixed length argument - was supposed to be one less
sql/mysqld.cc
    replicate-do/ignore-table now works, wild does not yet
sql/mysql_priv.h
    updating argument to add_table_to_list() -- needed for replicate-do/ignore table
sql/slave.cc
    changes fore replicate-do/ignore-table
    close the socket before going to sleep sleeping after error
    bad event was being freed too early
sql/slave.h
    changes for replicate-do/ignore-table
sql/sql_class.cc
    slave_thread variable to THD
sql/sql_class.h
    added slave_thread to THD, fixed bug in end_time()
sql/sql_parse.cc
    updating argument to add_tables_to_list()
sql/sql_table.cc
    fixed bug in mysql_rm_table()
sql/sql_yacc.yy
    fixed up add_table_to_list() calls to accept updating argument
sql/table.h
    added updating to TABLE_LIST
parent d2340770
......@@ -264,7 +264,7 @@ void Log_event::print_header(FILE* file)
{
fputc('#', file);
print_timestamp(file);
fprintf(file, " server id %ld ", server_id);
fprintf(file, " server id %d ", server_id);
}
void Log_event::print_timestamp(FILE* file, time_t* ts)
......@@ -421,12 +421,11 @@ Query_log_event::Query_log_event(FILE* file, time_t when_arg,
Query_log_event::Query_log_event(const char* buf, int event_len):
Log_event(buf),data_buf(0), query(NULL), db(NULL)
{
if (event_len < QUERY_EVENT_OVERHEAD)
if ((uint)event_len < QUERY_EVENT_OVERHEAD)
return;
ulong data_len;
buf += EVENT_LEN_OFFSET;
data_len = event_len - QUERY_EVENT_OVERHEAD;
exec_time = uint4korr(buf + 8);
error_code = uint2korr(buf + 13);
......@@ -613,7 +612,7 @@ Load_log_event::Load_log_event(const char* buf, int event_len):
{
ulong data_len;
if(event_len < (LOAD_EVENT_OVERHEAD + LOG_EVENT_HEADER_LEN))
if((uint)event_len < (LOAD_EVENT_OVERHEAD + LOG_EVENT_HEADER_LEN))
return;
buf += EVENT_LEN_OFFSET;
......@@ -709,7 +708,7 @@ void Load_log_event::print(FILE* file, bool short_form)
}
if((int)skip_lines > 0)
fprintf(file, " IGNORE %ld LINES ", skip_lines);
fprintf(file, " IGNORE %d LINES ", skip_lines);
if(num_fields)
{
......
......@@ -39,7 +39,7 @@
#define EVENT_LEN_OFFSET 9
#define EVENT_TYPE_OFFSET 4
#define MAX_EVENT_LEN 4*1024*1024
#define QUERY_EVENT_OVERHEAD LOG_EVENT_HEADER_LEN+QUERY_HEADER_LEN
#define QUERY_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+QUERY_HEADER_LEN)
#define ROTATE_EVENT_OVERHEAD LOG_EVENT_HEADER_LEN
#define LOAD_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+LOAD_HEADER_LEN+sizeof(sql_ex_info))
......
......@@ -362,6 +362,7 @@ bool add_field_to_list(char *field_name, enum enum_field_types type,
void store_position_for_column(const char *name);
bool add_to_list(SQL_LIST &list,Item *group,bool asc=0);
TABLE_LIST *add_table_to_list(Table_ident *table,LEX_STRING *alias,
bool updating,
thr_lock_type flags=TL_UNLOCK,
List<String> *use_index=0,
List<String> *ignore_index=0);
......
......@@ -295,7 +295,7 @@ Unfortunately, no sweepstakes today, adjusted position to 4\n");
len, net->read_pos[5]));
Log_event * ev = Log_event::read_log_event(
(const char*) net->read_pos + 1 ,
len);
len - 1);
if(ev)
{
ev->print(stdout, short_form);
......
......@@ -2230,7 +2230,8 @@ enum options {
OPT_SKIP_CONCURRENT_INSERT, OPT_MEMLOCK, OPT_MYISAM_RECOVER,
OPT_REPLICATE_REWRITE_DB, OPT_SERVER_ID, OPT_SKIP_SLAVE_START,
OPT_SKIP_INNOBASE,OPT_SAFEMALLOC_MEM_LIMIT,
OPT_REPLICATE_DO_TABLE, OPT_REPLICATE_IGNORE_TABLE
OPT_REPLICATE_DO_TABLE, OPT_REPLICATE_IGNORE_TABLE,
OPT_REPL_WILD_DO_TABLE, OPT_REPL_WILD_IGNORE_TABLE
};
static struct option long_options[] = {
......@@ -2300,10 +2301,14 @@ static struct option long_options[] = {
{"replicate-do-db", required_argument, 0, (int) OPT_REPLICATE_DO_DB},
{"replicate-do-table", required_argument, 0,
(int) OPT_REPLICATE_DO_TABLE},
{"replicate-wild-do-table", required_argument, 0,
(int) OPT_REPL_WILD_DO_TABLE},
{"replicate-ignore-db", required_argument, 0,
(int) OPT_REPLICATE_IGNORE_DB},
{"replicate-ignore-table", required_argument, 0,
(int) OPT_REPLICATE_IGNORE_TABLE},
{"replicate-wild-ignore-table", required_argument, 0,
(int) OPT_REPL_WILD_IGNORE_TABLE},
{"replicate-rewrite-db", required_argument, 0,
(int) OPT_REPLICATE_REWRITE_DB},
{"safe-mode", no_argument, 0, (int) OPT_SAFE},
......@@ -2938,6 +2943,30 @@ static void get_options(int argc,char **argv)
binlog_do_db.push_back(db);
break;
}
case (int)OPT_REPLICATE_DO_TABLE:
{
if(!do_table_inited)
init_table_rule_hash(&replicate_do_table, &do_table_inited);
if(add_table_rule(&replicate_do_table, optarg))
{
fprintf(stderr, "could not add do table rule '%s'\n", optarg);
exit(1);
}
table_rules_on = 1;
break;
}
case (int)OPT_REPLICATE_IGNORE_TABLE:
{
if(!ignore_table_inited)
init_table_rule_hash(&replicate_ignore_table, &ignore_table_inited);
if(add_table_rule(&replicate_ignore_table, optarg))
{
fprintf(stderr, "could not add ignore table rule '%s'\n", optarg);
exit(1);
}
table_rules_on = 1;
break;
}
case (int) OPT_SQL_BIN_UPDATE_SAME:
opt_sql_bin_update = 1;
break;
......
......@@ -27,6 +27,7 @@ pthread_t slave_real_id;
MASTER_INFO glob_mi;
HASH replicate_do_table, replicate_ignore_table;
bool do_table_inited = 0, ignore_table_inited = 0;
bool table_rules_on = 0;
static inline void skip_load_data_infile(NET* net);
......@@ -60,6 +61,52 @@ void init_table_rule_hash(HASH* h, bool* h_inited)
*h_inited = 1;
}
int tables_ok(THD* thd, TABLE_LIST* tables)
{
for(; tables; tables = tables->next)
{
if(!tables->updating)
continue;
char hash_key[2*NAME_LEN+2];
char* p;
p = strmov(hash_key, tables->db ? tables->db : thd->db);
*p++ = '.';
uint len = strmov(p, tables->real_name) - hash_key ;
if(do_table_inited) // if there are any do's
{
if(hash_search(&replicate_do_table, hash_key, len))
return 1;
}
if(ignore_table_inited) // if there are any do's
{
if(hash_search(&replicate_ignore_table, hash_key, len))
return 0;
}
}
return !do_table_inited; // if no explicit rule found
// and there was a do list, do not replicate. If there was
// no do list, go ahead
}
int add_table_rule(HASH* h, const char* table_spec)
{
char* dot = strchr(table_spec, '.');
if(!dot) return 1;
uint len = (uint)strlen(table_spec);
if(!len) return 1;
TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
+ len, MYF(MY_WME));
if(!e) return 1;
e->db = (char*)e + sizeof(TABLE_RULE_ENT);
e->tbl_name = e->db + (dot - table_spec) + 1;
e->key_len = len;
memcpy(e->db, table_spec, len);
(void)hash_insert(h, (byte*)e);
return 0;
}
static inline bool slave_killed(THD* thd)
{
return abort_slave || abort_loop || thd->killed;
......@@ -438,6 +485,7 @@ static int init_slave_thread(THD* thd)
thd->max_packet_length=thd->net.max_packet;
thd->master_access= ~0;
thd->priv_user = 0;
thd->slave_thread = 1;
thd->options = (((opt_log_slave_updates) ? OPTION_BIN_LOG:0)
| OPTION_AUTO_COMMIT | OPTION_AUTO_IS_NULL) ;
thd->system_thread = 1;
......@@ -655,15 +703,17 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
// unless set explictly
close_thread_tables(thd);
free_root(&thd->mem_root,0);
delete ev;
if (thd->query_error)
{
sql_print_error("Slave: error running query '%s' ",
qev->query);
delete ev;
return 1;
}
delete ev;
if(thd->fatal_error)
{
sql_print_error("Slave: Fatal error running query '%s' ",
......@@ -897,6 +947,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
goto err;
thd->proc_info = "waiting to reconnect after a failed dump request";
vio_close(mysql->net.vio);
safe_sleep(thd, glob_mi.connect_retry);
if(slave_killed(thd))
goto err;
......@@ -922,6 +973,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
if (event_len == packet_error)
{
thd->proc_info = "waiting to reconnect after a failed read";
vio_close(mysql->net.vio);
safe_sleep(thd, glob_mi.connect_retry);
if(slave_killed(thd))
goto err;
......
......@@ -67,11 +67,16 @@ int fetch_nx_table(THD* thd, MASTER_INFO* mi);
int show_master_info(THD* thd);
int show_binlog_info(THD* thd);
int tables_ok(THD* thd, TABLE_LIST* tables);
// see if the query uses any tables that should not be replicated
int db_ok(const char* db, I_List<i_string> &do_list,
I_List<i_string> &ignore_list );
// check to see if the database is ok to operate on with respect to the
// do and ignore lists - used in replication
int add_table_rule(HASH* h, const char* table_spec);
void init_table_rule_hash(HASH* h, bool* h_inited);
int init_master_info(MASTER_INFO* mi);
extern bool opt_log_slave_updates ;
......@@ -81,7 +86,10 @@ extern bool slave_running;
extern pthread_t slave_real_id;
extern MASTER_INFO glob_mi;
extern HASH replicate_do_table, replicate_ignore_table;
extern bool do_table_inited, ignore_table_inited;
extern DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
extern bool do_table_inited, ignore_table_inited,
wild_do_table_inited, wild_ignore_table_inited;
extern bool table_rules_on;
// the master variables are defaults read from my.cnf or command line
extern uint master_port, master_connect_retry;
......@@ -93,3 +101,5 @@ extern I_List<i_string_pair> replicate_rewrite_db;
extern I_List<THD> threads;
#endif
......@@ -95,6 +95,7 @@ THD::THD()
update_lock_default= low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE;
start_time=(time_t) 0;
current_linfo = 0;
slave_thread = 0;
last_nx_table = last_nx_db = 0;
inactive_timeout=net_wait_timeout;
open_options=ha_open_options;
......
......@@ -262,6 +262,7 @@ class THD :public ilink {
pthread_t real_id;
uint current_tablenr,tmp_table,cond_count,col_access,query_length;
uint server_status,open_options;
bool slave_thread;
char scramble[9];
bool set_query_id,locked,count_cuted_fields,some_tables_deleted;
bool no_errors, allow_sum_func, password, fatal_error;
......@@ -278,7 +279,7 @@ class THD :public ilink {
bool store_globals();
inline time_t query_start() { query_start_used=1; return start_time; }
inline void set_time() { if (!user_time) time_after_lock=time(&start_time); }
inline void end_time() { time(&start_time); }
inline void end_time() { if(!user_time) time(&start_time); }
inline void set_time(time_t t) { time_after_lock=start_time=t; user_time=1; }
inline void lock_time() { time(&time_after_lock); }
inline void insert_id(ulonglong id)
......
......@@ -843,6 +843,11 @@ mysql_execute_command(void)
TABLE_LIST *tables=(TABLE_LIST*) lex->table_list.first;
DBUG_ENTER("mysql_execute_command");
if(thd->slave_thread && table_rules_on && tables && !tables_ok(thd,tables))
DBUG_VOID_RETURN; // skip if we are in the slave thread, some table
// rules have been given and the table list says the query should not be
// replicated
switch (lex->sql_command) {
case SQLCOM_SELECT:
{
......@@ -2344,9 +2349,11 @@ bool add_to_list(SQL_LIST &list,Item *item,bool asc)
TABLE_LIST *add_table_to_list(Table_ident *table, LEX_STRING *alias,
bool updating,
thr_lock_type flags,
List<String> *use_index,
List<String> *ignore_index)
List<String> *ignore_index
)
{
register TABLE_LIST *ptr;
THD *thd=current_thd;
......@@ -2378,6 +2385,7 @@ TABLE_LIST *add_table_to_list(Table_ident *table, LEX_STRING *alias,
ptr->real_name=table->table.str;
ptr->name=alias_str;
ptr->lock_type=flags;
ptr->updating=updating;
if (use_index)
ptr->use_index=(List<String> *) thd->memdup((gptr) use_index,
sizeof(*use_index));
......
......@@ -67,8 +67,7 @@ int mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists)
error = 1;
goto err;
}
while (global_read_lock && ! thd->killed ||
thd->version != refresh_version)
while (global_read_lock && ! thd->killed)
{
(void) pthread_cond_wait(&COND_refresh,&LOCK_open);
}
......@@ -156,7 +155,7 @@ int mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists)
if (wrong_tables.length())
{
my_error(ER_BAD_TABLE_ERROR,MYF(0),wrong_tables.c_ptr());
DBUG_RETURN(-1);
error=1;
}
if(error)
DBUG_RETURN(-1);
......
......@@ -628,7 +628,7 @@ create:
lex->sql_command= SQLCOM_CREATE_TABLE;
if (!add_table_to_list($5,
($2 & HA_LEX_CREATE_TMP_TABLE ?
&tmp_table_alias : (LEX_STRING*) 0)))
&tmp_table_alias : (LEX_STRING*) 0),1))
YYABORT;
lex->create_list.empty();
lex->key_list.empty();
......@@ -643,7 +643,7 @@ create:
| CREATE opt_unique_or_fulltext INDEX ident ON table_ident
{
Lex->sql_command= SQLCOM_CREATE_INDEX;
if (!add_table_to_list($6,NULL))
if (!add_table_to_list($6,NULL,1))
YYABORT;
Lex->create_list.empty();
Lex->key_list.empty();
......@@ -1022,7 +1022,7 @@ alter:
LEX *lex=Lex;
lex->sql_command = SQLCOM_ALTER_TABLE;
lex->name=0;
if (!add_table_to_list($4, NULL))
if (!add_table_to_list($4, NULL,1))
YYABORT;
lex->drop_primary=0;
lex->create_list.empty();
......@@ -1198,8 +1198,8 @@ table_to_table_list:
table_to_table:
table_ident TO_SYM table_ident
{ if (!add_table_to_list($1,NULL,TL_IGNORE) ||
!add_table_to_list($3,NULL,TL_IGNORE))
{ if (!add_table_to_list($1,NULL,1,TL_IGNORE) ||
!add_table_to_list($3,NULL,1,TL_IGNORE))
YYABORT;
}
......@@ -1749,7 +1749,7 @@ normal_join:
join_table:
{ Lex->use_index_ptr=Lex->ignore_index_ptr=0; }
table_ident opt_table_alias opt_key_definition
{ if (!($$=add_table_to_list($2,$3,TL_UNLOCK, Lex->use_index_ptr,
{ if (!($$=add_table_to_list($2,$3,0,TL_UNLOCK, Lex->use_index_ptr,
Lex->ignore_index_ptr))) YYABORT; }
| '{' ident join_table LEFT OUTER JOIN_SYM join_table ON expr '}'
{ add_join_on($7,$9); $7->outer_join|=JOIN_TYPE_LEFT; $$=$7; }
......@@ -1953,7 +1953,7 @@ drop:
Lex->drop_list.empty();
Lex->drop_list.push_back(new Alter_drop(Alter_drop::KEY,
$3.str));
if (!add_table_to_list($5,NULL))
if (!add_table_to_list($5,NULL, 1))
YYABORT;
}
| DROP DATABASE if_exists ident
......@@ -1975,7 +1975,7 @@ table_list:
table:
table_ident
{ if (!add_table_to_list($1,NULL)) YYABORT; }
{ if (!add_table_to_list($1,NULL,1)) YYABORT; }
if_exists:
/* empty */ { $$=0; }
......@@ -2150,7 +2150,7 @@ show_param:
Lex->sql_command= SQLCOM_SHOW_FIELDS;
if ($4)
$3->change_db($4);
if (!add_table_to_list($3,NULL))
if (!add_table_to_list($3,NULL,0))
YYABORT;
}
| MASTER_SYM LOGS_SYM
......@@ -2162,7 +2162,7 @@ show_param:
Lex->sql_command= SQLCOM_SHOW_KEYS;
if ($4)
$3->change_db($4);
if (!add_table_to_list($3,NULL))
if (!add_table_to_list($3,NULL,0))
YYABORT;
}
| STATUS_SYM wild
......@@ -2179,7 +2179,7 @@ show_param:
| CREATE TABLE_SYM table_ident
{
Lex->sql_command = SQLCOM_SHOW_CREATE;
if(!add_table_to_list($3, NULL))
if(!add_table_to_list($3, NULL,0))
YYABORT;
}
| MASTER_SYM STATUS_SYM
......@@ -2205,7 +2205,7 @@ describe:
{
Lex->wild=0;
Lex->sql_command=SQLCOM_SHOW_FIELDS;
if (!add_table_to_list($2, NULL))
if (!add_table_to_list($2, NULL,0))
YYABORT;
}
opt_describe_column
......@@ -2290,14 +2290,14 @@ load: LOAD DATA_SYM opt_low_priority opt_local INFILE TEXT_STRING
opt_duplicate INTO TABLE_SYM table_ident opt_field_term opt_line_term
opt_ignore_lines opt_field_spec
{
if (!add_table_to_list($11,NULL))
if (!add_table_to_list($11,NULL,1))
YYABORT;
}
|
LOAD TABLE_SYM table_ident FROM MASTER_SYM
{
Lex->sql_command = SQLCOM_LOAD_MASTER_TABLE;
if (!add_table_to_list($3,NULL))
if (!add_table_to_list($3,NULL,1))
YYABORT;
}
......@@ -2686,7 +2686,7 @@ table_lock_list:
table_lock:
table_ident opt_table_alias lock_option
{ if (!add_table_to_list($1,$2,(thr_lock_type) $3)) YYABORT; }
{ if (!add_table_to_list($1,$2,0,(thr_lock_type) $3)) YYABORT; }
lock_option:
READ_SYM { $$=TL_READ_NO_INSERT; }
......@@ -2791,7 +2791,7 @@ opt_table:
}
| table_ident
{
if (!add_table_to_list($1,NULL))
if (!add_table_to_list($1,NULL,0))
YYABORT;
if (Lex->grant == UINT_MAX)
Lex->grant = TABLE_ACLS & ~GRANT_ACL;
......
......@@ -138,4 +138,5 @@ typedef struct st_table_list {
thr_lock_type lock_type;
uint outer_join; /* Which join type */
bool straight; /* optimize with prev table */
bool updating; /* for replicate-do/ignore table */
} TABLE_LIST;
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment