page0page.c 63.4 KB
Newer Older
osku's avatar
osku committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/******************************************************
Index page routines

(c) 1994-1996 Innobase Oy

Created 2/2/1994 Heikki Tuuri
*******************************************************/

#define THIS_MODULE
#include "page0page.h"
#ifdef UNIV_NONINL
#include "page0page.ic"
#endif
#undef THIS_MODULE

#include "page0cur.h"
marko's avatar
marko committed
17
#include "page0zip.h"
osku's avatar
osku committed
18 19 20 21 22 23 24 25 26
#include "lock0lock.h"
#include "fut0lst.h"
#include "btr0sea.h"
#include "buf0buf.h"
#include "srv0srv.h"
#include "btr0btr.h"

/*			THE INDEX PAGE
			==============
27

osku's avatar
osku committed
28 29 30 31 32 33 34 35 36
The index page consists of a page header which contains the page's
id and other information. On top of it are the the index records
in a heap linked into a one way linear list according to alphabetic order.

Just below page end is an array of pointers which we call page directory,
to about every sixth record in the list. The pointers are placed in
the directory in the alphabetical order of the records pointed to,
enabling us to make binary search using the array. Each slot n:o I
in the directory points to a record, where a 4-bit field contains a count
37
of those records which are in the linear list between pointer I and
osku's avatar
osku committed
38 39 40 41
the pointer I - 1 in the directory, including the record
pointed to by pointer I and not including the record pointed to by I - 1.
We say that the record pointed to by slot I, or that slot I, owns
these records. The count is always kept in the range 4 to 8, with
42 43
the exception that it is 1 for the first slot, and 1--8 for the second slot.

osku's avatar
osku committed
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
An essentially binary search can be performed in the list of index
records, like we could do if we had pointer to every record in the
page directory. The data structure is, however, more efficient when
we are doing inserts, because most inserts are just pushed on a heap.
Only every 8th insert requires block move in the directory pointer
table, which itself is quite small. A record is deleted from the page
by just taking it off the linear list and updating the number of owned
records-field of the record which owns it, and updating the page directory,
if necessary. A special case is the one when the record owns itself.
Because the overhead of inserts is so small, we may also increase the
page size from the projected default of 8 kB to 64 kB without too
much loss of efficiency in inserts. Bigger page becomes actual
when the disk transfer rate compared to seek and latency time rises.
On the present system, the page size is set so that the page transfer
time (3 ms) is 20 % of the disk random access time (15 ms).

When the page is split, merged, or becomes full but contains deleted
records, we have to reorganize the page.

Assuming a page size of 8 kB, a typical index page of a secondary
index contains 300 index entries, and the size of the page directory
is 50 x 4 bytes = 200 bytes. */

/*******************************************************************
Looks for the directory slot which owns the given record. */

ulint
page_dir_find_owner_slot(
/*=====================*/
			/* out: the directory slot number */
	rec_t*	rec)	/* in: the physical record */
{
	page_t*				page;
	register uint16			rec_offs_bytes;
	register page_dir_slot_t*	slot;
	register const page_dir_slot_t*	first_slot;
	register rec_t*			r = rec;

	ut_ad(page_rec_check(rec));

84
	page = page_align(rec);
osku's avatar
osku committed
85 86 87 88
	first_slot = page_dir_get_nth_slot(page, 0);
	slot = page_dir_get_nth_slot(page, page_dir_get_n_slots(page) - 1);

	if (page_is_comp(page)) {
marko's avatar
marko committed
89 90
		while (rec_get_n_owned_new(r) == 0) {
			r = rec_get_next_ptr(r, TRUE);
osku's avatar
osku committed
91 92 93 94
			ut_ad(r >= page + PAGE_NEW_SUPREMUM);
			ut_ad(r < page + (UNIV_PAGE_SIZE - PAGE_DIR));
		}
	} else {
marko's avatar
marko committed
95 96
		while (rec_get_n_owned_old(r) == 0) {
			r = rec_get_next_ptr(r, FALSE);
osku's avatar
osku committed
97 98 99 100 101 102 103 104 105 106 107
			ut_ad(r >= page + PAGE_OLD_SUPREMUM);
			ut_ad(r < page + (UNIV_PAGE_SIZE - PAGE_DIR));
		}
	}

	rec_offs_bytes = mach_encode_2(r - page);

	while (UNIV_LIKELY(*(uint16*) slot != rec_offs_bytes)) {

		if (UNIV_UNLIKELY(slot == first_slot)) {
			fprintf(stderr,
108 109 110 111
				"InnoDB: Probable data corruption on"
				" page %lu\n"
				"InnoDB: Original record ",
				(ulong) buf_frame_get_page_no(page));
osku's avatar
osku committed
112 113 114 115 116 117 118 119

			if (page_is_comp(page)) {
				fputs("(compact record)", stderr);
			} else {
				rec_print_old(stderr, rec);
			}

			fputs("\n"
120 121 122
			      "InnoDB: on that page.\n"
			      "InnoDB: Cannot find the dir slot for record ",
			      stderr);
osku's avatar
osku committed
123 124 125 126
			if (page_is_comp(page)) {
				fputs("(compact record)", stderr);
			} else {
				rec_print_old(stderr, page
127
					      + mach_decode_2(rec_offs_bytes));
osku's avatar
osku committed
128 129
			}
			fputs("\n"
130
			      "InnoDB: on that page!\n", stderr);
osku's avatar
osku committed
131

132
			buf_page_print(page, 0);
osku's avatar
osku committed
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157

			ut_error;
		}

		slot += PAGE_DIR_SLOT_SIZE;
	}

	return(((ulint) (first_slot - slot)) / PAGE_DIR_SLOT_SIZE);
}

/******************************************************************
Used to check the consistency of a directory slot. */
static
ibool
page_dir_slot_check(
/*================*/
					/* out: TRUE if succeed */
	page_dir_slot_t*	slot)	/* in: slot */
{
	page_t*	page;
	ulint	n_slots;
	ulint	n_owned;

	ut_a(slot);

158
	page = page_align(slot);
osku's avatar
osku committed
159 160 161 162 163 164 165 166

	n_slots = page_dir_get_n_slots(page);

	ut_a(slot <= page_dir_get_nth_slot(page, 0));
	ut_a(slot >= page_dir_get_nth_slot(page, n_slots - 1));

	ut_a(page_rec_check(page_dir_slot_get_rec(slot)));

marko's avatar
marko committed
167 168 169
	if (page_is_comp(page)) {
		n_owned = rec_get_n_owned_new(page_dir_slot_get_rec(slot));
	} else {
marko's avatar
marko committed
170
		n_owned = rec_get_n_owned_old(page_dir_slot_get_rec(slot));
marko's avatar
marko committed
171
	}
osku's avatar
osku committed
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191

	if (slot == page_dir_get_nth_slot(page, 0)) {
		ut_a(n_owned == 1);
	} else if (slot == page_dir_get_nth_slot(page, n_slots - 1)) {
		ut_a(n_owned >= 1);
		ut_a(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED);
	} else {
		ut_a(n_owned >= PAGE_DIR_SLOT_MIN_N_OWNED);
		ut_a(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED);
	}

	return(TRUE);
}

/*****************************************************************
Sets the max trx id field value. */

void
page_set_max_trx_id(
/*================*/
marko's avatar
marko committed
192 193 194 195
	page_t*		page,	/* in/out: page */
	page_zip_des_t*	page_zip,/* in/out: compressed page whose
				uncompressed part will be updated, or NULL */
	dulint		trx_id)	/* in: transaction id */
osku's avatar
osku committed
196 197 198 199 200 201 202 203 204 205 206 207 208 209
{
	buf_block_t*	block;

	ut_ad(page);

	block = buf_block_align(page);

	if (block->is_hashed) {
		rw_lock_x_lock(&btr_search_latch);
	}

	/* It is not necessary to write this change to the redo log, as
	during a database recovery we assume that the max trx id of every
	page is the maximum trx id assigned before the crash. */
210

marko's avatar
marko committed
211
	mach_write_to_8(page + (PAGE_HEADER + PAGE_MAX_TRX_ID), trx_id);
marko's avatar
marko committed
212 213
	if (UNIV_LIKELY_NULL(page_zip)) {
		page_zip_write_header(page_zip,
214 215
				      page + (PAGE_HEADER + PAGE_MAX_TRX_ID),
				      8, NULL);
marko's avatar
marko committed
216
	}
osku's avatar
osku committed
217 218 219 220 221 222 223

	if (block->is_hashed) {
		rw_lock_x_unlock(&btr_search_latch);
	}
}

/****************************************************************
224
Allocates a block of memory from the heap of an index page. */
osku's avatar
osku committed
225 226

byte*
227 228
page_mem_alloc_heap(
/*================*/
osku's avatar
osku committed
229 230
				/* out: pointer to start of allocated
				buffer, or NULL if allocation fails */
marko's avatar
marko committed
231
	page_t*		page,	/* in/out: index page */
232 233 234 235 236
	page_zip_des_t*	page_zip,/* in/out: compressed page with enough
				space available for inserting the record,
				or NULL */
	ulint		need,	/* in: total number of bytes needed */
	ulint*		heap_no)/* out: this contains the heap number
osku's avatar
osku committed
237 238 239 240 241 242
				of the allocated record
				if allocation succeeds */
{
	byte*	block;
	ulint	avl_space;

243
	ut_ad(page && heap_no);
osku's avatar
osku committed
244 245

	avl_space = page_get_max_insert_size(page, 1);
246

osku's avatar
osku committed
247 248 249
	if (avl_space >= need) {
		block = page_header_get_ptr(page, PAGE_HEAP_TOP);

marko's avatar
marko committed
250
		page_header_set_ptr(page, page_zip, PAGE_HEAP_TOP,
251
				    block + need);
osku's avatar
osku committed
252 253
		*heap_no = page_dir_get_n_heap(page);

marko's avatar
marko committed
254
		page_dir_set_n_heap(page, page_zip, 1 + *heap_no);
osku's avatar
osku committed
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270

		return(block);
	}

	return(NULL);
}

/**************************************************************
Writes a log record of page creation. */
UNIV_INLINE
void
page_create_write_log(
/*==================*/
	buf_frame_t*	frame,	/* in: a buffer frame where the page is
				created */
	mtr_t*		mtr,	/* in: mini-transaction handle */
271
	ibool		comp)	/* in: TRUE=compact page format */
osku's avatar
osku committed
272
{
273 274 275
	mlog_write_initial_log_record(frame, comp
				      ? MLOG_COMP_PAGE_CREATE
				      : MLOG_PAGE_CREATE, mtr);
osku's avatar
osku committed
276 277 278 279 280 281 282 283
}

/***************************************************************
Parses a redo log record of creating a page. */

byte*
page_parse_create(
/*==============*/
284 285 286 287 288 289
				/* out: end of log record or NULL */
	byte*		ptr,	/* in: buffer */
	byte*		end_ptr __attribute__((unused)), /* in: buffer end */
	ulint		comp,	/* in: nonzero=compact page format */
	page_t*		page,	/* in: page or NULL */
	mtr_t*		mtr)	/* in: mtr or NULL */
osku's avatar
osku committed
290 291 292 293 294 295
{
	ut_ad(ptr && end_ptr);

	/* The record is empty, except for the record initial part */

	if (page) {
296
		page_create(page, mtr, comp);
osku's avatar
osku committed
297 298 299 300 301 302 303
	}

	return(ptr);
}

/**************************************************************
The index page creation function. */
304
static
305
page_t*
306 307
page_create_low(
/*============*/
marko's avatar
marko committed
308 309 310
					/* out: pointer to the page */
	buf_frame_t*	frame,		/* in/out: a buffer frame where the
					page is created */
311
	ulint		comp)		/* in: nonzero=compact page format */
osku's avatar
osku committed
312 313 314
{
	page_dir_slot_t* slot;
	mem_heap_t*	heap;
315
	dtuple_t*	tuple;
osku's avatar
osku committed
316 317 318 319 320
	dfield_t*	field;
	byte*		heap_top;
	rec_t*		infimum_rec;
	rec_t*		supremum_rec;
	page_t*		page;
321
	dict_index_t*	index;
osku's avatar
osku committed
322 323
	ulint*		offsets;

324
	ut_ad(frame);
325 326 327 328 329 330
#if PAGE_BTR_IBUF_FREE_LIST + FLST_BASE_NODE_SIZE > PAGE_DATA
# error "PAGE_BTR_IBUF_FREE_LIST + FLST_BASE_NODE_SIZE > PAGE_DATA"
#endif
#if PAGE_BTR_IBUF_FREE_LIST_NODE + FLST_NODE_SIZE > PAGE_DATA
# error "PAGE_BTR_IBUF_FREE_LIST_NODE + FLST_NODE_SIZE > PAGE_DATA"
#endif
osku's avatar
osku committed
331

332 333 334 335 336 337 338
	/* The infimum and supremum records use a dummy index. */
	if (UNIV_LIKELY(comp)) {
		index = srv_sys->dummy_ind2;
	} else {
		index = srv_sys->dummy_ind1;
	}

osku's avatar
osku committed
339 340 341 342 343 344 345 346
	/* 1. INCREMENT MODIFY CLOCK */
	buf_frame_modify_clock_inc(frame);

	page = frame;

	fil_page_set_type(page, FIL_PAGE_INDEX);

	heap = mem_heap_create(200);
347

osku's avatar
osku committed
348 349 350 351 352 353 354 355 356
	/* 3. CREATE THE INFIMUM AND SUPREMUM RECORDS */

	/* Create first a data tuple for infimum record */
	tuple = dtuple_create(heap, 1);
	dtuple_set_info_bits(tuple, REC_STATUS_INFIMUM);
	field = dtuple_get_nth_field(tuple, 0);

	dfield_set_data(field, "infimum", 8);
	dtype_set(dfield_get_type(field),
357
		  DATA_VARCHAR, DATA_ENGLISH | DATA_NOT_NULL, 8);
osku's avatar
osku committed
358 359 360 361
	/* Set the corresponding physical record to its place in the page
	record heap */

	heap_top = page + PAGE_DATA;
362

363 364
	infimum_rec = rec_convert_dtuple_to_rec(heap_top, index,
						tuple, NULL, 0);
osku's avatar
osku committed
365

marko's avatar
marko committed
366 367 368 369
	if (UNIV_LIKELY(comp)) {
		ut_a(infimum_rec == page + PAGE_NEW_INFIMUM);

		rec_set_n_owned_new(infimum_rec, NULL, 1);
370
		rec_set_heap_no_new(infimum_rec, 0);
marko's avatar
marko committed
371 372 373 374 375 376
	} else {
		ut_a(infimum_rec == page + PAGE_OLD_INFIMUM);

		rec_set_n_owned_old(infimum_rec, 1);
		rec_set_heap_no_old(infimum_rec, 0);
	}
osku's avatar
osku committed
377 378

	offsets = rec_get_offsets(infimum_rec, index, NULL,
379
				  ULINT_UNDEFINED, &heap);
osku's avatar
osku committed
380 381 382 383 384 385 386 387 388 389 390

	heap_top = rec_get_end(infimum_rec, offsets);

	/* Create then a tuple for supremum */

	tuple = dtuple_create(heap, 1);
	dtuple_set_info_bits(tuple, REC_STATUS_SUPREMUM);
	field = dtuple_get_nth_field(tuple, 0);

	dfield_set_data(field, "supremum", comp ? 8 : 9);
	dtype_set(dfield_get_type(field),
391
		  DATA_VARCHAR, DATA_ENGLISH | DATA_NOT_NULL, comp ? 8 : 9);
osku's avatar
osku committed
392

393 394
	supremum_rec = rec_convert_dtuple_to_rec(heap_top, index,
						 tuple, NULL, 0);
osku's avatar
osku committed
395

marko's avatar
marko committed
396 397
	if (UNIV_LIKELY(comp)) {
		ut_a(supremum_rec == page + PAGE_NEW_SUPREMUM);
osku's avatar
osku committed
398

marko's avatar
marko committed
399
		rec_set_n_owned_new(supremum_rec, NULL, 1);
400
		rec_set_heap_no_new(supremum_rec, 1);
marko's avatar
marko committed
401 402 403 404 405 406
	} else {
		ut_a(supremum_rec == page + PAGE_OLD_SUPREMUM);

		rec_set_n_owned_old(supremum_rec, 1);
		rec_set_heap_no_old(supremum_rec, 1);
	}
osku's avatar
osku committed
407 408

	offsets = rec_get_offsets(supremum_rec, index, offsets,
409
				  ULINT_UNDEFINED, &heap);
osku's avatar
osku committed
410 411
	heap_top = rec_get_end(supremum_rec, offsets);

412 413
	ut_ad(heap_top == page
	      + (comp ? PAGE_NEW_SUPREMUM_END : PAGE_OLD_SUPREMUM_END));
osku's avatar
osku committed
414 415 416 417 418

	mem_heap_free(heap);

	/* 4. INITIALIZE THE PAGE */

marko's avatar
marko committed
419 420 421 422 423 424 425 426 427
	page_header_set_field(page, NULL, PAGE_N_DIR_SLOTS, 2);
	page_header_set_ptr(page, NULL, PAGE_HEAP_TOP, heap_top);
	page_header_set_field(page, NULL, PAGE_N_HEAP, comp ? 0x8002 : 2);
	page_header_set_ptr(page, NULL, PAGE_FREE, NULL);
	page_header_set_field(page, NULL, PAGE_GARBAGE, 0);
	page_header_set_ptr(page, NULL, PAGE_LAST_INSERT, NULL);
	page_header_set_field(page, NULL, PAGE_DIRECTION, PAGE_NO_DIRECTION);
	page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);
	page_header_set_field(page, NULL, PAGE_N_RECS, 0);
marko's avatar
marko committed
428
	page_set_max_trx_id(page, NULL, ut_dulint_zero);
osku's avatar
osku committed
429
	memset(heap_top, 0, UNIV_PAGE_SIZE - PAGE_EMPTY_DIR_START
430
	       - (heap_top - page));
osku's avatar
osku committed
431 432 433 434 435 436

	/* 5. SET POINTERS IN RECORDS AND DIR SLOTS */

	/* Set the slots to point to infimum and supremum. */

	slot = page_dir_get_nth_slot(page, 0);
437
	page_dir_slot_set_rec(slot, infimum_rec);
osku's avatar
osku committed
438 439

	slot = page_dir_get_nth_slot(page, 1);
440
	page_dir_slot_set_rec(slot, supremum_rec);
osku's avatar
osku committed
441 442

	/* Set the next pointers in infimum and supremum */
marko's avatar
marko committed
443 444

	if (UNIV_LIKELY(comp)) {
445 446
		rec_set_next_offs_new(infimum_rec, PAGE_NEW_SUPREMUM);
		rec_set_next_offs_new(supremum_rec, 0);
marko's avatar
marko committed
447
	} else {
marko's avatar
marko committed
448
		rec_set_next_offs_old(infimum_rec, PAGE_OLD_SUPREMUM);
marko's avatar
marko committed
449 450 451
		rec_set_next_offs_old(supremum_rec, 0);
	}

452 453
	return(page);
}
marko's avatar
marko committed
454

455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488
/**************************************************************
Create an uncompressed B-tree index page. */

page_t*
page_create(
/*========*/
					/* out: pointer to the page */
	buf_frame_t*	frame,		/* in/out: a buffer frame where the
					page is created */
	mtr_t*		mtr,		/* in: mini-transaction handle */
	ulint		comp)		/* in: nonzero=compact page format */
{
	page_create_write_log(frame, mtr, comp);
	return(page_create_low(frame, comp));
}

/**************************************************************
Create a compressed B-tree index page. */

page_t*
page_create_zip(
/*============*/
					/* out: pointer to the page */
	buf_frame_t*	frame,		/* in/out: a buffer frame where the
					page is created */
	page_zip_des_t*	page_zip,	/* in/out: compressed page, or NULL */
	dict_index_t*	index,		/* in: the index of the page */
	ulint		level,		/* in: the B-tree level of the page */
	mtr_t*		mtr)		/* in: mini-transaction handle */
{
	ut_ad(frame && page_zip && index);
	ut_ad(dict_table_is_comp(index->table));

	page_create_low(frame, TRUE);
489
	mach_write_to_2(frame + PAGE_HEADER + PAGE_LEVEL, level);
490

491
	if (UNIV_UNLIKELY(!page_zip_compress(page_zip, frame, index, mtr))) {
492 493 494 495 496 497
		/* The compression of a newly created page
		should always succeed. */
		ut_error;
	}

	return(frame);
osku's avatar
osku committed
498 499 500 501
}

/*****************************************************************
Differs from page_copy_rec_list_end, because this function does not
marko's avatar
marko committed
502
touch the lock table and max trx id on page or compress the page. */
osku's avatar
osku committed
503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525

void
page_copy_rec_list_end_no_locks(
/*============================*/
	page_t*		new_page,	/* in: index page to copy to */
	rec_t*		rec,		/* in: record on page */
	dict_index_t*	index,		/* in: record descriptor */
	mtr_t*		mtr)		/* in: mtr */
{
	page_cur_t	cur1;
	page_cur_t	cur2;
	mem_heap_t*	heap		= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	*offsets_ = (sizeof offsets_) / sizeof *offsets_;

	page_cur_position(rec, &cur1);

	if (page_cur_is_before_first(&cur1)) {

		page_cur_move_to_next(&cur1);
	}

526
	ut_a((ibool)!!page_is_comp(new_page)
527
	     == dict_table_is_comp(index->table));
marko's avatar
marko committed
528
	ut_a(page_is_comp(new_page) == page_rec_is_comp(rec));
osku's avatar
osku committed
529
	ut_a(mach_read_from_2(new_page + UNIV_PAGE_SIZE - 10) == (ulint)
530
	     (page_is_comp(new_page) ? PAGE_NEW_INFIMUM : PAGE_OLD_INFIMUM));
osku's avatar
osku committed
531 532

	page_cur_set_before_first(new_page, &cur2);
533 534

	/* Copy records from the original page to the new page */
osku's avatar
osku committed
535

marko's avatar
marko committed
536
	while (!page_cur_is_after_last(&cur1)) {
osku's avatar
osku committed
537 538
		rec_t*	cur1_rec = page_cur_get_rec(&cur1);
		offsets = rec_get_offsets(cur1_rec, index, offsets,
539 540 541
					  ULINT_UNDEFINED, &heap);
		if (UNIV_UNLIKELY(!page_cur_rec_insert(&cur2, NULL, cur1_rec,
						       index, offsets, mtr))) {
osku's avatar
osku committed
542 543 544
			/* Track an assertion failure reported on the mailing
			list on June 18th, 2003 */

545
			buf_page_print(new_page, 0);
546
			buf_page_print(page_align(rec), 0);
osku's avatar
osku committed
547 548 549
			ut_print_timestamp(stderr);

			fprintf(stderr,
550 551
				"InnoDB: rec offset %lu, cur1 offset %lu,"
				" cur2 offset %lu\n",
552 553 554
				(ulong) page_offset(rec),
				(ulong) page_offset(page_cur_get_rec(&cur1)),
				(ulong) page_offset(page_cur_get_rec(&cur2)));
osku's avatar
osku committed
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571
			ut_error;
		}

		page_cur_move_to_next(&cur1);
		page_cur_move_to_next(&cur2);
	}

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}

/*****************************************************************
Copies records from page to new_page, from a given record onward,
including that record. Infimum and supremum records are not copied.
The records are copied to the start of the record list on new_page. */

572
rec_t*
osku's avatar
osku committed
573 574
page_copy_rec_list_end(
/*===================*/
575 576 577 578 579
					/* out: pointer to the original
					successor of the infimum record
					on new_page, or NULL on zip overflow
					(new_page will be decompressed
					from new_page_zip) */
marko's avatar
marko committed
580 581
	page_t*		new_page,	/* in/out: index page to copy to */
	page_zip_des_t*	new_page_zip,	/* in/out: compressed page, or NULL */
osku's avatar
osku committed
582 583 584 585
	rec_t*		rec,		/* in: record on page */
	dict_index_t*	index,		/* in: record descriptor */
	mtr_t*		mtr)		/* in: mtr */
{
586
	page_t*	page	= page_align(rec);
587
	rec_t*	ret	= page_rec_get_next(page_get_infimum_rec(new_page));
marko's avatar
marko committed
588
	ulint	log_mode= 0; /* remove warning */
589

590 591
	/* page_zip_validate() will fail here if btr_compress()
	sets FIL_PAGE_PREV to FIL_NULL */
marko's avatar
marko committed
592 593
	ut_ad(page_is_leaf(page) == page_is_leaf(new_page));
	ut_ad(page_is_comp(page) == page_is_comp(new_page));
594 595 596 597

	if (UNIV_LIKELY_NULL(new_page_zip)) {
		log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
	}
marko's avatar
marko committed
598

osku's avatar
osku committed
599
	if (page_dir_get_n_heap(new_page) == 2) {
600 601
		page_copy_rec_list_end_to_created_page(new_page, rec,
						       index, mtr);
osku's avatar
osku committed
602
	} else {
marko's avatar
marko committed
603
		page_copy_rec_list_end_no_locks(new_page, rec, index, mtr);
osku's avatar
osku committed
604 605
	}

marko's avatar
marko committed
606
	if (UNIV_LIKELY_NULL(new_page_zip)) {
607 608
		mtr_set_log_mode(mtr, log_mode);

609 610
		if (UNIV_UNLIKELY
		    (!page_zip_compress(new_page_zip, new_page, index, mtr))) {
611 612 613 614 615
			/* Before trying to reorganize the page,
			store the number of preceding records on the page. */
			ulint	ret_pos
				= page_rec_get_n_recs_before(ret);

616
			if (UNIV_UNLIKELY
617 618
			    (!page_zip_reorganize(new_page_zip, new_page,
						  index, mtr))) {
619

620
				if (UNIV_UNLIKELY
621 622
				    (!page_zip_decompress(new_page_zip,
							  new_page))) {
623 624
					ut_error;
				}
625
				ut_ad(page_validate(new_page, index));
626 627 628 629 630
				return(NULL);
			} else {
				/* The page was reorganized:
				Seek to ret_pos. */
				ret = new_page + PAGE_NEW_INFIMUM;
marko's avatar
marko committed
631

632 633 634
				do {
					ret = rec_get_next_ptr(ret, TRUE);
				} while (--ret_pos);
marko's avatar
marko committed
635 636 637 638
			}
		}
	}

marko's avatar
marko committed
639 640 641 642 643
	/* Update the lock table, MAX_TRX_ID, and possible hash index */

	lock_move_rec_list_end(new_page, page, rec);

	page_update_max_trx_id(new_page, new_page_zip,
644
			       page_get_max_trx_id(page));
marko's avatar
marko committed
645

osku's avatar
osku committed
646
	btr_search_move_or_delete_hash_entries(new_page, page, index);
marko's avatar
marko committed
647

648
	return(ret);
649
}
osku's avatar
osku committed
650 651 652 653 654 655

/*****************************************************************
Copies records from page to new_page, up to the given record,
NOT including that record. Infimum and supremum records are not copied.
The records are copied to the end of the record list on new_page. */

656
rec_t*
osku's avatar
osku committed
657 658
page_copy_rec_list_start(
/*=====================*/
659 660 661 662 663
					/* out: pointer to the original
					predecessor of the supremum record
					on new_page, or NULL on zip overflow
					(new_page will be decompressed
					from new_page_zip) */
marko's avatar
marko committed
664 665
	page_t*		new_page,	/* in/out: index page to copy to */
	page_zip_des_t*	new_page_zip,	/* in/out: compressed page, or NULL */
osku's avatar
osku committed
666 667 668 669 670 671
	rec_t*		rec,		/* in: record on page */
	dict_index_t*	index,		/* in: record descriptor */
	mtr_t*		mtr)		/* in: mtr */
{
	page_cur_t	cur1;
	page_cur_t	cur2;
marko's avatar
marko committed
672
	page_t*		page;
673
	ulint		log_mode	= 0 /* remove warning */;
osku's avatar
osku committed
674
	mem_heap_t*	heap		= NULL;
675 676
	rec_t*		ret
		= page_rec_get_prev(page_get_supremum_rec(new_page));
osku's avatar
osku committed
677 678 679 680
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	*offsets_ = (sizeof offsets_) / sizeof *offsets_;

marko's avatar
marko committed
681
	if (page_rec_is_infimum(rec)) {
osku's avatar
osku committed
682

683
		return(ret);
osku's avatar
osku committed
684 685
	}

686 687 688 689
	if (UNIV_LIKELY_NULL(new_page_zip)) {
		log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
	}

690
	page = page_align(rec);
marko's avatar
marko committed
691 692

	page_cur_set_before_first(page, &cur1);
osku's avatar
osku committed
693
	page_cur_move_to_next(&cur1);
694

695
	page_cur_position(ret, &cur2);
696 697

	/* Copy records from the original page to the new page */
osku's avatar
osku committed
698 699 700 701 702

	while (page_cur_get_rec(&cur1) != rec) {
		rec_t*	ins_rec;
		rec_t*	cur1_rec = page_cur_get_rec(&cur1);
		offsets = rec_get_offsets(cur1_rec, index, offsets,
703 704 705
					  ULINT_UNDEFINED, &heap);
		ins_rec = page_cur_rec_insert(&cur2, NULL, cur1_rec,
					      index, offsets, mtr);
osku's avatar
osku committed
706 707 708 709 710 711
		ut_a(ins_rec);

		page_cur_move_to_next(&cur1);
		page_cur_move_to_next(&cur2);
	}

marko's avatar
marko committed
712 713 714
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
osku's avatar
osku committed
715

marko's avatar
marko committed
716
	if (UNIV_LIKELY_NULL(new_page_zip)) {
717 718
		mtr_set_log_mode(mtr, log_mode);

719 720
		if (UNIV_UNLIKELY
		    (!page_zip_compress(new_page_zip, new_page, index, mtr))) {
721 722 723 724
			/* Before trying to reorganize the page,
			store the number of preceding records on the page. */
			ulint	ret_pos
				= page_rec_get_n_recs_before(ret);
osku's avatar
osku committed
725

726
			if (UNIV_UNLIKELY
727 728
			    (!page_zip_reorganize(new_page_zip, new_page,
						  index, mtr))) {
729

730
				if (UNIV_UNLIKELY
731 732
				    (!page_zip_decompress(new_page_zip,
							  new_page))) {
733 734
					ut_error;
				}
735
				ut_ad(page_validate(new_page, index));
736 737 738 739 740 741 742 743 744 745
				return(NULL);
			} else {
				/* The page was reorganized:
				Seek to ret_pos. */
				ret = new_page + PAGE_NEW_INFIMUM;

				do {
					ret = rec_get_next_ptr(ret, TRUE);
				} while (--ret_pos);
			}
marko's avatar
marko committed
746
		}
osku's avatar
osku committed
747
	}
marko's avatar
marko committed
748

marko's avatar
marko committed
749 750 751
	/* Update MAX_TRX_ID, the lock table, and possible hash index */

	page_update_max_trx_id(new_page, new_page_zip,
752
			       page_get_max_trx_id(page));
marko's avatar
marko committed
753

754
	lock_move_rec_list_start(new_page, page, rec, ret);
marko's avatar
marko committed
755 756 757

	btr_search_move_or_delete_hash_entries(new_page, page, index);

758
	return(ret);
osku's avatar
osku committed
759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774
}

/**************************************************************
Writes a log record of a record list end or start deletion. */
UNIV_INLINE
void
page_delete_rec_list_write_log(
/*===========================*/
	rec_t*		rec,	/* in: record on page */
	dict_index_t*	index,	/* in: record descriptor */
	byte		type,	/* in: operation type:
				MLOG_LIST_END_DELETE, ... */
	mtr_t*		mtr)	/* in: mtr */
{
	byte*	log_ptr;
	ut_ad(type == MLOG_LIST_END_DELETE
775 776 777
	      || type == MLOG_LIST_START_DELETE
	      || type == MLOG_COMP_LIST_END_DELETE
	      || type == MLOG_COMP_LIST_START_DELETE);
osku's avatar
osku committed
778 779 780 781

	log_ptr = mlog_open_and_write_index(mtr, rec, index, type, 2);
	if (log_ptr) {
		/* Write the parameter as a 2-byte ulint */
782
		mach_write_to_2(log_ptr, page_offset(rec));
osku's avatar
osku committed
783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800
		mlog_close(mtr, log_ptr + 2);
	}
}

/**************************************************************
Parses a log record of a record list end or start deletion. */

byte*
page_parse_delete_rec_list(
/*=======================*/
				/* out: end of log record or NULL */
	byte		type,	/* in: MLOG_LIST_END_DELETE,
				MLOG_LIST_START_DELETE,
				MLOG_COMP_LIST_END_DELETE or
				MLOG_COMP_LIST_START_DELETE */
	byte*		ptr,	/* in: buffer */
	byte*		end_ptr,/* in: buffer end */
	dict_index_t*	index,	/* in: record descriptor */
801 802
	page_t*		page,	/* in/out: page or NULL */
	page_zip_des_t*	page_zip,/* in/out: compressed page or NULL */
osku's avatar
osku committed
803 804
	mtr_t*		mtr)	/* in: mtr or NULL */
{
marko's avatar
marko committed
805
	ulint		offset;
806

osku's avatar
osku committed
807
	ut_ad(type == MLOG_LIST_END_DELETE
808 809 810
	      || type == MLOG_LIST_START_DELETE
	      || type == MLOG_COMP_LIST_END_DELETE
	      || type == MLOG_COMP_LIST_START_DELETE);
811

osku's avatar
osku committed
812 813 814 815 816 817
	/* Read the record offset as a 2-byte ulint */

	if (end_ptr < ptr + 2) {

		return(NULL);
	}
818

osku's avatar
osku committed
819 820 821 822 823 824 825 826
	offset = mach_read_from_2(ptr);
	ptr += 2;

	if (!page) {

		return(ptr);
	}

827
	ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
osku's avatar
osku committed
828 829

	if (type == MLOG_LIST_END_DELETE
830
	    || type == MLOG_COMP_LIST_END_DELETE) {
marko's avatar
marko committed
831
		page_delete_rec_list_end(page + offset, index,
832 833
					 ULINT_UNDEFINED, ULINT_UNDEFINED,
					 page_zip, mtr);
osku's avatar
osku committed
834
	} else {
835
		page_delete_rec_list_start(page + offset, index,
836
					   page_zip, mtr);
osku's avatar
osku committed
837 838 839 840 841 842 843 844 845 846 847 848
	}

	return(ptr);
}

/*****************************************************************
Deletes records from a page from a given record onward, including that record.
The infimum and supremum records are not deleted. */

void
page_delete_rec_list_end(
/*=====================*/
marko's avatar
marko committed
849
	rec_t*		rec,	/* in: pointer to record on page */
osku's avatar
osku committed
850 851 852 853 854 855
	dict_index_t*	index,	/* in: record descriptor */
	ulint		n_recs,	/* in: number of records to delete,
				or ULINT_UNDEFINED if not known */
	ulint		size,	/* in: the sum of the sizes of the
				records in the end of the chain to
				delete, or ULINT_UNDEFINED if not known */
marko's avatar
marko committed
856
	page_zip_des_t*	page_zip,/* in/out: compressed page, or NULL */
osku's avatar
osku committed
857 858
	mtr_t*		mtr)	/* in: mtr */
{
marko's avatar
marko committed
859 860 861 862 863
	page_dir_slot_t*slot;
	ulint		slot_index;
	rec_t*		last_rec;
	rec_t*		prev_rec;
	ulint		n_owned;
864
	page_t*		page		= page_align(rec);
marko's avatar
marko committed
865 866 867 868
	mem_heap_t*	heap		= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	*offsets_ = (sizeof offsets_) / sizeof *offsets_;
marko's avatar
marko committed
869 870 871

	ut_ad(size == ULINT_UNDEFINED || size < UNIV_PAGE_SIZE);
	ut_ad(!page_zip || page_rec_is_comp(rec));
872
#ifdef UNIV_ZIP_DEBUG
873
	ut_a(!page_zip || page_zip_validate(page_zip, page));
874
#endif /* UNIV_ZIP_DEBUG */
marko's avatar
marko committed
875 876 877 878

	if (page_rec_is_infimum(rec)) {
		rec = page_rec_get_next(rec);
	}
osku's avatar
osku committed
879

marko's avatar
marko committed
880 881 882 883 884
	if (page_rec_is_supremum(rec)) {

		return;
	}

osku's avatar
osku committed
885 886 887
	/* Reset the last insert info in the page header and increment
	the modify clock for the frame */

888
	page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
osku's avatar
osku committed
889 890 891 892 893

	/* The page gets invalid for optimistic searches: increment the
	frame modify clock */

	buf_frame_modify_clock_inc(page);
894

marko's avatar
marko committed
895
	page_delete_rec_list_write_log(rec, index, page_is_comp(page)
896 897
				       ? MLOG_COMP_LIST_END_DELETE
				       : MLOG_LIST_END_DELETE, mtr);
osku's avatar
osku committed
898

marko's avatar
marko committed
899 900 901 902 903 904 905 906 907 908 909 910 911
	if (UNIV_LIKELY_NULL(page_zip)) {
		ulint		log_mode;

		ut_a(page_is_comp(page));
		/* Individual deletes are not logged */

		log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);

		do {
			page_cur_t	cur;
			page_cur_position(rec, &cur);

			offsets = rec_get_offsets(rec, index, offsets,
912
						  ULINT_UNDEFINED, &heap);
marko's avatar
marko committed
913
			rec = rec_get_next_ptr(rec, TRUE);
914 915 916
#ifdef UNIV_ZIP_DEBUG
			ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */
marko's avatar
marko committed
917
			page_cur_delete_rec(&cur, index, offsets,
918
					    page_zip, mtr);
919
		} while (page_offset(rec) != PAGE_NEW_SUPREMUM);
marko's avatar
marko committed
920 921 922 923 924 925 926 927 928 929 930

		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}

		/* Restore log mode */

		mtr_set_log_mode(mtr, log_mode);
		return;
	}

osku's avatar
osku committed
931 932
	prev_rec = page_rec_get_prev(rec);

marko's avatar
marko committed
933
	last_rec = page_rec_get_prev(page_get_supremum_rec(page));
osku's avatar
osku committed
934

935
	if ((size == ULINT_UNDEFINED) || (n_recs == ULINT_UNDEFINED)) {
936
		rec_t*		rec2		= rec;
osku's avatar
osku committed
937 938 939 940
		/* Calculate the sum of sizes and the number of records */
		size = 0;
		n_recs = 0;

marko's avatar
marko committed
941
		do {
osku's avatar
osku committed
942 943
			ulint	s;
			offsets = rec_get_offsets(rec2, index, offsets,
944
						  ULINT_UNDEFINED, &heap);
osku's avatar
osku committed
945 946
			s = rec_offs_size(offsets);
			ut_ad(rec2 - page + s - rec_offs_extra_size(offsets)
947
			      < UNIV_PAGE_SIZE);
osku's avatar
osku committed
948 949 950 951 952
			ut_ad(size + s < UNIV_PAGE_SIZE);
			size += s;
			n_recs++;

			rec2 = page_rec_get_next(rec2);
marko's avatar
marko committed
953
		} while (!page_rec_is_supremum(rec2));
osku's avatar
osku committed
954 955 956 957 958 959 960 961 962 963 964

		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}
	}

	ut_ad(size < UNIV_PAGE_SIZE);

	/* Update the page directory; there is no need to balance the number
	of the records owned by the supremum record, as it is allowed to be
	less than PAGE_DIR_SLOT_MIN_N_OWNED */
965

marko's avatar
marko committed
966
	if (page_is_comp(page)) {
967 968 969
		rec_t*	rec2	= rec;
		ulint	count	= 0;

marko's avatar
marko committed
970 971
		while (rec_get_n_owned_new(rec2) == 0) {
			count++;
osku's avatar
osku committed
972

marko's avatar
marko committed
973 974 975 976 977 978
			rec2 = rec_get_next_ptr(rec2, TRUE);
		}

		ut_ad(rec_get_n_owned_new(rec2) > count);

		n_owned = rec_get_n_owned_new(rec2) - count;
979 980
		slot_index = page_dir_find_owner_slot(rec2);
		slot = page_dir_get_nth_slot(page, slot_index);
marko's avatar
marko committed
981
	} else {
982 983 984
		rec_t*	rec2	= rec;
		ulint	count	= 0;

marko's avatar
marko committed
985 986 987 988 989 990 991 992 993
		while (rec_get_n_owned_old(rec2) == 0) {
			count++;

			rec2 = rec_get_next_ptr(rec2, FALSE);
		}

		ut_ad(rec_get_n_owned_old(rec2) > count);

		n_owned = rec_get_n_owned_old(rec2) - count;
994 995
		slot_index = page_dir_find_owner_slot(rec2);
		slot = page_dir_get_nth_slot(page, slot_index);
marko's avatar
marko committed
996
	}
osku's avatar
osku committed
997

998
	page_dir_slot_set_rec(slot, page_get_supremum_rec(page));
marko's avatar
marko committed
999
	page_dir_slot_set_n_owned(slot, NULL, n_owned);
osku's avatar
osku committed
1000

marko's avatar
marko committed
1001
	page_dir_set_n_slots(page, NULL, slot_index + 1);
1002

osku's avatar
osku committed
1003
	/* Remove the record chain segment from the record chain */
1004
	page_rec_set_next(prev_rec, page_get_supremum_rec(page));
osku's avatar
osku committed
1005 1006 1007

	/* Catenate the deleted chain segment to the page free list */

1008
	page_rec_set_next(last_rec, page_header_get_ptr(page, PAGE_FREE));
marko's avatar
marko committed
1009
	page_header_set_ptr(page, NULL, PAGE_FREE, rec);
osku's avatar
osku committed
1010

1011 1012
	page_header_set_field(page, NULL, PAGE_GARBAGE, size
			      + page_header_get_field(page, PAGE_GARBAGE));
osku's avatar
osku committed
1013

marko's avatar
marko committed
1014
	page_header_set_field(page, NULL, PAGE_N_RECS,
1015
			      (ulint)(page_get_n_recs(page) - n_recs));
1016
}
osku's avatar
osku committed
1017 1018 1019

/*****************************************************************
Deletes records from page, up to the given record, NOT including
1020
that record. Infimum and supremum records are not deleted. */
1021

osku's avatar
osku committed
1022 1023 1024 1025 1026
void
page_delete_rec_list_start(
/*=======================*/
	rec_t*		rec,	/* in: record on page */
	dict_index_t*	index,	/* in: record descriptor */
1027
	page_zip_des_t*	page_zip,/* in/out: compressed page of rec, or NULL */
osku's avatar
osku committed
1028 1029 1030 1031 1032 1033 1034 1035
	mtr_t*		mtr)	/* in: mtr */
{
	page_cur_t	cur1;
	ulint		log_mode;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	mem_heap_t*	heap		= NULL;
	byte		type;
marko's avatar
marko committed
1036

osku's avatar
osku committed
1037 1038
	*offsets_ = (sizeof offsets_) / sizeof *offsets_;

1039
	ut_ad((ibool) !!page_rec_is_comp(rec)
1040
	      == dict_table_is_comp(index->table));
1041 1042 1043 1044
	/* page_zip_validate() would detect a min_rec_mark mismatch
	in btr_page_split_and_insert()
	between btr_attach_half_pages() and insert_page = ...
	when btr_page_get_split_rec_to_left() holds (direction == FSP_DOWN). */
osku's avatar
osku committed
1045

marko's avatar
marko committed
1046 1047 1048 1049 1050 1051
	if (page_rec_is_infimum(rec)) {

		return;
	}

	if (page_rec_is_comp(rec)) {
osku's avatar
osku committed
1052 1053 1054 1055 1056 1057 1058
		type = MLOG_COMP_LIST_START_DELETE;
	} else {
		type = MLOG_LIST_START_DELETE;
	}

	page_delete_rec_list_write_log(rec, index, type, mtr);

1059
	page_cur_set_before_first(page_align(rec), &cur1);
osku's avatar
osku committed
1060
	page_cur_move_to_next(&cur1);
1061

osku's avatar
osku committed
1062 1063 1064 1065 1066 1067
	/* Individual deletes are not logged */

	log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);

	while (page_cur_get_rec(&cur1) != rec) {
		offsets = rec_get_offsets(page_cur_get_rec(&cur1), index,
1068
					  offsets, ULINT_UNDEFINED, &heap);
1069
		page_cur_delete_rec(&cur1, index, offsets, page_zip, mtr);
osku's avatar
osku committed
1070 1071 1072 1073 1074 1075 1076 1077 1078
	}

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	/* Restore log mode */

	mtr_set_log_mode(mtr, log_mode);
1079
}
osku's avatar
osku committed
1080 1081 1082 1083 1084

/*****************************************************************
Moves record list end to another page. Moved records include
split_rec. */

1085
ibool
osku's avatar
osku committed
1086 1087
page_move_rec_list_end(
/*===================*/
1088 1089 1090
					/* out: TRUE on success; FALSE on
					compression failure (new_page will
					be decompressed from new_page_zip) */
osku's avatar
osku committed
1091
	page_t*		new_page,	/* in: index page where to move */
marko's avatar
marko committed
1092 1093
	page_zip_des_t*	new_page_zip,	/* in/out: compressed page of
					new_page, or NULL */
osku's avatar
osku committed
1094
	rec_t*		split_rec,	/* in: first record to move */
marko's avatar
marko committed
1095 1096
	page_zip_des_t*	page_zip,	/* in/out: compressed page of
					split_rec, or NULL */
osku's avatar
osku committed
1097 1098 1099 1100 1101 1102 1103 1104 1105 1106
	dict_index_t*	index,		/* in: record descriptor */
	mtr_t*		mtr)		/* in: mtr */
{
	ulint	old_data_size;
	ulint	new_data_size;
	ulint	old_n_recs;
	ulint	new_n_recs;

	old_data_size = page_get_data_size(new_page);
	old_n_recs = page_get_n_recs(new_page);
1107
#ifdef UNIV_ZIP_DEBUG
1108
	ut_a(!new_page_zip || page_zip_validate(new_page_zip, new_page));
1109
	ut_a(!page_zip || page_zip_validate(page_zip, page_align(split_rec)));
1110
#endif /* UNIV_ZIP_DEBUG */
1111

1112
	if (UNIV_UNLIKELY(!page_copy_rec_list_end(new_page, new_page_zip,
1113
						  split_rec, index, mtr))) {
1114
		return(FALSE);
marko's avatar
marko committed
1115
	}
osku's avatar
osku committed
1116 1117 1118 1119 1120 1121

	new_data_size = page_get_data_size(new_page);
	new_n_recs = page_get_n_recs(new_page);

	ut_ad(new_data_size >= old_data_size);

marko's avatar
marko committed
1122
	page_delete_rec_list_end(split_rec, index,
1123 1124 1125
				 new_n_recs - old_n_recs,
				 new_data_size - old_data_size,
				 page_zip, mtr);
1126 1127

	return(TRUE);
osku's avatar
osku committed
1128 1129
}

1130 1131 1132 1133
/*****************************************************************
Moves record list start to another page. Moved records do not include
split_rec. */

1134
ibool
1135 1136
page_move_rec_list_start(
/*=====================*/
1137 1138
					/* out: TRUE on success; FALSE on
					compression failure */
1139 1140 1141 1142 1143 1144 1145 1146 1147 1148
	page_t*		new_page,	/* in: index page where to move */
	page_zip_des_t*	new_page_zip,	/* in/out: compressed page of
					new_page, or NULL */
	rec_t*		split_rec,	/* in: first record not to move */
	page_zip_des_t*	page_zip,	/* in/out: compressed page of
					split_rec, or NULL */
	dict_index_t*	index,		/* in: record descriptor */
	mtr_t*		mtr)		/* in: mtr */
{
	if (UNIV_UNLIKELY(!page_copy_rec_list_start(new_page, new_page_zip,
1149
						    split_rec, index, mtr))) {
1150
		return(FALSE);
1151 1152
	}

1153
	page_delete_rec_list_start(split_rec, index, page_zip, mtr);
1154

1155
	return(TRUE);
1156 1157
}

osku's avatar
osku committed
1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171
/***************************************************************************
This is a low-level operation which is used in a database index creation
to update the page number of a created B-tree to a data dictionary record. */

void
page_rec_write_index_page_no(
/*=========================*/
	rec_t*	rec,	/* in: record to update */
	ulint	i,	/* in: index of the field to update */
	ulint	page_no,/* in: value to write */
	mtr_t*	mtr)	/* in: mtr */
{
	byte*	data;
	ulint	len;
1172

osku's avatar
osku committed
1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185
	data = rec_get_nth_field_old(rec, i, &len);

	ut_ad(len == 4);

	mlog_write_ulint(data, page_no, MLOG_4BYTES, mtr);
}

/******************************************************************
Used to delete n slots from the directory. This function updates
also n_owned fields in the records, so that the first slot after
the deleted ones inherits the records of the deleted slots. */
UNIV_INLINE
void
marko's avatar
marko committed
1186 1187 1188
page_dir_delete_slot(
/*=================*/
	page_t*		page,	/* in/out: the index page */
1189
	page_zip_des_t*	page_zip,/* in/out: compressed page, or NULL */
marko's avatar
marko committed
1190
	ulint		slot_no)/* in: slot to be deleted */
osku's avatar
osku committed
1191 1192
{
	page_dir_slot_t*	slot;
marko's avatar
marko committed
1193
	ulint			n_owned;
osku's avatar
osku committed
1194 1195 1196
	ulint			i;
	ulint			n_slots;

marko's avatar
marko committed
1197 1198
	ut_ad(!page_zip || page_is_comp(page));
	ut_ad(slot_no > 0);
1199
	ut_ad(slot_no + 1 < page_dir_get_n_slots(page));
osku's avatar
osku committed
1200 1201 1202 1203 1204

	n_slots = page_dir_get_n_slots(page);

	/* 1. Reset the n_owned fields of the slots to be
	deleted */
marko's avatar
marko committed
1205 1206 1207
	slot = page_dir_get_nth_slot(page, slot_no);
	n_owned = page_dir_slot_get_n_owned(slot);
	page_dir_slot_set_n_owned(slot, page_zip, 0);
osku's avatar
osku committed
1208 1209 1210

	/* 2. Update the n_owned value of the first non-deleted slot */

marko's avatar
marko committed
1211
	slot = page_dir_get_nth_slot(page, slot_no + 1);
1212
	page_dir_slot_set_n_owned(slot, page_zip,
1213
				  n_owned + page_dir_slot_get_n_owned(slot));
marko's avatar
marko committed
1214 1215 1216 1217 1218

	/* 3. Destroy the slot by copying slots */
	for (i = slot_no + 1; i < n_slots; i++) {
		rec_t*	rec;
		rec = page_dir_slot_get_rec(page_dir_get_nth_slot(page, i));
1219
		page_dir_slot_set_rec(page_dir_get_nth_slot(page, i - 1), rec);
osku's avatar
osku committed
1220 1221
	}

1222 1223 1224 1225
	/* 4. Zero out the last slot, which will be removed */
	mach_write_to_2(page_dir_get_nth_slot(page, n_slots - 1), 0);

	/* 5. Update the page header */
marko's avatar
marko committed
1226
	page_header_set_field(page, page_zip, PAGE_N_DIR_SLOTS, n_slots - 1);
osku's avatar
osku committed
1227 1228 1229 1230 1231 1232 1233 1234
}

/******************************************************************
Used to add n slots to the directory. Does not set the record pointers
in the added slots or update n_owned values: this is the responsibility
of the caller. */
UNIV_INLINE
void
marko's avatar
marko committed
1235 1236
page_dir_add_slot(
/*==============*/
marko's avatar
marko committed
1237
	page_t*		page,	/* in/out: the index page */
1238
	page_zip_des_t*	page_zip,/* in/out: comprssed page, or NULL */
marko's avatar
marko committed
1239
	ulint		start)	/* in: the slot above which the new slots
marko's avatar
marko committed
1240
				are added */
osku's avatar
osku committed
1241 1242 1243
{
	page_dir_slot_t*	slot;
	ulint			n_slots;
1244

osku's avatar
osku committed
1245 1246 1247 1248 1249
	n_slots = page_dir_get_n_slots(page);

	ut_ad(start < n_slots - 1);

	/* Update the page header */
marko's avatar
marko committed
1250
	page_dir_set_n_slots(page, page_zip, n_slots + 1);
osku's avatar
osku committed
1251 1252

	/* Move slots up */
marko's avatar
marko committed
1253 1254 1255
	slot = page_dir_get_nth_slot(page, n_slots);
	memmove(slot, slot + PAGE_DIR_SLOT_SIZE,
		(n_slots - 1 - start) * PAGE_DIR_SLOT_SIZE);
osku's avatar
osku committed
1256 1257 1258 1259 1260 1261 1262 1263
}

/********************************************************************
Splits a directory slot which owns too many records. */

void
page_dir_split_slot(
/*================*/
marko's avatar
marko committed
1264
	page_t*		page,	/* in/out: index page */
1265 1266
	page_zip_des_t*	page_zip,/* in/out: compressed page whose
				uncompressed part will be written, or NULL */
marko's avatar
marko committed
1267
	ulint		slot_no)/* in: the directory slot */
1268
{
osku's avatar
osku committed
1269 1270 1271 1272 1273 1274 1275 1276
	rec_t*			rec;
	page_dir_slot_t*	new_slot;
	page_dir_slot_t*	prev_slot;
	page_dir_slot_t*	slot;
	ulint			i;
	ulint			n_owned;

	ut_ad(page);
marko's avatar
marko committed
1277
	ut_ad(!page_zip || page_is_comp(page));
osku's avatar
osku committed
1278 1279 1280
	ut_ad(slot_no > 0);

	slot = page_dir_get_nth_slot(page, slot_no);
1281

osku's avatar
osku committed
1282 1283 1284
	n_owned = page_dir_slot_get_n_owned(slot);
	ut_ad(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED + 1);

1285
	/* 1. We loop to find a record approximately in the middle of the
osku's avatar
osku committed
1286
	records owned by the slot. */
1287

osku's avatar
osku committed
1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299
	prev_slot = page_dir_get_nth_slot(page, slot_no - 1);
	rec = page_dir_slot_get_rec(prev_slot);

	for (i = 0; i < n_owned / 2; i++) {
		rec = page_rec_get_next(rec);
	}

	ut_ad(n_owned / 2 >= PAGE_DIR_SLOT_MIN_N_OWNED);

	/* 2. We add one directory slot immediately below the slot to be
	split. */

marko's avatar
marko committed
1300
	page_dir_add_slot(page, page_zip, slot_no - 1);
osku's avatar
osku committed
1301 1302 1303 1304 1305 1306 1307 1308

	/* The added slot is now number slot_no, and the old slot is
	now number slot_no + 1 */

	new_slot = page_dir_get_nth_slot(page, slot_no);
	slot = page_dir_get_nth_slot(page, slot_no + 1);

	/* 3. We store the appropriate values to the new slot. */
1309

1310
	page_dir_slot_set_rec(new_slot, rec);
1311
	page_dir_slot_set_n_owned(new_slot, page_zip, n_owned / 2);
1312 1313

	/* 4. Finally, we update the number of records field of the
osku's avatar
osku committed
1314 1315
	original slot */

1316
	page_dir_slot_set_n_owned(slot, page_zip, n_owned - (n_owned / 2));
osku's avatar
osku committed
1317 1318 1319 1320 1321 1322 1323 1324 1325 1326
}

/*****************************************************************
Tries to balance the given directory slot with too few records with the upper
neighbor, so that there are at least the minimum number of records owned by
the slot; this may result in the merging of two slots. */

void
page_dir_balance_slot(
/*==================*/
marko's avatar
marko committed
1327
	page_t*		page,	/* in/out: index page */
1328
	page_zip_des_t*	page_zip,/* in/out: compressed page, or NULL */
marko's avatar
marko committed
1329
	ulint		slot_no)/* in: the directory slot */
osku's avatar
osku committed
1330 1331 1332 1333 1334 1335 1336 1337 1338
{
	page_dir_slot_t*	slot;
	page_dir_slot_t*	up_slot;
	ulint			n_owned;
	ulint			up_n_owned;
	rec_t*			old_rec;
	rec_t*			new_rec;

	ut_ad(page);
marko's avatar
marko committed
1339
	ut_ad(!page_zip || page_is_comp(page));
osku's avatar
osku committed
1340 1341 1342
	ut_ad(slot_no > 0);

	slot = page_dir_get_nth_slot(page, slot_no);
1343

osku's avatar
osku committed
1344 1345 1346
	/* The last directory slot cannot be balanced with the upper
	neighbor, as there is none. */

marko's avatar
marko committed
1347
	if (UNIV_UNLIKELY(slot_no == page_dir_get_n_slots(page) - 1)) {
osku's avatar
osku committed
1348 1349 1350

		return;
	}
1351

osku's avatar
osku committed
1352
	up_slot = page_dir_get_nth_slot(page, slot_no + 1);
1353

osku's avatar
osku committed
1354 1355
	n_owned = page_dir_slot_get_n_owned(slot);
	up_n_owned = page_dir_slot_get_n_owned(up_slot);
1356

osku's avatar
osku committed
1357 1358 1359
	ut_ad(n_owned == PAGE_DIR_SLOT_MIN_N_OWNED - 1);

	/* If the upper slot has the minimum value of n_owned, we will merge
1360
	the two slots, therefore we assert: */
osku's avatar
osku committed
1361
	ut_ad(2 * PAGE_DIR_SLOT_MIN_N_OWNED - 1 <= PAGE_DIR_SLOT_MAX_N_OWNED);
1362

osku's avatar
osku committed
1363 1364 1365 1366 1367
	if (up_n_owned > PAGE_DIR_SLOT_MIN_N_OWNED) {

		/* In this case we can just transfer one record owned
		by the upper slot to the property of the lower slot */
		old_rec = page_dir_slot_get_rec(slot);
marko's avatar
marko committed
1368 1369 1370

		if (page_is_comp(page)) {
			new_rec = rec_get_next_ptr(old_rec, TRUE);
1371

marko's avatar
marko committed
1372 1373 1374 1375
			rec_set_n_owned_new(old_rec, page_zip, 0);
			rec_set_n_owned_new(new_rec, page_zip, n_owned + 1);
		} else {
			new_rec = rec_get_next_ptr(old_rec, FALSE);
1376

marko's avatar
marko committed
1377 1378 1379 1380
			rec_set_n_owned_old(old_rec, 0);
			rec_set_n_owned_old(new_rec, n_owned + 1);
		}

1381
		page_dir_slot_set_rec(slot, new_rec);
1382

marko's avatar
marko committed
1383
		page_dir_slot_set_n_owned(up_slot, page_zip, up_n_owned -1);
osku's avatar
osku committed
1384 1385
	} else {
		/* In this case we may merge the two slots */
marko's avatar
marko committed
1386
		page_dir_delete_slot(page, page_zip, slot_no);
1387
	}
osku's avatar
osku committed
1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436
}

/****************************************************************
Returns the middle record of the record list. If there are an even number
of records in the list, returns the first record of the upper half-list. */

rec_t*
page_get_middle_rec(
/*================*/
			/* out: middle record */
	page_t*	page)	/* in: page */
{
	page_dir_slot_t*	slot;
	ulint			middle;
	ulint			i;
	ulint			n_owned;
	ulint			count;
	rec_t*			rec;

	/* This many records we must leave behind */
	middle = (page_get_n_recs(page) + 2) / 2;

	count = 0;

	for (i = 0;; i++) {

		slot = page_dir_get_nth_slot(page, i);
		n_owned = page_dir_slot_get_n_owned(slot);

		if (count + n_owned > middle) {
			break;
		} else {
			count += n_owned;
		}
	}

	ut_ad(i > 0);
	slot = page_dir_get_nth_slot(page, i - 1);
	rec = page_dir_slot_get_rec(slot);
	rec = page_rec_get_next(rec);

	/* There are now count records behind rec */

	for (i = 0; i < middle - count; i++) {
		rec = page_rec_get_next(rec);
	}

	return(rec);
}
1437

osku's avatar
osku committed
1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455
/*******************************************************************
Returns the number of records before the given record in chain.
The number includes infimum and supremum records. */

ulint
page_rec_get_n_recs_before(
/*=======================*/
			/* out: number of records */
	rec_t*	rec)	/* in: the physical record */
{
	page_dir_slot_t*	slot;
	rec_t*			slot_rec;
	page_t*			page;
	ulint			i;
	lint			n	= 0;

	ut_ad(page_rec_check(rec));

1456
	page = page_align(rec);
marko's avatar
marko committed
1457 1458
	if (page_is_comp(page)) {
		while (rec_get_n_owned_new(rec) == 0) {
osku's avatar
osku committed
1459

marko's avatar
marko committed
1460 1461 1462
			rec = rec_get_next_ptr(rec, TRUE);
			n--;
		}
osku's avatar
osku committed
1463

marko's avatar
marko committed
1464 1465 1466
		for (i = 0; ; i++) {
			slot = page_dir_get_nth_slot(page, i);
			slot_rec = page_dir_slot_get_rec(slot);
osku's avatar
osku committed
1467

marko's avatar
marko committed
1468
			n += rec_get_n_owned_new(slot_rec);
osku's avatar
osku committed
1469

marko's avatar
marko committed
1470
			if (rec == slot_rec) {
osku's avatar
osku committed
1471

marko's avatar
marko committed
1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491
				break;
			}
		}
	} else {
		while (rec_get_n_owned_old(rec) == 0) {

			rec = rec_get_next_ptr(rec, FALSE);
			n--;
		}

		for (i = 0; ; i++) {
			slot = page_dir_get_nth_slot(page, i);
			slot_rec = page_dir_slot_get_rec(slot);

			n += rec_get_n_owned_old(slot_rec);

			if (rec == slot_rec) {

				break;
			}
osku's avatar
osku committed
1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504
		}
	}

	n--;

	ut_ad(n >= 0);

	return((ulint) n);
}

/****************************************************************
Prints record contents including the data relevant only in
the index page context. */
1505

osku's avatar
osku committed
1506 1507 1508 1509 1510 1511
void
page_rec_print(
/*===========*/
	rec_t*		rec,	/* in: physical record */
	const ulint*	offsets)/* in: record descriptor */
{
marko's avatar
marko committed
1512
	ut_a(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
osku's avatar
osku committed
1513
	rec_print_new(stderr, rec, offsets);
marko's avatar
marko committed
1514 1515
	if (page_rec_is_comp(rec)) {
		fprintf(stderr,
1516
			" n_owned: %lu; heap_no: %lu; next rec: %lu\n",
marko's avatar
marko committed
1517 1518 1519 1520 1521
			(ulong) rec_get_n_owned_new(rec),
			(ulong) rec_get_heap_no_new(rec),
			(ulong) rec_get_next_offs(rec, TRUE));
	} else {
		fprintf(stderr,
1522
			" n_owned: %lu; heap_no: %lu; next rec: %lu\n",
marko's avatar
marko committed
1523 1524 1525 1526
			(ulong) rec_get_n_owned_old(rec),
			(ulong) rec_get_heap_no_old(rec),
			(ulong) rec_get_next_offs(rec, TRUE));
	}
osku's avatar
osku committed
1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546

	page_rec_check(rec);
	rec_validate(rec, offsets);
}

/*******************************************************************
This is used to print the contents of the directory for
debugging purposes. */

void
page_dir_print(
/*===========*/
	page_t*	page,	/* in: index page */
	ulint	pr_n)	/* in: print n first and n last entries */
{
	ulint			n;
	ulint			i;
	page_dir_slot_t*	slot;

	n = page_dir_get_n_slots(page);
1547

osku's avatar
osku committed
1548 1549 1550 1551
	fprintf(stderr, "--------------------------------\n"
		"PAGE DIRECTORY\n"
		"Page address %p\n"
		"Directory stack top at offs: %lu; number of slots: %lu\n",
1552 1553
		page, (ulong)(page_dir_get_nth_slot(page, n - 1) - page),
		(ulong) n);
osku's avatar
osku committed
1554 1555 1556
	for (i = 0; i < n; i++) {
		slot = page_dir_get_nth_slot(page, i);
		if ((i == pr_n) && (i < n - pr_n)) {
1557
			fputs("    ...   \n", stderr);
osku's avatar
osku committed
1558
		}
1559
		if ((i < pr_n) || (i >= n - pr_n)) {
osku's avatar
osku committed
1560
			fprintf(stderr,
1561 1562 1563 1564 1565
				"Contents of slot: %lu: n_owned: %lu,"
				" rec offs: %lu\n",
				(ulong) i,
				(ulong) page_dir_slot_get_n_owned(slot),
				(ulong)(page_dir_slot_get_rec(slot) - page));
1566
		}
osku's avatar
osku committed
1567 1568 1569 1570
	}
	fprintf(stderr, "Total of %lu records\n"
		"--------------------------------\n",
		(ulong) (2 + page_get_n_recs(page)));
1571 1572
}

osku's avatar
osku committed
1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591
/*******************************************************************
This is used to print the contents of the page record list for
debugging purposes. */

void
page_print_list(
/*============*/
	page_t*		page,	/* in: index page */
	dict_index_t*	index,	/* in: dictionary index of the page */
	ulint		pr_n)	/* in: print n first and n last entries */
{
	page_cur_t	cur;
	ulint		count;
	ulint		n_recs;
	mem_heap_t*	heap		= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	*offsets_ = (sizeof offsets_) / sizeof *offsets_;

1592
	ut_a((ibool)!!page_is_comp(page) == dict_table_is_comp(index->table));
osku's avatar
osku committed
1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604

	fprintf(stderr,
		"--------------------------------\n"
		"PAGE RECORD LIST\n"
		"Page address %p\n", page);

	n_recs = page_get_n_recs(page);

	page_cur_set_before_first(page, &cur);
	count = 0;
	for (;;) {
		offsets = rec_get_offsets(cur.rec, index, offsets,
1605
					  ULINT_UNDEFINED, &heap);
osku's avatar
osku committed
1606 1607 1608 1609
		page_rec_print(cur.rec, offsets);

		if (count == pr_n) {
			break;
1610
		}
osku's avatar
osku committed
1611 1612
		if (page_cur_is_after_last(&cur)) {
			break;
1613
		}
osku's avatar
osku committed
1614
		page_cur_move_to_next(&cur);
1615
		count++;
osku's avatar
osku committed
1616
	}
1617

osku's avatar
osku committed
1618 1619 1620
	if (n_recs > 2 * pr_n) {
		fputs(" ... \n", stderr);
	}
1621

osku's avatar
osku committed
1622 1623 1624
	while (!page_cur_is_after_last(&cur)) {
		page_cur_move_to_next(&cur);

1625
		if (count + pr_n >= n_recs) {
osku's avatar
osku committed
1626
			offsets = rec_get_offsets(cur.rec, index, offsets,
1627
						  ULINT_UNDEFINED, &heap);
osku's avatar
osku committed
1628 1629
			page_rec_print(cur.rec, offsets);
		}
1630
		count++;
osku's avatar
osku committed
1631 1632 1633 1634 1635 1636 1637 1638 1639 1640
	}

	fprintf(stderr,
		"Total of %lu records \n"
		"--------------------------------\n",
		(ulong) (count + 1));

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
1641
}
osku's avatar
osku committed
1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675

/*******************************************************************
Prints the info in a page header. */

void
page_header_print(
/*==============*/
	page_t*	page)
{
	fprintf(stderr,
		"--------------------------------\n"
		"PAGE HEADER INFO\n"
		"Page address %p, n records %lu (%s)\n"
		"n dir slots %lu, heap top %lu\n"
		"Page n heap %lu, free %lu, garbage %lu\n"
		"Page last insert %lu, direction %lu, n direction %lu\n",
		page, (ulong) page_header_get_field(page, PAGE_N_RECS),
		page_is_comp(page) ? "compact format" : "original format",
		(ulong) page_header_get_field(page, PAGE_N_DIR_SLOTS),
		(ulong) page_header_get_field(page, PAGE_HEAP_TOP),
		(ulong) page_dir_get_n_heap(page),
		(ulong) page_header_get_field(page, PAGE_FREE),
		(ulong) page_header_get_field(page, PAGE_GARBAGE),
		(ulong) page_header_get_field(page, PAGE_LAST_INSERT),
		(ulong) page_header_get_field(page, PAGE_DIRECTION),
		(ulong) page_header_get_field(page, PAGE_N_DIRECTION));
}

/*******************************************************************
This is used to print the contents of the page for
debugging purposes. */

void
page_print(
1676
/*=======*/
osku's avatar
osku committed
1677 1678 1679 1680 1681 1682 1683 1684 1685 1686
	page_t*		page,	/* in: index page */
	dict_index_t*	index,	/* in: dictionary index of the page */
	ulint		dn,	/* in: print dn first and last entries
				in directory */
	ulint		rn)	/* in: print rn first and last records
				in directory */
{
	page_header_print(page);
	page_dir_print(page, dn);
	page_print_list(page, index, rn);
1687
}
osku's avatar
osku committed
1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704

/*******************************************************************
The following is used to validate a record on a page. This function
differs from rec_validate as it can also check the n_owned field and
the heap_no field. */

ibool
page_rec_validate(
/*==============*/
				/* out: TRUE if ok */
	rec_t*		rec,	/* in: physical record */
	const ulint*	offsets)/* in: array returned by rec_get_offsets() */
{
	ulint	n_owned;
	ulint	heap_no;
	page_t*	page;

1705
	page = page_align(rec);
marko's avatar
marko committed
1706
	ut_a(!page_is_comp(page) == !rec_offs_comp(offsets));
osku's avatar
osku committed
1707 1708 1709 1710

	page_rec_check(rec);
	rec_validate(rec, offsets);

marko's avatar
marko committed
1711 1712 1713 1714 1715 1716 1717
	if (page_rec_is_comp(rec)) {
		n_owned = rec_get_n_owned_new(rec);
		heap_no = rec_get_heap_no_new(rec);
	} else {
		n_owned = rec_get_n_owned_old(rec);
		heap_no = rec_get_heap_no_old(rec);
	}
osku's avatar
osku committed
1718

marko's avatar
marko committed
1719
	if (UNIV_UNLIKELY(!(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED))) {
osku's avatar
osku committed
1720 1721
		fprintf(stderr,
			"InnoDB: Dir slot of rec %lu, n owned too big %lu\n",
1722
			(ulong)(rec - page), (ulong) n_owned);
osku's avatar
osku committed
1723 1724 1725
		return(FALSE);
	}

marko's avatar
marko committed
1726
	if (UNIV_UNLIKELY(!(heap_no < page_dir_get_n_heap(page)))) {
osku's avatar
osku committed
1727
		fprintf(stderr,
1728 1729 1730
			"InnoDB: Heap no of rec %lu too big %lu %lu\n",
			(ulong)(rec - page), (ulong) heap_no,
			(ulong) page_dir_get_n_heap(page));
osku's avatar
osku committed
1731 1732
		return(FALSE);
	}
1733

osku's avatar
osku committed
1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750
	return(TRUE);
}

/*******************************************************************
Checks that the first directory slot points to the infimum record and
the last to the supremum. This function is intended to track if the
bug fixed in 4.0.14 has caused corruption to users' databases. */

void
page_check_dir(
/*===========*/
	page_t*	page)	/* in: index page */
{
	ulint	n_slots;

	n_slots = page_dir_get_n_slots(page);

1751
	if (UNIV_UNLIKELY(page_dir_slot_get_rec(page_dir_get_nth_slot(page, 0))
1752
			  != page_get_infimum_rec(page))) {
osku's avatar
osku committed
1753

1754
		fprintf(stderr,
1755 1756
			"InnoDB: Page directory corruption:"
			" infimum not pointed to\n");
1757
		buf_page_print(page, 0);
1758
	}
osku's avatar
osku committed
1759

1760 1761 1762
	if (UNIV_UNLIKELY
	    (page_dir_slot_get_rec(page_dir_get_nth_slot(page, n_slots - 1))
	     != page_get_supremum_rec(page))) {
osku's avatar
osku committed
1763

1764
		fprintf(stderr,
1765 1766
			"InnoDB: Page directory corruption:"
			" supremum not pointed to\n");
1767
		buf_page_print(page, 0);
1768
	}
osku's avatar
osku committed
1769
}
1770

osku's avatar
osku committed
1771 1772 1773 1774 1775 1776
/*******************************************************************
This function checks the consistency of an index page when we do not
know the index. This is also resilient so that this should never crash
even if the page is total garbage. */

ibool
marko's avatar
marko committed
1777 1778
page_simple_validate_old(
/*=====================*/
osku's avatar
osku committed
1779
			/* out: TRUE if ok */
marko's avatar
marko committed
1780 1781
	page_t*	page)	/* in: old-style index page */
{
1782
	page_cur_t	cur;
marko's avatar
marko committed
1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800
	page_dir_slot_t* slot;
	ulint		slot_no;
	ulint		n_slots;
	rec_t*		rec;
	byte*		rec_heap_top;
	ulint		count;
	ulint		own_count;
	ibool		ret	= FALSE;

	ut_a(!page_is_comp(page));

	/* Check first that the record heap and the directory do not
	overlap. */

	n_slots = page_dir_get_n_slots(page);

	if (UNIV_UNLIKELY(n_slots > UNIV_PAGE_SIZE / 4)) {
		fprintf(stderr,
1801 1802
			"InnoDB: Nonsensical number %lu of page dir slots\n",
			(ulong) n_slots);
marko's avatar
marko committed
1803 1804 1805 1806 1807

		goto func_exit;
	}

	rec_heap_top = page_header_get_ptr(page, PAGE_HEAP_TOP);
1808

marko's avatar
marko committed
1809
	if (UNIV_UNLIKELY(rec_heap_top
1810
			  > page_dir_get_nth_slot(page, n_slots - 1))) {
marko's avatar
marko committed
1811 1812

		fprintf(stderr,
1813 1814 1815 1816 1817 1818
			"InnoDB: Record heap and dir overlap on a page,"
			" heap top %lu, dir %lu\n",
			(ulong)
			(page_header_get_ptr(page, PAGE_HEAP_TOP) - page),
			(ulong)
			(page_dir_get_nth_slot(page, n_slots - 1) - page));
marko's avatar
marko committed
1819

1820 1821
		goto func_exit;
	}
marko's avatar
marko committed
1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834

	/* Validate the record list in a loop checking also that it is
	consistent with the page record directory. */

	count = 0;
	own_count = 1;
	slot_no = 0;
	slot = page_dir_get_nth_slot(page, slot_no);

	page_cur_set_before_first(page, &cur);

	for (;;) {
		rec = (&cur)->rec;
1835

marko's avatar
marko committed
1836 1837
		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
			fprintf(stderr,
1838 1839 1840 1841
				"InnoDB: Record %lu is above"
				" rec heap top %lu\n",
				(ulong)(rec - page),
				(ulong)(rec_heap_top - page));
marko's avatar
marko committed
1842 1843 1844 1845 1846 1847 1848

			goto func_exit;
		}

		if (UNIV_UNLIKELY(rec_get_n_owned_old(rec))) {
			/* This is a record pointed to by a dir slot */
			if (UNIV_UNLIKELY(rec_get_n_owned_old(rec)
1849
					  != own_count)) {
marko's avatar
marko committed
1850 1851

				fprintf(stderr,
1852 1853 1854 1855 1856
					"InnoDB: Wrong owned count %lu, %lu,"
					" rec %lu\n",
					(ulong) rec_get_n_owned_old(rec),
					(ulong) own_count,
					(ulong)(rec - page));
marko's avatar
marko committed
1857 1858 1859 1860

				goto func_exit;
			}

1861 1862
			if (UNIV_UNLIKELY
			    (page_dir_slot_get_rec(slot) != rec)) {
marko's avatar
marko committed
1863
				fprintf(stderr,
1864 1865
					"InnoDB: Dir slot does not point"
					" to right rec %lu\n",
marko's avatar
marko committed
1866 1867 1868 1869
					(ulong)(rec - page));

				goto func_exit;
			}
1870

marko's avatar
marko committed
1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883
			own_count = 0;

			if (!page_cur_is_after_last(&cur)) {
				slot_no++;
				slot = page_dir_get_nth_slot(page, slot_no);
			}
		}

		if (page_cur_is_after_last(&cur)) {

			break;
		}

1884 1885 1886
		if (UNIV_UNLIKELY
		    (rec_get_next_offs(rec, FALSE) < FIL_PAGE_DATA
		     || rec_get_next_offs(rec, FALSE) >= UNIV_PAGE_SIZE)) {
marko's avatar
marko committed
1887
			fprintf(stderr,
1888 1889 1890 1891
				"InnoDB: Next record offset"
				" nonsensical %lu for rec %lu\n",
				(ulong) rec_get_next_offs(rec, FALSE),
				(ulong) (rec - page));
marko's avatar
marko committed
1892 1893 1894 1895

			goto func_exit;
		}

1896
		count++;
marko's avatar
marko committed
1897 1898 1899

		if (UNIV_UNLIKELY(count > UNIV_PAGE_SIZE)) {
			fprintf(stderr,
1900 1901 1902
				"InnoDB: Page record list appears"
				" to be circular %lu\n",
				(ulong) count);
marko's avatar
marko committed
1903 1904
			goto func_exit;
		}
1905

marko's avatar
marko committed
1906 1907 1908
		page_cur_move_to_next(&cur);
		own_count++;
	}
1909

marko's avatar
marko committed
1910 1911 1912 1913 1914
	if (UNIV_UNLIKELY(rec_get_n_owned_old(rec) == 0)) {
		fprintf(stderr, "InnoDB: n owned is zero in a supremum rec\n");

		goto func_exit;
	}
1915

marko's avatar
marko committed
1916 1917 1918 1919
	if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
		fprintf(stderr, "InnoDB: n slots wrong %lu, %lu\n",
			(ulong) slot_no, (ulong) (n_slots - 1));
		goto func_exit;
1920
	}
marko's avatar
marko committed
1921 1922

	if (UNIV_UNLIKELY(page_header_get_field(page, PAGE_N_RECS) + 2
1923
			  != count + 1)) {
marko's avatar
marko committed
1924
		fprintf(stderr, "InnoDB: n recs wrong %lu %lu\n",
1925 1926
			(ulong) page_header_get_field(page, PAGE_N_RECS) + 2,
			(ulong) (count + 1));
marko's avatar
marko committed
1927 1928 1929 1930 1931 1932 1933 1934 1935

		goto func_exit;
	}

	/* Check then the free list */
	rec = page_header_get_ptr(page, PAGE_FREE);

	while (rec != NULL) {
		if (UNIV_UNLIKELY(rec < page + FIL_PAGE_DATA
1936
				  || rec >= page + UNIV_PAGE_SIZE)) {
marko's avatar
marko committed
1937
			fprintf(stderr,
1938 1939 1940
				"InnoDB: Free list record has"
				" a nonsensical offset %lu\n",
				(ulong) (rec - page));
marko's avatar
marko committed
1941 1942 1943 1944 1945 1946

			goto func_exit;
		}

		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
			fprintf(stderr,
1947 1948 1949 1950
				"InnoDB: Free list record %lu"
				" is above rec heap top %lu\n",
				(ulong) (rec - page),
				(ulong) (rec_heap_top - page));
marko's avatar
marko committed
1951 1952 1953 1954 1955

			goto func_exit;
		}

		count++;
1956

marko's avatar
marko committed
1957 1958
		if (UNIV_UNLIKELY(count > UNIV_PAGE_SIZE)) {
			fprintf(stderr,
1959 1960
				"InnoDB: Page free list appears"
				" to be circular %lu\n",
1961
				(ulong) count);
marko's avatar
marko committed
1962 1963 1964 1965 1966
			goto func_exit;
		}

		rec = page_rec_get_next(rec);
	}
1967

marko's avatar
marko committed
1968 1969 1970
	if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {

		fprintf(stderr, "InnoDB: N heap is wrong %lu, %lu\n",
1971 1972
			(ulong) page_dir_get_n_heap(page),
			(ulong) (count + 1));
marko's avatar
marko committed
1973 1974 1975 1976

		goto func_exit;
	}

1977
	ret = TRUE;
marko's avatar
marko committed
1978 1979

func_exit:
1980
	return(ret);
marko's avatar
marko committed
1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992
}

/*******************************************************************
This function checks the consistency of an index page when we do not
know the index. This is also resilient so that this should never crash
even if the page is total garbage. */

ibool
page_simple_validate_new(
/*=====================*/
			/* out: TRUE if ok */
	page_t*	page)	/* in: new-style index page */
osku's avatar
osku committed
1993
{
1994
	page_cur_t	cur;
osku's avatar
osku committed
1995 1996 1997 1998 1999 2000 2001 2002
	page_dir_slot_t* slot;
	ulint		slot_no;
	ulint		n_slots;
	rec_t*		rec;
	byte*		rec_heap_top;
	ulint		count;
	ulint		own_count;
	ibool		ret	= FALSE;
marko's avatar
marko committed
2003 2004

	ut_a(page_is_comp(page));
osku's avatar
osku committed
2005 2006 2007 2008 2009 2010

	/* Check first that the record heap and the directory do not
	overlap. */

	n_slots = page_dir_get_n_slots(page);

marko's avatar
marko committed
2011
	if (UNIV_UNLIKELY(n_slots > UNIV_PAGE_SIZE / 4)) {
osku's avatar
osku committed
2012
		fprintf(stderr,
2013 2014
			"InnoDB: Nonsensical number %lu"
			" of page dir slots\n", (ulong) n_slots);
osku's avatar
osku committed
2015 2016 2017 2018 2019

		goto func_exit;
	}

	rec_heap_top = page_header_get_ptr(page, PAGE_HEAP_TOP);
2020

marko's avatar
marko committed
2021
	if (UNIV_UNLIKELY(rec_heap_top
2022
			  > page_dir_get_nth_slot(page, n_slots - 1))) {
osku's avatar
osku committed
2023 2024

		fprintf(stderr,
2025 2026 2027 2028 2029 2030
			"InnoDB: Record heap and dir overlap on a page,"
			" heap top %lu, dir %lu\n",
			(ulong)
			(page_header_get_ptr(page, PAGE_HEAP_TOP) - page),
			(ulong)
			(page_dir_get_nth_slot(page, n_slots - 1) - page));
osku's avatar
osku committed
2031

2032 2033
		goto func_exit;
	}
osku's avatar
osku committed
2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046

	/* Validate the record list in a loop checking also that it is
	consistent with the page record directory. */

	count = 0;
	own_count = 1;
	slot_no = 0;
	slot = page_dir_get_nth_slot(page, slot_no);

	page_cur_set_before_first(page, &cur);

	for (;;) {
		rec = (&cur)->rec;
2047

marko's avatar
marko committed
2048
		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
osku's avatar
osku committed
2049
			fprintf(stderr,
2050 2051 2052 2053
				"InnoDB: Record %lu is above rec"
				" heap top %lu\n",
				(ulong) (rec - page),
				(ulong) (rec_heap_top - page));
osku's avatar
osku committed
2054 2055 2056 2057

			goto func_exit;
		}

marko's avatar
marko committed
2058
		if (UNIV_UNLIKELY(rec_get_n_owned_new(rec))) {
osku's avatar
osku committed
2059
			/* This is a record pointed to by a dir slot */
marko's avatar
marko committed
2060
			if (UNIV_UNLIKELY(rec_get_n_owned_new(rec)
2061
					  != own_count)) {
osku's avatar
osku committed
2062 2063

				fprintf(stderr,
2064 2065 2066 2067 2068
					"InnoDB: Wrong owned count %lu, %lu,"
					" rec %lu\n",
					(ulong) rec_get_n_owned_new(rec),
					(ulong) own_count,
					(ulong)(rec - page));
osku's avatar
osku committed
2069 2070 2071 2072

				goto func_exit;
			}

2073 2074
			if (UNIV_UNLIKELY
			    (page_dir_slot_get_rec(slot) != rec)) {
osku's avatar
osku committed
2075
				fprintf(stderr,
2076 2077
					"InnoDB: Dir slot does not point"
					" to right rec %lu\n",
osku's avatar
osku committed
2078 2079 2080 2081
					(ulong)(rec - page));

				goto func_exit;
			}
2082

osku's avatar
osku committed
2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095
			own_count = 0;

			if (!page_cur_is_after_last(&cur)) {
				slot_no++;
				slot = page_dir_get_nth_slot(page, slot_no);
			}
		}

		if (page_cur_is_after_last(&cur)) {

			break;
		}

2096 2097 2098
		if (UNIV_UNLIKELY
		    (rec_get_next_offs(rec, TRUE) < FIL_PAGE_DATA
		     || rec_get_next_offs(rec, TRUE) >= UNIV_PAGE_SIZE)) {
osku's avatar
osku committed
2099
			fprintf(stderr,
2100 2101 2102 2103
				"InnoDB: Next record offset nonsensical %lu"
				" for rec %lu\n",
				(ulong) rec_get_next_offs(rec, TRUE),
				(ulong)(rec - page));
osku's avatar
osku committed
2104 2105 2106 2107

			goto func_exit;
		}

2108
		count++;
osku's avatar
osku committed
2109

marko's avatar
marko committed
2110
		if (UNIV_UNLIKELY(count > UNIV_PAGE_SIZE)) {
osku's avatar
osku committed
2111
			fprintf(stderr,
2112 2113 2114
				"InnoDB: Page record list appears"
				" to be circular %lu\n",
				(ulong) count);
osku's avatar
osku committed
2115 2116
			goto func_exit;
		}
2117

osku's avatar
osku committed
2118 2119 2120
		page_cur_move_to_next(&cur);
		own_count++;
	}
2121

marko's avatar
marko committed
2122
	if (UNIV_UNLIKELY(rec_get_n_owned_new(rec) == 0)) {
2123 2124
		fprintf(stderr, "InnoDB: n owned is zero"
			" in a supremum rec\n");
osku's avatar
osku committed
2125 2126 2127

		goto func_exit;
	}
2128

marko's avatar
marko committed
2129
	if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
osku's avatar
osku committed
2130 2131 2132
		fprintf(stderr, "InnoDB: n slots wrong %lu, %lu\n",
			(ulong) slot_no, (ulong) (n_slots - 1));
		goto func_exit;
2133
	}
osku's avatar
osku committed
2134

marko's avatar
marko committed
2135
	if (UNIV_UNLIKELY(page_header_get_field(page, PAGE_N_RECS) + 2
2136
			  != count + 1)) {
osku's avatar
osku committed
2137
		fprintf(stderr, "InnoDB: n recs wrong %lu %lu\n",
2138 2139
			(ulong) page_header_get_field(page, PAGE_N_RECS) + 2,
			(ulong) (count + 1));
osku's avatar
osku committed
2140 2141 2142 2143 2144 2145 2146 2147

		goto func_exit;
	}

	/* Check then the free list */
	rec = page_header_get_ptr(page, PAGE_FREE);

	while (rec != NULL) {
marko's avatar
marko committed
2148
		if (UNIV_UNLIKELY(rec < page + FIL_PAGE_DATA
2149
				  || rec >= page + UNIV_PAGE_SIZE)) {
osku's avatar
osku committed
2150
			fprintf(stderr,
2151 2152 2153
				"InnoDB: Free list record has"
				" a nonsensical offset %lu\n",
				(ulong) (rec - page));
osku's avatar
osku committed
2154 2155 2156 2157

			goto func_exit;
		}

marko's avatar
marko committed
2158
		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
osku's avatar
osku committed
2159
			fprintf(stderr,
2160 2161 2162 2163
				"InnoDB: Free list record %lu"
				" is above rec heap top %lu\n",
				(ulong) (rec - page),
				(ulong) (rec_heap_top - page));
osku's avatar
osku committed
2164 2165 2166 2167 2168

			goto func_exit;
		}

		count++;
2169

marko's avatar
marko committed
2170
		if (UNIV_UNLIKELY(count > UNIV_PAGE_SIZE)) {
osku's avatar
osku committed
2171
			fprintf(stderr,
2172 2173 2174
				"InnoDB: Page free list appears"
				" to be circular %lu\n",
				(ulong) count);
osku's avatar
osku committed
2175 2176 2177 2178 2179
			goto func_exit;
		}

		rec = page_rec_get_next(rec);
	}
2180

marko's avatar
marko committed
2181
	if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {
osku's avatar
osku committed
2182 2183

		fprintf(stderr, "InnoDB: N heap is wrong %lu, %lu\n",
2184 2185
			(ulong) page_dir_get_n_heap(page),
			(ulong) (count + 1));
osku's avatar
osku committed
2186 2187 2188 2189

		goto func_exit;
	}

2190
	ret = TRUE;
osku's avatar
osku committed
2191 2192

func_exit:
2193
	return(ret);
osku's avatar
osku committed
2194 2195 2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207 2208
}

/*******************************************************************
This function checks the consistency of an index page. */

ibool
page_validate(
/*==========*/
				/* out: TRUE if ok */
	page_t*		page,	/* in: index page */
	dict_index_t*	index)	/* in: data dictionary index containing
				the page record type definition */
{
	page_dir_slot_t* slot;
	mem_heap_t*	heap;
2209
	page_cur_t	cur;
osku's avatar
osku committed
2210 2211
	byte*		buf;
	ulint		count;
marko's avatar
marko committed
2212 2213
	ulint		own_count;
	ulint		rec_own_count;
osku's avatar
osku committed
2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224
	ulint		slot_no;
	ulint		data_size;
	rec_t*		rec;
	rec_t*		old_rec		= NULL;
	ulint		offs;
	ulint		n_slots;
	ibool		ret		= FALSE;
	ulint		i;
	ulint*		offsets		= NULL;
	ulint*		old_offsets	= NULL;

marko's avatar
marko committed
2225
	if (UNIV_UNLIKELY((ibool) !!page_is_comp(page)
2226
			  != dict_table_is_comp(index->table))) {
osku's avatar
osku committed
2227 2228 2229
		fputs("InnoDB: 'compact format' flag mismatch\n", stderr);
		goto func_exit2;
	}
marko's avatar
marko committed
2230 2231 2232 2233 2234 2235 2236 2237
	if (page_is_comp(page)) {
		if (UNIV_UNLIKELY(!page_simple_validate_new(page))) {
			goto func_exit2;
		}
	} else {
		if (UNIV_UNLIKELY(!page_simple_validate_old(page))) {
			goto func_exit2;
		}
osku's avatar
osku committed
2238 2239 2240
	}

	heap = mem_heap_create(UNIV_PAGE_SIZE + 200);
2241

osku's avatar
osku committed
2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252
	/* The following buffer is used to check that the
	records in the page record heap do not overlap */

	buf = mem_heap_alloc(heap, UNIV_PAGE_SIZE);
	memset(buf, 0, UNIV_PAGE_SIZE);

	/* Check first that the record heap and the directory do not
	overlap. */

	n_slots = page_dir_get_n_slots(page);

2253 2254
	if (UNIV_UNLIKELY(!(page_header_get_ptr(page, PAGE_HEAP_TOP)
			    <= page_dir_get_nth_slot(page, n_slots - 1)))) {
osku's avatar
osku committed
2255 2256

		fputs("InnoDB: Record heap and dir overlap on a page ",
2257
		      stderr);
osku's avatar
osku committed
2258 2259
		dict_index_name_print(stderr, NULL, index);
		fprintf(stderr, ", %p, %p\n",
2260
			page_header_get_ptr(page, PAGE_HEAP_TOP),
osku's avatar
osku committed
2261 2262
			page_dir_get_nth_slot(page, n_slots - 1));

2263 2264
		goto func_exit;
	}
osku's avatar
osku committed
2265 2266 2267 2268 2269

	/* Validate the record list in a loop checking also that
	it is consistent with the directory. */
	count = 0;
	data_size = 0;
marko's avatar
marko committed
2270
	own_count = 1;
osku's avatar
osku committed
2271 2272 2273 2274 2275 2276 2277 2278
	slot_no = 0;
	slot = page_dir_get_nth_slot(page, slot_no);

	page_cur_set_before_first(page, &cur);

	for (;;) {
		rec = cur.rec;
		offsets = rec_get_offsets(rec, index, offsets,
2279
					  ULINT_UNDEFINED, &heap);
osku's avatar
osku committed
2280

marko's avatar
marko committed
2281
		if (page_is_comp(page) && page_rec_is_user_rec(rec)
2282 2283
		    && UNIV_UNLIKELY(rec_get_node_ptr_flag(rec)
				     == page_is_leaf(page))) {
osku's avatar
osku committed
2284 2285 2286 2287
			fputs("InnoDB: node_ptr flag mismatch\n", stderr);
			goto func_exit;
		}

marko's avatar
marko committed
2288
		if (UNIV_UNLIKELY(!page_rec_validate(rec, offsets))) {
osku's avatar
osku committed
2289 2290
			goto func_exit;
		}
2291

osku's avatar
osku committed
2292
		/* Check that the records are in the ascending order */
marko's avatar
marko committed
2293
		if (UNIV_LIKELY(count >= 2)
2294 2295 2296 2297
		    && (!page_cur_is_after_last(&cur))) {
			if (UNIV_UNLIKELY
			    (1 != cmp_rec_rec(rec, old_rec,
					      offsets, old_offsets, index))) {
osku's avatar
osku committed
2298
				fprintf(stderr,
2299 2300
					"InnoDB: Records in wrong order"
					" on page %lu ",
osku's avatar
osku committed
2301 2302 2303 2304 2305 2306 2307
					(ulong) buf_frame_get_page_no(page));
				dict_index_name_print(stderr, NULL, index);
				fputs("\nInnoDB: previous record ", stderr);
				rec_print_new(stderr, old_rec, old_offsets);
				fputs("\nInnoDB: record ", stderr);
				rec_print_new(stderr, rec, offsets);
				putc('\n', stderr);
2308

osku's avatar
osku committed
2309 2310 2311 2312 2313 2314 2315 2316
				goto func_exit;
			}
		}

		if (page_rec_is_user_rec(rec)) {

			data_size += rec_offs_size(offsets);
		}
2317

osku's avatar
osku committed
2318
		offs = rec_get_start(rec, offsets) - page;
2319

marko's avatar
marko committed
2320 2321
		for (i = rec_offs_size(offsets); i--; ) {
			if (UNIV_UNLIKELY(buf[offs + i])) {
osku's avatar
osku committed
2322 2323 2324
				/* No other record may overlap this */

				fputs("InnoDB: Record overlaps another\n",
2325
				      stderr);
osku's avatar
osku committed
2326 2327
				goto func_exit;
			}
marko's avatar
marko committed
2328 2329

			buf[offs + i] = 1;
osku's avatar
osku committed
2330 2331
		}

marko's avatar
marko committed
2332
		if (page_is_comp(page)) {
marko's avatar
marko committed
2333 2334 2335 2336
			rec_own_count = rec_get_n_owned_new(rec);
		} else {
			rec_own_count = rec_get_n_owned_old(rec);
		}
marko's avatar
marko committed
2337

marko's avatar
marko committed
2338
		if (UNIV_UNLIKELY(rec_own_count)) {
marko's avatar
marko committed
2339
			/* This is a record pointed to by a dir slot */
marko's avatar
marko committed
2340 2341
			if (UNIV_UNLIKELY(rec_own_count != own_count)) {
				fprintf(stderr,
2342 2343 2344
					"InnoDB: Wrong owned count %lu, %lu\n",
					(ulong) rec_own_count,
					(ulong) own_count);
marko's avatar
marko committed
2345 2346 2347 2348
				goto func_exit;
			}

			if (page_dir_slot_get_rec(slot) != rec) {
2349 2350 2351
				fputs("InnoDB: Dir slot does not"
				      " point to right rec\n",
				      stderr);
marko's avatar
marko committed
2352 2353 2354
				goto func_exit;
			}

osku's avatar
osku committed
2355
			page_dir_slot_check(slot);
marko's avatar
marko committed
2356 2357

			own_count = 0;
osku's avatar
osku committed
2358
			if (!page_cur_is_after_last(&cur)) {
marko's avatar
marko committed
2359 2360
				slot_no++;
				slot = page_dir_get_nth_slot(page, slot_no);
osku's avatar
osku committed
2361 2362 2363 2364 2365 2366 2367
			}
		}

		if (page_cur_is_after_last(&cur)) {
			break;
		}

2368
		count++;
osku's avatar
osku committed
2369
		page_cur_move_to_next(&cur);
marko's avatar
marko committed
2370
		own_count++;
osku's avatar
osku committed
2371 2372 2373 2374 2375 2376 2377 2378
		old_rec = rec;
		/* set old_offsets to offsets; recycle offsets */
		{
			ulint* offs = old_offsets;
			old_offsets = offsets;
			offsets = offs;
		}
	}
marko's avatar
marko committed
2379 2380 2381 2382 2383 2384 2385 2386

	if (page_is_comp(page)) {
		if (UNIV_UNLIKELY(rec_get_n_owned_new(rec) == 0)) {

			goto n_owned_zero;
		}
	} else if (UNIV_UNLIKELY(rec_get_n_owned_old(rec) == 0)) {
n_owned_zero:
osku's avatar
osku committed
2387 2388 2389
		fputs("InnoDB: n owned is zero\n", stderr);
		goto func_exit;
	}
marko's avatar
marko committed
2390 2391

	if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
osku's avatar
osku committed
2392 2393 2394
		fprintf(stderr, "InnoDB: n slots wrong %lu %lu\n",
			(ulong) slot_no, (ulong) (n_slots - 1));
		goto func_exit;
2395
	}
osku's avatar
osku committed
2396

marko's avatar
marko committed
2397
	if (UNIV_UNLIKELY(page_header_get_field(page, PAGE_N_RECS) + 2
2398
			  != count + 1)) {
osku's avatar
osku committed
2399
		fprintf(stderr, "InnoDB: n recs wrong %lu %lu\n",
2400 2401
			(ulong) page_header_get_field(page, PAGE_N_RECS) + 2,
			(ulong) (count + 1));
osku's avatar
osku committed
2402 2403 2404
		goto func_exit;
	}

marko's avatar
marko committed
2405
	if (UNIV_UNLIKELY(data_size != page_get_data_size(page))) {
osku's avatar
osku committed
2406
		fprintf(stderr,
2407
			"InnoDB: Summed data size %lu, returned by func %lu\n",
osku's avatar
osku committed
2408 2409 2410 2411 2412 2413 2414 2415 2416
			(ulong) data_size, (ulong) page_get_data_size(page));
		goto func_exit;
	}

	/* Check then the free list */
	rec = page_header_get_ptr(page, PAGE_FREE);

	while (rec != NULL) {
		offsets = rec_get_offsets(rec, index, offsets,
2417
					  ULINT_UNDEFINED, &heap);
marko's avatar
marko committed
2418
		if (UNIV_UNLIKELY(!page_rec_validate(rec, offsets))) {
osku's avatar
osku committed
2419 2420 2421

			goto func_exit;
		}
2422 2423

		count++;
osku's avatar
osku committed
2424
		offs = rec_get_start(rec, offsets) - page;
2425

marko's avatar
marko committed
2426
		for (i = rec_offs_size(offsets); i--; ) {
osku's avatar
osku committed
2427

marko's avatar
marko committed
2428
			if (UNIV_UNLIKELY(buf[offs + i])) {
2429 2430
				fputs("InnoDB: Record overlaps another"
				      " in free list\n", stderr);
osku's avatar
osku committed
2431 2432
				goto func_exit;
			}
marko's avatar
marko committed
2433 2434

			buf[offs + i] = 1;
osku's avatar
osku committed
2435
		}
2436

osku's avatar
osku committed
2437 2438
		rec = page_rec_get_next(rec);
	}
2439

marko's avatar
marko committed
2440
	if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {
osku's avatar
osku committed
2441 2442 2443 2444 2445 2446
		fprintf(stderr, "InnoDB: N heap is wrong %lu %lu\n",
			(ulong) page_dir_get_n_heap(page),
			(ulong) count + 1);
		goto func_exit;
	}

2447
	ret = TRUE;
osku's avatar
osku committed
2448 2449 2450 2451

func_exit:
	mem_heap_free(heap);

marko's avatar
marko committed
2452
	if (UNIV_UNLIKELY(ret == FALSE)) {
2453
func_exit2:
osku's avatar
osku committed
2454 2455 2456 2457
		fprintf(stderr, "InnoDB: Apparent corruption in page %lu in ",
			(ulong) buf_frame_get_page_no(page));
		dict_index_name_print(stderr, NULL, index);
		putc('\n', stderr);
2458
		buf_page_print(page, 0);
osku's avatar
osku committed
2459
	}
2460 2461

	return(ret);
osku's avatar
osku committed
2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477
}

/*******************************************************************
Looks in the page record list for a record with the given heap number. */

rec_t*
page_find_rec_with_heap_no(
/*=======================*/
			/* out: record, NULL if not found */
	page_t*	page,	/* in: index page */
	ulint	heap_no)/* in: heap number */
{
	page_cur_t	cur;

	page_cur_set_before_first(page, &cur);

marko's avatar
marko committed
2478 2479 2480 2481 2482 2483
	if (page_is_comp(page)) {
		for (;;) {
			if (rec_get_heap_no_new(cur.rec) == heap_no) {

				return(cur.rec);
			}
osku's avatar
osku committed
2484

marko's avatar
marko committed
2485 2486 2487
			if (page_cur_is_after_last(&cur)) {

				return(NULL);
2488
			}
marko's avatar
marko committed
2489 2490

			page_cur_move_to_next(&cur);
osku's avatar
osku committed
2491
		}
marko's avatar
marko committed
2492 2493 2494
	} else {
		for (;;) {
			if (rec_get_heap_no_old(cur.rec) == heap_no) {
osku's avatar
osku committed
2495

marko's avatar
marko committed
2496 2497
				return(cur.rec);
			}
osku's avatar
osku committed
2498

marko's avatar
marko committed
2499
			if (page_cur_is_after_last(&cur)) {
osku's avatar
osku committed
2500

marko's avatar
marko committed
2501
				return(NULL);
2502
			}
marko's avatar
marko committed
2503 2504 2505

			page_cur_move_to_next(&cur);
		}
osku's avatar
osku committed
2506 2507
	}
}