Commit 0b51abc8 authored by Trond Myklebust's avatar Trond Myklebust

[PATCH] RPC over UDP congestion control updates [7/8]

  - Divorce the allocation of free request slots and the congestion
    control. Make the congestion control apply only to when we
    actually send data over the wire. This means that we *do* apply
    congestion control to resent requests: if a timeout has occured,
    and there are too many requests on the wire, delay resending until
    the congestion algorithm allows it.

  - Improve spinlocking by putting the congestion avoidance algoritm
    under xprt->sock_lock. This lock has to be taken *anyway* in
    (almost) all cases where we are updating the congestion control
    data.
parent 4edf0555
...@@ -141,9 +141,11 @@ xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) ...@@ -141,9 +141,11 @@ xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{ {
int retval; int retval;
spin_lock_bh(&xprt->sock_lock); spin_lock_bh(&xprt->sock_lock);
if (!xprt->snd_task) if (!xprt->snd_task) {
xprt->snd_task = task; if (xprt->nocong || __xprt_get_cong(xprt, task))
else if (xprt->snd_task != task) { xprt->snd_task = task;
}
if (xprt->snd_task != task) {
dprintk("RPC: %4d TCP write queue full (task %d)\n", dprintk("RPC: %4d TCP write queue full (task %d)\n",
task->tk_pid, xprt->snd_task->tk_pid); task->tk_pid, xprt->snd_task->tk_pid);
task->tk_timeout = 0; task->tk_timeout = 0;
...@@ -162,10 +164,13 @@ __xprt_lock_write_next(struct rpc_xprt *xprt) ...@@ -162,10 +164,13 @@ __xprt_lock_write_next(struct rpc_xprt *xprt)
if (xprt->snd_task) if (xprt->snd_task)
return; return;
if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
return;
task = rpc_wake_up_next(&xprt->sending); task = rpc_wake_up_next(&xprt->sending);
if (!task) if (!task)
return; return;
xprt->snd_task = task; if (xprt->nocong || __xprt_get_cong(xprt, task))
xprt->snd_task = task;
} }
/* /*
...@@ -297,12 +302,6 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result) ...@@ -297,12 +302,6 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{ {
unsigned long cwnd; unsigned long cwnd;
if (xprt->nocong)
return;
/*
* Note: we're in a BH context
*/
spin_lock(&xprt->xprt_lock);
cwnd = xprt->cwnd; cwnd = xprt->cwnd;
if (result >= 0 && xprt->cong <= cwnd) { if (result >= 0 && xprt->cong <= cwnd) {
/* The (cwnd >> 1) term makes sure /* The (cwnd >> 1) term makes sure
...@@ -319,8 +318,6 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result) ...@@ -319,8 +318,6 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
xprt->cong, xprt->cwnd, cwnd); xprt->cong, xprt->cwnd, cwnd);
xprt->cwnd = cwnd; xprt->cwnd = cwnd;
out:
spin_unlock(&xprt->xprt_lock);
} }
/* /*
...@@ -534,6 +531,7 @@ xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) ...@@ -534,6 +531,7 @@ xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
/* Adjust congestion window */ /* Adjust congestion window */
if (!xprt->nocong) { if (!xprt->nocong) {
xprt_adjust_cwnd(xprt, copied); xprt_adjust_cwnd(xprt, copied);
__xprt_put_cong(xprt, req);
if (!req->rq_nresend) { if (!req->rq_nresend) {
int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc); int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc);
if (timer) if (timer)
...@@ -1011,6 +1009,7 @@ xprt_timer(struct rpc_task *task) ...@@ -1011,6 +1009,7 @@ xprt_timer(struct rpc_task *task)
} }
rpc_inc_timeo(&task->tk_client->cl_rtt); rpc_inc_timeo(&task->tk_client->cl_rtt);
xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT); xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
__xprt_put_cong(xprt, req);
} }
req->rq_nresend++; req->rq_nresend++;
...@@ -1139,7 +1138,10 @@ do_xprt_transmit(struct rpc_task *task) ...@@ -1139,7 +1138,10 @@ do_xprt_transmit(struct rpc_task *task)
req->rq_bytes_sent = 0; req->rq_bytes_sent = 0;
} }
out_release: out_release:
xprt_release_write(xprt, task); spin_lock_bh(&xprt->sock_lock);
__xprt_release_write(xprt, task);
__xprt_put_cong(xprt, req);
spin_unlock_bh(&xprt->sock_lock);
return; return;
out_receive: out_receive:
dprintk("RPC: %4d xmit complete\n", task->tk_pid); dprintk("RPC: %4d xmit complete\n", task->tk_pid);
...@@ -1171,7 +1173,7 @@ xprt_reserve(struct rpc_task *task) ...@@ -1171,7 +1173,7 @@ xprt_reserve(struct rpc_task *task)
if (task->tk_rqstp) if (task->tk_rqstp)
return 0; return 0;
spin_lock_bh(&xprt->xprt_lock); spin_lock(&xprt->xprt_lock);
xprt_reserve_status(task); xprt_reserve_status(task);
if (task->tk_rqstp) { if (task->tk_rqstp) {
task->tk_timeout = 0; task->tk_timeout = 0;
...@@ -1182,7 +1184,7 @@ xprt_reserve(struct rpc_task *task) ...@@ -1182,7 +1184,7 @@ xprt_reserve(struct rpc_task *task)
task->tk_status = -EAGAIN; task->tk_status = -EAGAIN;
rpc_sleep_on(&xprt->backlog, task, NULL, NULL); rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
} }
spin_unlock_bh(&xprt->xprt_lock); spin_unlock(&xprt->xprt_lock);
dprintk("RPC: %4d xprt_reserve returns %d\n", dprintk("RPC: %4d xprt_reserve returns %d\n",
task->tk_pid, task->tk_status); task->tk_pid, task->tk_status);
return task->tk_status; return task->tk_status;
...@@ -1206,17 +1208,11 @@ xprt_reserve_status(struct rpc_task *task) ...@@ -1206,17 +1208,11 @@ xprt_reserve_status(struct rpc_task *task)
} else { } else {
if (!(req = xprt->free)) if (!(req = xprt->free))
goto out_nofree; goto out_nofree;
if (!(xprt->nocong || __xprt_get_cong(xprt, req))) /* OK: There's room for us. Grab a free slot */
goto out_nofree;
/* OK: There's room for us. Grab a free slot and bump
* congestion value */
xprt->free = req->rq_next; xprt->free = req->rq_next;
req->rq_next = NULL; req->rq_next = NULL;
task->tk_rqstp = req; task->tk_rqstp = req;
xprt_request_init(task, xprt); xprt_request_init(task, xprt);
if (xprt->free)
xprt_clear_backlog(xprt);
} }
return; return;
...@@ -1264,6 +1260,7 @@ xprt_release(struct rpc_task *task) ...@@ -1264,6 +1260,7 @@ xprt_release(struct rpc_task *task)
return; return;
spin_lock_bh(&xprt->sock_lock); spin_lock_bh(&xprt->sock_lock);
__xprt_release_write(xprt, task); __xprt_release_write(xprt, task);
__xprt_put_cong(xprt, req);
if (!list_empty(&req->rq_list)) if (!list_empty(&req->rq_list))
list_del(&req->rq_list); list_del(&req->rq_list);
spin_unlock_bh(&xprt->sock_lock); spin_unlock_bh(&xprt->sock_lock);
...@@ -1272,13 +1269,12 @@ xprt_release(struct rpc_task *task) ...@@ -1272,13 +1269,12 @@ xprt_release(struct rpc_task *task)
dprintk("RPC: %4d release request %p\n", task->tk_pid, req); dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
spin_lock_bh(&xprt->xprt_lock); spin_lock(&xprt->xprt_lock);
req->rq_next = xprt->free; req->rq_next = xprt->free;
xprt->free = req; xprt->free = req;
__xprt_put_cong(xprt, req);
xprt_clear_backlog(xprt); xprt_clear_backlog(xprt);
spin_unlock_bh(&xprt->xprt_lock); spin_unlock(&xprt->xprt_lock);
} }
/* /*
...@@ -1492,8 +1488,6 @@ xprt_shutdown(struct rpc_xprt *xprt) ...@@ -1492,8 +1488,6 @@ xprt_shutdown(struct rpc_xprt *xprt)
*/ */
int int
xprt_clear_backlog(struct rpc_xprt *xprt) { xprt_clear_backlog(struct rpc_xprt *xprt) {
if (RPCXPRT_CONGESTED(xprt))
return 0;
rpc_wake_up_next(&xprt->backlog); rpc_wake_up_next(&xprt->backlog);
if (waitqueue_active(&xprt->cong_wait)) if (waitqueue_active(&xprt->cong_wait))
wake_up(&xprt->cong_wait); wake_up(&xprt->cong_wait);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment