Commit 792438e5 authored by David S. Miller's avatar David S. Miller

Merge tag 'rxrpc-rewrite-20160830-1' of...

Merge tag 'rxrpc-rewrite-20160830-1' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs

David Howells says:

====================
rxrpc: Preparation for removal of use of skbs from AFS

Here's a set of patches that prepare the way for the removal of the use of
sk_buffs from fs/afs (they'll be entirely retained within net/rxrpc):

 (1) Fix a potential NULL-pointer deref in rxrpc_abort_calls().

 (2) Condense all the terminal call state machine states to a single one
     plus supplementary info.

 (3) Add a trace point for rxrpc call usage debugging.

 (4) Cleanups and missing headers.

 (5) Provide a way for AFS to ask about a call's peer address without
     having an sk_buff to query.

 (6) Use call->peer directly rather than going via call->conn (which might
     be NULL).

 (7) Pass struct socket * to various rxrpc kernel interface functions so
     they can use that directly rather than getting it from the rxrpc_call
     struct.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents cf4d13fe 4de48af6
......@@ -725,7 +725,8 @@ The kernel interface functions are as follows:
(*) End a client call.
void rxrpc_kernel_end_call(struct rxrpc_call *call);
void rxrpc_kernel_end_call(struct socket *sock,
struct rxrpc_call *call);
This is used to end a previously begun call. The user_call_ID is expunged
from AF_RXRPC's knowledge and will not be seen again in association with
......@@ -733,7 +734,9 @@ The kernel interface functions are as follows:
(*) Send data through a call.
int rxrpc_kernel_send_data(struct rxrpc_call *call, struct msghdr *msg,
int rxrpc_kernel_send_data(struct socket *sock,
struct rxrpc_call *call,
struct msghdr *msg,
size_t len);
This is used to supply either the request part of a client call or the
......@@ -747,7 +750,9 @@ The kernel interface functions are as follows:
(*) Abort a call.
void rxrpc_kernel_abort_call(struct rxrpc_call *call, u32 abort_code);
void rxrpc_kernel_abort_call(struct socket *sock,
struct rxrpc_call *call,
u32 abort_code);
This is used to abort a call if it's still in an abortable state. The
abort code specified will be placed in the ABORT message sent.
......@@ -868,6 +873,13 @@ The kernel interface functions are as follows:
This is used to allocate a null RxRPC key that can be used to indicate
anonymous security for a particular domain.
(*) Get the peer address of a call.
void rxrpc_kernel_get_peer(struct socket *sock, struct rxrpc_call *call,
struct sockaddr_rxrpc *_srx);
This is used to find the remote peer address of a call.
=======================
CONFIGURABLE PARAMETERS
......
......@@ -17,10 +17,6 @@
#include "internal.h"
#include "afs_cm.h"
#if 0
struct workqueue_struct *afs_cm_workqueue;
#endif /* 0 */
static int afs_deliver_cb_init_call_back_state(struct afs_call *,
struct sk_buff *, bool);
static int afs_deliver_cb_init_call_back_state3(struct afs_call *,
......@@ -171,9 +167,9 @@ static void SRXAFSCB_CallBack(struct work_struct *work)
static int afs_deliver_cb_callback(struct afs_call *call, struct sk_buff *skb,
bool last)
{
struct sockaddr_rxrpc srx;
struct afs_callback *cb;
struct afs_server *server;
struct in_addr addr;
__be32 *bp;
u32 tmp;
int ret, loop;
......@@ -182,6 +178,7 @@ static int afs_deliver_cb_callback(struct afs_call *call, struct sk_buff *skb,
switch (call->unmarshall) {
case 0:
rxrpc_kernel_get_peer(afs_socket, call->rxcall, &srx);
call->offset = 0;
call->unmarshall++;
......@@ -282,13 +279,11 @@ static int afs_deliver_cb_callback(struct afs_call *call, struct sk_buff *skb,
break;
}
call->state = AFS_CALL_REPLYING;
/* we'll need the file server record as that tells us which set of
* vnodes to operate upon */
memcpy(&addr, &ip_hdr(skb)->saddr, 4);
server = afs_find_server(&addr);
server = afs_find_server(&srx);
if (!server)
return -ENOTCONN;
call->server = server;
......@@ -319,12 +314,14 @@ static int afs_deliver_cb_init_call_back_state(struct afs_call *call,
struct sk_buff *skb,
bool last)
{
struct sockaddr_rxrpc srx;
struct afs_server *server;
struct in_addr addr;
int ret;
_enter(",{%u},%d", skb->len, last);
rxrpc_kernel_get_peer(afs_socket, call->rxcall, &srx);
ret = afs_data_complete(call, skb, last);
if (ret < 0)
return ret;
......@@ -334,8 +331,7 @@ static int afs_deliver_cb_init_call_back_state(struct afs_call *call,
/* we'll need the file server record as that tells us which set of
* vnodes to operate upon */
memcpy(&addr, &ip_hdr(skb)->saddr, 4);
server = afs_find_server(&addr);
server = afs_find_server(&srx);
if (!server)
return -ENOTCONN;
call->server = server;
......@@ -352,11 +348,13 @@ static int afs_deliver_cb_init_call_back_state3(struct afs_call *call,
struct sk_buff *skb,
bool last)
{
struct sockaddr_rxrpc srx;
struct afs_server *server;
struct in_addr addr;
_enter(",{%u},%d", skb->len, last);
rxrpc_kernel_get_peer(afs_socket, call->rxcall, &srx);
/* There are some arguments that we ignore */
afs_data_consumed(call, skb);
if (!last)
......@@ -367,8 +365,7 @@ static int afs_deliver_cb_init_call_back_state3(struct afs_call *call,
/* we'll need the file server record as that tells us which set of
* vnodes to operate upon */
memcpy(&addr, &ip_hdr(skb)->saddr, 4);
server = afs_find_server(&addr);
server = afs_find_server(&srx);
if (!server)
return -ENOTCONN;
call->server = server;
......@@ -426,7 +423,6 @@ static void SRXAFSCB_ProbeUuid(struct work_struct *work)
_enter("");
if (memcmp(r, &afs_uuid, sizeof(afs_uuid)) == 0)
reply.match = htonl(0);
else
......
......@@ -20,6 +20,7 @@
#include <linux/sched.h>
#include <linux/fscache.h>
#include <linux/backing-dev.h>
#include <net/af_rxrpc.h>
#include "afs.h"
#include "afs_vl.h"
......@@ -607,6 +608,8 @@ extern void afs_proc_cell_remove(struct afs_cell *);
/*
* rxrpc.c
*/
extern struct socket *afs_socket;
extern int afs_open_socket(void);
extern void afs_close_socket(void);
extern void afs_data_consumed(struct afs_call *, struct sk_buff *);
......@@ -654,7 +657,7 @@ do { \
extern struct afs_server *afs_lookup_server(struct afs_cell *,
const struct in_addr *);
extern struct afs_server *afs_find_server(const struct in_addr *);
extern struct afs_server *afs_find_server(const struct sockaddr_rxrpc *);
extern void afs_put_server(struct afs_server *);
extern void __exit afs_purge_servers(void);
......
......@@ -14,6 +14,7 @@
#include <linux/init.h>
#include <linux/completion.h>
#include <linux/sched.h>
#include <linux/random.h>
#include "internal.h"
MODULE_DESCRIPTION("AFS Client File System");
......
......@@ -16,7 +16,7 @@
#include "internal.h"
#include "afs_cm.h"
static struct socket *afs_socket; /* my RxRPC socket */
struct socket *afs_socket; /* my RxRPC socket */
static struct workqueue_struct *afs_async_calls;
static atomic_t afs_outstanding_calls;
static atomic_t afs_outstanding_skbs;
......@@ -207,7 +207,7 @@ static void afs_free_call(struct afs_call *call)
static void afs_end_call_nofree(struct afs_call *call)
{
if (call->rxcall) {
rxrpc_kernel_end_call(call->rxcall);
rxrpc_kernel_end_call(afs_socket, call->rxcall);
call->rxcall = NULL;
}
if (call->type->destructor)
......@@ -325,8 +325,8 @@ static int afs_send_pages(struct afs_call *call, struct msghdr *msg,
* returns from sending the request */
if (first + loop >= last)
call->state = AFS_CALL_AWAIT_REPLY;
ret = rxrpc_kernel_send_data(call->rxcall, msg,
to - offset);
ret = rxrpc_kernel_send_data(afs_socket, call->rxcall,
msg, to - offset);
kunmap(pages[loop]);
if (ret < 0)
break;
......@@ -406,7 +406,8 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
* request */
if (!call->send_pages)
call->state = AFS_CALL_AWAIT_REPLY;
ret = rxrpc_kernel_send_data(rxcall, &msg, call->request_size);
ret = rxrpc_kernel_send_data(afs_socket, rxcall,
&msg, call->request_size);
if (ret < 0)
goto error_do_abort;
......@@ -421,7 +422,7 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
return wait_mode->wait(call);
error_do_abort:
rxrpc_kernel_abort_call(rxcall, RX_USER_ABORT);
rxrpc_kernel_abort_call(afs_socket, rxcall, RX_USER_ABORT);
while ((skb = skb_dequeue(&call->rx_queue)))
afs_free_skb(skb);
error_kill_call:
......@@ -509,7 +510,8 @@ static void afs_deliver_to_call(struct afs_call *call)
if (call->state != AFS_CALL_AWAIT_REPLY)
abort_code = RXGEN_SS_UNMARSHAL;
do_abort:
rxrpc_kernel_abort_call(call->rxcall,
rxrpc_kernel_abort_call(afs_socket,
call->rxcall,
abort_code);
call->error = ret;
call->state = AFS_CALL_ERROR;
......@@ -605,7 +607,7 @@ static int afs_wait_for_call_to_complete(struct afs_call *call)
/* kill the call */
if (call->state < AFS_CALL_COMPLETE) {
_debug("call incomplete");
rxrpc_kernel_abort_call(call->rxcall, RX_CALL_DEAD);
rxrpc_kernel_abort_call(afs_socket, call->rxcall, RX_CALL_DEAD);
while ((skb = skb_dequeue(&call->rx_queue)))
afs_free_skb(skb);
}
......@@ -823,14 +825,15 @@ void afs_send_empty_reply(struct afs_call *call)
msg.msg_flags = 0;
call->state = AFS_CALL_AWAIT_ACK;
switch (rxrpc_kernel_send_data(call->rxcall, &msg, 0)) {
switch (rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, 0)) {
case 0:
_leave(" [replied]");
return;
case -ENOMEM:
_debug("oom");
rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT);
rxrpc_kernel_abort_call(afs_socket, call->rxcall,
RX_USER_ABORT);
default:
afs_end_call(call);
_leave(" [error]");
......@@ -859,7 +862,7 @@ void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
msg.msg_flags = 0;
call->state = AFS_CALL_AWAIT_ACK;
n = rxrpc_kernel_send_data(call->rxcall, &msg, len);
n = rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, len);
if (n >= 0) {
/* Success */
_leave(" [replied]");
......@@ -868,7 +871,8 @@ void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
if (n == -ENOMEM) {
_debug("oom");
rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT);
rxrpc_kernel_abort_call(afs_socket, call->rxcall,
RX_USER_ABORT);
}
afs_end_call(call);
_leave(" [error]");
......
......@@ -178,13 +178,18 @@ struct afs_server *afs_lookup_server(struct afs_cell *cell,
/*
* look up a server by its IP address
*/
struct afs_server *afs_find_server(const struct in_addr *_addr)
struct afs_server *afs_find_server(const struct sockaddr_rxrpc *srx)
{
struct afs_server *server = NULL;
struct rb_node *p;
struct in_addr addr = *_addr;
struct in_addr addr = srx->transport.sin.sin_addr;
_enter("%pI4", &addr.s_addr);
_enter("{%d,%pI4}", srx->transport.family, &addr.s_addr);
if (srx->transport.family != AF_INET) {
WARN(true, "AFS does not yes support non-IPv4 addresses\n");
return NULL;
}
read_lock(&afs_servers_lock);
......
......@@ -15,6 +15,9 @@
#include <linux/skbuff.h>
#include <linux/rxrpc.h>
struct key;
struct sock;
struct socket;
struct rxrpc_call;
/*
......@@ -39,15 +42,18 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *,
struct key *,
unsigned long,
gfp_t);
int rxrpc_kernel_send_data(struct rxrpc_call *, struct msghdr *, size_t);
int rxrpc_kernel_send_data(struct socket *, struct rxrpc_call *,
struct msghdr *, size_t);
void rxrpc_kernel_data_consumed(struct rxrpc_call *, struct sk_buff *);
void rxrpc_kernel_abort_call(struct rxrpc_call *, u32);
void rxrpc_kernel_end_call(struct rxrpc_call *);
void rxrpc_kernel_abort_call(struct socket *, struct rxrpc_call *, u32);
void rxrpc_kernel_end_call(struct socket *, struct rxrpc_call *);
bool rxrpc_kernel_is_data_last(struct sk_buff *);
u32 rxrpc_kernel_get_abort_code(struct sk_buff *);
int rxrpc_kernel_get_error_number(struct sk_buff *);
void rxrpc_kernel_free_skb(struct sk_buff *);
struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *, unsigned long);
int rxrpc_kernel_reject_call(struct socket *);
void rxrpc_kernel_get_peer(struct socket *, struct rxrpc_call *,
struct sockaddr_rxrpc *);
#endif /* _NET_RXRPC_H */
......@@ -16,6 +16,45 @@
#include <linux/tracepoint.h>
TRACE_EVENT(rxrpc_call,
TP_PROTO(struct rxrpc_call *call, int op, int usage, int nskb,
const void *where, const void *aux),
TP_ARGS(call, op, usage, nskb, where, aux),
TP_STRUCT__entry(
__field(struct rxrpc_call *, call )
__field(int, op )
__field(int, usage )
__field(int, nskb )
__field(const void *, where )
__field(const void *, aux )
),
TP_fast_assign(
__entry->call = call;
__entry->op = op;
__entry->usage = usage;
__entry->nskb = nskb;
__entry->where = where;
__entry->aux = aux;
),
TP_printk("c=%p %s u=%d s=%d p=%pSR a=%p",
__entry->call,
(__entry->op == 0 ? "NWc" :
__entry->op == 1 ? "NWs" :
__entry->op == 2 ? "SEE" :
__entry->op == 3 ? "GET" :
__entry->op == 4 ? "Gsb" :
__entry->op == 5 ? "PUT" :
"Psb"),
__entry->usage,
__entry->nskb,
__entry->where,
__entry->aux)
);
TRACE_EVENT(rxrpc_skb,
TP_PROTO(struct sk_buff *skb, int op, int usage, int mod_count,
const void *where),
......
......@@ -279,15 +279,16 @@ EXPORT_SYMBOL(rxrpc_kernel_begin_call);
/**
* rxrpc_kernel_end_call - Allow a kernel service to end a call it was using
* @sock: The socket the call is on
* @call: The call to end
*
* Allow a kernel service to end a call it was using. The call must be
* complete before this is called (the call should be aborted if necessary).
*/
void rxrpc_kernel_end_call(struct rxrpc_call *call)
void rxrpc_kernel_end_call(struct socket *sock, struct rxrpc_call *call)
{
_enter("%d{%d}", call->debug_id, atomic_read(&call->usage));
rxrpc_remove_user_ID(call->socket, call);
rxrpc_remove_user_ID(rxrpc_sk(sock->sk), call);
rxrpc_put_call(call);
}
EXPORT_SYMBOL(rxrpc_kernel_end_call);
......
......@@ -289,8 +289,6 @@ enum rxrpc_conn_proto_state {
RXRPC_CONN_SERVICE, /* Service secured connection */
RXRPC_CONN_REMOTELY_ABORTED, /* Conn aborted by peer */
RXRPC_CONN_LOCALLY_ABORTED, /* Conn aborted locally */
RXRPC_CONN_NETWORK_ERROR, /* Conn terminated by network error */
RXRPC_CONN_LOCAL_ERROR, /* Conn terminated by local error */
RXRPC_CONN__NR_STATES
};
......@@ -344,7 +342,6 @@ struct rxrpc_connection {
enum rxrpc_conn_proto_state state : 8; /* current state of connection */
u32 local_abort; /* local abort code */
u32 remote_abort; /* remote abort code */
int error; /* local error incurred */
int debug_id; /* debug ID for printks */
atomic_t serial; /* packet serial number counter */
unsigned int hi_serial; /* highest serial number received */
......@@ -411,13 +408,22 @@ enum rxrpc_call_state {
RXRPC_CALL_SERVER_ACK_REQUEST, /* - server pending ACK of request */
RXRPC_CALL_SERVER_SEND_REPLY, /* - server sending reply */
RXRPC_CALL_SERVER_AWAIT_ACK, /* - server awaiting final ACK */
RXRPC_CALL_COMPLETE, /* - call completed */
RXRPC_CALL_COMPLETE, /* - call complete */
RXRPC_CALL_DEAD, /* - call is dead */
NR__RXRPC_CALL_STATES
};
/*
* Call completion condition (state == RXRPC_CALL_COMPLETE).
*/
enum rxrpc_call_completion {
RXRPC_CALL_SUCCEEDED, /* - Normal termination */
RXRPC_CALL_SERVER_BUSY, /* - call rejected by busy server */
RXRPC_CALL_REMOTELY_ABORTED, /* - call aborted by peer */
RXRPC_CALL_LOCALLY_ABORTED, /* - call aborted locally on error or close */
RXRPC_CALL_LOCAL_ERROR, /* - call failed due to local error */
RXRPC_CALL_NETWORK_ERROR, /* - call terminated by network error */
RXRPC_CALL_DEAD, /* - call is dead */
NR__RXRPC_CALL_STATES
NR__RXRPC_CALL_COMPLETIONS
};
/*
......@@ -451,14 +457,13 @@ struct rxrpc_call {
unsigned long events;
spinlock_t lock;
rwlock_t state_lock; /* lock for state transition */
u32 abort_code; /* Local/remote abort code */
int error; /* Local error incurred */
enum rxrpc_call_state state : 8; /* current state of call */
enum rxrpc_call_completion completion : 8; /* Call completion condition */
atomic_t usage;
atomic_t skb_count; /* Outstanding packets on this call */
atomic_t sequence; /* Tx data packet sequence counter */
u32 local_abort; /* local abort code */
u32 remote_abort; /* remote abort code */
int error_report; /* Network error (ICMP/local transport) */
int error; /* Local error incurred */
enum rxrpc_call_state state : 8; /* current state of call */
u16 service_id; /* service ID */
u32 call_id; /* call ID on connection */
u32 cid; /* connection ID plus channel index */
......@@ -493,20 +498,6 @@ struct rxrpc_call {
unsigned long ackr_window[RXRPC_ACKR_WINDOW_ASZ + 1];
};
/*
* locally abort an RxRPC call
*/
static inline void rxrpc_abort_call(struct rxrpc_call *call, u32 abort_code)
{
write_lock_bh(&call->state_lock);
if (call->state < RXRPC_CALL_COMPLETE) {
call->local_abort = abort_code;
call->state = RXRPC_CALL_LOCALLY_ABORTED;
set_bit(RXRPC_CALL_EV_ABORT, &call->events);
}
write_unlock_bh(&call->state_lock);
}
#include <trace/events/rxrpc.h>
/*
......@@ -534,6 +525,8 @@ void rxrpc_process_call(struct work_struct *);
/*
* call_object.c
*/
extern const char *const rxrpc_call_states[];
extern const char *const rxrpc_call_completions[];
extern unsigned int rxrpc_max_call_lifetime;
extern unsigned int rxrpc_dead_call_expiry;
extern struct kmem_cache *rxrpc_call_jar;
......@@ -550,7 +543,11 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *,
struct sk_buff *);
void rxrpc_release_call(struct rxrpc_call *);
void rxrpc_release_calls_on_socket(struct rxrpc_sock *);
void __rxrpc_put_call(struct rxrpc_call *);
void rxrpc_see_call(struct rxrpc_call *);
void rxrpc_get_call(struct rxrpc_call *);
void rxrpc_put_call(struct rxrpc_call *);
void rxrpc_get_call_for_skb(struct rxrpc_call *, struct sk_buff *);
void rxrpc_put_call_for_skb(struct rxrpc_call *, struct sk_buff *);
void __exit rxrpc_destroy_all_calls(void);
static inline bool rxrpc_is_service_call(const struct rxrpc_call *call)
......@@ -563,6 +560,78 @@ static inline bool rxrpc_is_client_call(const struct rxrpc_call *call)
return !rxrpc_is_service_call(call);
}
/*
* Transition a call to the complete state.
*/
static inline bool __rxrpc_set_call_completion(struct rxrpc_call *call,
enum rxrpc_call_completion compl,
u32 abort_code,
int error)
{
if (call->state < RXRPC_CALL_COMPLETE) {
call->abort_code = abort_code;
call->error = error;
call->completion = compl,
call->state = RXRPC_CALL_COMPLETE;
return true;
}
return false;
}
static inline bool rxrpc_set_call_completion(struct rxrpc_call *call,
enum rxrpc_call_completion compl,
u32 abort_code,
int error)
{
int ret;
write_lock_bh(&call->state_lock);
ret = __rxrpc_set_call_completion(call, compl, abort_code, error);
write_unlock_bh(&call->state_lock);
return ret;
}
/*
* Record that a call successfully completed.
*/
static inline void __rxrpc_call_completed(struct rxrpc_call *call)
{
__rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0);
}
static inline void rxrpc_call_completed(struct rxrpc_call *call)
{
write_lock_bh(&call->state_lock);
__rxrpc_call_completed(call);
write_unlock_bh(&call->state_lock);
}
/*
* Record that a call is locally aborted.
*/
static inline bool __rxrpc_abort_call(struct rxrpc_call *call,
u32 abort_code, int error)
{
if (__rxrpc_set_call_completion(call,
RXRPC_CALL_LOCALLY_ABORTED,
abort_code, error)) {
set_bit(RXRPC_CALL_EV_ABORT, &call->events);
return true;
}
return false;
}
static inline bool rxrpc_abort_call(struct rxrpc_call *call,
u32 abort_code, int error)
{
bool ret;
write_lock_bh(&call->state_lock);
ret = __rxrpc_abort_call(call, abort_code, error);
write_unlock_bh(&call->state_lock);
return ret;
}
/*
* conn_client.c
*/
......@@ -778,7 +847,6 @@ static inline void rxrpc_put_peer(struct rxrpc_peer *peer)
/*
* proc.c
*/
extern const char *const rxrpc_call_states[];
extern const struct file_operations rxrpc_call_seq_fops;
extern const struct file_operations rxrpc_connection_seq_fops;
......@@ -958,16 +1026,3 @@ do { \
} while (0)
#endif /* __KDEBUGALL */
#define rxrpc_get_call(CALL) \
do { \
CHECK_SLAB_OKAY(&(CALL)->usage); \
if (atomic_inc_return(&(CALL)->usage) == 1) \
BUG(); \
} while (0)
#define rxrpc_put_call(CALL) \
do { \
__rxrpc_put_call(CALL); \
} while (0)
......@@ -129,8 +129,7 @@ static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
_debug("conn ready");
call->state = RXRPC_CALL_SERVER_ACCEPTING;
list_add_tail(&call->accept_link, &rx->acceptq);
rxrpc_get_call(call);
atomic_inc(&call->skb_count);
rxrpc_get_call_for_skb(call, notification);
nsp = rxrpc_skb(notification);
nsp->call = call;
......@@ -323,18 +322,15 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link);
list_del_init(&call->accept_link);
sk_acceptq_removed(&rx->sk);
rxrpc_see_call(call);
write_lock_bh(&call->state_lock);
switch (call->state) {
case RXRPC_CALL_SERVER_ACCEPTING:
call->state = RXRPC_CALL_SERVER_RECV_REQUEST;
break;
case RXRPC_CALL_REMOTELY_ABORTED:
case RXRPC_CALL_LOCALLY_ABORTED:
ret = -ECONNABORTED;
goto out_release;
case RXRPC_CALL_NETWORK_ERROR:
ret = call->conn->error;
case RXRPC_CALL_COMPLETE:
ret = call->error;
goto out_release;
case RXRPC_CALL_DEAD:
ret = -ETIME;
......@@ -399,21 +395,19 @@ int rxrpc_reject_call(struct rxrpc_sock *rx)
call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link);
list_del_init(&call->accept_link);
sk_acceptq_removed(&rx->sk);
rxrpc_see_call(call);
write_lock_bh(&call->state_lock);
switch (call->state) {
case RXRPC_CALL_SERVER_ACCEPTING:
call->state = RXRPC_CALL_SERVER_BUSY;
__rxrpc_set_call_completion(call, RXRPC_CALL_SERVER_BUSY,
0, ECONNABORTED);
if (test_and_set_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events))
rxrpc_queue_call(call);
ret = 0;
goto out_release;
case RXRPC_CALL_REMOTELY_ABORTED:
case RXRPC_CALL_LOCALLY_ABORTED:
ret = -ECONNABORTED;
goto out_release;
case RXRPC_CALL_NETWORK_ERROR:
ret = call->conn->error;
case RXRPC_CALL_COMPLETE:
ret = call->error;
goto out_release;
case RXRPC_CALL_DEAD:
ret = -ETIME;
......
......@@ -95,7 +95,7 @@ void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
_debug("cancel timer %%%u", serial);
try_to_del_timer_sync(&call->ack_timer);
read_lock_bh(&call->state_lock);
if (call->state <= RXRPC_CALL_COMPLETE &&
if (call->state < RXRPC_CALL_COMPLETE &&
!test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
rxrpc_queue_call(call);
read_unlock_bh(&call->state_lock);
......@@ -123,7 +123,7 @@ static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend,
unsigned long resend_at)
{
read_lock_bh(&call->state_lock);
if (call->state >= RXRPC_CALL_COMPLETE)
if (call->state == RXRPC_CALL_COMPLETE)
resend = 0;
if (resend & 1) {
......@@ -230,7 +230,7 @@ static void rxrpc_resend_timer(struct rxrpc_call *call)
_enter("%d,%d,%d",
call->acks_tail, call->acks_unacked, call->acks_head);
if (call->state >= RXRPC_CALL_COMPLETE)
if (call->state == RXRPC_CALL_COMPLETE)
return;
resend = 0;
......@@ -465,8 +465,7 @@ static void rxrpc_insert_oos_packet(struct rxrpc_call *call,
skb->destructor = rxrpc_packet_destructor;
ASSERTCMP(sp->call, ==, NULL);
sp->call = call;
rxrpc_get_call(call);
atomic_inc(&call->skb_count);
rxrpc_get_call_for_skb(call, skb);
/* insert into the buffer in sequence order */
spin_lock_bh(&call->lock);
......@@ -552,7 +551,7 @@ static void rxrpc_extract_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU));
peer = call->conn->params.peer;
peer = call->peer;
if (mtu < peer->maxdata) {
spin_lock_bh(&peer->lock);
peer->maxdata = mtu;
......@@ -711,7 +710,7 @@ static int rxrpc_process_rx_queue(struct rxrpc_call *call,
break;
case RXRPC_CALL_SERVER_AWAIT_ACK:
_debug("srv complete");
call->state = RXRPC_CALL_COMPLETE;
__rxrpc_call_completed(call);
post_ACK = true;
break;
case RXRPC_CALL_CLIENT_SEND_REQUEST:
......@@ -741,8 +740,7 @@ static int rxrpc_process_rx_queue(struct rxrpc_call *call,
_debug("post ACK");
skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
sp->call = call;
rxrpc_get_call(call);
atomic_inc(&call->skb_count);
rxrpc_get_call_for_skb(call, skb);
spin_lock_bh(&call->lock);
if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
BUG();
......@@ -801,8 +799,7 @@ static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
memset(sp, 0, sizeof(*sp));
sp->error = error;
sp->call = call;
rxrpc_get_call(call);
atomic_inc(&call->skb_count);
rxrpc_get_call_for_skb(call, skb);
spin_lock_bh(&call->lock);
ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
......@@ -834,6 +831,8 @@ void rxrpc_process_call(struct work_struct *work)
u32 serial, abort_code = RX_PROTOCOL_ERROR;
u8 *acks = NULL;
rxrpc_see_call(call);
//printk("\n--------------------\n");
_enter("{%d,%s,%lx} [%lu]",
call->debug_id, rxrpc_call_states[call->state], call->events,
......@@ -844,8 +843,8 @@ void rxrpc_process_call(struct work_struct *work)
/* there's a good chance we're going to have to send a message, so set
* one up in advance */
msg.msg_name = &call->conn->params.peer->srx.transport;
msg.msg_namelen = call->conn->params.peer->srx.transport_len;
msg.msg_name = &call->peer->srx.transport;
msg.msg_namelen = call->peer->srx.transport_len;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
......@@ -875,24 +874,22 @@ void rxrpc_process_call(struct work_struct *work)
clear_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events);
clear_bit(RXRPC_CALL_EV_ABORT, &call->events);
error = call->error_report;
if (error < RXRPC_LOCAL_ERROR_OFFSET) {
if (call->completion == RXRPC_CALL_NETWORK_ERROR) {
mark = RXRPC_SKB_MARK_NET_ERROR;
_debug("post net error %d", error);
} else {
mark = RXRPC_SKB_MARK_LOCAL_ERROR;
error -= RXRPC_LOCAL_ERROR_OFFSET;
_debug("post net local error %d", error);
}
if (rxrpc_post_message(call, mark, error, true) < 0)
if (rxrpc_post_message(call, mark, call->error, true) < 0)
goto no_mem;
clear_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events);
goto kill_ACKs;
}
if (test_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events)) {
ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
clear_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events);
clear_bit(RXRPC_CALL_EV_ABORT, &call->events);
......@@ -900,7 +897,7 @@ void rxrpc_process_call(struct work_struct *work)
_debug("post conn abort");
if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
call->conn->error, true) < 0)
call->error, true) < 0)
goto no_mem;
clear_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events);
goto kill_ACKs;
......@@ -913,13 +910,13 @@ void rxrpc_process_call(struct work_struct *work)
}
if (test_bit(RXRPC_CALL_EV_ABORT, &call->events)) {
ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
ECONNABORTED, true) < 0)
call->error, true) < 0)
goto no_mem;
whdr.type = RXRPC_PACKET_TYPE_ABORT;
data = htonl(call->local_abort);
data = htonl(call->abort_code);
iov[1].iov_base = &data;
iov[1].iov_len = sizeof(data);
genbit = RXRPC_CALL_EV_ABORT;
......@@ -979,13 +976,7 @@ void rxrpc_process_call(struct work_struct *work)
}
if (test_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events)) {
write_lock_bh(&call->state_lock);
if (call->state <= RXRPC_CALL_COMPLETE) {
call->state = RXRPC_CALL_LOCALLY_ABORTED;
call->local_abort = RX_CALL_TIMEOUT;
set_bit(RXRPC_CALL_EV_ABORT, &call->events);
}
write_unlock_bh(&call->state_lock);
rxrpc_abort_call(call, RX_CALL_TIMEOUT, ETIME);
_debug("post timeout");
if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
......@@ -998,7 +989,8 @@ void rxrpc_process_call(struct work_struct *work)
/* deal with assorted inbound messages */
if (!skb_queue_empty(&call->rx_queue)) {
switch (rxrpc_process_rx_queue(call, &abort_code)) {
ret = rxrpc_process_rx_queue(call, &abort_code);
switch (ret) {
case 0:
case -EAGAIN:
break;
......@@ -1007,7 +999,7 @@ void rxrpc_process_call(struct work_struct *work)
case -EKEYEXPIRED:
case -EKEYREJECTED:
case -EPROTO:
rxrpc_abort_call(call, abort_code);
rxrpc_abort_call(call, abort_code, -ret);
goto kill_ACKs;
}
}
......@@ -1159,8 +1151,8 @@ void rxrpc_process_call(struct work_struct *work)
send_ACK_with_skew:
ack.maxSkew = htons(call->ackr_skew);
send_ACK:
mtu = call->conn->params.peer->if_mtu;
mtu -= call->conn->params.peer->hdrsize;
mtu = call->peer->if_mtu;
mtu -= call->peer->hdrsize;
ackinfo.maxMTU = htonl(mtu);
ackinfo.rwind = htonl(rxrpc_rx_window_size);
......@@ -1232,10 +1224,7 @@ void rxrpc_process_call(struct work_struct *work)
goto kill_ACKs;
case RXRPC_CALL_EV_ACK_FINAL:
write_lock_bh(&call->state_lock);
if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK)
call->state = RXRPC_CALL_COMPLETE;
write_unlock_bh(&call->state_lock);
rxrpc_call_completed(call);
goto kill_ACKs;
default:
......
......@@ -30,7 +30,7 @@ unsigned int rxrpc_max_call_lifetime = 60 * HZ;
unsigned int rxrpc_dead_call_expiry = 2 * HZ;
const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
[RXRPC_CALL_UNINITIALISED] = "Uninit",
[RXRPC_CALL_UNINITIALISED] = "Uninit ",
[RXRPC_CALL_CLIENT_AWAIT_CONN] = "ClWtConn",
[RXRPC_CALL_CLIENT_SEND_REQUEST] = "ClSndReq",
[RXRPC_CALL_CLIENT_AWAIT_REPLY] = "ClAwtRpl",
......@@ -43,11 +43,16 @@ const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
[RXRPC_CALL_SERVER_SEND_REPLY] = "SvSndRpl",
[RXRPC_CALL_SERVER_AWAIT_ACK] = "SvAwtACK",
[RXRPC_CALL_COMPLETE] = "Complete",
[RXRPC_CALL_DEAD] = "Dead ",
};
const char *const rxrpc_call_completions[NR__RXRPC_CALL_COMPLETIONS] = {
[RXRPC_CALL_SUCCEEDED] = "Complete",
[RXRPC_CALL_SERVER_BUSY] = "SvBusy ",
[RXRPC_CALL_REMOTELY_ABORTED] = "RmtAbort",
[RXRPC_CALL_LOCALLY_ABORTED] = "LocAbort",
[RXRPC_CALL_LOCAL_ERROR] = "LocError",
[RXRPC_CALL_NETWORK_ERROR] = "NetError",
[RXRPC_CALL_DEAD] = "Dead ",
};
struct kmem_cache *rxrpc_call_jar;
......@@ -214,6 +219,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
{
struct rxrpc_call *call, *xcall;
struct rb_node *parent, **pp;
const void *here = __builtin_return_address(0);
int ret;
_enter("%p,%lx", rx, user_call_ID);
......@@ -224,6 +230,9 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
return call;
}
trace_rxrpc_call(call, 0, atomic_read(&call->usage), 0, here,
(const void *)user_call_ID);
/* Publish the call, even though it is incompletely set up as yet */
call->user_call_ID = user_call_ID;
__set_bit(RXRPC_CALL_HAS_USERID, &call->flags);
......@@ -303,6 +312,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
struct rxrpc_call *call, *candidate;
const void *here = __builtin_return_address(0);
u32 call_id, chan;
_enter(",%d", conn->debug_id);
......@@ -313,6 +323,9 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
if (!candidate)
return ERR_PTR(-EBUSY);
trace_rxrpc_call(candidate, 1, atomic_read(&candidate->usage),
0, here, NULL);
chan = sp->hdr.cid & RXRPC_CHANNELMASK;
candidate->socket = rx;
candidate->conn = conn;
......@@ -358,7 +371,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
_debug("CALL: %u { %s }",
call->debug_id, rxrpc_call_states[call->state]);
if (call->state >= RXRPC_CALL_COMPLETE) {
if (call->state == RXRPC_CALL_COMPLETE) {
__rxrpc_disconnect_call(conn, call);
} else {
spin_unlock(&conn->channel_lock);
......@@ -425,6 +438,44 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
return ERR_PTR(-ECONNRESET);
}
/*
* Note the re-emergence of a call.
*/
void rxrpc_see_call(struct rxrpc_call *call)
{
const void *here = __builtin_return_address(0);
if (call) {
int n = atomic_read(&call->usage);
int m = atomic_read(&call->skb_count);
trace_rxrpc_call(call, 2, n, m, here, 0);
}
}
/*
* Note the addition of a ref on a call.
*/
void rxrpc_get_call(struct rxrpc_call *call)
{
const void *here = __builtin_return_address(0);
int n = atomic_inc_return(&call->usage);
int m = atomic_read(&call->skb_count);
trace_rxrpc_call(call, 3, n, m, here, 0);
}
/*
* Note the addition of a ref on a call for a socket buffer.
*/
void rxrpc_get_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
{
const void *here = __builtin_return_address(0);
int n = atomic_inc_return(&call->usage);
int m = atomic_inc_return(&call->skb_count);
trace_rxrpc_call(call, 4, n, m, here, skb);
}
/*
* detach a call from a socket and set up for release
*/
......@@ -438,6 +489,8 @@ void rxrpc_release_call(struct rxrpc_call *call)
atomic_read(&call->ackr_not_idle),
call->rx_first_oos);
rxrpc_see_call(call);
spin_lock_bh(&call->lock);
if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
BUG();
......@@ -472,8 +525,7 @@ void rxrpc_release_call(struct rxrpc_call *call)
if (call->state < RXRPC_CALL_COMPLETE &&
call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
_debug("+++ ABORTING STATE %d +++\n", call->state);
call->state = RXRPC_CALL_LOCALLY_ABORTED;
call->local_abort = RX_CALL_DEAD;
__rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
}
write_unlock_bh(&call->state_lock);
......@@ -522,6 +574,7 @@ static void rxrpc_dead_call_expired(unsigned long _call)
_enter("{%d}", call->debug_id);
rxrpc_see_call(call);
write_lock_bh(&call->state_lock);
call->state = RXRPC_CALL_DEAD;
write_unlock_bh(&call->state_lock);
......@@ -536,22 +589,16 @@ static void rxrpc_mark_call_released(struct rxrpc_call *call)
{
bool sched;
rxrpc_see_call(call);
write_lock(&call->state_lock);
if (call->state < RXRPC_CALL_DEAD) {
sched = false;
if (call->state < RXRPC_CALL_COMPLETE) {
_debug("abort call %p", call);
call->state = RXRPC_CALL_LOCALLY_ABORTED;
call->local_abort = RX_CALL_DEAD;
if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events))
sched = true;
}
sched = __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
if (!test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
sched = true;
if (sched)
rxrpc_queue_call(call);
}
write_unlock(&call->state_lock);
if (sched)
rxrpc_queue_call(call);
}
/*
......@@ -588,21 +635,43 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
/*
* release a call
*/
void __rxrpc_put_call(struct rxrpc_call *call)
void rxrpc_put_call(struct rxrpc_call *call)
{
ASSERT(call != NULL);
const void *here = __builtin_return_address(0);
int n, m;
_enter("%p{u=%d}", call, atomic_read(&call->usage));
ASSERT(call != NULL);
ASSERTCMP(atomic_read(&call->usage), >, 0);
n = atomic_dec_return(&call->usage);
m = atomic_read(&call->skb_count);
trace_rxrpc_call(call, 5, n, m, here, NULL);
ASSERTCMP(n, >=, 0);
if (n == 0) {
_debug("call %d dead", call->debug_id);
WARN_ON(m != 0);
ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
rxrpc_queue_work(&call->destroyer);
}
}
if (atomic_dec_and_test(&call->usage)) {
/*
* Release a call ref held by a socket buffer.
*/
void rxrpc_put_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
{
const void *here = __builtin_return_address(0);
int n, m;
n = atomic_dec_return(&call->usage);
m = atomic_dec_return(&call->skb_count);
trace_rxrpc_call(call, 6, n, m, here, skb);
ASSERTCMP(n, >=, 0);
if (n == 0) {
_debug("call %d dead", call->debug_id);
WARN_ON(atomic_read(&call->skb_count) != 0);
WARN_ON(m != 0);
ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
rxrpc_queue_work(&call->destroyer);
}
_leave("");
}
/*
......@@ -708,6 +777,7 @@ void __exit rxrpc_destroy_all_calls(void)
call = list_entry(rxrpc_calls.next, struct rxrpc_call, link);
_debug("Zapping call %p", call);
rxrpc_see_call(call);
list_del_init(&call->link);
switch (atomic_read(&call->usage)) {
......@@ -749,16 +819,14 @@ static void rxrpc_call_life_expired(unsigned long _call)
{
struct rxrpc_call *call = (struct rxrpc_call *) _call;
_enter("{%d}", call->debug_id);
rxrpc_see_call(call);
if (call->state >= RXRPC_CALL_COMPLETE)
return;
_enter("{%d}", call->debug_id);
read_lock_bh(&call->state_lock);
if (call->state < RXRPC_CALL_COMPLETE) {
set_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events);
rxrpc_queue_call(call);
}
read_unlock_bh(&call->state_lock);
}
/*
......@@ -771,6 +839,7 @@ static void rxrpc_resend_time_expired(unsigned long _call)
_enter("{%d}", call->debug_id);
rxrpc_see_call(call);
if (call->state >= RXRPC_CALL_COMPLETE)
return;
......@@ -788,12 +857,10 @@ static void rxrpc_ack_time_expired(unsigned long _call)
_enter("{%d}", call->debug_id);
rxrpc_see_call(call);
if (call->state >= RXRPC_CALL_COMPLETE)
return;
read_lock_bh(&call->state_lock);
if (call->state < RXRPC_CALL_COMPLETE &&
!test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
if (!test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
rxrpc_queue_call(call);
read_unlock_bh(&call->state_lock);
}
......@@ -537,6 +537,7 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
struct rxrpc_call, chan_wait_link);
u32 call_id = chan->call_counter + 1;
rxrpc_see_call(call);
list_del_init(&call->chan_wait_link);
conn->active_chans |= 1 << channel;
call->peer = rxrpc_get_peer(conn->params.peer);
......@@ -741,7 +742,7 @@ void rxrpc_disconnect_client_call(struct rxrpc_call *call)
* terminal retransmission without requiring access to the call.
*/
if (test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
_debug("exposed %u,%u", call->call_id, call->local_abort);
_debug("exposed %u,%u", call->call_id, call->abort_code);
__rxrpc_disconnect_call(conn, call);
}
......
......@@ -27,7 +27,7 @@
/*
* Retransmit terminal ACK or ABORT of the previous call.
*/
static void rxrpc_conn_retransmit(struct rxrpc_connection *conn,
static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
......@@ -135,33 +135,39 @@ static void rxrpc_conn_retransmit(struct rxrpc_connection *conn,
/*
* pass a connection-level abort onto all calls on that connection
*/
static void rxrpc_abort_calls(struct rxrpc_connection *conn, int state,
u32 abort_code)
static void rxrpc_abort_calls(struct rxrpc_connection *conn,
enum rxrpc_call_completion compl,
u32 abort_code, int error)
{
struct rxrpc_call *call;
int i;
bool queue;
int i, bit;
_enter("{%d},%x", conn->debug_id, abort_code);
if (compl == RXRPC_CALL_LOCALLY_ABORTED)
bit = RXRPC_CALL_EV_CONN_ABORT;
else
bit = RXRPC_CALL_EV_RCVD_ABORT;
spin_lock(&conn->channel_lock);
for (i = 0; i < RXRPC_MAXCALLS; i++) {
call = rcu_dereference_protected(
conn->channels[i].call,
lockdep_is_held(&conn->channel_lock));
if (call) {
rxrpc_see_call(call);
write_lock_bh(&call->state_lock);
if (call->state <= RXRPC_CALL_COMPLETE) {
call->state = state;
if (state == RXRPC_CALL_LOCALLY_ABORTED) {
call->local_abort = conn->local_abort;
set_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events);
} else {
call->remote_abort = conn->remote_abort;
set_bit(RXRPC_CALL_EV_RCVD_ABORT, &call->events);
if (rxrpc_set_call_completion(call, compl, abort_code,
error)) {
set_bit(bit, &call->events);
queue = true;
}
write_unlock_bh(&call->state_lock);
if (queue)
rxrpc_queue_call(call);
}
write_unlock_bh(&call->state_lock);
}
spin_unlock(&conn->channel_lock);
......@@ -186,17 +192,16 @@ static int rxrpc_abort_connection(struct rxrpc_connection *conn,
/* generate a connection-level abort */
spin_lock_bh(&conn->state_lock);
if (conn->state < RXRPC_CONN_REMOTELY_ABORTED) {
conn->state = RXRPC_CONN_LOCALLY_ABORTED;
conn->error = error;
spin_unlock_bh(&conn->state_lock);
} else {
if (conn->state >= RXRPC_CONN_REMOTELY_ABORTED) {
spin_unlock_bh(&conn->state_lock);
_leave(" = 0 [already dead]");
return 0;
}
rxrpc_abort_calls(conn, RXRPC_CALL_LOCALLY_ABORTED, abort_code);
conn->state = RXRPC_CONN_LOCALLY_ABORTED;
spin_unlock_bh(&conn->state_lock);
rxrpc_abort_calls(conn, RXRPC_CALL_LOCALLY_ABORTED, abort_code, error);
msg.msg_name = &conn->params.peer->srx.transport;
msg.msg_namelen = conn->params.peer->srx.transport_len;
......@@ -276,7 +281,7 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
switch (sp->hdr.type) {
case RXRPC_PACKET_TYPE_DATA:
case RXRPC_PACKET_TYPE_ACK:
rxrpc_conn_retransmit(conn, skb);
rxrpc_conn_retransmit_call(conn, skb);
rxrpc_free_skb(skb);
return 0;
......@@ -287,7 +292,7 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
_proto("Rx ABORT %%%u { ac=%d }", sp->hdr.serial, abort_code);
conn->state = RXRPC_CONN_REMOTELY_ABORTED;
rxrpc_abort_calls(conn, RXRPC_CALL_REMOTELY_ABORTED,
rxrpc_abort_calls(conn, 0, RXRPC_CALL_REMOTELY_ABORTED,
abort_code);
return -ECONNABORTED;
......
......@@ -165,8 +165,8 @@ void __rxrpc_disconnect_call(struct rxrpc_connection *conn,
* through the channel, whilst disposing of the actual call record.
*/
chan->last_service_id = call->service_id;
if (call->local_abort) {
chan->last_abort = call->local_abort;
if (call->abort_code) {
chan->last_abort = call->abort_code;
chan->last_type = RXRPC_PACKET_TYPE_ABORT;
} else {
chan->last_seq = call->rx_data_eaten;
......
......@@ -196,8 +196,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call,
goto enqueue_packet;
sp->call = call;
rxrpc_get_call(call);
atomic_inc(&call->skb_count);
rxrpc_get_call_for_skb(call, skb);
terminal = ((flags & RXRPC_LAST_PACKET) &&
!(flags & RXRPC_CLIENT_INITIATED));
ret = rxrpc_queue_rcv_skb(call, skb, false, terminal);
......@@ -341,14 +340,13 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
abort_code = ntohl(wtmp);
_proto("Rx ABORT %%%u { %x }", sp->hdr.serial, abort_code);
write_lock_bh(&call->state_lock);
if (call->state < RXRPC_CALL_COMPLETE) {
call->state = RXRPC_CALL_REMOTELY_ABORTED;
call->remote_abort = abort_code;
if (__rxrpc_set_call_completion(call,
RXRPC_CALL_REMOTELY_ABORTED,
abort_code, ECONNABORTED)) {
set_bit(RXRPC_CALL_EV_RCVD_ABORT, &call->events);
rxrpc_queue_call(call);
}
goto free_packet_unlock;
goto free_packet;
case RXRPC_PACKET_TYPE_BUSY:
_proto("Rx BUSY %%%u", sp->hdr.serial);
......@@ -359,7 +357,9 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
write_lock_bh(&call->state_lock);
switch (call->state) {
case RXRPC_CALL_CLIENT_SEND_REQUEST:
call->state = RXRPC_CALL_SERVER_BUSY;
__rxrpc_set_call_completion(call,
RXRPC_CALL_SERVER_BUSY,
0, EBUSY);
set_bit(RXRPC_CALL_EV_RCVD_BUSY, &call->events);
rxrpc_queue_call(call);
case RXRPC_CALL_SERVER_BUSY:
......@@ -415,12 +415,8 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
_debug("protocol error");
write_lock_bh(&call->state_lock);
protocol_error_locked:
if (call->state <= RXRPC_CALL_COMPLETE) {
call->state = RXRPC_CALL_LOCALLY_ABORTED;
call->local_abort = RX_PROTOCOL_ERROR;
set_bit(RXRPC_CALL_EV_ABORT, &call->events);
if (__rxrpc_abort_call(call, RX_PROTOCOL_ERROR, EPROTO))
rxrpc_queue_call(call);
}
free_packet_unlock:
write_unlock_bh(&call->state_lock);
free_packet:
......@@ -486,14 +482,8 @@ static void rxrpc_process_jumbo_packet(struct rxrpc_call *call,
_debug("protocol error");
rxrpc_free_skb(part);
rxrpc_free_skb(jumbo);
write_lock_bh(&call->state_lock);
if (call->state <= RXRPC_CALL_COMPLETE) {
call->state = RXRPC_CALL_LOCALLY_ABORTED;
call->local_abort = RX_PROTOCOL_ERROR;
set_bit(RXRPC_CALL_EV_ABORT, &call->events);
if (rxrpc_abort_call(call, RX_PROTOCOL_ERROR, EPROTO))
rxrpc_queue_call(call);
}
write_unlock_bh(&call->state_lock);
_leave("");
}
......@@ -514,26 +504,28 @@ static void rxrpc_post_packet_to_call(struct rxrpc_call *call,
read_lock(&call->state_lock);
switch (call->state) {
case RXRPC_CALL_DEAD:
goto dead_call;
case RXRPC_CALL_COMPLETE:
switch (call->completion) {
case RXRPC_CALL_LOCALLY_ABORTED:
if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events)) {
if (!test_and_set_bit(RXRPC_CALL_EV_ABORT,
&call->events)) {
rxrpc_queue_call(call);
goto free_unlock;
}
case RXRPC_CALL_REMOTELY_ABORTED:
case RXRPC_CALL_NETWORK_ERROR:
case RXRPC_CALL_DEAD:
default:
goto dead_call;
case RXRPC_CALL_COMPLETE:
case RXRPC_CALL_CLIENT_FINAL_ACK:
/* complete server call */
case RXRPC_CALL_SUCCEEDED:
if (rxrpc_conn_is_service(call->conn))
goto dead_call;
/* resend last packet of a completed call */
_debug("final ack again");
rxrpc_get_call(call);
set_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
rxrpc_queue_call(call);
goto free_unlock;
goto resend_final_ack;
}
case RXRPC_CALL_CLIENT_FINAL_ACK:
goto resend_final_ack;
default:
break;
}
......@@ -550,6 +542,13 @@ static void rxrpc_post_packet_to_call(struct rxrpc_call *call,
rxrpc_put_call(call);
goto done;
resend_final_ack:
_debug("final ack again");
rxrpc_get_call(call);
set_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
rxrpc_queue_call(call);
goto free_unlock;
dead_call:
if (sp->hdr.type != RXRPC_PACKET_TYPE_ABORT) {
skb->priority = RX_CALL_DEAD;
......@@ -748,6 +747,7 @@ void rxrpc_data_ready(struct sock *sk)
if (!call || atomic_read(&call->usage) == 0)
goto cant_route_call;
rxrpc_see_call(call);
rxrpc_post_packet_to_call(call, skb);
goto out_unlock;
}
......
......@@ -115,12 +115,12 @@ static int rxrpc_sendmsg_cmsg(struct msghdr *msg,
*/
static void rxrpc_send_abort(struct rxrpc_call *call, u32 abort_code)
{
if (call->state >= RXRPC_CALL_COMPLETE)
return;
write_lock_bh(&call->state_lock);
if (call->state <= RXRPC_CALL_COMPLETE) {
call->state = RXRPC_CALL_LOCALLY_ABORTED;
call->local_abort = abort_code;
set_bit(RXRPC_CALL_EV_ABORT, &call->events);
if (__rxrpc_abort_call(call, abort_code, ECONNABORTED)) {
del_timer_sync(&call->resend_timer);
del_timer_sync(&call->ack_timer);
clear_bit(RXRPC_CALL_EV_RESEND_TIMER, &call->events);
......@@ -207,12 +207,13 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
return PTR_ERR(call);
}
rxrpc_see_call(call);
_debug("CALL %d USR %lx ST %d on CONN %p",
call->debug_id, call->user_call_ID, call->state, call->conn);
if (call->state >= RXRPC_CALL_COMPLETE) {
/* it's too late for this call */
ret = -ECONNRESET;
ret = -ESHUTDOWN;
} else if (cmd == RXRPC_CMD_SEND_ABORT) {
rxrpc_send_abort(call, abort_code);
ret = 0;
......@@ -238,6 +239,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
/**
* rxrpc_kernel_send_data - Allow a kernel service to send data on a call
* @sock: The socket the call is on
* @call: The call to send data through
* @msg: The data to send
* @len: The amount of data to send
......@@ -247,8 +249,8 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
* nor should an address be supplied. MSG_MORE should be flagged if there's
* more data to come, otherwise this data will end the transmission phase.
*/
int rxrpc_kernel_send_data(struct rxrpc_call *call, struct msghdr *msg,
size_t len)
int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
struct msghdr *msg, size_t len)
{
int ret;
......@@ -257,7 +259,7 @@ int rxrpc_kernel_send_data(struct rxrpc_call *call, struct msghdr *msg,
ASSERTCMP(msg->msg_name, ==, NULL);
ASSERTCMP(msg->msg_control, ==, NULL);
lock_sock(&call->socket->sk);
lock_sock(sock->sk);
_debug("CALL %d USR %lx ST %d on CONN %p",
call->debug_id, call->user_call_ID, call->state, call->conn);
......@@ -269,36 +271,36 @@ int rxrpc_kernel_send_data(struct rxrpc_call *call, struct msghdr *msg,
call->state != RXRPC_CALL_SERVER_SEND_REPLY) {
ret = -EPROTO; /* request phase complete for this client call */
} else {
ret = rxrpc_send_data(call->socket, call, msg, len);
ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len);
}
release_sock(&call->socket->sk);
release_sock(sock->sk);
_leave(" = %d", ret);
return ret;
}
EXPORT_SYMBOL(rxrpc_kernel_send_data);
/**
* rxrpc_kernel_abort_call - Allow a kernel service to abort a call
* @sock: The socket the call is on
* @call: The call to be aborted
* @abort_code: The abort code to stick into the ABORT packet
*
* Allow a kernel service to abort a call, if it's still in an abortable state.
*/
void rxrpc_kernel_abort_call(struct rxrpc_call *call, u32 abort_code)
void rxrpc_kernel_abort_call(struct socket *sock, struct rxrpc_call *call,
u32 abort_code)
{
_enter("{%d},%d", call->debug_id, abort_code);
lock_sock(&call->socket->sk);
lock_sock(sock->sk);
_debug("CALL %d USR %lx ST %d on CONN %p",
call->debug_id, call->user_call_ID, call->state, call->conn);
if (call->state < RXRPC_CALL_COMPLETE)
rxrpc_send_abort(call, abort_code);
release_sock(&call->socket->sk);
release_sock(sock->sk);
_leave("");
}
......@@ -640,8 +642,8 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
/* check for the far side aborting the call or a network error
* occurring */
if (call->state > RXRPC_CALL_COMPLETE)
goto call_aborted;
if (call->state == RXRPC_CALL_COMPLETE)
goto call_terminated;
/* add the packet to the send queue if it's now full */
if (sp->remain <= 0 ||
......@@ -702,15 +704,9 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
_leave(" = %d", ret);
return ret;
call_aborted:
call_terminated:
rxrpc_free_skb(skb);
if (call->state == RXRPC_CALL_NETWORK_ERROR)
ret = call->error_report < RXRPC_LOCAL_ERROR_OFFSET ?
call->error_report :
call->error_report - RXRPC_LOCAL_ERROR_OFFSET;
else
ret = -ECONNABORTED;
_leave(" = %d", ret);
_leave(" = %d", -call->error);
return ret;
maybe_error:
......
......@@ -248,13 +248,21 @@ void rxrpc_peer_error_distributor(struct work_struct *work)
struct rxrpc_peer *peer =
container_of(work, struct rxrpc_peer, error_distributor);
struct rxrpc_call *call;
int error_report;
enum rxrpc_call_completion compl;
bool queue;
int error;
_enter("");
error_report = READ_ONCE(peer->error_report);
error = READ_ONCE(peer->error_report);
if (error < RXRPC_LOCAL_ERROR_OFFSET) {
compl = RXRPC_CALL_NETWORK_ERROR;
} else {
compl = RXRPC_CALL_LOCAL_ERROR;
error -= RXRPC_LOCAL_ERROR_OFFSET;
}
_debug("ISSUE ERROR %d", error_report);
_debug("ISSUE ERROR %s %d", rxrpc_call_completions[compl], error);
spin_lock_bh(&peer->lock);
......@@ -262,16 +270,17 @@ void rxrpc_peer_error_distributor(struct work_struct *work)
call = hlist_entry(peer->error_targets.first,
struct rxrpc_call, error_link);
hlist_del_init(&call->error_link);
rxrpc_see_call(call);
queue = false;
write_lock(&call->state_lock);
if (call->state != RXRPC_CALL_COMPLETE &&
call->state < RXRPC_CALL_NETWORK_ERROR) {
call->error_report = error_report;
call->state = RXRPC_CALL_NETWORK_ERROR;
if (__rxrpc_set_call_completion(call, compl, 0, error)) {
set_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events);
rxrpc_queue_call(call);
queue = true;
}
write_unlock(&call->state_lock);
if (queue)
rxrpc_queue_call(call);
}
spin_unlock_bh(&peer->lock);
......
......@@ -313,3 +313,18 @@ void __rxrpc_put_peer(struct rxrpc_peer *peer)
kfree_rcu(peer, rcu);
}
/**
* rxrpc_kernel_get_peer - Get the peer address of a call
* @sock: The socket on which the call is in progress.
* @call: The call to query
* @_srx: Where to place the result
*
* Get the address of the remote peer in a call.
*/
void rxrpc_kernel_get_peer(struct socket *sock, struct rxrpc_call *call,
struct sockaddr_rxrpc *_srx)
{
*_srx = call->peer->srx;
}
EXPORT_SYMBOL(rxrpc_kernel_get_peer);
......@@ -22,7 +22,6 @@ static const char *const rxrpc_conn_states[RXRPC_CONN__NR_STATES] = {
[RXRPC_CONN_SERVICE] = "SvSecure",
[RXRPC_CONN_REMOTELY_ABORTED] = "RmtAbort",
[RXRPC_CONN_LOCALLY_ABORTED] = "LocAbort",
[RXRPC_CONN_NETWORK_ERROR] = "NetError",
};
/*
......@@ -94,7 +93,7 @@ static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
rxrpc_is_service_call(call) ? "Svc" : "Clt",
atomic_read(&call->usage),
rxrpc_call_states[call->state],
call->remote_abort ?: call->local_abort,
call->abort_code,
call->user_call_ID);
return 0;
......
......@@ -115,6 +115,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
sp = rxrpc_skb(skb);
call = sp->call;
ASSERT(call != NULL);
rxrpc_see_call(call);
_debug("next pkt %s", rxrpc_pkts[sp->hdr.type]);
......@@ -294,12 +295,17 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code);
break;
case RXRPC_SKB_MARK_REMOTE_ABORT:
abort_code = call->remote_abort;
abort_code = call->abort_code;
ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code);
break;
case RXRPC_SKB_MARK_LOCAL_ABORT:
abort_code = call->local_abort;
abort_code = call->abort_code;
ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code);
if (call->error) {
abort_code = call->error;
ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4,
&abort_code);
}
break;
case RXRPC_SKB_MARK_NET_ERROR:
_debug("RECV NET ERROR %d", sp->error);
......@@ -392,9 +398,8 @@ u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb)
switch (skb->mark) {
case RXRPC_SKB_MARK_REMOTE_ABORT:
return sp->call->remote_abort;
case RXRPC_SKB_MARK_LOCAL_ABORT:
return sp->call->local_abort;
return sp->call->abort_code;
default:
BUG();
}
......
......@@ -140,9 +140,7 @@ void rxrpc_packet_destructor(struct sk_buff *skb)
_enter("%p{%p}", skb, call);
if (call) {
if (atomic_dec_return(&call->skb_count) < 0)
BUG();
rxrpc_put_call(call);
rxrpc_put_call_for_skb(call, skb);
sp->call = NULL;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment