/*****************************************************************************

Copyright (C) 2013, 2014, Fusion-io. All Rights Reserved.
Copyright (C) 2013, 2014, SkySQL Ab. All Rights Reserved.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

*****************************************************************************/

/******************************************************************//**
@file buf/buf0mtflu.cc
Multi-threaded flush method implementation

Created  06/11/2013 Dhananjoy Das DDas@fusionio.com
Modified 12/12/2013 Jan Lindström jan.lindstrom@skysql.com
Modified 03/02/2014 Dhananjoy Das DDas@fusionio.com
Modified 06/02/2014 Jan Lindström jan.lindstrom@skysql.com
***********************************************************************/

#include "buf0buf.h"
#include "buf0flu.h"
#include "buf0mtflu.h"
#include "buf0checksum.h"
#include "srv0start.h"
#include "srv0srv.h"
#include "page0zip.h"
#include "ut0byte.h"
#include "ut0lst.h"
#include "page0page.h"
#include "fil0fil.h"
#include "buf0lru.h"
#include "buf0rea.h"
#include "ibuf0ibuf.h"
#include "log0log.h"
#include "os0file.h"
#include "os0sync.h"
#include "trx0sys.h"
#include "srv0mon.h"
#include "mysql/plugin.h"
#include "mysql/service_thd_wait.h"
#include "fil0pagecompress.h"

#define	MT_COMP_WATER_MARK	50
/** Time to wait for a message, in microseconds (5 seconds). */
#define MT_WAIT_IN_USECS 5000000

/* Work item status */
typedef enum wrk_status {
	WRK_ITEM_SET=0,		/*!< Work item is set */
	WRK_ITEM_START=1,	/*!< Processing of work item has started */
	WRK_ITEM_DONE=2,	/*!< Processing is done; usually set to
				SUCCESS/FAILED */
	WRK_ITEM_SUCCESS=2,	/*!< Work item successfully processed */
	WRK_ITEM_FAILED=3,	/*!< Work item processing failed */
	WRK_ITEM_EXIT=4,	/*!< Exiting */
	WRK_ITEM_STATUS_UNDEFINED
} wrk_status_t;
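
/* Note that WRK_ITEM_DONE and WRK_ITEM_SUCCESS share the value 2, so
a status test against WRK_ITEM_DONE also matches a successful
completion. */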

/* Work item task type */
typedef enum mt_wrk_tsk {
	MT_WRK_NONE=0,		/*!< No task; used to exit the
				queue-wait */
	MT_WRK_WRITE=1,		/*!< Flush operation */
	MT_WRK_READ=2,		/*!< Read operation */
	MT_WRK_UNDEFINED
} mt_wrk_tsk_t;
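
/* An MT_WRK_NONE item doubles as the shutdown message:
buf_mtflu_io_thread_exit() posts one such item per worker thread with
wi_status == WRK_ITEM_EXIT, and the worker acknowledges it on the
write completion queue before terminating. */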

/* Work thread status */
typedef enum wthr_status {
	WTHR_NOT_INIT=0,	/*!< Work thread not initialized */
	WTHR_INITIALIZED=1,	/*!< Work thread initialized */
	WTHR_SIG_WAITING=2,	/*!< Work thread waiting for signal */
	WTHR_RUNNING=3,		/*!< Work thread running */
	WTHR_NO_WORK=4,		/*!< Work thread has no work */
	WTHR_KILL_IT=5,		/*!< Work thread should exit */
	WTHR_STATUS_UNDEFINED
} wthr_status_t;

/* Write work task */
typedef struct wr_tsk {
	buf_pool_t	*buf_pool;	/*!< buffer-pool instance */
	enum buf_flush	flush_type;	/*!< flush-type for buffer-pool
					flush operation */
	ulint		min;		/*!< minimum number of pages
					requested to be flushed */
	lsn_t		lsn_limit;	/*!< lsn limit for the buffer-pool
					flush operation */
} wr_tsk_t;

/* Read work task */
typedef struct rd_tsk {
	buf_pool_t	*page_pool;	/*!< list of pages to decompress */
} rd_tsk_t;

/* Work item */
typedef struct wrk_itm
{
	mt_wrk_tsk_t	tsk;		/*!< Task type. Based on task-type
					one of the entries wr_tsk/rd_tsk
					will be used */
	wr_tsk_t	wr;		/*!< Flush page list */
	rd_tsk_t	rd;		/*!< Decompress page list */
	ulint		n_flushed;	/*!< Flushed pages count */
	os_thread_id_t	id_usr;		/*!< Thread-id currently working */
	wrk_status_t	wi_status;	/*!< Work item status */
	struct wrk_itm	*next;		/*!< Next work item */
	mem_heap_t	*wheap;		/*!< Heap where memory for queue
					nodes is allocated */
} wrk_t;
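
/* Work items are not malloc'ed individually: the sender carves them
out of a mem_heap_t (stored in wheap, which is also used for the
queue nodes) and frees that heap wholesale once every completion has
been collected. */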

/* Thread synchronization data */
typedef struct thread_sync
{
	ulint           n_threads;	/*!< Number of threads */
	os_thread_id_t	wthread_id;	/*!< Worker thread id */
	os_thread_t	wthread;	/*!< Worker thread handle */
	ib_wqueue_t	*wq;		/*!< Work Queue */
	ib_wqueue_t     *wr_cq;		/*!< Write Completion Queue */
	ib_wqueue_t     *rd_cq;		/*!< Read Completion Queue */
	wthr_status_t   wt_status;	/*!< Worker thread status */
	mem_heap_t*     wheap;		/*!< Work heap where memory
					is allocated */
} thread_sync_t;
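
/* All worker threads share a single work queue and a single pair of
completion queues: buf_mtflu_handler_init() below points every
thread_sync_t slot at the same wq/wr_cq/rd_cq and the same heap, so
the slots differ only in their thread identifiers. */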

static int		mtflush_work_initialized = -1;
static os_fast_mutex_t	mtflush_mtx;
static thread_sync_t*	mtflush_ctx = NULL;

/******************************************************************//**
Set multi-threaded flush work initialized. */
static inline
void
buf_mtflu_work_init(void)
/*=====================*/
{
	mtflush_work_initialized = 1;
}

/******************************************************************//**
Return true if multi-threaded flush is initialized
@return true if initialized */
bool
buf_mtflu_init_done(void)
/*=====================*/
{
	return(mtflush_work_initialized == 1);
}

/******************************************************************//**
Flush a buffer pool instance.
@return number of flushed pages, or 0 if an error happened. Note that
0 is also returned when a successful batch found nothing to flush. */
static
ulint
buf_mtflu_flush_pool_instance(
/*==========================*/
	wrk_t	*work_item)	/*!< in/out: work item describing the flush */
{
	ut_a(work_item != NULL);
	ut_a(work_item->wr.buf_pool != NULL);

	if (!buf_flush_start(work_item->wr.buf_pool, work_item->wr.flush_type)) {
		/* We have two choices here. If lsn_limit was
		specified then skipping an instance of buffer
		pool means we cannot guarantee that all pages
		up to lsn_limit have been flushed. We can
		return right now with failure or we can try
		to flush remaining buffer pools up to the
		lsn_limit. We attempt to flush other buffer
		pools based on the assumption that it will
		help in the retry which will follow the
		failure. */
#ifdef UNIV_DEBUG
		fprintf(stderr, "flush start failed.\n");
#endif
		return 0;
	}

	if (work_item->wr.flush_type == BUF_FLUSH_LRU) {
		/* srv_LRU_scan_depth can be an arbitrarily large value.
		Cap it with the current LRU size. */
		buf_pool_mutex_enter(work_item->wr.buf_pool);
		work_item->wr.min = UT_LIST_GET_LEN(work_item->wr.buf_pool->LRU);
		buf_pool_mutex_exit(work_item->wr.buf_pool);
		work_item->wr.min = ut_min(srv_LRU_scan_depth, work_item->wr.min);
	}

	work_item->n_flushed = buf_flush_batch(work_item->wr.buf_pool,
					       work_item->wr.flush_type,
					       work_item->wr.min,
					       work_item->wr.lsn_limit);

	buf_flush_end(work_item->wr.buf_pool, work_item->wr.flush_type);
	buf_flush_common(work_item->wr.flush_type, work_item->n_flushed);

	return work_item->n_flushed;
}

/******************************************************************//**
Worker function that waits for work items, processes them, and sends
the reply back. */
static
void
mtflush_service_io(
/*===============*/
	thread_sync_t*	mtflush_io)	/*!< in/out: multi-threaded flush
					synchronization data */
{
	wrk_t		*work_item = NULL;
	ulint		n_flushed = 0;

	mtflush_io->wt_status = WTHR_SIG_WAITING;

	/* Poll the work queue first; if it is empty, block until a
	work item arrives. */
	work_item = (wrk_t *)ib_wqueue_nowait(mtflush_io->wq);

	if (work_item == NULL) {
		work_item = (wrk_t *)ib_wqueue_wait(mtflush_io->wq);
	}

	if (work_item) {
		mtflush_io->wt_status = WTHR_RUNNING;
	} else {
		/* Thread did not get any work */
		mtflush_io->wt_status = WTHR_NO_WORK;
		return;
	}

	work_item->id_usr = os_thread_get_curr_id();

	/* This works as a producer/consumer model: tasks are inserted
	into the work queue (wq) and completions are posted according
	to the type of operation performed. WRITE/compression/flush
	completions are posted to wr_cq, and READ/decompress
	completions to rd_cq. In the future there may be others. */

	switch(work_item->tsk) {
	case MT_WRK_NONE:
		/* Exit message: acknowledge it on the write completion
		queue and mark this thread for termination. */
		ut_a(work_item->wi_status == WRK_ITEM_EXIT);
		work_item->wi_status = WRK_ITEM_EXIT;
		ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->wheap);
		mtflush_io->wt_status = WTHR_KILL_IT;
		return;

	case MT_WRK_WRITE:
		work_item->wi_status = WRK_ITEM_START;
		/* Process work item */
		if (0 == (n_flushed = buf_mtflu_flush_pool_instance(work_item))) {
#ifdef UNIV_DEBUG
			fprintf(stderr, "No pages flushed\n");
#endif
			work_item->wi_status = WRK_ITEM_FAILED;
		} else {
			/* Do not overwrite FAILED: mark success only
			when at least one page was flushed. */
			work_item->wi_status = WRK_ITEM_SUCCESS;
		}
		ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->wheap);
		break;

	case MT_WRK_READ:
		/* Not implemented: read/decompress work is not yet
		dispatched to these threads. */
		ut_a(0);
		break;

	default:
		/* Nothing other than Write/Read handling is planned */
		ut_a(0);
	}

	mtflush_io->wt_status = WTHR_NO_WORK;
}

/******************************************************************//**
Thread used to flush dirty pages when multi-threaded flush is
used.
@return a dummy parameter */
extern "C" UNIV_INTERN
os_thread_ret_t
DECLARE_THREAD(mtflush_io_thread)(
/*==============================*/
	void * arg)
{
	thread_sync_t *mtflush_io = ((thread_sync_t *)arg);
#ifdef UNIV_DEBUG
	ib_uint64_t   stat_universal_num_processed = 0;
	ib_uint64_t   stat_cycle_num_processed = 0;
	ulint i;
#endif

	while (TRUE) {
		mtflush_service_io(mtflush_io);

		if (mtflush_io->wt_status == WTHR_KILL_IT) {
			break;
		}
	}

	os_thread_exit(NULL);
	OS_THREAD_DUMMY_RETURN;
}

/******************************************************************//**
Add one exit work item per thread to the work queue to signal the
multi-threaded flush threads that they should exit. */
void
buf_mtflu_io_thread_exit(void)
/*==========================*/
{
	long i;
	thread_sync_t* mtflush_io = mtflush_ctx;
	wrk_t* work_item;

	ut_a(mtflush_io != NULL);

	/* Allocate one work item per thread for the shutdown message */
	work_item = (wrk_t*)mem_heap_alloc(mtflush_io->wheap,
				sizeof(wrk_t) * srv_mtflush_threads);

	/* If an io-thread KILL is already in progress, bail out. */
	if (mtflush_io->wt_status == WTHR_KILL_IT) {
		return;
	}

	fprintf(stderr, "signal mtflush_io_threads to exit [%lu]\n",
		srv_mtflush_threads);

	/* Send one exit work item per thread */
	for (i = 0; i < srv_mtflush_threads; i++) {
		work_item[i].wr.buf_pool = NULL;
		work_item[i].rd.page_pool = NULL;
		work_item[i].tsk = MT_WRK_NONE;
		work_item[i].wi_status = WRK_ITEM_EXIT;
		work_item[i].wheap = mtflush_io->wheap;

		ib_wqueue_add(mtflush_io->wq,
			(void *)&(work_item[i]),
			mtflush_io->wheap);
	}

	/* Wait until all work items on the work queue have been
	consumed */
	while (!ib_wqueue_is_empty(mtflush_io->wq)) {
		os_thread_sleep(MT_WAIT_IN_USECS * 2);
	}

	ut_a(ib_wqueue_is_empty(mtflush_io->wq));

	/* Collect all work done items */
	for (i = 0; i < srv_mtflush_threads;) {
		wrk_t* done_item;

		done_item = (wrk_t *)ib_wqueue_timedwait(
			mtflush_io->wr_cq, MT_WAIT_IN_USECS);

		/* If we receive a reply to a work item and its status
		is exit, the thread has processed this message and
		exited. ib_wqueue_timedwait() returns NULL on timeout,
		in which case we simply retry. */
		if (done_item && done_item->wi_status == WRK_ITEM_EXIT) {
			i++;
		}
	}

	/* Wait about 5 seconds to allow the threads to really exit */
	os_thread_sleep(MT_WAIT_IN_USECS);

	ut_a(ib_wqueue_is_empty(mtflush_io->wq));
	ut_a(ib_wqueue_is_empty(mtflush_io->wr_cq));
	ut_a(ib_wqueue_is_empty(mtflush_io->rd_cq));

	/* Free all queues */
	ib_wqueue_free(mtflush_io->wq);
	ib_wqueue_free(mtflush_io->wr_cq);
	ib_wqueue_free(mtflush_io->rd_cq);

	os_fast_mutex_free(&mtflush_mtx);

	/* Free the heap. This also releases mtflush_ctx and the exit
	work items, which were allocated from it. */
	mem_heap_free(mtflush_io->wheap);
}

/******************************************************************//**
Initialize multi-threaded flush thread synchronization data.
@return initialized multi-threaded flush thread synchronization data. */
void*
buf_mtflu_handler_init(
/*===================*/
	ulint n_threads,	/*!< in: Number of threads to create */
	ulint wrk_cnt)		/*!< in: Number of work items (currently
				unused) */
{
	ulint		i;
	mem_heap_t*	mtflush_heap;
	ib_wqueue_t*	mtflush_work_queue;
	ib_wqueue_t*	mtflush_write_comp_queue;
	ib_wqueue_t*	mtflush_read_comp_queue;

	os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &mtflush_mtx);

	/* Create heap, work queue, write completion queue, read
	completion queue for multi-threaded flush, and init
	handler. */
	mtflush_heap = mem_heap_create(0);
	ut_a(mtflush_heap != NULL);
	mtflush_work_queue = ib_wqueue_create();
	ut_a(mtflush_work_queue != NULL);
	mtflush_write_comp_queue = ib_wqueue_create();
	ut_a(mtflush_write_comp_queue != NULL);
	mtflush_read_comp_queue = ib_wqueue_create();
	ut_a(mtflush_read_comp_queue != NULL);

	ut_a(n_threads <= MTFLUSH_MAX_WORKER);
	mtflush_ctx = (thread_sync_t *)mem_heap_alloc(mtflush_heap,
				MTFLUSH_MAX_WORKER * sizeof(thread_sync_t));
	ut_a(mtflush_ctx != NULL);

	/* Create threads for page-compression-flush */
	for (i = 0; i < n_threads; i++) {
		os_thread_id_t new_thread_id;

		mtflush_ctx[i].n_threads = n_threads;
		mtflush_ctx[i].wq = mtflush_work_queue;
		mtflush_ctx[i].wr_cq = mtflush_write_comp_queue;
		mtflush_ctx[i].rd_cq = mtflush_read_comp_queue;
		mtflush_ctx[i].wheap = mtflush_heap;
		mtflush_ctx[i].wt_status = WTHR_INITIALIZED;

		mtflush_ctx[i].wthread = os_thread_create(
			mtflush_io_thread,
			((void *)(mtflush_ctx + i)),
			&new_thread_id);

		mtflush_ctx[i].wthread_id = new_thread_id;
	}

	buf_mtflu_work_init();

	return((void *)mtflush_ctx);
}
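
/* A minimal start-up sketch (hypothetical caller; the real call site
lives outside this file):

	void*		ctx;
	os_thread_id_t	ids[MTFLUSH_MAX_WORKER];

	ctx = buf_mtflu_handler_init(srv_mtflush_threads,
				     srv_buf_pool_instances);
	buf_mtflu_set_thread_ids(srv_mtflush_threads, ctx, ids);
*/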

/******************************************************************//**
Flush buffer pool instances.
@return number of pages flushed. */
ulint
buf_mtflu_flush_work_items(
/*=======================*/
	ulint buf_pool_inst,		/*!< in: Number of buffer pool instances */
	ulint *per_pool_pages_flushed,	/*!< out: Number of pages
					flushed/instance */
	enum buf_flush flush_type,	/*!< in: Type of flush */
	ulint min_n,			/*!< in: Wished minimum number of
					blocks to be flushed */
	lsn_t lsn_limit)		/*!< in: All blocks whose
					oldest_modification is smaller than
					this should be flushed (if their
					number does not exceed min_n) */
{
	ulint n_flushed = 0, i;
	wrk_t *done_wi;
	mem_heap_t* work_heap;
	wrk_t* work_item;

	/* Allocate a heap from which all work items and queue nodes
	are allocated */
	work_heap = mem_heap_create(0);
	work_item = (wrk_t*)mem_heap_alloc(work_heap,
				sizeof(wrk_t) * buf_pool_inst);

	for (i = 0; i < buf_pool_inst; i++) {
		work_item[i].tsk = MT_WRK_WRITE;
		work_item[i].rd.page_pool = NULL;
		work_item[i].wr.buf_pool = buf_pool_from_array(i);
		work_item[i].wr.flush_type = flush_type;
		work_item[i].wr.min = min_n;
		work_item[i].wr.lsn_limit = lsn_limit;
		work_item[i].id_usr = -1;
		work_item[i].wi_status = WRK_ITEM_SET;
		work_item[i].wheap = work_heap;

		ib_wqueue_add(mtflush_ctx->wq,
			(void *)(&(work_item[i])),
			work_heap);
	}

	/* Wait for the completions to arrive */
	for (i = 0; i < buf_pool_inst;) {
		done_wi = (wrk_t *)ib_wqueue_wait(mtflush_ctx->wr_cq);

		if (done_wi != NULL) {
			/* Completions may arrive in any order, so this
			array is ordered by completion, not by buffer
			pool instance. */
			per_pool_pages_flushed[i] = done_wi->n_flushed;

			/* A work item that nobody ever picked up still
			has its initial status and no user thread id;
			that would mean a worker was lost. */
			if ((int)done_wi->id_usr == -1 &&
			    done_wi->wi_status == WRK_ITEM_SET) {
				fprintf(stderr,
					"**Set/Unused work_item[%lu] flush_type=%d\n",
					i,
					done_wi->wr.flush_type);
				ut_a(0);
			}

			n_flushed += done_wi->n_flushed;
			i++;
		}
	}

	/* Release used work items and queue nodes */
	mem_heap_free(work_heap);

	return(n_flushed);
}

/*******************************************************************//**
Multi-threaded version of buf_flush_list.
@return true if successful */
bool
buf_mtflu_flush_list(
/*=================*/
	ulint		min_n,		/*!< in: wished minimum number of blocks
					flushed (it is not guaranteed that the
					actual number is that big, though) */
	lsn_t		lsn_limit,	/*!< in: in the case of BUF_FLUSH_LIST
					all blocks whose oldest_modification
					is smaller than this should be flushed
					(if their number does not exceed
					min_n), otherwise ignored */
	ulint*		n_processed)	/*!< out: the number of pages
					which were processed is passed
					back to caller. Ignored if NULL */

{
	ulint		i;
	bool		success = true;
	ulint		cnt_flush[MTFLUSH_MAX_WORKER];

	if (n_processed) {
		*n_processed = 0;
	}

	if (min_n != ULINT_MAX) {
		/* Ensure that flushing is spread evenly amongst the
		buffer pool instances. When min_n is ULINT_MAX
		we need to flush everything up to the lsn limit
		so no limit here. */
		min_n = (min_n + srv_buf_pool_instances - 1)
			 / srv_buf_pool_instances;
	}
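
	/* For example, with min_n = 1000 and 8 buffer pool instances
	each instance is asked to flush ceil(1000 / 8) = 125 pages,
	i.e. 8 * 125 = 1000 pages in total. */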

	/* This lock is to safeguard against re-entry, if any. */
	os_fast_mutex_lock(&mtflush_mtx);
	buf_mtflu_flush_work_items(srv_buf_pool_instances,
		cnt_flush, BUF_FLUSH_LIST,
		min_n, lsn_limit);
	os_fast_mutex_unlock(&mtflush_mtx);

	for (i = 0; i < srv_buf_pool_instances; i++) {
		if (n_processed) {
			*n_processed += cnt_flush[i];
		}

		if (cnt_flush[i]) {
			MONITOR_INC_VALUE_CUMULATIVE(
				MONITOR_FLUSH_BATCH_TOTAL_PAGE,
				MONITOR_FLUSH_BATCH_COUNT,
				MONITOR_FLUSH_BATCH_PAGES,
				cnt_flush[i]);
		}
	}
#ifdef UNIV_DEBUG
	/* Guard the dereference: n_processed may be NULL. */
	if (n_processed) {
		fprintf(stderr, "%s: [1] [*n_processed: (min:%lu)%lu ]\n",
			__FUNCTION__, (min_n * srv_buf_pool_instances),
			*n_processed);
	}
#endif
	return(success);
}

/*********************************************************************//**
Clears up the tail of the LRU lists:
* Put replaceable pages at the tail of the LRU to the free list
* Flush dirty pages at the tail of the LRU to disk
The depth to which we scan each buffer pool is controlled by the
dynamic config parameter innodb_LRU_scan_depth.
@return total pages flushed */
UNIV_INTERN
ulint
buf_mtflu_flush_LRU_tail(void)
/*==========================*/
{
	ulint	total_flushed = 0, i;
	ulint	cnt_flush[MTFLUSH_MAX_WORKER];

	ut_a(buf_mtflu_init_done());

	/* This lock is to safeguard against re-entry, if any. Note
	that lsn_limit is passed as 0 because LRU flushing does not
	use it. */
	os_fast_mutex_lock(&mtflush_mtx);
	buf_mtflu_flush_work_items(srv_buf_pool_instances,
		cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0);
	os_fast_mutex_unlock(&mtflush_mtx);

	for (i = 0; i < srv_buf_pool_instances; i++) {
		if (cnt_flush[i]) {
			total_flushed += cnt_flush[i];

			MONITOR_INC_VALUE_CUMULATIVE(
				MONITOR_LRU_BATCH_TOTAL_PAGE,
				MONITOR_LRU_BATCH_COUNT,
				MONITOR_LRU_BATCH_PAGES,
				cnt_flush[i]);
		}
	}

#ifdef UNIV_DEBUG
	fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu ]\n",
		(srv_LRU_scan_depth * srv_buf_pool_instances),
		total_flushed);
#endif

	return(total_flushed);
}

/*********************************************************************//**
Set correct thread identifiers to io thread array based on
information we have. */
void
buf_mtflu_set_thread_ids(
/*=====================*/
	ulint		n_threads,	/*!< in: Number of threads to fill */
	void*		ctx,		/*!< in: thread context */
	os_thread_id_t*	thread_ids)	/*!< out: thread id array to fill */
{
	thread_sync_t *mtflush_io = ((thread_sync_t *)ctx);
	ulint i;

	ut_a(mtflush_io != NULL);
	ut_a(thread_ids != NULL);

	for (i = 0; i < n_threads; i++) {
		thread_ids[i] = mtflush_io[i].wthread_id;
	}
}