btr0btr.c 126 KB
Newer Older
1 2
/*****************************************************************************

3
Copyright (c) 1994, 2012, Oracle and/or its affiliates. All Rights Reserved.
4 5 6 7 8 9 10 11 12 13

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
14 15
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
16 17 18

*****************************************************************************/

19 20
/**************************************************//**
@file btr/btr0btr.c
osku's avatar
osku committed
21 22 23 24
The B-tree

Created 6/2/1994 Heikki Tuuri
*******************************************************/
25

osku's avatar
osku committed
26 27 28 29 30 31 32 33
#include "btr0btr.h"

#ifdef UNIV_NONINL
#include "btr0btr.ic"
#endif

#include "fsp0fsp.h"
#include "page0page.h"
marko's avatar
marko committed
34
#include "page0zip.h"
35 36

#ifndef UNIV_HOTBACKUP
osku's avatar
osku committed
37 38 39 40 41 42 43 44
#include "btr0cur.h"
#include "btr0sea.h"
#include "btr0pcur.h"
#include "rem0cmp.h"
#include "lock0lock.h"
#include "ibuf0ibuf.h"
#include "trx0trx.h"

45
#endif /* UNIV_HOTBACKUP */
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
/**************************************************************//**
Report that an index page is corrupted. */
UNIV_INTERN
void
btr_corruption_report(
/*==================*/
	const buf_block_t*	block,	/*!< in: corrupted block */
	const dict_index_t*	index)	/*!< in: index tree */
{
	fprintf(stderr, "InnoDB: flag mismatch in space %u page %u"
		" index %s of table %s\n",
		(unsigned) buf_block_get_space(block),
		(unsigned) buf_block_get_page_no(block),
		index->name, index->table_name);
	if (block->page.zip.data) {
		buf_page_print(block->page.zip.data,
62 63
			       buf_block_get_zip_size(block),
			       BUF_PAGE_PRINT_NO_CRASH);
64
	}
65
	buf_page_print(buf_block_get_frame(block), 0, 0);
66 67
}

68
#ifndef UNIV_HOTBACKUP
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622
#ifdef UNIV_BLOB_DEBUG
# include "srv0srv.h"
# include "ut0rbt.h"

/** TRUE when messages about index->blobs modification are enabled. */
static ibool btr_blob_dbg_msg;

/** Issue a message about an operation on index->blobs.
@param op	operation
@param b	the entry being subjected to the operation
@param ctx	the context of the operation */
#define btr_blob_dbg_msg_issue(op, b, ctx)			\
	fprintf(stderr, op " %u:%u:%u->%u %s(%u,%u,%u)\n",	\
		(b)->ref_page_no, (b)->ref_heap_no,		\
		(b)->ref_field_no, (b)->blob_page_no, ctx,	\
		(b)->owner, (b)->always_owner, (b)->del)

/** Insert to index->blobs a reference to an off-page column.
@param index	the index tree
@param b	the reference
@param ctx	context (for logging) */
UNIV_INTERN
void
btr_blob_dbg_rbt_insert(
/*====================*/
	dict_index_t*		index,	/*!< in/out: index tree */
	const btr_blob_dbg_t*	b,	/*!< in: the reference */
	const char*		ctx)	/*!< in: context (for logging) */
{
	if (btr_blob_dbg_msg) {
		btr_blob_dbg_msg_issue("insert", b, ctx);
	}
	mutex_enter(&index->blobs_mutex);
	rbt_insert(index->blobs, b, b);
	mutex_exit(&index->blobs_mutex);
}

/** Remove from index->blobs a reference to an off-page column.
@param index	the index tree
@param b	the reference
@param ctx	context (for logging) */
UNIV_INTERN
void
btr_blob_dbg_rbt_delete(
/*====================*/
	dict_index_t*		index,	/*!< in/out: index tree */
	const btr_blob_dbg_t*	b,	/*!< in: the reference */
	const char*		ctx)	/*!< in: context (for logging) */
{
	if (btr_blob_dbg_msg) {
		btr_blob_dbg_msg_issue("delete", b, ctx);
	}
	mutex_enter(&index->blobs_mutex);
	ut_a(rbt_delete(index->blobs, b));
	mutex_exit(&index->blobs_mutex);
}

/**************************************************************//**
Comparator for items (btr_blob_dbg_t) in index->blobs.
The key in index->blobs is (ref_page_no, ref_heap_no, ref_field_no).
@return negative, 0 or positive if *a<*b, *a=*b, *a>*b */
static
int
btr_blob_dbg_cmp(
/*=============*/
	const void*	a,	/*!< in: first btr_blob_dbg_t to compare */
	const void*	b)	/*!< in: second btr_blob_dbg_t to compare */
{
	const btr_blob_dbg_t*	aa	= a;
	const btr_blob_dbg_t*	bb	= b;

	ut_ad(aa != NULL);
	ut_ad(bb != NULL);

	if (aa->ref_page_no != bb->ref_page_no) {
		return(aa->ref_page_no < bb->ref_page_no ? -1 : 1);
	}
	if (aa->ref_heap_no != bb->ref_heap_no) {
		return(aa->ref_heap_no < bb->ref_heap_no ? -1 : 1);
	}
	if (aa->ref_field_no != bb->ref_field_no) {
		return(aa->ref_field_no < bb->ref_field_no ? -1 : 1);
	}
	return(0);
}

/**************************************************************//**
Add a reference to an off-page column to the index->blobs map. */
UNIV_INTERN
void
btr_blob_dbg_add_blob(
/*==================*/
	const rec_t*	rec,		/*!< in: clustered index record */
	ulint		field_no,	/*!< in: off-page column number */
	ulint		page_no,	/*!< in: start page of the column */
	dict_index_t*	index,		/*!< in/out: index tree */
	const char*	ctx)		/*!< in: context (for logging) */
{
	btr_blob_dbg_t	b;
	const page_t*	page	= page_align(rec);

	ut_a(index->blobs);

	b.blob_page_no = page_no;
	b.ref_page_no = page_get_page_no(page);
	b.ref_heap_no = page_rec_get_heap_no(rec);
	b.ref_field_no = field_no;
	ut_a(b.ref_field_no >= index->n_uniq);
	b.always_owner = b.owner = TRUE;
	b.del = FALSE;
	ut_a(!rec_get_deleted_flag(rec, page_is_comp(page)));
	btr_blob_dbg_rbt_insert(index, &b, ctx);
}

/**************************************************************//**
Add to index->blobs any references to off-page columns from a record.
@return number of references added */
UNIV_INTERN
ulint
btr_blob_dbg_add_rec(
/*=================*/
	const rec_t*	rec,	/*!< in: record */
	dict_index_t*	index,	/*!< in/out: index */
	const ulint*	offsets,/*!< in: offsets */
	const char*	ctx)	/*!< in: context (for logging) */
{
	ulint		count	= 0;
	ulint		i;
	btr_blob_dbg_t	b;
	ibool		del;

	ut_ad(rec_offs_validate(rec, index, offsets));

	if (!rec_offs_any_extern(offsets)) {
		return(0);
	}

	b.ref_page_no = page_get_page_no(page_align(rec));
	b.ref_heap_no = page_rec_get_heap_no(rec);
	del = (rec_get_deleted_flag(rec, rec_offs_comp(offsets)) != 0);

	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
		if (rec_offs_nth_extern(offsets, i)) {
			ulint		len;
			const byte*	field_ref = rec_get_nth_field(
				rec, offsets, i, &len);

			ut_a(len != UNIV_SQL_NULL);
			ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
			field_ref += len - BTR_EXTERN_FIELD_REF_SIZE;

			if (!memcmp(field_ref, field_ref_zero,
				    BTR_EXTERN_FIELD_REF_SIZE)) {
				/* the column has not been stored yet */
				continue;
			}

			b.ref_field_no = i;
			b.blob_page_no = mach_read_from_4(
				field_ref + BTR_EXTERN_PAGE_NO);
			ut_a(b.ref_field_no >= index->n_uniq);
			b.always_owner = b.owner
				= !(field_ref[BTR_EXTERN_LEN]
				    & BTR_EXTERN_OWNER_FLAG);
			b.del = del;

			btr_blob_dbg_rbt_insert(index, &b, ctx);
			count++;
		}
	}

	return(count);
}

/**************************************************************//**
Display the references to off-page columns.
This function is to be called from a debugger,
for example when a breakpoint on ut_dbg_assertion_failed is hit. */
UNIV_INTERN
void
btr_blob_dbg_print(
/*===============*/
	const dict_index_t*	index)	/*!< in: index tree */
{
	const ib_rbt_node_t*	node;

	if (!index->blobs) {
		return;
	}

	/* We intentionally do not acquire index->blobs_mutex here.
	This function is to be called from a debugger, and the caller
	should make sure that the index->blobs_mutex is held. */

	for (node = rbt_first(index->blobs);
	     node != NULL; node = rbt_next(index->blobs, node)) {
		const btr_blob_dbg_t*	b
			= rbt_value(btr_blob_dbg_t, node);
		fprintf(stderr, "%u:%u:%u->%u%s%s%s\n",
			b->ref_page_no, b->ref_heap_no, b->ref_field_no,
			b->blob_page_no,
			b->owner ? "" : "(disowned)",
			b->always_owner ? "" : "(has disowned)",
			b->del ? "(deleted)" : "");
	}
}

/**************************************************************//**
Remove from index->blobs any references to off-page columns from a record.
@return number of references removed */
UNIV_INTERN
ulint
btr_blob_dbg_remove_rec(
/*====================*/
	const rec_t*	rec,	/*!< in: record */
	dict_index_t*	index,	/*!< in/out: index */
	const ulint*	offsets,/*!< in: offsets */
	const char*	ctx)	/*!< in: context (for logging) */
{
	ulint		i;
	ulint		count	= 0;
	btr_blob_dbg_t	b;

	ut_ad(rec_offs_validate(rec, index, offsets));

	if (!rec_offs_any_extern(offsets)) {
		return(0);
	}

	b.ref_page_no = page_get_page_no(page_align(rec));
	b.ref_heap_no = page_rec_get_heap_no(rec);

	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
		if (rec_offs_nth_extern(offsets, i)) {
			ulint		len;
			const byte*	field_ref = rec_get_nth_field(
				rec, offsets, i, &len);

			ut_a(len != UNIV_SQL_NULL);
			ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
			field_ref += len - BTR_EXTERN_FIELD_REF_SIZE;

			b.ref_field_no = i;
			b.blob_page_no = mach_read_from_4(
				field_ref + BTR_EXTERN_PAGE_NO);

			switch (b.blob_page_no) {
			case 0:
				/* The column has not been stored yet.
				The BLOB pointer must be all zero.
				There cannot be a BLOB starting at
				page 0, because page 0 is reserved for
				the tablespace header. */
				ut_a(!memcmp(field_ref, field_ref_zero,
					     BTR_EXTERN_FIELD_REF_SIZE));
				/* fall through */
			case FIL_NULL:
				/* the column has been freed already */
				continue;
			}

			btr_blob_dbg_rbt_delete(index, &b, ctx);
			count++;
		}
	}

	return(count);
}

/**************************************************************//**
Check that there are no references to off-page columns from or to
the given page. Invoked when freeing or clearing a page.
@return TRUE when no orphan references exist */
UNIV_INTERN
ibool
btr_blob_dbg_is_empty(
/*==================*/
	dict_index_t*	index,		/*!< in: index */
	ulint		page_no)	/*!< in: page number */
{
	const ib_rbt_node_t*	node;
	ibool			success	= TRUE;

	if (!index->blobs) {
		return(success);
	}

	mutex_enter(&index->blobs_mutex);

	for (node = rbt_first(index->blobs);
	     node != NULL; node = rbt_next(index->blobs, node)) {
		const btr_blob_dbg_t*	b
			= rbt_value(btr_blob_dbg_t, node);

		if (b->ref_page_no != page_no && b->blob_page_no != page_no) {
			continue;
		}

		fprintf(stderr,
			"InnoDB: orphan BLOB ref%s%s%s %u:%u:%u->%u\n",
			b->owner ? "" : "(disowned)",
			b->always_owner ? "" : "(has disowned)",
			b->del ? "(deleted)" : "",
			b->ref_page_no, b->ref_heap_no, b->ref_field_no,
			b->blob_page_no);

		if (b->blob_page_no != page_no || b->owner || !b->del) {
			success = FALSE;
		}
	}

	mutex_exit(&index->blobs_mutex);
	return(success);
}

/**************************************************************//**
Count and process all references to off-page columns on a page.
@return number of references processed */
UNIV_INTERN
ulint
btr_blob_dbg_op(
/*============*/
	const page_t*		page,	/*!< in: B-tree leaf page */
	const rec_t*		rec,	/*!< in: record to start from
					(NULL to process the whole page) */
	dict_index_t*		index,	/*!< in/out: index */
	const char*		ctx,	/*!< in: context (for logging) */
	const btr_blob_dbg_op_f	op)	/*!< in: operation on records */
{
	ulint		count	= 0;
	mem_heap_t*	heap	= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets	= offsets_;
	rec_offs_init(offsets_);

	ut_a(fil_page_get_type(page) == FIL_PAGE_INDEX);
	ut_a(!rec || page_align(rec) == page);

	if (!index->blobs || !page_is_leaf(page)
	    || !dict_index_is_clust(index)) {
		return(0);
	}

	if (rec == NULL) {
		rec = page_get_infimum_rec(page);
	}

	do {
		offsets = rec_get_offsets(rec, index, offsets,
					  ULINT_UNDEFINED, &heap);
		count += op(rec, index, offsets, ctx);
		rec = page_rec_get_next_const(rec);
	} while (!page_rec_is_supremum(rec));

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	return(count);
}

/**************************************************************//**
Count and add to index->blobs any references to off-page columns
from records on a page.
@return number of references added */
UNIV_INTERN
ulint
btr_blob_dbg_add(
/*=============*/
	const page_t*	page,	/*!< in: rewritten page */
	dict_index_t*	index,	/*!< in/out: index */
	const char*	ctx)	/*!< in: context (for logging) */
{
	btr_blob_dbg_assert_empty(index, page_get_page_no(page));

	return(btr_blob_dbg_op(page, NULL, index, ctx, btr_blob_dbg_add_rec));
}

/**************************************************************//**
Count and remove from index->blobs any references to off-page columns
from records on a page.
Used when reorganizing a page, before copying the records.
@return number of references removed */
UNIV_INTERN
ulint
btr_blob_dbg_remove(
/*================*/
	const page_t*	page,	/*!< in: b-tree page */
	dict_index_t*	index,	/*!< in/out: index */
	const char*	ctx)	/*!< in: context (for logging) */
{
	ulint	count;

	count = btr_blob_dbg_op(page, NULL, index, ctx,
				btr_blob_dbg_remove_rec);

	/* Check that no references exist. */
	btr_blob_dbg_assert_empty(index, page_get_page_no(page));

	return(count);
}

/**************************************************************//**
Restore in index->blobs any references to off-page columns
Used when page reorganize fails due to compressed page overflow. */
UNIV_INTERN
void
btr_blob_dbg_restore(
/*=================*/
	const page_t*	npage,	/*!< in: page that failed to compress  */
	const page_t*	page,	/*!< in: copy of original page */
	dict_index_t*	index,	/*!< in/out: index */
	const char*	ctx)	/*!< in: context (for logging) */
{
	ulint	removed;
	ulint	added;

	ut_a(page_get_page_no(npage) == page_get_page_no(page));
	ut_a(page_get_space_id(npage) == page_get_space_id(page));

	removed = btr_blob_dbg_remove(npage, index, ctx);
	added = btr_blob_dbg_add(page, index, ctx);
	ut_a(added == removed);
}

/**************************************************************//**
Modify the 'deleted' flag of a record. */
UNIV_INTERN
void
btr_blob_dbg_set_deleted_flag(
/*==========================*/
	const rec_t*		rec,	/*!< in: record */
	dict_index_t*		index,	/*!< in/out: index */
	const ulint*		offsets,/*!< in: rec_get_offs(rec, index) */
	ibool			del)	/*!< in: TRUE=deleted, FALSE=exists */
{
	const ib_rbt_node_t*	node;
	btr_blob_dbg_t		b;
	btr_blob_dbg_t*		c;
	ulint			i;

	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_a(dict_index_is_clust(index));
	ut_a(del == !!del);/* must be FALSE==0 or TRUE==1 */

	if (!rec_offs_any_extern(offsets) || !index->blobs) {

		return;
	}

	b.ref_page_no = page_get_page_no(page_align(rec));
	b.ref_heap_no = page_rec_get_heap_no(rec);

	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
		if (rec_offs_nth_extern(offsets, i)) {
			ulint		len;
			const byte*	field_ref = rec_get_nth_field(
				rec, offsets, i, &len);

			ut_a(len != UNIV_SQL_NULL);
			ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
			field_ref += len - BTR_EXTERN_FIELD_REF_SIZE;

			b.ref_field_no = i;
			b.blob_page_no = mach_read_from_4(
				field_ref + BTR_EXTERN_PAGE_NO);

			switch (b.blob_page_no) {
			case 0:
				ut_a(memcmp(field_ref, field_ref_zero,
					    BTR_EXTERN_FIELD_REF_SIZE));
				/* page number 0 is for the
				page allocation bitmap */
			case FIL_NULL:
				/* the column has been freed already */
				ut_error;
			}

			mutex_enter(&index->blobs_mutex);
			node = rbt_lookup(index->blobs, &b);
			ut_a(node);

			c = rbt_value(btr_blob_dbg_t, node);
			/* The flag should be modified. */
			c->del = del;
			if (btr_blob_dbg_msg) {
				b = *c;
				mutex_exit(&index->blobs_mutex);
				btr_blob_dbg_msg_issue("del_mk", &b, "");
			} else {
				mutex_exit(&index->blobs_mutex);
			}
		}
	}
}

/**************************************************************//**
Change the ownership of an off-page column. */
UNIV_INTERN
void
btr_blob_dbg_owner(
/*===============*/
	const rec_t*		rec,	/*!< in: record */
	dict_index_t*		index,	/*!< in/out: index */
	const ulint*		offsets,/*!< in: rec_get_offs(rec, index) */
	ulint			i,	/*!< in: ith field in rec */
	ibool			own)	/*!< in: TRUE=owned, FALSE=disowned */
{
	const ib_rbt_node_t*	node;
	btr_blob_dbg_t		b;
	const byte*		field_ref;
	ulint			len;

	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_a(rec_offs_nth_extern(offsets, i));

	field_ref = rec_get_nth_field(rec, offsets, i, &len);
	ut_a(len != UNIV_SQL_NULL);
	ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
	field_ref += len - BTR_EXTERN_FIELD_REF_SIZE;

	b.ref_page_no = page_get_page_no(page_align(rec));
	b.ref_heap_no = page_rec_get_heap_no(rec);
	b.ref_field_no = i;
	b.owner = !(field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_OWNER_FLAG);
	b.blob_page_no = mach_read_from_4(field_ref + BTR_EXTERN_PAGE_NO);

	ut_a(b.owner == own);

	mutex_enter(&index->blobs_mutex);
	node = rbt_lookup(index->blobs, &b);
	/* row_ins_clust_index_entry_by_modify() invokes
	btr_cur_unmark_extern_fields() also for the newly inserted
	references, which are all zero bytes until the columns are stored.
	The node lookup must fail if and only if that is the case. */
	ut_a(!memcmp(field_ref, field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE)
	     == !node);

	if (node) {
		btr_blob_dbg_t*	c = rbt_value(btr_blob_dbg_t, node);
		/* Some code sets ownership from TRUE to TRUE.
		We do not allow changing ownership from FALSE to FALSE. */
		ut_a(own || c->owner);

		c->owner = own;
		if (!own) {
			c->always_owner = FALSE;
		}
	}

	mutex_exit(&index->blobs_mutex);
}
#endif /* UNIV_BLOB_DEBUG */

osku's avatar
osku committed
623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656
/*
Latching strategy of the InnoDB B-tree
--------------------------------------
A tree latch protects all non-leaf nodes of the tree. Each node of a tree
also has a latch of its own.

A B-tree operation normally first acquires an S-latch on the tree. It
searches down the tree and releases the tree latch when it has the
leaf node latch. To save CPU time we do not acquire any latch on
non-leaf nodes of the tree during a search, those pages are only bufferfixed.

If an operation needs to restructure the tree, it acquires an X-latch on
the tree before searching to a leaf node. If it needs, for example, to
split a leaf,
(1) InnoDB decides the split point in the leaf,
(2) allocates a new page,
(3) inserts the appropriate node pointer to the first non-leaf level,
(4) releases the tree X-latch,
(5) and then moves records from the leaf to the new allocated page.

Node pointers
-------------
Leaf pages of a B-tree contain the index records stored in the
tree. On levels n > 0 we store 'node pointers' to pages on level
n - 1. For each page there is exactly one node pointer stored:
thus the our tree is an ordinary B-tree, not a B-link tree.

A node pointer contains a prefix P of an index record. The prefix
is long enough so that it determines an index record uniquely.
The file page number of the child page is added as the last
field. To the child page we can store node pointers or index records
which are >= P in the alphabetical order, but < P1 if there is
a next node pointer on the level, and P1 is its prefix.

657
If a node pointer with a prefix P points to a non-leaf child,
osku's avatar
osku committed
658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677
then the leftmost record in the child must have the same
prefix P. If it points to a leaf node, the child is not required
to contain any record with a prefix equal to P. The leaf case
is decided this way to allow arbitrary deletions in a leaf node
without touching upper levels of the tree.

We have predefined a special minimum record which we
define as the smallest record in any alphabetical order.
A minimum record is denoted by setting a bit in the record
header. A minimum record acts as the prefix of a node pointer
which points to a leftmost node on any level of the tree.

File page allocation
--------------------
In the root node of a B-tree there are two file segment headers.
The leaf pages of a tree are allocated from one file segment, to
make them consecutive on disk if possible. From the other file segment
we allocate pages for the non-leaf levels of the tree.
*/

678
#ifdef UNIV_BTR_DEBUG
679
/**************************************************************//**
680 681
Checks a file segment header within a B-tree root page.
@return	TRUE if valid */
682 683 684 685
static
ibool
btr_root_fseg_validate(
/*===================*/
686 687
	const fseg_header_t*	seg_header,	/*!< in: segment header */
	ulint			space)		/*!< in: tablespace identifier */
688 689 690 691 692 693 694 695 696 697
{
	ulint	offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);

	ut_a(mach_read_from_4(seg_header + FSEG_HDR_SPACE) == space);
	ut_a(offset >= FIL_PAGE_DATA);
	ut_a(offset <= UNIV_PAGE_SIZE - FIL_PAGE_DATA_END);
	return(TRUE);
}
#endif /* UNIV_BTR_DEBUG */

698
/**************************************************************//**
699 700
Gets the root node of a tree and x-latches it.
@return	root page, x-latched */
osku's avatar
osku committed
701
static
702 703 704
buf_block_t*
btr_root_block_get(
/*===============*/
705 706
	dict_index_t*	index,	/*!< in: index tree */
	mtr_t*		mtr)	/*!< in: mtr */
707 708
{
	ulint		space;
709
	ulint		zip_size;
710 711 712 713
	ulint		root_page_no;
	buf_block_t*	block;

	space = dict_index_get_space(index);
714
	zip_size = dict_table_zip_size(index->table);
715 716
	root_page_no = dict_index_get_page(index);

717 718
	block = btr_block_get(space, zip_size, root_page_no, RW_X_LATCH,
			      index, mtr);
719
	btr_assert_not_corrupted(block, index);
720
#ifdef UNIV_BTR_DEBUG
721 722 723 724 725 726 727 728
	if (!dict_index_is_ibuf(index)) {
		const page_t*	root = buf_block_get_frame(block);

		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
					    + root, space));
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
					    + root, space));
	}
729
#endif /* UNIV_BTR_DEBUG */
730 731 732

	return(block);
}
osku's avatar
osku committed
733

734
/**************************************************************//**
735 736
Gets the root node of a tree and x-latches it.
@return	root page, x-latched */
737
UNIV_INTERN
osku's avatar
osku committed
738 739 740
page_t*
btr_root_get(
/*=========*/
741 742
	dict_index_t*	index,	/*!< in: index tree */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
743
{
744
	return(buf_block_get_frame(btr_root_block_get(index, mtr)));
osku's avatar
osku committed
745 746
}

747
/*************************************************************//**
osku's avatar
osku committed
748
Gets pointer to the previous user record in the tree. It is assumed that
749 750
the caller has appropriate latches on the page and its neighbor.
@return	previous user record, NULL if there is none */
751
UNIV_INTERN
osku's avatar
osku committed
752 753 754
rec_t*
btr_get_prev_user_rec(
/*==================*/
755 756
	rec_t*	rec,	/*!< in: record on leaf level */
	mtr_t*	mtr)	/*!< in: mtr holding a latch on the page, and if
osku's avatar
osku committed
757 758 759 760 761 762 763 764 765 766 767 768 769 770 771
			needed, also to the previous page */
{
	page_t*	page;
	page_t*	prev_page;
	ulint	prev_page_no;

	if (!page_rec_is_infimum(rec)) {

		rec_t*	prev_rec = page_rec_get_prev(rec);

		if (!page_rec_is_infimum(prev_rec)) {

			return(prev_rec);
		}
	}
772

773
	page = page_align(rec);
osku's avatar
osku committed
774
	prev_page_no = btr_page_get_prev(page, mtr);
775

osku's avatar
osku committed
776 777
	if (prev_page_no != FIL_NULL) {

778 779
		ulint		space;
		ulint		zip_size;
780 781
		buf_block_t*	prev_block;

782 783 784 785 786
		space = page_get_space_id(page);
		zip_size = fil_space_get_zip_size(space);

		prev_block = buf_page_get_with_no_latch(space, zip_size,
							prev_page_no, mtr);
787
		prev_page = buf_block_get_frame(prev_block);
osku's avatar
osku committed
788
		/* The caller must already have a latch to the brother */
789 790 791 792
		ut_ad(mtr_memo_contains(mtr, prev_block,
					MTR_MEMO_PAGE_S_FIX)
		      || mtr_memo_contains(mtr, prev_block,
					   MTR_MEMO_PAGE_X_FIX));
793
#ifdef UNIV_BTR_DEBUG
794
		ut_a(page_is_comp(prev_page) == page_is_comp(page));
795
		ut_a(btr_page_get_next(prev_page, mtr)
796
		     == page_get_page_no(page));
797
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
798 799 800 801 802 803 804

		return(page_rec_get_prev(page_get_supremum_rec(prev_page)));
	}

	return(NULL);
}

805
/*************************************************************//**
osku's avatar
osku committed
806
Gets pointer to the next user record in the tree. It is assumed that the
807 808
caller has appropriate latches on the page and its neighbor.
@return	next user record, NULL if there is none */
809
UNIV_INTERN
osku's avatar
osku committed
810 811 812
rec_t*
btr_get_next_user_rec(
/*==================*/
813 814
	rec_t*	rec,	/*!< in: record on leaf level */
	mtr_t*	mtr)	/*!< in: mtr holding a latch on the page, and if
osku's avatar
osku committed
815 816 817 818 819 820 821 822 823 824 825 826 827 828 829
			needed, also to the next page */
{
	page_t*	page;
	page_t*	next_page;
	ulint	next_page_no;

	if (!page_rec_is_supremum(rec)) {

		rec_t*	next_rec = page_rec_get_next(rec);

		if (!page_rec_is_supremum(next_rec)) {

			return(next_rec);
		}
	}
830

831
	page = page_align(rec);
osku's avatar
osku committed
832
	next_page_no = btr_page_get_next(page, mtr);
833

osku's avatar
osku committed
834
	if (next_page_no != FIL_NULL) {
835 836
		ulint		space;
		ulint		zip_size;
837
		buf_block_t*	next_block;
osku's avatar
osku committed
838

839 840 841 842 843
		space = page_get_space_id(page);
		zip_size = fil_space_get_zip_size(space);

		next_block = buf_page_get_with_no_latch(space, zip_size,
							next_page_no, mtr);
844
		next_page = buf_block_get_frame(next_block);
osku's avatar
osku committed
845
		/* The caller must already have a latch to the brother */
846 847 848
		ut_ad(mtr_memo_contains(mtr, next_block, MTR_MEMO_PAGE_S_FIX)
		      || mtr_memo_contains(mtr, next_block,
					   MTR_MEMO_PAGE_X_FIX));
849
#ifdef UNIV_BTR_DEBUG
850
		ut_a(page_is_comp(next_page) == page_is_comp(page));
851
		ut_a(btr_page_get_prev(next_page, mtr)
852
		     == page_get_page_no(page));
853
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
854 855 856 857 858 859 860

		return(page_rec_get_next(page_get_infimum_rec(next_page)));
	}

	return(NULL);
}

861
/**************************************************************//**
862
Creates a new index page (not the root, and also not
863
used in page reorganization).  @see btr_page_empty(). */
osku's avatar
osku committed
864 865 866 867
static
void
btr_page_create(
/*============*/
868 869 870 871 872
	buf_block_t*	block,	/*!< in/out: page to be created */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
	dict_index_t*	index,	/*!< in: index */
	ulint		level,	/*!< in: the B-tree level of the page */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
873
{
874
	page_t*		page = buf_block_get_frame(block);
875 876

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
877
	btr_blob_dbg_assert_empty(index, buf_block_get_page_no(block));
878 879

	if (UNIV_LIKELY_NULL(page_zip)) {
880
		page_create_zip(block, index, level, mtr);
881
	} else {
882
		page_create(block, mtr, dict_table_is_comp(index->table));
883 884 885 886
		/* Set the level of the new index page */
		btr_page_set_level(page, NULL, level, mtr);
	}

887
	block->check_index_page_at_flush = TRUE;
888

889
	btr_page_set_index_id(page, page_zip, index->id, mtr);
osku's avatar
osku committed
890 891
}

892
/**************************************************************//**
osku's avatar
osku committed
893
Allocates a new file page to be used in an ibuf tree. Takes the page from
894 895
the free list of the tree, which must contain pages!
@return	new allocated block, x-latched */
osku's avatar
osku committed
896
static
897
buf_block_t*
osku's avatar
osku committed
898 899
btr_page_alloc_for_ibuf(
/*====================*/
900 901
	dict_index_t*	index,	/*!< in: index tree */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
902 903 904 905
{
	fil_addr_t	node_addr;
	page_t*		root;
	page_t*		new_page;
906
	buf_block_t*	new_block;
osku's avatar
osku committed
907

908
	root = btr_root_get(index, mtr);
909

osku's avatar
osku committed
910
	node_addr = flst_get_first(root + PAGE_HEADER
911
				   + PAGE_BTR_IBUF_FREE_LIST, mtr);
osku's avatar
osku committed
912 913
	ut_a(node_addr.page != FIL_NULL);

914 915 916
	new_block = buf_page_get(dict_index_get_space(index),
				 dict_table_zip_size(index->table),
				 node_addr.page, RW_X_LATCH, mtr);
917
	new_page = buf_block_get_frame(new_block);
918
	buf_block_dbg_add_level(new_block, SYNC_IBUF_TREE_NODE_NEW);
osku's avatar
osku committed
919 920

	flst_remove(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
921 922 923 924
		    new_page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE,
		    mtr);
	ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
			    mtr));
osku's avatar
osku committed
925

926
	return(new_block);
osku's avatar
osku committed
927 928
}

929
/**************************************************************//**
osku's avatar
osku committed
930
Allocates a new file page to be used in an index tree. NOTE: we assume
931
that the caller has made the reservation for free extents!
932 933 934 935 936
@retval NULL if no page could be allocated
@retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
(init_mtr == mtr, or the page was not previously freed in mtr)
@retval block (not allocated or initialized) otherwise */
static __attribute__((nonnull, warn_unused_result))
937
buf_block_t*
938 939
btr_page_alloc_low(
/*===============*/
940 941 942
	dict_index_t*	index,		/*!< in: index */
	ulint		hint_page_no,	/*!< in: hint of a good page */
	byte		file_direction,	/*!< in: direction where a possible
osku's avatar
osku committed
943
					page split is made */
944
	ulint		level,		/*!< in: level where the page is placed
osku's avatar
osku committed
945
					in the tree */
946 947 948 949 950 951 952 953
	mtr_t*		mtr,		/*!< in/out: mini-transaction
					for the allocation */
	mtr_t*		init_mtr)	/*!< in/out: mtr or another
					mini-transaction in which the
					page should be initialized.
					If init_mtr!=mtr, but the page
					is already X-latched in mtr, do
					not initialize the page. */
osku's avatar
osku committed
954 955 956 957
{
	fseg_header_t*	seg_header;
	page_t*		root;

958
	root = btr_root_get(index, mtr);
959

osku's avatar
osku committed
960 961 962 963 964 965 966 967 968
	if (level == 0) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
	} else {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
	}

	/* Parameter TRUE below states that the caller has made the
	reservation for free extents, and thus we know that a page can
	be allocated: */
969

970 971 972 973
	return(fseg_alloc_free_page_general(
		       seg_header, hint_page_no, file_direction,
		       TRUE, mtr, init_mtr));
}
osku's avatar
osku committed
974

975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002
/**************************************************************//**
Allocates a new file page to be used in an index tree. NOTE: we assume
that the caller has made the reservation for free extents!
@retval NULL if no page could be allocated
@retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
(init_mtr == mtr, or the page was not previously freed in mtr)
@retval block (not allocated or initialized) otherwise */
UNIV_INTERN
buf_block_t*
btr_page_alloc(
/*===========*/
	dict_index_t*	index,		/*!< in: index */
	ulint		hint_page_no,	/*!< in: hint of a good page */
	byte		file_direction,	/*!< in: direction where a possible
					page split is made */
	ulint		level,		/*!< in: level where the page is placed
					in the tree */
	mtr_t*		mtr,		/*!< in/out: mini-transaction
					for the allocation */
	mtr_t*		init_mtr)	/*!< in/out: mini-transaction
					for x-latching and initializing
					the page */
{
	buf_block_t*	new_block;

	if (dict_index_is_ibuf(index)) {

		return(btr_page_alloc_for_ibuf(index, mtr));
osku's avatar
osku committed
1003 1004
	}

1005 1006
	new_block = btr_page_alloc_low(
		index, hint_page_no, file_direction, level, mtr, init_mtr);
1007

1008 1009 1010
	if (new_block) {
		buf_block_dbg_add_level(new_block, SYNC_TREE_NODE_NEW);
	}
1011

1012
	return(new_block);
1013
}
osku's avatar
osku committed
1014

1015
/**************************************************************//**
1016
Gets the number of pages in a B-tree.
1017
@return	number of pages, or ULINT_UNDEFINED if the index is unavailable */
1018
UNIV_INTERN
osku's avatar
osku committed
1019 1020 1021
ulint
btr_get_size(
/*=========*/
1022
	dict_index_t*	index,	/*!< in: index */
1023 1024 1025
	ulint		flag,	/*!< in: BTR_N_LEAF_PAGES or BTR_TOTAL_SIZE */
	mtr_t*		mtr)	/*!< in/out: mini-transaction where index
				is s-latched */
osku's avatar
osku committed
1026 1027 1028 1029 1030 1031
{
	fseg_header_t*	seg_header;
	page_t*		root;
	ulint		n;
	ulint		dummy;

1032 1033
	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
				MTR_MEMO_S_LOCK));
osku's avatar
osku committed
1034

1035 1036 1037 1038 1039
	if (index->page == FIL_NULL
	    || index->to_be_dropped
	    || *index->name == TEMP_INDEX_PREFIX) {
		return(ULINT_UNDEFINED);
	}
osku's avatar
osku committed
1040

1041
	root = btr_root_get(index, mtr);
1042

osku's avatar
osku committed
1043 1044
	if (flag == BTR_N_LEAF_PAGES) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
1045

1046
		fseg_n_reserved_pages(seg_header, &n, mtr);
1047

osku's avatar
osku committed
1048 1049 1050
	} else if (flag == BTR_TOTAL_SIZE) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;

1051
		n = fseg_n_reserved_pages(seg_header, &dummy, mtr);
1052

osku's avatar
osku committed
1053
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
1054

1055
		n += fseg_n_reserved_pages(seg_header, &dummy, mtr);
osku's avatar
osku committed
1056 1057 1058 1059 1060
	} else {
		ut_error;
	}

	return(n);
1061
}
osku's avatar
osku committed
1062

1063
/**************************************************************//**
osku's avatar
osku committed
1064 1065 1066 1067 1068 1069
Frees a page used in an ibuf tree. Puts the page to the free list of the
ibuf tree. */
static
void
btr_page_free_for_ibuf(
/*===================*/
1070 1071 1072
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
1073 1074 1075
{
	page_t*		root;

1076
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1077
	root = btr_root_get(index, mtr);
1078

osku's avatar
osku committed
1079
	flst_add_first(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
1080 1081
		       buf_block_get_frame(block)
		       + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, mtr);
osku's avatar
osku committed
1082 1083

	ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
1084
			    mtr));
osku's avatar
osku committed
1085 1086
}

1087
/**************************************************************//**
osku's avatar
osku committed
1088 1089 1090
Frees a file page used in an index tree. Can be used also to (BLOB)
external storage pages, because the page level 0 can be given as an
argument. */
1091
UNIV_INTERN
osku's avatar
osku committed
1092 1093 1094
void
btr_page_free_low(
/*==============*/
1095 1096 1097 1098
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
	ulint		level,	/*!< in: page level */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
1099 1100 1101 1102
{
	fseg_header_t*	seg_header;
	page_t*		root;

1103
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
osku's avatar
osku committed
1104 1105 1106
	/* The page gets invalid for optimistic searches: increment the frame
	modify clock */

1107
	buf_block_modify_clock_inc(block);
1108
	btr_blob_dbg_assert_empty(index, buf_block_get_page_no(block));
1109

1110
	if (dict_index_is_ibuf(index)) {
osku's avatar
osku committed
1111

1112
		btr_page_free_for_ibuf(index, block, mtr);
osku's avatar
osku committed
1113 1114 1115 1116

		return;
	}

1117
	root = btr_root_get(index, mtr);
1118

osku's avatar
osku committed
1119 1120 1121 1122 1123 1124
	if (level == 0) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
	} else {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
	}

1125 1126 1127
	fseg_free_page(seg_header,
		       buf_block_get_space(block),
		       buf_block_get_page_no(block), mtr);
1128
}
osku's avatar
osku committed
1129

1130
/**************************************************************//**
osku's avatar
osku committed
1131 1132
Frees a file page used in an index tree. NOTE: cannot free field external
storage pages because the page must contain info on its level. */
1133
UNIV_INTERN
osku's avatar
osku committed
1134 1135 1136
void
btr_page_free(
/*==========*/
1137 1138 1139
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
1140
{
1141 1142
	const page_t*	page	= buf_block_get_frame(block);
	ulint		level	= btr_page_get_level(page, mtr);
1143

1144
	ut_ad(fil_page_get_type(block->frame) == FIL_PAGE_INDEX);
1145
	btr_page_free_low(index, block, level, mtr);
1146
}
osku's avatar
osku committed
1147

1148
/**************************************************************//**
osku's avatar
osku committed
1149 1150 1151 1152 1153
Sets the child node file address in a node pointer. */
UNIV_INLINE
void
btr_node_ptr_set_child_page_no(
/*===========================*/
1154 1155
	rec_t*		rec,	/*!< in: node pointer record */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
1156
				part will be updated, or NULL */
1157 1158 1159
	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
	ulint		page_no,/*!< in: child node address */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
1160 1161 1162 1163 1164
{
	byte*	field;
	ulint	len;

	ut_ad(rec_offs_validate(rec, NULL, offsets));
1165
	ut_ad(!page_is_leaf(page_align(rec)));
osku's avatar
osku committed
1166 1167
	ut_ad(!rec_offs_comp(offsets) || rec_get_node_ptr_flag(rec));

1168
	/* The child address is in the last field */
osku's avatar
osku committed
1169
	field = rec_get_nth_field(rec, offsets,
1170
				  rec_offs_n_fields(offsets) - 1, &len);
osku's avatar
osku committed
1171

1172
	ut_ad(len == REC_NODE_PTR_SIZE);
1173

marko's avatar
marko committed
1174
	if (UNIV_LIKELY_NULL(page_zip)) {
1175
		page_zip_write_node_ptr(page_zip, rec,
1176 1177
					rec_offs_data_size(offsets),
					page_no, mtr);
1178 1179
	} else {
		mlog_write_ulint(field, page_no, MLOG_4BYTES, mtr);
marko's avatar
marko committed
1180
	}
osku's avatar
osku committed
1181 1182
}

1183
/************************************************************//**
1184 1185
Returns the child page of a node pointer and x-latches it.
@return	child page, x-latched */
osku's avatar
osku committed
1186
static
1187
buf_block_t*
osku's avatar
osku committed
1188 1189
btr_node_ptr_get_child(
/*===================*/
1190 1191 1192 1193
	const rec_t*	node_ptr,/*!< in: node pointer */
	dict_index_t*	index,	/*!< in: index */
	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
1194 1195 1196 1197
{
	ulint	page_no;
	ulint	space;

1198
	ut_ad(rec_offs_validate(node_ptr, index, offsets));
1199
	space = page_get_space_id(page_align(node_ptr));
osku's avatar
osku committed
1200 1201
	page_no = btr_node_ptr_get_child_page_no(node_ptr, offsets);

1202
	return(btr_block_get(space, dict_table_zip_size(index->table),
1203
			     page_no, RW_X_LATCH, index, mtr));
osku's avatar
osku committed
1204 1205
}

1206
/************************************************************//**
osku's avatar
osku committed
1207
Returns the upper level node pointer to a page. It is assumed that mtr holds
1208 1209
an x-latch on the tree.
@return	rec_get_offsets() of the node pointer record */
osku's avatar
osku committed
1210
static
1211
ulint*
1212 1213
btr_page_get_father_node_ptr_func(
/*==============================*/
1214 1215 1216
	ulint*		offsets,/*!< in: work area for the return value */
	mem_heap_t*	heap,	/*!< in: memory heap to use */
	btr_cur_t*	cursor,	/*!< in: cursor pointing to user record,
1217
				out: cursor on node pointer record,
osku's avatar
osku committed
1218
				its page x-latched */
1219 1220
	const char*	file,	/*!< in: file name */
	ulint		line,	/*!< in: line where called */
1221
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
1222 1223
{
	dtuple_t*	tuple;
1224
	rec_t*		user_rec;
osku's avatar
osku committed
1225
	rec_t*		node_ptr;
1226 1227 1228 1229 1230 1231
	ulint		level;
	ulint		page_no;
	dict_index_t*	index;

	page_no = buf_block_get_page_no(btr_cur_get_block(cursor));
	index = btr_cur_get_index(cursor);
osku's avatar
osku committed
1232

1233
	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
1234
				MTR_MEMO_X_LOCK));
osku's avatar
osku committed
1235

1236
	ut_ad(dict_index_get_page(index) != page_no);
osku's avatar
osku committed
1237

1238
	level = btr_page_get_level(btr_cur_get_page(cursor), mtr);
1239

1240
	user_rec = btr_cur_get_rec(cursor);
1241 1242
	ut_a(page_rec_is_user_rec(user_rec));
	tuple = dict_index_build_node_ptr(index, user_rec, 0, heap, level);
osku's avatar
osku committed
1243

1244
	btr_cur_search_to_nth_level(index, level + 1, tuple, PAGE_CUR_LE,
1245 1246
				    BTR_CONT_MODIFY_TREE, cursor, 0,
				    file, line, mtr);
osku's avatar
osku committed
1247

1248 1249 1250
	node_ptr = btr_cur_get_rec(cursor);
	ut_ad(!page_rec_is_comp(node_ptr)
	      || rec_get_status(node_ptr) == REC_STATUS_NODE_PTR);
osku's avatar
osku committed
1251
	offsets = rec_get_offsets(node_ptr, index, offsets,
1252
				  ULINT_UNDEFINED, &heap);
osku's avatar
osku committed
1253

1254
	if (UNIV_UNLIKELY(btr_node_ptr_get_child_page_no(node_ptr, offsets)
1255
			  != page_no)) {
osku's avatar
osku committed
1256 1257
		rec_t*	print_rec;
		fputs("InnoDB: Dump of the child page:\n", stderr);
1258 1259
		buf_page_print(page_align(user_rec), 0,
			       BUF_PAGE_PRINT_NO_CRASH);
osku's avatar
osku committed
1260
		fputs("InnoDB: Dump of the parent page:\n", stderr);
1261 1262
		buf_page_print(page_align(node_ptr), 0,
			       BUF_PAGE_PRINT_NO_CRASH);
osku's avatar
osku committed
1263 1264

		fputs("InnoDB: Corruption of an index tree: table ", stderr);
1265
		ut_print_name(stderr, NULL, TRUE, index->table_name);
osku's avatar
osku committed
1266
		fputs(", index ", stderr);
1267
		ut_print_name(stderr, NULL, FALSE, index->name);
osku's avatar
osku committed
1268
		fprintf(stderr, ",\n"
1269
			"InnoDB: father ptr page no %lu, child page no %lu\n",
osku's avatar
osku committed
1270 1271
			(ulong)
			btr_node_ptr_get_child_page_no(node_ptr, offsets),
1272 1273 1274
			(ulong) page_no);
		print_rec = page_rec_get_next(
			page_get_infimum_rec(page_align(user_rec)));
osku's avatar
osku committed
1275
		offsets = rec_get_offsets(print_rec, index,
1276
					  offsets, ULINT_UNDEFINED, &heap);
osku's avatar
osku committed
1277 1278
		page_rec_print(print_rec, offsets);
		offsets = rec_get_offsets(node_ptr, index, offsets,
1279
					  ULINT_UNDEFINED, &heap);
osku's avatar
osku committed
1280 1281
		page_rec_print(node_ptr, offsets);

1282 1283 1284 1285
		fputs("InnoDB: You should dump + drop + reimport the table"
		      " to fix the\n"
		      "InnoDB: corruption. If the crash happens at "
		      "the database startup, see\n"
1286
		      "InnoDB: " REFMAN "forcing-innodb-recovery.html about\n"
1287 1288
		      "InnoDB: forcing recovery. "
		      "Then dump + drop + reimport.\n", stderr);
osku's avatar
osku committed
1289

marko's avatar
marko committed
1290 1291
		ut_error;
	}
1292

1293 1294
	return(offsets);
}
osku's avatar
osku committed
1295

1296 1297 1298
#define btr_page_get_father_node_ptr(of,heap,cur,mtr)			\
	btr_page_get_father_node_ptr_func(of,heap,cur,__FILE__,__LINE__,mtr)

1299
/************************************************************//**
1300
Returns the upper level node pointer to a page. It is assumed that mtr holds
1301 1302
an x-latch on the tree.
@return	rec_get_offsets() of the node pointer record */
1303 1304 1305 1306
static
ulint*
btr_page_get_father_block(
/*======================*/
1307 1308 1309 1310 1311 1312
	ulint*		offsets,/*!< in: work area for the return value */
	mem_heap_t*	heap,	/*!< in: memory heap to use */
	dict_index_t*	index,	/*!< in: b-tree index */
	buf_block_t*	block,	/*!< in: child page in the index */
	mtr_t*		mtr,	/*!< in: mtr */
	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
1313 1314 1315 1316 1317 1318 1319
				its page x-latched */
{
	rec_t*	rec
		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
								 block)));
	btr_cur_position(index, rec, block, cursor);
	return(btr_page_get_father_node_ptr(offsets, heap, cursor, mtr));
osku's avatar
osku committed
1320 1321
}

1322
/************************************************************//**
1323 1324
Seeks to the upper level node pointer to a page.
It is assumed that mtr holds an x-latch on the tree. */
osku's avatar
osku committed
1325
static
1326 1327 1328
void
btr_page_get_father(
/*================*/
1329 1330 1331 1332
	dict_index_t*	index,	/*!< in: b-tree index */
	buf_block_t*	block,	/*!< in: child page in the index */
	mtr_t*		mtr,	/*!< in: mtr */
	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
1333
				its page x-latched */
osku's avatar
osku committed
1334
{
1335 1336 1337 1338 1339 1340 1341 1342 1343
	mem_heap_t*	heap;
	rec_t*		rec
		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
								 block)));
	btr_cur_position(index, rec, block, cursor);

	heap = mem_heap_create(100);
	btr_page_get_father_node_ptr(NULL, heap, cursor, mtr);
	mem_heap_free(heap);
osku's avatar
osku committed
1344 1345
}

1346
/************************************************************//**
1347 1348
Creates the root node for a new index tree.
@return	page number of the created root, FIL_NULL if did not succeed */
1349
UNIV_INTERN
osku's avatar
osku committed
1350 1351 1352
ulint
btr_create(
/*=======*/
1353 1354 1355
	ulint		type,	/*!< in: type of the index */
	ulint		space,	/*!< in: space where created */
	ulint		zip_size,/*!< in: compressed page size in bytes
1356
				or 0 for uncompressed pages */
1357
	index_id_t	index_id,/*!< in: index id */
1358 1359
	dict_index_t*	index,	/*!< in: index */
	mtr_t*		mtr)	/*!< in: mini-transaction handle */
osku's avatar
osku committed
1360 1361
{
	ulint		page_no;
1362
	buf_block_t*	block;
osku's avatar
osku committed
1363 1364
	buf_frame_t*	frame;
	page_t*		page;
marko's avatar
marko committed
1365
	page_zip_des_t*	page_zip;
osku's avatar
osku committed
1366 1367 1368 1369 1370 1371 1372 1373

	/* Create the two new segments (one, in the case of an ibuf tree) for
	the index tree; the segment headers are put on the allocated root page
	(for an ibuf tree, not in the root, but on a separate ibuf header
	page) */

	if (type & DICT_IBUF) {
		/* Allocate first the ibuf header page */
1374
		buf_block_t*	ibuf_hdr_block = fseg_create(
1375 1376
			space, 0,
			IBUF_HEADER + IBUF_TREE_SEG_HEADER, mtr);
osku's avatar
osku committed
1377

1378 1379
		buf_block_dbg_add_level(
			ibuf_hdr_block, SYNC_IBUF_TREE_NODE_NEW);
1380

1381
		ut_ad(buf_block_get_page_no(ibuf_hdr_block)
1382
		      == IBUF_HEADER_PAGE_NO);
osku's avatar
osku committed
1383
		/* Allocate then the next page to the segment: it will be the
1384
		tree root page */
osku's avatar
osku committed
1385

1386 1387 1388 1389 1390 1391
		block = fseg_alloc_free_page(
			buf_block_get_frame(ibuf_hdr_block)
			+ IBUF_HEADER + IBUF_TREE_SEG_HEADER,
			IBUF_TREE_ROOT_PAGE_NO,
			FSP_UP, mtr);
		ut_ad(buf_block_get_page_no(block) == IBUF_TREE_ROOT_PAGE_NO);
osku's avatar
osku committed
1392
	} else {
1393 1394 1395 1396 1397 1398 1399 1400
#ifdef UNIV_BLOB_DEBUG
		if ((type & DICT_CLUSTERED) && !index->blobs) {
			mutex_create(PFS_NOT_INSTRUMENTED,
				     &index->blobs_mutex, SYNC_ANY_LATCH);
			index->blobs = rbt_create(sizeof(btr_blob_dbg_t),
						  btr_blob_dbg_cmp);
		}
#endif /* UNIV_BLOB_DEBUG */
1401 1402
		block = fseg_create(space, 0,
				    PAGE_HEADER + PAGE_BTR_SEG_TOP, mtr);
osku's avatar
osku committed
1403
	}
1404

1405
	if (block == NULL) {
osku's avatar
osku committed
1406 1407 1408 1409

		return(FIL_NULL);
	}

1410 1411
	page_no = buf_block_get_page_no(block);
	frame = buf_block_get_frame(block);
1412

osku's avatar
osku committed
1413 1414
	if (type & DICT_IBUF) {
		/* It is an insert buffer tree: initialize the free list */
1415
		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE_NEW);
osku's avatar
osku committed
1416 1417

		ut_ad(page_no == IBUF_TREE_ROOT_PAGE_NO);
1418

osku's avatar
osku committed
1419
		flst_init(frame + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr);
1420
	} else {
osku's avatar
osku committed
1421 1422
		/* It is a non-ibuf tree: create a file segment for leaf
		pages */
1423 1424
		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);

1425 1426 1427 1428 1429 1430 1431 1432 1433
		if (!fseg_create(space, page_no,
				 PAGE_HEADER + PAGE_BTR_SEG_LEAF, mtr)) {
			/* Not enough space for new segment, free root
			segment before return. */
			btr_free_root(space, zip_size, page_no, mtr);

			return(FIL_NULL);
		}

osku's avatar
osku committed
1434 1435
		/* The fseg create acquires a second latch on the page,
		therefore we must declare it: */
1436
		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);
osku's avatar
osku committed
1437
	}
1438

1439
	/* Create a new index page on the allocated segment page */
1440
	page_zip = buf_block_get_page_zip(block);
1441 1442

	if (UNIV_LIKELY_NULL(page_zip)) {
1443
		page = page_create_zip(block, index, 0, mtr);
1444
	} else {
1445
		page = page_create(block, mtr,
1446
				   dict_table_is_comp(index->table));
1447 1448 1449 1450
		/* Set the level of the new index page */
		btr_page_set_level(page, NULL, 0, mtr);
	}

1451
	block->check_index_page_at_flush = TRUE;
osku's avatar
osku committed
1452 1453

	/* Set the index id of the page */
1454
	btr_page_set_index_id(page, page_zip, index_id, mtr);
1455

osku's avatar
osku committed
1456
	/* Set the next node and previous node fields */
1457 1458
	btr_page_set_next(page, page_zip, FIL_NULL, mtr);
	btr_page_set_prev(page, page_zip, FIL_NULL, mtr);
osku's avatar
osku committed
1459 1460 1461 1462

	/* We reset the free bits for the page to allow creation of several
	trees in the same mtr, otherwise the latch on a bitmap page would
	prevent it because of the latching order */
1463

1464 1465 1466
	if (!(type & DICT_CLUSTERED)) {
		ibuf_reset_free_bits(block);
	}
osku's avatar
osku committed
1467 1468 1469 1470 1471 1472 1473 1474 1475 1476

	/* In the following assertion we test that two records of maximum
	allowed size fit on the root page: this fact is needed to ensure
	correctness of split algorithms */

	ut_ad(page_get_max_insert_size(page, 2) > 2 * BTR_PAGE_MAX_REC_SIZE);

	return(page_no);
}

1477
/************************************************************//**
osku's avatar
osku committed
1478 1479
Frees a B-tree except the root page, which MUST be freed after this
by calling btr_free_root. */
1480
UNIV_INTERN
osku's avatar
osku committed
1481 1482 1483
void
btr_free_but_not_root(
/*==================*/
1484 1485
	ulint	space,		/*!< in: space where created */
	ulint	zip_size,	/*!< in: compressed page size in bytes
1486
				or 0 for uncompressed pages */
1487
	ulint	root_page_no)	/*!< in: root page number */
osku's avatar
osku committed
1488 1489 1490 1491 1492
{
	ibool	finished;
	page_t*	root;
	mtr_t	mtr;

1493
leaf_loop:
osku's avatar
osku committed
1494
	mtr_start(&mtr);
1495

1496 1497
	root = btr_page_get(space, zip_size, root_page_no, RW_X_LATCH,
			    NULL, &mtr);
1498 1499 1500 1501 1502 1503
#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
				    + root, space));
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
				    + root, space));
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
1504 1505 1506 1507

	/* NOTE: page hash indexes are dropped when a page is freed inside
	fsp0fsp. */

1508 1509
	finished = fseg_free_step(root + PAGE_HEADER + PAGE_BTR_SEG_LEAF,
				  &mtr);
osku's avatar
osku committed
1510 1511 1512 1513 1514 1515 1516 1517
	mtr_commit(&mtr);

	if (!finished) {

		goto leaf_loop;
	}
top_loop:
	mtr_start(&mtr);
1518

1519 1520
	root = btr_page_get(space, zip_size, root_page_no, RW_X_LATCH,
			    NULL, &mtr);
1521 1522 1523 1524
#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
				    + root, space));
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
1525

1526 1527
	finished = fseg_free_step_not_header(
		root + PAGE_HEADER + PAGE_BTR_SEG_TOP, &mtr);
osku's avatar
osku committed
1528 1529 1530 1531 1532
	mtr_commit(&mtr);

	if (!finished) {

		goto top_loop;
1533
	}
osku's avatar
osku committed
1534 1535
}

1536
/************************************************************//**
osku's avatar
osku committed
1537
Frees the B-tree root page. Other tree MUST already have been freed. */
1538
UNIV_INTERN
osku's avatar
osku committed
1539 1540 1541
void
btr_free_root(
/*==========*/
1542 1543
	ulint	space,		/*!< in: space where created */
	ulint	zip_size,	/*!< in: compressed page size in bytes
1544
				or 0 for uncompressed pages */
1545
	ulint	root_page_no,	/*!< in: root page number */
1546
	mtr_t*	mtr)		/*!< in/out: mini-transaction */
osku's avatar
osku committed
1547
{
1548 1549
	buf_block_t*	block;
	fseg_header_t*	header;
osku's avatar
osku committed
1550

1551 1552
	block = btr_block_get(space, zip_size, root_page_no, RW_X_LATCH,
			      NULL, mtr);
osku's avatar
osku committed
1553

1554
	btr_search_drop_page_hash_index(block);
osku's avatar
osku committed
1555

1556
	header = buf_block_get_frame(block) + PAGE_HEADER + PAGE_BTR_SEG_TOP;
1557 1558 1559
#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(header, space));
#endif /* UNIV_BTR_DEBUG */
1560 1561

	while (!fseg_free_step(header, mtr));
osku's avatar
osku committed
1562
}
1563
#endif /* !UNIV_HOTBACKUP */
osku's avatar
osku committed
1564

1565
/*************************************************************//**
osku's avatar
osku committed
1566 1567
Reorganizes an index page. */
static
1568
ibool
osku's avatar
osku committed
1569 1570
btr_page_reorganize_low(
/*====================*/
1571
	ibool		recovery,/*!< in: TRUE if called in recovery:
osku's avatar
osku committed
1572 1573 1574 1575
				locks should not be updated, i.e.,
				there cannot exist locks on the
				page, and a hash index should not be
				dropped: it cannot exist */
1576 1577 1578
	buf_block_t*	block,	/*!< in: page to be reorganized */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
1579
{
1580
#ifndef UNIV_HOTBACKUP
irana's avatar
irana committed
1581
	buf_pool_t*	buf_pool	= buf_pool_from_bpage(&block->page);
1582
#endif /* !UNIV_HOTBACKUP */
1583 1584
	page_t*		page		= buf_block_get_frame(block);
	page_zip_des_t*	page_zip	= buf_block_get_page_zip(block);
1585
	buf_block_t*	temp_block;
1586 1587 1588 1589 1590 1591
	page_t*		temp_page;
	ulint		log_mode;
	ulint		data_size1;
	ulint		data_size2;
	ulint		max_ins_size1;
	ulint		max_ins_size2;
1592
	ibool		success		= FALSE;
1593 1594

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1595
	btr_assert_not_corrupted(block, index);
1596
#ifdef UNIV_ZIP_DEBUG
1597
	ut_a(!page_zip || page_zip_validate(page_zip, page));
1598
#endif /* UNIV_ZIP_DEBUG */
osku's avatar
osku committed
1599 1600 1601
	data_size1 = page_get_data_size(page);
	max_ins_size1 = page_get_max_insert_size_after_reorganize(page, 1);

1602
#ifndef UNIV_HOTBACKUP
osku's avatar
osku committed
1603 1604
	/* Write the log record */
	mlog_open_and_write_index(mtr, page, index, page_is_comp(page)
1605 1606
				  ? MLOG_COMP_PAGE_REORGANIZE
				  : MLOG_PAGE_REORGANIZE, 0);
1607
#endif /* !UNIV_HOTBACKUP */
osku's avatar
osku committed
1608 1609 1610 1611

	/* Turn logging off */
	log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);

1612
#ifndef UNIV_HOTBACKUP
1613
	temp_block = buf_block_alloc(buf_pool);
1614 1615 1616 1617
#else /* !UNIV_HOTBACKUP */
	ut_ad(block == back_block1);
	temp_block = back_block2;
#endif /* !UNIV_HOTBACKUP */
1618
	temp_page = temp_block->frame;
osku's avatar
osku committed
1619 1620

	/* Copy the old page to temporary space */
1621
	buf_frame_copy(temp_page, page);
osku's avatar
osku committed
1622

1623
#ifndef UNIV_HOTBACKUP
marko's avatar
marko committed
1624
	if (UNIV_LIKELY(!recovery)) {
1625
		btr_search_drop_page_hash_index(block);
osku's avatar
osku committed
1626 1627
	}

1628 1629
	block->check_index_page_at_flush = TRUE;
#endif /* !UNIV_HOTBACKUP */
1630
	btr_blob_dbg_remove(page, index, "btr_page_reorganize");
1631

osku's avatar
osku committed
1632 1633 1634
	/* Recreate the page: note that global data on page (possible
	segment headers, next page-field, etc.) is preserved intact */

1635
	page_create(block, mtr, dict_table_is_comp(index->table));
1636

osku's avatar
osku committed
1637 1638 1639
	/* Copy the records from the temporary space to the recreated page;
	do not copy the lock bits yet */

1640
	page_copy_rec_list_end_no_locks(block, temp_block,
1641 1642
					page_get_infimum_rec(temp_page),
					index, mtr);
1643 1644 1645 1646 1647 1648 1649 1650

	if (dict_index_is_sec_or_ibuf(index) && page_is_leaf(page)) {
		/* Copy max trx id to recreated page */
		trx_id_t	max_trx_id = page_get_max_trx_id(temp_page);
		page_set_max_trx_id(block, NULL, max_trx_id, mtr);
		/* In crash recovery, dict_index_is_sec_or_ibuf() always
		returns TRUE, even for clustered indexes.  max_trx_id is
		unused in clustered index pages. */
1651
		ut_ad(max_trx_id != 0 || recovery);
1652
	}
1653

1654
	if (UNIV_LIKELY_NULL(page_zip)
1655 1656
	    && UNIV_UNLIKELY
	    (!page_zip_compress(page_zip, page, index, NULL))) {
marko's avatar
marko committed
1657

1658
		/* Restore the old page and exit. */
1659 1660
		btr_blob_dbg_restore(page, temp_page, index,
				     "btr_page_reorganize_compress_fail");
1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680

#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
		/* Check that the bytes that we skip are identical. */
		ut_a(!memcmp(page, temp_page, PAGE_HEADER));
		ut_a(!memcmp(PAGE_HEADER + PAGE_N_RECS + page,
			     PAGE_HEADER + PAGE_N_RECS + temp_page,
			     PAGE_DATA - (PAGE_HEADER + PAGE_N_RECS)));
		ut_a(!memcmp(UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + page,
			     UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + temp_page,
			     FIL_PAGE_DATA_END));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

		memcpy(PAGE_HEADER + page, PAGE_HEADER + temp_page,
		       PAGE_N_RECS - PAGE_N_DIR_SLOTS);
		memcpy(PAGE_DATA + page, PAGE_DATA + temp_page,
		       UNIV_PAGE_SIZE - PAGE_DATA - FIL_PAGE_DATA_END);

#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
		ut_a(!memcmp(page, temp_page, UNIV_PAGE_SIZE));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
1681

1682
		goto func_exit;
marko's avatar
marko committed
1683 1684
	}

1685
#ifndef UNIV_HOTBACKUP
marko's avatar
marko committed
1686
	if (UNIV_LIKELY(!recovery)) {
osku's avatar
osku committed
1687
		/* Update the record lock bitmaps */
1688
		lock_move_reorganize_page(block, temp_block);
osku's avatar
osku committed
1689
	}
1690
#endif /* !UNIV_HOTBACKUP */
osku's avatar
osku committed
1691 1692 1693 1694

	data_size2 = page_get_data_size(page);
	max_ins_size2 = page_get_max_insert_size_after_reorganize(page, 1);

marko's avatar
marko committed
1695
	if (UNIV_UNLIKELY(data_size1 != data_size2)
1696
	    || UNIV_UNLIKELY(max_ins_size1 != max_ins_size2)) {
1697 1698
		buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
		buf_page_print(temp_page, 0, BUF_PAGE_PRINT_NO_CRASH);
1699
		fprintf(stderr,
1700 1701 1702 1703 1704 1705
			"InnoDB: Error: page old data size %lu"
			" new data size %lu\n"
			"InnoDB: Error: page old max ins size %lu"
			" new max ins size %lu\n"
			"InnoDB: Submit a detailed bug report"
			" to http://bugs.mysql.com\n",
osku's avatar
osku committed
1706 1707 1708
			(unsigned long) data_size1, (unsigned long) data_size2,
			(unsigned long) max_ins_size1,
			(unsigned long) max_ins_size2);
1709
		ut_ad(0);
1710 1711
	} else {
		success = TRUE;
osku's avatar
osku committed
1712 1713
	}

1714
func_exit:
1715
#ifdef UNIV_ZIP_DEBUG
1716
	ut_a(!page_zip || page_zip_validate(page_zip, page));
1717
#endif /* UNIV_ZIP_DEBUG */
1718
#ifndef UNIV_HOTBACKUP
1719
	buf_block_free(temp_block);
1720
#endif /* !UNIV_HOTBACKUP */
osku's avatar
osku committed
1721 1722 1723

	/* Restore logging mode */
	mtr_set_log_mode(mtr, log_mode);
1724 1725

	return(success);
osku's avatar
osku committed
1726 1727
}

1728
#ifndef UNIV_HOTBACKUP
1729
/*************************************************************//**
1730 1731 1732 1733
Reorganizes an index page.
IMPORTANT: if btr_page_reorganize() is invoked on a compressed leaf
page of a non-clustered index, the caller must update the insert
buffer free bits in the same mini-transaction in such a way that the
1734 1735
modification will be redo-logged.
@return	TRUE on success, FALSE on failure */
1736
UNIV_INTERN
1737
ibool
osku's avatar
osku committed
1738 1739
btr_page_reorganize(
/*================*/
1740 1741 1742
	buf_block_t*	block,	/*!< in: page to be reorganized */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
1743
{
1744
	return(btr_page_reorganize_low(FALSE, block, index, mtr));
osku's avatar
osku committed
1745
}
1746
#endif /* !UNIV_HOTBACKUP */
osku's avatar
osku committed
1747

1748
/***********************************************************//**
1749 1750
Parses a redo log record of reorganizing a page.
@return	end of log record or NULL */
1751
UNIV_INTERN
osku's avatar
osku committed
1752 1753 1754
byte*
btr_parse_page_reorganize(
/*======================*/
1755
	byte*		ptr,	/*!< in: buffer */
osku's avatar
osku committed
1756
	byte*		end_ptr __attribute__((unused)),
1757 1758 1759 1760
				/*!< in: buffer end */
	dict_index_t*	index,	/*!< in: record descriptor */
	buf_block_t*	block,	/*!< in: page to be reorganized, or NULL */
	mtr_t*		mtr)	/*!< in: mtr or NULL */
osku's avatar
osku committed
1761 1762 1763 1764 1765
{
	ut_ad(ptr && end_ptr);

	/* The record is empty, except for the record initial part */

1766 1767
	if (UNIV_LIKELY(block != NULL)) {
		btr_page_reorganize_low(TRUE, block, index, mtr);
osku's avatar
osku committed
1768 1769 1770 1771 1772
	}

	return(ptr);
}

1773
#ifndef UNIV_HOTBACKUP
1774
/*************************************************************//**
1775
Empties an index page.  @see btr_page_create(). */
osku's avatar
osku committed
1776 1777 1778 1779
static
void
btr_page_empty(
/*===========*/
1780 1781 1782 1783 1784
	buf_block_t*	block,	/*!< in: page to be emptied */
	page_zip_des_t*	page_zip,/*!< out: compressed page, or NULL */
	dict_index_t*	index,	/*!< in: index of the page */
	ulint		level,	/*!< in: the B-tree level of the page */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
1785
{
1786
	page_t*	page = buf_block_get_frame(block);
1787 1788

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1789
	ut_ad(page_zip == buf_block_get_page_zip(block));
1790
#ifdef UNIV_ZIP_DEBUG
1791
	ut_a(!page_zip || page_zip_validate(page_zip, page));
1792
#endif /* UNIV_ZIP_DEBUG */
marko's avatar
marko committed
1793

1794
	btr_search_drop_page_hash_index(block);
1795
	btr_blob_dbg_remove(page, index, "btr_page_empty");
osku's avatar
osku committed
1796 1797 1798 1799

	/* Recreate the page: note that global data on page (possible
	segment headers, next page-field, etc.) is preserved intact */

1800
	if (UNIV_LIKELY_NULL(page_zip)) {
1801
		page_create_zip(block, index, level, mtr);
1802
	} else {
1803
		page_create(block, mtr, dict_table_is_comp(index->table));
1804
		btr_page_set_level(page, NULL, level, mtr);
1805 1806
	}

1807
	block->check_index_page_at_flush = TRUE;
osku's avatar
osku committed
1808 1809
}

1810
/*************************************************************//**
osku's avatar
osku committed
1811 1812 1813 1814
Makes tree one level higher by splitting the root, and inserts
the tuple. It is assumed that mtr contains an x-latch on the tree.
NOTE that the operation of this function must always succeed,
we cannot reverse it: therefore enough free disk space must be
1815 1816
guaranteed to be available before this function is called.
@return	inserted record */
1817
UNIV_INTERN
osku's avatar
osku committed
1818 1819 1820
rec_t*
btr_root_raise_and_insert(
/*======================*/
1821
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert: must be
osku's avatar
osku committed
1822 1823 1824
				on the root page; when the function returns,
				the cursor is positioned on the predecessor
				of the inserted record */
1825 1826 1827
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
1828
{
1829
	dict_index_t*	index;
osku's avatar
osku committed
1830 1831 1832 1833 1834 1835
	page_t*		root;
	page_t*		new_page;
	ulint		new_page_no;
	rec_t*		rec;
	mem_heap_t*	heap;
	dtuple_t*	node_ptr;
1836
	ulint		level;
osku's avatar
osku committed
1837 1838
	rec_t*		node_ptr_rec;
	page_cur_t*	page_cursor;
1839 1840
	page_zip_des_t*	root_page_zip;
	page_zip_des_t*	new_page_zip;
1841 1842
	buf_block_t*	root_block;
	buf_block_t*	new_block;
1843

osku's avatar
osku committed
1844
	root = btr_cur_get_page(cursor);
1845
	root_block = btr_cur_get_block(cursor);
1846
	root_page_zip = buf_block_get_page_zip(root_block);
1847
#ifdef UNIV_ZIP_DEBUG
1848
	ut_a(!root_page_zip || page_zip_validate(root_page_zip, root));
1849
#endif /* UNIV_ZIP_DEBUG */
1850
	index = btr_cur_get_index(cursor);
1851
#ifdef UNIV_BTR_DEBUG
1852 1853 1854 1855 1856 1857 1858 1859 1860
	if (!dict_index_is_ibuf(index)) {
		ulint	space = dict_index_get_space(index);

		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
					    + root, space));
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
					    + root, space));
	}

1861 1862
	ut_a(dict_index_get_page(index) == page_get_page_no(root));
#endif /* UNIV_BTR_DEBUG */
1863
	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
1864
				MTR_MEMO_X_LOCK));
1865
	ut_ad(mtr_memo_contains(mtr, root_block, MTR_MEMO_PAGE_X_FIX));
osku's avatar
osku committed
1866 1867 1868 1869

	/* Allocate a new page to the tree. Root splitting is done by first
	moving the root records to the new page, emptying the root, putting
	a node pointer to the new page, and then splitting the new page. */
1870

marko's avatar
marko committed
1871
	level = btr_page_get_level(root, mtr);
osku's avatar
osku committed
1872

1873
	new_block = btr_page_alloc(index, 0, FSP_NO_DIR, level, mtr, mtr);
1874 1875
	new_page = buf_block_get_frame(new_block);
	new_page_zip = buf_block_get_page_zip(new_block);
1876
	ut_a(!new_page_zip == !root_page_zip);
1877 1878 1879
	ut_a(!new_page_zip
	     || page_zip_get_size(new_page_zip)
	     == page_zip_get_size(root_page_zip));
osku's avatar
osku committed
1880

1881
	btr_page_create(new_block, new_page_zip, index, level, mtr);
1882 1883 1884 1885

	/* Set the next node and previous node fields of new page */
	btr_page_set_next(new_page, new_page_zip, FIL_NULL, mtr);
	btr_page_set_prev(new_page, new_page_zip, FIL_NULL, mtr);
osku's avatar
osku committed
1886

1887
	/* Copy the records from root to the new page one by one. */
osku's avatar
osku committed
1888

1889 1890 1891 1892 1893
	if (0
#ifdef UNIV_ZIP_COPY
	    || new_page_zip
#endif /* UNIV_ZIP_COPY */
	    || UNIV_UNLIKELY
1894
	    (!page_copy_rec_list_end(new_block, root_block,
1895
				     page_get_infimum_rec(root),
1896
				     index, mtr))) {
1897
		ut_a(new_page_zip);
marko's avatar
marko committed
1898

1899
		/* Copy the page byte for byte. */
1900 1901
		page_zip_copy_recs(new_page_zip, new_page,
				   root_page_zip, root, index, mtr);
1902 1903 1904 1905 1906 1907 1908 1909

		/* Update the lock table and possible hash index. */

		lock_move_rec_list_end(new_block, root_block,
				       page_get_infimum_rec(root));

		btr_search_move_or_delete_hash_entries(new_block, root_block,
						       index);
1910
	}
marko's avatar
marko committed
1911

osku's avatar
osku committed
1912 1913 1914 1915
	/* If this is a pessimistic insert which is actually done to
	perform a pessimistic update then we have stored the lock
	information of the record to be inserted on the infimum of the
	root page: we cannot discard the lock structs on the root page */
1916

1917
	lock_update_root_raise(new_block, root_block);
osku's avatar
osku committed
1918 1919 1920 1921 1922

	/* Create a memory heap where the node pointer is stored */
	heap = mem_heap_create(100);

	rec = page_rec_get_next(page_get_infimum_rec(new_page));
marko's avatar
marko committed
1923
	new_page_no = buf_block_get_page_no(new_block);
1924

osku's avatar
osku committed
1925 1926 1927
	/* Build the node pointer (= node key and page address) for the
	child */

1928 1929
	node_ptr = dict_index_build_node_ptr(index, rec, new_page_no, heap,
					     level);
1930 1931 1932
	/* The node pointer must be marked as the predefined minimum record,
	as there is no lower alphabetical limit to records in the leftmost
	node of a level: */
1933 1934 1935
	dtuple_set_info_bits(node_ptr,
			     dtuple_get_info_bits(node_ptr)
			     | REC_INFO_MIN_REC_FLAG);
1936

marko's avatar
marko committed
1937
	/* Rebuild the root page to get free space */
1938
	btr_page_empty(root_block, root_page_zip, index, level + 1, mtr);
1939 1940 1941 1942 1943 1944 1945 1946 1947

	/* Set the next node and previous node fields, although
	they should already have been set.  The previous node field
	must be FIL_NULL if root_page_zip != NULL, because the
	REC_INFO_MIN_REC_FLAG (of the first user record) will be
	set if and only if btr_page_get_prev() == FIL_NULL. */
	btr_page_set_next(root, root_page_zip, FIL_NULL, mtr);
	btr_page_set_prev(root, root_page_zip, FIL_NULL, mtr);

osku's avatar
osku committed
1948
	page_cursor = btr_cur_get_page_cur(cursor);
1949

osku's avatar
osku committed
1950 1951
	/* Insert node pointer to the root */

1952
	page_cur_set_before_first(root_block, page_cursor);
osku's avatar
osku committed
1953

1954
	node_ptr_rec = page_cur_tuple_insert(page_cursor, node_ptr,
1955
					     index, 0, mtr);
osku's avatar
osku committed
1956

1957 1958 1959
	/* The root page should only contain the node pointer
	to new_page at this point.  Thus, the data should fit. */
	ut_a(node_ptr_rec);
marko's avatar
marko committed
1960

osku's avatar
osku committed
1961 1962 1963 1964 1965
	/* Free the memory heap */
	mem_heap_free(heap);

	/* We play safe and reset the free bits for the new page */

1966
#if 0
marko's avatar
marko committed
1967
	fprintf(stderr, "Root raise new page no %lu\n", new_page_no);
1968
#endif
osku's avatar
osku committed
1969

1970 1971 1972 1973
	if (!dict_index_is_clust(index)) {
		ibuf_reset_free_bits(new_block);
	}

osku's avatar
osku committed
1974
	/* Reposition the cursor to the child node */
1975
	page_cur_search(new_block, index, tuple,
1976
			PAGE_CUR_LE, page_cursor);
1977

osku's avatar
osku committed
1978
	/* Split the child and insert tuple */
1979
	return(btr_page_split_and_insert(cursor, tuple, n_ext, mtr));
1980
}
osku's avatar
osku committed
1981

1982
/*************************************************************//**
osku's avatar
osku committed
1983
Decides if the page should be split at the convergence point of inserts
1984 1985
converging to the left.
@return	TRUE if split recommended */
1986
UNIV_INTERN
osku's avatar
osku committed
1987 1988 1989
ibool
btr_page_get_split_rec_to_left(
/*===========================*/
1990 1991
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert */
	rec_t**		split_rec) /*!< out: if split recommended,
osku's avatar
osku committed
1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003
				the first record on upper half page,
				or NULL if tuple to be inserted should
				be first */
{
	page_t*	page;
	rec_t*	insert_point;
	rec_t*	infimum;

	page = btr_cur_get_page(cursor);
	insert_point = btr_cur_get_rec(cursor);

	if (page_header_get_ptr(page, PAGE_LAST_INSERT)
2004
	    == page_rec_get_next(insert_point)) {
2005 2006

		infimum = page_get_infimum_rec(page);
osku's avatar
osku committed
2007 2008 2009 2010 2011 2012 2013

		/* If the convergence is in the middle of a page, include also
		the record immediately before the new insert to the upper
		page. Otherwise, we could repeatedly move from page to page
		lots of records smaller than the convergence point. */

		if (infimum != insert_point
2014
		    && page_rec_get_next(infimum) != insert_point) {
osku's avatar
osku committed
2015 2016 2017

			*split_rec = insert_point;
		} else {
2018 2019
			*split_rec = page_rec_get_next(insert_point);
		}
osku's avatar
osku committed
2020 2021 2022 2023 2024 2025 2026

		return(TRUE);
	}

	return(FALSE);
}

2027
/*************************************************************//**
osku's avatar
osku committed
2028
Decides if the page should be split at the convergence point of inserts
2029 2030
converging to the right.
@return	TRUE if split recommended */
2031
UNIV_INTERN
osku's avatar
osku committed
2032 2033 2034
ibool
btr_page_get_split_rec_to_right(
/*============================*/
2035 2036
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert */
	rec_t**		split_rec) /*!< out: if split recommended,
osku's avatar
osku committed
2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051
				the first record on upper half page,
				or NULL if tuple to be inserted should
				be first */
{
	page_t*	page;
	rec_t*	insert_point;

	page = btr_cur_get_page(cursor);
	insert_point = btr_cur_get_rec(cursor);

	/* We use eager heuristics: if the new insert would be right after
	the previous insert on the same page, we assume that there is a
	pattern of sequential inserts here. */

	if (UNIV_LIKELY(page_header_get_ptr(page, PAGE_LAST_INSERT)
2052
			== insert_point)) {
osku's avatar
osku committed
2053 2054 2055 2056 2057 2058 2059 2060

		rec_t*	next_rec;

		next_rec = page_rec_get_next(insert_point);

		if (page_rec_is_supremum(next_rec)) {
split_at_new:
			/* Split at the new record to insert */
2061
			*split_rec = NULL;
osku's avatar
osku committed
2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074
		} else {
			rec_t*	next_next_rec = page_rec_get_next(next_rec);
			if (page_rec_is_supremum(next_next_rec)) {

				goto split_at_new;
			}

			/* If there are >= 2 user records up from the insert
			point, split all but 1 off. We want to keep one because
			then sequential inserts can use the adaptive hash
			index, as they can do the necessary checks of the right
			search position just by looking at the records on this
			page. */
2075

osku's avatar
osku committed
2076 2077 2078 2079 2080 2081 2082 2083 2084
			*split_rec = next_next_rec;
		}

		return(TRUE);
	}

	return(FALSE);
}

2085
/*************************************************************//**
osku's avatar
osku committed
2086 2087
Calculates a split record such that the tuple will certainly fit on
its half-page when the split is performed. We assume in this function
2088
only that the cursor page has at least one user record.
2089
@return split record, or NULL if tuple will be the first record on
2090
the lower or upper half-page (determined by btr_page_tuple_smaller()) */
osku's avatar
osku committed
2091 2092
static
rec_t*
2093 2094
btr_page_get_split_rec(
/*===================*/
2095 2096 2097
	btr_cur_t*	cursor,	/*!< in: cursor at which insert should be made */
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext)	/*!< in: number of externally stored columns */
osku's avatar
osku committed
2098
{
2099
	page_t*		page;
2100
	page_zip_des_t*	page_zip;
2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112
	ulint		insert_size;
	ulint		free_space;
	ulint		total_data;
	ulint		total_n_recs;
	ulint		total_space;
	ulint		incl_data;
	rec_t*		ins_rec;
	rec_t*		rec;
	rec_t*		next_rec;
	ulint		n;
	mem_heap_t*	heap;
	ulint*		offsets;
osku's avatar
osku committed
2113 2114

	page = btr_cur_get_page(cursor);
2115

2116
	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
osku's avatar
osku committed
2117 2118
	free_space  = page_get_free_space_of_empty(page_is_comp(page));

2119
	page_zip = btr_cur_get_page_zip(cursor);
2120
	if (UNIV_LIKELY_NULL(page_zip)) {
2121 2122
		/* Estimate the free space of an empty compressed page. */
		ulint	free_space_zip = page_zip_empty_size(
2123 2124
			cursor->index->n_fields,
			page_zip_get_size(page_zip));
2125 2126 2127

		if (UNIV_LIKELY(free_space > (ulint) free_space_zip)) {
			free_space = (ulint) free_space_zip;
2128 2129 2130
		}
	}

osku's avatar
osku committed
2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152
	/* free_space is now the free space of a created new page */

	total_data   = page_get_data_size(page) + insert_size;
	total_n_recs = page_get_n_recs(page) + 1;
	ut_ad(total_n_recs >= 2);
	total_space  = total_data + page_dir_calc_reserved_space(total_n_recs);

	n = 0;
	incl_data = 0;
	ins_rec = btr_cur_get_rec(cursor);
	rec = page_get_infimum_rec(page);

	heap = NULL;
	offsets = NULL;

	/* We start to include records to the left half, and when the
	space reserved by them exceeds half of total_space, then if
	the included records fit on the left page, they will be put there
	if something was left over also for the right page,
	otherwise the last included record will be the first on the right
	half page */

2153
	do {
osku's avatar
osku committed
2154 2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168
		/* Decide the next record to include */
		if (rec == ins_rec) {
			rec = NULL;	/* NULL denotes that tuple is
					now included */
		} else if (rec == NULL) {
			rec = page_rec_get_next(ins_rec);
		} else {
			rec = page_rec_get_next(rec);
		}

		if (rec == NULL) {
			/* Include tuple */
			incl_data += insert_size;
		} else {
			offsets = rec_get_offsets(rec, cursor->index,
2169 2170
						  offsets, ULINT_UNDEFINED,
						  &heap);
osku's avatar
osku committed
2171 2172 2173 2174
			incl_data += rec_offs_size(offsets);
		}

		n++;
2175 2176
	} while (incl_data + page_dir_calc_reserved_space(n)
		 < total_space / 2);
2177

2178 2179 2180 2181
	if (incl_data + page_dir_calc_reserved_space(n) <= free_space) {
		/* The next record will be the first on
		the right half page if it is not the
		supremum record of page */
osku's avatar
osku committed
2182

2183 2184 2185 2186 2187 2188 2189 2190 2191 2192 2193 2194
		if (rec == ins_rec) {
			rec = NULL;

			goto func_exit;
		} else if (rec == NULL) {
			next_rec = page_rec_get_next(ins_rec);
		} else {
			next_rec = page_rec_get_next(rec);
		}
		ut_ad(next_rec);
		if (!page_rec_is_supremum(next_rec)) {
			rec = next_rec;
osku's avatar
osku committed
2195
		}
2196 2197 2198 2199 2200
	}

func_exit:
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
osku's avatar
osku committed
2201
	}
2202
	return(rec);
2203
}
osku's avatar
osku committed
2204

2205
/*************************************************************//**
osku's avatar
osku committed
2206
Returns TRUE if the insert fits on the appropriate half-page with the
2207 2208
chosen split_rec.
@return	TRUE if fits */
osku's avatar
osku committed
2209 2210 2211 2212
static
ibool
btr_page_insert_fits(
/*=================*/
2213
	btr_cur_t*	cursor,	/*!< in: cursor at which insert
2214
				should be made */
2215
	const rec_t*	split_rec,/*!< in: suggestion for first record
2216 2217
				on upper half-page, or NULL if
				tuple to be inserted should be first */
2218
	const ulint*	offsets,/*!< in: rec_get_offsets(
2219
				split_rec, cursor->index) */
2220 2221 2222
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	mem_heap_t*	heap)	/*!< in: temporary memory heap */
osku's avatar
osku committed
2223
{
2224 2225 2226 2227 2228 2229 2230 2231
	page_t*		page;
	ulint		insert_size;
	ulint		free_space;
	ulint		total_data;
	ulint		total_n_recs;
	const rec_t*	rec;
	const rec_t*	end_rec;
	ulint*		offs;
2232

osku's avatar
osku committed
2233 2234 2235 2236
	page = btr_cur_get_page(cursor);

	ut_ad(!split_rec == !offsets);
	ut_ad(!offsets
2237
	      || !page_is_comp(page) == !rec_offs_comp(offsets));
osku's avatar
osku committed
2238
	ut_ad(!offsets
2239
	      || rec_offs_validate(split_rec, cursor->index, offsets));
osku's avatar
osku committed
2240

2241
	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
osku's avatar
osku committed
2242 2243 2244 2245 2246 2247
	free_space  = page_get_free_space_of_empty(page_is_comp(page));

	/* free_space is now the free space of a created new page */

	total_data   = page_get_data_size(page) + insert_size;
	total_n_recs = page_get_n_recs(page) + 1;
2248

osku's avatar
osku committed
2249 2250 2251
	/* We determine which records (from rec to end_rec, not including
	end_rec) will end up on the other half page from tuple when it is
	inserted. */
2252

osku's avatar
osku committed
2253 2254 2255 2256 2257 2258 2259
	if (split_rec == NULL) {
		rec = page_rec_get_next(page_get_infimum_rec(page));
		end_rec = page_rec_get_next(btr_cur_get_rec(cursor));

	} else if (cmp_dtuple_rec(tuple, split_rec, offsets) >= 0) {

		rec = page_rec_get_next(page_get_infimum_rec(page));
2260
		end_rec = split_rec;
osku's avatar
osku committed
2261 2262 2263 2264 2265 2266
	} else {
		rec = split_rec;
		end_rec = page_get_supremum_rec(page);
	}

	if (total_data + page_dir_calc_reserved_space(total_n_recs)
2267
	    <= free_space) {
osku's avatar
osku committed
2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281

		/* Ok, there will be enough available space on the
		half page where the tuple is inserted */

		return(TRUE);
	}

	offs = NULL;

	while (rec != end_rec) {
		/* In this loop we calculate the amount of reserved
		space after rec is removed from page. */

		offs = rec_get_offsets(rec, cursor->index, offs,
2282
				       ULINT_UNDEFINED, &heap);
osku's avatar
osku committed
2283 2284 2285 2286 2287

		total_data -= rec_offs_size(offs);
		total_n_recs--;

		if (total_data + page_dir_calc_reserved_space(total_n_recs)
2288
		    <= free_space) {
osku's avatar
osku committed
2289 2290 2291 2292 2293 2294 2295

			/* Ok, there will be enough available space on the
			half page where the tuple is inserted */

			return(TRUE);
		}

2296
		rec = page_rec_get_next_const(rec);
osku's avatar
osku committed
2297 2298 2299
	}

	return(FALSE);
2300
}
osku's avatar
osku committed
2301

2302
/*******************************************************//**
osku's avatar
osku committed
2303 2304
Inserts a data tuple to a tree on a non-leaf level. It is assumed
that mtr holds an x-latch on the tree. */
2305
UNIV_INTERN
osku's avatar
osku committed
2306
void
2307 2308
btr_insert_on_non_leaf_level_func(
/*==============================*/
2309 2310 2311
	dict_index_t*	index,	/*!< in: index */
	ulint		level,	/*!< in: level, must be > 0 */
	dtuple_t*	tuple,	/*!< in: the record to be inserted */
2312 2313
	const char*	file,	/*!< in: file name */
	ulint		line,	/*!< in: line where called */
2314
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
2315 2316
{
	big_rec_t*	dummy_big_rec;
2317
	btr_cur_t	cursor;
osku's avatar
osku committed
2318 2319 2320 2321
	ulint		err;
	rec_t*		rec;

	ut_ad(level > 0);
2322

2323
	btr_cur_search_to_nth_level(index, level, tuple, PAGE_CUR_LE,
2324
				    BTR_CONT_MODIFY_TREE,
2325
				    &cursor, 0, file, line, mtr);
osku's avatar
osku committed
2326 2327

	err = btr_cur_pessimistic_insert(BTR_NO_LOCKING_FLAG
2328 2329 2330
					 | BTR_KEEP_SYS_FLAG
					 | BTR_NO_UNDO_LOG_FLAG,
					 &cursor, tuple, &rec,
2331
					 &dummy_big_rec, 0, NULL, mtr);
osku's avatar
osku committed
2332 2333 2334
	ut_a(err == DB_SUCCESS);
}

2335
/**************************************************************//**
osku's avatar
osku committed
2336 2337 2338 2339 2340 2341
Attaches the halves of an index page on the appropriate level in an
index tree. */
static
void
btr_attach_half_pages(
/*==================*/
2342 2343
	dict_index_t*	index,		/*!< in: the index tree */
	buf_block_t*	block,		/*!< in/out: page to be split */
2344
	rec_t*		split_rec,	/*!< in: first record on upper
osku's avatar
osku committed
2345
					half page */
2346 2347 2348
	buf_block_t*	new_block,	/*!< in/out: the new half page */
	ulint		direction,	/*!< in: FSP_UP or FSP_DOWN */
	mtr_t*		mtr)		/*!< in: mtr */
osku's avatar
osku committed
2349 2350
{
	ulint		space;
2351
	ulint		zip_size;
osku's avatar
osku committed
2352 2353 2354
	ulint		prev_page_no;
	ulint		next_page_no;
	ulint		level;
2355
	page_t*		page		= buf_block_get_frame(block);
osku's avatar
osku committed
2356 2357 2358 2359
	page_t*		lower_page;
	page_t*		upper_page;
	ulint		lower_page_no;
	ulint		upper_page_no;
marko's avatar
marko committed
2360 2361
	page_zip_des_t*	lower_page_zip;
	page_zip_des_t*	upper_page_zip;
osku's avatar
osku committed
2362
	dtuple_t*	node_ptr_upper;
2363
	mem_heap_t*	heap;
osku's avatar
osku committed
2364

2365 2366
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	ut_ad(mtr_memo_contains(mtr, new_block, MTR_MEMO_PAGE_X_FIX));
osku's avatar
osku committed
2367 2368 2369 2370 2371 2372 2373

	/* Create a memory heap where the data tuple is stored */
	heap = mem_heap_create(1024);

	/* Based on split direction, decide upper and lower pages */
	if (direction == FSP_DOWN) {

2374 2375 2376 2377 2378 2379 2380 2381 2382
		btr_cur_t	cursor;
		ulint*		offsets;

		lower_page = buf_block_get_frame(new_block);
		lower_page_no = buf_block_get_page_no(new_block);
		lower_page_zip = buf_block_get_page_zip(new_block);
		upper_page = buf_block_get_frame(block);
		upper_page_no = buf_block_get_page_no(block);
		upper_page_zip = buf_block_get_page_zip(block);
osku's avatar
osku committed
2383

2384
		/* Look up the index for the node pointer to page */
2385 2386
		offsets = btr_page_get_father_block(NULL, heap, index,
						    block, mtr, &cursor);
osku's avatar
osku committed
2387

2388
		/* Replace the address of the old child node (= page) with the
osku's avatar
osku committed
2389 2390
		address of the new lower half */

2391
		btr_node_ptr_set_child_page_no(
2392 2393 2394
			btr_cur_get_rec(&cursor),
			btr_cur_get_page_zip(&cursor),
			offsets, lower_page_no, mtr);
osku's avatar
osku committed
2395 2396
		mem_heap_empty(heap);
	} else {
2397 2398 2399 2400 2401 2402
		lower_page = buf_block_get_frame(block);
		lower_page_no = buf_block_get_page_no(block);
		lower_page_zip = buf_block_get_page_zip(block);
		upper_page = buf_block_get_frame(new_block);
		upper_page_no = buf_block_get_page_no(new_block);
		upper_page_zip = buf_block_get_page_zip(new_block);
osku's avatar
osku committed
2403
	}
2404

osku's avatar
osku committed
2405
	/* Get the level of the split pages */
2406
	level = btr_page_get_level(buf_block_get_frame(block), mtr);
2407 2408
	ut_ad(level
	      == btr_page_get_level(buf_block_get_frame(new_block), mtr));
osku's avatar
osku committed
2409 2410 2411 2412

	/* Build the node pointer (= node key and page address) for the upper
	half */

2413 2414
	node_ptr_upper = dict_index_build_node_ptr(index, split_rec,
						   upper_page_no, heap, level);
osku's avatar
osku committed
2415 2416 2417 2418

	/* Insert it next to the pointer to the lower half. Note that this
	may generate recursion leading to a split on the higher level. */

2419
	btr_insert_on_non_leaf_level(index, level + 1, node_ptr_upper, mtr);
2420

osku's avatar
osku committed
2421 2422 2423 2424 2425 2426 2427
	/* Free the memory heap */
	mem_heap_free(heap);

	/* Get the previous and next pages of page */

	prev_page_no = btr_page_get_prev(page, mtr);
	next_page_no = btr_page_get_next(page, mtr);
2428
	space = buf_block_get_space(block);
2429
	zip_size = buf_block_get_zip_size(block);
2430

osku's avatar
osku committed
2431
	/* Update page links of the level */
2432

osku's avatar
osku committed
2433
	if (prev_page_no != FIL_NULL) {
2434 2435
		buf_block_t*	prev_block = btr_block_get(
			space, zip_size, prev_page_no, RW_X_LATCH, index, mtr);
2436
#ifdef UNIV_BTR_DEBUG
2437 2438
		ut_a(page_is_comp(prev_block->frame) == page_is_comp(page));
		ut_a(btr_page_get_next(prev_block->frame, mtr)
2439
		     == buf_block_get_page_no(block));
2440
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
2441

2442 2443
		btr_page_set_next(buf_block_get_frame(prev_block),
				  buf_block_get_page_zip(prev_block),
2444
				  lower_page_no, mtr);
osku's avatar
osku committed
2445 2446 2447
	}

	if (next_page_no != FIL_NULL) {
2448 2449
		buf_block_t*	next_block = btr_block_get(
			space, zip_size, next_page_no, RW_X_LATCH, index, mtr);
2450 2451 2452 2453 2454
#ifdef UNIV_BTR_DEBUG
		ut_a(page_is_comp(next_block->frame) == page_is_comp(page));
		ut_a(btr_page_get_prev(next_block->frame, mtr)
		     == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
2455

2456 2457
		btr_page_set_prev(buf_block_get_frame(next_block),
				  buf_block_get_page_zip(next_block),
2458
				  upper_page_no, mtr);
osku's avatar
osku committed
2459
	}
2460

marko's avatar
marko committed
2461 2462
	btr_page_set_prev(lower_page, lower_page_zip, prev_page_no, mtr);
	btr_page_set_next(lower_page, lower_page_zip, upper_page_no, mtr);
osku's avatar
osku committed
2463

marko's avatar
marko committed
2464 2465
	btr_page_set_prev(upper_page, upper_page_zip, lower_page_no, mtr);
	btr_page_set_next(upper_page, upper_page_zip, next_page_no, mtr);
osku's avatar
osku committed
2466 2467
}

2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498
/*************************************************************//**
Determine if a tuple is smaller than any record on the page.
@return TRUE if smaller */
static
ibool
btr_page_tuple_smaller(
/*===================*/
	btr_cur_t*	cursor,	/*!< in: b-tree cursor */
	const dtuple_t*	tuple,	/*!< in: tuple to consider */
	ulint*		offsets,/*!< in/out: temporary storage */
	ulint		n_uniq,	/*!< in: number of unique fields
				in the index page records */
	mem_heap_t**	heap)	/*!< in/out: heap for offsets */
{
	buf_block_t*	block;
	const rec_t*	first_rec;
	page_cur_t	pcur;

	/* Read the first user record in the page. */
	block = btr_cur_get_block(cursor);
	page_cur_set_before_first(block, &pcur);
	page_cur_move_to_next(&pcur);
	first_rec = page_cur_get_rec(&pcur);

	offsets = rec_get_offsets(
		first_rec, cursor->index, offsets,
		n_uniq, heap);

	return(cmp_dtuple_rec(tuple, first_rec, offsets) < 0);
}

2499
/*************************************************************//**
osku's avatar
osku committed
2500
Splits an index page to halves and inserts the tuple. It is assumed
2501 2502 2503 2504
that mtr holds an x-latch to the index tree. NOTE: the tree x-latch is
released within this function! NOTE that the operation of this
function must always succeed, we cannot reverse it: therefore enough
free disk space (2 pages) must be guaranteed to be available before
2505
this function is called.
2506 2507

@return inserted record */
2508
UNIV_INTERN
osku's avatar
osku committed
2509 2510 2511
rec_t*
btr_page_split_and_insert(
/*======================*/
2512
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert; when the
osku's avatar
osku committed
2513 2514
				function returns, the cursor is positioned
				on the predecessor of the inserted record */
2515 2516 2517
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
2518
{
2519
	buf_block_t*	block;
2520
	page_t*		page;
marko's avatar
marko committed
2521
	page_zip_des_t*	page_zip;
2522
	ulint		page_no;
osku's avatar
osku committed
2523
	byte		direction;
2524
	ulint		hint_page_no;
2525
	buf_block_t*	new_block;
2526
	page_t*		new_page;
2527
	page_zip_des_t*	new_page_zip;
osku's avatar
osku committed
2528
	rec_t*		split_rec;
2529 2530 2531
	buf_block_t*	left_block;
	buf_block_t*	right_block;
	buf_block_t*	insert_block;
osku's avatar
osku committed
2532 2533 2534 2535 2536
	page_cur_t*	page_cursor;
	rec_t*		first_rec;
	byte*		buf = 0; /* remove warning */
	rec_t*		move_limit;
	ibool		insert_will_fit;
marko's avatar
marko committed
2537
	ibool		insert_left;
osku's avatar
osku committed
2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548
	ulint		n_iterations = 0;
	rec_t*		rec;
	mem_heap_t*	heap;
	ulint		n_uniq;
	ulint*		offsets;

	heap = mem_heap_create(1024);
	n_uniq = dict_index_get_n_unique_in_tree(cursor->index);
func_start:
	mem_heap_empty(heap);
	offsets = NULL;
2549

2550
	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(cursor->index),
2551
				MTR_MEMO_X_LOCK));
osku's avatar
osku committed
2552
#ifdef UNIV_SYNC_DEBUG
2553
	ut_ad(rw_lock_own(dict_index_get_lock(cursor->index), RW_LOCK_EX));
osku's avatar
osku committed
2554 2555
#endif /* UNIV_SYNC_DEBUG */

2556
	block = btr_cur_get_block(cursor);
2557 2558
	page = buf_block_get_frame(block);
	page_zip = buf_block_get_page_zip(block);
osku's avatar
osku committed
2559

2560
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2561
	ut_ad(page_get_n_recs(page) >= 1);
2562

2563
	page_no = buf_block_get_page_no(block);
osku's avatar
osku committed
2564 2565 2566 2567

	/* 1. Decide the split record; split_rec == NULL means that the
	tuple to be inserted should be the first record on the upper
	half-page */
Vasil Dimov's avatar
Vasil Dimov committed
2568
	insert_left = FALSE;
osku's avatar
osku committed
2569 2570 2571

	if (n_iterations > 0) {
		direction = FSP_UP;
2572
		hint_page_no = page_no + 1;
2573
		split_rec = btr_page_get_split_rec(cursor, tuple, n_ext);
2574

2575 2576 2577 2578
		if (UNIV_UNLIKELY(split_rec == NULL)) {
			insert_left = btr_page_tuple_smaller(
				cursor, tuple, offsets, n_uniq, &heap);
		}
osku's avatar
osku committed
2579 2580
	} else if (btr_page_get_split_rec_to_right(cursor, &split_rec)) {
		direction = FSP_UP;
2581
		hint_page_no = page_no + 1;
osku's avatar
osku committed
2582 2583 2584

	} else if (btr_page_get_split_rec_to_left(cursor, &split_rec)) {
		direction = FSP_DOWN;
2585
		hint_page_no = page_no - 1;
2586
		ut_ad(split_rec);
osku's avatar
osku committed
2587 2588
	} else {
		direction = FSP_UP;
2589
		hint_page_no = page_no + 1;
2590

2591 2592 2593 2594 2595 2596
		/* If there is only one record in the index page, we
		can't split the node in the middle by default. We need
		to determine whether the new record will be inserted
		to the left or right. */

		if (page_get_n_recs(page) > 1) {
2597
			split_rec = page_get_middle_rec(page);
2598 2599 2600 2601 2602 2603
		} else if (btr_page_tuple_smaller(cursor, tuple,
						  offsets, n_uniq, &heap)) {
			split_rec = page_rec_get_next(
				page_get_infimum_rec(page));
		} else {
			split_rec = NULL;
2604
		}
osku's avatar
osku committed
2605 2606
	}

2607
	/* 2. Allocate a new page to the index */
2608
	new_block = btr_page_alloc(cursor->index, hint_page_no, direction,
2609
				   btr_page_get_level(page, mtr), mtr, mtr);
2610 2611
	new_page = buf_block_get_frame(new_block);
	new_page_zip = buf_block_get_page_zip(new_block);
2612
	btr_page_create(new_block, new_page_zip, cursor->index,
2613
			btr_page_get_level(page, mtr), mtr);
2614

osku's avatar
osku committed
2615 2616 2617 2618
	/* 3. Calculate the first record on the upper half-page, and the
	first record (move_limit) on original page which ends up on the
	upper half */

marko's avatar
marko committed
2619 2620
	if (split_rec) {
		first_rec = move_limit = split_rec;
2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632

		offsets = rec_get_offsets(split_rec, cursor->index, offsets,
					  n_uniq, &heap);

		insert_left = cmp_dtuple_rec(tuple, split_rec, offsets) < 0;

		if (UNIV_UNLIKELY(!insert_left && new_page_zip
				  && n_iterations > 0)) {
			/* If a compressed page has already been split,
			avoid further splits by inserting the record
			to an empty page. */
			split_rec = NULL;
2633
			goto insert_empty;
2634
		}
Vasil Dimov's avatar
Vasil Dimov committed
2635
	} else if (UNIV_UNLIKELY(insert_left)) {
2636
		ut_a(n_iterations > 0);
Vasil Dimov's avatar
Vasil Dimov committed
2637 2638
		first_rec = page_rec_get_next(page_get_infimum_rec(page));
		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
osku's avatar
osku committed
2639
	} else {
2640 2641
insert_empty:
		ut_ad(!split_rec);
Vasil Dimov's avatar
Vasil Dimov committed
2642
		ut_ad(!insert_left);
2643
		buf = mem_alloc(rec_get_converted_size(cursor->index,
2644
						       tuple, n_ext));
osku's avatar
osku committed
2645

2646
		first_rec = rec_convert_dtuple_to_rec(buf, cursor->index,
2647
						      tuple, n_ext);
osku's avatar
osku committed
2648 2649
		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
	}
2650

osku's avatar
osku committed
2651 2652
	/* 4. Do first the modifications in the tree structure */

2653 2654
	btr_attach_half_pages(cursor->index, block,
			      first_rec, new_block, direction, mtr);
osku's avatar
osku committed
2655 2656 2657 2658 2659 2660 2661

	/* If the split is made on the leaf level and the insert will fit
	on the appropriate half-page, we may release the tree x-latch.
	We can then move the records after releasing the tree latch,
	thus reducing the tree latch contention. */

	if (split_rec) {
2662 2663 2664
		insert_will_fit = !new_page_zip
			&& btr_page_insert_fits(cursor, split_rec,
						offsets, tuple, n_ext, heap);
osku's avatar
osku committed
2665
	} else {
Vasil Dimov's avatar
Vasil Dimov committed
2666 2667 2668 2669 2670
		if (!insert_left) {
			mem_free(buf);
			buf = NULL;
		}

2671 2672 2673
		insert_will_fit = !new_page_zip
			&& btr_page_insert_fits(cursor, NULL,
						NULL, tuple, n_ext, heap);
osku's avatar
osku committed
2674
	}
2675

2676
	if (insert_will_fit && page_is_leaf(page)) {
osku's avatar
osku committed
2677

2678
		mtr_memo_release(mtr, dict_index_get_lock(cursor->index),
2679
				 MTR_MEMO_X_LOCK);
osku's avatar
osku committed
2680 2681 2682
	}

	/* 5. Move then the records to the new page */
Vasil Dimov's avatar
Vasil Dimov committed
2683
	if (direction == FSP_DOWN) {
2684
		/*		fputs("Split left\n", stderr); */
marko's avatar
marko committed
2685

2686 2687 2688 2689 2690
		if (0
#ifdef UNIV_ZIP_COPY
		    || page_zip
#endif /* UNIV_ZIP_COPY */
		    || UNIV_UNLIKELY
2691
		    (!page_move_rec_list_start(new_block, block, move_limit,
2692
					       cursor->index, mtr))) {
2693 2694 2695 2696 2697 2698 2699
			/* For some reason, compressing new_page failed,
			even though it should contain fewer records than
			the original page.  Copy the page byte for byte
			and then delete the records from both pages
			as appropriate.  Deleting will always succeed. */
			ut_a(new_page_zip);

2700 2701
			page_zip_copy_recs(new_page_zip, new_page,
					   page_zip, page, cursor->index, mtr);
2702 2703
			page_delete_rec_list_end(move_limit - page + new_page,
						 new_block, cursor->index,
2704
						 ULINT_UNDEFINED,
2705
						 ULINT_UNDEFINED, mtr);
2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717

			/* Update the lock table and possible hash index. */

			lock_move_rec_list_start(
				new_block, block, move_limit,
				new_page + PAGE_NEW_INFIMUM);

			btr_search_move_or_delete_hash_entries(
				new_block, block, cursor->index);

			/* Delete the records from the source page. */

2718 2719
			page_delete_rec_list_start(move_limit, block,
						   cursor->index, mtr);
2720
		}
osku's avatar
osku committed
2721

2722 2723
		left_block = new_block;
		right_block = block;
osku's avatar
osku committed
2724

2725
		lock_update_split_left(right_block, left_block);
osku's avatar
osku committed
2726
	} else {
2727
		/*		fputs("Split right\n", stderr); */
osku's avatar
osku committed
2728

2729 2730 2731 2732 2733
		if (0
#ifdef UNIV_ZIP_COPY
		    || page_zip
#endif /* UNIV_ZIP_COPY */
		    || UNIV_UNLIKELY
2734
		    (!page_move_rec_list_end(new_block, block, move_limit,
2735
					     cursor->index, mtr))) {
2736 2737 2738 2739 2740 2741 2742
			/* For some reason, compressing new_page failed,
			even though it should contain fewer records than
			the original page.  Copy the page byte for byte
			and then delete the records from both pages
			as appropriate.  Deleting will always succeed. */
			ut_a(new_page_zip);

2743 2744
			page_zip_copy_recs(new_page_zip, new_page,
					   page_zip, page, cursor->index, mtr);
2745
			page_delete_rec_list_start(move_limit - page
2746 2747
						   + new_page, new_block,
						   cursor->index, mtr);
2748 2749 2750 2751 2752 2753 2754 2755 2756 2757

			/* Update the lock table and possible hash index. */

			lock_move_rec_list_end(new_block, block, move_limit);

			btr_search_move_or_delete_hash_entries(
				new_block, block, cursor->index);

			/* Delete the records from the source page. */

2758 2759
			page_delete_rec_list_end(move_limit, block,
						 cursor->index,
2760
						 ULINT_UNDEFINED,
2761
						 ULINT_UNDEFINED, mtr);
2762 2763
		}

2764 2765
		left_block = block;
		right_block = new_block;
2766

2767
		lock_update_split_right(right_block, left_block);
osku's avatar
osku committed
2768 2769
	}

2770 2771 2772 2773 2774 2775 2776
#ifdef UNIV_ZIP_DEBUG
	if (UNIV_LIKELY_NULL(page_zip)) {
		ut_a(page_zip_validate(page_zip, page));
		ut_a(page_zip_validate(new_page_zip, new_page));
	}
#endif /* UNIV_ZIP_DEBUG */

marko's avatar
marko committed
2777 2778 2779
	/* At this point, split_rec, move_limit and first_rec may point
	to garbage on the old page. */

osku's avatar
osku committed
2780 2781 2782
	/* 6. The split and the tree modification is now completed. Decide the
	page where the tuple should be inserted */

marko's avatar
marko committed
2783
	if (insert_left) {
2784
		insert_block = left_block;
osku's avatar
osku committed
2785
	} else {
2786
		insert_block = right_block;
osku's avatar
osku committed
2787 2788 2789 2790 2791
	}

	/* 7. Reposition the cursor for insert and try insertion */
	page_cursor = btr_cur_get_page_cur(cursor);

2792
	page_cur_search(insert_block, cursor->index, tuple,
2793
			PAGE_CUR_LE, page_cursor);
osku's avatar
osku committed
2794

2795
	rec = page_cur_tuple_insert(page_cursor, tuple,
2796
				    cursor->index, n_ext, mtr);
marko's avatar
marko committed
2797

2798
#ifdef UNIV_ZIP_DEBUG
2799
	{
2800 2801 2802
		page_t*		insert_page
			= buf_block_get_frame(insert_block);

2803 2804
		page_zip_des_t*	insert_page_zip
			= buf_block_get_page_zip(insert_block);
2805

2806 2807 2808
		ut_a(!insert_page_zip
		     || page_zip_validate(insert_page_zip, insert_page));
	}
2809
#endif /* UNIV_ZIP_DEBUG */
osku's avatar
osku committed
2810

marko's avatar
marko committed
2811
	if (UNIV_LIKELY(rec != NULL)) {
2812 2813

		goto func_exit;
osku's avatar
osku committed
2814
	}
2815

2816
	/* 8. If insert did not fit, try page reorganization */
osku's avatar
osku committed
2817

2818
	if (UNIV_UNLIKELY
2819
	    (!btr_page_reorganize(insert_block, cursor->index, mtr))) {
2820 2821 2822

		goto insert_failed;
	}
osku's avatar
osku committed
2823

2824
	page_cur_search(insert_block, cursor->index, tuple,
2825
			PAGE_CUR_LE, page_cursor);
2826
	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
2827
				    n_ext, mtr);
marko's avatar
marko committed
2828 2829

	if (UNIV_UNLIKELY(rec == NULL)) {
osku's avatar
osku committed
2830 2831
		/* The insert did not fit on the page: loop back to the
		start of the function for a new split */
2832
insert_failed:
2833
		/* We play safe and reset the free bits for new_page */
2834 2835 2836
		if (!dict_index_is_clust(cursor->index)) {
			ibuf_reset_free_bits(new_block);
		}
osku's avatar
osku committed
2837 2838

		/* fprintf(stderr, "Split second round %lu\n",
2839
		page_get_page_no(page)); */
osku's avatar
osku committed
2840
		n_iterations++;
2841 2842
		ut_ad(n_iterations < 2
		      || buf_block_get_page_zip(insert_block));
2843
		ut_ad(!insert_will_fit);
osku's avatar
osku committed
2844 2845 2846 2847

		goto func_start;
	}

2848
func_exit:
osku's avatar
osku committed
2849 2850 2851
	/* Insert fit on the page: update the free bits for the
	left and right pages in the same mtr */

2852 2853 2854 2855 2856 2857
	if (!dict_index_is_clust(cursor->index) && page_is_leaf(page)) {
		ibuf_update_free_bits_for_two_pages_low(
			buf_block_get_zip_size(left_block),
			left_block, right_block, mtr);
	}

2858 2859
#if 0
	fprintf(stderr, "Split and insert done %lu %lu\n",
2860 2861
		buf_block_get_page_no(left_block),
		buf_block_get_page_no(right_block));
2862
#endif
osku's avatar
osku committed
2863

2864
	ut_ad(page_validate(buf_block_get_frame(left_block), cursor->index));
2865
	ut_ad(page_validate(buf_block_get_frame(right_block), cursor->index));
osku's avatar
osku committed
2866 2867 2868 2869 2870

	mem_heap_free(heap);
	return(rec);
}

2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883 2884 2885 2886 2887 2888 2889 2890 2891 2892
#ifdef UNIV_SYNC_DEBUG
/*************************************************************//**
Removes a page from the level list of pages.
@param space	in: space where removed
@param zip_size	in: compressed page size in bytes, or 0 for uncompressed
@param page	in/out: page to remove
@param index	in: index tree
@param mtr	in/out: mini-transaction */
# define btr_level_list_remove(space,zip_size,page,index,mtr)		\
	btr_level_list_remove_func(space,zip_size,page,index,mtr)
#else /* UNIV_SYNC_DEBUG */
/*************************************************************//**
Removes a page from the level list of pages.
@param space	in: space where removed
@param zip_size	in: compressed page size in bytes, or 0 for uncompressed
@param page	in/out: page to remove
@param index	in: index tree
@param mtr	in/out: mini-transaction */
# define btr_level_list_remove(space,zip_size,page,index,mtr)		\
	btr_level_list_remove_func(space,zip_size,page,mtr)
#endif /* UNIV_SYNC_DEBUG */

2893
/*************************************************************//**
osku's avatar
osku committed
2894
Removes a page from the level list of pages. */
2895
static __attribute__((nonnull))
osku's avatar
osku committed
2896
void
2897 2898 2899 2900 2901 2902 2903 2904 2905 2906
btr_level_list_remove_func(
/*=======================*/
	ulint			space,	/*!< in: space where removed */
	ulint			zip_size,/*!< in: compressed page size in bytes
					or 0 for uncompressed pages */
	page_t*			page,	/*!< in/out: page to remove */
#ifdef UNIV_SYNC_DEBUG
	const dict_index_t*	index,	/*!< in: index tree */
#endif /* UNIV_SYNC_DEBUG */
	mtr_t*			mtr)	/*!< in/out: mini-transaction */
2907
{
osku's avatar
osku committed
2908 2909
	ulint	prev_page_no;
	ulint	next_page_no;
2910

2911
	ut_ad(page && mtr);
2912
	ut_ad(mtr_memo_contains_page(mtr, page, MTR_MEMO_PAGE_X_FIX));
2913
	ut_ad(space == page_get_space_id(page));
osku's avatar
osku committed
2914 2915 2916 2917
	/* Get the previous and next page numbers of page */

	prev_page_no = btr_page_get_prev(page, mtr);
	next_page_no = btr_page_get_next(page, mtr);
2918

osku's avatar
osku committed
2919
	/* Update page links of the level */
2920

osku's avatar
osku committed
2921
	if (prev_page_no != FIL_NULL) {
2922
		buf_block_t*	prev_block
2923
			= btr_block_get(space, zip_size, prev_page_no,
2924
					RW_X_LATCH, index, mtr);
2925 2926
		page_t*		prev_page
			= buf_block_get_frame(prev_block);
2927
#ifdef UNIV_BTR_DEBUG
2928
		ut_a(page_is_comp(prev_page) == page_is_comp(page));
2929
		ut_a(btr_page_get_next(prev_page, mtr)
2930
		     == page_get_page_no(page));
2931
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
2932

2933 2934
		btr_page_set_next(prev_page,
				  buf_block_get_page_zip(prev_block),
2935
				  next_page_no, mtr);
osku's avatar
osku committed
2936 2937 2938
	}

	if (next_page_no != FIL_NULL) {
2939
		buf_block_t*	next_block
2940
			= btr_block_get(space, zip_size, next_page_no,
2941
					RW_X_LATCH, index, mtr);
2942 2943
		page_t*		next_page
			= buf_block_get_frame(next_block);
2944
#ifdef UNIV_BTR_DEBUG
2945
		ut_a(page_is_comp(next_page) == page_is_comp(page));
2946
		ut_a(btr_page_get_prev(next_page, mtr)
2947
		     == page_get_page_no(page));
2948
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
2949

2950 2951
		btr_page_set_prev(next_page,
				  buf_block_get_page_zip(next_block),
2952
				  prev_page_no, mtr);
osku's avatar
osku committed
2953 2954
	}
}
2955

2956
/****************************************************************//**
osku's avatar
osku committed
2957 2958 2959 2960 2961 2962
Writes the redo log record for setting an index record as the predefined
minimum record. */
UNIV_INLINE
void
btr_set_min_rec_mark_log(
/*=====================*/
2963 2964 2965
	rec_t*	rec,	/*!< in: record */
	byte	type,	/*!< in: MLOG_COMP_REC_MIN_MARK or MLOG_REC_MIN_MARK */
	mtr_t*	mtr)	/*!< in: mtr */
osku's avatar
osku committed
2966
{
marko's avatar
marko committed
2967
	mlog_write_initial_log_record(rec, type, mtr);
osku's avatar
osku committed
2968 2969

	/* Write rec offset as a 2-byte ulint */
2970
	mlog_catenate_ulint(mtr, page_offset(rec), MLOG_2BYTES);
osku's avatar
osku committed
2971
}
2972 2973 2974
#else /* !UNIV_HOTBACKUP */
# define btr_set_min_rec_mark_log(rec,comp,mtr) ((void) 0)
#endif /* !UNIV_HOTBACKUP */
osku's avatar
osku committed
2975

2976
/****************************************************************//**
osku's avatar
osku committed
2977
Parses the redo log record for setting an index record as the predefined
2978 2979
minimum record.
@return	end of log record or NULL */
2980
UNIV_INTERN
osku's avatar
osku committed
2981 2982 2983
byte*
btr_parse_set_min_rec_mark(
/*=======================*/
2984 2985 2986 2987 2988
	byte*	ptr,	/*!< in: buffer */
	byte*	end_ptr,/*!< in: buffer end */
	ulint	comp,	/*!< in: nonzero=compact page format */
	page_t*	page,	/*!< in: page or NULL */
	mtr_t*	mtr)	/*!< in: mtr or NULL */
osku's avatar
osku committed
2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001
{
	rec_t*	rec;

	if (end_ptr < ptr + 2) {

		return(NULL);
	}

	if (page) {
		ut_a(!page_is_comp(page) == !comp);

		rec = page + mach_read_from_2(ptr);

3002
		btr_set_min_rec_mark(rec, mtr);
osku's avatar
osku committed
3003 3004 3005 3006 3007
	}

	return(ptr + 2);
}

3008
/****************************************************************//**
osku's avatar
osku committed
3009
Sets a record as the predefined minimum record. */
3010
UNIV_INTERN
osku's avatar
osku committed
3011 3012 3013
void
btr_set_min_rec_mark(
/*=================*/
3014 3015
	rec_t*	rec,	/*!< in: record */
	mtr_t*	mtr)	/*!< in: mtr */
osku's avatar
osku committed
3016 3017 3018
{
	ulint	info_bits;

marko's avatar
marko committed
3019 3020
	if (UNIV_LIKELY(page_rec_is_comp(rec))) {
		info_bits = rec_get_info_bits(rec, TRUE);
osku's avatar
osku committed
3021

3022
		rec_set_info_bits_new(rec, info_bits | REC_INFO_MIN_REC_FLAG);
marko's avatar
marko committed
3023 3024 3025 3026 3027 3028 3029 3030 3031

		btr_set_min_rec_mark_log(rec, MLOG_COMP_REC_MIN_MARK, mtr);
	} else {
		info_bits = rec_get_info_bits(rec, FALSE);

		rec_set_info_bits_old(rec, info_bits | REC_INFO_MIN_REC_FLAG);

		btr_set_min_rec_mark_log(rec, MLOG_REC_MIN_MARK, mtr);
	}
osku's avatar
osku committed
3032 3033
}

3034
#ifndef UNIV_HOTBACKUP
3035
/*************************************************************//**
osku's avatar
osku committed
3036
Deletes on the upper level the node pointer to a page. */
3037
UNIV_INTERN
osku's avatar
osku committed
3038 3039 3040
void
btr_node_ptr_delete(
/*================*/
3041 3042 3043
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: page whose node pointer is deleted */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
3044 3045 3046 3047
{
	btr_cur_t	cursor;
	ibool		compressed;
	ulint		err;
3048

3049
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
osku's avatar
osku committed
3050

3051 3052
	/* Delete node pointer on father page */
	btr_page_get_father(index, block, mtr, &cursor);
osku's avatar
osku committed
3053

3054
	compressed = btr_cur_pessimistic_delete(&err, TRUE, &cursor, RB_NONE,
3055
						mtr);
osku's avatar
osku committed
3056 3057 3058
	ut_a(err == DB_SUCCESS);

	if (!compressed) {
3059
		btr_cur_compress_if_useful(&cursor, FALSE, mtr);
osku's avatar
osku committed
3060 3061 3062
	}
}

3063
/*************************************************************//**
osku's avatar
osku committed
3064
If page is the only on its level, this function moves its records to the
3065 3066
father page, thus reducing the tree height.
@return father block */
osku's avatar
osku committed
3067
static
3068
buf_block_t*
osku's avatar
osku committed
3069 3070
btr_lift_page_up(
/*=============*/
3071 3072
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: page which is the only on its level;
osku's avatar
osku committed
3073 3074 3075
				must not be empty: use
				btr_discard_only_page_on_level if the last
				record from the page should be removed */
3076
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
3077
{
3078
	buf_block_t*	father_block;
osku's avatar
osku committed
3079 3080
	page_t*		father_page;
	ulint		page_level;
marko's avatar
marko committed
3081
	page_zip_des_t*	father_page_zip;
3082
	page_t*		page		= buf_block_get_frame(block);
3083 3084
	ulint		root_page_no;
	buf_block_t*	blocks[BTR_MAX_LEVELS];
3085
	ulint		n_blocks;	/*!< last used index in blocks[] */
3086
	ulint		i;
osku's avatar
osku committed
3087 3088 3089

	ut_ad(btr_page_get_prev(page, mtr) == FIL_NULL);
	ut_ad(btr_page_get_next(page, mtr) == FIL_NULL);
3090
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3091

3092 3093 3094
	page_level = btr_page_get_level(page, mtr);
	root_page_no = dict_index_get_page(index);

3095 3096 3097 3098
	{
		btr_cur_t	cursor;
		mem_heap_t*	heap	= mem_heap_create(100);
		ulint*		offsets;
3099
		buf_block_t*	b;
3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113

		offsets = btr_page_get_father_block(NULL, heap, index,
						    block, mtr, &cursor);
		father_block = btr_cur_get_block(&cursor);
		father_page_zip = buf_block_get_page_zip(father_block);
		father_page = buf_block_get_frame(father_block);

		n_blocks = 0;

		/* Store all ancestor pages so we can reset their
		levels later on.  We have to do all the searches on
		the tree now because later on, after we've replaced
		the first level, the tree is in an inconsistent state
		and can not be searched. */
3114 3115
		for (b = father_block;
		     buf_block_get_page_no(b) != root_page_no; ) {
3116 3117 3118
			ut_a(n_blocks < BTR_MAX_LEVELS);

			offsets = btr_page_get_father_block(offsets, heap,
3119
							    index, b,
3120 3121
							    mtr, &cursor);

3122
			blocks[n_blocks++] = b = btr_cur_get_block(&cursor);
3123
		}
marko's avatar
marko committed
3124

3125 3126
		mem_heap_free(heap);
	}
osku's avatar
osku committed
3127

3128
	btr_search_drop_page_hash_index(block);
3129

osku's avatar
osku committed
3130
	/* Make the father empty */
3131
	btr_page_empty(father_block, father_page_zip, index, page_level, mtr);
osku's avatar
osku committed
3132

3133
	/* Copy the records to the father page one by one. */
3134 3135 3136 3137 3138
	if (0
#ifdef UNIV_ZIP_COPY
	    || father_page_zip
#endif /* UNIV_ZIP_COPY */
	    || UNIV_UNLIKELY
3139
	    (!page_copy_rec_list_end(father_block, block,
3140 3141
				     page_get_infimum_rec(page),
				     index, mtr))) {
3142 3143
		const page_zip_des_t*	page_zip
			= buf_block_get_page_zip(block);
3144
		ut_a(father_page_zip);
3145
		ut_a(page_zip);
3146 3147

		/* Copy the page byte for byte. */
3148 3149
		page_zip_copy_recs(father_page_zip, father_page,
				   page_zip, page, index, mtr);
3150 3151 3152 3153 3154 3155 3156 3157

		/* Update the lock table and possible hash index. */

		lock_move_rec_list_end(father_block, block,
				       page_get_infimum_rec(page));

		btr_search_move_or_delete_hash_entries(father_block, block,
						       index);
marko's avatar
marko committed
3158
	}
osku's avatar
osku committed
3159

3160
	btr_blob_dbg_remove(page, index, "btr_lift_page_up");
3161
	lock_update_copy_and_discard(father_block, block);
marko's avatar
marko committed
3162

3163
	/* Go upward to root page, decrementing levels by one. */
3164
	for (i = 0; i < n_blocks; i++, page_level++) {
3165 3166
		page_t*		page	= buf_block_get_frame(blocks[i]);
		page_zip_des_t*	page_zip= buf_block_get_page_zip(blocks[i]);
3167 3168 3169

		ut_ad(btr_page_get_level(page, mtr) == page_level + 1);

3170 3171 3172 3173
		btr_page_set_level(page, page_zip, page_level, mtr);
#ifdef UNIV_ZIP_DEBUG
		ut_a(!page_zip || page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */
3174 3175
	}

osku's avatar
osku committed
3176
	/* Free the file page */
3177
	btr_page_free(index, block, mtr);
osku's avatar
osku committed
3178

3179
	/* We play it safe and reset the free bits for the father */
3180 3181 3182
	if (!dict_index_is_clust(index)) {
		ibuf_reset_free_bits(father_block);
	}
osku's avatar
osku committed
3183
	ut_ad(page_validate(father_page, index));
3184
	ut_ad(btr_check_node_ptr(index, father_block, mtr));
3185 3186

	return(father_block);
3187
}
osku's avatar
osku committed
3188

3189
/*************************************************************//**
osku's avatar
osku committed
3190 3191 3192 3193 3194 3195 3196
Tries to merge the page first to the left immediate brother if such a
brother exists, and the node pointers to the current page and to the brother
reside on the same page. If the left brother does not satisfy these
conditions, looks at the right brother. If the page is the only one on that
level lifts the records of the page to the father page, thus reducing the
tree height. It is assumed that mtr holds an x-latch on the tree and on the
page. If cursor is on the leaf level, mtr must also hold x-latches to the
3197 3198
brothers, if they exist.
@return	TRUE on success */
3199
UNIV_INTERN
marko's avatar
marko committed
3200
ibool
osku's avatar
osku committed
3201 3202
btr_compress(
/*=========*/
3203 3204 3205 3206 3207 3208 3209
	btr_cur_t*	cursor,	/*!< in/out: cursor on the page to merge
				or lift; the page must not be empty:
				when deleting records, use btr_discard_page()
				if the page would become empty */
	ibool		adjust,	/*!< in: TRUE if should adjust the
				cursor position even if compression occurs */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
osku's avatar
osku committed
3210
{
3211
	dict_index_t*	index;
osku's avatar
osku committed
3212
	ulint		space;
3213
	ulint		zip_size;
osku's avatar
osku committed
3214 3215
	ulint		left_page_no;
	ulint		right_page_no;
3216
	buf_block_t*	merge_block;
osku's avatar
osku committed
3217
	page_t*		merge_page;
3218
	page_zip_des_t*	merge_page_zip;
osku's avatar
osku committed
3219
	ibool		is_left;
3220
	buf_block_t*	block;
osku's avatar
osku committed
3221
	page_t*		page;
3222 3223 3224
	btr_cur_t	father_cursor;
	mem_heap_t*	heap;
	ulint*		offsets;
osku's avatar
osku committed
3225 3226
	ulint		data_size;
	ulint		n_recs;
3227
	ulint		nth_rec = 0; /* remove bogus warning */
osku's avatar
osku committed
3228 3229 3230
	ulint		max_ins_size;
	ulint		max_ins_size_reorg;

3231
	block = btr_cur_get_block(cursor);
3232
	page = btr_cur_get_page(cursor);
3233
	index = btr_cur_get_index(cursor);
3234 3235

	btr_assert_not_corrupted(block, index);
osku's avatar
osku committed
3236

3237
	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
3238
				MTR_MEMO_X_LOCK));
3239
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3240
	space = dict_index_get_space(index);
3241
	zip_size = dict_table_zip_size(index->table);
osku's avatar
osku committed
3242 3243 3244 3245

	left_page_no = btr_page_get_prev(page, mtr);
	right_page_no = btr_page_get_next(page, mtr);

3246 3247 3248 3249
#if 0
	fprintf(stderr, "Merge left page %lu right %lu \n",
		left_page_no, right_page_no);
#endif
osku's avatar
osku committed
3250

3251 3252 3253
	heap = mem_heap_create(100);
	offsets = btr_page_get_father_block(NULL, heap, index, block, mtr,
					    &father_cursor);
osku's avatar
osku committed
3254

3255 3256 3257
	if (adjust) {
		nth_rec = page_rec_get_n_recs_before(btr_cur_get_rec(cursor));
	}
3258

osku's avatar
osku committed
3259 3260 3261
	/* Decide the page to which we try to merge and which will inherit
	the locks */

marko's avatar
marko committed
3262 3263 3264
	is_left = left_page_no != FIL_NULL;

	if (is_left) {
osku's avatar
osku committed
3265

3266
		merge_block = btr_block_get(space, zip_size, left_page_no,
3267
					    RW_X_LATCH, index, mtr);
3268
		merge_page = buf_block_get_frame(merge_block);
3269
#ifdef UNIV_BTR_DEBUG
3270 3271
		ut_a(btr_page_get_next(merge_page, mtr)
		     == buf_block_get_page_no(block));
3272
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
3273 3274
	} else if (right_page_no != FIL_NULL) {

3275
		merge_block = btr_block_get(space, zip_size, right_page_no,
3276
					    RW_X_LATCH, index, mtr);
3277
		merge_page = buf_block_get_frame(merge_block);
3278
#ifdef UNIV_BTR_DEBUG
3279 3280
		ut_a(btr_page_get_prev(merge_page, mtr)
		     == buf_block_get_page_no(block));
3281
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
3282 3283 3284
	} else {
		/* The page is the only one on the level, lift the records
		to the father */
3285

3286 3287
		merge_block = btr_lift_page_up(index, block, mtr);
		goto func_exit;
osku's avatar
osku committed
3288
	}
3289

osku's avatar
osku committed
3290 3291
	n_recs = page_get_n_recs(page);
	data_size = page_get_data_size(page);
3292
#ifdef UNIV_BTR_DEBUG
marko's avatar
marko committed
3293
	ut_a(page_is_comp(merge_page) == page_is_comp(page));
3294
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
3295

3296 3297
	max_ins_size_reorg = page_get_max_insert_size_after_reorganize(
		merge_page, n_recs);
osku's avatar
osku committed
3298 3299 3300
	if (data_size > max_ins_size_reorg) {

		/* No space for merge */
3301
err_exit:
3302 3303 3304 3305 3306 3307 3308
		/* We play it safe and reset the free bits. */
		if (zip_size
		    && page_is_leaf(merge_page)
		    && !dict_index_is_clust(index)) {
			ibuf_reset_free_bits(merge_block);
		}

3309
		mem_heap_free(heap);
marko's avatar
marko committed
3310
		return(FALSE);
osku's avatar
osku committed
3311 3312
	}

3313
	ut_ad(page_validate(merge_page, index));
osku's avatar
osku committed
3314 3315

	max_ins_size = page_get_max_insert_size(merge_page, n_recs);
3316

marko's avatar
marko committed
3317
	if (UNIV_UNLIKELY(data_size > max_ins_size)) {
osku's avatar
osku committed
3318 3319 3320

		/* We have to reorganize merge_page */

3321
		if (UNIV_UNLIKELY(!btr_page_reorganize(merge_block,
3322
						       index, mtr))) {
3323

3324
			goto err_exit;
3325
		}
osku's avatar
osku committed
3326 3327 3328

		max_ins_size = page_get_max_insert_size(merge_page, n_recs);

3329
		ut_ad(page_validate(merge_page, index));
3330
		ut_ad(max_ins_size == max_ins_size_reorg);
osku's avatar
osku committed
3331

marko's avatar
marko committed
3332
		if (UNIV_UNLIKELY(data_size > max_ins_size)) {
osku's avatar
osku committed
3333

marko's avatar
marko committed
3334 3335
			/* Add fault tolerance, though this should
			never happen */
osku's avatar
osku committed
3336

3337
			goto err_exit;
marko's avatar
marko committed
3338
		}
osku's avatar
osku committed
3339 3340
	}

3341
	merge_page_zip = buf_block_get_page_zip(merge_block);
3342
#ifdef UNIV_ZIP_DEBUG
marko's avatar
marko committed
3343
	if (UNIV_LIKELY_NULL(merge_page_zip)) {
3344 3345 3346
		const page_zip_des_t*	page_zip
			= buf_block_get_page_zip(block);
		ut_a(page_zip);
marko's avatar
marko committed
3347
		ut_a(page_zip_validate(merge_page_zip, merge_page));
3348
		ut_a(page_zip_validate(page_zip, page));
marko's avatar
marko committed
3349
	}
3350
#endif /* UNIV_ZIP_DEBUG */
3351

osku's avatar
osku committed
3352 3353
	/* Move records to the merge page */
	if (is_left) {
3354
		rec_t*	orig_pred = page_copy_rec_list_start(
3355 3356
			merge_block, block, page_get_supremum_rec(page),
			index, mtr);
3357 3358

		if (UNIV_UNLIKELY(!orig_pred)) {
3359
			goto err_exit;
marko's avatar
marko committed
3360
		}
osku's avatar
osku committed
3361

3362
		btr_search_drop_page_hash_index(block);
3363 3364

		/* Remove the page from the level list */
3365
		btr_level_list_remove(space, zip_size, page, index, mtr);
3366

3367
		btr_node_ptr_delete(index, block, mtr);
3368
		lock_update_merge_left(merge_block, orig_pred, block);
3369 3370 3371 3372

		if (adjust) {
			nth_rec += page_rec_get_n_recs_before(orig_pred);
		}
osku's avatar
osku committed
3373
	} else {
3374
		rec_t*		orig_succ;
3375
#ifdef UNIV_BTR_DEBUG
3376
		byte		fil_page_prev[4];
3377
#endif /* UNIV_BTR_DEBUG */
3378

3379 3380 3381 3382 3383
		if (UNIV_LIKELY_NULL(merge_page_zip)) {
			/* The function page_zip_compress(), which will be
			invoked by page_copy_rec_list_end() below,
			requires that FIL_PAGE_PREV be FIL_NULL.
			Clear the field, but prepare to restore it. */
3384
#ifdef UNIV_BTR_DEBUG
3385
			memcpy(fil_page_prev, merge_page + FIL_PAGE_PREV, 4);
3386
#endif /* UNIV_BTR_DEBUG */
3387 3388 3389 3390 3391 3392
#if FIL_NULL != 0xffffffff
# error "FIL_NULL != 0xffffffff"
#endif
			memset(merge_page + FIL_PAGE_PREV, 0xff, 4);
		}

3393
		orig_succ = page_copy_rec_list_end(merge_block, block,
3394 3395
						   page_get_infimum_rec(page),
						   cursor->index, mtr);
3396 3397

		if (UNIV_UNLIKELY(!orig_succ)) {
3398
			ut_a(merge_page_zip);
3399
#ifdef UNIV_BTR_DEBUG
3400
			/* FIL_PAGE_PREV was restored from merge_page_zip. */
3401 3402 3403
			ut_a(!memcmp(fil_page_prev,
				     merge_page + FIL_PAGE_PREV, 4));
#endif /* UNIV_BTR_DEBUG */
3404
			goto err_exit;
marko's avatar
marko committed
3405
		}
osku's avatar
osku committed
3406

3407
		btr_search_drop_page_hash_index(block);
3408

3409 3410 3411 3412 3413 3414 3415 3416 3417 3418 3419 3420
#ifdef UNIV_BTR_DEBUG
		if (UNIV_LIKELY_NULL(merge_page_zip)) {
			/* Restore FIL_PAGE_PREV in order to avoid an assertion
			failure in btr_level_list_remove(), which will set
			the field again to FIL_NULL.  Even though this makes
			merge_page and merge_page_zip inconsistent for a
			split second, it is harmless, because the pages
			are X-latched. */
			memcpy(merge_page + FIL_PAGE_PREV, fil_page_prev, 4);
		}
#endif /* UNIV_BTR_DEBUG */

3421
		/* Remove the page from the level list */
3422
		btr_level_list_remove(space, zip_size, page, index, mtr);
3423

3424 3425 3426
		/* Replace the address of the old child node (= page) with the
		address of the merge page to the right */

3427
		btr_node_ptr_set_child_page_no(
3428 3429 3430 3431
			btr_cur_get_rec(&father_cursor),
			btr_cur_get_page_zip(&father_cursor),
			offsets, right_page_no, mtr);
		btr_node_ptr_delete(index, merge_block, mtr);
3432

3433
		lock_update_merge_right(merge_block, orig_succ, block);
osku's avatar
osku committed
3434 3435
	}

3436
	btr_blob_dbg_remove(page, index, "btr_compress");
3437

3438
	if (!dict_index_is_clust(index) && page_is_leaf(merge_page)) {
3439 3440 3441 3442 3443 3444 3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455 3456 3457 3458 3459 3460
		/* Update the free bits of the B-tree page in the
		insert buffer bitmap.  This has to be done in a
		separate mini-transaction that is committed before the
		main mini-transaction.  We cannot update the insert
		buffer bitmap in this mini-transaction, because
		btr_compress() can be invoked recursively without
		committing the mini-transaction in between.  Since
		insert buffer bitmap pages have a lower rank than
		B-tree pages, we must not access other pages in the
		same mini-transaction after accessing an insert buffer
		bitmap page. */

		/* The free bits in the insert buffer bitmap must
		never exceed the free space on a page.  It is safe to
		decrement or reset the bits in the bitmap in a
		mini-transaction that is committed before the
		mini-transaction that affects the free space. */

		/* It is unsafe to increment the bits in a separately
		committed mini-transaction, because in crash recovery,
		the free bits could momentarily be set too high. */

3461
		if (zip_size) {
3462 3463 3464 3465 3466 3467
			/* Because the free bits may be incremented
			and we cannot update the insert buffer bitmap
			in the same mini-transaction, the only safe
			thing we can do here is the pessimistic
			approach: reset the free bits. */
			ibuf_reset_free_bits(merge_block);
3468
		} else {
3469 3470 3471 3472
			/* On uncompressed pages, the free bits will
			never increase here.  Thus, it is safe to
			write the bits accurately in a separate
			mini-transaction. */
3473
			ibuf_update_free_bits_if_full(merge_block,
3474 3475 3476
						      UNIV_PAGE_SIZE,
						      ULINT_UNDEFINED);
		}
3477
	}
3478

3479
	ut_ad(page_validate(merge_page, index));
3480 3481 3482
#ifdef UNIV_ZIP_DEBUG
	ut_a(!merge_page_zip || page_zip_validate(merge_page_zip, merge_page));
#endif /* UNIV_ZIP_DEBUG */
osku's avatar
osku committed
3483 3484

	/* Free the file page */
3485
	btr_page_free(index, block, mtr);
osku's avatar
osku committed
3486

3487
	ut_ad(btr_check_node_ptr(index, merge_block, mtr));
3488 3489
func_exit:
	mem_heap_free(heap);
3490

3491 3492 3493 3494 3495 3496 3497
	if (adjust) {
		btr_cur_position(
			index,
			page_rec_get_nth(merge_block->frame, nth_rec),
			merge_block, cursor);
	}

marko's avatar
marko committed
3498
	return(TRUE);
3499
}
osku's avatar
osku committed
3500

3501
/*************************************************************//**
3502 3503 3504 3505
Discards a page that is the only page on its level.  This will empty
the whole B-tree, leaving just an empty root page.  This function
should never be reached, because btr_compress(), which is invoked in
delete operations, calls btr_lift_page_up() to flatten the B-tree. */
osku's avatar
osku committed
3506 3507 3508 3509
static
void
btr_discard_only_page_on_level(
/*===========================*/
3510 3511 3512
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: page which is the only on its level */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
3513
{
3514 3515 3516 3517 3518
	ulint		page_level = 0;
	trx_id_t	max_trx_id;

	/* Save the PAGE_MAX_TRX_ID from the leaf page. */
	max_trx_id = page_get_max_trx_id(buf_block_get_frame(block));
3519

3520 3521 3522 3523
	while (buf_block_get_page_no(block) != dict_index_get_page(index)) {
		btr_cur_t	cursor;
		buf_block_t*	father;
		const page_t*	page	= buf_block_get_frame(block);
osku's avatar
osku committed
3524

3525 3526 3527 3528
		ut_a(page_get_n_recs(page) == 1);
		ut_a(page_level == btr_page_get_level(page, mtr));
		ut_a(btr_page_get_prev(page, mtr) == FIL_NULL);
		ut_a(btr_page_get_next(page, mtr) == FIL_NULL);
osku's avatar
osku committed
3529

3530 3531
		ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
		btr_search_drop_page_hash_index(block);
osku's avatar
osku committed
3532

3533 3534
		btr_page_get_father(index, block, mtr, &cursor);
		father = btr_cur_get_block(&cursor);
osku's avatar
osku committed
3535

3536
		lock_update_discard(father, PAGE_HEAP_NO_SUPREMUM, block);
osku's avatar
osku committed
3537

3538 3539
		/* Free the file page */
		btr_page_free(index, block, mtr);
osku's avatar
osku committed
3540

3541 3542 3543 3544 3545 3546
		block = father;
		page_level++;
	}

	/* block is the root page, which must be empty, except
	for the node pointer to the (now discarded) block(s). */
osku's avatar
osku committed
3547

3548
#ifdef UNIV_BTR_DEBUG
3549 3550 3551 3552 3553 3554 3555 3556
	if (!dict_index_is_ibuf(index)) {
		const page_t*	root	= buf_block_get_frame(block);
		const ulint	space	= dict_index_get_space(index);
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
					    + root, space));
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
					    + root, space));
	}
3557
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
3558

3559
	btr_page_empty(block, buf_block_get_page_zip(block), index, 0, mtr);
osku's avatar
osku committed
3560

3561
	if (!dict_index_is_clust(index)) {
3562
		/* We play it safe and reset the free bits for the root */
3563
		ibuf_reset_free_bits(block);
3564 3565

		if (page_is_leaf(buf_block_get_frame(block))) {
3566
			ut_a(max_trx_id);
3567 3568 3569 3570
			page_set_max_trx_id(block,
					    buf_block_get_page_zip(block),
					    max_trx_id, mtr);
		}
osku's avatar
osku committed
3571
	}
3572
}
osku's avatar
osku committed
3573

3574
/*************************************************************//**
osku's avatar
osku committed
3575 3576 3577
Discards a page from a B-tree. This is used to remove the last record from
a B-tree page: the whole page must be removed at the same time. This cannot
be used for the root page, which is allowed to be empty. */
3578
UNIV_INTERN
osku's avatar
osku committed
3579 3580 3581
void
btr_discard_page(
/*=============*/
3582
	btr_cur_t*	cursor,	/*!< in: cursor on the page to discard: not on
osku's avatar
osku committed
3583
				the root page */
3584
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
3585
{
3586
	dict_index_t*	index;
osku's avatar
osku committed
3587
	ulint		space;
3588
	ulint		zip_size;
osku's avatar
osku committed
3589 3590
	ulint		left_page_no;
	ulint		right_page_no;
3591
	buf_block_t*	merge_block;
osku's avatar
osku committed
3592
	page_t*		merge_page;
3593
	buf_block_t*	block;
osku's avatar
osku committed
3594 3595
	page_t*		page;
	rec_t*		node_ptr;
3596

3597
	block = btr_cur_get_block(cursor);
3598
	index = btr_cur_get_index(cursor);
osku's avatar
osku committed
3599

3600
	ut_ad(dict_index_get_page(index) != buf_block_get_page_no(block));
3601
	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
3602
				MTR_MEMO_X_LOCK));
3603
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3604
	space = dict_index_get_space(index);
3605
	zip_size = dict_table_zip_size(index->table);
3606

osku's avatar
osku committed
3607 3608
	/* Decide the page which will inherit the locks */

3609 3610
	left_page_no = btr_page_get_prev(buf_block_get_frame(block), mtr);
	right_page_no = btr_page_get_next(buf_block_get_frame(block), mtr);
osku's avatar
osku committed
3611 3612

	if (left_page_no != FIL_NULL) {
3613
		merge_block = btr_block_get(space, zip_size, left_page_no,
3614
					    RW_X_LATCH, index, mtr);
3615
		merge_page = buf_block_get_frame(merge_block);
3616
#ifdef UNIV_BTR_DEBUG
3617 3618
		ut_a(btr_page_get_next(merge_page, mtr)
		     == buf_block_get_page_no(block));
3619
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
3620
	} else if (right_page_no != FIL_NULL) {
3621
		merge_block = btr_block_get(space, zip_size, right_page_no,
3622
					    RW_X_LATCH, index, mtr);
3623
		merge_page = buf_block_get_frame(merge_block);
3624
#ifdef UNIV_BTR_DEBUG
3625 3626
		ut_a(btr_page_get_prev(merge_page, mtr)
		     == buf_block_get_page_no(block));
3627
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
3628
	} else {
3629
		btr_discard_only_page_on_level(index, block, mtr);
osku's avatar
osku committed
3630 3631 3632 3633

		return;
	}

3634
	page = buf_block_get_frame(block);
osku's avatar
osku committed
3635
	ut_a(page_is_comp(merge_page) == page_is_comp(page));
3636
	btr_search_drop_page_hash_index(block);
3637

marko's avatar
marko committed
3638
	if (left_page_no == FIL_NULL && !page_is_leaf(page)) {
osku's avatar
osku committed
3639 3640 3641 3642 3643 3644 3645

		/* We have to mark the leftmost node pointer on the right
		side page as the predefined minimum record */
		node_ptr = page_rec_get_next(page_get_infimum_rec(merge_page));

		ut_ad(page_rec_is_user_rec(node_ptr));

3646 3647 3648 3649 3650
		/* This will make page_zip_validate() fail on merge_page
		until btr_level_list_remove() completes.  This is harmless,
		because everything will take place within a single
		mini-transaction and because writing to the redo log
		is an atomic operation (performed by mtr_commit()). */
3651
		btr_set_min_rec_mark(node_ptr, mtr);
3652 3653
	}

3654
	btr_node_ptr_delete(index, block, mtr);
osku's avatar
osku committed
3655 3656

	/* Remove the page from the level list */
3657
	btr_level_list_remove(space, zip_size, page, index, mtr);
3658
#ifdef UNIV_ZIP_DEBUG
3659
	{
3660
		page_zip_des_t*	merge_page_zip
3661
			= buf_block_get_page_zip(merge_block);
3662
		ut_a(!merge_page_zip
3663
		     || page_zip_validate(merge_page_zip, merge_page));
3664
	}
3665
#endif /* UNIV_ZIP_DEBUG */
osku's avatar
osku committed
3666

marko's avatar
marko committed
3667
	if (left_page_no != FIL_NULL) {
3668 3669
		lock_update_discard(merge_block, PAGE_HEAP_NO_SUPREMUM,
				    block);
osku's avatar
osku committed
3670
	} else {
3671 3672 3673
		lock_update_discard(merge_block,
				    lock_get_min_heap_no(merge_block),
				    block);
osku's avatar
osku committed
3674 3675
	}

3676 3677
	btr_blob_dbg_remove(page, index, "btr_discard_page");

osku's avatar
osku committed
3678
	/* Free the file page */
3679
	btr_page_free(index, block, mtr);
osku's avatar
osku committed
3680

3681
	ut_ad(btr_check_node_ptr(index, merge_block, mtr));
3682
}
osku's avatar
osku committed
3683 3684

#ifdef UNIV_BTR_PRINT
3685
/*************************************************************//**
osku's avatar
osku committed
3686
Prints size info of a B-tree. */
3687
UNIV_INTERN
osku's avatar
osku committed
3688 3689 3690
void
btr_print_size(
/*===========*/
3691
	dict_index_t*	index)	/*!< in: index tree */
osku's avatar
osku committed
3692 3693 3694 3695 3696
{
	page_t*		root;
	fseg_header_t*	seg;
	mtr_t		mtr;

3697
	if (dict_index_is_ibuf(index)) {
3698 3699
		fputs("Sorry, cannot print info of an ibuf tree:"
		      " use ibuf functions\n", stderr);
osku's avatar
osku committed
3700 3701 3702 3703 3704

		return;
	}

	mtr_start(&mtr);
3705

3706
	root = btr_root_get(index, &mtr);
osku's avatar
osku committed
3707 3708 3709 3710 3711 3712

	seg = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;

	fputs("INFO OF THE NON-LEAF PAGE SEGMENT\n", stderr);
	fseg_print(seg, &mtr);

3713
	if (!(index->type & DICT_UNIVERSAL)) {
osku's avatar
osku committed
3714 3715 3716 3717 3718 3719 3720

		seg = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;

		fputs("INFO OF THE LEAF PAGE SEGMENT\n", stderr);
		fseg_print(seg, &mtr);
	}

3721
	mtr_commit(&mtr);
osku's avatar
osku committed
3722 3723
}

3724
/************************************************************//**
osku's avatar
osku committed
3725 3726 3727 3728 3729
Prints recursively index tree pages. */
static
void
btr_print_recursive(
/*================*/
3730 3731 3732
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: index page */
	ulint		width,	/*!< in: print this many entries from start
osku's avatar
osku committed
3733
				and end */
3734 3735 3736
	mem_heap_t**	heap,	/*!< in/out: heap for rec_get_offsets() */
	ulint**		offsets,/*!< in/out: buffer for rec_get_offsets() */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
3737
{
3738
	const page_t*	page	= buf_block_get_frame(block);
osku's avatar
osku committed
3739 3740 3741 3742 3743
	page_cur_t	cursor;
	ulint		n_recs;
	ulint		i	= 0;
	mtr_t		mtr2;

3744
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
osku's avatar
osku committed
3745
	fprintf(stderr, "NODE ON LEVEL %lu page number %lu\n",
3746
		(ulong) btr_page_get_level(page, mtr),
marko's avatar
marko committed
3747
		(ulong) buf_block_get_page_no(block));
3748

3749
	page_print(block, index, width, width);
3750

osku's avatar
osku committed
3751
	n_recs = page_get_n_recs(page);
3752

3753
	page_cur_set_before_first(block, &cursor);
osku's avatar
osku committed
3754 3755 3756 3757
	page_cur_move_to_next(&cursor);

	while (!page_cur_is_after_last(&cursor)) {

marko's avatar
marko committed
3758
		if (page_is_leaf(page)) {
osku's avatar
osku committed
3759 3760 3761 3762 3763

			/* If this is the leaf level, do nothing */

		} else if ((i <= width) || (i >= n_recs - width)) {

3764 3765
			const rec_t*	node_ptr;

osku's avatar
osku committed
3766 3767 3768 3769 3770
			mtr_start(&mtr2);

			node_ptr = page_cur_get_rec(&cursor);

			*offsets = rec_get_offsets(node_ptr, index, *offsets,
3771
						   ULINT_UNDEFINED, heap);
3772 3773
			btr_print_recursive(index,
					    btr_node_ptr_get_child(node_ptr,
3774
								   index,
3775 3776 3777
								   *offsets,
								   &mtr2),
					    width, heap, offsets, &mtr2);
osku's avatar
osku committed
3778 3779 3780 3781 3782 3783 3784 3785
			mtr_commit(&mtr2);
		}

		page_cur_move_to_next(&cursor);
		i++;
	}
}

3786
/**************************************************************//**
osku's avatar
osku committed
3787
Prints directories and other info of all nodes in the tree. */
3788
UNIV_INTERN
osku's avatar
osku committed
3789
void
3790 3791
btr_print_index(
/*============*/
3792 3793
	dict_index_t*	index,	/*!< in: index */
	ulint		width)	/*!< in: print this many entries from start
osku's avatar
osku committed
3794 3795 3796
				and end */
{
	mtr_t		mtr;
3797
	buf_block_t*	root;
osku's avatar
osku committed
3798 3799 3800
	mem_heap_t*	heap	= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets	= offsets_;
3801
	rec_offs_init(offsets_);
osku's avatar
osku committed
3802 3803

	fputs("--------------------------\n"
3804
	      "INDEX TREE PRINT\n", stderr);
osku's avatar
osku committed
3805 3806 3807

	mtr_start(&mtr);

3808
	root = btr_root_block_get(index, &mtr);
osku's avatar
osku committed
3809

3810
	btr_print_recursive(index, root, width, &heap, &offsets, &mtr);
osku's avatar
osku committed
3811 3812 3813 3814 3815 3816
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	mtr_commit(&mtr);

3817
	btr_validate_index(index, NULL);
osku's avatar
osku committed
3818 3819 3820
}
#endif /* UNIV_BTR_PRINT */

3821
#ifdef UNIV_DEBUG
3822
/************************************************************//**
3823 3824
Checks that the node pointer to a page is appropriate.
@return	TRUE */
3825
UNIV_INTERN
osku's avatar
osku committed
3826 3827 3828
ibool
btr_check_node_ptr(
/*===============*/
3829 3830 3831
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: index page */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
3832 3833
{
	mem_heap_t*	heap;
3834 3835 3836
	dtuple_t*	tuple;
	ulint*		offsets;
	btr_cur_t	cursor;
3837
	page_t*		page = buf_block_get_frame(block);
osku's avatar
osku committed
3838

3839
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3840
	if (dict_index_get_page(index) == buf_block_get_page_no(block)) {
osku's avatar
osku committed
3841 3842 3843 3844

		return(TRUE);
	}

3845 3846 3847
	heap = mem_heap_create(256);
	offsets = btr_page_get_father_block(NULL, heap, index, block, mtr,
					    &cursor);
3848

marko's avatar
marko committed
3849
	if (page_is_leaf(page)) {
osku's avatar
osku committed
3850

3851
		goto func_exit;
osku's avatar
osku committed
3852
	}
3853

3854
	tuple = dict_index_build_node_ptr(
3855 3856
		index, page_rec_get_next(page_get_infimum_rec(page)), 0, heap,
		btr_page_get_level(page, mtr));
3857

3858 3859
	ut_a(!cmp_dtuple_rec(tuple, btr_cur_get_rec(&cursor), offsets));
func_exit:
osku's avatar
osku committed
3860 3861 3862 3863
	mem_heap_free(heap);

	return(TRUE);
}
3864
#endif /* UNIV_DEBUG */
osku's avatar
osku committed
3865

3866
/************************************************************//**
osku's avatar
osku committed
3867 3868 3869 3870 3871
Display identification information for a record. */
static
void
btr_index_rec_validate_report(
/*==========================*/
3872 3873 3874
	const page_t*		page,	/*!< in: index page */
	const rec_t*		rec,	/*!< in: index record */
	const dict_index_t*	index)	/*!< in: index */
osku's avatar
osku committed
3875 3876 3877 3878
{
	fputs("InnoDB: Record in ", stderr);
	dict_index_name_print(stderr, NULL, index);
	fprintf(stderr, ", page %lu, at offset %lu\n",
3879
		page_get_page_no(page), (ulint) page_offset(rec));
osku's avatar
osku committed
3880 3881
}

3882
/************************************************************//**
osku's avatar
osku committed
3883
Checks the size and number of fields in a record based on the definition of
3884 3885
the index.
@return	TRUE if ok */
3886
UNIV_INTERN
osku's avatar
osku committed
3887 3888
ibool
btr_index_rec_validate(
3889
/*===================*/
3890 3891 3892
	const rec_t*		rec,		/*!< in: index record */
	const dict_index_t*	index,		/*!< in: index */
	ibool			dump_on_error)	/*!< in: TRUE if the function
3893 3894
						should print hex dump of record
						and page on error */
osku's avatar
osku committed
3895 3896 3897 3898
{
	ulint		len;
	ulint		n;
	ulint		i;
3899
	const page_t*	page;
osku's avatar
osku committed
3900 3901 3902
	mem_heap_t*	heap	= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets	= offsets_;
3903
	rec_offs_init(offsets_);
osku's avatar
osku committed
3904

3905
	page = page_align(rec);
osku's avatar
osku committed
3906 3907

	if (UNIV_UNLIKELY(index->type & DICT_UNIVERSAL)) {
3908 3909 3910
		/* The insert buffer index tree can contain records from any
		other index: we cannot check the number of fields or
		their length */
osku's avatar
osku committed
3911

3912
		return(TRUE);
osku's avatar
osku committed
3913 3914
	}

3915
	if (UNIV_UNLIKELY((ibool)!!page_is_comp(page)
3916
			  != dict_table_is_comp(index->table))) {
osku's avatar
osku committed
3917 3918 3919
		btr_index_rec_validate_report(page, rec, index);
		fprintf(stderr, "InnoDB: compact flag=%lu, should be %lu\n",
			(ulong) !!page_is_comp(page),
3920 3921
			(ulong) dict_table_is_comp(index->table));

osku's avatar
osku committed
3922 3923 3924 3925 3926 3927
		return(FALSE);
	}

	n = dict_index_get_n_fields(index);

	if (!page_is_comp(page)
3928
	    && UNIV_UNLIKELY(rec_get_n_fields_old(rec) != n)) {
osku's avatar
osku committed
3929 3930 3931 3932 3933
		btr_index_rec_validate_report(page, rec, index);
		fprintf(stderr, "InnoDB: has %lu fields, should have %lu\n",
			(ulong) rec_get_n_fields_old(rec), (ulong) n);

		if (dump_on_error) {
3934
			buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
osku's avatar
osku committed
3935 3936 3937 3938 3939 3940 3941 3942 3943 3944 3945

			fputs("InnoDB: corrupt record ", stderr);
			rec_print_old(stderr, rec);
			putc('\n', stderr);
		}
		return(FALSE);
	}

	offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);

	for (i = 0; i < n; i++) {
3946
		ulint	fixed_size = dict_col_get_fixed_size(
3947
			dict_index_get_nth_col(index, i), page_is_comp(page));
osku's avatar
osku committed
3948

3949
		rec_get_nth_field_offs(offsets, i, &len);
osku's avatar
osku committed
3950

3951 3952 3953 3954 3955
		/* Note that if fixed_size != 0, it equals the
		length of a fixed-size column in the clustered index.
		A prefix index of the column is of fixed, but different
		length.  When fixed_size == 0, prefix_len is the maximum
		length of the prefix index column. */
osku's avatar
osku committed
3956 3957

		if ((dict_index_get_nth_field(index, i)->prefix_len == 0
3958 3959 3960 3961 3962
		     && len != UNIV_SQL_NULL && fixed_size
		     && len != fixed_size)
		    || (dict_index_get_nth_field(index, i)->prefix_len > 0
			&& len != UNIV_SQL_NULL
			&& len
3963
			> dict_index_get_nth_field(index, i)->prefix_len)) {
osku's avatar
osku committed
3964 3965 3966

			btr_index_rec_validate_report(page, rec, index);
			fprintf(stderr,
3967 3968
				"InnoDB: field %lu len is %lu,"
				" should be %lu\n",
3969
				(ulong) i, (ulong) len, (ulong) fixed_size);
osku's avatar
osku committed
3970 3971

			if (dump_on_error) {
3972 3973
				buf_page_print(page, 0,
					       BUF_PAGE_PRINT_NO_CRASH);
osku's avatar
osku committed
3974 3975 3976 3977 3978 3979 3980 3981 3982 3983 3984 3985 3986 3987 3988

				fputs("InnoDB: corrupt record ", stderr);
				rec_print_new(stderr, rec, offsets);
				putc('\n', stderr);
			}
			if (UNIV_LIKELY_NULL(heap)) {
				mem_heap_free(heap);
			}
			return(FALSE);
		}
	}

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
3989
	return(TRUE);
osku's avatar
osku committed
3990 3991
}

3992
/************************************************************//**
osku's avatar
osku committed
3993
Checks the size and number of fields in records based on the definition of
3994 3995
the index.
@return	TRUE if ok */
osku's avatar
osku committed
3996 3997 3998 3999
static
ibool
btr_index_page_validate(
/*====================*/
4000 4001
	buf_block_t*	block,	/*!< in: index page */
	dict_index_t*	index)	/*!< in: index */
osku's avatar
osku committed
4002
{
4003
	page_cur_t	cur;
osku's avatar
osku committed
4004
	ibool		ret	= TRUE;
4005

4006
	page_cur_set_before_first(block, &cur);
osku's avatar
osku committed
4007 4008 4009 4010 4011 4012 4013 4014 4015 4016 4017 4018 4019 4020 4021 4022
	page_cur_move_to_next(&cur);

	for (;;) {
		if (page_cur_is_after_last(&cur)) {

			break;
		}

		if (!btr_index_rec_validate(cur.rec, index, TRUE)) {

			return(FALSE);
		}

		page_cur_move_to_next(&cur);
	}

4023
	return(ret);
osku's avatar
osku committed
4024 4025
}

4026
/************************************************************//**
osku's avatar
osku committed
4027 4028 4029 4030
Report an error on one page of an index tree. */
static
void
btr_validate_report1(
4031
/*=================*/
4032 4033 4034
	dict_index_t*		index,	/*!< in: index */
	ulint			level,	/*!< in: B-tree level */
	const buf_block_t*	block)	/*!< in: index page */
osku's avatar
osku committed
4035 4036
{
	fprintf(stderr, "InnoDB: Error in page %lu of ",
4037
		buf_block_get_page_no(block));
osku's avatar
osku committed
4038 4039 4040 4041 4042 4043 4044
	dict_index_name_print(stderr, NULL, index);
	if (level) {
		fprintf(stderr, ", index tree level %lu", level);
	}
	putc('\n', stderr);
}

4045
/************************************************************//**
osku's avatar
osku committed
4046 4047 4048 4049
Report an error on two pages of an index tree. */
static
void
btr_validate_report2(
4050
/*=================*/
4051 4052 4053 4054
	const dict_index_t*	index,	/*!< in: index */
	ulint			level,	/*!< in: B-tree level */
	const buf_block_t*	block1,	/*!< in: first index page */
	const buf_block_t*	block2)	/*!< in: second index page */
osku's avatar
osku committed
4055 4056
{
	fprintf(stderr, "InnoDB: Error in pages %lu and %lu of ",
4057 4058
		buf_block_get_page_no(block1),
		buf_block_get_page_no(block2));
osku's avatar
osku committed
4059 4060 4061 4062 4063 4064 4065
	dict_index_name_print(stderr, NULL, index);
	if (level) {
		fprintf(stderr, ", index tree level %lu", level);
	}
	putc('\n', stderr);
}

4066
/************************************************************//**
4067 4068
Validates index tree level.
@return	TRUE if ok */
osku's avatar
osku committed
4069 4070 4071 4072
static
ibool
btr_validate_level(
/*===============*/
4073 4074 4075
	dict_index_t*	index,	/*!< in: index tree */
	trx_t*		trx,	/*!< in: transaction or NULL */
	ulint		level)	/*!< in: level number */
osku's avatar
osku committed
4076 4077
{
	ulint		space;
4078
	ulint		zip_size;
4079
	buf_block_t*	block;
osku's avatar
osku committed
4080
	page_t*		page;
4081
	buf_block_t*	right_block = 0; /* remove warning */
osku's avatar
osku committed
4082 4083
	page_t*		right_page = 0; /* remove warning */
	page_t*		father_page;
4084 4085
	btr_cur_t	node_cur;
	btr_cur_t	right_node_cur;
osku's avatar
osku committed
4086 4087 4088 4089 4090 4091 4092 4093 4094 4095
	rec_t*		rec;
	ulint		right_page_no;
	ulint		left_page_no;
	page_cur_t	cursor;
	dtuple_t*	node_ptr_tuple;
	ibool		ret	= TRUE;
	mtr_t		mtr;
	mem_heap_t*	heap	= mem_heap_create(256);
	ulint*		offsets	= NULL;
	ulint*		offsets2= NULL;
4096
#ifdef UNIV_ZIP_DEBUG
marko's avatar
marko committed
4097
	page_zip_des_t*	page_zip;
4098
#endif /* UNIV_ZIP_DEBUG */
osku's avatar
osku committed
4099 4100 4101

	mtr_start(&mtr);

4102
	mtr_x_lock(dict_index_get_lock(index), &mtr);
4103

4104 4105
	block = btr_root_block_get(index, &mtr);
	page = buf_block_get_frame(block);
osku's avatar
osku committed
4106

4107 4108
	space = dict_index_get_space(index);
	zip_size = dict_table_zip_size(index->table);
osku's avatar
osku committed
4109 4110

	while (level != btr_page_get_level(page, &mtr)) {
4111
		const rec_t*	node_ptr;
4112

4113 4114
		ut_a(space == buf_block_get_space(block));
		ut_a(space == page_get_space_id(page));
4115
#ifdef UNIV_ZIP_DEBUG
4116
		page_zip = buf_block_get_page_zip(block);
4117
		ut_a(!page_zip || page_zip_validate(page_zip, page));
4118
#endif /* UNIV_ZIP_DEBUG */
marko's avatar
marko committed
4119
		ut_a(!page_is_leaf(page));
osku's avatar
osku committed
4120

4121
		page_cur_set_before_first(block, &cursor);
osku's avatar
osku committed
4122 4123 4124 4125
		page_cur_move_to_next(&cursor);

		node_ptr = page_cur_get_rec(&cursor);
		offsets = rec_get_offsets(node_ptr, index, offsets,
4126
					  ULINT_UNDEFINED, &heap);
4127
		block = btr_node_ptr_get_child(node_ptr, index, offsets, &mtr);
4128
		page = buf_block_get_frame(block);
osku's avatar
osku committed
4129 4130 4131 4132 4133 4134 4135 4136 4137 4138 4139 4140
	}

	/* Now we are on the desired level. Loop through the pages on that
	level. */
loop:
	if (trx_is_interrupted(trx)) {
		mtr_commit(&mtr);
		mem_heap_free(heap);
		return(ret);
	}
	mem_heap_empty(heap);
	offsets = offsets2 = NULL;
4141
	mtr_x_lock(dict_index_get_lock(index), &mtr);
osku's avatar
osku committed
4142

4143
#ifdef UNIV_ZIP_DEBUG
4144
	page_zip = buf_block_get_page_zip(block);
4145
	ut_a(!page_zip || page_zip_validate(page_zip, page));
4146
#endif /* UNIV_ZIP_DEBUG */
marko's avatar
marko committed
4147

osku's avatar
osku committed
4148 4149 4150
	/* Check ordering etc. of records */

	if (!page_validate(page, index)) {
4151
		btr_validate_report1(index, level, block);
osku's avatar
osku committed
4152 4153 4154 4155 4156 4157

		ret = FALSE;
	} else if (level == 0) {
		/* We are on level 0. Check that the records have the right
		number of fields, and field lengths are right. */

4158
		if (!btr_index_page_validate(block, index)) {
osku's avatar
osku committed
4159 4160 4161 4162

			ret = FALSE;
		}
	}
4163

osku's avatar
osku committed
4164 4165 4166 4167 4168
	ut_a(btr_page_get_level(page, &mtr) == level);

	right_page_no = btr_page_get_next(page, &mtr);
	left_page_no = btr_page_get_prev(page, &mtr);

4169
	ut_a(page_get_n_recs(page) > 0 || (level == 0
4170
					   && page_get_page_no(page)
4171
					   == dict_index_get_page(index)));
osku's avatar
osku committed
4172 4173

	if (right_page_no != FIL_NULL) {
4174
		const rec_t*	right_rec;
4175
		right_block = btr_block_get(space, zip_size, right_page_no,
4176
					    RW_X_LATCH, index, &mtr);
4177
		right_page = buf_block_get_frame(right_block);
4178
		if (UNIV_UNLIKELY(btr_page_get_prev(right_page, &mtr)
4179
				  != page_get_page_no(page))) {
4180
			btr_validate_report2(index, level, block, right_block);
4181
			fputs("InnoDB: broken FIL_PAGE_NEXT"
4182
			      " or FIL_PAGE_PREV links\n", stderr);
4183 4184
			buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
			buf_page_print(right_page, 0, BUF_PAGE_PRINT_NO_CRASH);
4185 4186 4187 4188 4189

			ret = FALSE;
		}

		if (UNIV_UNLIKELY(page_is_comp(right_page)
4190
				  != page_is_comp(page))) {
4191
			btr_validate_report2(index, level, block, right_block);
4192
			fputs("InnoDB: 'compact' flag mismatch\n", stderr);
4193 4194
			buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
			buf_page_print(right_page, 0, BUF_PAGE_PRINT_NO_CRASH);
4195 4196 4197 4198 4199 4200

			ret = FALSE;

			goto node_ptr_fails;
		}

osku's avatar
osku committed
4201
		rec = page_rec_get_prev(page_get_supremum_rec(page));
4202 4203
		right_rec = page_rec_get_next(page_get_infimum_rec(
						      right_page));
osku's avatar
osku committed
4204
		offsets = rec_get_offsets(rec, index,
4205
					  offsets, ULINT_UNDEFINED, &heap);
osku's avatar
osku committed
4206
		offsets2 = rec_get_offsets(right_rec, index,
4207
					   offsets2, ULINT_UNDEFINED, &heap);
4208
		if (UNIV_UNLIKELY(cmp_rec_rec(rec, right_rec,
4209 4210
					      offsets, offsets2,
					      index) >= 0)) {
osku's avatar
osku committed
4211

4212
			btr_validate_report2(index, level, block, right_block);
osku's avatar
osku committed
4213 4214

			fputs("InnoDB: records in wrong order"
4215
			      " on adjacent pages\n", stderr);
osku's avatar
osku committed
4216

4217 4218
			buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
			buf_page_print(right_page, 0, BUF_PAGE_PRINT_NO_CRASH);
osku's avatar
osku committed
4219 4220 4221 4222 4223 4224

			fputs("InnoDB: record ", stderr);
			rec = page_rec_get_prev(page_get_supremum_rec(page));
			rec_print(stderr, rec, index);
			putc('\n', stderr);
			fputs("InnoDB: record ", stderr);
4225 4226
			rec = page_rec_get_next(
				page_get_infimum_rec(right_page));
osku's avatar
osku committed
4227 4228 4229
			rec_print(stderr, rec, index);
			putc('\n', stderr);

4230 4231
			ret = FALSE;
		}
osku's avatar
osku committed
4232
	}
4233

osku's avatar
osku committed
4234
	if (level > 0 && left_page_no == FIL_NULL) {
4235 4236 4237
		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
			     page_rec_get_next(page_get_infimum_rec(page)),
			     page_is_comp(page)));
osku's avatar
osku committed
4238 4239
	}

4240
	if (buf_block_get_page_no(block) != dict_index_get_page(index)) {
osku's avatar
osku committed
4241 4242

		/* Check father node pointers */
4243

4244
		rec_t*	node_ptr;
4245 4246 4247 4248 4249 4250 4251 4252 4253 4254 4255

		offsets = btr_page_get_father_block(offsets, heap, index,
						    block, &mtr, &node_cur);
		father_page = btr_cur_get_page(&node_cur);
		node_ptr = btr_cur_get_rec(&node_cur);

		btr_cur_position(
			index, page_rec_get_prev(page_get_supremum_rec(page)),
			block, &node_cur);
		offsets = btr_page_get_father_node_ptr(offsets, heap,
						       &node_cur, &mtr);
4256

4257 4258 4259 4260
		if (UNIV_UNLIKELY(node_ptr != btr_cur_get_rec(&node_cur))
		    || UNIV_UNLIKELY(btr_node_ptr_get_child_page_no(node_ptr,
								    offsets)
				     != buf_block_get_page_no(block))) {
4261

4262
			btr_validate_report1(index, level, block);
osku's avatar
osku committed
4263 4264

			fputs("InnoDB: node pointer to the page is wrong\n",
4265
			      stderr);
osku's avatar
osku committed
4266

4267 4268
			buf_page_print(father_page, 0, BUF_PAGE_PRINT_NO_CRASH);
			buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
osku's avatar
osku committed
4269 4270

			fputs("InnoDB: node ptr ", stderr);
4271
			rec_print(stderr, node_ptr, index);
osku's avatar
osku committed
4272

4273
			rec = btr_cur_get_rec(&node_cur);
osku's avatar
osku committed
4274 4275
			fprintf(stderr, "\n"
				"InnoDB: node ptr child page n:o %lu\n",
4276
				(ulong) btr_node_ptr_get_child_page_no(
4277
					rec, offsets));
osku's avatar
osku committed
4278 4279

			fputs("InnoDB: record on page ", stderr);
4280
			rec_print_new(stderr, rec, offsets);
osku's avatar
osku committed
4281
			putc('\n', stderr);
4282
			ret = FALSE;
osku's avatar
osku committed
4283

4284
			goto node_ptr_fails;
osku's avatar
osku committed
4285 4286
		}

marko's avatar
marko committed
4287
		if (!page_is_leaf(page)) {
4288 4289 4290 4291
			node_ptr_tuple = dict_index_build_node_ptr(
				index,
				page_rec_get_next(page_get_infimum_rec(page)),
				0, heap, btr_page_get_level(page, &mtr));
osku's avatar
osku committed
4292 4293

			if (cmp_dtuple_rec(node_ptr_tuple, node_ptr,
4294
					   offsets)) {
4295
				const rec_t* first_rec = page_rec_get_next(
4296
					page_get_infimum_rec(page));
osku's avatar
osku committed
4297

4298
				btr_validate_report1(index, level, block);
osku's avatar
osku committed
4299

4300 4301 4302 4303
				buf_page_print(father_page, 0,
					       BUF_PAGE_PRINT_NO_CRASH);
				buf_page_print(page, 0,
					       BUF_PAGE_PRINT_NO_CRASH);
osku's avatar
osku committed
4304 4305

				fputs("InnoDB: Error: node ptrs differ"
4306 4307
				      " on levels > 0\n"
				      "InnoDB: node ptr ", stderr);
osku's avatar
osku committed
4308 4309 4310 4311
				rec_print_new(stderr, node_ptr, offsets);
				fputs("InnoDB: first rec ", stderr);
				rec_print(stderr, first_rec, index);
				putc('\n', stderr);
4312
				ret = FALSE;
osku's avatar
osku committed
4313

4314
				goto node_ptr_fails;
osku's avatar
osku committed
4315 4316 4317 4318
			}
		}

		if (left_page_no == FIL_NULL) {
4319 4320
			ut_a(node_ptr == page_rec_get_next(
				     page_get_infimum_rec(father_page)));
osku's avatar
osku committed
4321 4322 4323 4324
			ut_a(btr_page_get_prev(father_page, &mtr) == FIL_NULL);
		}

		if (right_page_no == FIL_NULL) {
4325 4326
			ut_a(node_ptr == page_rec_get_prev(
				     page_get_supremum_rec(father_page)));
osku's avatar
osku committed
4327
			ut_a(btr_page_get_next(father_page, &mtr) == FIL_NULL);
4328
		} else {
4329 4330 4331
			const rec_t*	right_node_ptr
				= page_rec_get_next(node_ptr);

4332 4333 4334
			offsets = btr_page_get_father_block(
				offsets, heap, index, right_block,
				&mtr, &right_node_cur);
4335
			if (right_node_ptr
4336
			    != page_get_supremum_rec(father_page)) {
osku's avatar
osku committed
4337

4338
				if (btr_cur_get_rec(&right_node_cur)
4339
				    != right_node_ptr) {
osku's avatar
osku committed
4340
					ret = FALSE;
4341 4342 4343
					fputs("InnoDB: node pointer to"
					      " the right page is wrong\n",
					      stderr);
osku's avatar
osku committed
4344 4345

					btr_validate_report1(index, level,
4346
							     block);
osku's avatar
osku committed
4347

4348 4349 4350 4351 4352 4353 4354 4355 4356
					buf_page_print(
						father_page, 0,
						BUF_PAGE_PRINT_NO_CRASH);
					buf_page_print(
						page, 0,
						BUF_PAGE_PRINT_NO_CRASH);
					buf_page_print(
						right_page, 0,
						BUF_PAGE_PRINT_NO_CRASH);
osku's avatar
osku committed
4357 4358
				}
			} else {
4359 4360
				page_t*	right_father_page
					= btr_cur_get_page(&right_node_cur);
4361

4362 4363
				if (btr_cur_get_rec(&right_node_cur)
				    != page_rec_get_next(
4364 4365
					    page_get_infimum_rec(
						    right_father_page))) {
osku's avatar
osku committed
4366
					ret = FALSE;
4367 4368 4369
					fputs("InnoDB: node pointer 2 to"
					      " the right page is wrong\n",
					      stderr);
osku's avatar
osku committed
4370 4371

					btr_validate_report1(index, level,
4372
							     block);
osku's avatar
osku committed
4373

4374 4375 4376 4377 4378 4379 4380 4381 4382 4383 4384 4385
					buf_page_print(
						father_page, 0,
						BUF_PAGE_PRINT_NO_CRASH);
					buf_page_print(
						right_father_page, 0,
						BUF_PAGE_PRINT_NO_CRASH);
					buf_page_print(
						page, 0,
						BUF_PAGE_PRINT_NO_CRASH);
					buf_page_print(
						right_page, 0,
						BUF_PAGE_PRINT_NO_CRASH);
osku's avatar
osku committed
4386 4387
				}

4388
				if (page_get_page_no(right_father_page)
4389
				    != btr_page_get_next(father_page, &mtr)) {
osku's avatar
osku committed
4390 4391

					ret = FALSE;
4392 4393 4394
					fputs("InnoDB: node pointer 3 to"
					      " the right page is wrong\n",
					      stderr);
osku's avatar
osku committed
4395 4396

					btr_validate_report1(index, level,
4397
							     block);
osku's avatar
osku committed
4398

4399 4400 4401 4402 4403 4404 4405 4406 4407 4408 4409 4410
					buf_page_print(
						father_page, 0,
						BUF_PAGE_PRINT_NO_CRASH);
					buf_page_print(
						right_father_page, 0,
						BUF_PAGE_PRINT_NO_CRASH);
					buf_page_print(
						page, 0,
						BUF_PAGE_PRINT_NO_CRASH);
					buf_page_print(
						right_page, 0,
						BUF_PAGE_PRINT_NO_CRASH);
osku's avatar
osku committed
4411
				}
4412
			}
osku's avatar
osku committed
4413 4414 4415 4416
		}
	}

node_ptr_fails:
4417 4418 4419
	/* Commit the mini-transaction to release the latch on 'page'.
	Re-acquire the latch on right_page, which will become 'page'
	on the next loop.  The page has already been checked. */
osku's avatar
osku committed
4420 4421 4422 4423
	mtr_commit(&mtr);

	if (right_page_no != FIL_NULL) {
		mtr_start(&mtr);
4424

4425
		block = btr_block_get(space, zip_size, right_page_no,
4426
				      RW_X_LATCH, index, &mtr);
4427
		page = buf_block_get_frame(block);
osku's avatar
osku committed
4428 4429 4430 4431 4432 4433 4434 4435

		goto loop;
	}

	mem_heap_free(heap);
	return(ret);
}

4436
/**************************************************************//**
4437 4438
Checks the consistency of an index tree.
@return	TRUE if ok */
4439
UNIV_INTERN
osku's avatar
osku committed
4440
ibool
4441 4442
btr_validate_index(
/*===============*/
4443 4444
	dict_index_t*	index,	/*!< in: index */
	trx_t*		trx)	/*!< in: transaction or NULL */
osku's avatar
osku committed
4445 4446 4447 4448 4449 4450 4451
{
	mtr_t	mtr;
	page_t*	root;
	ulint	i;
	ulint	n;

	mtr_start(&mtr);
4452
	mtr_x_lock(dict_index_get_lock(index), &mtr);
osku's avatar
osku committed
4453

4454
	root = btr_root_get(index, &mtr);
osku's avatar
osku committed
4455 4456 4457
	n = btr_page_get_level(root, &mtr);

	for (i = 0; i <= n && !trx_is_interrupted(trx); i++) {
4458
		if (!btr_validate_level(index, trx, n - i)) {
osku's avatar
osku committed
4459 4460 4461 4462 4463 4464 4465 4466 4467 4468 4469

			mtr_commit(&mtr);

			return(FALSE);
		}
	}

	mtr_commit(&mtr);

	return(TRUE);
}
4470
#endif /* !UNIV_HOTBACKUP */