sql_insert.cc 119 KB
Newer Older
unknown's avatar
unknown committed
1
/* Copyright (C) 2000-2006 MySQL AB
unknown's avatar
unknown committed
2

unknown's avatar
unknown committed
3 4
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
unknown's avatar
unknown committed
5
   the Free Software Foundation; version 2 of the License.
unknown's avatar
unknown committed
6

unknown's avatar
unknown committed
7 8 9 10
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
unknown's avatar
unknown committed
11

unknown's avatar
unknown committed
12 13 14 15 16 17 18
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


/* Insert of records */

19 20 21 22 23 24 25 26 27 28 29 30 31
/*
  INSERT DELAYED

  Insert delayed is distinguished from a normal insert by lock_type ==
  TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
  "delayed" table (delayed_get_table()), but falls back to
  open_and_lock_tables() on error and proceeds as normal insert then.

  Opening a "delayed" table means to find a delayed insert thread that
  has the table open already. If this fails, a new thread is created and
  waited for to open and lock the table.

  If accessing the thread succeeded, in
32
  Delayed_insert::get_local_table() the table of the thread is copied
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
  for local use. A copy is required because the normal insert logic
  works on a target table, but the other threads table object must not
  be used. The insert logic uses the record buffer to create a record.
  And the delayed insert thread uses the record buffer to pass the
  record to the table handler. So there must be different objects. Also
  the copied table is not included in the lock, so that the statement
  can proceed even if the real table cannot be accessed at this moment.

  Copying a table object is not a trivial operation. Besides the TABLE
  object there are the field pointer array, the field objects and the
  record buffer. After copying the field objects, their pointers into
  the record must be "moved" to point to the new record buffer.

  After this setup the normal insert logic is used. Only that for
  delayed inserts write_delayed() is called instead of write_record().
  It inserts the rows into a queue and signals the delayed insert thread
  instead of writing directly to the table.

  The delayed insert thread awakes from the signal. It locks the table,
  inserts the rows from the queue, unlocks the table, and waits for the
  next signal. It does normally live until a FLUSH TABLES or SHUTDOWN.

*/

unknown's avatar
unknown committed
57
#include "mysql_priv.h"
58 59
#include "sp_head.h"
#include "sql_trigger.h"
60
#include "sql_select.h"
61
#include "sql_show.h"
62
#include "slave.h"
63
#include "rpl_mi.h"
unknown's avatar
unknown committed
64

unknown's avatar
unknown committed
65
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
66
static bool delayed_get_table(THD *thd, TABLE_LIST *table_list);
unknown's avatar
unknown committed
67
static int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
68
                         LEX_STRING query, bool ignore, bool log_on);
unknown's avatar
unknown committed
69
static void end_delayed_insert(THD *thd);
70
pthread_handler_t handle_delayed_insert(void *arg);
unknown's avatar
unknown committed
71
static void unlink_blobs(register TABLE *table);
unknown's avatar
unknown committed
72
#endif
73
static bool check_view_insertability(THD *thd, TABLE_LIST *view);
unknown's avatar
unknown committed
74 75 76 77 78 79 80 81 82 83 84

/* Define to force use of my_malloc() if the allocated memory block is big */

#ifndef HAVE_ALLOCA
#define my_safe_alloca(size, min_length) my_alloca(size)
#define my_safe_afree(ptr, size, min_length) my_afree(ptr)
#else
#define my_safe_alloca(size, min_length) ((size <= min_length) ? my_alloca(size) : my_malloc(size,MYF(0)))
#define my_safe_afree(ptr, size, min_length) if (size > min_length) my_free(ptr,MYF(0))
#endif

85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
/*
  Check that insert/update fields are from the same single table of a view.

  SYNOPSIS
    check_view_single_update()
    fields            The insert/update fields to be checked.
    view              The view for insert.
    map     [in/out]  The insert table map.

  DESCRIPTION
    This function is called in 2 cases:
    1. to check insert fields. In this case *map will be set to 0.
       Insert fields are checked to be all from the same single underlying
       table of the given view. Otherwise the error is thrown. Found table
       map is returned in the map parameter.
    2. to check update fields of the ON DUPLICATE KEY UPDATE clause.
       In this case *map contains table_map found on the previous call of
       the function to check insert fields. Update fields are checked to be
       from the same table as the insert fields.

  RETURN
    0   OK
    1   Error
*/

bool check_view_single_update(List<Item> &fields, TABLE_LIST *view,
                              table_map *map)
{
  /* it is join view => we need to find the table for update */
  List_iterator_fast<Item> it(fields);
  Item *item;
  TABLE_LIST *tbl= 0;            // reset for call to check_single_table()
  table_map tables= 0;

  while ((item= it++))
    tables|= item->used_tables();

  /* Check found map against provided map */
  if (*map)
  {
    if (tables != *map)
      goto error;
    return FALSE;
  }

  if (view->check_single_table(&tbl, tables, view) || tbl == 0)
    goto error;

  view->table= tbl->table;
  *map= tables;

  return FALSE;

error:
  my_error(ER_VIEW_MULTIUPDATE, MYF(0),
           view->view_db.str, view->view_name.str);
  return TRUE;
}

144

unknown's avatar
unknown committed
145
/*
146
  Check if insert fields are correct.
147 148 149 150 151 152 153

  SYNOPSIS
    check_insert_fields()
    thd                         The current thread.
    table                       The table for insert.
    fields                      The insert fields.
    values                      The insert values.
unknown's avatar
unknown committed
154
    check_unique                If duplicate values should be rejected.
155 156 157 158 159 160 161 162 163

  NOTE
    Clears TIMESTAMP_AUTO_SET_ON_INSERT from table->timestamp_field_type
    or leaves it as is, depending on if timestamp should be updated or
    not.

  RETURN
    0           OK
    -1          Error
unknown's avatar
unknown committed
164 165
*/

unknown's avatar
unknown committed
166 167
static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
                               List<Item> &fields, List<Item> &values,
168
                               bool check_unique, table_map *map)
unknown's avatar
unknown committed
169
{
unknown's avatar
VIEW  
unknown committed
170
  TABLE *table= table_list->table;
171

172 173
  if (!table_list->updatable)
  {
174
    my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
175 176 177
    return -1;
  }

unknown's avatar
unknown committed
178 179
  if (fields.elements == 0 && values.elements != 0)
  {
180 181 182 183 184 185
    if (!table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
      return -1;
    }
186
    if (values.elements != table->s->fields)
unknown's avatar
unknown committed
187
    {
unknown's avatar
unknown committed
188
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
unknown's avatar
unknown committed
189 190
      return -1;
    }
unknown's avatar
unknown committed
191
#ifndef NO_EMBEDDED_ACCESS_CHECKS
192 193 194 195 196 197
    Field_iterator_table field_it;
    field_it.set_table(table);
    if (check_grant_all_columns(thd, INSERT_ACL, &table->grant,
                                table->s->db.str, table->s->table_name.str,
                                &field_it))
      return -1;
unknown's avatar
unknown committed
198
#endif
199 200
    clear_timestamp_auto_bits(table->timestamp_field_type,
                              TIMESTAMP_AUTO_SET_ON_INSERT);
201 202 203 204
    /*
      No fields are provided so all fields must be provided in the values.
      Thus we set all bits in the write set.
    */
205
    bitmap_set_all(table->write_set);
unknown's avatar
unknown committed
206 207 208
  }
  else
  {						// Part field list
unknown's avatar
unknown committed
209 210
    SELECT_LEX *select_lex= &thd->lex->select_lex;
    Name_resolution_context *context= &select_lex->context;
211
    Name_resolution_context_state ctx_state;
unknown's avatar
unknown committed
212
    int res;
unknown's avatar
unknown committed
213

unknown's avatar
unknown committed
214 215
    if (fields.elements != values.elements)
    {
unknown's avatar
unknown committed
216
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
unknown's avatar
unknown committed
217 218 219
      return -1;
    }

220
    thd->dup_field= 0;
unknown's avatar
unknown committed
221 222 223
    select_lex->no_wrap_view_item= TRUE;

    /* Save the state of the current name resolution context. */
224
    ctx_state.save_state(context, table_list);
unknown's avatar
unknown committed
225 226 227 228 229

    /*
      Perform name resolution only in the first table - 'table_list',
      which is the table that is inserted into.
    */
230
    table_list->next_local= 0;
231
    context->resolve_in_table_list_only(table_list);
232
    res= setup_fields(thd, 0, fields, MARK_COLUMNS_WRITE, 0, 0);
unknown's avatar
unknown committed
233 234

    /* Restore the current context. */
235
    ctx_state.restore_state(context, table_list);
236
    thd->lex->select_lex.no_wrap_view_item= FALSE;
unknown's avatar
unknown committed
237

unknown's avatar
unknown committed
238
    if (res)
unknown's avatar
unknown committed
239
      return -1;
unknown's avatar
unknown committed
240

unknown's avatar
unknown committed
241
    if (table_list->effective_algorithm == VIEW_ALGORITHM_MERGE)
242
    {
243
      if (check_view_single_update(fields, table_list, map))
244
        return -1;
245
      table= table_list->table;
246
    }
247

248
    if (check_unique && thd->dup_field)
unknown's avatar
unknown committed
249
    {
250
      my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dup_field->field_name);
unknown's avatar
unknown committed
251 252
      return -1;
    }
253 254 255 256 257 258 259 260 261 262 263 264
    if (table->timestamp_field)	// Don't automaticly set timestamp if used
    {
      if (bitmap_is_set(table->write_set,
                        table->timestamp_field->field_index))
        clear_timestamp_auto_bits(table->timestamp_field_type,
                                  TIMESTAMP_AUTO_SET_ON_INSERT);
      else
      {
        bitmap_set_bit(table->write_set,
                       table->timestamp_field->field_index);
      }
    }
unknown's avatar
unknown committed
265
  }
unknown's avatar
unknown committed
266
  // For the values we need select_priv
unknown's avatar
unknown committed
267
#ifndef NO_EMBEDDED_ACCESS_CHECKS
268
  table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege);
unknown's avatar
unknown committed
269
#endif
270 271 272

  if (check_key_in_view(thd, table_list) ||
      (table_list->view &&
273
       check_view_insertability(thd, table_list)))
274
  {
275
    my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
276 277 278
    return -1;
  }

unknown's avatar
unknown committed
279 280 281 282
  return 0;
}


283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
/*
  Check update fields for the timestamp field.

  SYNOPSIS
    check_update_fields()
    thd                         The current thread.
    insert_table_list           The insert table list.
    table                       The table for update.
    update_fields               The update fields.

  NOTE
    If the update fields include the timestamp field,
    remove TIMESTAMP_AUTO_SET_ON_UPDATE from table->timestamp_field_type.

  RETURN
    0           OK
    -1          Error
*/

unknown's avatar
unknown committed
302
static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list,
303
                               List<Item> &update_fields, table_map *map)
304
{
unknown's avatar
unknown committed
305
  TABLE *table= insert_table_list->table;
306
  my_bool timestamp_mark;
307

308 309
  LINT_INIT(timestamp_mark);

310 311
  if (table->timestamp_field)
  {
312 313 314 315 316 317
    /*
      Unmark the timestamp field so that we can check if this is modified
      by update_fields
    */
    timestamp_mark= bitmap_test_and_clear(table->write_set,
                                          table->timestamp_field->field_index);
318 319
  }

320 321
  /* Check the fields we are going to modify */
  if (setup_fields(thd, 0, update_fields, MARK_COLUMNS_WRITE, 0, 0))
322 323
    return -1;

324 325 326 327
  if (insert_table_list->effective_algorithm == VIEW_ALGORITHM_MERGE &&
      check_view_single_update(update_fields, insert_table_list, map))
    return -1;

328 329 330
  if (table->timestamp_field)
  {
    /* Don't set timestamp column if this is modified. */
331 332
    if (bitmap_is_set(table->write_set,
                      table->timestamp_field->field_index))
333 334
      clear_timestamp_auto_bits(table->timestamp_field_type,
                                TIMESTAMP_AUTO_SET_ON_UPDATE);
335 336 337
    if (timestamp_mark)
      bitmap_set_bit(table->write_set,
                     table->timestamp_field->field_index);
338 339 340 341
  }
  return 0;
}

unknown's avatar
unknown committed
342
/*
343 344 345 346 347 348 349 350 351 352 353 354
  Prepare triggers  for INSERT-like statement.

  SYNOPSIS
    prepare_triggers_for_insert_stmt()
      table   Table to which insert will happen

  NOTE
    Prepare triggers for INSERT-like statement by marking fields
    used by triggers and inform handlers that batching of UPDATE/DELETE 
    cannot be done if there are BEFORE UPDATE/DELETE triggers.
*/

unknown's avatar
unknown committed
355
void prepare_triggers_for_insert_stmt(TABLE *table)
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379
{
  if (table->triggers)
  {
    if (table->triggers->has_triggers(TRG_EVENT_DELETE,
                                      TRG_ACTION_AFTER))
    {
      /*
        The table has AFTER DELETE triggers that might access to 
        subject table and therefore might need delete to be done 
        immediately. So we turn-off the batching.
      */ 
      (void) table->file->extra(HA_EXTRA_DELETE_CANNOT_BATCH);
    }
    if (table->triggers->has_triggers(TRG_EVENT_UPDATE,
                                      TRG_ACTION_AFTER))
    {
      /*
        The table has AFTER UPDATE triggers that might access to subject 
        table and therefore might need update to be done immediately. 
        So we turn-off the batching.
      */ 
      (void) table->file->extra(HA_EXTRA_UPDATE_CANNOT_BATCH);
    }
  }
unknown's avatar
unknown committed
380
  table->mark_columns_needed_for_insert();
381 382
}

383

384 385 386 387 388 389 390 391
/**
  Upgrade table-level lock of INSERT statement to TL_WRITE if
  a more concurrent lock is infeasible for some reason. This is
  necessary for engines without internal locking support (MyISAM).
  An engine with internal locking implementation might later
  downgrade the lock in handler::store_lock() method.
*/

unknown's avatar
unknown committed
392
static
393 394 395 396 397 398 399 400 401 402 403 404 405 406 407
void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
                       enum_duplicates duplic,
                       bool is_multi_insert)
{
  if (duplic == DUP_UPDATE ||
      duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT)
  {
    *lock_type= TL_WRITE;
    return;
  }

  if (*lock_type == TL_WRITE_DELAYED)
  {
    /*
      We do not use delayed threads if:
unknown's avatar
unknown committed
408 409 410 411 412
      - we're running in the safe mode or skip-new mode -- the
        feature is disabled in these modes
      - we're executing this statement on a replication slave --
        we need to ensure serial execution of queries on the
        slave
413 414
      - it is INSERT .. ON DUPLICATE KEY UPDATE - in this case the
        insert cannot be concurrent
unknown's avatar
unknown committed
415 416 417 418 419 420 421 422 423 424 425 426
      - this statement is directly or indirectly invoked from
        a stored function or trigger (under pre-locking) - to
        avoid deadlocks, since INSERT DELAYED involves a lock
        upgrade (TL_WRITE_DELAYED -> TL_WRITE) which we should not
        attempt while keeping other table level locks.
      - this statement itself may require pre-locking.
        We should upgrade the lock even though in most cases
        delayed functionality may work. Unfortunately, we can't
        easily identify whether the subject table is not used in
        the statement indirectly via a stored function or trigger:
        if it is used, that will lead to a deadlock between the
        client connection and the delayed thread.
427 428 429
    */
    if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) ||
        thd->slave_thread ||
unknown's avatar
unknown committed
430 431 432
        thd->variables.max_insert_delayed_threads == 0 ||
        thd->prelocked_mode ||
        thd->lex->uses_stored_routines())
433 434 435 436 437 438
    {
      *lock_type= TL_WRITE;
      return;
    }
    bool log_on= (thd->options & OPTION_BIN_LOG ||
                  ! (thd->security_ctx->master_access & SUPER_ACL));
439 440
    if (global_system_variables.binlog_format == BINLOG_FORMAT_STMT &&
        log_on && mysql_bin_log.is_open() && is_multi_insert)
441 442 443 444 445 446 447 448 449 450 451 452 453 454
    {
      /*
        Statement-based binary logging does not work in this case, because:
        a) two concurrent statements may have their rows intermixed in the
        queue, leading to autoincrement replication problems on slave (because
        the values generated used for one statement don't depend only on the
        value generated for the first row of this statement, so are not
        replicable)
        b) if first row of the statement has an error the full statement is
        not binlogged, while next rows of the statement may be inserted.
        c) if first row succeeds, statement is binlogged immediately with a
        zero error code (i.e. "no error"), if then second row fails, query
        will fail on slave too and slave will stop (wrongly believing that the
        master got no error).
455 456 457 458 459 460 461 462 463
        So we fallback to non-delayed INSERT.
        Note that to be fully correct, we should test the "binlog format which
        the delayed thread is going to use for this row". But in the common case
        where the global binlog format is not changed and the session binlog
        format may be changed, that is equal to the global binlog format.
        We test it without mutex for speed reasons (condition rarely true), and
        in the common case (global not changed) it is as good as without mutex;
        if global value is changed, anyway there is uncertainty as the delayed
        thread may be old and use the before-the-change value.
464 465 466 467 468 469 470
      */
      *lock_type= TL_WRITE;
    }
  }
}


unknown's avatar
unknown committed
471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536
/**
  Find or create a delayed insert thread for the first table in
  the table list, then open and lock the remaining tables.
  If a table can not be used with insert delayed, upgrade the lock
  and open and lock all tables using the standard mechanism.

  @param thd         thread context
  @param table_list  list of "descriptors" for tables referenced
                     directly in statement SQL text.
                     The first element in the list corresponds to
                     the destination table for inserts, remaining
                     tables, if any, are usually tables referenced
                     by sub-queries in the right part of the
                     INSERT.

  @return Status of the operation. In case of success 'table'
  member of every table_list element points to an instance of
  class TABLE.

  @sa open_and_lock_tables for more information about MySQL table
  level locking
*/

static
bool open_and_lock_for_insert_delayed(THD *thd, TABLE_LIST *table_list)
{
  DBUG_ENTER("open_and_lock_for_insert_delayed");

#ifndef EMBEDDED_LIBRARY
  if (delayed_get_table(thd, table_list))
    DBUG_RETURN(TRUE);

  if (table_list->table)
  {
    /*
      Open tables used for sub-selects or in stored functions, will also
      cache these functions.
    */
    if (open_and_lock_tables(thd, table_list->next_global))
    {
      end_delayed_insert(thd);
      DBUG_RETURN(TRUE);
    }
    /*
      First table was not processed by open_and_lock_tables(),
      we need to set updatability flag "by hand".
    */
    if (!table_list->derived && !table_list->view)
      table_list->updatable= 1;  // usual table
    DBUG_RETURN(FALSE);
  }
#endif
  /*
    * This is embedded library and we don't have auxiliary
    threads OR
    * a lock upgrade was requested inside delayed_get_table
      because
      - there are too many delayed insert threads OR
      - the table has triggers.
    Use a normal insert.
  */
  table_list->lock_type= TL_WRITE;
  DBUG_RETURN(open_and_lock_tables(thd, table_list));
}


537 538 539 540
/**
  INSERT statement implementation
*/

unknown's avatar
unknown committed
541 542 543 544 545
bool mysql_insert(THD *thd,TABLE_LIST *table_list,
                  List<Item> &fields,
                  List<List_item> &values_list,
                  List<Item> &update_fields,
                  List<Item> &update_values,
546 547
                  enum_duplicates duplic,
		  bool ignore)
unknown's avatar
unknown committed
548
{
549
  int error, res;
550
  bool transactional_table, joins_freed= FALSE;
551
  bool changed;
unknown's avatar
unknown committed
552 553 554 555
  uint value_count;
  ulong counter = 1;
  ulonglong id;
  COPY_INFO info;
556
  TABLE *table= 0;
unknown's avatar
unknown committed
557
  List_iterator_fast<List_item> its(values_list);
unknown's avatar
unknown committed
558
  List_item *values;
unknown's avatar
unknown committed
559
  Name_resolution_context *context;
560
  Name_resolution_context_state ctx_state;
561
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
562
  char *query= thd->query;
unknown's avatar
unknown committed
563 564 565 566 567 568
  /*
    log_on is about delayed inserts only.
    By default, both logs are enabled (this won't cause problems if the server
    runs without --log-update or --log-bin).
  */
  bool log_on= ((thd->options & OPTION_BIN_LOG) ||
unknown's avatar
unknown committed
569
                (!(thd->security_ctx->master_access & SUPER_ACL)));
unknown's avatar
unknown committed
570
#endif
unknown's avatar
unknown committed
571
  thr_lock_type lock_type = table_list->lock_type;
unknown's avatar
unknown committed
572
  Item *unused_conds= 0;
unknown's avatar
unknown committed
573 574
  DBUG_ENTER("mysql_insert");

575
  /*
576 577 578 579 580 581 582 583 584 585 586 587
    Upgrade lock type if the requested lock is incompatible with
    the current connection mode or table operation.
  */
  upgrade_lock_type(thd, &table_list->lock_type, duplic,
                    values_list.elements > 1);

  /*
    We can't write-delayed into a table locked with LOCK TABLES:
    this will lead to a deadlock, since the delayed thread will
    never be able to get a lock on the table. QQQ: why not
    upgrade the lock here instead?
  */
unknown's avatar
unknown committed
588
  if (table_list->lock_type == TL_WRITE_DELAYED && thd->locked_tables &&
589
      find_locked_table(thd, table_list->db, table_list->table_name))
590
  {
591 592 593
    my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
             table_list->table_name);
    DBUG_RETURN(TRUE);
594
  }
unknown's avatar
unknown committed
595

unknown's avatar
unknown committed
596
  if (table_list->lock_type == TL_WRITE_DELAYED)
unknown's avatar
unknown committed
597
  {
unknown's avatar
unknown committed
598 599
    if (open_and_lock_for_insert_delayed(thd, table_list))
      DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
600 601
  }
  else
unknown's avatar
unknown committed
602 603 604 605
  {
    if (open_and_lock_tables(thd, table_list))
      DBUG_RETURN(TRUE);
  }
606

unknown's avatar
unknown committed
607
  thd->proc_info="init";
608
  thd->used_tables=0;
unknown's avatar
unknown committed
609
  values= its++;
unknown's avatar
unknown committed
610
  value_count= values->elements;
unknown's avatar
unknown committed
611

unknown's avatar
VIEW  
unknown committed
612
  if (mysql_prepare_insert(thd, table_list, table, fields, values,
unknown's avatar
unknown committed
613
			   update_fields, update_values, duplic, &unused_conds,
unknown's avatar
unknown committed
614 615 616 617 618
                           FALSE,
                           (fields.elements || !value_count),
                           !ignore && (thd->variables.sql_mode &
                                       (MODE_STRICT_TRANS_TABLES |
                                        MODE_STRICT_ALL_TABLES))))
unknown's avatar
unknown committed
619
    goto abort;
620

621 622
  /* mysql_prepare_insert set table_list->table if it was not set */
  table= table_list->table;
unknown's avatar
unknown committed
623
  lock_type= table_list->lock_type;
624

unknown's avatar
unknown committed
625
  context= &thd->lex->select_lex.context;
626 627 628 629 630 631 632 633 634
  /*
    These three asserts test the hypothesis that the resetting of the name
    resolution context below is not necessary at all since the list of local
    tables for INSERT always consists of one table.
  */
  DBUG_ASSERT(!table_list->next_local);
  DBUG_ASSERT(!context->table_list->next_local);
  DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);

unknown's avatar
unknown committed
635
  /* Save the state of the current name resolution context. */
636
  ctx_state.save_state(context, table_list);
unknown's avatar
unknown committed
637 638 639 640 641

  /*
    Perform name resolution only in the first table - 'table_list',
    which is the table that is inserted into.
  */
642
  table_list->next_local= 0;
unknown's avatar
unknown committed
643 644
  context->resolve_in_table_list_only(table_list);

645
  while ((values= its++))
unknown's avatar
unknown committed
646 647 648 649
  {
    counter++;
    if (values->elements != value_count)
    {
650
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
unknown's avatar
unknown committed
651 652
      goto abort;
    }
653
    if (setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0))
unknown's avatar
unknown committed
654 655 656
      goto abort;
  }
  its.rewind ();
unknown's avatar
unknown committed
657 658
 
  /* Restore the current context. */
659
  ctx_state.restore_state(context, table_list);
unknown's avatar
unknown committed
660

unknown's avatar
unknown committed
661
  /*
unknown's avatar
unknown committed
662
    Fill in the given fields and dump it to the table file
unknown's avatar
unknown committed
663
  */
unknown's avatar
unknown committed
664
  info.records= info.deleted= info.copied= info.updated= 0;
665
  info.ignore= ignore;
unknown's avatar
unknown committed
666
  info.handle_duplicates=duplic;
667 668
  info.update_fields= &update_fields;
  info.update_values= &update_values;
unknown's avatar
unknown committed
669
  info.view= (table_list->view ? table_list : 0);
670

671 672 673
  /*
    Count warnings for all inserts.
    For single line insert, generate an error if try to set a NOT NULL field
unknown's avatar
unknown committed
674
    to NULL.
675
  */
unknown's avatar
unknown committed
676
  thd->count_cuted_fields= ((values_list.elements == 1 &&
unknown's avatar
unknown committed
677
                             !ignore) ?
678 679
			    CHECK_FIELD_ERROR_FOR_NULL :
			    CHECK_FIELD_WARN);
unknown's avatar
unknown committed
680 681 682
  thd->cuted_fields = 0L;
  table->next_number_field=table->found_next_number_field;

683 684 685 686 687 688 689 690
#ifdef HAVE_REPLICATION
  if (thd->slave_thread &&
      (info.handle_duplicates == DUP_UPDATE) &&
      (table->next_number_field != NULL) &&
      rpl_master_has_bug(&active_mi->rli, 24432))
    goto abort;
#endif

unknown's avatar
unknown committed
691 692
  error=0;
  thd->proc_info="update";
unknown's avatar
unknown committed
693
  if (duplic != DUP_ERROR || ignore)
unknown's avatar
unknown committed
694
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
695 696 697
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
unknown's avatar
unknown committed
698 699
  if (duplic == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
700 701 702 703
  /*
    let's *try* to start bulk inserts. It won't necessary
    start them as values_list.elements should be greater than
    some - handler dependent - threshold.
704 705 706 707
    We should not start bulk inserts if this statement uses
    functions or invokes triggers since they may access
    to the same table and therefore should not see its
    inconsistent state created by this optimization.
708 709 710 711
    So we call start_bulk_insert to perform nesessary checks on
    values_list.elements, and - if nothing else - to initialize
    the code to make the call of end_bulk_insert() below safe.
  */
712
  if (lock_type != TL_WRITE_DELAYED && !thd->prelocked_mode)
713
    table->file->ha_start_bulk_insert(values_list.elements);
714

715
  thd->no_trans_update.stmt= FALSE;
unknown's avatar
unknown committed
716 717 718
  thd->abort_on_warning= (!ignore && (thd->variables.sql_mode &
                                       (MODE_STRICT_TRANS_TABLES |
                                        MODE_STRICT_ALL_TABLES)));
unknown's avatar
unknown committed
719

unknown's avatar
unknown committed
720 721
  prepare_triggers_for_insert_stmt(table);

722

723 724 725 726
  if (table_list->prepare_where(thd, 0, TRUE) ||
      table_list->prepare_check_option(thd))
    error= 1;

unknown's avatar
unknown committed
727
  while ((values= its++))
unknown's avatar
unknown committed
728 729 730
  {
    if (fields.elements || !value_count)
    {
731
      restore_record(table,s->default_values);	// Get empty record
unknown's avatar
unknown committed
732 733 734
      if (fill_record_n_invoke_before_triggers(thd, fields, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
unknown's avatar
unknown committed
735
      {
unknown's avatar
unknown committed
736
	if (values_list.elements != 1 && !thd->net.report_error)
unknown's avatar
unknown committed
737 738 739 740
	{
	  info.records++;
	  continue;
	}
unknown's avatar
unknown committed
741 742 743 744 745
	/*
	  TODO: set thd->abort_on_warning if values_list.elements == 1
	  and check that all items return warning in case of problem with
	  storing field.
        */
unknown's avatar
unknown committed
746 747 748 749 750 751
	error=1;
	break;
      }
    }
    else
    {
752
      if (thd->used_tables)			// Column used in values()
753
	restore_record(table,s->default_values);	// Get empty record
754
      else
unknown's avatar
unknown committed
755 756 757 758 759 760 761 762 763 764 765
      {
        /*
          Fix delete marker. No need to restore rest of record since it will
          be overwritten by fill_record() anyway (and fill_record() does not
          use default values in this case).
        */
	table->record[0][0]= table->s->default_values[0];
      }
      if (fill_record_n_invoke_before_triggers(thd, table->field, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
unknown's avatar
unknown committed
766
      {
unknown's avatar
unknown committed
767
	if (values_list.elements != 1 && ! thd->net.report_error)
unknown's avatar
unknown committed
768 769 770 771 772 773 774 775
	{
	  info.records++;
	  continue;
	}
	error=1;
	break;
      }
    }
776

777 778 779
    if ((res= table_list->view_check_option(thd,
					    (values_list.elements == 1 ?
					     0 :
780
					     ignore))) ==
unknown's avatar
unknown committed
781 782 783
        VIEW_CHECK_SKIP)
      continue;
    else if (res == VIEW_CHECK_ERROR)
unknown's avatar
unknown committed
784
    {
unknown's avatar
unknown committed
785 786
      error= 1;
      break;
unknown's avatar
unknown committed
787
    }
788
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
789
    if (lock_type == TL_WRITE_DELAYED)
unknown's avatar
unknown committed
790
    {
791 792
      LEX_STRING const st_query = { query, thd->query_length };
      error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
unknown's avatar
unknown committed
793 794 795
      query=0;
    }
    else
796
#endif
unknown's avatar
unknown committed
797
      error=write_record(thd, table ,&info);
798 799
    if (error)
      break;
800
    thd->row_count++;
unknown's avatar
unknown committed
801
  }
802

803 804 805
  free_underlaid_joins(thd, &thd->lex->select_lex);
  joins_freed= TRUE;

806 807 808 809
  /*
    Now all rows are inserted.  Time to update logs and sends response to
    user
  */
810
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
811 812
  if (lock_type == TL_WRITE_DELAYED)
  {
813 814 815 816 817
    if (!error)
    {
      info.copied=values_list.elements;
      end_delayed_insert(thd);
    }
unknown's avatar
unknown committed
818
    query_cache_invalidate3(thd, table_list, 1);
unknown's avatar
unknown committed
819 820
  }
  else
821
#endif
unknown's avatar
unknown committed
822
  {
823 824 825 826 827
    /*
      Do not do this release if this is a delayed insert, it would steal
      auto_inc values from the delayed_insert thread as they share TABLE.
    */
    table->file->ha_release_auto_increment();
828
    if (!thd->prelocked_mode && table->file->ha_end_bulk_insert() && !error)
829
    {
unknown's avatar
unknown committed
830 831
      table->file->print_error(my_errno,MYF(0));
      error=1;
832
    }
833
    transactional_table= table->file->has_transactions();
unknown's avatar
unknown committed
834

835
    if ((changed= (info.copied || info.deleted || info.updated)))
unknown's avatar
unknown committed
836
    {
837 838 839 840 841 842 843
      /*
        Invalidate the table in the query cache if something changed.
        For the transactional algorithm to work the invalidation must be
        before binlog writing and ha_autocommit_or_rollback
      */
      query_cache_invalidate3(thd, table_list, 1);
      if (error <= 0 || !transactional_table)
844
      {
845
        if (mysql_bin_log.is_open())
846
        {
847
          if (error <= 0)
848 849 850 851 852 853 854 855 856 857 858
          {
            /*
              [Guilhem wrote] Temporary errors may have filled
              thd->net.last_error/errno.  For example if there has
              been a disk full error when writing the row, and it was
              MyISAM, then thd->net.last_error/errno will be set to
              "disk full"... and the my_pwrite() will wait until free
              space appears, and so when it finishes then the
              write_row() was entirely successful
            */
            /* todo: consider removing */
859
            thd->clear_error();
860
          }
861 862
          /* bug#22725:

863 864 865 866 867 868 869 870 871 872
          A query which per-row-loop can not be interrupted with
          KILLED, like INSERT, and that does not invoke stored
          routines can be binlogged with neglecting the KILLED error.
          
          If there was no error (error == zero) until after the end of
          inserting loop the KILLED flag that appeared later can be
          disregarded since previously possible invocation of stored
          routines did not result in any error due to the KILLED.  In
          such case the flag is ignored for constructing binlog event.
          */
873
          DBUG_ASSERT(thd->killed != THD::KILL_BAD_DATA || error > 0);
unknown's avatar
unknown committed
874 875
          if (thd->binlog_query(THD::ROW_QUERY_TYPE,
                                thd->query, thd->query_length,
876 877
                                transactional_table, FALSE,
                                (error>0) ? thd->killed : THD::NOT_KILLED) &&
unknown's avatar
unknown committed
878 879
              transactional_table)
          {
880
            error=1;
unknown's avatar
unknown committed
881
          }
882
        }
883
        if (!transactional_table)
unknown's avatar
unknown committed
884
          thd->no_trans_update.all= TRUE;
885
      }
unknown's avatar
unknown committed
886
    }
887
    if (transactional_table)
888
      error=ha_autocommit_or_rollback(thd,error);
889
    
unknown's avatar
unknown committed
890 891 892
    if (thd->lock)
    {
      mysql_unlock_tables(thd, thd->lock);
893 894 895 896 897 898 899 900 901 902
      /*
        Invalidate the table in the query cache if something changed
        after unlocking when changes become fisible.
        TODO: this is workaround. right way will be move invalidating in
        the unlock procedure.
      */
      if (lock_type ==  TL_WRITE_CONCURRENT_INSERT && changed)
      {
        query_cache_invalidate3(thd, table_list, 1);
      }
unknown's avatar
unknown committed
903 904 905 906
      thd->lock=0;
    }
  }
  thd->proc_info="end";
907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922
  /*
    We'll report to the client this id:
    - if the table contains an autoincrement column and we successfully
    inserted an autogenerated value, the autogenerated value.
    - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
    called, X.
    - if the table contains an autoincrement column, and some rows were
    inserted, the id of the last "inserted" row (if IGNORE, that value may not
    have been really inserted but ignored).
  */
  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
    thd->first_successful_insert_id_in_cur_stmt :
    (thd->arg_of_last_insert_id_function ?
     thd->first_successful_insert_id_in_prev_stmt :
     ((table->next_number_field && info.copied) ?
     table->next_number_field->val_int() : 0));
unknown's avatar
unknown committed
923
  table->next_number_field=0;
924
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
925
  table->auto_increment_field_not_null= FALSE;
unknown's avatar
unknown committed
926
  if (duplic != DUP_ERROR || ignore)
unknown's avatar
unknown committed
927
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
928 929 930
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
931

unknown's avatar
unknown committed
932 933 934 935
  if (error)
    goto abort;
  if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) ||
				    !thd->cuted_fields))
936
  {
937 938 939
    thd->row_count_func= info.copied + info.deleted +
                         ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
                          info.touched : info.updated);
940
    send_ok(thd, (ulong) thd->row_count_func, id);
941
  }
942 943
  else
  {
unknown's avatar
unknown committed
944
    char buff[160];
945 946
    ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
                     info.touched : info.updated);
947
    if (ignore)
948 949 950
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
	      (lock_type == TL_WRITE_DELAYED) ? (ulong) 0 :
	      (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
unknown's avatar
unknown committed
951
    else
952
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
953 954
	      (ulong) (info.deleted + updated), (ulong) thd->cuted_fields);
    thd->row_count_func= info.copied + info.deleted + updated;
955
    ::send_ok(thd, (ulong) thd->row_count_func, id, buff);
unknown's avatar
unknown committed
956
  }
957
  thd->abort_on_warning= 0;
unknown's avatar
unknown committed
958
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
959 960

abort:
961
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
962 963
  if (lock_type == TL_WRITE_DELAYED)
    end_delayed_insert(thd);
964
#endif
965
  if (table != NULL)
966
    table->file->ha_release_auto_increment();
967 968
  if (!joins_freed)
    free_underlaid_joins(thd, &thd->lex->select_lex);
unknown's avatar
unknown committed
969
  thd->abort_on_warning= 0;
unknown's avatar
unknown committed
970
  DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
971 972 973
}


unknown's avatar
VIEW  
unknown committed
974 975 976 977 978
/*
  Additional check for insertability for VIEW

  SYNOPSIS
    check_view_insertability()
979
    thd     - thread handler
unknown's avatar
VIEW  
unknown committed
980 981
    view    - reference on VIEW

982 983 984 985 986 987
  IMPLEMENTATION
    A view is insertable if the folloings are true:
    - All columns in the view are columns from a table
    - All not used columns in table have a default values
    - All field in view are unique (not referring to the same column)

unknown's avatar
VIEW  
unknown committed
988 989
  RETURN
    FALSE - OK
990 991 992
      view->contain_auto_increment is 1 if and only if the view contains an
      auto_increment field

unknown's avatar
VIEW  
unknown committed
993 994 995
    TRUE  - can't be used for insert
*/

996
static bool check_view_insertability(THD * thd, TABLE_LIST *view)
unknown's avatar
VIEW  
unknown committed
997
{
998
  uint num= view->view->select_lex.item_list.elements;
unknown's avatar
VIEW  
unknown committed
999
  TABLE *table= view->table;
1000 1001 1002
  Field_translator *trans_start= view->field_translation,
		   *trans_end= trans_start + num;
  Field_translator *trans;
unknown's avatar
unknown committed
1003
  uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
unknown's avatar
unknown committed
1004
  uint32 *used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size);
1005
  MY_BITMAP used_fields;
unknown's avatar
unknown committed
1006
  enum_mark_columns save_mark_used_columns= thd->mark_used_columns;
1007 1008
  DBUG_ENTER("check_key_in_view");

1009 1010 1011
  if (!used_fields_buff)
    DBUG_RETURN(TRUE);  // EOM

unknown's avatar
VIEW  
unknown committed
1012 1013
  DBUG_ASSERT(view->table != 0 && view->field_translation != 0);

1014
  VOID(bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0));
1015 1016
  bitmap_clear_all(&used_fields);

unknown's avatar
VIEW  
unknown committed
1017
  view->contain_auto_increment= 0;
1018 1019 1020 1021
  /* 
    we must not set query_id for fields as they're not 
    really used in this context
  */
unknown's avatar
unknown committed
1022
  thd->mark_used_columns= MARK_COLUMNS_NONE;
unknown's avatar
VIEW  
unknown committed
1023
  /* check simplicity and prepare unique test of view */
1024
  for (trans= trans_start; trans != trans_end; trans++)
unknown's avatar
VIEW  
unknown committed
1025
  {
1026
    if (!trans->item->fixed && trans->item->fix_fields(thd, &trans->item))
1027
    {
unknown's avatar
unknown committed
1028
      thd->mark_used_columns= save_mark_used_columns;
1029 1030
      DBUG_RETURN(TRUE);
    }
1031
    Item_field *field;
unknown's avatar
VIEW  
unknown committed
1032
    /* simple SELECT list entry (field without expression) */
unknown's avatar
merge  
unknown committed
1033
    if (!(field= trans->item->filed_for_view_update()))
1034
    {
unknown's avatar
unknown committed
1035
      thd->mark_used_columns= save_mark_used_columns;
unknown's avatar
VIEW  
unknown committed
1036
      DBUG_RETURN(TRUE);
1037
    }
1038
    if (field->field->unireg_check == Field::NEXT_NUMBER)
unknown's avatar
VIEW  
unknown committed
1039 1040
      view->contain_auto_increment= 1;
    /* prepare unique test */
unknown's avatar
unknown committed
1041 1042 1043 1044 1045
    /*
      remove collation (or other transparent for update function) if we have
      it
    */
    trans->item= field;
unknown's avatar
VIEW  
unknown committed
1046
  }
unknown's avatar
unknown committed
1047
  thd->mark_used_columns= save_mark_used_columns;
unknown's avatar
VIEW  
unknown committed
1048
  /* unique test */
1049
  for (trans= trans_start; trans != trans_end; trans++)
unknown's avatar
VIEW  
unknown committed
1050
  {
1051
    /* Thanks to test above, we know that all columns are of type Item_field */
1052
    Item_field *field= (Item_field *)trans->item;
1053 1054 1055
    /* check fields belong to table in which we are inserting */
    if (field->field->table == table &&
        bitmap_fast_test_and_set(&used_fields, field->field->field_index))
unknown's avatar
VIEW  
unknown committed
1056 1057 1058 1059 1060 1061 1062
      DBUG_RETURN(TRUE);
  }

  DBUG_RETURN(FALSE);
}


unknown's avatar
unknown committed
1063
/*
1064
  Check if table can be updated
unknown's avatar
unknown committed
1065 1066

  SYNOPSIS
1067 1068
     mysql_prepare_insert_check_table()
     thd		Thread handle
unknown's avatar
unknown committed
1069
     table_list		Table list
1070 1071
     fields		List of fields to be updated
     where		Pointer to where clause
unknown's avatar
unknown committed
1072
     select_insert      Check is making for SELECT ... INSERT
1073 1074

   RETURN
unknown's avatar
unknown committed
1075 1076
     FALSE ok
     TRUE  ERROR
unknown's avatar
unknown committed
1077
*/
1078

unknown's avatar
unknown committed
1079
static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
1080
                                             List<Item> &fields,
unknown's avatar
unknown committed
1081
                                             bool select_insert)
unknown's avatar
unknown committed
1082
{
unknown's avatar
VIEW  
unknown committed
1083
  bool insert_into_view= (table_list->view != 0);
1084
  DBUG_ENTER("mysql_prepare_insert_check_table");
unknown's avatar
VIEW  
unknown committed
1085

1086 1087 1088 1089 1090 1091 1092
  /*
     first table in list is the one we'll INSERT into, requires INSERT_ACL.
     all others require SELECT_ACL only. the ACL requirement below is for
     new leaves only anyway (view-constituents), so check for SELECT rather
     than INSERT.
  */

1093 1094
  if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
                                    &thd->lex->select_lex.top_join_list,
1095
                                    table_list,
1096
                                    &thd->lex->select_lex.leaf_tables,
1097
                                    select_insert, INSERT_ACL, SELECT_ACL))
unknown's avatar
unknown committed
1098
    DBUG_RETURN(TRUE);
unknown's avatar
VIEW  
unknown committed
1099 1100 1101 1102

  if (insert_into_view && !fields.elements)
  {
    thd->lex->empty_field_list_on_rset= 1;
1103 1104 1105 1106
    if (!table_list->table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
unknown's avatar
unknown committed
1107
      DBUG_RETURN(TRUE);
1108
    }
1109
    DBUG_RETURN(insert_view_fields(thd, &fields, table_list));
unknown's avatar
VIEW  
unknown committed
1110 1111
  }

unknown's avatar
unknown committed
1112
  DBUG_RETURN(FALSE);
1113 1114 1115 1116 1117 1118 1119 1120 1121 1122
}


/*
  Prepare items in INSERT statement

  SYNOPSIS
    mysql_prepare_insert()
    thd			Thread handler
    table_list	        Global/local table list
unknown's avatar
unknown committed
1123 1124
    table		Table to insert into (can be NULL if table should
			be taken from table_list->table)    
unknown's avatar
unknown committed
1125 1126
    where		Where clause (for insert ... select)
    select_insert	TRUE if INSERT ... SELECT statement
unknown's avatar
unknown committed
1127 1128 1129 1130
    check_fields        TRUE if need to check that all INSERT fields are 
                        given values.
    abort_on_warning    whether to report if some INSERT field is not 
                        assigned as an error (TRUE) or as a warning (FALSE).
1131

unknown's avatar
unknown committed
1132 1133 1134 1135 1136
  TODO (in far future)
    In cases of:
    INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
    ON DUPLICATE KEY ...
    we should be able to refer to sum1 in the ON DUPLICATE KEY part
unknown's avatar
unknown committed
1137

1138 1139 1140
  WARNING
    You MUST set table->insert_values to 0 after calling this function
    before releasing the table object.
unknown's avatar
unknown committed
1141
  
1142
  RETURN VALUE
unknown's avatar
unknown committed
1143 1144
    FALSE OK
    TRUE  error
1145 1146
*/

unknown's avatar
unknown committed
1147
bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
unknown's avatar
unknown committed
1148
                          TABLE *table, List<Item> &fields, List_item *values,
unknown's avatar
unknown committed
1149
                          List<Item> &update_fields, List<Item> &update_values,
unknown's avatar
unknown committed
1150
                          enum_duplicates duplic,
unknown's avatar
unknown committed
1151 1152
                          COND **where, bool select_insert,
                          bool check_fields, bool abort_on_warning)
1153
{
unknown's avatar
unknown committed
1154
  SELECT_LEX *select_lex= &thd->lex->select_lex;
unknown's avatar
unknown committed
1155
  Name_resolution_context *context= &select_lex->context;
1156
  Name_resolution_context_state ctx_state;
1157
  bool insert_into_view= (table_list->view != 0);
unknown's avatar
unknown committed
1158
  bool res= 0;
1159
  table_map map= 0;
1160
  DBUG_ENTER("mysql_prepare_insert");
unknown's avatar
unknown committed
1161 1162 1163
  DBUG_PRINT("enter", ("table_list 0x%lx, table 0x%lx, view %d",
		       (ulong)table_list, (ulong)table,
		       (int)insert_into_view));
1164 1165
  /* INSERT should have a SELECT or VALUES clause */
  DBUG_ASSERT (!select_insert || !values);
unknown's avatar
unknown committed
1166

1167 1168 1169 1170 1171 1172 1173
  /*
    For subqueries in VALUES() we should not see the table in which we are
    inserting (for INSERT ... SELECT this is done by changing table_list,
    because INSERT ... SELECT share SELECT_LEX it with SELECT.
  */
  if (!select_insert)
  {
unknown's avatar
unknown committed
1174
    for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186
         un;
         un= un->next_unit())
    {
      for (SELECT_LEX *sl= un->first_select();
           sl;
           sl= sl->next_select())
      {
        sl->context.outer_context= 0;
      }
    }
  }

unknown's avatar
unknown committed
1187
  if (duplic == DUP_UPDATE)
unknown's avatar
unknown committed
1188 1189
  {
    /* it should be allocated before Item::fix_fields() */
unknown's avatar
unknown committed
1190
    if (table_list->set_insert_values(thd->mem_root))
unknown's avatar
unknown committed
1191
      DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
1192
  }
unknown's avatar
unknown committed
1193

1194
  if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert))
unknown's avatar
unknown committed
1195
    DBUG_RETURN(TRUE);
unknown's avatar
VIEW  
unknown committed
1196

unknown's avatar
unknown committed
1197 1198

  /* Prepare the fields in the statement. */
1199
  if (values)
unknown's avatar
unknown committed
1200
  {
1201 1202 1203 1204 1205 1206
    /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
    DBUG_ASSERT (!select_lex->group_list.elements);

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);

1207
    /*
1208 1209 1210 1211 1212 1213
      Perform name resolution only in the first table - 'table_list',
      which is the table that is inserted into.
     */
    table_list->next_local= 0;
    context->resolve_in_table_list_only(table_list);

unknown's avatar
unknown committed
1214 1215
    res= check_insert_fields(thd, context->table_list, fields, *values,
                             !insert_into_view, &map) ||
unknown's avatar
unknown committed
1216
      setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0);
unknown's avatar
unknown committed
1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229

    if (!res && check_fields)
    {
      bool saved_abort_on_warning= thd->abort_on_warning;
      thd->abort_on_warning= abort_on_warning;
      res= check_that_all_fields_are_given_values(thd, 
                                                  table ? table : 
                                                  context->table_list->table,
                                                  context->table_list);
      thd->abort_on_warning= saved_abort_on_warning;
    }

    if (!res && duplic == DUP_UPDATE)
unknown's avatar
unknown committed
1230
    {
1231 1232 1233
      select_lex->no_wrap_view_item= TRUE;
      res= check_update_fields(thd, context->table_list, update_fields, &map);
      select_lex->no_wrap_view_item= FALSE;
unknown's avatar
unknown committed
1234
    }
1235 1236 1237 1238

    /* Restore the current context. */
    ctx_state.restore_state(context, table_list);

unknown's avatar
unknown committed
1239
    if (!res)
1240
      res= setup_fields(thd, 0, update_values, MARK_COLUMNS_READ, 0, 0);
unknown's avatar
unknown committed
1241
  }
unknown's avatar
unknown committed
1242

unknown's avatar
unknown committed
1243 1244
  if (res)
    DBUG_RETURN(res);
unknown's avatar
VIEW  
unknown committed
1245

unknown's avatar
unknown committed
1246 1247 1248
  if (!table)
    table= table_list->table;

1249
  if (!select_insert)
unknown's avatar
unknown committed
1250
  {
1251
    Item *fake_conds= 0;
1252
    TABLE_LIST *duplicate;
1253
    if ((duplicate= unique_table(thd, table_list, table_list->next_global, 1)))
1254
    {
1255
      update_non_unique_table_error(table_list, "INSERT", duplicate);
1256 1257
      DBUG_RETURN(TRUE);
    }
1258
    select_lex->fix_prepare_information(thd, &fake_conds, &fake_conds);
unknown's avatar
unknown committed
1259
    select_lex->first_execution= 0;
unknown's avatar
unknown committed
1260
  }
1261
  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
1262
    table->prepare_for_position();
unknown's avatar
unknown committed
1263
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
1264 1265 1266
}


unknown's avatar
unknown committed
1267 1268 1269 1270
	/* Check if there is more uniq keys after field */

static int last_uniq_key(TABLE *table,uint keynr)
{
1271
  while (++keynr < table->s->keys)
unknown's avatar
unknown committed
1272 1273 1274 1275 1276 1277 1278
    if (table->key_info[keynr].flags & HA_NOSAME)
      return 0;
  return 1;
}


/*
unknown's avatar
unknown committed
1279 1280 1281 1282 1283 1284 1285 1286 1287 1288
  Write a record to table with optional deleting of conflicting records,
  invoke proper triggers if needed.

  SYNOPSIS
     write_record()
      thd   - thread context
      table - table to which record should be written
      info  - COPY_INFO structure describing handling of duplicates
              and which is used for counting number of records inserted
              and deleted.
unknown's avatar
unknown committed
1289

unknown's avatar
unknown committed
1290 1291 1292 1293 1294
  NOTE
    Once this record will be written to table after insert trigger will
    be invoked. If instead of inserting new record we will update old one
    then both on update triggers will work instead. Similarly both on
    delete triggers will be invoked if we will delete conflicting records.
unknown's avatar
unknown committed
1295

1296
    Sets thd->no_trans_update.stmt to TRUE if table which is updated didn't have
unknown's avatar
unknown committed
1297 1298 1299 1300 1301
    transactions.

  RETURN VALUE
    0     - success
    non-0 - error
unknown's avatar
unknown committed
1302 1303 1304
*/


unknown's avatar
unknown committed
1305
int write_record(THD *thd, TABLE *table,COPY_INFO *info)
unknown's avatar
unknown committed
1306
{
unknown's avatar
unknown committed
1307
  int error, trg_error= 0;
unknown's avatar
unknown committed
1308
  char *key=0;
1309
  MY_BITMAP *save_read_set, *save_write_set;
1310 1311
  ulonglong prev_insert_id= table->file->next_insert_id;
  ulonglong insert_id_for_cur_row= 0;
1312
  DBUG_ENTER("write_record");
unknown's avatar
unknown committed
1313

unknown's avatar
unknown committed
1314
  info->records++;
1315 1316
  save_read_set=  table->read_set;
  save_write_set= table->write_set;
1317

1318 1319
  if (info->handle_duplicates == DUP_REPLACE ||
      info->handle_duplicates == DUP_UPDATE)
unknown's avatar
unknown committed
1320
  {
1321
    while ((error=table->file->ha_write_row(table->record[0])))
unknown's avatar
unknown committed
1322
    {
1323
      uint key_nr;
1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334
      /*
        If we do more than one iteration of this loop, from the second one the
        row will have an explicit value in the autoinc field, which was set at
        the first call of handler::update_auto_increment(). So we must save
        the autogenerated value to avoid thd->insert_id_for_cur_row to become
        0.
      */
      if (table->file->insert_id_for_cur_row > 0)
        insert_id_for_cur_row= table->file->insert_id_for_cur_row;
      else
        table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1335
      bool is_duplicate_key_error;
1336
      if (table->file->is_fatal_error(error, HA_CHECK_DUP))
unknown's avatar
unknown committed
1337
	goto err;
1338
      is_duplicate_key_error= table->file->is_fatal_error(error, 0);
unknown's avatar
unknown committed
1339 1340
      if (!is_duplicate_key_error)
      {
1341 1342 1343 1344 1345
        /*
          We come here when we had an ignorable error which is not a duplicate
          key error. In this we ignore error if ignore flag is set, otherwise
          report error as usual. We will not do any duplicate key processing.
        */
unknown's avatar
unknown committed
1346
        if (info->ignore)
1347 1348
          goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
        goto err;
unknown's avatar
unknown committed
1349
      }
unknown's avatar
unknown committed
1350 1351
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
      {
1352
	error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
unknown's avatar
unknown committed
1353 1354
	goto err;
      }
1355 1356
      /* Read all columns for the row we are going to replace */
      table->use_all_columns();
1357 1358 1359 1360 1361
      /*
	Don't allow REPLACE to replace a row when a auto_increment column
	was used.  This ensures that we don't get a problem when the
	whole range of the key has been used.
      */
1362 1363
      if (info->handle_duplicates == DUP_REPLACE &&
          table->next_number_field &&
1364
          key_nr == table->s->next_number_index &&
1365
	  (insert_id_for_cur_row > 0))
1366
	goto err;
1367
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
unknown's avatar
unknown committed
1368
      {
1369
	if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
unknown's avatar
unknown committed
1370 1371 1372 1373
	  goto err;
      }
      else
      {
1374
	if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
unknown's avatar
unknown committed
1375 1376 1377 1378
	{
	  error=my_errno;
	  goto err;
	}
unknown's avatar
unknown committed
1379

unknown's avatar
unknown committed
1380 1381
	if (!key)
	{
1382
	  if (!(key=(char*) my_safe_alloca(table->s->max_unique_length,
unknown's avatar
unknown committed
1383 1384 1385 1386 1387 1388
					   MAX_KEY_LENGTH)))
	  {
	    error=ENOMEM;
	    goto err;
	  }
	}
1389
	key_copy((uchar*) key,table->record[0],table->key_info+key_nr,0);
unknown's avatar
unknown committed
1390
	if ((error=(table->file->index_read_idx(table->record[1],key_nr,
1391
                                                (uchar*) key, HA_WHOLE_KEY,
unknown's avatar
unknown committed
1392 1393 1394
						HA_READ_KEY_EXACT))))
	  goto err;
      }
1395
      if (info->handle_duplicates == DUP_UPDATE)
unknown's avatar
unknown committed
1396
      {
unknown's avatar
unknown committed
1397
        int res= 0;
unknown's avatar
unknown committed
1398 1399 1400 1401
        /*
          We don't check for other UNIQUE keys - the first row
          that matches, is updated. If update causes a conflict again,
          an error is returned
1402
        */
unknown's avatar
unknown committed
1403
	DBUG_ASSERT(table->insert_values != NULL);
unknown's avatar
unknown committed
1404 1405
        store_record(table,insert_values);
        restore_record(table,record[1]);
unknown's avatar
unknown committed
1406 1407
        DBUG_ASSERT(info->update_fields->elements ==
                    info->update_values->elements);
unknown's avatar
unknown committed
1408
        if (fill_record_n_invoke_before_triggers(thd, *info->update_fields,
unknown's avatar
unknown committed
1409 1410
                                                 *info->update_values,
                                                 info->ignore,
unknown's avatar
unknown committed
1411 1412 1413
                                                 table->triggers,
                                                 TRG_EVENT_UPDATE))
          goto before_trg_err;
1414 1415

        /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
unknown's avatar
unknown committed
1416 1417 1418
        if (info->view &&
            (res= info->view->view_check_option(current_thd, info->ignore)) ==
            VIEW_CHECK_SKIP)
unknown's avatar
unknown committed
1419
          goto ok_or_after_trg_err;
unknown's avatar
unknown committed
1420
        if (res == VIEW_CHECK_ERROR)
unknown's avatar
unknown committed
1421
          goto before_trg_err;
1422

1423
        table->file->restore_auto_increment(prev_insert_id);
unknown's avatar
unknown committed
1424 1425
        if ((table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ) ||
            compare_record(table))
1426
        {
1427
          if ((error=table->file->ha_update_row(table->record[1],
1428 1429
                                                table->record[0])) &&
              error != HA_ERR_RECORD_IS_THE_SAME)
1430
          {
1431 1432
            if (info->ignore &&
                !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
1433 1434 1435 1436
            {
              goto ok_or_after_trg_err;
            }
            goto err;
1437
          }
unknown's avatar
unknown committed
1438

1439 1440 1441 1442
          if (error != HA_ERR_RECORD_IS_THE_SAME)
            info->updated++;
          else
            error= 0;
unknown's avatar
unknown committed
1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453
          /*
            If ON DUP KEY UPDATE updates a row instead of inserting one, it's
            like a regular UPDATE statement: it should not affect the value of a
            next SELECT LAST_INSERT_ID() or mysql_insert_id().
            Except if LAST_INSERT_ID(#) was in the INSERT query, which is
            handled separately by THD::arg_of_last_insert_id_function.
          */
          insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
          if (table->next_number_field)
            table->file->adjust_next_insert_id_after_explicit_value(
              table->next_number_field->val_int());
unknown's avatar
unknown committed
1454 1455
          trg_error= (table->triggers &&
                      table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
unknown's avatar
unknown committed
1456
                                                        TRG_ACTION_AFTER, TRUE));
1457 1458
          info->copied++;
        }
1459

1460 1461 1462 1463 1464
        if (table->next_number_field)
          table->file->adjust_next_insert_id_after_explicit_value(
            table->next_number_field->val_int());
        info->touched++;

unknown's avatar
unknown committed
1465
        goto ok_or_after_trg_err;
1466 1467 1468
      }
      else /* DUP_REPLACE */
      {
unknown's avatar
unknown committed
1469 1470 1471 1472 1473
	/*
	  The manual defines the REPLACE semantics that it is either
	  an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
	  InnoDB do not function in the defined way if we allow MySQL
	  to convert the latter operation internally to an UPDATE.
1474 1475
          We also should not perform this conversion if we have 
          timestamp field with ON UPDATE which is different from DEFAULT.
1476 1477 1478 1479 1480 1481
          Another case when conversion should not be performed is when
          we have ON DELETE trigger on table so user may notice that
          we cheat here. Note that it is ok to do such conversion for
          tables which have ON UPDATE but have no ON DELETE triggers,
          we just should not expose this fact to users by invoking
          ON UPDATE triggers.
unknown's avatar
unknown committed
1482 1483
	*/
	if (last_uniq_key(table,key_nr) &&
1484
	    !table->file->referenced_by_foreign_key() &&
1485
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
1486 1487
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH) &&
            (!table->triggers || !table->triggers->has_delete_triggers()))
1488
        {
1489
          if ((error=table->file->ha_update_row(table->record[1],
1490 1491
					        table->record[0])) &&
              error != HA_ERR_RECORD_IS_THE_SAME)
1492
            goto err;
1493 1494 1495 1496
          if (error != HA_ERR_RECORD_IS_THE_SAME)
            info->deleted++;
          else
            error= 0;
1497
          thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1498 1499 1500 1501 1502
          /*
            Since we pretend that we have done insert we should call
            its after triggers.
          */
          goto after_trg_n_copied_inc;
unknown's avatar
unknown committed
1503 1504 1505 1506 1507 1508 1509
        }
        else
        {
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_BEFORE, TRUE))
            goto before_trg_err;
1510
          if ((error=table->file->ha_delete_row(table->record[1])))
unknown's avatar
unknown committed
1511 1512 1513
            goto err;
          info->deleted++;
          if (!table->file->has_transactions())
unknown's avatar
unknown committed
1514
            thd->no_trans_update.stmt= TRUE;
unknown's avatar
unknown committed
1515 1516 1517 1518 1519 1520 1521 1522
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_AFTER, TRUE))
          {
            trg_error= 1;
            goto ok_or_after_trg_err;
          }
          /* Let us attempt do write_row() once more */
1523
        }
unknown's avatar
unknown committed
1524 1525
      }
    }
1526
    thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1527 1528 1529 1530 1531 1532 1533
    /*
      Restore column maps if they where replaced during an duplicate key
      problem.
    */
    if (table->read_set != save_read_set ||
        table->write_set != save_write_set)
      table->column_bitmaps_set(save_read_set, save_write_set);
unknown's avatar
unknown committed
1534
  }
1535
  else if ((error=table->file->ha_write_row(table->record[0])))
unknown's avatar
unknown committed
1536
  {
1537
    if (!info->ignore ||
1538
        table->file->is_fatal_error(error, HA_CHECK_DUP))
unknown's avatar
unknown committed
1539
      goto err;
1540
    table->file->restore_auto_increment(prev_insert_id);
1541
    goto ok_or_after_trg_err;
unknown's avatar
unknown committed
1542
  }
1543 1544 1545

after_trg_n_copied_inc:
  info->copied++;
1546
  thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1547 1548 1549
  trg_error= (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
                                                TRG_ACTION_AFTER, TRUE));
unknown's avatar
unknown committed
1550 1551

ok_or_after_trg_err:
unknown's avatar
unknown committed
1552
  if (key)
1553
    my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
unknown's avatar
unknown committed
1554
  if (!table->file->has_transactions())
1555
    thd->no_trans_update.stmt= TRUE;
unknown's avatar
unknown committed
1556
  DBUG_RETURN(trg_error);
unknown's avatar
unknown committed
1557 1558

err:
1559
  info->last_errno= error;
1560 1561 1562
  /* current_select is NULL if this is a delayed insert */
  if (thd->lex->current_select)
    thd->lex->current_select->no_error= 0;        // Give error
unknown's avatar
unknown committed
1563
  table->file->print_error(error,MYF(0));
1564
  
unknown's avatar
unknown committed
1565
before_trg_err:
1566
  table->file->restore_auto_increment(prev_insert_id);
unknown's avatar
unknown committed
1567 1568
  if (key)
    my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
1569
  table->column_bitmaps_set(save_read_set, save_write_set);
1570
  DBUG_RETURN(1);
unknown's avatar
unknown committed
1571 1572 1573 1574
}


/******************************************************************************
unknown's avatar
unknown committed
1575
  Check that all fields with arn't null_fields are used
unknown's avatar
unknown committed
1576 1577
******************************************************************************/

1578 1579
int check_that_all_fields_are_given_values(THD *thd, TABLE *entry,
                                           TABLE_LIST *table_list)
unknown's avatar
unknown committed
1580
{
1581
  int err= 0;
1582 1583
  MY_BITMAP *write_set= entry->write_set;

unknown's avatar
unknown committed
1584 1585
  for (Field **field=entry->field ; *field ; field++)
  {
1586
    if (!bitmap_is_set(write_set, (*field)->field_index) &&
1587
        ((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
1588
        ((*field)->real_type() != MYSQL_TYPE_ENUM))
unknown's avatar
unknown committed
1589
    {
1590 1591 1592
      bool view= FALSE;
      if (table_list)
      {
unknown's avatar
unknown committed
1593
        table_list= table_list->top_table();
unknown's avatar
unknown committed
1594
        view= test(table_list->view);
1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610
      }
      if (view)
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_VIEW_FIELD,
                            ER(ER_NO_DEFAULT_FOR_VIEW_FIELD),
                            table_list->view_db.str,
                            table_list->view_name.str);
      }
      else
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_FIELD,
                            ER(ER_NO_DEFAULT_FOR_FIELD),
                            (*field)->field_name);
      }
1611
      err= 1;
unknown's avatar
unknown committed
1612 1613
    }
  }
1614
  return thd->abort_on_warning ? err : 0;
unknown's avatar
unknown committed
1615 1616 1617
}

/*****************************************************************************
unknown's avatar
unknown committed
1618 1619
  Handling of delayed inserts
  A thread is created for each table that one uses with the DELAYED attribute.
unknown's avatar
unknown committed
1620 1621
*****************************************************************************/

1622 1623
#ifndef EMBEDDED_LIBRARY

unknown's avatar
unknown committed
1624 1625
class delayed_row :public ilink {
public:
1626
  char *record;
unknown's avatar
unknown committed
1627 1628
  enum_duplicates dup;
  time_t start_time;
1629 1630 1631
  bool query_start_used, ignore, log_query;
  bool stmt_depends_on_first_successful_insert_id_in_prev_stmt;
  ulonglong first_successful_insert_id_in_prev_stmt;
1632
  ulonglong forced_insert_id;
1633 1634
  ulong auto_increment_increment;
  ulong auto_increment_offset;
1635
  timestamp_auto_set_type timestamp_field_type;
1636
  LEX_STRING query;
unknown's avatar
unknown committed
1637

1638 1639 1640
  delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
              bool ignore_arg, bool log_query_arg)
    : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
1641
      forced_insert_id(0), query(query_arg)
1642
    {}
unknown's avatar
unknown committed
1643 1644
  ~delayed_row()
  {
1645
    x_free(query.str);
unknown's avatar
unknown committed
1646 1647 1648 1649
    x_free(record);
  }
};

1650
/**
1651
  Delayed_insert - context of a thread responsible for delayed insert
1652 1653 1654 1655
  into one table. When processing delayed inserts, we create an own
  thread for every distinct table. Later on all delayed inserts directed
  into that table are handled by a dedicated thread.
*/
unknown's avatar
unknown committed
1656

1657
class Delayed_insert :public ilink {
unknown's avatar
unknown committed
1658 1659 1660 1661 1662 1663 1664 1665 1666 1667
  uint locks_in_memory;
public:
  THD thd;
  TABLE *table;
  pthread_mutex_t mutex;
  pthread_cond_t cond,cond_client;
  volatile uint tables_in_use,stacked_inserts;
  volatile bool status,dead;
  COPY_INFO info;
  I_List<delayed_row> rows;
1668
  ulong group_count;
unknown's avatar
unknown committed
1669
  TABLE_LIST table_list;			// Argument
unknown's avatar
unknown committed
1670

1671
  Delayed_insert()
1672
    :locks_in_memory(0),
unknown's avatar
unknown committed
1673 1674 1675
     table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
     group_count(0)
  {
1676 1677
    thd.security_ctx->user=thd.security_ctx->priv_user=(char*) delayed_user;
    thd.security_ctx->host=(char*) my_localhost;
unknown's avatar
unknown committed
1678 1679 1680
    thd.current_tablenr=0;
    thd.version=refresh_version;
    thd.command=COM_DELAYED_INSERT;
1681 1682
    thd.lex->current_select= 0; 		// for my_message_sql
    thd.lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()
1683 1684 1685 1686
    /*
      Statement-based replication of INSERT DELAYED has problems with RAND()
      and user vars, so in mixed mode we go to row-based.
    */
1687
    thd.lex->set_stmt_unsafe();
1688
    thd.set_current_stmt_binlog_row_based_if_mixed();
unknown's avatar
unknown committed
1689

1690 1691
    bzero((char*) &thd.net, sizeof(thd.net));		// Safety
    bzero((char*) &table_list, sizeof(table_list));	// Safety
1692
    thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
1693
    thd.security_ctx->host_or_ip= "";
unknown's avatar
unknown committed
1694
    bzero((char*) &info,sizeof(info));
1695
    pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
unknown's avatar
unknown committed
1696 1697 1698 1699 1700 1701
    pthread_cond_init(&cond,NULL);
    pthread_cond_init(&cond_client,NULL);
    VOID(pthread_mutex_lock(&LOCK_thread_count));
    delayed_insert_threads++;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
  }
1702
  ~Delayed_insert()
unknown's avatar
unknown committed
1703
  {
1704
    /* The following is not really needed, but just for safety */
unknown's avatar
unknown committed
1705 1706 1707 1708 1709 1710
    delayed_row *row;
    while ((row=rows.get()))
      delete row;
    if (table)
      close_thread_tables(&thd);
    VOID(pthread_mutex_lock(&LOCK_thread_count));
unknown's avatar
unknown committed
1711 1712 1713
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    pthread_cond_destroy(&cond_client);
unknown's avatar
unknown committed
1714 1715
    thd.unlink();				// Must be unlinked under lock
    x_free(thd.query);
1716
    thd.security_ctx->user= thd.security_ctx->host=0;
unknown's avatar
unknown committed
1717 1718 1719
    thread_count--;
    delayed_insert_threads--;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
1720
    VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
unknown's avatar
unknown committed
1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749
  }

  /* The following is for checking when we can delete ourselves */
  inline void lock()
  {
    locks_in_memory++;				// Assume LOCK_delay_insert
  }
  void unlock()
  {
    pthread_mutex_lock(&LOCK_delayed_insert);
    if (!--locks_in_memory)
    {
      pthread_mutex_lock(&mutex);
      if (thd.killed && ! stacked_inserts && ! tables_in_use)
      {
	pthread_cond_signal(&cond);
	status=1;
      }
      pthread_mutex_unlock(&mutex);
    }
    pthread_mutex_unlock(&LOCK_delayed_insert);
  }
  inline uint lock_count() { return locks_in_memory; }

  TABLE* get_local_table(THD* client_thd);
  bool handle_inserts(void);
};


1750
I_List<Delayed_insert> delayed_threads;
unknown's avatar
unknown committed
1751 1752


1753 1754 1755 1756
/**
  Return an instance of delayed insert thread that can handle
  inserts into a given table, if it exists. Otherwise return NULL.
*/
unknown's avatar
unknown committed
1757

1758
static
1759
Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
unknown's avatar
unknown committed
1760 1761 1762
{
  thd->proc_info="waiting for delay_list";
  pthread_mutex_lock(&LOCK_delayed_insert);	// Protect master list
1763 1764
  I_List_iterator<Delayed_insert> it(delayed_threads);
  Delayed_insert *tmp;
unknown's avatar
unknown committed
1765 1766
  while ((tmp=it++))
  {
unknown's avatar
unknown committed
1767 1768
    if (!strcmp(tmp->thd.db, table_list->db) &&
	!strcmp(table_list->table_name, tmp->table->s->table_name.str))
unknown's avatar
unknown committed
1769 1770 1771 1772 1773 1774 1775 1776 1777 1778
    {
      tmp->lock();
      break;
    }
  }
  pthread_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
  return tmp;
}


1779 1780 1781 1782
/**
  Attempt to find or create a delayed insert thread to handle inserts
  into this table.

unknown's avatar
unknown committed
1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801
  @return In case of success, table_list->table points to a local copy
          of the delayed table or is set to NULL, which indicates a
          request for lock upgrade. In case of failure, value of
          table_list->table is undefined.
  @retval TRUE  - this thread ran out of resources OR
                - a newly created delayed insert thread ran out of
                  resources OR
                - the created thread failed to open and lock the table
                  (e.g. because it does not exist) OR
                - the table opened in the created thread turned out to
                  be a view
  @retval FALSE - table successfully opened OR
                - too many delayed insert threads OR
                - the table has triggers and we have to fall back to
                  a normal INSERT
                Two latter cases indicate a request for lock upgrade.

  XXX: why do we regard INSERT DELAYED into a view as an error and
  do not simply a lock upgrade?
1802 1803
*/

unknown's avatar
unknown committed
1804 1805
static
bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
unknown's avatar
unknown committed
1806 1807
{
  int error;
1808
  Delayed_insert *tmp;
unknown's avatar
unknown committed
1809 1810
  DBUG_ENTER("delayed_get_table");

unknown's avatar
unknown committed
1811 1812
  /* Must be set in the parser */
  DBUG_ASSERT(table_list->db);
unknown's avatar
unknown committed
1813

1814
  /* Find the thread which handles this table. */
unknown's avatar
unknown committed
1815 1816
  if (!(tmp=find_handler(thd,table_list)))
  {
1817 1818 1819 1820
    /*
      No match. Create a new thread to handle the table, but
      no more than max_insert_delayed_threads.
    */
1821
    if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
1822
      DBUG_RETURN(0);
unknown's avatar
unknown committed
1823 1824
    thd->proc_info="Creating delayed handler";
    pthread_mutex_lock(&LOCK_delayed_create);
1825 1826 1827 1828 1829
    /*
      The first search above was done without LOCK_delayed_create.
      Another thread might have created the handler in between. Search again.
    */
    if (! (tmp= find_handler(thd, table_list)))
unknown's avatar
unknown committed
1830
    {
1831
      if (!(tmp=new Delayed_insert()))
unknown's avatar
unknown committed
1832
      {
1833
	my_error(ER_OUTOFMEMORY,MYF(0),sizeof(Delayed_insert));
unknown's avatar
unknown committed
1834 1835
        thd->fatal_error();
        goto end_create;
unknown's avatar
unknown committed
1836
      }
unknown's avatar
unknown committed
1837 1838 1839
      pthread_mutex_lock(&LOCK_thread_count);
      thread_count++;
      pthread_mutex_unlock(&LOCK_thread_count);
unknown's avatar
unknown committed
1840 1841 1842
      tmp->thd.set_db(table_list->db, strlen(table_list->db));
      tmp->thd.query= my_strdup(table_list->table_name,MYF(MY_WME));
      if (tmp->thd.db == NULL || tmp->thd.query == NULL)
unknown's avatar
unknown committed
1843
      {
unknown's avatar
unknown committed
1844
        /* The error is reported */
unknown's avatar
unknown committed
1845
	delete tmp;
unknown's avatar
unknown committed
1846 1847
        thd->fatal_error();
        goto end_create;
unknown's avatar
unknown committed
1848
      }
unknown's avatar
unknown committed
1849
      tmp->table_list= *table_list;			// Needed to open table
1850
      tmp->table_list.alias= tmp->table_list.table_name= tmp->thd.query;
unknown's avatar
unknown committed
1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861
      tmp->lock();
      pthread_mutex_lock(&tmp->mutex);
      if ((error=pthread_create(&tmp->thd.real_id,&connection_attrib,
				handle_delayed_insert,(void*) tmp)))
      {
	DBUG_PRINT("error",
		   ("Can't create thread to handle delayed insert (error %d)",
		    error));
	pthread_mutex_unlock(&tmp->mutex);
	tmp->unlock();
	delete tmp;
unknown's avatar
unknown committed
1862
	my_error(ER_CANT_CREATE_THREAD, MYF(0), error);
unknown's avatar
unknown committed
1863 1864
        thd->fatal_error();
        goto end_create;
unknown's avatar
unknown committed
1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876
      }

      /* Wait until table is open */
      thd->proc_info="waiting for handler open";
      while (!tmp->thd.killed && !tmp->table && !thd->killed)
      {
	pthread_cond_wait(&tmp->cond_client,&tmp->mutex);
      }
      pthread_mutex_unlock(&tmp->mutex);
      thd->proc_info="got old table";
      if (tmp->thd.killed)
      {
1877 1878
        if (tmp->thd.net.report_error)
        {
unknown's avatar
unknown committed
1879 1880 1881 1882 1883 1884 1885 1886
          /*
            Copy the error message. Note that we don't treat fatal
            errors in the delayed thread as fatal errors in the
            main thread. Use of my_message will enable stored
            procedures continue handlers.
          */
          my_message(tmp->thd.net.last_errno, tmp->thd.net.last_error,
                     MYF(0));
unknown's avatar
unknown committed
1887 1888
	}
	tmp->unlock();
unknown's avatar
unknown committed
1889
        goto end_create;
unknown's avatar
unknown committed
1890 1891 1892 1893
      }
      if (thd->killed)
      {
	tmp->unlock();
1894
	goto end_create;
unknown's avatar
unknown committed
1895 1896 1897 1898 1899 1900
      }
    }
    pthread_mutex_unlock(&LOCK_delayed_create);
  }

  pthread_mutex_lock(&tmp->mutex);
unknown's avatar
unknown committed
1901
  table_list->table= tmp->get_local_table(thd);
unknown's avatar
unknown committed
1902
  pthread_mutex_unlock(&tmp->mutex);
unknown's avatar
unknown committed
1903 1904
  if (table_list->table)
  {
unknown's avatar
unknown committed
1905
    DBUG_ASSERT(thd->net.report_error == 0);
unknown's avatar
unknown committed
1906
    thd->di=tmp;
unknown's avatar
unknown committed
1907
  }
1908 1909
  /* Unlock the delayed insert object after its last access. */
  tmp->unlock();
1910
  DBUG_RETURN((table_list->table == NULL));
1911

unknown's avatar
unknown committed
1912
end_create:
1913
  pthread_mutex_unlock(&LOCK_delayed_create);
unknown's avatar
unknown committed
1914
  DBUG_RETURN(thd->net.report_error);
unknown's avatar
unknown committed
1915 1916 1917
}


1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928
/**
  As we can't let many client threads modify the same TABLE
  structure of the dedicated delayed insert thread, we create an
  own structure for each client thread. This includes a row
  buffer to save the column values and new fields that point to
  the new row buffer. The memory is allocated in the client
  thread and is freed automatically.

  @pre This function is called from the client thread.  Delayed
       insert thread mutex must be acquired before invoking this
       function.
unknown's avatar
unknown committed
1929 1930 1931

  @return Not-NULL table object on success. NULL in case of an error,
                    which is set in client_thd.
unknown's avatar
unknown committed
1932 1933
*/

1934
TABLE *Delayed_insert::get_local_table(THD* client_thd)
unknown's avatar
unknown committed
1935 1936
{
  my_ptrdiff_t adjust_ptrs;
1937
  Field **field,**org_field, *found_next_number_field;
unknown's avatar
unknown committed
1938
  TABLE *copy;
unknown's avatar
unknown committed
1939
  TABLE_SHARE *share= table->s;
1940
  uchar *bitmap;
1941
  DBUG_ENTER("Delayed_insert::get_local_table");
unknown's avatar
unknown committed
1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958

  /* First request insert thread to get a lock */
  status=1;
  tables_in_use++;
  if (!thd.lock)				// Table is not locked
  {
    client_thd->proc_info="waiting for handler lock";
    pthread_cond_signal(&cond);			// Tell handler to lock table
    while (!dead && !thd.lock && ! client_thd->killed)
    {
      pthread_cond_wait(&cond_client,&mutex);
    }
    client_thd->proc_info="got handler lock";
    if (client_thd->killed)
      goto error;
    if (dead)
    {
unknown's avatar
unknown committed
1959
      my_message(thd.net.last_errno, thd.net.last_error, MYF(0));
unknown's avatar
unknown committed
1960 1961 1962 1963
      goto error;
    }
  }

1964 1965 1966 1967 1968 1969 1970
  /*
    Allocate memory for the TABLE object, the field pointers array, and
    one record buffer of reclength size. Normally a table has three
    record buffers of rec_buff_length size, which includes alignment
    bytes. Since the table copy is used for creating one record only,
    the other record buffers and alignment are unnecessary.
  */
unknown's avatar
unknown committed
1971
  client_thd->proc_info="allocating local table";
unknown's avatar
unknown committed
1972
  copy= (TABLE*) client_thd->alloc(sizeof(*copy)+
unknown's avatar
unknown committed
1973
				   (share->fields+1)*sizeof(Field**)+
1974 1975
				   share->reclength +
                                   share->column_bitmap_size*2);
unknown's avatar
unknown committed
1976 1977 1978
  if (!copy)
    goto error;

1979
  /* Copy the TABLE object. */
unknown's avatar
unknown committed
1980
  *copy= *table;
unknown's avatar
unknown committed
1981
  /* We don't need to change the file handler here */
1982 1983
  /* Assign the pointers for the field pointers array and the record. */
  field= copy->field= (Field**) (copy + 1);
1984
  bitmap= (uchar*) (field + share->fields + 1);
1985 1986
  copy->record[0]= (bitmap + share->column_bitmap_size * 2);
  memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength);
1987 1988 1989 1990 1991 1992 1993 1994 1995 1996
  /*
    Make a copy of all fields.
    The copied fields need to point into the copied record. This is done
    by copying the field objects with their old pointer values and then
    "move" the pointers by the distance between the original and copied
    records. That way we preserve the relative positions in the records.
  */
  adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
  found_next_number_field= table->found_next_number_field;
  for (org_field= table->field; *org_field; org_field++, field++)
unknown's avatar
unknown committed
1997
  {
1998
    if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1)))
unknown's avatar
unknown committed
1999
      goto error;
2000
    (*field)->orig_table= copy;			// Remove connection
unknown's avatar
unknown committed
2001
    (*field)->move_field_offset(adjust_ptrs);	// Point at copy->record[0]
2002 2003
    if (*org_field == found_next_number_field)
      (*field)->table->found_next_number_field= *field;
unknown's avatar
unknown committed
2004 2005 2006 2007 2008 2009 2010 2011
  }
  *field=0;

  /* Adjust timestamp */
  if (table->timestamp_field)
  {
    /* Restore offset as this may have been reset in handle_inserts */
    copy->timestamp_field=
unknown's avatar
unknown committed
2012
      (Field_timestamp*) copy->field[share->timestamp_field_offset];
2013
    copy->timestamp_field->unireg_check= table->timestamp_field->unireg_check;
2014
    copy->timestamp_field_type= copy->timestamp_field->get_auto_set_type();
unknown's avatar
unknown committed
2015 2016
  }

2017 2018
  /* Adjust in_use for pointing to client thread */
  copy->in_use= client_thd;
2019 2020 2021 2022

  /* Adjust lock_count. This table object is not part of a lock. */
  copy->lock_count= 0;

2023 2024 2025 2026 2027 2028 2029 2030 2031
  /* Adjust bitmaps */
  copy->def_read_set.bitmap= (my_bitmap_map*) bitmap;
  copy->def_write_set.bitmap= ((my_bitmap_map*)
                               (bitmap + share->column_bitmap_size));
  copy->tmp_set.bitmap= 0;                      // To catch errors
  bzero((char*) bitmap, share->column_bitmap_size*2);
  copy->read_set=  &copy->def_read_set;
  copy->write_set= &copy->def_write_set;

2032
  DBUG_RETURN(copy);
unknown's avatar
unknown committed
2033 2034 2035 2036 2037 2038

  /* Got fatal error */
 error:
  tables_in_use--;
  status=1;
  pthread_cond_signal(&cond);			// Inform thread about abort
2039
  DBUG_RETURN(0);
unknown's avatar
unknown committed
2040 2041 2042 2043 2044
}


/* Put a question in queue */

unknown's avatar
unknown committed
2045
static
unknown's avatar
unknown committed
2046 2047
int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
                  LEX_STRING query, bool ignore, bool log_on)
unknown's avatar
unknown committed
2048
{
2049
  delayed_row *row= 0;
2050
  Delayed_insert *di=thd->di;
2051
  const Discrete_interval *forced_auto_inc;
unknown's avatar
unknown committed
2052
  DBUG_ENTER("write_delayed");
2053 2054
  DBUG_PRINT("enter", ("query = '%s' length %lu", query.str,
                       (ulong) query.length));
unknown's avatar
unknown committed
2055 2056 2057 2058 2059 2060 2061

  thd->proc_info="waiting for handler insert";
  pthread_mutex_lock(&di->mutex);
  while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
    pthread_cond_wait(&di->cond_client,&di->mutex);
  thd->proc_info="storing row into queue";

2062
  if (thd->killed)
unknown's avatar
unknown committed
2063 2064
    goto err;

2065 2066 2067 2068 2069 2070 2071
  /*
    Take a copy of the query string, if there is any. The string will
    be free'ed when the row is destroyed. If there is no query string,
    we don't do anything special.
   */

  if (query.str)
2072 2073 2074
  {
    char *str;
    if (!(str= my_strndup(query.str, query.length, MYF(MY_WME))))
2075
      goto err;
2076 2077
    query.str= str;
  }
2078 2079 2080 2081
  row= new delayed_row(query, duplic, ignore, log_on);
  if (row == NULL)
  {
    my_free(query.str, MYF(MY_WME));
unknown's avatar
unknown committed
2082
    goto err;
2083
  }
unknown's avatar
unknown committed
2084

2085
  if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME))))
unknown's avatar
unknown committed
2086
    goto err;
2087
  memcpy(row->record, table->record[0], table->s->reclength);
unknown's avatar
unknown committed
2088 2089
  row->start_time=		thd->start_time;
  row->query_start_used=	thd->query_start_used;
2090 2091 2092 2093 2094 2095 2096 2097 2098 2099
  /*
    those are for the binlog: LAST_INSERT_ID() has been evaluated at this
    time, so record does not need it, but statement-based binlogging of the
    INSERT will need when the row is actually inserted.
    As for SET INSERT_ID, DELAYED does not honour it (BUG#20830).
  */
  row->stmt_depends_on_first_successful_insert_id_in_prev_stmt=
    thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
  row->first_successful_insert_id_in_prev_stmt=
    thd->first_successful_insert_id_in_prev_stmt;
2100
  row->timestamp_field_type=    table->timestamp_field_type;
unknown's avatar
unknown committed
2101

2102
  /* Copy session variables. */
2103 2104
  row->auto_increment_increment= thd->variables.auto_increment_increment;
  row->auto_increment_offset=    thd->variables.auto_increment_offset;
2105 2106 2107 2108 2109 2110 2111
  /* Copy the next forced auto increment value, if any. */
  if ((forced_auto_inc= thd->auto_inc_intervals_forced.get_next()))
  {
    row->forced_insert_id= forced_auto_inc->minimum();
    DBUG_PRINT("delayed", ("transmitting auto_inc: %lu",
                           (ulong) row->forced_insert_id));
  }
2112

unknown's avatar
unknown committed
2113 2114 2115
  di->rows.push_back(row);
  di->stacked_inserts++;
  di->status=1;
2116
  if (table->s->blob_fields)
unknown's avatar
unknown committed
2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129
    unlink_blobs(table);
  pthread_cond_signal(&di->cond);

  thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
  pthread_mutex_unlock(&di->mutex);
  DBUG_RETURN(0);

 err:
  delete row;
  pthread_mutex_unlock(&di->mutex);
  DBUG_RETURN(1);
}

unknown's avatar
unknown committed
2130 2131 2132 2133
/**
  Signal the delayed insert thread that this user connection
  is finished using it for this statement.
*/
unknown's avatar
unknown committed
2134 2135 2136

static void end_delayed_insert(THD *thd)
{
2137
  DBUG_ENTER("end_delayed_insert");
2138
  Delayed_insert *di=thd->di;
unknown's avatar
unknown committed
2139
  pthread_mutex_lock(&di->mutex);
2140
  DBUG_PRINT("info",("tables in use: %d",di->tables_in_use));
unknown's avatar
unknown committed
2141 2142 2143 2144 2145 2146
  if (!--di->tables_in_use || di->thd.killed)
  {						// Unlock table
    di->status=1;
    pthread_cond_signal(&di->cond);
  }
  pthread_mutex_unlock(&di->mutex);
2147
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
2148 2149 2150 2151 2152 2153 2154 2155 2156
}


/* We kill all delayed threads when doing flush-tables */

void kill_delayed_threads(void)
{
  VOID(pthread_mutex_lock(&LOCK_delayed_insert)); // For unlink from list

2157 2158
  I_List_iterator<Delayed_insert> it(delayed_threads);
  Delayed_insert *tmp;
unknown's avatar
unknown committed
2159 2160
  while ((tmp=it++))
  {
unknown's avatar
SCRUM  
unknown committed
2161
    tmp->thd.killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
2162 2163 2164
    if (tmp->thd.mysys_var)
    {
      pthread_mutex_lock(&tmp->thd.mysys_var->mutex);
unknown's avatar
unknown committed
2165
      if (tmp->thd.mysys_var->current_cond)
unknown's avatar
unknown committed
2166
      {
unknown's avatar
unknown committed
2167 2168 2169 2170 2171 2172
	/*
	  We need the following test because the main mutex may be locked
	  in handle_delayed_insert()
	*/
	if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
	  pthread_mutex_lock(tmp->thd.mysys_var->current_mutex);
unknown's avatar
unknown committed
2173
	pthread_cond_broadcast(tmp->thd.mysys_var->current_cond);
unknown's avatar
unknown committed
2174 2175
	if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
	  pthread_mutex_unlock(tmp->thd.mysys_var->current_mutex);
unknown's avatar
unknown committed
2176 2177 2178 2179 2180 2181 2182 2183 2184 2185 2186 2187
      }
      pthread_mutex_unlock(&tmp->thd.mysys_var->mutex);
    }
  }
  VOID(pthread_mutex_unlock(&LOCK_delayed_insert)); // For unlink from list
}


/*
 * Create a new delayed insert thread
*/

2188
pthread_handler_t handle_delayed_insert(void *arg)
unknown's avatar
unknown committed
2189
{
2190
  Delayed_insert *di=(Delayed_insert*) arg;
unknown's avatar
unknown committed
2191 2192 2193 2194 2195
  THD *thd= &di->thd;

  pthread_detach_this_thread();
  /* Add thread to THD list so that's it's visible in 'show processlist' */
  pthread_mutex_lock(&LOCK_thread_count);
unknown's avatar
unknown committed
2196
  thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
2197
  thd->end_time();
unknown's avatar
unknown committed
2198
  threads.append(thd);
unknown's avatar
SCRUM  
unknown committed
2199
  thd->killed=abort_loop ? THD::KILL_CONNECTION : THD::NOT_KILLED;
unknown's avatar
unknown committed
2200 2201
  pthread_mutex_unlock(&LOCK_thread_count);

2202 2203 2204 2205 2206 2207 2208 2209
  /*
    Wait until the client runs into pthread_cond_wait(),
    where we free it after the table is opened and di linked in the list.
    If we did not wait here, the client might detect the opened table
    before it is linked to the list. It would release LOCK_delayed_create
    and allow another thread to create another handler for the same table,
    since it does not find one in the list.
  */
unknown's avatar
unknown committed
2210
  pthread_mutex_lock(&di->mutex);
2211
#if !defined( __WIN__) /* Win32 calls this in pthread_create */
unknown's avatar
unknown committed
2212 2213 2214 2215 2216 2217 2218 2219
  if (my_thread_init())
  {
    strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES));
    goto end;
  }
#endif

  DBUG_ENTER("handle_delayed_insert");
2220
  thd->thread_stack= (char*) &thd;
unknown's avatar
unknown committed
2221
  if (init_thr_lock() || thd->store_globals())
unknown's avatar
unknown committed
2222
  {
2223
    thd->fatal_error();
unknown's avatar
unknown committed
2224
    strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES));
2225
    goto err;
unknown's avatar
unknown committed
2226 2227 2228
  }

  /* open table */
unknown's avatar
unknown committed
2229
  if (!(di->table=open_ltable(thd,&di->table_list,TL_WRITE_DELAYED)))
unknown's avatar
unknown committed
2230
  {
2231
    thd->fatal_error();				// Abort waiting inserts
2232
    goto err;
unknown's avatar
unknown committed
2233
  }
2234
  if (!(di->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
unknown's avatar
unknown committed
2235
  {
2236
    thd->fatal_error();
2237
    my_error(ER_ILLEGAL_HA, MYF(0), di->table_list.table_name);
2238
    goto err;
unknown's avatar
unknown committed
2239
  }
unknown's avatar
unknown committed
2240 2241 2242 2243 2244 2245 2246
  if (di->table->triggers)
  {
    /*
      Table has triggers. This is not an error, but we do
      not support triggers with delayed insert. Terminate the delayed
      thread without an error and thus request lock upgrade.
    */
2247
    goto err;
unknown's avatar
unknown committed
2248
  }
unknown's avatar
unknown committed
2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263
  di->table->copy_blobs=1;

  /* One can now use this */
  pthread_mutex_lock(&LOCK_delayed_insert);
  delayed_threads.append(di);
  pthread_mutex_unlock(&LOCK_delayed_insert);

  /* Tell client that the thread is initialized */
  pthread_cond_signal(&di->cond_client);

  /* Now wait until we get an insert or lock to handle */
  /* We will not abort as long as a client thread uses this thread */

  for (;;)
  {
unknown's avatar
SCRUM  
unknown committed
2264
    if (thd->killed == THD::KILL_CONNECTION)
unknown's avatar
unknown committed
2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283
    {
      uint lock_count;
      /*
	Remove this from delay insert list so that no one can request a
	table from this
      */
      pthread_mutex_unlock(&di->mutex);
      pthread_mutex_lock(&LOCK_delayed_insert);
      di->unlink();
      lock_count=di->lock_count();
      pthread_mutex_unlock(&LOCK_delayed_insert);
      pthread_mutex_lock(&di->mutex);
      if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
	break;					// Time to die
    }

    if (!di->status && !di->stacked_inserts)
    {
      struct timespec abstime;
2284
      set_timespec(abstime, delayed_insert_timeout);
unknown's avatar
unknown committed
2285 2286 2287 2288

      /* Information for pthread_kill */
      di->thd.mysys_var->current_mutex= &di->mutex;
      di->thd.mysys_var->current_cond= &di->cond;
2289
      di->thd.proc_info="Waiting for INSERT";
unknown's avatar
unknown committed
2290

2291
      DBUG_PRINT("info",("Waiting for someone to insert rows"));
unknown's avatar
unknown committed
2292
      while (!thd->killed)
unknown's avatar
unknown committed
2293 2294
      {
	int error;
2295
#if defined(HAVE_BROKEN_COND_TIMEDWAIT)
unknown's avatar
unknown committed
2296 2297 2298 2299
	error=pthread_cond_wait(&di->cond,&di->mutex);
#else
	error=pthread_cond_timedwait(&di->cond,&di->mutex,&abstime);
#ifdef EXTRA_DEBUG
2300
	if (error && error != EINTR && error != ETIMEDOUT)
unknown's avatar
unknown committed
2301 2302 2303 2304 2305 2306 2307 2308 2309
	{
	  fprintf(stderr, "Got error %d from pthread_cond_timedwait\n",error);
	  DBUG_PRINT("error",("Got error %d from pthread_cond_timedwait",
			      error));
	}
#endif
#endif
	if (thd->killed || di->status)
	  break;
unknown's avatar
unknown committed
2310
	if (error == ETIMEDOUT || error == ETIME)
unknown's avatar
unknown committed
2311
	{
unknown's avatar
SCRUM  
unknown committed
2312
	  thd->killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
2313 2314 2315
	  break;
	}
      }
unknown's avatar
unknown committed
2316 2317
      /* We can't lock di->mutex and mysys_var->mutex at the same time */
      pthread_mutex_unlock(&di->mutex);
unknown's avatar
unknown committed
2318 2319 2320 2321
      pthread_mutex_lock(&di->thd.mysys_var->mutex);
      di->thd.mysys_var->current_mutex= 0;
      di->thd.mysys_var->current_cond= 0;
      pthread_mutex_unlock(&di->thd.mysys_var->mutex);
unknown's avatar
unknown committed
2322
      pthread_mutex_lock(&di->mutex);
unknown's avatar
unknown committed
2323
    }
2324
    di->thd.proc_info=0;
unknown's avatar
unknown committed
2325 2326 2327

    if (di->tables_in_use && ! thd->lock)
    {
2328
      bool not_used;
2329 2330 2331 2332 2333 2334 2335 2336 2337 2338
      /*
        Request for new delayed insert.
        Lock the table, but avoid to be blocked by a global read lock.
        If we got here while a global read lock exists, then one or more
        inserts started before the lock was requested. These are allowed
        to complete their work before the server returns control to the
        client which requested the global read lock. The delayed insert
        handler will close the table and finish when the outstanding
        inserts are done.
      */
2339
      if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1,
2340 2341
                                          MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK,
                                          &not_used)))
unknown's avatar
unknown committed
2342
      {
2343 2344 2345
	/* Fatal error */
	di->dead= 1;
	thd->killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
2346 2347 2348 2349 2350 2351 2352
      }
      pthread_cond_broadcast(&di->cond_client);
    }
    if (di->stacked_inserts)
    {
      if (di->handle_inserts())
      {
2353 2354 2355
	/* Some fatal error */
	di->dead= 1;
	thd->killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
2356 2357 2358 2359 2360
      }
    }
    di->status=0;
    if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
    {
unknown's avatar
unknown committed
2361 2362 2363 2364
      /*
        No one is doing a insert delayed
        Unlock table so that other threads can use it
      */
unknown's avatar
unknown committed
2365 2366 2367
      MYSQL_LOCK *lock=thd->lock;
      thd->lock=0;
      pthread_mutex_unlock(&di->mutex);
2368 2369 2370 2371
      /*
        We need to release next_insert_id before unlocking. This is
        enforced by handler::ha_external_lock().
      */
2372
      di->table->file->ha_release_auto_increment();
unknown's avatar
unknown committed
2373 2374 2375 2376 2377 2378 2379 2380
      mysql_unlock_tables(thd, lock);
      di->group_count=0;
      pthread_mutex_lock(&di->mutex);
    }
    if (di->tables_in_use)
      pthread_cond_broadcast(&di->cond_client); // If waiting clients
  }

2381 2382 2383 2384 2385 2386 2387
err:
  /*
    mysql_lock_tables() can potentially start a transaction and write
    a table map. In the event of an error, that transaction has to be
    rolled back.  We only need to roll back a potential statement
    transaction, since real transactions are rolled back in
    close_thread_tables().
2388 2389 2390 2391

    TODO: This is not true any more, table maps are generated on the
    first call to ha_*_row() instead. Remove code that are used to
    cover for the case outlined above.
2392 2393 2394
   */
  ha_rollback_stmt(thd);

2395
#ifndef __WIN__
unknown's avatar
unknown committed
2396
end:
2397
#endif
unknown's avatar
unknown committed
2398 2399 2400 2401 2402 2403 2404
  /*
    di should be unlinked from the thread handler list and have no active
    clients
  */

  close_thread_tables(thd);			// Free the table
  di->table=0;
2405 2406
  di->dead= 1;                                  // If error
  thd->killed= THD::KILL_CONNECTION;	        // If error
unknown's avatar
unknown committed
2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440
  pthread_cond_broadcast(&di->cond_client);	// Safety
  pthread_mutex_unlock(&di->mutex);

  pthread_mutex_lock(&LOCK_delayed_create);	// Because of delayed_get_table
  pthread_mutex_lock(&LOCK_delayed_insert);	
  delete di;
  pthread_mutex_unlock(&LOCK_delayed_insert);
  pthread_mutex_unlock(&LOCK_delayed_create);  

  my_thread_end();
  pthread_exit(0);
  DBUG_RETURN(0);
}


/* Remove pointers from temporary fields to allocated values */

static void unlink_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
      ((Field_blob *) (*ptr))->clear_temporary();
  }
}

/* Free blobs stored in current row */

static void free_delayed_insert_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
    {
2441
      uchar *str;
unknown's avatar
unknown committed
2442 2443 2444 2445 2446 2447 2448 2449
      ((Field_blob *) (*ptr))->get_ptr(&str);
      my_free(str,MYF(MY_ALLOW_ZERO_PTR));
      ((Field_blob *) (*ptr))->reset();
    }
  }
}


2450
bool Delayed_insert::handle_inserts(void)
unknown's avatar
unknown committed
2451 2452
{
  int error;
2453
  ulong max_rows;
2454 2455
  bool using_ignore= 0, using_opt_replace= 0,
       using_bin_log= mysql_bin_log.is_open();
2456
  delayed_row *row;
unknown's avatar
unknown committed
2457 2458 2459 2460 2461 2462
  DBUG_ENTER("handle_inserts");

  /* Allow client to insert new rows */
  pthread_mutex_unlock(&mutex);

  table->next_number_field=table->found_next_number_field;
2463
  table->use_all_columns();
unknown's avatar
unknown committed
2464 2465 2466 2467 2468

  thd.proc_info="upgrading lock";
  if (thr_upgrade_write_delay_lock(*thd.lock->locks))
  {
    /* This can only happen if thread is killed by shutdown */
unknown's avatar
unknown committed
2469
    sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),table->s->table_name.str);
unknown's avatar
unknown committed
2470 2471 2472 2473
    goto err;
  }

  thd.proc_info="insert";
2474
  max_rows= delayed_insert_limit;
unknown's avatar
unknown committed
2475
  if (thd.killed || table->needs_reopen_or_name_lock())
unknown's avatar
unknown committed
2476
  {
unknown's avatar
SCRUM  
unknown committed
2477
    thd.killed= THD::KILL_CONNECTION;
2478
    max_rows= ULONG_MAX;                     // Do as much as possible
unknown's avatar
unknown committed
2479 2480
  }

2481 2482 2483 2484 2485 2486 2487
  /*
    We can't use row caching when using the binary log because if
    we get a crash, then binary log will contain rows that are not yet
    written to disk, which will cause problems in replication.
  */
  if (!using_bin_log)
    table->file->extra(HA_EXTRA_WRITE_CACHE);
unknown's avatar
unknown committed
2488
  pthread_mutex_lock(&mutex);
2489

unknown's avatar
unknown committed
2490 2491 2492 2493
  while ((row=rows.get()))
  {
    stacked_inserts--;
    pthread_mutex_unlock(&mutex);
2494
    memcpy(table->record[0],row->record,table->s->reclength);
unknown's avatar
unknown committed
2495 2496 2497

    thd.start_time=row->start_time;
    thd.query_start_used=row->query_start_used;
2498 2499 2500 2501 2502
    /*
      To get the exact auto_inc interval to store in the binlog we must not
      use values from the previous interval (of the previous rows).
    */
    bool log_query= (row->log_query && row->query.str != NULL);
2503 2504 2505
    DBUG_PRINT("delayed", ("query: '%s'  length: %lu", row->query.str ?
                           row->query.str : "[NULL]",
                           (ulong) row->query.length));
2506 2507
    if (log_query)
    {
2508 2509 2510 2511 2512 2513 2514 2515
      /*
        This is the first value of an INSERT statement.
        It is the right place to clear a forced insert_id.
        This is usually done after the last value of an INSERT statement,
        but we won't know this in the insert delayed thread. But before
        the first value is sufficiently equivalent to after the last
        value of the previous statement.
      */
2516 2517 2518
      table->file->ha_release_auto_increment();
      thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty();
    }
2519 2520 2521 2522
    thd.first_successful_insert_id_in_prev_stmt= 
      row->first_successful_insert_id_in_prev_stmt;
    thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt= 
      row->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2523
    table->timestamp_field_type= row->timestamp_field_type;
unknown's avatar
unknown committed
2524

2525
    /* Copy the session variables. */
2526 2527
    thd.variables.auto_increment_increment= row->auto_increment_increment;
    thd.variables.auto_increment_offset=    row->auto_increment_offset;
2528 2529 2530 2531 2532 2533 2534
    /* Copy a forced insert_id, if any. */
    if (row->forced_insert_id)
    {
      DBUG_PRINT("delayed", ("received auto_inc: %lu",
                             (ulong) row->forced_insert_id));
      thd.force_one_auto_inc_interval(row->forced_insert_id);
    }
2535

2536
    info.ignore= row->ignore;
unknown's avatar
unknown committed
2537
    info.handle_duplicates= row->dup;
2538
    if (info.ignore ||
unknown's avatar
unknown committed
2539
	info.handle_duplicates != DUP_ERROR)
unknown's avatar
unknown committed
2540 2541 2542 2543
    {
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
      using_ignore=1;
    }
2544 2545 2546 2547 2548 2549 2550
    if (info.handle_duplicates == DUP_REPLACE &&
        (!table->triggers ||
         !table->triggers->has_delete_triggers()))
    {
      table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
      using_opt_replace= 1;
    }
unknown's avatar
unknown committed
2551 2552
    if (info.handle_duplicates == DUP_UPDATE)
      table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
unknown's avatar
unknown committed
2553
    thd.clear_error(); // reset error for binlog
unknown's avatar
unknown committed
2554
    if (write_record(&thd, table, &info))
unknown's avatar
unknown committed
2555
    {
2556
      info.error_count++;				// Ignore errors
2557
      thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
unknown's avatar
unknown committed
2558
      row->log_query = 0;
unknown's avatar
unknown committed
2559
    }
2560

unknown's avatar
unknown committed
2561 2562 2563 2564 2565
    if (using_ignore)
    {
      using_ignore=0;
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
    }
2566 2567 2568 2569 2570
    if (using_opt_replace)
    {
      using_opt_replace= 0;
      table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
    }
2571

2572
    if (log_query && mysql_bin_log.is_open())
unknown's avatar
unknown committed
2573 2574 2575 2576 2577 2578 2579 2580 2581
    {
      /*
        If the query has several rows to insert, only the first row will come
        here. In row-based binlogging, this means that the first row will be
        written to binlog as one Table_map event and one Rows event (due to an
        event flush done in binlog_query()), then all other rows of this query
        will be binlogged together as one single Table_map event and one
        single Rows event.
      */
2582 2583 2584
      thd.binlog_query(THD::ROW_QUERY_TYPE,
                       row->query.str, row->query.length,
                       FALSE, FALSE);
unknown's avatar
unknown committed
2585
    }
2586

2587
    if (table->s->blob_fields)
unknown's avatar
unknown committed
2588
      free_delayed_insert_blobs(table);
2589
    thread_safe_decrement(delayed_rows_in_use,&LOCK_delayed_status);
unknown's avatar
unknown committed
2590 2591 2592 2593
    thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
    pthread_mutex_lock(&mutex);

    delete row;
2594 2595 2596 2597 2598 2599 2600
    /*
      Let READ clients do something once in a while
      We should however not break in the middle of a multi-line insert
      if we have binary logging enabled as we don't want other commands
      on this table until all entries has been processed
    */
    if (group_count++ >= max_rows && (row= rows.head()) &&
2601
	(!(row->log_query & using_bin_log)))
unknown's avatar
unknown committed
2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614
    {
      group_count=0;
      if (stacked_inserts || tables_in_use)	// Let these wait a while
      {
	if (tables_in_use)
	  pthread_cond_broadcast(&cond_client); // If waiting clients
	thd.proc_info="reschedule";
	pthread_mutex_unlock(&mutex);
	if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
	{
	  /* This should never happen */
	  table->file->print_error(error,MYF(0));
	  sql_print_error("%s",thd.net.last_error);
2615
          DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed in loop"));
unknown's avatar
unknown committed
2616 2617
	  goto err;
	}
unknown's avatar
unknown committed
2618
	query_cache_invalidate3(&thd, table, 1);
unknown's avatar
unknown committed
2619 2620 2621
	if (thr_reschedule_write_lock(*thd.lock->locks))
	{
	  /* This should never happen */
unknown's avatar
unknown committed
2622 2623
	  sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),
                          table->s->table_name.str);
unknown's avatar
unknown committed
2624
	}
2625 2626
	if (!using_bin_log)
	  table->file->extra(HA_EXTRA_WRITE_CACHE);
unknown's avatar
unknown committed
2627 2628 2629 2630 2631 2632 2633 2634 2635
	pthread_mutex_lock(&mutex);
	thd.proc_info="insert";
      }
      if (tables_in_use)
	pthread_cond_broadcast(&cond_client);	// If waiting clients
    }
  }
  thd.proc_info=0;
  pthread_mutex_unlock(&mutex);
2636

2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650
  /*
    We need to flush the pending event when using row-based
    replication since the flushing normally done in binlog_query() is
    not done last in the statement: for delayed inserts, the insert
    statement is logged *before* all rows are inserted.

    We can flush the pending event without checking the thd->lock
    since the delayed insert *thread* is not inside a stored function
    or trigger.

    TODO: Move the logging to last in the sequence of rows.
   */
  if (thd.current_stmt_binlog_row_based)
    thd.binlog_flush_pending_rows_event(TRUE);
2651

unknown's avatar
unknown committed
2652 2653 2654 2655
  if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
  {						// This shouldn't happen
    table->file->print_error(error,MYF(0));
    sql_print_error("%s",thd.net.last_error);
2656
    DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed after loop"));
unknown's avatar
unknown committed
2657 2658
    goto err;
  }
unknown's avatar
unknown committed
2659
  query_cache_invalidate3(&thd, table, 1);
unknown's avatar
unknown committed
2660 2661 2662 2663
  pthread_mutex_lock(&mutex);
  DBUG_RETURN(0);

 err:
2664 2665 2666
#ifndef DBUG_OFF
  max_rows= 0;                                  // For DBUG output
#endif
2667 2668 2669 2670 2671 2672
  /* Remove all not used rows */
  while ((row=rows.get()))
  {
    delete row;
    thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
    stacked_inserts--;
2673 2674 2675
#ifndef DBUG_OFF
    max_rows++;
#endif
2676
  }
2677
  DBUG_PRINT("error", ("dropped %lu rows after an error", max_rows));
unknown's avatar
unknown committed
2678 2679 2680 2681
  thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
  pthread_mutex_lock(&mutex);
  DBUG_RETURN(1);
}
2682
#endif /* EMBEDDED_LIBRARY */
unknown's avatar
unknown committed
2683 2684

/***************************************************************************
unknown's avatar
unknown committed
2685
  Store records in INSERT ... SELECT *
unknown's avatar
unknown committed
2686 2687
***************************************************************************/

unknown's avatar
VIEW  
unknown committed
2688 2689 2690 2691 2692 2693 2694 2695 2696

/*
  make insert specific preparation and checks after opening tables

  SYNOPSIS
    mysql_insert_select_prepare()
    thd         thread handler

  RETURN
unknown's avatar
unknown committed
2697 2698
    FALSE OK
    TRUE  Error
unknown's avatar
VIEW  
unknown committed
2699 2700
*/

unknown's avatar
unknown committed
2701
bool mysql_insert_select_prepare(THD *thd)
unknown's avatar
VIEW  
unknown committed
2702 2703
{
  LEX *lex= thd->lex;
unknown's avatar
unknown committed
2704
  SELECT_LEX *select_lex= &lex->select_lex;
unknown's avatar
unknown committed
2705
  TABLE_LIST *first_select_leaf_table;
unknown's avatar
VIEW  
unknown committed
2706
  DBUG_ENTER("mysql_insert_select_prepare");
unknown's avatar
unknown committed
2707

unknown's avatar
unknown committed
2708 2709
  /*
    SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
unknown's avatar
unknown committed
2710
    clause if table is VIEW
unknown's avatar
unknown committed
2711
  */
unknown's avatar
unknown committed
2712
  
unknown's avatar
unknown committed
2713 2714 2715 2716
  if (mysql_prepare_insert(thd, lex->query_tables,
                           lex->query_tables->table, lex->field_list, 0,
                           lex->update_list, lex->value_list,
                           lex->duplicates,
unknown's avatar
unknown committed
2717
                           &select_lex->where, TRUE, FALSE, FALSE))
unknown's avatar
unknown committed
2718
    DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
2719

2720 2721 2722 2723
  /*
    exclude first table from leaf tables list, because it belong to
    INSERT
  */
unknown's avatar
unknown committed
2724 2725
  DBUG_ASSERT(select_lex->leaf_tables != 0);
  lex->leaf_tables_insert= select_lex->leaf_tables;
2726
  /* skip all leaf tables belonged to view where we are insert */
2727
  for (first_select_leaf_table= select_lex->leaf_tables->next_leaf;
2728 2729 2730 2731
       first_select_leaf_table &&
       first_select_leaf_table->belong_to_view &&
       first_select_leaf_table->belong_to_view ==
       lex->leaf_tables_insert->belong_to_view;
2732
       first_select_leaf_table= first_select_leaf_table->next_leaf)
2733
  {}
unknown's avatar
unknown committed
2734
  select_lex->leaf_tables= first_select_leaf_table;
unknown's avatar
unknown committed
2735
  DBUG_RETURN(FALSE);
unknown's avatar
VIEW  
unknown committed
2736 2737 2738
}


2739
select_insert::select_insert(TABLE_LIST *table_list_par, TABLE *table_par,
unknown's avatar
unknown committed
2740
                             List<Item> *fields_par,
unknown's avatar
unknown committed
2741 2742
                             List<Item> *update_fields,
                             List<Item> *update_values,
unknown's avatar
unknown committed
2743
                             enum_duplicates duplic,
2744 2745
                             bool ignore_check_option_errors)
  :table_list(table_list_par), table(table_par), fields(fields_par),
2746
   autoinc_value_of_last_inserted_row(0),
2747 2748 2749
   insert_into_view(table_list_par && table_list_par->view != 0)
{
  bzero((char*) &info,sizeof(info));
unknown's avatar
unknown committed
2750 2751 2752 2753
  info.handle_duplicates= duplic;
  info.ignore= ignore_check_option_errors;
  info.update_fields= update_fields;
  info.update_values= update_values;
2754
  if (table_list_par)
unknown's avatar
unknown committed
2755
    info.view= (table_list_par->view ? table_list_par : 0);
2756 2757 2758
}


unknown's avatar
unknown committed
2759
int
2760
select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
unknown's avatar
unknown committed
2761
{
2762
  LEX *lex= thd->lex;
2763
  int res;
2764
  table_map map= 0;
2765
  SELECT_LEX *lex_current_select_save= lex->current_select;
unknown's avatar
unknown committed
2766 2767
  DBUG_ENTER("select_insert::prepare");

2768
  unit= u;
2769

2770 2771 2772 2773 2774 2775
  /*
    Since table in which we are going to insert is added to the first
    select, LEX::current_select should point to the first select while
    we are fixing fields from insert list.
  */
  lex->current_select= &lex->select_lex;
2776
  res= check_insert_fields(thd, table_list, *fields, values,
2777
                           !insert_into_view, &map) ||
2778
       setup_fields(thd, 0, values, MARK_COLUMNS_READ, 0, 0);
2779

unknown's avatar
unknown committed
2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791
  if (!res && fields->elements)
  {
    bool saved_abort_on_warning= thd->abort_on_warning;
    thd->abort_on_warning= !info.ignore && (thd->variables.sql_mode &
                                            (MODE_STRICT_TRANS_TABLES |
                                             MODE_STRICT_ALL_TABLES));
    res= check_that_all_fields_are_given_values(thd, table_list->table, 
                                                table_list);
    thd->abort_on_warning= saved_abort_on_warning;
  }

  if (info.handle_duplicates == DUP_UPDATE && !res)
2792
  {
2793
    Name_resolution_context *context= &lex->select_lex.context;
2794 2795 2796 2797
    Name_resolution_context_state ctx_state;

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);
2798 2799

    /* Perform name resolution only in the first table - 'table_list'. */
2800
    table_list->next_local= 0;
2801 2802
    context->resolve_in_table_list_only(table_list);

2803
    lex->select_lex.no_wrap_view_item= TRUE;
2804
    res= res || check_update_fields(thd, context->table_list,
2805
                                    *info.update_fields, &map);
2806 2807
    lex->select_lex.no_wrap_view_item= FALSE;
    /*
2808 2809 2810
      When we are not using GROUP BY and there are no ungrouped aggregate functions 
      we can refer to other tables in the ON DUPLICATE KEY part.
      We use next_name_resolution_table descructively, so check it first (views?)
2811
    */
2812 2813 2814 2815 2816 2817 2818 2819
    DBUG_ASSERT (!table_list->next_name_resolution_table);
    if (lex->select_lex.group_list.elements == 0 &&
        !lex->select_lex.with_sum_func)
      /*
        We must make a single context out of the two separate name resolution contexts :
        the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
        To do that we must concatenate the two lists
      */  
unknown's avatar
unknown committed
2820 2821
      table_list->next_name_resolution_table= 
        ctx_state.get_first_name_resolution_table();
2822

unknown's avatar
unknown committed
2823 2824
    res= res || setup_fields(thd, 0, *info.update_values,
                             MARK_COLUMNS_READ, 0, 0);
2825
    if (!res)
2826
    {
2827 2828 2829 2830 2831 2832 2833 2834
      /*
        Traverse the update values list and substitute fields from the
        select for references (Item_ref objects) to them. This is done in
        order to get correct values from those fields when the select
        employs a temporary table.
      */
      List_iterator<Item> li(*info.update_values);
      Item *item;
2835

2836 2837 2838
      while ((item= li++))
      {
        item->transform(&Item::update_value_transformer,
2839
                        (uchar*)lex->current_select);
2840
      }
2841 2842 2843
    }

    /* Restore the current context. */
2844
    ctx_state.restore_state(context, table_list);
2845
  }
2846

2847 2848
  lex->current_select= lex_current_select_save;
  if (res)
unknown's avatar
unknown committed
2849
    DBUG_RETURN(1);
2850 2851 2852 2853 2854 2855 2856 2857 2858 2859
  /*
    if it is INSERT into join view then check_insert_fields already found
    real table for insert
  */
  table= table_list->table;

  /*
    Is table which we are changing used somewhere in other parts of
    query
  */
2860
  if (!(lex->current_select->options & OPTION_BUFFER_RESULT) &&
2861
      unique_table(thd, table_list, table_list->next_global, 0))
2862 2863
  {
    /* Using same table for INSERT and SELECT */
2864 2865
    lex->current_select->options|= OPTION_BUFFER_RESULT;
    lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
2866
  }
2867
  else if (!thd->prelocked_mode)
2868 2869 2870 2871 2872 2873 2874
  {
    /*
      We must not yet prepare the result table if it is the same as one of the 
      source tables (INSERT SELECT). The preparation may disable 
      indexes on the result table, which may be used during the select, if it
      is the same table (Bug #6034). Do the preparation after the select phase
      in select_insert::prepare2().
2875 2876
      We won't start bulk inserts at all if this statement uses functions or
      should invoke triggers since they may access to the same table too.
2877
    */
2878
    table->file->ha_start_bulk_insert((ha_rows) 0);
2879
  }
2880
  restore_record(table,s->default_values);		// Get empty record
unknown's avatar
unknown committed
2881
  table->next_number_field=table->found_next_number_field;
2882 2883 2884 2885 2886 2887 2888 2889 2890

#ifdef HAVE_REPLICATION
  if (thd->slave_thread &&
      (info.handle_duplicates == DUP_UPDATE) &&
      (table->next_number_field != NULL) &&
      rpl_master_has_bug(&active_mi->rli, 24432))
    DBUG_RETURN(1);
#endif

unknown's avatar
unknown committed
2891
  thd->cuted_fields=0;
unknown's avatar
unknown committed
2892 2893
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2894 2895 2896
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
unknown's avatar
unknown committed
2897 2898
  if (info.handle_duplicates == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
unknown's avatar
unknown committed
2899
  thd->no_trans_update.stmt= FALSE;
unknown's avatar
unknown committed
2900
  thd->abort_on_warning= (!info.ignore &&
unknown's avatar
unknown committed
2901 2902 2903
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
unknown's avatar
unknown committed
2904
  res= (table_list->prepare_where(thd, 0, TRUE) ||
2905
        table_list->prepare_check_option(thd));
2906 2907

  if (!res)
unknown's avatar
unknown committed
2908
     prepare_triggers_for_insert_stmt(table);
2909

2910
  DBUG_RETURN(res);
unknown's avatar
unknown committed
2911 2912
}

2913

2914 2915 2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931 2932
/*
  Finish the preparation of the result table.

  SYNOPSIS
    select_insert::prepare2()
    void

  DESCRIPTION
    If the result table is the same as one of the source tables (INSERT SELECT),
    the result table is not finally prepared at the join prepair phase.
    Do the final preparation now.
		       
  RETURN
    0   OK
*/

int select_insert::prepare2(void)
{
  DBUG_ENTER("select_insert::prepare2");
2933 2934
  if (thd->lex->current_select->options & OPTION_BUFFER_RESULT &&
      !thd->prelocked_mode)
2935
    table->file->ha_start_bulk_insert((ha_rows) 0);
unknown's avatar
unknown committed
2936
  DBUG_RETURN(0);
2937 2938 2939
}


2940 2941 2942 2943 2944 2945
void select_insert::cleanup()
{
  /* select_insert/select_create are never re-used in prepared statement */
  DBUG_ASSERT(0);
}

unknown's avatar
unknown committed
2946 2947
select_insert::~select_insert()
{
2948
  DBUG_ENTER("~select_insert");
unknown's avatar
unknown committed
2949 2950 2951
  if (table)
  {
    table->next_number_field=0;
2952
    table->auto_increment_field_not_null= FALSE;
2953
    table->file->ha_reset();
unknown's avatar
unknown committed
2954
  }
2955
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
unknown's avatar
unknown committed
2956
  thd->abort_on_warning= 0;
2957
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
2958 2959 2960 2961 2962
}


bool select_insert::send_data(List<Item> &values)
{
2963
  DBUG_ENTER("select_insert::send_data");
unknown's avatar
unknown committed
2964
  bool error=0;
2965

2966
  if (unit->offset_limit_cnt)
unknown's avatar
unknown committed
2967
  {						// using limit offset,count
2968
    unit->offset_limit_cnt--;
2969
    DBUG_RETURN(0);
unknown's avatar
unknown committed
2970
  }
unknown's avatar
unknown committed
2971

unknown's avatar
unknown committed
2972
  thd->count_cuted_fields= CHECK_FIELD_WARN;	// Calculate cuted fields
unknown's avatar
unknown committed
2973 2974
  store_values(values);
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
unknown's avatar
unknown committed
2975 2976
  if (thd->net.report_error)
    DBUG_RETURN(1);
unknown's avatar
unknown committed
2977 2978
  if (table_list)                               // Not CREATE ... SELECT
  {
unknown's avatar
unknown committed
2979
    switch (table_list->view_check_option(thd, info.ignore)) {
unknown's avatar
unknown committed
2980 2981 2982 2983 2984
    case VIEW_CHECK_SKIP:
      DBUG_RETURN(0);
    case VIEW_CHECK_ERROR:
      DBUG_RETURN(1);
    }
unknown's avatar
unknown committed
2985
  }
2986

2987
  error= write_record(thd, table, &info);
2988 2989
    
  if (!error)
unknown's avatar
unknown committed
2990
  {
unknown's avatar
unknown committed
2991
    if (table->triggers || info.handle_duplicates == DUP_UPDATE)
unknown's avatar
unknown committed
2992 2993
    {
      /*
unknown's avatar
unknown committed
2994 2995 2996
        Restore fields of the record since it is possible that they were
        changed by ON DUPLICATE KEY UPDATE clause.
    
unknown's avatar
unknown committed
2997 2998 2999 3000 3001 3002 3003 3004
        If triggers exist then whey can modify some fields which were not
        originally touched by INSERT ... SELECT, so we have to restore
        their original values for the next row.
      */
      restore_record(table, s->default_values);
    }
    if (table->next_number_field)
    {
3005 3006 3007 3008 3009 3010 3011
      /*
        If no value has been autogenerated so far, we need to remember the
        value we just saw, we may need to send it to client in the end.
      */
      if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
        autoinc_value_of_last_inserted_row= 
          table->next_number_field->val_int();
unknown's avatar
unknown committed
3012 3013 3014 3015 3016 3017
      /*
        Clear auto-increment field for the next record, if triggers are used
        we will clear it twice, but this should be cheap.
      */
      table->next_number_field->reset();
    }
unknown's avatar
unknown committed
3018
  }
unknown's avatar
unknown committed
3019
  DBUG_RETURN(error);
unknown's avatar
unknown committed
3020 3021 3022
}


unknown's avatar
unknown committed
3023 3024 3025
void select_insert::store_values(List<Item> &values)
{
  if (fields->elements)
unknown's avatar
unknown committed
3026 3027
    fill_record_n_invoke_before_triggers(thd, *fields, values, 1,
                                         table->triggers, TRG_EVENT_INSERT);
unknown's avatar
unknown committed
3028
  else
unknown's avatar
unknown committed
3029 3030
    fill_record_n_invoke_before_triggers(thd, table->field, values, 1,
                                         table->triggers, TRG_EVENT_INSERT);
unknown's avatar
unknown committed
3031 3032
}

unknown's avatar
unknown committed
3033 3034
void select_insert::send_error(uint errcode,const char *err)
{
unknown's avatar
unknown committed
3035 3036
  DBUG_ENTER("select_insert::send_error");

3037
  my_message(errcode, err, MYF(0));
3038

unknown's avatar
unknown committed
3039
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
3040 3041 3042 3043 3044
}


bool select_insert::send_eof()
{
3045
  int error;
3046
  bool const trans_table= table->file->has_transactions();
3047
  ulonglong id;
unknown's avatar
unknown committed
3048
  DBUG_ENTER("select_insert::send_eof");
3049 3050
  DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
                       trans_table, table->file->table_type()));
unknown's avatar
unknown committed
3051

3052
  error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0;
unknown's avatar
unknown committed
3053
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3054
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3055

unknown's avatar
unknown committed
3056
  if (info.copied || info.deleted || info.updated)
3057 3058 3059 3060 3061
  {
    /*
      We must invalidate the table in the query cache before binlog writing
      and ha_autocommit_or_rollback.
    */
unknown's avatar
unknown committed
3062
    query_cache_invalidate3(thd, table, 1);
3063 3064 3065 3066 3067 3068 3069
    /*
      Mark that we have done permanent changes if all of the below is true
      - Table doesn't support transactions
      - It's a normal (not temporary) table. (Changes to temporary tables
        are not logged in RBR)
      - We are using statement based replication
    */
3070 3071
    if (!trans_table &&
        (!table->s->tmp_table || !thd->current_stmt_binlog_row_based))
3072
      thd->no_trans_update.all= TRUE;
3073
   }
unknown's avatar
unknown committed
3074

3075 3076 3077 3078 3079 3080
  /*
    Write to binlog before commiting transaction.  No statement will
    be written by the binlog_query() below in RBR mode.  All the
    events are in the transaction cache and will be written when
    ha_autocommit_or_rollback() is issued below.
  */
3081 3082
  if (mysql_bin_log.is_open())
  {
unknown's avatar
unknown committed
3083 3084
    if (!error)
      thd->clear_error();
3085 3086
    thd->binlog_query(THD::ROW_QUERY_TYPE,
                      thd->query, thd->query_length,
3087
                      trans_table, FALSE);
3088
  }
3089 3090 3091 3092 3093 3094 3095 3096
  /*
    We will call ha_autocommit_or_rollback() also for
    non-transactional tables under row-based replication: there might
    be events in the binary logs transaction, and we need to write
    them to the binary log.
   */
  if (trans_table || thd->current_stmt_binlog_row_based)
  {
3097
    int error2= ha_autocommit_or_rollback(thd, error);
3098
    if (error2 && !error)
3099
      error= error2;
3100
  }
3101
  table->file->ha_release_auto_increment();
3102

3103
  if (error)
unknown's avatar
unknown committed
3104 3105
  {
    table->file->print_error(error,MYF(0));
unknown's avatar
unknown committed
3106
    DBUG_RETURN(1);
unknown's avatar
unknown committed
3107
  }
unknown's avatar
unknown committed
3108
  char buff[160];
3109
  if (info.ignore)
unknown's avatar
unknown committed
3110 3111
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
	    (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
unknown's avatar
unknown committed
3112
  else
unknown's avatar
unknown committed
3113
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
unknown's avatar
unknown committed
3114
	    (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
3115 3116 3117
  thd->row_count_func= info.copied + info.deleted +
                       ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
                        info.touched : info.updated);
3118 3119 3120 3121 3122 3123 3124

  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
    thd->first_successful_insert_id_in_cur_stmt :
    (thd->arg_of_last_insert_id_function ?
     thd->first_successful_insert_id_in_prev_stmt :
     (info.copied ? autoinc_value_of_last_inserted_row : 0));
  ::send_ok(thd, (ulong) thd->row_count_func, id, buff);
unknown's avatar
unknown committed
3125
  DBUG_RETURN(0);
unknown's avatar
unknown committed
3126 3127
}

3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179 3180
void select_insert::abort() {

  DBUG_ENTER("select_insert::abort");
  /*
    If the creation of the table failed (due to a syntax error, for
    example), no table will have been opened and therefore 'table'
    will be NULL. In that case, we still need to execute the rollback
    and the end of the function.
   */
  if (table)
  {
    /*
      If we are not in prelocked mode, we end the bulk insert started
      before.
    */
    if (!thd->prelocked_mode)
      table->file->ha_end_bulk_insert();

    /*
      If at least one row has been inserted/modified and will stay in
      the table (the table doesn't have transactions) we must write to
      the binlog (and the error code will make the slave stop).

      For many errors (example: we got a duplicate key error while
      inserting into a MyISAM table), no row will be added to the table,
      so passing the error to the slave will not help since there will
      be an error code mismatch (the inserts will succeed on the slave
      with no error).

      If table creation failed, the number of rows modified will also be
      zero, so no check for that is made.
    */
    if (info.copied || info.deleted || info.updated)
    {
      DBUG_ASSERT(table != NULL);
      if (!table->file->has_transactions())
      {
        if (mysql_bin_log.is_open())
          thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length,
                            table->file->has_transactions(), FALSE);
        if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table &&
            !can_rollback_data())
          thd->no_trans_update.all= TRUE;
        query_cache_invalidate3(thd, table, 1);
      }
    }
    table->file->ha_release_auto_increment();
  }

  ha_rollback_stmt(thd);
  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
3181 3182

/***************************************************************************
unknown's avatar
unknown committed
3183
  CREATE TABLE (SELECT) ...
unknown's avatar
unknown committed
3184 3185
***************************************************************************/

3186
/*
unknown's avatar
unknown committed
3187 3188
  Create table from lists of fields and items (or just return TABLE
  object for pre-opened existing table).
3189 3190 3191 3192 3193 3194 3195 3196

  SYNOPSIS
    create_table_from_items()
      thd          in     Thread object
      create_info  in     Create information (like MAX_ROWS, ENGINE or
                          temporary table flag)
      create_table in     Pointer to TABLE_LIST object providing database
                          and name for table to be created or to be open
3197 3198
      alter_info   in/out Initial list of columns and indexes for the table
                          to be created
3199 3200
      items        in     List of items which should be used to produce rest
                          of fields for the table (corresponding fields will
3201
                          be added to the end of alter_info->create_list)
3202
      lock         out    Pointer to the MYSQL_LOCK object for table created
unknown's avatar
unknown committed
3203 3204 3205 3206
                          (or open temporary table) will be returned in this
                          parameter. Since this table is not included in
                          THD::lock caller is responsible for explicitly
                          unlocking this table.
unknown's avatar
unknown committed
3207
      hooks
3208 3209

  NOTES
unknown's avatar
unknown committed
3210 3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221
    This function behaves differently for base and temporary tables:
    - For base table we assume that either table exists and was pre-opened
      and locked at open_and_lock_tables() stage (and in this case we just
      emit error or warning and return pre-opened TABLE object) or special
      placeholder was put in table cache that guarantees that this table
      won't be created or opened until the placeholder will be removed
      (so there is an exclusive lock on this table).
    - We don't pre-open existing temporary table, instead we either open
      or create and then open table in this function.

    Since this function contains some logic specific to CREATE TABLE ...
    SELECT it should be changed before it can be used in other contexts.
3222 3223 3224 3225 3226 3227 3228 3229

  RETURN VALUES
    non-zero  Pointer to TABLE object for table created or opened
    0         Error
*/

static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
                                      TABLE_LIST *create_table,
3230
                                      Alter_info *alter_info,
unknown's avatar
unknown committed
3231 3232 3233
                                      List<Item> *items,
                                      MYSQL_LOCK **lock,
                                      TABLEOP_HOOKS *hooks)
3234
{
unknown's avatar
unknown committed
3235
  TABLE tmp_table;		// Used during 'Create_field()'
unknown's avatar
unknown committed
3236
  TABLE_SHARE share;
3237 3238 3239 3240 3241 3242 3243 3244 3245
  TABLE *table= 0;
  uint select_field_count= items->elements;
  /* Add selected items to field list */
  List_iterator_fast<Item> it(*items);
  Item *item;
  Field *tmp_field;
  bool not_used;
  DBUG_ENTER("create_table_from_items");

unknown's avatar
unknown committed
3246 3247 3248 3249 3250 3251 3252 3253 3254 3255 3256 3257 3258 3259 3260 3261 3262 3263 3264
  DBUG_EXECUTE_IF("sleep_create_select_before_check_if_exists", my_sleep(6000000););

  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
      create_table->table->db_stat)
  {
    /* Table already exists and was open at open_and_lock_tables() stage. */
    if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
    {
      create_info->table_existed= 1;		// Mark that table existed
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
                          ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
                          create_table->table_name);
      DBUG_RETURN(create_table->table);
    }

    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
    DBUG_RETURN(0);
  }

3265 3266
  tmp_table.alias= 0;
  tmp_table.timestamp_field= 0;
unknown's avatar
unknown committed
3267 3268 3269
  tmp_table.s= &share;
  init_tmp_table_share(&share, "", 0, "", "");

3270 3271
  tmp_table.s->db_create_options=0;
  tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
unknown's avatar
unknown committed
3272
  tmp_table.s->db_low_byte_first= 
3273 3274
        test(create_info->db_type == myisam_hton ||
             create_info->db_type == heap_hton);
3275 3276 3277 3278
  tmp_table.null_row=tmp_table.maybe_null=0;

  while ((item=it++))
  {
unknown's avatar
unknown committed
3279
    Create_field *cr_field;
3280
    Field *field, *def_field;
3281
    if (item->type() == Item::FUNC_ITEM)
3282
      field= item->tmp_table_field(&tmp_table);
3283
    else
3284 3285 3286
      field= create_tmp_field(thd, &tmp_table, item, item->type(),
                              (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
                              0);
3287
    if (!field ||
unknown's avatar
unknown committed
3288
	!(cr_field=new Create_field(field,(item->type() == Item::FIELD_ITEM ?
3289 3290 3291 3292 3293
					   ((Item_field *)item)->field :
					   (Field*) 0))))
      DBUG_RETURN(0);
    if (item->maybe_null)
      cr_field->flags &= ~NOT_NULL_FLAG;
3294
    alter_info->create_list.push_back(cr_field);
3295
  }
unknown's avatar
unknown committed
3296 3297 3298

  DBUG_EXECUTE_IF("sleep_create_select_before_create", my_sleep(6000000););

3299
  /*
unknown's avatar
unknown committed
3300 3301 3302 3303 3304
    Create and lock table.

    Note that we either creating (or opening existing) temporary table or
    creating base table on which name we have exclusive lock. So code below
    should not cause deadlocks or races.
3305 3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316

    We don't log the statement, it will be logged later.

    If this is a HEAP table, the automatic DELETE FROM which is written to the
    binlog when a HEAP table is opened for the first time since startup, must
    not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
    don't want to delete from it) 2) it would be written before the CREATE
    TABLE, which is a wrong order. So we keep binary logging disabled when we
    open_table().
  */
  {
    tmp_disable_binlog(thd);
unknown's avatar
unknown committed
3317 3318
    if (!mysql_create_table_no_lock(thd, create_table->db,
                                    create_table->table_name,
3319 3320
                                    create_info, alter_info, 0,
                                    select_field_count))
3321
    {
unknown's avatar
unknown committed
3322 3323 3324 3325 3326 3327 3328 3329 3330 3331 3332 3333 3334 3335 3336 3337 3338 3339 3340 3341 3342 3343 3344 3345 3346 3347 3348 3349 3350 3351 3352 3353 3354 3355 3356 3357 3358 3359 3360 3361 3362
      if (create_info->table_existed &&
          !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
      {
        /*
          This means that someone created table underneath server
          or it was created via different mysqld front-end to the
          cluster. We don't have much options but throw an error.
        */
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
        DBUG_RETURN(0);
      }

      DBUG_EXECUTE_IF("sleep_create_select_before_open", my_sleep(6000000););

      if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
      {
        VOID(pthread_mutex_lock(&LOCK_open));
        if (reopen_name_locked_table(thd, create_table, FALSE))
        {
          quick_rm_table(create_info->db_type, create_table->db,
                         table_case_name(create_info, create_table->table_name),
                         0);
        }
        else
          table= create_table->table;
        VOID(pthread_mutex_unlock(&LOCK_open));
      }
      else
      {
        if (!(table= open_table(thd, create_table, thd->mem_root, (bool*) 0,
                                MYSQL_OPEN_TEMPORARY_ONLY)) &&
            !create_info->table_existed)
        {
          /*
            This shouldn't happen as creation of temporary table should make
            it preparable for open. But let us do close_temporary_table() here
            just in case.
          */
          close_temporary_table(thd, create_table);
        }
      }
3363 3364 3365 3366 3367 3368
    }
    reenable_binlog(thd);
    if (!table)                                   // open failed
      DBUG_RETURN(0);
  }

unknown's avatar
unknown committed
3369 3370
  DBUG_EXECUTE_IF("sleep_create_select_before_lock", my_sleep(6000000););

3371
  table->reginfo.lock_type=TL_WRITE;
unknown's avatar
unknown committed
3372
  hooks->prelock(&table, 1);                    // Call prelock hooks
3373
  if (! ((*lock)= mysql_lock_tables(thd, &table, 1,
3374 3375
                                    MYSQL_LOCK_IGNORE_FLUSH, &not_used)) ||
        hooks->postlock(&table, 1))
3376
  {
3377 3378 3379 3380 3381 3382
    if (*lock)
    {
      mysql_unlock_tables(thd, *lock);
      *lock= 0;
    }

unknown's avatar
unknown committed
3383 3384
    if (!create_info->table_existed)
      drop_open_table(thd, table, create_table->db, create_table->table_name);
3385 3386 3387 3388 3389 3390
    DBUG_RETURN(0);
  }
  DBUG_RETURN(table);
}


unknown's avatar
unknown committed
3391
int
3392
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
3393 3394
{
  DBUG_ENTER("select_create::prepare");
3395

3396
  TABLEOP_HOOKS *hook_ptr= NULL;
3397 3398 3399 3400 3401 3402 3403 3404 3405 3406 3407 3408 3409 3410 3411 3412 3413 3414
  /*
    For row-based replication, the CREATE-SELECT statement is written
    in two pieces: the first one contain the CREATE TABLE statement
    necessary to create the table and the second part contain the rows
    that should go into the table.

    For non-temporary tables, the start of the CREATE-SELECT
    implicitly commits the previous transaction, and all events
    forming the statement will be stored the transaction cache. At end
    of the statement, the entire statement is committed as a
    transaction, and all events are written to the binary log.

    On the master, the table is locked for the duration of the
    statement, but since the CREATE part is replicated as a simple
    statement, there is no way to lock the table for accesses on the
    slave.  Hence, we have to hold on to the CREATE part of the
    statement until the statement has finished.
   */
3415 3416
  class MY_HOOKS : public TABLEOP_HOOKS {
  public:
3417 3418 3419 3420 3421 3422
    MY_HOOKS(select_create *x, TABLE_LIST *create_table,
             TABLE_LIST *select_tables)
      : ptr(x), all_tables(*create_table)
      {
        all_tables.next_global= select_tables;
      }
3423 3424

  private:
3425
    virtual int do_postlock(TABLE **tables, uint count)
3426
    {
3427 3428 3429 3430
      THD *thd= const_cast<THD*>(ptr->get_thd());
      if (int error= decide_logging_format(thd, &all_tables))
        return error;

3431
      TABLE const *const table = *tables;
3432
      if (thd->current_stmt_binlog_row_based  &&
3433
          !table->s->tmp_table &&
3434 3435 3436 3437
          !ptr->get_create_info()->table_existed)
      {
        ptr->binlog_show_create_table(tables, count);
      }
3438
      return 0;
3439 3440 3441
    }

    select_create *ptr;
3442
    TABLE_LIST all_tables;
3443 3444
  };

3445
  MY_HOOKS hooks(this, create_table, select_tables);
3446
  hook_ptr= &hooks;
3447

3448
  unit= u;
3449 3450

  /*
3451 3452 3453
    Start a statement transaction before the create if we are using
    row-based replication for the statement.  If we are creating a
    temporary table, we need to start a statement transaction.
3454 3455 3456 3457 3458 3459 3460
  */
  if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
      thd->current_stmt_binlog_row_based)
  {
    thd->binlog_start_trans_and_stmt();
  }

3461
  if (!(table= create_table_from_items(thd, create_info, create_table,
3462
                                       alter_info, &values,
3463
                                       &thd->extra_lock, hook_ptr)))
unknown's avatar
unknown committed
3464 3465
    DBUG_RETURN(-1);				// abort() deletes table

3466
  if (table->s->fields < values.elements)
unknown's avatar
unknown committed
3467
  {
3468
    my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
unknown's avatar
unknown committed
3469 3470 3471
    DBUG_RETURN(-1);
  }

3472
 /* First field to copy */
unknown's avatar
unknown committed
3473 3474 3475 3476
  field= table->field+table->s->fields - values.elements;

  /* Mark all fields that are given values */
  for (Field **f= field ; *f ; f++)
3477
    bitmap_set_bit(table->write_set, (*f)->field_index);
unknown's avatar
unknown committed
3478

3479
  /* Don't set timestamp if used */
3480
  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
unknown's avatar
unknown committed
3481 3482
  table->next_number_field=table->found_next_number_field;

3483
  restore_record(table,s->default_values);      // Get empty record
unknown's avatar
unknown committed
3484
  thd->cuted_fields=0;
unknown's avatar
unknown committed
3485
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
unknown's avatar
unknown committed
3486
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
3487 3488 3489
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
unknown's avatar
unknown committed
3490 3491
  if (info.handle_duplicates == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
3492
  if (!thd->prelocked_mode)
3493
    table->file->ha_start_bulk_insert((ha_rows) 0);
3494
  thd->no_trans_update.stmt= FALSE;
unknown's avatar
unknown committed
3495
  thd->abort_on_warning= (!info.ignore &&
unknown's avatar
unknown committed
3496 3497 3498
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
3499 3500 3501
  if (check_that_all_fields_are_given_values(thd, table, table_list))
    DBUG_RETURN(1);
  table->mark_columns_needed_for_insert();
unknown's avatar
unknown committed
3502
  table->file->extra(HA_EXTRA_WRITE_CACHE);
3503
  DBUG_RETURN(0);
unknown's avatar
unknown committed
3504 3505
}

3506
void
3507
select_create::binlog_show_create_table(TABLE **tables, uint count)
3508 3509 3510 3511 3512 3513 3514 3515 3516 3517 3518 3519 3520 3521 3522 3523 3524
{
  /*
    Note 1: In RBR mode, we generate a CREATE TABLE statement for the
    created table by calling store_create_info() (behaves as SHOW
    CREATE TABLE).  In the event of an error, nothing should be
    written to the binary log, even if the table is non-transactional;
    therefore we pretend that the generated CREATE TABLE statement is
    for a transactional table.  The event will then be put in the
    transaction cache, and any subsequent events (e.g., table-map
    events and binrow events) will also be put there.  We can then use
    ha_autocommit_or_rollback() to either throw away the entire
    kaboodle of events, or write them to the binary log.

    We write the CREATE TABLE statement here and not in prepare()
    since there potentially are sub-selects or accesses to information
    schema that will do a close_thread_tables(), destroying the
    statement transaction cache.
3525
  */
3526
  DBUG_ASSERT(thd->current_stmt_binlog_row_based);
3527
  DBUG_ASSERT(tables && *tables && count > 0);
3528 3529 3530

  char buf[2048];
  String query(buf, sizeof(buf), system_charset_info);
3531
  int result;
3532
  TABLE_LIST table_list;
3533

3534 3535
  memset(&table_list, 0, sizeof(table_list));
  table_list.table = *tables;
3536
  query.length(0);      // Have to zero it since constructor doesn't
3537

3538
  result= store_create_info(thd, &table_list, &query, create_info);
3539
  DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */
3540

3541 3542 3543 3544 3545 3546
  thd->binlog_query(THD::STMT_QUERY_TYPE,
                    query.ptr(), query.length(),
                    /* is_trans */ TRUE,
                    /* suppress_use */ FALSE);
}

unknown's avatar
unknown committed
3547
void select_create::store_values(List<Item> &values)
unknown's avatar
unknown committed
3548
{
unknown's avatar
unknown committed
3549 3550
  fill_record_n_invoke_before_triggers(thd, field, values, 1,
                                       table->triggers, TRG_EVENT_INSERT);
unknown's avatar
unknown committed
3551 3552
}

unknown's avatar
unknown committed
3553

3554 3555
void select_create::send_error(uint errcode,const char *err)
{
3556 3557 3558 3559 3560 3561 3562
  DBUG_ENTER("select_create::send_error");

  DBUG_PRINT("info",
             ("Current statement %s row-based",
              thd->current_stmt_binlog_row_based ? "is" : "is NOT"));
  DBUG_PRINT("info",
             ("Current table (at 0x%lu) %s a temporary (or non-existant) table",
3563
              (ulong) table,
3564 3565 3566 3567 3568
              table && !table->s->tmp_table ? "is NOT" : "is"));
  DBUG_PRINT("info",
             ("Table %s prior to executing this statement",
              get_create_info()->table_existed ? "existed" : "did not exist"));

3569
  /*
3570 3571 3572 3573 3574 3575 3576 3577 3578
    This will execute any rollbacks that are necessary before writing
    the transcation cache.

    We disable the binary log since nothing should be written to the
    binary log.  This disabling is important, since we potentially do
    a "roll back" of non-transactional tables by removing the table,
    and the actual rollback might generate events that should not be
    written to the binary log.

3579 3580 3581 3582
  */
  tmp_disable_binlog(thd);
  select_insert::send_error(errcode, err);
  reenable_binlog(thd);
3583 3584

  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
3585 3586
}

unknown's avatar
unknown committed
3587

unknown's avatar
unknown committed
3588 3589 3590 3591 3592 3593 3594
bool select_create::send_eof()
{
  bool tmp=select_insert::send_eof();
  if (tmp)
    abort();
  else
  {
3595 3596 3597 3598 3599 3600 3601 3602
    /*
      Do an implicit commit at end of statement for non-temporary
      tables.  This can fail, but we should unlock the table
      nevertheless.
    */
    if (!table->s->tmp_table)
      ha_commit(thd);               // Can fail, but we proceed anyway

unknown's avatar
unknown committed
3603
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3604
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
unknown's avatar
unknown committed
3605
    if (thd->extra_lock)
3606
    {
unknown's avatar
unknown committed
3607 3608
      mysql_unlock_tables(thd, thd->extra_lock);
      thd->extra_lock=0;
3609
    }
unknown's avatar
unknown committed
3610 3611 3612 3613
  }
  return tmp;
}

unknown's avatar
unknown committed
3614

unknown's avatar
unknown committed
3615 3616
void select_create::abort()
{
3617 3618
  DBUG_ENTER("select_create::abort");

3619 3620 3621 3622 3623 3624 3625 3626
  /*
   Disable binlog, because we "roll back" partial inserts in ::abort
   by removing the table, even for non-transactional tables.
  */
  tmp_disable_binlog(thd);
  select_insert::abort();
  reenable_binlog(thd);

3627 3628 3629 3630 3631 3632 3633 3634 3635 3636 3637 3638 3639 3640 3641 3642
  /*
    We roll back the statement, including truncating the transaction
    cache of the binary log, if the statement failed.

    We roll back the statement prior to deleting the table and prior
    to releasing the lock on the table, since there might be potential
    for failure if the rollback is executed after the drop or after
    unlocking the table.

    We also roll back the statement regardless of whether the creation
    of the table succeeded or not, since we need to reset the binary
    log state.
  */
  if (thd->current_stmt_binlog_row_based)
    ha_rollback_stmt(thd);

3643
  if (thd->extra_lock)
unknown's avatar
unknown committed
3644
  {
3645 3646
    mysql_unlock_tables(thd, thd->extra_lock);
    thd->extra_lock=0;
unknown's avatar
unknown committed
3647
  }
3648

unknown's avatar
unknown committed
3649 3650
  if (table)
  {
unknown's avatar
unknown committed
3651
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3652
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
unknown's avatar
unknown committed
3653 3654
    if (!create_info->table_existed)
      drop_open_table(thd, table, create_table->db, create_table->table_name);
unknown's avatar
unknown committed
3655
    table=0;                                    // Safety
unknown's avatar
unknown committed
3656
  }
3657
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
3658 3659 3660 3661
}


/*****************************************************************************
unknown's avatar
unknown committed
3662
  Instansiate templates
unknown's avatar
unknown committed
3663 3664
*****************************************************************************/

3665
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
unknown's avatar
unknown committed
3666
template class List_iterator_fast<List_item>;
3667
#ifndef EMBEDDED_LIBRARY
3668 3669
template class I_List<Delayed_insert>;
template class I_List_iterator<Delayed_insert>;
unknown's avatar
unknown committed
3670
template class I_List<delayed_row>;
3671
#endif /* EMBEDDED_LIBRARY */
3672
#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */