Commit 9903cd1c authored by Chuck Lever's avatar Chuck Lever Committed by Trond Myklebust

[PATCH] RPC: transport switch function naming

 Introduce block header comments and a function naming convention to the
 socket transport implementation.  Provide a debug setting for transports
 that is separate from RPCDBG_XPRT.  Eliminate xprt_default_timeout().

 Provide block comments for exposed interfaces in xprt.c, and eliminate
 the useless obvious comments.

 Convert printk's to dprintk's.

 Test-plan:
 Compile kernel with CONFIG_NFS enabled.

 Version: Thu, 11 Aug 2005 16:04:04 -0400
Signed-off-by: default avatarChuck Lever <cel@netapp.com>
Signed-off-by: default avatarTrond Myklebust <Trond.Myklebust@netapp.com>
parent a246b010
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#define RPCDBG_AUTH 0x0010 #define RPCDBG_AUTH 0x0010
#define RPCDBG_PMAP 0x0020 #define RPCDBG_PMAP 0x0020
#define RPCDBG_SCHED 0x0040 #define RPCDBG_SCHED 0x0040
#define RPCDBG_TRANS 0x0080
#define RPCDBG_SVCSOCK 0x0100 #define RPCDBG_SVCSOCK 0x0100
#define RPCDBG_SVCDSP 0x0200 #define RPCDBG_SVCDSP 0x0200
#define RPCDBG_MISC 0x0400 #define RPCDBG_MISC 0x0400
......
...@@ -227,9 +227,6 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result) ...@@ -227,9 +227,6 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
xprt->cwnd = cwnd; xprt->cwnd = cwnd;
} }
/*
* Reset the major timeout value
*/
static void xprt_reset_majortimeo(struct rpc_rqst *req) static void xprt_reset_majortimeo(struct rpc_rqst *req)
{ {
struct rpc_timeout *to = &req->rq_xprt->timeout; struct rpc_timeout *to = &req->rq_xprt->timeout;
...@@ -244,8 +241,10 @@ static void xprt_reset_majortimeo(struct rpc_rqst *req) ...@@ -244,8 +241,10 @@ static void xprt_reset_majortimeo(struct rpc_rqst *req)
req->rq_majortimeo += jiffies; req->rq_majortimeo += jiffies;
} }
/* /**
* Adjust timeout values etc for next retransmit * xprt_adjust_timeout - adjust timeout values for next retransmit
* @req: RPC request containing parameters to use for the adjustment
*
*/ */
int xprt_adjust_timeout(struct rpc_rqst *req) int xprt_adjust_timeout(struct rpc_rqst *req)
{ {
...@@ -291,8 +290,10 @@ xprt_socket_autoclose(void *args) ...@@ -291,8 +290,10 @@ xprt_socket_autoclose(void *args)
xprt_release_write(xprt, NULL); xprt_release_write(xprt, NULL);
} }
/* /**
* Mark a transport as disconnected * xprt_disconnect - mark a transport as disconnected
* @xprt: transport to flag for disconnect
*
*/ */
void xprt_disconnect(struct rpc_xprt *xprt) void xprt_disconnect(struct rpc_xprt *xprt)
{ {
...@@ -303,9 +304,6 @@ void xprt_disconnect(struct rpc_xprt *xprt) ...@@ -303,9 +304,6 @@ void xprt_disconnect(struct rpc_xprt *xprt)
spin_unlock_bh(&xprt->sock_lock); spin_unlock_bh(&xprt->sock_lock);
} }
/*
* Used to allow disconnection when we've been idle
*/
static void static void
xprt_init_autodisconnect(unsigned long data) xprt_init_autodisconnect(unsigned long data)
{ {
...@@ -327,8 +325,9 @@ xprt_init_autodisconnect(unsigned long data) ...@@ -327,8 +325,9 @@ xprt_init_autodisconnect(unsigned long data)
spin_unlock(&xprt->sock_lock); spin_unlock(&xprt->sock_lock);
} }
/* /**
* Attempt to connect a TCP socket. * xprt_connect - schedule a transport connect operation
* @task: RPC task that is requesting the connect
* *
*/ */
void xprt_connect(struct rpc_task *task) void xprt_connect(struct rpc_task *task)
...@@ -361,11 +360,7 @@ void xprt_connect(struct rpc_task *task) ...@@ -361,11 +360,7 @@ void xprt_connect(struct rpc_task *task)
return; return;
} }
/* static void xprt_connect_status(struct rpc_task *task)
* We arrive here when awoken from waiting on connection establishment.
*/
static void
xprt_connect_status(struct rpc_task *task)
{ {
struct rpc_xprt *xprt = task->tk_xprt; struct rpc_xprt *xprt = task->tk_xprt;
...@@ -404,8 +399,11 @@ xprt_connect_status(struct rpc_task *task) ...@@ -404,8 +399,11 @@ xprt_connect_status(struct rpc_task *task)
} }
} }
/* /**
* Look up the RPC request corresponding to a reply, and then lock it. * xprt_lookup_rqst - find an RPC request corresponding to an XID
* @xprt: transport on which the original request was transmitted
* @xid: RPC XID of incoming reply
*
*/ */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{ {
...@@ -422,9 +420,12 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) ...@@ -422,9 +420,12 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
return req; return req;
} }
/* /**
* Complete reply received. * xprt_complete_rqst - called when reply processing is complete
* The TCP code relies on us to remove the request from xprt->pending. * @xprt: controlling transport
* @req: RPC request that just completed
* @copied: actual number of bytes received from the transport
*
*/ */
void xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) void xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{ {
...@@ -498,12 +499,12 @@ xprt_timer(struct rpc_task *task) ...@@ -498,12 +499,12 @@ xprt_timer(struct rpc_task *task)
spin_unlock(&xprt->sock_lock); spin_unlock(&xprt->sock_lock);
} }
/* /**
* Place the actual RPC call. * xprt_prepare_transmit - reserve the transport before sending a request
* We have to copy the iovec because sendmsg fiddles with its contents. * @task: RPC task about to send a request
*
*/ */
int int xprt_prepare_transmit(struct rpc_task *task)
xprt_prepare_transmit(struct rpc_task *task)
{ {
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt; struct rpc_xprt *xprt = req->rq_xprt;
...@@ -533,8 +534,13 @@ xprt_prepare_transmit(struct rpc_task *task) ...@@ -533,8 +534,13 @@ xprt_prepare_transmit(struct rpc_task *task)
return err; return err;
} }
void /**
xprt_transmit(struct rpc_task *task) * xprt_transmit - send an RPC request on a transport
* @task: controlling RPC task
*
* We have to copy the iovec because sendmsg fiddles with its contents.
*/
void xprt_transmit(struct rpc_task *task)
{ {
struct rpc_clnt *clnt = task->tk_client; struct rpc_clnt *clnt = task->tk_client;
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
...@@ -604,11 +610,7 @@ xprt_transmit(struct rpc_task *task) ...@@ -604,11 +610,7 @@ xprt_transmit(struct rpc_task *task)
spin_unlock_bh(&xprt->sock_lock); spin_unlock_bh(&xprt->sock_lock);
} }
/* static inline void do_xprt_reserve(struct rpc_task *task)
* Reserve an RPC call slot.
*/
static inline void
do_xprt_reserve(struct rpc_task *task)
{ {
struct rpc_xprt *xprt = task->tk_xprt; struct rpc_xprt *xprt = task->tk_xprt;
...@@ -628,8 +630,14 @@ do_xprt_reserve(struct rpc_task *task) ...@@ -628,8 +630,14 @@ do_xprt_reserve(struct rpc_task *task)
rpc_sleep_on(&xprt->backlog, task, NULL, NULL); rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
} }
void /**
xprt_reserve(struct rpc_task *task) * xprt_reserve - allocate an RPC request slot
* @task: RPC task requesting a slot allocation
*
* If no more slots are available, place the task on the transport's
* backlog queue.
*/
void xprt_reserve(struct rpc_task *task)
{ {
struct rpc_xprt *xprt = task->tk_xprt; struct rpc_xprt *xprt = task->tk_xprt;
...@@ -641,9 +649,6 @@ xprt_reserve(struct rpc_task *task) ...@@ -641,9 +649,6 @@ xprt_reserve(struct rpc_task *task)
} }
} }
/*
* Allocate a 'unique' XID
*/
static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt) static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt)
{ {
return xprt->xid++; return xprt->xid++;
...@@ -654,11 +659,7 @@ static inline void xprt_init_xid(struct rpc_xprt *xprt) ...@@ -654,11 +659,7 @@ static inline void xprt_init_xid(struct rpc_xprt *xprt)
get_random_bytes(&xprt->xid, sizeof(xprt->xid)); get_random_bytes(&xprt->xid, sizeof(xprt->xid));
} }
/* static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
* Initialize RPC request
*/
static void
xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{ {
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
...@@ -670,11 +671,12 @@ xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) ...@@ -670,11 +671,12 @@ xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
req, ntohl(req->rq_xid)); req, ntohl(req->rq_xid));
} }
/* /**
* Release an RPC call slot * xprt_release - release an RPC request slot
* @task: task which is finished with the slot
*
*/ */
void void xprt_release(struct rpc_task *task)
xprt_release(struct rpc_task *task)
{ {
struct rpc_xprt *xprt = task->tk_xprt; struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req; struct rpc_rqst *req;
...@@ -702,11 +704,14 @@ xprt_release(struct rpc_task *task) ...@@ -702,11 +704,14 @@ xprt_release(struct rpc_task *task)
spin_unlock(&xprt->xprt_lock); spin_unlock(&xprt->xprt_lock);
} }
/* /**
* Set constant timeout * xprt_set_timeout - set constant RPC timeout
* @to: RPC timeout parameters to set up
* @retr: number of retries
* @incr: amount of increase after each retry
*
*/ */
void void xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{ {
to->to_initval = to->to_initval =
to->to_increment = incr; to->to_increment = incr;
...@@ -715,11 +720,7 @@ xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr) ...@@ -715,11 +720,7 @@ xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
to->to_exponential = 0; to->to_exponential = 0;
} }
/* static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
* Initialize an RPC client
*/
static struct rpc_xprt *
xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
{ {
int result; int result;
struct rpc_xprt *xprt; struct rpc_xprt *xprt;
...@@ -778,11 +779,14 @@ xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) ...@@ -778,11 +779,14 @@ xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
return xprt; return xprt;
} }
/* /**
* Create an RPC client transport given the protocol and peer address. * xprt_create_proto - create an RPC client transport
* @proto: requested transport protocol
* @sap: remote peer's address
* @to: timeout parameters for new transport
*
*/ */
struct rpc_xprt * struct rpc_xprt *xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{ {
struct rpc_xprt *xprt; struct rpc_xprt *xprt;
...@@ -794,11 +798,7 @@ xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to) ...@@ -794,11 +798,7 @@ xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
return xprt; return xprt;
} }
/* static void xprt_shutdown(struct rpc_xprt *xprt)
* Prepare for transport shutdown.
*/
static void
xprt_shutdown(struct rpc_xprt *xprt)
{ {
xprt->shutdown = 1; xprt->shutdown = 1;
rpc_wake_up(&xprt->sending); rpc_wake_up(&xprt->sending);
...@@ -809,21 +809,18 @@ xprt_shutdown(struct rpc_xprt *xprt) ...@@ -809,21 +809,18 @@ xprt_shutdown(struct rpc_xprt *xprt)
del_timer_sync(&xprt->timer); del_timer_sync(&xprt->timer);
} }
/* static int xprt_clear_backlog(struct rpc_xprt *xprt) {
* Clear the xprt backlog queue
*/
static int
xprt_clear_backlog(struct rpc_xprt *xprt) {
rpc_wake_up_next(&xprt->backlog); rpc_wake_up_next(&xprt->backlog);
wake_up(&xprt->cong_wait); wake_up(&xprt->cong_wait);
return 1; return 1;
} }
/* /**
* Destroy an RPC transport, killing off all requests. * xprt_destroy - destroy an RPC transport, killing off all requests.
* @xprt: transport to destroy
*
*/ */
int int xprt_destroy(struct rpc_xprt *xprt)
xprt_destroy(struct rpc_xprt *xprt)
{ {
dprintk("RPC: destroying transport %p\n", xprt); dprintk("RPC: destroying transport %p\n", xprt);
xprt_shutdown(xprt); xprt_shutdown(xprt);
......
...@@ -33,23 +33,21 @@ ...@@ -33,23 +33,21 @@
#include <net/udp.h> #include <net/udp.h>
#include <net/tcp.h> #include <net/tcp.h>
/*
* Maximum port number to use when requesting a reserved port.
*/
#define XS_MAX_RESVPORT (800U)
#ifdef RPC_DEBUG #ifdef RPC_DEBUG
# undef RPC_DEBUG_DATA # undef RPC_DEBUG_DATA
# define RPCDBG_FACILITY RPCDBG_XPRT # define RPCDBG_FACILITY RPCDBG_TRANS
#endif #endif
#define XPRT_MAX_RESVPORT (800)
#ifdef RPC_DEBUG_DATA #ifdef RPC_DEBUG_DATA
/* static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
* Print the buffer contents (first 128 bytes only--just enough for
* diropres return).
*/
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{ {
u8 *buf = (u8 *) packet; u8 *buf = (u8 *) packet;
int j; int j;
dprintk("RPC: %s\n", msg); dprintk("RPC: %s\n", msg);
for (j = 0; j < count && j < 128; j += 4) { for (j = 0; j < count && j < 128; j += 4) {
...@@ -64,25 +62,22 @@ xprt_pktdump(char *msg, u32 *packet, unsigned int count) ...@@ -64,25 +62,22 @@ xprt_pktdump(char *msg, u32 *packet, unsigned int count)
dprintk("\n"); dprintk("\n");
} }
#else #else
static inline void static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{ {
/* NOP */ /* NOP */
} }
#endif #endif
/* /**
* Look up RPC transport given an INET socket * xs_sendpages - write pages directly to a socket
* @sock: socket to send on
* @addr: UDP only -- address of destination
* @addrlen: UDP only -- length of destination address
* @xdr: buffer containing this request
* @base: starting position in the buffer
*
*/ */
static inline struct rpc_xprt * static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, int msgflags)
xprt_from_sock(struct sock *sk)
{
return (struct rpc_xprt *) sk->sk_user_data;
}
static int
xdr_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen,
struct xdr_buf *xdr, unsigned int base, int msgflags)
{ {
struct page **ppage = xdr->pages; struct page **ppage = xdr->pages;
unsigned int len, pglen = xdr->page_len; unsigned int len, pglen = xdr->page_len;
...@@ -125,7 +120,7 @@ xdr_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, ...@@ -125,7 +120,7 @@ xdr_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen,
} }
if (base || xdr->page_base) { if (base || xdr->page_base) {
pglen -= base; pglen -= base;
base += xdr->page_base; base += xdr->page_base;
ppage += base >> PAGE_CACHE_SHIFT; ppage += base >> PAGE_CACHE_SHIFT;
base &= ~PAGE_CACHE_MASK; base &= ~PAGE_CACHE_MASK;
} }
...@@ -176,23 +171,25 @@ xdr_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, ...@@ -176,23 +171,25 @@ xdr_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen,
return ret; return ret;
} }
/* /**
* Write data to socket. * xs_sendmsg - write an RPC request to a socket
* @xprt: generic transport
* @req: the RPC request to write
*
*/ */
static inline int static int xs_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{ {
struct socket *sock = xprt->sock; struct socket *sock = xprt->sock;
struct xdr_buf *xdr = &req->rq_snd_buf; struct xdr_buf *xdr = &req->rq_snd_buf;
struct sockaddr *addr = NULL; struct sockaddr *addr = NULL;
int addrlen = 0; int addrlen = 0;
unsigned int skip; unsigned int skip;
int result; int result;
if (!sock) if (!sock)
return -ENOTCONN; return -ENOTCONN;
xprt_pktdump("packet data:", xs_pktdump("packet data:",
req->rq_svec->iov_base, req->rq_svec->iov_base,
req->rq_svec->iov_len); req->rq_svec->iov_len);
...@@ -201,13 +198,13 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) ...@@ -201,13 +198,13 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
addr = (struct sockaddr *) &xprt->addr; addr = (struct sockaddr *) &xprt->addr;
addrlen = sizeof(xprt->addr); addrlen = sizeof(xprt->addr);
} }
/* Dont repeat bytes */ /* Don't repeat bytes */
skip = req->rq_bytes_sent; skip = req->rq_bytes_sent;
clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT); result = xs_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT);
dprintk("RPC: xprt_sendmsg(%d) = %d\n", xdr->len - skip, result); dprintk("RPC: xs_sendmsg(%d) = %d\n", xdr->len - skip, result);
if (result >= 0) if (result >= 0)
return result; return result;
...@@ -215,8 +212,7 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) ...@@ -215,8 +212,7 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
switch (result) { switch (result) {
case -ECONNREFUSED: case -ECONNREFUSED:
/* When the server has died, an ICMP port unreachable message /* When the server has died, an ICMP port unreachable message
* prompts ECONNREFUSED. * prompts ECONNREFUSED. */
*/
case -EAGAIN: case -EAGAIN:
break; break;
case -ECONNRESET: case -ECONNRESET:
...@@ -227,13 +223,25 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) ...@@ -227,13 +223,25 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
result = -ENOTCONN; result = -ENOTCONN;
break; break;
default: default:
printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result); break;
} }
return result; return result;
} }
static int /**
xprt_send_request(struct rpc_task *task) * xs_send_request - write an RPC request to a socket
* @task: address of RPC task that manages the state of an RPC request
*
* Return values:
* 0: The request has been sent
* EAGAIN: The socket was blocked, please call again later to
* complete the request
* other: Some other error occured, the request was not sent
*
* XXX: In the case of soft timeouts, should we eventually give up
* if the socket is not able to make progress?
*/
static int xs_send_request(struct rpc_task *task)
{ {
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt; struct rpc_xprt *xprt = req->rq_xprt;
...@@ -242,18 +250,18 @@ xprt_send_request(struct rpc_task *task) ...@@ -242,18 +250,18 @@ xprt_send_request(struct rpc_task *task)
/* set up everything as needed. */ /* set up everything as needed. */
/* Write the record marker */ /* Write the record marker */
if (xprt->stream) { if (xprt->stream) {
u32 *marker = req->rq_svec[0].iov_base; u32 *marker = req->rq_svec[0].iov_base;
*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker))); *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
} }
/* Continue transmitting the packet/record. We must be careful /* Continue transmitting the packet/record. We must be careful
* to cope with writespace callbacks arriving _after_ we have * to cope with writespace callbacks arriving _after_ we have
* called xprt_sendmsg(). * called sendmsg().
*/ */
while (1) { while (1) {
req->rq_xtime = jiffies; req->rq_xtime = jiffies;
status = xprt_sendmsg(xprt, req); status = xs_sendmsg(xprt, req);
if (status < 0) if (status < 0)
break; break;
...@@ -285,7 +293,7 @@ xprt_send_request(struct rpc_task *task) ...@@ -285,7 +293,7 @@ xprt_send_request(struct rpc_task *task)
if (status == -EAGAIN) { if (status == -EAGAIN) {
if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) { if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
/* Protect against races with xprt_write_space */ /* Protect against races with xs_write_space */
spin_lock_bh(&xprt->sock_lock); spin_lock_bh(&xprt->sock_lock);
/* Don't race with disconnect */ /* Don't race with disconnect */
if (!xprt_connected(xprt)) if (!xprt_connected(xprt))
...@@ -303,65 +311,77 @@ xprt_send_request(struct rpc_task *task) ...@@ -303,65 +311,77 @@ xprt_send_request(struct rpc_task *task)
return status; return status;
} }
/* /**
* Close down a transport socket * xs_close - close a socket
* @xprt: transport
*
*/ */
static void static void xs_close(struct rpc_xprt *xprt)
xprt_close(struct rpc_xprt *xprt)
{ {
struct socket *sock = xprt->sock; struct socket *sock = xprt->sock;
struct sock *sk = xprt->inet; struct sock *sk = xprt->inet;
if (!sk) if (!sk)
return; return;
dprintk("RPC: xs_close xprt %p\n", xprt);
write_lock_bh(&sk->sk_callback_lock); write_lock_bh(&sk->sk_callback_lock);
xprt->inet = NULL; xprt->inet = NULL;
xprt->sock = NULL; xprt->sock = NULL;
sk->sk_user_data = NULL; sk->sk_user_data = NULL;
sk->sk_data_ready = xprt->old_data_ready; sk->sk_data_ready = xprt->old_data_ready;
sk->sk_state_change = xprt->old_state_change; sk->sk_state_change = xprt->old_state_change;
sk->sk_write_space = xprt->old_write_space; sk->sk_write_space = xprt->old_write_space;
write_unlock_bh(&sk->sk_callback_lock); write_unlock_bh(&sk->sk_callback_lock);
sk->sk_no_check = 0; sk->sk_no_check = 0;
sock_release(sock); sock_release(sock);
} }
static void xprt_socket_destroy(struct rpc_xprt *xprt) /**
* xs_destroy - prepare to shutdown a transport
* @xprt: doomed transport
*
*/
static void xs_destroy(struct rpc_xprt *xprt)
{ {
dprintk("RPC: xs_destroy xprt %p\n", xprt);
cancel_delayed_work(&xprt->sock_connect); cancel_delayed_work(&xprt->sock_connect);
flush_scheduled_work(); flush_scheduled_work();
xprt_disconnect(xprt); xprt_disconnect(xprt);
xprt_close(xprt); xs_close(xprt);
kfree(xprt->slot); kfree(xprt->slot);
} }
/* static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
* Input handler for RPC replies. Called from a bottom half and hence {
* atomic. return (struct rpc_xprt *) sk->sk_user_data;
}
/**
* xs_udp_data_ready - "data ready" callback for UDP sockets
* @sk: socket with data to read
* @len: how much data to read
*
*/ */
static void static void xs_udp_data_ready(struct sock *sk, int len)
udp_data_ready(struct sock *sk, int len)
{ {
struct rpc_task *task; struct rpc_task *task;
struct rpc_xprt *xprt; struct rpc_xprt *xprt;
struct rpc_rqst *rovr; struct rpc_rqst *rovr;
struct sk_buff *skb; struct sk_buff *skb;
int err, repsize, copied; int err, repsize, copied;
u32 _xid, *xp; u32 _xid, *xp;
read_lock(&sk->sk_callback_lock); read_lock(&sk->sk_callback_lock);
dprintk("RPC: udp_data_ready...\n"); dprintk("RPC: xs_udp_data_ready...\n");
if (!(xprt = xprt_from_sock(sk))) { if (!(xprt = xprt_from_sock(sk)))
printk("RPC: udp_data_ready request not found!\n");
goto out; goto out;
}
dprintk("RPC: udp_data_ready client %p\n", xprt);
if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
goto out; goto out;
...@@ -371,7 +391,7 @@ udp_data_ready(struct sock *sk, int len) ...@@ -371,7 +391,7 @@ udp_data_ready(struct sock *sk, int len)
repsize = skb->len - sizeof(struct udphdr); repsize = skb->len - sizeof(struct udphdr);
if (repsize < 4) { if (repsize < 4) {
printk("RPC: impossible RPC reply size %d!\n", repsize); dprintk("RPC: impossible RPC reply size %d!\n", repsize);
goto dropit; goto dropit;
} }
...@@ -410,11 +430,7 @@ udp_data_ready(struct sock *sk, int len) ...@@ -410,11 +430,7 @@ udp_data_ready(struct sock *sk, int len)
read_unlock(&sk->sk_callback_lock); read_unlock(&sk->sk_callback_lock);
} }
/* static inline size_t xs_tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
* Copy from an skb into memory and shrink the skb.
*/
static inline size_t
tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
{ {
if (len > desc->count) if (len > desc->count)
len = desc->count; len = desc->count;
...@@ -430,18 +446,14 @@ tcp_copy_data(skb_reader_t *desc, void *p, size_t len) ...@@ -430,18 +446,14 @@ tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
return len; return len;
} }
/* static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
* TCP read fragment marker
*/
static inline void
tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
{ {
size_t len, used; size_t len, used;
char *p; char *p;
p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset; p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
len = sizeof(xprt->tcp_recm) - xprt->tcp_offset; len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
used = tcp_copy_data(desc, p, len); used = xs_tcp_copy_data(desc, p, len);
xprt->tcp_offset += used; xprt->tcp_offset += used;
if (used != len) if (used != len)
return; return;
...@@ -455,15 +467,15 @@ tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc) ...@@ -455,15 +467,15 @@ tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
xprt->tcp_offset = 0; xprt->tcp_offset = 0;
/* Sanity check of the record length */ /* Sanity check of the record length */
if (xprt->tcp_reclen < 4) { if (xprt->tcp_reclen < 4) {
printk(KERN_ERR "RPC: Invalid TCP record fragment length\n"); dprintk("RPC: invalid TCP record fragment length\n");
xprt_disconnect(xprt); xprt_disconnect(xprt);
return;
} }
dprintk("RPC: reading TCP record fragment of length %d\n", dprintk("RPC: reading TCP record fragment of length %d\n",
xprt->tcp_reclen); xprt->tcp_reclen);
} }
static void static void xs_tcp_check_recm(struct rpc_xprt *xprt)
tcp_check_recm(struct rpc_xprt *xprt)
{ {
dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n", dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n",
xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags); xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags);
...@@ -478,11 +490,7 @@ tcp_check_recm(struct rpc_xprt *xprt) ...@@ -478,11 +490,7 @@ tcp_check_recm(struct rpc_xprt *xprt)
} }
} }
/* static inline void xs_tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
* TCP read xid
*/
static inline void
tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
{ {
size_t len, used; size_t len, used;
char *p; char *p;
...@@ -490,7 +498,7 @@ tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc) ...@@ -490,7 +498,7 @@ tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
len = sizeof(xprt->tcp_xid) - xprt->tcp_offset; len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
dprintk("RPC: reading XID (%Zu bytes)\n", len); dprintk("RPC: reading XID (%Zu bytes)\n", len);
p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset; p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
used = tcp_copy_data(desc, p, len); used = xs_tcp_copy_data(desc, p, len);
xprt->tcp_offset += used; xprt->tcp_offset += used;
if (used != len) if (used != len)
return; return;
...@@ -499,14 +507,10 @@ tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc) ...@@ -499,14 +507,10 @@ tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
xprt->tcp_copied = 4; xprt->tcp_copied = 4;
dprintk("RPC: reading reply for XID %08x\n", dprintk("RPC: reading reply for XID %08x\n",
ntohl(xprt->tcp_xid)); ntohl(xprt->tcp_xid));
tcp_check_recm(xprt); xs_tcp_check_recm(xprt);
} }
/* static inline void xs_tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
* TCP read and complete request
*/
static inline void
tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{ {
struct rpc_rqst *req; struct rpc_rqst *req;
struct xdr_buf *rcvbuf; struct xdr_buf *rcvbuf;
...@@ -533,12 +537,12 @@ tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc) ...@@ -533,12 +537,12 @@ tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
memcpy(&my_desc, desc, sizeof(my_desc)); memcpy(&my_desc, desc, sizeof(my_desc));
my_desc.count = len; my_desc.count = len;
r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
&my_desc, tcp_copy_data); &my_desc, xs_tcp_copy_data);
desc->count -= r; desc->count -= r;
desc->offset += r; desc->offset += r;
} else } else
r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
desc, tcp_copy_data); desc, xs_tcp_copy_data);
if (r > 0) { if (r > 0) {
xprt->tcp_copied += r; xprt->tcp_copied += r;
...@@ -581,14 +585,10 @@ tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc) ...@@ -581,14 +585,10 @@ tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
xprt_complete_rqst(xprt, req, xprt->tcp_copied); xprt_complete_rqst(xprt, req, xprt->tcp_copied);
} }
spin_unlock(&xprt->sock_lock); spin_unlock(&xprt->sock_lock);
tcp_check_recm(xprt); xs_tcp_check_recm(xprt);
} }
/* static inline void xs_tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
* TCP discard extra bytes from a short read
*/
static inline void
tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
{ {
size_t len; size_t len;
...@@ -599,16 +599,10 @@ tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc) ...@@ -599,16 +599,10 @@ tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
desc->offset += len; desc->offset += len;
xprt->tcp_offset += len; xprt->tcp_offset += len;
dprintk("RPC: discarded %Zu bytes\n", len); dprintk("RPC: discarded %Zu bytes\n", len);
tcp_check_recm(xprt); xs_tcp_check_recm(xprt);
} }
/* static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
* TCP record receive routine
* We first have to grab the record marker, then the XID, then the data.
*/
static int
tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
unsigned int offset, size_t len)
{ {
struct rpc_xprt *xprt = rd_desc->arg.data; struct rpc_xprt *xprt = rd_desc->arg.data;
skb_reader_t desc = { skb_reader_t desc = {
...@@ -616,64 +610,72 @@ tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, ...@@ -616,64 +610,72 @@ tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
.offset = offset, .offset = offset,
.count = len, .count = len,
.csum = 0 .csum = 0
}; };
dprintk("RPC: tcp_data_recv\n"); dprintk("RPC: xs_tcp_data_recv started\n");
do { do {
/* Read in a new fragment marker if necessary */ /* Read in a new fragment marker if necessary */
/* Can we ever really expect to get completely empty fragments? */ /* Can we ever really expect to get completely empty fragments? */
if (xprt->tcp_flags & XPRT_COPY_RECM) { if (xprt->tcp_flags & XPRT_COPY_RECM) {
tcp_read_fraghdr(xprt, &desc); xs_tcp_read_fraghdr(xprt, &desc);
continue; continue;
} }
/* Read in the xid if necessary */ /* Read in the xid if necessary */
if (xprt->tcp_flags & XPRT_COPY_XID) { if (xprt->tcp_flags & XPRT_COPY_XID) {
tcp_read_xid(xprt, &desc); xs_tcp_read_xid(xprt, &desc);
continue; continue;
} }
/* Read in the request data */ /* Read in the request data */
if (xprt->tcp_flags & XPRT_COPY_DATA) { if (xprt->tcp_flags & XPRT_COPY_DATA) {
tcp_read_request(xprt, &desc); xs_tcp_read_request(xprt, &desc);
continue; continue;
} }
/* Skip over any trailing bytes on short reads */ /* Skip over any trailing bytes on short reads */
tcp_read_discard(xprt, &desc); xs_tcp_read_discard(xprt, &desc);
} while (desc.count); } while (desc.count);
dprintk("RPC: tcp_data_recv done\n"); dprintk("RPC: xs_tcp_data_recv done\n");
return len - desc.count; return len - desc.count;
} }
static void tcp_data_ready(struct sock *sk, int bytes) /**
* xs_tcp_data_ready - "data ready" callback for TCP sockets
* @sk: socket with data to read
* @bytes: how much data to read
*
*/
static void xs_tcp_data_ready(struct sock *sk, int bytes)
{ {
struct rpc_xprt *xprt; struct rpc_xprt *xprt;
read_descriptor_t rd_desc; read_descriptor_t rd_desc;
read_lock(&sk->sk_callback_lock); read_lock(&sk->sk_callback_lock);
dprintk("RPC: tcp_data_ready...\n"); dprintk("RPC: xs_tcp_data_ready...\n");
if (!(xprt = xprt_from_sock(sk))) { if (!(xprt = xprt_from_sock(sk)))
printk("RPC: tcp_data_ready socket info not found!\n");
goto out; goto out;
}
if (xprt->shutdown) if (xprt->shutdown)
goto out; goto out;
/* We use rd_desc to pass struct xprt to tcp_data_recv */ /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
rd_desc.arg.data = xprt; rd_desc.arg.data = xprt;
rd_desc.count = 65536; rd_desc.count = 65536;
tcp_read_sock(sk, &rd_desc, tcp_data_recv); tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
out: out:
read_unlock(&sk->sk_callback_lock); read_unlock(&sk->sk_callback_lock);
} }
static void /**
tcp_state_change(struct sock *sk) * xs_tcp_state_change - callback to handle TCP socket state changes
* @sk: socket whose state has changed
*
*/
static void xs_tcp_state_change(struct sock *sk)
{ {
struct rpc_xprt *xprt; struct rpc_xprt *xprt;
read_lock(&sk->sk_callback_lock); read_lock(&sk->sk_callback_lock);
if (!(xprt = xprt_from_sock(sk))) if (!(xprt = xprt_from_sock(sk)))
goto out; goto out;
dprintk("RPC: tcp_state_change client %p...\n", xprt); dprintk("RPC: xs_tcp_state_change client %p...\n", xprt);
dprintk("RPC: state %x conn %d dead %d zapped %d\n", dprintk("RPC: state %x conn %d dead %d zapped %d\n",
sk->sk_state, xprt_connected(xprt), sk->sk_state, xprt_connected(xprt),
sock_flag(sk, SOCK_DEAD), sock_flag(sk, SOCK_DEAD),
...@@ -703,17 +705,20 @@ tcp_state_change(struct sock *sk) ...@@ -703,17 +705,20 @@ tcp_state_change(struct sock *sk)
read_unlock(&sk->sk_callback_lock); read_unlock(&sk->sk_callback_lock);
} }
/* /**
* xs_write_space - callback invoked when socket buffer space becomes
* available
* @sk: socket whose state has changed
*
* Called when more output buffer space is available for this socket. * Called when more output buffer space is available for this socket.
* We try not to wake our writers until they can make "significant" * We try not to wake our writers until they can make "significant"
* progress, otherwise we'll waste resources thrashing sock_sendmsg * progress, otherwise we'll waste resources thrashing sock_sendmsg
* with a bunch of small requests. * with a bunch of small requests.
*/ */
static void static void xs_write_space(struct sock *sk)
xprt_write_space(struct sock *sk)
{ {
struct rpc_xprt *xprt; struct rpc_xprt *xprt;
struct socket *sock; struct socket *sock;
read_lock(&sk->sk_callback_lock); read_lock(&sk->sk_callback_lock);
if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket)) if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket))
...@@ -743,11 +748,15 @@ xprt_write_space(struct sock *sk) ...@@ -743,11 +748,15 @@ xprt_write_space(struct sock *sk)
read_unlock(&sk->sk_callback_lock); read_unlock(&sk->sk_callback_lock);
} }
/* /**
* Set socket buffer length * xs_set_buffer_size - set send and receive limits
* @xprt: generic transport
*
* Set socket send and receive limits based on the
* sndsize and rcvsize fields in the generic transport
* structure. This applies only to UDP sockets.
*/ */
static void static void xs_set_buffer_size(struct rpc_xprt *xprt)
xprt_sock_setbufsize(struct rpc_xprt *xprt)
{ {
struct sock *sk = xprt->inet; struct sock *sk = xprt->inet;
...@@ -764,15 +773,12 @@ xprt_sock_setbufsize(struct rpc_xprt *xprt) ...@@ -764,15 +773,12 @@ xprt_sock_setbufsize(struct rpc_xprt *xprt)
} }
} }
/* static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
* Bind to a reserved port
*/
static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
{ {
struct sockaddr_in myaddr = { struct sockaddr_in myaddr = {
.sin_family = AF_INET, .sin_family = AF_INET,
}; };
int err, port; int err, port;
/* Were we already bound to a given port? Try to reuse it */ /* Were we already bound to a given port? Try to reuse it */
port = xprt->port; port = xprt->port;
...@@ -782,20 +788,47 @@ static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock) ...@@ -782,20 +788,47 @@ static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
sizeof(myaddr)); sizeof(myaddr));
if (err == 0) { if (err == 0) {
xprt->port = port; xprt->port = port;
dprintk("RPC: xs_bindresvport bound to port %u\n",
port);
return 0; return 0;
} }
if (--port == 0) if (--port == 0)
port = XPRT_MAX_RESVPORT; port = XS_MAX_RESVPORT;
} while (err == -EADDRINUSE && port != xprt->port); } while (err == -EADDRINUSE && port != xprt->port);
printk("RPC: Can't bind to reserved port (%d).\n", -err); dprintk("RPC: can't bind to reserved port (%d).\n", -err);
return err; return err;
} }
static void static struct socket *xs_create(struct rpc_xprt *xprt, int proto, int resvport)
xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
{ {
struct sock *sk = sock->sk; struct socket *sock;
int type, err;
dprintk("RPC: xs_create(%s %d)\n",
(proto == IPPROTO_UDP)? "udp" : "tcp", proto);
type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) {
dprintk("RPC: can't create socket (%d).\n", -err);
return NULL;
}
/* If the caller has the capability, bind to a reserved port */
if (resvport && xs_bindresvport(xprt, sock) < 0)
goto failed;
return sock;
failed:
sock_release(sock);
return NULL;
}
static void xs_bind(struct rpc_xprt *xprt, struct socket *sock)
{
struct sock *sk = sock->sk;
if (xprt->inet) if (xprt->inet)
return; return;
...@@ -806,16 +839,16 @@ xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock) ...@@ -806,16 +839,16 @@ xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
xprt->old_state_change = sk->sk_state_change; xprt->old_state_change = sk->sk_state_change;
xprt->old_write_space = sk->sk_write_space; xprt->old_write_space = sk->sk_write_space;
if (xprt->prot == IPPROTO_UDP) { if (xprt->prot == IPPROTO_UDP) {
sk->sk_data_ready = udp_data_ready; sk->sk_data_ready = xs_udp_data_ready;
sk->sk_no_check = UDP_CSUM_NORCV; sk->sk_no_check = UDP_CSUM_NORCV;
xprt_set_connected(xprt); xprt_set_connected(xprt);
} else { } else {
tcp_sk(sk)->nonagle = 1; /* disable Nagle's algorithm */ tcp_sk(sk)->nonagle = 1; /* disable Nagle's algorithm */
sk->sk_data_ready = tcp_data_ready; sk->sk_data_ready = xs_tcp_data_ready;
sk->sk_state_change = tcp_state_change; sk->sk_state_change = xs_tcp_state_change;
xprt_clear_connected(xprt); xprt_clear_connected(xprt);
} }
sk->sk_write_space = xprt_write_space; sk->sk_write_space = xs_write_space;
/* Reset to new socket */ /* Reset to new socket */
xprt->sock = sock; xprt->sock = sock;
...@@ -825,39 +858,13 @@ xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock) ...@@ -825,39 +858,13 @@ xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
return; return;
} }
/* /**
* Datastream sockets are created here, but xprt_connect will create * xs_connect_worker - try to connect a socket to a remote endpoint
* and connect stream sockets. * @args: RPC transport to connect
*
* Invoked by a work queue tasklet.
*/ */
static struct socket * xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport) static void xs_connect_worker(void *args)
{
struct socket *sock;
int type, err;
dprintk("RPC: xprt_create_socket(%s %d)\n",
(proto == IPPROTO_UDP)? "udp" : "tcp", proto);
type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) {
printk("RPC: can't create socket (%d).\n", -err);
return NULL;
}
/* If the caller has the capability, bind to a reserved port */
if (resvport && xprt_bindresvport(xprt, sock) < 0) {
printk("RPC: can't bind to reserved port.\n");
goto failed;
}
return sock;
failed:
sock_release(sock);
return NULL;
}
static void xprt_socket_connect(void *args)
{ {
struct rpc_xprt *xprt = (struct rpc_xprt *)args; struct rpc_xprt *xprt = (struct rpc_xprt *)args;
struct socket *sock = xprt->sock; struct socket *sock = xprt->sock;
...@@ -866,18 +873,20 @@ static void xprt_socket_connect(void *args) ...@@ -866,18 +873,20 @@ static void xprt_socket_connect(void *args)
if (xprt->shutdown || xprt->addr.sin_port == 0) if (xprt->shutdown || xprt->addr.sin_port == 0)
goto out; goto out;
dprintk("RPC: xs_connect_worker xprt %p\n", xprt);
/* /*
* Start by resetting any existing state * Start by resetting any existing state
*/ */
xprt_close(xprt); xs_close(xprt);
sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport); sock = xs_create(xprt, xprt->prot, xprt->resvport);
if (sock == NULL) { if (sock == NULL) {
/* couldn't create socket or bind to reserved port; /* couldn't create socket or bind to reserved port;
* this is likely a permanent error, so cause an abort */ * this is likely a permanent error, so cause an abort */
goto out; goto out;
} }
xprt_bind_socket(xprt, sock); xs_bind(xprt, sock);
xprt_sock_setbufsize(xprt); xs_set_buffer_size(xprt);
status = 0; status = 0;
if (!xprt->stream) if (!xprt->stream)
...@@ -908,20 +917,23 @@ static void xprt_socket_connect(void *args) ...@@ -908,20 +917,23 @@ static void xprt_socket_connect(void *args)
smp_mb__after_clear_bit(); smp_mb__after_clear_bit();
} }
static void /**
xprt_connect_sock(struct rpc_task *task) * xs_connect - connect a socket to a remote endpoint
* @task: address of RPC task that manages state of connect request
*
* TCP: If the remote end dropped the connection, delay reconnecting.
*/
static void xs_connect(struct rpc_task *task)
{ {
struct rpc_xprt *xprt = task->tk_xprt; struct rpc_xprt *xprt = task->tk_xprt;
if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) { if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) {
/* Note: if we are here due to a dropped connection if (xprt->sock != NULL) {
* we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ dprintk("RPC: xs_connect delayed xprt %p\n", xprt);
* seconds
*/
if (xprt->sock != NULL)
schedule_delayed_work(&xprt->sock_connect, schedule_delayed_work(&xprt->sock_connect,
RPC_REESTABLISH_TIMEOUT); RPC_REESTABLISH_TIMEOUT);
else { } else {
dprintk("RPC: xs_connect scheduled xprt %p\n", xprt);
schedule_work(&xprt->sock_connect); schedule_work(&xprt->sock_connect);
/* flush_scheduled_work can sleep... */ /* flush_scheduled_work can sleep... */
if (!RPC_IS_ASYNC(task)) if (!RPC_IS_ASYNC(task))
...@@ -930,29 +942,23 @@ xprt_connect_sock(struct rpc_task *task) ...@@ -930,29 +942,23 @@ xprt_connect_sock(struct rpc_task *task)
} }
} }
/* static struct rpc_xprt_ops xs_ops = {
* Set default timeout parameters .set_buffer_size = xs_set_buffer_size,
*/ .connect = xs_connect,
static void .send_request = xs_send_request,
xprt_default_timeout(struct rpc_timeout *to, int proto) .close = xs_close,
{ .destroy = xs_destroy,
if (proto == IPPROTO_UDP)
xprt_set_timeout(to, 5, 5 * HZ);
else
xprt_set_timeout(to, 2, 60 * HZ);
}
static struct rpc_xprt_ops xprt_socket_ops = {
.set_buffer_size = xprt_sock_setbufsize,
.connect = xprt_connect_sock,
.send_request = xprt_send_request,
.close = xprt_close,
.destroy = xprt_socket_destroy,
}; };
extern unsigned int xprt_udp_slot_table_entries; extern unsigned int xprt_udp_slot_table_entries;
extern unsigned int xprt_tcp_slot_table_entries; extern unsigned int xprt_tcp_slot_table_entries;
/**
* xs_setup_udp - Set up transport to use a UDP socket
* @xprt: transport to set up
* @to: timeout parameters
*
*/
int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to) int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
{ {
size_t slot_table_size; size_t slot_table_size;
...@@ -967,7 +973,7 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to) ...@@ -967,7 +973,7 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
memset(xprt->slot, 0, slot_table_size); memset(xprt->slot, 0, slot_table_size);
xprt->prot = IPPROTO_UDP; xprt->prot = IPPROTO_UDP;
xprt->port = XPRT_MAX_RESVPORT; xprt->port = XS_MAX_RESVPORT;
xprt->stream = 0; xprt->stream = 0;
xprt->nocong = 0; xprt->nocong = 0;
xprt->cwnd = RPC_INITCWND; xprt->cwnd = RPC_INITCWND;
...@@ -975,18 +981,24 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to) ...@@ -975,18 +981,24 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
/* XXX: header size can vary due to auth type, IPv6, etc. */ /* XXX: header size can vary due to auth type, IPv6, etc. */
xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt); INIT_WORK(&xprt->sock_connect, xs_connect_worker, xprt);
xprt->ops = &xprt_socket_ops; xprt->ops = &xs_ops;
if (to) if (to)
xprt->timeout = *to; xprt->timeout = *to;
else else
xprt_default_timeout(to, xprt->prot); xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);
return 0; return 0;
} }
/**
* xs_setup_tcp - Set up transport to use a TCP socket
* @xprt: transport to set up
* @to: timeout parameters
*
*/
int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to) int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
{ {
size_t slot_table_size; size_t slot_table_size;
...@@ -1001,21 +1013,21 @@ int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to) ...@@ -1001,21 +1013,21 @@ int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
memset(xprt->slot, 0, slot_table_size); memset(xprt->slot, 0, slot_table_size);
xprt->prot = IPPROTO_TCP; xprt->prot = IPPROTO_TCP;
xprt->port = XPRT_MAX_RESVPORT; xprt->port = XS_MAX_RESVPORT;
xprt->stream = 1; xprt->stream = 1;
xprt->nocong = 1; xprt->nocong = 1;
xprt->cwnd = RPC_MAXCWND(xprt); xprt->cwnd = RPC_MAXCWND(xprt);
xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0; xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
xprt->max_payload = (1U << 31) - 1; xprt->max_payload = (1U << 31) - 1;
INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt); INIT_WORK(&xprt->sock_connect, xs_connect_worker, xprt);
xprt->ops = &xprt_socket_ops; xprt->ops = &xs_ops;
if (to) if (to)
xprt->timeout = *to; xprt->timeout = *to;
else else
xprt_default_timeout(to, xprt->prot); xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);
return 0; return 0;
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment