Commit 92d59adf authored by Alexander Aring's avatar Alexander Aring Committed by David Teigland

dlm: do message processing in softirq context

Move dlm message processing from an ordered workqueue context to an
ordered softirq context.  Handling dlm messages in softirq will allow
requests to be cleared more quickly and efficiently, and should avoid
longer queues of incomplete requests.  Later patches are expected to
run completion/blocking callbacks directly from this message processing
context, further reducing context switches required to complete a request.
In the longer term, concurrent message processing could be implemented.
Signed-off-by: default avatarAlexander Aring <aahringo@redhat.com>
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent 578acf9a
...@@ -204,6 +204,7 @@ static void process_dlm_messages(struct work_struct *work); ...@@ -204,6 +204,7 @@ static void process_dlm_messages(struct work_struct *work);
static DECLARE_WORK(process_work, process_dlm_messages); static DECLARE_WORK(process_work, process_dlm_messages);
static DEFINE_SPINLOCK(processqueue_lock); static DEFINE_SPINLOCK(processqueue_lock);
static bool process_dlm_messages_pending; static bool process_dlm_messages_pending;
static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq);
static atomic_t processqueue_count; static atomic_t processqueue_count;
static LIST_HEAD(processqueue); static LIST_HEAD(processqueue);
...@@ -877,7 +878,8 @@ static void process_dlm_messages(struct work_struct *work) ...@@ -877,7 +878,8 @@ static void process_dlm_messages(struct work_struct *work)
} }
list_del(&pentry->list); list_del(&pentry->list);
atomic_dec(&processqueue_count); if (atomic_dec_and_test(&processqueue_count))
wake_up(&processqueue_wq);
spin_unlock_bh(&processqueue_lock); spin_unlock_bh(&processqueue_lock);
for (;;) { for (;;) {
...@@ -895,7 +897,8 @@ static void process_dlm_messages(struct work_struct *work) ...@@ -895,7 +897,8 @@ static void process_dlm_messages(struct work_struct *work)
} }
list_del(&pentry->list); list_del(&pentry->list);
atomic_dec(&processqueue_count); if (atomic_dec_and_test(&processqueue_count))
wake_up(&processqueue_wq);
spin_unlock_bh(&processqueue_lock); spin_unlock_bh(&processqueue_lock);
} }
} }
...@@ -1511,7 +1514,20 @@ static void process_recv_sockets(struct work_struct *work) ...@@ -1511,7 +1514,20 @@ static void process_recv_sockets(struct work_struct *work)
/* CF_RECV_PENDING cleared */ /* CF_RECV_PENDING cleared */
break; break;
case DLM_IO_FLUSH: case DLM_IO_FLUSH:
flush_workqueue(process_workqueue); /* we can't flush the process_workqueue here because a
* WQ_MEM_RECLAIM workequeue can occurr a deadlock for a non
* WQ_MEM_RECLAIM workqueue such as process_workqueue. Instead
* we have a waitqueue to wait until all messages are
* processed.
*
* This handling is only necessary to backoff the sender and
* not queue all messages from the socket layer into DLM
* processqueue. When DLM is capable to parse multiple messages
* on an e.g. per socket basis this handling can might be
* removed. Especially in a message burst we are too slow to
* process messages and the queue will fill up memory.
*/
wait_event(processqueue_wq, !atomic_read(&processqueue_count));
fallthrough; fallthrough;
case DLM_IO_RESCHED: case DLM_IO_RESCHED:
cond_resched(); cond_resched();
...@@ -1701,11 +1717,7 @@ static int work_start(void) ...@@ -1701,11 +1717,7 @@ static int work_start(void)
return -ENOMEM; return -ENOMEM;
} }
/* ordered dlm message process queue, process_workqueue = alloc_workqueue("dlm_process", WQ_HIGHPRI | WQ_BH, 0);
* should be converted to a tasklet
*/
process_workqueue = alloc_ordered_workqueue("dlm_process",
WQ_HIGHPRI | WQ_MEM_RECLAIM);
if (!process_workqueue) { if (!process_workqueue) {
log_print("can't start dlm_process"); log_print("can't start dlm_process");
destroy_workqueue(io_workqueue); destroy_workqueue(io_workqueue);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment