Commit c6b43f23 authored by Trond Myklebust

[PATCH] RPC over UDP congestion control updates [4/8]

Cleanups for the socket locking mechanism.

Improve RPC request ordering by ensuring that RPC tasks that are
already queued on xprt->sending get sent before tasks that happen to
get scheduled just when there is a free slot.

When the socket send buffer is full, queue the tasks on
xprt->pending rather than xprt->sending in order to eliminate the risk
of accidental wakeups from xprt_release_write() and xprt_write_space().
parent 9ba7d221
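
To see why handing the socket directly to the next queued sender fixes the request ordering, here is a minimal user-space sketch of the handoff this patch introduces. The function and field names mirror the patch, but the singly linked FIFO, the absence of the real sock_lock spinlock, and the main() driver are illustrative stand-ins, not the kernel machinery.

```c
/*
 * Minimal sketch of the send-lock handoff (user-space model; the FIFO
 * and the driver below are illustrative, not the real RPC code).
 */
#include <stdio.h>

struct rpc_task {
        int tk_pid;
        struct rpc_task *next;          /* next task on the wait queue */
};

struct rpc_xprt {
        struct rpc_task *snd_task;      /* task currently owning the socket */
        struct rpc_task *sending;       /* FIFO of tasks waiting to send */
};

/* Pop the head of a wait queue; NULL if the queue is empty. */
static struct rpc_task *rpc_wake_up_next(struct rpc_task **queue)
{
        struct rpc_task *task = *queue;

        if (task)
                *queue = task->next;
        return task;
}

/*
 * Hand the socket straight to the next queued sender.  In the kernel
 * this runs under xprt->sock_lock, so a task that merely gets scheduled
 * when the slot frees up cannot grab it ahead of the queue.
 */
static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
        struct rpc_task *task;

        if (xprt->snd_task)
                return;
        task = rpc_wake_up_next(&xprt->sending);
        if (!task)
                return;
        xprt->snd_task = task;
}

static void __xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task)
                xprt->snd_task = NULL;
        __xprt_lock_write_next(xprt);
}

int main(void)
{
        struct rpc_task t2 = { 2, NULL };
        struct rpc_task t1 = { 1, &t2 };
        struct rpc_task t0 = { 0, NULL };
        struct rpc_xprt xprt = { &t0, &t1 };    /* t0 sending; t1, t2 queued */

        __xprt_release_write(&xprt, &t0);       /* socket passes to t1 */
        printf("socket owned by task %d\n", xprt.snd_task->tk_pid);

        __xprt_release_write(&xprt, &t1);       /* then to t2 */
        printf("socket owned by task %d\n", xprt.snd_task->tk_pid);
        return 0;
}
```

Because clearing snd_task and appointing the next sender happen as a single step under the lock, the free slot is never visible to a task that is not already queued on xprt->sending; tasks blocked on buffer space sleep on xprt->pending instead, out of reach of these wakeups.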
net/sunrpc/xprt.c

@@ -154,17 +154,35 @@ xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
 	return retval;
 }
 
+static void
+__xprt_lock_write_next(struct rpc_xprt *xprt)
+{
+	struct rpc_task *task;
+
+	if (xprt->snd_task)
+		return;
+	task = rpc_wake_up_next(&xprt->sending);
+	if (!task)
+		return;
+	xprt->snd_task = task;
+}
+
 /*
  * Releases the socket for use by other requests.
  */
 static void
+__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+	if (xprt->snd_task == task)
+		xprt->snd_task = NULL;
+	__xprt_lock_write_next(xprt);
+}
+
+static inline void
 xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
 {
 	spin_lock_bh(&xprt->sock_lock);
-	if (xprt->snd_task == task) {
-		xprt->snd_task = NULL;
-		rpc_wake_up_next(&xprt->sending);
-	}
+	__xprt_release_write(xprt, task);
 	spin_unlock_bh(&xprt->sock_lock);
 }
@@ -429,7 +447,7 @@ xprt_reconnect(struct rpc_task *task)
 		/* if the socket is already closing, delay 5 secs */
 		if ((1<<inet->state) & ~(TCP_SYN_SENT|TCP_SYN_RECV))
 			task->tk_timeout = 5*HZ;
-		rpc_sleep_on(&xprt->sending, task, xprt_reconn_status, NULL);
+		rpc_sleep_on(&xprt->pending, task, xprt_reconn_status, NULL);
 		release_sock(inet);
 		return;
 	}
@@ -883,7 +901,7 @@ tcp_state_change(struct sock *sk)
 		xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
 
 		spin_lock(&xprt->sock_lock);
-		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
+		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
 			rpc_wake_up_task(xprt->snd_task);
 		spin_unlock(&xprt->sock_lock);
 		break;
@@ -920,7 +938,7 @@ xprt_write_space(struct sock *sk)
 	if (!xprt_test_and_set_wspace(xprt)) {
 		spin_lock(&xprt->sock_lock);
-		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
+		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
 			rpc_wake_up_task(xprt->snd_task);
 		spin_unlock(&xprt->sock_lock);
 	}
@@ -1078,7 +1096,7 @@ do_xprt_transmit(struct rpc_task *task)
 		spin_lock_bh(&xprt->sock_lock);
 		if (!xprt_wspace(xprt)) {
 			task->tk_timeout = req->rq_timeout.to_current;
-			rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+			rpc_sleep_on(&xprt->pending, task, NULL, NULL);
 		}
 		spin_unlock_bh(&xprt->sock_lock);
 		return;
@@ -1094,9 +1112,10 @@ do_xprt_transmit(struct rpc_task *task)
 		if (xprt->stream)
 			xprt_disconnect(xprt);
 		req->rq_bytes_sent = 0;
+		goto out_release;
 	}
+
+out_release:
+	xprt_release_write(xprt, task);
+	return;
+
 out_receive:
 	dprintk("RPC: %4d xmit complete\n", task->tk_pid);
 	/* Set the task's receive timeout value */
@@ -1111,9 +1130,8 @@ do_xprt_transmit(struct rpc_task *task)
 	spin_lock_bh(&xprt->sock_lock);
 	if (!req->rq_received)
 		rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
+	__xprt_release_write(xprt, task);
 	spin_unlock_bh(&xprt->sock_lock);
-out_release:
-	xprt_release_write(xprt, task);
 }
 
 /*
@@ -1218,14 +1236,10 @@ xprt_release(struct rpc_task *task)
 	struct rpc_xprt *xprt = task->tk_xprt;
 	struct rpc_rqst *req;
 
-	if (xprt->snd_task == task) {
-		if (xprt->stream)
-			xprt_disconnect(xprt);
-		xprt_release_write(xprt, task);
-	}
 	if (!(req = task->tk_rqstp))
 		return;
 	spin_lock_bh(&xprt->sock_lock);
+	__xprt_release_write(xprt, task);
 	if (!list_empty(&req->rq_list))
 		list_del(&req->rq_list);
 	spin_unlock_bh(&xprt->sock_lock);