Commit 43cedbf0 authored by Trond Myklebust

SUNRPC: Ensure that we grab the XPRT_LOCK before calling xprt_alloc_slot

This throttles the allocation of new slots when the socket is busy
reconnecting and/or is out of buffer space.
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
parent f85ef69c
...@@ -111,7 +111,7 @@ struct rpc_rqst { ...@@ -111,7 +111,7 @@ struct rpc_rqst {
struct rpc_xprt_ops { struct rpc_xprt_ops {
void (*set_buffer_size)(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize); void (*set_buffer_size)(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize);
int (*reserve_xprt)(struct rpc_task *task); int (*reserve_xprt)(struct rpc_xprt *xprt, struct rpc_task *task);
void (*release_xprt)(struct rpc_xprt *xprt, struct rpc_task *task); void (*release_xprt)(struct rpc_xprt *xprt, struct rpc_task *task);
void (*rpcbind)(struct rpc_task *task); void (*rpcbind)(struct rpc_task *task);
void (*set_port)(struct rpc_xprt *xprt, unsigned short port); void (*set_port)(struct rpc_xprt *xprt, unsigned short port);
...@@ -271,8 +271,8 @@ struct xprt_class { ...@@ -271,8 +271,8 @@ struct xprt_class {
struct rpc_xprt *xprt_create_transport(struct xprt_create *args); struct rpc_xprt *xprt_create_transport(struct xprt_create *args);
void xprt_connect(struct rpc_task *task); void xprt_connect(struct rpc_task *task);
void xprt_reserve(struct rpc_task *task); void xprt_reserve(struct rpc_task *task);
int xprt_reserve_xprt(struct rpc_task *task); int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task);
int xprt_reserve_xprt_cong(struct rpc_task *task); int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
int xprt_prepare_transmit(struct rpc_task *task); int xprt_prepare_transmit(struct rpc_task *task);
void xprt_transmit(struct rpc_task *task); void xprt_transmit(struct rpc_task *task);
void xprt_end_transmit(struct rpc_task *task); void xprt_end_transmit(struct rpc_task *task);
......
...@@ -191,10 +191,9 @@ EXPORT_SYMBOL_GPL(xprt_load_transport); ...@@ -191,10 +191,9 @@ EXPORT_SYMBOL_GPL(xprt_load_transport);
* transport connects from colliding with writes. No congestion control * transport connects from colliding with writes. No congestion control
* is provided. * is provided.
*/ */
int xprt_reserve_xprt(struct rpc_task *task) int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{ {
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
if (task == xprt->snd_task) if (task == xprt->snd_task)
...@@ -202,8 +201,10 @@ int xprt_reserve_xprt(struct rpc_task *task) ...@@ -202,8 +201,10 @@ int xprt_reserve_xprt(struct rpc_task *task)
goto out_sleep; goto out_sleep;
} }
xprt->snd_task = task; xprt->snd_task = task;
if (req != NULL) {
req->rq_bytes_sent = 0; req->rq_bytes_sent = 0;
req->rq_ntrans++; req->rq_ntrans++;
}
return 1; return 1;
...@@ -212,7 +213,7 @@ int xprt_reserve_xprt(struct rpc_task *task) ...@@ -212,7 +213,7 @@ int xprt_reserve_xprt(struct rpc_task *task)
task->tk_pid, xprt); task->tk_pid, xprt);
task->tk_timeout = 0; task->tk_timeout = 0;
task->tk_status = -EAGAIN; task->tk_status = -EAGAIN;
if (req->rq_ntrans) if (req != NULL && req->rq_ntrans)
rpc_sleep_on(&xprt->resend, task, NULL); rpc_sleep_on(&xprt->resend, task, NULL);
else else
rpc_sleep_on(&xprt->sending, task, NULL); rpc_sleep_on(&xprt->sending, task, NULL);
...@@ -239,9 +240,8 @@ static void xprt_clear_locked(struct rpc_xprt *xprt) ...@@ -239,9 +240,8 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)
* integrated into the decision of whether a request is allowed to be * integrated into the decision of whether a request is allowed to be
* woken up and given access to the transport. * woken up and given access to the transport.
*/ */
int xprt_reserve_xprt_cong(struct rpc_task *task) int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{ {
struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
...@@ -249,12 +249,14 @@ int xprt_reserve_xprt_cong(struct rpc_task *task) ...@@ -249,12 +249,14 @@ int xprt_reserve_xprt_cong(struct rpc_task *task)
return 1; return 1;
goto out_sleep; goto out_sleep;
} }
if (req == NULL) {
xprt->snd_task = task;
return 1;
}
if (__xprt_get_cong(xprt, task)) { if (__xprt_get_cong(xprt, task)) {
xprt->snd_task = task; xprt->snd_task = task;
if (req) {
req->rq_bytes_sent = 0; req->rq_bytes_sent = 0;
req->rq_ntrans++; req->rq_ntrans++;
}
return 1; return 1;
} }
xprt_clear_locked(xprt); xprt_clear_locked(xprt);
...@@ -262,7 +264,7 @@ int xprt_reserve_xprt_cong(struct rpc_task *task) ...@@ -262,7 +264,7 @@ int xprt_reserve_xprt_cong(struct rpc_task *task)
dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
task->tk_timeout = 0; task->tk_timeout = 0;
task->tk_status = -EAGAIN; task->tk_status = -EAGAIN;
if (req && req->rq_ntrans) if (req != NULL && req->rq_ntrans)
rpc_sleep_on(&xprt->resend, task, NULL); rpc_sleep_on(&xprt->resend, task, NULL);
else else
rpc_sleep_on(&xprt->sending, task, NULL); rpc_sleep_on(&xprt->sending, task, NULL);
...@@ -275,7 +277,7 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) ...@@ -275,7 +277,7 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
int retval; int retval;
spin_lock_bh(&xprt->transport_lock); spin_lock_bh(&xprt->transport_lock);
retval = xprt->ops->reserve_xprt(task); retval = xprt->ops->reserve_xprt(xprt, task);
spin_unlock_bh(&xprt->transport_lock); spin_unlock_bh(&xprt->transport_lock);
return retval; return retval;
} }
...@@ -291,7 +293,7 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt) ...@@ -291,7 +293,7 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
task = rpc_wake_up_next(&xprt->resend); task = rpc_wake_up_next(&xprt->resend);
if (!task) { if (!task) {
task = rpc_wake_up_next(&xprt->sending); task = rpc_wake_up_next(&xprt->sending);
if (!task) if (task == NULL)
goto out_unlock; goto out_unlock;
} }
...@@ -310,6 +312,7 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt) ...@@ -310,6 +312,7 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{ {
struct rpc_task *task; struct rpc_task *task;
struct rpc_rqst *req;
if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
return; return;
...@@ -318,16 +321,19 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) ...@@ -318,16 +321,19 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
task = rpc_wake_up_next(&xprt->resend); task = rpc_wake_up_next(&xprt->resend);
if (!task) { if (!task) {
task = rpc_wake_up_next(&xprt->sending); task = rpc_wake_up_next(&xprt->sending);
if (!task) if (task == NULL)
goto out_unlock; goto out_unlock;
} }
req = task->tk_rqstp;
if (req == NULL) {
xprt->snd_task = task;
return;
}
if (__xprt_get_cong(xprt, task)) { if (__xprt_get_cong(xprt, task)) {
struct rpc_rqst *req = task->tk_rqstp;
xprt->snd_task = task; xprt->snd_task = task;
if (req) {
req->rq_bytes_sent = 0; req->rq_bytes_sent = 0;
req->rq_ntrans++; req->rq_ntrans++;
}
return; return;
} }
out_unlock: out_unlock:
...@@ -852,7 +858,7 @@ int xprt_prepare_transmit(struct rpc_task *task) ...@@ -852,7 +858,7 @@ int xprt_prepare_transmit(struct rpc_task *task)
err = req->rq_reply_bytes_recvd; err = req->rq_reply_bytes_recvd;
goto out_unlock; goto out_unlock;
} }
if (!xprt->ops->reserve_xprt(task)) if (!xprt->ops->reserve_xprt(xprt, task))
err = -EAGAIN; err = -EAGAIN;
out_unlock: out_unlock:
spin_unlock_bh(&xprt->transport_lock); spin_unlock_bh(&xprt->transport_lock);
...@@ -933,8 +939,6 @@ static void xprt_alloc_slot(struct rpc_task *task) ...@@ -933,8 +939,6 @@ static void xprt_alloc_slot(struct rpc_task *task)
struct rpc_xprt *xprt = task->tk_xprt; struct rpc_xprt *xprt = task->tk_xprt;
task->tk_status = 0; task->tk_status = 0;
if (task->tk_rqstp)
return;
if (!list_empty(&xprt->free)) { if (!list_empty(&xprt->free)) {
struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
list_del_init(&req->rq_list); list_del_init(&req->rq_list);
...@@ -944,7 +948,6 @@ static void xprt_alloc_slot(struct rpc_task *task) ...@@ -944,7 +948,6 @@ static void xprt_alloc_slot(struct rpc_task *task)
} }
dprintk("RPC: waiting for request slot\n"); dprintk("RPC: waiting for request slot\n");
task->tk_status = -EAGAIN; task->tk_status = -EAGAIN;
task->tk_timeout = 0;
rpc_sleep_on(&xprt->backlog, task, NULL); rpc_sleep_on(&xprt->backlog, task, NULL);
} }
...@@ -1001,10 +1004,25 @@ void xprt_reserve(struct rpc_task *task) ...@@ -1001,10 +1004,25 @@ void xprt_reserve(struct rpc_task *task)
{ {
struct rpc_xprt *xprt = task->tk_xprt; struct rpc_xprt *xprt = task->tk_xprt;
task->tk_status = 0;
if (task->tk_rqstp != NULL)
return;
/* Note: grabbing the xprt_lock_write() here is not strictly needed,
* but ensures that we throttle new slot allocation if the transport
* is congested (e.g. if reconnecting or if we're out of socket
* write buffer space).
*/
task->tk_timeout = 0;
task->tk_status = -EAGAIN;
if (!xprt_lock_write(xprt, task))
return;
task->tk_status = -EIO; task->tk_status = -EIO;
spin_lock(&xprt->reserve_lock); spin_lock(&xprt->reserve_lock);
xprt_alloc_slot(task); xprt_alloc_slot(task);
spin_unlock(&xprt->reserve_lock); spin_unlock(&xprt->reserve_lock);
xprt_release_write(xprt, task);
} }
static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt) static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
......
...@@ -452,9 +452,8 @@ xprt_rdma_connect(struct rpc_task *task) ...@@ -452,9 +452,8 @@ xprt_rdma_connect(struct rpc_task *task)
} }
static int static int
xprt_rdma_reserve_xprt(struct rpc_task *task) xprt_rdma_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{ {
struct rpc_xprt *xprt = task->tk_xprt;
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
int credits = atomic_read(&r_xprt->rx_buf.rb_credits); int credits = atomic_read(&r_xprt->rx_buf.rb_credits);
...@@ -466,7 +465,7 @@ xprt_rdma_reserve_xprt(struct rpc_task *task) ...@@ -466,7 +465,7 @@ xprt_rdma_reserve_xprt(struct rpc_task *task)
BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0); BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0);
} }
xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale; xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale;
return xprt_reserve_xprt_cong(task); return xprt_reserve_xprt_cong(xprt, task);
} }
/* /*
......
...@@ -755,6 +755,8 @@ static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) ...@@ -755,6 +755,8 @@ static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
if (task == NULL) if (task == NULL)
goto out_release; goto out_release;
req = task->tk_rqstp; req = task->tk_rqstp;
if (req == NULL)
goto out_release;
if (req->rq_bytes_sent == 0) if (req->rq_bytes_sent == 0)
goto out_release; goto out_release;
if (req->rq_bytes_sent == req->rq_snd_buf.len) if (req->rq_bytes_sent == req->rq_snd_buf.len)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment