Commit 27bd9ec0 authored by Jon Maloy's avatar Jon Maloy Committed by David S. Miller

tipc: introduce group unicast messaging

We now make it possible to send connectionless unicast messages
within a communication group. To send a message, the sender can use
either a direct port address, aka port identity, or an indirect port
name to be looked up.

This type of messages are subject to the same start synchronization
and flow control mechanism as group broadcast messages.
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Acked-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent b7d42635
...@@ -186,6 +186,17 @@ struct tipc_member *tipc_group_find_member(struct tipc_group *grp, ...@@ -186,6 +186,17 @@ struct tipc_member *tipc_group_find_member(struct tipc_group *grp,
return NULL; return NULL;
} }
static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp,
u32 node, u32 port)
{
struct tipc_member *m;
m = tipc_group_find_member(grp, node, port);
if (m && tipc_group_is_enabled(m))
return m;
return NULL;
}
static struct tipc_member *tipc_group_find_node(struct tipc_group *grp, static struct tipc_member *tipc_group_find_node(struct tipc_group *grp,
u32 node) u32 node)
{ {
...@@ -318,9 +329,39 @@ void tipc_group_update_bc_members(struct tipc_group *grp, int len) ...@@ -318,9 +329,39 @@ void tipc_group_update_bc_members(struct tipc_group *grp, int len)
grp->bc_snd_nxt++; grp->bc_snd_nxt++;
} }
bool tipc_group_bc_cong(struct tipc_group *grp, int len) bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
int len, struct tipc_member **mbr)
{ {
struct sk_buff_head xmitq;
struct tipc_member *m; struct tipc_member *m;
int adv, state;
m = tipc_group_find_dest(grp, dnode, dport);
*mbr = m;
if (!m)
return false;
if (m->usr_pending)
return true;
if (m->window >= len)
return false;
m->usr_pending = true;
/* If not fully advertised, do it now to prevent mutual blocking */
adv = m->advertised;
state = m->state;
if (state < MBR_JOINED)
return true;
if (state == MBR_JOINED && adv == ADV_IDLE)
return true;
skb_queue_head_init(&xmitq);
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
tipc_node_distr_xmit(grp->net, &xmitq);
return true;
}
bool tipc_group_bc_cong(struct tipc_group *grp, int len)
{
struct tipc_member *m = NULL;
if (list_empty(&grp->congested)) if (list_empty(&grp->congested))
return false; return false;
...@@ -329,7 +370,7 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len) ...@@ -329,7 +370,7 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len)
if (m->window >= len) if (m->window >= len)
return false; return false;
return true; return tipc_group_cong(grp, m->node, m->port, len, &m);
} }
/* tipc_group_filter_msg() - determine if we should accept arriving message /* tipc_group_filter_msg() - determine if we should accept arriving message
......
...@@ -61,9 +61,12 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup, ...@@ -61,9 +61,12 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup,
struct sk_buff_head *inputq, struct sk_buff_head *inputq,
struct sk_buff_head *xmitq); struct sk_buff_head *xmitq);
void tipc_group_update_bc_members(struct tipc_group *grp, int len); void tipc_group_update_bc_members(struct tipc_group *grp, int len);
bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
int len, struct tipc_member **m);
bool tipc_group_bc_cong(struct tipc_group *grp, int len); bool tipc_group_bc_cong(struct tipc_group *grp, int len);
void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
u32 port, struct sk_buff_head *xmitq); u32 port, struct sk_buff_head *xmitq);
u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
void tipc_group_update_member(struct tipc_member *m, int len);
int tipc_group_size(struct tipc_group *grp); int tipc_group_size(struct tipc_group *grp);
#endif #endif
...@@ -67,6 +67,7 @@ struct plist; ...@@ -67,6 +67,7 @@ struct plist;
#define TIPC_DIRECT_MSG 3 #define TIPC_DIRECT_MSG 3
#define TIPC_GRP_MEMBER_EVT 4 #define TIPC_GRP_MEMBER_EVT 4
#define TIPC_GRP_BCAST_MSG 5 #define TIPC_GRP_BCAST_MSG 5
#define TIPC_GRP_UCAST_MSG 6
/* /*
* Internal message users * Internal message users
...@@ -261,7 +262,7 @@ static inline int msg_in_group(struct tipc_msg *m) ...@@ -261,7 +262,7 @@ static inline int msg_in_group(struct tipc_msg *m)
{ {
int mtyp = msg_type(m); int mtyp = msg_type(m);
return (mtyp == TIPC_GRP_BCAST_MSG) || (mtyp == TIPC_GRP_MEMBER_EVT); return mtyp >= TIPC_GRP_MEMBER_EVT && mtyp <= TIPC_GRP_UCAST_MSG;
} }
static inline bool msg_is_grp_evt(struct tipc_msg *m) static inline bool msg_is_grp_evt(struct tipc_msg *m)
......
...@@ -817,6 +817,93 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, ...@@ -817,6 +817,93 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
return rc ? rc : dlen; return rc ? rc : dlen;
} }
/**
* tipc_send_group_msg - send a message to a member in the group
* @net: network namespace
* @m: message to send
* @mb: group member
* @dnode: destination node
* @dport: destination port
* @dlen: total length of message data
*/
static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
struct msghdr *m, struct tipc_member *mb,
u32 dnode, u32 dport, int dlen)
{
int blks = tsk_blocks(GROUP_H_SIZE + dlen);
struct tipc_msg *hdr = &tsk->phdr;
struct sk_buff_head pkts;
int mtu, rc;
/* Complete message header */
msg_set_type(hdr, TIPC_GRP_UCAST_MSG);
msg_set_hdr_sz(hdr, GROUP_H_SIZE);
msg_set_destport(hdr, dport);
msg_set_destnode(hdr, dnode);
/* Build message as chain of buffers */
skb_queue_head_init(&pkts);
mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
if (unlikely(rc != dlen))
return rc;
/* Send message */
rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
if (unlikely(rc == -ELINKCONG)) {
tipc_dest_push(&tsk->cong_links, dnode, 0);
tsk->cong_link_cnt++;
}
/* Update send window and sequence number */
tipc_group_update_member(mb, blks);
return dlen;
}
/**
* tipc_send_group_unicast - send message to a member in the group
* @sock: socket structure
* @m: message to send
* @dlen: total length of message data
* @timeout: timeout to wait for wakeup
*
* Called from function tipc_sendmsg(), which has done all sanity checks
* Returns the number of bytes sent on success, or errno
*/
static int tipc_send_group_unicast(struct socket *sock, struct msghdr *m,
int dlen, long timeout)
{
struct sock *sk = sock->sk;
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
int blks = tsk_blocks(GROUP_H_SIZE + dlen);
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_group *grp = tsk->group;
struct net *net = sock_net(sk);
struct tipc_member *mb = NULL;
u32 node, port;
int rc;
node = dest->addr.id.node;
port = dest->addr.id.ref;
if (!port && !node)
return -EHOSTUNREACH;
/* Block or return if destination link or member is congested */
rc = tipc_wait_for_cond(sock, &timeout,
!tipc_dest_find(&tsk->cong_links, node, 0) &&
!tipc_group_cong(grp, node, port, blks, &mb));
if (unlikely(rc))
return rc;
if (unlikely(!mb))
return -EHOSTUNREACH;
rc = tipc_send_group_msg(net, tsk, m, mb, node, port, dlen);
return rc ? rc : dlen;
}
/** /**
* tipc_send_group_bcast - send message to all members in communication group * tipc_send_group_bcast - send message to all members in communication group
* @sk: socket structure * @sk: socket structure
...@@ -1030,8 +1117,20 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) ...@@ -1030,8 +1117,20 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
return -EMSGSIZE; return -EMSGSIZE;
if (unlikely(grp && !dest)) if (likely(dest)) {
if (unlikely(m->msg_namelen < sizeof(*dest)))
return -EINVAL;
if (unlikely(dest->family != AF_TIPC))
return -EINVAL;
}
if (grp) {
if (!dest)
return tipc_send_group_bcast(sock, m, dlen, timeout); return tipc_send_group_bcast(sock, m, dlen, timeout);
if (dest->addrtype == TIPC_ADDR_ID)
return tipc_send_group_unicast(sock, m, dlen, timeout);
return -EINVAL;
}
if (unlikely(!dest)) { if (unlikely(!dest)) {
dest = &tsk->peer; dest = &tsk->peer;
...@@ -1039,12 +1138,6 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) ...@@ -1039,12 +1138,6 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
return -EDESTADDRREQ; return -EDESTADDRREQ;
} }
if (unlikely(m->msg_namelen < sizeof(*dest)))
return -EINVAL;
if (unlikely(dest->family != AF_TIPC))
return -EINVAL;
if (unlikely(syn)) { if (unlikely(syn)) {
if (sk->sk_state == TIPC_LISTEN) if (sk->sk_state == TIPC_LISTEN)
return -EPIPE; return -EPIPE;
...@@ -1077,7 +1170,6 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) ...@@ -1077,7 +1170,6 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
msg_set_destport(hdr, dport); msg_set_destport(hdr, dport);
if (unlikely(!dport && !dnode)) if (unlikely(!dport && !dnode))
return -EHOSTUNREACH; return -EHOSTUNREACH;
} else if (dest->addrtype == TIPC_ADDR_ID) { } else if (dest->addrtype == TIPC_ADDR_ID) {
dnode = dest->addr.id.node; dnode = dest->addr.id.node;
msg_set_type(hdr, TIPC_DIRECT_MSG); msg_set_type(hdr, TIPC_DIRECT_MSG);
...@@ -1846,7 +1938,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, ...@@ -1846,7 +1938,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
if (unlikely(!msg_isdata(hdr))) if (unlikely(!msg_isdata(hdr)))
tipc_sk_proto_rcv(sk, &inputq, xmitq); tipc_sk_proto_rcv(sk, &inputq, xmitq);
else if (unlikely(msg_type(hdr) > TIPC_GRP_BCAST_MSG)) else if (unlikely(msg_type(hdr) > TIPC_GRP_UCAST_MSG))
return kfree_skb(skb); return kfree_skb(skb);
if (unlikely(grp)) if (unlikely(grp))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment