Commit 01fd12bb authored by Jon Paul Maloy's avatar Jon Paul Maloy Committed by David S. Miller

tipc: make replicast a user selectable option

If the bearer carrying multicast messages supports broadcast, those
messages will be sent to all cluster nodes, irrespective of whether
these nodes host any actual destinations socket or not. This is clearly
wasteful if the cluster is large and there are only a few real
destinations for the message being sent.

In this commit we extend the eligibility of the newly introduced
"replicast" transmit option. We now make it possible for a user to
select which method he wants to be used, either as a mandatory setting
via setsockopt(), or as a relative setting where we let the broadcast
layer decide which method to use based on the ratio between cluster
size and the message's actual number of destination nodes.

In the latter case, a sending socket must stick to a previously
selected method until it enters an idle period of at least 5 seconds.
This eliminates the risk of message reordering caused by method change,
i.e., when changes to cluster size or number of destinations would
otherwise mandate a new method to be used.
Reviewed-by: default avatarParthasarathy Bhuvaragan <parthasarathy.bhuvaragan@ericsson.com>
Acked-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent a853e4c6
/* /*
* include/uapi/linux/tipc.h: Header for TIPC socket interface * include/uapi/linux/tipc.h: Header for TIPC socket interface
* *
* Copyright (c) 2003-2006, Ericsson AB * Copyright (c) 2003-2006, 2015-2016 Ericsson AB
* Copyright (c) 2005, 2010-2011, Wind River Systems * Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved. * All rights reserved.
* *
...@@ -220,7 +220,7 @@ struct sockaddr_tipc { ...@@ -220,7 +220,7 @@ struct sockaddr_tipc {
#define TIPC_DESTNAME 3 /* destination name */ #define TIPC_DESTNAME 3 /* destination name */
/* /*
* TIPC-specific socket option values * TIPC-specific socket option names
*/ */
#define TIPC_IMPORTANCE 127 /* Default: TIPC_LOW_IMPORTANCE */ #define TIPC_IMPORTANCE 127 /* Default: TIPC_LOW_IMPORTANCE */
...@@ -229,6 +229,8 @@ struct sockaddr_tipc { ...@@ -229,6 +229,8 @@ struct sockaddr_tipc {
#define TIPC_CONN_TIMEOUT 130 /* Default: 8000 (ms) */ #define TIPC_CONN_TIMEOUT 130 /* Default: 8000 (ms) */
#define TIPC_NODE_RECVQ_DEPTH 131 /* Default: none (read only) */ #define TIPC_NODE_RECVQ_DEPTH 131 /* Default: none (read only) */
#define TIPC_SOCK_RECVQ_DEPTH 132 /* Default: none (read only) */ #define TIPC_SOCK_RECVQ_DEPTH 132 /* Default: none (read only) */
#define TIPC_MCAST_BROADCAST 133 /* Default: TIPC selects. No arg */
#define TIPC_MCAST_REPLICAST 134 /* Default: TIPC selects. No arg */
/* /*
* Maximum sizes of TIPC bearer-related names (including terminating NULL) * Maximum sizes of TIPC bearer-related names (including terminating NULL)
......
...@@ -54,6 +54,9 @@ const char tipc_bclink_name[] = "broadcast-link"; ...@@ -54,6 +54,9 @@ const char tipc_bclink_name[] = "broadcast-link";
* @dest: array keeping number of reachable destinations per bearer * @dest: array keeping number of reachable destinations per bearer
* @primary_bearer: a bearer having links to all broadcast destinations, if any * @primary_bearer: a bearer having links to all broadcast destinations, if any
* @bcast_support: indicates if primary bearer, if any, supports broadcast * @bcast_support: indicates if primary bearer, if any, supports broadcast
* @rcast_support: indicates if all peer nodes support replicast
* @rc_ratio: dest count as percentage of cluster size where send method changes
* @bc_threshold: calculated drom rc_ratio; if dests > threshold use broadcast
*/ */
struct tipc_bc_base { struct tipc_bc_base {
struct tipc_link *link; struct tipc_link *link;
...@@ -61,6 +64,9 @@ struct tipc_bc_base { ...@@ -61,6 +64,9 @@ struct tipc_bc_base {
int dests[MAX_BEARERS]; int dests[MAX_BEARERS];
int primary_bearer; int primary_bearer;
bool bcast_support; bool bcast_support;
bool rcast_support;
int rc_ratio;
int bc_threshold;
}; };
static struct tipc_bc_base *tipc_bc_base(struct net *net) static struct tipc_bc_base *tipc_bc_base(struct net *net)
...@@ -73,6 +79,19 @@ int tipc_bcast_get_mtu(struct net *net) ...@@ -73,6 +79,19 @@ int tipc_bcast_get_mtu(struct net *net)
return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE; return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE;
} }
void tipc_bcast_disable_rcast(struct net *net)
{
tipc_bc_base(net)->rcast_support = false;
}
static void tipc_bcbase_calc_bc_threshold(struct net *net)
{
struct tipc_bc_base *bb = tipc_bc_base(net);
int cluster_size = tipc_link_bc_peers(tipc_bc_sndlink(net));
bb->bc_threshold = 1 + (cluster_size * bb->rc_ratio / 100);
}
/* tipc_bcbase_select_primary(): find a bearer with links to all destinations, /* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
* if any, and make it primary bearer * if any, and make it primary bearer
*/ */
...@@ -175,6 +194,31 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq) ...@@ -175,6 +194,31 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
__skb_queue_purge(&_xmitq); __skb_queue_purge(&_xmitq);
} }
static void tipc_bcast_select_xmit_method(struct net *net, int dests,
struct tipc_mc_method *method)
{
struct tipc_bc_base *bb = tipc_bc_base(net);
unsigned long exp = method->expires;
/* Broadcast supported by used bearer/bearers? */
if (!bb->bcast_support) {
method->rcast = true;
return;
}
/* Any destinations which don't support replicast ? */
if (!bb->rcast_support) {
method->rcast = false;
return;
}
/* Can current method be changed ? */
method->expires = jiffies + TIPC_METHOD_EXPIRE;
if (method->mandatory || time_before(jiffies, exp))
return;
/* Determine method to use now */
method->rcast = dests <= bb->bc_threshold;
}
/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes /* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
* @net: the applicable net namespace * @net: the applicable net namespace
* @pkts: chain of buffers containing message * @pkts: chain of buffers containing message
...@@ -237,16 +281,16 @@ static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts, ...@@ -237,16 +281,16 @@ static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
* and to identified node local sockets * and to identified node local sockets
* @net: the applicable net namespace * @net: the applicable net namespace
* @pkts: chain of buffers containing message * @pkts: chain of buffers containing message
* @dests: destination nodes for message. Not consumed. * @method: send method to be used
* @dests: destination nodes for message.
* @cong_link_cnt: returns number of encountered congested destination links * @cong_link_cnt: returns number of encountered congested destination links
* @cong_links: returns identities of congested links
* Consumes buffer chain. * Consumes buffer chain.
* Returns 0 if success, otherwise errno * Returns 0 if success, otherwise errno
*/ */
int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
struct tipc_nlist *dests, u16 *cong_link_cnt) struct tipc_mc_method *method, struct tipc_nlist *dests,
u16 *cong_link_cnt)
{ {
struct tipc_bc_base *bb = tipc_bc_base(net);
struct sk_buff_head inputq, localq; struct sk_buff_head inputq, localq;
int rc = 0; int rc = 0;
...@@ -258,9 +302,10 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, ...@@ -258,9 +302,10 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
rc = -ENOMEM; rc = -ENOMEM;
goto exit; goto exit;
} }
/* Send according to determined transmit method */
if (dests->remote) { if (dests->remote) {
if (!bb->bcast_support) tipc_bcast_select_xmit_method(net, dests->remote, method);
if (method->rcast)
rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt); rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
else else
rc = tipc_bcast_xmit(net, pkts, cong_link_cnt); rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
...@@ -269,6 +314,7 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, ...@@ -269,6 +314,7 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
if (dests->local) if (dests->local)
tipc_sk_mcast_rcv(net, &localq, &inputq); tipc_sk_mcast_rcv(net, &localq, &inputq);
exit: exit:
/* This queue should normally be empty by now */
__skb_queue_purge(pkts); __skb_queue_purge(pkts);
return rc; return rc;
} }
...@@ -377,6 +423,7 @@ void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l, ...@@ -377,6 +423,7 @@ void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,
tipc_bcast_lock(net); tipc_bcast_lock(net);
tipc_link_add_bc_peer(snd_l, uc_l, xmitq); tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
tipc_bcbase_select_primary(net); tipc_bcbase_select_primary(net);
tipc_bcbase_calc_bc_threshold(net);
tipc_bcast_unlock(net); tipc_bcast_unlock(net);
} }
...@@ -395,6 +442,7 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l) ...@@ -395,6 +442,7 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
tipc_bcast_lock(net); tipc_bcast_lock(net);
tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq); tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
tipc_bcbase_select_primary(net); tipc_bcbase_select_primary(net);
tipc_bcbase_calc_bc_threshold(net);
tipc_bcast_unlock(net); tipc_bcast_unlock(net);
tipc_bcbase_xmit(net, &xmitq); tipc_bcbase_xmit(net, &xmitq);
...@@ -477,6 +525,8 @@ int tipc_bcast_init(struct net *net) ...@@ -477,6 +525,8 @@ int tipc_bcast_init(struct net *net)
goto enomem; goto enomem;
bb->link = l; bb->link = l;
tn->bcl = l; tn->bcl = l;
bb->rc_ratio = 25;
bb->rcast_support = true;
return 0; return 0;
enomem: enomem:
kfree(bb); kfree(bb);
......
...@@ -46,6 +46,8 @@ struct tipc_nlist; ...@@ -46,6 +46,8 @@ struct tipc_nlist;
struct tipc_nitem; struct tipc_nitem;
extern const char tipc_bclink_name[]; extern const char tipc_bclink_name[];
#define TIPC_METHOD_EXPIRE msecs_to_jiffies(5000)
struct tipc_nlist { struct tipc_nlist {
struct list_head list; struct list_head list;
u32 self; u32 self;
...@@ -58,6 +60,17 @@ void tipc_nlist_purge(struct tipc_nlist *nl); ...@@ -58,6 +60,17 @@ void tipc_nlist_purge(struct tipc_nlist *nl);
void tipc_nlist_add(struct tipc_nlist *nl, u32 node); void tipc_nlist_add(struct tipc_nlist *nl, u32 node);
void tipc_nlist_del(struct tipc_nlist *nl, u32 node); void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
/* Cookie to be used between socket and broadcast layer
* @rcast: replicast (instead of broadcast) was used at previous xmit
* @mandatory: broadcast/replicast indication was set by user
* @expires: re-evaluate non-mandatory transmit method if we are past this
*/
struct tipc_mc_method {
bool rcast;
bool mandatory;
unsigned long expires;
};
int tipc_bcast_init(struct net *net); int tipc_bcast_init(struct net *net);
void tipc_bcast_stop(struct net *net); void tipc_bcast_stop(struct net *net);
void tipc_bcast_add_peer(struct net *net, struct tipc_link *l, void tipc_bcast_add_peer(struct net *net, struct tipc_link *l,
...@@ -66,8 +79,10 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl); ...@@ -66,8 +79,10 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id); void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id); void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
int tipc_bcast_get_mtu(struct net *net); int tipc_bcast_get_mtu(struct net *net);
void tipc_bcast_disable_rcast(struct net *net);
int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
struct tipc_nlist *dests, u16 *cong_link_cnt); struct tipc_mc_method *method, struct tipc_nlist *dests,
u16 *cong_link_cnt);
int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb); int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
struct tipc_msg *hdr); struct tipc_msg *hdr);
......
...@@ -515,6 +515,10 @@ bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer, ...@@ -515,6 +515,10 @@ bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
if (link_is_bc_sndlink(l)) if (link_is_bc_sndlink(l))
l->state = LINK_ESTABLISHED; l->state = LINK_ESTABLISHED;
/* Disable replicast if even a single peer doesn't support it */
if (link_is_bc_rcvlink(l) && !(peer_caps & TIPC_BCAST_RCAST))
tipc_bcast_disable_rcast(net);
return true; return true;
} }
......
...@@ -47,11 +47,13 @@ ...@@ -47,11 +47,13 @@
enum { enum {
TIPC_BCAST_SYNCH = (1 << 1), TIPC_BCAST_SYNCH = (1 << 1),
TIPC_BCAST_STATE_NACK = (1 << 2), TIPC_BCAST_STATE_NACK = (1 << 2),
TIPC_BLOCK_FLOWCTL = (1 << 3) TIPC_BLOCK_FLOWCTL = (1 << 3),
TIPC_BCAST_RCAST = (1 << 4)
}; };
#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \ #define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \
TIPC_BCAST_STATE_NACK | \ TIPC_BCAST_STATE_NACK | \
TIPC_BCAST_RCAST | \
TIPC_BLOCK_FLOWCTL) TIPC_BLOCK_FLOWCTL)
#define INVALID_BEARER_ID -1 #define INVALID_BEARER_ID -1
......
...@@ -79,6 +79,7 @@ enum { ...@@ -79,6 +79,7 @@ enum {
* @rcv_unacked: # messages read by user, but not yet acked back to peer * @rcv_unacked: # messages read by user, but not yet acked back to peer
* @peer: 'connected' peer for dgram/rdm * @peer: 'connected' peer for dgram/rdm
* @node: hash table node * @node: hash table node
* @mc_method: cookie for use between socket and broadcast layer
* @rcu: rcu struct for tipc_sock * @rcu: rcu struct for tipc_sock
*/ */
struct tipc_sock { struct tipc_sock {
...@@ -103,6 +104,7 @@ struct tipc_sock { ...@@ -103,6 +104,7 @@ struct tipc_sock {
u16 rcv_win; u16 rcv_win;
struct sockaddr_tipc peer; struct sockaddr_tipc peer;
struct rhash_head node; struct rhash_head node;
struct tipc_mc_method mc_method;
struct rcu_head rcu; struct rcu_head rcu;
}; };
...@@ -740,6 +742,7 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, ...@@ -740,6 +742,7 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
struct tipc_msg *hdr = &tsk->phdr; struct tipc_msg *hdr = &tsk->phdr;
struct net *net = sock_net(sk); struct net *net = sock_net(sk);
int mtu = tipc_bcast_get_mtu(net); int mtu = tipc_bcast_get_mtu(net);
struct tipc_mc_method *method = &tsk->mc_method;
u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE); u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
struct sk_buff_head pkts; struct sk_buff_head pkts;
struct tipc_nlist dsts; struct tipc_nlist dsts;
...@@ -773,7 +776,7 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, ...@@ -773,7 +776,7 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
/* Send message if build was successful */ /* Send message if build was successful */
if (unlikely(rc == dlen)) if (unlikely(rc == dlen))
rc = tipc_mcast_xmit(net, &pkts, &dsts, rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
&tsk->cong_link_cnt); &tsk->cong_link_cnt);
tipc_nlist_purge(&dsts); tipc_nlist_purge(&dsts);
...@@ -2344,18 +2347,29 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, ...@@ -2344,18 +2347,29 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
{ {
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk); struct tipc_sock *tsk = tipc_sk(sk);
u32 value; u32 value = 0;
int res; int res;
if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM)) if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
return 0; return 0;
if (lvl != SOL_TIPC) if (lvl != SOL_TIPC)
return -ENOPROTOOPT; return -ENOPROTOOPT;
switch (opt) {
case TIPC_IMPORTANCE:
case TIPC_SRC_DROPPABLE:
case TIPC_DEST_DROPPABLE:
case TIPC_CONN_TIMEOUT:
if (ol < sizeof(value)) if (ol < sizeof(value))
return -EINVAL; return -EINVAL;
res = get_user(value, (u32 __user *)ov); res = get_user(value, (u32 __user *)ov);
if (res) if (res)
return res; return res;
break;
default:
if (ov || ol)
return -EINVAL;
}
lock_sock(sk); lock_sock(sk);
...@@ -2376,6 +2390,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, ...@@ -2376,6 +2390,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
tipc_sk(sk)->conn_timeout = value; tipc_sk(sk)->conn_timeout = value;
/* no need to set "res", since already 0 at this point */ /* no need to set "res", since already 0 at this point */
break; break;
case TIPC_MCAST_BROADCAST:
tsk->mc_method.rcast = false;
tsk->mc_method.mandatory = true;
break;
case TIPC_MCAST_REPLICAST:
tsk->mc_method.rcast = true;
tsk->mc_method.mandatory = true;
break;
default: default:
res = -EINVAL; res = -EINVAL;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment