// SPDX-License-Identifier: GPL-2.0
/*
 * bcachefs journalling code, for btree insertions
 *
 * Copyright 2012 Google, Inc.
 */

#include "bcachefs.h"
#include "alloc_foreground.h"
#include "bkey_methods.h"
#include "btree_gc.h"
#include "btree_update.h"
#include "btree_write_buffer.h"
#include "buckets.h"
#include "error.h"
#include "journal.h"
#include "journal_io.h"
#include "journal_reclaim.h"
#include "journal_sb.h"
#include "journal_seq_blacklist.h"
#include "trace.h"

static const char * const bch2_journal_errors[] = {
#define x(n)	#n,
	JOURNAL_ERRORS()
#undef x
	NULL
};

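/*
 * journal_res_state.cur_entry_offset doubles as a state field: values below
 * JOURNAL_ENTRY_CLOSED_VAL are offsets (in u64s) into the currently open
 * journal entry, while JOURNAL_ENTRY_CLOSED_VAL and JOURNAL_ENTRY_ERROR_VAL
 * are sentinels meaning no entry is open (or the journal has hit an error):
 */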
static inline bool journal_seq_unwritten(struct journal *j, u64 seq)
{
	return seq > j->seq_ondisk;
}

static bool __journal_entry_is_open(union journal_res_state state)
{
	return state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
}

static inline unsigned nr_unwritten_journal_entries(struct journal *j)
{
	return atomic64_read(&j->seq) - j->seq_ondisk;
}

static bool journal_entry_is_open(struct journal *j)
{
	return __journal_entry_is_open(j->reservations);
}

static void bch2_journal_buf_to_text(struct printbuf *out, struct journal *j, u64 seq)
{
	union journal_res_state s = READ_ONCE(j->reservations);
	unsigned i = seq & JOURNAL_BUF_MASK;
	struct journal_buf *buf = j->buf + i;

	prt_printf(out, "seq:\t%llu\n", seq);
	printbuf_indent_add(out, 2);

	prt_printf(out, "refcount:\t%u\n", journal_state_count(s, i));

	prt_printf(out, "size:\t");
	prt_human_readable_u64(out, vstruct_bytes(buf->data));
	prt_newline(out);

	prt_printf(out, "expires:\t");
	prt_printf(out, "%li jiffies\n", buf->expires - jiffies);

	prt_printf(out, "flags:\t");
	if (buf->noflush)
		prt_str(out, "noflush ");
	if (buf->must_flush)
		prt_str(out, "must_flush ");
	if (buf->separate_flush)
		prt_str(out, "separate_flush ");
	if (buf->need_flush_to_write_buffer)
		prt_str(out, "need_flush_to_write_buffer ");
	if (buf->write_started)
		prt_str(out, "write_started ");
	if (buf->write_allocated)
		prt_str(out, "write_allocated ");
	if (buf->write_done)
		prt_str(out, "write_done");
	prt_newline(out);

	printbuf_indent_sub(out, 2);
}

static void bch2_journal_bufs_to_text(struct printbuf *out, struct journal *j)
{
	if (!out->nr_tabstops)
		printbuf_tabstop_push(out, 24);

	for (u64 seq = journal_last_unwritten_seq(j);
	     seq <= journal_cur_seq(j);
	     seq++)
		bch2_journal_buf_to_text(out, j, seq);
	prt_printf(out, "last buf %s\n", journal_entry_is_open(j) ? "open" : "closed");
}

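/*
 * Returns the journal_buf for @seq if that entry hasn't been written to disk
 * yet, NULL otherwise:
 */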
static inline struct journal_buf *
journal_seq_to_buf(struct journal *j, u64 seq)
{
	struct journal_buf *buf = NULL;

	EBUG_ON(seq > journal_cur_seq(j));

	if (journal_seq_unwritten(j, seq)) {
		buf = j->buf + (seq & JOURNAL_BUF_MASK);
		EBUG_ON(le64_to_cpu(buf->data->seq) != seq);
	}
	return buf;
}

static void journal_pin_list_init(struct journal_entry_pin_list *p, int count)
{
	unsigned i;

	for (i = 0; i < ARRAY_SIZE(p->list); i++)
		INIT_LIST_HEAD(&p->list[i]);
	INIT_LIST_HEAD(&p->flushed);
	atomic_set(&p->count, count);
	p->devs.nr = 0;
}

/*
 * Detect stuck journal conditions and trigger shutdown. Technically the journal
 * can end up stuck for a variety of reasons, such as a blocked I/O, journal
 * reservation lockup, etc. Since this is a fatal error with potentially
 * unpredictable characteristics, we want to be fairly conservative before we
 * decide to shut things down.
 *
 * Consider the journal stuck when it appears full with no ability to commit
 * btree transactions, to discard journal buckets, nor acquire priority
 * (reserved watermark) reservation.
 */
static inline bool
journal_error_check_stuck(struct journal *j, int error, unsigned flags)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	bool stuck = false;
	struct printbuf buf = PRINTBUF;

	if (!(error == JOURNAL_ERR_journal_full ||
	      error == JOURNAL_ERR_journal_pin_full) ||
	    nr_unwritten_journal_entries(j) ||
	    (flags & BCH_WATERMARK_MASK) != BCH_WATERMARK_reclaim)
		return stuck;

	spin_lock(&j->lock);

	if (j->can_discard) {
		spin_unlock(&j->lock);
		return stuck;
	}

	stuck = true;

	/*
	 * The journal shutdown path will set ->err_seq, but do it here first to
	 * serialize against concurrent failures and avoid duplicate error
	 * reports.
	 */
	if (j->err_seq) {
		spin_unlock(&j->lock);
		return stuck;
	}
	j->err_seq = journal_cur_seq(j);
	spin_unlock(&j->lock);

	bch_err(c, "Journal stuck! Have a pre-reservation but journal full (error %s)",
		bch2_journal_errors[error]);
	bch2_journal_debug_to_text(&buf, j);
	bch_err(c, "%s", buf.buf);

	printbuf_reset(&buf);
	bch2_journal_pins_to_text(&buf, j);
	bch_err(c, "Journal pins:\n%s", buf.buf);
	printbuf_exit(&buf);

	bch2_fatal_error(c);
	dump_stack();

	return stuck;
}

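/*
 * Kick off the next journal write(s): journal buffers must be submitted in
 * sequence order, and a buffer can only be submitted once it has no
 * outstanding reservations and any earlier in-flight write has gotten far
 * enough along (write_allocated).
 */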
void bch2_journal_do_writes(struct journal *j)
{
	for (u64 seq = journal_last_unwritten_seq(j);
	     seq <= journal_cur_seq(j);
	     seq++) {
		unsigned idx = seq & JOURNAL_BUF_MASK;
		struct journal_buf *w = j->buf + idx;

		if (w->write_started && !w->write_allocated)
			break;
		if (w->write_started)
			continue;

		if (!journal_state_count(j->reservations, idx)) {
			w->write_started = true;
			closure_call(&w->io, bch2_journal_write, j->wq, NULL);
		}

		break;
	}
}

/*
 * Final processing when the last reference of a journal buffer has been
 * dropped. Drop the pin list reference acquired at journal entry open and write
 * the buffer, if requested.
 */
void bch2_journal_buf_put_final(struct journal *j, u64 seq)
{
	lockdep_assert_held(&j->lock);

	if (__bch2_journal_pin_put(j, seq))
		bch2_journal_reclaim_fast(j);
	bch2_journal_do_writes(j);
}

/*
 * Closes the currently open journal entry:
 *
 * We don't close a journal_buf until the next journal_buf is finished writing,
 * and can be opened again - this also initializes the next journal_buf:
 */
static void __journal_entry_close(struct journal *j, unsigned closed_val, bool trace)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_buf *buf = journal_cur_buf(j);
	union journal_res_state old, new;
	u64 v = atomic64_read(&j->reservations.counter);
	unsigned sectors;

	BUG_ON(closed_val != JOURNAL_ENTRY_CLOSED_VAL &&
	       closed_val != JOURNAL_ENTRY_ERROR_VAL);

	lockdep_assert_held(&j->lock);

	do {
		old.v = new.v = v;
		new.cur_entry_offset = closed_val;

		if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL ||
		    old.cur_entry_offset == new.cur_entry_offset)
			return;
	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
				       old.v, new.v)) != old.v);

	if (!__journal_entry_is_open(old))
		return;

	/* Close out old buffer: */
	buf->data->u64s		= cpu_to_le32(old.cur_entry_offset);

	if (trace_journal_entry_close_enabled() && trace) {
		struct printbuf pbuf = PRINTBUF;
		pbuf.atomic++;

		prt_str(&pbuf, "entry size: ");
		prt_human_readable_u64(&pbuf, vstruct_bytes(buf->data));
		prt_newline(&pbuf);
		bch2_prt_task_backtrace(&pbuf, current, 1, GFP_NOWAIT);
		trace_journal_entry_close(c, pbuf.buf);
		printbuf_exit(&pbuf);
	}

	sectors = vstruct_blocks_plus(buf->data, c->block_bits,
				      buf->u64s_reserved) << c->block_bits;
	BUG_ON(sectors > buf->sectors);
	buf->sectors = sectors;

	/*
	 * We have to set last_seq here, _before_ opening a new journal entry:
	 *
 * A thread may replace an old pin with a new pin on its current
	 * journal reservation - the expectation being that the journal will
	 * contain either what the old pin protected or what the new pin
	 * protects.
	 *
	 * After the old pin is dropped journal_last_seq() won't include the old
	 * pin, so we can only write the updated last_seq on the entry that
	 * contains whatever the new pin protects.
	 *
	 * Restated, we can _not_ update last_seq for a given entry if there
	 * could be a newer entry open with reservations/pins that have been
	 * taken against it.
	 *
 * Hence, we want to update/set last_seq on the current journal entry right
	 * before we open a new one:
	 */
	buf->last_seq		= journal_last_seq(j);
	buf->data->last_seq	= cpu_to_le64(buf->last_seq);
	BUG_ON(buf->last_seq > le64_to_cpu(buf->data->seq));

	cancel_delayed_work(&j->write_work);

	bch2_journal_space_available(j);

	__bch2_journal_buf_put(j, old.idx, le64_to_cpu(buf->data->seq));
}

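/*
 * Shut the journal down: mark the current entry as errored
 * (JOURNAL_ENTRY_ERROR_VAL) so that new reservations fail, and record err_seq
 * for error reporting.
 */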
void bch2_journal_halt(struct journal *j)
{
	spin_lock(&j->lock);
	__journal_entry_close(j, JOURNAL_ENTRY_ERROR_VAL, true);
	if (!j->err_seq)
		j->err_seq = journal_cur_seq(j);
	journal_wake(j);
	spin_unlock(&j->lock);
}

static bool journal_entry_want_write(struct journal *j)
{
	bool ret = !journal_entry_is_open(j) ||
		journal_cur_seq(j) == journal_last_unwritten_seq(j);

	/* Don't close it yet if we already have a write in flight: */
	if (ret)
		__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true);
	else if (nr_unwritten_journal_entries(j)) {
		struct journal_buf *buf = journal_cur_buf(j);

		if (!buf->flush_time) {
			buf->flush_time	= local_clock() ?: 1;
			buf->expires = jiffies;
		}
	}

	return ret;
}

bool bch2_journal_entry_close(struct journal *j)
{
	bool ret;

	spin_lock(&j->lock);
	ret = journal_entry_want_write(j);
	spin_unlock(&j->lock);

	return ret;
}

/*
 * should _only_ be called from journal_res_get() - when we actually want a
 * journal reservation - journal entry is open means journal is dirty:
 */
static int journal_entry_open(struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_buf *buf = j->buf +
		((journal_cur_seq(j) + 1) & JOURNAL_BUF_MASK);
	union journal_res_state old, new;
	int u64s;
	u64 v;

	lockdep_assert_held(&j->lock);
	BUG_ON(journal_entry_is_open(j));
	BUG_ON(BCH_SB_CLEAN(c->disk_sb.sb));

	if (j->blocked)
		return JOURNAL_ERR_blocked;

	if (j->cur_entry_error)
		return j->cur_entry_error;

	if (bch2_journal_error(j))
		return JOURNAL_ERR_insufficient_devices; /* -EROFS */

	if (!fifo_free(&j->pin))
		return JOURNAL_ERR_journal_pin_full;

	if (nr_unwritten_journal_entries(j) == ARRAY_SIZE(j->buf))
		return JOURNAL_ERR_max_in_flight;

	BUG_ON(!j->cur_entry_sectors);

	buf->expires		=
		(journal_cur_seq(j) == j->flushed_seq_ondisk
		 ? jiffies
		 : j->last_flush_write) +
		msecs_to_jiffies(c->opts.journal_flush_delay);

	buf->u64s_reserved	= j->entry_u64s_reserved;
	buf->disk_sectors	= j->cur_entry_sectors;
	buf->sectors		= min(buf->disk_sectors, buf->buf_size >> 9);

	u64s = (int) (buf->sectors << 9) / sizeof(u64) -
		journal_entry_overhead(j);
	u64s = clamp_t(int, u64s, 0, JOURNAL_ENTRY_CLOSED_VAL - 1);

	if (u64s <= (ssize_t) j->early_journal_entries.nr)
		return JOURNAL_ERR_journal_full;

	if (fifo_empty(&j->pin) && j->reclaim_thread)
		wake_up_process(j->reclaim_thread);

	/*
	 * The fifo_push() needs to happen at the same time as j->seq is
	 * incremented for journal_last_seq() to be calculated correctly
	 */
	atomic64_inc(&j->seq);
	journal_pin_list_init(fifo_push_ref(&j->pin), 1);

	BUG_ON(j->pin.back - 1 != atomic64_read(&j->seq));

	BUG_ON(j->buf + (journal_cur_seq(j) & JOURNAL_BUF_MASK) != buf);

	bkey_extent_init(&buf->key);
	buf->noflush		= false;
	buf->must_flush		= false;
	buf->separate_flush	= false;
	buf->flush_time		= 0;
	buf->need_flush_to_write_buffer = true;
	buf->write_started	= false;
	buf->write_allocated	= false;
	buf->write_done		= false;

	memset(buf->data, 0, sizeof(*buf->data));
	buf->data->seq	= cpu_to_le64(journal_cur_seq(j));
	buf->data->u64s	= 0;

	if (j->early_journal_entries.nr) {
		memcpy(buf->data->_data, j->early_journal_entries.data,
		       j->early_journal_entries.nr * sizeof(u64));
		le32_add_cpu(&buf->data->u64s, j->early_journal_entries.nr);
	}

	/*
	 * Must be set before marking the journal entry as open:
	 */
	j->cur_entry_u64s = u64s;

	v = atomic64_read(&j->reservations.counter);
	do {
		old.v = new.v = v;

		BUG_ON(old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL);

		new.idx++;
		BUG_ON(journal_state_count(new, new.idx));
		BUG_ON(new.idx != (journal_cur_seq(j) & JOURNAL_BUF_MASK));

		journal_state_inc(&new);

		/* Handle any already added entries */
		new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
				       old.v, new.v)) != old.v);

	if (nr_unwritten_journal_entries(j) == 1)
		mod_delayed_work(j->wq,
				 &j->write_work,
				 msecs_to_jiffies(c->opts.journal_flush_delay));
	journal_wake(j);

	if (j->early_journal_entries.nr)
		darray_exit(&j->early_journal_entries);
	return 0;
}

static bool journal_quiesced(struct journal *j)
{
	bool ret = atomic64_read(&j->seq) == j->seq_ondisk;

	if (!ret)
		bch2_journal_entry_close(j);
	return ret;
}

static void journal_quiesce(struct journal *j)
{
	wait_event(j->wait, journal_quiesced(j));
}

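/*
 * Delayed work that closes (and thus kicks off the write of) the current
 * journal entry once its timer (buf->expires, derived from the
 * journal_flush_delay option) has gone off; if it hasn't expired yet, the
 * work is re-armed.
 */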
static void journal_write_work(struct work_struct *work)
{
	struct journal *j = container_of(work, struct journal, write_work.work);

	spin_lock(&j->lock);
	if (__journal_entry_is_open(j->reservations)) {
		long delta = journal_cur_buf(j)->expires - jiffies;

		if (delta > 0)
			mod_delayed_work(j->wq, &j->write_work, delta);
		else
			__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true);
	}
	spin_unlock(&j->lock);
}

static int __journal_res_get(struct journal *j, struct journal_res *res,
			     unsigned flags)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_buf *buf;
	bool can_discard;
	int ret;
retry:
	if (journal_res_get_fast(j, res, flags))
		return 0;

	if (bch2_journal_error(j))
		return -BCH_ERR_erofs_journal_err;

	if (j->blocked)
		return -BCH_ERR_journal_res_get_blocked;

	if ((flags & BCH_WATERMARK_MASK) < j->watermark) {
		ret = JOURNAL_ERR_journal_full;
		can_discard = j->can_discard;
		goto out;
	}

	if (nr_unwritten_journal_entries(j) == ARRAY_SIZE(j->buf) && !journal_entry_is_open(j)) {
		ret = JOURNAL_ERR_max_in_flight;
		goto out;
	}

	spin_lock(&j->lock);

	/*
	 * Recheck after taking the lock, so we don't race with another thread
	 * that just did journal_entry_open(), and then call
	 * bch2_journal_entry_close() unnecessarily:
	 */
	if (journal_res_get_fast(j, res, flags)) {
		ret = 0;
		goto unlock;
	}

	/*
	 * If we couldn't get a reservation because the current buf filled up,
	 * and we had room for a bigger entry on disk, signal that we want to
	 * realloc the journal bufs:
	 */
	buf = journal_cur_buf(j);
	if (journal_entry_is_open(j) &&
	    buf->buf_size >> 9 < buf->disk_sectors &&
	    buf->buf_size < JOURNAL_ENTRY_SIZE_MAX)
		j->buf_size_want = max(j->buf_size_want, buf->buf_size << 1);

	__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, false);
	ret = journal_entry_open(j) ?: JOURNAL_ERR_retry;
unlock:
	can_discard = j->can_discard;
	spin_unlock(&j->lock);
out:
	if (ret == JOURNAL_ERR_retry)
		goto retry;
	if (!ret)
		return 0;

	if (journal_error_check_stuck(j, ret, flags))
		ret = -BCH_ERR_journal_res_get_blocked;

	if (ret == JOURNAL_ERR_max_in_flight &&
	    track_event_change(&c->times[BCH_TIME_blocked_journal_max_in_flight], true)) {

		struct printbuf buf = PRINTBUF;
		prt_printf(&buf, "seq %llu\n", journal_cur_seq(j));
		bch2_journal_bufs_to_text(&buf, j);
		trace_journal_entry_full(c, buf.buf);
		printbuf_exit(&buf);
		count_event(c, journal_entry_full);
	}

	/*
	 * Journal is full - can't rely on reclaim from work item due to
	 * freezing:
	 */
	if ((ret == JOURNAL_ERR_journal_full ||
	     ret == JOURNAL_ERR_journal_pin_full) &&
	    !(flags & JOURNAL_RES_GET_NONBLOCK)) {
		if (can_discard) {
			bch2_journal_do_discards(j);
			goto retry;
		}

		if (mutex_trylock(&j->reclaim_lock)) {
			bch2_journal_reclaim(j);
			mutex_unlock(&j->reclaim_lock);
		}
	}

	return ret == JOURNAL_ERR_insufficient_devices
		? -BCH_ERR_erofs_journal_err
		: -BCH_ERR_journal_res_get_blocked;
}

/*
 * Essentially the entry function to the journaling code. When bcachefs is doing
 * a btree insert, it calls this function to get the current journal write.
 * Journal write is the structure used to set up journal writes. The calling
 * function will then add its keys to the structure, queuing them for the next
 * write.
 *
 * To ensure forward progress, the current task must not be holding any
 * btree node write locks.
 */
int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
				  unsigned flags)
{
	int ret;

	closure_wait_event(&j->async_wait,
		   (ret = __journal_res_get(j, res, flags)) != -BCH_ERR_journal_res_get_blocked ||
		   (flags & JOURNAL_RES_GET_NONBLOCK));
	return ret;
}
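
/*
 * A rough sketch of how callers use this interface - illustrative only, see
 * journal.h for the real helpers; nr_u64s here stands for however much space
 * the caller needs:
 *
 *	struct journal_res res = {};
 *
 *	ret = bch2_journal_res_get(j, &res, jset_u64s(nr_u64s), 0);
 *	if (ret)
 *		return ret;
 *
 *	... copy keys into the space just reserved in the current journal
 *	    buffer ...
 *
 *	bch2_journal_res_put(j, &res);
 */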

/* journal_entry_res: */
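/*
 * A journal_entry_res reserves space in every journal entry that gets opened,
 * for data that has to be present in each entry; the reservation can be
 * resized later with bch2_journal_entry_res_resize().
 */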

void bch2_journal_entry_res_resize(struct journal *j,
				   struct journal_entry_res *res,
				   unsigned new_u64s)
{
	union journal_res_state state;
	int d = new_u64s - res->u64s;

	spin_lock(&j->lock);

	j->entry_u64s_reserved += d;
	if (d <= 0)
		goto out;

	j->cur_entry_u64s = max_t(int, 0, j->cur_entry_u64s - d);
	smp_mb();
	state = READ_ONCE(j->reservations);

	if (state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL &&
	    state.cur_entry_offset > j->cur_entry_u64s) {
		j->cur_entry_u64s += d;
		/*
		 * Not enough room in current journal entry, have to flush it:
		 */
		__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true);
	} else {
		journal_cur_buf(j)->u64s_reserved += d;
	}
out:
	spin_unlock(&j->lock);
	res->u64s += d;
}

/* journal flushing: */

/**
 * bch2_journal_flush_seq_async - wait for a journal entry to be written
 * @j:		journal object
 * @seq:	seq to flush
 * @parent:	closure object to wait with
 * Returns:	1 if @seq has already been flushed, 0 if @seq is being flushed,
 *		-EIO if @seq will never be flushed
 *
 * Like bch2_journal_wait_on_seq, except that it triggers a write immediately if
 * necessary
 */
int bch2_journal_flush_seq_async(struct journal *j, u64 seq,
				 struct closure *parent)
{
	struct journal_buf *buf;
	int ret = 0;

	if (seq <= j->flushed_seq_ondisk)
		return 1;

	spin_lock(&j->lock);

	if (WARN_ONCE(seq > journal_cur_seq(j),
		      "requested to flush journal seq %llu, but currently at %llu",
		      seq, journal_cur_seq(j)))
		goto out;

	/* Recheck under lock: */
	if (j->err_seq && seq >= j->err_seq) {
		ret = -EIO;
		goto out;
	}

	if (seq <= j->flushed_seq_ondisk) {
		ret = 1;
		goto out;
	}

	/* if seq was written, but not flushed - flush a newer one instead */
	seq = max(seq, journal_last_unwritten_seq(j));

recheck_need_open:
	if (seq > journal_cur_seq(j)) {
		struct journal_res res = { 0 };

		if (journal_entry_is_open(j))
			__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true);

		spin_unlock(&j->lock);

		/*
		 * We're called from bch2_journal_flush_seq() -> wait_event();
		 * but this might block. We won't usually block, so we won't
		 * livelock:
		 */
		sched_annotate_sleep();
		ret = bch2_journal_res_get(j, &res, jset_u64s(0), 0);
		if (ret)
			return ret;

		seq = res.seq;
		buf = journal_seq_to_buf(j, seq);
		buf->must_flush = true;

		if (!buf->flush_time) {
			buf->flush_time	= local_clock() ?: 1;
			buf->expires = jiffies;
		}

		if (parent && !closure_wait(&buf->wait, parent))
			BUG();

		bch2_journal_res_put(j, &res);

		spin_lock(&j->lock);
		goto want_write;
	}

	/*
	 * if write was kicked off without a flush, or if we promised it
	 * wouldn't be a flush, flush the next sequence number instead
	 */
	buf = journal_seq_to_buf(j, seq);
	if (buf->noflush) {
		seq++;
		goto recheck_need_open;
	}

	buf->must_flush = true;

	if (parent && !closure_wait(&buf->wait, parent))
		BUG();
want_write:
	if (seq == journal_cur_seq(j))
		journal_entry_want_write(j);
out:
	spin_unlock(&j->lock);
	return ret;
}

int bch2_journal_flush_seq(struct journal *j, u64 seq)
{
	u64 start_time = local_clock();
	int ret, ret2;

	/*
	 * Don't update time_stats when @seq is already flushed:
	 */
	if (seq <= j->flushed_seq_ondisk)
		return 0;

	ret = wait_event_interruptible(j->wait, (ret2 = bch2_journal_flush_seq_async(j, seq, NULL)));

	if (!ret)
		bch2_time_stats_update(j->flush_seq_time, start_time);

	return ret ?: ret2 < 0 ? ret2 : 0;
}

/*
 * bch2_journal_flush_async - if there is an open journal entry, or a journal
 * still being written, write it and wait for the write to complete
 */
void bch2_journal_flush_async(struct journal *j, struct closure *parent)
{
	bch2_journal_flush_seq_async(j, atomic64_read(&j->seq), parent);
}

int bch2_journal_flush(struct journal *j)
{
	return bch2_journal_flush_seq(j, atomic64_read(&j->seq));
}

/*
 * bch2_journal_noflush_seq - tell the journal not to issue any flushes before
 * @seq
 */
bool bch2_journal_noflush_seq(struct journal *j, u64 seq)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	u64 unwritten_seq;
	bool ret = false;

	if (!(c->sb.features & (1ULL << BCH_FEATURE_journal_no_flush)))
		return false;

	if (seq <= c->journal.flushed_seq_ondisk)
		return false;

	spin_lock(&j->lock);
	if (seq <= c->journal.flushed_seq_ondisk)
		goto out;

	for (unwritten_seq = journal_last_unwritten_seq(j);
	     unwritten_seq < seq;
	     unwritten_seq++) {
		struct journal_buf *buf = journal_seq_to_buf(j, unwritten_seq);

		/* journal flush already in flight, or flush requested */
		if (buf->must_flush)
			goto out;

		buf->noflush = true;
	}

	ret = true;
out:
	spin_unlock(&j->lock);
	return ret;
}

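/*
 * Open and immediately flush an (otherwise empty) journal entry - used when we
 * need to force a flushed journal write, e.g. at shutdown.
 */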
int bch2_journal_meta(struct journal *j)
{
	struct journal_buf *buf;
	struct journal_res res;
	int ret;

	memset(&res, 0, sizeof(res));

	ret = bch2_journal_res_get(j, &res, jset_u64s(0), 0);
	if (ret)
		return ret;

	buf = j->buf + (res.seq & JOURNAL_BUF_MASK);
	buf->must_flush = true;

	if (!buf->flush_time) {
		buf->flush_time	= local_clock() ?: 1;
		buf->expires = jiffies;
	}

	bch2_journal_res_put(j, &res);

	return bch2_journal_flush_seq(j, res.seq);
}

/* block/unblock the journal: */

void bch2_journal_unblock(struct journal *j)
{
	spin_lock(&j->lock);
	j->blocked--;
	spin_unlock(&j->lock);

	journal_wake(j);
}

void bch2_journal_block(struct journal *j)
{
	spin_lock(&j->lock);
	j->blocked++;
	spin_unlock(&j->lock);

	journal_quiesce(j);
}

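/*
 * Find the oldest unwritten journal buffer, up to @max_seq, whose contents
 * still need to be flushed to the btree write buffer: returns ERR_PTR(-EAGAIN)
 * if that buffer still has outstanding reservations, and returns with
 * j->buf_lock held on success.
 */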
static struct journal_buf *__bch2_next_write_buffer_flush_journal_buf(struct journal *j, u64 max_seq)
{
	struct journal_buf *ret = NULL;

	/* We're inside wait_event(), but using mutex_lock(): */
	sched_annotate_sleep();
	mutex_lock(&j->buf_lock);
	spin_lock(&j->lock);
	max_seq = min(max_seq, journal_cur_seq(j));

	for (u64 seq = journal_last_unwritten_seq(j);
	     seq <= max_seq;
	     seq++) {
		unsigned idx = seq & JOURNAL_BUF_MASK;
		struct journal_buf *buf = j->buf + idx;

		if (buf->need_flush_to_write_buffer) {
			if (seq == journal_cur_seq(j))
				__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true);

			union journal_res_state s;
			s.v = atomic64_read_acquire(&j->reservations.counter);

			ret = journal_state_count(s, idx)
				? ERR_PTR(-EAGAIN)
				: buf;
			break;
		}
	}

	spin_unlock(&j->lock);
	if (IS_ERR_OR_NULL(ret))
		mutex_unlock(&j->buf_lock);
	return ret;
}

struct journal_buf *bch2_next_write_buffer_flush_journal_buf(struct journal *j, u64 max_seq)
{
	struct journal_buf *ret;

	wait_event(j->wait, (ret = __bch2_next_write_buffer_flush_journal_buf(j, max_seq)) != ERR_PTR(-EAGAIN));
	return ret;
}

/* allocate journal on a device: */

static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr,
					 bool new_fs, struct closure *cl)
{
	struct bch_fs *c = ca->fs;
	struct journal_device *ja = &ca->journal;
	u64 *new_bucket_seq = NULL, *new_buckets = NULL;
	struct open_bucket **ob = NULL;
	long *bu = NULL;
	unsigned i, pos, nr_got = 0, nr_want = nr - ja->nr;
	int ret = 0;

	BUG_ON(nr <= ja->nr);

	bu		= kcalloc(nr_want, sizeof(*bu), GFP_KERNEL);
	ob		= kcalloc(nr_want, sizeof(*ob), GFP_KERNEL);
	new_buckets	= kcalloc(nr, sizeof(u64), GFP_KERNEL);
	new_bucket_seq	= kcalloc(nr, sizeof(u64), GFP_KERNEL);
	if (!bu || !ob || !new_buckets || !new_bucket_seq) {
		ret = -BCH_ERR_ENOMEM_set_nr_journal_buckets;
		goto err_free;
	}

	for (nr_got = 0; nr_got < nr_want; nr_got++) {
		if (new_fs) {
			bu[nr_got] = bch2_bucket_alloc_new_fs(ca);
			if (bu[nr_got] < 0) {
				ret = -BCH_ERR_ENOSPC_bucket_alloc;
				break;
			}
		} else {
			ob[nr_got] = bch2_bucket_alloc(c, ca, BCH_WATERMARK_normal,
						       BCH_DATA_journal, cl);
			ret = PTR_ERR_OR_ZERO(ob[nr_got]);
			if (ret)
				break;

			ret = bch2_trans_run(c,
				bch2_trans_mark_metadata_bucket(trans, ca,
						ob[nr_got]->bucket, BCH_DATA_journal,
						ca->mi.bucket_size, BTREE_TRIGGER_transactional));
			if (ret) {
				bch2_open_bucket_put(c, ob[nr_got]);
				bch_err_msg(c, ret, "marking new journal buckets");
				break;
			}

			bu[nr_got] = ob[nr_got]->bucket;
		}
	}

	if (!nr_got)
		goto err_free;

	/* Don't return an error if we successfully allocated some buckets: */
	ret = 0;

	if (c) {
		bch2_journal_flush_all_pins(&c->journal);
		bch2_journal_block(&c->journal);
		mutex_lock(&c->sb_lock);
	}

	memcpy(new_buckets,	ja->buckets,	ja->nr * sizeof(u64));
	memcpy(new_bucket_seq,	ja->bucket_seq,	ja->nr * sizeof(u64));

	BUG_ON(ja->discard_idx > ja->nr);

	pos = ja->discard_idx ?: ja->nr;

	memmove(new_buckets + pos + nr_got,
		new_buckets + pos,
		sizeof(new_buckets[0]) * (ja->nr - pos));
	memmove(new_bucket_seq + pos + nr_got,
		new_bucket_seq + pos,
		sizeof(new_bucket_seq[0]) * (ja->nr - pos));

	for (i = 0; i < nr_got; i++) {
		new_buckets[pos + i] = bu[i];
		new_bucket_seq[pos + i] = 0;
	}

	nr = ja->nr + nr_got;

	ret = bch2_journal_buckets_to_sb(c, ca, new_buckets, nr);
	if (ret)
		goto err_unblock;

	if (!new_fs)
		bch2_write_super(c);

	/* Commit: */
	if (c)
		spin_lock(&c->journal.lock);

	swap(new_buckets,	ja->buckets);
	swap(new_bucket_seq,	ja->bucket_seq);
	ja->nr = nr;

	if (pos <= ja->discard_idx)
		ja->discard_idx = (ja->discard_idx + nr_got) % ja->nr;
	if (pos <= ja->dirty_idx_ondisk)
		ja->dirty_idx_ondisk = (ja->dirty_idx_ondisk + nr_got) % ja->nr;
	if (pos <= ja->dirty_idx)
		ja->dirty_idx = (ja->dirty_idx + nr_got) % ja->nr;
	if (pos <= ja->cur_idx)
		ja->cur_idx = (ja->cur_idx + nr_got) % ja->nr;

	if (c)
		spin_unlock(&c->journal.lock);
err_unblock:
	if (c) {
		bch2_journal_unblock(&c->journal);
		mutex_unlock(&c->sb_lock);
	}

	if (ret && !new_fs)
		for (i = 0; i < nr_got; i++)
			bch2_trans_run(c,
				bch2_trans_mark_metadata_bucket(trans, ca,
						bu[i], BCH_DATA_free, 0,
						BTREE_TRIGGER_transactional));
err_free:
	if (!new_fs)
		for (i = 0; i < nr_got; i++)
			bch2_open_bucket_put(c, ob[i]);

	kfree(new_bucket_seq);
	kfree(new_buckets);
	kfree(ob);
	kfree(bu);
	return ret;
}

/*
 * Allocate more journal space at runtime - not currently making use of it, but
 * the code works:
 */
int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca,
				unsigned nr)
{
	struct journal_device *ja = &ca->journal;
	struct closure cl;
	int ret = 0;

	closure_init_stack(&cl);

	down_write(&c->state_lock);

	/* don't handle reducing nr of buckets yet: */
	if (nr < ja->nr)
		goto unlock;

	while (ja->nr < nr) {
		struct disk_reservation disk_res = { 0, 0, 0 };

		/*
		 * note: journal buckets aren't really counted as _sectors_ used yet, so
		 * we don't need the disk reservation to avoid the BUG_ON() in buckets.c
		 * when space used goes up without a reservation - but we do need the
		 * reservation to ensure we'll actually be able to allocate:
		 *
		 * XXX: that's not right, disk reservations only ensure a
		 * filesystem-wide allocation will succeed, this is a device
		 * specific allocation - we can hang here:
		 */

		ret = bch2_disk_reservation_get(c, &disk_res,
						bucket_to_sector(ca, nr - ja->nr), 1, 0);
		if (ret)
			break;

		ret = __bch2_set_nr_journal_buckets(ca, nr, false, &cl);

		bch2_disk_reservation_put(c, &disk_res);

		closure_sync(&cl);

		if (ret && ret != -BCH_ERR_bucket_alloc_blocked)
			break;
	}

	bch_err_fn(c, ret);
unlock:
	up_write(&c->state_lock);
	return ret;
}

int bch2_dev_journal_alloc(struct bch_dev *ca)
{
	unsigned nr;
	int ret;

	if (dynamic_fault("bcachefs:add:journal_alloc")) {
		ret = -BCH_ERR_ENOMEM_set_nr_journal_buckets;
		goto err;
	}

	/* 1/128th of the device by default: */
	nr = ca->mi.nbuckets >> 7;

	/*
	 * clamp journal size to 8192 buckets or 8GB (in sectors), whichever
	 * is smaller:
	 */
	nr = clamp_t(unsigned, nr,
		     BCH_JOURNAL_BUCKETS_MIN,
		     min(1 << 13,
			 (1 << 24) / ca->mi.bucket_size));

	ret = __bch2_set_nr_journal_buckets(ca, nr, true, NULL);
err:
	bch_err_fn(ca, ret);
	return ret;
}

int bch2_fs_journal_alloc(struct bch_fs *c)
{
	for_each_online_member(c, ca) {
		if (ca->journal.nr)
			continue;

		int ret = bch2_dev_journal_alloc(ca);
		if (ret) {
			percpu_ref_put(&ca->io_ref);
			return ret;
		}
	}

	return 0;
}

/* startup/shutdown: */

static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx)
{
	bool ret = false;
	u64 seq;

	spin_lock(&j->lock);
	for (seq = journal_last_unwritten_seq(j);
	     seq <= journal_cur_seq(j) && !ret;
	     seq++) {
		struct journal_buf *buf = journal_seq_to_buf(j, seq);

		if (bch2_bkey_has_device_c(bkey_i_to_s_c(&buf->key), dev_idx))
			ret = true;
	}
	spin_unlock(&j->lock);

	return ret;
}

void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca)
{
	wait_event(j->wait, !bch2_journal_writing_to_device(j, ca->dev_idx));
}

void bch2_fs_journal_stop(struct journal *j)
{
	bch2_journal_reclaim_stop(j);
	bch2_journal_flush_all_pins(j);

	wait_event(j->wait, bch2_journal_entry_close(j));

	/*
	 * Always write a new journal entry, to make sure the clock hands are up
	 * to date (and match the superblock)
	 */
	bch2_journal_meta(j);

	journal_quiesce(j);
	cancel_delayed_work_sync(&j->write_work);

	BUG_ON(!bch2_journal_error(j) &&
	       test_bit(JOURNAL_replay_done, &j->flags) &&
	       j->last_empty_seq != journal_cur_seq(j));

	if (!bch2_journal_error(j))
		clear_bit(JOURNAL_running, &j->flags);
}

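/*
 * Start the journal after recovery: @cur_seq is the sequence number the next
 * journal entry to be opened will use; the pin fifo is initialized to cover
 * [last_seq, cur_seq) based on the entries we read from disk
 * (c->journal_entries).
 */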
int bch2_fs_journal_start(struct journal *j, u64 cur_seq)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_entry_pin_list *p;
	struct journal_replay *i, **_i;
	struct genradix_iter iter;
	bool had_entries = false;
	u64 last_seq = cur_seq, nr, seq;

	genradix_for_each_reverse(&c->journal_entries, iter, _i) {
		i = *_i;

		if (journal_replay_ignore(i))
			continue;

		last_seq = le64_to_cpu(i->j.last_seq);
		break;
	}

	nr = cur_seq - last_seq;

	if (nr + 1 > j->pin.size) {
		free_fifo(&j->pin);
		init_fifo(&j->pin, roundup_pow_of_two(nr + 1), GFP_KERNEL);
		if (!j->pin.data) {
			bch_err(c, "error reallocating journal fifo (%llu open entries)", nr);
			return -BCH_ERR_ENOMEM_journal_pin_fifo;
		}
	}

	j->replay_journal_seq	= last_seq;
	j->replay_journal_seq_end = cur_seq;
	j->last_seq_ondisk	= last_seq;
	j->flushed_seq_ondisk	= cur_seq - 1;
	j->seq_ondisk		= cur_seq - 1;
	j->pin.front		= last_seq;
	j->pin.back		= cur_seq;
	atomic64_set(&j->seq, cur_seq - 1);

	fifo_for_each_entry_ptr(p, &j->pin, seq)
		journal_pin_list_init(p, 1);

	genradix_for_each(&c->journal_entries, iter, _i) {
		i = *_i;

		if (journal_replay_ignore(i))
			continue;

		seq = le64_to_cpu(i->j.seq);
		BUG_ON(seq >= cur_seq);

		if (seq < last_seq)
			continue;

		if (journal_entry_empty(&i->j))
			j->last_empty_seq = le64_to_cpu(i->j.seq);

		p = journal_seq_pin(j, seq);

		p->devs.nr = 0;
		darray_for_each(i->ptrs, ptr)
			bch2_dev_list_add_dev(&p->devs, ptr->dev);

		had_entries = true;
	}

	if (!had_entries)
		j->last_empty_seq = cur_seq;

	spin_lock(&j->lock);

	set_bit(JOURNAL_running, &j->flags);
	j->last_flush_write = jiffies;

	j->reservations.idx = j->reservations.unwritten_idx = journal_cur_seq(j);
	j->reservations.unwritten_idx++;

	c->last_bucket_seq_cleanup = journal_cur_seq(j);

	bch2_journal_space_available(j);
	spin_unlock(&j->lock);

	return bch2_journal_reclaim_start(j);
}

/* init/exit: */

void bch2_dev_journal_exit(struct bch_dev *ca)
{
	struct journal_device *ja = &ca->journal;

	for (unsigned i = 0; i < ARRAY_SIZE(ja->bio); i++) {
		kfree(ja->bio[i]);
		ja->bio[i] = NULL;
	}

	kfree(ja->buckets);
	kfree(ja->bucket_seq);
	ja->buckets	= NULL;
	ja->bucket_seq	= NULL;
}

int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
{
	struct journal_device *ja = &ca->journal;
	struct bch_sb_field_journal *journal_buckets =
		bch2_sb_field_get(sb, journal);
	struct bch_sb_field_journal_v2 *journal_buckets_v2 =
		bch2_sb_field_get(sb, journal_v2);

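	/*
	 * The journal_v2 superblock field describes journal buckets as
	 * (start, nr) ranges; the older journal field lists each bucket
	 * individually:
	 */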
	ja->nr = 0;

	if (journal_buckets_v2) {
		unsigned nr = bch2_sb_field_journal_v2_nr_entries(journal_buckets_v2);

		for (unsigned i = 0; i < nr; i++)
			ja->nr += le64_to_cpu(journal_buckets_v2->d[i].nr);
	} else if (journal_buckets) {
		ja->nr = bch2_nr_journal_buckets(journal_buckets);
	}

	ja->bucket_seq = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
	if (!ja->bucket_seq)
		return -BCH_ERR_ENOMEM_dev_journal_init;

	unsigned nr_bvecs = DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE);

	for (unsigned i = 0; i < ARRAY_SIZE(ja->bio); i++) {
		ja->bio[i] = kmalloc(struct_size(ja->bio[i], bio.bi_inline_vecs,
				     nr_bvecs), GFP_KERNEL);
		if (!ja->bio[i])
			return -BCH_ERR_ENOMEM_dev_journal_init;

		ja->bio[i]->ca = ca;
		ja->bio[i]->buf_idx = i;
		bio_init(&ja->bio[i]->bio, NULL, ja->bio[i]->bio.bi_inline_vecs, nr_bvecs, 0);
	}

	ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
	if (!ja->buckets)
		return -BCH_ERR_ENOMEM_dev_journal_init;

	if (journal_buckets_v2) {
		unsigned nr = bch2_sb_field_journal_v2_nr_entries(journal_buckets_v2);
		unsigned dst = 0;

		for (unsigned i = 0; i < nr; i++)
			for (unsigned j = 0; j < le64_to_cpu(journal_buckets_v2->d[i].nr); j++)
				ja->buckets[dst++] =
					le64_to_cpu(journal_buckets_v2->d[i].start) + j;
	} else if (journal_buckets) {
		for (unsigned i = 0; i < ja->nr; i++)
			ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]);
	}

	return 0;
}

void bch2_fs_journal_exit(struct journal *j)
{
	if (j->wq)
		destroy_workqueue(j->wq);

	darray_exit(&j->early_journal_entries);

	for (unsigned i = 0; i < ARRAY_SIZE(j->buf); i++)
		kvfree(j->buf[i].data);
	free_fifo(&j->pin);
}

int bch2_fs_journal_init(struct journal *j)
{
	static struct lock_class_key res_key;

	mutex_init(&j->buf_lock);
	spin_lock_init(&j->lock);
	spin_lock_init(&j->err_lock);
	init_waitqueue_head(&j->wait);
	INIT_DELAYED_WORK(&j->write_work, journal_write_work);
	init_waitqueue_head(&j->reclaim_wait);
	init_waitqueue_head(&j->pin_flush_wait);
	mutex_init(&j->reclaim_lock);
	mutex_init(&j->discard_lock);

	lockdep_init_map(&j->res_map, "journal res", &res_key, 0);

	atomic64_set(&j->reservations.counter,
		((union journal_res_state)
		 { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);

	if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)))
		return -BCH_ERR_ENOMEM_journal_pin_fifo;

	for (unsigned i = 0; i < ARRAY_SIZE(j->buf); i++) {
		j->buf[i].buf_size = JOURNAL_ENTRY_SIZE_MIN;
		j->buf[i].data = kvmalloc(j->buf[i].buf_size, GFP_KERNEL);
		if (!j->buf[i].data)
			return -BCH_ERR_ENOMEM_journal_buf;
		j->buf[i].idx = i;
	}

	j->pin.front = j->pin.back = 1;

	j->wq = alloc_workqueue("bcachefs_journal",
				WQ_HIGHPRI|WQ_FREEZABLE|WQ_UNBOUND|WQ_MEM_RECLAIM, 512);
	if (!j->wq)
		return -BCH_ERR_ENOMEM_fs_other_alloc;
	return 0;
}

/* debug: */

static const char * const bch2_journal_flags_strs[] = {
#define x(n)	#n,
	JOURNAL_FLAGS()
#undef x
	NULL
};

void __bch2_journal_debug_to_text(struct printbuf *out, struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	union journal_res_state s;
	unsigned long now = jiffies;
	u64 nr_writes = j->nr_flush_writes + j->nr_noflush_writes;

	if (!out->nr_tabstops)
		printbuf_tabstop_push(out, 24);
	out->atomic++;

	rcu_read_lock();
	s = READ_ONCE(j->reservations);

	prt_printf(out, "flags:\t");
	prt_bitflags(out, bch2_journal_flags_strs, j->flags);
	prt_newline(out);
	prt_printf(out, "dirty journal entries:\t%llu/%llu\n",	fifo_used(&j->pin), j->pin.size);
	prt_printf(out, "seq:\t%llu\n",				journal_cur_seq(j));
	prt_printf(out, "seq_ondisk:\t%llu\n",			j->seq_ondisk);
	prt_printf(out, "last_seq:\t%llu\n",			journal_last_seq(j));
	prt_printf(out, "last_seq_ondisk:\t%llu\n",		j->last_seq_ondisk);
	prt_printf(out, "flushed_seq_ondisk:\t%llu\n",		j->flushed_seq_ondisk);
	prt_printf(out, "watermark:\t%s\n",			bch2_watermarks[j->watermark]);
	prt_printf(out, "each entry reserved:\t%u\n",		j->entry_u64s_reserved);
	prt_printf(out, "nr flush writes:\t%llu\n",		j->nr_flush_writes);
	prt_printf(out, "nr noflush writes:\t%llu\n",		j->nr_noflush_writes);
	prt_printf(out, "average write size:\t");
	prt_human_readable_u64(out, nr_writes ? div64_u64(j->entry_bytes_written, nr_writes) : 0);
	prt_newline(out);
	prt_printf(out, "nr direct reclaim:\t%llu\n",		j->nr_direct_reclaim);
	prt_printf(out, "nr background reclaim:\t%llu\n",	j->nr_background_reclaim);
	prt_printf(out, "reclaim kicked:\t%u\n",		j->reclaim_kicked);
	prt_printf(out, "reclaim runs in:\t%u ms\n",		time_after(j->next_reclaim, now)
	       ? jiffies_to_msecs(j->next_reclaim - jiffies) : 0);
	prt_printf(out, "blocked:\t%u\n",			j->blocked);
	prt_printf(out, "current entry sectors:\t%u\n",		j->cur_entry_sectors);
	prt_printf(out, "current entry error:\t%s\n",		bch2_journal_errors[j->cur_entry_error]);
	prt_printf(out, "current entry:\t");

	switch (s.cur_entry_offset) {
	case JOURNAL_ENTRY_ERROR_VAL:
		prt_printf(out, "error\n");
		break;
	case JOURNAL_ENTRY_CLOSED_VAL:
		prt_printf(out, "closed\n");
		break;
	default:
		prt_printf(out, "%u/%u\n", s.cur_entry_offset, j->cur_entry_u64s);
		break;
	}

	prt_printf(out, "unwritten entries:\n");
	bch2_journal_bufs_to_text(out, j);

	prt_printf(out, "space:\n");
	printbuf_indent_add(out, 2);
	prt_printf(out, "discarded\t%u:%u\n",
	       j->space[journal_space_discarded].next_entry,
	       j->space[journal_space_discarded].total);
	prt_printf(out, "clean ondisk\t%u:%u\n",
	       j->space[journal_space_clean_ondisk].next_entry,
	       j->space[journal_space_clean_ondisk].total);
	prt_printf(out, "clean\t%u:%u\n",
	       j->space[journal_space_clean].next_entry,
	       j->space[journal_space_clean].total);
	prt_printf(out, "total\t%u:%u\n",
	       j->space[journal_space_total].next_entry,
	       j->space[journal_space_total].total);
	printbuf_indent_sub(out, 2);

	for_each_member_device_rcu(c, ca, &c->rw_devs[BCH_DATA_journal]) {
		struct journal_device *ja = &ca->journal;

		if (!test_bit(ca->dev_idx, c->rw_devs[BCH_DATA_journal].d))
			continue;

		if (!ja->nr)
			continue;

		prt_printf(out, "dev %u:\n",			ca->dev_idx);
		printbuf_indent_add(out, 2);
		prt_printf(out, "nr\t%u\n",			ja->nr);
		prt_printf(out, "bucket size\t%u\n",		ca->mi.bucket_size);
		prt_printf(out, "available\t%u:%u\n",		bch2_journal_dev_buckets_available(j, ja, journal_space_discarded), ja->sectors_free);
		prt_printf(out, "discard_idx\t%u\n",		ja->discard_idx);
		prt_printf(out, "dirty_ondisk\t%u (seq %llu)\n",ja->dirty_idx_ondisk,	ja->bucket_seq[ja->dirty_idx_ondisk]);
		prt_printf(out, "dirty_idx\t%u (seq %llu)\n",	ja->dirty_idx,		ja->bucket_seq[ja->dirty_idx]);
		prt_printf(out, "cur_idx\t%u (seq %llu)\n",	ja->cur_idx,		ja->bucket_seq[ja->cur_idx]);
		printbuf_indent_sub(out, 2);
	}

	rcu_read_unlock();

	--out->atomic;
}

void bch2_journal_debug_to_text(struct printbuf *out, struct journal *j)
{
	spin_lock(&j->lock);
	__bch2_journal_debug_to_text(out, j);
	spin_unlock(&j->lock);
}

bool bch2_journal_seq_pins_to_text(struct printbuf *out, struct journal *j, u64 *seq)
{
	struct journal_entry_pin_list *pin_list;
	struct journal_entry_pin *pin;

	spin_lock(&j->lock);
	*seq = max(*seq, j->pin.front);

	if (*seq >= j->pin.back) {
		spin_unlock(&j->lock);
		return true;
	}

	out->atomic++;

	pin_list = journal_seq_pin(j, *seq);

	prt_printf(out, "%llu: count %u\n", *seq, atomic_read(&pin_list->count));
	printbuf_indent_add(out, 2);

	for (unsigned i = 0; i < ARRAY_SIZE(pin_list->list); i++)
		list_for_each_entry(pin, &pin_list->list[i], list)
			prt_printf(out, "\t%px %ps\n", pin, pin->flush);

	if (!list_empty(&pin_list->flushed))
		prt_printf(out, "flushed:\n");

	list_for_each_entry(pin, &pin_list->flushed, list)
		prt_printf(out, "\t%px %ps\n", pin, pin->flush);

	printbuf_indent_sub(out, 2);

	--out->atomic;
	spin_unlock(&j->lock);

	return false;
}

void bch2_journal_pins_to_text(struct printbuf *out, struct journal *j)
{
	u64 seq = 0;

	while (!bch2_journal_seq_pins_to_text(out, j, &seq))
		seq++;
}