Commit 38789c25 authored by Kent Overstreet's avatar Kent Overstreet

bcachefs: closure per journal buf

Prep work for having multiple journal writes in flight.
Signed-off-by: default avatarKent Overstreet <kent.overstreet@linux.dev>
parent 51654002
...@@ -186,8 +186,10 @@ void bch2_journal_buf_put_final(struct journal *j, u64 seq, bool write) ...@@ -186,8 +186,10 @@ void bch2_journal_buf_put_final(struct journal *j, u64 seq, bool write)
if (__bch2_journal_pin_put(j, seq)) if (__bch2_journal_pin_put(j, seq))
bch2_journal_reclaim_fast(j); bch2_journal_reclaim_fast(j);
if (write) if (write) {
closure_call(&j->io, bch2_journal_write, j->wq, NULL); struct journal_buf *w = j->buf + (seq & JOURNAL_BUF_MASK);
closure_call(&w->io, bch2_journal_write, j->wq, NULL);
}
} }
/* /*
...@@ -1274,10 +1276,14 @@ int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb) ...@@ -1274,10 +1276,14 @@ int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
unsigned nr_bvecs = DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE); unsigned nr_bvecs = DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE);
for (unsigned i = 0; i < ARRAY_SIZE(ja->bio); i++) { for (unsigned i = 0; i < ARRAY_SIZE(ja->bio); i++) {
ja->bio[i] = bio_kmalloc(nr_bvecs, GFP_KERNEL); ja->bio[i] = kmalloc(struct_size(ja->bio[i], bio.bi_inline_vecs,
nr_bvecs), GFP_KERNEL);
if (!ja->bio[i]) if (!ja->bio[i])
return -BCH_ERR_ENOMEM_dev_journal_init; return -BCH_ERR_ENOMEM_dev_journal_init;
bio_init(ja->bio[i], NULL, ja->bio[i]->bi_inline_vecs, nr_bvecs, 0);
ja->bio[i]->ca = ca;
ja->bio[i]->buf_idx = i;
bio_init(&ja->bio[i]->bio, NULL, ja->bio[i]->bio.bi_inline_vecs, nr_bvecs, 0);
} }
ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL); ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
...@@ -1340,6 +1346,7 @@ int bch2_fs_journal_init(struct journal *j) ...@@ -1340,6 +1346,7 @@ int bch2_fs_journal_init(struct journal *j)
j->buf[i].data = kvpmalloc(j->buf[i].buf_size, GFP_KERNEL); j->buf[i].data = kvpmalloc(j->buf[i].buf_size, GFP_KERNEL);
if (!j->buf[i].data) if (!j->buf[i].data)
return -BCH_ERR_ENOMEM_journal_buf; return -BCH_ERR_ENOMEM_journal_buf;
j->buf[i].idx = i;
} }
j->pin.front = j->pin.back = 1; j->pin.front = j->pin.back = 1;
...@@ -1459,7 +1466,6 @@ bool bch2_journal_seq_pins_to_text(struct printbuf *out, struct journal *j, u64 ...@@ -1459,7 +1466,6 @@ bool bch2_journal_seq_pins_to_text(struct printbuf *out, struct journal *j, u64
{ {
struct journal_entry_pin_list *pin_list; struct journal_entry_pin_list *pin_list;
struct journal_entry_pin *pin; struct journal_entry_pin *pin;
unsigned i;
spin_lock(&j->lock); spin_lock(&j->lock);
*seq = max(*seq, j->pin.front); *seq = max(*seq, j->pin.front);
...@@ -1477,7 +1483,7 @@ bool bch2_journal_seq_pins_to_text(struct printbuf *out, struct journal *j, u64 ...@@ -1477,7 +1483,7 @@ bool bch2_journal_seq_pins_to_text(struct printbuf *out, struct journal *j, u64
prt_newline(out); prt_newline(out);
printbuf_indent_add(out, 2); printbuf_indent_add(out, 2);
for (i = 0; i < ARRAY_SIZE(pin_list->list); i++) for (unsigned i = 0; i < ARRAY_SIZE(pin_list->list); i++)
list_for_each_entry(pin, &pin_list->list[i], list) { list_for_each_entry(pin, &pin_list->list[i], list) {
prt_printf(out, "\t%px %ps", pin, pin->flush); prt_printf(out, "\t%px %ps", pin, pin->flush);
prt_newline(out); prt_newline(out);
......
...@@ -1597,9 +1597,9 @@ static inline struct journal_buf *journal_last_unwritten_buf(struct journal *j) ...@@ -1597,9 +1597,9 @@ static inline struct journal_buf *journal_last_unwritten_buf(struct journal *j)
static CLOSURE_CALLBACK(journal_write_done) static CLOSURE_CALLBACK(journal_write_done)
{ {
closure_type(j, struct journal, io); closure_type(w, struct journal_buf, io);
struct journal *j = container_of(w, struct journal, buf[w->idx]);
struct bch_fs *c = container_of(j, struct bch_fs, journal); struct bch_fs *c = container_of(j, struct bch_fs, journal);
struct journal_buf *w = journal_last_unwritten_buf(j);
struct bch_replicas_padded replicas; struct bch_replicas_padded replicas;
union journal_res_state old, new; union journal_res_state old, new;
u64 v, seq; u64 v, seq;
...@@ -1676,8 +1676,9 @@ static CLOSURE_CALLBACK(journal_write_done) ...@@ -1676,8 +1676,9 @@ static CLOSURE_CALLBACK(journal_write_done)
if (!journal_state_count(new, new.unwritten_idx) && if (!journal_state_count(new, new.unwritten_idx) &&
journal_last_unwritten_seq(j) <= journal_cur_seq(j)) { journal_last_unwritten_seq(j) <= journal_cur_seq(j)) {
struct journal_buf *w = j->buf + (journal_last_unwritten_seq(j) & JOURNAL_BUF_MASK);
spin_unlock(&j->lock); spin_unlock(&j->lock);
closure_call(&j->io, bch2_journal_write, j->wq, NULL); closure_call(&w->io, bch2_journal_write, j->wq, NULL);
} else if (journal_last_unwritten_seq(j) == journal_cur_seq(j) && } else if (journal_last_unwritten_seq(j) == journal_cur_seq(j) &&
new.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL) { new.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL) {
struct journal_buf *buf = journal_cur_buf(j); struct journal_buf *buf = journal_cur_buf(j);
...@@ -1698,31 +1699,32 @@ static CLOSURE_CALLBACK(journal_write_done) ...@@ -1698,31 +1699,32 @@ static CLOSURE_CALLBACK(journal_write_done)
static void journal_write_endio(struct bio *bio) static void journal_write_endio(struct bio *bio)
{ {
struct bch_dev *ca = bio->bi_private; struct journal_bio *jbio = container_of(bio, struct journal_bio, bio);
struct bch_dev *ca = jbio->ca;
struct journal *j = &ca->fs->journal; struct journal *j = &ca->fs->journal;
struct journal_buf *w = journal_last_unwritten_buf(j); struct journal_buf *w = j->buf + jbio->buf_idx;
unsigned long flags;
if (bch2_dev_io_err_on(bio->bi_status, ca, BCH_MEMBER_ERROR_write, if (bch2_dev_io_err_on(bio->bi_status, ca, BCH_MEMBER_ERROR_write,
"error writing journal entry %llu: %s", "error writing journal entry %llu: %s",
le64_to_cpu(w->data->seq), le64_to_cpu(w->data->seq),
bch2_blk_status_to_str(bio->bi_status)) || bch2_blk_status_to_str(bio->bi_status)) ||
bch2_meta_write_fault("journal")) { bch2_meta_write_fault("journal")) {
unsigned long flags;
spin_lock_irqsave(&j->err_lock, flags); spin_lock_irqsave(&j->err_lock, flags);
bch2_dev_list_drop_dev(&w->devs_written, ca->dev_idx); bch2_dev_list_drop_dev(&w->devs_written, ca->dev_idx);
spin_unlock_irqrestore(&j->err_lock, flags); spin_unlock_irqrestore(&j->err_lock, flags);
} }
closure_put(&j->io); closure_put(&w->io);
percpu_ref_put(&ca->io_ref); percpu_ref_put(&ca->io_ref);
} }
static CLOSURE_CALLBACK(do_journal_write) static CLOSURE_CALLBACK(do_journal_write)
{ {
closure_type(j, struct journal, io); closure_type(w, struct journal_buf, io);
struct journal *j = container_of(w, struct journal, buf[w->idx]);
struct bch_fs *c = container_of(j, struct bch_fs, journal); struct bch_fs *c = container_of(j, struct bch_fs, journal);
unsigned buf_idx = journal_last_unwritten_seq(j) & JOURNAL_BUF_MASK;
struct journal_buf *w = j->buf + buf_idx;
unsigned sectors = vstruct_sectors(w->data, c->block_bits); unsigned sectors = vstruct_sectors(w->data, c->block_bits);
extent_for_each_ptr(bkey_i_to_s_extent(&w->key), ptr) { extent_for_each_ptr(bkey_i_to_s_extent(&w->key), ptr) {
...@@ -1738,7 +1740,7 @@ static CLOSURE_CALLBACK(do_journal_write) ...@@ -1738,7 +1740,7 @@ static CLOSURE_CALLBACK(do_journal_write)
this_cpu_add(ca->io_done->sectors[WRITE][BCH_DATA_journal], this_cpu_add(ca->io_done->sectors[WRITE][BCH_DATA_journal],
sectors); sectors);
struct bio *bio = ja->bio[buf_idx]; struct bio *bio = &ja->bio[w->idx]->bio;
bio_reset(bio, ca->disk_sb.bdev, REQ_OP_WRITE|REQ_SYNC|REQ_META); bio_reset(bio, ca->disk_sb.bdev, REQ_OP_WRITE|REQ_SYNC|REQ_META);
bio->bi_iter.bi_sector = ptr->offset; bio->bi_iter.bi_sector = ptr->offset;
bio->bi_end_io = journal_write_endio; bio->bi_end_io = journal_write_endio;
...@@ -1937,10 +1939,9 @@ static int bch2_journal_write_pick_flush(struct journal *j, struct journal_buf * ...@@ -1937,10 +1939,9 @@ static int bch2_journal_write_pick_flush(struct journal *j, struct journal_buf *
CLOSURE_CALLBACK(bch2_journal_write) CLOSURE_CALLBACK(bch2_journal_write)
{ {
closure_type(j, struct journal, io); closure_type(w, struct journal_buf, io);
struct journal *j = container_of(w, struct journal, buf[w->idx]);
struct bch_fs *c = container_of(j, struct bch_fs, journal); struct bch_fs *c = container_of(j, struct bch_fs, journal);
unsigned buf_idx = journal_last_unwritten_seq(j) & JOURNAL_BUF_MASK;
struct journal_buf *w = j->buf + buf_idx;
struct bch_replicas_padded replicas; struct bch_replicas_padded replicas;
struct printbuf journal_debug_buf = PRINTBUF; struct printbuf journal_debug_buf = PRINTBUF;
unsigned nr_rw_members = 0; unsigned nr_rw_members = 0;
...@@ -2019,12 +2020,15 @@ CLOSURE_CALLBACK(bch2_journal_write) ...@@ -2019,12 +2020,15 @@ CLOSURE_CALLBACK(bch2_journal_write)
if (ret) if (ret)
goto err; goto err;
if (!JSET_NO_FLUSH(w->data))
closure_wait_event(&j->async_wait, j->seq_ondisk + 1 == le64_to_cpu(w->data->seq));
if (!JSET_NO_FLUSH(w->data) && w->separate_flush) { if (!JSET_NO_FLUSH(w->data) && w->separate_flush) {
for_each_rw_member(c, ca) { for_each_rw_member(c, ca) {
percpu_ref_get(&ca->io_ref); percpu_ref_get(&ca->io_ref);
struct journal_device *ja = &ca->journal; struct journal_device *ja = &ca->journal;
struct bio *bio = ja->bio[buf_idx]; struct bio *bio = &ja->bio[w->idx]->bio;
bio_reset(bio, ca->disk_sb.bdev, bio_reset(bio, ca->disk_sb.bdev,
REQ_OP_WRITE|REQ_SYNC|REQ_META|REQ_PREFLUSH); REQ_OP_WRITE|REQ_SYNC|REQ_META|REQ_PREFLUSH);
bio->bi_end_io = journal_write_endio; bio->bi_end_io = journal_write_endio;
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
* the journal that are being staged or in flight. * the journal that are being staged or in flight.
*/ */
struct journal_buf { struct journal_buf {
struct closure io;
struct jset *data; struct jset *data;
__BKEY_PADDED(key, BCH_REPLICAS_MAX); __BKEY_PADDED(key, BCH_REPLICAS_MAX);
...@@ -37,6 +38,7 @@ struct journal_buf { ...@@ -37,6 +38,7 @@ struct journal_buf {
bool must_flush; /* something wants a flush */ bool must_flush; /* something wants a flush */
bool separate_flush; bool separate_flush;
bool need_flush_to_write_buffer; bool need_flush_to_write_buffer;
u8 idx;
}; };
/* /*
...@@ -150,6 +152,13 @@ enum journal_errors { ...@@ -150,6 +152,13 @@ enum journal_errors {
typedef DARRAY(u64) darray_u64; typedef DARRAY(u64) darray_u64;
struct journal_bio {
struct bch_dev *ca;
unsigned buf_idx;
struct bio bio;
};
/* Embedded in struct bch_fs */ /* Embedded in struct bch_fs */
struct journal { struct journal {
/* Fastpath stuff up front: */ /* Fastpath stuff up front: */
...@@ -204,7 +213,6 @@ struct journal { ...@@ -204,7 +213,6 @@ struct journal {
wait_queue_head_t wait; wait_queue_head_t wait;
struct closure_waitlist async_wait; struct closure_waitlist async_wait;
struct closure io;
struct delayed_work write_work; struct delayed_work write_work;
struct workqueue_struct *wq; struct workqueue_struct *wq;
...@@ -315,7 +323,7 @@ struct journal_device { ...@@ -315,7 +323,7 @@ struct journal_device {
u64 *buckets; u64 *buckets;
/* Bio for journal reads/writes to this device */ /* Bio for journal reads/writes to this device */
struct bio *bio[JOURNAL_BUF_NR]; struct journal_bio *bio[JOURNAL_BUF_NR];
/* for bch_journal_read_device */ /* for bch_journal_read_device */
struct closure read; struct closure read;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment