/******************************************************
New index creation routines using a merge sort

(c) 2005,2007 Innobase Oy

Created 12/4/2005 Jan Lindstrom
Completed by Sunny Bains and Marko Makela
*******************************************************/

#include "row0merge.h"
#include "row0ext.h"
#include "row0row.h"
#include "row0upd.h"
#include "row0ins.h"
#include "row0sel.h"
#include "dict0dict.h"
#include "dict0mem.h"
#include "dict0boot.h"
#include "dict0crea.h"
#include "dict0load.h"
#include "btr0btr.h"
#include "mach0data.h"
#include "trx0rseg.h"
#include "trx0trx.h"
#include "trx0roll.h"
#include "trx0undo.h"
#include "trx0purge.h"
#include "trx0rec.h"
#include "que0que.h"
#include "rem0cmp.h"
#include "read0read.h"
#include "os0file.h"
#include "lock0lock.h"
#include "data0data.h"
#include "data0type.h"
#include "que0que.h"
#include "pars0pars.h"
#include "mem0mem.h"
#include "log0log.h"
40
#include "ut0sort.h"
41

42 43
/* Block size for I/O operations in merge sort.  The merge files are
read and written in units of this many bytes (1 MiB). */

typedef byte	row_merge_block_t[1048576];

/* Secondary buffer for I/O operations of merge records.  Used when a
record is split across a row_merge_block_t boundary; sized so that any
single record (at most half a page) fits. */

typedef byte	mrec_buf_t[UNIV_PAGE_SIZE / 2];

/* Merge record in row_merge_block_t.  The format is the same as a
record in ROW_FORMAT=COMPACT with the exception that the
REC_N_NEW_EXTRA_BYTES are omitted. */
typedef byte	mrec_t;

/* Buffer for sorting in main memory.  Tuples are accumulated here
until the buffer is full, then sorted and flushed to a merge file
block (see row_merge_buf_write()). */
struct row_merge_buf_struct {
	mem_heap_t*	heap;		/* memory heap where allocated */
	dict_index_t*	index;		/* the index the tuples belong to */
	ulint		total_size;	/* total amount of data bytes */
	ulint		n_tuples;	/* number of data tuples */
	ulint		max_tuples;	/* maximum number of data tuples */
	const dfield_t**tuples;		/* array of pointers to
					arrays of fields that form
					the data tuples */
	const dfield_t**tmp_tuples;	/* temporary copy of tuples,
					for sorting (work area of the
					in-memory merge sort) */
};

typedef struct row_merge_buf_struct row_merge_buf_t;

71 72
/* Information about temporary files used in merge sort are stored
to this structure */

struct merge_file_struct {
	int	fd;		/* File descriptor */
	ulint	offset;		/* File offset (in units of
				row_merge_block_t, not bytes) */
};

typedef struct merge_file_struct merge_file_t;

81 82
/**********************************************************
Allocate a sort buffer. */
static
row_merge_buf_t*
row_merge_buf_create_low(
/*=====================*/
					/* out,own: sort buffer */
	mem_heap_t*	heap,		/* in: heap where allocated */
	dict_index_t*	index,		/* in: secondary index */
	ulint		max_tuples,	/* in: maximum number of data tuples */
	ulint		buf_size)	/* in: size of the buffer, in bytes */
{
	row_merge_buf_t*	buf;

	ut_ad(max_tuples > 0);
	ut_ad(max_tuples <= sizeof(row_merge_block_t));
	ut_ad(max_tuples < buf_size);

	/* Zero-fill the buffer so that n_tuples and total_size
	start out as 0. */
	buf = mem_heap_calloc(heap, buf_size);
	buf->heap = heap;
	buf->index = index;
	buf->max_tuples = max_tuples;
	/* Allocate tuples[] and tmp_tuples[] as one contiguous
	array; the second half serves as the merge sort work area. */
	buf->tuples = mem_heap_alloc(heap,
				     2 * max_tuples * sizeof *buf->tuples);
	buf->tmp_tuples = buf->tuples + max_tuples;

	return(buf);
}

110 111
/**********************************************************
Allocate a sort buffer. */
112
static
113 114
row_merge_buf_t*
row_merge_buf_create(
115
/*=================*/
116 117
				/* out,own: sort buffer */
	dict_index_t*	index)	/* in: secondary index */
118
{
119 120 121 122
	row_merge_buf_t*	buf;
	ulint			max_tuples;
	ulint			buf_size;
	mem_heap_t*		heap;
123

124 125
	max_tuples = sizeof(row_merge_block_t)
		/ ut_max(1, dict_index_get_min_size(index));
126

127
	buf_size = (sizeof *buf) + (max_tuples - 1) * sizeof *buf->tuples;
128

129
	heap = mem_heap_create(buf_size + sizeof(row_merge_block_t));
130

131
	buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size);
132

133
	return(buf);
134 135
}

136 137
/**********************************************************
Empty a sort buffer. */
138
static
139
row_merge_buf_t*
140 141
row_merge_buf_empty(
/*================*/
142 143
					/* out: sort buffer */
	row_merge_buf_t*	buf)	/* in,own: sort buffer */
144
{
145 146 147 148
	ulint		buf_size;
	ulint		max_tuples	= buf->max_tuples;
	mem_heap_t*	heap		= buf->heap;
	dict_index_t*	index		= buf->index;
149

150
	buf_size = (sizeof *buf) + (max_tuples - 1) * sizeof *buf->tuples;
151

152
	mem_heap_empty(heap);
153

154
	return(row_merge_buf_create_low(heap, index, max_tuples, buf_size));
155 156
}

157 158
/**********************************************************
Deallocate a sort buffer. */
static
void
row_merge_buf_free(
/*===============*/
	row_merge_buf_t*	buf)	/* in,own: sort buffer, to be freed */
{
	/* The buffer struct, its tuple arrays and all copied field
	data live in buf->heap, so freeing the heap releases
	everything at once. */
	mem_heap_free(buf->heap);
}

168 169
/**********************************************************
Insert a data tuple into a sort buffer. */
static
ibool
row_merge_buf_add(
/*==============*/
					/* out: TRUE if added,
					FALSE if out of space */
	row_merge_buf_t*	buf,	/* in/out: sort buffer */
	const dtuple_t*		row,	/* in: row in clustered index */
	row_ext_t*		ext)	/* in/out: cache of externally stored
					column prefixes, or NULL */
{
	ulint		i;
	ulint		j;
	ulint		n_fields;
	ulint		data_size;
	ulint		extra_size;
	dict_index_t*	index;
	dfield_t*	entry;
	dfield_t*	field;

	if (buf->n_tuples >= buf->max_tuples) {
		return(FALSE);
	}

	index = buf->index;

	n_fields = dict_index_get_n_fields(index);

	/* Allocate the field array for the new entry and tentatively
	register it; it only becomes valid once we commit below by
	incrementing buf->n_tuples. */
	entry = mem_heap_alloc(buf->heap, n_fields * sizeof *entry);
	buf->tuples[buf->n_tuples] = entry;
	field = entry;

	/* Compute the size the entry will occupy in
	ROW_FORMAT=COMPACT, starting with the NULL-flags bitmap. */
	data_size = 0;
	extra_size = UT_BITS_IN_BYTES(index->n_nullable);

	for (i = j = 0; i < n_fields; i++, field++) {
		dict_field_t*		ifield;
		const dict_col_t*	col;
		ulint			col_no;
		const dfield_t*		row_field;

		ifield = dict_index_get_nth_field(index, i);
		col = ifield->col;
		col_no = dict_col_get_no(col);
		row_field = dtuple_get_nth_field(row, col_no);
		dfield_copy(field, row_field);

		if (dfield_is_null(field)) {
			ut_ad(!(col->prtype & DATA_NOT_NULL));
			field->data = NULL;
			continue;
		} else if (UNIV_LIKELY(!ext)) {
			/* No externally stored columns: nothing to do. */
		} else if (dict_index_is_clust(index)) {
			/* Flag externally stored fields.  ext->ext[]
			lists the externally stored column numbers in
			index order, so a single cursor j suffices. */
			if (j < ext->n_ext && col_no == ext->ext[j]) {
				j++;

				ut_a(field->len >= BTR_EXTERN_FIELD_REF_SIZE);
				dfield_set_ext(field);
			}
		} else {
			ulint	len = field->len;
			/* Secondary index on an externally stored
			column: substitute the cached column prefix. */
			byte*	buf = row_ext_lookup(ext, col_no,
						     row_field->data,
						     row_field->len,
						     &len);
			if (UNIV_LIKELY_NULL(buf)) {
				dfield_set_data(field, buf, len);
			}
		}

		/* If a column prefix index, take only the prefix */

		if (ifield->prefix_len) {
			field->len = dtype_get_at_most_n_mbchars(
				col->prtype,
				col->mbminlen, col->mbmaxlen,
				ifield->prefix_len,
				field->len, field->data);
		}

		ut_ad(field->len <= col->len || col->mtype == DATA_BLOB);

		/* Account for the length-prefix bytes of the compact
		format: 0 bytes for fixed-length fields, 1 byte for
		short variable-length fields, 2 bytes otherwise. */
		if (ifield->fixed_len) {
			ut_ad(field->len == ifield->fixed_len);
			ut_ad(!dfield_is_ext(field));
		} else if (dfield_is_ext(field)) {
			extra_size += 2;
		} else if (field->len < 128
			   || (col->len < 256 && col->mtype != DATA_BLOB)) {
			extra_size++;
		} else {
			extra_size += 2;
		}
		data_size += field->len;
	}

	/* In the clustered index, every externally stored column
	must have been visited exactly once. */
	ut_ad(!ext || !dict_index_is_clust(index) || j == ext->n_ext);

#ifdef UNIV_DEBUG
	{
		ulint	size;
		ulint	extra;

		/* Cross-check our size bookkeeping against the
		canonical record size computation. */
		size = rec_get_converted_size_comp(index,
						   REC_STATUS_ORDINARY,
						   entry, n_fields, &extra);

		ut_ad(data_size + extra_size + REC_N_NEW_EXTRA_BYTES == size);
		ut_ad(extra_size + REC_N_NEW_EXTRA_BYTES == extra);
	}
#endif /* UNIV_DEBUG */

	/* Add to the total size of the record in row_merge_block_t
	the encoded length of extra_size and the extra bytes (extra_size).
	See row_merge_buf_write() for the variable-length encoding
	of extra_size. */
	data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);

	/* Reserve one byte for the end marker of row_merge_block_t. */
	if (buf->total_size + data_size >= sizeof(row_merge_block_t) - 1) {
		return(FALSE);
	}

	/* Commit the entry. */
	buf->total_size += data_size;
	buf->n_tuples++;

	field = entry;

	/* Copy the data fields.  Until now the fields pointed into
	the caller's row; duplicate them into our own heap so the
	entry survives past this row. */
	for (i = 0; i < n_fields; i++, field++) {
		if (!dfield_is_null(field)) {
			field->data = mem_heap_dup(buf->heap,
						   field->data, field->len);
		}
	}

	return(TRUE);
}

/*****************************************************************
311
Compare two tuples. */
312
static
313
int
314 315 316 317 318 319 320 321
row_merge_tuple_cmp(
/*================*/
					/* out: 1, 0, -1 if a is greater,
					equal, less, respectively, than b */
	ulint			n_field,/* in: number of fields */
	ulint*			n_dup,	/* in/out: number of duplicates */
	const dfield_t*		a,	/* in: first tuple to be compared */
	const dfield_t*		b)	/* in: second tuple to be compared */
322
{
323
	int	cmp;
324

325 326 327
	do {
		cmp = cmp_dfield_dfield(a++, b++);
	} while (!cmp && --n_field);
328

329 330
	if (!cmp) {
		(*n_dup)++;
331
	}
332

333
	return(cmp);
334 335
}

336 337
/**************************************************************************
Merge sort the tuple buffer in main memory. */
static
void
row_merge_tuple_sort(
/*=================*/
	ulint			n_field,/* in: number of fields */
	ulint*			n_dup,	/* in/out: number of duplicates */
	const dfield_t**	tuples,	/* in/out: tuples */
	const dfield_t**	aux,	/* in/out: work area */
	ulint			low,	/* in: lower bound of the
					sorting area, inclusive */
	ulint			high)	/* in: upper bound of the
					sorting area, exclusive */
{
	/* Bind the context arguments (n_field, n_dup) into fixed-arity
	callbacks so that the generic UT_SORT_FUNCTION_BODY macro can
	recurse and compare. */
#define row_merge_tuple_sort_ctx(a,b,c,d) \
	row_merge_tuple_sort(n_field, n_dup, a, b, c, d)
#define row_merge_tuple_cmp_ctx(a,b) row_merge_tuple_cmp(n_field, n_dup, a, b)

	UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx,
			      tuples, aux, low, high, row_merge_tuple_cmp_ctx);
}

359 360
/**********************************************************
Sort a buffer. */
361
static
362 363
ulint
row_merge_buf_sort(
364
/*===============*/
365 366 367
					/* out: number of duplicates
					encountered */
	row_merge_buf_t*	buf)	/* in/out: sort buffer */
368
{
369
	ulint	n_dup	= 0;
370

371
	row_merge_tuple_sort(dict_index_get_n_unique(buf->index), &n_dup,
372
			     buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
373

374
	return(n_dup);
375 376
}

377 378
/**********************************************************
Write a buffer to a block. */
static
void
row_merge_buf_write(
/*================*/
	const row_merge_buf_t*	buf,	/* in: sorted buffer */
	row_merge_block_t*	block)	/* out: buffer for writing to file */
{
	dict_index_t*	index	= buf->index;
	ulint		n_fields= dict_index_get_n_fields(index);
	byte*		b	= &(*block)[0];

	ulint		i;

	for (i = 0; i < buf->n_tuples; i++) {
		ulint		size;
		ulint		extra_size;
		const dfield_t*	entry		= buf->tuples[i];

		size = rec_get_converted_size_comp(buf->index,
						   REC_STATUS_ORDINARY,
						   entry, n_fields,
						   &extra_size);
		ut_ad(size > extra_size);
		ut_ad(extra_size >= REC_N_NEW_EXTRA_BYTES);
		/* Merge records omit REC_N_NEW_EXTRA_BYTES of the
		compact record header. */
		extra_size -= REC_N_NEW_EXTRA_BYTES;
		size -= REC_N_NEW_EXTRA_BYTES;

		/* Encode extra_size + 1 */
		/* Variable-length encoding: one byte for values below
		0x80, else two bytes with the high bit set in the
		first.  Value 0 is reserved for the end marker. */
		if (extra_size + 1 < 0x80) {
			*b++ = extra_size + 1;
		} else {
			ut_ad((extra_size + 1) < 0x8000);
			*b++ = 0x80 | ((extra_size + 1) >> 8);
			*b++ = (byte) (extra_size + 1);
		}

		ut_ad(b + size < block[1]);

		/* Convert the entry in place: extra bytes grow
		downwards from b + extra_size, data upwards. */
		rec_convert_dtuple_to_rec_comp(b + extra_size, 0, index,
					       REC_STATUS_ORDINARY,
					       entry, n_fields);

		b += size;
	}

	/* Write an "end-of-chunk" marker. */
	ut_a(b < block[1]);
	ut_a(b == block[0] + buf->total_size);
	*b++ = 0;
#ifdef UNIV_DEBUG_VALGRIND
	/* The rest of the block is uninitialized.  Initialize it
	to avoid bogus warnings. */
	memset(b, 0, block[1] - b);
#endif /* UNIV_DEBUG_VALGRIND */
}

435 436
/**********************************************************
Create a memory heap and allocate space for row_merge_rec_offsets(). */
static
mem_heap_t*
row_merge_heap_create(
/*==================*/
					/* out: memory heap */
	dict_index_t*	index,		/* in: record descriptor */
	ulint**		offsets1,	/* out: offsets */
	ulint**		offsets2)	/* out: offsets */
{
	/* Size of a rec_get_offsets() array: header slots plus one
	slot per field plus one. */
	ulint		i	= 1 + REC_OFFS_HEADER_SIZE
		+ dict_index_get_n_fields(index);
	mem_heap_t*	heap	= mem_heap_create(2 * i * sizeof *offsets1);

	/* NOTE(review): sizeof *offsets1 is sizeof(ulint*), not
	sizeof(ulint); InnoDB's ulint is pointer-sized, so the
	allocation is correct on supported platforms — confirm. */
	*offsets1 = mem_heap_alloc(heap, i * sizeof *offsets1);
	*offsets2 = mem_heap_alloc(heap, i * sizeof *offsets2);

	/* Initialize the offsets headers: allocated size and
	number of fields. */
	(*offsets1)[0] = (*offsets2)[0] = i;
	(*offsets1)[1] = (*offsets2)[1] = dict_index_get_n_fields(index);

	return(heap);
}

459 460 461
/**************************************************************************
Search an index object by name and column names.  If several indexes match,
return the index with the max id. */
462
static
463 464 465 466 467 468 469
dict_index_t*
row_merge_dict_table_get_index(
/*===========================*/
						/* out: matching index,
						NULL if not found */
	dict_table_t*		table,		/* in: table */
	const merge_index_def_t*index_def)	/* in: index definition */
470
{
471 472 473
	ulint		i;
	dict_index_t*	index;
	const char**	column_names;
474

475
	column_names = mem_alloc(index_def->n_fields * sizeof *column_names);
476

477 478
	for (i = 0; i < index_def->n_fields; ++i) {
		column_names[i] = index_def->fields[i].field_name;
479 480
	}

481 482
	index = dict_table_get_index_by_max_id(
		table, index_def->name, column_names, index_def->n_fields);
483

484
	mem_free(column_names);
485

486
	return(index);
487 488
}

489 490 491 492 493 494 495 496 497 498 499 500 501
/************************************************************************
Read a merge block from the file system. */
static
ibool
row_merge_read(
/*===========*/
					/* out: TRUE if request was
					successful, FALSE if fail */
	int			fd,	/* in: file descriptor */
	ulint			offset,	/* in: offset where to read */
	row_merge_block_t*	buf)	/* out: data */
{
	ibool		success;
	/* Convert the block index into a 64-bit byte offset, to
	support merge files larger than 4 GiB. */
	ib_uint64_t	byte_ofs = ((ib_uint64_t) offset) * sizeof *buf;

	success = os_file_read(OS_FILE_FROM_FD(fd), buf,
			       (ulint) (byte_ofs & 0xFFFFFFFF),
			       (ulint) (byte_ofs >> 32),
			       sizeof *buf);

	return(UNIV_LIKELY(success));
}
508

509 510 511 512 513 514 515 516 517 518 519 520 521 522
/************************************************************************
Write a merge block to the file system. */
static
ibool
row_merge_write(
/*============*/
				/* out: TRUE if request was
				successful, FALSE if fail */
	int		fd,	/* in: file descriptor */
	ulint		offset,	/* in: offset where to write */
	const void*	buf)	/* in: data */
{
	/* Convert the block index into a 64-bit byte offset, to
	support merge files larger than 4 GiB. */
	ib_uint64_t	ofs = ((ib_uint64_t) offset)
		* sizeof(row_merge_block_t);

	return(UNIV_LIKELY(os_file_write("(merge)", OS_FILE_FROM_FD(fd), buf,
					 (ulint) (ofs & 0xFFFFFFFF),
					 (ulint) (ofs >> 32),
					 sizeof(row_merge_block_t))));
}
529

530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561
/************************************************************************
Read a merge record. */
static
const byte*
row_merge_read_rec(
/*===============*/
					/* out: pointer to next record,
					or NULL on I/O error
					or end of list */
	row_merge_block_t*	block,	/* in/out: file buffer */
	mrec_buf_t*		buf,	/* in/out: secondary buffer */
	const byte*		b,	/* in: pointer to record */
	dict_index_t*		index,	/* in: index of the record */
	int			fd,	/* in: file descriptor */
	ulint*			foffs,	/* in/out: file offset */
	const mrec_t**		mrec,	/* out: pointer to merge record,
					or NULL on end of list
					(non-NULL on I/O error) */
	ulint*			offsets)/* out: offsets of mrec */
{
	ulint	extra_size;
	ulint	data_size;
	ulint	avail_size;

	ut_ad(block);
	ut_ad(buf);
	ut_ad(b >= block[0]);
	ut_ad(b < block[1]);
	ut_ad(index);
	ut_ad(foffs);
	ut_ad(mrec);
	ut_ad(offsets);

	ut_ad(*offsets == 1 + REC_OFFS_HEADER_SIZE
	      + dict_index_get_n_fields(index));

	extra_size = *b++;

	if (UNIV_UNLIKELY(!extra_size)) {
		/* End of list */
		*mrec = NULL;
		return(NULL);
	}

	if (extra_size >= 0x80) {
		/* Read another byte of extra_size. */

		if (UNIV_UNLIKELY(b >= block[1])) {
			if (!row_merge_read(fd, ++(*foffs), block)) {
err_exit:
				/* Signal I/O error. */
				*mrec = b;
				return(NULL);
			}

			/* Wrap around to the beginning of the buffer. */
			b = block[0];
		}

		extra_size = (extra_size & 0x7f) << 8;
		extra_size |= *b++;
	}

	/* Normalize extra_size.  Above, value 0 signals "end of list". */
	extra_size--;

	/* Read the extra bytes. */

	if (UNIV_UNLIKELY(b + extra_size >= block[1])) {
		/* The record spans two blocks.  Copy the entire record
		to the auxiliary buffer and handle this as a special
		case. */

		avail_size = block[1] - b;

		memcpy(*buf, b, avail_size);

		if (!row_merge_read(fd, ++(*foffs), block)) {

			goto err_exit;
		}

		/* Wrap around to the beginning of the buffer. */
		b = block[0];

		/* Copy the record. */
		memcpy(*buf + avail_size, b, extra_size - avail_size);
		b += extra_size - avail_size;

		*mrec = *buf + extra_size;

		rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);

		data_size = rec_offs_data_size(offsets);

		/* These overflows should be impossible given that
		records are much smaller than either buffer, and
		the record starts near the beginning of each buffer. */
		ut_a(extra_size + data_size < sizeof *buf);
		ut_a(b + data_size < block[1]);

		/* Copy the data bytes. */
		memcpy(*buf + extra_size, b, data_size);
		b += data_size;

		return(b);
	}

	*mrec = b + extra_size;

	rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);

	data_size = rec_offs_data_size(offsets);
	ut_ad(extra_size + data_size < sizeof *buf);

	b += extra_size + data_size;

	if (UNIV_LIKELY(b < block[1])) {
		/* The record fits entirely in the block.
		This is the normal case. */
		return(b);
	}

	/* The record spans two blocks.  Copy it to buf. */

	/* Rewind to the start of the record: b was advanced past
	block[1] above, so without this rewind, avail_size below
	would underflow and the memcpy would read out of bounds. */
	b -= extra_size + data_size;

	avail_size = block[1] - b;
	memcpy(*buf, b, avail_size);
	*mrec = *buf + extra_size;
	rec_offs_make_valid(*mrec, index, offsets);

	if (!row_merge_read(fd, ++(*foffs), block)) {

		goto err_exit;
	}

	/* Wrap around to the beginning of the buffer. */
	b = block[0];

	/* Copy the rest of the record. */
	memcpy(*buf + avail_size, b, extra_size + data_size - avail_size);
	b += extra_size + data_size - avail_size;

	return(b);
}
674

675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690
/************************************************************************
Write a merge record. */
static
void
row_merge_write_rec_low(
/*====================*/
	byte*		b,	/* out: buffer */
	ulint		e,	/* in: encoded extra_size */
	const mrec_t*	mrec,	/* in: record to write */
	const ulint*	offsets)/* in: offsets of mrec */
{
	/* Store the encoded extra size: two bytes with the high bit
	set in the first for values >= 0x80, one byte otherwise. */
	if (e >= 0x80) {
		*b++ = (byte) (0x80 | (e >> 8));
		*b++ = (byte) e;
	} else {
		*b++ = (byte) e;
	}

	/* Copy the whole record, extra bytes first, then data. */
	memcpy(b, mrec - rec_offs_extra_size(offsets), rec_offs_size(offsets));
}

/************************************************************************
697
Write a merge record. */
698
static
699 700 701 702 703 704 705 706 707 708 709 710
byte*
row_merge_write_rec(
/*================*/
					/* out: pointer to end of block,
					or NULL on error */
	row_merge_block_t*	block,	/* in/out: file buffer */
	mrec_buf_t*		buf,	/* in/out: secondary buffer */
	byte*			b,	/* in: pointer to end of block */
	int			fd,	/* in: file descriptor */
	ulint*			foffs,	/* in/out: file offset */
	const mrec_t*		mrec,	/* in: record to write */
	const ulint*		offsets)/* in: offsets of mrec */
711
{
712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745
	ulint	extra_size;
	ulint	size;
	ulint	avail_size;

	ut_ad(block);
	ut_ad(buf);
	ut_ad(b >= block[0]);
	ut_ad(b < block[1]);
	ut_ad(mrec);
	ut_ad(foffs);
	ut_ad(mrec < block[0] || mrec > block[1]);
	ut_ad(mrec < buf[0] || mrec > buf[1]);

	/* Normalize extra_size.  Value 0 signals "end of list". */
	extra_size = rec_offs_extra_size(offsets) + 1;

	size = extra_size + (extra_size >= 0x80)
		+ rec_offs_data_size(offsets);

	if (UNIV_UNLIKELY(b + size >= block[1])) {
		/* The record spans two blocks.
		Copy it to the temporary buffer first. */
		avail_size = block[1] - b;

		row_merge_write_rec_low(buf[0], extra_size, mrec, offsets);

		/* Copy the head of the temporary buffer, write
		the completed block, and copy the tail of the
		record to the head of the new block. */
		memcpy(b, buf[0], avail_size);

		if (!row_merge_write(fd, (*foffs)++, block)) {
			return(NULL);
		}
746

747 748 749 750 751 752 753
		/* Copy the rest. */
		b = block[0];
		memcpy(b, buf[0] + avail_size, size - avail_size);
		b += size - avail_size;
	} else {
		row_merge_write_rec_low(b, extra_size, mrec, offsets);
		b += rec_offs_size(offsets);
754 755
	}

756
	return(b);
757 758 759
}

/************************************************************************
Write an end-of-list marker. */
static
byte*
row_merge_write_eof(
/*================*/
					/* out: pointer to end of block,
					or NULL on error */
	row_merge_block_t*	block,	/* in/out: file buffer */
	byte*			b,	/* in: pointer to end of block */
	int			fd,	/* in: file descriptor */
	ulint*			foffs)	/* in/out: file offset */
{
	ut_ad(block);
	ut_ad(b >= block[0]);
	ut_ad(b < block[1]);
	ut_ad(foffs);

	/* A zero extra_size byte marks the end of the record list;
	see row_merge_read_rec(). */
	*b++ = 0;
#ifdef UNIV_DEBUG_VALGRIND
	/* The rest of the block is uninitialized.  Initialize it
	to avoid bogus warnings. */
	memset(b, 0, block[1] - b);
#endif /* UNIV_DEBUG_VALGRIND */

	if (!row_merge_write(fd, (*foffs)++, block)) {
		return(NULL);
	}

	/* Subsequent writes restart at the beginning of the block. */
	return(block[0]);
}

/*****************************************************************
Compare two merge records. */
static
int
row_merge_cmp(
/*==========*/
					/* out: 1, 0, -1 if mrec1 is
					greater, equal, less,
					respectively, than mrec2 */
	const mrec_t*	mrec1,		/* in: first merge record to be
					compared */
	const mrec_t*	mrec2,		/* in: second merge record to be
					compared */
	const ulint*	offsets1,	/* in: first record offsets */
	const ulint*	offsets2,	/* in: second record offsets */
	dict_index_t*	index)		/* in: index */
{
	/* Merge records use the ROW_FORMAT=COMPACT layout, so the
	generic record comparator applies directly. */
	return(cmp_rec_rec_simple(mrec1, mrec2, offsets1, offsets2, index));
}

/************************************************************************
Reads clustered index of the table and create temporary files
813
containing the index entries for the indexes to be built. */
814
static
815 816 817
ulint
row_merge_read_clustered_index(
/*===========================*/
818 819
					/* out: DB_SUCCESS or error */
	trx_t*			trx,	/* in: transaction */
820 821 822 823 824
	dict_table_t*		old_table,/* in: table where rows are
					read from */
	dict_table_t*		new_table,/* in: table where indexes are
					created; identical to old_table
					unless creating a PRIMARY KEY */
825 826 827 828
	dict_index_t**		index,	/* in: indexes to be created */
	merge_file_t*		files,	/* in: temporary files */
	ulint			n_index,/* in: number of indexes to create */
	row_merge_block_t*	block)	/* in/out: file buffer */
829
{
830 831
	dict_index_t*		clust_index;	/* Clustered index */
	mem_heap_t*		row_heap;	/* Heap memory to create
832
						clustered index records */
833 834
	row_merge_buf_t**	merge_buf;	/* Temporary list for records*/
	btr_pcur_t		pcur;		/* Persistent cursor on the
835
						clustered index */
836 837 838
	mtr_t			mtr;		/* Mini transaction */
	ulint			err = DB_SUCCESS;/* Return code */
	ulint			i;
839 840 841
	ulint			n_nonnull = 0;	/* number of columns
						changed to NOT NULL */
	ulint*			nonnull = NULL;	/* NOT NULL columns */
842

843
	trx->op_info = "reading clustered index";
844

845
	ut_ad(trx);
846 847
	ut_ad(old_table);
	ut_ad(new_table);
848 849
	ut_ad(index);
	ut_ad(files);
850

851
	/* Create and initialize memory for record buffers */
852

853
	merge_buf = mem_alloc(n_index * sizeof *merge_buf);
854

855 856
	for (i = 0; i < n_index; i++) {
		merge_buf[i] = row_merge_buf_create(index[i]);
857 858 859 860 861 862 863
	}

	mtr_start(&mtr);

	/* Find the clustered index and create a persistent cursor
	based on that. */

864
	clust_index = dict_table_get_first_index(old_table);
865 866 867 868

	btr_pcur_open_at_index_side(
		TRUE, clust_index, BTR_SEARCH_LEAF, &pcur, TRUE, &mtr);

869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901
	if (UNIV_UNLIKELY(old_table != new_table)) {
		ulint	n_cols = dict_table_get_n_cols(old_table);

		/* A primary key will be created.  Identify the
		columns that were flagged NOT NULL in the new table,
		so that we can quickly check that the records in the
		(old) clustered index do not violate the added NOT
		NULL constraints. */

		ut_a(n_cols == dict_table_get_n_cols(new_table));

		nonnull = mem_alloc(n_cols * sizeof *nonnull);

		for (i = 0; i < n_cols; i++) {
			if (dict_table_get_nth_col(old_table, i)->prtype
			    & DATA_NOT_NULL) {

				continue;
			}

			if (dict_table_get_nth_col(new_table, i)->prtype
			    & DATA_NOT_NULL) {

				nonnull[n_nonnull++] = i;
			}
		}

		if (!n_nonnull) {
			mem_free(nonnull);
			nonnull = NULL;
		}
	}

902
	row_heap = mem_heap_create(UNIV_PAGE_SIZE);
903

904
	/* Scan the clustered index. */
905 906
	for (;;) {
		const rec_t*	rec;
907
		dtuple_t*	row		= NULL;
908
		row_ext_t*	ext;
909
		ibool		has_next	= TRUE;
910

911 912
		btr_pcur_move_to_next_on_page(&pcur, &mtr);

913 914 915 916 917 918 919 920 921
		/* When switching pages, commit the mini-transaction
		in order to release the latch on the old page. */

		if (btr_pcur_is_after_last_on_page(&pcur, &mtr)) {
			btr_pcur_store_position(&pcur, &mtr);
			mtr_commit(&mtr);
			mtr_start(&mtr);
			btr_pcur_restore_position(BTR_SEARCH_LEAF,
						  &pcur, &mtr);
922
			has_next = btr_pcur_move_to_next_user_rec(&pcur, &mtr);
923
		}
924

925 926
		if (UNIV_LIKELY(has_next)) {
			rec = btr_pcur_get_rec(&pcur);
927

928
			/* Skip delete marked records. */
929 930
			if (rec_get_deleted_flag(
				    rec, dict_table_is_comp(old_table))) {
931 932
				continue;
			}
933 934

			srv_n_rows_inserted++;
935

936
			/* Build a row based on the clustered index. */
937 938 939 940

			row = row_build(ROW_COPY_POINTERS, clust_index,
					rec, NULL, &ext, row_heap);

941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957
			if (UNIV_LIKELY_NULL(nonnull)) {
				for (i = 0; i < n_nonnull; i++) {
					dfield_t*	field
						= &row->fields[nonnull[i]];

					ut_a(!(field->type.prtype
					       & DATA_NOT_NULL));

					if (dfield_is_null(field)) {
						trx->error_key_num = 0;
						err = DB_PRIMARY_KEY_IS_NULL;
						goto func_exit;
					}

					field->type.prtype |= DATA_NOT_NULL;
				}
			}
958 959
		}

960 961 962
		/* Build all entries for all the indexes to be created
		in a single scan of the clustered index. */

963 964 965
		for (i = 0; i < n_index; i++) {
			row_merge_buf_t*	buf	= merge_buf[i];
			merge_file_t*		file	= &files[i];
966

967
			if (UNIV_LIKELY
968
			    (row && row_merge_buf_add(buf, row, ext))) {
969 970 971 972
				continue;
			}

			ut_ad(buf->n_tuples || !has_next);
973

974 975
			/* We have enough data tuples to form a block.
			Sort them and write to disk. */
976

977 978 979
			if (buf->n_tuples
			    && row_merge_buf_sort(buf)
			    && dict_index_is_unique(buf->index)) {
980
				trx->error_key_num = i;
981 982 983
				err = DB_DUPLICATE_KEY;
				goto func_exit;
			}
984

985
			row_merge_buf_write(buf, block);
986

987 988 989 990 991 992
			if (!row_merge_write(file->fd, file->offset++,
					     block)) {
				trx->error_key_num = i;
				err = DB_OUT_OF_FILE_SPACE;
				goto func_exit;
			}
993

994
			merge_buf[i] = row_merge_buf_empty(buf);
995
		}
996

997
		mem_heap_empty(row_heap);
998

999 1000 1001 1002
		if (UNIV_UNLIKELY(!has_next)) {
			goto func_exit;
		}
	}
1003

1004 1005 1006 1007
func_exit:
	btr_pcur_close(&pcur);
	mtr_commit(&mtr);
	mem_heap_free(row_heap);
1008

1009 1010 1011 1012
	if (UNIV_LIKELY_NULL(nonnull)) {
		mem_free(nonnull);
	}

1013 1014 1015
	for (i = 0; i < n_index; i++) {
		row_merge_buf_free(merge_buf[i]);
	}
1016

1017
	mem_free(merge_buf);
1018

1019
	trx->op_info = "";
1020

1021 1022
	return(err);
}
1023

1024 1025 1026 1027 1028 1029 1030 1031 1032 1033
/*****************************************************************
Merge two blocks of linked lists on disk and write a bigger block.
Reads the sorted run starting at *foffs0 and the one starting at
*foffs1 from file, merges them in key order, and appends the merged
run to the output file of.  The offsets are advanced past the
consumed runs.  Returns DB_SUCCESS, DB_DUPLICATE_KEY (for a unique
index), or DB_CORRUPTION on a malformed block. */
static
ulint
row_merge_blocks(
/*=============*/
					/* out: DB_SUCCESS or error code */
	dict_index_t*		index,	/* in: index being created */
	merge_file_t*		file,	/* in/out: file containing
					index entries */
	row_merge_block_t*	block,	/* in/out: 3 buffers */
	ulint*			foffs0,	/* in/out: offset of first
					source list in the file */
	ulint*			foffs1,	/* in/out: offset of second
					source list in the file */
	merge_file_t*	 	of)	/* in/out: output file */
{
	mem_heap_t*	heap;	/* memory heap for offsets0, offsets1 */

	mrec_buf_t	buf[3];	/* buffer for handling split mrec in block[] */
	const byte*	b0;	/* pointer to block[0] */
	const byte*	b1;	/* pointer to block[1] */
	byte*		b2;	/* pointer to block[2] */
	const mrec_t*	mrec0;	/* merge rec, points to block[0] or buf[0] */
	const mrec_t*	mrec1;	/* merge rec, points to block[1] or buf[1] */
	ulint*		offsets0;/* offsets of mrec0 */
	ulint*		offsets1;/* offsets of mrec1 */

	heap = row_merge_heap_create(index, &offsets0, &offsets1);

	/* Write a record and read the next record.  Split the output
	file in two halves, which can be merged on the following pass.
	N selects input stream 0 or 1 via token pasting; AT_END is the
	statement executed when that stream is exhausted.  A NULL return
	from the read with a non-NULL mrec indicates corruption. */
#define ROW_MERGE_WRITE_GET_NEXT(N, AT_END)				\
	do {								\
		b2 = row_merge_write_rec(&block[2], &buf[2], b2,	\
					 of->fd, &of->offset,		\
					 mrec##N, offsets##N);		\
		if (UNIV_UNLIKELY(!b2)) {				\
			goto corrupt;					\
		}							\
		b##N = row_merge_read_rec(&block[N], &buf[N],		\
					  b##N, index,			\
					  file->fd, foffs##N,		\
					  &mrec##N, offsets##N);	\
		if (UNIV_UNLIKELY(!b##N)) {				\
			if (mrec##N) {					\
				goto corrupt;				\
			}						\
			AT_END;						\
		}							\
	} while (0)

	if (!row_merge_read(file->fd, *foffs0, &block[0])
	    || !row_merge_read(file->fd, *foffs1, &block[1])) {
		/* NOTE: the corrupt label sits inside this if block so
		that the macro above can jump here; control only falls
		in from the failed reads or via goto. */
corrupt:
		mem_heap_free(heap);
		return(DB_CORRUPTION);
	}

	b0 = block[0];
	b1 = block[1];
	b2 = block[2];

	/* Fetch the first record of each input run. */
	b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, file->fd,
				foffs0, &mrec0, offsets0);
	b1 = row_merge_read_rec(&block[1], &buf[1], b1, index, file->fd,
				foffs1, &mrec1, offsets1);
	if (UNIV_UNLIKELY(!b0 && mrec0)
	    || UNIV_UNLIKELY(!b1 && mrec1)) {

		goto corrupt;
	}

	/* Standard two-way merge: emit the smaller record and advance
	that input, until one run is exhausted. */
	while (mrec0 && mrec1) {
		switch (row_merge_cmp(mrec0, mrec1,
				      offsets0, offsets1, index)) {
		case 0:
			/* Equal keys: a duplicate is an error only for
			a unique index. */
			if (UNIV_UNLIKELY
			    (dict_index_is_unique(index))) {
				mem_heap_free(heap);
				return(DB_DUPLICATE_KEY);
			}
			/* fall through */
		case -1:
			ROW_MERGE_WRITE_GET_NEXT(0, goto merged);
			break;
		case 1:
			ROW_MERGE_WRITE_GET_NEXT(1, goto merged);
			break;
		default:
			ut_error;
		}

	}

merged:
	if (mrec0) {
		/* append all mrec0 to output */
		for (;;) {
			ROW_MERGE_WRITE_GET_NEXT(0, goto done0);
		}
	}
done0:
	if (mrec1) {
		/* append all mrec1 to output */
		for (;;) {
			ROW_MERGE_WRITE_GET_NEXT(1, goto done1);
		}
	}
done1:

	mem_heap_free(heap);
	/* Terminate the merged run in the output file. */
	b2 = row_merge_write_eof(&block[2], b2, of->fd, &of->offset);
	return(b2 ? DB_SUCCESS : DB_CORRUPTION);
}
1139

1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150
/*****************************************************************
Merge disk files.  Performs one merge pass: pairs up the sorted runs
in the first and second half of file, merges each pair into *tmpfd,
and then swaps the two file descriptors so that the next pass reads
from the freshly written file. */
static
ulint
row_merge(
/*======*/
						/* out: DB_SUCCESS
						or error code */
	dict_index_t*		index,		/* in: index being created */
	merge_file_t*		file,		/* in/out: file containing
						index entries */
	row_merge_block_t*	block,		/* in/out: 3 buffers */
	int*			tmpfd)		/* in/out: temporary file
						handle */
{
	ulint		foffs0;	/* first input offset */
	ulint		foffs1;	/* second input offset */
	ulint		half;	/* upper limit of foffs1 */
	ulint		error;	/* error code */
	merge_file_t	of;	/* output file */

	of.fd = *tmpfd;
	of.offset = 0;

	/* Split the input file in two halves. */
	half = file->offset / 2;

	/* Merge blocks to the output file. */
	foffs0 = 0;
	foffs1 = half;

	/* NOTE: row_merge_blocks() also advances foffs0 and foffs1
	past the runs it consumes (they are passed by pointer); the
	loop increments then step over the end-of-run boundary. */
	for (; foffs0 < half; foffs0++, foffs1++) {
		error = row_merge_blocks(index, file, block,
					 &foffs0, &foffs1, &of);

		if (error != DB_SUCCESS) {
			return(error);
		}
	}

	/* Copy the last block, if there is one.  When the second half
	is longer than the first, its tail has no partner to merge with
	and is copied to the output verbatim. */
	while (foffs1 < file->offset) {
		if (!row_merge_read(file->fd, foffs1++, block)
		    || !row_merge_write(of.fd, of.offset++, block)) {
			return(DB_CORRUPTION);
		}
	}

	/* Swap file descriptors for the next pass. */
	*tmpfd = file->fd;
	*file = of;

	return(DB_SUCCESS);
}
1194

1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205
/*****************************************************************
Merge disk files. */
static
ulint
row_merge_sort(
/*===========*/
						/* out: DB_SUCCESS
						or error code */
	dict_index_t*		index,		/* in: index being created */
	merge_file_t*		file,		/* in/out: file containing
						index entries */
1206
	row_merge_block_t*	block,		/* in/out: 3 buffers */
1207 1208 1209 1210
	int*			tmpfd)		/* in/out: temporary file
						handle */
{
	ulint	blksz;	/* block size */
1211

1212
	blksz = 1;
1213

1214
	for (;; blksz *= 2) {
1215
		ulint	error = row_merge(index, file, block, tmpfd);
1216 1217 1218
		if (error != DB_SUCCESS) {
			return(error);
		}
1219

1220 1221 1222 1223
		if (blksz >= file->offset) {
			/* everything is in a single block */
			break;
		}
1224

1225 1226 1227
		/* Round up the file size to a multiple of blksz. */
		file->offset = ut_2pow_round(file->offset - 1, blksz) + blksz;
	}
1228

1229
	return(DB_SUCCESS);
1230 1231
}

1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264
/*****************************************************************
Copy externally stored columns to the data tuple.  For every field
of tuple that is marked external, fetch the BLOB referenced by the
corresponding field of mrec and attach the copy (allocated from
heap) to the tuple. */
static
void
row_merge_copy_blobs(
/*=================*/
	const mrec_t*	mrec,	/* in: merge record */
	const ulint*	offsets,/* in: offsets of mrec */
	ulint		zip_size,/* in: compressed page size in bytes, or 0 */
	dtuple_t*	tuple,	/* in/out: data tuple */
	mem_heap_t*	heap)	/* in/out: memory heap */
{
	const ulint	n_fields = dtuple_get_n_fields(tuple);
	ulint		field_no;

	for (field_no = 0; field_no < n_fields; field_no++) {
		dfield_t*	field
			= dtuple_get_nth_field(tuple, field_no);

		if (dfield_is_ext(field)) {
			const void*	blob_data;
			ulint		blob_len;

			/* An externally stored column cannot be NULL. */
			ut_ad(!dfield_is_null(field));

			blob_data = btr_rec_copy_externally_stored_field(
				mrec, offsets, zip_size, field_no,
				&blob_len, heap);

			dfield_set_data(field, blob_data, blob_len);
		}
	}
}

1265 1266 1267
/************************************************************************
Read sorted file containing index data tuples and insert these data
tuples to the index.  Uses a dummy insert query graph so that the
generic row-insert and error-handling machinery can be reused. */
static
ulint
row_merge_insert_index_tuples(
/*==========================*/
					/* out: DB_SUCCESS or error number */
	trx_t*			trx,	/* in: transaction */
	dict_index_t*		index,	/* in: index */
	dict_table_t*		table,	/* in: new table */
	ulint			zip_size,/* in: compressed page size of
					 the old table, or 0 if uncompressed */
	int			fd,	/* in: file descriptor */
	row_merge_block_t*	block)	/* in/out: file buffer */
{
	mrec_buf_t		buf;	/* buffer for records split
					across block boundaries */
	const byte*		b;	/* read cursor within block */
	que_thr_t*		thr;	/* query thread of the dummy graph */
	ins_node_t*		node;	/* insert node of the dummy graph */
	mem_heap_t*		tuple_heap;	/* per-record scratch heap,
					emptied after each insert */
	mem_heap_t*		graph_heap;	/* heap for the query graph
					and the offsets array */
	ulint			error = DB_SUCCESS;
	ulint			foffs = 0;	/* current block offset
					in the merge file */
	ulint*			offsets;	/* rec_get_offsets()-style
					array for decoding mrec */

	ut_ad(trx);
	ut_ad(index);
	ut_ad(table);

	/* We use the insert query graph as the dummy graph
	needed in the row module call */

	trx->op_info = "inserting index entries";

	graph_heap = mem_heap_create(500);
	node = ins_node_create(INS_DIRECT, table, graph_heap);

	thr = pars_complete_graph_for_exec(node, trx, graph_heap);

	que_thr_move_to_run_state_for_mysql(thr, trx);

	tuple_heap = mem_heap_create(1000);

	{
		/* Preallocate and pre-size the offsets array for the
		fixed number of fields in a merge record. */
		ulint i	= 1 + REC_OFFS_HEADER_SIZE
			+ dict_index_get_n_fields(index);
		offsets = mem_heap_alloc(graph_heap, i * sizeof *offsets);
		offsets[0] = i;
		offsets[1] = dict_index_get_n_fields(index);
	}

	b = *block;

	if (!row_merge_read(fd, foffs, block)) {
		error = DB_CORRUPTION;
	} else {
		for (;;) {
			const mrec_t*	mrec;
			dtuple_t*	dtuple;
			ulint		n_ext;

			b = row_merge_read_rec(block, &buf, b, index,
					       fd, &foffs, &mrec, offsets);
			if (UNIV_UNLIKELY(!b)) {
				/* End of list, or I/O error */
				if (mrec) {
					error = DB_CORRUPTION;
				}
				break;
			}

			n_ext = 0;
			dtuple = row_rec_to_index_entry_low(
				mrec, index, offsets, &n_ext, tuple_heap);

			if (UNIV_UNLIKELY(n_ext)) {
				/* The record references externally
				stored columns; copy them into the
				tuple before inserting. */
				row_merge_copy_blobs(mrec, offsets, zip_size,
						     dtuple, tuple_heap);
			}

			node->row = dtuple;
			node->table = table;
			node->trx_id = trx->id;

			ut_ad(dtuple_validate(dtuple));

			/* Retry the insert for as long as the generic
			error handler says the error was transient
			(e.g. a lock wait). */
			do {
				thr->run_node = thr;
				thr->prev_node = thr->common.parent;

				error = row_ins_index_entry(index, dtuple,
							    0, FALSE, thr);

				if (UNIV_LIKELY(error == DB_SUCCESS)) {

					goto next_rec;
				}

				thr->lock_state = QUE_THR_LOCK_ROW;
				trx->error_state = error;
				que_thr_stop_for_mysql(thr);
				thr->lock_state = QUE_THR_LOCK_NOLOCK;
			} while (row_mysql_handle_errors(&error, trx,
							 thr, NULL));

			/* Unrecoverable error: bail out with the graph
			already stopped by que_thr_stop_for_mysql(). */
			goto err_exit;
next_rec:
			mem_heap_empty(tuple_heap);
		}
	}

	que_thr_stop_for_mysql_no_error(thr, trx);
err_exit:
	/* Freeing the graph also frees graph_heap contents
	(node and offsets). */
	que_graph_free(thr->graph);

	trx->op_info = "";

	mem_heap_free(tuple_heap);

	return(error);
}

/*************************************************************************
Drop an index from the InnoDB system tables.  Removes the index rows
from SYS_FIELDS and SYS_INDEXES (which also frees the index B-tree
file segments), patches foreign key constraints to use an equivalent
index, and evicts the index from the dictionary cache. */

void
row_merge_drop_index(
/*=================*/
	dict_index_t*	index,	/* in: index to be removed */
	dict_table_t*	table,	/* in: table */
	trx_t*		trx)	/* in: transaction handle */
{
	ulint		err;
	ibool		dict_lock = FALSE;	/* TRUE if this function
					acquired the dictionary mutex */
	pars_info_t*	info = pars_info_create();

	/* We use the private SQL parser of Innobase to generate the
	query graphs needed in deleting the dictionary data from system
	tables in Innobase. Deleting a row from SYS_INDEXES table also
	frees the file segments of the B-tree associated with the index. */

	static const char str1[] =
		"PROCEDURE DROP_INDEX_PROC () IS\n"
		"BEGIN\n"
		"DELETE FROM SYS_FIELDS WHERE INDEX_ID = :indexid;\n"
		"DELETE FROM SYS_INDEXES WHERE ID = :indexid\n"
		"		AND TABLE_ID = :tableid;\n"
		"END;\n";

	ut_ad(index && table && trx);

	pars_info_add_dulint_literal(info, "indexid", index->id);
	pars_info_add_dulint_literal(info, "tableid", table->id);

	trx_start_if_not_started(trx);
	trx->op_info = "dropping index";

	/* Take the dictionary lock only if the caller does not
	already hold it. */
	if (trx->dict_operation_lock_mode == 0) {
		row_mysql_lock_data_dictionary(trx);
		dict_lock = TRUE;
	}

	err = que_eval_sql(info, str1, FALSE, trx);

	/* A failure to drop dictionary rows would leave the data
	dictionary inconsistent; treat it as fatal. */
	ut_a(err == DB_SUCCESS);

	/* Replace this index with another equivalent index for all
	foreign key constraints on this table where this index is used */

	dict_table_replace_index_in_foreign_list(table, index);

	/* Remove any pending redo (rename) bookkeeping for this index. */
	if (trx->dict_redo_list) {
		dict_redo_remove_index(trx, index);
	}

	dict_index_remove_from_cache(table, index);

	if (dict_lock) {
		row_mysql_unlock_data_dictionary(trx);
	}

	trx->op_info = "";
}
1449

marko's avatar
marko committed
1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466
/*************************************************************************
Drop those indexes which were created before an error occurred
when building an index. */

void
row_merge_drop_indexes(
/*===================*/
	trx_t*		trx,		/* in: transaction */
	dict_table_t*	table,		/* in: table containing the indexes */
	dict_index_t**	index,		/* in: indexes to drop */
	ulint		num_created)	/* in: number of elements in index[] */
{
	ulint	i = 0;

	/* Drop every partially created index, in creation order. */
	while (i < num_created) {
		row_merge_drop_index(index[i], table, trx);
		i++;
	}
}

/*************************************************************************
1470 1471
Create a merge file. */
static
1472 1473 1474 1475
void
row_merge_file_create(
/*==================*/
	merge_file_t*	merge_file)	/* out: merge file structure */
1476
{
1477
	merge_file->fd = innobase_mysql_tmpfile();
1478
	merge_file->offset = 0;
1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492
}

/*************************************************************************
Destroy a merge file: close the backing temporary file if it is still
open.  Safe to call more than once on the same structure. */
static
void
row_merge_file_destroy(
/*===================*/
	merge_file_t*	merge_file)	/* out: merge file structure */
{
	if (merge_file->fd == -1) {
		/* Already destroyed; nothing to release. */
		return;
	}

	close(merge_file->fd);
	/* Mark the descriptor invalid so a repeated destroy is a no-op. */
	merge_file->fd = -1;
}

/*************************************************************************
Determine the precise type of a column that is added to a temporary
table, i.e. whether the column must be constrained NOT NULL because
it is part of the new PRIMARY KEY. */
UNIV_INLINE
ulint
row_merge_col_prtype(
/*=================*/
						/* out: col->prtype, possibly
						ORed with DATA_NOT_NULL */
	const dict_col_t*	col,		/* in: column */
	const char*		col_name,	/* in: name of the column */
	const merge_index_def_t*index_def)	/* in: the index definition
						of the primary key */
{
	ulint	prtype = col->prtype;

	ut_ad(index_def->ind_type & DICT_CLUSTERED);

	if (!(prtype & DATA_NOT_NULL)) {
		ulint	i;

		/* All columns that are included in the PRIMARY KEY
		must be NOT NULL; check whether this one is. */

		for (i = 0; i < index_def->n_fields; i++) {
			if (!strcmp(col_name,
				    index_def->fields[i].field_name)) {
				prtype |= DATA_NOT_NULL;
				break;
			}
		}
	}

	return(prtype);
}

/*************************************************************************
Create a temporary table for creating a primary key, using the definition
of an existing table.  Writes an UNDO record for the table creation
first, so that the operation can be rolled back. */

dict_table_t*
row_merge_create_temporary_table(
/*=============================*/
						/* out: table,
						or NULL on error */
	const char*		table_name,	/* in: new table name */
	const merge_index_def_t*index_def,	/* in: the index definition
						of the primary key */
	const dict_table_t*	table,		/* in: old table definition */
	trx_t*			trx)		/* in/out: transaction
						(sets error_state) */
{
	ulint		i;
	dict_table_t*	new_table = NULL;
	ulint		n_cols = dict_table_get_n_user_cols(table);
	ulint		error;

	ut_ad(table_name);
	ut_ad(index_def);
	ut_ad(table);
	ut_ad(mutex_own(&dict_sys->mutex));

	/* Record the UNDO entry before doing anything persistent. */
	error = row_undo_report_create_table_dict_operation(trx, table_name);

	if (error == DB_SUCCESS) {

		mem_heap_t*	heap = mem_heap_create(1000);
		/* Make sure the UNDO record reaches disk. */
		log_buffer_flush_to_disk();

		new_table = dict_mem_table_create(
			table_name, 0, n_cols, table->flags);

		/* Copy every user column from the old table,
		constraining PRIMARY KEY columns to NOT NULL. */
		for (i = 0; i < n_cols; i++) {
			const dict_col_t*	col;
			const char*		col_name;

			col = dict_table_get_nth_col(table, i);
			col_name = dict_table_get_col_name(table, i);

			dict_mem_table_add_col(
				new_table, heap, col_name, col->mtype,
				row_merge_col_prtype(col, col_name, index_def),
				col->len);
		}

		error = row_create_table_for_mysql(new_table, trx);
		mem_heap_free(heap);

		if (error != DB_SUCCESS) {
			/* NOTE(review): assumes row_create_table_for_mysql()
			does not free new_table on failure — confirm. */
			dict_mem_table_free(new_table);
			new_table = NULL;
		}
	}

	if (error != DB_SUCCESS) {
		trx->error_state = error;
	}

	return(new_table);
}

/*************************************************************************
Rename the indexes in the dictionary.  Strips the one-character
temporary-name prefix from the index name in SYS_INDEXES and, on
success, in the in-memory index object as well. */

ulint
row_merge_rename_index(
/*===================*/
					/* out: DB_SUCCESS if all OK */
	trx_t*		trx,		/* in: Transaction */
	dict_table_t*	table,		/* in: Table for index */
	dict_index_t*	index)		/* in: Index to rename */
{
	ibool		dict_lock = FALSE;	/* TRUE if this function
					acquired the dictionary mutex */
	ulint		err = DB_SUCCESS;
	pars_info_t*	info = pars_info_create();

	/* Only rename from temp names */
	ut_a(*index->name == TEMP_TABLE_PREFIX);

	/* We use the private SQL parser of Innobase to generate the
	query graphs needed in renaming index. */

	static const char str1[] =
		"PROCEDURE RENAME_INDEX_PROC () IS\n"
		"BEGIN\n"
		"UPDATE SYS_INDEXES SET NAME = :name\n"
		" WHERE ID = :indexid AND TABLE_ID = :tableid;\n"
		"END;\n";

	/* NOTE(review): the table parameter is overwritten here with
	index->table; presumably callers always pass that same table —
	confirm whether the parameter is still needed. */
	table = index->table;

	ut_ad(index && table && trx);

	trx_start_if_not_started(trx);
	trx->op_info = "renaming index";

	/* Skip the temporary-name prefix character in the new name. */
	pars_info_add_str_literal(info, "name", index->name + 1);
	pars_info_add_dulint_literal(info, "indexid", index->id);
	pars_info_add_dulint_literal(info, "tableid", table->id);

	if (trx->dict_operation_lock_mode == 0) {
		row_mysql_lock_data_dictionary(trx);
		dict_lock = TRUE;
	}

	err = que_eval_sql(info, str1, FALSE, trx);

	if (err == DB_SUCCESS) {
		/* Advance past the prefix in the cached name too, so
		the in-memory name matches the dictionary row. */
		index->name++;
	}

	if (dict_lock) {
		row_mysql_unlock_data_dictionary(trx);
	}

	trx->op_info = "";

	return(err);
}

/*************************************************************************
Create the index and load in to the dictionary.  An UNDO record is
written before the SYS_INDEXES insert so that a crash or rollback
can undo the creation.  On error, trx->error_state is set and the
returned index object may be a prototype only. */

dict_index_t*
row_merge_create_index(
/*===================*/
					/* out: index, or NULL on error */
	trx_t*		trx,		/* in/out: trx (sets error_state) */
	dict_table_t*	table,		/* in: the index is on this table */
	const merge_index_def_t*	/* in: the index definition */
			index_def)
{
	dict_index_t*	index;
	ulint		err = DB_SUCCESS;
	ulint		n_fields = index_def->n_fields;

	/* Create the index prototype, using the passed in def, this is not
	a persistent operation. We pass 0 as the space id, and determine at
	a lower level the space id where to store the table. */

	index = dict_mem_index_create(table->name, index_def->name,
				      0, index_def->ind_type, n_fields);

	ut_a(index);

	/* Create the index id, as it will be required when we build
	the index. We assign the id here because we want to write an
	UNDO record before we insert the entry into SYS_INDEXES. */
	ut_a(ut_dulint_is_zero(index->id));

	index->id = dict_hdr_get_new_id(DICT_HDR_INDEX_ID);
	index->table = table;

	/* Write the UNDO record for the create index */
	err = row_undo_report_create_index_dict_operation(trx, index);

	if (err == DB_SUCCESS) {
		ulint		i;

		/* Make sure the UNDO record gets to disk */
		log_buffer_flush_to_disk();

		/* Copy the field definitions into the prototype. */
		for (i = 0; i < n_fields; i++) {
			merge_index_field_t* ifield;

			ifield = &index_def->fields[i];

			dict_mem_index_add_field(index,
						 ifield->field_name,
						 ifield->prefix_len);
		}

		/* Add the index to SYS_INDEXES, this will use the prototype
		to create an entry in SYS_INDEXES. */
		err = row_create_index_graph_for_mysql(trx, table, index);

		if (err == DB_SUCCESS) {

			/* Replace the prototype pointer with the
			dictionary-cache index object that was created
			from it. */
			index = row_merge_dict_table_get_index(
				table, index_def);

			ut_a(index);

			/* Note the id of the transaction that created this
			index, we use it to restrict readers from accessing
			this index, to ensure read consistency. */
			index->trx_id = trx->id;

			/* Create element and append to list in trx. So that
			we can rename from temp name to real name. */
			if (trx->dict_redo_list) {
				dict_redo_t*	dict_redo;

				dict_redo = dict_redo_create_element(trx);
				dict_redo->index = index;
			}
		}
	}

	if (err != DB_SUCCESS) {
		trx->error_state = err;
	}

	return(index);
}

/*************************************************************************
1742
Check if a transaction can use an index. */
1743 1744 1745 1746

ibool
row_merge_is_index_usable(
/*======================*/
1747 1748
	const trx_t*		trx,	/* in: transaction */
	const dict_index_t*	index)	/* in: index to check */
1749 1750 1751 1752 1753 1754 1755 1756 1757
{
	if (!trx->read_view) {
		return(TRUE);
	}

	return(ut_dulint_cmp(index->trx_id, trx->read_view->low_limit_id) < 0);
}

/*************************************************************************
Drop the old table.  The table must carry the temporary-name prefix;
it is dropped immediately only if no MySQL handles reference it. */

ulint
row_merge_drop_table(
/*=================*/
					/* out: DB_SUCCESS or error code */
	trx_t*		trx,		/* in: transaction */
	dict_table_t*	table)		/* in: table to drop */
{
	ulint		err = DB_SUCCESS;
	ibool		dict_locked = FALSE;	/* TRUE if this function
					acquired the dictionary mutex */

	if (trx->dict_operation_lock_mode == 0) {
		row_mysql_lock_data_dictionary(trx);
		dict_locked = TRUE;
	}

	/* Only temporary (renamed-away) tables may be dropped here. */
	ut_a(*table->name == TEMP_TABLE_PREFIX);

	/* Drop the table immediately if it is not referenced by MySQL */
	if (table->n_mysql_handles_opened == 0) {
		/* Copy table->name, because table will have been
		freed when row_drop_table_for_mysql_no_commit()
		checks with dict_load_table() that the table was
		indeed dropped. */
		char* table_name = mem_strdup(table->name);
		/* Set the commit flag to FALSE. */
		err = row_drop_table_for_mysql(table_name, trx, FALSE);
		mem_free(table_name);
	}

	if (dict_locked) {
		row_mysql_unlock_data_dictionary(trx);
	}

	return(err);
}
1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805

/*************************************************************************
Build indexes on a table by reading a clustered index,
creating a temporary file containing index entries, merge sorting
these index entries and inserting sorted index entries to indexes. */

ulint
row_merge_build_indexes(
/*====================*/
					/* out: DB_SUCCESS or error code */
	trx_t*		trx,		/* in: transaction */
	dict_table_t*	old_table,	/* in: table where rows are
					read from */
	dict_table_t*	new_table,	/* in: table where indexes are
					created; identical to old_table
					unless creating a PRIMARY KEY */
	dict_index_t**	indexes,	/* in: indexes to be created */
	ulint		n_indexes)	/* in: size of indexes[] */
{
	merge_file_t*		merge_files;	/* one merge file per index */
	row_merge_block_t*	block;		/* the 3 shared I/O buffers */
	ulint			block_size;	/* actual size allocated
					for block, in bytes */
	ulint			i;
	ulint			error;
	int			tmpfd;		/* scratch file that
					row_merge() ping-pongs with */

	ut_ad(trx);
	ut_ad(old_table);
	ut_ad(new_table);
	ut_ad(indexes);
	ut_ad(n_indexes);

	trx_start_if_not_started(trx);

	/* Allocate memory for merge file data structure and initialize
	fields */

	merge_files = mem_alloc(n_indexes * sizeof *merge_files);
	block_size = 3 * sizeof *block;
	block = os_mem_alloc_large(&block_size);

	for (i = 0; i < n_indexes; i++) {

		row_merge_file_create(&merge_files[i]);
	}

	tmpfd = innobase_mysql_tmpfile();

	/* Read clustered index of the table and create files for
	secondary index entries for merge sort */

	error = row_merge_read_clustered_index(
		trx, old_table, new_table, indexes,
		merge_files, n_indexes, block);

	if (error != DB_SUCCESS) {

		goto func_exit;
	}

	trx_start_if_not_started(trx);

	/* Now we have files containing index entries ready for
	sorting and inserting. */

	for (i = 0; i < n_indexes; i++) {
		error = row_merge_sort(indexes[i], &merge_files[i],
				       block, &tmpfd);

		if (error == DB_SUCCESS) {
			error = row_merge_insert_index_tuples(
				trx, indexes[i], new_table,
				dict_table_zip_size(old_table),
				merge_files[i].fd, block);
		}

		/* Close the temporary file to free up space. */
		row_merge_file_destroy(&merge_files[i]);

		if (error != DB_SUCCESS) {
			trx->error_key_num = i;
			goto func_exit;
		}
	}

func_exit:
	close(tmpfd);

	/* Destroying an already-destroyed merge file is a no-op
	(its fd is -1), so this loop is safe on the success path too. */
	for (i = 0; i < n_indexes; i++) {
		row_merge_file_destroy(&merge_files[i]);
	}

	mem_free(merge_files);
	os_mem_free_large(block, block_size);

	return(error);
}