sql_table.cc 196 KB
Newer Older
1
/* Copyright (C) 2000-2004 MySQL AB
unknown's avatar
unknown committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/* drop and alter of tables */

#include "mysql_priv.h"
20
#include <hash.h>
unknown's avatar
unknown committed
21
#include <myisam.h>
22
#include <my_dir.h>
23 24
#include "sp_head.h"
#include "sql_trigger.h"
25
#include "sql_show.h"
unknown's avatar
unknown committed
26 27 28 29 30

#ifdef __WIN__
#include <io.h>
#endif

unknown's avatar
unknown committed
31 32
int creating_table= 0;        // How many mysql_create_table are running

unknown's avatar
unknown committed
33
const char *primary_key_name="PRIMARY";
unknown's avatar
unknown committed
34 35 36 37 38 39

static bool check_if_keyname_exists(const char *name,KEY *start, KEY *end);
static char *make_unique_key_name(const char *field_name,KEY *start,KEY *end);
static int copy_data_between_tables(TABLE *from,TABLE *to,
				    List<create_field> &create,
				    enum enum_duplicates handle_duplicates,
40
                                    bool ignore,
41
				    uint order_num, ORDER *order,
unknown's avatar
unknown committed
42
				    ha_rows *copied,ha_rows *deleted);
unknown's avatar
unknown committed
43
static bool prepare_blob_field(THD *thd, create_field *sql_field);
44
static bool check_engine(THD *thd, const char *table_name,
45
                         HA_CREATE_INFO *create_info);                             
unknown's avatar
unknown committed
46 47 48 49 50 51
static int mysql_prepare_table(THD *thd, HA_CREATE_INFO *create_info,
                               List<create_field> *fields,
                               List<Key> *keys, bool tmp_table,
                               uint *db_options,
                               handler *file, KEY **key_info_buffer,
                               uint *key_count, int select_field_count);
unknown's avatar
unknown committed
52

53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
#define MYSQL50_TABLE_NAME_PREFIX         "#mysql50#"
#define MYSQL50_TABLE_NAME_PREFIX_LENGTH  9

uint filename_to_tablename(const char *from, char *to, uint to_length)
{
  uint errors, res= strconvert(&my_charset_filename, from,
                               system_charset_info,  to, to_length, &errors);
  if (errors) // Old 5.0 name
  {
    res= strxnmov(to, to_length, MYSQL50_TABLE_NAME_PREFIX,  from, NullS) - to;
    sql_print_error("Invalid (old?) table or database name '%s'", from);
    /*
      TODO: add a stored procedure for fix table and database names,
      and mention its name in error log.
    */
  }
  return res;
}


uint tablename_to_filename(const char *from, char *to, uint to_length)
{
  uint errors;
unknown's avatar
unknown committed
76 77
  if (from[0] == '#' && !strncmp(from, MYSQL50_TABLE_NAME_PREFIX,
                                 MYSQL50_TABLE_NAME_PREFIX_LENGTH))
78 79 80 81 82 83
    return my_snprintf(to, to_length, "%s", from + 9);
  return strconvert(system_charset_info, from,
                    &my_charset_filename, to, to_length, &errors);
}


84
/*
85
  Creates path to a file: mysql_data_dir/db/table.ext
86 87

  SYNOPSIS
88 89 90 91 92 93 94 95 96 97 98 99 100
   build_table_filename()
   buff			where to write result
   bufflen              buff size
   db                   database name, in system_charset_info
   table                table name, in system_charset_info
   ext                  file extension

  NOTES

    Uses database and table name, and extension to create
    a file name in mysql_data_dir. Database and table
    names are converted from system_charset_info into "fscs".
    'ext' is not converted.
101 102 103

  RETURN

104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
*/


uint build_table_filename(char *buff, size_t bufflen, const char *db,
                          const char *table, const char *ext)
{
  uint length;
  char dbbuff[FN_REFLEN];
  char tbbuff[FN_REFLEN];
  VOID(tablename_to_filename(table, tbbuff, sizeof(tbbuff)));
  VOID(tablename_to_filename(db, dbbuff, sizeof(dbbuff)));
  strxnmov(buff, bufflen,
           mysql_data_home, "/", dbbuff, "/", tbbuff, ext, NullS);
  length= unpack_filename(buff, buff);
  return length;
}


122
uint build_tmptable_filename(THD* thd, char *buff, size_t bufflen)
123
{
124 125
  uint length;
  char tbbuff[FN_REFLEN];
126 127 128 129 130 131 132
  char tmp_table_name[tmp_file_prefix_length+22+22+22+3];
  my_snprintf(tmp_table_name, sizeof(tmp_table_name),
	      "%s%lx_%lx_%x",
	      tmp_file_prefix, current_pid,
	      thd->thread_id, thd->tmp_table++);
  VOID(tablename_to_filename(tmp_table_name, tbbuff, sizeof(tbbuff)));
  strxnmov(buff, bufflen, mysql_tmpdir, "/", tbbuff, reg_ext, NullS);
133 134
  length= unpack_filename(buff, buff);
  return length;
135 136
}

137 138 139 140 141 142 143
/*
  Return values for compare_tables().
  If you make compare_tables() non-static, move them to a header file.
*/
#define ALTER_TABLE_DATA_CHANGED  1
#define ALTER_TABLE_INDEX_CHANGED 2

144

unknown's avatar
unknown committed
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
/*
  SYNOPSIS
    mysql_copy_create_list()
    orig_create_list          Original list of created fields
    inout::new_create_list    Copy of original list

  RETURN VALUES
    FALSE                     Success
    TRUE                      Memory allocation error

  DESCRIPTION
    mysql_prepare_table destroys the create_list and in some cases we need
    this lists for more purposes. Thus we copy it specifically for use
    by mysql_prepare_table
*/

static int mysql_copy_create_list(List<create_field> *orig_create_list,
162
                                  List<create_field> *new_create_list)
unknown's avatar
unknown committed
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
{
  List_iterator<create_field> prep_field_it(*orig_create_list);
  create_field *prep_field;
  DBUG_ENTER("mysql_copy_create_list");

  while ((prep_field= prep_field_it++))
  {
    create_field *field= new create_field(*prep_field);
    if (!field || new_create_list->push_back(field))
    {
      mem_alloc_error(2);
      DBUG_RETURN(TRUE);
    }
  }
  DBUG_RETURN(FALSE);
}


/*
  SYNOPSIS
    mysql_copy_key_list()
    orig_key                  Original list of keys
    inout::new_key            Copy of original list

  RETURN VALUES
    FALSE                     Success
    TRUE                      Memory allocation error

  DESCRIPTION
    mysql_prepare_table destroys the key list and in some cases we need
    this lists for more purposes. Thus we copy it specifically for use
    by mysql_prepare_table
*/

static int mysql_copy_key_list(List<Key> *orig_key,
                               List<Key> *new_key)
{
  List_iterator<Key> prep_key_it(*orig_key);
  Key *prep_key;
202
  DBUG_ENTER("mysql_copy_key_list");
unknown's avatar
unknown committed
203 204 205 206 207 208 209 210 211 212 213

  while ((prep_key= prep_key_it++))
  {
    List<key_part_spec> prep_columns;
    List_iterator<key_part_spec> prep_col_it(prep_key->columns);
    key_part_spec *prep_col;
    Key *temp_key;

    while ((prep_col= prep_col_it++))
    {
      key_part_spec *prep_key_part;
214 215

      if (!(prep_key_part= new key_part_spec(*prep_col)))
unknown's avatar
unknown committed
216 217 218 219 220 221 222 223 224 225
      {
        mem_alloc_error(sizeof(key_part_spec));
        DBUG_RETURN(TRUE);
      }
      if (prep_columns.push_back(prep_key_part))
      {
        mem_alloc_error(2);
        DBUG_RETURN(TRUE);
      }
    }
226 227 228 229 230
    if (!(temp_key= new Key(prep_key->type, prep_key->name,
                            prep_key->algorithm,
                            prep_key->generated,
                            prep_columns,
                            prep_key->parser_name)))
unknown's avatar
unknown committed
231 232 233 234 235 236 237 238 239 240 241 242 243
    {
      mem_alloc_error(sizeof(Key));
      DBUG_RETURN(TRUE);
    }
    if (new_key->push_back(temp_key))
    {
      mem_alloc_error(2);
      DBUG_RETURN(TRUE);
    }
  }
  DBUG_RETURN(FALSE);
}

unknown's avatar
unknown committed
244 245 246
/*
--------------------------------------------------------------------------

247
   MODULE: DDL log
unknown's avatar
unknown committed
248 249 250 251 252 253 254 255
   -----------------

   This module is used to ensure that we can recover from crashes that occur
   in the middle of a meta-data operation in MySQL. E.g. DROP TABLE t1, t2;
   We need to ensure that both t1 and t2 are dropped and not only t1 and
   also that each table drop is entirely done and not "half-baked".

   To support this we create log entries for each meta-data statement in the
256
   ddl log while we are executing. These entries are dropped when the
unknown's avatar
unknown committed
257 258 259 260
   operation is completed.

   At recovery those entries that were not completed will be executed.

261
   There is only one ddl log in the system and it is protected by a mutex
unknown's avatar
unknown committed
262 263 264
   and there is a global struct that contains information about its current
   state.

265 266
   History:
   First version written in 2006 by Mikael Ronstrom
unknown's avatar
unknown committed
267 268 269 270
--------------------------------------------------------------------------
*/


271
typedef struct st_global_ddl_log
unknown's avatar
unknown committed
272
{
273 274 275 276 277 278
  /*
    We need to adjust buffer size to be able to handle downgrades/upgrades
    where IO_SIZE has changed. We'll set the buffer size such that we can
    handle that the buffer size was upto 4 times bigger in the version
    that wrote the DDL log.
  */
279
  char file_entry_buf[4*IO_SIZE];
unknown's avatar
unknown committed
280 281
  char file_name_str[FN_REFLEN];
  char *file_name;
282 283 284
  DDL_LOG_MEMORY_ENTRY *first_free;
  DDL_LOG_MEMORY_ENTRY *first_used;
  uint num_entries;
unknown's avatar
unknown committed
285 286
  File file_id;
  uint name_len;
287
  uint io_size;
288
  bool inited;
289
  bool recovery_phase;
290
} GLOBAL_DDL_LOG;
unknown's avatar
unknown committed
291

292
GLOBAL_DDL_LOG global_ddl_log;
unknown's avatar
unknown committed
293

294
pthread_mutex_t LOCK_gdl;
unknown's avatar
unknown committed
295

296 297 298 299 300
#define DDL_LOG_ENTRY_TYPE_POS 0
#define DDL_LOG_ACTION_TYPE_POS 1
#define DDL_LOG_PHASE_POS 2
#define DDL_LOG_NEXT_ENTRY_POS 4
#define DDL_LOG_NAME_POS 8
301

302 303
#define DDL_LOG_NUM_ENTRY_POS 0
#define DDL_LOG_NAME_LEN_POS 4
304
#define DDL_LOG_IO_SIZE_POS 8
unknown's avatar
unknown committed
305

unknown's avatar
unknown committed
306
/*
307
  Read one entry from ddl log file
unknown's avatar
unknown committed
308
  SYNOPSIS
309
    read_ddl_log_file_entry()
310
    entry_no                     Entry number to read
unknown's avatar
unknown committed
311
  RETURN VALUES
312 313
    TRUE                         Error
    FALSE                        Success
unknown's avatar
unknown committed
314 315
*/

316
static bool read_ddl_log_file_entry(uint entry_no)
unknown's avatar
unknown committed
317 318
{
  bool error= FALSE;
319 320 321 322
  File file_id= global_ddl_log.file_id;
  char *file_entry_buf= (char*)global_ddl_log.file_entry_buf;
  uint io_size= global_ddl_log.io_size;
  DBUG_ENTER("read_ddl_log_file_entry");
unknown's avatar
unknown committed
323

324 325
  if (my_pread(file_id, file_entry_buf, io_size, io_size * entry_no,
               MYF(MY_WME)) != io_size)
unknown's avatar
unknown committed
326 327 328 329 330 331
    error= TRUE;
  DBUG_RETURN(error);
}


/*
332
  Write one entry from ddl log file
unknown's avatar
unknown committed
333
  SYNOPSIS
334
    write_ddl_log_file_entry()
unknown's avatar
unknown committed
335 336 337 338 339 340
    entry_no                     Entry number to read
  RETURN VALUES
    TRUE                         Error
    FALSE                        Success
*/

341
static bool write_ddl_log_file_entry(uint entry_no)
unknown's avatar
unknown committed
342 343
{
  bool error= FALSE;
344 345 346
  File file_id= global_ddl_log.file_id;
  char *file_entry_buf= (char*)global_ddl_log.file_entry_buf;
  DBUG_ENTER("write_ddl_log_file_entry");
unknown's avatar
unknown committed
347

348
  if (my_pwrite(file_id, file_entry_buf,
349
                IO_SIZE, IO_SIZE * entry_no, MYF(MY_WME)) != IO_SIZE)
unknown's avatar
unknown committed
350 351 352 353 354 355
    error= TRUE;
  DBUG_RETURN(error);
}


/*
356
  Write ddl log header
unknown's avatar
unknown committed
357
  SYNOPSIS
358
    write_ddl_log_header()
unknown's avatar
unknown committed
359 360 361 362 363
  RETURN VALUES
    TRUE                      Error
    FALSE                     Success
*/

364
static bool write_ddl_log_header()
unknown's avatar
unknown committed
365 366
{
  uint16 const_var;
367
  bool error= FALSE;
368
  DBUG_ENTER("write_ddl_log_header");
unknown's avatar
unknown committed
369

370 371
  int4store(&global_ddl_log.file_entry_buf[DDL_LOG_NUM_ENTRY_POS],
            global_ddl_log.num_entries);
372
  const_var= FN_LEN;
373
  int4store(&global_ddl_log.file_entry_buf[DDL_LOG_NAME_LEN_POS],
374
            const_var);
375
  const_var= IO_SIZE;
376
  int4store(&global_ddl_log.file_entry_buf[DDL_LOG_IO_SIZE_POS],
377
            const_var);
378
  if (write_ddl_log_file_entry(0UL))
379 380 381 382 383
  {
    sql_print_error("Error writing ddl log header");
    DBUG_RETURN(TRUE);
  }
  VOID(sync_ddl_log());
384
  DBUG_RETURN(error);
unknown's avatar
unknown committed
385 386 387
}


388
/*
389
  Create ddl log file name
390
  SYNOPSIS
391
    create_ddl_log_file_name()
392 393 394 395 396
    file_name                   Filename setup
  RETURN VALUES
    NONE
*/

397
static inline void create_ddl_log_file_name(char *file_name)
398
{
399
  strxmov(file_name, mysql_data_home, "/", "ddl_log.log", NullS);
400 401 402
}


unknown's avatar
unknown committed
403
/*
404
  Read header of ddl log file
unknown's avatar
unknown committed
405
  SYNOPSIS
406
    read_ddl_log_header()
unknown's avatar
unknown committed
407
  RETURN VALUES
408 409
    > 0                  Last entry in ddl log
    0                    No entries in ddl log
unknown's avatar
unknown committed
410
  DESCRIPTION
411 412 413
    When we read the ddl log header we get information about maximum sizes
    of names in the ddl log and we also get information about the number
    of entries in the ddl log.
unknown's avatar
unknown committed
414 415
*/

416
static uint read_ddl_log_header()
unknown's avatar
unknown committed
417
{
418
  char *file_entry_buf= (char*)global_ddl_log.file_entry_buf;
419
  char file_name[FN_REFLEN];
420
  uint entry_no;
421
  bool successful_open= FALSE;
422
  DBUG_ENTER("read_ddl_log_header");
unknown's avatar
unknown committed
423

424 425
  bzero(file_entry_buf, sizeof(global_ddl_log.file_entry_buf));
  global_ddl_log.inited= FALSE;
426
  global_ddl_log.recovery_phase= TRUE;
427
  global_ddl_log.io_size= IO_SIZE;
428
  create_ddl_log_file_name(file_name);
429 430
  if ((global_ddl_log.file_id= my_open(file_name,
                                        O_RDWR | O_BINARY, MYF(MY_WME))) >= 0)
unknown's avatar
unknown committed
431
  {
432
    if (read_ddl_log_file_entry(0UL))
433
    {
434 435
      /* Write message into error log */
      sql_print_error("Failed to read ddl log file in recovery");
436
    }
437 438
    else
      successful_open= TRUE;
unknown's avatar
unknown committed
439
  }
440 441
  entry_no= uint4korr(&file_entry_buf[DDL_LOG_NUM_ENTRY_POS]);
  global_ddl_log.name_len= uint4korr(&file_entry_buf[DDL_LOG_NAME_LEN_POS]);
442 443
  if (successful_open)
  {
444
    global_ddl_log.io_size= uint4korr(&file_entry_buf[DDL_LOG_IO_SIZE_POS]);
445 446 447
    DBUG_ASSERT(global_ddl_log.io_size <=
                sizeof(global_ddl_log.file_entry_buf));
  }
448
  else
449 450 451
  {
    entry_no= 0;
  }
452 453 454 455
  global_ddl_log.first_free= NULL;
  global_ddl_log.first_used= NULL;
  global_ddl_log.num_entries= 0;
  VOID(pthread_mutex_init(&LOCK_gdl, MY_MUTEX_INIT_FAST));
unknown's avatar
unknown committed
456
  DBUG_RETURN(entry_no);
unknown's avatar
unknown committed
457 458 459 460
}


/*
461
  Read a ddl log entry
unknown's avatar
unknown committed
462
  SYNOPSIS
463
    read_ddl_log_entry()
unknown's avatar
unknown committed
464 465 466 467 468 469
    read_entry               Number of entry to read
    out:entry_info           Information from entry
  RETURN VALUES
    TRUE                     Error
    FALSE                    Success
  DESCRIPTION
470
    Read a specified entry in the ddl log
unknown's avatar
unknown committed
471 472
*/

473
bool read_ddl_log_entry(uint read_entry, DDL_LOG_ENTRY *ddl_log_entry)
unknown's avatar
unknown committed
474
{
475
  char *file_entry_buf= (char*)&global_ddl_log.file_entry_buf;
476
  uint inx;
477
  DBUG_ENTER("read_ddl_log_entry");
478

479
  if (read_ddl_log_file_entry(read_entry))
480 481 482
  {
    DBUG_RETURN(TRUE);
  }
483 484 485 486 487 488 489 490 491 492 493 494
  ddl_log_entry->entry_pos= read_entry;
  ddl_log_entry->entry_type=
            (enum ddl_log_entry_code)file_entry_buf[DDL_LOG_ENTRY_TYPE_POS];
  ddl_log_entry->action_type=
            (enum ddl_log_action_code)file_entry_buf[DDL_LOG_ACTION_TYPE_POS];
  ddl_log_entry->phase= file_entry_buf[DDL_LOG_PHASE_POS];
  ddl_log_entry->next_entry= uint4korr(&file_entry_buf[DDL_LOG_NEXT_ENTRY_POS]);
  ddl_log_entry->name= &file_entry_buf[DDL_LOG_NAME_POS];
  inx= DDL_LOG_NAME_POS + global_ddl_log.name_len;
  ddl_log_entry->from_name= &file_entry_buf[inx];
  inx+= global_ddl_log.name_len;
  ddl_log_entry->handler_name= &file_entry_buf[inx];
unknown's avatar
unknown committed
495 496 497 498 499
  DBUG_RETURN(FALSE);
}


/*
500
  Initialise ddl log
unknown's avatar
unknown committed
501
  SYNOPSIS
502
    init_ddl_log()
unknown's avatar
unknown committed
503 504 505 506
  RETURN VALUES
    TRUE                     Error
    FALSE                    Success
  DESCRIPTION
507
    Write the header of the ddl log file and length of names. Also set
unknown's avatar
unknown committed
508 509 510
    number of entries to zero.
*/

511
static bool init_ddl_log()
unknown's avatar
unknown committed
512
{
unknown's avatar
unknown committed
513
  bool error= FALSE;
514
  char file_name[FN_REFLEN];
515
  DBUG_ENTER("init_ddl_log");
unknown's avatar
unknown committed
516

517 518 519 520 521
  if (global_ddl_log.inited)
  {
    DBUG_RETURN(FALSE);
  }
  global_ddl_log.io_size= IO_SIZE;
522
  create_ddl_log_file_name(file_name);
523 524 525 526
  if ((global_ddl_log.file_id= my_create(file_name,
                                         CREATE_MODE,
                                         O_RDWR | O_TRUNC | O_BINARY,
                                         MYF(MY_WME))) < 0)
527
  {
528
    /* Couldn't create ddl log file, this is serious error */
529 530
    sql_print_error("Failed to open ddl log file");
    DBUG_RETURN(TRUE);
531
  }
unknown's avatar
Fixes  
unknown committed
532
  global_ddl_log.inited= TRUE;
533
  if (write_ddl_log_header())
534
  {
unknown's avatar
Fixes  
unknown committed
535
    global_ddl_log.inited= FALSE;
536
    DBUG_RETURN(TRUE);
537
  }
538
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
539 540 541 542
}


/*
543
  Execute one action in a ddl log entry
unknown's avatar
unknown committed
544
  SYNOPSIS
545 546
    execute_ddl_log_action()
    ddl_log_entry              Information in action entry to execute
unknown's avatar
unknown committed
547
  RETURN VALUES
548 549
    TRUE                       Error
    FALSE                      Success
unknown's avatar
unknown committed
550 551
*/

552
static bool execute_ddl_log_action(THD *thd, DDL_LOG_ENTRY *ddl_log_entry)
unknown's avatar
unknown committed
553
{
554 555 556
  bool frm_action= FALSE;
  LEX_STRING handler_name;
  handler *file;
557 558
  MEM_ROOT mem_root;
  bool error= TRUE;
559 560
  char path[FN_REFLEN];
  char from_path[FN_REFLEN];
561 562
  char *par_ext= (char*)".par";
  handlerton *hton;
563
  DBUG_ENTER("execute_ddl_log_action");
564

565
  if (ddl_log_entry->entry_type == DDL_IGNORE_LOG_ENTRY_CODE)
566 567 568
  {
    DBUG_RETURN(FALSE);
  }
569 570
  handler_name.str= (char*)ddl_log_entry->handler_name;
  handler_name.length= strlen(ddl_log_entry->handler_name);
571
  init_sql_alloc(&mem_root, TABLE_ALLOC_BLOCK_SIZE, 0); 
572
  if (ddl_log_entry->handler_name[0] == 0)
573 574 575
    frm_action= TRUE;
  else
  {
576
    TABLE_SHARE dummy;
577 578 579 580 581 582 583

    hton= ha_resolve_by_name(thd, &handler_name);
    if (!hton)
    {
      my_error(ER_ILLEGAL_HA, MYF(0), ddl_log_entry->handler_name);
      goto error;
    }
584 585
    bzero(&dummy, sizeof(TABLE_SHARE));
    file= get_new_handler(&dummy, &mem_root, hton);
586
    if (!file)
587 588
    {
      mem_alloc_error(sizeof(handler));
589
      goto error;
590
    }
591
  }
592
  switch (ddl_log_entry->action_type)
593
  {
594
    case DDL_LOG_REPLACE_ACTION:
595
    case DDL_LOG_DELETE_ACTION:
596
    {
597
      if (ddl_log_entry->phase == 0)
598 599 600
      {
        if (frm_action)
        {
601
          strxmov(path, ddl_log_entry->name, reg_ext, NullS);
602 603
          if (my_delete(path, MYF(MY_WME)))
            break;
604
#ifdef WITH_PARTITION_STORAGE_ENGINE
605
          strxmov(path, ddl_log_entry->name, par_ext, NullS);
606 607
          VOID(my_delete(path, MYF(MY_WME)));
#endif
608 609 610
        }
        else
        {
611
          if (file->delete_table(ddl_log_entry->name))
612 613
            break;
        }
614
        if ((deactivate_ddl_log_entry(ddl_log_entry->entry_pos)))
615
        {
616
          VOID(sync_ddl_log());
617
          error= FALSE;
618
        }
619 620
        if (ddl_log_entry->action_type == DDL_LOG_DELETE_ACTION)
          break;
621
      }
622 623 624 625 626 627
      DBUG_ASSERT(ddl_log_entry->action_type == DDL_LOG_REPLACE_ACTION);
      /*
        Fall through and perform the rename action of the replace
        action. We have already indicated the success of the delete
        action in the log entry by stepping up the phase.
      */
628
    }
629
    case DDL_LOG_RENAME_ACTION:
630
    {
631 632 633
      error= TRUE;
      if (frm_action)
      {
634 635
        strxmov(path, ddl_log_entry->name, reg_ext, NullS);
        strxmov(from_path, ddl_log_entry->from_name, reg_ext, NullS);
636
        if (my_rename(path, from_path, MYF(MY_WME)))
637
          break;
638
#ifdef WITH_PARTITION_STORAGE_ENGINE
639 640
        strxmov(path, ddl_log_entry->name, par_ext, NullS);
        strxmov(from_path, ddl_log_entry->from_name, par_ext, NullS);
641 642
        VOID(my_rename(path, from_path, MYF(MY_WME)));
#endif
643 644 645
      }
      else
      {
646 647
        if (file->rename_table(ddl_log_entry->name,
                               ddl_log_entry->from_name))
648
          break;
649 650 651 652 653
      }
      if ((deactivate_ddl_log_entry(ddl_log_entry->entry_pos)))
      {
        VOID(sync_ddl_log());
        error= FALSE;
654 655
      }
      break;
656
    }
657 658 659 660 661 662 663 664
    default:
      DBUG_ASSERT(0);
      break;
  }
  delete file;
error:
  free_root(&mem_root, MYF(0)); 
  DBUG_RETURN(error);
unknown's avatar
unknown committed
665 666 667
}


unknown's avatar
unknown committed
668
/*
669
  Get a free entry in the ddl log
unknown's avatar
unknown committed
670
  SYNOPSIS
671 672
    get_free_ddl_log_entry()
    out:active_entry                A ddl log memory entry returned
unknown's avatar
unknown committed
673
  RETURN VALUES
674 675
    TRUE                       Error
    FALSE                      Success
unknown's avatar
unknown committed
676 677
*/

678 679
static bool get_free_ddl_log_entry(DDL_LOG_MEMORY_ENTRY **active_entry,
                                   bool *write_header)
unknown's avatar
unknown committed
680
{
681 682 683
  DDL_LOG_MEMORY_ENTRY *used_entry;
  DDL_LOG_MEMORY_ENTRY *first_used= global_ddl_log.first_used;
  DBUG_ENTER("get_free_ddl_log_entry");
684

685
  if (global_ddl_log.first_free == NULL)
686
  {
687 688
    if (!(used_entry= (DDL_LOG_MEMORY_ENTRY*)my_malloc(
                              sizeof(DDL_LOG_MEMORY_ENTRY), MYF(MY_WME))))
689
    {
690
      sql_print_error("Failed to allocate memory for ddl log free list");
691 692
      DBUG_RETURN(TRUE);
    }
693
    global_ddl_log.num_entries++;
694
    used_entry->entry_pos= global_ddl_log.num_entries;
695
    *write_header= TRUE;
696 697 698
  }
  else
  {
699 700
    used_entry= global_ddl_log.first_free;
    global_ddl_log.first_free= used_entry->next_log_entry;
701
    *write_header= FALSE;
702 703 704 705 706 707
  }
  /*
    Move from free list to used list
  */
  used_entry->next_log_entry= first_used;
  used_entry->prev_log_entry= NULL;
708
  global_ddl_log.first_used= used_entry;
709 710 711 712
  if (first_used)
    first_used->prev_log_entry= used_entry;

  *active_entry= used_entry;
713
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
714 715 716 717
}


/*
718
  External interface methods for the DDL log Module
719 720 721 722
  ---------------------------------------------------
*/

/*
unknown's avatar
unknown committed
723
  SYNOPSIS
724 725 726
    write_ddl_log_entry()
    ddl_log_entry         Information about log entry
    out:entry_written     Entry information written into   
727

unknown's avatar
unknown committed
728
  RETURN VALUES
729 730 731 732
    TRUE                      Error
    FALSE                     Success

  DESCRIPTION
733
    A careful write of the ddl log is performed to ensure that we can
734
    handle crashes occurring during CREATE and ALTER TABLE processing.
unknown's avatar
unknown committed
735 736
*/

737 738
bool write_ddl_log_entry(DDL_LOG_ENTRY *ddl_log_entry,
                         DDL_LOG_MEMORY_ENTRY **active_entry)
unknown's avatar
unknown committed
739
{
740
  bool error, write_header;
741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757
  DBUG_ENTER("write_ddl_log_entry");

  if (init_ddl_log())
  {
    DBUG_RETURN(TRUE);
  }
  global_ddl_log.file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= DDL_LOG_ENTRY_CODE;
  global_ddl_log.file_entry_buf[DDL_LOG_ACTION_TYPE_POS]=
                                    ddl_log_entry->action_type;
  global_ddl_log.file_entry_buf[DDL_LOG_PHASE_POS]= 0;
  int4store(&global_ddl_log.file_entry_buf[DDL_LOG_NEXT_ENTRY_POS],
            ddl_log_entry->next_entry);
  DBUG_ASSERT(strlen(ddl_log_entry->name) < FN_LEN);
  strncpy(&global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS],
          ddl_log_entry->name, FN_LEN);
  if (ddl_log_entry->action_type == DDL_LOG_RENAME_ACTION ||
      ddl_log_entry->action_type == DDL_LOG_REPLACE_ACTION)
unknown's avatar
unknown committed
758
  {
759 760 761
    DBUG_ASSERT(strlen(ddl_log_entry->from_name) < FN_LEN);
    strncpy(&global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS + FN_LEN],
          ddl_log_entry->from_name, FN_LEN);
unknown's avatar
unknown committed
762
  }
763
  else
764 765 766 767 768
    global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS + FN_LEN]= 0;
  DBUG_ASSERT(strlen(ddl_log_entry->handler_name) < FN_LEN);
  strncpy(&global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS + (2*FN_LEN)],
          ddl_log_entry->handler_name, FN_LEN);
  if (get_free_ddl_log_entry(active_entry, &write_header))
769 770 771 772
  {
    DBUG_RETURN(TRUE);
  }
  error= FALSE;
773
  if (write_ddl_log_file_entry((*active_entry)->entry_pos))
774
  {
775
    error= TRUE;
776 777 778
    sql_print_error("Failed to write entry_no = %u",
                    (*active_entry)->entry_pos);
  }
779 780
  if (write_header && !error)
  {
781 782
    VOID(sync_ddl_log());
    if (write_ddl_log_header())
783 784
      error= TRUE;
  }
785
  if (error)
786
    release_ddl_log_memory_entry(*active_entry);
787
  DBUG_RETURN(error);
unknown's avatar
unknown committed
788 789 790 791
}


/*
792
  Write final entry in the ddl log
unknown's avatar
unknown committed
793
  SYNOPSIS
794
    write_execute_ddl_log_entry()
795 796 797 798
    first_entry                    First entry in linked list of entries
                                   to execute, if 0 = NULL it means that
                                   the entry is removed and the entries
                                   are put into the free list.
799 800 801
    complete                       Flag indicating we are simply writing
                                   info about that entry has been completed
    in:out:active_entry            Entry to execute, 0 = NULL if the entry
802 803 804
                                   is written first time and needs to be
                                   returned. In this case the entry written
                                   is returned in this parameter
unknown's avatar
unknown committed
805
  RETURN VALUES
806 807 808 809
    TRUE                           Error
    FALSE                          Success

  DESCRIPTION
810
    This is the last write in the ddl log. The previous log entries have
811
    already been written but not yet synched to disk.
812 813 814 815
    We write a couple of log entries that describes action to perform.
    This entries are set-up in a linked list, however only when a first
    execute entry is put as the first entry these will be executed.
    This routine writes this first 
816
*/ 
unknown's avatar
unknown committed
817

818 819 820
bool write_execute_ddl_log_entry(uint first_entry,
                                 bool complete,
                                 DDL_LOG_MEMORY_ENTRY **active_entry)
unknown's avatar
unknown committed
821
{
822
  bool write_header= FALSE;
823 824
  char *file_entry_buf= (char*)global_ddl_log.file_entry_buf;
  DBUG_ENTER("write_execute_ddl_log_entry");
825

826 827 828 829
  if (init_ddl_log())
  {
    DBUG_RETURN(TRUE);
  }
830 831
  if (!complete)
  {
832 833 834 835 836 837
    /*
      We haven't synched the log entries yet, we synch them now before
      writing the execute entry. If complete is true we haven't written
      any log entries before, we are only here to write the execute
      entry to indicate it is done.
    */
838 839
    VOID(sync_ddl_log());
    file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= DDL_LOG_EXECUTE_CODE;
840 841
  }
  else
842 843 844 845 846 847 848
    file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= DDL_IGNORE_LOG_ENTRY_CODE;
  file_entry_buf[DDL_LOG_ACTION_TYPE_POS]= 0; /* Ignored for execute entries */
  file_entry_buf[DDL_LOG_PHASE_POS]= 0;
  int4store(&file_entry_buf[DDL_LOG_NEXT_ENTRY_POS], first_entry);
  file_entry_buf[DDL_LOG_NAME_POS]= 0;
  file_entry_buf[DDL_LOG_NAME_POS + FN_LEN]= 0;
  file_entry_buf[DDL_LOG_NAME_POS + 2*FN_LEN]= 0;
849
  if (!(*active_entry))
850
  {
851
    if (get_free_ddl_log_entry(active_entry, &write_header))
852 853 854
    {
      DBUG_RETURN(TRUE);
    }
855
  }
856
  if (write_ddl_log_file_entry((*active_entry)->entry_pos))
857
  {
858
    sql_print_error("Error writing execute entry in ddl log");
859
    release_ddl_log_memory_entry(*active_entry);
860 861
    DBUG_RETURN(TRUE);
  }
862
  VOID(sync_ddl_log());
863 864
  if (write_header)
  {
865
    if (write_ddl_log_header())
866
    {
867
      release_ddl_log_memory_entry(*active_entry);
868 869 870
      DBUG_RETURN(TRUE);
    }
  }
unknown's avatar
unknown committed
871 872 873 874
  DBUG_RETURN(FALSE);
}


875
/*
876
  For complex rename operations we need to deactivate individual entries.
877
  SYNOPSIS
878
    deactivate_ddl_log_entry()
879 880 881 882 883 884 885 886
    entry_no                      Entry position of record to change
  RETURN VALUES
    TRUE                         Error
    FALSE                        Success
  DESCRIPTION
    During replace operations where we start with an existing table called
    t1 and a replacement table called t1#temp or something else and where
    we want to delete t1 and rename t1#temp to t1 this is not possible to
887
    do in a safe manner unless the ddl log is informed of the phases in
888 889 890 891 892 893 894 895 896 897 898
    the change.

    Delete actions are 1-phase actions that can be ignored immediately after
    being executed.
    Rename actions from x to y is also a 1-phase action since there is no
    interaction with any other handlers named x and y.
    Replace action where drop y and x -> y happens needs to be a two-phase
    action. Thus the first phase will drop y and the second phase will
    rename x -> y.
*/

899
bool deactivate_ddl_log_entry(uint entry_no)
900
{
901 902
  char *file_entry_buf= (char*)global_ddl_log.file_entry_buf;
  DBUG_ENTER("deactivate_ddl_log_entry");
903

904
  if (!read_ddl_log_file_entry(entry_no))
905
  {
906
    if (file_entry_buf[DDL_LOG_ENTRY_TYPE_POS] == DDL_LOG_ENTRY_CODE)
907
    {
908 909 910 911 912 913
      if (file_entry_buf[DDL_LOG_ACTION_TYPE_POS] == DDL_LOG_DELETE_ACTION ||
          file_entry_buf[DDL_LOG_ACTION_TYPE_POS] == DDL_LOG_RENAME_ACTION ||
          (file_entry_buf[DDL_LOG_ACTION_TYPE_POS] == DDL_LOG_REPLACE_ACTION &&
           file_entry_buf[DDL_LOG_PHASE_POS] == 1))
        file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= DDL_IGNORE_LOG_ENTRY_CODE;
      else if (file_entry_buf[DDL_LOG_ACTION_TYPE_POS] == DDL_LOG_REPLACE_ACTION)
914
      {
915 916
        DBUG_ASSERT(file_entry_buf[DDL_LOG_PHASE_POS] == 0);
        file_entry_buf[DDL_LOG_PHASE_POS]= 1;
917 918 919 920 921
      }
      else
      {
        DBUG_ASSERT(0);
      }
922 923 924 925 926 927
      if (write_ddl_log_file_entry(entry_no))
      {
        sql_print_error("Error in deactivating log entry. Position = %u",
                        entry_no);
        DBUG_RETURN(TRUE);
      }
928 929
    }
  }
930 931 932 933 934 935
  else
  {
    sql_print_error("Failed in reading entry before deactivating it");
    DBUG_RETURN(TRUE);
  }
  DBUG_RETURN(FALSE);
936 937 938 939
}


/*
940
  Sync ddl log file
941
  SYNOPSIS
942
    sync_ddl_log()
943 944 945 946 947
  RETURN VALUES
    TRUE                      Error
    FALSE                     Success
*/

948
bool sync_ddl_log()
949 950
{
  bool error= FALSE;
951
  DBUG_ENTER("sync_ddl_log");
952

953 954
  if ((!global_ddl_log.recovery_phase) &&
      init_ddl_log())
955 956 957 958
  {
    DBUG_RETURN(TRUE);
  }
  if (my_sync(global_ddl_log.file_id, MYF(0)))
959 960
  {
    /* Write to error log */
961
    sql_print_error("Failed to sync ddl log");
962 963 964 965 966 967
    error= TRUE;
  }
  DBUG_RETURN(error);
}


968 969 970
/*
  Release a log memory entry
  SYNOPSIS
971
    release_ddl_log_memory_entry()
972 973 974 975 976
    log_memory_entry                Log memory entry to release
  RETURN VALUES
    NONE
*/

977
void release_ddl_log_memory_entry(DDL_LOG_MEMORY_ENTRY *log_entry)
978
{
979 980 981 982
  DDL_LOG_MEMORY_ENTRY *first_free= global_ddl_log.first_free;
  DDL_LOG_MEMORY_ENTRY *next_log_entry= log_entry->next_log_entry;
  DDL_LOG_MEMORY_ENTRY *prev_log_entry= log_entry->prev_log_entry;
  DBUG_ENTER("release_ddl_log_memory_entry");
983

984
  global_ddl_log.first_free= log_entry;
985 986 987 988 989
  log_entry->next_log_entry= first_free;

  if (prev_log_entry)
    prev_log_entry->next_log_entry= next_log_entry;
  else
990
    global_ddl_log.first_used= next_log_entry;
991 992
  if (next_log_entry)
    next_log_entry->prev_log_entry= prev_log_entry;
993
  DBUG_VOID_RETURN;
994 995 996
}


unknown's avatar
unknown committed
997
/*
998
  Execute one entry in the ddl log. Executing an entry means executing
unknown's avatar
unknown committed
999 1000
  a linked list of actions.
  SYNOPSIS
1001
    execute_ddl_log_entry()
unknown's avatar
unknown committed
1002 1003 1004 1005 1006 1007
    first_entry                Reference to first action in entry
  RETURN VALUES
    TRUE                       Error
    FALSE                      Success
*/

1008
bool execute_ddl_log_entry(THD *thd, uint first_entry)
unknown's avatar
unknown committed
1009
{
1010
  DDL_LOG_ENTRY ddl_log_entry;
unknown's avatar
unknown committed
1011
  uint read_entry= first_entry;
1012
  DBUG_ENTER("execute_ddl_log_entry");
unknown's avatar
unknown committed
1013

1014
  pthread_mutex_lock(&LOCK_gdl);
unknown's avatar
unknown committed
1015 1016
  do
  {
1017
    if (read_ddl_log_entry(read_entry, &ddl_log_entry))
1018 1019
    {
      /* Write to error log and continue with next log entry */
1020 1021
      sql_print_error("Failed to read entry = %u from ddl log",
                      read_entry);
1022 1023
      break;
    }
1024 1025
    DBUG_ASSERT(ddl_log_entry.entry_type == DDL_LOG_ENTRY_CODE ||
                ddl_log_entry.entry_type == DDL_IGNORE_LOG_ENTRY_CODE);
1026

1027
    if (execute_ddl_log_action(thd, &ddl_log_entry))
unknown's avatar
unknown committed
1028
    {
1029
      /* Write to error log and continue with next log entry */
1030 1031
      sql_print_error("Failed to execute action for entry = %u from ddl log",
                      read_entry);
1032
      break;
unknown's avatar
unknown committed
1033
    }
1034
    read_entry= ddl_log_entry.next_entry;
unknown's avatar
unknown committed
1035
  } while (read_entry);
1036
  pthread_mutex_unlock(&LOCK_gdl);
unknown's avatar
unknown committed
1037 1038 1039
  DBUG_RETURN(FALSE);
}

1040

unknown's avatar
unknown committed
1041
/*
1042
  Execute the ddl log at recovery of MySQL Server
unknown's avatar
unknown committed
1043
  SYNOPSIS
1044
    execute_ddl_log_recovery()
unknown's avatar
unknown committed
1045 1046 1047 1048
  RETURN VALUES
    NONE
*/

1049
void execute_ddl_log_recovery()
unknown's avatar
unknown committed
1050
{
1051
  uint num_entries, i;
unknown's avatar
Fixes  
unknown committed
1052
  THD *thd;
1053
  DDL_LOG_ENTRY ddl_log_entry;
unknown's avatar
Fixes  
unknown committed
1054
  char file_name[FN_REFLEN];
1055
  DBUG_ENTER("execute_ddl_log_recovery");
unknown's avatar
unknown committed
1056

1057 1058 1059 1060 1061 1062 1063 1064
  /*
    To be able to run this from boot, we allocate a temporary THD
  */
  if (!(thd=new THD))
    DBUG_VOID_RETURN;
  thd->thread_stack= (char*) &thd;
  thd->store_globals();

1065 1066
  num_entries= read_ddl_log_header();
  for (i= 0; i < num_entries; i++)
unknown's avatar
unknown committed
1067
  {
1068
    if (read_ddl_log_entry(i, &ddl_log_entry))
1069
    {
1070 1071 1072
      sql_print_error("Failed to read entry no = %u from ddl log",
                       i);
      continue;
1073
    }
1074
    if (ddl_log_entry.entry_type == DDL_LOG_EXECUTE_CODE)
unknown's avatar
unknown committed
1075
    {
unknown's avatar
Fixes  
unknown committed
1076
      if (execute_ddl_log_entry(thd, ddl_log_entry.next_entry))
unknown's avatar
unknown committed
1077
      {
1078 1079
        /* Real unpleasant scenario but we continue anyways.  */
        continue;
unknown's avatar
unknown committed
1080 1081 1082
      }
    }
  }
1083 1084 1085 1086 1087 1088
  create_ddl_log_file_name(file_name);
  VOID(my_delete(file_name, MYF(0)));
  global_ddl_log.recovery_phase= FALSE;
  delete thd;
  /* Remember that we don't have a THD */
  my_pthread_setspecific_ptr(THR_THD,  0);
1089
  DBUG_VOID_RETURN;
1090 1091 1092 1093
}


/*
1094
  Release all memory allocated to the ddl log
1095
  SYNOPSIS
1096
    release_ddl_log()
1097 1098 1099 1100
  RETURN VALUES
    NONE
*/

1101
void release_ddl_log()
1102
{
1103 1104 1105
  DDL_LOG_MEMORY_ENTRY *free_list= global_ddl_log.first_free;
  DDL_LOG_MEMORY_ENTRY *used_list= global_ddl_log.first_used;
  DBUG_ENTER("release_ddl_log");
1106

1107
  pthread_mutex_lock(&LOCK_gdl);
1108 1109
  while (used_list)
  {
1110
    DDL_LOG_MEMORY_ENTRY *tmp= used_list->next_log_entry;
1111
    my_free((char*)used_list, MYF(0));
unknown's avatar
unknown committed
1112
    used_list= tmp;
1113 1114 1115
  }
  while (free_list)
  {
1116
    DDL_LOG_MEMORY_ENTRY *tmp= free_list->next_log_entry;
1117
    my_free((char*)free_list, MYF(0));
unknown's avatar
unknown committed
1118
    free_list= tmp;
1119
  }
1120
  VOID(my_close(global_ddl_log.file_id, MYF(0)));
1121
  pthread_mutex_unlock(&LOCK_gdl);
1122
  VOID(pthread_mutex_destroy(&LOCK_gdl));
1123
  DBUG_VOID_RETURN;
1124 1125 1126
}


unknown's avatar
unknown committed
1127 1128 1129
/*
---------------------------------------------------------------------------

1130
  END MODULE DDL log
unknown's avatar
unknown committed
1131 1132 1133 1134 1135
  --------------------

---------------------------------------------------------------------------
*/

unknown's avatar
unknown committed
1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168

/*
  SYNOPSIS
    mysql_write_frm()
    lpt                    Struct carrying many parameters needed for this
                           method
    flags                  Flags as defined below
      WFRM_INITIAL_WRITE        If set we need to prepare table before
                                creating the frm file
      WFRM_CREATE_HANDLER_FILES If set we need to create the handler file as
                                part of the creation of the frm file
      WFRM_PACK_FRM             If set we should pack the frm file and delete
                                the frm file

  RETURN VALUES
    TRUE                   Error
    FALSE                  Success

  DESCRIPTION
    A support method that creates a new frm file and in this process it
    regenerates the partition data. It works fine also for non-partitioned
    tables since it only handles partitioned data if it exists.
*/

bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags)
{
  /*
    Prepare table to prepare for writing a new frm file where the
    partitions in add/drop state have temporarily changed their state
    We set tmp_table to avoid get errors on naming of primary key index.
  */
  int error= 0;
  char path[FN_REFLEN+1];
1169 1170
  char shadow_path[FN_REFLEN+1];
  char shadow_frm_name[FN_REFLEN+1];
unknown's avatar
unknown committed
1171 1172 1173
  char frm_name[FN_REFLEN+1];
  DBUG_ENTER("mysql_write_frm");

1174 1175 1176
  /*
    Build shadow frm file name
  */
1177
  build_table_filename(shadow_path, sizeof(shadow_path), lpt->db,
1178
                       lpt->table_name, "#");
1179
  strxmov(shadow_frm_name, shadow_path, reg_ext, NullS);
1180
  if (flags & WFRM_WRITE_SHADOW)
unknown's avatar
unknown committed
1181
  {
1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193
    if (mysql_copy_create_list(lpt->create_list,
                               &lpt->new_create_list) ||
        mysql_copy_key_list(lpt->key_list,
                            &lpt->new_key_list) ||
        mysql_prepare_table(lpt->thd, lpt->create_info,
                            &lpt->new_create_list,
                            &lpt->new_key_list,
                            /*tmp_table*/ 1,
                            &lpt->db_options,
                            lpt->table->file,
                            &lpt->key_info_buffer,
                            &lpt->key_count,
1194
                            /*select_field_count*/ 0))
unknown's avatar
unknown committed
1195 1196 1197 1198 1199
    {
      DBUG_RETURN(TRUE);
    }
#ifdef WITH_PARTITION_STORAGE_ENGINE
    {
1200 1201 1202 1203 1204
      partition_info *part_info= lpt->table->part_info;
      char *part_syntax_buf;
      uint syntax_len;

      if (part_info)
unknown's avatar
unknown committed
1205
      {
1206 1207 1208 1209 1210 1211 1212 1213
        if (!(part_syntax_buf= generate_partition_syntax(part_info,
                                                         &syntax_len,
                                                         TRUE, FALSE)))
        {
          DBUG_RETURN(TRUE);
        }
        part_info->part_info_string= part_syntax_buf;
        part_info->part_info_len= syntax_len;
unknown's avatar
unknown committed
1214 1215 1216
      }
    }
#endif
1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228
    /* Write shadow frm file */
    lpt->create_info->table_options= lpt->db_options;
    if ((mysql_create_frm(lpt->thd, shadow_frm_name, lpt->db,
                          lpt->table_name, lpt->create_info,
                          lpt->new_create_list, lpt->key_count,
                          lpt->key_info_buffer, lpt->table->file)) ||
         lpt->table->file->create_handler_files(shadow_path, NULL, FALSE))
    {
      my_delete(shadow_frm_name, MYF(0));
      error= 1;
      goto end;
    }
unknown's avatar
unknown committed
1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239
  }
  if (flags & WFRM_PACK_FRM)
  {
    /*
      We need to pack the frm file and after packing it we delete the
      frm file to ensure it doesn't get used. This is only used for
      handlers that have the main version of the frm file stored in the
      handler.
    */
    const void *data= 0;
    uint length= 0;
1240
    if (readfrm(shadow_path, &data, &length) ||
unknown's avatar
unknown committed
1241 1242 1243 1244 1245 1246 1247 1248
        packfrm(data, length, &lpt->pack_frm_data, &lpt->pack_frm_len))
    {
      my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
      my_free((char*)lpt->pack_frm_data, MYF(MY_ALLOW_ZERO_PTR));
      mem_alloc_error(length);
      error= 1;
      goto end;
    }
1249
    error= my_delete(shadow_frm_name, MYF(MY_WME));
unknown's avatar
unknown committed
1250
  }
1251 1252
  if (flags & WFRM_INSTALL_SHADOW)
  {
1253 1254 1255
#ifdef WITH_PARTITION_STORAGE_ENGINE
    partition_info *part_info= lpt->part_info;
#endif
1256 1257 1258 1259 1260 1261 1262 1263 1264
    /*
      Build frm file name
    */
    build_table_filename(path, sizeof(path), lpt->db,
                         lpt->table_name, "");
    strxmov(frm_name, path, reg_ext, NullS);
    /*
      When we are changing to use new frm file we need to ensure that we
      don't collide with another thread in process to open the frm file.
1265 1266 1267 1268 1269 1270
      We start by deleting the .frm file and possible .par file. Then we
      write to the DDL log that we have completed the delete phase by
      increasing the phase of the log entry. Next step is to rename the
      new .frm file and the new .par file to the real name. After
      completing this we write a new phase to the log entry that will
      deactivate it.
1271 1272 1273
    */
    VOID(pthread_mutex_lock(&LOCK_open));
    if (my_delete(frm_name, MYF(MY_WME)) ||
1274
#ifdef WITH_PARTITION_STORAGE_ENGINE
1275 1276
        lpt->table->file->create_handler_files(path, shadow_path,
                                               CHF_DELETE_FLAG) ||
1277 1278
        deactivate_ddl_log_entry(part_info->frm_log_entry->entry_pos) ||
        (sync_ddl_log(), FALSE) ||
1279
#endif
1280
#ifdef WITH_PARTITION_STORAGE_ENGINE
1281
        my_rename(shadow_frm_name, frm_name, MYF(MY_WME)) ||
1282 1283
        lpt->table->file->create_handler_files(path, shadow_path,
                                               CHF_RENAME_FLAG))
1284 1285 1286
#else
        my_rename(shadow_frm_name, frm_name, MYF(MY_WME)))
#endif
1287 1288 1289 1290
    {
      error= 1;
    }
    VOID(pthread_mutex_unlock(&LOCK_open));
1291
#ifdef WITH_PARTITION_STORAGE_ENGINE
1292
    deactivate_ddl_log_entry(part_info->frm_log_entry->entry_pos);
1293
    part_info->frm_log_entry= NULL;
1294
    VOID(sync_ddl_log());
1295
#endif
unknown's avatar
unknown committed
1296
  }
1297

unknown's avatar
unknown committed
1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330
end:
  DBUG_RETURN(error);
}


/*
  SYNOPSIS
    write_bin_log()
    thd                           Thread object
    clear_error                   is clear_error to be called
    query                         Query to log
    query_length                  Length of query

  RETURN VALUES
    NONE

  DESCRIPTION
    Write the binlog if open, routine used in multiple places in this
    file
*/

void write_bin_log(THD *thd, bool clear_error,
                   char const *query, ulong query_length)
{
  if (mysql_bin_log.is_open())
  {
    if (clear_error)
      thd->clear_error();
    thd->binlog_query(THD::STMT_QUERY_TYPE,
                      query, query_length, FALSE, FALSE);
  }
}

1331

unknown's avatar
unknown committed
1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349
/*
 delete (drop) tables.

  SYNOPSIS
   mysql_rm_table()
   thd			Thread handle
   tables		List of tables to delete
   if_exists		If 1, don't give error if one table doesn't exists

  NOTES
    Will delete all tables that can be deleted and give a compact error
    messages for tables that could not be deleted.
    If a table is in use, we will wait for all users to free the table
    before dropping it

    Wait if global_read_lock (FLUSH TABLES WITH READ LOCK) is set.

  RETURN
unknown's avatar
unknown committed
1350 1351
    FALSE OK.  In this case ok packet is sent to user
    TRUE  Error
unknown's avatar
unknown committed
1352 1353

*/
unknown's avatar
unknown committed
1354

unknown's avatar
unknown committed
1355 1356
bool mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists,
                    my_bool drop_temporary)
unknown's avatar
unknown committed
1357
{
1358
  bool error= FALSE, need_start_waiters= FALSE;
unknown's avatar
unknown committed
1359 1360 1361 1362
  DBUG_ENTER("mysql_rm_table");

  /* mark for close and remove all cached entries */

1363
  if (!drop_temporary)
unknown's avatar
unknown committed
1364
  {
1365
    if ((error= wait_if_global_read_lock(thd, 0, 1)))
1366
    {
1367
      my_error(ER_TABLE_NOT_LOCKED_FOR_WRITE, MYF(0), tables->table_name);
1368
      DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
1369
    }
1370 1371
    else
      need_start_waiters= TRUE;
unknown's avatar
unknown committed
1372
  }
1373 1374 1375 1376 1377 1378 1379 1380 1381 1382

  /*
    Acquire LOCK_open after wait_if_global_read_lock(). If we would hold
    LOCK_open during wait_if_global_read_lock(), other threads could not
    close their tables. This would make a pretty deadlock.
  */
  thd->mysys_var->current_mutex= &LOCK_open;
  thd->mysys_var->current_cond= &COND_refresh;
  VOID(pthread_mutex_lock(&LOCK_open));

unknown's avatar
VIEW  
unknown committed
1383
  error= mysql_rm_table_part2(thd, tables, if_exists, drop_temporary, 0, 0);
1384 1385 1386 1387 1388 1389 1390 1391

  pthread_mutex_unlock(&LOCK_open);

  pthread_mutex_lock(&thd->mysys_var->mutex);
  thd->mysys_var->current_mutex= 0;
  thd->mysys_var->current_cond= 0;
  pthread_mutex_unlock(&thd->mysys_var->mutex);

1392 1393 1394
  if (need_start_waiters)
    start_waiting_global_read_lock(thd);

1395
  if (error)
unknown's avatar
unknown committed
1396
    DBUG_RETURN(TRUE);
1397
  send_ok(thd);
unknown's avatar
unknown committed
1398
  DBUG_RETURN(FALSE);
1399 1400
}

unknown's avatar
unknown committed
1401 1402 1403 1404 1405

/*
 delete (drop) tables.

  SYNOPSIS
1406 1407 1408 1409 1410
    mysql_rm_table_part2_with_lock()
    thd			Thread handle
    tables		List of tables to delete
    if_exists		If 1, don't give error if one table doesn't exists
    dont_log_query	Don't write query to log files. This will also not
1411
                        generate warnings if the handler files doesn't exists
unknown's avatar
unknown committed
1412 1413 1414 1415 1416 1417 1418 1419 1420 1421

 NOTES
   Works like documented in mysql_rm_table(), but don't check
   global_read_lock and don't send_ok packet to server.

 RETURN
  0	ok
  1	error
*/

unknown's avatar
unknown committed
1422 1423
int mysql_rm_table_part2_with_lock(THD *thd,
				   TABLE_LIST *tables, bool if_exists,
1424
				   bool drop_temporary, bool dont_log_query)
unknown's avatar
unknown committed
1425 1426 1427 1428 1429 1430
{
  int error;
  thd->mysys_var->current_mutex= &LOCK_open;
  thd->mysys_var->current_cond= &COND_refresh;
  VOID(pthread_mutex_lock(&LOCK_open));

unknown's avatar
VIEW  
unknown committed
1431 1432
  error= mysql_rm_table_part2(thd, tables, if_exists, drop_temporary, 1,
			      dont_log_query);
unknown's avatar
unknown committed
1433 1434 1435 1436 1437 1438 1439 1440 1441 1442

  pthread_mutex_unlock(&LOCK_open);

  pthread_mutex_lock(&thd->mysys_var->mutex);
  thd->mysys_var->current_mutex= 0;
  thd->mysys_var->current_cond= 0;
  pthread_mutex_unlock(&thd->mysys_var->mutex);
  return error;
}

unknown's avatar
unknown committed
1443

1444
/*
1445 1446 1447 1448 1449 1450 1451 1452 1453
  Execute the drop of a normal or temporary table

  SYNOPSIS
    mysql_rm_table_part2()
    thd			Thread handler
    tables		Tables to drop
    if_exists		If set, don't give an error if table doesn't exists.
			In this case we give an warning of level 'NOTE'
    drop_temporary	Only drop temporary tables
unknown's avatar
VIEW  
unknown committed
1454
    drop_view		Allow to delete VIEW .frm
1455 1456
    dont_log_query	Don't write query to log files. This will also not
			generate warnings if the handler files doesn't exists  
1457

1458 1459 1460 1461 1462 1463 1464 1465 1466
  TODO:
    When logging to the binary log, we should log
    tmp_tables and transactional tables as separate statements if we
    are in a transaction;  This is needed to get these tables into the
    cached binary log that is only written on COMMIT.

   The current code only writes DROP statements that only uses temporary
   tables to the cache binary log.  This should be ok on most cases, but
   not all.
1467 1468 1469 1470 1471

 RETURN
   0	ok
   1	Error
   -1	Thread was killed
1472
*/
1473 1474

int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists,
unknown's avatar
VIEW  
unknown committed
1475 1476
			 bool drop_temporary, bool drop_view,
			 bool dont_log_query)
1477 1478
{
  TABLE_LIST *table;
1479 1480
  char path[FN_REFLEN], *alias;
  uint path_length;
1481 1482
  String wrong_tables;
  int error;
1483
  int non_temp_tables_count= 0;
unknown's avatar
unknown committed
1484
  bool some_tables_deleted=0, tmp_table_deleted=0, foreign_key_error=0;
1485
  String built_query;
1486 1487
  DBUG_ENTER("mysql_rm_table_part2");

1488 1489 1490 1491
  LINT_INIT(alias);
  LINT_INIT(path_length);
  safe_mutex_assert_owner(&LOCK_open);

1492
  if (thd->current_stmt_binlog_row_based && !dont_log_query)
1493 1494 1495 1496 1497 1498 1499
  {
    built_query.set_charset(system_charset_info);
    if (if_exists)
      built_query.append("DROP TABLE IF EXISTS ");
    else
      built_query.append("DROP TABLE ");
  }
unknown's avatar
unknown committed
1500 1501 1502 1503 1504 1505 1506 1507 1508
  /*
    If we have the table in the definition cache, we don't have to check the
    .frm file to find if the table is a normal table (not view) and what
    engine to use.
  */

  for (table= tables; table; table= table->next_local)
  {
    TABLE_SHARE *share;
unknown's avatar
unknown committed
1509
    table->db_type= NULL;
unknown's avatar
unknown committed
1510 1511 1512 1513
    if ((share= get_cached_table_share(table->db, table->table_name)))
      table->db_type= share->db_type;
  }

1514
  if (lock_table_names(thd, tables))
1515
    DBUG_RETURN(1);
1516

1517 1518 1519
  /* Don't give warnings for not found errors, as we already generate notes */
  thd->no_warnings_for_error= 1;

unknown's avatar
VIEW  
unknown committed
1520
  for (table= tables; table; table= table->next_local)
unknown's avatar
unknown committed
1521
  {
1522
    char *db=table->db;
unknown's avatar
unknown committed
1523 1524
    handlerton *table_type;
    enum legacy_db_type frm_db_type;
1525

1526
    mysql_ha_flush(thd, table, MYSQL_HA_CLOSE_FINAL, TRUE);
unknown's avatar
unknown committed
1527
    if (!close_temporary_table(thd, table))
unknown's avatar
unknown committed
1528
    {
1529
      tmp_table_deleted=1;
unknown's avatar
unknown committed
1530
      continue;					// removed temporary table
unknown's avatar
unknown committed
1531
    }
unknown's avatar
unknown committed
1532

1533 1534 1535 1536 1537 1538
    /*
      If row-based replication is used and the table is not a
      temporary table, we add the table name to the drop statement
      being built.  The string always end in a comma and the comma
      will be chopped off before being written to the binary log.
      */
1539
    if (thd->current_stmt_binlog_row_based && !dont_log_query)
1540
    {
1541
      non_temp_tables_count++;
1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556
      /*
        Don't write the database name if it is the current one (or if
        thd->db is NULL).
      */
      built_query.append("`");
      if (thd->db == NULL || strcmp(db,thd->db) != 0)
      {
        built_query.append(db);
        built_query.append("`.`");
      }

      built_query.append(table->table_name);
      built_query.append("`,");
    }

unknown's avatar
unknown committed
1557
    error=0;
unknown's avatar
unknown committed
1558
    table_type= table->db_type;
1559
    if (!drop_temporary)
unknown's avatar
unknown committed
1560
    {
1561
      TABLE *locked_table;
unknown's avatar
unknown committed
1562 1563 1564 1565
      abort_locked_tables(thd, db, table->table_name);
      remove_table_from_cache(thd, db, table->table_name,
	                      RTFC_WAIT_OTHER_THREAD_FLAG |
			      RTFC_CHECK_KILLED_FLAG);
1566 1567 1568 1569 1570 1571 1572
      /*
        If the table was used in lock tables, remember it so that
        unlock_table_names can free it
      */
      if ((locked_table= drop_locked_tables(thd, db, table->table_name)))
        table->table= locked_table;

1573
      if (thd->killed)
1574 1575
      {
        thd->no_warnings_for_error= 0;
1576
	DBUG_RETURN(-1);
1577
      }
1578
      alias= (lower_case_table_names == 2) ? table->alias : table->table_name;
unknown's avatar
unknown committed
1579
      /* remove .frm file and engine files */
1580 1581
      path_length= build_table_filename(path, sizeof(path),
                                        db, alias, reg_ext);
unknown's avatar
unknown committed
1582
    }
1583 1584
    if (drop_temporary ||
        (table_type == NULL &&        
unknown's avatar
unknown committed
1585 1586 1587
         (access(path, F_OK) &&
          ha_create_table_from_engine(thd, db, alias)) ||
         (!drop_view &&
unknown's avatar
unknown committed
1588
          mysql_frm_type(thd, path, &frm_db_type) != FRMTYPE_TABLE)))
unknown's avatar
unknown committed
1589
    {
1590
      // Table was not found on disk and table can't be created from engine
1591
      if (if_exists)
1592 1593
	push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
			    ER_BAD_TABLE_ERROR, ER(ER_BAD_TABLE_ERROR),
1594
			    table->table_name);
1595
      else
1596
        error= 1;
unknown's avatar
unknown committed
1597 1598 1599
    }
    else
    {
unknown's avatar
unknown committed
1600
      char *end;
unknown's avatar
unknown committed
1601 1602 1603 1604 1605
      if (table_type == NULL)
      {
	mysql_frm_type(thd, path, &frm_db_type);
        table_type= ha_resolve_by_legacy_type(thd, frm_db_type);
      }
1606 1607
      // Remove extension for delete
      *(end= path + path_length - reg_ext_length)= '\0';
unknown's avatar
unknown committed
1608
      error= ha_delete_table(thd, table_type, path, db, table->table_name,
1609
                             !dont_log_query);
1610
      if ((error == ENOENT || error == HA_ERR_NO_SUCH_TABLE) && 
unknown's avatar
unknown committed
1611
	  (if_exists || table_type == NULL))
1612
	error= 0;
unknown's avatar
unknown committed
1613
      if (error == HA_ERR_ROW_IS_REFERENCED)
unknown's avatar
unknown committed
1614 1615
      {
	/* the table is referenced by a foreign key constraint */
1616
	foreign_key_error=1;
unknown's avatar
unknown committed
1617
      }
1618
      if (!error || error == ENOENT || error == HA_ERR_NO_SUCH_TABLE)
unknown's avatar
unknown committed
1619
      {
1620
        int new_error;
unknown's avatar
unknown committed
1621 1622
	/* Delete the table definition file */
	strmov(end,reg_ext);
1623
	if (!(new_error=my_delete(path,MYF(MY_WME))))
1624
        {
unknown's avatar
unknown committed
1625
	  some_tables_deleted=1;
1626 1627
          new_error= Table_triggers_list::drop_all_triggers(thd, db,
                                                            table->table_name);
1628
        }
1629
        error|= new_error;
1630
      }
unknown's avatar
unknown committed
1631 1632 1633 1634 1635
    }
    if (error)
    {
      if (wrong_tables.length())
	wrong_tables.append(',');
1636
      wrong_tables.append(String(table->table_name,system_charset_info));
unknown's avatar
unknown committed
1637 1638
    }
  }
unknown's avatar
unknown committed
1639
  thd->tmp_table_used= tmp_table_deleted;
1640 1641 1642 1643
  error= 0;
  if (wrong_tables.length())
  {
    if (!foreign_key_error)
1644
      my_printf_error(ER_BAD_TABLE_ERROR, ER(ER_BAD_TABLE_ERROR), MYF(0),
1645
                      wrong_tables.c_ptr());
1646
    else
unknown's avatar
unknown committed
1647
      my_message(ER_ROW_IS_REFERENCED, ER(ER_ROW_IS_REFERENCED), MYF(0));
1648 1649 1650 1651
    error= 1;
  }

  if (some_tables_deleted || tmp_table_deleted || !error)
unknown's avatar
unknown committed
1652
  {
unknown's avatar
unknown committed
1653
    query_cache_invalidate3(thd, tables, 0);
1654
    if (!dont_log_query)
1655
    {
1656
      if (!thd->current_stmt_binlog_row_based ||
1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667
          non_temp_tables_count > 0 && !tmp_table_deleted)
      {
        /*
          In this case, we are either using statement-based
          replication or using row-based replication but have only
          deleted one or more non-temporary tables (and no temporary
          tables).  In this case, we can write the original query into
          the binary log.
         */
        write_bin_log(thd, !error, thd->query, thd->query_length);
      }
1668
      else if (thd->current_stmt_binlog_row_based &&
1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695
               non_temp_tables_count > 0 &&
               tmp_table_deleted)
      {
        /*
          In this case we have deleted both temporary and
          non-temporary tables, so:
          - since we have deleted a non-temporary table we have to
            binlog the statement, but
          - since we have deleted a temporary table we cannot binlog
            the statement (since the table has not been created on the
            slave, this might cause the slave to stop).

          Instead, we write a built statement, only containing the
          non-temporary tables, to the binary log
        */
        built_query.chop();                  // Chop of the last comma
        built_query.append(" /* generated by server */");
        write_bin_log(thd, !error, built_query.ptr(), built_query.length());
      }
      /*
        The remaining cases are:
        - no tables where deleted and
        - only temporary tables where deleted and row-based
          replication is used.
        In both these cases, nothing should be written to the binary
        log.
      */
1696
    }
unknown's avatar
unknown committed
1697
  }
unknown's avatar
unknown committed
1698

unknown's avatar
unknown committed
1699
  unlock_table_names(thd, tables, (TABLE_LIST*) 0);
1700
  thd->no_warnings_for_error= 0;
1701
  DBUG_RETURN(error);
unknown's avatar
unknown committed
1702 1703 1704
}


unknown's avatar
unknown committed
1705
bool quick_rm_table(handlerton *base,const char *db,
unknown's avatar
unknown committed
1706 1707 1708
		   const char *table_name)
{
  char path[FN_REFLEN];
unknown's avatar
unknown committed
1709 1710 1711
  bool error= 0;
  DBUG_ENTER("quick_rm_table");

1712 1713
  uint path_length= build_table_filename(path, sizeof(path),
                                         db, table_name, reg_ext);
unknown's avatar
unknown committed
1714
  if (my_delete(path,MYF(0)))
unknown's avatar
unknown committed
1715
    error= 1; /* purecov: inspected */
1716
  path[path_length - reg_ext_length]= '\0'; // Remove reg_ext
unknown's avatar
unknown committed
1717 1718
  DBUG_RETURN(ha_delete_table(current_thd, base, path, db, table_name, 0) ||
              error);
unknown's avatar
unknown committed
1719 1720
}

1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738
/*
  Sort keys in the following order:
  - PRIMARY KEY
  - UNIQUE keyws where all column are NOT NULL
  - Other UNIQUE keys
  - Normal keys
  - Fulltext keys

  This will make checking for duplicated keys faster and ensure that
  PRIMARY keys are prioritized.
*/

static int sort_keys(KEY *a, KEY *b)
{
  if (a->flags & HA_NOSAME)
  {
    if (!(b->flags & HA_NOSAME))
      return -1;
1739
    if ((a->flags ^ b->flags) & (HA_NULL_PART_KEY | HA_END_SPACE_KEY))
1740 1741
    {
      /* Sort NOT NULL keys before other keys */
1742
      return (a->flags & (HA_NULL_PART_KEY | HA_END_SPACE_KEY)) ? 1 : -1;
1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755
    }
    if (a->name == primary_key_name)
      return -1;
    if (b->name == primary_key_name)
      return 1;
  }
  else if (b->flags & HA_NOSAME)
    return 1;					// Prefer b

  if ((a->flags ^ b->flags) & HA_FULLTEXT)
  {
    return (a->flags & HA_FULLTEXT) ? 1 : -1;
  }
unknown's avatar
unknown committed
1756
  /*
1757
    Prefer original key order.	usable_key_parts contains here
unknown's avatar
unknown committed
1758 1759 1760 1761 1762
    the original key position.
  */
  return ((a->usable_key_parts < b->usable_key_parts) ? -1 :
	  (a->usable_key_parts > b->usable_key_parts) ? 1 :
	  0);
1763 1764
}

1765 1766
/*
  Check TYPELIB (set or enum) for duplicates
1767

1768 1769 1770
  SYNOPSIS
    check_duplicates_in_interval()
    set_or_name   "SET" or "ENUM" string for warning message
1771 1772
    name	  name of the checked column
    typelib	  list of values for the column
1773 1774

  DESCRIPTION
1775
    This function prints an warning for each value in list
1776 1777 1778 1779 1780 1781 1782
    which has some duplicates on its right

  RETURN VALUES
    void
*/

void check_duplicates_in_interval(const char *set_or_name,
1783 1784
                                  const char *name, TYPELIB *typelib,
                                  CHARSET_INFO *cs)
1785
{
1786
  TYPELIB tmp= *typelib;
1787
  const char **cur_value= typelib->type_names;
1788 1789 1790
  unsigned int *cur_length= typelib->type_lengths;
  
  for ( ; tmp.count > 1; cur_value++, cur_length++)
1791
  {
1792 1793 1794 1795
    tmp.type_names++;
    tmp.type_lengths++;
    tmp.count--;
    if (find_type2(&tmp, (const char*)*cur_value, *cur_length, cs))
1796
    {
unknown's avatar
unknown committed
1797
      push_warning_printf(current_thd,MYSQL_ERROR::WARN_LEVEL_NOTE,
1798 1799 1800 1801 1802 1803
			  ER_DUPLICATED_VALUE_IN_TYPE,
			  ER(ER_DUPLICATED_VALUE_IN_TYPE),
			  name,*cur_value,set_or_name);
    }
  }
}
1804

1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839

/*
  Check TYPELIB (set or enum) max and total lengths

  SYNOPSIS
    calculate_interval_lengths()
    cs            charset+collation pair of the interval
    typelib       list of values for the column
    max_length    length of the longest item
    tot_length    sum of the item lengths

  DESCRIPTION
    After this function call:
    - ENUM uses max_length
    - SET uses tot_length.

  RETURN VALUES
    void
*/
void calculate_interval_lengths(CHARSET_INFO *cs, TYPELIB *interval,
                                uint32 *max_length, uint32 *tot_length)
{
  const char **pos;
  uint *len;
  *max_length= *tot_length= 0;
  for (pos= interval->type_names, len= interval->type_lengths;
       *pos ; pos++, len++)
  {
    uint length= cs->cset->numchars(cs, *pos, *pos + *len);
    *tot_length+= length;
    set_if_bigger(*max_length, (uint32)length);
  }
}


unknown's avatar
unknown committed
1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859
/*
  Prepare a create_table instance for packing

  SYNOPSIS
    prepare_create_field()
    sql_field     field to prepare for packing
    blob_columns  count for BLOBs
    timestamps    count for timestamps
    table_flags   table flags

  DESCRIPTION
    This function prepares a create_field instance.
    Fields such as pack_flag are valid after this call.

  RETURN VALUES
   0	ok
   1	Error
*/

int prepare_create_field(create_field *sql_field, 
unknown's avatar
unknown committed
1860 1861
			 uint *blob_columns, 
			 int *timestamps, int *timestamps_with_niladic,
unknown's avatar
unknown committed
1862 1863 1864
			 uint table_flags)
{
  DBUG_ENTER("prepare_field");
unknown's avatar
unknown committed
1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883

  /*
    This code came from mysql_prepare_table.
    Indent preserved to make patching easier
  */
  DBUG_ASSERT(sql_field->charset);

  switch (sql_field->sql_type) {
  case FIELD_TYPE_BLOB:
  case FIELD_TYPE_MEDIUM_BLOB:
  case FIELD_TYPE_TINY_BLOB:
  case FIELD_TYPE_LONG_BLOB:
    sql_field->pack_flag=FIELDFLAG_BLOB |
      pack_length_to_packflag(sql_field->pack_length -
                              portable_sizeof_char_ptr);
    if (sql_field->charset->state & MY_CS_BINSORT)
      sql_field->pack_flag|=FIELDFLAG_BINARY;
    sql_field->length=8;			// Unireg field length
    sql_field->unireg_check=Field::BLOB_FIELD;
1884
    (*blob_columns)++;
unknown's avatar
unknown committed
1885 1886
    break;
  case FIELD_TYPE_GEOMETRY:
unknown's avatar
unknown committed
1887
#ifdef HAVE_SPATIAL
unknown's avatar
unknown committed
1888 1889 1890 1891
    if (!(table_flags & HA_CAN_GEOMETRY))
    {
      my_printf_error(ER_CHECK_NOT_IMPLEMENTED, ER(ER_CHECK_NOT_IMPLEMENTED),
                      MYF(0), "GEOMETRY");
unknown's avatar
unknown committed
1892
      DBUG_RETURN(1);
unknown's avatar
unknown committed
1893 1894 1895 1896 1897 1898 1899 1900
    }
    sql_field->pack_flag=FIELDFLAG_GEOM |
      pack_length_to_packflag(sql_field->pack_length -
                              portable_sizeof_char_ptr);
    if (sql_field->charset->state & MY_CS_BINSORT)
      sql_field->pack_flag|=FIELDFLAG_BINARY;
    sql_field->length=8;			// Unireg field length
    sql_field->unireg_check=Field::BLOB_FIELD;
1901
    (*blob_columns)++;
unknown's avatar
unknown committed
1902 1903 1904 1905 1906
    break;
#else
    my_printf_error(ER_FEATURE_DISABLED,ER(ER_FEATURE_DISABLED), MYF(0),
                    sym_group_geom.name, sym_group_geom.needed_define);
    DBUG_RETURN(1);
unknown's avatar
unknown committed
1907
#endif /*HAVE_SPATIAL*/
unknown's avatar
unknown committed
1908
  case MYSQL_TYPE_VARCHAR:
unknown's avatar
unknown committed
1909
#ifndef QQ_ALL_HANDLERS_SUPPORT_VARCHAR
unknown's avatar
unknown committed
1910 1911 1912 1913 1914 1915 1916 1917
    if (table_flags & HA_NO_VARCHAR)
    {
      /* convert VARCHAR to CHAR because handler is not yet up to date */
      sql_field->sql_type=    MYSQL_TYPE_VAR_STRING;
      sql_field->pack_length= calc_pack_length(sql_field->sql_type,
                                               (uint) sql_field->length);
      if ((sql_field->length / sql_field->charset->mbmaxlen) >
          MAX_FIELD_CHARLENGTH)
unknown's avatar
unknown committed
1918
      {
unknown's avatar
unknown committed
1919 1920
        my_printf_error(ER_TOO_BIG_FIELDLENGTH, ER(ER_TOO_BIG_FIELDLENGTH),
                        MYF(0), sql_field->field_name, MAX_FIELD_CHARLENGTH);
unknown's avatar
unknown committed
1921 1922
        DBUG_RETURN(1);
      }
unknown's avatar
unknown committed
1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958
    }
#endif
    /* fall through */
  case FIELD_TYPE_STRING:
    sql_field->pack_flag=0;
    if (sql_field->charset->state & MY_CS_BINSORT)
      sql_field->pack_flag|=FIELDFLAG_BINARY;
    break;
  case FIELD_TYPE_ENUM:
    sql_field->pack_flag=pack_length_to_packflag(sql_field->pack_length) |
      FIELDFLAG_INTERVAL;
    if (sql_field->charset->state & MY_CS_BINSORT)
      sql_field->pack_flag|=FIELDFLAG_BINARY;
    sql_field->unireg_check=Field::INTERVAL_FIELD;
    check_duplicates_in_interval("ENUM",sql_field->field_name,
                                 sql_field->interval,
                                 sql_field->charset);
    break;
  case FIELD_TYPE_SET:
    sql_field->pack_flag=pack_length_to_packflag(sql_field->pack_length) |
      FIELDFLAG_BITFIELD;
    if (sql_field->charset->state & MY_CS_BINSORT)
      sql_field->pack_flag|=FIELDFLAG_BINARY;
    sql_field->unireg_check=Field::BIT_FIELD;
    check_duplicates_in_interval("SET",sql_field->field_name,
                                 sql_field->interval,
                                 sql_field->charset);
    break;
  case FIELD_TYPE_DATE:			// Rest of string types
  case FIELD_TYPE_NEWDATE:
  case FIELD_TYPE_TIME:
  case FIELD_TYPE_DATETIME:
  case FIELD_TYPE_NULL:
    sql_field->pack_flag=f_settype((uint) sql_field->sql_type);
    break;
  case FIELD_TYPE_BIT:
unknown's avatar
unknown committed
1959 1960 1961
    /* 
      We have sql_field->pack_flag already set here, see mysql_prepare_table().
    */
unknown's avatar
unknown committed
1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974
    break;
  case FIELD_TYPE_NEWDECIMAL:
    sql_field->pack_flag=(FIELDFLAG_NUMBER |
                          (sql_field->flags & UNSIGNED_FLAG ? 0 :
                           FIELDFLAG_DECIMAL) |
                          (sql_field->flags & ZEROFILL_FLAG ?
                           FIELDFLAG_ZEROFILL : 0) |
                          (sql_field->decimals << FIELDFLAG_DEC_SHIFT));
    break;
  case FIELD_TYPE_TIMESTAMP:
    /* We should replace old TIMESTAMP fields with their newer analogs */
    if (sql_field->unireg_check == Field::TIMESTAMP_OLD_FIELD)
    {
1975
      if (!*timestamps)
unknown's avatar
unknown committed
1976
      {
unknown's avatar
unknown committed
1977
        sql_field->unireg_check= Field::TIMESTAMP_DNUN_FIELD;
1978
        (*timestamps_with_niladic)++;
unknown's avatar
unknown committed
1979
      }
unknown's avatar
unknown committed
1980 1981 1982 1983
      else
        sql_field->unireg_check= Field::NONE;
    }
    else if (sql_field->unireg_check != Field::NONE)
1984
      (*timestamps_with_niladic)++;
unknown's avatar
unknown committed
1985

1986
    (*timestamps)++;
unknown's avatar
unknown committed
1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001
    /* fall-through */
  default:
    sql_field->pack_flag=(FIELDFLAG_NUMBER |
                          (sql_field->flags & UNSIGNED_FLAG ? 0 :
                           FIELDFLAG_DECIMAL) |
                          (sql_field->flags & ZEROFILL_FLAG ?
                           FIELDFLAG_ZEROFILL : 0) |
                          f_settype((uint) sql_field->sql_type) |
                          (sql_field->decimals << FIELDFLAG_DEC_SHIFT));
    break;
  }
  if (!(sql_field->flags & NOT_NULL_FLAG))
    sql_field->pack_flag|= FIELDFLAG_MAYBE_NULL;
  if (sql_field->flags & NO_DEFAULT_VALUE_FLAG)
    sql_field->pack_flag|= FIELDFLAG_NO_DEFAULT;
unknown's avatar
unknown committed
2002 2003 2004
  DBUG_RETURN(0);
}

unknown's avatar
unknown committed
2005
/*
2006
  Preparation for table creation
unknown's avatar
unknown committed
2007 2008

  SYNOPSIS
2009
    mysql_prepare_table()
2010 2011 2012 2013 2014 2015 2016 2017 2018 2019
      thd                       Thread object.
      create_info               Create information (like MAX_ROWS).
      fields                    List of fields to create.
      keys                      List of keys to create.
      tmp_table                 If a temporary table is to be created.
      db_options          INOUT Table options (like HA_OPTION_PACK_RECORD).
      file                      The handler for the new table.
      key_info_buffer     OUT   An array of KEY structs for the indexes.
      key_count           OUT   The number of elements in the array.
      select_field_count        The number of fields coming from a select table.
unknown's avatar
unknown committed
2020

2021
  DESCRIPTION
2022
    Prepares the table and key structures for table creation.
unknown's avatar
unknown committed
2023

2024
  NOTES
2025
    sets create_info->varchar if the table has a varchar
2026

unknown's avatar
unknown committed
2027 2028 2029 2030
  RETURN VALUES
    0	ok
    -1	error
*/
unknown's avatar
unknown committed
2031

2032 2033 2034 2035 2036 2037
static int mysql_prepare_table(THD *thd, HA_CREATE_INFO *create_info,
                               List<create_field> *fields,
                               List<Key> *keys, bool tmp_table,
                               uint *db_options,
                               handler *file, KEY **key_info_buffer,
                               uint *key_count, int select_field_count)
unknown's avatar
unknown committed
2038
{
2039
  const char	*key_name;
unknown's avatar
unknown committed
2040
  create_field	*sql_field,*dup_field;
unknown's avatar
unknown committed
2041
  uint		field,null_fields,blob_columns,max_key_length;
unknown's avatar
unknown committed
2042
  ulong		record_offset= 0;
2043
  KEY		*key_info;
unknown's avatar
unknown committed
2044
  KEY_PART_INFO *key_part_info;
2045 2046 2047
  int		timestamps= 0, timestamps_with_niladic= 0;
  int		field_no,dup_no;
  int		select_field_pos,auto_increment=0;
2048
  List_iterator<create_field> it(*fields),it2(*fields);
unknown's avatar
unknown committed
2049
  uint total_uneven_bit_length= 0;
2050
  DBUG_ENTER("mysql_prepare_table");
unknown's avatar
unknown committed
2051

2052
  select_field_pos= fields->elements - select_field_count;
unknown's avatar
unknown committed
2053
  null_fields=blob_columns=0;
2054
  create_info->varchar= 0;
unknown's avatar
unknown committed
2055
  max_key_length= file->max_key_length();
2056

2057
  for (field_no=0; (sql_field=it++) ; field_no++)
unknown's avatar
unknown committed
2058
  {
2059 2060
    CHARSET_INFO *save_cs;

2061 2062 2063 2064 2065 2066
    /*
      Initialize length from its original value (number of characters),
      which was set in the parser. This is necessary if we're
      executing a prepared statement for the second time.
    */
    sql_field->length= sql_field->char_length;
2067
    if (!sql_field->charset)
2068 2069 2070
      sql_field->charset= create_info->default_table_charset;
    /*
      table_charset is set in ALTER TABLE if we want change character set
2071 2072 2073
      for all varchar/char columns.
      But the table charset must not affect the BLOB fields, so don't
      allow to change my_charset_bin to somethig else.
2074
    */
2075
    if (create_info->table_charset && sql_field->charset != &my_charset_bin)
2076
      sql_field->charset= create_info->table_charset;
2077

2078
    save_cs= sql_field->charset;
2079 2080 2081 2082 2083
    if ((sql_field->flags & BINCMP_FLAG) &&
	!(sql_field->charset= get_charset_by_csname(sql_field->charset->csname,
						    MY_CS_BINSORT,MYF(0))))
    {
      char tmp[64];
2084 2085
      strmake(strmake(tmp, save_cs->csname, sizeof(tmp)-4),
              STRING_WITH_LEN("_bin"));
2086 2087 2088
      my_error(ER_UNKNOWN_COLLATION, MYF(0), tmp);
      DBUG_RETURN(-1);
    }
2089

2090 2091
    if (sql_field->sql_type == FIELD_TYPE_SET ||
        sql_field->sql_type == FIELD_TYPE_ENUM)
2092 2093 2094
    {
      uint32 dummy;
      CHARSET_INFO *cs= sql_field->charset;
2095
      TYPELIB *interval= sql_field->interval;
2096 2097 2098 2099 2100 2101

      /*
        Create typelib from interval_list, and if necessary
        convert strings from client character set to the
        column character set.
      */
2102
      if (!interval)
2103
      {
2104 2105 2106 2107
        /*
          Create the typelib in prepared statement memory if we're
          executing one.
        */
unknown's avatar
unknown committed
2108
        MEM_ROOT *stmt_root= thd->stmt_arena->mem_root;
2109 2110 2111

        interval= sql_field->interval= typelib(stmt_root,
                                               sql_field->interval_list);
2112 2113
        List_iterator<String> it(sql_field->interval_list);
        String conv, *tmp;
2114 2115 2116 2117 2118
        char comma_buf[2];
        int comma_length= cs->cset->wc_mb(cs, ',', (uchar*) comma_buf,
                                          (uchar*) comma_buf + 
                                          sizeof(comma_buf));
        DBUG_ASSERT(comma_length > 0);
2119
        for (uint i= 0; (tmp= it++); i++)
2120
        {
unknown's avatar
unknown committed
2121
          uint lengthsp;
2122 2123 2124 2125 2126
          if (String::needs_conversion(tmp->length(), tmp->charset(),
                                       cs, &dummy))
          {
            uint cnv_errs;
            conv.copy(tmp->ptr(), tmp->length(), tmp->charset(), cs, &cnv_errs);
2127
            interval->type_names[i]= strmake_root(stmt_root, conv.ptr(),
unknown's avatar
unknown committed
2128
                                                  conv.length());
2129 2130
            interval->type_lengths[i]= conv.length();
          }
2131

2132
          // Strip trailing spaces.
unknown's avatar
unknown committed
2133 2134
          lengthsp= cs->cset->lengthsp(cs, interval->type_names[i],
                                       interval->type_lengths[i]);
2135 2136
          interval->type_lengths[i]= lengthsp;
          ((uchar *)interval->type_names[i])[lengthsp]= '\0';
2137 2138 2139 2140 2141 2142
          if (sql_field->sql_type == FIELD_TYPE_SET)
          {
            if (cs->coll->instr(cs, interval->type_names[i], 
                                interval->type_lengths[i], 
                                comma_buf, comma_length, NULL, 0))
            {
unknown's avatar
unknown committed
2143
              my_error(ER_ILLEGAL_VALUE_FOR_TYPE, MYF(0), "set", tmp->ptr());
2144 2145 2146
              DBUG_RETURN(-1);
            }
          }
2147
        }
2148
        sql_field->interval_list.empty(); // Don't need interval_list anymore
2149 2150 2151 2152 2153 2154
      }

      /*
        Convert the default value from client character
        set into the column character set if necessary.
      */
unknown's avatar
unknown committed
2155
      if (sql_field->def && cs != sql_field->def->collation.collation)
2156
      {
unknown's avatar
unknown committed
2157 2158
        Query_arena backup_arena;
        bool need_to_change_arena= !thd->stmt_arena->is_conventional();
2159 2160 2161
        if (need_to_change_arena)
        {
          /* Asser that we don't do that at every PS execute */
unknown's avatar
unknown committed
2162 2163 2164
          DBUG_ASSERT(thd->stmt_arena->is_first_stmt_execute() ||
                      thd->stmt_arena->is_first_sp_execute());
          thd->set_n_backup_active_arena(thd->stmt_arena, &backup_arena);
2165 2166 2167 2168 2169
        }

        sql_field->def= sql_field->def->safe_charset_converter(cs);

        if (need_to_change_arena)
unknown's avatar
unknown committed
2170
          thd->restore_active_arena(thd->stmt_arena, &backup_arena);
2171 2172

        if (! sql_field->def)
unknown's avatar
unknown committed
2173 2174 2175 2176 2177
        {
          /* Could not convert */
          my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
          DBUG_RETURN(-1);
        }
2178 2179 2180 2181
      }

      if (sql_field->sql_type == FIELD_TYPE_SET)
      {
2182
        uint32 field_length;
2183 2184 2185 2186 2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197
        if (sql_field->def)
        {
          char *not_used;
          uint not_used2;
          bool not_found= 0;
          String str, *def= sql_field->def->val_str(&str);
          def->length(cs->cset->lengthsp(cs, def->ptr(), def->length()));
          (void) find_set(interval, def->ptr(), def->length(),
                          cs, &not_used, &not_used2, &not_found);
          if (not_found)
          {
            my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
            DBUG_RETURN(-1);
          }
        }
2198 2199
        calculate_interval_lengths(cs, interval, &dummy, &field_length);
        sql_field->length= field_length + (interval->count - 1);
2200 2201 2202
      }
      else  /* FIELD_TYPE_ENUM */
      {
2203
        uint32 field_length;
2204 2205 2206 2207 2208 2209 2210 2211 2212 2213
        if (sql_field->def)
        {
          String str, *def= sql_field->def->val_str(&str);
          def->length(cs->cset->lengthsp(cs, def->ptr(), def->length()));
          if (!find_type2(interval, def->ptr(), def->length(), cs))
          {
            my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
            DBUG_RETURN(-1);
          }
        }
2214 2215
        calculate_interval_lengths(cs, interval, &field_length, &dummy);
        sql_field->length= field_length;
2216 2217 2218 2219
      }
      set_if_smaller(sql_field->length, MAX_FIELD_WIDTH-1);
    }

2220 2221
    if (sql_field->sql_type == FIELD_TYPE_BIT)
    { 
unknown's avatar
unknown committed
2222
      sql_field->pack_flag= FIELDFLAG_NUMBER;
2223 2224 2225 2226 2227 2228
      if (file->table_flags() & HA_CAN_BIT_FIELD)
        total_uneven_bit_length+= sql_field->length & 7;
      else
        sql_field->pack_flag|= FIELDFLAG_TREAT_BIT_AS_CHAR;
    }

2229
    sql_field->create_length_to_internal_length();
unknown's avatar
unknown committed
2230 2231
    if (prepare_blob_field(thd, sql_field))
      DBUG_RETURN(-1);
2232

unknown's avatar
unknown committed
2233 2234
    if (!(sql_field->flags & NOT_NULL_FLAG))
      null_fields++;
unknown's avatar
unknown committed
2235

unknown's avatar
unknown committed
2236 2237
    if (check_column_name(sql_field->field_name))
    {
2238
      my_error(ER_WRONG_COLUMN_NAME, MYF(0), sql_field->field_name);
unknown's avatar
unknown committed
2239 2240
      DBUG_RETURN(-1);
    }
unknown's avatar
unknown committed
2241

2242 2243
    /* Check if we have used the same field name before */
    for (dup_no=0; (dup_field=it2++) != sql_field; dup_no++)
unknown's avatar
unknown committed
2244
    {
2245
      if (my_strcasecmp(system_charset_info,
2246 2247
			sql_field->field_name,
			dup_field->field_name) == 0)
unknown's avatar
unknown committed
2248
      {
2249 2250 2251 2252
	/*
	  If this was a CREATE ... SELECT statement, accept a field
	  redefinition if we are changing a field in the SELECT part
	*/
2253 2254
	if (field_no < select_field_pos || dup_no >= select_field_pos)
	{
2255
	  my_error(ER_DUP_FIELDNAME, MYF(0), sql_field->field_name);
2256 2257 2258 2259
	  DBUG_RETURN(-1);
	}
	else
	{
2260
	  /* Field redefined */
2261
	  sql_field->def=		dup_field->def;
2262
	  sql_field->sql_type=		dup_field->sql_type;
2263 2264 2265
	  sql_field->charset=		(dup_field->charset ?
					 dup_field->charset :
					 create_info->default_table_charset);
2266
	  sql_field->length=		dup_field->char_length;
2267
          sql_field->pack_length=	dup_field->pack_length;
2268
          sql_field->key_length=	dup_field->key_length;
2269
	  sql_field->create_length_to_internal_length();
2270 2271
	  sql_field->decimals=		dup_field->decimals;
	  sql_field->unireg_check=	dup_field->unireg_check;
2272 2273 2274 2275 2276 2277 2278 2279
          /* 
            We're making one field from two, the result field will have
            dup_field->flags as flags. If we've incremented null_fields
            because of sql_field->flags, decrement it back.
          */
          if (!(sql_field->flags & NOT_NULL_FLAG))
            null_fields--;
	  sql_field->flags=		dup_field->flags;
unknown's avatar
unknown committed
2280
          sql_field->interval=          dup_field->interval;
2281 2282 2283
	  it2.remove();			// Remove first (create) definition
	  select_field_pos--;
	  break;
2284
	}
unknown's avatar
unknown committed
2285 2286
      }
    }
2287 2288 2289 2290
    /* Don't pack rows in old tables if the user has requested this */
    if ((sql_field->flags & BLOB_FLAG) ||
	sql_field->sql_type == MYSQL_TYPE_VARCHAR &&
	create_info->row_type != ROW_TYPE_FIXED)
2291
      (*db_options)|= HA_OPTION_PACK_RECORD;
unknown's avatar
unknown committed
2292 2293
    it2.rewind();
  }
2294 2295 2296

  /* record_offset will be increased with 'length-of-null-bits' later */
  record_offset= 0;
unknown's avatar
unknown committed
2297
  null_fields+= total_uneven_bit_length;
unknown's avatar
unknown committed
2298 2299 2300 2301

  it.rewind();
  while ((sql_field=it++))
  {
2302
    DBUG_ASSERT(sql_field->charset != 0);
2303

unknown's avatar
unknown committed
2304 2305
    if (prepare_create_field(sql_field, &blob_columns, 
			     &timestamps, &timestamps_with_niladic,
unknown's avatar
unknown committed
2306
			     file->table_flags()))
unknown's avatar
unknown committed
2307
      DBUG_RETURN(-1);
2308
    if (sql_field->sql_type == MYSQL_TYPE_VARCHAR)
unknown's avatar
unknown committed
2309
      create_info->varchar= 1;
2310
    sql_field->offset= record_offset;
unknown's avatar
unknown committed
2311 2312
    if (MTYP_TYPENR(sql_field->unireg_check) == Field::NEXT_NUMBER)
      auto_increment++;
2313
    record_offset+= sql_field->pack_length;
unknown's avatar
unknown committed
2314
  }
2315 2316
  if (timestamps_with_niladic > 1)
  {
unknown's avatar
unknown committed
2317 2318
    my_message(ER_TOO_MUCH_AUTO_TIMESTAMP_COLS,
               ER(ER_TOO_MUCH_AUTO_TIMESTAMP_COLS), MYF(0));
2319 2320
    DBUG_RETURN(-1);
  }
unknown's avatar
unknown committed
2321 2322
  if (auto_increment > 1)
  {
unknown's avatar
unknown committed
2323
    my_message(ER_WRONG_AUTO_KEY, ER(ER_WRONG_AUTO_KEY), MYF(0));
unknown's avatar
unknown committed
2324 2325 2326
    DBUG_RETURN(-1);
  }
  if (auto_increment &&
2327
      (file->table_flags() & HA_NO_AUTO_INCREMENT))
unknown's avatar
unknown committed
2328
  {
unknown's avatar
unknown committed
2329 2330
    my_message(ER_TABLE_CANT_HANDLE_AUTO_INCREMENT,
               ER(ER_TABLE_CANT_HANDLE_AUTO_INCREMENT), MYF(0));
unknown's avatar
unknown committed
2331 2332 2333
    DBUG_RETURN(-1);
  }

2334
  if (blob_columns && (file->table_flags() & HA_NO_BLOBS))
unknown's avatar
unknown committed
2335
  {
unknown's avatar
unknown committed
2336 2337
    my_message(ER_TABLE_CANT_HANDLE_BLOB, ER(ER_TABLE_CANT_HANDLE_BLOB),
               MYF(0));
unknown's avatar
unknown committed
2338 2339 2340 2341
    DBUG_RETURN(-1);
  }

  /* Create keys */
2342

2343
  List_iterator<Key> key_iterator(*keys), key_iterator2(*keys);
2344
  uint key_parts=0, fk_key_count=0;
2345
  bool primary_key=0,unique_key=0;
2346
  Key *key, *key2;
unknown's avatar
unknown committed
2347
  uint tmp, key_number;
2348 2349
  /* special marker for keys to be ignored */
  static char ignore_key[1];
2350

2351
  /* Calculate number of key segements */
2352
  *key_count= 0;
2353

unknown's avatar
unknown committed
2354 2355
  while ((key=key_iterator++))
  {
2356 2357
    DBUG_PRINT("info", ("key name: '%s'  type: %d", key->name ? key->name :
                        "(none)" , key->type));
2358 2359 2360 2361 2362 2363 2364
    if (key->type == Key::FOREIGN_KEY)
    {
      fk_key_count++;
      foreign_key *fk_key= (foreign_key*) key;
      if (fk_key->ref_columns.elements &&
	  fk_key->ref_columns.elements != fk_key->columns.elements)
      {
2365 2366 2367
        my_error(ER_WRONG_FK_DEF, MYF(0),
                 (fk_key->name ?  fk_key->name : "foreign key without name"),
                 ER(ER_KEY_REF_DO_NOT_MATCH_TABLE_REF));
2368 2369 2370 2371
	DBUG_RETURN(-1);
      }
      continue;
    }
2372
    (*key_count)++;
unknown's avatar
unknown committed
2373
    tmp=file->max_key_parts();
unknown's avatar
unknown committed
2374 2375 2376 2377 2378
    if (key->columns.elements > tmp)
    {
      my_error(ER_TOO_MANY_KEY_PARTS,MYF(0),tmp);
      DBUG_RETURN(-1);
    }
2379
    if (key->name && strlen(key->name) > NAME_LEN)
unknown's avatar
unknown committed
2380
    {
2381
      my_error(ER_TOO_LONG_IDENT, MYF(0), key->name);
unknown's avatar
unknown committed
2382 2383
      DBUG_RETURN(-1);
    }
2384
    key_iterator2.rewind ();
2385
    if (key->type != Key::FOREIGN_KEY)
2386
    {
2387
      while ((key2 = key_iterator2++) != key)
2388
      {
unknown's avatar
unknown committed
2389
	/*
2390 2391 2392
          foreign_key_prefix(key, key2) returns 0 if key or key2, or both, is
          'generated', and a generated key is a prefix of the other key.
          Then we do not need the generated shorter key.
unknown's avatar
unknown committed
2393
        */
2394 2395 2396
        if ((key2->type != Key::FOREIGN_KEY &&
             key2->name != ignore_key &&
             !foreign_key_prefix(key, key2)))
2397
        {
2398
          /* TODO: issue warning message */
2399 2400 2401 2402 2403 2404 2405
          /* mark that the generated key should be ignored */
          if (!key2->generated ||
              (key->generated && key->columns.elements <
               key2->columns.elements))
            key->name= ignore_key;
          else
          {
2406 2407 2408
            key2->name= ignore_key;
            key_parts-= key2->columns.elements;
            (*key_count)--;
2409 2410 2411
          }
          break;
        }
2412 2413 2414 2415 2416 2417
      }
    }
    if (key->name != ignore_key)
      key_parts+=key->columns.elements;
    else
      (*key_count)--;
2418
    if (key->name && !tmp_table && (key->type != Key::PRIMARY) &&
2419 2420 2421 2422 2423
	!my_strcasecmp(system_charset_info,key->name,primary_key_name))
    {
      my_error(ER_WRONG_NAME_FOR_INDEX, MYF(0), key->name);
      DBUG_RETURN(-1);
    }
2424
  }
unknown's avatar
unknown committed
2425
  tmp=file->max_keys();
2426
  if (*key_count > tmp)
2427 2428 2429 2430
  {
    my_error(ER_TOO_MANY_KEYS,MYF(0),tmp);
    DBUG_RETURN(-1);
  }
2431

2432
  (*key_info_buffer)= key_info= (KEY*) sql_calloc(sizeof(KEY) * (*key_count));
2433
  key_part_info=(KEY_PART_INFO*) sql_calloc(sizeof(KEY_PART_INFO)*key_parts);
2434
  if (!*key_info_buffer || ! key_part_info)
2435 2436
    DBUG_RETURN(-1);				// Out of memory

2437
  key_iterator.rewind();
unknown's avatar
unknown committed
2438
  key_number=0;
unknown's avatar
unknown committed
2439
  for (; (key=key_iterator++) ; key_number++)
2440 2441 2442 2443
  {
    uint key_length=0;
    key_part_spec *column;

2444 2445 2446 2447 2448 2449 2450 2451 2452 2453
    if (key->name == ignore_key)
    {
      /* ignore redundant keys */
      do
	key=key_iterator++;
      while (key && key->name == ignore_key);
      if (!key)
	break;
    }

unknown's avatar
unknown committed
2454
    switch(key->type){
unknown's avatar
unknown committed
2455
    case Key::MULTIPLE:
2456
	key_info->flags= 0;
2457
	break;
unknown's avatar
unknown committed
2458
    case Key::FULLTEXT:
2459
	key_info->flags= HA_FULLTEXT;
2460 2461
	if ((key_info->parser_name= key->parser_name))
          key_info->flags|= HA_USES_PARSER;
2462
	break;
unknown's avatar
unknown committed
2463
    case Key::SPATIAL:
unknown's avatar
unknown committed
2464
#ifdef HAVE_SPATIAL
2465
	key_info->flags= HA_SPATIAL;
2466
	break;
unknown's avatar
unknown committed
2467
#else
2468 2469
	my_error(ER_FEATURE_DISABLED, MYF(0),
                 sym_group_geom.name, sym_group_geom.needed_define);
unknown's avatar
unknown committed
2470 2471
	DBUG_RETURN(-1);
#endif
2472 2473 2474 2475
    case Key::FOREIGN_KEY:
      key_number--;				// Skip this key
      continue;
    default:
2476 2477
      key_info->flags = HA_NOSAME;
      break;
unknown's avatar
unknown committed
2478
    }
2479 2480
    if (key->generated)
      key_info->flags|= HA_GENERATED_KEY;
unknown's avatar
unknown committed
2481

unknown's avatar
unknown committed
2482 2483
    key_info->key_parts=(uint8) key->columns.elements;
    key_info->key_part=key_part_info;
unknown's avatar
unknown committed
2484
    key_info->usable_key_parts= key_number;
2485
    key_info->algorithm=key->algorithm;
unknown's avatar
unknown committed
2486

2487 2488
    if (key->type == Key::FULLTEXT)
    {
2489
      if (!(file->table_flags() & HA_CAN_FULLTEXT))
2490
      {
unknown's avatar
unknown committed
2491 2492
	my_message(ER_TABLE_CANT_HANDLE_FT, ER(ER_TABLE_CANT_HANDLE_FT),
                   MYF(0));
2493
	DBUG_RETURN(-1);
2494 2495
      }
    }
unknown's avatar
unknown committed
2496 2497 2498
    /*
       Make SPATIAL to be RTREE by default
       SPATIAL only on BLOB or at least BINARY, this
2499
       actually should be replaced by special GEOM type
unknown's avatar
unknown committed
2500 2501 2502
       in near future when new frm file is ready
       checking for proper key parts number:
    */
2503

2504
    /* TODO: Add proper checks if handler supports key_type and algorithm */
2505
    if (key_info->flags & HA_SPATIAL)
2506 2507 2508
    {
      if (key_info->key_parts != 1)
      {
2509
	my_error(ER_WRONG_ARGUMENTS, MYF(0), "SPATIAL INDEX");
2510
	DBUG_RETURN(-1);
unknown's avatar
unknown committed
2511
      }
2512
    }
unknown's avatar
unknown committed
2513
    else if (key_info->algorithm == HA_KEY_ALG_RTREE)
unknown's avatar
unknown committed
2514
    {
unknown's avatar
unknown committed
2515
#ifdef HAVE_RTREE_KEYS
2516 2517
      if ((key_info->key_parts & 1) == 1)
      {
2518
	my_error(ER_WRONG_ARGUMENTS, MYF(0), "RTREE INDEX");
2519
	DBUG_RETURN(-1);
unknown's avatar
unknown committed
2520
      }
2521
      /* TODO: To be deleted */
2522
      my_error(ER_NOT_SUPPORTED_YET, MYF(0), "RTREE INDEX");
2523
      DBUG_RETURN(-1);
unknown's avatar
unknown committed
2524
#else
2525 2526
      my_error(ER_FEATURE_DISABLED, MYF(0),
               sym_group_rtree.name, sym_group_rtree.needed_define);
unknown's avatar
unknown committed
2527 2528
      DBUG_RETURN(-1);
#endif
unknown's avatar
unknown committed
2529
    }
2530

2531
    List_iterator<key_part_spec> cols(key->columns), cols2(key->columns);
2532
    CHARSET_INFO *ft_key_charset=0;  // for FULLTEXT
unknown's avatar
unknown committed
2533 2534
    for (uint column_nr=0 ; (column=cols++) ; column_nr++)
    {
2535
      uint length;
unknown's avatar
unknown committed
2536 2537
      key_part_spec *dup_column;

unknown's avatar
unknown committed
2538 2539 2540
      it.rewind();
      field=0;
      while ((sql_field=it++) &&
2541
	     my_strcasecmp(system_charset_info,
2542 2543
			   column->field_name,
			   sql_field->field_name))
unknown's avatar
unknown committed
2544 2545 2546
	field++;
      if (!sql_field)
      {
2547
	my_error(ER_KEY_COLUMN_DOES_NOT_EXITS, MYF(0), column->field_name);
unknown's avatar
unknown committed
2548 2549
	DBUG_RETURN(-1);
      }
unknown's avatar
unknown committed
2550
      while ((dup_column= cols2++) != column)
2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 2561
      {
        if (!my_strcasecmp(system_charset_info,
	     	           column->field_name, dup_column->field_name))
	{
	  my_printf_error(ER_DUP_FIELDNAME,
			  ER(ER_DUP_FIELDNAME),MYF(0),
			  column->field_name);
	  DBUG_RETURN(-1);
	}
      }
      cols2.rewind();
2562
      if (key->type == Key::FULLTEXT)
2563
      {
2564 2565
	if ((sql_field->sql_type != MYSQL_TYPE_STRING &&
	     sql_field->sql_type != MYSQL_TYPE_VARCHAR &&
2566 2567
	     !f_is_blob(sql_field->pack_flag)) ||
	    sql_field->charset == &my_charset_bin ||
2568
	    sql_field->charset->mbminlen > 1 || // ucs2 doesn't work yet
2569 2570
	    (ft_key_charset && sql_field->charset != ft_key_charset))
	{
2571
	    my_error(ER_BAD_FT_COLUMN, MYF(0), column->field_name);
2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582
	    DBUG_RETURN(-1);
	}
	ft_key_charset=sql_field->charset;
	/*
	  for fulltext keys keyseg length is 1 for blobs (it's ignored in ft
	  code anyway, and 0 (set to column width later) for char's. it has
	  to be correct col width for char's, as char data are not prefixed
	  with length (unlike blobs, where ft code takes data length from a
	  data prefix, ignoring column->length).
	*/
	column->length=test(f_is_blob(sql_field->pack_flag));
2583
      }
2584
      else
2585
      {
2586 2587
	column->length*= sql_field->charset->mbmaxlen;

2588 2589
	if (f_is_blob(sql_field->pack_flag) ||
            (f_is_geom(sql_field->pack_flag) && key->type != Key::SPATIAL))
2590
	{
unknown's avatar
unknown committed
2591
	  if (!(file->table_flags() & HA_CAN_INDEX_BLOBS))
2592
	  {
2593
	    my_error(ER_BLOB_USED_AS_KEY, MYF(0), column->field_name);
2594 2595
	    DBUG_RETURN(-1);
	  }
2596 2597 2598
          if (f_is_geom(sql_field->pack_flag) && sql_field->geom_type ==
              Field::GEOM_POINT)
            column->length= 21;
2599 2600
	  if (!column->length)
	  {
2601
	    my_error(ER_BLOB_KEY_WITHOUT_LENGTH, MYF(0), column->field_name);
2602 2603 2604
	    DBUG_RETURN(-1);
	  }
	}
unknown's avatar
unknown committed
2605
#ifdef HAVE_SPATIAL
2606
	if (key->type == Key::SPATIAL)
2607
	{
2608
	  if (!column->length)
2609 2610
	  {
	    /*
2611 2612
              4 is: (Xmin,Xmax,Ymin,Ymax), this is for 2D case
              Lately we'll extend this code to support more dimensions
2613
	    */
2614
	    column->length= 4*sizeof(double);
2615 2616
	  }
	}
unknown's avatar
unknown committed
2617
#endif
2618 2619 2620 2621 2622 2623 2624
	if (!(sql_field->flags & NOT_NULL_FLAG))
	{
	  if (key->type == Key::PRIMARY)
	  {
	    /* Implicitly set primary key fields to NOT NULL for ISO conf. */
	    sql_field->flags|= NOT_NULL_FLAG;
	    sql_field->pack_flag&= ~FIELDFLAG_MAYBE_NULL;
unknown's avatar
unknown committed
2625
            null_fields--;
2626 2627 2628
	  }
	  else
	     key_info->flags|= HA_NULL_PART_KEY;
unknown's avatar
unknown committed
2629
	  if (!(file->table_flags() & HA_NULL_IN_KEY))
2630
	  {
2631
	    my_error(ER_NULL_COLUMN_IN_INDEX, MYF(0), column->field_name);
2632 2633 2634 2635
	    DBUG_RETURN(-1);
	  }
	  if (key->type == Key::SPATIAL)
	  {
unknown's avatar
unknown committed
2636 2637
	    my_message(ER_SPATIAL_CANT_HAVE_NULL,
                       ER(ER_SPATIAL_CANT_HAVE_NULL), MYF(0));
2638 2639 2640 2641 2642 2643 2644 2645
	    DBUG_RETURN(-1);
	  }
	}
	if (MTYP_TYPENR(sql_field->unireg_check) == Field::NEXT_NUMBER)
	{
	  if (column_nr == 0 || (file->table_flags() & HA_AUTO_PART_KEY))
	    auto_increment--;			// Field is used
	}
unknown's avatar
unknown committed
2646
      }
2647

unknown's avatar
unknown committed
2648 2649 2650
      key_part_info->fieldnr= field;
      key_part_info->offset=  (uint16) sql_field->offset;
      key_part_info->key_type=sql_field->pack_flag;
2651 2652
      length= sql_field->key_length;

unknown's avatar
unknown committed
2653 2654 2655 2656
      if (column->length)
      {
	if (f_is_blob(sql_field->pack_flag))
	{
unknown's avatar
unknown committed
2657
	  if ((length=column->length) > max_key_length ||
2658
	      length > file->max_key_part_length())
2659
	  {
unknown's avatar
unknown committed
2660
	    length=min(max_key_length, file->max_key_part_length());
2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675
	    if (key->type == Key::MULTIPLE)
	    {
	      /* not a critical problem */
	      char warn_buff[MYSQL_ERRMSG_SIZE];
	      my_snprintf(warn_buff, sizeof(warn_buff), ER(ER_TOO_LONG_KEY),
			  length);
	      push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
			   ER_TOO_LONG_KEY, warn_buff);
	    }
	    else
	    {
	      my_error(ER_TOO_LONG_KEY,MYF(0),length);
	      DBUG_RETURN(-1);
	    }
	  }
unknown's avatar
unknown committed
2676
	}
2677
	else if (!f_is_geom(sql_field->pack_flag) &&
2678 2679 2680 2681 2682 2683
		  (column->length > length ||
		   ((f_is_packed(sql_field->pack_flag) ||
		     ((file->table_flags() & HA_NO_PREFIX_CHAR_KEYS) &&
		      (key_info->flags & HA_NOSAME))) &&
		    column->length != length)))
	{
unknown's avatar
unknown committed
2684
	  my_message(ER_WRONG_SUB_KEY, ER(ER_WRONG_SUB_KEY), MYF(0));
2685 2686 2687 2688
	  DBUG_RETURN(-1);
	}
	else if (!(file->table_flags() & HA_NO_PREFIX_CHAR_KEYS))
	  length=column->length;
unknown's avatar
unknown committed
2689 2690 2691
      }
      else if (length == 0)
      {
2692
	my_error(ER_WRONG_KEY_COLUMN, MYF(0), column->field_name);
unknown's avatar
unknown committed
2693 2694
	  DBUG_RETURN(-1);
      }
2695
      if (length > file->max_key_part_length() && key->type != Key::FULLTEXT)
2696
      {
2697 2698 2699
        length= file->max_key_part_length();
        /* Align key length to multibyte char boundary */
        length-= length % sql_field->charset->mbmaxlen;
2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713
	if (key->type == Key::MULTIPLE)
	{
	  /* not a critical problem */
	  char warn_buff[MYSQL_ERRMSG_SIZE];
	  my_snprintf(warn_buff, sizeof(warn_buff), ER(ER_TOO_LONG_KEY),
		      length);
	  push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
		       ER_TOO_LONG_KEY, warn_buff);
	}
	else
	{
	  my_error(ER_TOO_LONG_KEY,MYF(0),length);
	  DBUG_RETURN(-1);
	}
2714 2715
      }
      key_part_info->length=(uint16) length;
unknown's avatar
unknown committed
2716
      /* Use packed keys for long strings on the first column */
2717
      if (!((*db_options) & HA_OPTION_NO_PACK_KEYS) &&
unknown's avatar
unknown committed
2718
	  (length >= KEY_DEFAULT_PACK_LENGTH &&
2719 2720
	   (sql_field->sql_type == MYSQL_TYPE_STRING ||
	    sql_field->sql_type == MYSQL_TYPE_VARCHAR ||
unknown's avatar
unknown committed
2721 2722
	    sql_field->pack_flag & FIELDFLAG_BLOB)))
      {
2723 2724 2725
	if (column_nr == 0 && (sql_field->pack_flag & FIELDFLAG_BLOB) ||
            sql_field->sql_type == MYSQL_TYPE_VARCHAR)
	  key_info->flags|= HA_BINARY_PACK_KEY | HA_VAR_LENGTH_KEY;
unknown's avatar
unknown committed
2726 2727 2728 2729 2730 2731 2732 2733 2734 2735
	else
	  key_info->flags|= HA_PACK_KEY;
      }
      key_length+=length;
      key_part_info++;

      /* Create the key name based on the first column (if not given) */
      if (column_nr == 0)
      {
	if (key->type == Key::PRIMARY)
2736 2737 2738
	{
	  if (primary_key)
	  {
unknown's avatar
unknown committed
2739 2740
	    my_message(ER_MULTIPLE_PRI_KEY, ER(ER_MULTIPLE_PRI_KEY),
                       MYF(0));
2741 2742 2743 2744 2745
	    DBUG_RETURN(-1);
	  }
	  key_name=primary_key_name;
	  primary_key=1;
	}
2746
	else if (!(key_name = key->name))
unknown's avatar
unknown committed
2747
	  key_name=make_unique_key_name(sql_field->field_name,
2748 2749
					*key_info_buffer, key_info);
	if (check_if_keyname_exists(key_name, *key_info_buffer, key_info))
unknown's avatar
unknown committed
2750
	{
2751
	  my_error(ER_DUP_KEYNAME, MYF(0), key_name);
unknown's avatar
unknown committed
2752 2753 2754 2755 2756
	  DBUG_RETURN(-1);
	}
	key_info->name=(char*) key_name;
      }
    }
2757 2758
    if (!key_info->name || check_column_name(key_info->name))
    {
2759
      my_error(ER_WRONG_NAME_FOR_INDEX, MYF(0), key_info->name);
2760 2761
      DBUG_RETURN(-1);
    }
2762 2763
    if (!(key_info->flags & HA_NULL_PART_KEY))
      unique_key=1;
unknown's avatar
unknown committed
2764
    key_info->key_length=(uint16) key_length;
2765
    if (key_length > max_key_length && key->type != Key::FULLTEXT)
unknown's avatar
unknown committed
2766
    {
2767
      my_error(ER_TOO_LONG_KEY,MYF(0),max_key_length);
unknown's avatar
unknown committed
2768 2769
      DBUG_RETURN(-1);
    }
unknown's avatar
unknown committed
2770
    key_info++;
unknown's avatar
unknown committed
2771
  }
2772
  if (!unique_key && !primary_key &&
2773
      (file->table_flags() & HA_REQUIRE_PRIMARY_KEY))
2774
  {
unknown's avatar
unknown committed
2775
    my_message(ER_REQUIRES_PRIMARY_KEY, ER(ER_REQUIRES_PRIMARY_KEY), MYF(0));
2776 2777
    DBUG_RETURN(-1);
  }
unknown's avatar
unknown committed
2778 2779
  if (auto_increment > 0)
  {
unknown's avatar
unknown committed
2780
    my_message(ER_WRONG_AUTO_KEY, ER(ER_WRONG_AUTO_KEY), MYF(0));
unknown's avatar
unknown committed
2781 2782
    DBUG_RETURN(-1);
  }
2783
  /* Sort keys in optimized order */
2784
  qsort((gptr) *key_info_buffer, *key_count, sizeof(KEY),
2785
	(qsort_cmp) sort_keys);
unknown's avatar
unknown committed
2786
  create_info->null_bits= null_fields;
unknown's avatar
unknown committed
2787

2788 2789 2790
  DBUG_RETURN(0);
}

2791

unknown's avatar
unknown committed
2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811
/*
  Set table default charset, if not set

  SYNOPSIS
    set_table_default_charset()
    create_info        Table create information

  DESCRIPTION
    If the table character set was not given explicitely,
    let's fetch the database default character set and
    apply it to the table.
*/

static void set_table_default_charset(THD *thd,
				      HA_CREATE_INFO *create_info, char *db)
{
  if (!create_info->default_table_charset)
  {
    HA_CREATE_INFO db_info;
    char path[FN_REFLEN];
2812 2813
    /* Abuse build_table_filename() to build the path to the db.opt file */
    build_table_filename(path, sizeof(path), db, "", MY_DB_OPT_FILE);
unknown's avatar
unknown committed
2814 2815 2816 2817 2818 2819
    load_db_opt(thd, path, &db_info);
    create_info->default_table_charset= db_info.default_table_charset;
  }
}


unknown's avatar
unknown committed
2820 2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837 2838 2839 2840 2841 2842
/*
  Extend long VARCHAR fields to blob & prepare field if it's a blob

  SYNOPSIS
    prepare_blob_field()
    sql_field		Field to check

  RETURN
    0	ok
    1	Error (sql_field can't be converted to blob)
        In this case the error is given
*/

static bool prepare_blob_field(THD *thd, create_field *sql_field)
{
  DBUG_ENTER("prepare_blob_field");

  if (sql_field->length > MAX_FIELD_VARCHARLENGTH &&
      !(sql_field->flags & BLOB_FLAG))
  {
    /* Convert long VARCHAR columns to TEXT or BLOB */
    char warn_buff[MYSQL_ERRMSG_SIZE];

2843 2844
    if (sql_field->def || (thd->variables.sql_mode & (MODE_STRICT_TRANS_TABLES |
                                                      MODE_STRICT_ALL_TABLES)))
unknown's avatar
unknown committed
2845 2846 2847 2848 2849 2850 2851 2852
    {
      my_error(ER_TOO_BIG_FIELDLENGTH, MYF(0), sql_field->field_name,
               MAX_FIELD_VARCHARLENGTH / sql_field->charset->mbmaxlen);
      DBUG_RETURN(1);
    }
    sql_field->sql_type= FIELD_TYPE_BLOB;
    sql_field->flags|= BLOB_FLAG;
    sprintf(warn_buff, ER(ER_AUTO_CONVERT), sql_field->field_name,
2853
            (sql_field->charset == &my_charset_bin) ? "VARBINARY" : "VARCHAR",
unknown's avatar
unknown committed
2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871 2872
            (sql_field->charset == &my_charset_bin) ? "BLOB" : "TEXT");
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_AUTO_CONVERT,
                 warn_buff);
  }
    
  if ((sql_field->flags & BLOB_FLAG) && sql_field->length)
  {
    if (sql_field->sql_type == FIELD_TYPE_BLOB)
    {
      /* The user has given a length to the blob column */
      sql_field->sql_type= get_blob_type_from_length(sql_field->length);
      sql_field->pack_length= calc_pack_length(sql_field->sql_type, 0);
    }
    sql_field->length= 0;
  }
  DBUG_RETURN(0);
}


2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883 2884 2885 2886 2887 2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914 2915 2916
/*
  Preparation of create_field for SP function return values.
  Based on code used in the inner loop of mysql_prepare_table() above

  SYNOPSIS
    sp_prepare_create_field()
    thd			Thread object
    sql_field		Field to prepare

  DESCRIPTION
    Prepares the field structures for field creation.

*/

void sp_prepare_create_field(THD *thd, create_field *sql_field)
{
  if (sql_field->sql_type == FIELD_TYPE_SET ||
      sql_field->sql_type == FIELD_TYPE_ENUM)
  {
    uint32 field_length, dummy;
    if (sql_field->sql_type == FIELD_TYPE_SET)
    {
      calculate_interval_lengths(sql_field->charset,
                                 sql_field->interval, &dummy, 
                                 &field_length);
      sql_field->length= field_length + 
                         (sql_field->interval->count - 1);
    }
    else /* FIELD_TYPE_ENUM */
    {
      calculate_interval_lengths(sql_field->charset,
                                 sql_field->interval,
                                 &field_length, &dummy);
      sql_field->length= field_length;
    }
    set_if_smaller(sql_field->length, MAX_FIELD_WIDTH-1);
  }

  if (sql_field->sql_type == FIELD_TYPE_BIT)
  {
    sql_field->pack_flag= FIELDFLAG_NUMBER |
                          FIELDFLAG_TREAT_BIT_AS_CHAR;
  }
  sql_field->create_length_to_internal_length();
unknown's avatar
unknown committed
2917 2918 2919 2920
  DBUG_ASSERT(sql_field->def == 0);
  /* Can't go wrong as sql_field->def is not defined */
  (void) prepare_blob_field(thd, sql_field);
}
2921 2922


2923 2924 2925 2926
/*
  Create a table

  SYNOPSIS
unknown's avatar
unknown committed
2927
    mysql_create_table_internal()
2928 2929 2930 2931 2932 2933
    thd			Thread object
    db			Database
    table_name		Table name
    create_info		Create information (like MAX_ROWS)
    fields		List of fields to create
    keys		List of keys to create
unknown's avatar
unknown committed
2934
    internal_tmp_table  Set to 1 if this is an internal temporary table
2935
			(From ALTER TABLE)
2936 2937

  DESCRIPTION
2938
    If one creates a temporary table, this is automatically opened
2939 2940 2941 2942 2943 2944 2945

    no_log is needed for the case of CREATE ... SELECT,
    as the logging will be done later in sql_insert.cc
    select_field_count is also used for CREATE ... SELECT,
    and must be zero for standard create of table.

  RETURN VALUES
unknown's avatar
unknown committed
2946 2947
    FALSE OK
    TRUE  error
2948 2949
*/

unknown's avatar
unknown committed
2950 2951 2952 2953 2954 2955
bool mysql_create_table_internal(THD *thd,
                                const char *db, const char *table_name,
                                HA_CREATE_INFO *create_info,
                                List<create_field> &fields,
                                List<Key> &keys,bool internal_tmp_table,
                                uint select_field_count)
2956
{
2957
  char		path[FN_REFLEN];
2958
  uint          path_length;
2959 2960 2961 2962
  const char	*alias;
  uint		db_options, key_count;
  KEY		*key_info_buffer;
  handler	*file;
unknown's avatar
unknown committed
2963
  bool		error= TRUE;
unknown's avatar
unknown committed
2964
  DBUG_ENTER("mysql_create_table_internal");
2965 2966 2967 2968

  /* Check for duplicate fields and check type of table to create */
  if (!fields.elements)
  {
unknown's avatar
unknown committed
2969 2970
    my_message(ER_TABLE_MUST_HAVE_COLUMNS, ER(ER_TABLE_MUST_HAVE_COLUMNS),
               MYF(0));
unknown's avatar
unknown committed
2971
    DBUG_RETURN(TRUE);
2972
  }
2973
  if (check_engine(thd, table_name, create_info))
2974
    DBUG_RETURN(TRUE);
2975
  db_options= create_info->table_options;
2976 2977 2978
  if (create_info->row_type == ROW_TYPE_DYNAMIC)
    db_options|=HA_OPTION_PACK_RECORD;
  alias= table_case_name(create_info, table_name);
unknown's avatar
unknown committed
2979 2980
  if (!(file=get_new_handler((TABLE_SHARE*) 0, thd->mem_root,
                             create_info->db_type)))
2981
  {
unknown's avatar
unknown committed
2982
    mem_alloc_error(sizeof(handler));
2983 2984
    DBUG_RETURN(TRUE);
  }
2985
#ifdef WITH_PARTITION_STORAGE_ENGINE
2986 2987
  partition_info *part_info= thd->work_part_info;

unknown's avatar
unknown committed
2988 2989 2990 2991 2992 2993 2994 2995
  if (!part_info && create_info->db_type->partition_flags &&
      (create_info->db_type->partition_flags() & HA_USE_AUTO_PARTITION))
  {
    /*
      Table is not defined as a partitioned table but the engine handles
      all tables as partitioned. The handler will set up the partition info
      object with the default settings.
    */
unknown's avatar
unknown committed
2996
    thd->work_part_info= part_info= new partition_info();
unknown's avatar
unknown committed
2997 2998 2999 3000 3001 3002
    if (!part_info)
    {
      mem_alloc_error(sizeof(partition_info));
      DBUG_RETURN(TRUE);
    }
    file->set_auto_partitions(part_info);
unknown's avatar
unknown committed
3003
    part_info->default_engine_type= create_info->db_type;
unknown's avatar
unknown committed
3004
  }
3005 3006 3007
  if (part_info)
  {
    /*
unknown's avatar
unknown committed
3008 3009 3010 3011 3012 3013 3014 3015 3016
      The table has been specified as a partitioned table.
      If this is part of an ALTER TABLE the handler will be the partition
      handler but we need to specify the default handler to use for
      partitions also in the call to check_partition_info. We transport
      this information in the default_db_type variable, it is either
      DB_TYPE_DEFAULT or the engine set in the ALTER TABLE command.

      Check that we don't use foreign keys in the table since it won't
      work even with InnoDB beneath it.
3017
    */
unknown's avatar
unknown committed
3018 3019
    List_iterator<Key> key_iterator(keys);
    Key *key;
unknown's avatar
unknown committed
3020
    handlerton *part_engine_type= create_info->db_type;
3021 3022
    char *part_syntax_buf;
    uint syntax_len;
unknown's avatar
unknown committed
3023
    handlerton *engine_type;
3024 3025 3026 3027 3028
    if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
    {
      my_error(ER_PARTITION_NO_TEMPORARY, MYF(0));
      goto err;
    }
unknown's avatar
unknown committed
3029 3030 3031 3032 3033 3034 3035 3036
    while ((key= key_iterator++))
    {
      if (key->type == Key::FOREIGN_KEY)
      {
        my_error(ER_CANNOT_ADD_FOREIGN, MYF(0));
        goto err;
      }
    }
3037 3038
    if ((part_engine_type == &partition_hton) &&
        part_info->default_engine_type)
3039 3040 3041 3042 3043 3044
    {
      /*
        This only happens at ALTER TABLE.
        default_engine_type was assigned from the engine set in the ALTER
        TABLE command.
      */
unknown's avatar
unknown committed
3045
      ;
3046
    }
3047 3048
    else
    {
unknown's avatar
unknown committed
3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060
      if (create_info->used_fields & HA_CREATE_USED_ENGINE)
      {
        part_info->default_engine_type= create_info->db_type;
      }
      else
      {
        if (part_info->default_engine_type == NULL)
        {
          part_info->default_engine_type= ha_checktype(thd,
                                          DB_TYPE_DEFAULT, 0, 0);
        }
      }
3061
    }
unknown's avatar
unknown committed
3062 3063
    DBUG_PRINT("info", ("db_type = %d",
                         ha_legacy_type(part_info->default_engine_type)));
unknown's avatar
unknown committed
3064
    if (part_info->check_partition_info( &engine_type, file,
unknown's avatar
unknown committed
3065
                             create_info->max_rows))
unknown's avatar
unknown committed
3066
      goto err;
unknown's avatar
unknown committed
3067
    part_info->default_engine_type= engine_type;
unknown's avatar
unknown committed
3068

3069 3070 3071 3072 3073 3074
    /*
      We reverse the partitioning parser and generate a standard format
      for syntax stored in frm file.
    */
    if (!(part_syntax_buf= generate_partition_syntax(part_info,
                                                     &syntax_len,
unknown's avatar
unknown committed
3075
                                                     TRUE, FALSE)))
unknown's avatar
unknown committed
3076
      goto err;
3077 3078
    part_info->part_info_string= part_syntax_buf;
    part_info->part_info_len= syntax_len;
unknown's avatar
unknown committed
3079 3080
    if ((!(engine_type->partition_flags &&
           engine_type->partition_flags() & HA_CAN_PARTITION)) ||
unknown's avatar
unknown committed
3081
        create_info->db_type == &partition_hton)
3082 3083 3084 3085 3086
    {
      /*
        The handler assigned to the table cannot handle partitioning.
        Assign the partition handler as the handler of the table.
      */
unknown's avatar
unknown committed
3087 3088
      DBUG_PRINT("info", ("db_type: %d",
                          ha_legacy_type(create_info->db_type)));
3089
      delete file;
unknown's avatar
unknown committed
3090
      create_info->db_type= &partition_hton;
3091 3092 3093 3094
      if (!(file= get_ha_partition(part_info)))
      {
        DBUG_RETURN(TRUE);
      }
3095 3096 3097 3098 3099 3100 3101 3102
      /*
        If we have default number of partitions or subpartitions we
        might require to set-up the part_info object such that it
        creates a proper .par file. The current part_info object is
        only used to create the frm-file and .par-file.
      */
      if (part_info->use_default_no_partitions &&
          part_info->no_parts &&
3103
          (int)part_info->no_parts != file->get_default_no_partitions(0ULL))
3104
      {
3105
        uint i;
3106
        List_iterator<partition_element> part_it(part_info->partitions);
3107 3108 3109 3110
        part_it++;
        DBUG_ASSERT(thd->lex->sql_command != SQLCOM_CREATE_TABLE);
        for (i= 1; i < part_info->partitions.elements; i++)
          (part_it++)->part_state= PART_TO_BE_DROPPED;
3111 3112 3113 3114
      }
      else if (part_info->is_sub_partitioned() &&
               part_info->use_default_no_subpartitions &&
               part_info->no_subparts &&
3115 3116
               (int)part_info->no_subparts !=
                 file->get_default_no_partitions(0ULL))
3117
      {
3118
        DBUG_ASSERT(thd->lex->sql_command != SQLCOM_CREATE_TABLE);
3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129
        part_info->no_subparts= file->get_default_no_partitions(0ULL);
      }
    }
    else if (create_info->db_type != engine_type)
    {
      delete file;
      if (!(file= get_new_handler((TABLE_SHARE*) 0, thd->mem_root, engine_type)))
      {
        mem_alloc_error(sizeof(handler));
        DBUG_RETURN(TRUE);
      }
3130 3131 3132
    }
  }
#endif
3133

unknown's avatar
unknown committed
3134
  set_table_default_charset(thd, create_info, (char*) db);
3135

3136 3137 3138
  if (mysql_prepare_table(thd, create_info, &fields,
			  &keys, internal_tmp_table, &db_options, file,
			  &key_info_buffer, &key_count,
3139
			  select_field_count))
3140
    goto err;
unknown's avatar
unknown committed
3141 3142 3143 3144

      /* Check if table exists */
  if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
  {
3145
    path_length= build_tmptable_filename(thd, path, sizeof(path));
3146
    if (lower_case_table_names)
3147
      my_casedn_str(files_charset_info, path);
unknown's avatar
unknown committed
3148 3149 3150
    create_info->table_options|=HA_CREATE_DELAY_KEY_WRITE;
  }
  else
3151
    path_length= build_table_filename(path, sizeof(path), db, alias, reg_ext);
3152

unknown's avatar
unknown committed
3153
  /* Check if table already exists */
unknown's avatar
unknown committed
3154 3155
  if ((create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
      find_temporary_table(thd, db, table_name))
unknown's avatar
unknown committed
3156
  {
3157
    if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
3158 3159
    {
      create_info->table_existed= 1;		// Mark that table existed
3160 3161 3162
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
                          ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
                          alias);
unknown's avatar
unknown committed
3163 3164
      error= 0;
      goto err;
3165
    }
unknown's avatar
unknown committed
3166
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), alias);
3167
    goto err;
unknown's avatar
unknown committed
3168
  }
unknown's avatar
unknown committed
3169
  if (wait_if_global_read_lock(thd, 0, 1))
3170
    goto err;
unknown's avatar
unknown committed
3171
  VOID(pthread_mutex_lock(&LOCK_open));
unknown's avatar
unknown committed
3172
  if (!internal_tmp_table && !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
unknown's avatar
unknown committed
3173 3174 3175 3176
  {
    if (!access(path,F_OK))
    {
      if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
3177 3178
        goto warn;
      my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name);
unknown's avatar
unknown committed
3179
      goto unlock_and_end;
unknown's avatar
unknown committed
3180
    }
unknown's avatar
unknown committed
3181
    DBUG_ASSERT(get_cached_table_share(db, alias) == 0);
unknown's avatar
unknown committed
3182 3183
  }

unknown's avatar
unknown committed
3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194 3195 3196
  /*
    Check that table with given name does not already
    exist in any storage engine. In such a case it should
    be discovered and the error ER_TABLE_EXISTS_ERROR be returned
    unless user specified CREATE TABLE IF EXISTS
    The LOCK_open mutex has been locked to make sure no
    one else is attempting to discover the table. Since
    it's not on disk as a frm file, no one could be using it!
  */
  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
  {
    bool create_if_not_exists =
      create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS;
3197
    if (ha_table_exists_in_engine(thd, db, table_name))
unknown's avatar
unknown committed
3198
    {
3199
      DBUG_PRINT("info", ("Table with same name already existed in handler"));
unknown's avatar
unknown committed
3200 3201

      if (create_if_not_exists)
3202 3203
        goto warn;
      my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name);
unknown's avatar
unknown committed
3204
      goto unlock_and_end;
unknown's avatar
unknown committed
3205 3206 3207 3208
    }
  }

  thd->proc_info="creating table";
3209
  create_info->table_existed= 0;		// Mark that table is created
unknown's avatar
unknown committed
3210

unknown's avatar
unknown committed
3211
  if (thd->variables.sql_mode & MODE_NO_DIR_IN_CREATE)
3212
    create_info->data_file_name= create_info->index_file_name= 0;
unknown's avatar
unknown committed
3213
  create_info->table_options=db_options;
3214

3215
  path[path_length - reg_ext_length]= '\0'; // Remove .frm extension
unknown's avatar
unknown committed
3216 3217
  if (rea_create_table(thd, path, db, table_name, create_info, fields,
                       key_count, key_info_buffer, file))
unknown's avatar
unknown committed
3218
    goto unlock_and_end;
unknown's avatar
unknown committed
3219

unknown's avatar
unknown committed
3220 3221 3222 3223 3224 3225
  if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
  {
    /* Open table and put in temporary table list */
    if (!(open_temporary_table(thd, path, db, table_name, 1)))
    {
      (void) rm_temporary_table(create_info->db_type, path);
unknown's avatar
unknown committed
3226
      goto unlock_and_end;
unknown's avatar
unknown committed
3227
    }
unknown's avatar
unknown committed
3228
    thd->tmp_table_used= 1;
unknown's avatar
unknown committed
3229
  }
unknown's avatar
unknown committed
3230

3231 3232 3233 3234 3235
  /*
    Don't write statement if:
    - It is an internal temporary table,
    - Row-based logging is used and it we are creating a temporary table, or
    - The binary log is not open.
3236
    Otherwise, the statement shall be binlogged.
3237 3238
   */
  if (!internal_tmp_table &&
3239 3240
      (!thd->current_stmt_binlog_row_based ||
       (thd->current_stmt_binlog_row_based &&
3241
        !(create_info->options & HA_LEX_CREATE_TMP_TABLE))))
3242
    write_bin_log(thd, TRUE, thd->query, thd->query_length);
unknown's avatar
unknown committed
3243
  error= FALSE;
unknown's avatar
unknown committed
3244
unlock_and_end:
unknown's avatar
unknown committed
3245
  VOID(pthread_mutex_unlock(&LOCK_open));
3246
  start_waiting_global_read_lock(thd);
unknown's avatar
unknown committed
3247 3248

err:
unknown's avatar
unknown committed
3249
  thd->proc_info="After create";
unknown's avatar
unknown committed
3250
  delete file;
unknown's avatar
unknown committed
3251
  DBUG_RETURN(error);
3252 3253

warn:
3254
  error= FALSE;
3255 3256 3257 3258
  push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
                      ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
                      alias);
  create_info->table_existed= 1;		// Mark that table existed
unknown's avatar
unknown committed
3259
  goto unlock_and_end;
unknown's avatar
unknown committed
3260 3261
}

unknown's avatar
unknown committed
3262 3263 3264 3265 3266 3267 3268 3269 3270 3271 3272 3273 3274 3275 3276 3277 3278 3279 3280 3281 3282 3283 3284 3285 3286 3287 3288 3289 3290 3291 3292 3293 3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304

/*
  Database locking aware wrapper for mysql_create_table_internal(),
*/

bool mysql_create_table(THD *thd, const char *db, const char *table_name,
                        HA_CREATE_INFO *create_info,
                        List<create_field> &fields,
                        List<Key> &keys,bool internal_tmp_table,
                        uint select_field_count)
{
  bool result;
  DBUG_ENTER("mysql_create_table");

  /* Wait for any database locks */
  pthread_mutex_lock(&LOCK_lock_db);
  while (!thd->killed &&
         hash_search(&lock_db_cache,(byte*) db, strlen(db)))
  {
    wait_for_condition(thd, &LOCK_lock_db, &COND_refresh);
    pthread_mutex_lock(&LOCK_lock_db);
  }

  if (thd->killed)
  {
    pthread_mutex_unlock(&LOCK_lock_db);
    DBUG_RETURN(TRUE);
  }
  creating_table++;
  pthread_mutex_unlock(&LOCK_lock_db);

  result= mysql_create_table_internal(thd, db, table_name, create_info,
                                      fields, keys, internal_tmp_table,
                                      select_field_count);

  pthread_mutex_lock(&LOCK_lock_db);
  if (!--creating_table && creating_database)
    pthread_cond_signal(&COND_refresh);
  pthread_mutex_unlock(&LOCK_lock_db);
  DBUG_RETURN(result);
}


unknown's avatar
unknown committed
3305 3306 3307 3308 3309 3310 3311 3312
/*
** Give the key name after the first field with an optional '_#' after
**/

static bool
check_if_keyname_exists(const char *name, KEY *start, KEY *end)
{
  for (KEY *key=start ; key != end ; key++)
3313
    if (!my_strcasecmp(system_charset_info,name,key->name))
unknown's avatar
unknown committed
3314 3315 3316 3317 3318 3319 3320 3321 3322 3323
      return 1;
  return 0;
}


static char *
make_unique_key_name(const char *field_name,KEY *start,KEY *end)
{
  char buff[MAX_FIELD_NAME],*buff_end;

3324 3325
  if (!check_if_keyname_exists(field_name,start,end) &&
      my_strcasecmp(system_charset_info,field_name,primary_key_name))
unknown's avatar
unknown committed
3326
    return (char*) field_name;			// Use fieldname
3327 3328 3329 3330 3331 3332
  buff_end=strmake(buff,field_name, sizeof(buff)-4);

  /*
    Only 3 chars + '\0' left, so need to limit to 2 digit
    This is ok as we can't have more than 100 keys anyway
  */
3333
  for (uint i=2 ; i< 100; i++)
unknown's avatar
unknown committed
3334
  {
3335 3336
    *buff_end= '_';
    int10_to_str(i, buff_end+1, 10);
unknown's avatar
unknown committed
3337 3338 3339
    if (!check_if_keyname_exists(buff,start,end))
      return sql_strdup(buff);
  }
3340
  return (char*) "not_specified";		// Should never happen
unknown's avatar
unknown committed
3341 3342
}

3343

unknown's avatar
unknown committed
3344 3345 3346 3347 3348
/****************************************************************************
** Create table from a list of fields and items
****************************************************************************/

TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
unknown's avatar
VIEW  
unknown committed
3349
			       TABLE_LIST *create_table,
unknown's avatar
unknown committed
3350 3351 3352
			       List<create_field> *extra_fields,
			       List<Key> *keys,
			       List<Item> *items,
3353 3354
			       MYSQL_LOCK **lock,
                               TABLEOP_HOOKS *hooks)
unknown's avatar
unknown committed
3355 3356
{
  TABLE tmp_table;		// Used during 'create_field()'
unknown's avatar
unknown committed
3357
  TABLE_SHARE share;
3358
  TABLE *table= 0;
3359
  uint select_field_count= items->elements;
unknown's avatar
unknown committed
3360
  /* Add selected items to field list */
unknown's avatar
unknown committed
3361
  List_iterator_fast<Item> it(*items);
unknown's avatar
unknown committed
3362 3363
  Item *item;
  Field *tmp_field;
3364
  bool not_used;
3365 3366 3367
  DBUG_ENTER("create_table_from_items");

  tmp_table.alias= 0;
3368
  tmp_table.timestamp_field= 0;
unknown's avatar
unknown committed
3369 3370 3371
  tmp_table.s= &share;
  init_tmp_table_share(&share, "", 0, "", "");

3372 3373
  tmp_table.s->db_create_options=0;
  tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
unknown's avatar
unknown committed
3374 3375 3376
  tmp_table.s->db_low_byte_first= 
        test(create_info->db_type == &myisam_hton ||
             create_info->db_type == &heap_hton);
unknown's avatar
unknown committed
3377 3378 3379 3380 3381
  tmp_table.null_row=tmp_table.maybe_null=0;

  while ((item=it++))
  {
    create_field *cr_field;
3382 3383 3384 3385 3386
    Field *field;
    if (item->type() == Item::FUNC_ITEM)
      field=item->tmp_table_field(&tmp_table);
    else
      field=create_tmp_field(thd, &tmp_table, item, item->type(),
unknown's avatar
unknown committed
3387
                             (Item ***) 0, &tmp_field, 0, 0, 0, 0);
3388 3389
    if (!field ||
	!(cr_field=new create_field(field,(item->type() == Item::FIELD_ITEM ?
3390 3391
					   ((Item_field *)item)->field :
					   (Field*) 0))))
unknown's avatar
unknown committed
3392
      DBUG_RETURN(0);
unknown's avatar
unknown committed
3393 3394
    if (item->maybe_null)
      cr_field->flags &= ~NOT_NULL_FLAG;
unknown's avatar
unknown committed
3395 3396
    extra_fields->push_back(cr_field);
  }
3397
  /*
unknown's avatar
unknown committed
3398 3399
    create and lock table

3400
    We don't log the statement, it will be logged later.
unknown's avatar
unknown committed
3401

3402 3403 3404 3405
    If this is a HEAP table, the automatic DELETE FROM which is written to the
    binlog when a HEAP table is opened for the first time since startup, must
    not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
    don't want to delete from it) 2) it would be written before the CREATE
3406 3407
    TABLE, which is a wrong order. So we keep binary logging disabled when we
    open_table().
unknown's avatar
unknown committed
3408
    TODO: create and open should be done atomic !
3409
  */
unknown's avatar
unknown committed
3410
  {
unknown's avatar
unknown committed
3411
    tmp_disable_binlog(thd);
3412
    if (!mysql_create_table(thd, create_table->db, create_table->table_name,
unknown's avatar
unknown committed
3413 3414 3415
                            create_info, *extra_fields, *keys, 0,
                            select_field_count))
    {
3416 3417
      if (! (table= open_table(thd, create_table, thd->mem_root, (bool*) 0,
                               MYSQL_LOCK_IGNORE_FLUSH)))
unknown's avatar
unknown committed
3418
        quick_rm_table(create_info->db_type, create_table->db,
3419
                       table_case_name(create_info, create_table->table_name));
unknown's avatar
unknown committed
3420 3421 3422 3423
    }
    reenable_binlog(thd);
    if (!table)                                   // open failed
      DBUG_RETURN(0);
unknown's avatar
unknown committed
3424
  }
unknown's avatar
unknown committed
3425

3426 3427 3428 3429 3430 3431
  /*
    FIXME: What happens if trigger manages to be created while we are
           obtaining this lock ? May be it is sensible just to disable
           trigger execution in this case ? Or will MYSQL_LOCK_IGNORE_FLUSH
           save us from that ?
  */
unknown's avatar
unknown committed
3432
  table->reginfo.lock_type=TL_WRITE;
3433
  hooks->prelock(&table, 1);                    // Call prelock hooks
3434 3435
  if (! ((*lock)= mysql_lock_tables(thd, &table, 1,
                                    MYSQL_LOCK_IGNORE_FLUSH, &not_used)))
unknown's avatar
unknown committed
3436
  {
3437
    VOID(pthread_mutex_lock(&LOCK_open));
unknown's avatar
unknown committed
3438
    hash_delete(&open_cache,(byte*) table);
3439
    VOID(pthread_mutex_unlock(&LOCK_open));
unknown's avatar
VIEW  
unknown committed
3440
    quick_rm_table(create_info->db_type, create_table->db,
3441
		   table_case_name(create_info, create_table->table_name));
unknown's avatar
unknown committed
3442 3443 3444 3445 3446 3447 3448 3449 3450 3451 3452
    DBUG_RETURN(0);
  }
  table->file->extra(HA_EXTRA_WRITE_CACHE);
  DBUG_RETURN(table);
}


/****************************************************************************
** Alter a table definition
****************************************************************************/

3453
bool
unknown's avatar
unknown committed
3454
mysql_rename_table(handlerton *base,
unknown's avatar
unknown committed
3455
		   const char *old_db,
unknown's avatar
unknown committed
3456
		   const char *old_name,
unknown's avatar
unknown committed
3457
		   const char *new_db,
unknown's avatar
unknown committed
3458
		   const char *new_name)
unknown's avatar
unknown committed
3459
{
3460
  THD *thd= current_thd;
3461 3462 3463
  char from[FN_REFLEN], to[FN_REFLEN], lc_from[FN_REFLEN], lc_to[FN_REFLEN];
  char *from_base= from, *to_base= to;
  char tmp_name[NAME_LEN+1];
unknown's avatar
unknown committed
3464
  handler *file;
unknown's avatar
unknown committed
3465
  int error=0;
unknown's avatar
unknown committed
3466
  DBUG_ENTER("mysql_rename_table");
unknown's avatar
unknown committed
3467

unknown's avatar
unknown committed
3468
  file= (base == NULL ? 0 :
unknown's avatar
unknown committed
3469 3470
         get_new_handler((TABLE_SHARE*) 0, thd->mem_root, base));

3471 3472
  build_table_filename(from, sizeof(from), old_db, old_name, "");
  build_table_filename(to, sizeof(to), new_db, new_name, "");
3473 3474 3475 3476 3477 3478

  /*
    If lower_case_table_names == 2 (case-preserving but case-insensitive
    file system) and the storage is not HA_FILE_BASED, we need to provide
    a lowercase file name, but we leave the .frm in mixed case.
   */
3479 3480
  if (lower_case_table_names == 2 && file &&
      !(file->table_flags() & HA_FILE_BASED))
unknown's avatar
unknown committed
3481
  {
3482 3483
    strmov(tmp_name, old_name);
    my_casedn_str(files_charset_info, tmp_name);
3484
    build_table_filename(lc_from, sizeof(lc_from), old_db, tmp_name, "");
3485
    from_base= lc_from;
unknown's avatar
unknown committed
3486

3487 3488
    strmov(tmp_name, new_name);
    my_casedn_str(files_charset_info, tmp_name);
3489
    build_table_filename(lc_to, sizeof(lc_to), new_db, tmp_name, "");
3490
    to_base= lc_to;
unknown's avatar
unknown committed
3491 3492
  }

3493
  if (!file || !(error=file->rename_table(from_base, to_base)))
3494 3495 3496
  {
    if (rename_file_ext(from,to,reg_ext))
    {
unknown's avatar
unknown committed
3497
      error=my_errno;
3498
      /* Restore old file name */
3499
      if (file)
3500
        file->rename_table(to_base, from_base);
3501 3502
    }
  }
unknown's avatar
unknown committed
3503
  delete file;
unknown's avatar
unknown committed
3504 3505 3506
  if (error)
    my_error(ER_ERROR_ON_RENAME, MYF(0), from, to, error);
  DBUG_RETURN(error != 0);
unknown's avatar
unknown committed
3507 3508
}

unknown's avatar
unknown committed
3509

unknown's avatar
unknown committed
3510
/*
3511 3512 3513 3514 3515 3516
  Force all other threads to stop using the table

  SYNOPSIS
    wait_while_table_is_used()
    thd			Thread handler
    table		Table to remove from cache
3517
    function		HA_EXTRA_PREPARE_FOR_DELETE if table is to be deleted
3518
			HA_EXTRA_FORCE_REOPEN if table is not be used
3519 3520 3521 3522 3523 3524 3525
  NOTES
   When returning, the table will be unusable for other threads until
   the table is closed.

  PREREQUISITES
    Lock on LOCK_open
    Win32 clients must also have a WRITE LOCK on the table !
unknown's avatar
unknown committed
3526 3527
*/

3528 3529
static void wait_while_table_is_used(THD *thd,TABLE *table,
				     enum ha_extra_function function)
unknown's avatar
unknown committed
3530
{
3531
  DBUG_ENTER("wait_while_table_is_used");
unknown's avatar
unknown committed
3532 3533 3534
  DBUG_PRINT("enter", ("table: '%s'  share: 0x%lx  db_stat: %u  version: %u",
                       table->s->table_name.str, (ulong) table->s,
                       table->db_stat, table->s->version));
unknown's avatar
unknown committed
3535

3536
  VOID(table->file->extra(function));
3537
  /* Mark all tables that are in use as 'old' */
unknown's avatar
unknown committed
3538
  mysql_lock_abort(thd, table, TRUE);	/* end threads waiting on lock */
3539 3540

  /* Wait until all there are no other threads that has this table open */
unknown's avatar
unknown committed
3541 3542 3543
  remove_table_from_cache(thd, table->s->db.str,
                          table->s->table_name.str,
                          RTFC_WAIT_OTHER_THREAD_FLAG);
3544 3545
  DBUG_VOID_RETURN;
}
unknown's avatar
unknown committed
3546

3547 3548
/*
  Close a cached table
unknown's avatar
unknown committed
3549

3550
  SYNOPSIS
unknown's avatar
unknown committed
3551
    close_cached_table()
3552 3553 3554 3555 3556 3557
    thd			Thread handler
    table		Table to remove from cache

  NOTES
    Function ends by signaling threads waiting for the table to try to
    reopen the table.
unknown's avatar
unknown committed
3558

3559 3560 3561 3562
  PREREQUISITES
    Lock on LOCK_open
    Win32 clients must also have a WRITE LOCK on the table !
*/
3563

3564
void close_cached_table(THD *thd, TABLE *table)
3565 3566
{
  DBUG_ENTER("close_cached_table");
3567

3568
  wait_while_table_is_used(thd, table, HA_EXTRA_PREPARE_FOR_DELETE);
3569 3570
  /* Close lock if this is not got with LOCK TABLES */
  if (thd->lock)
3571
  {
3572 3573
    mysql_unlock_tables(thd, thd->lock);
    thd->lock=0;			// Start locked threads
unknown's avatar
unknown committed
3574
  }
3575 3576 3577 3578 3579
  /* Close all copies of 'table'.  This also frees all LOCK TABLES lock */
  thd->open_tables=unlink_open_table(thd,thd->open_tables,table);

  /* When lock on LOCK_open is freed other threads can continue */
  pthread_cond_broadcast(&COND_refresh);
unknown's avatar
unknown committed
3580
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
3581 3582
}

3583
static int send_check_errmsg(THD *thd, TABLE_LIST* table,
unknown's avatar
unknown committed
3584
			     const char* operator_name, const char* errmsg)
3585

unknown's avatar
unknown committed
3586
{
3587 3588
  Protocol *protocol= thd->protocol;
  protocol->prepare_for_resend();
3589 3590
  protocol->store(table->alias, system_charset_info);
  protocol->store((char*) operator_name, system_charset_info);
3591
  protocol->store(STRING_WITH_LEN("error"), system_charset_info);
3592
  protocol->store(errmsg, system_charset_info);
unknown's avatar
unknown committed
3593
  thd->clear_error();
3594
  if (protocol->write())
unknown's avatar
unknown committed
3595 3596 3597 3598
    return -1;
  return 1;
}

3599

unknown's avatar
unknown committed
3600
static int prepare_for_restore(THD* thd, TABLE_LIST* table,
3601
			       HA_CHECK_OPT *check_opt)
unknown's avatar
unknown committed
3602
{
unknown's avatar
unknown committed
3603
  DBUG_ENTER("prepare_for_restore");
3604

unknown's avatar
unknown committed
3605 3606 3607 3608 3609 3610
  if (table->table) // do not overwrite existing tables on restore
  {
    DBUG_RETURN(send_check_errmsg(thd, table, "restore",
				  "table exists, will not overwrite on restore"
				  ));
  }
unknown's avatar
unknown committed
3611
  else
unknown's avatar
unknown committed
3612
  {
3613
    char* backup_dir= thd->lex->backup_dir;
3614
    char src_path[FN_REFLEN], dst_path[FN_REFLEN], uname[FN_REFLEN];
3615 3616
    char* table_name= table->table_name;
    char* db= table->db;
3617

3618 3619 3620
    VOID(tablename_to_filename(table->table_name, uname, sizeof(uname)));

    if (fn_format_relative_to_data_home(src_path, uname, backup_dir, reg_ext))
unknown's avatar
unknown committed
3621
      DBUG_RETURN(-1); // protect buffer overflow
unknown's avatar
unknown committed
3622

3623
    build_table_filename(dst_path, sizeof(dst_path), db, table_name, reg_ext);
3624

3625
    if (lock_and_wait_for_table_name(thd,table))
unknown's avatar
unknown committed
3626
      DBUG_RETURN(-1);
3627

3628
    if (my_copy(src_path, dst_path, MYF(MY_WME)))
unknown's avatar
unknown committed
3629
    {
3630
      pthread_mutex_lock(&LOCK_open);
unknown's avatar
unknown committed
3631
      unlock_table_name(thd, table);
3632
      pthread_mutex_unlock(&LOCK_open);
unknown's avatar
unknown committed
3633 3634 3635
      DBUG_RETURN(send_check_errmsg(thd, table, "restore",
				    "Failed copying .frm file"));
    }
3636
    if (mysql_truncate(thd, table, 1))
unknown's avatar
unknown committed
3637
    {
3638
      pthread_mutex_lock(&LOCK_open);
unknown's avatar
unknown committed
3639
      unlock_table_name(thd, table);
3640
      pthread_mutex_unlock(&LOCK_open);
unknown's avatar
unknown committed
3641 3642
      DBUG_RETURN(send_check_errmsg(thd, table, "restore",
				    "Failed generating table from .frm file"));
unknown's avatar
unknown committed
3643
    }
unknown's avatar
unknown committed
3644
  }
unknown's avatar
unknown committed
3645

3646 3647 3648 3649
  /*
    Now we should be able to open the partially restored table
    to finish the restore in the handler later on
  */
3650 3651
  pthread_mutex_lock(&LOCK_open);
  if (reopen_name_locked_table(thd, table))
3652
  {
3653
    unlock_table_name(thd, table);
3654
    pthread_mutex_unlock(&LOCK_open);
3655 3656
    DBUG_RETURN(send_check_errmsg(thd, table, "restore",
                                  "Failed to open partially restored table"));
3657
  }
3658
  pthread_mutex_unlock(&LOCK_open);
unknown's avatar
unknown committed
3659
  DBUG_RETURN(0);
unknown's avatar
unknown committed
3660
}
3661

3662

unknown's avatar
unknown committed
3663
static int prepare_for_repair(THD *thd, TABLE_LIST *table_list,
3664
			      HA_CHECK_OPT *check_opt)
unknown's avatar
unknown committed
3665
{
unknown's avatar
unknown committed
3666 3667
  int error= 0;
  TABLE tmp_table, *table;
unknown's avatar
unknown committed
3668 3669 3670 3671
  TABLE_SHARE *share;
  char from[FN_REFLEN],tmp[FN_REFLEN+32];
  const char **ext;
  MY_STAT stat_info;
unknown's avatar
unknown committed
3672 3673 3674 3675
  DBUG_ENTER("prepare_for_repair");

  if (!(check_opt->sql_flags & TT_USEFRM))
    DBUG_RETURN(0);
unknown's avatar
unknown committed
3676 3677

  if (!(table= table_list->table))		/* if open_ltable failed */
unknown's avatar
unknown committed
3678
  {
unknown's avatar
unknown committed
3679 3680 3681 3682 3683 3684 3685 3686 3687
    char key[MAX_DBKEY_LENGTH];
    uint key_length;

    key_length= create_table_def_key(thd, key, table_list, 0);
    pthread_mutex_lock(&LOCK_open);
    if (!(share= (get_table_share(thd, table_list, key, key_length, 0,
                                  &error))))
    {
      pthread_mutex_unlock(&LOCK_open);
unknown's avatar
unknown committed
3688
      DBUG_RETURN(0);				// Can't open frm file
unknown's avatar
unknown committed
3689 3690
    }

unknown's avatar
unknown committed
3691
    if (open_table_from_share(thd, share, "", 0, 0, 0, &tmp_table, FALSE))
unknown's avatar
unknown committed
3692 3693 3694 3695 3696
    {
      release_table_share(share, RELEASE_NORMAL);
      pthread_mutex_unlock(&LOCK_open);
      DBUG_RETURN(0);                           // Out of memory
    }
unknown's avatar
unknown committed
3697
    table= &tmp_table;
unknown's avatar
unknown committed
3698
    pthread_mutex_unlock(&LOCK_open);
3699
  }
unknown's avatar
unknown committed
3700

unknown's avatar
unknown committed
3701 3702 3703 3704 3705 3706 3707 3708 3709
  /*
    User gave us USE_FRM which means that the header in the index file is
    trashed.
    In this case we will try to fix the table the following way:
    - Rename the data file to a temporary name
    - Truncate the table
    - Replace the new data file with the old one
    - Run a normal repair using the new index file and the old data file
  */
unknown's avatar
unknown committed
3710

unknown's avatar
unknown committed
3711 3712 3713 3714
  /*
    Check if this is a table type that stores index and data separately,
    like ISAM or MyISAM
  */
unknown's avatar
unknown committed
3715
  ext= table->file->bas_ext();
unknown's avatar
unknown committed
3716 3717
  if (!ext[0] || !ext[1])
    goto end;					// No data file
unknown's avatar
unknown committed
3718

unknown's avatar
unknown committed
3719 3720
  // Name of data file
  strxmov(from, table->s->normalized_path.str, ext[1], NullS);
unknown's avatar
unknown committed
3721 3722
  if (!my_stat(from, &stat_info, MYF(0)))
    goto end;				// Can't use USE_FRM flag
unknown's avatar
unknown committed
3723

3724 3725
  my_snprintf(tmp, sizeof(tmp), "%s-%lx_%lx",
	      from, current_pid, thd->thread_id);
unknown's avatar
unknown committed
3726

3727 3728 3729 3730 3731 3732 3733
  /* If we could open the table, close it */
  if (table_list->table)
  {
    pthread_mutex_lock(&LOCK_open);
    close_cached_table(thd, table);
    pthread_mutex_unlock(&LOCK_open);
  }
unknown's avatar
unknown committed
3734
  if (lock_and_wait_for_table_name(thd,table_list))
unknown's avatar
unknown committed
3735
  {
unknown's avatar
unknown committed
3736 3737
    error= -1;
    goto end;
unknown's avatar
unknown committed
3738
  }
unknown's avatar
unknown committed
3739
  if (my_rename(from, tmp, MYF(MY_WME)))
unknown's avatar
unknown committed
3740
  {
3741
    pthread_mutex_lock(&LOCK_open);
unknown's avatar
unknown committed
3742
    unlock_table_name(thd, table_list);
3743
    pthread_mutex_unlock(&LOCK_open);
unknown's avatar
unknown committed
3744 3745 3746 3747 3748 3749 3750 3751 3752 3753 3754 3755 3756 3757 3758 3759 3760 3761 3762 3763 3764
    error= send_check_errmsg(thd, table_list, "repair",
			     "Failed renaming data file");
    goto end;
  }
  if (mysql_truncate(thd, table_list, 1))
  {
    pthread_mutex_lock(&LOCK_open);
    unlock_table_name(thd, table_list);
    pthread_mutex_unlock(&LOCK_open);
    error= send_check_errmsg(thd, table_list, "repair",
			     "Failed generating table from .frm file");
    goto end;
  }
  if (my_rename(tmp, from, MYF(MY_WME)))
  {
    pthread_mutex_lock(&LOCK_open);
    unlock_table_name(thd, table_list);
    pthread_mutex_unlock(&LOCK_open);
    error= send_check_errmsg(thd, table_list, "repair",
			     "Failed restoring .MYD file");
    goto end;
unknown's avatar
unknown committed
3765 3766
  }

3767 3768 3769 3770
  /*
    Now we should be able to open the partially repaired table
    to finish the repair in the handler later on.
  */
3771 3772
  pthread_mutex_lock(&LOCK_open);
  if (reopen_name_locked_table(thd, table_list))
unknown's avatar
unknown committed
3773
  {
unknown's avatar
unknown committed
3774
    unlock_table_name(thd, table_list);
unknown's avatar
unknown committed
3775
    pthread_mutex_unlock(&LOCK_open);
3776 3777 3778
    error= send_check_errmsg(thd, table_list, "repair",
                             "Failed to open partially repaired table");
    goto end;
unknown's avatar
unknown committed
3779
  }
3780
  pthread_mutex_unlock(&LOCK_open);
unknown's avatar
unknown committed
3781 3782 3783

end:
  if (table == &tmp_table)
unknown's avatar
unknown committed
3784 3785 3786 3787 3788
  {
    pthread_mutex_lock(&LOCK_open);
    closefrm(table, 1);				// Free allocated memory
    pthread_mutex_unlock(&LOCK_open);
  }
unknown's avatar
unknown committed
3789
  DBUG_RETURN(error);
unknown's avatar
unknown committed
3790
}
3791

3792

3793

unknown's avatar
unknown committed
3794 3795
/*
  RETURN VALUES
unknown's avatar
merge  
unknown committed
3796 3797 3798
    FALSE Message sent to net (admin operation went ok)
    TRUE  Message should be sent by caller 
          (admin operation or network communication failed)
unknown's avatar
unknown committed
3799
*/
unknown's avatar
unknown committed
3800 3801 3802 3803 3804
static bool mysql_admin_table(THD* thd, TABLE_LIST* tables,
                              HA_CHECK_OPT* check_opt,
                              const char *operator_name,
                              thr_lock_type lock_type,
                              bool open_for_modify,
3805
                              bool no_warnings_for_error,
unknown's avatar
unknown committed
3806 3807 3808
                              uint extra_open_options,
                              int (*prepare_func)(THD *, TABLE_LIST *,
                                                  HA_CHECK_OPT *),
unknown's avatar
unknown committed
3809 3810 3811
                              int (handler::*operator_func)(THD *,
                                                            HA_CHECK_OPT *),
                              int (view_operator_func)(THD *, TABLE_LIST*))
unknown's avatar
unknown committed
3812
{
3813 3814
  TABLE_LIST *table, *save_next_global, *save_next_local;
  SELECT_LEX *select= &thd->lex->select_lex;
unknown's avatar
unknown committed
3815
  List<Item> field_list;
3816 3817
  Item *item;
  Protocol *protocol= thd->protocol;
3818
  LEX *lex= thd->lex;
3819
  int result_code;
3820
  DBUG_ENTER("mysql_admin_table");
unknown's avatar
unknown committed
3821 3822 3823 3824 3825 3826 3827 3828 3829

  field_list.push_back(item = new Item_empty_string("Table", NAME_LEN*2));
  item->maybe_null = 1;
  field_list.push_back(item = new Item_empty_string("Op", 10));
  item->maybe_null = 1;
  field_list.push_back(item = new Item_empty_string("Msg_type", 10));
  item->maybe_null = 1;
  field_list.push_back(item = new Item_empty_string("Msg_text", 255));
  item->maybe_null = 1;
3830 3831
  if (protocol->send_fields(&field_list,
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
unknown's avatar
unknown committed
3832
    DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
3833

3834
  mysql_ha_flush(thd, tables, MYSQL_HA_CLOSE_FINAL, FALSE);
unknown's avatar
VIEW  
unknown committed
3835
  for (table= tables; table; table= table->next_local)
unknown's avatar
unknown committed
3836 3837
  {
    char table_name[NAME_LEN*2+2];
3838
    char* db = table->db;
3839
    bool fatal_error=0;
unknown's avatar
unknown committed
3840

unknown's avatar
unknown committed
3841
    strxmov(table_name, db, ".", table->table_name, NullS);
unknown's avatar
unknown committed
3842
    thd->open_options|= extra_open_options;
3843 3844
    table->lock_type= lock_type;
    /* open only one table from local list of command */
3845
    save_next_global= table->next_global;
3846
    table->next_global= 0;
3847 3848 3849 3850 3851 3852 3853 3854 3855 3856 3857
    save_next_local= table->next_local;
    table->next_local= 0;
    select->table_list.first= (byte*)table;
    /*
      Time zone tables and SP tables can be add to lex->query_tables list,
      so it have to be prepared.
      TODO: Investigate if we can put extra tables into argument instead of
      using lex->query_tables
    */
    lex->query_tables= table;
    lex->query_tables_last= &table->next_global;
3858
    lex->query_tables_own_last= 0;
3859
    thd->no_warnings_for_error= no_warnings_for_error;
3860
    if (view_operator_func == NULL)
3861
      table->required_type=FRMTYPE_TABLE;
3862
    open_and_lock_tables(thd, table);
3863
    thd->no_warnings_for_error= 0;
3864 3865
    table->next_global= save_next_global;
    table->next_local= save_next_local;
unknown's avatar
unknown committed
3866
    thd->open_options&= ~extra_open_options;
3867

unknown's avatar
unknown committed
3868
    if (prepare_func)
3869
    {
unknown's avatar
unknown committed
3870
      switch ((*prepare_func)(thd, table, check_opt)) {
3871 3872 3873 3874 3875 3876 3877
      case  1:           // error, message written to net
        close_thread_tables(thd);
        continue;
      case -1:           // error, message could be written to net
        goto err;
      default:           // should be 0 otherwise
        ;
unknown's avatar
unknown committed
3878
      }
3879
    }
3880

3881
    /*
unknown's avatar
unknown committed
3882 3883 3884 3885 3886 3887
      CHECK TABLE command is only command where VIEW allowed here and this
      command use only temporary teble method for VIEWs resolving => there
      can't be VIEW tree substitition of join view => if opening table
      succeed then table->table will have real TABLE pointer as value (in
      case of join view substitution table->table can be 0, but here it is
      impossible)
3888
    */
unknown's avatar
unknown committed
3889 3890
    if (!table->table)
    {
unknown's avatar
unknown committed
3891
      char buf[ERRMSGSIZE+ERRMSGSIZE+2];
unknown's avatar
unknown committed
3892
      const char *err_msg;
3893
      protocol->prepare_for_resend();
3894 3895
      protocol->store(table_name, system_charset_info);
      protocol->store(operator_name, system_charset_info);
3896
      protocol->store(STRING_WITH_LEN("error"), system_charset_info);
unknown's avatar
unknown committed
3897 3898
      if (!(err_msg=thd->net.last_error))
	err_msg=ER(ER_CHECK_NO_SUCH_TABLE);
3899 3900 3901 3902
      /* if it was a view will check md5 sum */
      if (table->view &&
          view_checksum(thd, table) == HA_ADMIN_WRONG_CHECKSUM)
      {
unknown's avatar
unknown committed
3903
        strxmov(buf, err_msg, "; ", ER(ER_VIEW_CHECKSUM), NullS);
3904 3905
        err_msg= (const char *)buf;
      }
3906
      protocol->store(err_msg, system_charset_info);
3907
      lex->cleanup_after_one_table_open();
unknown's avatar
unknown committed
3908
      thd->clear_error();
3909 3910 3911 3912 3913
      /*
        View opening can be interrupted in the middle of process so some
        tables can be left opening
      */
      close_thread_tables(thd);
3914
      if (protocol->write())
unknown's avatar
unknown committed
3915 3916 3917
	goto err;
      continue;
    }
3918 3919 3920 3921 3922 3923 3924

    if (table->view)
    {
      result_code= (*view_operator_func)(thd, table);
      goto send_result;
    }

unknown's avatar
unknown committed
3925
    table->table->pos_in_table_list= table;
3926
    if ((table->table->db_stat & HA_READ_ONLY) && open_for_modify)
unknown's avatar
unknown committed
3927
    {
unknown's avatar
unknown committed
3928
      char buff[FN_REFLEN + MYSQL_ERRMSG_SIZE];
3929
      uint length;
3930
      protocol->prepare_for_resend();
3931 3932
      protocol->store(table_name, system_charset_info);
      protocol->store(operator_name, system_charset_info);
3933
      protocol->store(STRING_WITH_LEN("error"), system_charset_info);
3934 3935 3936
      length= my_snprintf(buff, sizeof(buff), ER(ER_OPEN_AS_READONLY),
                          table_name);
      protocol->store(buff, length, system_charset_info);
3937
      close_thread_tables(thd);
unknown's avatar
unknown committed
3938
      table->table=0;				// For query cache
3939
      if (protocol->write())
unknown's avatar
unknown committed
3940 3941 3942 3943
	goto err;
      continue;
    }

3944
    /* Close all instances of the table to allow repair to rename files */
3945 3946
    if (lock_type == TL_WRITE && table->table->s->version &&
        !table->table->s->log_table)
3947 3948
    {
      pthread_mutex_lock(&LOCK_open);
3949 3950
      const char *old_message=thd->enter_cond(&COND_refresh, &LOCK_open,
					      "Waiting to get writelock");
unknown's avatar
unknown committed
3951
      mysql_lock_abort(thd,table->table, TRUE);
unknown's avatar
unknown committed
3952 3953
      remove_table_from_cache(thd, table->table->s->db.str,
                              table->table->s->table_name.str,
unknown's avatar
unknown committed
3954 3955
                              RTFC_WAIT_OTHER_THREAD_FLAG |
                              RTFC_CHECK_KILLED_FLAG);
3956
      thd->exit_cond(old_message);
3957 3958
      if (thd->killed)
	goto err;
3959 3960 3961
      /* Flush entries in the query cache involving this table. */
      query_cache_invalidate3(thd, table->table, 0);
      open_for_modify= 0;
3962 3963
    }

unknown's avatar
unknown committed
3964
    if (table->table->s->crashed && operator_func == &handler::ha_check)
3965 3966 3967 3968
    {
      protocol->prepare_for_resend();
      protocol->store(table_name, system_charset_info);
      protocol->store(operator_name, system_charset_info);
3969 3970 3971
      protocol->store(STRING_WITH_LEN("warning"), system_charset_info);
      protocol->store(STRING_WITH_LEN("Table is marked as crashed"),
                      system_charset_info);
3972 3973 3974 3975
      if (protocol->write())
        goto err;
    }

unknown's avatar
unknown committed
3976 3977 3978 3979 3980 3981 3982 3983 3984 3985 3986 3987 3988 3989 3990
    if (operator_func == &handler::ha_repair)
    {
      if ((table->table->file->check_old_types() == HA_ADMIN_NEEDS_ALTER) ||
          (table->table->file->ha_check_for_upgrade(check_opt) ==
           HA_ADMIN_NEEDS_ALTER))
      {
        close_thread_tables(thd);
        tmp_disable_binlog(thd); // binlogging is done by caller if wanted
        result_code= mysql_recreate_table(thd, table, 0);
        reenable_binlog(thd);
        goto send_result;
      }

    }

3991 3992 3993 3994
    result_code = (table->table->file->*operator_func)(thd, check_opt);

send_result:

3995
    lex->cleanup_after_one_table_open();
unknown's avatar
unknown committed
3996
    thd->clear_error();  // these errors shouldn't get client
3997
    protocol->prepare_for_resend();
3998 3999
    protocol->store(table_name, system_charset_info);
    protocol->store(operator_name, system_charset_info);
unknown's avatar
unknown committed
4000

4001 4002 4003
send_result_message:

    DBUG_PRINT("info", ("result_code: %d", result_code));
4004 4005
    switch (result_code) {
    case HA_ADMIN_NOT_IMPLEMENTED:
4006
      {
4007 4008
	char buf[ERRMSGSIZE+20];
	uint length=my_snprintf(buf, ERRMSGSIZE,
4009
				ER(ER_CHECK_NOT_IMPLEMENTED), operator_name);
4010
	protocol->store(STRING_WITH_LEN("note"), system_charset_info);
4011
	protocol->store(buf, length, system_charset_info);
4012
      }
unknown's avatar
unknown committed
4013 4014
      break;

unknown's avatar
unknown committed
4015 4016
    case HA_ADMIN_NOT_BASE_TABLE:
      {
unknown's avatar
unknown committed
4017 4018
        char buf[ERRMSGSIZE+20];
        uint length= my_snprintf(buf, ERRMSGSIZE,
unknown's avatar
unknown committed
4019
                                 ER(ER_BAD_TABLE_ERROR), table_name);
4020
        protocol->store(STRING_WITH_LEN("note"), system_charset_info);
unknown's avatar
unknown committed
4021
        protocol->store(buf, length, system_charset_info);
unknown's avatar
unknown committed
4022 4023 4024
      }
      break;

4025
    case HA_ADMIN_OK:
4026 4027
      protocol->store(STRING_WITH_LEN("status"), system_charset_info);
      protocol->store(STRING_WITH_LEN("OK"), system_charset_info);
unknown's avatar
unknown committed
4028 4029
      break;

4030
    case HA_ADMIN_FAILED:
4031 4032 4033
      protocol->store(STRING_WITH_LEN("status"), system_charset_info);
      protocol->store(STRING_WITH_LEN("Operation failed"),
                      system_charset_info);
unknown's avatar
unknown committed
4034 4035
      break;

unknown's avatar
unknown committed
4036
    case HA_ADMIN_REJECT:
4037 4038 4039
      protocol->store(STRING_WITH_LEN("status"), system_charset_info);
      protocol->store(STRING_WITH_LEN("Operation need committed state"),
                      system_charset_info);
unknown's avatar
unknown committed
4040
      open_for_modify= FALSE;
unknown's avatar
unknown committed
4041 4042
      break;

4043
    case HA_ADMIN_ALREADY_DONE:
4044 4045 4046
      protocol->store(STRING_WITH_LEN("status"), system_charset_info);
      protocol->store(STRING_WITH_LEN("Table is already up to date"),
                      system_charset_info);
4047 4048
      break;

4049
    case HA_ADMIN_CORRUPT:
4050 4051
      protocol->store(STRING_WITH_LEN("error"), system_charset_info);
      protocol->store(STRING_WITH_LEN("Corrupt"), system_charset_info);
4052
      fatal_error=1;
unknown's avatar
unknown committed
4053 4054
      break;

unknown's avatar
unknown committed
4055
    case HA_ADMIN_INVALID:
4056 4057 4058
      protocol->store(STRING_WITH_LEN("error"), system_charset_info);
      protocol->store(STRING_WITH_LEN("Invalid argument"),
                      system_charset_info);
unknown's avatar
unknown committed
4059 4060
      break;

4061 4062 4063 4064 4065 4066 4067 4068
    case HA_ADMIN_TRY_ALTER:
    {
      /*
        This is currently used only by InnoDB. ha_innobase::optimize() answers
        "try with alter", so here we close the table, do an ALTER TABLE,
        reopen the table and do ha_innobase::analyze() on it.
      */
      close_thread_tables(thd);
unknown's avatar
VIEW  
unknown committed
4069 4070 4071
      TABLE_LIST *save_next_local= table->next_local,
                 *save_next_global= table->next_global;
      table->next_local= table->next_global= 0;
4072
      tmp_disable_binlog(thd); // binlogging is done by caller if wanted
4073
      result_code= mysql_recreate_table(thd, table, 0);
4074
      reenable_binlog(thd);
unknown's avatar
unknown committed
4075
      close_thread_tables(thd);
4076 4077 4078 4079 4080 4081
      if (!result_code) // recreation went ok
      {
        if ((table->table= open_ltable(thd, table, lock_type)) &&
            ((result_code= table->table->file->analyze(thd, check_opt)) > 0))
          result_code= 0; // analyze went ok
      }
4082 4083 4084 4085 4086 4087 4088 4089 4090 4091 4092 4093
      if (result_code) // either mysql_recreate_table or analyze failed
      {
        const char *err_msg;
        if ((err_msg= thd->net.last_error))
        {
          if (!thd->vio_ok())
          {
            sql_print_error(err_msg);
          }
          else
          {
            /* Hijack the row already in-progress. */
4094
            protocol->store(STRING_WITH_LEN("error"), system_charset_info);
4095 4096 4097 4098 4099 4100 4101 4102 4103
            protocol->store(err_msg, system_charset_info);
            (void)protocol->write();
            /* Start off another row for HA_ADMIN_FAILED */
            protocol->prepare_for_resend();
            protocol->store(table_name, system_charset_info);
            protocol->store(operator_name, system_charset_info);
          }
        }
      }
4104
      result_code= result_code ? HA_ADMIN_FAILED : HA_ADMIN_OK;
unknown's avatar
VIEW  
unknown committed
4105 4106
      table->next_local= save_next_local;
      table->next_global= save_next_global;
4107 4108
      goto send_result_message;
    }
4109 4110
    case HA_ADMIN_WRONG_CHECKSUM:
    {
4111
      protocol->store(STRING_WITH_LEN("note"), system_charset_info);
unknown's avatar
unknown committed
4112 4113
      protocol->store(ER(ER_VIEW_CHECKSUM), strlen(ER(ER_VIEW_CHECKSUM)),
                      system_charset_info);
4114 4115
      break;
    }
4116

unknown's avatar
unknown committed
4117 4118 4119 4120 4121 4122 4123 4124 4125 4126 4127 4128 4129
    case HA_ADMIN_NEEDS_UPGRADE:
    case HA_ADMIN_NEEDS_ALTER:
    {
      char buf[ERRMSGSIZE];
      uint length;

      protocol->store(STRING_WITH_LEN("error"), system_charset_info);
      length=my_snprintf(buf, ERRMSGSIZE, ER(ER_TABLE_NEEDS_UPGRADE), table->table_name);
      protocol->store(buf, length, system_charset_info);
      fatal_error=1;
      break;
    }

4130
    default:				// Probably HA_ADMIN_INTERNAL_ERROR
4131 4132 4133 4134 4135
      {
        char buf[ERRMSGSIZE+20];
        uint length=my_snprintf(buf, ERRMSGSIZE,
                                "Unknown - internal error %d during operation",
                                result_code);
4136
        protocol->store(STRING_WITH_LEN("error"), system_charset_info);
4137 4138 4139 4140
        protocol->store(buf, length, system_charset_info);
        fatal_error=1;
        break;
      }
unknown's avatar
unknown committed
4141
    }
unknown's avatar
unknown committed
4142
    if (table->table)
4143
    {
4144
      /* in the below check we do not refresh the log tables */
unknown's avatar
unknown committed
4145 4146
      if (fatal_error)
        table->table->s->version=0;               // Force close of table
4147
      else if (open_for_modify && !table->table->s->log_table)
unknown's avatar
unknown committed
4148 4149
      {
        pthread_mutex_lock(&LOCK_open);
unknown's avatar
unknown committed
4150 4151
        remove_table_from_cache(thd, table->table->s->db.str,
                                table->table->s->table_name.str, RTFC_NO_FLAG);
unknown's avatar
unknown committed
4152 4153 4154 4155
        pthread_mutex_unlock(&LOCK_open);
        /* Something may be modified, that's why we have to invalidate cache */
        query_cache_invalidate3(thd, table->table, 0);
      }
4156
    }
unknown's avatar
unknown committed
4157
    close_thread_tables(thd);
unknown's avatar
unknown committed
4158
    table->table=0;				// For query cache
4159
    if (protocol->write())
unknown's avatar
unknown committed
4160 4161 4162
      goto err;
  }

4163
  send_eof(thd);
unknown's avatar
unknown committed
4164
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
4165
 err:
4166
  close_thread_tables(thd);			// Shouldn't be needed
unknown's avatar
unknown committed
4167 4168
  if (table)
    table->table=0;
unknown's avatar
unknown committed
4169
  DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
4170 4171
}

unknown's avatar
unknown committed
4172

unknown's avatar
unknown committed
4173
bool mysql_backup_table(THD* thd, TABLE_LIST* table_list)
unknown's avatar
unknown committed
4174 4175 4176
{
  DBUG_ENTER("mysql_backup_table");
  DBUG_RETURN(mysql_admin_table(thd, table_list, 0,
4177
				"backup", TL_READ, 0, 0, 0, 0,
4178
				&handler::backup, 0));
unknown's avatar
unknown committed
4179
}
unknown's avatar
unknown committed
4180

4181

unknown's avatar
unknown committed
4182
bool mysql_restore_table(THD* thd, TABLE_LIST* table_list)
unknown's avatar
unknown committed
4183 4184 4185
{
  DBUG_ENTER("mysql_restore_table");
  DBUG_RETURN(mysql_admin_table(thd, table_list, 0,
4186
				"restore", TL_WRITE, 1, 1, 0,
4187
				&prepare_for_restore,
4188
				&handler::restore, 0));
unknown's avatar
unknown committed
4189
}
unknown's avatar
unknown committed
4190

4191

unknown's avatar
unknown committed
4192
bool mysql_repair_table(THD* thd, TABLE_LIST* tables, HA_CHECK_OPT* check_opt)
4193 4194 4195
{
  DBUG_ENTER("mysql_repair_table");
  DBUG_RETURN(mysql_admin_table(thd, tables, check_opt,
4196 4197 4198
				"repair", TL_WRITE, 1,
                                test(check_opt->sql_flags & TT_USEFRM),
                                HA_OPEN_FOR_REPAIR,
4199
				&prepare_for_repair,
unknown's avatar
unknown committed
4200
				&handler::ha_repair, 0));
4201 4202
}

4203

unknown's avatar
unknown committed
4204
bool mysql_optimize_table(THD* thd, TABLE_LIST* tables, HA_CHECK_OPT* check_opt)
4205 4206 4207
{
  DBUG_ENTER("mysql_optimize_table");
  DBUG_RETURN(mysql_admin_table(thd, tables, check_opt,
4208
				"optimize", TL_WRITE, 1,0,0,0,
4209
				&handler::optimize, 0));
4210 4211 4212
}


unknown's avatar
unknown committed
4213 4214 4215 4216 4217
/*
  Assigned specified indexes for a table into key cache

  SYNOPSIS
    mysql_assign_to_keycache()
4218 4219
    thd		Thread object
    tables	Table list (one table only)
unknown's avatar
unknown committed
4220 4221

  RETURN VALUES
unknown's avatar
unknown committed
4222 4223
   FALSE ok
   TRUE  error
unknown's avatar
unknown committed
4224 4225
*/

unknown's avatar
unknown committed
4226
bool mysql_assign_to_keycache(THD* thd, TABLE_LIST* tables,
4227
			     LEX_STRING *key_cache_name)
unknown's avatar
unknown committed
4228
{
4229
  HA_CHECK_OPT check_opt;
unknown's avatar
unknown committed
4230
  KEY_CACHE *key_cache;
unknown's avatar
unknown committed
4231
  DBUG_ENTER("mysql_assign_to_keycache");
4232 4233 4234 4235 4236 4237 4238

  check_opt.init();
  pthread_mutex_lock(&LOCK_global_system_variables);
  if (!(key_cache= get_key_cache(key_cache_name)))
  {
    pthread_mutex_unlock(&LOCK_global_system_variables);
    my_error(ER_UNKNOWN_KEY_CACHE, MYF(0), key_cache_name->str);
unknown's avatar
unknown committed
4239
    DBUG_RETURN(TRUE);
4240 4241 4242 4243
  }
  pthread_mutex_unlock(&LOCK_global_system_variables);
  check_opt.key_cache= key_cache;
  DBUG_RETURN(mysql_admin_table(thd, tables, &check_opt,
4244
				"assign_to_keycache", TL_READ_NO_INSERT, 0, 0,
4245
				0, 0, &handler::assign_to_keycache, 0));
unknown's avatar
unknown committed
4246 4247
}

unknown's avatar
unknown committed
4248 4249 4250 4251 4252 4253

/*
  Reassign all tables assigned to a key cache to another key cache

  SYNOPSIS
    reassign_keycache_tables()
4254 4255 4256
    thd		Thread object
    src_cache	Reference to the key cache to clean up
    dest_cache	New key cache
unknown's avatar
unknown committed
4257

4258 4259 4260 4261 4262 4263 4264 4265 4266 4267 4268 4269 4270
  NOTES
    This is called when one sets a key cache size to zero, in which
    case we have to move the tables associated to this key cache to
    the "default" one.

    One has to ensure that one never calls this function while
    some other thread is changing the key cache. This is assured by
    the caller setting src_cache->in_init before calling this function.

    We don't delete the old key cache as there may still be pointers pointing
    to it for a while after this function returns.

 RETURN VALUES
unknown's avatar
unknown committed
4271 4272 4273
    0	  ok
*/

4274 4275
int reassign_keycache_tables(THD *thd, KEY_CACHE *src_cache,
			     KEY_CACHE *dst_cache)
unknown's avatar
unknown committed
4276 4277 4278
{
  DBUG_ENTER("reassign_keycache_tables");

4279 4280
  DBUG_ASSERT(src_cache != dst_cache);
  DBUG_ASSERT(src_cache->in_init);
unknown's avatar
unknown committed
4281
  src_cache->param_buff_size= 0;		// Free key cache
4282 4283
  ha_resize_key_cache(src_cache);
  ha_change_key_cache(src_cache, dst_cache);
4284
  DBUG_RETURN(0);
unknown's avatar
unknown committed
4285 4286 4287
}


unknown's avatar
unknown committed
4288 4289 4290 4291 4292
/*
  Preload specified indexes for a table into key cache

  SYNOPSIS
    mysql_preload_keys()
4293 4294
    thd		Thread object
    tables	Table list (one table only)
unknown's avatar
unknown committed
4295 4296

  RETURN VALUES
unknown's avatar
unknown committed
4297 4298
    FALSE ok
    TRUE  error
unknown's avatar
unknown committed
4299 4300
*/

unknown's avatar
unknown committed
4301
bool mysql_preload_keys(THD* thd, TABLE_LIST* tables)
unknown's avatar
unknown committed
4302 4303 4304
{
  DBUG_ENTER("mysql_preload_keys");
  DBUG_RETURN(mysql_admin_table(thd, tables, 0,
4305
				"preload_keys", TL_READ, 0, 0, 0, 0,
4306
				&handler::preload_keys, 0));
unknown's avatar
unknown committed
4307 4308 4309
}


unknown's avatar
unknown committed
4310 4311 4312 4313 4314
/*
  Create a table identical to the specified table

  SYNOPSIS
    mysql_create_like_table()
4315 4316
    thd		Thread object
    table	Table list (one table only)
unknown's avatar
unknown committed
4317 4318 4319 4320
    create_info Create info
    table_ident Src table_ident

  RETURN VALUES
unknown's avatar
unknown committed
4321 4322
    FALSE OK
    TRUE  error
unknown's avatar
unknown committed
4323 4324
*/

unknown's avatar
unknown committed
4325 4326 4327
bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
                             HA_CREATE_INFO *create_info,
                             Table_ident *table_ident)
unknown's avatar
unknown committed
4328
{
unknown's avatar
unknown committed
4329
  TABLE *tmp_table;
unknown's avatar
unknown committed
4330
  char src_path[FN_REFLEN], dst_path[FN_REFLEN], tmp_path[FN_REFLEN];
4331
  uint dst_path_length;
unknown's avatar
unknown committed
4332
  char *db= table->db;
4333
  char *table_name= table->table_name;
4334
  char *src_db;
unknown's avatar
unknown committed
4335
  char *src_table= table_ident->table.str;
unknown's avatar
unknown committed
4336 4337
  int  err;
  bool res= TRUE;
unknown's avatar
unknown committed
4338
  enum legacy_db_type not_used;
4339

4340
  TABLE_LIST src_tables_list;
unknown's avatar
unknown committed
4341
  DBUG_ENTER("mysql_create_like_table");
4342
  src_db= table_ident->db.str ? table_ident->db.str : thd->db;
unknown's avatar
unknown committed
4343 4344 4345 4346 4347 4348

  /*
    Validate the source table
  */
  if (table_ident->table.length > NAME_LEN ||
      (table_ident->table.length &&
4349
       check_table_name(src_table,table_ident->table.length)))
unknown's avatar
unknown committed
4350
  {
4351
    my_error(ER_WRONG_TABLE_NAME, MYF(0), src_table);
unknown's avatar
unknown committed
4352
    DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
4353
  }
4354 4355 4356 4357 4358
  if (!src_db || check_db_name(src_db))
  {
    my_error(ER_WRONG_DB_NAME, MYF(0), src_db ? src_db : "NULL");
    DBUG_RETURN(-1);
  }
4359

unknown's avatar
VIEW  
unknown committed
4360
  bzero((gptr)&src_tables_list, sizeof(src_tables_list));
4361
  src_tables_list.db= src_db;
unknown's avatar
Merge  
unknown committed
4362
  src_tables_list.table_name= src_table;
4363

4364 4365
  if (lock_and_wait_for_table_name(thd, &src_tables_list))
    goto err;
unknown's avatar
unknown committed
4366 4367

  if ((tmp_table= find_temporary_table(thd, src_db, src_table)))
unknown's avatar
unknown committed
4368
    strxmov(src_path, tmp_table->s->path.str, reg_ext, NullS);
unknown's avatar
unknown committed
4369 4370
  else
  {
4371 4372
    build_table_filename(src_path, sizeof(src_path),
                         src_db, src_table, reg_ext);
4373
    /* Resolve symlinks (for windows) */
unknown's avatar
unknown committed
4374
    unpack_filename(src_path, src_path);
4375 4376
    if (lower_case_table_names)
      my_casedn_str(files_charset_info, src_path);
unknown's avatar
unknown committed
4377 4378 4379
    if (access(src_path, F_OK))
    {
      my_error(ER_BAD_TABLE_ERROR, MYF(0), src_table);
4380
      goto err;
unknown's avatar
unknown committed
4381 4382 4383
    }
  }

4384 4385 4386
  /* 
     create like should be not allowed for Views, Triggers, ... 
  */
4387
  if (mysql_frm_type(thd, src_path, &not_used) != FRMTYPE_TABLE)
4388
  {
4389
    my_error(ER_WRONG_OBJECT, MYF(0), src_db, src_table, "BASE TABLE");
4390 4391 4392
    goto err;
  }

unknown's avatar
unknown committed
4393 4394 4395
  /*
    Validate the destination table

4396
    skip the destination table name checking as this is already
unknown's avatar
unknown committed
4397 4398 4399 4400 4401 4402
    validated.
  */
  if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
  {
    if (find_temporary_table(thd, db, table_name))
      goto table_exists;
4403
    dst_path_length= build_tmptable_filename(thd, dst_path, sizeof(dst_path));
4404 4405
    if (lower_case_table_names)
      my_casedn_str(files_charset_info, dst_path);
unknown's avatar
unknown committed
4406 4407 4408 4409
    create_info->table_options|= HA_CREATE_DELAY_KEY_WRITE;
  }
  else
  {
4410 4411
    dst_path_length= build_table_filename(dst_path, sizeof(dst_path),
                                          db, table_name, reg_ext);
unknown's avatar
unknown committed
4412 4413 4414 4415
    if (!access(dst_path, F_OK))
      goto table_exists;
  }

4416
  /*
unknown's avatar
unknown committed
4417
    Create a new table by copying from source table
4418
  */
4419 4420 4421 4422 4423 4424
  if (my_copy(src_path, dst_path, MYF(MY_DONT_OVERWRITE_FILE)))
  {
    if (my_errno == ENOENT)
      my_error(ER_BAD_DB_ERROR,MYF(0),db);
    else
      my_error(ER_CANT_CREATE_FILE,MYF(0),dst_path,my_errno);
4425
    goto err;
4426
  }
unknown's avatar
unknown committed
4427 4428

  /*
4429 4430
    As mysql_truncate don't work on a new table at this stage of
    creation, instead create the table directly (for both normal
unknown's avatar
unknown committed
4431 4432
    and temporary tables).
  */
unknown's avatar
unknown committed
4433 4434 4435 4436 4437 4438 4439 4440 4441 4442 4443 4444 4445
#ifdef WITH_PARTITION_STORAGE_ENGINE
  /*
    For partitioned tables we need to copy the .par file as well since
    it is used in open_table_def to even be able to create a new handler.
    There is no way to find out here if the original table is a
    partitioned table so we copy the file and ignore any errors.
  */
  fn_format(tmp_path, dst_path, reg_ext, ".par", MYF(MY_REPLACE_EXT));
  strmov(dst_path, tmp_path);
  fn_format(tmp_path, src_path, reg_ext, ".par", MYF(MY_REPLACE_EXT));
  strmov(src_path, tmp_path);
  my_copy(src_path, dst_path, MYF(MY_DONT_OVERWRITE_FILE));
#endif
4446
  dst_path[dst_path_length - reg_ext_length]= '\0';  // Remove .frm
unknown's avatar
unknown committed
4447
  err= ha_create_table(thd, dst_path, db, table_name, create_info, 1);
4448

unknown's avatar
unknown committed
4449 4450 4451 4452
  if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
  {
    if (err || !open_temporary_table(thd, dst_path, db, table_name, 1))
    {
4453 4454
      (void) rm_temporary_table(create_info->db_type,
				dst_path); /* purecov: inspected */
4455
      goto err;     /* purecov: inspected */
unknown's avatar
unknown committed
4456 4457 4458 4459
    }
  }
  else if (err)
  {
4460 4461 4462
    (void) quick_rm_table(create_info->db_type, db,
			  table_name); /* purecov: inspected */
    goto err;	    /* purecov: inspected */
unknown's avatar
unknown committed
4463
  }
4464

4465 4466 4467
  /*
    We have to write the query before we unlock the tables.
  */
4468
  if (thd->current_stmt_binlog_row_based)
4469 4470 4471 4472 4473 4474 4475 4476 4477 4478 4479 4480 4481 4482 4483 4484 4485 4486 4487 4488 4489 4490 4491 4492 4493 4494 4495 4496 4497 4498 4499 4500 4501 4502 4503 4504 4505 4506 4507 4508 4509 4510 4511 4512 4513 4514 4515 4516 4517 4518
  {
    /*
       Since temporary tables are not replicated under row-based
       replication, CREATE TABLE ... LIKE ... needs special
       treatement.  We have four cases to consider, according to the
       following decision table:

           ==== ========= ========= ==============================
           Case    Target    Source Write to binary log
           ==== ========= ========= ==============================
           1       normal    normal Original statement
           2       normal temporary Generated statement
           3    temporary    normal Nothing
           4    temporary temporary Nothing
           ==== ========= ========= ==============================

       The variable 'tmp_table' below is used to see if the source
       table is a temporary table: if it is set, then the source table
       was a temporary table and we can take apropriate actions.
    */
    if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
    {
      if (tmp_table)                            // Case 2
      {
        char buf[2048];
        String query(buf, sizeof(buf), system_charset_info);
        query.length(0);  // Have to zero it since constructor doesn't
        TABLE *table_ptr;
        int error;

        /*
          Let's open and lock the table: it will be closed (and
          unlocked) by close_thread_tables() at the end of the
          statement anyway.
         */
        if (!(table_ptr= open_ltable(thd, table, TL_READ_NO_INSERT)))
          goto err;

        int result= store_create_info(thd, table, &query, create_info);

        DBUG_ASSERT(result == 0); // store_create_info() always return 0
        write_bin_log(thd, TRUE, query.ptr(), query.length());
      }
      else                                      // Case 1
        write_bin_log(thd, TRUE, thd->query, thd->query_length);
    }
    /*
      Case 3 and 4 does nothing under RBR
    */
  }
4519
  else
4520 4521
    write_bin_log(thd, TRUE, thd->query, thd->query_length);

unknown's avatar
unknown committed
4522
  res= FALSE;
4523
  goto err;
4524

unknown's avatar
unknown committed
4525 4526 4527 4528
table_exists:
  if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
  {
    char warn_buff[MYSQL_ERRMSG_SIZE];
4529 4530
    my_snprintf(warn_buff, sizeof(warn_buff),
		ER(ER_TABLE_EXISTS_ERROR), table_name);
4531
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
4532
		 ER_TABLE_EXISTS_ERROR,warn_buff);
unknown's avatar
unknown committed
4533
    res= FALSE;
unknown's avatar
unknown committed
4534
  }
4535 4536 4537 4538 4539 4540 4541 4542
  else
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table_name);

err:
  pthread_mutex_lock(&LOCK_open);
  unlock_table_name(thd, &src_tables_list);
  pthread_mutex_unlock(&LOCK_open);
  DBUG_RETURN(res);
unknown's avatar
unknown committed
4543 4544 4545
}


unknown's avatar
unknown committed
4546
bool mysql_analyze_table(THD* thd, TABLE_LIST* tables, HA_CHECK_OPT* check_opt)
4547
{
unknown's avatar
unknown committed
4548 4549 4550 4551 4552 4553
#ifdef OS2
  thr_lock_type lock_type = TL_WRITE;
#else
  thr_lock_type lock_type = TL_READ_NO_INSERT;
#endif

4554 4555
  DBUG_ENTER("mysql_analyze_table");
  DBUG_RETURN(mysql_admin_table(thd, tables, check_opt,
4556
				"analyze", lock_type, 1, 0, 0, 0,
4557
				&handler::analyze, 0));
4558 4559 4560
}


unknown's avatar
unknown committed
4561
bool mysql_check_table(THD* thd, TABLE_LIST* tables,HA_CHECK_OPT* check_opt)
4562
{
unknown's avatar
unknown committed
4563 4564 4565 4566 4567 4568
#ifdef OS2
  thr_lock_type lock_type = TL_WRITE;
#else
  thr_lock_type lock_type = TL_READ_NO_INSERT;
#endif

4569 4570
  DBUG_ENTER("mysql_check_table");
  DBUG_RETURN(mysql_admin_table(thd, tables, check_opt,
unknown's avatar
unknown committed
4571
				"check", lock_type,
4572
				0, HA_OPEN_FOR_REPAIR, 0, 0,
unknown's avatar
unknown committed
4573
				&handler::ha_check, &view_checksum));
4574 4575
}

unknown's avatar
unknown committed
4576

unknown's avatar
unknown committed
4577
/* table_list should contain just one table */
unknown's avatar
unknown committed
4578 4579 4580 4581
static int
mysql_discard_or_import_tablespace(THD *thd,
                                   TABLE_LIST *table_list,
                                   enum tablespace_op_type tablespace_op)
unknown's avatar
unknown committed
4582 4583 4584 4585 4586 4587
{
  TABLE *table;
  my_bool discard;
  int error;
  DBUG_ENTER("mysql_discard_or_import_tablespace");

unknown's avatar
unknown committed
4588 4589 4590 4591
  /*
    Note that DISCARD/IMPORT TABLESPACE always is the only operation in an
    ALTER TABLE
  */
unknown's avatar
unknown committed
4592 4593 4594

  thd->proc_info="discard_or_import_tablespace";

unknown's avatar
unknown committed
4595
  discard= test(tablespace_op == DISCARD_TABLESPACE);
unknown's avatar
unknown committed
4596

unknown's avatar
unknown committed
4597 4598 4599 4600 4601
 /*
   We set this flag so that ha_innobase::open and ::external_lock() do
   not complain when we lock the table
 */
  thd->tablespace_op= TRUE;
unknown's avatar
unknown committed
4602 4603 4604 4605 4606
  if (!(table=open_ltable(thd,table_list,TL_WRITE)))
  {
    thd->tablespace_op=FALSE;
    DBUG_RETURN(-1);
  }
4607

unknown's avatar
unknown committed
4608 4609 4610 4611 4612 4613 4614
  error=table->file->discard_or_import_tablespace(discard);

  thd->proc_info="end";

  if (error)
    goto err;

unknown's avatar
unknown committed
4615 4616 4617 4618
  /*
    The 0 in the call below means 'not in a transaction', which means
    immediate invalidation; that is probably what we wish here
  */
unknown's avatar
unknown committed
4619 4620 4621 4622 4623 4624 4625 4626
  query_cache_invalidate3(thd, table_list, 0);

  /* The ALTER TABLE is always in its own transaction */
  error = ha_commit_stmt(thd);
  if (ha_commit(thd))
    error=1;
  if (error)
    goto err;
4627
  write_bin_log(thd, FALSE, thd->query, thd->query_length);
unknown's avatar
unknown committed
4628 4629
err:
  close_thread_tables(thd);
unknown's avatar
unknown committed
4630
  thd->tablespace_op=FALSE;
4631
  
unknown's avatar
unknown committed
4632 4633
  if (error == 0)
  {
unknown's avatar
unknown committed
4634
    send_ok(thd);
unknown's avatar
unknown committed
4635
    DBUG_RETURN(0);
unknown's avatar
unknown committed
4636
  }
unknown's avatar
unknown committed
4637

4638 4639
  table->file->print_error(error, MYF(0));
    
unknown's avatar
unknown committed
4640
  DBUG_RETURN(-1);
unknown's avatar
unknown committed
4641
}
unknown's avatar
unknown committed
4642

4643

unknown's avatar
unknown committed
4644 4645
/*
  SYNOPSIS
4646 4647 4648 4649 4650 4651 4652 4653 4654 4655 4656 4657
    compare_tables()
      table                     The original table.
      create_list               The fields for the new table.
      key_info_buffer           An array of KEY structs for the new indexes.
      key_count                 The number of elements in the array.
      create_info               Create options for the new table.
      alter_info                Alter options.
      order_num                 Number of order list elements.
      index_drop_buffer   OUT   An array of offsets into table->key_info.
      index_drop_count    OUT   The number of elements in the array.
      index_add_buffer    OUT   An array of offsets into key_info_buffer.
      index_add_count     OUT   The number of elements in the array.
unknown's avatar
unknown committed
4658 4659 4660 4661 4662 4663 4664 4665 4666 4667 4668

  DESCRIPTION
    'table' (first argument) contains information of the original
    table, which includes all corresponding parts that the new
    table has in arguments create_list, key_list and create_info.

    By comparing the changes between the original and new table
    we can determine how much it has changed after ALTER TABLE
    and whether we need to make a copy of the table, or just change
    the .frm file.

4669 4670 4671 4672 4673
    If there are no data changes, but index changes, 'index_drop_buffer'
    and/or 'index_add_buffer' are populated with offsets into
    table->key_info or key_info_buffer respectively for the indexes
    that need to be dropped and/or (re-)created.

unknown's avatar
unknown committed
4674
  RETURN VALUES
4675 4676 4677
    0                           No copy needed
    ALTER_TABLE_DATA_CHANGED    Data changes, copy needed
    ALTER_TABLE_INDEX_CHANGED   Index changes, copy might be needed
unknown's avatar
unknown committed
4678 4679
*/

4680 4681 4682 4683 4684 4685
static uint compare_tables(TABLE *table, List<create_field> *create_list,
                           KEY *key_info_buffer, uint key_count,
                           HA_CREATE_INFO *create_info,
                           ALTER_INFO *alter_info, uint order_num,
                           uint *index_drop_buffer, uint *index_drop_count,
                           uint *index_add_buffer, uint *index_add_count)
unknown's avatar
unknown committed
4686 4687 4688 4689 4690
{
  Field **f_ptr, *field;
  uint changes= 0, tmp;
  List_iterator_fast<create_field> new_field_it(*create_list);
  create_field *new_field;
4691 4692
  KEY_PART_INFO *key_part;
  KEY_PART_INFO *end;
4693
  DBUG_ENTER("compare_tables");
unknown's avatar
unknown committed
4694 4695 4696 4697 4698 4699 4700 4701 4702 4703 4704 4705 4706 4707 4708 4709 4710 4711 4712 4713 4714 4715 4716 4717 4718 4719 4720

  /*
    Some very basic checks. If number of fields changes, or the
    handler, we need to run full ALTER TABLE. In the future
    new fields can be added and old dropped without copy, but
    not yet.

    Test also that engine was not given during ALTER TABLE, or
    we are force to run regular alter table (copy).
    E.g. ALTER TABLE tbl_name ENGINE=MyISAM.

    For the following ones we also want to run regular alter table:
    ALTER TABLE tbl_name ORDER BY ..
    ALTER TABLE tbl_name CONVERT TO CHARACTER SET ..

    At the moment we can't handle altering temporary tables without a copy.
    We also test if OPTIMIZE TABLE was given and was mapped to alter table.
    In that case we always do full copy.
  */
  if (table->s->fields != create_list->elements ||
      table->s->db_type != create_info->db_type ||
      table->s->tmp_table ||
      create_info->used_fields & HA_CREATE_USED_ENGINE ||
      create_info->used_fields & HA_CREATE_USED_CHARSET ||
      create_info->used_fields & HA_CREATE_USED_DEFAULT_CHARSET ||
      (alter_info->flags & ALTER_RECREATE) ||
      order_num)
4721
    DBUG_RETURN(ALTER_TABLE_DATA_CHANGED);
unknown's avatar
unknown committed
4722 4723 4724 4725 4726 4727 4728 4729 4730 4731 4732

  /*
    Go through fields and check if the original ones are compatible
    with new table.
  */
  for (f_ptr= table->field, new_field= new_field_it++;
       (field= *f_ptr); f_ptr++, new_field= new_field_it++)
  {
    /* Make sure we have at least the default charset in use. */
    if (!new_field->charset)
      new_field->charset= create_info->default_table_charset;
4733

unknown's avatar
unknown committed
4734 4735 4736
    /* Check that NULL behavior is same for old and new fields */
    if ((new_field->flags & NOT_NULL_FLAG) !=
	(uint) (field->flags & NOT_NULL_FLAG))
4737
      DBUG_RETURN(ALTER_TABLE_DATA_CHANGED);
unknown's avatar
unknown committed
4738 4739 4740 4741 4742 4743 4744 4745 4746 4747

    /* Don't pack rows in old tables if the user has requested this. */
    if (create_info->row_type == ROW_TYPE_DYNAMIC ||
	(new_field->flags & BLOB_FLAG) ||
	new_field->sql_type == MYSQL_TYPE_VARCHAR &&
	create_info->row_type != ROW_TYPE_FIXED)
      create_info->table_options|= HA_OPTION_PACK_RECORD;

    /* Evaluate changes bitmap and send to check_if_incompatible_data() */
    if (!(tmp= field->is_equal(new_field)))
4748
      DBUG_RETURN(ALTER_TABLE_DATA_CHANGED);
4749 4750
    // Clear indexed marker
    field->add_index= 0;
unknown's avatar
unknown committed
4751 4752 4753 4754 4755 4756 4757
    changes|= tmp;
  }

  /*
    Go through keys and check if the original ones are compatible
    with new table.
  */
4758 4759 4760 4761
  KEY *table_key;
  KEY *table_key_end= table->key_info + table->s->keys;
  KEY *new_key;
  KEY *new_key_end= key_info_buffer + key_count;
unknown's avatar
unknown committed
4762

4763 4764 4765 4766 4767 4768 4769 4770
  DBUG_PRINT("info", ("index count old: %d  new: %d",
                      table->s->keys, key_count));
  /*
    Step through all keys of the old table and search matching new keys.
  */
  *index_drop_count= 0;
  *index_add_count= 0;
  for (table_key= table->key_info; table_key < table_key_end; table_key++)
unknown's avatar
unknown committed
4771
  {
4772 4773 4774 4775 4776 4777 4778 4779 4780 4781 4782 4783 4784 4785 4786 4787 4788 4789 4790 4791 4792 4793 4794 4795
    KEY_PART_INFO *table_part;
    KEY_PART_INFO *table_part_end= table_key->key_part + table_key->key_parts;
    KEY_PART_INFO *new_part;

    /* Search a new key with the same name. */
    for (new_key= key_info_buffer; new_key < new_key_end; new_key++)
    {
      if (! strcmp(table_key->name, new_key->name))
        break;
    }
    if (new_key >= new_key_end)
    {
      /* Key not found. Add the offset of the key to the drop buffer. */
      index_drop_buffer[(*index_drop_count)++]= table_key - table->key_info;
      DBUG_PRINT("info", ("index dropped: '%s'", table_key->name));
      continue;
    }

    /* Check that the key types are compatible between old and new tables. */
    if ((table_key->algorithm != new_key->algorithm) ||
	((table_key->flags & HA_KEYFLAG_MASK) !=
         (new_key->flags & HA_KEYFLAG_MASK)) ||
        (table_key->key_parts != new_key->key_parts))
      goto index_changed;
unknown's avatar
unknown committed
4796 4797 4798 4799 4800

    /*
      Check that the key parts remain compatible between the old and
      new tables.
    */
4801 4802 4803
    for (table_part= table_key->key_part, new_part= new_key->key_part;
         table_part < table_part_end;
         table_part++, new_part++)
unknown's avatar
unknown committed
4804 4805 4806
    {
      /*
	Key definition has changed if we are using a different field or
4807 4808
	if the used key part length is different. We know that the fields
        did not change. Comparing field numbers is sufficient.
unknown's avatar
unknown committed
4809
      */
4810 4811 4812
      if ((table_part->length != new_part->length) ||
          (table_part->fieldnr - 1 != new_part->fieldnr))
	goto index_changed;
unknown's avatar
unknown committed
4813
    }
4814 4815 4816 4817 4818 4819
    continue;

  index_changed:
    /* Key modified. Add the offset of the key to both buffers. */
    index_drop_buffer[(*index_drop_count)++]= table_key - table->key_info;
    index_add_buffer[(*index_add_count)++]= new_key - key_info_buffer;
4820 4821 4822 4823 4824 4825 4826 4827
    key_part= new_key->key_part;
    end= key_part + new_key->key_parts;
    for(; key_part != end; key_part++)
    {
      // Mark field to be part of new key 
      field= table->field[key_part->fieldnr];
      field->add_index= 1;
    }
4828
    DBUG_PRINT("info", ("index changed: '%s'", table_key->name));
unknown's avatar
unknown committed
4829
  }
4830
  /*end of for (; table_key < table_key_end;) */
unknown's avatar
unknown committed
4831

4832 4833 4834 4835 4836 4837 4838 4839 4840 4841 4842 4843 4844 4845 4846
  /*
    Step through all keys of the new table and find matching old keys.
  */
  for (new_key= key_info_buffer; new_key < new_key_end; new_key++)
  {
    /* Search an old key with the same name. */
    for (table_key= table->key_info; table_key < table_key_end; table_key++)
    {
      if (! strcmp(table_key->name, new_key->name))
        break;
    }
    if (table_key >= table_key_end)
    {
      /* Key not found. Add the offset of the key to the add buffer. */
      index_add_buffer[(*index_add_count)++]= new_key - key_info_buffer;
4847 4848 4849 4850 4851 4852 4853 4854
      key_part= new_key->key_part;
      end= key_part + new_key->key_parts;
      for(; key_part != end; key_part++)
      {
        // Mark field to be part of new key 
        field= table->field[key_part->fieldnr];
        field->add_index= 1;
      }
unknown's avatar
unknown committed
4855
      DBUG_PRINT("info", ("index added: '%s'", new_key->name));
4856 4857
    }
  }
4858 4859 4860 4861 4862

  /* Check if changes are compatible with current handler without a copy */
  if (table->file->check_if_incompatible_data(create_info, changes))
    DBUG_RETURN(ALTER_TABLE_DATA_CHANGED);

4863 4864 4865 4866
  if (*index_drop_count || *index_add_count)
    DBUG_RETURN(ALTER_TABLE_INDEX_CHANGED);

  DBUG_RETURN(0); // Tables are compatible
unknown's avatar
unknown committed
4867 4868 4869
}


4870 4871 4872
/*
  Alter table
*/
4873

unknown's avatar
unknown committed
4874 4875 4876 4877 4878
bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
                       HA_CREATE_INFO *create_info,
                       TABLE_LIST *table_list,
                       List<create_field> &fields, List<Key> &keys,
                       uint order_num, ORDER *order,
4879
                       enum enum_duplicates handle_duplicates, bool ignore,
unknown's avatar
unknown committed
4880
                       ALTER_INFO *alter_info, bool do_send_ok)
unknown's avatar
unknown committed
4881
{
4882
  TABLE *table,*new_table=0;
unknown's avatar
unknown committed
4883
  int error;
unknown's avatar
unknown committed
4884 4885
  char tmp_name[80],old_name[32],new_name_buff[FN_REFLEN];
  char new_alias_buff[FN_REFLEN], *table_name, *db, *new_alias, *alias;
4886
  char index_file[FN_REFLEN], data_file[FN_REFLEN];
4887
  char reg_path[FN_REFLEN+1];
unknown's avatar
unknown committed
4888 4889
  ha_rows copied,deleted;
  ulonglong next_insert_id;
4890
  uint db_create_options, used_fields;
unknown's avatar
unknown committed
4891
  handlerton *old_db_type, *new_db_type;
unknown's avatar
unknown committed
4892
  uint need_copy_table= 0;
4893
  bool no_table_reopen= FALSE;
4894
#ifdef WITH_PARTITION_STORAGE_ENGINE
unknown's avatar
unknown committed
4895
  uint fast_alter_partition= 0;
4896 4897
  bool partition_changed= FALSE;
#endif
4898 4899 4900 4901 4902 4903 4904 4905 4906 4907
  List<create_field> prepared_create_list;
  List<Key>          prepared_key_list;
  bool need_lock_for_indexes= TRUE;
  uint db_options= 0;
  uint key_count;
  KEY  *key_info_buffer;
  uint index_drop_count;
  uint *index_drop_buffer;
  uint index_add_count;
  uint *index_add_buffer;
unknown's avatar
unknown committed
4908
  bool committed= 0;
unknown's avatar
unknown committed
4909 4910
  DBUG_ENTER("mysql_alter_table");

4911 4912 4913 4914 4915
  LINT_INIT(index_add_count);
  LINT_INIT(index_drop_count);
  LINT_INIT(index_add_buffer);
  LINT_INIT(index_drop_buffer);

unknown's avatar
unknown committed
4916
  thd->proc_info="init";
4917
  table_name=table_list->table_name;
unknown's avatar
unknown committed
4918
  alias= (lower_case_table_names == 2) ? table_list->alias : table_name;
unknown's avatar
unknown committed
4919
  db=table_list->db;
unknown's avatar
unknown committed
4920
  if (!new_db || !my_strcasecmp(table_alias_charset, new_db, db))
4921
    new_db= db;
4922 4923
  build_table_filename(reg_path, sizeof(reg_path), db, table_name, reg_ext);

4924
  used_fields=create_info->used_fields;
4925

4926
  mysql_ha_flush(thd, table_list, MYSQL_HA_CLOSE_FINAL, FALSE);
unknown's avatar
unknown committed
4927

unknown's avatar
unknown committed
4928
  /* DISCARD/IMPORT TABLESPACE is always alone in an ALTER TABLE */
4929
  if (alter_info->tablespace_op != NO_TABLESPACE_OP)
unknown's avatar
unknown committed
4930
    DBUG_RETURN(mysql_discard_or_import_tablespace(thd,table_list,
4931
						   alter_info->tablespace_op));
unknown's avatar
unknown committed
4932
  if (!(table=open_ltable(thd,table_list,TL_WRITE_ALLOW_READ)))
unknown's avatar
unknown committed
4933
    DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
4934 4935 4936 4937 4938

  /* Check that we are not trying to rename to an existing table */
  if (new_name)
  {
    strmov(new_name_buff,new_name);
4939
    strmov(new_alias= new_alias_buff, new_name);
unknown's avatar
unknown committed
4940
    if (lower_case_table_names)
unknown's avatar
unknown committed
4941 4942 4943
    {
      if (lower_case_table_names != 2)
      {
4944
	my_casedn_str(files_charset_info, new_name_buff);
unknown's avatar
unknown committed
4945 4946
	new_alias= new_name;			// Create lower case table name
      }
4947
      my_casedn_str(files_charset_info, new_name);
unknown's avatar
unknown committed
4948
    }
4949
    if (new_db == db &&
unknown's avatar
unknown committed
4950
	!my_strcasecmp(table_alias_charset, new_name_buff, table_name))
4951 4952
    {
      /*
4953 4954
	Source and destination table names are equal: make later check
	easier.
4955
      */
unknown's avatar
unknown committed
4956
      new_alias= new_name= table_name;
4957
    }
unknown's avatar
unknown committed
4958 4959
    else
    {
unknown's avatar
unknown committed
4960
      if (table->s->tmp_table != NO_TMP_TABLE)
unknown's avatar
unknown committed
4961 4962 4963
      {
	if (find_temporary_table(thd,new_db,new_name_buff))
	{
4964
	  my_error(ER_TABLE_EXISTS_ERROR, MYF(0), new_name_buff);
unknown's avatar
unknown committed
4965
	  DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
4966 4967 4968 4969
	}
      }
      else
      {
4970
	char dir_buff[FN_REFLEN];
unknown's avatar
unknown committed
4971 4972
	strxnmov(dir_buff, sizeof(dir_buff)-1,
                 mysql_real_data_home, new_db, NullS);
4973
	if (!access(fn_format(new_name_buff,new_name_buff,dir_buff,reg_ext,0),
unknown's avatar
unknown committed
4974 4975 4976
		    F_OK))
	{
	  /* Table will be closed in do_command() */
4977
	  my_error(ER_TABLE_EXISTS_ERROR, MYF(0), new_alias);
unknown's avatar
unknown committed
4978
	  DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
4979 4980 4981 4982 4983
	}
      }
    }
  }
  else
4984 4985 4986 4987
  {
    new_alias= (lower_case_table_names == 2) ? alias : table_name;
    new_name= table_name;
  }
unknown's avatar
unknown committed
4988

4989
  old_db_type= table->s->db_type;
unknown's avatar
unknown committed
4990
  if (create_info->db_type == (handlerton*) &default_hton)
4991
    create_info->db_type= old_db_type;
unknown's avatar
unknown committed
4992

4993
#ifdef WITH_PARTITION_STORAGE_ENGINE
unknown's avatar
unknown committed
4994 4995
  if (prep_alter_part_table(thd, table, alter_info, create_info, old_db_type,
                            &partition_changed, &fast_alter_partition))
4996
  {
unknown's avatar
unknown committed
4997
    DBUG_RETURN(TRUE);
4998 4999
  }
#endif
5000
  if (check_engine(thd, new_name, create_info))
5001 5002
    DBUG_RETURN(TRUE);
  new_db_type= create_info->db_type;
5003
  if (create_info->row_type == ROW_TYPE_NOT_USED)
5004
    create_info->row_type= table->s->row_type;
unknown's avatar
unknown committed
5005

5006 5007 5008
  DBUG_PRINT("info", ("old type: %s  new type: %s",
             ha_resolve_storage_engine_name(old_db_type),
             ha_resolve_storage_engine_name(new_db_type)));
unknown's avatar
unknown committed
5009 5010
  if (ha_check_storage_engine_flag(old_db_type, HTON_ALTER_NOT_SUPPORTED) ||
      ha_check_storage_engine_flag(new_db_type, HTON_ALTER_NOT_SUPPORTED))
5011 5012 5013 5014 5015 5016
  {
    DBUG_PRINT("info", ("doesn't support alter"));
    my_error(ER_ILLEGAL_HA, MYF(0), table_name);
    DBUG_RETURN(TRUE);
  }
  
unknown's avatar
unknown committed
5017
  thd->proc_info="setup";
5018
  if (!(alter_info->flags & ~(ALTER_RENAME | ALTER_KEYS_ONOFF)) &&
5019
      !table->s->tmp_table) // no need to touch frm
unknown's avatar
unknown committed
5020 5021
  {
    error=0;
5022
    if (new_name != table_name || new_db != db)
unknown's avatar
unknown committed
5023
    {
5024 5025 5026 5027 5028 5029
      thd->proc_info="rename";
      VOID(pthread_mutex_lock(&LOCK_open));
      /* Then do a 'simple' rename of the table */
      error=0;
      if (!access(new_name_buff,F_OK))
      {
5030
	my_error(ER_TABLE_EXISTS_ERROR, MYF(0), new_name);
5031
	error= -1;
5032 5033 5034
      }
      else
      {
5035
	*fn_ext(new_name)=0;
unknown's avatar
unknown committed
5036
        table->s->version= 0;                   // Force removal of table def
5037 5038
	close_cached_table(thd, table);
	if (mysql_rename_table(old_db_type,db,table_name,new_db,new_alias))
5039
	  error= -1;
5040 5041 5042 5043 5044 5045 5046
        else if (Table_triggers_list::change_table_name(thd, db, table_name,
                                                        new_db, new_alias))
        {
          VOID(mysql_rename_table(old_db_type, new_db, new_alias, db,
                                  table_name));
          error= -1;
        }
5047 5048
      }
      VOID(pthread_mutex_unlock(&LOCK_open));
unknown's avatar
unknown committed
5049
    }
unknown's avatar
unknown committed
5050

5051
    if (!error)
5052
    {
5053
      switch (alter_info->keys_onoff) {
5054
      case LEAVE_AS_IS:
5055
        break;
5056
      case ENABLE:
5057 5058 5059 5060 5061 5062
        VOID(pthread_mutex_lock(&LOCK_open));
        wait_while_table_is_used(thd, table, HA_EXTRA_FORCE_REOPEN);
        VOID(pthread_mutex_unlock(&LOCK_open));
        error= table->file->enable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
        /* COND_refresh will be signaled in close_thread_tables() */
        break;
5063
      case DISABLE:
5064 5065 5066 5067 5068 5069
        VOID(pthread_mutex_lock(&LOCK_open));
        wait_while_table_is_used(thd, table, HA_EXTRA_FORCE_REOPEN);
        VOID(pthread_mutex_unlock(&LOCK_open));
        error=table->file->disable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
        /* COND_refresh will be signaled in close_thread_tables() */
        break;
5070
      }
5071
    }
5072

5073
    if (error == HA_ERR_WRONG_COMMAND)
unknown's avatar
unknown committed
5074 5075
    {
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
5076
			  ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
5077
			  table->alias);
unknown's avatar
unknown committed
5078 5079
      error=0;
    }
unknown's avatar
unknown committed
5080 5081
    if (!error)
    {
5082
      write_bin_log(thd, TRUE, thd->query, thd->query_length);
5083 5084
      if (do_send_ok)
        send_ok(thd);
unknown's avatar
unknown committed
5085
    }
5086
    else if (error > 0)
5087 5088
    {
      table->file->print_error(error, MYF(0));
5089
      error= -1;
5090
    }
5091 5092
    table_list->table=0;				// For query cache
    query_cache_invalidate3(thd, table_list, 0);
unknown's avatar
unknown committed
5093 5094 5095 5096
    DBUG_RETURN(error);
  }

  /* Full alter table */
5097

5098
  /* Let new create options override the old ones */
5099
  if (!(used_fields & HA_CREATE_USED_MIN_ROWS))
5100
    create_info->min_rows= table->s->min_rows;
5101
  if (!(used_fields & HA_CREATE_USED_MAX_ROWS))
5102
    create_info->max_rows= table->s->max_rows;
5103
  if (!(used_fields & HA_CREATE_USED_AVG_ROW_LENGTH))
5104
    create_info->avg_row_length= table->s->avg_row_length;
5105
  if (!(used_fields & HA_CREATE_USED_DEFAULT_CHARSET))
5106
    create_info->default_table_charset= table->s->table_charset;
5107

5108
  restore_record(table, s->default_values);     // Empty record for DEFAULT
5109
  List_iterator<Alter_drop> drop_it(alter_info->drop_list);
unknown's avatar
unknown committed
5110
  List_iterator<create_field> def_it(fields);
5111
  List_iterator<Alter_column> alter_it(alter_info->alter_list);
unknown's avatar
unknown committed
5112 5113
  List<create_field> create_list;		// Add new fields here
  List<Key> key_list;				// Add new keys here
5114 5115
  create_field *def;

unknown's avatar
unknown committed
5116
  /*
5117
    First collect all fields from table which isn't in drop_list
unknown's avatar
unknown committed
5118 5119 5120 5121 5122
  */

  Field **f_ptr,*field;
  for (f_ptr=table->field ; (field= *f_ptr) ; f_ptr++)
  {
5123
    /* Check if field should be dropped */
unknown's avatar
unknown committed
5124 5125 5126 5127 5128
    Alter_drop *drop;
    drop_it.rewind();
    while ((drop=drop_it++))
    {
      if (drop->type == Alter_drop::COLUMN &&
5129
	  !my_strcasecmp(system_charset_info,field->field_name, drop->name))
5130 5131 5132
      {
	/* Reset auto_increment value if it was dropped */
	if (MTYP_TYPENR(field->unireg_check) == Field::NEXT_NUMBER &&
5133
	    !(used_fields & HA_CREATE_USED_AUTO))
5134 5135 5136 5137
	{
	  create_info->auto_increment_value=0;
	  create_info->used_fields|=HA_CREATE_USED_AUTO;
	}
unknown's avatar
unknown committed
5138
	break;
5139
      }
unknown's avatar
unknown committed
5140 5141 5142 5143 5144 5145 5146 5147 5148 5149
    }
    if (drop)
    {
      drop_it.remove();
      continue;
    }
    /* Check if field is changed */
    def_it.rewind();
    while ((def=def_it++))
    {
5150
      if (def->change &&
5151
	  !my_strcasecmp(system_charset_info,field->field_name, def->change))
unknown's avatar
unknown committed
5152 5153 5154 5155 5156
	break;
    }
    if (def)
    {						// Field is changed
      def->field=field;
5157 5158 5159 5160 5161
      if (!def->after)
      {
	create_list.push_back(def);
	def_it.remove();
      }
unknown's avatar
unknown committed
5162
    }
unknown's avatar
unknown committed
5163 5164
    else // This field was not dropped and not changed, add it to the list
    {	 // for the new table.   
5165
      create_list.push_back(def=new create_field(field,field));
unknown's avatar
unknown committed
5166 5167 5168 5169
      alter_it.rewind();			// Change default if ALTER
      Alter_column *alter;
      while ((alter=alter_it++))
      {
5170
	if (!my_strcasecmp(system_charset_info,field->field_name, alter->name))
unknown's avatar
unknown committed
5171 5172 5173 5174
	  break;
      }
      if (alter)
      {
5175 5176
	if (def->sql_type == FIELD_TYPE_BLOB)
	{
5177
	  my_error(ER_BLOB_CANT_HAVE_DEFAULT, MYF(0), def->change);
unknown's avatar
unknown committed
5178
	  DBUG_RETURN(TRUE);
5179
	}
5180 5181 5182 5183
	if ((def->def=alter->def))              // Use new default
          def->flags&= ~NO_DEFAULT_VALUE_FLAG;
        else
          def->flags|= NO_DEFAULT_VALUE_FLAG;
unknown's avatar
unknown committed
5184 5185 5186 5187 5188 5189 5190 5191
	alter_it.remove();
      }
    }
  }
  def_it.rewind();
  List_iterator<create_field> find_it(create_list);
  while ((def=def_it++))			// Add new columns
  {
5192
    if (def->change && ! def->field)
unknown's avatar
unknown committed
5193
    {
5194
      my_error(ER_BAD_FIELD_ERROR, MYF(0), def->change, table_name);
unknown's avatar
unknown committed
5195
      DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
5196 5197 5198 5199 5200 5201 5202 5203 5204 5205 5206
    }
    if (!def->after)
      create_list.push_back(def);
    else if (def->after == first_keyword)
      create_list.push_front(def);
    else
    {
      create_field *find;
      find_it.rewind();
      while ((find=find_it++))			// Add new columns
      {
5207
	if (!my_strcasecmp(system_charset_info,def->after, find->field_name))
unknown's avatar
unknown committed
5208 5209 5210 5211
	  break;
      }
      if (!find)
      {
5212
	my_error(ER_BAD_FIELD_ERROR, MYF(0), def->after, table_name);
unknown's avatar
unknown committed
5213
	DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
5214 5215 5216 5217
      }
      find_it.after(def);			// Put element after this
    }
  }
5218
  if (alter_info->alter_list.elements)
unknown's avatar
unknown committed
5219
  {
5220 5221
    my_error(ER_BAD_FIELD_ERROR, MYF(0),
             alter_info->alter_list.head()->name, table_name);
unknown's avatar
unknown committed
5222
    DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
5223 5224 5225
  }
  if (!create_list.elements)
  {
unknown's avatar
unknown committed
5226 5227
    my_message(ER_CANT_REMOVE_ALL_FIELDS, ER(ER_CANT_REMOVE_ALL_FIELDS),
               MYF(0));
unknown's avatar
unknown committed
5228
    DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
5229 5230 5231
  }

  /*
5232 5233
    Collect all keys which isn't in drop list. Add only those
    for which some fields exists.
unknown's avatar
unknown committed
5234 5235 5236 5237 5238 5239 5240
  */

  List_iterator<Key> key_it(keys);
  List_iterator<create_field> field_it(create_list);
  List<key_part_spec> key_parts;

  KEY *key_info=table->key_info;
5241
  for (uint i=0 ; i < table->s->keys ; i++,key_info++)
unknown's avatar
unknown committed
5242
  {
5243
    char *key_name= key_info->name;
unknown's avatar
unknown committed
5244 5245 5246 5247 5248
    Alter_drop *drop;
    drop_it.rewind();
    while ((drop=drop_it++))
    {
      if (drop->type == Alter_drop::KEY &&
5249
	  !my_strcasecmp(system_charset_info,key_name, drop->name))
unknown's avatar
unknown committed
5250 5251 5252 5253 5254 5255 5256 5257 5258 5259 5260 5261 5262 5263 5264 5265 5266 5267 5268 5269 5270
	break;
    }
    if (drop)
    {
      drop_it.remove();
      continue;
    }

    KEY_PART_INFO *key_part= key_info->key_part;
    key_parts.empty();
    for (uint j=0 ; j < key_info->key_parts ; j++,key_part++)
    {
      if (!key_part->field)
	continue;				// Wrong field (from UNIREG)
      const char *key_part_name=key_part->field->field_name;
      create_field *cfield;
      field_it.rewind();
      while ((cfield=field_it++))
      {
	if (cfield->change)
	{
unknown's avatar
unknown committed
5271 5272
	  if (!my_strcasecmp(system_charset_info, key_part_name,
			     cfield->change))
unknown's avatar
unknown committed
5273 5274
	    break;
	}
5275
	else if (!my_strcasecmp(system_charset_info,
5276
				key_part_name, cfield->field_name))
unknown's avatar
unknown committed
5277
	  break;
unknown's avatar
unknown committed
5278 5279 5280 5281 5282
      }
      if (!cfield)
	continue;				// Field is removed
      uint key_part_length=key_part->length;
      if (cfield->field)			// Not new field
5283 5284 5285 5286 5287 5288 5289
      {
        /*
          If the field can't have only a part used in a key according to its
          new type, or should not be used partially according to its
          previous type, or the field length is less than the key part
          length, unset the key part length.

5290 5291 5292
          We also unset the key part length if it is the same as the
          old field's length, so the whole new field will be used.

5293 5294 5295 5296 5297
          BLOBs may have cfield->length == 0, which is why we test it before
          checking whether cfield->length < key_part_length (in chars).
         */
        if (!Field::type_can_have_key_part(cfield->field->type()) ||
            !Field::type_can_have_key_part(cfield->sql_type) ||
unknown's avatar
unknown committed
5298 5299
            (cfield->field->field_length == key_part_length &&
             !f_is_blob(key_part->key_type)) ||
5300 5301 5302
	    (cfield->length && (cfield->length < key_part_length /
                                key_part->field->charset()->mbmaxlen)))
	  key_part_length= 0;			// Use whole field
unknown's avatar
unknown committed
5303
      }
5304
      key_part_length /= key_part->field->charset()->mbmaxlen;
unknown's avatar
unknown committed
5305 5306 5307 5308
      key_parts.push_back(new key_part_spec(cfield->field_name,
					    key_part_length));
    }
    if (key_parts.elements)
unknown's avatar
unknown committed
5309
      key_list.push_back(new Key(key_info->flags & HA_SPATIAL ? Key::SPATIAL :
5310
				 (key_info->flags & HA_NOSAME ?
5311
				 (!my_strcasecmp(system_charset_info,
5312 5313
						 key_name, primary_key_name) ?
				  Key::PRIMARY	: Key::UNIQUE) :
5314 5315 5316
				  (key_info->flags & HA_FULLTEXT ?
				   Key::FULLTEXT : Key::MULTIPLE)),
				 key_name,
5317
				 key_info->algorithm,
5318
                                 test(key_info->flags & HA_GENERATED_KEY),
5319 5320 5321
				 key_parts,
                                 key_info->flags & HA_USES_PARSER ?
                                 &key_info->parser->name : 0));
unknown's avatar
unknown committed
5322 5323 5324 5325
  }
  {
    Key *key;
    while ((key=key_it++))			// Add new keys
5326 5327 5328
    {
      if (key->type != Key::FOREIGN_KEY)
	key_list.push_back(key);
5329 5330 5331 5332
      if (key->name &&
	  !my_strcasecmp(system_charset_info,key->name,primary_key_name))
      {
	my_error(ER_WRONG_NAME_FOR_INDEX, MYF(0), key->name);
unknown's avatar
unknown committed
5333
	DBUG_RETURN(TRUE);
5334
      }
5335
    }
unknown's avatar
unknown committed
5336 5337
  }

5338
  if (alter_info->drop_list.elements)
unknown's avatar
unknown committed
5339
  {
5340 5341
    my_error(ER_CANT_DROP_FIELD_OR_KEY, MYF(0),
             alter_info->drop_list.head()->name);
unknown's avatar
unknown committed
5342 5343
    goto err;
  }
5344
  if (alter_info->alter_list.elements)
unknown's avatar
unknown committed
5345
  {
5346 5347
    my_error(ER_CANT_DROP_FIELD_OR_KEY, MYF(0),
             alter_info->alter_list.head()->name);
unknown's avatar
unknown committed
5348 5349 5350
    goto err;
  }

5351
  db_create_options= table->s->db_create_options & ~(HA_OPTION_PACK_RECORD);
5352 5353
  my_snprintf(tmp_name, sizeof(tmp_name), "%s-%lx_%lx", tmp_file_prefix,
	      current_pid, thd->thread_id);
5354 5355
  /* Safety fix for innodb */
  if (lower_case_table_names)
5356
    my_casedn_str(files_charset_info, tmp_name);
5357 5358 5359 5360
  if (new_db_type != old_db_type && !table->file->can_switch_engines()) {
    my_error(ER_ROW_IS_REFERENCED, MYF(0));
    goto err;
  }
unknown's avatar
unknown committed
5361 5362
  create_info->db_type=new_db_type;
  if (!create_info->comment)
5363
    create_info->comment= table->s->comment;
5364 5365 5366 5367 5368

  table->file->update_create_info(create_info);
  if ((create_info->table_options &
       (HA_OPTION_PACK_KEYS | HA_OPTION_NO_PACK_KEYS)) ||
      (used_fields & HA_CREATE_USED_PACK_KEYS))
unknown's avatar
unknown committed
5369 5370 5371 5372 5373 5374 5375 5376 5377 5378
    db_create_options&= ~(HA_OPTION_PACK_KEYS | HA_OPTION_NO_PACK_KEYS);
  if (create_info->table_options &
      (HA_OPTION_CHECKSUM | HA_OPTION_NO_CHECKSUM))
    db_create_options&= ~(HA_OPTION_CHECKSUM | HA_OPTION_NO_CHECKSUM);
  if (create_info->table_options &
      (HA_OPTION_DELAY_KEY_WRITE | HA_OPTION_NO_DELAY_KEY_WRITE))
    db_create_options&= ~(HA_OPTION_DELAY_KEY_WRITE |
			  HA_OPTION_NO_DELAY_KEY_WRITE);
  create_info->table_options|= db_create_options;

5379
  if (table->s->tmp_table)
unknown's avatar
unknown committed
5380 5381
    create_info->options|=HA_LEX_CREATE_TMP_TABLE;

unknown's avatar
unknown committed
5382 5383
  set_table_default_charset(thd, create_info, db);

5384 5385 5386 5387 5388 5389 5390 5391 5392 5393 5394 5395 5396 5397 5398 5399 5400 5401 5402 5403 5404 5405 5406 5407 5408 5409 5410 5411 5412 5413 5414 5415 5416 5417 5418 5419 5420 5421 5422 5423 5424 5425 5426 5427 5428 5429 5430 5431
  {
    /*
      For some purposes we need prepared table structures and translated
      key descriptions with proper default key name assignment.

      Unfortunately, mysql_prepare_table() modifies the field and key
      lists. mysql_create_table() needs the unmodified lists. Hence, we
      need to copy the lists and all their elements. The lists contain
      pointers to the elements only.

      We cannot copy conditionally because the partition code always
      needs prepared lists and compare_tables() needs them and is almost
      always called.
    */

    /* Copy fields. */
    List_iterator<create_field> prep_field_it(create_list);
    create_field *prep_field;
    while ((prep_field= prep_field_it++))
      prepared_create_list.push_back(new create_field(*prep_field));

    /* Copy keys and key parts. */
    List_iterator<Key> prep_key_it(key_list);
    Key *prep_key;
    while ((prep_key= prep_key_it++))
    {
      List<key_part_spec> prep_columns;
      List_iterator<key_part_spec> prep_col_it(prep_key->columns);
      key_part_spec *prep_col;

      while ((prep_col= prep_col_it++))
        prep_columns.push_back(new key_part_spec(*prep_col));
      prepared_key_list.push_back(new Key(prep_key->type, prep_key->name,
                                          prep_key->algorithm,
                                          prep_key->generated, prep_columns,
                                          prep_key->parser_name));
    }

    /* Create the prepared information. */
    if (mysql_prepare_table(thd, create_info, &prepared_create_list,
                            &prepared_key_list,
                            (table->s->tmp_table != NO_TMP_TABLE), &db_options,
                            table->file, &key_info_buffer, &key_count, 0))
      goto err;
  }

  if (thd->variables.old_alter_table
      || (table->s->db_type != create_info->db_type)
5432
#ifdef WITH_PARTITION_STORAGE_ENGINE
5433
      || partition_changed
5434
#endif
5435
     )
unknown's avatar
unknown committed
5436 5437
    need_copy_table= 1;
  else
5438 5439 5440 5441 5442 5443 5444 5445 5446 5447 5448 5449 5450 5451 5452 5453 5454 5455 5456 5457 5458 5459 5460
  {
    /* Try to optimize ALTER TABLE. Allocate result buffers. */
    if (! (index_drop_buffer=
           (uint*) thd->alloc(sizeof(uint) * table->s->keys)) ||
        ! (index_add_buffer=
           (uint*) thd->alloc(sizeof(uint) * prepared_key_list.elements)))
      goto err;
    /* Check how much the tables differ. */
    need_copy_table= compare_tables(table, &prepared_create_list,
                                    key_info_buffer, key_count,
                                    create_info, alter_info, order_num,
                                    index_drop_buffer, &index_drop_count,
                                    index_add_buffer, &index_add_count);
  }

  /*
    If there are index changes only, try to do them online. "Index
    changes only" means also that the handler for the table does not
    change. The table is open and locked. The handler can be accessed.
  */
  if (need_copy_table == ALTER_TABLE_INDEX_CHANGED)
  {
    int   pk_changed= 0;
unknown's avatar
unknown committed
5461
    ulong alter_flags= 0;
5462 5463 5464 5465 5466 5467
    ulong needed_online_flags= 0;
    ulong needed_fast_flags= 0;
    KEY   *key;
    uint  *idx_p;
    uint  *idx_end_p;

unknown's avatar
unknown committed
5468 5469 5470
    if (table->s->db_type->alter_table_flags)
      alter_flags= table->s->db_type->alter_table_flags(alter_info->flags);
    DBUG_PRINT("info", ("alter_flags: %lu", alter_flags));
5471 5472 5473 5474 5475 5476 5477 5478 5479 5480 5481 5482 5483 5484 5485 5486 5487 5488 5489 5490 5491 5492 5493 5494 5495 5496 5497 5498 5499 5500 5501 5502 5503 5504 5505 5506 5507 5508 5509 5510 5511 5512 5513 5514 5515 5516 5517 5518 5519 5520 5521 5522 5523 5524 5525 5526 5527 5528 5529 5530 5531 5532 5533 5534 5535 5536 5537 5538 5539 5540 5541 5542 5543 5544 5545 5546 5547 5548 5549 5550 5551 5552 5553 5554 5555 5556 5557 5558 5559
    /* Check dropped indexes. */
    for (idx_p= index_drop_buffer, idx_end_p= idx_p + index_drop_count;
         idx_p < idx_end_p;
         idx_p++)
    {
      key= table->key_info + *idx_p;
      DBUG_PRINT("info", ("index dropped: '%s'", key->name));
      if (key->flags & HA_NOSAME)
      {
        /* Unique key. Check for "PRIMARY". */
        if (! my_strcasecmp(system_charset_info,
                            key->name, primary_key_name))
        {
          /* Primary key. */
          needed_online_flags|=  HA_ONLINE_DROP_PK_INDEX;
          needed_fast_flags|= HA_ONLINE_DROP_PK_INDEX_NO_WRITES;
          pk_changed++;
        }
        else
        {
          /* Non-primary unique key. */
          needed_online_flags|=  HA_ONLINE_DROP_UNIQUE_INDEX;
          needed_fast_flags|= HA_ONLINE_DROP_UNIQUE_INDEX_NO_WRITES;
        }
      }
      else
      {
        /* Non-unique key. */
        needed_online_flags|=  HA_ONLINE_DROP_INDEX;
        needed_fast_flags|= HA_ONLINE_DROP_INDEX_NO_WRITES;
      }
    }

    /* Check added indexes. */
    for (idx_p= index_add_buffer, idx_end_p= idx_p + index_add_count;
         idx_p < idx_end_p;
         idx_p++)
    {
      key= key_info_buffer + *idx_p;
      DBUG_PRINT("info", ("index added: '%s'", key->name));
      if (key->flags & HA_NOSAME)
      {
        /* Unique key. Check for "PRIMARY". */
        if (! my_strcasecmp(system_charset_info,
                            key->name, primary_key_name))
        {
          /* Primary key. */
          needed_online_flags|=  HA_ONLINE_ADD_PK_INDEX;
          needed_fast_flags|= HA_ONLINE_ADD_PK_INDEX_NO_WRITES;
          pk_changed++;
        }
        else
        {
          /* Non-primary unique key. */
          needed_online_flags|=  HA_ONLINE_ADD_UNIQUE_INDEX;
          needed_fast_flags|= HA_ONLINE_ADD_UNIQUE_INDEX_NO_WRITES;
        }
      }
      else
      {
        /* Non-unique key. */
        needed_online_flags|=  HA_ONLINE_ADD_INDEX;
        needed_fast_flags|= HA_ONLINE_ADD_INDEX_NO_WRITES;
      }
    }

    /*
      Online or fast add/drop index is possible only if
      the primary key is not added and dropped in the same statement.
      Otherwise we have to recreate the table.
      need_copy_table is no-zero at this place.
    */
    if ( pk_changed < 2 )
    {
      if ((alter_flags & needed_online_flags) == needed_online_flags)
      {
        /* All required online flags are present. */
        need_copy_table= 0;
        need_lock_for_indexes= FALSE;
      }
      else if ((alter_flags & needed_fast_flags) == needed_fast_flags)
      {
        /* All required fast flags are present. */
        need_copy_table= 0;
      }
    }
    DBUG_PRINT("info", ("need_copy_table: %u  need_lock: %d",
                        need_copy_table, need_lock_for_indexes));
  }
unknown's avatar
unknown committed
5560

5561 5562
  /*
    better have a negative test here, instead of positive, like
unknown's avatar
unknown committed
5563
    alter_info->flags & ALTER_ADD_COLUMN|ALTER_ADD_INDEX|...
5564 5565
    so that ALTER TABLE won't break when somebody will add new flag
  */
unknown's avatar
unknown committed
5566 5567
  if (!need_copy_table)
    create_info->frm_only= 1;
5568

5569
#ifdef WITH_PARTITION_STORAGE_ENGINE
unknown's avatar
unknown committed
5570
  if (fast_alter_partition)
unknown's avatar
unknown committed
5571
  {
unknown's avatar
unknown committed
5572 5573 5574 5575 5576
    DBUG_RETURN(fast_alter_partition_table(thd, table, alter_info,
                                           create_info, table_list,
                                           &create_list, &key_list,
                                           db, table_name,
                                           fast_alter_partition));
unknown's avatar
unknown committed
5577
  }
5578
#endif
unknown's avatar
unknown committed
5579

5580 5581 5582 5583 5584 5585 5586 5587 5588 5589 5590 5591 5592 5593 5594 5595 5596 5597 5598 5599 5600 5601 5602 5603 5604 5605 5606 5607 5608 5609 5610 5611 5612 5613 5614 5615 5616 5617 5618 5619 5620 5621 5622
  /*
    Handling of symlinked tables:
    If no rename:
      Create new data file and index file on the same disk as the
      old data and index files.
      Copy data.
      Rename new data file over old data file and new index file over
      old index file.
      Symlinks are not changed.

   If rename:
      Create new data file and index file on the same disk as the
      old data and index files.  Create also symlinks to point at
      the new tables.
      Copy data.
      At end, rename temporary tables and symlinks to temporary table
      to final table name.
      Remove old table and old symlinks

    If rename is made to another database:
      Create new tables in new database.
      Copy data.
      Remove old table and symlinks.
  */
  if (!strcmp(db, new_db))		// Ignore symlink if db changed
  {
    if (create_info->index_file_name)
    {
      /* Fix index_file_name to have 'tmp_name' as basename */
      strmov(index_file, tmp_name);
      create_info->index_file_name=fn_same(index_file,
					   create_info->index_file_name,
					   1);
    }
    if (create_info->data_file_name)
    {
      /* Fix data_file_name to have 'tmp_name' as basename */
      strmov(data_file, tmp_name);
      create_info->data_file_name=fn_same(data_file,
					  create_info->data_file_name,
					  1);
    }
  }
5623 5624
  else
    create_info->data_file_name=create_info->index_file_name=0;
unknown's avatar
unknown committed
5625

5626 5627 5628 5629 5630 5631 5632 5633 5634 5635 5636 5637 5638
  /*
    Create a table with a temporary name.
    With create_info->frm_only == 1 this creates a .frm file only.
    We don't log the statement, it will be logged later.
  */
  tmp_disable_binlog(thd);
  error= mysql_create_table(thd, new_db, tmp_name,
                            create_info,create_list,key_list,1,0);
  reenable_binlog(thd);
  if (error)
    DBUG_RETURN(error);

  /* Open the table if we need to copy the data. */
5639
  if (need_copy_table)
unknown's avatar
unknown committed
5640
  {
5641
    if (table->s->tmp_table)
5642 5643 5644 5645
    {
      TABLE_LIST tbl;
      bzero((void*) &tbl, sizeof(tbl));
      tbl.db= new_db;
5646
      tbl.table_name= tbl.alias= tmp_name;
unknown's avatar
unknown committed
5647
      /* Table is in thd->temporary_tables */
5648 5649
      new_table= open_table(thd, &tbl, thd->mem_root, (bool*) 0,
                            MYSQL_LOCK_IGNORE_FLUSH);
5650 5651 5652 5653
    }
    else
    {
      char path[FN_REFLEN];
unknown's avatar
unknown committed
5654
      /* table is a normal table: Create temporary table in same directory */
5655
      build_table_filename(path, sizeof(path), new_db, tmp_name, "");
5656 5657 5658 5659 5660 5661 5662
      new_table=open_temporary_table(thd, path, new_db, tmp_name,0);
    }
    if (!new_table)
    {
      VOID(quick_rm_table(new_db_type,new_db,tmp_name));
      goto err;
    }
unknown's avatar
unknown committed
5663 5664
  }

5665
  /* Copy the data if necessary. */
5666
  thd->count_cuted_fields= CHECK_FIELD_WARN;	// calc cuted fields
unknown's avatar
unknown committed
5667 5668
  thd->cuted_fields=0L;
  thd->proc_info="copy to tmp table";
5669
  next_insert_id=thd->next_insert_id;		// Remember for logging
unknown's avatar
unknown committed
5670
  copied=deleted=0;
unknown's avatar
unknown committed
5671
  if (new_table && !(new_table->file->table_flags() & HA_NO_COPY_ON_ALTER))
5672
  {
5673
    /* We don't want update TIMESTAMP fields during ALTER TABLE. */
unknown's avatar
unknown committed
5674
    new_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
5675
    new_table->next_number_field=new_table->found_next_number_field;
unknown's avatar
unknown committed
5676
    error=copy_data_between_tables(table,new_table,create_list,
5677
				   handle_duplicates, ignore,
5678
				   order_num, order, &copied, &deleted);
5679
  }
unknown's avatar
unknown committed
5680
  thd->last_insert_id=next_insert_id;		// Needed for correct log
5681
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
unknown's avatar
unknown committed
5682

5683 5684 5685 5686 5687 5688 5689 5690 5691 5692 5693 5694 5695 5696 5697 5698 5699 5700 5701 5702 5703 5704 5705 5706 5707 5708 5709 5710
  /* If we did not need to copy, we might still need to add/drop indexes. */
  if (! new_table)
  {
    uint          *key_numbers;
    uint          *keyno_p;
    KEY           *key_info;
    KEY           *key;
    uint          *idx_p;
    uint          *idx_end_p;
    KEY_PART_INFO *key_part;
    KEY_PART_INFO *part_end;
    DBUG_PRINT("info", ("No new_table, checking add/drop index"));

    if (index_add_count)
    {
#ifdef XXX_TO_BE_DONE_LATER_BY_WL3020_AND_WL1892
      if (! need_lock_for_indexes)
      {
        /* Downgrade the write lock. */
        mysql_lock_downgrade_write(thd, table, TL_WRITE_ALLOW_WRITE);
      }

      /* Create a new .frm file for crash recovery. */
      /* TODO: Must set INDEX_TO_BE_ADDED flags in the frm file. */
      VOID(pthread_mutex_lock(&LOCK_open));
      error= (mysql_create_frm(thd, reg_path, db, table_name,
                               create_info, prepared_create_list, key_count,
                               key_info_buffer, table->file) ||
5711
              table->file->create_handler_files(reg_path, NULL, FALSE));
5712 5713 5714 5715 5716 5717 5718 5719 5720 5721 5722 5723 5724 5725 5726 5727 5728 5729 5730 5731 5732 5733 5734 5735 5736 5737 5738 5739 5740 5741 5742 5743 5744 5745 5746 5747 5748 5749 5750 5751 5752 5753 5754 5755 5756
      VOID(pthread_mutex_unlock(&LOCK_open));
      if (error)
        goto err;
#endif

      /* The add_index() method takes an array of KEY structs. */
      key_info= (KEY*) thd->alloc(sizeof(KEY) * index_add_count);
      key= key_info;
      for (idx_p= index_add_buffer, idx_end_p= idx_p + index_add_count;
           idx_p < idx_end_p;
           idx_p++, key++)
      {
        /* Copy the KEY struct. */
        *key= key_info_buffer[*idx_p];
        /* Fix the key parts. */
        part_end= key->key_part + key->key_parts;
        for (key_part= key->key_part; key_part < part_end; key_part++)
          key_part->field= table->field[key_part->fieldnr];
      }
      /* Add the indexes. */
      if ((error= table->file->add_index(table, key_info, index_add_count)))
      {
        /*
          Exchange the key_info for the error message. If we exchange
          key number by key name in the message later, we need correct info.
        */
        KEY *save_key_info= table->key_info;
        table->key_info= key_info;
        table->file->print_error(error, MYF(0));
        table->key_info= save_key_info;
        goto err;
      }
    }
    /*end of if (index_add_count)*/

    if (index_drop_count)
    {
#ifdef XXX_TO_BE_DONE_LATER_BY_WL3020_AND_WL1892
      /* Create a new .frm file for crash recovery. */
      /* TODO: Must set INDEX_IS_ADDED in the frm file. */
      /* TODO: Must set INDEX_TO_BE_DROPPED in the frm file. */
      VOID(pthread_mutex_lock(&LOCK_open));
      error= (mysql_create_frm(thd, reg_path, db, table_name,
                               create_info, prepared_create_list, key_count,
                               key_info_buffer, table->file) ||
5757
              table->file->create_handler_files(reg_path, NULL, FALSE));
5758 5759 5760 5761 5762 5763 5764 5765 5766 5767 5768 5769 5770 5771 5772 5773 5774 5775 5776 5777 5778 5779 5780 5781 5782 5783 5784 5785 5786 5787 5788 5789 5790 5791 5792 5793 5794 5795 5796 5797 5798 5799 5800 5801 5802 5803 5804 5805 5806 5807 5808 5809 5810 5811 5812 5813 5814 5815 5816 5817 5818 5819 5820 5821 5822 5823 5824 5825 5826 5827 5828 5829 5830 5831 5832 5833 5834 5835 5836 5837
      VOID(pthread_mutex_unlock(&LOCK_open));
      if (error)
        goto err;

      if (! need_lock_for_indexes)
      {
        LOCK_PARAM_TYPE lpt;

        lpt.thd= thd;
        lpt.table= table;
        lpt.db= db;
        lpt.table_name= table_name;
        lpt.create_info= create_info;
        lpt.create_list= &create_list;
        lpt.key_count= key_count;
        lpt.key_info_buffer= key_info_buffer;
        abort_and_upgrade_lock(lpt);
      }
#endif

      /* The prepare_drop_index() method takes an array of key numbers. */
      key_numbers= (uint*) thd->alloc(sizeof(uint) * index_drop_count);
      keyno_p= key_numbers;
      /* Get the number of each key. */
      for (idx_p= index_drop_buffer, idx_end_p= idx_p + index_drop_count;
           idx_p < idx_end_p;
           idx_p++, keyno_p++)
        *keyno_p= *idx_p;
      /*
        Tell the handler to prepare for drop indexes.
        This re-numbers the indexes to get rid of gaps.
      */
      if ((error= table->file->prepare_drop_index(table, key_numbers,
                                                  index_drop_count)))
      {
        table->file->print_error(error, MYF(0));
        goto err;
      }

#ifdef XXX_TO_BE_DONE_LATER_BY_WL3020
      if (! need_lock_for_indexes)
      {
        /* Downgrade the lock again. */
        if (table->reginfo.lock_type == TL_WRITE_ALLOW_READ)
        {
          LOCK_PARAM_TYPE lpt;

          lpt.thd= thd;
          lpt.table= table;
          lpt.db= db;
          lpt.table_name= table_name;
          lpt.create_info= create_info;
          lpt.create_list= &create_list;
          lpt.key_count= key_count;
          lpt.key_info_buffer= key_info_buffer;
          close_open_tables_and_downgrade(lpt);
        }
      }
#endif

      /* Tell the handler to finally drop the indexes. */
      if ((error= table->file->final_drop_index(table)))
      {
        table->file->print_error(error, MYF(0));
        goto err;
      }
    }
    /*end of if (index_drop_count)*/

    if (index_add_count || index_drop_count)
    {
      /*
        The final .frm file is already created as a temporary file
        and will be renamed to the original table name later.
      */

      /* Need to commit before a table is unlocked (NDB requirement). */
      DBUG_PRINT("info", ("Committing after add/drop index"));
      if (ha_commit_stmt(thd) || ha_commit(thd))
        goto err;
unknown's avatar
unknown committed
5838
      committed= 1;
5839 5840 5841 5842
    }
  }
  /*end of if (! new_table) for add/drop index*/

unknown's avatar
unknown committed
5843
  if (table->s->tmp_table != NO_TMP_TABLE)
unknown's avatar
unknown committed
5844 5845 5846 5847
  {
    /* We changed a temporary table */
    if (error)
    {
unknown's avatar
unknown committed
5848 5849 5850
      /*
	The following function call will free the new_table pointer,
	in close_temporary_table(), so we can safely directly jump to err
5851
      */
5852 5853 5854 5855
      if (new_table)
        close_temporary_table(thd, new_table, 1, 1);
      else
        VOID(quick_rm_table(new_db_type,new_db,tmp_name));
unknown's avatar
unknown committed
5856 5857
      goto err;
    }
5858 5859 5860 5861 5862 5863
    /* Close lock if this is a transactional table */
    if (thd->lock)
    {
      mysql_unlock_tables(thd, thd->lock);
      thd->lock=0;
    }
unknown's avatar
unknown committed
5864
    /* Remove link to old table and rename the new one */
unknown's avatar
unknown committed
5865
    close_temporary_table(thd, table, 1, 1);
5866 5867
    /* Should pass the 'new_name' as we store table name in the cache */
    if (rename_temporary_table(thd, new_table, new_db, new_name))
unknown's avatar
unknown committed
5868
    {						// Fatal error
5869 5870 5871 5872 5873 5874 5875
      if (new_table)
      {
        close_temporary_table(thd, new_table, 1, 1);
        my_free((gptr) new_table,MYF(0));
      }
      else
        VOID(quick_rm_table(new_db_type,new_db,tmp_name));
unknown's avatar
unknown committed
5876 5877
      goto err;
    }
5878
    /* We don't replicate alter table statement on temporary tables */
5879
    if (!thd->current_stmt_binlog_row_based)
5880
      write_bin_log(thd, TRUE, thd->query, thd->query_length);
unknown's avatar
unknown committed
5881 5882 5883
    goto end_temporary;
  }

5884 5885
  if (new_table)
  {
unknown's avatar
unknown committed
5886 5887
    /* close temporary table that will be the new table */
    intern_close_table(new_table);
5888 5889
    my_free((gptr) new_table,MYF(0));
  }
unknown's avatar
unknown committed
5890 5891 5892 5893 5894 5895 5896
  VOID(pthread_mutex_lock(&LOCK_open));
  if (error)
  {
    VOID(quick_rm_table(new_db_type,new_db,tmp_name));
    VOID(pthread_mutex_unlock(&LOCK_open));
    goto err;
  }
5897

unknown's avatar
unknown committed
5898
  /*
5899 5900
    Data is copied.  Now we rename the old table to a temp name,
    rename the new one to the old name, remove all entries from the old table
5901
    from the cache, free all locks, close the old table and remove it.
unknown's avatar
unknown committed
5902 5903 5904
  */

  thd->proc_info="rename result table";
5905 5906
  my_snprintf(old_name, sizeof(old_name), "%s2-%lx-%lx", tmp_file_prefix,
	      current_pid, thd->thread_id);
5907 5908
  if (lower_case_table_names)
    my_casedn_str(files_charset_info, old_name);
5909
  if (new_name != table_name || new_db != db)
unknown's avatar
unknown committed
5910 5911 5912 5913
  {
    if (!access(new_name_buff,F_OK))
    {
      error=1;
5914
      my_error(ER_TABLE_EXISTS_ERROR, MYF(0), new_name_buff);
unknown's avatar
unknown committed
5915 5916 5917 5918 5919 5920
      VOID(quick_rm_table(new_db_type,new_db,tmp_name));
      VOID(pthread_mutex_unlock(&LOCK_open));
      goto err;
    }
  }

unknown's avatar
unknown committed
5921 5922 5923 5924 5925
#if (!defined( __WIN__) && !defined( __EMX__) && !defined( OS2))
  if (table->file->has_transactions())
#endif
  {
    /*
5926
      Win32 and InnoDB can't drop a table that is in use, so we must
5927
      close the original table at before doing the rename
unknown's avatar
unknown committed
5928
    */
unknown's avatar
unknown committed
5929
    table->s->version= 0;                	// Force removal of table def
5930
    close_cached_table(thd, table);
unknown's avatar
unknown committed
5931
    table=0;					// Marker that table is closed
5932
    no_table_reopen= TRUE;
unknown's avatar
unknown committed
5933
  }
unknown's avatar
unknown committed
5934 5935 5936
#if (!defined( __WIN__) && !defined( __EMX__) && !defined( OS2))
  else
    table->file->extra(HA_EXTRA_FORCE_REOPEN);	// Don't use this file anymore
unknown's avatar
unknown committed
5937 5938
#endif

unknown's avatar
unknown committed
5939

unknown's avatar
unknown committed
5940
  error=0;
5941
  if (!need_copy_table)
unknown's avatar
unknown committed
5942
    new_db_type=old_db_type= NULL; // this type cannot happen in regular ALTER
unknown's avatar
unknown committed
5943 5944 5945 5946 5947 5948
  if (mysql_rename_table(old_db_type,db,table_name,db,old_name))
  {
    error=1;
    VOID(quick_rm_table(new_db_type,new_db,tmp_name));
  }
  else if (mysql_rename_table(new_db_type,new_db,tmp_name,new_db,
5949 5950 5951 5952 5953
			      new_alias) ||
           (new_name != table_name || new_db != db) && // we also do rename
           Table_triggers_list::change_table_name(thd, db, table_name,
                                                  new_db, new_alias))
       
unknown's avatar
unknown committed
5954 5955
  {						// Try to get everything back
    error=1;
unknown's avatar
unknown committed
5956
    VOID(quick_rm_table(new_db_type,new_db,new_alias));
unknown's avatar
unknown committed
5957
    VOID(quick_rm_table(new_db_type,new_db,tmp_name));
unknown's avatar
unknown committed
5958
    VOID(mysql_rename_table(old_db_type,db,old_name,db,alias));
unknown's avatar
unknown committed
5959 5960 5961
  }
  if (error)
  {
unknown's avatar
unknown committed
5962 5963 5964 5965
    /*
      This shouldn't happen.  We solve this the safe way by
      closing the locked table.
    */
5966
    if (table)
unknown's avatar
unknown committed
5967 5968
    {
      table->s->version= 0;            	        // Force removal of table def
5969
      close_cached_table(thd,table);
unknown's avatar
unknown committed
5970
    }
unknown's avatar
unknown committed
5971 5972 5973
    VOID(pthread_mutex_unlock(&LOCK_open));
    goto err;
  }
5974 5975 5976 5977 5978 5979 5980 5981 5982 5983
  if (! need_copy_table)
  {
    if (! table)
    {
      VOID(pthread_mutex_unlock(&LOCK_open));
      if (! (table= open_ltable(thd, table_list, TL_WRITE_ALLOW_READ)))
        goto err;
      VOID(pthread_mutex_lock(&LOCK_open));
    }
    /* Tell the handler that a new frm file is in place. */
5984
    if (table->file->create_handler_files(reg_path, NULL, FALSE))
5985 5986 5987 5988 5989
    {
      VOID(pthread_mutex_unlock(&LOCK_open));
      goto err;
    }
  }
5990
  if (thd->lock || new_name != table_name || no_table_reopen)  // True if WIN32
unknown's avatar
unknown committed
5991
  {
unknown's avatar
unknown committed
5992
    /*
unknown's avatar
unknown committed
5993 5994
      Not table locking or alter table with rename.
      Free locks and remove old table
unknown's avatar
unknown committed
5995
    */
5996
    if (table)
unknown's avatar
unknown committed
5997 5998
    {
      table->s->version= 0;              	// Force removal of table def
5999
      close_cached_table(thd,table);
unknown's avatar
unknown committed
6000
    }
unknown's avatar
unknown committed
6001 6002 6003 6004
    VOID(quick_rm_table(old_db_type,db,old_name));
  }
  else
  {
unknown's avatar
unknown committed
6005 6006 6007 6008 6009
    /*
      Using LOCK TABLES without rename.
      This code is never executed on WIN32!
      Remove old renamed table, reopen table and get new locks
    */
unknown's avatar
unknown committed
6010 6011 6012
    if (table)
    {
      VOID(table->file->extra(HA_EXTRA_FORCE_REOPEN)); // Use new file
unknown's avatar
unknown committed
6013
      /* Mark in-use copies old */
unknown's avatar
unknown committed
6014
      remove_table_from_cache(thd,db,table_name,RTFC_NO_FLAG);
unknown's avatar
unknown committed
6015
      /* end threads waiting on lock */
unknown's avatar
unknown committed
6016
      mysql_lock_abort(thd,table, TRUE);
unknown's avatar
unknown committed
6017 6018 6019 6020 6021
    }
    VOID(quick_rm_table(old_db_type,db,old_name));
    if (close_data_tables(thd,db,table_name) ||
	reopen_tables(thd,1,0))
    {						// This shouldn't happen
6022
      if (table)
unknown's avatar
unknown committed
6023 6024
      {
        table->s->version= 0;                   // Force removal of table def
6025
	close_cached_table(thd,table);		// Remove lock for table
unknown's avatar
unknown committed
6026
      }
unknown's avatar
unknown committed
6027 6028 6029 6030
      VOID(pthread_mutex_unlock(&LOCK_open));
      goto err;
    }
  }
6031 6032 6033 6034 6035 6036 6037 6038
  VOID(pthread_mutex_unlock(&LOCK_open));
  VOID(pthread_cond_broadcast(&COND_refresh));
  /*
    The ALTER TABLE is always in its own transaction.
    Commit must not be called while LOCK_open is locked. It could call
    wait_if_global_read_lock(), which could create a deadlock if called
    with LOCK_open.
  */
unknown's avatar
unknown committed
6039 6040 6041 6042 6043 6044 6045 6046
  if (!committed)
  {
    error = ha_commit_stmt(thd);
    if (ha_commit(thd))
      error=1;
    if (error)
      goto err;
  }
unknown's avatar
unknown committed
6047
  thd->proc_info="end";
6048

6049 6050 6051 6052
  ha_binlog_log_query(thd, create_info->db_type, LOGCOM_ALTER_TABLE,
                      thd->query, thd->query_length,
                      db, table_name);

6053
  DBUG_ASSERT(!(mysql_bin_log.is_open() && thd->current_stmt_binlog_row_based &&
6054 6055
                (create_info->options & HA_LEX_CREATE_TMP_TABLE)));
  write_bin_log(thd, TRUE, thd->query, thd->query_length);
unknown's avatar
unknown committed
6056 6057 6058 6059
  /*
    TODO RONM: This problem needs to handled for Berkeley DB partitions
    as well
  */
6060
  if (ha_check_storage_engine_flag(old_db_type,HTON_FLUSH_AFTER_RENAME))
6061
  {
unknown's avatar
unknown committed
6062 6063 6064 6065 6066
    /*
      For the alter table to be properly flushed to the logs, we
      have to open the new table.  If not, we get a problem on server
      shutdown.
    */
6067
    char path[FN_REFLEN];
6068
    build_table_filename(path, sizeof(path), new_db, table_name, "");
6069 6070
    table=open_temporary_table(thd, path, new_db, tmp_name,0);
    if (table)
unknown's avatar
unknown committed
6071
    {
6072 6073
      intern_close_table(table);
      my_free((char*) table, MYF(0));
unknown's avatar
unknown committed
6074
    }
6075
    else
6076
      sql_print_warning("Could not open table %s.%s after rename\n",
unknown's avatar
unknown committed
6077
                        new_db,table_name);
6078
    ha_flush_logs(old_db_type);
6079
  }
unknown's avatar
unknown committed
6080
  table_list->table=0;				// For query cache
unknown's avatar
unknown committed
6081
  query_cache_invalidate3(thd, table_list, 0);
unknown's avatar
unknown committed
6082 6083

end_temporary:
6084 6085 6086
  my_snprintf(tmp_name, sizeof(tmp_name), ER(ER_INSERT_INFO),
	      (ulong) (copied + deleted), (ulong) deleted,
	      (ulong) thd->cuted_fields);
6087 6088
  if (do_send_ok)
    send_ok(thd,copied+deleted,0L,tmp_name);
unknown's avatar
unknown committed
6089
  thd->some_tables_deleted=0;
unknown's avatar
unknown committed
6090
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
6091 6092

 err:
unknown's avatar
unknown committed
6093
  DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
6094
}
unknown's avatar
unknown committed
6095
/* mysql_alter_table */
unknown's avatar
unknown committed
6096 6097

static int
unknown's avatar
unknown committed
6098
copy_data_between_tables(TABLE *from,TABLE *to,
6099
			 List<create_field> &create,
unknown's avatar
unknown committed
6100
			 enum enum_duplicates handle_duplicates,
6101
                         bool ignore,
6102
			 uint order_num, ORDER *order,
unknown's avatar
unknown committed
6103
			 ha_rows *copied,
6104
			 ha_rows *deleted)
unknown's avatar
unknown committed
6105 6106 6107 6108 6109
{
  int error;
  Copy_field *copy,*copy_end;
  ulong found_count,delete_count;
  THD *thd= current_thd;
unknown's avatar
unknown committed
6110 6111 6112 6113 6114 6115
  uint length;
  SORT_FIELD *sortorder;
  READ_RECORD info;
  TABLE_LIST   tables;
  List<Item>   fields;
  List<Item>   all_fields;
6116
  ha_rows examined_rows;
6117
  bool auto_increment_field_copied= 0;
6118
  ulong save_sql_mode;
unknown's avatar
unknown committed
6119 6120
  DBUG_ENTER("copy_data_between_tables");

6121 6122 6123 6124 6125 6126
  /*
    Turn off recovery logging since rollback of an alter table is to
    delete the new table so there is no need to log the changes to it.
    
    This needs to be done before external_lock
  */
6127
  error= ha_enable_transaction(thd, FALSE);
6128 6129
  if (error)
    DBUG_RETURN(-1);
6130
  
6131
  if (!(copy= new Copy_field[to->s->fields]))
unknown's avatar
unknown committed
6132 6133
    DBUG_RETURN(-1);				/* purecov: inspected */

6134
  if (to->file->ha_external_lock(thd, F_WRLCK))
6135
    DBUG_RETURN(-1);
6136 6137 6138 6139 6140 6141 6142

  /* We can abort alter table for any table type */
  thd->no_trans_update= 0;
  thd->abort_on_warning= !ignore && test(thd->variables.sql_mode &
                                         (MODE_STRICT_TRANS_TABLES |
                                          MODE_STRICT_ALL_TABLES));

6143
  from->file->info(HA_STATUS_VARIABLE);
6144
  to->file->start_bulk_insert(from->file->records);
unknown's avatar
unknown committed
6145

6146 6147
  save_sql_mode= thd->variables.sql_mode;

unknown's avatar
unknown committed
6148 6149 6150 6151 6152 6153 6154
  List_iterator<create_field> it(create);
  create_field *def;
  copy_end=copy;
  for (Field **ptr=to->field ; *ptr ; ptr++)
  {
    def=it++;
    if (def->field)
6155 6156
    {
      if (*ptr == to->next_number_field)
6157
      {
6158
        auto_increment_field_copied= TRUE;
6159 6160 6161 6162 6163 6164 6165 6166 6167
        /*
          If we are going to copy contents of one auto_increment column to
          another auto_increment column it is sensible to preserve zeroes.
          This condition also covers case when we are don't actually alter
          auto_increment column.
        */
        if (def->field == from->found_next_number_field)
          thd->variables.sql_mode|= MODE_NO_AUTO_VALUE_ON_ZERO;
      }
unknown's avatar
unknown committed
6168
      (copy_end++)->set(*ptr,def->field,0);
6169 6170
    }

unknown's avatar
unknown committed
6171 6172
  }

6173 6174
  found_count=delete_count=0;

unknown's avatar
unknown committed
6175 6176
  if (order)
  {
unknown's avatar
unknown committed
6177
    from->sort.io_cache=(IO_CACHE*) my_malloc(sizeof(IO_CACHE),
6178
					      MYF(MY_FAE | MY_ZEROFILL));
unknown's avatar
unknown committed
6179
    bzero((char*) &tables,sizeof(tables));
6180
    tables.table= from;
unknown's avatar
unknown committed
6181 6182
    tables.alias= tables.table_name= from->s->table_name.str;
    tables.db=    from->s->db.str;
unknown's avatar
unknown committed
6183 6184
    error=1;

unknown's avatar
unknown committed
6185
    if (thd->lex->select_lex.setup_ref_array(thd, order_num) ||
unknown's avatar
unknown committed
6186
	setup_order(thd, thd->lex->select_lex.ref_pointer_array,
6187
		    &tables, fields, all_fields, order) ||
6188 6189 6190
	!(sortorder=make_unireg_sortorder(order, &length)) ||
	(from->sort.found_records = filesort(thd, from, sortorder, length,
					     (SQL_SELECT *) 0, HA_POS_ERROR,
unknown's avatar
unknown committed
6191 6192
					     &examined_rows)) ==
	HA_POS_ERROR)
unknown's avatar
unknown committed
6193 6194 6195
      goto err;
  };

6196 6197 6198 6199 6200
  /*
    Handler must be told explicitly to retrieve all columns, because
    this function does not set field->query_id in the columns to the
    current query id
  */
6201
  to->file->ha_set_all_bits_in_write_set();
6202
  from->file->ha_retrieve_all_cols();
unknown's avatar
unknown committed
6203
  init_read_record(&info, thd, from, (SQL_SELECT *) 0, 1,1);
6204
  if (ignore ||
6205
      handle_duplicates == DUP_REPLACE)
unknown's avatar
unknown committed
6206
    to->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
6207
  thd->row_count= 0;
6208
  restore_record(to, s->default_values);        // Create empty record
unknown's avatar
unknown committed
6209 6210 6211 6212
  while (!(error=info.read_record(&info)))
  {
    if (thd->killed)
    {
unknown's avatar
SCRUM  
unknown committed
6213
      thd->send_kill_message();
unknown's avatar
unknown committed
6214 6215 6216
      error= 1;
      break;
    }
6217
    thd->row_count++;
6218 6219
    if (to->next_number_field)
    {
6220
      if (auto_increment_field_copied)
6221
        to->auto_increment_field_not_null= TRUE;
6222 6223 6224
      else
        to->next_number_field->reset();
    }
6225
    
unknown's avatar
unknown committed
6226
    for (Copy_field *copy_ptr=copy ; copy_ptr != copy_end ; copy_ptr++)
6227
    {
unknown's avatar
unknown committed
6228
      copy_ptr->do_copy(copy_ptr);
6229
    }
6230
    if ((error=to->file->ha_write_row((byte*) to->record[0])))
unknown's avatar
unknown committed
6231
    {
6232
      if ((!ignore &&
6233
	   handle_duplicates != DUP_REPLACE) ||
unknown's avatar
unknown committed
6234 6235 6236 6237 6238 6239
	  (error != HA_ERR_FOUND_DUPP_KEY &&
	   error != HA_ERR_FOUND_DUPP_UNIQUE))
      {
	to->file->print_error(error,MYF(0));
	break;
      }
6240
      to->file->restore_auto_increment();
unknown's avatar
unknown committed
6241 6242 6243
      delete_count++;
    }
    else
6244
      found_count++;
unknown's avatar
unknown committed
6245 6246
  }
  end_read_record(&info);
6247
  free_io_cache(from);
6248
  delete [] copy;				// This is never 0
unknown's avatar
unknown committed
6249

6250
  if (to->file->end_bulk_insert() && error <= 0)
unknown's avatar
unknown committed
6251
  {
unknown's avatar
unknown committed
6252
    to->file->print_error(my_errno,MYF(0));
unknown's avatar
unknown committed
6253 6254
    error=1;
  }
unknown's avatar
unknown committed
6255
  to->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
6256

6257
  ha_enable_transaction(thd,TRUE);
6258

6259 6260 6261 6262 6263 6264 6265 6266
  /*
    Ensure that the new table is saved properly to disk so that we
    can do a rename
  */
  if (ha_commit_stmt(thd))
    error=1;
  if (ha_commit(thd))
    error=1;
6267

unknown's avatar
unknown committed
6268
 err:
6269
  thd->variables.sql_mode= save_sql_mode;
6270
  thd->abort_on_warning= 0;
unknown's avatar
unknown committed
6271
  free_io_cache(from);
unknown's avatar
unknown committed
6272 6273
  *copied= found_count;
  *deleted=delete_count;
6274
  if (to->file->ha_external_lock(thd,F_UNLCK))
6275
    error=1;
unknown's avatar
unknown committed
6276 6277
  DBUG_RETURN(error > 0 ? -1 : 0);
}
6278

unknown's avatar
unknown committed
6279

6280 6281 6282 6283 6284 6285 6286 6287 6288 6289 6290 6291
/*
  Recreates tables by calling mysql_alter_table().

  SYNOPSIS
    mysql_recreate_table()
    thd			Thread handler
    tables		Tables to recreate
    do_send_ok          If we should send_ok() or leave it to caller

 RETURN
    Like mysql_alter_table().
*/
unknown's avatar
unknown committed
6292 6293
bool mysql_recreate_table(THD *thd, TABLE_LIST *table_list,
                          bool do_send_ok)
6294 6295 6296 6297 6298 6299 6300 6301 6302
{
  DBUG_ENTER("mysql_recreate_table");
  LEX *lex= thd->lex;
  HA_CREATE_INFO create_info;
  lex->create_list.empty();
  lex->key_list.empty();
  lex->col_list.empty();
  lex->alter_info.reset();
  bzero((char*) &create_info,sizeof(create_info));
unknown's avatar
unknown committed
6303
  create_info.db_type= (handlerton*) &default_hton;
unknown's avatar
unknown committed
6304
  create_info.row_type=ROW_TYPE_NOT_USED;
6305
  create_info.default_table_charset=default_charset_info;
unknown's avatar
unknown committed
6306
  /* Force alter table to recreate table */
unknown's avatar
unknown committed
6307
  lex->alter_info.flags= (ALTER_CHANGE_COLUMN | ALTER_RECREATE);
6308 6309 6310
  DBUG_RETURN(mysql_alter_table(thd, NullS, NullS, &create_info,
                                table_list, lex->create_list,
                                lex->key_list, 0, (ORDER *) 0,
6311
                                DUP_ERROR, 0, &lex->alter_info, do_send_ok));
6312 6313 6314
}


unknown's avatar
unknown committed
6315
bool mysql_checksum_table(THD *thd, TABLE_LIST *tables, HA_CHECK_OPT *check_opt)
6316 6317 6318 6319 6320
{
  TABLE_LIST *table;
  List<Item> field_list;
  Item *item;
  Protocol *protocol= thd->protocol;
unknown's avatar
unknown committed
6321
  DBUG_ENTER("mysql_checksum_table");
6322 6323 6324 6325 6326

  field_list.push_back(item = new Item_empty_string("Table", NAME_LEN*2));
  item->maybe_null= 1;
  field_list.push_back(item=new Item_int("Checksum",(longlong) 1,21));
  item->maybe_null= 1;
6327 6328
  if (protocol->send_fields(&field_list,
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
unknown's avatar
unknown committed
6329
    DBUG_RETURN(TRUE);
6330

unknown's avatar
VIEW  
unknown committed
6331
  for (table= tables; table; table= table->next_local)
6332 6333
  {
    char table_name[NAME_LEN*2+2];
6334
    TABLE *t;
6335

6336
    strxmov(table_name, table->db ,".", table->table_name, NullS);
unknown's avatar
unknown committed
6337

6338
    t= table->table= open_ltable(thd, table, TL_READ);
unknown's avatar
unknown committed
6339
    thd->clear_error();			// these errors shouldn't get client
6340 6341 6342 6343

    protocol->prepare_for_resend();
    protocol->store(table_name, system_charset_info);

6344
    if (!t)
6345
    {
unknown's avatar
unknown committed
6346
      /* Table didn't exist */
6347
      protocol->store_null();
unknown's avatar
unknown committed
6348
      thd->clear_error();
6349 6350 6351
    }
    else
    {
6352
      t->pos_in_table_list= table;
6353

6354
      if (t->file->table_flags() & HA_HAS_CHECKSUM &&
6355 6356
	  !(check_opt->flags & T_EXTEND))
	protocol->store((ulonglong)t->file->checksum());
6357
      else if (!(t->file->table_flags() & HA_HAS_CHECKSUM) &&
unknown's avatar
unknown committed
6358
	       (check_opt->flags & T_QUICK))
6359
	protocol->store_null();
6360 6361
      else
      {
6362 6363
	/* calculating table's checksum */
	ha_checksum crc= 0;
unknown's avatar
unknown committed
6364
        uchar null_mask=256 -  (1 << t->s->last_null_bit_pos);
6365

6366 6367 6368 6369 6370
        /*
          Set all bits in read set and inform InnoDB that we are reading all
          fields
        */
        t->file->ha_retrieve_all_cols();
6371

unknown's avatar
unknown committed
6372
	if (t->file->ha_rnd_init(1))
6373 6374 6375
	  protocol->store_null();
	else
	{
6376
	  for (;;)
6377 6378
	  {
	    ha_checksum row_crc= 0;
6379 6380 6381 6382 6383 6384 6385
            int error= t->file->rnd_next(t->record[0]);
            if (unlikely(error))
            {
              if (error == HA_ERR_RECORD_DELETED)
                continue;
              break;
            }
unknown's avatar
unknown committed
6386 6387 6388 6389
	    if (t->s->null_bytes)
            {
              /* fix undefined null bits */
              t->record[0][t->s->null_bytes-1] |= null_mask;
unknown's avatar
unknown committed
6390 6391 6392
              if (!(t->s->db_create_options & HA_OPTION_PACK_RECORD))
                t->record[0][0] |= 1;

unknown's avatar
unknown committed
6393 6394
	      row_crc= my_checksum(row_crc, t->record[0], t->s->null_bytes);
            }
6395

6396
	    for (uint i= 0; i < t->s->fields; i++ )
6397 6398
	    {
	      Field *f= t->field[i];
6399 6400
	      if ((f->type() == FIELD_TYPE_BLOB) ||
                  (f->type() == MYSQL_TYPE_VARCHAR))
6401 6402 6403 6404 6405 6406 6407
	      {
		String tmp;
		f->val_str(&tmp);
		row_crc= my_checksum(row_crc, (byte*) tmp.ptr(), tmp.length());
	      }
	      else
		row_crc= my_checksum(row_crc, (byte*) f->ptr,
unknown's avatar
unknown committed
6408
				     f->pack_length());
6409
	    }
6410

6411 6412 6413
	    crc+= row_crc;
	  }
	  protocol->store((ulonglong)crc);
unknown's avatar
unknown committed
6414
          t->file->ha_rnd_end();
6415
	}
6416
      }
unknown's avatar
unknown committed
6417
      thd->clear_error();
6418 6419 6420 6421 6422 6423 6424 6425
      close_thread_tables(thd);
      table->table=0;				// For query cache
    }
    if (protocol->write())
      goto err;
  }

  send_eof(thd);
unknown's avatar
unknown committed
6426
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
6427

6428 6429 6430 6431
 err:
  close_thread_tables(thd);			// Shouldn't be needed
  if (table)
    table->table=0;
unknown's avatar
unknown committed
6432
  DBUG_RETURN(TRUE);
6433
}
6434 6435

static bool check_engine(THD *thd, const char *table_name,
6436
                         HA_CREATE_INFO *create_info)
6437
{
6438
  handlerton **new_engine= &create_info->db_type;
unknown's avatar
unknown committed
6439
  handlerton *req_engine= *new_engine;
unknown's avatar
unknown committed
6440
  bool no_substitution=
6441
        test(thd->variables.sql_mode & MODE_NO_ENGINE_SUBSTITUTION);
unknown's avatar
unknown committed
6442 6443
  if (!(*new_engine= ha_checktype(thd, ha_legacy_type(req_engine),
                                  no_substitution, 1)))
6444 6445
    return TRUE;

unknown's avatar
unknown committed
6446
  if (req_engine != (handlerton*) &default_hton && req_engine != *new_engine)
6447 6448 6449 6450
  {
    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                       ER_WARN_USING_OTHER_HANDLER,
                       ER(ER_WARN_USING_OTHER_HANDLER),
unknown's avatar
unknown committed
6451
                       ha_resolve_storage_engine_name(*new_engine),
6452 6453
                       table_name);
  }
6454 6455 6456 6457 6458 6459 6460 6461 6462 6463 6464
  if (create_info->options & HA_LEX_CREATE_TMP_TABLE &&
      ha_check_storage_engine_flag(*new_engine, HTON_TEMPORARY_NOT_SUPPORTED))
  {
    if (create_info->used_fields & HA_CREATE_USED_ENGINE)
    {
      my_error(ER_ILLEGAL_HA_CREATE_OPTION, MYF(0), (*new_engine)->name, "TEMPORARY");
      *new_engine= 0;
      return TRUE;
    }
    *new_engine= &myisam_hton;
  }
6465 6466
  return FALSE;
}