ibuf0ibuf.c 129 KB
Newer Older
1 2
/*****************************************************************************

3
Copyright (c) 1997, 2010, Innobase Oy. All Rights Reserved.
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 59 Temple
Place, Suite 330, Boston, MA 02111-1307 USA

*****************************************************************************/

19 20
/**************************************************//**
@file ibuf/ibuf0ibuf.c
osku's avatar
osku committed
21 22 23 24 25 26 27
Insert buffer

Created 7/19/1997 Heikki Tuuri
*******************************************************/

#include "ibuf0ibuf.h"

28
/** Number of bits describing a single page */
29 30 31 32
#define IBUF_BITS_PER_PAGE	4
#if IBUF_BITS_PER_PAGE % 2
# error "IBUF_BITS_PER_PAGE must be an even number!"
#endif
33
/** The start address for an insert buffer bitmap page bitmap */
34 35
#define IBUF_BITMAP		PAGE_DATA

osku's avatar
osku committed
36 37 38 39
#ifdef UNIV_NONINL
#include "ibuf0ibuf.ic"
#endif

40 41
#ifndef UNIV_HOTBACKUP

osku's avatar
osku committed
42 43 44 45 46 47 48 49 50 51
#include "buf0buf.h"
#include "buf0rea.h"
#include "fsp0fsp.h"
#include "trx0sys.h"
#include "fil0fil.h"
#include "thr0loc.h"
#include "rem0rec.h"
#include "btr0cur.h"
#include "btr0pcur.h"
#include "btr0btr.h"
52
#include "row0upd.h"
osku's avatar
osku committed
53 54 55 56 57 58
#include "sync0sync.h"
#include "dict0boot.h"
#include "fut0lst.h"
#include "lock0lock.h"
#include "log0recv.h"
#include "que0que.h"
59
#include "srv0start.h" /* srv_shutdown_state */
osku's avatar
osku committed
60

61
/*	STRUCTURE OF AN INSERT BUFFER RECORD
osku's avatar
osku committed
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93

In versions < 4.1.x:

1. The first field is the page number.
2. The second field is an array which stores type info for each subsequent
   field. We store the information which affects the ordering of records, and
   also the physical storage size of an SQL NULL value. E.g., for CHAR(10) it
   is 10 bytes.
3. Next we have the fields of the actual index record.

In versions >= 4.1.x:

Note that contrary to what we planned in the 1990's, there will only be one
insert buffer tree, and that is in the system tablespace of InnoDB.

1. The first field is the space id.
2. The second field is a one-byte marker (0) which differentiates records from
   the < 4.1.x storage format.
3. The third field is the page number.
4. The fourth field contains the type info, where we have also added 2 bytes to
   store the charset. In the compressed table format of 5.0.x we must add more
   information here so that we can build a dummy 'index' struct which 5.0.x
   can use in the binary search on the index page in the ibuf merge phase.
5. The rest of the fields contain the fields of the actual index record.

In versions >= 5.0.3:

The first byte of the fourth field is an additional marker (0) if the record
is in the compact format.  The presence of this marker can be detected by
looking at the length of the field modulo DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE.

The high-order bit of the character set field in the type info is the
94 95
"nullable" flag for the field.

96
In versions >= 5.5:
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115

The optional marker byte at the start of the fourth field is replaced by
mandatory 3 fields, totaling 4 bytes:

 1. 2 bytes: Counter field, used to sort records within a (space id, page
    no) in the order they were added. This is needed so that for example the
    sequence of operations "INSERT x, DEL MARK x, INSERT x" is handled
    correctly.

 2. 1 byte: Operation type (see ibuf_op_t).

 3. 1 byte: Flags. Currently only one flag exists, IBUF_REC_COMPACT.

To ensure older records, which do not have counters to enforce correct
sorting, are merged before any new records, ibuf_insert checks if we're
trying to insert to a position that contains old-style records, and if so,
refuses the insert. Thus, ibuf pages are gradually converted to the new
format as their corresponding buffer pool pages are read into memory.
*/
osku's avatar
osku committed
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186


/*	PREVENTING DEADLOCKS IN THE INSERT BUFFER SYSTEM

If an OS thread performs any operation that brings in disk pages from
non-system tablespaces into the buffer pool, or creates such a page there,
then the operation may have as a side effect an insert buffer index tree
compression. Thus, the tree latch of the insert buffer tree may be acquired
in the x-mode, and also the file space latch of the system tablespace may
be acquired in the x-mode.

Also, an insert to an index in a non-system tablespace can have the same
effect. How do we know this cannot lead to a deadlock of OS threads? There
is a problem with the i/o-handler threads: they break the latching order
because they own x-latches to pages which are on a lower level than the
insert buffer tree latch, its page latches, and the tablespace latch an
insert buffer operation can reserve.

The solution is the following: Let all the tree and page latches connected
with the insert buffer be later in the latching order than the fsp latch and
fsp page latches.

Insert buffer pages must be such that the insert buffer is never invoked
when these pages are accessed as this would result in a recursion violating
the latching order. We let a special i/o-handler thread take care of i/o to
the insert buffer pages and the ibuf bitmap pages, as well as the fsp bitmap
pages and the first inode page, which contains the inode of the ibuf tree: let
us call all these ibuf pages. To prevent deadlocks, we do not let a read-ahead
access both non-ibuf and ibuf pages.

Then an i/o-handler for the insert buffer never needs to access recursively the
insert buffer tree and thus obeys the latching order. On the other hand, other
i/o-handlers for other tablespaces may require access to the insert buffer,
but because all kinds of latches they need to access there are later in the
latching order, no violation of the latching order occurs in this case,
either.

A problem is how to grow and contract an insert buffer tree. As it is later
in the latching order than the fsp management, we have to reserve the fsp
latch first, before adding or removing pages from the insert buffer tree.
We let the insert buffer tree have its own file space management: a free
list of pages linked to the tree root. To prevent recursive using of the
insert buffer when adding pages to the tree, we must first load these pages
to memory, obtaining a latch on them, and only after that add them to the
free list of the insert buffer tree. More difficult is removing of pages
from the free list. If there is an excess of pages in the free list of the
ibuf tree, they might be needed if some thread reserves the fsp latch,
intending to allocate more file space. So we do the following: if a thread
reserves the fsp latch, we check the writer count field of the latch. If
this field has value 1, it means that the thread did not own the latch
before entering the fsp system, and the mtr of the thread contains no
modifications to the fsp pages. Now we are free to reserve the ibuf latch,
and check if there is an excess of pages in the free list. We can then, in a
separate mini-transaction, take them out of the free list and free them to
the fsp system.

To avoid deadlocks in the ibuf system, we divide file pages into three levels:

(1) non-ibuf pages,
(2) ibuf tree pages and the pages in the ibuf tree free list, and
(3) ibuf bitmap pages.

No OS thread is allowed to access higher level pages if it has latches to
lower level pages; even if the thread owns a B-tree latch it must not access
the B-tree non-leaf pages if it has latches on lower level pages. Read-ahead
is only allowed for level 1 and 2 pages. Dedicated i/o-handler threads handle
exclusively level 1 i/o. A dedicated i/o handler thread handles exclusively
level 2 i/o. However, if an OS thread does the i/o handling for itself, i.e.,
it uses synchronous aio, it can access any pages, as long as it obeys the
access order rules. */

187
/** Buffer pool size per the maximum insert buffer size */
osku's avatar
osku committed
188 189
#define IBUF_POOL_SIZE_PER_MAX_SIZE	2

190
/** Table name for the insert buffer. */
191 192
#define IBUF_TABLE_NAME		"SYS_IBUF_TABLE"

193 194 195
/** Operations that can currently be buffered. */
UNIV_INTERN ibuf_use_t	ibuf_use		= IBUF_USE_ALL;

196
#if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
197 198
/** Flag to control insert buffer debugging. */
UNIV_INTERN uint	ibuf_debug;
199 200
#endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */

201
/** The insert buffer control structure */
202
UNIV_INTERN ibuf_t*	ibuf			= NULL;
osku's avatar
osku committed
203

204
/** Counter for ibuf_should_try() */
205
UNIV_INTERN ulint	ibuf_flush_count	= 0;
osku's avatar
osku committed
206

207 208 209 210 211 212
#ifdef UNIV_PFS_MUTEX
UNIV_INTERN mysql_pfs_key_t	ibuf_pessimistic_insert_mutex_key;
UNIV_INTERN mysql_pfs_key_t	ibuf_mutex_key;
UNIV_INTERN mysql_pfs_key_t	ibuf_bitmap_mutex_key;
#endif /* UNIV_PFS_MUTEX */

213
#ifdef UNIV_IBUF_COUNT_DEBUG
214
/** Number of tablespaces in the ibuf_counts array */
215
#define IBUF_COUNT_N_SPACES	4
216
/** Number of pages within each tablespace in the ibuf_counts array */
217
#define IBUF_COUNT_N_PAGES	130000
osku's avatar
osku committed
218

219
/** Buffered entry counts for file pages, used in debugging */
220
static ulint	ibuf_counts[IBUF_COUNT_N_SPACES][IBUF_COUNT_N_PAGES];
osku's avatar
osku committed
221

222
/******************************************************************//**
Checks that the indexes to ibuf_counts[][] are within limits. Returns
silently when both indexes are in range; otherwise prints a diagnostic
message to stderr and invokes ut_error. */
UNIV_INLINE
void
ibuf_count_check(
/*=============*/
	ulint	space_id,	/*!< in: space identifier */
	ulint	page_no)	/*!< in: page number */
{
	if (space_id < IBUF_COUNT_N_SPACES && page_no < IBUF_COUNT_N_PAGES) {
		return;
	}

	/* The arguments are cast to unsigned long so that they match the
	%lu conversions even on platforms where ulint is not the same type
	as unsigned long. */
	fprintf(stderr,
		"InnoDB: UNIV_IBUF_COUNT_DEBUG limits space_id and page_no\n"
		"InnoDB: and breaks crash recovery.\n"
		"InnoDB: space_id=%lu, should be 0<=space_id<%lu\n"
		"InnoDB: page_no=%lu, should be 0<=page_no<%lu\n",
		(unsigned long) space_id,
		(unsigned long) IBUF_COUNT_N_SPACES,
		(unsigned long) page_no,
		(unsigned long) IBUF_COUNT_N_PAGES);
	ut_error;
}
244
#endif
osku's avatar
osku committed
245

246
/** @name Offsets to the per-page bits in the insert buffer bitmap */
247 248 249 250 251 252 253 254 255 256
/* @{ */
#define	IBUF_BITMAP_FREE	0	/*!< Bits indicating the
					amount of free space */
#define IBUF_BITMAP_BUFFERED	2	/*!< TRUE if there are buffered
					changes for the page */
#define IBUF_BITMAP_IBUF	3	/*!< TRUE if page is a part of
					the ibuf tree, excluding the
					root page, or is in the free
					list of the ibuf */
/* @} */
osku's avatar
osku committed
257

258 259 260 261
/* Various constants for checking the type of an ibuf record and extracting
data from it. For details, see the description of the record format at the
top of this file. */

262
/** @name Format of the fourth column of an insert buffer record
263
The fourth column in the MySQL 5.5 format contains an operation
264 265 266
type, counter, and some flags. */
/* @{ */
#define IBUF_REC_INFO_SIZE	4	/*!< Combined size of info fields at
267
					the beginning of the fourth field */
268 269 270
#if IBUF_REC_INFO_SIZE >= DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE
# error "IBUF_REC_INFO_SIZE >= DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE"
#endif
271 272

/* Offsets for the fields at the beginning of the fourth field */
273 274 275
#define IBUF_REC_OFFSET_COUNTER	0	/*!< Operation counter */
#define IBUF_REC_OFFSET_TYPE	2	/*!< Type of operation */
#define IBUF_REC_OFFSET_FLAGS	3	/*!< Additional flags */
276 277

/* Record flag masks */
278 279 280 281
#define IBUF_REC_COMPACT	0x1	/*!< Set in
					IBUF_REC_OFFSET_FLAGS if the
					user index is in COMPACT
					format or later */
282 283


284
/** The mutex used to block pessimistic inserts to ibuf trees */
osku's avatar
osku committed
285 286
static mutex_t	ibuf_pessimistic_insert_mutex;

287
/** The mutex protecting the insert buffer structs */
osku's avatar
osku committed
288 289
static mutex_t	ibuf_mutex;

290
/** The mutex protecting the insert buffer bitmaps */
osku's avatar
osku committed
291 292
static mutex_t	ibuf_bitmap_mutex;

293
/** The area in pages from which contract looks for page numbers for merge */
osku's avatar
osku committed
294 295
#define	IBUF_MERGE_AREA			8

296
/** Inside the merge area, pages which have at most 1 per this number less
osku's avatar
osku committed
297 298 299 300
buffered entries compared to maximum volume that can be buffered for a single
page are merged along with the page whose buffer became full */
#define IBUF_MERGE_THRESHOLD		4

301
/** In ibuf_contract at most this number of pages is read to memory in one
osku's avatar
osku committed
302 303 304
batch, in order to merge the entries for them in the insert buffer */
#define	IBUF_MAX_N_PAGES_MERGED		IBUF_MERGE_AREA

305
/** If the combined size of the ibuf trees exceeds ibuf->max_size by this
osku's avatar
osku committed
306 307 308 309
many pages, we start to contract it in connection to inserts there, using
non-synchronous contract */
#define IBUF_CONTRACT_ON_INSERT_NON_SYNC	0

310 311 312
/** If the combined size of the ibuf trees exceeds ibuf->max_size by this
many pages, we start to contract it in connection to inserts there, using
synchronous contract */
osku's avatar
osku committed
313 314
#define IBUF_CONTRACT_ON_INSERT_SYNC		5

315 316 317
/** If the combined size of the ibuf trees exceeds ibuf->max_size by
this many pages, we start to contract it using synchronous contract, but do
not insert */
osku's avatar
osku committed
318 319 320 321 322 323 324 325
#define IBUF_CONTRACT_DO_NOT_INSERT		10

/* TODO: how to cope with drop table if there are records in the insert
buffer for the indexes of the table? Is there actually any problem,
because ibuf merge is done to a page when it is read in, and it is
still physically like the index page even if the index would have been
dropped! So, there seems to be no problem. */

326
/******************************************************************//**
Marks the current OS thread as being inside an insert buffer routine by
setting the in-ibuf flag kept in its thread local storage. In debug
builds it is asserted that the flag was not already set. */
UNIV_INLINE
void
ibuf_enter(void)
/*============*/
{
	ibool*	in_ibuf = thr_local_get_in_ibuf_field();

	ut_ad(FALSE == *in_ibuf);

	*in_ibuf = TRUE;
}

343
/******************************************************************//**
Marks the current OS thread as having left an insert buffer routine by
clearing the in-ibuf flag kept in its thread local storage. In debug
builds it is asserted that the flag was set. */
UNIV_INLINE
void
ibuf_exit(void)
/*===========*/
{
	ibool*	in_ibuf = thr_local_get_in_ibuf_field();

	ut_ad(TRUE == *in_ibuf);

	*in_ibuf = FALSE;
}

360
/******************************************************************//**
osku's avatar
osku committed
361
Returns TRUE if the current OS thread is performing an insert buffer
362
routine.
363 364 365 366

For instance, a read-ahead of non-ibuf pages is forbidden by threads
that are executing an insert buffer routine.
@return TRUE if inside an insert buffer routine */
367
UNIV_INTERN
osku's avatar
osku committed
368 369 370 371 372 373 374
ibool
ibuf_inside(void)
/*=============*/
{
	return(*thr_local_get_in_ibuf_field());
}

375
/******************************************************************//**
376 377
Gets the ibuf header page and x-latches it.
@return	insert buffer header page */
osku's avatar
osku committed
378 379 380 381
static
page_t*
ibuf_header_page_get(
/*=================*/
382
	mtr_t*	mtr)	/*!< in: mtr */
osku's avatar
osku committed
383
{
384
	buf_block_t*	block;
osku's avatar
osku committed
385 386 387

	ut_ad(!ibuf_inside());

388 389
	block = buf_page_get(
		IBUF_SPACE_ID, 0, FSP_IBUF_HEADER_PAGE_NO, RW_X_LATCH, mtr);
390
	buf_block_dbg_add_level(block, SYNC_IBUF_HEADER);
osku's avatar
osku committed
391

392
	return(buf_block_get_frame(block));
osku's avatar
osku committed
393 394
}

395
/******************************************************************//**
396 397
Gets the root page and x-latches it.
@return	insert buffer tree root page */
osku's avatar
osku committed
398 399 400 401
static
page_t*
ibuf_tree_root_get(
/*===============*/
402
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
403
{
404
	buf_block_t*	block;
405
	page_t*		root;
osku's avatar
osku committed
406 407

	ut_ad(ibuf_inside());
408
	ut_ad(mutex_own(&ibuf_mutex));
osku's avatar
osku committed
409

410 411 412 413
	mtr_x_lock(dict_index_get_lock(ibuf->index), mtr);

	block = buf_page_get(
		IBUF_SPACE_ID, 0, FSP_IBUF_TREE_ROOT_PAGE_NO, RW_X_LATCH, mtr);
osku's avatar
osku committed
414

415
	buf_block_dbg_add_level(block, SYNC_TREE_NODE);
osku's avatar
osku committed
416

417 418 419 420 421 422 423
	root = buf_block_get_frame(block);

	ut_ad(page_get_space_id(root) == IBUF_SPACE_ID);
	ut_ad(page_get_page_no(root) == FSP_IBUF_TREE_ROOT_PAGE_NO);
	ut_ad(ibuf->empty == (page_get_n_recs(root) == 0));

	return(root);
osku's avatar
osku committed
424 425
}

426
#ifdef UNIV_IBUF_COUNT_DEBUG
427
/******************************************************************//**
428
Gets the ibuf count for a given page.
429 430
@return number of entries in the insert buffer currently buffered for
this page */
431
UNIV_INTERN
osku's avatar
osku committed
432 433 434
ulint
ibuf_count_get(
/*===========*/
435 436
	ulint	space,	/*!< in: space id */
	ulint	page_no)/*!< in: page number */
osku's avatar
osku committed
437
{
438
	ibuf_count_check(space, page_no);
osku's avatar
osku committed
439

440
	return(ibuf_counts[space][page_no]);
osku's avatar
osku committed
441 442
}

443
/******************************************************************//**
Stores the debug counter of buffered entries for a given page into
ibuf_counts[][]. The indexes are validated with ibuf_count_check() and
the value is asserted to be smaller than UNIV_PAGE_SIZE. */
static
void
ibuf_count_set(
/*===========*/
	ulint	space,	/*!< in: space id */
	ulint	page_no,/*!< in: page number */
	ulint	val)	/*!< in: value to set */
{
	ulint*	slot;

	ibuf_count_check(space, page_no);
	ut_a(val < UNIV_PAGE_SIZE);

	slot = &ibuf_counts[space][page_no];
	*slot = val;
}
#endif

460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480
/******************************************************************//**
Closes insert buffer and frees the data structures: the three insert
buffer mutexes are freed and their storage is zeroed, then the ibuf
control structure is freed and the global pointer is reset to NULL. */
UNIV_INTERN
void
ibuf_close(void)
/*============*/
{
	mutex_free(&ibuf_pessimistic_insert_mutex);
	memset(&ibuf_pessimistic_insert_mutex,
	       0x0, sizeof(ibuf_pessimistic_insert_mutex));

	mutex_free(&ibuf_mutex);
	memset(&ibuf_mutex, 0x0, sizeof(ibuf_mutex));

	mutex_free(&ibuf_bitmap_mutex);
	/* Use the size of the object actually being cleared, not that of
	a sibling mutex. */
	memset(&ibuf_bitmap_mutex, 0x0, sizeof(ibuf_bitmap_mutex));

	mem_free(ibuf);
	ibuf = NULL;
}

481
/******************************************************************//**
482 483
Updates the size information of the ibuf, assuming the segment size has not
changed. */
osku's avatar
osku committed
484 485
static
void
486 487
ibuf_size_update(
/*=============*/
488 489
	const page_t*	root,	/*!< in: ibuf tree root */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
490 491 492
{
	ut_ad(mutex_own(&ibuf_mutex));

493
	ibuf->free_list_len = flst_get_len(root + PAGE_HEADER
osku's avatar
osku committed
494 495
					   + PAGE_BTR_IBUF_FREE_LIST, mtr);

496
	ibuf->height = 1 + btr_page_get_level(root, mtr);
osku's avatar
osku committed
497

498
	/* the '1 +' is the ibuf header page */
499
	ibuf->size = ibuf->seg_size - (1 + ibuf->free_list_len);
osku's avatar
osku committed
500 501
}

502
/******************************************************************//**
503 504
Creates the insert buffer data structure at a database startup and initializes
the data structures for the insert buffer. */
505
UNIV_INTERN
506 507 508
void
ibuf_init_at_db_start(void)
/*=======================*/
osku's avatar
osku committed
509 510 511 512
{
	page_t*		root;
	mtr_t		mtr;
	dict_table_t*	table;
513
	mem_heap_t*	heap;
osku's avatar
osku committed
514 515
	dict_index_t*	index;
	ulint		n_used;
516
	page_t*		header_page;
517
	ulint		error;
518

519 520 521 522 523 524 525 526 527 528 529
	ibuf = mem_alloc(sizeof(ibuf_t));

	memset(ibuf, 0, sizeof(*ibuf));

	/* Note that also a pessimistic delete can sometimes make a B-tree
	grow in size, as the references on the upper levels of the tree can
	change */

	ibuf->max_size = buf_pool_get_curr_size() / UNIV_PAGE_SIZE
		/ IBUF_POOL_SIZE_PER_MAX_SIZE;

530 531
	mutex_create(ibuf_pessimistic_insert_mutex_key,
		     &ibuf_pessimistic_insert_mutex,
532
		     SYNC_IBUF_PESS_INSERT_MUTEX);
osku's avatar
osku committed
533

534 535
	mutex_create(ibuf_mutex_key,
		     &ibuf_mutex, SYNC_IBUF_MUTEX);
osku's avatar
osku committed
536

537 538
	mutex_create(ibuf_bitmap_mutex_key,
		     &ibuf_bitmap_mutex, SYNC_IBUF_BITMAP_MUTEX);
osku's avatar
osku committed
539 540 541 542 543

	mtr_start(&mtr);

	mutex_enter(&ibuf_mutex);

544
	mtr_x_lock(fil_space_get_latch(IBUF_SPACE_ID, NULL), &mtr);
osku's avatar
osku committed
545

546
	header_page = ibuf_header_page_get(&mtr);
osku's avatar
osku committed
547 548

	fseg_n_reserved_pages(header_page + IBUF_HEADER + IBUF_TREE_SEG_HEADER,
549
			      &n_used, &mtr);
osku's avatar
osku committed
550
	ibuf_enter();
551

osku's avatar
osku committed
552 553
	ut_ad(n_used >= 2);

554
	ibuf->seg_size = n_used;
555

556
	{
557 558 559 560
		buf_block_t*	block;

		block = buf_page_get(
			IBUF_SPACE_ID, 0, FSP_IBUF_TREE_ROOT_PAGE_NO,
561
			RW_X_LATCH, &mtr);
562
		buf_block_dbg_add_level(block, SYNC_TREE_NODE);
563

564 565
		root = buf_block_get_frame(block);
	}
osku's avatar
osku committed
566

567
	ibuf_size_update(root, &mtr);
osku's avatar
osku committed
568 569
	mutex_exit(&ibuf_mutex);

570
	ibuf->empty = (page_get_n_recs(root) == 0);
osku's avatar
osku committed
571 572 573 574
	mtr_commit(&mtr);

	ibuf_exit();

575 576
	heap = mem_heap_create(450);

577 578
	/* Use old-style record format for the insert buffer. */
	table = dict_mem_table_create(IBUF_TABLE_NAME, IBUF_SPACE_ID, 1, 0);
osku's avatar
osku committed
579

580
	dict_mem_table_add_col(table, heap, "DUMMY_COLUMN", DATA_BINARY, 0, 0);
osku's avatar
osku committed
581

582
	table->id = DICT_IBUF_ID_MIN + IBUF_SPACE_ID;
osku's avatar
osku committed
583

584 585
	dict_table_add_to_cache(table, heap);
	mem_heap_free(heap);
osku's avatar
osku committed
586

587
	index = dict_mem_index_create(
588 589
		IBUF_TABLE_NAME, "CLUST_IND",
		IBUF_SPACE_ID, DICT_CLUSTERED | DICT_UNIVERSAL | DICT_IBUF, 1);
osku's avatar
osku committed
590

591
	dict_mem_index_add_field(index, "DUMMY_COLUMN", 0);
osku's avatar
osku committed
592

593
	index->id = DICT_IBUF_ID_MIN + IBUF_SPACE_ID;
osku's avatar
osku committed
594

595 596 597
	error = dict_index_add_to_cache(table, index,
					FSP_IBUF_TREE_ROOT_PAGE_NO, FALSE);
	ut_a(error == DB_SUCCESS);
osku's avatar
osku committed
598

599
	ibuf->index = dict_table_get_first_index(table);
osku's avatar
osku committed
600
}
601
#endif /* !UNIV_HOTBACKUP */
602
/*********************************************************************//**
osku's avatar
osku committed
603
Initializes an ibuf bitmap page. */
604
UNIV_INTERN
osku's avatar
osku committed
605 606 607
void
ibuf_bitmap_page_init(
/*==================*/
608 609
	buf_block_t*	block,	/*!< in: bitmap page */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
610
{
611
	page_t*	page;
osku's avatar
osku committed
612
	ulint	byte_offset;
613
	ulint	zip_size = buf_block_get_zip_size(block);
osku's avatar
osku committed
614

615
	ut_a(ut_is_2pow(zip_size));
osku's avatar
osku committed
616

617
	page = buf_block_get_frame(block);
618
	fil_page_set_type(page, FIL_PAGE_IBUF_BITMAP);
osku's avatar
osku committed
619

620
	/* Write all zeros to the bitmap */
osku's avatar
osku committed
621

622
	if (!zip_size) {
623 624
		byte_offset = UT_BITS_IN_BYTES(UNIV_PAGE_SIZE
					       * IBUF_BITS_PER_PAGE);
625
	} else {
626
		byte_offset = UT_BITS_IN_BYTES(zip_size * IBUF_BITS_PER_PAGE);
627
	}
osku's avatar
osku committed
628

629 630 631
	memset(page + IBUF_BITMAP, 0, byte_offset);

	/* The remaining area (up to the page trailer) is uninitialized. */
osku's avatar
osku committed
632

633
#ifndef UNIV_HOTBACKUP
osku's avatar
osku committed
634
	mlog_write_initial_log_record(page, MLOG_IBUF_BITMAP_INIT, mtr);
635
#endif /* !UNIV_HOTBACKUP */
osku's avatar
osku committed
636 637
}

638
/*********************************************************************//**
639 640
Parses a redo log record of an ibuf bitmap page init.
@return	end of log record or NULL */
641
UNIV_INTERN
osku's avatar
osku committed
642 643 644
byte*
ibuf_parse_bitmap_init(
/*===================*/
645 646 647 648
	byte*		ptr,	/*!< in: buffer */
	byte*		end_ptr __attribute__((unused)), /*!< in: buffer end */
	buf_block_t*	block,	/*!< in: block or NULL */
	mtr_t*		mtr)	/*!< in: mtr or NULL */
osku's avatar
osku committed
649 650 651
{
	ut_ad(ptr && end_ptr);

652 653
	if (block) {
		ibuf_bitmap_page_init(block, mtr);
osku's avatar
osku committed
654 655 656 657
	}

	return(ptr);
}
658
#ifndef UNIV_HOTBACKUP
659
/********************************************************************//**
660 661
Gets the desired bits for a given page from a bitmap page.
@return	value of bits */
osku's avatar
osku committed
662 663 664 665
UNIV_INLINE
ulint
ibuf_bitmap_page_get_bits(
/*======================*/
666 667 668
	const page_t*	page,	/*!< in: bitmap page */
	ulint		page_no,/*!< in: page whose bits to get */
	ulint		zip_size,/*!< in: compressed page size in bytes;
669
				0 for uncompressed pages */
670
	ulint		bit,	/*!< in: IBUF_BITMAP_FREE,
671
				IBUF_BITMAP_BUFFERED, ... */
672
	mtr_t*		mtr __attribute__((unused)))
673
				/*!< in: mtr containing an
674
				x-latch to the bitmap page */
osku's avatar
osku committed
675 676 677 678 679 680 681
{
	ulint	byte_offset;
	ulint	bit_offset;
	ulint	map_byte;
	ulint	value;

	ut_ad(bit < IBUF_BITS_PER_PAGE);
682 683 684
#if IBUF_BITS_PER_PAGE % 2
# error "IBUF_BITS_PER_PAGE % 2 != 0"
#endif
685
	ut_ad(ut_is_2pow(zip_size));
686
	ut_ad(mtr_memo_contains_page(mtr, page, MTR_MEMO_PAGE_X_FIX));
osku's avatar
osku committed
687

688 689
	if (!zip_size) {
		bit_offset = (page_no % UNIV_PAGE_SIZE) * IBUF_BITS_PER_PAGE
690
			+ bit;
691 692
	} else {
		bit_offset = (page_no & (zip_size - 1)) * IBUF_BITS_PER_PAGE
693
			+ bit;
694
	}
osku's avatar
osku committed
695 696 697 698 699 700 701 702 703 704 705 706

	byte_offset = bit_offset / 8;
	bit_offset = bit_offset % 8;

	ut_ad(byte_offset + IBUF_BITMAP < UNIV_PAGE_SIZE);

	map_byte = mach_read_from_1(page + IBUF_BITMAP + byte_offset);

	value = ut_bit_get_nth(map_byte, bit_offset);

	if (bit == IBUF_BITMAP_FREE) {
		ut_ad(bit_offset + 1 < 8);
707

osku's avatar
osku committed
708 709 710 711 712 713
		value = value * 2 + ut_bit_get_nth(map_byte, bit_offset + 1);
	}

	return(value);
}

714
/********************************************************************//**
osku's avatar
osku committed
715 716 717 718 719
Sets the desired bit for a given page in a bitmap page. */
static
void
ibuf_bitmap_page_set_bits(
/*======================*/
720 721 722
	page_t*	page,	/*!< in: bitmap page */
	ulint	page_no,/*!< in: page whose bits to set */
	ulint	zip_size,/*!< in: compressed page size in bytes;
723
			0 for uncompressed pages */
724 725 726
	ulint	bit,	/*!< in: IBUF_BITMAP_FREE, IBUF_BITMAP_BUFFERED, ... */
	ulint	val,	/*!< in: value to set */
	mtr_t*	mtr)	/*!< in: mtr containing an x-latch to the bitmap page */
osku's avatar
osku committed
727 728 729 730 731 732
{
	ulint	byte_offset;
	ulint	bit_offset;
	ulint	map_byte;

	ut_ad(bit < IBUF_BITS_PER_PAGE);
733 734 735
#if IBUF_BITS_PER_PAGE % 2
# error "IBUF_BITS_PER_PAGE % 2 != 0"
#endif
736
	ut_ad(ut_is_2pow(zip_size));
737
	ut_ad(mtr_memo_contains_page(mtr, page, MTR_MEMO_PAGE_X_FIX));
738
#ifdef UNIV_IBUF_COUNT_DEBUG
osku's avatar
osku committed
739
	ut_a((bit != IBUF_BITMAP_BUFFERED) || (val != FALSE)
740
	     || (0 == ibuf_count_get(page_get_space_id(page),
741
				     page_no)));
osku's avatar
osku committed
742
#endif
743 744
	if (!zip_size) {
		bit_offset = (page_no % UNIV_PAGE_SIZE) * IBUF_BITS_PER_PAGE
745
			+ bit;
746 747
	} else {
		bit_offset = (page_no & (zip_size - 1)) * IBUF_BITS_PER_PAGE
748
			+ bit;
749
	}
osku's avatar
osku committed
750 751 752 753 754 755 756 757 758 759 760

	byte_offset = bit_offset / 8;
	bit_offset = bit_offset % 8;

	ut_ad(byte_offset + IBUF_BITMAP < UNIV_PAGE_SIZE);

	map_byte = mach_read_from_1(page + IBUF_BITMAP + byte_offset);

	if (bit == IBUF_BITMAP_FREE) {
		ut_ad(bit_offset + 1 < 8);
		ut_ad(val <= 3);
761

osku's avatar
osku committed
762 763 764 765 766 767
		map_byte = ut_bit_set_nth(map_byte, bit_offset, val / 2);
		map_byte = ut_bit_set_nth(map_byte, bit_offset + 1, val % 2);
	} else {
		ut_ad(val <= 1);
		map_byte = ut_bit_set_nth(map_byte, bit_offset, val);
	}
768

osku's avatar
osku committed
769
	mlog_write_ulint(page + IBUF_BITMAP + byte_offset, map_byte,
770
			 MLOG_1BYTE, mtr);
osku's avatar
osku committed
771 772
}

773
/********************************************************************//**
774 775
Calculates the bitmap page number for a given page number.
@return	the bitmap page number where the file page is mapped */
osku's avatar
osku committed
776 777 778 779
UNIV_INLINE
ulint
ibuf_bitmap_page_no_calc(
/*=====================*/
780
	ulint	zip_size,	/*!< in: compressed page size in bytes;
781
				0 for uncompressed pages */
782
	ulint	page_no)	/*!< in: tablespace page number */
osku's avatar
osku committed
783
{
784
	ut_ad(ut_is_2pow(zip_size));
785 786 787

	if (!zip_size) {
		return(FSP_IBUF_BITMAP_OFFSET
788
		       + (page_no & ~(UNIV_PAGE_SIZE - 1)));
789 790
	} else {
		return(FSP_IBUF_BITMAP_OFFSET
791
		       + (page_no & ~(zip_size - 1)));
792
	}
osku's avatar
osku committed
793 794
}

795
/********************************************************************//**
osku's avatar
osku committed
796
Gets the ibuf bitmap page where the bits describing a given file page are
797
stored.
798 799 800
@return bitmap page where the file page is mapped, that is, the bitmap
page containing the descriptor bits for the file page; the bitmap page
is x-latched */
osku's avatar
osku committed
801 802
static
page_t*
803 804 805 806 807 808 809 810 811
ibuf_bitmap_get_map_page_func(
/*==========================*/
	ulint		space,	/*!< in: space id of the file page */
	ulint		page_no,/*!< in: page number of the file page */
	ulint		zip_size,/*!< in: compressed page size in bytes;
				0 for uncompressed pages */
	const char*	file,	/*!< in: file name */
	ulint		line,	/*!< in: line where called */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
812
{
813
	buf_block_t*	block;
osku's avatar
osku committed
814

815 816 817 818
	block = buf_page_get_gen(space, zip_size,
				 ibuf_bitmap_page_no_calc(zip_size, page_no),
				 RW_X_LATCH, NULL, BUF_GET,
				 file, line, mtr);
819
	buf_block_dbg_add_level(block, SYNC_IBUF_BITMAP);
osku's avatar
osku committed
820

821
	return(buf_block_get_frame(block));
osku's avatar
osku committed
822 823
}

824 825 826 827 828 829 830 831 832 833 834 835 836 837
/********************************************************************//**
Gets the ibuf bitmap page where the bits describing a given file page are
stored. This is a wrapper around ibuf_bitmap_get_map_page_func() that
passes the file name and line number of the call site.
@param space	in: space id of the file page
@param page_no	in: page number of the file page
@param zip_size	in: compressed page size in bytes; 0 for uncompressed pages
@param mtr	in: mini-transaction
@return bitmap page where the file page is mapped, that is, the bitmap
page containing the descriptor bits for the file page; the bitmap page
is x-latched */
#define ibuf_bitmap_get_map_page(space, page_no, zip_size, mtr)		\
	ibuf_bitmap_get_map_page_func(space, page_no, zip_size,		\
				      __FILE__, __LINE__, mtr)

838
/************************************************************************//**
osku's avatar
osku committed
839 840 841 842 843 844 845 846
Sets the free bits of the page in the ibuf bitmap. This is done in a separate
mini-transaction, hence this operation does not restrict further work to only
ibuf bitmap operations, which would result if the latch to the bitmap page
were kept. */
UNIV_INLINE
void
ibuf_set_free_bits_low(
/*===================*/
847
	ulint			zip_size,/*!< in: compressed page size in bytes;
848
					0 for uncompressed pages */
849
	const buf_block_t*	block,	/*!< in: index page; free bits are set if
850 851
					the index is non-clustered and page
					level is 0 */
852 853
	ulint			val,	/*!< in: value to set: < 4 */
	mtr_t*			mtr)	/*!< in/out: mtr */
osku's avatar
osku committed
854 855
{
	page_t*	bitmap_page;
856 857
	ulint	space;
	ulint	page_no;
osku's avatar
osku committed
858

859
	if (!page_is_leaf(buf_block_get_frame(block))) {
osku's avatar
osku committed
860 861 862 863

		return;
	}

864 865
	space = buf_block_get_space(block);
	page_no = buf_block_get_page_no(block);
866
	bitmap_page = ibuf_bitmap_get_map_page(space, page_no, zip_size, mtr);
osku's avatar
osku committed
867
#ifdef UNIV_IBUF_DEBUG
868 869
# if 0
	fprintf(stderr,
870 871 872
		"Setting space %lu page %lu free bits to %lu should be %lu\n",
		space, page_no, val,
		ibuf_index_page_calc_free(zip_size, block));
873
# endif
874

875
	ut_a(val <= ibuf_index_page_calc_free(zip_size, block));
876
#endif /* UNIV_IBUF_DEBUG */
877
	ibuf_bitmap_page_set_bits(bitmap_page, page_no, zip_size,
878
				  IBUF_BITMAP_FREE, val, mtr);
osku's avatar
osku committed
879 880
}

881
/************************************************************************//**
osku's avatar
osku committed
882 883 884 885
Sets the free bit of the page in the ibuf bitmap. This is done in a separate
mini-transaction, hence this operation does not restrict further work to only
ibuf bitmap operations, which would result if the latch to the bitmap page
were kept. */
886
UNIV_INTERN
osku's avatar
osku committed
887
void
888 889
ibuf_set_free_bits_func(
/*====================*/
890
	buf_block_t*	block,	/*!< in: index page of a non-clustered index;
891
				free bit is reset if page level is 0 */
892
#ifdef UNIV_IBUF_DEBUG
893
	ulint		max_val,/*!< in: ULINT_UNDEFINED or a maximum
894 895
				value which the bits must have before
				setting; this is for debugging */
896
#endif /* UNIV_IBUF_DEBUG */
897
	ulint		val)	/*!< in: value to set: < 4 */
osku's avatar
osku committed
898 899
{
	mtr_t	mtr;
900
	page_t*	page;
osku's avatar
osku committed
901
	page_t*	bitmap_page;
902 903 904
	ulint	space;
	ulint	page_no;
	ulint	zip_size;
osku's avatar
osku committed
905

906 907
	page = buf_block_get_frame(block);

marko's avatar
marko committed
908
	if (!page_is_leaf(page)) {
osku's avatar
osku committed
909 910 911 912 913

		return;
	}

	mtr_start(&mtr);
914

915 916 917
	space = buf_block_get_space(block);
	page_no = buf_block_get_page_no(block);
	zip_size = buf_block_get_zip_size(block);
918
	bitmap_page = ibuf_bitmap_get_map_page(space, page_no, zip_size, &mtr);
osku's avatar
osku committed
919 920

#ifdef UNIV_IBUF_DEBUG
921
	if (max_val != ULINT_UNDEFINED) {
osku's avatar
osku committed
922 923
		ulint	old_val;

924 925 926
		old_val = ibuf_bitmap_page_get_bits(
			bitmap_page, page_no, zip_size,
			IBUF_BITMAP_FREE, &mtr);
927
# if 0
osku's avatar
osku committed
928
		if (old_val != max_val) {
929 930
			fprintf(stderr,
				"Ibuf: page %lu old val %lu max val %lu\n",
931
				page_get_page_no(page),
932
				old_val, max_val);
osku's avatar
osku committed
933
		}
934
# endif
osku's avatar
osku committed
935 936 937

		ut_a(old_val <= max_val);
	}
938 939
# if 0
	fprintf(stderr, "Setting page no %lu free bits to %lu should be %lu\n",
940
		page_get_page_no(page), val,
941
		ibuf_index_page_calc_free(zip_size, block));
942
# endif
osku's avatar
osku committed
943

944
	ut_a(val <= ibuf_index_page_calc_free(zip_size, block));
945
#endif /* UNIV_IBUF_DEBUG */
946
	ibuf_bitmap_page_set_bits(bitmap_page, page_no, zip_size,
947
				  IBUF_BITMAP_FREE, val, &mtr);
osku's avatar
osku committed
948 949 950
	mtr_commit(&mtr);
}

951
/************************************************************************//**
osku's avatar
osku committed
952
Resets the free bits of the page in the ibuf bitmap. This is done in a
953 954 955 956 957 958 959
separate mini-transaction, hence this operation does not restrict
further work to only ibuf bitmap operations, which would result if the
latch to the bitmap page were kept.  NOTE: The free bits in the insert
buffer bitmap must never exceed the free space on a page.  It is safe
to decrement or reset the bits in the bitmap in a mini-transaction
that is committed before the mini-transaction that affects the free
space. */
960
UNIV_INTERN
osku's avatar
osku committed
961
void
962 963
ibuf_reset_free_bits(
/*=================*/
964
	buf_block_t*	block)	/*!< in: index page; free bits are set to 0
965 966
				if the index is a non-clustered
				non-unique, and page level is 0 */
osku's avatar
osku committed
967
{
968
	ibuf_set_free_bits(block, 0, ULINT_UNDEFINED);
osku's avatar
osku committed
969 970
}

971
/**********************************************************************//**
972 973 974 975 976 977 978
Updates the free bits for an uncompressed page to reflect the present
state.  Does this in the mtr given, which means that the latching
order rules virtually prevent any further operations for this OS
thread until mtr is committed.  NOTE: The free bits in the insert
buffer bitmap must never exceed the free space on a page.  It is safe
to set the free bits in the same mini-transaction that updated the
page. */
979
UNIV_INTERN
osku's avatar
osku committed
980 981 982
void
ibuf_update_free_bits_low(
/*======================*/
983 984
	const buf_block_t*	block,		/*!< in: index page */
	ulint			max_ins_size,	/*!< in: value of
985 986 987 988
						maximum insert size
						with reorganize before
						the latest operation
						performed to the page */
989
	mtr_t*			mtr)		/*!< in/out: mtr */
osku's avatar
osku committed
990
{
991
	ulint	before;
992 993 994
	ulint	after;

	ut_a(!buf_block_get_page_zip(block));
995 996

	before = ibuf_index_page_calc_free_bits(0, max_ins_size);
osku's avatar
osku committed
997

998
	after = ibuf_index_page_calc_free(0, block);
osku's avatar
osku committed
999

1000 1001 1002 1003
	/* This approach cannot be used on compressed pages, since the
	computed value of "before" often does not match the current
	state of the bitmap.  This is because the free space may
	increase or decrease when a compressed page is reorganized. */
osku's avatar
osku committed
1004
	if (before != after) {
1005
		ibuf_set_free_bits_low(0, block, after, mtr);
osku's avatar
osku committed
1006 1007 1008
	}
}

1009
/**********************************************************************//**
1010 1011 1012 1013 1014 1015 1016
Updates the free bits for a compressed page to reflect the present
state.  Does this in the mtr given, which means that the latching
order rules virtually prevent any further operations for this OS
thread until mtr is committed.  NOTE: The free bits in the insert
buffer bitmap must never exceed the free space on a page.  It is safe
to set the free bits in the same mini-transaction that updated the
page. */
1017
UNIV_INTERN
1018 1019 1020
void
ibuf_update_free_bits_zip(
/*======================*/
1021 1022
	buf_block_t*	block,	/*!< in/out: index page */
	mtr_t*		mtr)	/*!< in/out: mtr */
1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039
{
	page_t*	bitmap_page;
	ulint	space;
	ulint	page_no;
	ulint	zip_size;
	ulint	after;

	space = buf_block_get_space(block);
	page_no = buf_block_get_page_no(block);
	zip_size = buf_block_get_zip_size(block);

	ut_a(page_is_leaf(buf_block_get_frame(block)));
	ut_a(zip_size);

	bitmap_page = ibuf_bitmap_get_map_page(space, page_no, zip_size, mtr);

	after = ibuf_index_page_calc_free_zip(zip_size, block);
1040 1041 1042 1043 1044 1045 1046 1047 1048 1049

	if (after == 0) {
		/* We move the page to the front of the buffer pool LRU list:
		the purpose of this is to prevent those pages to which we
		cannot make inserts using the insert buffer from slipping
		out of the buffer pool */

		buf_page_make_young(&block->page);
	}

1050 1051 1052 1053
	ibuf_bitmap_page_set_bits(bitmap_page, page_no, zip_size,
				  IBUF_BITMAP_FREE, after, mtr);
}

1054
/**********************************************************************//**
1055 1056 1057 1058 1059 1060
Updates the free bits for the two pages to reflect the present state.
Does this in the mtr given, which means that the latching order rules
virtually prevent any further operations until mtr is committed.
NOTE: The free bits in the insert buffer bitmap must never exceed the
free space on a page.  It is safe to set the free bits in the same
mini-transaction that updated the pages. */
1061
UNIV_INTERN
osku's avatar
osku committed
1062 1063 1064
void
ibuf_update_free_bits_for_two_pages_low(
/*====================================*/
1065
	ulint		zip_size,/*!< in: compressed page size in bytes;
1066
				0 for uncompressed pages */
1067 1068 1069
	buf_block_t*	block1,	/*!< in: index page */
	buf_block_t*	block2,	/*!< in: index page */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
1070 1071 1072 1073 1074 1075 1076 1077
{
	ulint	state;

	/* As we have to x-latch two random bitmap pages, we have to acquire
	the bitmap mutex to prevent a deadlock with a similar operation
	performed by another OS thread. */

	mutex_enter(&ibuf_bitmap_mutex);
1078

1079
	state = ibuf_index_page_calc_free(zip_size, block1);
osku's avatar
osku committed
1080

1081
	ibuf_set_free_bits_low(zip_size, block1, state, mtr);
osku's avatar
osku committed
1082

1083
	state = ibuf_index_page_calc_free(zip_size, block2);
osku's avatar
osku committed
1084

1085
	ibuf_set_free_bits_low(zip_size, block2, state, mtr);
osku's avatar
osku committed
1086 1087 1088 1089

	mutex_exit(&ibuf_bitmap_mutex);
}

1090
/**********************************************************************//**
1091 1092
Returns TRUE if the page is one of the fixed address ibuf pages.
@return	TRUE if a fixed address ibuf i/o page */
osku's avatar
osku committed
1093 1094 1095 1096
UNIV_INLINE
ibool
ibuf_fixed_addr_page(
/*=================*/
1097 1098
	ulint	space,	/*!< in: space id */
	ulint	zip_size,/*!< in: compressed page size in bytes;
1099
			0 for uncompressed pages */
1100
	ulint	page_no)/*!< in: page number */
osku's avatar
osku committed
1101
{
1102
	return((space == IBUF_SPACE_ID && page_no == IBUF_TREE_ROOT_PAGE_NO)
1103
	       || ibuf_bitmap_page(zip_size, page_no));
osku's avatar
osku committed
1104 1105
}

1106
/***********************************************************************//**
1107
Checks if a page is a level 2 or 3 page in the ibuf hierarchy of pages.
1108 1109
Must not be called when recv_no_ibuf_operations==TRUE.
@return	TRUE if level 2 or level 3 page */
1110
UNIV_INTERN
osku's avatar
osku committed
1111 1112 1113
ibool
ibuf_page(
/*======*/
1114 1115 1116 1117
	ulint	space,	/*!< in: space id */
	ulint	zip_size,/*!< in: compressed page size in bytes, or 0 */
	ulint	page_no,/*!< in: page number */
	mtr_t*	mtr)	/*!< in: mtr which will contain an x-latch to the
1118 1119 1120
			bitmap page if the page is not one of the fixed
			address ibuf pages, or NULL, in which case a new
			transaction is created. */
osku's avatar
osku committed
1121 1122
{
	ibool	ret;
1123
	mtr_t	local_mtr;
1124
	page_t*	bitmap_page;
osku's avatar
osku committed
1125

1126 1127
	ut_ad(!recv_no_ibuf_operations);

1128
	if (ibuf_fixed_addr_page(space, zip_size, page_no)) {
osku's avatar
osku committed
1129 1130

		return(TRUE);
1131
	} else if (space != IBUF_SPACE_ID) {
osku's avatar
osku committed
1132 1133 1134 1135

		return(FALSE);
	}

1136
	ut_ad(fil_space_get_type(IBUF_SPACE_ID) == FIL_TABLESPACE);
osku's avatar
osku committed
1137

1138 1139
	if (mtr == NULL) {
		mtr = &local_mtr;
1140 1141
		mtr_start(mtr);
	}
osku's avatar
osku committed
1142

1143
	bitmap_page = ibuf_bitmap_get_map_page(space, page_no, zip_size, mtr);
osku's avatar
osku committed
1144

1145
	ret = ibuf_bitmap_page_get_bits(bitmap_page, page_no, zip_size,
1146
					IBUF_BITMAP_IBUF, mtr);
osku's avatar
osku committed
1147

1148
	if (mtr == &local_mtr) {
1149
		mtr_commit(mtr);
osku's avatar
osku committed
1150 1151
	}

1152
	return(ret);
osku's avatar
osku committed
1153 1154
}

1155
/********************************************************************//**
1156 1157
Returns the page number field of an ibuf record.
@return	page number */
osku's avatar
osku committed
1158 1159 1160 1161
static
ulint
ibuf_rec_get_page_no(
/*=================*/
1162
	const rec_t*	rec)	/*!< in: ibuf record */
osku's avatar
osku committed
1163
{
1164 1165
	const byte*	field;
	ulint		len;
osku's avatar
osku committed
1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188

	ut_ad(ibuf_inside());
	ut_ad(rec_get_n_fields_old(rec) > 2);

	field = rec_get_nth_field_old(rec, 1, &len);

	if (len == 1) {
		/* This is of the >= 4.1.x record format */
		ut_a(trx_sys_multiple_tablespace_format);

		field = rec_get_nth_field_old(rec, 2, &len);
	} else {
		ut_a(trx_doublewrite_must_reset_space_ids);
		ut_a(!trx_sys_multiple_tablespace_format);

		field = rec_get_nth_field_old(rec, 0, &len);
	}

	ut_a(len == 4);

	return(mach_read_from_4(field));
}

1189
/********************************************************************//**
osku's avatar
osku committed
1190
Returns the space id field of an ibuf record. For < 4.1.x format records
1191 1192
returns 0.
@return	space id */
osku's avatar
osku committed
1193 1194 1195 1196
static
ulint
ibuf_rec_get_space(
/*===============*/
1197
	const rec_t*	rec)	/*!< in: ibuf record */
osku's avatar
osku committed
1198
{
1199 1200
	const byte*	field;
	ulint		len;
osku's avatar
osku committed
1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222

	ut_ad(ibuf_inside());
	ut_ad(rec_get_n_fields_old(rec) > 2);

	field = rec_get_nth_field_old(rec, 1, &len);

	if (len == 1) {
		/* This is of the >= 4.1.x record format */

		ut_a(trx_sys_multiple_tablespace_format);
		field = rec_get_nth_field_old(rec, 0, &len);
		ut_a(len == 4);

		return(mach_read_from_4(field));
	}

	ut_a(trx_doublewrite_must_reset_space_ids);
	ut_a(!trx_sys_multiple_tablespace_format);

	return(0);
}

1223
/****************************************************************//**
1224
Get various information about an ibuf record in >= 4.1.x format. */
1225 1226 1227 1228
static
void
ibuf_rec_get_info(
/*==============*/
1229 1230 1231 1232
	const rec_t*	rec,		/*!< in: ibuf record */
	ibuf_op_t*	op,		/*!< out: operation type, or NULL */
	ibool*		comp,		/*!< out: compact flag, or NULL */
	ulint*		info_len,	/*!< out: length of info fields at the
1233 1234
					start of the fourth field, or
					NULL */
1235
	ulint*		counter)	/*!< in: counter value, or NULL */
1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252
{
	const byte*	types;
	ulint		fields;
	ulint		len;

	/* Local variables to shadow arguments. */
	ibuf_op_t	op_local;
	ibool		comp_local;
	ulint		info_len_local;
	ulint		counter_local;

	ut_ad(ibuf_inside());
	fields = rec_get_n_fields_old(rec);
	ut_a(fields > 4);

	types = rec_get_nth_field_old(rec, 3, &len);

marko's avatar
marko committed
1253
	info_len_local = len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE;
1254

marko's avatar
marko committed
1255 1256 1257
	switch (info_len_local) {
	case 0:
	case 1:
1258
		op_local = IBUF_OP_INSERT;
marko's avatar
marko committed
1259
		comp_local = info_len_local;
1260
		ut_ad(!counter);
marko's avatar
marko committed
1261 1262
		counter_local = ULINT_UNDEFINED;
		break;
1263

marko's avatar
marko committed
1264
	case IBUF_REC_INFO_SIZE:
1265 1266 1267 1268
		op_local = (ibuf_op_t)types[IBUF_REC_OFFSET_TYPE];
		comp_local = types[IBUF_REC_OFFSET_FLAGS] & IBUF_REC_COMPACT;
		counter_local = mach_read_from_2(
			types + IBUF_REC_OFFSET_COUNTER);
marko's avatar
marko committed
1269
		break;
1270

marko's avatar
marko committed
1271
	default:
1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295
		ut_error;
	}

	ut_a(op_local < IBUF_OP_COUNT);
	ut_a((len - info_len_local) ==
	     (fields - 4) * DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE);

	if (op) {
		*op = op_local;
	}

	if (comp) {
		*comp = comp_local;
	}

	if (info_len) {
		*info_len = info_len_local;
	}

	if (counter) {
		*counter = counter_local;
	}
}

1296
/****************************************************************//**
1297 1298
Returns the operation type field of an ibuf record.
@return	operation type */
1299 1300 1301 1302
static
ibuf_op_t
ibuf_rec_get_op_type(
/*=================*/
1303
	const rec_t*	rec)	/*!< in: ibuf record */
1304 1305 1306 1307 1308 1309
{
	ulint		len;

	ut_ad(ibuf_inside());
	ut_ad(rec_get_n_fields_old(rec) > 2);

1310
	(void) rec_get_nth_field_old(rec, 1, &len);
1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324

	if (len > 1) {
		/* This is a < 4.1.x format record */

		return(IBUF_OP_INSERT);
	} else {
		ibuf_op_t	op;

		ibuf_rec_get_info(rec, &op, NULL, NULL, NULL);

		return(op);
	}
}

1325
/****************************************************************//**
1326
Read the first two bytes from a record's fourth field (counter field in new
1327
records; something else in older records).
1328 1329
@return "counter" field, or ULINT_UNDEFINED if for some reason it
can't be read */
1330
UNIV_INTERN
1331
ulint
1332 1333
ibuf_rec_get_counter(
/*=================*/
1334
	const rec_t*	rec)	/*!< in: ibuf record */
1335
{
1336
	const byte*	ptr;
1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354
	ulint		len;

	if (rec_get_n_fields_old(rec) < 4) {

		return(ULINT_UNDEFINED);
	}

	ptr = rec_get_nth_field_old(rec, 3, &len);

	if (len >= 2) {

		return(mach_read_from_2(ptr));
	} else {

		return(ULINT_UNDEFINED);
	}
}

1355
/****************************************************************//**
1356 1357 1358 1359 1360 1361
Add accumulated operation counts to a permanent array. Both arrays must be
of size IBUF_OP_COUNT. */
static
void
ibuf_add_ops(
/*=========*/
1362 1363
	ulint*		arr,	/*!< in/out: array to modify */
	const ulint*	ops)	/*!< in: operation counts */
1364 1365

{
1366 1367
	ulint	i;

1368 1369 1370 1371
#ifndef HAVE_ATOMIC_BUILTINS
	ut_ad(mutex_own(&ibuf_mutex));
#endif /* !HAVE_ATOMIC_BUILTINS */

1372
	for (i = 0; i < IBUF_OP_COUNT; i++) {
1373 1374 1375
#ifdef HAVE_ATOMIC_BUILTINS
		os_atomic_increment_ulint(&arr[i], ops[i]);
#else /* HAVE_ATOMIC_BUILTINS */
1376
		arr[i] += ops[i];
1377
#endif /* HAVE_ATOMIC_BUILTINS */
1378 1379 1380
	}
}

1381
/****************************************************************//**
1382 1383 1384 1385
Print operation counts. The array must be of size IBUF_OP_COUNT. */
static
void
ibuf_print_ops(
1386
/*===========*/
1387 1388
	const ulint*	ops,	/*!< in: operation counts */
	FILE*		file)	/*!< in: file where to print */
1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402
{
	static const char* op_names[] = {
		"insert",
		"delete mark",
		"delete"
	};
	ulint	i;

	ut_a(UT_ARR_SIZE(op_names) == IBUF_OP_COUNT);

	for (i = 0; i < IBUF_OP_COUNT; i++) {
		fprintf(file, "%s %lu%s", op_names[i],
			(ulong) ops[i], (i < (IBUF_OP_COUNT - 1)) ? ", " : "");
	}
1403 1404

	putc('\n', file);
1405 1406
}

1407
/********************************************************************//**
osku's avatar
osku committed
1408
Creates a dummy index for inserting a record to a non-clustered index.
1409
@return	dummy index */
osku's avatar
osku committed
1410 1411 1412 1413
static
dict_index_t*
ibuf_dummy_index_create(
/*====================*/
1414 1415
	ulint		n,	/*!< in: number of fields */
	ibool		comp)	/*!< in: TRUE=use compact record format */
osku's avatar
osku committed
1416 1417 1418
{
	dict_table_t*	table;
	dict_index_t*	index;
1419

1420 1421 1422
	table = dict_mem_table_create("IBUF_DUMMY",
				      DICT_HDR_SPACE, n,
				      comp ? DICT_TF_COMPACT : 0);
1423

1424 1425
	index = dict_mem_index_create("IBUF_DUMMY", "IBUF_DUMMY",
				      DICT_HDR_SPACE, 0, n);
1426

osku's avatar
osku committed
1427
	index->table = table;
1428

osku's avatar
osku committed
1429 1430
	/* avoid ut_ad(index->cached) in dict_index_get_n_unique_in_tree */
	index->cached = TRUE;
1431

osku's avatar
osku committed
1432 1433
	return(index);
}
1434
/********************************************************************//**
osku's avatar
osku committed
1435 1436 1437 1438
Add a column to the dummy index */
static
void
ibuf_dummy_index_add_col(
1439
/*=====================*/
1440 1441 1442
	dict_index_t*	index,	/*!< in: dummy index */
	const dtype_t*	type,	/*!< in: the data type of the column */
	ulint		len)	/*!< in: length of the column */
osku's avatar
osku committed
1443 1444
{
	ulint	i	= index->table->n_def;
1445 1446 1447 1448 1449 1450
	dict_mem_table_add_col(index->table, NULL, NULL,
			       dtype_get_mtype(type),
			       dtype_get_prtype(type),
			       dtype_get_len(type));
	dict_index_add_col(index, index->table,
			   dict_table_get_nth_col(index->table, i), len);
osku's avatar
osku committed
1451
}
1452
/********************************************************************//**
1453
Deallocates a dummy index for inserting a record to a non-clustered index. */
osku's avatar
osku committed
1454 1455 1456
static
void
ibuf_dummy_index_free(
1457
/*==================*/
1458
	dict_index_t*	index)	/*!< in, own: dummy index */
osku's avatar
osku committed
1459 1460
{
	dict_table_t*	table = index->table;
1461 1462 1463

	dict_mem_index_free(index);
	dict_mem_table_free(table);
osku's avatar
osku committed
1464 1465
}

1466
/*********************************************************************//**
osku's avatar
osku committed
1467
Builds the entry to insert into a non-clustered index when we have the
1468
corresponding record in an ibuf index.
1469 1470 1471 1472 1473

NOTE that as we copy pointers to fields in ibuf_rec, the caller must
hold a latch to the ibuf_rec page as long as the entry is used!

@return own: entry to insert to a non-clustered index */
1474 1475 1476 1477
UNIV_INLINE
dtuple_t*
ibuf_build_entry_pre_4_1_x(
/*=======================*/
1478 1479 1480
	const rec_t*	ibuf_rec,	/*!< in: record in an insert buffer */
	mem_heap_t*	heap,		/*!< in: heap where built */
	dict_index_t**	pindex)		/*!< out, own: dummy index that
1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517
					describes the entry */
{
	ulint		i;
	ulint		len;
	const byte*	types;
	dtuple_t*	tuple;
	ulint		n_fields;

	ut_a(trx_doublewrite_must_reset_space_ids);
	ut_a(!trx_sys_multiple_tablespace_format);

	n_fields = rec_get_n_fields_old(ibuf_rec) - 2;
	tuple = dtuple_create(heap, n_fields);
	types = rec_get_nth_field_old(ibuf_rec, 1, &len);

	ut_a(len == n_fields * DATA_ORDER_NULL_TYPE_BUF_SIZE);

	for (i = 0; i < n_fields; i++) {
		const byte*	data;
		dfield_t*	field;

		field = dtuple_get_nth_field(tuple, i);

		data = rec_get_nth_field_old(ibuf_rec, i + 2, &len);

		dfield_set_data(field, data, len);

		dtype_read_for_order_and_null_size(
			dfield_get_type(field),
			types + i * DATA_ORDER_NULL_TYPE_BUF_SIZE);
	}

	*pindex = ibuf_dummy_index_create(n_fields, FALSE);

	return(tuple);
}

1518
/*********************************************************************//**
1519 1520 1521 1522 1523 1524 1525 1526 1527
Builds the entry used to

1) IBUF_OP_INSERT: insert into a non-clustered index

2) IBUF_OP_DELETE_MARK: find the record whose delete-mark flag we need to
   activate

3) IBUF_OP_DELETE: find the record we need to delete

1528
when we have the corresponding record in an ibuf index.
1529 1530 1531 1532 1533

NOTE that as we copy pointers to fields in ibuf_rec, the caller must
hold a latch to the ibuf_rec page as long as the entry is used!

@return own: entry to insert to a non-clustered index */
osku's avatar
osku committed
1534 1535 1536 1537
static
dtuple_t*
ibuf_build_entry_from_ibuf_rec(
/*===========================*/
1538 1539 1540
	const rec_t*	ibuf_rec,	/*!< in: record in an insert buffer */
	mem_heap_t*	heap,		/*!< in: heap where built */
	dict_index_t**	pindex)		/*!< out, own: dummy index that
osku's avatar
osku committed
1541 1542 1543 1544 1545
					describes the entry */
{
	dtuple_t*	tuple;
	dfield_t*	field;
	ulint		n_fields;
1546
	const byte*	types;
osku's avatar
osku committed
1547 1548
	const byte*	data;
	ulint		len;
1549
	ulint		info_len;
osku's avatar
osku committed
1550
	ulint		i;
1551
	ulint		comp;
osku's avatar
osku committed
1552 1553 1554 1555 1556 1557 1558
	dict_index_t*	index;

	data = rec_get_nth_field_old(ibuf_rec, 1, &len);

	if (len > 1) {
		/* This a < 4.1.x format record */

1559
		return(ibuf_build_entry_pre_4_1_x(ibuf_rec, heap, pindex));
osku's avatar
osku committed
1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573
	}

	/* This a >= 4.1.x format record */

	ut_a(trx_sys_multiple_tablespace_format);
	ut_a(*data == 0);
	ut_a(rec_get_n_fields_old(ibuf_rec) > 4);

	n_fields = rec_get_n_fields_old(ibuf_rec) - 4;

	tuple = dtuple_create(heap, n_fields);

	types = rec_get_nth_field_old(ibuf_rec, 3, &len);

1574
	ibuf_rec_get_info(ibuf_rec, NULL, &comp, &info_len, NULL);
osku's avatar
osku committed
1575

1576 1577 1578 1579
	index = ibuf_dummy_index_create(n_fields, comp);

	len -= info_len;
	types += info_len;
osku's avatar
osku committed
1580 1581 1582 1583

	ut_a(len == n_fields * DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE);

	for (i = 0; i < n_fields; i++) {
1584
		field = dtuple_get_nth_field(tuple, i);
osku's avatar
osku committed
1585 1586 1587 1588 1589

		data = rec_get_nth_field_old(ibuf_rec, i + 4, &len);

		dfield_set_data(field, data, len);

1590
		dtype_new_read_for_order_and_null_size(
1591
			dfield_get_type(field),
1592
			types + i * DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE);
osku's avatar
osku committed
1593 1594 1595 1596

		ibuf_dummy_index_add_col(index, dfield_get_type(field), len);
	}

marko's avatar
marko committed
1597 1598 1599 1600 1601
	/* Prevent an ut_ad() failure in page_zip_write_rec() by
	adding system columns to the dummy table pointed to by the
	dummy secondary index.  The insert buffer is only used for
	secondary indexes, whose records never contain any system
	columns, such as DB_TRX_ID. */
1602 1603
	ut_d(dict_table_add_system_columns(index->table, index->table->heap));

osku's avatar
osku committed
1604
	*pindex = index;
1605

osku's avatar
osku committed
1606 1607 1608
	return(tuple);
}

1609
/******************************************************************//**
1610 1611
Get the data size.
@return	size of fields */
1612 1613 1614 1615
UNIV_INLINE
ulint
ibuf_rec_get_size(
/*==============*/
1616 1617 1618 1619
	const rec_t*	rec,			/*!< in: ibuf record */
	const byte*	types,			/*!< in: fields */
	ulint		n_fields,		/*!< in: number of fields */
	ibool		pre_4_1,		/*!< in: TRUE=pre-4.1 format,
1620
						FALSE=newer */
1621
	ulint		comp)			/*!< in: 0=ROW_FORMAT=REDUNDANT,
1622
						nonzero=ROW_FORMAT=COMPACT */
1623
{
1624 1625 1626 1627
	ulint	i;
	ulint	field_offset;
	ulint	types_offset;
	ulint	size = 0;
1628

1629 1630 1631 1632 1633 1634 1635
	if (pre_4_1) {
		field_offset = 2;
		types_offset = DATA_ORDER_NULL_TYPE_BUF_SIZE;
	} else {
		field_offset = 4;
		types_offset = DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE;
	}
1636 1637 1638

	for (i = 0; i < n_fields; i++) {
		ulint		len;
1639
		dtype_t		dtype;
1640

1641
		rec_get_nth_field_offs_old(rec, i + field_offset, &len);
1642

1643 1644 1645 1646
		if (len != UNIV_SQL_NULL) {
			size += len;
		} else if (pre_4_1) {
			dtype_read_for_order_and_null_size(&dtype, types);
1647

1648
			size += dtype_get_sql_null_size(&dtype, comp);
1649
		} else {
1650 1651
			dtype_new_read_for_order_and_null_size(&dtype, types);

1652
			size += dtype_get_sql_null_size(&dtype, comp);
1653
		}
1654 1655

		types += types_offset;
1656 1657 1658 1659 1660
	}

	return(size);
}

1661
/********************************************************************//**
osku's avatar
osku committed
1662
Returns the space taken by a stored non-clustered index entry if converted to
1663
an index record.
1664 1665
@return size of index record in bytes + an upper limit of the space
taken in the page directory */
osku's avatar
osku committed
1666 1667 1668 1669
static
ulint
ibuf_rec_get_volume(
/*================*/
1670
	const rec_t*	ibuf_rec)/*!< in: ibuf record */
osku's avatar
osku committed
1671
{
1672
	ulint		len;
1673 1674 1675
	const byte*	data;
	const byte*	types;
	ulint		n_fields;
1676 1677
	ulint		data_size;
	ibool		pre_4_1;
1678
	ulint		comp;
osku's avatar
osku committed
1679 1680 1681 1682 1683

	ut_ad(ibuf_inside());
	ut_ad(rec_get_n_fields_old(ibuf_rec) > 2);

	data = rec_get_nth_field_old(ibuf_rec, 1, &len);
1684
	pre_4_1 = (len > 1);
osku's avatar
osku committed
1685

1686
	if (pre_4_1) {
osku's avatar
osku committed
1687 1688 1689 1690 1691 1692 1693 1694 1695 1696
		/* < 4.1.x format record */

		ut_a(trx_doublewrite_must_reset_space_ids);
		ut_a(!trx_sys_multiple_tablespace_format);

		n_fields = rec_get_n_fields_old(ibuf_rec) - 2;

		types = rec_get_nth_field_old(ibuf_rec, 1, &len);

		ut_ad(len == n_fields * DATA_ORDER_NULL_TYPE_BUF_SIZE);
1697
		comp = 0;
osku's avatar
osku committed
1698 1699
	} else {
		/* >= 4.1.x format record */
1700 1701
		ibuf_op_t	op;
		ulint		info_len;
osku's avatar
osku committed
1702 1703 1704 1705 1706 1707

		ut_a(trx_sys_multiple_tablespace_format);
		ut_a(*data == 0);

		types = rec_get_nth_field_old(ibuf_rec, 3, &len);

1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719
		ibuf_rec_get_info(ibuf_rec, &op, &comp, &info_len, NULL);

		if (op == IBUF_OP_DELETE_MARK || op == IBUF_OP_DELETE) {
			/* Delete-marking a record doesn't take any
			additional space, and while deleting a record
			actually frees up space, we have to play it safe and
			pretend it takes no additional space (the record
			might not exist, etc.).  */

			return(0);
		} else if (comp) {
			dtuple_t*	entry;
osku's avatar
osku committed
1720 1721 1722
			ulint		volume;
			dict_index_t*	dummy_index;
			mem_heap_t*	heap = mem_heap_create(500);
1723 1724

			entry = ibuf_build_entry_from_ibuf_rec(
1725
				ibuf_rec, heap, &dummy_index);
1726

1727
			volume = rec_get_converted_size(dummy_index, entry, 0);
1728

osku's avatar
osku committed
1729 1730
			ibuf_dummy_index_free(dummy_index);
			mem_heap_free(heap);
1731

osku's avatar
osku committed
1732 1733 1734
			return(volume + page_dir_calc_reserved_space(1));
		}

1735
		types += info_len;
osku's avatar
osku committed
1736 1737 1738
		n_fields = rec_get_n_fields_old(ibuf_rec) - 4;
	}

1739
	data_size = ibuf_rec_get_size(ibuf_rec, types, n_fields, pre_4_1, comp);
osku's avatar
osku committed
1740

1741
	return(data_size + rec_get_converted_extra_size(data_size, n_fields, 0)
1742
	       + page_dir_calc_reserved_space(1));
osku's avatar
osku committed
1743 1744
}

1745
/*********************************************************************//**
osku's avatar
osku committed
1746
Builds the tuple to insert to an ibuf tree when we have an entry for a
1747
non-clustered index.
1748 1749 1750 1751 1752

NOTE that the original entry must be kept because we copy pointers to
its fields.

@return	own: entry to insert into an ibuf index tree */
osku's avatar
osku committed
1753 1754 1755 1756
static
dtuple_t*
ibuf_entry_build(
/*=============*/
1757 1758 1759 1760 1761
	ibuf_op_t	op,	/*!< in: operation type */
	dict_index_t*	index,	/*!< in: non-clustered index */
	const dtuple_t*	entry,	/*!< in: entry for a non-clustered index */
	ulint		space,	/*!< in: space id */
	ulint		page_no,/*!< in: index page number where entry should
osku's avatar
osku committed
1762
				be inserted */
1763
	ulint		counter,/*!< in: counter value;
1764
				ULINT_UNDEFINED=not used */
1765
	mem_heap_t*	heap)	/*!< in: heap into which to build */
osku's avatar
osku committed
1766 1767 1768
{
	dtuple_t*	tuple;
	dfield_t*	field;
1769
	const dfield_t*	entry_field;
osku's avatar
osku committed
1770 1771
	ulint		n_fields;
	byte*		buf;
1772 1773
	byte*		ti;
	byte*		type_info;
osku's avatar
osku committed
1774
	ulint		i;
1775

1776 1777 1778 1779
	ut_ad(counter != ULINT_UNDEFINED || op == IBUF_OP_INSERT);
	ut_ad(counter == ULINT_UNDEFINED || counter <= 0xFFFF);
	ut_ad(op < IBUF_OP_COUNT);

1780 1781 1782 1783 1784 1785 1786 1787
	/* We have to build a tuple with the following fields:

	1-4) These are described at the top of this file.

	5) The rest of the fields are copied from the entry.

	All fields in the tuple are ordered like the type binary in our
	insert buffer tree. */
osku's avatar
osku committed
1788 1789 1790 1791 1792

	n_fields = dtuple_get_n_fields(entry);

	tuple = dtuple_create(heap, n_fields + 4);

1793
	/* 1) Space Id */
osku's avatar
osku committed
1794

1795
	field = dtuple_get_nth_field(tuple, 0);
osku's avatar
osku committed
1796 1797 1798 1799 1800 1801 1802

	buf = mem_heap_alloc(heap, 4);

	mach_write_to_4(buf, space);

	dfield_set_data(field, buf, 4);

1803
	/* 2) Marker byte */
osku's avatar
osku committed
1804

1805
	field = dtuple_get_nth_field(tuple, 1);
osku's avatar
osku committed
1806 1807 1808 1809 1810 1811 1812 1813 1814

	buf = mem_heap_alloc(heap, 1);

	/* We set the marker byte zero */

	mach_write_to_1(buf, 0);

	dfield_set_data(field, buf, 1);

1815
	/* 3) Page number */
osku's avatar
osku committed
1816

1817
	field = dtuple_get_nth_field(tuple, 2);
osku's avatar
osku committed
1818 1819 1820 1821 1822 1823 1824

	buf = mem_heap_alloc(heap, 4);

	mach_write_to_4(buf, page_no);

	dfield_set_data(field, buf, 4);

1825 1826
	/* 4) Type info, part #1 */

1827 1828 1829 1830 1831 1832 1833 1834 1835
	if (counter == ULINT_UNDEFINED) {
		i = dict_table_is_comp(index->table) ? 1 : 0;
	} else {
		ut_ad(counter <= 0xFFFF);
		i = IBUF_REC_INFO_SIZE;
	}

	ti = type_info = mem_heap_alloc(heap, i + n_fields
					* DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE);
1836

1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850
	switch (i) {
	default:
		ut_error;
		break;
	case 1:
		/* set the flag for ROW_FORMAT=COMPACT */
		*ti++ = 0;
		/* fall through */
	case 0:
		/* the old format does not allow delete buffering */
		ut_ad(op == IBUF_OP_INSERT);
		break;
	case IBUF_REC_INFO_SIZE:
		mach_write_to_2(ti + IBUF_REC_OFFSET_COUNTER, counter);
1851

1852 1853 1854 1855 1856 1857
		ti[IBUF_REC_OFFSET_TYPE] = (byte) op;
		ti[IBUF_REC_OFFSET_FLAGS] = dict_table_is_comp(index->table)
			? IBUF_REC_COMPACT : 0;
		ti += IBUF_REC_INFO_SIZE;
		break;
	}
1858 1859 1860

	/* 5+) Fields from the entry */

osku's avatar
osku committed
1861
	for (i = 0; i < n_fields; i++) {
1862 1863 1864
		ulint			fixed_len;
		const dict_field_t*	ifield;

osku's avatar
osku committed
1865 1866 1867
		/* We add 4 below because we have the 4 extra fields at the
		start of an ibuf record */

1868
		field = dtuple_get_nth_field(tuple, i + 4);
osku's avatar
osku committed
1869 1870 1871
		entry_field = dtuple_get_nth_field(entry, i);
		dfield_copy(field, entry_field);

1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882
		ifield = dict_index_get_nth_field(index, i);
		/* Prefix index columns of fixed-length columns are of
		fixed length.  However, in the function call below,
		dfield_get_type(entry_field) contains the fixed length
		of the column in the clustered index.  Replace it with
		the fixed length of the secondary index column. */
		fixed_len = ifield->fixed_len;

#ifdef UNIV_DEBUG
		if (fixed_len) {
			/* dict_index_add_col() should guarantee these */
1883 1884
			ut_ad(fixed_len <= (ulint)
			      dfield_get_type(entry_field)->len);
1885 1886 1887
			if (ifield->prefix_len) {
				ut_ad(ifield->prefix_len == fixed_len);
			} else {
1888 1889
				ut_ad(fixed_len == (ulint)
				      dfield_get_type(entry_field)->len);
1890 1891 1892 1893
			}
		}
#endif /* UNIV_DEBUG */

1894
		dtype_new_store_for_order_and_null_size(
1895 1896
			ti, dfield_get_type(entry_field), fixed_len);
		ti += DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE;
osku's avatar
osku committed
1897 1898
	}

1899
	/* 4) Type info, part #2 */
osku's avatar
osku committed
1900

1901
	field = dtuple_get_nth_field(tuple, 3);
osku's avatar
osku committed
1902

1903
	dfield_set_data(field, type_info, ti - type_info);
osku's avatar
osku committed
1904 1905 1906 1907 1908 1909

	/* Set all the types in the new tuple binary */

	dtuple_set_types_binary(tuple, n_fields + 4);

	return(tuple);
1910
}
osku's avatar
osku committed
1911

1912
/*********************************************************************//**
osku's avatar
osku committed
1913
Builds a search tuple used to search buffered inserts for an index page.
1914 1915
This is for < 4.1.x format records
@return	own: search tuple */
osku's avatar
osku committed
1916 1917 1918 1919
static
dtuple_t*
ibuf_search_tuple_build(
/*====================*/
1920 1921 1922
	ulint		space,	/*!< in: space id */
	ulint		page_no,/*!< in: index page number */
	mem_heap_t*	heap)	/*!< in: heap into which to build */
osku's avatar
osku committed
1923 1924 1925 1926
{
	dtuple_t*	tuple;
	dfield_t*	field;
	byte*		buf;
1927

osku's avatar
osku committed
1928 1929 1930 1931 1932 1933 1934 1935
	ut_a(space == 0);
	ut_a(trx_doublewrite_must_reset_space_ids);
	ut_a(!trx_sys_multiple_tablespace_format);

	tuple = dtuple_create(heap, 1);

	/* Store the page number in tuple */

1936
	field = dtuple_get_nth_field(tuple, 0);
osku's avatar
osku committed
1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948

	buf = mem_heap_alloc(heap, 4);

	mach_write_to_4(buf, page_no);

	dfield_set_data(field, buf, 4);

	dtuple_set_types_binary(tuple, 1);

	return(tuple);
}

1949
/*********************************************************************//**
osku's avatar
osku committed
1950
Builds a search tuple used to search buffered inserts for an index page.
1951 1952
This is for >= 4.1.x format records.
@return	own: search tuple */
osku's avatar
osku committed
1953 1954 1955 1956
static
dtuple_t*
ibuf_new_search_tuple_build(
/*========================*/
1957 1958 1959
	ulint		space,	/*!< in: space id */
	ulint		page_no,/*!< in: index page number */
	mem_heap_t*	heap)	/*!< in: heap into which to build */
osku's avatar
osku committed
1960 1961 1962 1963
{
	dtuple_t*	tuple;
	dfield_t*	field;
	byte*		buf;
1964

osku's avatar
osku committed
1965 1966 1967 1968 1969 1970
	ut_a(trx_sys_multiple_tablespace_format);

	tuple = dtuple_create(heap, 3);

	/* Store the space id in tuple */

1971
	field = dtuple_get_nth_field(tuple, 0);
osku's avatar
osku committed
1972 1973 1974 1975 1976 1977 1978 1979 1980

	buf = mem_heap_alloc(heap, 4);

	mach_write_to_4(buf, space);

	dfield_set_data(field, buf, 4);

	/* Store the new format record marker byte */

1981
	field = dtuple_get_nth_field(tuple, 1);
osku's avatar
osku committed
1982 1983 1984 1985 1986 1987 1988 1989 1990

	buf = mem_heap_alloc(heap, 1);

	mach_write_to_1(buf, 0);

	dfield_set_data(field, buf, 1);

	/* Store the page number in tuple */

1991
	field = dtuple_get_nth_field(tuple, 2);
osku's avatar
osku committed
1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003

	buf = mem_heap_alloc(heap, 4);

	mach_write_to_4(buf, page_no);

	dfield_set_data(field, buf, 4);

	dtuple_set_types_binary(tuple, 3);

	return(tuple);
}

2004
/*********************************************************************//**
osku's avatar
osku committed
2005
Checks if there are enough pages in the free list of the ibuf tree that we
2006 2007
dare to start a pessimistic insert to the insert buffer.
@return	TRUE if enough free pages in list */
osku's avatar
osku committed
2008 2009
UNIV_INLINE
ibool
2010 2011
ibuf_data_enough_free_for_insert(void)
/*==================================*/
osku's avatar
osku committed
2012 2013 2014 2015 2016 2017 2018 2019 2020
{
	ut_ad(mutex_own(&ibuf_mutex));

	/* We want a big margin of free pages, because a B-tree can sometimes
	grow in size also if records are deleted from it, as the node pointers
	can change, and we must make sure that we are able to delete the
	inserts buffered for pages that we read to the buffer pool, without
	any risk of running out of free space in the insert buffer. */

2021
	return(ibuf->free_list_len >= (ibuf->size / 2) + 3 * ibuf->height);
osku's avatar
osku committed
2022 2023
}

2024
/*********************************************************************//**
osku's avatar
osku committed
2025
Checks if there are enough pages in the free list of the ibuf tree that we
2026 2027
should remove them and free to the file space management.
@return	TRUE if enough free pages in list */
osku's avatar
osku committed
2028 2029
UNIV_INLINE
ibool
2030 2031
ibuf_data_too_much_free(void)
/*=========================*/
osku's avatar
osku committed
2032 2033 2034
{
	ut_ad(mutex_own(&ibuf_mutex));

2035
	return(ibuf->free_list_len >= 3 + (ibuf->size / 2) + 3 * ibuf->height);
osku's avatar
osku committed
2036 2037
}

2038
/*********************************************************************//**
osku's avatar
osku committed
2039
Allocates a new page from the ibuf file segment and adds it to the free
2040
list.
2041
@return	TRUE on success, FALSE if no space left */
osku's avatar
osku committed
2042
static
2043
ibool
2044 2045
ibuf_add_free_page(void)
/*====================*/
osku's avatar
osku committed
2046 2047 2048
{
	mtr_t	mtr;
	page_t*	header_page;
2049
	ulint	flags;
2050
	ulint	zip_size;
osku's avatar
osku committed
2051 2052 2053 2054 2055 2056 2057 2058 2059
	ulint	page_no;
	page_t*	page;
	page_t*	root;
	page_t*	bitmap_page;

	mtr_start(&mtr);

	/* Acquire the fsp latch before the ibuf header, obeying the latching
	order */
2060 2061
	mtr_x_lock(fil_space_get_latch(IBUF_SPACE_ID, &flags), &mtr);
	zip_size = dict_table_flags_to_zip_size(flags);
2062

2063
	header_page = ibuf_header_page_get(&mtr);
osku's avatar
osku committed
2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074

	/* Allocate a new page: NOTE that if the page has been a part of a
	non-clustered index which has subsequently been dropped, then the
	page may have buffered inserts in the insert buffer, and these
	should be deleted from there. These get deleted when the page
	allocation creates the page in buffer. Thus the call below may end
	up calling the insert buffer routines and, as we yet have no latches
	to insert buffer tree pages, these routines can run without a risk
	of a deadlock. This is the reason why we created a special ibuf
	header page apart from the ibuf tree. */

2075 2076 2077 2078
	page_no = fseg_alloc_free_page(
		header_page + IBUF_HEADER + IBUF_TREE_SEG_HEADER, 0, FSP_UP,
		&mtr);

2079
	if (UNIV_UNLIKELY(page_no == FIL_NULL)) {
osku's avatar
osku committed
2080 2081
		mtr_commit(&mtr);

2082
		return(FALSE);
osku's avatar
osku committed
2083 2084
	}

2085
	{
2086 2087 2088 2089 2090
		buf_block_t*	block;

		block = buf_page_get(
			IBUF_SPACE_ID, 0, page_no, RW_X_LATCH, &mtr);

2091
		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);
2092

2093

2094 2095
		page = buf_block_get_frame(block);
	}
osku's avatar
osku committed
2096 2097 2098 2099 2100

	ibuf_enter();

	mutex_enter(&ibuf_mutex);

2101
	root = ibuf_tree_root_get(&mtr);
osku's avatar
osku committed
2102 2103 2104 2105

	/* Add the page to the free list and update the ibuf size data */

	flst_add_last(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
2106
		      page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, &mtr);
osku's avatar
osku committed
2107

2108
	mlog_write_ulint(page + FIL_PAGE_TYPE, FIL_PAGE_IBUF_FREE_LIST,
2109
			 MLOG_2BYTES, &mtr);
osku's avatar
osku committed
2110

2111 2112
	ibuf->seg_size++;
	ibuf->free_list_len++;
osku's avatar
osku committed
2113 2114 2115 2116

	/* Set the bit indicating that this page is now an ibuf tree page
	(level 2 page) */

2117 2118 2119
	bitmap_page = ibuf_bitmap_get_map_page(
		IBUF_SPACE_ID, page_no, zip_size, &mtr);

2120 2121
	mutex_exit(&ibuf_mutex);

2122 2123
	ibuf_bitmap_page_set_bits(
		bitmap_page, page_no, zip_size, IBUF_BITMAP_IBUF, TRUE, &mtr);
2124

osku's avatar
osku committed
2125 2126 2127 2128
	mtr_commit(&mtr);

	ibuf_exit();

2129
	return(TRUE);
osku's avatar
osku committed
2130 2131
}

2132
/*********************************************************************//**
osku's avatar
osku committed
2133 2134 2135
Removes a page from the free list and frees it to the fsp system. */
static
void
2136 2137
ibuf_remove_free_page(void)
/*=======================*/
osku's avatar
osku committed
2138 2139 2140 2141
{
	mtr_t	mtr;
	mtr_t	mtr2;
	page_t*	header_page;
2142
	ulint	flags;
2143
	ulint	zip_size;
osku's avatar
osku committed
2144 2145 2146 2147 2148 2149 2150 2151 2152
	ulint	page_no;
	page_t*	page;
	page_t*	root;
	page_t*	bitmap_page;

	mtr_start(&mtr);

	/* Acquire the fsp latch before the ibuf header, obeying the latching
	order */
2153 2154
	mtr_x_lock(fil_space_get_latch(IBUF_SPACE_ID, &flags), &mtr);
	zip_size = dict_table_flags_to_zip_size(flags);
2155

2156
	header_page = ibuf_header_page_get(&mtr);
osku's avatar
osku committed
2157 2158 2159

	/* Prevent pessimistic inserts to insert buffer trees for a while */
	ibuf_enter();
2160
	mutex_enter(&ibuf_pessimistic_insert_mutex);
osku's avatar
osku committed
2161 2162
	mutex_enter(&ibuf_mutex);

2163
	if (!ibuf_data_too_much_free()) {
osku's avatar
osku committed
2164 2165

		mutex_exit(&ibuf_mutex);
2166
		mutex_exit(&ibuf_pessimistic_insert_mutex);
osku's avatar
osku committed
2167 2168

		ibuf_exit();
2169

osku's avatar
osku committed
2170 2171 2172 2173
		mtr_commit(&mtr);

		return;
	}
2174

osku's avatar
osku committed
2175
	mtr_start(&mtr2);
2176

2177
	root = ibuf_tree_root_get(&mtr2);
osku's avatar
osku committed
2178

2179 2180
	mutex_exit(&ibuf_mutex);

osku's avatar
osku committed
2181
	page_no = flst_get_last(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
2182
				&mtr2).page;
osku's avatar
osku committed
2183 2184 2185 2186

	/* NOTE that we must release the latch on the ibuf tree root
	because in fseg_free_page we access level 1 pages, and the root
	is a level 2 page. */
2187

osku's avatar
osku committed
2188 2189 2190
	mtr_commit(&mtr2);

	ibuf_exit();
2191

osku's avatar
osku committed
2192 2193 2194 2195 2196
	/* Since pessimistic inserts were prevented, we know that the
	page is still in the free list. NOTE that also deletes may take
	pages from the free list, but they take them from the start, and
	the free list was so long that they cannot have taken the last
	page from it. */
2197

osku's avatar
osku committed
2198
	fseg_free_page(header_page + IBUF_HEADER + IBUF_TREE_SEG_HEADER,
2199 2200
		       IBUF_SPACE_ID, page_no, &mtr);

osku's avatar
osku committed
2201
#ifdef UNIV_DEBUG_FILE_ACCESSES
2202
	buf_page_reset_file_page_was_freed(IBUF_SPACE_ID, page_no);
osku's avatar
osku committed
2203
#endif
2204

osku's avatar
osku committed
2205
	ibuf_enter();
2206

osku's avatar
osku committed
2207 2208
	mutex_enter(&ibuf_mutex);

2209
	root = ibuf_tree_root_get(&mtr);
osku's avatar
osku committed
2210 2211

	ut_ad(page_no == flst_get_last(root + PAGE_HEADER
2212
				       + PAGE_BTR_IBUF_FREE_LIST, &mtr).page);
osku's avatar
osku committed
2213

2214
	{
2215 2216 2217 2218 2219
		buf_block_t*	block;

		block = buf_page_get(
			IBUF_SPACE_ID, 0, page_no, RW_X_LATCH, &mtr);

2220
		buf_block_dbg_add_level(block, SYNC_TREE_NODE);
2221

2222

2223 2224
		page = buf_block_get_frame(block);
	}
osku's avatar
osku committed
2225 2226

	/* Remove the page from the free list and update the ibuf size data */
2227

osku's avatar
osku committed
2228
	flst_remove(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
2229
		    page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, &mtr);
osku's avatar
osku committed
2230

2231 2232
	mutex_exit(&ibuf_pessimistic_insert_mutex);

2233 2234
	ibuf->seg_size--;
	ibuf->free_list_len--;
2235

osku's avatar
osku committed
2236 2237 2238
	/* Set the bit indicating that this page is no more an ibuf tree page
	(level 2 page) */

2239 2240 2241
	bitmap_page = ibuf_bitmap_get_map_page(
		IBUF_SPACE_ID, page_no, zip_size, &mtr);

2242 2243
	mutex_exit(&ibuf_mutex);

2244 2245
	ibuf_bitmap_page_set_bits(
		bitmap_page, page_no, zip_size, IBUF_BITMAP_IBUF, FALSE, &mtr);
osku's avatar
osku committed
2246 2247

#ifdef UNIV_DEBUG_FILE_ACCESSES
2248
	buf_page_set_file_page_was_freed(IBUF_SPACE_ID, page_no);
osku's avatar
osku committed
2249 2250 2251 2252 2253 2254
#endif
	mtr_commit(&mtr);

	ibuf_exit();
}

2255
/***********************************************************************//**
osku's avatar
osku committed
2256 2257
Frees excess pages from the ibuf free list. This function is called when an OS
thread calls fsp services to allocate a new file segment, or a new page to a
2258
file segment, and the thread did not own the fsp latch before this call. */
2259
UNIV_INTERN
osku's avatar
osku committed
2260
void
2261
ibuf_free_excess_pages(void)
2262
/*========================*/
osku's avatar
osku committed
2263 2264 2265 2266
{
	ulint		i;

#ifdef UNIV_SYNC_DEBUG
2267 2268
	ut_ad(rw_lock_own(fil_space_get_latch(IBUF_SPACE_ID, NULL),
			  RW_LOCK_EX));
osku's avatar
osku committed
2269
#endif /* UNIV_SYNC_DEBUG */
2270 2271 2272 2273

	ut_ad(rw_lock_get_x_lock_count(
		fil_space_get_latch(IBUF_SPACE_ID, NULL)) == 1);

osku's avatar
osku committed
2274
	ut_ad(!ibuf_inside());
2275

osku's avatar
osku committed
2276 2277 2278 2279
	/* NOTE: We require that the thread did not own the latch before,
	because then we know that we can obey the correct latching order
	for ibuf latches */

2280 2281 2282
	if (!ibuf) {
		/* Not yet initialized; not sure if this is possible, but
		does no harm to check for it. */
osku's avatar
osku committed
2283 2284 2285 2286 2287 2288 2289 2290 2291

		return;
	}

	/* Free at most a few pages at a time, so that we do not delay the
	requested service too much */

	for (i = 0; i < 4; i++) {

2292
		ibool	too_much_free;
osku's avatar
osku committed
2293

2294 2295 2296
		mutex_enter(&ibuf_mutex);
		too_much_free = ibuf_data_too_much_free();
		mutex_exit(&ibuf_mutex);
osku's avatar
osku committed
2297

2298
		if (!too_much_free) {
osku's avatar
osku committed
2299 2300 2301
			return;
		}

2302
		ibuf_remove_free_page();
osku's avatar
osku committed
2303 2304 2305
	}
}

2306
/*********************************************************************//**
2307
Reads page numbers from a leaf in an ibuf tree.
2308 2309
@return a lower limit for the combined volume of records which will be
merged */
osku's avatar
osku committed
2310 2311 2312 2313
static
ulint
ibuf_get_merge_page_nos(
/*====================*/
2314
	ibool		contract,/*!< in: TRUE if this function is called to
osku's avatar
osku committed
2315 2316 2317
				contract the tree, FALSE if this is called
				when a single page becomes full and we look
				if it pays to read also nearby pages */
2318
	rec_t*		rec,	/*!< in: record from which we read up and down
osku's avatar
osku committed
2319
				in the chain of records */
2320 2321
	ulint*		space_ids,/*!< in/out: space id's of the pages */
	ib_int64_t*	space_versions,/*!< in/out: tablespace version
osku's avatar
osku committed
2322 2323
				timestamps; used to prevent reading in old
				pages after DISCARD + IMPORT tablespace */
2324
	ulint*		page_nos,/*!< in/out: buffer for at least
osku's avatar
osku committed
2325 2326
				IBUF_MAX_N_PAGES_MERGED many page numbers;
				the page numbers are in an ascending order */
2327
	ulint*		n_stored)/*!< out: number of page numbers stored to
osku's avatar
osku committed
2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343
				page_nos in this function */
{
	ulint	prev_page_no;
	ulint	prev_space_id;
	ulint	first_page_no;
	ulint	first_space_id;
	ulint	rec_page_no;
	ulint	rec_space_id;
	ulint	sum_volumes;
	ulint	volume_for_page;
	ulint	rec_volume;
	ulint	limit;
	ulint	n_pages;

	*n_stored = 0;

irana's avatar
irana committed
2344
	limit = ut_min(IBUF_MAX_N_PAGES_MERGED, buf_pool_get_curr_size() / 4);
osku's avatar
osku committed
2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365

	if (page_rec_is_supremum(rec)) {

		rec = page_rec_get_prev(rec);
	}

	if (page_rec_is_infimum(rec)) {

		rec = page_rec_get_next(rec);
	}

	if (page_rec_is_supremum(rec)) {

		return(0);
	}

	first_page_no = ibuf_rec_get_page_no(rec);
	first_space_id = ibuf_rec_get_space(rec);
	n_pages = 0;
	prev_page_no = 0;
	prev_space_id = 0;
2366

osku's avatar
osku committed
2367 2368 2369 2370 2371 2372 2373 2374 2375 2376
	/* Go backwards from the first rec until we reach the border of the
	'merge area', or the page start or the limit of storeable pages is
	reached */

	while (!page_rec_is_infimum(rec) && UNIV_LIKELY(n_pages < limit)) {

		rec_page_no = ibuf_rec_get_page_no(rec);
		rec_space_id = ibuf_rec_get_space(rec);

		if (rec_space_id != first_space_id
2377 2378
		    || (rec_page_no / IBUF_MERGE_AREA)
		    != (first_page_no / IBUF_MERGE_AREA)) {
osku's avatar
osku committed
2379

2380
			break;
2381
		}
2382

2383 2384
		if (rec_page_no != prev_page_no
		    || rec_space_id != prev_space_id) {
osku's avatar
osku committed
2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402 2403
			n_pages++;
		}

		prev_page_no = rec_page_no;
		prev_space_id = rec_space_id;

		rec = page_rec_get_prev(rec);
	}

	rec = page_rec_get_next(rec);

	/* At the loop start there is no prev page; we mark this with a pair
	of space id, page no (0, 0) for which there can never be entries in
	the insert buffer */

	prev_page_no = 0;
	prev_space_id = 0;
	sum_volumes = 0;
	volume_for_page = 0;
2404

osku's avatar
osku committed
2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420
	while (*n_stored < limit) {
		if (page_rec_is_supremum(rec)) {
			/* When no more records available, mark this with
			another 'impossible' pair of space id, page no */
			rec_page_no = 1;
			rec_space_id = 0;
		} else {
			rec_page_no = ibuf_rec_get_page_no(rec);
			rec_space_id = ibuf_rec_get_space(rec);
			ut_ad(rec_page_no > IBUF_TREE_ROOT_PAGE_NO);
		}

#ifdef UNIV_IBUF_DEBUG
		ut_a(*n_stored < IBUF_MAX_N_PAGES_MERGED);
#endif
		if ((rec_space_id != prev_space_id
2421 2422
		     || rec_page_no != prev_page_no)
		    && (prev_space_id != 0 || prev_page_no != 0)) {
osku's avatar
osku committed
2423 2424

			if ((prev_page_no == first_page_no
2425 2426 2427 2428 2429 2430 2431
			     && prev_space_id == first_space_id)
			    || contract
			    || (volume_for_page
				> ((IBUF_MERGE_THRESHOLD - 1)
				   * 4 * UNIV_PAGE_SIZE
				   / IBUF_PAGE_SIZE_PER_FREE_SPACE)
				/ IBUF_MERGE_THRESHOLD)) {
2432 2433

				space_ids[*n_stored] = prev_space_id;
osku's avatar
osku committed
2434
				space_versions[*n_stored]
2435
					= fil_space_get_version(prev_space_id);
osku's avatar
osku committed
2436 2437 2438 2439 2440 2441 2442 2443
				page_nos[*n_stored] = prev_page_no;

				(*n_stored)++;

				sum_volumes += volume_for_page;
			}

			if (rec_space_id != first_space_id
2444 2445
			    || rec_page_no / IBUF_MERGE_AREA
			    != first_page_no / IBUF_MERGE_AREA) {
osku's avatar
osku committed
2446

2447
				break;
osku's avatar
osku committed
2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461
			}

			volume_for_page = 0;
		}

		if (rec_page_no == 1 && rec_space_id == 0) {
			/* Supremum record */

			break;
		}

		rec_volume = ibuf_rec_get_volume(rec);

		volume_for_page += rec_volume;
2462

osku's avatar
osku committed
2463 2464 2465 2466 2467 2468 2469 2470 2471
		prev_page_no = rec_page_no;
		prev_space_id = rec_space_id;

		rec = page_rec_get_next(rec);
	}

#ifdef UNIV_IBUF_DEBUG
	ut_a(*n_stored <= IBUF_MAX_N_PAGES_MERGED);
#endif
2472 2473 2474 2475
#if 0
	fprintf(stderr, "Ibuf merge batch %lu pages %lu volume\n",
		*n_stored, sum_volumes);
#endif
osku's avatar
osku committed
2476 2477 2478
	return(sum_volumes);
}

2479
/*********************************************************************//**
2480
Contracts insert buffer trees by reading pages to the buffer pool.
2481 2482 2483
@return a lower limit for the combined size in bytes of entries which
will be merged from ibuf trees to the pages read, 0 if ibuf is
empty */
osku's avatar
osku committed
2484 2485 2486 2487
static
ulint
ibuf_contract_ext(
/*==============*/
2488 2489
	ulint*	n_pages,/*!< out: number of pages to which merged */
	ibool	sync)	/*!< in: TRUE if the caller wants to wait for the
osku's avatar
osku committed
2490 2491 2492 2493 2494 2495
			issued read with the highest tablespace address
			to complete */
{
	btr_pcur_t	pcur;
	ulint		page_nos[IBUF_MAX_N_PAGES_MERGED];
	ulint		space_ids[IBUF_MAX_N_PAGES_MERGED];
2496
	ib_int64_t	space_versions[IBUF_MAX_N_PAGES_MERGED];
osku's avatar
osku committed
2497 2498 2499 2500 2501 2502
	ulint		sum_sizes;
	mtr_t		mtr;

	*n_pages = 0;
	ut_ad(!ibuf_inside());

2503 2504 2505 2506
	/* We perform a dirty read of ibuf->empty, without latching
	the insert buffer root page. We trust this dirty read except
	when a slow shutdown is being executed. During a slow
	shutdown, the insert buffer merge must be completed. */
osku's avatar
osku committed
2507

2508 2509
	if (UNIV_UNLIKELY(ibuf->empty)
	    && UNIV_LIKELY(!srv_shutdown_state)) {
2510
ibuf_is_empty:
osku's avatar
osku committed
2511

2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526
#if 0 /* TODO */
		if (srv_shutdown_state) {
			/* If the insert buffer becomes empty during
			shutdown, note it in the system tablespace. */

			trx_sys_set_ibuf_format(TRX_SYS_IBUF_EMPTY);
		}

		/* TO DO: call trx_sys_set_ibuf_format() at startup
		and whenever ibuf_use is changed to allow buffered
		delete-marking or deleting.  Never downgrade the
		stamped format except when the insert buffer becomes
		empty. */
#endif

2527 2528
		return(0);
	}
osku's avatar
osku committed
2529 2530 2531 2532

	mtr_start(&mtr);

	ibuf_enter();
2533

osku's avatar
osku committed
2534 2535 2536
	/* Open a cursor to a randomly chosen leaf of the tree, at a random
	position within the leaf */

2537
	btr_pcur_open_at_rnd_pos(ibuf->index, BTR_SEARCH_LEAF, &pcur, &mtr);
osku's avatar
osku committed
2538

2539 2540
	ut_ad(page_validate(btr_pcur_get_page(&pcur), ibuf->index));

2541
	if (page_get_n_recs(btr_pcur_get_page(&pcur)) == 0) {
2542 2543 2544 2545 2546 2547 2548 2549
		/* If a B-tree page is empty, it must be the root page
		and the whole B-tree must be empty. InnoDB does not
		allow empty B-tree pages other than the root. */
		ut_ad(ibuf->empty);
		ut_ad(page_get_space_id(btr_pcur_get_page(&pcur))
		      == IBUF_SPACE_ID);
		ut_ad(page_get_page_no(btr_pcur_get_page(&pcur))
		      == FSP_IBUF_TREE_ROOT_PAGE_NO);
osku's avatar
osku committed
2550

2551
		ibuf_exit();
osku's avatar
osku committed
2552

2553 2554
		mtr_commit(&mtr);
		btr_pcur_close(&pcur);
osku's avatar
osku committed
2555

2556
		goto ibuf_is_empty;
osku's avatar
osku committed
2557
	}
2558

2559 2560
	sum_sizes = ibuf_get_merge_page_nos(TRUE, btr_pcur_get_rec(&pcur),
					    space_ids, space_versions,
2561
					    page_nos, n_pages);
2562 2563
#if 0 /* defined UNIV_IBUF_DEBUG */
	fprintf(stderr, "Ibuf contract sync %lu pages %lu volume %lu\n",
2564
		sync, *n_pages, sum_sizes);
osku's avatar
osku committed
2565 2566 2567 2568 2569 2570 2571
#endif
	ibuf_exit();

	mtr_commit(&mtr);
	btr_pcur_close(&pcur);

	buf_read_ibuf_merge_pages(sync, space_ids, space_versions, page_nos,
2572
				  *n_pages);
2573

osku's avatar
osku committed
2574 2575 2576
	return(sum_sizes + 1);
}

2577
/*********************************************************************//**
2578
Contracts insert buffer trees by reading pages to the buffer pool.
2579 2580 2581
@return a lower limit for the combined size in bytes of entries which
will be merged from ibuf trees to the pages read, 0 if ibuf is
empty */
2582
UNIV_INTERN
osku's avatar
osku committed
2583 2584 2585
ulint
ibuf_contract(
/*==========*/
2586
	ibool	sync)	/*!< in: TRUE if the caller wants to wait for the
osku's avatar
osku committed
2587 2588 2589 2590 2591 2592 2593 2594
			issued read with the highest tablespace address
			to complete */
{
	ulint	n_pages;

	return(ibuf_contract_ext(&n_pages, sync));
}

2595
/*********************************************************************//**
2596
Contracts insert buffer trees by reading pages to the buffer pool.
2597 2598 2599
@return a lower limit for the combined size in bytes of entries which
will be merged from ibuf trees to the pages read, 0 if ibuf is
empty */
2600
UNIV_INTERN
osku's avatar
osku committed
2601 2602 2603
ulint
ibuf_contract_for_n_pages(
/*======================*/
2604
	ibool	sync,	/*!< in: TRUE if the caller wants to wait for the
osku's avatar
osku committed
2605 2606
			issued read with the highest tablespace address
			to complete */
2607
	ulint	n_pages)/*!< in: try to read at least this many pages to
osku's avatar
osku committed
2608 2609 2610 2611
			the buffer pool and merge the ibuf contents to
			them */
{
	ulint	sum_bytes	= 0;
2612
	ulint	sum_pages	= 0;
osku's avatar
osku committed
2613 2614
	ulint	n_bytes;
	ulint	n_pag2;
2615

osku's avatar
osku committed
2616 2617
	while (sum_pages < n_pages) {
		n_bytes = ibuf_contract_ext(&n_pag2, sync);
2618

osku's avatar
osku committed
2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629
		if (n_bytes == 0) {
			return(sum_bytes);
		}

		sum_bytes += n_bytes;
		sum_pages += n_pag2;
	}

	return(sum_bytes);
}

2630
/*********************************************************************//**
osku's avatar
osku committed
2631 2632 2633 2634 2635
Contract insert buffer trees after insert if they are too big. */
UNIV_INLINE
void
ibuf_contract_after_insert(
/*=======================*/
2636
	ulint	entry_size)	/*!< in: size of a record which was inserted
osku's avatar
osku committed
2637 2638 2639 2640 2641
				into an ibuf tree */
{
	ibool	sync;
	ulint	sum_sizes;
	ulint	size;
2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654
	ulint	max_size;

	/* Perform dirty reads of ibuf->size and ibuf->max_size, to
	reduce ibuf_mutex contention. ibuf->max_size remains constant
	after ibuf_init_at_db_start(), but ibuf->size should be
	protected by ibuf_mutex. Given that ibuf->size fits in a
	machine word, this should be OK; at worst we are doing some
	excessive ibuf_contract() or occasionally skipping a
	ibuf_contract(). */
	size = ibuf->size;
	max_size = ibuf->max_size;

	if (size < max_size + IBUF_CONTRACT_ON_INSERT_NON_SYNC) {
osku's avatar
osku committed
2655 2656 2657
		return;
	}

2658
	sync = (size >= max_size + IBUF_CONTRACT_ON_INSERT_SYNC);
osku's avatar
osku committed
2659 2660 2661 2662 2663

	/* Contract at least entry_size many bytes */
	sum_sizes = 0;
	size = 1;

2664
	do {
osku's avatar
osku committed
2665 2666 2667

		size = ibuf_contract(sync);
		sum_sizes += size;
2668
	} while (size > 0 && sum_sizes < entry_size);
osku's avatar
osku committed
2669 2670
}

2671
/*********************************************************************//**
2672 2673
Determine if an insert buffer record has been encountered already.
@return	TRUE if a new record, FALSE if possible duplicate */
2674 2675 2676 2677
static
ibool
ibuf_get_volume_buffered_hash(
/*==========================*/
2678 2679 2680 2681
	const rec_t*	rec,	/*!< in: ibuf record in post-4.1 format */
	const byte*	types,	/*!< in: fields */
	const byte*	data,	/*!< in: start of user record data */
	ulint		comp,	/*!< in: 0=ROW_FORMAT=REDUNDANT,
2682
				nonzero=ROW_FORMAT=COMPACT */
2683 2684
	ulint*		hash,	/*!< in/out: hash array */
	ulint		size)	/*!< in: number of elements in hash array */
2685 2686 2687 2688 2689 2690
{
	ulint		len;
	ulint		fold;
	ulint		bitmask;

	len = ibuf_rec_get_size(rec, types, rec_get_n_fields_old(rec) - 4,
2691
				FALSE, comp);
2692
	fold = ut_fold_binary(data, len);
2693

2694 2695
	hash += (fold / (CHAR_BIT * sizeof *hash)) % size;
	bitmask = 1 << (fold % (CHAR_BIT * sizeof *hash));
2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707

	if (*hash & bitmask) {

		return(FALSE);
	}

	/* We have not seen this record yet.  Insert it. */
	*hash |= bitmask;

	return(TRUE);
}

2708
/*********************************************************************//**
2709
Update the estimate of the number of records on a page, and
2710
get the space taken by merging the buffered record to the index page.
2711 2712
@return size of index record in bytes + an upper limit of the space
taken in the page directory */
2713
static
2714
ulint
2715 2716
ibuf_get_volume_buffered_count(
/*===========================*/
2717
	const rec_t*	rec,	/*!< in: insert buffer record */
2718 2719
	ulint*		hash,	/*!< in/out: hash array */
	ulint		size,	/*!< in: number of elements in hash array */
2720
	lint*		n_recs)	/*!< in/out: estimated number of records
2721 2722 2723 2724
				on the page that rec points to */
{
	ulint		len;
	ibuf_op_t	ibuf_op;
2725 2726
	const byte*	types;
	ulint		n_fields	= rec_get_n_fields_old(rec);
2727 2728

	ut_ad(ibuf_inside());
2729 2730
	ut_ad(n_fields > 4);
	n_fields -= 4;
2731

2732 2733 2734 2735 2736 2737
	rec_get_nth_field_offs_old(rec, 1, &len);
	/* This function is only invoked when buffering new
	operations.  All pre-4.1 records should have been merged
	when the database was started up. */
	ut_a(len == 1);
	ut_ad(trx_sys_multiple_tablespace_format);
2738

2739
	types = rec_get_nth_field_old(rec, 3, &len);
2740 2741 2742 2743 2744 2745

	switch (UNIV_EXPECT(len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE,
			    IBUF_REC_INFO_SIZE)) {
	default:
		ut_error;
	case 0:
2746 2747 2748 2749 2750
		/* This ROW_TYPE=REDUNDANT record does not include an
		operation counter.  Exclude it from the *n_recs,
		because deletes cannot be buffered if there are
		old-style inserts buffered for the page. */

2751
		len = ibuf_rec_get_size(rec, types, n_fields, FALSE, 0);
2752 2753 2754 2755

		return(len
		       + rec_get_converted_extra_size(len, n_fields, 0)
		       + page_dir_calc_reserved_space(1));
2756
	case 1:
2757 2758 2759 2760 2761
		/* This ROW_TYPE=COMPACT record does not include an
		operation counter.  Exclude it from the *n_recs,
		because deletes cannot be buffered if there are
		old-style inserts buffered for the page. */
		goto get_volume_comp;
2762 2763

	case IBUF_REC_INFO_SIZE:
2764
		ibuf_op = (ibuf_op_t) types[IBUF_REC_OFFSET_TYPE];
2765 2766 2767 2768 2769
		break;
	}

	switch (ibuf_op) {
	case IBUF_OP_INSERT:
2770 2771
		/* Inserts can be done by updating a delete-marked record.
		Because delete-mark and insert operations can be pointing to
2772 2773 2774 2775
		the same records, we must not count duplicates. */
	case IBUF_OP_DELETE_MARK:
		/* There must be a record to delete-mark.
		See if this record has been already buffered. */
2776 2777
		if (n_recs && ibuf_get_volume_buffered_hash(
			    rec, types + IBUF_REC_INFO_SIZE,
2778 2779 2780
			    types + len,
			    types[IBUF_REC_OFFSET_FLAGS] & IBUF_REC_COMPACT,
			    hash, size)) {
2781 2782
			(*n_recs)++;
		}
2783 2784 2785 2786 2787 2788

		if (ibuf_op == IBUF_OP_DELETE_MARK) {
			/* Setting the delete-mark flag does not
			affect the available space on the page. */
			return(0);
		}
2789 2790 2791
		break;
	case IBUF_OP_DELETE:
		/* A record will be removed from the page. */
2792
		if (n_recs) {
2793 2794
			(*n_recs)--;
		}
2795 2796 2797 2798
		/* While deleting a record actually frees up space,
		we have to play it safe and pretend that it takes no
		additional space (the record might not exist, etc.). */
		return(0);
2799 2800 2801
	default:
		ut_error;
	}
2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 2812 2813 2814 2815 2816 2817 2818 2819 2820 2821

	ut_ad(ibuf_op == IBUF_OP_INSERT);

get_volume_comp:
	{
		dtuple_t*	entry;
		ulint		volume;
		dict_index_t*	dummy_index;
		mem_heap_t*	heap = mem_heap_create(500);

		entry = ibuf_build_entry_from_ibuf_rec(
			rec, heap, &dummy_index);

		volume = rec_get_converted_size(dummy_index, entry, 0);

		ibuf_dummy_index_free(dummy_index);
		mem_heap_free(heap);

		return(volume + page_dir_calc_reserved_space(1));
	}
2822 2823
}

2824 2825 2826 2827 2828 2829
/*********************************************************************//**
Gets an upper limit for the combined size of entries buffered in the insert
buffer for a given page.
@return upper limit for the volume of buffered inserts for the index
page, in bytes; UNIV_PAGE_SIZE, if the entries for the index page span
several pages in the insert buffer */
2830
static
osku's avatar
osku committed
2831 2832 2833
ulint
ibuf_get_volume_buffered(
/*=====================*/
2834
	btr_pcur_t*	pcur,	/*!< in: pcur positioned at a place in an
osku's avatar
osku committed
2835 2836 2837 2838
				insert buffer tree where we would insert an
				entry for the index page whose number is
				page_no, latch mode has to be BTR_MODIFY_PREV
				or BTR_MODIFY_TREE */
2839 2840 2841
	ulint		space,	/*!< in: space id */
	ulint		page_no,/*!< in: page number of an index page */
	lint*		n_recs,	/*!< in/out: minimum number of records on the
2842 2843
				page after the buffered changes have been
				applied, or NULL to disable the counting */
2844
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
2845 2846 2847 2848 2849 2850 2851 2852
{
	ulint	volume;
	rec_t*	rec;
	page_t*	page;
	ulint	prev_page_no;
	page_t*	prev_page;
	ulint	next_page_no;
	page_t*	next_page;
2853
	ulint	hash_bitmap[128 / sizeof(ulint)]; /* bitmap of buffered recs */
2854

osku's avatar
osku committed
2855 2856 2857
	ut_a(trx_sys_multiple_tablespace_format);

	ut_ad((pcur->latch_mode == BTR_MODIFY_PREV)
2858
	      || (pcur->latch_mode == BTR_MODIFY_TREE));
osku's avatar
osku committed
2859

2860
	/* Count the volume of inserts earlier in the alphabetical order than
osku's avatar
osku committed
2861 2862 2863
	pcur */

	volume = 0;
2864 2865 2866 2867

	if (n_recs) {
		memset(hash_bitmap, 0, sizeof hash_bitmap);
	}
2868

osku's avatar
osku committed
2869
	rec = btr_pcur_get_rec(pcur);
2870
	page = page_align(rec);
2871
	ut_ad(page_validate(page, ibuf->index));
osku's avatar
osku committed
2872 2873 2874 2875 2876 2877 2878 2879 2880 2881

	if (page_rec_is_supremum(rec)) {
		rec = page_rec_get_prev(rec);
	}

	for (;;) {
		if (page_rec_is_infimum(rec)) {

			break;
		}
2882

osku's avatar
osku committed
2883
		if (page_no != ibuf_rec_get_page_no(rec)
2884
		    || space != ibuf_rec_get_space(rec)) {
osku's avatar
osku committed
2885 2886 2887 2888

			goto count_later;
		}

2889
		volume += ibuf_get_volume_buffered_count(
2890
			rec, hash_bitmap, UT_ARR_SIZE(hash_bitmap), n_recs);
2891

osku's avatar
osku committed
2892
		rec = page_rec_get_prev(rec);
2893
		ut_ad(page_align(rec) == page);
osku's avatar
osku committed
2894 2895 2896
	}

	/* Look at the previous page */
2897

osku's avatar
osku committed
2898 2899 2900 2901 2902 2903 2904
	prev_page_no = btr_page_get_prev(page, mtr);

	if (prev_page_no == FIL_NULL) {

		goto count_later;
	}

2905
	{
2906 2907 2908 2909 2910
		buf_block_t*	block;

		block = buf_page_get(
			IBUF_SPACE_ID, 0, prev_page_no, RW_X_LATCH, mtr);

2911
		buf_block_dbg_add_level(block, SYNC_TREE_NODE);
2912

2913

2914
		prev_page = buf_block_get_frame(block);
2915
		ut_ad(page_validate(prev_page, ibuf->index));
2916
	}
2917

2918 2919
#ifdef UNIV_BTR_DEBUG
	ut_a(btr_page_get_next(prev_page, mtr)
2920
	     == page_get_page_no(page));
2921
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
2922 2923 2924

	rec = page_get_supremum_rec(prev_page);
	rec = page_rec_get_prev(rec);
2925

osku's avatar
osku committed
2926 2927 2928 2929 2930 2931
	for (;;) {
		if (page_rec_is_infimum(rec)) {

			/* We cannot go to yet a previous page, because we
			do not have the x-latch on it, and cannot acquire one
			because of the latching order: we have to give up */
2932

osku's avatar
osku committed
2933 2934
			return(UNIV_PAGE_SIZE);
		}
2935

osku's avatar
osku committed
2936
		if (page_no != ibuf_rec_get_page_no(rec)
2937
		    || space != ibuf_rec_get_space(rec)) {
osku's avatar
osku committed
2938 2939 2940 2941

			goto count_later;
		}

2942
		volume += ibuf_get_volume_buffered_count(
2943
			rec, hash_bitmap, UT_ARR_SIZE(hash_bitmap), n_recs);
2944

osku's avatar
osku committed
2945
		rec = page_rec_get_prev(rec);
2946
		ut_ad(page_align(rec) == prev_page);
osku's avatar
osku committed
2947
	}
2948

osku's avatar
osku committed
2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960
count_later:
	rec = btr_pcur_get_rec(pcur);

	if (!page_rec_is_supremum(rec)) {
		rec = page_rec_get_next(rec);
	}

	for (;;) {
		if (page_rec_is_supremum(rec)) {

			break;
		}
2961

osku's avatar
osku committed
2962
		if (page_no != ibuf_rec_get_page_no(rec)
2963
		    || space != ibuf_rec_get_space(rec)) {
osku's avatar
osku committed
2964 2965 2966 2967

			return(volume);
		}

2968
		volume += ibuf_get_volume_buffered_count(
2969
			rec, hash_bitmap, UT_ARR_SIZE(hash_bitmap), n_recs);
2970

osku's avatar
osku committed
2971 2972 2973 2974
		rec = page_rec_get_next(rec);
	}

	/* Look at the next page */
2975

osku's avatar
osku committed
2976 2977 2978 2979 2980 2981 2982
	next_page_no = btr_page_get_next(page, mtr);

	if (next_page_no == FIL_NULL) {

		return(volume);
	}

2983
	{
2984 2985 2986 2987 2988
		buf_block_t*	block;

		block = buf_page_get(
			IBUF_SPACE_ID, 0, next_page_no, RW_X_LATCH, mtr);

2989
		buf_block_dbg_add_level(block, SYNC_TREE_NODE);
2990

2991

2992
		next_page = buf_block_get_frame(block);
2993
		ut_ad(page_validate(next_page, ibuf->index));
2994
	}
2995

2996
#ifdef UNIV_BTR_DEBUG
2997
	ut_a(btr_page_get_prev(next_page, mtr) == page_get_page_no(page));
2998
#endif /* UNIV_BTR_DEBUG */
osku's avatar
osku committed
2999 3000 3001 3002 3003 3004 3005 3006

	rec = page_get_infimum_rec(next_page);
	rec = page_rec_get_next(rec);

	for (;;) {
		if (page_rec_is_supremum(rec)) {

			/* We give up */
3007

osku's avatar
osku committed
3008 3009
			return(UNIV_PAGE_SIZE);
		}
3010

osku's avatar
osku committed
3011
		if (page_no != ibuf_rec_get_page_no(rec)
3012
		    || space != ibuf_rec_get_space(rec)) {
osku's avatar
osku committed
3013 3014 3015 3016

			return(volume);
		}

3017
		volume += ibuf_get_volume_buffered_count(
3018
			rec, hash_bitmap, UT_ARR_SIZE(hash_bitmap), n_recs);
3019

osku's avatar
osku committed
3020
		rec = page_rec_get_next(rec);
3021
		ut_ad(page_align(rec) == next_page);
osku's avatar
osku committed
3022 3023 3024
	}
}

3025
/*********************************************************************//**
osku's avatar
osku committed
3026 3027
Reads the biggest tablespace id from the high end of the insert buffer
tree and updates the counter in fil_system. */
3028
UNIV_INTERN
osku's avatar
osku committed
3029 3030 3031 3032 3033
void
ibuf_update_max_tablespace_id(void)
/*===============================*/
{
	ulint		max_space_id;
3034 3035
	const rec_t*	rec;
	const byte*	field;
osku's avatar
osku committed
3036 3037 3038 3039
	ulint		len;
	btr_pcur_t	pcur;
	mtr_t		mtr;

3040
	ut_a(!dict_table_is_comp(ibuf->index->table));
osku's avatar
osku committed
3041 3042 3043 3044 3045

	ibuf_enter();

	mtr_start(&mtr);

3046 3047 3048
	btr_pcur_open_at_index_side(
		FALSE, ibuf->index, BTR_SEARCH_LEAF, &pcur, TRUE, &mtr);

3049 3050
	ut_ad(page_validate(btr_pcur_get_page(&pcur), ibuf->index));

osku's avatar
osku committed
3051 3052
	btr_pcur_move_to_prev(&pcur, &mtr);

3053
	if (btr_pcur_is_before_first_on_page(&pcur)) {
osku's avatar
osku committed
3054 3055 3056 3057 3058 3059 3060 3061 3062
		/* The tree is empty */

		max_space_id = 0;
	} else {
		rec = btr_pcur_get_rec(&pcur);

		field = rec_get_nth_field_old(rec, 0, &len);

		ut_a(len == 4);
3063

osku's avatar
osku committed
3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074
		max_space_id = mach_read_from_4(field);
	}

	mtr_commit(&mtr);
	ibuf_exit();

	/* printf("Maximum space id in insert buffer %lu\n", max_space_id); */

	fil_set_max_space_id_if_bigger(max_space_id);
}

3075
/****************************************************************//**
3076 3077
Helper function for ibuf_set_entry_counter. Checks if rec is for (space,
page_no), and if so, reads counter value from it and returns that + 1.
3078
Otherwise, returns 0.
3079
@return	new counter value, or 0 */
3080 3081
static
ulint
3082
ibuf_get_entry_counter_low(
3083
/*=======================*/
3084 3085 3086
	const rec_t*	rec,		/*!< in: insert buffer record */
	ulint		space,		/*!< in: space id */
	ulint		page_no)	/*!< in: page number */
3087
{
3088 3089 3090
	ulint		counter;
	const byte*	field;
	ulint		len;
3091

3092 3093
	ut_ad(ibuf_inside());
	ut_ad(rec_get_n_fields_old(rec) > 2);
3094

3095
	field = rec_get_nth_field_old(rec, 1, &len);
3096

3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112
	if (UNIV_UNLIKELY(len != 1)) {
		/* pre-4.1 format */
		ut_a(trx_doublewrite_must_reset_space_ids);
		ut_a(!trx_sys_multiple_tablespace_format);

		return(ULINT_UNDEFINED);
	}

	ut_a(trx_sys_multiple_tablespace_format);

	/* Check the tablespace identifier. */
	field = rec_get_nth_field_old(rec, 0, &len);
	ut_a(len == 4);

	if (mach_read_from_4(field) != space) {

3113
		return(0);
3114 3115
	}

3116 3117 3118 3119 3120 3121
	/* Check the page offset. */
	field = rec_get_nth_field_old(rec, 2, &len);
	ut_a(len == 4);

	if (mach_read_from_4(field) != page_no) {

3122
		return(0);
3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139
	}

	/* Check if the record contains a counter field. */
	field = rec_get_nth_field_old(rec, 3, &len);

	switch (len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE) {
	default:
		ut_error;
	case 0: /* ROW_FORMAT=REDUNDANT */
	case 1: /* ROW_FORMAT=COMPACT */
		return(ULINT_UNDEFINED);

	case IBUF_REC_INFO_SIZE:
		counter = mach_read_from_2(field + IBUF_REC_OFFSET_COUNTER);
		ut_a(counter < 0xFFFF);
		return(counter + 1);
	}
3140 3141
}

3142
/****************************************************************//**
3143
Set the counter field in entry to the correct value based on the current
3144 3145
last record in ibuf for (space, page_no).
@return	FALSE if we should abort this insertion to ibuf */
3146 3147 3148 3149
static
ibool
ibuf_set_entry_counter(
/*===================*/
3150 3151 3152 3153
	dtuple_t*	entry,		/*!< in/out: entry to patch */
	ulint		space,		/*!< in: space id of entry */
	ulint		page_no,	/*!< in: page number of entry */
	btr_pcur_t*	pcur,		/*!< in: pcur positioned on the record
3154 3155
					found by btr_pcur_open(.., entry,
					PAGE_CUR_LE, ..., pcur, ...) */
3156 3157
	ibool		is_optimistic,	/*!< in: is this an optimistic insert */
	mtr_t*		mtr)		/*!< in: mtr */
3158 3159
{
	dfield_t*	field;
3160
	byte*		data;
irana's avatar
irana committed
3161
	ulint		counter = 0;
3162 3163

	/* pcur points to either a user rec or to a page's infimum record. */
3164
	ut_ad(page_validate(btr_pcur_get_page(pcur), ibuf->index));
3165 3166 3167

	if (btr_pcur_is_on_user_rec(pcur)) {

3168
		counter = ibuf_get_entry_counter_low(
3169 3170
			btr_pcur_get_rec(pcur), space, page_no);

3171 3172 3173 3174 3175 3176 3177
		if (UNIV_UNLIKELY(counter == ULINT_UNDEFINED)) {
			/* The record lacks a counter field.
			Such old records must be merged before
			new records can be buffered. */

			return(FALSE);
		}
3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194 3195 3196 3197 3198
	} else if (btr_pcur_is_before_first_in_tree(pcur, mtr)) {
		/* Ibuf tree is either completely empty, or the insert
		position is at the very first record of a non-empty tree. In
		either case we have no previous records for (space,
		page_no). */

		counter = 0;
	} else if (btr_pcur_is_before_first_on_page(pcur)) {
		btr_cur_t*	cursor = btr_pcur_get_btr_cur(pcur);

		if (cursor->low_match < 3) {
			/* If low_match < 3, we know that the father node
			pointer did not contain the searched for (space,
			page_no), which means that the search ended on the
			right page regardless of the counter value, and
			since we're at the infimum record, there are no
			existing records. */

			counter = 0;
		} else {
			rec_t*		rec;
marko's avatar
marko committed
3199
			const page_t*	page;
3200 3201 3202 3203 3204 3205 3206 3207 3208
			buf_block_t*	block;
			page_t*		prev_page;
			ulint		prev_page_no;

			ut_a(cursor->ibuf_cnt != ULINT_UNDEFINED);

			page = btr_pcur_get_page(pcur);
			prev_page_no = btr_page_get_prev(page, mtr);

marko's avatar
marko committed
3209
			ut_a(prev_page_no != FIL_NULL);
3210 3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221 3222 3223

			block = buf_page_get(
				IBUF_SPACE_ID, 0, prev_page_no,
				RW_X_LATCH, mtr);

			buf_block_dbg_add_level(block, SYNC_TREE_NODE);

			prev_page = buf_block_get_frame(block);

			rec = page_rec_get_prev(
				page_get_supremum_rec(prev_page));

			ut_ad(page_rec_is_user_rec(rec));

3224
			counter = ibuf_get_entry_counter_low(
3225 3226
				rec, space, page_no);

3227 3228 3229 3230 3231 3232 3233 3234
			if (UNIV_UNLIKELY(counter == ULINT_UNDEFINED)) {
				/* The record lacks a counter field.
				Such old records must be merged before
				new records can be buffered. */

				return(FALSE);
			}

3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254 3255 3256 3257 3258 3259 3260 3261 3262 3263 3264 3265 3266 3267
			if (counter < cursor->ibuf_cnt) {
				/* Search ended on the wrong page. */

				if (is_optimistic) {
					/* In an optimistic insert, we can
					shift the insert position to the left
					page, since it only needs an X-latch
					on the page itself, which the
					original search acquired for us. */

					btr_cur_position(
						ibuf->index, rec, block,
						btr_pcur_get_btr_cur(pcur));
				} else {
					/* We can't shift the insert
					position to the left page in a
					pessimistic insert since it would
					require an X-latch on the left
					page's left page, so we have to
					abort. */

					return(FALSE);
				}
			} else {
				/* The counter field in the father node is
				the same as we would insert; we don't know
				whether the insert should go to this page or
				the left page (the later fields can differ),
				so refuse the insert. */

				return(FALSE);
			}
		}
3268 3269 3270
	} else {
		/* The cursor is not positioned at or before a user record. */
		return(FALSE);
3271 3272 3273 3274 3275 3276
	}

	/* Patch counter value in already built entry. */
	field = dtuple_get_nth_field(entry, 3);
	data = dfield_get_data(field);

3277
	mach_write_to_2(data + IBUF_REC_OFFSET_COUNTER, counter);
3278 3279 3280 3281

	return(TRUE);
}

3282 3283 3284
/*********************************************************************//**
Buffer an operation in the insert/delete buffer, instead of doing it
directly to the disk page, if this is possible.
Outline: (1) refuse and contract synchronously if the insert buffer is
too big; (2) build the insert buffer entry; (3) position a cursor in the
insert buffer tree and compute the volume already buffered for the page;
(4) check, under a separate bitmap mini-transaction, that the index page
may take buffered entries; (5) patch the entry counter and insert the
entry optimistically (BTR_MODIFY_PREV) or pessimistically
(BTR_MODIFY_TREE).
3285
@return	DB_SUCCESS, DB_STRONG_FAIL or other error */
osku's avatar
osku committed
3286 3287 3288 3289
static
ulint
ibuf_insert_low(
/*============*/
3290 3291
	ulint		mode,	/*!< in: BTR_MODIFY_PREV or BTR_MODIFY_TREE */
	ibuf_op_t	op,	/*!< in: operation type */
3292
	ibool		no_counter,
3293
				/*!< in: TRUE=use 5.0.3 format;
3294
				FALSE=allow delete buffering */
3295
	const dtuple_t*	entry,	/*!< in: index entry to insert */
3296
	ulint		entry_size,
3297 3298
				/*!< in: rec_get_converted_size(index, entry) */
	dict_index_t*	index,	/*!< in: index where to insert; must not be
osku's avatar
osku committed
3299
				unique or clustered */
3300 3301 3302 3303
	ulint		space,	/*!< in: space id where to insert */
	ulint		zip_size,/*!< in: compressed page size in bytes, or 0 */
	ulint		page_no,/*!< in: page number where to insert */
	que_thr_t*	thr)	/*!< in: query thread */
osku's avatar
osku committed
3304 3305 3306 3307 3308 3309 3310
{
	big_rec_t*	dummy_big_rec;
	btr_pcur_t	pcur;
	btr_cur_t*	cursor;
	dtuple_t*	ibuf_entry;
	mem_heap_t*	heap;
	ulint		buffered;
3311
	lint		min_n_recs;
osku's avatar
osku committed
3312 3313 3314
	rec_t*		ins_rec;
	ibool		old_bit_value;
	page_t*		bitmap_page;
3315
	buf_block_t*	block;
osku's avatar
osku committed
3316 3317 3318 3319
	page_t*		root;
	ulint		err;
	ibool		do_merge;
	ulint		space_ids[IBUF_MAX_N_PAGES_MERGED];
3320
	ib_int64_t	space_versions[IBUF_MAX_N_PAGES_MERGED];
osku's avatar
osku committed
3321 3322 3323 3324
	ulint		page_nos[IBUF_MAX_N_PAGES_MERGED];
	ulint		n_stored;
	mtr_t		mtr;
	mtr_t		bitmap_mtr;
3325

3326
	ut_a(!dict_index_is_clust(index));
osku's avatar
osku committed
3327
	ut_ad(dtuple_check_typed(entry));
3328
	ut_ad(ut_is_2pow(zip_size));
3329
	ut_ad(!no_counter || op == IBUF_OP_INSERT);
3330
	ut_a(op < IBUF_OP_COUNT);
osku's avatar
osku committed
3331 3332 3333 3334 3335

	ut_a(trx_sys_multiple_tablespace_format);

	do_merge = FALSE;

3336 3337 3338 3339 3340 3341 3342 3343
	/* Perform dirty reads of ibuf->size and ibuf->max_size, to
	reduce ibuf_mutex contention. ibuf->max_size remains constant
	after ibuf_init_at_db_start(), but ibuf->size should be
	protected by ibuf_mutex. Given that ibuf->size fits in a
	machine word, this should be OK; at worst we are doing some
	excessive ibuf_contract() or occasionally skipping a
	ibuf_contract(). */
	if (ibuf->size >= ibuf->max_size + IBUF_CONTRACT_DO_NOT_INSERT) {
osku's avatar
osku committed
3344 3345 3346 3347 3348 3349
		/* Insert buffer is now too big, contract it but do not try
		to insert */


#ifdef UNIV_IBUF_DEBUG
		fputs("Ibuf too big\n", stderr);
3350
#endif
osku's avatar
osku committed
3351 3352 3353 3354 3355 3356
		/* Use synchronous contract (== TRUE) */
		ibuf_contract(TRUE);

		return(DB_STRONG_FAIL);
	}

3357 3358 3359 3360 3361 3362 3363 3364 3365 3366 3367 3368 3369 3370 3371 3372 3373
	/* The heap holds the built ibuf_entry.  From here on every
	return path frees it: the ibuf_add_free_page() failure path
	below, and func_exit at the end of the function. */
	heap = mem_heap_create(512);

	/* Build the entry which contains the space id and the page number
	as the first fields and the type information for other fields, and
	which will be inserted to the insert buffer. Using a counter value
	of 0xFFFF we find the last record for (space, page_no), from which
	we can then read the counter value N and use N + 1 in the record we
	insert. (We patch the ibuf_entry's counter field to the correct
	value just before actually inserting the entry.) */

	ibuf_entry = ibuf_entry_build(
		op, index, entry, space, page_no,
		no_counter ? ULINT_UNDEFINED : 0xFFFF, heap);

	/* Open a cursor to the insert buffer tree to calculate if we can add
	the new entry to it without exceeding the free space limit for the
	page. */
osku's avatar
osku committed
3374 3375

	if (mode == BTR_MODIFY_TREE) {
3376 3377
		/* Loop until ibuf_data_enough_free_for_insert() holds.
		The loop is left with ibuf_pessimistic_insert_mutex and
		ibuf_mutex still held; they are released either at
		fail_exit or after the pessimistic insert below. */
		for (;;) {
			ibuf_enter();
3378
			mutex_enter(&ibuf_pessimistic_insert_mutex);
3379
			mutex_enter(&ibuf_mutex);
osku's avatar
osku committed
3380

3381 3382 3383 3384
			if (UNIV_LIKELY(ibuf_data_enough_free_for_insert())) {

				break;
			}
osku's avatar
osku committed
3385 3386 3387

			mutex_exit(&ibuf_mutex);
			mutex_exit(&ibuf_pessimistic_insert_mutex);
3388
			ibuf_exit();
osku's avatar
osku committed
3389

3390
			if (UNIV_UNLIKELY(!ibuf_add_free_page())) {
osku's avatar
osku committed
3391

3392
				mem_heap_free(heap);
3393
				return(DB_STRONG_FAIL);
osku's avatar
osku committed
3394 3395 3396 3397 3398 3399 3400 3401
			}
		}
	} else {
		ibuf_enter();
	}

	mtr_start(&mtr);

3402
	btr_pcur_open(ibuf->index, ibuf_entry, PAGE_CUR_LE, mode, &pcur, &mtr);
3403
	ut_ad(page_validate(btr_pcur_get_page(&pcur), ibuf->index));
3404

osku's avatar
osku committed
3405 3406
	/* Find out the volume of already buffered inserts for the same index
	page */
3407
	min_n_recs = 0;
3408
	/* The record count estimate is requested only for
	IBUF_OP_DELETE; for other operations NULL is passed. */
	buffered = ibuf_get_volume_buffered(&pcur, space, page_no,
3409 3410 3411
					    op == IBUF_OP_DELETE
					    ? &min_n_recs
					    : NULL, &mtr);
3412

3413 3414 3415 3416 3417 3418
	if (op == IBUF_OP_DELETE
	    && (min_n_recs < 2
		|| buf_pool_watch_occurred(space, page_no))) {
		/* The page could become empty after the record is
		deleted, or the page has been read in to the buffer
		pool.  Refuse to buffer the operation. */
3419

3420 3421 3422 3423 3424 3425 3426 3427 3428 3429 3430 3431
		/* The buffer pool watch is needed for IBUF_OP_DELETE
		because of latching order considerations.  We can
		check buf_pool_watch_occurred() only after latching
		the insert buffer B-tree pages that contain buffered
		changes for the page.  We never buffer IBUF_OP_DELETE,
		unless some IBUF_OP_INSERT or IBUF_OP_DELETE_MARK have
		been previously buffered for the page.  Because there
		are buffered operations for the page, the insert
		buffer B-tree page latches held by mtr will guarantee
		that no changes for the user page will be merged
		before mtr_commit(&mtr).  We must not mtr_commit(&mtr)
		until after the IBUF_OP_DELETE has been buffered. */
3432

3433 3434 3435 3436 3437
		/* fail_exit is also the target of goto statements
		further down in this function (the "may not fit" path
		and bitmap_fail).  It releases the two mutexes taken
		for BTR_MODIFY_TREE and reports DB_STRONG_FAIL. */
fail_exit:
		if (mode == BTR_MODIFY_TREE) {
			mutex_exit(&ibuf_mutex);
			mutex_exit(&ibuf_pessimistic_insert_mutex);
		}
3438

3439 3440
		err = DB_STRONG_FAIL;
		goto func_exit;
3441
	}
osku's avatar
osku committed
3442

3443 3444 3445 3446 3447 3448 3449 3450
	/* After this point, the page could still be loaded to the
	buffer pool, but we do not have to care about it, since we are
	holding a latch on the insert buffer leaf page that contains
	buffered changes for (space, page_no).  If the page enters the
	buffer pool, buf_page_io_complete() for (space, page_no) will
	have to acquire a latch on the same insert buffer leaf page,
	which it cannot do until we have buffered the IBUF_OP_DELETE
	and done mtr_commit(&mtr) to release the latch. */
3451

3452
#ifdef UNIV_IBUF_COUNT_DEBUG
osku's avatar
osku committed
3453 3454
	ut_a((buffered == 0) || ibuf_count_get(space, page_no));
#endif
3455
	/* The bitmap page is accessed in its own mini-transaction,
	which is committed on every path before the insert itself. */
	mtr_start(&bitmap_mtr);
osku's avatar
osku committed
3456

3457 3458
	bitmap_page = ibuf_bitmap_get_map_page(space, page_no,
					       zip_size, &bitmap_mtr);
osku's avatar
osku committed
3459 3460 3461 3462

	/* We check if the index page is suitable for buffered entries */

	if (buf_page_peek(space, page_no)
3463
	    || lock_rec_expl_exist_on_page(space, page_no)) {
osku's avatar
osku committed
3464

3465
		goto bitmap_fail;
osku's avatar
osku committed
3466 3467
	}

3468 3469 3470 3471
	if (op == IBUF_OP_INSERT) {
		ulint	bits = ibuf_bitmap_page_get_bits(
			bitmap_page, page_no, zip_size, IBUF_BITMAP_FREE,
			&bitmap_mtr);
osku's avatar
osku committed
3472

3473 3474
		if (buffered + entry_size + page_dir_calc_reserved_space(1)
		    > ibuf_index_page_calc_free_from_bits(zip_size, bits)) {
3475
			/* Release the bitmap page latch early. */
3476
			mtr_commit(&bitmap_mtr);
osku's avatar
osku committed
3477

3478 3479
			/* It may not fit */
			do_merge = TRUE;
osku's avatar
osku committed
3480

3481 3482 3483 3484
			/* Collect the pages to merge now, while the
			cursor is still positioned; the merge itself is
			requested at the end of the function, after mtr
			has been committed. */
			ibuf_get_merge_page_nos(
				FALSE, btr_pcur_get_rec(&pcur),
				space_ids, space_versions,
				page_nos, &n_stored);
3485

3486
			goto fail_exit;
3487
		}
3488 3489 3490 3491 3492
	}

	/* Patch correct counter value to the entry to insert. This can
	change the insert position, which can result in the need to abort in
	some cases. */
3493 3494 3495
	if (!no_counter
	    && !ibuf_set_entry_counter(ibuf_entry, space, page_no, &pcur,
				       mode == BTR_MODIFY_PREV, &mtr)) {
3496
		/* bitmap_fail: commit the still open bitmap_mtr and
		continue on the common failure path. */
bitmap_fail:
3497 3498
		mtr_commit(&bitmap_mtr);

3499
		goto fail_exit;
3500
	}
osku's avatar
osku committed
3501 3502 3503 3504

	/* Set the bitmap bit denoting that the insert buffer contains
	buffered entries for this index page, if the bit is not set yet */

3505 3506 3507
	old_bit_value = ibuf_bitmap_page_get_bits(
		bitmap_page, page_no, zip_size,
		IBUF_BITMAP_BUFFERED, &bitmap_mtr);
3508

osku's avatar
osku committed
3509
	if (!old_bit_value) {
3510
		ibuf_bitmap_page_set_bits(bitmap_page, page_no, zip_size,
3511 3512
					  IBUF_BITMAP_BUFFERED, TRUE,
					  &bitmap_mtr);
osku's avatar
osku committed
3513 3514
	}

3515
	mtr_commit(&bitmap_mtr);
3516

osku's avatar
osku committed
3517
	cursor = btr_pcur_get_btr_cur(&pcur);
3518

osku's avatar
osku committed
3519 3520 3521
	if (mode == BTR_MODIFY_PREV) {
		err = btr_cur_optimistic_insert(BTR_NO_LOCKING_FLAG, cursor,
						ibuf_entry, &ins_rec,
3522
						&dummy_big_rec, 0, thr, &mtr);
3523 3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535
		block = btr_cur_get_block(cursor);
		ut_ad(buf_block_get_space(block) == IBUF_SPACE_ID);

		/* If this is the root page, update ibuf->empty. */
		if (UNIV_UNLIKELY(buf_block_get_page_no(block)
				  == FSP_IBUF_TREE_ROOT_PAGE_NO)) {
			/* NOTE(review): this local declaration shadows
			the function-scope variable named root. */
			const page_t*	root = buf_block_get_frame(block);

			ut_ad(page_get_space_id(root) == IBUF_SPACE_ID);
			ut_ad(page_get_page_no(root)
			      == FSP_IBUF_TREE_ROOT_PAGE_NO);

			ibuf->empty = (page_get_n_recs(root) == 0);
osku's avatar
osku committed
3536 3537 3538 3539 3540 3541 3542 3543
		}
	} else {
		ut_ad(mode == BTR_MODIFY_TREE);

		/* We acquire an x-latch to the root page before the insert,
		because a pessimistic insert releases the tree x-latch,
		which would cause the x-latching of the root after that to
		break the latching order. */
3544

3545
		root = ibuf_tree_root_get(&mtr);
osku's avatar
osku committed
3546 3547 3548

		err = btr_cur_pessimistic_insert(BTR_NO_LOCKING_FLAG
						 | BTR_NO_UNDO_LOG_FLAG,
3549 3550
						 cursor,
						 ibuf_entry, &ins_rec,
3551
						 &dummy_big_rec, 0, thr, &mtr);
3552
		/* Release the two mutexes that were acquired in the
		BTR_MODIFY_TREE loop near the start of the function. */
		mutex_exit(&ibuf_pessimistic_insert_mutex);
3553
		ibuf_size_update(root, &mtr);
3554 3555 3556 3557 3558 3559 3560 3561 3562 3563 3564
		mutex_exit(&ibuf_mutex);
		ibuf->empty = (page_get_n_recs(root) == 0);

		block = btr_cur_get_block(cursor);
		ut_ad(buf_block_get_space(block) == IBUF_SPACE_ID);
	}

	if (err == DB_SUCCESS && op != IBUF_OP_DELETE) {
		/* Update the page max trx id field */
		page_update_max_trx_id(block, NULL,
				       thr_get_trx(thr)->id, &mtr);
osku's avatar
osku committed
3565 3566
	}

3567
	/* Common exit: reached by falling through after the insert,
	or by goto from fail_exit with err == DB_STRONG_FAIL. */
func_exit:
3568
#ifdef UNIV_IBUF_COUNT_DEBUG
osku's avatar
osku committed
3569
	if (err == DB_SUCCESS) {
3570 3571 3572 3573
		fprintf(stderr,
			"Incrementing ibuf count of space %lu page %lu\n"
			"from %lu by 1\n", space, page_no,
			ibuf_count_get(space, page_no));
osku's avatar
osku committed
3574 3575

		ibuf_count_set(space, page_no,
3576
			       ibuf_count_get(space, page_no) + 1);
osku's avatar
osku committed
3577 3578
	}
#endif
3579

osku's avatar
osku committed
3580
	mtr_commit(&mtr);
3581
	btr_pcur_close(&pcur);
osku's avatar
osku committed
3582 3583
	ibuf_exit();

3584
	mem_heap_free(heap);
osku's avatar
osku committed
3585

3586 3587
	/* Contraction is attempted only after a successful
	pessimistic (BTR_MODIFY_TREE) insert. */
	if (err == DB_SUCCESS && mode == BTR_MODIFY_TREE) {
		ibuf_contract_after_insert(entry_size);
osku's avatar
osku committed
3588
	}
3589

osku's avatar
osku committed
3590 3591 3592 3593 3594
	/* do_merge is set only on the "may not fit" path above, which
	also fills space_ids, space_versions, page_nos and n_stored. */
	if (do_merge) {
#ifdef UNIV_IBUF_DEBUG
		ut_a(n_stored <= IBUF_MAX_N_PAGES_MERGED);
#endif
		buf_read_ibuf_merge_pages(FALSE, space_ids, space_versions,
3595
					  page_nos, n_stored);
osku's avatar
osku committed
3596
	}
3597

osku's avatar
osku committed
3598 3599 3600
	return(err);
}

3601
/*********************************************************************//**
3602 3603
Buffer an operation in the insert/delete buffer, instead of doing it
directly to the disk page, if this is possible. Does not do it if the index
3604 3605
is clustered or unique.
@return	TRUE if success */
3606
UNIV_INTERN
osku's avatar
osku committed
3607 3608 3609
ibool
ibuf_insert(
/*========*/
3610 3611 3612 3613 3614 3615 3616
	ibuf_op_t	op,	/*!< in: operation type */
	const dtuple_t*	entry,	/*!< in: index entry to insert */
	dict_index_t*	index,	/*!< in: index where to insert */
	ulint		space,	/*!< in: space id where to insert */
	ulint		zip_size,/*!< in: compressed page size in bytes, or 0 */
	ulint		page_no,/*!< in: page number where to insert */
	que_thr_t*	thr)	/*!< in: query thread */
osku's avatar
osku committed
3617
{
3618 3619 3620 3621 3622 3623
	ulint		err;
	ulint		entry_size;
	ibool		no_counter;
	/* Read the settable global variable ibuf_use only once in
	this function, so that we will have a consistent view of it. */
	ibuf_use_t	use		= ibuf_use;
osku's avatar
osku committed
3624 3625 3626

	ut_a(trx_sys_multiple_tablespace_format);
	ut_ad(dtuple_check_typed(entry));
3627
	ut_ad(ut_is_2pow(zip_size));
osku's avatar
osku committed
3628

3629
	ut_a(!dict_index_is_clust(index));
3630

3631
	no_counter = use <= IBUF_USE_INSERT;
3632

3633 3634 3635 3636 3637 3638 3639 3640 3641 3642
	switch (op) {
	case IBUF_OP_INSERT:
		switch (use) {
		case IBUF_USE_NONE:
		case IBUF_USE_DELETE:
		case IBUF_USE_DELETE_MARK:
			return(FALSE);
		case IBUF_USE_INSERT:
		case IBUF_USE_INSERT_DELETE_MARK:
		case IBUF_USE_ALL:
3643
			goto check_watch;
3644
		case IBUF_USE_COUNT:
3645 3646 3647 3648 3649 3650 3651 3652 3653 3654 3655 3656
			break;
		}
		break;
	case IBUF_OP_DELETE_MARK:
		switch (use) {
		case IBUF_USE_NONE:
		case IBUF_USE_INSERT:
			return(FALSE);
		case IBUF_USE_DELETE_MARK:
		case IBUF_USE_DELETE:
		case IBUF_USE_INSERT_DELETE_MARK:
		case IBUF_USE_ALL:
3657
			ut_ad(!no_counter);
3658
			goto check_watch;
3659
		case IBUF_USE_COUNT:
3660 3661 3662 3663 3664 3665 3666 3667 3668 3669 3670 3671
			break;
		}
		break;
	case IBUF_OP_DELETE:
		switch (use) {
		case IBUF_USE_NONE:
		case IBUF_USE_INSERT:
		case IBUF_USE_INSERT_DELETE_MARK:
			return(FALSE);
		case IBUF_USE_DELETE_MARK:
		case IBUF_USE_DELETE:
		case IBUF_USE_ALL:
3672
			ut_ad(!no_counter);
3673
			goto skip_watch;
3674
		case IBUF_USE_COUNT:
3675 3676
			break;
		}
3677 3678 3679
		break;
	case IBUF_OP_COUNT:
		break;
3680 3681
	}

3682 3683 3684
	/* unknown op or use */
	ut_error;

3685 3686 3687 3688 3689 3690
check_watch:
	/* If a thread attempts to buffer an insert on a page while a
	purge is in progress on the same page, the purge must not be
	buffered, because it could remove a record that was
	re-inserted later.  For simplicity, we block the buffering of
	all operations on a page that has a purge pending.
3691

3692 3693 3694 3695 3696
	We do not check this in the IBUF_OP_DELETE case, because that
	would always trigger the buffer pool watch during purge and
	thus prevent the buffering of delete operations.  We assume
	that the issuer of IBUF_OP_DELETE has called
	buf_pool_watch_set(space, page_no). */
3697

3698 3699 3700
	{
		buf_page_t*	bpage;
		ulint		fold = buf_page_address_fold(space, page_no);
irana's avatar
irana committed
3701
		buf_pool_t*	buf_pool = buf_pool_get(space, page_no);
3702

irana's avatar
irana committed
3703 3704 3705
		buf_pool_mutex_enter(buf_pool);
		bpage = buf_page_hash_get_low(buf_pool, space, page_no, fold);
		buf_pool_mutex_exit(buf_pool);
3706 3707 3708 3709 3710 3711 3712 3713 3714 3715 3716

		if (UNIV_LIKELY_NULL(bpage)) {
			/* A buffer pool watch has been set or the
			page has been read into the buffer pool.
			Do not buffer the request.  If a purge operation
			is being buffered, have this request executed
			directly on the page in the buffer pool after the
			buffered entries for this page have been merged. */
			return(FALSE);
		}
	}
3717

3718
skip_watch:
3719 3720
	entry_size = rec_get_converted_size(index, entry, 0);

3721 3722 3723
	if (entry_size
	    >= page_get_free_space_of_empty(dict_table_is_comp(index->table))
	    / 2) {
3724

osku's avatar
osku committed
3725 3726
		return(FALSE);
	}
3727

3728 3729
	err = ibuf_insert_low(BTR_MODIFY_PREV, op, no_counter,
			      entry, entry_size,
3730
			      index, space, zip_size, page_no, thr);
osku's avatar
osku committed
3731
	if (err == DB_FAIL) {
3732 3733
		err = ibuf_insert_low(BTR_MODIFY_TREE, op, no_counter,
				      entry, entry_size,
3734
				      index, space, zip_size, page_no, thr);
osku's avatar
osku committed
3735
	}
3736

osku's avatar
osku committed
3737 3738 3739
	if (err == DB_SUCCESS) {
#ifdef UNIV_IBUF_DEBUG
		/* fprintf(stderr, "Ibuf insert for page no %lu of index %s\n",
3740
		page_no, index->name); */
osku's avatar
osku committed
3741 3742 3743 3744 3745 3746 3747 3748 3749
#endif
		return(TRUE);

	} else {
		ut_a(err == DB_STRONG_FAIL);

		return(FALSE);
	}
}
3750

3751
/********************************************************************//**
osku's avatar
osku committed
3752 3753 3754 3755
During merge, inserts to an index page a secondary index entry extracted
from the insert buffer. */
static
void
3756 3757
ibuf_insert_to_index_page_low(
/*==========================*/
3758 3759 3760 3761 3762 3763
	const dtuple_t*	entry,	/*!< in: buffered entry to insert */
	buf_block_t*	block,	/*!< in/out: index page where the buffered
				entry should be placed */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr,	/*!< in/out: mtr */
	page_cur_t*	page_cur)/*!< in/out: cursor positioned on the record
3764 3765
				after which to insert the buffered entry */
{
3766 3767 3768 3769 3770 3771
	const page_t*	page;
	ulint		space;
	ulint		page_no;
	ulint		zip_size;
	const page_t*	bitmap_page;
	ulint		old_bits;
3772 3773

	if (UNIV_LIKELY
3774
	    (page_cur_tuple_insert(page_cur, entry, index, 0, mtr) != NULL)) {
3775 3776 3777 3778 3779
		return;
	}

	/* If the record did not fit, reorganize */

3780 3781
	btr_page_reorganize(block, index, mtr);
	page_cur_search(block, index, entry, PAGE_CUR_LE, page_cur);
3782 3783 3784 3785

	/* This time the record must fit */

	if (UNIV_LIKELY
3786
	    (page_cur_tuple_insert(page_cur, entry, index, 0, mtr) != NULL)) {
3787 3788 3789
		return;
	}

3790 3791
	page = buf_block_get_frame(block);

3792 3793 3794 3795 3796 3797
	ut_print_timestamp(stderr);

	fprintf(stderr,
		"  InnoDB: Error: Insert buffer insert fails;"
		" page free %lu, dtuple size %lu\n",
		(ulong) page_get_max_insert_size(page, 1),
3798
		(ulong) rec_get_converted_size(index, entry, 0));
3799 3800 3801 3802 3803 3804
	fputs("InnoDB: Cannot insert index record ", stderr);
	dtuple_print(stderr, entry);
	fputs("\nInnoDB: The table where this index record belongs\n"
	      "InnoDB: is now probably corrupt. Please run CHECK TABLE on\n"
	      "InnoDB: that table.\n", stderr);

3805 3806 3807
	space = page_get_space_id(page);
	zip_size = buf_block_get_zip_size(block);
	page_no = page_get_page_no(page);
3808

3809 3810
	bitmap_page = ibuf_bitmap_get_map_page(space, page_no, zip_size, mtr);
	old_bits = ibuf_bitmap_page_get_bits(bitmap_page, page_no, zip_size,
3811 3812 3813
					     IBUF_BITMAP_FREE, mtr);

	fprintf(stderr,
3814 3815 3816
		"InnoDB: space %lu, page %lu, zip_size %lu, bitmap bits %lu\n",
		(ulong) space, (ulong) page_no,
		(ulong) zip_size, (ulong) old_bits);
3817 3818 3819 3820 3821

	fputs("InnoDB: Submit a detailed bug report"
	      " to http://bugs.mysql.com\n", stderr);
}

3822 3823 3824 3825 3826
/************************************************************************
During merge, inserts to an index page a secondary index entry extracted
from the insert buffer. */
static
void
osku's avatar
osku committed
3827 3828
ibuf_insert_to_index_page(
/*======================*/
3829
	const dtuple_t*	entry,	/*!< in: buffered entry to insert */
3830
	buf_block_t*	block,	/*!< in/out: index page where the buffered entry
osku's avatar
osku committed
3831
				should be placed */
3832 3833
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
3834 3835 3836
{
	page_cur_t	page_cur;
	ulint		low_match;
3837
	page_t*		page		= buf_block_get_frame(block);
osku's avatar
osku committed
3838 3839 3840 3841
	rec_t*		rec;

	ut_ad(ibuf_inside());
	ut_ad(dtuple_check_typed(entry));
3842
	ut_ad(!buf_block_align(page)->is_hashed);
osku's avatar
osku committed
3843

3844
	if (UNIV_UNLIKELY(dict_table_is_comp(index->table)
3845 3846 3847 3848 3849
			  != (ibool)!!page_is_comp(page))) {
		fputs("InnoDB: Trying to insert a record from"
		      " the insert buffer to an index page\n"
		      "InnoDB: but the 'compact' flag does not match!\n",
		      stderr);
osku's avatar
osku committed
3850 3851 3852 3853 3854
		goto dump;
	}

	rec = page_rec_get_next(page_get_infimum_rec(page));

3855
	if (page_rec_is_supremum(rec)) {
3856 3857 3858 3859 3860
		fputs("InnoDB: Trying to insert a record from"
		      " the insert buffer to an index page\n"
		      "InnoDB: but the index page is empty!\n",
		      stderr);
		goto dump;
3861 3862
	}

osku's avatar
osku committed
3863
	if (UNIV_UNLIKELY(rec_get_n_fields(rec, index)
3864 3865 3866 3867 3868 3869
			  != dtuple_get_n_fields(entry))) {
		fputs("InnoDB: Trying to insert a record from"
		      " the insert buffer to an index page\n"
		      "InnoDB: but the number of fields does not match!\n",
		      stderr);
dump:
3870
		buf_page_print(page, 0);
osku's avatar
osku committed
3871

3872
		dtuple_print(stderr, entry);
osku's avatar
osku committed
3873

3874 3875 3876 3877 3878 3879 3880
		fputs("InnoDB: The table where where"
		      " this index record belongs\n"
		      "InnoDB: is now probably corrupt."
		      " Please run CHECK TABLE on\n"
		      "InnoDB: your tables.\n"
		      "InnoDB: Submit a detailed bug report to"
		      " http://bugs.mysql.com!\n", stderr);
osku's avatar
osku committed
3881 3882 3883 3884

		return;
	}

3885 3886
	low_match = page_cur_search(block, index, entry,
				    PAGE_CUR_LE, &page_cur);
3887

3888 3889 3890 3891
	if (UNIV_UNLIKELY(low_match == dtuple_get_n_fields(entry))) {
		mem_heap_t*	heap;
		upd_t*		update;
		ulint*		offsets;
3892 3893
		page_zip_des_t*	page_zip;

osku's avatar
osku committed
3894
		rec = page_cur_get_rec(&page_cur);
3895

3896 3897 3898
		/* This is based on
		row_ins_sec_index_entry_by_modify(BTR_MODIFY_LEAF). */
		ut_ad(rec_get_deleted_flag(rec, page_is_comp(page)));
unknown's avatar
unknown committed
3899

3900
		heap = mem_heap_create(1024);
unknown's avatar
unknown committed
3901

3902 3903 3904 3905
		offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED,
					  &heap);
		update = row_upd_build_sec_rec_difference_binary(
			index, entry, rec, NULL, heap);
3906

3907 3908
		page_zip = buf_block_get_page_zip(block);

3909 3910 3911 3912
		if (update->n_fields == 0) {
			/* The records only differ in the delete-mark.
			Clear the delete-mark, like we did before
			Bug #56680 was fixed. */
3913 3914
			btr_cur_set_deleted_flag_for_ibuf(
				rec, page_zip, FALSE, mtr);
3915 3916
updated_in_place:
			mem_heap_free(heap);
3917 3918 3919
			return;
		}

3920 3921 3922 3923 3924 3925 3926 3927
		/* Copy the info bits. Clear the delete-mark. */
		update->info_bits = rec_get_info_bits(rec, page_is_comp(page));
		update->info_bits &= ~REC_INFO_DELETED_FLAG;

		/* We cannot invoke btr_cur_optimistic_update() here,
		because we do not have a btr_cur_t or que_thr_t,
		as the insert buffer merge occurs at a very low level. */
		if (!row_upd_changes_field_size_or_external(index, offsets,
3928 3929 3930 3931
							    update)
		    && (!page_zip || btr_cur_update_alloc_zip(
				page_zip, block, index,
				rec_offs_size(offsets), FALSE, mtr))) {
3932 3933
			/* This is the easy case. Do something similar
			to btr_cur_update_in_place(). */
3934 3935
			row_upd_rec_in_place(rec, index, offsets,
					     update, page_zip);
3936
			goto updated_in_place;
3937
		}
3938

3939 3940 3941 3942 3943 3944
		/* A collation may identify values that differ in
		storage length.
		Some examples (1 or 2 bytes):
		utf8_turkish_ci: I = U+0131 LATIN SMALL LETTER DOTLESS I
		utf8_general_ci: S = U+00DF LATIN SMALL LETTER SHARP S
		utf8_general_ci: A = U+00E4 LATIN SMALL LETTER A WITH DIAERESIS
3945

3946
		latin1_german2_ci: SS = U+00DF LATIN SMALL LETTER SHARP S
3947

3948 3949
		Examples of a character (3-byte UTF-8 sequence)
		identified with 2 or 4 characters (1-byte UTF-8 sequences):
3950

3951 3952 3953
		utf8_unicode_ci: 'II' = U+2171 SMALL ROMAN NUMERAL TWO
		utf8_unicode_ci: '(10)' = U+247D PARENTHESIZED NUMBER TEN
		*/
3954

3955 3956
		/* Delete the different-length record, and insert the
		buffered one. */
3957

3958
		lock_rec_store_on_page_infimum(block, rec);
3959 3960 3961 3962
		page_cur_delete_rec(&page_cur, index, offsets, mtr);
		page_cur_move_to_prev(&page_cur);
		mem_heap_free(heap);

3963
		ibuf_insert_to_index_page_low(entry, block, index, mtr,
3964
					      &page_cur);
3965
		lock_rec_restore_from_page_infimum(block, rec, block);
3966
	} else {
3967
		ibuf_insert_to_index_page_low(entry, block, index, mtr,
3968
					      &page_cur);
osku's avatar
osku committed
3969 3970 3971
	}
}

3972
/****************************************************************//**
3973 3974 3975 3976 3977 3978
During merge, sets the delete mark on a record for a secondary index
entry. */
static
void
ibuf_set_del_mark(
/*==============*/
3979 3980 3981 3982
	const dtuple_t*		entry,	/*!< in: entry */
	buf_block_t*		block,	/*!< in/out: block */
	const dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*			mtr)	/*!< in: mtr */
3983 3984 3985 3986 3987 3988 3989 3990 3991 3992 3993 3994 3995 3996 3997
{
	page_cur_t	page_cur;
	ulint		low_match;

	ut_ad(ibuf_inside());
	ut_ad(dtuple_check_typed(entry));

	low_match = page_cur_search(
		block, index, entry, PAGE_CUR_LE, &page_cur);

	if (low_match == dtuple_get_n_fields(entry)) {
		rec_t*		rec;
		page_zip_des_t*	page_zip;

		rec = page_cur_get_rec(&page_cur);
marko's avatar
marko committed
3998
		page_zip = page_cur_get_page_zip(&page_cur);
3999

4000 4001 4002 4003 4004 4005 4006 4007
		/* Delete mark the old index record. According to a
		comment in row_upd_sec_index_entry(), it can already
		have been delete marked if a lock wait occurred in
		row_ins_index_entry() in a previous invocation of
		row_upd_sec_index_entry(). */

		if (UNIV_LIKELY
		    (!rec_get_deleted_flag(
4008
			    rec, dict_table_is_comp(index->table)))) {
4009 4010
			btr_cur_set_deleted_flag_for_ibuf(rec, page_zip,
							  TRUE, mtr);
4011
		}
4012
	} else {
4013 4014 4015 4016 4017 4018 4019 4020 4021 4022 4023 4024 4025
		ut_print_timestamp(stderr);
		fputs("  InnoDB: unable to find a record to delete-mark\n",
		      stderr);
		fputs("InnoDB: tuple ", stderr);
		dtuple_print(stderr, entry);
		fputs("\n"
		      "InnoDB: record ", stderr);
		rec_print(stderr, page_cur_get_rec(&page_cur), index);
		putc('\n', stderr);
		fputs("\n"
		      "InnoDB: Submit a detailed bug report"
		      " to http://bugs.mysql.com\n", stderr);
		ut_ad(0);
4026 4027 4028
	}
}

4029
/****************************************************************//**
4030 4031 4032 4033 4034
During merge, delete a record for a secondary index entry. */
static
void
ibuf_delete(
/*========*/
4035 4036 4037
	const dtuple_t*	entry,	/*!< in: entry */
	buf_block_t*	block,	/*!< in/out: block */
	dict_index_t*	index,	/*!< in: record descriptor */
4038 4039
	mtr_t*		mtr)	/*!< in/out: mtr; must be committed
				before latching any further pages */
4040 4041 4042 4043 4044 4045 4046 4047 4048 4049 4050
{
	page_cur_t	page_cur;
	ulint		low_match;

	ut_ad(ibuf_inside());
	ut_ad(dtuple_check_typed(entry));

	low_match = page_cur_search(
		block, index, entry, PAGE_CUR_LE, &page_cur);

	if (low_match == dtuple_get_n_fields(entry)) {
4051 4052 4053
		page_zip_des_t*	page_zip= buf_block_get_page_zip(block);
		page_t*		page	= buf_block_get_frame(block);
		rec_t*		rec	= page_cur_get_rec(&page_cur);
4054 4055 4056 4057 4058 4059 4060 4061 4062 4063 4064 4065 4066 4067

		/* TODO: the below should probably be a separate function,
		it's a bastardized version of btr_cur_optimistic_delete. */

		ulint		offsets_[REC_OFFS_NORMAL_SIZE];
		ulint*		offsets	= offsets_;
		mem_heap_t*	heap = NULL;
		ulint		max_ins_size;

		rec_offs_init(offsets_);

		offsets = rec_get_offsets(
			rec, index, offsets, ULINT_UNDEFINED, &heap);

4068 4069 4070
		/* Refuse to delete the last record. */
		ut_a(page_get_n_recs(page) > 1);

marko's avatar
marko committed
4071 4072 4073 4074
		/* The record should have been marked for deletion. */
		ut_ad(REC_INFO_DELETED_FLAG
		      & rec_get_info_bits(rec, page_is_comp(page)));

4075 4076
		lock_update_delete(block, rec);

4077 4078 4079 4080 4081 4082 4083 4084
		if (!page_zip) {
			max_ins_size
				= page_get_max_insert_size_after_reorganize(
					page, 1);
		}
#ifdef UNIV_ZIP_DEBUG
		ut_a(!page_zip || page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */
4085
		page_cur_delete_rec(&page_cur, index, offsets, mtr);
4086 4087 4088
#ifdef UNIV_ZIP_DEBUG
		ut_a(!page_zip || page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */
4089

4090 4091 4092 4093 4094
		if (page_zip) {
			ibuf_update_free_bits_zip(block, mtr);
		} else {
			ibuf_update_free_bits_low(block, max_ins_size, mtr);
		}
4095 4096 4097 4098 4099

		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}
	} else {
4100
		/* The record must have been purged already. */
4101 4102 4103
	}
}

4104 4105 4106 4107 4108 4109 4110 4111 4112 4113 4114 4115 4116 4117 4118 4119 4120 4121 4122 4123 4124 4125 4126 4127 4128 4129 4130 4131 4132 4133 4134 4135 4136 4137 4138 4139 4140 4141 4142 4143 4144 4145 4146 4147 4148 4149 4150 4151 4152 4153 4154 4155 4156 4157 4158 4159 4160 4161 4162 4163
/*********************************************************************//**
Restores insert buffer tree cursor position.
If the position cannot be restored, the mini-transaction is committed
before returning. Unless the tablespace no longer exists (its flags read
as ULINT_UNDEFINED), the failure is also reported on stderr and the
insert buffer tree is validated; a failed validation ends in ut_error.
@return	TRUE if the position was restored; FALSE if not */
static __attribute__((nonnull))
ibool
ibuf_restore_pos(
/*=============*/
	ulint		space,	/*!< in: space id */
	ulint		page_no,/*!< in: index page number where the record
				should belong */
	const dtuple_t*	search_tuple,
				/*!< in: search tuple for entries of page_no */
	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE */
	btr_pcur_t*	pcur,	/*!< in/out: persistent cursor whose
				position is to be restored */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	ut_ad(mode == BTR_MODIFY_LEAF || mode == BTR_MODIFY_TREE);

	if (btr_pcur_restore_position(mode, pcur, mtr)) {

		return(TRUE);
	}

	if (fil_space_get_flags(space) == ULINT_UNDEFINED) {
		/* The tablespace has been dropped.  It is possible
		that another thread has deleted the insert buffer
		entry.  Do not complain. */
		btr_pcur_commit_specify_mtr(pcur, mtr);

		return(FALSE);
	}

	/* The tablespace still exists, so the failure is unexpected:
	print everything known about the cursor position. */
	fprintf(stderr,
		"InnoDB: ERROR: Submit the output to"
		" http://bugs.mysql.com\n"
		"InnoDB: ibuf cursor restoration fails!\n"
		"InnoDB: ibuf record inserted to page %lu:%lu\n",
		(ulong) space, (ulong) page_no);
	fflush(stderr);

	rec_print_old(stderr, btr_pcur_get_rec(pcur));
	rec_print_old(stderr, pcur->old_rec);
	dtuple_print(stderr, search_tuple);

	rec_print_old(stderr,
		      page_rec_get_next(btr_pcur_get_rec(pcur)));
	fflush(stderr);

	/* The records have been printed; the mini-transaction is
	committed before the tree is validated. */
	btr_pcur_commit_specify_mtr(pcur, mtr);

	fputs("InnoDB: Validating insert buffer tree:\n", stderr);
	if (!btr_validate_index(ibuf->index, NULL)) {
		ut_error;
	}

	fprintf(stderr, "InnoDB: ibuf tree ok\n");
	fflush(stderr);

	return(FALSE);
}

4164
/*********************************************************************//**
osku's avatar
osku committed
4165 4166
Deletes from ibuf the record on which pcur is positioned. If we have to
resort to a pessimistic delete, this function commits mtr and closes
4167 4168
the cursor.
@return	TRUE if mtr was committed and pcur closed in this operation */
osku's avatar
osku committed
4169 4170 4171 4172
static
ibool
ibuf_delete_rec(
/*============*/
4173 4174
	ulint		space,	/*!< in: space id */
	ulint		page_no,/*!< in: index page number where the record
osku's avatar
osku committed
4175
				should belong */
4176
	btr_pcur_t*	pcur,	/*!< in: pcur positioned on the record to
osku's avatar
osku committed
4177
				delete, having latch mode BTR_MODIFY_LEAF */
4178
	const dtuple_t*	search_tuple,
4179 4180
				/*!< in: search tuple for entries of page_no */
	mtr_t*		mtr)	/*!< in: mtr */
osku's avatar
osku committed
4181 4182 4183 4184
{
	ibool		success;
	page_t*		root;
	ulint		err;
4185

osku's avatar
osku committed
4186
	ut_ad(ibuf_inside());
4187 4188 4189
	ut_ad(page_rec_is_user_rec(btr_pcur_get_rec(pcur)));
	ut_ad(ibuf_rec_get_page_no(btr_pcur_get_rec(pcur)) == page_no);
	ut_ad(ibuf_rec_get_space(btr_pcur_get_rec(pcur)) == space);
osku's avatar
osku committed
4190 4191 4192 4193

	success = btr_cur_optimistic_delete(btr_pcur_get_btr_cur(pcur), mtr);

	if (success) {
4194 4195 4196 4197 4198 4199 4200 4201 4202 4203 4204 4205 4206 4207 4208 4209
		if (UNIV_UNLIKELY(!page_get_n_recs(btr_pcur_get_page(pcur)))) {
			/* If a B-tree page is empty, it must be the root page
			and the whole B-tree must be empty. InnoDB does not
			allow empty B-tree pages other than the root. */
			root = btr_pcur_get_page(pcur);

			ut_ad(page_get_space_id(root) == IBUF_SPACE_ID);
			ut_ad(page_get_page_no(root)
			      == FSP_IBUF_TREE_ROOT_PAGE_NO);

			/* ibuf->empty is protected by the root page latch.
			Before the deletion, it had to be FALSE. */
			ut_ad(!ibuf->empty);
			ibuf->empty = TRUE;
		}

4210
#ifdef UNIV_IBUF_COUNT_DEBUG
4211 4212 4213 4214
		fprintf(stderr,
			"Decrementing ibuf count of space %lu page %lu\n"
			"from %lu by 1\n", space, page_no,
			ibuf_count_get(space, page_no));
osku's avatar
osku committed
4215
		ibuf_count_set(space, page_no,
4216
			       ibuf_count_get(space, page_no) - 1);
osku's avatar
osku committed
4217 4218 4219
#endif
		return(FALSE);
	}
4220

4221 4222 4223 4224
	ut_ad(page_rec_is_user_rec(btr_pcur_get_rec(pcur)));
	ut_ad(ibuf_rec_get_page_no(btr_pcur_get_rec(pcur)) == page_no);
	ut_ad(ibuf_rec_get_space(btr_pcur_get_rec(pcur)) == space);

osku's avatar
osku committed
4225 4226 4227 4228 4229 4230 4231 4232
	/* We have to resort to a pessimistic delete from ibuf */
	btr_pcur_store_position(pcur, mtr);

	btr_pcur_commit_specify_mtr(pcur, mtr);

	mutex_enter(&ibuf_mutex);

	mtr_start(mtr);
4233

4234 4235
	if (!ibuf_restore_pos(space, page_no, search_tuple,
			      BTR_MODIFY_TREE, pcur, mtr)) {
osku's avatar
osku committed
4236

4237
		mutex_exit(&ibuf_mutex);
4238
		goto func_exit;
osku's avatar
osku committed
4239 4240
	}

4241
	root = ibuf_tree_root_get(mtr);
osku's avatar
osku committed
4242 4243

	btr_cur_pessimistic_delete(&err, TRUE, btr_pcur_get_btr_cur(pcur),
4244
				   RB_NONE, mtr);
osku's avatar
osku committed
4245 4246
	ut_a(err == DB_SUCCESS);

4247
#ifdef UNIV_IBUF_COUNT_DEBUG
osku's avatar
osku committed
4248 4249
	ibuf_count_set(space, page_no, ibuf_count_get(space, page_no) - 1);
#endif
4250
	ibuf_size_update(root, mtr);
4251 4252 4253
	mutex_exit(&ibuf_mutex);

	ibuf->empty = (page_get_n_recs(root) == 0);
osku's avatar
osku committed
4254 4255
	btr_pcur_commit_specify_mtr(pcur, mtr);

4256
func_exit:
4257
	btr_pcur_close(pcur);
osku's avatar
osku committed
4258 4259 4260 4261

	return(TRUE);
}

4262
/*********************************************************************//**
osku's avatar
osku committed
4263
When an index page is read from a disk to the buffer pool, this function
4264 4265 4266 4267 4268
applies any buffered operations to the page and deletes the entries from the
insert buffer. If the page is not read, but created in the buffer pool, this
function deletes its buffered entries from the insert buffer; there can
exist entries for such a page if the page belonged to an index which
subsequently was dropped. */
4269
UNIV_INTERN
osku's avatar
osku committed
4270 4271 4272
void
ibuf_merge_or_delete_for_page(
/*==========================*/
4273
	buf_block_t*	block,	/*!< in: if page has been read from
4274 4275
				disk, pointer to the page x-latched,
				else NULL */
4276 4277 4278
	ulint		space,	/*!< in: space id of the index page */
	ulint		page_no,/*!< in: page number of the index page */
	ulint		zip_size,/*!< in: compressed page size in bytes,
4279
				or 0 */
4280
	ibool		update_ibuf_bitmap)/*!< in: normally this is set
4281 4282 4283 4284
				to TRUE, but if we have deleted or are
				deleting the tablespace, then we
				naturally do not want to update a
				non-existent bitmap page */
osku's avatar
osku committed
4285 4286 4287 4288 4289
{
	mem_heap_t*	heap;
	btr_pcur_t	pcur;
	dtuple_t*	search_tuple;
#ifdef UNIV_IBUF_DEBUG
4290
	ulint		volume			= 0;
osku's avatar
osku committed
4291
#endif
4292
	page_zip_des_t*	page_zip		= NULL;
osku's avatar
osku committed
4293 4294 4295 4296
	ibool		tablespace_being_deleted = FALSE;
	ibool		corruption_noticed	= FALSE;
	mtr_t		mtr;

4297 4298 4299 4300
	/* Counts for merged & discarded operations. */
	ulint		mops[IBUF_OP_COUNT];
	ulint		dops[IBUF_OP_COUNT];

4301 4302
	ut_ad(!block || buf_block_get_space(block) == space);
	ut_ad(!block || buf_block_get_page_no(block) == page_no);
4303 4304
	ut_ad(!block || buf_block_get_zip_size(block) == zip_size);

4305
	if (srv_force_recovery >= SRV_FORCE_NO_IBUF_MERGE
4306 4307 4308
	    || trx_sys_hdr_page(space, page_no)) {
		return;
	}
osku's avatar
osku committed
4309

4310 4311 4312 4313 4314 4315 4316 4317 4318 4319 4320 4321
	/* We cannot refer to zip_size in the following, because
	zip_size is passed as ULINT_UNDEFINED (it is unknown) when
	buf_read_ibuf_merge_pages() is merging (discarding) changes
	for a dropped tablespace.  When block != NULL or
	update_ibuf_bitmap is specified, the zip_size must be known.
	That is why we will repeat the check below, with zip_size in
	place of 0.  Passing zip_size as 0 assumes that the
	uncompressed page size always is a power-of-2 multiple of the
	compressed page size. */

	if (ibuf_fixed_addr_page(space, 0, page_no)
	    || fsp_descr_page(0, page_no)) {
osku's avatar
osku committed
4322 4323 4324
		return;
	}

4325
	if (UNIV_LIKELY(update_ibuf_bitmap)) {
4326 4327 4328 4329 4330 4331 4332
		ut_a(ut_is_2pow(zip_size));

		if (ibuf_fixed_addr_page(space, zip_size, page_no)
		    || fsp_descr_page(zip_size, page_no)) {
			return;
		}

osku's avatar
osku committed
4333 4334 4335 4336 4337 4338 4339
		/* If the following returns FALSE, we get the counter
		incremented, and must decrement it when we leave this
		function. When the counter is > 0, that prevents tablespace
		from being dropped. */

		tablespace_being_deleted = fil_inc_pending_ibuf_merges(space);

4340
		if (UNIV_UNLIKELY(tablespace_being_deleted)) {
osku's avatar
osku committed
4341 4342 4343
			/* Do not try to read the bitmap page from space;
			just delete the ibuf records for the page */

4344
			block = NULL;
osku's avatar
osku committed
4345
			update_ibuf_bitmap = FALSE;
4346
		} else {
4347 4348
			page_t*	bitmap_page;

4349
			mtr_start(&mtr);
4350 4351 4352

			bitmap_page = ibuf_bitmap_get_map_page(
				space, page_no, zip_size, &mtr);
4353 4354 4355 4356 4357 4358 4359 4360 4361 4362 4363 4364 4365 4366 4367

			if (!ibuf_bitmap_page_get_bits(bitmap_page, page_no,
						       zip_size,
						       IBUF_BITMAP_BUFFERED,
						       &mtr)) {
				/* No inserts buffered for this page */
				mtr_commit(&mtr);

				if (!tablespace_being_deleted) {
					fil_decr_pending_ibuf_merges(space);
				}

				return;
			}
			mtr_commit(&mtr);
osku's avatar
osku committed
4368
		}
4369 4370 4371 4372 4373
	} else if (block
		   && (ibuf_fixed_addr_page(space, zip_size, page_no)
		      || fsp_descr_page(zip_size, page_no))) {

		return;
4374
	}
osku's avatar
osku committed
4375 4376 4377 4378 4379 4380 4381

	ibuf_enter();

	heap = mem_heap_create(512);

	if (!trx_sys_multiple_tablespace_format) {
		ut_a(trx_doublewrite_must_reset_space_ids);
4382
		search_tuple = ibuf_search_tuple_build(space, page_no, heap);
osku's avatar
osku committed
4383
	} else {
4384
		search_tuple = ibuf_new_search_tuple_build(space, page_no,
4385
							   heap);
osku's avatar
osku committed
4386
	}
4387

4388
	if (block) {
osku's avatar
osku committed
4389 4390 4391 4392 4393 4394
		/* Move the ownership of the x-latch on the page to this OS
		thread, so that we can acquire a second x-latch on it. This
		is needed for the insert operations to the index page to pass
		the debug checks. */

		rw_lock_x_lock_move_ownership(&(block->lock));
4395
		page_zip = buf_block_get_page_zip(block);
4396

4397
		if (UNIV_UNLIKELY(fil_page_get_type(block->frame)
4398 4399
				  != FIL_PAGE_INDEX)
		    || UNIV_UNLIKELY(!page_is_leaf(block->frame))) {
osku's avatar
osku committed
4400

4401 4402
			page_t*	bitmap_page;

osku's avatar
osku committed
4403
			corruption_noticed = TRUE;
4404

osku's avatar
osku committed
4405 4406 4407 4408
			ut_print_timestamp(stderr);

			mtr_start(&mtr);

4409
			fputs("  InnoDB: Dump of the ibuf bitmap page:\n",
4410
			      stderr);
4411

4412 4413
			bitmap_page = ibuf_bitmap_get_map_page(space, page_no,
							       zip_size, &mtr);
4414
			buf_page_print(bitmap_page, 0);
4415

osku's avatar
osku committed
4416 4417 4418 4419
			mtr_commit(&mtr);

			fputs("\nInnoDB: Dump of the page:\n", stderr);

4420
			buf_page_print(block->frame, 0);
osku's avatar
osku committed
4421 4422

			fprintf(stderr,
4423 4424 4425 4426 4427
				"InnoDB: Error: corruption in the tablespace."
				" Bitmap shows insert\n"
				"InnoDB: buffer records to page n:o %lu"
				" though the page\n"
				"InnoDB: type is %lu, which is"
4428
				" not an index leaf page!\n"
4429 4430 4431 4432 4433 4434 4435 4436
				"InnoDB: We try to resolve the problem"
				" by skipping the insert buffer\n"
				"InnoDB: merge for this page."
				" Please run CHECK TABLE on your tables\n"
				"InnoDB: to determine if they are corrupt"
				" after this.\n\n"
				"InnoDB: Please submit a detailed bug report"
				" to http://bugs.mysql.com\n\n",
osku's avatar
osku committed
4437
				(ulong) page_no,
4438
				(ulong)
4439
				fil_page_get_type(block->frame));
osku's avatar
osku committed
4440 4441 4442
		}
	}

4443 4444 4445
	memset(mops, 0, sizeof(mops));
	memset(dops, 0, sizeof(dops));

osku's avatar
osku committed
4446 4447 4448
loop:
	mtr_start(&mtr);

4449
	if (block) {
4450 4451 4452 4453 4454 4455
		ibool success;

		success = buf_page_get_known_nowait(
			RW_X_LATCH, block,
			BUF_KEEP_OLD, __FILE__, __LINE__, &mtr);

osku's avatar
osku committed
4456
		ut_a(success);
4457

4458
		buf_block_dbg_add_level(block, SYNC_TREE_NODE);
osku's avatar
osku committed
4459
	}
4460

osku's avatar
osku committed
4461 4462
	/* Position pcur in the insert buffer at the first entry for this
	index page */
4463 4464 4465 4466
	btr_pcur_open_on_user_rec(
		ibuf->index, search_tuple, PAGE_CUR_GE, BTR_MODIFY_LEAF,
		&pcur, &mtr);

4467
	if (!btr_pcur_is_on_user_rec(&pcur)) {
osku's avatar
osku committed
4468 4469 4470 4471 4472 4473
		ut_ad(btr_pcur_is_after_last_in_tree(&pcur, &mtr));

		goto reset_bit;
	}

	for (;;) {
4474 4475
		rec_t*	rec;

4476
		ut_ad(btr_pcur_is_on_user_rec(&pcur));
osku's avatar
osku committed
4477

4478
		rec = btr_pcur_get_rec(&pcur);
osku's avatar
osku committed
4479 4480

		/* Check if the entry is for this index page */
4481 4482 4483
		if (ibuf_rec_get_page_no(rec) != page_no
		    || ibuf_rec_get_space(rec) != space) {

4484 4485
			if (block) {
				page_header_reset_last_insert(
4486
					block->frame, page_zip, &mtr);
osku's avatar
osku committed
4487
			}
4488

osku's avatar
osku committed
4489 4490 4491
			goto reset_bit;
		}

4492
		if (UNIV_UNLIKELY(corruption_noticed)) {
osku's avatar
osku committed
4493
			fputs("InnoDB: Discarding record\n ", stderr);
4494
			rec_print_old(stderr, rec);
4495
			fputs("\nInnoDB: from the insert buffer!\n\n", stderr);
4496
		} else if (block) {
osku's avatar
osku committed
4497
			/* Now we have at pcur a record which should be
4498
			applied on the index page; NOTE that the call below
4499 4500
			copies pointers to fields in rec, and we must
			keep the latch to the rec page until the
osku's avatar
osku committed
4501
			insertion is finished! */
4502
			dtuple_t*	entry;
4503
			trx_id_t	max_trx_id;
osku's avatar
osku committed
4504
			dict_index_t*	dummy_index;
4505 4506 4507
			ibuf_op_t	op = ibuf_rec_get_op_type(rec);

			max_trx_id = page_get_max_trx_id(page_align(rec));
4508 4509
			page_update_max_trx_id(block, page_zip, max_trx_id,
					       &mtr);
marko's avatar
marko committed
4510

4511 4512
			ut_ad(page_validate(page_align(rec), ibuf->index));

4513
			entry = ibuf_build_entry_from_ibuf_rec(
4514 4515
				rec, heap, &dummy_index);

4516 4517 4518 4519 4520 4521
			ut_ad(page_validate(block->frame, dummy_index));

			switch (op) {
				ibool	success;
			case IBUF_OP_INSERT:
#ifdef UNIV_IBUF_DEBUG
4522 4523 4524 4525 4526 4527 4528
				volume += rec_get_converted_size(
					dummy_index, entry, 0);

				volume += page_dir_calc_reserved_space(1);

				ut_a(volume <= 4 * UNIV_PAGE_SIZE
					/ IBUF_PAGE_SIZE_PER_FREE_SPACE);
osku's avatar
osku committed
4529
#endif
4530 4531 4532 4533 4534 4535 4536 4537 4538 4539 4540
				ibuf_insert_to_index_page(
					entry, block, dummy_index, &mtr);
				break;

			case IBUF_OP_DELETE_MARK:
				ibuf_set_del_mark(
					entry, block, dummy_index, &mtr);
				break;

			case IBUF_OP_DELETE:
				ibuf_delete(entry, block, dummy_index, &mtr);
4541 4542 4543 4544 4545 4546 4547 4548 4549 4550 4551 4552
				/* Because ibuf_delete() will latch an
				insert buffer bitmap page, commit mtr
				before latching any further pages.
				Store and restore the cursor position. */
				ut_ad(rec == btr_pcur_get_rec(&pcur));
				ut_ad(page_rec_is_user_rec(rec));
				ut_ad(ibuf_rec_get_page_no(rec) == page_no);
				ut_ad(ibuf_rec_get_space(rec) == space);

				btr_pcur_store_position(&pcur, &mtr);
				btr_pcur_commit_specify_mtr(&pcur, &mtr);

4553 4554
				mtr_start(&mtr);

4555 4556 4557 4558 4559
				success = buf_page_get_known_nowait(
					RW_X_LATCH, block,
					BUF_KEEP_OLD,
					__FILE__, __LINE__, &mtr);
				ut_a(success);
4560

4561
				buf_block_dbg_add_level(block, SYNC_TREE_NODE);
4562

4563 4564 4565 4566 4567
				if (!ibuf_restore_pos(space, page_no,
						      search_tuple,
						      BTR_MODIFY_LEAF,
						      &pcur, &mtr)) {

4568
					mtr_commit(&mtr);
4569 4570 4571 4572
					mops[op]++;
					ibuf_dummy_index_free(dummy_index);
					goto loop;
				}
4573

4574
				break;
4575 4576 4577 4578 4579 4580
			default:
				ut_error;
			}

			mops[op]++;

osku's avatar
osku committed
4581
			ibuf_dummy_index_free(dummy_index);
4582 4583
		} else {
			dops[ibuf_rec_get_op_type(rec)]++;
osku's avatar
osku committed
4584 4585 4586 4587
		}

		/* Delete the record from ibuf */
		if (ibuf_delete_rec(space, page_no, &pcur, search_tuple,
4588
				    &mtr)) {
osku's avatar
osku committed
4589 4590 4591 4592
			/* Deletion was pessimistic and mtr was committed:
			we start from the beginning again */

			goto loop;
4593
		} else if (btr_pcur_is_after_last_on_page(&pcur)) {
osku's avatar
osku committed
4594
			mtr_commit(&mtr);
4595
			btr_pcur_close(&pcur);
osku's avatar
osku committed
4596 4597 4598 4599 4600 4601

			goto loop;
		}
	}

reset_bit:
4602
	if (UNIV_LIKELY(update_ibuf_bitmap)) {
4603 4604 4605 4606 4607 4608 4609 4610 4611
		page_t*	bitmap_page;

		bitmap_page = ibuf_bitmap_get_map_page(
			space, page_no, zip_size, &mtr);

		ibuf_bitmap_page_set_bits(
			bitmap_page, page_no, zip_size,
			IBUF_BITMAP_BUFFERED, FALSE, &mtr);

4612
		if (block) {
4613 4614 4615
			ulint old_bits = ibuf_bitmap_page_get_bits(
				bitmap_page, page_no, zip_size,
				IBUF_BITMAP_FREE, &mtr);
4616

4617 4618
			ulint new_bits = ibuf_index_page_calc_free(
				zip_size, block);
4619

osku's avatar
osku committed
4620
			if (old_bits != new_bits) {
4621 4622 4623
				ibuf_bitmap_page_set_bits(
					bitmap_page, page_no, zip_size,
					IBUF_BITMAP_FREE, new_bits, &mtr);
osku's avatar
osku committed
4624 4625 4626
			}
		}
	}
4627

osku's avatar
osku committed
4628
	mtr_commit(&mtr);
4629
	btr_pcur_close(&pcur);
osku's avatar
osku committed
4630 4631
	mem_heap_free(heap);

4632 4633 4634 4635 4636
#ifdef HAVE_ATOMIC_BUILTINS
	os_atomic_increment_ulint(&ibuf->n_merges, 1);
	ibuf_add_ops(ibuf->n_merged_ops, mops);
	ibuf_add_ops(ibuf->n_discarded_ops, dops);
#else /* HAVE_ATOMIC_BUILTINS */
osku's avatar
osku committed
4637 4638 4639
	/* Protect our statistics keeping from race conditions */
	mutex_enter(&ibuf_mutex);

4640 4641 4642
	ibuf->n_merges++;
	ibuf_add_ops(ibuf->n_merged_ops, mops);
	ibuf_add_ops(ibuf->n_discarded_ops, dops);
osku's avatar
osku committed
4643 4644

	mutex_exit(&ibuf_mutex);
4645
#endif /* HAVE_ATOMIC_BUILTINS */
osku's avatar
osku committed
4646 4647 4648 4649 4650 4651 4652

	if (update_ibuf_bitmap && !tablespace_being_deleted) {

		fil_decr_pending_ibuf_merges(space);
	}

	ibuf_exit();
4653

4654
#ifdef UNIV_IBUF_COUNT_DEBUG
osku's avatar
osku committed
4655 4656 4657 4658
	ut_a(ibuf_count_get(space, page_no) == 0);
#endif
}

4659
/*********************************************************************//**
osku's avatar
osku committed
4660 4661 4662 4663
Deletes all entries in the insert buffer for a given space id. This is used
in DISCARD TABLESPACE and IMPORT TABLESPACE.
NOTE: this does not update the page free bitmaps in the space. The space will
become CORRUPT when you call this function! */
4664
UNIV_INTERN
osku's avatar
osku committed
4665 4666 4667
void
ibuf_delete_for_discarded_space(
/*============================*/
4668
	ulint	space)	/*!< in: space id */
osku's avatar
osku committed
4669 4670 4671 4672 4673 4674 4675 4676 4677
{
	mem_heap_t*	heap;
	btr_pcur_t	pcur;
	dtuple_t*	search_tuple;
	rec_t*		ibuf_rec;
	ulint		page_no;
	ibool		closed;
	mtr_t		mtr;

4678 4679
	/* Counts for discarded operations. */
	ulint		dops[IBUF_OP_COUNT];
osku's avatar
osku committed
4680 4681 4682 4683 4684 4685 4686

	heap = mem_heap_create(512);

	/* Use page number 0 to build the search tuple so that we get the
	cursor positioned at the first entry for this space id */

	search_tuple = ibuf_new_search_tuple_build(space, 0, heap);
4687

4688
	memset(dops, 0, sizeof(dops));
osku's avatar
osku committed
4689 4690 4691 4692 4693 4694 4695
loop:
	ibuf_enter();

	mtr_start(&mtr);

	/* Position pcur in the insert buffer at the first entry for the
	space */
4696 4697 4698 4699
	btr_pcur_open_on_user_rec(
		ibuf->index, search_tuple, PAGE_CUR_GE, BTR_MODIFY_LEAF,
		&pcur, &mtr);

4700
	if (!btr_pcur_is_on_user_rec(&pcur)) {
osku's avatar
osku committed
4701 4702 4703 4704 4705 4706
		ut_ad(btr_pcur_is_after_last_in_tree(&pcur, &mtr));

		goto leave_loop;
	}

	for (;;) {
4707
		ut_ad(btr_pcur_is_on_user_rec(&pcur));
osku's avatar
osku committed
4708 4709 4710 4711 4712 4713 4714 4715 4716 4717 4718

		ibuf_rec = btr_pcur_get_rec(&pcur);

		/* Check if the entry is for this space */
		if (ibuf_rec_get_space(ibuf_rec) != space) {

			goto leave_loop;
		}

		page_no = ibuf_rec_get_page_no(ibuf_rec);

4719
		dops[ibuf_rec_get_op_type(ibuf_rec)]++;
4720

osku's avatar
osku committed
4721 4722
		/* Delete the record from ibuf */
		closed = ibuf_delete_rec(space, page_no, &pcur, search_tuple,
4723
					 &mtr);
osku's avatar
osku committed
4724 4725 4726 4727 4728 4729 4730 4731 4732
		if (closed) {
			/* Deletion was pessimistic and mtr was committed:
			we start from the beginning again */

			ibuf_exit();

			goto loop;
		}

4733
		if (btr_pcur_is_after_last_on_page(&pcur)) {
osku's avatar
osku committed
4734
			mtr_commit(&mtr);
4735
			btr_pcur_close(&pcur);
osku's avatar
osku committed
4736 4737 4738 4739 4740 4741 4742 4743 4744

			ibuf_exit();

			goto loop;
		}
	}

leave_loop:
	mtr_commit(&mtr);
4745
	btr_pcur_close(&pcur);
osku's avatar
osku committed
4746

4747 4748 4749
#ifdef HAVE_ATOMIC_BUILTINS
	ibuf_add_ops(ibuf->n_discarded_ops, dops);
#else /* HAVE_ATOMIC_BUILTINS */
osku's avatar
osku committed
4750 4751
	/* Protect our statistics keeping from race conditions */
	mutex_enter(&ibuf_mutex);
4752
	ibuf_add_ops(ibuf->n_discarded_ops, dops);
osku's avatar
osku committed
4753
	mutex_exit(&ibuf_mutex);
4754
#endif /* HAVE_ATOMIC_BUILTINS */
4755

osku's avatar
osku committed
4756 4757 4758 4759 4760
	ibuf_exit();

	mem_heap_free(heap);
}

4761
/******************************************************************//**
4762 4763
Looks if the insert buffer is empty.
@return	TRUE if empty */
4764
UNIV_INTERN
osku's avatar
osku committed
4765 4766 4767 4768 4769
ibool
ibuf_is_empty(void)
/*===============*/
{
	ibool		is_empty;
4770
	const page_t*	root;
osku's avatar
osku committed
4771 4772 4773 4774 4775
	mtr_t		mtr;

	ibuf_enter();
	mtr_start(&mtr);

4776
	mutex_enter(&ibuf_mutex);
4777
	root = ibuf_tree_root_get(&mtr);
osku's avatar
osku committed
4778 4779
	mutex_exit(&ibuf_mutex);

4780
	is_empty = (page_get_n_recs(root) == 0);
4781
	mtr_commit(&mtr);
osku's avatar
osku committed
4782 4783
	ibuf_exit();

4784 4785
	ut_a(is_empty == ibuf->empty);

osku's avatar
osku committed
4786 4787 4788
	return(is_empty);
}

4789
/******************************************************************//**
osku's avatar
osku committed
4790
Prints info of ibuf. */
4791
UNIV_INTERN
osku's avatar
osku committed
4792 4793 4794
void
ibuf_print(
/*=======*/
4795
	FILE*	file)	/*!< in: file where to print */
osku's avatar
osku committed
4796
{
4797
#ifdef UNIV_IBUF_COUNT_DEBUG
osku's avatar
osku committed
4798
	ulint		i;
4799
	ulint		j;
osku's avatar
osku committed
4800 4801 4802 4803
#endif

	mutex_enter(&ibuf_mutex);

4804
	fprintf(file,
4805 4806
		"Ibuf: size %lu, free list len %lu,"
		" seg size %lu, %lu merges\n",
4807 4808 4809 4810 4811
		(ulong) ibuf->size,
		(ulong) ibuf->free_list_len,
		(ulong) ibuf->seg_size,
		(ulong) ibuf->n_merges);

4812
	fputs("merged operations:\n ", file);
4813 4814
	ibuf_print_ops(ibuf->n_merged_ops, file);

4815
	fputs("discarded operations:\n ", file);
4816 4817
	ibuf_print_ops(ibuf->n_discarded_ops, file);

4818
#ifdef UNIV_IBUF_COUNT_DEBUG
4819 4820 4821
	for (i = 0; i < IBUF_COUNT_N_SPACES; i++) {
		for (j = 0; j < IBUF_COUNT_N_PAGES; j++) {
			ulint	count = ibuf_count_get(i, j);
osku's avatar
osku committed
4822

4823
			if (count > 0) {
osku's avatar
osku committed
4824
				fprintf(stderr,
4825 4826 4827
					"Ibuf count for space/page %lu/%lu"
					" is %lu\n",
					(ulong) i, (ulong) j, (ulong) count);
osku's avatar
osku committed
4828 4829 4830
			}
		}
	}
4831
#endif /* UNIV_IBUF_COUNT_DEBUG */
osku's avatar
osku committed
4832 4833 4834

	mutex_exit(&ibuf_mutex);
}
4835
#endif /* !UNIV_HOTBACKUP */