Commit 7d231e3f authored by David S. Miller

Merge branch 'tipc-next'

Jon Maloy says:

====================
tipc: resolve message disordering problem

When TIPC receives messages from multi-threaded device drivers it may
occasionally deliver messages to their destination sockets in the wrong
order. This happens despite correct resequencing at the link layer,
because the upcall path from link to socket is not protected by any
locks.

These commits solve this problem by introducing an 'input' message
queue in each link, through which messages must be delivered to the
upper layers.
====================
Signed-off-by: David S. Miller <davem@davemloft.net>
parents 4134069f cb1b7280
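
The pattern the cover letter describes can be sketched roughly as follows. This is illustrative only, not the actual TIPC code from the diff below; the names (demo_link, demo_link_rcv, demo_link_input, deliver) are invented. Receiving driver threads only append to a per-link input queue under a lock, and a single consumer drains that queue, so messages reach the destination socket in the resequenced link order.

/* Illustrative sketch, not TIPC code. Assumes the lock and queue were
 * initialised with spin_lock_init() and __skb_queue_head_init().
 */
struct demo_link {
	spinlock_t lock;		/* protects inputq */
	struct sk_buff_head inputq;	/* messages awaiting delivery upwards */
};

/* May run concurrently on several driver threads: enqueue only,
 * never call into the socket layer from here.
 */
static void demo_link_rcv(struct demo_link *l, struct sk_buff *skb)
{
	spin_lock_bh(&l->lock);
	__skb_queue_tail(&l->inputq, skb);
	spin_unlock_bh(&l->lock);
}

/* Runs in one context at a time: deliver in queue (i.e. link) order. */
static void demo_link_input(struct demo_link *l,
			    void (*deliver)(struct sk_buff *skb))
{
	struct sk_buff *skb;

	for (;;) {
		spin_lock_bh(&l->lock);
		skb = __skb_dequeue(&l->inputq);
		spin_unlock_bh(&l->lock);
		if (!skb)
			break;
		deliver(skb);
	}
}
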
 /*
  * net/tipc/bcast.c: TIPC broadcast code
  *
- * Copyright (c) 2004-2006, 2014, Ericsson AB
+ * Copyright (c) 2004-2006, 2014-2015, Ericsson AB
  * Copyright (c) 2004, Intel Corporation.
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
@@ -79,6 +79,13 @@ static void tipc_bclink_unlock(struct net *net)
 		tipc_link_reset_all(node);
 }
 
+void tipc_bclink_input(struct net *net)
+{
+	struct tipc_net *tn = net_generic(net, tipc_net_id);
+
+	tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
+}
+
 uint tipc_bclink_get_mtu(void)
 {
 	return MAX_PKT_DEFAULT_MCAST;
@@ -189,10 +196,8 @@ static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
 void tipc_bclink_wakeup_users(struct net *net)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct sk_buff *skb;
 
-	while ((skb = skb_dequeue(&tn->bclink->link.waiting_sks)))
-		tipc_sk_rcv(net, skb);
+	tipc_sk_rcv(net, &tn->bclink->link.wakeupq);
 }
 
 /**
@@ -271,9 +276,8 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 		tipc_link_push_packets(tn->bcl);
 		bclink_set_last_sent(net);
 	}
-	if (unlikely(released && !skb_queue_empty(&tn->bcl->waiting_sks)))
+	if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
 		n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
 exit:
 	tipc_bclink_unlock(net);
 }
@@ -283,10 +287,11 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
  *
  * RCU and node lock set
  */
-void tipc_bclink_update_link_state(struct net *net, struct tipc_node *n_ptr,
+void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
 				   u32 last_sent)
 {
 	struct sk_buff *buf;
+	struct net *net = n_ptr->net;
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
 
 	/* Ignore "stale" link state info */
@@ -317,7 +322,7 @@ void tipc_bclink_update_link_state(struct net *net, struct tipc_node *n_ptr,
 		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue);
 		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
 
-		tipc_msg_init(net, msg, BCAST_PROTOCOL, STATE_MSG,
+		tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
 			      INT_H_SIZE, n_ptr->addr);
 		msg_set_non_seq(msg, 1);
 		msg_set_mc_netid(msg, tn->net_id);
@@ -358,7 +363,7 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
 	tipc_node_unlock(n_ptr);
 }
 
-/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
+/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
  *                    and to identified node local sockets
  * @net: the applicable net namespace
  * @list: chain of buffers containing message
@@ -373,6 +378,8 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
 	int rc = 0;
 	int bc = 0;
 	struct sk_buff *skb;
+	struct sk_buff_head arrvq;
+	struct sk_buff_head inputq;
 
 	/* Prepare clone of message for local node */
 	skb = tipc_msg_reassemble(list);
@@ -381,7 +388,7 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
 		return -EHOSTUNREACH;
 	}
 
-	/* Broadcast to all other nodes */
+	/* Broadcast to all nodes */
 	if (likely(bclink)) {
 		tipc_bclink_lock(net);
 		if (likely(bclink->bcast_nodes.count)) {
@@ -401,12 +408,15 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
 	if (unlikely(!bc))
 		__skb_queue_purge(list);
 
-	/* Deliver message clone */
-	if (likely(!rc))
-		tipc_sk_mcast_rcv(net, skb);
-	else
+	if (unlikely(rc)) {
 		kfree_skb(skb);
+		return rc;
+	}
+	/* Deliver message clone */
+	__skb_queue_head_init(&arrvq);
+	skb_queue_head_init(&inputq);
+	__skb_queue_tail(&arrvq, skb);
+	tipc_sk_mcast_rcv(net, &arrvq, &inputq);
 	return rc;
 }
 
@@ -449,6 +459,9 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
 	u32 next_in;
 	u32 seqno;
 	int deferred = 0;
+	int pos = 0;
+	struct sk_buff *iskb;
+	struct sk_buff_head *arrvq, *inputq;
 
 	/* Screen out unwanted broadcast messages */
 	if (msg_mc_netid(msg) != tn->net_id)
@@ -485,6 +498,8 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
 	/* Handle in-sequence broadcast message */
 	seqno = msg_seqno(msg);
 	next_in = mod(node->bclink.last_in + 1);
+	arrvq = &tn->bclink->arrvq;
+	inputq = &tn->bclink->inputq;
 
 	if (likely(seqno == next_in)) {
 receive:
@@ -492,20 +507,26 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
 		if (likely(msg_isdata(msg))) {
 			tipc_bclink_lock(net);
 			bclink_accept_pkt(node, seqno);
+			spin_lock_bh(&inputq->lock);
+			__skb_queue_tail(arrvq, buf);
+			spin_unlock_bh(&inputq->lock);
+			node->action_flags |= TIPC_BCAST_MSG_EVT;
 			tipc_bclink_unlock(net);
 			tipc_node_unlock(node);
-			if (likely(msg_mcast(msg)))
-				tipc_sk_mcast_rcv(net, buf);
-			else
-				kfree_skb(buf);
 		} else if (msg_user(msg) == MSG_BUNDLER) {
 			tipc_bclink_lock(net);
 			bclink_accept_pkt(node, seqno);
 			bcl->stats.recv_bundles++;
 			bcl->stats.recv_bundled += msg_msgcnt(msg);
+			pos = 0;
+			while (tipc_msg_extract(buf, &iskb, &pos)) {
+				spin_lock_bh(&inputq->lock);
+				__skb_queue_tail(arrvq, iskb);
+				spin_unlock_bh(&inputq->lock);
+			}
+			node->action_flags |= TIPC_BCAST_MSG_EVT;
 			tipc_bclink_unlock(net);
 			tipc_node_unlock(node);
-			tipc_link_bundle_rcv(net, buf);
 		} else if (msg_user(msg) == MSG_FRAGMENTER) {
 			tipc_buf_append(&node->bclink.reasm_buf, &buf);
 			if (unlikely(!buf && !node->bclink.reasm_buf))
@@ -521,12 +542,6 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
 			}
 			tipc_bclink_unlock(net);
 			tipc_node_unlock(node);
-		} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
-			tipc_bclink_lock(net);
-			bclink_accept_pkt(node, seqno);
-			tipc_bclink_unlock(net);
-			tipc_node_unlock(node);
-			tipc_named_rcv(net, buf);
 		} else {
 			tipc_bclink_lock(net);
 			bclink_accept_pkt(node, seqno);
@@ -943,10 +958,11 @@ int tipc_bclink_init(struct net *net)
 	spin_lock_init(&bclink->lock);
 	__skb_queue_head_init(&bcl->outqueue);
 	__skb_queue_head_init(&bcl->deferred_queue);
-	skb_queue_head_init(&bcl->waiting_sks);
+	skb_queue_head_init(&bcl->wakeupq);
 	bcl->next_out_no = 1;
 	spin_lock_init(&bclink->node.lock);
-	__skb_queue_head_init(&bclink->node.waiting_sks);
+	__skb_queue_head_init(&bclink->arrvq);
+	skb_queue_head_init(&bclink->inputq);
 	bcl->owner = &bclink->node;
 	bcl->owner->net = net;
 	bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
@@ -954,6 +970,8 @@ int tipc_bclink_init(struct net *net)
 	bcl->bearer_id = MAX_BEARERS;
 	rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer);
 	bcl->state = WORKING_WORKING;
+	bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg;
+	msg_set_prevnode(bcl->pmsg, tn->own_addr);
 	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
 	tn->bcbearer = bcbearer;
 	tn->bclink = bclink;
@@ -1032,50 +1050,3 @@ static void tipc_nmap_diff(struct tipc_node_map *nm_a,
 		}
 	}
 }
-
-/**
- * tipc_port_list_add - add a port to a port list, ensuring no duplicates
- */
-void tipc_port_list_add(struct tipc_port_list *pl_ptr, u32 port)
-{
-	struct tipc_port_list *item = pl_ptr;
-	int i;
-	int item_sz = PLSIZE;
-	int cnt = pl_ptr->count;
-
-	for (; ; cnt -= item_sz, item = item->next) {
-		if (cnt < PLSIZE)
-			item_sz = cnt;
-		for (i = 0; i < item_sz; i++)
-			if (item->ports[i] == port)
-				return;
-		if (i < PLSIZE) {
-			item->ports[i] = port;
-			pl_ptr->count++;
-			return;
-		}
-		if (!item->next) {
-			item->next = kmalloc(sizeof(*item), GFP_ATOMIC);
-			if (!item->next) {
-				pr_warn("Incomplete multicast delivery, no memory\n");
-				return;
-			}
-			item->next->next = NULL;
-		}
-	}
-}
-
-/**
- * tipc_port_list_free - free dynamically created entries in port_list chain
- *
- */
-void tipc_port_list_free(struct tipc_port_list *pl_ptr)
-{
-	struct tipc_port_list *item;
-	struct tipc_port_list *next;
-
-	for (item = pl_ptr->next; item; item = next) {
-		next = item->next;
-		kfree(item);
-	}
-}
 /*
  * net/tipc/bcast.h: Include file for TIPC broadcast code
  *
- * Copyright (c) 2003-2006, 2014, Ericsson AB
+ * Copyright (c) 2003-2006, 2014-2015, Ericsson AB
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -41,22 +41,6 @@
 #include "link.h"
 #include "node.h"
 
-#define TIPC_BCLINK_RESET 1
-#define PLSIZE 32
-#define BCBEARER MAX_BEARERS
-
-/**
- * struct tipc_port_list - set of node local destination ports
- * @count: # of ports in set (only valid for first entry in list)
- * @next: pointer to next entry in list
- * @ports: array of port references
- */
-struct tipc_port_list {
-	int count;
-	struct tipc_port_list *next;
-	u32 ports[PLSIZE];
-};
-
 /**
  * struct tipc_bcbearer_pair - a pair of bearers used by broadcast link
  * @primary: pointer to primary bearer
@@ -71,6 +55,9 @@ struct tipc_bcbearer_pair {
 	struct tipc_bearer *secondary;
 };
 
+#define TIPC_BCLINK_RESET 1
+#define BCBEARER MAX_BEARERS
+
 /**
  * struct tipc_bcbearer - bearer used by broadcast link
  * @bearer: (non-standard) broadcast bearer structure
@@ -110,6 +97,8 @@ struct tipc_bclink {
 	struct tipc_link link;
 	struct tipc_node node;
 	unsigned int flags;
+	struct sk_buff_head arrvq;
+	struct sk_buff_head inputq;
 	struct tipc_node_map bcast_nodes;
 	struct tipc_node *retransmit_to;
 };
@@ -126,9 +115,6 @@ static inline int tipc_nmap_equal(struct tipc_node_map *nm_a,
 	return !memcmp(nm_a, nm_b, sizeof(*nm_a));
 }
 
-void tipc_port_list_add(struct tipc_port_list *pl_ptr, u32 port);
-void tipc_port_list_free(struct tipc_port_list *pl_ptr);
-
 int tipc_bclink_init(struct net *net);
 void tipc_bclink_stop(struct net *net);
 void tipc_bclink_set_flags(struct net *tn, unsigned int flags);
@@ -139,7 +125,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
 void tipc_bclink_rcv(struct net *net, struct sk_buff *buf);
 u32 tipc_bclink_get_last_sent(struct net *net);
 u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr);
-void tipc_bclink_update_link_state(struct net *net, struct tipc_node *n_ptr,
+void tipc_bclink_update_link_state(struct tipc_node *node,
 				   u32 last_sent);
 int tipc_bclink_stats(struct net *net, char *stats_buf, const u32 buf_size);
 int tipc_bclink_reset_stats(struct net *net);
@@ -150,5 +136,6 @@ uint tipc_bclink_get_mtu(void);
 int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list);
 void tipc_bclink_wakeup_users(struct net *net);
 int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
+void tipc_bclink_input(struct net *net);
 
 #endif
@@ -85,7 +85,8 @@ static void tipc_disc_init_msg(struct net *net, struct sk_buff *buf, u32 type,
 	u32 dest_domain = b_ptr->domain;
 
 	msg = buf_msg(buf);
-	tipc_msg_init(net, msg, LINK_CONFIG, type, INT_H_SIZE, dest_domain);
+	tipc_msg_init(tn->own_addr, msg, LINK_CONFIG, type,
+		      INT_H_SIZE, dest_domain);
 	msg_set_non_seq(msg, 1);
 	msg_set_node_sig(msg, tn->random);
 	msg_set_dest_domain(msg, dest_domain);
...
@@ -101,23 +101,20 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
  */
 #define START_CHANGEOVER 100000u
 
-static void link_handle_out_of_seq_msg(struct net *net,
-				       struct tipc_link *l_ptr,
-				       struct sk_buff *buf);
-static void tipc_link_proto_rcv(struct net *net, struct tipc_link *l_ptr,
-				struct sk_buff *buf);
-static int tipc_link_tunnel_rcv(struct net *net, struct tipc_node *n_ptr,
-				struct sk_buff **buf);
+static void link_handle_out_of_seq_msg(struct tipc_link *link,
+				       struct sk_buff *skb);
+static void tipc_link_proto_rcv(struct tipc_link *link,
+				struct sk_buff *skb);
+static int tipc_link_tunnel_rcv(struct tipc_node *node,
+				struct sk_buff **skb);
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol);
 static void link_state_event(struct tipc_link *l_ptr, u32 event);
 static void link_reset_statistics(struct tipc_link *l_ptr);
 static void link_print(struct tipc_link *l_ptr, const char *str);
 static void tipc_link_sync_xmit(struct tipc_link *l);
 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
-static int tipc_link_input(struct net *net, struct tipc_link *l,
-			   struct sk_buff *buf);
-static int tipc_link_prepare_input(struct net *net, struct tipc_link *l,
-				   struct sk_buff **buf);
+static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
+static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
 
 /*
  * Simple link routines
@@ -303,7 +300,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 	l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
 	msg = l_ptr->pmsg;
-	tipc_msg_init(n_ptr->net, msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE,
+	tipc_msg_init(tn->own_addr, msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE,
 		      l_ptr->addr);
 	msg_set_size(msg, sizeof(l_ptr->proto_msg));
 	msg_set_session(msg, (tn->random & 0xffff));
@@ -319,8 +316,9 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 	l_ptr->next_out_no = 1;
 	__skb_queue_head_init(&l_ptr->outqueue);
 	__skb_queue_head_init(&l_ptr->deferred_queue);
-	skb_queue_head_init(&l_ptr->waiting_sks);
-
+	skb_queue_head_init(&l_ptr->wakeupq);
+	skb_queue_head_init(&l_ptr->inputq);
+	skb_queue_head_init(&l_ptr->namedq);
 	link_reset_statistics(l_ptr);
 	tipc_node_attach_link(n_ptr, l_ptr);
 	setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr);
@@ -379,17 +377,16 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id,
 static bool link_schedule_user(struct tipc_link *link, u32 oport,
 			       uint chain_sz, uint imp)
 {
-	struct net *net = link->owner->net;
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	struct sk_buff *buf;
 
-	buf = tipc_msg_create(net, SOCK_WAKEUP, 0, INT_H_SIZE, 0, tn->own_addr,
-			      tn->own_addr, oport, 0, 0);
+	buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
+			      link_own_addr(link), link_own_addr(link),
+			      oport, 0, 0);
 	if (!buf)
 		return false;
 	TIPC_SKB_CB(buf)->chain_sz = chain_sz;
 	TIPC_SKB_CB(buf)->chain_imp = imp;
-	skb_queue_tail(&link->waiting_sks, buf);
+	skb_queue_tail(&link->wakeupq, buf);
 	link->stats.link_congs++;
 	return true;
 }
@@ -400,17 +397,19 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
  * Move a number of waiting users, as permitted by available space in
  * the send queue, from link wait queue to node wait queue for wakeup
  */
-static void link_prepare_wakeup(struct tipc_link *link)
+void link_prepare_wakeup(struct tipc_link *link)
 {
 	uint pend_qsz = skb_queue_len(&link->outqueue);
 	struct sk_buff *skb, *tmp;
 
-	skb_queue_walk_safe(&link->waiting_sks, skb, tmp) {
+	skb_queue_walk_safe(&link->wakeupq, skb, tmp) {
 		if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp])
 			break;
 		pend_qsz += TIPC_SKB_CB(skb)->chain_sz;
-		skb_unlink(skb, &link->waiting_sks);
-		skb_queue_tail(&link->owner->waiting_sks, skb);
+		skb_unlink(skb, &link->wakeupq);
+		skb_queue_tail(&link->inputq, skb);
+		link->owner->inputq = &link->inputq;
+		link->owner->action_flags |= TIPC_MSG_EVT;
 	}
 }
 
@@ -463,13 +462,13 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 		l_ptr->exp_msg_count = START_CHANGEOVER;
 	}
 
-	/* Clean up all queues: */
+	/* Clean up all queues, except inputq: */
 	__skb_queue_purge(&l_ptr->outqueue);
 	__skb_queue_purge(&l_ptr->deferred_queue);
-	if (!skb_queue_empty(&l_ptr->waiting_sks)) {
-		skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
-		owner->action_flags |= TIPC_WAKEUP_USERS;
-	}
+	skb_queue_splice_init(&l_ptr->wakeupq, &l_ptr->inputq);
+	if (!skb_queue_empty(&l_ptr->inputq))
+		owner->action_flags |= TIPC_MSG_EVT;
+	owner->inputq = &l_ptr->inputq;
 	l_ptr->next_out = NULL;
 	l_ptr->unacked_window = 0;
 	l_ptr->checkpoint = 1;
@@ -778,7 +777,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 		} else if (tipc_msg_bundle(outqueue, skb, mtu)) {
 			link->stats.sent_bundled++;
 			continue;
-		} else if (tipc_msg_make_bundle(net, outqueue, skb, mtu,
+		} else if (tipc_msg_make_bundle(outqueue, skb, mtu,
 						link->addr)) {
 			link->stats.sent_bundled++;
 			link->stats.sent_bundles++;
@@ -797,7 +796,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 
 static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
 {
-	__skb_queue_head_init(list);
+	skb_queue_head_init(list);
 	__skb_queue_tail(list, skb);
 }
 
@@ -843,19 +842,13 @@ int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
 		rc = __tipc_link_xmit(net, link, list);
 		tipc_node_unlock(node);
 	}
 
 	if (link)
 		return rc;
 
-	if (likely(in_own_node(net, dnode))) {
-		/* As a node local message chain never contains more than one
-		 * buffer, we just need to dequeue one SKB buffer from the
-		 * head list.
-		 */
-		return tipc_sk_rcv(net, __skb_dequeue(list));
-	}
-	__skb_queue_purge(list);
+	if (likely(in_own_node(net, dnode)))
+		return tipc_sk_rcv(net, list);
 
+	__skb_queue_purge(list);
 	return rc;
 }
 
@@ -877,7 +870,7 @@ static void tipc_link_sync_xmit(struct tipc_link *link)
 		return;
 
 	msg = buf_msg(skb);
-	tipc_msg_init(link->owner->net, msg, BCAST_PROTOCOL, STATE_MSG,
+	tipc_msg_init(link_own_addr(link), msg, BCAST_PROTOCOL, STATE_MSG,
 		      INT_H_SIZE, link->addr);
 	msg_set_last_bcast(msg, link->owner->bclink.acked);
 	__tipc_link_xmit_skb(link, skb);
@@ -1164,7 +1157,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 		/* Locate unicast link endpoint that should handle message */
 		l_ptr = n_ptr->links[b_ptr->identity];
 		if (unlikely(!l_ptr))
-			goto unlock_discard;
+			goto unlock;
 
 		/* Verify that communication with node is currently allowed */
 		if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) &&
@@ -1175,7 +1168,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 			n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;
 
 		if (tipc_node_blocked(n_ptr))
-			goto unlock_discard;
+			goto unlock;
 
 		/* Validate message sequence number info */
 		seq_no = msg_seqno(msg);
@@ -1199,18 +1192,16 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 		if (unlikely(l_ptr->next_out))
 			tipc_link_push_packets(l_ptr);
 
-		if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
+		if (released && !skb_queue_empty(&l_ptr->wakeupq))
 			link_prepare_wakeup(l_ptr);
-			l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS;
-		}
 
 		/* Process the incoming packet */
 		if (unlikely(!link_working_working(l_ptr))) {
 			if (msg_user(msg) == LINK_PROTOCOL) {
-				tipc_link_proto_rcv(net, l_ptr, skb);
+				tipc_link_proto_rcv(l_ptr, skb);
 				link_retrieve_defq(l_ptr, &head);
-				tipc_node_unlock(n_ptr);
-				continue;
+				skb = NULL;
+				goto unlock;
 			}
 
 			/* Traffic message. Conditionally activate link */
@@ -1219,18 +1210,18 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 			if (link_working_working(l_ptr)) {
 				/* Re-insert buffer in front of queue */
 				__skb_queue_head(&head, skb);
-				tipc_node_unlock(n_ptr);
-				continue;
+				skb = NULL;
+				goto unlock;
 			}
-			goto unlock_discard;
+			goto unlock;
 		}
 
 		/* Link is now in state WORKING_WORKING */
 		if (unlikely(seq_no != mod(l_ptr->next_in_no))) {
-			link_handle_out_of_seq_msg(net, l_ptr, skb);
+			link_handle_out_of_seq_msg(l_ptr, skb);
 			link_retrieve_defq(l_ptr, &head);
-			tipc_node_unlock(n_ptr);
-			continue;
+			skb = NULL;
+			goto unlock;
 		}
 		l_ptr->next_in_no++;
 		if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
@@ -1240,97 +1231,102 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 			l_ptr->stats.sent_acks++;
 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
 		}
-
-		if (tipc_link_prepare_input(net, l_ptr, &skb)) {
-			tipc_node_unlock(n_ptr);
-			continue;
-		}
-		tipc_node_unlock(n_ptr);
-		if (tipc_link_input(net, l_ptr, skb) != 0)
-			goto discard;
-		continue;
-unlock_discard:
+		tipc_link_input(l_ptr, skb);
+		skb = NULL;
+unlock:
 		tipc_node_unlock(n_ptr);
 discard:
-		kfree_skb(skb);
+		if (unlikely(skb))
+			kfree_skb(skb);
 	}
 }
-/**
- * tipc_link_prepare_input - process TIPC link messages
- *
- * returns nonzero if the message was consumed
+/* tipc_data_input - deliver data and name distr msgs to upper layer
  *
+ * Consumes buffer if message is of right type
  * Node lock must be held
  */
-static int tipc_link_prepare_input(struct net *net, struct tipc_link *l,
-				   struct sk_buff **buf)
+static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
 {
-	struct tipc_node *n;
-	struct tipc_msg *msg;
-	int res = -EINVAL;
+	struct tipc_node *node = link->owner;
+	struct tipc_msg *msg = buf_msg(skb);
+	u32 dport = msg_destport(msg);
 
-	n = l->owner;
-	msg = buf_msg(*buf);
 	switch (msg_user(msg)) {
-	case CHANGEOVER_PROTOCOL:
-		if (tipc_link_tunnel_rcv(net, n, buf))
-			res = 0;
-		break;
-	case MSG_FRAGMENTER:
-		l->stats.recv_fragments++;
-		if (tipc_buf_append(&l->reasm_buf, buf)) {
-			l->stats.recv_fragmented++;
-			res = 0;
-		} else if (!l->reasm_buf) {
-			tipc_link_reset(l);
+	case TIPC_LOW_IMPORTANCE:
+	case TIPC_MEDIUM_IMPORTANCE:
+	case TIPC_HIGH_IMPORTANCE:
+	case TIPC_CRITICAL_IMPORTANCE:
+	case CONN_MANAGER:
+		if (tipc_skb_queue_tail(&link->inputq, skb, dport)) {
+			node->inputq = &link->inputq;
+			node->action_flags |= TIPC_MSG_EVT;
 		}
-		break;
-	case MSG_BUNDLER:
-		l->stats.recv_bundles++;
-		l->stats.recv_bundled += msg_msgcnt(msg);
-		res = 0;
-		break;
+		return true;
 	case NAME_DISTRIBUTOR:
-		n->bclink.recv_permitted = true;
-		res = 0;
-		break;
+		node->bclink.recv_permitted = true;
+		node->namedq = &link->namedq;
+		skb_queue_tail(&link->namedq, skb);
+		if (skb_queue_len(&link->namedq) == 1)
+			node->action_flags |= TIPC_NAMED_MSG_EVT;
+		return true;
+	case MSG_BUNDLER:
+	case CHANGEOVER_PROTOCOL:
+	case MSG_FRAGMENTER:
 	case BCAST_PROTOCOL:
-		tipc_link_sync_rcv(n, *buf);
-		break;
+		return false;
 	default:
-		res = 0;
-	}
-	return res;
+		pr_warn("Dropping received illegal msg type\n");
+		kfree_skb(skb);
+		return false;
+	};
 }
 
-/**
- * tipc_link_input - Deliver message too higher layers
+/* tipc_link_input - process packet that has passed link protocol check
  *
  * Consumes buffer
  * Node lock must be held
  */
-static int tipc_link_input(struct net *net, struct tipc_link *l,
-			   struct sk_buff *buf)
+static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
 {
-	struct tipc_msg *msg = buf_msg(buf);
-	int res = 0;
+	struct tipc_node *node = link->owner;
+	struct tipc_msg *msg = buf_msg(skb);
+	struct sk_buff *iskb;
+	int pos = 0;
+
+	if (likely(tipc_data_input(link, skb)))
+		return;
 
 	switch (msg_user(msg)) {
-	case TIPC_LOW_IMPORTANCE:
-	case TIPC_MEDIUM_IMPORTANCE:
-	case TIPC_HIGH_IMPORTANCE:
-	case TIPC_CRITICAL_IMPORTANCE:
-	case CONN_MANAGER:
-		tipc_sk_rcv(net, buf);
+	case CHANGEOVER_PROTOCOL:
+		if (!tipc_link_tunnel_rcv(node, &skb))
+			break;
+		if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
+			tipc_data_input(link, skb);
+			break;
+		}
+	case MSG_BUNDLER:
+		link->stats.recv_bundles++;
+		link->stats.recv_bundled += msg_msgcnt(msg);
+		while (tipc_msg_extract(skb, &iskb, &pos))
+			tipc_data_input(link, iskb);
 		break;
-	case NAME_DISTRIBUTOR:
-		tipc_named_rcv(net, buf);
+	case MSG_FRAGMENTER:
+		link->stats.recv_fragments++;
+		if (tipc_buf_append(&link->reasm_buf, &skb)) {
+			link->stats.recv_fragmented++;
+			tipc_data_input(link, skb);
+		} else if (!link->reasm_buf) {
+			tipc_link_reset(link);
+		}
 		break;
-	case MSG_BUNDLER:
-		tipc_link_bundle_rcv(net, buf);
+	case BCAST_PROTOCOL:
+		tipc_link_sync_rcv(node, skb);
 		break;
 	default:
-		res = -EINVAL;
-	}
-	return res;
+		break;
+	};
 }
 /**
@@ -1375,14 +1371,13 @@ u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
 /*
  * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
  */
-static void link_handle_out_of_seq_msg(struct net *net,
-				       struct tipc_link *l_ptr,
+static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
 				       struct sk_buff *buf)
 {
 	u32 seq_no = buf_seqno(buf);
 
 	if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
-		tipc_link_proto_rcv(net, l_ptr, buf);
+		tipc_link_proto_rcv(l_ptr, buf);
 		return;
 	}
@@ -1507,10 +1502,9 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
  * Note that network plane id propagates through the network, and may
  * change at any time. The node with lowest address rules
  */
-static void tipc_link_proto_rcv(struct net *net, struct tipc_link *l_ptr,
+static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
 				struct sk_buff *buf)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	u32 rec_gap = 0;
 	u32 max_pkt_info;
 	u32 max_pkt_ack;
@@ -1522,7 +1516,7 @@ static void tipc_link_proto_rcv(struct net *net, struct tipc_link *l_ptr,
 		goto exit;
 
 	if (l_ptr->net_plane != msg_net_plane(msg))
-		if (tn->own_addr > msg_prevnode(msg))
+		if (link_own_addr(l_ptr) > msg_prevnode(msg))
 			l_ptr->net_plane = msg_net_plane(msg);
 
 	switch (msg_type(msg)) {
@@ -1625,7 +1619,7 @@ static void tipc_link_proto_rcv(struct net *net, struct tipc_link *l_ptr,
 		/* Protocol message before retransmits, reduce loss risk */
 		if (l_ptr->owner->bclink.recv_permitted)
-			tipc_bclink_update_link_state(net, l_ptr->owner,
+			tipc_bclink_update_link_state(l_ptr->owner,
 						      msg_last_bcast(msg));
 
 		if (rec_gap || (msg_probe(msg))) {
@@ -1690,7 +1684,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 	if (!tunnel)
 		return;
 
-	tipc_msg_init(l_ptr->owner->net, &tunnel_hdr, CHANGEOVER_PROTOCOL,
+	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
 		      ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
 	msg_set_msgcnt(&tunnel_hdr, msgcount);
@@ -1748,7 +1742,7 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
 	struct sk_buff *skb;
 	struct tipc_msg tunnel_hdr;
 
-	tipc_msg_init(l_ptr->owner->net, &tunnel_hdr, CHANGEOVER_PROTOCOL,
+	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
 		      DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
 	msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue));
 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
@@ -1783,7 +1777,7 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
  * @from_pos: offset to extract from
  *
  * Returns a new message buffer containing an embedded message. The
- * encapsulating message itself is left unchanged.
+ * encapsulating buffer is left unchanged.
  */
 static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
 {
@@ -1797,12 +1791,10 @@ static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
 	return eb;
 }
 
 /* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
  * Owner node is locked.
  */
-static void tipc_link_dup_rcv(struct net *net, struct tipc_link *l_ptr,
+static void tipc_link_dup_rcv(struct tipc_link *l_ptr,
 			      struct sk_buff *t_buf)
 {
 	struct sk_buff *buf;
@@ -1817,7 +1809,7 @@ static void tipc_link_dup_rcv(struct net *net, struct tipc_link *l_ptr,
 	}
 
 	/* Add buffer to deferred queue, if applicable: */
-	link_handle_out_of_seq_msg(net, l_ptr, buf);
+	link_handle_out_of_seq_msg(l_ptr, buf);
 }
 
 /* tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet
@@ -1869,7 +1861,7 @@ static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
  * returned to the active link for delivery upwards.
  * Owner node is locked.
  */
-static int tipc_link_tunnel_rcv(struct net *net, struct tipc_node *n_ptr,
+static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
 				struct sk_buff **buf)
 {
 	struct sk_buff *t_buf = *buf;
@@ -1887,7 +1879,7 @@ static int tipc_link_tunnel_rcv(struct net *net, struct tipc_node *n_ptr,
 		goto exit;
 
 	if (msg_type(t_msg) == DUPLICATE_MSG)
-		tipc_link_dup_rcv(net, l_ptr, t_buf);
+		tipc_link_dup_rcv(l_ptr, t_buf);
 	else if (msg_type(t_msg) == ORIGINAL_MSG)
 		*buf = tipc_link_failover_rcv(l_ptr, t_buf);
 	else
@@ -1897,41 +1889,6 @@ static int tipc_link_tunnel_rcv(struct net *net, struct tipc_node *n_ptr,
 	return *buf != NULL;
 }
 
-/*
- * Bundler functionality:
- */
-void tipc_link_bundle_rcv(struct net *net, struct sk_buff *buf)
-{
-	u32 msgcount = msg_msgcnt(buf_msg(buf));
-	u32 pos = INT_H_SIZE;
-	struct sk_buff *obuf;
-	struct tipc_msg *omsg;
-
-	while (msgcount--) {
-		obuf = buf_extract(buf, pos);
-		if (obuf == NULL) {
-			pr_warn("Link unable to unbundle message(s)\n");
-			break;
-		}
-		omsg = buf_msg(obuf);
-		pos += align(msg_size(omsg));
-		if (msg_isdata(omsg)) {
-			if (unlikely(msg_type(omsg) == TIPC_MCAST_MSG))
-				tipc_sk_mcast_rcv(net, obuf);
-			else
-				tipc_sk_rcv(net, obuf);
-		} else if (msg_user(omsg) == CONN_MANAGER) {
-			tipc_sk_rcv(net, obuf);
-		} else if (msg_user(omsg) == NAME_DISTRIBUTOR) {
-			tipc_named_rcv(net, obuf);
-		} else {
-			pr_warn("Illegal bundled msg: %u\n", msg_user(omsg));
-			kfree_skb(obuf);
-		}
-	}
-	kfree_skb(buf);
-}
-
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
 {
 	unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
...
@@ -131,8 +131,10 @@ struct tipc_stats {
  * @next_in_no: next sequence number to expect for inbound messages
  * @deferred_queue: deferred queue saved OOS b'cast message received from node
  * @unacked_window: # of inbound messages rx'd without ack'ing back to peer
+ * @inputq: buffer queue for messages to be delivered upwards
+ * @namedq: buffer queue for name table messages to be delivered upwards
  * @next_out: ptr to first unsent outbound message in queue
- * @waiting_sks: linked list of sockets waiting for link congestion to abate
+ * @wakeupq: linked list of wakeup msgs waiting for link congestion to abate
  * @long_msg_seq_no: next identifier to use for outbound fragmented messages
  * @reasm_buf: head of partially reassembled inbound message fragments
  * @stats: collects statistics regarding link activity
@@ -184,10 +186,12 @@ struct tipc_link {
 	u32 next_in_no;
 	struct sk_buff_head deferred_queue;
 	u32 unacked_window;
+	struct sk_buff_head inputq;
+	struct sk_buff_head namedq;
 
 	/* Congestion handling */
 	struct sk_buff *next_out;
-	struct sk_buff_head waiting_sks;
+	struct sk_buff_head wakeupq;
 
 	/* Fragmentation/reassembly */
 	u32 long_msg_seq_no;
@@ -228,7 +232,6 @@ int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dest,
 		   u32 selector);
 int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 		     struct sk_buff_head *list);
-void tipc_link_bundle_rcv(struct net *net, struct sk_buff *buf);
 void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
 			  u32 gap, u32 tolerance, u32 priority, u32 acked_mtu);
 void tipc_link_push_packets(struct tipc_link *l_ptr);
@@ -244,6 +247,7 @@ int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info);
 int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info);
 int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info);
 int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
+void link_prepare_wakeup(struct tipc_link *l);
 
 /*
  * Link sequence number manipulation routines (uses modulo 2**16 arithmetic)
@@ -278,6 +282,10 @@ static inline u32 lesser(u32 left, u32 right)
 	return less_eq(left, right) ? left : right;
 }
 
+static inline u32 link_own_addr(struct tipc_link *l)
+{
+	return msg_prevnode(l->pmsg);
+}
+
 /*
  * Link status checking routines
...
@@ -70,25 +70,23 @@ struct sk_buff *tipc_buf_acquire(u32 size)
 	return skb;
 }
 
-void tipc_msg_init(struct net *net, struct tipc_msg *m, u32 user, u32 type,
-		   u32 hsize, u32 destnode)
+void tipc_msg_init(u32 own_node, struct tipc_msg *m, u32 user, u32 type,
+		   u32 hsize, u32 dnode)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-
 	memset(m, 0, hsize);
 	msg_set_version(m);
 	msg_set_user(m, user);
 	msg_set_hdr_sz(m, hsize);
 	msg_set_size(m, hsize);
-	msg_set_prevnode(m, tn->own_addr);
+	msg_set_prevnode(m, own_node);
 	msg_set_type(m, type);
 	if (hsize > SHORT_H_SIZE) {
-		msg_set_orignode(m, tn->own_addr);
-		msg_set_destnode(m, destnode);
+		msg_set_orignode(m, own_node);
+		msg_set_destnode(m, dnode);
 	}
 }
 
-struct sk_buff *tipc_msg_create(struct net *net, uint user, uint type,
+struct sk_buff *tipc_msg_create(uint user, uint type,
 				uint hdr_sz, uint data_sz, u32 dnode,
 				u32 onode, u32 dport, u32 oport, int errcode)
 {
@@ -100,9 +98,8 @@ struct sk_buff *tipc_msg_create(struct net *net, uint user, uint type,
 		return NULL;
 
 	msg = buf_msg(buf);
-	tipc_msg_init(net, msg, user, type, hdr_sz, dnode);
+	tipc_msg_init(onode, msg, user, type, hdr_sz, dnode);
 	msg_set_size(msg, hdr_sz + data_sz);
-	msg_set_prevnode(msg, onode);
 	msg_set_origport(msg, oport);
 	msg_set_destport(msg, dport);
 	msg_set_errcode(msg, errcode);
@@ -195,7 +192,7 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
  *
  * Returns message data size or errno: -ENOMEM, -EFAULT
  */
-int tipc_msg_build(struct net *net, struct tipc_msg *mhdr, struct msghdr *m,
+int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
 		   int offset, int dsz, int pktmax, struct sk_buff_head *list)
 {
 	int mhsz = msg_hdr_sz(mhdr);
@@ -227,8 +224,8 @@ int tipc_msg_build(struct net *net, struct tipc_msg *mhdr, struct msghdr *m,
 	}
 
 	/* Prepare reusable fragment header */
-	tipc_msg_init(net, &pkthdr, MSG_FRAGMENTER, FIRST_FRAGMENT, INT_H_SIZE,
-		      msg_destnode(mhdr));
+	tipc_msg_init(msg_prevnode(mhdr), &pkthdr, MSG_FRAGMENTER,
+		      FIRST_FRAGMENT, INT_H_SIZE, msg_destnode(mhdr));
 	msg_set_size(&pkthdr, pktmax);
 	msg_set_fragm_no(&pkthdr, pktno);
@@ -329,6 +326,40 @@ bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu)
 	return true;
 }
 
+/**
+ * tipc_msg_extract(): extract bundled inner packet from buffer
+ * @skb: linear outer buffer, to be extracted from.
+ * @iskb: extracted inner buffer, to be returned
+ * @pos: position of msg to be extracted. Returns with pointer of next msg
+ * Consumes outer buffer when last packet extracted
+ * Returns true when when there is an extracted buffer, otherwise false
+ */
+bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos)
+{
+	struct tipc_msg *msg = buf_msg(skb);
+	int imsz;
+	struct tipc_msg *imsg = (struct tipc_msg *)(msg_data(msg) + *pos);
+
+	/* Is there space left for shortest possible message? */
+	if (*pos > (msg_data_sz(msg) - SHORT_H_SIZE))
+		goto none;
+	imsz = msg_size(imsg);
+
+	/* Is there space left for current message ? */
+	if ((*pos + imsz) > msg_data_sz(msg))
+		goto none;
+	*iskb = tipc_buf_acquire(imsz);
+	if (!*iskb)
+		goto none;
+	skb_copy_to_linear_data(*iskb, imsg, imsz);
+	*pos += align(imsz);
+	return true;
+none:
+	kfree_skb(skb);
+	*iskb = NULL;
+	return false;
+}
 /**
  * tipc_msg_make_bundle(): Create bundle buf and append message to its tail
  * @list: the buffer chain
@@ -338,7 +369,7 @@ bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu)
  * Replaces buffer if successful
  * Returns true if success, otherwise false
  */
-bool tipc_msg_make_bundle(struct net *net, struct sk_buff_head *list,
+bool tipc_msg_make_bundle(struct sk_buff_head *list,
 			  struct sk_buff *skb, u32 mtu, u32 dnode)
 {
 	struct sk_buff *bskb;
@@ -362,7 +393,8 @@ bool tipc_msg_make_bundle(struct net *net, struct sk_buff_head *list,
 	skb_trim(bskb, INT_H_SIZE);
 	bmsg = buf_msg(bskb);
-	tipc_msg_init(net, bmsg, MSG_BUNDLER, 0, INT_H_SIZE, dnode);
+	tipc_msg_init(msg_prevnode(msg), bmsg, MSG_BUNDLER, 0,
+		      INT_H_SIZE, dnode);
 	msg_set_seqno(bmsg, msg_seqno(msg));
 	msg_set_ack(bmsg, msg_ack(msg));
 	msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
@@ -379,10 +411,9 @@ bool tipc_msg_make_bundle(struct net *net, struct sk_buff_head *list,
  * Consumes buffer if failure
  * Returns true if success, otherwise false
  */
-bool tipc_msg_reverse(struct net *net, struct sk_buff *buf, u32 *dnode,
+bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode,
 		      int err)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	struct tipc_msg *msg = buf_msg(buf);
 	uint imp = msg_importance(msg);
 	struct tipc_msg ohdr;
@@ -402,7 +433,7 @@ bool tipc_msg_reverse(struct net *net, struct sk_buff *buf, u32 *dnode,
 	msg_set_errcode(msg, err);
 	msg_set_origport(msg, msg_destport(&ohdr));
 	msg_set_destport(msg, msg_origport(&ohdr));
-	msg_set_prevnode(msg, tn->own_addr);
+	msg_set_prevnode(msg, own_addr);
 	if (!msg_short(msg)) {
 		msg_set_orignode(msg, msg_destnode(&ohdr));
 		msg_set_destnode(msg, msg_orignode(&ohdr));
@@ -414,43 +445,43 @@ bool tipc_msg_reverse(struct net *net, struct sk_buff *buf, u32 *dnode,
 	return true;
 exit:
 	kfree_skb(buf);
+	*dnode = 0;
 	return false;
 }
 
 /**
- * tipc_msg_eval: determine fate of message that found no destination
- * @buf: the buffer containing the message.
- * @dnode: return value: next-hop node, if message to be forwarded
- * @err: error code to use, if message to be rejected
+ * tipc_msg_lookup_dest(): try to find new destination for named message
+ * @skb: the buffer containing the message.
+ * @dnode: return value: next-hop node, if destination found
+ * @err: return value: error code to use, if message to be rejected
  *
  * Does not consume buffer
- * Returns 0 (TIPC_OK) if message ok and we can try again, -TIPC error
- * code if message to be rejected
+ * Returns true if a destination is found, false otherwise
  */
-int tipc_msg_eval(struct net *net, struct sk_buff *buf, u32 *dnode)
+bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb,
+			  u32 *dnode, int *err)
 {
-	struct tipc_msg *msg = buf_msg(buf);
+	struct tipc_msg *msg = buf_msg(skb);
 	u32 dport;
 
-	if (msg_type(msg) != TIPC_NAMED_MSG)
-		return -TIPC_ERR_NO_PORT;
-	if (skb_linearize(buf))
-		return -TIPC_ERR_NO_NAME;
-	if (msg_data_sz(msg) > MAX_FORWARD_SIZE)
-		return -TIPC_ERR_NO_NAME;
+	if (!msg_isdata(msg))
+		return false;
+	if (!msg_named(msg))
+		return false;
+	*err = -TIPC_ERR_NO_NAME;
+	if (skb_linearize(skb))
+		return false;
 	if (msg_reroute_cnt(msg) > 0)
-		return -TIPC_ERR_NO_NAME;
+		return false;
 	*dnode = addr_domain(net, msg_lookup_scope(msg));
 	dport = tipc_nametbl_translate(net, msg_nametype(msg),
-				       msg_nameinst(msg),
-				       dnode);
+				       msg_nameinst(msg), dnode);
 	if (!dport)
-		return -TIPC_ERR_NO_NAME;
+		return false;
 	msg_incr_reroute_cnt(msg);
 	msg_set_destnode(msg, *dnode);
 	msg_set_destport(msg, dport);
-	return TIPC_OK;
+	*err = TIPC_OK;
+	return true;
 }
 
 /* tipc_msg_reassemble() - clone a buffer chain of fragments and
...
...@@ -45,6 +45,7 @@ ...@@ -45,6 +45,7 @@
* Note: Some items are also used with TIPC internal message headers * Note: Some items are also used with TIPC internal message headers
*/ */
#define TIPC_VERSION 2 #define TIPC_VERSION 2
struct plist;
/* /*
* Payload message users are defined in TIPC's public API: * Payload message users are defined in TIPC's public API:
...@@ -748,20 +749,110 @@ static inline u32 msg_tot_origport(struct tipc_msg *m) ...@@ -748,20 +749,110 @@ static inline u32 msg_tot_origport(struct tipc_msg *m)
} }
struct sk_buff *tipc_buf_acquire(u32 size); struct sk_buff *tipc_buf_acquire(u32 size);
bool tipc_msg_reverse(struct net *net, struct sk_buff *buf, u32 *dnode, bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode,
int err); int err);
int tipc_msg_eval(struct net *net, struct sk_buff *buf, u32 *dnode); void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type,
void tipc_msg_init(struct net *net, struct tipc_msg *m, u32 user, u32 type,
u32 hsize, u32 destnode); u32 hsize, u32 destnode);
struct sk_buff *tipc_msg_create(struct net *net, uint user, uint type, struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
uint hdr_sz, uint data_sz, u32 dnode, uint data_sz, u32 dnode, u32 onode,
u32 onode, u32 dport, u32 oport, int errcode); u32 dport, u32 oport, int errcode);
int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf); int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu); bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu);
bool tipc_msg_make_bundle(struct net *net, struct sk_buff_head *list, bool tipc_msg_make_bundle(struct sk_buff_head *list,
struct sk_buff *skb, u32 mtu, u32 dnode); struct sk_buff *skb, u32 mtu, u32 dnode);
int tipc_msg_build(struct net *net, struct tipc_msg *mhdr, struct msghdr *m, bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
int offset, int dsz, int mtu, struct sk_buff_head *list); int offset, int dsz, int mtu, struct sk_buff_head *list);
bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode,
int *err);
struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list);
/* tipc_skb_peek(): peek and reserve first buffer in list
* @list: list to be peeked in
* Returns pointer to first buffer in list, if any
*/
static inline struct sk_buff *tipc_skb_peek(struct sk_buff_head *list,
spinlock_t *lock)
{
struct sk_buff *skb;
spin_lock_bh(lock);
skb = skb_peek(list);
if (skb)
skb_get(skb);
spin_unlock_bh(lock);
return skb;
}
/* tipc_skb_peek_port(): find a destination port, ignoring all destinations
* up to and including 'filter'.
* Note: ignoring previously tried destinations minimizes the risk of
* contention on the socket lock
* @list: list to be peeked in
* @filter: last destination to be ignored from search
 * Returns a destination port number, if applicable.
*/
static inline u32 tipc_skb_peek_port(struct sk_buff_head *list, u32 filter)
{
struct sk_buff *skb;
u32 dport = 0;
bool ignore = true;
spin_lock_bh(&list->lock);
skb_queue_walk(list, skb) {
dport = msg_destport(buf_msg(skb));
if (!filter || skb_queue_is_last(list, skb))
break;
if (dport == filter)
ignore = false;
else if (!ignore)
break;
}
spin_unlock_bh(&list->lock);
return dport;
}
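
The filter semantics of this helper are easy to misread: the walk skips every destination up to and including the last one already tried ('filter') and only then reports the next, different port; if nothing new follows the filter it falls back to the last entry. A small user-space model over a plain array of port numbers, assuming the same rules (model_peek_port() is invented for illustration):

#include <stdbool.h>
#include <stdio.h>

static unsigned int model_peek_port(const unsigned int *ports, int n,
				    unsigned int filter)
{
	unsigned int dport = 0;
	bool ignore = true;
	int i;

	for (i = 0; i < n; i++) {
		dport = ports[i];
		if (!filter || i == n - 1)
			break;
		if (dport == filter)
			ignore = false;    /* passed the already-tried port */
		else if (!ignore)
			break;             /* first new port after the filter */
	}
	return dport;
}

int main(void)
{
	const unsigned int ports[] = { 5, 5, 7, 7, 9 };

	printf("%u\n", model_peek_port(ports, 5, 0));  /* 5: no filter, first port */
	printf("%u\n", model_peek_port(ports, 5, 5));  /* 7: first port after the 5s */
	printf("%u\n", model_peek_port(ports, 5, 9));  /* 9: nothing new, last entry */
	return 0;
}
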
/* tipc_skb_dequeue(): unlink first buffer with dest 'dport' from list
* @list: list to be unlinked from
* @dport: selection criteria for buffer to unlink
*/
static inline struct sk_buff *tipc_skb_dequeue(struct sk_buff_head *list,
u32 dport)
{
struct sk_buff *_skb, *tmp, *skb = NULL;
spin_lock_bh(&list->lock);
skb_queue_walk_safe(list, _skb, tmp) {
if (msg_destport(buf_msg(_skb)) == dport) {
__skb_unlink(_skb, list);
skb = _skb;
break;
}
}
spin_unlock_bh(&list->lock);
return skb;
}
/* tipc_skb_queue_tail(): add buffer to tail of list;
* @list: list to be appended to
* @skb: buffer to append. Always appended
* @dport: the destination port of the buffer
* returns true if dport differs from previous destination
*/
static inline bool tipc_skb_queue_tail(struct sk_buff_head *list,
struct sk_buff *skb, u32 dport)
{
struct sk_buff *_skb = NULL;
bool rv = false;
spin_lock_bh(&list->lock);
_skb = skb_peek_tail(list);
if (!_skb || (msg_destport(buf_msg(_skb)) != dport) ||
(skb_queue_len(list) > 32))
rv = true;
__skb_queue_tail(list, skb);
spin_unlock_bh(&list->lock);
return rv;
}
#endif #endif
...@@ -71,13 +71,14 @@ static void publ_to_item(struct distr_item *i, struct publication *p) ...@@ -71,13 +71,14 @@ static void publ_to_item(struct distr_item *i, struct publication *p)
static struct sk_buff *named_prepare_buf(struct net *net, u32 type, u32 size, static struct sk_buff *named_prepare_buf(struct net *net, u32 type, u32 size,
u32 dest) u32 dest)
{ {
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE + size); struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE + size);
struct tipc_msg *msg; struct tipc_msg *msg;
if (buf != NULL) { if (buf != NULL) {
msg = buf_msg(buf); msg = buf_msg(buf);
tipc_msg_init(net, msg, NAME_DISTRIBUTOR, type, INT_H_SIZE, tipc_msg_init(tn->own_addr, msg, NAME_DISTRIBUTOR, type,
dest); INT_H_SIZE, dest);
msg_set_size(msg, INT_H_SIZE + size); msg_set_size(msg, INT_H_SIZE + size);
} }
return buf; return buf;
...@@ -380,25 +381,34 @@ void tipc_named_process_backlog(struct net *net) ...@@ -380,25 +381,34 @@ void tipc_named_process_backlog(struct net *net)
} }
/** /**
* tipc_named_rcv - process name table update message sent by another node * tipc_named_rcv - process name table update messages sent by another node
*/ */
void tipc_named_rcv(struct net *net, struct sk_buff *buf) void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
{ {
struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_msg *msg = buf_msg(buf); struct tipc_msg *msg;
struct distr_item *item = (struct distr_item *)msg_data(msg); struct distr_item *item;
u32 count = msg_data_sz(msg) / ITEM_SIZE; uint count;
u32 node = msg_orignode(msg); u32 node;
struct sk_buff *skb;
int mtype;
spin_lock_bh(&tn->nametbl_lock); spin_lock_bh(&tn->nametbl_lock);
while (count--) { for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) {
if (!tipc_update_nametbl(net, item, node, msg_type(msg))) msg = buf_msg(skb);
tipc_named_add_backlog(item, msg_type(msg), node); mtype = msg_type(msg);
item++; item = (struct distr_item *)msg_data(msg);
count = msg_data_sz(msg) / ITEM_SIZE;
node = msg_orignode(msg);
while (count--) {
if (!tipc_update_nametbl(net, item, node, mtype))
tipc_named_add_backlog(item, mtype, node);
item++;
}
kfree_skb(skb);
tipc_named_process_backlog(net);
} }
tipc_named_process_backlog(net);
spin_unlock_bh(&tn->nametbl_lock); spin_unlock_bh(&tn->nametbl_lock);
kfree_skb(buf);
} }
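
tipc_named_rcv() now drains a whole queue of name-distributor messages in one pass: the nametbl lock is taken once per call, every queued message is decoded into its distribution items and applied (or backlogged), and the backlog is retried after each message. The sketch below models that batch-under-one-lock shape in user space; the pthread mutex stands in for nametbl_lock and the integer arrays stand in for distribution items, all of it hypothetical.

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t nametbl_lock = PTHREAD_MUTEX_INITIALIZER;

struct model_batch {
	const int *items;
	int count;
};

static void apply_item(int item) { printf("publish item %d\n", item); }

/* Drain all queued update batches while holding the table lock once,
 * instead of locking and unlocking per message.
 */
static void model_named_rcv(const struct model_batch *batches, int nbatch)
{
	int b, i;

	pthread_mutex_lock(&nametbl_lock);
	for (b = 0; b < nbatch; b++)
		for (i = 0; i < batches[b].count; i++)
			apply_item(batches[b].items[i]);
	pthread_mutex_unlock(&nametbl_lock);
}

int main(void)
{
	const int a[] = { 1, 2 }, c[] = { 3 };
	const struct model_batch batches[] = { { a, 2 }, { c, 1 } };

	model_named_rcv(batches, 2);
	return 0;
}
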
/** /**
......
...@@ -71,7 +71,7 @@ struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ); ...@@ -71,7 +71,7 @@ struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ);
struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ); struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ);
void named_cluster_distribute(struct net *net, struct sk_buff *buf); void named_cluster_distribute(struct net *net, struct sk_buff *buf);
void tipc_named_node_up(struct net *net, u32 dnode); void tipc_named_node_up(struct net *net, u32 dnode);
void tipc_named_rcv(struct net *net, struct sk_buff *buf); void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue);
void tipc_named_reinit(struct net *net); void tipc_named_reinit(struct net *net);
void tipc_named_process_backlog(struct net *net); void tipc_named_process_backlog(struct net *net);
void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr); void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr);
......
/* /*
* net/tipc/name_table.c: TIPC name table code * net/tipc/name_table.c: TIPC name table code
* *
* Copyright (c) 2000-2006, 2014, Ericsson AB * Copyright (c) 2000-2006, 2014-2015, Ericsson AB
* Copyright (c) 2004-2008, 2010-2014, Wind River Systems * Copyright (c) 2004-2008, 2010-2014, Wind River Systems
* All rights reserved. * All rights reserved.
* *
...@@ -618,7 +618,7 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, ...@@ -618,7 +618,7 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance,
* Returns non-zero if any off-node ports overlap * Returns non-zero if any off-node ports overlap
*/ */
int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
u32 limit, struct tipc_port_list *dports) u32 limit, struct tipc_plist *dports)
{ {
struct name_seq *seq; struct name_seq *seq;
struct sub_seq *sseq; struct sub_seq *sseq;
...@@ -643,7 +643,7 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, ...@@ -643,7 +643,7 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
info = sseq->info; info = sseq->info;
list_for_each_entry(publ, &info->node_list, node_list) { list_for_each_entry(publ, &info->node_list, node_list) {
if (publ->scope <= limit) if (publ->scope <= limit)
tipc_port_list_add(dports, publ->ref); tipc_plist_push(dports, publ->ref);
} }
if (info->cluster_list_size != info->node_list_size) if (info->cluster_list_size != info->node_list_size)
...@@ -1212,3 +1212,41 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb) ...@@ -1212,3 +1212,41 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
return skb->len; return skb->len;
} }
void tipc_plist_push(struct tipc_plist *pl, u32 port)
{
struct tipc_plist *nl;
if (likely(!pl->port)) {
pl->port = port;
return;
}
if (pl->port == port)
return;
list_for_each_entry(nl, &pl->list, list) {
if (nl->port == port)
return;
}
nl = kmalloc(sizeof(*nl), GFP_ATOMIC);
if (nl) {
nl->port = port;
list_add(&nl->list, &pl->list);
}
}
u32 tipc_plist_pop(struct tipc_plist *pl)
{
struct tipc_plist *nl;
u32 port = 0;
if (likely(list_empty(&pl->list))) {
port = pl->port;
pl->port = 0;
return port;
}
nl = list_first_entry(&pl->list, typeof(*nl), list);
port = nl->port;
list_del(&nl->list);
kfree(nl);
return port;
}
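
tipc_plist is a small, duplicate-free port collector: the first port is stored inline in the head, further distinct ports become kmalloc'ed list nodes, and tipc_plist_pop() hands them back one by one, with the inline port returned last. A user-space model with the same push/pop behaviour, using a plain singly linked list instead of the kernel list_head (all names here are invented):

#include <stdio.h>
#include <stdlib.h>

struct model_plist_node {
	unsigned int port;
	struct model_plist_node *next;
};

struct model_plist {
	unsigned int port;               /* first port, stored inline */
	struct model_plist_node *head;   /* any further, distinct ports */
};

static void model_plist_push(struct model_plist *pl, unsigned int port)
{
	struct model_plist_node *n;

	if (!pl->port) {                  /* empty: use the inline slot */
		pl->port = port;
		return;
	}
	if (pl->port == port)             /* duplicates are ignored */
		return;
	for (n = pl->head; n; n = n->next)
		if (n->port == port)
			return;
	n = malloc(sizeof(*n));
	if (n) {
		n->port = port;
		n->next = pl->head;
		pl->head = n;
	}
}

static unsigned int model_plist_pop(struct model_plist *pl)
{
	struct model_plist_node *n = pl->head;
	unsigned int port;

	if (!n) {                         /* only the inline slot left */
		port = pl->port;
		pl->port = 0;
		return port;
	}
	port = n->port;
	pl->head = n->next;
	free(n);
	return port;
}

int main(void)
{
	struct model_plist pl = { 0, NULL };
	unsigned int p;

	model_plist_push(&pl, 42);
	model_plist_push(&pl, 77);
	model_plist_push(&pl, 42);        /* duplicate, ignored */
	while ((p = model_plist_pop(&pl)))
		printf("port %u\n", p);
	return 0;
}
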
/* /*
* net/tipc/name_table.h: Include file for TIPC name table code * net/tipc/name_table.h: Include file for TIPC name table code
* *
* Copyright (c) 2000-2006, 2014, Ericsson AB * Copyright (c) 2000-2006, 2014-2015, Ericsson AB
* Copyright (c) 2004-2005, 2010-2011, Wind River Systems * Copyright (c) 2004-2005, 2010-2011, Wind River Systems
* All rights reserved. * All rights reserved.
* *
...@@ -38,7 +38,7 @@ ...@@ -38,7 +38,7 @@
#define _TIPC_NAME_TABLE_H #define _TIPC_NAME_TABLE_H
struct tipc_subscription; struct tipc_subscription;
struct tipc_port_list; struct tipc_plist;
/* /*
* TIPC name types reserved for internal TIPC use (both current and planned) * TIPC name types reserved for internal TIPC use (both current and planned)
...@@ -101,7 +101,7 @@ struct sk_buff *tipc_nametbl_get(struct net *net, const void *req_tlv_area, ...@@ -101,7 +101,7 @@ struct sk_buff *tipc_nametbl_get(struct net *net, const void *req_tlv_area,
int req_tlv_space); int req_tlv_space);
u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node); u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node);
int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
u32 limit, struct tipc_port_list *dports); u32 limit, struct tipc_plist *dports);
struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower, struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
u32 upper, u32 scope, u32 port_ref, u32 upper, u32 scope, u32 port_ref,
u32 key); u32 key);
...@@ -118,4 +118,18 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s); ...@@ -118,4 +118,18 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s);
int tipc_nametbl_init(struct net *net); int tipc_nametbl_init(struct net *net);
void tipc_nametbl_stop(struct net *net); void tipc_nametbl_stop(struct net *net);
struct tipc_plist {
struct list_head list;
u32 port;
};
static inline void tipc_plist_init(struct tipc_plist *pl)
{
INIT_LIST_HEAD(&pl->list);
pl->port = 0;
}
void tipc_plist_push(struct tipc_plist *pl, u32 port);
u32 tipc_plist_pop(struct tipc_plist *pl);
#endif #endif
...@@ -111,11 +111,8 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr) ...@@ -111,11 +111,8 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
INIT_LIST_HEAD(&n_ptr->list); INIT_LIST_HEAD(&n_ptr->list);
INIT_LIST_HEAD(&n_ptr->publ_list); INIT_LIST_HEAD(&n_ptr->publ_list);
INIT_LIST_HEAD(&n_ptr->conn_sks); INIT_LIST_HEAD(&n_ptr->conn_sks);
skb_queue_head_init(&n_ptr->waiting_sks);
__skb_queue_head_init(&n_ptr->bclink.deferred_queue); __skb_queue_head_init(&n_ptr->bclink.deferred_queue);
hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]); hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
list_for_each_entry_rcu(temp_node, &tn->node_list, list) { list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
if (n_ptr->addr < temp_node->addr) if (n_ptr->addr < temp_node->addr)
break; break;
...@@ -197,25 +194,6 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port) ...@@ -197,25 +194,6 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
tipc_node_unlock(node); tipc_node_unlock(node);
} }
void tipc_node_abort_sock_conns(struct net *net, struct list_head *conns)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_sock_conn *conn, *safe;
struct sk_buff *buf;
list_for_each_entry_safe(conn, safe, conns, list) {
buf = tipc_msg_create(net, TIPC_CRITICAL_IMPORTANCE,
TIPC_CONN_MSG, SHORT_H_SIZE, 0,
tn->own_addr, conn->peer_node,
conn->port, conn->peer_port,
TIPC_ERR_NO_NODE);
if (likely(buf))
tipc_sk_rcv(net, buf);
list_del(&conn->list);
kfree(conn);
}
}
/** /**
* tipc_node_link_up - handle addition of link * tipc_node_link_up - handle addition of link
* *
...@@ -377,7 +355,11 @@ static void node_established_contact(struct tipc_node *n_ptr) ...@@ -377,7 +355,11 @@ static void node_established_contact(struct tipc_node *n_ptr)
static void node_lost_contact(struct tipc_node *n_ptr) static void node_lost_contact(struct tipc_node *n_ptr)
{ {
char addr_string[16]; char addr_string[16];
u32 i; struct tipc_sock_conn *conn, *safe;
struct list_head *conns = &n_ptr->conn_sks;
struct sk_buff *skb;
struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
uint i;
pr_debug("Lost contact with %s\n", pr_debug("Lost contact with %s\n",
tipc_addr_string_fill(addr_string, n_ptr->addr)); tipc_addr_string_fill(addr_string, n_ptr->addr));
...@@ -413,11 +395,25 @@ static void node_lost_contact(struct tipc_node *n_ptr) ...@@ -413,11 +395,25 @@ static void node_lost_contact(struct tipc_node *n_ptr)
n_ptr->action_flags &= ~TIPC_WAIT_OWN_LINKS_DOWN; n_ptr->action_flags &= ~TIPC_WAIT_OWN_LINKS_DOWN;
/* Notify subscribers and prevent re-contact with node until /* Prevent re-contact with node until cleanup is done */
* cleanup is done. n_ptr->action_flags |= TIPC_WAIT_PEER_LINKS_DOWN;
*/
n_ptr->action_flags |= TIPC_WAIT_PEER_LINKS_DOWN | /* Notify publications from this node */
TIPC_NOTIFY_NODE_DOWN; n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN;
/* Notify sockets connected to node */
list_for_each_entry_safe(conn, safe, conns, list) {
skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
SHORT_H_SIZE, 0, tn->own_addr,
conn->peer_node, conn->port,
conn->peer_port, TIPC_ERR_NO_NODE);
if (likely(skb)) {
skb_queue_tail(n_ptr->inputq, skb);
n_ptr->action_flags |= TIPC_MSG_EVT;
}
list_del(&conn->list);
kfree(conn);
}
} }
struct sk_buff *tipc_node_get_nodes(struct net *net, const void *req_tlv_area, struct sk_buff *tipc_node_get_nodes(struct net *net, const void *req_tlv_area,
...@@ -566,44 +562,36 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr, ...@@ -566,44 +562,36 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
void tipc_node_unlock(struct tipc_node *node) void tipc_node_unlock(struct tipc_node *node)
{ {
struct net *net = node->net; struct net *net = node->net;
LIST_HEAD(nsub_list);
LIST_HEAD(conn_sks);
struct sk_buff_head waiting_sks;
u32 addr = 0; u32 addr = 0;
int flags = node->action_flags; u32 flags = node->action_flags;
u32 link_id = 0; u32 link_id = 0;
struct list_head *publ_list;
struct sk_buff_head *inputq = node->inputq;
struct sk_buff_head *namedq;
if (likely(!flags)) { if (likely(!flags || (flags == TIPC_MSG_EVT))) {
node->action_flags = 0;
spin_unlock_bh(&node->lock); spin_unlock_bh(&node->lock);
if (flags == TIPC_MSG_EVT)
tipc_sk_rcv(net, inputq);
return; return;
} }
addr = node->addr; addr = node->addr;
link_id = node->link_id; link_id = node->link_id;
__skb_queue_head_init(&waiting_sks); namedq = node->namedq;
publ_list = &node->publ_list;
if (flags & TIPC_WAKEUP_USERS) node->action_flags &= ~(TIPC_MSG_EVT |
skb_queue_splice_init(&node->waiting_sks, &waiting_sks); TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP |
if (flags & TIPC_NOTIFY_NODE_DOWN) { TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT |
list_replace_init(&node->publ_list, &nsub_list); TIPC_NAMED_MSG_EVT);
list_replace_init(&node->conn_sks, &conn_sks);
}
node->action_flags &= ~(TIPC_WAKEUP_USERS | TIPC_NOTIFY_NODE_DOWN |
TIPC_NOTIFY_NODE_UP | TIPC_NOTIFY_LINK_UP |
TIPC_NOTIFY_LINK_DOWN |
TIPC_WAKEUP_BCAST_USERS);
spin_unlock_bh(&node->lock); spin_unlock_bh(&node->lock);
while (!skb_queue_empty(&waiting_sks)) if (flags & TIPC_NOTIFY_NODE_DOWN)
tipc_sk_rcv(net, __skb_dequeue(&waiting_sks)); tipc_publ_notify(net, publ_list, addr);
if (!list_empty(&conn_sks))
tipc_node_abort_sock_conns(net, &conn_sks);
if (!list_empty(&nsub_list))
tipc_publ_notify(net, &nsub_list, addr);
if (flags & TIPC_WAKEUP_BCAST_USERS) if (flags & TIPC_WAKEUP_BCAST_USERS)
tipc_bclink_wakeup_users(net); tipc_bclink_wakeup_users(net);
...@@ -618,6 +606,15 @@ void tipc_node_unlock(struct tipc_node *node) ...@@ -618,6 +606,15 @@ void tipc_node_unlock(struct tipc_node *node)
if (flags & TIPC_NOTIFY_LINK_DOWN) if (flags & TIPC_NOTIFY_LINK_DOWN)
tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr, tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
link_id, addr); link_id, addr);
if (flags & TIPC_MSG_EVT)
tipc_sk_rcv(net, inputq);
if (flags & TIPC_NAMED_MSG_EVT)
tipc_named_rcv(net, namedq);
if (flags & TIPC_BCAST_MSG_EVT)
tipc_bclink_input(net);
} }
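
The reworked tipc_node_unlock() snapshots the action flags, clears them while still holding the node lock, and only then, after unlocking, performs the deferred work (socket delivery, name-table input, broadcast input). A minimal user-space model of that pattern, using a pthread mutex and plain function calls in place of the TIPC-specific actions; the EVT_* names and handlers are hypothetical.

#include <pthread.h>
#include <stdio.h>

enum { EVT_MSG = 1 << 0, EVT_NAMED = 1 << 1, EVT_BCAST = 1 << 2 };

struct model_node {
	pthread_mutex_t lock;
	int action_flags;
};

static void deliver_msgs(void)  { puts("deliver socket messages"); }
static void deliver_named(void) { puts("deliver name table updates"); }
static void deliver_bcast(void) { puts("deliver broadcast messages"); }

/* Snapshot and clear the flags under the lock; run the handlers after
 * unlocking, so none of them can block or deadlock on the node lock.
 */
static void model_node_unlock(struct model_node *n)
{
	int flags = n->action_flags;

	n->action_flags = 0;
	pthread_mutex_unlock(&n->lock);

	if (flags & EVT_MSG)
		deliver_msgs();
	if (flags & EVT_NAMED)
		deliver_named();
	if (flags & EVT_BCAST)
		deliver_bcast();
}

int main(void)
{
	struct model_node n = { PTHREAD_MUTEX_INITIALIZER, 0 };

	pthread_mutex_lock(&n.lock);
	n.action_flags |= EVT_MSG | EVT_BCAST;
	model_node_unlock(&n);
	return 0;
}
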
/* Caller should hold node lock for the passed node */ /* Caller should hold node lock for the passed node */
......
/* /*
* net/tipc/node.h: Include file for TIPC node management routines * net/tipc/node.h: Include file for TIPC node management routines
* *
* Copyright (c) 2000-2006, 2014, Ericsson AB * Copyright (c) 2000-2006, 2014-2015, Ericsson AB
* Copyright (c) 2005, 2010-2014, Wind River Systems * Copyright (c) 2005, 2010-2014, Wind River Systems
* All rights reserved. * All rights reserved.
* *
...@@ -55,14 +55,16 @@ ...@@ -55,14 +55,16 @@
* TIPC_DISTRIBUTE_NAME: publish or withdraw link state name type * TIPC_DISTRIBUTE_NAME: publish or withdraw link state name type
*/ */
enum { enum {
TIPC_MSG_EVT = 1,
TIPC_WAIT_PEER_LINKS_DOWN = (1 << 1), TIPC_WAIT_PEER_LINKS_DOWN = (1 << 1),
TIPC_WAIT_OWN_LINKS_DOWN = (1 << 2), TIPC_WAIT_OWN_LINKS_DOWN = (1 << 2),
TIPC_NOTIFY_NODE_DOWN = (1 << 3), TIPC_NOTIFY_NODE_DOWN = (1 << 3),
TIPC_NOTIFY_NODE_UP = (1 << 4), TIPC_NOTIFY_NODE_UP = (1 << 4),
TIPC_WAKEUP_USERS = (1 << 5), TIPC_WAKEUP_BCAST_USERS = (1 << 5),
TIPC_WAKEUP_BCAST_USERS = (1 << 6), TIPC_NOTIFY_LINK_UP = (1 << 6),
TIPC_NOTIFY_LINK_UP = (1 << 7), TIPC_NOTIFY_LINK_DOWN = (1 << 7),
TIPC_NOTIFY_LINK_DOWN = (1 << 8) TIPC_NAMED_MSG_EVT = (1 << 8),
TIPC_BCAST_MSG_EVT = (1 << 9)
}; };
/** /**
...@@ -73,6 +75,7 @@ enum { ...@@ -73,6 +75,7 @@ enum {
* @oos_state: state tracker for handling OOS b'cast messages * @oos_state: state tracker for handling OOS b'cast messages
* @deferred_queue: deferred queue saved OOS b'cast message received from node * @deferred_queue: deferred queue saved OOS b'cast message received from node
* @reasm_buf: broadcast reassembly queue head from node * @reasm_buf: broadcast reassembly queue head from node
 * @inputq_map: bitmap indicating which input queues should be kicked
* @recv_permitted: true if node is allowed to receive b'cast messages * @recv_permitted: true if node is allowed to receive b'cast messages
*/ */
struct tipc_node_bclink { struct tipc_node_bclink {
...@@ -83,6 +86,7 @@ struct tipc_node_bclink { ...@@ -83,6 +86,7 @@ struct tipc_node_bclink {
u32 deferred_size; u32 deferred_size;
struct sk_buff_head deferred_queue; struct sk_buff_head deferred_queue;
struct sk_buff *reasm_buf; struct sk_buff *reasm_buf;
int inputq_map;
bool recv_permitted; bool recv_permitted;
}; };
...@@ -92,6 +96,9 @@ struct tipc_node_bclink { ...@@ -92,6 +96,9 @@ struct tipc_node_bclink {
* @lock: spinlock governing access to structure * @lock: spinlock governing access to structure
* @net: the applicable net namespace * @net: the applicable net namespace
* @hash: links to adjacent nodes in unsorted hash chain * @hash: links to adjacent nodes in unsorted hash chain
* @inputq: pointer to input queue containing messages for msg event
* @namedq: pointer to name table input queue with name table messages
* @curr_link: the link holding the node lock, if any
* @active_links: pointers to active links to node * @active_links: pointers to active links to node
* @links: pointers to all links to node * @links: pointers to all links to node
* @action_flags: bit mask of different types of node actions * @action_flags: bit mask of different types of node actions
...@@ -109,10 +116,12 @@ struct tipc_node { ...@@ -109,10 +116,12 @@ struct tipc_node {
spinlock_t lock; spinlock_t lock;
struct net *net; struct net *net;
struct hlist_node hash; struct hlist_node hash;
struct sk_buff_head *inputq;
struct sk_buff_head *namedq;
struct tipc_link *active_links[2]; struct tipc_link *active_links[2];
u32 act_mtus[2]; u32 act_mtus[2];
struct tipc_link *links[MAX_BEARERS]; struct tipc_link *links[MAX_BEARERS];
unsigned int action_flags; int action_flags;
struct tipc_node_bclink bclink; struct tipc_node_bclink bclink;
struct list_head list; struct list_head list;
int link_cnt; int link_cnt;
...@@ -120,7 +129,6 @@ struct tipc_node { ...@@ -120,7 +129,6 @@ struct tipc_node {
u32 signature; u32 signature;
u32 link_id; u32 link_id;
struct list_head publ_list; struct list_head publ_list;
struct sk_buff_head waiting_sks;
struct list_head conn_sks; struct list_head conn_sks;
struct rcu_head rcu; struct rcu_head rcu;
}; };
......
/* /*
* net/tipc/socket.c: TIPC socket API * net/tipc/socket.c: TIPC socket API
* *
* Copyright (c) 2001-2007, 2012-2014, Ericsson AB * Copyright (c) 2001-2007, 2012-2015, Ericsson AB
* Copyright (c) 2004-2008, 2010-2013, Wind River Systems * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
* All rights reserved. * All rights reserved.
* *
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
#include "node.h" #include "node.h"
#include "link.h" #include "link.h"
#include "config.h" #include "config.h"
#include "name_distr.h"
#include "socket.h" #include "socket.h"
#define SS_LISTENING -1 /* socket is listening */ #define SS_LISTENING -1 /* socket is listening */
...@@ -69,8 +70,6 @@ ...@@ -69,8 +70,6 @@
* @pub_count: total # of publications port has made during its lifetime * @pub_count: total # of publications port has made during its lifetime
* @probing_state: * @probing_state:
* @probing_intv: * @probing_intv:
* @port: port - interacts with 'sk' and with the rest of the TIPC stack
* @peer_name: the peer of the connection, if any
* @conn_timeout: the time we can wait for an unresponded setup request * @conn_timeout: the time we can wait for an unresponded setup request
* @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
* @link_cong: non-zero if owner must sleep because of link congestion * @link_cong: non-zero if owner must sleep because of link congestion
...@@ -177,6 +176,11 @@ static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = { ...@@ -177,6 +176,11 @@ static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
* - port reference * - port reference
*/ */
static u32 tsk_own_node(struct tipc_sock *tsk)
{
return msg_prevnode(&tsk->phdr);
}
static u32 tsk_peer_node(struct tipc_sock *tsk) static u32 tsk_peer_node(struct tipc_sock *tsk)
{ {
return msg_destnode(&tsk->phdr); return msg_destnode(&tsk->phdr);
...@@ -249,11 +253,11 @@ static void tsk_rej_rx_queue(struct sock *sk) ...@@ -249,11 +253,11 @@ static void tsk_rej_rx_queue(struct sock *sk)
{ {
struct sk_buff *skb; struct sk_buff *skb;
u32 dnode; u32 dnode;
struct net *net = sock_net(sk); u32 own_node = tsk_own_node(tipc_sk(sk));
while ((skb = __skb_dequeue(&sk->sk_receive_queue))) { while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
if (tipc_msg_reverse(net, skb, &dnode, TIPC_ERR_NO_PORT)) if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT))
tipc_link_xmit_skb(net, skb, dnode, 0); tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0);
} }
} }
...@@ -305,6 +309,7 @@ static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg) ...@@ -305,6 +309,7 @@ static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
static int tipc_sk_create(struct net *net, struct socket *sock, static int tipc_sk_create(struct net *net, struct socket *sock,
int protocol, int kern) int protocol, int kern)
{ {
struct tipc_net *tn;
const struct proto_ops *ops; const struct proto_ops *ops;
socket_state state; socket_state state;
struct sock *sk; struct sock *sk;
...@@ -346,7 +351,8 @@ static int tipc_sk_create(struct net *net, struct socket *sock, ...@@ -346,7 +351,8 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
tsk->max_pkt = MAX_PKT_DEFAULT; tsk->max_pkt = MAX_PKT_DEFAULT;
INIT_LIST_HEAD(&tsk->publications); INIT_LIST_HEAD(&tsk->publications);
msg = &tsk->phdr; msg = &tsk->phdr;
tipc_msg_init(net, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, tn = net_generic(sock_net(sk), tipc_net_id);
tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
NAMED_H_SIZE, 0); NAMED_H_SIZE, 0);
/* Finish initializing socket data structures */ /* Finish initializing socket data structures */
...@@ -471,7 +477,6 @@ static int tipc_release(struct socket *sock) ...@@ -471,7 +477,6 @@ static int tipc_release(struct socket *sock)
{ {
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
struct net *net; struct net *net;
struct tipc_net *tn;
struct tipc_sock *tsk; struct tipc_sock *tsk;
struct sk_buff *skb; struct sk_buff *skb;
u32 dnode, probing_state; u32 dnode, probing_state;
...@@ -484,8 +489,6 @@ static int tipc_release(struct socket *sock) ...@@ -484,8 +489,6 @@ static int tipc_release(struct socket *sock)
return 0; return 0;
net = sock_net(sk); net = sock_net(sk);
tn = net_generic(net, tipc_net_id);
tsk = tipc_sk(sk); tsk = tipc_sk(sk);
lock_sock(sk); lock_sock(sk);
...@@ -507,7 +510,7 @@ static int tipc_release(struct socket *sock) ...@@ -507,7 +510,7 @@ static int tipc_release(struct socket *sock)
tsk->connected = 0; tsk->connected = 0;
tipc_node_remove_conn(net, dnode, tsk->portid); tipc_node_remove_conn(net, dnode, tsk->portid);
} }
if (tipc_msg_reverse(net, skb, &dnode, if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
TIPC_ERR_NO_PORT)) TIPC_ERR_NO_PORT))
tipc_link_xmit_skb(net, skb, dnode, 0); tipc_link_xmit_skb(net, skb, dnode, 0);
} }
...@@ -520,9 +523,9 @@ static int tipc_release(struct socket *sock) ...@@ -520,9 +523,9 @@ static int tipc_release(struct socket *sock)
sock_put(sk); sock_put(sk);
tipc_sk_remove(tsk); tipc_sk_remove(tsk);
if (tsk->connected) { if (tsk->connected) {
skb = tipc_msg_create(net, TIPC_CRITICAL_IMPORTANCE, skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode, TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
tn->own_addr, tsk_peer_port(tsk), tsk_own_node(tsk), tsk_peer_port(tsk),
tsk->portid, TIPC_ERR_NO_PORT); tsk->portid, TIPC_ERR_NO_PORT);
if (skb) if (skb)
tipc_link_xmit_skb(net, skb, dnode, tsk->portid); tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
...@@ -730,9 +733,10 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, ...@@ -730,9 +733,10 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
struct msghdr *msg, size_t dsz, long timeo) struct msghdr *msg, size_t dsz, long timeo)
{ {
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
struct net *net = sock_net(sk); struct net *net = sock_net(sk);
struct tipc_msg *mhdr = &tipc_sk(sk)->phdr; struct tipc_msg *mhdr = &tsk->phdr;
struct sk_buff_head head; struct sk_buff_head *pktchain = &sk->sk_write_queue;
struct iov_iter save = msg->msg_iter; struct iov_iter save = msg->msg_iter;
uint mtu; uint mtu;
int rc; int rc;
...@@ -748,13 +752,12 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, ...@@ -748,13 +752,12 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
new_mtu: new_mtu:
mtu = tipc_bclink_get_mtu(); mtu = tipc_bclink_get_mtu();
__skb_queue_head_init(&head); rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, pktchain);
rc = tipc_msg_build(net, mhdr, msg, 0, dsz, mtu, &head);
if (unlikely(rc < 0)) if (unlikely(rc < 0))
return rc; return rc;
do { do {
rc = tipc_bclink_xmit(net, &head); rc = tipc_bclink_xmit(net, pktchain);
if (likely(rc >= 0)) { if (likely(rc >= 0)) {
rc = dsz; rc = dsz;
break; break;
...@@ -768,62 +771,78 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, ...@@ -768,62 +771,78 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
tipc_sk(sk)->link_cong = 1; tipc_sk(sk)->link_cong = 1;
rc = tipc_wait_for_sndmsg(sock, &timeo); rc = tipc_wait_for_sndmsg(sock, &timeo);
if (rc) if (rc)
__skb_queue_purge(&head); __skb_queue_purge(pktchain);
} while (!rc); } while (!rc);
return rc; return rc;
} }
/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets /**
* tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
* @arrvq: queue with arriving messages, to be cloned after destination lookup
* @inputq: queue with cloned messages, delivered to socket after dest lookup
*
* Multi-threaded: parallel calls with reference to same queues may occur
*/ */
void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf) void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
struct sk_buff_head *inputq)
{ {
struct tipc_msg *msg = buf_msg(buf); struct tipc_msg *msg;
struct tipc_port_list dports = {0, NULL, }; struct tipc_plist dports;
struct tipc_port_list *item; u32 portid;
struct sk_buff *b;
uint i, last, dst = 0;
u32 scope = TIPC_CLUSTER_SCOPE; u32 scope = TIPC_CLUSTER_SCOPE;
struct sk_buff_head tmpq;
if (in_own_node(net, msg_orignode(msg))) uint hsz;
scope = TIPC_NODE_SCOPE; struct sk_buff *skb, *_skb;
/* Create destination port list: */ __skb_queue_head_init(&tmpq);
tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg), tipc_plist_init(&dports);
msg_nameupper(msg), scope, &dports);
last = dports.count; skb = tipc_skb_peek(arrvq, &inputq->lock);
if (!last) { for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
kfree_skb(buf); msg = buf_msg(skb);
return; hsz = skb_headroom(skb) + msg_hdr_sz(msg);
}
if (in_own_node(net, msg_orignode(msg)))
for (item = &dports; item; item = item->next) { scope = TIPC_NODE_SCOPE;
for (i = 0; i < PLSIZE && ++dst <= last; i++) {
b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf; /* Create destination port list and message clones: */
if (!b) { tipc_nametbl_mc_translate(net,
pr_warn("Failed do clone mcast rcv buffer\n"); msg_nametype(msg), msg_namelower(msg),
msg_nameupper(msg), scope, &dports);
portid = tipc_plist_pop(&dports);
for (; portid; portid = tipc_plist_pop(&dports)) {
_skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
if (_skb) {
msg_set_destport(buf_msg(_skb), portid);
__skb_queue_tail(&tmpq, _skb);
continue; continue;
} }
msg_set_destport(msg, item->ports[i]); pr_warn("Failed to clone mcast rcv buffer\n");
tipc_sk_rcv(net, b); }
/* Append to inputq if not already done by other thread */
spin_lock_bh(&inputq->lock);
if (skb_peek(arrvq) == skb) {
skb_queue_splice_tail_init(&tmpq, inputq);
kfree_skb(__skb_dequeue(arrvq));
} }
spin_unlock_bh(&inputq->lock);
__skb_queue_purge(&tmpq);
kfree_skb(skb);
} }
tipc_port_list_free(&dports); tipc_sk_rcv(net, inputq);
} }
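
The two-queue scheme is what restores ordering under parallel callers: every thread clones the head-of-arrvq message into a private temporary queue, but the clones are only spliced onto inputq by the thread that still finds that same message at the head of arrvq; everyone else discards its clones and retries with the new head. Below is a stripped-down, single-file user-space model of that "splice only if still at the head" rule, with integers standing in for buffers and one mutex standing in for inputq->lock (all names and queue types are invented):

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

#define QSZ 16

struct model_q {
	int buf[QSZ];
	int head, tail;
};

static pthread_mutex_t inputq_lock = PTHREAD_MUTEX_INITIALIZER;
static struct model_q arrvq, inputq;

static bool q_empty(struct model_q *q) { return q->head == q->tail; }
static void q_push(struct model_q *q, int v) { q->buf[q->tail++] = v; }
static int  q_peek(struct model_q *q) { return q->buf[q->head]; }
static int  q_pop(struct model_q *q)  { return q->buf[q->head++]; }

/* One delivery pass: clone the head of arrvq once per destination, then
 * append the clones to inputq only if the original is still at the head.
 */
static void model_mcast_rcv(const int *dests, int ndst)
{
	for (;;) {
		int msg, i;

		pthread_mutex_lock(&inputq_lock);
		if (q_empty(&arrvq)) {
			pthread_mutex_unlock(&inputq_lock);
			return;
		}
		msg = q_peek(&arrvq);
		pthread_mutex_unlock(&inputq_lock);

		/* "clone" per destination would happen here, outside the lock */

		pthread_mutex_lock(&inputq_lock);
		if (!q_empty(&arrvq) && q_peek(&arrvq) == msg) {
			for (i = 0; i < ndst; i++)
				q_push(&inputq, msg * 100 + dests[i]);
			q_pop(&arrvq);   /* consume the original message */
		}
		/* else: another thread already delivered it; drop our clones */
		pthread_mutex_unlock(&inputq_lock);
	}
}

int main(void)
{
	const int dests[] = { 1, 2, 3 };

	q_push(&arrvq, 7);
	model_mcast_rcv(dests, 3);
	while (!q_empty(&inputq))
		printf("deliver %d\n", q_pop(&inputq));
	return 0;
}
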
/** /**
* tipc_sk_proto_rcv - receive a connection mng protocol message * tipc_sk_proto_rcv - receive a connection mng protocol message
* @tsk: receiving socket * @tsk: receiving socket
* @dnode: node to send response message to, if any * @skb: pointer to message buffer. Set to NULL if buffer is consumed.
* @buf: buffer containing protocol message
* Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if
* (CONN_PROBE_REPLY) message should be forwarded.
*/ */
static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb)
struct sk_buff *buf)
{ {
struct tipc_msg *msg = buf_msg(buf); struct tipc_msg *msg = buf_msg(*skb);
int conn_cong; int conn_cong;
u32 dnode;
u32 own_node = tsk_own_node(tsk);
/* Ignore if connection cannot be validated: */ /* Ignore if connection cannot be validated: */
if (!tsk_peer_msg(tsk, msg)) if (!tsk_peer_msg(tsk, msg))
goto exit; goto exit;
...@@ -836,15 +855,15 @@ static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, ...@@ -836,15 +855,15 @@ static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode,
if (conn_cong) if (conn_cong)
tsk->sk.sk_write_space(&tsk->sk); tsk->sk.sk_write_space(&tsk->sk);
} else if (msg_type(msg) == CONN_PROBE) { } else if (msg_type(msg) == CONN_PROBE) {
if (!tipc_msg_reverse(sock_net(&tsk->sk), buf, dnode, TIPC_OK)) if (tipc_msg_reverse(own_node, *skb, &dnode, TIPC_OK)) {
return TIPC_OK; msg_set_type(msg, CONN_PROBE_REPLY);
msg_set_type(msg, CONN_PROBE_REPLY); return;
return TIPC_FWD_MSG; }
} }
/* Do nothing if msg_type() == CONN_PROBE_REPLY */ /* Do nothing if msg_type() == CONN_PROBE_REPLY */
exit: exit:
kfree_skb(buf); kfree_skb(*skb);
return TIPC_OK; *skb = NULL;
} }
static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
...@@ -895,7 +914,7 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, ...@@ -895,7 +914,7 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
struct net *net = sock_net(sk); struct net *net = sock_net(sk);
struct tipc_msg *mhdr = &tsk->phdr; struct tipc_msg *mhdr = &tsk->phdr;
u32 dnode, dport; u32 dnode, dport;
struct sk_buff_head head; struct sk_buff_head *pktchain = &sk->sk_write_queue;
struct sk_buff *skb; struct sk_buff *skb;
struct tipc_name_seq *seq = &dest->addr.nameseq; struct tipc_name_seq *seq = &dest->addr.nameseq;
struct iov_iter save; struct iov_iter save;
...@@ -970,15 +989,14 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, ...@@ -970,15 +989,14 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
save = m->msg_iter; save = m->msg_iter;
new_mtu: new_mtu:
mtu = tipc_node_get_mtu(net, dnode, tsk->portid); mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
__skb_queue_head_init(&head); rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, pktchain);
rc = tipc_msg_build(net, mhdr, m, 0, dsz, mtu, &head);
if (rc < 0) if (rc < 0)
goto exit; goto exit;
do { do {
skb = skb_peek(&head); skb = skb_peek(pktchain);
TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
rc = tipc_link_xmit(net, &head, dnode, tsk->portid); rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
if (likely(rc >= 0)) { if (likely(rc >= 0)) {
if (sock->state != SS_READY) if (sock->state != SS_READY)
sock->state = SS_CONNECTING; sock->state = SS_CONNECTING;
...@@ -994,7 +1012,7 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, ...@@ -994,7 +1012,7 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
tsk->link_cong = 1; tsk->link_cong = 1;
rc = tipc_wait_for_sndmsg(sock, &timeo); rc = tipc_wait_for_sndmsg(sock, &timeo);
if (rc) if (rc)
__skb_queue_purge(&head); __skb_queue_purge(pktchain);
} while (!rc); } while (!rc);
exit: exit:
if (iocb) if (iocb)
...@@ -1052,7 +1070,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, ...@@ -1052,7 +1070,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
struct net *net = sock_net(sk); struct net *net = sock_net(sk);
struct tipc_sock *tsk = tipc_sk(sk); struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *mhdr = &tsk->phdr; struct tipc_msg *mhdr = &tsk->phdr;
struct sk_buff_head head; struct sk_buff_head *pktchain = &sk->sk_write_queue;
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
u32 portid = tsk->portid; u32 portid = tsk->portid;
int rc = -EINVAL; int rc = -EINVAL;
...@@ -1089,13 +1107,12 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, ...@@ -1089,13 +1107,12 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
save = m->msg_iter; save = m->msg_iter;
mtu = tsk->max_pkt; mtu = tsk->max_pkt;
send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
__skb_queue_head_init(&head); rc = tipc_msg_build(mhdr, m, sent, send, mtu, pktchain);
rc = tipc_msg_build(net, mhdr, m, sent, send, mtu, &head);
if (unlikely(rc < 0)) if (unlikely(rc < 0))
goto exit; goto exit;
do { do {
if (likely(!tsk_conn_cong(tsk))) { if (likely(!tsk_conn_cong(tsk))) {
rc = tipc_link_xmit(net, &head, dnode, portid); rc = tipc_link_xmit(net, pktchain, dnode, portid);
if (likely(!rc)) { if (likely(!rc)) {
tsk->sent_unacked++; tsk->sent_unacked++;
sent += send; sent += send;
...@@ -1115,7 +1132,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, ...@@ -1115,7 +1132,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
} }
rc = tipc_wait_for_sndpkt(sock, &timeo); rc = tipc_wait_for_sndpkt(sock, &timeo);
if (rc) if (rc)
__skb_queue_purge(&head); __skb_queue_purge(pktchain);
} while (!rc); } while (!rc);
exit: exit:
if (iocb) if (iocb)
...@@ -1263,7 +1280,6 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg, ...@@ -1263,7 +1280,6 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
{ {
struct net *net = sock_net(&tsk->sk); struct net *net = sock_net(&tsk->sk);
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct sk_buff *skb = NULL; struct sk_buff *skb = NULL;
struct tipc_msg *msg; struct tipc_msg *msg;
u32 peer_port = tsk_peer_port(tsk); u32 peer_port = tsk_peer_port(tsk);
...@@ -1271,9 +1287,9 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) ...@@ -1271,9 +1287,9 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
if (!tsk->connected) if (!tsk->connected)
return; return;
skb = tipc_msg_create(net, CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
dnode, tn->own_addr, peer_port, tsk->portid, dnode, tsk_own_node(tsk), peer_port,
TIPC_OK); tsk->portid, TIPC_OK);
if (!skb) if (!skb)
return; return;
msg = buf_msg(skb); msg = buf_msg(skb);
...@@ -1564,16 +1580,16 @@ static void tipc_data_ready(struct sock *sk) ...@@ -1564,16 +1580,16 @@ static void tipc_data_ready(struct sock *sk)
/** /**
* filter_connect - Handle all incoming messages for a connection-based socket * filter_connect - Handle all incoming messages for a connection-based socket
* @tsk: TIPC socket * @tsk: TIPC socket
* @msg: message * @skb: pointer to message buffer. Set to NULL if buffer is consumed
* *
* Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
*/ */
static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb)
{ {
struct sock *sk = &tsk->sk; struct sock *sk = &tsk->sk;
struct net *net = sock_net(sk); struct net *net = sock_net(sk);
struct socket *sock = sk->sk_socket; struct socket *sock = sk->sk_socket;
struct tipc_msg *msg = buf_msg(*buf); struct tipc_msg *msg = buf_msg(*skb);
int retval = -TIPC_ERR_NO_PORT; int retval = -TIPC_ERR_NO_PORT;
if (msg_mcast(msg)) if (msg_mcast(msg))
...@@ -1623,8 +1639,8 @@ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) ...@@ -1623,8 +1639,8 @@ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
* connect() routine if sleeping. * connect() routine if sleeping.
*/ */
if (msg_data_sz(msg) == 0) { if (msg_data_sz(msg) == 0) {
kfree_skb(*buf); kfree_skb(*skb);
*buf = NULL; *skb = NULL;
if (waitqueue_active(sk_sleep(sk))) if (waitqueue_active(sk_sleep(sk)))
wake_up_interruptible(sk_sleep(sk)); wake_up_interruptible(sk_sleep(sk));
} }
...@@ -1676,32 +1692,33 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf) ...@@ -1676,32 +1692,33 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
/** /**
* filter_rcv - validate incoming message * filter_rcv - validate incoming message
* @sk: socket * @sk: socket
* @buf: message * @skb: pointer to message. Set to NULL if buffer is consumed.
* *
* Enqueues message on receive queue if acceptable; optionally handles * Enqueues message on receive queue if acceptable; optionally handles
* disconnect indication for a connected socket. * disconnect indication for a connected socket.
* *
* Called with socket lock already taken; port lock may also be taken. * Called with socket lock already taken
* *
* Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected
* to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded
*/ */
static int filter_rcv(struct sock *sk, struct sk_buff *buf) static int filter_rcv(struct sock *sk, struct sk_buff **skb)
{ {
struct socket *sock = sk->sk_socket; struct socket *sock = sk->sk_socket;
struct tipc_sock *tsk = tipc_sk(sk); struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *msg = buf_msg(buf); struct tipc_msg *msg = buf_msg(*skb);
unsigned int limit = rcvbuf_limit(sk, buf); unsigned int limit = rcvbuf_limit(sk, *skb);
u32 onode;
int rc = TIPC_OK; int rc = TIPC_OK;
if (unlikely(msg_user(msg) == CONN_MANAGER)) if (unlikely(msg_user(msg) == CONN_MANAGER)) {
return tipc_sk_proto_rcv(tsk, &onode, buf); tipc_sk_proto_rcv(tsk, skb);
return TIPC_OK;
}
if (unlikely(msg_user(msg) == SOCK_WAKEUP)) { if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
kfree_skb(buf); kfree_skb(*skb);
tsk->link_cong = 0; tsk->link_cong = 0;
sk->sk_write_space(sk); sk->sk_write_space(sk);
*skb = NULL;
return TIPC_OK; return TIPC_OK;
} }
...@@ -1713,21 +1730,22 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf) ...@@ -1713,21 +1730,22 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
if (msg_connected(msg)) if (msg_connected(msg))
return -TIPC_ERR_NO_PORT; return -TIPC_ERR_NO_PORT;
} else { } else {
rc = filter_connect(tsk, &buf); rc = filter_connect(tsk, skb);
if (rc != TIPC_OK || buf == NULL) if (rc != TIPC_OK || !*skb)
return rc; return rc;
} }
/* Reject message if there isn't room to queue it */ /* Reject message if there isn't room to queue it */
if (sk_rmem_alloc_get(sk) + buf->truesize >= limit) if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit)
return -TIPC_ERR_OVERLOAD; return -TIPC_ERR_OVERLOAD;
/* Enqueue message */ /* Enqueue message */
TIPC_SKB_CB(buf)->handle = NULL; TIPC_SKB_CB(*skb)->handle = NULL;
__skb_queue_tail(&sk->sk_receive_queue, buf); __skb_queue_tail(&sk->sk_receive_queue, *skb);
skb_set_owner_r(buf, sk); skb_set_owner_r(*skb, sk);
sk->sk_data_ready(sk); sk->sk_data_ready(sk);
*skb = NULL;
return TIPC_OK; return TIPC_OK;
} }
...@@ -1736,79 +1754,126 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf) ...@@ -1736,79 +1754,126 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
* @sk: socket * @sk: socket
* @skb: message * @skb: message
* *
* Caller must hold socket lock, but not port lock. * Caller must hold socket lock
* *
* Returns 0 * Returns 0
*/ */
static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
{ {
int rc; int err;
u32 onode; atomic_t *dcnt;
u32 dnode;
struct tipc_sock *tsk = tipc_sk(sk); struct tipc_sock *tsk = tipc_sk(sk);
struct net *net = sock_net(sk); struct net *net = sock_net(sk);
uint truesize = skb->truesize; uint truesize = skb->truesize;
rc = filter_rcv(sk, skb); err = filter_rcv(sk, &skb);
if (likely(!skb)) {
if (likely(!rc)) { dcnt = &tsk->dupl_rcvcnt;
if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT)
atomic_add(truesize, &tsk->dupl_rcvcnt); atomic_add(truesize, dcnt);
return 0; return 0;
} }
if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err))
if ((rc < 0) && !tipc_msg_reverse(net, skb, &onode, -rc)) tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
return 0;
tipc_link_xmit_skb(net, skb, onode, 0);
return 0; return 0;
} }
/** /**
* tipc_sk_rcv - handle incoming message * tipc_sk_enqueue - extract all buffers with destination 'dport' from
* @skb: buffer containing arriving message * inputq and try adding them to socket or backlog queue
* Consumes buffer * @inputq: list of incoming buffers with potentially different destinations
* Returns 0 if success, or errno: -EHOSTUNREACH * @sk: socket where the buffers should be enqueued
* @dport: port number for the socket
* @_skb: returned buffer to be forwarded or rejected, if applicable
*
* Caller must hold socket lock
*
* Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
* or -TIPC_ERR_NO_PORT
*/ */
int tipc_sk_rcv(struct net *net, struct sk_buff *skb) static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
u32 dport, struct sk_buff **_skb)
{ {
struct tipc_sock *tsk; unsigned int lim;
struct sock *sk; atomic_t *dcnt;
u32 dport = msg_destport(buf_msg(skb)); int err;
int rc = TIPC_OK; struct sk_buff *skb;
uint limit; unsigned long time_limit = jiffies + 2;
u32 dnode;
/* Validate destination and message */ while (skb_queue_len(inputq)) {
tsk = tipc_sk_lookup(net, dport); skb = tipc_skb_dequeue(inputq, dport);
if (unlikely(!tsk)) { if (unlikely(!skb))
rc = tipc_msg_eval(net, skb, &dnode); return TIPC_OK;
goto exit; /* Return if softirq window exhausted */
if (unlikely(time_after_eq(jiffies, time_limit)))
return TIPC_OK;
if (!sock_owned_by_user(sk)) {
err = filter_rcv(sk, &skb);
if (likely(!skb))
continue;
*_skb = skb;
return err;
}
dcnt = &tipc_sk(sk)->dupl_rcvcnt;
if (sk->sk_backlog.len)
atomic_set(dcnt, 0);
lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
if (likely(!sk_add_backlog(sk, skb, lim)))
continue;
*_skb = skb;
return -TIPC_ERR_OVERLOAD;
} }
sk = &tsk->sk; return TIPC_OK;
}
/* Queue message */ /**
spin_lock_bh(&sk->sk_lock.slock); * tipc_sk_rcv - handle a chain of incoming buffers
* @inputq: buffer list containing the buffers
* Consumes all buffers in list until inputq is empty
* Note: may be called in multiple threads referring to the same queue
* Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
 * Only node-local callers check the return value; they send single-buffer queues
*/
int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
{
u32 dnode, dport = 0;
int err = -TIPC_ERR_NO_PORT;
struct sk_buff *skb;
struct tipc_sock *tsk;
struct tipc_net *tn;
struct sock *sk;
if (!sock_owned_by_user(sk)) { while (skb_queue_len(inputq)) {
rc = filter_rcv(sk, skb); skb = NULL;
} else { dport = tipc_skb_peek_port(inputq, dport);
if (sk->sk_backlog.len == 0) tsk = tipc_sk_lookup(net, dport);
atomic_set(&tsk->dupl_rcvcnt, 0); if (likely(tsk)) {
limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt); sk = &tsk->sk;
if (sk_add_backlog(sk, skb, limit)) if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
rc = -TIPC_ERR_OVERLOAD; err = tipc_sk_enqueue(inputq, sk, dport, &skb);
spin_unlock_bh(&sk->sk_lock.slock);
dport = 0;
}
sock_put(sk);
} else {
skb = tipc_skb_dequeue(inputq, dport);
}
if (likely(!skb))
continue;
if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
goto xmit;
if (!err) {
dnode = msg_destnode(buf_msg(skb));
goto xmit;
}
tn = net_generic(net, tipc_net_id);
if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
continue;
xmit:
tipc_link_xmit_skb(net, skb, dnode, dport);
} }
spin_unlock_bh(&sk->sk_lock.slock); return err ? -EHOSTUNREACH : 0;
sock_put(sk);
if (likely(!rc))
return 0;
exit:
if ((rc < 0) && !tipc_msg_reverse(net, skb, &dnode, -rc))
return -EHOSTUNREACH;
tipc_link_xmit_skb(net, skb, dnode, 0);
return (rc < 0) ? -EHOSTUNREACH : 0;
} }
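
The new tipc_sk_rcv() drains the shared input queue one destination port at a time: pick a port, look up and lock its socket, move every queued buffer for that port in one burst, then move on to the next port. The user-space model below shows only that grouping step, with an array of (port, payload) records in place of the buffer list; every name in it is hypothetical.

#include <stdbool.h>
#include <stdio.h>

struct model_rcv_msg {
	unsigned int dport;
	int payload;
	bool delivered;
};

/* Deliver all pending messages, grouped per destination port: find the
 * first undelivered message, then drain every message with the same port
 * before picking the next port, so each "socket lock" is taken once per
 * burst rather than once per buffer.
 */
static void model_sk_rcv(struct model_rcv_msg *q, int n)
{
	int i, j;

	for (i = 0; i < n; i++) {
		unsigned int dport;

		if (q[i].delivered)
			continue;
		dport = q[i].dport;
		printf("lock socket %u\n", dport);
		for (j = i; j < n; j++) {
			if (q[j].delivered || q[j].dport != dport)
				continue;
			printf("  deliver %d to port %u\n", q[j].payload, dport);
			q[j].delivered = true;
		}
		printf("unlock socket %u\n", dport);
	}
}

int main(void)
{
	struct model_rcv_msg q[] = {
		{ 42, 1, false }, { 42, 2, false }, { 77, 3, false }, { 42, 4, false },
	};

	model_sk_rcv(q, 4);
	return 0;
}
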
static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
...@@ -2065,7 +2130,6 @@ static int tipc_shutdown(struct socket *sock, int how) ...@@ -2065,7 +2130,6 @@ static int tipc_shutdown(struct socket *sock, int how)
{ {
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
struct net *net = sock_net(sk); struct net *net = sock_net(sk);
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_sock *tsk = tipc_sk(sk); struct tipc_sock *tsk = tipc_sk(sk);
struct sk_buff *skb; struct sk_buff *skb;
u32 dnode; u32 dnode;
...@@ -2088,16 +2152,17 @@ static int tipc_shutdown(struct socket *sock, int how) ...@@ -2088,16 +2152,17 @@ static int tipc_shutdown(struct socket *sock, int how)
kfree_skb(skb); kfree_skb(skb);
goto restart; goto restart;
} }
if (tipc_msg_reverse(net, skb, &dnode, if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
TIPC_CONN_SHUTDOWN)) TIPC_CONN_SHUTDOWN))
tipc_link_xmit_skb(net, skb, dnode, tipc_link_xmit_skb(net, skb, dnode,
tsk->portid); tsk->portid);
tipc_node_remove_conn(net, dnode, tsk->portid); tipc_node_remove_conn(net, dnode, tsk->portid);
} else { } else {
dnode = tsk_peer_node(tsk); dnode = tsk_peer_node(tsk);
skb = tipc_msg_create(net, TIPC_CRITICAL_IMPORTANCE,
skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
TIPC_CONN_MSG, SHORT_H_SIZE, TIPC_CONN_MSG, SHORT_H_SIZE,
0, dnode, tn->own_addr, 0, dnode, tsk_own_node(tsk),
tsk_peer_port(tsk), tsk_peer_port(tsk),
tsk->portid, TIPC_CONN_SHUTDOWN); tsk->portid, TIPC_CONN_SHUTDOWN);
tipc_link_xmit_skb(net, skb, dnode, tsk->portid); tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
...@@ -2129,10 +2194,9 @@ static void tipc_sk_timeout(unsigned long data) ...@@ -2129,10 +2194,9 @@ static void tipc_sk_timeout(unsigned long data)
{ {
struct tipc_sock *tsk = (struct tipc_sock *)data; struct tipc_sock *tsk = (struct tipc_sock *)data;
struct sock *sk = &tsk->sk; struct sock *sk = &tsk->sk;
struct net *net = sock_net(sk);
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct sk_buff *skb = NULL; struct sk_buff *skb = NULL;
u32 peer_port, peer_node; u32 peer_port, peer_node;
u32 own_node = tsk_own_node(tsk);
bh_lock_sock(sk); bh_lock_sock(sk);
if (!tsk->connected) { if (!tsk->connected) {
...@@ -2144,13 +2208,13 @@ static void tipc_sk_timeout(unsigned long data) ...@@ -2144,13 +2208,13 @@ static void tipc_sk_timeout(unsigned long data)
if (tsk->probing_state == TIPC_CONN_PROBING) { if (tsk->probing_state == TIPC_CONN_PROBING) {
/* Previous probe not answered -> self abort */ /* Previous probe not answered -> self abort */
skb = tipc_msg_create(net, TIPC_CRITICAL_IMPORTANCE, skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
TIPC_CONN_MSG, SHORT_H_SIZE, 0, TIPC_CONN_MSG, SHORT_H_SIZE, 0,
tn->own_addr, peer_node, tsk->portid, own_node, peer_node, tsk->portid,
peer_port, TIPC_ERR_NO_PORT); peer_port, TIPC_ERR_NO_PORT);
} else { } else {
skb = tipc_msg_create(net, CONN_MANAGER, CONN_PROBE, INT_H_SIZE, skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE,
0, peer_node, tn->own_addr, INT_H_SIZE, 0, peer_node, own_node,
peer_port, tsk->portid, TIPC_OK); peer_port, tsk->portid, TIPC_OK);
tsk->probing_state = TIPC_CONN_PROBING; tsk->probing_state = TIPC_CONN_PROBING;
sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv); sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
......
/* net/tipc/socket.h: Include file for TIPC socket code /* net/tipc/socket.h: Include file for TIPC socket code
* *
* Copyright (c) 2014, Ericsson AB * Copyright (c) 2014-2015, Ericsson AB
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
...@@ -42,16 +42,16 @@ ...@@ -42,16 +42,16 @@
#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2) #define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2)
#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \ #define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \
SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
int tipc_socket_init(void); int tipc_socket_init(void);
void tipc_socket_stop(void); void tipc_socket_stop(void);
int tipc_sock_create_local(struct net *net, int type, struct socket **res); int tipc_sock_create_local(struct net *net, int type, struct socket **res);
void tipc_sock_release_local(struct socket *sock); void tipc_sock_release_local(struct socket *sock);
int tipc_sock_accept_local(struct socket *sock, struct socket **newsock, int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
int flags); int flags);
int tipc_sk_rcv(struct net *net, struct sk_buff *buf); int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
struct sk_buff *tipc_sk_socks_show(struct net *net); struct sk_buff *tipc_sk_socks_show(struct net *net);
void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf); void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
struct sk_buff_head *inputq);
void tipc_sk_reinit(struct net *net); void tipc_sk_reinit(struct net *net);
int tipc_sk_rht_init(struct net *net); int tipc_sk_rht_init(struct net *net);
void tipc_sk_rht_destroy(struct net *net); void tipc_sk_rht_destroy(struct net *net);
......