Commit 36d7df75 authored by unknown's avatar unknown

Fixed bug in wait_for_update() that I had introduced.

Changed option variables to my_bool (to avoid bugs in my_getopt())
Added new thread specific mutex LOCK_delete to be able to free LOCK_thread_count early.
Changed usage of LOCK_thread_count -> LOCK_status for statistics variables


libmysqld/lib_sql.cc:
  Removed not needed LOCK
mysql-test/mysql-test-run.sh:
  Log name of running test
mysql-test/r/rpl_sporadic_master.result:
  Cleaned up test
mysql-test/t/rpl_sporadic_master.test:
  cleaned up test
sql/log.cc:
  Cleanup.
  Fixed bug in wait_for_update() that I had introduced.
sql/mini_client.cc:
  Indentation changes.
sql/mysql_priv.h:
  Changed option variables to my_bool.
sql/mysqld.cc:
  Changed option variables to my_bool.
  Removed not used LOCK_server_id
  Minor code cleanups.
sql/repl_failsafe.cc:
  Minor code cleanups
sql/slave.cc:
  Minor code cleanups.
  Fixed usage of wait_for_update().
sql/slave.h:
  Changed option variables to my_bool.
sql/sql_class.cc:
  Added new thread specific mutex LOCK_delete to be able to free LOCK_thread_count early
sql/sql_class.h:
  Added new thread specific mutex LOCK_delete to be able to free LOCK_thread_count early
sql/sql_insert.cc:
  Do broadcast after unlock()
sql/sql_parse.cc:
  Removed not needed LOCK
  Changed usage of LOCK_thread_count -> LOCK_status for statistics variables
  Changed killing of threads to not lock LOCK_thread_count for long.
sql/sql_repl.cc:
  Changed options variables to my_bool
  Fixed usage of wait_for_update()
  Fixed loop to kill slaves to not lock LOCK_thread_count for long.
  Code optimization.
sql/sql_repl.h:
  bool -> my_bool
  Fixed KICK_SLAVE to use LOCK_delete
parent d99e50c8
...@@ -384,7 +384,6 @@ int STDCALL mysql_server_init(int argc, char **argv, char **groups) ...@@ -384,7 +384,6 @@ int STDCALL mysql_server_init(int argc, char **argv, char **groups)
(void) pthread_mutex_init(&LOCK_bytes_sent,MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_bytes_sent,MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_bytes_received,MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_bytes_received,MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_timezone,MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_timezone,MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_server_id, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_user_conn, MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_user_conn, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_rpl_status, MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_rpl_status, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_active_mi, MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_active_mi, MY_MUTEX_INIT_FAST);
......
...@@ -343,6 +343,7 @@ SLAVE_MYPID="$MYRUN_DIR/slave.pid" ...@@ -343,6 +343,7 @@ SLAVE_MYPID="$MYRUN_DIR/slave.pid"
SLAVE_MYLOG="$MYSQL_TEST_DIR/var/log/slave.log" SLAVE_MYLOG="$MYSQL_TEST_DIR/var/log/slave.log"
SLAVE_MYERR="$MYSQL_TEST_DIR/var/log/slave.err" SLAVE_MYERR="$MYSQL_TEST_DIR/var/log/slave.err"
CURRENT_TEST="$MYSQL_TEST_DIR/var/log/current_test"
SMALL_SERVER="-O key_buffer_size=1M -O sort_buffer=256K -O max_heap_table_size=1M" SMALL_SERVER="-O key_buffer_size=1M -O sort_buffer=256K -O max_heap_table_size=1M"
export MASTER_MYPORT export MASTER_MYPORT
...@@ -1034,6 +1035,7 @@ run_testcase () ...@@ -1034,6 +1035,7 @@ run_testcase ()
master_init_script=$TESTDIR/$tname-master.sh master_init_script=$TESTDIR/$tname-master.sh
slave_init_script=$TESTDIR/$tname-slave.sh slave_init_script=$TESTDIR/$tname-slave.sh
slave_master_info_file=$TESTDIR/$tname-slave-master-info.opt slave_master_info_file=$TESTDIR/$tname-slave-master-info.opt
echo $tname > $CURRENT_TEST
SKIP_SLAVE=`$EXPR \( $tname : rpl \) = 0` SKIP_SLAVE=`$EXPR \( $tname : rpl \) = 0`
if [ $USE_MANAGER = 1 ] ; then if [ $USE_MANAGER = 1 ] ; then
many_slaves=`$EXPR \( $tname : rpl_failsafe \) != 0` many_slaves=`$EXPR \( $tname : rpl_failsafe \) != 0`
......
...@@ -4,7 +4,6 @@ reset master; ...@@ -4,7 +4,6 @@ reset master;
reset slave; reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9; drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
slave start; slave start;
drop table if exists t1,t2;
create table t2(n int); create table t2(n int);
create table t1(n int not null auto_increment primary key); create table t1(n int not null auto_increment primary key);
insert into t1 values (NULL),(NULL); insert into t1 values (NULL),(NULL);
......
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
# COM_BINLOG_DUMP and additionally limits the number of events per dump # COM_BINLOG_DUMP and additionally limits the number of events per dump
source include/master-slave.inc; source include/master-slave.inc;
drop table if exists t1,t2;
create table t2(n int); create table t2(n int);
create table t1(n int not null auto_increment primary key); create table t1(n int not null auto_increment primary key);
......
...@@ -810,12 +810,8 @@ void MYSQL_LOG::new_file(bool need_lock) ...@@ -810,12 +810,8 @@ void MYSQL_LOG::new_file(bool need_lock)
if (log_type == LOG_BIN) if (log_type == LOG_BIN)
{ {
if (generate_new_name(new_name, name)) if (generate_new_name(new_name, name))
{ goto end; /* Error; Continue using old log file */
/* Error; Continue using old log file */
if (need_lock)
VOID(pthread_mutex_unlock(&LOCK_log));
return; // Something went wrong
}
new_name_ptr=new_name; new_name_ptr=new_name;
if (!no_auto_events) if (!no_auto_events)
{ {
...@@ -853,6 +849,7 @@ void MYSQL_LOG::new_file(bool need_lock) ...@@ -853,6 +849,7 @@ void MYSQL_LOG::new_file(bool need_lock)
no_auto_events); no_auto_events);
my_free(old_name,MYF(0)); my_free(old_name,MYF(0));
end:
if (need_lock) if (need_lock)
{ {
pthread_mutex_unlock(&LOCK_index); pthread_mutex_unlock(&LOCK_index);
...@@ -1358,16 +1355,24 @@ bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length, ...@@ -1358,16 +1355,24 @@ bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length,
NOTES NOTES
One must have a lock on LOCK_log before calling this function. One must have a lock on LOCK_log before calling this function.
This lock will be freed before return!
The reason for the above is that for enter_cond() / exit_cond() to
work the mutex must be got before enter_cond() but releases before
exit_cond().
If you don't do it this way, you will get a deadlock in THD::awake()
*/ */
void MYSQL_LOG:: wait_for_update(THD* thd) void MYSQL_LOG:: wait_for_update(THD* thd)
{ {
safe_mutex_assert_owner(&LOCK_log);
const char* old_msg = thd->enter_cond(&update_cond, &LOCK_log, const char* old_msg = thd->enter_cond(&update_cond, &LOCK_log,
"Slave: waiting for binlog update"); "Slave: waiting for binlog update");
pthread_cond_wait(&update_cond, &LOCK_log); pthread_cond_wait(&update_cond, &LOCK_log);
pthread_mutex_unlock(&LOCK_log); // See NOTES
thd->exit_cond(old_msg); thd->exit_cond(old_msg);
} }
/* /*
......
...@@ -353,7 +353,7 @@ mc_net_safe_read(MYSQL *mysql) ...@@ -353,7 +353,7 @@ mc_net_safe_read(MYSQL *mysql)
else else
strmov(net->last_error, "Packet too large - increase \ strmov(net->last_error, "Packet too large - increase \
max_allowed_packet on this server"); max_allowed_packet on this server");
} }
return(packet_error); return(packet_error);
} }
if (net->read_pos[0] == 255) if (net->read_pos[0] == 255)
...@@ -671,7 +671,7 @@ mc_mysql_connect(MYSQL *mysql,const char *host, const char *user, ...@@ -671,7 +671,7 @@ mc_mysql_connect(MYSQL *mysql,const char *host, const char *user,
if ((pkt_length=mc_net_safe_read(mysql)) == packet_error) if ((pkt_length=mc_net_safe_read(mysql)) == packet_error)
goto error; goto error;
/* Check if version of protocoll matches current one */ /* Check if version of protocol matches current one */
mysql->protocol_version= net->read_pos[0]; mysql->protocol_version= net->read_pos[0];
DBUG_DUMP("packet",(char*) net->read_pos,10); DBUG_DUMP("packet",(char*) net->read_pos,10);
......
...@@ -637,15 +637,16 @@ extern uint test_flags,select_errors,ha_open_options; ...@@ -637,15 +637,16 @@ extern uint test_flags,select_errors,ha_open_options;
extern uint protocol_version,dropping_tables; extern uint protocol_version,dropping_tables;
extern uint delay_key_write_options; extern uint delay_key_write_options;
extern bool opt_endinfo, using_udf_functions, locked_in_memory; extern bool opt_endinfo, using_udf_functions, locked_in_memory;
extern bool opt_using_transactions, use_temp_pool, mysql_embedded; extern bool opt_using_transactions, mysql_embedded;
extern bool using_update_log, opt_large_files; extern bool using_update_log, opt_large_files;
extern bool opt_log, opt_update_log, opt_bin_log, opt_slow_log; extern bool opt_log, opt_update_log, opt_bin_log, opt_slow_log;
extern bool opt_sql_bin_update, opt_safe_user_create, opt_no_mix_types; extern bool opt_disable_networking, opt_skip_show_db;
extern bool opt_disable_networking, opt_skip_show_db, opt_enable_named_pipe;
extern bool volatile abort_loop, shutdown_in_progress, grant_option; extern bool volatile abort_loop, shutdown_in_progress, grant_option;
extern uint volatile thread_count, thread_running, global_read_lock; extern uint volatile thread_count, thread_running, global_read_lock;
extern my_bool opt_sql_bin_update, opt_safe_user_create, opt_no_mix_types;
extern my_bool opt_safe_show_db, opt_local_infile, lower_case_table_names; extern my_bool opt_safe_show_db, opt_local_infile, lower_case_table_names;
extern my_bool opt_slave_compressed_protocol; extern my_bool opt_slave_compressed_protocol, use_temp_pool;
extern my_bool opt_enable_named_pipe;
extern char f_fyllchar; extern char f_fyllchar;
extern MYSQL_LOG mysql_log,mysql_update_log,mysql_slow_log,mysql_bin_log; extern MYSQL_LOG mysql_log,mysql_update_log,mysql_slow_log,mysql_bin_log;
...@@ -656,7 +657,7 @@ extern pthread_mutex_t LOCK_mysql_create_db,LOCK_Acl,LOCK_open, ...@@ -656,7 +657,7 @@ extern pthread_mutex_t LOCK_mysql_create_db,LOCK_Acl,LOCK_open,
LOCK_thread_count,LOCK_mapped_file,LOCK_user_locks, LOCK_status, LOCK_thread_count,LOCK_mapped_file,LOCK_user_locks, LOCK_status,
LOCK_grant, LOCK_error_log, LOCK_delayed_insert, LOCK_grant, LOCK_error_log, LOCK_delayed_insert,
LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_timezone, LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_timezone,
LOCK_server_id, LOCK_slave_list, LOCK_active_mi, LOCK_manager, LOCK_slave_list, LOCK_active_mi, LOCK_manager,
LOCK_global_system_variables; LOCK_global_system_variables;
extern pthread_cond_t COND_refresh, COND_thread_count, COND_manager; extern pthread_cond_t COND_refresh, COND_thread_count, COND_manager;
extern pthread_attr_t connection_attrib; extern pthread_attr_t connection_attrib;
......
...@@ -246,18 +246,18 @@ bool opt_large_files= sizeof(my_off_t) > 4; ...@@ -246,18 +246,18 @@ bool opt_large_files= sizeof(my_off_t) > 4;
Variables to store startup options Variables to store startup options
*/ */
bool opt_skip_slave_start = 0; // If set, slave is not autostarted my_bool opt_skip_slave_start = 0; // If set, slave is not autostarted
/* /*
If set, some standard measures to enforce slave data integrity will not If set, some standard measures to enforce slave data integrity will not
be performed be performed
*/ */
bool opt_reckless_slave = 0; my_bool opt_reckless_slave = 0;
ulong back_log, connect_timeout, concurrency; ulong back_log, connect_timeout, concurrency;
char mysql_home[FN_REFLEN], pidfile_name[FN_REFLEN], time_zone[30]; char mysql_home[FN_REFLEN], pidfile_name[FN_REFLEN], time_zone[30];
bool opt_log, opt_update_log, opt_bin_log, opt_slow_log; bool opt_log, opt_update_log, opt_bin_log, opt_slow_log;
bool opt_disable_networking=0, opt_skip_show_db=0; bool opt_disable_networking=0, opt_skip_show_db=0;
bool opt_enable_named_pipe= 0; my_bool opt_enable_named_pipe= 0;
my_bool opt_local_infile, opt_external_locking, opt_slave_compressed_protocol; my_bool opt_local_infile, opt_external_locking, opt_slave_compressed_protocol;
uint delay_key_write_options= (uint) DELAY_KEY_WRITE_ON; uint delay_key_write_options= (uint) DELAY_KEY_WRITE_ON;
...@@ -272,11 +272,12 @@ static my_string opt_logname=0,opt_update_logname=0, ...@@ -272,11 +272,12 @@ static my_string opt_logname=0,opt_update_logname=0,
static char* mysql_home_ptr= mysql_home; static char* mysql_home_ptr= mysql_home;
static char* pidfile_name_ptr= pidfile_name; static char* pidfile_name_ptr= pidfile_name;
static pthread_t select_thread; static pthread_t select_thread;
static bool opt_noacl, opt_bootstrap=0, opt_myisam_log=0; static my_bool opt_noacl=0, opt_bootstrap=0, opt_myisam_log=0;
bool opt_sql_bin_update = 0, opt_log_slave_updates = 0; my_bool opt_safe_user_create = 0, opt_no_mix_types = 0;
bool opt_safe_user_create = 0, opt_no_mix_types = 0;
my_bool opt_safe_show_db=0, lower_case_table_names, opt_old_rpl_compat; my_bool opt_safe_show_db=0, lower_case_table_names, opt_old_rpl_compat;
my_bool opt_show_slave_auth_info; my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0;
my_bool opt_log_slave_updates= 0;
volatile bool mqh_used = 0; volatile bool mqh_used = 0;
FILE *bootstrap_file=0; FILE *bootstrap_file=0;
int segfaulted = 0; // ensure we do not enter SIGSEGV handler twice int segfaulted = 0; // ensure we do not enter SIGSEGV handler twice
...@@ -408,7 +409,7 @@ TYPELIB sql_mode_typelib= {array_elements(sql_mode_names)-1,"", ...@@ -408,7 +409,7 @@ TYPELIB sql_mode_typelib= {array_elements(sql_mode_names)-1,"",
sql_mode_names}; sql_mode_names};
MY_BITMAP temp_pool; MY_BITMAP temp_pool;
bool use_temp_pool=0; my_bool use_temp_pool=0;
pthread_key(MEM_ROOT*,THR_MALLOC); pthread_key(MEM_ROOT*,THR_MALLOC);
pthread_key(THD*, THR_THD); pthread_key(THD*, THR_THD);
...@@ -418,7 +419,7 @@ pthread_mutex_t LOCK_mysql_create_db, LOCK_Acl, LOCK_open, LOCK_thread_count, ...@@ -418,7 +419,7 @@ pthread_mutex_t LOCK_mysql_create_db, LOCK_Acl, LOCK_open, LOCK_thread_count,
LOCK_error_log, LOCK_error_log,
LOCK_delayed_insert, LOCK_delayed_status, LOCK_delayed_create, LOCK_delayed_insert, LOCK_delayed_status, LOCK_delayed_create,
LOCK_crypt, LOCK_bytes_sent, LOCK_bytes_received, LOCK_crypt, LOCK_bytes_sent, LOCK_bytes_received,
LOCK_server_id, LOCK_global_system_variables, LOCK_global_system_variables,
LOCK_user_conn, LOCK_slave_list, LOCK_active_mi; LOCK_user_conn, LOCK_slave_list, LOCK_active_mi;
pthread_cond_t COND_refresh,COND_thread_count, COND_slave_stopped, pthread_cond_t COND_refresh,COND_thread_count, COND_slave_stopped,
...@@ -1553,10 +1554,16 @@ static void *signal_hand(void *arg __attribute__((unused))) ...@@ -1553,10 +1554,16 @@ static void *signal_hand(void *arg __attribute__((unused)))
} }
#endif /* HAVE_STACK_TRACE_ON_SEGV */ #endif /* HAVE_STACK_TRACE_ON_SEGV */
// signal to start_signal_handler that we are ready /*
signal to start_signal_handler that we are ready
This works by waiting for start_signal_handler to free mutex,
after which we signal it that we are ready.
At this pointer there is no other threads running, so there
should not be any other pthread_cond_signal() calls.
*/
(void) pthread_mutex_lock(&LOCK_thread_count); (void) pthread_mutex_lock(&LOCK_thread_count);
(void) pthread_cond_signal(&COND_thread_count);
(void) pthread_mutex_unlock(&LOCK_thread_count); (void) pthread_mutex_unlock(&LOCK_thread_count);
(void) pthread_cond_broadcast(&COND_thread_count);
(void) pthread_sigmask(SIG_BLOCK,&set,NULL); (void) pthread_sigmask(SIG_BLOCK,&set,NULL);
for (;;) for (;;)
...@@ -1860,7 +1867,6 @@ int main(int argc, char **argv) ...@@ -1860,7 +1867,6 @@ int main(int argc, char **argv)
(void) pthread_mutex_init(&LOCK_bytes_sent,MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_bytes_sent,MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_bytes_received,MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_bytes_received,MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_timezone,MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_timezone,MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_server_id, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_user_conn, MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_user_conn, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_rpl_status, MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_rpl_status, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_active_mi, MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_active_mi, MY_MUTEX_INIT_FAST);
...@@ -2170,9 +2176,7 @@ The server will not act as a slave."); ...@@ -2170,9 +2176,7 @@ The server will not act as a slave.");
} }
} }
while (handler_count > 0) while (handler_count > 0)
{
pthread_cond_wait(&COND_handler_count,&LOCK_thread_count); pthread_cond_wait(&COND_handler_count,&LOCK_thread_count);
}
} }
pthread_mutex_unlock(&LOCK_thread_count); pthread_mutex_unlock(&LOCK_thread_count);
} }
...@@ -2194,8 +2198,8 @@ The server will not act as a slave."); ...@@ -2194,8 +2198,8 @@ The server will not act as a slave.");
(void) pthread_mutex_lock(&LOCK_thread_count); (void) pthread_mutex_lock(&LOCK_thread_count);
DBUG_PRINT("quit", ("Got thread_count mutex")); DBUG_PRINT("quit", ("Got thread_count mutex"));
select_thread_in_use=0; // For close_connections select_thread_in_use=0; // For close_connections
(void) pthread_cond_broadcast(&COND_thread_count);
(void) pthread_mutex_unlock(&LOCK_thread_count); (void) pthread_mutex_unlock(&LOCK_thread_count);
(void) pthread_cond_broadcast(&COND_thread_count);
#ifdef EXTRA_DEBUG2 #ifdef EXTRA_DEBUG2
sql_print_error("After lock_thread_count"); sql_print_error("After lock_thread_count");
#endif #endif
...@@ -2204,9 +2208,7 @@ The server will not act as a slave."); ...@@ -2204,9 +2208,7 @@ The server will not act as a slave.");
/* Wait until cleanup is done */ /* Wait until cleanup is done */
(void) pthread_mutex_lock(&LOCK_thread_count); (void) pthread_mutex_lock(&LOCK_thread_count);
while (!ready_to_exit) while (!ready_to_exit)
{
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count); pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
}
(void) pthread_mutex_unlock(&LOCK_thread_count); (void) pthread_mutex_unlock(&LOCK_thread_count);
#if defined(__WIN__) && !defined(EMBEDDED_LIBRARY) #if defined(__WIN__) && !defined(EMBEDDED_LIBRARY)
...@@ -2794,8 +2796,8 @@ pthread_handler_decl(handle_connections_namedpipes,arg) ...@@ -2794,8 +2796,8 @@ pthread_handler_decl(handle_connections_namedpipes,arg)
pthread_mutex_lock(&LOCK_thread_count); pthread_mutex_lock(&LOCK_thread_count);
handler_count--; handler_count--;
pthread_cond_signal(&COND_handler_count);
pthread_mutex_unlock(&LOCK_thread_count); pthread_mutex_unlock(&LOCK_thread_count);
pthread_cond_signal(&COND_handler_count);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
#endif /* __NT__ */ #endif /* __NT__ */
...@@ -3144,25 +3146,23 @@ struct my_option my_long_options[] = ...@@ -3144,25 +3146,23 @@ struct my_option my_long_options[] =
"Syntax: myisam-recover[=option[,option...]], where option can be DEFAULT, BACKUP or FORCE.", "Syntax: myisam-recover[=option[,option...]], where option can be DEFAULT, BACKUP or FORCE.",
(gptr*) &myisam_recover_options_str, (gptr*) &myisam_recover_options_str, 0, (gptr*) &myisam_recover_options_str, (gptr*) &myisam_recover_options_str, 0,
GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0}, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0},
/*
Option needs to be available for the test case to pass in non-debugging
mode. is a no-op.
*/
{"memlock", OPT_MEMLOCK, "Lock mysqld in memory", (gptr*) &locked_in_memory, {"memlock", OPT_MEMLOCK, "Lock mysqld in memory", (gptr*) &locked_in_memory,
(gptr*) &locked_in_memory, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, (gptr*) &locked_in_memory, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"disconnect-slave-event-count", OPT_DISCONNECT_SLAVE_EVENT_COUNT, {"disconnect-slave-event-count", OPT_DISCONNECT_SLAVE_EVENT_COUNT,
"Undocumented: Meant for debugging and testing of replication", "Option used by mysql-test for debugging and testing of replication",
(gptr*) &disconnect_slave_event_count, (gptr*) &disconnect_slave_event_count,
(gptr*) &disconnect_slave_event_count, 0, GET_INT, REQUIRED_ARG, 0, 0, 0, (gptr*) &disconnect_slave_event_count, 0, GET_INT, REQUIRED_ARG, 0, 0, 0,
0, 0, 0}, 0, 0, 0},
{"abort-slave-event-count", OPT_ABORT_SLAVE_EVENT_COUNT, {"abort-slave-event-count", OPT_ABORT_SLAVE_EVENT_COUNT,
"Undocumented: Meant for debugging and testing of replication", "Option used by mysql-test for debugging and testing of replication",
(gptr*) &abort_slave_event_count, (gptr*) &abort_slave_event_count, (gptr*) &abort_slave_event_count, (gptr*) &abort_slave_event_count,
0, GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, 0, GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"max-binlog-dump-events", OPT_MAX_BINLOG_DUMP_EVENTS, "Undocumented", {"max-binlog-dump-events", OPT_MAX_BINLOG_DUMP_EVENTS,
"Option used by mysql-test for debugging and testing of replication",
(gptr*) &max_binlog_dump_events, (gptr*) &max_binlog_dump_events, 0, (gptr*) &max_binlog_dump_events, (gptr*) &max_binlog_dump_events, 0,
GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"sporadic-binlog-dump-fail", OPT_SPORADIC_BINLOG_DUMP_FAIL, "Undocumented", {"sporadic-binlog-dump-fail", OPT_SPORADIC_BINLOG_DUMP_FAIL,
"Option used by mysql-test for debugging and testing of replication",
(gptr*) &opt_sporadic_binlog_dump_fail, (gptr*) &opt_sporadic_binlog_dump_fail,
(gptr*) &opt_sporadic_binlog_dump_fail, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, (gptr*) &opt_sporadic_binlog_dump_fail, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0,
0}, 0},
...@@ -3176,7 +3176,7 @@ struct my_option my_long_options[] = ...@@ -3176,7 +3176,7 @@ struct my_option my_long_options[] =
(gptr*) &opt_no_mix_types, (gptr*) &opt_no_mix_types, 0, GET_BOOL, NO_ARG, (gptr*) &opt_no_mix_types, (gptr*) &opt_no_mix_types, 0, GET_BOOL, NO_ARG,
0, 0, 0, 0, 0, 0}, 0, 0, 0, 0, 0, 0},
#endif #endif
{"old-protocol", 'o', "Use the old (3.20) protocol", {"old-protocol", 'o', "Use the old (3.20) protocol client/server protocol",
(gptr*) &protocol_version, (gptr*) &protocol_version, 0, GET_UINT, NO_ARG, (gptr*) &protocol_version, (gptr*) &protocol_version, 0, GET_UINT, NO_ARG,
PROTOCOL_VERSION, 0, 0, 0, 0, 0}, PROTOCOL_VERSION, 0, 0, 0, 0, 0},
{"old-rpl-compat", OPT_OLD_RPL_COMPAT, {"old-rpl-compat", OPT_OLD_RPL_COMPAT,
......
...@@ -276,9 +276,7 @@ int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg) ...@@ -276,9 +276,7 @@ int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg)
return 1; return 1;
} }
linfo.index_file_offset = 0; if (mysql_bin_log.find_log_pos(&linfo, NullS, 1))
if (mysql_bin_log.find_log_pos(&linfo, NullS))
{ {
strmov(errmsg,"Could not find first log"); strmov(errmsg,"Could not find first log");
return 1; return 1;
...@@ -332,7 +330,7 @@ int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg) ...@@ -332,7 +330,7 @@ int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg)
strmov(last_log_name, linfo.log_file_name); strmov(last_log_name, linfo.log_file_name);
last_pos = my_b_tell(&log); last_pos = my_b_tell(&log);
switch (mysql_bin_log.find_next_log(&linfo)) { switch (mysql_bin_log.find_next_log(&linfo, 1)) {
case LOG_INFO_EOF: case LOG_INFO_EOF:
if (last_file >= 0) if (last_file >= 0)
(void)my_close(last_file, MYF(MY_WME)); (void)my_close(last_file, MYF(MY_WME));
......
...@@ -230,7 +230,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, ...@@ -230,7 +230,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
Test to see if the previous run was with the skip of purging Test to see if the previous run was with the skip of purging
If yes, we do not purge when we restart If yes, we do not purge when we restart
*/ */
if (rli->relay_log.find_log_pos(&rli->linfo,NullS)) if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
{ {
*errmsg="Could not find first log during relay log initialization"; *errmsg="Could not find first log during relay log initialization";
goto err; goto err;
...@@ -240,7 +240,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, ...@@ -240,7 +240,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
{ {
if (strcmp(log, rli->linfo.log_file_name)) if (strcmp(log, rli->linfo.log_file_name))
rli->skip_log_purge=1; // Different name; Don't purge rli->skip_log_purge=1; // Different name; Don't purge
if (rli->relay_log.find_log_pos(&rli->linfo, log)) if (rli->relay_log.find_log_pos(&rli->linfo, log, 1))
{ {
*errmsg="Could not find target log during relay log initialization"; *errmsg="Could not find target log during relay log initialization";
goto err; goto err;
...@@ -1201,7 +1201,7 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli) ...@@ -1201,7 +1201,7 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli)
LOG_INFO linfo; LOG_INFO linfo;
DBUG_ENTER("count_relay_log_space"); DBUG_ENTER("count_relay_log_space");
rli->log_space_total = 0; rli->log_space_total = 0;
if (rli->relay_log.find_log_pos(&linfo, NullS)) if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
{ {
sql_print_error("Could not find first log while counting relay log space"); sql_print_error("Could not find first log while counting relay log space");
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -1210,7 +1210,7 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli) ...@@ -1210,7 +1210,7 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli)
{ {
if (add_relay_log(rli,&linfo)) if (add_relay_log(rli,&linfo))
DBUG_RETURN(1); DBUG_RETURN(1);
} while (!rli->relay_log.find_next_log(&linfo)); } while (!rli->relay_log.find_next_log(&linfo, 1));
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -2538,9 +2538,10 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, ...@@ -2538,9 +2538,10 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
{ {
last_errno=mc_mysql_errno(mysql); last_errno=mc_mysql_errno(mysql);
suppress_warnings= 0; suppress_warnings= 0;
sql_print_error("Slave I/O thread: error connecting to master \ sql_print_error("Slave I/O thread: error %s to master \
'%s@%s:%d': \ '%s@%s:%d': \
Error: '%s' errno: %d retry-time: %d retries: %d", Error: '%s' errno: %d retry-time: %d retries: %d",
(reconnect ? "reconnecting" : "connecting"),
mi->user,mi->host,mi->port, mi->user,mi->host,mi->port,
mc_mysql_error(mysql), last_errno, mc_mysql_error(mysql), last_errno,
mi->connect_retry, mi->connect_retry,
...@@ -2771,8 +2772,8 @@ Log_event* next_event(RELAY_LOG_INFO* rli) ...@@ -2771,8 +2772,8 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
update. If we do not, show slave status will block update. If we do not, show slave status will block
*/ */
pthread_mutex_unlock(&rli->data_lock); pthread_mutex_unlock(&rli->data_lock);
/* Note that wait_for_update unlocks lock_log ! */
rli->relay_log.wait_for_update(rli->sql_thd); rli->relay_log.wait_for_update(rli->sql_thd);
pthread_mutex_unlock(log_lock);
// re-acquire data lock since we released it earlier // re-acquire data lock since we released it earlier
pthread_mutex_lock(&rli->data_lock); pthread_mutex_lock(&rli->data_lock);
......
...@@ -30,7 +30,8 @@ extern bool use_slave_mask; ...@@ -30,7 +30,8 @@ extern bool use_slave_mask;
extern char* slave_load_tmpdir; extern char* slave_load_tmpdir;
extern my_string master_info_file,relay_log_info_file; extern my_string master_info_file,relay_log_info_file;
extern my_string opt_relay_logname, opt_relaylog_index_name; extern my_string opt_relay_logname, opt_relaylog_index_name;
extern bool opt_skip_slave_start, opt_reckless_slave; extern my_bool opt_skip_slave_start, opt_reckless_slave;
extern my_bool opt_log_slave_updates;
extern ulong relay_log_space_limit; extern ulong relay_log_space_limit;
struct st_master_info; struct st_master_info;
...@@ -408,7 +409,6 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,ulonglong pos, ...@@ -408,7 +409,6 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,ulonglong pos,
int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset, int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
const char** errmsg); const char** errmsg);
extern bool opt_log_slave_updates ;
pthread_handler_decl(handle_slave_io,arg); pthread_handler_decl(handle_slave_io,arg);
pthread_handler_decl(handle_slave_sql,arg); pthread_handler_decl(handle_slave_sql,arg);
extern bool volatile abort_loop; extern bool volatile abort_loop;
......
...@@ -116,8 +116,8 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0), ...@@ -116,8 +116,8 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0),
#endif #endif
#ifdef SIGNAL_WITH_VIO_CLOSE #ifdef SIGNAL_WITH_VIO_CLOSE
active_vio = 0; active_vio = 0;
pthread_mutex_init(&active_vio_lock, MY_MUTEX_INIT_FAST);
#endif #endif
pthread_mutex_init(&LOCK_delete, MY_MUTEX_INIT_FAST);
/* Variables with default values */ /* Variables with default values */
proc_info="login"; proc_info="login";
...@@ -189,6 +189,10 @@ THD::~THD() ...@@ -189,6 +189,10 @@ THD::~THD()
{ {
THD_CHECK_SENTRY(this); THD_CHECK_SENTRY(this);
DBUG_ENTER("~THD()"); DBUG_ENTER("~THD()");
/* Ensure that no one is using THD */
pthread_mutex_lock(&LOCK_delete);
pthread_mutex_unlock(&LOCK_delete);
/* Close connection */ /* Close connection */
if (net.vio) if (net.vio)
{ {
...@@ -217,18 +221,19 @@ THD::~THD() ...@@ -217,18 +221,19 @@ THD::~THD()
free_root(&mem_root,MYF(0)); free_root(&mem_root,MYF(0));
free_root(&transaction.mem_root,MYF(0)); free_root(&transaction.mem_root,MYF(0));
mysys_var=0; // Safety (shouldn't be needed) mysys_var=0; // Safety (shouldn't be needed)
#ifdef SIGNAL_WITH_VIO_CLOSE pthread_mutex_destroy(&LOCK_delete);
pthread_mutex_destroy(&active_vio_lock);
#endif
#ifndef DBUG_OFF #ifndef DBUG_OFF
dbug_sentry = THD_SENTRY_GONE; dbug_sentry = THD_SENTRY_GONE;
#endif #endif
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
void THD::awake(bool prepare_to_die) void THD::awake(bool prepare_to_die)
{ {
THD_CHECK_SENTRY(this); THD_CHECK_SENTRY(this);
safe_mutex_assert_owner(&LOCK_delete);
if (prepare_to_die) if (prepare_to_die)
killed = 1; killed = 1;
thr_alarm_kill(real_id); thr_alarm_kill(real_id);
......
...@@ -141,8 +141,8 @@ class MYSQL_LOG { ...@@ -141,8 +141,8 @@ class MYSQL_LOG {
// iterating through the log index file // iterating through the log index file
int find_log_pos(LOG_INFO* linfo, const char* log_name, int find_log_pos(LOG_INFO* linfo, const char* log_name,
bool need_mutex=1); bool need_mutex);
int find_next_log(LOG_INFO* linfo, bool need_mutex=1); int find_next_log(LOG_INFO* linfo, bool need_mutex);
int get_current_log(LOG_INFO* linfo); int get_current_log(LOG_INFO* linfo);
uint next_file_id(); uint next_file_id();
...@@ -330,7 +330,8 @@ class THD :public ilink { ...@@ -330,7 +330,8 @@ class THD :public ilink {
struct sockaddr_in remote; // client socket address struct sockaddr_in remote; // client socket address
struct rand_struct rand; // used for authentication struct rand_struct rand; // used for authentication
struct system_variables variables; // Changeable local variables struct system_variables variables; // Changeable local variables
pthread_mutex_t LOCK_delete; // Locked before thd is deleted
char *query; // Points to the current query, char *query; // Points to the current query,
/* /*
A pointer to the stack frame of handle_one_connection(), A pointer to the stack frame of handle_one_connection(),
...@@ -410,7 +411,6 @@ class THD :public ilink { ...@@ -410,7 +411,6 @@ class THD :public ilink {
#endif #endif
#ifdef SIGNAL_WITH_VIO_CLOSE #ifdef SIGNAL_WITH_VIO_CLOSE
Vio* active_vio; Vio* active_vio;
pthread_mutex_t active_vio_lock;
#endif #endif
ulonglong next_insert_id,last_insert_id,current_insert_id, ulonglong next_insert_id,last_insert_id,current_insert_id,
limit_found_rows; limit_found_rows;
...@@ -465,25 +465,25 @@ class THD :public ilink { ...@@ -465,25 +465,25 @@ class THD :public ilink {
#ifdef SIGNAL_WITH_VIO_CLOSE #ifdef SIGNAL_WITH_VIO_CLOSE
inline void set_active_vio(Vio* vio) inline void set_active_vio(Vio* vio)
{ {
pthread_mutex_lock(&active_vio_lock); pthread_mutex_lock(&LOCK_delete);
active_vio = vio; active_vio = vio;
pthread_mutex_unlock(&active_vio_lock); pthread_mutex_unlock(&LOCK_delete);
} }
inline void clear_active_vio() inline void clear_active_vio()
{ {
pthread_mutex_lock(&active_vio_lock); pthread_mutex_lock(&LOCK_delete);
active_vio = 0; active_vio = 0;
pthread_mutex_unlock(&active_vio_lock); pthread_mutex_unlock(&LOCK_delete);
} }
inline void close_active_vio() inline void close_active_vio()
{ {
pthread_mutex_lock(&active_vio_lock); pthread_mutex_lock(&LOCK_delete);
if (active_vio) if (active_vio)
{ {
vio_close(active_vio); vio_close(active_vio);
active_vio = 0; active_vio = 0;
} }
pthread_mutex_unlock(&active_vio_lock); pthread_mutex_unlock(&LOCK_delete);
} }
#endif #endif
void awake(bool prepare_to_die); void awake(bool prepare_to_die);
......
...@@ -562,8 +562,8 @@ class delayed_insert :public ilink { ...@@ -562,8 +562,8 @@ class delayed_insert :public ilink {
thd.user=thd.host=0; thd.user=thd.host=0;
thread_count--; thread_count--;
delayed_insert_threads--; delayed_insert_threads--;
VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
VOID(pthread_mutex_unlock(&LOCK_thread_count)); VOID(pthread_mutex_unlock(&LOCK_thread_count));
VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
} }
/* The following is for checking when we can delete ourselves */ /* The following is for checking when we can delete ourselves */
......
...@@ -585,7 +585,7 @@ pthread_handler_decl(handle_one_connection,arg) ...@@ -585,7 +585,7 @@ pthread_handler_decl(handle_one_connection,arg)
if (!(test_flags & TEST_NO_THREADS) & my_thread_init()) if (!(test_flags & TEST_NO_THREADS) & my_thread_init())
{ {
close_connection(&thd->net,ER_OUT_OF_RESOURCES); close_connection(&thd->net,ER_OUT_OF_RESOURCES);
statistic_increment(aborted_connects,&LOCK_thread_count); statistic_increment(aborted_connects,&LOCK_status);
end_thread(thd,0); end_thread(thd,0);
return 0; return 0;
} }
...@@ -612,7 +612,7 @@ pthread_handler_decl(handle_one_connection,arg) ...@@ -612,7 +612,7 @@ pthread_handler_decl(handle_one_connection,arg)
if (thd->store_globals()) if (thd->store_globals())
{ {
close_connection(&thd->net,ER_OUT_OF_RESOURCES); close_connection(&thd->net,ER_OUT_OF_RESOURCES);
statistic_increment(aborted_connects,&LOCK_thread_count); statistic_increment(aborted_connects,&LOCK_status);
end_thread(thd,0); end_thread(thd,0);
return 0; return 0;
} }
...@@ -634,7 +634,7 @@ pthread_handler_decl(handle_one_connection,arg) ...@@ -634,7 +634,7 @@ pthread_handler_decl(handle_one_connection,arg)
if (vio_type(net->vio) == VIO_TYPE_NAMEDPIPE) if (vio_type(net->vio) == VIO_TYPE_NAMEDPIPE)
sleep(1); /* must wait after eof() */ sleep(1); /* must wait after eof() */
#endif #endif
statistic_increment(aborted_connects,&LOCK_thread_count); statistic_increment(aborted_connects,&LOCK_status);
goto end_thread; goto end_thread;
} }
...@@ -668,7 +668,7 @@ pthread_handler_decl(handle_one_connection,arg) ...@@ -668,7 +668,7 @@ pthread_handler_decl(handle_one_connection,arg)
(net->last_errno ? ER(net->last_errno) : (net->last_errno ? ER(net->last_errno) :
ER(ER_UNKNOWN_ERROR))); ER(ER_UNKNOWN_ERROR)));
send_error(net,net->last_errno,NullS); send_error(net,net->last_errno,NullS);
thread_safe_increment(aborted_threads,&LOCK_thread_count); thread_safe_increment(aborted_threads,&LOCK_status);
} }
end_thread: end_thread:
...@@ -757,8 +757,8 @@ pthread_handler_decl(handle_bootstrap,arg) ...@@ -757,8 +757,8 @@ pthread_handler_decl(handle_bootstrap,arg)
end: end:
(void) pthread_mutex_lock(&LOCK_thread_count); (void) pthread_mutex_lock(&LOCK_thread_count);
thread_count--; thread_count--;
(void) pthread_cond_broadcast(&COND_thread_count);
(void) pthread_mutex_unlock(&LOCK_thread_count); (void) pthread_mutex_unlock(&LOCK_thread_count);
(void) pthread_cond_broadcast(&COND_thread_count);
my_thread_end(); my_thread_end();
pthread_exit(0); pthread_exit(0);
DBUG_RETURN(0); // Never reached DBUG_RETURN(0); // Never reached
...@@ -883,7 +883,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -883,7 +883,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
thd->lex.select_lex.options=0; // We store status here thd->lex.select_lex.options=0; // We store status here
switch (command) { switch (command) {
case COM_INIT_DB: case COM_INIT_DB:
thread_safe_increment(com_stat[SQLCOM_CHANGE_DB],&LOCK_thread_count); thread_safe_increment(com_stat[SQLCOM_CHANGE_DB],&LOCK_status);
if (!mysql_change_db(thd,packet)) if (!mysql_change_db(thd,packet))
mysql_log.write(thd,command,"%s",thd->db); mysql_log.write(thd,command,"%s",thd->db);
break; break;
...@@ -895,7 +895,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -895,7 +895,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
} }
case COM_TABLE_DUMP: case COM_TABLE_DUMP:
{ {
thread_safe_increment(com_other,&LOCK_thread_count); thread_safe_increment(com_other, &LOCK_status);
slow_command = TRUE; slow_command = TRUE;
uint db_len = *(uchar*)packet; uint db_len = *(uchar*)packet;
uint tbl_len = *(uchar*)(packet + db_len + 1); uint tbl_len = *(uchar*)(packet + db_len + 1);
...@@ -912,7 +912,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -912,7 +912,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
} }
case COM_CHANGE_USER: case COM_CHANGE_USER:
{ {
thread_safe_increment(com_other,&LOCK_thread_count); thread_safe_increment(com_other,&LOCK_status);
char *user= (char*) packet; char *user= (char*) packet;
char *passwd= strend(user)+1; char *passwd= strend(user)+1;
char *db= strend(passwd)+1; char *db= strend(passwd)+1;
...@@ -992,7 +992,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -992,7 +992,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
{ {
char *fields; char *fields;
TABLE_LIST table_list; TABLE_LIST table_list;
thread_safe_increment(com_stat[SQLCOM_SHOW_FIELDS],&LOCK_thread_count); thread_safe_increment(com_stat[SQLCOM_SHOW_FIELDS],&LOCK_status);
bzero((char*) &table_list,sizeof(table_list)); bzero((char*) &table_list,sizeof(table_list));
if (!(table_list.db=thd->db)) if (!(table_list.db=thd->db))
{ {
...@@ -1027,7 +1027,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -1027,7 +1027,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
case COM_CREATE_DB: // QQ: To be removed case COM_CREATE_DB: // QQ: To be removed
{ {
thread_safe_increment(com_stat[SQLCOM_CREATE_DB],&LOCK_thread_count); thread_safe_increment(com_stat[SQLCOM_CREATE_DB],&LOCK_status);
char *db=thd->strdup(packet); char *db=thd->strdup(packet);
// null test to handle EOM // null test to handle EOM
if (!db || !strip_sp(db) || check_db_name(db)) if (!db || !strip_sp(db) || check_db_name(db))
...@@ -1045,7 +1045,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -1045,7 +1045,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
} }
case COM_DROP_DB: // QQ: To be removed case COM_DROP_DB: // QQ: To be removed
{ {
thread_safe_increment(com_stat[SQLCOM_DROP_DB],&LOCK_thread_count); thread_safe_increment(com_stat[SQLCOM_DROP_DB],&LOCK_status);
char *db=thd->strdup(packet); char *db=thd->strdup(packet);
// null test to handle EOM // null test to handle EOM
if (!db || !strip_sp(db) || check_db_name(db)) if (!db || !strip_sp(db) || check_db_name(db))
...@@ -1066,7 +1066,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -1066,7 +1066,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
} }
case COM_BINLOG_DUMP: case COM_BINLOG_DUMP:
{ {
thread_safe_increment(com_other,&LOCK_thread_count); thread_safe_increment(com_other,&LOCK_status);
slow_command = TRUE; slow_command = TRUE;
if (check_global_access(thd, REPL_SLAVE_ACL)) if (check_global_access(thd, REPL_SLAVE_ACL))
break; break;
...@@ -1078,11 +1078,9 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -1078,11 +1078,9 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
/* TODO: The following has to be changed to an 8 byte integer */ /* TODO: The following has to be changed to an 8 byte integer */
pos = uint4korr(packet); pos = uint4korr(packet);
flags = uint2korr(packet + 4); flags = uint2korr(packet + 4);
pthread_mutex_lock(&LOCK_server_id);
thd->server_id=0; /* avoid suicide */ thd->server_id=0; /* avoid suicide */
kill_zombie_dump_threads(slave_server_id = uint4korr(packet+6)); kill_zombie_dump_threads(slave_server_id = uint4korr(packet+6));
thd->server_id = slave_server_id; thd->server_id = slave_server_id;
pthread_mutex_unlock(&LOCK_server_id);
mysql_binlog_send(thd, thd->strdup(packet + 10), (my_off_t) pos, flags); mysql_binlog_send(thd, thd->strdup(packet + 10), (my_off_t) pos, flags);
unregister_slave(thd,1,1); unregister_slave(thd,1,1);
// fake COM_QUIT -- if we get here, the thread needs to terminate // fake COM_QUIT -- if we get here, the thread needs to terminate
...@@ -1092,7 +1090,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -1092,7 +1090,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
} }
case COM_REFRESH: case COM_REFRESH:
{ {
thread_safe_increment(com_stat[SQLCOM_FLUSH],&LOCK_thread_count); thread_safe_increment(com_stat[SQLCOM_FLUSH],&LOCK_status);
ulong options= (ulong) (uchar) packet[0]; ulong options= (ulong) (uchar) packet[0];
if (check_global_access(thd,RELOAD_ACL)) if (check_global_access(thd,RELOAD_ACL))
break; break;
...@@ -1104,7 +1102,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -1104,7 +1102,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
break; break;
} }
case COM_SHUTDOWN: case COM_SHUTDOWN:
thread_safe_increment(com_other,&LOCK_thread_count); thread_safe_increment(com_other,&LOCK_status);
if (check_global_access(thd,SHUTDOWN_ACL)) if (check_global_access(thd,SHUTDOWN_ACL))
break; /* purecov: inspected */ break; /* purecov: inspected */
DBUG_PRINT("quit",("Got shutdown command")); DBUG_PRINT("quit",("Got shutdown command"));
...@@ -1127,7 +1125,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -1127,7 +1125,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
case COM_STATISTICS: case COM_STATISTICS:
{ {
mysql_log.write(thd,command,NullS); mysql_log.write(thd,command,NullS);
thread_safe_increment(com_stat[SQLCOM_SHOW_STATUS],&LOCK_thread_count); thread_safe_increment(com_stat[SQLCOM_SHOW_STATUS],&LOCK_status);
char buff[200]; char buff[200];
ulong uptime = (ulong) (thd->start_time - start_time); ulong uptime = (ulong) (thd->start_time - start_time);
sprintf((char*) buff, sprintf((char*) buff,
...@@ -1146,11 +1144,11 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -1146,11 +1144,11 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
break; break;
} }
case COM_PING: case COM_PING:
thread_safe_increment(com_other,&LOCK_thread_count); thread_safe_increment(com_other,&LOCK_status);
send_ok(net); // Tell client we are alive send_ok(net); // Tell client we are alive
break; break;
case COM_PROCESS_INFO: case COM_PROCESS_INFO:
thread_safe_increment(com_stat[SQLCOM_SHOW_PROCESSLIST],&LOCK_thread_count); thread_safe_increment(com_stat[SQLCOM_SHOW_PROCESSLIST],&LOCK_status);
if (!thd->priv_user[0] && check_global_access(thd,PROCESS_ACL)) if (!thd->priv_user[0] && check_global_access(thd,PROCESS_ACL))
break; break;
mysql_log.write(thd,command,NullS); mysql_log.write(thd,command,NullS);
...@@ -1159,13 +1157,13 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -1159,13 +1157,13 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
break; break;
case COM_PROCESS_KILL: case COM_PROCESS_KILL:
{ {
thread_safe_increment(com_stat[SQLCOM_KILL],&LOCK_thread_count); thread_safe_increment(com_stat[SQLCOM_KILL],&LOCK_status);
ulong id=(ulong) uint4korr(packet); ulong id=(ulong) uint4korr(packet);
kill_one_thread(thd,id); kill_one_thread(thd,id);
break; break;
} }
case COM_DEBUG: case COM_DEBUG:
thread_safe_increment(com_other,&LOCK_thread_count); thread_safe_increment(com_other,&LOCK_status);
if (check_global_access(thd, SUPER_ACL)) if (check_global_access(thd, SUPER_ACL))
break; /* purecov: inspected */ break; /* purecov: inspected */
mysql_print_status(thd); mysql_print_status(thd);
...@@ -1265,7 +1263,7 @@ mysql_execute_command(void) ...@@ -1265,7 +1263,7 @@ mysql_execute_command(void)
!tables_ok(thd,tables))) !tables_ok(thd,tables)))
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
thread_safe_increment(com_stat[lex->sql_command],&LOCK_thread_count); thread_safe_increment(com_stat[lex->sql_command],&LOCK_status);
switch (lex->sql_command) { switch (lex->sql_command) {
case SQLCOM_SELECT: case SQLCOM_SELECT:
{ {
...@@ -3393,28 +3391,46 @@ bool reload_acl_and_cache(THD *thd, ulong options, TABLE_LIST *tables) ...@@ -3393,28 +3391,46 @@ bool reload_acl_and_cache(THD *thd, ulong options, TABLE_LIST *tables)
} }
/*
kill on thread
SYNOPSIS
kill_one_thread()
thd Thread class
id Thread id
NOTES
This is written such that we have a short lock on LOCK_thread_count
*/
void kill_one_thread(THD *thd, ulong id) void kill_one_thread(THD *thd, ulong id)
{ {
VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list
I_List_iterator<THD> it(threads);
THD *tmp; THD *tmp;
uint error=ER_NO_SUCH_THREAD; uint error=ER_NO_SUCH_THREAD;
VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list
I_List_iterator<THD> it(threads);
while ((tmp=it++)) while ((tmp=it++))
{ {
if (tmp->thread_id == id) if (tmp->thread_id == id)
{ {
if ((thd->master_access & SUPER_ACL) || pthread_mutex_lock(&tmp->LOCK_delete); // Lock from delete
!strcmp(thd->user,tmp->user)) break;
{
tmp->awake(1 /*prepare to die*/);
error=0;
}
else
error=ER_KILL_DENIED_ERROR;
break; // Found thread
} }
} }
VOID(pthread_mutex_unlock(&LOCK_thread_count)); VOID(pthread_mutex_unlock(&LOCK_thread_count));
if (tmp)
{
if ((thd->master_access & SUPER_ACL) ||
!strcmp(thd->user,tmp->user))
{
tmp->awake(1 /*prepare to die*/);
error=0;
}
else
error=ER_KILL_DENIED_ERROR;
pthread_mutex_unlock(&tmp->LOCK_delete);
}
if (!error) if (!error)
send_ok(&thd->net); send_ok(&thd->net);
else else
......
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
extern const char* any_db; extern const char* any_db;
int max_binlog_dump_events = 0; // unlimited int max_binlog_dump_events = 0; // unlimited
bool opt_sporadic_binlog_dump_fail = 0; my_bool opt_sporadic_binlog_dump_fail = 0;
static int binlog_dump_count = 0; static int binlog_dump_count = 0;
int check_binlog_magic(IO_CACHE* log, const char** errmsg) int check_binlog_magic(IO_CACHE* log, const char** errmsg)
...@@ -247,7 +247,8 @@ bool log_in_use(const char* log_name) ...@@ -247,7 +247,8 @@ bool log_in_use(const char* log_name)
pthread_mutex_lock(&linfo->lock); pthread_mutex_lock(&linfo->lock);
result = !memcmp(log_name, linfo->log_file_name, log_name_len); result = !memcmp(log_name, linfo->log_file_name, log_name_len);
pthread_mutex_unlock(&linfo->lock); pthread_mutex_unlock(&linfo->lock);
if (result) break; if (result)
break;
} }
} }
...@@ -346,7 +347,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ...@@ -346,7 +347,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
linfo.index_file_offset = 0; linfo.index_file_offset = 0;
thd->current_linfo = &linfo; thd->current_linfo = &linfo;
if (mysql_bin_log.find_log_pos(&linfo, name)) if (mysql_bin_log.find_log_pos(&linfo, name, 1))
{ {
errmsg = "Could not find first log file name in binary log index file"; errmsg = "Could not find first log file name in binary log index file";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
...@@ -496,22 +497,28 @@ Increase max_allowed_packet on master"; ...@@ -496,22 +497,28 @@ Increase max_allowed_packet on master";
switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) { switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
case 0: case 0:
/* we read successfully, so we'll need to send it to the slave */ /* we read successfully, so we'll need to send it to the slave */
pthread_mutex_unlock(log_lock);
read_packet = 1; read_packet = 1;
break; break;
case LOG_READ_EOF: case LOG_READ_EOF:
DBUG_PRINT("wait",("waiting for data in binary log")); DBUG_PRINT("wait",("waiting for data in binary log"));
if (!thd->killed) if (!thd->killed)
{
/* Note that the following call unlocks lock_log */
mysql_bin_log.wait_for_update(thd); mysql_bin_log.wait_for_update(thd);
}
else
pthread_mutex_unlock(log_lock);
DBUG_PRINT("wait",("binary log received update")); DBUG_PRINT("wait",("binary log received update"));
break; break;
default: default:
pthread_mutex_unlock(log_lock);
fatal_error = 1; fatal_error = 1;
break; break;
} }
pthread_mutex_unlock(log_lock);
if (read_packet) if (read_packet)
{ {
thd->proc_info = "sending update to slave"; thd->proc_info = "sending update to slave";
...@@ -552,7 +559,7 @@ Increase max_allowed_packet on master"; ...@@ -552,7 +559,7 @@ Increase max_allowed_packet on master";
bool loop_breaker = 0; bool loop_breaker = 0;
// need this to break out of the for loop from switch // need this to break out of the for loop from switch
thd->proc_info = "switching to next log"; thd->proc_info = "switching to next log";
switch (mysql_bin_log.find_next_log(&linfo)) { switch (mysql_bin_log.find_next_log(&linfo, 1)) {
case LOG_INFO_EOF: case LOG_INFO_EOF:
loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK); loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
break; break;
...@@ -739,17 +746,21 @@ void kill_zombie_dump_threads(uint32 slave_server_id) ...@@ -739,17 +746,21 @@ void kill_zombie_dump_threads(uint32 slave_server_id)
if (tmp->command == COM_BINLOG_DUMP && if (tmp->command == COM_BINLOG_DUMP &&
tmp->server_id == slave_server_id) tmp->server_id == slave_server_id)
{ {
/* pthread_mutex_lock(&tmp->LOCK_delete); // Lock from delete
Here we do not call kill_one_thread() as break;
it will be slow because it will iterate through the list
again. Plus it double-locks LOCK_tread_count, which
make safe_mutex complain and abort.
We just to do kill the thread ourselves.
*/
tmp->awake(1/*prepare to die*/);
} }
} }
pthread_mutex_unlock(&LOCK_thread_count); pthread_mutex_unlock(&LOCK_thread_count);
if (tmp)
{
/*
Here we do not call kill_one_thread() as
it will be slow because it will iterate through the list
again. We just to do kill the thread ourselves.
*/
tmp->awake(1/*prepare to die*/);
pthread_mutex_unlock(&tmp->LOCK_delete);
}
} }
...@@ -927,9 +938,10 @@ int show_binlog_events(THD* thd) ...@@ -927,9 +938,10 @@ int show_binlog_events(THD* thd)
my_off_t pos = lex_mi->pos; my_off_t pos = lex_mi->pos;
char search_file_name[FN_REFLEN], *name; char search_file_name[FN_REFLEN], *name;
const char *log_file_name = lex_mi->log_file_name; const char *log_file_name = lex_mi->log_file_name;
pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
LOG_INFO linfo; LOG_INFO linfo;
Log_event* ev; Log_event* ev;
limit_start = thd->lex.select->offset_limit; limit_start = thd->lex.select->offset_limit;
limit_end = thd->lex.select->select_limit + limit_start; limit_end = thd->lex.select->select_limit + limit_start;
...@@ -942,7 +954,7 @@ int show_binlog_events(THD* thd) ...@@ -942,7 +954,7 @@ int show_binlog_events(THD* thd)
linfo.index_file_offset = 0; linfo.index_file_offset = 0;
thd->current_linfo = &linfo; thd->current_linfo = &linfo;
if (mysql_bin_log.find_log_pos(&linfo, name)) if (mysql_bin_log.find_log_pos(&linfo, name, 1))
{ {
errmsg = "Could not find target log"; errmsg = "Could not find target log";
goto err; goto err;
...@@ -957,7 +969,7 @@ int show_binlog_events(THD* thd) ...@@ -957,7 +969,7 @@ int show_binlog_events(THD* thd)
goto err; goto err;
} }
pthread_mutex_lock(mysql_bin_log.get_log_lock()); pthread_mutex_lock(log_lock);
my_b_seek(&log, pos); my_b_seek(&log, pos);
for (event_count = 0; for (event_count = 0;
...@@ -968,7 +980,7 @@ int show_binlog_events(THD* thd) ...@@ -968,7 +980,7 @@ int show_binlog_events(THD* thd)
{ {
errmsg = "Net error"; errmsg = "Net error";
delete ev; delete ev;
pthread_mutex_unlock(mysql_bin_log.get_log_lock()); pthread_mutex_unlock(log_lock);
goto err; goto err;
} }
...@@ -982,11 +994,11 @@ int show_binlog_events(THD* thd) ...@@ -982,11 +994,11 @@ int show_binlog_events(THD* thd)
if (event_count < limit_end && log.error) if (event_count < limit_end && log.error)
{ {
errmsg = "Wrong offset or I/O error"; errmsg = "Wrong offset or I/O error";
pthread_mutex_unlock(mysql_bin_log.get_log_lock()); pthread_mutex_unlock(log_lock);
goto err; goto err;
} }
pthread_mutex_unlock(mysql_bin_log.get_log_lock()); pthread_mutex_unlock(log_lock);
} }
err: err:
......
...@@ -18,9 +18,9 @@ extern bool server_id_supplied; ...@@ -18,9 +18,9 @@ extern bool server_id_supplied;
extern I_List<i_string> binlog_do_db, binlog_ignore_db; extern I_List<i_string> binlog_do_db, binlog_ignore_db;
extern int max_binlog_dump_events; extern int max_binlog_dump_events;
extern bool opt_sporadic_binlog_dump_fail; extern my_bool opt_sporadic_binlog_dump_fail;
#define KICK_SLAVE(thd) thd->awake(0 /* do not prepare to die*/); #define KICK_SLAVE(thd) { pthread_mutex_lock(&(thd)->LOCK_delete); (thd)->awake(0 /* do not prepare to die*/); pthread_mutex_unlock(&(thd)->LOCK_delete); }
File open_binlog(IO_CACHE *log, const char *log_file_name, File open_binlog(IO_CACHE *log, const char *log_file_name,
const char **errmsg); const char **errmsg);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment