Commit 3008ba5f authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc-obsolete-zone-concept'

Jon Maloy says:

====================
tipc: obsolete zone concept

Functionality related to the 'zone' concept was never implemented in
TIPC. In this series we eliminate the remaining traces of it in the
code, and can hence take a first step in reducing the footprint and
complexity of the binding table.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 4f1aec01 e50e73e1
......@@ -61,50 +61,6 @@ struct tipc_name_seq {
__u32 upper;
};
/* TIPC Address Size, Offset, Mask specification for Z.C.N
*/
#define TIPC_NODE_BITS 12
#define TIPC_CLUSTER_BITS 12
#define TIPC_ZONE_BITS 8
#define TIPC_NODE_OFFSET 0
#define TIPC_CLUSTER_OFFSET TIPC_NODE_BITS
#define TIPC_ZONE_OFFSET (TIPC_CLUSTER_OFFSET + TIPC_CLUSTER_BITS)
#define TIPC_NODE_SIZE ((1UL << TIPC_NODE_BITS) - 1)
#define TIPC_CLUSTER_SIZE ((1UL << TIPC_CLUSTER_BITS) - 1)
#define TIPC_ZONE_SIZE ((1UL << TIPC_ZONE_BITS) - 1)
#define TIPC_NODE_MASK (TIPC_NODE_SIZE << TIPC_NODE_OFFSET)
#define TIPC_CLUSTER_MASK (TIPC_CLUSTER_SIZE << TIPC_CLUSTER_OFFSET)
#define TIPC_ZONE_MASK (TIPC_ZONE_SIZE << TIPC_ZONE_OFFSET)
#define TIPC_ZONE_CLUSTER_MASK (TIPC_ZONE_MASK | TIPC_CLUSTER_MASK)
static inline __u32 tipc_addr(unsigned int zone,
unsigned int cluster,
unsigned int node)
{
return (zone << TIPC_ZONE_OFFSET) |
(cluster << TIPC_CLUSTER_OFFSET) |
node;
}
static inline unsigned int tipc_zone(__u32 addr)
{
return addr >> TIPC_ZONE_OFFSET;
}
static inline unsigned int tipc_cluster(__u32 addr)
{
return (addr & TIPC_CLUSTER_MASK) >> TIPC_CLUSTER_OFFSET;
}
static inline unsigned int tipc_node(__u32 addr)
{
return addr & TIPC_NODE_MASK;
}
/*
* Application-accessible port name types
*/
......@@ -117,9 +73,10 @@ static inline unsigned int tipc_node(__u32 addr)
/*
* Publication scopes when binding port names and port name sequences
*/
#define TIPC_ZONE_SCOPE 1
#define TIPC_CLUSTER_SCOPE 2
#define TIPC_NODE_SCOPE 3
enum tipc_scope {
TIPC_CLUSTER_SCOPE = 2, /* 0 can also be used */
TIPC_NODE_SCOPE = 3
};
/*
* Limiting values for messages
......@@ -243,7 +200,7 @@ struct sockaddr_tipc {
struct tipc_group_req {
__u32 type; /* group id */
__u32 instance; /* member id */
__u32 scope; /* zone/cluster/node */
__u32 scope; /* cluster/node */
__u32 flags;
};
......@@ -268,4 +225,53 @@ struct tipc_sioc_ln_req {
__u32 bearer_id;
char linkname[TIPC_MAX_LINK_NAME];
};
/* The macros and functions below are deprecated:
*/
#define TIPC_ZONE_SCOPE 1
#define TIPC_NODE_BITS 12
#define TIPC_CLUSTER_BITS 12
#define TIPC_ZONE_BITS 8
#define TIPC_NODE_OFFSET 0
#define TIPC_CLUSTER_OFFSET TIPC_NODE_BITS
#define TIPC_ZONE_OFFSET (TIPC_CLUSTER_OFFSET + TIPC_CLUSTER_BITS)
#define TIPC_NODE_SIZE ((1UL << TIPC_NODE_BITS) - 1)
#define TIPC_CLUSTER_SIZE ((1UL << TIPC_CLUSTER_BITS) - 1)
#define TIPC_ZONE_SIZE ((1UL << TIPC_ZONE_BITS) - 1)
#define TIPC_NODE_MASK (TIPC_NODE_SIZE << TIPC_NODE_OFFSET)
#define TIPC_CLUSTER_MASK (TIPC_CLUSTER_SIZE << TIPC_CLUSTER_OFFSET)
#define TIPC_ZONE_MASK (TIPC_ZONE_SIZE << TIPC_ZONE_OFFSET)
#define TIPC_ZONE_CLUSTER_MASK (TIPC_ZONE_MASK | TIPC_CLUSTER_MASK)
static inline __u32 tipc_addr(unsigned int zone,
unsigned int cluster,
unsigned int node)
{
return (zone << TIPC_ZONE_OFFSET) |
(cluster << TIPC_CLUSTER_OFFSET) |
node;
}
static inline unsigned int tipc_zone(__u32 addr)
{
return addr >> TIPC_ZONE_OFFSET;
}
static inline unsigned int tipc_cluster(__u32 addr)
{
return (addr & TIPC_CLUSTER_MASK) >> TIPC_CLUSTER_OFFSET;
}
static inline unsigned int tipc_node(__u32 addr)
{
return addr & TIPC_NODE_MASK;
}
#endif
......@@ -63,23 +63,6 @@ int in_own_node(struct net *net, u32 addr)
return (addr == tn->own_addr) || !addr;
}
/**
* addr_domain - convert 2-bit scope value to equivalent message lookup domain
*
* Needed when address of a named message must be looked up a second time
* after a network hop.
*/
u32 addr_domain(struct net *net, u32 sc)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
if (likely(sc == TIPC_NODE_SCOPE))
return tn->own_addr;
if (sc == TIPC_CLUSTER_SCOPE)
return tipc_cluster_mask(tn->own_addr);
return tipc_zone_mask(tn->own_addr);
}
/**
* tipc_addr_domain_valid - validates a network domain address
*
......@@ -124,20 +107,6 @@ int tipc_in_scope(u32 domain, u32 addr)
return 0;
}
/**
* tipc_addr_scope - convert message lookup domain to a 2-bit scope value
*/
int tipc_addr_scope(u32 domain)
{
if (likely(!domain))
return TIPC_ZONE_SCOPE;
if (tipc_node(domain))
return TIPC_NODE_SCOPE;
if (tipc_cluster(domain))
return TIPC_CLUSTER_SCOPE;
return TIPC_ZONE_SCOPE;
}
char *tipc_addr_string_fill(char *string, u32 addr)
{
snprintf(string, 16, "<%u.%u.%u>",
......
......@@ -60,6 +60,16 @@ static inline u32 tipc_cluster_mask(u32 addr)
return addr & TIPC_ZONE_CLUSTER_MASK;
}
static inline int tipc_node2scope(u32 node)
{
return node ? TIPC_NODE_SCOPE : TIPC_CLUSTER_SCOPE;
}
static inline int tipc_scope2node(struct net *net, int sc)
{
return sc != TIPC_NODE_SCOPE ? 0 : tipc_own_addr(net);
}
u32 tipc_own_addr(struct net *net);
int in_own_cluster(struct net *net, u32 addr);
int in_own_cluster_exact(struct net *net, u32 addr);
......
......@@ -131,6 +131,11 @@ static inline struct list_head *tipc_nodes(struct net *net)
return &tipc_net(net)->node_list;
}
static inline struct name_table *tipc_name_table(struct net *net)
{
return tipc_net(net)->nametbl;
}
static inline struct tipc_topsrv *tipc_topsrv(struct net *net)
{
return tipc_net(net)->topsrv;
......
......@@ -580,7 +580,7 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err)
msg = buf_msg(skb);
if (msg_reroute_cnt(msg))
return false;
dnode = addr_domain(net, msg_lookup_scope(msg));
dnode = tipc_scope2node(net, msg_lookup_scope(msg));
dport = tipc_nametbl_translate(net, msg_nametype(msg),
msg_nameinst(msg), &dnode);
if (!dport)
......
......@@ -56,7 +56,7 @@ static void publ_to_item(struct distr_item *i, struct publication *p)
i->type = htonl(p->type);
i->lower = htonl(p->lower);
i->upper = htonl(p->upper);
i->ref = htonl(p->ref);
i->port = htonl(p->port);
i->key = htonl(p->key);
}
......@@ -86,25 +86,25 @@ static struct sk_buff *named_prepare_buf(struct net *net, u32 type, u32 size,
*/
struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct sk_buff *buf;
struct name_table *nt = tipc_name_table(net);
struct distr_item *item;
struct sk_buff *skb;
list_add_tail_rcu(&publ->local_list,
&tn->nametbl->publ_list[publ->scope]);
if (publ->scope == TIPC_NODE_SCOPE)
if (publ->scope == TIPC_NODE_SCOPE) {
list_add_tail_rcu(&publ->binding_node, &nt->node_scope);
return NULL;
}
list_add_tail_rcu(&publ->binding_node, &nt->cluster_scope);
buf = named_prepare_buf(net, PUBLICATION, ITEM_SIZE, 0);
if (!buf) {
skb = named_prepare_buf(net, PUBLICATION, ITEM_SIZE, 0);
if (!skb) {
pr_warn("Publication distribution failure\n");
return NULL;
}
item = (struct distr_item *)msg_data(buf_msg(buf));
item = (struct distr_item *)msg_data(buf_msg(skb));
publ_to_item(item, publ);
return buf;
return skb;
}
/**
......@@ -115,7 +115,7 @@ struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ)
struct sk_buff *buf;
struct distr_item *item;
list_del(&publ->local_list);
list_del(&publ->binding_node);
if (publ->scope == TIPC_NODE_SCOPE)
return NULL;
......@@ -147,7 +147,7 @@ static void named_distribute(struct net *net, struct sk_buff_head *list,
ITEM_SIZE) * ITEM_SIZE;
u32 msg_rem = msg_dsz;
list_for_each_entry(publ, pls, local_list) {
list_for_each_entry(publ, pls, binding_node) {
/* Prepare next buffer: */
if (!skb) {
skb = named_prepare_buf(net, PUBLICATION, msg_rem,
......@@ -184,16 +184,13 @@ static void named_distribute(struct net *net, struct sk_buff_head *list,
*/
void tipc_named_node_up(struct net *net, u32 dnode)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct name_table *nt = tipc_name_table(net);
struct sk_buff_head head;
__skb_queue_head_init(&head);
rcu_read_lock();
named_distribute(net, &head, dnode,
&tn->nametbl->publ_list[TIPC_CLUSTER_SCOPE]);
named_distribute(net, &head, dnode,
&tn->nametbl->publ_list[TIPC_ZONE_SCOPE]);
named_distribute(net, &head, dnode, &nt->cluster_scope);
rcu_read_unlock();
tipc_node_xmit(net, &head, dnode, 0);
......@@ -212,15 +209,15 @@ static void tipc_publ_purge(struct net *net, struct publication *publ, u32 addr)
spin_lock_bh(&tn->nametbl_lock);
p = tipc_nametbl_remove_publ(net, publ->type, publ->lower,
publ->node, publ->ref, publ->key);
publ->node, publ->port, publ->key);
if (p)
tipc_node_unsubscribe(net, &p->nodesub_list, addr);
tipc_node_unsubscribe(net, &p->binding_node, addr);
spin_unlock_bh(&tn->nametbl_lock);
if (p != publ) {
pr_err("Unable to remove publication from failed node\n"
" (type=%u, lower=%u, node=0x%x, ref=%u, key=%u)\n",
publ->type, publ->lower, publ->node, publ->ref,
" (type=%u, lower=%u, node=0x%x, port=%u, key=%u)\n",
publ->type, publ->lower, publ->node, publ->port,
publ->key);
}
......@@ -249,7 +246,7 @@ void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr)
{
struct publication *publ, *tmp;
list_for_each_entry_safe(publ, tmp, nsub_list, nodesub_list)
list_for_each_entry_safe(publ, tmp, nsub_list, binding_node)
tipc_publ_purge(net, publ, addr);
tipc_dist_queue_purge(net, addr);
}
......@@ -271,18 +268,18 @@ static bool tipc_update_nametbl(struct net *net, struct distr_item *i,
ntohl(i->lower),
ntohl(i->upper),
TIPC_CLUSTER_SCOPE, node,
ntohl(i->ref), ntohl(i->key));
ntohl(i->port), ntohl(i->key));
if (publ) {
tipc_node_subscribe(net, &publ->nodesub_list, node);
tipc_node_subscribe(net, &publ->binding_node, node);
return true;
}
} else if (dtype == WITHDRAWAL) {
publ = tipc_nametbl_remove_publ(net, ntohl(i->type),
ntohl(i->lower),
node, ntohl(i->ref),
node, ntohl(i->port),
ntohl(i->key));
if (publ) {
tipc_node_unsubscribe(net, &publ->nodesub_list, node);
tipc_node_unsubscribe(net, &publ->binding_node, node);
kfree_rcu(publ, rcu);
return true;
}
......@@ -382,15 +379,15 @@ void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
*/
void tipc_named_reinit(struct net *net)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct name_table *nt = tipc_name_table(net);
struct tipc_net *tn = tipc_net(net);
struct publication *publ;
int scope;
spin_lock_bh(&tn->nametbl_lock);
for (scope = TIPC_ZONE_SCOPE; scope <= TIPC_NODE_SCOPE; scope++)
list_for_each_entry_rcu(publ, &tn->nametbl->publ_list[scope],
local_list)
list_for_each_entry_rcu(publ, &nt->node_scope, binding_node)
publ->node = tn->own_addr;
list_for_each_entry_rcu(publ, &nt->cluster_scope, binding_node)
publ->node = tn->own_addr;
spin_unlock_bh(&tn->nametbl_lock);
......
......@@ -63,7 +63,7 @@ struct distr_item {
__be32 type;
__be32 lower;
__be32 upper;
__be32 ref;
__be32 port;
__be32 key;
};
......
/*
* net/tipc/name_table.c: TIPC name table code
*
* Copyright (c) 2000-2006, 2014-2015, Ericsson AB
* Copyright (c) 2000-2006, 2014-2018, Ericsson AB
* Copyright (c) 2004-2008, 2010-2014, Wind River Systems
* All rights reserved.
*
......@@ -50,24 +50,12 @@
/**
* struct name_info - name sequence publication info
* @node_list: circular list of publications made by own node
* @cluster_list: circular list of publications made by own cluster
* @zone_list: circular list of publications made by own zone
* @node_list_size: number of entries in "node_list"
* @cluster_list_size: number of entries in "cluster_list"
* @zone_list_size: number of entries in "zone_list"
*
* Note: The zone list always contains at least one entry, since all
* publications of the associated name sequence belong to it.
* (The cluster and node lists may be empty.)
* @node_list: list of publications on own node of this <type,lower,upper>
* @all_publ: list of all publications of this <type,lower,upper>
*/
struct name_info {
struct list_head node_list;
struct list_head cluster_list;
struct list_head zone_list;
u32 node_list_size;
u32 cluster_list_size;
u32 zone_list_size;
struct list_head local_publ;
struct list_head all_publ;
};
/**
......@@ -114,7 +102,7 @@ static int hash(int x)
* publ_create - create a publication structure
*/
static struct publication *publ_create(u32 type, u32 lower, u32 upper,
u32 scope, u32 node, u32 port_ref,
u32 scope, u32 node, u32 port,
u32 key)
{
struct publication *publ = kzalloc(sizeof(*publ), GFP_ATOMIC);
......@@ -128,9 +116,9 @@ static struct publication *publ_create(u32 type, u32 lower, u32 upper,
publ->upper = upper;
publ->scope = scope;
publ->node = node;
publ->ref = port_ref;
publ->port = port;
publ->key = key;
INIT_LIST_HEAD(&publ->pport_list);
INIT_LIST_HEAD(&publ->binding_sock);
return publ;
}
......@@ -249,9 +237,9 @@ static struct publication *tipc_nameseq_insert_publ(struct net *net,
info = sseq->info;
/* Check if an identical publication already exists */
list_for_each_entry(publ, &info->zone_list, zone_list) {
if ((publ->ref == port) && (publ->key == key) &&
(!publ->node || (publ->node == node)))
list_for_each_entry(publ, &info->all_publ, all_publ) {
if (publ->port == port && publ->key == key &&
(!publ->node || publ->node == node))
return NULL;
}
} else {
......@@ -290,9 +278,8 @@ static struct publication *tipc_nameseq_insert_publ(struct net *net,
return NULL;
}
INIT_LIST_HEAD(&info->node_list);
INIT_LIST_HEAD(&info->cluster_list);
INIT_LIST_HEAD(&info->zone_list);
INIT_LIST_HEAD(&info->local_publ);
INIT_LIST_HEAD(&info->all_publ);
/* Insert new sub-sequence */
sseq = &nseq->sseqs[inspos];
......@@ -311,23 +298,15 @@ static struct publication *tipc_nameseq_insert_publ(struct net *net,
if (!publ)
return NULL;
list_add(&publ->zone_list, &info->zone_list);
info->zone_list_size++;
if (in_own_cluster(net, node)) {
list_add(&publ->cluster_list, &info->cluster_list);
info->cluster_list_size++;
}
list_add(&publ->all_publ, &info->all_publ);
if (in_own_node(net, node)) {
list_add(&publ->node_list, &info->node_list);
info->node_list_size++;
}
if (in_own_node(net, node))
list_add(&publ->local_publ, &info->local_publ);
/* Any subscriptions waiting for notification? */
list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
tipc_sub_report_overlap(s, publ->lower, publ->upper,
TIPC_PUBLISHED, publ->ref,
TIPC_PUBLISHED, publ->port,
publ->node, publ->scope,
created_subseq);
}
......@@ -348,7 +327,7 @@ static struct publication *tipc_nameseq_insert_publ(struct net *net,
static struct publication *tipc_nameseq_remove_publ(struct net *net,
struct name_seq *nseq,
u32 inst, u32 node,
u32 ref, u32 key)
u32 port, u32 key)
{
struct publication *publ;
struct sub_seq *sseq = nameseq_find_subseq(nseq, inst);
......@@ -363,32 +342,20 @@ static struct publication *tipc_nameseq_remove_publ(struct net *net,
info = sseq->info;
/* Locate publication, if it exists */
list_for_each_entry(publ, &info->zone_list, zone_list) {
if ((publ->key == key) && (publ->ref == ref) &&
(!publ->node || (publ->node == node)))
list_for_each_entry(publ, &info->all_publ, all_publ) {
if (publ->key == key && publ->port == port &&
(!publ->node || publ->node == node))
goto found;
}
return NULL;
found:
/* Remove publication from zone scope list */
list_del(&publ->zone_list);
info->zone_list_size--;
/* Remove publication from cluster scope list, if present */
if (in_own_cluster(net, node)) {
list_del(&publ->cluster_list);
info->cluster_list_size--;
}
/* Remove publication from node scope list, if present */
if (in_own_node(net, node)) {
list_del(&publ->node_list);
info->node_list_size--;
}
list_del(&publ->all_publ);
if (in_own_node(net, node))
list_del(&publ->local_publ);
/* Contract subseq list if no more publications for that subseq */
if (list_empty(&info->zone_list)) {
if (list_empty(&info->all_publ)) {
kfree(info);
free = &nseq->sseqs[nseq->first_free--];
memmove(sseq, sseq + 1, (free - (sseq + 1)) * sizeof(*sseq));
......@@ -398,8 +365,9 @@ static struct publication *tipc_nameseq_remove_publ(struct net *net,
/* Notify any waiting subscriptions */
list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
tipc_sub_report_overlap(s, publ->lower, publ->upper,
TIPC_WITHDRAWN, publ->ref, publ->node,
publ->scope, removed_subseq);
TIPC_WITHDRAWN, publ->port,
publ->node, publ->scope,
removed_subseq);
}
return publ;
......@@ -435,11 +403,12 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq,
struct name_info *info = sseq->info;
int must_report = 1;
list_for_each_entry(crs, &info->zone_list, zone_list) {
list_for_each_entry(crs, &info->all_publ, all_publ) {
tipc_sub_report_overlap(sub, sseq->lower,
sseq->upper,
TIPC_PUBLISHED,
crs->ref, crs->node,
crs->port,
crs->node,
crs->scope,
must_report);
must_report = 0;
......@@ -473,8 +442,7 @@ struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type,
struct name_seq *seq = nametbl_find_seq(net, type);
int index = hash(type);
if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE) ||
(lower > upper)) {
if (scope > TIPC_NODE_SCOPE || lower > upper) {
pr_debug("Failed to publish illegal {%u,%u,%u} with scope %u\n",
type, lower, upper, scope);
return NULL;
......@@ -493,7 +461,7 @@ struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type,
}
struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type,
u32 lower, u32 node, u32 ref,
u32 lower, u32 node, u32 port,
u32 key)
{
struct publication *publ;
......@@ -503,7 +471,7 @@ struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type,
return NULL;
spin_lock_bh(&seq->lock);
publ = tipc_nameseq_remove_publ(net, seq, lower, node, ref, key);
publ = tipc_nameseq_remove_publ(net, seq, lower, node, port, key);
if (!seq->first_free && list_empty(&seq->subscriptions)) {
hlist_del_init_rcu(&seq->ns_list);
kfree(seq->sseqs);
......@@ -536,7 +504,7 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance,
struct name_info *info;
struct publication *publ;
struct name_seq *seq;
u32 ref = 0;
u32 port = 0;
u32 node = 0;
if (!tipc_in_scope(*destnode, tn->own_addr))
......@@ -554,54 +522,42 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance,
/* Closest-First Algorithm */
if (likely(!*destnode)) {
if (!list_empty(&info->node_list)) {
publ = list_first_entry(&info->node_list,
struct publication,
node_list);
list_move_tail(&publ->node_list,
&info->node_list);
} else if (!list_empty(&info->cluster_list)) {
publ = list_first_entry(&info->cluster_list,
if (!list_empty(&info->local_publ)) {
publ = list_first_entry(&info->local_publ,
struct publication,
cluster_list);
list_move_tail(&publ->cluster_list,
&info->cluster_list);
local_publ);
list_move_tail(&publ->local_publ,
&info->local_publ);
} else {
publ = list_first_entry(&info->zone_list,
publ = list_first_entry(&info->all_publ,
struct publication,
zone_list);
list_move_tail(&publ->zone_list,
&info->zone_list);
all_publ);
list_move_tail(&publ->all_publ,
&info->all_publ);
}
}
/* Round-Robin Algorithm */
else if (*destnode == tn->own_addr) {
if (list_empty(&info->node_list))
if (list_empty(&info->local_publ))
goto no_match;
publ = list_first_entry(&info->node_list, struct publication,
node_list);
list_move_tail(&publ->node_list, &info->node_list);
} else if (in_own_cluster_exact(net, *destnode)) {
if (list_empty(&info->cluster_list))
goto no_match;
publ = list_first_entry(&info->cluster_list, struct publication,
cluster_list);
list_move_tail(&publ->cluster_list, &info->cluster_list);
publ = list_first_entry(&info->local_publ, struct publication,
local_publ);
list_move_tail(&publ->local_publ, &info->local_publ);
} else {
publ = list_first_entry(&info->zone_list, struct publication,
zone_list);
list_move_tail(&publ->zone_list, &info->zone_list);
publ = list_first_entry(&info->all_publ, struct publication,
all_publ);
list_move_tail(&publ->all_publ, &info->all_publ);
}
ref = publ->ref;
port = publ->port;
node = publ->node;
no_match:
spin_unlock_bh(&seq->lock);
not_found:
rcu_read_unlock();
*destnode = node;
return ref;
return port;
}
bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope,
......@@ -623,16 +579,16 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope,
sseq = nameseq_find_subseq(seq, instance);
if (likely(sseq)) {
info = sseq->info;
list_for_each_entry(publ, &info->zone_list, zone_list) {
list_for_each_entry(publ, &info->all_publ, all_publ) {
if (publ->scope != scope)
continue;
if (publ->ref == exclude && publ->node == self)
if (publ->port == exclude && publ->node == self)
continue;
tipc_dest_push(dsts, publ->node, publ->ref);
tipc_dest_push(dsts, publ->node, publ->port);
(*dstcnt)++;
if (all)
continue;
list_move_tail(&publ->zone_list, &info->zone_list);
list_move_tail(&publ->all_publ, &info->all_publ);
break;
}
}
......@@ -642,7 +598,7 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope,
return !list_empty(dsts);
}
int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
void tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
u32 scope, bool exact, struct list_head *dports)
{
struct sub_seq *sseq_stop;
......@@ -650,7 +606,6 @@ int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
struct publication *p;
struct name_seq *seq;
struct sub_seq *sseq;
int res = 0;
rcu_read_lock();
seq = nametbl_find_seq(net, type);
......@@ -664,18 +619,14 @@ int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
if (sseq->lower > upper)
break;
info = sseq->info;
list_for_each_entry(p, &info->node_list, node_list) {
list_for_each_entry(p, &info->local_publ, local_publ) {
if (p->scope == scope || (!exact && p->scope < scope))
tipc_dest_push(dports, 0, p->ref);
tipc_dest_push(dports, 0, p->port);
}
if (info->cluster_list_size != info->node_list_size)
res = 1;
}
spin_unlock_bh(&seq->lock);
exit:
rcu_read_unlock();
return res;
}
/* tipc_nametbl_lookup_dst_nodes - find broadcast destination nodes
......@@ -700,7 +651,7 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
stop = seq->sseqs + seq->first_free;
for (; sseq != stop && sseq->lower <= upper; sseq++) {
info = sseq->info;
list_for_each_entry(publ, &info->zone_list, zone_list) {
list_for_each_entry(publ, &info->all_publ, all_publ) {
tipc_nlist_add(nodes, publ->node);
}
}
......@@ -729,10 +680,10 @@ void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
stop = seq->sseqs + seq->first_free;
for (; sseq != stop; sseq++) {
info = sseq->info;
list_for_each_entry(p, &info->zone_list, zone_list) {
list_for_each_entry(p, &info->all_publ, all_publ) {
if (p->scope != scope)
continue;
tipc_group_add_member(grp, p->node, p->ref, p->lower);
tipc_group_add_member(grp, p->node, p->port, p->lower);
}
}
spin_unlock_bh(&seq->lock);
......@@ -777,7 +728,7 @@ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
/**
* tipc_nametbl_withdraw - withdraw name publication from network name tables
*/
int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref,
int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 port,
u32 key)
{
struct publication *publ;
......@@ -786,18 +737,18 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref,
spin_lock_bh(&tn->nametbl_lock);
publ = tipc_nametbl_remove_publ(net, type, lower, tn->own_addr,
ref, key);
port, key);
if (likely(publ)) {
tn->nametbl->local_publ_count--;
skb = tipc_named_withdraw(net, publ);
/* Any pending external events? */
tipc_named_process_backlog(net);
list_del_init(&publ->pport_list);
list_del_init(&publ->binding_sock);
kfree_rcu(publ, rcu);
} else {
pr_err("Unable to remove local publication\n"
"(type=%u, lower=%u, ref=%u, key=%u)\n",
type, lower, ref, key);
"(type=%u, lower=%u, port=%u, key=%u)\n",
type, lower, port, key);
}
spin_unlock_bh(&tn->nametbl_lock);
......@@ -879,9 +830,8 @@ int tipc_nametbl_init(struct net *net)
for (i = 0; i < TIPC_NAMETBL_SIZE; i++)
INIT_HLIST_HEAD(&tipc_nametbl->seq_hlist[i]);
INIT_LIST_HEAD(&tipc_nametbl->publ_list[TIPC_ZONE_SCOPE]);
INIT_LIST_HEAD(&tipc_nametbl->publ_list[TIPC_CLUSTER_SCOPE]);
INIT_LIST_HEAD(&tipc_nametbl->publ_list[TIPC_NODE_SCOPE]);
INIT_LIST_HEAD(&tipc_nametbl->node_scope);
INIT_LIST_HEAD(&tipc_nametbl->cluster_scope);
tn->nametbl = tipc_nametbl;
spin_lock_init(&tn->nametbl_lock);
return 0;
......@@ -901,9 +851,9 @@ static void tipc_purge_publications(struct net *net, struct name_seq *seq)
spin_lock_bh(&seq->lock);
sseq = seq->sseqs;
info = sseq->info;
list_for_each_entry_safe(publ, safe, &info->zone_list, zone_list) {
list_for_each_entry_safe(publ, safe, &info->all_publ, all_publ) {
tipc_nameseq_remove_publ(net, seq, publ->lower, publ->node,
publ->ref, publ->key);
publ->port, publ->key);
kfree_rcu(publ, rcu);
}
hlist_del_init_rcu(&seq->ns_list);
......@@ -950,17 +900,17 @@ static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg,
struct publication *p;
if (*last_publ) {
list_for_each_entry(p, &sseq->info->zone_list, zone_list)
list_for_each_entry(p, &sseq->info->all_publ, all_publ)
if (p->key == *last_publ)
break;
if (p->key != *last_publ)
return -EPIPE;
} else {
p = list_first_entry(&sseq->info->zone_list, struct publication,
zone_list);
p = list_first_entry(&sseq->info->all_publ, struct publication,
all_publ);
}
list_for_each_entry_from(p, &sseq->info->zone_list, zone_list) {
list_for_each_entry_from(p, &sseq->info->all_publ, all_publ) {
*last_publ = p->key;
hdr = genlmsg_put(msg->skb, msg->portid, msg->seq,
......@@ -987,7 +937,7 @@ static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg,
goto publ_msg_full;
if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_NODE, p->node))
goto publ_msg_full;
if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_REF, p->ref))
if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_REF, p->port))
goto publ_msg_full;
if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_KEY, p->key))
goto publ_msg_full;
......
/*
* net/tipc/name_table.h: Include file for TIPC name table code
*
* Copyright (c) 2000-2006, 2014-2015, Ericsson AB
* Copyright (c) 2000-2006, 2014-2018, Ericsson AB
* Copyright (c) 2004-2005, 2010-2011, Wind River Systems
* All rights reserved.
*
......@@ -54,19 +54,22 @@ struct tipc_group;
* @type: name sequence type
* @lower: name sequence lower bound
* @upper: name sequence upper bound
* @scope: scope of publication
* @node: network address of publishing port's node
* @ref: publishing port
* @key: publication key
* @nodesub_list: subscription to "node down" event (off-node publication only)
* @local_list: adjacent entries in list of publications made by this node
* @pport_list: adjacent entries in list of publications made by this port
* @node_list: adjacent matching name seq publications with >= node scope
* @cluster_list: adjacent matching name seq publications with >= cluster scope
* @zone_list: adjacent matching name seq publications with >= zone scope
* @scope: scope of publication, TIPC_NODE_SCOPE or TIPC_CLUSTER_SCOPE
* @node: network address of publishing socket's node
* @port: publishing port
* @key: publication key, unique across the cluster
* @binding_node: all publications from the same node which bound this one
* - Remote publications: in node->publ_list
* Used by node/name distr to withdraw publications when node is lost
* - Local/node scope publications: in name_table->node_scope list
* - Local/cluster scope publications: in name_table->cluster_scope list
* @binding_sock: all publications from the same socket which bound this one
* Used by socket to withdraw publications when socket is unbound/released
* @local_publ: list of identical publications made from this node
* Used by closest_first and multicast receive lookup algorithms
* @all_publ: all publications identical to this one, whatever node and scope
* Used by round-robin lookup algorithm
* @rcu: RCU callback head used for deferred freeing
*
* Note that the node list, cluster list, and zone list are circular lists.
*/
struct publication {
u32 type;
......@@ -74,33 +77,36 @@ struct publication {
u32 upper;
u32 scope;
u32 node;
u32 ref;
u32 port;
u32 key;
struct list_head nodesub_list;
struct list_head local_list;
struct list_head pport_list;
struct list_head node_list;
struct list_head cluster_list;
struct list_head zone_list;
struct list_head binding_node;
struct list_head binding_sock;
struct list_head local_publ;
struct list_head all_publ;
struct rcu_head rcu;
};
/**
* struct name_table - table containing all existing port name publications
* @seq_hlist: name sequence hash lists
* @publ_list: pulication lists
* @node_scope: all local publications with node scope
* - used by name_distr during re-init of name table
* @cluster_scope: all local publications with cluster scope
* - used by name_distr to send bulk updates to new nodes
* - used by name_distr during re-init of name table
* @local_publ_count: number of publications issued by this node
*/
struct name_table {
struct hlist_head seq_hlist[TIPC_NAMETBL_SIZE];
struct list_head publ_list[TIPC_PUBL_SCOPE_NUM];
struct list_head node_scope;
struct list_head cluster_scope;
u32 local_publ_count;
};
int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node);
int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
void tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
u32 scope, bool exact, struct list_head *dports);
void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
u32 type, u32 domain);
......
......@@ -118,7 +118,7 @@ int tipc_net_start(struct net *net, u32 addr)
tipc_sk_reinit(net);
tipc_nametbl_publish(net, TIPC_CFG_SRV, tn->own_addr, tn->own_addr,
TIPC_ZONE_SCOPE, 0, tn->own_addr);
TIPC_CLUSTER_SCOPE, 0, tn->own_addr);
pr_info("Started in network mode\n");
pr_info("Own node address %s, network identity %u\n",
......
......@@ -644,7 +644,7 @@ static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
goto exit;
}
res = (addr->scope > 0) ?
res = (addr->scope >= 0) ?
tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
exit:
......@@ -1280,8 +1280,8 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
struct tipc_msg *hdr = &tsk->phdr;
struct tipc_name_seq *seq;
struct sk_buff_head pkts;
u32 type, inst, domain;
u32 dnode, dport;
u32 type, inst;
int mtu, rc;
if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
......@@ -1332,13 +1332,12 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
if (dest->addrtype == TIPC_ADDR_NAME) {
type = dest->addr.name.name.type;
inst = dest->addr.name.name.instance;
domain = dest->addr.name.domain;
dnode = domain;
dnode = dest->addr.name.domain;
msg_set_type(hdr, TIPC_NAMED_MSG);
msg_set_hdr_sz(hdr, NAMED_H_SIZE);
msg_set_nametype(hdr, type);
msg_set_nameinst(hdr, inst);
msg_set_lookup_scope(hdr, tipc_addr_scope(domain));
msg_set_lookup_scope(hdr, tipc_node2scope(dnode));
dport = tipc_nametbl_translate(net, type, inst, &dnode);
msg_set_destnode(hdr, dnode);
msg_set_destport(hdr, dport);
......@@ -2592,6 +2591,9 @@ static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
struct publication *publ;
u32 key;
if (scope != TIPC_NODE_SCOPE)
scope = TIPC_CLUSTER_SCOPE;
if (tipc_sk_connected(sk))
return -EINVAL;
key = tsk->portid + tsk->pub_count + 1;
......@@ -2603,7 +2605,7 @@ static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
if (unlikely(!publ))
return -EINVAL;
list_add(&publ->pport_list, &tsk->publications);
list_add(&publ->binding_sock, &tsk->publications);
tsk->pub_count++;
tsk->published = 1;
return 0;
......@@ -2617,7 +2619,10 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
struct publication *safe;
int rc = -EINVAL;
list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) {
if (scope != TIPC_NODE_SCOPE)
scope = TIPC_CLUSTER_SCOPE;
list_for_each_entry_safe(publ, safe, &tsk->publications, binding_sock) {
if (seq) {
if (publ->scope != scope)
continue;
......@@ -2628,12 +2633,12 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
if (publ->upper != seq->upper)
break;
tipc_nametbl_withdraw(net, publ->type, publ->lower,
publ->ref, publ->key);
publ->port, publ->key);
rc = 0;
break;
}
tipc_nametbl_withdraw(net, publ->type, publ->lower,
publ->ref, publ->key);
publ->port, publ->key);
rc = 0;
}
if (list_empty(&tsk->publications))
......@@ -3287,7 +3292,7 @@ static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
struct publication *p;
if (*last_publ) {
list_for_each_entry(p, &tsk->publications, pport_list) {
list_for_each_entry(p, &tsk->publications, binding_sock) {
if (p->key == *last_publ)
break;
}
......@@ -3304,10 +3309,10 @@ static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
}
} else {
p = list_first_entry(&tsk->publications, struct publication,
pport_list);
binding_sock);
}
list_for_each_entry_from(p, &tsk->publications, pport_list) {
list_for_each_entry_from(p, &tsk->publications, binding_sock) {
err = __tipc_nl_add_sk_publ(skb, cb, p);
if (err) {
*last_publ = p->key;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment