repl_failsafe.cc 23.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB & Sasha

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

// Sasha Pachev <sasha@mysql.com> is currently in charge of this file

#include "mysql_priv.h"
unknown's avatar
SCRUM  
unknown committed
20 21
#ifdef HAVE_REPLICATION

22
#include "repl_failsafe.h"
unknown's avatar
unknown committed
23 24
#include "sql_repl.h"
#include "slave.h"
25
#include "sql_acl.h"
unknown's avatar
unknown committed
26
#include "mini_client.h"
27
#include "log_event.h"
unknown's avatar
unknown committed
28
#include <mysql.h>
29 30 31 32

#define SLAVE_LIST_CHUNK 128
#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)

33 34 35 36

RPL_STATUS rpl_status=RPL_NULL;
pthread_mutex_t LOCK_rpl_status;
pthread_cond_t COND_rpl_status;
37 38
HASH slave_list;
extern const char* any_db;
39

40 41 42
const char *rpl_role_type[] = {"MASTER","SLAVE",NullS};
TYPELIB rpl_role_typelib = {array_elements(rpl_role_type)-1,"",
			    rpl_role_type};
43

44 45 46 47 48
const char* rpl_status_type[]=
{
  "AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE", "LOST_SOLDIER","TROOP_SOLDIER",
  "RECOVERY_CAPTAIN","NULL",NullS
};
49 50 51
TYPELIB rpl_status_typelib= {array_elements(rpl_status_type)-1,"",
			     rpl_status_type};

52

53 54 55 56
static Slave_log_event* find_slave_event(IO_CACHE* log,
					 const char* log_file_name,
					 char* errmsg);

57

unknown's avatar
unknown committed
58 59 60 61
/*
  Set up a THD so it can be used by the failsafe replication thread.

  RETURN VALUES
    0   ok
    -1  init_thr_lock() or thd->store_globals() failed; close_connection()
        and end_thread() have been called for thd
*/
static int init_failsafe_rpl_thread(THD* thd)
{
  DBUG_ENTER("init_failsafe_rpl_thread");

  /* Mark the THD as an internal thread and give it a dummy client side */
  thd->bootstrap= 1;
  thd->system_thread= 1;
  thd->host_or_ip= "";
  thd->client_capabilities= 0;
  my_net_init(&thd->net, 0);
  thd->net.read_timeout= slave_net_timeout;
  thd->max_client_packet_length= thd->net.max_packet;
  thd->master_access= ~0;
  thd->priv_user= 0;

  /* thread_id is a global counter protected by LOCK_thread_count */
  pthread_mutex_lock(&LOCK_thread_count);
  thd->thread_id= thread_id++;
  pthread_mutex_unlock(&LOCK_thread_count);

  if (init_thr_lock() || thd->store_globals())
  {
    close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed?
    end_thread(thd,0);
    DBUG_RETURN(-1);
  }

#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
  sigset_t empty_set;
  /* Unblocking an empty set changes nothing; it only fetches the mask in use */
  VOID(sigemptyset(&empty_set));
  VOID(pthread_sigmask(SIG_UNBLOCK,&empty_set,&thd->block_signals));
#endif

  thd->mem_root.used= 0;
  thd->mem_root.free= 0;
  if (thd->variables.max_join_size == HA_POS_ERROR)
    thd->options|= OPTION_BIG_SELECTS;

  thd->proc_info="Thread initialized";
  thd->version=refresh_version;
  thd->set_time();
  DBUG_RETURN(0);
}

96

97 98 99 100 101
/*
  Move rpl_status to to_status if the current status allows it, then signal
  COND_rpl_status (the signal is sent whether or not the status changed).

  NOTE(review): the second test compares rpl_status, not from_status, with
  RPL_ANY. If RPL_ANY is meant as a wildcard for from_status this looks
  inverted - confirm against the definition of RPL_ANY before changing it.
*/
void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status)
{
  pthread_mutex_lock(&LOCK_rpl_status);

  bool may_change= (rpl_status == from_status || rpl_status == RPL_ANY);
  if (may_change)
    rpl_status= to_status;

  pthread_cond_signal(&COND_rpl_status);
  pthread_mutex_unlock(&LOCK_rpl_status);
}

106

107 108 109 110 111 112 113 114 115
/*
  Copy one length-prefixed string from the packet into the fixed-size
  buffer 'obj' and advance 'p' past it.

  The first byte at 'p' is the string length. The macro relies on the caller
  having a 'p_end' pointer (one past the last packet byte) and an 'err'
  label in scope; it jumps to 'err' if the length byte itself is missing,
  if the string would run past 'p_end', or if it does not fit into 'obj'
  together with the terminating zero.
*/
#define get_object(p, obj) \
do { \
  uint len; \
  if (p >= p_end) \
    goto err; \
  len= (uint) *p++; \
  if (p + len > p_end || len >= sizeof(obj)) \
    goto err; \
  strmake(obj,(char*) p,len); \
  p+= len; \
} while (0)

116

117 118 119 120 121 122
/*
  Compare the master coordinates stored in a slave event with the ones in
  'mi'. Thin wrapper that forwards to the four-argument cmp_master_pos()
  and returns its result unchanged.
*/
static inline int cmp_master_pos(Slave_log_event* sev, LEX_MASTER_INFO* mi)
{
  int cmp_result= cmp_master_pos(sev->master_log, sev->master_pos,
                                 mi->log_file_name, mi->pos);
  return cmp_result;
}

123

124 125 126 127
/*
  Remove the entry for thd->server_id from slave_list.

  SYNOPSIS
    unregister_slave()
    thd         thread whose server_id is looked up; nothing is done when
                the id is 0
    only_mine   if set, the entry is removed only when it was registered
                by this very thd
    need_mutex  if set, LOCK_slave_list is taken around the hash access;
                pass 0 when the caller already holds it
*/
void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
{
  if (!thd->server_id)
    return;

  if (need_mutex)
    pthread_mutex_lock(&LOCK_slave_list);

  SLAVE_INFO *registered= (SLAVE_INFO*) hash_search(&slave_list,
                                                    (byte*) &thd->server_id,
                                                    4);
  if (registered && (!only_mine || registered->thd == thd))
    hash_delete(&slave_list, (byte*) registered);

  if (need_mutex)
    pthread_mutex_unlock(&LOCK_slave_list);
}

142

143 144 145 146 147 148 149 150
/*
  Register slave in 'slave_list' hash table

  RETURN VALUES
  0	ok
  1	Error.   Error message sent to client
*/

151 152
int register_slave(THD* thd, uchar* packet, uint packet_length)
{
153
  int res;
154
  SLAVE_INFO *si;
155
  uchar *p= packet, *p_end= packet + packet_length;
156

unknown's avatar
unknown committed
157
  if (check_access(thd, REPL_SLAVE_ACL, any_db))
158 159 160
    return 1;

  if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
161
    goto err2;
162

163 164
  thd->server_id= si->server_id= uint4korr(p);
  p+= 4;
165 166 167
  get_object(p,si->host);
  get_object(p,si->user);
  get_object(p,si->password);
168 169 170
  if (p+10 > p_end)
    goto err;
  si->port= uint2korr(p);
171
  p += 2;
172
  si->rpl_recovery_rank= uint4korr(p);
173
  p += 4;
174 175 176
  if (!(si->master_id= uint4korr(p)))
    si->master_id= server_id;
  si->thd= thd;
177

178
  pthread_mutex_lock(&LOCK_slave_list);
179
  unregister_slave(thd,0,0);
180
  res= hash_insert(&slave_list, (byte*) si);
181 182 183 184
  pthread_mutex_unlock(&LOCK_slave_list);
  return res;

err:
185 186 187 188
  my_free((gptr) si, MYF(MY_WME));
  my_message(ER_UNKNOWN_ERROR, "Wrong parameters to function register_slave",
	     MYF(0));
err2:
189
  send_error(thd);
190
  return 1;
191 192
}

193
/*
  Key accessor installed for slave_list by init_slave_list(): the key of an
  entry is its server_id, reported as 4 bytes long.
*/
extern "C" uint32
*slave_list_key(SLAVE_INFO* si, uint* len,
                my_bool not_used __attribute__((unused)))
{
  uint32 *key= &si->server_id;
  *len= 4;
  return key;
}

201
extern "C" void slave_info_free(void *s)
202 203 204 205 206 207
{
  my_free((gptr) s, MYF(MY_WME));
}

void init_slave_list()
{
unknown's avatar
unknown committed
208
  hash_init(&slave_list, system_charset_info, SLAVE_LIST_CHUNK, 0, 0,
209
	    (hash_get_key) slave_list_key, (hash_free_key) slave_info_free, 0);
210 211 212 213 214
  pthread_mutex_init(&LOCK_slave_list, MY_MUTEX_INIT_FAST);
}

void end_slave_list()
{
unknown's avatar
unknown committed
215 216 217 218 219 220
  /* No protection by a mutex needed as we are only called at shutdown */
  if (hash_inited(&slave_list))
  {
    hash_free(&slave_list);
    pthread_mutex_destroy(&LOCK_slave_list);
  }
221 222
}

223
/*
  Read events from 'log', starting at its current position, until one is
  found whose log_pos is >= mi->pos and whose server_id equals
  mi->server_id.

  RETURN VALUES
    0  event found; mi->pos is set to the read position of 'log' right
       after that event
    1  no such event, or a read error; errmsg holds the reason
*/
static int find_target_pos(LEX_MASTER_INFO *mi, IO_CACHE *log, char *errmsg)
{
  const my_off_t wanted_pos= (my_off_t) mi->pos;
  const uint32 wanted_server_id= mi->server_id;
  Log_event *ev;

  while ((ev= Log_event::read_log_event(log, (pthread_mutex_t*) 0, 0)))
  {
    bool is_target= (ev->log_pos >= wanted_pos &&
                     ev->server_id == wanted_server_id);
    delete ev;
    if (is_target)
    {
      mi->pos= my_b_tell(log);
      return 0;
    }
  }

  /* No more events could be read: use log->error to pick the message */
  if (log->error > 0)
    strmov(errmsg, "Binary log truncated in the middle of event");
  else if (log->error < 0)
    strmov(errmsg, "I/O error reading binary log");
  else
    strmov(errmsg, "Could not find target event in the binary log");
  return 1;
}


/*
  Find the place in this server's binary logs that corresponds to the
  master coordinates given in 'mi'.

  SYNOPSIS
    translate_master()
    thd      current thread; thd->current_linfo points to a local LOG_INFO
             while the logs are scanned and is reset before returning
    mi       in:  log_file_name/pos (and server_id) to look for
             out: on success, base name and offset in the local binary log
    errmsg   out: reason for failure. Written with a limit of
             SLAVE_ERRMSG_SIZE bytes, the size find_slave_event() also
             assumes for this buffer.

  DESCRIPTION
    The binary logs are walked in index order. From each log the slave
    event is read and its master coordinates are compared with 'mi':
     - equal: the answer is this log, at the position right after the
       slave event
     - slave event points past the target: the target lies in the previous
       log, which is re-opened and searched with find_target_pos() starting
       after its own slave event
     - otherwise the next log is tried; when the index is exhausted the
       last log is searched with find_target_pos()

  RETURN VALUES
    0  ok
    1  error, errmsg is set (it may be empty if re-initialising the IO
       cache failed)
*/
int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg)
{
  LOG_INFO linfo;
  char last_log_name[FN_REFLEN];
  IO_CACHE log;
  File file = -1, last_file = -1;
  pthread_mutex_t *log_lock;
  const char* errmsg_p;
  Slave_log_event* sev = 0;
  my_off_t last_pos = 0;
  int error = 1;
  int cmp_res;
  LINT_INIT(cmp_res);
  DBUG_ENTER("translate_master");

  if (!mysql_bin_log.is_open())
  {
    strmov(errmsg,"Binary log is not open");
    DBUG_RETURN(1);
  }

  if (!server_id_supplied)
  {
    strmov(errmsg, "Misconfigured master - server id was not set");
    DBUG_RETURN(1);
  }

  if (mysql_bin_log.find_log_pos(&linfo, NullS, 1))
  {
    strmov(errmsg,"Could not find first log");
    DBUG_RETURN(1);
  }
  /*
    current_linfo is cleared under LOCK_thread_count at the end of this
    function, so it is set under the same lock.
  */
  pthread_mutex_lock(&LOCK_thread_count);
  thd->current_linfo = &linfo;
  pthread_mutex_unlock(&LOCK_thread_count);

  bzero((char*) &log,sizeof(log));
  log_lock = mysql_bin_log.get_log_lock();
  pthread_mutex_lock(log_lock);

  for (;;)
  {
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg_p)) < 0)
    {
      strmake(errmsg, errmsg_p, SLAVE_ERRMSG_SIZE-1);
      goto err;
    }

    if (!(sev = find_slave_event(&log, linfo.log_file_name, errmsg)))
      goto err;

    cmp_res = cmp_master_pos(sev, mi);
    delete sev;

    if (!cmp_res)
    {
      /* Copy basename */
      fn_format(mi->log_file_name, linfo.log_file_name, "","",1);
      mi->pos = my_b_tell(&log);
      goto mi_inited;
    }
    else if (cmp_res > 0)
    {
      /* last_pos is still 0 only while we are in the first log */
      if (!last_pos)
      {
	strmov(errmsg,
	       "Slave event in first log points past the target position");
	goto err;
      }
      /* Go back to the previous log, whose file is still open */
      end_io_cache(&log);
      (void) my_close(file, MYF(MY_WME));
      if (init_io_cache(&log, (file = last_file), IO_SIZE, READ_CACHE, 0, 0,
			MYF(MY_WME)))
      {
	errmsg[0] = 0;
	goto err;
      }
      break;
    }

    /* Remember where to resume in this log should the next one be too far */
    strmov(last_log_name, linfo.log_file_name);
    last_pos = my_b_tell(&log);

    switch (mysql_bin_log.find_next_log(&linfo, 1)) {
    case LOG_INFO_EOF:
      /* No more logs: search the one that is currently open */
      if (last_file >= 0)
       (void)my_close(last_file, MYF(MY_WME));
      last_file = -1;
      goto found_log;
    case 0:
      break;
    default:
      strmov(errmsg, "Error reading log index");
      goto err;
    }

    /* Keep the current file open as 'last_file' and close the one before */
    end_io_cache(&log);
    if (last_file >= 0)
     (void) my_close(last_file, MYF(MY_WME));
    last_file = file;
  }

found_log:
  my_b_seek(&log, last_pos);
  if (find_target_pos(mi,&log,errmsg))
    goto err;
  fn_format(mi->log_file_name, last_log_name, "","",1);  /* Copy basename */

mi_inited:
  error = 0;
err:
  pthread_mutex_unlock(log_lock);
  end_io_cache(&log);
  pthread_mutex_lock(&LOCK_thread_count);
  thd->current_linfo = 0;
  pthread_mutex_unlock(&LOCK_thread_count);
  if (file >= 0)
    (void) my_close(file, MYF(MY_WME));
  /* file and last_file are the same descriptor after stepping back a log */
  if (last_file >= 0 && last_file != file)
    (void) my_close(last_file, MYF(MY_WME));

  DBUG_RETURN(error);
}

376 377 378 379 380

/*
  Caller must delete result when done
*/

381 382 383 384 385 386 387 388
static Slave_log_event* find_slave_event(IO_CACHE* log,
					 const char* log_file_name,
					 char* errmsg)
{
  Log_event* ev;
  int i;
  bool slave_event_found = 0;
  LINT_INIT(ev);
unknown's avatar
unknown committed
389

390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420
  for (i = 0; i < 2; i++)
  {
    if (!(ev = Log_event::read_log_event(log, (pthread_mutex_t*)0, 0)))
    {
      my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
		  "Error reading event in log '%s'",
		  (char*)log_file_name);
      return 0;
    }
    if (ev->get_type_code() == SLAVE_EVENT)
    {
      slave_event_found = 1;
      break;
    }
    delete ev;
  }
  if (!slave_event_found)
  {
    my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
		"Could not find slave event in log '%s'",
		(char*)log_file_name);
    delete ev;
    return 0;
  }

  return (Slave_log_event*)ev;
}


/*
  Execute SHOW NEW MASTER: translate the coordinates in thd->lex.mi with
  translate_master() and send the result to the client as one row with the
  columns Log_name and Log_pos.

  RETURN VALUES
    0   ok, row and EOF sent
    -1  error; if translate_master() produced a message it has been
        reported through my_error()
*/
int show_new_master(THD* thd)
{
  Protocol *protocol= thd->protocol;
  DBUG_ENTER("show_new_master");
  List<Item> field_list;
  char errmsg[SLAVE_ERRMSG_SIZE];
  LEX_MASTER_INFO* lex_mi= &thd->lex.mi;

  errmsg[0]= 0;					// Safety
  if (translate_master(thd, lex_mi, errmsg))
  {
    if (errmsg[0])
      my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
               "SHOW NEW MASTER", errmsg);
    DBUG_RETURN(-1);
  }

  /* Result set metadata */
  field_list.push_back(new Item_empty_string("Log_name", 20));
  field_list.push_back(new Item_return_int("Log_pos", 10,
                                           MYSQL_TYPE_LONGLONG));
  if (protocol->send_fields(&field_list, 1))
    DBUG_RETURN(-1);

  /* The single data row */
  protocol->prepare_for_resend();
  protocol->store(lex_mi->log_file_name, &my_charset_bin);
  protocol->store((ulonglong) lex_mi->pos);
  if (protocol->write())
    DBUG_RETURN(-1);

  send_eof(thd);
  DBUG_RETURN(0);
}

452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476
/*
  Asks the master for the list of its other connected slaves.
  This is for failsafe replication : 
  in order for failsafe replication to work, the servers involved in replication
  must know of each other. We accomplish this by having each slave report to the
  master how to reach it, and on connection, each slave receives information
  about where the other slaves are.

  SYNOPSIS
    update_slave_list()
    mysql           pre-existing connection to the master
    mi              master info

  NOTES
    mi is used only to give detailed error messages which include the
    hostname/port of the master, the username used by the slave to connect to
    the master.
    If the user used by the slave to connect to the master does not have the
    REPLICATION SLAVE privilege, it will pop in this function because SHOW SLAVE
    HOSTS will fail on the master.

  RETURN VALUES
    1           error
    0           success
 */
477

478
int update_slave_list(MYSQL* mysql, MASTER_INFO* mi)
unknown's avatar
unknown committed
479 480 481 482 483 484
{
  MYSQL_RES* res=0;
  MYSQL_ROW row;
  const char* error=0;
  bool have_auth_info;
  int port_ind;
485
  DBUG_ENTER("update_slave_list");
unknown's avatar
unknown committed
486

487
  if (mc_mysql_query(mysql,"SHOW SLAVE HOSTS",16) ||
unknown's avatar
unknown committed
488 489
      !(res = mc_mysql_store_result(mysql)))
  {
490
    error= mc_mysql_error(mysql);
unknown's avatar
unknown committed
491 492 493
    goto err;
  }

494
  switch (mc_mysql_num_fields(res)) {
unknown's avatar
unknown committed
495 496 497 498 499 500 501 502 503
  case 5:
    have_auth_info = 0;
    port_ind=2;
    break;
  case 7:
    have_auth_info = 1;
    port_ind=4;
    break;
  default:
504 505
    error= "the master returned an invalid number of fields for SHOW SLAVE \
HOSTS";
unknown's avatar
unknown committed
506 507 508 509 510
    goto err;
  }

  pthread_mutex_lock(&LOCK_slave_list);

511
  while ((row= mc_mysql_fetch_row(res)))
unknown's avatar
unknown committed
512 513 514 515
  {
    uint32 server_id;
    SLAVE_INFO* si, *old_si;
    server_id = atoi(row[0]);
516 517
    if ((old_si= (SLAVE_INFO*)hash_search(&slave_list,
					  (byte*)&server_id,4)))
unknown's avatar
unknown committed
518 519 520 521 522
      si = old_si;
    else
    {
      if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
      {
523
	error= "the slave is out of memory";
unknown's avatar
unknown committed
524 525 526 527
	pthread_mutex_unlock(&LOCK_slave_list);
	goto err;
      }
      si->server_id = server_id;
unknown's avatar
unknown committed
528
      hash_insert(&slave_list, (byte*)si);
unknown's avatar
unknown committed
529
    }
530
    strmake(si->host, row[1], sizeof(si->host)-1);
unknown's avatar
unknown committed
531 532 533 534 535
    si->port = atoi(row[port_ind]);
    si->rpl_recovery_rank = atoi(row[port_ind+1]);
    si->master_id = atoi(row[port_ind+2]);
    if (have_auth_info)
    {
536 537
      strmake(si->user, row[2], sizeof(si->user)-1);
      strmake(si->password, row[3], sizeof(si->password)-1);
unknown's avatar
unknown committed
538 539 540
    }
  }
  pthread_mutex_unlock(&LOCK_slave_list);
541

unknown's avatar
unknown committed
542 543 544 545 546
err:
  if (res)
    mc_mysql_free_result(res);
  if (error)
  {
547 548 549
    sql_print_error("While trying to obtain the list of slaves from the master \
'%s:%d', user '%s' got the following error: '%s'", 
                    mi->host, mi->port, mi->user, error);
550
    DBUG_RETURN(1);
unknown's avatar
unknown committed
551
  }
552
  DBUG_RETURN(0);
unknown's avatar
unknown committed
553 554
}

555

unknown's avatar
unknown committed
556 557 558 559 560
/*
  Placeholder: no lookup is performed. Both arguments are ignored and 0 is
  always returned, which handle_failsafe_rpl() turns into the
  RPL_RECOVERY_CAPTAIN state.
*/
int find_recovery_captain(THD* thd, MYSQL* mysql)
{
  return 0;
}

561

unknown's avatar
unknown committed
562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580
/*
  Entry point of the failsafe replication thread.

  Creates its own THD and a client handle, then waits on COND_rpl_status
  until the thread is killed or abort_loop is set. After every wake-up the
  current rpl_status is processed: RPL_LOST_SOLDIER becomes
  RPL_TROOP_SOLDIER or RPL_RECOVERY_CAPTAIN depending on
  find_recovery_captain(); every other state is left alone.
*/
pthread_handler_decl(handle_failsafe_rpl,arg)
{
  DBUG_ENTER("handle_failsafe_rpl");
  THD *thd= new THD;
  thd->thread_stack= (char*) &thd;
  MYSQL *recovery_captain= 0;
  pthread_detach_this_thread();

  if (init_failsafe_rpl_thread(thd) ||
      !(recovery_captain= mc_mysql_init(0)))
  {
    sql_print_error("Could not initialize failsafe replication thread");
    goto err;
  }

  pthread_mutex_lock(&LOCK_rpl_status);
  while (!thd->killed && !abort_loop)
  {
    bool chain_done= 0;
    const char *old_msg= thd->enter_cond(&COND_rpl_status,
                                         &LOCK_rpl_status,
                                         "Waiting for request");
    pthread_cond_wait(&COND_rpl_status, &LOCK_rpl_status);
    thd->proc_info="Processing request";

    /* Keep handling states until one of them ends the request chain */
    do
    {
      switch (rpl_status) {
      case RPL_LOST_SOLDIER:
        if (find_recovery_captain(thd, recovery_captain))
          rpl_status= RPL_TROOP_SOLDIER;
        else
          rpl_status= RPL_RECOVERY_CAPTAIN;
        chain_done= 1; /* for now until other states are implemented */
        break;
      default:
        chain_done= 1;
        break;
      }
    } while (!chain_done);

    thd->exit_cond(old_msg);
  }
  pthread_mutex_unlock(&LOCK_rpl_status);

err:
  if (recovery_captain)
    mc_mysql_close(recovery_captain);
  delete thd;
  my_thread_end();
  pthread_exit(0);
  DBUG_RETURN(0);
}
608

609

610 611 612 613
/*
  Execute SHOW SLAVE HOSTS: send one row per entry of slave_list.

  The columns are Server_id, Host, Port, Rpl_recovery_rank and Master_id;
  User and Password are inserted after Host when opt_show_slave_auth_info
  is set. LOCK_slave_list is held while the rows are written.

  RETURN VALUES
    0   ok, rows and EOF sent
    -1  sending the field list or a row failed
*/
int show_slave_hosts(THD* thd)
{
  List<Item> field_list;
  Protocol *protocol= thd->protocol;
  DBUG_ENTER("show_slave_hosts");

  field_list.push_back(new Item_return_int("Server_id", 10,
					   MYSQL_TYPE_LONG));
  field_list.push_back(new Item_empty_string("Host", 20));
  if (opt_show_slave_auth_info)
  {
    field_list.push_back(new Item_empty_string("User",20));
    field_list.push_back(new Item_empty_string("Password",20));
  }
  field_list.push_back(new Item_return_int("Port", 7, MYSQL_TYPE_LONG));
  field_list.push_back(new Item_return_int("Rpl_recovery_rank", 7,
					   MYSQL_TYPE_LONG));
  field_list.push_back(new Item_return_int("Master_id", 10,
					   MYSQL_TYPE_LONG));

  if (protocol->send_fields(&field_list, 1))
    DBUG_RETURN(-1);

  pthread_mutex_lock(&LOCK_slave_list);

  for (uint i = 0; i < slave_list.records; ++i)
  {
    SLAVE_INFO* si = (SLAVE_INFO*) hash_element(&slave_list, i);
    protocol->prepare_for_resend();
    protocol->store((uint32) si->server_id);
    protocol->store(si->host, &my_charset_bin);
    if (opt_show_slave_auth_info)
    {
      protocol->store(si->user, &my_charset_bin);
      protocol->store(si->password, &my_charset_bin);
    }
    protocol->store((uint32) si->port);
    protocol->store((uint32) si->rpl_recovery_rank);
    protocol->store((uint32) si->master_id);
    if (protocol->write())
    {
      /* Do not leave the list locked on a failed write */
      pthread_mutex_unlock(&LOCK_slave_list);
      DBUG_RETURN(-1);
    }
  }
  pthread_mutex_unlock(&LOCK_slave_list);
  send_eof(thd);
  DBUG_RETURN(0);
}

661

662 663
/*
  Connect 'mysql' to the master described by 'mi', using
  slave_net_timeout as the timeout argument of mc_mysql_connect().

  RETURN VALUES
    0  connected
    1  no master host is configured (the message is stored in
       mysql->net.last_error) or mc_mysql_connect() failed
*/
int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi)
{
  DBUG_ENTER("connect_to_master");

  const bool host_is_set= (mi->host && *mi->host);
  if (!host_is_set)				/* empty host */
  {
    strmov(mysql->net.last_error, "Master is not configured");
    DBUG_RETURN(1);
  }

  const int failed= mc_mysql_connect(mysql, mi->host, mi->user,
                                     mi->password, 0,
                                     mi->port, 0, 0,
                                     slave_net_timeout) ? 0 : 1;
  DBUG_RETURN(failed);
}


static inline void cleanup_mysql_results(MYSQL_RES* db_res,
680
					 MYSQL_RES** cur, MYSQL_RES** start)
681
{
682
  for (; cur >= start; --cur)
683 684 685 686 687 688 689 690
  {
    if (*cur)
      mc_mysql_free_result(*cur);
  }
  mc_mysql_free_result(db_res);
}


691 692
/*
  Fetch every table listed in table_res from the master.

  SYNOPSIS
    fetch_db_tables()
    thd        current thread
    mysql      connection to the master
    db         database the tables belong to
    table_res  result set whose first column holds the table names
    mi         master info, passed on to fetch_master_table()

  NOTES
    When table_rules_on is set, tables rejected by tables_ok() are skipped.

  RETURN VALUES
    0      all tables fetched (or skipped)
    other  the error returned by fetch_master_table(); fetching stops at
           the first failure
*/
static int fetch_db_tables(THD *thd, MYSQL *mysql, const char *db,
			   MYSQL_RES *table_res, MASTER_INFO *mi)
{
  MYSQL_ROW row;
  for (row = mc_mysql_fetch_row(table_res); row;
       row = mc_mysql_fetch_row(table_res))
  {
    TABLE_LIST table;
    const char* table_name= row[0];
    int error;
    if (table_rules_on)
    {
      /*
	Clear the whole element so that tables_ok() never sees
	uninitialised members; this also sets table.next to 0.
      */
      bzero((char*) &table, sizeof(table));
      table.db= (char*) db;
      table.real_name= (char*) table_name;
      table.updating= 1;
      if (!tables_ok(thd, &table))
	continue;
    }
    if ((error= fetch_master_table(thd, db, table_name, mi, mysql)))
      return error;
  }
  return 0;
}

716 717 718 719 720 721
/*
  Load all MyISAM tables from master to this slave.

  REQUIREMENTS
   - No active transaction (flush_relay_log_info would not work in this case)
*/
722 723 724 725 726 727

int load_master_data(THD* thd)
{
  MYSQL mysql;
  MYSQL_RES* master_status_res = 0;
  int error = 0;
728 729
  const char* errmsg=0;
  int restart_thread_mask;
730 731
  mc_mysql_init(&mysql);

unknown's avatar
unknown committed
732 733 734 735
  /*
    We do not want anyone messing with the slave at all for the entire
    duration of the data load.
  */
736 737 738 739 740 741
  LOCK_ACTIVE_MI;
  lock_slave_threads(active_mi);
  init_thread_mask(&restart_thread_mask,active_mi,0 /*not inverse*/);
  if (restart_thread_mask &&
      (error=terminate_slave_threads(active_mi,restart_thread_mask,
				     1 /*skip lock*/)))
742
  {
743
    send_error(thd,error);
744 745 746
    unlock_slave_threads(active_mi);
    UNLOCK_ACTIVE_MI;
    return 1;
747
  }
748 749
  
  if (connect_to_master(thd, &mysql, active_mi))
750
  {
751
    net_printf(thd, error= ER_CONNECT_TO_MASTER,
752
	       mc_mysql_error(&mysql));
753 754 755 756 757 758 759 760
    goto err;
  }

  // now that we are connected, get all database and tables in each
  {
    MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res;
    uint num_dbs;

761
    if (mc_mysql_query(&mysql, "SHOW DATABASES", 14) ||
762 763
	!(db_res = mc_mysql_store_result(&mysql)))
    {
764
      net_printf(thd, error = ER_QUERY_ON_MASTER,
765 766 767 768 769 770
		 mc_mysql_error(&mysql));
      goto err;
    }

    if (!(num_dbs = (uint) mc_mysql_num_rows(db_res)))
      goto err;
unknown's avatar
unknown committed
771 772 773 774
    /*
      In theory, the master could have no databases at all
      and run with skip-grant
    */
775 776 777

    if (!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*))))
    {
778
      net_printf(thd, error = ER_OUTOFMEMORY);
779 780 781
      goto err;
    }

unknown's avatar
unknown committed
782 783 784 785 786 787
    /*
      This is a temporary solution until we have online backup
      capabilities - to be replaced once online backup is working
      we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
      can to minimize the lock time.
    */
788 789
    if (mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 27) ||
	mc_mysql_query(&mysql, "SHOW MASTER STATUS",18) ||
790 791
	!(master_status_res = mc_mysql_store_result(&mysql)))
    {
792
      net_printf(thd, error = ER_QUERY_ON_MASTER,
793 794 795 796
		 mc_mysql_error(&mysql));
      goto err;
    }

unknown's avatar
unknown committed
797 798 799 800
    /*
      Go through every table in every database, and if the replication
      rules allow replicating it, get it
    */
801 802 803

    table_res_end = table_res + num_dbs;

804 805
    for (cur_table_res = table_res; cur_table_res < table_res_end;
	 cur_table_res++)
806 807 808 809 810 811 812 813 814 815 816
    {
      // since we know how many rows we have, this can never be NULL
      MYSQL_ROW row = mc_mysql_fetch_row(db_res);
      char* db = row[0];

      /*
	Do not replicate databases excluded by rules
	also skip mysql database - in most cases the user will
	mess up and not exclude mysql database with the rules when
	he actually means to - in this case, he is up for a surprise if
	his priv tables get dropped and downloaded from master
817
	TODO - add special option, not enabled
818 819 820 821 822
	by default, to allow inclusion of mysql database into load
	data from master
      */

      if (!db_ok(db, replicate_do_db, replicate_ignore_db) ||
823
	  !strcmp(db,"mysql"))
824 825 826 827 828 829 830 831
      {
	*cur_table_res = 0;
	continue;
      }

      if (mysql_rm_db(thd, db, 1,1) ||
	  mysql_create_db(thd, db, 0, 1))
      {
832
	send_error(thd, 0, 0);
833 834 835 836 837
	cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
	goto err;
      }

      if (mc_mysql_select_db(&mysql, db) ||
838
	  mc_mysql_query(&mysql, "SHOW TABLES", 11) ||
839
	  !(*cur_table_res = mc_mysql_store_result(&mysql)))
840
      {
841
	net_printf(thd, error = ER_QUERY_ON_MASTER,
842 843 844 845 846
		   mc_mysql_error(&mysql));
	cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
	goto err;
      }

847
      if ((error = fetch_db_tables(thd,&mysql,db,*cur_table_res,active_mi)))
848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865
      {
	// we do not report the error - fetch_db_tables handles it
	cleanup_mysql_results(db_res, cur_table_res, table_res);
	goto err;
      }
    }

    cleanup_mysql_results(db_res, cur_table_res - 1, table_res);

    // adjust position in the master
    if (master_status_res)
    {
      MYSQL_ROW row = mc_mysql_fetch_row(master_status_res);

      /*
	We need this check because the master may not be running with
	log-bin, but it will still allow us to do all the steps
	of LOAD DATA FROM MASTER - no reason to forbid it, really,
866
	although it does not make much sense for the user to do it
867 868 869
      */
      if (row[0] && row[1])
      {
870 871
	strmake(active_mi->master_log_name, row[0],
		sizeof(active_mi->master_log_name));
unknown's avatar
unknown committed
872
	active_mi->master_log_pos = strtoull(row[1], (char**) 0, 10);
873 874 875
	// don't hit the magic number
	if (active_mi->master_log_pos < BIN_LOG_HEADER_SIZE)
	  active_mi->master_log_pos = BIN_LOG_HEADER_SIZE;
876
	flush_master_info(active_mi);
877 878 879 880
      }
      mc_mysql_free_result(master_status_res);
    }

881
    if (mc_mysql_query(&mysql, "UNLOCK TABLES", 13))
882
    {
883
      net_printf(thd, error = ER_QUERY_ON_MASTER,
884 885 886 887
		 mc_mysql_error(&mysql));
      goto err;
    }
  }
888
  thd->proc_info="purging old relay logs";
889 890
  if (purge_relay_logs(&active_mi->rli,thd,
		       0 /* not only reset, but also reinit */,
891 892
		       &errmsg))
  {
893
    send_error(thd, 0, "Failed purging old relay logs");
894 895 896 897 898
    unlock_slave_threads(active_mi);
    UNLOCK_ACTIVE_MI;
    return 1;
  }
  pthread_mutex_lock(&active_mi->rli.data_lock);
899 900 901 902 903 904 905
  active_mi->rli.group_master_log_pos = active_mi->master_log_pos;
  strmake(active_mi->rli.group_master_log_name,active_mi->master_log_name,
	  sizeof(active_mi->rli.group_master_log_name)-1);
  /*
    No need to update rli.event* coordinates, they will be when the slave
    threads start ; only rli.group* coordinates are necessary here.
  */
unknown's avatar
unknown committed
906
  flush_relay_log_info(&active_mi->rli);
907 908 909 910 911
  pthread_cond_broadcast(&active_mi->rli.data_cond);
  pthread_mutex_unlock(&active_mi->rli.data_lock);
  thd->proc_info = "starting slave";
  if (restart_thread_mask)
  {
912 913 914 915
    error=start_slave_threads(0 /* mutex not needed */,
			      1 /* wait for start */,
			      active_mi,master_info_file,relay_log_info_file,
			      restart_thread_mask);
916
  }
917 918

err:
919 920 921 922
  unlock_slave_threads(active_mi);
  UNLOCK_ACTIVE_MI;
  thd->proc_info = 0;

923 924
  mc_mysql_close(&mysql); // safe to call since we always do mc_mysql_init()
  if (!error)
925
    send_ok(thd);
926 927 928

  return error;
}
929

unknown's avatar
SCRUM  
unknown committed
930
#endif /* HAVE_REPLICATION */
931