/* Copyright (C) 2000-2003 MySQL AB
   
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.
   
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
   
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


#include "mysql_priv.h"
#include <mysql.h>
20
#include <myisam.h>
unknown's avatar
unknown committed
21
#include "mini_client.h"
22
#include "slave.h"
23
#include "sql_repl.h"
24
#include "repl_failsafe.h"
unknown's avatar
unknown committed
25
#include <thr_alarm.h>
unknown's avatar
unknown committed
26
#include <my_dir.h>
27
#include <assert.h>
unknown's avatar
unknown committed
28

29 30 31
bool use_slave_mask = 0;
MY_BITMAP slave_error_mask;

32 33
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);

34
volatile bool slave_sql_running = 0, slave_io_running = 0;
35
char* slave_load_tmpdir = 0;
unknown's avatar
unknown committed
36
MASTER_INFO *active_mi;
37
volatile int active_mi_in_use = 0;
38
HASH replicate_do_table, replicate_ignore_table;
unknown's avatar
unknown committed
39
DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
40
bool do_table_inited = 0, ignore_table_inited = 0;
unknown's avatar
unknown committed
41
bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
42
bool table_rules_on = 0;
43
ulonglong relay_log_space_limit = 0;
unknown's avatar
unknown committed
44 45 46 47 48 49 50

/*
  When slave thread exits, we need to remember the temporary tables so we
  can re-use them on slave start.

  TODO: move the vars below under MASTER_INFO
*/
51

52
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
53
int events_till_abort = -1;
54
static int events_till_disconnect = -1;
unknown's avatar
unknown committed
55

56
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
unknown's avatar
unknown committed
57

58
void skip_load_data_infile(NET* net);
59
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
unknown's avatar
unknown committed
60
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev);
unknown's avatar
unknown committed
61
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli);
62 63
static inline bool io_slave_killed(THD* thd,MASTER_INFO* mi);
static inline bool sql_slave_killed(THD* thd,RELAY_LOG_INFO* rli);
unknown's avatar
unknown committed
64
static int count_relay_log_space(RELAY_LOG_INFO* rli);
65
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
66
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
unknown's avatar
unknown committed
67 68
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
			  bool suppress_warnings);
unknown's avatar
unknown committed
69
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
unknown's avatar
unknown committed
70
			     bool reconnect, bool suppress_warnings);
71 72
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
		      void* thread_killed_arg);
73
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
unknown's avatar
unknown committed
74 75
static int create_table_from_dump(THD* thd, NET* net, const char* db,
				  const char* table_name);
76
static int check_master_version(MYSQL* mysql, MASTER_INFO* mi);
77
char* rewrite_db(char* db);
78

79 80 81 82 83 84

/*
  Get a bit mask for which threads are running so that we later can
  restart these threads
*/

85 86 87 88 89 90 91 92
void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse)
{
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
  register int tmp_mask=0;
  if (set_io)
    tmp_mask |= SLAVE_IO;
  if (set_sql)
    tmp_mask |= SLAVE_SQL;
93 94
  if (inverse)
    tmp_mask^= (SLAVE_IO | SLAVE_SQL);
95 96 97
  *mask = tmp_mask;
}

98

99 100 101 102 103 104 105 106 107 108 109 110 111 112
/*
  Take the run locks of both slave threads: the IO thread's mi->run_lock
  first, then the SQL thread's mi->rli.run_lock.
  TODO: see if we can do this without dual mutex
*/
void lock_slave_threads(MASTER_INFO* mi)
{
  pthread_mutex_t *io_run_lock= &mi->run_lock;
  pthread_mutex_t *sql_run_lock= &mi->rli.run_lock;
  pthread_mutex_lock(io_run_lock);
  pthread_mutex_lock(sql_run_lock);
}

/*
  Release the locks taken by lock_slave_threads(), in reverse order.
  TODO: see if we can do this without dual mutex
*/
void unlock_slave_threads(MASTER_INFO* mi)
{
  pthread_mutex_t *io_run_lock= &mi->run_lock;
  pthread_mutex_t *sql_run_lock= &mi->rli.run_lock;
  pthread_mutex_unlock(sql_run_lock);
  pthread_mutex_unlock(io_run_lock);
}

113

114 115
int init_slave()
{
116
  DBUG_ENTER("init_slave");
117

118 119
  /* This is called when mysqld starts */

120 121 122 123
  /*
    TODO: re-write this to interate through the list of files
    for multi-master
  */
unknown's avatar
unknown committed
124
  active_mi= new MASTER_INFO;
125 126

  /*
127 128 129
    If master_host is not specified, try to read it from the master_info file.
    If master_host is specified, create the master_info file if it doesn't
    exists.
130
  */
131 132 133 134 135 136 137
  if (!active_mi)
  {
    sql_print_error("Failed to allocate memory for the master info structure");
    goto err;
  }
    
  if(init_master_info(active_mi,master_info_file,relay_log_info_file,
138
		       !master_host))
139
  {
140
    sql_print_error("Failed to initialize the master info structure");
unknown's avatar
unknown committed
141
    goto err;
142
  }
143 144 145 146 147 148 149 150 151

  /*
    make sure slave thread gets started if server_id is set,
    valid master.info is present, and master_host has not been specified
  */
  if (server_id && !master_host && active_mi->host[0])
    master_host= active_mi->host;

  if (master_host && !opt_skip_slave_start)
152
  {
153 154 155 156 157 158
    if (start_slave_threads(1 /* need mutex */,
			    0 /* no wait for start*/,
			    active_mi,
			    master_info_file,
			    relay_log_info_file,
			    SLAVE_IO | SLAVE_SQL))
unknown's avatar
unknown committed
159
    {
160
      sql_print_error("Failed to create slave threads");
unknown's avatar
unknown committed
161 162
      goto err;
    }
163
  }
164
  DBUG_RETURN(0);
unknown's avatar
unknown committed
165 166 167

err:
  DBUG_RETURN(1);
168 169
}

170

171 172
/*
  hash_free_key callback for the table-rule hashes. The rule entry and its
  "db.table" text are allocated as one block, so one my_free() is enough.
*/
static void free_table_ent(TABLE_RULE_ENT* e)
{
  gptr block= (gptr) e;
  my_free(block, MYF(0));
}

/*
  hash_get_key callback for the table-rule hashes: the key is the
  e->key_len bytes starting at e->db.
*/
static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
			   my_bool not_used __attribute__((unused)))
{
  byte *key= (byte*) e->db;
  *len= e->key_len;
  return key;
}

183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211

/*
  Open the given relay log

  SYNOPSIS
    init_relay_log_pos()
    rli			Relay information (will be initialized)
    log			Name of relay log file to read from. NULL = First log
    pos			Position in relay log file 
    need_data_lock	Set to 1 if this functions should do mutex locks
    errmsg		Store pointer to error message here

  DESCRIPTION
  - Close old open relay log files.
  - If we are using the same relay log as the running IO-thread, then set
    rli->cur_log to point to the same IO_CACHE entry.
  - If not, open the 'log' binary file.

  TODO
    - check proper initialization of master_log_name/master_log_pos
    - We may always want to delete all logs before 'log'.
      Currently if we are not calling this with 'log' as NULL or the first
      log we will never delete relay logs.
      If we want this we should not set skip_log_purge to 1.

  RETURN VALUES
    0	ok
    1	error.  errmsg is set to point to the error message
*/
unknown's avatar
unknown committed
212

213 214 215 216
int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
		       ulonglong pos, bool need_data_lock,
		       const char** errmsg)
{
unknown's avatar
unknown committed
217 218
  DBUG_ENTER("init_relay_log_pos");

219
  *errmsg=0;
220 221 222 223 224
  pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
  pthread_mutex_lock(log_lock);
  if (need_data_lock)
    pthread_mutex_lock(&rli->data_lock);
  
225
  /* Close log file and free buffers if it's already open */
226 227 228 229 230 231 232
  if (rli->cur_log_fd >= 0)
  {
    end_io_cache(&rli->cache_buf);
    my_close(rli->cur_log_fd, MYF(MY_WME));
    rli->cur_log_fd = -1;
  }
  
233
  rli->relay_log_pos = pos;
unknown's avatar
unknown committed
234

unknown's avatar
unknown committed
235 236 237 238
  /*
    Test to see if the previous run was with the skip of purging
    If yes, we do not purge when we restart
  */
239
  if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
240 241 242 243
  {
    *errmsg="Could not find first log during relay log initialization";
    goto err;
  }
244 245

  if (log)					// If not first log
unknown's avatar
unknown committed
246
  {
247
    if (strcmp(log, rli->linfo.log_file_name))
248
      rli->skip_log_purge= 1;			// Different name; Don't purge
249
    if (rli->relay_log.find_log_pos(&rli->linfo, log, 1))
250 251 252 253
    {
      *errmsg="Could not find target log during relay log initialization";
      goto err;
    }
unknown's avatar
unknown committed
254
  }
255 256
  strmake(rli->relay_log_name,rli->linfo.log_file_name,
	  sizeof(rli->relay_log_name)-1);
257 258
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
  {
259 260 261 262 263
    /*
      The IO thread is using this log file.
      In this case, we will use the same IO_CACHE pointer to
      read data as the IO thread is using to write data.
    */
264
    if (my_b_tell((rli->cur_log=rli->relay_log.get_log_file())) == 0 &&
unknown's avatar
unknown committed
265
	check_binlog_magic(rli->cur_log,errmsg))
266
      goto err;
unknown's avatar
unknown committed
267
    rli->cur_log_old_open_count=rli->relay_log.get_open_count();
268 269 270
  }
  else
  {
271 272 273
    /*
      Open the relay log and set rli->cur_log to point at this one
    */
274 275 276 277 278
    if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
				     rli->linfo.log_file_name,errmsg)) < 0)
      goto err;
    rli->cur_log = &rli->cache_buf;
  }
279
  if (pos >= BIN_LOG_HEADER_SIZE)
unknown's avatar
unknown committed
280 281
    my_b_seek(rli->cur_log,(off_t)pos);

282
err:
283 284 285 286 287 288
  /*
    If we don't purge, we can't honour relay_log_space_limit ;
    silently discard it
  */
  if (rli->skip_log_purge)
    rli->log_space_limit= 0;
unknown's avatar
unknown committed
289 290 291
  pthread_cond_broadcast(&rli->data_cond);
  if (need_data_lock)
    pthread_mutex_unlock(&rli->data_lock);
292 293 294

  /* Isn't this strange: if !need_data_lock, we broadcast with no lock ?? */

unknown's avatar
unknown committed
295 296
  pthread_mutex_unlock(log_lock);
  DBUG_RETURN ((*errmsg) ? 1 : 0);
297 298
}

299

300
/* called from get_options() in mysqld.cc on start-up */
unknown's avatar
unknown committed
301 302

void init_slave_skip_errors(const char* arg)
303
{
unknown's avatar
unknown committed
304
  const char *p;
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
  if (bitmap_init(&slave_error_mask,MAX_SLAVE_ERROR,0))
  {
    fprintf(stderr, "Badly out of memory, please check your system status\n");
    exit(1);
  }
  use_slave_mask = 1;
  for (;isspace(*arg);++arg)
    /* empty */;
  if (!my_casecmp(arg,"all",3))
  {
    bitmap_set_all(&slave_error_mask);
    return;
  }
  for (p= arg ; *p; )
  {
    long err_code;
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
      break;
    if (err_code < MAX_SLAVE_ERROR)
       bitmap_set_bit(&slave_error_mask,(uint)err_code);
    while (!isdigit(*p) && *p)
      p++;
  }
}

unknown's avatar
unknown committed
330

unknown's avatar
unknown committed
331
/*
unknown's avatar
unknown committed
332
  We assume we have a run lock on rli and that both slave thread
unknown's avatar
unknown committed
333 334 335
  are not running
*/

336 337
int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
		     const char** errmsg)
338
{
339
  int error=0;
unknown's avatar
unknown committed
340
  DBUG_ENTER("purge_relay_logs");
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363

  /*
    Even if rli->inited==0, we still try to empty rli->master_log_* variables.
    Indeed, rli->inited==0 does not imply that they already are empty.
    It could be that slave's info initialization partly succeeded : 
    for example if relay-log.info existed but *relay-bin*.*
    have been manually removed, init_relay_log_info reads the old 
    relay-log.info and fills rli->master_log_*, then init_relay_log_info
    checks for the existence of the relay log, this fails and
    init_relay_log_info leaves rli->inited to 0.
    In that pathological case, rli->master_log_pos* will be properly reinited
    at the next START SLAVE (as RESET SLAVE or CHANGE
    MASTER, the callers of purge_relay_logs, will delete bogus *.info files
    or replace them with correct files), however if the user does SHOW SLAVE
    STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
    In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS 
    to display fine in any case.
  */

  rli->master_log_name[0]= 0;
  rli->master_log_pos= 0;
  rli->pending= 0;

364
  if (!rli->inited)
365
    DBUG_RETURN(0);
unknown's avatar
unknown committed
366

367 368
  DBUG_ASSERT(rli->slave_running == 0);
  DBUG_ASSERT(rli->mi->slave_running == 0);
unknown's avatar
unknown committed
369

370 371
  rli->slave_skip_counter=0;
  pthread_mutex_lock(&rli->data_lock);
372
  if (rli->relay_log.reset_logs(thd))
373 374 375 376 377
  {
    *errmsg = "Failed during log reset";
    error=1;
    goto err;
  }
378 379
  /* Save name of used relay log file */
  strmake(rli->relay_log_name, rli->relay_log.get_log_fname(),
unknown's avatar
unknown committed
380
	  sizeof(rli->relay_log_name)-1);
unknown's avatar
unknown committed
381 382 383
  // Just first log with magic number and nothing else
  rli->log_space_total= BIN_LOG_HEADER_SIZE;
  rli->relay_log_pos=   BIN_LOG_HEADER_SIZE;
unknown's avatar
unknown committed
384
  rli->relay_log.reset_bytes_written();
385
  if (!just_reset)
386 387
    error= init_relay_log_pos(rli, rli->relay_log_name, rli->relay_log_pos,
			      0 /* do not need data lock */, errmsg);
unknown's avatar
unknown committed
388

unknown's avatar
unknown committed
389 390 391 392
err:
#ifndef DBUG_OFF
  char buf[22];
#endif  
unknown's avatar
unknown committed
393
  DBUG_PRINT("info",("log_space_total: %s",llstr(rli->log_space_total,buf)));
394
  pthread_mutex_unlock(&rli->data_lock);
unknown's avatar
unknown committed
395
  DBUG_RETURN(error);
396 397
}

398

399 400 401 402 403 404 405
/*
  Stop the slave threads selected by thread_mask (SLAVE_IO, SLAVE_SQL;
  SLAVE_FORCE_ALL stops both and keeps going after an error).

  skip_lock	If set, the caller already holds the run locks and
		terminate_slave_thread() must not take them again.

  RETURN
    0 on success (always with SLAVE_FORCE_ALL), otherwise the first error
    returned by terminate_slave_thread().
*/
int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
{
  if (!mi->inited)
    return 0; /* successfully do nothing */

  int error;
  int force_all= (thread_mask & SLAVE_FORCE_ALL);
  /* Mutexes used with each thread's stop_cond in the timed wait */
  pthread_mutex_t *io_cond_lock= &mi->run_lock;
  pthread_mutex_t *sql_cond_lock= &mi->rli.run_lock;
  /* Mutexes terminate_slave_thread() takes itself (none with skip_lock) */
  pthread_mutex_t *io_lock= skip_lock ? 0 : io_cond_lock;
  pthread_mutex_t *sql_lock= skip_lock ? 0 : sql_cond_lock;
  DBUG_ENTER("terminate_slave_threads");

  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) && mi->slave_running)
  {
    DBUG_PRINT("info",("Terminating IO thread"));
    mi->abort_slave=1;
    error= terminate_slave_thread(mi->io_thd, io_lock, io_cond_lock,
				  &mi->stop_cond, &mi->slave_running);
    if (error && !force_all)
      DBUG_RETURN(error);
  }

  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) && mi->rli.slave_running)
  {
    DBUG_PRINT("info",("Terminating SQL thread"));
    DBUG_ASSERT(mi->rli.sql_thd != 0) ;
    mi->rli.abort_slave=1;
    error= terminate_slave_thread(mi->rli.sql_thd, sql_lock, sql_cond_lock,
				  &mi->rli.stop_cond, &mi->rli.slave_running);
    if (error && !force_all)
      DBUG_RETURN(error);
  }

  DBUG_RETURN(0);
}

441

442 443 444 445 446 447 448 449 450 451 452 453 454 455 456
/*
  Kick one slave thread repeatedly until *slave_running becomes false.

  term_lock	If non-NULL, taken here for the whole call. If NULL, the
		caller must already hold cond_lock, which is needed for the
		timed wait.
  cond_lock	Mutex paired with term_cond in pthread_cond_timedwait()
  term_cond	Condition waited on between kicks
  slave_running	Flag polled to see whether the thread is still alive

  RETURN
    0			the thread stopped
    ER_SLAVE_NOT_RUNNING	term_lock given and the thread was not running
*/
int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
			   pthread_mutex_t *cond_lock,
			   pthread_cond_t* term_cond,
			   volatile bool* slave_running)
{
  if (term_lock)
    pthread_mutex_lock(term_lock);
  if (term_lock && !*slave_running)
  {
    pthread_mutex_unlock(term_lock);
    return ER_SLAVE_NOT_RUNNING;
  }

  DBUG_ASSERT(thd != 0);
  /*
    It is critical to test whether the slave is running first; otherwise
    we might be referencing freed memory when trying to kick it.
  */
  THD_CHECK_SENTRY(thd);

  while (*slave_running)			// Should always be true
  {
    struct timespec deadline;
    KICK_SLAVE(thd);
    /*
      The slave thread might miss the first alarm, so resend the signal
      every 2 seconds until it reacts.
    */
    set_timespec(deadline,2);
    pthread_cond_timedwait(term_cond, cond_lock, &deadline);
  }

  if (term_lock)
    pthread_mutex_unlock(term_lock);
  return 0;
}

479

unknown's avatar
unknown committed
480
int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
481
		       pthread_mutex_t *cond_lock,
unknown's avatar
unknown committed
482 483 484
		       pthread_cond_t *start_cond,
		       volatile bool *slave_running,
		       volatile ulong *slave_run_id,
485 486 487
		       MASTER_INFO* mi)
{
  pthread_t th;
unknown's avatar
unknown committed
488
  ulong start_id;
489
  DBUG_ASSERT(mi->inited);
unknown's avatar
unknown committed
490 491
  DBUG_ENTER("start_slave_thread");

492 493 494 495 496 497 498 499 500
  if (start_lock)
    pthread_mutex_lock(start_lock);
  if (!server_id)
  {
    if (start_cond)
      pthread_cond_broadcast(start_cond);
    if (start_lock)
      pthread_mutex_unlock(start_lock);
    sql_print_error("Server id not set, will not start slave");
unknown's avatar
unknown committed
501
    DBUG_RETURN(ER_BAD_SLAVE);
502 503 504
  }
  
  if (*slave_running)
505 506 507 508 509
  {
    if (start_cond)
      pthread_cond_broadcast(start_cond);
    if (start_lock)
      pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
510
    DBUG_RETURN(ER_SLAVE_MUST_STOP);
511
  }
unknown's avatar
unknown committed
512 513
  start_id= *slave_run_id;
  DBUG_PRINT("info",("Creating new slave thread"));
514 515 516 517
  if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
  {
    if (start_lock)
      pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
518
    DBUG_RETURN(ER_SLAVE_THREAD);
519 520 521 522
  }
  if (start_cond && cond_lock)
  {
    THD* thd = current_thd;
unknown's avatar
unknown committed
523
    while (start_id == *slave_run_id)
524
    {
unknown's avatar
unknown committed
525
      DBUG_PRINT("sleep",("Waiting for slave thread to start"));
526
      const char* old_msg = thd->enter_cond(start_cond,cond_lock,
527
					    "Waiting for slave thread to start");
528 529 530 531 532
      pthread_cond_wait(start_cond,cond_lock);
      thd->exit_cond(old_msg);
      if (thd->killed)
      {
	pthread_mutex_unlock(cond_lock);
unknown's avatar
unknown committed
533
	DBUG_RETURN(ER_SERVER_SHUTDOWN);
534 535 536 537 538
      }
    }
  }
  if (start_lock)
    pthread_mutex_unlock(start_lock);
unknown's avatar
unknown committed
539
  DBUG_RETURN(0);
540
}
unknown's avatar
unknown committed
541 542 543 544 545 546


/*
  SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
  sense to do that for starting a slave - we always care if it actually
  started the threads that were not previously running
547
*/
unknown's avatar
unknown committed
548

549 550 551 552 553 554 555
int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
			MASTER_INFO* mi, const char* master_info_fname,
			const char* slave_info_fname, int thread_mask)
{
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
  pthread_cond_t* cond_io=0,*cond_sql=0;
  int error=0;
556
  DBUG_ENTER("start_slave_threads");
557 558 559 560 561 562 563 564 565 566 567 568 569
  
  if (need_slave_mutex)
  {
    lock_io = &mi->run_lock;
    lock_sql = &mi->rli.run_lock;
  }
  if (wait_for_start)
  {
    cond_io = &mi->start_cond;
    cond_sql = &mi->rli.start_cond;
    lock_cond_io = &mi->run_lock;
    lock_cond_sql = &mi->rli.run_lock;
  }
570 571 572

  if (thread_mask & SLAVE_IO)
    error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
unknown's avatar
unknown committed
573 574
			     cond_io,
			     &mi->slave_running, &mi->slave_run_id,
575 576
			     mi);
  if (!error && (thread_mask & SLAVE_SQL))
577
  {
578 579
    error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
			     cond_sql,
unknown's avatar
unknown committed
580 581
			     &mi->rli.slave_running, &mi->rli.slave_run_id,
			     mi);
582 583 584
    if (error)
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
  }
585
  DBUG_RETURN(error);
586
}
587

588

589 590 591 592
/*
  Initialize a table-rule hash keyed by the rule text (see get_table_key())
  whose entries are released with free_table_ent(), and mark it inited.
*/
void init_table_rule_hash(HASH* h, bool* h_inited)
{
  hash_get_key key_fn= (hash_get_key) get_table_key;
  hash_free_key free_fn= (hash_free_key) free_table_ent;
  hash_init(h, TABLE_RULE_HASH_SIZE, 0, 0, key_fn, free_fn, 0);
  *h_inited= 1;
}
unknown's avatar
unknown committed
596

unknown's avatar
unknown committed
597 598
/*
  Initialize a dynamic array of TABLE_RULE_ENT pointers (used for the
  wild table rules) and mark it inited.
*/
void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited)
{
  const uint elem_size= sizeof(TABLE_RULE_ENT*);
  my_init_dynamic_array(a, elem_size, TABLE_RULE_ARR_SIZE,
			TABLE_RULE_ARR_SIZE);
  *a_inited= 1;
}

/*
  Return the first rule in 'a' whose text (used as the wildcard pattern of
  wild_case_compare(), with '\\' as escape) matches key[0..len), or 0 if
  no rule matches.
*/
static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
{
  const char* key_end= key + len;

  for (uint idx= 0; idx < a->elements; idx++)
  {
    TABLE_RULE_ENT* rule;
    get_dynamic(a, (gptr) &rule, idx);
    const char* pattern= (const char*) rule->db;
    const char* pattern_end= (const char*) (rule->db + rule->key_len);
    if (!wild_case_compare(key, key_end, pattern, pattern_end, '\\'))
      return rule;
  }
  return 0;
}

621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641

/*
  Checks whether tables match some (wild_)do_table and (wild_)ignore_table
  rules (for replication)

  SYNOPSIS
    tables_ok()
    thd             thread (SQL slave thread normally)
    tables          list of tables to check

  NOTES
    Note that changing the order of the tables in the list can lead to
    different results. Note also the order of precedence of the do/ignore 
    rules (see code below). For that reason, users should not set conflicting 
    rules because they may get unpredicted results.

  RETURN VALUES
    0           should not be logged/replicated
    1           should be logged/replicated                  
*/

642 643
int tables_ok(THD* thd, TABLE_LIST* tables)
{
644 645
  DBUG_ENTER("tables_ok");

unknown's avatar
unknown committed
646 647
  for (; tables; tables = tables->next)
  {
648 649 650 651
    char hash_key[2*NAME_LEN+2];
    char *end;
    uint len;

unknown's avatar
unknown committed
652 653
    if (!tables->updating) 
      continue;
654 655 656
    end= strmov(hash_key, tables->db ? tables->db : thd->db);
    *end++= '.';
    len= (uint) (strmov(end, tables->real_name) - hash_key);
unknown's avatar
unknown committed
657
    if (do_table_inited) // if there are any do's
658
    {
unknown's avatar
unknown committed
659
      if (hash_search(&replicate_do_table, (byte*) hash_key, len))
660
	DBUG_RETURN(1);
unknown's avatar
unknown committed
661
    }
662
    if (ignore_table_inited) // if there are any ignores
unknown's avatar
unknown committed
663 664
    {
      if (hash_search(&replicate_ignore_table, (byte*) hash_key, len))
665
	DBUG_RETURN(0); 
666
    }
unknown's avatar
unknown committed
667 668
    if (wild_do_table_inited && find_wild(&replicate_wild_do_table,
					  hash_key, len))
669
      DBUG_RETURN(1);
unknown's avatar
unknown committed
670 671
    if (wild_ignore_table_inited && find_wild(&replicate_wild_ignore_table,
					      hash_key, len))
672
      DBUG_RETURN(0);
unknown's avatar
unknown committed
673
  }
674

unknown's avatar
unknown committed
675 676 677 678
  /*
    If no explicit rule found and there was a do list, do not replicate.
    If there was no do list, go ahead
  */
679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732
  DBUG_RETURN(!do_table_inited && !wild_do_table_inited);
}


/*
  Check whether a db matches the wild_do_table and wild_ignore_table
  rules (for replication).

  SYNOPSIS
    db_ok_with_wild_table()
    db		name of the db to check.
		Is tested with check_db_name() before calling this function.

  NOTES
    Here is the reason for this function.
    We advise users who want to exclude a database 'db1' safely to do it
    with replicate_wild_ignore_table='db1.%' instead of binlog_ignore_db or
    replicate_ignore_db. The latter two only check the selected db, which
    doesn't work in this case:
    USE db2;
    UPDATE db1.t SET ... #this will be replicated and should not
    replicate_wild_ignore_table works in all cases.
    However, replicate_wild_ignore_table only checks tables. When one does
    'DROP DATABASE db1', no tables are involved and the statement would be
    replicated, although users could expect it would not (it roughly means
    'DROP db1.first_table, DROP db1.second_table...').
    In other words, we want to interpret 'db1.%' as "everything touching
    db1". That is why 'db1.' is matched against the wild table rules.

  RETURN VALUES
    0           should not be logged/replicated
    1           should be logged/replicated
*/

int db_ok_with_wild_table(const char *db)
{
  char hash_key[NAME_LEN+2];
  /* Key is "db." -- not NUL-terminated, its length is passed explicitly */
  char *after_dot= strmov(hash_key, db);
  *after_dot++= '.';
  int len= (int) (after_dot - hash_key);

  if (wild_do_table_inited &&
      find_wild(&replicate_wild_do_table, hash_key, len))
    return 1;
  if (wild_ignore_table_inited &&
      find_wild(&replicate_wild_ignore_table, hash_key, len))
    return 0;

  /* No explicit rule matched: replicate unless there is a wild do list */
  return !wild_do_table_inited;
}


int add_table_rule(HASH* h, const char* table_spec)
{
unknown's avatar
unknown committed
738
  const char* dot = strchr(table_spec, '.');
unknown's avatar
unknown committed
739
  if (!dot) return 1;
unknown's avatar
unknown committed
740
  // len is always > 0 because we know the there exists a '.'
741 742 743
  uint len = (uint)strlen(table_spec);
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
						 + len, MYF(MY_WME));
unknown's avatar
unknown committed
744
  if (!e) return 1;
745 746 747 748 749 750 751 752
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len);
  (void)hash_insert(h, (byte*)e);
  return 0;
}

unknown's avatar
unknown committed
753 754
int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
{
unknown's avatar
unknown committed
755
  const char* dot = strchr(table_spec, '.');
unknown's avatar
unknown committed
756
  if (!dot) return 1;
unknown's avatar
unknown committed
757 758 759
  uint len = (uint)strlen(table_spec);
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
						 + len, MYF(MY_WME));
unknown's avatar
unknown committed
760
  if (!e) return 1;
unknown's avatar
unknown committed
761 762 763 764 765 766 767 768
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len);
  insert_dynamic(a, (gptr)&e);
  return 0;
}

769 770 771
/*
  my_free() every pointer stored in 'a', then release the array itself.
*/
static void free_string_array(DYNAMIC_ARRAY *a)
{
  for (uint idx= 0; idx < a->elements; idx++)
  {
    char* block;
    get_dynamic(a, (gptr) &block, idx);
    my_free(block, MYF(MY_WME));
  }
  delete_dynamic(a);
}

781 782
#ifdef NOT_USED_YET

/*
  list_walk() callback for the future multi-master code: release one
  master's info. Always returns 0.
*/
static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/)
{
  end_master_info(mi);
  return 0;
}

#endif
789

unknown's avatar
unknown committed
790

791 792
void end_slave()
{
unknown's avatar
unknown committed
793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812
  if (active_mi)
  {
    /*
      TODO: replace the line below with
      list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
      once multi-master code is ready.
    */
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
    end_master_info(active_mi);
    if (do_table_inited)
      hash_free(&replicate_do_table);
    if (ignore_table_inited)
      hash_free(&replicate_ignore_table);
    if (wild_do_table_inited)
      free_string_array(&replicate_wild_do_table);
    if (wild_ignore_table_inited)
      free_string_array(&replicate_wild_ignore_table);
    delete active_mi;
    active_mi= 0;
  }
813
}
unknown's avatar
unknown committed
814

815

816
/*
  True when the IO thread should stop: STOP SLAVE requested
  (mi->abort_slave), server shutdown (abort_loop) or the thread was killed.
*/
static bool io_slave_killed(THD* thd, MASTER_INFO* mi)
{
  DBUG_ASSERT(mi->io_thd == thd);
  DBUG_ASSERT(mi->slave_running == 1); // tracking buffer overrun
  if (mi->abort_slave || abort_loop)
    return 1;
  return thd->killed ? 1 : 0;
}

823

824
/*
  True when the SQL thread should stop: rli->abort_slave is set, the server
  is shutting down (abort_loop) or the thread was killed.
*/
static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
{
  DBUG_ASSERT(rli->sql_thd == thd);
  DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
  if (rli->abort_slave || abort_loop)
    return 1;
  return thd->killed ? 1 : 0;
}

831

832
/*
  Record a slave error in rli and write it to the error log.

  msg is a printf-style format. The formatted text is stored in
  rli->last_slave_error (bounded by its size), err_code in
  rli->last_slave_errno.
*/
void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...)
{
  va_list args;
  va_start(args,msg);
  my_vsnprintf(rli->last_slave_error,
	       sizeof(rli->last_slave_error), msg, args);
  /* Every va_start() must be matched by va_end() before returning */
  va_end(args);
  sql_print_error("Slave: %s, error_code=%d", rli->last_slave_error,
		  err_code);
  rli->last_slave_errno = err_code;
}

843

844
/*
  Answer the master's file request with a 0xfb packet naming /dev/null,
  read and discard its reply, then send the OK packet it expects.
  NOTE(review): presumably this makes the master load no data -- confirm
  against the client/server protocol.
*/
void skip_load_data_infile(NET* net)
{
  (void) my_net_write(net, "\xfb/dev/null", 10);	// 0xfb + 9 chars
  (void) net_flush(net);
  (void) my_net_read(net);			// discard response
  send_ok(net);					// the master expects it
}

852

853
char* rewrite_db(char* db)
unknown's avatar
unknown committed
854
{
unknown's avatar
unknown committed
855 856
  if (replicate_rewrite_db.is_empty() || !db)
    return db;
unknown's avatar
unknown committed
857 858 859
  I_List_iterator<i_string_pair> it(replicate_rewrite_db);
  i_string_pair* tmp;

unknown's avatar
unknown committed
860 861 862 863 864
  while ((tmp=it++))
  {
    if (!strcmp(tmp->key, db))
      return tmp->val;
  }
unknown's avatar
unknown committed
865 866
  return db;
}
867

unknown's avatar
unknown committed
868

869 870 871 872 873 874 875 876 877 878 879 880 881 882 883
/*
  Checks whether a db matches some do_db and ignore_db rules
  (for logging or replication)

  SYNOPSIS
    db_ok()
    db              name of the db to check
    do_list         either binlog_do_db or replicate_do_db
    ignore_list     either binlog_ignore_db or replicate_ignore_db

  RETURN VALUES
    0           should not be logged/replicated
    1           should be logged/replicated                  
*/

unknown's avatar
unknown committed
884 885 886
int db_ok(const char* db, I_List<i_string> &do_list,
	  I_List<i_string> &ignore_list )
{
unknown's avatar
unknown committed
887
  if (do_list.is_empty() && ignore_list.is_empty())
unknown's avatar
unknown committed
888 889
    return 1; // ok to replicate if the user puts no constraints

unknown's avatar
unknown committed
890 891 892 893 894
  /*
    If the user has specified restrictions on which databases to replicate
    and db was not selected, do not replicate.
  */
  if (!db)
unknown's avatar
unknown committed
895
    return 0;
unknown's avatar
unknown committed
896

unknown's avatar
unknown committed
897 898 899 900
  if (!do_list.is_empty()) // if the do's are not empty
  {
    I_List_iterator<i_string> it(do_list);
    i_string* tmp;
unknown's avatar
unknown committed
901

unknown's avatar
unknown committed
902 903 904 905
    while ((tmp=it++))
    {
      if (!strcmp(tmp->ptr, db))
	return 1; // match
unknown's avatar
unknown committed
906
    }
unknown's avatar
unknown committed
907 908
    return 0;
  }
unknown's avatar
unknown committed
909
  else // there are some elements in the don't, otherwise we cannot get here
unknown's avatar
unknown committed
910 911 912
  {
    I_List_iterator<i_string> it(ignore_list);
    i_string* tmp;
unknown's avatar
unknown committed
913

unknown's avatar
unknown committed
914 915 916 917
    while ((tmp=it++))
    {
      if (!strcmp(tmp->ptr, db))
	return 0; // match
unknown's avatar
unknown committed
918
    }
unknown's avatar
unknown committed
919 920
    return 1;
  }
unknown's avatar
unknown committed
921 922
}

923

unknown's avatar
unknown committed
924 925
static int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
				 const char *default_val)
unknown's avatar
unknown committed
926
{
unknown's avatar
unknown committed
927 928 929 930 931 932 933
  uint length;
  if ((length=my_b_gets(f,var, max_size)))
  {
    char* last_p = var + length -1;
    if (*last_p == '\n')
      *last_p = 0; // if we stopped on newline, kill it
    else
unknown's avatar
unknown committed
934
    {
unknown's avatar
unknown committed
935 936 937 938
      /*
	If we truncated a line or stopped on last char, remove all chars
	up to and including newline.
      */
unknown's avatar
unknown committed
939
      int c;
unknown's avatar
unknown committed
940
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF));
unknown's avatar
unknown committed
941
    }
unknown's avatar
unknown committed
942 943 944 945
    return 0;
  }
  else if (default_val)
  {
unknown's avatar
unknown committed
946
    strmake(var,  default_val, max_size-1);
unknown's avatar
unknown committed
947 948
    return 0;
  }
unknown's avatar
unknown committed
949
  return 1;
unknown's avatar
unknown committed
950 951
}

952

unknown's avatar
unknown committed
953
static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
unknown's avatar
unknown committed
954 955 956
{
  char buf[32];
  
unknown's avatar
unknown committed
957 958 959 960 961
  if (my_b_gets(f, buf, sizeof(buf))) 
  {
    *var = atoi(buf);
    return 0;
  }
unknown's avatar
unknown committed
962
  else if (default_val)
unknown's avatar
unknown committed
963 964 965 966
  {
    *var = default_val;
    return 0;
  }
unknown's avatar
unknown committed
967
  return 1;
unknown's avatar
unknown committed
968 969
}

970

971 972
/*
  Pick the binlog format to expect, based on the master's server version.

  - 3.x: BINLOG_FORMAT_323_LESS_57 when the version string compares below
    "3.23.57" (strncmp over 7 chars), otherwise BINLOG_FORMAT_323_GEQ_57.
  - 4.x and 5.x: BINLOG_FORMAT_CURRENT.
  - Any other first character is treated as an error.

  RETURN VALUES
    0   ok, mi->old_format set
    1   unrecognized version (written to the error log)
*/
static int check_master_version(MYSQL* mysql, MASTER_INFO* mi)
{
  const char* errmsg= 0;

  /*
    Only the first character of server_version is examined, so a
    two-digit major version would be misclassified.
  */
  switch (*mysql->server_version) {
  case '3':
    mi->old_format =
      (strncmp(mysql->server_version, "3.23.57", 7) < 0) /* < .57 */ ?
      BINLOG_FORMAT_323_LESS_57 :
      BINLOG_FORMAT_323_GEQ_57 ;
    break;
  case '4':
  case '5':
    mi->old_format = BINLOG_FORMAT_CURRENT;
    break;
  default:
    errmsg = "Master reported unrecognized MySQL version";
    break;
  }

  if (errmsg)
  {
    /* never pass a non-literal as the format string */
    sql_print_error("%s", errmsg);
    return 1;
  }
  return 0;
}

unknown's avatar
unknown committed
1002 1003 1004 1005

/*
  Create a table from a table dump sent by the master.

  SYNOPSIS
    create_table_from_dump()
    thd          thread doing the work; errors are reported to its client
    net          connection to the master; the next packet is expected to
                 hold the CREATE TABLE statement
    db           database to create the table in
    table_name   name of the table to create

  DESCRIPTION
    Runs the received CREATE TABLE with OPTION_BIN_LOG cleared and thd->db
    temporarily set to db. It then opens the new table, loads its rows
    with handler::net_read_dump() and rebuilds its indexes with repair().
    While repair() runs, thd->net.vio is detached so that repair messages
    do not reach the client.

  RETURN VALUES
    0   ok
    1   error (reported to the client of thd)
*/
static int create_table_from_dump(THD* thd, NET* net, const char* db,
				  const char* table_name)
{
  ulong stmt_len= my_net_read(net);	// the CREATE TABLE packet
  char *stmt;
  Vio* saved_vio;
  HA_CHECK_OPT repair_opt;
  TABLE_LIST dump_table;
  int error= 1;
  handler *tbl_handler;
  ulong saved_options;
  char *saved_db;

  if (stmt_len == packet_error)
  {
    send_error(&thd->net, ER_MASTER_NET_READ);
    return 1;
  }
  if (net->read_pos[0] == 255)		// the master sent an error packet
  {
    net->read_pos[stmt_len] = 0;
    net_printf(&thd->net, ER_MASTER, net->read_pos + 3);
    return 1;
  }
  thd->command = COM_TABLE_DUMP;

  /* thd->query may only be published once its buffer is filled in */
  if (!(stmt = sql_alloc(stmt_len + 1)))
  {
    sql_print_error("create_table_from_dump: out of memory");
    net_printf(&thd->net, ER_GET_ERRNO, "Out of memory");
    return 1;
  }
  memcpy(stmt, net->read_pos, stmt_len);
  stmt[stmt_len]= 0;
  thd->query_length= stmt_len;
  /*
    Taking the mutex around the store keeps the compiler from moving the
    thd->query assignment ahead of the copy above.
  */
  VOID(pthread_mutex_lock(&LOCK_thread_count));
  thd->query= stmt;
  VOID(pthread_mutex_unlock(&LOCK_thread_count));

  thd->current_tablenr = 0;
  thd->query_error = 0;
  thd->net.no_send_ok = 1;

  /* Run the CREATE TABLE unlogged, inside the target database */
  saved_options = thd->options;
  thd->options &= ~(ulong) (OPTION_BIN_LOG);
  thd->proc_info = "Creating table from master dump";
  saved_db = thd->db;
  thd->db = (char*) db;
  mysql_parse(thd, thd->query, stmt_len);
  thd->db = saved_db;			// restore the caller's state
  thd->options = saved_options;

  if (thd->query_error)
    goto err;				// mysql_parse already sent the error

  bzero((char*) &dump_table, sizeof(dump_table));
  dump_table.db = (char*) db;
  dump_table.alias= dump_table.real_name= (char*) table_name;
  dump_table.lock_type = TL_WRITE;
  thd->proc_info = "Opening master dump table";
  if (!open_ltable(thd, &dump_table, TL_WRITE))
  {
    send_error(&thd->net,0,0);		// report what open_ltable set
    sql_print_error("create_table_from_dump: could not open created table");
    goto err;
  }

  tbl_handler = dump_table.table->file;
  thd->proc_info = "Reading master dump table data";
  if (tbl_handler->net_read_dump(net))
  {
    net_printf(&thd->net, ER_MASTER_NET_READ);
    sql_print_error("create_table_from_dump::failed in handler::net_read_dump()");
    goto err;
  }

  repair_opt.init();
  repair_opt.flags|= T_VERY_SILENT | T_CALC_CHECKSUM | T_QUICK;
  thd->proc_info = "Rebuilding the index on master dump table";
  /* Detach the client so repair() only logs; its failure is reported below */
  saved_vio = thd->net.vio;
  thd->net.vio = 0;
  error= tbl_handler->repair(thd, &repair_opt) != 0;
  thd->net.vio = saved_vio;
  if (error)
    net_printf(&thd->net, ER_INDEX_REBUILD, dump_table.table->real_name);

err:
  close_thread_tables(thd);
  thd->net.no_send_ok = 0;
  return error;
}

1105 1106
/*
  Copy one table from the master: request its dump and recreate it here.

  SYNOPSIS
    fetch_master_table()
    thd          thread doing the work; errors are reported to its client
    db_name      database of the table
    table_name   table to fetch
    mi           master info, used to connect when mysql is NULL
    mysql        open connection to the master, or NULL to open (and
                 close again afterwards) a temporary one

  NOTES
    If thd is killed right after connecting, 1 is returned without sending
    an error.

  RETURN VALUES
    0   ok
    1   error
*/
int fetch_master_table(THD *thd, const char *db_name, const char *table_name,
		       MASTER_INFO *mi, MYSQL *mysql)
{
  int error= 1;
  const char *errmsg=0;
  bool own_connection= (mysql == NULL);
  DBUG_ENTER("fetch_master_table");
  DBUG_PRINT("enter", ("db_name: '%s'  table_name: '%s'",
		       db_name,table_name));

  if (own_connection)
  {
    if (!(mysql = mc_mysql_init(NULL)))
    {
      send_error(&thd->net);			// EOM
      DBUG_RETURN(1);
    }
    if (connect_to_master(thd, mysql, mi))
    {
      net_printf(&thd->net, ER_CONNECT_TO_MASTER, mc_mysql_error(mysql));
      mc_mysql_close(mysql);
      DBUG_RETURN(1);
    }
    if (thd->killed)
      goto err;
  }

  if (request_table_dump(mysql, db_name, table_name))
  {
    error= ER_UNKNOWN_ERROR;
    errmsg= "Failed on table dump request";
  }
  else if (!create_table_from_dump(thd, &mysql->net, db_name, table_name))
    error= 0;
  /* on failure create_table_from_dump() has already sent its error */

err:
  thd->net.no_send_ok = 0;	// reset what create_table_from_dump may leave set
  if (own_connection)
    mc_mysql_close(mysql);
  if (errmsg && thd->net.vio)
    send_error(&thd->net, error, errmsg);
  DBUG_RETURN(test(error));			// Return 1 on error
}

1152

1153 1154
/*
  Shut down an initialized master info.

  Calls end_relay_log_info() on mi->rli. If the master info file is open,
  it ends its cache and closes the descriptor. Finally it clears
  mi->inited. An uninitialized mi is left alone.
*/
void end_master_info(MASTER_INFO* mi)
{
  DBUG_ENTER("end_master_info");

  if (mi->inited)
  {
    end_relay_log_info(&mi->rli);
    if (mi->fd >= 0)
    {
      end_io_cache(&mi->file);
      (void) my_close(mi->fd, MYF(MY_WME));
      mi->fd= -1;
    }
    mi->inited= 0;
  }

  DBUG_VOID_RETURN;
}

1171

1172 1173 1174 1175 1176 1177
/*
  Initialize a relay log info structure from its info file.

  SYNOPSIS
    init_relay_log_info()
    rli          relay log info to fill in
    info_fname   name of the relay log info file (resolved against
                 mysql_data_home)

  DESCRIPTION
    Opens the relay log and its index first. If the info file is missing,
    it is created and rli is positioned on the first relay log. Otherwise
    the relay log name/position and the master log name/position are read
    from it. The info file cache is then switched to WRITE mode and
    flushed, and the relay log space is counted.
    rli->data_lock is held throughout and released on every exit path.

  RETURN VALUES
    0      ok (also when rli was already initialized)
    != 0   error; in the flush-failure case, the flush_relay_log_info()
           result is returned with rli otherwise initialized
*/
int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
{
  char fname[FN_REFLEN+128];
  int info_fd;
  const char* msg = 0;
  int error = 0;
  DBUG_ENTER("init_relay_log_info");

  if (rli->inited)				// Set if this function called
    DBUG_RETURN(0);
  fn_format(fname, info_fname, mysql_data_home, "", 4+32);
  pthread_mutex_lock(&rli->data_lock);
  info_fd = rli->info_fd;
  rli->pending = 0;
  rli->cur_log_fd = -1;
  rli->slave_skip_counter=0;
  rli->abort_pos_wait=0;
  rli->skip_log_purge=0;
  rli->log_space_limit = relay_log_space_limit;
  rli->log_space_total = 0;

  // TODO: make this work with multi-master
  if (!opt_relay_logname)
  {
    char tmp[FN_REFLEN];
    /*
      TODO: The following should be using fn_format();  We just need to
      first change fn_format() to cut the file name if it's too long.
    */
    strmake(tmp,glob_hostname,FN_REFLEN-5);
    strmov(strcend(tmp,'.'),"-relay-bin");
    opt_relay_logname=my_strdup(tmp,MYF(MY_WME));
  }

  /*
    The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE. The
    last kilobytes of it (8 kB for example) may live in memory rather than
    on disk, depending on what the thread using it does. This is efficient,
    but it has a side-effect one must know about.

    The size of the relay log on disk (displayed by 'ls -l' on Unix) can
    be a few kilobytes less than SHOW SLAVE STATUS suggests. This happens
    when only the IO thread is started (not the SQL thread). The "missing"
    kilobytes are in memory. They are preserved across 'STOP SLAVE; START
    SLAVE IO_THREAD' and are flushed to disk when the slave's mysqld stops.
    Example of how the disk size grows by leaps:

     Read_Master_Log_Pos: 7811 -rw-rw----    1 guilhem  qq              4 Jun  5 16:19 gbichot2-relay-bin.002
     ...later...
     Read_Master_Log_Pos: 9744 -rw-rw----    1 guilhem  qq           8192 Jun  5 16:27 gbichot2-relay-bin.002

    See how 4 is less than 7811 and 8192 is less than 9744.

    WARNING: this is risky because the slave can stay like this for a long
    time. If it then has a power failure, master.info says the I/O thread
    has read until 9744, while the relay log only contains data until 8192.
    The in-memory part from 8192 to 9744 has been lost, so the SQL slave
    thread will miss some events, silently breaking replication.
    Ideally we would flush master.info only when we know that the relay log
    has no in-memory tail.
    Note that this problem can only arise when just the IO thread is
    started, which is unlikely.
  */

  if (open_log(&rli->relay_log, glob_hostname, opt_relay_logname,
	       "-relay-bin", opt_relaylog_index_name,
	       LOG_BIN, 1 /* read_append cache */,
	       1 /* no auto events */))
  {
    /* data_lock was taken above and must not stay locked on this exit */
    pthread_mutex_unlock(&rli->data_lock);
    sql_print_error("Failed in open_log() called from init_relay_log_info()");
    DBUG_RETURN(1);
  }

  /* if file does not exist */
  if (access(fname,F_OK))
  {
    /*
      If someone removed the file from underneath our feet, just close
      the old descriptor and re-create the old file
    */
    if (info_fd >= 0)
      my_close(info_fd, MYF(MY_WME));
    if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
    {
      sql_print_error("Failed to create a new relay log info file (\
file '%s', errno %d)", fname, my_errno);
      /* current_thd may be NULL when there is no connection thread */
      msg= current_thd ? current_thd->net.last_error : 0;
      goto err;
    }
    if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
		      MYF(MY_WME)))
    {
      sql_print_error("Failed to create a cache on relay log info file (\
file '%s')", fname);
      msg= current_thd ? current_thd->net.last_error : 0;
      goto err;
    }

    /* Init relay log with first entry in the relay index file */
    if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
			   &msg))
    {
      sql_print_error("Failed to open the relay log (relay_log_name='FIRST', \
relay_log_pos=4)");
      goto err;
    }
    rli->master_log_name[0]= 0;
    rli->master_log_pos= 0;
    rli->info_fd= info_fd;
  }
  else // file exists
  {
    if (info_fd >= 0)
      reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
    else
    {
      bool open_failed= 0;
      if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
      {
        sql_print_error("Failed to open the existing relay log info file (\
file '%s', errno %d)", fname, my_errno);
        open_failed= 1;
      }
      else if (init_io_cache(&rli->info_file, info_fd,
                             IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
      {
        sql_print_error("Failed to create a cache on relay log info file (\
file '%s')", fname);
        open_failed= 1;
      }
      if (open_failed)
      {
        /* The cache was never set up here, so don't go through err: */
        if (info_fd >= 0)
          my_close(info_fd, MYF(0));
        rli->info_fd= -1;
        rli->relay_log.close(1);
        pthread_mutex_unlock(&rli->data_lock);
        DBUG_RETURN(1);
      }
    }

    rli->info_fd = info_fd;
    int relay_log_pos, master_log_pos;
    if (init_strvar_from_file(rli->relay_log_name,
			      sizeof(rli->relay_log_name), &rli->info_file,
			      "") ||
       init_intvar_from_file(&relay_log_pos,
			     &rli->info_file, BIN_LOG_HEADER_SIZE) ||
       init_strvar_from_file(rli->master_log_name,
			     sizeof(rli->master_log_name), &rli->info_file,
			     "") ||
       init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
    {
      msg="Error reading slave log configuration";
      goto err;
    }
    rli->relay_log_pos=  relay_log_pos;
    rli->master_log_pos= master_log_pos;

    if (init_relay_log_pos(rli,
			   rli->relay_log_name,
			   rli->relay_log_pos,
			   0 /* no data lock*/,
			   &msg))
    {
      char llbuf[22];
      sql_print_error("Failed to open the relay log (relay_log_name='%s', \
relay_log_pos=%s)", rli->relay_log_name, llstr(rli->relay_log_pos, llbuf));
      goto err;
    }
  }
  DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE);
  DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
  /*
    Now change the cache from READ to WRITE - must do this
    before flush_relay_log_info
  */
  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
  if ((error= flush_relay_log_info(rli)))
    sql_print_error("Failed to flush relay log info file");
  if (count_relay_log_space(rli))
  {
    msg="Error counting relay log space";
    goto err;
  }
  rli->inited= 1;
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_RETURN(error);

err:
  if (msg)
    sql_print_error("%s", msg);		// msg is data, not a format string
  end_io_cache(&rli->info_file);
  if (info_fd >= 0)
    my_close(info_fd, MYF(0));
  rli->info_fd= -1;
  rli->relay_log.close(1);
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_RETURN(1);
}

1371

unknown's avatar
unknown committed
1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385
/*
  Add the on-disk size of the relay log named in linfo to
  rli->log_space_total.

  RETURN VALUES
    0   ok
    1   the file could not be stat'ed (logged)
*/
static inline int add_relay_log(RELAY_LOG_INFO* rli,LOG_INFO* linfo)
{
  MY_STAT file_stat;
  DBUG_ENTER("add_relay_log");

  if (my_stat(linfo->log_file_name, &file_stat, MYF(0)) == NULL)
  {
    sql_print_error("log %s listed in the index, but failed to stat",
		    linfo->log_file_name);
    DBUG_RETURN(1);
  }

  rli->log_space_total += file_stat.st_size;
#ifndef DBUG_OFF
  {
    char total_buf[22];
    DBUG_PRINT("info",("log_space_total: %s",
                       llstr(rli->log_space_total, total_buf)));
  }
#endif
  DBUG_RETURN(0);
}

1390

unknown's avatar
unknown committed
1391 1392
/*
  Make the I/O thread (mi->io_thd) wait while the relay logs use more
  space than allowed.

  Waits on rli->log_space_cond under rli->log_space_lock until one of
  these holds: log_space_total no longer exceeds log_space_limit, the I/O
  thread is killed, or ignore_log_space_limit is set.

  RETURN VALUES
    true    the wait ended because io_slave_killed() reported a kill
    false   otherwise
*/
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli)
{
  bool killed= 0;
  MASTER_INFO* mi= rli->mi;
  THD* thd= mi->io_thd;
  const char* old_proc_info;
  DBUG_ENTER("wait_for_relay_log_space");

  pthread_mutex_lock(&rli->log_space_lock);
  old_proc_info= thd->enter_cond(&rli->log_space_cond,
                                 &rli->log_space_lock,
                                 "Waiting for relay log space to free");
  for (;;)
  {
    if (rli->log_space_limit >= rli->log_space_total)
      break;				// enough room again
    if ((killed= io_slave_killed(thd, mi)))
      break;
    if (rli->ignore_log_space_limit)
      break;
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
  }
  thd->exit_cond(old_proc_info);
  pthread_mutex_unlock(&rli->log_space_lock);
  DBUG_RETURN(killed);
}

unknown's avatar
unknown committed
1411

unknown's avatar
unknown committed
1412 1413 1414 1415 1416
/*
  Recompute rli->log_space_total from scratch.

  Walks the relay log index from the first entry, adding each log's size
  via add_relay_log().

  RETURN VALUES
    0   ok
    1   the first log could not be found, or a log could not be stat'ed
*/
static int count_relay_log_space(RELAY_LOG_INFO* rli)
{
  LOG_INFO log_info;
  DBUG_ENTER("count_relay_log_space");

  rli->log_space_total= 0;
  if (rli->relay_log.find_log_pos(&log_info, NullS, 1))
  {
    sql_print_error("Could not find first log while counting relay log space");
    DBUG_RETURN(1);
  }
  for (;;)
  {
    if (add_relay_log(rli, &log_info))
      DBUG_RETURN(1);
    if (rli->relay_log.find_next_log(&log_info, 1))
      break;				// no further entry in the index
  }
  DBUG_RETURN(0);
}
unknown's avatar
unknown committed
1429

unknown's avatar
unknown committed
1430

1431
int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
1432 1433
		     const char* slave_info_fname,
		     bool abort_if_no_master_info_file)
unknown's avatar
unknown committed
1434
{
unknown's avatar
unknown committed
1435 1436 1437 1438
  int fd,error;
  char fname[FN_REFLEN+128];
  DBUG_ENTER("init_master_info");

unknown's avatar
unknown committed
1439
  if (mi->inited)
unknown's avatar
unknown committed
1440
    DBUG_RETURN(0);
unknown's avatar
unknown committed
1441 1442
  mi->mysql=0;
  mi->file_id=1;
1443
  mi->ignore_stop_event=0;
1444
  fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
unknown's avatar
unknown committed
1445

unknown's avatar
unknown committed
1446 1447 1448 1449
  /*
    We need a mutex while we are changing master info parameters to
    keep other threads from reading bogus info
  */
unknown's avatar
unknown committed
1450

1451
  pthread_mutex_lock(&mi->data_lock);
unknown's avatar
unknown committed
1452
  fd = mi->fd;
1453 1454

  /* does master.info exist ? */
unknown's avatar
unknown committed
1455
  
1456
  if (access(fname,F_OK))
unknown's avatar
unknown committed
1457
  {
1458 1459 1460 1461 1462
    if (abort_if_no_master_info_file)
    {
      pthread_mutex_unlock(&mi->data_lock);
      DBUG_RETURN(0);
    }
unknown's avatar
unknown committed
1463 1464 1465 1466
    /*
      if someone removed the file from underneath our feet, just close
      the old descriptor and re-create the old file
    */
unknown's avatar
unknown committed
1467 1468
    if (fd >= 0)
      my_close(fd, MYF(MY_WME));
1469 1470 1471 1472 1473 1474 1475
    if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
    {
      sql_print_error("Failed to create a new master info file (\
file '%s', errno %d)", fname, my_errno);
      goto err;
    }
    if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
unknown's avatar
unknown committed
1476
		      MYF(MY_WME)))
1477 1478 1479
    {
      sql_print_error("Failed to create a cache on master info file (\
file '%s')", fname);
unknown's avatar
unknown committed
1480
      goto err;
1481
    }
unknown's avatar
unknown committed
1482

1483
    mi->master_log_name[0] = 0;
unknown's avatar
unknown committed
1484
    mi->master_log_pos = BIN_LOG_HEADER_SIZE;		// skip magic number
unknown's avatar
unknown committed
1485 1486 1487 1488 1489 1490 1491
    mi->fd = fd;
      
    if (master_host)
      strmake(mi->host, master_host, sizeof(mi->host) - 1);
    if (master_user)
      strmake(mi->user, master_user, sizeof(mi->user) - 1);
    if (master_password)
1492
      strmake(mi->password, master_password, HASH_PASSWORD_LENGTH);
unknown's avatar
unknown committed
1493 1494 1495
    mi->port = master_port;
    mi->connect_retry = master_connect_retry;
  }
1496
  else // file exists
unknown's avatar
unknown committed
1497
  {
unknown's avatar
unknown committed
1498
    if (fd >= 0)
unknown's avatar
unknown committed
1499
      reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515
    else 
    {
      if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
      {
        sql_print_error("Failed to open the existing master info file (\
file '%s', errno %d)", fname, my_errno);
        goto err;
      }
      if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
                        0, MYF(MY_WME)))
      {
        sql_print_error("Failed to create a cache on master info file (\
file '%s')", fname);
        goto err;
      }
    }
unknown's avatar
unknown committed
1516

unknown's avatar
unknown committed
1517
    mi->fd = fd;
1518 1519
    int port, connect_retry, master_log_pos;

1520
    if (init_strvar_from_file(mi->master_log_name,
unknown's avatar
unknown committed
1521
			      sizeof(mi->master_log_name), &mi->file,
unknown's avatar
unknown committed
1522
			      "") ||
1523
	init_intvar_from_file(&master_log_pos, &mi->file, 4) ||
unknown's avatar
unknown committed
1524 1525 1526 1527 1528 1529
	init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
			      master_host) ||
	init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file,
			      master_user) || 
	init_strvar_from_file(mi->password, HASH_PASSWORD_LENGTH+1, &mi->file,
			      master_password) ||
1530 1531
	init_intvar_from_file(&port, &mi->file, master_port) ||
	init_intvar_from_file(&connect_retry, &mi->file,
unknown's avatar
unknown committed
1532
			      master_connect_retry))
unknown's avatar
unknown committed
1533
    {
unknown's avatar
unknown committed
1534
      sql_print_error("Error reading master configuration");
1535
      goto err;
unknown's avatar
unknown committed
1536
    }
1537 1538 1539 1540 1541 1542 1543
    /*
      This has to be handled here as init_intvar_from_file can't handle
      my_off_t types
    */
    mi->master_log_pos= (my_off_t) master_log_pos;
    mi->port= (uint) port;
    mi->connect_retry= (uint) connect_retry;
unknown's avatar
unknown committed
1544
  }
1545 1546 1547
  DBUG_PRINT("master_info",("log_file_name: %s  position: %ld",
			    mi->master_log_name,
			    (ulong) mi->master_log_pos));
1548 1549 1550 1551 1552

  if (init_relay_log_info(&mi->rli, slave_info_fname))
    goto err;
  mi->rli.mi = mi;

unknown's avatar
unknown committed
1553
  mi->inited = 1;
unknown's avatar
unknown committed
1554
  // now change cache READ -> WRITE - must do this before flush_master_info
1555
  reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1);
1556 1557
  if ((error=test(flush_master_info(mi))))
    sql_print_error("Failed to flush master info file");
1558
  pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
1559
  DBUG_RETURN(error);
unknown's avatar
unknown committed
1560

1561
err:
unknown's avatar
unknown committed
1562 1563 1564 1565 1566 1567
  if (fd >= 0)
  {
    my_close(fd, MYF(0));
    end_io_cache(&mi->file);
  }
  mi->fd= -1;
1568
  pthread_mutex_unlock(&mi->data_lock);
unknown's avatar
unknown committed
1569
  DBUG_RETURN(1);
unknown's avatar
unknown committed
1570 1571
}

1572

1573 1574 1575 1576 1577
/*
  Register this slave with the master using COM_REGISTER_SLAVE.

  Does nothing when report_host is not set. The packet is built in this
  order:
    - server_id
    - report_host
    - report_user
    - report_password
    - report_port
    - rpl_recovery_rank
    - a zero master_id
  An unset report_user or report_password is sent as a single 0 byte.

  RETURN VALUES
    0   ok, or nothing to register
    1   the command failed (logged to the error log)
*/
int register_slave_on_master(MYSQL* mysql)
{
  String packet;
  char buf[4];

  if (!report_host)
    return 0;

  int4store(buf, server_id);
  packet.append(buf, 4);

  net_store_data(&packet, report_host);
  if (report_user)
    net_store_data(&packet, report_user);
  else
    packet.append((char)0);

  if (report_password)
    net_store_data(&packet, report_password);	// the password, not the user
  else
    packet.append((char)0);

  int2store(buf, (uint16)report_port);
  packet.append(buf, 2);
  int4store(buf, rpl_recovery_rank);
  packet.append(buf, 4);
  int4store(buf, 0); /* tell the master will fill in master_id */
  packet.append(buf, 4);

  if (mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(),
		       packet.length(), 0))
  {
    sql_print_error("Error on COM_REGISTER_SLAVE: %d '%s'",
		    mc_mysql_errno(mysql),
		    mc_mysql_error(mysql));
    return 1;
  }

  return 0;
}

1614
/*
  Send the SHOW SLAVE STATUS result set for mi.

  The column header is always sent. A data row follows only when a master
  host is configured (mi->host non-empty); it is assembled while holding
  mi->data_lock and mi->rli.data_lock.

  RETURN VALUES
    0    ok
    -1   sending the header or the row failed
*/
int show_master_info(THD* thd, MASTER_INFO* mi)
{
  // TODO: fix this for multi-master
  DBUG_ENTER("show_master_info");
  struct column_def
  {
    const char *name;
    uint length;
  };
  const column_def columns[]=
  {
    {"Master_Host",           sizeof(mi->host)},
    {"Master_User",           sizeof(mi->user)},
    {"Master_Port",           6},
    {"Connect_retry",         6},
    {"Master_Log_File",       FN_REFLEN},
    {"Read_Master_Log_Pos",   12},
    {"Relay_Log_File",        FN_REFLEN},
    {"Relay_Log_Pos",         12},
    {"Relay_Master_Log_File", FN_REFLEN},
    {"Slave_IO_Running",      3},
    {"Slave_SQL_Running",     3},
    {"Replicate_do_db",       20},
    {"Replicate_ignore_db",   20},
    {"Last_errno",            4},
    {"Last_error",            20},
    {"Skip_counter",          12},
    {"Exec_master_log_pos",   12},
    {"Relay_log_space",       12}
  };
  List<Item> field_list;
  for (uint col= 0; col < sizeof(columns) / sizeof(columns[0]); col++)
    field_list.push_back(new Item_empty_string(columns[col].name,
                                               columns[col].length));
  if (send_fields(thd, field_list, 1))
    DBUG_RETURN(-1);

  if (mi->host[0])
  {
    String *packet= &thd->packet;
    packet->length(0);

    pthread_mutex_lock(&mi->data_lock);
    pthread_mutex_lock(&mi->rli.data_lock);
    net_store_data(packet, mi->host);
    net_store_data(packet, mi->user);
    net_store_data(packet, (uint32) mi->port);
    net_store_data(packet, (uint32) mi->connect_retry);
    net_store_data(packet, mi->master_log_name);
    net_store_data(packet, (longlong) mi->master_log_pos);
    /* only the file name part of the relay log path is shown */
    net_store_data(packet, mi->rli.relay_log_name +
		   dirname_length(mi->rli.relay_log_name));
    net_store_data(packet, (longlong) mi->rli.relay_log_pos);
    net_store_data(packet, mi->rli.master_log_name);
    net_store_data(packet, mi->slave_running ? "Yes":"No");
    net_store_data(packet, mi->rli.slave_running ? "Yes":"No");
    net_store_data(packet, &replicate_do_db);
    net_store_data(packet, &replicate_ignore_db);
    net_store_data(packet, (uint32)mi->rli.last_slave_errno);
    net_store_data(packet, mi->rli.last_slave_error);
    net_store_data(packet, mi->rli.slave_skip_counter);
    net_store_data(packet, (longlong) mi->rli.master_log_pos);
    net_store_data(packet, (longlong) mi->rli.log_space_total);
    pthread_mutex_unlock(&mi->rli.data_lock);
    pthread_mutex_unlock(&mi->data_lock);

    if (my_net_write(&thd->net, (char*) packet->ptr(), packet->length()))
      DBUG_RETURN(-1);
  }
  send_eof(&thd->net);
  DBUG_RETURN(0);
}

1681 1682

/*
  Write the master connection info to mi->file: master log name, master
  log position, host, user, password, port and connect_retry, one value
  per line, starting at offset 0.

  NOTES
    The file is rewritten from offset 0 but not truncated here; presumably
    the reader consumes only the expected number of lines -- confirm.

  RETURN VALUES
    0   ok
    1   flush_io_cache() failed, i.e. the data may not have reached the file
*/

bool flush_master_info(MASTER_INFO* mi)
{
  IO_CACHE* file = &mi->file;
  char lbuf[22];
  DBUG_ENTER("flush_master_info");
  DBUG_PRINT("enter",("master_pos: %ld", (long) mi->master_log_pos));

  my_b_seek(file, 0L);
  my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n",
	      mi->master_log_name, llstr(mi->master_log_pos, lbuf),
	      mi->host, mi->user,
	      mi->password, mi->port, mi->connect_retry
	      );
  /*
    Report a failed flush instead of unconditionally claiming success, so
    callers that check the result can tell the info was not saved.
  */
  DBUG_RETURN(flush_io_cache(file) != 0);
}

unknown's avatar
unknown committed
1699 1700 1701

/*
  Construct an empty relay log info.

  - All counters and flags start at 0 and both file descriptors at -1.
  - The log names and the error text are empty strings.
  - info_file and cache_buf are zero-filled.
  - The mutexes and condition variables are initialized here; the
    destructor releases them.
*/
st_relay_log_info::st_relay_log_info()
  :info_fd(-1), cur_log_fd(-1), master_log_pos(0), save_temporary_tables(0),
   cur_log_old_open_count(0), log_space_total(0), ignore_log_space_limit(0),
   slave_skip_counter(0), abort_pos_wait(0), slave_run_id(0),
   sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0),
   slave_running(0), skip_log_purge(0),
   inside_transaction(0) /* the default is autocommit=1 */
{
  /* No relay log, master log or error recorded yet */
  relay_log_name[0]= 0;
  master_log_name[0]= 0;
  last_slave_error[0]= 0;

  bzero(&cache_buf, sizeof(cache_buf));
  bzero(&info_file, sizeof(info_file));

  /* Locks */
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
  pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);

  /* Condition variables */
  pthread_cond_init(&data_cond, NULL);
  pthread_cond_init(&start_cond, NULL);
  pthread_cond_init(&stop_cond, NULL);
  pthread_cond_init(&log_space_cond, NULL);
}


/* Release the condition variables and mutexes created by the constructor. */
st_relay_log_info::~st_relay_log_info()
{
  pthread_cond_destroy(&data_cond);
  pthread_cond_destroy(&start_cond);
  pthread_cond_destroy(&stop_cond);
  pthread_cond_destroy(&log_space_cond);

  pthread_mutex_destroy(&run_lock);
  pthread_mutex_destroy(&data_lock);
  pthread_mutex_destroy(&log_space_lock);
}

1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758
/*
  Waits until the SQL thread reaches (has executed up to) the
  log/position or timed out.

  SYNOPSIS
    wait_for_pos()
    thd             client thread that sent SELECT MASTER_POS_WAIT
    log_name        log name to wait for
    log_pos         position to wait for 
    timeout         timeout in seconds before giving up waiting

  NOTES
    timeout is longlong whereas it should be ulong ; but this is
    to catch if the user submitted a negative timeout.

  RETURN VALUES
    -2          improper arguments (log_pos<0)
                or slave not running, or master info changed
                during the function's execution,
                or client thread killed. -2 is translated to NULL by caller
    -1          timed out
    >=0         number of log events the function had to wait
                before reaching the desired log/position
 */
1759

1760
int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
1761 1762
                                    longlong log_pos,
                                    longlong timeout)
unknown's avatar
unknown committed
1763
{
1764 1765
  if (!inited)
    return -1;
unknown's avatar
unknown committed
1766
  int event_count = 0;
1767
  ulong init_abort_pos_wait;
1768 1769 1770 1771
  int error=0;
  struct timespec abstime; // for timeout checking
  set_timespec(abstime,timeout);

1772
  DBUG_ENTER("wait_for_pos");
1773 1774 1775
  DBUG_PRINT("enter",("master_log_name: '%s'  pos: %lu timeout: %ld",
                      master_log_name, (ulong) master_log_pos, 
                      (long) timeout));
1776

1777
  pthread_mutex_lock(&data_lock);
1778 1779 1780 1781 1782 1783
  /* 
     This function will abort when it notices that
     some CHANGE MASTER or RESET MASTER has changed
     the master info. To catch this, these commands
     modify abort_pos_wait ; we just monitor abort_pos_wait
     and see if it has changed.
1784 1785 1786 1787 1788 1789 1790
     Why do we have this mechanism instead of simply monitoring slave_running in
     the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that the
     SQL thread be stopped? This is in case 
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
     happens very quickly between the moment pthread_cond_wait() wakes up and
     the while() is evaluated: in that case slave_running is again 1 when the
     while() is evaluated.
1791
  */
1792
  init_abort_pos_wait= abort_pos_wait;
1793

1794 1795 1796 1797 1798 1799 1800 1801
  /*
    We'll need to 
    handle all possible log names comparisons (e.g. 999 vs 1000).
    We use ulong for string->number conversion ; this is no 
    stronger limitation than in find_uniq_filename in sql/log.cc
  */
  ulong log_name_extension;
  char log_name_tmp[FN_REFLEN]; //make a char[] from String
unknown's avatar
unknown committed
1802 1803
  char *end= strmake(log_name_tmp, log_name->ptr(), min(log_name->length(),
							FN_REFLEN-1));
1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824
  char *p= fn_ext(log_name_tmp);
  char *p_end;
  if (!*p || log_pos<0)   
  {
    error= -2; //means improper arguments
    goto err;
  }
  //p points to '.'
  log_name_extension= strtoul(++p, &p_end, 10);
  /*
    p_end points to the first invalid character.
    If it equals to p, no digits were found, error.
    If it contains '\0' it means conversion went ok.
  */
  if (p_end==p || *p_end)
  {
    error= -2;
    goto err;
  }    

  //"compare and wait" main loop
1825
  while (!thd->killed &&
1826
         init_abort_pos_wait == abort_pos_wait &&
1827 1828 1829 1830 1831 1832
         /* 
            formerly we tested mi->slave_running, but what we care about is
            rli->slave_running (because this concerns the SQL thread, while
            mi->slave_running concerns the I/O thread). 
         */
         slave_running)
unknown's avatar
unknown committed
1833
  {
1834 1835
    bool pos_reached;
    int cmp_result= 0;
1836 1837
    DBUG_ASSERT(*master_log_name || master_log_pos == 0);
    if (*master_log_name)
unknown's avatar
unknown committed
1838
    {
1839
      char *basename= master_log_name + dirname_length(master_log_name);
unknown's avatar
unknown committed
1840
      /*
1841 1842 1843 1844
        First compare the parts before the extension.
        Find the dot in the master's log basename,
        and protect against user's input error :
        if the names do not match up to '.' included, return error
1845
      */
1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858
      char *q= (char*)(fn_ext(basename)+1);
      if (strncmp(basename, log_name_tmp, (int)(q-basename)))
      {
        error= -2;
        break;
      }
      // Now compare extensions.
      char *q_end;
      ulong master_log_name_extension= strtoul(q, &q_end, 10);
      if (master_log_name_extension < log_name_extension)
        cmp_result = -1 ;
      else
        cmp_result= (master_log_name_extension > log_name_extension) ? 1 : 0 ;
unknown's avatar
unknown committed
1859
    }
1860 1861
    pos_reached = ((!cmp_result && master_log_pos >= (ulonglong)log_pos) ||
                   cmp_result > 0);
unknown's avatar
unknown committed
1862 1863
    if (pos_reached || thd->killed)
      break;
1864 1865

    //wait for master update, with optional timeout.
1866
    
unknown's avatar
unknown committed
1867
    DBUG_PRINT("info",("Waiting for master update"));
1868
    const char* msg = thd->enter_cond(&data_cond, &data_lock,
1869
                                      "Waiting for master update");
1870 1871 1872 1873
    /*
      We are going to pthread_cond_(timed)wait(); if the SQL thread stops it
      will wake us up.
    */
1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890
    if (timeout > 0)
    {
      /*
        Note that pthread_cond_timedwait checks for the timeout
        before for the condition ; i.e. it returns ETIMEDOUT 
        if the system time equals or exceeds the time specified by abstime
        before the condition variable is signaled or broadcast, _or_ if
        the absolute time specified by abstime has already passed at the time
        of the call.
        For that reason, pthread_cond_timedwait will do the "timeoutting" job
        even if its condition is always immediately signaled (case of a loaded
        master).
      */
      error=pthread_cond_timedwait(&data_cond, &data_lock, &abstime);
    }
    else
      pthread_cond_wait(&data_cond, &data_lock);
1891
    DBUG_PRINT("info",("Got signal of master update"));
1892
    thd->exit_cond(msg);
1893 1894 1895 1896 1897
    if (error == ETIMEDOUT || error == ETIME)
    {
      error= -1;
      break;
    }
unknown's avatar
unknown committed
1898
    error=0;
1899
    event_count++;
1900
    DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
1901
  }
1902 1903

err:
1904
  pthread_mutex_unlock(&data_lock);
1905
  DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d \
unknown's avatar
unknown committed
1906
improper_arguments: %d  timed_out: %d",
1907 1908
                     (int) thd->killed,
                     (int) (init_abort_pos_wait != abort_pos_wait),
1909
                     (int) slave_running,
1910 1911 1912
                     (int) (error == -2),
                     (int) (error == -1)));
  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
1913
      !slave_running) 
1914 1915 1916 1917
  {
    error= -2;
  }
  DBUG_RETURN( error ? error : event_count );
unknown's avatar
unknown committed
1918 1919
}

1920

1921
/*
  Prepare a freshly created THD for use by a slave thread (I/O or SQL).

  SYNOPSIS
    init_slave_thread()
    thd         THD to set up; owned by the caller
    thd_type    SLAVE_THD_SQL or SLAVE_THD_IO; only selects the initial
                proc_info text

  RETURN VALUES
    0   ok
    -1  init_thr_lock() or store_globals() failed. thd has been cleaned up
        but NOT freed: the caller still owns it and deletes it on its error
        path (handle_slave_io does so after "goto err").
*/

static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
{
  DBUG_ENTER("init_slave_thread");
  thd->system_thread = thd->bootstrap = 1;
  thd->host_or_ip= "";
  thd->client_capabilities = 0;
  my_net_init(&thd->net, 0);
  thd->net.read_timeout = slave_net_timeout;
  thd->master_access= ~0;
  thd->priv_user = 0;
  thd->slave_thread = 1;
  thd->options = (((opt_log_slave_updates) ? OPTION_BIN_LOG:0) | OPTION_AUTO_IS_NULL) ;
  thd->client_capabilities = CLIENT_LOCAL_FILES;
  thd->real_id=pthread_self();
  pthread_mutex_lock(&LOCK_thread_count);
  thd->thread_id = thread_id++;
  pthread_mutex_unlock(&LOCK_thread_count);

  if (init_thr_lock() || thd->store_globals())
  {
    /*
      Do not delete thd here: the caller keeps using it after a failure
      (it resets thd->query, calls net_end(&thd->net) and finally deletes
      it), so freeing it here caused a use-after-free and a double delete.
    */
    thd->cleanup();
    DBUG_RETURN(-1);
  }

#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
  sigset_t set;
  VOID(sigemptyset(&set));			// Get mask in use
  VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif

  if (thd->variables.max_join_size == HA_POS_ERROR)
    thd->options |= OPTION_BIG_SELECTS;

  if (thd_type == SLAVE_THD_SQL)
    thd->proc_info= "Waiting for the next event in slave queue";
  else
    thd->proc_info= "Waiting for master update";
  thd->version=refresh_version;
  thd->set_time();
  DBUG_RETURN(0);
}

unknown's avatar
unknown committed
1964

1965 1966
/*
  Sleep for about 'sec' seconds in one or more naps. After each nap,
  thread_killed(thd, thread_killed_arg) is consulted.

  RETURN VALUES
    0   the whole interval elapsed without the thread being killed
    1   thread_killed() reported the thread as killed after a nap
*/

static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
		      void* thread_killed_arg)
{
  thr_alarm_t alarmed;
  thr_alarm_init(&alarmed);
  time_t now= time((time_t*) 0);
  const time_t deadline= now + sec;

  for (int remaining; (remaining= (int) (deadline - now)) > 0;
       now= time((time_t*) 0))
  {
    ALARM alarm_buff;
    /*
      The alarm exists only so that a kill can wake us from sleep(). It is
      armed for twice the nap, so if nobody kills us it goes off only after
      we have woken up on our own.
    */
    thr_alarm(&alarmed, 2 * remaining, &alarm_buff);
    sleep(remaining);
    thr_end_alarm(&alarmed);

    if ((*thread_killed)(thd, thread_killed_arg))
      return 1;
  }
  return 0;
}

1993

unknown's avatar
unknown committed
1994 1995
/*
  Send COM_BINLOG_DUMP to the master, asking it to stream its binary log
  starting at mi->master_log_name / mi->master_log_pos.

  Packet built here:
    4 bytes   start position (low 4 bytes of master_log_pos, see TODO)
    2 bytes   flags (always 0 for now)
    4 bytes   our server_id
    N bytes   log file name, not NUL-terminated

  RETURN VALUES
    0   request sent
    1   sending failed. *suppress_warnings is set to 1 for
        ER_NET_READ_INTERRUPTED; otherwise an error is logged.
*/

static int request_dump(MYSQL* mysql, MASTER_INFO* mi,
			bool *suppress_warnings)
{
  char buf[FN_REFLEN + 10];
  int len;
  int binlog_flags = 0; // for now
  char* logname = mi->master_log_name;
  DBUG_ENTER("request_dump");

  // TODO if big log files: Change next to int8store()
  int4store(buf, (longlong) mi->master_log_pos);
  int2store(buf + 4, binlog_flags);
  int4store(buf + 6, server_id);
  len = (uint) strlen(logname);
  memcpy(buf + 10, logname,len);
  if (mc_simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
  {
    /*
      Something went wrong, so we will just reconnect and retry later
      in the future, we should do a better error analysis, but for
      now we just fill up the error log :-)
    */
    if (mc_mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
      *suppress_warnings= 1;			// Suppress reconnect warning
    else
      /*
        Report the per-connection retry interval: the I/O thread sleeps
        mi->connect_retry seconds between retries, not the global default.
      */
      sql_print_error("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs",
		      mc_mysql_errno(mysql), mc_mysql_error(mysql),
		      (int) mi->connect_retry);
    DBUG_RETURN(1);
  }

  DBUG_RETURN(0);
}

2028

2029
/*
  Ask the master to dump db.table with COM_TABLE_DUMP.

  Packet built here: one length byte followed by the database name, then
  one length byte followed by the table name (names not NUL-terminated).

  RETURN VALUES
    0   request sent
    1   a name cannot be encoded or the command failed (error logged)
*/

static int request_table_dump(MYSQL* mysql, const char* db, const char* table)
{
  char buf[1024];
  char * p = buf;
  uint table_len = (uint) strlen(table);
  uint db_len = (uint) strlen(db);

  /*
    Each name is preceded by a single length byte. A longer name would
    silently wrap that byte and make the master misparse the packet.
  */
  if (db_len > 255 || table_len > 255)
  {
    sql_print_error("request_table_dump: Database or table name too long");
    return 1;
  }
  if (table_len + db_len > sizeof(buf) - 2)
  {
    sql_print_error("request_table_dump: Buffer overrun");
    return 1;
  }

  *p++ = (char) db_len;
  memcpy(p, db, db_len);
  p += db_len;
  *p++ = (char) table_len;
  memcpy(p, table, table_len);

  /* p still points at the start of the table name, hence + table_len */
  if (mc_simple_command(mysql, COM_TABLE_DUMP, buf, p - buf + table_len, 1))
  {
    sql_print_error("request_table_dump: Error sending the table dump \
command");
    return 1;
  }

  return 0;
}

2057

unknown's avatar
unknown committed
2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068
/*
  read one event from the master

  SYNOPSIS
    read_event()
    mysql		MySQL connection
    mi			Master connection information
    suppress_warnings	TRUE when a normal net read timeout has caused us to
			try a reconnect.  We do not want to print anything to
			the error log in this case because this is a normal
			event in an idle server.

    RETURN VALUES
    'packet_error'	Error
    number		Length of packet minus its first byte (the caller
			reads the event from net.read_pos + 1)

*/

static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings)
{
  ulong len;

  *suppress_warnings= 0;
  /*
    my_real_read() will time us out
    We check if we were told to die, and if not, try reading again

    TODO:  Move 'events_till_disconnect' to the MASTER_INFO structure
  */
#ifndef DBUG_OFF
  if (disconnect_slave_event_count && !(events_till_disconnect--))
    return packet_error;
#endif

  len = mc_net_safe_read(mysql);
  if (len == packet_error || (long) len < 1)
  {
    if (mc_mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
    {
      /*
	We are trying a normal reconnect after a read timeout;
	we suppress prints to .err file as long as the reconnect
	happens without problems
      */
      *suppress_warnings= TRUE;
    }
    else
      sql_print_error("Error reading packet from server: %s (\
server_errno=%d)",
		      mc_mysql_error(mysql), mc_mysql_errno(mysql));
    return packet_error;
  }

  if (len == 1)
  {
     sql_print_error("Slave: received 0 length packet from server, apparent\
 master shutdown: %s",
		     mc_mysql_error(mysql));
     return packet_error;
  }

  /* len is a ulong: %lu, not %u. DBUG_PRINT terminates the line itself. */
  DBUG_PRINT("info",( "len=%lu, net->read_pos[4] = %d",
		      len, mysql->net.read_pos[4]));
  return len - 1;
}

unknown's avatar
unknown committed
2124

2125
/*
  Decide whether expected_error means the query was only partially
  executed. expected_error is presumably the error code the master recorded
  for thd->query -- confirm against the caller.

  For ER_NET_READ_ERROR, ER_NET_ERROR_ON_WRITE, ER_SERVER_SHUTDOWN and
  ER_NEW_ABORTING_CONNECTION, the function:
  - stores an explanatory message in rli->last_slave_error,
  - sets rli->last_slave_errno,
  - logs the message,
  - returns 1.
  For any other code it returns 0 and touches nothing.
*/
int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error)
{
  bool partially_executed= (expected_error == ER_NET_READ_ERROR ||
                            expected_error == ER_NET_ERROR_ON_WRITE ||
                            expected_error == ER_SERVER_SHUTDOWN ||
                            expected_error == ER_NEW_ABORTING_CONNECTION);
  if (!partially_executed)
    return 0;

  my_snprintf(rli->last_slave_error, sizeof(rli->last_slave_error),
              "Slave: query '%s' partially completed on the master \
and was aborted. There is a chance that your master is inconsistent at this \
point. If you are sure that your master is ok, run this query manually on the\
 slave and then restart the slave with SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1;\
 SLAVE START;", thd->query);
  rli->last_slave_errno= expected_error;
  sql_print_error("%s", rli->last_slave_error);
  return 1;
}
2145

2146

2147
/*
  Fetch the next event from the relay log (next_event()) and apply it in
  the SQL thread.

  An event is not executed, only "consumed", when either:
  - its server_id equals ours, or
  - rli->slave_skip_counter is non-zero and it is not a ROTATE_EVENT.
  Consuming an event advances and saves the relay log position under
  rli->data_lock.

  RETURN VALUES
    0           event consumed without execution
    1           SQL thread killed, or next_event() returned NULL (logged)
    otherwise   the value returned by the event's exec_event()
*/
static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
{
  DBUG_ASSERT(rli->sql_thd==thd);
  Log_event *event= next_event(rli);
  DBUG_ASSERT(rli->sql_thd==thd);

  if (sql_slave_killed(thd,rli))
  {
    delete event;				// no-op when event is NULL
    return 1;
  }

  if (!event)
  {
    sql_print_error("\
Could not parse relay log event entry. The possible reasons are: the master's \
binary log is corrupted (you can check this by running 'mysqlbinlog' on the \
binary log), the slave's relay log is corrupted (you can check this by running \
'mysqlbinlog' on the relay log), a network problem, or a bug in the master's \
or slave's MySQL code. If you want to check the master's binary log or slave's \
relay log, you will be able to know their names by issuing 'SHOW SLAVE STATUS' \
on this slave.\
");
    return 1;
  }

  int type_code= event->get_type_code();
  pthread_mutex_lock(&rli->data_lock);

  /*
    Events that deal with the log files themselves (ROTATE_EVENT) are never
    skipped through slave_skip_counter.
  */
  bool skip_event= (event->server_id == (uint32) ::server_id ||
                    (rli->slave_skip_counter && type_code != ROTATE_EVENT));
  if (skip_event)
  {
    /* TODO: I/O thread should not even log events with the same server id */
    rli->inc_pos(event->get_event_len(),
                 type_code != STOP_EVENT ? event->log_pos : LL(0),
                 1/* skip lock*/);
    flush_relay_log_info(rli);

    /*
      Protect against common user error of setting the counter to 1
      instead of 2 while recovering from an failed auto-increment insert
    */
    bool protect_last_skip= ((type_code == INTVAR_EVENT ||
                              type_code == STOP_EVENT) &&
                             rli->slave_skip_counter == 1);
    if (rli->slave_skip_counter && !protect_last_skip)
      --rli->slave_skip_counter;
    pthread_mutex_unlock(&rli->data_lock);
    delete event;
    return 0;					// avoid infinite update loops
  }
  pthread_mutex_unlock(&rli->data_lock);

  thd->server_id= event->server_id; // use the original server id for logging
  thd->set_time();				// time the query
  if (!event->when)
    event->when= time(NULL);
  event->thd= thd;
  thd->log_pos= event->log_pos;

  int exec_res= event->exec_event(rli);
  DBUG_ASSERT(rli->sql_thd==thd);
  delete event;
  return exec_res;
}

2220

2221
/* slave I/O thread */
2222
extern "C" pthread_handler_decl(handle_slave_io,arg)
unknown's avatar
unknown committed
2223
{
unknown's avatar
unknown committed
2224 2225 2226 2227 2228 2229 2230 2231
  THD *thd; // needs to be first for thread_stack
  MYSQL *mysql;
  MASTER_INFO *mi = (MASTER_INFO*)arg; 
  char llbuff[22];
  uint retry_count;
  
  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  my_thread_init();
2232
  DBUG_ENTER("handle_slave_io");
unknown's avatar
unknown committed
2233

unknown's avatar
unknown committed
2234
#ifndef DBUG_OFF
unknown's avatar
unknown committed
2235
slave_begin:  
unknown's avatar
unknown committed
2236
#endif  
2237
  DBUG_ASSERT(mi->inited);
unknown's avatar
unknown committed
2238 2239 2240
  mysql= NULL ;
  retry_count= 0;

2241
  pthread_mutex_lock(&mi->run_lock);
unknown's avatar
unknown committed
2242 2243 2244
  /* Inform waiting threads that slave has started */
  mi->slave_run_id++;

2245
#ifndef DBUG_OFF  
2246
  mi->events_till_abort = abort_slave_event_count;
2247
#endif  
unknown's avatar
unknown committed
2248
  
2249
  thd= new THD; // note that contructor of THD uses DBUG_ !
2250
  THD_CHECK_SENTRY(thd);
unknown's avatar
unknown committed
2251 2252

  pthread_detach_this_thread();
2253
  if (init_slave_thread(thd, SLAVE_THD_IO))
unknown's avatar
unknown committed
2254 2255 2256 2257 2258 2259
  {
    pthread_cond_broadcast(&mi->start_cond);
    pthread_mutex_unlock(&mi->run_lock);
    sql_print_error("Failed during slave I/O thread initialization");
    goto err;
  }
2260
  mi->io_thd = thd;
unknown's avatar
unknown committed
2261
  thd->thread_stack = (char*)&thd; // remember where our stack is
2262
  pthread_mutex_lock(&LOCK_thread_count);
unknown's avatar
unknown committed
2263
  threads.append(thd);
2264
  pthread_mutex_unlock(&LOCK_thread_count);
2265 2266 2267
  mi->slave_running = 1;
  mi->abort_slave = 0;
  pthread_mutex_unlock(&mi->run_lock);
2268
  pthread_cond_broadcast(&mi->start_cond);
unknown's avatar
unknown committed
2269
  
2270 2271 2272
  DBUG_PRINT("master_info",("log_file_name: '%s'  position: %s",
			    mi->master_log_name,
			    llstr(mi->master_log_pos,llbuff)));
unknown's avatar
unknown committed
2273
  
unknown's avatar
unknown committed
2274
  if (!(mi->mysql = mysql = mc_mysql_init(NULL)))
unknown's avatar
unknown committed
2275
  {
2276
    sql_print_error("Slave I/O thread: error in mc_mysql_init()");
unknown's avatar
unknown committed
2277 2278
    goto err;
  }
unknown's avatar
unknown committed
2279
  
unknown's avatar
unknown committed
2280

unknown's avatar
unknown committed
2281
  thd->proc_info = "connecting to master";
2282
  // we can get killed during safe_connect
2283
  if (!safe_connect(thd, mysql, mi))
unknown's avatar
unknown committed
2284
    sql_print_error("Slave I/O thread: connected to master '%s@%s:%d',\
2285
  replication started in log '%s' at position %s", mi->user,
unknown's avatar
unknown committed
2286 2287 2288
		    mi->host, mi->port,
		    IO_RPL_LOG_NAME,
		    llstr(mi->master_log_pos,llbuff));
2289
  else
unknown's avatar
unknown committed
2290
  {
2291
    sql_print_error("Slave I/O thread killed while connecting to master");
unknown's avatar
unknown committed
2292 2293
    goto err;
  }
2294

2295
connected:
2296

2297
  thd->slave_net = &mysql->net;
2298
  thd->proc_info = "Checking master version";
2299
  if (check_master_version(mysql, mi))
2300
    goto err;
2301
  if (!mi->old_format)
2302
  {
unknown's avatar
unknown committed
2303 2304 2305 2306 2307
    /*
      Register ourselves with the master.
      If fails, this is not fatal - we just print the error message and go
      on with life.
    */
2308
    thd->proc_info = "Registering slave on master";
2309
    if (register_slave_on_master(mysql) ||  update_slave_list(mysql, mi))
2310 2311
      goto err;
  }
unknown's avatar
unknown committed
2312
  
2313
  DBUG_PRINT("info",("Starting reading binary log from master"));
2314
  while (!io_slave_killed(thd,mi))
unknown's avatar
unknown committed
2315
  {
unknown's avatar
unknown committed
2316
    bool suppress_warnings= 0;    
unknown's avatar
unknown committed
2317
    thd->proc_info = "Requesting binlog dump";
unknown's avatar
unknown committed
2318
    if (request_dump(mysql, mi, &suppress_warnings))
unknown's avatar
unknown committed
2319 2320
    {
      sql_print_error("Failed on request_dump()");
unknown's avatar
unknown committed
2321
      if (io_slave_killed(thd,mi))
unknown's avatar
unknown committed
2322 2323
      {
	sql_print_error("Slave I/O thread killed while requesting master \
unknown's avatar
unknown committed
2324
dump");
unknown's avatar
unknown committed
2325 2326
	goto err;
      }
unknown's avatar
unknown committed
2327
	  
unknown's avatar
unknown committed
2328 2329 2330 2331 2332 2333 2334
      thd->proc_info = "Waiiting to reconnect after a failed dump request";
      mc_end_server(mysql);
      /*
	First time retry immediately, assuming that we can recover
	right away - if first time fails, sleep between re-tries
	hopefuly the admin can fix the problem sometime
      */
2335 2336 2337 2338
      if (retry_count++)
      {
	if (retry_count > master_retry_count)
	  goto err;				// Don't retry forever
2339 2340
	safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
		   (void*)mi);
2341
      }
2342
      if (io_slave_killed(thd,mi))
unknown's avatar
unknown committed
2343 2344
      {
	sql_print_error("Slave I/O thread killed while retrying master \
unknown's avatar
unknown committed
2345
dump");
unknown's avatar
unknown committed
2346 2347
	goto err;
      }
unknown's avatar
unknown committed
2348

unknown's avatar
unknown committed
2349
      thd->proc_info = "Reconnecting after a failed dump request";
unknown's avatar
unknown committed
2350 2351
      if (!suppress_warnings)
	sql_print_error("Slave I/O thread: failed dump request, \
2352
reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME,
unknown's avatar
unknown committed
2353 2354 2355
			llstr(mi->master_log_pos,llbuff));
      if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
	  io_slave_killed(thd,mi))
unknown's avatar
unknown committed
2356 2357
      {
	sql_print_error("Slave I/O thread killed during or \
2358
after reconnect");
unknown's avatar
unknown committed
2359 2360
	goto err;
      }
unknown's avatar
unknown committed
2361

unknown's avatar
unknown committed
2362 2363
      goto connected;
    }
unknown's avatar
unknown committed
2364

2365
    while (!io_slave_killed(thd,mi))
unknown's avatar
unknown committed
2366
    {
unknown's avatar
unknown committed
2367
      bool suppress_warnings= 0;    
unknown's avatar
unknown committed
2368
      thd->proc_info = "Reading master update";
unknown's avatar
unknown committed
2369
      ulong event_len = read_event(mysql, mi, &suppress_warnings);
2370
      if (io_slave_killed(thd,mi))
unknown's avatar
unknown committed
2371
      {
2372 2373
	if (global_system_variables.log_warnings)
	  sql_print_error("Slave I/O thread killed while reading event");
unknown's avatar
unknown committed
2374 2375
	goto err;
      }
2376
	  	  
unknown's avatar
unknown committed
2377 2378
      if (event_len == packet_error)
      {
2379 2380
	uint mysql_error_number= mc_mysql_errno(mysql);
	if (mysql_error_number == ER_NET_PACKET_TOO_LARGE)
unknown's avatar
unknown committed
2381
	{
2382 2383 2384 2385
	  sql_print_error("\
Log entry on master is longer than max_allowed_packet (%ld) on \
slave. If the entry is correct, restart the server with a higher value of \
max_allowed_packet",
unknown's avatar
unknown committed
2386
			  thd->variables.max_allowed_packet);
unknown's avatar
unknown committed
2387 2388
	  goto err;
	}
2389 2390 2391 2392 2393 2394
	if (mysql_error_number == ER_MASTER_FATAL_ERROR_READING_BINLOG)
	{
	  sql_print_error(ER(mysql_error_number), mysql_error_number,
			  mc_mysql_error(mysql));
	  goto err;
	}
unknown's avatar
unknown committed
2395 2396
	thd->proc_info = "Waiting to reconnect after a failed read";
	mc_end_server(mysql);
2397 2398 2399 2400
	if (retry_count++)
	{
	  if (retry_count > master_retry_count)
	    goto err;				// Don't retry forever
2401
	  safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
2402 2403
		     (void*) mi);
	}	    
2404
	if (io_slave_killed(thd,mi))
unknown's avatar
unknown committed
2405
	{
2406 2407
	  if (global_system_variables.log_warnings)
	    sql_print_error("Slave I/O thread killed while waiting to \
unknown's avatar
unknown committed
2408
reconnect after a failed read");
unknown's avatar
unknown committed
2409 2410 2411
	  goto err;
	}
	thd->proc_info = "Reconnecting after a failed read";
unknown's avatar
unknown committed
2412 2413
	if (!suppress_warnings)
	  sql_print_error("Slave I/O thread: Failed reading log event, \
2414
reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME,
unknown's avatar
unknown committed
2415 2416 2417
			  llstr(mi->master_log_pos, llbuff));
	if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
	    io_slave_killed(thd,mi))
unknown's avatar
unknown committed
2418
	{
2419 2420
	  if (global_system_variables.log_warnings)
	    sql_print_error("Slave I/O thread killed during or after a \
unknown's avatar
unknown committed
2421
reconnect done to recover from failed read");
unknown's avatar
unknown committed
2422 2423 2424
	  goto err;
	}
	goto connected;
unknown's avatar
unknown committed
2425
      } // if (event_len == packet_error)
unknown's avatar
unknown committed
2426
	  
2427
      retry_count=0;			// ok event, reset retry counter
unknown's avatar
unknown committed
2428 2429 2430 2431
      thd->proc_info = "Queueing event from master";
      if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
		      event_len))
      {
2432
	sql_print_error("Slave I/O thread could not queue event from master");
unknown's avatar
unknown committed
2433 2434
	goto err;
      }
2435
      flush_master_info(mi);
2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447
      /*
        See if the relay logs take too much space.
        We don't lock mi->rli.log_space_lock here; this dirty read saves time
        and does not introduce any problem:
        - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
        the clean value is 0), then we are reading only one more event as we
        should, and we'll block only at the next event. No big deal.
        - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
        the clean value is 1), then we are going into wait_for_relay_log_space()
        for no reason, but this function will do a clean read, notice the clean
        value and exit immediately.
      */
unknown's avatar
unknown committed
2448 2449
      DBUG_PRINT("info", ("ignore_log_space_limit=%d", (int)
                          mi->rli.ignore_log_space_limit)); 
unknown's avatar
unknown committed
2450
      if (mi->rli.log_space_limit && mi->rli.log_space_limit <
2451 2452
	  mi->rli.log_space_total &&
          !mi->rli.ignore_log_space_limit)
unknown's avatar
unknown committed
2453 2454 2455 2456 2457 2458
	if (wait_for_relay_log_space(&mi->rli))
	{
	  sql_print_error("Slave I/O thread aborted while waiting for relay \
log space");
	  goto err;
	}
unknown's avatar
unknown committed
2459
      // TODO: check debugging abort code
2460
#ifndef DBUG_OFF
unknown's avatar
unknown committed
2461 2462 2463 2464 2465
      if (abort_slave_event_count && !--events_till_abort)
      {
	sql_print_error("Slave I/O thread: debugging abort");
	goto err;
      }
2466
#endif
2467
    } 
2468
  }
unknown's avatar
unknown committed
2469

unknown's avatar
unknown committed
2470
  // error = 0;
unknown's avatar
unknown committed
2471
err:
2472 2473 2474
  // print the current replication position
  sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s",
		  IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
2475
  VOID(pthread_mutex_lock(&LOCK_thread_count));
unknown's avatar
unknown committed
2476
  thd->query = thd->db = 0; // extra safety
2477
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
unknown's avatar
unknown committed
2478 2479
  if (mysql)
  {
unknown's avatar
unknown committed
2480
    mc_mysql_close(mysql);
unknown's avatar
unknown committed
2481 2482
    mi->mysql=0;
  }
unknown's avatar
unknown committed
2483
  thd->proc_info = "Waiting for slave mutex on exit";
2484 2485 2486 2487
  pthread_mutex_lock(&mi->run_lock);
  mi->slave_running = 0;
  mi->io_thd = 0;
  // TODO: make rpl_status part of MASTER_INFO
2488
  change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
2489 2490
  mi->abort_slave = 0; // TODO: check if this is needed
  DBUG_ASSERT(thd->net.buff != 0);
unknown's avatar
unknown committed
2491
  net_end(&thd->net); // destructor will not free it, because net.vio is 0
2492
  pthread_mutex_lock(&LOCK_thread_count);
2493
  THD_CHECK_SENTRY(thd);
unknown's avatar
unknown committed
2494
  delete thd;
2495
  pthread_mutex_unlock(&LOCK_thread_count);
unknown's avatar
unknown committed
2496
  pthread_cond_broadcast(&mi->stop_cond);	// tell the world we are done
2497
  pthread_mutex_unlock(&mi->run_lock);
unknown's avatar
unknown committed
2498
#ifndef DBUG_OFF
unknown's avatar
unknown committed
2499
  if (abort_slave_event_count && !events_till_abort)
unknown's avatar
unknown committed
2500 2501
    goto slave_begin;
#endif  
unknown's avatar
unknown committed
2502
  my_thread_end();
unknown's avatar
unknown committed
2503 2504 2505 2506
  pthread_exit(0);
  DBUG_RETURN(0);				// Can't return anything here
}

unknown's avatar
unknown committed
2507

2508
/* slave SQL logic thread */
unknown's avatar
unknown committed
2509

2510
/*
  Main function of the slave SQL thread.

  arg is the MASTER_INFO of the slave; this thread works on its rli member.
  It registers itself in rli (slave_run_id, sql_thd, slave_running) and
  signals rli->start_cond, positions itself in the relay log with
  init_relay_log_pos(), then executes relay log events with
  exec_relay_log_event() until it is killed/stopped or an event fails.
  On exit it clears rli->slave_running (waking up master_pos_wait() via
  rli->data_cond), keeps the thread's temporary tables in
  rli->save_temporary_tables for the next start, deletes its THD and
  signals rli->stop_cond.

  NOTE(review): if init_slave_thread() fails, 'err' is reached before
  rli->slave_running and rli->sql_thd have been set, so the DBUG_ASSERTs
  on that path fire in debug builds, and rli->save_temporary_tables is
  overwritten with thd->temporary_tables, which has not been restored from
  rli yet at that point - presumably losing the saved temporary tables.
  See also the TODO at the failure site.
*/
extern "C" pthread_handler_decl(handle_slave_sql,arg)
2511
{
2512
  THD *thd;			/* needs to be first for thread_stack */
2513 2514
  char llbuff[22],llbuff1[22];
  RELAY_LOG_INFO* rli = &((MASTER_INFO*)arg)->rli; 
unknown's avatar
unknown committed
2515 2516 2517 2518
  const char *errmsg;

  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  my_thread_init();
2519
  DBUG_ENTER("handle_slave_sql");
unknown's avatar
unknown committed
2520 2521 2522 2523 2524

#ifndef DBUG_OFF
slave_begin:  
#endif  

2525 2526 2527
  DBUG_ASSERT(rli->inited);
  pthread_mutex_lock(&rli->run_lock);
  DBUG_ASSERT(!rli->slave_running);
unknown's avatar
unknown committed
2528
  errmsg= 0;
2529 2530 2531
#ifndef DBUG_OFF  
  rli->events_till_abort = abort_slave_event_count;
#endif  
2532

unknown's avatar
unknown committed
2533
  thd = new THD; // note that the constructor of THD uses DBUG_ !
2534
  THD_CHECK_SENTRY(thd);
unknown's avatar
unknown committed
2535 2536 2537
  /* Inform waiting threads that slave has started */
  rli->slave_run_id++;

2538 2539
  pthread_detach_this_thread();
  if (init_slave_thread(thd, SLAVE_THD_SQL))
unknown's avatar
unknown committed
2540 2541 2542 2543 2544 2545 2546 2547 2548 2549
  {
    /*
      TODO: this is currently broken - slave start and change master
      will be stuck if we fail here
    */
    pthread_cond_broadcast(&rli->start_cond);
    pthread_mutex_unlock(&rli->run_lock);
    sql_print_error("Failed during slave thread initialization");
    goto err;
  }
2550
  rli->sql_thd= thd;
2551
  thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
2552 2553
  thd->thread_stack = (char*)&thd; // remember where our stack is
  pthread_mutex_lock(&LOCK_thread_count);
2554
  threads.append(thd);
2555
  pthread_mutex_unlock(&LOCK_thread_count);
2556 2557 2558
  rli->slave_running = 1;
  rli->abort_slave = 0;
  pthread_mutex_unlock(&rli->run_lock);
2559
  pthread_cond_broadcast(&rli->start_cond);
unknown's avatar
unknown committed
2560 2561
  // This should always be set to 0 when the slave thread is started
  rli->pending = 0;
2562 2563

  //tell the I/O thread to take relay_log_space_limit into account from now on
2564
  pthread_mutex_lock(&rli->log_space_lock);
2565
  rli->ignore_log_space_limit= 0;
2566
  pthread_mutex_unlock(&rli->log_space_lock);
2567

2568 2569 2570 2571
  if (init_relay_log_pos(rli,
			 rli->relay_log_name,
			 rli->relay_log_pos,
			 1 /*need data lock*/, &errmsg))
2572 2573 2574 2575 2576
  {
    sql_print_error("Error initializing relay log position: %s",
		    errmsg);
    goto err;
  }
2577
  THD_CHECK_SENTRY(thd);
unknown's avatar
unknown committed
2578
  DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE);
2579 2580
  DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
  DBUG_ASSERT(rli->sql_thd == thd);
2581 2582 2583 2584

  DBUG_PRINT("master_info",("log_file_name: %s  position: %s",
			    rli->master_log_name,
			    llstr(rli->master_log_pos,llbuff)));
2585 2586
  if (global_system_variables.log_warnings)
    sql_print_error("Slave SQL thread initialized, starting replication in \
unknown's avatar
unknown committed
2587
log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
2588 2589 2590 2591 2592
		    llstr(rli->master_log_pos,llbuff),rli->relay_log_name,
		    llstr(rli->relay_log_pos,llbuff1));

  /*
    Execute the events the I/O thread wrote to the relay log until this
    thread is killed or stopped, or an event fails
  */

2593
  while (!sql_slave_killed(thd,rli))
2594 2595 2596
  {
    thd->proc_info = "Processing master log event"; 
    DBUG_ASSERT(rli->sql_thd == thd);
2597
    THD_CHECK_SENTRY(thd);
2598 2599 2600
    if (exec_relay_log_event(thd,rli))
    {
      // do not scare the user if SQL thread was simply killed or stopped
2601
      if (!sql_slave_killed(thd,rli))
2602 2603
        sql_print_error("\
Error running query, slave SQL thread aborted. Fix the problem, and restart \
unknown's avatar
unknown committed
2604
the slave SQL thread with \"SLAVE START\". We stopped at log \
2605 2606 2607 2608
'%s' position %s",
		      RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff));
      goto err;
    }
2609
  }
2610

2611
  /* Thread stopped. Print the current replication position to the log */
2612 2613 2614
  sql_print_error("Slave SQL thread exiting, replication stopped in log \
 '%s' at position %s",
		  RPL_LOG_NAME, llstr(rli->master_log_pos,llbuff));
2615 2616

 err:
2617
  VOID(pthread_mutex_lock(&LOCK_thread_count));
2618
  thd->query = thd->db = 0; // extra safety
2619
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
2620 2621
  thd->proc_info = "Waiting for slave mutex on exit";
  pthread_mutex_lock(&rli->run_lock);
2622 2623
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
  pthread_mutex_lock(&rli->data_lock);
2624
  DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
2625 2626 2627 2628 2629 2630
  /* When master_pos_wait() wakes up it will check this and terminate */
  rli->slave_running= 0; 
  /* Wake up master_pos_wait() */
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
  pthread_cond_broadcast(&rli->data_cond);
unknown's avatar
unknown committed
2631
  rli->ignore_log_space_limit= 0; /* don't need any lock */
2632
  rli->save_temporary_tables = thd->temporary_tables; // kept for next start
unknown's avatar
unknown committed
2633 2634 2635 2636 2637

  /*
    TODO: see if we can do this conditionally in next_event() instead
    to avoid unneeded position re-init
  */
2638 2639 2640 2641
  thd->temporary_tables = 0; // remove temptation from destructor to close them
  DBUG_ASSERT(thd->net.buff != 0);
  net_end(&thd->net); // destructor will not free it (presumably net.vio is 0, as in the I/O thread)
  DBUG_ASSERT(rli->sql_thd == thd);
2642
  THD_CHECK_SENTRY(thd);
2643
  rli->sql_thd= 0;
2644
  pthread_mutex_lock(&LOCK_thread_count);
2645
  THD_CHECK_SENTRY(thd);
2646 2647 2648 2649 2650 2651 2652 2653 2654
  delete thd;
  pthread_mutex_unlock(&LOCK_thread_count);
  pthread_cond_broadcast(&rli->stop_cond);
  // tell the world we are done
  pthread_mutex_unlock(&rli->run_lock);
#ifndef DBUG_OFF // TODO: reconsider the code below
  if (abort_slave_event_count && !rli->events_till_abort)
    goto slave_begin;
#endif  
unknown's avatar
unknown committed
2655
  my_thread_end(); // undo my_thread_init() before the thread exits
2656 2657 2658
  pthread_exit(0);
  DBUG_RETURN(0);				// Can't return anything here
}
unknown's avatar
unknown committed
2659

unknown's avatar
unknown committed
2660

unknown's avatar
unknown committed
2661 2662 2663 2664 2665 2666 2667
/*
  Download the data file of an old-format LOAD DATA INFILE from the master
  and write it into the relay log.

  SYNOPSIS
    process_io_create_file()
    mi		master_info for the slave
    cev		Create_file event built from the master's old-format Load
		event (see queue_old_event())

  DESCRIPTION
    Requests the file named in cev from the master and reads it packet by
    packet:
    - the first data packet is written to the relay log inside cev,
    - every further packet is written as an Append_block event,
    - at end of file an Execute_load event is written.
    If the file is empty, cev is never written, so no Execute_load event is
    written either. Otherwise the Execute_load event would refer to a
    Create_file event that does not exist in the relay log. Doing nothing is
    as if the LOAD DATA INFILE had not existed.
    Events for databases filtered out by the replicate-*-db rules are
    drained from the connection and skipped.

  RETURN VALUES
    0	ok (including the skipped case)
    1	error
*/

static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
{
  int error = 1;
  ulong num_bytes;
  bool cev_not_written;
  THD* thd;
  NET* net = &mi->mysql->net;
  DBUG_ENTER("process_io_create_file");

  if (unlikely(!cev->is_valid()))
    DBUG_RETURN(1);
  /*
    TODO: fix to honor table rules, not only db rules
  */
  if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db))
  {
    skip_load_data_infile(net);
    DBUG_RETURN(0);
  }
  DBUG_ASSERT(cev->inited_from_old);
  thd = mi->io_thd;
  /*
    Give the file a slave-local id. The Append_block/Execute_load events
    below are built from thd, presumably picking up thd->file_id.
  */
  thd->file_id = cev->file_id = mi->file_id++;
  thd->server_id = cev->server_id;
  cev_not_written = 1;

  if (unlikely(net_request_file(net,cev->fname)))
  {
    sql_print_error("Slave I/O: failed requesting download of '%s'",
		    cev->fname);
    goto err;
  }

  /*
    This dummy block is so we could instantiate Append_block_log_event
    once and then modify it slightly instead of doing it multiple times
    in the loop
  */
  {
    Append_block_log_event aev(thd,0,0,0);

    for (;;)
    {
      if (unlikely((num_bytes=my_net_read(net)) == packet_error))
      {
	sql_print_error("Network read error downloading '%s' from master",
			cev->fname);
	goto err;
      }
      if (unlikely(!num_bytes)) /* eof */
      {
	send_ok(net); /* 3.23 master wants it */
	/*
	  Empty file: no Create_file event was written, so there is nothing
	  to execute. Write nothing, as if the LOAD DATA INFILE never existed.
	*/
	if (unlikely(cev_not_written))
	  break;
	Execute_load_log_event xev(thd,0);
	xev.log_pos = mi->master_log_pos;
	if (unlikely(mi->rli.relay_log.append(&xev)))
	{
	  sql_print_error("Slave I/O: error writing Exec_load event to \
relay log");
	  goto err;
	}
	mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
	break;
      }
      if (unlikely(cev_not_written))
      {
	/* The first data block travels inside the Create_file event */
	cev->block = (char*)net->read_pos;
	cev->block_len = num_bytes;
	cev->log_pos = mi->master_log_pos;
	if (unlikely(mi->rli.relay_log.append(cev)))
	{
	  sql_print_error("Slave I/O: error writing Create_file event to \
relay log");
	  goto err;
	}
	cev_not_written=0;
	mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
      }
      else
      {
	aev.block = (char*)net->read_pos;
	aev.block_len = num_bytes;
	aev.log_pos = mi->master_log_pos;
	if (unlikely(mi->rli.relay_log.append(&aev)))
	{
	  sql_print_error("Slave I/O: error writing Append_block event to \
relay log");
	  goto err;
	}
	mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
      }
    }
  }
  error=0;
err:
  DBUG_RETURN(error);
}
unknown's avatar
unknown committed
2756

unknown's avatar
unknown committed
2757
/*
unknown's avatar
unknown committed
2758 2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774
  Start using a new binary log on the master

  SYNOPSIS
    process_io_rotate()
    mi			master_info for the slave
    rev			The rotate log event read from the binary log

  DESCRIPTION
    Updates the master info and relay data with the place in the next binary
    log where we should start reading.

  NOTES
    We assume we already locked mi->data_lock

  RETURN VALUES
    0		ok
    1	        Log event is illegal
unknown's avatar
unknown committed
2775 2776
*/

unknown's avatar
unknown committed
2777
static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev)
2778
{
unknown's avatar
unknown committed
2779
  int return_val= 1;
2780
  DBUG_ENTER("process_io_rotate");
unknown's avatar
unknown committed
2781
  safe_mutex_assert_owner(&mi->data_lock);
2782

unknown's avatar
unknown committed
2783
  if (unlikely(!rev->is_valid()))
2784
    DBUG_RETURN(1);
unknown's avatar
unknown committed
2785 2786 2787 2788 2789

  memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
  mi->master_log_pos= rev->pos;
  DBUG_PRINT("info", ("master_log_pos: '%s' %d",
		      mi->master_log_name, (ulong) mi->master_log_pos));
2790
#ifndef DBUG_OFF
unknown's avatar
unknown committed
2791 2792 2793 2794 2795 2796
  /*
    If we do not do this, we will be getting the first
    rotate event forever, so we need to not disconnect after one.
  */
  if (disconnect_slave_event_count)
    events_till_disconnect++;
2797
#endif
2798
  DBUG_RETURN(0);
2799 2800
}

unknown's avatar
unknown committed
2801
/*
  Write an event received from an old-format master to the relay log

  SYNOPSIS
    queue_old_event()
    mi			master_info for the slave
    buf			event as read from the master
    event_len		length of buf

  DESCRIPTION
    The event is parsed with the old format and re-written into the relay
    log.
    - A Load event is first copied into a private NUL-terminated buffer,
      because read_log_event() needs a non-reusable buffer for it. It is
      expected to come back as a Create_file event, whose data is then
      downloaded by process_io_create_file().
    - A Rotate event moves mi to the new master log and makes a Stop event
      that immediately follows it be dropped.

  TODO
    Test this code before release - it has to be tested on a separate
    setup with 3.23 master

  RETURN VALUES
    0	ok
    1	error
*/

static int queue_old_event(MASTER_INFO *mi, const char *buf,
			   ulong event_len)
{
  const char *errmsg = 0;
  ulong inc_pos;
  bool ignore_event= 0;
  char *tmp_buf = 0;
  RELAY_LOG_INFO *rli= &mi->rli;
  DBUG_ENTER("queue_old_event");

  /*
    If we get Load event, we need to pass a non-reusable buffer
    to read_log_event, so we do a trick
  */
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
  {
    if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
    {
      sql_print_error("Slave I/O: out of memory for Load event");
      DBUG_RETURN(1);
    }
    memcpy(tmp_buf,buf,event_len);
    tmp_buf[event_len]=0; // Create_file constructor wants null-term buffer
    buf = (const char*)tmp_buf;
  }

  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
					    1 /*old format*/ );
  if (unlikely(!ev))
  {
    sql_print_error("Read invalid event from master: '%s',\
 master could be corrupt but a more likely cause of this is a bug",
		    errmsg);
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
    DBUG_RETURN(1);
  }
  pthread_mutex_lock(&mi->data_lock);
  ev->log_pos = mi->master_log_pos;
  switch (ev->get_type_code()) {
  case STOP_EVENT:
    ignore_event= mi->ignore_stop_event;
    mi->ignore_stop_event=0;
    inc_pos= event_len;
    break;
  case ROTATE_EVENT:
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
    {
      delete ev;
      pthread_mutex_unlock(&mi->data_lock);
      my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
      DBUG_RETURN(1);
    }
    mi->ignore_stop_event=1;
    inc_pos= 0;				// position already set for the new log
    break;
  case CREATE_FILE_EVENT:
  {
    /* We come here when and only when tmp_buf != 0 */
    DBUG_ASSERT(tmp_buf);
    int error = process_io_create_file(mi,(Create_file_log_event*)ev);
    delete ev;
    mi->master_log_pos += event_len;
    DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
    pthread_mutex_unlock(&mi->data_lock);
    my_free((char*)tmp_buf, MYF(0));
    DBUG_RETURN(error);
  }
  default:
    mi->ignore_stop_event=0;
    inc_pos= event_len;
    break;
  }
  if (likely(!ignore_event))
  {
    if (unlikely(rli->relay_log.append(ev)))
    {
      delete ev;
      pthread_mutex_unlock(&mi->data_lock);
      my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
      DBUG_RETURN(1);
    }
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  }
  delete ev;
  mi->master_log_pos+= inc_pos;
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
  pthread_mutex_unlock(&mi->data_lock);
  /*
    tmp_buf is normally 0 here. Free it anyway, after ev is gone, in case a
    Load event came back as something other than a Create_file event.
  */
  my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
  DBUG_RETURN(0);
}

unknown's avatar
unknown committed
2894 2895 2896 2897 2898 2899
/*
  Write an event received from the master into the relay log

  SYNOPSIS
    queue_event()
    mi			master_info for the slave
    buf			event as read from the master
    event_len		length of buf

  DESCRIPTION
    Events from an old-format master are handed to queue_old_event().
    Otherwise the raw event is appended to the relay log unchanged, with
    two exceptions:
    - a Rotate event first moves mi to the new master log and does not
      advance the position;
    - a Stop event immediately following a Rotate event is dropped.
    mi->master_log_pos only advances when the event was actually written.

  TODO
    Verify the issue with stop events, see if we need them at all in the
    relay log. Figure out if other events in addition to Rotate require
    special processing.

  RETURN VALUES
    0		ok
    != 0	error
*/

int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
{
  int error= 0;
  ulong inc_pos;
  bool ignore_event= 0;
  RELAY_LOG_INFO *rli= &mi->rli;
  DBUG_ENTER("queue_event");

  if (mi->old_format)
    DBUG_RETURN(queue_old_event(mi,buf,event_len));

  pthread_mutex_lock(&mi->data_lock);

  switch (buf[EVENT_TYPE_OFFSET]) {
  case STOP_EVENT:
    ignore_event= mi->ignore_stop_event;
    mi->ignore_stop_event= 0;
    inc_pos= event_len;
    break;
  case ROTATE_EVENT:
  {
    Rotate_log_event rev(buf,event_len,0);
    if (unlikely(process_io_rotate(mi,&rev)))
    {
      pthread_mutex_unlock(&mi->data_lock);
      DBUG_RETURN(1);
    }
    mi->ignore_stop_event= 1;
    inc_pos= 0;				// position already set for the new log
    break;
  }
  default:
    mi->ignore_stop_event= 0;
    inc_pos= event_len;
    break;
  }

  if (likely(!ignore_event &&
	     !(error= rli->relay_log.appendv(buf,event_len,0))))
  {
    mi->master_log_pos+= inc_pos;
    DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  }
  pthread_mutex_unlock(&mi->data_lock);
  DBUG_RETURN(error);
}

unknown's avatar
unknown committed
2951

2952 2953
/*
  Release the files held by a relay log info structure: the info file, the
  relay log file opened into rli->cache_buf (if any), and the relay log
  itself. Calling it on a structure that is not initialized is a no-op.
*/

void end_relay_log_info(RELAY_LOG_INFO* rli)
{
  DBUG_ENTER("end_relay_log_info");

  if (rli->inited)
  {
    /* Info file (rli->info_file cached over rli->info_fd) */
    if (rli->info_fd >= 0)
    {
      end_io_cache(&rli->info_file);
      (void) my_close(rli->info_fd, MYF(MY_WME));
      rli->info_fd= -1;
    }

    /* Relay log file read through rli->cache_buf */
    if (rli->cur_log_fd >= 0)
    {
      end_io_cache(&rli->cache_buf);
      (void) my_close(rli->cur_log_fd, MYF(MY_WME));
      rli->cur_log_fd= -1;
    }

    rli->inited= 0;
    rli->relay_log.close(1);
  }
  DBUG_VOID_RETURN;
}

/* try to connect until successful or slave killed */
2976
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
unknown's avatar
unknown committed
2977
{
unknown's avatar
unknown committed
2978
  return connect_to_master(thd, mysql, mi, 0, 0);
unknown's avatar
unknown committed
2979 2980
}

unknown's avatar
unknown committed
2981

2982 2983 2984 2985
/*
  Try to connect until successful or slave killed or we have retried
  master_retry_count times
*/
unknown's avatar
unknown committed
2986

unknown's avatar
unknown committed
2987
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
unknown's avatar
unknown committed
2988
			     bool reconnect, bool suppress_warnings)
unknown's avatar
unknown committed
2989
{
2990
  int slave_was_killed;
2991 2992
  int last_errno= -2;				// impossible error
  ulong err_count=0;
unknown's avatar
unknown committed
2993
  char llbuff[22];
2994
  DBUG_ENTER("connect_to_master");
unknown's avatar
unknown committed
2995

unknown's avatar
unknown committed
2996 2997 2998
#ifndef DBUG_OFF
  events_till_disconnect = disconnect_slave_event_count;
#endif
2999 3000 3001 3002
  uint client_flag=0;
  if (opt_slave_compressed_protocol)
    client_flag=CLIENT_COMPRESS;		/* We will use compression */

3003
  while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
3004
	 (reconnect ? mc_mysql_reconnect(mysql) != 0:
unknown's avatar
unknown committed
3005
	  !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
3006
			    mi->port, 0, client_flag,
unknown's avatar
unknown committed
3007
			    thd->variables.net_read_timeout)))
unknown's avatar
unknown committed
3008
  {
3009 3010 3011
    /* Don't repeat last error */
    if (mc_mysql_errno(mysql) != last_errno)
    {
3012
      last_errno=mc_mysql_errno(mysql);
unknown's avatar
unknown committed
3013
      suppress_warnings= 0;
3014
      sql_print_error("Slave I/O thread: error %s to master \
3015
'%s@%s:%d': \
3016
Error: '%s'  errno: %d  retry-time: %d  retries: %d",
3017
		      (reconnect ? "reconnecting" : "connecting"),
3018
		      mi->user,mi->host,mi->port,
3019
		      mc_mysql_error(mysql), last_errno,
3020 3021
		      mi->connect_retry,
		      master_retry_count);
3022
    }
unknown's avatar
unknown committed
3023 3024 3025
    /*
      By default we try forever. The reason is that failure will trigger
      master election, so if the user did not set master_retry_count we
3026
      do not want to have election triggered on the first failure to
unknown's avatar
unknown committed
3027
      connect
3028
    */
3029
    if (++err_count == master_retry_count)
3030 3031
    {
      slave_was_killed=1;
unknown's avatar
unknown committed
3032 3033
      if (reconnect)
        change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
3034 3035
      break;
    }
3036 3037
    safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
	       (void*)mi);
unknown's avatar
unknown committed
3038
  }
3039

3040 3041
  if (!slave_was_killed)
  {
unknown's avatar
unknown committed
3042
    if (reconnect)
unknown's avatar
unknown committed
3043
    { 
3044
      if (!suppress_warnings && global_system_variables.log_warnings)
unknown's avatar
unknown committed
3045
	sql_print_error("Slave: connected to master '%s@%s:%d',\
3046
replication resumed in log '%s' at position %s", mi->user,
unknown's avatar
unknown committed
3047 3048 3049 3050
			mi->host, mi->port,
			IO_RPL_LOG_NAME,
			llstr(mi->master_log_pos,llbuff));
    }
unknown's avatar
unknown committed
3051 3052 3053 3054
    else
    {
      change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
      mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
unknown's avatar
unknown committed
3055
		      mi->user, mi->host, mi->port);
unknown's avatar
unknown committed
3056
    }
3057
#ifdef SIGNAL_WITH_VIO_CLOSE
3058
    thd->set_active_vio(mysql->net.vio);
3059
#endif      
3060
  }
3061 3062
  DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed));
  DBUG_RETURN(slave_was_killed);
unknown's avatar
unknown committed
3063 3064
}

unknown's avatar
unknown committed
3065

unknown's avatar
unknown committed
3066 3067 3068 3069 3070
/*
  Try to connect until successful or slave killed or we have retried
  master_retry_count times
*/

unknown's avatar
unknown committed
3071 3072
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
			  bool suppress_warnings)
unknown's avatar
unknown committed
3073
{
unknown's avatar
unknown committed
3074
  return connect_to_master(thd, mysql, mi, 1, suppress_warnings);
unknown's avatar
unknown committed
3075 3076
}

unknown's avatar
unknown committed
3077

3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107
/*
  Save in the relay log info file the relay log file and position the SQL
  thread has reached, together with the matching master log file and
  position.

  SYNOPSIS
    flush_relay_log_info()
    rli			Relay log information

  FILE FORMAT
    Four lines, each ending in '\n':
      relay log name, relay log position, master log name,
      master log position.
    The file is rewritten from offset 0; it is not truncated.

  NOTES
    - As this is only called by the slave thread, we don't need to
      have a lock on this.
    - If there is an active transaction, then we don't update the position
      in the relay log.  This is to ensure that we re-execute statements
      if we die in the middle of a transaction that was rolled back.
    - As a transaction never spans binary logs, we don't have to handle the
      case where we do a relay-log-rotation in the middle of the transaction.
      If this would not be the case, we would have to ensure that we
      don't delete the relay log file where the transaction started when
      we switch to a new relay log file.

  TODO
    - Change the log file information to a binary format to avoid calling
      longlong2str.

  RETURN VALUES
    0	ok
    1	write error
*/

bool flush_relay_log_info(RELAY_LOG_INFO* rli)
{
  IO_CACHE *info= &rli->info_file;
  char line_buf[FN_REFLEN*2+22*2+4];
  char *end;
  bool failed= 0;

  /* sql_thd is not set when calling from init_slave() */
  if (rli->sql_thd && (rli->sql_thd->options & OPTION_BEGIN))
    return 0;					// Wait for COMMIT

  my_b_seek(info, 0L);
  end= strmov(line_buf, rli->relay_log_name);
  *end++= '\n';
  end= longlong2str(rli->relay_log_pos, end, 10);
  *end++= '\n';
  end= strmov(end, rli->master_log_name);
  *end++= '\n';
  end= longlong2str(rli->master_log_pos, end, 10);
  *end++= '\n';

  /* All three steps are attempted; any failure makes the result 1 */
  if (my_b_write(info, (byte*) line_buf, (ulong) (end - line_buf)))
    failed= 1;
  if (flush_io_cache(info))
    failed= 1;
  if (flush_io_cache(rli->cur_log))		// QQ Why this call ?
    failed= 1;
  return failed;
}

unknown's avatar
unknown committed
3135 3136 3137 3138 3139 3140 3141

/*
  Called when we notice that the current "hot" relay log was rotated under
  our feet. Reopens rli->relay_log_name as a normal file through
  rli->cache_buf and seeks to where reading stopped.

  RETURN VALUES
    pointer to rli->cache_buf	ok; rli->cur_log and rli->cur_log_fd are set
    0				open_binlog() failed, *errmsg is set
*/

static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
{
  DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
  DBUG_ASSERT(rli->cur_log_fd == -1);
  DBUG_ENTER("reopen_relay_log");

  IO_CACHE *log_cache= &rli->cache_buf;
  rli->cur_log= log_cache;
  rli->cur_log_fd= open_binlog(log_cache, rli->relay_log_name, errmsg);
  if (rli->cur_log_fd < 0)
    DBUG_RETURN(0);

  /*
    Resume exactly where we were: relay_log_pos is the current log
    position, pending is the number of bytes of the current event that
    were already processed.
  */
  my_b_seek(log_cache, rli->relay_log_pos + rli->pending);
  DBUG_RETURN(log_cache);
}

unknown's avatar
unknown committed
3160

3161 3162 3163 3164 3165 3166 3167
Log_event* next_event(RELAY_LOG_INFO* rli)
{
  Log_event* ev;
  IO_CACHE* cur_log = rli->cur_log;
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
  const char* errmsg=0;
  THD* thd = rli->sql_thd;
unknown's avatar
unknown committed
3168
  DBUG_ENTER("next_event");
3169 3170
  DBUG_ASSERT(thd != 0);

unknown's avatar
unknown committed
3171 3172 3173 3174 3175 3176 3177
  /*
    For most operations we need to protect rli members with data_lock,
    so we will hold it for the most of the loop below
    However, we will release it whenever it is worth the hassle, 
    and in the cases when we go into a pthread_cond_wait() with the
    non-data_lock mutex
  */
3178 3179
  pthread_mutex_lock(&rli->data_lock);
  
3180
  while (!sql_slave_killed(thd,rli))
unknown's avatar
unknown committed
3181 3182 3183
  {
    /*
      We can have two kinds of log reading:
unknown's avatar
unknown committed
3184 3185 3186 3187 3188 3189 3190 3191
      hot_log:
        rli->cur_log points at the IO_CACHE of relay_log, which
        is actively being updated by the I/O thread. We need to be careful
        in this case and make sure that we are not looking at a stale log that
        has already been rotated. If it has been, we reopen the log.

      The other case is much simpler:
        We just have a read only log that nobody else will be updating.
unknown's avatar
unknown committed
3192
    */
3193 3194 3195 3196 3197
    bool hot_log;
    if ((hot_log = (cur_log != &rli->cache_buf)))
    {
      DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor
      pthread_mutex_lock(log_lock);
unknown's avatar
unknown committed
3198 3199

      /*
unknown's avatar
unknown committed
3200
	Reading xxx_file_id is safe because the log will only
unknown's avatar
unknown committed
3201 3202
	be rotated when we hold relay_log.LOCK_log
      */
unknown's avatar
unknown committed
3203
      if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
3204
      {
unknown's avatar
unknown committed
3205 3206 3207 3208
	// The master has switched to a new log file; Reopen the old log file
	cur_log=reopen_relay_log(rli, &errmsg);
	pthread_mutex_unlock(log_lock);
	if (!cur_log)				// No more log files
3209
	  goto err;
unknown's avatar
unknown committed
3210
	hot_log=0;				// Using old binary log
3211 3212
      }
    }
unknown's avatar
unknown committed
3213
    DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
3214
    DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending);
unknown's avatar
unknown committed
3215 3216 3217
    /*
      Relay log is always in new format - if the master is 3.23, the
      I/O thread will convert the format for us
unknown's avatar
unknown committed
3218
    */
unknown's avatar
unknown committed
3219
    if ((ev=Log_event::read_log_event(cur_log,0,(bool)0 /* new format */)))
3220 3221 3222 3223 3224
    {
      DBUG_ASSERT(thd==rli->sql_thd);
      if (hot_log)
	pthread_mutex_unlock(log_lock);
      pthread_mutex_unlock(&rli->data_lock);
unknown's avatar
unknown committed
3225
      DBUG_RETURN(ev);
3226 3227
    }
    DBUG_ASSERT(thd==rli->sql_thd);
unknown's avatar
unknown committed
3228
    if (opt_reckless_slave)			// For mysql-test
unknown's avatar
unknown committed
3229
      cur_log->error = 0;
unknown's avatar
unknown committed
3230
    if (cur_log->error < 0)
unknown's avatar
unknown committed
3231 3232
    {
      errmsg = "slave SQL thread aborted because of I/O error";
unknown's avatar
unknown committed
3233 3234
      if (hot_log)
	pthread_mutex_unlock(log_lock);
unknown's avatar
unknown committed
3235 3236
      goto err;
    }
3237 3238
    if (!cur_log->error) /* EOF */
    {
unknown's avatar
unknown committed
3239 3240 3241 3242 3243
      /*
	On a hot log, EOF means that there are no more updates to
	process and we must block until I/O thread adds some and
	signals us to continue
      */
3244 3245
      if (hot_log)
      {
unknown's avatar
unknown committed
3246
	DBUG_ASSERT(rli->relay_log.get_open_count() == rli->cur_log_old_open_count);
unknown's avatar
unknown committed
3247 3248 3249 3250
	/*
	  We can, and should release data_lock while we are waiting for
	  update. If we do not, show slave status will block
	*/
3251
	pthread_mutex_unlock(&rli->data_lock);
3252 3253 3254 3255 3256 3257 3258 3259 3260 3261 3262 3263 3264 3265 3266 3267 3268 3269 3270 3271 3272 3273

        /*
          Possible deadlock : 
          - the I/O thread has reached log_space_limit
          - the SQL thread has read all relay logs, but cannot purge for some
          reason:
            * it has already purged all logs except the current one
            * there are other logs than the current one but they're involved in
            a transaction that finishes in the current one (or is not finished)
          Solution :
          Wake up the possibly waiting I/O thread, and set a boolean asking
          the I/O thread to temporarily ignore the log_space_limit
          constraint, because we do not want the I/O thread to block because of
          space (it's ok if it blocks for any other reason (e.g. because the
          master does not send anything). Then the I/O thread stops waiting 
          and reads more events.
          The SQL thread decides when the I/O thread should take log_space_limit
          into account again : ignore_log_space_limit is reset to 0 
          in purge_first_log (when the SQL thread purges the just-read relay
          log), and also when the SQL thread starts. We should also reset
          ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
          fact, no need as RESET SLAVE requires that the slave
unknown's avatar
unknown committed
3274 3275
          be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
          it stops.
3276 3277 3278 3279
        */
        pthread_mutex_lock(&rli->log_space_lock);
        // prevent the I/O thread from blocking next times
        rli->ignore_log_space_limit= 1; 
3280 3281 3282 3283 3284 3285
        /*
          If the I/O thread is blocked, unblock it.
          Ok to broadcast after unlock, because the mutex is only destroyed in
          ~st_relay_log_info(), i.e. when rli is destroyed, and rli will not be
          destroyed before we exit the present function.
        */
3286
        pthread_mutex_unlock(&rli->log_space_lock);
3287
        pthread_cond_broadcast(&rli->log_space_cond);
3288 3289 3290 3291
        // Note that wait_for_update unlocks lock_log !
        rli->relay_log.wait_for_update(rli->sql_thd);
        // re-acquire data lock since we released it earlier
        pthread_mutex_lock(&rli->data_lock);
3292 3293
	continue;
      }
unknown's avatar
unknown committed
3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304 3305 3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316
      /*
	If the log was not hot, we need to move to the next log in
	sequence. The next log could be hot or cold, we deal with both
	cases separately after doing some common initialization
      */
      end_io_cache(cur_log);
      DBUG_ASSERT(rli->cur_log_fd >= 0);
      my_close(rli->cur_log_fd, MYF(MY_WME));
      rli->cur_log_fd = -1;
	
      /*
	TODO: make skip_log_purge a start-up option. At this point this
	is not critical priority
      */
      if (!rli->skip_log_purge)
      {
	// purge_first_log will properly set up relay log coordinates in rli
	if (rli->relay_log.purge_first_log(rli))
	{
	  errmsg = "Error purging processed log";
	  goto err;
	}
      }
3317 3318
      else
      {
unknown's avatar
unknown committed
3319
	/*
3320 3321 3322 3323 3324
	  If hot_log is set, then we already have a lock on
	  LOCK_log.  If not, we have to get the lock.

	  According to Sasha, the only time this code will ever be executed
	  is if we are recovering from a bug.
unknown's avatar
unknown committed
3325
	*/
3326
	if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
3327
	{
unknown's avatar
unknown committed
3328 3329
	  errmsg = "error switching to the next log";
	  goto err;
3330
	}
unknown's avatar
unknown committed
3331 3332
	rli->relay_log_pos = BIN_LOG_HEADER_SIZE;
	rli->pending=0;
3333 3334
	strmake(rli->relay_log_name,rli->linfo.log_file_name,
		sizeof(rli->relay_log_name)-1);
unknown's avatar
unknown committed
3335 3336
	flush_relay_log_info(rli);
      }
3337
	
unknown's avatar
unknown committed
3338 3339 3340
      // next log is hot 
      if (rli->relay_log.is_active(rli->linfo.log_file_name))
      {
3341
#ifdef EXTRA_DEBUG
unknown's avatar
unknown committed
3342 3343
	sql_print_error("next log '%s' is currently active",
			rli->linfo.log_file_name);
3344
#endif	  
unknown's avatar
unknown committed
3345 3346 3347
	rli->cur_log= cur_log= rli->relay_log.get_log_file();
	rli->cur_log_old_open_count= rli->relay_log.get_open_count();
	DBUG_ASSERT(rli->cur_log_fd == -1);
3348
	  
unknown's avatar
unknown committed
3349
	/*
unknown's avatar
unknown committed
3350 3351
	  Read pointer has to be at the start since we are the only
	  reader
unknown's avatar
unknown committed
3352
	*/
unknown's avatar
unknown committed
3353
	if (check_binlog_magic(cur_log,&errmsg))
3354
	  goto err;
unknown's avatar
unknown committed
3355
	continue;
3356
      }
unknown's avatar
unknown committed
3357 3358 3359 3360 3361 3362 3363 3364 3365 3366 3367 3368
      /*
	if we get here, the log was not hot, so we will have to
	open it ourselves
      */
#ifdef EXTRA_DEBUG
      sql_print_error("next log '%s' is not active",
		      rli->linfo.log_file_name);
#endif	  
      // open_binlog() will check the magic header
      if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
				       &errmsg)) <0)
	goto err;
3369
    }
unknown's avatar
unknown committed
3370
    else
3371
    {
unknown's avatar
unknown committed
3372 3373 3374 3375 3376 3377
      /*
	Read failed with a non-EOF error.
	TODO: come up with something better to handle this error
      */
      if (hot_log)
	pthread_mutex_unlock(log_lock);
3378
      sql_print_error("Slave SQL thread: I/O error reading \
unknown's avatar
unknown committed
3379
event(errno: %d  cur_log->error: %d)",
3380
		      my_errno,cur_log->error);
unknown's avatar
unknown committed
3381 3382
      // set read position to the beginning of the event
      my_b_seek(cur_log,rli->relay_log_pos+rli->pending);
unknown's avatar
unknown committed
3383 3384
      /* otherwise, we have had a partial read */
      errmsg = "Aborting slave SQL thread because of partial event read";
3385
      break;					// To end of function
3386 3387
    }
  }
3388
  if (!errmsg && global_system_variables.log_warnings)
3389
    errmsg = "slave SQL thread was killed";
unknown's avatar
unknown committed
3390

3391 3392
err:
  pthread_mutex_unlock(&rli->data_lock);
3393 3394
  if (errmsg)
    sql_print_error("Error reading relay log event: %s", errmsg);
unknown's avatar
unknown committed
3395
  DBUG_RETURN(0);
3396 3397
}

unknown's avatar
unknown committed
3398

unknown's avatar
unknown committed
3399 3400
#ifdef __GNUC__
template class I_List_iterator<i_string>;
unknown's avatar
unknown committed
3401
template class I_List_iterator<i_string_pair>;
unknown's avatar
unknown committed
3402
#endif