Commit b0b227f0 authored by Chuck Lever's avatar Chuck Lever Committed by Anna Schumaker

xprtrdma: Use an llist to manage free rpcrdma_reps

rpcrdma_rep objects are removed from their free list by only a
single thread: the Receive completion handler. Thus that free list
can be converted to an llist, where a single-threaded consumer and
a multi-threaded producer (rpcrdma_buffer_put) can both access the
llist without the need for any serialization.

This eliminates spin lock contention between the Receive completion
handler and rpcrdma_buffer_get, and makes the rep consumer wait-
free.
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
Signed-off-by: default avatarAnna Schumaker <Anna.Schumaker@Netapp.com>
parent 4d6b8890
...@@ -75,6 +75,7 @@ ...@@ -75,6 +75,7 @@
* internal functions * internal functions
*/ */
static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc); static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt); static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf); static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
static void rpcrdma_mr_free(struct rpcrdma_mr *mr); static void rpcrdma_mr_free(struct rpcrdma_mr *mr);
...@@ -407,7 +408,6 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia) ...@@ -407,7 +408,6 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
struct rpcrdma_ep *ep = &r_xprt->rx_ep; struct rpcrdma_ep *ep = &r_xprt->rx_ep;
struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_req *req; struct rpcrdma_req *req;
struct rpcrdma_rep *rep;
cancel_work_sync(&buf->rb_refresh_worker); cancel_work_sync(&buf->rb_refresh_worker);
...@@ -431,8 +431,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia) ...@@ -431,8 +431,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
/* The ULP is responsible for ensuring all DMA /* The ULP is responsible for ensuring all DMA
* mappings and MRs are gone. * mappings and MRs are gone.
*/ */
list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list) rpcrdma_reps_destroy(buf);
rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
list_for_each_entry(req, &buf->rb_allreqs, rl_all) { list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf); rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf);
rpcrdma_regbuf_dma_unmap(req->rl_sendbuf); rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
...@@ -1071,6 +1070,40 @@ static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, ...@@ -1071,6 +1070,40 @@ static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
return NULL; return NULL;
} }
static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
{
rpcrdma_regbuf_free(rep->rr_rdmabuf);
kfree(rep);
}
static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
{
struct llist_node *node;
/* Calls to llist_del_first are required to be serialized */
node = llist_del_first(&buf->rb_free_reps);
if (!node)
return NULL;
return llist_entry(node, struct rpcrdma_rep, rr_node);
}
static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
struct rpcrdma_rep *rep)
{
if (!rep->rr_temp)
llist_add(&rep->rr_node, &buf->rb_free_reps);
else
rpcrdma_rep_destroy(rep);
}
static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
{
struct rpcrdma_rep *rep;
while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
rpcrdma_rep_destroy(rep);
}
/** /**
* rpcrdma_buffer_create - Create initial set of req/rep objects * rpcrdma_buffer_create - Create initial set of req/rep objects
* @r_xprt: transport instance to (re)initialize * @r_xprt: transport instance to (re)initialize
...@@ -1106,7 +1139,7 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) ...@@ -1106,7 +1139,7 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
} }
buf->rb_credits = 1; buf->rb_credits = 1;
INIT_LIST_HEAD(&buf->rb_recv_bufs); init_llist_head(&buf->rb_free_reps);
rc = rpcrdma_sendctxs_create(r_xprt); rc = rpcrdma_sendctxs_create(r_xprt);
if (rc) if (rc)
...@@ -1118,12 +1151,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) ...@@ -1118,12 +1151,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
return rc; return rc;
} }
static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
{
rpcrdma_regbuf_free(rep->rr_rdmabuf);
kfree(rep);
}
/** /**
* rpcrdma_req_destroy - Destroy an rpcrdma_req object * rpcrdma_req_destroy - Destroy an rpcrdma_req object
* @req: unused object to be destroyed * @req: unused object to be destroyed
...@@ -1182,15 +1209,7 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) ...@@ -1182,15 +1209,7 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
cancel_work_sync(&buf->rb_refresh_worker); cancel_work_sync(&buf->rb_refresh_worker);
rpcrdma_sendctxs_destroy(buf); rpcrdma_sendctxs_destroy(buf);
rpcrdma_reps_destroy(buf);
while (!list_empty(&buf->rb_recv_bufs)) {
struct rpcrdma_rep *rep;
rep = list_first_entry(&buf->rb_recv_bufs,
struct rpcrdma_rep, rr_list);
list_del(&rep->rr_list);
rpcrdma_rep_destroy(rep);
}
while (!list_empty(&buf->rb_send_bufs)) { while (!list_empty(&buf->rb_send_bufs)) {
struct rpcrdma_req *req; struct rpcrdma_req *req;
...@@ -1281,39 +1300,24 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) ...@@ -1281,39 +1300,24 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
*/ */
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req) void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{ {
struct rpcrdma_rep *rep = req->rl_reply; if (req->rl_reply)
rpcrdma_rep_put(buffers, req->rl_reply);
req->rl_reply = NULL; req->rl_reply = NULL;
spin_lock(&buffers->rb_lock); spin_lock(&buffers->rb_lock);
list_add(&req->rl_list, &buffers->rb_send_bufs); list_add(&req->rl_list, &buffers->rb_send_bufs);
if (rep) {
if (!rep->rr_temp) {
list_add(&rep->rr_list, &buffers->rb_recv_bufs);
rep = NULL;
}
}
spin_unlock(&buffers->rb_lock); spin_unlock(&buffers->rb_lock);
if (rep)
rpcrdma_rep_destroy(rep);
} }
/* /**
* Put reply buffers back into pool when not attached to * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
* request. This happens in error conditions. * @rep: rep to release
*
* Used after error conditions.
*/ */
void void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{ {
struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf; rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
if (!rep->rr_temp) {
spin_lock(&buffers->rb_lock);
list_add(&rep->rr_list, &buffers->rb_recv_bufs);
spin_unlock(&buffers->rb_lock);
} else {
rpcrdma_rep_destroy(rep);
}
} }
/* Returns a pointer to a rpcrdma_regbuf object, or NULL. /* Returns a pointer to a rpcrdma_regbuf object, or NULL.
...@@ -1469,22 +1473,10 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) ...@@ -1469,22 +1473,10 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
/* fast path: all needed reps can be found on the free list */ /* fast path: all needed reps can be found on the free list */
wr = NULL; wr = NULL;
spin_lock(&buf->rb_lock);
while (needed) { while (needed) {
rep = list_first_entry_or_null(&buf->rb_recv_bufs, rep = rpcrdma_rep_get_locked(buf);
struct rpcrdma_rep, rr_list);
if (!rep) if (!rep)
break; rep = rpcrdma_rep_create(r_xprt, temp);
list_del(&rep->rr_list);
rep->rr_recv_wr.next = wr;
wr = &rep->rr_recv_wr;
--needed;
}
spin_unlock(&buf->rb_lock);
while (needed) {
rep = rpcrdma_rep_create(r_xprt, temp);
if (!rep) if (!rep)
break; break;
......
...@@ -47,6 +47,7 @@ ...@@ -47,6 +47,7 @@
#include <linux/atomic.h> /* atomic_t, etc */ #include <linux/atomic.h> /* atomic_t, etc */
#include <linux/kref.h> /* struct kref */ #include <linux/kref.h> /* struct kref */
#include <linux/workqueue.h> /* struct work_struct */ #include <linux/workqueue.h> /* struct work_struct */
#include <linux/llist.h>
#include <rdma/rdma_cm.h> /* RDMA connection api */ #include <rdma/rdma_cm.h> /* RDMA connection api */
#include <rdma/ib_verbs.h> /* RDMA verbs api */ #include <rdma/ib_verbs.h> /* RDMA verbs api */
...@@ -200,7 +201,7 @@ struct rpcrdma_rep { ...@@ -200,7 +201,7 @@ struct rpcrdma_rep {
struct rpc_rqst *rr_rqst; struct rpc_rqst *rr_rqst;
struct xdr_buf rr_hdrbuf; struct xdr_buf rr_hdrbuf;
struct xdr_stream rr_stream; struct xdr_stream rr_stream;
struct list_head rr_list; struct llist_node rr_node;
struct ib_recv_wr rr_recv_wr; struct ib_recv_wr rr_recv_wr;
}; };
...@@ -362,7 +363,6 @@ rpcrdma_mr_pop(struct list_head *list) ...@@ -362,7 +363,6 @@ rpcrdma_mr_pop(struct list_head *list)
struct rpcrdma_buffer { struct rpcrdma_buffer {
spinlock_t rb_lock; spinlock_t rb_lock;
struct list_head rb_send_bufs; struct list_head rb_send_bufs;
struct list_head rb_recv_bufs;
struct list_head rb_mrs; struct list_head rb_mrs;
unsigned long rb_sc_head; unsigned long rb_sc_head;
...@@ -373,6 +373,8 @@ struct rpcrdma_buffer { ...@@ -373,6 +373,8 @@ struct rpcrdma_buffer {
struct list_head rb_allreqs; struct list_head rb_allreqs;
struct list_head rb_all_mrs; struct list_head rb_all_mrs;
struct llist_head rb_free_reps;
u32 rb_max_requests; u32 rb_max_requests;
u32 rb_credits; /* most recent credit grant */ u32 rb_credits; /* most recent credit grant */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment