/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB & Sasha

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "mysql_priv.h"
hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
18 19
#ifdef HAVE_REPLICATION

20
#include "repl_failsafe.h"
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
21 22
#include "sql_repl.h"
#include "slave.h"
23
#include "log_event.h"
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
24
#include <mysql.h>
25 26 27 28

#define SLAVE_LIST_CHUNK 128
#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)

29 30 31 32

RPL_STATUS rpl_status=RPL_NULL;
pthread_mutex_t LOCK_rpl_status;
pthread_cond_t COND_rpl_status;
33
HASH slave_list;
34

35 36
const char *rpl_role_type[] = {"MASTER","SLAVE",NullS};
TYPELIB rpl_role_typelib = {array_elements(rpl_role_type)-1,"",
37
			    rpl_role_type, NULL};
38

39 40 41 42 43
const char* rpl_status_type[]=
{
  "AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE", "LOST_SOLDIER","TROOP_SOLDIER",
  "RECOVERY_CAPTAIN","NULL",NullS
};
44
TYPELIB rpl_status_typelib= {array_elements(rpl_status_type)-1,"",
45
			     rpl_status_type, NULL};
46

47

48 49 50 51
static Slave_log_event* find_slave_event(IO_CACHE* log,
					 const char* log_file_name,
					 char* errmsg);

52 53 54 55 56 57 58
/*
  The failsafe-replication functions defined in this file are not used;
  their code has not been updated for more than one year now, so it should
  be considered BADLY BROKEN. Do not enable it.
  The functions that are used (to handle LOAD DATA FROM MASTER, plus some
  small functions like register_slave()) are working.
*/

sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
60 61 62
/*
  Prepare a THD for use by the failsafe replication thread.

  SYNOPSIS
    init_failsafe_rpl_thread()
    thd         thread descriptor to initialize

  RETURN VALUES
    0           ok
    -1          init_thr_lock() or thd->store_globals() failed
*/

static int init_failsafe_rpl_thread(THD* thd)
{
  DBUG_ENTER("init_failsafe_rpl_thread");
  /*
    thd->bootstrap is to report errors barely to stderr; if this code is
    enabled again one day, one should check if bootstrap is still needed
    (maybe this thread has no other error reporting method).
  */
  thd->system_thread = thd->bootstrap = 1;
  thd->host_or_ip= "";
  thd->client_capabilities = 0;
  my_net_init(&thd->net, 0);
  thd->net.read_timeout = slave_net_timeout;
  thd->max_client_packet_length=thd->net.max_packet;
  thd->master_access= ~(ulong)0;
  thd->priv_user = 0;

  /* thread_id is a shared counter, so take the next value under the lock */
  pthread_mutex_lock(&LOCK_thread_count);
  thd->thread_id = thread_id++;
  pthread_mutex_unlock(&LOCK_thread_count);

  if (init_thr_lock() || thd->store_globals())
  {
    close_connection(thd, ER_OUT_OF_RESOURCES, 1); // is this needed?
    statistic_increment(aborted_connects,&LOCK_status);
    end_thread(thd,0);
    DBUG_RETURN(-1);
  }

#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
  sigset_t set;
  VOID(sigemptyset(&set));			// Get mask in use
  /* Unblocking an empty set changes nothing; it only fetches the old mask */
  VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif

  thd->mem_root->free= thd->mem_root->used= 0;
  if (thd->variables.max_join_size == HA_POS_ERROR)
    thd->options|= OPTION_BIG_SELECTS;

  thd->proc_info="Thread initialized";
  thd->version=refresh_version;
  thd->set_time();
  DBUG_RETURN(0);
}

104

105 106 107 108 109
/*
  Move the global replication status from one state to another.

  SYNOPSIS
    change_rpl_status()
    from_status   state the server is expected to be in
    to_status     state to switch to

  NOTES
    The switch happens only when the current state matches from_status.
    NOTE(review): RPL_ANY is treated here as a wildcard for from_status;
    its definition is not visible in this file, confirm against the
    RPL_STATUS declaration. The original test (rpl_status == RPL_ANY) is
    kept so nothing that worked before stops working.
    COND_rpl_status is signalled in every case, whether or not the state
    changed.
*/

void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status)
{
  pthread_mutex_lock(&LOCK_rpl_status);
  if (rpl_status == from_status || from_status == RPL_ANY ||
      rpl_status == RPL_ANY)
    rpl_status = to_status;
  pthread_cond_signal(&COND_rpl_status);
  pthread_mutex_unlock(&LOCK_rpl_status);
}

114

115 116 117 118 119 120 121 122 123
/*
  Read one length-prefixed string from a packet into a fixed-size buffer.

  p     packet cursor (modified: advanced past the length byte and string)
  obj   destination; must be a char ARRAY, because sizeof(obj) is the bound

  The expansion relies on the enclosing scope for 'p_end' (one past the
  last packet byte) and for an 'err' label, which is jumped to when the
  packet is truncated or the string does not fit into obj together with
  its terminating zero. The length byte is only read after checking that
  it lies inside the packet.
*/
#define get_object(p, obj) \
do {\
  uint len; \
  if ((p) >= p_end) \
    goto err; \
  len= (uint) *(p)++; \
  if (len > (uint) (p_end - (p)) || len >= sizeof(obj)) \
    goto err; \
  strmake(obj,(char*) (p),len); \
  (p)+= len; \
} while (0)

124

125 126 127 128 129 130
/*
  Convenience overload: compare the master coordinates stored in a slave
  event (sev->master_log, sev->master_pos) with the ones in 'mi'
  (mi->log_file_name, mi->pos) by forwarding to the four-argument
  cmp_master_pos(), which is declared outside this part of the file.

  The result is passed through unchanged. translate_master() treats 0 as
  "same position" and a value > 0 as "the event points past the target".
*/
static inline int cmp_master_pos(Slave_log_event* sev, LEX_MASTER_INFO* mi)
{
  return cmp_master_pos(sev->master_log, sev->master_pos, mi->log_file_name,
			mi->pos);
}

131

132 133 134 135
/*
  Remove the entry for thd->server_id from 'slave_list'.

  SYNOPSIS
    unregister_slave()
    thd          thread whose server_id identifies the entry
    only_mine    if set, delete the entry only when it was registered by
                 this very thd
    need_mutex   if set, take LOCK_slave_list around the operation; pass 0
                 when the caller already holds it

  NOTES
    Nothing is done when thd->server_id is 0.
*/

void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
{
  if (thd->server_id)
  {
    if (need_mutex)
      pthread_mutex_lock(&LOCK_slave_list);

    SLAVE_INFO* old_si;
    if ((old_si = (SLAVE_INFO*)hash_search(&slave_list,
                                           (byte*)&thd->server_id, 4)) &&
        (!only_mine || old_si->thd == thd))
      hash_delete(&slave_list, (byte*)old_si);

    if (need_mutex)
      pthread_mutex_unlock(&LOCK_slave_list);
  }
}

150

151 152 153 154 155 156 157 158
/*
  Register slave in 'slave_list' hash table

  RETURN VALUES
  0	ok
  1	Error.   Error message sent to client
*/

159 160
int register_slave(THD* thd, uchar* packet, uint packet_length)
{
161
  int res;
162
  SLAVE_INFO *si;
163
  uchar *p= packet, *p_end= packet + packet_length;
164

165
  if (check_access(thd, REPL_SLAVE_ACL, any_db,0,0,0,0))
166 167
    return 1;
  if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
168
    goto err2;
169

170 171
  thd->server_id= si->server_id= uint4korr(p);
  p+= 4;
172 173 174
  get_object(p,si->host);
  get_object(p,si->user);
  get_object(p,si->password);
175 176 177
  if (p+10 > p_end)
    goto err;
  si->port= uint2korr(p);
178
  p += 2;
179
  si->rpl_recovery_rank= uint4korr(p);
180
  p += 4;
181 182 183
  if (!(si->master_id= uint4korr(p)))
    si->master_id= server_id;
  si->thd= thd;
184

185
  pthread_mutex_lock(&LOCK_slave_list);
186
  unregister_slave(thd,0,0);
hf@deer.(none)'s avatar
SCRUM  
hf@deer.(none) committed
187
  res= my_hash_insert(&slave_list, (byte*) si);
188 189 190 191
  pthread_mutex_unlock(&LOCK_slave_list);
  return res;

err:
192 193 194 195 196
  my_free((gptr) si, MYF(MY_WME));
  my_message(ER_UNKNOWN_ERROR, "Wrong parameters to function register_slave",
	     MYF(0));
err2:
  return 1;
197 198
}

199
/*
  Key callback given to hash_init() for 'slave_list': the key of a
  SLAVE_INFO is its server_id, always 4 bytes long.
*/
extern "C" uint32
*slave_list_key(SLAVE_INFO* si, uint* len,
                my_bool not_used __attribute__((unused)))
{
  *len = 4;
  return &si->server_id;
}

207
extern "C" void slave_info_free(void *s)
208 209 210 211 212 213
{
  my_free((gptr) s, MYF(MY_WME));
}

void init_slave_list()
{
214
  hash_init(&slave_list, system_charset_info, SLAVE_LIST_CHUNK, 0, 0,
215
	    (hash_get_key) slave_list_key, (hash_free_key) slave_info_free, 0);
216 217 218 219 220
  pthread_mutex_init(&LOCK_slave_list, MY_MUTEX_INIT_FAST);
}

void end_slave_list()
{
221 222 223 224 225 226
  /* No protection by a mutex needed as we are only called at shutdown */
  if (hash_inited(&slave_list))
  {
    hash_free(&slave_list);
    pthread_mutex_destroy(&LOCK_slave_list);
  }
227 228
}

229
/*
  Read events from 'log' until one is found whose log_pos is >= mi->pos
  and whose server_id equals mi->server_id.

  SYNOPSIS
    find_target_pos()
    mi        in: target position and server id; out: mi->pos is set to
              the read position of 'log' after the matching event was read
    log       binary log cache, already positioned where the scan starts
    errmsg    out: reason of the failure

  RETURN VALUES
    0         event found, mi->pos updated
    1         no event could be read; errmsg tells whether the log was
              truncated, an I/O error occurred or the log simply ended
*/

static int find_target_pos(LEX_MASTER_INFO *mi, IO_CACHE *log, char *errmsg)
{
  my_off_t log_pos =	    (my_off_t) mi->pos;
  uint32 target_server_id = mi->server_id;

  for (;;)
  {
    Log_event* ev;
    if (!(ev = Log_event::read_log_event(log, (pthread_mutex_t*) 0, 0)))
    {
      if (log->error > 0)
        strmov(errmsg, "Binary log truncated in the middle of event");
      else if (log->error < 0)
        strmov(errmsg, "I/O error reading binary log");
      else
        strmov(errmsg, "Could not find target event in the binary log");
      return 1;
    }

    if (ev->log_pos >= log_pos && ev->server_id == target_server_id)
    {
      delete ev;
      mi->pos = my_b_tell(log);
      return 0;
    }
    delete ev;
  }
  /* Impossible */
}

259 260 261 262 263 264
/* 
  Before 4.0.15 we had a member of THD called log_pos, it was meant for
  failsafe replication code in repl_failsafe.cc which is disabled until
  it is reworked. Event's log_pos used to be preserved through 
  log-slave-updates to make code in repl_failsafe.cc work (this 
  function, SHOW NEW MASTER); but on the other side it caused unexpected
265
  values in Exec_Master_Log_Pos in A->B->C replication setup, 
266 267 268 269 270
  synchronization problems in master_pos_wait(), ... So we 
  (Dmitri & Guilhem) removed it.
  
  So for now this function is broken. 
*/
271 272 273 274

int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg)
{
  LOG_INFO linfo;
275
  char last_log_name[FN_REFLEN];
276 277 278 279 280 281 282 283 284
  IO_CACHE log;
  File file = -1, last_file = -1;
  pthread_mutex_t *log_lock;
  const char* errmsg_p;
  Slave_log_event* sev = 0;
  my_off_t last_pos = 0;
  int error = 1;
  int cmp_res;
  LINT_INIT(cmp_res);
monty@mashka.mysql.fi's avatar
monty@mashka.mysql.fi committed
285
  DBUG_ENTER("translate_master");
286 287 288 289

  if (!mysql_bin_log.is_open())
  {
    strmov(errmsg,"Binary log is not open");
monty@mashka.mysql.fi's avatar
monty@mashka.mysql.fi committed
290
    DBUG_RETURN(1);
291 292 293 294 295
  }

  if (!server_id_supplied)
  {
    strmov(errmsg, "Misconfigured master - server id was not set");
monty@mashka.mysql.fi's avatar
monty@mashka.mysql.fi committed
296
    DBUG_RETURN(1);
297 298
  }

299
  if (mysql_bin_log.find_log_pos(&linfo, NullS, 1))
300 301
  {
    strmov(errmsg,"Could not find first log");
monty@mashka.mysql.fi's avatar
monty@mashka.mysql.fi committed
302
    DBUG_RETURN(1);
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
  }
  thd->current_linfo = &linfo;

  bzero((char*) &log,sizeof(log));
  log_lock = mysql_bin_log.get_log_lock();
  pthread_mutex_lock(log_lock);

  for (;;)
  {
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg_p)) < 0)
    {
      strmov(errmsg, errmsg_p);
      goto err;
    }

    if (!(sev = find_slave_event(&log, linfo.log_file_name, errmsg)))
      goto err;

    cmp_res = cmp_master_pos(sev, mi);
    delete sev;

    if (!cmp_res)
    {
      /* Copy basename */
      fn_format(mi->log_file_name, linfo.log_file_name, "","",1);
      mi->pos = my_b_tell(&log);
      goto mi_inited;
    }
    else if (cmp_res > 0)
    {
      if (!last_pos)
      {
	strmov(errmsg,
	       "Slave event in first log points past the target position");
	goto err;
      }
      end_io_cache(&log);
      (void) my_close(file, MYF(MY_WME));
      if (init_io_cache(&log, (file = last_file), IO_SIZE, READ_CACHE, 0, 0,
			MYF(MY_WME)))
      {
	errmsg[0] = 0;
	goto err;
      }
      break;
    }

    strmov(last_log_name, linfo.log_file_name);
    last_pos = my_b_tell(&log);

353
    switch (mysql_bin_log.find_next_log(&linfo, 1)) {
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390
    case LOG_INFO_EOF:
      if (last_file >= 0)
       (void)my_close(last_file, MYF(MY_WME));
      last_file = -1;
      goto found_log;
    case 0:
      break;
    default:
      strmov(errmsg, "Error reading log index");
      goto err;
    }

    end_io_cache(&log);
    if (last_file >= 0)
     (void) my_close(last_file, MYF(MY_WME));
    last_file = file;
  }

found_log:
  my_b_seek(&log, last_pos);
  if (find_target_pos(mi,&log,errmsg))
    goto err;
  fn_format(mi->log_file_name, last_log_name, "","",1);  /* Copy basename */

mi_inited:
  error = 0;
err:
  pthread_mutex_unlock(log_lock);
  end_io_cache(&log);
  pthread_mutex_lock(&LOCK_thread_count);
  thd->current_linfo = 0;
  pthread_mutex_unlock(&LOCK_thread_count);
  if (file >= 0)
    (void) my_close(file, MYF(MY_WME));
  if (last_file >= 0 && last_file != file)
    (void) my_close(last_file, MYF(MY_WME));

monty@mashka.mysql.fi's avatar
monty@mashka.mysql.fi committed
391
  DBUG_RETURN(error);
392 393
}

394 395 396 397 398

/*
  Caller must delete result when done
*/

399 400 401 402 403 404 405 406
static Slave_log_event* find_slave_event(IO_CACHE* log,
					 const char* log_file_name,
					 char* errmsg)
{
  Log_event* ev;
  int i;
  bool slave_event_found = 0;
  LINT_INIT(ev);
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
407

408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434
  for (i = 0; i < 2; i++)
  {
    if (!(ev = Log_event::read_log_event(log, (pthread_mutex_t*)0, 0)))
    {
      my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
		  "Error reading event in log '%s'",
		  (char*)log_file_name);
      return 0;
    }
    if (ev->get_type_code() == SLAVE_EVENT)
    {
      slave_event_found = 1;
      break;
    }
    delete ev;
  }
  if (!slave_event_found)
  {
    my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
		"Could not find slave event in log '%s'",
		(char*)log_file_name);
    return 0;
  }

  return (Slave_log_event*)ev;
}

435 436 437
/*
   This function is broken now. See comment for translate_master().
 */
438

439
bool show_new_master(THD* thd)
440
{
441
  Protocol *protocol= thd->protocol;
442 443 444
  DBUG_ENTER("show_new_master");
  List<Item> field_list;
  char errmsg[SLAVE_ERRMSG_SIZE];
445
  LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
446 447 448 449 450

  errmsg[0]=0;					// Safety
  if (translate_master(thd, lex_mi, errmsg))
  {
    if (errmsg[0])
451 452
      my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
               "SHOW NEW MASTER", errmsg);
453
    DBUG_RETURN(TRUE);
454 455 456 457
  }
  else
  {
    field_list.push_back(new Item_empty_string("Log_name", 20));
458 459
    field_list.push_back(new Item_return_int("Log_pos", 10,
					     MYSQL_TYPE_LONGLONG));
460 461
    if (protocol->send_fields(&field_list,
                              Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
462
      DBUG_RETURN(TRUE);
463
    protocol->prepare_for_resend();
464
    protocol->store(lex_mi->log_file_name, &my_charset_bin);
465 466
    protocol->store((ulonglong) lex_mi->pos);
    if (protocol->write())
467
      DBUG_RETURN(TRUE);
468
    send_eof(thd);
469
    DBUG_RETURN(FALSE);
470 471 472
  }
}

473 474
/*
  Asks the master for the list of its other connected slaves.
475 476 477 478 479
  This is for failsafe replication: 
  in order for failsafe replication to work, the servers involved in
  replication must know of each other. We accomplish this by having each
  slave report to the master how to reach it, and on connection, each
  slave receives information about where the other slaves are.
480 481 482 483 484 485 486 487 488 489 490

  SYNOPSIS
    update_slave_list()
    mysql           pre-existing connection to the master
    mi              master info

  NOTES
    mi is used only to give detailed error messages which include the
    hostname/port of the master, the username used by the slave to connect to
    the master.
    If the user used by the slave to connect to the master does not have the
491 492
    REPLICATION SLAVE privilege, it will pop in this function because
    SHOW SLAVE HOSTS will fail on the master.
493 494 495 496 497

  RETURN VALUES
    1           error
    0           success
 */
498

499
int update_slave_list(MYSQL* mysql, MASTER_INFO* mi)
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
500 501 502 503 504 505
{
  MYSQL_RES* res=0;
  MYSQL_ROW row;
  const char* error=0;
  bool have_auth_info;
  int port_ind;
506
  DBUG_ENTER("update_slave_list");
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
507

hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
508 509
  if (mysql_real_query(mysql,"SHOW SLAVE HOSTS",16) ||
      !(res = mysql_store_result(mysql)))
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
510
  {
hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
511
    error= mysql_error(mysql);
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
512 513 514
    goto err;
  }

hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
515
  switch (mysql_num_fields(res)) {
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
516 517 518 519 520 521 522 523 524
  case 5:
    have_auth_info = 0;
    port_ind=2;
    break;
  case 7:
    have_auth_info = 1;
    port_ind=4;
    break;
  default:
525 526
    error= "the master returned an invalid number of fields for SHOW SLAVE \
HOSTS";
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
527 528 529 530 531
    goto err;
  }

  pthread_mutex_lock(&LOCK_slave_list);

hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
532
  while ((row= mysql_fetch_row(res)))
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
533 534 535 536
  {
    uint32 server_id;
    SLAVE_INFO* si, *old_si;
    server_id = atoi(row[0]);
537 538
    if ((old_si= (SLAVE_INFO*)hash_search(&slave_list,
					  (byte*)&server_id,4)))
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
539 540 541 542 543
      si = old_si;
    else
    {
      if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
      {
544
	error= "the slave is out of memory";
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
545 546 547 548
	pthread_mutex_unlock(&LOCK_slave_list);
	goto err;
      }
      si->server_id = server_id;
hf@deer.(none)'s avatar
SCRUM  
hf@deer.(none) committed
549
      my_hash_insert(&slave_list, (byte*)si);
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
550
    }
551
    strmake(si->host, row[1], sizeof(si->host)-1);
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
552 553 554 555 556
    si->port = atoi(row[port_ind]);
    si->rpl_recovery_rank = atoi(row[port_ind+1]);
    si->master_id = atoi(row[port_ind+2]);
    if (have_auth_info)
    {
557 558
      strmake(si->user, row[2], sizeof(si->user)-1);
      strmake(si->password, row[3], sizeof(si->password)-1);
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
559 560 561
    }
  }
  pthread_mutex_unlock(&LOCK_slave_list);
562

sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
563 564
err:
  if (res)
hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
565
    mysql_free_result(res);
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
566 567
  if (error)
  {
568 569 570
    sql_print_error("While trying to obtain the list of slaves from the master \
'%s:%d', user '%s' got the following error: '%s'", 
                    mi->host, mi->port, mi->user, error);
571
    DBUG_RETURN(1);
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
572
  }
573
  DBUG_RETURN(0);
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
574 575
}

576

sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
577 578 579 580 581
/*
  Placeholder: no search is performed, both arguments are ignored and the
  result is always 0. handle_failsafe_rpl() switches to
  RPL_RECOVERY_CAPTAIN on 0 and to RPL_TROOP_SOLDIER otherwise.
*/
int find_recovery_captain(THD* thd, MYSQL* mysql)
{
  const int result= 0;

  (void) thd;
  (void) mysql;
  return result;
}

582

sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
583 584 585 586 587 588
/*
  Thread function of the failsafe replication thread.

  Creates its own THD, then waits on COND_rpl_status until the thread is
  killed or abort_loop is set. After each wakeup the current rpl_status is
  examined; only RPL_LOST_SOLDIER is acted upon: the status becomes
  RPL_TROOP_SOLDIER or RPL_RECOVERY_CAPTAIN depending on the result of
  find_recovery_captain().
*/
pthread_handler_decl(handle_failsafe_rpl,arg)
{
  DBUG_ENTER("handle_failsafe_rpl");
  THD *thd = new THD;
  thd->thread_stack = (char*)&thd;
  MYSQL* recovery_captain = 0;
  const char* msg;

  pthread_detach_this_thread();
  if (init_failsafe_rpl_thread(thd) || !(recovery_captain=mysql_init(0)))
  {
    sql_print_error("Could not initialize failsafe replication thread");
    goto err;
  }
  pthread_mutex_lock(&LOCK_rpl_status);
  msg= thd->enter_cond(&COND_rpl_status,
                       &LOCK_rpl_status, "Waiting for request");
  while (!thd->killed && !abort_loop)
  {
    bool break_req_chain = 0;
    pthread_cond_wait(&COND_rpl_status, &LOCK_rpl_status);
    thd->proc_info="Processing request";
    while (!break_req_chain)
    {
      switch (rpl_status) {
      case RPL_LOST_SOLDIER:
        if (find_recovery_captain(thd, recovery_captain))
          rpl_status=RPL_TROOP_SOLDIER;
        else
          rpl_status=RPL_RECOVERY_CAPTAIN;
        break_req_chain=1; /* for now until other states are implemented */
        break;
      default:
        break_req_chain=1;
        break;
      }
    }
  }
  thd->exit_cond(msg);
err:
  if (recovery_captain)
    mysql_close(recovery_captain);
  delete thd;
  my_thread_end();
  pthread_exit(0);
  DBUG_RETURN(0);
}
630

631

632
/*
  Send the content of 'slave_list' to the client (SHOW SLAVE HOSTS).

  NOTES
    Columns: Server_id, Host, Port, Rpl_recovery_rank, Master_id; User and
    Password are added after Host when opt_show_slave_auth_info is set.
    That gives 5 or 7 columns, the two layouts update_slave_list() accepts.
    LOCK_slave_list is held while the rows are written.

  RETURN VALUES
    FALSE   ok
    TRUE    sending the field list or a row failed
*/

bool show_slave_hosts(THD* thd)
{
  List<Item> field_list;
  Protocol *protocol= thd->protocol;
  DBUG_ENTER("show_slave_hosts");

  field_list.push_back(new Item_return_int("Server_id", 10,
                                           MYSQL_TYPE_LONG));
  field_list.push_back(new Item_empty_string("Host", 20));
  if (opt_show_slave_auth_info)
  {
    field_list.push_back(new Item_empty_string("User",20));
    field_list.push_back(new Item_empty_string("Password",20));
  }
  field_list.push_back(new Item_return_int("Port", 7, MYSQL_TYPE_LONG));
  field_list.push_back(new Item_return_int("Rpl_recovery_rank", 7,
                                           MYSQL_TYPE_LONG));
  field_list.push_back(new Item_return_int("Master_id", 10,
                                           MYSQL_TYPE_LONG));

  if (protocol->send_fields(&field_list,
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
    DBUG_RETURN(TRUE);

  pthread_mutex_lock(&LOCK_slave_list);

  for (uint i = 0; i < slave_list.records; ++i)
  {
    SLAVE_INFO* si = (SLAVE_INFO*) hash_element(&slave_list, i);
    protocol->prepare_for_resend();
    protocol->store((uint32) si->server_id);
    protocol->store(si->host, &my_charset_bin);
    if (opt_show_slave_auth_info)
    {
      protocol->store(si->user, &my_charset_bin);
      protocol->store(si->password, &my_charset_bin);
    }
    protocol->store((uint32) si->port);
    protocol->store((uint32) si->rpl_recovery_rank);
    protocol->store((uint32) si->master_id);
    if (protocol->write())
    {
      pthread_mutex_unlock(&LOCK_slave_list);
      DBUG_RETURN(TRUE);
    }
  }
  pthread_mutex_unlock(&LOCK_slave_list);
  send_eof(thd);
  DBUG_RETURN(FALSE);
}

683

684 685
/*
  Open a client connection to the master described by 'mi'.

  SYNOPSIS
    connect_to_master()
    thd       not used by this function
    mysql     initialized connection handle
    mi        master info: host, user, password, port and SSL settings

  NOTES
    Connect and read timeouts are both set to slave_net_timeout. After a
    successful connect, mysql->reconnect is switched on.

  RETURN VALUES
    0         connected
    1         no master host configured (message stored in
              mysql->net.last_error) or mysql_real_connect() failed
*/

int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi)
{
  DBUG_ENTER("connect_to_master");

  if (!mi->host || !*mi->host)			/* empty host */
  {
    strmov(mysql->net.last_error, "Master is not configured");
    DBUG_RETURN(1);
  }
  mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
  mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);

#ifdef HAVE_OPENSSL
  /* An empty SSL setting is passed as 0 */
  if (mi->ssl)
    mysql_ssl_set(mysql, 
        mi->ssl_key[0]?mi->ssl_key:0,
        mi->ssl_cert[0]?mi->ssl_cert:0,
        mi->ssl_ca[0]?mi->ssl_ca:0, 
        mi->ssl_capath[0]?mi->ssl_capath:0,
        mi->ssl_cipher[0]?mi->ssl_cipher:0);
#endif

  mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
  mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
  if (!mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
                          mi->port, 0, 0))
    DBUG_RETURN(1);
  mysql->reconnect= 1;
  DBUG_RETURN(0);
}


static inline void cleanup_mysql_results(MYSQL_RES* db_res,
717
					 MYSQL_RES** cur, MYSQL_RES** start)
718
{
719
  for (; cur >= start; --cur)
720 721
  {
    if (*cur)
hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
722
      mysql_free_result(*cur);
723
  }
hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
724
  mysql_free_result(db_res);
725 726 727
}


728 729
/*
  Fetch from the master every table listed in table_res.

  SYNOPSIS
    fetch_db_tables()
    thd         current thread
    mysql       connection to the master
    db          database the tables belong to
    table_res   result whose first column holds the table names
    mi          master info, passed on to fetch_master_table()

  NOTES
    When table_rules_on is set, tables rejected by tables_ok() are
    skipped.

  RETURN VALUES
    0           all tables fetched
    other       error returned by fetch_master_table(); the remaining
                tables are not fetched
*/

static int fetch_db_tables(THD *thd, MYSQL *mysql, const char *db,
                           MYSQL_RES *table_res, MASTER_INFO *mi)
{
  MYSQL_ROW row;
  for (row = mysql_fetch_row(table_res); row;
       row = mysql_fetch_row(table_res))
  {
    TABLE_LIST table;
    const char* table_name= row[0];
    int error;
    if (table_rules_on)
    {
      bzero((char*) &table, sizeof(table)); /* just to be safe */
      table.db= (char*) db;
      table.table_name= (char*) table_name;
      table.updating= 1;

      if (!tables_ok(thd, &table))
        continue;
    }
    /* download master's table and overwrite slave's table */
    if ((error= fetch_master_table(thd, db, table_name, mi, mysql, 1)))
      return error;
  }
  return 0;
}

755 756 757 758 759 760
/*
  Load all MyISAM tables from master to this slave.

  REQUIREMENTS
   - No active transaction (flush_relay_log_info would not work in this case)
*/
761

762
bool load_master_data(THD* thd)
763 764 765 766
{
  MYSQL mysql;
  MYSQL_RES* master_status_res = 0;
  int error = 0;
767 768
  const char* errmsg=0;
  int restart_thread_mask;
monty@narttu.mysql.fi's avatar
monty@narttu.mysql.fi committed
769 770
  HA_CREATE_INFO create_info;

hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
771
  mysql_init(&mysql);
772

773 774 775 776
  /*
    We do not want anyone messing with the slave at all for the entire
    duration of the data load.
  */
777
  pthread_mutex_lock(&LOCK_active_mi);
778 779 780 781 782
  lock_slave_threads(active_mi);
  init_thread_mask(&restart_thread_mask,active_mi,0 /*not inverse*/);
  if (restart_thread_mask &&
      (error=terminate_slave_threads(active_mi,restart_thread_mask,
				     1 /*skip lock*/)))
783
  {
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
784
    my_message(error, ER(error), MYF(0));
785
    unlock_slave_threads(active_mi);
786
    pthread_mutex_unlock(&LOCK_active_mi);
787
    return TRUE;
788
  }
789 790
  
  if (connect_to_master(thd, &mysql, active_mi))
791
  {
792
    my_error(error= ER_CONNECT_TO_MASTER, MYF(0), mysql_error(&mysql));
793 794 795 796 797 798 799 800
    goto err;
  }

  // now that we are connected, get all database and tables in each
  {
    MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res;
    uint num_dbs;

hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
801 802
    if (mysql_real_query(&mysql, "SHOW DATABASES", 14) ||
	!(db_res = mysql_store_result(&mysql)))
803
    {
804
      my_error(error= ER_QUERY_ON_MASTER, MYF(0), mysql_error(&mysql));
805 806 807
      goto err;
    }

hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
808
    if (!(num_dbs = (uint) mysql_num_rows(db_res)))
809
      goto err;
810 811 812 813
    /*
      In theory, the master could have no databases at all
      and run with skip-grant
    */
814 815 816

    if (!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*))))
    {
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
817
      my_message(error = ER_OUTOFMEMORY, ER(ER_OUTOFMEMORY), MYF(0));
818 819 820
      goto err;
    }

821 822 823 824 825 826
    /*
      This is a temporary solution until we have online backup
      capabilities - to be replaced once online backup is working
      we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
      can to minimize the lock time.
    */
hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
827 828 829
    if (mysql_real_query(&mysql, "FLUSH TABLES WITH READ LOCK", 27) ||
	mysql_real_query(&mysql, "SHOW MASTER STATUS",18) ||
	!(master_status_res = mysql_store_result(&mysql)))
830
    {
831
      my_error(error= ER_QUERY_ON_MASTER, MYF(0), mysql_error(&mysql));
832 833 834
      goto err;
    }

835 836 837 838
    /*
      Go through every table in every database, and if the replication
      rules allow replicating it, get it
    */
839 840 841

    table_res_end = table_res + num_dbs;

842 843
    for (cur_table_res = table_res; cur_table_res < table_res_end;
	 cur_table_res++)
844 845
    {
      // since we know how many rows we have, this can never be NULL
hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
846
      MYSQL_ROW row = mysql_fetch_row(db_res);
847 848 849
      char* db = row[0];

      /*
850 851 852 853 854
	Do not replicate databases excluded by rules. We also test
	replicate_wild_*_table rules (replicate_wild_ignore_table='db1.%' will
	be considered as "ignore the 'db1' database as a whole, as it already
	works for CREATE DATABASE and DROP DATABASE).
	Also skip 'mysql' database - in most cases the user will
855 856 857
	mess up and not exclude mysql database with the rules when
	he actually means to - in this case, he is up for a surprise if
	his priv tables get dropped and downloaded from master
858
	TODO - add special option, not enabled
859 860 861 862 863
	by default, to allow inclusion of mysql database into load
	data from master
      */

      if (!db_ok(db, replicate_do_db, replicate_ignore_db) ||
864
          !db_ok_with_wild_table(db) ||
865
	  !strcmp(db,"mysql"))
866 867 868 869 870
      {
	*cur_table_res = 0;
	continue;
      }

monty@narttu.mysql.fi's avatar
monty@narttu.mysql.fi committed
871 872 873 874
      bzero((char*) &create_info, sizeof(create_info));
      create_info.options= HA_LEX_CREATE_IF_NOT_EXISTS;

      if (mysql_create_db(thd, db, &create_info, 1))
875 876 877 878 879
      {
	cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
	goto err;
      }

hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
880 881 882
      if (mysql_select_db(&mysql, db) ||
	  mysql_real_query(&mysql, "SHOW TABLES", 11) ||
	  !(*cur_table_res = mysql_store_result(&mysql)))
883
      {
884
	my_error(error= ER_QUERY_ON_MASTER, MYF(0), mysql_error(&mysql));
885 886 887 888
	cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
	goto err;
      }

889
      if ((error = fetch_db_tables(thd,&mysql,db,*cur_table_res,active_mi)))
890 891 892 893 894 895 896 897 898
      {
	// we do not report the error - fetch_db_tables handles it
	cleanup_mysql_results(db_res, cur_table_res, table_res);
	goto err;
      }
    }

    cleanup_mysql_results(db_res, cur_table_res - 1, table_res);

899
    // adjust replication coordinates from the master
900 901
    if (master_status_res)
    {
hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
902
      MYSQL_ROW row = mysql_fetch_row(master_status_res);
903 904 905 906 907

      /*
	We need this check because the master may not be running with
	log-bin, but it will still allow us to do all the steps
	of LOAD DATA FROM MASTER - no reason to forbid it, really,
908
	although it does not make much sense for the user to do it
909
      */
910
      if (row && row[0] && row[1])
911
      {
912 913 914 915 916 917
        /*
          If the slave's master info is not inited, we init it, then we write
          the new coordinates to it. Must call init_master_info() *before*
          setting active_mi, because init_master_info() sets active_mi with
          defaults.
        */
monty@mysql.com's avatar
monty@mysql.com committed
918 919
        int error;

920 921
        if (init_master_info(active_mi, master_info_file, relay_log_info_file, 
			     0, (SLAVE_IO | SLAVE_SQL)))
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
922
          my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
923 924
	strmake(active_mi->master_log_name, row[0],
		sizeof(active_mi->master_log_name));
monty@mysql.com's avatar
monty@mysql.com committed
925
	active_mi->master_log_pos= my_strtoll10(row[1], (char**) 0, &error);
926
        /* at least in recent versions, the condition below should be false */
927 928
	if (active_mi->master_log_pos < BIN_LOG_HEADER_SIZE)
	  active_mi->master_log_pos = BIN_LOG_HEADER_SIZE;
guilhem@gbichot2's avatar
guilhem@gbichot2 committed
929 930 931 932 933 934
        /*
          Relay log's IO_CACHE may not be inited (even if we are sure that some
          host was specified; there could have been a problem when replication
          started, which led to relay log's IO_CACHE not being inited).
        */
	flush_master_info(active_mi, 0);
935
      }
hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
936
      mysql_free_result(master_status_res);
937 938
    }

hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
939
    if (mysql_real_query(&mysql, "UNLOCK TABLES", 13))
940
    {
941
      my_error(error= ER_QUERY_ON_MASTER, MYF(0), mysql_error(&mysql));
942 943 944
      goto err;
    }
  }
945
  thd->proc_info="purging old relay logs";
946 947
  if (purge_relay_logs(&active_mi->rli,thd,
		       0 /* not only reset, but also reinit */,
948 949
		       &errmsg))
  {
950
    my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
951
    unlock_slave_threads(active_mi);
952
    pthread_mutex_unlock(&LOCK_active_mi);
953
    return TRUE;
954 955
  }
  pthread_mutex_lock(&active_mi->rli.data_lock);
956 957 958
  active_mi->rli.group_master_log_pos = active_mi->master_log_pos;
  strmake(active_mi->rli.group_master_log_name,active_mi->master_log_name,
	  sizeof(active_mi->rli.group_master_log_name)-1);
959 960 961 962 963 964
  /*
     Cancel the previous START SLAVE UNTIL, as downloading
     a new copy logically makes UNTIL irrelevant.
  */
  clear_until_condition(&active_mi->rli);

965 966 967 968
  /*
    No need to update rli.event* coordinates; they will be updated when the
    slave threads start. Only rli.group* coordinates are necessary here.
  */
969
  flush_relay_log_info(&active_mi->rli);
970 971 972 973 974
  pthread_cond_broadcast(&active_mi->rli.data_cond);
  pthread_mutex_unlock(&active_mi->rli.data_lock);
  thd->proc_info = "starting slave";
  if (restart_thread_mask)
  {
975 976 977 978
    error=start_slave_threads(0 /* mutex not needed */,
			      1 /* wait for start */,
			      active_mi,master_info_file,relay_log_info_file,
			      restart_thread_mask);
979
  }
980 981

err:
982
  unlock_slave_threads(active_mi);
983
  pthread_mutex_unlock(&LOCK_active_mi);
984 985
  thd->proc_info = 0;

hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
986
  mysql_close(&mysql); // safe to call since we always do mysql_init()
987
  if (!error)
988
    send_ok(thd);
989 990 991

  return error;
}
992

hf@deer.mysql.r18.ru's avatar
SCRUM  
hf@deer.mysql.r18.ru committed
993
#endif /* HAVE_REPLICATION */
994