Commit a87d7392 authored by Neil Brown's avatar Neil Brown Committed by Linus Torvalds

[PATCH] PATCH 13/16: NFSD: TCP: Reserve space on sndbuf so we never block when writing

Make sure there is alway adequate sndbuf space for replies.

We keep track of how much space might be needed for replies
and never dequeue a request unless there is adequate space
for a maximal reply.  We assume each request will generate a maximal
sized reply until the request is partly decoded.
Each RPC program/procedure can specify the maximum size
of a reply to the precedure (though they don't yet).

The wspace callback is used to enqueue sockets that may be waiting
for sndbuf space to become available.

As there should always be enough buffer space to the full
reply, the only reason that sock_sendmsg could block is due
to a kmalloc delay.  As this is likely to be fairly quick (and if
it isn't the server is clagged anyway) we remove the MSG_DONTWAIT
flag, but set a 30 second timeout on waiting.  If the wait
ever times out, we close the connection.  If it doesn't we can
be sure that we did a complete write.


When a request completes, we make sure that the space
used for the reply does not exceed the space reserved.  This
is an internal consistancy check.

This patchs sets the sndbuf and rcvbuf sizes for all sockets
used for rpc service.  This size if dependant on the servers bufsize (S) and
partially on the number of threads (N).

For UDP
    sndbuf == 5*S
    rcvbuf == (N+2)*S
for TCP
    sndbuf == N*S
    rcvbuf == 3*S

see code for rationale (in comments).
parent cbf593fc
...@@ -115,6 +115,10 @@ struct svc_rqst { ...@@ -115,6 +115,10 @@ struct svc_rqst {
void * rq_argp; /* decoded arguments */ void * rq_argp; /* decoded arguments */
void * rq_resp; /* xdr'd results */ void * rq_resp; /* xdr'd results */
int rq_reserved; /* space on socket outq
* reserved for this request
*/
/* Catering to nfsd */ /* Catering to nfsd */
struct svc_client * rq_client; /* RPC peer info */ struct svc_client * rq_client; /* RPC peer info */
struct svc_cacherep * rq_cacherep; /* cache info */ struct svc_cacherep * rq_cacherep; /* cache info */
...@@ -163,6 +167,7 @@ struct svc_procedure { ...@@ -163,6 +167,7 @@ struct svc_procedure {
unsigned int pc_ressize; /* result struct size */ unsigned int pc_ressize; /* result struct size */
unsigned int pc_count; /* call count */ unsigned int pc_count; /* call count */
unsigned int pc_cachetype; /* cache info (NFS) */ unsigned int pc_cachetype; /* cache info (NFS) */
unsigned int pc_xdrressize; /* maximum size of XDR reply */
}; };
/* /*
...@@ -180,5 +185,6 @@ void svc_destroy(struct svc_serv *); ...@@ -180,5 +185,6 @@ void svc_destroy(struct svc_serv *);
int svc_process(struct svc_serv *, struct svc_rqst *); int svc_process(struct svc_serv *, struct svc_rqst *);
int svc_register(struct svc_serv *, int, unsigned short); int svc_register(struct svc_serv *, int, unsigned short);
void svc_wake_up(struct svc_serv *); void svc_wake_up(struct svc_serv *);
void svc_reserve(struct svc_rqst *rqstp, int space);
#endif /* SUNRPC_SVC_H */ #endif /* SUNRPC_SVC_H */
...@@ -31,12 +31,15 @@ struct svc_sock { ...@@ -31,12 +31,15 @@ struct svc_sock {
#define SK_QUED 5 /* on serv->sk_sockets */ #define SK_QUED 5 /* on serv->sk_sockets */
#define SK_DEAD 6 /* socket closed */ #define SK_DEAD 6 /* socket closed */
int sk_reserved; /* space on outq that is reserved */
int (*sk_recvfrom)(struct svc_rqst *rqstp); int (*sk_recvfrom)(struct svc_rqst *rqstp);
int (*sk_sendto)(struct svc_rqst *rqstp); int (*sk_sendto)(struct svc_rqst *rqstp);
/* We keep the old state_change and data_ready CB's here */ /* We keep the old state_change and data_ready CB's here */
void (*sk_ostate)(struct sock *); void (*sk_ostate)(struct sock *);
void (*sk_odata)(struct sock *, int bytes); void (*sk_odata)(struct sock *, int bytes);
void (*sk_owspace)(struct sock *);
/* private TCP part */ /* private TCP part */
int sk_reclen; /* length of record */ int sk_reclen; /* length of record */
...@@ -52,5 +55,6 @@ void svc_delete_socket(struct svc_sock *); ...@@ -52,5 +55,6 @@ void svc_delete_socket(struct svc_sock *);
int svc_recv(struct svc_serv *, struct svc_rqst *, long); int svc_recv(struct svc_serv *, struct svc_rqst *, long);
int svc_send(struct svc_rqst *); int svc_send(struct svc_rqst *);
void svc_drop(struct svc_rqst *); void svc_drop(struct svc_rqst *);
void svc_sock_update_bufs(struct svc_serv *serv);
#endif /* SUNRPC_SVCSOCK_H */ #endif /* SUNRPC_SVCSOCK_H */
...@@ -67,8 +67,10 @@ svc_destroy(struct svc_serv *serv) ...@@ -67,8 +67,10 @@ svc_destroy(struct svc_serv *serv)
serv->sv_nrthreads); serv->sv_nrthreads);
if (serv->sv_nrthreads) { if (serv->sv_nrthreads) {
if (--(serv->sv_nrthreads) != 0) if (--(serv->sv_nrthreads) != 0) {
svc_sock_update_bufs(serv);
return; return;
}
} else } else
printk("svc_destroy: no threads for serv=%p!\n", serv); printk("svc_destroy: no threads for serv=%p!\n", serv);
...@@ -148,6 +150,7 @@ svc_create_thread(svc_thread_fn func, struct svc_serv *serv) ...@@ -148,6 +150,7 @@ svc_create_thread(svc_thread_fn func, struct svc_serv *serv)
error = kernel_thread((int (*)(void *)) func, rqstp, 0); error = kernel_thread((int (*)(void *)) func, rqstp, 0);
if (error < 0) if (error < 0)
goto out_thread; goto out_thread;
svc_sock_update_bufs(serv);
error = 0; error = 0;
out: out:
return error; return error;
...@@ -306,6 +309,12 @@ svc_process(struct svc_serv *serv, struct svc_rqst *rqstp) ...@@ -306,6 +309,12 @@ svc_process(struct svc_serv *serv, struct svc_rqst *rqstp)
memset(rqstp->rq_argp, 0, procp->pc_argsize); memset(rqstp->rq_argp, 0, procp->pc_argsize);
memset(rqstp->rq_resp, 0, procp->pc_ressize); memset(rqstp->rq_resp, 0, procp->pc_ressize);
/* un-reserve some of the out-queue now that we have a
* better idea of reply size
*/
if (procp->pc_xdrressize)
svc_reserve(rqstp, procp->pc_xdrressize<<2);
/* Call the function that processes the request. */ /* Call the function that processes the request. */
if (!versp->vs_dispatch) { if (!versp->vs_dispatch) {
/* Decode arguments */ /* Decode arguments */
......
...@@ -135,6 +135,17 @@ svc_sock_enqueue(struct svc_sock *svsk) ...@@ -135,6 +135,17 @@ svc_sock_enqueue(struct svc_sock *svsk)
goto out_unlock; goto out_unlock;
} }
if (((svsk->sk_reserved + serv->sv_bufsz)*2
> sock_wspace(svsk->sk_sk))
&& !test_bit(SK_CLOSE, &svsk->sk_flags)
&& !test_bit(SK_CONN, &svsk->sk_flags)) {
/* Don't enqueue while not enough space for reply */
dprintk("svc: socket %p no space, %d > %ld, not enqueued\n",
svsk->sk_sk, svsk->sk_reserved+serv->sv_bufsz,
sock_wspace(svsk->sk_sk));
goto out_unlock;
}
/* Mark socket as busy. It will remain in this state until the /* Mark socket as busy. It will remain in this state until the
* server has processed all pending data and put the socket back * server has processed all pending data and put the socket back
* on the idle list. * on the idle list.
...@@ -154,6 +165,8 @@ svc_sock_enqueue(struct svc_sock *svsk) ...@@ -154,6 +165,8 @@ svc_sock_enqueue(struct svc_sock *svsk)
rqstp, rqstp->rq_sock); rqstp, rqstp->rq_sock);
rqstp->rq_sock = svsk; rqstp->rq_sock = svsk;
svsk->sk_inuse++; svsk->sk_inuse++;
rqstp->rq_reserved = serv->sv_bufsz;
svsk->sk_reserved += rqstp->rq_reserved;
wake_up(&rqstp->rq_wait); wake_up(&rqstp->rq_wait);
} else { } else {
dprintk("svc: socket %p put into queue\n", svsk->sk_sk); dprintk("svc: socket %p put into queue\n", svsk->sk_sk);
...@@ -201,6 +214,31 @@ svc_sock_received(struct svc_sock *svsk) ...@@ -201,6 +214,31 @@ svc_sock_received(struct svc_sock *svsk)
} }
/**
* svc_reserve - change the space reserved for the reply to a request.
* @rqstp: The request in question
* @space: new max space to reserve
*
* Each request reserves some space on the output queue of the socket
* to make sure the reply fits. This function reduces that reserved
* space to be the amount of space used already, plus @space.
*
*/
void svc_reserve(struct svc_rqst *rqstp, int space)
{
space += rqstp->rq_resbuf.len<<2;
if (space < rqstp->rq_reserved) {
struct svc_sock *svsk = rqstp->rq_sock;
spin_lock_bh(&svsk->sk_server->sv_lock);
svsk->sk_reserved -= (rqstp->rq_reserved - space);
rqstp->rq_reserved = space;
spin_unlock_bh(&svsk->sk_server->sv_lock);
svc_sock_enqueue(svsk);
}
}
/* /*
* Release a socket after use. * Release a socket after use.
*/ */
...@@ -211,6 +249,20 @@ svc_sock_release(struct svc_rqst *rqstp) ...@@ -211,6 +249,20 @@ svc_sock_release(struct svc_rqst *rqstp)
struct svc_serv *serv = svsk->sk_server; struct svc_serv *serv = svsk->sk_server;
svc_release_skb(rqstp); svc_release_skb(rqstp);
/* Reset response buffer and release
* the reservation.
* But first, check that enough space was reserved
* for the reply, otherwise we have a bug!
*/
if ((rqstp->rq_resbuf.len<<2) > rqstp->rq_reserved)
printk(KERN_ERR "RPC request reserved %d but used %d\n",
rqstp->rq_reserved,
rqstp->rq_resbuf.len<<2);
rqstp->rq_resbuf.buf = rqstp->rq_resbuf.base;
rqstp->rq_resbuf.len = 0;
svc_reserve(rqstp, 0);
rqstp->rq_sock = NULL; rqstp->rq_sock = NULL;
spin_lock_bh(&serv->sv_lock); spin_lock_bh(&serv->sv_lock);
...@@ -269,7 +321,13 @@ svc_sendto(struct svc_rqst *rqstp, struct iovec *iov, int nr) ...@@ -269,7 +321,13 @@ svc_sendto(struct svc_rqst *rqstp, struct iovec *iov, int nr)
msg.msg_control = NULL; msg.msg_control = NULL;
msg.msg_controllen = 0; msg.msg_controllen = 0;
msg.msg_flags = MSG_DONTWAIT; /* This was MSG_DONTWAIT, but I now want it to wait.
* The only thing that it would wait for is memory and
* if we are fairly low on memory, then we aren't likely
* to make much progress anyway.
* sk->sndtimeo is set to 30seconds just in case.
*/
msg.msg_flags = 0;
oldfs = get_fs(); set_fs(KERNEL_DS); oldfs = get_fs(); set_fs(KERNEL_DS);
len = sock_sendmsg(sock, &msg, buflen); len = sock_sendmsg(sock, &msg, buflen);
...@@ -338,6 +396,32 @@ svc_recvfrom(struct svc_rqst *rqstp, struct iovec *iov, int nr, int buflen) ...@@ -338,6 +396,32 @@ svc_recvfrom(struct svc_rqst *rqstp, struct iovec *iov, int nr, int buflen)
return len; return len;
} }
/*
* Set socket snd and rcv buffer lengths
*/
static inline void
svc_sock_setbufsize(struct socket *sock, unsigned int snd, unsigned int rcv)
{
#if 0
mm_segment_t oldfs;
oldfs = get_fs(); set_fs(KERNEL_DS);
sock_setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
(char*)&snd, sizeof(snd));
sock_setsockopt(sock, SOL_SOCKET, SO_RCVBUF,
(char*)&rcv, sizeof(rcv));
#else
/* sock_setsockopt limits use to sysctl_?mem_max,
* which isn't acceptable. Until that is made conditional
* on not having CAP_SYS_RESOURCE or similar, we go direct...
* DaveM said I could!
*/
lock_sock(sock->sk);
sock->sk->sndbuf = snd * 2;
sock->sk->rcvbuf = rcv * 2;
sock->sk->userlocks |= SOCK_SNDBUF_LOCK|SOCK_RCVBUF_LOCK;
release_sock(sock->sk);
#endif
}
/* /*
* INET callback when data has been received on the socket. * INET callback when data has been received on the socket.
*/ */
...@@ -357,6 +441,27 @@ svc_udp_data_ready(struct sock *sk, int count) ...@@ -357,6 +441,27 @@ svc_udp_data_ready(struct sock *sk, int count)
wake_up_interruptible(sk->sleep); wake_up_interruptible(sk->sleep);
} }
/*
* INET callback when space is newly available on the socket.
*/
static void
svc_write_space(struct sock *sk)
{
struct svc_sock *svsk = (struct svc_sock *)(sk->user_data);
if (svsk) {
dprintk("svc: socket %p(inet %p), write_space busy=%d\n",
svsk, sk, test_bit(SK_BUSY, &svsk->sk_flags));
svc_sock_enqueue(svsk);
}
if (sk->sleep && waitqueue_active(sk->sleep)) {
printk(KERN_WARNING "RPC svc_write_space: some sleeping on %p\n",
svsk);
wake_up_interruptible(sk->sleep);
}
}
/* /*
* Receive a datagram from a UDP socket. * Receive a datagram from a UDP socket.
*/ */
...@@ -448,6 +553,7 @@ static int ...@@ -448,6 +553,7 @@ static int
svc_udp_init(struct svc_sock *svsk) svc_udp_init(struct svc_sock *svsk)
{ {
svsk->sk_sk->data_ready = svc_udp_data_ready; svsk->sk_sk->data_ready = svc_udp_data_ready;
svsk->sk_sk->write_space = svc_write_space;
svsk->sk_recvfrom = svc_udp_recvfrom; svsk->sk_recvfrom = svc_udp_recvfrom;
svsk->sk_sendto = svc_udp_sendto; svsk->sk_sendto = svc_udp_sendto;
...@@ -581,6 +687,11 @@ svc_tcp_accept(struct svc_sock *svsk) ...@@ -581,6 +687,11 @@ svc_tcp_accept(struct svc_sock *svsk)
if (!(newsvsk = svc_setup_socket(serv, newsock, &err, 0))) if (!(newsvsk = svc_setup_socket(serv, newsock, &err, 0)))
goto failed; goto failed;
/* make sure that a write doesn't block forever when
* low on memory
*/
newsock->sk->sndtimeo = HZ*30;
/* Precharge. Data may have arrived on the socket before we /* Precharge. Data may have arrived on the socket before we
* installed the data_ready callback. * installed the data_ready callback.
*/ */
...@@ -723,8 +834,6 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp) ...@@ -723,8 +834,6 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
/* /*
* Send out data on TCP socket. * Send out data on TCP socket.
* FIXME: Make the sendto call non-blocking in order not to hang
* a daemon on a dead client. Requires write queue maintenance.
*/ */
static int static int
svc_tcp_sendto(struct svc_rqst *rqstp) svc_tcp_sendto(struct svc_rqst *rqstp)
...@@ -766,14 +875,70 @@ svc_tcp_init(struct svc_sock *svsk) ...@@ -766,14 +875,70 @@ svc_tcp_init(struct svc_sock *svsk)
dprintk("setting up TCP socket for reading\n"); dprintk("setting up TCP socket for reading\n");
sk->state_change = svc_tcp_state_change; sk->state_change = svc_tcp_state_change;
sk->data_ready = svc_tcp_data_ready; sk->data_ready = svc_tcp_data_ready;
sk->write_space = svc_write_space;
svsk->sk_reclen = 0; svsk->sk_reclen = 0;
svsk->sk_tcplen = 0; svsk->sk_tcplen = 0;
/* sndbuf needs to have room for one request
* per thread, otherwise we can stall even when the
* network isn't a bottleneck.
* rcvbuf just needs to be able to hold a few requests.
* Normally they will be removed from the queue
* as soon a a complete request arrives.
*/
svc_sock_setbufsize(svsk->sk_sock,
svsk->sk_server->sv_nrthreads *
svsk->sk_server->sv_bufsz,
3 * svsk->sk_server->sv_bufsz);
} }
return 0; return 0;
} }
void
svc_sock_update_bufs(struct svc_serv *serv)
{
/*
* The number of server threads has changed. Update
* rcvbuf and sndbuf accordingly on all sockets
*/
struct list_head *le;
spin_lock_bh(&serv->sv_lock);
list_for_each(le, &serv->sv_permsocks) {
struct svc_sock *svsk =
list_entry(le, struct svc_sock, sk_list);
struct socket *sock = svsk->sk_sock;
if (sock->type == SOCK_DGRAM) {
/* udp sockets need large rcvbuf as all pending
* requests are still in that buffer.
* As outgoing requests do not wait for an
* ACK, only a moderate sndbuf is needed
*/
svc_sock_setbufsize(sock,
5 * serv->sv_bufsz,
(serv->sv_nrthreads+2)* serv->sv_bufsz);
} else if (svsk->sk_sk->state != TCP_LISTEN) {
printk(KERN_ERR "RPC update_bufs: permanent sock neither UDP or TCP_LISTEN\n");
}
}
list_for_each(le, &serv->sv_tempsocks) {
struct svc_sock *svsk =
list_entry(le, struct svc_sock, sk_list);
struct socket *sock = svsk->sk_sock;
if (sock->type == SOCK_STREAM) {
/* See svc_tcp_init above for rationale on buffer sizes */
svc_sock_setbufsize(sock,
serv->sv_nrthreads *
serv->sv_bufsz,
3 * serv->sv_bufsz);
} else
printk(KERN_ERR "RPC update_bufs: temp sock not TCP\n");
}
spin_unlock_bh(&serv->sv_lock);
}
/* /*
* Receive the next request on any socket. * Receive the next request on any socket.
*/ */
...@@ -824,6 +989,8 @@ svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout) ...@@ -824,6 +989,8 @@ svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout)
} else if ((svsk = svc_sock_dequeue(serv)) != NULL) { } else if ((svsk = svc_sock_dequeue(serv)) != NULL) {
rqstp->rq_sock = svsk; rqstp->rq_sock = svsk;
svsk->sk_inuse++; svsk->sk_inuse++;
rqstp->rq_reserved = serv->sv_bufsz;
svsk->sk_reserved += rqstp->rq_reserved;
} else { } else {
/* No data pending. Go to sleep */ /* No data pending. Go to sleep */
svc_serv_enqueue(serv, rqstp); svc_serv_enqueue(serv, rqstp);
...@@ -944,6 +1111,7 @@ svc_setup_socket(struct svc_serv *serv, struct socket *sock, ...@@ -944,6 +1111,7 @@ svc_setup_socket(struct svc_serv *serv, struct socket *sock,
svsk->sk_sk = inet; svsk->sk_sk = inet;
svsk->sk_ostate = inet->state_change; svsk->sk_ostate = inet->state_change;
svsk->sk_odata = inet->data_ready; svsk->sk_odata = inet->data_ready;
svsk->sk_owspace = inet->write_space;
svsk->sk_server = serv; svsk->sk_server = serv;
svsk->sk_lastrecv = CURRENT_TIME; svsk->sk_lastrecv = CURRENT_TIME;
...@@ -1044,6 +1212,7 @@ svc_delete_socket(struct svc_sock *svsk) ...@@ -1044,6 +1212,7 @@ svc_delete_socket(struct svc_sock *svsk)
sk->state_change = svsk->sk_ostate; sk->state_change = svsk->sk_ostate;
sk->data_ready = svsk->sk_odata; sk->data_ready = svsk->sk_odata;
sk->write_space = svsk->sk_owspace;
spin_lock_bh(&serv->sv_lock); spin_lock_bh(&serv->sv_lock);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment