Commit d12d2e12 authored by Jon Maloy, committed by David S. Miller

tipc: send out join messages as soon as new member is discovered

When a socket is joining a group, we look up the binding table to find
out whether other members of the group are already present. This makes
it possible to return EAGAIN instead of EHOSTUNREACH if the user
proceeds directly to a send attempt.

However, the information in the binding table can be used to directly
set the created member in state MBR_PUBLISHED and send a JOIN message
to the peer, instead of waiting for a topology PUBLISH event to do this.
When there are many members in a group, the propagation time for such
events can be significant, and we can save time during the join
operation if we use the initial lookup result fully.

In this commit, we eliminate the member state MBR_DISCOVERED, which has
been the result of the initial lookup, and instead go directly to
MBR_PUBLISHED, which initiates the setup.

After this change, the tipc_member FSM looks as follows:

     +-----------+
---->| PUBLISHED |-----------------------------------------------+
PUB- +-----------+                                LEAVE/WITHDRAW |
LISH       |JOIN                                                 |
           |     +-------------------------------------------+   |
           |     |                            LEAVE/WITHDRAW |   |
           |     |                +------------+             |   |
           |     |   +----------->|  PENDING   |---------+   |   |
           |     |   |msg/maxactv +-+---+------+  LEAVE/ |   |   |
           |     |   |              |   |       WITHDRAW |   |   |
           |     |   |   +----------+   |                |   |   |
           |     |   |   |revert/maxactv|                |   |   |
           |     |   |   V              V                V   V   V
           |   +----------+  msg  +------------+       +-----------+
           +-->|  JOINED  |------>|   ACTIVE   |------>|  LEAVING  |--->
           |   +----------+       +-----+------+ LEAVE/+-----------+DOWN
           |        A   A               |      WITHDRAW A   A    A   EVT
           |        |   |               |RECLAIM        |   |    |
           |        |   |REMIT          V               |   |    |
           |        |   |== adv   +------------+        |   |    |
           |        |   +---------| RECLAIMING |--------+   |    |
           |        |             +-----+------+  LEAVE/    |    |
           |        |                   |REMIT   WITHDRAW   |    |
           |        |                   |< adv              |    |
           |        |msg/               V            LEAVE/ |    |
           |        |adv==ADV_IDLE+------------+   WITHDRAW |    |
           |        +-------------|  REMITTED  |------------+    |
           |                      +------------+                 |
           |PUBLISH                                              |
JOIN +-----------+                                LEAVE/WITHDRAW |
---->|  JOINING  |-----------------------------------------------+
     +-----------+
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
parent c2b22bcf
...@@ -49,7 +49,6 @@ ...@@ -49,7 +49,6 @@
#define ADV_ACTIVE (ADV_UNIT * 12) #define ADV_ACTIVE (ADV_UNIT * 12)
enum mbr_state { enum mbr_state {
MBR_DISCOVERED,
MBR_JOINING, MBR_JOINING,
MBR_PUBLISHED, MBR_PUBLISHED,
MBR_JOINED, MBR_JOINED,
...@@ -141,7 +140,7 @@ static bool tipc_group_is_receiver(struct tipc_member *m) ...@@ -141,7 +140,7 @@ static bool tipc_group_is_receiver(struct tipc_member *m)
static bool tipc_group_is_sender(struct tipc_member *m) static bool tipc_group_is_sender(struct tipc_member *m)
{ {
return m && m->state >= MBR_JOINED; return m && m->state != MBR_JOINING && m->state != MBR_PUBLISHED;
} }
u32 tipc_group_exclude(struct tipc_group *grp) u32 tipc_group_exclude(struct tipc_group *grp)
...@@ -184,6 +183,21 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid, ...@@ -184,6 +183,21 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
return NULL; return NULL;
} }
void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf)
{
struct rb_root *tree = &grp->members;
struct tipc_member *m, *tmp;
struct sk_buff_head xmitq;
skb_queue_head_init(&xmitq);
rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq);
tipc_group_update_member(m, 0);
}
tipc_node_distr_xmit(net, &xmitq);
*sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
}
void tipc_group_delete(struct net *net, struct tipc_group *grp) void tipc_group_delete(struct net *net, struct tipc_group *grp)
{ {
struct rb_root *tree = &grp->members; struct rb_root *tree = &grp->members;
...@@ -274,7 +288,7 @@ static void tipc_group_add_to_tree(struct tipc_group *grp, ...@@ -274,7 +288,7 @@ static void tipc_group_add_to_tree(struct tipc_group *grp,
static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
u32 node, u32 port, u32 node, u32 port,
int state) u32 instance, int state)
{ {
struct tipc_member *m; struct tipc_member *m;
...@@ -287,6 +301,7 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, ...@@ -287,6 +301,7 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
m->group = grp; m->group = grp;
m->node = node; m->node = node;
m->port = port; m->port = port;
m->instance = instance;
m->bc_acked = grp->bc_snd_nxt - 1; m->bc_acked = grp->bc_snd_nxt - 1;
grp->member_cnt++; grp->member_cnt++;
tipc_group_add_to_tree(grp, m); tipc_group_add_to_tree(grp, m);
...@@ -295,9 +310,10 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, ...@@ -295,9 +310,10 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
return m; return m;
} }
void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port) void tipc_group_add_member(struct tipc_group *grp, u32 node,
u32 port, u32 instance)
{ {
tipc_group_create_member(grp, node, port, MBR_DISCOVERED); tipc_group_create_member(grp, node, port, instance, MBR_PUBLISHED);
} }
static void tipc_group_delete_member(struct tipc_group *grp, static void tipc_group_delete_member(struct tipc_group *grp,
...@@ -623,7 +639,6 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, ...@@ -623,7 +639,6 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
break; break;
case MBR_RECLAIMING: case MBR_RECLAIMING:
case MBR_DISCOVERED:
case MBR_JOINING: case MBR_JOINING:
case MBR_LEAVING: case MBR_LEAVING:
default: default:
...@@ -721,26 +736,26 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, ...@@ -721,26 +736,26 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
case GRP_JOIN_MSG: case GRP_JOIN_MSG:
if (!m) if (!m)
m = tipc_group_create_member(grp, node, port, m = tipc_group_create_member(grp, node, port,
MBR_JOINING); 0, MBR_JOINING);
if (!m) if (!m)
return; return;
m->bc_syncpt = msg_grp_bc_syncpt(hdr); m->bc_syncpt = msg_grp_bc_syncpt(hdr);
m->bc_rcv_nxt = m->bc_syncpt; m->bc_rcv_nxt = m->bc_syncpt;
m->window += msg_adv_win(hdr); m->window += msg_adv_win(hdr);
/* Wait until PUBLISH event is received */ /* Wait until PUBLISH event is received if necessary */
if (m->state == MBR_DISCOVERED) { if (m->state != MBR_PUBLISHED)
m->state = MBR_JOINING; return;
} else if (m->state == MBR_PUBLISHED) {
m->state = MBR_JOINED; /* Member can be taken into service */
*usr_wakeup = true; m->state = MBR_JOINED;
m->usr_pending = false; *usr_wakeup = true;
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); m->usr_pending = false;
tipc_group_create_event(grp, m, TIPC_PUBLISHED,
m->bc_syncpt, inputq);
}
list_del_init(&m->small_win); list_del_init(&m->small_win);
tipc_group_update_member(m, 0); tipc_group_update_member(m, 0);
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
tipc_group_create_event(grp, m, TIPC_PUBLISHED,
m->bc_syncpt, inputq);
return; return;
case GRP_LEAVE_MSG: case GRP_LEAVE_MSG:
if (!m) if (!m)
...@@ -844,30 +859,36 @@ void tipc_group_member_evt(struct tipc_group *grp, ...@@ -844,30 +859,36 @@ void tipc_group_member_evt(struct tipc_group *grp,
m = tipc_group_find_member(grp, node, port); m = tipc_group_find_member(grp, node, port);
if (event == TIPC_PUBLISHED) { switch (event) {
if (!m) case TIPC_PUBLISHED:
m = tipc_group_create_member(grp, node, port, /* Send and wait for arrival of JOIN message if necessary */
MBR_DISCOVERED); if (!m) {
if (!m) m = tipc_group_create_member(grp, node, port, instance,
return; MBR_PUBLISHED);
if (!m)
break;
tipc_group_update_member(m, 0);
tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
break;
}
m->instance = instance; if (m->state != MBR_JOINING)
break;
/* Hold back event if JOIN message not yet received */ /* Member can be taken into service */
if (m->state == MBR_DISCOVERED) { m->instance = instance;
m->state = MBR_PUBLISHED; m->state = MBR_JOINED;
} else { *usr_wakeup = true;
tipc_group_create_event(grp, m, TIPC_PUBLISHED, m->usr_pending = false;
m->bc_syncpt, inputq); list_del_init(&m->small_win);
m->state = MBR_JOINED;
*usr_wakeup = true;
m->usr_pending = false;
}
tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
tipc_group_update_member(m, 0); tipc_group_update_member(m, 0);
} else if (event == TIPC_WITHDRAWN) { tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
tipc_group_create_event(grp, m, TIPC_PUBLISHED,
m->bc_syncpt, inputq);
break;
case TIPC_WITHDRAWN:
if (!m) if (!m)
return; break;
*usr_wakeup = true; *usr_wakeup = true;
m->usr_pending = false; m->usr_pending = false;
...@@ -880,6 +901,9 @@ void tipc_group_member_evt(struct tipc_group *grp, ...@@ -880,6 +901,9 @@ void tipc_group_member_evt(struct tipc_group *grp,
if (!tipc_node_is_up(net, node)) if (!tipc_node_is_up(net, node))
tipc_group_create_event(grp, m, TIPC_WITHDRAWN, tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
m->bc_rcv_nxt, inputq); m->bc_rcv_nxt, inputq);
break;
default:
break;
} }
*sk_rcvbuf = tipc_group_rcvbuf_limit(grp); *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
} }
...@@ -44,8 +44,10 @@ struct tipc_msg; ...@@ -44,8 +44,10 @@ struct tipc_msg;
struct tipc_group *tipc_group_create(struct net *net, u32 portid, struct tipc_group *tipc_group_create(struct net *net, u32 portid,
struct tipc_group_req *mreq); struct tipc_group_req *mreq);
void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcv_buf);
void tipc_group_delete(struct net *net, struct tipc_group *grp); void tipc_group_delete(struct net *net, struct tipc_group *grp);
void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port); void tipc_group_add_member(struct tipc_group *grp, u32 node,
u32 port, u32 instance);
struct tipc_nlist *tipc_group_dests(struct tipc_group *grp); struct tipc_nlist *tipc_group_dests(struct tipc_group *grp);
void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
int *scope); int *scope);
......
...@@ -732,7 +732,7 @@ void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, ...@@ -732,7 +732,7 @@ void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
list_for_each_entry(p, &info->zone_list, zone_list) { list_for_each_entry(p, &info->zone_list, zone_list) {
if (!tipc_in_scope(domain, p->node)) if (!tipc_in_scope(domain, p->node))
continue; continue;
tipc_group_add_member(grp, p->node, p->ref); tipc_group_add_member(grp, p->node, p->ref, p->lower);
} }
} }
spin_unlock_bh(&seq->lock); spin_unlock_bh(&seq->lock);
......
...@@ -2757,10 +2757,10 @@ static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq) ...@@ -2757,10 +2757,10 @@ static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
tipc_group_delete(net, grp); tipc_group_delete(net, grp);
tsk->group = NULL; tsk->group = NULL;
} }
/* Eliminate any risk that a broadcast overtakes sent JOINs */
/* Eliminate any risk that a broadcast overtakes the sent JOIN */
tsk->mc_method.rcast = true; tsk->mc_method.rcast = true;
tsk->mc_method.mandatory = true; tsk->mc_method.mandatory = true;
tipc_group_join(net, grp, &tsk->sk.sk_rcvbuf);
return rc; return rc;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment