Commit 05c5f4ee authored by Jens Axboe

io-wq: get rid of FIXED worker flag

It makes the logic easier to follow if we just get rid of the fixed worker
flag, and simply ensure that we never exit the last worker in the group.
This also means that no particular worker is special.

Just track the last timeout state, and if we have hit it and no work
is pending, check if there are other workers. If yes, then we can exit
this one safely.
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent 15e20db2
...@@ -23,8 +23,7 @@ enum { ...@@ -23,8 +23,7 @@ enum {
IO_WORKER_F_UP = 1, /* up and active */ IO_WORKER_F_UP = 1, /* up and active */
IO_WORKER_F_RUNNING = 2, /* account as running */ IO_WORKER_F_RUNNING = 2, /* account as running */
IO_WORKER_F_FREE = 4, /* worker on free list */ IO_WORKER_F_FREE = 4, /* worker on free list */
IO_WORKER_F_FIXED = 8, /* static idle worker */ IO_WORKER_F_BOUND = 8, /* is doing bounded work */
IO_WORKER_F_BOUND = 16, /* is doing bounded work */
}; };
enum { enum {
...@@ -132,7 +131,7 @@ struct io_cb_cancel_data { ...@@ -132,7 +131,7 @@ struct io_cb_cancel_data {
bool cancel_all; bool cancel_all;
}; };
static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first); static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static void io_wqe_dec_running(struct io_worker *worker); static void io_wqe_dec_running(struct io_worker *worker);
static bool io_worker_get(struct io_worker *worker) static bool io_worker_get(struct io_worker *worker)
...@@ -241,7 +240,7 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe, ...@@ -241,7 +240,7 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
*/ */
static void io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) static void io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{ {
bool do_create = false, first = false; bool do_create = false;
/* /*
* Most likely an attempt to queue unbounded work on an io_wq that * Most likely an attempt to queue unbounded work on an io_wq that
...@@ -252,8 +251,6 @@ static void io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) ...@@ -252,8 +251,6 @@ static void io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
raw_spin_lock(&wqe->lock); raw_spin_lock(&wqe->lock);
if (acct->nr_workers < acct->max_workers) { if (acct->nr_workers < acct->max_workers) {
if (!acct->nr_workers)
first = true;
acct->nr_workers++; acct->nr_workers++;
do_create = true; do_create = true;
} }
...@@ -261,7 +258,7 @@ static void io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) ...@@ -261,7 +258,7 @@ static void io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
if (do_create) { if (do_create) {
atomic_inc(&acct->nr_running); atomic_inc(&acct->nr_running);
atomic_inc(&wqe->wq->worker_refs); atomic_inc(&wqe->wq->worker_refs);
create_io_worker(wqe->wq, wqe, acct->index, first); create_io_worker(wqe->wq, wqe, acct->index);
} }
} }
...@@ -278,7 +275,7 @@ static void create_worker_cb(struct callback_head *cb) ...@@ -278,7 +275,7 @@ static void create_worker_cb(struct callback_head *cb)
struct io_wq *wq; struct io_wq *wq;
struct io_wqe *wqe; struct io_wqe *wqe;
struct io_wqe_acct *acct; struct io_wqe_acct *acct;
bool do_create = false, first = false; bool do_create = false;
worker = container_of(cb, struct io_worker, create_work); worker = container_of(cb, struct io_worker, create_work);
wqe = worker->wqe; wqe = worker->wqe;
...@@ -286,14 +283,12 @@ static void create_worker_cb(struct callback_head *cb) ...@@ -286,14 +283,12 @@ static void create_worker_cb(struct callback_head *cb)
acct = &wqe->acct[worker->create_index]; acct = &wqe->acct[worker->create_index];
raw_spin_lock(&wqe->lock); raw_spin_lock(&wqe->lock);
if (acct->nr_workers < acct->max_workers) { if (acct->nr_workers < acct->max_workers) {
if (!acct->nr_workers)
first = true;
acct->nr_workers++; acct->nr_workers++;
do_create = true; do_create = true;
} }
raw_spin_unlock(&wqe->lock); raw_spin_unlock(&wqe->lock);
if (do_create) { if (do_create) {
create_io_worker(wq, wqe, worker->create_index, first); create_io_worker(wq, wqe, worker->create_index);
} else { } else {
atomic_dec(&acct->nr_running); atomic_dec(&acct->nr_running);
io_worker_ref_put(wq); io_worker_ref_put(wq);
...@@ -548,6 +543,7 @@ static int io_wqe_worker(void *data) ...@@ -548,6 +543,7 @@ static int io_wqe_worker(void *data)
struct io_wqe_acct *acct = io_wqe_get_acct(worker); struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wqe *wqe = worker->wqe; struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq; struct io_wq *wq = wqe->wq;
bool last_timeout = false;
char buf[TASK_COMM_LEN]; char buf[TASK_COMM_LEN];
worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING); worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
...@@ -565,6 +561,13 @@ static int io_wqe_worker(void *data) ...@@ -565,6 +561,13 @@ static int io_wqe_worker(void *data)
io_worker_handle_work(worker); io_worker_handle_work(worker);
goto loop; goto loop;
} }
/* timed out, exit unless we're the last worker */
if (last_timeout && acct->nr_workers > 1) {
raw_spin_unlock(&wqe->lock);
__set_current_state(TASK_RUNNING);
break;
}
last_timeout = false;
__io_worker_idle(wqe, worker); __io_worker_idle(wqe, worker);
raw_spin_unlock(&wqe->lock); raw_spin_unlock(&wqe->lock);
if (io_flush_signals()) if (io_flush_signals())
...@@ -579,11 +582,7 @@ static int io_wqe_worker(void *data) ...@@ -579,11 +582,7 @@ static int io_wqe_worker(void *data)
break; break;
continue; continue;
} }
if (ret) last_timeout = !ret;
continue;
/* timed out, exit unless we're the fixed worker */
if (!(worker->flags & IO_WORKER_F_FIXED))
break;
} }
if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) { if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
...@@ -634,7 +633,7 @@ void io_wq_worker_sleeping(struct task_struct *tsk) ...@@ -634,7 +633,7 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
raw_spin_unlock(&worker->wqe->lock); raw_spin_unlock(&worker->wqe->lock);
} }
static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first) static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{ {
struct io_wqe_acct *acct = &wqe->acct[index]; struct io_wqe_acct *acct = &wqe->acct[index];
struct io_worker *worker; struct io_worker *worker;
...@@ -675,8 +674,6 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bo ...@@ -675,8 +674,6 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bo
worker->flags |= IO_WORKER_F_FREE; worker->flags |= IO_WORKER_F_FREE;
if (index == IO_WQ_ACCT_BOUND) if (index == IO_WQ_ACCT_BOUND)
worker->flags |= IO_WORKER_F_BOUND; worker->flags |= IO_WORKER_F_BOUND;
if (first && (worker->flags & IO_WORKER_F_BOUND))
worker->flags |= IO_WORKER_F_FIXED;
raw_spin_unlock(&wqe->lock); raw_spin_unlock(&wqe->lock);
wake_up_new_task(tsk); wake_up_new_task(tsk);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment