sql_insert.cc 90 KB
Newer Older
unknown's avatar
unknown committed
1
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
unknown's avatar
unknown committed
2

unknown's avatar
unknown committed
3 4 5 6
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.
unknown's avatar
unknown committed
7

unknown's avatar
unknown committed
8 9 10 11
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
unknown's avatar
unknown committed
12

unknown's avatar
unknown committed
13 14 15 16 17 18 19 20
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


/* Insert of records */

#include "mysql_priv.h"
21 22
#include "sp_head.h"
#include "sql_trigger.h"
23
#include "sql_select.h"
24
#include "sql_show.h"
unknown's avatar
unknown committed
25 26

static int check_null_fields(THD *thd,TABLE *entry);
unknown's avatar
unknown committed
27
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
28
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list);
29
static int write_delayed(THD *thd,TABLE *table, enum_duplicates dup, bool ignore,
30
			 char *query, uint query_length, bool log_on);
unknown's avatar
unknown committed
31
static void end_delayed_insert(THD *thd);
32
pthread_handler_t handle_delayed_insert(void *arg);
unknown's avatar
unknown committed
33
static void unlink_blobs(register TABLE *table);
unknown's avatar
unknown committed
34
#endif
35
static bool check_view_insertability(THD *thd, TABLE_LIST *view);
unknown's avatar
unknown committed
36 37 38 39 40 41 42 43 44 45 46

/* Define to force use of my_malloc() if the allocated memory block is big */

#ifndef HAVE_ALLOCA
#define my_safe_alloca(size, min_length) my_alloca(size)
#define my_safe_afree(ptr, size, min_length) my_afree(ptr)
#else
#define my_safe_alloca(size, min_length) ((size <= min_length) ? my_alloca(size) : my_malloc(size,MYF(0)))
#define my_safe_afree(ptr, size, min_length) if (size > min_length) my_free(ptr,MYF(0))
#endif

47

unknown's avatar
unknown committed
48
/*
49
  Check if insert fields are correct.
50 51 52 53 54 55 56

  SYNOPSIS
    check_insert_fields()
    thd                         The current thread.
    table                       The table for insert.
    fields                      The insert fields.
    values                      The insert values.
unknown's avatar
unknown committed
57
    check_unique                If duplicate values should be rejected.
58 59 60 61 62 63 64 65 66

  NOTE
    Clears TIMESTAMP_AUTO_SET_ON_INSERT from table->timestamp_field_type
    or leaves it as is, depending on if timestamp should be updated or
    not.

  RETURN
    0           OK
    -1          Error
unknown's avatar
unknown committed
67 68
*/

unknown's avatar
unknown committed
69 70 71
static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
                               List<Item> &fields, List<Item> &values,
                               bool check_unique)
unknown's avatar
unknown committed
72
{
unknown's avatar
VIEW  
unknown committed
73
  TABLE *table= table_list->table;
74

75 76 77 78 79 80
  if (!table_list->updatable)
  {
    my_error(ER_NON_UPDATABLE_TABLE, MYF(0), table_list->alias, "INSERT");
    return -1;
  }

unknown's avatar
unknown committed
81 82
  if (fields.elements == 0 && values.elements != 0)
  {
83 84 85 86 87 88
    if (!table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
      return -1;
    }
89
    if (values.elements != table->s->fields)
unknown's avatar
unknown committed
90
    {
unknown's avatar
unknown committed
91
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
unknown's avatar
unknown committed
92 93
      return -1;
    }
unknown's avatar
unknown committed
94
#ifndef NO_EMBEDDED_ACCESS_CHECKS
95
    if (grant_option)
unknown's avatar
VIEW  
unknown committed
96 97 98
    {
      Field_iterator_table fields;
      fields.set_table(table);
99
      if (check_grant_all_columns(thd, INSERT_ACL, &table->grant,
unknown's avatar
unknown committed
100
                                  table->s->db.str, table->s->table_name.str,
unknown's avatar
VIEW  
unknown committed
101 102 103
                                  &fields))
        return -1;
    }
unknown's avatar
unknown committed
104
#endif
105 106
    clear_timestamp_auto_bits(table->timestamp_field_type,
                              TIMESTAMP_AUTO_SET_ON_INSERT);
107 108 109 110
    /*
      No fields are provided so all fields must be provided in the values.
      Thus we set all bits in the write set.
    */
111
    bitmap_set_all(table->write_set);
unknown's avatar
unknown committed
112 113 114
  }
  else
  {						// Part field list
unknown's avatar
unknown committed
115 116
    SELECT_LEX *select_lex= &thd->lex->select_lex;
    Name_resolution_context *context= &select_lex->context;
117
    Name_resolution_context_state ctx_state;
unknown's avatar
unknown committed
118
    int res;
unknown's avatar
unknown committed
119

unknown's avatar
unknown committed
120 121
    if (fields.elements != values.elements)
    {
unknown's avatar
unknown committed
122
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
unknown's avatar
unknown committed
123 124 125
      return -1;
    }

126
    thd->dup_field= 0;
unknown's avatar
unknown committed
127 128 129
    select_lex->no_wrap_view_item= TRUE;

    /* Save the state of the current name resolution context. */
130
    ctx_state.save_state(context, table_list);
unknown's avatar
unknown committed
131 132 133 134 135

    /*
      Perform name resolution only in the first table - 'table_list',
      which is the table that is inserted into.
    */
136
    table_list->next_local= 0;
137
    context->resolve_in_table_list_only(table_list);
138
    res= setup_fields(thd, 0, fields, MARK_COLUMNS_WRITE, 0, 0);
unknown's avatar
unknown committed
139 140

    /* Restore the current context. */
141
    ctx_state.restore_state(context, table_list);
142
    thd->lex->select_lex.no_wrap_view_item= FALSE;
unknown's avatar
unknown committed
143

unknown's avatar
unknown committed
144
    if (res)
unknown's avatar
unknown committed
145
      return -1;
unknown's avatar
unknown committed
146

unknown's avatar
unknown committed
147
    if (table_list->effective_algorithm == VIEW_ALGORITHM_MERGE)
148 149 150 151
    {
      /* it is join view => we need to find table for update */
      List_iterator_fast<Item> it(fields);
      Item *item;
unknown's avatar
unknown committed
152
      TABLE_LIST *tbl= 0;            // reset for call to check_single_table()
153
      table_map map= 0;
unknown's avatar
unknown committed
154 155

      while ((item= it++))
156
        map|= item->used_tables();
unknown's avatar
unknown committed
157
      if (table_list->check_single_table(&tbl, map, table_list) || tbl == 0)
158 159 160 161 162 163 164
      {
        my_error(ER_VIEW_MULTIUPDATE, MYF(0),
                 table_list->view_db.str, table_list->view_name.str);
        return -1;
      }
      table_list->table= table= tbl->table;
    }
165

166
    if (check_unique && thd->dup_field)
unknown's avatar
unknown committed
167
    {
168
      my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dup_field->field_name);
unknown's avatar
unknown committed
169 170
      return -1;
    }
171 172 173 174 175 176 177 178 179 180 181 182
    if (table->timestamp_field)	// Don't automaticly set timestamp if used
    {
      if (bitmap_is_set(table->write_set,
                        table->timestamp_field->field_index))
        clear_timestamp_auto_bits(table->timestamp_field_type,
                                  TIMESTAMP_AUTO_SET_ON_INSERT);
      else
      {
        bitmap_set_bit(table->write_set,
                       table->timestamp_field->field_index);
      }
    }
unknown's avatar
unknown committed
183
  }
184 185 186
  if (table->found_next_number_field)
    table->mark_auto_increment_column();
  table->mark_columns_needed_for_insert();
unknown's avatar
unknown committed
187
  // For the values we need select_priv
unknown's avatar
unknown committed
188
#ifndef NO_EMBEDDED_ACCESS_CHECKS
189
  table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege);
unknown's avatar
unknown committed
190
#endif
191 192 193

  if (check_key_in_view(thd, table_list) ||
      (table_list->view &&
194
       check_view_insertability(thd, table_list)))
195 196 197 198 199
  {
    my_error(ER_NON_UPDATABLE_TABLE, MYF(0), table_list->alias, "INSERT");
    return -1;
  }

unknown's avatar
unknown committed
200 201 202 203
  return 0;
}


204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
/*
  Check update fields for the timestamp field.

  SYNOPSIS
    check_update_fields()
    thd                         The current thread.
    insert_table_list           The insert table list.
    table                       The table for update.
    update_fields               The update fields.

  NOTE
    If the update fields include the timestamp field,
    remove TIMESTAMP_AUTO_SET_ON_UPDATE from table->timestamp_field_type.

  RETURN
    0           OK
    -1          Error
*/

unknown's avatar
unknown committed
223
static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list,
224 225
                               List<Item> &update_fields)
{
unknown's avatar
unknown committed
226
  TABLE *table= insert_table_list->table;
227
  my_bool timestamp_mark;
228 229 230

  if (table->timestamp_field)
  {
231 232 233 234 235 236
    /*
      Unmark the timestamp field so that we can check if this is modified
      by update_fields
    */
    timestamp_mark= bitmap_test_and_clear(table->write_set,
                                          table->timestamp_field->field_index);
237 238
  }

239 240
  /* Check the fields we are going to modify */
  if (setup_fields(thd, 0, update_fields, MARK_COLUMNS_WRITE, 0, 0))
241 242 243 244 245
    return -1;

  if (table->timestamp_field)
  {
    /* Don't set timestamp column if this is modified. */
246 247
    if (bitmap_is_set(table->write_set,
                      table->timestamp_field->field_index))
248 249
      clear_timestamp_auto_bits(table->timestamp_field_type,
                                TIMESTAMP_AUTO_SET_ON_UPDATE);
250 251 252
    if (timestamp_mark)
      bitmap_set_bit(table->write_set,
                     table->timestamp_field->field_index);
253 254 255 256 257
  }
  return 0;
}


unknown's avatar
unknown committed
258 259 260 261 262
bool mysql_insert(THD *thd,TABLE_LIST *table_list,
                  List<Item> &fields,
                  List<List_item> &values_list,
                  List<Item> &update_fields,
                  List<Item> &update_values,
263 264
                  enum_duplicates duplic,
		  bool ignore)
unknown's avatar
unknown committed
265
{
266
  int error, res;
267 268 269 270 271
  /*
    log_on is about delayed inserts only.
    By default, both logs are enabled (this won't cause problems if the server
    runs without --log-update or --log-bin).
  */
272 273
  bool log_on= ((thd->options & OPTION_BIN_LOG) ||
                (!(thd->security_ctx->master_access & SUPER_ACL)));
274
  bool transactional_table, joins_freed= FALSE;
275
  bool changed;
unknown's avatar
unknown committed
276 277 278 279
  uint value_count;
  ulong counter = 1;
  ulonglong id;
  COPY_INFO info;
280
  TABLE *table= 0;
unknown's avatar
unknown committed
281
  List_iterator_fast<List_item> its(values_list);
unknown's avatar
unknown committed
282
  List_item *values;
unknown's avatar
unknown committed
283
  Name_resolution_context *context;
284
  Name_resolution_context_state ctx_state;
unknown's avatar
unknown committed
285 286 287
#ifndef EMBEDDED_LIBRARY
  char *query= thd->query;
#endif
unknown's avatar
unknown committed
288
  thr_lock_type lock_type = table_list->lock_type;
unknown's avatar
unknown committed
289
  Item *unused_conds= 0;
unknown's avatar
unknown committed
290 291
  DBUG_ENTER("mysql_insert");

292 293 294 295 296
  /*
    in safe mode or with skip-new change delayed insert to be regular
    if we are told to replace duplicates, the insert cannot be concurrent
    delayed insert changed to regular in slave thread
   */
297 298 299 300
#ifdef EMBEDDED_LIBRARY
  if (lock_type == TL_WRITE_DELAYED)
    lock_type=TL_WRITE;
#else
unknown's avatar
unknown committed
301 302
  if ((lock_type == TL_WRITE_DELAYED &&
       ((specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE)) ||
303
	thd->slave_thread || !thd->variables.max_insert_delayed_threads)) ||
304 305
      (lock_type == TL_WRITE_CONCURRENT_INSERT && duplic == DUP_REPLACE) ||
      (duplic == DUP_UPDATE))
unknown's avatar
unknown committed
306
    lock_type=TL_WRITE;
307
#endif
308
  table_list->lock_type= lock_type;
unknown's avatar
unknown committed
309

310
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
311 312 313 314 315 316
  if (lock_type == TL_WRITE_DELAYED)
  {
    if (thd->locked_tables)
    {
      if (find_locked_table(thd,
			    table_list->db ? table_list->db : thd->db,
317
			    table_list->table_name))
unknown's avatar
unknown committed
318
      {
319
	my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
320
                 table_list->table_name);
unknown's avatar
unknown committed
321
	DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
322 323
      }
    }
324
    if ((table= delayed_get_table(thd,table_list)) && !thd->is_fatal_error)
325
    {
326 327 328 329 330
      /*
        Open tables used for sub-selects or in stored functions, will also
        cache these functions.
      */
      res= open_and_lock_tables(thd, table_list->next_global);
unknown's avatar
VIEW  
unknown committed
331 332 333 334 335 336
      /*
	First is not processed by open_and_lock_tables() => we need set
	updateability flags "by hands".
      */
      if (!table_list->derived && !table_list->view)
        table_list->updatable= 1;  // usual table
337
    }
338
    else
339
    {
340 341
      /* Too many delayed insert threads;  Use a normal insert */
      table_list->lock_type= lock_type= TL_WRITE;
342
      res= open_and_lock_tables(thd, table_list);
343
    }
unknown's avatar
unknown committed
344 345
  }
  else
346
#endif /* EMBEDDED_LIBRARY */
347
    res= open_and_lock_tables(thd, table_list);
348
  if (res || thd->is_fatal_error)
unknown's avatar
unknown committed
349
    DBUG_RETURN(TRUE);
350

unknown's avatar
unknown committed
351
  thd->proc_info="init";
352
  thd->used_tables=0;
unknown's avatar
unknown committed
353
  values= its++;
unknown's avatar
unknown committed
354

unknown's avatar
VIEW  
unknown committed
355
  if (mysql_prepare_insert(thd, table_list, table, fields, values,
unknown's avatar
unknown committed
356 357
			   update_fields, update_values, duplic, &unused_conds,
                           FALSE))
unknown's avatar
unknown committed
358
    goto abort;
359

360 361 362
  /* mysql_prepare_insert set table_list->table if it was not set */
  table= table_list->table;

unknown's avatar
unknown committed
363 364
  context= &thd->lex->select_lex.context;
  /* Save the state of the current name resolution context. */
365
  ctx_state.save_state(context, table_list);
unknown's avatar
unknown committed
366 367 368 369 370

  /*
    Perform name resolution only in the first table - 'table_list',
    which is the table that is inserted into.
  */
371
  table_list->next_local= 0;
unknown's avatar
unknown committed
372 373
  context->resolve_in_table_list_only(table_list);

unknown's avatar
unknown committed
374
  value_count= values->elements;
375
  while ((values= its++))
unknown's avatar
unknown committed
376 377 378 379
  {
    counter++;
    if (values->elements != value_count)
    {
380
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
unknown's avatar
unknown committed
381 382
      goto abort;
    }
383
    if (setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0))
unknown's avatar
unknown committed
384 385 386
      goto abort;
  }
  its.rewind ();
unknown's avatar
unknown committed
387 388
 
  /* Restore the current context. */
389
  ctx_state.restore_state(context, table_list);
unknown's avatar
unknown committed
390

unknown's avatar
unknown committed
391
  /*
unknown's avatar
unknown committed
392
    Fill in the given fields and dump it to the table file
unknown's avatar
unknown committed
393
  */
unknown's avatar
unknown committed
394
  info.records= info.deleted= info.copied= info.updated= 0;
395
  info.ignore= ignore;
unknown's avatar
unknown committed
396
  info.handle_duplicates=duplic;
397 398
  info.update_fields= &update_fields;
  info.update_values= &update_values;
unknown's avatar
unknown committed
399
  info.view= (table_list->view ? table_list : 0);
400

401 402 403
  /*
    Count warnings for all inserts.
    For single line insert, generate an error if try to set a NOT NULL field
unknown's avatar
unknown committed
404
    to NULL.
405
  */
unknown's avatar
unknown committed
406
  thd->count_cuted_fields= ((values_list.elements == 1 &&
unknown's avatar
unknown committed
407
                             !ignore) ?
408 409
			    CHECK_FIELD_ERROR_FOR_NULL :
			    CHECK_FIELD_WARN);
unknown's avatar
unknown committed
410 411 412 413 414 415
  thd->cuted_fields = 0L;
  table->next_number_field=table->found_next_number_field;

  error=0;
  id=0;
  thd->proc_info="update";
unknown's avatar
unknown committed
416
  if (duplic != DUP_ERROR || ignore)
unknown's avatar
unknown committed
417
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
418 419 420 421
  /*
    let's *try* to start bulk inserts. It won't necessary
    start them as values_list.elements should be greater than
    some - handler dependent - threshold.
422 423 424 425
    We should not start bulk inserts if this statement uses
    functions or invokes triggers since they may access
    to the same table and therefore should not see its
    inconsistent state created by this optimization.
426 427 428 429
    So we call start_bulk_insert to perform nesessary checks on
    values_list.elements, and - if nothing else - to initialize
    the code to make the call of end_bulk_insert() below safe.
  */
430
  if (lock_type != TL_WRITE_DELAYED && !thd->prelocked_mode)
431
    table->file->ha_start_bulk_insert(values_list.elements);
432

unknown's avatar
unknown committed
433
  thd->no_trans_update= 0;
unknown's avatar
unknown committed
434
  thd->abort_on_warning= (!ignore &&
unknown's avatar
unknown committed
435 436 437 438
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));

439
  if ((fields.elements || !value_count) &&
440
      check_that_all_fields_are_given_values(thd, table, table_list))
unknown's avatar
unknown committed
441 442 443 444 445
  {
    /* thd->net.report_error is now set, which will abort the next loop */
    error= 1;
  }

446 447 448 449
  if (table_list->prepare_where(thd, 0, TRUE) ||
      table_list->prepare_check_option(thd))
    error= 1;

unknown's avatar
unknown committed
450
  while ((values= its++))
unknown's avatar
unknown committed
451 452 453
  {
    if (fields.elements || !value_count)
    {
454
      restore_record(table,s->default_values);	// Get empty record
unknown's avatar
unknown committed
455 456 457
      if (fill_record_n_invoke_before_triggers(thd, fields, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
unknown's avatar
unknown committed
458
      {
unknown's avatar
unknown committed
459
	if (values_list.elements != 1 && !thd->net.report_error)
unknown's avatar
unknown committed
460 461 462 463
	{
	  info.records++;
	  continue;
	}
unknown's avatar
unknown committed
464 465 466 467 468
	/*
	  TODO: set thd->abort_on_warning if values_list.elements == 1
	  and check that all items return warning in case of problem with
	  storing field.
        */
unknown's avatar
unknown committed
469 470 471 472 473 474
	error=1;
	break;
      }
    }
    else
    {
475
      if (thd->used_tables)			// Column used in values()
476
	restore_record(table,s->default_values);	// Get empty record
477
      else
unknown's avatar
unknown committed
478 479 480 481 482 483 484 485 486 487 488
      {
        /*
          Fix delete marker. No need to restore rest of record since it will
          be overwritten by fill_record() anyway (and fill_record() does not
          use default values in this case).
        */
	table->record[0][0]= table->s->default_values[0];
      }
      if (fill_record_n_invoke_before_triggers(thd, table->field, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
unknown's avatar
unknown committed
489
      {
unknown's avatar
unknown committed
490
	if (values_list.elements != 1 && ! thd->net.report_error)
unknown's avatar
unknown committed
491 492 493 494 495 496 497 498
	{
	  info.records++;
	  continue;
	}
	error=1;
	break;
      }
    }
499

500 501 502
    if ((res= table_list->view_check_option(thd,
					    (values_list.elements == 1 ?
					     0 :
503
					     ignore))) ==
unknown's avatar
unknown committed
504 505 506
        VIEW_CHECK_SKIP)
      continue;
    else if (res == VIEW_CHECK_ERROR)
unknown's avatar
unknown committed
507
    {
unknown's avatar
unknown committed
508 509
      error= 1;
      break;
unknown's avatar
unknown committed
510
    }
511
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
512
    if (lock_type == TL_WRITE_DELAYED)
unknown's avatar
unknown committed
513
    {
514
      error=write_delayed(thd, table, duplic, ignore, query, thd->query_length, log_on);
unknown's avatar
unknown committed
515 516 517
      query=0;
    }
    else
518
#endif
unknown's avatar
unknown committed
519
      error=write_record(thd, table ,&info);
unknown's avatar
unknown committed
520 521 522 523 524 525 526 527 528 529
    /*
      If auto_increment values are used, save the first one
       for LAST_INSERT_ID() and for the update log.
       We can't use insert_id() as we don't want to touch the
       last_insert_id_used flag.
    */
    if (! id && thd->insert_id_used)
    {						// Get auto increment value
      id= thd->last_insert_id;
    }
530 531
    if (error)
      break;
532
    thd->row_count++;
unknown's avatar
unknown committed
533
  }
534

535 536 537
  free_underlaid_joins(thd, &thd->lex->select_lex);
  joins_freed= TRUE;

538 539 540 541
  /*
    Now all rows are inserted.  Time to update logs and sends response to
    user
  */
542
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
543 544
  if (lock_type == TL_WRITE_DELAYED)
  {
545 546 547 548 549 550
    if (!error)
    {
      id=0;					// No auto_increment id
      info.copied=values_list.elements;
      end_delayed_insert(thd);
    }
unknown's avatar
unknown committed
551
    query_cache_invalidate3(thd, table_list, 1);
unknown's avatar
unknown committed
552 553
  }
  else
554
#endif
unknown's avatar
unknown committed
555
  {
556
    if (!thd->prelocked_mode && table->file->ha_end_bulk_insert() && !error)
557
    {
unknown's avatar
unknown committed
558 559
      table->file->print_error(my_errno,MYF(0));
      error=1;
560
    }
unknown's avatar
unknown committed
561 562
    if (id && values_list.elements != 1)
      thd->insert_id(id);			// For update log
563
    else if (table->next_number_field && info.copied)
unknown's avatar
unknown committed
564
      id=table->next_number_field->val_int();	// Return auto_increment value
unknown's avatar
unknown committed
565

566
    transactional_table= table->file->has_transactions();
unknown's avatar
unknown committed
567

568
    if ((changed= (info.copied || info.deleted || info.updated)))
unknown's avatar
unknown committed
569
    {
570 571 572 573 574 575 576
      /*
        Invalidate the table in the query cache if something changed.
        For the transactional algorithm to work the invalidation must be
        before binlog writing and ha_autocommit_or_rollback
      */
      query_cache_invalidate3(thd, table_list, 1);
      if (error <= 0 || !transactional_table)
577
      {
578
        if (mysql_bin_log.is_open())
579
        {
580 581
          if (error <= 0)
            thd->clear_error();
unknown's avatar
unknown committed
582 583 584 585 586
          if (thd->binlog_query(THD::ROW_QUERY_TYPE,
                                thd->query, thd->query_length,
                                transactional_table, FALSE) &&
              transactional_table)
          {
587
            error=1;
unknown's avatar
unknown committed
588
          }
589
        }
590 591
        if (!transactional_table)
          thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
592
      }
unknown's avatar
unknown committed
593
    }
594
    if (transactional_table)
595
      error=ha_autocommit_or_rollback(thd,error);
unknown's avatar
unknown committed
596

unknown's avatar
unknown committed
597 598 599
    if (thd->lock)
    {
      mysql_unlock_tables(thd, thd->lock);
600 601 602 603 604 605 606 607 608 609
      /*
        Invalidate the table in the query cache if something changed
        after unlocking when changes become fisible.
        TODO: this is workaround. right way will be move invalidating in
        the unlock procedure.
      */
      if (lock_type ==  TL_WRITE_CONCURRENT_INSERT && changed)
      {
        query_cache_invalidate3(thd, table_list, 1);
      }
unknown's avatar
unknown committed
610 611 612 613 614
      thd->lock=0;
    }
  }
  thd->proc_info="end";
  table->next_number_field=0;
615
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
unknown's avatar
unknown committed
616
  thd->next_insert_id=0;			// Reset this if wrongly used
unknown's avatar
unknown committed
617
  if (duplic != DUP_ERROR || ignore)
unknown's avatar
unknown committed
618
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
619 620 621 622 623 624 625

  /* Reset value of LAST_INSERT_ID if no rows where inserted */
  if (!info.copied && thd->insert_id_used)
  {
    thd->insert_id(0);
    id=0;
  }
unknown's avatar
unknown committed
626 627 628 629
  if (error)
    goto abort;
  if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) ||
				    !thd->cuted_fields))
630 631
  {
    thd->row_count_func= info.copied+info.deleted+info.updated;
632
    send_ok(thd, (ulong) thd->row_count_func, id);
633
  }
634 635
  else
  {
unknown's avatar
unknown committed
636
    char buff[160];
637
    if (ignore)
638 639 640
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
	      (lock_type == TL_WRITE_DELAYED) ? (ulong) 0 :
	      (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
unknown's avatar
unknown committed
641
    else
642
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
unknown's avatar
unknown committed
643
	      (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
644
    thd->row_count_func= info.copied+info.deleted+info.updated;
645
    ::send_ok(thd, (ulong) thd->row_count_func, id, buff);
unknown's avatar
unknown committed
646
  }
647 648
  if (table != NULL)
    table->file->release_auto_increment();
649
  thd->abort_on_warning= 0;
unknown's avatar
unknown committed
650
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
651 652

abort:
653
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
654 655
  if (lock_type == TL_WRITE_DELAYED)
    end_delayed_insert(thd);
656
#endif
657 658
  if (table != NULL)
    table->file->release_auto_increment();
659 660
  if (!joins_freed)
    free_underlaid_joins(thd, &thd->lex->select_lex);
unknown's avatar
unknown committed
661
  thd->abort_on_warning= 0;
unknown's avatar
unknown committed
662
  DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
663 664 665
}


unknown's avatar
VIEW  
unknown committed
666 667 668 669 670
/*
  Additional check for insertability for VIEW

  SYNOPSIS
    check_view_insertability()
671
    thd     - thread handler
unknown's avatar
VIEW  
unknown committed
672 673
    view    - reference on VIEW

674 675 676 677 678 679
  IMPLEMENTATION
    A view is insertable if the folloings are true:
    - All columns in the view are columns from a table
    - All not used columns in table have a default values
    - All field in view are unique (not referring to the same column)

unknown's avatar
VIEW  
unknown committed
680 681
  RETURN
    FALSE - OK
682 683 684
      view->contain_auto_increment is 1 if and only if the view contains an
      auto_increment field

unknown's avatar
VIEW  
unknown committed
685 686 687
    TRUE  - can't be used for insert
*/

688
static bool check_view_insertability(THD * thd, TABLE_LIST *view)
unknown's avatar
VIEW  
unknown committed
689
{
690
  uint num= view->view->select_lex.item_list.elements;
unknown's avatar
VIEW  
unknown committed
691
  TABLE *table= view->table;
692 693 694
  Field_translator *trans_start= view->field_translation,
		   *trans_end= trans_start + num;
  Field_translator *trans;
unknown's avatar
VIEW  
unknown committed
695
  Field **field_ptr= table->field;
unknown's avatar
unknown committed
696
  uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
unknown's avatar
unknown committed
697
  uint32 *used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size);
698
  MY_BITMAP used_fields;
699 700
  DBUG_ENTER("check_key_in_view");

701 702 703
  if (!used_fields_buff)
    DBUG_RETURN(TRUE);  // EOM

unknown's avatar
VIEW  
unknown committed
704 705
  DBUG_ASSERT(view->table != 0 && view->field_translation != 0);

706
  VOID(bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0));
707 708
  bitmap_clear_all(&used_fields);

unknown's avatar
VIEW  
unknown committed
709 710
  view->contain_auto_increment= 0;
  /* check simplicity and prepare unique test of view */
711
  for (trans= trans_start; trans != trans_end; trans++)
unknown's avatar
VIEW  
unknown committed
712
  {
713 714
    if (!trans->item->fixed && trans->item->fix_fields(thd, &trans->item))
      return TRUE;
715
    Item_field *field;
unknown's avatar
VIEW  
unknown committed
716
    /* simple SELECT list entry (field without expression) */
unknown's avatar
merge  
unknown committed
717
    if (!(field= trans->item->filed_for_view_update()))
unknown's avatar
VIEW  
unknown committed
718
      DBUG_RETURN(TRUE);
719
    if (field->field->unireg_check == Field::NEXT_NUMBER)
unknown's avatar
VIEW  
unknown committed
720 721
      view->contain_auto_increment= 1;
    /* prepare unique test */
unknown's avatar
unknown committed
722 723 724 725 726
    /*
      remove collation (or other transparent for update function) if we have
      it
    */
    trans->item= field;
unknown's avatar
VIEW  
unknown committed
727 728
  }
  /* unique test */
729
  for (trans= trans_start; trans != trans_end; trans++)
unknown's avatar
VIEW  
unknown committed
730
  {
731
    /* Thanks to test above, we know that all columns are of type Item_field */
732
    Item_field *field= (Item_field *)trans->item;
733 734 735
    /* check fields belong to table in which we are inserting */
    if (field->field->table == table &&
        bitmap_fast_test_and_set(&used_fields, field->field->field_index))
unknown's avatar
VIEW  
unknown committed
736 737 738 739 740 741 742
      DBUG_RETURN(TRUE);
  }

  DBUG_RETURN(FALSE);
}


unknown's avatar
unknown committed
743
/*
744
  Check if table can be updated
unknown's avatar
unknown committed
745 746

  SYNOPSIS
747 748
     mysql_prepare_insert_check_table()
     thd		Thread handle
unknown's avatar
unknown committed
749
     table_list		Table list
750 751
     fields		List of fields to be updated
     where		Pointer to where clause
unknown's avatar
unknown committed
752
     select_insert      Check is making for SELECT ... INSERT
753 754

   RETURN
unknown's avatar
unknown committed
755 756
     FALSE ok
     TRUE  ERROR
unknown's avatar
unknown committed
757
*/
758

unknown's avatar
unknown committed
759
static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
760
                                             List<Item> &fields,
unknown's avatar
unknown committed
761
                                             bool select_insert)
unknown's avatar
unknown committed
762
{
unknown's avatar
VIEW  
unknown committed
763
  bool insert_into_view= (table_list->view != 0);
764
  DBUG_ENTER("mysql_prepare_insert_check_table");
unknown's avatar
VIEW  
unknown committed
765

766 767
  if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
                                    &thd->lex->select_lex.top_join_list,
768
                                    table_list,
769 770
                                    &thd->lex->select_lex.leaf_tables,
                                    select_insert, INSERT_ACL))
unknown's avatar
unknown committed
771
    DBUG_RETURN(TRUE);
unknown's avatar
VIEW  
unknown committed
772 773 774 775

  if (insert_into_view && !fields.elements)
  {
    thd->lex->empty_field_list_on_rset= 1;
776 777 778 779
    if (!table_list->table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
unknown's avatar
unknown committed
780
      DBUG_RETURN(TRUE);
781
    }
782
    DBUG_RETURN(insert_view_fields(thd, &fields, table_list));
unknown's avatar
VIEW  
unknown committed
783 784
  }

unknown's avatar
unknown committed
785
  DBUG_RETURN(FALSE);
786 787 788 789 790 791 792 793 794 795
}


/*
  Prepare items in INSERT statement

  SYNOPSIS
    mysql_prepare_insert()
    thd			Thread handler
    table_list	        Global/local table list
unknown's avatar
unknown committed
796 797
    table		Table to insert into (can be NULL if table should
			be taken from table_list->table)    
unknown's avatar
unknown committed
798 799
    where		Where clause (for insert ... select)
    select_insert	TRUE if INSERT ... SELECT statement
800

unknown's avatar
unknown committed
801 802 803 804 805
  TODO (in far future)
    In cases of:
    INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
    ON DUPLICATE KEY ...
    we should be able to refer to sum1 in the ON DUPLICATE KEY part
unknown's avatar
unknown committed
806

807 808 809
  WARNING
    You MUST set table->insert_values to 0 after calling this function
    before releasing the table object.
unknown's avatar
unknown committed
810
  
811
  RETURN VALUE
unknown's avatar
unknown committed
812 813
    FALSE OK
    TRUE  error
814 815
*/

unknown's avatar
unknown committed
816
bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
unknown's avatar
unknown committed
817
                          TABLE *table, List<Item> &fields, List_item *values,
unknown's avatar
unknown committed
818
                          List<Item> &update_fields, List<Item> &update_values,
unknown's avatar
unknown committed
819 820
                          enum_duplicates duplic,
                          COND **where, bool select_insert)
821
{
unknown's avatar
unknown committed
822
  SELECT_LEX *select_lex= &thd->lex->select_lex;
unknown's avatar
unknown committed
823
  Name_resolution_context *context= &select_lex->context;
824
  Name_resolution_context_state ctx_state;
825
  bool insert_into_view= (table_list->view != 0);
unknown's avatar
unknown committed
826
  bool res= 0;
827
  DBUG_ENTER("mysql_prepare_insert");
unknown's avatar
unknown committed
828 829 830
  DBUG_PRINT("enter", ("table_list 0x%lx, table 0x%lx, view %d",
		       (ulong)table_list, (ulong)table,
		       (int)insert_into_view));
unknown's avatar
unknown committed
831

832 833 834 835 836 837 838
  /*
    For subqueries in VALUES() we should not see the table in which we are
    inserting (for INSERT ... SELECT this is done by changing table_list,
    because INSERT ... SELECT share SELECT_LEX it with SELECT.
  */
  if (!select_insert)
  {
unknown's avatar
unknown committed
839
    for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
840 841 842 843 844 845 846 847 848 849 850 851
         un;
         un= un->next_unit())
    {
      for (SELECT_LEX *sl= un->first_select();
           sl;
           sl= sl->next_select())
      {
        sl->context.outer_context= 0;
      }
    }
  }

unknown's avatar
unknown committed
852
  if (duplic == DUP_UPDATE)
unknown's avatar
unknown committed
853 854
  {
    /* it should be allocated before Item::fix_fields() */
unknown's avatar
unknown committed
855
    if (table_list->set_insert_values(thd->mem_root))
unknown's avatar
unknown committed
856
      DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
857
  }
unknown's avatar
unknown committed
858

859
  if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert))
unknown's avatar
unknown committed
860
    DBUG_RETURN(TRUE);
unknown's avatar
VIEW  
unknown committed
861

unknown's avatar
unknown committed
862
  /* Save the state of the current name resolution context. */
863
  ctx_state.save_state(context, table_list);
unknown's avatar
unknown committed
864

unknown's avatar
unknown committed
865 866 867 868
  /*
    Perform name resolution only in the first table - 'table_list',
    which is the table that is inserted into.
  */
unknown's avatar
unknown committed
869
  table_list->next_local= 0;
unknown's avatar
unknown committed
870 871 872
  context->resolve_in_table_list_only(table_list);

  /* Prepare the fields in the statement. */
873
  if (values &&
874 875
      !(res= check_insert_fields(thd, context->table_list, fields, *values,
                                 !insert_into_view) ||
876
        setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0)) &&
877
      duplic == DUP_UPDATE)
unknown's avatar
unknown committed
878
  {
unknown's avatar
unknown committed
879
    select_lex->no_wrap_view_item= TRUE;
unknown's avatar
unknown committed
880
    res= check_update_fields(thd, context->table_list, update_fields);
unknown's avatar
unknown committed
881
    select_lex->no_wrap_view_item= FALSE;
882 883 884 885
    /*
      When we are not using GROUP BY we can refer to other tables in the
      ON DUPLICATE KEY part.
    */       
unknown's avatar
unknown committed
886 887
    if (select_lex->group_list.elements == 0)
    {
888
      context->table_list->next_local=       ctx_state.save_next_local;
889
      /* first_name_resolution_table was set by resolve_in_table_list_only() */
unknown's avatar
unknown committed
890
      context->first_name_resolution_table->
891
        next_name_resolution_table=          ctx_state.save_next_local;
unknown's avatar
unknown committed
892 893
    }
    if (!res)
894
      res= setup_fields(thd, 0, update_values, MARK_COLUMNS_READ, 0, 0);
unknown's avatar
unknown committed
895
  }
unknown's avatar
unknown committed
896 897

  /* Restore the current context. */
898
  ctx_state.restore_state(context, table_list);
unknown's avatar
unknown committed
899

unknown's avatar
unknown committed
900 901
  if (res)
    DBUG_RETURN(res);
unknown's avatar
VIEW  
unknown committed
902

unknown's avatar
unknown committed
903 904 905
  if (!table)
    table= table_list->table;

906
  if (!select_insert)
unknown's avatar
unknown committed
907
  {
908
    Item *fake_conds= 0;
909
    TABLE_LIST *duplicate;
910
    if ((duplicate= unique_table(thd, table_list, table_list->next_global)))
911
    {
912
      update_non_unique_table_error(table_list, "INSERT", duplicate);
913 914
      DBUG_RETURN(TRUE);
    }
unknown's avatar
unknown committed
915 916
    select_lex->fix_prepare_information(thd, &fake_conds);
    select_lex->first_execution= 0;
unknown's avatar
unknown committed
917
  }
918
  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
919
    table->prepare_for_position();
unknown's avatar
unknown committed
920
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
921 922 923
}


unknown's avatar
unknown committed
924 925 926 927
	/* Check if there is more uniq keys after field */

static int last_uniq_key(TABLE *table,uint keynr)
{
928
  while (++keynr < table->s->keys)
unknown's avatar
unknown committed
929 930 931 932 933 934 935
    if (table->key_info[keynr].flags & HA_NOSAME)
      return 0;
  return 1;
}


/*
unknown's avatar
unknown committed
936 937 938 939 940 941 942 943 944 945
  Write a record to table with optional deleting of conflicting records,
  invoke proper triggers if needed.

  SYNOPSIS
     write_record()
      thd   - thread context
      table - table to which record should be written
      info  - COPY_INFO structure describing handling of duplicates
              and which is used for counting number of records inserted
              and deleted.
unknown's avatar
unknown committed
946

unknown's avatar
unknown committed
947 948 949 950 951
  NOTE
    Once this record will be written to table after insert trigger will
    be invoked. If instead of inserting new record we will update old one
    then both on update triggers will work instead. Similarly both on
    delete triggers will be invoked if we will delete conflicting records.
unknown's avatar
unknown committed
952

unknown's avatar
unknown committed
953 954 955 956 957 958
    Sets thd->no_trans_update if table which is updated didn't have
    transactions.

  RETURN VALUE
    0     - success
    non-0 - error
unknown's avatar
unknown committed
959 960 961
*/


unknown's avatar
unknown committed
962
int write_record(THD *thd, TABLE *table,COPY_INFO *info)
unknown's avatar
unknown committed
963
{
unknown's avatar
unknown committed
964
  int error, trg_error= 0;
unknown's avatar
unknown committed
965
  char *key=0;
966
  MY_BITMAP *save_read_set, *save_write_set;
967
  DBUG_ENTER("write_record");
unknown's avatar
unknown committed
968

unknown's avatar
unknown committed
969
  info->records++;
970 971
  save_read_set=  table->read_set;
  save_write_set= table->write_set;
972

973 974
  if (info->handle_duplicates == DUP_REPLACE ||
      info->handle_duplicates == DUP_UPDATE)
unknown's avatar
unknown committed
975
  {
976
    while ((error=table->file->ha_write_row(table->record[0])))
unknown's avatar
unknown committed
977
    {
978
      uint key_nr;
unknown's avatar
unknown committed
979
      if (error != HA_WRITE_SKIP)
unknown's avatar
unknown committed
980
	goto err;
981
      table->file->restore_auto_increment(); // it's too early here! BUG#20188
unknown's avatar
unknown committed
982 983
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
      {
unknown's avatar
unknown committed
984
	error=HA_WRITE_SKIP;			/* Database can't find key */
unknown's avatar
unknown committed
985 986
	goto err;
      }
987 988
      /* Read all columns for the row we are going to replace */
      table->use_all_columns();
989 990 991 992 993
      /*
	Don't allow REPLACE to replace a row when a auto_increment column
	was used.  This ensures that we don't get a problem when the
	whole range of the key has been used.
      */
994 995
      if (info->handle_duplicates == DUP_REPLACE &&
          table->next_number_field &&
996
          key_nr == table->s->next_number_index &&
997 998
	  table->file->auto_increment_column_changed)
	goto err;
999
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
unknown's avatar
unknown committed
1000
      {
1001
	if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
unknown's avatar
unknown committed
1002 1003 1004 1005
	  goto err;
      }
      else
      {
1006
	if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
unknown's avatar
unknown committed
1007 1008 1009 1010
	{
	  error=my_errno;
	  goto err;
	}
unknown's avatar
unknown committed
1011

unknown's avatar
unknown committed
1012 1013
	if (!key)
	{
1014
	  if (!(key=(char*) my_safe_alloca(table->s->max_unique_length,
unknown's avatar
unknown committed
1015 1016 1017 1018 1019 1020
					   MAX_KEY_LENGTH)))
	  {
	    error=ENOMEM;
	    goto err;
	  }
	}
1021
	key_copy((byte*) key,table->record[0],table->key_info+key_nr,0);
unknown's avatar
unknown committed
1022
	if ((error=(table->file->index_read_idx(table->record[1],key_nr,
1023
						(byte*) key,
unknown's avatar
unknown committed
1024 1025
						table->key_info[key_nr].
						key_length,
unknown's avatar
unknown committed
1026 1027 1028
						HA_READ_KEY_EXACT))))
	  goto err;
      }
1029
      if (info->handle_duplicates == DUP_UPDATE)
unknown's avatar
unknown committed
1030
      {
unknown's avatar
unknown committed
1031
        int res= 0;
unknown's avatar
unknown committed
1032 1033 1034 1035
        /*
          We don't check for other UNIQUE keys - the first row
          that matches, is updated. If update causes a conflict again,
          an error is returned
1036
        */
unknown's avatar
unknown committed
1037
	DBUG_ASSERT(table->insert_values != NULL);
unknown's avatar
unknown committed
1038 1039
        store_record(table,insert_values);
        restore_record(table,record[1]);
unknown's avatar
unknown committed
1040 1041
        DBUG_ASSERT(info->update_fields->elements ==
                    info->update_values->elements);
unknown's avatar
unknown committed
1042 1043 1044 1045 1046
        if (fill_record_n_invoke_before_triggers(thd, *info->update_fields,
                                                 *info->update_values, 0,
                                                 table->triggers,
                                                 TRG_EVENT_UPDATE))
          goto before_trg_err;
1047 1048

        /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
unknown's avatar
unknown committed
1049 1050 1051
        if (info->view &&
            (res= info->view->view_check_option(current_thd, info->ignore)) ==
            VIEW_CHECK_SKIP)
unknown's avatar
unknown committed
1052
          goto ok_or_after_trg_err;
unknown's avatar
unknown committed
1053
        if (res == VIEW_CHECK_ERROR)
unknown's avatar
unknown committed
1054
          goto before_trg_err;
1055

1056 1057 1058 1059 1060 1061
        if (thd->clear_next_insert_id)
        {
          /* Reset auto-increment cacheing if we do an update */
          thd->clear_next_insert_id= 0;
          thd->next_insert_id= 0;
        }
1062 1063
        if ((error=table->file->ha_update_row(table->record[1],
                                              table->record[0])))
1064 1065
	{
	  if ((error == HA_ERR_FOUND_DUPP_KEY) && info->ignore)
unknown's avatar
unknown committed
1066
            goto ok_or_after_trg_err;
1067
          goto err;
1068
	}
unknown's avatar
unknown committed
1069
        info->updated++;
unknown's avatar
unknown committed
1070 1071 1072 1073 1074 1075

        trg_error= (table->triggers &&
                    table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
                                                      TRG_ACTION_AFTER, TRUE));
        info->copied++;
        goto ok_or_after_trg_err;
1076 1077 1078
      }
      else /* DUP_REPLACE */
      {
unknown's avatar
unknown committed
1079 1080 1081 1082 1083
	/*
	  The manual defines the REPLACE semantics that it is either
	  an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
	  InnoDB do not function in the defined way if we allow MySQL
	  to convert the latter operation internally to an UPDATE.
1084 1085
          We also should not perform this conversion if we have 
          timestamp field with ON UPDATE which is different from DEFAULT.
1086 1087 1088 1089 1090 1091
          Another case when conversion should not be performed is when
          we have ON DELETE trigger on table so user may notice that
          we cheat here. Note that it is ok to do such conversion for
          tables which have ON UPDATE but have no ON DELETE triggers,
          we just should not expose this fact to users by invoking
          ON UPDATE triggers.
unknown's avatar
unknown committed
1092 1093
	*/
	if (last_uniq_key(table,key_nr) &&
1094
	    !table->file->referenced_by_foreign_key() &&
1095
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
1096 1097
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH) &&
            (!table->triggers || !table->triggers->has_delete_triggers()))
1098
        {
1099 1100 1101 1102 1103 1104
          if (thd->clear_next_insert_id)
          {
            /* Reset auto-increment cacheing if we do an update */
            thd->clear_next_insert_id= 0;
            thd->next_insert_id= 0;
          }
1105 1106
          if ((error=table->file->ha_update_row(table->record[1],
					        table->record[0])))
1107 1108
            goto err;
          info->deleted++;
1109 1110 1111 1112 1113
          /*
            Since we pretend that we have done insert we should call
            its after triggers.
          */
          goto after_trg_n_copied_inc;
unknown's avatar
unknown committed
1114 1115 1116 1117 1118 1119 1120
        }
        else
        {
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_BEFORE, TRUE))
            goto before_trg_err;
1121
          if ((error=table->file->ha_delete_row(table->record[1])))
unknown's avatar
unknown committed
1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133
            goto err;
          info->deleted++;
          if (!table->file->has_transactions())
            thd->no_trans_update= 1;
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_AFTER, TRUE))
          {
            trg_error= 1;
            goto ok_or_after_trg_err;
          }
          /* Let us attempt do write_row() once more */
1134
        }
unknown's avatar
unknown committed
1135 1136
      }
    }
1137 1138 1139 1140 1141 1142 1143
    /*
      Restore column maps if they where replaced during an duplicate key
      problem.
    */
    if (table->read_set != save_read_set ||
        table->write_set != save_write_set)
      table->column_bitmaps_set(save_read_set, save_write_set);
unknown's avatar
unknown committed
1144
  }
1145
  else if ((error=table->file->ha_write_row(table->record[0])))
unknown's avatar
unknown committed
1146
  {
1147
    if (!info->ignore ||
unknown's avatar
unknown committed
1148 1149
	(error != HA_ERR_FOUND_DUPP_KEY && error != HA_ERR_FOUND_DUPP_UNIQUE))
      goto err;
1150
    table->file->restore_auto_increment();
1151
    goto ok_or_after_trg_err;
unknown's avatar
unknown committed
1152
  }
1153 1154 1155 1156 1157 1158

after_trg_n_copied_inc:
  info->copied++;
  trg_error= (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
                                                TRG_ACTION_AFTER, TRUE));
unknown's avatar
unknown committed
1159 1160

ok_or_after_trg_err:
unknown's avatar
unknown committed
1161
  if (key)
1162
    my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
unknown's avatar
unknown committed
1163 1164
  if (!table->file->has_transactions())
    thd->no_trans_update= 1;
unknown's avatar
unknown committed
1165
  DBUG_RETURN(trg_error);
unknown's avatar
unknown committed
1166 1167

err:
1168
  info->last_errno= error;
1169 1170 1171
  /* current_select is NULL if this is a delayed insert */
  if (thd->lex->current_select)
    thd->lex->current_select->no_error= 0;        // Give error
unknown's avatar
unknown committed
1172
  table->file->print_error(error,MYF(0));
unknown's avatar
unknown committed
1173 1174 1175 1176

before_trg_err:
  if (key)
    my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
1177
  table->column_bitmaps_set(save_read_set, save_write_set);
1178
  DBUG_RETURN(1);
unknown's avatar
unknown committed
1179 1180 1181 1182
}


/******************************************************************************
unknown's avatar
unknown committed
1183
  Check that all fields with arn't null_fields are used
unknown's avatar
unknown committed
1184 1185
******************************************************************************/

1186 1187
int check_that_all_fields_are_given_values(THD *thd, TABLE *entry,
                                           TABLE_LIST *table_list)
unknown's avatar
unknown committed
1188
{
1189
  int err= 0;
1190 1191
  MY_BITMAP *write_set= entry->write_set;

unknown's avatar
unknown committed
1192 1193
  for (Field **field=entry->field ; *field ; field++)
  {
1194
    if (!bitmap_is_set(write_set, (*field)->field_index) &&
1195 1196
        ((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
        ((*field)->real_type() != FIELD_TYPE_ENUM))
unknown's avatar
unknown committed
1197
    {
1198 1199 1200
      bool view= FALSE;
      if (table_list)
      {
unknown's avatar
unknown committed
1201
        table_list= table_list->top_table();
unknown's avatar
unknown committed
1202
        view= test(table_list->view);
1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218
      }
      if (view)
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_VIEW_FIELD,
                            ER(ER_NO_DEFAULT_FOR_VIEW_FIELD),
                            table_list->view_db.str,
                            table_list->view_name.str);
      }
      else
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_FIELD,
                            ER(ER_NO_DEFAULT_FOR_FIELD),
                            (*field)->field_name);
      }
1219
      err= 1;
unknown's avatar
unknown committed
1220 1221
    }
  }
1222
  return thd->abort_on_warning ? err : 0;
unknown's avatar
unknown committed
1223 1224 1225
}

/*****************************************************************************
unknown's avatar
unknown committed
1226 1227
  Handling of delayed inserts
  A thread is created for each table that one uses with the DELAYED attribute.
unknown's avatar
unknown committed
1228 1229
*****************************************************************************/

1230 1231
#ifndef EMBEDDED_LIBRARY

unknown's avatar
unknown committed
1232 1233
class delayed_row :public ilink {
public:
1234
  char *record;
unknown's avatar
unknown committed
1235 1236
  enum_duplicates dup;
  time_t start_time;
1237
  bool query_start_used,last_insert_id_used,insert_id_used, ignore, log_query;
unknown's avatar
unknown committed
1238
  ulonglong last_insert_id;
1239
  timestamp_auto_set_type timestamp_field_type;
unknown's avatar
unknown committed
1240

1241
  delayed_row(enum_duplicates dup_arg, bool ignore_arg, bool log_query_arg)
1242
    :record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg) {}
unknown's avatar
unknown committed
1243 1244 1245 1246 1247 1248 1249 1250 1251
  ~delayed_row()
  {
    x_free(record);
  }
};


class delayed_insert :public ilink {
  uint locks_in_memory;
1252 1253 1254
  char *query;
  ulong query_length;
  ulong query_allocated;
unknown's avatar
unknown committed
1255 1256 1257 1258 1259 1260 1261 1262 1263
public:
  THD thd;
  TABLE *table;
  pthread_mutex_t mutex;
  pthread_cond_t cond,cond_client;
  volatile uint tables_in_use,stacked_inserts;
  volatile bool status,dead;
  COPY_INFO info;
  I_List<delayed_row> rows;
1264
  ulong group_count;
unknown's avatar
unknown committed
1265
  TABLE_LIST table_list;			// Argument
unknown's avatar
unknown committed
1266 1267

  delayed_insert()
1268
    :locks_in_memory(0), query(0), query_length(0), query_allocated(0),
unknown's avatar
unknown committed
1269 1270 1271
     table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
     group_count(0)
  {
1272 1273
    thd.security_ctx->user=thd.security_ctx->priv_user=(char*) delayed_user;
    thd.security_ctx->host=(char*) my_localhost;
unknown's avatar
unknown committed
1274 1275 1276
    thd.current_tablenr=0;
    thd.version=refresh_version;
    thd.command=COM_DELAYED_INSERT;
1277 1278
    thd.lex->current_select= 0; 		// for my_message_sql
    thd.lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()
unknown's avatar
unknown committed
1279

1280 1281
    bzero((char*) &thd.net, sizeof(thd.net));		// Safety
    bzero((char*) &table_list, sizeof(table_list));	// Safety
1282
    thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
1283
    thd.security_ctx->host_or_ip= "";
unknown's avatar
unknown committed
1284
    bzero((char*) &info,sizeof(info));
1285
    pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
unknown's avatar
unknown committed
1286 1287 1288 1289 1290 1291 1292 1293
    pthread_cond_init(&cond,NULL);
    pthread_cond_init(&cond_client,NULL);
    VOID(pthread_mutex_lock(&LOCK_thread_count));
    delayed_insert_threads++;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
  }
  ~delayed_insert()
  {
1294
    my_free(query, MYF(MY_WME|MY_ALLOW_ZERO_PTR));
1295
    /* The following is not really needed, but just for safety */
unknown's avatar
unknown committed
1296 1297 1298 1299 1300 1301
    delayed_row *row;
    while ((row=rows.get()))
      delete row;
    if (table)
      close_thread_tables(&thd);
    VOID(pthread_mutex_lock(&LOCK_thread_count));
unknown's avatar
unknown committed
1302 1303 1304
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    pthread_cond_destroy(&cond_client);
unknown's avatar
unknown committed
1305 1306
    thd.unlink();				// Must be unlinked under lock
    x_free(thd.query);
1307
    thd.security_ctx->user= thd.security_ctx->host=0;
unknown's avatar
unknown committed
1308 1309 1310
    thread_count--;
    delayed_insert_threads--;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
1311
    VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
unknown's avatar
unknown committed
1312 1313
  }

1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332
  int set_query(char const *q, ulong qlen) {
    if (q && qlen > 0)
    {
      if (query_allocated < qlen + 1)
      {
        ulong const flags(MY_WME|MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR);
        query= my_realloc(query, qlen + 1, MYF(flags));
        if (query == 0)
          return HA_ERR_OUT_OF_MEM;
        query_allocated= qlen;
      }
      query_length= qlen;
      memcpy(query, q, qlen + 1);
    }
    else
      query_length= 0;
    return 0;
  }

unknown's avatar
unknown committed
1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370
  /* The following is for checking when we can delete ourselves */
  inline void lock()
  {
    locks_in_memory++;				// Assume LOCK_delay_insert
  }
  void unlock()
  {
    pthread_mutex_lock(&LOCK_delayed_insert);
    if (!--locks_in_memory)
    {
      pthread_mutex_lock(&mutex);
      if (thd.killed && ! stacked_inserts && ! tables_in_use)
      {
	pthread_cond_signal(&cond);
	status=1;
      }
      pthread_mutex_unlock(&mutex);
    }
    pthread_mutex_unlock(&LOCK_delayed_insert);
  }
  inline uint lock_count() { return locks_in_memory; }

  TABLE* get_local_table(THD* client_thd);
  bool handle_inserts(void);
};


I_List<delayed_insert> delayed_threads;


delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
{
  thd->proc_info="waiting for delay_list";
  pthread_mutex_lock(&LOCK_delayed_insert);	// Protect master list
  I_List_iterator<delayed_insert> it(delayed_threads);
  delayed_insert *tmp;
  while ((tmp=it++))
  {
unknown's avatar
unknown committed
1371 1372
    if (!strcmp(tmp->thd.db, table_list->db) &&
	!strcmp(table_list->table_name, tmp->table->s->table_name.str))
unknown's avatar
unknown committed
1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386
    {
      tmp->lock();
      break;
    }
  }
  pthread_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
  return tmp;
}


static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list)
{
  int error;
  delayed_insert *tmp;
1387
  TABLE *table;
unknown's avatar
unknown committed
1388 1389 1390 1391 1392
  DBUG_ENTER("delayed_get_table");

  if (!table_list->db)
    table_list->db=thd->db;

1393
  /* Find the thread which handles this table. */
unknown's avatar
unknown committed
1394 1395
  if (!(tmp=find_handler(thd,table_list)))
  {
1396 1397 1398 1399
    /*
      No match. Create a new thread to handle the table, but
      no more than max_insert_delayed_threads.
    */
1400
    if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
1401
      DBUG_RETURN(0);
unknown's avatar
unknown committed
1402 1403
    thd->proc_info="Creating delayed handler";
    pthread_mutex_lock(&LOCK_delayed_create);
1404 1405 1406 1407 1408
    /*
      The first search above was done without LOCK_delayed_create.
      Another thread might have created the handler in between. Search again.
    */
    if (! (tmp= find_handler(thd, table_list)))
unknown's avatar
unknown committed
1409
    {
1410 1411 1412 1413
      /*
        Avoid that a global read lock steps in while we are creating the
        new thread. It would block trying to open the table. Hence, the
        DI thread and this thread would wait until after the global
1414 1415 1416 1417 1418
        readlock is gone. Since the insert thread needs to wait for a
        global read lock anyway, we do it right now. Note that
        wait_if_global_read_lock() sets a protection against a new
        global read lock when it succeeds. This needs to be released by
        start_waiting_global_read_lock().
1419
      */
1420
      if (wait_if_global_read_lock(thd, 0, 1))
1421
        goto err;
unknown's avatar
unknown committed
1422 1423 1424
      if (!(tmp=new delayed_insert()))
      {
	my_error(ER_OUTOFMEMORY,MYF(0),sizeof(delayed_insert));
1425
	goto err1;
unknown's avatar
unknown committed
1426
      }
unknown's avatar
unknown committed
1427 1428 1429
      pthread_mutex_lock(&LOCK_thread_count);
      thread_count++;
      pthread_mutex_unlock(&LOCK_thread_count);
unknown's avatar
unknown committed
1430
      if (!(tmp->thd.db=my_strdup(table_list->db,MYF(MY_WME))) ||
1431
	  !(tmp->thd.query=my_strdup(table_list->table_name,MYF(MY_WME))))
unknown's avatar
unknown committed
1432 1433
      {
	delete tmp;
unknown's avatar
unknown committed
1434
	my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0));
1435
	goto err1;
unknown's avatar
unknown committed
1436
      }
unknown's avatar
unknown committed
1437 1438
      tmp->table_list= *table_list;			// Needed to open table
      tmp->table_list.db= tmp->thd.db;
1439
      tmp->table_list.alias= tmp->table_list.table_name= tmp->thd.query;
unknown's avatar
unknown committed
1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450
      tmp->lock();
      pthread_mutex_lock(&tmp->mutex);
      if ((error=pthread_create(&tmp->thd.real_id,&connection_attrib,
				handle_delayed_insert,(void*) tmp)))
      {
	DBUG_PRINT("error",
		   ("Can't create thread to handle delayed insert (error %d)",
		    error));
	pthread_mutex_unlock(&tmp->mutex);
	tmp->unlock();
	delete tmp;
unknown's avatar
unknown committed
1451
	my_error(ER_CANT_CREATE_THREAD, MYF(0), error);
1452
	goto err1;
unknown's avatar
unknown committed
1453 1454 1455 1456 1457 1458 1459 1460 1461
      }

      /* Wait until table is open */
      thd->proc_info="waiting for handler open";
      while (!tmp->thd.killed && !tmp->table && !thd->killed)
      {
	pthread_cond_wait(&tmp->cond_client,&tmp->mutex);
      }
      pthread_mutex_unlock(&tmp->mutex);
1462 1463 1464 1465 1466
      /*
        Release the protection against the global read lock and wake
        everyone, who might want to set a global read lock.
      */
      start_waiting_global_read_lock(thd);
unknown's avatar
unknown committed
1467 1468 1469
      thd->proc_info="got old table";
      if (tmp->thd.killed)
      {
1470
	if (tmp->thd.is_fatal_error)
unknown's avatar
unknown committed
1471 1472
	{
	  /* Copy error message and abort */
1473
	  thd->fatal_error();
unknown's avatar
unknown committed
1474
	  strmov(thd->net.last_error,tmp->thd.net.last_error);
1475
	  thd->net.last_errno=tmp->thd.net.last_errno;
unknown's avatar
unknown committed
1476 1477
	}
	tmp->unlock();
1478
	goto err;
unknown's avatar
unknown committed
1479 1480 1481 1482
      }
      if (thd->killed)
      {
	tmp->unlock();
1483
	goto err;
unknown's avatar
unknown committed
1484 1485 1486 1487 1488 1489
      }
    }
    pthread_mutex_unlock(&LOCK_delayed_create);
  }

  pthread_mutex_lock(&tmp->mutex);
1490
  table= tmp->get_local_table(thd);
unknown's avatar
unknown committed
1491 1492 1493
  pthread_mutex_unlock(&tmp->mutex);
  if (table)
    thd->di=tmp;
1494 1495
  else if (tmp->thd.is_fatal_error)
    thd->fatal_error();
1496 1497
  /* Unlock the delayed insert object after its last access. */
  tmp->unlock();
unknown's avatar
unknown committed
1498
  DBUG_RETURN((table_list->table=table));
1499 1500 1501

 err1:
  thd->fatal_error();
1502 1503 1504 1505 1506
  /*
    Release the protection against the global read lock and wake
    everyone, who might want to set a global read lock.
  */
  start_waiting_global_read_lock(thd);
1507 1508 1509
 err:
  pthread_mutex_unlock(&LOCK_delayed_create);
  DBUG_RETURN(0); // Continue with normal insert
unknown's avatar
unknown committed
1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522
}


/*
  As we can't let many threads modify the same TABLE structure, we create
  an own structure for each tread.  This includes a row buffer to save the
  column values and new fields that points to the new row buffer.
  The memory is allocated in the client thread and is freed automaticly.
*/

TABLE *delayed_insert::get_local_table(THD* client_thd)
{
  my_ptrdiff_t adjust_ptrs;
1523
  Field **field,**org_field, *found_next_number_field;
unknown's avatar
unknown committed
1524
  TABLE *copy;
unknown's avatar
unknown committed
1525
  TABLE_SHARE *share= table->s;
1526
  byte *bitmap;
unknown's avatar
unknown committed
1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550

  /* First request insert thread to get a lock */
  status=1;
  tables_in_use++;
  if (!thd.lock)				// Table is not locked
  {
    client_thd->proc_info="waiting for handler lock";
    pthread_cond_signal(&cond);			// Tell handler to lock table
    while (!dead && !thd.lock && ! client_thd->killed)
    {
      pthread_cond_wait(&cond_client,&mutex);
    }
    client_thd->proc_info="got handler lock";
    if (client_thd->killed)
      goto error;
    if (dead)
    {
      strmov(client_thd->net.last_error,thd.net.last_error);
      client_thd->net.last_errno=thd.net.last_errno;
      goto error;
    }
  }

  client_thd->proc_info="allocating local table";
unknown's avatar
unknown committed
1551
  copy= (TABLE*) client_thd->alloc(sizeof(*copy)+
unknown's avatar
unknown committed
1552
				   (share->fields+1)*sizeof(Field**)+
1553 1554
				   share->reclength +
                                   share->column_bitmap_size*2);
unknown's avatar
unknown committed
1555 1556 1557 1558
  if (!copy)
    goto error;
  *copy= *table;

unknown's avatar
unknown committed
1559
  /* We don't need to change the file handler here */
1560 1561 1562
  field= copy->field= (Field**) (copy+1);
  bitmap= (byte*) (field+share->fields+1);
  copy->record[0]= (bitmap+ share->column_bitmap_size*2);
unknown's avatar
unknown committed
1563
  memcpy((char*) copy->record[0],(char*) table->record[0],share->reclength);
unknown's avatar
unknown committed
1564 1565 1566 1567 1568

  /* Make a copy of all fields */

  adjust_ptrs=PTR_BYTE_DIFF(copy->record[0],table->record[0]);

1569
  found_next_number_field=table->found_next_number_field;
unknown's avatar
unknown committed
1570 1571
  for (org_field=table->field ; *org_field ; org_field++,field++)
  {
unknown's avatar
unknown committed
1572
    if (!(*field= (*org_field)->new_field(client_thd->mem_root,copy)))
unknown's avatar
unknown committed
1573
      return 0;
1574
    (*field)->orig_table= copy;			// Remove connection
unknown's avatar
unknown committed
1575
    (*field)->move_field_offset(adjust_ptrs);	// Point at copy->record[0]
1576 1577
    if (*org_field == found_next_number_field)
      (*field)->table->found_next_number_field= *field;
unknown's avatar
unknown committed
1578 1579 1580 1581 1582 1583 1584 1585
  }
  *field=0;

  /* Adjust timestamp */
  if (table->timestamp_field)
  {
    /* Restore offset as this may have been reset in handle_inserts */
    copy->timestamp_field=
unknown's avatar
unknown committed
1586
      (Field_timestamp*) copy->field[share->timestamp_field_offset];
1587
    copy->timestamp_field->unireg_check= table->timestamp_field->unireg_check;
1588
    copy->timestamp_field_type= copy->timestamp_field->get_auto_set_type();
unknown's avatar
unknown committed
1589 1590
  }

1591 1592
  /* Adjust in_use for pointing to client thread */
  copy->in_use= client_thd;
1593 1594 1595 1596

  /* Adjust lock_count. This table object is not part of a lock. */
  copy->lock_count= 0;

1597 1598 1599 1600 1601 1602 1603 1604 1605
  /* Adjust bitmaps */
  copy->def_read_set.bitmap= (my_bitmap_map*) bitmap;
  copy->def_write_set.bitmap= ((my_bitmap_map*)
                               (bitmap + share->column_bitmap_size));
  copy->tmp_set.bitmap= 0;                      // To catch errors
  bzero((char*) bitmap, share->column_bitmap_size*2);
  copy->read_set=  &copy->def_read_set;
  copy->write_set= &copy->def_write_set;

unknown's avatar
unknown committed
1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618
  return copy;

  /* Got fatal error */
 error:
  tables_in_use--;
  status=1;
  pthread_cond_signal(&cond);			// Inform thread about abort
  return 0;
}


/* Put a question in queue */

unknown's avatar
unknown committed
1619 1620 1621
static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic,
                         bool ignore, char *query, uint query_length,
                         bool log_on)
unknown's avatar
unknown committed
1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632
{
  delayed_row *row=0;
  delayed_insert *di=thd->di;
  DBUG_ENTER("write_delayed");

  thd->proc_info="waiting for handler insert";
  pthread_mutex_lock(&di->mutex);
  while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
    pthread_cond_wait(&di->cond_client,&di->mutex);
  thd->proc_info="storing row into queue";

1633
  if (thd->killed || !(row= new delayed_row(duplic, ignore, log_on)))
unknown's avatar
unknown committed
1634 1635
    goto err;

1636
  if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME))))
unknown's avatar
unknown committed
1637
    goto err;
1638
  memcpy(row->record, table->record[0], table->s->reclength);
1639
  di->set_query(query, query_length);
unknown's avatar
unknown committed
1640 1641 1642 1643 1644
  row->start_time=		thd->start_time;
  row->query_start_used=	thd->query_start_used;
  row->last_insert_id_used=	thd->last_insert_id_used;
  row->insert_id_used=		thd->insert_id_used;
  row->last_insert_id=		thd->last_insert_id;
1645
  row->timestamp_field_type=    table->timestamp_field_type;
unknown's avatar
unknown committed
1646 1647 1648 1649

  di->rows.push_back(row);
  di->stacked_inserts++;
  di->status=1;
1650
  if (table->s->blob_fields)
unknown's avatar
unknown committed
1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666
    unlink_blobs(table);
  pthread_cond_signal(&di->cond);

  thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
  pthread_mutex_unlock(&di->mutex);
  DBUG_RETURN(0);

 err:
  delete row;
  pthread_mutex_unlock(&di->mutex);
  DBUG_RETURN(1);
}


static void end_delayed_insert(THD *thd)
{
1667
  DBUG_ENTER("end_delayed_insert");
unknown's avatar
unknown committed
1668 1669
  delayed_insert *di=thd->di;
  pthread_mutex_lock(&di->mutex);
1670
  DBUG_PRINT("info",("tables in use: %d",di->tables_in_use));
unknown's avatar
unknown committed
1671 1672 1673 1674 1675 1676
  if (!--di->tables_in_use || di->thd.killed)
  {						// Unlock table
    di->status=1;
    pthread_cond_signal(&di->cond);
  }
  pthread_mutex_unlock(&di->mutex);
1677
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690
}


/* We kill all delayed threads when doing flush-tables */

void kill_delayed_threads(void)
{
  VOID(pthread_mutex_lock(&LOCK_delayed_insert)); // For unlink from list

  I_List_iterator<delayed_insert> it(delayed_threads);
  delayed_insert *tmp;
  while ((tmp=it++))
  {
unknown's avatar
unknown committed
1691
    /* Ensure that the thread doesn't kill itself while we are looking at it */
unknown's avatar
unknown committed
1692
    pthread_mutex_lock(&tmp->mutex);
unknown's avatar
SCRUM  
unknown committed
1693
    tmp->thd.killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
1694 1695 1696
    if (tmp->thd.mysys_var)
    {
      pthread_mutex_lock(&tmp->thd.mysys_var->mutex);
unknown's avatar
unknown committed
1697
      if (tmp->thd.mysys_var->current_cond)
unknown's avatar
unknown committed
1698
      {
unknown's avatar
unknown committed
1699 1700 1701 1702 1703 1704
	/*
	  We need the following test because the main mutex may be locked
	  in handle_delayed_insert()
	*/
	if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
	  pthread_mutex_lock(tmp->thd.mysys_var->current_mutex);
unknown's avatar
unknown committed
1705
	pthread_cond_broadcast(tmp->thd.mysys_var->current_cond);
unknown's avatar
unknown committed
1706 1707
	if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
	  pthread_mutex_unlock(tmp->thd.mysys_var->current_mutex);
unknown's avatar
unknown committed
1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720
      }
      pthread_mutex_unlock(&tmp->thd.mysys_var->mutex);
    }
    pthread_mutex_unlock(&tmp->mutex);
  }
  VOID(pthread_mutex_unlock(&LOCK_delayed_insert)); // For unlink from list
}


/*
 * Create a new delayed insert thread
*/

1721
pthread_handler_t handle_delayed_insert(void *arg)
unknown's avatar
unknown committed
1722 1723 1724 1725 1726 1727 1728 1729
{
  delayed_insert *di=(delayed_insert*) arg;
  THD *thd= &di->thd;

  pthread_detach_this_thread();
  /* Add thread to THD list so that's it's visible in 'show processlist' */
  pthread_mutex_lock(&LOCK_thread_count);
  thd->thread_id=thread_id++;
1730
  thd->end_time();
unknown's avatar
unknown committed
1731
  threads.append(thd);
unknown's avatar
SCRUM  
unknown committed
1732
  thd->killed=abort_loop ? THD::KILL_CONNECTION : THD::NOT_KILLED;
unknown's avatar
unknown committed
1733 1734
  pthread_mutex_unlock(&LOCK_thread_count);

1735 1736 1737 1738 1739 1740 1741 1742
  /*
    Wait until the client runs into pthread_cond_wait(),
    where we free it after the table is opened and di linked in the list.
    If we did not wait here, the client might detect the opened table
    before it is linked to the list. It would release LOCK_delayed_create
    and allow another thread to create another handler for the same table,
    since it does not find one in the list.
  */
unknown's avatar
unknown committed
1743
  pthread_mutex_lock(&di->mutex);
1744
#if !defined( __WIN__) /* Win32 calls this in pthread_create */
unknown's avatar
unknown committed
1745 1746 1747 1748 1749 1750 1751 1752
  if (my_thread_init())
  {
    strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES));
    goto end;
  }
#endif

  DBUG_ENTER("handle_delayed_insert");
1753
  thd->thread_stack= (char*) &thd;
unknown's avatar
unknown committed
1754
  if (init_thr_lock() || thd->store_globals())
unknown's avatar
unknown committed
1755
  {
1756
    thd->fatal_error();
unknown's avatar
unknown committed
1757
    strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES));
1758
    goto err;
unknown's avatar
unknown committed
1759
  }
1760
#if !defined(__WIN__) && !defined(__NETWARE__)
unknown's avatar
unknown committed
1761 1762 1763 1764 1765 1766 1767
  sigset_t set;
  VOID(sigemptyset(&set));			// Get mask in use
  VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif

  /* open table */

unknown's avatar
unknown committed
1768
  if (!(di->table=open_ltable(thd,&di->table_list,TL_WRITE_DELAYED)))
unknown's avatar
unknown committed
1769
  {
1770
    thd->fatal_error();				// Abort waiting inserts
1771
    goto err;
unknown's avatar
unknown committed
1772
  }
1773
  if (!(di->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
unknown's avatar
unknown committed
1774
  {
1775
    thd->fatal_error();
1776
    my_error(ER_ILLEGAL_HA, MYF(0), di->table_list.table_name);
1777
    goto err;
unknown's avatar
unknown committed
1778
  }
unknown's avatar
unknown committed
1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793
  di->table->copy_blobs=1;

  /* One can now use this */
  pthread_mutex_lock(&LOCK_delayed_insert);
  delayed_threads.append(di);
  pthread_mutex_unlock(&LOCK_delayed_insert);

  /* Tell client that the thread is initialized */
  pthread_cond_signal(&di->cond_client);

  /* Now wait until we get an insert or lock to handle */
  /* We will not abort as long as a client thread uses this thread */

  for (;;)
  {
unknown's avatar
SCRUM  
unknown committed
1794
    if (thd->killed == THD::KILL_CONNECTION)
unknown's avatar
unknown committed
1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813
    {
      uint lock_count;
      /*
	Remove this from delay insert list so that no one can request a
	table from this
      */
      pthread_mutex_unlock(&di->mutex);
      pthread_mutex_lock(&LOCK_delayed_insert);
      di->unlink();
      lock_count=di->lock_count();
      pthread_mutex_unlock(&LOCK_delayed_insert);
      pthread_mutex_lock(&di->mutex);
      if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
	break;					// Time to die
    }

    if (!di->status && !di->stacked_inserts)
    {
      struct timespec abstime;
1814
      set_timespec(abstime, delayed_insert_timeout);
unknown's avatar
unknown committed
1815 1816 1817 1818

      /* Information for pthread_kill */
      di->thd.mysys_var->current_mutex= &di->mutex;
      di->thd.mysys_var->current_cond= &di->cond;
1819
      di->thd.proc_info="Waiting for INSERT";
unknown's avatar
unknown committed
1820

1821
      DBUG_PRINT("info",("Waiting for someone to insert rows"));
unknown's avatar
unknown committed
1822
      while (!thd->killed)
unknown's avatar
unknown committed
1823 1824
      {
	int error;
1825
#if defined(HAVE_BROKEN_COND_TIMEDWAIT)
unknown's avatar
unknown committed
1826 1827 1828 1829
	error=pthread_cond_wait(&di->cond,&di->mutex);
#else
	error=pthread_cond_timedwait(&di->cond,&di->mutex,&abstime);
#ifdef EXTRA_DEBUG
1830
	if (error && error != EINTR && error != ETIMEDOUT)
unknown's avatar
unknown committed
1831 1832 1833 1834 1835 1836 1837 1838 1839
	{
	  fprintf(stderr, "Got error %d from pthread_cond_timedwait\n",error);
	  DBUG_PRINT("error",("Got error %d from pthread_cond_timedwait",
			      error));
	}
#endif
#endif
	if (thd->killed || di->status)
	  break;
unknown's avatar
unknown committed
1840
	if (error == ETIMEDOUT || error == ETIME)
unknown's avatar
unknown committed
1841
	{
unknown's avatar
SCRUM  
unknown committed
1842
	  thd->killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
1843 1844 1845
	  break;
	}
      }
unknown's avatar
unknown committed
1846 1847
      /* We can't lock di->mutex and mysys_var->mutex at the same time */
      pthread_mutex_unlock(&di->mutex);
unknown's avatar
unknown committed
1848 1849 1850 1851
      pthread_mutex_lock(&di->thd.mysys_var->mutex);
      di->thd.mysys_var->current_mutex= 0;
      di->thd.mysys_var->current_cond= 0;
      pthread_mutex_unlock(&di->thd.mysys_var->mutex);
unknown's avatar
unknown committed
1852
      pthread_mutex_lock(&di->mutex);
unknown's avatar
unknown committed
1853
    }
1854
    di->thd.proc_info=0;
unknown's avatar
unknown committed
1855 1856 1857

    if (di->tables_in_use && ! thd->lock)
    {
1858
      bool not_used;
1859 1860 1861 1862 1863 1864 1865 1866 1867 1868
      /*
        Request for new delayed insert.
        Lock the table, but avoid to be blocked by a global read lock.
        If we got here while a global read lock exists, then one or more
        inserts started before the lock was requested. These are allowed
        to complete their work before the server returns control to the
        client which requested the global read lock. The delayed insert
        handler will close the table and finish when the outstanding
        inserts are done.
      */
1869
      if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1,
1870 1871
                                          MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK,
                                          &not_used)))
unknown's avatar
unknown committed
1872
      {
1873 1874 1875
	/* Fatal error */
	di->dead= 1;
	thd->killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
1876 1877 1878 1879 1880 1881 1882
      }
      pthread_cond_broadcast(&di->cond_client);
    }
    if (di->stacked_inserts)
    {
      if (di->handle_inserts())
      {
1883 1884 1885
	/* Some fatal error */
	di->dead= 1;
	thd->killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
1886 1887 1888 1889 1890
      }
    }
    di->status=0;
    if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
    {
unknown's avatar
unknown committed
1891 1892 1893 1894
      /*
        No one is doing a insert delayed
        Unlock table so that other threads can use it
      */
unknown's avatar
unknown committed
1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905
      MYSQL_LOCK *lock=thd->lock;
      thd->lock=0;
      pthread_mutex_unlock(&di->mutex);
      mysql_unlock_tables(thd, lock);
      di->group_count=0;
      pthread_mutex_lock(&di->mutex);
    }
    if (di->tables_in_use)
      pthread_cond_broadcast(&di->cond_client); // If waiting clients
  }

1906 1907 1908 1909 1910 1911 1912 1913 1914 1915
err:
  /*
    mysql_lock_tables() can potentially start a transaction and write
    a table map. In the event of an error, that transaction has to be
    rolled back.  We only need to roll back a potential statement
    transaction, since real transactions are rolled back in
    close_thread_tables().
   */
  ha_rollback_stmt(thd);

unknown's avatar
unknown committed
1916 1917 1918 1919 1920 1921 1922 1923
end:
  /*
    di should be unlinked from the thread handler list and have no active
    clients
  */

  close_thread_tables(thd);			// Free the table
  di->table=0;
1924 1925
  di->dead= 1;                                  // If error
  thd->killed= THD::KILL_CONNECTION;	        // If error
unknown's avatar
unknown committed
1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971
  pthread_cond_broadcast(&di->cond_client);	// Safety
  pthread_mutex_unlock(&di->mutex);

  pthread_mutex_lock(&LOCK_delayed_create);	// Because of delayed_get_table
  pthread_mutex_lock(&LOCK_delayed_insert);	
  delete di;
  pthread_mutex_unlock(&LOCK_delayed_insert);
  pthread_mutex_unlock(&LOCK_delayed_create);  

  my_thread_end();
  pthread_exit(0);
  DBUG_RETURN(0);
}


/* Remove pointers from temporary fields to allocated values */

static void unlink_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
      ((Field_blob *) (*ptr))->clear_temporary();
  }
}

/* Free blobs stored in current row */

static void free_delayed_insert_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
    {
      char *str;
      ((Field_blob *) (*ptr))->get_ptr(&str);
      my_free(str,MYF(MY_ALLOW_ZERO_PTR));
      ((Field_blob *) (*ptr))->reset();
    }
  }
}


bool delayed_insert::handle_inserts(void)
{
  int error;
1972
  ulong max_rows;
1973 1974 1975
  bool using_ignore=0,
    using_bin_log= mysql_bin_log.is_open();

1976
  delayed_row *row;
unknown's avatar
unknown committed
1977 1978 1979 1980 1981 1982
  DBUG_ENTER("handle_inserts");

  /* Allow client to insert new rows */
  pthread_mutex_unlock(&mutex);

  table->next_number_field=table->found_next_number_field;
1983
  table->use_all_columns();
unknown's avatar
unknown committed
1984 1985 1986 1987 1988

  thd.proc_info="upgrading lock";
  if (thr_upgrade_write_delay_lock(*thd.lock->locks))
  {
    /* This can only happen if thread is killed by shutdown */
unknown's avatar
unknown committed
1989
    sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),table->s->table_name.str);
unknown's avatar
unknown committed
1990 1991 1992 1993
    goto err;
  }

  thd.proc_info="insert";
1994
  max_rows= delayed_insert_limit;
1995
  if (thd.killed || table->s->version != refresh_version)
unknown's avatar
unknown committed
1996
  {
unknown's avatar
SCRUM  
unknown committed
1997
    thd.killed= THD::KILL_CONNECTION;
1998
    max_rows= ~(ulong)0;                        // Do as much as possible
unknown's avatar
unknown committed
1999 2000
  }

2001 2002 2003 2004 2005 2006 2007
  /*
    We can't use row caching when using the binary log because if
    we get a crash, then binary log will contain rows that are not yet
    written to disk, which will cause problems in replication.
  */
  if (!using_bin_log)
    table->file->extra(HA_EXTRA_WRITE_CACHE);
unknown's avatar
unknown committed
2008
  pthread_mutex_lock(&mutex);
2009 2010 2011 2012 2013 2014 2015 2016

  /* Reset auto-increment cacheing */
  if (thd.clear_next_insert_id)
  {
    thd.next_insert_id= 0;
    thd.clear_next_insert_id= 0;
  }

unknown's avatar
unknown committed
2017 2018 2019 2020
  while ((row=rows.get()))
  {
    stacked_inserts--;
    pthread_mutex_unlock(&mutex);
2021
    memcpy(table->record[0],row->record,table->s->reclength);
unknown's avatar
unknown committed
2022 2023 2024 2025 2026 2027

    thd.start_time=row->start_time;
    thd.query_start_used=row->query_start_used;
    thd.last_insert_id=row->last_insert_id;
    thd.last_insert_id_used=row->last_insert_id_used;
    thd.insert_id_used=row->insert_id_used;
2028
    table->timestamp_field_type= row->timestamp_field_type;
unknown's avatar
unknown committed
2029

2030
    info.ignore= row->ignore;
unknown's avatar
unknown committed
2031
    info.handle_duplicates= row->dup;
2032
    if (info.ignore ||
unknown's avatar
unknown committed
2033
	info.handle_duplicates != DUP_ERROR)
unknown's avatar
unknown committed
2034 2035 2036 2037
    {
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
      using_ignore=1;
    }
unknown's avatar
unknown committed
2038
    thd.clear_error(); // reset error for binlog
unknown's avatar
unknown committed
2039
    if (write_record(&thd, table, &info))
unknown's avatar
unknown committed
2040
    {
2041
      info.error_count++;				// Ignore errors
2042
      thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
unknown's avatar
unknown committed
2043
      row->log_query = 0;
unknown's avatar
unknown committed
2044
    }
unknown's avatar
unknown committed
2045 2046 2047 2048 2049
    if (using_ignore)
    {
      using_ignore=0;
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
    }
2050
    if (table->s->blob_fields)
unknown's avatar
unknown committed
2051 2052 2053 2054 2055 2056
      free_delayed_insert_blobs(table);
    thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status);
    thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
    pthread_mutex_lock(&mutex);

    delete row;
2057 2058 2059 2060 2061 2062 2063
    /*
      Let READ clients do something once in a while
      We should however not break in the middle of a multi-line insert
      if we have binary logging enabled as we don't want other commands
      on this table until all entries has been processed
    */
    if (group_count++ >= max_rows && (row= rows.head()) &&
2064
	(!(row->log_query & using_bin_log)))
unknown's avatar
unknown committed
2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079
    {
      group_count=0;
      if (stacked_inserts || tables_in_use)	// Let these wait a while
      {
	if (tables_in_use)
	  pthread_cond_broadcast(&cond_client); // If waiting clients
	thd.proc_info="reschedule";
	pthread_mutex_unlock(&mutex);
	if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
	{
	  /* This should never happen */
	  table->file->print_error(error,MYF(0));
	  sql_print_error("%s",thd.net.last_error);
	  goto err;
	}
unknown's avatar
unknown committed
2080
	query_cache_invalidate3(&thd, table, 1);
unknown's avatar
unknown committed
2081 2082 2083
	if (thr_reschedule_write_lock(*thd.lock->locks))
	{
	  /* This should never happen */
unknown's avatar
unknown committed
2084 2085
	  sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),
                          table->s->table_name.str);
unknown's avatar
unknown committed
2086
	}
2087 2088
	if (!using_bin_log)
	  table->file->extra(HA_EXTRA_WRITE_CACHE);
unknown's avatar
unknown committed
2089 2090 2091 2092 2093 2094 2095 2096 2097 2098
	pthread_mutex_lock(&mutex);
	thd.proc_info="insert";
      }
      if (tables_in_use)
	pthread_cond_broadcast(&cond_client);	// If waiting clients
    }
  }

  thd.proc_info=0;
  pthread_mutex_unlock(&mutex);
2099 2100

  /* After releasing the mutex, to prevent deadlocks. */
2101 2102
  if (mysql_bin_log.is_open())
    thd.binlog_query(THD::ROW_QUERY_TYPE, query, query_length, FALSE, FALSE);
2103

unknown's avatar
unknown committed
2104 2105 2106 2107 2108 2109
  if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
  {						// This shouldn't happen
    table->file->print_error(error,MYF(0));
    sql_print_error("%s",thd.net.last_error);
    goto err;
  }
unknown's avatar
unknown committed
2110
  query_cache_invalidate3(&thd, table, 1);
unknown's avatar
unknown committed
2111 2112 2113 2114
  pthread_mutex_lock(&mutex);
  DBUG_RETURN(0);

 err:
2115 2116 2117 2118 2119 2120 2121
  /* Remove all not used rows */
  while ((row=rows.get()))
  {
    delete row;
    thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
    stacked_inserts--;
  }
unknown's avatar
unknown committed
2122 2123 2124 2125
  thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
  pthread_mutex_lock(&mutex);
  DBUG_RETURN(1);
}
2126
#endif /* EMBEDDED_LIBRARY */
unknown's avatar
unknown committed
2127 2128

/***************************************************************************
unknown's avatar
unknown committed
2129
  Store records in INSERT ... SELECT *
unknown's avatar
unknown committed
2130 2131
***************************************************************************/

unknown's avatar
VIEW  
unknown committed
2132 2133 2134 2135 2136 2137 2138 2139 2140

/*
  make insert specific preparation and checks after opening tables

  SYNOPSIS
    mysql_insert_select_prepare()
    thd         thread handler

  RETURN
unknown's avatar
unknown committed
2141 2142
    FALSE OK
    TRUE  Error
unknown's avatar
VIEW  
unknown committed
2143 2144
*/

unknown's avatar
unknown committed
2145
bool mysql_insert_select_prepare(THD *thd)
unknown's avatar
VIEW  
unknown committed
2146 2147
{
  LEX *lex= thd->lex;
unknown's avatar
unknown committed
2148
  SELECT_LEX *select_lex= &lex->select_lex;
unknown's avatar
unknown committed
2149
  TABLE_LIST *first_select_leaf_table;
unknown's avatar
VIEW  
unknown committed
2150
  DBUG_ENTER("mysql_insert_select_prepare");
unknown's avatar
unknown committed
2151

unknown's avatar
unknown committed
2152 2153
  /*
    SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
unknown's avatar
unknown committed
2154
    clause if table is VIEW
unknown's avatar
unknown committed
2155
  */
unknown's avatar
unknown committed
2156
  
unknown's avatar
unknown committed
2157 2158 2159 2160
  if (mysql_prepare_insert(thd, lex->query_tables,
                           lex->query_tables->table, lex->field_list, 0,
                           lex->update_list, lex->value_list,
                           lex->duplicates,
unknown's avatar
unknown committed
2161
                           &select_lex->where, TRUE))
unknown's avatar
unknown committed
2162
    DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
2163

2164 2165 2166 2167
  /*
    exclude first table from leaf tables list, because it belong to
    INSERT
  */
unknown's avatar
unknown committed
2168 2169
  DBUG_ASSERT(select_lex->leaf_tables != 0);
  lex->leaf_tables_insert= select_lex->leaf_tables;
2170
  /* skip all leaf tables belonged to view where we are insert */
unknown's avatar
unknown committed
2171
  for (first_select_leaf_table= select_lex->leaf_tables->next_leaf;
2172 2173 2174 2175 2176 2177
       first_select_leaf_table &&
       first_select_leaf_table->belong_to_view &&
       first_select_leaf_table->belong_to_view ==
       lex->leaf_tables_insert->belong_to_view;
       first_select_leaf_table= first_select_leaf_table->next_leaf)
  {}
unknown's avatar
unknown committed
2178
  select_lex->leaf_tables= first_select_leaf_table;
unknown's avatar
unknown committed
2179
  DBUG_RETURN(FALSE);
unknown's avatar
VIEW  
unknown committed
2180 2181 2182
}


2183
select_insert::select_insert(TABLE_LIST *table_list_par, TABLE *table_par,
unknown's avatar
unknown committed
2184
                             List<Item> *fields_par,
unknown's avatar
unknown committed
2185 2186
                             List<Item> *update_fields,
                             List<Item> *update_values,
unknown's avatar
unknown committed
2187
                             enum_duplicates duplic,
2188 2189 2190 2191 2192 2193
                             bool ignore_check_option_errors)
  :table_list(table_list_par), table(table_par), fields(fields_par),
   last_insert_id(0),
   insert_into_view(table_list_par && table_list_par->view != 0)
{
  bzero((char*) &info,sizeof(info));
unknown's avatar
unknown committed
2194 2195 2196 2197
  info.handle_duplicates= duplic;
  info.ignore= ignore_check_option_errors;
  info.update_fields= update_fields;
  info.update_values= update_values;
2198
  if (table_list_par)
unknown's avatar
unknown committed
2199
    info.view= (table_list_par->view ? table_list_par : 0);
2200 2201 2202
}


unknown's avatar
unknown committed
2203
int
2204
select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
unknown's avatar
unknown committed
2205
{
2206
  LEX *lex= thd->lex;
2207
  int res;
2208
  SELECT_LEX *lex_current_select_save= lex->current_select;
unknown's avatar
unknown committed
2209 2210
  DBUG_ENTER("select_insert::prepare");

2211
  unit= u;
2212 2213 2214 2215 2216 2217
  /*
    Since table in which we are going to insert is added to the first
    select, LEX::current_select should point to the first select while
    we are fixing fields from insert list.
  */
  lex->current_select= &lex->select_lex;
2218
  res= check_insert_fields(thd, table_list, *fields, values,
2219
                           !insert_into_view) ||
2220
       setup_fields(thd, 0, values, MARK_COLUMNS_READ, 0, 0);
2221

2222 2223
  if (info.handle_duplicates == DUP_UPDATE)
  {
2224 2225
    /* Save the state of the current name resolution context. */
    Name_resolution_context *context= &lex->select_lex.context;
2226 2227 2228 2229
    Name_resolution_context_state ctx_state;

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);
2230 2231

    /* Perform name resolution only in the first table - 'table_list'. */
2232
    table_list->next_local= 0;
2233 2234
    context->resolve_in_table_list_only(table_list);

2235
    lex->select_lex.no_wrap_view_item= TRUE;
2236 2237
    res= res || check_update_fields(thd, context->table_list,
                                    *info.update_fields);
2238 2239 2240 2241 2242
    lex->select_lex.no_wrap_view_item= FALSE;
    /*
      When we are not using GROUP BY we can refer to other tables in the
      ON DUPLICATE KEY part
    */       
2243 2244
    if (lex->select_lex.group_list.elements == 0)
    {
2245
      context->table_list->next_local=       ctx_state.save_next_local;
2246
      /* first_name_resolution_table was set by resolve_in_table_list_only() */
2247
      context->first_name_resolution_table->
2248
        next_name_resolution_table=          ctx_state.save_next_local;
2249
    }
2250 2251
    res= res || setup_fields(thd, 0, *info.update_values, MARK_COLUMNS_READ,
                             0, 0);
2252 2253

    /* Restore the current context. */
2254
    ctx_state.restore_state(context, table_list);
2255
  }
2256

2257 2258
  lex->current_select= lex_current_select_save;
  if (res)
unknown's avatar
unknown committed
2259
    DBUG_RETURN(1);
2260 2261 2262 2263 2264 2265 2266 2267 2268 2269
  /*
    if it is INSERT into join view then check_insert_fields already found
    real table for insert
  */
  table= table_list->table;

  /*
    Is table which we are changing used somewhere in other parts of
    query
  */
2270
  if (!(lex->current_select->options & OPTION_BUFFER_RESULT) &&
2271
      unique_table(thd, table_list, table_list->next_global))
2272 2273
  {
    /* Using same table for INSERT and SELECT */
2274 2275
    lex->current_select->options|= OPTION_BUFFER_RESULT;
    lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
2276
  }
2277
  else if (!thd->prelocked_mode)
2278 2279 2280 2281 2282 2283 2284
  {
    /*
      We must not yet prepare the result table if it is the same as one of the 
      source tables (INSERT SELECT). The preparation may disable 
      indexes on the result table, which may be used during the select, if it
      is the same table (Bug #6034). Do the preparation after the select phase
      in select_insert::prepare2().
2285 2286
      We won't start bulk inserts at all if this statement uses functions or
      should invoke triggers since they may access to the same table too.
2287
    */
2288
    table->file->ha_start_bulk_insert((ha_rows) 0);
2289
  }
2290
  restore_record(table,s->default_values);		// Get empty record
unknown's avatar
unknown committed
2291
  table->next_number_field=table->found_next_number_field;
unknown's avatar
unknown committed
2292
  thd->cuted_fields=0;
unknown's avatar
unknown committed
2293 2294
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
unknown's avatar
unknown committed
2295
  thd->no_trans_update= 0;
unknown's avatar
unknown committed
2296
  thd->abort_on_warning= (!info.ignore &&
unknown's avatar
unknown committed
2297 2298 2299
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
2300 2301 2302 2303 2304
  res= ((fields->elements &&
         check_that_all_fields_are_given_values(thd, table, table_list)) ||
        table_list->prepare_where(thd, 0, TRUE) ||
        table_list->prepare_check_option(thd));
  DBUG_RETURN(res);
unknown's avatar
unknown committed
2305 2306
}

2307

2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326
/*
  Finish the preparation of the result table.

  SYNOPSIS
    select_insert::prepare2()
    void

  DESCRIPTION
    If the result table is the same as one of the source tables (INSERT SELECT),
    the result table is not finally prepared at the join prepair phase.
    Do the final preparation now.
		       
  RETURN
    0   OK
*/

int select_insert::prepare2(void)
{
  DBUG_ENTER("select_insert::prepare2");
2327 2328
  if (thd->lex->current_select->options & OPTION_BUFFER_RESULT &&
      !thd->prelocked_mode)
2329
    table->file->ha_start_bulk_insert((ha_rows) 0);
unknown's avatar
unknown committed
2330
  DBUG_RETURN(0);
2331 2332 2333
}


2334 2335 2336 2337 2338 2339
void select_insert::cleanup()
{
  /* select_insert/select_create are never re-used in prepared statement */
  DBUG_ASSERT(0);
}

unknown's avatar
unknown committed
2340 2341
select_insert::~select_insert()
{
2342
  DBUG_ENTER("~select_insert");
unknown's avatar
unknown committed
2343 2344 2345
  if (table)
  {
    table->next_number_field=0;
2346
    table->file->ha_reset();
unknown's avatar
unknown committed
2347
  }
2348
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
unknown's avatar
unknown committed
2349
  thd->abort_on_warning= 0;
2350
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
2351 2352 2353 2354 2355
}


bool select_insert::send_data(List<Item> &values)
{
2356
  DBUG_ENTER("select_insert::send_data");
unknown's avatar
unknown committed
2357
  bool error=0;
2358

2359
  if (unit->offset_limit_cnt)
unknown's avatar
unknown committed
2360
  {						// using limit offset,count
2361
    unit->offset_limit_cnt--;
2362
    DBUG_RETURN(0);
unknown's avatar
unknown committed
2363
  }
unknown's avatar
unknown committed
2364

unknown's avatar
unknown committed
2365
  thd->count_cuted_fields= CHECK_FIELD_WARN;	// Calculate cuted fields
unknown's avatar
unknown committed
2366 2367
  store_values(values);
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
unknown's avatar
unknown committed
2368 2369
  if (thd->net.report_error)
    DBUG_RETURN(1);
unknown's avatar
unknown committed
2370 2371
  if (table_list)                               // Not CREATE ... SELECT
  {
unknown's avatar
unknown committed
2372
    switch (table_list->view_check_option(thd, info.ignore)) {
unknown's avatar
unknown committed
2373 2374 2375 2376 2377
    case VIEW_CHECK_SKIP:
      DBUG_RETURN(0);
    case VIEW_CHECK_ERROR:
      DBUG_RETURN(1);
    }
unknown's avatar
unknown committed
2378
  }
2379

2380
  error= write_record(thd, table, &info);
2381 2382
    
  if (!error)
unknown's avatar
unknown committed
2383
  {
unknown's avatar
unknown committed
2384
    if (table->triggers || info.handle_duplicates == DUP_UPDATE)
unknown's avatar
unknown committed
2385 2386
    {
      /*
unknown's avatar
unknown committed
2387 2388 2389
        Restore fields of the record since it is possible that they were
        changed by ON DUPLICATE KEY UPDATE clause.
    
unknown's avatar
unknown committed
2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405
        If triggers exist then whey can modify some fields which were not
        originally touched by INSERT ... SELECT, so we have to restore
        their original values for the next row.
      */
      restore_record(table, s->default_values);
    }
    if (table->next_number_field)
    {
      /*
        Clear auto-increment field for the next record, if triggers are used
        we will clear it twice, but this should be cheap.
      */
      table->next_number_field->reset();
      if (!last_insert_id && thd->insert_id_used)
        last_insert_id= thd->insert_id();
    }
unknown's avatar
unknown committed
2406
  }
2407
  table->file->release_auto_increment();
unknown's avatar
unknown committed
2408
  DBUG_RETURN(error);
unknown's avatar
unknown committed
2409 2410 2411
}


unknown's avatar
unknown committed
2412 2413 2414
void select_insert::store_values(List<Item> &values)
{
  if (fields->elements)
unknown's avatar
unknown committed
2415 2416
    fill_record_n_invoke_before_triggers(thd, *fields, values, 1,
                                         table->triggers, TRG_EVENT_INSERT);
unknown's avatar
unknown committed
2417
  else
unknown's avatar
unknown committed
2418 2419
    fill_record_n_invoke_before_triggers(thd, table->field, values, 1,
                                         table->triggers, TRG_EVENT_INSERT);
unknown's avatar
unknown committed
2420 2421
}

unknown's avatar
unknown committed
2422 2423
void select_insert::send_error(uint errcode,const char *err)
{
unknown's avatar
unknown committed
2424 2425
  DBUG_ENTER("select_insert::send_error");

2426 2427 2428
  /* Avoid an extra 'unknown error' message if we already reported an error */
  if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error)
    my_message(errcode, err, MYF(0));
2429 2430 2431 2432 2433 2434 2435 2436 2437

  if (!table)
  {
    /*
      This can only happen when using CREATE ... SELECT and the table was not
      created becasue of an syntax error
    */
    DBUG_VOID_RETURN;
  }
2438
  if (!thd->prelocked_mode)
2439
    table->file->ha_end_bulk_insert();
unknown's avatar
unknown committed
2440 2441
  /*
    If at least one row has been inserted/modified and will stay in the table
2442
    (the table doesn't have transactions) we must write to the binlog (and
unknown's avatar
unknown committed
2443
    the error code will make the slave stop).
2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455

    For many errors (example: we got a duplicate key error while
    inserting into a MyISAM table), no row will be added to the table,
    so passing the error to the slave will not help since there will
    be an error code mismatch (the inserts will succeed on the slave
    with no error).

    If we are using row-based replication we have two cases where this
    code is executed: replication of CREATE-SELECT and replication of
    INSERT-SELECT.

    When replicating a CREATE-SELECT statement, we shall not write the
2456 2457
    events to the binary log and should thus not set
    OPTION_STATUS_NO_TRANS_UPDATE.
2458 2459 2460 2461 2462 2463

    When replicating INSERT-SELECT, we shall not write the events to
    the binary log for transactional table, but shall write all events
    if there is one or more writes to non-transactional tables. In
    this case, the OPTION_STATUS_NO_TRANS_UPDATE is set if there is a
    write to a non-transactional table, otherwise it is cleared.
2464
  */
2465
  if (info.copied || info.deleted || info.updated)
2466
  {
2467
    if (!table->file->has_transactions())
2468
    {
2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479
      if (last_insert_id)
        thd->insert_id(last_insert_id);		// For binary log
      if (mysql_bin_log.is_open())
      {
        thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length,
                          table->file->has_transactions(), FALSE);
      }
      if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table &&
          !can_rollback_data())
        thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
      query_cache_invalidate3(thd, table, 1);
2480
    }
unknown's avatar
unknown committed
2481
  }
unknown's avatar
unknown committed
2482
  ha_rollback_stmt(thd);
unknown's avatar
unknown committed
2483
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
2484 2485 2486 2487 2488
}


bool select_insert::send_eof()
{
2489
  int error,error2;
unknown's avatar
unknown committed
2490 2491
  DBUG_ENTER("select_insert::send_eof");

2492
  error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0;
unknown's avatar
unknown committed
2493
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2494

unknown's avatar
unknown committed
2495
  if (info.copied || info.deleted || info.updated)
2496 2497 2498 2499 2500
  {
    /*
      We must invalidate the table in the query cache before binlog writing
      and ha_autocommit_or_rollback.
    */
unknown's avatar
unknown committed
2501
    query_cache_invalidate3(thd, table, 1);
2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513
    /*
      Mark that we have done permanent changes if all of the below is true
      - Table doesn't support transactions
      - It's a normal (not temporary) table. (Changes to temporary tables
        are not logged in RBR)
      - We are using statement based replication
    */
    if (!table->file->has_transactions() &&
        (!table->s->tmp_table ||
         !thd->current_stmt_binlog_row_based))
      thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
   }
unknown's avatar
unknown committed
2514

2515 2516
  if (last_insert_id)
    thd->insert_id(last_insert_id);		// For binary log
2517 2518 2519 2520 2521 2522
  /*
    Write to binlog before commiting transaction.  No statement will
    be written by the binlog_query() below in RBR mode.  All the
    events are in the transaction cache and will be written when
    ha_autocommit_or_rollback() is issued below.
  */
2523 2524
  if (mysql_bin_log.is_open())
  {
unknown's avatar
unknown committed
2525 2526
    if (!error)
      thd->clear_error();
2527 2528 2529
    thd->binlog_query(THD::ROW_QUERY_TYPE,
                      thd->query, thd->query_length,
                      table->file->has_transactions(), FALSE);
2530
  }
2531 2532 2533
  if ((error2=ha_autocommit_or_rollback(thd,error)) && ! error)
    error=error2;
  if (error)
unknown's avatar
unknown committed
2534 2535
  {
    table->file->print_error(error,MYF(0));
unknown's avatar
unknown committed
2536
    DBUG_RETURN(1);
unknown's avatar
unknown committed
2537
  }
unknown's avatar
unknown committed
2538
  char buff[160];
2539
  if (info.ignore)
unknown's avatar
unknown committed
2540 2541
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
	    (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
unknown's avatar
unknown committed
2542
  else
unknown's avatar
unknown committed
2543
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
unknown's avatar
unknown committed
2544
	    (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
2545
  thd->row_count_func= info.copied+info.deleted+info.updated;
2546
  ::send_ok(thd, (ulong) thd->row_count_func, last_insert_id, buff);
unknown's avatar
unknown committed
2547
  DBUG_RETURN(0);
unknown's avatar
unknown committed
2548 2549 2550 2551
}


/***************************************************************************
unknown's avatar
unknown committed
2552
  CREATE TABLE (SELECT) ...
unknown's avatar
unknown committed
2553 2554
***************************************************************************/

2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574
/*
  Create table from lists of fields and items (or open existing table
  with same name).

  SYNOPSIS
    create_table_from_items()
      thd          in     Thread object
      create_info  in     Create information (like MAX_ROWS, ENGINE or
                          temporary table flag)
      create_table in     Pointer to TABLE_LIST object providing database
                          and name for table to be created or to be open
      extra_fields in/out Initial list of fields for table to be created
      keys         in     List of keys for table to be created
      items        in     List of items which should be used to produce rest
                          of fields for the table (corresponding fields will
                          be added to the end of 'extra_fields' list)
      lock         out    Pointer to the MYSQL_LOCK object for table created
                          (open) will be returned in this parameter. Since
                          this table is not included in THD::lock caller is
                          responsible for explicitly unlocking this table.
unknown's avatar
unknown committed
2575
      hooks
2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593

  NOTES
    If 'create_info->options' bitmask has HA_LEX_CREATE_IF_NOT_EXISTS
    flag and table with name provided already exists then this function will
    simply open existing table.
    Also note that create, open and lock sequence in this function is not
    atomic and thus contains gap for deadlock and can cause other troubles.
    Since this function contains some logic specific to CREATE TABLE ... SELECT
    it should be changed before it can be used in other contexts.

  RETURN VALUES
    non-zero  Pointer to TABLE object for table created or opened
    0         Error
*/

static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
                                      TABLE_LIST *create_table,
                                      List<create_field> *extra_fields,
unknown's avatar
unknown committed
2594 2595 2596 2597
                                      List<Key> *keys,
                                      List<Item> *items,
                                      MYSQL_LOCK **lock,
                                      TABLEOP_HOOKS *hooks)
2598 2599
{
  TABLE tmp_table;		// Used during 'create_field()'
unknown's avatar
unknown committed
2600
  TABLE_SHARE share;
2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611
  TABLE *table= 0;
  uint select_field_count= items->elements;
  /* Add selected items to field list */
  List_iterator_fast<Item> it(*items);
  Item *item;
  Field *tmp_field;
  bool not_used;
  DBUG_ENTER("create_table_from_items");

  tmp_table.alias= 0;
  tmp_table.timestamp_field= 0;
unknown's avatar
unknown committed
2612 2613 2614
  tmp_table.s= &share;
  init_tmp_table_share(&share, "", 0, "", "");

2615 2616
  tmp_table.s->db_create_options=0;
  tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
unknown's avatar
unknown committed
2617 2618 2619
  tmp_table.s->db_low_byte_first= 
        test(create_info->db_type == &myisam_hton ||
             create_info->db_type == &heap_hton);
2620 2621 2622 2623 2624
  tmp_table.null_row=tmp_table.maybe_null=0;

  while ((item=it++))
  {
    create_field *cr_field;
2625
    Field *field, *def_field;
2626
    if (item->type() == Item::FUNC_ITEM)
2627
      field= item->tmp_table_field(&tmp_table);
2628
    else
2629 2630 2631
      field= create_tmp_field(thd, &tmp_table, item, item->type(),
                              (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
                              0);
2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651
    if (!field ||
	!(cr_field=new create_field(field,(item->type() == Item::FIELD_ITEM ?
					   ((Item_field *)item)->field :
					   (Field*) 0))))
      DBUG_RETURN(0);
    if (item->maybe_null)
      cr_field->flags &= ~NOT_NULL_FLAG;
    extra_fields->push_back(cr_field);
  }
  /*
    create and lock table

    We don't log the statement, it will be logged later.

    If this is a HEAP table, the automatic DELETE FROM which is written to the
    binlog when a HEAP table is opened for the first time since startup, must
    not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
    don't want to delete from it) 2) it would be written before the CREATE
    TABLE, which is a wrong order. So we keep binary logging disabled when we
    open_table().
2652 2653 2654
    NOTE: By locking table which we just have created (or for which we just
    have have found that it already exists) separately from other tables used
    by the statement we create potential window for deadlock.
2655 2656 2657 2658 2659 2660
    TODO: create and open should be done atomic !
  */
  {
    tmp_disable_binlog(thd);
    if (!mysql_create_table(thd, create_table->db, create_table->table_name,
                            create_info, *extra_fields, *keys, 0,
2661
                            select_field_count, 0))
2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675
    {
      /*
        If we are here in prelocked mode we either create temporary table
        or prelocked mode is caused by the SELECT part of this statement.
      */
      DBUG_ASSERT(!thd->prelocked_mode ||
                  create_info->options & HA_LEX_CREATE_TMP_TABLE ||
                  thd->lex->requires_prelocking());

      /*
        NOTE: We don't want to ignore set of locked tables here if we are
              under explicit LOCK TABLES since it will open gap for deadlock
              too wide (and also is not backward compatible).
      */
unknown's avatar
unknown committed
2676

2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695
      if (! (table= open_table(thd, create_table, thd->mem_root, (bool*) 0,
                               (MYSQL_LOCK_IGNORE_FLUSH |
                                ((thd->prelocked_mode == PRELOCKED) ?
                                 MYSQL_OPEN_IGNORE_LOCKED_TABLES:0)))))
        quick_rm_table(create_info->db_type, create_table->db,
                       table_case_name(create_info, create_table->table_name));
    }
    reenable_binlog(thd);
    if (!table)                                   // open failed
      DBUG_RETURN(0);
  }

  /*
    FIXME: What happens if trigger manages to be created while we are
           obtaining this lock ? May be it is sensible just to disable
           trigger execution in this case ? Or will MYSQL_LOCK_IGNORE_FLUSH
           save us from that ?
  */
  table->reginfo.lock_type=TL_WRITE;
unknown's avatar
unknown committed
2696
  hooks->prelock(&table, 1);                    // Call prelock hooks
2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711
  if (! ((*lock)= mysql_lock_tables(thd, &table, 1,
                                    MYSQL_LOCK_IGNORE_FLUSH, &not_used)))
  {
    VOID(pthread_mutex_lock(&LOCK_open));
    hash_delete(&open_cache,(byte*) table);
    VOID(pthread_mutex_unlock(&LOCK_open));
    quick_rm_table(create_info->db_type, create_table->db,
		   table_case_name(create_info, create_table->table_name));
    DBUG_RETURN(0);
  }
  table->file->extra(HA_EXTRA_WRITE_CACHE);
  DBUG_RETURN(table);
}


2712
class MY_HOOKS : public TABLEOP_HOOKS
unknown's avatar
unknown committed
2713
{
2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725
public:
  MY_HOOKS(select_create *x) : ptr(x) { }
  virtual void do_prelock(TABLE **tables, uint count)
  {
    if (ptr->get_thd()->current_stmt_binlog_row_based)
      ptr->binlog_show_create_table(tables, count);
  }

private:
  select_create *ptr;
};

unknown's avatar
unknown committed
2726 2727

int
2728
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
2729 2730
{
  DBUG_ENTER("select_create::prepare");
2731

2732 2733
  TABLEOP_HOOKS *hook_ptr= NULL;
#ifdef HAVE_ROW_BASED_REPLICATION
2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747
  class MY_HOOKS : public TABLEOP_HOOKS {
  public:
    MY_HOOKS(select_create *x) : ptr(x) { }
    virtual void do_prelock(TABLE **tables, uint count)
    {
      if (ptr->get_thd()->current_stmt_binlog_row_based)
        ptr->binlog_show_create_table(tables, count);
    }

  private:
    select_create *ptr;
  };

  MY_HOOKS hooks(this);
2748 2749
  hook_ptr= &hooks;
#endif
2750

2751
  unit= u;
2752
  if (!(table= create_table_from_items(thd, create_info, create_table,
2753 2754
                                       extra_fields, keys, &values,
                                       &thd->extra_lock, hook_ptr)))
unknown's avatar
unknown committed
2755 2756
    DBUG_RETURN(-1);				// abort() deletes table

2757
  if (table->s->fields < values.elements)
unknown's avatar
unknown committed
2758
  {
2759
    my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
unknown's avatar
unknown committed
2760 2761 2762
    DBUG_RETURN(-1);
  }

2763
 /* First field to copy */
unknown's avatar
unknown committed
2764 2765 2766 2767
  field= table->field+table->s->fields - values.elements;

  /* Mark all fields that are given values */
  for (Field **f= field ; *f ; f++)
2768
    bitmap_set_bit(table->write_set, (*f)->field_index);
unknown's avatar
unknown committed
2769

2770
  /* Don't set timestamp if used */
2771
  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
unknown's avatar
unknown committed
2772 2773
  table->next_number_field=table->found_next_number_field;

2774
  restore_record(table,s->default_values);      // Get empty record
unknown's avatar
unknown committed
2775
  thd->cuted_fields=0;
unknown's avatar
unknown committed
2776
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
unknown's avatar
unknown committed
2777
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2778
  if (!thd->prelocked_mode)
2779
    table->file->ha_start_bulk_insert((ha_rows) 0);
unknown's avatar
unknown committed
2780
  thd->no_trans_update= 0;
unknown's avatar
unknown committed
2781
  thd->abort_on_warning= (!info.ignore &&
unknown's avatar
unknown committed
2782 2783 2784
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
2785 2786
  DBUG_RETURN(check_that_all_fields_are_given_values(thd, table,
                                                     table_list));
unknown's avatar
unknown committed
2787 2788 2789
}


2790
#ifdef HAVE_ROW_BASED_REPLICATION
2791
void
2792
select_create::binlog_show_create_table(TABLE **tables, uint count)
2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809
{
  /*
    Note 1: In RBR mode, we generate a CREATE TABLE statement for the
    created table by calling store_create_info() (behaves as SHOW
    CREATE TABLE).  In the event of an error, nothing should be
    written to the binary log, even if the table is non-transactional;
    therefore we pretend that the generated CREATE TABLE statement is
    for a transactional table.  The event will then be put in the
    transaction cache, and any subsequent events (e.g., table-map
    events and binrow events) will also be put there.  We can then use
    ha_autocommit_or_rollback() to either throw away the entire
    kaboodle of events, or write them to the binary log.

    We write the CREATE TABLE statement here and not in prepare()
    since there potentially are sub-selects or accesses to information
    schema that will do a close_thread_tables(), destroying the
    statement transaction cache.
2810
  */
2811
  DBUG_ASSERT(thd->current_stmt_binlog_row_based);
2812
  DBUG_ASSERT(tables && *tables && count > 0);
2813 2814 2815

  char buf[2048];
  String query(buf, sizeof(buf), system_charset_info);
2816
  int result;
2817
  TABLE_LIST table_list;
2818

2819 2820
  memset(&table_list, 0, sizeof(table_list));
  table_list.table = *tables;
2821
  query.length(0);      // Have to zero it since constructor doesn't
2822

2823
  result= store_create_info(thd, &table_list, &query, create_info);
2824
  DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */
2825

2826 2827 2828 2829 2830
  thd->binlog_query(THD::STMT_QUERY_TYPE,
                    query.ptr(), query.length(),
                    /* is_trans */ TRUE,
                    /* suppress_use */ FALSE);
}
2831
#endif // HAVE_ROW_BASED_REPLICATION
2832

unknown's avatar
unknown committed
2833
void select_create::store_values(List<Item> &values)
unknown's avatar
unknown committed
2834
{
unknown's avatar
unknown committed
2835 2836
  fill_record_n_invoke_before_triggers(thd, field, values, 1,
                                       table->triggers, TRG_EVENT_INSERT);
unknown's avatar
unknown committed
2837 2838
}

unknown's avatar
unknown committed
2839

2840 2841 2842 2843 2844 2845 2846 2847 2848
void select_create::send_error(uint errcode,const char *err)
{
  /*
   Disable binlog, because we "roll back" partial inserts in ::abort
   by removing the table, even for non-transactional tables.
  */
  tmp_disable_binlog(thd);
  select_insert::send_error(errcode, err);
  reenable_binlog(thd);
unknown's avatar
unknown committed
2849 2850
}

unknown's avatar
unknown committed
2851

unknown's avatar
unknown committed
2852 2853 2854 2855 2856 2857 2858
bool select_create::send_eof()
{
  bool tmp=select_insert::send_eof();
  if (tmp)
    abort();
  else
  {
unknown's avatar
unknown committed
2859
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
unknown's avatar
unknown committed
2860
    VOID(pthread_mutex_lock(&LOCK_open));
2861
    mysql_unlock_tables(thd, thd->extra_lock);
2862
    if (!table->s->tmp_table)
2863
    {
2864
      if (close_thread_table(thd, &table))
2865 2866
        VOID(pthread_cond_broadcast(&COND_refresh));
    }
2867
    thd->extra_lock=0;
unknown's avatar
unknown committed
2868
    table=0;
unknown's avatar
unknown committed
2869 2870 2871 2872 2873 2874 2875 2876
    VOID(pthread_mutex_unlock(&LOCK_open));
  }
  return tmp;
}

void select_create::abort()
{
  VOID(pthread_mutex_lock(&LOCK_open));
2877
  if (thd->extra_lock)
unknown's avatar
unknown committed
2878
  {
2879 2880
    mysql_unlock_tables(thd, thd->extra_lock);
    thd->extra_lock=0;
unknown's avatar
unknown committed
2881 2882 2883
  }
  if (table)
  {
unknown's avatar
unknown committed
2884
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
unknown's avatar
unknown committed
2885
    handlerton *table_type=table->s->db_type;
2886
    if (!table->s->tmp_table)
2887
    {
2888
      ulong version= table->s->version;
unknown's avatar
unknown committed
2889
      table->s->version= 0;
2890
      hash_delete(&open_cache,(byte*) table);
2891
      if (!create_info->table_existed)
2892
        quick_rm_table(table_type, create_table->db, create_table->table_name);
2893
      /* Tell threads waiting for refresh that something has happened */
unknown's avatar
unknown committed
2894
      if (version != refresh_version)
2895
        VOID(pthread_cond_broadcast(&COND_refresh));
2896 2897
    }
    else if (!create_info->table_existed)
unknown's avatar
unknown committed
2898 2899
      close_temporary_table(thd, table, 1, 1);
    table=0;                                    // Safety
unknown's avatar
unknown committed
2900 2901 2902 2903 2904 2905
  }
  VOID(pthread_mutex_unlock(&LOCK_open));
}


/*****************************************************************************
unknown's avatar
unknown committed
2906
  Instansiate templates
unknown's avatar
unknown committed
2907 2908
*****************************************************************************/

2909
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
unknown's avatar
unknown committed
2910
template class List_iterator_fast<List_item>;
2911
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
2912 2913 2914
template class I_List<delayed_insert>;
template class I_List_iterator<delayed_insert>;
template class I_List<delayed_row>;
2915
#endif /* EMBEDDED_LIBRARY */
2916
#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */