/*****************************************************************************

Copyright (c) 1995, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2013, 2022, MariaDB Corporation.
Copyright (c) 2013, 2014, Fusion-io

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file buf/buf0flu.cc
The database buffer buf_pool flush algorithm

Created 11/11/1995 Heikki Tuuri
*******************************************************/

#include "univ.i"
#include <my_service_manager.h>
#include <mysql/service_thd_wait.h>
#include <sql_class.h>

#include "buf0flu.h"
#include "buf0buf.h"
#include "buf0checksum.h"
#include "buf0dblwr.h"
#include "srv0start.h"
#include "page0zip.h"
#include "fil0fil.h"
#include "log0crypt.h"
#include "srv0mon.h"
#include "fil0pagecompress.h"
#include "lzo/lzo1x.h"
#include "snappy-c.h"

/** Number of pages flushed via LRU. Protected by buf_pool.mutex.
Also included in buf_flush_page_count. */
ulint buf_lru_flush_page_count;

/** Number of pages freed without flushing. Protected by buf_pool.mutex. */
ulint buf_lru_freed_page_count;

/** Number of pages flushed. Protected by buf_pool.mutex. */
ulint buf_flush_page_count;

/** Flag indicating if the page_cleaner is in active state. */
Atomic_relaxed<bool> buf_page_cleaner_is_active;

/** Factor for scan length to determine n_pages for intended oldest LSN
progress */
static constexpr ulint buf_flush_lsn_scan_factor = 3;

/** Average redo generation rate */
static lsn_t lsn_avg_rate = 0;

/** Target oldest_modification for the page cleaner background flushing;
writes are protected by buf_pool.flush_list_mutex */
static Atomic_relaxed<lsn_t> buf_flush_async_lsn;
/** Target oldest_modification for the page cleaner furious flushing;
writes are protected by buf_pool.flush_list_mutex */
static Atomic_relaxed<lsn_t> buf_flush_sync_lsn;

#ifdef UNIV_PFS_THREAD
mysql_pfs_key_t page_cleaner_thread_key;
#endif /* UNIV_PFS_THREAD */

/** Page cleaner structure */
static struct
{
  /** total elapsed time in adaptive flushing, in seconds */
  ulint flush_time;
  /** number of adaptive flushing passes */
  ulint flush_pass;
} page_cleaner;

/** If the LRU list of the buf_pool is shorter than this, LRU eviction
should not happen. This is because LRU flushing also puts the freed
blocks on the free list; with a very small LRU list we could end up
thrashing. */
#define BUF_LRU_MIN_LEN		256

/* @} */

/* @} */

#ifdef UNIV_DEBUG
/** Validate the flush list. */
static void buf_flush_validate_low();

/** Validates the flush list some of the time. */
static void buf_flush_validate_skip()
{
/** Try buf_flush_validate_low() every this many times */
# define BUF_FLUSH_VALIDATE_SKIP	23

	/** The buf_flush_validate_low() call skip counter.
	Use a signed type because of the race condition below. */
	static int buf_flush_validate_count = BUF_FLUSH_VALIDATE_SKIP;

	/* There is a race condition below, but it does not matter,
	because this call is only for heuristic purposes. We want to
	reduce the call frequency of the costly buf_flush_validate_low()
	check in debug builds. */
	if (--buf_flush_validate_count > 0) {
		return;
	}

	buf_flush_validate_count = BUF_FLUSH_VALIDATE_SKIP;
	buf_flush_validate_low();
}
#endif /* UNIV_DEBUG */

/** Wake up the page cleaner if needed */
void buf_pool_t::page_cleaner_wakeup()
{
  if (!page_cleaner_idle())
    return;
  double dirty_pct= double(UT_LIST_GET_LEN(buf_pool.flush_list)) * 100.0 /
    double(UT_LIST_GET_LEN(buf_pool.LRU) + UT_LIST_GET_LEN(buf_pool.free));
  double pct_lwm= srv_max_dirty_pages_pct_lwm;
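  /* Example (illustrative numbers only): with 1000 blocks in LRU+free
  and 150 blocks in flush_list, dirty_pct is 15.0; it is compared
  below against innodb_max_dirty_pages_pct_lwm and
  innodb_max_dirty_pages_pct. */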

  /* If pct_lwm != 0.0, adaptive flushing is enabled.
  Signal the page cleaner thread:
  - if pct_lwm <= dirty_pct, it will invoke the adaptive flushing flow;
  - if pct_lwm > dirty_pct, it will invoke the idle flushing flow.

  idle_flushing:
  dirty_pct < innodb_max_dirty_pages_pct_lwm, so it could be an
  idle flushing use-case.

  Why is last_activity_count not always updated?
  - Let's first understand when the server activity count is updated:
  - it is updated on commit of a transaction in trx_t::commit(), and not
    when a page is added to the flush list;
  - page_cleaner_wakeup() is called when a page is added to the flush list.

  - Now say the first user thread updates the count from X -> Y but
    has yet to commit its transaction (so the activity count is still Y).
    Follow-up user threads will see the updated count (Y) matching the
    global server activity count (Y), giving a false impression that
    the server is idle.

  How to avoid this?
  - By allowing last_activity_count to be updated when the page cleaner
    is made active and has work to do. This ensures that the
    last_activity signal is consumed by the page cleaner before the
    next one is generated. */
  if ((pct_lwm != 0.0 && pct_lwm <= dirty_pct) ||
      (pct_lwm != 0.0 && last_activity_count == srv_get_activity_count()) ||
      srv_max_buf_pool_modified_pct <= dirty_pct)
  {
    page_cleaner_is_idle= false;
    pthread_cond_signal(&do_flush_list);
  }
}

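/** Detach a block from buf_pool.flush_list, adjusting the flush_hp
hazard pointer if it points to the block. The caller must hold
flush_list_mutex and account for stat.flush_list_bytes as needed. */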
inline void buf_pool_t::delete_from_flush_list_low(buf_page_t *bpage)
{
  ut_ad(!fsp_is_system_temporary(bpage->id().space()));
  mysql_mutex_assert_owner(&flush_list_mutex);
  flush_hp.adjust(bpage);
  UT_LIST_REMOVE(flush_list, bpage);
}

/** Insert a modified block into the flush list.
@param block    modified block
@param lsn      start LSN of the mini-transaction that modified the block */
void buf_pool_t::insert_into_flush_list(buf_block_t *block, lsn_t lsn)
{
  mysql_mutex_assert_not_owner(&mutex);
  mysql_mutex_assert_owner(&log_sys.flush_order_mutex);
  ut_ad(lsn > 2);
  ut_ad(!fsp_is_system_temporary(block->page.id().space()));

  mysql_mutex_lock(&flush_list_mutex);
  if (ut_d(const lsn_t old=) block->page.oldest_modification())
  {
    ut_ad(old == 1);
    delete_from_flush_list_low(&block->page);
  }
  else
    stat.flush_list_bytes+= block->physical_size();
  ut_ad(stat.flush_list_bytes <= curr_pool_size);

  block->page.set_oldest_modification(lsn);
  MEM_CHECK_DEFINED(block->page.zip.data
                    ? block->page.zip.data : block->page.frame,
                    block->physical_size());
  UT_LIST_ADD_FIRST(flush_list, &block->page);
  ut_d(buf_flush_validate_skip());
  page_cleaner_wakeup();
  mysql_mutex_unlock(&flush_list_mutex);
}

/** Remove a block from flush_list.
@param bpage   buffer pool page
@param clear   whether to invoke buf_page_t::clear_oldest_modification() */
void buf_pool_t::delete_from_flush_list(buf_page_t *bpage, bool clear)
{
  delete_from_flush_list_low(bpage);
  stat.flush_list_bytes-= bpage->physical_size();
  if (clear)
    bpage->clear_oldest_modification();
#ifdef UNIV_DEBUG
  buf_flush_validate_skip();
#endif /* UNIV_DEBUG */
}

/** Remove all dirty pages belonging to a given tablespace when we are
deleting the data file of that tablespace.
The pages still remain a part of LRU and are evicted from
the list as they age towards the tail of the LRU.
@param id    tablespace identifier */
void buf_flush_remove_pages(uint32_t id)
{
  const page_id_t first(id, 0), end(id + 1, 0);
  ut_ad(id);
  mysql_mutex_lock(&buf_pool.mutex);
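  /* A page that is currently write-fixed cannot be removed from
  buf_pool.flush_list right away. In that case the loop below releases
  the mutexes, waits for the running flush batch to end, and rescans. */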

  for (;;)
  {
    bool deferred= false;

    mysql_mutex_lock(&buf_pool.flush_list_mutex);

    for (buf_page_t *bpage= UT_LIST_GET_LAST(buf_pool.flush_list); bpage; )
    {
      const auto s= bpage->state();
      ut_ad(s >= buf_page_t::REMOVE_HASH);
      ut_ad(s < buf_page_t::READ_FIX || s >= buf_page_t::WRITE_FIX);
      buf_page_t *prev= UT_LIST_GET_PREV(list, bpage);

      const page_id_t bpage_id(bpage->id());

      if (bpage_id < first || bpage_id >= end);
      else if (s >= buf_page_t::WRITE_FIX)
        deferred= true;
      else
        buf_pool.delete_from_flush_list(bpage);

      bpage= prev;
    }

    mysql_mutex_unlock(&buf_pool.flush_list_mutex);

    if (!deferred)
      break;

    mysql_mutex_unlock(&buf_pool.mutex);
    std::this_thread::yield();
    mysql_mutex_lock(&buf_pool.mutex);
    buf_flush_wait_batch_end(false);
  }

  mysql_mutex_unlock(&buf_pool.mutex);
}

/*******************************************************************//**
Relocates a buffer control block on the flush_list.
Note that it is assumed that the contents of bpage have already been
copied to dpage.
IMPORTANT: When this function is called bpage and dpage are not
exact copies of each other. For example, they both will have different
::state. Also the ::list pointers in dpage may be stale. We need to
use the current list node (bpage) to do the list manipulation because
the list pointers could have changed between the time that we copied
the contents of bpage to dpage and the flush list manipulation
below. */
ATTRIBUTE_COLD
void
buf_flush_relocate_on_flush_list(
/*=============================*/
	buf_page_t*	bpage,	/*!< in/out: control block being moved */
	buf_page_t*	dpage)	/*!< in/out: destination block */
{
	buf_page_t*	prev;

	mysql_mutex_assert_owner(&buf_pool.flush_list_mutex);
	ut_ad(!fsp_is_system_temporary(bpage->id().space()));

	const lsn_t lsn = bpage->oldest_modification();

	if (!lsn) {
		return;
	}

	ut_ad(lsn == 1 || lsn > 2);
	ut_ad(dpage->oldest_modification() == lsn);

	/* Important that we adjust the hazard pointer before removing
	the bpage from the flush list. */
	buf_pool.flush_hp.adjust(bpage);

	prev = UT_LIST_GET_PREV(list, bpage);
	UT_LIST_REMOVE(buf_pool.flush_list, bpage);

	bpage->clear_oldest_modification();

	if (lsn == 1) {
		buf_pool.stat.flush_list_bytes -= dpage->physical_size();
		dpage->list.prev = nullptr;
		dpage->list.next = nullptr;
		dpage->clear_oldest_modification();
	} else if (prev) {
		ut_ad(prev->oldest_modification());
		UT_LIST_INSERT_AFTER(buf_pool.flush_list, prev, dpage);
	} else {
		UT_LIST_ADD_FIRST(buf_pool.flush_list, dpage);
	}

	ut_d(buf_flush_validate_low());
}

/** Note that a block is no longer dirty, while not removing
it from buf_pool.flush_list */
inline void buf_page_t::write_complete(bool temporary)
{
  ut_ad(temporary == fsp_is_system_temporary(id().space()));
  if (temporary)
  {
    ut_ad(oldest_modification() == 2);
    oldest_modification_= 0;
  }
  else
  {
    /* We use release memory order to guarantee that callers of
    oldest_modification_acquire() will observe the block as
    being detached from buf_pool.flush_list, after reading the value 0. */
    ut_ad(oldest_modification() > 2);
    oldest_modification_.store(1, std::memory_order_release);
  }
  const auto s= state();
  ut_ad(s >= WRITE_FIX);
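  /* Release the write fix by subtracting the same bias that
  buf_page_t::flush() added with fetch_add(), returning the block to
  an unfixed state. */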
  zip.fix.fetch_sub((s >= WRITE_FIX_REINIT)
                    ? (WRITE_FIX_REINIT - UNFIXED)
                    : (WRITE_FIX - UNFIXED));
  lock.u_unlock(true);
}

/** Complete write of a file page from buf_pool.
@param request write request */
void buf_page_write_complete(const IORequest &request)
{
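  /* This is invoked upon I/O completion. It releases the write fix,
  updates doublewrite and flush-batch bookkeeping, and wakes up any
  thread waiting for the batch to complete. */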
  ut_ad(request.is_write());
  ut_ad(!srv_read_only_mode);
  buf_page_t *bpage= request.bpage;
  ut_ad(bpage);
  const auto state= bpage->state();
  /* io-fix can only be cleared by buf_page_t::write_complete()
  and buf_page_t::read_complete() */
  ut_ad(state >= buf_page_t::WRITE_FIX);
  ut_ad(!buf_dblwr.is_inside(bpage->id()));
  ut_ad(request.node->space->id == bpage->id().space());

  if (state < buf_page_t::WRITE_FIX_REINIT &&
      request.node->space->use_doublewrite())
  {
    ut_ad(request.node->space != fil_system.temp_space);
    buf_dblwr.write_completed();
  }

  if (request.slot)
    request.slot->release();

  if (UNIV_UNLIKELY(MONITOR_IS_ON(MONITOR_MODULE_BUF_PAGE)))
    buf_page_monitor(*bpage, false);
  DBUG_PRINT("ib_buf", ("write page %u:%u",
                        bpage->id().space(), bpage->id().page_no()));
  const bool temp= fsp_is_system_temporary(bpage->id().space());

  mysql_mutex_lock(&buf_pool.mutex);
  mysql_mutex_assert_not_owner(&buf_pool.flush_list_mutex);
  buf_pool.stat.n_pages_written++;
  bpage->write_complete(temp);

  if (request.is_LRU())
  {
    buf_LRU_free_page(bpage, true);

    ut_ad(buf_pool.n_flush_LRU_);
    if (!--buf_pool.n_flush_LRU_)
    {
      pthread_cond_broadcast(&buf_pool.done_flush_LRU);
      pthread_cond_signal(&buf_pool.done_free);
    }
  }
  else
  {
    ut_ad(!temp);
    ut_ad(buf_pool.n_flush_list_);
    if (!--buf_pool.n_flush_list_)
      pthread_cond_broadcast(&buf_pool.done_flush_list);
  }

  mysql_mutex_unlock(&buf_pool.mutex);
}

/** Calculate a ROW_FORMAT=COMPRESSED page checksum and update the page.
@param[in,out]	page		page to update
@param[in]	size		compressed page size */
void buf_flush_update_zip_checksum(buf_frame_t *page, ulint size)
{
  ut_ad(size > 0);
  mach_write_to_4(page + FIL_PAGE_SPACE_OR_CHKSUM,
                  page_zip_calc_checksum(page, size, false));
}

/** Assign the full crc32 checksum for non-compressed page.
@param[in,out]	page	page to be updated */
void buf_flush_assign_full_crc32_checksum(byte* page)
{
	ut_d(bool compressed = false);
	ut_d(bool corrupted = false);
	ut_d(const uint size = buf_page_full_crc32_size(page, &compressed,
							&corrupted));
	ut_ad(!compressed);
	ut_ad(!corrupted);
	ut_ad(size == uint(srv_page_size));
	const ulint payload = srv_page_size - FIL_PAGE_FCRC32_CHECKSUM;
	mach_write_to_4(page + payload, ut_crc32(page, payload));
}

/** Initialize a page for writing to the tablespace.
@param[in]	block			buffer block; NULL if bypassing
					the buffer pool
@param[in,out]	page			page frame
@param[in,out]	page_zip_		compressed page, or NULL if
					uncompressed
@param[in]	use_full_checksum	whether tablespace uses full checksum */
void
buf_flush_init_for_writing(
	const buf_block_t*	block,
	byte*			page,
	void*			page_zip_,
	bool			use_full_checksum)
{
	if (block && block->page.frame != page) {
		/* If the page is encrypted in full crc32 format, then
		the checksum was already stored as part of
		fil_encrypt_buf() */
		ut_ad(use_full_checksum);
		return;
	}

	ut_ad(!block || block->page.frame == page);
	ut_ad(page);

	if (page_zip_) {
		page_zip_des_t*	page_zip;
		ulint		size;

		page_zip = static_cast<page_zip_des_t*>(page_zip_);
		ut_ad(!block || &block->page.zip == page_zip);
		size = page_zip_get_size(page_zip);

		ut_ad(size);
		ut_ad(ut_is_2pow(size));
		ut_ad(size <= UNIV_ZIP_SIZE_MAX);

		switch (fil_page_get_type(page)) {
		case FIL_PAGE_TYPE_ALLOCATED:
		case FIL_PAGE_INODE:
		case FIL_PAGE_IBUF_BITMAP:
		case FIL_PAGE_TYPE_FSP_HDR:
		case FIL_PAGE_TYPE_XDES:
			/* These are essentially uncompressed pages. */
			memcpy(page_zip->data, page, size);
			/* fall through */
		case FIL_PAGE_TYPE_ZBLOB:
		case FIL_PAGE_TYPE_ZBLOB2:
		case FIL_PAGE_INDEX:
		case FIL_PAGE_RTREE:
			buf_flush_update_zip_checksum(page_zip->data, size);
			return;
		}

		ib::error() << "The compressed page to be written"
			" seems corrupt:";
		ut_print_buf(stderr, page, size);
		fputs("\nInnoDB: Possibly older version of the page:", stderr);
		ut_print_buf(stderr, page_zip->data, size);
		putc('\n', stderr);
		ut_error;
	}

	if (use_full_checksum) {
		static_assert(FIL_PAGE_FCRC32_END_LSN % 4 == 0, "aligned");
		static_assert(FIL_PAGE_LSN % 4 == 0, "aligned");
		memcpy_aligned<4>(page + srv_page_size
				  - FIL_PAGE_FCRC32_END_LSN,
				  FIL_PAGE_LSN + 4 + page, 4);
		return buf_flush_assign_full_crc32_checksum(page);
	}

	static_assert(FIL_PAGE_END_LSN_OLD_CHKSUM % 8 == 0, "aligned");
	static_assert(FIL_PAGE_LSN % 8 == 0, "aligned");
	memcpy_aligned<8>(page + srv_page_size - FIL_PAGE_END_LSN_OLD_CHKSUM,
			  FIL_PAGE_LSN + page, 8);

	if (block && srv_page_size == 16384) {
		/* The page type could be garbage in old files
		created before MySQL 5.5. Such files always
		had a page size of 16 kilobytes. */
		ulint	page_type = fil_page_get_type(page);
		ulint	reset_type = page_type;

		switch (block->page.id().page_no() % 16384) {
		case 0:
			reset_type = block->page.id().page_no() == 0
				? FIL_PAGE_TYPE_FSP_HDR
				: FIL_PAGE_TYPE_XDES;
			break;
		case 1:
			reset_type = FIL_PAGE_IBUF_BITMAP;
			break;
		case FSP_TRX_SYS_PAGE_NO:
			if (block->page.id()
			    == page_id_t(TRX_SYS_SPACE, TRX_SYS_PAGE_NO)) {
				reset_type = FIL_PAGE_TYPE_TRX_SYS;
				break;
			}
			/* fall through */
		default:
			switch (page_type) {
			case FIL_PAGE_INDEX:
			case FIL_PAGE_TYPE_INSTANT:
			case FIL_PAGE_RTREE:
			case FIL_PAGE_UNDO_LOG:
			case FIL_PAGE_INODE:
			case FIL_PAGE_IBUF_FREE_LIST:
			case FIL_PAGE_TYPE_ALLOCATED:
			case FIL_PAGE_TYPE_SYS:
			case FIL_PAGE_TYPE_TRX_SYS:
			case FIL_PAGE_TYPE_BLOB:
			case FIL_PAGE_TYPE_ZBLOB:
			case FIL_PAGE_TYPE_ZBLOB2:
				break;
			case FIL_PAGE_TYPE_FSP_HDR:
			case FIL_PAGE_TYPE_XDES:
			case FIL_PAGE_IBUF_BITMAP:
				/* These pages should have
				predetermined page numbers
				(see above). */
			default:
				reset_type = FIL_PAGE_TYPE_UNKNOWN;
				break;
			}
		}

		if (UNIV_UNLIKELY(page_type != reset_type)) {
			ib::info()
				<< "Resetting invalid page "
				<< block->page.id() << " type "
				<< page_type << " to "
				<< reset_type << " when flushing.";
			fil_page_set_type(page, reset_type);
		}
	}

	const uint32_t checksum = buf_calc_page_crc32(page);
	mach_write_to_4(page + FIL_PAGE_SPACE_OR_CHKSUM, checksum);
	mach_write_to_4(page + srv_page_size - FIL_PAGE_END_LSN_OLD_CHKSUM,
			checksum);
}

/** Reserve a buffer for compression.
@param[in,out]  slot    reserved slot */
static void buf_tmp_reserve_compression_buf(buf_tmp_buffer_t* slot)
{
  if (slot->comp_buf)
    return;
  /* Both Snappy and LZO compression methods require that the output
  buffer be bigger than the input buffer. Adjust the allocated size. */
  ulint size= srv_page_size;
  if (provider_service_lzo->is_loaded)
    size+= LZO1X_1_15_MEM_COMPRESS;
  else if (provider_service_snappy->is_loaded)
    size= snappy_max_compressed_length(size);
  slot->comp_buf= static_cast<byte*>(aligned_malloc(size, srv_page_size));
}

/** Encrypt a buffer of the temporary tablespace
@param[in]      offset  Page offset
@param[in]      s       Page to encrypt
@param[in,out]  d       Output buffer
@return encrypted buffer or NULL */
static byte* buf_tmp_page_encrypt(ulint offset, const byte* s, byte* d)
{
  /* Length of the payload to encrypt: the whole page minus the
  unencrypted header and the trailing checksum */
  uint srclen= static_cast<uint>(srv_page_size) -
    (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION +
     FIL_PAGE_FCRC32_CHECKSUM);
  const byte* src= s + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
  byte* dst= d + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;

  memcpy(d, s, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
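  /* The page header was copied through unencrypted above; the rest of
  the page, except the trailing checksum, is encrypted below, and a new
  checksum is then computed over the encrypted contents. */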

  if (!log_tmp_block_encrypt(src, srclen, dst, (offset * srv_page_size), true))
    return NULL;

  const ulint payload= srv_page_size - FIL_PAGE_FCRC32_CHECKSUM;
  mach_write_to_4(d + payload, ut_crc32(d, payload));

  srv_stats.pages_encrypted.inc();
  srv_stats.n_temp_blocks_encrypted.inc();
  return d;
}

/** Encryption and page_compression hook that is called just before
a page is written to disk.
@param[in,out]  space   tablespace
@param[in,out]  bpage   buffer page
@param[in]      s       physical page frame that is being encrypted
@param[out]     slot    temporary buffer slot that was used, if any
@param[in,out]  size    payload size in bytes
@return page frame to be written to file
(may be src_frame or an encrypted/compressed copy of it) */
static byte *buf_page_encrypt(fil_space_t* space, buf_page_t* bpage, byte* s,
                              buf_tmp_buffer_t **slot, size_t *size)
{
  ut_ad(!bpage->is_freed());
  ut_ad(space->id == bpage->id().space());
  ut_ad(!*slot);

  ut_d(fil_page_type_validate(space, s));
  const uint32_t page_no= bpage->id().page_no();

  switch (page_no) {
  case TRX_SYS_PAGE_NO:
    if (bpage->id().space() != TRX_SYS_SPACE)
      break;
    /* The TRX_SYS page is neither encrypted nor compressed, because
    it contains the address of the doublewrite buffer. */
    /* fall through */
  case 0:
    /* Page 0 of a tablespace is not encrypted/compressed */
    return s;
  }

  fil_space_crypt_t *crypt_data= space->crypt_data;
  bool encrypted, page_compressed;
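  /* Decide whether the page must be encrypted and/or page_compressed:
  the temporary tablespace depends only on
  innodb_encrypt_temporary_tables, while other tablespaces follow
  their crypt_data and tablespace flags. */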
  if (space->purpose == FIL_TYPE_TEMPORARY)
  {
    ut_ad(!crypt_data);
    encrypted= innodb_encrypt_temporary_tables;
    page_compressed= false;
  }
  else
  {
    encrypted= crypt_data && !crypt_data->not_encrypted() &&
      crypt_data->type != CRYPT_SCHEME_UNENCRYPTED &&
      (!crypt_data->is_default_encryption() || srv_encrypt_tables);
    page_compressed= space->is_compressed();
  }

  const bool full_crc32= space->full_crc32();

  if (!encrypted && !page_compressed)
  {
    /* No need to encrypt or compress. Clear key-version & crypt-checksum. */
    static_assert(FIL_PAGE_FCRC32_KEY_VERSION % 4 == 0, "alignment");
    static_assert(FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION % 4 == 2,
                  "not perfect alignment");
    if (full_crc32)
      memset_aligned<4>(s + FIL_PAGE_FCRC32_KEY_VERSION, 0, 4);
    else
      memset_aligned<2>(s + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, 0, 8);
    return s;
  }

  static_assert(FIL_PAGE_FCRC32_END_LSN % 4 == 0, "alignment");
  static_assert(FIL_PAGE_LSN % 8 == 0, "alignment");
  if (full_crc32)
    memcpy_aligned<4>(s + srv_page_size - FIL_PAGE_FCRC32_END_LSN,
                      FIL_PAGE_LSN + 4 + s, 4);

  ut_ad(!bpage->zip_size() || !page_compressed);
  /* Find a free slot in the temporary memory array */
  *slot= buf_pool.io_buf_reserve();
  ut_a(*slot);
  (*slot)->allocate();

  byte *d= (*slot)->crypt_buf;

  if (!page_compressed)
  {
not_compressed:
    d= space->purpose == FIL_TYPE_TEMPORARY
      ? buf_tmp_page_encrypt(page_no, s, d)
      : fil_space_encrypt(space, page_no, s, d);
  }
  else
  {
    ut_ad(space->purpose != FIL_TYPE_TEMPORARY);
    /* First we compress the page content */
    buf_tmp_reserve_compression_buf(*slot);
    byte *tmp= (*slot)->comp_buf;
    ulint len= fil_page_compress(s, tmp, space->flags,
                                 fil_space_get_block_size(space, page_no),
                                 encrypted);

    if (!len)
      goto not_compressed;

    *size= len;

    if (full_crc32)
    {
      ut_d(bool compressed = false);
      len= buf_page_full_crc32_size(tmp,
#ifdef UNIV_DEBUG
                                    &compressed,
#else
                                    NULL,
#endif
                                    NULL);
      ut_ad(compressed);
    }

    /* Workaround for MDEV-15527. */
    memset(tmp + len, 0, srv_page_size - len);
    ut_d(fil_page_type_validate(space, tmp));

    if (encrypted)
      tmp= fil_space_encrypt(space, page_no, tmp, d);

    if (full_crc32)
    {
      static_assert(FIL_PAGE_FCRC32_CHECKSUM == 4, "alignment");
      mach_write_to_4(tmp + len - 4, ut_crc32(tmp, len - 4));
      ut_ad(!buf_page_is_corrupted(true, tmp, space->flags));
    }

    d= tmp;
  }

  ut_d(fil_page_type_validate(space, d));
  (*slot)->out_buf= d;
  return d;
}

/** Free a page whose underlying file page has been freed. */
inline void buf_pool_t::release_freed_page(buf_page_t *bpage)
{
  mysql_mutex_assert_owner(&mutex);
  mysql_mutex_lock(&flush_list_mutex);
  ut_d(const lsn_t oldest_modification= bpage->oldest_modification();)
  if (fsp_is_system_temporary(bpage->id().space()))
  {
    ut_ad(bpage->frame);
    ut_ad(oldest_modification == 2);
  }
  else
  {
    ut_ad(oldest_modification > 2);
    delete_from_flush_list(bpage, false);
  }
  bpage->clear_oldest_modification();
  mysql_mutex_unlock(&flush_list_mutex);
  bpage->lock.u_unlock(true);

  buf_LRU_free_page(bpage, true);
}

/** Write a flushable page to a file. buf_pool.mutex must be held.
@param lru         true=buf_pool.LRU; false=buf_pool.flush_list
@param space       tablespace
@return whether the page was flushed and buf_pool.mutex was released */
inline bool buf_page_t::flush(bool lru, fil_space_t *space)
{
  ut_ad(in_file());
  ut_ad(in_LRU_list);
  ut_ad((space->purpose == FIL_TYPE_TEMPORARY) ==
        (space == fil_system.temp_space));
  ut_ad(space->referenced());
  ut_ad(lru || space != fil_system.temp_space);

  if (!lock.u_lock_try(true))
    return false;

  const auto s= state();
  ut_a(s >= FREED);

  if (s < UNFIXED)
  {
    buf_pool.release_freed_page(this);
    mysql_mutex_unlock(&buf_pool.mutex);
    return true;
  }

  if (s >= READ_FIX || oldest_modification() < 2)
  {
    lock.u_unlock(true);
    return false;
  }

  mysql_mutex_assert_not_owner(&buf_pool.flush_list_mutex);

  /* Apart from the U-lock, this block will also be protected by
  is_write_fixed() and oldest_modification()>1.
  Thus, it cannot be relocated or removed. */

  DBUG_PRINT("ib_buf", ("%s page %u:%u",
                        lru ? "LRU" : "flush_list",
                        id().space(), id().page_no()));
  ut_d(const auto f=) zip.fix.fetch_add(WRITE_FIX - UNFIXED);
  ut_ad(f >= UNFIXED);
  ut_ad(f < READ_FIX);
  ut_ad(space == fil_system.temp_space
        ? oldest_modification() == 2
        : oldest_modification() > 2);
  if (lru)
  {
    ut_ad(buf_pool.n_flush_LRU_ < ULINT_UNDEFINED);
    buf_pool.n_flush_LRU_++;
  }
  else
  {
    ut_ad(buf_pool.n_flush_list_ < ULINT_UNDEFINED);
    buf_pool.n_flush_list_++;
  }
  buf_flush_page_count++;

  mysql_mutex_unlock(&buf_pool.mutex);

  buf_block_t *block= reinterpret_cast<buf_block_t*>(this);
  page_t *write_frame= zip.data;

  space->reacquire();
  size_t size;
#if defined HAVE_FALLOC_PUNCH_HOLE_AND_KEEP_SIZE || defined _WIN32
  size_t orig_size;
#endif
  IORequest::Type type= lru ? IORequest::WRITE_LRU : IORequest::WRITE_ASYNC;
  buf_tmp_buffer_t *slot= nullptr;

  if (UNIV_UNLIKELY(!frame)) /* ROW_FORMAT=COMPRESSED */
  {
    ut_ad(!space->full_crc32());
    ut_ad(!space->is_compressed()); /* not page_compressed */
    size= zip_size();
#if defined HAVE_FALLOC_PUNCH_HOLE_AND_KEEP_SIZE || defined _WIN32
    orig_size= size;
#endif
    buf_flush_update_zip_checksum(write_frame, size);
    write_frame= buf_page_encrypt(space, this, write_frame, &slot, &size);
    ut_ad(size == zip_size());
  }
  else
  {
    byte *page= frame;
    size= block->physical_size();
#if defined HAVE_FALLOC_PUNCH_HOLE_AND_KEEP_SIZE || defined _WIN32
    orig_size= size;
#endif

    if (space->full_crc32())
    {
      /* innodb_checksum_algorithm=full_crc32 is not implemented for
      ROW_FORMAT=COMPRESSED pages. */
      ut_ad(!write_frame);
      page= buf_page_encrypt(space, this, page, &slot, &size);
      buf_flush_init_for_writing(block, page, nullptr, true);
    }
    else
    {
      buf_flush_init_for_writing(block, page, write_frame ? &zip : nullptr,
                                 false);
      page= buf_page_encrypt(space, this, write_frame ? write_frame : page,
                             &slot, &size);
    }

#if defined HAVE_FALLOC_PUNCH_HOLE_AND_KEEP_SIZE || defined _WIN32
    if (size != orig_size)
    {
      switch (space->chain.start->punch_hole) {
      case 1:
        type= lru ? IORequest::PUNCH_LRU : IORequest::PUNCH;
        break;
      case 2:
        size= orig_size;
      }
    }
#endif
    write_frame= page;
  }

  if ((s & LRU_MASK) == REINIT || !space->use_doublewrite())
  {
    if (UNIV_LIKELY(space->purpose == FIL_TYPE_TABLESPACE))
    {
      const lsn_t lsn=
        mach_read_from_8(my_assume_aligned<8>(FIL_PAGE_LSN +
                                              (write_frame ? write_frame
                                               : frame)));
      ut_ad(lsn >= oldest_modification());
      if (lsn > log_sys.get_flushed_lsn())
        log_write_up_to(lsn, true);
    }
    space->io(IORequest{type, this, slot}, physical_offset(), size,
              write_frame, this);
  }
  else
    buf_dblwr.add_to_batch(IORequest{this, slot, space->chain.start, type},
                           size);

  /* Increment the I/O operation count used for selecting LRU policy. */
  buf_LRU_stat_inc_io();
  return true;
}

/** Check whether a page can be flushed from the buf_pool.
@param id          page identifier
@param fold        id.fold()
@param lru         true=buf_pool.LRU; false=buf_pool.flush_list
@return whether the page can be flushed */
static bool buf_flush_check_neighbor(const page_id_t id, ulint fold, bool lru)
{
  mysql_mutex_assert_owner(&buf_pool.mutex);
  ut_ad(fold == id.fold());

  /* FIXME: cell_get() is being invoked while holding buf_pool.mutex */
  const buf_page_t *bpage=
    buf_pool.page_hash.get(id, buf_pool.page_hash.cell_get(fold));

  if (!bpage || buf_pool.watch_is_sentinel(*bpage))
    return false;

  /* We avoid flushing 'non-old' blocks in an LRU flush, because the
  flushed blocks are soon freed */
  if (lru && !bpage->is_old())
    return false;

  return bpage->oldest_modification() > 1 && bpage->ready_for_flush();
}

/** Check which neighbors of a page can be flushed from the buf_pool.
@param space       tablespace
@param id          page identifier of a dirty page
@param contiguous  whether to consider contiguous areas of pages
@param lru         true=buf_pool.LRU; false=buf_pool.flush_list
@return last page number that can be flushed */
static page_id_t buf_flush_check_neighbors(const fil_space_t &space,
                                           page_id_t &id, bool contiguous,
                                           bool lru)
{
  ut_ad(id.page_no() < space.size +
        (space.physical_size() == 2048 ? 1
         : space.physical_size() == 1024 ? 3 : 0));
  /* When flushed, dirty blocks are searched in neighborhoods of this
  size, and flushed along with the original page. */
  const ulint s= buf_pool.curr_size / 16;
  const uint32_t read_ahead= buf_pool.read_ahead_area;
  const uint32_t buf_flush_area= read_ahead > s
    ? static_cast<uint32_t>(s) : read_ahead;
  page_id_t low= id - (id.page_no() % buf_flush_area);
  page_id_t high= low + buf_flush_area;
  high.set_page_no(std::min(high.page_no(), space.last_page_number()));

  if (!contiguous)
  {
    high= std::max(id + 1, high);
    id= low;
    return high;
  }

  /* Determine the contiguous dirty area around id. */
  const ulint id_fold= id.fold();

  mysql_mutex_lock(&buf_pool.mutex);

  if (id > low)
  {
    ulint fold= id_fold;
    for (page_id_t i= id - 1;; --i)
    {
      fold--;
      if (!buf_flush_check_neighbor(i, fold, lru))
      {
        low= i + 1;
        break;
      }
      if (i == low)
        break;
    }
  }

  page_id_t i= id;
  id= low;
  ulint fold= id_fold;
  while (++i < high)
  {
    ++fold;
    if (!buf_flush_check_neighbor(i, fold, lru))
      break;
  }

  mysql_mutex_unlock(&buf_pool.mutex);
  return i;
}

MY_ATTRIBUTE((nonnull))
/** Write zeroes or punch a hole for the freed ranges of pages of a
tablespace, when innodb_immediate_scrub_data_uncompressed is enabled
or hole punching is supported.
@param space   tablespace which may contain ranges of freed pages */
static void buf_flush_freed_pages(fil_space_t *space)
{
  const bool punch_hole= space->chain.start->punch_hole == 1;
  if (!punch_hole && !srv_immediate_scrub_data_uncompressed)
    return;
  lsn_t flush_to_disk_lsn= log_sys.get_flushed_lsn();
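  /* Act on freed ranges only after the log has been written up to the
  last freed-page LSN (checked below); until then, the scrubbing or
  hole punching is postponed to a later call. */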

  std::unique_lock<std::mutex> freed_lock(space->freed_range_mutex);
  if (space->freed_ranges.empty()
      || flush_to_disk_lsn < space->get_last_freed_lsn())
  {
    freed_lock.unlock();
    return;
  }

  range_set freed_ranges= std::move(space->freed_ranges);
  freed_lock.unlock();

  for (const auto &range : freed_ranges)
  {
    const ulint physical_size= space->physical_size();

    if (punch_hole)
    {
      space->reacquire();
      space->io(IORequest(IORequest::PUNCH_RANGE),
                          os_offset_t{range.first} * physical_size,
                          (range.last - range.first + 1) * physical_size,
                          nullptr);
    }
    else
    {
      for (os_offset_t i= range.first; i <= range.last; i++)
      {
        space->reacquire();
        space->io(IORequest(IORequest::WRITE_ASYNC),
                  i * physical_size, physical_size,
                  const_cast<byte*>(field_ref_zero));
      }
    }
    buf_pool.stat.n_pages_written+= (range.last - range.first + 1);
  }
}

/** Flushes to disk all flushable pages within the flush area
and also writes zeroes or punches a hole for the freed ranges of pages.
@param space       tablespace
@param page_id     page identifier
@param contiguous  whether to consider contiguous areas of pages
@param lru         true=buf_pool.LRU; false=buf_pool.flush_list
@param n_flushed   number of pages flushed so far in this batch
@param n_to_flush  maximum number of pages we are allowed to flush
@return number of pages flushed */
static ulint buf_flush_try_neighbors(fil_space_t *space,
                                     const page_id_t page_id,
                                     bool contiguous, bool lru,
                                     ulint n_flushed, ulint n_to_flush)
{
  ut_ad(space->id == page_id.space());

  ulint count= 0;
  page_id_t id= page_id;
  page_id_t high= buf_flush_check_neighbors(*space, id, contiguous, lru);

  ut_ad(page_id >= id);
  ut_ad(page_id < high);

  for (ulint id_fold= id.fold(); id < high && !space->is_stopping();
       ++id, ++id_fold)
  {
    if (count + n_flushed >= n_to_flush)
    {
      if (id > page_id)
        break;
      /* If the page whose neighbors we are flushing has not been
      flushed yet, we must flush the page that we selected originally. */
      id= page_id;
      id_fold= id.fold();
    }

    const buf_pool_t::hash_chain &chain= buf_pool.page_hash.cell_get(id_fold);
    mysql_mutex_lock(&buf_pool.mutex);

    if (buf_page_t *bpage= buf_pool.page_hash.get(id, chain))
    {
      ut_ad(bpage->in_file());
      /* We avoid flushing 'non-old' blocks in an LRU flush,
      because the flushed blocks are soon freed */
      if (!lru || id == page_id || bpage->is_old())
      {
        if (!buf_pool.watch_is_sentinel(*bpage) &&
            bpage->oldest_modification() > 1 && bpage->ready_for_flush() &&
            bpage->flush(lru, space))
        {
          ++count;
          continue;
        }
      }
    }

    mysql_mutex_unlock(&buf_pool.mutex);
  }

  if (auto n= count - 1)
  {
    MONITOR_INC_VALUE_CUMULATIVE(MONITOR_FLUSH_NEIGHBOR_TOTAL_PAGE,
                                 MONITOR_FLUSH_NEIGHBOR_COUNT,
                                 MONITOR_FLUSH_NEIGHBOR_PAGES, n);
  }

  return count;
}

/*******************************************************************//**
This utility moves the uncompressed frames of pages to the free list.
Note that this function does not actually flush any data to disk. It
just detaches the uncompressed frames from the compressed pages at the
tail of the unzip_LRU and puts those freed frames in the free list.
Note that it is a best effort attempt and it is not guaranteed that
after a call to this function there will be 'max' blocks in the free
list.
@param[in]	max		desired number of blocks in the free_list
@return number of blocks moved to the free list. */
static ulint buf_free_from_unzip_LRU_list_batch(ulint max)
static ulint buf_free_from_unzip_LRU_list_batch(ulint max)
{
	ulint		scanned = 0;
	ulint		count = 0;

	mysql_mutex_assert_owner(&buf_pool.mutex);

	buf_block_t*	block = UT_LIST_GET_LAST(buf_pool.unzip_LRU);

	while (block
	       && count < max
	       && UT_LIST_GET_LEN(buf_pool.free) < srv_LRU_scan_depth
	       && UT_LIST_GET_LEN(buf_pool.unzip_LRU)
	       > UT_LIST_GET_LEN(buf_pool.LRU) / 10) {

		++scanned;
		if (buf_LRU_free_page(&block->page, false)) {
			/* Block was freed. buf_pool.mutex potentially
			released and reacquired */
			++count;
			block = UT_LIST_GET_LAST(buf_pool.unzip_LRU);
		} else {
			block = UT_LIST_GET_PREV(unzip_LRU, block);
		}
	}

	mysql_mutex_assert_owner(&buf_pool.mutex);

	if (scanned) {
		MONITOR_INC_VALUE_CUMULATIVE(
			MONITOR_LRU_BATCH_SCANNED,
			MONITOR_LRU_BATCH_SCANNED_NUM_CALL,
			MONITOR_LRU_BATCH_SCANNED_PER_CALL,
			scanned);
	}

	return(count);
}

/** Start writing out pages for a tablespace.
@param id   tablespace identifier
@return tablespace
@retval nullptr if the pages for this tablespace should be discarded */
static fil_space_t *buf_flush_space(const uint32_t id)
{
  fil_space_t *space= fil_space_t::get(id);
  if (space)
    buf_flush_freed_pages(space);
  return space;
}

struct flush_counters_t
{
  /** number of dirty pages flushed */
  ulint flushed;
  /** number of clean pages evicted */
  ulint evicted;
};

/** Try to discard a dirty page.
@param bpage      dirty page whose tablespace is not accessible */
static void buf_flush_discard_page(buf_page_t *bpage)
{
  mysql_mutex_assert_owner(&buf_pool.mutex);
  mysql_mutex_assert_not_owner(&buf_pool.flush_list_mutex);
  ut_ad(bpage->in_file());
  ut_ad(bpage->oldest_modification());

  if (!bpage->lock.u_lock_try(false))
    return;

  mysql_mutex_lock(&buf_pool.flush_list_mutex);
  buf_pool.delete_from_flush_list(bpage);
  mysql_mutex_unlock(&buf_pool.flush_list_mutex);

  ut_d(const auto state= bpage->state());
  ut_ad(state == buf_page_t::FREED || state == buf_page_t::UNFIXED ||
        state == buf_page_t::IBUF_EXIST || state == buf_page_t::REINIT);
  bpage->lock.u_unlock();

  buf_LRU_free_page(bpage, true);
}

/** Flush dirty blocks from the end of the LRU list.
@param max   maximum number of blocks to make available in buf_pool.free
@param n     counts of flushed and evicted pages */
static void buf_flush_LRU_list_batch(ulint max, flush_counters_t *n)
{
  ulint scanned= 0;
  ulint free_limit= srv_LRU_scan_depth;

  mysql_mutex_assert_owner(&buf_pool.mutex);
  if (buf_pool.withdraw_target && buf_pool.curr_size < buf_pool.old_size)
    free_limit+= buf_pool.withdraw_target - UT_LIST_GET_LEN(buf_pool.withdraw);

  const auto neighbors= UT_LIST_GET_LEN(buf_pool.LRU) < BUF_LRU_OLD_MIN_LEN
    ? 0 : srv_flush_neighbors;
  fil_space_t *space= nullptr;
  uint32_t last_space_id= FIL_NULL;
  static_assert(FIL_NULL > SRV_TMP_SPACE_ID, "consistency");
  static_assert(FIL_NULL > SRV_SPACE_ID_UPPER_BOUND, "consistency");

  for (buf_page_t *bpage= UT_LIST_GET_LAST(buf_pool.LRU);
       bpage && n->flushed + n->evicted < max &&
       UT_LIST_GET_LEN(buf_pool.LRU) > BUF_LRU_MIN_LEN &&
       UT_LIST_GET_LEN(buf_pool.free) < free_limit; ++scanned)
  {
    buf_page_t *prev= UT_LIST_GET_PREV(LRU, bpage);
    const lsn_t oldest_modification= bpage->oldest_modification();
    buf_pool.lru_hp.set(prev);
    const auto state= bpage->state();
    ut_ad(state >= buf_page_t::FREED);
    ut_ad(bpage->in_LRU_list);

    if (oldest_modification <= 1)
    {
      if (state != buf_page_t::FREED &&
          (state >= buf_page_t::READ_FIX || (~buf_page_t::LRU_MASK & state)))
        goto must_skip;
      if (buf_LRU_free_page(bpage, true))
        ++n->evicted;
    }
    else if (state < buf_page_t::READ_FIX)
    {
      /* Block is ready for flush. Dispatch an IO request. The IO
      helper thread will put it on the free list in the IO completion
      routine. */
      const page_id_t page_id(bpage->id());
      const uint32_t space_id= page_id.space();
      if (!space || space->id != space_id)
      {
        if (last_space_id != space_id)
        {
          if (space)
            space->release();
          space= buf_flush_space(space_id);
          last_space_id= space_id;
        }
        else
          ut_ad(!space);
      }
      else if (space->is_stopping())
      {
        space->release();
        space= nullptr;
      }

      if (!space)
        buf_flush_discard_page(bpage);
      else if (neighbors && space->is_rotational())
      {
        mysql_mutex_unlock(&buf_pool.mutex);
        n->flushed+= buf_flush_try_neighbors(space, page_id, neighbors == 1,
                                             true, n->flushed, max);
reacquire_mutex:
        mysql_mutex_lock(&buf_pool.mutex);
      }
      else if (bpage->flush(true, space))
      {
        ++n->flushed;
        goto reacquire_mutex;
      }
    }
    else
    must_skip:
      /* Can't evict or dispatch this block. Go to previous. */
      ut_ad(buf_pool.lru_hp.is_hp(prev));
    bpage= buf_pool.lru_hp.get();
  }

  buf_pool.lru_hp.set(nullptr);

  if (space)
    space->release();

  if (scanned)
    MONITOR_INC_VALUE_CUMULATIVE(MONITOR_LRU_BATCH_SCANNED,
                                 MONITOR_LRU_BATCH_SCANNED_NUM_CALL,
                                 MONITOR_LRU_BATCH_SCANNED_PER_CALL,
                                 scanned);
}

/** Flush and move pages from LRU or unzip_LRU list to the free list.
Whether LRU or unzip_LRU is used depends on the state of the system.
@param max   maximum number of blocks to make available in buf_pool.free
@return number of flushed pages */
static ulint buf_do_LRU_batch(ulint max)
{
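  /* First try to evict uncompressed copies from unzip_LRU when that
  is considered worthwhile, then flush or evict from the tail of the
  LRU list itself. */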
  const ulint n_unzip_LRU_evicted= buf_LRU_evict_from_unzip_LRU()
    ? buf_free_from_unzip_LRU_list_batch(max)
    : 0;
  flush_counters_t n;
  n.flushed= 0;
  n.evicted= n_unzip_LRU_evicted;
  buf_flush_LRU_list_batch(max, &n);
  mysql_mutex_assert_owner(&buf_pool.mutex);

  if (const ulint evicted= n.evicted - n_unzip_LRU_evicted)
    buf_lru_freed_page_count+= evicted;

  if (n.flushed)
    buf_lru_flush_page_count+= n.flushed;

  return n.flushed;
}

/** This utility flushes dirty blocks from the end of the flush_list.
The calling thread is not allowed to own any latches on pages!
@param max_n    maximum number of blocks to flush
@param lsn      once an oldest_modification>=lsn is found, terminate the batch
@return number of blocks for which the write request was queued */
static ulint buf_do_flush_list_batch(ulint max_n, lsn_t lsn)
{
  ulint count= 0;
  ulint scanned= 0;

  mysql_mutex_assert_owner(&buf_pool.mutex);

  const auto neighbors= UT_LIST_GET_LEN(buf_pool.LRU) < BUF_LRU_OLD_MIN_LEN
    ? 0 : srv_flush_neighbors;
  fil_space_t *space= nullptr;
  uint32_t last_space_id= FIL_NULL;
  static_assert(FIL_NULL > SRV_TMP_SPACE_ID, "consistency");
  static_assert(FIL_NULL > SRV_SPACE_ID_UPPER_BOUND, "consistency");

  /* Start from the end of the list looking for a suitable block to be
  flushed. */
  mysql_mutex_lock(&buf_pool.flush_list_mutex);
  ulint len= UT_LIST_GET_LEN(buf_pool.flush_list);

  for (buf_page_t *bpage= UT_LIST_GET_LAST(buf_pool.flush_list);
       bpage && len && count < max_n; ++scanned, len--)
  {
    const lsn_t oldest_modification= bpage->oldest_modification();
    if (oldest_modification >= lsn)
      break;
    ut_ad(bpage->in_file());

    buf_page_t *prev= UT_LIST_GET_PREV(list, bpage);

    if (oldest_modification == 1)
    {
      buf_pool.delete_from_flush_list(bpage);
    skip:
      bpage= prev;
      continue;
    }

    ut_ad(oldest_modification > 2);

    if (!bpage->ready_for_flush())
      goto skip;

    /* In order not to degenerate this scan to O(n*n) we attempt to
    preserve the pointer position. Any thread that would remove 'prev'
    from buf_pool.flush_list must adjust the hazard pointer.

    Note: A concurrent execution of buf_flush_list_space() may
    terminate this scan prematurely. The buf_pool.n_flush_list()
    should prevent multiple threads from executing
    buf_do_flush_list_batch() concurrently,
    but buf_flush_list_space() is ignoring that. */
    buf_pool.flush_hp.set(prev);
    mysql_mutex_unlock(&buf_pool.flush_list_mutex);

    const page_id_t page_id(bpage->id());
    const uint32_t space_id= page_id.space();
    if (!space || space->id != space_id)
    {
      if (last_space_id != space_id)
      {
        if (space)
          space->release();
        space= buf_flush_space(space_id);
        last_space_id= space_id;
      }
      else
        ut_ad(!space);
    }
    else if (space->is_stopping())
    {
      space->release();
      space= nullptr;
    }

    if (!space)
      buf_flush_discard_page(bpage);
    else if (neighbors && space->is_rotational())
    {
      mysql_mutex_unlock(&buf_pool.mutex);
      count+= buf_flush_try_neighbors(space, page_id, neighbors == 1,
                                      false, count, max_n);
    reacquire_mutex:
      mysql_mutex_lock(&buf_pool.mutex);
    }
    else if (bpage->flush(false, space))
    {
      ++count;
      goto reacquire_mutex;
    }

    mysql_mutex_lock(&buf_pool.flush_list_mutex);
    bpage= buf_pool.flush_hp.get();
  }

  buf_pool.flush_hp.set(nullptr);
  mysql_mutex_unlock(&buf_pool.flush_list_mutex);

  if (space)
    space->release();

  if (scanned)
    MONITOR_INC_VALUE_CUMULATIVE(MONITOR_FLUSH_BATCH_SCANNED,
                                 MONITOR_FLUSH_BATCH_SCANNED_NUM_CALL,
                                 MONITOR_FLUSH_BATCH_SCANNED_PER_CALL,
                                 scanned);
  if (count)
    MONITOR_INC_VALUE_CUMULATIVE(MONITOR_FLUSH_BATCH_TOTAL_PAGE,
                                 MONITOR_FLUSH_BATCH_COUNT,
                                 MONITOR_FLUSH_BATCH_PAGES,
                                 count);
  mysql_mutex_assert_owner(&buf_pool.mutex);
  return count;
}

/** Wait until a flush batch ends.
@param lru    true=buf_pool.LRU; false=buf_pool.flush_list */
void buf_flush_wait_batch_end(bool lru)
{
  const auto &n_flush= lru ? buf_pool.n_flush_LRU_ : buf_pool.n_flush_list_;

  if (n_flush)
  {
    auto cond= lru ? &buf_pool.done_flush_LRU : &buf_pool.done_flush_list;
    tpool::tpool_wait_begin();
    thd_wait_begin(nullptr, THD_WAIT_DISKIO);
    do
      my_cond_wait(cond, &buf_pool.mutex.m_mutex);
    while (n_flush);
    tpool::tpool_wait_end();
    thd_wait_end(nullptr);
    pthread_cond_broadcast(cond);
  }
}

/** Write out dirty blocks from buf_pool.flush_list.
@param max_n    wished maximum number of blocks flushed
@param lsn      buf_pool.get_oldest_modification(LSN_MAX) target
@return the number of processed pages
@retval 0 if a buf_pool.flush_list batch is already running */
static ulint buf_flush_list(ulint max_n= ULINT_UNDEFINED, lsn_t lsn= LSN_MAX)
{
  ut_ad(lsn);

  if (buf_pool.n_flush_list())
    return 0;

  mysql_mutex_lock(&buf_pool.mutex);
  const bool running= buf_pool.n_flush_list_ != 0;
  /* FIXME: we are performing a dirty read of buf_pool.flush_list.count
  while not holding buf_pool.flush_list_mutex */
  if (running || !UT_LIST_GET_LEN(buf_pool.flush_list))
  {
    if (!running)
      pthread_cond_broadcast(&buf_pool.done_flush_list);
    mysql_mutex_unlock(&buf_pool.mutex);
    return 0;
  }

  buf_pool.n_flush_list_++;
  const ulint n_flushed= buf_do_flush_list_batch(max_n, lsn);
  const ulint n_flushing= --buf_pool.n_flush_list_;

  buf_pool.try_LRU_scan= true;

  mysql_mutex_unlock(&buf_pool.mutex);

  if (!n_flushing)
    pthread_cond_broadcast(&buf_pool.done_flush_list);

  buf_dblwr.flush_buffered_writes();

  DBUG_PRINT("ib_buf", ("flush_list completed, " ULINTPF " pages", n_flushed));
  return n_flushed;
}

/** Try to flush all the dirty pages that belong to a given tablespace.
@param space       tablespace
@param n_flushed   number of pages written
@return whether the flush for some pages might not have been initiated */
bool buf_flush_list_space(fil_space_t *space, ulint *n_flushed)
{
  const auto space_id= space->id;
  ut_ad(space_id <= SRV_SPACE_ID_UPPER_BOUND);

  bool may_have_skipped= false;
  ulint max_n_flush= srv_io_capacity;
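  /* Initiate at most innodb_io_capacity (srv_io_capacity) page writes
  per call; if the limit is reached, may_have_skipped is set so that
  the caller can see that not all pages may have been flushed. */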

  mysql_mutex_lock(&buf_pool.mutex);
  mysql_mutex_lock(&buf_pool.flush_list_mutex);

  bool acquired= space->acquire();
  buf_flush_freed_pages(space);

  for (buf_page_t *bpage= UT_LIST_GET_LAST(buf_pool.flush_list); bpage; )
  {
    ut_ad(bpage->oldest_modification());
    ut_ad(bpage->in_file());

    buf_page_t *prev= UT_LIST_GET_PREV(list, bpage);
    if (bpage->id().space() != space_id);
1544 1545
    else if (bpage->oldest_modification() == 1)
      buf_pool.delete_from_flush_list(bpage);
1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573
    else if (!bpage->ready_for_flush())
      may_have_skipped= true;
    else
    {
      /* In order not to degenerate this scan to O(n*n) we attempt to
      preserve the pointer position. Any thread that would remove 'prev'
      from buf_pool.flush_list must adjust the hazard pointer.

      Note: Multiple executions of buf_flush_list_space() may be
      interleaved, and also buf_do_flush_list_batch() may be running
      concurrently. This may terminate our iteration prematurely,
      leading us to return may_have_skipped=true. */
      buf_pool.flush_hp.set(prev);
      mysql_mutex_unlock(&buf_pool.flush_list_mutex);

      if (!acquired)
      {
      was_freed:
        buf_flush_discard_page(bpage);
      }
      else
      {
        if (space->is_stopping())
        {
          space->release();
          acquired= false;
          goto was_freed;
        }
1574
        if (!bpage->flush(false, space))
1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594
        {
          may_have_skipped= true;
          mysql_mutex_lock(&buf_pool.flush_list_mutex);
          goto next_after_skip;
        }
        if (n_flushed)
          ++*n_flushed;
        if (!--max_n_flush)
        {
          mysql_mutex_lock(&buf_pool.mutex);
          mysql_mutex_lock(&buf_pool.flush_list_mutex);
          may_have_skipped= true;
          break;
        }
        mysql_mutex_lock(&buf_pool.mutex);
      }

      mysql_mutex_lock(&buf_pool.flush_list_mutex);
      if (!buf_pool.flush_hp.is_hp(prev))
        may_have_skipped= true;
1595
    next_after_skip:
1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632
      bpage= buf_pool.flush_hp.get();
      continue;
    }

    bpage= prev;
  }

  /* Note: this loop may have been executed concurrently with
  buf_do_flush_list_batch() as well as other threads executing
  buf_flush_list_space(). We should always return true from
  buf_flush_list_space() if that should be the case; in
  buf_do_flush_list_batch() we will simply perform less work. */

  buf_pool.flush_hp.set(nullptr);
  mysql_mutex_unlock(&buf_pool.flush_list_mutex);

  buf_pool.try_LRU_scan= true;

  mysql_mutex_unlock(&buf_pool.mutex);

  if (acquired)
    space->release();

  if (space->purpose == FIL_TYPE_IMPORT)
    os_aio_wait_until_no_pending_writes();
  else
    buf_dblwr.flush_buffered_writes();

  return may_have_skipped;
}

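/* A usage sketch for buf_flush_list_space() (hypothetical caller, not from
this file): because the scan above can be cut short by concurrent batches
that move the hazard pointer, a caller that must guarantee completeness
would retry for as long as some pages may have been skipped:

  ulint n_flushed= 0;
  while (buf_flush_list_space(space, &n_flushed) && !space->is_stopping())
    ; // some pages may have been skipped; rescan the flush list
*/
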
/** Write out dirty blocks from buf_pool.LRU.
@param max_n    wished maximum number of blocks flushed
@return the number of processed pages
@retval 0 if a buf_pool.LRU batch is already running */
ulint buf_flush_LRU(ulint max_n)
{
  if (buf_pool.n_flush_LRU())
    return 0;

  log_buffer_flush_to_disk();

  mysql_mutex_lock(&buf_pool.mutex);
  if (buf_pool.n_flush_LRU_)
  {
    mysql_mutex_unlock(&buf_pool.mutex);
    return 0;
  }
  buf_pool.n_flush_LRU_++;

  ulint n_flushed= buf_do_LRU_batch(max_n);

  const ulint n_flushing= --buf_pool.n_flush_LRU_;

  buf_pool.try_LRU_scan= true;

  mysql_mutex_unlock(&buf_pool.mutex);

  if (!n_flushing)
  {
    pthread_cond_broadcast(&buf_pool.done_flush_LRU);
    pthread_cond_signal(&buf_pool.done_free);
  }

  buf_dblwr.flush_buffered_writes();

  DBUG_PRINT("ib_buf", ("LRU flush completed, " ULINTPF " pages", n_flushed));
  return n_flushed;
}

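/* Illustrative consumers of the wakeups above (assumed, not spelled out in
this file): buf_pool.done_free is aimed at a thread waiting for a free
block, for example in buf_LRU_get_free_block(), while
buf_pool.done_flush_LRU wakes threads waiting for the entire LRU batch to
finish, such as buf_flush_wait_batch_end(true). */
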
/** Initiate a log checkpoint, discarding the start of the log.
@param oldest_lsn   the checkpoint LSN
@param end_lsn      log_sys.get_lsn()
@return true if success, false if a checkpoint write was already running */
static bool log_checkpoint_low(lsn_t oldest_lsn, lsn_t end_lsn)
{
  ut_ad(!srv_read_only_mode);
  mysql_mutex_assert_owner(&log_sys.mutex);
  ut_ad(oldest_lsn <= end_lsn);
  ut_ad(end_lsn == log_sys.get_lsn());
  ut_ad(!recv_no_log_write);

  ut_ad(oldest_lsn >= log_sys.last_checkpoint_lsn);

  if (oldest_lsn > log_sys.last_checkpoint_lsn + SIZE_OF_FILE_CHECKPOINT)
    /* Some log has been written since the previous checkpoint. */;
  else if (srv_shutdown_state > SRV_SHUTDOWN_INITIATED)
    /* MariaDB startup expects the redo log file to be logically empty
    (not even containing a FILE_CHECKPOINT record) after a clean shutdown.
    Perform an extra checkpoint at shutdown. */;
  else
  {
    /* Do nothing, because nothing was logged (other than a
    FILE_CHECKPOINT record) since the previous checkpoint. */
    mysql_mutex_unlock(&log_sys.mutex);
    return true;
  }

  /* Repeat the FILE_MODIFY records after the checkpoint, in case some
  log records between the checkpoint and log_sys.lsn need them.
  Finally, write a FILE_CHECKPOINT record. Redo log apply expects to
  see a FILE_CHECKPOINT after the checkpoint, except on clean
  shutdown, where the log will be empty after the checkpoint.

  It is important that we write out the redo log before any further
  dirty pages are flushed to the tablespace files.  At this point,
  because we hold log_sys.mutex, mtr_t::commit() in other threads will
  be blocked, and no pages can be added to the flush lists. */
  lsn_t flush_lsn= oldest_lsn;

  if (fil_names_clear(flush_lsn, oldest_lsn != end_lsn ||
                      srv_shutdown_state <= SRV_SHUTDOWN_INITIATED))
  {
    flush_lsn= log_sys.get_lsn();
    ut_ad(flush_lsn >= end_lsn + SIZE_OF_FILE_CHECKPOINT);
    mysql_mutex_unlock(&log_sys.mutex);
    log_write_up_to(flush_lsn, true, true);
    mysql_mutex_lock(&log_sys.mutex);
    if (log_sys.last_checkpoint_lsn >= oldest_lsn)
    {
      mysql_mutex_unlock(&log_sys.mutex);
      return true;
    }
  }
  else
    ut_ad(oldest_lsn >= log_sys.last_checkpoint_lsn);

  ut_ad(log_sys.get_flushed_lsn() >= flush_lsn);

  if (log_sys.n_pending_checkpoint_writes)
  {
    /* A checkpoint write is running */
    mysql_mutex_unlock(&log_sys.mutex);
    return false;
  }

  log_sys.next_checkpoint_lsn= oldest_lsn;
  log_write_checkpoint_info(end_lsn);
  mysql_mutex_assert_not_owner(&log_sys.mutex);

  return true;
}

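/* A worked example of the early-return test above (illustrative values,
not from this file): if last_checkpoint_lsn=1000 and a FILE_CHECKPOINT
record occupies S bytes of redo, then any oldest_lsn <= 1000 + S means
that nothing except the checkpoint record itself was written since the
previous checkpoint, so outside of shutdown the function returns true
without writing a new checkpoint. */
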
/** Make a checkpoint. Note that this function does not flush dirty
blocks from the buffer pool: it only determines the LSN of the oldest
modification in the pool, and writes that information to the log
file. Use log_make_checkpoint() to also flush the pool.
@retval true if the checkpoint was or had been made
@retval false if a checkpoint write was already running */
static bool log_checkpoint()
{
  if (recv_recovery_is_on())
    recv_sys.apply(true);

  switch (srv_file_flush_method) {
  case SRV_NOSYNC:
  case SRV_O_DIRECT_NO_FSYNC:
    break;
  default:
    fil_flush_file_spaces();
  }

  mysql_mutex_lock(&log_sys.mutex);
  const lsn_t end_lsn= log_sys.get_lsn();
  mysql_mutex_lock(&log_sys.flush_order_mutex);
  mysql_mutex_lock(&buf_pool.flush_list_mutex);
  const lsn_t oldest_lsn= buf_pool.get_oldest_modification(end_lsn);
  mysql_mutex_unlock(&buf_pool.flush_list_mutex);
  mysql_mutex_unlock(&log_sys.flush_order_mutex);
  return log_checkpoint_low(oldest_lsn, end_lsn);
}

/** Make a checkpoint. */
ATTRIBUTE_COLD void log_make_checkpoint()
{
  buf_flush_wait_flushed(log_sys.get_lsn(std::memory_order_acquire));
  while (!log_checkpoint());
}

/** Wait for all dirty pages up to an LSN to be written out.
NOTE: The calling thread is not allowed to hold any buffer page latches! */
static void buf_flush_wait(lsn_t lsn)
{
  ut_ad(lsn <= log_sys.get_lsn());

  while (buf_pool.get_oldest_modification(lsn) < lsn)
  {
    if (buf_flush_sync_lsn < lsn)
    {
      buf_flush_sync_lsn= lsn;
      buf_pool.page_cleaner_set_idle(false);
      pthread_cond_signal(&buf_pool.do_flush_list);
    }
    my_cond_wait(&buf_pool.done_flush_list,
                 &buf_pool.flush_list_mutex.m_mutex);
  }
}

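/* A minimal usage sketch for buf_flush_wait(), mirroring the callers
below: the function sleeps on buf_pool.done_flush_list, so
buf_pool.flush_list_mutex must be held around the call. lsn is an assumed
variable:

  mysql_mutex_lock(&buf_pool.flush_list_mutex);
  buf_flush_wait(lsn);
  mysql_mutex_unlock(&buf_pool.flush_list_mutex);
*/
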
/** Wait until all persistent pages are flushed up to a limit.
@param sync_lsn   buf_pool.get_oldest_modification(LSN_MAX) to wait for */
ATTRIBUTE_COLD void buf_flush_wait_flushed(lsn_t sync_lsn)
{
  ut_ad(sync_lsn);
  ut_ad(sync_lsn < LSN_MAX);
  mysql_mutex_assert_not_owner(&log_sys.mutex);
  ut_ad(!srv_read_only_mode);

  if (recv_recovery_is_on())
    recv_sys.apply(true);

  mysql_mutex_lock(&buf_pool.flush_list_mutex);

  if (buf_pool.get_oldest_modification(sync_lsn) < sync_lsn)
  {
    MONITOR_INC(MONITOR_FLUSH_SYNC_WAITS);
#if 1 /* FIXME: remove this, and guarantee that the page cleaner serves us */
    if (UNIV_UNLIKELY(!buf_page_cleaner_is_active))
    {
      do
      {
        mysql_mutex_unlock(&buf_pool.flush_list_mutex);
        ulint n_pages= buf_flush_list(srv_max_io_capacity, sync_lsn);
        buf_flush_wait_batch_end_acquiring_mutex(false);
        if (n_pages)
        {
          MONITOR_INC_VALUE_CUMULATIVE(MONITOR_FLUSH_SYNC_TOTAL_PAGE,
                                       MONITOR_FLUSH_SYNC_COUNT,
                                       MONITOR_FLUSH_SYNC_PAGES, n_pages);
        }
        mysql_mutex_lock(&buf_pool.flush_list_mutex);
      }
      while (buf_pool.get_oldest_modification(sync_lsn) < sync_lsn);
    }
    else
#endif
    {
      thd_wait_begin(nullptr, THD_WAIT_DISKIO);
      tpool::tpool_wait_begin();
      buf_flush_wait(sync_lsn);
      tpool::tpool_wait_end();
      thd_wait_end(nullptr);
    }
  }

  mysql_mutex_unlock(&buf_pool.flush_list_mutex);

  if (UNIV_UNLIKELY(log_sys.last_checkpoint_lsn < sync_lsn))
  {
    /* If the buffer pool was clean, no log write was guaranteed
    to happen until now. There could be an outstanding FILE_CHECKPOINT
    record from a previous fil_names_clear() call, which we must
    write out before we can advance the checkpoint. */
    if (sync_lsn > log_sys.get_flushed_lsn())
      log_write_up_to(sync_lsn, true);
    log_checkpoint();
  }
}

/** Initiate more eager page flushing if the log checkpoint age is too old.
@param lsn      buf_pool.get_oldest_modification(LSN_MAX) target
@param furious  true=furious flushing, false=limit to innodb_io_capacity */
ATTRIBUTE_COLD void buf_flush_ahead(lsn_t lsn, bool furious)
{
  mysql_mutex_assert_not_owner(&log_sys.mutex);
  ut_ad(!srv_read_only_mode);

  if (recv_recovery_is_on())
    recv_sys.apply(true);

  Atomic_relaxed<lsn_t> &limit= furious
    ? buf_flush_sync_lsn : buf_flush_async_lsn;

  if (limit < lsn)
  {
    mysql_mutex_lock(&buf_pool.flush_list_mutex);
    if (limit < lsn)
    {
      limit= lsn;
      buf_pool.page_cleaner_set_idle(false);
      pthread_cond_signal(&buf_pool.do_flush_list);
    }
    mysql_mutex_unlock(&buf_pool.flush_list_mutex);
  }
}

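/* An illustrative call sequence (assumed caller, not from this file):
when the checkpoint age crosses a soft watermark, a log margin check could
request background flushing at innodb_io_capacity with
buf_flush_ahead(lsn, false); if the age keeps growing past a hard
watermark, buf_flush_ahead(lsn, true) escalates to furious flushing, which
buf_flush_sync_for_checkpoint() serves at innodb_max_io_capacity. */
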
/** Wait for pending flushes to complete. */
void buf_flush_wait_batch_end_acquiring_mutex(bool lru)
{
  if (lru ? buf_pool.n_flush_LRU() : buf_pool.n_flush_list())
  {
    mysql_mutex_lock(&buf_pool.mutex);
    buf_flush_wait_batch_end(lru);
    mysql_mutex_unlock(&buf_pool.mutex);
  }
}

/** Conduct checkpoint-related flushing for innodb_flush_sync=ON,
and try to initiate checkpoints until the target is met.
@param lsn   minimum value of buf_pool.get_oldest_modification(LSN_MAX) */
ATTRIBUTE_COLD static void buf_flush_sync_for_checkpoint(lsn_t lsn)
{
  ut_ad(!srv_read_only_mode);

  for (;;)
  {
    mysql_mutex_unlock(&buf_pool.flush_list_mutex);

    if (ulint n_flushed= buf_flush_list(srv_max_io_capacity, lsn))
    {
      MONITOR_INC_VALUE_CUMULATIVE(MONITOR_FLUSH_SYNC_TOTAL_PAGE,
                                   MONITOR_FLUSH_SYNC_COUNT,
                                   MONITOR_FLUSH_SYNC_PAGES, n_flushed);
    }

    switch (srv_file_flush_method) {
    case SRV_NOSYNC:
    case SRV_O_DIRECT_NO_FSYNC:
      break;
    default:
      fil_flush_file_spaces();
    }

    mysql_mutex_lock(&log_sys.mutex);
    const lsn_t newest_lsn= log_sys.get_lsn();
    mysql_mutex_lock(&log_sys.flush_order_mutex);
    mysql_mutex_lock(&buf_pool.flush_list_mutex);
    lsn_t measure= buf_pool.get_oldest_modification(0);
    mysql_mutex_unlock(&log_sys.flush_order_mutex);
    const lsn_t checkpoint_lsn= measure ? measure : newest_lsn;

    if (!recv_recovery_is_on() &&
        checkpoint_lsn > log_sys.last_checkpoint_lsn + SIZE_OF_FILE_CHECKPOINT)
    {
      mysql_mutex_unlock(&buf_pool.flush_list_mutex);
      log_checkpoint_low(checkpoint_lsn, newest_lsn);
      mysql_mutex_lock(&buf_pool.flush_list_mutex);
      measure= buf_pool.get_oldest_modification(LSN_MAX);
    }
    else
    {
      mysql_mutex_unlock(&log_sys.mutex);
      if (!measure)
        measure= LSN_MAX;
    }

    mysql_mutex_assert_not_owner(&log_sys.mutex);

    /* After attempting a log checkpoint, check if we have reached our
    target. */
    const lsn_t target= buf_flush_sync_lsn;

    if (measure >= target)
      buf_flush_sync_lsn= 0;
    else if (measure >= buf_flush_async_lsn)
      buf_flush_async_lsn= 0;

    /* wake up buf_flush_wait() */
    pthread_cond_broadcast(&buf_pool.done_flush_list);

    lsn= std::max(lsn, target);

    if (measure >= lsn)
      return;
  }
}

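/* A worked pass of the loop above (illustrative values, not from this
file): with buf_flush_sync_lsn=5000 and measure=4000 after a batch, the
target has not been reached and buf_flush_sync_lsn stays set, so another
batch is submitted; once measure >= 5000, buf_flush_sync_lsn is reset to
0, buf_flush_wait() callers are woken, and the loop returns. */
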
/** Check whether adaptive flushing is recommended, based on how much of
the redo log capacity is filled.
@param oldest_lsn     buf_pool.get_oldest_modification()
@return true if adaptive flushing is recommended. */
static bool af_needed_for_redo(lsn_t oldest_lsn)
{
  lsn_t age= (log_sys.get_lsn() - oldest_lsn);
  lsn_t af_lwm= static_cast<lsn_t>(srv_adaptive_flushing_lwm *
    static_cast<double>(log_sys.log_capacity) / 100);

  /* if age > af_lwm adaptive flushing is recommended */
  return (age > af_lwm);
}

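/* A worked example of the test above (illustrative values, not from this
file): with innodb_adaptive_flushing_lwm=10.0 and a redo log capacity of
100MiB, af_lwm = 10 * 100MiB / 100 = 10MiB, so adaptive flushing is
recommended once the checkpoint age (log_sys.get_lsn() - oldest_lsn)
exceeds 10MiB of redo. */
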
/*********************************************************************//**
Calculates if flushing is required based on redo generation rate.
@return percent of io_capacity to flush to manage redo space */
static
ulint
af_get_pct_for_lsn(
/*===============*/
	lsn_t	age)	/*!< in: current age of LSN. */
{
	lsn_t	af_lwm = static_cast<lsn_t>(
		srv_adaptive_flushing_lwm
		* static_cast<double>(log_sys.log_capacity) / 100);

	if (age < af_lwm) {
		/* No adaptive flushing. */
		return(0);
	}

	lsn_t lsn_age_factor = (age * 100) / log_sys.max_modified_age_async;

	ut_ad(srv_max_io_capacity >= srv_io_capacity);
	return static_cast<ulint>(
		(static_cast<double>(srv_max_io_capacity / srv_io_capacity
				     * lsn_age_factor)
		 * sqrt(static_cast<double>(lsn_age_factor))
		 / 7.5));
}

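/* A worked example of the formula above (illustrative values, not from
this file): with innodb_io_capacity=200, innodb_max_io_capacity=2000 and
lsn_age_factor=50 (the age is at half of max_modified_age_async), the
result is (2000/200 * 50) * sqrt(50) / 7.5 = 500 * sqrt(50) / 7.5, which
is approximately 471, i.e. flushing at roughly 4.7 times
innodb_io_capacity is recommended. Note that
srv_max_io_capacity / srv_io_capacity is integer division. */
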
/** This function is called approximately once every second by the
page_cleaner thread if innodb_adaptive_flushing=ON.
Based on various factors it decides if there is a need to do flushing.
@param last_pages_in  number of pages flushed in previous batch
@param oldest_lsn     buf_pool.get_oldest_modification(0)
@param dirty_blocks   UT_LIST_GET_LEN(buf_pool.flush_list)
@param dirty_pct      100*flush_list.count / (LRU.count + free.count)
@return number of pages recommended to be flushed */
static ulint page_cleaner_flush_pages_recommendation(ulint last_pages_in,
                                                     lsn_t oldest_lsn,
                                                     ulint dirty_blocks,
                                                     double dirty_pct)
{
	static	lsn_t		prev_lsn = 0;
	static	ulint		sum_pages = 0;
	static	ulint		avg_page_rate = 0;
	static	ulint		n_iterations = 0;
	static	time_t		prev_time;
	lsn_t			lsn_rate;
	ulint			n_pages = 0;

	const lsn_t cur_lsn = log_sys.get_lsn();
	ut_ad(oldest_lsn <= cur_lsn);
	ulint pct_for_lsn = af_get_pct_for_lsn(cur_lsn - oldest_lsn);
	time_t curr_time = time(nullptr);
	const double max_pct = srv_max_buf_pool_modified_pct;

	if (!prev_lsn || !pct_for_lsn) {
		prev_time = curr_time;
		prev_lsn = cur_lsn;
		if (max_pct > 0.0) {
			dirty_pct /= max_pct;
		}

		n_pages = ulint(dirty_pct * double(srv_io_capacity));
		if (n_pages < dirty_blocks) {
			n_pages= std::min<ulint>(srv_io_capacity, dirty_blocks);
		}

		return n_pages;
	}

	sum_pages += last_pages_in;

	double	time_elapsed = difftime(curr_time, prev_time);

	/* We update our variables every srv_flushing_avg_loops
	iterations to smooth out transitions in the workload. */
	if (++n_iterations >= srv_flushing_avg_loops
	    || time_elapsed >= static_cast<double>(srv_flushing_avg_loops)) {

		if (time_elapsed < 1) {
			time_elapsed = 1;
		}

		avg_page_rate = static_cast<ulint>(
			((static_cast<double>(sum_pages)
			  / time_elapsed)
			 + static_cast<double>(avg_page_rate)) / 2);

		/* How much LSN we have generated since last call. */
		lsn_rate = static_cast<lsn_t>(
			static_cast<double>(cur_lsn - prev_lsn)
			/ time_elapsed);

		lsn_avg_rate = (lsn_avg_rate + lsn_rate) / 2;

		ulint	flush_tm = page_cleaner.flush_time;
		ulint	flush_pass = page_cleaner.flush_pass;

		page_cleaner.flush_time = 0;
		page_cleaner.flush_pass = 0;

		if (flush_pass) {
			flush_tm /= flush_pass;
		}

		MONITOR_SET(MONITOR_FLUSH_ADAPTIVE_AVG_TIME, flush_tm);
		MONITOR_SET(MONITOR_FLUSH_ADAPTIVE_AVG_PASS, flush_pass);

		prev_lsn = cur_lsn;
		prev_time = curr_time;

		n_iterations = 0;

		sum_pages = 0;
	}

	const ulint pct_for_dirty = srv_max_dirty_pages_pct_lwm == 0
		? (dirty_pct >= max_pct ? 100 : 0)
		: static_cast<ulint>
		(max_pct > 0.0 ? dirty_pct / max_pct : dirty_pct);
	ulint pct_total = std::max(pct_for_dirty, pct_for_lsn);

	/* Estimate pages to be flushed for the lsn progress */
	lsn_t	target_lsn = oldest_lsn
		+ lsn_avg_rate * buf_flush_lsn_scan_factor;
	ulint	pages_for_lsn = 0;

	mysql_mutex_lock(&buf_pool.flush_list_mutex);

	for (buf_page_t* b = UT_LIST_GET_LAST(buf_pool.flush_list);
	     b != NULL;
	     b = UT_LIST_GET_PREV(list, b)) {
		if (b->oldest_modification() > target_lsn) {
			break;
		}
		if (++pages_for_lsn >= srv_max_io_capacity) {
			break;
		}
	}
	mysql_mutex_unlock(&buf_pool.flush_list_mutex);

	pages_for_lsn /= buf_flush_lsn_scan_factor;
	if (pages_for_lsn < 1) {
		pages_for_lsn = 1;
	}

	n_pages = (ulint(double(srv_io_capacity) * double(pct_total) / 100.0)
		   + avg_page_rate + pages_for_lsn) / 3;

	if (n_pages > srv_max_io_capacity) {
		n_pages = srv_max_io_capacity;
	}

	MONITOR_SET(MONITOR_FLUSH_N_TO_FLUSH_REQUESTED, n_pages);

	MONITOR_SET(MONITOR_FLUSH_N_TO_FLUSH_BY_AGE, pages_for_lsn);

	MONITOR_SET(MONITOR_FLUSH_AVG_PAGE_RATE, avg_page_rate);
	MONITOR_SET(MONITOR_FLUSH_LSN_AVG_RATE, lsn_avg_rate);
	MONITOR_SET(MONITOR_FLUSH_PCT_FOR_DIRTY, pct_for_dirty);
	MONITOR_SET(MONITOR_FLUSH_PCT_FOR_LSN, pct_for_lsn);

	return(n_pages);
}

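/* A worked example of the blend above (illustrative values, not from this
file): with innodb_io_capacity=200, pct_total=90, avg_page_rate=120 and
pages_for_lsn=150, the recommendation is
(200 * 90 / 100 + 120 + 150) / 3 = (180 + 120 + 150) / 3 = 150 pages,
an equal-weight average of the dirty-page pressure, the observed flushing
rate, and the pages needed for redo-LSN progress. */
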
/** page_cleaner thread tasked with flushing dirty pages from the buffer
pools. As of now we'll have only one coordinator. */
static void buf_flush_page_cleaner()
{
  my_thread_init();
#ifdef UNIV_PFS_THREAD
  pfs_register_thread(page_cleaner_thread_key);
#endif /* UNIV_PFS_THREAD */
  ut_ad(!srv_read_only_mode);
  ut_ad(buf_page_cleaner_is_active);

  ulint last_pages= 0;
  timespec abstime;
  set_timespec(abstime, 1);

  mysql_mutex_lock(&buf_pool.flush_list_mutex);

  lsn_t lsn_limit;
  ulint last_activity_count= srv_get_activity_count();

  for (;;)
  {
    lsn_limit= buf_flush_sync_lsn;

    if (UNIV_UNLIKELY(lsn_limit != 0))
    {
furious_flush:
      if (UNIV_LIKELY(srv_flush_sync))
      {
        buf_flush_sync_for_checkpoint(lsn_limit);
        last_pages= 0;
        set_timespec(abstime, 1);
        continue;
      }
    }
    else if (srv_shutdown_state > SRV_SHUTDOWN_INITIATED)
      break;

    /* If the page cleaner is idle and there is no work
    (either dirty pages are all flushed or adaptive flushing
    is not enabled) then opt for a non-timed wait */
    if (buf_pool.page_cleaner_idle() &&
        (!UT_LIST_GET_LEN(buf_pool.flush_list) ||
         srv_max_dirty_pages_pct_lwm == 0.0))
      my_cond_wait(&buf_pool.do_flush_list, &buf_pool.flush_list_mutex.m_mutex);
    else
      my_cond_timedwait(&buf_pool.do_flush_list,
                        &buf_pool.flush_list_mutex.m_mutex, &abstime);

    set_timespec(abstime, 1);

    lsn_t soft_lsn_limit= buf_flush_async_lsn;
    lsn_limit= buf_flush_sync_lsn;

    if (UNIV_UNLIKELY(lsn_limit != 0))
    {
      if (UNIV_LIKELY(srv_flush_sync))
        goto furious_flush;
    }
    else if (srv_shutdown_state > SRV_SHUTDOWN_INITIATED)
      break;

    const lsn_t oldest_lsn= buf_pool.get_oldest_modification(0);

    if (!oldest_lsn)
    {
      if (UNIV_UNLIKELY(lsn_limit != 0))
      {
        buf_flush_sync_lsn= 0;
        /* wake up buf_flush_wait() */
        pthread_cond_broadcast(&buf_pool.done_flush_list);
      }
unemployed:
      buf_flush_async_lsn= 0;
      buf_pool.page_cleaner_set_idle(true);

      DBUG_EXECUTE_IF("ib_log_checkpoint_avoid", continue;);

      mysql_mutex_unlock(&buf_pool.flush_list_mutex);

      if (!recv_recovery_is_on() &&
          !srv_startup_is_before_trx_rollback_phase &&
          srv_operation == SRV_OPERATION_NORMAL)
        log_checkpoint();

      mysql_mutex_lock(&buf_pool.flush_list_mutex);
      continue;
    }

    const ulint dirty_blocks= UT_LIST_GET_LEN(buf_pool.flush_list);
    ut_ad(dirty_blocks);
    /* We perform dirty reads of the LRU+free list lengths here.
    Division by zero is not possible, because buf_pool.flush_list is
    guaranteed to be nonempty, and it is a subset of buf_pool.LRU. */
    const double dirty_pct= double(dirty_blocks) * 100.0 /
      double(UT_LIST_GET_LEN(buf_pool.LRU) + UT_LIST_GET_LEN(buf_pool.free));

    bool idle_flush= false;

    if (lsn_limit || soft_lsn_limit);
    else if (af_needed_for_redo(oldest_lsn));
    else if (srv_max_dirty_pages_pct_lwm != 0.0)
    {
      const ulint activity_count= srv_get_activity_count();
      if (activity_count != last_activity_count)
        last_activity_count= activity_count;
      else if (buf_pool.page_cleaner_idle() && buf_pool.n_pend_reads == 0)
      {
        /* Reaching here means three things:
        - last_activity_count == activity_count: the server is idle
          (no trx_t::commit() activity)
        - the page cleaner is idle (dirty_pct < srv_max_dirty_pages_pct_lwm)
        - there are no pending reads, but there are dirty pages to flush */
        idle_flush= true;
        buf_pool.update_last_activity_count(activity_count);
      }

      if (!idle_flush && dirty_pct < srv_max_dirty_pages_pct_lwm)
        goto unemployed;
    }
    else if (dirty_pct < srv_max_buf_pool_modified_pct)
      goto unemployed;

    if (UNIV_UNLIKELY(lsn_limit != 0) && oldest_lsn >= lsn_limit)
      lsn_limit= buf_flush_sync_lsn= 0;
    if (UNIV_UNLIKELY(soft_lsn_limit != 0) && oldest_lsn >= soft_lsn_limit)
      soft_lsn_limit= buf_flush_async_lsn= 0;

    buf_pool.page_cleaner_set_idle(false);
    mysql_mutex_unlock(&buf_pool.flush_list_mutex);

    if (!lsn_limit)
      lsn_limit= soft_lsn_limit;

    ulint n_flushed;

    if (UNIV_UNLIKELY(lsn_limit != 0))
    {
      n_flushed= buf_flush_list(srv_max_io_capacity, lsn_limit);
      /* wake up buf_flush_wait() */
      pthread_cond_broadcast(&buf_pool.done_flush_list);
      goto try_checkpoint;
    }
    else if (idle_flush || !srv_adaptive_flushing)
    {
      n_flushed= buf_flush_list(srv_io_capacity);
try_checkpoint:
      if (n_flushed)
      {
        MONITOR_INC_VALUE_CUMULATIVE(MONITOR_FLUSH_BACKGROUND_TOTAL_PAGE,
                                     MONITOR_FLUSH_BACKGROUND_COUNT,
                                     MONITOR_FLUSH_BACKGROUND_PAGES,
                                     n_flushed);
do_checkpoint:
        /* The periodic log_checkpoint() call here makes it harder to
        reproduce bugs in crash recovery or mariabackup --prepare, or
        in code that writes the redo log records. Omitting the call
        here should not affect correctness, because log_free_check()
        should still be invoking checkpoints when needed. */
        DBUG_EXECUTE_IF("ib_log_checkpoint_avoid", goto next;);

        if (!recv_recovery_is_on() && srv_operation == SRV_OPERATION_NORMAL)
          log_checkpoint();
      }
    }
    else if (ulint n= page_cleaner_flush_pages_recommendation(last_pages,
                                                              oldest_lsn,
                                                              dirty_blocks,
                                                              dirty_pct))
    {
      page_cleaner.flush_pass++;
      const ulint tm= ut_time_ms();
      last_pages= n_flushed= buf_flush_list(n);
      page_cleaner.flush_time+= ut_time_ms() - tm;

      if (n_flushed)
      {
        MONITOR_INC_VALUE_CUMULATIVE(MONITOR_FLUSH_ADAPTIVE_TOTAL_PAGE,
                                     MONITOR_FLUSH_ADAPTIVE_COUNT,
                                     MONITOR_FLUSH_ADAPTIVE_PAGES,
                                     n_flushed);
        goto do_checkpoint;
      }
    }
    else if (buf_flush_async_lsn <= oldest_lsn)
    {
      mysql_mutex_lock(&buf_pool.flush_list_mutex);
      goto unemployed;
    }

#ifndef DBUG_OFF
next:
#endif /* !DBUG_OFF */
    mysql_mutex_lock(&buf_pool.flush_list_mutex);

    /* When idle flushing kicks in, the page cleaner is marked active.
    Reset it back to idle, since it was made active only as part of the
    idle flushing stage. */
    if (idle_flush)
      buf_pool.page_cleaner_set_idle(true);
  }

  mysql_mutex_unlock(&buf_pool.flush_list_mutex);

  if (srv_fast_shutdown != 2)
  {
    buf_flush_wait_batch_end_acquiring_mutex(true);
    buf_flush_wait_batch_end_acquiring_mutex(false);
  }

  mysql_mutex_lock(&buf_pool.flush_list_mutex);
  lsn_limit= buf_flush_sync_lsn;
  if (UNIV_UNLIKELY(lsn_limit != 0))
    goto furious_flush;
  buf_page_cleaner_is_active= false;
  pthread_cond_broadcast(&buf_pool.done_flush_list);
  mysql_mutex_unlock(&buf_pool.flush_list_mutex);

  my_thread_end();

#ifdef UNIV_PFS_THREAD
  pfs_delete_thread();
#endif
}

/** Initialize page_cleaner. */
ATTRIBUTE_COLD void buf_flush_page_cleaner_init()
{
  ut_ad(!buf_page_cleaner_is_active);
  ut_ad(srv_operation == SRV_OPERATION_NORMAL ||
        srv_operation == SRV_OPERATION_RESTORE ||
        srv_operation == SRV_OPERATION_RESTORE_EXPORT);
  buf_flush_async_lsn= 0;
  buf_flush_sync_lsn= 0;
  buf_page_cleaner_is_active= true;
  std::thread(buf_flush_page_cleaner).detach();
}

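/* An illustrative lifecycle (assumed, not spelled out in this file):
startup would call buf_flush_page_cleaner_init() once the buffer pool
exists; at shutdown, buf_flush_page_cleaner() itself clears
buf_page_cleaner_is_active and broadcasts buf_pool.done_flush_list, so a
waiter only needs to sleep on that condition variable while holding
buf_pool.flush_list_mutex. */
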
#if defined(HAVE_SYSTEMD) && !defined(EMBEDDED_LIBRARY)
/** @return the number of dirty pages in the buffer pool */
static ulint buf_flush_list_length()
{
  mysql_mutex_lock(&buf_pool.flush_list_mutex);
  const ulint len= UT_LIST_GET_LEN(buf_pool.flush_list);
  mysql_mutex_unlock(&buf_pool.flush_list_mutex);
  return len;
}
#endif

/** Flush the buffer pool on shutdown. */
ATTRIBUTE_COLD void buf_flush_buffer_pool()
{
  ut_ad(!buf_page_cleaner_is_active);
  ut_ad(!buf_flush_sync_lsn);

  service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
                                 "Waiting to flush the buffer pool");

  mysql_mutex_lock(&buf_pool.flush_list_mutex);

  while (buf_pool.get_oldest_modification(0))
  {
    mysql_mutex_unlock(&buf_pool.flush_list_mutex);
    buf_flush_list(srv_max_io_capacity);
    if (buf_pool.n_flush_list())
    {
      timespec abstime;
      service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
                                     "Waiting to flush " ULINTPF " pages",
                                     buf_flush_list_length());
      set_timespec(abstime, INNODB_EXTEND_TIMEOUT_INTERVAL / 2);
      mysql_mutex_lock(&buf_pool.mutex);
      while (buf_pool.n_flush_list_)
        my_cond_timedwait(&buf_pool.done_flush_list, &buf_pool.mutex.m_mutex,
                          &abstime);
      mysql_mutex_unlock(&buf_pool.mutex);
    }
    mysql_mutex_lock(&buf_pool.flush_list_mutex);
  }

  mysql_mutex_unlock(&buf_pool.flush_list_mutex);
  ut_ad(!buf_pool.any_io_pending());
}

/** Synchronously flush dirty blocks during recv_sys_t::apply().
NOTE: The calling thread is not allowed to hold any buffer page latches! */
void buf_flush_sync_batch(lsn_t lsn)
{
  thd_wait_begin(nullptr, THD_WAIT_DISKIO);
  tpool::tpool_wait_begin();
  mysql_mutex_lock(&buf_pool.flush_list_mutex);
  buf_flush_wait(lsn);
  mysql_mutex_unlock(&buf_pool.flush_list_mutex);
  tpool::tpool_wait_end();
  thd_wait_end(nullptr);
}

/** Synchronously flush dirty blocks.
NOTE: The calling thread is not allowed to hold any buffer page latches! */
void buf_flush_sync()
{
  if (recv_recovery_is_on())
    recv_sys.apply(true);

  thd_wait_begin(nullptr, THD_WAIT_DISKIO);
  tpool::tpool_wait_begin();
  mysql_mutex_lock(&buf_pool.flush_list_mutex);
  for (;;)
  {
    const lsn_t lsn= log_sys.get_lsn();
    buf_flush_wait(lsn);
    /* Wait for the page cleaner to be idle (for log resizing at startup) */
    while (buf_flush_sync_lsn)
      my_cond_wait(&buf_pool.done_flush_list,
                   &buf_pool.flush_list_mutex.m_mutex);
    if (lsn == log_sys.get_lsn())
      break;
  }
  mysql_mutex_unlock(&buf_pool.flush_list_mutex);
  tpool::tpool_wait_end();
  thd_wait_end(nullptr);
}

#ifdef UNIV_DEBUG
/** Functor to validate the flush list. */
struct	Check {
	void operator()(const buf_page_t* elem) const
	{
		ut_ad(elem->oldest_modification());
		ut_ad(!fsp_is_system_temporary(elem->id().space()));
	}
};

/** Validate the flush list. */
static void buf_flush_validate_low()
{
	buf_page_t*		bpage;

	mysql_mutex_assert_owner(&buf_pool.flush_list_mutex);

	ut_list_validate(buf_pool.flush_list, Check());

	bpage = UT_LIST_GET_FIRST(buf_pool.flush_list);

	while (bpage != NULL) {
		const lsn_t	om = bpage->oldest_modification();
		/* A page in buf_pool.flush_list can be in
		BUF_BLOCK_REMOVE_HASH state. This happens when a page
		is in the middle of being relocated. In that case the
		original descriptor can have this state and still be
		in the flush list waiting to acquire the
		buf_pool.flush_list_mutex to complete the relocation. */
		ut_d(const auto s= bpage->state());
		ut_ad(s >= buf_page_t::REMOVE_HASH);
		ut_ad(om == 1 || om > 2);

		bpage = UT_LIST_GET_NEXT(list, bpage);
		ut_ad(om == 1 || !bpage || recv_recovery_is_on()
		      || om >= bpage->oldest_modification());
	}
}

/** Validate the flush list. */
void buf_flush_validate()
{
  mysql_mutex_lock(&buf_pool.flush_list_mutex);
  buf_flush_validate_low();
  mysql_mutex_unlock(&buf_pool.flush_list_mutex);
}
#endif /* UNIV_DEBUG */