Commit f1d1c9fa authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'nfs-for-2.6.40' of git://git.linux-nfs.org/projects/trondmy/nfs-2.6

* 'nfs-for-2.6.40' of git://git.linux-nfs.org/projects/trondmy/nfs-2.6:
  SUNRPC: Support for RPC over AF_LOCAL transports
  SUNRPC: Remove obsolete comment
  SUNRPC: Use AF_LOCAL for rpcbind upcalls
  SUNRPC: Clean up use of curly braces in switch cases
  NFS: Revert NFSROOT default mount options
  SUNRPC: Rename xs_encode_tcp_fragment_header()
  nfs,rcu: convert call_rcu(nfs_free_delegation_callback) to kfree_rcu()
  nfs41: Correct offset for LAYOUTCOMMIT
  NFS: nfs_update_inode: print current and new inode size in debug output
  NFSv4.1: Fix the handling of NFS4ERR_SEQ_MISORDERED errors
  NFSv4: Handle expired stateids when the lease is still valid
  SUNRPC: Deal with the lack of a SYN_SENT sk->sk_state_change callback...
parents 2ff55e98 176e21ee
......@@ -21,25 +21,13 @@
#include "delegation.h"
#include "internal.h"
static void nfs_do_free_delegation(struct nfs_delegation *delegation)
{
kfree(delegation);
}
static void nfs_free_delegation_callback(struct rcu_head *head)
{
struct nfs_delegation *delegation = container_of(head, struct nfs_delegation, rcu);
nfs_do_free_delegation(delegation);
}
static void nfs_free_delegation(struct nfs_delegation *delegation)
{
if (delegation->cred) {
put_rpccred(delegation->cred);
delegation->cred = NULL;
}
call_rcu(&delegation->rcu, nfs_free_delegation_callback);
kfree_rcu(delegation, rcu);
}
/**
......
......@@ -1298,8 +1298,12 @@ static int nfs_update_inode(struct inode *inode, struct nfs_fattr *fattr)
i_size_write(inode, new_isize);
invalid |= NFS_INO_INVALID_ATTR|NFS_INO_INVALID_DATA;
}
dprintk("NFS: isize change on server for file %s/%ld\n",
inode->i_sb->s_id, inode->i_ino);
dprintk("NFS: isize change on server for file %s/%ld "
"(%Ld to %Ld)\n",
inode->i_sb->s_id,
inode->i_ino,
(long long)cur_isize,
(long long)new_isize);
}
} else
invalid |= save_cache_validity & (NFS_INO_INVALID_ATTR
......
......@@ -267,9 +267,11 @@ static int nfs4_handle_exception(struct nfs_server *server, int errorcode, struc
break;
nfs4_schedule_stateid_recovery(server, state);
goto wait_on_recovery;
case -NFS4ERR_EXPIRED:
if (state != NULL)
nfs4_schedule_stateid_recovery(server, state);
case -NFS4ERR_STALE_STATEID:
case -NFS4ERR_STALE_CLIENTID:
case -NFS4ERR_EXPIRED:
nfs4_schedule_lease_recovery(clp);
goto wait_on_recovery;
#if defined(CONFIG_NFS_V4_1)
......@@ -3670,9 +3672,11 @@ nfs4_async_handle_error(struct rpc_task *task, const struct nfs_server *server,
break;
nfs4_schedule_stateid_recovery(server, state);
goto wait_on_recovery;
case -NFS4ERR_EXPIRED:
if (state != NULL)
nfs4_schedule_stateid_recovery(server, state);
case -NFS4ERR_STALE_STATEID:
case -NFS4ERR_STALE_CLIENTID:
case -NFS4ERR_EXPIRED:
nfs4_schedule_lease_recovery(clp);
goto wait_on_recovery;
#if defined(CONFIG_NFS_V4_1)
......@@ -4543,6 +4547,7 @@ int nfs4_lock_delegation_recall(struct nfs4_state *state, struct file_lock *fl)
case -ESTALE:
goto out;
case -NFS4ERR_EXPIRED:
nfs4_schedule_stateid_recovery(server, state);
case -NFS4ERR_STALE_CLIENTID:
case -NFS4ERR_STALE_STATEID:
nfs4_schedule_lease_recovery(server->nfs_client);
......
......@@ -1466,7 +1466,10 @@ static int nfs4_reclaim_lease(struct nfs_client *clp)
#ifdef CONFIG_NFS_V4_1
void nfs4_schedule_session_recovery(struct nfs4_session *session)
{
nfs4_schedule_lease_recovery(session->clp);
struct nfs_client *clp = session->clp;
set_bit(NFS4CLNT_SESSION_RESET, &clp->cl_state);
nfs4_schedule_lease_recovery(clp);
}
EXPORT_SYMBOL_GPL(nfs4_schedule_session_recovery);
......@@ -1549,6 +1552,7 @@ static int nfs4_reset_session(struct nfs_client *clp)
status = nfs4_recovery_handle_error(clp, status);
goto out;
}
clear_bit(NFS4CLNT_SESSION_RESET, &clp->cl_state);
/* create_session negotiated new slot table */
clear_bit(NFS4CLNT_RECALL_SLOT, &clp->cl_state);
......
......@@ -87,7 +87,7 @@
#define NFS_ROOT "/tftpboot/%s"
/* Default NFSROOT mount options. */
#define NFS_DEF_OPTIONS "udp"
#define NFS_DEF_OPTIONS "vers=2,udp,rsize=4096,wsize=4096"
/* Parameters passed from the kernel command line */
static char nfs_root_parms[256] __initdata = "";
......
......@@ -1009,7 +1009,7 @@ void
pnfs_set_layoutcommit(struct nfs_write_data *wdata)
{
struct nfs_inode *nfsi = NFS_I(wdata->inode);
loff_t end_pos = wdata->args.offset + wdata->res.count;
loff_t end_pos = wdata->mds_offset + wdata->res.count;
bool mark_as_dirty = false;
spin_lock(&nfsi->vfs_inode.i_lock);
......
......@@ -145,6 +145,7 @@ typedef __be32 rpc_fraghdr;
#define RPCBIND_NETID_TCP "tcp"
#define RPCBIND_NETID_UDP6 "udp6"
#define RPCBIND_NETID_TCP6 "tcp6"
#define RPCBIND_NETID_LOCAL "local"
/*
* Note that RFC 1833 does not put any size restrictions on the
......
......@@ -141,7 +141,8 @@ enum xprt_transports {
XPRT_TRANSPORT_UDP = IPPROTO_UDP,
XPRT_TRANSPORT_TCP = IPPROTO_TCP,
XPRT_TRANSPORT_BC_TCP = IPPROTO_TCP | XPRT_TRANSPORT_BC,
XPRT_TRANSPORT_RDMA = 256
XPRT_TRANSPORT_RDMA = 256,
XPRT_TRANSPORT_LOCAL = 257,
};
struct rpc_xprt {
......
......@@ -13,10 +13,6 @@
* and need to be refreshed, or when a packet was damaged in transit.
* This may be have to be moved to the VFS layer.
*
* NB: BSD uses a more intelligent approach to guessing when a request
* or reply has been lost by keeping the RTO estimate for each procedure.
* We currently make do with a constant timeout value.
*
* Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
* Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
*/
......@@ -32,7 +28,9 @@
#include <linux/slab.h>
#include <linux/utsname.h>
#include <linux/workqueue.h>
#include <linux/in.h>
#include <linux/in6.h>
#include <linux/un.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/rpc_pipe_fs.h>
......@@ -298,22 +296,27 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args)
* up a string representation of the passed-in address.
*/
if (args->servername == NULL) {
servername[0] = '\0';
switch (args->address->sa_family) {
case AF_INET: {
struct sockaddr_un *sun =
(struct sockaddr_un *)args->address;
struct sockaddr_in *sin =
(struct sockaddr_in *)args->address;
struct sockaddr_in6 *sin6 =
(struct sockaddr_in6 *)args->address;
servername[0] = '\0';
switch (args->address->sa_family) {
case AF_LOCAL:
snprintf(servername, sizeof(servername), "%s",
sun->sun_path);
break;
case AF_INET:
snprintf(servername, sizeof(servername), "%pI4",
&sin->sin_addr.s_addr);
break;
}
case AF_INET6: {
struct sockaddr_in6 *sin =
(struct sockaddr_in6 *)args->address;
case AF_INET6:
snprintf(servername, sizeof(servername), "%pI6",
&sin->sin6_addr);
&sin6->sin6_addr);
break;
}
default:
/* caller wants default server name, but
* address family isn't recognized. */
......
......@@ -16,6 +16,7 @@
#include <linux/types.h>
#include <linux/socket.h>
#include <linux/un.h>
#include <linux/in.h>
#include <linux/in6.h>
#include <linux/kernel.h>
......@@ -32,6 +33,8 @@
# define RPCDBG_FACILITY RPCDBG_BIND
#endif
#define RPCBIND_SOCK_PATHNAME "/var/run/rpcbind.sock"
#define RPCBIND_PROGRAM (100000u)
#define RPCBIND_PORT (111u)
......@@ -158,20 +161,69 @@ static void rpcb_map_release(void *data)
kfree(map);
}
static const struct sockaddr_in rpcb_inaddr_loopback = {
.sin_family = AF_INET,
.sin_addr.s_addr = htonl(INADDR_LOOPBACK),
.sin_port = htons(RPCBIND_PORT),
};
/*
* Returns zero on success, otherwise a negative errno value
* is returned.
*/
static int rpcb_create_local_unix(void)
{
static const struct sockaddr_un rpcb_localaddr_rpcbind = {
.sun_family = AF_LOCAL,
.sun_path = RPCBIND_SOCK_PATHNAME,
};
struct rpc_create_args args = {
.net = &init_net,
.protocol = XPRT_TRANSPORT_LOCAL,
.address = (struct sockaddr *)&rpcb_localaddr_rpcbind,
.addrsize = sizeof(rpcb_localaddr_rpcbind),
.servername = "localhost",
.program = &rpcb_program,
.version = RPCBVERS_2,
.authflavor = RPC_AUTH_NULL,
};
struct rpc_clnt *clnt, *clnt4;
int result = 0;
/*
* Because we requested an RPC PING at transport creation time,
* this works only if the user space portmapper is rpcbind, and
* it's listening on AF_LOCAL on the named socket.
*/
clnt = rpc_create(&args);
if (IS_ERR(clnt)) {
dprintk("RPC: failed to create AF_LOCAL rpcbind "
"client (errno %ld).\n", PTR_ERR(clnt));
result = -PTR_ERR(clnt);
goto out;
}
clnt4 = rpc_bind_new_program(clnt, &rpcb_program, RPCBVERS_4);
if (IS_ERR(clnt4)) {
dprintk("RPC: failed to bind second program to "
"rpcbind v4 client (errno %ld).\n",
PTR_ERR(clnt4));
clnt4 = NULL;
}
/* Protected by rpcb_create_local_mutex */
rpcb_local_clnt = clnt;
rpcb_local_clnt4 = clnt4;
static DEFINE_MUTEX(rpcb_create_local_mutex);
out:
return result;
}
/*
* Returns zero on success, otherwise a negative errno value
* is returned.
*/
static int rpcb_create_local(void)
static int rpcb_create_local_net(void)
{
static const struct sockaddr_in rpcb_inaddr_loopback = {
.sin_family = AF_INET,
.sin_addr.s_addr = htonl(INADDR_LOOPBACK),
.sin_port = htons(RPCBIND_PORT),
};
struct rpc_create_args args = {
.net = &init_net,
.protocol = XPRT_TRANSPORT_TCP,
......@@ -186,13 +238,6 @@ static int rpcb_create_local(void)
struct rpc_clnt *clnt, *clnt4;
int result = 0;
if (rpcb_local_clnt)
return result;
mutex_lock(&rpcb_create_local_mutex);
if (rpcb_local_clnt)
goto out;
clnt = rpc_create(&args);
if (IS_ERR(clnt)) {
dprintk("RPC: failed to create local rpcbind "
......@@ -214,9 +259,33 @@ static int rpcb_create_local(void)
clnt4 = NULL;
}
/* Protected by rpcb_create_local_mutex */
rpcb_local_clnt = clnt;
rpcb_local_clnt4 = clnt4;
out:
return result;
}
/*
* Returns zero on success, otherwise a negative errno value
* is returned.
*/
static int rpcb_create_local(void)
{
static DEFINE_MUTEX(rpcb_create_local_mutex);
int result = 0;
if (rpcb_local_clnt)
return result;
mutex_lock(&rpcb_create_local_mutex);
if (rpcb_local_clnt)
goto out;
if (rpcb_create_local_unix() != 0)
result = rpcb_create_local_net();
out:
mutex_unlock(&rpcb_create_local_mutex);
return result;
......
......@@ -942,6 +942,8 @@ static void svc_unregister(const struct svc_serv *serv)
if (progp->pg_vers[i]->vs_hidden)
continue;
dprintk("svc: attempting to unregister %sv%u\n",
progp->pg_name, i);
__svc_unregister(progp->pg_prog, i, progp->pg_name);
}
}
......
......@@ -19,6 +19,7 @@
*/
#include <linux/types.h>
#include <linux/string.h>
#include <linux/slab.h>
#include <linux/module.h>
#include <linux/capability.h>
......@@ -28,6 +29,7 @@
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/un.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
......@@ -45,6 +47,9 @@
#include <net/tcp.h>
#include "sunrpc.h"
static void xs_close(struct rpc_xprt *xprt);
/*
* xprtsock tunables
*/
......@@ -261,6 +266,11 @@ static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
return (struct sockaddr *) &xprt->addr;
}
static inline struct sockaddr_un *xs_addr_un(struct rpc_xprt *xprt)
{
return (struct sockaddr_un *) &xprt->addr;
}
static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
{
return (struct sockaddr_in *) &xprt->addr;
......@@ -276,23 +286,34 @@ static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
struct sockaddr *sap = xs_addr(xprt);
struct sockaddr_in6 *sin6;
struct sockaddr_in *sin;
struct sockaddr_un *sun;
char buf[128];
(void)rpc_ntop(sap, buf, sizeof(buf));
xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);
switch (sap->sa_family) {
case AF_LOCAL:
sun = xs_addr_un(xprt);
strlcpy(buf, sun->sun_path, sizeof(buf));
xprt->address_strings[RPC_DISPLAY_ADDR] =
kstrdup(buf, GFP_KERNEL);
break;
case AF_INET:
(void)rpc_ntop(sap, buf, sizeof(buf));
xprt->address_strings[RPC_DISPLAY_ADDR] =
kstrdup(buf, GFP_KERNEL);
sin = xs_addr_in(xprt);
snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
break;
case AF_INET6:
(void)rpc_ntop(sap, buf, sizeof(buf));
xprt->address_strings[RPC_DISPLAY_ADDR] =
kstrdup(buf, GFP_KERNEL);
sin6 = xs_addr_in6(xprt);
snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
break;
default:
BUG();
}
xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
}
......@@ -495,6 +516,70 @@ static int xs_nospace(struct rpc_task *task)
return ret;
}
/*
* Construct a stream transport record marker in @buf.
*/
static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
{
u32 reclen = buf->len - sizeof(rpc_fraghdr);
rpc_fraghdr *base = buf->head[0].iov_base;
*base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen);
}
/**
* xs_local_send_request - write an RPC request to an AF_LOCAL socket
* @task: RPC task that manages the state of an RPC request
*
* Return values:
* 0: The request has been sent
* EAGAIN: The socket was blocked, please call again later to
* complete the request
* ENOTCONN: Caller needs to invoke connect logic then call again
* other: Some other error occured, the request was not sent
*/
static int xs_local_send_request(struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
struct sock_xprt *transport =
container_of(xprt, struct sock_xprt, xprt);
struct xdr_buf *xdr = &req->rq_snd_buf;
int status;
xs_encode_stream_record_marker(&req->rq_snd_buf);
xs_pktdump("packet data:",
req->rq_svec->iov_base, req->rq_svec->iov_len);
status = xs_sendpages(transport->sock, NULL, 0,
xdr, req->rq_bytes_sent);
dprintk("RPC: %s(%u) = %d\n",
__func__, xdr->len - req->rq_bytes_sent, status);
if (likely(status >= 0)) {
req->rq_bytes_sent += status;
req->rq_xmit_bytes_sent += status;
if (likely(req->rq_bytes_sent >= req->rq_slen)) {
req->rq_bytes_sent = 0;
return 0;
}
status = -EAGAIN;
}
switch (status) {
case -EAGAIN:
status = xs_nospace(task);
break;
default:
dprintk("RPC: sendmsg returned unrecognized error %d\n",
-status);
case -EPIPE:
xs_close(xprt);
status = -ENOTCONN;
}
return status;
}
/**
* xs_udp_send_request - write an RPC request to a UDP socket
* @task: address of RPC task that manages the state of an RPC request
......@@ -574,13 +659,6 @@ static void xs_tcp_shutdown(struct rpc_xprt *xprt)
kernel_sock_shutdown(sock, SHUT_WR);
}
static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
{
u32 reclen = buf->len - sizeof(rpc_fraghdr);
rpc_fraghdr *base = buf->head[0].iov_base;
*base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
}
/**
* xs_tcp_send_request - write an RPC request to a TCP socket
* @task: address of RPC task that manages the state of an RPC request
......@@ -603,7 +681,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
struct xdr_buf *xdr = &req->rq_snd_buf;
int status;
xs_encode_tcp_record_marker(&req->rq_snd_buf);
xs_encode_stream_record_marker(&req->rq_snd_buf);
xs_pktdump("packet data:",
req->rq_svec->iov_base,
......@@ -785,6 +863,88 @@ static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
return (struct rpc_xprt *) sk->sk_user_data;
}
static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
{
struct xdr_skb_reader desc = {
.skb = skb,
.offset = sizeof(rpc_fraghdr),
.count = skb->len - sizeof(rpc_fraghdr),
};
if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0)
return -1;
if (desc.count)
return -1;
return 0;
}
/**
* xs_local_data_ready - "data ready" callback for AF_LOCAL sockets
* @sk: socket with data to read
* @len: how much data to read
*
* Currently this assumes we can read the whole reply in a single gulp.
*/
static void xs_local_data_ready(struct sock *sk, int len)
{
struct rpc_task *task;
struct rpc_xprt *xprt;
struct rpc_rqst *rovr;
struct sk_buff *skb;
int err, repsize, copied;
u32 _xid;
__be32 *xp;
read_lock_bh(&sk->sk_callback_lock);
dprintk("RPC: %s...\n", __func__);
xprt = xprt_from_sock(sk);
if (xprt == NULL)
goto out;
skb = skb_recv_datagram(sk, 0, 1, &err);
if (skb == NULL)
goto out;
if (xprt->shutdown)
goto dropit;
repsize = skb->len - sizeof(rpc_fraghdr);
if (repsize < 4) {
dprintk("RPC: impossible RPC reply size %d\n", repsize);
goto dropit;
}
/* Copy the XID from the skb... */
xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
if (xp == NULL)
goto dropit;
/* Look up and lock the request corresponding to the given XID */
spin_lock(&xprt->transport_lock);
rovr = xprt_lookup_rqst(xprt, *xp);
if (!rovr)
goto out_unlock;
task = rovr->rq_task;
copied = rovr->rq_private_buf.buflen;
if (copied > repsize)
copied = repsize;
if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
dprintk("RPC: sk_buff copy failed\n");
goto out_unlock;
}
xprt_complete_rqst(task, copied);
out_unlock:
spin_unlock(&xprt->transport_lock);
dropit:
skb_free_datagram(sk, skb);
out:
read_unlock_bh(&sk->sk_callback_lock);
}
/**
* xs_udp_data_ready - "data ready" callback for UDP sockets
* @sk: socket with data to read
......@@ -1344,7 +1504,6 @@ static void xs_tcp_state_change(struct sock *sk)
case TCP_CLOSE_WAIT:
/* The server initiated a shutdown of the socket */
xprt_force_disconnect(xprt);
case TCP_SYN_SENT:
xprt->connect_cookie++;
case TCP_CLOSING:
/*
......@@ -1571,11 +1730,31 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock)
return err;
}
/*
* We don't support autobind on AF_LOCAL sockets
*/
static void xs_local_rpcbind(struct rpc_task *task)
{
xprt_set_bound(task->tk_xprt);
}
static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port)
{
}
#ifdef CONFIG_DEBUG_LOCK_ALLOC
static struct lock_class_key xs_key[2];
static struct lock_class_key xs_slock_key[2];
static inline void xs_reclassify_socketu(struct socket *sock)
{
struct sock *sk = sock->sk;
BUG_ON(sock_owned_by_user(sk));
sock_lock_init_class_and_name(sk, "slock-AF_LOCAL-RPC",
&xs_slock_key[1], "sk_lock-AF_LOCAL-RPC", &xs_key[1]);
}
static inline void xs_reclassify_socket4(struct socket *sock)
{
struct sock *sk = sock->sk;
......@@ -1597,6 +1776,9 @@ static inline void xs_reclassify_socket6(struct socket *sock)
static inline void xs_reclassify_socket(int family, struct socket *sock)
{
switch (family) {
case AF_LOCAL:
xs_reclassify_socketu(sock);
break;
case AF_INET:
xs_reclassify_socket4(sock);
break;
......@@ -1606,6 +1788,10 @@ static inline void xs_reclassify_socket(int family, struct socket *sock)
}
}
#else
static inline void xs_reclassify_socketu(struct socket *sock)
{
}
static inline void xs_reclassify_socket4(struct socket *sock)
{
}
......@@ -1644,6 +1830,94 @@ static struct socket *xs_create_sock(struct rpc_xprt *xprt,
return ERR_PTR(err);
}
static int xs_local_finish_connecting(struct rpc_xprt *xprt,
struct socket *sock)
{
struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
xprt);
if (!transport->inet) {
struct sock *sk = sock->sk;
write_lock_bh(&sk->sk_callback_lock);
xs_save_old_callbacks(transport, sk);
sk->sk_user_data = xprt;
sk->sk_data_ready = xs_local_data_ready;
sk->sk_write_space = xs_udp_write_space;
sk->sk_error_report = xs_error_report;
sk->sk_allocation = GFP_ATOMIC;
xprt_clear_connected(xprt);
/* Reset to new socket */
transport->sock = sock;
transport->inet = sk;
write_unlock_bh(&sk->sk_callback_lock);
}
/* Tell the socket layer to start connecting... */
xprt->stat.connect_count++;
xprt->stat.connect_start = jiffies;
return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
}
/**
* xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint
* @xprt: RPC transport to connect
* @transport: socket transport to connect
* @create_sock: function to create a socket of the correct type
*
* Invoked by a work queue tasklet.
*/
static void xs_local_setup_socket(struct work_struct *work)
{
struct sock_xprt *transport =
container_of(work, struct sock_xprt, connect_worker.work);
struct rpc_xprt *xprt = &transport->xprt;
struct socket *sock;
int status = -EIO;
if (xprt->shutdown)
goto out;
clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
status = __sock_create(xprt->xprt_net, AF_LOCAL,
SOCK_STREAM, 0, &sock, 1);
if (status < 0) {
dprintk("RPC: can't create AF_LOCAL "
"transport socket (%d).\n", -status);
goto out;
}
xs_reclassify_socketu(sock);
dprintk("RPC: worker connecting xprt %p via AF_LOCAL to %s\n",
xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
status = xs_local_finish_connecting(xprt, sock);
switch (status) {
case 0:
dprintk("RPC: xprt %p connected to %s\n",
xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
xprt_set_connected(xprt);
break;
case -ENOENT:
dprintk("RPC: xprt %p: socket %s does not exist\n",
xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
break;
default:
printk(KERN_ERR "%s: unhandled error (%d) connecting to %s\n",
__func__, -status,
xprt->address_strings[RPC_DISPLAY_ADDR]);
}
out:
xprt_clear_connecting(xprt);
xprt_wake_pending_tasks(xprt, status);
}
static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
{
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
......@@ -1758,6 +2032,7 @@ static void xs_tcp_reuse_connection(struct sock_xprt *transport)
static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
{
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
int ret = -ENOTCONN;
if (!transport->inet) {
struct sock *sk = sock->sk;
......@@ -1789,12 +2064,22 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
}
if (!xprt_bound(xprt))
return -ENOTCONN;
goto out;
/* Tell the socket layer to start connecting... */
xprt->stat.connect_count++;
xprt->stat.connect_start = jiffies;
return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
switch (ret) {
case 0:
case -EINPROGRESS:
/* SYN_SENT! */
xprt->connect_cookie++;
if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
}
out:
return ret;
}
/**
......@@ -1916,6 +2201,32 @@ static void xs_connect(struct rpc_task *task)
}
}
/**
* xs_local_print_stats - display AF_LOCAL socket-specifc stats
* @xprt: rpc_xprt struct containing statistics
* @seq: output file
*
*/
static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
long idle_time = 0;
if (xprt_connected(xprt))
idle_time = (long)(jiffies - xprt->last_used) / HZ;
seq_printf(seq, "\txprt:\tlocal %lu %lu %lu %ld %lu %lu %lu "
"%llu %llu\n",
xprt->stat.bind_count,
xprt->stat.connect_count,
xprt->stat.connect_time,
idle_time,
xprt->stat.sends,
xprt->stat.recvs,
xprt->stat.bad_xids,
xprt->stat.req_u,
xprt->stat.bklog_u);
}
/**
* xs_udp_print_stats - display UDP socket-specifc stats
* @xprt: rpc_xprt struct containing statistics
......@@ -2014,10 +2325,7 @@ static int bc_sendto(struct rpc_rqst *req)
unsigned long headoff;
unsigned long tailoff;
/*
* Set up the rpc header and record marker stuff
*/
xs_encode_tcp_record_marker(xbufp);
xs_encode_stream_record_marker(xbufp);
tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK;
headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK;
......@@ -2089,6 +2397,21 @@ static void bc_destroy(struct rpc_xprt *xprt)
{
}
static struct rpc_xprt_ops xs_local_ops = {
.reserve_xprt = xprt_reserve_xprt,
.release_xprt = xs_tcp_release_xprt,
.rpcbind = xs_local_rpcbind,
.set_port = xs_local_set_port,
.connect = xs_connect,
.buf_alloc = rpc_malloc,
.buf_free = rpc_free,
.send_request = xs_local_send_request,
.set_retrans_timeout = xprt_set_retrans_timeout_def,
.close = xs_close,
.destroy = xs_destroy,
.print_stats = xs_local_print_stats,
};
static struct rpc_xprt_ops xs_udp_ops = {
.set_buffer_size = xs_udp_set_buffer_size,
.reserve_xprt = xprt_reserve_xprt_cong,
......@@ -2150,6 +2473,8 @@ static int xs_init_anyaddr(const int family, struct sockaddr *sap)
};
switch (family) {
case AF_LOCAL:
break;
case AF_INET:
memcpy(sap, &sin, sizeof(sin));
break;
......@@ -2197,6 +2522,70 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
return xprt;
}
static const struct rpc_timeout xs_local_default_timeout = {
.to_initval = 10 * HZ,
.to_maxval = 10 * HZ,
.to_retries = 2,
};
/**
* xs_setup_local - Set up transport to use an AF_LOCAL socket
* @args: rpc transport creation arguments
*
* AF_LOCAL is a "tpi_cots_ord" transport, just like TCP
*/
static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
{
struct sockaddr_un *sun = (struct sockaddr_un *)args->dstaddr;
struct sock_xprt *transport;
struct rpc_xprt *xprt;
struct rpc_xprt *ret;
xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
if (IS_ERR(xprt))
return xprt;
transport = container_of(xprt, struct sock_xprt, xprt);
xprt->prot = 0;
xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
xprt->bind_timeout = XS_BIND_TO;
xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
xprt->idle_timeout = XS_IDLE_DISC_TO;
xprt->ops = &xs_local_ops;
xprt->timeout = &xs_local_default_timeout;
switch (sun->sun_family) {
case AF_LOCAL:
if (sun->sun_path[0] != '/') {
dprintk("RPC: bad AF_LOCAL address: %s\n",
sun->sun_path);
ret = ERR_PTR(-EINVAL);
goto out_err;
}
xprt_set_bound(xprt);
INIT_DELAYED_WORK(&transport->connect_worker,
xs_local_setup_socket);
xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL);
break;
default:
ret = ERR_PTR(-EAFNOSUPPORT);
goto out_err;
}
dprintk("RPC: set up xprt to %s via AF_LOCAL\n",
xprt->address_strings[RPC_DISPLAY_ADDR]);
if (try_module_get(THIS_MODULE))
return xprt;
ret = ERR_PTR(-EINVAL);
out_err:
xprt_free(xprt);
return ret;
}
static const struct rpc_timeout xs_udp_default_timeout = {
.to_initval = 5 * HZ,
.to_maxval = 30 * HZ,
......@@ -2438,6 +2827,14 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
return ret;
}
static struct xprt_class xs_local_transport = {
.list = LIST_HEAD_INIT(xs_local_transport.list),
.name = "named UNIX socket",
.owner = THIS_MODULE,
.ident = XPRT_TRANSPORT_LOCAL,
.setup = xs_setup_local,
};
static struct xprt_class xs_udp_transport = {
.list = LIST_HEAD_INIT(xs_udp_transport.list),
.name = "udp",
......@@ -2473,6 +2870,7 @@ int init_socket_xprt(void)
sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif
xprt_register_transport(&xs_local_transport);
xprt_register_transport(&xs_udp_transport);
xprt_register_transport(&xs_tcp_transport);
xprt_register_transport(&xs_bc_tcp_transport);
......@@ -2493,6 +2891,7 @@ void cleanup_socket_xprt(void)
}
#endif
xprt_unregister_transport(&xs_local_transport);
xprt_unregister_transport(&xs_udp_transport);
xprt_unregister_transport(&xs_tcp_transport);
xprt_unregister_transport(&xs_bc_tcp_transport);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment