Commit 5405ff6e authored by Jon Paul Maloy's avatar Jon Paul Maloy Committed by David S. Miller

tipc: convert node lock to rwlock

According to the node FSM a node in state SELF_UP_PEER_UP cannot
change state inside a lock context, except when a TUNNEL_PROTOCOL
(SYNCH or FAILOVER) packet arrives. However, the node's individual
links may still change state.

Since each link now is protected by its own spinlock, we finally have
the conditions in place to convert the node spinlock to an rwlock_t.
If the node state and arriving packet type are rigth, we can let the
link directly receive the packet under protection of its own spinlock
and the node lock in read mode. In all other cases we use the node
lock in write mode. This enables full concurrent execution between
parallel links during steady-state traffic situations, i.e., 99+ %
of the time.

This commit implements this change.
Reviewed-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 2312bf61
......@@ -1547,7 +1547,7 @@ static struct tipc_node *tipc_link_find_owner(struct net *net,
*bearer_id = 0;
rcu_read_lock();
list_for_each_entry_rcu(n_ptr, &tn->node_list, list) {
tipc_node_lock(n_ptr);
tipc_node_read_lock(n_ptr);
for (i = 0; i < MAX_BEARERS; i++) {
l_ptr = n_ptr->links[i].link;
if (l_ptr && !strcmp(l_ptr->name, link_name)) {
......@@ -1556,7 +1556,7 @@ static struct tipc_node *tipc_link_find_owner(struct net *net,
break;
}
}
tipc_node_unlock(n_ptr);
tipc_node_read_unlock(n_ptr);
if (found_node)
break;
}
......@@ -1658,7 +1658,7 @@ int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
if (!node)
return -EINVAL;
tipc_node_lock(node);
tipc_node_read_lock(node);
link = node->links[bearer_id].link;
if (!link) {
......@@ -1699,7 +1699,7 @@ int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
}
out:
tipc_node_unlock(node);
tipc_node_read_unlock(node);
return res;
}
......@@ -1898,10 +1898,10 @@ int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)
list_for_each_entry_continue_rcu(node, &tn->node_list,
list) {
tipc_node_lock(node);
tipc_node_read_lock(node);
err = __tipc_nl_add_node_links(net, &msg, node,
&prev_link);
tipc_node_unlock(node);
tipc_node_read_unlock(node);
if (err)
goto out;
......@@ -1913,10 +1913,10 @@ int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)
goto out;
list_for_each_entry_rcu(node, &tn->node_list, list) {
tipc_node_lock(node);
tipc_node_read_lock(node);
err = __tipc_nl_add_node_links(net, &msg, node,
&prev_link);
tipc_node_unlock(node);
tipc_node_read_unlock(node);
if (err)
goto out;
......@@ -1967,16 +1967,16 @@ int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info)
if (!node)
return -EINVAL;
tipc_node_lock(node);
tipc_node_read_lock(node);
link = node->links[bearer_id].link;
if (!link) {
tipc_node_unlock(node);
tipc_node_read_unlock(node);
nlmsg_free(msg.skb);
return -EINVAL;
}
err = __tipc_nl_add_link(net, &msg, link, 0);
tipc_node_unlock(node);
tipc_node_read_unlock(node);
if (err) {
nlmsg_free(msg.skb);
return err;
......@@ -2021,18 +2021,18 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
node = tipc_link_find_owner(net, link_name, &bearer_id);
if (!node)
return -EINVAL;
le = &node->links[bearer_id];
tipc_node_lock(node);
tipc_node_read_lock(node);
spin_lock_bh(&le->lock);
link = le->link;
if (!link) {
tipc_node_unlock(node);
spin_unlock_bh(&le->lock);
tipc_node_read_unlock(node);
return -EINVAL;
}
link_reset_statistics(link);
spin_unlock_bh(&le->lock);
tipc_node_unlock(node);
tipc_node_read_unlock(node);
return 0;
}
......@@ -141,10 +141,63 @@ struct tipc_node *tipc_node_find(struct net *net, u32 addr)
return NULL;
}
void tipc_node_read_lock(struct tipc_node *n)
{
read_lock_bh(&n->lock);
}
void tipc_node_read_unlock(struct tipc_node *n)
{
read_unlock_bh(&n->lock);
}
static void tipc_node_write_lock(struct tipc_node *n)
{
write_lock_bh(&n->lock);
}
static void tipc_node_write_unlock(struct tipc_node *n)
{
struct net *net = n->net;
u32 addr = 0;
u32 flags = n->action_flags;
u32 link_id = 0;
struct list_head *publ_list;
if (likely(!flags)) {
write_unlock_bh(&n->lock);
return;
}
addr = n->addr;
link_id = n->link_id;
publ_list = &n->publ_list;
n->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
write_unlock_bh(&n->lock);
if (flags & TIPC_NOTIFY_NODE_DOWN)
tipc_publ_notify(net, publ_list, addr);
if (flags & TIPC_NOTIFY_NODE_UP)
tipc_named_node_up(net, addr);
if (flags & TIPC_NOTIFY_LINK_UP)
tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
TIPC_NODE_SCOPE, link_id, addr);
if (flags & TIPC_NOTIFY_LINK_DOWN)
tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
link_id, addr);
}
struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_node *n_ptr, *temp_node;
int i;
spin_lock_bh(&tn->node_list_lock);
n_ptr = tipc_node_find(net, addr);
......@@ -159,7 +212,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
n_ptr->net = net;
n_ptr->capabilities = capabilities;
kref_init(&n_ptr->kref);
spin_lock_init(&n_ptr->lock);
rwlock_init(&n_ptr->lock);
INIT_HLIST_NODE(&n_ptr->hash);
INIT_LIST_HEAD(&n_ptr->list);
INIT_LIST_HEAD(&n_ptr->publ_list);
......@@ -168,6 +221,8 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
skb_queue_head_init(&n_ptr->bc_entry.inputq1);
__skb_queue_head_init(&n_ptr->bc_entry.arrvq);
skb_queue_head_init(&n_ptr->bc_entry.inputq2);
for (i = 0; i < MAX_BEARERS; i++)
spin_lock_init(&n_ptr->links[i].lock);
hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
if (n_ptr->addr < temp_node->addr)
......@@ -246,9 +301,9 @@ void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr)
pr_warn("Node subscribe rejected, unknown node 0x%x\n", addr);
return;
}
tipc_node_lock(n);
tipc_node_write_lock(n);
list_add_tail(subscr, &n->publ_list);
tipc_node_unlock(n);
tipc_node_write_unlock(n);
tipc_node_put(n);
}
......@@ -264,9 +319,9 @@ void tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32 addr)
pr_warn("Node unsubscribe rejected, unknown node 0x%x\n", addr);
return;
}
tipc_node_lock(n);
tipc_node_write_lock(n);
list_del_init(subscr);
tipc_node_unlock(n);
tipc_node_write_unlock(n);
tipc_node_put(n);
}
......@@ -293,9 +348,9 @@ int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)
conn->port = port;
conn->peer_port = peer_port;
tipc_node_lock(node);
tipc_node_write_lock(node);
list_add_tail(&conn->list, &node->conn_sks);
tipc_node_unlock(node);
tipc_node_write_unlock(node);
exit:
tipc_node_put(node);
return err;
......@@ -313,14 +368,14 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
if (!node)
return;
tipc_node_lock(node);
tipc_node_write_lock(node);
list_for_each_entry_safe(conn, safe, &node->conn_sks, list) {
if (port != conn->port)
continue;
list_del(&conn->list);
kfree(conn);
}
tipc_node_unlock(node);
tipc_node_write_unlock(node);
tipc_node_put(node);
}
......@@ -337,7 +392,7 @@ static void tipc_node_timeout(unsigned long data)
__skb_queue_head_init(&xmitq);
for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
tipc_node_lock(n);
tipc_node_read_lock(n);
le = &n->links[bearer_id];
spin_lock_bh(&le->lock);
if (le->link) {
......@@ -346,7 +401,7 @@ static void tipc_node_timeout(unsigned long data)
rc = tipc_link_timeout(le->link, &xmitq);
}
spin_unlock_bh(&le->lock);
tipc_node_unlock(n);
tipc_node_read_unlock(n);
tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
if (rc & TIPC_LINK_DOWN_EVT)
tipc_node_link_down(n, bearer_id, false);
......@@ -425,9 +480,9 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
struct sk_buff_head *xmitq)
{
tipc_node_lock(n);
tipc_node_write_lock(n);
__tipc_node_link_up(n, bearer_id, xmitq);
tipc_node_unlock(n);
tipc_node_write_unlock(n);
}
/**
......@@ -516,7 +571,7 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
__skb_queue_head_init(&xmitq);
tipc_node_lock(n);
tipc_node_write_lock(n);
if (!tipc_link_is_establishing(l)) {
__tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
if (delete) {
......@@ -528,7 +583,7 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
/* Defuse pending tipc_node_link_up() */
tipc_link_fsm_evt(l, LINK_RESET_EVT);
}
tipc_node_unlock(n);
tipc_node_write_unlock(n);
tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
tipc_sk_rcv(n->net, &le->inputq);
}
......@@ -561,7 +616,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
if (!n)
return;
tipc_node_lock(n);
tipc_node_write_lock(n);
le = &n->links[b->identity];
......@@ -656,7 +711,6 @@ void tipc_node_check_dest(struct net *net, u32 onode,
if (n->state == NODE_FAILINGOVER)
tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
le->link = l;
spin_lock_init(&le->lock);
n->link_cnt++;
tipc_node_calculate_timer(n, l);
if (n->link_cnt == 1)
......@@ -665,7 +719,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
}
memcpy(&le->maddr, maddr, sizeof(*maddr));
exit:
tipc_node_unlock(n);
tipc_node_write_unlock(n);
if (reset && !tipc_link_is_reset(l))
tipc_node_link_down(n, b->identity, false);
tipc_node_put(n);
......@@ -873,24 +927,6 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt)
pr_err("Illegal node fsm evt %x in state %x\n", evt, state);
}
bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
{
int state = n->state;
if (likely(state == SELF_UP_PEER_UP))
return true;
if (state == SELF_LEAVING_PEER_DOWN)
return false;
if (state == SELF_DOWN_PEER_LEAVING) {
if (msg_peer_node_is_up(hdr))
return false;
}
return true;
}
static void node_lost_contact(struct tipc_node *n,
struct sk_buff_head *inputq)
{
......@@ -952,56 +988,18 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
if (bearer_id >= MAX_BEARERS)
goto exit;
tipc_node_lock(node);
tipc_node_read_lock(node);
link = node->links[bearer_id].link;
if (link) {
strncpy(linkname, link->name, len);
err = 0;
}
exit:
tipc_node_unlock(node);
tipc_node_read_unlock(node);
tipc_node_put(node);
return err;
}
void tipc_node_unlock(struct tipc_node *node)
{
struct net *net = node->net;
u32 addr = 0;
u32 flags = node->action_flags;
u32 link_id = 0;
struct list_head *publ_list;
if (likely(!flags)) {
spin_unlock_bh(&node->lock);
return;
}
addr = node->addr;
link_id = node->link_id;
publ_list = &node->publ_list;
node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
spin_unlock_bh(&node->lock);
if (flags & TIPC_NOTIFY_NODE_DOWN)
tipc_publ_notify(net, publ_list, addr);
if (flags & TIPC_NOTIFY_NODE_UP)
tipc_named_node_up(net, addr);
if (flags & TIPC_NOTIFY_LINK_UP)
tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
TIPC_NODE_SCOPE, link_id, addr);
if (flags & TIPC_NOTIFY_LINK_DOWN)
tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
link_id, addr);
}
/* Caller should hold node lock for the passed node */
static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
{
......@@ -1048,40 +1046,38 @@ static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
u32 dnode, int selector)
{
struct tipc_link_entry *le;
struct tipc_link_entry *le = NULL;
struct tipc_node *n;
struct sk_buff_head xmitq;
struct tipc_media_addr *maddr = NULL;
int bearer_id = -1;
int rc = -EHOSTUNREACH;
__skb_queue_head_init(&xmitq);
n = tipc_node_find(net, dnode);
if (likely(n)) {
tipc_node_lock(n);
tipc_node_read_lock(n);
bearer_id = n->active_links[selector & 1];
if (bearer_id >= 0) {
le = &n->links[bearer_id];
maddr = &le->maddr;
spin_lock_bh(&le->lock);
if (likely(le->link))
rc = tipc_link_xmit(le->link, list, &xmitq);
spin_unlock_bh(&le->lock);
}
tipc_node_unlock(n);
tipc_node_read_unlock(n);
if (likely(!skb_queue_empty(&xmitq))) {
tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
return 0;
}
if (unlikely(rc == -ENOBUFS))
tipc_node_link_down(n, bearer_id, false);
tipc_node_put(n);
return rc;
}
if (likely(!skb_queue_empty(&xmitq))) {
tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
return 0;
}
if (likely(in_own_node(net, dnode))) {
if (unlikely(!in_own_node(net, dnode)))
return rc;
tipc_sk_rcv(net, list);
return 0;
}
return rc;
}
/* tipc_node_xmit_skb(): send single buffer to destination
......@@ -1171,9 +1167,9 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
/* Broadcast ACKs are sent on a unicast link */
if (rc & TIPC_LINK_SND_BC_ACK) {
tipc_node_lock(n);
tipc_node_read_lock(n);
tipc_link_build_ack_msg(le->link, &xmitq);
tipc_node_unlock(n);
tipc_node_read_unlock(n);
}
if (!skb_queue_empty(&xmitq))
......@@ -1229,7 +1225,7 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
}
}
/* Update node accesibility if applicable */
/* Check and update node accesibility if applicable */
if (state == SELF_UP_PEER_COMING) {
if (!tipc_link_is_up(l))
return true;
......@@ -1245,6 +1241,9 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
return true;
}
if (state == SELF_LEAVING_PEER_DOWN)
return false;
/* Ignore duplicate packets */
if ((usr != LINK_PROTOCOL) && less(oseqno, rcv_nxt))
return true;
......@@ -1361,21 +1360,29 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
else if (unlikely(n->bc_entry.link->acked != bc_ack))
tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
tipc_node_lock(n);
/* Is reception permitted at the moment ? */
if (!tipc_node_filter_pkt(n, hdr))
goto unlock;
/* Check and if necessary update node state */
if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
/* Receive packet directly if conditions permit */
tipc_node_read_lock(n);
if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
spin_lock_bh(&le->lock);
if (le->link) {
rc = tipc_link_rcv(le->link, skb, &xmitq);
skb = NULL;
}
spin_unlock_bh(&le->lock);
}
tipc_node_read_unlock(n);
/* Check/update node state before receiving */
if (unlikely(skb)) {
tipc_node_write_lock(n);
if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) {
if (le->link) {
rc = tipc_link_rcv(le->link, skb, &xmitq);
skb = NULL;
}
unlock:
tipc_node_unlock(n);
}
tipc_node_write_unlock(n);
}
if (unlikely(rc & TIPC_LINK_UP_EVT))
tipc_node_link_up(n, bearer_id, &xmitq);
......@@ -1440,15 +1447,15 @@ int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
continue;
}
tipc_node_lock(node);
tipc_node_read_lock(node);
err = __tipc_nl_add_node(&msg, node);
if (err) {
last_addr = node->addr;
tipc_node_unlock(node);
tipc_node_read_unlock(node);
goto out;
}
tipc_node_unlock(node);
tipc_node_read_unlock(node);
}
done = 1;
out:
......
......@@ -109,7 +109,7 @@ struct tipc_bclink_entry {
struct tipc_node {
u32 addr;
struct kref kref;
spinlock_t lock;
rwlock_t lock;
struct net *net;
struct hlist_node hash;
int active_links[2];
......@@ -145,7 +145,8 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
bool tipc_node_is_up(struct tipc_node *n);
int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
char *linkname, size_t len);
void tipc_node_unlock(struct tipc_node *node);
void tipc_node_read_lock(struct tipc_node *n);
void tipc_node_read_unlock(struct tipc_node *node);
int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
int selector);
int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
......@@ -157,11 +158,6 @@ int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb);
static inline void tipc_node_lock(struct tipc_node *node)
{
spin_lock_bh(&node->lock);
}
static inline struct tipc_link *node_active_link(struct tipc_node *n, int sel)
{
int bearer_id = n->active_links[sel & 1];
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment