Commit 09caeabe authored by Kent Overstreet's avatar Kent Overstreet

bcachefs: btree write buffer now slurps keys from journal

Previosuly, the transaction commit path would have to add keys to the
btree write buffer as a separate operation, requiring additional global
synchronization.

This patch introduces a new journal entry type, which indicates that the
keys need to be copied into the btree write buffer prior to being
written out. We switch the journal entry type back to
JSET_ENTRY_btree_keys prior to write, so this is not an on disk format
change.

Flushing the btree write buffer may require pulling keys out of journal
entries yet to be written, and quiescing outstanding journal
reservations; we previously added journal->buf_lock for synchronization
with the journal write path.

We also can't put strict bounds on the number of keys in the journal
destined for the write buffer, which means we might overflow the size of
the preallocated buffer and have to reallocate - this introduces a
potentially fatal memory allocation failure. This is something we'll
have to watch for, if it becomes an issue in practice we can do
additional mitigation.

The transaction commit path no longer has to explicitly check if the
write buffer is full and wait on flushing; this is another performance
optimization. Instead, when the btree write buffer is close to full we
change the journal watermark, so that only reservations for journal
reclaim are allowed.
Signed-off-by: default avatarKent Overstreet <kent.overstreet@linux.dev>
parent b05c0e93
...@@ -427,6 +427,7 @@ BCH_DEBUG_PARAMS_DEBUG() ...@@ -427,6 +427,7 @@ BCH_DEBUG_PARAMS_DEBUG()
x(blocked_journal_max_in_flight) \ x(blocked_journal_max_in_flight) \
x(blocked_allocate) \ x(blocked_allocate) \
x(blocked_allocate_open_bucket) \ x(blocked_allocate_open_bucket) \
x(blocked_write_buffer_full) \
x(nocow_lock_contended) x(nocow_lock_contended)
enum bch_time_stats { enum bch_time_stats {
...@@ -1122,6 +1123,16 @@ static inline void bch2_write_ref_get(struct bch_fs *c, enum bch_write_ref ref) ...@@ -1122,6 +1123,16 @@ static inline void bch2_write_ref_get(struct bch_fs *c, enum bch_write_ref ref)
#endif #endif
} }
static inline bool __bch2_write_ref_tryget(struct bch_fs *c, enum bch_write_ref ref)
{
#ifdef BCH_WRITE_REF_DEBUG
return !test_bit(BCH_FS_going_ro, &c->flags) &&
atomic_long_inc_not_zero(&c->writes[ref]);
#else
return percpu_ref_tryget(&c->writes);
#endif
}
static inline bool bch2_write_ref_tryget(struct bch_fs *c, enum bch_write_ref ref) static inline bool bch2_write_ref_tryget(struct bch_fs *c, enum bch_write_ref ref)
{ {
#ifdef BCH_WRITE_REF_DEBUG #ifdef BCH_WRITE_REF_DEBUG
......
...@@ -2162,7 +2162,8 @@ static inline __u64 __bset_magic(struct bch_sb *sb) ...@@ -2162,7 +2162,8 @@ static inline __u64 __bset_magic(struct bch_sb *sb)
x(clock, 7) \ x(clock, 7) \
x(dev_usage, 8) \ x(dev_usage, 8) \
x(log, 9) \ x(log, 9) \
x(overwrite, 10) x(overwrite, 10) \
x(write_buffer_keys, 11)
enum { enum {
#define x(f, nr) BCH_JSET_ENTRY_##f = nr, #define x(f, nr) BCH_JSET_ENTRY_##f = nr,
......
...@@ -659,10 +659,6 @@ bch2_trans_commit_write_locked(struct btree_trans *trans, unsigned flags, ...@@ -659,10 +659,6 @@ bch2_trans_commit_write_locked(struct btree_trans *trans, unsigned flags,
i->k->k.needs_whiteout = false; i->k->k.needs_whiteout = false;
} }
if (trans->nr_wb_updates &&
trans->nr_wb_updates + c->btree_write_buffer.state.nr > c->btree_write_buffer.size)
return -BCH_ERR_btree_insert_need_flush_buffer;
/* /*
* Don't get journal reservation until after we know insert will * Don't get journal reservation until after we know insert will
* succeed: * succeed:
...@@ -697,14 +693,6 @@ bch2_trans_commit_write_locked(struct btree_trans *trans, unsigned flags, ...@@ -697,14 +693,6 @@ bch2_trans_commit_write_locked(struct btree_trans *trans, unsigned flags,
bch2_trans_fs_usage_apply(trans, trans->fs_usage_deltas)) bch2_trans_fs_usage_apply(trans, trans->fs_usage_deltas))
return -BCH_ERR_btree_insert_need_mark_replicas; return -BCH_ERR_btree_insert_need_mark_replicas;
if (trans->nr_wb_updates) {
EBUG_ON(flags & BCH_TRANS_COMMIT_no_journal_res);
ret = bch2_btree_insert_keys_write_buffer(trans);
if (ret)
goto revert_fs_usage;
}
h = trans->hooks; h = trans->hooks;
while (h) { while (h) {
ret = h->fn(trans, h); ret = h->fn(trans, h);
...@@ -757,7 +745,7 @@ bch2_trans_commit_write_locked(struct btree_trans *trans, unsigned flags, ...@@ -757,7 +745,7 @@ bch2_trans_commit_write_locked(struct btree_trans *trans, unsigned flags,
trans_for_each_wb_update(trans, wb) { trans_for_each_wb_update(trans, wb) {
entry = bch2_journal_add_entry(j, &trans->journal_res, entry = bch2_journal_add_entry(j, &trans->journal_res,
BCH_JSET_ENTRY_btree_keys, BCH_JSET_ENTRY_write_buffer_keys,
wb->btree, 0, wb->btree, 0,
wb->k.k.u64s); wb->k.k.u64s);
bkey_copy((struct bkey_i *) entry->start, &wb->k); bkey_copy((struct bkey_i *) entry->start, &wb->k);
...@@ -948,30 +936,6 @@ int bch2_trans_commit_error(struct btree_trans *trans, unsigned flags, ...@@ -948,30 +936,6 @@ int bch2_trans_commit_error(struct btree_trans *trans, unsigned flags,
ret = bch2_trans_relock(trans); ret = bch2_trans_relock(trans);
break; break;
case -BCH_ERR_btree_insert_need_flush_buffer: {
struct btree_write_buffer *wb = &c->btree_write_buffer;
ret = 0;
if (wb->state.nr > wb->size * 3 / 4) {
bch2_trans_unlock(trans);
mutex_lock(&wb->flush_lock);
if (wb->state.nr > wb->size * 3 / 4) {
bch2_trans_begin(trans);
ret = bch2_btree_write_buffer_flush_locked(trans);
mutex_unlock(&wb->flush_lock);
if (!ret) {
trace_and_count(c, trans_restart_write_buffer_flush, trans, _THIS_IP_);
ret = btree_trans_restart(trans, BCH_ERR_transaction_restart_write_buffer_flush);
}
} else {
mutex_unlock(&wb->flush_lock);
ret = bch2_trans_relock(trans);
}
}
break;
}
default: default:
BUG_ON(ret >= 0); BUG_ON(ret >= 0);
break; break;
...@@ -1070,20 +1034,6 @@ int __bch2_trans_commit(struct btree_trans *trans, unsigned flags) ...@@ -1070,20 +1034,6 @@ int __bch2_trans_commit(struct btree_trans *trans, unsigned flags)
goto out_reset; goto out_reset;
} }
if (c->btree_write_buffer.state.nr > c->btree_write_buffer.size / 2 &&
mutex_trylock(&c->btree_write_buffer.flush_lock)) {
bch2_trans_begin(trans);
bch2_trans_unlock(trans);
ret = bch2_btree_write_buffer_flush_locked(trans);
mutex_unlock(&c->btree_write_buffer.flush_lock);
if (!ret) {
trace_and_count(c, trans_restart_write_buffer_flush, trans, _THIS_IP_);
ret = btree_trans_restart(trans, BCH_ERR_transaction_restart_write_buffer_flush);
}
goto out;
}
EBUG_ON(test_bit(BCH_FS_clean_shutdown, &c->flags)); EBUG_ON(test_bit(BCH_FS_clean_shutdown, &c->flags));
trans->journal_u64s = trans->journal_entries_u64s; trans->journal_u64s = trans->journal_entries_u64s;
......
...@@ -7,30 +7,25 @@ ...@@ -7,30 +7,25 @@
#include "btree_write_buffer.h" #include "btree_write_buffer.h"
#include "error.h" #include "error.h"
#include "journal.h" #include "journal.h"
#include "journal_io.h"
#include "journal_reclaim.h" #include "journal_reclaim.h"
#include <linux/prefetch.h>
#include <linux/sort.h> #include <linux/sort.h>
static int bch2_btree_write_buffer_journal_flush(struct journal *, static int bch2_btree_write_buffer_journal_flush(struct journal *,
struct journal_entry_pin *, u64); struct journal_entry_pin *, u64);
static int btree_write_buffered_key_cmp(const void *_l, const void *_r) static int bch2_journal_keys_to_write_buffer(struct bch_fs *, struct journal_buf *);
{
const struct btree_write_buffered_key *l = _l;
const struct btree_write_buffered_key *r = _r;
return cmp_int(l->btree, r->btree) ?:
bpos_cmp(l->k.k.p, r->k.k.p) ?:
cmp_int(l->journal_seq, r->journal_seq) ?:
cmp_int(l->journal_offset, r->journal_offset);
}
static int btree_write_buffered_journal_cmp(const void *_l, const void *_r) static inline int wb_key_cmp(const void *_l, const void *_r)
{ {
const struct btree_write_buffered_key *l = _l; const struct wb_key_ref *l = _l;
const struct btree_write_buffered_key *r = _r; const struct wb_key_ref *r = _r;
return cmp_int(l->journal_seq, r->journal_seq); return cmp_int(l->hi, r->hi) ?:
cmp_int(l->mi, r->mi) ?:
cmp_int(l->lo, r->lo);
} }
static noinline int wb_flush_one_slowpath(struct btree_trans *trans, static noinline int wb_flush_one_slowpath(struct btree_trans *trans,
...@@ -59,6 +54,9 @@ static inline int wb_flush_one(struct btree_trans *trans, struct btree_iter *ite ...@@ -59,6 +54,9 @@ static inline int wb_flush_one(struct btree_trans *trans, struct btree_iter *ite
int ret; int ret;
EBUG_ON(!wb->journal_seq); EBUG_ON(!wb->journal_seq);
EBUG_ON(!c->btree_write_buffer.flushing.pin.seq);
EBUG_ON(c->btree_write_buffer.flushing.pin.seq > wb->journal_seq);
ret = bch2_btree_iter_traverse(iter); ret = bch2_btree_iter_traverse(iter);
if (ret) if (ret)
return ret; return ret;
...@@ -91,26 +89,6 @@ static inline int wb_flush_one(struct btree_trans *trans, struct btree_iter *ite ...@@ -91,26 +89,6 @@ static inline int wb_flush_one(struct btree_trans *trans, struct btree_iter *ite
return 0; return 0;
} }
static union btree_write_buffer_state btree_write_buffer_switch(struct btree_write_buffer *wb)
{
union btree_write_buffer_state old, new;
u64 v = READ_ONCE(wb->state.v);
do {
old.v = new.v = v;
new.nr = 0;
new.idx++;
} while ((v = atomic64_cmpxchg_acquire(&wb->state.counter, old.v, new.v)) != old.v);
while (old.idx == 0 ? wb->state.ref0 : wb->state.ref1)
cpu_relax();
smp_mb();
return old;
}
/* /*
* Update a btree with a write buffered key using the journal seq of the * Update a btree with a write buffered key using the journal seq of the
* original write buffer insert. * original write buffer insert.
...@@ -140,28 +118,79 @@ btree_write_buffered_insert(struct btree_trans *trans, ...@@ -140,28 +118,79 @@ btree_write_buffered_insert(struct btree_trans *trans,
return ret; return ret;
} }
int bch2_btree_write_buffer_flush_locked(struct btree_trans *trans) static void move_keys_from_inc_to_flushing(struct btree_write_buffer *wb)
{
struct bch_fs *c = container_of(wb, struct bch_fs, btree_write_buffer);
struct journal *j = &c->journal;
if (!wb->inc.keys.nr)
return;
bch2_journal_pin_add(j, wb->inc.keys.data[0].journal_seq, &wb->flushing.pin,
bch2_btree_write_buffer_journal_flush);
darray_resize(&wb->flushing.keys, min_t(size_t, 1U << 20, wb->flushing.keys.nr + wb->inc.keys.nr));
darray_resize(&wb->sorted, wb->flushing.keys.size);
if (!wb->flushing.keys.nr && wb->sorted.size >= wb->inc.keys.nr) {
swap(wb->flushing.keys, wb->inc.keys);
goto out;
}
size_t nr = min(darray_room(wb->flushing.keys),
wb->sorted.size - wb->flushing.keys.nr);
nr = min(nr, wb->inc.keys.nr);
memcpy(&darray_top(wb->flushing.keys),
wb->inc.keys.data,
sizeof(wb->inc.keys.data[0]) * nr);
memmove(wb->inc.keys.data,
wb->inc.keys.data + nr,
sizeof(wb->inc.keys.data[0]) * (wb->inc.keys.nr - nr));
wb->flushing.keys.nr += nr;
wb->inc.keys.nr -= nr;
out:
if (!wb->inc.keys.nr)
bch2_journal_pin_drop(j, &wb->inc.pin);
else
bch2_journal_pin_update(j, wb->inc.keys.data[0].journal_seq, &wb->inc.pin,
bch2_btree_write_buffer_journal_flush);
if (j->watermark) {
spin_lock(&j->lock);
bch2_journal_set_watermark(j);
spin_unlock(&j->lock);
}
BUG_ON(wb->sorted.size < wb->flushing.keys.nr);
}
static int bch2_btree_write_buffer_flush_locked(struct btree_trans *trans)
{ {
struct bch_fs *c = trans->c; struct bch_fs *c = trans->c;
struct journal *j = &c->journal; struct journal *j = &c->journal;
struct btree_write_buffer *wb = &c->btree_write_buffer; struct btree_write_buffer *wb = &c->btree_write_buffer;
struct journal_entry_pin pin; struct wb_key_ref *i;
struct btree_write_buffered_key *i, *keys;
struct btree_iter iter = { NULL }; struct btree_iter iter = { NULL };
size_t nr = 0, skipped = 0, fast = 0, slowpath = 0; size_t skipped = 0, fast = 0, slowpath = 0;
bool write_locked = false; bool write_locked = false;
union btree_write_buffer_state s;
int ret = 0; int ret = 0;
memset(&pin, 0, sizeof(pin)); bch2_trans_unlock(trans);
bch2_trans_begin(trans);
bch2_journal_pin_copy(j, &pin, &wb->journal_pin, mutex_lock(&wb->inc.lock);
bch2_btree_write_buffer_journal_flush); move_keys_from_inc_to_flushing(wb);
bch2_journal_pin_drop(j, &wb->journal_pin); mutex_unlock(&wb->inc.lock);
s = btree_write_buffer_switch(wb); for (size_t i = 0; i < wb->flushing.keys.nr; i++) {
keys = wb->keys[s.idx]; wb->sorted.data[i].idx = i;
nr = s.nr; wb->sorted.data[i].btree = wb->flushing.keys.data[i].btree;
wb->sorted.data[i].pos = wb->flushing.keys.data[i].k.k.p;
}
wb->sorted.nr = wb->flushing.keys.nr;
/* /*
* We first sort so that we can detect and skip redundant updates, and * We first sort so that we can detect and skip redundant updates, and
...@@ -177,32 +206,46 @@ int bch2_btree_write_buffer_flush_locked(struct btree_trans *trans) ...@@ -177,32 +206,46 @@ int bch2_btree_write_buffer_flush_locked(struct btree_trans *trans)
* If that happens, simply skip the key so we can optimistically insert * If that happens, simply skip the key so we can optimistically insert
* as many keys as possible in the fast path. * as many keys as possible in the fast path.
*/ */
sort(keys, nr, sizeof(keys[0]), sort(wb->sorted.data, wb->sorted.nr,
btree_write_buffered_key_cmp, NULL); sizeof(wb->sorted.data[0]),
wb_key_cmp, NULL);
darray_for_each(wb->sorted, i) {
struct btree_write_buffered_key *k = &wb->flushing.keys.data[i->idx];
for (struct wb_key_ref *n = i + 1; n < min(i + 4, &darray_top(wb->sorted)); n++)
prefetch(&wb->flushing.keys.data[n->idx]);
for (i = keys; i < keys + nr; i++) { BUG_ON(!k->journal_seq);
if (i + 1 < keys + nr &&
if (i + 1 < &darray_top(wb->sorted) &&
i[0].btree == i[1].btree && i[0].btree == i[1].btree &&
bpos_eq(i[0].k.k.p, i[1].k.k.p)) { bpos_eq(i[0].pos, i[1].pos)) {
struct btree_write_buffered_key *n = &wb->flushing.keys.data[i[1].idx];
skipped++; skipped++;
i->journal_seq = 0; n->journal_seq = min_t(u64, n->journal_seq, k->journal_seq);
k->journal_seq = 0;
continue; continue;
} }
if (write_locked && if (write_locked) {
(iter.path->btree_id != i->btree || struct btree_path *path = iter.path;
bpos_gt(i->k.k.p, iter.path->l[0].b->key.k.p))) {
bch2_btree_node_unlock_write(trans, iter.path, iter.path->l[0].b); if (path->btree_id != i->btree ||
bpos_gt(k->k.k.p, path->l[0].b->key.k.p)) {
bch2_btree_node_unlock_write(trans, path, path->l[0].b);
write_locked = false; write_locked = false;
} }
}
if (!iter.path || iter.path->btree_id != i->btree) { if (!iter.path || iter.btree_id != k->btree) {
bch2_trans_iter_exit(trans, &iter); bch2_trans_iter_exit(trans, &iter);
bch2_trans_iter_init(trans, &iter, i->btree, i->k.k.p, bch2_trans_iter_init(trans, &iter, k->btree, k->k.k.p,
BTREE_ITER_INTENT|BTREE_ITER_ALL_SNAPSHOTS); BTREE_ITER_INTENT|BTREE_ITER_ALL_SNAPSHOTS);
} }
bch2_btree_iter_set_pos(&iter, i->k.k.p); bch2_btree_iter_set_pos(&iter, k->k.k.p);
iter.path->preserve = false; iter.path->preserve = false;
do { do {
...@@ -211,13 +254,13 @@ int bch2_btree_write_buffer_flush_locked(struct btree_trans *trans) ...@@ -211,13 +254,13 @@ int bch2_btree_write_buffer_flush_locked(struct btree_trans *trans)
break; break;
} }
ret = wb_flush_one(trans, &iter, i, &write_locked, &fast); ret = wb_flush_one(trans, &iter, k, &write_locked, &fast);
if (!write_locked) if (!write_locked)
bch2_trans_begin(trans); bch2_trans_begin(trans);
} while (bch2_err_matches(ret, BCH_ERR_transaction_restart)); } while (bch2_err_matches(ret, BCH_ERR_transaction_restart));
if (!ret) { if (!ret) {
i->journal_seq = 0; k->journal_seq = 0;
} else if (ret == -BCH_ERR_journal_reclaim_would_deadlock) { } else if (ret == -BCH_ERR_journal_reclaim_would_deadlock) {
slowpath++; slowpath++;
ret = 0; ret = 0;
...@@ -239,19 +282,18 @@ int bch2_btree_write_buffer_flush_locked(struct btree_trans *trans) ...@@ -239,19 +282,18 @@ int bch2_btree_write_buffer_flush_locked(struct btree_trans *trans)
* The fastpath zapped the seq of keys that were successfully flushed so * The fastpath zapped the seq of keys that were successfully flushed so
* we can skip those here. * we can skip those here.
*/ */
trace_and_count(c, write_buffer_flush_slowpath, trans, slowpath, nr); trace_and_count(c, write_buffer_flush_slowpath, trans, slowpath, wb->flushing.keys.nr);
sort(keys, nr, sizeof(keys[0]),
btree_write_buffered_journal_cmp,
NULL);
for (i = keys; i < keys + nr; i++) { struct btree_write_buffered_key *i;
darray_for_each(wb->flushing.keys, i) {
if (!i->journal_seq) if (!i->journal_seq)
continue; continue;
bch2_journal_pin_update(j, i->journal_seq, &pin, bch2_journal_pin_update(j, i->journal_seq, &wb->flushing.pin,
bch2_btree_write_buffer_journal_flush); bch2_btree_write_buffer_journal_flush);
bch2_trans_begin(trans);
ret = commit_do(trans, NULL, NULL, ret = commit_do(trans, NULL, NULL,
BCH_WATERMARK_reclaim| BCH_WATERMARK_reclaim|
BCH_TRANS_COMMIT_no_check_rw| BCH_TRANS_COMMIT_no_check_rw|
...@@ -265,37 +307,78 @@ int bch2_btree_write_buffer_flush_locked(struct btree_trans *trans) ...@@ -265,37 +307,78 @@ int bch2_btree_write_buffer_flush_locked(struct btree_trans *trans)
} }
err: err:
bch2_fs_fatal_err_on(ret, c, "%s: insert error %s", __func__, bch2_err_str(ret)); bch2_fs_fatal_err_on(ret, c, "%s: insert error %s", __func__, bch2_err_str(ret));
trace_write_buffer_flush(trans, nr, skipped, fast, wb->size); trace_write_buffer_flush(trans, wb->flushing.keys.nr, skipped, fast, 0);
bch2_journal_pin_drop(j, &pin); bch2_journal_pin_drop(j, &wb->flushing.pin);
wb->flushing.keys.nr = 0;
return ret; return ret;
} }
int bch2_btree_write_buffer_flush_sync(struct btree_trans *trans) static int fetch_wb_keys_from_journal(struct bch_fs *c, u64 seq)
{ {
struct bch_fs *c = trans->c; struct journal *j = &c->journal;
struct journal_buf *buf;
int ret = 0;
if (!bch2_write_ref_tryget(c, BCH_WRITE_REF_btree_write_buffer)) while (!ret && (buf = bch2_next_write_buffer_flush_journal_buf(j, seq))) {
return -BCH_ERR_erofs_no_writes; ret = bch2_journal_keys_to_write_buffer(c, buf);
mutex_unlock(&j->buf_lock);
}
trace_and_count(c, write_buffer_flush_sync, trans, _RET_IP_); return ret;
}
static int btree_write_buffer_flush_seq(struct btree_trans *trans, u64 seq)
{
struct bch_fs *c = trans->c;
struct btree_write_buffer *wb = &c->btree_write_buffer;
int ret = 0, fetch_from_journal_err;
do {
bch2_trans_unlock(trans); bch2_trans_unlock(trans);
mutex_lock(&c->btree_write_buffer.flush_lock);
int ret = bch2_btree_write_buffer_flush_locked(trans); fetch_from_journal_err = fetch_wb_keys_from_journal(c, seq);
mutex_unlock(&c->btree_write_buffer.flush_lock);
bch2_write_ref_put(c, BCH_WRITE_REF_btree_write_buffer); /*
* On memory allocation failure, bch2_btree_write_buffer_flush_locked()
* is not guaranteed to empty wb->inc:
*/
mutex_lock(&wb->flushing.lock);
ret = bch2_btree_write_buffer_flush_locked(trans);
mutex_unlock(&wb->flushing.lock);
} while (!ret &&
(fetch_from_journal_err ||
(wb->inc.pin.seq && wb->inc.pin.seq <= seq) ||
(wb->flushing.pin.seq && wb->flushing.pin.seq <= seq)));
return ret; return ret;
} }
static int bch2_btree_write_buffer_journal_flush(struct journal *j,
struct journal_entry_pin *_pin, u64 seq)
{
struct bch_fs *c = container_of(j, struct bch_fs, journal);
return bch2_trans_run(c, btree_write_buffer_flush_seq(trans, seq));
}
int bch2_btree_write_buffer_flush_sync(struct btree_trans *trans)
{
struct bch_fs *c = trans->c;
trace_and_count(c, write_buffer_flush_sync, trans, _RET_IP_);
return btree_write_buffer_flush_seq(trans, journal_cur_seq(&c->journal));
}
int bch2_btree_write_buffer_flush_nocheck_rw(struct btree_trans *trans) int bch2_btree_write_buffer_flush_nocheck_rw(struct btree_trans *trans)
{ {
struct bch_fs *c = trans->c; struct bch_fs *c = trans->c;
struct btree_write_buffer *wb = &c->btree_write_buffer; struct btree_write_buffer *wb = &c->btree_write_buffer;
int ret = 0; int ret = 0;
if (mutex_trylock(&wb->flush_lock)) { if (mutex_trylock(&wb->flushing.lock)) {
ret = bch2_btree_write_buffer_flush_locked(trans); ret = bch2_btree_write_buffer_flush_locked(trans);
mutex_unlock(&wb->flush_lock); mutex_unlock(&wb->flushing.lock);
} }
return ret; return ret;
...@@ -313,90 +396,179 @@ int bch2_btree_write_buffer_tryflush(struct btree_trans *trans) ...@@ -313,90 +396,179 @@ int bch2_btree_write_buffer_tryflush(struct btree_trans *trans)
return ret; return ret;
} }
static int bch2_btree_write_buffer_journal_flush(struct journal *j, static void bch2_btree_write_buffer_flush_work(struct work_struct *work)
struct journal_entry_pin *_pin, u64 seq)
{ {
struct bch_fs *c = container_of(j, struct bch_fs, journal); struct bch_fs *c = container_of(work, struct bch_fs, btree_write_buffer.flush_work);
struct btree_write_buffer *wb = &c->btree_write_buffer; struct btree_write_buffer *wb = &c->btree_write_buffer;
int ret;
mutex_lock(&wb->flush_lock); mutex_lock(&wb->flushing.lock);
int ret = bch2_trans_run(c, bch2_btree_write_buffer_flush_locked(trans)); do {
mutex_unlock(&wb->flush_lock); ret = bch2_trans_run(c, bch2_btree_write_buffer_flush_locked(trans));
} while (!ret && bch2_btree_write_buffer_should_flush(c));
mutex_unlock(&wb->flushing.lock);
return ret; bch2_write_ref_put(c, BCH_WRITE_REF_btree_write_buffer);
} }
static inline u64 btree_write_buffer_ref(int idx) int __bch2_journal_key_to_wb(struct bch_fs *c,
struct journal_keys_to_wb *dst,
enum btree_id btree, struct bkey_i *k)
{ {
return ((union btree_write_buffer_state) { struct btree_write_buffer *wb = &c->btree_write_buffer;
.ref0 = idx == 0, int ret;
.ref1 = idx == 1, retry:
}).v; ret = darray_make_room_gfp(&dst->wb->keys, 1, GFP_KERNEL);
if (!ret && dst->wb == &wb->flushing)
ret = darray_resize(&wb->sorted, wb->flushing.keys.size);
if (unlikely(ret)) {
if (dst->wb == &c->btree_write_buffer.flushing) {
mutex_unlock(&dst->wb->lock);
dst->wb = &c->btree_write_buffer.inc;
bch2_journal_pin_add(&c->journal, dst->seq, &dst->wb->pin,
bch2_btree_write_buffer_journal_flush);
goto retry;
}
return ret;
}
dst->room = darray_room(dst->wb->keys);
if (dst->wb == &wb->flushing)
dst->room = min(dst->room, wb->sorted.size - wb->flushing.keys.nr);
BUG_ON(!dst->room);
BUG_ON(!dst->seq);
struct btree_write_buffered_key *wb_k = &darray_top(dst->wb->keys);
wb_k->journal_seq = dst->seq;
wb_k->btree = btree;
bkey_copy(&wb_k->k, k);
dst->wb->keys.nr++;
dst->room--;
return 0;
} }
int bch2_btree_insert_keys_write_buffer(struct btree_trans *trans) void bch2_journal_keys_to_write_buffer_start(struct bch_fs *c, struct journal_keys_to_wb *dst, u64 seq)
{ {
struct bch_fs *c = trans->c;
struct btree_write_buffer *wb = &c->btree_write_buffer; struct btree_write_buffer *wb = &c->btree_write_buffer;
struct btree_write_buffered_key *i;
union btree_write_buffer_state old, new;
int ret = 0;
u64 v;
trans_for_each_wb_update(trans, i) { if (mutex_trylock(&wb->flushing.lock)) {
EBUG_ON(i->k.k.u64s > BTREE_WRITE_BUFERED_U64s_MAX); mutex_lock(&wb->inc.lock);
move_keys_from_inc_to_flushing(wb);
/*
* Attempt to skip wb->inc, and add keys directly to
* wb->flushing, saving us a copy later:
*/
i->journal_seq = trans->journal_res.seq; if (!wb->inc.keys.nr) {
i->journal_offset = trans->journal_res.offset; dst->wb = &wb->flushing;
} else {
mutex_unlock(&wb->flushing.lock);
dst->wb = &wb->inc;
}
} else {
mutex_lock(&wb->inc.lock);
dst->wb = &wb->inc;
} }
preempt_disable(); dst->room = darray_room(dst->wb->keys);
v = READ_ONCE(wb->state.v); if (dst->wb == &wb->flushing)
do { dst->room = min(dst->room, wb->sorted.size - wb->flushing.keys.nr);
old.v = new.v = v; dst->seq = seq;
bch2_journal_pin_add(&c->journal, seq, &dst->wb->pin,
bch2_btree_write_buffer_journal_flush);
}
void bch2_journal_keys_to_write_buffer_end(struct bch_fs *c, struct journal_keys_to_wb *dst)
{
struct btree_write_buffer *wb = &c->btree_write_buffer;
if (!dst->wb->keys.nr)
bch2_journal_pin_drop(&c->journal, &dst->wb->pin);
if (bch2_btree_write_buffer_should_flush(c) &&
__bch2_write_ref_tryget(c, BCH_WRITE_REF_btree_write_buffer) &&
!queue_work(system_unbound_wq, &c->btree_write_buffer.flush_work))
bch2_write_ref_put(c, BCH_WRITE_REF_btree_write_buffer);
if (dst->wb == &wb->flushing)
mutex_unlock(&wb->flushing.lock);
mutex_unlock(&wb->inc.lock);
}
static int bch2_journal_keys_to_write_buffer(struct bch_fs *c, struct journal_buf *buf)
{
struct journal_keys_to_wb dst;
struct jset_entry *entry;
struct bkey_i *k;
int ret = 0;
bch2_journal_keys_to_write_buffer_start(c, &dst, le64_to_cpu(buf->data->seq));
new.v += btree_write_buffer_ref(new.idx); for_each_jset_entry_type(entry, buf->data, BCH_JSET_ENTRY_write_buffer_keys) {
new.nr += trans->nr_wb_updates; jset_entry_for_each_key(entry, k) {
if (new.nr > wb->size) { ret = bch2_journal_key_to_wb(c, &dst, entry->btree_id, k);
ret = -BCH_ERR_btree_insert_need_flush_buffer; if (ret)
goto out; goto out;
} }
} while ((v = atomic64_cmpxchg_acquire(&wb->state.counter, old.v, new.v)) != old.v);
memcpy(wb->keys[new.idx] + old.nr,
trans->wb_updates,
sizeof(trans->wb_updates[0]) * trans->nr_wb_updates);
bch2_journal_pin_add(&c->journal, trans->journal_res.seq, &wb->journal_pin, entry->type = BCH_JSET_ENTRY_btree_keys;
bch2_btree_write_buffer_journal_flush); }
atomic64_sub_return_release(btree_write_buffer_ref(new.idx), &wb->state.counter); buf->need_flush_to_write_buffer = false;
out: out:
preempt_enable(); bch2_journal_keys_to_write_buffer_end(c, &dst);
return ret; return ret;
} }
static int wb_keys_resize(struct btree_write_buffer_keys *wb, size_t new_size)
{
if (wb->keys.size >= new_size)
return 0;
if (!mutex_trylock(&wb->lock))
return -EINTR;
int ret = darray_resize(&wb->keys, new_size);
mutex_unlock(&wb->lock);
return ret;
}
int bch2_btree_write_buffer_resize(struct bch_fs *c, size_t new_size)
{
struct btree_write_buffer *wb = &c->btree_write_buffer;
return wb_keys_resize(&wb->flushing, new_size) ?:
wb_keys_resize(&wb->inc, new_size);
}
void bch2_fs_btree_write_buffer_exit(struct bch_fs *c) void bch2_fs_btree_write_buffer_exit(struct bch_fs *c)
{ {
struct btree_write_buffer *wb = &c->btree_write_buffer; struct btree_write_buffer *wb = &c->btree_write_buffer;
BUG_ON(wb->state.nr && !bch2_journal_error(&c->journal)); BUG_ON((wb->inc.keys.nr || wb->flushing.keys.nr) &&
!bch2_journal_error(&c->journal));
kvfree(wb->keys[1]); darray_exit(&wb->sorted);
kvfree(wb->keys[0]); darray_exit(&wb->flushing.keys);
darray_exit(&wb->inc.keys);
} }
int bch2_fs_btree_write_buffer_init(struct bch_fs *c) int bch2_fs_btree_write_buffer_init(struct bch_fs *c)
{ {
struct btree_write_buffer *wb = &c->btree_write_buffer; struct btree_write_buffer *wb = &c->btree_write_buffer;
mutex_init(&wb->flush_lock); mutex_init(&wb->inc.lock);
wb->size = c->opts.btree_write_buffer_size; mutex_init(&wb->flushing.lock);
INIT_WORK(&wb->flush_work, bch2_btree_write_buffer_flush_work);
wb->keys[0] = kvmalloc_array(wb->size, sizeof(*wb->keys[0]), GFP_KERNEL); /* Will be resized by journal as needed: */
wb->keys[1] = kvmalloc_array(wb->size, sizeof(*wb->keys[1]), GFP_KERNEL); unsigned initial_size = 1 << 16;
if (!wb->keys[0] || !wb->keys[1])
return -BCH_ERR_ENOMEM_fs_btree_write_buffer_init;
return 0; return darray_make_room(&wb->inc.keys, initial_size) ?:
darray_make_room(&wb->flushing.keys, initial_size) ?:
darray_make_room(&wb->sorted, initial_size);
} }
...@@ -2,13 +2,59 @@ ...@@ -2,13 +2,59 @@
#ifndef _BCACHEFS_BTREE_WRITE_BUFFER_H #ifndef _BCACHEFS_BTREE_WRITE_BUFFER_H
#define _BCACHEFS_BTREE_WRITE_BUFFER_H #define _BCACHEFS_BTREE_WRITE_BUFFER_H
int bch2_btree_write_buffer_flush_locked(struct btree_trans *); #include "bkey.h"
int bch2_btree_write_buffer_flush_nocheck_rw(struct btree_trans *);
static inline bool bch2_btree_write_buffer_should_flush(struct bch_fs *c)
{
struct btree_write_buffer *wb = &c->btree_write_buffer;
return wb->inc.keys.nr + wb->flushing.keys.nr > wb->inc.keys.size / 4;
}
static inline bool bch2_btree_write_buffer_must_wait(struct bch_fs *c)
{
struct btree_write_buffer *wb = &c->btree_write_buffer;
return wb->inc.keys.nr > wb->inc.keys.size * 3 / 4;
}
struct btree_trans;
int bch2_btree_write_buffer_flush_sync(struct btree_trans *); int bch2_btree_write_buffer_flush_sync(struct btree_trans *);
int bch2_btree_write_buffer_flush_nocheck_rw(struct btree_trans *);
int bch2_btree_write_buffer_tryflush(struct btree_trans *); int bch2_btree_write_buffer_tryflush(struct btree_trans *);
int bch2_btree_insert_keys_write_buffer(struct btree_trans *); struct journal_keys_to_wb {
struct btree_write_buffer_keys *wb;
size_t room;
u64 seq;
};
int __bch2_journal_key_to_wb(struct bch_fs *,
struct journal_keys_to_wb *,
enum btree_id, struct bkey_i *);
static inline int bch2_journal_key_to_wb(struct bch_fs *c,
struct journal_keys_to_wb *dst,
enum btree_id btree, struct bkey_i *k)
{
EBUG_ON(!dst->seq);
if (unlikely(!dst->room))
return __bch2_journal_key_to_wb(c, dst, btree, k);
struct btree_write_buffered_key *wb_k = &darray_top(dst->wb->keys);
wb_k->journal_seq = dst->seq;
wb_k->btree = btree;
bkey_copy(&wb_k->k, k);
dst->wb->keys.nr++;
dst->room--;
return 0;
}
void bch2_journal_keys_to_write_buffer_start(struct bch_fs *, struct journal_keys_to_wb *, u64);
void bch2_journal_keys_to_write_buffer_end(struct bch_fs *, struct journal_keys_to_wb *);
int bch2_btree_write_buffer_resize(struct bch_fs *, size_t);
void bch2_fs_btree_write_buffer_exit(struct bch_fs *); void bch2_fs_btree_write_buffer_exit(struct bch_fs *);
int bch2_fs_btree_write_buffer_init(struct bch_fs *); int bch2_fs_btree_write_buffer_init(struct bch_fs *);
......
...@@ -2,43 +2,56 @@ ...@@ -2,43 +2,56 @@
#ifndef _BCACHEFS_BTREE_WRITE_BUFFER_TYPES_H #ifndef _BCACHEFS_BTREE_WRITE_BUFFER_TYPES_H
#define _BCACHEFS_BTREE_WRITE_BUFFER_TYPES_H #define _BCACHEFS_BTREE_WRITE_BUFFER_TYPES_H
#include "darray.h"
#include "journal_types.h" #include "journal_types.h"
#define BTREE_WRITE_BUFERED_VAL_U64s_MAX 4 #define BTREE_WRITE_BUFERED_VAL_U64s_MAX 4
#define BTREE_WRITE_BUFERED_U64s_MAX (BKEY_U64s + BTREE_WRITE_BUFERED_VAL_U64s_MAX) #define BTREE_WRITE_BUFERED_U64s_MAX (BKEY_U64s + BTREE_WRITE_BUFERED_VAL_U64s_MAX)
struct btree_write_buffered_key { struct wb_key_ref {
u64 journal_seq; union {
unsigned journal_offset;
enum btree_id btree;
__BKEY_PADDED(k, BTREE_WRITE_BUFERED_VAL_U64s_MAX);
};
union btree_write_buffer_state {
struct { struct {
atomic64_t counter; #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
}; unsigned idx:24;
struct bpos pos;
struct { enum btree_id btree:8;
u64 v; #else
}; enum btree_id btree:8;
struct bpos pos;
unsigned idx:24;
#endif
} __packed;
struct { struct {
u64 nr:23; #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
u64 idx:1; u64 lo;
u64 ref0:20; u64 mi;
u64 ref1:20; u64 hi;
#else
u64 hi;
u64 mi;
u64 lo;
#endif
}; };
}; };
};
struct btree_write_buffer { struct btree_write_buffered_key {
struct mutex flush_lock; enum btree_id btree:8;
struct journal_entry_pin journal_pin; u64 journal_seq:56;
__BKEY_PADDED(k, BTREE_WRITE_BUFERED_VAL_U64s_MAX);
};
union btree_write_buffer_state state; struct btree_write_buffer_keys {
size_t size; DARRAY(struct btree_write_buffered_key) keys;
struct journal_entry_pin pin;
struct mutex lock;
};
struct btree_write_buffered_key *keys[2]; struct btree_write_buffer {
DARRAY(struct wb_key_ref) sorted;
struct btree_write_buffer_keys inc;
struct btree_write_buffer_keys flushing;
struct work_struct flush_work;
}; };
#endif /* _BCACHEFS_BTREE_WRITE_BUFFER_TYPES_H */ #endif /* _BCACHEFS_BTREE_WRITE_BUFFER_TYPES_H */
...@@ -1005,7 +1005,7 @@ static int ec_stripe_update_extents(struct bch_fs *c, struct ec_stripe_buf *s) ...@@ -1005,7 +1005,7 @@ static int ec_stripe_update_extents(struct bch_fs *c, struct ec_stripe_buf *s)
unsigned i, nr_data = v->nr_blocks - v->nr_redundant; unsigned i, nr_data = v->nr_blocks - v->nr_redundant;
int ret = 0; int ret = 0;
ret = bch2_btree_write_buffer_flush_nocheck_rw(trans); ret = bch2_btree_write_buffer_flush_sync(trans);
if (ret) if (ret)
goto err; goto err;
......
...@@ -151,7 +151,6 @@ ...@@ -151,7 +151,6 @@
x(BCH_ERR_btree_insert_fail, btree_insert_need_mark_replicas) \ x(BCH_ERR_btree_insert_fail, btree_insert_need_mark_replicas) \
x(BCH_ERR_btree_insert_fail, btree_insert_need_journal_res) \ x(BCH_ERR_btree_insert_fail, btree_insert_need_journal_res) \
x(BCH_ERR_btree_insert_fail, btree_insert_need_journal_reclaim) \ x(BCH_ERR_btree_insert_fail, btree_insert_need_journal_reclaim) \
x(BCH_ERR_btree_insert_fail, btree_insert_need_flush_buffer) \
x(0, backpointer_to_overwritten_btree_node) \ x(0, backpointer_to_overwritten_btree_node) \
x(0, lock_fail_root_changed) \ x(0, lock_fail_root_changed) \
x(0, journal_reclaim_would_deadlock) \ x(0, journal_reclaim_would_deadlock) \
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include "bkey_methods.h" #include "bkey_methods.h"
#include "btree_gc.h" #include "btree_gc.h"
#include "btree_update.h" #include "btree_update.h"
#include "btree_write_buffer.h"
#include "buckets.h" #include "buckets.h"
#include "error.h" #include "error.h"
#include "journal.h" #include "journal.h"
...@@ -332,6 +333,7 @@ static int journal_entry_open(struct journal *j) ...@@ -332,6 +333,7 @@ static int journal_entry_open(struct journal *j)
buf->must_flush = false; buf->must_flush = false;
buf->separate_flush = false; buf->separate_flush = false;
buf->flush_time = 0; buf->flush_time = 0;
buf->need_flush_to_write_buffer = true;
memset(buf->data, 0, sizeof(*buf->data)); memset(buf->data, 0, sizeof(*buf->data));
buf->data->seq = cpu_to_le64(journal_cur_seq(j)); buf->data->seq = cpu_to_le64(journal_cur_seq(j));
...@@ -768,6 +770,48 @@ void bch2_journal_block(struct journal *j) ...@@ -768,6 +770,48 @@ void bch2_journal_block(struct journal *j)
journal_quiesce(j); journal_quiesce(j);
} }
static struct journal_buf *__bch2_next_write_buffer_flush_journal_buf(struct journal *j, u64 max_seq)
{
struct journal_buf *ret = NULL;
mutex_lock(&j->buf_lock);
spin_lock(&j->lock);
max_seq = min(max_seq, journal_cur_seq(j));
for (u64 seq = journal_last_unwritten_seq(j);
seq <= max_seq;
seq++) {
unsigned idx = seq & JOURNAL_BUF_MASK;
struct journal_buf *buf = j->buf + idx;
if (buf->need_flush_to_write_buffer) {
if (seq == journal_cur_seq(j))
__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL);
union journal_res_state s;
s.v = atomic64_read_acquire(&j->reservations.counter);
ret = journal_state_count(s, idx)
? ERR_PTR(-EAGAIN)
: buf;
break;
}
}
spin_unlock(&j->lock);
if (IS_ERR_OR_NULL(ret))
mutex_unlock(&j->buf_lock);
return ret;
}
struct journal_buf *bch2_next_write_buffer_flush_journal_buf(struct journal *j, u64 max_seq)
{
struct journal_buf *ret;
wait_event(j->wait, (ret = __bch2_next_write_buffer_flush_journal_buf(j, max_seq)) != ERR_PTR(-EAGAIN));
return ret;
}
/* allocate journal on a device: */ /* allocate journal on a device: */
static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr, static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr,
......
...@@ -425,6 +425,7 @@ static inline void bch2_journal_set_replay_done(struct journal *j) ...@@ -425,6 +425,7 @@ static inline void bch2_journal_set_replay_done(struct journal *j)
void bch2_journal_unblock(struct journal *); void bch2_journal_unblock(struct journal *);
void bch2_journal_block(struct journal *); void bch2_journal_block(struct journal *);
struct journal_buf *bch2_next_write_buffer_flush_journal_buf(struct journal *j, u64 max_seq);
void __bch2_journal_debug_to_text(struct printbuf *, struct journal *); void __bch2_journal_debug_to_text(struct printbuf *, struct journal *);
void bch2_journal_debug_to_text(struct printbuf *, struct journal *); void bch2_journal_debug_to_text(struct printbuf *, struct journal *);
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include "alloc_foreground.h" #include "alloc_foreground.h"
#include "btree_io.h" #include "btree_io.h"
#include "btree_update_interior.h" #include "btree_update_interior.h"
#include "btree_write_buffer.h"
#include "buckets.h" #include "buckets.h"
#include "checksum.h" #include "checksum.h"
#include "disk_groups.h" #include "disk_groups.h"
...@@ -723,6 +724,22 @@ static void journal_entry_overwrite_to_text(struct printbuf *out, struct bch_fs ...@@ -723,6 +724,22 @@ static void journal_entry_overwrite_to_text(struct printbuf *out, struct bch_fs
journal_entry_btree_keys_to_text(out, c, entry); journal_entry_btree_keys_to_text(out, c, entry);
} }
static int journal_entry_write_buffer_keys_validate(struct bch_fs *c,
struct jset *jset,
struct jset_entry *entry,
unsigned version, int big_endian,
enum bkey_invalid_flags flags)
{
return journal_entry_btree_keys_validate(c, jset, entry,
version, big_endian, READ);
}
static void journal_entry_write_buffer_keys_to_text(struct printbuf *out, struct bch_fs *c,
struct jset_entry *entry)
{
journal_entry_btree_keys_to_text(out, c, entry);
}
struct jset_entry_ops { struct jset_entry_ops {
int (*validate)(struct bch_fs *, struct jset *, int (*validate)(struct bch_fs *, struct jset *,
struct jset_entry *, unsigned, int, struct jset_entry *, unsigned, int,
...@@ -1503,6 +1520,8 @@ static int journal_write_alloc(struct journal *j, struct journal_buf *w) ...@@ -1503,6 +1520,8 @@ static int journal_write_alloc(struct journal *j, struct journal_buf *w)
static void journal_buf_realloc(struct journal *j, struct journal_buf *buf) static void journal_buf_realloc(struct journal *j, struct journal_buf *buf)
{ {
struct bch_fs *c = container_of(j, struct bch_fs, journal);
/* we aren't holding j->lock: */ /* we aren't holding j->lock: */
unsigned new_size = READ_ONCE(j->buf_size_want); unsigned new_size = READ_ONCE(j->buf_size_want);
void *new_buf; void *new_buf;
...@@ -1510,6 +1529,11 @@ static void journal_buf_realloc(struct journal *j, struct journal_buf *buf) ...@@ -1510,6 +1529,11 @@ static void journal_buf_realloc(struct journal *j, struct journal_buf *buf)
if (buf->buf_size >= new_size) if (buf->buf_size >= new_size)
return; return;
size_t btree_write_buffer_size = new_size / 64;
if (bch2_btree_write_buffer_resize(c, btree_write_buffer_size))
return;
new_buf = kvpmalloc(new_size, GFP_NOFS|__GFP_NOWARN); new_buf = kvpmalloc(new_size, GFP_NOFS|__GFP_NOWARN);
if (!new_buf) if (!new_buf)
return; return;
...@@ -1703,9 +1727,11 @@ static int bch2_journal_write_prep(struct journal *j, struct journal_buf *w) ...@@ -1703,9 +1727,11 @@ static int bch2_journal_write_prep(struct journal *j, struct journal_buf *w)
struct bch_fs *c = container_of(j, struct bch_fs, journal); struct bch_fs *c = container_of(j, struct bch_fs, journal);
struct jset_entry *start, *end, *i, *next, *prev = NULL; struct jset_entry *start, *end, *i, *next, *prev = NULL;
struct jset *jset = w->data; struct jset *jset = w->data;
struct journal_keys_to_wb wb = { NULL };
unsigned sectors, bytes, u64s; unsigned sectors, bytes, u64s;
bool validate_before_checksum = false;
unsigned long btree_roots_have = 0; unsigned long btree_roots_have = 0;
bool validate_before_checksum = false;
u64 seq = le64_to_cpu(jset->seq);
int ret; int ret;
/* /*
...@@ -1733,9 +1759,28 @@ static int bch2_journal_write_prep(struct journal *j, struct journal_buf *w) ...@@ -1733,9 +1759,28 @@ static int bch2_journal_write_prep(struct journal *j, struct journal_buf *w)
* to c->btree_roots we have to get any missing btree roots and * to c->btree_roots we have to get any missing btree roots and
* add them to this journal entry: * add them to this journal entry:
*/ */
if (i->type == BCH_JSET_ENTRY_btree_root) { switch (i->type) {
case BCH_JSET_ENTRY_btree_root:
bch2_journal_entry_to_btree_root(c, i); bch2_journal_entry_to_btree_root(c, i);
__set_bit(i->btree_id, &btree_roots_have); __set_bit(i->btree_id, &btree_roots_have);
break;
case BCH_JSET_ENTRY_write_buffer_keys:
EBUG_ON(!w->need_flush_to_write_buffer);
if (!wb.wb)
bch2_journal_keys_to_write_buffer_start(c, &wb, seq);
struct bkey_i *k;
jset_entry_for_each_key(i, k) {
ret = bch2_journal_key_to_wb(c, &wb, i->btree_id, k);
if (ret) {
bch2_fs_fatal_error(c, "-ENOMEM flushing journal keys to btree write buffer");
bch2_journal_keys_to_write_buffer_end(c, &wb);
return ret;
}
}
i->type = BCH_JSET_ENTRY_btree_keys;
break;
} }
/* Can we merge with previous entry? */ /* Can we merge with previous entry? */
...@@ -1758,6 +1803,10 @@ static int bch2_journal_write_prep(struct journal *j, struct journal_buf *w) ...@@ -1758,6 +1803,10 @@ static int bch2_journal_write_prep(struct journal *j, struct journal_buf *w)
memmove_u64s_down(prev, i, jset_u64s(u64s)); memmove_u64s_down(prev, i, jset_u64s(u64s));
} }
if (wb.wb)
bch2_journal_keys_to_write_buffer_end(c, &wb);
w->need_flush_to_write_buffer = false;
prev = prev ? vstruct_next(prev) : jset->start; prev = prev ? vstruct_next(prev) : jset->start;
jset->u64s = cpu_to_le32((u64 *) prev - jset->_data); jset->u64s = cpu_to_le32((u64 *) prev - jset->_data);
...@@ -1765,8 +1814,7 @@ static int bch2_journal_write_prep(struct journal *j, struct journal_buf *w) ...@@ -1765,8 +1814,7 @@ static int bch2_journal_write_prep(struct journal *j, struct journal_buf *w)
end = bch2_btree_roots_to_journal_entries(c, end, btree_roots_have); end = bch2_btree_roots_to_journal_entries(c, end, btree_roots_have);
bch2_journal_super_entries_add_common(c, &end, bch2_journal_super_entries_add_common(c, &end, seq);
le64_to_cpu(jset->seq));
u64s = (u64 *) end - (u64 *) start; u64s = (u64 *) end - (u64 *) start;
BUG_ON(u64s > j->entry_u64s_reserved); BUG_ON(u64s > j->entry_u64s_reserved);
...@@ -1789,7 +1837,7 @@ static int bch2_journal_write_prep(struct journal *j, struct journal_buf *w) ...@@ -1789,7 +1837,7 @@ static int bch2_journal_write_prep(struct journal *j, struct journal_buf *w)
SET_JSET_CSUM_TYPE(jset, bch2_meta_checksum_type(c)); SET_JSET_CSUM_TYPE(jset, bch2_meta_checksum_type(c));
if (!JSET_NO_FLUSH(jset) && journal_entry_empty(jset)) if (!JSET_NO_FLUSH(jset) && journal_entry_empty(jset))
j->last_empty_seq = le64_to_cpu(jset->seq); j->last_empty_seq = seq;
if (bch2_csum_type_is_encryption(JSET_CSUM_TYPE(jset))) if (bch2_csum_type_is_encryption(JSET_CSUM_TYPE(jset)))
validate_before_checksum = true; validate_before_checksum = true;
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include "bcachefs.h" #include "bcachefs.h"
#include "btree_key_cache.h" #include "btree_key_cache.h"
#include "btree_update.h" #include "btree_update.h"
#include "btree_write_buffer.h"
#include "buckets.h" #include "buckets.h"
#include "errcode.h" #include "errcode.h"
#include "error.h" #include "error.h"
...@@ -50,20 +51,23 @@ unsigned bch2_journal_dev_buckets_available(struct journal *j, ...@@ -50,20 +51,23 @@ unsigned bch2_journal_dev_buckets_available(struct journal *j,
return available; return available;
} }
static inline void journal_set_watermark(struct journal *j) void bch2_journal_set_watermark(struct journal *j)
{ {
struct bch_fs *c = container_of(j, struct bch_fs, journal); struct bch_fs *c = container_of(j, struct bch_fs, journal);
bool low_on_space = j->space[journal_space_clean].total * 4 <= bool low_on_space = j->space[journal_space_clean].total * 4 <=
j->space[journal_space_total].total; j->space[journal_space_total].total;
bool low_on_pin = fifo_free(&j->pin) < j->pin.size / 4; bool low_on_pin = fifo_free(&j->pin) < j->pin.size / 4;
unsigned watermark = low_on_space || low_on_pin bool low_on_wb = bch2_btree_write_buffer_must_wait(c);
unsigned watermark = low_on_space || low_on_pin || low_on_wb
? BCH_WATERMARK_reclaim ? BCH_WATERMARK_reclaim
: BCH_WATERMARK_stripe; : BCH_WATERMARK_stripe;
if (track_event_change(&c->times[BCH_TIME_blocked_journal_low_on_space], if (track_event_change(&c->times[BCH_TIME_blocked_journal_low_on_space],
&j->low_on_space_start, low_on_space) || &j->low_on_space_start, low_on_space) ||
track_event_change(&c->times[BCH_TIME_blocked_journal_low_on_pin], track_event_change(&c->times[BCH_TIME_blocked_journal_low_on_pin],
&j->low_on_pin_start, low_on_pin)) &j->low_on_pin_start, low_on_pin) ||
track_event_change(&c->times[BCH_TIME_blocked_write_buffer_full],
&j->write_buffer_full_start, low_on_wb))
trace_and_count(c, journal_full, c); trace_and_count(c, journal_full, c);
swap(watermark, j->watermark); swap(watermark, j->watermark);
...@@ -230,7 +234,7 @@ void bch2_journal_space_available(struct journal *j) ...@@ -230,7 +234,7 @@ void bch2_journal_space_available(struct journal *j)
else else
clear_bit(JOURNAL_MAY_SKIP_FLUSH, &j->flags); clear_bit(JOURNAL_MAY_SKIP_FLUSH, &j->flags);
journal_set_watermark(j); bch2_journal_set_watermark(j);
out: out:
j->cur_entry_sectors = !ret ? j->space[journal_space_discarded].next_entry : 0; j->cur_entry_sectors = !ret ? j->space[journal_space_discarded].next_entry : 0;
j->cur_entry_error = ret; j->cur_entry_error = ret;
......
...@@ -16,6 +16,7 @@ static inline void journal_reclaim_kick(struct journal *j) ...@@ -16,6 +16,7 @@ static inline void journal_reclaim_kick(struct journal *j)
unsigned bch2_journal_dev_buckets_available(struct journal *, unsigned bch2_journal_dev_buckets_available(struct journal *,
struct journal_device *, struct journal_device *,
enum journal_space_from); enum journal_space_from);
void bch2_journal_set_watermark(struct journal *);
void bch2_journal_space_available(struct journal *); void bch2_journal_space_available(struct journal *);
static inline bool journal_pin_active(struct journal_entry_pin *pin) static inline bool journal_pin_active(struct journal_entry_pin *pin)
......
...@@ -36,6 +36,7 @@ struct journal_buf { ...@@ -36,6 +36,7 @@ struct journal_buf {
bool noflush; /* write has already been kicked off, and was noflush */ bool noflush; /* write has already been kicked off, and was noflush */
bool must_flush; /* something wants a flush */ bool must_flush; /* something wants a flush */
bool separate_flush; bool separate_flush;
bool need_flush_to_write_buffer;
}; };
/* /*
...@@ -276,6 +277,7 @@ struct journal { ...@@ -276,6 +277,7 @@ struct journal {
u64 low_on_space_start; u64 low_on_space_start;
u64 low_on_pin_start; u64 low_on_pin_start;
u64 max_in_flight_start; u64 max_in_flight_start;
u64 write_buffer_full_start;
struct bch2_time_stats *flush_write_time; struct bch2_time_stats *flush_write_time;
struct bch2_time_stats *noflush_write_time; struct bch2_time_stats *noflush_write_time;
......
...@@ -233,11 +233,6 @@ enum fsck_err_opts { ...@@ -233,11 +233,6 @@ enum fsck_err_opts {
OPT_BOOL(), \ OPT_BOOL(), \
BCH2_NO_SB_OPT, true, \ BCH2_NO_SB_OPT, true, \
NULL, "Stash pointer to in memory btree node in btree ptr")\ NULL, "Stash pointer to in memory btree node in btree ptr")\
x(btree_write_buffer_size, u32, \
OPT_FS|OPT_MOUNT, \
OPT_UINT(16, (1U << 20) - 1), \
BCH2_NO_SB_OPT, 1U << 13, \
NULL, "Number of btree write buffer entries") \
x(gc_reserve_percent, u8, \ x(gc_reserve_percent, u8, \
OPT_FS|OPT_FORMAT|OPT_MOUNT|OPT_RUNTIME, \ OPT_FS|OPT_FORMAT|OPT_MOUNT|OPT_RUNTIME, \
OPT_UINT(5, 21), \ OPT_UINT(5, 21), \
......
...@@ -363,7 +363,8 @@ void bch2_fs_read_only(struct bch_fs *c) ...@@ -363,7 +363,8 @@ void bch2_fs_read_only(struct bch_fs *c)
BUG_ON(c->journal.last_empty_seq != journal_cur_seq(&c->journal)); BUG_ON(c->journal.last_empty_seq != journal_cur_seq(&c->journal));
BUG_ON(atomic_read(&c->btree_cache.dirty)); BUG_ON(atomic_read(&c->btree_cache.dirty));
BUG_ON(atomic_long_read(&c->btree_key_cache.nr_dirty)); BUG_ON(atomic_long_read(&c->btree_key_cache.nr_dirty));
BUG_ON(c->btree_write_buffer.state.nr); BUG_ON(c->btree_write_buffer.inc.keys.nr);
BUG_ON(c->btree_write_buffer.flushing.keys.nr);
bch_verbose(c, "marking filesystem clean"); bch_verbose(c, "marking filesystem clean");
bch2_fs_mark_clean(c); bch2_fs_mark_clean(c);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment