Commit 731bdd2e authored by Kent Overstreet, committed by Kent Overstreet

bcachefs: Add a workqueue for btree io completions

Also, clean up workqueue usage - we shouldn't be using system
workqueues; pretty much everything we do needs to be on our own
WQ_MEM_RECLAIM workqueues.
Signed-off-by: Kent Overstreet <kent.overstreet@gmail.com>
parent 2eba51a6
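Background on why WQ_MEM_RECLAIM matters here: work queued on the I/O
completion path can be required for writeback to make progress, so it must
not depend on the system workqueues, which may be unable to spawn worker
threads under memory pressure. A WQ_MEM_RECLAIM workqueue keeps a dedicated
rescuer thread that guarantees its work items eventually run. A minimal
sketch of the allocation/teardown pattern this commit follows - the
example_wq_init()/example_wq_exit() wrappers are illustrative, only the
alloc_workqueue() call mirrors the one added to bch2_fs_alloc() below:

	#include <linux/errno.h>
	#include <linux/workqueue.h>

	static struct workqueue_struct *io_complete_wq;

	/* Illustrative init hook; flags mirror the "bcachefs_io" queue below */
	static int example_wq_init(void)
	{
		io_complete_wq = alloc_workqueue("bcachefs_io",
				WQ_FREEZABLE|WQ_HIGHPRI|WQ_MEM_RECLAIM, 1);
		if (!io_complete_wq)
			return -ENOMEM;
		return 0;
	}

	static void example_wq_exit(void)
	{
		if (io_complete_wq)
			destroy_workqueue(io_complete_wq);
	}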
@@ -627,6 +627,7 @@ struct bch_fs {
 	/* BTREE CACHE */
 	struct bio_set		btree_bio;
+	struct workqueue_struct	*io_complete_wq;
 
 	struct btree_root	btree_roots[BTREE_ID_NR];
 	struct mutex		btree_root_lock;
@@ -664,7 +665,7 @@ struct bch_fs {
 	struct btree_key_cache	btree_key_cache;
 
-	struct workqueue_struct	*wq;
+	struct workqueue_struct	*btree_update_wq;
 	/* copygc needs its own workqueue for index updates.. */
 	struct workqueue_struct	*copygc_wq;
@@ -1109,7 +1109,7 @@ static void btree_node_read_endio(struct bio *bio)
 		bch2_latency_acct(ca, rb->start_time, READ);
 	}
 
-	queue_work(system_unbound_wq, &rb->work);
+	queue_work(c->io_complete_wq, &rb->work);
 }
 
 struct btree_node_read_all {
@@ -1362,7 +1362,8 @@ static int btree_node_read_all_replicas(struct bch_fs *c, struct btree *b, bool
 		closure_sync(&ra->cl);
 		btree_node_read_all_replicas_done(&ra->cl);
 	} else {
-		continue_at(&ra->cl, btree_node_read_all_replicas_done, system_unbound_wq);
+		continue_at(&ra->cl, btree_node_read_all_replicas_done,
+			    c->io_complete_wq);
 	}
 
 	return 0;
@@ -1433,7 +1434,7 @@ void bch2_btree_node_read(struct bch_fs *c, struct btree *b,
 		if (sync)
 			btree_node_read_work(&rb->work);
 		else
-			queue_work(system_unbound_wq, &rb->work);
+			queue_work(c->io_complete_wq, &rb->work);
 	}
 }
@@ -1600,7 +1601,7 @@ static void btree_node_write_work(struct work_struct *work)
 		bio_list_add(&c->btree_write_error_list, &wbio->wbio.bio);
 		spin_unlock_irqrestore(&c->btree_write_error_lock, flags);
 
-		queue_work(c->wq, &c->btree_write_error_work);
+		queue_work(c->btree_update_wq, &c->btree_write_error_work);
 		return;
 	}
@@ -1639,7 +1640,7 @@ static void btree_node_write_endio(struct bio *bio)
 			container_of(orig, struct btree_write_bio, wbio);
 
 		INIT_WORK(&wb->work, btree_node_write_work);
-		queue_work(system_unbound_wq, &wb->work);
+		queue_work(c->io_complete_wq, &wb->work);
 	}
 }
@@ -1900,7 +1901,7 @@ void __bch2_btree_node_write(struct bch_fs *c, struct btree *b)
 	atomic64_add(sectors_to_write, &c->btree_writes_sectors);
 
 	INIT_WORK(&wbio->work, btree_write_submit);
-	schedule_work(&wbio->work);
+	queue_work(c->io_complete_wq, &wbio->work);
 	return;
 err:
 	set_btree_node_noevict(b);
@@ -908,7 +908,8 @@ void bch2_btree_update_done(struct btree_update *as)
 	bch2_btree_reserve_put(as);
 
-	continue_at(&as->cl, btree_update_set_nodes_written, system_freezable_wq);
+	continue_at(&as->cl, btree_update_set_nodes_written,
+		    as->c->btree_interior_update_worker);
 }
 
 struct btree_update *
@@ -1847,7 +1848,7 @@ void bch2_btree_node_rewrite_async(struct bch_fs *c, struct btree *b)
 	a->seq		= b->data->keys.seq;
 
 	INIT_WORK(&a->work, async_btree_node_rewrite_work);
-	queue_work(system_long_wq, &a->work);
+	queue_work(c->btree_interior_update_worker, &a->work);
 }
 
 static void __bch2_btree_node_update_key(struct bch_fs *c,
@@ -1440,7 +1440,7 @@ static void promote_start(struct promote_op *op, struct bch_read_bio *rbio)
 	bch2_migrate_read_done(&op->write, rbio);
 
 	closure_init(cl, NULL);
-	closure_call(&op->write.op.cl, bch2_write, c->wq, cl);
+	closure_call(&op->write.op.cl, bch2_write, c->btree_update_wq, cl);
 	closure_return_with_destructor(cl, promote_done);
 }
@@ -1823,6 +1823,13 @@ static void __bch2_read_endio(struct work_struct *work)
 	if (bch2_crc_cmp(csum, rbio->pick.crc.csum) && !c->opts.no_data_io)
 		goto csum_err;
 
+	/*
+	 * XXX
+	 * We need to rework the narrow_crcs path to deliver the read completion
+	 * first, and then punt to a different workqueue, otherwise we're
+	 * holding up reads while doing btree updates which is bad for memory
+	 * reclaim.
+	 */
 	if (unlikely(rbio->narrow_crcs))
 		bch2_rbio_narrow_crcs(rbio);
@@ -62,7 +62,7 @@ static inline struct workqueue_struct *index_update_wq(struct bch_write_op *op)
 {
 	return op->alloc_reserve == RESERVE_MOVINGGC
 		? op->c->copygc_wq
-		: op->c->wq;
+		: op->c->btree_update_wq;
 }
 
 int bch2_sum_sector_overwrites(struct btree_trans *, struct btree_iter *,
@@ -117,7 +117,9 @@ void bch2_journal_halt(struct journal *j)
 
 void __bch2_journal_buf_put(struct journal *j)
 {
-	closure_call(&j->io, bch2_journal_write, system_highpri_wq, NULL);
+	struct bch_fs *c = container_of(j, struct bch_fs, journal);
+
+	closure_call(&j->io, bch2_journal_write, c->io_complete_wq, NULL);
 }
 
 /*
@@ -303,7 +305,7 @@ static int journal_entry_open(struct journal *j)
 				       j->res_get_blocked_start);
 	j->res_get_blocked_start = 0;
 
-	mod_delayed_work(system_freezable_wq,
+	mod_delayed_work(c->io_complete_wq,
 			 &j->write_work,
 			 msecs_to_jiffies(j->write_delay_ms));
 	journal_wake(j);
@@ -1295,12 +1295,12 @@ static void journal_write_done(struct closure *cl)
 	journal_wake(j);
 
 	if (test_bit(JOURNAL_NEED_WRITE, &j->flags))
-		mod_delayed_work(system_freezable_wq, &j->write_work, 0);
+		mod_delayed_work(c->io_complete_wq, &j->write_work, 0);
 	spin_unlock(&j->lock);
 
 	if (new.unwritten_idx != new.idx &&
 	    !journal_state_count(new, new.unwritten_idx))
-		closure_call(&j->io, bch2_journal_write, system_highpri_wq, NULL);
+		closure_call(&j->io, bch2_journal_write, c->io_complete_wq, NULL);
 }
 
 static void journal_write_endio(struct bio *bio)
@@ -1367,7 +1367,7 @@ static void do_journal_write(struct closure *cl)
 			le64_to_cpu(w->data->seq);
 	}
 
-	continue_at(cl, journal_write_done, system_highpri_wq);
+	continue_at(cl, journal_write_done, c->io_complete_wq);
 	return;
 }
@@ -1506,7 +1506,7 @@ void bch2_journal_write(struct closure *cl)
 			       journal_debug_buf);
 		kfree(journal_debug_buf);
 		bch2_fatal_error(c);
-		continue_at(cl, journal_write_done, system_highpri_wq);
+		continue_at(cl, journal_write_done, c->io_complete_wq);
 		return;
 	}
@@ -1537,14 +1537,14 @@ void bch2_journal_write(struct closure *cl)
 	bch2_bucket_seq_cleanup(c);
 
-	continue_at(cl, do_journal_write, system_highpri_wq);
+	continue_at(cl, do_journal_write, c->io_complete_wq);
 	return;
 no_io:
 	bch2_bucket_seq_cleanup(c);
 
-	continue_at(cl, journal_write_done, system_highpri_wq);
+	continue_at(cl, journal_write_done, c->io_complete_wq);
 	return;
 err:
 	bch2_inconsistent_error(c);
-	continue_at(cl, journal_write_done, system_highpri_wq);
+	continue_at(cl, journal_write_done, c->io_complete_wq);
 }
@@ -510,10 +510,12 @@ static void __bch2_fs_free(struct bch_fs *c)
 	kfree(c->unused_inode_hints);
 	free_heap(&c->copygc_heap);
 
+	if (c->io_complete_wq)
+		destroy_workqueue(c->io_complete_wq);
 	if (c->copygc_wq)
 		destroy_workqueue(c->copygc_wq);
-	if (c->wq)
-		destroy_workqueue(c->wq);
+	if (c->btree_update_wq)
+		destroy_workqueue(c->btree_update_wq);
 
 	bch2_free_super(&c->disk_sb);
 	kvpfree(c, sizeof(*c));
@@ -762,10 +764,12 @@ static struct bch_fs *bch2_fs_alloc(struct bch_sb *sb, struct bch_opts opts)
 	c->inode_shard_bits = ilog2(roundup_pow_of_two(num_possible_cpus()));
 
-	if (!(c->wq = alloc_workqueue("bcachefs",
+	if (!(c->btree_update_wq = alloc_workqueue("bcachefs",
 				WQ_FREEZABLE|WQ_MEM_RECLAIM, 1)) ||
 	    !(c->copygc_wq = alloc_workqueue("bcachefs_copygc",
 				WQ_FREEZABLE|WQ_MEM_RECLAIM|WQ_CPU_INTENSIVE, 1)) ||
+	    !(c->io_complete_wq = alloc_workqueue("bcachefs_io",
+				WQ_FREEZABLE|WQ_HIGHPRI|WQ_MEM_RECLAIM, 1)) ||
 	    percpu_ref_init(&c->writes, bch2_writes_disabled,
 			    PERCPU_REF_INIT_DEAD, GFP_KERNEL) ||
 	    mempool_init_kmalloc_pool(&c->fill_iter, 1, iter_size) ||
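Many of the hunks above only change the workqueue argument passed to
bcachefs's closure primitives. As a reading aid, here is roughly what those
primitives do, paraphrased from the closure implementation bcachefs shares
with bcache (drivers/md/bcache/closure.h); a simplified sketch, not the
verbatim source:

	/* Arrange for _fn to run on _wq once the closure's outstanding
	 * references are dropped, then release the current function's ref: */
	#define continue_at(_cl, _fn, _wq)			\
	do {							\
		set_closure_fn(_cl, _fn, _wq);			\
		closure_sub(_cl, CLOSURE_RUNNING + 1);		\
	} while (0)

	/* Initialize a closure (optionally as a child of parent) and kick
	 * off fn, with wq as the workqueue for its continuations: */
	static inline void closure_call(struct closure *cl, closure_fn fn,
					struct workqueue_struct *wq,
					struct closure *parent)
	{
		closure_init(cl, parent);
		continue_at_nobarrier(cl, fn, wq);
	}

So switching the wq argument from a system workqueue to c->io_complete_wq
moves every subsequent closure step onto the filesystem's own
rescuer-backed, WQ_MEM_RECLAIM queue.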