/*****************************************************************************

Copyright (c) 1994, 2010, Innobase Oy. All Rights Reserved.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 59 Temple
Place, Suite 330, Boston, MA 02111-1307 USA

*****************************************************************************/

/**************************************************//**
@file btr/btr0btr.c
The B-tree

Created 6/2/1994 Heikki Tuuri
*******************************************************/
25

osku's avatar
osku committed
26 27 28 29 30 31 32 33
#include "btr0btr.h"

#ifdef UNIV_NONINL
#include "btr0btr.ic"
#endif

#include "fsp0fsp.h"
#include "page0page.h"
marko's avatar
marko committed
34
#include "page0zip.h"
35 36

#ifndef UNIV_HOTBACKUP
osku's avatar
osku committed
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
#include "btr0cur.h"
#include "btr0sea.h"
#include "btr0pcur.h"
#include "rem0cmp.h"
#include "lock0lock.h"
#include "ibuf0ibuf.h"
#include "trx0trx.h"

/*
Latching strategy of the InnoDB B-tree
--------------------------------------
A tree latch protects all non-leaf nodes of the tree. Each node of a tree
also has a latch of its own.

A B-tree operation normally first acquires an S-latch on the tree. It
searches down the tree and releases the tree latch when it has the
leaf node latch. To save CPU time we do not acquire any latch on
non-leaf nodes of the tree during a search, those pages are only bufferfixed.

If an operation needs to restructure the tree, it acquires an X-latch on
the tree before searching to a leaf node. If it needs, for example, to
split a leaf,
(1) InnoDB decides the split point in the leaf,
(2) allocates a new page,
(3) inserts the appropriate node pointer to the first non-leaf level,
(4) releases the tree X-latch,
(5) and then moves records from the leaf to the new allocated page.

Node pointers
-------------
Leaf pages of a B-tree contain the index records stored in the
tree. On levels n > 0 we store 'node pointers' to pages on level
n - 1. For each page there is exactly one node pointer stored:
thus our tree is an ordinary B-tree, not a B-link tree.

A node pointer contains a prefix P of an index record. The prefix
is long enough so that it determines an index record uniquely.
The file page number of the child page is added as the last
field. To the child page we can store node pointers or index records
which are >= P in the alphabetical order, but < P1 if there is
a next node pointer on the level, and P1 is its prefix.

If a node pointer with a prefix P points to a non-leaf child,
then the leftmost record in the child must have the same
prefix P. If it points to a leaf node, the child is not required
to contain any record with a prefix equal to P. The leaf case
is decided this way to allow arbitrary deletions in a leaf node
without touching upper levels of the tree.

We have predefined a special minimum record which we
define as the smallest record in any alphabetical order.
A minimum record is denoted by setting a bit in the record
header. A minimum record acts as the prefix of a node pointer
which points to a leftmost node on any level of the tree.

File page allocation
--------------------
In the root node of a B-tree there are two file segment headers.
The leaf pages of a tree are allocated from one file segment, to
make them consecutive on disk if possible. From the other file segment
we allocate pages for the non-leaf levels of the tree.
*/

#ifdef UNIV_BTR_DEBUG
/**************************************************************//**
Checks a file segment header within a B-tree root page.
All checks are expressed as ut_a() assertions; the function itself
never returns anything but TRUE.
@return	TRUE if valid */
static
ibool
btr_root_fseg_validate(
/*===================*/
	const fseg_header_t*	seg_header,	/*!< in: segment header */
	ulint			space)		/*!< in: tablespace identifier */
{
	ulint	offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);

	/* The header must refer to the tablespace given by the caller */
	ut_a(mach_read_from_4(seg_header + FSEG_HDR_SPACE) == space);
	/* The stored offset must lie within
	[FIL_PAGE_DATA, UNIV_PAGE_SIZE - FIL_PAGE_DATA_END] */
	ut_a(offset >= FIL_PAGE_DATA);
	ut_a(offset <= UNIV_PAGE_SIZE - FIL_PAGE_DATA_END);
	return(TRUE);
}
#endif /* UNIV_BTR_DEBUG */

120
/**************************************************************//**
121 122
Gets the root node of a tree and x-latches it.
@return	root page, x-latched */
osku's avatar
osku committed
123
static
124 125 126
buf_block_t*
btr_root_block_get(
/*===============*/
127 128
	dict_index_t*	index,	/*!< in: index tree */
	mtr_t*		mtr)	/*!< in: mtr */
129 130
{
	ulint		space;
131
	ulint		zip_size;
132 133 134 135
	ulint		root_page_no;
	buf_block_t*	block;

	space = dict_index_get_space(index);
136
	zip_size = dict_table_zip_size(index->table);
137 138
	root_page_no = dict_index_get_page(index);

139
	block = btr_block_get(space, zip_size, root_page_no, RW_X_LATCH, mtr);
140 141
	ut_a((ibool)!!page_is_comp(buf_block_get_frame(block))
	     == dict_table_is_comp(index->table));
142
#ifdef UNIV_BTR_DEBUG
143 144 145 146 147 148 149 150
	if (!dict_index_is_ibuf(index)) {
		const page_t*	root = buf_block_get_frame(block);

		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
					    + root, space));
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
					    + root, space));
	}
151
#endif /* UNIV_BTR_DEBUG */
152 153 154

	return(block);
}
osku's avatar
osku committed
155

156
/**************************************************************//**
157 158
Gets the root node of a tree and x-latches it.
@return	root page, x-latched */
159
UNIV_INTERN
osku's avatar
osku committed
160 161 162
page_t*
btr_root_get(
/*=========*/
163 164
	dict_index_t*	index,	/*!< in: index tree */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
165
{
166
	return(buf_block_get_frame(btr_root_block_get(index, mtr)));
osku's avatar
osku committed
167 168
}

169
/*************************************************************//**
osku's avatar
osku committed
170
Gets pointer to the previous user record in the tree. It is assumed that
171 172
the caller has appropriate latches on the page and its neighbor.
@return	previous user record, NULL if there is none */
173
UNIV_INTERN
osku's avatar
osku committed
174 175 176
rec_t*
btr_get_prev_user_rec(
/*==================*/
177 178
	rec_t*	rec,	/*!< in: record on leaf level */
	mtr_t*	mtr)	/*!< in: mtr holding a latch on the page, and if
osku's avatar
osku committed
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
			needed, also to the previous page */
{
	page_t*	page;
	page_t*	prev_page;
	ulint	prev_page_no;

	if (!page_rec_is_infimum(rec)) {

		rec_t*	prev_rec = page_rec_get_prev(rec);

		if (!page_rec_is_infimum(prev_rec)) {

			return(prev_rec);
		}
	}
194

195
	page = page_align(rec);
osku's avatar
osku committed
196
	prev_page_no = btr_page_get_prev(page, mtr);
197

osku's avatar
osku committed
198 199
	if (prev_page_no != FIL_NULL) {

200 201
		ulint		space;
		ulint		zip_size;
202 203
		buf_block_t*	prev_block;

204 205 206 207 208
		space = page_get_space_id(page);
		zip_size = fil_space_get_zip_size(space);

		prev_block = buf_page_get_with_no_latch(space, zip_size,
							prev_page_no, mtr);
209
		prev_page = buf_block_get_frame(prev_block);
osku's avatar
osku committed
210
		/* The caller must already have a latch to the brother */
211 212 213 214
		ut_ad(mtr_memo_contains(mtr, prev_block,
					MTR_MEMO_PAGE_S_FIX)
		      || mtr_memo_contains(mtr, prev_block,
					   MTR_MEMO_PAGE_X_FIX));
215
#ifdef UNIV_BTR_DEBUG
216
		ut_a(page_is_comp(prev_page) == page_is_comp(page));
217
		ut_a(btr_page_get_next(prev_page, mtr)
218
		     == page_get_page_no(page));
219
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
220 221 222 223 224 225 226

		return(page_rec_get_prev(page_get_supremum_rec(prev_page)));
	}

	return(NULL);
}

227
/*************************************************************//**
osku's avatar
osku committed
228
Gets pointer to the next user record in the tree. It is assumed that the
229 230
caller has appropriate latches on the page and its neighbor.
@return	next user record, NULL if there is none */
231
UNIV_INTERN
osku's avatar
osku committed
232 233 234
rec_t*
btr_get_next_user_rec(
/*==================*/
235 236
	rec_t*	rec,	/*!< in: record on leaf level */
	mtr_t*	mtr)	/*!< in: mtr holding a latch on the page, and if
osku's avatar
osku committed
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251
			needed, also to the next page */
{
	page_t*	page;
	page_t*	next_page;
	ulint	next_page_no;

	if (!page_rec_is_supremum(rec)) {

		rec_t*	next_rec = page_rec_get_next(rec);

		if (!page_rec_is_supremum(next_rec)) {

			return(next_rec);
		}
	}
252

253
	page = page_align(rec);
osku's avatar
osku committed
254
	next_page_no = btr_page_get_next(page, mtr);
255

osku's avatar
osku committed
256
	if (next_page_no != FIL_NULL) {
257 258
		ulint		space;
		ulint		zip_size;
259
		buf_block_t*	next_block;
osku's avatar
osku committed
260

261 262 263 264 265
		space = page_get_space_id(page);
		zip_size = fil_space_get_zip_size(space);

		next_block = buf_page_get_with_no_latch(space, zip_size,
							next_page_no, mtr);
266
		next_page = buf_block_get_frame(next_block);
osku's avatar
osku committed
267
		/* The caller must already have a latch to the brother */
268 269 270
		ut_ad(mtr_memo_contains(mtr, next_block, MTR_MEMO_PAGE_S_FIX)
		      || mtr_memo_contains(mtr, next_block,
					   MTR_MEMO_PAGE_X_FIX));
271
#ifdef UNIV_BTR_DEBUG
272
		ut_a(page_is_comp(next_page) == page_is_comp(page));
273
		ut_a(btr_page_get_prev(next_page, mtr)
274
		     == page_get_page_no(page));
275
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
276 277 278 279 280 281 282

		return(page_rec_get_next(page_get_infimum_rec(next_page)));
	}

	return(NULL);
}

283
/**************************************************************//**
284
Creates a new index page (not the root, and also not
285
used in page reorganization).  @see btr_page_empty(). */
osku's avatar
osku committed
286 287 288 289
static
void
btr_page_create(
/*============*/
290 291 292 293 294
	buf_block_t*	block,	/*!< in/out: page to be created */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
	dict_index_t*	index,	/*!< in: index */
	ulint		level,	/*!< in: the B-tree level of the page */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
295
{
296
	page_t*		page = buf_block_get_frame(block);
297 298

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
299 300

	if (UNIV_LIKELY_NULL(page_zip)) {
301
		page_create_zip(block, index, level, mtr);
302
	} else {
303
		page_create(block, mtr, dict_table_is_comp(index->table));
304 305 306 307
		/* Set the level of the new index page */
		btr_page_set_level(page, NULL, level, mtr);
	}

308
	block->check_index_page_at_flush = TRUE;
309

310
	btr_page_set_index_id(page, page_zip, index->id, mtr);
osku's avatar
osku committed
311 312
}

313
/**************************************************************//**
osku's avatar
osku committed
314
Allocates a new file page to be used in an ibuf tree. Takes the page from
315 316
the free list of the tree, which must contain pages!
@return	new allocated block, x-latched */
osku's avatar
osku committed
317
static
318
buf_block_t*
osku's avatar
osku committed
319 320
btr_page_alloc_for_ibuf(
/*====================*/
321 322
	dict_index_t*	index,	/*!< in: index tree */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
323 324 325 326
{
	fil_addr_t	node_addr;
	page_t*		root;
	page_t*		new_page;
327
	buf_block_t*	new_block;
osku's avatar
osku committed
328

329
	root = btr_root_get(index, mtr);
330

osku's avatar
osku committed
331
	node_addr = flst_get_first(root + PAGE_HEADER
332
				   + PAGE_BTR_IBUF_FREE_LIST, mtr);
osku's avatar
osku committed
333 334
	ut_a(node_addr.page != FIL_NULL);

335 336 337
	new_block = buf_page_get(dict_index_get_space(index),
				 dict_table_zip_size(index->table),
				 node_addr.page, RW_X_LATCH, mtr);
338 339
	new_page = buf_block_get_frame(new_block);
	buf_block_dbg_add_level(new_block, SYNC_TREE_NODE_NEW);
osku's avatar
osku committed
340 341

	flst_remove(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
342 343 344 345
		    new_page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE,
		    mtr);
	ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
			    mtr));
osku's avatar
osku committed
346

347
	return(new_block);
osku's avatar
osku committed
348 349
}

350
/**************************************************************//**
osku's avatar
osku committed
351
Allocates a new file page to be used in an index tree. NOTE: we assume
352 353
that the caller has made the reservation for free extents!
@return	new allocated block, x-latched; NULL if out of space */
354
UNIV_INTERN
355
buf_block_t*
osku's avatar
osku committed
356 357
btr_page_alloc(
/*===========*/
358 359 360
	dict_index_t*	index,		/*!< in: index */
	ulint		hint_page_no,	/*!< in: hint of a good page */
	byte		file_direction,	/*!< in: direction where a possible
osku's avatar
osku committed
361
					page split is made */
362
	ulint		level,		/*!< in: level where the page is placed
osku's avatar
osku committed
363
					in the tree */
364
	mtr_t*		mtr)		/*!< in: mtr */
osku's avatar
osku committed
365 366 367
{
	fseg_header_t*	seg_header;
	page_t*		root;
368
	buf_block_t*	new_block;
osku's avatar
osku committed
369 370
	ulint		new_page_no;

371
	if (dict_index_is_ibuf(index)) {
osku's avatar
osku committed
372

373
		return(btr_page_alloc_for_ibuf(index, mtr));
osku's avatar
osku committed
374 375
	}

376
	root = btr_root_get(index, mtr);
377

osku's avatar
osku committed
378 379 380 381 382 383 384 385 386
	if (level == 0) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
	} else {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
	}

	/* Parameter TRUE below states that the caller has made the
	reservation for free extents, and thus we know that a page can
	be allocated: */
387

osku's avatar
osku committed
388
	new_page_no = fseg_alloc_free_page_general(seg_header, hint_page_no,
389
						   file_direction, TRUE, mtr);
osku's avatar
osku committed
390 391 392 393 394
	if (new_page_no == FIL_NULL) {

		return(NULL);
	}

395 396 397
	new_block = buf_page_get(dict_index_get_space(index),
				 dict_table_zip_size(index->table),
				 new_page_no, RW_X_LATCH, mtr);
398
	buf_block_dbg_add_level(new_block, SYNC_TREE_NODE_NEW);
399

400
	return(new_block);
401
}
osku's avatar
osku committed
402

403
/**************************************************************//**
404 405
Gets the number of pages in a B-tree.
@return	number of pages */
406
UNIV_INTERN
osku's avatar
osku committed
407 408 409
ulint
btr_get_size(
/*=========*/
410 411
	dict_index_t*	index,	/*!< in: index */
	ulint		flag)	/*!< in: BTR_N_LEAF_PAGES or BTR_TOTAL_SIZE */
osku's avatar
osku committed
412 413 414 415 416 417 418 419 420
{
	fseg_header_t*	seg_header;
	page_t*		root;
	ulint		n;
	ulint		dummy;
	mtr_t		mtr;

	mtr_start(&mtr);

421
	mtr_s_lock(dict_index_get_lock(index), &mtr);
osku's avatar
osku committed
422

423
	root = btr_root_get(index, &mtr);
424

osku's avatar
osku committed
425 426
	if (flag == BTR_N_LEAF_PAGES) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
427

osku's avatar
osku committed
428
		fseg_n_reserved_pages(seg_header, &n, &mtr);
429

osku's avatar
osku committed
430 431 432 433
	} else if (flag == BTR_TOTAL_SIZE) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;

		n = fseg_n_reserved_pages(seg_header, &dummy, &mtr);
434

osku's avatar
osku committed
435
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
436 437

		n += fseg_n_reserved_pages(seg_header, &dummy, &mtr);
osku's avatar
osku committed
438 439 440 441 442 443 444
	} else {
		ut_error;
	}

	mtr_commit(&mtr);

	return(n);
445
}
osku's avatar
osku committed
446

447
/**************************************************************//**
osku's avatar
osku committed
448 449 450 451 452 453
Frees a page used in an ibuf tree. Puts the page to the free list of the
ibuf tree. */
static
void
btr_page_free_for_ibuf(
/*===================*/
454 455 456
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
457 458 459
{
	page_t*		root;

460
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
461
	root = btr_root_get(index, mtr);
462

osku's avatar
osku committed
463
	flst_add_first(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
464 465
		       buf_block_get_frame(block)
		       + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, mtr);
osku's avatar
osku committed
466 467

	ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
468
			    mtr));
osku's avatar
osku committed
469 470
}

471
/**************************************************************//**
osku's avatar
osku committed
472 473 474
Frees a file page used in an index tree. Can be used also to (BLOB)
external storage pages, because the page level 0 can be given as an
argument. */
475
UNIV_INTERN
osku's avatar
osku committed
476 477 478
void
btr_page_free_low(
/*==============*/
479 480 481 482
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
	ulint		level,	/*!< in: page level */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
483 484 485 486
{
	fseg_header_t*	seg_header;
	page_t*		root;

487
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
osku's avatar
osku committed
488 489 490
	/* The page gets invalid for optimistic searches: increment the frame
	modify clock */

491
	buf_block_modify_clock_inc(block);
492

493
	if (dict_index_is_ibuf(index)) {
osku's avatar
osku committed
494

495
		btr_page_free_for_ibuf(index, block, mtr);
osku's avatar
osku committed
496 497 498 499

		return;
	}

500
	root = btr_root_get(index, mtr);
501

osku's avatar
osku committed
502 503 504 505 506 507
	if (level == 0) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
	} else {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
	}

508 509 510
	fseg_free_page(seg_header,
		       buf_block_get_space(block),
		       buf_block_get_page_no(block), mtr);
511
}
osku's avatar
osku committed
512

513
/**************************************************************//**
osku's avatar
osku committed
514 515
Frees a file page used in an index tree. NOTE: cannot free field external
storage pages because the page must contain info on its level. */
516
UNIV_INTERN
osku's avatar
osku committed
517 518 519
void
btr_page_free(
/*==========*/
520 521 522
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
523 524 525
{
	ulint		level;

526
	level = btr_page_get_level(buf_block_get_frame(block), mtr);
527

528
	btr_page_free_low(index, block, level, mtr);
529
}
osku's avatar
osku committed
530

531
/**************************************************************//**
osku's avatar
osku committed
532 533 534 535 536
Sets the child node file address in a node pointer. */
UNIV_INLINE
void
btr_node_ptr_set_child_page_no(
/*===========================*/
537 538
	rec_t*		rec,	/*!< in: node pointer record */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
539
				part will be updated, or NULL */
540 541 542
	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
	ulint		page_no,/*!< in: child node address */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
543 544 545 546 547
{
	byte*	field;
	ulint	len;

	ut_ad(rec_offs_validate(rec, NULL, offsets));
548
	ut_ad(!page_is_leaf(page_align(rec)));
osku's avatar
osku committed
549 550
	ut_ad(!rec_offs_comp(offsets) || rec_get_node_ptr_flag(rec));

551
	/* The child address is in the last field */
osku's avatar
osku committed
552
	field = rec_get_nth_field(rec, offsets,
553
				  rec_offs_n_fields(offsets) - 1, &len);
osku's avatar
osku committed
554

555
	ut_ad(len == REC_NODE_PTR_SIZE);
556

marko's avatar
marko committed
557
	if (UNIV_LIKELY_NULL(page_zip)) {
558
		page_zip_write_node_ptr(page_zip, rec,
559 560
					rec_offs_data_size(offsets),
					page_no, mtr);
561 562
	} else {
		mlog_write_ulint(field, page_no, MLOG_4BYTES, mtr);
marko's avatar
marko committed
563
	}
osku's avatar
osku committed
564 565
}

566
/************************************************************//**
567 568
Returns the child page of a node pointer and x-latches it.
@return	child page, x-latched */
osku's avatar
osku committed
569
static
570
buf_block_t*
osku's avatar
osku committed
571 572
btr_node_ptr_get_child(
/*===================*/
573 574 575 576
	const rec_t*	node_ptr,/*!< in: node pointer */
	dict_index_t*	index,	/*!< in: index */
	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
577 578 579 580
{
	ulint	page_no;
	ulint	space;

581
	ut_ad(rec_offs_validate(node_ptr, index, offsets));
582
	space = page_get_space_id(page_align(node_ptr));
osku's avatar
osku committed
583 584
	page_no = btr_node_ptr_get_child_page_no(node_ptr, offsets);

585 586
	return(btr_block_get(space, dict_table_zip_size(index->table),
			     page_no, RW_X_LATCH, mtr));
osku's avatar
osku committed
587 588
}

589
/************************************************************//**
osku's avatar
osku committed
590
Returns the upper level node pointer to a page. It is assumed that mtr holds
591 592
an x-latch on the tree.
@return	rec_get_offsets() of the node pointer record */
osku's avatar
osku committed
593
static
594
ulint*
595 596
btr_page_get_father_node_ptr_func(
/*==============================*/
597 598 599
	ulint*		offsets,/*!< in: work area for the return value */
	mem_heap_t*	heap,	/*!< in: memory heap to use */
	btr_cur_t*	cursor,	/*!< in: cursor pointing to user record,
600
				out: cursor on node pointer record,
osku's avatar
osku committed
601
				its page x-latched */
602 603
	const char*	file,	/*!< in: file name */
	ulint		line,	/*!< in: line where called */
604
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
605 606
{
	dtuple_t*	tuple;
607
	rec_t*		user_rec;
osku's avatar
osku committed
608
	rec_t*		node_ptr;
609 610 611 612 613 614
	ulint		level;
	ulint		page_no;
	dict_index_t*	index;

	page_no = buf_block_get_page_no(btr_cur_get_block(cursor));
	index = btr_cur_get_index(cursor);
osku's avatar
osku committed
615

616
	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
617
				MTR_MEMO_X_LOCK));
osku's avatar
osku committed
618

619
	ut_ad(dict_index_get_page(index) != page_no);
osku's avatar
osku committed
620

621
	level = btr_page_get_level(btr_cur_get_page(cursor), mtr);
622

623
	user_rec = btr_cur_get_rec(cursor);
624 625
	ut_a(page_rec_is_user_rec(user_rec));
	tuple = dict_index_build_node_ptr(index, user_rec, 0, heap, level);
osku's avatar
osku committed
626

627
	btr_cur_search_to_nth_level(index, level + 1, tuple, PAGE_CUR_LE,
628 629
				    BTR_CONT_MODIFY_TREE, cursor, 0,
				    file, line, mtr);
osku's avatar
osku committed
630

631 632 633
	node_ptr = btr_cur_get_rec(cursor);
	ut_ad(!page_rec_is_comp(node_ptr)
	      || rec_get_status(node_ptr) == REC_STATUS_NODE_PTR);
osku's avatar
osku committed
634
	offsets = rec_get_offsets(node_ptr, index, offsets,
635
				  ULINT_UNDEFINED, &heap);
osku's avatar
osku committed
636

637
	if (UNIV_UNLIKELY(btr_node_ptr_get_child_page_no(node_ptr, offsets)
638
			  != page_no)) {
osku's avatar
osku committed
639 640
		rec_t*	print_rec;
		fputs("InnoDB: Dump of the child page:\n", stderr);
641
		buf_page_print(page_align(user_rec), 0);
osku's avatar
osku committed
642
		fputs("InnoDB: Dump of the parent page:\n", stderr);
643
		buf_page_print(page_align(node_ptr), 0);
osku's avatar
osku committed
644 645

		fputs("InnoDB: Corruption of an index tree: table ", stderr);
646
		ut_print_name(stderr, NULL, TRUE, index->table_name);
osku's avatar
osku committed
647
		fputs(", index ", stderr);
648
		ut_print_name(stderr, NULL, FALSE, index->name);
osku's avatar
osku committed
649
		fprintf(stderr, ",\n"
650
			"InnoDB: father ptr page no %lu, child page no %lu\n",
osku's avatar
osku committed
651 652
			(ulong)
			btr_node_ptr_get_child_page_no(node_ptr, offsets),
653 654 655
			(ulong) page_no);
		print_rec = page_rec_get_next(
			page_get_infimum_rec(page_align(user_rec)));
osku's avatar
osku committed
656
		offsets = rec_get_offsets(print_rec, index,
657
					  offsets, ULINT_UNDEFINED, &heap);
osku's avatar
osku committed
658 659
		page_rec_print(print_rec, offsets);
		offsets = rec_get_offsets(node_ptr, index, offsets,
660
					  ULINT_UNDEFINED, &heap);
osku's avatar
osku committed
661 662
		page_rec_print(node_ptr, offsets);

663 664 665 666
		fputs("InnoDB: You should dump + drop + reimport the table"
		      " to fix the\n"
		      "InnoDB: corruption. If the crash happens at "
		      "the database startup, see\n"
667
		      "InnoDB: " REFMAN "forcing-recovery.html about\n"
668 669
		      "InnoDB: forcing recovery. "
		      "Then dump + drop + reimport.\n", stderr);
osku's avatar
osku committed
670

marko's avatar
marko committed
671 672
		ut_error;
	}
673

674 675
	return(offsets);
}
osku's avatar
osku committed
676

677 678 679
#define btr_page_get_father_node_ptr(of,heap,cur,mtr)			\
	btr_page_get_father_node_ptr_func(of,heap,cur,__FILE__,__LINE__,mtr)

680
/************************************************************//**
681
Returns the upper level node pointer to a page. It is assumed that mtr holds
682 683
an x-latch on the tree.
@return	rec_get_offsets() of the node pointer record */
684 685 686 687
static
ulint*
btr_page_get_father_block(
/*======================*/
688 689 690 691 692 693
	ulint*		offsets,/*!< in: work area for the return value */
	mem_heap_t*	heap,	/*!< in: memory heap to use */
	dict_index_t*	index,	/*!< in: b-tree index */
	buf_block_t*	block,	/*!< in: child page in the index */
	mtr_t*		mtr,	/*!< in: mtr */
	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
694 695 696 697 698 699 700
				its page x-latched */
{
	rec_t*	rec
		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
								 block)));
	btr_cur_position(index, rec, block, cursor);
	return(btr_page_get_father_node_ptr(offsets, heap, cursor, mtr));
osku's avatar
osku committed
701 702
}

703
/************************************************************//**
704 705
Seeks to the upper level node pointer to a page.
It is assumed that mtr holds an x-latch on the tree. */
osku's avatar
osku committed
706
static
707 708 709
void
btr_page_get_father(
/*================*/
710 711 712 713
	dict_index_t*	index,	/*!< in: b-tree index */
	buf_block_t*	block,	/*!< in: child page in the index */
	mtr_t*		mtr,	/*!< in: mtr */
	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
714
				its page x-latched */
osku's avatar
osku committed
715
{
716 717 718 719 720 721 722 723 724
	mem_heap_t*	heap;
	rec_t*		rec
		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
								 block)));
	btr_cur_position(index, rec, block, cursor);

	heap = mem_heap_create(100);
	btr_page_get_father_node_ptr(NULL, heap, cursor, mtr);
	mem_heap_free(heap);
osku's avatar
osku committed
725 726
}

727
/************************************************************//**
728 729
Creates the root node for a new index tree.
@return	page number of the created root, FIL_NULL if did not succeed */
730
UNIV_INTERN
osku's avatar
osku committed
731 732 733
ulint
btr_create(
/*=======*/
734 735 736
	ulint		type,	/*!< in: type of the index */
	ulint		space,	/*!< in: space where created */
	ulint		zip_size,/*!< in: compressed page size in bytes
737
				or 0 for uncompressed pages */
738
	index_id_t	index_id,/*!< in: index id */
739 740
	dict_index_t*	index,	/*!< in: index */
	mtr_t*		mtr)	/*!< in: mini-transaction handle */
osku's avatar
osku committed
741 742
{
	ulint		page_no;
743
	buf_block_t*	block;
osku's avatar
osku committed
744 745
	buf_frame_t*	frame;
	page_t*		page;
marko's avatar
marko committed
746
	page_zip_des_t*	page_zip;
osku's avatar
osku committed
747 748 749 750 751 752 753 754

	/* Create the two new segments (one, in the case of an ibuf tree) for
	the index tree; the segment headers are put on the allocated root page
	(for an ibuf tree, not in the root, but on a separate ibuf header
	page) */

	if (type & DICT_IBUF) {
		/* Allocate first the ibuf header page */
755
		buf_block_t*	ibuf_hdr_block = fseg_create(
756 757
			space, 0,
			IBUF_HEADER + IBUF_TREE_SEG_HEADER, mtr);
osku's avatar
osku committed
758

759
		buf_block_dbg_add_level(ibuf_hdr_block, SYNC_TREE_NODE_NEW);
760

761
		ut_ad(buf_block_get_page_no(ibuf_hdr_block)
762
		      == IBUF_HEADER_PAGE_NO);
osku's avatar
osku committed
763
		/* Allocate then the next page to the segment: it will be the
764
		tree root page */
osku's avatar
osku committed
765

766 767 768
		page_no = fseg_alloc_free_page(buf_block_get_frame(
						       ibuf_hdr_block)
					       + IBUF_HEADER
769 770 771
					       + IBUF_TREE_SEG_HEADER,
					       IBUF_TREE_ROOT_PAGE_NO,
					       FSP_UP, mtr);
osku's avatar
osku committed
772 773
		ut_ad(page_no == IBUF_TREE_ROOT_PAGE_NO);

774 775
		block = buf_page_get(space, zip_size, page_no,
				     RW_X_LATCH, mtr);
osku's avatar
osku committed
776
	} else {
777 778
		block = fseg_create(space, 0,
				    PAGE_HEADER + PAGE_BTR_SEG_TOP, mtr);
osku's avatar
osku committed
779
	}
780

781
	if (block == NULL) {
osku's avatar
osku committed
782 783 784 785

		return(FIL_NULL);
	}

786 787
	page_no = buf_block_get_page_no(block);
	frame = buf_block_get_frame(block);
788

789
	buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);
osku's avatar
osku committed
790 791 792 793 794

	if (type & DICT_IBUF) {
		/* It is an insert buffer tree: initialize the free list */

		ut_ad(page_no == IBUF_TREE_ROOT_PAGE_NO);
795

osku's avatar
osku committed
796
		flst_init(frame + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr);
797
	} else {
osku's avatar
osku committed
798 799
		/* It is a non-ibuf tree: create a file segment for leaf
		pages */
800 801 802 803 804 805 806 807 808
		if (!fseg_create(space, page_no,
				 PAGE_HEADER + PAGE_BTR_SEG_LEAF, mtr)) {
			/* Not enough space for new segment, free root
			segment before return. */
			btr_free_root(space, zip_size, page_no, mtr);

			return(FIL_NULL);
		}

osku's avatar
osku committed
809 810
		/* The fseg create acquires a second latch on the page,
		therefore we must declare it: */
811
		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);
osku's avatar
osku committed
812
	}
813

814
	/* Create a new index page on the allocated segment page */
815
	page_zip = buf_block_get_page_zip(block);
816 817

	if (UNIV_LIKELY_NULL(page_zip)) {
818
		page = page_create_zip(block, index, 0, mtr);
819
	} else {
820
		page = page_create(block, mtr,
821
				   dict_table_is_comp(index->table));
822 823 824 825
		/* Set the level of the new index page */
		btr_page_set_level(page, NULL, 0, mtr);
	}

826
	block->check_index_page_at_flush = TRUE;
osku's avatar
osku committed
827 828

	/* Set the index id of the page */
829
	btr_page_set_index_id(page, page_zip, index_id, mtr);
830

osku's avatar
osku committed
831
	/* Set the next node and previous node fields */
832 833
	btr_page_set_next(page, page_zip, FIL_NULL, mtr);
	btr_page_set_prev(page, page_zip, FIL_NULL, mtr);
osku's avatar
osku committed
834 835 836 837

	/* We reset the free bits for the page to allow creation of several
	trees in the same mtr, otherwise the latch on a bitmap page would
	prevent it because of the latching order */
838

839 840 841
	if (!(type & DICT_CLUSTERED)) {
		ibuf_reset_free_bits(block);
	}
osku's avatar
osku committed
842 843 844 845 846 847 848 849 850 851

	/* In the following assertion we test that two records of maximum
	allowed size fit on the root page: this fact is needed to ensure
	correctness of split algorithms */

	ut_ad(page_get_max_insert_size(page, 2) > 2 * BTR_PAGE_MAX_REC_SIZE);

	return(page_no);
}

852
/************************************************************//**
osku's avatar
osku committed
853 854
Frees a B-tree except the root page, which MUST be freed after this
by calling btr_free_root. */
855
UNIV_INTERN
osku's avatar
osku committed
856 857 858
void
btr_free_but_not_root(
/*==================*/
859 860
	ulint	space,		/*!< in: space where created */
	ulint	zip_size,	/*!< in: compressed page size in bytes
861
				or 0 for uncompressed pages */
862
	ulint	root_page_no)	/*!< in: root page number */
osku's avatar
osku committed
863 864 865 866 867
{
	ibool	finished;
	page_t*	root;
	mtr_t	mtr;

868
leaf_loop:
osku's avatar
osku committed
869
	mtr_start(&mtr);
870

871
	root = btr_page_get(space, zip_size, root_page_no, RW_X_LATCH, &mtr);
872 873 874 875 876 877
#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
				    + root, space));
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
				    + root, space));
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
878 879 880 881

	/* NOTE: page hash indexes are dropped when a page is freed inside
	fsp0fsp. */

882 883
	finished = fseg_free_step(root + PAGE_HEADER + PAGE_BTR_SEG_LEAF,
				  &mtr);
osku's avatar
osku committed
884 885 886 887 888 889 890 891
	mtr_commit(&mtr);

	if (!finished) {

		goto leaf_loop;
	}
top_loop:
	mtr_start(&mtr);
892

893
	root = btr_page_get(space, zip_size, root_page_no, RW_X_LATCH, &mtr);
894 895 896 897
#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
				    + root, space));
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
898

899 900
	finished = fseg_free_step_not_header(
		root + PAGE_HEADER + PAGE_BTR_SEG_TOP, &mtr);
osku's avatar
osku committed
901 902 903 904 905
	mtr_commit(&mtr);

	if (!finished) {

		goto top_loop;
906
	}
osku's avatar
osku committed
907 908
}

909
/************************************************************//**
osku's avatar
osku committed
910
Frees the B-tree root page. Other tree MUST already have been freed. */
911
UNIV_INTERN
osku's avatar
osku committed
912 913 914
void
btr_free_root(
/*==========*/
915 916
	ulint	space,		/*!< in: space where created */
	ulint	zip_size,	/*!< in: compressed page size in bytes
917
				or 0 for uncompressed pages */
918 919
	ulint	root_page_no,	/*!< in: root page number */
	mtr_t*	mtr)		/*!< in: a mini-transaction which has already
osku's avatar
osku committed
920 921
				been started */
{
922 923
	buf_block_t*	block;
	fseg_header_t*	header;
osku's avatar
osku committed
924

925
	block = btr_block_get(space, zip_size, root_page_no, RW_X_LATCH, mtr);
osku's avatar
osku committed
926

927
	btr_search_drop_page_hash_index(block);
osku's avatar
osku committed
928

929
	header = buf_block_get_frame(block) + PAGE_HEADER + PAGE_BTR_SEG_TOP;
930 931 932
#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(header, space));
#endif /* UNIV_BTR_DEBUG */
933 934

	while (!fseg_free_step(header, mtr));
osku's avatar
osku committed
935
}
936
#endif /* !UNIV_HOTBACKUP */
osku's avatar
osku committed
937

938
/*************************************************************//**
osku's avatar
osku committed
939 940
Reorganizes an index page. */
static
941
ibool
osku's avatar
osku committed
942 943
btr_page_reorganize_low(
/*====================*/
944
	ibool		recovery,/*!< in: TRUE if called in recovery:
osku's avatar
osku committed
945 946 947 948
				locks should not be updated, i.e.,
				there cannot exist locks on the
				page, and a hash index should not be
				dropped: it cannot exist */
949 950 951
	buf_block_t*	block,	/*!< in: page to be reorganized */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
952
{
irana's avatar
irana committed
953
	buf_pool_t*	buf_pool	= buf_pool_from_bpage(&block->page);
954 955
	page_t*		page		= buf_block_get_frame(block);
	page_zip_des_t*	page_zip	= buf_block_get_page_zip(block);
956
	buf_block_t*	temp_block;
957 958 959 960 961 962
	page_t*		temp_page;
	ulint		log_mode;
	ulint		data_size1;
	ulint		data_size2;
	ulint		max_ins_size1;
	ulint		max_ins_size2;
963
	ibool		success		= FALSE;
964 965

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
966
	ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
967
#ifdef UNIV_ZIP_DEBUG
968
	ut_a(!page_zip || page_zip_validate(page_zip, page));
969
#endif /* UNIV_ZIP_DEBUG */
osku's avatar
osku committed
970 971 972
	data_size1 = page_get_data_size(page);
	max_ins_size1 = page_get_max_insert_size_after_reorganize(page, 1);

973
#ifndef UNIV_HOTBACKUP
osku's avatar
osku committed
974 975
	/* Write the log record */
	mlog_open_and_write_index(mtr, page, index, page_is_comp(page)
976 977
				  ? MLOG_COMP_PAGE_REORGANIZE
				  : MLOG_PAGE_REORGANIZE, 0);
978
#endif /* !UNIV_HOTBACKUP */
osku's avatar
osku committed
979 980 981 982

	/* Turn logging off */
	log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);

983
#ifndef UNIV_HOTBACKUP
irana's avatar
irana committed
984
	temp_block = buf_block_alloc(buf_pool, 0);
985 986 987 988
#else /* !UNIV_HOTBACKUP */
	ut_ad(block == back_block1);
	temp_block = back_block2;
#endif /* !UNIV_HOTBACKUP */
989
	temp_page = temp_block->frame;
osku's avatar
osku committed
990 991

	/* Copy the old page to temporary space */
992
	buf_frame_copy(temp_page, page);
osku's avatar
osku committed
993

994
#ifndef UNIV_HOTBACKUP
marko's avatar
marko committed
995
	if (UNIV_LIKELY(!recovery)) {
996
		btr_search_drop_page_hash_index(block);
osku's avatar
osku committed
997 998
	}

999 1000 1001
	block->check_index_page_at_flush = TRUE;
#endif /* !UNIV_HOTBACKUP */

osku's avatar
osku committed
1002 1003 1004
	/* Recreate the page: note that global data on page (possible
	segment headers, next page-field, etc.) is preserved intact */

1005
	page_create(block, mtr, dict_table_is_comp(index->table));
1006

osku's avatar
osku committed
1007 1008 1009
	/* Copy the records from the temporary space to the recreated page;
	do not copy the lock bits yet */

1010
	page_copy_rec_list_end_no_locks(block, temp_block,
1011 1012
					page_get_infimum_rec(temp_page),
					index, mtr);
1013 1014 1015 1016 1017 1018 1019 1020

	if (dict_index_is_sec_or_ibuf(index) && page_is_leaf(page)) {
		/* Copy max trx id to recreated page */
		trx_id_t	max_trx_id = page_get_max_trx_id(temp_page);
		page_set_max_trx_id(block, NULL, max_trx_id, mtr);
		/* In crash recovery, dict_index_is_sec_or_ibuf() always
		returns TRUE, even for clustered indexes.  max_trx_id is
		unused in clustered index pages. */
1021
		ut_ad(max_trx_id != 0 || recovery);
1022
	}
1023

1024
	if (UNIV_LIKELY_NULL(page_zip)
1025 1026
	    && UNIV_UNLIKELY
	    (!page_zip_compress(page_zip, page, index, NULL))) {
marko's avatar
marko committed
1027

1028
		/* Restore the old page and exit. */
1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048

#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
		/* Check that the bytes that we skip are identical. */
		ut_a(!memcmp(page, temp_page, PAGE_HEADER));
		ut_a(!memcmp(PAGE_HEADER + PAGE_N_RECS + page,
			     PAGE_HEADER + PAGE_N_RECS + temp_page,
			     PAGE_DATA - (PAGE_HEADER + PAGE_N_RECS)));
		ut_a(!memcmp(UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + page,
			     UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + temp_page,
			     FIL_PAGE_DATA_END));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

		memcpy(PAGE_HEADER + page, PAGE_HEADER + temp_page,
		       PAGE_N_RECS - PAGE_N_DIR_SLOTS);
		memcpy(PAGE_DATA + page, PAGE_DATA + temp_page,
		       UNIV_PAGE_SIZE - PAGE_DATA - FIL_PAGE_DATA_END);

#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
		ut_a(!memcmp(page, temp_page, UNIV_PAGE_SIZE));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
1049

1050
		goto func_exit;
marko's avatar
marko committed
1051 1052
	}

1053
#ifndef UNIV_HOTBACKUP
marko's avatar
marko committed
1054
	if (UNIV_LIKELY(!recovery)) {
osku's avatar
osku committed
1055
		/* Update the record lock bitmaps */
1056
		lock_move_reorganize_page(block, temp_block);
osku's avatar
osku committed
1057
	}
1058
#endif /* !UNIV_HOTBACKUP */
osku's avatar
osku committed
1059 1060 1061 1062

	data_size2 = page_get_data_size(page);
	max_ins_size2 = page_get_max_insert_size_after_reorganize(page, 1);

marko's avatar
marko committed
1063
	if (UNIV_UNLIKELY(data_size1 != data_size2)
1064
	    || UNIV_UNLIKELY(max_ins_size1 != max_ins_size2)) {
1065 1066
		buf_page_print(page, 0);
		buf_page_print(temp_page, 0);
1067
		fprintf(stderr,
1068 1069 1070 1071 1072 1073
			"InnoDB: Error: page old data size %lu"
			" new data size %lu\n"
			"InnoDB: Error: page old max ins size %lu"
			" new max ins size %lu\n"
			"InnoDB: Submit a detailed bug report"
			" to http://bugs.mysql.com\n",
osku's avatar
osku committed
1074 1075 1076
			(unsigned long) data_size1, (unsigned long) data_size2,
			(unsigned long) max_ins_size1,
			(unsigned long) max_ins_size2);
1077 1078
	} else {
		success = TRUE;
osku's avatar
osku committed
1079 1080
	}

1081
func_exit:
1082
#ifdef UNIV_ZIP_DEBUG
1083
	ut_a(!page_zip || page_zip_validate(page_zip, page));
1084
#endif /* UNIV_ZIP_DEBUG */
1085
#ifndef UNIV_HOTBACKUP
1086
	buf_block_free(temp_block);
1087
#endif /* !UNIV_HOTBACKUP */
osku's avatar
osku committed
1088 1089 1090

	/* Restore logging mode */
	mtr_set_log_mode(mtr, log_mode);
1091 1092

	return(success);
osku's avatar
osku committed
1093 1094
}

1095
#ifndef UNIV_HOTBACKUP
1096
/*************************************************************//**
1097 1098 1099 1100
Reorganizes an index page.
IMPORTANT: if btr_page_reorganize() is invoked on a compressed leaf
page of a non-clustered index, the caller must update the insert
buffer free bits in the same mini-transaction in such a way that the
1101 1102
modification will be redo-logged.
@return	TRUE on success, FALSE on failure */
1103
UNIV_INTERN
1104
ibool
osku's avatar
osku committed
1105 1106
btr_page_reorganize(
/*================*/
1107 1108 1109
	buf_block_t*	block,	/*!< in: page to be reorganized */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
1110
{
1111
	return(btr_page_reorganize_low(FALSE, block, index, mtr));
osku's avatar
osku committed
1112
}
1113
#endif /* !UNIV_HOTBACKUP */
osku's avatar
osku committed
1114

1115
/***********************************************************//**
1116 1117
Parses a redo log record of reorganizing a page.
@return	end of log record or NULL */
1118
UNIV_INTERN
osku's avatar
osku committed
1119 1120 1121
byte*
btr_parse_page_reorganize(
/*======================*/
1122
	byte*		ptr,	/*!< in: buffer */
osku's avatar
osku committed
1123
	byte*		end_ptr __attribute__((unused)),
1124 1125 1126 1127
				/*!< in: buffer end */
	dict_index_t*	index,	/*!< in: record descriptor */
	buf_block_t*	block,	/*!< in: page to be reorganized, or NULL */
	mtr_t*		mtr)	/*!< in: mtr or NULL */
osku's avatar
osku committed
1128 1129 1130 1131 1132
{
	ut_ad(ptr && end_ptr);

	/* The record is empty, except for the record initial part */

1133 1134
	if (UNIV_LIKELY(block != NULL)) {
		btr_page_reorganize_low(TRUE, block, index, mtr);
osku's avatar
osku committed
1135 1136 1137 1138 1139
	}

	return(ptr);
}

1140
#ifndef UNIV_HOTBACKUP
1141
/*************************************************************//**
1142
Empties an index page.  @see btr_page_create(). */
osku's avatar
osku committed
1143 1144 1145 1146
static
void
btr_page_empty(
/*===========*/
1147 1148 1149 1150 1151
	buf_block_t*	block,	/*!< in: page to be emptied */
	page_zip_des_t*	page_zip,/*!< out: compressed page, or NULL */
	dict_index_t*	index,	/*!< in: index of the page */
	ulint		level,	/*!< in: the B-tree level of the page */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
1152
{
1153
	page_t*	page = buf_block_get_frame(block);
1154 1155

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1156
	ut_ad(page_zip == buf_block_get_page_zip(block));
1157
#ifdef UNIV_ZIP_DEBUG
1158
	ut_a(!page_zip || page_zip_validate(page_zip, page));
1159
#endif /* UNIV_ZIP_DEBUG */
marko's avatar
marko committed
1160

1161
	btr_search_drop_page_hash_index(block);
osku's avatar
osku committed
1162 1163 1164 1165

	/* Recreate the page: note that global data on page (possible
	segment headers, next page-field, etc.) is preserved intact */

1166
	if (UNIV_LIKELY_NULL(page_zip)) {
1167
		page_create_zip(block, index, level, mtr);
1168
	} else {
1169
		page_create(block, mtr, dict_table_is_comp(index->table));
1170
		btr_page_set_level(page, NULL, level, mtr);
1171 1172
	}

1173
	block->check_index_page_at_flush = TRUE;
osku's avatar
osku committed
1174 1175
}

1176
/*************************************************************//**
osku's avatar
osku committed
1177 1178 1179 1180
Makes tree one level higher by splitting the root, and inserts
the tuple. It is assumed that mtr contains an x-latch on the tree.
NOTE that the operation of this function must always succeed,
we cannot reverse it: therefore enough free disk space must be
1181 1182
guaranteed to be available before this function is called.
@return	inserted record */
1183
UNIV_INTERN
osku's avatar
osku committed
1184 1185 1186
rec_t*
btr_root_raise_and_insert(
/*======================*/
1187
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert: must be
osku's avatar
osku committed
1188 1189 1190
				on the root page; when the function returns,
				the cursor is positioned on the predecessor
				of the inserted record */
1191 1192 1193
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
1194
{
1195
	dict_index_t*	index;
osku's avatar
osku committed
1196 1197 1198 1199 1200 1201
	page_t*		root;
	page_t*		new_page;
	ulint		new_page_no;
	rec_t*		rec;
	mem_heap_t*	heap;
	dtuple_t*	node_ptr;
1202
	ulint		level;
osku's avatar
osku committed
1203 1204
	rec_t*		node_ptr_rec;
	page_cur_t*	page_cursor;
1205 1206
	page_zip_des_t*	root_page_zip;
	page_zip_des_t*	new_page_zip;
1207 1208
	buf_block_t*	root_block;
	buf_block_t*	new_block;
1209

osku's avatar
osku committed
1210
	root = btr_cur_get_page(cursor);
1211
	root_block = btr_cur_get_block(cursor);
1212
	root_page_zip = buf_block_get_page_zip(root_block);
1213
#ifdef UNIV_ZIP_DEBUG
1214
	ut_a(!root_page_zip || page_zip_validate(root_page_zip, root));
1215
#endif /* UNIV_ZIP_DEBUG */
1216
	index = btr_cur_get_index(cursor);
1217
#ifdef UNIV_BTR_DEBUG
1218 1219 1220 1221 1222 1223 1224 1225 1226
	if (!dict_index_is_ibuf(index)) {
		ulint	space = dict_index_get_space(index);

		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
					    + root, space));
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
					    + root, space));
	}

1227 1228
	ut_a(dict_index_get_page(index) == page_get_page_no(root));
#endif /* UNIV_BTR_DEBUG */
1229
	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
1230
				MTR_MEMO_X_LOCK));
1231
	ut_ad(mtr_memo_contains(mtr, root_block, MTR_MEMO_PAGE_X_FIX));
osku's avatar
osku committed
1232 1233 1234 1235

	/* Allocate a new page to the tree. Root splitting is done by first
	moving the root records to the new page, emptying the root, putting
	a node pointer to the new page, and then splitting the new page. */
1236

marko's avatar
marko committed
1237
	level = btr_page_get_level(root, mtr);
osku's avatar
osku committed
1238

1239 1240 1241
	new_block = btr_page_alloc(index, 0, FSP_NO_DIR, level, mtr);
	new_page = buf_block_get_frame(new_block);
	new_page_zip = buf_block_get_page_zip(new_block);
1242
	ut_a(!new_page_zip == !root_page_zip);
1243 1244 1245
	ut_a(!new_page_zip
	     || page_zip_get_size(new_page_zip)
	     == page_zip_get_size(root_page_zip));
osku's avatar
osku committed
1246

1247
	btr_page_create(new_block, new_page_zip, index, level, mtr);
1248 1249 1250 1251

	/* Set the next node and previous node fields of new page */
	btr_page_set_next(new_page, new_page_zip, FIL_NULL, mtr);
	btr_page_set_prev(new_page, new_page_zip, FIL_NULL, mtr);
osku's avatar
osku committed
1252

1253
	/* Copy the records from root to the new page one by one. */
osku's avatar
osku committed
1254

1255 1256 1257 1258 1259
	if (0
#ifdef UNIV_ZIP_COPY
	    || new_page_zip
#endif /* UNIV_ZIP_COPY */
	    || UNIV_UNLIKELY
1260
	    (!page_copy_rec_list_end(new_block, root_block,
1261
				     page_get_infimum_rec(root),
1262
				     index, mtr))) {
1263
		ut_a(new_page_zip);
marko's avatar
marko committed
1264

1265
		/* Copy the page byte for byte. */
1266 1267
		page_zip_copy_recs(new_page_zip, new_page,
				   root_page_zip, root, index, mtr);
1268 1269 1270 1271 1272 1273 1274 1275

		/* Update the lock table and possible hash index. */

		lock_move_rec_list_end(new_block, root_block,
				       page_get_infimum_rec(root));

		btr_search_move_or_delete_hash_entries(new_block, root_block,
						       index);
1276
	}
marko's avatar
marko committed
1277

osku's avatar
osku committed
1278 1279 1280 1281
	/* If this is a pessimistic insert which is actually done to
	perform a pessimistic update then we have stored the lock
	information of the record to be inserted on the infimum of the
	root page: we cannot discard the lock structs on the root page */
1282

1283
	lock_update_root_raise(new_block, root_block);
osku's avatar
osku committed
1284 1285 1286 1287 1288

	/* Create a memory heap where the node pointer is stored */
	heap = mem_heap_create(100);

	rec = page_rec_get_next(page_get_infimum_rec(new_page));
marko's avatar
marko committed
1289
	new_page_no = buf_block_get_page_no(new_block);
1290

osku's avatar
osku committed
1291 1292 1293
	/* Build the node pointer (= node key and page address) for the
	child */

1294 1295
	node_ptr = dict_index_build_node_ptr(index, rec, new_page_no, heap,
					     level);
1296 1297 1298
	/* The node pointer must be marked as the predefined minimum record,
	as there is no lower alphabetical limit to records in the leftmost
	node of a level: */
1299 1300 1301
	dtuple_set_info_bits(node_ptr,
			     dtuple_get_info_bits(node_ptr)
			     | REC_INFO_MIN_REC_FLAG);
1302

marko's avatar
marko committed
1303
	/* Rebuild the root page to get free space */
1304
	btr_page_empty(root_block, root_page_zip, index, level + 1, mtr);
1305 1306 1307 1308 1309 1310 1311 1312 1313

	/* Set the next node and previous node fields, although
	they should already have been set.  The previous node field
	must be FIL_NULL if root_page_zip != NULL, because the
	REC_INFO_MIN_REC_FLAG (of the first user record) will be
	set if and only if btr_page_get_prev() == FIL_NULL. */
	btr_page_set_next(root, root_page_zip, FIL_NULL, mtr);
	btr_page_set_prev(root, root_page_zip, FIL_NULL, mtr);

osku's avatar
osku committed
1314
	page_cursor = btr_cur_get_page_cur(cursor);
1315

osku's avatar
osku committed
1316 1317
	/* Insert node pointer to the root */

1318
	page_cur_set_before_first(root_block, page_cursor);
osku's avatar
osku committed
1319

1320
	node_ptr_rec = page_cur_tuple_insert(page_cursor, node_ptr,
1321
					     index, 0, mtr);
osku's avatar
osku committed
1322

1323 1324 1325
	/* The root page should only contain the node pointer
	to new_page at this point.  Thus, the data should fit. */
	ut_a(node_ptr_rec);
marko's avatar
marko committed
1326

osku's avatar
osku committed
1327 1328 1329 1330 1331
	/* Free the memory heap */
	mem_heap_free(heap);

	/* We play safe and reset the free bits for the new page */

1332
#if 0
marko's avatar
marko committed
1333
	fprintf(stderr, "Root raise new page no %lu\n", new_page_no);
1334
#endif
osku's avatar
osku committed
1335

1336 1337 1338 1339
	if (!dict_index_is_clust(index)) {
		ibuf_reset_free_bits(new_block);
	}

osku's avatar
osku committed
1340
	/* Reposition the cursor to the child node */
1341
	page_cur_search(new_block, index, tuple,
1342
			PAGE_CUR_LE, page_cursor);
1343

osku's avatar
osku committed
1344
	/* Split the child and insert tuple */
1345
	return(btr_page_split_and_insert(cursor, tuple, n_ext, mtr));
1346
}
osku's avatar
osku committed
1347

1348
/*************************************************************//**
osku's avatar
osku committed
1349
Decides if the page should be split at the convergence point of inserts
1350 1351
converging to the left.
@return	TRUE if split recommended */
1352
UNIV_INTERN
osku's avatar
osku committed
1353 1354 1355
ibool
btr_page_get_split_rec_to_left(
/*===========================*/
1356 1357
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert */
	rec_t**		split_rec) /*!< out: if split recommended,
osku's avatar
osku committed
1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369
				the first record on upper half page,
				or NULL if tuple to be inserted should
				be first */
{
	page_t*	page;
	rec_t*	insert_point;
	rec_t*	infimum;

	page = btr_cur_get_page(cursor);
	insert_point = btr_cur_get_rec(cursor);

	if (page_header_get_ptr(page, PAGE_LAST_INSERT)
1370
	    == page_rec_get_next(insert_point)) {
1371 1372

		infimum = page_get_infimum_rec(page);
osku's avatar
osku committed
1373 1374 1375 1376 1377 1378 1379

		/* If the convergence is in the middle of a page, include also
		the record immediately before the new insert to the upper
		page. Otherwise, we could repeatedly move from page to page
		lots of records smaller than the convergence point. */

		if (infimum != insert_point
1380
		    && page_rec_get_next(infimum) != insert_point) {
osku's avatar
osku committed
1381 1382 1383

			*split_rec = insert_point;
		} else {
1384 1385
			*split_rec = page_rec_get_next(insert_point);
		}
osku's avatar
osku committed
1386 1387 1388 1389 1390 1391 1392

		return(TRUE);
	}

	return(FALSE);
}

1393
/*************************************************************//**
osku's avatar
osku committed
1394
Decides if the page should be split at the convergence point of inserts
1395 1396
converging to the right.
@return	TRUE if split recommended */
1397
UNIV_INTERN
osku's avatar
osku committed
1398 1399 1400
ibool
btr_page_get_split_rec_to_right(
/*============================*/
1401 1402
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert */
	rec_t**		split_rec) /*!< out: if split recommended,
osku's avatar
osku committed
1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417
				the first record on upper half page,
				or NULL if tuple to be inserted should
				be first */
{
	page_t*	page;
	rec_t*	insert_point;

	page = btr_cur_get_page(cursor);
	insert_point = btr_cur_get_rec(cursor);

	/* We use eager heuristics: if the new insert would be right after
	the previous insert on the same page, we assume that there is a
	pattern of sequential inserts here. */

	if (UNIV_LIKELY(page_header_get_ptr(page, PAGE_LAST_INSERT)
1418
			== insert_point)) {
osku's avatar
osku committed
1419 1420 1421 1422 1423 1424 1425 1426

		rec_t*	next_rec;

		next_rec = page_rec_get_next(insert_point);

		if (page_rec_is_supremum(next_rec)) {
split_at_new:
			/* Split at the new record to insert */
1427
			*split_rec = NULL;
osku's avatar
osku committed
1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440
		} else {
			rec_t*	next_next_rec = page_rec_get_next(next_rec);
			if (page_rec_is_supremum(next_next_rec)) {

				goto split_at_new;
			}

			/* If there are >= 2 user records up from the insert
			point, split all but 1 off. We want to keep one because
			then sequential inserts can use the adaptive hash
			index, as they can do the necessary checks of the right
			search position just by looking at the records on this
			page. */
1441

osku's avatar
osku committed
1442 1443 1444 1445 1446 1447 1448 1449 1450
			*split_rec = next_next_rec;
		}

		return(TRUE);
	}

	return(FALSE);
}

1451
/*************************************************************//**
osku's avatar
osku committed
1452 1453
Calculates a split record such that the tuple will certainly fit on
its half-page when the split is performed. We assume in this function
1454
only that the cursor page has at least one user record.
1455
@return split record, or NULL if tuple will be the first record on
1456
the lower or upper half-page (determined by btr_page_tuple_smaller()) */
osku's avatar
osku committed
1457 1458
static
rec_t*
1459 1460
btr_page_get_split_rec(
/*===================*/
1461 1462 1463
	btr_cur_t*	cursor,	/*!< in: cursor at which insert should be made */
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext)	/*!< in: number of externally stored columns */
osku's avatar
osku committed
1464
{
1465
	page_t*		page;
1466
	page_zip_des_t*	page_zip;
1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478
	ulint		insert_size;
	ulint		free_space;
	ulint		total_data;
	ulint		total_n_recs;
	ulint		total_space;
	ulint		incl_data;
	rec_t*		ins_rec;
	rec_t*		rec;
	rec_t*		next_rec;
	ulint		n;
	mem_heap_t*	heap;
	ulint*		offsets;
osku's avatar
osku committed
1479 1480

	page = btr_cur_get_page(cursor);
1481

1482
	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
osku's avatar
osku committed
1483 1484
	free_space  = page_get_free_space_of_empty(page_is_comp(page));

1485
	page_zip = btr_cur_get_page_zip(cursor);
1486
	if (UNIV_LIKELY_NULL(page_zip)) {
1487 1488
		/* Estimate the free space of an empty compressed page. */
		ulint	free_space_zip = page_zip_empty_size(
1489 1490
			cursor->index->n_fields,
			page_zip_get_size(page_zip));
1491 1492 1493

		if (UNIV_LIKELY(free_space > (ulint) free_space_zip)) {
			free_space = (ulint) free_space_zip;
1494 1495 1496
		}
	}

osku's avatar
osku committed
1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518
	/* free_space is now the free space of a created new page */

	total_data   = page_get_data_size(page) + insert_size;
	total_n_recs = page_get_n_recs(page) + 1;
	ut_ad(total_n_recs >= 2);
	total_space  = total_data + page_dir_calc_reserved_space(total_n_recs);

	n = 0;
	incl_data = 0;
	ins_rec = btr_cur_get_rec(cursor);
	rec = page_get_infimum_rec(page);

	heap = NULL;
	offsets = NULL;

	/* We start to include records to the left half, and when the
	space reserved by them exceeds half of total_space, then if
	the included records fit on the left page, they will be put there
	if something was left over also for the right page,
	otherwise the last included record will be the first on the right
	half page */

1519
	do {
osku's avatar
osku committed
1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534
		/* Decide the next record to include */
		if (rec == ins_rec) {
			rec = NULL;	/* NULL denotes that tuple is
					now included */
		} else if (rec == NULL) {
			rec = page_rec_get_next(ins_rec);
		} else {
			rec = page_rec_get_next(rec);
		}

		if (rec == NULL) {
			/* Include tuple */
			incl_data += insert_size;
		} else {
			offsets = rec_get_offsets(rec, cursor->index,
1535 1536
						  offsets, ULINT_UNDEFINED,
						  &heap);
osku's avatar
osku committed
1537 1538 1539 1540
			incl_data += rec_offs_size(offsets);
		}

		n++;
1541 1542
	} while (incl_data + page_dir_calc_reserved_space(n)
		 < total_space / 2);
1543

1544 1545 1546 1547
	if (incl_data + page_dir_calc_reserved_space(n) <= free_space) {
		/* The next record will be the first on
		the right half page if it is not the
		supremum record of page */
osku's avatar
osku committed
1548

1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560
		if (rec == ins_rec) {
			rec = NULL;

			goto func_exit;
		} else if (rec == NULL) {
			next_rec = page_rec_get_next(ins_rec);
		} else {
			next_rec = page_rec_get_next(rec);
		}
		ut_ad(next_rec);
		if (!page_rec_is_supremum(next_rec)) {
			rec = next_rec;
osku's avatar
osku committed
1561
		}
1562 1563 1564 1565 1566
	}

func_exit:
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
osku's avatar
osku committed
1567
	}
1568
	return(rec);
1569
}
osku's avatar
osku committed
1570

1571
/*************************************************************//**
osku's avatar
osku committed
1572
Returns TRUE if the insert fits on the appropriate half-page with the
1573 1574
chosen split_rec.
@return	TRUE if fits */
osku's avatar
osku committed
1575 1576 1577 1578
static
ibool
btr_page_insert_fits(
/*=================*/
1579
	btr_cur_t*	cursor,	/*!< in: cursor at which insert
1580
				should be made */
1581
	const rec_t*	split_rec,/*!< in: suggestion for first record
1582 1583
				on upper half-page, or NULL if
				tuple to be inserted should be first */
1584
	const ulint*	offsets,/*!< in: rec_get_offsets(
1585
				split_rec, cursor->index) */
1586 1587 1588
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	mem_heap_t*	heap)	/*!< in: temporary memory heap */
osku's avatar
osku committed
1589
{
1590 1591 1592 1593 1594 1595 1596 1597
	page_t*		page;
	ulint		insert_size;
	ulint		free_space;
	ulint		total_data;
	ulint		total_n_recs;
	const rec_t*	rec;
	const rec_t*	end_rec;
	ulint*		offs;
1598

osku's avatar
osku committed
1599 1600 1601 1602
	page = btr_cur_get_page(cursor);

	ut_ad(!split_rec == !offsets);
	ut_ad(!offsets
1603
	      || !page_is_comp(page) == !rec_offs_comp(offsets));
osku's avatar
osku committed
1604
	ut_ad(!offsets
1605
	      || rec_offs_validate(split_rec, cursor->index, offsets));
osku's avatar
osku committed
1606

1607
	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
osku's avatar
osku committed
1608 1609 1610 1611 1612 1613
	free_space  = page_get_free_space_of_empty(page_is_comp(page));

	/* free_space is now the free space of a created new page */

	total_data   = page_get_data_size(page) + insert_size;
	total_n_recs = page_get_n_recs(page) + 1;
1614

osku's avatar
osku committed
1615 1616 1617
	/* We determine which records (from rec to end_rec, not including
	end_rec) will end up on the other half page from tuple when it is
	inserted. */
1618

osku's avatar
osku committed
1619 1620 1621 1622 1623 1624 1625
	if (split_rec == NULL) {
		rec = page_rec_get_next(page_get_infimum_rec(page));
		end_rec = page_rec_get_next(btr_cur_get_rec(cursor));

	} else if (cmp_dtuple_rec(tuple, split_rec, offsets) >= 0) {

		rec = page_rec_get_next(page_get_infimum_rec(page));
1626
		end_rec = split_rec;
osku's avatar
osku committed
1627 1628 1629 1630 1631 1632
	} else {
		rec = split_rec;
		end_rec = page_get_supremum_rec(page);
	}

	if (total_data + page_dir_calc_reserved_space(total_n_recs)
1633
	    <= free_space) {
osku's avatar
osku committed
1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647

		/* Ok, there will be enough available space on the
		half page where the tuple is inserted */

		return(TRUE);
	}

	offs = NULL;

	while (rec != end_rec) {
		/* In this loop we calculate the amount of reserved
		space after rec is removed from page. */

		offs = rec_get_offsets(rec, cursor->index, offs,
1648
				       ULINT_UNDEFINED, &heap);
osku's avatar
osku committed
1649 1650 1651 1652 1653

		total_data -= rec_offs_size(offs);
		total_n_recs--;

		if (total_data + page_dir_calc_reserved_space(total_n_recs)
1654
		    <= free_space) {
osku's avatar
osku committed
1655 1656 1657 1658 1659 1660 1661

			/* Ok, there will be enough available space on the
			half page where the tuple is inserted */

			return(TRUE);
		}

1662
		rec = page_rec_get_next_const(rec);
osku's avatar
osku committed
1663 1664 1665
	}

	return(FALSE);
1666
}
osku's avatar
osku committed
1667

1668
/*******************************************************//**
osku's avatar
osku committed
1669 1670
Inserts a data tuple to a tree on a non-leaf level. It is assumed
that mtr holds an x-latch on the tree. */
1671
UNIV_INTERN
osku's avatar
osku committed
1672
void
1673 1674
btr_insert_on_non_leaf_level_func(
/*==============================*/
1675 1676 1677
	dict_index_t*	index,	/*!< in: index */
	ulint		level,	/*!< in: level, must be > 0 */
	dtuple_t*	tuple,	/*!< in: the record to be inserted */
1678 1679
	const char*	file,	/*!< in: file name */
	ulint		line,	/*!< in: line where called */
1680
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
1681 1682
{
	big_rec_t*	dummy_big_rec;
1683
	btr_cur_t	cursor;
osku's avatar
osku committed
1684 1685 1686 1687
	ulint		err;
	rec_t*		rec;

	ut_ad(level > 0);
1688

1689
	btr_cur_search_to_nth_level(index, level, tuple, PAGE_CUR_LE,
1690
				    BTR_CONT_MODIFY_TREE,
1691
				    &cursor, 0, file, line, mtr);
osku's avatar
osku committed
1692 1693

	err = btr_cur_pessimistic_insert(BTR_NO_LOCKING_FLAG
1694 1695 1696
					 | BTR_KEEP_SYS_FLAG
					 | BTR_NO_UNDO_LOG_FLAG,
					 &cursor, tuple, &rec,
1697
					 &dummy_big_rec, 0, NULL, mtr);
osku's avatar
osku committed
1698 1699 1700
	ut_a(err == DB_SUCCESS);
}

1701
/**************************************************************//**
osku's avatar
osku committed
1702 1703 1704 1705 1706 1707
Attaches the halves of an index page on the appropriate level in an
index tree. */
static
void
btr_attach_half_pages(
/*==================*/
1708 1709 1710
	dict_index_t*	index,		/*!< in: the index tree */
	buf_block_t*	block,		/*!< in/out: page to be split */
	rec_t*		split_rec,	/*!< in: first record on upper
osku's avatar
osku committed
1711
					half page */
1712 1713 1714
	buf_block_t*	new_block,	/*!< in/out: the new half page */
	ulint		direction,	/*!< in: FSP_UP or FSP_DOWN */
	mtr_t*		mtr)		/*!< in: mtr */
osku's avatar
osku committed
1715 1716
{
	ulint		space;
1717
	ulint		zip_size;
osku's avatar
osku committed
1718 1719 1720
	ulint		prev_page_no;
	ulint		next_page_no;
	ulint		level;
1721
	page_t*		page		= buf_block_get_frame(block);
osku's avatar
osku committed
1722 1723 1724 1725
	page_t*		lower_page;
	page_t*		upper_page;
	ulint		lower_page_no;
	ulint		upper_page_no;
marko's avatar
marko committed
1726 1727
	page_zip_des_t*	lower_page_zip;
	page_zip_des_t*	upper_page_zip;
osku's avatar
osku committed
1728
	dtuple_t*	node_ptr_upper;
1729
	mem_heap_t*	heap;
osku's avatar
osku committed
1730

1731 1732
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	ut_ad(mtr_memo_contains(mtr, new_block, MTR_MEMO_PAGE_X_FIX));
osku's avatar
osku committed
1733 1734 1735 1736 1737 1738 1739

	/* Create a memory heap where the data tuple is stored */
	heap = mem_heap_create(1024);

	/* Based on split direction, decide upper and lower pages */
	if (direction == FSP_DOWN) {

1740 1741 1742 1743 1744 1745 1746 1747 1748
		btr_cur_t	cursor;
		ulint*		offsets;

		lower_page = buf_block_get_frame(new_block);
		lower_page_no = buf_block_get_page_no(new_block);
		lower_page_zip = buf_block_get_page_zip(new_block);
		upper_page = buf_block_get_frame(block);
		upper_page_no = buf_block_get_page_no(block);
		upper_page_zip = buf_block_get_page_zip(block);
osku's avatar
osku committed
1749

1750
		/* Look up the index for the node pointer to page */
1751 1752
		offsets = btr_page_get_father_block(NULL, heap, index,
						    block, mtr, &cursor);
osku's avatar
osku committed
1753

1754
		/* Replace the address of the old child node (= page) with the
osku's avatar
osku committed
1755 1756
		address of the new lower half */

1757
		btr_node_ptr_set_child_page_no(
1758 1759 1760
			btr_cur_get_rec(&cursor),
			btr_cur_get_page_zip(&cursor),
			offsets, lower_page_no, mtr);
osku's avatar
osku committed
1761 1762
		mem_heap_empty(heap);
	} else {
1763 1764 1765 1766 1767 1768
		lower_page = buf_block_get_frame(block);
		lower_page_no = buf_block_get_page_no(block);
		lower_page_zip = buf_block_get_page_zip(block);
		upper_page = buf_block_get_frame(new_block);
		upper_page_no = buf_block_get_page_no(new_block);
		upper_page_zip = buf_block_get_page_zip(new_block);
osku's avatar
osku committed
1769
	}
1770

osku's avatar
osku committed
1771
	/* Get the level of the split pages */
1772
	level = btr_page_get_level(buf_block_get_frame(block), mtr);
1773 1774
	ut_ad(level
	      == btr_page_get_level(buf_block_get_frame(new_block), mtr));
osku's avatar
osku committed
1775 1776 1777 1778

	/* Build the node pointer (= node key and page address) for the upper
	half */

1779 1780
	node_ptr_upper = dict_index_build_node_ptr(index, split_rec,
						   upper_page_no, heap, level);
osku's avatar
osku committed
1781 1782 1783 1784

	/* Insert it next to the pointer to the lower half. Note that this
	may generate recursion leading to a split on the higher level. */

1785
	btr_insert_on_non_leaf_level(index, level + 1, node_ptr_upper, mtr);
1786

osku's avatar
osku committed
1787 1788 1789 1790 1791 1792 1793
	/* Free the memory heap */
	mem_heap_free(heap);

	/* Get the previous and next pages of page */

	prev_page_no = btr_page_get_prev(page, mtr);
	next_page_no = btr_page_get_next(page, mtr);
1794
	space = buf_block_get_space(block);
1795
	zip_size = buf_block_get_zip_size(block);
1796

osku's avatar
osku committed
1797
	/* Update page links of the level */
1798

osku's avatar
osku committed
1799
	if (prev_page_no != FIL_NULL) {
1800 1801
		buf_block_t*	prev_block = btr_block_get(space, zip_size,
							   prev_page_no,
1802
							   RW_X_LATCH, mtr);
1803
#ifdef UNIV_BTR_DEBUG
1804 1805
		ut_a(page_is_comp(prev_block->frame) == page_is_comp(page));
		ut_a(btr_page_get_next(prev_block->frame, mtr)
1806
		     == buf_block_get_page_no(block));
1807
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
1808

1809 1810
		btr_page_set_next(buf_block_get_frame(prev_block),
				  buf_block_get_page_zip(prev_block),
1811
				  lower_page_no, mtr);
osku's avatar
osku committed
1812 1813 1814
	}

	if (next_page_no != FIL_NULL) {
1815 1816
		buf_block_t*	next_block = btr_block_get(space, zip_size,
							   next_page_no,
1817 1818 1819 1820 1821 1822
							   RW_X_LATCH, mtr);
#ifdef UNIV_BTR_DEBUG
		ut_a(page_is_comp(next_block->frame) == page_is_comp(page));
		ut_a(btr_page_get_prev(next_block->frame, mtr)
		     == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
1823

1824 1825
		btr_page_set_prev(buf_block_get_frame(next_block),
				  buf_block_get_page_zip(next_block),
1826
				  upper_page_no, mtr);
osku's avatar
osku committed
1827
	}
1828

marko's avatar
marko committed
1829 1830
	btr_page_set_prev(lower_page, lower_page_zip, prev_page_no, mtr);
	btr_page_set_next(lower_page, lower_page_zip, upper_page_no, mtr);
osku's avatar
osku committed
1831

marko's avatar
marko committed
1832 1833
	btr_page_set_prev(upper_page, upper_page_zip, lower_page_no, mtr);
	btr_page_set_next(upper_page, upper_page_zip, next_page_no, mtr);
osku's avatar
osku committed
1834 1835
}

1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866
/*************************************************************//**
Determine if a tuple sorts before the first user record of the page on
which the cursor is positioned, i.e. whether it is smaller than every
record on that page.
@return TRUE if smaller */
static
ibool
btr_page_tuple_smaller(
/*===================*/
	btr_cur_t*	cursor,	/*!< in: b-tree cursor */
	const dtuple_t*	tuple,	/*!< in: tuple to consider */
	ulint*		offsets,/*!< in/out: temporary storage */
	ulint		n_uniq,	/*!< in: number of unique fields
				in the index page records */
	mem_heap_t**	heap)	/*!< in/out: heap for offsets */
{
	page_cur_t	page_cursor;
	const rec_t*	lowest_rec;
	int		cmp;

	/* Step from the page infimum onto the first user record. */
	page_cur_set_before_first(btr_cur_get_block(cursor), &page_cursor);
	page_cur_move_to_next(&page_cursor);
	lowest_rec = page_cur_get_rec(&page_cursor);

	/* Only the first n_uniq fields take part in the comparison. */
	offsets = rec_get_offsets(
		lowest_rec, cursor->index, offsets,
		n_uniq, heap);

	cmp = cmp_dtuple_rec(tuple, lowest_rec, offsets);

	return(cmp < 0);
}

1867
/*************************************************************//**
osku's avatar
osku committed
1868
Splits an index page to halves and inserts the tuple. It is assumed
1869 1870 1871 1872
that mtr holds an x-latch to the index tree. NOTE: the tree x-latch is
released within this function! NOTE that the operation of this
function must always succeed, we cannot reverse it: therefore enough
free disk space (2 pages) must be guaranteed to be available before
1873
this function is called.
1874 1875

@return inserted record */
1876
UNIV_INTERN
osku's avatar
osku committed
1877 1878 1879
rec_t*
btr_page_split_and_insert(
/*======================*/
1880
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert; when the
osku's avatar
osku committed
1881 1882
				function returns, the cursor is positioned
				on the predecessor of the inserted record */
1883 1884 1885
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
1886
{
1887
	buf_block_t*	block;
1888
	page_t*		page;
marko's avatar
marko committed
1889
	page_zip_des_t*	page_zip;
1890
	ulint		page_no;
osku's avatar
osku committed
1891
	byte		direction;
1892
	ulint		hint_page_no;
1893
	buf_block_t*	new_block;
1894
	page_t*		new_page;
1895
	page_zip_des_t*	new_page_zip;
osku's avatar
osku committed
1896
	rec_t*		split_rec;
1897 1898 1899
	buf_block_t*	left_block;
	buf_block_t*	right_block;
	buf_block_t*	insert_block;
osku's avatar
osku committed
1900 1901 1902 1903 1904
	page_cur_t*	page_cursor;
	rec_t*		first_rec;
	byte*		buf = 0; /* remove warning */
	rec_t*		move_limit;
	ibool		insert_will_fit;
marko's avatar
marko committed
1905
	ibool		insert_left;
osku's avatar
osku committed
1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916
	ulint		n_iterations = 0;
	rec_t*		rec;
	mem_heap_t*	heap;
	ulint		n_uniq;
	ulint*		offsets;

	heap = mem_heap_create(1024);
	n_uniq = dict_index_get_n_unique_in_tree(cursor->index);
func_start:
	mem_heap_empty(heap);
	offsets = NULL;
1917

1918
	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(cursor->index),
1919
				MTR_MEMO_X_LOCK));
osku's avatar
osku committed
1920
#ifdef UNIV_SYNC_DEBUG
1921
	ut_ad(rw_lock_own(dict_index_get_lock(cursor->index), RW_LOCK_EX));
osku's avatar
osku committed
1922 1923
#endif /* UNIV_SYNC_DEBUG */

1924
	block = btr_cur_get_block(cursor);
1925 1926
	page = buf_block_get_frame(block);
	page_zip = buf_block_get_page_zip(block);
osku's avatar
osku committed
1927

1928
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1929
	ut_ad(page_get_n_recs(page) >= 1);
1930

1931
	page_no = buf_block_get_page_no(block);
osku's avatar
osku committed
1932 1933 1934 1935

	/* 1. Decide the split record; split_rec == NULL means that the
	tuple to be inserted should be the first record on the upper
	half-page */
Vasil Dimov's avatar
Vasil Dimov committed
1936
	insert_left = FALSE;
osku's avatar
osku committed
1937 1938 1939

	if (n_iterations > 0) {
		direction = FSP_UP;
1940
		hint_page_no = page_no + 1;
1941
		split_rec = btr_page_get_split_rec(cursor, tuple, n_ext);
1942

1943 1944 1945 1946
		if (UNIV_UNLIKELY(split_rec == NULL)) {
			insert_left = btr_page_tuple_smaller(
				cursor, tuple, offsets, n_uniq, &heap);
		}
osku's avatar
osku committed
1947 1948
	} else if (btr_page_get_split_rec_to_right(cursor, &split_rec)) {
		direction = FSP_UP;
1949
		hint_page_no = page_no + 1;
osku's avatar
osku committed
1950 1951 1952

	} else if (btr_page_get_split_rec_to_left(cursor, &split_rec)) {
		direction = FSP_DOWN;
1953
		hint_page_no = page_no - 1;
1954
		ut_ad(split_rec);
osku's avatar
osku committed
1955 1956
	} else {
		direction = FSP_UP;
1957
		hint_page_no = page_no + 1;
1958

1959 1960 1961 1962 1963 1964
		/* If there is only one record in the index page, we
		can't split the node in the middle by default. We need
		to determine whether the new record will be inserted
		to the left or right. */

		if (page_get_n_recs(page) > 1) {
1965
			split_rec = page_get_middle_rec(page);
1966 1967 1968 1969 1970 1971
		} else if (btr_page_tuple_smaller(cursor, tuple,
						  offsets, n_uniq, &heap)) {
			split_rec = page_rec_get_next(
				page_get_infimum_rec(page));
		} else {
			split_rec = NULL;
1972
		}
osku's avatar
osku committed
1973 1974
	}

1975
	/* 2. Allocate a new page to the index */
1976
	new_block = btr_page_alloc(cursor->index, hint_page_no, direction,
1977
				   btr_page_get_level(page, mtr), mtr);
1978 1979
	new_page = buf_block_get_frame(new_block);
	new_page_zip = buf_block_get_page_zip(new_block);
1980
	btr_page_create(new_block, new_page_zip, cursor->index,
1981
			btr_page_get_level(page, mtr), mtr);
1982

osku's avatar
osku committed
1983 1984 1985 1986
	/* 3. Calculate the first record on the upper half-page, and the
	first record (move_limit) on original page which ends up on the
	upper half */

marko's avatar
marko committed
1987 1988
	if (split_rec) {
		first_rec = move_limit = split_rec;
1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000

		offsets = rec_get_offsets(split_rec, cursor->index, offsets,
					  n_uniq, &heap);

		insert_left = cmp_dtuple_rec(tuple, split_rec, offsets) < 0;

		if (UNIV_UNLIKELY(!insert_left && new_page_zip
				  && n_iterations > 0)) {
			/* If a compressed page has already been split,
			avoid further splits by inserting the record
			to an empty page. */
			split_rec = NULL;
2001
			goto insert_empty;
2002
		}
Vasil Dimov's avatar
Vasil Dimov committed
2003
	} else if (UNIV_UNLIKELY(insert_left)) {
2004
		ut_a(n_iterations > 0);
Vasil Dimov's avatar
Vasil Dimov committed
2005 2006
		first_rec = page_rec_get_next(page_get_infimum_rec(page));
		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
osku's avatar
osku committed
2007
	} else {
2008 2009
insert_empty:
		ut_ad(!split_rec);
Vasil Dimov's avatar
Vasil Dimov committed
2010
		ut_ad(!insert_left);
2011
		buf = mem_alloc(rec_get_converted_size(cursor->index,
2012
						       tuple, n_ext));
osku's avatar
osku committed
2013

2014
		first_rec = rec_convert_dtuple_to_rec(buf, cursor->index,
2015
						      tuple, n_ext);
osku's avatar
osku committed
2016 2017
		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
	}
2018

osku's avatar
osku committed
2019 2020
	/* 4. Do first the modifications in the tree structure */

2021 2022
	btr_attach_half_pages(cursor->index, block,
			      first_rec, new_block, direction, mtr);
osku's avatar
osku committed
2023 2024 2025 2026 2027 2028 2029

	/* If the split is made on the leaf level and the insert will fit
	on the appropriate half-page, we may release the tree x-latch.
	We can then move the records after releasing the tree latch,
	thus reducing the tree latch contention. */

	if (split_rec) {
2030 2031 2032
		insert_will_fit = !new_page_zip
			&& btr_page_insert_fits(cursor, split_rec,
						offsets, tuple, n_ext, heap);
osku's avatar
osku committed
2033
	} else {
Vasil Dimov's avatar
Vasil Dimov committed
2034 2035 2036 2037 2038
		if (!insert_left) {
			mem_free(buf);
			buf = NULL;
		}

2039 2040 2041
		insert_will_fit = !new_page_zip
			&& btr_page_insert_fits(cursor, NULL,
						NULL, tuple, n_ext, heap);
osku's avatar
osku committed
2042
	}
2043

2044
	if (insert_will_fit && page_is_leaf(page)) {
osku's avatar
osku committed
2045

2046
		mtr_memo_release(mtr, dict_index_get_lock(cursor->index),
2047
				 MTR_MEMO_X_LOCK);
osku's avatar
osku committed
2048 2049 2050
	}

	/* 5. Move then the records to the new page */
Vasil Dimov's avatar
Vasil Dimov committed
2051
	if (direction == FSP_DOWN) {
2052
		/*		fputs("Split left\n", stderr); */
marko's avatar
marko committed
2053

2054 2055 2056 2057 2058
		if (0
#ifdef UNIV_ZIP_COPY
		    || page_zip
#endif /* UNIV_ZIP_COPY */
		    || UNIV_UNLIKELY
2059
		    (!page_move_rec_list_start(new_block, block, move_limit,
2060
					       cursor->index, mtr))) {
2061 2062 2063 2064 2065 2066 2067
			/* For some reason, compressing new_page failed,
			even though it should contain fewer records than
			the original page.  Copy the page byte for byte
			and then delete the records from both pages
			as appropriate.  Deleting will always succeed. */
			ut_a(new_page_zip);

2068 2069
			page_zip_copy_recs(new_page_zip, new_page,
					   page_zip, page, cursor->index, mtr);
2070 2071
			page_delete_rec_list_end(move_limit - page + new_page,
						 new_block, cursor->index,
2072
						 ULINT_UNDEFINED,
2073
						 ULINT_UNDEFINED, mtr);
2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085

			/* Update the lock table and possible hash index. */

			lock_move_rec_list_start(
				new_block, block, move_limit,
				new_page + PAGE_NEW_INFIMUM);

			btr_search_move_or_delete_hash_entries(
				new_block, block, cursor->index);

			/* Delete the records from the source page. */

2086 2087
			page_delete_rec_list_start(move_limit, block,
						   cursor->index, mtr);
2088
		}
osku's avatar
osku committed
2089

2090 2091
		left_block = new_block;
		right_block = block;
osku's avatar
osku committed
2092

2093
		lock_update_split_left(right_block, left_block);
osku's avatar
osku committed
2094
	} else {
2095
		/*		fputs("Split right\n", stderr); */
osku's avatar
osku committed
2096

2097 2098 2099 2100 2101
		if (0
#ifdef UNIV_ZIP_COPY
		    || page_zip
#endif /* UNIV_ZIP_COPY */
		    || UNIV_UNLIKELY
2102
		    (!page_move_rec_list_end(new_block, block, move_limit,
2103
					     cursor->index, mtr))) {
2104 2105 2106 2107 2108 2109 2110
			/* For some reason, compressing new_page failed,
			even though it should contain fewer records than
			the original page.  Copy the page byte for byte
			and then delete the records from both pages
			as appropriate.  Deleting will always succeed. */
			ut_a(new_page_zip);

2111 2112
			page_zip_copy_recs(new_page_zip, new_page,
					   page_zip, page, cursor->index, mtr);
2113
			page_delete_rec_list_start(move_limit - page
2114 2115
						   + new_page, new_block,
						   cursor->index, mtr);
2116 2117 2118 2119 2120 2121 2122 2123 2124 2125

			/* Update the lock table and possible hash index. */

			lock_move_rec_list_end(new_block, block, move_limit);

			btr_search_move_or_delete_hash_entries(
				new_block, block, cursor->index);

			/* Delete the records from the source page. */

2126 2127
			page_delete_rec_list_end(move_limit, block,
						 cursor->index,
2128
						 ULINT_UNDEFINED,
2129
						 ULINT_UNDEFINED, mtr);
2130 2131
		}

2132 2133
		left_block = block;
		right_block = new_block;
2134

2135
		lock_update_split_right(right_block, left_block);
osku's avatar
osku committed
2136 2137
	}

2138 2139 2140 2141 2142 2143 2144
#ifdef UNIV_ZIP_DEBUG
	if (UNIV_LIKELY_NULL(page_zip)) {
		ut_a(page_zip_validate(page_zip, page));
		ut_a(page_zip_validate(new_page_zip, new_page));
	}
#endif /* UNIV_ZIP_DEBUG */

marko's avatar
marko committed
2145 2146 2147
	/* At this point, split_rec, move_limit and first_rec may point
	to garbage on the old page. */

osku's avatar
osku committed
2148 2149 2150
	/* 6. The split and the tree modification is now completed. Decide the
	page where the tuple should be inserted */

marko's avatar
marko committed
2151
	if (insert_left) {
2152
		insert_block = left_block;
osku's avatar
osku committed
2153
	} else {
2154
		insert_block = right_block;
osku's avatar
osku committed
2155 2156 2157 2158 2159
	}

	/* 7. Reposition the cursor for insert and try insertion */
	page_cursor = btr_cur_get_page_cur(cursor);

2160
	page_cur_search(insert_block, cursor->index, tuple,
2161
			PAGE_CUR_LE, page_cursor);
osku's avatar
osku committed
2162

2163
	rec = page_cur_tuple_insert(page_cursor, tuple,
2164
				    cursor->index, n_ext, mtr);
marko's avatar
marko committed
2165

2166
#ifdef UNIV_ZIP_DEBUG
2167
	{
2168 2169 2170
		page_t*		insert_page
			= buf_block_get_frame(insert_block);

2171 2172
		page_zip_des_t*	insert_page_zip
			= buf_block_get_page_zip(insert_block);
2173

2174 2175 2176
		ut_a(!insert_page_zip
		     || page_zip_validate(insert_page_zip, insert_page));
	}
2177
#endif /* UNIV_ZIP_DEBUG */
osku's avatar
osku committed
2178

marko's avatar
marko committed
2179
	if (UNIV_LIKELY(rec != NULL)) {
2180 2181

		goto func_exit;
osku's avatar
osku committed
2182
	}
2183

2184
	/* 8. If insert did not fit, try page reorganization */
osku's avatar
osku committed
2185

2186
	if (UNIV_UNLIKELY
2187
	    (!btr_page_reorganize(insert_block, cursor->index, mtr))) {
2188 2189 2190

		goto insert_failed;
	}
osku's avatar
osku committed
2191

2192
	page_cur_search(insert_block, cursor->index, tuple,
2193
			PAGE_CUR_LE, page_cursor);
2194
	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
2195
				    n_ext, mtr);
marko's avatar
marko committed
2196 2197

	if (UNIV_UNLIKELY(rec == NULL)) {
osku's avatar
osku committed
2198 2199
		/* The insert did not fit on the page: loop back to the
		start of the function for a new split */
2200
insert_failed:
2201
		/* We play safe and reset the free bits for new_page */
2202 2203 2204
		if (!dict_index_is_clust(cursor->index)) {
			ibuf_reset_free_bits(new_block);
		}
osku's avatar
osku committed
2205 2206

		/* fprintf(stderr, "Split second round %lu\n",
2207
		page_get_page_no(page)); */
osku's avatar
osku committed
2208
		n_iterations++;
2209 2210
		ut_ad(n_iterations < 2
		      || buf_block_get_page_zip(insert_block));
2211
		ut_ad(!insert_will_fit);
osku's avatar
osku committed
2212 2213 2214 2215

		goto func_start;
	}

2216
func_exit:
osku's avatar
osku committed
2217 2218 2219
	/* Insert fit on the page: update the free bits for the
	left and right pages in the same mtr */

2220 2221 2222 2223 2224 2225
	if (!dict_index_is_clust(cursor->index) && page_is_leaf(page)) {
		ibuf_update_free_bits_for_two_pages_low(
			buf_block_get_zip_size(left_block),
			left_block, right_block, mtr);
	}

2226 2227
#if 0
	fprintf(stderr, "Split and insert done %lu %lu\n",
2228 2229
		buf_block_get_page_no(left_block),
		buf_block_get_page_no(right_block));
2230
#endif
osku's avatar
osku committed
2231

2232
	ut_ad(page_validate(buf_block_get_frame(left_block), cursor->index));
2233
	ut_ad(page_validate(buf_block_get_frame(right_block), cursor->index));
osku's avatar
osku committed
2234 2235 2236 2237 2238

	mem_heap_free(heap);
	return(rec);
}

2239
/*************************************************************//**
osku's avatar
osku committed
2240 2241 2242 2243 2244
Removes a page from the level list of pages. */
static
void
btr_level_list_remove(
/*==================*/
2245 2246
	ulint		space,	/*!< in: space where removed */
	ulint		zip_size,/*!< in: compressed page size in bytes
2247
				or 0 for uncompressed pages */
2248 2249
	page_t*		page,	/*!< in: page to remove */
	mtr_t*		mtr)	/*!< in: mtr */
2250
{
osku's avatar
osku committed
2251 2252
	ulint	prev_page_no;
	ulint	next_page_no;
2253

2254
	ut_ad(page && mtr);
2255
	ut_ad(mtr_memo_contains_page(mtr, page, MTR_MEMO_PAGE_X_FIX));
2256
	ut_ad(space == page_get_space_id(page));
osku's avatar
osku committed
2257 2258 2259 2260
	/* Get the previous and next page numbers of page */

	prev_page_no = btr_page_get_prev(page, mtr);
	next_page_no = btr_page_get_next(page, mtr);
2261

osku's avatar
osku committed
2262
	/* Update page links of the level */
2263

osku's avatar
osku committed
2264
	if (prev_page_no != FIL_NULL) {
2265
		buf_block_t*	prev_block
2266 2267
			= btr_block_get(space, zip_size, prev_page_no,
					RW_X_LATCH, mtr);
2268 2269
		page_t*		prev_page
			= buf_block_get_frame(prev_block);
2270
#ifdef UNIV_BTR_DEBUG
2271
		ut_a(page_is_comp(prev_page) == page_is_comp(page));
2272
		ut_a(btr_page_get_next(prev_page, mtr)
2273
		     == page_get_page_no(page));
2274
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
2275

2276 2277
		btr_page_set_next(prev_page,
				  buf_block_get_page_zip(prev_block),
2278
				  next_page_no, mtr);
osku's avatar
osku committed
2279 2280 2281
	}

	if (next_page_no != FIL_NULL) {
2282
		buf_block_t*	next_block
2283 2284
			= btr_block_get(space, zip_size, next_page_no,
					RW_X_LATCH, mtr);
2285 2286
		page_t*		next_page
			= buf_block_get_frame(next_block);
2287
#ifdef UNIV_BTR_DEBUG
2288
		ut_a(page_is_comp(next_page) == page_is_comp(page));
2289
		ut_a(btr_page_get_prev(next_page, mtr)
2290
		     == page_get_page_no(page));
2291
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
2292

2293 2294
		btr_page_set_prev(next_page,
				  buf_block_get_page_zip(next_block),
2295
				  prev_page_no, mtr);
osku's avatar
osku committed
2296 2297
	}
}
2298

2299
/****************************************************************//**
osku's avatar
osku committed
2300 2301 2302 2303 2304 2305
Writes the redo log record for setting an index record as the predefined
minimum record. */
UNIV_INLINE
void
btr_set_min_rec_mark_log(
/*=====================*/
2306 2307 2308
	rec_t*	rec,	/*!< in: record */
	byte	type,	/*!< in: MLOG_COMP_REC_MIN_MARK or MLOG_REC_MIN_MARK */
	mtr_t*	mtr)	/*!< in: mtr */
osku's avatar
osku committed
2309
{
marko's avatar
marko committed
2310
	mlog_write_initial_log_record(rec, type, mtr);
osku's avatar
osku committed
2311 2312

	/* Write rec offset as a 2-byte ulint */
2313
	mlog_catenate_ulint(mtr, page_offset(rec), MLOG_2BYTES);
osku's avatar
osku committed
2314
}
2315 2316 2317
#else /* !UNIV_HOTBACKUP */
# define btr_set_min_rec_mark_log(rec,comp,mtr) ((void) 0)
#endif /* !UNIV_HOTBACKUP */
osku's avatar
osku committed
2318

2319
/****************************************************************//**
osku's avatar
osku committed
2320
Parses the redo log record for setting an index record as the predefined
2321 2322
minimum record.
@return	end of log record or NULL */
2323
UNIV_INTERN
osku's avatar
osku committed
2324 2325 2326
byte*
btr_parse_set_min_rec_mark(
/*=======================*/
2327 2328 2329 2330 2331
	byte*	ptr,	/*!< in: buffer */
	byte*	end_ptr,/*!< in: buffer end */
	ulint	comp,	/*!< in: nonzero=compact page format */
	page_t*	page,	/*!< in: page or NULL */
	mtr_t*	mtr)	/*!< in: mtr or NULL */
osku's avatar
osku committed
2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344
{
	rec_t*	rec;

	if (end_ptr < ptr + 2) {

		return(NULL);
	}

	if (page) {
		ut_a(!page_is_comp(page) == !comp);

		rec = page + mach_read_from_2(ptr);

2345
		btr_set_min_rec_mark(rec, mtr);
osku's avatar
osku committed
2346 2347 2348 2349 2350
	}

	return(ptr + 2);
}

2351
/****************************************************************//**
osku's avatar
osku committed
2352
Sets a record as the predefined minimum record. */
2353
UNIV_INTERN
osku's avatar
osku committed
2354 2355 2356
void
btr_set_min_rec_mark(
/*=================*/
2357 2358
	rec_t*	rec,	/*!< in: record */
	mtr_t*	mtr)	/*!< in: mtr */
osku's avatar
osku committed
2359 2360 2361
{
	ulint	info_bits;

marko's avatar
marko committed
2362 2363
	if (UNIV_LIKELY(page_rec_is_comp(rec))) {
		info_bits = rec_get_info_bits(rec, TRUE);
osku's avatar
osku committed
2364

2365
		rec_set_info_bits_new(rec, info_bits | REC_INFO_MIN_REC_FLAG);
marko's avatar
marko committed
2366 2367 2368 2369 2370 2371 2372 2373 2374

		btr_set_min_rec_mark_log(rec, MLOG_COMP_REC_MIN_MARK, mtr);
	} else {
		info_bits = rec_get_info_bits(rec, FALSE);

		rec_set_info_bits_old(rec, info_bits | REC_INFO_MIN_REC_FLAG);

		btr_set_min_rec_mark_log(rec, MLOG_REC_MIN_MARK, mtr);
	}
osku's avatar
osku committed
2375 2376
}

2377
#ifndef UNIV_HOTBACKUP
2378
/*************************************************************//**
osku's avatar
osku committed
2379
Deletes on the upper level the node pointer to a page. */
2380
UNIV_INTERN
osku's avatar
osku committed
2381 2382 2383
void
btr_node_ptr_delete(
/*================*/
2384 2385 2386
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: page whose node pointer is deleted */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
2387 2388 2389 2390
{
	btr_cur_t	cursor;
	ibool		compressed;
	ulint		err;
2391

2392
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
osku's avatar
osku committed
2393

2394 2395
	/* Delete node pointer on father page */
	btr_page_get_father(index, block, mtr, &cursor);
osku's avatar
osku committed
2396

2397
	compressed = btr_cur_pessimistic_delete(&err, TRUE, &cursor, RB_NONE,
2398
						mtr);
osku's avatar
osku committed
2399 2400 2401 2402 2403 2404 2405
	ut_a(err == DB_SUCCESS);

	if (!compressed) {
		btr_cur_compress_if_useful(&cursor, mtr);
	}
}

2406
/*************************************************************//**
osku's avatar
osku committed
2407 2408 2409
If page is the only on its level, this function moves its records to the
father page, thus reducing the tree height. */
static
2410
void
osku's avatar
osku committed
2411 2412
btr_lift_page_up(
/*=============*/
2413 2414
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: page which is the only on its level;
osku's avatar
osku committed
2415 2416 2417
				must not be empty: use
				btr_discard_only_page_on_level if the last
				record from the page should be removed */
2418
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
2419
{
2420
	buf_block_t*	father_block;
osku's avatar
osku committed
2421 2422
	page_t*		father_page;
	ulint		page_level;
marko's avatar
marko committed
2423
	page_zip_des_t*	father_page_zip;
2424
	page_t*		page		= buf_block_get_frame(block);
2425 2426
	ulint		root_page_no;
	buf_block_t*	blocks[BTR_MAX_LEVELS];
2427
	ulint		n_blocks;	/*!< last used index in blocks[] */
2428
	ulint		i;
osku's avatar
osku committed
2429 2430 2431

	ut_ad(btr_page_get_prev(page, mtr) == FIL_NULL);
	ut_ad(btr_page_get_next(page, mtr) == FIL_NULL);
2432
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2433

2434 2435 2436
	page_level = btr_page_get_level(page, mtr);
	root_page_no = dict_index_get_page(index);

2437 2438 2439 2440
	{
		btr_cur_t	cursor;
		mem_heap_t*	heap	= mem_heap_create(100);
		ulint*		offsets;
2441
		buf_block_t*	b;
2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455

		offsets = btr_page_get_father_block(NULL, heap, index,
						    block, mtr, &cursor);
		father_block = btr_cur_get_block(&cursor);
		father_page_zip = buf_block_get_page_zip(father_block);
		father_page = buf_block_get_frame(father_block);

		n_blocks = 0;

		/* Store all ancestor pages so we can reset their
		levels later on.  We have to do all the searches on
		the tree now because later on, after we've replaced
		the first level, the tree is in an inconsistent state
		and can not be searched. */
2456 2457
		for (b = father_block;
		     buf_block_get_page_no(b) != root_page_no; ) {
2458 2459 2460
			ut_a(n_blocks < BTR_MAX_LEVELS);

			offsets = btr_page_get_father_block(offsets, heap,
2461
							    index, b,
2462 2463
							    mtr, &cursor);

2464
			blocks[n_blocks++] = b = btr_cur_get_block(&cursor);
2465
		}
marko's avatar
marko committed
2466

2467 2468
		mem_heap_free(heap);
	}
osku's avatar
osku committed
2469

2470
	btr_search_drop_page_hash_index(block);
2471

osku's avatar
osku committed
2472
	/* Make the father empty */
2473
	btr_page_empty(father_block, father_page_zip, index, page_level, mtr);
osku's avatar
osku committed
2474

2475
	/* Copy the records to the father page one by one. */
2476 2477 2478 2479 2480
	if (0
#ifdef UNIV_ZIP_COPY
	    || father_page_zip
#endif /* UNIV_ZIP_COPY */
	    || UNIV_UNLIKELY
2481
	    (!page_copy_rec_list_end(father_block, block,
2482 2483
				     page_get_infimum_rec(page),
				     index, mtr))) {
2484 2485
		const page_zip_des_t*	page_zip
			= buf_block_get_page_zip(block);
2486
		ut_a(father_page_zip);
2487
		ut_a(page_zip);
2488 2489

		/* Copy the page byte for byte. */
2490 2491
		page_zip_copy_recs(father_page_zip, father_page,
				   page_zip, page, index, mtr);
2492 2493 2494 2495 2496 2497 2498 2499

		/* Update the lock table and possible hash index. */

		lock_move_rec_list_end(father_block, block,
				       page_get_infimum_rec(page));

		btr_search_move_or_delete_hash_entries(father_block, block,
						       index);
marko's avatar
marko committed
2500
	}
osku's avatar
osku committed
2501

2502
	lock_update_copy_and_discard(father_block, block);
marko's avatar
marko committed
2503

2504
	/* Go upward to root page, decrementing levels by one. */
2505
	for (i = 0; i < n_blocks; i++, page_level++) {
2506 2507
		page_t*		page	= buf_block_get_frame(blocks[i]);
		page_zip_des_t*	page_zip= buf_block_get_page_zip(blocks[i]);
2508 2509 2510

		ut_ad(btr_page_get_level(page, mtr) == page_level + 1);

2511 2512 2513 2514
		btr_page_set_level(page, page_zip, page_level, mtr);
#ifdef UNIV_ZIP_DEBUG
		ut_a(!page_zip || page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */
2515 2516
	}

osku's avatar
osku committed
2517
	/* Free the file page */
2518
	btr_page_free(index, block, mtr);
osku's avatar
osku committed
2519

2520
	/* We play it safe and reset the free bits for the father */
2521 2522 2523
	if (!dict_index_is_clust(index)) {
		ibuf_reset_free_bits(father_block);
	}
osku's avatar
osku committed
2524
	ut_ad(page_validate(father_page, index));
2525
	ut_ad(btr_check_node_ptr(index, father_block, mtr));
2526
}
osku's avatar
osku committed
2527

2528
/*************************************************************//**
osku's avatar
osku committed
2529 2530 2531 2532 2533 2534 2535
Tries to merge the page first to the left immediate brother if such a
brother exists, and the node pointers to the current page and to the brother
reside on the same page. If the left brother does not satisfy these
conditions, looks at the right brother. If the page is the only one on that
level lifts the records of the page to the father page, thus reducing the
tree height. It is assumed that mtr holds an x-latch on the tree and on the
page. If cursor is on the leaf level, mtr must also hold x-latches to the
2536 2537
brothers, if they exist.
@return	TRUE on success */
2538
UNIV_INTERN
marko's avatar
marko committed
2539
ibool
osku's avatar
osku committed
2540 2541
btr_compress(
/*=========*/
2542
	btr_cur_t*	cursor,	/*!< in: cursor on the page to merge or lift;
osku's avatar
osku committed
2543 2544 2545
				the page must not be empty: in record delete
				use btr_discard_page if the page would become
				empty */
2546
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
2547
{
2548
	dict_index_t*	index;
osku's avatar
osku committed
2549
	ulint		space;
2550
	ulint		zip_size;
osku's avatar
osku committed
2551 2552
	ulint		left_page_no;
	ulint		right_page_no;
2553
	buf_block_t*	merge_block;
osku's avatar
osku committed
2554
	page_t*		merge_page;
2555
	page_zip_des_t*	merge_page_zip;
osku's avatar
osku committed
2556
	ibool		is_left;
2557
	buf_block_t*	block;
osku's avatar
osku committed
2558
	page_t*		page;
2559 2560 2561
	btr_cur_t	father_cursor;
	mem_heap_t*	heap;
	ulint*		offsets;
osku's avatar
osku committed
2562 2563 2564 2565 2566 2567
	ulint		data_size;
	ulint		n_recs;
	ulint		max_ins_size;
	ulint		max_ins_size_reorg;
	ulint		level;

2568
	block = btr_cur_get_block(cursor);
2569
	page = btr_cur_get_page(cursor);
2570 2571
	index = btr_cur_get_index(cursor);
	ut_a((ibool) !!page_is_comp(page) == dict_table_is_comp(index->table));
osku's avatar
osku committed
2572

2573
	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
2574
				MTR_MEMO_X_LOCK));
2575
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
osku's avatar
osku committed
2576
	level = btr_page_get_level(page, mtr);
2577
	space = dict_index_get_space(index);
2578
	zip_size = dict_table_zip_size(index->table);
osku's avatar
osku committed
2579 2580 2581 2582

	left_page_no = btr_page_get_prev(page, mtr);
	right_page_no = btr_page_get_next(page, mtr);

2583 2584 2585 2586
#if 0
	fprintf(stderr, "Merge left page %lu right %lu \n",
		left_page_no, right_page_no);
#endif
osku's avatar
osku committed
2587

2588 2589 2590
	heap = mem_heap_create(100);
	offsets = btr_page_get_father_block(NULL, heap, index, block, mtr,
					    &father_cursor);
osku's avatar
osku committed
2591 2592 2593 2594

	/* Decide the page to which we try to merge and which will inherit
	the locks */

marko's avatar
marko committed
2595 2596 2597
	is_left = left_page_no != FIL_NULL;

	if (is_left) {
osku's avatar
osku committed
2598

2599 2600
		merge_block = btr_block_get(space, zip_size, left_page_no,
					    RW_X_LATCH, mtr);
2601
		merge_page = buf_block_get_frame(merge_block);
2602
#ifdef UNIV_BTR_DEBUG
2603 2604
		ut_a(btr_page_get_next(merge_page, mtr)
		     == buf_block_get_page_no(block));
2605
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
2606 2607
	} else if (right_page_no != FIL_NULL) {

2608 2609
		merge_block = btr_block_get(space, zip_size, right_page_no,
					    RW_X_LATCH, mtr);
2610
		merge_page = buf_block_get_frame(merge_block);
2611
#ifdef UNIV_BTR_DEBUG
2612 2613
		ut_a(btr_page_get_prev(merge_page, mtr)
		     == buf_block_get_page_no(block));
2614
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
2615 2616 2617
	} else {
		/* The page is the only one on the level, lift the records
		to the father */
2618
		btr_lift_page_up(index, block, mtr);
2619
		mem_heap_free(heap);
2620
		return(TRUE);
osku's avatar
osku committed
2621
	}
2622

osku's avatar
osku committed
2623 2624
	n_recs = page_get_n_recs(page);
	data_size = page_get_data_size(page);
2625
#ifdef UNIV_BTR_DEBUG
marko's avatar
marko committed
2626
	ut_a(page_is_comp(merge_page) == page_is_comp(page));
2627
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
2628

2629 2630
	max_ins_size_reorg = page_get_max_insert_size_after_reorganize(
		merge_page, n_recs);
osku's avatar
osku committed
2631 2632 2633
	if (data_size > max_ins_size_reorg) {

		/* No space for merge */
2634
err_exit:
2635 2636 2637 2638 2639 2640 2641
		/* We play it safe and reset the free bits. */
		if (zip_size
		    && page_is_leaf(merge_page)
		    && !dict_index_is_clust(index)) {
			ibuf_reset_free_bits(merge_block);
		}

2642
		mem_heap_free(heap);
marko's avatar
marko committed
2643
		return(FALSE);
osku's avatar
osku committed
2644 2645
	}

2646
	ut_ad(page_validate(merge_page, index));
osku's avatar
osku committed
2647 2648

	max_ins_size = page_get_max_insert_size(merge_page, n_recs);
2649

marko's avatar
marko committed
2650
	if (UNIV_UNLIKELY(data_size > max_ins_size)) {
osku's avatar
osku committed
2651 2652 2653

		/* We have to reorganize merge_page */

2654
		if (UNIV_UNLIKELY(!btr_page_reorganize(merge_block,
2655
						       index, mtr))) {
2656

2657
			goto err_exit;
2658
		}
osku's avatar
osku committed
2659 2660 2661

		max_ins_size = page_get_max_insert_size(merge_page, n_recs);

2662
		ut_ad(page_validate(merge_page, index));
2663
		ut_ad(max_ins_size == max_ins_size_reorg);
osku's avatar
osku committed
2664

marko's avatar
marko committed
2665
		if (UNIV_UNLIKELY(data_size > max_ins_size)) {
osku's avatar
osku committed
2666

marko's avatar
marko committed
2667 2668
			/* Add fault tolerance, though this should
			never happen */
osku's avatar
osku committed
2669

2670
			goto err_exit;
marko's avatar
marko committed
2671
		}
osku's avatar
osku committed
2672 2673
	}

2674
	merge_page_zip = buf_block_get_page_zip(merge_block);
2675
#ifdef UNIV_ZIP_DEBUG
marko's avatar
marko committed
2676
	if (UNIV_LIKELY_NULL(merge_page_zip)) {
2677 2678 2679
		const page_zip_des_t*	page_zip
			= buf_block_get_page_zip(block);
		ut_a(page_zip);
marko's avatar
marko committed
2680
		ut_a(page_zip_validate(merge_page_zip, merge_page));
2681
		ut_a(page_zip_validate(page_zip, page));
marko's avatar
marko committed
2682
	}
2683
#endif /* UNIV_ZIP_DEBUG */
2684

osku's avatar
osku committed
2685 2686
	/* Move records to the merge page */
	if (is_left) {
2687
		rec_t*	orig_pred = page_copy_rec_list_start(
2688 2689
			merge_block, block, page_get_supremum_rec(page),
			index, mtr);
2690 2691

		if (UNIV_UNLIKELY(!orig_pred)) {
2692
			goto err_exit;
marko's avatar
marko committed
2693
		}
osku's avatar
osku committed
2694

2695
		btr_search_drop_page_hash_index(block);
2696 2697

		/* Remove the page from the level list */
2698
		btr_level_list_remove(space, zip_size, page, mtr);
2699

2700
		btr_node_ptr_delete(index, block, mtr);
2701
		lock_update_merge_left(merge_block, orig_pred, block);
osku's avatar
osku committed
2702
	} else {
2703
		rec_t*		orig_succ;
2704
#ifdef UNIV_BTR_DEBUG
2705
		byte		fil_page_prev[4];
2706
#endif /* UNIV_BTR_DEBUG */
2707

2708 2709 2710 2711 2712
		if (UNIV_LIKELY_NULL(merge_page_zip)) {
			/* The function page_zip_compress(), which will be
			invoked by page_copy_rec_list_end() below,
			requires that FIL_PAGE_PREV be FIL_NULL.
			Clear the field, but prepare to restore it. */
2713
#ifdef UNIV_BTR_DEBUG
2714
			memcpy(fil_page_prev, merge_page + FIL_PAGE_PREV, 4);
2715
#endif /* UNIV_BTR_DEBUG */
2716 2717 2718 2719 2720 2721
#if FIL_NULL != 0xffffffff
# error "FIL_NULL != 0xffffffff"
#endif
			memset(merge_page + FIL_PAGE_PREV, 0xff, 4);
		}

2722
		orig_succ = page_copy_rec_list_end(merge_block, block,
2723 2724
						   page_get_infimum_rec(page),
						   cursor->index, mtr);
2725 2726

		if (UNIV_UNLIKELY(!orig_succ)) {
2727
			ut_a(merge_page_zip);
2728
#ifdef UNIV_BTR_DEBUG
2729
			/* FIL_PAGE_PREV was restored from merge_page_zip. */
2730 2731 2732
			ut_a(!memcmp(fil_page_prev,
				     merge_page + FIL_PAGE_PREV, 4));
#endif /* UNIV_BTR_DEBUG */
2733
			goto err_exit;
marko's avatar
marko committed
2734
		}
osku's avatar
osku committed
2735

2736
		btr_search_drop_page_hash_index(block);
2737

2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749
#ifdef UNIV_BTR_DEBUG
		if (UNIV_LIKELY_NULL(merge_page_zip)) {
			/* Restore FIL_PAGE_PREV in order to avoid an assertion
			failure in btr_level_list_remove(), which will set
			the field again to FIL_NULL.  Even though this makes
			merge_page and merge_page_zip inconsistent for a
			split second, it is harmless, because the pages
			are X-latched. */
			memcpy(merge_page + FIL_PAGE_PREV, fil_page_prev, 4);
		}
#endif /* UNIV_BTR_DEBUG */

2750
		/* Remove the page from the level list */
2751
		btr_level_list_remove(space, zip_size, page, mtr);
2752

2753 2754 2755
		/* Replace the address of the old child node (= page) with the
		address of the merge page to the right */

2756
		btr_node_ptr_set_child_page_no(
2757 2758 2759 2760
			btr_cur_get_rec(&father_cursor),
			btr_cur_get_page_zip(&father_cursor),
			offsets, right_page_no, mtr);
		btr_node_ptr_delete(index, merge_block, mtr);
2761

2762
		lock_update_merge_right(merge_block, orig_succ, block);
osku's avatar
osku committed
2763 2764
	}

2765 2766
	mem_heap_free(heap);

2767
	if (!dict_index_is_clust(index) && page_is_leaf(merge_page)) {
2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789
		/* Update the free bits of the B-tree page in the
		insert buffer bitmap.  This has to be done in a
		separate mini-transaction that is committed before the
		main mini-transaction.  We cannot update the insert
		buffer bitmap in this mini-transaction, because
		btr_compress() can be invoked recursively without
		committing the mini-transaction in between.  Since
		insert buffer bitmap pages have a lower rank than
		B-tree pages, we must not access other pages in the
		same mini-transaction after accessing an insert buffer
		bitmap page. */

		/* The free bits in the insert buffer bitmap must
		never exceed the free space on a page.  It is safe to
		decrement or reset the bits in the bitmap in a
		mini-transaction that is committed before the
		mini-transaction that affects the free space. */

		/* It is unsafe to increment the bits in a separately
		committed mini-transaction, because in crash recovery,
		the free bits could momentarily be set too high. */

2790
		if (zip_size) {
2791 2792 2793 2794 2795 2796
			/* Because the free bits may be incremented
			and we cannot update the insert buffer bitmap
			in the same mini-transaction, the only safe
			thing we can do here is the pessimistic
			approach: reset the free bits. */
			ibuf_reset_free_bits(merge_block);
2797
		} else {
2798 2799 2800 2801
			/* On uncompressed pages, the free bits will
			never increase here.  Thus, it is safe to
			write the bits accurately in a separate
			mini-transaction. */
2802
			ibuf_update_free_bits_if_full(merge_block,
2803 2804 2805
						      UNIV_PAGE_SIZE,
						      ULINT_UNDEFINED);
		}
2806
	}
2807

2808
	ut_ad(page_validate(merge_page, index));
2809 2810 2811
#ifdef UNIV_ZIP_DEBUG
	ut_a(!merge_page_zip || page_zip_validate(merge_page_zip, merge_page));
#endif /* UNIV_ZIP_DEBUG */
osku's avatar
osku committed
2812 2813

	/* Free the file page */
2814
	btr_page_free(index, block, mtr);
osku's avatar
osku committed
2815

2816
	ut_ad(btr_check_node_ptr(index, merge_block, mtr));
marko's avatar
marko committed
2817
	return(TRUE);
2818
}
osku's avatar
osku committed
2819

2820
/*************************************************************//**
2821 2822 2823 2824
Discards a page that is the only page on its level.  This will empty
the whole B-tree, leaving just an empty root page.  This function
should never be reached, because btr_compress(), which is invoked in
delete operations, calls btr_lift_page_up() to flatten the B-tree. */
osku's avatar
osku committed
2825 2826 2827 2828
static
void
btr_discard_only_page_on_level(
/*===========================*/
2829 2830 2831
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: page which is the only on its level */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
2832
{
2833 2834 2835 2836 2837
	ulint		page_level = 0;
	trx_id_t	max_trx_id;

	/* Save the PAGE_MAX_TRX_ID from the leaf page. */
	max_trx_id = page_get_max_trx_id(buf_block_get_frame(block));
2838

2839 2840 2841 2842
	while (buf_block_get_page_no(block) != dict_index_get_page(index)) {
		btr_cur_t	cursor;
		buf_block_t*	father;
		const page_t*	page	= buf_block_get_frame(block);
osku's avatar
osku committed
2843

2844 2845 2846 2847
		ut_a(page_get_n_recs(page) == 1);
		ut_a(page_level == btr_page_get_level(page, mtr));
		ut_a(btr_page_get_prev(page, mtr) == FIL_NULL);
		ut_a(btr_page_get_next(page, mtr) == FIL_NULL);
osku's avatar
osku committed
2848

2849 2850
		ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
		btr_search_drop_page_hash_index(block);
osku's avatar
osku committed
2851

2852 2853
		btr_page_get_father(index, block, mtr, &cursor);
		father = btr_cur_get_block(&cursor);
osku's avatar
osku committed
2854

2855
		lock_update_discard(father, PAGE_HEAP_NO_SUPREMUM, block);
osku's avatar
osku committed
2856

2857 2858
		/* Free the file page */
		btr_page_free(index, block, mtr);
osku's avatar
osku committed
2859

2860 2861 2862 2863 2864 2865
		block = father;
		page_level++;
	}

	/* block is the root page, which must be empty, except
	for the node pointer to the (now discarded) block(s). */
osku's avatar
osku committed
2866

2867
#ifdef UNIV_BTR_DEBUG
2868 2869 2870 2871 2872 2873 2874 2875
	if (!dict_index_is_ibuf(index)) {
		const page_t*	root	= buf_block_get_frame(block);
		const ulint	space	= dict_index_get_space(index);
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
					    + root, space));
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
					    + root, space));
	}
2876
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
2877

2878
	btr_page_empty(block, buf_block_get_page_zip(block), index, 0, mtr);
osku's avatar
osku committed
2879

2880
	if (!dict_index_is_clust(index)) {
2881
		/* We play it safe and reset the free bits for the root */
2882
		ibuf_reset_free_bits(block);
2883 2884

		if (page_is_leaf(buf_block_get_frame(block))) {
2885
			ut_a(max_trx_id);
2886 2887 2888 2889
			page_set_max_trx_id(block,
					    buf_block_get_page_zip(block),
					    max_trx_id, mtr);
		}
osku's avatar
osku committed
2890
	}
2891
}
osku's avatar
osku committed
2892

2893
/*************************************************************//**
osku's avatar
osku committed
2894 2895 2896
Discards a page from a B-tree. This is used to remove the last record from
a B-tree page: the whole page must be removed at the same time. This cannot
be used for the root page, which is allowed to be empty. */
2897
UNIV_INTERN
osku's avatar
osku committed
2898 2899 2900
void
btr_discard_page(
/*=============*/
2901
	btr_cur_t*	cursor,	/*!< in: cursor on the page to discard: not on
osku's avatar
osku committed
2902
				the root page */
2903
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
2904
{
2905
	dict_index_t*	index;
osku's avatar
osku committed
2906
	ulint		space;
2907
	ulint		zip_size;
osku's avatar
osku committed
2908 2909
	ulint		left_page_no;
	ulint		right_page_no;
2910
	buf_block_t*	merge_block;
osku's avatar
osku committed
2911
	page_t*		merge_page;
2912
	buf_block_t*	block;
osku's avatar
osku committed
2913 2914
	page_t*		page;
	rec_t*		node_ptr;
2915

2916
	block = btr_cur_get_block(cursor);
2917
	index = btr_cur_get_index(cursor);
osku's avatar
osku committed
2918

2919
	ut_ad(dict_index_get_page(index) != buf_block_get_page_no(block));
2920
	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
2921
				MTR_MEMO_X_LOCK));
2922
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2923
	space = dict_index_get_space(index);
2924
	zip_size = dict_table_zip_size(index->table);
2925

osku's avatar
osku committed
2926 2927
	/* Decide the page which will inherit the locks */

2928 2929
	left_page_no = btr_page_get_prev(buf_block_get_frame(block), mtr);
	right_page_no = btr_page_get_next(buf_block_get_frame(block), mtr);
osku's avatar
osku committed
2930 2931

	if (left_page_no != FIL_NULL) {
2932 2933
		merge_block = btr_block_get(space, zip_size, left_page_no,
					    RW_X_LATCH, mtr);
2934
		merge_page = buf_block_get_frame(merge_block);
2935
#ifdef UNIV_BTR_DEBUG
2936 2937
		ut_a(btr_page_get_next(merge_page, mtr)
		     == buf_block_get_page_no(block));
2938
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
2939
	} else if (right_page_no != FIL_NULL) {
2940 2941
		merge_block = btr_block_get(space, zip_size, right_page_no,
					    RW_X_LATCH, mtr);
2942
		merge_page = buf_block_get_frame(merge_block);
2943
#ifdef UNIV_BTR_DEBUG
2944 2945
		ut_a(btr_page_get_prev(merge_page, mtr)
		     == buf_block_get_page_no(block));
2946
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
2947
	} else {
2948
		btr_discard_only_page_on_level(index, block, mtr);
osku's avatar
osku committed
2949 2950 2951 2952

		return;
	}

2953
	page = buf_block_get_frame(block);
osku's avatar
osku committed
2954
	ut_a(page_is_comp(merge_page) == page_is_comp(page));
2955
	btr_search_drop_page_hash_index(block);
2956

marko's avatar
marko committed
2957
	if (left_page_no == FIL_NULL && !page_is_leaf(page)) {
osku's avatar
osku committed
2958 2959 2960 2961 2962 2963 2964

		/* We have to mark the leftmost node pointer on the right
		side page as the predefined minimum record */
		node_ptr = page_rec_get_next(page_get_infimum_rec(merge_page));

		ut_ad(page_rec_is_user_rec(node_ptr));

2965 2966 2967 2968 2969
		/* This will make page_zip_validate() fail on merge_page
		until btr_level_list_remove() completes.  This is harmless,
		because everything will take place within a single
		mini-transaction and because writing to the redo log
		is an atomic operation (performed by mtr_commit()). */
2970
		btr_set_min_rec_mark(node_ptr, mtr);
2971 2972
	}

2973
	btr_node_ptr_delete(index, block, mtr);
osku's avatar
osku committed
2974 2975

	/* Remove the page from the level list */
2976
	btr_level_list_remove(space, zip_size, page, mtr);
2977
#ifdef UNIV_ZIP_DEBUG
2978
	{
2979
		page_zip_des_t*	merge_page_zip
2980
			= buf_block_get_page_zip(merge_block);
2981
		ut_a(!merge_page_zip
2982
		     || page_zip_validate(merge_page_zip, merge_page));
2983
	}
2984
#endif /* UNIV_ZIP_DEBUG */
osku's avatar
osku committed
2985

marko's avatar
marko committed
2986
	if (left_page_no != FIL_NULL) {
2987 2988
		lock_update_discard(merge_block, PAGE_HEAP_NO_SUPREMUM,
				    block);
osku's avatar
osku committed
2989
	} else {
2990 2991 2992
		lock_update_discard(merge_block,
				    lock_get_min_heap_no(merge_block),
				    block);
osku's avatar
osku committed
2993 2994 2995
	}

	/* Free the file page */
2996
	btr_page_free(index, block, mtr);
osku's avatar
osku committed
2997

2998
	ut_ad(btr_check_node_ptr(index, merge_block, mtr));
2999
}
osku's avatar
osku committed
3000 3001

#ifdef UNIV_BTR_PRINT
3002
/*************************************************************//**
osku's avatar
osku committed
3003
Prints size info of a B-tree. */
3004
UNIV_INTERN
osku's avatar
osku committed
3005 3006 3007
void
btr_print_size(
/*===========*/
3008
	dict_index_t*	index)	/*!< in: index tree */
osku's avatar
osku committed
3009 3010 3011 3012 3013
{
	page_t*		root;
	fseg_header_t*	seg;
	mtr_t		mtr;

3014
	if (dict_index_is_ibuf(index)) {
3015 3016
		fputs("Sorry, cannot print info of an ibuf tree:"
		      " use ibuf functions\n", stderr);
osku's avatar
osku committed
3017 3018 3019 3020 3021

		return;
	}

	mtr_start(&mtr);
3022

3023
	root = btr_root_get(index, &mtr);
osku's avatar
osku committed
3024 3025 3026 3027 3028 3029

	seg = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;

	fputs("INFO OF THE NON-LEAF PAGE SEGMENT\n", stderr);
	fseg_print(seg, &mtr);

3030
	if (!(index->type & DICT_UNIVERSAL)) {
osku's avatar
osku committed
3031 3032 3033 3034 3035 3036 3037

		seg = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;

		fputs("INFO OF THE LEAF PAGE SEGMENT\n", stderr);
		fseg_print(seg, &mtr);
	}

3038
	mtr_commit(&mtr);
osku's avatar
osku committed
3039 3040
}

3041
/************************************************************//**
osku's avatar
osku committed
3042 3043 3044 3045 3046
Prints recursively index tree pages. */
static
void
btr_print_recursive(
/*================*/
3047 3048 3049
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: index page */
	ulint		width,	/*!< in: print this many entries from start
osku's avatar
osku committed
3050
				and end */
3051 3052 3053
	mem_heap_t**	heap,	/*!< in/out: heap for rec_get_offsets() */
	ulint**		offsets,/*!< in/out: buffer for rec_get_offsets() */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
3054
{
3055
	const page_t*	page	= buf_block_get_frame(block);
osku's avatar
osku committed
3056 3057 3058 3059 3060
	page_cur_t	cursor;
	ulint		n_recs;
	ulint		i	= 0;
	mtr_t		mtr2;

3061
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
osku's avatar
osku committed
3062
	fprintf(stderr, "NODE ON LEVEL %lu page number %lu\n",
3063
		(ulong) btr_page_get_level(page, mtr),
marko's avatar
marko committed
3064
		(ulong) buf_block_get_page_no(block));
3065

3066
	page_print(block, index, width, width);
3067

osku's avatar
osku committed
3068
	n_recs = page_get_n_recs(page);
3069

3070
	page_cur_set_before_first(block, &cursor);
osku's avatar
osku committed
3071 3072 3073 3074
	page_cur_move_to_next(&cursor);

	while (!page_cur_is_after_last(&cursor)) {

marko's avatar
marko committed
3075
		if (page_is_leaf(page)) {
osku's avatar
osku committed
3076 3077 3078 3079 3080

			/* If this is the leaf level, do nothing */

		} else if ((i <= width) || (i >= n_recs - width)) {

3081 3082
			const rec_t*	node_ptr;

osku's avatar
osku committed
3083 3084 3085 3086 3087
			mtr_start(&mtr2);

			node_ptr = page_cur_get_rec(&cursor);

			*offsets = rec_get_offsets(node_ptr, index, *offsets,
3088
						   ULINT_UNDEFINED, heap);
3089 3090
			btr_print_recursive(index,
					    btr_node_ptr_get_child(node_ptr,
3091
								   index,
3092 3093 3094
								   *offsets,
								   &mtr2),
					    width, heap, offsets, &mtr2);
osku's avatar
osku committed
3095 3096 3097 3098 3099 3100 3101 3102
			mtr_commit(&mtr2);
		}

		page_cur_move_to_next(&cursor);
		i++;
	}
}

3103
/**************************************************************//**
osku's avatar
osku committed
3104
Prints directories and other info of all nodes in the tree. */
3105
UNIV_INTERN
osku's avatar
osku committed
3106
void
3107 3108
btr_print_index(
/*============*/
3109 3110
	dict_index_t*	index,	/*!< in: index */
	ulint		width)	/*!< in: print this many entries from start
osku's avatar
osku committed
3111 3112 3113
				and end */
{
	mtr_t		mtr;
3114
	buf_block_t*	root;
osku's avatar
osku committed
3115 3116 3117
	mem_heap_t*	heap	= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets	= offsets_;
3118
	rec_offs_init(offsets_);
osku's avatar
osku committed
3119 3120

	fputs("--------------------------\n"
3121
	      "INDEX TREE PRINT\n", stderr);
osku's avatar
osku committed
3122 3123 3124

	mtr_start(&mtr);

3125
	root = btr_root_block_get(index, &mtr);
osku's avatar
osku committed
3126

3127
	btr_print_recursive(index, root, width, &heap, &offsets, &mtr);
osku's avatar
osku committed
3128 3129 3130 3131 3132 3133
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	mtr_commit(&mtr);

3134
	btr_validate_index(index, NULL);
osku's avatar
osku committed
3135 3136 3137
}
#endif /* UNIV_BTR_PRINT */

3138
#ifdef UNIV_DEBUG
3139
/************************************************************//**
3140 3141
Checks that the node pointer to a page is appropriate.
@return	TRUE */
3142
UNIV_INTERN
osku's avatar
osku committed
3143 3144 3145
ibool
btr_check_node_ptr(
/*===============*/
3146 3147 3148
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: index page */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
3149 3150
{
	mem_heap_t*	heap;
3151 3152 3153
	dtuple_t*	tuple;
	ulint*		offsets;
	btr_cur_t	cursor;
3154
	page_t*		page = buf_block_get_frame(block);
osku's avatar
osku committed
3155

3156
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3157
	if (dict_index_get_page(index) == buf_block_get_page_no(block)) {
osku's avatar
osku committed
3158 3159 3160 3161

		return(TRUE);
	}

3162 3163 3164
	heap = mem_heap_create(256);
	offsets = btr_page_get_father_block(NULL, heap, index, block, mtr,
					    &cursor);
3165

marko's avatar
marko committed
3166
	if (page_is_leaf(page)) {
osku's avatar
osku committed
3167

3168
		goto func_exit;
osku's avatar
osku committed
3169
	}
3170

3171
	tuple = dict_index_build_node_ptr(
3172 3173
		index, page_rec_get_next(page_get_infimum_rec(page)), 0, heap,
		btr_page_get_level(page, mtr));
3174

3175 3176
	ut_a(!cmp_dtuple_rec(tuple, btr_cur_get_rec(&cursor), offsets));
func_exit:
osku's avatar
osku committed
3177 3178 3179 3180
	mem_heap_free(heap);

	return(TRUE);
}
3181
#endif /* UNIV_DEBUG */
osku's avatar
osku committed
3182

3183
/************************************************************//**
osku's avatar
osku committed
3184 3185 3186 3187 3188
Display identification information for a record. */
static
void
btr_index_rec_validate_report(
/*==========================*/
3189 3190 3191
	const page_t*		page,	/*!< in: index page */
	const rec_t*		rec,	/*!< in: index record */
	const dict_index_t*	index)	/*!< in: index */
osku's avatar
osku committed
3192 3193 3194 3195
{
	fputs("InnoDB: Record in ", stderr);
	dict_index_name_print(stderr, NULL, index);
	fprintf(stderr, ", page %lu, at offset %lu\n",
3196
		page_get_page_no(page), (ulint) page_offset(rec));
osku's avatar
osku committed
3197 3198
}

3199
/************************************************************//**
osku's avatar
osku committed
3200
Checks the size and number of fields in a record based on the definition of
3201 3202
the index.
@return	TRUE if ok */
3203
UNIV_INTERN
osku's avatar
osku committed
3204 3205
ibool
btr_index_rec_validate(
3206
/*===================*/
3207 3208 3209
	const rec_t*		rec,		/*!< in: index record */
	const dict_index_t*	index,		/*!< in: index */
	ibool			dump_on_error)	/*!< in: TRUE if the function
3210 3211
						should print hex dump of record
						and page on error */
osku's avatar
osku committed
3212 3213 3214 3215
{
	ulint		len;
	ulint		n;
	ulint		i;
3216
	const page_t*	page;
osku's avatar
osku committed
3217 3218 3219
	mem_heap_t*	heap	= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets	= offsets_;
3220
	rec_offs_init(offsets_);
osku's avatar
osku committed
3221

3222
	page = page_align(rec);
osku's avatar
osku committed
3223 3224

	if (UNIV_UNLIKELY(index->type & DICT_UNIVERSAL)) {
3225 3226 3227
		/* The insert buffer index tree can contain records from any
		other index: we cannot check the number of fields or
		their length */
osku's avatar
osku committed
3228

3229
		return(TRUE);
osku's avatar
osku committed
3230 3231
	}

3232
	if (UNIV_UNLIKELY((ibool)!!page_is_comp(page)
3233
			  != dict_table_is_comp(index->table))) {
osku's avatar
osku committed
3234 3235 3236
		btr_index_rec_validate_report(page, rec, index);
		fprintf(stderr, "InnoDB: compact flag=%lu, should be %lu\n",
			(ulong) !!page_is_comp(page),
3237 3238
			(ulong) dict_table_is_comp(index->table));

osku's avatar
osku committed
3239 3240 3241 3242 3243 3244
		return(FALSE);
	}

	n = dict_index_get_n_fields(index);

	if (!page_is_comp(page)
3245
	    && UNIV_UNLIKELY(rec_get_n_fields_old(rec) != n)) {
osku's avatar
osku committed
3246 3247 3248 3249 3250
		btr_index_rec_validate_report(page, rec, index);
		fprintf(stderr, "InnoDB: has %lu fields, should have %lu\n",
			(ulong) rec_get_n_fields_old(rec), (ulong) n);

		if (dump_on_error) {
3251
			buf_page_print(page, 0);
osku's avatar
osku committed
3252 3253 3254 3255 3256 3257 3258 3259 3260 3261 3262

			fputs("InnoDB: corrupt record ", stderr);
			rec_print_old(stderr, rec);
			putc('\n', stderr);
		}
		return(FALSE);
	}

	offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);

	for (i = 0; i < n; i++) {
3263
		ulint	fixed_size = dict_col_get_fixed_size(
3264
			dict_index_get_nth_col(index, i), page_is_comp(page));
osku's avatar
osku committed
3265

3266
		rec_get_nth_field_offs(offsets, i, &len);
osku's avatar
osku committed
3267

3268 3269 3270 3271 3272
		/* Note that if fixed_size != 0, it equals the
		length of a fixed-size column in the clustered index.
		A prefix index of the column is of fixed, but different
		length.  When fixed_size == 0, prefix_len is the maximum
		length of the prefix index column. */
osku's avatar
osku committed
3273 3274

		if ((dict_index_get_nth_field(index, i)->prefix_len == 0
3275 3276 3277 3278 3279
		     && len != UNIV_SQL_NULL && fixed_size
		     && len != fixed_size)
		    || (dict_index_get_nth_field(index, i)->prefix_len > 0
			&& len != UNIV_SQL_NULL
			&& len
3280
			> dict_index_get_nth_field(index, i)->prefix_len)) {
osku's avatar
osku committed
3281 3282 3283

			btr_index_rec_validate_report(page, rec, index);
			fprintf(stderr,
3284 3285
				"InnoDB: field %lu len is %lu,"
				" should be %lu\n",
3286
				(ulong) i, (ulong) len, (ulong) fixed_size);
osku's avatar
osku committed
3287 3288

			if (dump_on_error) {
3289
				buf_page_print(page, 0);
osku's avatar
osku committed
3290 3291 3292 3293 3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304

				fputs("InnoDB: corrupt record ", stderr);
				rec_print_new(stderr, rec, offsets);
				putc('\n', stderr);
			}
			if (UNIV_LIKELY_NULL(heap)) {
				mem_heap_free(heap);
			}
			return(FALSE);
		}
	}

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
3305
	return(TRUE);
osku's avatar
osku committed
3306 3307
}

3308
/************************************************************//**
osku's avatar
osku committed
3309
Checks the size and number of fields in records based on the definition of
3310 3311
the index.
@return	TRUE if ok */
osku's avatar
osku committed
3312 3313 3314 3315
static
ibool
btr_index_page_validate(
/*====================*/
3316 3317
	buf_block_t*	block,	/*!< in: index page */
	dict_index_t*	index)	/*!< in: index */
osku's avatar
osku committed
3318
{
3319
	page_cur_t	cur;
osku's avatar
osku committed
3320
	ibool		ret	= TRUE;
3321

3322
	page_cur_set_before_first(block, &cur);
osku's avatar
osku committed
3323 3324 3325 3326 3327 3328 3329 3330 3331 3332 3333 3334 3335 3336 3337 3338
	page_cur_move_to_next(&cur);

	for (;;) {
		if (page_cur_is_after_last(&cur)) {

			break;
		}

		if (!btr_index_rec_validate(cur.rec, index, TRUE)) {

			return(FALSE);
		}

		page_cur_move_to_next(&cur);
	}

3339
	return(ret);
osku's avatar
osku committed
3340 3341
}

3342
/************************************************************//**
osku's avatar
osku committed
3343 3344 3345 3346
Report an error on one page of an index tree. */
static
void
btr_validate_report1(
3347
/*=================*/
3348 3349 3350
	dict_index_t*		index,	/*!< in: index */
	ulint			level,	/*!< in: B-tree level */
	const buf_block_t*	block)	/*!< in: index page */
osku's avatar
osku committed
3351 3352
{
	fprintf(stderr, "InnoDB: Error in page %lu of ",
3353
		buf_block_get_page_no(block));
osku's avatar
osku committed
3354 3355 3356 3357 3358 3359 3360
	dict_index_name_print(stderr, NULL, index);
	if (level) {
		fprintf(stderr, ", index tree level %lu", level);
	}
	putc('\n', stderr);
}

3361
/************************************************************//**
osku's avatar
osku committed
3362 3363 3364 3365
Report an error on two pages of an index tree. */
static
void
btr_validate_report2(
3366
/*=================*/
3367 3368 3369 3370
	const dict_index_t*	index,	/*!< in: index */
	ulint			level,	/*!< in: B-tree level */
	const buf_block_t*	block1,	/*!< in: first index page */
	const buf_block_t*	block2)	/*!< in: second index page */
osku's avatar
osku committed
3371 3372
{
	fprintf(stderr, "InnoDB: Error in pages %lu and %lu of ",
3373 3374
		buf_block_get_page_no(block1),
		buf_block_get_page_no(block2));
osku's avatar
osku committed
3375 3376 3377 3378 3379 3380 3381
	dict_index_name_print(stderr, NULL, index);
	if (level) {
		fprintf(stderr, ", index tree level %lu", level);
	}
	putc('\n', stderr);
}

3382
/************************************************************//**
3383 3384
Validates index tree level.
@return	TRUE if ok */
osku's avatar
osku committed
3385 3386 3387 3388
static
ibool
btr_validate_level(
/*===============*/
3389 3390 3391
	dict_index_t*	index,	/*!< in: index tree */
	trx_t*		trx,	/*!< in: transaction or NULL */
	ulint		level)	/*!< in: level number */
osku's avatar
osku committed
3392 3393
{
	ulint		space;
3394
	ulint		zip_size;
3395
	buf_block_t*	block;
osku's avatar
osku committed
3396
	page_t*		page;
3397
	buf_block_t*	right_block = 0; /* remove warning */
osku's avatar
osku committed
3398 3399
	page_t*		right_page = 0; /* remove warning */
	page_t*		father_page;
3400 3401
	btr_cur_t	node_cur;
	btr_cur_t	right_node_cur;
osku's avatar
osku committed
3402 3403 3404 3405 3406 3407 3408 3409 3410 3411
	rec_t*		rec;
	ulint		right_page_no;
	ulint		left_page_no;
	page_cur_t	cursor;
	dtuple_t*	node_ptr_tuple;
	ibool		ret	= TRUE;
	mtr_t		mtr;
	mem_heap_t*	heap	= mem_heap_create(256);
	ulint*		offsets	= NULL;
	ulint*		offsets2= NULL;
3412
#ifdef UNIV_ZIP_DEBUG
marko's avatar
marko committed
3413
	page_zip_des_t*	page_zip;
3414
#endif /* UNIV_ZIP_DEBUG */
osku's avatar
osku committed
3415 3416 3417

	mtr_start(&mtr);

3418
	mtr_x_lock(dict_index_get_lock(index), &mtr);
3419

3420 3421
	block = btr_root_block_get(index, &mtr);
	page = buf_block_get_frame(block);
osku's avatar
osku committed
3422

3423 3424
	space = dict_index_get_space(index);
	zip_size = dict_table_zip_size(index->table);
osku's avatar
osku committed
3425 3426

	while (level != btr_page_get_level(page, &mtr)) {
3427
		const rec_t*	node_ptr;
3428

3429 3430
		ut_a(space == buf_block_get_space(block));
		ut_a(space == page_get_space_id(page));
3431
#ifdef UNIV_ZIP_DEBUG
3432
		page_zip = buf_block_get_page_zip(block);
3433
		ut_a(!page_zip || page_zip_validate(page_zip, page));
3434
#endif /* UNIV_ZIP_DEBUG */
marko's avatar
marko committed
3435
		ut_a(!page_is_leaf(page));
osku's avatar
osku committed
3436

3437
		page_cur_set_before_first(block, &cursor);
osku's avatar
osku committed
3438 3439 3440 3441
		page_cur_move_to_next(&cursor);

		node_ptr = page_cur_get_rec(&cursor);
		offsets = rec_get_offsets(node_ptr, index, offsets,
3442
					  ULINT_UNDEFINED, &heap);
3443
		block = btr_node_ptr_get_child(node_ptr, index, offsets, &mtr);
3444
		page = buf_block_get_frame(block);
osku's avatar
osku committed
3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455 3456
	}

	/* Now we are on the desired level. Loop through the pages on that
	level. */
loop:
	if (trx_is_interrupted(trx)) {
		mtr_commit(&mtr);
		mem_heap_free(heap);
		return(ret);
	}
	mem_heap_empty(heap);
	offsets = offsets2 = NULL;
3457
	mtr_x_lock(dict_index_get_lock(index), &mtr);
osku's avatar
osku committed
3458

3459
#ifdef UNIV_ZIP_DEBUG
3460
	page_zip = buf_block_get_page_zip(block);
3461
	ut_a(!page_zip || page_zip_validate(page_zip, page));
3462
#endif /* UNIV_ZIP_DEBUG */
marko's avatar
marko committed
3463

osku's avatar
osku committed
3464 3465 3466
	/* Check ordering etc. of records */

	if (!page_validate(page, index)) {
3467
		btr_validate_report1(index, level, block);
osku's avatar
osku committed
3468 3469 3470 3471 3472 3473

		ret = FALSE;
	} else if (level == 0) {
		/* We are on level 0. Check that the records have the right
		number of fields, and field lengths are right. */

3474
		if (!btr_index_page_validate(block, index)) {
osku's avatar
osku committed
3475 3476 3477 3478

			ret = FALSE;
		}
	}
3479

osku's avatar
osku committed
3480 3481 3482 3483 3484
	ut_a(btr_page_get_level(page, &mtr) == level);

	right_page_no = btr_page_get_next(page, &mtr);
	left_page_no = btr_page_get_prev(page, &mtr);

3485
	ut_a(page_get_n_recs(page) > 0 || (level == 0
3486
					   && page_get_page_no(page)
3487
					   == dict_index_get_page(index)));
osku's avatar
osku committed
3488 3489

	if (right_page_no != FIL_NULL) {
3490
		const rec_t*	right_rec;
3491 3492
		right_block = btr_block_get(space, zip_size, right_page_no,
					    RW_X_LATCH, &mtr);
3493
		right_page = buf_block_get_frame(right_block);
3494
		if (UNIV_UNLIKELY(btr_page_get_prev(right_page, &mtr)
3495
				  != page_get_page_no(page))) {
3496
			btr_validate_report2(index, level, block, right_block);
3497
			fputs("InnoDB: broken FIL_PAGE_NEXT"
3498
			      " or FIL_PAGE_PREV links\n", stderr);
3499 3500
			buf_page_print(page, 0);
			buf_page_print(right_page, 0);
3501 3502 3503 3504 3505

			ret = FALSE;
		}

		if (UNIV_UNLIKELY(page_is_comp(right_page)
3506
				  != page_is_comp(page))) {
3507
			btr_validate_report2(index, level, block, right_block);
3508
			fputs("InnoDB: 'compact' flag mismatch\n", stderr);
3509 3510
			buf_page_print(page, 0);
			buf_page_print(right_page, 0);
3511 3512 3513 3514 3515 3516

			ret = FALSE;

			goto node_ptr_fails;
		}

osku's avatar
osku committed
3517
		rec = page_rec_get_prev(page_get_supremum_rec(page));
3518 3519
		right_rec = page_rec_get_next(page_get_infimum_rec(
						      right_page));
osku's avatar
osku committed
3520
		offsets = rec_get_offsets(rec, index,
3521
					  offsets, ULINT_UNDEFINED, &heap);
osku's avatar
osku committed
3522
		offsets2 = rec_get_offsets(right_rec, index,
3523
					   offsets2, ULINT_UNDEFINED, &heap);
3524
		if (UNIV_UNLIKELY(cmp_rec_rec(rec, right_rec,
3525 3526
					      offsets, offsets2,
					      index) >= 0)) {
osku's avatar
osku committed
3527

3528
			btr_validate_report2(index, level, block, right_block);
osku's avatar
osku committed
3529 3530

			fputs("InnoDB: records in wrong order"
3531
			      " on adjacent pages\n", stderr);
osku's avatar
osku committed
3532

3533 3534
			buf_page_print(page, 0);
			buf_page_print(right_page, 0);
osku's avatar
osku committed
3535 3536 3537 3538 3539 3540

			fputs("InnoDB: record ", stderr);
			rec = page_rec_get_prev(page_get_supremum_rec(page));
			rec_print(stderr, rec, index);
			putc('\n', stderr);
			fputs("InnoDB: record ", stderr);
3541 3542
			rec = page_rec_get_next(
				page_get_infimum_rec(right_page));
osku's avatar
osku committed
3543 3544 3545
			rec_print(stderr, rec, index);
			putc('\n', stderr);

3546 3547
			ret = FALSE;
		}
osku's avatar
osku committed
3548
	}
3549

osku's avatar
osku committed
3550
	if (level > 0 && left_page_no == FIL_NULL) {
3551 3552 3553
		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
			     page_rec_get_next(page_get_infimum_rec(page)),
			     page_is_comp(page)));
osku's avatar
osku committed
3554 3555
	}

3556
	if (buf_block_get_page_no(block) != dict_index_get_page(index)) {
osku's avatar
osku committed
3557 3558

		/* Check father node pointers */
3559

3560
		rec_t*	node_ptr;
3561 3562 3563 3564 3565 3566 3567 3568 3569 3570 3571

		offsets = btr_page_get_father_block(offsets, heap, index,
						    block, &mtr, &node_cur);
		father_page = btr_cur_get_page(&node_cur);
		node_ptr = btr_cur_get_rec(&node_cur);

		btr_cur_position(
			index, page_rec_get_prev(page_get_supremum_rec(page)),
			block, &node_cur);
		offsets = btr_page_get_father_node_ptr(offsets, heap,
						       &node_cur, &mtr);
3572

3573 3574 3575 3576
		if (UNIV_UNLIKELY(node_ptr != btr_cur_get_rec(&node_cur))
		    || UNIV_UNLIKELY(btr_node_ptr_get_child_page_no(node_ptr,
								    offsets)
				     != buf_block_get_page_no(block))) {
3577

3578
			btr_validate_report1(index, level, block);
osku's avatar
osku committed
3579 3580

			fputs("InnoDB: node pointer to the page is wrong\n",
3581
			      stderr);
osku's avatar
osku committed
3582

3583 3584
			buf_page_print(father_page, 0);
			buf_page_print(page, 0);
osku's avatar
osku committed
3585 3586

			fputs("InnoDB: node ptr ", stderr);
3587
			rec_print(stderr, node_ptr, index);
osku's avatar
osku committed
3588

3589
			rec = btr_cur_get_rec(&node_cur);
osku's avatar
osku committed
3590 3591
			fprintf(stderr, "\n"
				"InnoDB: node ptr child page n:o %lu\n",
3592
				(ulong) btr_node_ptr_get_child_page_no(
3593
					rec, offsets));
osku's avatar
osku committed
3594 3595

			fputs("InnoDB: record on page ", stderr);
3596
			rec_print_new(stderr, rec, offsets);
osku's avatar
osku committed
3597
			putc('\n', stderr);
3598
			ret = FALSE;
osku's avatar
osku committed
3599

3600
			goto node_ptr_fails;
osku's avatar
osku committed
3601 3602
		}

marko's avatar
marko committed
3603
		if (!page_is_leaf(page)) {
3604 3605 3606 3607
			node_ptr_tuple = dict_index_build_node_ptr(
				index,
				page_rec_get_next(page_get_infimum_rec(page)),
				0, heap, btr_page_get_level(page, &mtr));
osku's avatar
osku committed
3608 3609

			if (cmp_dtuple_rec(node_ptr_tuple, node_ptr,
3610
					   offsets)) {
3611
				const rec_t* first_rec = page_rec_get_next(
3612
					page_get_infimum_rec(page));
osku's avatar
osku committed
3613

3614
				btr_validate_report1(index, level, block);
osku's avatar
osku committed
3615

3616 3617
				buf_page_print(father_page, 0);
				buf_page_print(page, 0);
osku's avatar
osku committed
3618 3619

				fputs("InnoDB: Error: node ptrs differ"
3620 3621
				      " on levels > 0\n"
				      "InnoDB: node ptr ", stderr);
osku's avatar
osku committed
3622 3623 3624 3625
				rec_print_new(stderr, node_ptr, offsets);
				fputs("InnoDB: first rec ", stderr);
				rec_print(stderr, first_rec, index);
				putc('\n', stderr);
3626
				ret = FALSE;
osku's avatar
osku committed
3627

3628
				goto node_ptr_fails;
osku's avatar
osku committed
3629 3630 3631 3632
			}
		}

		if (left_page_no == FIL_NULL) {
3633 3634
			ut_a(node_ptr == page_rec_get_next(
				     page_get_infimum_rec(father_page)));
osku's avatar
osku committed
3635 3636 3637 3638
			ut_a(btr_page_get_prev(father_page, &mtr) == FIL_NULL);
		}

		if (right_page_no == FIL_NULL) {
3639 3640
			ut_a(node_ptr == page_rec_get_prev(
				     page_get_supremum_rec(father_page)));
osku's avatar
osku committed
3641
			ut_a(btr_page_get_next(father_page, &mtr) == FIL_NULL);
3642
		} else {
3643 3644 3645
			const rec_t*	right_node_ptr
				= page_rec_get_next(node_ptr);

3646 3647 3648
			offsets = btr_page_get_father_block(
				offsets, heap, index, right_block,
				&mtr, &right_node_cur);
3649
			if (right_node_ptr
3650
			    != page_get_supremum_rec(father_page)) {
osku's avatar
osku committed
3651

3652
				if (btr_cur_get_rec(&right_node_cur)
3653
				    != right_node_ptr) {
osku's avatar
osku committed
3654
					ret = FALSE;
3655 3656 3657
					fputs("InnoDB: node pointer to"
					      " the right page is wrong\n",
					      stderr);
osku's avatar
osku committed
3658 3659

					btr_validate_report1(index, level,
3660
							     block);
osku's avatar
osku committed
3661

3662 3663 3664
					buf_page_print(father_page, 0);
					buf_page_print(page, 0);
					buf_page_print(right_page, 0);
osku's avatar
osku committed
3665 3666
				}
			} else {
3667 3668
				page_t*	right_father_page
					= btr_cur_get_page(&right_node_cur);
3669

3670 3671
				if (btr_cur_get_rec(&right_node_cur)
				    != page_rec_get_next(
3672 3673
					    page_get_infimum_rec(
						    right_father_page))) {
osku's avatar
osku committed
3674
					ret = FALSE;
3675 3676 3677
					fputs("InnoDB: node pointer 2 to"
					      " the right page is wrong\n",
					      stderr);
osku's avatar
osku committed
3678 3679

					btr_validate_report1(index, level,
3680
							     block);
osku's avatar
osku committed
3681

3682 3683 3684 3685
					buf_page_print(father_page, 0);
					buf_page_print(right_father_page, 0);
					buf_page_print(page, 0);
					buf_page_print(right_page, 0);
osku's avatar
osku committed
3686 3687
				}

3688
				if (page_get_page_no(right_father_page)
3689
				    != btr_page_get_next(father_page, &mtr)) {
osku's avatar
osku committed
3690 3691

					ret = FALSE;
3692 3693 3694
					fputs("InnoDB: node pointer 3 to"
					      " the right page is wrong\n",
					      stderr);
osku's avatar
osku committed
3695 3696

					btr_validate_report1(index, level,
3697
							     block);
osku's avatar
osku committed
3698

3699 3700 3701 3702
					buf_page_print(father_page, 0);
					buf_page_print(right_father_page, 0);
					buf_page_print(page, 0);
					buf_page_print(right_page, 0);
osku's avatar
osku committed
3703
				}
3704
			}
osku's avatar
osku committed
3705 3706 3707 3708
		}
	}

node_ptr_fails:
3709 3710 3711
	/* Commit the mini-transaction to release the latch on 'page'.
	Re-acquire the latch on right_page, which will become 'page'
	on the next loop.  The page has already been checked. */
osku's avatar
osku committed
3712 3713 3714 3715
	mtr_commit(&mtr);

	if (right_page_no != FIL_NULL) {
		mtr_start(&mtr);
3716

3717 3718
		block = btr_block_get(space, zip_size, right_page_no,
				      RW_X_LATCH, &mtr);
3719
		page = buf_block_get_frame(block);
osku's avatar
osku committed
3720 3721 3722 3723 3724 3725 3726 3727

		goto loop;
	}

	mem_heap_free(heap);
	return(ret);
}

/**************************************************************//**
3729 3730
Checks the consistency of an index tree.
@return	TRUE if ok */
3731
UNIV_INTERN
osku's avatar
osku committed
3732
ibool
3733 3734
btr_validate_index(
/*===============*/
3735 3736
	dict_index_t*	index,	/*!< in: index */
	trx_t*		trx)	/*!< in: transaction or NULL */
osku's avatar
osku committed
3737 3738 3739 3740 3741 3742 3743
{
	mtr_t	mtr;
	page_t*	root;
	ulint	i;
	ulint	n;

	mtr_start(&mtr);
3744
	mtr_x_lock(dict_index_get_lock(index), &mtr);
osku's avatar
osku committed
3745

3746
	root = btr_root_get(index, &mtr);
osku's avatar
osku committed
3747 3748 3749
	n = btr_page_get_level(root, &mtr);

	for (i = 0; i <= n && !trx_is_interrupted(trx); i++) {
3750
		if (!btr_validate_level(index, trx, n - i)) {
osku's avatar
osku committed
3751 3752 3753 3754 3755 3756 3757 3758 3759 3760 3761

			mtr_commit(&mtr);

			return(FALSE);
		}
	}

	mtr_commit(&mtr);

	return(TRUE);
}
#endif /* !UNIV_HOTBACKUP */