sql_insert.cc 122 KB
Newer Older
1
/* Copyright (C) 2000-2006 MySQL AB
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2

bk@work.mysql.com's avatar
bk@work.mysql.com committed
3 4
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
5
   the Free Software Foundation; version 2 of the License.
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
6

bk@work.mysql.com's avatar
bk@work.mysql.com committed
7 8 9 10
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
11

bk@work.mysql.com's avatar
bk@work.mysql.com committed
12 13 14 15 16 17 18
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


/* Insert of records */

19 20 21 22 23 24 25 26 27 28 29 30 31
/*
  INSERT DELAYED

  Insert delayed is distinguished from a normal insert by lock_type ==
  TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
  "delayed" table (delayed_get_table()), but falls back to
  open_and_lock_tables() on error and proceeds as normal insert then.

  Opening a "delayed" table means to find a delayed insert thread that
  has the table open already. If this fails, a new thread is created and
  waited for to open and lock the table.

  If accessing the thread succeeded, in
  Delayed_insert::get_local_table() the table of the thread is copied
  for local use. A copy is required because the normal insert logic
  works on a target table, but the other thread's table object must not
  be used. The insert logic uses the record buffer to create a record,
  and the delayed insert thread uses the record buffer to pass the
  record to the table handler, so there must be different objects. Also
  the copied table is not included in the lock, so that the statement
  can proceed even if the real table cannot be accessed at this moment.

  Copying a table object is not a trivial operation. Besides the TABLE
  object there are the field pointer array, the field objects and the
  record buffer. After copying the field objects, their pointers into
  the record must be "moved" to point to the new record buffer.

  After this setup the normal insert logic is used. Only that for
  delayed inserts write_delayed() is called instead of write_record().
  It inserts the rows into a queue and signals the delayed insert thread
  instead of writing directly to the table.

  The delayed insert thread awakes from the signal. It locks the table,
  inserts the rows from the queue, unlocks the table, and waits for the
  next signal. It normally lives until a FLUSH TABLES or SHUTDOWN.

*/

bk@work.mysql.com's avatar
bk@work.mysql.com committed
57
#include "mysql_priv.h"
58 59
#include "sp_head.h"
#include "sql_trigger.h"
60
#include "sql_select.h"
61
#include "sql_show.h"
62
#include "slave.h"
63
#include "rpl_mi.h"
bk@work.mysql.com's avatar
bk@work.mysql.com committed
64

65
#ifndef EMBEDDED_LIBRARY
66
static bool delayed_get_table(THD *thd, TABLE_LIST *table_list);
kostja@vajra.(none)'s avatar
kostja@vajra.(none) committed
67
static int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
68
                         LEX_STRING query, bool ignore, bool log_on);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
69
static void end_delayed_insert(THD *thd);
70
pthread_handler_t handle_delayed_insert(void *arg);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
71
static void unlink_blobs(register TABLE *table);
72
#endif
73
static bool check_view_insertability(THD *thd, TABLE_LIST *view);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
74 75 76 77 78 79 80 81 82 83 84

/* Define to force use of my_malloc() if the allocated memory block is big */

#ifndef HAVE_ALLOCA
/* No size-based switch here: forward straight to my_alloca()/my_afree(). */
#define my_safe_alloca(size, min_length) my_alloca(size)
#define my_safe_afree(ptr, size, min_length) my_afree(ptr)
#else
/*
  Blocks of at most min_length bytes come from my_alloca(); bigger ones
  come from my_malloc() and are released with my_free(). Blocks taken from
  my_alloca() are not released explicitly by my_safe_afree().

  The arguments are parenthesized so that expressions can be passed
  safely, and my_safe_afree() is wrapped in do { } while (0) so that it
  behaves as a single statement (no dangling-else surprise when it is used
  as the body of an if/else). Both macros evaluate 'size' more than once,
  so do not pass expressions with side effects.
*/
#define my_safe_alloca(size, min_length) \
  (((size) <= (min_length)) ? my_alloca(size) : my_malloc((size), MYF(0)))
#define my_safe_afree(ptr, size, min_length) \
  do { if ((size) > (min_length)) my_free((ptr), MYF(0)); } while (0)
#endif

85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
/*
  Verify that a list of insert/update fields refers to exactly one
  underlying table of a view.

  SYNOPSIS
    check_view_single_update()
    fields            The insert/update fields to be checked.
    view              The view for insert.
    map     [in/out]  The insert table map.

  DESCRIPTION
    The union of used_tables() over all fields is computed first; what
    happens next depends on the value of *map on entry:
    1. *map == 0 (checking insert fields): view->check_single_table() is
       asked for the single underlying table matching the computed map.
       On success view->table is pointed at that table and the computed
       map is stored in *map.
    2. *map != 0 (checking the fields of ON DUPLICATE KEY UPDATE): *map
       holds the map stored by a previous call; the computed map must be
       equal to it. Neither view->table nor *map is modified.

    In both cases a failed check raises ER_VIEW_MULTIUPDATE.

  RETURN
    FALSE   OK
    TRUE    Error (already reported through my_error())
*/

bool check_view_single_update(List<Item> &fields, TABLE_LIST *view,
                              table_map *map)
{
  List_iterator_fast<Item> field_it(fields);
  Item *field;
  table_map used= 0;
  bool failed;

  /* Collect the tables referenced by any of the fields. */
  for (field= field_it++; field; field= field_it++)
    used|= field->used_tables();

  if (*map)
  {
    /* A map from an earlier call exists: the fields must match it. */
    failed= (used != *map);
  }
  else
  {
    /* It is a join view => we need to find the table for update. */
    TABLE_LIST *underlying= 0;   // filled in by check_single_table()

    failed= (view->check_single_table(&underlying, used, view) ||
             underlying == 0);
    if (!failed)
    {
      view->table= underlying->table;
      *map= used;
    }
  }

  if (failed)
  {
    my_error(ER_VIEW_MULTIUPDATE, MYF(0),
             view->view_db.str, view->view_name.str);
    return TRUE;
  }
  return FALSE;
}

144

bk@work.mysql.com's avatar
bk@work.mysql.com committed
145
/*
146
  Check if insert fields are correct.
147 148 149 150 151 152 153

  SYNOPSIS
    check_insert_fields()
    thd                         The current thread.
    table                       The table for insert.
    fields                      The insert fields.
    values                      The insert values.
ingo@mysql.com's avatar
ingo@mysql.com committed
154
    check_unique                If duplicate values should be rejected.
155 156 157 158 159 160 161 162 163

  NOTE
    Clears TIMESTAMP_AUTO_SET_ON_INSERT from table->timestamp_field_type
    or leaves it as is, depending on if timestamp should be updated or
    not.

  RETURN
    0           OK
    -1          Error
bk@work.mysql.com's avatar
bk@work.mysql.com committed
164 165
*/

ingo@mysql.com's avatar
ingo@mysql.com committed
166 167
static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
                               List<Item> &fields, List<Item> &values,
168
                               bool check_unique, table_map *map)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
169
{
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
170
  TABLE *table= table_list->table;
171

172 173
  if (!table_list->updatable)
  {
174
    my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
175 176 177
    return -1;
  }

bk@work.mysql.com's avatar
bk@work.mysql.com committed
178 179
  if (fields.elements == 0 && values.elements != 0)
  {
180 181 182 183 184 185
    if (!table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
      return -1;
    }
186
    if (values.elements != table->s->fields)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
187
    {
monty@mysql.com's avatar
monty@mysql.com committed
188
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
189 190
      return -1;
    }
hf@deer.(none)'s avatar
hf@deer.(none) committed
191
#ifndef NO_EMBEDDED_ACCESS_CHECKS
192 193 194
    Field_iterator_table_ref field_it;
    field_it.set(table_list);
    if (check_grant_all_columns(thd, INSERT_ACL, &field_it))
195
      return -1;
hf@deer.(none)'s avatar
hf@deer.(none) committed
196
#endif
197 198
    clear_timestamp_auto_bits(table->timestamp_field_type,
                              TIMESTAMP_AUTO_SET_ON_INSERT);
199 200 201 202
    /*
      No fields are provided so all fields must be provided in the values.
      Thus we set all bits in the write set.
    */
203
    bitmap_set_all(table->write_set);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
204 205 206
  }
  else
  {						// Part field list
207 208
    SELECT_LEX *select_lex= &thd->lex->select_lex;
    Name_resolution_context *context= &select_lex->context;
209
    Name_resolution_context_state ctx_state;
210
    int res;
211

bk@work.mysql.com's avatar
bk@work.mysql.com committed
212 213
    if (fields.elements != values.elements)
    {
monty@mysql.com's avatar
monty@mysql.com committed
214
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
215 216 217
      return -1;
    }

218
    thd->dup_field= 0;
219 220 221
    select_lex->no_wrap_view_item= TRUE;

    /* Save the state of the current name resolution context. */
222
    ctx_state.save_state(context, table_list);
223 224 225 226 227

    /*
      Perform name resolution only in the first table - 'table_list',
      which is the table that is inserted into.
    */
228
    table_list->next_local= 0;
229
    context->resolve_in_table_list_only(table_list);
230
    res= setup_fields(thd, 0, fields, MARK_COLUMNS_WRITE, 0, 0);
231 232

    /* Restore the current context. */
233
    ctx_state.restore_state(context, table_list);
234
    thd->lex->select_lex.no_wrap_view_item= FALSE;
235

236
    if (res)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
237
      return -1;
238

igor@rurik.mysql.com's avatar
igor@rurik.mysql.com committed
239
    if (table_list->effective_algorithm == VIEW_ALGORITHM_MERGE)
240
    {
241
      if (check_view_single_update(fields, table_list, map))
242
        return -1;
243
      table= table_list->table;
244
    }
245

246
    if (check_unique && thd->dup_field)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
247
    {
248
      my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dup_field->field_name);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
249 250
      return -1;
    }
251 252 253 254 255 256 257 258 259 260 261 262
    if (table->timestamp_field)	// Don't automaticly set timestamp if used
    {
      if (bitmap_is_set(table->write_set,
                        table->timestamp_field->field_index))
        clear_timestamp_auto_bits(table->timestamp_field_type,
                                  TIMESTAMP_AUTO_SET_ON_INSERT);
      else
      {
        bitmap_set_bit(table->write_set,
                       table->timestamp_field->field_index);
      }
    }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
263
  }
264
  // For the values we need select_priv
hf@deer.(none)'s avatar
hf@deer.(none) committed
265
#ifndef NO_EMBEDDED_ACCESS_CHECKS
266
  table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege);
hf@deer.(none)'s avatar
hf@deer.(none) committed
267
#endif
268 269 270

  if (check_key_in_view(thd, table_list) ||
      (table_list->view &&
271
       check_view_insertability(thd, table_list)))
272
  {
273
    my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
274 275 276
    return -1;
  }

bk@work.mysql.com's avatar
bk@work.mysql.com committed
277 278 279 280
  return 0;
}


281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
/*
  Check update fields for the timestamp field.

  SYNOPSIS
    check_update_fields()
    thd                         The current thread.
    insert_table_list           The insert table list.
    table                       The table for update.
    update_fields               The update fields.

  NOTE
    If the update fields include the timestamp field,
    remove TIMESTAMP_AUTO_SET_ON_UPDATE from table->timestamp_field_type.

  RETURN
    0           OK
    -1          Error
*/

ingo@mysql.com's avatar
ingo@mysql.com committed
300
static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list,
301
                               List<Item> &update_fields, table_map *map)
302
{
ingo@mysql.com's avatar
ingo@mysql.com committed
303
  TABLE *table= insert_table_list->table;
304
  my_bool timestamp_mark;
305

306 307
  LINT_INIT(timestamp_mark);

308 309
  if (table->timestamp_field)
  {
310 311 312 313 314 315
    /*
      Unmark the timestamp field so that we can check if this is modified
      by update_fields
    */
    timestamp_mark= bitmap_test_and_clear(table->write_set,
                                          table->timestamp_field->field_index);
316 317
  }

318 319
  /* Check the fields we are going to modify */
  if (setup_fields(thd, 0, update_fields, MARK_COLUMNS_WRITE, 0, 0))
320 321
    return -1;

322 323 324 325
  if (insert_table_list->effective_algorithm == VIEW_ALGORITHM_MERGE &&
      check_view_single_update(update_fields, insert_table_list, map))
    return -1;

326 327 328
  if (table->timestamp_field)
  {
    /* Don't set timestamp column if this is modified. */
329 330
    if (bitmap_is_set(table->write_set,
                      table->timestamp_field->field_index))
331 332
      clear_timestamp_auto_bits(table->timestamp_field_type,
                                TIMESTAMP_AUTO_SET_ON_UPDATE);
333 334 335
    if (timestamp_mark)
      bitmap_set_bit(table->write_set,
                     table->timestamp_field->field_index);
336 337 338 339
  }
  return 0;
}

340
/*
341 342 343 344 345 346 347 348 349 350 351 352
  Prepare triggers  for INSERT-like statement.

  SYNOPSIS
    prepare_triggers_for_insert_stmt()
      table   Table to which insert will happen

  NOTE
    Prepare triggers for INSERT-like statement by marking fields
    used by triggers and inform handlers that batching of UPDATE/DELETE 
    cannot be done if there are BEFORE UPDATE/DELETE triggers.
*/

353
void prepare_triggers_for_insert_stmt(TABLE *table)
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
{
  if (table->triggers)
  {
    if (table->triggers->has_triggers(TRG_EVENT_DELETE,
                                      TRG_ACTION_AFTER))
    {
      /*
        The table has AFTER DELETE triggers that might access to 
        subject table and therefore might need delete to be done 
        immediately. So we turn-off the batching.
      */ 
      (void) table->file->extra(HA_EXTRA_DELETE_CANNOT_BATCH);
    }
    if (table->triggers->has_triggers(TRG_EVENT_UPDATE,
                                      TRG_ACTION_AFTER))
    {
      /*
        The table has AFTER UPDATE triggers that might access to subject 
        table and therefore might need update to be done immediately. 
        So we turn-off the batching.
      */ 
      (void) table->file->extra(HA_EXTRA_UPDATE_CANNOT_BATCH);
    }
  }
378
  table->mark_columns_needed_for_insert();
379 380
}

381

382 383 384 385 386 387 388 389
/**
  Upgrade table-level lock of INSERT statement to TL_WRITE if
  a more concurrent lock is infeasible for some reason. This is
  necessary for engines without internal locking support (MyISAM).
  An engine with internal locking implementation might later
  downgrade the lock in handler::store_lock() method.
*/

390
static
391 392 393 394 395 396 397
void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
                       enum_duplicates duplic,
                       bool is_multi_insert)
{
  if (duplic == DUP_UPDATE ||
      duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT)
  {
398
    *lock_type= TL_WRITE_DEFAULT;
399 400 401 402 403 404 405
    return;
  }

  if (*lock_type == TL_WRITE_DELAYED)
  {
    /*
      We do not use delayed threads if:
406 407 408 409 410
      - we're running in the safe mode or skip-new mode -- the
        feature is disabled in these modes
      - we're executing this statement on a replication slave --
        we need to ensure serial execution of queries on the
        slave
411 412
      - it is INSERT .. ON DUPLICATE KEY UPDATE - in this case the
        insert cannot be concurrent
413 414 415 416 417 418 419 420 421 422 423 424
      - this statement is directly or indirectly invoked from
        a stored function or trigger (under pre-locking) - to
        avoid deadlocks, since INSERT DELAYED involves a lock
        upgrade (TL_WRITE_DELAYED -> TL_WRITE) which we should not
        attempt while keeping other table level locks.
      - this statement itself may require pre-locking.
        We should upgrade the lock even though in most cases
        delayed functionality may work. Unfortunately, we can't
        easily identify whether the subject table is not used in
        the statement indirectly via a stored function or trigger:
        if it is used, that will lead to a deadlock between the
        client connection and the delayed thread.
425 426
    */
    if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) ||
427 428 429
        thd->variables.max_insert_delayed_threads == 0 ||
        thd->prelocked_mode ||
        thd->lex->uses_stored_routines())
430 431 432 433
    {
      *lock_type= TL_WRITE;
      return;
    }
434 435 436 437 438 439 440 441
    if (thd->slave_thread)
    {
      /* Try concurrent insert */
      *lock_type= (duplic == DUP_UPDATE || duplic == DUP_REPLACE) ?
                  TL_WRITE : TL_WRITE_CONCURRENT_INSERT;
      return;
    }

442 443
    bool log_on= (thd->options & OPTION_BIN_LOG ||
                  ! (thd->security_ctx->master_access & SUPER_ACL));
444 445
    if (global_system_variables.binlog_format == BINLOG_FORMAT_STMT &&
        log_on && mysql_bin_log.is_open() && is_multi_insert)
446 447 448 449 450 451 452 453 454 455 456 457 458 459
    {
      /*
        Statement-based binary logging does not work in this case, because:
        a) two concurrent statements may have their rows intermixed in the
        queue, leading to autoincrement replication problems on slave (because
        the values generated used for one statement don't depend only on the
        value generated for the first row of this statement, so are not
        replicable)
        b) if first row of the statement has an error the full statement is
        not binlogged, while next rows of the statement may be inserted.
        c) if first row succeeds, statement is binlogged immediately with a
        zero error code (i.e. "no error"), if then second row fails, query
        will fail on slave too and slave will stop (wrongly believing that the
        master got no error).
460 461 462 463 464 465 466 467 468
        So we fallback to non-delayed INSERT.
        Note that to be fully correct, we should test the "binlog format which
        the delayed thread is going to use for this row". But in the common case
        where the global binlog format is not changed and the session binlog
        format may be changed, that is equal to the global binlog format.
        We test it without mutex for speed reasons (condition rarely true), and
        in the common case (global not changed) it is as good as without mutex;
        if global value is changed, anyway there is uncertainty as the delayed
        thread may be old and use the before-the-change value.
469 470 471 472 473 474 475
      */
      *lock_type= TL_WRITE;
    }
  }
}


476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541
/**
  Find or create a delayed insert thread for the first table in
  the table list, then open and lock the remaining tables.
  If the first table cannot be used with insert delayed, upgrade its
  lock to TL_WRITE and open and lock all tables using the standard
  mechanism.

  @param thd         thread context
  @param table_list  list of "descriptors" for tables referenced
                     directly in statement SQL text.
                     The first element in the list corresponds to
                     the destination table for inserts, remaining
                     tables, if any, are usually tables referenced
                     by sub-queries in the right part of the
                     INSERT.

  @return Status of the operation (TRUE on failure). In case of
  success 'table' member of every table_list element points to an
  instance of class TABLE.

  @sa open_and_lock_tables for more information about MySQL table
  level locking
*/

static
bool open_and_lock_for_insert_delayed(THD *thd, TABLE_LIST *table_list)
{
  DBUG_ENTER("open_and_lock_for_insert_delayed");

#ifndef EMBEDDED_LIBRARY
  if (delayed_get_table(thd, table_list))
    DBUG_RETURN(TRUE);

  if (table_list->table)
  {
    bool failed= FALSE;

    /*
      Open tables used for sub-selects or in stored functions, will also
      cache these functions.
    */
    if (open_and_lock_tables(thd, table_list->next_global))
    {
      failed= TRUE;
      end_delayed_insert(thd);
    }
    else if (!(table_list->derived || table_list->view))
    {
      /*
        First table was not processed by open_and_lock_tables(),
        we need to set updatability flag "by hand".
      */
      table_list->updatable= 1;  // usual table
    }
    DBUG_RETURN(failed);
  }
#endif
  /*
    We get here if
    * this is embedded library and we don't have auxiliary
      threads OR
    * a lock upgrade was requested inside delayed_get_table
      because
      - there are too many delayed insert threads OR
      - the table has triggers.
    Use a normal insert.
  */
  table_list->lock_type= TL_WRITE;
  DBUG_RETURN(open_and_lock_tables(thd, table_list));
}


542 543 544 545
/**
  INSERT statement implementation
*/

546 547 548 549 550
bool mysql_insert(THD *thd,TABLE_LIST *table_list,
                  List<Item> &fields,
                  List<List_item> &values_list,
                  List<Item> &update_fields,
                  List<Item> &update_values,
551 552
                  enum_duplicates duplic,
		  bool ignore)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
553
{
554
  int error, res;
555
  bool transactional_table, joins_freed= FALSE;
556
  bool changed;
557
  bool was_insert_delayed= (table_list->lock_type ==  TL_WRITE_DELAYED);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
558 559 560 561
  uint value_count;
  ulong counter = 1;
  ulonglong id;
  COPY_INFO info;
562
  TABLE *table= 0;
monty@tik.mysql.fi's avatar
monty@tik.mysql.fi committed
563
  List_iterator_fast<List_item> its(values_list);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
564
  List_item *values;
565
  Name_resolution_context *context;
566
  Name_resolution_context_state ctx_state;
567
#ifndef EMBEDDED_LIBRARY
568
  char *query= thd->query;
569 570 571 572 573 574
  /*
    log_on is about delayed inserts only.
    By default, both logs are enabled (this won't cause problems if the server
    runs without --log-update or --log-bin).
  */
  bool log_on= ((thd->options & OPTION_BIN_LOG) ||
575
                (!(thd->security_ctx->master_access & SUPER_ACL)));
kostja@vajra.(none)'s avatar
kostja@vajra.(none) committed
576
#endif
577
  thr_lock_type lock_type;
monty@mysql.com's avatar
monty@mysql.com committed
578
  Item *unused_conds= 0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
579 580
  DBUG_ENTER("mysql_insert");

581
  /*
582 583 584 585 586 587 588 589 590 591 592 593
    Upgrade lock type if the requested lock is incompatible with
    the current connection mode or table operation.
  */
  upgrade_lock_type(thd, &table_list->lock_type, duplic,
                    values_list.elements > 1);

  /*
    We can't write-delayed into a table locked with LOCK TABLES:
    this will lead to a deadlock, since the delayed thread will
    never be able to get a lock on the table. QQQ: why not
    upgrade the lock here instead?
  */
594
  if (table_list->lock_type == TL_WRITE_DELAYED && thd->locked_tables &&
595
      find_locked_table(thd, table_list->db, table_list->table_name))
596
  {
597 598 599
    my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
             table_list->table_name);
    DBUG_RETURN(TRUE);
600
  }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
601

602
  if (table_list->lock_type == TL_WRITE_DELAYED)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
603
  {
604 605
    if (open_and_lock_for_insert_delayed(thd, table_list))
      DBUG_RETURN(TRUE);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
606 607
  }
  else
608 609 610 611
  {
    if (open_and_lock_tables(thd, table_list))
      DBUG_RETURN(TRUE);
  }
612
  lock_type= table_list->lock_type;
613

bk@work.mysql.com's avatar
bk@work.mysql.com committed
614
  thd->proc_info="init";
615
  thd->used_tables=0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
616
  values= its++;
617
  value_count= values->elements;
618

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
619
  if (mysql_prepare_insert(thd, table_list, table, fields, values,
monty@mysql.com's avatar
monty@mysql.com committed
620
			   update_fields, update_values, duplic, &unused_conds,
621
                           FALSE,
622 623
                           (fields.elements || !value_count ||
                            table_list->view != 0),
624 625 626
                           !ignore && (thd->variables.sql_mode &
                                       (MODE_STRICT_TRANS_TABLES |
                                        MODE_STRICT_ALL_TABLES))))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
627
    goto abort;
628

629 630 631
  /* mysql_prepare_insert set table_list->table if it was not set */
  table= table_list->table;

632
  context= &thd->lex->select_lex.context;
633 634 635 636 637 638 639 640 641
  /*
    These three asserts test the hypothesis that the resetting of the name
    resolution context below is not necessary at all since the list of local
    tables for INSERT always consists of one table.
  */
  DBUG_ASSERT(!table_list->next_local);
  DBUG_ASSERT(!context->table_list->next_local);
  DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);

642
  /* Save the state of the current name resolution context. */
643
  ctx_state.save_state(context, table_list);
644 645 646 647 648

  /*
    Perform name resolution only in the first table - 'table_list',
    which is the table that is inserted into.
  */
649
  table_list->next_local= 0;
650 651
  context->resolve_in_table_list_only(table_list);

652
  while ((values= its++))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
653 654 655 656
  {
    counter++;
    if (values->elements != value_count)
    {
657
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
658 659
      goto abort;
    }
660
    if (setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
661 662 663
      goto abort;
  }
  its.rewind ();
664 665
 
  /* Restore the current context. */
666
  ctx_state.restore_state(context, table_list);
667

bk@work.mysql.com's avatar
bk@work.mysql.com committed
668
  /*
669
    Fill in the given fields and dump it to the table file
bk@work.mysql.com's avatar
bk@work.mysql.com committed
670
  */
671
  bzero((char*) &info,sizeof(info));
672
  info.ignore= ignore;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
673
  info.handle_duplicates=duplic;
674 675
  info.update_fields= &update_fields;
  info.update_values= &update_values;
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
676
  info.view= (table_list->view ? table_list : 0);
677

678 679 680
  /*
    Count warnings for all inserts.
    For single line insert, generate an error if try to set a NOT NULL field
681
    to NULL.
682
  */
683
  thd->count_cuted_fields= ((values_list.elements == 1 &&
monty@mysql.com's avatar
monty@mysql.com committed
684
                             !ignore) ?
685 686
			    CHECK_FIELD_ERROR_FOR_NULL :
			    CHECK_FIELD_WARN);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
687 688 689
  thd->cuted_fields = 0L;
  table->next_number_field=table->found_next_number_field;

690 691 692 693 694 695 696 697
#ifdef HAVE_REPLICATION
  if (thd->slave_thread &&
      (info.handle_duplicates == DUP_UPDATE) &&
      (table->next_number_field != NULL) &&
      rpl_master_has_bug(&active_mi->rli, 24432))
    goto abort;
#endif

bk@work.mysql.com's avatar
bk@work.mysql.com committed
698 699
  error=0;
  thd->proc_info="update";
700
  if (duplic != DUP_ERROR || ignore)
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
701
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
702 703 704
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
antony@ppcg5.local's avatar
antony@ppcg5.local committed
705 706
  if (duplic == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
707 708 709 710
  /*
    Let's *try* to start bulk inserts. They won't necessarily be
    started, as values_list.elements should be greater than
    some - handler dependent - threshold.
    We should not start bulk inserts if this statement uses
    functions or invokes triggers since they may access
    the same table and therefore should not see its
    inconsistent state created by this optimization.
    So we call start_bulk_insert to perform necessary checks on
    values_list.elements, and - if nothing else - to initialize
    the code to make the call of end_bulk_insert() below safe.
  */
719
  if (lock_type != TL_WRITE_DELAYED && !thd->prelocked_mode)
720
    table->file->ha_start_bulk_insert(values_list.elements);
721

722 723 724
  thd->abort_on_warning= (!ignore && (thd->variables.sql_mode &
                                       (MODE_STRICT_TRANS_TABLES |
                                        MODE_STRICT_ALL_TABLES)));
725

726 727
  prepare_triggers_for_insert_stmt(table);

728

729 730 731 732
  if (table_list->prepare_where(thd, 0, TRUE) ||
      table_list->prepare_check_option(thd))
    error= 1;

733
  while ((values= its++))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
734 735 736
  {
    if (fields.elements || !value_count)
    {
737
      restore_record(table,s->default_values);	// Get empty record
738 739 740
      if (fill_record_n_invoke_before_triggers(thd, fields, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
741
      {
742
	if (values_list.elements != 1 && ! thd->is_error())
bk@work.mysql.com's avatar
bk@work.mysql.com committed
743 744 745 746
	{
	  info.records++;
	  continue;
	}
747 748 749 750 751
	/*
	  TODO: set thd->abort_on_warning if values_list.elements == 1
	  and check that all items return warning in case of problem with
	  storing field.
        */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
752 753 754 755 756 757
	error=1;
	break;
      }
    }
    else
    {
758
      if (thd->used_tables)			// Column used in values()
759
	restore_record(table,s->default_values);	// Get empty record
760
      else
761 762 763 764 765 766 767 768 769 770 771
      {
        /*
          Fix delete marker. No need to restore rest of record since it will
          be overwritten by fill_record() anyway (and fill_record() does not
          use default values in this case).
        */
	table->record[0][0]= table->s->default_values[0];
      }
      if (fill_record_n_invoke_before_triggers(thd, table->field, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
772
      {
773
	if (values_list.elements != 1 && ! thd->is_error())
bk@work.mysql.com's avatar
bk@work.mysql.com committed
774 775 776 777 778 779 780 781
	{
	  info.records++;
	  continue;
	}
	error=1;
	break;
      }
    }
782

783 784 785
    if ((res= table_list->view_check_option(thd,
					    (values_list.elements == 1 ?
					     0 :
786
					     ignore))) ==
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
787 788 789
        VIEW_CHECK_SKIP)
      continue;
    else if (res == VIEW_CHECK_ERROR)
790
    {
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
791 792
      error= 1;
      break;
793
    }
794
#ifndef EMBEDDED_LIBRARY
795
    if (lock_type == TL_WRITE_DELAYED)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
796
    {
797 798
      LEX_STRING const st_query = { query, thd->query_length };
      error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
799 800 801
      query=0;
    }
    else
802
#endif
803
      error=write_record(thd, table ,&info);
804 805
    if (error)
      break;
806
    thd->row_count++;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
807
  }
808

809 810 811
  free_underlaid_joins(thd, &thd->lex->select_lex);
  joins_freed= TRUE;

812 813 814 815
  /*
    Now all rows are inserted.  Time to update logs and send response to
    user
  */
816
#ifndef EMBEDDED_LIBRARY
bk@work.mysql.com's avatar
bk@work.mysql.com committed
817 818
  if (lock_type == TL_WRITE_DELAYED)
  {
819 820 821 822 823
    if (!error)
    {
      info.copied=values_list.elements;
      end_delayed_insert(thd);
    }
824
    query_cache_invalidate3(thd, table_list, 1);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
825 826
  }
  else
827
#endif
bk@work.mysql.com's avatar
bk@work.mysql.com committed
828
  {
829 830 831 832 833
    /*
      Do not do this release if this is a delayed insert, it would steal
      auto_inc values from the delayed_insert thread as they share TABLE.
    */
    table->file->ha_release_auto_increment();
834
    if (!thd->prelocked_mode && table->file->ha_end_bulk_insert() && !error)
835
    {
serg@serg.mylan's avatar
serg@serg.mylan committed
836 837
      table->file->print_error(my_errno,MYF(0));
      error=1;
838
    }
839
    transactional_table= table->file->has_transactions();
heikki@hundin.mysql.fi's avatar
heikki@hundin.mysql.fi committed
840

841
    if ((changed= (info.copied || info.deleted || info.updated)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
842
    {
843 844 845 846 847
      /*
        Invalidate the table in the query cache if something changed.
        For the transactional algorithm to work the invalidation must be
        before binlog writing and ha_autocommit_or_rollback
      */
848
      query_cache_invalidate3(thd, table_list, 1);
849 850
    }
    if (changed && error <= 0 || thd->transaction.stmt.modified_non_trans_table
851
	|| was_insert_delayed)
852 853
    {
      if (mysql_bin_log.is_open())
854
      {
855
	if (error <= 0)
856
        {
857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889
	  /*
	    [Guilhem wrote] Temporary errors may have filled
	    thd->net.last_error/errno.  For example if there has
	    been a disk full error when writing the row, and it was
	    MyISAM, then thd->net.last_error/errno will be set to
	    "disk full"... and the my_pwrite() will wait until free
	    space appears, and so when it finishes then the
	    write_row() was entirely successful
	  */
	  /* todo: consider removing */
	  thd->clear_error();
	}
	/* bug#22725:

	A query which per-row-loop can not be interrupted with
	KILLED, like INSERT, and that does not invoke stored
	routines can be binlogged with neglecting the KILLED error.
        
	If there was no error (error == zero) until after the end of
	inserting loop the KILLED flag that appeared later can be
	disregarded since previously possible invocation of stored
	routines did not result in any error due to the KILLED.  In
	such case the flag is ignored for constructing binlog event.
	*/
	DBUG_ASSERT(thd->killed != THD::KILL_BAD_DATA || error > 0);
	if (thd->binlog_query(THD::ROW_QUERY_TYPE,
			      thd->query, thd->query_length,
			      transactional_table, FALSE,
			      (error>0) ? thd->killed : THD::NOT_KILLED) &&
	    transactional_table)
        {
	  error=1;
	}
890
      }
891
      if (thd->transaction.stmt.modified_non_trans_table)
892
	thd->transaction.all.modified_non_trans_table= TRUE;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
893
    }
894 895
    DBUG_ASSERT(transactional_table || !changed || 
                thd->transaction.stmt.modified_non_trans_table);
896
    if (transactional_table)
897
      error=ha_autocommit_or_rollback(thd,error);
898
    
bk@work.mysql.com's avatar
bk@work.mysql.com committed
899 900 901
    if (thd->lock)
    {
      mysql_unlock_tables(thd, thd->lock);
902 903 904 905 906 907 908 909 910 911
      /*
        Invalidate the table in the query cache if something changed
        after unlocking when changes become visible.
        TODO: this is workaround. right way will be move invalidating in
        the unlock procedure.
      */
      if (lock_type ==  TL_WRITE_CONCURRENT_INSERT && changed)
      {
        query_cache_invalidate3(thd, table_list, 1);
      }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
912 913 914 915
      thd->lock=0;
    }
  }
  thd->proc_info="end";
916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931
  /*
    We'll report to the client this id:
    - if the table contains an autoincrement column and we successfully
    inserted an autogenerated value, the autogenerated value.
    - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
    called, X.
    - if the table contains an autoincrement column, and some rows were
    inserted, the id of the last "inserted" row (if IGNORE, that value may not
    have been really inserted but ignored).
  */
  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
    thd->first_successful_insert_id_in_cur_stmt :
    (thd->arg_of_last_insert_id_function ?
     thd->first_successful_insert_id_in_prev_stmt :
     ((table->next_number_field && info.copied) ?
     table->next_number_field->val_int() : 0));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
932
  table->next_number_field=0;
933
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
934
  table->auto_increment_field_not_null= FALSE;
935
  if (duplic != DUP_ERROR || ignore)
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
936
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
937 938 939
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
940

bk@work.mysql.com's avatar
bk@work.mysql.com committed
941 942 943 944
  if (error)
    goto abort;
  if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) ||
				    !thd->cuted_fields))
945
  {
946 947 948
    thd->row_count_func= info.copied + info.deleted +
                         ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
                          info.touched : info.updated);
949
    send_ok(thd, (ulong) thd->row_count_func, id);
950
  }
951 952
  else
  {
bk@work.mysql.com's avatar
bk@work.mysql.com committed
953
    char buff[160];
954 955
    ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
                     info.touched : info.updated);
956
    if (ignore)
957 958 959
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
	      (lock_type == TL_WRITE_DELAYED) ? (ulong) 0 :
	      (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
960
    else
961
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
962 963
	      (ulong) (info.deleted + updated), (ulong) thd->cuted_fields);
    thd->row_count_func= info.copied + info.deleted + updated;
964
    ::send_ok(thd, (ulong) thd->row_count_func, id, buff);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
965
  }
966
  thd->abort_on_warning= 0;
967
  DBUG_RETURN(FALSE);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
968 969

abort:
970
#ifndef EMBEDDED_LIBRARY
bk@work.mysql.com's avatar
bk@work.mysql.com committed
971 972
  if (lock_type == TL_WRITE_DELAYED)
    end_delayed_insert(thd);
973
#endif
974
  if (table != NULL)
975
    table->file->ha_release_auto_increment();
976 977
  if (!joins_freed)
    free_underlaid_joins(thd, &thd->lex->select_lex);
978
  thd->abort_on_warning= 0;
979
  DBUG_RETURN(TRUE);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
980 981 982
}


bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
983 984 985 986 987
/*
  Additional check for insertability for VIEW

  SYNOPSIS
    check_view_insertability()
988
    thd     - thread handler
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
989 990
    view    - reference on VIEW

991 992 993 994 995 996
  IMPLEMENTATION
    A view is insertable if the folloings are true:
    - All columns in the view are columns from a table
    - All not used columns in table have a default values
    - All field in view are unique (not referring to the same column)

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
997 998
  RETURN
    FALSE - OK
999 1000 1001
      view->contain_auto_increment is 1 if and only if the view contains an
      auto_increment field

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1002 1003 1004
    TRUE  - can't be used for insert
*/

1005
static bool check_view_insertability(THD * thd, TABLE_LIST *view)
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1006
{
1007
  uint num= view->view->select_lex.item_list.elements;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1008
  TABLE *table= view->table;
1009 1010 1011
  Field_translator *trans_start= view->field_translation,
		   *trans_end= trans_start + num;
  Field_translator *trans;
monty@mysql.com's avatar
monty@mysql.com committed
1012
  uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
1013
  uint32 *used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size);
1014
  MY_BITMAP used_fields;
evgen@sunlight.local's avatar
evgen@sunlight.local committed
1015
  enum_mark_columns save_mark_used_columns= thd->mark_used_columns;
1016 1017
  DBUG_ENTER("check_key_in_view");

1018 1019 1020
  if (!used_fields_buff)
    DBUG_RETURN(TRUE);  // EOM

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1021 1022
  DBUG_ASSERT(view->table != 0 && view->field_translation != 0);

1023
  VOID(bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0));
1024 1025
  bitmap_clear_all(&used_fields);

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1026
  view->contain_auto_increment= 0;
1027 1028 1029 1030
  /* 
    we must not set query_id for fields as they're not 
    really used in this context
  */
evgen@sunlight.local's avatar
evgen@sunlight.local committed
1031
  thd->mark_used_columns= MARK_COLUMNS_NONE;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1032
  /* check simplicity and prepare unique test of view */
1033
  for (trans= trans_start; trans != trans_end; trans++)
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1034
  {
1035
    if (!trans->item->fixed && trans->item->fix_fields(thd, &trans->item))
1036
    {
evgen@sunlight.local's avatar
evgen@sunlight.local committed
1037
      thd->mark_used_columns= save_mark_used_columns;
1038 1039
      DBUG_RETURN(TRUE);
    }
1040
    Item_field *field;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1041
    /* simple SELECT list entry (field without expression) */
bell@sanja.is.com.ua's avatar
merge  
bell@sanja.is.com.ua committed
1042
    if (!(field= trans->item->filed_for_view_update()))
1043
    {
evgen@sunlight.local's avatar
evgen@sunlight.local committed
1044
      thd->mark_used_columns= save_mark_used_columns;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1045
      DBUG_RETURN(TRUE);
1046
    }
1047
    if (field->field->unireg_check == Field::NEXT_NUMBER)
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1048 1049
      view->contain_auto_increment= 1;
    /* prepare unique test */
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1050 1051 1052 1053 1054
    /*
      remove collation (or other transparent for update function) if we have
      it
    */
    trans->item= field;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1055
  }
evgen@sunlight.local's avatar
evgen@sunlight.local committed
1056
  thd->mark_used_columns= save_mark_used_columns;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1057
  /* unique test */
1058
  for (trans= trans_start; trans != trans_end; trans++)
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1059
  {
1060
    /* Thanks to test above, we know that all columns are of type Item_field */
1061
    Item_field *field= (Item_field *)trans->item;
1062 1063 1064
    /* check fields belong to table in which we are inserting */
    if (field->field->table == table &&
        bitmap_fast_test_and_set(&used_fields, field->field->field_index))
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1065 1066 1067 1068 1069 1070 1071
      DBUG_RETURN(TRUE);
  }

  DBUG_RETURN(FALSE);
}


bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1072
/*
1073
  Check if table can be updated
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1074 1075

  SYNOPSIS
1076 1077
     mysql_prepare_insert_check_table()
     thd		Thread handle
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1078
     table_list		Table list
1079 1080
     fields		List of fields to be updated
     where		Pointer to where clause
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1081
     select_insert      Check is making for SELECT ... INSERT
1082 1083

   RETURN
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1084 1085
     FALSE ok
     TRUE  ERROR
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1086
*/
1087

bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1088
static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
1089
                                             List<Item> &fields,
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1090
                                             bool select_insert)
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1091
{
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1092
  bool insert_into_view= (table_list->view != 0);
1093
  DBUG_ENTER("mysql_prepare_insert_check_table");
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1094

1095 1096 1097 1098 1099 1100 1101
  /*
     first table in list is the one we'll INSERT into, requires INSERT_ACL.
     all others require SELECT_ACL only. the ACL requirement below is for
     new leaves only anyway (view-constituents), so check for SELECT rather
     than INSERT.
  */

1102 1103
  if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
                                    &thd->lex->select_lex.top_join_list,
1104
                                    table_list,
1105
                                    &thd->lex->select_lex.leaf_tables,
1106
                                    select_insert, INSERT_ACL, SELECT_ACL))
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1107
    DBUG_RETURN(TRUE);
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1108 1109 1110 1111

  if (insert_into_view && !fields.elements)
  {
    thd->lex->empty_field_list_on_rset= 1;
1112 1113 1114 1115
    if (!table_list->table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1116
      DBUG_RETURN(TRUE);
1117
    }
1118
    DBUG_RETURN(insert_view_fields(thd, &fields, table_list));
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1119 1120
  }

bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1121
  DBUG_RETURN(FALSE);
1122 1123 1124 1125 1126 1127 1128 1129 1130 1131
}


/*
  Prepare items in INSERT statement

  SYNOPSIS
    mysql_prepare_insert()
    thd			Thread handler
    table_list	        Global/local table list
monty@mysql.com's avatar
monty@mysql.com committed
1132 1133
    table		Table to insert into (can be NULL if table should
			be taken from table_list->table)    
monty@mysql.com's avatar
monty@mysql.com committed
1134 1135
    where		Where clause (for insert ... select)
    select_insert	TRUE if INSERT ... SELECT statement
1136 1137 1138 1139
    check_fields        TRUE if need to check that all INSERT fields are 
                        given values.
    abort_on_warning    whether to report if some INSERT field is not 
                        assigned as an error (TRUE) or as a warning (FALSE).
1140

monty@mishka.local's avatar
monty@mishka.local committed
1141 1142 1143 1144 1145
  TODO (in far future)
    In cases of:
    INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
    ON DUPLICATE KEY ...
    we should be able to refer to sum1 in the ON DUPLICATE KEY part
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1146

1147 1148 1149
  WARNING
    You MUST set table->insert_values to 0 after calling this function
    before releasing the table object.
monty@mysql.com's avatar
monty@mysql.com committed
1150
  
1151
  RETURN VALUE
1152 1153
    FALSE OK
    TRUE  error
1154 1155
*/

monty@mysql.com's avatar
monty@mysql.com committed
1156
bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
monty@mysql.com's avatar
monty@mysql.com committed
1157
                          TABLE *table, List<Item> &fields, List_item *values,
monty@mishka.local's avatar
monty@mishka.local committed
1158
                          List<Item> &update_fields, List<Item> &update_values,
monty@mysql.com's avatar
monty@mysql.com committed
1159
                          enum_duplicates duplic,
1160 1161
                          COND **where, bool select_insert,
                          bool check_fields, bool abort_on_warning)
1162
{
monty@mysql.com's avatar
monty@mysql.com committed
1163
  SELECT_LEX *select_lex= &thd->lex->select_lex;
1164
  Name_resolution_context *context= &select_lex->context;
1165
  Name_resolution_context_state ctx_state;
1166
  bool insert_into_view= (table_list->view != 0);
monty@mysql.com's avatar
monty@mysql.com committed
1167
  bool res= 0;
1168
  table_map map= 0;
1169
  DBUG_ENTER("mysql_prepare_insert");
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1170 1171 1172
  DBUG_PRINT("enter", ("table_list 0x%lx, table 0x%lx, view %d",
		       (ulong)table_list, (ulong)table,
		       (int)insert_into_view));
1173 1174
  /* INSERT should have a SELECT or VALUES clause */
  DBUG_ASSERT (!select_insert || !values);
monty@mishka.local's avatar
monty@mishka.local committed
1175

1176 1177 1178 1179 1180 1181 1182
  /*
    For subqueries in VALUES() we should not see the table in which we are
    inserting (for INSERT ... SELECT this is done by changing table_list,
    because INSERT ... SELECT share SELECT_LEX it with SELECT.
  */
  if (!select_insert)
  {
monty@mysql.com's avatar
monty@mysql.com committed
1183
    for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195
         un;
         un= un->next_unit())
    {
      for (SELECT_LEX *sl= un->first_select();
           sl;
           sl= sl->next_select())
      {
        sl->context.outer_context= 0;
      }
    }
  }

monty@mishka.local's avatar
monty@mishka.local committed
1196
  if (duplic == DUP_UPDATE)
1197 1198
  {
    /* it should be allocated before Item::fix_fields() */
monty@mishka.local's avatar
monty@mishka.local committed
1199
    if (table_list->set_insert_values(thd->mem_root))
monty@mysql.com's avatar
monty@mysql.com committed
1200
      DBUG_RETURN(TRUE);
1201
  }
monty@mishka.local's avatar
monty@mishka.local committed
1202

1203
  if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert))
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1204
    DBUG_RETURN(TRUE);
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1205

1206 1207

  /* Prepare the fields in the statement. */
1208
  if (values)
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1209
  {
1210 1211 1212 1213 1214 1215
    /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
    DBUG_ASSERT (!select_lex->group_list.elements);

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);

1216
    /*
1217 1218 1219 1220 1221 1222
      Perform name resolution only in the first table - 'table_list',
      which is the table that is inserted into.
     */
    table_list->next_local= 0;
    context->resolve_in_table_list_only(table_list);

1223 1224
    res= check_insert_fields(thd, context->table_list, fields, *values,
                             !insert_into_view, &map) ||
1225
      setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0);
1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238

    if (!res && check_fields)
    {
      bool saved_abort_on_warning= thd->abort_on_warning;
      thd->abort_on_warning= abort_on_warning;
      res= check_that_all_fields_are_given_values(thd, 
                                                  table ? table : 
                                                  context->table_list->table,
                                                  context->table_list);
      thd->abort_on_warning= saved_abort_on_warning;
    }

    if (!res && duplic == DUP_UPDATE)
monty@mysql.com's avatar
monty@mysql.com committed
1239
    {
1240 1241 1242
      select_lex->no_wrap_view_item= TRUE;
      res= check_update_fields(thd, context->table_list, update_fields, &map);
      select_lex->no_wrap_view_item= FALSE;
monty@mysql.com's avatar
monty@mysql.com committed
1243
    }
1244 1245 1246 1247

    /* Restore the current context. */
    ctx_state.restore_state(context, table_list);

monty@mysql.com's avatar
monty@mysql.com committed
1248
    if (!res)
1249
      res= setup_fields(thd, 0, update_values, MARK_COLUMNS_READ, 0, 0);
monty@mysql.com's avatar
monty@mysql.com committed
1250
  }
1251

monty@mysql.com's avatar
monty@mysql.com committed
1252 1253
  if (res)
    DBUG_RETURN(res);
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1254

bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1255 1256 1257
  if (!table)
    table= table_list->table;

1258
  if (!select_insert)
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1259
  {
1260
    Item *fake_conds= 0;
1261
    TABLE_LIST *duplicate;
1262
    if ((duplicate= unique_table(thd, table_list, table_list->next_global, 1)))
1263
    {
1264
      update_non_unique_table_error(table_list, "INSERT", duplicate);
1265 1266
      DBUG_RETURN(TRUE);
    }
1267
    select_lex->fix_prepare_information(thd, &fake_conds, &fake_conds);
monty@mysql.com's avatar
monty@mysql.com committed
1268
    select_lex->first_execution= 0;
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1269
  }
1270
  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
1271
    table->prepare_for_position();
1272
  DBUG_RETURN(FALSE);
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1273 1274 1275
}


bk@work.mysql.com's avatar
bk@work.mysql.com committed
1276 1277 1278 1279
	/*
	  Check whether any unique key follows key number 'keynr'

	  Returns 1 if no key with a number greater than keynr has the
	  HA_NOSAME flag set, 0 if at least one such key exists.
	*/

static int last_uniq_key(TABLE *table,uint keynr)
{
  for (uint next_key= keynr + 1; next_key < table->s->keys; next_key++)
  {
    if (table->key_info[next_key].flags & HA_NOSAME)
      return 0;                         /* a later unique key exists */
  }
  return 1;
}


/*
1288 1289 1290 1291 1292 1293 1294 1295 1296 1297
  Write a record to table with optional deleting of conflicting records,
  invoke proper triggers if needed.

  SYNOPSIS
     write_record()
      thd   - thread context
      table - table to which record should be written
      info  - COPY_INFO structure describing handling of duplicates
              and which is used for counting number of records inserted
              and deleted.
1298

1299 1300 1301 1302 1303
  NOTE
    Once this record will be written to table after insert trigger will
    be invoked. If instead of inserting new record we will update old one
    then both on update triggers will work instead. Similarly both on
    delete triggers will be invoked if we will delete conflicting records.
1304

1305
    Sets thd->transaction.stmt.modified_non_trans_table to TRUE if table which is updated didn't have
1306 1307 1308 1309 1310
    transactions.

  RETURN VALUE
    0     - success
    non-0 - error
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1311 1312 1313
*/


1314
int write_record(THD *thd, TABLE *table,COPY_INFO *info)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1315
{
1316
  int error, trg_error= 0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1317
  char *key=0;
1318
  MY_BITMAP *save_read_set, *save_write_set;
1319 1320
  ulonglong prev_insert_id= table->file->next_insert_id;
  ulonglong insert_id_for_cur_row= 0;
1321
  DBUG_ENTER("write_record");
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1322

bk@work.mysql.com's avatar
bk@work.mysql.com committed
1323
  info->records++;
1324 1325
  save_read_set=  table->read_set;
  save_write_set= table->write_set;
1326

1327 1328
  if (info->handle_duplicates == DUP_REPLACE ||
      info->handle_duplicates == DUP_UPDATE)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1329
  {
1330
    while ((error=table->file->ha_write_row(table->record[0])))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1331
    {
1332
      uint key_nr;
1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343
      /*
        If we do more than one iteration of this loop, from the second one the
        row will have an explicit value in the autoinc field, which was set at
        the first call of handler::update_auto_increment(). So we must save
        the autogenerated value to avoid thd->insert_id_for_cur_row to become
        0.
      */
      if (table->file->insert_id_for_cur_row > 0)
        insert_id_for_cur_row= table->file->insert_id_for_cur_row;
      else
        table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1344
      bool is_duplicate_key_error;
1345
      if (table->file->is_fatal_error(error, HA_CHECK_DUP))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1346
	goto err;
1347
      is_duplicate_key_error= table->file->is_fatal_error(error, 0);
1348 1349
      if (!is_duplicate_key_error)
      {
1350 1351 1352 1353 1354
        /*
          We come here when we had an ignorable error which is not a duplicate
          key error. In this we ignore error if ignore flag is set, otherwise
          report error as usual. We will not do any duplicate key processing.
        */
1355
        if (info->ignore)
1356 1357
          goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
        goto err;
1358
      }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1359 1360
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
      {
1361
	error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1362 1363
	goto err;
      }
1364 1365
      /* Read all columns for the row we are going to replace */
      table->use_all_columns();
1366 1367 1368 1369 1370
      /*
	Don't allow REPLACE to replace a row when a auto_increment column
	was used.  This ensures that we don't get a problem when the
	whole range of the key has been used.
      */
1371 1372
      if (info->handle_duplicates == DUP_REPLACE &&
          table->next_number_field &&
1373
          key_nr == table->s->next_number_index &&
1374
	  (insert_id_for_cur_row > 0))
1375
	goto err;
1376
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1377
      {
1378
	if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1379 1380 1381 1382
	  goto err;
      }
      else
      {
1383
	if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1384 1385 1386 1387
	{
	  error=my_errno;
	  goto err;
	}
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1388

bk@work.mysql.com's avatar
bk@work.mysql.com committed
1389 1390
	if (!key)
	{
1391
	  if (!(key=(char*) my_safe_alloca(table->s->max_unique_length,
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1392 1393 1394 1395 1396 1397
					   MAX_KEY_LENGTH)))
	  {
	    error=ENOMEM;
	    goto err;
	  }
	}
1398
	key_copy((uchar*) key,table->record[0],table->key_info+key_nr,0);
1399 1400 1401
	if ((error=(table->file->index_read_idx_map(table->record[1],key_nr,
                                                    (uchar*) key, HA_WHOLE_KEY,
                                                    HA_READ_KEY_EXACT))))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1402 1403
	  goto err;
      }
1404
      if (info->handle_duplicates == DUP_UPDATE)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1405
      {
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1406
        int res= 0;
monty@mysql.com's avatar
monty@mysql.com committed
1407 1408 1409 1410
        /*
          We don't check for other UNIQUE keys - the first row
          that matches, is updated. If update causes a conflict again,
          an error is returned
1411
        */
1412
	DBUG_ASSERT(table->insert_values != NULL);
1413 1414
        store_record(table,insert_values);
        restore_record(table,record[1]);
monty@mysql.com's avatar
monty@mysql.com committed
1415 1416
        DBUG_ASSERT(info->update_fields->elements ==
                    info->update_values->elements);
1417
        if (fill_record_n_invoke_before_triggers(thd, *info->update_fields,
gshchepa/uchum@gleb.loc's avatar
gshchepa/uchum@gleb.loc committed
1418 1419
                                                 *info->update_values,
                                                 info->ignore,
1420 1421 1422
                                                 table->triggers,
                                                 TRG_EVENT_UPDATE))
          goto before_trg_err;
1423 1424

        /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1425 1426 1427
        if (info->view &&
            (res= info->view->view_check_option(current_thd, info->ignore)) ==
            VIEW_CHECK_SKIP)
1428
          goto ok_or_after_trg_err;
monty@mysql.com's avatar
monty@mysql.com committed
1429
        if (res == VIEW_CHECK_ERROR)
1430
          goto before_trg_err;
1431

1432
        table->file->restore_auto_increment(prev_insert_id);
1433 1434 1435 1436
        if (table->next_number_field)
          table->file->adjust_next_insert_id_after_explicit_value(
            table->next_number_field->val_int());
        info->touched++;
1437 1438
        if ((table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ &&
             !bitmap_is_subset(table->write_set, table->read_set)) ||
1439
            compare_record(table))
1440
        {
1441
          if ((error=table->file->ha_update_row(table->record[1],
1442 1443
                                                table->record[0])) &&
              error != HA_ERR_RECORD_IS_THE_SAME)
1444
          {
1445 1446
            if (info->ignore &&
                !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
1447 1448 1449 1450
            {
              goto ok_or_after_trg_err;
            }
            goto err;
1451
          }
evgen@moonbone.local's avatar
evgen@moonbone.local committed
1452

1453 1454 1455 1456
          if (error != HA_ERR_RECORD_IS_THE_SAME)
            info->updated++;
          else
            error= 0;
evgen@moonbone.local's avatar
evgen@moonbone.local committed
1457 1458 1459 1460 1461 1462 1463 1464
          /*
            If ON DUP KEY UPDATE updates a row instead of inserting one, it's
            like a regular UPDATE statement: it should not affect the value of a
            next SELECT LAST_INSERT_ID() or mysql_insert_id().
            Except if LAST_INSERT_ID(#) was in the INSERT query, which is
            handled separately by THD::arg_of_last_insert_id_function.
          */
          insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
evgen@moonbone.local's avatar
evgen@moonbone.local committed
1465 1466
          trg_error= (table->triggers &&
                      table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
evgen@moonbone.local's avatar
evgen@moonbone.local committed
1467
                                                        TRG_ACTION_AFTER, TRUE));
1468 1469
          info->copied++;
        }
1470

1471 1472 1473 1474 1475
        if (table->next_number_field)
          table->file->adjust_next_insert_id_after_explicit_value(
            table->next_number_field->val_int());
        info->touched++;

1476
        goto ok_or_after_trg_err;
1477 1478 1479
      }
      else /* DUP_REPLACE */
      {
monty@mysql.com's avatar
monty@mysql.com committed
1480 1481 1482 1483 1484
	/*
	  The manual defines the REPLACE semantics that it is either
	  an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
	  InnoDB do not function in the defined way if we allow MySQL
	  to convert the latter operation internally to an UPDATE.
1485 1486
          We also should not perform this conversion if we have 
          timestamp field with ON UPDATE which is different from DEFAULT.
1487 1488 1489 1490 1491 1492
          Another case when conversion should not be performed is when
          we have ON DELETE trigger on table so user may notice that
          we cheat here. Note that it is ok to do such conversion for
          tables which have ON UPDATE but have no ON DELETE triggers,
          we just should not expose this fact to users by invoking
          ON UPDATE triggers.
monty@mysql.com's avatar
monty@mysql.com committed
1493 1494
	*/
	if (last_uniq_key(table,key_nr) &&
1495
	    !table->file->referenced_by_foreign_key() &&
1496
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
1497 1498
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH) &&
            (!table->triggers || !table->triggers->has_delete_triggers()))
1499
        {
1500
          if ((error=table->file->ha_update_row(table->record[1],
1501 1502
					        table->record[0])) &&
              error != HA_ERR_RECORD_IS_THE_SAME)
1503
            goto err;
1504 1505 1506 1507
          if (error != HA_ERR_RECORD_IS_THE_SAME)
            info->deleted++;
          else
            error= 0;
1508
          thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1509 1510 1511 1512 1513
          /*
            Since we pretend that we have done insert we should call
            its after triggers.
          */
          goto after_trg_n_copied_inc;
1514 1515 1516 1517 1518 1519 1520
        }
        else
        {
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_BEFORE, TRUE))
            goto before_trg_err;
1521
          if ((error=table->file->ha_delete_row(table->record[1])))
1522 1523 1524
            goto err;
          info->deleted++;
          if (!table->file->has_transactions())
1525
            thd->transaction.stmt.modified_non_trans_table= TRUE;
1526 1527 1528 1529 1530 1531 1532 1533
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_AFTER, TRUE))
          {
            trg_error= 1;
            goto ok_or_after_trg_err;
          }
          /* Let us attempt do write_row() once more */
1534
        }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1535 1536
      }
    }
1537
    thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1538 1539 1540 1541 1542 1543 1544
    /*
      Restore column maps if they were replaced during a duplicate key
      problem.
    */
    if (table->read_set != save_read_set ||
        table->write_set != save_write_set)
      table->column_bitmaps_set(save_read_set, save_write_set);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1545
  }
1546
  else if ((error=table->file->ha_write_row(table->record[0])))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1547
  {
1548
    if (!info->ignore ||
1549
        table->file->is_fatal_error(error, HA_CHECK_DUP))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1550
      goto err;
1551
    table->file->restore_auto_increment(prev_insert_id);
1552
    goto ok_or_after_trg_err;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1553
  }
1554 1555 1556

after_trg_n_copied_inc:
  info->copied++;
1557
  thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1558 1559 1560
  trg_error= (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
                                                TRG_ACTION_AFTER, TRUE));
1561 1562

ok_or_after_trg_err:
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1563
  if (key)
1564
    my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
1565
  if (!table->file->has_transactions())
1566
    thd->transaction.stmt.modified_non_trans_table= TRUE;
1567
  DBUG_RETURN(trg_error);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1568 1569

err:
1570
  info->last_errno= error;
1571 1572 1573
  /* current_select is NULL if this is a delayed insert */
  if (thd->lex->current_select)
    thd->lex->current_select->no_error= 0;        // Give error
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1574
  table->file->print_error(error,MYF(0));
1575
  
1576
before_trg_err:
1577
  table->file->restore_auto_increment(prev_insert_id);
1578 1579
  if (key)
    my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
1580
  table->column_bitmaps_set(save_read_set, save_write_set);
1581
  DBUG_RETURN(1);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1582 1583 1584 1585
}


/******************************************************************************
1586
  Check that all fields with arn't null_fields are used
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1587 1588
******************************************************************************/

1589 1590
int check_that_all_fields_are_given_values(THD *thd, TABLE *entry,
                                           TABLE_LIST *table_list)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1591
{
1592
  int err= 0;
1593 1594
  MY_BITMAP *write_set= entry->write_set;

bk@work.mysql.com's avatar
bk@work.mysql.com committed
1595 1596
  for (Field **field=entry->field ; *field ; field++)
  {
1597
    if (!bitmap_is_set(write_set, (*field)->field_index) &&
1598
        ((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
1599
        ((*field)->real_type() != MYSQL_TYPE_ENUM))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1600
    {
1601 1602 1603
      bool view= FALSE;
      if (table_list)
      {
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1604
        table_list= table_list->top_table();
kent@mysql.com's avatar
kent@mysql.com committed
1605
        view= test(table_list->view);
1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621
      }
      if (view)
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_VIEW_FIELD,
                            ER(ER_NO_DEFAULT_FOR_VIEW_FIELD),
                            table_list->view_db.str,
                            table_list->view_name.str);
      }
      else
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_FIELD,
                            ER(ER_NO_DEFAULT_FOR_FIELD),
                            (*field)->field_name);
      }
1622
      err= 1;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1623 1624
    }
  }
1625
  return thd->abort_on_warning ? err : 0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1626 1627 1628
}

/*****************************************************************************
1629 1630
  Handling of delayed inserts
  A thread is created for each table that one uses with the DELAYED attribute.
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1631 1632
*****************************************************************************/

1633 1634
#ifndef EMBEDDED_LIBRARY

bk@work.mysql.com's avatar
bk@work.mysql.com committed
1635 1636
class delayed_row :public ilink {
public:
1637
  char *record;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1638 1639
  enum_duplicates dup;
  time_t start_time;
1640 1641
  ulong sql_mode;
  bool auto_increment_field_not_null;
1642 1643 1644
  bool query_start_used, ignore, log_query;
  bool stmt_depends_on_first_successful_insert_id_in_prev_stmt;
  ulonglong first_successful_insert_id_in_prev_stmt;
1645
  ulonglong forced_insert_id;
1646 1647
  ulong auto_increment_increment;
  ulong auto_increment_offset;
1648
  timestamp_auto_set_type timestamp_field_type;
1649
  LEX_STRING query;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1650

1651 1652 1653
  delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
              bool ignore_arg, bool log_query_arg)
    : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
1654
      forced_insert_id(0), query(query_arg)
1655
    {}
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1656 1657
  ~delayed_row()
  {
1658
    x_free(query.str);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1659 1660 1661 1662
    x_free(record);
  }
};

1663
/**
1664
  Delayed_insert - context of a thread responsible for delayed insert
1665 1666 1667 1668
  into one table. When processing delayed inserts, we create an own
  thread for every distinct table. Later on all delayed inserts directed
  into that table are handled by a dedicated thread.
*/
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1669

1670
class Delayed_insert :public ilink {
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1671 1672 1673 1674 1675 1676 1677 1678 1679 1680
  uint locks_in_memory;
public:
  THD thd;
  TABLE *table;
  pthread_mutex_t mutex;
  pthread_cond_t cond,cond_client;
  volatile uint tables_in_use,stacked_inserts;
  volatile bool status,dead;
  COPY_INFO info;
  I_List<delayed_row> rows;
1681
  ulong group_count;
1682
  TABLE_LIST table_list;			// Argument
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1683

1684
  Delayed_insert()
1685
    :locks_in_memory(0),
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1686 1687 1688
     table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
     group_count(0)
  {
1689 1690
    thd.security_ctx->user=thd.security_ctx->priv_user=(char*) delayed_user;
    thd.security_ctx->host=(char*) my_localhost;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1691 1692 1693
    thd.current_tablenr=0;
    thd.version=refresh_version;
    thd.command=COM_DELAYED_INSERT;
1694 1695
    thd.lex->current_select= 0; 		// for my_message_sql
    thd.lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()
1696 1697 1698 1699
    /*
      Statement-based replication of INSERT DELAYED has problems with RAND()
      and user vars, so in mixed mode we go to row-based.
    */
1700
    thd.lex->set_stmt_unsafe();
1701
    thd.set_current_stmt_binlog_row_based_if_mixed();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1702

1703 1704
    bzero((char*) &thd.net, sizeof(thd.net));		// Safety
    bzero((char*) &table_list, sizeof(table_list));	// Safety
1705
    thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
1706
    thd.security_ctx->host_or_ip= "";
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1707
    bzero((char*) &info,sizeof(info));
1708
    pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1709 1710 1711 1712 1713 1714
    pthread_cond_init(&cond,NULL);
    pthread_cond_init(&cond_client,NULL);
    VOID(pthread_mutex_lock(&LOCK_thread_count));
    delayed_insert_threads++;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
  }
1715
  ~Delayed_insert()
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1716
  {
1717
    /* The following is not really needed, but just for safety */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1718 1719 1720 1721 1722 1723
    delayed_row *row;
    while ((row=rows.get()))
      delete row;
    if (table)
      close_thread_tables(&thd);
    VOID(pthread_mutex_lock(&LOCK_thread_count));
1724 1725 1726
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    pthread_cond_destroy(&cond_client);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1727 1728
    thd.unlink();				// Must be unlinked under lock
    x_free(thd.query);
1729
    thd.security_ctx->user= thd.security_ctx->host=0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1730 1731 1732
    thread_count--;
    delayed_insert_threads--;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
1733
    VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762
  }

  /* The following is for checking when we can delete ourselves */
  inline void lock()
  {
    locks_in_memory++;				// Assume LOCK_delay_insert
  }
  void unlock()
  {
    pthread_mutex_lock(&LOCK_delayed_insert);
    if (!--locks_in_memory)
    {
      pthread_mutex_lock(&mutex);
      if (thd.killed && ! stacked_inserts && ! tables_in_use)
      {
	pthread_cond_signal(&cond);
	status=1;
      }
      pthread_mutex_unlock(&mutex);
    }
    pthread_mutex_unlock(&LOCK_delayed_insert);
  }
  inline uint lock_count() { return locks_in_memory; }

  TABLE* get_local_table(THD* client_thd);
  bool handle_inserts(void);
};


1763
/*
  List of the delayed insert handlers. find_handler() and
  kill_delayed_threads() walk it, and delayed_get_table() appends to it,
  each while holding LOCK_delayed_insert.
*/
I_List<Delayed_insert> delayed_threads;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1764 1765


1766 1767 1768 1769
/**
  Return an instance of delayed insert thread that can handle
  inserts into a given table, if it exists. Otherwise return NULL.
*/
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1770

1771
static
1772
Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1773 1774 1775
{
  thd->proc_info="waiting for delay_list";
  pthread_mutex_lock(&LOCK_delayed_insert);	// Protect master list
1776
  I_List_iterator<Delayed_insert> it(delayed_threads);
1777 1778
  Delayed_insert *di;
  while ((di= it++))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1779
  {
1780 1781
    if (!strcmp(table_list->db, di->table_list.db) &&
	!strcmp(table_list->table_name, di->table_list.table_name))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1782
    {
1783
      di->lock();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1784 1785 1786 1787
      break;
    }
  }
  pthread_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
1788
  return di;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1789 1790 1791
}


1792 1793 1794 1795
/**
  Attempt to find or create a delayed insert thread to handle inserts
  into this table.

1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813
  @return In case of success, table_list->table points to a local copy
          of the delayed table or is set to NULL, which indicates a
          request for lock upgrade. In case of failure, value of
          table_list->table is undefined.
  @retval TRUE  - this thread ran out of resources OR
                - a newly created delayed insert thread ran out of
                  resources OR
                - the created thread failed to open and lock the table
                  (e.g. because it does not exist) OR
                - the table opened in the created thread turned out to
                  be a view
  @retval FALSE - table successfully opened OR
                - too many delayed insert threads OR
                - the table has triggers and we have to fall back to
                  a normal INSERT
                Two latter cases indicate a request for lock upgrade.

  XXX: why do we regard INSERT DELAYED into a view as an error and
1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834
  do not simply perform a lock upgrade?

  TODO: The approach with using two mutexes to work with the
  delayed thread list -- LOCK_delayed_insert and
  LOCK_delayed_create -- is redundant, and we only need one of
  them to protect the list.  The reason we have two locks is that
  we do not want to block look-ups in the list while we're waiting
  for the newly created thread to open the delayed table. However,
  this wait itself is redundant -- we always call get_local_table
  later on, and there wait again until the created thread acquires
  a table lock.

  As is redundant the concept of locks_in_memory, since we already
  have another counter with similar semantics - tables_in_use,
  both of them are devoted to counting the number of producers for
  a given consumer (delayed insert thread), only at different
  stages of producer-consumer relationship.

  'dead' and 'status' variables in Delayed_insert are redundant
  too, since there is already 'di->thd.killed' and
  di->stacked_inserts.
1835 1836
*/

1837 1838
static
bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1839 1840
{
  int error;
1841
  Delayed_insert *di;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1842 1843
  DBUG_ENTER("delayed_get_table");

1844 1845
  /* Must be set in the parser */
  DBUG_ASSERT(table_list->db);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1846

1847
  /* Find the thread which handles this table. */
1848
  if (!(di= find_handler(thd, table_list)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1849
  {
1850 1851 1852 1853
    /*
      No match. Create a new thread to handle the table, but
      no more than max_insert_delayed_threads.
    */
1854
    if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
1855
      DBUG_RETURN(0);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1856 1857
    thd->proc_info="Creating delayed handler";
    pthread_mutex_lock(&LOCK_delayed_create);
1858 1859 1860 1861
    /*
      The first search above was done without LOCK_delayed_create.
      Another thread might have created the handler in between. Search again.
    */
1862
    if (! (di= find_handler(thd, table_list)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1863
    {
1864
      if (!(di= new Delayed_insert()))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1865
      {
1866
	my_error(ER_OUTOFMEMORY,MYF(0),sizeof(Delayed_insert));
1867 1868
        thd->fatal_error();
        goto end_create;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1869
      }
1870 1871 1872
      pthread_mutex_lock(&LOCK_thread_count);
      thread_count++;
      pthread_mutex_unlock(&LOCK_thread_count);
1873 1874 1875
      di->thd.set_db(table_list->db, strlen(table_list->db));
      di->thd.query= my_strdup(table_list->table_name, MYF(MY_WME));
      if (di->thd.db == NULL || di->thd.query == NULL)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1876
      {
1877
        /* The error is reported */
1878
	delete di;
1879 1880
        thd->fatal_error();
        goto end_create;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1881
      }
1882
      di->table_list= *table_list;			// Needed to open table
1883
      /* Replace volatile strings with local copies */
1884 1885 1886 1887 1888 1889
      di->table_list.alias= di->table_list.table_name= di->thd.query;
      di->table_list.db= di->thd.db;
      di->lock();
      pthread_mutex_lock(&di->mutex);
      if ((error= pthread_create(&di->thd.real_id, &connection_attrib,
                                 handle_delayed_insert, (void*) di)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1890 1891 1892 1893
      {
	DBUG_PRINT("error",
		   ("Can't create thread to handle delayed insert (error %d)",
		    error));
1894 1895 1896
	pthread_mutex_unlock(&di->mutex);
	di->unlock();
	delete di;
1897
	my_error(ER_CANT_CREATE_THREAD, MYF(0), error);
1898 1899
        thd->fatal_error();
        goto end_create;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1900 1901 1902 1903
      }

      /* Wait until table is open */
      thd->proc_info="waiting for handler open";
1904
      while (!di->thd.killed && !di->table && !thd->killed)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1905
      {
1906
	pthread_cond_wait(&di->cond_client, &di->mutex);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1907
      }
1908
      pthread_mutex_unlock(&di->mutex);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1909
      thd->proc_info="got old table";
1910
      if (di->thd.killed)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1911
      {
1912
        if (di->thd.is_error())
1913
        {
1914 1915 1916 1917 1918 1919
          /*
            Copy the error message. Note that we don't treat fatal
            errors in the delayed thread as fatal errors in the
            main thread. Use of my_message will enable stored
            procedures continue handlers.
          */
1920
          my_message(di->thd.main_da.sql_errno(), di->thd.main_da.message(),
1921
                     MYF(0));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1922
	}
1923
	di->unlock();
1924
        goto end_create;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1925 1926 1927
      }
      if (thd->killed)
      {
1928
	di->unlock();
1929
	goto end_create;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1930
      }
1931
      pthread_mutex_lock(&LOCK_delayed_insert);
1932
      delayed_threads.append(di);
1933
      pthread_mutex_unlock(&LOCK_delayed_insert);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1934 1935 1936 1937
    }
    pthread_mutex_unlock(&LOCK_delayed_create);
  }

1938 1939 1940
  pthread_mutex_lock(&di->mutex);
  table_list->table= di->get_local_table(thd);
  pthread_mutex_unlock(&di->mutex);
1941 1942
  if (table_list->table)
  {
1943
    DBUG_ASSERT(! thd->is_error());
1944
    thd->di= di;
1945
  }
1946
  /* Unlock the delayed insert object after its last access. */
1947
  di->unlock();
1948
  DBUG_RETURN((table_list->table == NULL));
1949

1950
end_create:
1951
  pthread_mutex_unlock(&LOCK_delayed_create);
1952
  DBUG_RETURN(thd->is_error());
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1953 1954 1955
}


1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966
/**
  As we can't let many client threads modify the same TABLE
  structure of the dedicated delayed insert thread, we create an
  own structure for each client thread. This includes a row
  buffer to save the column values and new fields that point to
  the new row buffer. The memory is allocated in the client
  thread and is freed automatically.

  @pre This function is called from the client thread.  Delayed
       insert thread mutex must be acquired before invoking this
       function.
1967 1968 1969

  @return Not-NULL table object on success. NULL in case of an error,
                    which is set in client_thd.
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1970 1971
*/

1972
TABLE *Delayed_insert::get_local_table(THD* client_thd)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1973 1974
{
  my_ptrdiff_t adjust_ptrs;
1975
  Field **field,**org_field, *found_next_number_field;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1976
  TABLE *copy;
1977
  TABLE_SHARE *share;
1978
  uchar *bitmap;
1979
  DBUG_ENTER("Delayed_insert::get_local_table");
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996

  /* First request insert thread to get a lock */
  status=1;
  tables_in_use++;
  if (!thd.lock)				// Table is not locked
  {
    client_thd->proc_info="waiting for handler lock";
    pthread_cond_signal(&cond);			// Tell handler to lock table
    while (!dead && !thd.lock && ! client_thd->killed)
    {
      pthread_cond_wait(&cond_client,&mutex);
    }
    client_thd->proc_info="got handler lock";
    if (client_thd->killed)
      goto error;
    if (dead)
    {
1997
      my_message(thd.main_da.sql_errno(), thd.main_da.message(), MYF(0));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1998 1999 2000
      goto error;
    }
  }
2001
  share= table->s;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2002

2003 2004 2005 2006 2007 2008 2009
  /*
    Allocate memory for the TABLE object, the field pointers array, and
    one record buffer of reclength size. Normally a table has three
    record buffers of rec_buff_length size, which includes alignment
    bytes. Since the table copy is used for creating one record only,
    the other record buffers and alignment are unnecessary.
  */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2010
  client_thd->proc_info="allocating local table";
2011
  copy= (TABLE*) client_thd->alloc(sizeof(*copy)+
2012
				   (share->fields+1)*sizeof(Field**)+
2013 2014
				   share->reclength +
                                   share->column_bitmap_size*2);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2015 2016 2017
  if (!copy)
    goto error;

2018
  /* Copy the TABLE object. */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2019
  *copy= *table;
2020
  /* We don't need to change the file handler here */
2021 2022
  /* Assign the pointers for the field pointers array and the record. */
  field= copy->field= (Field**) (copy + 1);
2023
  bitmap= (uchar*) (field + share->fields + 1);
2024 2025
  copy->record[0]= (bitmap + share->column_bitmap_size * 2);
  memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength);
2026 2027 2028 2029 2030 2031 2032 2033 2034 2035
  /*
    Make a copy of all fields.
    The copied fields need to point into the copied record. This is done
    by copying the field objects with their old pointer values and then
    "move" the pointers by the distance between the original and copied
    records. That way we preserve the relative positions in the records.
  */
  adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
  found_next_number_field= table->found_next_number_field;
  for (org_field= table->field; *org_field; org_field++, field++)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2036
  {
2037
    if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1)))
2038
      goto error;
2039
    (*field)->orig_table= copy;			// Remove connection
2040
    (*field)->move_field_offset(adjust_ptrs);	// Point at copy->record[0]
2041 2042
    if (*org_field == found_next_number_field)
      (*field)->table->found_next_number_field= *field;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2043 2044 2045 2046 2047 2048 2049 2050
  }
  *field=0;

  /* Adjust timestamp */
  if (table->timestamp_field)
  {
    /* Restore offset as this may have been reset in handle_inserts */
    copy->timestamp_field=
2051
      (Field_timestamp*) copy->field[share->timestamp_field_offset];
2052
    copy->timestamp_field->unireg_check= table->timestamp_field->unireg_check;
2053
    copy->timestamp_field_type= copy->timestamp_field->get_auto_set_type();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2054 2055
  }

2056 2057
  /* Adjust in_use for pointing to client thread */
  copy->in_use= client_thd;
2058 2059 2060 2061

  /* Adjust lock_count. This table object is not part of a lock. */
  copy->lock_count= 0;

2062 2063 2064 2065 2066 2067 2068 2069 2070
  /* Adjust bitmaps */
  copy->def_read_set.bitmap= (my_bitmap_map*) bitmap;
  copy->def_write_set.bitmap= ((my_bitmap_map*)
                               (bitmap + share->column_bitmap_size));
  copy->tmp_set.bitmap= 0;                      // To catch errors
  bzero((char*) bitmap, share->column_bitmap_size*2);
  copy->read_set=  &copy->def_read_set;
  copy->write_set= &copy->def_write_set;

2071
  DBUG_RETURN(copy);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2072 2073 2074 2075 2076 2077

  /* Got fatal error */
 error:
  tables_in_use--;
  status=1;
  pthread_cond_signal(&cond);			// Inform thread about abort
2078
  DBUG_RETURN(0);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2079 2080 2081 2082 2083
}


/* Put a question in queue */

2084
static
kostja@vajra.(none)'s avatar
kostja@vajra.(none) committed
2085 2086
int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
                  LEX_STRING query, bool ignore, bool log_on)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2087
{
2088
  delayed_row *row= 0;
2089
  Delayed_insert *di=thd->di;
2090
  const Discrete_interval *forced_auto_inc;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2091
  DBUG_ENTER("write_delayed");
2092 2093
  DBUG_PRINT("enter", ("query = '%s' length %lu", query.str,
                       (ulong) query.length));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2094 2095 2096 2097 2098 2099 2100

  thd->proc_info="waiting for handler insert";
  pthread_mutex_lock(&di->mutex);
  while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
    pthread_cond_wait(&di->cond_client,&di->mutex);
  thd->proc_info="storing row into queue";

2101
  if (thd->killed)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2102 2103
    goto err;

2104 2105 2106 2107 2108 2109 2110
  /*
    Take a copy of the query string, if there is any. The string will
    be free'ed when the row is destroyed. If there is no query string,
    we don't do anything special.
   */

  if (query.str)
2111 2112 2113
  {
    char *str;
    if (!(str= my_strndup(query.str, query.length, MYF(MY_WME))))
2114
      goto err;
2115 2116
    query.str= str;
  }
2117 2118 2119 2120
  row= new delayed_row(query, duplic, ignore, log_on);
  if (row == NULL)
  {
    my_free(query.str, MYF(MY_WME));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2121
    goto err;
2122
  }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2123

2124
  if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME))))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2125
    goto err;
2126
  memcpy(row->record, table->record[0], table->s->reclength);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2127 2128
  row->start_time=		thd->start_time;
  row->query_start_used=	thd->query_start_used;
2129 2130 2131 2132 2133 2134 2135 2136 2137 2138
  /*
    those are for the binlog: LAST_INSERT_ID() has been evaluated at this
    time, so record does not need it, but statement-based binlogging of the
    INSERT will need when the row is actually inserted.
    As for SET INSERT_ID, DELAYED does not honour it (BUG#20830).
  */
  row->stmt_depends_on_first_successful_insert_id_in_prev_stmt=
    thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
  row->first_successful_insert_id_in_prev_stmt=
    thd->first_successful_insert_id_in_prev_stmt;
2139
  row->timestamp_field_type=    table->timestamp_field_type;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2140

2141
  /* Copy session variables. */
2142 2143
  row->auto_increment_increment= thd->variables.auto_increment_increment;
  row->auto_increment_offset=    thd->variables.auto_increment_offset;
2144 2145 2146
  row->sql_mode=                 thd->variables.sql_mode;
  row->auto_increment_field_not_null= table->auto_increment_field_not_null;

2147 2148 2149 2150 2151 2152 2153
  /* Copy the next forced auto increment value, if any. */
  if ((forced_auto_inc= thd->auto_inc_intervals_forced.get_next()))
  {
    row->forced_insert_id= forced_auto_inc->minimum();
    DBUG_PRINT("delayed", ("transmitting auto_inc: %lu",
                           (ulong) row->forced_insert_id));
  }
2154

bk@work.mysql.com's avatar
bk@work.mysql.com committed
2155 2156 2157
  di->rows.push_back(row);
  di->stacked_inserts++;
  di->status=1;
2158
  if (table->s->blob_fields)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171
    unlink_blobs(table);
  pthread_cond_signal(&di->cond);

  thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
  pthread_mutex_unlock(&di->mutex);
  DBUG_RETURN(0);

 err:
  delete row;
  pthread_mutex_unlock(&di->mutex);
  DBUG_RETURN(1);
}

2172 2173 2174 2175
/**
  Signal the delayed insert thread that this user connection
  is finished using it for this statement.
*/
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2176 2177 2178

static void end_delayed_insert(THD *thd)
{
2179
  DBUG_ENTER("end_delayed_insert");
2180
  Delayed_insert *di=thd->di;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2181
  pthread_mutex_lock(&di->mutex);
2182
  DBUG_PRINT("info",("tables in use: %d",di->tables_in_use));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2183 2184 2185 2186 2187 2188
  if (!--di->tables_in_use || di->thd.killed)
  {						// Unlock table
    di->status=1;
    pthread_cond_signal(&di->cond);
  }
  pthread_mutex_unlock(&di->mutex);
2189
  DBUG_VOID_RETURN;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2190 2191 2192 2193 2194 2195 2196 2197 2198
}


/* We kill all delayed threads when doing flush-tables */

void kill_delayed_threads(void)
{
  VOID(pthread_mutex_lock(&LOCK_delayed_insert)); // For unlink from list

2199
  I_List_iterator<Delayed_insert> it(delayed_threads);
2200 2201
  Delayed_insert *di;
  while ((di= it++))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2202
  {
2203 2204
    di->thd.killed= THD::KILL_CONNECTION;
    if (di->thd.mysys_var)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2205
    {
2206 2207
      pthread_mutex_lock(&di->thd.mysys_var->mutex);
      if (di->thd.mysys_var->current_cond)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2208
      {
2209 2210 2211 2212
	/*
	  We need the following test because the main mutex may be locked
	  in handle_delayed_insert()
	*/
2213 2214 2215 2216 2217
	if (&di->mutex != di->thd.mysys_var->current_mutex)
	  pthread_mutex_lock(di->thd.mysys_var->current_mutex);
	pthread_cond_broadcast(di->thd.mysys_var->current_cond);
	if (&di->mutex != di->thd.mysys_var->current_mutex)
	  pthread_mutex_unlock(di->thd.mysys_var->current_mutex);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2218
      }
2219
      pthread_mutex_unlock(&di->thd.mysys_var->mutex);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2220 2221 2222 2223 2224 2225 2226 2227 2228 2229
    }
  }
  VOID(pthread_mutex_unlock(&LOCK_delayed_insert)); // For unlink from list
}


/*
 * Create a new delayed insert thread
*/

2230
pthread_handler_t handle_delayed_insert(void *arg)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2231
{
2232
  Delayed_insert *di=(Delayed_insert*) arg;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2233 2234 2235 2236 2237
  THD *thd= &di->thd;

  pthread_detach_this_thread();
  /* Add thread to THD list so that's it's visible in 'show processlist' */
  pthread_mutex_lock(&LOCK_thread_count);
2238
  thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
2239
  thd->set_current_time();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2240
  threads.append(thd);
hf@genie.(none)'s avatar
SCRUM  
hf@genie.(none) committed
2241
  thd->killed=abort_loop ? THD::KILL_CONNECTION : THD::NOT_KILLED;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2242 2243
  pthread_mutex_unlock(&LOCK_thread_count);

2244 2245 2246 2247 2248 2249 2250 2251
  /*
    Wait until the client runs into pthread_cond_wait(),
    where we free it after the table is opened and di linked in the list.
    If we did not wait here, the client might detect the opened table
    before it is linked to the list. It would release LOCK_delayed_create
    and allow another thread to create another handler for the same table,
    since it does not find one in the list.
  */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2252
  pthread_mutex_lock(&di->mutex);
2253
#if !defined( __WIN__) /* Win32 calls this in pthread_create */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2254 2255
  if (my_thread_init())
  {
2256 2257 2258
    /* Can't use my_error since store_globals has not yet been called */
    thd->main_da.set_error_status(thd, ER_OUT_OF_RESOURCES,
                                  ER(ER_OUT_OF_RESOURCES));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2259 2260 2261 2262 2263
    goto end;
  }
#endif

  DBUG_ENTER("handle_delayed_insert");
2264
  thd->thread_stack= (char*) &thd;
2265
  if (init_thr_lock() || thd->store_globals())
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2266
  {
2267 2268 2269
    /* Can't use my_error since store_globals has perhaps failed */
    thd->main_da.set_error_status(thd, ER_OUT_OF_RESOURCES,
                                  ER(ER_OUT_OF_RESOURCES));
2270
    thd->fatal_error();
2271
    goto err;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2272 2273
  }

2274 2275 2276 2277 2278 2279
  /*
    Open table requires an initialized lex in case the table is
    partitioned. The .frm file contains a partial SQL string which is
    parsed using a lex, that depends on initialized thd->lex.
  */
  lex_start(thd);
2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290
  thd->lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()
  /*
    Statement-based replication of INSERT DELAYED has problems with RAND()
    and user vars, so in mixed mode we go to row-based.
  */
  thd->lex->set_stmt_unsafe();
  thd->set_current_stmt_binlog_row_based_if_mixed();

  /* Open table */
  if (!(di->table= open_n_lock_single_table(thd, &di->table_list,
                                            TL_WRITE_DELAYED)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2291
  {
2292
    thd->fatal_error();				// Abort waiting inserts
2293
    goto err;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2294
  }
2295
  if (!(di->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
2296
  {
2297
    thd->fatal_error();
2298
    my_error(ER_ILLEGAL_HA, MYF(0), di->table_list.table_name);
2299
    goto err;
2300
  }
2301 2302 2303 2304 2305 2306 2307
  if (di->table->triggers)
  {
    /*
      Table has triggers. This is not an error, but we do
      not support triggers with delayed insert. Terminate the delayed
      thread without an error and thus request lock upgrade.
    */
2308
    goto err;
2309
  }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2310 2311 2312 2313 2314 2315 2316 2317 2318 2319
  di->table->copy_blobs=1;

  /* Tell client that the thread is initialized */
  pthread_cond_signal(&di->cond_client);

  /* Now wait until we get an insert or lock to handle */
  /* We will not abort as long as a client thread uses this thread */

  for (;;)
  {
hf@genie.(none)'s avatar
SCRUM  
hf@genie.(none) committed
2320
    if (thd->killed == THD::KILL_CONNECTION)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339
    {
      uint lock_count;
      /*
	Remove this from delay insert list so that no one can request a
	table from this
      */
      pthread_mutex_unlock(&di->mutex);
      pthread_mutex_lock(&LOCK_delayed_insert);
      di->unlink();
      lock_count=di->lock_count();
      pthread_mutex_unlock(&LOCK_delayed_insert);
      pthread_mutex_lock(&di->mutex);
      if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
	break;					// Time to die
    }

    if (!di->status && !di->stacked_inserts)
    {
      struct timespec abstime;
2340
      set_timespec(abstime, delayed_insert_timeout);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2341 2342 2343 2344

      /* Information for pthread_kill */
      di->thd.mysys_var->current_mutex= &di->mutex;
      di->thd.mysys_var->current_cond= &di->cond;
2345
      di->thd.proc_info="Waiting for INSERT";
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2346

2347
      DBUG_PRINT("info",("Waiting for someone to insert rows"));
2348
      while (!thd->killed)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2349 2350
      {
	int error;
2351
#if defined(HAVE_BROKEN_COND_TIMEDWAIT)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2352 2353 2354 2355
	error=pthread_cond_wait(&di->cond,&di->mutex);
#else
	error=pthread_cond_timedwait(&di->cond,&di->mutex,&abstime);
#ifdef EXTRA_DEBUG
2356
	if (error && error != EINTR && error != ETIMEDOUT)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2357 2358 2359 2360 2361 2362 2363 2364 2365
	{
	  fprintf(stderr, "Got error %d from pthread_cond_timedwait\n",error);
	  DBUG_PRINT("error",("Got error %d from pthread_cond_timedwait",
			      error));
	}
#endif
#endif
	if (thd->killed || di->status)
	  break;
monty@mysql.com's avatar
monty@mysql.com committed
2366
	if (error == ETIMEDOUT || error == ETIME)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2367
	{
hf@genie.(none)'s avatar
SCRUM  
hf@genie.(none) committed
2368
	  thd->killed= THD::KILL_CONNECTION;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2369 2370 2371
	  break;
	}
      }
2372 2373
      /* We can't lock di->mutex and mysys_var->mutex at the same time */
      pthread_mutex_unlock(&di->mutex);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2374 2375 2376 2377
      pthread_mutex_lock(&di->thd.mysys_var->mutex);
      di->thd.mysys_var->current_mutex= 0;
      di->thd.mysys_var->current_cond= 0;
      pthread_mutex_unlock(&di->thd.mysys_var->mutex);
2378
      pthread_mutex_lock(&di->mutex);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2379
    }
2380
    di->thd.proc_info=0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2381 2382 2383

    if (di->tables_in_use && ! thd->lock)
    {
2384
      bool not_used;
2385 2386 2387 2388 2389 2390 2391 2392 2393 2394
      /*
        Request for new delayed insert.
        Lock the table, but avoid to be blocked by a global read lock.
        If we got here while a global read lock exists, then one or more
        inserts started before the lock was requested. These are allowed
        to complete their work before the server returns control to the
        client which requested the global read lock. The delayed insert
        handler will close the table and finish when the outstanding
        inserts are done.
      */
2395
      if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1,
2396 2397
                                          MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK,
                                          &not_used)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2398
      {
2399 2400 2401
	/* Fatal error */
	di->dead= 1;
	thd->killed= THD::KILL_CONNECTION;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2402 2403 2404 2405 2406 2407 2408
      }
      pthread_cond_broadcast(&di->cond_client);
    }
    if (di->stacked_inserts)
    {
      if (di->handle_inserts())
      {
2409 2410 2411
	/* Some fatal error */
	di->dead= 1;
	thd->killed= THD::KILL_CONNECTION;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2412 2413 2414 2415 2416
      }
    }
    di->status=0;
    if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
    {
monty@mysql.com's avatar
monty@mysql.com committed
2417 2418 2419 2420
      /*
        No one is doing a insert delayed
        Unlock table so that other threads can use it
      */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2421 2422 2423
      MYSQL_LOCK *lock=thd->lock;
      thd->lock=0;
      pthread_mutex_unlock(&di->mutex);
2424 2425 2426 2427
      /*
        We need to release next_insert_id before unlocking. This is
        enforced by handler::ha_external_lock().
      */
2428
      di->table->file->ha_release_auto_increment();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2429 2430 2431 2432 2433 2434 2435 2436
      mysql_unlock_tables(thd, lock);
      di->group_count=0;
      pthread_mutex_lock(&di->mutex);
    }
    if (di->tables_in_use)
      pthread_cond_broadcast(&di->cond_client); // If waiting clients
  }

2437 2438 2439 2440 2441 2442 2443
err:
  /*
    mysql_lock_tables() can potentially start a transaction and write
    a table map. In the event of an error, that transaction has to be
    rolled back.  We only need to roll back a potential statement
    transaction, since real transactions are rolled back in
    close_thread_tables().
2444 2445 2446 2447

    TODO: This is not true any more, table maps are generated on the
    first call to ha_*_row() instead. Remove code that are used to
    cover for the case outlined above.
2448 2449 2450
   */
  ha_rollback_stmt(thd);

2451
#ifndef __WIN__
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2452
end:
2453
#endif
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2454 2455 2456 2457 2458 2459 2460
  /*
    di should be unlinked from the thread handler list and have no active
    clients
  */

  close_thread_tables(thd);			// Free the table
  di->table=0;
2461 2462
  di->dead= 1;                                  // If error
  thd->killed= THD::KILL_CONNECTION;	        // If error
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496
  pthread_cond_broadcast(&di->cond_client);	// Safety
  pthread_mutex_unlock(&di->mutex);

  pthread_mutex_lock(&LOCK_delayed_create);	// Because of delayed_get_table
  pthread_mutex_lock(&LOCK_delayed_insert);	
  delete di;
  pthread_mutex_unlock(&LOCK_delayed_insert);
  pthread_mutex_unlock(&LOCK_delayed_create);  

  my_thread_end();
  pthread_exit(0);
  DBUG_RETURN(0);
}


/* Remove pointers from temporary fields to allocated values */

static void unlink_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
      ((Field_blob *) (*ptr))->clear_temporary();
  }
}

/* Free blobs stored in current row */

static void free_delayed_insert_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
    {
2497
      uchar *str;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2498 2499 2500 2501 2502 2503 2504 2505
      ((Field_blob *) (*ptr))->get_ptr(&str);
      my_free(str,MYF(MY_ALLOW_ZERO_PTR));
      ((Field_blob *) (*ptr))->reset();
    }
  }
}


2506
bool Delayed_insert::handle_inserts(void)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2507 2508
{
  int error;
2509
  ulong max_rows;
2510 2511
  bool using_ignore= 0, using_opt_replace= 0,
       using_bin_log= mysql_bin_log.is_open();
2512
  delayed_row *row;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2513 2514 2515 2516 2517 2518
  DBUG_ENTER("handle_inserts");

  /* Allow client to insert new rows */
  pthread_mutex_unlock(&mutex);

  table->next_number_field=table->found_next_number_field;
2519
  table->use_all_columns();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2520 2521 2522 2523 2524

  thd.proc_info="upgrading lock";
  if (thr_upgrade_write_delay_lock(*thd.lock->locks))
  {
    /* This can only happen if thread is killed by shutdown */
2525
    sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),table->s->table_name.str);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2526 2527 2528 2529
    goto err;
  }

  thd.proc_info="insert";
2530
  max_rows= delayed_insert_limit;
dlenev@mockturtle.local's avatar
dlenev@mockturtle.local committed
2531
  if (thd.killed || table->needs_reopen_or_name_lock())
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2532
  {
hf@genie.(none)'s avatar
SCRUM  
hf@genie.(none) committed
2533
    thd.killed= THD::KILL_CONNECTION;
2534
    max_rows= ULONG_MAX;                     // Do as much as possible
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2535 2536
  }

2537 2538 2539 2540 2541 2542 2543
  /*
    We can't use row caching when using the binary log because if
    we get a crash, then binary log will contain rows that are not yet
    written to disk, which will cause problems in replication.
  */
  if (!using_bin_log)
    table->file->extra(HA_EXTRA_WRITE_CACHE);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2544
  pthread_mutex_lock(&mutex);
2545

bk@work.mysql.com's avatar
bk@work.mysql.com committed
2546 2547 2548 2549
  while ((row=rows.get()))
  {
    stacked_inserts--;
    pthread_mutex_unlock(&mutex);
2550
    memcpy(table->record[0],row->record,table->s->reclength);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2551 2552 2553

    thd.start_time=row->start_time;
    thd.query_start_used=row->query_start_used;
2554 2555 2556 2557 2558
    /*
      To get the exact auto_inc interval to store in the binlog we must not
      use values from the previous interval (of the previous rows).
    */
    bool log_query= (row->log_query && row->query.str != NULL);
2559 2560 2561
    DBUG_PRINT("delayed", ("query: '%s'  length: %lu", row->query.str ?
                           row->query.str : "[NULL]",
                           (ulong) row->query.length));
2562 2563
    if (log_query)
    {
2564 2565 2566 2567 2568 2569 2570 2571
      /*
        This is the first value of an INSERT statement.
        It is the right place to clear a forced insert_id.
        This is usually done after the last value of an INSERT statement,
        but we won't know this in the insert delayed thread. But before
        the first value is sufficiently equivalent to after the last
        value of the previous statement.
      */
2572 2573 2574
      table->file->ha_release_auto_increment();
      thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty();
    }
2575 2576 2577 2578
    thd.first_successful_insert_id_in_prev_stmt= 
      row->first_successful_insert_id_in_prev_stmt;
    thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt= 
      row->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2579
    table->timestamp_field_type= row->timestamp_field_type;
2580
    table->auto_increment_field_not_null= row->auto_increment_field_not_null;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2581

2582
    /* Copy the session variables. */
2583 2584
    thd.variables.auto_increment_increment= row->auto_increment_increment;
    thd.variables.auto_increment_offset=    row->auto_increment_offset;
2585 2586
    thd.variables.sql_mode=                 row->sql_mode;

2587 2588 2589 2590 2591 2592 2593
    /* Copy a forced insert_id, if any. */
    if (row->forced_insert_id)
    {
      DBUG_PRINT("delayed", ("received auto_inc: %lu",
                             (ulong) row->forced_insert_id));
      thd.force_one_auto_inc_interval(row->forced_insert_id);
    }
2594

2595
    info.ignore= row->ignore;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2596
    info.handle_duplicates= row->dup;
2597
    if (info.ignore ||
2598
	info.handle_duplicates != DUP_ERROR)
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
2599 2600 2601 2602
    {
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
      using_ignore=1;
    }
2603 2604 2605 2606 2607 2608 2609
    if (info.handle_duplicates == DUP_REPLACE &&
        (!table->triggers ||
         !table->triggers->has_delete_triggers()))
    {
      table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
      using_opt_replace= 1;
    }
antony@ppcg5.local's avatar
antony@ppcg5.local committed
2610 2611
    if (info.handle_duplicates == DUP_UPDATE)
      table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
2612
    thd.clear_error(); // reset error for binlog
2613
    if (write_record(&thd, table, &info))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2614
    {
2615
      info.error_count++;				// Ignore errors
2616
      thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
sasha@laptop.slkc.uswest.net's avatar
sasha@laptop.slkc.uswest.net committed
2617
      row->log_query = 0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2618
    }
2619

monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
2620 2621 2622 2623 2624
    if (using_ignore)
    {
      using_ignore=0;
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
    }
2625 2626 2627 2628 2629
    if (using_opt_replace)
    {
      using_opt_replace= 0;
      table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
    }
2630

2631
    if (log_query && mysql_bin_log.is_open())
guilhem@gbichot3.local's avatar
guilhem@gbichot3.local committed
2632 2633 2634 2635 2636 2637 2638 2639 2640
    {
      /*
        If the query has several rows to insert, only the first row will come
        here. In row-based binlogging, this means that the first row will be
        written to binlog as one Table_map event and one Rows event (due to an
        event flush done in binlog_query()), then all other rows of this query
        will be binlogged together as one single Table_map event and one
        single Rows event.
      */
2641 2642 2643
      thd.binlog_query(THD::ROW_QUERY_TYPE,
                       row->query.str, row->query.length,
                       FALSE, FALSE);
guilhem@gbichot3.local's avatar
guilhem@gbichot3.local committed
2644
    }
2645

2646
    if (table->s->blob_fields)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2647
      free_delayed_insert_blobs(table);
2648
    thread_safe_decrement(delayed_rows_in_use,&LOCK_delayed_status);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2649 2650 2651 2652
    thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
    pthread_mutex_lock(&mutex);

    delete row;
2653 2654 2655 2656 2657 2658 2659
    /*
      Let READ clients do something once in a while
      We should however not break in the middle of a multi-line insert
      if we have binary logging enabled as we don't want other commands
      on this table until all entries has been processed
    */
    if (group_count++ >= max_rows && (row= rows.head()) &&
2660
	(!(row->log_query & using_bin_log)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672
    {
      group_count=0;
      if (stacked_inserts || tables_in_use)	// Let these wait a while
      {
	if (tables_in_use)
	  pthread_cond_broadcast(&cond_client); // If waiting clients
	thd.proc_info="reschedule";
	pthread_mutex_unlock(&mutex);
	if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
	{
	  /* This should never happen */
	  table->file->print_error(error,MYF(0));
2673
	  sql_print_error("%s", thd.main_da.message());
2674
          DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed in loop"));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2675 2676
	  goto err;
	}
2677
	query_cache_invalidate3(&thd, table, 1);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2678 2679 2680
	if (thr_reschedule_write_lock(*thd.lock->locks))
	{
	  /* This should never happen */
2681 2682
	  sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),
                          table->s->table_name.str);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2683
	}
2684 2685
	if (!using_bin_log)
	  table->file->extra(HA_EXTRA_WRITE_CACHE);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2686 2687 2688 2689 2690 2691 2692 2693 2694
	pthread_mutex_lock(&mutex);
	thd.proc_info="insert";
      }
      if (tables_in_use)
	pthread_cond_broadcast(&cond_client);	// If waiting clients
    }
  }
  thd.proc_info=0;
  pthread_mutex_unlock(&mutex);
2695

2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709
  /*
    We need to flush the pending event when using row-based
    replication since the flushing normally done in binlog_query() is
    not done last in the statement: for delayed inserts, the insert
    statement is logged *before* all rows are inserted.

    We can flush the pending event without checking the thd->lock
    since the delayed insert *thread* is not inside a stored function
    or trigger.

    TODO: Move the logging to last in the sequence of rows.
   */
  if (thd.current_stmt_binlog_row_based)
    thd.binlog_flush_pending_rows_event(TRUE);
2710

bk@work.mysql.com's avatar
bk@work.mysql.com committed
2711 2712 2713
  if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
  {						// This shouldn't happen
    table->file->print_error(error,MYF(0));
2714
    sql_print_error("%s", thd.main_da.message());
2715
    DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed after loop"));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2716 2717
    goto err;
  }
2718
  query_cache_invalidate3(&thd, table, 1);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2719 2720 2721 2722
  pthread_mutex_lock(&mutex);
  DBUG_RETURN(0);

 err:
2723 2724 2725
#ifndef DBUG_OFF
  max_rows= 0;                                  // For DBUG output
#endif
2726 2727 2728 2729 2730 2731
  /* Remove all not used rows */
  while ((row=rows.get()))
  {
    delete row;
    thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
    stacked_inserts--;
2732 2733 2734
#ifndef DBUG_OFF
    max_rows++;
#endif
2735
  }
2736
  DBUG_PRINT("error", ("dropped %lu rows after an error", max_rows));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2737 2738 2739 2740
  thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
  pthread_mutex_lock(&mutex);
  DBUG_RETURN(1);
}
2741
#endif /* EMBEDDED_LIBRARY */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2742 2743

/***************************************************************************
2744
  Store records in INSERT ... SELECT *
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2745 2746
***************************************************************************/

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
2747 2748 2749 2750 2751 2752 2753 2754 2755

/*
  make insert specific preparation and checks after opening tables

  SYNOPSIS
    mysql_insert_select_prepare()
    thd         thread handler

  RETURN
2756 2757
    FALSE OK
    TRUE  Error
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
2758 2759
*/

2760
bool mysql_insert_select_prepare(THD *thd)
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
2761 2762
{
  LEX *lex= thd->lex;
monty@mysql.com's avatar
monty@mysql.com committed
2763
  SELECT_LEX *select_lex= &lex->select_lex;
monty@mysql.com's avatar
monty@mysql.com committed
2764
  TABLE_LIST *first_select_leaf_table;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
2765
  DBUG_ENTER("mysql_insert_select_prepare");
monty@mysql.com's avatar
monty@mysql.com committed
2766

2767 2768
  /*
    SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
monty@mysql.com's avatar
monty@mysql.com committed
2769
    clause if table is VIEW
2770
  */
monty@mysql.com's avatar
monty@mysql.com committed
2771
  
monty@mysql.com's avatar
monty@mysql.com committed
2772 2773 2774 2775
  if (mysql_prepare_insert(thd, lex->query_tables,
                           lex->query_tables->table, lex->field_list, 0,
                           lex->update_list, lex->value_list,
                           lex->duplicates,
2776
                           &select_lex->where, TRUE, FALSE, FALSE))
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
2777
    DBUG_RETURN(TRUE);
monty@mysql.com's avatar
monty@mysql.com committed
2778

2779 2780 2781 2782
  /*
    exclude first table from leaf tables list, because it belong to
    INSERT
  */
monty@mysql.com's avatar
monty@mysql.com committed
2783 2784
  DBUG_ASSERT(select_lex->leaf_tables != 0);
  lex->leaf_tables_insert= select_lex->leaf_tables;
2785
  /* skip all leaf tables belonged to view where we are insert */
2786
  for (first_select_leaf_table= select_lex->leaf_tables->next_leaf;
2787 2788 2789 2790
       first_select_leaf_table &&
       first_select_leaf_table->belong_to_view &&
       first_select_leaf_table->belong_to_view ==
       lex->leaf_tables_insert->belong_to_view;
2791
       first_select_leaf_table= first_select_leaf_table->next_leaf)
2792
  {}
monty@mysql.com's avatar
monty@mysql.com committed
2793
  select_lex->leaf_tables= first_select_leaf_table;
2794
  DBUG_RETURN(FALSE);
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
2795 2796 2797
}


2798
/*
  Set up the result sink of INSERT ... SELECT.

  Stores the target table and the field list, and fills the zeroed
  'info' structure with the duplicate handling mode, the ignore flag and
  the update lists. info.view is set to the table list element only when
  that element is a view; otherwise it stays 0.
*/
select_insert::select_insert(TABLE_LIST *table_list_par, TABLE *table_par,
                             List<Item> *fields_par,
                             List<Item> *update_fields,
                             List<Item> *update_values,
                             enum_duplicates duplic,
                             bool ignore_check_option_errors)
  :table_list(table_list_par), table(table_par), fields(fields_par),
   autoinc_value_of_last_inserted_row(0),
   insert_into_view(table_list_par && table_list_par->view != 0)
{
  bzero((char*) &info,sizeof(info));
  info.update_values= update_values;
  info.update_fields= update_fields;
  info.ignore= ignore_check_option_errors;
  info.handle_duplicates= duplic;
  if (table_list_par && table_list_par->view)
    info.view= table_list_par;
}


bk@work.mysql.com's avatar
bk@work.mysql.com committed
2818
int
2819
select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2820
{
2821
  LEX *lex= thd->lex;
2822
  int res;
2823
  table_map map= 0;
2824
  SELECT_LEX *lex_current_select_save= lex->current_select;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2825 2826
  DBUG_ENTER("select_insert::prepare");

2827
  unit= u;
2828

2829 2830 2831 2832 2833 2834
  /*
    Since table in which we are going to insert is added to the first
    select, LEX::current_select should point to the first select while
    we are fixing fields from insert list.
  */
  lex->current_select= &lex->select_lex;
2835
  res= check_insert_fields(thd, table_list, *fields, values,
2836
                           !insert_into_view, &map) ||
2837
       setup_fields(thd, 0, values, MARK_COLUMNS_READ, 0, 0);
2838

2839 2840 2841 2842 2843 2844 2845 2846 2847 2848 2849 2850
  if (!res && fields->elements)
  {
    bool saved_abort_on_warning= thd->abort_on_warning;
    thd->abort_on_warning= !info.ignore && (thd->variables.sql_mode &
                                            (MODE_STRICT_TRANS_TABLES |
                                             MODE_STRICT_ALL_TABLES));
    res= check_that_all_fields_are_given_values(thd, table_list->table, 
                                                table_list);
    thd->abort_on_warning= saved_abort_on_warning;
  }

  if (info.handle_duplicates == DUP_UPDATE && !res)
2851
  {
2852
    Name_resolution_context *context= &lex->select_lex.context;
2853 2854 2855 2856
    Name_resolution_context_state ctx_state;

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);
2857 2858

    /* Perform name resolution only in the first table - 'table_list'. */
2859
    table_list->next_local= 0;
2860 2861
    context->resolve_in_table_list_only(table_list);

2862
    lex->select_lex.no_wrap_view_item= TRUE;
2863
    res= res || check_update_fields(thd, context->table_list,
2864
                                    *info.update_fields, &map);
2865 2866
    lex->select_lex.no_wrap_view_item= FALSE;
    /*
2867 2868 2869
      When we are not using GROUP BY and there are no ungrouped aggregate functions 
      we can refer to other tables in the ON DUPLICATE KEY part.
      We use next_name_resolution_table descructively, so check it first (views?)
2870
    */
2871 2872 2873 2874 2875 2876 2877 2878
    DBUG_ASSERT (!table_list->next_name_resolution_table);
    if (lex->select_lex.group_list.elements == 0 &&
        !lex->select_lex.with_sum_func)
      /*
        We must make a single context out of the two separate name resolution contexts :
        the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
        To do that we must concatenate the two lists
      */  
2879 2880
      table_list->next_name_resolution_table= 
        ctx_state.get_first_name_resolution_table();
2881

2882 2883
    res= res || setup_fields(thd, 0, *info.update_values,
                             MARK_COLUMNS_READ, 0, 0);
2884
    if (!res)
2885
    {
2886 2887 2888 2889 2890 2891 2892 2893
      /*
        Traverse the update values list and substitute fields from the
        select for references (Item_ref objects) to them. This is done in
        order to get correct values from those fields when the select
        employs a temporary table.
      */
      List_iterator<Item> li(*info.update_values);
      Item *item;
2894

2895 2896 2897
      while ((item= li++))
      {
        item->transform(&Item::update_value_transformer,
2898
                        (uchar*)lex->current_select);
2899
      }
2900 2901 2902
    }

    /* Restore the current context. */
2903
    ctx_state.restore_state(context, table_list);
2904
  }
2905

2906 2907
  lex->current_select= lex_current_select_save;
  if (res)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2908
    DBUG_RETURN(1);
2909 2910 2911 2912 2913 2914 2915 2916 2917 2918
  /*
    if it is INSERT into join view then check_insert_fields already found
    real table for insert
  */
  table= table_list->table;

  /*
    Is table which we are changing used somewhere in other parts of
    query
  */
2919
  if (unique_table(thd, table_list, table_list->next_global, 0))
2920 2921
  {
    /* Using same table for INSERT and SELECT */
2922 2923
    lex->current_select->options|= OPTION_BUFFER_RESULT;
    lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
2924
  }
2925 2926
  else if (!(lex->current_select->options & OPTION_BUFFER_RESULT) &&
           !thd->prelocked_mode)
2927 2928 2929 2930 2931 2932 2933
  {
    /*
      We must not yet prepare the result table if it is the same as one of the 
      source tables (INSERT SELECT). The preparation may disable 
      indexes on the result table, which may be used during the select, if it
      is the same table (Bug #6034). Do the preparation after the select phase
      in select_insert::prepare2().
2934 2935
      We won't start bulk inserts at all if this statement uses functions or
      should invoke triggers since they may access to the same table too.
2936
    */
2937
    table->file->ha_start_bulk_insert((ha_rows) 0);
2938
  }
2939
  restore_record(table,s->default_values);		// Get empty record
2940
  table->next_number_field=table->found_next_number_field;
2941 2942 2943 2944 2945 2946 2947 2948 2949

#ifdef HAVE_REPLICATION
  if (thd->slave_thread &&
      (info.handle_duplicates == DUP_UPDATE) &&
      (table->next_number_field != NULL) &&
      rpl_master_has_bug(&active_mi->rli, 24432))
    DBUG_RETURN(1);
#endif

bk@work.mysql.com's avatar
bk@work.mysql.com committed
2950
  thd->cuted_fields=0;
monty@mysql.com's avatar
monty@mysql.com committed
2951 2952
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2953 2954 2955
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
antony@ppcg5.local's avatar
antony@ppcg5.local committed
2956 2957
  if (info.handle_duplicates == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
monty@mysql.com's avatar
monty@mysql.com committed
2958
  thd->abort_on_warning= (!info.ignore &&
2959 2960 2961
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
2962
  res= (table_list->prepare_where(thd, 0, TRUE) ||
2963
        table_list->prepare_check_option(thd));
2964 2965

  if (!res)
2966
     prepare_triggers_for_insert_stmt(table);
2967

2968
  DBUG_RETURN(res);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2969 2970
}

2971

2972 2973 2974 2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987 2988 2989 2990
/*
  Finish the preparation of the result table.

  SYNOPSIS
    select_insert::prepare2()
    void

  DESCRIPTION
    If the result table is the same as one of the source tables (INSERT SELECT),
    the result table is not finally prepared at the join prepair phase.
    Do the final preparation now.
		       
  RETURN
    0   OK
*/

int select_insert::prepare2(void)
{
  DBUG_ENTER("select_insert::prepare2");
2991
  if (thd->lex->current_select->options & OPTION_BUFFER_RESULT &&
2992
      !thd->prelocked_mode)
2993
    table->file->ha_start_bulk_insert((ha_rows) 0);
monty@mysql.com's avatar
monty@mysql.com committed
2994
  DBUG_RETURN(0);
2995 2996 2997
}


2998 2999 3000 3001 3002 3003
/*
  Cleanup hook of the select result object.

  This implementation must never be reached: it only contains an
  assertion that always fails in debug builds.
*/
void select_insert::cleanup()
{
  /* select_insert/select_create are never re-used in prepared statement */
  DBUG_ASSERT(0);
}

bk@work.mysql.com's avatar
bk@work.mysql.com committed
3004 3005
/*
  Destructor.

  If a target table is attached, clear its auto-increment bookkeeping and
  reset its handler. Then switch the thread back to ignoring truncated
  fields and clear abort_on_warning.
*/
select_insert::~select_insert()
{
  DBUG_ENTER("~select_insert");

  if (table != NULL)
  {
    table->next_number_field= NULL;
    table->auto_increment_field_not_null= FALSE;
    table->file->ha_reset();
  }

  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
  thd->abort_on_warning= FALSE;

  DBUG_VOID_RETURN;
}


/*
  Store one row produced by the SELECT into the target table.

  SYNOPSIS
    select_insert::send_data()
      values   Items holding the values of the current result row

  RETURN
    0   Row was written or deliberately skipped (LIMIT offset, or the
        view check asked to skip it)
    1   Error
*/
bool select_insert::send_data(List<Item> &values)
{
  DBUG_ENTER("select_insert::send_data");

  /* Using limit offset,count: rows inside the offset are only counted. */
  if (unit->offset_limit_cnt != 0)
  {
    unit->offset_limit_cnt--;
    DBUG_RETURN(0);
  }

  /* Count cut fields only while the values are copied into the record. */
  thd->count_cuted_fields= CHECK_FIELD_WARN;
  store_values(values);
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
  if (thd->is_error())
  {
    DBUG_RETURN(1);
  }

  if (table_list)                               // Not CREATE ... SELECT
  {
    const int check_result= table_list->view_check_option(thd, info.ignore);
    if (check_result == VIEW_CHECK_SKIP)
    {
      DBUG_RETURN(0);
    }
    if (check_result == VIEW_CHECK_ERROR)
    {
      DBUG_RETURN(1);
    }
  }

  const bool write_failed= write_record(thd, table, &info);
  if (write_failed)
  {
    DBUG_RETURN(write_failed);
  }

  if (table->triggers || info.handle_duplicates == DUP_UPDATE)
  {
    /*
      The ON DUPLICATE KEY UPDATE clause may have changed fields of the
      record, and triggers can modify fields that INSERT ... SELECT did
      not touch itself. Start the next row from the defaults again.
    */
    restore_record(table, s->default_values);
  }

  Field *autoinc_field= table->next_number_field;
  if (autoinc_field)
  {
    /*
      If no value has been autogenerated so far, remember the value we
      just saw: it may have to be sent to the client at the end.
    */
    if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
    {
      autoinc_value_of_last_inserted_row= autoinc_field->val_int();
    }
    /*
      Clear the auto-increment field for the next record. With triggers
      it is cleared twice, but this should be cheap.
    */
    autoinc_field->reset();
  }

  DBUG_RETURN(write_failed);
}


serg@serg.mylan's avatar
serg@serg.mylan committed
3081 3082 3083
/*
  Copy the values of the current result row into the record buffer and
  invoke BEFORE INSERT triggers.

  When the statement has no explicit column list the values are stored
  into all fields of the table, otherwise into the listed columns.
*/
void select_insert::store_values(List<Item> &values)
{
  const bool no_column_list= (fields->elements == 0);

  if (no_column_list)
  {
    fill_record_n_invoke_before_triggers(thd, table->field, values, 1,
                                         table->triggers, TRG_EVENT_INSERT);
  }
  else
  {
    fill_record_n_invoke_before_triggers(thd, *fields, values, 1,
                                         table->triggers, TRG_EVENT_INSERT);
  }
}

bk@work.mysql.com's avatar
bk@work.mysql.com committed
3091 3092
/*
  Report an error for the statement.

  SYNOPSIS
    select_insert::send_error()
      errcode   Error number
      err       Error message text

  The error is only registered through my_message(); no cleanup of the
  target table is done here.
*/
void select_insert::send_error(uint errcode, const char *err)
{
  DBUG_ENTER("select_insert::send_error");
  my_message(errcode, err, MYF(0));
  DBUG_VOID_RETURN;
}


bool select_insert::send_eof()
{
3103
  int error;
3104
  bool const trans_table= table->file->has_transactions();
3105
  ulonglong id;
3106
  bool changed;
3107
  THD::killed_state killed_status= thd->killed;
3108
  DBUG_ENTER("select_insert::send_eof");
3109 3110
  DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
                       trans_table, table->file->table_type()));
3111

3112
  error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0;
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
3113
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3114
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3115

3116
  if (changed= (info.copied || info.deleted || info.updated))
3117 3118 3119 3120 3121
  {
    /*
      We must invalidate the table in the query cache before binlog writing
      and ha_autocommit_or_rollback.
    */
heikki@hundin.mysql.fi's avatar
heikki@hundin.mysql.fi committed
3122
    query_cache_invalidate3(thd, table, 1);
3123 3124
    if (thd->transaction.stmt.modified_non_trans_table)
      thd->transaction.all.modified_non_trans_table= TRUE;
monty@narttu.mysql.fi's avatar
monty@narttu.mysql.fi committed
3125
  }
3126
  DBUG_ASSERT(trans_table || !changed || 
3127
              thd->transaction.stmt.modified_non_trans_table);
heikki@hundin.mysql.fi's avatar
heikki@hundin.mysql.fi committed
3128

3129 3130 3131 3132 3133 3134
  /*
    Write to binlog before commiting transaction.  No statement will
    be written by the binlog_query() below in RBR mode.  All the
    events are in the transaction cache and will be written when
    ha_autocommit_or_rollback() is issued below.
  */
3135 3136
  if (mysql_bin_log.is_open())
  {
guilhem@mysql.com's avatar
guilhem@mysql.com committed
3137 3138
    if (!error)
      thd->clear_error();
3139 3140
    thd->binlog_query(THD::ROW_QUERY_TYPE,
                      thd->query, thd->query_length,
3141
                      trans_table, FALSE, killed_status);
3142
  }
3143 3144 3145 3146 3147 3148 3149 3150
  /*
    We will call ha_autocommit_or_rollback() also for
    non-transactional tables under row-based replication: there might
    be events in the binary logs transaction, and we need to write
    them to the binary log.
   */
  if (trans_table || thd->current_stmt_binlog_row_based)
  {
3151
    int error2= ha_autocommit_or_rollback(thd, error);
3152
    if (error2 && !error)
3153
      error= error2;
3154
  }
3155
  table->file->ha_release_auto_increment();
3156

3157
  if (error)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3158 3159
  {
    table->file->print_error(error,MYF(0));
3160
    DBUG_RETURN(1);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3161
  }
3162
  char buff[160];
3163
  if (info.ignore)
3164 3165
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
	    (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3166
  else
3167
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
monty@mysql.com's avatar
monty@mysql.com committed
3168
	    (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
3169 3170 3171
  thd->row_count_func= info.copied + info.deleted +
                       ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
                        info.touched : info.updated);
3172 3173 3174 3175 3176 3177 3178

  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
    thd->first_successful_insert_id_in_cur_stmt :
    (thd->arg_of_last_insert_id_function ?
     thd->first_successful_insert_id_in_prev_stmt :
     (info.copied ? autoinc_value_of_last_inserted_row : 0));
  ::send_ok(thd, (ulong) thd->row_count_func, id, buff);
3179
  DBUG_RETURN(0);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3180 3181
}

3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192
void select_insert::abort() {

  DBUG_ENTER("select_insert::abort");
  /*
    If the creation of the table failed (due to a syntax error, for
    example), no table will have been opened and therefore 'table'
    will be NULL. In that case, we still need to execute the rollback
    and the end of the function.
   */
  if (table)
  {
3193
    bool changed, transactional_table;
3194 3195 3196 3197 3198 3199 3200 3201 3202 3203 3204 3205 3206 3207 3208 3209 3210 3211 3212 3213 3214
    /*
      If we are not in prelocked mode, we end the bulk insert started
      before.
    */
    if (!thd->prelocked_mode)
      table->file->ha_end_bulk_insert();

    /*
      If at least one row has been inserted/modified and will stay in
      the table (the table doesn't have transactions) we must write to
      the binlog (and the error code will make the slave stop).

      For many errors (example: we got a duplicate key error while
      inserting into a MyISAM table), no row will be added to the table,
      so passing the error to the slave will not help since there will
      be an error code mismatch (the inserts will succeed on the slave
      with no error).

      If table creation failed, the number of rows modified will also be
      zero, so no check for that is made.
    */
3215 3216 3217
    changed= (info.copied || info.deleted || info.updated);
    transactional_table= table->file->has_transactions();
    if (thd->transaction.stmt.modified_non_trans_table)
3218 3219 3220
    {
        if (mysql_bin_log.is_open())
          thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length,
3221 3222
                            transactional_table, FALSE);
        if (!thd->current_stmt_binlog_row_based && !can_rollback_data())
3223
          thd->transaction.all.modified_non_trans_table= TRUE;
3224 3225
	if (changed)
	  query_cache_invalidate3(thd, table, 1);
3226
    }
3227 3228
    DBUG_ASSERT(transactional_table || !changed ||
		thd->transaction.stmt.modified_non_trans_table);
3229 3230 3231 3232 3233 3234 3235
    table->file->ha_release_auto_increment();
  }

  ha_rollback_stmt(thd);
  DBUG_VOID_RETURN;
}

bk@work.mysql.com's avatar
bk@work.mysql.com committed
3236 3237

/***************************************************************************
3238
  CREATE TABLE (SELECT) ...
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3239 3240
***************************************************************************/

3241
/*
  Create table from lists of fields and items (or just return TABLE
  object for pre-opened existing table).

  SYNOPSIS
    create_table_from_items()
      thd          in     Thread object
      create_info  in     Create information (like MAX_ROWS, ENGINE or
                          temporary table flag)
      create_table in     Pointer to TABLE_LIST object providing database
                          and name for table to be created or to be open
      alter_info   in/out Initial list of columns and indexes for the table
                          to be created
      items        in     List of items which should be used to produce rest
                          of fields for the table (corresponding fields will
                          be added to the end of alter_info->create_list)
      lock         out    Pointer to the MYSQL_LOCK object for table created
                          (or open temporary table) will be returned in this
                          parameter. Since this table is not included in
                          THD::lock caller is responsible for explicitly
                          unlocking this table.
      hooks        in     Hooks invoked around locking of the table:
                          prelock() is called before mysql_lock_tables()
                          and postlock() after it succeeded. A non-zero
                          result of postlock() is treated as an error.

  NOTES
    This function behaves differently for base and temporary tables:
    - For base table we assume that either table exists and was pre-opened
      and locked at open_and_lock_tables() stage (and in this case we just
      emit error or warning and return pre-opened TABLE object) or special
      placeholder was put in table cache that guarantees that this table
      won't be created or opened until the placeholder will be removed
      (so there is an exclusive lock on this table).
    - We don't pre-open existing temporary table, instead we either open
      or create and then open table in this function.

    Since this function contains some logic specific to CREATE TABLE ...
    SELECT it should be changed before it can be used in other contexts.

  RETURN VALUES
    non-zero  Pointer to TABLE object for table created or opened
    0         Error
*/

static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
                                      TABLE_LIST *create_table,
                                      Alter_info *alter_info,
                                      List<Item> *items,
                                      MYSQL_LOCK **lock,
                                      TABLEOP_HOOKS *hooks)
{
  TABLE tmp_table;                              // Used during 'Create_field()'
  TABLE_SHARE share;
  TABLE *table= 0;
  uint select_field_count= items->elements;
  /* Add selected items to field list */
  List_iterator_fast<Item> it(*items);
  Item *item;
  Field *tmp_field;
  bool not_used;
  DBUG_ENTER("create_table_from_items");

  DBUG_EXECUTE_IF("sleep_create_select_before_check_if_exists", my_sleep(6000000););

  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
      create_table->table->db_stat)
  {
    /* Table already exists and was open at open_and_lock_tables() stage. */
    if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
    {
      create_info->table_existed= 1;            // Mark that table existed
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
                          ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
                          create_table->table_name);
      DBUG_RETURN(create_table->table);
    }

    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
    DBUG_RETURN(0);
  }

  /* Set up a minimal fake table used only to create the Field objects. */
  tmp_table.alias= 0;
  tmp_table.timestamp_field= 0;
  tmp_table.s= &share;
  init_tmp_table_share(thd, &share, "", 0, "", "");

  tmp_table.s->db_create_options=0;
  tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
  tmp_table.s->db_low_byte_first=
        test(create_info->db_type == myisam_hton ||
             create_info->db_type == heap_hton);
  tmp_table.null_row=tmp_table.maybe_null=0;

  /* Append one column definition per selected item to the create list. */
  while ((item=it++))
  {
    Create_field *cr_field;
    Field *field, *def_field;
    if (item->type() == Item::FUNC_ITEM)
    {
      if (item->result_type() != STRING_RESULT)
        field= item->tmp_table_field(&tmp_table);
      else
        field= item->tmp_table_field_from_field_type(&tmp_table, 0);
    }
    else
    {
      field= create_tmp_field(thd, &tmp_table, item, item->type(),
                              (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
                              0);
    }
    if (!field ||
        !(cr_field=new Create_field(field,(item->type() == Item::FIELD_ITEM ?
                                           ((Item_field *)item)->field :
                                           (Field*) 0))))
      DBUG_RETURN(0);
    if (item->maybe_null)
      cr_field->flags &= ~NOT_NULL_FLAG;
    alter_info->create_list.push_back(cr_field);
  }

  DBUG_EXECUTE_IF("sleep_create_select_before_create", my_sleep(6000000););

  /*
    Create and lock table.

    Note that we either creating (or opening existing) temporary table or
    creating base table on which name we have exclusive lock. So code below
    should not cause deadlocks or races.

    We don't log the statement, it will be logged later.

    If this is a HEAP table, the automatic DELETE FROM which is written to the
    binlog when a HEAP table is opened for the first time since startup, must
    not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
    don't want to delete from it) 2) it would be written before the CREATE
    TABLE, which is a wrong order. So we keep binary logging disabled when we
    open_table().

    Every path between tmp_disable_binlog() and reenable_binlog() must
    reach reenable_binlog(): do not return from inside this block.
  */
  {
    tmp_disable_binlog(thd);
    if (!mysql_create_table_no_lock(thd, create_table->db,
                                    create_table->table_name,
                                    create_info, alter_info, 0,
                                    select_field_count))
    {
      if (create_info->table_existed &&
          !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
      {
        /*
          This means that someone created table underneath server
          or it was created via different mysqld front-end to the
          cluster. We don't have much options but throw an error.

          'table' is still 0 here, so after binary logging has been
          re-enabled below the function returns 0.
        */
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
      }
      else
      {
        DBUG_EXECUTE_IF("sleep_create_select_before_open", my_sleep(6000000););

        if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
        {
          VOID(pthread_mutex_lock(&LOCK_open));
          if (reopen_name_locked_table(thd, create_table, FALSE))
          {
            /* Could not open the table just created: remove it again. */
            quick_rm_table(create_info->db_type, create_table->db,
                           table_case_name(create_info,
                                           create_table->table_name),
                           0);
          }
          else
            table= create_table->table;
          VOID(pthread_mutex_unlock(&LOCK_open));
        }
        else
        {
          if (!(table= open_table(thd, create_table, thd->mem_root, (bool*) 0,
                                  MYSQL_OPEN_TEMPORARY_ONLY)) &&
              !create_info->table_existed)
          {
            /*
              This shouldn't happen as creation of temporary table should make
              it preparable for open. But let us do drop_temporary_table()
              here just in case.
            */
            drop_temporary_table(thd, create_table);
          }
        }
      }
    }
    reenable_binlog(thd);
    if (!table)                                   // create or open failed
      DBUG_RETURN(0);
  }

  DBUG_EXECUTE_IF("sleep_create_select_before_lock", my_sleep(6000000););

  table->reginfo.lock_type=TL_WRITE;
  hooks->prelock(&table, 1);                    // Call prelock hooks
  if (! ((*lock)= mysql_lock_tables(thd, &table, 1,
                                    MYSQL_LOCK_IGNORE_FLUSH, &not_used)) ||
        hooks->postlock(&table, 1))
  {
    /* The lock was taken but the postlock hook failed: release it. */
    if (*lock)
    {
      mysql_unlock_tables(thd, *lock);
      *lock= 0;
    }

    if (!create_info->table_existed)
      drop_open_table(thd, table, create_table->db, create_table->table_name);
    DBUG_RETURN(0);
  }
  DBUG_RETURN(table);
}


bk@work.mysql.com's avatar
bk@work.mysql.com committed
3449
int
3450
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
3451
{
3452
  MYSQL_LOCK *extra_lock= NULL;
3453
  DBUG_ENTER("select_create::prepare");
3454

3455
  TABLEOP_HOOKS *hook_ptr= NULL;
3456 3457 3458 3459 3460 3461 3462 3463 3464 3465 3466 3467 3468 3469 3470 3471 3472 3473
  /*
    For row-based replication, the CREATE-SELECT statement is written
    in two pieces: the first one contain the CREATE TABLE statement
    necessary to create the table and the second part contain the rows
    that should go into the table.

    For non-temporary tables, the start of the CREATE-SELECT
    implicitly commits the previous transaction, and all events
    forming the statement will be stored the transaction cache. At end
    of the statement, the entire statement is committed as a
    transaction, and all events are written to the binary log.

    On the master, the table is locked for the duration of the
    statement, but since the CREATE part is replicated as a simple
    statement, there is no way to lock the table for accesses on the
    slave.  Hence, we have to hold on to the CREATE part of the
    statement until the statement has finished.
   */
3474 3475
  class MY_HOOKS : public TABLEOP_HOOKS {
  public:
3476 3477 3478 3479 3480 3481
    MY_HOOKS(select_create *x, TABLE_LIST *create_table,
             TABLE_LIST *select_tables)
      : ptr(x), all_tables(*create_table)
      {
        all_tables.next_global= select_tables;
      }
3482 3483

  private:
3484
    virtual int do_postlock(TABLE **tables, uint count)
3485
    {
3486 3487 3488 3489
      THD *thd= const_cast<THD*>(ptr->get_thd());
      if (int error= decide_logging_format(thd, &all_tables))
        return error;

3490
      TABLE const *const table = *tables;
3491
      if (thd->current_stmt_binlog_row_based  &&
3492
          !table->s->tmp_table &&
3493 3494 3495 3496
          !ptr->get_create_info()->table_existed)
      {
        ptr->binlog_show_create_table(tables, count);
      }
3497
      return 0;
3498 3499 3500
    }

    select_create *ptr;
3501
    TABLE_LIST all_tables;
3502 3503
  };

3504
  MY_HOOKS hooks(this, create_table, select_tables);
3505
  hook_ptr= &hooks;
3506

3507
  unit= u;
3508 3509

  /*
3510 3511 3512
    Start a statement transaction before the create if we are using
    row-based replication for the statement.  If we are creating a
    temporary table, we need to start a statement transaction.
3513 3514 3515 3516 3517 3518 3519
  */
  if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
      thd->current_stmt_binlog_row_based)
  {
    thd->binlog_start_trans_and_stmt();
  }

3520
  if (!(table= create_table_from_items(thd, create_info, create_table,
3521
                                       alter_info, &values,
3522
                                       &extra_lock, hook_ptr)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3523 3524
    DBUG_RETURN(-1);				// abort() deletes table

3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536
  if (extra_lock)
  {
    DBUG_ASSERT(m_plock == NULL);

    if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
      m_plock= &m_lock;
    else
      m_plock= &thd->extra_lock;

    *m_plock= extra_lock;
  }

3537
  if (table->s->fields < values.elements)
3538
  {
3539
    my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
3540 3541 3542
    DBUG_RETURN(-1);
  }

3543
 /* First field to copy */
3544 3545 3546 3547
  field= table->field+table->s->fields - values.elements;

  /* Mark all fields that are given values */
  for (Field **f= field ; *f ; f++)
3548
    bitmap_set_bit(table->write_set, (*f)->field_index);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3549

3550
  /* Don't set timestamp if used */
3551
  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3552 3553
  table->next_number_field=table->found_next_number_field;

3554
  restore_record(table,s->default_values);      // Get empty record
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3555
  thd->cuted_fields=0;
3556
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
3557
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
3558 3559 3560
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
antony@ppcg5.local's avatar
antony@ppcg5.local committed
3561 3562
  if (info.handle_duplicates == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
3563
  if (!thd->prelocked_mode)
3564
    table->file->ha_start_bulk_insert((ha_rows) 0);
monty@mysql.com's avatar
monty@mysql.com committed
3565
  thd->abort_on_warning= (!info.ignore &&
3566 3567 3568
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
3569 3570 3571
  if (check_that_all_fields_are_given_values(thd, table, table_list))
    DBUG_RETURN(1);
  table->mark_columns_needed_for_insert();
dlenev@mockturtle.local's avatar
dlenev@mockturtle.local committed
3572
  table->file->extra(HA_EXTRA_WRITE_CACHE);
3573
  DBUG_RETURN(0);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3574 3575
}

3576
void
3577
select_create::binlog_show_create_table(TABLE **tables, uint count)
3578 3579 3580 3581 3582 3583 3584 3585 3586 3587 3588 3589 3590 3591 3592 3593 3594
{
  /*
    Note 1: In RBR mode, we generate a CREATE TABLE statement for the
    created table by calling store_create_info() (behaves as SHOW
    CREATE TABLE).  In the event of an error, nothing should be
    written to the binary log, even if the table is non-transactional;
    therefore we pretend that the generated CREATE TABLE statement is
    for a transactional table.  The event will then be put in the
    transaction cache, and any subsequent events (e.g., table-map
    events and binrow events) will also be put there.  We can then use
    ha_autocommit_or_rollback() to either throw away the entire
    kaboodle of events, or write them to the binary log.

    We write the CREATE TABLE statement here and not in prepare()
    since there potentially are sub-selects or accesses to information
    schema that will do a close_thread_tables(), destroying the
    statement transaction cache.
3595
  */
3596
  DBUG_ASSERT(thd->current_stmt_binlog_row_based);
3597
  DBUG_ASSERT(tables && *tables && count > 0);
3598 3599 3600

  char buf[2048];
  String query(buf, sizeof(buf), system_charset_info);
3601
  int result;
3602
  TABLE_LIST tmp_table_list;
3603

3604 3605
  memset(&tmp_table_list, 0, sizeof(tmp_table_list));
  tmp_table_list.table = *tables;
3606
  query.length(0);      // Have to zero it since constructor doesn't
3607

3608
  result= store_create_info(thd, &tmp_table_list, &query, create_info);
3609
  DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */
3610

3611 3612 3613 3614 3615 3616
  thd->binlog_query(THD::STMT_QUERY_TYPE,
                    query.ptr(), query.length(),
                    /* is_trans */ TRUE,
                    /* suppress_use */ FALSE);
}

serg@serg.mylan's avatar
serg@serg.mylan committed
3617
/*
  Copy the values of the current result row into the record buffer,
  starting at the first field to copy ('field'), and invoke BEFORE INSERT
  triggers.
*/
void select_create::store_values(List<Item> &values)
{
  fill_record_n_invoke_before_triggers(thd, field, values, 1,
                                       table->triggers,
                                       TRG_EVENT_INSERT);
}

monty@tik.mysql.fi's avatar
monty@tik.mysql.fi committed
3623

3624 3625
/*
  Report an error for CREATE TABLE ... SELECT.

  SYNOPSIS
    select_create::send_error()
      errcode   Error number
      err       Error message text

  Delegates to select_insert::send_error() with binary logging
  temporarily disabled.
*/
void select_create::send_error(uint errcode,const char *err)
{
  DBUG_ENTER("select_create::send_error");

  DBUG_PRINT("info",
             ("Current statement %s row-based",
              thd->current_stmt_binlog_row_based ? "is" : "is NOT"));
  /* The address is printed in hex to match the "0x" prefix. */
  DBUG_PRINT("info",
             ("Current table (at 0x%lx) %s a temporary (or non-existant) table",
              (ulong) table,
              table && !table->s->tmp_table ? "is NOT" : "is"));
  DBUG_PRINT("info",
             ("Table %s prior to executing this statement",
              get_create_info()->table_existed ? "existed" : "did not exist"));

  /*
    This will execute any rollbacks that are necessary before writing
    the transaction cache.

    We disable the binary log since nothing should be written to the
    binary log.  This disabling is important, since we potentially do
    a "roll back" of non-transactional tables by removing the table,
    and the actual rollback might generate events that should not be
    written to the binary log.
  */
  tmp_disable_binlog(thd);
  select_insert::send_error(errcode, err);
  reenable_binlog(thd);

  DBUG_VOID_RETURN;
}

monty@tik.mysql.fi's avatar
monty@tik.mysql.fi committed
3657

bk@work.mysql.com's avatar
bk@work.mysql.com committed
3658 3659 3660 3661 3662 3663 3664
bool select_create::send_eof()
{
  bool tmp=select_insert::send_eof();
  if (tmp)
    abort();
  else
  {
3665 3666 3667 3668 3669 3670 3671 3672
    /*
      Do an implicit commit at end of statement for non-temporary
      tables.  This can fail, but we should unlock the table
      nevertheless.
    */
    if (!table->s->tmp_table)
      ha_commit(thd);               // Can fail, but we proceed anyway

monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
3673
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3674
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3675
    if (m_plock)
3676
    {
3677
      mysql_unlock_tables(thd, *m_plock);
3678 3679
      *m_plock= NULL;
      m_plock= NULL;
3680
    }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3681 3682 3683 3684
  }
  return tmp;
}

dlenev@mockturtle.local's avatar
dlenev@mockturtle.local committed
3685

bk@work.mysql.com's avatar
bk@work.mysql.com committed
3686 3687
void select_create::abort()
{
3688 3689
  DBUG_ENTER("select_create::abort");

3690 3691 3692 3693 3694 3695 3696 3697
  /*
   Disable binlog, because we "roll back" partial inserts in ::abort
   by removing the table, even for non-transactional tables.
  */
  tmp_disable_binlog(thd);
  select_insert::abort();
  reenable_binlog(thd);

3698 3699 3700 3701 3702 3703 3704 3705 3706 3707 3708 3709 3710 3711 3712 3713
  /*
    We roll back the statement, including truncating the transaction
    cache of the binary log, if the statement failed.

    We roll back the statement prior to deleting the table and prior
    to releasing the lock on the table, since there might be potential
    for failure if the rollback is executed after the drop or after
    unlocking the table.

    We also roll back the statement regardless of whether the creation
    of the table succeeded or not, since we need to reset the binary
    log state.
  */
  if (thd->current_stmt_binlog_row_based)
    ha_rollback_stmt(thd);

3714
  if (m_plock)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3715
  {
3716
    mysql_unlock_tables(thd, *m_plock);
3717 3718
    *m_plock= NULL;
    m_plock= NULL;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3719
  }
3720

bk@work.mysql.com's avatar
bk@work.mysql.com committed
3721 3722
  if (table)
  {
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
3723
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3724
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
dlenev@mockturtle.local's avatar
dlenev@mockturtle.local committed
3725 3726
    if (!create_info->table_existed)
      drop_open_table(thd, table, create_table->db, create_table->table_name);
3727
    table=0;                                    // Safety
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3728
  }
3729
  DBUG_VOID_RETURN;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3730 3731 3732 3733
}


/*****************************************************************************
  Instantiate templates
*****************************************************************************/

3737
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
template class List_iterator_fast<List_item>;
#ifndef EMBEDDED_LIBRARY
/* These list templates are instantiated only when EMBEDDED_LIBRARY is not defined. */
template class I_List<Delayed_insert>;
template class I_List_iterator<Delayed_insert>;
template class I_List<delayed_row>;
#endif /* EMBEDDED_LIBRARY */
#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */