Commit 3ac9eb42 authored by David S. Miller's avatar David S. Miller

Merge tag 'rxrpc-fixes-20200319' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs

David Howells says:

====================
rxrpc, afs: Interruptibility fixes

Here are a number of fixes for AF_RXRPC and AFS that make AFS system calls
less interruptible and so less likely to leave the filesystem in an
uncertain state.  There's also a miscellaneous patch to make tracing
consistent.

 (1) Firstly, abstract out the Tx space calculation in sendmsg.  Much the
     same code is replicated in a number of places that subsequent patches
     are going to alter, including adding another copy.

 (2) Fix Tx interruptibility by allowing a kernel service, such as AFS, to
     request that a call be interruptible only when waiting for a call slot
     to become available (ie. the call has not taken place yet) or that a
     call be not interruptible at all (e.g. when we want to do writeback
     and don't want a signal interrupting a VM-induced writeback).

 (3) Increase the minimum delay on MSG_WAITALL for userspace sendmsg() when
     waiting for Tx buffer space as a 2*RTT delay is really small over 10G
     ethernet and a 1 jiffy timeout might be essentially 0 if at the end of
     the jiffy period.

 (4) Fix some tracing output in AFS to make it consistent with rxrpc.

 (5) Make sure aborted asynchronous AFS operations are tidied up properly
     so we don't end up with stuck rxrpc calls.

 (6) Make AFS client calls uninterruptible in the Rx phase.  If we don't
     wait for the reply to be fully gathered, we can't update the local VFS
     state and we end up in an indeterminate state with respect to the
     server.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 6002059d 7d7587db
...@@ -243,6 +243,17 @@ static void afs_cm_destructor(struct afs_call *call) ...@@ -243,6 +243,17 @@ static void afs_cm_destructor(struct afs_call *call)
call->buffer = NULL; call->buffer = NULL;
} }
/*
* Abort a service call from within an action function.
*/
static void afs_abort_service_call(struct afs_call *call, u32 abort_code, int error,
const char *why)
{
rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
abort_code, error, why);
afs_set_call_complete(call, error, 0);
}
/* /*
* The server supplied a list of callbacks that it wanted to break. * The server supplied a list of callbacks that it wanted to break.
*/ */
...@@ -510,8 +521,7 @@ static void SRXAFSCB_ProbeUuid(struct work_struct *work) ...@@ -510,8 +521,7 @@ static void SRXAFSCB_ProbeUuid(struct work_struct *work)
if (memcmp(r, &call->net->uuid, sizeof(call->net->uuid)) == 0) if (memcmp(r, &call->net->uuid, sizeof(call->net->uuid)) == 0)
afs_send_empty_reply(call); afs_send_empty_reply(call);
else else
rxrpc_kernel_abort_call(call->net->socket, call->rxcall, afs_abort_service_call(call, 1, 1, "K-1");
1, 1, "K-1");
afs_put_call(call); afs_put_call(call);
_leave(""); _leave("");
......
...@@ -154,7 +154,7 @@ struct afs_call { ...@@ -154,7 +154,7 @@ struct afs_call {
}; };
unsigned char unmarshall; /* unmarshalling phase */ unsigned char unmarshall; /* unmarshalling phase */
unsigned char addr_ix; /* Address in ->alist */ unsigned char addr_ix; /* Address in ->alist */
bool incoming; /* T if incoming call */ bool drop_ref; /* T if need to drop ref for incoming call */
bool send_pages; /* T if data from mapping should be sent */ bool send_pages; /* T if data from mapping should be sent */
bool need_attention; /* T if RxRPC poked us */ bool need_attention; /* T if RxRPC poked us */
bool async; /* T if asynchronous */ bool async; /* T if asynchronous */
...@@ -1209,8 +1209,16 @@ static inline void afs_set_call_complete(struct afs_call *call, ...@@ -1209,8 +1209,16 @@ static inline void afs_set_call_complete(struct afs_call *call,
ok = true; ok = true;
} }
spin_unlock_bh(&call->state_lock); spin_unlock_bh(&call->state_lock);
if (ok) if (ok) {
trace_afs_call_done(call); trace_afs_call_done(call);
/* Asynchronous calls have two refs to release - one from the alloc and
* one queued with the work item - and we can't just deallocate the
* call because the work item may be queued again.
*/
if (call->drop_ref)
afs_put_call(call);
}
} }
/* /*
......
...@@ -18,7 +18,6 @@ struct workqueue_struct *afs_async_calls; ...@@ -18,7 +18,6 @@ struct workqueue_struct *afs_async_calls;
static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long); static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long); static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_delete_async_call(struct work_struct *);
static void afs_process_async_call(struct work_struct *); static void afs_process_async_call(struct work_struct *);
static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long); static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long); static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
...@@ -169,7 +168,7 @@ void afs_put_call(struct afs_call *call) ...@@ -169,7 +168,7 @@ void afs_put_call(struct afs_call *call)
int n = atomic_dec_return(&call->usage); int n = atomic_dec_return(&call->usage);
int o = atomic_read(&net->nr_outstanding_calls); int o = atomic_read(&net->nr_outstanding_calls);
trace_afs_call(call, afs_call_trace_put, n + 1, o, trace_afs_call(call, afs_call_trace_put, n, o,
__builtin_return_address(0)); __builtin_return_address(0));
ASSERTCMP(n, >=, 0); ASSERTCMP(n, >=, 0);
...@@ -402,8 +401,10 @@ void afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call, gfp_t gfp) ...@@ -402,8 +401,10 @@ void afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call, gfp_t gfp)
/* If the call is going to be asynchronous, we need an extra ref for /* If the call is going to be asynchronous, we need an extra ref for
* the call to hold itself so the caller need not hang on to its ref. * the call to hold itself so the caller need not hang on to its ref.
*/ */
if (call->async) if (call->async) {
afs_get_call(call, afs_call_trace_get); afs_get_call(call, afs_call_trace_get);
call->drop_ref = true;
}
/* create a call */ /* create a call */
rxcall = rxrpc_kernel_begin_call(call->net->socket, srx, call->key, rxcall = rxrpc_kernel_begin_call(call->net->socket, srx, call->key,
...@@ -413,7 +414,8 @@ void afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call, gfp_t gfp) ...@@ -413,7 +414,8 @@ void afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call, gfp_t gfp)
afs_wake_up_async_call : afs_wake_up_async_call :
afs_wake_up_call_waiter), afs_wake_up_call_waiter),
call->upgrade, call->upgrade,
call->intr, (call->intr ? RXRPC_PREINTERRUPTIBLE :
RXRPC_UNINTERRUPTIBLE),
call->debug_id); call->debug_id);
if (IS_ERR(rxcall)) { if (IS_ERR(rxcall)) {
ret = PTR_ERR(rxcall); ret = PTR_ERR(rxcall);
...@@ -584,8 +586,6 @@ static void afs_deliver_to_call(struct afs_call *call) ...@@ -584,8 +586,6 @@ static void afs_deliver_to_call(struct afs_call *call)
done: done:
if (call->type->done) if (call->type->done)
call->type->done(call); call->type->done(call);
if (state == AFS_CALL_COMPLETE && call->incoming)
afs_put_call(call);
out: out:
_leave(""); _leave("");
return; return;
...@@ -604,11 +604,7 @@ static void afs_deliver_to_call(struct afs_call *call) ...@@ -604,11 +604,7 @@ static void afs_deliver_to_call(struct afs_call *call)
long afs_wait_for_call_to_complete(struct afs_call *call, long afs_wait_for_call_to_complete(struct afs_call *call,
struct afs_addr_cursor *ac) struct afs_addr_cursor *ac)
{ {
signed long rtt2, timeout;
long ret; long ret;
bool stalled = false;
u64 rtt;
u32 life, last_life;
bool rxrpc_complete = false; bool rxrpc_complete = false;
DECLARE_WAITQUEUE(myself, current); DECLARE_WAITQUEUE(myself, current);
...@@ -619,14 +615,6 @@ long afs_wait_for_call_to_complete(struct afs_call *call, ...@@ -619,14 +615,6 @@ long afs_wait_for_call_to_complete(struct afs_call *call,
if (ret < 0) if (ret < 0)
goto out; goto out;
rtt = rxrpc_kernel_get_rtt(call->net->socket, call->rxcall);
rtt2 = nsecs_to_jiffies64(rtt) * 2;
if (rtt2 < 2)
rtt2 = 2;
timeout = rtt2;
rxrpc_kernel_check_life(call->net->socket, call->rxcall, &last_life);
add_wait_queue(&call->waitq, &myself); add_wait_queue(&call->waitq, &myself);
for (;;) { for (;;) {
set_current_state(TASK_UNINTERRUPTIBLE); set_current_state(TASK_UNINTERRUPTIBLE);
...@@ -637,37 +625,19 @@ long afs_wait_for_call_to_complete(struct afs_call *call, ...@@ -637,37 +625,19 @@ long afs_wait_for_call_to_complete(struct afs_call *call,
call->need_attention = false; call->need_attention = false;
__set_current_state(TASK_RUNNING); __set_current_state(TASK_RUNNING);
afs_deliver_to_call(call); afs_deliver_to_call(call);
timeout = rtt2;
continue; continue;
} }
if (afs_check_call_state(call, AFS_CALL_COMPLETE)) if (afs_check_call_state(call, AFS_CALL_COMPLETE))
break; break;
if (!rxrpc_kernel_check_life(call->net->socket, call->rxcall, &life)) { if (!rxrpc_kernel_check_life(call->net->socket, call->rxcall)) {
/* rxrpc terminated the call. */ /* rxrpc terminated the call. */
rxrpc_complete = true; rxrpc_complete = true;
break; break;
} }
if (call->intr && timeout == 0 && schedule();
life == last_life && signal_pending(current)) {
if (stalled)
break;
__set_current_state(TASK_RUNNING);
rxrpc_kernel_probe_life(call->net->socket, call->rxcall);
timeout = rtt2;
stalled = true;
continue;
}
if (life != last_life) {
timeout = rtt2;
last_life = life;
stalled = false;
}
timeout = schedule_timeout(timeout);
} }
remove_wait_queue(&call->waitq, &myself); remove_wait_queue(&call->waitq, &myself);
...@@ -735,7 +705,7 @@ static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall, ...@@ -735,7 +705,7 @@ static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
u = atomic_fetch_add_unless(&call->usage, 1, 0); u = atomic_fetch_add_unless(&call->usage, 1, 0);
if (u != 0) { if (u != 0) {
trace_afs_call(call, afs_call_trace_wake, u, trace_afs_call(call, afs_call_trace_wake, u + 1,
atomic_read(&call->net->nr_outstanding_calls), atomic_read(&call->net->nr_outstanding_calls),
__builtin_return_address(0)); __builtin_return_address(0));
...@@ -744,21 +714,6 @@ static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall, ...@@ -744,21 +714,6 @@ static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
} }
} }
/*
* Delete an asynchronous call. The work item carries a ref to the call struct
* that we need to release.
*/
static void afs_delete_async_call(struct work_struct *work)
{
struct afs_call *call = container_of(work, struct afs_call, async_work);
_enter("");
afs_put_call(call);
_leave("");
}
/* /*
* Perform I/O processing on an asynchronous call. The work item carries a ref * Perform I/O processing on an asynchronous call. The work item carries a ref
* to the call struct that we either need to release or to pass on. * to the call struct that we either need to release or to pass on.
...@@ -774,16 +729,6 @@ static void afs_process_async_call(struct work_struct *work) ...@@ -774,16 +729,6 @@ static void afs_process_async_call(struct work_struct *work)
afs_deliver_to_call(call); afs_deliver_to_call(call);
} }
if (call->state == AFS_CALL_COMPLETE) {
/* We have two refs to release - one from the alloc and one
* queued with the work item - and we can't just deallocate the
* call because the work item may be queued again.
*/
call->async_work.func = afs_delete_async_call;
if (!queue_work(afs_async_calls, &call->async_work))
afs_put_call(call);
}
afs_put_call(call); afs_put_call(call);
_leave(""); _leave("");
} }
...@@ -810,6 +755,7 @@ void afs_charge_preallocation(struct work_struct *work) ...@@ -810,6 +755,7 @@ void afs_charge_preallocation(struct work_struct *work)
if (!call) if (!call)
break; break;
call->drop_ref = true;
call->async = true; call->async = true;
call->state = AFS_CALL_SV_AWAIT_OP_ID; call->state = AFS_CALL_SV_AWAIT_OP_ID;
init_waitqueue_head(&call->waitq); init_waitqueue_head(&call->waitq);
......
...@@ -16,6 +16,12 @@ struct sock; ...@@ -16,6 +16,12 @@ struct sock;
struct socket; struct socket;
struct rxrpc_call; struct rxrpc_call;
enum rxrpc_interruptibility {
RXRPC_INTERRUPTIBLE, /* Call is interruptible */
RXRPC_PREINTERRUPTIBLE, /* Call can be cancelled whilst waiting for a slot */
RXRPC_UNINTERRUPTIBLE, /* Call should not be interruptible at all */
};
/* /*
* Debug ID counter for tracing. * Debug ID counter for tracing.
*/ */
...@@ -41,7 +47,7 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *, ...@@ -41,7 +47,7 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *,
gfp_t, gfp_t,
rxrpc_notify_rx_t, rxrpc_notify_rx_t,
bool, bool,
bool, enum rxrpc_interruptibility,
unsigned int); unsigned int);
int rxrpc_kernel_send_data(struct socket *, struct rxrpc_call *, int rxrpc_kernel_send_data(struct socket *, struct rxrpc_call *,
struct msghdr *, size_t, struct msghdr *, size_t,
...@@ -58,9 +64,7 @@ int rxrpc_kernel_charge_accept(struct socket *, rxrpc_notify_rx_t, ...@@ -58,9 +64,7 @@ int rxrpc_kernel_charge_accept(struct socket *, rxrpc_notify_rx_t,
rxrpc_user_attach_call_t, unsigned long, gfp_t, rxrpc_user_attach_call_t, unsigned long, gfp_t,
unsigned int); unsigned int);
void rxrpc_kernel_set_tx_length(struct socket *, struct rxrpc_call *, s64); void rxrpc_kernel_set_tx_length(struct socket *, struct rxrpc_call *, s64);
bool rxrpc_kernel_check_life(const struct socket *, const struct rxrpc_call *, bool rxrpc_kernel_check_life(const struct socket *, const struct rxrpc_call *);
u32 *);
void rxrpc_kernel_probe_life(struct socket *, struct rxrpc_call *);
u32 rxrpc_kernel_get_epoch(struct socket *, struct rxrpc_call *); u32 rxrpc_kernel_get_epoch(struct socket *, struct rxrpc_call *);
bool rxrpc_kernel_get_reply_time(struct socket *, struct rxrpc_call *, bool rxrpc_kernel_get_reply_time(struct socket *, struct rxrpc_call *,
ktime_t *); ktime_t *);
......
...@@ -233,7 +233,7 @@ enum afs_cb_break_reason { ...@@ -233,7 +233,7 @@ enum afs_cb_break_reason {
EM(afs_call_trace_get, "GET ") \ EM(afs_call_trace_get, "GET ") \
EM(afs_call_trace_put, "PUT ") \ EM(afs_call_trace_put, "PUT ") \
EM(afs_call_trace_wake, "WAKE ") \ EM(afs_call_trace_wake, "WAKE ") \
E_(afs_call_trace_work, "WORK ") E_(afs_call_trace_work, "QUEUE")
#define afs_server_traces \ #define afs_server_traces \
EM(afs_server_trace_alloc, "ALLOC ") \ EM(afs_server_trace_alloc, "ALLOC ") \
......
...@@ -285,7 +285,7 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock, ...@@ -285,7 +285,7 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
gfp_t gfp, gfp_t gfp,
rxrpc_notify_rx_t notify_rx, rxrpc_notify_rx_t notify_rx,
bool upgrade, bool upgrade,
bool intr, enum rxrpc_interruptibility interruptibility,
unsigned int debug_id) unsigned int debug_id)
{ {
struct rxrpc_conn_parameters cp; struct rxrpc_conn_parameters cp;
...@@ -310,7 +310,7 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock, ...@@ -310,7 +310,7 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
memset(&p, 0, sizeof(p)); memset(&p, 0, sizeof(p));
p.user_call_ID = user_call_ID; p.user_call_ID = user_call_ID;
p.tx_total_len = tx_total_len; p.tx_total_len = tx_total_len;
p.intr = intr; p.interruptibility = interruptibility;
memset(&cp, 0, sizeof(cp)); memset(&cp, 0, sizeof(cp));
cp.local = rx->local; cp.local = rx->local;
...@@ -371,44 +371,17 @@ EXPORT_SYMBOL(rxrpc_kernel_end_call); ...@@ -371,44 +371,17 @@ EXPORT_SYMBOL(rxrpc_kernel_end_call);
* rxrpc_kernel_check_life - Check to see whether a call is still alive * rxrpc_kernel_check_life - Check to see whether a call is still alive
* @sock: The socket the call is on * @sock: The socket the call is on
* @call: The call to check * @call: The call to check
* @_life: Where to store the life value
* *
* Allow a kernel service to find out whether a call is still alive - ie. we're * Allow a kernel service to find out whether a call is still alive -
* getting ACKs from the server. Passes back in *_life a number representing * ie. whether it has completed.
* the life state which can be compared to that returned by a previous call and
* return true if the call is still alive.
*
* If the life state stalls, rxrpc_kernel_probe_life() should be called and
* then 2RTT waited.
*/ */
bool rxrpc_kernel_check_life(const struct socket *sock, bool rxrpc_kernel_check_life(const struct socket *sock,
const struct rxrpc_call *call, const struct rxrpc_call *call)
u32 *_life)
{ {
*_life = call->acks_latest;
return call->state != RXRPC_CALL_COMPLETE; return call->state != RXRPC_CALL_COMPLETE;
} }
EXPORT_SYMBOL(rxrpc_kernel_check_life); EXPORT_SYMBOL(rxrpc_kernel_check_life);
/**
* rxrpc_kernel_probe_life - Poke the peer to see if it's still alive
* @sock: The socket the call is on
* @call: The call to check
*
* In conjunction with rxrpc_kernel_check_life(), allow a kernel service to
* find out whether a call is still alive by pinging it. This should cause the
* life state to be bumped in about 2*RTT.
*
* The must be called in TASK_RUNNING state on pain of might_sleep() objecting.
*/
void rxrpc_kernel_probe_life(struct socket *sock, struct rxrpc_call *call)
{
rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, false,
rxrpc_propose_ack_ping_for_check_life);
rxrpc_send_ack_packet(call, true, NULL);
}
EXPORT_SYMBOL(rxrpc_kernel_probe_life);
/** /**
* rxrpc_kernel_get_epoch - Retrieve the epoch value from a call. * rxrpc_kernel_get_epoch - Retrieve the epoch value from a call.
* @sock: The socket the call is on * @sock: The socket the call is on
......
...@@ -489,7 +489,6 @@ enum rxrpc_call_flag { ...@@ -489,7 +489,6 @@ enum rxrpc_call_flag {
RXRPC_CALL_BEGAN_RX_TIMER, /* We began the expect_rx_by timer */ RXRPC_CALL_BEGAN_RX_TIMER, /* We began the expect_rx_by timer */
RXRPC_CALL_RX_HEARD, /* The peer responded at least once to this call */ RXRPC_CALL_RX_HEARD, /* The peer responded at least once to this call */
RXRPC_CALL_RX_UNDERRUN, /* Got data underrun */ RXRPC_CALL_RX_UNDERRUN, /* Got data underrun */
RXRPC_CALL_IS_INTR, /* The call is interruptible */
RXRPC_CALL_DISCONNECTED, /* The call has been disconnected */ RXRPC_CALL_DISCONNECTED, /* The call has been disconnected */
}; };
...@@ -598,6 +597,7 @@ struct rxrpc_call { ...@@ -598,6 +597,7 @@ struct rxrpc_call {
atomic_t usage; atomic_t usage;
u16 service_id; /* service ID */ u16 service_id; /* service ID */
u8 security_ix; /* Security type */ u8 security_ix; /* Security type */
enum rxrpc_interruptibility interruptibility; /* At what point call may be interrupted */
u32 call_id; /* call ID on connection */ u32 call_id; /* call ID on connection */
u32 cid; /* connection ID plus channel index */ u32 cid; /* connection ID plus channel index */
int debug_id; /* debug ID for printks */ int debug_id; /* debug ID for printks */
...@@ -675,7 +675,6 @@ struct rxrpc_call { ...@@ -675,7 +675,6 @@ struct rxrpc_call {
/* transmission-phase ACK management */ /* transmission-phase ACK management */
ktime_t acks_latest_ts; /* Timestamp of latest ACK received */ ktime_t acks_latest_ts; /* Timestamp of latest ACK received */
rxrpc_serial_t acks_latest; /* serial number of latest ACK received */
rxrpc_seq_t acks_lowest_nak; /* Lowest NACK in the buffer (or ==tx_hard_ack) */ rxrpc_seq_t acks_lowest_nak; /* Lowest NACK in the buffer (or ==tx_hard_ack) */
rxrpc_seq_t acks_lost_top; /* tx_top at the time lost-ack ping sent */ rxrpc_seq_t acks_lost_top; /* tx_top at the time lost-ack ping sent */
rxrpc_serial_t acks_lost_ping; /* Serial number of probe ACK */ rxrpc_serial_t acks_lost_ping; /* Serial number of probe ACK */
...@@ -721,7 +720,7 @@ struct rxrpc_call_params { ...@@ -721,7 +720,7 @@ struct rxrpc_call_params {
u32 normal; /* Max time since last call packet (msec) */ u32 normal; /* Max time since last call packet (msec) */
} timeouts; } timeouts;
u8 nr_timeouts; /* Number of timeouts specified */ u8 nr_timeouts; /* Number of timeouts specified */
bool intr; /* The call is interruptible */ enum rxrpc_interruptibility interruptibility; /* How is interruptible is the call? */
}; };
struct rxrpc_send_params { struct rxrpc_send_params {
......
...@@ -237,8 +237,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx, ...@@ -237,8 +237,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
return call; return call;
} }
if (p->intr) call->interruptibility = p->interruptibility;
__set_bit(RXRPC_CALL_IS_INTR, &call->flags);
call->tx_total_len = p->tx_total_len; call->tx_total_len = p->tx_total_len;
trace_rxrpc_call(call->debug_id, rxrpc_call_new_client, trace_rxrpc_call(call->debug_id, rxrpc_call_new_client,
atomic_read(&call->usage), atomic_read(&call->usage),
......
...@@ -655,13 +655,20 @@ static int rxrpc_wait_for_channel(struct rxrpc_call *call, gfp_t gfp) ...@@ -655,13 +655,20 @@ static int rxrpc_wait_for_channel(struct rxrpc_call *call, gfp_t gfp)
add_wait_queue_exclusive(&call->waitq, &myself); add_wait_queue_exclusive(&call->waitq, &myself);
for (;;) { for (;;) {
if (test_bit(RXRPC_CALL_IS_INTR, &call->flags)) switch (call->interruptibility) {
case RXRPC_INTERRUPTIBLE:
case RXRPC_PREINTERRUPTIBLE:
set_current_state(TASK_INTERRUPTIBLE); set_current_state(TASK_INTERRUPTIBLE);
else break;
case RXRPC_UNINTERRUPTIBLE:
default:
set_current_state(TASK_UNINTERRUPTIBLE); set_current_state(TASK_UNINTERRUPTIBLE);
break;
}
if (call->call_id) if (call->call_id)
break; break;
if (test_bit(RXRPC_CALL_IS_INTR, &call->flags) && if ((call->interruptibility == RXRPC_INTERRUPTIBLE ||
call->interruptibility == RXRPC_PREINTERRUPTIBLE) &&
signal_pending(current)) { signal_pending(current)) {
ret = -ERESTARTSYS; ret = -ERESTARTSYS;
break; break;
......
...@@ -882,7 +882,6 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb) ...@@ -882,7 +882,6 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
before(prev_pkt, call->ackr_prev_seq)) before(prev_pkt, call->ackr_prev_seq))
goto out; goto out;
call->acks_latest_ts = skb->tstamp; call->acks_latest_ts = skb->tstamp;
call->acks_latest = sp->hdr.serial;
call->ackr_first_seq = first_soft_ack; call->ackr_first_seq = first_soft_ack;
call->ackr_prev_seq = prev_pkt; call->ackr_prev_seq = prev_pkt;
......
...@@ -17,6 +17,21 @@ ...@@ -17,6 +17,21 @@
#include <net/af_rxrpc.h> #include <net/af_rxrpc.h>
#include "ar-internal.h" #include "ar-internal.h"
/*
* Return true if there's sufficient Tx queue space.
*/
static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win)
{
unsigned int win_size =
min_t(unsigned int, call->tx_winsize,
call->cong_cwnd + call->cong_extra);
rxrpc_seq_t tx_win = READ_ONCE(call->tx_hard_ack);
if (_tx_win)
*_tx_win = tx_win;
return call->tx_top - tx_win < win_size;
}
/* /*
* Wait for space to appear in the Tx queue or a signal to occur. * Wait for space to appear in the Tx queue or a signal to occur.
*/ */
...@@ -26,9 +41,7 @@ static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx, ...@@ -26,9 +41,7 @@ static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
{ {
for (;;) { for (;;) {
set_current_state(TASK_INTERRUPTIBLE); set_current_state(TASK_INTERRUPTIBLE);
if (call->tx_top - call->tx_hard_ack < if (rxrpc_check_tx_space(call, NULL))
min_t(unsigned int, call->tx_winsize,
call->cong_cwnd + call->cong_extra))
return 0; return 0;
if (call->state >= RXRPC_CALL_COMPLETE) if (call->state >= RXRPC_CALL_COMPLETE)
...@@ -49,7 +62,7 @@ static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx, ...@@ -49,7 +62,7 @@ static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
* Wait for space to appear in the Tx queue uninterruptibly, but with * Wait for space to appear in the Tx queue uninterruptibly, but with
* a timeout of 2*RTT if no progress was made and a signal occurred. * a timeout of 2*RTT if no progress was made and a signal occurred.
*/ */
static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx, static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx,
struct rxrpc_call *call) struct rxrpc_call *call)
{ {
rxrpc_seq_t tx_start, tx_win; rxrpc_seq_t tx_start, tx_win;
...@@ -58,8 +71,8 @@ static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx, ...@@ -58,8 +71,8 @@ static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
rtt = READ_ONCE(call->peer->rtt); rtt = READ_ONCE(call->peer->rtt);
rtt2 = nsecs_to_jiffies64(rtt) * 2; rtt2 = nsecs_to_jiffies64(rtt) * 2;
if (rtt2 < 1) if (rtt2 < 2)
rtt2 = 1; rtt2 = 2;
timeout = rtt2; timeout = rtt2;
tx_start = READ_ONCE(call->tx_hard_ack); tx_start = READ_ONCE(call->tx_hard_ack);
...@@ -68,16 +81,13 @@ static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx, ...@@ -68,16 +81,13 @@ static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
set_current_state(TASK_UNINTERRUPTIBLE); set_current_state(TASK_UNINTERRUPTIBLE);
tx_win = READ_ONCE(call->tx_hard_ack); tx_win = READ_ONCE(call->tx_hard_ack);
if (call->tx_top - tx_win < if (rxrpc_check_tx_space(call, &tx_win))
min_t(unsigned int, call->tx_winsize,
call->cong_cwnd + call->cong_extra))
return 0; return 0;
if (call->state >= RXRPC_CALL_COMPLETE) if (call->state >= RXRPC_CALL_COMPLETE)
return call->error; return call->error;
if (test_bit(RXRPC_CALL_IS_INTR, &call->flags) && if (timeout == 0 &&
timeout == 0 &&
tx_win == tx_start && signal_pending(current)) tx_win == tx_start && signal_pending(current))
return -EINTR; return -EINTR;
...@@ -91,6 +101,26 @@ static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx, ...@@ -91,6 +101,26 @@ static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
} }
} }
/*
* Wait for space to appear in the Tx queue uninterruptibly.
*/
static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
struct rxrpc_call *call,
long *timeo)
{
for (;;) {
set_current_state(TASK_UNINTERRUPTIBLE);
if (rxrpc_check_tx_space(call, NULL))
return 0;
if (call->state >= RXRPC_CALL_COMPLETE)
return call->error;
trace_rxrpc_transmit(call, rxrpc_transmit_wait);
*timeo = schedule_timeout(*timeo);
}
}
/* /*
* wait for space to appear in the transmit/ACK window * wait for space to appear in the transmit/ACK window
* - caller holds the socket locked * - caller holds the socket locked
...@@ -108,10 +138,19 @@ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx, ...@@ -108,10 +138,19 @@ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
add_wait_queue(&call->waitq, &myself); add_wait_queue(&call->waitq, &myself);
switch (call->interruptibility) {
case RXRPC_INTERRUPTIBLE:
if (waitall) if (waitall)
ret = rxrpc_wait_for_tx_window_nonintr(rx, call); ret = rxrpc_wait_for_tx_window_waitall(rx, call);
else else
ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo); ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo);
break;
case RXRPC_PREINTERRUPTIBLE:
case RXRPC_UNINTERRUPTIBLE:
default:
ret = rxrpc_wait_for_tx_window_nonintr(rx, call, timeo);
break;
}
remove_wait_queue(&call->waitq, &myself); remove_wait_queue(&call->waitq, &myself);
set_current_state(TASK_RUNNING); set_current_state(TASK_RUNNING);
...@@ -302,9 +341,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, ...@@ -302,9 +341,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
_debug("alloc"); _debug("alloc");
if (call->tx_top - call->tx_hard_ack >= if (!rxrpc_check_tx_space(call, NULL)) {
min_t(unsigned int, call->tx_winsize,
call->cong_cwnd + call->cong_extra)) {
ret = -EAGAIN; ret = -EAGAIN;
if (msg->msg_flags & MSG_DONTWAIT) if (msg->msg_flags & MSG_DONTWAIT)
goto maybe_error; goto maybe_error;
...@@ -619,7 +656,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) ...@@ -619,7 +656,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
.call.tx_total_len = -1, .call.tx_total_len = -1,
.call.user_call_ID = 0, .call.user_call_ID = 0,
.call.nr_timeouts = 0, .call.nr_timeouts = 0,
.call.intr = true, .call.interruptibility = RXRPC_INTERRUPTIBLE,
.abort_code = 0, .abort_code = 0,
.command = RXRPC_CMD_SEND_DATA, .command = RXRPC_CMD_SEND_DATA,
.exclusive = false, .exclusive = false,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment