Commit c1ab3f1d authored by Jon Paul Maloy's avatar Jon Paul Maloy Committed by David S. Miller

tipc: make struct tipc_link generic to support broadcast

Realizing that unicast is just a special case of broadcast, we also see
that we can go in the other direction, i.e., that modest changes to the
current unicast link can make it generic enough to support broadcast.

The following changes are introduced here:

- A new counter ("ackers") in struct tipc_link, to indicate how many
  peers need to ack a packet before it can be released.
- A corresponding counter in the skb user area, to keep track of how
  many peers a are left to ack before a buffer can be released.
- A new counter ("acked"), to keep persistent track of how far a peer
  has acked at the moment, i.e., where in the transmission queue to
  start updating buffers when the next ack arrives. This is to avoid
  double acknowledgements from a peer, with inadvertent relase of
  packets as a result.
- A more generic tipc_link_retrans() function, where retransmit starts
  from a given sequence number, instead of the first packet in the
  transmision queue. This is to minimize the number of retransmitted
  packets on the broadcast media.

When the new functionality is taken into use in the next commits,
we expect it to have minimal effect on unicast mode performance.
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Reviewed-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 32301906
...@@ -221,6 +221,7 @@ bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, ...@@ -221,6 +221,7 @@ bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id,
l->mtu = mtu; l->mtu = mtu;
l->priority = priority; l->priority = priority;
tipc_link_set_queue_limits(l, window); tipc_link_set_queue_limits(l, window);
l->ackers = 1;
l->inputq = inputq; l->inputq = inputq;
l->namedq = namedq; l->namedq = namedq;
l->state = LINK_RESETTING; l->state = LINK_RESETTING;
...@@ -647,6 +648,7 @@ void tipc_link_reset(struct tipc_link *l) ...@@ -647,6 +648,7 @@ void tipc_link_reset(struct tipc_link *l)
l->rcv_unacked = 0; l->rcv_unacked = 0;
l->snd_nxt = 1; l->snd_nxt = 1;
l->rcv_nxt = 1; l->rcv_nxt = 1;
l->acked = 0;
l->silent_intv_cnt = 0; l->silent_intv_cnt = 0;
l->stats.recv_info = 0; l->stats.recv_info = 0;
l->stale_count = 0; l->stale_count = 0;
...@@ -769,6 +771,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, ...@@ -769,6 +771,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
__skb_dequeue(list); __skb_dequeue(list);
__skb_queue_tail(transmq, skb); __skb_queue_tail(transmq, skb);
__skb_queue_tail(xmitq, _skb); __skb_queue_tail(xmitq, _skb);
TIPC_SKB_CB(skb)->ackers = l->ackers;
l->rcv_unacked = 0; l->rcv_unacked = 0;
seqno++; seqno++;
continue; continue;
...@@ -829,6 +832,7 @@ void tipc_link_push_packets(struct tipc_link *link) ...@@ -829,6 +832,7 @@ void tipc_link_push_packets(struct tipc_link *link)
skb = __skb_dequeue(&link->backlogq); skb = __skb_dequeue(&link->backlogq);
if (!skb) if (!skb)
break; break;
TIPC_SKB_CB(skb)->ackers = link->ackers;
msg = buf_msg(skb); msg = buf_msg(skb);
link->backlog[msg_importance(msg)].len--; link->backlog[msg_importance(msg)].len--;
msg_set_ack(msg, ack); msg_set_ack(msg, ack);
...@@ -862,6 +866,7 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) ...@@ -862,6 +866,7 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
l->backlog[msg_importance(hdr)].len--; l->backlog[msg_importance(hdr)].len--;
__skb_queue_tail(&l->transmq, skb); __skb_queue_tail(&l->transmq, skb);
__skb_queue_tail(xmitq, _skb); __skb_queue_tail(xmitq, _skb);
TIPC_SKB_CB(skb)->ackers = l->ackers;
msg_set_ack(hdr, ack); msg_set_ack(hdr, ack);
msg_set_seqno(hdr, seqno); msg_set_seqno(hdr, seqno);
msg_set_bcast_ack(hdr, l->owner->bclink.last_in); msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
...@@ -947,11 +952,13 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb, ...@@ -947,11 +952,13 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
} }
} }
static int tipc_link_retransm(struct tipc_link *l, int retransm, int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
struct sk_buff_head *xmitq) struct sk_buff_head *xmitq)
{ {
struct sk_buff *_skb, *skb = skb_peek(&l->transmq); struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
struct tipc_msg *hdr; struct tipc_msg *hdr;
u16 ack = l->rcv_nxt - 1;
u16 bc_ack = l->owner->bclink.last_in;
if (!skb) if (!skb)
return 0; return 0;
...@@ -964,19 +971,25 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm, ...@@ -964,19 +971,25 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm,
link_retransmit_failure(l, skb); link_retransmit_failure(l, skb);
return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
} }
/* Move forward to where retransmission should start */
skb_queue_walk(&l->transmq, skb) { skb_queue_walk(&l->transmq, skb) {
if (!retransm) if (!less(buf_seqno(skb), from))
return 0; break;
}
skb_queue_walk_from(&l->transmq, skb) {
if (more(buf_seqno(skb), to))
break;
hdr = buf_msg(skb); hdr = buf_msg(skb);
_skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC); _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
if (!_skb) if (!_skb)
return 0; return 0;
hdr = buf_msg(_skb); hdr = buf_msg(_skb);
msg_set_ack(hdr, l->rcv_nxt - 1); msg_set_ack(hdr, ack);
msg_set_bcast_ack(hdr, l->owner->bclink.last_in); msg_set_bcast_ack(hdr, bc_ack);
_skb->priority = TC_PRIO_CONTROL; _skb->priority = TC_PRIO_CONTROL;
__skb_queue_tail(xmitq, _skb); __skb_queue_tail(xmitq, _skb);
retransm--;
l->stats.retransmitted++; l->stats.retransmitted++;
} }
return 0; return 0;
...@@ -1390,7 +1403,8 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, ...@@ -1390,7 +1403,8 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
{ {
struct tipc_msg *hdr = buf_msg(skb); struct tipc_msg *hdr = buf_msg(skb);
u16 rcvgap = 0; u16 rcvgap = 0;
u16 nacked_gap = msg_seq_gap(hdr); u16 ack = msg_ack(hdr);
u16 gap = msg_seq_gap(hdr);
u16 peers_snd_nxt = msg_next_sent(hdr); u16 peers_snd_nxt = msg_next_sent(hdr);
u16 peers_tol = msg_link_tolerance(hdr); u16 peers_tol = msg_link_tolerance(hdr);
u16 peers_prio = msg_linkprio(hdr); u16 peers_prio = msg_linkprio(hdr);
...@@ -1469,11 +1483,11 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, ...@@ -1469,11 +1483,11 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
if (rcvgap || (msg_probe(hdr))) if (rcvgap || (msg_probe(hdr)))
tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap, tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap,
0, 0, xmitq); 0, 0, xmitq);
tipc_link_release_pkts(l, msg_ack(hdr)); tipc_link_release_pkts(l, ack);
/* If NACK, retransmit will now start at right position */ /* If NACK, retransmit will now start at right position */
if (nacked_gap) { if (gap) {
rc = tipc_link_retransm(l, nacked_gap, xmitq); rc = tipc_link_retrans(l, ack + 1, ack + gap, xmitq);
l->stats.recv_nacks++; l->stats.recv_nacks++;
} }
...@@ -1550,7 +1564,7 @@ static void link_reset_statistics(struct tipc_link *l_ptr) ...@@ -1550,7 +1564,7 @@ static void link_reset_statistics(struct tipc_link *l_ptr)
static void link_print(struct tipc_link *l, const char *str) static void link_print(struct tipc_link *l, const char *str)
{ {
struct sk_buff *hskb = skb_peek(&l->transmq); struct sk_buff *hskb = skb_peek(&l->transmq);
u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt; u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt - 1;
u16 tail = l->snd_nxt - 1; u16 tail = l->snd_nxt - 1;
pr_info("%s Link <%s> state %x\n", str, l->name, l->state); pr_info("%s Link <%s> state %x\n", str, l->name, l->state);
......
...@@ -134,6 +134,8 @@ struct tipc_stats { ...@@ -134,6 +134,8 @@ struct tipc_stats {
* @snt_nxt: next sequence number to use for outbound messages * @snt_nxt: next sequence number to use for outbound messages
* @last_retransmitted: sequence number of most recently retransmitted message * @last_retransmitted: sequence number of most recently retransmitted message
* @stale_count: # of identical retransmit requests made by peer * @stale_count: # of identical retransmit requests made by peer
* @ackers: # of peers that needs to ack each packet before it can be released
* @acked: # last packet acked by a certain peer. Used for broadcast.
* @rcv_nxt: next sequence number to expect for inbound messages * @rcv_nxt: next sequence number to expect for inbound messages
* @deferred_queue: deferred queue saved OOS b'cast message received from node * @deferred_queue: deferred queue saved OOS b'cast message received from node
* @unacked_window: # of inbound messages rx'd without ack'ing back to peer * @unacked_window: # of inbound messages rx'd without ack'ing back to peer
...@@ -143,6 +145,7 @@ struct tipc_stats { ...@@ -143,6 +145,7 @@ struct tipc_stats {
* @wakeupq: linked list of wakeup msgs waiting for link congestion to abate * @wakeupq: linked list of wakeup msgs waiting for link congestion to abate
* @long_msg_seq_no: next identifier to use for outbound fragmented messages * @long_msg_seq_no: next identifier to use for outbound fragmented messages
* @reasm_buf: head of partially reassembled inbound message fragments * @reasm_buf: head of partially reassembled inbound message fragments
* @bc_rcvr: marks that this is a broadcast receiver link
* @stats: collects statistics regarding link activity * @stats: collects statistics regarding link activity
*/ */
struct tipc_link { struct tipc_link {
...@@ -201,6 +204,10 @@ struct tipc_link { ...@@ -201,6 +204,10 @@ struct tipc_link {
/* Fragmentation/reassembly */ /* Fragmentation/reassembly */
struct sk_buff *reasm_buf; struct sk_buff *reasm_buf;
/* Broadcast */
u16 ackers;
u16 acked;
/* Statistics */ /* Statistics */
struct tipc_stats stats; struct tipc_stats stats;
}; };
......
...@@ -112,6 +112,7 @@ struct tipc_skb_cb { ...@@ -112,6 +112,7 @@ struct tipc_skb_cb {
bool wakeup_pending; bool wakeup_pending;
u16 chain_sz; u16 chain_sz;
u16 chain_imp; u16 chain_imp;
u16 ackers;
}; };
#define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0])) #define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0]))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment