Commit 294d77d9 authored by Neil Brown's avatar Neil Brown Committed by Linus Torvalds

[PATCH] PATCH 10/16: NFSD: TCP: rationalise locking in RPC server routines

Tidy up SMP locking for svc_sock

sk_lock is not necessary and is now removed.
The only things that were happening under sk_lock but
not the more global sv_lock were testing and setting
some of the flags: sk_busy, sk_conn, sk_data etc.

These have been changed to bits in a flags word which are atomically
set and tested.

Also, by establishing some simple rules about that must
be done after setting these flags, the locking is not needed.

With this patch sk_conn and sk_data are now flags, not counts (sk_data
was already a flag for udp).  They are set if there might be
a connection or data, and only clear when we are sure there aren't
(or when we are about to check if there is).

svc_sock_accepted becomes identical to svc_sock_recieved and
so is discarded in favour of the latter.

sk_rqstp was never used and is now gone.
parent 2b063bda
......@@ -19,17 +19,18 @@ struct svc_sock {
struct list_head sk_list; /* list of all sockets */
struct socket * sk_sock; /* berkeley socket layer */
struct sock * sk_sk; /* INET layer */
spinlock_t sk_lock;
struct svc_serv * sk_server; /* service for this socket */
unsigned char sk_inuse; /* use count */
unsigned char sk_busy; /* enqueued/receiving */
unsigned char sk_conn; /* conn pending */
unsigned char sk_close; /* dead or dying */
int sk_data; /* data pending */
unsigned int sk_temp : 1, /* temp socket */
sk_qued : 1, /* on serv->sk_sockets */
sk_dead : 1; /* socket closed */
unsigned int sk_flags;
#define SK_BUSY 0 /* enqueued/receiving */
#define SK_CONN 1 /* conn pending */
#define SK_CLOSE 2 /* dead or dying */
#define SK_DATA 3 /* data pending */
#define SK_TEMP 4 /* temp (TCP) socket */
#define SK_QUED 5 /* on serv->sk_sockets */
#define SK_DEAD 6 /* socket closed */
int (*sk_recvfrom)(struct svc_rqst *rqstp);
int (*sk_sendto)(struct svc_rqst *rqstp);
......@@ -40,9 +41,6 @@ struct svc_sock {
/* private TCP part */
int sk_reclen; /* length of record */
int sk_tcplen; /* current read length */
/* Debugging */
struct svc_rqst * sk_rqstp;
};
/*
......
......@@ -45,10 +45,19 @@
/* SMP locking strategy:
*
* svc_sock->sk_lock and svc_serv->sv_lock protect their
* respective structures.
* svc_serv->sv_lock protects most stuff for that service.
*
* Some flags can be set to certain values at any time
* providing that certain rules are followed:
*
* SK_BUSY can be set to 0 at any time.
* svc_sock_enqueue must be called afterwards
* SK_CONN, SK_DATA, can be set or cleared at any time.
* after a set, svc_sock_enqueue must be called.
* after a clear, the socket must be read/accepted
* if this succeeds, it must be set again.
* SK_CLOSE can set at any time. It is never cleared.
*
* Antideadlock ordering is sk_lock --> sv_lock.
*/
#define RPCDBG_FACILITY RPCDBG_SVCSOCK
......@@ -102,7 +111,6 @@ svc_release_skb(struct svc_rqst *rqstp)
* Queue up a socket with data pending. If there are idle nfsd
* processes, wake 'em up.
*
* This must be called with svsk->sk_lock held.
*/
static void
svc_sock_enqueue(struct svc_sock *svsk)
......@@ -110,15 +118,18 @@ svc_sock_enqueue(struct svc_sock *svsk)
struct svc_serv *serv = svsk->sk_server;
struct svc_rqst *rqstp;
/* NOTE: Local BH is already disabled by our caller. */
spin_lock(&serv->sv_lock);
if (!(svsk->sk_flags &
( (1<<SK_CONN)|(1<<SK_DATA)|(1<<SK_CLOSE)) ))
return;
spin_lock_bh(&serv->sv_lock);
if (!list_empty(&serv->sv_threads) &&
!list_empty(&serv->sv_sockets))
printk(KERN_ERR
"svc_sock_enqueue: threads and sockets both waiting??\n");
if (svsk->sk_busy) {
if (test_bit(SK_BUSY, &svsk->sk_flags)) {
/* Don't enqueue socket while daemon is receiving */
dprintk("svc: socket %p busy, not enqueued\n", svsk->sk_sk);
goto out_unlock;
......@@ -128,7 +139,7 @@ svc_sock_enqueue(struct svc_sock *svsk)
* server has processed all pending data and put the socket back
* on the idle list.
*/
svsk->sk_busy = 1;
set_bit(SK_BUSY, &svsk->sk_flags);
if (!list_empty(&serv->sv_threads)) {
rqstp = list_entry(serv->sv_threads.next,
......@@ -147,11 +158,11 @@ svc_sock_enqueue(struct svc_sock *svsk)
} else {
dprintk("svc: socket %p put into queue\n", svsk->sk_sk);
list_add_tail(&svsk->sk_ready, &serv->sv_sockets);
svsk->sk_qued = 1;
set_bit(SK_QUED, &svsk->sk_flags);
}
out_unlock:
spin_unlock(&serv->sv_lock);
spin_unlock_bh(&serv->sv_lock);
}
/*
......@@ -171,49 +182,24 @@ svc_sock_dequeue(struct svc_serv *serv)
dprintk("svc: socket %p dequeued, inuse=%d\n",
svsk->sk_sk, svsk->sk_inuse);
svsk->sk_qued = 0;
clear_bit(SK_QUED, &svsk->sk_flags);
return svsk;
}
/*
* Having read count bytes from a socket, check whether it
* Having read something from a socket, check whether it
* needs to be re-enqueued.
* Note: SK_DATA only gets cleared when a read-attempt finds
* no (or insufficient) data.
*/
static inline void
svc_sock_received(struct svc_sock *svsk, int count)
svc_sock_received(struct svc_sock *svsk)
{
spin_lock_bh(&svsk->sk_lock);
if ((svsk->sk_data -= count) < 0) {
printk(KERN_NOTICE "svc: sk_data negative!\n");
svsk->sk_data = 0;
}
svsk->sk_rqstp = NULL; /* XXX */
svsk->sk_busy = 0;
if (svsk->sk_conn || svsk->sk_data || svsk->sk_close) {
dprintk("svc: socket %p re-enqueued after receive\n",
svsk->sk_sk);
svc_sock_enqueue(svsk);
}
spin_unlock_bh(&svsk->sk_lock);
clear_bit(SK_BUSY, &svsk->sk_flags);
svc_sock_enqueue(svsk);
}
/*
* Dequeue a new connection.
*/
static inline void
svc_sock_accepted(struct svc_sock *svsk)
{
spin_lock_bh(&svsk->sk_lock);
svsk->sk_busy = 0;
svsk->sk_conn--;
if (svsk->sk_conn || svsk->sk_data || svsk->sk_close) {
dprintk("svc: socket %p re-enqueued after accept\n",
svsk->sk_sk);
svc_sock_enqueue(svsk);
}
spin_unlock_bh(&svsk->sk_lock);
}
/*
* Release a socket after use.
......@@ -228,7 +214,7 @@ svc_sock_release(struct svc_rqst *rqstp)
rqstp->rq_sock = NULL;
spin_lock_bh(&serv->sv_lock);
if (!--(svsk->sk_inuse) && svsk->sk_dead) {
if (!--(svsk->sk_inuse) && test_bit(SK_DEAD, &svsk->sk_flags)) {
spin_unlock_bh(&serv->sv_lock);
dprintk("svc: releasing dead socket\n");
sock_release(svsk->sk_sock);
......@@ -363,11 +349,9 @@ svc_udp_data_ready(struct sock *sk, int count)
if (!svsk)
goto out;
dprintk("svc: socket %p(inet %p), count=%d, busy=%d\n",
svsk, sk, count, svsk->sk_busy);
spin_lock_bh(&svsk->sk_lock);
svsk->sk_data = 1;
svsk, sk, count, test_bit(SK_BUSY, &svsk->sk_flags));
set_bit(SK_DATA, &svsk->sk_flags);
svc_sock_enqueue(svsk);
spin_unlock_bh(&svsk->sk_lock);
out:
if (sk->sleep && waitqueue_active(sk->sleep))
wake_up_interruptible(sk->sleep);
......@@ -385,20 +369,21 @@ svc_udp_recvfrom(struct svc_rqst *rqstp)
u32 *data;
int err, len;
svsk->sk_data = 0;
clear_bit(SK_DATA, &svsk->sk_flags);
while ((skb = skb_recv_datagram(svsk->sk_sk, 0, 1, &err)) == NULL) {
svc_sock_received(svsk, 0);
svc_sock_received(svsk);
if (err == -EAGAIN)
return err;
/* possibly an icmp error */
dprintk("svc: recvfrom returned error %d\n", -err);
}
set_bit(SK_DATA, &svsk->sk_flags); /* there may be more data... */
/* Sorry. */
if (skb_is_nonlinear(skb)) {
if (skb_linearize(skb, GFP_KERNEL) != 0) {
kfree_skb(skb);
svc_sock_received(svsk, 0);
svc_sock_received(svsk);
return 0;
}
}
......@@ -406,13 +391,11 @@ svc_udp_recvfrom(struct svc_rqst *rqstp)
if (skb->ip_summed != CHECKSUM_UNNECESSARY) {
if ((unsigned short)csum_fold(skb_checksum(skb, 0, skb->len, skb->csum))) {
skb_free_datagram(svsk->sk_sk, skb);
svc_sock_received(svsk, 0);
svc_sock_received(svsk);
return 0;
}
}
/* There may be more data */
svsk->sk_data = 1;
len = skb->len - sizeof(struct udphdr);
data = (u32 *) (skb->data + sizeof(struct udphdr));
......@@ -434,7 +417,7 @@ svc_udp_recvfrom(struct svc_rqst *rqstp)
/* One down, maybe more to go... */
svsk->sk_sk->stamp = skb->stamp;
svc_sock_received(svsk, 0);
svc_sock_received(svsk);
return len;
}
......@@ -494,10 +477,8 @@ svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
printk("svc: socket %p: no user data\n", sk);
goto out;
}
spin_lock_bh(&svsk->sk_lock);
svsk->sk_conn++;
set_bit(SK_CONN, &svsk->sk_flags);
svc_sock_enqueue(svsk);
spin_unlock_bh(&svsk->sk_lock);
out:
if (sk->sleep && waitqueue_active(sk->sleep))
wake_up_interruptible_all(sk->sleep);
......@@ -518,10 +499,8 @@ svc_tcp_state_change(struct sock *sk)
printk("svc: socket %p: no user data\n", sk);
goto out;
}
spin_lock_bh(&svsk->sk_lock);
svsk->sk_close = 1;
set_bit(SK_CLOSE, &svsk->sk_flags);
svc_sock_enqueue(svsk);
spin_unlock_bh(&svsk->sk_lock);
out:
if (sk->sleep && waitqueue_active(sk->sleep))
wake_up_interruptible_all(sk->sleep);
......@@ -536,10 +515,8 @@ svc_tcp_data_ready(struct sock *sk, int count)
sk, sk->user_data);
if (!(svsk = (struct svc_sock *)(sk->user_data)))
goto out;
spin_lock_bh(&svsk->sk_lock);
svsk->sk_data++;
set_bit(SK_DATA, &svsk->sk_flags);
svc_sock_enqueue(svsk);
spin_unlock_bh(&svsk->sk_lock);
out:
if (sk->sleep && waitqueue_active(sk->sleep))
wake_up_interruptible(sk->sleep);
......@@ -572,13 +549,14 @@ svc_tcp_accept(struct svc_sock *svsk)
newsock->type = sock->type;
newsock->ops = ops = sock->ops;
clear_bit(SK_CONN, &svsk->sk_flags);
if ((err = ops->accept(sock, newsock, O_NONBLOCK)) < 0) {
if (net_ratelimit())
if (err != -EAGAIN && net_ratelimit())
printk(KERN_WARNING "%s: accept failed (err %d)!\n",
serv->sv_name, -err);
goto failed; /* aborted connection or whatever */
}
set_bit(SK_CONN, &svsk->sk_flags);
slen = sizeof(sin);
err = ops->getname(newsock, (struct sockaddr *) &sin, &slen, 1);
if (err < 0) {
......@@ -609,11 +587,9 @@ svc_tcp_accept(struct svc_sock *svsk)
/* Precharge. Data may have arrived on the socket before we
* installed the data_ready callback.
*/
spin_lock_bh(&newsvsk->sk_lock);
newsvsk->sk_data = 1;
newsvsk->sk_temp = 1;
set_bit(SK_DATA, &newsvsk->sk_flags);
set_bit(SK_TEMP, &newsvsk->sk_flags);
svc_sock_enqueue(newsvsk);
spin_unlock_bh(&newsvsk->sk_lock);
if (serv->sv_stats)
serv->sv_stats->nettcpconn++;
......@@ -634,23 +610,25 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
struct svc_sock *svsk = rqstp->rq_sock;
struct svc_serv *serv = svsk->sk_server;
struct svc_buf *bufp = &rqstp->rq_argbuf;
int len, ready, used;
int len;
dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
svsk, svsk->sk_data, svsk->sk_conn, svsk->sk_close);
svsk, test_bit(SK_DATA, &svsk->sk_flags),
test_bit(SK_CONN, &svsk->sk_flags),
test_bit(SK_CLOSE, &svsk->sk_flags));
if (svsk->sk_close) {
if (test_bit(SK_CLOSE, &svsk->sk_flags)) {
svc_delete_socket(svsk);
return 0;
}
if (svsk->sk_conn) {
if (test_bit(SK_CONN, &svsk->sk_flags)) {
svc_tcp_accept(svsk);
svc_sock_accepted(svsk);
svc_sock_received(svsk);
return 0;
}
ready = svsk->sk_data;
clear_bit(SK_DATA, &svsk->sk_flags);
/* Receive data. If we haven't got the record length yet, get
* the next four bytes. Otherwise try to gobble up as much as
......@@ -694,15 +672,10 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
*/
dprintk("svc: incomplete TCP record (%d of %d)\n",
len, svsk->sk_reclen);
svc_sock_received(svsk, ready);
svc_sock_received(svsk);
return -EAGAIN; /* record not complete */
}
/* if we think there is only one more record to read, but
* it is bigger than we expect, then two records must have arrived
* together, so pretend we aren't using the record.. */
if (len > svsk->sk_reclen && ready == 1)
used = 0;
else used = 1;
set_bit(SK_DATA, &svsk->sk_flags);
/* Frob argbuf */
bufp->iov[0].iov_base += 4;
......@@ -729,7 +702,7 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
svsk->sk_reclen = 0;
svsk->sk_tcplen = 0;
svc_sock_received(svsk, used);
svc_sock_received(svsk);
if (serv->sv_stats)
serv->sv_stats->nettcpcnt++;
......@@ -738,11 +711,11 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
error:
if (len == -EAGAIN) {
dprintk("RPC: TCP recvfrom got EAGAIN\n");
svc_sock_received(svsk, ready); /* Clear data ready */
svc_sock_received(svsk);
} else {
printk(KERN_NOTICE "%s: recvfrom returned errno %d\n",
svsk->sk_server->sv_name, -len);
svc_sock_received(svsk, 0);
svc_sock_received(svsk);
}
return len;
......@@ -949,7 +922,6 @@ svc_setup_socket(struct svc_serv *serv, struct socket *sock,
svsk->sk_ostate = inet->state_change;
svsk->sk_odata = inet->data_ready;
svsk->sk_server = serv;
spin_lock_init(&svsk->sk_lock);
/* Initialize the socket */
if (sock->type == SOCK_DGRAM)
......@@ -1045,11 +1017,11 @@ svc_delete_socket(struct svc_sock *svsk)
spin_lock_bh(&serv->sv_lock);
list_del(&svsk->sk_list);
if (svsk->sk_qued)
if (test_bit(SK_QUED, &svsk->sk_flags))
list_del(&svsk->sk_ready);
svsk->sk_dead = 1;
set_bit(SK_DEAD, &svsk->sk_flags);
if (!svsk->sk_inuse) {
spin_unlock_bh(&serv->sv_lock);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment