/* Copyright (C) 2000-2006 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


/* Insert of records */

/*
  INSERT DELAYED

  Insert delayed is distinguished from a normal insert by lock_type ==
  TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
  "delayed" table (delayed_get_table()), but falls back to
  open_and_lock_tables() on error and proceeds as normal insert then.

  Opening a "delayed" table means to find a delayed insert thread that
  has the table open already. If this fails, a new thread is created and
  waited for to open and lock the table.

  If accessing the thread succeeded, in
  Delayed_insert::get_local_table() the table of the thread is copied
  for local use. A copy is required because the normal insert logic
  works on a target table, but the other threads table object must not
  be used. The insert logic uses the record buffer to create a record.
  And the delayed insert thread uses the record buffer to pass the
  record to the table handler. So there must be different objects. Also
  the copied table is not included in the lock, so that the statement
  can proceed even if the real table cannot be accessed at this moment.

  Copying a table object is not a trivial operation. Besides the TABLE
  object there are the field pointer array, the field objects and the
  record buffer. After copying the field objects, their pointers into
  the record must be "moved" to point to the new record buffer.

  After this setup the normal insert logic is used. Only that for
  delayed inserts write_delayed() is called instead of write_record().
  It inserts the rows into a queue and signals the delayed insert thread
  instead of writing directly to the table.

  The delayed insert thread awakes from the signal. It locks the table,
  inserts the rows from the queue, unlocks the table, and waits for the
  next signal. It does normally live until a FLUSH TABLES or SHUTDOWN.

*/

#include "mysql_priv.h"
#include "sp_head.h"
#include "sql_trigger.h"
#include "sql_select.h"
#include "slave.h"

#ifndef EMBEDDED_LIBRARY
64
static bool delayed_get_table(THD *thd, TABLE_LIST *table_list);
65
static int write_delayed(THD *thd,TABLE *table, enum_duplicates dup, bool ignore,
66
			 char *query, uint query_length, bool log_on);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
67
static void end_delayed_insert(THD *thd);
68
pthread_handler_t handle_delayed_insert(void *arg);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
69
static void unlink_blobs(register TABLE *table);
70
#endif
71
static bool check_view_insertability(THD *thd, TABLE_LIST *view);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
72 73 74 75 76 77 78 79 80 81 82

/*
  Allocation helpers that force use of my_malloc() if the allocated memory
  block is big.

  my_safe_alloca(size, min_length)
    With HAVE_ALLOCA: yields my_alloca(size) when size <= min_length and
    my_malloc(size, MYF(0)) otherwise.
    Without HAVE_ALLOCA: always my_alloca(size).

  my_safe_afree(ptr, size, min_length)
    Releases a block obtained through my_safe_alloca(). It must be given
    the same size and min_length as the allocation so that the matching
    release path is chosen. With HAVE_ALLOCA only blocks bigger than
    min_length are passed to my_free().

  All macro arguments are parenthesized so that expression arguments keep
  their meaning after expansion, and my_safe_afree() expands to exactly one
  statement so that it cannot capture an 'else' that follows it.
*/

#ifndef HAVE_ALLOCA
#define my_safe_alloca(size, min_length) my_alloca(size)
#define my_safe_afree(ptr, size, min_length) my_afree(ptr)
#else
#define my_safe_alloca(size, min_length) \
  (((size) <= (min_length)) ? my_alloca(size) : my_malloc((size),MYF(0)))
#define my_safe_afree(ptr, size, min_length) \
  do { if ((size) > (min_length)) my_free((ptr),MYF(0)); } while (0)
#endif

83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
/*
  Verify that all insert/update fields belong to one underlying table
  of a view.

  SYNOPSIS
    check_view_single_update()
    fields            List of insert/update fields to inspect.
    view              The view that is the target of the insert.
    map     [in/out]  Table map of the insert table.

  DESCRIPTION
    The table maps of all items in 'fields' are OR-ed together and then
    handled in one of two ways:
    1. *map is 0 (checking the insert fields): the combined map is handed
       to view->check_single_table(). If that reports an error or finds no
       table, the check fails. Otherwise view->table is pointed at the
       table that was found and the combined map is stored in *map.
    2. *map is non-zero (checking the fields of ON DUPLICATE KEY UPDATE):
       the combined map has to be equal to *map, i.e. the update fields
       must come from the same table as the insert fields. Nothing is
       modified in this case.

    On failure ER_VIEW_MULTIUPDATE is reported for the view.

  RETURN
    FALSE   OK
    TRUE    Error (already reported through my_error())
*/

bool check_view_single_update(List<Item> &fields, TABLE_LIST *view,
                              table_map *map)
{
  List_iterator_fast<Item> field_it(fields);
  table_map used_map= 0;
  TABLE_LIST *underlying= 0;     // must be 0 before check_single_table()
  bool failed;

  /* Collect the tables referenced by any of the fields. */
  for (Item *fld= field_it++; fld; fld= field_it++)
    used_map|= fld->used_tables();

  if (*map)
  {
    /* A map was provided by a previous call: it has to match exactly. */
    failed= (used_map != *map);
  }
  else
  {
    /* Join view: find the single table that receives the rows. */
    failed= (view->check_single_table(&underlying, used_map, view) ||
             underlying == 0);
    if (!failed)
    {
      view->table= underlying->table;
      *map= used_map;
    }
  }

  if (failed)
    my_error(ER_VIEW_MULTIUPDATE, MYF(0),
             view->view_db.str, view->view_name.str);

  return failed ? TRUE : FALSE;
}

142

bk@work.mysql.com's avatar
bk@work.mysql.com committed
143
/*
144
  Check if insert fields are correct.
145 146 147 148 149 150 151

  SYNOPSIS
    check_insert_fields()
    thd                         The current thread.
    table                       The table for insert.
    fields                      The insert fields.
    values                      The insert values.
ingo@mysql.com's avatar
ingo@mysql.com committed
152
    check_unique                If duplicate values should be rejected.
153 154 155 156 157 158 159 160 161

  NOTE
    Clears TIMESTAMP_AUTO_SET_ON_INSERT from table->timestamp_field_type
    or leaves it as is, depending on if timestamp should be updated or
    not.

  RETURN
    0           OK
    -1          Error
bk@work.mysql.com's avatar
bk@work.mysql.com committed
162 163
*/

ingo@mysql.com's avatar
ingo@mysql.com committed
164 165
static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
                               List<Item> &fields, List<Item> &values,
166
                               bool check_unique, table_map *map)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
167
{
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
168
  TABLE *table= table_list->table;
169

170 171
  if (!table_list->updatable)
  {
172
    my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
173 174 175
    return -1;
  }

bk@work.mysql.com's avatar
bk@work.mysql.com committed
176 177
  if (fields.elements == 0 && values.elements != 0)
  {
178 179 180 181 182 183
    if (!table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
      return -1;
    }
184
    if (values.elements != table->s->fields)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
185
    {
monty@mysql.com's avatar
monty@mysql.com committed
186
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
187 188
      return -1;
    }
hf@deer.(none)'s avatar
hf@deer.(none) committed
189
#ifndef NO_EMBEDDED_ACCESS_CHECKS
190
    if (grant_option)
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
191
    {
192 193
      Field_iterator_table field_it;
      field_it.set_table(table);
194
      if (check_grant_all_columns(thd, INSERT_ACL, &table->grant,
195
                                  table->s->db, table->s->table_name,
196
                                  &field_it))
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
197 198
        return -1;
    }
hf@deer.(none)'s avatar
hf@deer.(none) committed
199
#endif
200 201
    clear_timestamp_auto_bits(table->timestamp_field_type,
                              TIMESTAMP_AUTO_SET_ON_INSERT);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
202 203 204
  }
  else
  {						// Part field list
205 206
    SELECT_LEX *select_lex= &thd->lex->select_lex;
    Name_resolution_context *context= &select_lex->context;
207
    Name_resolution_context_state ctx_state;
208
    int res;
209

bk@work.mysql.com's avatar
bk@work.mysql.com committed
210 211
    if (fields.elements != values.elements)
    {
monty@mysql.com's avatar
monty@mysql.com committed
212
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
213 214 215 216
      return -1;
    }

    thd->dupp_field=0;
217 218 219
    select_lex->no_wrap_view_item= TRUE;

    /* Save the state of the current name resolution context. */
220
    ctx_state.save_state(context, table_list);
221 222 223 224 225

    /*
      Perform name resolution only in the first table - 'table_list',
      which is the table that is inserted into.
    */
226
    table_list->next_local= 0;
227 228
    context->resolve_in_table_list_only(table_list);
    res= setup_fields(thd, 0, fields, 1, 0, 0);
229 230

    /* Restore the current context. */
231
    ctx_state.restore_state(context, table_list);
232
    thd->lex->select_lex.no_wrap_view_item= FALSE;
233

234
    if (res)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
235
      return -1;
236

igor@rurik.mysql.com's avatar
igor@rurik.mysql.com committed
237
    if (table_list->effective_algorithm == VIEW_ALGORITHM_MERGE)
238
    {
239
      if (check_view_single_update(fields, table_list, map))
240
        return -1;
241
      table= table_list->table;
242
    }
243

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
244
    if (check_unique && thd->dupp_field)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
245
    {
246
      my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dupp_field->field_name);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
247 248 249
      return -1;
    }
    if (table->timestamp_field &&	// Don't set timestamp if used
250
	table->timestamp_field->query_id == thd->query_id)
251 252
      clear_timestamp_auto_bits(table->timestamp_field_type,
                                TIMESTAMP_AUTO_SET_ON_INSERT);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
253
  }
254
  // For the values we need select_priv
hf@deer.(none)'s avatar
hf@deer.(none) committed
255
#ifndef NO_EMBEDDED_ACCESS_CHECKS
256
  table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege);
hf@deer.(none)'s avatar
hf@deer.(none) committed
257
#endif
258 259 260

  if (check_key_in_view(thd, table_list) ||
      (table_list->view &&
261
       check_view_insertability(thd, table_list)))
262
  {
263
    my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
264 265 266
    return -1;
  }

bk@work.mysql.com's avatar
bk@work.mysql.com committed
267 268 269 270
  return 0;
}


271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
/*
  Check update fields for the timestamp field.

  SYNOPSIS
    check_update_fields()
    thd                         The current thread.
    insert_table_list           The insert table list.
    table                       The table for update.
    update_fields               The update fields.

  NOTE
    If the update fields include the timestamp field,
    remove TIMESTAMP_AUTO_SET_ON_UPDATE from table->timestamp_field_type.

  RETURN
    0           OK
    -1          Error
*/

ingo@mysql.com's avatar
ingo@mysql.com committed
290
static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list,
291
                               List<Item> &update_fields, table_map *map)
292
{
ingo@mysql.com's avatar
ingo@mysql.com committed
293
  TABLE *table= insert_table_list->table;
294
  query_id_t timestamp_query_id;
295 296 297 298 299 300 301 302 303
  LINT_INIT(timestamp_query_id);

  /*
    Change the query_id for the timestamp column so that we can
    check if this is modified directly.
  */
  if (table->timestamp_field)
  {
    timestamp_query_id= table->timestamp_field->query_id;
ingo@mysql.com's avatar
ingo@mysql.com committed
304
    table->timestamp_field->query_id= thd->query_id - 1;
305 306 307 308 309 310
  }

  /*
    Check the fields we are going to modify. This will set the query_id
    of all used fields to the threads query_id.
  */
311
  if (setup_fields(thd, 0, update_fields, 1, 0, 0))
312 313
    return -1;

314 315 316 317
  if (insert_table_list->effective_algorithm == VIEW_ALGORITHM_MERGE &&
      check_view_single_update(update_fields, insert_table_list, map))
    return -1;

318 319 320 321
  if (table->timestamp_field)
  {
    /* Don't set timestamp column if this is modified. */
    if (table->timestamp_field->query_id == thd->query_id)
322 323
      clear_timestamp_auto_bits(table->timestamp_field_type,
                                TIMESTAMP_AUTO_SET_ON_UPDATE);
324 325 326 327 328 329 330 331
    else
      table->timestamp_field->query_id= timestamp_query_id;
  }

  return 0;
}


332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376
/*
  Set up trigger handling for an INSERT-like statement.

  SYNOPSIS
    prepare_triggers_for_insert_stmt()
      thd     The current thread
      table   Table that the statement inserts into
      duplic  How duplicates are handled by the statement

  NOTE
    Does nothing when the table has no triggers. Otherwise the handler
    is told that DELETE and/or UPDATE must not be batched when the table
    has AFTER DELETE and/or AFTER UPDATE triggers, and the fields used by
    the triggers are marked via
    mark_fields_used_by_triggers_for_insert_stmt().
*/

void prepare_triggers_for_insert_stmt(THD *thd, TABLE *table,
                                      enum_duplicates duplic)
{
  if (!table->triggers)
    return;

  /*
    An AFTER DELETE trigger might access the subject table and therefore
    might need the delete to be done immediately: turn batching off.
  */
  if (table->triggers->has_triggers(TRG_EVENT_DELETE, TRG_ACTION_AFTER))
    (void) table->file->extra(HA_EXTRA_DELETE_CANNOT_BATCH);

  /*
    Likewise an AFTER UPDATE trigger might need the update to be done
    immediately: turn batching off.
  */
  if (table->triggers->has_triggers(TRG_EVENT_UPDATE, TRG_ACTION_AFTER))
    (void) table->file->extra(HA_EXTRA_UPDATE_CANNOT_BATCH);

  mark_fields_used_by_triggers_for_insert_stmt(thd, table, duplic);
}


377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
/*
  Mark the fields that triggers use for an INSERT-like statement.

  SYNOPSIS
    mark_fields_used_by_triggers_for_insert_stmt()
      thd     The current thread
      table   Table that the statement inserts into
      duplic  How duplicates are handled by the statement

  NOTE
    Fields used by INSERT triggers are always marked; fields used by
    UPDATE triggers are marked only for DUP_UPDATE. Nothing is marked for
    DELETE triggers: for REPLACE there is no sense in marking particular
    fields, because executing an ON DELETE trigger properly requires
    retrieving and storing values for all table columns anyway.
*/

void mark_fields_used_by_triggers_for_insert_stmt(THD *thd, TABLE *table,
                                                  enum_duplicates duplic)
{
  if (!table->triggers)
    return;                                     // no triggers, nothing to mark

  table->triggers->mark_fields_used(thd, TRG_EVENT_INSERT);

  if (DUP_UPDATE == duplic)
    table->triggers->mark_fields_used(thd, TRG_EVENT_UPDATE);
}


404 405 406 407 408 409 410 411
/**
  Upgrade table-level lock of INSERT statement to TL_WRITE if
  a more concurrent lock is infeasible for some reason. This is
  necessary for engines without internal locking support (MyISAM).
  An engine with internal locking implementation might later
  downgrade the lock in handler::store_lock() method.
*/

412
static
413 414 415 416 417 418 419 420 421 422 423 424 425 426 427
void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
                       enum_duplicates duplic,
                       bool is_multi_insert)
{
  if (duplic == DUP_UPDATE ||
      duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT)
  {
    *lock_type= TL_WRITE;
    return;
  }

  if (*lock_type == TL_WRITE_DELAYED)
  {
    /*
      We do not use delayed threads if:
428 429 430 431 432
      - we're running in the safe mode or skip-new mode -- the
        feature is disabled in these modes
      - we're executing this statement on a replication slave --
        we need to ensure serial execution of queries on the
        slave
433 434
      - it is INSERT .. ON DUPLICATE KEY UPDATE - in this case the
        insert cannot be concurrent
435 436 437 438 439 440 441 442 443 444 445 446
      - this statement is directly or indirectly invoked from
        a stored function or trigger (under pre-locking) - to
        avoid deadlocks, since INSERT DELAYED involves a lock
        upgrade (TL_WRITE_DELAYED -> TL_WRITE) which we should not
        attempt while keeping other table level locks.
      - this statement itself may require pre-locking.
        We should upgrade the lock even though in most cases
        delayed functionality may work. Unfortunately, we can't
        easily identify whether the subject table is not used in
        the statement indirectly via a stored function or trigger:
        if it is used, that will lead to a deadlock between the
        client connection and the delayed thread.
447 448
    */
    if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) ||
449 450 451
        thd->variables.max_insert_delayed_threads == 0 ||
        thd->prelocked_mode ||
        thd->lex->uses_stored_routines())
452 453 454 455
    {
      *lock_type= TL_WRITE;
      return;
    }
456 457 458 459 460 461 462 463
    if (thd->slave_thread)
    {
      /* Try concurrent insert */
      *lock_type= (duplic == DUP_UPDATE || duplic == DUP_REPLACE) ?
                  TL_WRITE : TL_WRITE_CONCURRENT_INSERT;
      return;
    }

464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488
    bool log_on= (thd->options & OPTION_BIN_LOG ||
                  ! (thd->security_ctx->master_access & SUPER_ACL));
    if (log_on && mysql_bin_log.is_open() && is_multi_insert)
    {
      /*
        Statement-based binary logging does not work in this case, because:
        a) two concurrent statements may have their rows intermixed in the
        queue, leading to autoincrement replication problems on slave (because
        the values generated used for one statement don't depend only on the
        value generated for the first row of this statement, so are not
        replicable)
        b) if first row of the statement has an error the full statement is
        not binlogged, while next rows of the statement may be inserted.
        c) if first row succeeds, statement is binlogged immediately with a
        zero error code (i.e. "no error"), if then second row fails, query
        will fail on slave too and slave will stop (wrongly believing that the
        master got no error).
        So we fall back to non-delayed INSERT.
      */
      *lock_type= TL_WRITE;
    }
  }
}


489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554
/**
  Open and lock the tables of an INSERT DELAYED statement.

  The first table of the list is obtained through delayed_get_table();
  when that yields a table, only the remaining tables are opened and
  locked with open_and_lock_tables(). When it yields no table (or in the
  embedded library, where this path is compiled out), the lock of the
  first table is upgraded to TL_WRITE and the whole list goes through
  open_and_lock_tables().

  @param thd         thread context
  @param table_list  list of "descriptors" for tables referenced
                     directly in statement SQL text. The first element
                     is the destination table of the insert; remaining
                     elements, if any, are usually tables referenced by
                     sub-queries in the right part of the INSERT.

  @return Status of the operation: TRUE on failure. In case of success
  'table' member of every table_list element points to an instance of
  class TABLE.

  @sa open_and_lock_tables for more information about MySQL table
  level locking
*/

static
bool open_and_lock_for_insert_delayed(THD *thd, TABLE_LIST *table_list)
{
  DBUG_ENTER("open_and_lock_for_insert_delayed");

#ifndef EMBEDDED_LIBRARY
  if (delayed_get_table(thd, table_list))
    DBUG_RETURN(TRUE);

  if (table_list->table != 0)
  {
    /*
      Tables used for sub-selects or in stored functions are opened here;
      this will also cache these functions.
    */
    if (open_and_lock_tables(thd, table_list->next_global))
    {
      end_delayed_insert(thd);
      DBUG_RETURN(TRUE);
    }

    /*
      The first table did not go through open_and_lock_tables(), so its
      updatability flag has to be set "by hand" for a usual table.
    */
    if (!(table_list->derived || table_list->view))
      table_list->updatable= 1;

    DBUG_RETURN(FALSE);
  }
#endif

  /*
    Use a normal insert, because either
    * this is the embedded library, which has no auxiliary threads, OR
    * delayed_get_table() requested a lock upgrade because
      - there are too many delayed insert threads OR
      - the table has triggers.
  */
  table_list->lock_type= TL_WRITE;
  DBUG_RETURN(open_and_lock_tables(thd, table_list));
}


555 556 557 558
/**
  INSERT statement implementation
*/

559 560 561 562 563
bool mysql_insert(THD *thd,TABLE_LIST *table_list,
                  List<Item> &fields,
                  List<List_item> &values_list,
                  List<Item> &update_fields,
                  List<Item> &update_values,
564 565
                  enum_duplicates duplic,
		  bool ignore)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
566
{
567
  int error, res;
568
  bool transactional_table, joins_freed= FALSE;
569
  bool changed;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
570 571 572 573
  uint value_count;
  ulong counter = 1;
  ulonglong id;
  COPY_INFO info;
574
  TABLE *table= 0;
monty@tik.mysql.fi's avatar
monty@tik.mysql.fi committed
575
  List_iterator_fast<List_item> its(values_list);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
576
  List_item *values;
577
  Name_resolution_context *context;
578
  Name_resolution_context_state ctx_state;
579
#ifndef EMBEDDED_LIBRARY
580
  char *query= thd->query;
kostja@vajra.(none)'s avatar
kostja@vajra.(none) committed
581 582 583 584 585
  /*
    log_on is about delayed inserts only.
    By default, both logs are enabled (this won't cause problems if the server
    runs without --log-update or --log-bin).
  */
586 587
  bool log_on= (thd->options & OPTION_BIN_LOG) ||
    (!(thd->security_ctx->master_access & SUPER_ACL));
kostja@vajra.(none)'s avatar
kostja@vajra.(none) committed
588
#endif
589
  thr_lock_type lock_type = table_list->lock_type;
monty@mysql.com's avatar
monty@mysql.com committed
590
  Item *unused_conds= 0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
591 592
  DBUG_ENTER("mysql_insert");

593
  /*
594 595 596 597 598 599 600 601 602 603 604 605
    Upgrade lock type if the requested lock is incompatible with
    the current connection mode or table operation.
  */
  upgrade_lock_type(thd, &table_list->lock_type, duplic,
                    values_list.elements > 1);

  /*
    We can't write-delayed into a table locked with LOCK TABLES:
    this will lead to a deadlock, since the delayed thread will
    never be able to get a lock on the table. QQQ: why not
    upgrade the lock here instead?
  */
606
  if (table_list->lock_type == TL_WRITE_DELAYED && thd->locked_tables &&
607
      find_locked_table(thd, table_list->db, table_list->table_name))
608
  {
609 610 611
    my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
             table_list->table_name);
    DBUG_RETURN(TRUE);
612
  }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
613

614
  if (table_list->lock_type == TL_WRITE_DELAYED)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
615
  {
616 617
    if (open_and_lock_for_insert_delayed(thd, table_list))
      DBUG_RETURN(TRUE);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
618 619
  }
  else
620 621 622 623
  {
    if (open_and_lock_tables(thd, table_list))
      DBUG_RETURN(TRUE);
  }
624

bk@work.mysql.com's avatar
bk@work.mysql.com committed
625
  thd->proc_info="init";
626
  thd->used_tables=0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
627
  values= its++;
628
  value_count= values->elements;
629

monty@mysql.com's avatar
monty@mysql.com committed
630
  if (mysql_prepare_insert(thd, table_list, table, fields, values,
monty@mysql.com's avatar
monty@mysql.com committed
631
			   update_fields, update_values, duplic, &unused_conds,
632 633 634 635 636
                           FALSE,
                           (fields.elements || !value_count),
                           !ignore && (thd->variables.sql_mode &
                                       (MODE_STRICT_TRANS_TABLES |
                                        MODE_STRICT_ALL_TABLES))))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
637
    goto abort;
638

639 640
  /* mysql_prepare_insert set table_list->table if it was not set */
  table= table_list->table;
641
  lock_type= table_list->lock_type;
642

643
  context= &thd->lex->select_lex.context;
644 645 646 647 648 649 650 651 652
  /*
    These three asserts test the hypothesis that the resetting of the name
    resolution context below is not necessary at all since the list of local
    tables for INSERT always consists of one table.
  */
  DBUG_ASSERT(!table_list->next_local);
  DBUG_ASSERT(!context->table_list->next_local);
  DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);

653
  /* Save the state of the current name resolution context. */
654
  ctx_state.save_state(context, table_list);
655 656 657 658 659

  /*
    Perform name resolution only in the first table - 'table_list',
    which is the table that is inserted into.
  */
660
  table_list->next_local= 0;
661 662
  context->resolve_in_table_list_only(table_list);

663
  while ((values= its++))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
664 665 666 667
  {
    counter++;
    if (values->elements != value_count)
    {
668
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
669 670
      goto abort;
    }
671
    if (setup_fields(thd, 0, *values, 0, 0, 0))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
672 673 674
      goto abort;
  }
  its.rewind ();
675 676
 
  /* Restore the current context. */
677
  ctx_state.restore_state(context, table_list);
678

bk@work.mysql.com's avatar
bk@work.mysql.com committed
679
  /*
680
    Fill in the given fields and dump it to the table file
bk@work.mysql.com's avatar
bk@work.mysql.com committed
681
  */
682
  info.records= info.deleted= info.copied= info.updated= info.touched= 0;
683
  info.ignore= ignore;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
684
  info.handle_duplicates=duplic;
685 686
  info.update_fields= &update_fields;
  info.update_values= &update_values;
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
687
  info.view= (table_list->view ? table_list : 0);
688

689 690 691
  /*
    Count warnings for all inserts.
    For single line insert, generate an error if try to set a NOT NULL field
692
    to NULL.
693
  */
694
  thd->count_cuted_fields= ((values_list.elements == 1 &&
monty@mysql.com's avatar
monty@mysql.com committed
695
                             !ignore) ?
696 697
			    CHECK_FIELD_ERROR_FOR_NULL :
			    CHECK_FIELD_WARN);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
698 699 700
  thd->cuted_fields = 0L;
  table->next_number_field=table->found_next_number_field;

guilhem@gbichot3.local's avatar
guilhem@gbichot3.local committed
701 702 703 704 705 706 707 708
#ifdef HAVE_REPLICATION
  if (thd->slave_thread &&
      (info.handle_duplicates == DUP_UPDATE) &&
      (table->next_number_field != NULL) &&
      rpl_master_has_bug(&active_mi->rli, 24432))
    goto abort;
#endif

bk@work.mysql.com's avatar
bk@work.mysql.com committed
709 710 711
  error=0;
  id=0;
  thd->proc_info="update";
712
  if (duplic != DUP_ERROR || ignore)
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
713
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
714 715 716 717 718 719 720 721 722 723 724
  if (duplic == DUP_REPLACE)
  {
    if (!table->triggers || !table->triggers->has_delete_triggers())
      table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
    /*
      REPLACE should change values of all columns so we should mark
      all columns as columns to be set. As nice side effect we will
      retrieve columns which values are needed for ON DELETE triggers.
    */
    table->file->extra(HA_EXTRA_RETRIEVE_ALL_COLS);
  }
antony@ppcg5.local's avatar
antony@ppcg5.local committed
725 726
  if (duplic == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
727 728 729 730
  /*
    let's *try* to start bulk inserts. It won't necessary
    start them as values_list.elements should be greater than
    some - handler dependent - threshold.
731 732 733 734
    We should not start bulk inserts if this statement uses
    functions or invokes triggers since they may access
    to the same table and therefore should not see its
    inconsistent state created by this optimization.
735 736 737 738
    So we call start_bulk_insert to perform necessary checks on
    values_list.elements, and - if nothing else - to initialize
    the code to make the call of end_bulk_insert() below safe.
  */
739
  if (lock_type != TL_WRITE_DELAYED && !thd->prelocked_mode)
740
    table->file->start_bulk_insert(values_list.elements);
741

742
  thd->no_trans_update.stmt= FALSE;
743 744 745
  thd->abort_on_warning= (!ignore && (thd->variables.sql_mode &
                                       (MODE_STRICT_TRANS_TABLES |
                                        MODE_STRICT_ALL_TABLES)));
746

747
  prepare_triggers_for_insert_stmt(thd, table, duplic);
748

749 750 751 752
  if (table_list->prepare_where(thd, 0, TRUE) ||
      table_list->prepare_check_option(thd))
    error= 1;

753
  while ((values= its++))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
754 755 756
  {
    if (fields.elements || !value_count)
    {
757
      restore_record(table,s->default_values);	// Get empty record
758 759 760
      if (fill_record_n_invoke_before_triggers(thd, fields, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
761
      {
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
762
	if (values_list.elements != 1 && !thd->net.report_error)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
763 764 765 766
	{
	  info.records++;
	  continue;
	}
767 768 769 770 771
	/*
	  TODO: set thd->abort_on_warning if values_list.elements == 1
	  and check that all items return warning in case of problem with
	  storing field.
        */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
772 773 774 775 776 777
	error=1;
	break;
      }
    }
    else
    {
778
      if (thd->used_tables)			// Column used in values()
779
	restore_record(table,s->default_values);	// Get empty record
780
      else
781 782 783 784 785 786 787 788 789 790 791
      {
        /*
          Fix delete marker. No need to restore rest of record since it will
          be overwritten by fill_record() anyway (and fill_record() does not
          use default values in this case).
        */
	table->record[0][0]= table->s->default_values[0];
      }
      if (fill_record_n_invoke_before_triggers(thd, table->field, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
792
      {
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
793
	if (values_list.elements != 1 && ! thd->net.report_error)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
794 795 796 797 798 799 800 801
	{
	  info.records++;
	  continue;
	}
	error=1;
	break;
      }
    }
802

803 804 805
    if ((res= table_list->view_check_option(thd,
					    (values_list.elements == 1 ?
					     0 :
806
					     ignore))) ==
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
807 808 809
        VIEW_CHECK_SKIP)
      continue;
    else if (res == VIEW_CHECK_ERROR)
810
    {
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
811 812
      error= 1;
      break;
813
    }
814
#ifndef EMBEDDED_LIBRARY
815
    if (lock_type == TL_WRITE_DELAYED)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
816
    {
817
      error=write_delayed(thd, table, duplic, ignore, query, thd->query_length, log_on);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
818 819 820
      query=0;
    }
    else
821
#endif
822
      error=write_record(thd, table ,&info);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
823
    /*
824 825
      If auto_increment values are used, save the first one for
      LAST_INSERT_ID() and for the update log.
bk@work.mysql.com's avatar
bk@work.mysql.com committed
826 827 828 829 830
    */
    if (! id && thd->insert_id_used)
    {						// Get auto increment value
      id= thd->last_insert_id;
    }
831 832
    if (error)
      break;
833
    thd->row_count++;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
834
  }
835

836 837 838
  free_underlaid_joins(thd, &thd->lex->select_lex);
  joins_freed= TRUE;

839 840 841 842
  /*
    Now all rows are inserted.  Time to update logs and sends response to
    user
  */
843
#ifndef EMBEDDED_LIBRARY
bk@work.mysql.com's avatar
bk@work.mysql.com committed
844 845
  if (lock_type == TL_WRITE_DELAYED)
  {
846 847 848 849 850 851
    if (!error)
    {
      id=0;					// No auto_increment id
      info.copied=values_list.elements;
      end_delayed_insert(thd);
    }
852
    query_cache_invalidate3(thd, table_list, 1);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
853 854
  }
  else
855
#endif
bk@work.mysql.com's avatar
bk@work.mysql.com committed
856
  {
857
    if (!thd->prelocked_mode && table->file->end_bulk_insert() && !error)
858
    {
serg@serg.mylan's avatar
serg@serg.mylan committed
859 860
      table->file->print_error(my_errno,MYF(0));
      error=1;
861
    }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
862 863
    if (id && values_list.elements != 1)
      thd->insert_id(id);			// For update log
864
    else if (table->next_number_field && info.copied)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
865
      id=table->next_number_field->val_int();	// Return auto_increment value
866

867
    transactional_table= table->file->has_transactions();
heikki@hundin.mysql.fi's avatar
heikki@hundin.mysql.fi committed
868

869
    if ((changed= (info.copied || info.deleted || info.updated)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
870
    {
871 872 873 874 875 876 877
      /*
        Invalidate the table in the query cache if something changed.
        For the transactional algorithm to work the invalidation must be
        before binlog writing and ha_autocommit_or_rollback
      */
      query_cache_invalidate3(thd, table_list, 1);
      if (error <= 0 || !transactional_table)
878
      {
879 880 881
        if (mysql_bin_log.is_open())
        {
          if (error <= 0)
882 883 884 885 886 887 888 889 890 891 892
          {
            /*
              [Guilhem wrote] Temporary errors may have filled
              thd->net.last_error/errno.  For example if there has
              been a disk full error when writing the row, and it was
              MyISAM, then thd->net.last_error/errno will be set to
              "disk full"... and the my_pwrite() will wait until free
              space appears, and so when it finishes then the
              write_row() was entirely successful
            */
            /* todo: consider removing */
893
            thd->clear_error();
894 895 896 897 898 899 900 901 902 903 904 905 906
          }
          /* bug#22725: 
               
          A query which per-row-loop can not be interrupted with
          KILLED, like INSERT, and that does not invoke stored
          routines can be binlogged with neglecting the KILLED error.
          
          If there was no error (error == zero) until after the end of
          inserting loop the KILLED flag that appeared later can be
          disregarded since previously possible invocation of stored
          routines did not result in any error due to the KILLED.  In
          such case the flag is ignored for constructing binlog event.
          */
907
          Query_log_event qinfo(thd, thd->query, thd->query_length,
908 909
                                transactional_table, FALSE,
                                (error>0) ? thd->killed : THD::NOT_KILLED);
910
          DBUG_ASSERT(thd->killed != THD::KILL_BAD_DATA || error > 0);
911 912 913 914
          if (mysql_bin_log.write(&qinfo) && transactional_table)
            error=1;
        }
        if (!transactional_table)
915
          thd->no_trans_update.all= TRUE;
916
      }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
917
    }
918
    if (transactional_table)
919
      error=ha_autocommit_or_rollback(thd,error);
920

bk@work.mysql.com's avatar
bk@work.mysql.com committed
921 922 923
    if (thd->lock)
    {
      mysql_unlock_tables(thd, thd->lock);
924 925 926 927 928 929 930 931 932 933
      /*
        Invalidate the table in the query cache if something changed
        after unlocking when changes become visible.
        TODO: this is workaround. right way will be move invalidating in
        the unlock procedure.
      */
      if (lock_type ==  TL_WRITE_CONCURRENT_INSERT && changed)
      {
        query_cache_invalidate3(thd, table_list, 1);
      }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
934 935 936 937 938
      thd->lock=0;
    }
  }
  thd->proc_info="end";
  table->next_number_field=0;
939
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
940
  thd->next_insert_id=0;			// Reset this if wrongly used
941
  table->auto_increment_field_not_null= FALSE;
942
  if (duplic != DUP_ERROR || ignore)
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
943
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
944 945 946
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
947

948 949
  /* Reset value of LAST_INSERT_ID if no rows were inserted or touched */
  if (!info.copied && !info.touched && thd->insert_id_used)
950 951 952 953
  {
    thd->insert_id(0);
    id=0;
  }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
954 955 956 957
  if (error)
    goto abort;
  if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) ||
				    !thd->cuted_fields))
958
  {
959 960 961
    thd->row_count_func= info.copied + info.deleted +
                         ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
                          info.touched : info.updated);
962
    send_ok(thd, (ulong) thd->row_count_func, id);
963
  }
964 965
  else
  {
bk@work.mysql.com's avatar
bk@work.mysql.com committed
966
    char buff[160];
967 968
    ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
                     info.touched : info.updated);
969
    if (ignore)
970 971 972
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
	      (lock_type == TL_WRITE_DELAYED) ? (ulong) 0 :
	      (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
973
    else
974
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
975 976
	      (ulong) (info.deleted + updated), (ulong) thd->cuted_fields);
    thd->row_count_func= info.copied + info.deleted + updated;
977
    ::send_ok(thd, (ulong) thd->row_count_func, id, buff);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
978
  }
979
  thd->abort_on_warning= 0;
980
  DBUG_RETURN(FALSE);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
981 982

abort:
983
#ifndef EMBEDDED_LIBRARY
bk@work.mysql.com's avatar
bk@work.mysql.com committed
984 985
  if (lock_type == TL_WRITE_DELAYED)
    end_delayed_insert(thd);
986
#endif
987 988
  if (!joins_freed)
    free_underlaid_joins(thd, &thd->lex->select_lex);
989
  thd->abort_on_warning= 0;
990
  DBUG_RETURN(TRUE);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
991 992 993
}


bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
994 995 996 997 998
/*
  Additional check for insertability for VIEW

  SYNOPSIS
    check_view_insertability()
999
    thd     - thread handler
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1000 1001
    view    - reference on VIEW

1002 1003 1004 1005 1006 1007
  IMPLEMENTATION
    A view is insertable if the following are true:
    - All columns in the view are columns from a table
    - All not used columns in table have a default values
    - All field in view are unique (not referring to the same column)

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1008 1009
  RETURN
    FALSE - OK
1010 1011 1012
      view->contain_auto_increment is 1 if and only if the view contains an
      auto_increment field

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1013 1014 1015
    TRUE  - can't be used for insert
*/

1016
static bool check_view_insertability(THD * thd, TABLE_LIST *view)
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1017
{
1018
  uint num= view->view->select_lex.item_list.elements;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1019
  TABLE *table= view->table;
1020 1021 1022
  Field_translator *trans_start= view->field_translation,
		   *trans_end= trans_start + num;
  Field_translator *trans;
1023 1024 1025
  uint used_fields_buff_size= (table->s->fields + 7) / 8;
  uchar *used_fields_buff= (uchar*)thd->alloc(used_fields_buff_size);
  MY_BITMAP used_fields;
1026
  bool save_set_query_id= thd->set_query_id;
1027 1028
  DBUG_ENTER("check_key_in_view");

1029 1030 1031
  if (!used_fields_buff)
    DBUG_RETURN(TRUE);  // EOM

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1032 1033
  DBUG_ASSERT(view->table != 0 && view->field_translation != 0);

1034 1035
  VOID(bitmap_init(&used_fields, used_fields_buff, used_fields_buff_size * 8,
                   0));
1036 1037
  bitmap_clear_all(&used_fields);

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1038
  view->contain_auto_increment= 0;
1039 1040 1041 1042 1043
  /* 
    we must not set query_id for fields as they're not 
    really used in this context
  */
  thd->set_query_id= 0;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1044
  /* check simplicity and prepare unique test of view */
1045
  for (trans= trans_start; trans != trans_end; trans++)
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1046
  {
1047
    if (!trans->item->fixed && trans->item->fix_fields(thd, &trans->item))
1048 1049 1050 1051
    {
      thd->set_query_id= save_set_query_id;
      DBUG_RETURN(TRUE);
    }
1052
    Item_field *field;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1053
    /* simple SELECT list entry (field without expression) */
bell@sanja.is.com.ua's avatar
merge  
bell@sanja.is.com.ua committed
1054
    if (!(field= trans->item->filed_for_view_update()))
1055 1056
    {
      thd->set_query_id= save_set_query_id;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1057
      DBUG_RETURN(TRUE);
1058
    }
1059
    if (field->field->unireg_check == Field::NEXT_NUMBER)
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1060 1061
      view->contain_auto_increment= 1;
    /* prepare unique test */
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1062 1063 1064 1065 1066
    /*
      remove collation (or other transparent for update function) if we have
      it
    */
    trans->item= field;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1067
  }
1068
  thd->set_query_id= save_set_query_id;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1069
  /* unique test */
1070
  for (trans= trans_start; trans != trans_end; trans++)
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1071
  {
1072
    /* Thanks to test above, we know that all columns are of type Item_field */
1073
    Item_field *field= (Item_field *)trans->item;
1074 1075 1076
    /* check fields belong to table in which we are inserting */
    if (field->field->table == table &&
        bitmap_fast_test_and_set(&used_fields, field->field->field_index))
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1077 1078 1079 1080 1081 1082 1083
      DBUG_RETURN(TRUE);
  }

  DBUG_RETURN(FALSE);
}


bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1084
/*
1085
  Check if table can be updated
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1086 1087

  SYNOPSIS
1088 1089
     mysql_prepare_insert_check_table()
     thd		Thread handle
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1090
     table_list		Table list
1091 1092
     fields		List of fields to be updated
     where		Pointer to where clause
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1093
     select_insert      Check is making for SELECT ... INSERT
1094 1095

   RETURN
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1096 1097
     FALSE ok
     TRUE  ERROR
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1098
*/
1099

bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1100 1101 1102
static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
                                             List<Item> &fields, COND **where,
                                             bool select_insert)
{
  /* Sampled before the tables are set up, exactly as the check below needs it */
  bool insert_into_view= (table_list->view != 0);
  SELECT_LEX *select_lex= &thd->lex->select_lex;
  DBUG_ENTER("mysql_prepare_insert_check_table");

  /*
     The first table in the list is the one we INSERT into and requires
     INSERT_ACL; all others require SELECT_ACL only. The ACL requirement
     below is for new leaves only anyway (view-constituents), so check for
     SELECT rather than INSERT.
  */
  if (setup_tables_and_check_access(thd, &select_lex->context,
                                    &select_lex->top_join_list,
                                    table_list, where,
                                    &select_lex->leaf_tables,
                                    select_insert, INSERT_ACL, SELECT_ACL))
    DBUG_RETURN(TRUE);

  /* Nothing more to do unless this is a view insert without a field list */
  if (!insert_into_view || fields.elements)
    DBUG_RETURN(FALSE);

  thd->lex->empty_field_list_on_rset= 1;
  if (table_list->table)
  {
    /* Build the field list from the view's own columns */
    DBUG_RETURN(insert_view_fields(thd, &fields, table_list));
  }

  /* No table behind the view: an explicit field list is mandatory */
  my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
           table_list->view_db.str, table_list->view_name.str);
  DBUG_RETURN(TRUE);
}


/*
  Prepare items in INSERT statement

  SYNOPSIS
    mysql_prepare_insert()
    thd			Thread handler
    table_list	        Global/local table list
monty@mysql.com's avatar
monty@mysql.com committed
1144 1145
    table		Table to insert into (can be NULL if table should
			be taken from table_list->table)    
monty@mysql.com's avatar
monty@mysql.com committed
1146 1147
    where		Where clause (for insert ... select)
    select_insert	TRUE if INSERT ... SELECT statement
1148 1149 1150 1151
    check_fields        TRUE if need to check that all INSERT fields are 
                        given values.
    abort_on_warning    whether to report if some INSERT field is not 
                        assigned as an error (TRUE) or as a warning (FALSE).
1152

monty@mishka.local's avatar
monty@mishka.local committed
1153 1154 1155 1156 1157
  TODO (in far future)
    In cases of:
    INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
    ON DUPLICATE KEY ...
    we should be able to refer to sum1 in the ON DUPLICATE KEY part
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1158

1159 1160 1161
  WARNING
    You MUST set table->insert_values to 0 after calling this function
    before releasing the table object.
monty@mysql.com's avatar
monty@mysql.com committed
1162
  
1163
  RETURN VALUE
1164 1165
    FALSE OK
    TRUE  error
1166 1167
*/

monty@mysql.com's avatar
monty@mysql.com committed
1168
bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
monty@mysql.com's avatar
monty@mysql.com committed
1169
                          TABLE *table, List<Item> &fields, List_item *values,
monty@mishka.local's avatar
monty@mishka.local committed
1170
                          List<Item> &update_fields, List<Item> &update_values,
monty@mysql.com's avatar
monty@mysql.com committed
1171
                          enum_duplicates duplic,
1172 1173
                          COND **where, bool select_insert,
                          bool check_fields, bool abort_on_warning)
1174
{
monty@mysql.com's avatar
monty@mysql.com committed
1175
  SELECT_LEX *select_lex= &thd->lex->select_lex;
1176
  Name_resolution_context *context= &select_lex->context;
1177
  Name_resolution_context_state ctx_state;
1178
  bool insert_into_view= (table_list->view != 0);
monty@mysql.com's avatar
monty@mysql.com committed
1179
  bool res= 0;
1180
  table_map map= 0;
1181
  DBUG_ENTER("mysql_prepare_insert");
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1182 1183 1184
  DBUG_PRINT("enter", ("table_list 0x%lx, table 0x%lx, view %d",
		       (ulong)table_list, (ulong)table,
		       (int)insert_into_view));
1185 1186
  /* INSERT should have a SELECT or VALUES clause */
  DBUG_ASSERT (!select_insert || !values);
monty@mishka.local's avatar
monty@mishka.local committed
1187

1188 1189 1190 1191 1192 1193 1194
  /*
    For subqueries in VALUES() we should not see the table in which we are
    inserting (for INSERT ... SELECT this is done by changing table_list,
    because INSERT ... SELECT share SELECT_LEX it with SELECT.
  */
  if (!select_insert)
  {
monty@mysql.com's avatar
monty@mysql.com committed
1195
    for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207
         un;
         un= un->next_unit())
    {
      for (SELECT_LEX *sl= un->first_select();
           sl;
           sl= sl->next_select())
      {
        sl->context.outer_context= 0;
      }
    }
  }

monty@mishka.local's avatar
monty@mishka.local committed
1208
  if (duplic == DUP_UPDATE)
1209 1210
  {
    /* it should be allocated before Item::fix_fields() */
monty@mishka.local's avatar
monty@mishka.local committed
1211
    if (table_list->set_insert_values(thd->mem_root))
monty@mysql.com's avatar
monty@mysql.com committed
1212
      DBUG_RETURN(TRUE);
1213
  }
monty@mishka.local's avatar
monty@mishka.local committed
1214

monty@mysql.com's avatar
monty@mysql.com committed
1215 1216
  if (mysql_prepare_insert_check_table(thd, table_list, fields, where,
                                       select_insert))
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1217
    DBUG_RETURN(TRUE);
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1218

1219 1220

  /* Prepare the fields in the statement. */
1221
  if (values)
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1222
  {
1223 1224 1225 1226 1227 1228
    /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
    DBUG_ASSERT (!select_lex->group_list.elements);

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);

1229
    /*
1230 1231 1232 1233 1234 1235
      Perform name resolution only in the first table - 'table_list',
      which is the table that is inserted into.
     */
    table_list->next_local= 0;
    context->resolve_in_table_list_only(table_list);

1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251
    res= check_insert_fields(thd, context->table_list, fields, *values,
                             !insert_into_view, &map) ||
      setup_fields(thd, 0, *values, 0, 0, 0);

    if (!res && check_fields)
    {
      bool saved_abort_on_warning= thd->abort_on_warning;
      thd->abort_on_warning= abort_on_warning;
      res= check_that_all_fields_are_given_values(thd, 
                                                  table ? table : 
                                                  context->table_list->table,
                                                  context->table_list);
      thd->abort_on_warning= saved_abort_on_warning;
    }

    if (!res && duplic == DUP_UPDATE)
monty@mysql.com's avatar
monty@mysql.com committed
1252
    {
1253 1254 1255
      select_lex->no_wrap_view_item= TRUE;
      res= check_update_fields(thd, context->table_list, update_fields, &map);
      select_lex->no_wrap_view_item= FALSE;
monty@mysql.com's avatar
monty@mysql.com committed
1256
    }
1257 1258 1259 1260

    /* Restore the current context. */
    ctx_state.restore_state(context, table_list);

monty@mysql.com's avatar
monty@mysql.com committed
1261 1262
    if (!res)
      res= setup_fields(thd, 0, update_values, 1, 0, 0);
monty@mysql.com's avatar
monty@mysql.com committed
1263
  }
1264

monty@mysql.com's avatar
monty@mysql.com committed
1265 1266 1267
  if (res)
    DBUG_RETURN(res);

bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1268 1269 1270
  if (!table)
    table= table_list->table;

1271
  if (!select_insert)
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1272
  {
1273
    Item *fake_conds= 0;
1274
    TABLE_LIST *duplicate;
1275
    if ((duplicate= unique_table(thd, table_list, table_list->next_global, 1)))
1276
    {
1277
      update_non_unique_table_error(table_list, "INSERT", duplicate);
1278 1279
      DBUG_RETURN(TRUE);
    }
1280
    select_lex->fix_prepare_information(thd, &fake_conds, &fake_conds);
monty@mysql.com's avatar
monty@mysql.com committed
1281
    select_lex->first_execution= 0;
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1282
  }
1283 1284
  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
    table->file->extra(HA_EXTRA_RETRIEVE_PRIMARY_KEY);
1285
  DBUG_RETURN(FALSE);
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1286 1287 1288
}


bk@work.mysql.com's avatar
bk@work.mysql.com committed
1289 1290 1291 1292
	/* Return 1 if no key after 'keynr' is unique (HA_NOSAME), else 0 */

static int last_uniq_key(TABLE *table,uint keynr)
{
  /*
    Look at every key that follows 'keynr'; finding one flagged HA_NOSAME
    means 'keynr' is not the last such key of the table.
  */
  for (uint idx= keynr + 1; idx < table->s->keys; idx++)
  {
    if (table->key_info[idx].flags & HA_NOSAME)
      return 0;
  }
  return 1;
}


/*
1301 1302 1303 1304 1305 1306 1307 1308 1309 1310
  Write a record to table with optional deleting of conflicting records,
  invoke proper triggers if needed.

  SYNOPSIS
     write_record()
      thd   - thread context
      table - table to which record should be written
      info  - COPY_INFO structure describing handling of duplicates
              and which is used for counting number of records inserted
              and deleted.
1311

1312 1313 1314 1315 1316
  NOTE
    Once this record will be written to table after insert trigger will
    be invoked. If instead of inserting new record we will update old one
    then both on update triggers will work instead. Similarly both on
    delete triggers will be invoked if we will delete conflicting records.
1317

1318
    Sets thd->no_trans_update.stmt to TRUE if table which is updated didn't have
1319 1320 1321 1322 1323
    transactions.

  RETURN VALUE
    0     - success
    non-0 - error
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1324 1325 1326
*/


1327
int write_record(THD *thd, TABLE *table,COPY_INFO *info)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1328
{
1329
  int error, trg_error= 0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1330
  char *key=0;
1331
  DBUG_ENTER("write_record");
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1332

bk@work.mysql.com's avatar
bk@work.mysql.com committed
1333
  info->records++;
1334 1335
  if (info->handle_duplicates == DUP_REPLACE ||
      info->handle_duplicates == DUP_UPDATE)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1336 1337 1338
  {
    while ((error=table->file->write_row(table->record[0])))
    {
1339
      uint key_nr;
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1340
      if (error != HA_WRITE_SKIP)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1341 1342 1343
	goto err;
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
      {
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1344
	error=HA_WRITE_SKIP;			/* Database can't find key */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1345 1346
	goto err;
      }
1347 1348 1349 1350 1351
      /*
	Don't allow REPLACE to replace a row when a auto_increment column
	was used.  This ensures that we don't get a problem when the
	whole range of the key has been used.
      */
1352 1353
      if (info->handle_duplicates == DUP_REPLACE &&
          table->next_number_field &&
1354
          key_nr == table->s->next_number_index &&
1355 1356
	  table->file->auto_increment_column_changed)
	goto err;
1357
      if (table->file->table_flags() & HA_DUPP_POS)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1358 1359 1360 1361 1362 1363
      {
	if (table->file->rnd_pos(table->record[1],table->file->dupp_ref))
	  goto err;
      }
      else
      {
1364
	if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1365 1366 1367 1368
	{
	  error=my_errno;
	  goto err;
	}
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1369

bk@work.mysql.com's avatar
bk@work.mysql.com committed
1370 1371
	if (!key)
	{
1372
	  if (!(key=(char*) my_safe_alloca(table->s->max_unique_length,
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1373 1374 1375 1376 1377 1378
					   MAX_KEY_LENGTH)))
	  {
	    error=ENOMEM;
	    goto err;
	  }
	}
1379
	key_copy((byte*) key,table->record[0],table->key_info+key_nr,0);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1380
	if ((error=(table->file->index_read_idx(table->record[1],key_nr,
1381
						(byte*) key,
monty@mashka.mysql.fi's avatar
monty@mashka.mysql.fi committed
1382 1383
						table->key_info[key_nr].
						key_length,
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1384 1385 1386
						HA_READ_KEY_EXACT))))
	  goto err;
      }
1387
      if (info->handle_duplicates == DUP_UPDATE)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1388
      {
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1389
        int res= 0;
monty@mysql.com's avatar
monty@mysql.com committed
1390 1391 1392 1393
        /*
          We don't check for other UNIQUE keys - the first row
          that matches, is updated. If update causes a conflict again,
          an error is returned
1394
        */
1395
	DBUG_ASSERT(table->insert_values != NULL);
1396 1397
        store_record(table,insert_values);
        restore_record(table,record[1]);
monty@mysql.com's avatar
monty@mysql.com committed
1398 1399
        DBUG_ASSERT(info->update_fields->elements ==
                    info->update_values->elements);
1400
        if (fill_record_n_invoke_before_triggers(thd, *info->update_fields,
gshchepa/uchum@gleb.loc's avatar
gshchepa/uchum@gleb.loc committed
1401 1402
                                                 *info->update_values,
                                                 info->ignore,
1403 1404 1405
                                                 table->triggers,
                                                 TRG_EVENT_UPDATE))
          goto before_trg_err;
1406 1407

        /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1408 1409 1410
        if (info->view &&
            (res= info->view->view_check_option(current_thd, info->ignore)) ==
            VIEW_CHECK_SKIP)
1411
          goto ok_or_after_trg_err;
monty@mysql.com's avatar
monty@mysql.com committed
1412
        if (res == VIEW_CHECK_ERROR)
1413
          goto before_trg_err;
1414

guilhem@gbichot3.local's avatar
guilhem@gbichot3.local committed
1415
        table->file->restore_auto_increment();
1416 1417
        if ((table->file->table_flags() & HA_PARTIAL_COLUMN_READ) ||
            compare_record(table, thd->query_id))
1418
        {
1419
          if ((error=table->file->update_row(table->record[1],table->record[0])))
1420
          {
1421 1422 1423 1424 1425
            if ((error == HA_ERR_FOUND_DUPP_KEY) && info->ignore)
            {
              goto ok_or_after_trg_err;
            }
            goto err;
1426
          }
evgen@moonbone.local's avatar
evgen@moonbone.local committed
1427

1428
          info->updated++;
evgen@moonbone.local's avatar
evgen@moonbone.local committed
1429 1430 1431 1432
          trg_error= (table->triggers &&
                      table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
                                                        TRG_ACTION_AFTER,
                                                        TRUE));
1433 1434
          info->copied++;
        }
1435

1436 1437 1438 1439 1440
        if (table->next_number_field)
          table->file->adjust_next_insert_id_after_explicit_value(
            table->next_number_field->val_int());
        info->touched++;

1441
        goto ok_or_after_trg_err;
1442 1443 1444
      }
      else /* DUP_REPLACE */
      {
monty@mysql.com's avatar
monty@mysql.com committed
1445 1446 1447 1448 1449
	/*
	  The manual defines the REPLACE semantics that it is either
	  an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
	  InnoDB do not function in the defined way if we allow MySQL
	  to convert the latter operation internally to an UPDATE.
1450 1451
          We also should not perform this conversion if we have 
          timestamp field with ON UPDATE which is different from DEFAULT.
1452 1453 1454 1455 1456 1457
          Another case when conversion should not be performed is when
          we have ON DELETE trigger on table so user may notice that
          we cheat here. Note that it is ok to do such conversion for
          tables which have ON UPDATE but have no ON DELETE triggers,
          we just should not expose this fact to users by invoking
          ON UPDATE triggers.
monty@mysql.com's avatar
monty@mysql.com committed
1458 1459
	*/
	if (last_uniq_key(table,key_nr) &&
1460
	    !table->file->referenced_by_foreign_key() &&
1461
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
1462 1463
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH) &&
            (!table->triggers || !table->triggers->has_delete_triggers()))
1464
        {
monty@mashka.mysql.fi's avatar
monty@mashka.mysql.fi committed
1465 1466
          if ((error=table->file->update_row(table->record[1],
					     table->record[0])))
1467 1468
            goto err;
          info->deleted++;
1469 1470 1471 1472 1473
          /*
            Since we pretend that we have done insert we should call
            its after triggers.
          */
          goto after_trg_n_copied_inc;
1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484
        }
        else
        {
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_BEFORE, TRUE))
            goto before_trg_err;
          if ((error=table->file->delete_row(table->record[1])))
            goto err;
          info->deleted++;
          if (!table->file->has_transactions())
1485
            thd->no_trans_update.stmt= TRUE;
1486 1487 1488 1489 1490 1491 1492 1493
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_AFTER, TRUE))
          {
            trg_error= 1;
            goto ok_or_after_trg_err;
          }
          /* Let us attempt do write_row() once more */
1494
        }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1495 1496 1497 1498 1499
      }
    }
  }
  else if ((error=table->file->write_row(table->record[0])))
  {
1500
    if (!info->ignore ||
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1501 1502
	(error != HA_ERR_FOUND_DUPP_KEY && error != HA_ERR_FOUND_DUPP_UNIQUE))
      goto err;
1503
    table->file->restore_auto_increment();
1504
    goto ok_or_after_trg_err;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1505
  }
1506 1507 1508 1509 1510 1511

after_trg_n_copied_inc:
  info->copied++;
  trg_error= (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
                                                TRG_ACTION_AFTER, TRUE));
1512 1513

ok_or_after_trg_err:
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1514
  if (key)
1515
    my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
1516
  if (!table->file->has_transactions())
1517
    thd->no_trans_update.stmt= TRUE;
1518
  DBUG_RETURN(trg_error);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1519 1520

err:
1521
  info->last_errno= error;
1522 1523 1524
  /* current_select is NULL if this is a delayed insert */
  if (thd->lex->current_select)
    thd->lex->current_select->no_error= 0;        // Give error
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1525
  table->file->print_error(error,MYF(0));
1526 1527

before_trg_err:
1528
  table->file->restore_auto_increment();
1529 1530
  if (key)
    my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
1531
  DBUG_RETURN(1);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1532 1533 1534 1535
}


/******************************************************************************
1536
  Check that all fields with arn't null_fields are used
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1537 1538
******************************************************************************/

1539 1540
int check_that_all_fields_are_given_values(THD *thd, TABLE *entry,
                                           TABLE_LIST *table_list)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1541
{
1542
  int err= 0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1543 1544
  for (Field **field=entry->field ; *field ; field++)
  {
1545
    if ((*field)->query_id != thd->query_id &&
1546 1547
        ((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
        ((*field)->real_type() != FIELD_TYPE_ENUM))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1548
    {
1549 1550 1551
      bool view= FALSE;
      if (table_list)
      {
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1552
        table_list= table_list->top_table();
kent@mysql.com's avatar
kent@mysql.com committed
1553
        view= test(table_list->view);
1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569
      }
      if (view)
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_VIEW_FIELD,
                            ER(ER_NO_DEFAULT_FOR_VIEW_FIELD),
                            table_list->view_db.str,
                            table_list->view_name.str);
      }
      else
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_FIELD,
                            ER(ER_NO_DEFAULT_FOR_FIELD),
                            (*field)->field_name);
      }
1570
      err= 1;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1571 1572
    }
  }
1573
  return thd->abort_on_warning ? err : 0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1574 1575 1576
}

/*****************************************************************************
  Handling of delayed inserts
  A thread is created for each table that one uses with the DELAYED attribute.
*****************************************************************************/

1581 1582
#ifndef EMBEDDED_LIBRARY

bk@work.mysql.com's avatar
bk@work.mysql.com committed
1583 1584 1585 1586 1587
class delayed_row :public ilink {
public:
  char *record,*query;
  enum_duplicates dup;
  time_t start_time;
1588
  bool query_start_used,last_insert_id_used,insert_id_used, ignore, log_query;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1589
  ulonglong last_insert_id;
1590 1591 1592
  ulonglong next_insert_id;
  ulong auto_increment_increment;
  ulong auto_increment_offset;
1593
  timestamp_auto_set_type timestamp_field_type;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1594 1595
  uint query_length;

1596 1597
  delayed_row(enum_duplicates dup_arg, bool ignore_arg, bool log_query_arg)
    :record(0), query(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg) {}
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1598 1599 1600 1601 1602 1603
  ~delayed_row()
  {
    x_free(record);
  }
};

1604
/**
1605
  Delayed_insert - context of a thread responsible for delayed insert
1606 1607 1608 1609
  into one table. When processing delayed inserts, we create an own
  thread for every distinct table. Later on all delayed inserts directed
  into that table are handled by a dedicated thread.
*/
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1610

1611
class Delayed_insert :public ilink {
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1612 1613 1614 1615 1616 1617 1618 1619 1620 1621
  uint locks_in_memory;
public:
  THD thd;
  TABLE *table;
  pthread_mutex_t mutex;
  pthread_cond_t cond,cond_client;
  volatile uint tables_in_use,stacked_inserts;
  volatile bool status,dead;
  COPY_INFO info;
  I_List<delayed_row> rows;
1622
  ulong group_count;
1623
  TABLE_LIST table_list;			// Argument
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1624

1625
  Delayed_insert()
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1626 1627 1628 1629
    :locks_in_memory(0),
     table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
     group_count(0)
  {
1630 1631
    thd.security_ctx->user=thd.security_ctx->priv_user=(char*) delayed_user;
    thd.security_ctx->host=(char*) my_localhost;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1632 1633 1634
    thd.current_tablenr=0;
    thd.version=refresh_version;
    thd.command=COM_DELAYED_INSERT;
1635 1636
    thd.lex->current_select= 0; 		// for my_message_sql
    thd.lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1637

1638 1639
    bzero((char*) &thd.net, sizeof(thd.net));		// Safety
    bzero((char*) &table_list, sizeof(table_list));	// Safety
1640
    thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
1641
    thd.security_ctx->host_or_ip= "";
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1642
    bzero((char*) &info,sizeof(info));
1643
    pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1644 1645 1646 1647 1648 1649
    pthread_cond_init(&cond,NULL);
    pthread_cond_init(&cond_client,NULL);
    VOID(pthread_mutex_lock(&LOCK_thread_count));
    delayed_insert_threads++;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
  }
1650
  ~Delayed_insert()
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1651
  {
1652
    /* The following is not really needed, but just for safety */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1653 1654 1655 1656 1657 1658
    delayed_row *row;
    while ((row=rows.get()))
      delete row;
    if (table)
      close_thread_tables(&thd);
    VOID(pthread_mutex_lock(&LOCK_thread_count));
1659 1660 1661
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    pthread_cond_destroy(&cond_client);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1662 1663
    thd.unlink();				// Must be unlinked under lock
    x_free(thd.query);
1664
    thd.security_ctx->user= thd.security_ctx->host=0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1665 1666 1667
    thread_count--;
    delayed_insert_threads--;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
1668
    VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697
  }

  /* The following is for checking when we can delete ourselves */
  inline void lock()
  {
    locks_in_memory++;				// Assume LOCK_delay_insert
  }
  void unlock()
  {
    pthread_mutex_lock(&LOCK_delayed_insert);
    if (!--locks_in_memory)
    {
      pthread_mutex_lock(&mutex);
      if (thd.killed && ! stacked_inserts && ! tables_in_use)
      {
	pthread_cond_signal(&cond);
	status=1;
      }
      pthread_mutex_unlock(&mutex);
    }
    pthread_mutex_unlock(&LOCK_delayed_insert);
  }
  inline uint lock_count() { return locks_in_memory; }

  TABLE* get_local_table(THD* client_thd);
  bool handle_inserts(void);
};


1698
/*
  List of the delayed insert handler objects. find_handler() searches it and
  delayed_get_table() appends to it, both while holding LOCK_delayed_insert.
*/
I_List<Delayed_insert> delayed_threads;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1699 1700


1701 1702 1703 1704 1705 1706
/**
  Return an instance of delayed insert thread that can handle
  inserts into a given table, if it exists. Otherwise return NULL.
*/

static
1707
Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1708 1709 1710
{
  thd->proc_info="waiting for delay_list";
  pthread_mutex_lock(&LOCK_delayed_insert);	// Protect master list
1711
  I_List_iterator<Delayed_insert> it(delayed_threads);
1712 1713
  Delayed_insert *di;
  while ((di= it++))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1714
  {
1715 1716
    if (!strcmp(table_list->db, di->table_list.db) &&
	!strcmp(table_list->table_name, di->table_list.table_name))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1717
    {
1718
      di->lock();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1719 1720 1721 1722
      break;
    }
  }
  pthread_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
1723
  return di;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1724 1725 1726
}


1727 1728 1729 1730
/**
  Attempt to find or create a delayed insert thread to handle inserts
  into this table.

1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748
  @return In case of success, table_list->table points to a local copy
          of the delayed table or is set to NULL, which indicates a
          request for lock upgrade. In case of failure, value of
          table_list->table is undefined.
  @retval TRUE  - this thread ran out of resources OR
                - a newly created delayed insert thread ran out of
                  resources OR
                - the created thread failed to open and lock the table
                  (e.g. because it does not exist) OR
                - the table opened in the created thread turned out to
                  be a view
  @retval FALSE - table successfully opened OR
                - too many delayed insert threads OR
                - the table has triggers and we have to fall back to
                  a normal INSERT
                Two latter cases indicate a request for lock upgrade.

  XXX: why do we regard INSERT DELAYED into a view as an error and
1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769
  do not simply perform a lock upgrade?

  TODO: The approach with using two mutexes to work with the
  delayed thread list -- LOCK_delayed_insert and
  LOCK_delayed_create -- is redundant, and we only need one of
  them to protect the list.  The reason we have two locks is that
  we do not want to block look-ups in the list while we're waiting
  for the newly created thread to open the delayed table. However,
  this wait itself is redundant -- we always call get_local_table
  later on, and there wait again until the created thread acquires
  a table lock.

  As is redundant the concept of locks_in_memory, since we already
  have another counter with similar semantics - tables_in_use,
  both of them are devoted to counting the number of producers for
  a given consumer (delayed insert thread), only at different
  stages of producer-consumer relationship.

  'dead' and 'status' variables in Delayed_insert are redundant
  too, since there is already 'di->thd.killed' and
  di->stacked_inserts.
1770 1771
*/

1772 1773
static
bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1774 1775
{
  int error;
1776
  Delayed_insert *di;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1777 1778
  DBUG_ENTER("delayed_get_table");

1779 1780
  /* Must be set in the parser */
  DBUG_ASSERT(table_list->db);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1781

1782
  /* Find the thread which handles this table. */
1783
  if (!(di= find_handler(thd, table_list)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1784
  {
1785 1786 1787 1788
    /*
      No match. Create a new thread to handle the table, but
      no more than max_insert_delayed_threads.
    */
1789
    if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
1790
      DBUG_RETURN(0);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1791 1792
    thd->proc_info="Creating delayed handler";
    pthread_mutex_lock(&LOCK_delayed_create);
1793 1794 1795 1796
    /*
      The first search above was done without LOCK_delayed_create.
      Another thread might have created the handler in between. Search again.
    */
1797
    if (! (di= find_handler(thd, table_list)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1798
    {
1799
      if (!(di= new Delayed_insert()))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1800
      {
1801
	my_error(ER_OUTOFMEMORY,MYF(0),sizeof(Delayed_insert));
1802 1803
        thd->fatal_error();
        goto end_create;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1804
      }
1805 1806 1807
      pthread_mutex_lock(&LOCK_thread_count);
      thread_count++;
      pthread_mutex_unlock(&LOCK_thread_count);
1808 1809 1810
      di->thd.set_db(table_list->db, strlen(table_list->db));
      di->thd.query= my_strdup(table_list->table_name, MYF(MY_WME));
      if (di->thd.db == NULL || di->thd.query == NULL)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1811
      {
1812
        /* The error is reported */
1813
	delete di;
1814 1815
        thd->fatal_error();
        goto end_create;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1816
      }
1817
      di->table_list= *table_list;			// Needed to open table
1818
      /* Replace volatile strings with local copies */
1819 1820 1821 1822 1823 1824
      di->table_list.alias= di->table_list.table_name= di->thd.query;
      di->table_list.db= di->thd.db;
      di->lock();
      pthread_mutex_lock(&di->mutex);
      if ((error= pthread_create(&di->thd.real_id, &connection_attrib,
                                 handle_delayed_insert, (void*) di)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1825 1826 1827 1828
      {
	DBUG_PRINT("error",
		   ("Can't create thread to handle delayed insert (error %d)",
		    error));
1829 1830 1831
	pthread_mutex_unlock(&di->mutex);
	di->unlock();
	delete di;
1832
	my_error(ER_CANT_CREATE_THREAD, MYF(0), error);
1833 1834
        thd->fatal_error();
        goto end_create;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1835 1836 1837 1838
      }

      /* Wait until table is open */
      thd->proc_info="waiting for handler open";
1839
      while (!di->thd.killed && !di->table && !thd->killed)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1840
      {
1841
	pthread_cond_wait(&di->cond_client, &di->mutex);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1842
      }
1843
      pthread_mutex_unlock(&di->mutex);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1844
      thd->proc_info="got old table";
1845
      if (di->thd.killed)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1846
      {
1847
	if (di->thd.net.report_error)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1848
	{
1849 1850 1851 1852 1853 1854
          /*
            Copy the error message. Note that we don't treat fatal
            errors in the delayed thread as fatal errors in the
            main thread. Use of my_message will enable stored
            procedures continue handlers.
          */
1855
          my_message(di->thd.net.last_errno, di->thd.net.last_error,
1856
                     MYF(0));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1857
	}
1858
	di->unlock();
1859
        goto end_create;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1860 1861 1862
      }
      if (thd->killed)
      {
1863
	di->unlock();
1864
        goto end_create;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1865
      }
1866
      pthread_mutex_lock(&LOCK_delayed_insert);
1867
      delayed_threads.append(di);
1868
      pthread_mutex_unlock(&LOCK_delayed_insert);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1869 1870 1871 1872
    }
    pthread_mutex_unlock(&LOCK_delayed_create);
  }

1873 1874 1875
  pthread_mutex_lock(&di->mutex);
  table_list->table= di->get_local_table(thd);
  pthread_mutex_unlock(&di->mutex);
1876 1877
  if (table_list->table)
  {
kostja@vajra.(none)'s avatar
kostja@vajra.(none) committed
1878
    DBUG_ASSERT(thd->net.report_error == 0);
1879
    thd->di= di;
1880
  }
1881
  /* Unlock the delayed insert object after its last access. */
1882
  di->unlock();
1883
  DBUG_RETURN(table_list->table == NULL);
1884

1885
end_create:
1886
  pthread_mutex_unlock(&LOCK_delayed_create);
1887
  DBUG_RETURN(thd->net.report_error);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1888 1889 1890
}


1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901
/**
  As we can't let many client threads modify the same TABLE
  structure of the dedicated delayed insert thread, we create an
  own structure for each client thread. This includes a row
  buffer to save the column values and new fields that point to
  the new row buffer. The memory is allocated in the client
  thread and is freed automatically.

  @pre This function is called from the client thread.  Delayed
       insert thread mutex must be acquired before invoking this
       function.
1902 1903 1904

  @return Not-NULL table object on success. NULL in case of an error,
                    which is set in client_thd.
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1905 1906
*/

1907
TABLE *Delayed_insert::get_local_table(THD* client_thd)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1908 1909
{
  my_ptrdiff_t adjust_ptrs;
1910
  Field **field,**org_field, *found_next_number_field;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1911
  TABLE *copy;
1912
  DBUG_ENTER("Delayed_insert::get_local_table");
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929

  /* First request insert thread to get a lock */
  status=1;
  tables_in_use++;
  if (!thd.lock)				// Table is not locked
  {
    client_thd->proc_info="waiting for handler lock";
    pthread_cond_signal(&cond);			// Tell handler to lock table
    while (!dead && !thd.lock && ! client_thd->killed)
    {
      pthread_cond_wait(&cond_client,&mutex);
    }
    client_thd->proc_info="got handler lock";
    if (client_thd->killed)
      goto error;
    if (dead)
    {
1930
      my_message(thd.net.last_errno, thd.net.last_error, MYF(0));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1931 1932 1933 1934
      goto error;
    }
  }

1935 1936 1937 1938 1939 1940 1941
  /*
    Allocate memory for the TABLE object, the field pointers array, and
    one record buffer of reclength size. Normally a table has three
    record buffers of rec_buff_length size, which includes alignment
    bytes. Since the table copy is used for creating one record only,
    the other record buffers and alignment are unnecessary.
  */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1942
  client_thd->proc_info="allocating local table";
1943
  copy= (TABLE*) client_thd->alloc(sizeof(*copy)+
1944 1945
				   (table->s->fields+1)*sizeof(Field**)+
				   table->s->reclength);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1946 1947
  if (!copy)
    goto error;
1948 1949

  /* Copy the TABLE object. */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1950
  *copy= *table;
1951 1952 1953
  copy->s= &copy->share_not_to_be_used;
  // No name hashing
  bzero((char*) &copy->s->name_hash,sizeof(copy->s->name_hash));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1954 1955
  /* We don't need to change the file handler here */

1956 1957 1958 1959 1960
  /* Assign the pointers for the field pointers array and the record. */
  field= copy->field= (Field**) (copy + 1);
  copy->record[0]= (byte*) (field + table->s->fields + 1);
  memcpy((char*) copy->record[0], (char*) table->record[0],
         table->s->reclength);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1961

1962 1963 1964 1965 1966 1967 1968 1969
  /*
    Make a copy of all fields.
    The copied fields need to point into the copied record. This is done
    by copying the field objects with their old pointer values and then
    "move" the pointers by the distance between the original and copied
    records. That way we preserve the relative positions in the records.
  */
  adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1970

1971 1972
  found_next_number_field= table->found_next_number_field;
  for (org_field= table->field; *org_field; org_field++, field++)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1973
  {
1974
    if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1)))
1975
      goto error;
1976
    (*field)->orig_table= copy;			// Remove connection
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1977
    (*field)->move_field(adjust_ptrs);		// Point at copy->record[0]
1978 1979
    if (*org_field == found_next_number_field)
      (*field)->table->found_next_number_field= *field;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1980 1981 1982 1983 1984 1985 1986 1987
  }
  *field=0;

  /* Adjust timestamp */
  if (table->timestamp_field)
  {
    /* Restore offset as this may have been reset in handle_inserts */
    copy->timestamp_field=
1988
      (Field_timestamp*) copy->field[table->s->timestamp_field_offset];
1989
    copy->timestamp_field->unireg_check= table->timestamp_field->unireg_check;
1990
    copy->timestamp_field_type= copy->timestamp_field->get_auto_set_type();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1991 1992 1993 1994
  }

  /* _rowid is not used with delayed insert */
  copy->rowid_field=0;
1995 1996 1997

  /* Adjust in_use for pointing to client thread */
  copy->in_use= client_thd;
1998 1999 2000 2001

  /* Adjust lock_count. This table object is not part of a lock. */
  copy->lock_count= 0;

2002
  DBUG_RETURN(copy);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2003 2004 2005 2006 2007 2008

  /* Got fatal error */
 error:
  tables_in_use--;
  status=1;
  pthread_cond_signal(&cond);			// Inform thread about abort
2009
  DBUG_RETURN(0);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2010 2011 2012 2013 2014
}


/* Put a question in queue */

2015 2016 2017
static
int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic, bool ignore,
                  char *query, uint query_length, bool log_on)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2018 2019
{
  delayed_row *row=0;
2020
  Delayed_insert *di=thd->di;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2021 2022 2023 2024 2025 2026 2027 2028
  DBUG_ENTER("write_delayed");

  thd->proc_info="waiting for handler insert";
  pthread_mutex_lock(&di->mutex);
  while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
    pthread_cond_wait(&di->cond_client,&di->mutex);
  thd->proc_info="storing row into queue";

2029
  if (thd->killed || !(row= new delayed_row(duplic, ignore, log_on)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2030 2031 2032 2033
    goto err;

  if (!query)
    query_length=0;
2034
  if (!(row->record= (char*) my_malloc(table->s->reclength+query_length+1,
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2035 2036
				       MYF(MY_WME))))
    goto err;
2037
  memcpy(row->record, table->record[0], table->s->reclength);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2038 2039
  if (query_length)
  {
2040
    row->query= row->record+table->s->reclength;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2041 2042 2043 2044 2045 2046 2047 2048
    memcpy(row->query,query,query_length+1);
  }
  row->query_length=		query_length;
  row->start_time=		thd->start_time;
  row->query_start_used=	thd->query_start_used;
  row->last_insert_id_used=	thd->last_insert_id_used;
  row->insert_id_used=		thd->insert_id_used;
  row->last_insert_id=		thd->last_insert_id;
2049
  row->timestamp_field_type=    table->timestamp_field_type;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2050

2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066
  /* The session variable settings can always be copied. */
  row->auto_increment_increment= thd->variables.auto_increment_increment;
  row->auto_increment_offset=    thd->variables.auto_increment_offset;
  /*
    Next insert id must be set for the first value in a multi-row insert
    only. So clear it after the first use. Assume a multi-row insert.
    Since the user thread doesn't really execute the insert,
    thd->next_insert_id is left untouched between the rows. If we copy
    the same insert id to every row of the multi-row insert, the delayed
    insert thread would copy this before inserting every row. Thus it
    tries to insert all rows with the same insert id. This fails on the
    unique constraint. So just the first row would be really inserted.
  */
  row->next_insert_id= thd->next_insert_id;
  thd->next_insert_id= 0;

bk@work.mysql.com's avatar
bk@work.mysql.com committed
2067 2068 2069
  di->rows.push_back(row);
  di->stacked_inserts++;
  di->status=1;
2070
  if (table->s->blob_fields)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083
    unlink_blobs(table);
  pthread_cond_signal(&di->cond);

  thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
  pthread_mutex_unlock(&di->mutex);
  DBUG_RETURN(0);

 err:
  delete row;
  pthread_mutex_unlock(&di->mutex);
  DBUG_RETURN(1);
}

2084 2085 2086 2087
/**
  Signal the delayed insert thread that this user connection
  is finished using it for this statement.
*/
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2088 2089 2090

static void end_delayed_insert(THD *thd)
{
2091
  DBUG_ENTER("end_delayed_insert");
2092
  Delayed_insert *di=thd->di;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2093
  pthread_mutex_lock(&di->mutex);
2094
  DBUG_PRINT("info",("tables in use: %d",di->tables_in_use));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2095 2096 2097 2098 2099 2100
  if (!--di->tables_in_use || di->thd.killed)
  {						// Unlock table
    di->status=1;
    pthread_cond_signal(&di->cond);
  }
  pthread_mutex_unlock(&di->mutex);
2101
  DBUG_VOID_RETURN;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2102 2103 2104 2105 2106 2107 2108 2109 2110
}


/* We kill all delayed threads when doing flush-tables */

void kill_delayed_threads(void)
{
  VOID(pthread_mutex_lock(&LOCK_delayed_insert)); // For unlink from list

2111
  I_List_iterator<Delayed_insert> it(delayed_threads);
2112 2113
  Delayed_insert *di;
  while ((di= it++))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2114
  {
2115 2116
    di->thd.killed= THD::KILL_CONNECTION;
    if (di->thd.mysys_var)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2117
    {
2118 2119
      pthread_mutex_lock(&di->thd.mysys_var->mutex);
      if (di->thd.mysys_var->current_cond)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2120
      {
2121 2122 2123 2124
	/*
	  We need the following test because the main mutex may be locked
	  in handle_delayed_insert()
	*/
2125 2126 2127 2128 2129
	if (&di->mutex != di->thd.mysys_var->current_mutex)
	  pthread_mutex_lock(di->thd.mysys_var->current_mutex);
	pthread_cond_broadcast(di->thd.mysys_var->current_cond);
	if (&di->mutex != di->thd.mysys_var->current_mutex)
	  pthread_mutex_unlock(di->thd.mysys_var->current_mutex);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2130
      }
2131
      pthread_mutex_unlock(&di->thd.mysys_var->mutex);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2132 2133 2134 2135 2136 2137 2138 2139 2140 2141
    }
  }
  VOID(pthread_mutex_unlock(&LOCK_delayed_insert)); // For unlink from list
}


/*
 * Create a new delayed insert thread
*/

2142
pthread_handler_t handle_delayed_insert(void *arg)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2143
{
2144
  Delayed_insert *di=(Delayed_insert*) arg;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2145 2146 2147 2148 2149 2150
  THD *thd= &di->thd;

  pthread_detach_this_thread();
  /* Add thread to THD list so that's it's visible in 'show processlist' */
  pthread_mutex_lock(&LOCK_thread_count);
  thd->thread_id=thread_id++;
2151
  thd->end_time();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2152
  threads.append(thd);
hf@genie.(none)'s avatar
SCRUM  
hf@genie.(none) committed
2153
  thd->killed=abort_loop ? THD::KILL_CONNECTION : THD::NOT_KILLED;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2154 2155
  pthread_mutex_unlock(&LOCK_thread_count);

2156 2157 2158 2159 2160 2161 2162 2163
  /*
    Wait until the client runs into pthread_cond_wait(),
    where we free it after the table is opened and di linked in the list.
    If we did not wait here, the client might detect the opened table
    before it is linked to the list. It would release LOCK_delayed_create
    and allow another thread to create another handler for the same table,
    since it does not find one in the list.
  */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2164
  pthread_mutex_lock(&di->mutex);
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2165
#if !defined( __WIN__) && !defined(OS2)	/* Win32 calls this in pthread_create */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2166 2167 2168 2169 2170 2171 2172 2173
  if (my_thread_init())
  {
    strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES));
    goto end;
  }
#endif

  DBUG_ENTER("handle_delayed_insert");
2174
  thd->thread_stack= (char*) &thd;
2175
  if (init_thr_lock() || thd->store_globals())
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2176
  {
2177
    thd->fatal_error();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2178 2179 2180
    strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES));
    goto end;
  }
monty@mashka.mysql.fi's avatar
monty@mashka.mysql.fi committed
2181
#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2182 2183 2184 2185 2186 2187 2188
  sigset_t set;
  VOID(sigemptyset(&set));			// Get mask in use
  VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif

  /* open table */

2189
  if (!(di->table=open_ltable(thd,&di->table_list,TL_WRITE_DELAYED)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2190
  {
2191
    thd->fatal_error();				// Abort waiting inserts
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2192 2193
    goto end;
  }
2194
  if (!(di->table->file->table_flags() & HA_CAN_INSERT_DELAYED))
2195
  {
2196
    thd->fatal_error();
2197
    my_error(ER_ILLEGAL_HA, MYF(0), di->table_list.table_name);
2198 2199
    goto end;
  }
2200 2201 2202 2203 2204 2205 2206 2207 2208
  if (di->table->triggers)
  {
    /*
      Table has triggers. This is not an error, but we do
      not support triggers with delayed insert. Terminate the delayed
      thread without an error and thus request lock upgrade.
    */
    goto end;
  }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2209 2210 2211 2212 2213 2214 2215 2216 2217 2218
  di->table->copy_blobs=1;

  /* Tell client that the thread is initialized */
  pthread_cond_signal(&di->cond_client);

  /* Now wait until we get an insert or lock to handle */
  /* We will not abort as long as a client thread uses this thread */

  for (;;)
  {
hf@genie.(none)'s avatar
SCRUM  
hf@genie.(none) committed
2219
    if (thd->killed == THD::KILL_CONNECTION)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238
    {
      uint lock_count;
      /*
	Remove this from delay insert list so that no one can request a
	table from this
      */
      pthread_mutex_unlock(&di->mutex);
      pthread_mutex_lock(&LOCK_delayed_insert);
      di->unlink();
      lock_count=di->lock_count();
      pthread_mutex_unlock(&LOCK_delayed_insert);
      pthread_mutex_lock(&di->mutex);
      if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
	break;					// Time to die
    }

    if (!di->status && !di->stacked_inserts)
    {
      struct timespec abstime;
2239
      set_timespec(abstime, delayed_insert_timeout);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2240 2241 2242 2243

      /* Information for pthread_kill */
      di->thd.mysys_var->current_mutex= &di->mutex;
      di->thd.mysys_var->current_cond= &di->cond;
2244
      di->thd.proc_info="Waiting for INSERT";
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2245

2246
      DBUG_PRINT("info",("Waiting for someone to insert rows"));
2247
      while (!thd->killed)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2248 2249
      {
	int error;
2250
#if defined(HAVE_BROKEN_COND_TIMEDWAIT)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2251 2252 2253 2254
	error=pthread_cond_wait(&di->cond,&di->mutex);
#else
	error=pthread_cond_timedwait(&di->cond,&di->mutex,&abstime);
#ifdef EXTRA_DEBUG
2255
	if (error && error != EINTR && error != ETIMEDOUT)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2256 2257 2258 2259 2260 2261 2262 2263 2264
	{
	  fprintf(stderr, "Got error %d from pthread_cond_timedwait\n",error);
	  DBUG_PRINT("error",("Got error %d from pthread_cond_timedwait",
			      error));
	}
#endif
#endif
	if (thd->killed || di->status)
	  break;
monty@mysql.com's avatar
monty@mysql.com committed
2265
	if (error == ETIMEDOUT || error == ETIME)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2266
	{
hf@genie.(none)'s avatar
SCRUM  
hf@genie.(none) committed
2267
	  thd->killed= THD::KILL_CONNECTION;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2268 2269 2270
	  break;
	}
      }
2271 2272
      /* We can't lock di->mutex and mysys_var->mutex at the same time */
      pthread_mutex_unlock(&di->mutex);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2273 2274 2275 2276
      pthread_mutex_lock(&di->thd.mysys_var->mutex);
      di->thd.mysys_var->current_mutex= 0;
      di->thd.mysys_var->current_cond= 0;
      pthread_mutex_unlock(&di->thd.mysys_var->mutex);
2277
      pthread_mutex_lock(&di->mutex);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2278
    }
2279
    di->thd.proc_info=0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2280 2281 2282

    if (di->tables_in_use && ! thd->lock)
    {
2283
      bool not_used;
2284 2285 2286 2287 2288 2289 2290 2291 2292 2293
      /*
        Request for new delayed insert.
        Lock the table, but avoid to be blocked by a global read lock.
        If we got here while a global read lock exists, then one or more
        inserts started before the lock was requested. These are allowed
        to complete their work before the server returns control to the
        client which requested the global read lock. The delayed insert
        handler will close the table and finish when the outstanding
        inserts are done.
      */
2294
      if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1,
2295 2296
                                          MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK,
                                          &not_used)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2297
      {
2298 2299 2300
	/* Fatal error */
	di->dead= 1;
	thd->killed= THD::KILL_CONNECTION;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2301 2302 2303 2304 2305 2306 2307
      }
      pthread_cond_broadcast(&di->cond_client);
    }
    if (di->stacked_inserts)
    {
      if (di->handle_inserts())
      {
2308 2309 2310
	/* Some fatal error */
	di->dead= 1;
	thd->killed= THD::KILL_CONNECTION;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2311 2312 2313 2314 2315
      }
    }
    di->status=0;
    if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
    {
monty@mysql.com's avatar
monty@mysql.com committed
2316 2317 2318 2319
      /*
        No one is doing a insert delayed
        Unlock table so that other threads can use it
      */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338
      MYSQL_LOCK *lock=thd->lock;
      thd->lock=0;
      pthread_mutex_unlock(&di->mutex);
      mysql_unlock_tables(thd, lock);
      di->group_count=0;
      pthread_mutex_lock(&di->mutex);
    }
    if (di->tables_in_use)
      pthread_cond_broadcast(&di->cond_client); // If waiting clients
  }

end:
  /*
    di should be unlinked from the thread handler list and have no active
    clients
  */

  close_thread_tables(thd);			// Free the table
  di->table=0;
2339 2340
  di->dead= 1;                                  // If error
  thd->killed= THD::KILL_CONNECTION;	        // If error
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383
  pthread_cond_broadcast(&di->cond_client);	// Safety
  pthread_mutex_unlock(&di->mutex);

  pthread_mutex_lock(&LOCK_delayed_create);	// Because of delayed_get_table
  pthread_mutex_lock(&LOCK_delayed_insert);	
  delete di;
  pthread_mutex_unlock(&LOCK_delayed_insert);
  pthread_mutex_unlock(&LOCK_delayed_create);  

  my_thread_end();
  pthread_exit(0);
  DBUG_RETURN(0);
}


/* Remove pointers from temporary fields to allocated values */

static void unlink_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
      ((Field_blob *) (*ptr))->clear_temporary();
  }
}

/* Free blobs stored in current row */

static void free_delayed_insert_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
    {
      char *str;
      ((Field_blob *) (*ptr))->get_ptr(&str);
      my_free(str,MYF(MY_ALLOW_ZERO_PTR));
      ((Field_blob *) (*ptr))->reset();
    }
  }
}


2384
bool Delayed_insert::handle_inserts(void)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2385 2386
{
  int error;
2387
  ulong max_rows;
2388 2389
  bool using_ignore= 0, using_opt_replace= 0;
  bool using_bin_log= mysql_bin_log.is_open();
2390
  delayed_row *row;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401
  DBUG_ENTER("handle_inserts");

  /* Allow client to insert new rows */
  pthread_mutex_unlock(&mutex);

  table->next_number_field=table->found_next_number_field;

  thd.proc_info="upgrading lock";
  if (thr_upgrade_write_delay_lock(*thd.lock->locks))
  {
    /* This can only happen if thread is killed by shutdown */
2402
    sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),table->s->table_name);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2403 2404 2405 2406
    goto err;
  }

  thd.proc_info="insert";
2407
  max_rows= delayed_insert_limit;
dlenev@mockturtle.local's avatar
dlenev@mockturtle.local committed
2408
  if (thd.killed || table->needs_reopen_or_name_lock())
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2409
  {
hf@genie.(none)'s avatar
SCRUM  
hf@genie.(none) committed
2410
    thd.killed= THD::KILL_CONNECTION;
2411
    max_rows= ~(ulong)0;                        // Do as much as possible
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2412 2413
  }

2414 2415 2416 2417 2418 2419 2420
  /*
    We can't use row caching when using the binary log because if
    we get a crash, then binary log will contain rows that are not yet
    written to disk, which will cause problems in replication.
  */
  if (!using_bin_log)
    table->file->extra(HA_EXTRA_WRITE_CACHE);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2421
  pthread_mutex_lock(&mutex);
2422 2423 2424 2425 2426 2427 2428 2429

  /* Reset auto-increment cacheing */
  if (thd.clear_next_insert_id)
  {
    thd.next_insert_id= 0;
    thd.clear_next_insert_id= 0;
  }

bk@work.mysql.com's avatar
bk@work.mysql.com committed
2430 2431 2432 2433
  while ((row=rows.get()))
  {
    stacked_inserts--;
    pthread_mutex_unlock(&mutex);
2434
    memcpy(table->record[0],row->record,table->s->reclength);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2435 2436 2437 2438 2439 2440

    thd.start_time=row->start_time;
    thd.query_start_used=row->query_start_used;
    thd.last_insert_id=row->last_insert_id;
    thd.last_insert_id_used=row->last_insert_id_used;
    thd.insert_id_used=row->insert_id_used;
2441
    table->timestamp_field_type= row->timestamp_field_type;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2442

2443 2444 2445 2446 2447 2448 2449 2450
    /* The session variable settings can always be copied. */
    thd.variables.auto_increment_increment= row->auto_increment_increment;
    thd.variables.auto_increment_offset=    row->auto_increment_offset;
    /* Next insert id must be used only if non-zero. */
    if (row->next_insert_id)
      thd.next_insert_id= row->next_insert_id;
    DBUG_PRINT("loop", ("next_insert_id: %lu", (ulong) thd.next_insert_id));

2451
    info.ignore= row->ignore;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2452
    info.handle_duplicates= row->dup;
2453
    if (info.ignore ||
2454
	info.handle_duplicates != DUP_ERROR)
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
2455 2456 2457 2458
    {
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
      using_ignore=1;
    }
2459 2460 2461 2462 2463 2464 2465
    if (info.handle_duplicates == DUP_REPLACE &&
        (!table->triggers ||
         !table->triggers->has_delete_triggers()))
    {
      table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
      using_opt_replace= 1;
    }
antony@ppcg5.local's avatar
antony@ppcg5.local committed
2466 2467
    if (info.handle_duplicates == DUP_UPDATE)
      table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
2468
    thd.clear_error(); // reset error for binlog
2469
    if (write_record(&thd, table, &info))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2470
    {
2471
      info.error_count++;				// Ignore errors
2472
      thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
sasha@laptop.slkc.uswest.net's avatar
sasha@laptop.slkc.uswest.net committed
2473
      row->log_query = 0;
2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487
      /*
        We must reset next_insert_id. Otherwise all following rows may
        become duplicates. If write_record() failed on a duplicate and
        next_insert_id would be left unchanged, the next rows would also
        be tried with the same insert id and would fail. Since the end
        of a multi-row statement is unknown here, all following rows in
        the queue would be dropped, regardless which thread added them.
        After the queue is used up, next_insert_id is cleared and the
        next run will succeed. This could even happen if these come from
        the same multi-row statement as the current queue contents. That
        way it would look somewhat random which rows are rejected after
        a duplicate.
      */
      thd.next_insert_id= 0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2488
    }
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
2489 2490 2491 2492 2493
    if (using_ignore)
    {
      using_ignore=0;
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
    }
2494 2495 2496 2497 2498
    if (using_opt_replace)
    {
      using_opt_replace= 0;
      table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
    }
2499
    if (row->query && row->log_query && using_bin_log)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2500
    {
monty@mysql.com's avatar
monty@mysql.com committed
2501
      Query_log_event qinfo(&thd, row->query, row->query_length, 0, FALSE);
2502
      mysql_bin_log.write(&qinfo);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2503
    }
2504
    if (table->s->blob_fields)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2505 2506 2507 2508 2509 2510
      free_delayed_insert_blobs(table);
    thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status);
    thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
    pthread_mutex_lock(&mutex);

    delete row;
2511 2512 2513 2514 2515 2516 2517
    /*
      Let READ clients do something once in a while
      We should however not break in the middle of a multi-line insert
      if we have binary logging enabled as we don't want other commands
      on this table until all entries has been processed
    */
    if (group_count++ >= max_rows && (row= rows.head()) &&
2518
	(!(row->log_query & using_bin_log) ||
2519
	 row->query))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532
    {
      group_count=0;
      if (stacked_inserts || tables_in_use)	// Let these wait a while
      {
	if (tables_in_use)
	  pthread_cond_broadcast(&cond_client); // If waiting clients
	thd.proc_info="reschedule";
	pthread_mutex_unlock(&mutex);
	if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
	{
	  /* This should never happen */
	  table->file->print_error(error,MYF(0));
	  sql_print_error("%s",thd.net.last_error);
2533
          DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed in loop"));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2534 2535
	  goto err;
	}
2536
	query_cache_invalidate3(&thd, table, 1);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2537 2538 2539
	if (thr_reschedule_write_lock(*thd.lock->locks))
	{
	  /* This should never happen */
2540
	  sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),table->s->table_name);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2541
	}
2542 2543
	if (!using_bin_log)
	  table->file->extra(HA_EXTRA_WRITE_CACHE);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557 2558
	pthread_mutex_lock(&mutex);
	thd.proc_info="insert";
      }
      if (tables_in_use)
	pthread_cond_broadcast(&cond_client);	// If waiting clients
    }
  }

  thd.proc_info=0;
  table->next_number_field=0;
  pthread_mutex_unlock(&mutex);
  if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
  {						// This shouldn't happen
    table->file->print_error(error,MYF(0));
    sql_print_error("%s",thd.net.last_error);
2559
    DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed after loop"));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2560 2561
    goto err;
  }
2562
  query_cache_invalidate3(&thd, table, 1);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2563 2564 2565 2566
  pthread_mutex_lock(&mutex);
  DBUG_RETURN(0);

 err:
2567
  DBUG_EXECUTE("error", max_rows= 0;);
2568 2569 2570 2571 2572 2573
  /* Remove all not used rows */
  while ((row=rows.get()))
  {
    delete row;
    thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
    stacked_inserts--;
2574
    DBUG_EXECUTE("error", max_rows++;);
2575
  }
2576
  DBUG_PRINT("error", ("dropped %lu rows after an error", max_rows));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2577 2578 2579 2580
  thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
  pthread_mutex_lock(&mutex);
  DBUG_RETURN(1);
}
2581
#endif /* EMBEDDED_LIBRARY */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2582 2583

/***************************************************************************
2584
  Store records in INSERT ... SELECT *
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2585 2586
***************************************************************************/

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
2587 2588 2589 2590 2591 2592 2593 2594 2595

/*
  make insert specific preparation and checks after opening tables

  SYNOPSIS
    mysql_insert_select_prepare()
    thd         thread handler

  RETURN
2596 2597
    FALSE OK
    TRUE  Error
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
2598 2599
*/

2600
bool mysql_insert_select_prepare(THD *thd)
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
2601 2602
{
  LEX *lex= thd->lex;
monty@mysql.com's avatar
monty@mysql.com committed
2603
  SELECT_LEX *select_lex= &lex->select_lex;
monty@mysql.com's avatar
monty@mysql.com committed
2604
  TABLE_LIST *first_select_leaf_table;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
2605
  DBUG_ENTER("mysql_insert_select_prepare");
monty@mysql.com's avatar
monty@mysql.com committed
2606

2607 2608
  /*
    SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
monty@mysql.com's avatar
monty@mysql.com committed
2609
    clause if table is VIEW
2610
  */
monty@mysql.com's avatar
monty@mysql.com committed
2611
  
monty@mysql.com's avatar
monty@mysql.com committed
2612
  if (mysql_prepare_insert(thd, lex->query_tables,
monty@mysql.com's avatar
monty@mysql.com committed
2613 2614 2615
                           lex->query_tables->table, lex->field_list, 0,
                           lex->update_list, lex->value_list,
                           lex->duplicates,
2616
                           &select_lex->where, TRUE, FALSE, FALSE))
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
2617
    DBUG_RETURN(TRUE);
monty@mysql.com's avatar
monty@mysql.com committed
2618

2619 2620 2621 2622
  /*
    exclude first table from leaf tables list, because it belong to
    INSERT
  */
monty@mysql.com's avatar
monty@mysql.com committed
2623 2624
  DBUG_ASSERT(select_lex->leaf_tables != 0);
  lex->leaf_tables_insert= select_lex->leaf_tables;
2625
  /* skip all leaf tables belonged to view where we are insert */
2626
  for (first_select_leaf_table= select_lex->leaf_tables->next_leaf;
2627 2628 2629 2630
       first_select_leaf_table &&
       first_select_leaf_table->belong_to_view &&
       first_select_leaf_table->belong_to_view ==
       lex->leaf_tables_insert->belong_to_view;
2631
       first_select_leaf_table= first_select_leaf_table->next_leaf)
2632
  {}
monty@mysql.com's avatar
monty@mysql.com committed
2633
  select_lex->leaf_tables= first_select_leaf_table;
2634
  DBUG_RETURN(FALSE);
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
2635 2636 2637
}


2638
/*
  Construct the result sink of an INSERT ... SELECT.

  Stores the target table and field list and fills the zeroed COPY_INFO
  'info' with the duplicate handling mode, the ignore flag and the
  ON DUPLICATE KEY UPDATE lists. info.view is set to table_list_par only
  when that table list entry is a view.

  table_list_par may be NULL; in that case insert_into_view is false and
  info.view stays zero.
*/

select_insert::select_insert(TABLE_LIST *table_list_par, TABLE *table_par,
                             List<Item> *fields_par,
                             List<Item> *update_fields,
                             List<Item> *update_values,
                             enum_duplicates duplic,
                             bool ignore_check_option_errors)
  :table_list(table_list_par), table(table_par), fields(fields_par),
   last_insert_id(0),
   insert_into_view(table_list_par && table_list_par->view != 0)
{
  bzero((char*) &info,sizeof(info));
  info.handle_duplicates= duplic;
  info.ignore= ignore_check_option_errors;
  info.update_fields= update_fields;
  info.update_values= update_values;
  if (table_list_par)
    info.view= (table_list_par->view ? table_list_par : 0);
}


bk@work.mysql.com's avatar
bk@work.mysql.com committed
2658
int
2659
select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2660
{
2661
  LEX *lex= thd->lex;
2662
  int res;
2663
  table_map map= 0;
2664
  SELECT_LEX *lex_current_select_save= lex->current_select;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2665 2666
  DBUG_ENTER("select_insert::prepare");

2667
  unit= u;
2668 2669 2670 2671 2672 2673
  /*
    Since table in which we are going to insert is added to the first
    select, LEX::current_select should point to the first select while
    we are fixing fields from insert list.
  */
  lex->current_select= &lex->select_lex;
2674
  res= check_insert_fields(thd, table_list, *fields, values,
2675
                           !insert_into_view, &map) ||
2676
       setup_fields(thd, 0, values, 0, 0, 0);
2677

2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689
  if (!res && fields->elements)
  {
    bool saved_abort_on_warning= thd->abort_on_warning;
    thd->abort_on_warning= !info.ignore && (thd->variables.sql_mode &
                                            (MODE_STRICT_TRANS_TABLES |
                                             MODE_STRICT_ALL_TABLES));
    res= check_that_all_fields_are_given_values(thd, table_list->table, 
                                                table_list);
    thd->abort_on_warning= saved_abort_on_warning;
  }

  if (info.handle_duplicates == DUP_UPDATE && !res)
2690
  {
2691
    Name_resolution_context *context= &lex->select_lex.context;
2692 2693 2694 2695
    Name_resolution_context_state ctx_state;

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);
2696 2697

    /* Perform name resolution only in the first table - 'table_list'. */
2698
    table_list->next_local= 0;
2699 2700
    context->resolve_in_table_list_only(table_list);

2701
    lex->select_lex.no_wrap_view_item= TRUE;
2702
    res= res || check_update_fields(thd, context->table_list,
2703
                                    *info.update_fields, &map);
2704 2705
    lex->select_lex.no_wrap_view_item= FALSE;
    /*
2706 2707 2708
      When we are not using GROUP BY and there are no ungrouped aggregate functions 
      we can refer to other tables in the ON DUPLICATE KEY part.
      We use next_name_resolution_table descructively, so check it first (views?)
2709
    */       
2710 2711 2712 2713 2714 2715 2716 2717 2718 2719
    DBUG_ASSERT (!table_list->next_name_resolution_table);
    if (lex->select_lex.group_list.elements == 0 &&
        !lex->select_lex.with_sum_func)
      /*
        We must make a single context out of the two separate name resolution contexts :
        the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
        To do that we must concatenate the two lists
      */  
      table_list->next_name_resolution_table= ctx_state.get_first_name_resolution_table();

2720
    res= res || setup_fields(thd, 0, *info.update_values, 1, 0, 0);
2721 2722 2723 2724 2725 2726 2727 2728 2729 2730
    if (!res)
    {
      /*
        Traverse the update values list and substitute fields from the
        select for references (Item_ref objects) to them. This is done in
        order to get correct values from those fields when the select
        employs a temporary table.
      */
      List_iterator<Item> li(*info.update_values);
      Item *item;
2731

2732 2733 2734 2735 2736 2737
      while ((item= li++))
      {
        item->transform(&Item::update_value_transformer,
                        (byte*)lex->current_select);
      }
    }
2738
    /* Restore the current context. */
2739
    ctx_state.restore_state(context, table_list);
2740
  }
2741

2742 2743
  lex->current_select= lex_current_select_save;
  if (res)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2744
    DBUG_RETURN(1);
2745 2746 2747 2748 2749 2750 2751 2752 2753 2754
  /*
    if it is INSERT into join view then check_insert_fields already found
    real table for insert
  */
  table= table_list->table;

  /*
    Is table which we are changing used somewhere in other parts of
    query
  */
2755
  if (!(lex->current_select->options & OPTION_BUFFER_RESULT) &&
2756
      unique_table(thd, table_list, table_list->next_global, 0))
2757 2758
  {
    /* Using same table for INSERT and SELECT */
2759 2760
    lex->current_select->options|= OPTION_BUFFER_RESULT;
    lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
2761
  }
2762
  else if (!thd->prelocked_mode)
2763 2764 2765 2766 2767 2768 2769
  {
    /*
      We must not yet prepare the result table if it is the same as one of the 
      source tables (INSERT SELECT). The preparation may disable 
      indexes on the result table, which may be used during the select, if it
      is the same table (Bug #6034). Do the preparation after the select phase
      in select_insert::prepare2().
2770 2771
      We won't start bulk inserts at all if this statement uses functions or
      should invoke triggers since they may access to the same table too.
2772 2773 2774
    */
    table->file->start_bulk_insert((ha_rows) 0);
  }
2775
  restore_record(table,s->default_values);		// Get empty record
2776
  table->next_number_field=table->found_next_number_field;
guilhem@gbichot3.local's avatar
guilhem@gbichot3.local committed
2777 2778 2779 2780 2781 2782 2783 2784 2785

#ifdef HAVE_REPLICATION
  if (thd->slave_thread &&
      (info.handle_duplicates == DUP_UPDATE) &&
      (table->next_number_field != NULL) &&
      rpl_master_has_bug(&active_mi->rli, 24432))
    DBUG_RETURN(1);
#endif

bk@work.mysql.com's avatar
bk@work.mysql.com committed
2786
  thd->cuted_fields=0;
monty@mysql.com's avatar
monty@mysql.com committed
2787 2788
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2789 2790 2791 2792 2793 2794
  if (info.handle_duplicates == DUP_REPLACE)
  {
    if (!table->triggers || !table->triggers->has_delete_triggers())
      table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
    table->file->extra(HA_EXTRA_RETRIEVE_ALL_COLS);
  }
antony@ppcg5.local's avatar
antony@ppcg5.local committed
2795 2796
  if (info.handle_duplicates == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
2797
  thd->no_trans_update.stmt= FALSE;
monty@mysql.com's avatar
monty@mysql.com committed
2798
  thd->abort_on_warning= (!info.ignore &&
2799 2800 2801
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
2802
  res= (table_list->prepare_where(thd, 0, TRUE) ||
2803
        table_list->prepare_check_option(thd));
2804 2805

  if (!res)
2806 2807
    prepare_triggers_for_insert_stmt(thd, table,
                                     info.handle_duplicates);
2808
  DBUG_RETURN(res);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2809 2810
}

2811

2812 2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829 2830
/*
  Finish the preparation of the result table.

  SYNOPSIS
    select_insert::prepare2()
    void

  DESCRIPTION
    If the result table is the same as one of the source tables (INSERT SELECT),
    the result table is not finally prepared at the join prepair phase.
    Do the final preparation now.
		       
  RETURN
    0   OK
*/

int select_insert::prepare2(void)
{
  DBUG_ENTER("select_insert::prepare2");
2831 2832
  if (thd->lex->current_select->options & OPTION_BUFFER_RESULT &&
      !thd->prelocked_mode)
2833
    table->file->start_bulk_insert((ha_rows) 0);
monty@mysql.com's avatar
monty@mysql.com committed
2834
  DBUG_RETURN(0);
2835 2836 2837
}


2838 2839 2840 2841 2842 2843
void select_insert::cleanup()
{
  /*
    Objects of select_insert/select_create are not re-used by prepared
    statements, so reaching this method is treated as a programming error.
  */
  DBUG_ASSERT(0);
}

bk@work.mysql.com's avatar
bk@work.mysql.com committed
2844 2845
/*
  Destructor.

  If a target table is set, clear its next_number_field and
  auto_increment_field_not_null members and reset its handler. Then set
  thd->count_cuted_fields to CHECK_FIELD_IGNORE and clear
  thd->abort_on_warning.
*/
select_insert::~select_insert()
{
  DBUG_ENTER("~select_insert");
  if (table)
  {
    table->next_number_field=0;
    table->auto_increment_field_not_null= FALSE;
    table->file->reset();
  }
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
  thd->abort_on_warning= 0;
  DBUG_VOID_RETURN;
}


/*
  Store one row produced by the SELECT part into the target table.

  SYNOPSIS
    select_insert::send_data()
      values   Items holding the values of the row to be written

  DESCRIPTION
    Rows are skipped while unit->offset_limit_cnt is non-zero. Otherwise
    the values are stored into the record buffer with store_values(),
    the view CHECK OPTION is evaluated when table_list is set, and the
    record is written with write_record().

  RETURN
    0   Row written or skipped
    1   Error (a non-zero result of write_record() is returned as is)
*/
bool select_insert::send_data(List<Item> &values)
{
  DBUG_ENTER("select_insert::send_data");
  bool error=0;
  if (unit->offset_limit_cnt)
  {						// using limit offset,count
    unit->offset_limit_cnt--;
    DBUG_RETURN(0);
  }

  thd->count_cuted_fields= CHECK_FIELD_WARN;	// Calculate cuted fields
  store_values(values);
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
  if (thd->net.report_error)
    DBUG_RETURN(1);
  if (table_list)                               // Not CREATE ... SELECT
  {
    switch (table_list->view_check_option(thd, info.ignore)) {
    case VIEW_CHECK_SKIP:
      DBUG_RETURN(0);
    case VIEW_CHECK_ERROR:
      DBUG_RETURN(1);
    default:                                    // Any other result: go on
      break;
    }
  }
  if (!(error= write_record(thd, table, &info)))
  {
    if (table->triggers || info.handle_duplicates == DUP_UPDATE)
    {
      /*
        Restore fields of the record since it is possible that they were
        changed by ON DUPLICATE KEY UPDATE clause.

        If triggers exist then they can modify some fields which were not
        originally touched by INSERT ... SELECT, so we have to restore
        their original values for the next row.
      */
      restore_record(table, s->default_values);
    }
    if (table->next_number_field)
    {
      /*
        Clear auto-increment field for the next record, if triggers are used
        we will clear it twice, but this should be cheap.
      */
      table->next_number_field->reset();
      /* Remember only the first insert id generated by this statement */
      if (!last_insert_id && thd->insert_id_used)
        last_insert_id= thd->last_insert_id;
    }
  }
  DBUG_RETURN(error);
}


serg@serg.mylan's avatar
serg@serg.mylan committed
2912 2913 2914
/*
  Fill the record buffer from the given values and invoke BEFORE INSERT
  triggers.

  When an explicit field list was given (fields->elements is non-zero) the
  values are stored into those fields, otherwise into all fields of the
  table (table->field).
*/
void select_insert::store_values(List<Item> &values)
{
  if (fields->elements)
    fill_record_n_invoke_before_triggers(thd, *fields, values, 1,
                                         table->triggers, TRG_EVENT_INSERT);
  else
    fill_record_n_invoke_before_triggers(thd, table->field, values, 1,
                                         table->triggers, TRG_EVENT_INSERT);
}

bk@work.mysql.com's avatar
bk@work.mysql.com committed
2922 2923
/*
  Report an error for the statement.

  SYNOPSIS
    select_insert::send_error()
      errcode   Error code passed on to my_message()
      err       Error text passed on to my_message()
*/
void select_insert::send_error(uint errcode,const char *err)
{
  DBUG_ENTER("select_insert::send_error");

  my_message(errcode, err, MYF(0));

  DBUG_VOID_RETURN;
}


bool select_insert::send_eof()
{
2934
  int error,error2;
2935 2936
  DBUG_ENTER("select_insert::send_eof");

2937
  error= (!thd->prelocked_mode) ? table->file->end_bulk_insert():0;
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
2938
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2939
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
2940

2941 2942
  /*
    We must invalidate the table in the query cache before binlog writing
2943
    and ha_autocommit_or_rollback
2944
  */
heikki@hundin.mysql.fi's avatar
heikki@hundin.mysql.fi committed
2945

vva@eagle.mysql.r18.ru's avatar
vva@eagle.mysql.r18.ru committed
2946
  if (info.copied || info.deleted || info.updated)
monty@narttu.mysql.fi's avatar
monty@narttu.mysql.fi committed
2947
  {
heikki@hundin.mysql.fi's avatar
heikki@hundin.mysql.fi committed
2948
    query_cache_invalidate3(thd, table, 1);
2949
    if (!(table->file->has_transactions() || table->s->tmp_table))
2950
      thd->no_trans_update.all= TRUE;
monty@narttu.mysql.fi's avatar
monty@narttu.mysql.fi committed
2951
  }
heikki@hundin.mysql.fi's avatar
heikki@hundin.mysql.fi committed
2952

2953
  if (last_insert_id)
2954
    thd->insert_id(info.copied ? last_insert_id : 0);		// For binary log
2955 2956 2957
  /* Write to binlog before commiting transaction */
  if (mysql_bin_log.is_open())
  {
guilhem@mysql.com's avatar
guilhem@mysql.com committed
2958 2959
    if (!error)
      thd->clear_error();
2960
    Query_log_event qinfo(thd, thd->query, thd->query_length,
2961
			  table->file->has_transactions(), FALSE);
2962 2963
    mysql_bin_log.write(&qinfo);
  }
2964 2965 2966
  if ((error2=ha_autocommit_or_rollback(thd,error)) && ! error)
    error=error2;
  if (error)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2967 2968
  {
    table->file->print_error(error,MYF(0));
2969
    DBUG_RETURN(1);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2970
  }
2971
  char buff[160];
2972
  if (info.ignore)
2973 2974
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
	    (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2975
  else
2976
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
monty@mysql.com's avatar
monty@mysql.com committed
2977
	    (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
2978 2979 2980
  thd->row_count_func= info.copied + info.deleted +
                       ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
                        info.touched : info.updated);
2981
  ::send_ok(thd, (ulong) thd->row_count_func, last_insert_id, buff);
2982
  DBUG_RETURN(0);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2983 2984
}

2985 2986 2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027
/*
  Abort the INSERT ... SELECT: end the bulk insert, log and invalidate what
  has to be logged and invalidated, and roll back the statement.
*/
void select_insert::abort()
{
  DBUG_ENTER("select_insert::abort");

  /*
    No table can only happen when using CREATE ... SELECT and the table was
    not created because of a syntax error.
  */
  if (!table)
    DBUG_VOID_RETURN;

  if (!thd->prelocked_mode)
    table->file->end_bulk_insert();

  if (info.copied || info.deleted || info.updated)
  {
    /*
      If at least one row has been inserted/modified and will stay in the
      table (the table doesn't have transactions) (example: we got a
      duplicate key error while inserting into a MyISAM table) we must write
      to the binlog (and the error code will make the slave stop).
    */
    if (!table->file->has_transactions())
    {
      if (last_insert_id)
        thd->insert_id(last_insert_id);		// For binary log
      if (mysql_bin_log.is_open())
      {
        Query_log_event qinfo(thd, thd->query, thd->query_length,
                              table->file->has_transactions(), FALSE);
        mysql_bin_log.write(&qinfo);
      }
      if (!table->s->tmp_table)
        thd->no_trans_update.all= TRUE;
    }
    /* Rows were changed, so cached results for the table are stale */
    query_cache_invalidate3(thd, table, 1);
  }
  ha_rollback_stmt(thd);

  DBUG_VOID_RETURN;
}
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3028 3029

/***************************************************************************
  CREATE TABLE (SELECT) ...
***************************************************************************/

3033
/*
  Create table from lists of fields and items (or just return TABLE
  object for pre-opened existing table).

  SYNOPSIS
    create_table_from_items()
      thd          in     Thread object
      create_info  in     Create information (like MAX_ROWS, ENGINE or
                          temporary table flag)
      create_table in     Pointer to TABLE_LIST object providing database
                          and name for table to be created or to be open
      alter_info   in/out Initial list of columns and indexes for the table
                          to be created
      items        in     List of items which should be used to produce rest
                          of fields for the table (corresponding fields will
                          be added to the end of alter_info->create_list)
      lock         out    Pointer to the MYSQL_LOCK object for table created
                          (or open temporary table) will be returned in this
                          parameter. Since this table is not included in
                          THD::lock caller is responsible for explicitly
                          unlocking this table.

  NOTES
    This function behaves differently for base and temporary tables:
    - For base table we assume that either table exists and was pre-opened
      and locked at open_and_lock_tables() stage (and in this case we just
      emit error or warning and return pre-opened TABLE object) or special
      placeholder was put in table cache that guarantees that this table
      won't be created or opened until the placeholder will be removed
      (so there is an exclusive lock on this table).
    - We don't pre-open existing temporary table, instead we either open
      or create and then open table in this function.

    Since this function contains some logic specific to CREATE TABLE ...
    SELECT it should be changed before it can be used in other contexts.

  RETURN VALUES
    non-zero  Pointer to TABLE object for table created or opened
    0         Error
*/

static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
                                      TABLE_LIST *create_table,
                                      Alter_info *alter_info,
                                      List<Item> *items,
                                      MYSQL_LOCK **lock)
{
  TABLE tmp_table;		// Used during 'create_field()'
  TABLE *table= 0;
  uint select_field_count= items->elements;
  /* Add selected items to field list */
  List_iterator_fast<Item> it(*items);
  Item *item;
  Field *tmp_field;
  bool not_used;
  DBUG_ENTER("create_table_from_items");

  DBUG_EXECUTE_IF("sleep_create_select_before_check_if_exists", my_sleep(6000000););

  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
      create_table->table->db_stat)
  {
    /* Table already exists and was open at open_and_lock_tables() stage. */
    if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
    {
      create_info->table_existed= 1;		// Mark that table existed
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
                          ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
                          create_table->table_name);
      DBUG_RETURN(create_table->table);
    }

    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
    DBUG_RETURN(0);
  }

  /* Set up the dummy TABLE that is used only while creating fields */
  tmp_table.alias= 0;
  tmp_table.timestamp_field= 0;
  tmp_table.s= &tmp_table.share_not_to_be_used;
  tmp_table.s->db_create_options=0;
  tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
  tmp_table.s->db_low_byte_first= test(create_info->db_type == DB_TYPE_MYISAM ||
                                       create_info->db_type == DB_TYPE_HEAP);
  tmp_table.null_row=tmp_table.maybe_null=0;

  /* Append one create_field per selected item to alter_info->create_list */
  while ((item=it++))
  {
    create_field *cr_field;
    Field *field, *def_field;
    if (item->type() == Item::FUNC_ITEM)
      field= item->tmp_table_field(&tmp_table);
    else
      field= create_tmp_field(thd, &tmp_table, item, item->type(),
                              (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
                              0);
    if (!field ||
	!(cr_field=new create_field(field,(item->type() == Item::FIELD_ITEM ?
					   ((Item_field *)item)->field :
					   (Field*) 0))))
      DBUG_RETURN(0);
    if (item->maybe_null)
      cr_field->flags &= ~NOT_NULL_FLAG;
    alter_info->create_list.push_back(cr_field);
  }

  DBUG_EXECUTE_IF("sleep_create_select_before_create", my_sleep(6000000););

  /*
    Create and lock table.

    Note that we either creating (or opening existing) temporary table or
    creating base table on which name we have exclusive lock. So code below
    should not cause deadlocks or races.

    We don't log the statement, it will be logged later.

    If this is a HEAP table, the automatic DELETE FROM which is written to the
    binlog when a HEAP table is opened for the first time since startup, must
    not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
    don't want to delete from it) 2) it would be written before the CREATE
    TABLE, which is a wrong order. So we keep binary logging disabled when we
    open_table().
  */
  {
    tmp_disable_binlog(thd);
    if (!mysql_create_table(thd, create_table->db, create_table->table_name,
                            create_info, alter_info, 0, select_field_count))
    {
      if (create_info->table_existed &&
          !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
      {
        /*
          This means that someone created table underneath server
          or it was created via different mysqld front-end to the
          cluster. We don't have much options but throw an error.

          Do not return from here: 'table' is still 0, so we fall through
          to reenable_binlog() and fail on the check that follows it. This
          keeps the tmp_disable_binlog()/reenable_binlog() pair balanced on
          this error path as well.
        */
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
      }
      else
      {
        DBUG_EXECUTE_IF("sleep_create_select_before_open", my_sleep(6000000););

        if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
        {
          VOID(pthread_mutex_lock(&LOCK_open));
          if (reopen_name_locked_table(thd, create_table, FALSE))
          {
            /* Could not open the table just created: remove it again */
            quick_rm_table(create_info->db_type, create_table->db,
                           table_case_name(create_info,
                                           create_table->table_name));
          }
          else
            table= create_table->table;
          VOID(pthread_mutex_unlock(&LOCK_open));
        }
        else
        {
          if (!(table= open_table(thd, create_table, thd->mem_root, (bool*) 0,
                                  MYSQL_OPEN_TEMPORARY_ONLY)) &&
              !create_info->table_existed)
          {
            /*
              This shouldn't happen as creation of temporary table should make
              it preparable for open. But let us do close_temporary_table() here
              just in case.
            */
            close_temporary_table(thd, create_table->db, create_table->table_name);
          }
        }
      }
    }
    reenable_binlog(thd);
    if (!table)                                   // open failed
      DBUG_RETURN(0);
  }

  DBUG_EXECUTE_IF("sleep_create_select_before_lock", my_sleep(6000000););

  table->reginfo.lock_type=TL_WRITE;
  if (! ((*lock)= mysql_lock_tables(thd, &table, 1,
                                    MYSQL_LOCK_IGNORE_FLUSH, &not_used)))
  {
    /* Drop the table only if it was created by this statement */
    if (!create_info->table_existed)
      drop_open_table(thd, table, create_table->db, create_table->table_name);
    DBUG_RETURN(0);
  }
  DBUG_RETURN(table);
}


bk@work.mysql.com's avatar
bk@work.mysql.com committed
3223
int
3224
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3225 3226 3227
{
  DBUG_ENTER("select_create::prepare");

3228
  unit= u;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
3229
  table= create_table_from_items(thd, create_info, create_table,
3230
                                 alter_info, &values, &lock);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3231 3232 3233
  if (!table)
    DBUG_RETURN(-1);				// abort() deletes table

3234
  if (table->s->fields < values.elements)
3235
  {
3236
    my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
3237 3238 3239
    DBUG_RETURN(-1);
  }

bk@work.mysql.com's avatar
bk@work.mysql.com committed
3240
  /* First field to copy */
3241 3242 3243 3244 3245
  field= table->field+table->s->fields - values.elements;

  /* Mark all fields that are given values */
  for (Field **f= field ; *f ; f++)
    (*f)->query_id= thd->query_id;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3246

3247
  /* Don't set timestamp if used */
3248
  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
serg@serg.mylan's avatar
serg@serg.mylan committed
3249

bk@work.mysql.com's avatar
bk@work.mysql.com committed
3250 3251
  table->next_number_field=table->found_next_number_field;

3252
  restore_record(table,s->default_values);      // Get empty record
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3253
  thd->cuted_fields=0;
3254
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
3255
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
3256 3257 3258 3259 3260 3261
  if (info.handle_duplicates == DUP_REPLACE)
  {
    if (!table->triggers || !table->triggers->has_delete_triggers())
      table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
    table->file->extra(HA_EXTRA_RETRIEVE_ALL_COLS);
  }
antony@ppcg5.local's avatar
antony@ppcg5.local committed
3262 3263
  if (info.handle_duplicates == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
3264 3265
  if (!thd->prelocked_mode)
    table->file->start_bulk_insert((ha_rows) 0);
3266
  thd->no_trans_update.stmt= FALSE;
monty@mysql.com's avatar
monty@mysql.com committed
3267
  thd->abort_on_warning= (!info.ignore &&
3268 3269 3270
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
dlenev@mockturtle.local's avatar
dlenev@mockturtle.local committed
3271 3272 3273 3274
  if (check_that_all_fields_are_given_values(thd, table, table_list))
    DBUG_RETURN(1);
  table->file->extra(HA_EXTRA_WRITE_CACHE);
  DBUG_RETURN(0);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3275 3276 3277
}


serg@serg.mylan's avatar
serg@serg.mylan committed
3278
/*
  Fill the record buffer starting at 'field' (the first field to copy, as
  set up in prepare()) from the given values and invoke BEFORE INSERT
  triggers.
*/
void select_create::store_values(List<Item> &values)
{
  fill_record_n_invoke_before_triggers(thd, field, values, 1,
                                       table->triggers, TRG_EVENT_INSERT);
}

monty@tik.mysql.fi's avatar
monty@tik.mysql.fi committed
3284

3285 3286 3287
/* Report an error; simply delegates to select_insert::send_error(). */
void select_create::send_error(uint errcode,const char *err)
{
  select_insert::send_error(errcode, err);
}

monty@tik.mysql.fi's avatar
monty@tik.mysql.fi committed
3290

bk@work.mysql.com's avatar
bk@work.mysql.com committed
3291 3292 3293 3294 3295 3296 3297
bool select_create::send_eof()
{
  bool tmp=select_insert::send_eof();
  if (tmp)
    abort();
  else
  {
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
3298
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3299
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
dlenev@mockturtle.local's avatar
dlenev@mockturtle.local committed
3300
    if (lock)
3301
    {
dlenev@mockturtle.local's avatar
dlenev@mockturtle.local committed
3302 3303
      mysql_unlock_tables(thd, lock);
      lock= 0;
3304
    }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3305 3306 3307 3308
  }
  return tmp;
}

dlenev@mockturtle.local's avatar
dlenev@mockturtle.local committed
3309

bk@work.mysql.com's avatar
bk@work.mysql.com committed
3310 3311
void select_create::abort()
{
3312 3313 3314 3315 3316 3317 3318 3319
  /*
   Disable binlog, because we "roll back" partial inserts in ::abort
   by removing the table, even for non-transactional tables.
  */
  tmp_disable_binlog(thd);
  select_insert::abort();
  reenable_binlog(thd);

bk@work.mysql.com's avatar
bk@work.mysql.com committed
3320 3321 3322 3323 3324 3325 3326
  if (lock)
  {
    mysql_unlock_tables(thd, lock);
    lock=0;
  }
  if (table)
  {
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
3327
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3328
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
dlenev@mockturtle.local's avatar
dlenev@mockturtle.local committed
3329 3330
    if (!create_info->table_existed)
      drop_open_table(thd, table, create_table->db, create_table->table_name);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3331 3332 3333 3334 3335 3336
    table=0;
  }
}


/*****************************************************************************
  Instantiate templates
*****************************************************************************/

/*
  Explicit instantiations, emitted only when
  HAVE_EXPLICIT_TEMPLATE_INSTANTIATION is defined. The Delayed_insert and
  delayed_row lists are left out when EMBEDDED_LIBRARY is defined.
*/
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
template class List_iterator_fast<List_item>;
#ifndef EMBEDDED_LIBRARY
template class I_List<Delayed_insert>;
template class I_List_iterator<Delayed_insert>;
template class I_List<delayed_row>;
#endif /* EMBEDDED_LIBRARY */
#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */