Commit 8f4043f1 authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc-improve-TIPC-unicast-link-throughput'

Tuong Lien says:

====================
tipc: improve TIPC unicast link throughput

The series introduces an algorithm to improve TIPC throughput especially
in terms of packet loss, also tries to reduce packet duplication due to
overactive NACK sending mechanism.

The link failover situation is also covered by the patches.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 29502bb1 58ee86b8
......@@ -151,6 +151,7 @@ struct tipc_link {
/* Failover/synch */
u16 drop_point;
struct sk_buff *failover_reasm_skb;
struct sk_buff_head failover_deferdq;
/* Max packet negotiation */
u16 mtu;
......@@ -209,6 +210,7 @@ enum {
};
#define TIPC_BC_RETR_LIM msecs_to_jiffies(10) /* [ms] */
#define TIPC_UC_RETR_TIME (jiffies + msecs_to_jiffies(1))
/*
* Interval between NACKs when packets arrive out of order
......@@ -246,6 +248,10 @@ static int tipc_link_build_nack_msg(struct tipc_link *l,
static void tipc_link_build_bc_init_msg(struct tipc_link *l,
struct sk_buff_head *xmitq);
static bool tipc_link_release_pkts(struct tipc_link *l, u16 to);
static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data);
static void tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
struct tipc_gap_ack_blks *ga,
struct sk_buff_head *xmitq);
/*
* Simple non-static link routines (i.e. referenced outside this file)
......@@ -493,6 +499,7 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
__skb_queue_head_init(&l->transmq);
__skb_queue_head_init(&l->backlogq);
__skb_queue_head_init(&l->deferdq);
__skb_queue_head_init(&l->failover_deferdq);
skb_queue_head_init(&l->wakeupq);
skb_queue_head_init(l->inputq);
return true;
......@@ -883,6 +890,7 @@ void tipc_link_reset(struct tipc_link *l)
__skb_queue_purge(&l->transmq);
__skb_queue_purge(&l->deferdq);
__skb_queue_purge(&l->backlogq);
__skb_queue_purge(&l->failover_deferdq);
l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
......@@ -1154,34 +1162,14 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
* Consumes buffer
*/
static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *inputq)
struct sk_buff_head *inputq,
struct sk_buff **reasm_skb)
{
struct tipc_msg *hdr = buf_msg(skb);
struct sk_buff **reasm_skb = &l->reasm_buf;
struct sk_buff *iskb;
struct sk_buff_head tmpq;
int usr = msg_user(hdr);
int rc = 0;
int pos = 0;
int ipos = 0;
if (unlikely(usr == TUNNEL_PROTOCOL)) {
if (msg_type(hdr) == SYNCH_MSG) {
__skb_queue_purge(&l->deferdq);
goto drop;
}
if (!tipc_msg_extract(skb, &iskb, &ipos))
return rc;
kfree_skb(skb);
skb = iskb;
hdr = buf_msg(skb);
if (less(msg_seqno(hdr), l->drop_point))
goto drop;
if (tipc_data_input(l, skb, inputq))
return rc;
usr = msg_user(hdr);
reasm_skb = &l->failover_reasm_skb;
}
if (usr == MSG_BUNDLER) {
skb_queue_head_init(&tmpq);
......@@ -1206,11 +1194,66 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
tipc_link_bc_init_rcv(l->bc_rcvlink, hdr);
tipc_bcast_unlock(l->net);
}
drop:
kfree_skb(skb);
return 0;
}
/* tipc_link_tnl_rcv() - receive TUNNEL_PROTOCOL message, drop or process the
* inner message along with the ones in the old link's
* deferdq
* @l: tunnel link
* @skb: TUNNEL_PROTOCOL message
* @inputq: queue to put messages ready for delivery
*/
static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *inputq)
{
struct sk_buff **reasm_skb = &l->failover_reasm_skb;
struct sk_buff_head *fdefq = &l->failover_deferdq;
struct tipc_msg *hdr = buf_msg(skb);
struct sk_buff *iskb;
int ipos = 0;
int rc = 0;
u16 seqno;
/* SYNCH_MSG */
if (msg_type(hdr) == SYNCH_MSG)
goto drop;
/* FAILOVER_MSG */
if (!tipc_msg_extract(skb, &iskb, &ipos)) {
pr_warn_ratelimited("Cannot extract FAILOVER_MSG, defq: %d\n",
skb_queue_len(fdefq));
return rc;
}
do {
seqno = buf_seqno(iskb);
if (unlikely(less(seqno, l->drop_point))) {
kfree_skb(iskb);
continue;
}
if (unlikely(seqno != l->drop_point)) {
__tipc_skb_queue_sorted(fdefq, seqno, iskb);
continue;
}
l->drop_point++;
if (!tipc_data_input(l, iskb, inputq))
rc |= tipc_link_input(l, iskb, inputq, reasm_skb);
if (unlikely(rc))
break;
} while ((iskb = __tipc_skb_dequeue(fdefq, l->drop_point)));
drop:
kfree_skb(skb);
return rc;
}
static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
{
bool released = false;
......@@ -1226,6 +1269,106 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
return released;
}
/* tipc_build_gap_ack_blks - build Gap ACK blocks
* @l: tipc link that data have come with gaps in sequence if any
* @data: data buffer to store the Gap ACK blocks after built
*
* returns the actual allocated memory size
*/
static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data)
{
struct sk_buff *skb = skb_peek(&l->deferdq);
struct tipc_gap_ack_blks *ga = data;
u16 len, expect, seqno = 0;
u8 n = 0;
if (!skb)
goto exit;
expect = buf_seqno(skb);
skb_queue_walk(&l->deferdq, skb) {
seqno = buf_seqno(skb);
if (unlikely(more(seqno, expect))) {
ga->gacks[n].ack = htons(expect - 1);
ga->gacks[n].gap = htons(seqno - expect);
if (++n >= MAX_GAP_ACK_BLKS) {
pr_info_ratelimited("Too few Gap ACK blocks!\n");
goto exit;
}
} else if (unlikely(less(seqno, expect))) {
pr_warn("Unexpected skb in deferdq!\n");
continue;
}
expect = seqno + 1;
}
/* last block */
ga->gacks[n].ack = htons(seqno);
ga->gacks[n].gap = 0;
n++;
exit:
len = tipc_gap_ack_blks_sz(n);
ga->len = htons(len);
ga->gack_cnt = n;
return len;
}
/* tipc_link_advance_transmq - advance TIPC link transmq queue by releasing
* acked packets, also doing retransmissions if
* gaps found
* @l: tipc link with transmq queue to be advanced
* @acked: seqno of last packet acked by peer without any gaps before
* @gap: # of gap packets
* @ga: buffer pointer to Gap ACK blocks from peer
* @xmitq: queue for accumulating the retransmitted packets if any
*/
static void tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
struct tipc_gap_ack_blks *ga,
struct sk_buff_head *xmitq)
{
struct sk_buff *skb, *_skb, *tmp;
struct tipc_msg *hdr;
u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
u16 ack = l->rcv_nxt - 1;
u16 seqno;
u16 n = 0;
skb_queue_walk_safe(&l->transmq, skb, tmp) {
seqno = buf_seqno(skb);
next_gap_ack:
if (less_eq(seqno, acked)) {
/* release skb */
__skb_unlink(skb, &l->transmq);
kfree_skb(skb);
} else if (less_eq(seqno, acked + gap)) {
/* retransmit skb */
if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
continue;
TIPC_SKB_CB(skb)->nxt_retr = TIPC_UC_RETR_TIME;
_skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
if (!_skb)
continue;
hdr = buf_msg(_skb);
msg_set_ack(hdr, ack);
msg_set_bcast_ack(hdr, bc_ack);
_skb->priority = TC_PRIO_CONTROL;
__skb_queue_tail(xmitq, _skb);
l->stats.retransmitted++;
} else {
/* retry with Gap ACK blocks if any */
if (!ga || n >= ga->gack_cnt)
break;
acked = ntohs(ga->gacks[n].ack);
gap = ntohs(ga->gacks[n].gap);
n++;
goto next_gap_ack;
}
}
}
/* tipc_link_build_state_msg: prepare link state message for transmission
*
* Note that sending of broadcast ack is coordinated among nodes, to reduce
......@@ -1280,6 +1423,7 @@ static int tipc_link_build_nack_msg(struct tipc_link *l,
struct sk_buff_head *xmitq)
{
u32 def_cnt = ++l->stats.deferred_recv;
u32 defq_len = skb_queue_len(&l->deferdq);
int match1, match2;
if (link_is_bc_rcvlink(l)) {
......@@ -1290,7 +1434,7 @@ static int tipc_link_build_nack_msg(struct tipc_link *l,
return 0;
}
if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV))
if (defq_len >= 3 && !((defq_len - 3) % 16))
tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, 0, xmitq);
return 0;
}
......@@ -1304,29 +1448,29 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *xmitq)
{
struct sk_buff_head *defq = &l->deferdq;
struct tipc_msg *hdr;
struct tipc_msg *hdr = buf_msg(skb);
u16 seqno, rcv_nxt, win_lim;
int rc = 0;
/* Verify and update link state */
if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
return tipc_link_proto_rcv(l, skb, xmitq);
/* Don't send probe at next timeout expiration */
l->silent_intv_cnt = 0;
do {
hdr = buf_msg(skb);
seqno = msg_seqno(hdr);
rcv_nxt = l->rcv_nxt;
win_lim = rcv_nxt + TIPC_MAX_LINK_WIN;
/* Verify and update link state */
if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
return tipc_link_proto_rcv(l, skb, xmitq);
if (unlikely(!link_is_up(l))) {
if (l->state == LINK_ESTABLISHING)
rc = TIPC_LINK_UP_EVT;
goto drop;
}
/* Don't send probe at next timeout expiration */
l->silent_intv_cnt = 0;
/* Drop if outside receive window */
if (unlikely(less(seqno, rcv_nxt) || more(seqno, win_lim))) {
l->stats.duplicates++;
......@@ -1351,13 +1495,16 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
/* Deliver packet */
l->rcv_nxt++;
l->stats.recv_pkts++;
if (!tipc_data_input(l, skb, l->inputq))
rc |= tipc_link_input(l, skb, l->inputq);
if (unlikely(msg_user(hdr) == TUNNEL_PROTOCOL))
rc |= tipc_link_tnl_rcv(l, skb, l->inputq);
else if (!tipc_data_input(l, skb, l->inputq))
rc |= tipc_link_input(l, skb, l->inputq, &l->reasm_buf);
if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
rc |= tipc_link_build_state_msg(l, xmitq);
if (unlikely(rc & ~TIPC_LINK_SND_STATE))
break;
} while ((skb = __skb_dequeue(defq)));
} while ((skb = __tipc_skb_dequeue(defq, l->rcv_nxt)));
return rc;
drop:
......@@ -1378,6 +1525,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
struct tipc_mon_state *mstate = &l->mon_state;
int dlen = 0;
void *data;
u16 glen = 0;
/* Don't send protocol message during reset or link failover */
if (tipc_link_is_blocked(l))
......@@ -1390,8 +1538,8 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
rcvgap = buf_seqno(skb_peek(dfq)) - l->rcv_nxt;
skb = tipc_msg_create(LINK_PROTOCOL, mtyp, INT_H_SIZE,
tipc_max_domain_size, l->addr,
tipc_own_addr(l->net), 0, 0, 0);
tipc_max_domain_size + MAX_GAP_ACK_BLKS_SZ,
l->addr, tipc_own_addr(l->net), 0, 0, 0);
if (!skb)
return;
......@@ -1418,9 +1566,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl));
msg_set_probe(hdr, probe);
msg_set_is_keepalive(hdr, probe || probe_reply);
tipc_mon_prep(l->net, data, &dlen, mstate, l->bearer_id);
msg_set_size(hdr, INT_H_SIZE + dlen);
skb_trim(skb, INT_H_SIZE + dlen);
if (l->peer_caps & TIPC_GAP_ACK_BLOCK)
glen = tipc_build_gap_ack_blks(l, data);
tipc_mon_prep(l->net, data + glen, &dlen, mstate, l->bearer_id);
msg_set_size(hdr, INT_H_SIZE + glen + dlen);
skb_trim(skb, INT_H_SIZE + glen + dlen);
l->stats.sent_states++;
l->rcv_unacked = 0;
} else {
......@@ -1479,6 +1629,7 @@ void tipc_link_create_dummy_tnl_msg(struct tipc_link *l,
void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
int mtyp, struct sk_buff_head *xmitq)
{
struct sk_buff_head *fdefq = &tnl->failover_deferdq;
struct sk_buff *skb, *tnlskb;
struct tipc_msg *hdr, tnlhdr;
struct sk_buff_head *queue = &l->transmq;
......@@ -1506,7 +1657,11 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
/* Initialize reusable tunnel packet header */
tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL,
mtyp, INT_H_SIZE, l->addr);
pktcnt = skb_queue_len(&l->transmq) + skb_queue_len(&l->backlogq);
if (mtyp == SYNCH_MSG)
pktcnt = l->snd_nxt - buf_seqno(skb_peek(&l->transmq));
else
pktcnt = skb_queue_len(&l->transmq);
pktcnt += skb_queue_len(&l->backlogq);
msg_set_msgcnt(&tnlhdr, pktcnt);
msg_set_bearer_id(&tnlhdr, l->peer_bearer_id);
tnl:
......@@ -1537,6 +1692,14 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
tnl->drop_point = l->rcv_nxt;
tnl->failover_reasm_skb = l->reasm_buf;
l->reasm_buf = NULL;
/* Failover the link's deferdq */
if (unlikely(!skb_queue_empty(fdefq))) {
pr_warn("Link failover deferdq not empty: %d!\n",
skb_queue_len(fdefq));
__skb_queue_purge(fdefq);
}
skb_queue_splice_init(&l->deferdq, fdefq);
}
}
......@@ -1590,6 +1753,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *xmitq)
{
struct tipc_msg *hdr = buf_msg(skb);
struct tipc_gap_ack_blks *ga = NULL;
u16 rcvgap = 0;
u16 ack = msg_ack(hdr);
u16 gap = msg_seq_gap(hdr);
......@@ -1600,6 +1764,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
u16 dlen = msg_data_sz(hdr);
int mtyp = msg_type(hdr);
bool reply = msg_probe(hdr);
u16 glen = 0;
void *data;
char *if_name;
int rc = 0;
......@@ -1697,7 +1862,17 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
rc = TIPC_LINK_UP_EVT;
break;
}
tipc_mon_rcv(l->net, data, dlen, l->addr,
/* Receive Gap ACK blocks from peer if any */
if (l->peer_caps & TIPC_GAP_ACK_BLOCK) {
ga = (struct tipc_gap_ack_blks *)data;
glen = ntohs(ga->len);
/* sanity check: if failed, ignore Gap ACK blocks */
if (glen != tipc_gap_ack_blks_sz(ga->gack_cnt))
ga = NULL;
}
tipc_mon_rcv(l->net, data + glen, dlen - glen, l->addr,
&l->mon_state, l->bearer_id);
/* Send NACK if peer has sent pkts we haven't received yet */
......@@ -1706,13 +1881,12 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
if (rcvgap || reply)
tipc_link_build_proto_msg(l, STATE_MSG, 0, reply,
rcvgap, 0, 0, xmitq);
tipc_link_release_pkts(l, ack);
tipc_link_advance_transmq(l, ack, gap, ga, xmitq);
/* If NACK, retransmit will now start at right position */
if (gap) {
rc = tipc_link_retrans(l, l, ack + 1, ack + gap, xmitq);
if (gap)
l->stats.recv_nacks++;
}
tipc_link_advance_backlog(l, xmitq);
if (unlikely(!skb_queue_empty(&l->wakeupq)))
......
......@@ -117,6 +117,37 @@ struct tipc_msg {
__be32 hdr[15];
};
/* struct tipc_gap_ack - TIPC Gap ACK block
* @ack: seqno of the last consecutive packet in link deferdq
* @gap: number of gap packets since the last ack
*
* E.g:
* link deferdq: 1 2 3 4 10 11 13 14 15 20
* --> Gap ACK blocks: <4, 5>, <11, 1>, <15, 4>, <20, 0>
*/
struct tipc_gap_ack {
__be16 ack;
__be16 gap;
};
/* struct tipc_gap_ack_blks
* @len: actual length of the record
* @gack_cnt: number of Gap ACK blocks in the record
* @gacks: array of Gap ACK blocks
*/
struct tipc_gap_ack_blks {
__be16 len;
u8 gack_cnt;
u8 reserved;
struct tipc_gap_ack gacks[];
};
#define tipc_gap_ack_blks_sz(n) (sizeof(struct tipc_gap_ack_blks) + \
sizeof(struct tipc_gap_ack) * (n))
#define MAX_GAP_ACK_BLKS 32
#define MAX_GAP_ACK_BLKS_SZ tipc_gap_ack_blks_sz(MAX_GAP_ACK_BLKS)
static inline struct tipc_msg *buf_msg(struct sk_buff *skb)
{
return (struct tipc_msg *)skb->data;
......@@ -1120,4 +1151,25 @@ static inline void tipc_skb_queue_splice_tail_init(struct sk_buff_head *list,
tipc_skb_queue_splice_tail(&tmp, head);
}
/* __tipc_skb_dequeue() - dequeue the head skb according to expected seqno
* @list: list to be dequeued from
* @seqno: seqno of the expected msg
*
* returns skb dequeued from the list if its seqno is less than or equal to
* the expected one, otherwise the skb is still hold
*
* Note: must be used with appropriate locks held only
*/
static inline struct sk_buff *__tipc_skb_dequeue(struct sk_buff_head *list,
u16 seqno)
{
struct sk_buff *skb = skb_peek(list);
if (skb && less_eq(buf_seqno(skb), seqno)) {
__skb_unlink(skb, list);
return skb;
}
return NULL;
}
#endif
......@@ -52,7 +52,8 @@ enum {
TIPC_BCAST_RCAST = (1 << 4),
TIPC_NODE_ID128 = (1 << 5),
TIPC_LINK_PROTO_SEQNO = (1 << 6),
TIPC_MCAST_RBCTL = (1 << 7)
TIPC_MCAST_RBCTL = (1 << 7),
TIPC_GAP_ACK_BLOCK = (1 << 8)
};
#define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \
......@@ -62,7 +63,8 @@ enum {
TIPC_BLOCK_FLOWCTL | \
TIPC_NODE_ID128 | \
TIPC_LINK_PROTO_SEQNO | \
TIPC_MCAST_RBCTL)
TIPC_MCAST_RBCTL | \
TIPC_GAP_ACK_BLOCK)
#define INVALID_BEARER_ID -1
void tipc_node_stop(struct net *net);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment