Commit 30ef633a authored by Kent Overstreet's avatar Kent Overstreet Committed by Kent Overstreet

bcachefs: Refactor journal code to not use unwritten_idx

It makes the code more readable if we work off of sequence numbers,
instead of direct indexes into the array of journal buffers.
Signed-off-by: default avatarKent Overstreet <kent.overstreet@gmail.com>
parent f0a3a2cc
......@@ -19,11 +19,6 @@
#include "super-io.h"
#include "trace.h"
static u64 last_unwritten_seq(struct journal *j)
{
return j->seq_ondisk + 1;
}
static inline bool journal_seq_unwritten(struct journal *j, u64 seq)
{
return seq > j->seq_ondisk;
......@@ -34,6 +29,11 @@ static bool __journal_entry_is_open(union journal_res_state state)
return state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
}
static inline unsigned nr_unwritten_journal_entries(struct journal *j)
{
return atomic64_read(&j->seq) - j->seq_ondisk;
}
static bool journal_entry_is_open(struct journal *j)
{
return __journal_entry_is_open(j->reservations);
......@@ -166,6 +166,7 @@ static bool __journal_entry_close(struct journal *j)
*/
buf->last_seq = journal_last_seq(j);
buf->data->last_seq = cpu_to_le64(buf->last_seq);
BUG_ON(buf->last_seq > le64_to_cpu(buf->data->seq));
__bch2_journal_pin_put(j, le64_to_cpu(buf->data->seq));
......@@ -179,18 +180,19 @@ static bool __journal_entry_close(struct journal *j)
static bool journal_entry_want_write(struct journal *j)
{
union journal_res_state s = READ_ONCE(j->reservations);
struct journal_buf *buf = journal_cur_buf(j);
bool ret = false;
if (!buf->flush_time) {
buf->flush_time = local_clock() ?: 1;
buf->expires = jiffies;
}
bool ret = !journal_entry_is_open(j) ||
(journal_cur_seq(j) == journal_last_unwritten_seq(j) &&
__journal_entry_close(j));
/* Don't close it yet if we already have a write in flight: */
if (s.idx == s.unwritten_idx)
ret = __journal_entry_close(j);
if (!ret && nr_unwritten_journal_entries(j)) {
struct journal_buf *buf = journal_cur_buf(j);
if (!buf->flush_time) {
buf->flush_time = local_clock() ?: 1;
buf->expires = jiffies;
}
}
return ret;
}
......@@ -310,8 +312,8 @@ static int journal_entry_open(struct journal *j)
static bool journal_quiesced(struct journal *j)
{
union journal_res_state s = READ_ONCE(j->reservations);
bool ret = s.idx == s.unwritten_idx && !__journal_entry_is_open(s);
bool ret = atomic64_read(&j->seq) == j->seq_ondisk ||
bch2_journal_error(j);
if (!ret)
journal_entry_close(j);
......@@ -416,7 +418,7 @@ static int __journal_res_get(struct journal *j, struct journal_res *res,
if ((ret == cur_entry_journal_full ||
ret == cur_entry_journal_pin_full) &&
!can_discard &&
j->reservations.idx == j->reservations.unwritten_idx &&
!nr_unwritten_journal_entries(j) &&
(flags & JOURNAL_RES_GET_RESERVED)) {
struct printbuf buf = PRINTBUF;
......@@ -576,7 +578,7 @@ int bch2_journal_flush_seq_async(struct journal *j, u64 seq,
}
/* if seq was written, but not flushed - flush a newer one instead */
seq = max(seq, last_unwritten_seq(j));
seq = max(seq, journal_last_unwritten_seq(j));
recheck_need_open:
if (seq > journal_cur_seq(j)) {
......@@ -709,13 +711,13 @@ bool bch2_journal_noflush_seq(struct journal *j, u64 seq)
if (seq <= c->journal.flushed_seq_ondisk)
goto out;
for (unwritten_seq = last_unwritten_seq(j);
for (unwritten_seq = journal_last_unwritten_seq(j);
unwritten_seq < seq;
unwritten_seq++) {
struct journal_buf *buf = journal_seq_to_buf(j, unwritten_seq);
/* journal write is already in flight, and was a flush write: */
if (unwritten_seq == last_unwritten_seq(j) && !buf->noflush)
if (unwritten_seq == journal_last_unwritten_seq(j) && !buf->noflush)
goto out;
buf->noflush = true;
......@@ -940,17 +942,16 @@ int bch2_dev_journal_alloc(struct bch_dev *ca)
static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx)
{
union journal_res_state state;
bool ret = false;
unsigned i;
u64 seq;
spin_lock(&j->lock);
state = READ_ONCE(j->reservations);
i = state.idx;
for (seq = journal_last_unwritten_seq(j);
seq <= journal_cur_seq(j) && !ret;
seq++) {
struct journal_buf *buf = journal_seq_to_buf(j, seq);
while (i != state.unwritten_idx) {
i = (i - 1) & JOURNAL_BUF_MASK;
if (bch2_bkey_has_device(bkey_i_to_s_c(&j->buf[i].key), dev_idx))
if (bch2_bkey_has_device(bkey_i_to_s_c(&buf->key), dev_idx))
ret = true;
}
spin_unlock(&j->lock);
......@@ -1012,6 +1013,7 @@ int bch2_fs_journal_start(struct journal *j, u64 cur_seq,
j->replay_journal_seq_end = cur_seq;
j->last_seq_ondisk = last_seq;
j->flushed_seq_ondisk = cur_seq - 1;
j->seq_ondisk = cur_seq - 1;
j->pin.front = last_seq;
j->pin.back = cur_seq;
atomic64_set(&j->seq, cur_seq - 1);
......@@ -1164,15 +1166,18 @@ void __bch2_journal_debug_to_text(struct printbuf *out, struct journal *j)
union journal_res_state s;
struct bch_dev *ca;
unsigned long now = jiffies;
u64 seq;
unsigned i;
out->atomic++;
out->tabstops[0] = 24;
rcu_read_lock();
s = READ_ONCE(j->reservations);
pr_buf(out, "active journal entries:\t%llu\n", fifo_used(&j->pin));
pr_buf(out, "dirty journal entries:\t%llu\n", fifo_used(&j->pin));
pr_buf(out, "seq:\t\t\t%llu\n", journal_cur_seq(j));
pr_buf(out, "seq_ondisk:\t\t%llu\n", j->seq_ondisk);
pr_buf(out, "last_seq:\t\t%llu\n", journal_last_seq(j));
pr_buf(out, "last_seq_ondisk:\t%llu\n", j->last_seq_ondisk);
pr_buf(out, "flushed_seq_ondisk:\t%llu\n", j->flushed_seq_ondisk);
......@@ -1191,33 +1196,42 @@ void __bch2_journal_debug_to_text(struct printbuf *out, struct journal *j)
switch (s.cur_entry_offset) {
case JOURNAL_ENTRY_ERROR_VAL:
pr_buf(out, "error\n");
pr_buf(out, "error");
break;
case JOURNAL_ENTRY_CLOSED_VAL:
pr_buf(out, "closed\n");
pr_buf(out, "closed");
break;
default:
pr_buf(out, "%u/%u\n", s.cur_entry_offset, j->cur_entry_u64s);
pr_buf(out, "%u/%u", s.cur_entry_offset, j->cur_entry_u64s);
break;
}
pr_buf(out, "current entry:\t\tidx %u refcount %u\n", s.idx, journal_state_count(s, s.idx));
pr_newline(out);
i = s.idx;
while (i != s.unwritten_idx) {
i = (i - 1) & JOURNAL_BUF_MASK;
for (seq = journal_cur_seq(j);
seq >= journal_last_unwritten_seq(j);
--seq) {
i = seq & JOURNAL_BUF_MASK;
pr_buf(out, "unwritten entry:\tidx %u refcount %u sectors %u\n",
i, journal_state_count(s, i), j->buf[i].sectors);
pr_buf(out, "unwritten entry:");
pr_tab(out);
pr_buf(out, "%llu", seq);
pr_newline(out);
pr_indent_push(out, 2);
pr_buf(out, "refcount %u", journal_state_count(s, i));
pr_buf(out, "refcount:");
pr_tab(out);
pr_buf(out, "%u", journal_state_count(s, i));
pr_newline(out);
pr_buf(out, "sectors %u", j->buf[i].sectors);
pr_buf(out, "sectors:");
pr_tab(out);
pr_buf(out, "%u", j->buf[i].sectors);
pr_newline(out);
pr_buf(out, "expires %li ms", jiffies_to_msecs(j->buf[i].expires - jiffies));
pr_buf(out, "expires");
pr_tab(out);
pr_buf(out, "%li jiffies", j->buf[i].expires - jiffies);
pr_newline(out);
pr_indent_pop(out, 2);
......
......@@ -141,6 +141,11 @@ static inline u64 journal_cur_seq(struct journal *j)
return j->pin.back - 1;
}
static inline u64 journal_last_unwritten_seq(struct journal *j)
{
return j->seq_ondisk + 1;
}
static inline int journal_state_count(union journal_res_state s, int idx)
{
switch (idx) {
......
......@@ -1331,7 +1331,7 @@ static void journal_buf_realloc(struct journal *j, struct journal_buf *buf)
static inline struct journal_buf *journal_last_unwritten_buf(struct journal *j)
{
return j->buf + j->reservations.unwritten_idx;
return j->buf + (journal_last_unwritten_seq(j) & JOURNAL_BUF_MASK);
}
static void journal_write_done(struct closure *cl)
......@@ -1403,12 +1403,13 @@ static void journal_write_done(struct closure *cl)
closure_wake_up(&w->wait);
journal_wake(j);
if (new.unwritten_idx == new.idx) {
if (journal_last_unwritten_seq(j) == journal_cur_seq(j)) {
struct journal_buf *buf = journal_cur_buf(j);
long delta = buf->expires - jiffies;
mod_delayed_work(c->io_complete_wq, &j->write_work, max(0L, delta));
} else if (!journal_state_count(new, new.unwritten_idx))
} else if (journal_last_unwritten_seq(j) < journal_cur_seq(j) &&
!journal_state_count(new, new.unwritten_idx))
closure_call(&j->io, bch2_journal_write, c->io_complete_wq, NULL);
spin_unlock(&j->lock);
......
......@@ -59,25 +59,13 @@ static void journal_set_remaining(struct journal *j, unsigned u64s_remaining)
old.v, new.v)) != old.v);
}
static inline unsigned get_unwritten_sectors(struct journal *j, unsigned *idx)
{
unsigned sectors = 0;
while (!sectors && *idx != j->reservations.idx) {
sectors = j->buf[*idx].sectors;
*idx = (*idx + 1) & JOURNAL_BUF_MASK;
}
return sectors;
}
static struct journal_space
journal_dev_space_available(struct journal *j, struct bch_dev *ca,
enum journal_space_from from)
{
struct journal_device *ja = &ca->journal;
unsigned sectors, buckets, unwritten, idx = j->reservations.unwritten_idx;
unsigned sectors, buckets, unwritten;
u64 seq;
if (from == journal_space_total)
return (struct journal_space) {
......@@ -92,7 +80,14 @@ journal_dev_space_available(struct journal *j, struct bch_dev *ca,
* We that we don't allocate the space for a journal entry
* until we write it out - thus, account for it here:
*/
while ((unwritten = get_unwritten_sectors(j, &idx))) {
for (seq = journal_last_unwritten_seq(j);
seq <= journal_cur_seq(j);
seq++) {
unwritten = j->buf[seq & JOURNAL_BUF_MASK].sectors;
if (!unwritten)
continue;
/* entry won't fit on this device, skip: */
if (unwritten > ca->mi.bucket_size)
continue;
......@@ -214,8 +209,7 @@ void bch2_journal_space_available(struct journal *j)
total = j->space[journal_space_total].total;
if (!clean_ondisk &&
j->reservations.idx ==
j->reservations.unwritten_idx) {
journal_cur_seq(j) == j->seq_ondisk) {
struct printbuf buf = PRINTBUF;
__bch2_journal_debug_to_text(&buf, j);
......
......@@ -25,7 +25,7 @@ struct journal_buf {
struct closure_waitlist wait;
u64 last_seq; /* copy of data->last_seq */
unsigned long expires;
long expires;
u64 flush_time;
unsigned buf_size; /* size in bytes of @data */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment