Commit f03273f1 authored by Ying Xue's avatar Ying Xue Committed by David S. Miller

tipc: use generic SKB list APIs to manage link receive queue

Use standard SKB list APIs associated with struct sk_buff_head to
manage link's receive queue to simplify its relevant code cemplexity.
Signed-off-by: default avatarYing Xue <ying.xue@windriver.com>
Reviewed-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent bc6fecd4
...@@ -165,7 +165,7 @@ extern struct tipc_bearer __rcu *bearer_list[]; ...@@ -165,7 +165,7 @@ extern struct tipc_bearer __rcu *bearer_list[];
* TIPC routines available to supported media types * TIPC routines available to supported media types
*/ */
void tipc_rcv(struct sk_buff *buf, struct tipc_bearer *tb_ptr); void tipc_rcv(struct sk_buff *skb, struct tipc_bearer *tb_ptr);
int tipc_enable_bearer(const char *bearer_name, u32 disc_domain, u32 priority); int tipc_enable_bearer(const char *bearer_name, u32 disc_domain, u32 priority);
int tipc_disable_bearer(const char *name); int tipc_disable_bearer(const char *name);
......
...@@ -966,29 +966,17 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb, ...@@ -966,29 +966,17 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
} }
} }
/** static void link_retrieve_defq(struct tipc_link *link,
* link_insert_deferred_queue - insert deferred messages back into receive chain struct sk_buff_head *list)
*/
static struct sk_buff *link_insert_deferred_queue(struct tipc_link *l_ptr,
struct sk_buff *buf)
{ {
struct sk_buff_head head;
struct sk_buff *skb = NULL;
u32 seq_no; u32 seq_no;
if (skb_queue_empty(&l_ptr->deferred_queue)) if (skb_queue_empty(&link->deferred_queue))
return buf; return;
seq_no = buf_seqno(skb_peek(&l_ptr->deferred_queue)); seq_no = buf_seqno(skb_peek(&link->deferred_queue));
if (seq_no == mod(l_ptr->next_in_no)) { if (seq_no == mod(link->next_in_no))
__skb_queue_head_init(&head); skb_queue_splice_tail_init(&link->deferred_queue, list);
skb_queue_splice_tail_init(&l_ptr->deferred_queue, &head);
skb = head.next;
skb->prev = NULL;
head.prev->next = buf;
head.prev->prev = NULL;
}
return skb;
} }
/** /**
...@@ -1048,43 +1036,43 @@ static int link_recv_buf_validate(struct sk_buff *buf) ...@@ -1048,43 +1036,43 @@ static int link_recv_buf_validate(struct sk_buff *buf)
/** /**
* tipc_rcv - process TIPC packets/messages arriving from off-node * tipc_rcv - process TIPC packets/messages arriving from off-node
* @head: pointer to message buffer chain * @skb: TIPC packet
* @b_ptr: pointer to bearer message arrived on * @b_ptr: pointer to bearer message arrived on
* *
* Invoked with no locks held. Bearer pointer must point to a valid bearer * Invoked with no locks held. Bearer pointer must point to a valid bearer
* structure (i.e. cannot be NULL), but bearer can be inactive. * structure (i.e. cannot be NULL), but bearer can be inactive.
*/ */
void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) void tipc_rcv(struct sk_buff *skb, struct tipc_bearer *b_ptr)
{ {
while (head) { struct sk_buff_head head;
struct tipc_node *n_ptr; struct tipc_node *n_ptr;
struct tipc_link *l_ptr; struct tipc_link *l_ptr;
struct sk_buff *buf = head;
struct sk_buff *skb1, *tmp; struct sk_buff *skb1, *tmp;
struct tipc_msg *msg; struct tipc_msg *msg;
u32 seq_no; u32 seq_no;
u32 ackd; u32 ackd;
u32 released; u32 released;
head = head->next; __skb_queue_head_init(&head);
buf->next = NULL; __skb_queue_tail(&head, skb);
while ((skb = __skb_dequeue(&head))) {
/* Ensure message is well-formed */ /* Ensure message is well-formed */
if (unlikely(!link_recv_buf_validate(buf))) if (unlikely(!link_recv_buf_validate(skb)))
goto discard; goto discard;
/* Ensure message data is a single contiguous unit */ /* Ensure message data is a single contiguous unit */
if (unlikely(skb_linearize(buf))) if (unlikely(skb_linearize(skb)))
goto discard; goto discard;
/* Handle arrival of a non-unicast link message */ /* Handle arrival of a non-unicast link message */
msg = buf_msg(buf); msg = buf_msg(skb);
if (unlikely(msg_non_seq(msg))) { if (unlikely(msg_non_seq(msg))) {
if (msg_user(msg) == LINK_CONFIG) if (msg_user(msg) == LINK_CONFIG)
tipc_disc_rcv(buf, b_ptr); tipc_disc_rcv(skb, b_ptr);
else else
tipc_bclink_rcv(buf); tipc_bclink_rcv(skb);
continue; continue;
} }
...@@ -1145,8 +1133,8 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) ...@@ -1145,8 +1133,8 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
/* Process the incoming packet */ /* Process the incoming packet */
if (unlikely(!link_working_working(l_ptr))) { if (unlikely(!link_working_working(l_ptr))) {
if (msg_user(msg) == LINK_PROTOCOL) { if (msg_user(msg) == LINK_PROTOCOL) {
tipc_link_proto_rcv(l_ptr, buf); tipc_link_proto_rcv(l_ptr, skb);
head = link_insert_deferred_queue(l_ptr, head); link_retrieve_defq(l_ptr, &head);
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
continue; continue;
} }
...@@ -1156,8 +1144,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) ...@@ -1156,8 +1144,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
if (link_working_working(l_ptr)) { if (link_working_working(l_ptr)) {
/* Re-insert buffer in front of queue */ /* Re-insert buffer in front of queue */
buf->next = head; __skb_queue_head(&head, skb);
head = buf;
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
continue; continue;
} }
...@@ -1166,33 +1153,33 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) ...@@ -1166,33 +1153,33 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
/* Link is now in state WORKING_WORKING */ /* Link is now in state WORKING_WORKING */
if (unlikely(seq_no != mod(l_ptr->next_in_no))) { if (unlikely(seq_no != mod(l_ptr->next_in_no))) {
link_handle_out_of_seq_msg(l_ptr, buf); link_handle_out_of_seq_msg(l_ptr, skb);
head = link_insert_deferred_queue(l_ptr, head); link_retrieve_defq(l_ptr, &head);
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
continue; continue;
} }
l_ptr->next_in_no++; l_ptr->next_in_no++;
if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue))) if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
head = link_insert_deferred_queue(l_ptr, head); link_retrieve_defq(l_ptr, &head);
if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
l_ptr->stats.sent_acks++; l_ptr->stats.sent_acks++;
tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
} }
if (tipc_link_prepare_input(l_ptr, &buf)) { if (tipc_link_prepare_input(l_ptr, &skb)) {
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
continue; continue;
} }
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
msg = buf_msg(buf);
if (tipc_link_input(l_ptr, buf) != 0) if (tipc_link_input(l_ptr, skb) != 0)
goto discard; goto discard;
continue; continue;
unlock_discard: unlock_discard:
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
discard: discard:
kfree_skb(buf); kfree_skb(skb);
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment