Commit 916abefd authored by Kent Overstreet's avatar Kent Overstreet

bcachefs: better journal pipelining

Recently a severe performance regression was discovered, which bisected
to

  a6548c8b bcachefs: Avoid flushing the journal in the discard path

It turns out the old behaviour, which issued excessive journal flushes,
worked around a performance issue where queueing delays would cause the
journal to not be able to write quickly enough and stall.

The journal flushes masked the issue because they periodically flushed
the device write cache, reducing write latency for non flushes.

This patch reworks the journalling code to allow more than one
(non-flush) write to be in flight at a time. With this patch, doing 4k
random writes and an iodepth of 128, we are now able to hit 560k iops to
a Samsung 970 EVO Plus - previously, we were stuck in the ~200k range.
Signed-off-by: default avatarKent Overstreet <kent.overstreet@linux.dev>
parent 38789c25
......@@ -74,6 +74,13 @@ static void bch2_journal_buf_to_text(struct printbuf *out, struct journal *j, u6
prt_printf(out, "%li jiffies", buf->expires - jiffies);
prt_newline(out);
if (buf->write_done)
prt_printf(out, "write done\n");
else if (buf->write_allocated)
prt_printf(out, "write allocated\n");
else if (buf->write_started)
prt_printf(out, "write started\n");
printbuf_indent_sub(out, 2);
}
......@@ -175,21 +182,40 @@ journal_error_check_stuck(struct journal *j, int error, unsigned flags)
return stuck;
}
void bch2_journal_do_writes(struct journal *j)
{
for (u64 seq = journal_last_unwritten_seq(j);
seq <= journal_cur_seq(j);
seq++) {
unsigned idx = seq & JOURNAL_BUF_MASK;
struct journal_buf *w = j->buf + idx;
if (w->write_started && !w->write_allocated)
break;
if (w->write_started)
continue;
if (!journal_state_count(j->reservations, idx)) {
w->write_started = true;
closure_call(&w->io, bch2_journal_write, j->wq, NULL);
}
break;
}
}
/*
* Final processing when the last reference of a journal buffer has been
* dropped. Drop the pin list reference acquired at journal entry open and write
* the buffer, if requested.
*/
void bch2_journal_buf_put_final(struct journal *j, u64 seq, bool write)
void bch2_journal_buf_put_final(struct journal *j, u64 seq)
{
lockdep_assert_held(&j->lock);
if (__bch2_journal_pin_put(j, seq))
bch2_journal_reclaim_fast(j);
if (write) {
struct journal_buf *w = j->buf + (seq & JOURNAL_BUF_MASK);
closure_call(&w->io, bch2_journal_write, j->wq, NULL);
}
bch2_journal_do_writes(j);
}
/*
......@@ -386,6 +412,9 @@ static int journal_entry_open(struct journal *j)
buf->separate_flush = false;
buf->flush_time = 0;
buf->need_flush_to_write_buffer = true;
buf->write_started = false;
buf->write_allocated = false;
buf->write_done = false;
memset(buf->data, 0, sizeof(*buf->data));
buf->data->seq = cpu_to_le64(journal_cur_seq(j));
......
......@@ -264,7 +264,8 @@ static inline union journal_res_state journal_state_buf_put(struct journal *j, u
}
bool bch2_journal_entry_close(struct journal *);
void bch2_journal_buf_put_final(struct journal *, u64, bool);
void bch2_journal_do_writes(struct journal *);
void bch2_journal_buf_put_final(struct journal *, u64);
static inline void __bch2_journal_buf_put(struct journal *j, unsigned idx, u64 seq)
{
......@@ -272,7 +273,7 @@ static inline void __bch2_journal_buf_put(struct journal *j, unsigned idx, u64 s
s = journal_state_buf_put(j, idx);
if (!journal_state_count(s, idx))
bch2_journal_buf_put_final(j, seq, idx == s.unwritten_idx);
bch2_journal_buf_put_final(j, seq);
}
static inline void bch2_journal_buf_put(struct journal *j, unsigned idx, u64 seq)
......@@ -282,7 +283,7 @@ static inline void bch2_journal_buf_put(struct journal *j, unsigned idx, u64 seq
s = journal_state_buf_put(j, idx);
if (!journal_state_count(s, idx)) {
spin_lock(&j->lock);
bch2_journal_buf_put_final(j, seq, idx == s.unwritten_idx);
bch2_journal_buf_put_final(j, seq);
spin_unlock(&j->lock);
}
}
......
......@@ -1602,7 +1602,7 @@ static CLOSURE_CALLBACK(journal_write_done)
struct bch_fs *c = container_of(j, struct bch_fs, journal);
struct bch_replicas_padded replicas;
union journal_res_state old, new;
u64 v, seq;
u64 v, seq = le64_to_cpu(w->data->seq);
int err = 0;
bch2_time_stats_update(!JSET_NO_FLUSH(w->data)
......@@ -1622,24 +1622,32 @@ static CLOSURE_CALLBACK(journal_write_done)
if (err)
bch2_fatal_error(c);
spin_lock(&j->lock);
seq = le64_to_cpu(w->data->seq);
closure_debug_destroy(cl);
spin_lock(&j->lock);
if (seq >= j->pin.front)
journal_seq_pin(j, seq)->devs = w->devs_written;
if (err && (!j->err_seq || seq < j->err_seq))
j->err_seq = seq;
w->write_done = true;
bool completed = false;
if (!err) {
if (!JSET_NO_FLUSH(w->data)) {
for (seq = journal_last_unwritten_seq(j);
seq <= journal_cur_seq(j);
seq++) {
w = j->buf + (seq & JOURNAL_BUF_MASK);
if (!w->write_done)
break;
if (!j->err_seq && !JSET_NO_FLUSH(w->data)) {
j->flushed_seq_ondisk = seq;
j->last_seq_ondisk = w->last_seq;
bch2_do_discards(c);
closure_wake_up(&c->freelist_wait);
bch2_reset_alloc_cursors(c);
}
} else if (!j->err_seq || seq < j->err_seq)
j->err_seq = seq;
j->seq_ondisk = seq;
......@@ -1653,18 +1661,19 @@ static CLOSURE_CALLBACK(journal_write_done)
if (j->watermark != BCH_WATERMARK_stripe)
journal_reclaim_kick(&c->journal);
/* also must come before signalling write completion: */
closure_debug_destroy(cl);
v = atomic64_read(&j->reservations.counter);
do {
old.v = new.v = v;
BUG_ON(journal_state_count(new, new.unwritten_idx));
BUG_ON(new.unwritten_idx != (seq & JOURNAL_BUF_MASK));
new.unwritten_idx++;
} while ((v = atomic64_cmpxchg(&j->reservations.counter,
old.v, new.v)) != old.v);
} while ((v = atomic64_cmpxchg(&j->reservations.counter, old.v, new.v)) != old.v);
completed = true;
}
if (completed) {
bch2_journal_reclaim_fast(j);
bch2_journal_space_available(j);
......@@ -1673,13 +1682,9 @@ static CLOSURE_CALLBACK(journal_write_done)
closure_wake_up(&w->wait);
journal_wake(j);
}
if (!journal_state_count(new, new.unwritten_idx) &&
journal_last_unwritten_seq(j) <= journal_cur_seq(j)) {
struct journal_buf *w = j->buf + (journal_last_unwritten_seq(j) & JOURNAL_BUF_MASK);
spin_unlock(&j->lock);
closure_call(&w->io, bch2_journal_write, j->wq, NULL);
} else if (journal_last_unwritten_seq(j) == journal_cur_seq(j) &&
if (journal_last_unwritten_seq(j) == journal_cur_seq(j) &&
new.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL) {
struct journal_buf *buf = journal_cur_buf(j);
long delta = buf->expires - jiffies;
......@@ -1689,12 +1694,10 @@ static CLOSURE_CALLBACK(journal_write_done)
* previous entries still in flight - the current journal entry
* might want to be written now:
*/
spin_unlock(&j->lock);
mod_delayed_work(j->wq, &j->write_work, max(0L, delta));
} else {
spin_unlock(&j->lock);
}
spin_unlock(&j->lock);
}
static void journal_write_endio(struct bio *bio)
......@@ -1948,6 +1951,7 @@ CLOSURE_CALLBACK(bch2_journal_write)
int ret;
BUG_ON(BCH_SB_CLEAN(c->disk_sb.sb));
BUG_ON(w->write_allocated);
j->write_start_time = local_clock();
......@@ -1991,12 +1995,14 @@ CLOSURE_CALLBACK(bch2_journal_write)
* bch2_journal_space_available():
*/
w->sectors = 0;
w->write_allocated = true;
/*
* journal entry has been compacted and allocated, recalculate space
* available:
*/
bch2_journal_space_available(j);
bch2_journal_do_writes(j);
spin_unlock(&j->lock);
w->devs_written = bch2_bkey_devs(bkey_i_to_s_c(&w->key));
......
......@@ -34,10 +34,13 @@ struct journal_buf {
unsigned disk_sectors; /* maximum size entry could have been, if
buf_size was bigger */
unsigned u64s_reserved;
bool noflush; /* write has already been kicked off, and was noflush */
bool must_flush; /* something wants a flush */
bool separate_flush;
bool need_flush_to_write_buffer;
bool noflush:1; /* write has already been kicked off, and was noflush */
bool must_flush:1; /* something wants a flush */
bool separate_flush:1;
bool need_flush_to_write_buffer:1;
bool write_started:1;
bool write_allocated:1;
bool write_done:1;
u8 idx;
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment