Commit f39c1bfb authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Fix a UDP transport regression

Commit 43cedbf0 (SUNRPC: Ensure that
we grab the XPRT_LOCK before calling xprt_alloc_slot) is causing
hangs in the case of NFS over UDP mounts.

Since neither the UDP or the RDMA transport mechanism use dynamic slot
allocation, we can skip grabbing the socket lock for those transports.
Add a new rpc_xprt_op to allow switching between the TCP and UDP/RDMA
case.

Note that the NFSv4.1 back channel assigns the slot directly
through rpc_run_bc_task, so we can ignore that case.
Reported-by: default avatarDick Streefland <dick.streefland@altium.nl>
Signed-off-by: default avatarTrond Myklebust <Trond.Myklebust@netapp.com>
Cc: stable@vger.kernel.org [>= 3.1]
parent 01913b49
...@@ -114,6 +114,7 @@ struct rpc_xprt_ops { ...@@ -114,6 +114,7 @@ struct rpc_xprt_ops {
void (*set_buffer_size)(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize); void (*set_buffer_size)(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize);
int (*reserve_xprt)(struct rpc_xprt *xprt, struct rpc_task *task); int (*reserve_xprt)(struct rpc_xprt *xprt, struct rpc_task *task);
void (*release_xprt)(struct rpc_xprt *xprt, struct rpc_task *task); void (*release_xprt)(struct rpc_xprt *xprt, struct rpc_task *task);
void (*alloc_slot)(struct rpc_xprt *xprt, struct rpc_task *task);
void (*rpcbind)(struct rpc_task *task); void (*rpcbind)(struct rpc_task *task);
void (*set_port)(struct rpc_xprt *xprt, unsigned short port); void (*set_port)(struct rpc_xprt *xprt, unsigned short port);
void (*connect)(struct rpc_task *task); void (*connect)(struct rpc_task *task);
...@@ -281,6 +282,8 @@ void xprt_connect(struct rpc_task *task); ...@@ -281,6 +282,8 @@ void xprt_connect(struct rpc_task *task);
void xprt_reserve(struct rpc_task *task); void xprt_reserve(struct rpc_task *task);
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task); int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task);
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task); int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task);
void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task);
int xprt_prepare_transmit(struct rpc_task *task); int xprt_prepare_transmit(struct rpc_task *task);
void xprt_transmit(struct rpc_task *task); void xprt_transmit(struct rpc_task *task);
void xprt_end_transmit(struct rpc_task *task); void xprt_end_transmit(struct rpc_task *task);
......
...@@ -969,11 +969,11 @@ static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) ...@@ -969,11 +969,11 @@ static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
return false; return false;
} }
static void xprt_alloc_slot(struct rpc_task *task) void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{ {
struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req; struct rpc_rqst *req;
spin_lock(&xprt->reserve_lock);
if (!list_empty(&xprt->free)) { if (!list_empty(&xprt->free)) {
req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
list_del(&req->rq_list); list_del(&req->rq_list);
...@@ -994,12 +994,29 @@ static void xprt_alloc_slot(struct rpc_task *task) ...@@ -994,12 +994,29 @@ static void xprt_alloc_slot(struct rpc_task *task)
default: default:
task->tk_status = -EAGAIN; task->tk_status = -EAGAIN;
} }
spin_unlock(&xprt->reserve_lock);
return; return;
out_init_req: out_init_req:
task->tk_status = 0; task->tk_status = 0;
task->tk_rqstp = req; task->tk_rqstp = req;
xprt_request_init(task, xprt); xprt_request_init(task, xprt);
spin_unlock(&xprt->reserve_lock);
} }
EXPORT_SYMBOL_GPL(xprt_alloc_slot);
void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
/* Note: grabbing the xprt_lock_write() ensures that we throttle
* new slot allocation if the transport is congested (i.e. when
* reconnecting a stream transport or when out of socket write
* buffer space).
*/
if (xprt_lock_write(xprt, task)) {
xprt_alloc_slot(xprt, task);
xprt_release_write(xprt, task);
}
}
EXPORT_SYMBOL_GPL(xprt_lock_and_alloc_slot);
static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{ {
...@@ -1083,20 +1100,9 @@ void xprt_reserve(struct rpc_task *task) ...@@ -1083,20 +1100,9 @@ void xprt_reserve(struct rpc_task *task)
if (task->tk_rqstp != NULL) if (task->tk_rqstp != NULL)
return; return;
/* Note: grabbing the xprt_lock_write() here is not strictly needed,
* but ensures that we throttle new slot allocation if the transport
* is congested (e.g. if reconnecting or if we're out of socket
* write buffer space).
*/
task->tk_timeout = 0; task->tk_timeout = 0;
task->tk_status = -EAGAIN; task->tk_status = -EAGAIN;
if (!xprt_lock_write(xprt, task)) xprt->ops->alloc_slot(xprt, task);
return;
spin_lock(&xprt->reserve_lock);
xprt_alloc_slot(task);
spin_unlock(&xprt->reserve_lock);
xprt_release_write(xprt, task);
} }
static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt) static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
......
...@@ -713,6 +713,7 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) ...@@ -713,6 +713,7 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
static struct rpc_xprt_ops xprt_rdma_procs = { static struct rpc_xprt_ops xprt_rdma_procs = {
.reserve_xprt = xprt_rdma_reserve_xprt, .reserve_xprt = xprt_rdma_reserve_xprt,
.release_xprt = xprt_release_xprt_cong, /* sunrpc/xprt.c */ .release_xprt = xprt_release_xprt_cong, /* sunrpc/xprt.c */
.alloc_slot = xprt_alloc_slot,
.release_request = xprt_release_rqst_cong, /* ditto */ .release_request = xprt_release_rqst_cong, /* ditto */
.set_retrans_timeout = xprt_set_retrans_timeout_def, /* ditto */ .set_retrans_timeout = xprt_set_retrans_timeout_def, /* ditto */
.rpcbind = rpcb_getport_async, /* sunrpc/rpcb_clnt.c */ .rpcbind = rpcb_getport_async, /* sunrpc/rpcb_clnt.c */
......
...@@ -2473,6 +2473,7 @@ static void bc_destroy(struct rpc_xprt *xprt) ...@@ -2473,6 +2473,7 @@ static void bc_destroy(struct rpc_xprt *xprt)
static struct rpc_xprt_ops xs_local_ops = { static struct rpc_xprt_ops xs_local_ops = {
.reserve_xprt = xprt_reserve_xprt, .reserve_xprt = xprt_reserve_xprt,
.release_xprt = xs_tcp_release_xprt, .release_xprt = xs_tcp_release_xprt,
.alloc_slot = xprt_alloc_slot,
.rpcbind = xs_local_rpcbind, .rpcbind = xs_local_rpcbind,
.set_port = xs_local_set_port, .set_port = xs_local_set_port,
.connect = xs_connect, .connect = xs_connect,
...@@ -2489,6 +2490,7 @@ static struct rpc_xprt_ops xs_udp_ops = { ...@@ -2489,6 +2490,7 @@ static struct rpc_xprt_ops xs_udp_ops = {
.set_buffer_size = xs_udp_set_buffer_size, .set_buffer_size = xs_udp_set_buffer_size,
.reserve_xprt = xprt_reserve_xprt_cong, .reserve_xprt = xprt_reserve_xprt_cong,
.release_xprt = xprt_release_xprt_cong, .release_xprt = xprt_release_xprt_cong,
.alloc_slot = xprt_alloc_slot,
.rpcbind = rpcb_getport_async, .rpcbind = rpcb_getport_async,
.set_port = xs_set_port, .set_port = xs_set_port,
.connect = xs_connect, .connect = xs_connect,
...@@ -2506,6 +2508,7 @@ static struct rpc_xprt_ops xs_udp_ops = { ...@@ -2506,6 +2508,7 @@ static struct rpc_xprt_ops xs_udp_ops = {
static struct rpc_xprt_ops xs_tcp_ops = { static struct rpc_xprt_ops xs_tcp_ops = {
.reserve_xprt = xprt_reserve_xprt, .reserve_xprt = xprt_reserve_xprt,
.release_xprt = xs_tcp_release_xprt, .release_xprt = xs_tcp_release_xprt,
.alloc_slot = xprt_lock_and_alloc_slot,
.rpcbind = rpcb_getport_async, .rpcbind = rpcb_getport_async,
.set_port = xs_set_port, .set_port = xs_set_port,
.connect = xs_connect, .connect = xs_connect,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment