Commit fb43d172 authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Use the multipath iterator to assign a transport to each task

Signed-off-by: default avatarTrond Myklebust <trond.myklebust@primarydata.com>
parent ad01b2c6
...@@ -69,6 +69,8 @@ struct rpc_task { ...@@ -69,6 +69,8 @@ struct rpc_task {
const struct rpc_call_ops *tk_ops; /* Caller callbacks */ const struct rpc_call_ops *tk_ops; /* Caller callbacks */
struct rpc_clnt * tk_client; /* RPC client */ struct rpc_clnt * tk_client; /* RPC client */
struct rpc_xprt * tk_xprt; /* Transport */
struct rpc_rqst * tk_rqstp; /* RPC request */ struct rpc_rqst * tk_rqstp; /* RPC request */
struct workqueue_struct *tk_workqueue; /* Normally rpciod, but could struct workqueue_struct *tk_workqueue; /* Normally rpciod, but could
......
...@@ -894,6 +894,7 @@ EXPORT_SYMBOL_GPL(rpc_bind_new_program); ...@@ -894,6 +894,7 @@ EXPORT_SYMBOL_GPL(rpc_bind_new_program);
void rpc_task_release_client(struct rpc_task *task) void rpc_task_release_client(struct rpc_task *task)
{ {
struct rpc_clnt *clnt = task->tk_client; struct rpc_clnt *clnt = task->tk_client;
struct rpc_xprt *xprt = task->tk_xprt;
if (clnt != NULL) { if (clnt != NULL) {
/* Remove from client task list */ /* Remove from client task list */
...@@ -904,13 +905,22 @@ void rpc_task_release_client(struct rpc_task *task) ...@@ -904,13 +905,22 @@ void rpc_task_release_client(struct rpc_task *task)
rpc_release_client(clnt); rpc_release_client(clnt);
} }
if (xprt != NULL) {
task->tk_xprt = NULL;
xprt_put(xprt);
}
} }
static static
void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt) void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
{ {
if (clnt != NULL) { if (clnt != NULL) {
rpc_task_release_client(task); rpc_task_release_client(task);
if (task->tk_xprt == NULL)
task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi);
task->tk_client = clnt; task->tk_client = clnt;
atomic_inc(&clnt->cl_count); atomic_inc(&clnt->cl_count);
if (clnt->cl_softrtry) if (clnt->cl_softrtry)
...@@ -2122,11 +2132,9 @@ call_timeout(struct rpc_task *task) ...@@ -2122,11 +2132,9 @@ call_timeout(struct rpc_task *task)
} }
if (RPC_IS_SOFT(task)) { if (RPC_IS_SOFT(task)) {
if (clnt->cl_chatty) { if (clnt->cl_chatty) {
rcu_read_lock();
printk(KERN_NOTICE "%s: server %s not responding, timed out\n", printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
clnt->cl_program->name, clnt->cl_program->name,
rcu_dereference(clnt->cl_xprt)->servername); task->tk_xprt->servername);
rcu_read_unlock();
} }
if (task->tk_flags & RPC_TASK_TIMEOUT) if (task->tk_flags & RPC_TASK_TIMEOUT)
rpc_exit(task, -ETIMEDOUT); rpc_exit(task, -ETIMEDOUT);
...@@ -2138,11 +2146,9 @@ call_timeout(struct rpc_task *task) ...@@ -2138,11 +2146,9 @@ call_timeout(struct rpc_task *task)
if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) { if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
task->tk_flags |= RPC_CALL_MAJORSEEN; task->tk_flags |= RPC_CALL_MAJORSEEN;
if (clnt->cl_chatty) { if (clnt->cl_chatty) {
rcu_read_lock();
printk(KERN_NOTICE "%s: server %s not responding, still trying\n", printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
clnt->cl_program->name, clnt->cl_program->name,
rcu_dereference(clnt->cl_xprt)->servername); task->tk_xprt->servername);
rcu_read_unlock();
} }
} }
rpc_force_rebind(clnt); rpc_force_rebind(clnt);
...@@ -2172,11 +2178,9 @@ call_decode(struct rpc_task *task) ...@@ -2172,11 +2178,9 @@ call_decode(struct rpc_task *task)
if (task->tk_flags & RPC_CALL_MAJORSEEN) { if (task->tk_flags & RPC_CALL_MAJORSEEN) {
if (clnt->cl_chatty) { if (clnt->cl_chatty) {
rcu_read_lock();
printk(KERN_NOTICE "%s: server %s OK\n", printk(KERN_NOTICE "%s: server %s OK\n",
clnt->cl_program->name, clnt->cl_program->name,
rcu_dereference(clnt->cl_xprt)->servername); task->tk_xprt->servername);
rcu_read_unlock();
} }
task->tk_flags &= ~RPC_CALL_MAJORSEEN; task->tk_flags &= ~RPC_CALL_MAJORSEEN;
} }
...@@ -2330,11 +2334,9 @@ rpc_verify_header(struct rpc_task *task) ...@@ -2330,11 +2334,9 @@ rpc_verify_header(struct rpc_task *task)
task->tk_action = call_bind; task->tk_action = call_bind;
goto out_retry; goto out_retry;
case RPC_AUTH_TOOWEAK: case RPC_AUTH_TOOWEAK:
rcu_read_lock();
printk(KERN_NOTICE "RPC: server %s requires stronger " printk(KERN_NOTICE "RPC: server %s requires stronger "
"authentication.\n", "authentication.\n",
rcu_dereference(clnt->cl_xprt)->servername); task->tk_xprt->servername);
rcu_read_unlock();
break; break;
default: default:
dprintk("RPC: %5u %s: unknown auth error: %x\n", dprintk("RPC: %5u %s: unknown auth error: %x\n",
...@@ -2359,27 +2361,27 @@ rpc_verify_header(struct rpc_task *task) ...@@ -2359,27 +2361,27 @@ rpc_verify_header(struct rpc_task *task)
case RPC_SUCCESS: case RPC_SUCCESS:
return p; return p;
case RPC_PROG_UNAVAIL: case RPC_PROG_UNAVAIL:
dprintk_rcu("RPC: %5u %s: program %u is unsupported " dprintk("RPC: %5u %s: program %u is unsupported "
"by server %s\n", task->tk_pid, __func__, "by server %s\n", task->tk_pid, __func__,
(unsigned int)clnt->cl_prog, (unsigned int)clnt->cl_prog,
rcu_dereference(clnt->cl_xprt)->servername); task->tk_xprt->servername);
error = -EPFNOSUPPORT; error = -EPFNOSUPPORT;
goto out_err; goto out_err;
case RPC_PROG_MISMATCH: case RPC_PROG_MISMATCH:
dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported " dprintk("RPC: %5u %s: program %u, version %u unsupported "
"by server %s\n", task->tk_pid, __func__, "by server %s\n", task->tk_pid, __func__,
(unsigned int)clnt->cl_prog, (unsigned int)clnt->cl_prog,
(unsigned int)clnt->cl_vers, (unsigned int)clnt->cl_vers,
rcu_dereference(clnt->cl_xprt)->servername); task->tk_xprt->servername);
error = -EPROTONOSUPPORT; error = -EPROTONOSUPPORT;
goto out_err; goto out_err;
case RPC_PROC_UNAVAIL: case RPC_PROC_UNAVAIL:
dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, " dprintk("RPC: %5u %s: proc %s unsupported by program %u, "
"version %u on server %s\n", "version %u on server %s\n",
task->tk_pid, __func__, task->tk_pid, __func__,
rpc_proc_name(task), rpc_proc_name(task),
clnt->cl_prog, clnt->cl_vers, clnt->cl_prog, clnt->cl_vers,
rcu_dereference(clnt->cl_xprt)->servername); task->tk_xprt->servername);
error = -EOPNOTSUPP; error = -EOPNOTSUPP;
goto out_err; goto out_err;
case RPC_GARBAGE_ARGS: case RPC_GARBAGE_ARGS:
......
...@@ -683,11 +683,9 @@ void rpcb_getport_async(struct rpc_task *task) ...@@ -683,11 +683,9 @@ void rpcb_getport_async(struct rpc_task *task)
int status; int status;
rcu_read_lock(); rcu_read_lock();
do { clnt = rpcb_find_transport_owner(task->tk_client);
clnt = rpcb_find_transport_owner(task->tk_client);
xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
} while (xprt == NULL);
rcu_read_unlock(); rcu_read_unlock();
xprt = xprt_get(task->tk_xprt);
dprintk("RPC: %5u %s(%s, %u, %u, %d)\n", dprintk("RPC: %5u %s(%s, %u, %u, %d)\n",
task->tk_pid, __func__, task->tk_pid, __func__,
......
...@@ -1181,7 +1181,7 @@ EXPORT_SYMBOL_GPL(xprt_free); ...@@ -1181,7 +1181,7 @@ EXPORT_SYMBOL_GPL(xprt_free);
*/ */
void xprt_reserve(struct rpc_task *task) void xprt_reserve(struct rpc_task *task)
{ {
struct rpc_xprt *xprt; struct rpc_xprt *xprt = task->tk_xprt;
task->tk_status = 0; task->tk_status = 0;
if (task->tk_rqstp != NULL) if (task->tk_rqstp != NULL)
...@@ -1189,11 +1189,8 @@ void xprt_reserve(struct rpc_task *task) ...@@ -1189,11 +1189,8 @@ void xprt_reserve(struct rpc_task *task)
task->tk_timeout = 0; task->tk_timeout = 0;
task->tk_status = -EAGAIN; task->tk_status = -EAGAIN;
rcu_read_lock();
xprt = rcu_dereference(task->tk_client->cl_xprt);
if (!xprt_throttle_congested(xprt, task)) if (!xprt_throttle_congested(xprt, task))
xprt->ops->alloc_slot(xprt, task); xprt->ops->alloc_slot(xprt, task);
rcu_read_unlock();
} }
/** /**
...@@ -1207,7 +1204,7 @@ void xprt_reserve(struct rpc_task *task) ...@@ -1207,7 +1204,7 @@ void xprt_reserve(struct rpc_task *task)
*/ */
void xprt_retry_reserve(struct rpc_task *task) void xprt_retry_reserve(struct rpc_task *task)
{ {
struct rpc_xprt *xprt; struct rpc_xprt *xprt = task->tk_xprt;
task->tk_status = 0; task->tk_status = 0;
if (task->tk_rqstp != NULL) if (task->tk_rqstp != NULL)
...@@ -1215,10 +1212,7 @@ void xprt_retry_reserve(struct rpc_task *task) ...@@ -1215,10 +1212,7 @@ void xprt_retry_reserve(struct rpc_task *task)
task->tk_timeout = 0; task->tk_timeout = 0;
task->tk_status = -EAGAIN; task->tk_status = -EAGAIN;
rcu_read_lock();
xprt = rcu_dereference(task->tk_client->cl_xprt);
xprt->ops->alloc_slot(xprt, task); xprt->ops->alloc_slot(xprt, task);
rcu_read_unlock();
} }
static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt) static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
...@@ -1265,11 +1259,9 @@ void xprt_release(struct rpc_task *task) ...@@ -1265,11 +1259,9 @@ void xprt_release(struct rpc_task *task)
if (req == NULL) { if (req == NULL) {
if (task->tk_client) { if (task->tk_client) {
rcu_read_lock(); xprt = task->tk_xprt;
xprt = rcu_dereference(task->tk_client->cl_xprt);
if (xprt->snd_task == task) if (xprt->snd_task == task)
xprt_release_write(xprt, task); xprt_release_write(xprt, task);
rcu_read_unlock();
} }
return; return;
} }
......
...@@ -1844,9 +1844,7 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock) ...@@ -1844,9 +1844,7 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock)
*/ */
static void xs_local_rpcbind(struct rpc_task *task) static void xs_local_rpcbind(struct rpc_task *task)
{ {
rcu_read_lock(); xprt_set_bound(task->tk_xprt);
xprt_set_bound(rcu_dereference(task->tk_client->cl_xprt));
rcu_read_unlock();
} }
static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port) static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment