Commit 550aebfe authored by Trond Myklebust

SUNRPC: Allow AF_LOCAL sockets to use the generic stream receive

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
parent c50b8ee0
...@@ -185,7 +185,6 @@ struct xdr_skb_reader { ...@@ -185,7 +185,6 @@ struct xdr_skb_reader {
typedef size_t (*xdr_skb_read_actor)(struct xdr_skb_reader *desc, void *to, size_t len); typedef size_t (*xdr_skb_read_actor)(struct xdr_skb_reader *desc, void *to, size_t len);
size_t xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len);
extern int csum_partial_copy_to_xdr(struct xdr_buf *, struct sk_buff *); extern int csum_partial_copy_to_xdr(struct xdr_buf *, struct sk_buff *);
extern ssize_t xdr_partial_copy_from_skb(struct xdr_buf *, unsigned int, extern ssize_t xdr_partial_copy_from_skb(struct xdr_buf *, unsigned int,
struct xdr_skb_reader *, xdr_skb_read_actor); struct xdr_skb_reader *, xdr_skb_read_actor);
......
...@@ -26,7 +26,8 @@ ...@@ -26,7 +26,8 @@
* Possibly called several times to iterate over an sk_buff and copy * Possibly called several times to iterate over an sk_buff and copy
* data out of it. * data out of it.
*/ */
size_t xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len) static size_t
xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len)
{ {
if (len > desc->count) if (len > desc->count)
len = desc->count; len = desc->count;
...@@ -36,7 +37,6 @@ size_t xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len) ...@@ -36,7 +37,6 @@ size_t xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len)
desc->offset += len; desc->offset += len;
return len; return len;
} }
EXPORT_SYMBOL_GPL(xdr_skb_read_bits);
/** /**
* xdr_skb_read_and_csum_bits - copy and checksum from skb to buffer * xdr_skb_read_and_csum_bits - copy and checksum from skb to buffer
......
...@@ -670,6 +670,17 @@ static void xs_stream_data_receive_workfn(struct work_struct *work) ...@@ -670,6 +670,17 @@ static void xs_stream_data_receive_workfn(struct work_struct *work)
xs_stream_data_receive(transport); xs_stream_data_receive(transport);
} }
static void
xs_stream_reset_connect(struct sock_xprt *transport)
{
transport->recv.offset = 0;
transport->recv.len = 0;
transport->recv.copied = 0;
transport->xmit.offset = 0;
transport->xprt.stat.connect_count++;
transport->xprt.stat.connect_start = jiffies;
}
#define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) #define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL)
static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more) static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
...@@ -1266,114 +1277,6 @@ static void xs_destroy(struct rpc_xprt *xprt) ...@@ -1266,114 +1277,6 @@ static void xs_destroy(struct rpc_xprt *xprt)
module_put(THIS_MODULE); module_put(THIS_MODULE);
} }
/*
 * xs_local_copy_to_xdr - copy an AF_LOCAL reply skb into an xdr_buf
 * @xdr: destination buffer
 * @skb: source socket buffer
 *
 * Skips the leading RPC record-marking header and copies the payload.
 * Returns 0 on a complete copy, -1 on failure or a short copy.
 */
static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
{
	/* Start reading just past the record-marking header. */
	struct xdr_skb_reader desc = {
		.skb = skb,
		.offset = sizeof(rpc_fraghdr),
		.count = skb->len - sizeof(rpc_fraghdr),
	};

	if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0)
		return -1;
	/* Leftover bytes mean the buffer was too small: also an error. */
	return desc.count != 0 ? -1 : 0;
}
/**
 * xs_local_data_read_skb - deliver one AF_LOCAL reply skb to its request
 * @xprt: transport
 * @sk: socket
 * @skb: skbuff
 *
 * Currently this assumes we can read the whole reply in a single gulp.
 */
static void xs_local_data_read_skb(struct rpc_xprt *xprt,
		struct sock *sk,
		struct sk_buff *skb)
{
	struct rpc_task *task;
	struct rpc_rqst *rovr;
	int repsize, copied;
	u32 _xid;
	__be32 *xp;

	/* Payload size excludes the leading record-marking header. */
	repsize = skb->len - sizeof(rpc_fraghdr);
	if (repsize < 4) {
		/* Too short to even contain an XID. */
		dprintk("RPC: impossible RPC reply size %d\n", repsize);
		return;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
	if (xp == NULL)
		return;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->queue_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	/* Pin the request so it stays valid while queue_lock is dropped
	 * for the (potentially slow) data copy below.
	 */
	xprt_pin_rqst(rovr);
	spin_unlock(&xprt->queue_lock);
	task = rovr->rq_task;

	/* Never report more than the receive buffer can hold. */
	copied = rovr->rq_private_buf.buflen;
	if (copied > repsize)
		copied = repsize;

	if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
		dprintk("RPC: sk_buff copy failed\n");
		/* Retake queue_lock: the out_unpin/out_unlock path
		 * below expects it to be held.
		 */
		spin_lock(&xprt->queue_lock);
		goto out_unpin;
	}

	spin_lock(&xprt->queue_lock);
	xprt_complete_rqst(task, copied);
out_unpin:
	xprt_unpin_rqst(rovr);
 out_unlock:
	spin_unlock(&xprt->queue_lock);
}
/*
 * xs_local_data_receive - drain all pending datagrams from an AF_LOCAL socket
 * @transport: socket transport to service
 *
 * Runs under recv_mutex; delivers each received skb to its matching
 * request via xs_local_data_read_skb(). Yields the CPU (dropping and
 * retaking the mutex) when rescheduling is needed.
 */
static void xs_local_data_receive(struct sock_xprt *transport)
{
	struct sk_buff *skb;
	struct sock *sk;
	int err;

restart:
	mutex_lock(&transport->recv_mutex);
	sk = transport->inet;
	if (sk == NULL)
		/* Socket was torn down underneath us; nothing to read. */
		goto out;
	for (;;) {
		/* Non-blocking receive (noblock=1) of one record. */
		skb = skb_recv_datagram(sk, 0, 1, &err);
		if (skb != NULL) {
			xs_local_data_read_skb(&transport->xprt, sk, skb);
			skb_free_datagram(sk, skb);
			continue;
		}
		/* No skb: stop unless more data was flagged ready in
		 * the meantime, in which case poll again.
		 */
		if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
			break;
		if (need_resched()) {
			/* Drop the mutex before yielding, then restart
			 * from scratch since state may have changed.
			 */
			mutex_unlock(&transport->recv_mutex);
			cond_resched();
			goto restart;
		}
	}
out:
	mutex_unlock(&transport->recv_mutex);
}
/*
 * xs_local_data_receive_workfn - workqueue entry point for AF_LOCAL receive
 * @work: embedded work item inside a struct sock_xprt
 *
 * Recovers the owning transport from the work item and drains it.
 */
static void xs_local_data_receive_workfn(struct work_struct *work)
{
	struct sock_xprt *xs;

	xs = container_of(work, struct sock_xprt, recv_worker);
	xs_local_data_receive(xs);
}
/** /**
* xs_udp_data_read_skb - receive callback for UDP sockets * xs_udp_data_read_skb - receive callback for UDP sockets
* @xprt: transport * @xprt: transport
...@@ -1974,11 +1877,8 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, ...@@ -1974,11 +1877,8 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
write_unlock_bh(&sk->sk_callback_lock); write_unlock_bh(&sk->sk_callback_lock);
} }
transport->xmit.offset = 0; xs_stream_reset_connect(transport);
/* Tell the socket layer to start connecting... */
xprt->stat.connect_count++;
xprt->stat.connect_start = jiffies;
return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0); return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
} }
...@@ -2335,14 +2235,9 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) ...@@ -2335,14 +2235,9 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
xs_set_memalloc(xprt); xs_set_memalloc(xprt);
/* Reset TCP record info */ /* Reset TCP record info */
transport->recv.offset = 0; xs_stream_reset_connect(transport);
transport->recv.len = 0;
transport->recv.copied = 0;
transport->xmit.offset = 0;
/* Tell the socket layer to start connecting... */ /* Tell the socket layer to start connecting... */
xprt->stat.connect_count++;
xprt->stat.connect_start = jiffies;
set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK); ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
switch (ret) { switch (ret) {
...@@ -2717,6 +2612,7 @@ static const struct rpc_xprt_ops xs_local_ops = { ...@@ -2717,6 +2612,7 @@ static const struct rpc_xprt_ops xs_local_ops = {
.connect = xs_local_connect, .connect = xs_local_connect,
.buf_alloc = rpc_malloc, .buf_alloc = rpc_malloc,
.buf_free = rpc_free, .buf_free = rpc_free,
.prepare_request = xs_stream_prepare_request,
.send_request = xs_local_send_request, .send_request = xs_local_send_request,
.set_retrans_timeout = xprt_set_retrans_timeout_def, .set_retrans_timeout = xprt_set_retrans_timeout_def,
.close = xs_close, .close = xs_close,
...@@ -2901,9 +2797,8 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) ...@@ -2901,9 +2797,8 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
xprt->ops = &xs_local_ops; xprt->ops = &xs_local_ops;
xprt->timeout = &xs_local_default_timeout; xprt->timeout = &xs_local_default_timeout;
INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn); INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
INIT_DELAYED_WORK(&transport->connect_worker, INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket);
xs_dummy_setup_socket);
switch (sun->sun_family) { switch (sun->sun_family) {
case AF_LOCAL: case AF_LOCAL:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment