Commit d6cc76d3 authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc-next'

Jon Maloy says:

====================
tipc: bug fixes and improvements

Intensive and extensive testing has revealed some rather infrequent
problems related to flow control, buffer handling and link
establishment. Commits ##1 to 4 deal with these problems.

The remaining four commits are just code improvments, aiming at
making the code more comprehensible and maintainable. There are
no functional enhancements in this series.

v2: Fixed a typo in commit log #2. Otherwise no changes from v1.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 3fdddd85 9816f061
...@@ -506,18 +506,14 @@ void tipc_bclink_rcv(struct sk_buff *buf) ...@@ -506,18 +506,14 @@ void tipc_bclink_rcv(struct sk_buff *buf)
tipc_node_unlock(node); tipc_node_unlock(node);
tipc_link_bundle_rcv(buf); tipc_link_bundle_rcv(buf);
} else if (msg_user(msg) == MSG_FRAGMENTER) { } else if (msg_user(msg) == MSG_FRAGMENTER) {
int ret; tipc_buf_append(&node->bclink.reasm_buf, &buf);
ret = tipc_link_frag_rcv(&node->bclink.reasm_head, if (unlikely(!buf && !node->bclink.reasm_buf))
&node->bclink.reasm_tail,
&buf);
if (ret == LINK_REASM_ERROR)
goto unlock; goto unlock;
tipc_bclink_lock(); tipc_bclink_lock();
bclink_accept_pkt(node, seqno); bclink_accept_pkt(node, seqno);
bcl->stats.recv_fragments++; bcl->stats.recv_fragments++;
if (ret == LINK_REASM_COMPLETE) { if (buf) {
bcl->stats.recv_fragmented++; bcl->stats.recv_fragmented++;
/* Point msg to inner header */
msg = buf_msg(buf); msg = buf_msg(buf);
tipc_bclink_unlock(); tipc_bclink_unlock();
goto receive; goto receive;
......
...@@ -411,28 +411,6 @@ int tipc_disable_bearer(const char *name) ...@@ -411,28 +411,6 @@ int tipc_disable_bearer(const char *name)
return res; return res;
} }
/* tipc_l2_media_addr_set - initialize Ethernet media address structure
*
* Media-dependent "value" field stores MAC address in first 6 bytes
* and zeroes out the remaining bytes.
*/
void tipc_l2_media_addr_set(const struct tipc_bearer *b,
struct tipc_media_addr *a, char *mac)
{
int len = b->media->hwaddr_len;
if (unlikely(sizeof(a->value) < len)) {
WARN_ONCE(1, "Media length invalid\n");
return;
}
memcpy(a->value, mac, len);
memset(a->value + len, 0, sizeof(a->value) - len);
a->media_id = b->media->type_id;
a->broadcast = !memcmp(mac, b->bcast_addr.value, len);
}
int tipc_enable_l2_media(struct tipc_bearer *b) int tipc_enable_l2_media(struct tipc_bearer *b)
{ {
struct net_device *dev; struct net_device *dev;
...@@ -443,21 +421,21 @@ int tipc_enable_l2_media(struct tipc_bearer *b) ...@@ -443,21 +421,21 @@ int tipc_enable_l2_media(struct tipc_bearer *b)
if (!dev) if (!dev)
return -ENODEV; return -ENODEV;
/* Associate TIPC bearer with Ethernet bearer */ /* Associate TIPC bearer with L2 bearer */
rcu_assign_pointer(b->media_ptr, dev); rcu_assign_pointer(b->media_ptr, dev);
memset(b->bcast_addr.value, 0, sizeof(b->bcast_addr.value)); memset(&b->bcast_addr, 0, sizeof(b->bcast_addr));
memcpy(b->bcast_addr.value, dev->broadcast, b->media->hwaddr_len); memcpy(b->bcast_addr.value, dev->broadcast, b->media->hwaddr_len);
b->bcast_addr.media_id = b->media->type_id; b->bcast_addr.media_id = b->media->type_id;
b->bcast_addr.broadcast = 1; b->bcast_addr.broadcast = 1;
b->mtu = dev->mtu; b->mtu = dev->mtu;
tipc_l2_media_addr_set(b, &b->addr, (char *)dev->dev_addr); b->media->raw2addr(b, &b->addr, (char *)dev->dev_addr);
rcu_assign_pointer(dev->tipc_ptr, b); rcu_assign_pointer(dev->tipc_ptr, b);
return 0; return 0;
} }
/* tipc_disable_l2_media - detach TIPC bearer from an Ethernet interface /* tipc_disable_l2_media - detach TIPC bearer from an L2 interface
* *
* Mark Ethernet bearer as inactive so that incoming buffers are thrown away, * Mark L2 bearer as inactive so that incoming buffers are thrown away,
* then get worker thread to complete bearer cleanup. (Can't do cleanup * then get worker thread to complete bearer cleanup. (Can't do cleanup
* here because cleanup code needs to sleep and caller holds spinlocks.) * here because cleanup code needs to sleep and caller holds spinlocks.)
*/ */
...@@ -473,7 +451,7 @@ void tipc_disable_l2_media(struct tipc_bearer *b) ...@@ -473,7 +451,7 @@ void tipc_disable_l2_media(struct tipc_bearer *b)
} }
/** /**
* tipc_l2_send_msg - send a TIPC packet out over an Ethernet interface * tipc_l2_send_msg - send a TIPC packet out over an L2 interface
* @buf: the packet to be sent * @buf: the packet to be sent
* @b_ptr: the bearer through which the packet is to be sent * @b_ptr: the bearer through which the packet is to be sent
* @dest: peer destination address * @dest: peer destination address
...@@ -597,7 +575,7 @@ static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt, ...@@ -597,7 +575,7 @@ static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt,
tipc_reset_bearer(b_ptr); tipc_reset_bearer(b_ptr);
break; break;
case NETDEV_CHANGEADDR: case NETDEV_CHANGEADDR:
tipc_l2_media_addr_set(b_ptr, &b_ptr->addr, b_ptr->media->raw2addr(b_ptr, &b_ptr->addr,
(char *)dev->dev_addr); (char *)dev->dev_addr);
tipc_reset_bearer(b_ptr); tipc_reset_bearer(b_ptr);
break; break;
......
...@@ -42,14 +42,12 @@ ...@@ -42,14 +42,12 @@
#define MAX_BEARERS 2 #define MAX_BEARERS 2
#define MAX_MEDIA 2 #define MAX_MEDIA 2
/* /* Identifiers associated with TIPC message header media address info
* Identifiers associated with TIPC message header media address info * - address info field is 32 bytes long
* * - the field's actual content and length is defined per media
* - address info field is 20 bytes long * - remaining unused bytes in the field are set to zero
* - media type identifier located at offset 3
* - remaining bytes vary according to media type
*/ */
#define TIPC_MEDIA_ADDR_SIZE 20 #define TIPC_MEDIA_ADDR_SIZE 32
#define TIPC_MEDIA_TYPE_OFFSET 3 #define TIPC_MEDIA_TYPE_OFFSET 3
/* /*
...@@ -77,9 +75,10 @@ struct tipc_bearer; ...@@ -77,9 +75,10 @@ struct tipc_bearer;
* @send_msg: routine which handles buffer transmission * @send_msg: routine which handles buffer transmission
* @enable_media: routine which enables a media * @enable_media: routine which enables a media
* @disable_media: routine which disables a media * @disable_media: routine which disables a media
* @addr2str: routine which converts media address to string * @addr2str: convert media address format to string
* @addr2msg: routine which converts media address to protocol message area * @addr2msg: convert from media addr format to discovery msg addr format
* @msg2addr: routine which converts media address from protocol message area * @msg2addr: convert from discovery msg addr format to media addr format
* @raw2addr: convert from raw addr format to media addr format
* @priority: default link (and bearer) priority * @priority: default link (and bearer) priority
* @tolerance: default time (in ms) before declaring link failure * @tolerance: default time (in ms) before declaring link failure
* @window: default window (in packets) before declaring link congestion * @window: default window (in packets) before declaring link congestion
...@@ -93,10 +92,16 @@ struct tipc_media { ...@@ -93,10 +92,16 @@ struct tipc_media {
struct tipc_media_addr *dest); struct tipc_media_addr *dest);
int (*enable_media)(struct tipc_bearer *b_ptr); int (*enable_media)(struct tipc_bearer *b_ptr);
void (*disable_media)(struct tipc_bearer *b_ptr); void (*disable_media)(struct tipc_bearer *b_ptr);
int (*addr2str)(struct tipc_media_addr *a, char *str_buf, int str_size); int (*addr2str)(struct tipc_media_addr *addr,
int (*addr2msg)(struct tipc_media_addr *a, char *msg_area); char *strbuf,
int (*msg2addr)(const struct tipc_bearer *b_ptr, int bufsz);
struct tipc_media_addr *a, char *msg_area); int (*addr2msg)(char *msg, struct tipc_media_addr *addr);
int (*msg2addr)(struct tipc_bearer *b,
struct tipc_media_addr *addr,
char *msg);
int (*raw2addr)(struct tipc_bearer *b,
struct tipc_media_addr *addr,
char *raw);
u32 priority; u32 priority;
u32 tolerance; u32 tolerance;
u32 window; u32 window;
...@@ -175,8 +180,6 @@ int tipc_media_set_priority(const char *name, u32 new_value); ...@@ -175,8 +180,6 @@ int tipc_media_set_priority(const char *name, u32 new_value);
int tipc_media_set_window(const char *name, u32 new_value); int tipc_media_set_window(const char *name, u32 new_value);
void tipc_media_addr_printf(char *buf, int len, struct tipc_media_addr *a); void tipc_media_addr_printf(char *buf, int len, struct tipc_media_addr *a);
struct sk_buff *tipc_media_get_names(void); struct sk_buff *tipc_media_get_names(void);
void tipc_l2_media_addr_set(const struct tipc_bearer *b,
struct tipc_media_addr *a, char *mac);
int tipc_enable_l2_media(struct tipc_bearer *b); int tipc_enable_l2_media(struct tipc_bearer *b);
void tipc_disable_l2_media(struct tipc_bearer *b); void tipc_disable_l2_media(struct tipc_bearer *b);
int tipc_l2_send_msg(struct sk_buff *buf, struct tipc_bearer *b, int tipc_l2_send_msg(struct sk_buff *buf, struct tipc_bearer *b,
......
...@@ -154,10 +154,11 @@ static int __init tipc_init(void) ...@@ -154,10 +154,11 @@ static int __init tipc_init(void)
tipc_max_ports = CONFIG_TIPC_PORTS; tipc_max_ports = CONFIG_TIPC_PORTS;
tipc_net_id = 4711; tipc_net_id = 4711;
sysctl_tipc_rmem[0] = CONN_OVERLOAD_LIMIT >> 4 << TIPC_LOW_IMPORTANCE; sysctl_tipc_rmem[0] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
sysctl_tipc_rmem[1] = CONN_OVERLOAD_LIMIT >> 4 << TIPC_LOW_IMPORTANCE;
sysctl_tipc_rmem[1] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
TIPC_CRITICAL_IMPORTANCE; TIPC_CRITICAL_IMPORTANCE;
sysctl_tipc_rmem[2] = CONN_OVERLOAD_LIMIT; sysctl_tipc_rmem[2] = TIPC_CONN_OVERLOAD_LIMIT;
res = tipc_core_start(); res = tipc_core_start();
if (res) if (res)
......
...@@ -57,6 +57,7 @@ ...@@ -57,6 +57,7 @@
#include <linux/slab.h> #include <linux/slab.h>
#include <linux/vmalloc.h> #include <linux/vmalloc.h>
#include <linux/rtnetlink.h> #include <linux/rtnetlink.h>
#include <linux/etherdevice.h>
#define TIPC_MOD_VER "2.0.0" #define TIPC_MOD_VER "2.0.0"
...@@ -187,6 +188,7 @@ static inline void k_term_timer(struct timer_list *timer) ...@@ -187,6 +188,7 @@ static inline void k_term_timer(struct timer_list *timer)
struct tipc_skb_cb { struct tipc_skb_cb {
void *handle; void *handle;
bool deferred; bool deferred;
struct sk_buff *tail;
}; };
#define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0])) #define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0]))
......
/* /*
* net/tipc/discover.c * net/tipc/discover.c
* *
* Copyright (c) 2003-2006, Ericsson AB * Copyright (c) 2003-2006, 2014, Ericsson AB
* Copyright (c) 2005-2006, 2010-2011, Wind River Systems * Copyright (c) 2005-2006, 2010-2011, Wind River Systems
* All rights reserved. * All rights reserved.
* *
...@@ -83,7 +83,7 @@ static void tipc_disc_init_msg(struct sk_buff *buf, u32 type, ...@@ -83,7 +83,7 @@ static void tipc_disc_init_msg(struct sk_buff *buf, u32 type,
msg_set_node_sig(msg, tipc_random); msg_set_node_sig(msg, tipc_random);
msg_set_dest_domain(msg, dest_domain); msg_set_dest_domain(msg, dest_domain);
msg_set_bc_netid(msg, tipc_net_id); msg_set_bc_netid(msg, tipc_net_id);
b_ptr->media->addr2msg(&b_ptr->addr, msg_media_addr(msg)); b_ptr->media->addr2msg(msg_media_addr(msg), &b_ptr->addr);
} }
/** /**
...@@ -106,147 +106,150 @@ static void disc_dupl_alert(struct tipc_bearer *b_ptr, u32 node_addr, ...@@ -106,147 +106,150 @@ static void disc_dupl_alert(struct tipc_bearer *b_ptr, u32 node_addr,
} }
/** /**
* tipc_disc_rcv - handle incoming link setup message (request or response) * tipc_disc_rcv - handle incoming discovery message (request or response)
* @buf: buffer containing message * @buf: buffer containing message
* @b_ptr: bearer that message arrived on * @bearer: bearer that message arrived on
*/ */
void tipc_disc_rcv(struct sk_buff *buf, struct tipc_bearer *b_ptr) void tipc_disc_rcv(struct sk_buff *buf, struct tipc_bearer *bearer)
{ {
struct tipc_node *n_ptr; struct tipc_node *node;
struct tipc_link *link; struct tipc_link *link;
struct tipc_media_addr media_addr; struct tipc_media_addr maddr;
struct sk_buff *rbuf; struct sk_buff *rbuf;
struct tipc_msg *msg = buf_msg(buf); struct tipc_msg *msg = buf_msg(buf);
u32 dest = msg_dest_domain(msg); u32 ddom = msg_dest_domain(msg);
u32 orig = msg_prevnode(msg); u32 onode = msg_prevnode(msg);
u32 net_id = msg_bc_netid(msg); u32 net_id = msg_bc_netid(msg);
u32 type = msg_type(msg); u32 mtyp = msg_type(msg);
u32 signature = msg_node_sig(msg); u32 signature = msg_node_sig(msg);
int addr_mismatch; bool addr_match = false;
int link_fully_up; bool sign_match = false;
bool link_up = false;
media_addr.broadcast = 1; bool accept_addr = false;
b_ptr->media->msg2addr(b_ptr, &media_addr, msg_media_addr(msg)); bool accept_sign = false;
bool respond = false;
bearer->media->msg2addr(bearer, &maddr, msg_media_addr(msg));
kfree_skb(buf); kfree_skb(buf);
/* Ensure message from node is valid and communication is permitted */ /* Ensure message from node is valid and communication is permitted */
if (net_id != tipc_net_id) if (net_id != tipc_net_id)
return; return;
if (media_addr.broadcast) if (maddr.broadcast)
return; return;
if (!tipc_addr_domain_valid(dest)) if (!tipc_addr_domain_valid(ddom))
return; return;
if (!tipc_addr_node_valid(orig)) if (!tipc_addr_node_valid(onode))
return; return;
if (orig == tipc_own_addr) {
if (memcmp(&media_addr, &b_ptr->addr, sizeof(media_addr))) if (in_own_node(onode)) {
disc_dupl_alert(b_ptr, tipc_own_addr, &media_addr); if (memcmp(&maddr, &bearer->addr, sizeof(maddr)))
disc_dupl_alert(bearer, tipc_own_addr, &maddr);
return; return;
} }
if (!tipc_in_scope(dest, tipc_own_addr)) if (!tipc_in_scope(ddom, tipc_own_addr))
return; return;
if (!tipc_in_scope(b_ptr->domain, orig)) if (!tipc_in_scope(bearer->domain, onode))
return; return;
/* Locate structure corresponding to requesting node */ /* Locate, or if necessary, create, node: */
n_ptr = tipc_node_find(orig); node = tipc_node_find(onode);
if (!n_ptr) { if (!node)
n_ptr = tipc_node_create(orig); node = tipc_node_create(onode);
if (!n_ptr) if (!node)
return; return;
}
tipc_node_lock(n_ptr);
/* Prepare to validate requesting node's signature and media address */ tipc_node_lock(node);
link = n_ptr->links[b_ptr->identity]; link = node->links[bearer->identity];
addr_mismatch = (link != NULL) &&
memcmp(&link->media_addr, &media_addr, sizeof(media_addr));
/* /* Prepare to validate requesting node's signature and media address */
* Ensure discovery message's signature is correct sign_match = (signature == node->signature);
* addr_match = link && !memcmp(&link->media_addr, &maddr, sizeof(maddr));
* If signature is incorrect and there is no working link to the node, link_up = link && tipc_link_is_up(link);
* accept the new signature but invalidate all existing links to the
* node so they won't re-activate without a new discovery message.
* /* These three flags give us eight permutations: */
* If signature is incorrect and the requested link to the node is
* working, accept the new signature. (This is an instance of delayed if (sign_match && addr_match && link_up) {
* rediscovery, where a link endpoint was able to re-establish contact /* All is fine. Do nothing. */
* with its peer endpoint on a node that rebooted before receiving a } else if (sign_match && addr_match && !link_up) {
* discovery message from that node.) /* Respond. The link will come up in due time */
* respond = true;
* If signature is incorrect and there is a working link to the node } else if (sign_match && !addr_match && link_up) {
* that is not the requested link, reject the request (must be from /* Peer has changed i/f address without rebooting.
* a duplicate node). * If so, the link will reset soon, and the next
* discovery will be accepted. So we can ignore it.
* It may also be an cloned or malicious peer having
* chosen the same node address and signature as an
* existing one.
* Ignore requests until the link goes down, if ever.
*/ */
if (signature != n_ptr->signature) { disc_dupl_alert(bearer, onode, &maddr);
if (n_ptr->working_links == 0) { } else if (sign_match && !addr_match && !link_up) {
struct tipc_link *curr_link; /* Peer link has changed i/f address without rebooting.
int i; * It may also be a cloned or malicious peer; we can't
* distinguish between the two.
for (i = 0; i < MAX_BEARERS; i++) { * The signature is correct, so we must accept.
curr_link = n_ptr->links[i];
if (curr_link) {
memset(&curr_link->media_addr, 0,
sizeof(media_addr));
tipc_link_reset(curr_link);
}
}
addr_mismatch = (link != NULL);
} else if (tipc_link_is_up(link) && !addr_mismatch) {
/* delayed rediscovery */
} else {
disc_dupl_alert(b_ptr, orig, &media_addr);
tipc_node_unlock(n_ptr);
return;
}
n_ptr->signature = signature;
}
/*
* Ensure requesting node's media address is correct
*
* If media address doesn't match and the link is working, reject the
* request (must be from a duplicate node).
*
* If media address doesn't match and the link is not working, accept
* the new media address and reset the link to ensure it starts up
* cleanly.
*/ */
if (addr_mismatch) { accept_addr = true;
if (tipc_link_is_up(link)) { respond = true;
disc_dupl_alert(b_ptr, orig, &media_addr); } else if (!sign_match && addr_match && link_up) {
tipc_node_unlock(n_ptr); /* Peer node rebooted. Two possibilities:
return; * - Delayed re-discovery; this link endpoint has already
} else { * reset and re-established contact with the peer, before
memcpy(&link->media_addr, &media_addr, * receiving a discovery message from that node.
sizeof(media_addr)); * (The peer happened to receive one from this node first).
tipc_link_reset(link); * - The peer came back so fast that our side has not
} * discovered it yet. Probing from this side will soon
* reset the link, since there can be no working link
* endpoint at the peer end, and the link will re-establish.
* Accept the signature, since it comes from a known peer.
*/
accept_sign = true;
} else if (!sign_match && addr_match && !link_up) {
/* The peer node has rebooted.
* Accept signature, since it is a known peer.
*/
accept_sign = true;
respond = true;
} else if (!sign_match && !addr_match && link_up) {
/* Peer rebooted with new address, or a new/duplicate peer.
* Ignore until the link goes down, if ever.
*/
disc_dupl_alert(bearer, onode, &maddr);
} else if (!sign_match && !addr_match && !link_up) {
/* Peer rebooted with new address, or it is a new peer.
* Accept signature and address.
*/
accept_sign = true;
accept_addr = true;
respond = true;
} }
/* Create a link endpoint for this bearer, if necessary */ if (accept_sign)
if (!link) { node->signature = signature;
link = tipc_link_create(n_ptr, b_ptr, &media_addr);
if (!link) { if (accept_addr) {
tipc_node_unlock(n_ptr); if (!link)
return; link = tipc_link_create(node, bearer, &maddr);
if (link) {
memcpy(&link->media_addr, &maddr, sizeof(maddr));
tipc_link_reset(link);
} else {
respond = false;
} }
} }
/* Accept discovery message & send response, if necessary */ /* Send response, if necessary */
link_fully_up = link_working_working(link); if (respond && (mtyp == DSC_REQ_MSG)) {
if ((type == DSC_REQ_MSG) && !link_fully_up) {
rbuf = tipc_buf_acquire(INT_H_SIZE); rbuf = tipc_buf_acquire(INT_H_SIZE);
if (rbuf) { if (rbuf) {
tipc_disc_init_msg(rbuf, DSC_RESP_MSG, b_ptr); tipc_disc_init_msg(rbuf, DSC_RESP_MSG, bearer);
tipc_bearer_send(b_ptr->identity, rbuf, &media_addr); tipc_bearer_send(bearer->identity, rbuf, &maddr);
kfree_skb(rbuf); kfree_skb(rbuf);
} }
} }
tipc_node_unlock(node);
tipc_node_unlock(n_ptr);
} }
/** /**
......
/* /*
* net/tipc/eth_media.c: Ethernet bearer support for TIPC * net/tipc/eth_media.c: Ethernet bearer support for TIPC
* *
* Copyright (c) 2001-2007, 2013, Ericsson AB * Copyright (c) 2001-2007, 2013-2014, Ericsson AB
* Copyright (c) 2005-2008, 2011-2013, Wind River Systems * Copyright (c) 2005-2008, 2011-2013, Wind River Systems
* All rights reserved. * All rights reserved.
* *
...@@ -37,39 +37,52 @@ ...@@ -37,39 +37,52 @@
#include "core.h" #include "core.h"
#include "bearer.h" #include "bearer.h"
#define ETH_ADDR_OFFSET 4 /* message header offset of MAC address */ #define ETH_ADDR_OFFSET 4 /* MAC addr position inside address field */
/* convert Ethernet address to string */ /* Convert Ethernet address (media address format) to string */
static int tipc_eth_addr2str(struct tipc_media_addr *a, char *str_buf, static int tipc_eth_addr2str(struct tipc_media_addr *addr,
int str_size) char *strbuf, int bufsz)
{ {
if (str_size < 18) /* 18 = strlen("aa:bb:cc:dd:ee:ff\0") */ if (bufsz < 18) /* 18 = strlen("aa:bb:cc:dd:ee:ff\0") */
return 1; return 1;
sprintf(str_buf, "%pM", a->value); sprintf(strbuf, "%pM", addr->value);
return 0; return 0;
} }
/* convert Ethernet address format to message header format */ /* Convert from media address format to discovery message addr format */
static int tipc_eth_addr2msg(struct tipc_media_addr *a, char *msg_area) static int tipc_eth_addr2msg(char *msg, struct tipc_media_addr *addr)
{ {
memset(msg_area, 0, TIPC_MEDIA_ADDR_SIZE); memset(msg, 0, TIPC_MEDIA_ADDR_SIZE);
msg_area[TIPC_MEDIA_TYPE_OFFSET] = TIPC_MEDIA_TYPE_ETH; msg[TIPC_MEDIA_TYPE_OFFSET] = TIPC_MEDIA_TYPE_ETH;
memcpy(msg_area + ETH_ADDR_OFFSET, a->value, ETH_ALEN); memcpy(msg + ETH_ADDR_OFFSET, addr->value, ETH_ALEN);
return 0; return 0;
} }
/* convert message header address format to Ethernet format */ /* Convert raw mac address format to media addr format */
static int tipc_eth_msg2addr(const struct tipc_bearer *tb_ptr, static int tipc_eth_raw2addr(struct tipc_bearer *b,
struct tipc_media_addr *a, char *msg_area) struct tipc_media_addr *addr,
char *msg)
{ {
if (msg_area[TIPC_MEDIA_TYPE_OFFSET] != TIPC_MEDIA_TYPE_ETH) char bcast_mac[ETH_ALEN] = {0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
return 1;
tipc_l2_media_addr_set(tb_ptr, a, msg_area + ETH_ADDR_OFFSET); memset(addr, 0, sizeof(*addr));
ether_addr_copy(addr->value, msg);
addr->media_id = TIPC_MEDIA_TYPE_ETH;
addr->broadcast = !memcmp(addr->value, bcast_mac, ETH_ALEN);
return 0; return 0;
} }
/* Convert discovery msg addr format to Ethernet media addr format */
static int tipc_eth_msg2addr(struct tipc_bearer *b,
struct tipc_media_addr *addr,
char *msg)
{
/* Skip past preamble: */
msg += ETH_ADDR_OFFSET;
return tipc_eth_raw2addr(b, addr, msg);
}
/* Ethernet media registration info */ /* Ethernet media registration info */
struct tipc_media eth_media_info = { struct tipc_media eth_media_info = {
.send_msg = tipc_l2_send_msg, .send_msg = tipc_l2_send_msg,
...@@ -78,6 +91,7 @@ struct tipc_media eth_media_info = { ...@@ -78,6 +91,7 @@ struct tipc_media eth_media_info = {
.addr2str = tipc_eth_addr2str, .addr2str = tipc_eth_addr2str,
.addr2msg = tipc_eth_addr2msg, .addr2msg = tipc_eth_addr2msg,
.msg2addr = tipc_eth_msg2addr, .msg2addr = tipc_eth_msg2addr,
.raw2addr = tipc_eth_raw2addr,
.priority = TIPC_DEF_LINK_PRI, .priority = TIPC_DEF_LINK_PRI,
.tolerance = TIPC_DEF_LINK_TOL, .tolerance = TIPC_DEF_LINK_TOL,
.window = TIPC_DEF_LINK_WIN, .window = TIPC_DEF_LINK_WIN,
...@@ -85,4 +99,3 @@ struct tipc_media eth_media_info = { ...@@ -85,4 +99,3 @@ struct tipc_media eth_media_info = {
.hwaddr_len = ETH_ALEN, .hwaddr_len = ETH_ALEN,
.name = "eth" .name = "eth"
}; };
...@@ -42,7 +42,7 @@ ...@@ -42,7 +42,7 @@
#include "core.h" #include "core.h"
#include "bearer.h" #include "bearer.h"
/* convert InfiniBand address to string */ /* convert InfiniBand address (media address format) media address to string */
static int tipc_ib_addr2str(struct tipc_media_addr *a, char *str_buf, static int tipc_ib_addr2str(struct tipc_media_addr *a, char *str_buf,
int str_size) int str_size)
{ {
...@@ -54,23 +54,35 @@ static int tipc_ib_addr2str(struct tipc_media_addr *a, char *str_buf, ...@@ -54,23 +54,35 @@ static int tipc_ib_addr2str(struct tipc_media_addr *a, char *str_buf,
return 0; return 0;
} }
/* convert InfiniBand address format to message header format */ /* Convert from media address format to discovery message addr format */
static int tipc_ib_addr2msg(struct tipc_media_addr *a, char *msg_area) static int tipc_ib_addr2msg(char *msg, struct tipc_media_addr *addr)
{ {
memset(msg_area, 0, TIPC_MEDIA_ADDR_SIZE); memset(msg, 0, TIPC_MEDIA_ADDR_SIZE);
msg_area[TIPC_MEDIA_TYPE_OFFSET] = TIPC_MEDIA_TYPE_IB; memcpy(msg, addr->value, INFINIBAND_ALEN);
memcpy(msg_area, a->value, INFINIBAND_ALEN);
return 0; return 0;
} }
/* convert message header address format to InfiniBand format */ /* Convert raw InfiniBand address format to media addr format */
static int tipc_ib_msg2addr(const struct tipc_bearer *tb_ptr, static int tipc_ib_raw2addr(struct tipc_bearer *b,
struct tipc_media_addr *a, char *msg_area) struct tipc_media_addr *addr,
char *msg)
{ {
tipc_l2_media_addr_set(tb_ptr, a, msg_area); memset(addr, 0, sizeof(*addr));
memcpy(addr->value, msg, INFINIBAND_ALEN);
addr->media_id = TIPC_MEDIA_TYPE_IB;
addr->broadcast = !memcmp(msg, b->bcast_addr.value,
INFINIBAND_ALEN);
return 0; return 0;
} }
/* Convert discovery msg addr format to InfiniBand media addr format */
static int tipc_ib_msg2addr(struct tipc_bearer *b,
struct tipc_media_addr *addr,
char *msg)
{
return tipc_ib_raw2addr(b, addr, msg);
}
/* InfiniBand media registration info */ /* InfiniBand media registration info */
struct tipc_media ib_media_info = { struct tipc_media ib_media_info = {
.send_msg = tipc_l2_send_msg, .send_msg = tipc_l2_send_msg,
...@@ -79,6 +91,7 @@ struct tipc_media ib_media_info = { ...@@ -79,6 +91,7 @@ struct tipc_media ib_media_info = {
.addr2str = tipc_ib_addr2str, .addr2str = tipc_ib_addr2str,
.addr2msg = tipc_ib_addr2msg, .addr2msg = tipc_ib_addr2msg,
.msg2addr = tipc_ib_msg2addr, .msg2addr = tipc_ib_msg2addr,
.raw2addr = tipc_ib_raw2addr,
.priority = TIPC_DEF_LINK_PRI, .priority = TIPC_DEF_LINK_PRI,
.tolerance = TIPC_DEF_LINK_TOL, .tolerance = TIPC_DEF_LINK_TOL,
.window = TIPC_DEF_LINK_WIN, .window = TIPC_DEF_LINK_WIN,
...@@ -86,4 +99,3 @@ struct tipc_media ib_media_info = { ...@@ -86,4 +99,3 @@ struct tipc_media ib_media_info = {
.hwaddr_len = INFINIBAND_ALEN, .hwaddr_len = INFINIBAND_ALEN,
.name = "ib" .name = "ib"
}; };
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include "core.h" #include "core.h"
#include "link.h" #include "link.h"
#include "port.h" #include "port.h"
#include "socket.h"
#include "name_distr.h" #include "name_distr.h"
#include "discover.h" #include "discover.h"
#include "config.h" #include "config.h"
...@@ -398,9 +399,8 @@ static void link_release_outqueue(struct tipc_link *l_ptr) ...@@ -398,9 +399,8 @@ static void link_release_outqueue(struct tipc_link *l_ptr)
*/ */
void tipc_link_reset_fragments(struct tipc_link *l_ptr) void tipc_link_reset_fragments(struct tipc_link *l_ptr)
{ {
kfree_skb(l_ptr->reasm_head); kfree_skb(l_ptr->reasm_buf);
l_ptr->reasm_head = NULL; l_ptr->reasm_buf = NULL;
l_ptr->reasm_tail = NULL;
} }
/** /**
...@@ -1573,17 +1573,12 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) ...@@ -1573,17 +1573,12 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
} }
msg = buf_msg(buf); msg = buf_msg(buf);
} else if (msg_user(msg) == MSG_FRAGMENTER) { } else if (msg_user(msg) == MSG_FRAGMENTER) {
int rc;
l_ptr->stats.recv_fragments++; l_ptr->stats.recv_fragments++;
rc = tipc_link_frag_rcv(&l_ptr->reasm_head, if (tipc_buf_append(&l_ptr->reasm_buf, &buf)) {
&l_ptr->reasm_tail,
&buf);
if (rc == LINK_REASM_COMPLETE) {
l_ptr->stats.recv_fragmented++; l_ptr->stats.recv_fragmented++;
msg = buf_msg(buf); msg = buf_msg(buf);
} else { } else {
if (rc == LINK_REASM_ERROR) if (!l_ptr->reasm_buf)
tipc_link_reset(l_ptr); tipc_link_reset(l_ptr);
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
continue; continue;
...@@ -1596,7 +1591,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) ...@@ -1596,7 +1591,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
case TIPC_HIGH_IMPORTANCE: case TIPC_HIGH_IMPORTANCE:
case TIPC_CRITICAL_IMPORTANCE: case TIPC_CRITICAL_IMPORTANCE:
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
tipc_port_rcv(buf); tipc_sk_rcv(buf);
continue; continue;
case MSG_BUNDLER: case MSG_BUNDLER:
l_ptr->stats.recv_bundles++; l_ptr->stats.recv_bundles++;
...@@ -1831,9 +1826,6 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf) ...@@ -1831,9 +1826,6 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
if (l_ptr->exp_msg_count) if (l_ptr->exp_msg_count)
goto exit; goto exit;
/* record unnumbered packet arrival (force mismatch on next timeout) */
l_ptr->checkpoint--;
if (l_ptr->net_plane != msg_net_plane(msg)) if (l_ptr->net_plane != msg_net_plane(msg))
if (tipc_own_addr > msg_prevnode(msg)) if (tipc_own_addr > msg_prevnode(msg))
l_ptr->net_plane = msg_net_plane(msg); l_ptr->net_plane = msg_net_plane(msg);
...@@ -1909,6 +1901,10 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf) ...@@ -1909,6 +1901,10 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
tipc_link_reset(l_ptr); /* Enforce change to take effect */ tipc_link_reset(l_ptr); /* Enforce change to take effect */
break; break;
} }
/* Record reception; force mismatch at next timeout: */
l_ptr->checkpoint--;
link_state_event(l_ptr, TRAFFIC_MSG_EVT); link_state_event(l_ptr, TRAFFIC_MSG_EVT);
l_ptr->stats.recv_states++; l_ptr->stats.recv_states++;
if (link_reset_unknown(l_ptr)) if (link_reset_unknown(l_ptr))
...@@ -2168,9 +2164,7 @@ static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr, ...@@ -2168,9 +2164,7 @@ static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
} }
if (msg_user(msg) == MSG_FRAGMENTER) { if (msg_user(msg) == MSG_FRAGMENTER) {
l_ptr->stats.recv_fragments++; l_ptr->stats.recv_fragments++;
tipc_link_frag_rcv(&l_ptr->reasm_head, tipc_buf_append(&l_ptr->reasm_buf, &buf);
&l_ptr->reasm_tail,
&buf);
} }
} }
exit: exit:
...@@ -2308,53 +2302,6 @@ static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf) ...@@ -2308,53 +2302,6 @@ static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
return dsz; return dsz;
} }
/* tipc_link_frag_rcv(): Called with node lock on. Returns
* the reassembled buffer if message is complete.
*/
int tipc_link_frag_rcv(struct sk_buff **head, struct sk_buff **tail,
struct sk_buff **fbuf)
{
struct sk_buff *frag = *fbuf;
struct tipc_msg *msg = buf_msg(frag);
u32 fragid = msg_type(msg);
bool headstolen;
int delta;
skb_pull(frag, msg_hdr_sz(msg));
if (fragid == FIRST_FRAGMENT) {
if (*head || skb_unclone(frag, GFP_ATOMIC))
goto out_free;
*head = frag;
skb_frag_list_init(*head);
*fbuf = NULL;
return 0;
} else if (*head &&
skb_try_coalesce(*head, frag, &headstolen, &delta)) {
kfree_skb_partial(frag, headstolen);
} else {
if (!*head)
goto out_free;
if (!skb_has_frag_list(*head))
skb_shinfo(*head)->frag_list = frag;
else
(*tail)->next = frag;
*tail = frag;
(*head)->truesize += frag->truesize;
}
if (fragid == LAST_FRAGMENT) {
*fbuf = *head;
*tail = *head = NULL;
return LINK_REASM_COMPLETE;
}
*fbuf = NULL;
return 0;
out_free:
pr_warn_ratelimited("Link unable to reassemble fragmented message\n");
kfree_skb(*fbuf);
*fbuf = NULL;
return LINK_REASM_ERROR;
}
static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance) static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
{ {
if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL)) if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
......
...@@ -40,11 +40,6 @@ ...@@ -40,11 +40,6 @@
#include "msg.h" #include "msg.h"
#include "node.h" #include "node.h"
/* Link reassembly status codes
*/
#define LINK_REASM_ERROR -1
#define LINK_REASM_COMPLETE 1
/* Out-of-range value for link sequence numbers /* Out-of-range value for link sequence numbers
*/ */
#define INVALID_LINK_SEQ 0x10000 #define INVALID_LINK_SEQ 0x10000
...@@ -140,8 +135,7 @@ struct tipc_stats { ...@@ -140,8 +135,7 @@ struct tipc_stats {
* @next_out: ptr to first unsent outbound message in queue * @next_out: ptr to first unsent outbound message in queue
* @waiting_ports: linked list of ports waiting for link congestion to abate * @waiting_ports: linked list of ports waiting for link congestion to abate
* @long_msg_seq_no: next identifier to use for outbound fragmented messages * @long_msg_seq_no: next identifier to use for outbound fragmented messages
* @reasm_head: list head of partially reassembled inbound message fragments * @reasm_buf: head of partially reassembled inbound message fragments
* @reasm_tail: last fragment received
* @stats: collects statistics regarding link activity * @stats: collects statistics regarding link activity
*/ */
struct tipc_link { struct tipc_link {
...@@ -204,8 +198,7 @@ struct tipc_link { ...@@ -204,8 +198,7 @@ struct tipc_link {
/* Fragmentation/reassembly */ /* Fragmentation/reassembly */
u32 long_msg_seq_no; u32 long_msg_seq_no;
struct sk_buff *reasm_head; struct sk_buff *reasm_buf;
struct sk_buff *reasm_tail;
/* Statistics */ /* Statistics */
struct tipc_stats stats; struct tipc_stats stats;
...@@ -242,9 +235,6 @@ int tipc_link_iovec_xmit_fast(struct tipc_port *sender, ...@@ -242,9 +235,6 @@ int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
struct iovec const *msg_sect, struct iovec const *msg_sect,
unsigned int len, u32 destnode); unsigned int len, u32 destnode);
void tipc_link_bundle_rcv(struct sk_buff *buf); void tipc_link_bundle_rcv(struct sk_buff *buf);
int tipc_link_frag_rcv(struct sk_buff **reasm_head,
struct sk_buff **reasm_tail,
struct sk_buff **fbuf);
void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
u32 gap, u32 tolerance, u32 priority, u32 acked_mtu); u32 gap, u32 tolerance, u32 priority, u32 acked_mtu);
void tipc_link_push_queue(struct tipc_link *l_ptr); void tipc_link_push_queue(struct tipc_link *l_ptr);
......
/* /*
* net/tipc/msg.c: TIPC message header routines * net/tipc/msg.c: TIPC message header routines
* *
* Copyright (c) 2000-2006, Ericsson AB * Copyright (c) 2000-2006, 2014, Ericsson AB
* Copyright (c) 2005, 2010-2011, Wind River Systems * Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved. * All rights reserved.
* *
...@@ -99,3 +99,56 @@ int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, ...@@ -99,3 +99,56 @@ int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
} }
return dsz; return dsz;
} }
/* tipc_buf_append(): Append a buffer to the fragment list of another buffer
* Let first buffer become head buffer
* Returns 1 and sets *buf to headbuf if chain is complete, otherwise 0
* Leaves headbuf pointer at NULL if failure
*/
int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
{
struct sk_buff *head = *headbuf;
struct sk_buff *frag = *buf;
struct sk_buff *tail;
struct tipc_msg *msg = buf_msg(frag);
u32 fragid = msg_type(msg);
bool headstolen;
int delta;
skb_pull(frag, msg_hdr_sz(msg));
if (fragid == FIRST_FRAGMENT) {
if (head || skb_unclone(frag, GFP_ATOMIC))
goto out_free;
head = *headbuf = frag;
skb_frag_list_init(head);
return 0;
}
if (!head)
goto out_free;
tail = TIPC_SKB_CB(head)->tail;
if (skb_try_coalesce(head, frag, &headstolen, &delta)) {
kfree_skb_partial(frag, headstolen);
} else {
if (!skb_has_frag_list(head))
skb_shinfo(head)->frag_list = frag;
else
tail->next = frag;
head->truesize += frag->truesize;
head->data_len += frag->len;
head->len += frag->len;
TIPC_SKB_CB(head)->tail = frag;
}
if (fragid == LAST_FRAGMENT) {
*buf = head;
TIPC_SKB_CB(head)->tail = NULL;
*headbuf = NULL;
return 1;
}
*buf = NULL;
return 0;
out_free:
pr_warn_ratelimited("Unable to build fragment list\n");
kfree_skb(*buf);
return 0;
}
/* /*
* net/tipc/msg.h: Include file for TIPC message header routines * net/tipc/msg.h: Include file for TIPC message header routines
* *
* Copyright (c) 2000-2007, Ericsson AB * Copyright (c) 2000-2007, 2014, Ericsson AB
* Copyright (c) 2005-2008, 2010-2011, Wind River Systems * Copyright (c) 2005-2008, 2010-2011, Wind River Systems
* All rights reserved. * All rights reserved.
* *
...@@ -711,4 +711,7 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, ...@@ -711,4 +711,7 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
u32 destnode); u32 destnode);
int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
unsigned int len, int max_size, struct sk_buff **buf); unsigned int len, int max_size, struct sk_buff **buf);
int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
#endif #endif
...@@ -39,6 +39,7 @@ ...@@ -39,6 +39,7 @@
#include "name_distr.h" #include "name_distr.h"
#include "subscr.h" #include "subscr.h"
#include "port.h" #include "port.h"
#include "socket.h"
#include "node.h" #include "node.h"
#include "config.h" #include "config.h"
...@@ -141,7 +142,7 @@ void tipc_net_route_msg(struct sk_buff *buf) ...@@ -141,7 +142,7 @@ void tipc_net_route_msg(struct sk_buff *buf)
if (msg_mcast(msg)) if (msg_mcast(msg))
tipc_port_mcast_rcv(buf, NULL); tipc_port_mcast_rcv(buf, NULL);
else if (msg_destport(msg)) else if (msg_destport(msg))
tipc_port_rcv(buf); tipc_sk_rcv(buf);
else else
net_route_named_msg(buf); net_route_named_msg(buf);
return; return;
......
...@@ -286,10 +286,9 @@ static void node_lost_contact(struct tipc_node *n_ptr) ...@@ -286,10 +286,9 @@ static void node_lost_contact(struct tipc_node *n_ptr)
kfree_skb_list(n_ptr->bclink.deferred_head); kfree_skb_list(n_ptr->bclink.deferred_head);
n_ptr->bclink.deferred_size = 0; n_ptr->bclink.deferred_size = 0;
if (n_ptr->bclink.reasm_head) { if (n_ptr->bclink.reasm_buf) {
kfree_skb(n_ptr->bclink.reasm_head); kfree_skb(n_ptr->bclink.reasm_buf);
n_ptr->bclink.reasm_head = NULL; n_ptr->bclink.reasm_buf = NULL;
n_ptr->bclink.reasm_tail = NULL;
} }
tipc_bclink_remove_node(n_ptr->addr); tipc_bclink_remove_node(n_ptr->addr);
......
...@@ -69,8 +69,7 @@ enum { ...@@ -69,8 +69,7 @@ enum {
* @deferred_size: number of OOS b'cast messages in deferred queue * @deferred_size: number of OOS b'cast messages in deferred queue
* @deferred_head: oldest OOS b'cast message received from node * @deferred_head: oldest OOS b'cast message received from node
* @deferred_tail: newest OOS b'cast message received from node * @deferred_tail: newest OOS b'cast message received from node
* @reasm_head: broadcast reassembly queue head from node * @reasm_buf: broadcast reassembly queue head from node
* @reasm_tail: last broadcast fragment received from node
* @recv_permitted: true if node is allowed to receive b'cast messages * @recv_permitted: true if node is allowed to receive b'cast messages
*/ */
struct tipc_node_bclink { struct tipc_node_bclink {
...@@ -81,8 +80,7 @@ struct tipc_node_bclink { ...@@ -81,8 +80,7 @@ struct tipc_node_bclink {
u32 deferred_size; u32 deferred_size;
struct sk_buff *deferred_head; struct sk_buff *deferred_head;
struct sk_buff *deferred_tail; struct sk_buff *deferred_tail;
struct sk_buff *reasm_head; struct sk_buff *reasm_buf;
struct sk_buff *reasm_tail;
bool recv_permitted; bool recv_permitted;
}; };
......
...@@ -165,7 +165,7 @@ void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp) ...@@ -165,7 +165,7 @@ void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp)
msg_set_destnode(msg, tipc_own_addr); msg_set_destnode(msg, tipc_own_addr);
if (dp->count == 1) { if (dp->count == 1) {
msg_set_destport(msg, dp->ports[0]); msg_set_destport(msg, dp->ports[0]);
tipc_port_rcv(buf); tipc_sk_rcv(buf);
tipc_port_list_free(dp); tipc_port_list_free(dp);
return; return;
} }
...@@ -180,7 +180,7 @@ void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp) ...@@ -180,7 +180,7 @@ void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp)
if ((index == 0) && (cnt != 0)) if ((index == 0) && (cnt != 0))
item = item->next; item = item->next;
msg_set_destport(buf_msg(b), item->ports[index]); msg_set_destport(buf_msg(b), item->ports[index]);
tipc_port_rcv(b); tipc_sk_rcv(b);
} }
} }
exit: exit:
...@@ -343,7 +343,7 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err) ...@@ -343,7 +343,7 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err)
/* send returned message & dispose of rejected message */ /* send returned message & dispose of rejected message */
src_node = msg_prevnode(msg); src_node = msg_prevnode(msg);
if (in_own_node(src_node)) if (in_own_node(src_node))
tipc_port_rcv(rbuf); tipc_sk_rcv(rbuf);
else else
tipc_link_xmit(rbuf, src_node, msg_link_selector(rmsg)); tipc_link_xmit(rbuf, src_node, msg_link_selector(rmsg));
exit: exit:
...@@ -754,37 +754,6 @@ int tipc_port_shutdown(u32 ref) ...@@ -754,37 +754,6 @@ int tipc_port_shutdown(u32 ref)
return tipc_port_disconnect(ref); return tipc_port_disconnect(ref);
} }
/**
* tipc_port_rcv - receive message from lower layer and deliver to port user
*/
int tipc_port_rcv(struct sk_buff *buf)
{
struct tipc_port *p_ptr;
struct tipc_msg *msg = buf_msg(buf);
u32 destport = msg_destport(msg);
u32 dsz = msg_data_sz(msg);
u32 err;
/* forward unresolved named message */
if (unlikely(!destport)) {
tipc_net_route_msg(buf);
return dsz;
}
/* validate destination & pass to port, otherwise reject message */
p_ptr = tipc_port_lock(destport);
if (likely(p_ptr)) {
err = tipc_sk_rcv(&tipc_port_to_sock(p_ptr)->sk, buf);
tipc_port_unlock(p_ptr);
if (likely(!err))
return dsz;
} else {
err = TIPC_ERR_NO_PORT;
}
return tipc_reject_msg(buf, err);
}
/* /*
* tipc_port_iovec_rcv: Concatenate and deliver sectioned * tipc_port_iovec_rcv: Concatenate and deliver sectioned
* message for this node. * message for this node.
...@@ -798,7 +767,7 @@ static int tipc_port_iovec_rcv(struct tipc_port *sender, ...@@ -798,7 +767,7 @@ static int tipc_port_iovec_rcv(struct tipc_port *sender,
res = tipc_msg_build(&sender->phdr, msg_sect, len, MAX_MSG_SIZE, &buf); res = tipc_msg_build(&sender->phdr, msg_sect, len, MAX_MSG_SIZE, &buf);
if (likely(buf)) if (likely(buf))
tipc_port_rcv(buf); tipc_sk_rcv(buf);
return res; return res;
} }
......
...@@ -42,8 +42,9 @@ ...@@ -42,8 +42,9 @@
#include "msg.h" #include "msg.h"
#include "node_subscr.h" #include "node_subscr.h"
#define TIPC_FLOW_CONTROL_WIN 512 #define TIPC_CONNACK_INTV 256
#define CONN_OVERLOAD_LIMIT ((TIPC_FLOW_CONTROL_WIN * 2 + 1) * \ #define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2)
#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \
SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
/** /**
...@@ -134,7 +135,6 @@ int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg); ...@@ -134,7 +135,6 @@ int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg);
/* /*
* TIPC messaging routines * TIPC messaging routines
*/ */
int tipc_port_rcv(struct sk_buff *buf);
int tipc_send(struct tipc_port *port, int tipc_send(struct tipc_port *port,
struct iovec const *msg_sect, struct iovec const *msg_sect,
...@@ -187,7 +187,7 @@ static inline void tipc_port_unlock(struct tipc_port *p_ptr) ...@@ -187,7 +187,7 @@ static inline void tipc_port_unlock(struct tipc_port *p_ptr)
static inline int tipc_port_congested(struct tipc_port *p_ptr) static inline int tipc_port_congested(struct tipc_port *p_ptr)
{ {
return (p_ptr->sent - p_ptr->acked) >= (TIPC_FLOW_CONTROL_WIN * 2); return ((p_ptr->sent - p_ptr->acked) >= TIPC_FLOWCTRL_WIN);
} }
......
/* /*
* net/tipc/socket.c: TIPC socket API * net/tipc/socket.c: TIPC socket API
* *
* Copyright (c) 2001-2007, 2012-2014, Ericsson AB * Copyright (c) 2001-2007, 2012-2014, Ericsson AB
* Copyright (c) 2004-2008, 2010-2013, Wind River Systems * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
...@@ -45,7 +45,7 @@ ...@@ -45,7 +45,7 @@
#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ #define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
static int backlog_rcv(struct sock *sk, struct sk_buff *skb); static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
static void tipc_data_ready(struct sock *sk); static void tipc_data_ready(struct sock *sk);
static void tipc_write_space(struct sock *sk); static void tipc_write_space(struct sock *sk);
static int tipc_release(struct socket *sock); static int tipc_release(struct socket *sock);
...@@ -196,11 +196,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock, ...@@ -196,11 +196,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
sock->state = state; sock->state = state;
sock_init_data(sock, sk); sock_init_data(sock, sk);
sk->sk_backlog_rcv = backlog_rcv; sk->sk_backlog_rcv = tipc_backlog_rcv;
sk->sk_rcvbuf = sysctl_tipc_rmem[1]; sk->sk_rcvbuf = sysctl_tipc_rmem[1];
sk->sk_data_ready = tipc_data_ready; sk->sk_data_ready = tipc_data_ready;
sk->sk_write_space = tipc_write_space; sk->sk_write_space = tipc_write_space;
tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT; tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
atomic_set(&tsk->dupl_rcvcnt, 0);
tipc_port_unlock(port); tipc_port_unlock(port);
if (sock->state == SS_READY) { if (sock->state == SS_READY) {
...@@ -1101,7 +1102,7 @@ static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock, ...@@ -1101,7 +1102,7 @@ static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock,
/* Consume received message (optional) */ /* Consume received message (optional) */
if (likely(!(flags & MSG_PEEK))) { if (likely(!(flags & MSG_PEEK))) {
if ((sock->state != SS_READY) && if ((sock->state != SS_READY) &&
(++port->conn_unacked >= TIPC_FLOW_CONTROL_WIN)) (++port->conn_unacked >= TIPC_CONNACK_INTV))
tipc_acknowledge(port->ref, port->conn_unacked); tipc_acknowledge(port->ref, port->conn_unacked);
advance_rx_queue(sk); advance_rx_queue(sk);
} }
...@@ -1210,7 +1211,7 @@ static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock, ...@@ -1210,7 +1211,7 @@ static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock,
/* Consume received message (optional) */ /* Consume received message (optional) */
if (likely(!(flags & MSG_PEEK))) { if (likely(!(flags & MSG_PEEK))) {
if (unlikely(++port->conn_unacked >= TIPC_FLOW_CONTROL_WIN)) if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV))
tipc_acknowledge(port->ref, port->conn_unacked); tipc_acknowledge(port->ref, port->conn_unacked);
advance_rx_queue(sk); advance_rx_queue(sk);
} }
...@@ -1416,7 +1417,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) ...@@ -1416,7 +1417,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
} }
/** /**
* backlog_rcv - handle incoming message from backlog queue * tipc_backlog_rcv - handle incoming message from backlog queue
* @sk: socket * @sk: socket
* @buf: message * @buf: message
* *
...@@ -1424,47 +1425,73 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) ...@@ -1424,47 +1425,73 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
* *
* Returns 0 * Returns 0
*/ */
static int backlog_rcv(struct sock *sk, struct sk_buff *buf) static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
{ {
u32 res; u32 res;
struct tipc_sock *tsk = tipc_sk(sk);
res = filter_rcv(sk, buf); res = filter_rcv(sk, buf);
if (res) if (unlikely(res))
tipc_reject_msg(buf, res); tipc_reject_msg(buf, res);
if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
atomic_add(buf->truesize, &tsk->dupl_rcvcnt);
return 0; return 0;
} }
/** /**
* tipc_sk_rcv - handle incoming message * tipc_sk_rcv - handle incoming message
* @sk: socket receiving message * @buf: buffer containing arriving message
* @buf: message * Consumes buffer
* * Returns 0 if success, or errno: -EHOSTUNREACH
* Called with port lock already taken.
*
* Returns TIPC error status code (TIPC_OK if message is not to be rejected)
*/ */
u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf) int tipc_sk_rcv(struct sk_buff *buf)
{ {
u32 res; struct tipc_sock *tsk;
struct tipc_port *port;
struct sock *sk;
u32 dport = msg_destport(buf_msg(buf));
int err = TIPC_OK;
uint limit;
/* /* Forward unresolved named message */
* Process message if socket is unlocked; otherwise add to backlog queue if (unlikely(!dport)) {
* tipc_net_route_msg(buf);
* This code is based on sk_receive_skb(), but must be distinct from it return 0;
* since a TIPC-specific filter/reject mechanism is utilized }
*/
/* Validate destination */
port = tipc_port_lock(dport);
if (unlikely(!port)) {
err = TIPC_ERR_NO_PORT;
goto exit;
}
tsk = tipc_port_to_sock(port);
sk = &tsk->sk;
/* Queue message */
bh_lock_sock(sk); bh_lock_sock(sk);
if (!sock_owned_by_user(sk)) { if (!sock_owned_by_user(sk)) {
res = filter_rcv(sk, buf); err = filter_rcv(sk, buf);
} else { } else {
if (sk_add_backlog(sk, buf, rcvbuf_limit(sk, buf))) if (sk->sk_backlog.len == 0)
res = TIPC_ERR_OVERLOAD; atomic_set(&tsk->dupl_rcvcnt, 0);
else limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt);
res = TIPC_OK; if (sk_add_backlog(sk, buf, limit))
err = TIPC_ERR_OVERLOAD;
} }
bh_unlock_sock(sk); bh_unlock_sock(sk);
tipc_port_unlock(port);
return res; if (likely(!err))
return 0;
exit:
tipc_reject_msg(buf, err);
return -EHOSTUNREACH;
} }
static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
......
...@@ -44,12 +44,14 @@ ...@@ -44,12 +44,14 @@
* @port: port - interacts with 'sk' and with the rest of the TIPC stack * @port: port - interacts with 'sk' and with the rest of the TIPC stack
* @peer_name: the peer of the connection, if any * @peer_name: the peer of the connection, if any
* @conn_timeout: the time we can wait for an unresponded setup request * @conn_timeout: the time we can wait for an unresponded setup request
* @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
*/ */
struct tipc_sock { struct tipc_sock {
struct sock sk; struct sock sk;
struct tipc_port port; struct tipc_port port;
unsigned int conn_timeout; unsigned int conn_timeout;
atomic_t dupl_rcvcnt;
}; };
static inline struct tipc_sock *tipc_sk(const struct sock *sk) static inline struct tipc_sock *tipc_sk(const struct sock *sk)
...@@ -67,6 +69,6 @@ static inline void tipc_sock_wakeup(struct tipc_sock *tsk) ...@@ -67,6 +69,6 @@ static inline void tipc_sock_wakeup(struct tipc_sock *tsk)
tsk->sk.sk_write_space(&tsk->sk); tsk->sk.sk_write_space(&tsk->sk);
} }
u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf); int tipc_sk_rcv(struct sk_buff *buf);
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment