Commit 645eb543 authored by David S. Miller's avatar David S. Miller

Merge branch 'rxrpc-fixes'

David Howells says:

====================
rxrpc: Miscellaneous fixes

Here some miscellaneous fixes for AF_RXRPC:

 (1) The zero serial number has a special meaning in an ACK packet serial
     reference, so skip it when assigning serial numbers to transmitted
     packets.

 (2) Don't set the reference serial number in a delayed ACK as the ACK
     cannot be used for RTT calculation.

 (3) Don't emit a DUP ACK response to a PING RESPONSE ACK coming back to a
     call that completed in the meantime.

 (4) Fix the counting of acks and nacks in ACK packet to better drive
     congestion management.  We want to know if there have been new
     acks/nacks since the last ACK packet, not that there are still
     acks/nacks.  This is more complicated as we have to save the old SACK
     table and compare it.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents fdeba0b5 41b7fa15
......@@ -128,6 +128,7 @@
EM(rxrpc_skb_eaten_by_unshare_nomem, "ETN unshar-nm") \
EM(rxrpc_skb_get_conn_secured, "GET conn-secd") \
EM(rxrpc_skb_get_conn_work, "GET conn-work") \
EM(rxrpc_skb_get_last_nack, "GET last-nack") \
EM(rxrpc_skb_get_local_work, "GET locl-work") \
EM(rxrpc_skb_get_reject_work, "GET rej-work ") \
EM(rxrpc_skb_get_to_recvmsg, "GET to-recv ") \
......@@ -141,6 +142,7 @@
EM(rxrpc_skb_put_error_report, "PUT error-rep") \
EM(rxrpc_skb_put_input, "PUT input ") \
EM(rxrpc_skb_put_jumbo_subpacket, "PUT jumbo-sub") \
EM(rxrpc_skb_put_last_nack, "PUT last-nack") \
EM(rxrpc_skb_put_purge, "PUT purge ") \
EM(rxrpc_skb_put_rotate, "PUT rotate ") \
EM(rxrpc_skb_put_unknown, "PUT unknown ") \
......@@ -1552,7 +1554,7 @@ TRACE_EVENT(rxrpc_congest,
memcpy(&__entry->sum, summary, sizeof(__entry->sum));
),
TP_printk("c=%08x r=%08x %s q=%08x %s cw=%u ss=%u nA=%u,%u+%u r=%u b=%u u=%u d=%u l=%x%s%s%s",
TP_printk("c=%08x r=%08x %s q=%08x %s cw=%u ss=%u nA=%u,%u+%u,%u b=%u u=%u d=%u l=%x%s%s%s",
__entry->call,
__entry->ack_serial,
__print_symbolic(__entry->sum.ack_reason, rxrpc_ack_names),
......@@ -1560,9 +1562,9 @@ TRACE_EVENT(rxrpc_congest,
__print_symbolic(__entry->sum.mode, rxrpc_congest_modes),
__entry->sum.cwnd,
__entry->sum.ssthresh,
__entry->sum.nr_acks, __entry->sum.saw_nacks,
__entry->sum.nr_acks, __entry->sum.nr_retained_nacks,
__entry->sum.nr_new_acks,
__entry->sum.nr_rot_new_acks,
__entry->sum.nr_new_nacks,
__entry->top - __entry->hard_ack,
__entry->sum.cumulative_acks,
__entry->sum.dup_acks,
......
......@@ -199,11 +199,19 @@ struct rxrpc_host_header {
*/
struct rxrpc_skb_priv {
struct rxrpc_connection *conn; /* Connection referred to (poke packet) */
u16 offset; /* Offset of data */
u16 len; /* Length of data */
u8 flags;
union {
struct {
u16 offset; /* Offset of data */
u16 len; /* Length of data */
u8 flags;
#define RXRPC_RX_VERIFIED 0x01
};
struct {
rxrpc_seq_t first_ack; /* First packet in acks table */
u8 nr_acks; /* Number of acks+nacks */
u8 nr_nacks; /* Number of nacks */
};
};
struct rxrpc_host_header hdr; /* RxRPC packet header from this packet */
};
......@@ -510,7 +518,7 @@ struct rxrpc_connection {
enum rxrpc_call_completion completion; /* Completion condition */
s32 abort_code; /* Abort code of connection abort */
int debug_id; /* debug ID for printks */
atomic_t serial; /* packet serial number counter */
rxrpc_serial_t tx_serial; /* Outgoing packet serial number counter */
unsigned int hi_serial; /* highest serial number received */
u32 service_id; /* Service ID, possibly upgraded */
u32 security_level; /* Security level selected */
......@@ -692,11 +700,11 @@ struct rxrpc_call {
u8 cong_dup_acks; /* Count of ACKs showing missing packets */
u8 cong_cumul_acks; /* Cumulative ACK count */
ktime_t cong_tstamp; /* Last time cwnd was changed */
struct sk_buff *cong_last_nack; /* Last ACK with nacks received */
/* Receive-phase ACK management (ACKs we send). */
u8 ackr_reason; /* reason to ACK */
u16 ackr_sack_base; /* Starting slot in SACK table ring */
rxrpc_serial_t ackr_serial; /* serial of packet being ACK'd */
rxrpc_seq_t ackr_window; /* Base of SACK window */
rxrpc_seq_t ackr_wtop; /* Base of SACK window */
unsigned int ackr_nr_unacked; /* Number of unacked packets */
......@@ -730,7 +738,8 @@ struct rxrpc_call {
struct rxrpc_ack_summary {
u16 nr_acks; /* Number of ACKs in packet */
u16 nr_new_acks; /* Number of new ACKs in packet */
u16 nr_rot_new_acks; /* Number of rotated new ACKs */
u16 nr_new_nacks; /* Number of new nacks in packet */
u16 nr_retained_nacks; /* Number of nacks retained between ACKs */
u8 ack_reason;
bool saw_nacks; /* Saw NACKs in packet */
bool new_low_nack; /* T if new low NACK found */
......@@ -822,6 +831,20 @@ static inline bool rxrpc_sending_to_client(const struct rxrpc_txbuf *txb)
#include <trace/events/rxrpc.h>
/*
* Allocate the next serial number on a connection. 0 must be skipped.
*/
static inline rxrpc_serial_t rxrpc_get_next_serial(struct rxrpc_connection *conn)
{
rxrpc_serial_t serial;
serial = conn->tx_serial;
if (serial == 0)
serial = 1;
conn->tx_serial = serial + 1;
return serial;
}
/*
* af_rxrpc.c
*/
......
......@@ -43,8 +43,6 @@ void rxrpc_propose_delay_ACK(struct rxrpc_call *call, rxrpc_serial_t serial,
unsigned long expiry = rxrpc_soft_ack_delay;
unsigned long now = jiffies, ack_at;
call->ackr_serial = serial;
if (rxrpc_soft_ack_delay < expiry)
expiry = rxrpc_soft_ack_delay;
if (call->peer->srtt_us != 0)
......@@ -114,6 +112,7 @@ static void rxrpc_congestion_timeout(struct rxrpc_call *call)
void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
{
struct rxrpc_ackpacket *ack = NULL;
struct rxrpc_skb_priv *sp;
struct rxrpc_txbuf *txb;
unsigned long resend_at;
rxrpc_seq_t transmitted = READ_ONCE(call->tx_transmitted);
......@@ -141,14 +140,15 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
* explicitly NAK'd packets.
*/
if (ack_skb) {
sp = rxrpc_skb(ack_skb);
ack = (void *)ack_skb->data + sizeof(struct rxrpc_wire_header);
for (i = 0; i < ack->nAcks; i++) {
for (i = 0; i < sp->nr_acks; i++) {
rxrpc_seq_t seq;
if (ack->acks[i] & 1)
continue;
seq = ntohl(ack->firstPacket) + i;
seq = sp->first_ack + i;
if (after(txb->seq, transmitted))
break;
if (after(txb->seq, seq))
......@@ -373,7 +373,6 @@ static void rxrpc_send_initial_ping(struct rxrpc_call *call)
bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
{
unsigned long now, next, t;
rxrpc_serial_t ackr_serial;
bool resend = false, expired = false;
s32 abort_code;
......@@ -423,8 +422,7 @@ bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
if (time_after_eq(now, t)) {
trace_rxrpc_timer(call, rxrpc_timer_exp_ack, now);
cmpxchg(&call->delay_ack_at, t, now + MAX_JIFFY_OFFSET);
ackr_serial = xchg(&call->ackr_serial, 0);
rxrpc_send_ACK(call, RXRPC_ACK_DELAY, ackr_serial,
rxrpc_send_ACK(call, RXRPC_ACK_DELAY, 0,
rxrpc_propose_ack_ping_for_lost_ack);
}
......
......@@ -686,6 +686,7 @@ static void rxrpc_destroy_call(struct work_struct *work)
del_timer_sync(&call->timer);
rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
rxrpc_cleanup_ring(call);
while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
struct rxrpc_txbuf, call_link))) {
......
......@@ -95,6 +95,14 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
_enter("%d", conn->debug_id);
if (sp && sp->hdr.type == RXRPC_PACKET_TYPE_ACK) {
if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header),
&pkt.ack, sizeof(pkt.ack)) < 0)
return;
if (pkt.ack.reason == RXRPC_ACK_PING_RESPONSE)
return;
}
chan = &conn->channels[channel];
/* If the last call got moved on whilst we were waiting to run, just
......@@ -117,7 +125,7 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
iov[2].iov_base = &ack_info;
iov[2].iov_len = sizeof(ack_info);
serial = atomic_inc_return(&conn->serial);
serial = rxrpc_get_next_serial(conn);
pkt.whdr.epoch = htonl(conn->proto.epoch);
pkt.whdr.cid = htonl(conn->proto.cid | channel);
......
......@@ -45,11 +45,9 @@ static void rxrpc_congestion_management(struct rxrpc_call *call,
}
cumulative_acks += summary->nr_new_acks;
cumulative_acks += summary->nr_rot_new_acks;
if (cumulative_acks > 255)
cumulative_acks = 255;
summary->mode = call->cong_mode;
summary->cwnd = call->cong_cwnd;
summary->ssthresh = call->cong_ssthresh;
summary->cumulative_acks = cumulative_acks;
......@@ -151,6 +149,7 @@ static void rxrpc_congestion_management(struct rxrpc_call *call,
cwnd = RXRPC_TX_MAX_WINDOW;
call->cong_cwnd = cwnd;
call->cong_cumul_acks = cumulative_acks;
summary->mode = call->cong_mode;
trace_rxrpc_congest(call, summary, acked_serial, change);
if (resend)
rxrpc_resend(call, skb);
......@@ -213,7 +212,6 @@ static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
list_for_each_entry_rcu(txb, &call->tx_buffer, call_link, false) {
if (before_eq(txb->seq, call->acks_hard_ack))
continue;
summary->nr_rot_new_acks++;
if (test_bit(RXRPC_TXBUF_LAST, &txb->flags)) {
set_bit(RXRPC_CALL_TX_LAST, &call->flags);
rot_last = true;
......@@ -254,6 +252,11 @@ static void rxrpc_end_tx_phase(struct rxrpc_call *call, bool reply_begun,
{
ASSERT(test_bit(RXRPC_CALL_TX_LAST, &call->flags));
if (unlikely(call->cong_last_nack)) {
rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
call->cong_last_nack = NULL;
}
switch (__rxrpc_call_state(call)) {
case RXRPC_CALL_CLIENT_SEND_REQUEST:
case RXRPC_CALL_CLIENT_AWAIT_REPLY:
......@@ -702,6 +705,43 @@ static void rxrpc_input_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
wake_up(&call->waitq);
}
/*
* Determine how many nacks from the previous ACK have now been satisfied.
*/
static rxrpc_seq_t rxrpc_input_check_prev_ack(struct rxrpc_call *call,
struct rxrpc_ack_summary *summary,
rxrpc_seq_t seq)
{
struct sk_buff *skb = call->cong_last_nack;
struct rxrpc_ackpacket ack;
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
unsigned int i, new_acks = 0, retained_nacks = 0;
rxrpc_seq_t old_seq = sp->first_ack;
u8 *acks = skb->data + sizeof(struct rxrpc_wire_header) + sizeof(ack);
if (after_eq(seq, old_seq + sp->nr_acks)) {
summary->nr_new_acks += sp->nr_nacks;
summary->nr_new_acks += seq - (old_seq + sp->nr_acks);
summary->nr_retained_nacks = 0;
} else if (seq == old_seq) {
summary->nr_retained_nacks = sp->nr_nacks;
} else {
for (i = 0; i < sp->nr_acks; i++) {
if (acks[i] == RXRPC_ACK_TYPE_NACK) {
if (before(old_seq + i, seq))
new_acks++;
else
retained_nacks++;
}
}
summary->nr_new_acks += new_acks;
summary->nr_retained_nacks = retained_nacks;
}
return old_seq + sp->nr_acks;
}
/*
* Process individual soft ACKs.
*
......@@ -711,25 +751,51 @@ static void rxrpc_input_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
* the timer on the basis that the peer might just not have processed them at
* the time the ACK was sent.
*/
static void rxrpc_input_soft_acks(struct rxrpc_call *call, u8 *acks,
rxrpc_seq_t seq, int nr_acks,
struct rxrpc_ack_summary *summary)
static void rxrpc_input_soft_acks(struct rxrpc_call *call,
struct rxrpc_ack_summary *summary,
struct sk_buff *skb,
rxrpc_seq_t seq,
rxrpc_seq_t since)
{
unsigned int i;
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
unsigned int i, old_nacks = 0;
rxrpc_seq_t lowest_nak = seq + sp->nr_acks;
u8 *acks = skb->data + sizeof(struct rxrpc_wire_header) + sizeof(struct rxrpc_ackpacket);
for (i = 0; i < nr_acks; i++) {
for (i = 0; i < sp->nr_acks; i++) {
if (acks[i] == RXRPC_ACK_TYPE_ACK) {
summary->nr_acks++;
summary->nr_new_acks++;
if (after_eq(seq, since))
summary->nr_new_acks++;
} else {
if (!summary->saw_nacks &&
call->acks_lowest_nak != seq + i) {
call->acks_lowest_nak = seq + i;
summary->new_low_nack = true;
}
summary->saw_nacks = true;
if (before(seq, since)) {
/* Overlap with previous ACK */
old_nacks++;
} else {
summary->nr_new_nacks++;
sp->nr_nacks++;
}
if (before(seq, lowest_nak))
lowest_nak = seq;
}
seq++;
}
if (lowest_nak != call->acks_lowest_nak) {
call->acks_lowest_nak = lowest_nak;
summary->new_low_nack = true;
}
/* We *can* have more nacks than we did - the peer is permitted to drop
* packets it has soft-acked and re-request them. Further, it is
* possible for the nack distribution to change whilst the number of
* nacks stays the same or goes down.
*/
if (old_nacks < summary->nr_retained_nacks)
summary->nr_new_acks += summary->nr_retained_nacks - old_nacks;
summary->nr_retained_nacks = old_nacks;
}
/*
......@@ -773,7 +839,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
struct rxrpc_ackinfo info;
rxrpc_serial_t ack_serial, acked_serial;
rxrpc_seq_t first_soft_ack, hard_ack, prev_pkt;
rxrpc_seq_t first_soft_ack, hard_ack, prev_pkt, since;
int nr_acks, offset, ioffset;
_enter("");
......@@ -789,6 +855,8 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
prev_pkt = ntohl(ack.previousPacket);
hard_ack = first_soft_ack - 1;
nr_acks = ack.nAcks;
sp->first_ack = first_soft_ack;
sp->nr_acks = nr_acks;
summary.ack_reason = (ack.reason < RXRPC_ACK__INVALID ?
ack.reason : RXRPC_ACK__INVALID);
......@@ -858,6 +926,16 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
if (nr_acks > 0)
skb_condense(skb);
if (call->cong_last_nack) {
since = rxrpc_input_check_prev_ack(call, &summary, first_soft_ack);
rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
call->cong_last_nack = NULL;
} else {
summary.nr_new_acks = first_soft_ack - call->acks_first_seq;
call->acks_lowest_nak = first_soft_ack + nr_acks;
since = first_soft_ack;
}
call->acks_latest_ts = skb->tstamp;
call->acks_first_seq = first_soft_ack;
call->acks_prev_seq = prev_pkt;
......@@ -866,7 +944,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
case RXRPC_ACK_PING:
break;
default:
if (after(acked_serial, call->acks_highest_serial))
if (acked_serial && after(acked_serial, call->acks_highest_serial))
call->acks_highest_serial = acked_serial;
break;
}
......@@ -905,8 +983,9 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
if (nr_acks > 0) {
if (offset > (int)skb->len - nr_acks)
return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_short_sack);
rxrpc_input_soft_acks(call, skb->data + offset, first_soft_ack,
nr_acks, &summary);
rxrpc_input_soft_acks(call, &summary, skb, first_soft_ack, since);
rxrpc_get_skb(skb, rxrpc_skb_get_last_nack);
call->cong_last_nack = skb;
}
if (test_bit(RXRPC_CALL_TX_LAST, &call->flags) &&
......
......@@ -216,7 +216,7 @@ int rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
iov[0].iov_len = sizeof(txb->wire) + sizeof(txb->ack) + n;
len = iov[0].iov_len;
serial = atomic_inc_return(&conn->serial);
serial = rxrpc_get_next_serial(conn);
txb->wire.serial = htonl(serial);
trace_rxrpc_tx_ack(call->debug_id, serial,
ntohl(txb->ack.firstPacket),
......@@ -302,7 +302,7 @@ int rxrpc_send_abort_packet(struct rxrpc_call *call)
iov[0].iov_base = &pkt;
iov[0].iov_len = sizeof(pkt);
serial = atomic_inc_return(&conn->serial);
serial = rxrpc_get_next_serial(conn);
pkt.whdr.serial = htonl(serial);
iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, sizeof(pkt));
......@@ -334,7 +334,7 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
_enter("%x,{%d}", txb->seq, txb->len);
/* Each transmission of a Tx packet needs a new serial number */
serial = atomic_inc_return(&conn->serial);
serial = rxrpc_get_next_serial(conn);
txb->wire.serial = htonl(serial);
if (test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags) &&
......@@ -558,7 +558,7 @@ void rxrpc_send_conn_abort(struct rxrpc_connection *conn)
len = iov[0].iov_len + iov[1].iov_len;
serial = atomic_inc_return(&conn->serial);
serial = rxrpc_get_next_serial(conn);
whdr.serial = htonl(serial);
iov_iter_kvec(&msg.msg_iter, WRITE, iov, 2, len);
......
......@@ -181,7 +181,7 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
atomic_read(&conn->active),
state,
key_serial(conn->key),
atomic_read(&conn->serial),
conn->tx_serial,
conn->hi_serial,
conn->channels[0].call_id,
conn->channels[1].call_id,
......
......@@ -664,7 +664,7 @@ static int rxkad_issue_challenge(struct rxrpc_connection *conn)
len = iov[0].iov_len + iov[1].iov_len;
serial = atomic_inc_return(&conn->serial);
serial = rxrpc_get_next_serial(conn);
whdr.serial = htonl(serial);
ret = kernel_sendmsg(conn->local->socket, &msg, iov, 2, len);
......@@ -721,7 +721,7 @@ static int rxkad_send_response(struct rxrpc_connection *conn,
len = iov[0].iov_len + iov[1].iov_len + iov[2].iov_len;
serial = atomic_inc_return(&conn->serial);
serial = rxrpc_get_next_serial(conn);
whdr.serial = htonl(serial);
rxrpc_local_dont_fragment(conn->local, false);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment