Commit 59e0cd11 authored by Jon Paul Maloy's avatar Jon Paul Maloy Committed by Greg Kroah-Hartman

tipc: fix socket timer deadlock

commit f1d048f2 upstream.

We sometimes observe a 'deadly embrace' type deadlock occurring
between mutually connected sockets on the same node. This happens
when the one-hour peer supervision timers happen to expire
simultaneously in both sockets.

The scenario is as follows:

CPU 1:                          CPU 2:
--------                        --------
tipc_sk_timeout(sk1)            tipc_sk_timeout(sk2)
  lock(sk1.slock)                 lock(sk2.slock)
  msg_create(probe)               msg_create(probe)
  unlock(sk1.slock)               unlock(sk2.slock)
  tipc_node_xmit_skb()            tipc_node_xmit_skb()
    tipc_node_xmit()                tipc_node_xmit()
      tipc_sk_rcv(sk2)                tipc_sk_rcv(sk1)
        lock(sk2.slock)                 lock((sk1.slock)
        filter_rcv()                    filter_rcv()
          tipc_sk_proto_rcv()             tipc_sk_proto_rcv()
            msg_create(probe_rsp)           msg_create(probe_rsp)
            tipc_sk_respond()               tipc_sk_respond()
              tipc_node_xmit_skb()            tipc_node_xmit_skb()
                tipc_node_xmit()                tipc_node_xmit()
                  tipc_sk_rcv(sk1)                tipc_sk_rcv(sk2)
                    lock((sk1.slock)                lock((sk2.slock)
                    ===> DEADLOCK                   ===> DEADLOCK

Further analysis reveals that there are three different locations in the
socket code where tipc_sk_respond() is called within the context of the
socket lock, with ensuing risk of similar deadlocks.

We now solve this by passing a buffer queue along with all upcalls where
sk_lock.slock may potentially be held. Response or rejected message
buffers are accumulated into this queue instead of being sent out
directly, and only sent once we know we are safely outside the slock
context.
Reported-by: default avatarGUNA <gbalasun@gmail.com>
Acked-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
Signed-off-by: default avatarGreg Kroah-Hartman <gregkh@linuxfoundation.org>
parent abc025d1
......@@ -777,9 +777,11 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
* @tsk: receiving socket
* @skb: pointer to message buffer.
*/
static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
struct sk_buff_head *xmitq)
{
struct sock *sk = &tsk->sk;
u32 onode = tsk_own_node(tsk);
struct tipc_msg *hdr = buf_msg(skb);
int mtyp = msg_type(hdr);
int conn_cong;
......@@ -792,7 +794,8 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
if (mtyp == CONN_PROBE) {
msg_set_type(hdr, CONN_PROBE_REPLY);
tipc_sk_respond(sk, skb, TIPC_OK);
if (tipc_msg_reverse(onode, &skb, TIPC_OK))
__skb_queue_tail(xmitq, skb);
return;
} else if (mtyp == CONN_ACK) {
conn_cong = tsk_conn_cong(tsk);
......@@ -1647,7 +1650,8 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
*
* Returns true if message was added to socket receive queue, otherwise false
*/
static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
struct sk_buff_head *xmitq)
{
struct socket *sock = sk->sk_socket;
struct tipc_sock *tsk = tipc_sk(sk);
......@@ -1657,7 +1661,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
int usr = msg_user(hdr);
if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
tipc_sk_proto_rcv(tsk, skb);
tipc_sk_proto_rcv(tsk, skb, xmitq);
return false;
}
......@@ -1700,7 +1704,8 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
return true;
reject:
tipc_sk_respond(sk, skb, err);
if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
__skb_queue_tail(xmitq, skb);
return false;
}
......@@ -1716,9 +1721,24 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
{
unsigned int truesize = skb->truesize;
struct sk_buff_head xmitq;
u32 dnode, selector;
if (likely(filter_rcv(sk, skb)))
__skb_queue_head_init(&xmitq);
if (likely(filter_rcv(sk, skb, &xmitq))) {
atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
return 0;
}
if (skb_queue_empty(&xmitq))
return 0;
/* Send response/rejected message */
skb = __skb_dequeue(&xmitq);
dnode = msg_destnode(buf_msg(skb));
selector = msg_origport(buf_msg(skb));
tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
return 0;
}
......@@ -1732,12 +1752,13 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
* Caller must hold socket lock
*/
static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
u32 dport)
u32 dport, struct sk_buff_head *xmitq)
{
unsigned long time_limit = jiffies + 2;
struct sk_buff *skb;
unsigned int lim;
atomic_t *dcnt;
struct sk_buff *skb;
unsigned long time_limit = jiffies + 2;
u32 onode;
while (skb_queue_len(inputq)) {
if (unlikely(time_after_eq(jiffies, time_limit)))
......@@ -1749,7 +1770,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
/* Add message directly to receive queue if possible */
if (!sock_owned_by_user(sk)) {
filter_rcv(sk, skb);
filter_rcv(sk, skb, xmitq);
continue;
}
......@@ -1762,7 +1783,9 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
continue;
/* Overload => reject message back to sender */
tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD);
onode = tipc_own_addr(sock_net(sk));
if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
__skb_queue_tail(xmitq, skb);
break;
}
}
......@@ -1775,12 +1798,14 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
*/
void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
{
struct sk_buff_head xmitq;
u32 dnode, dport = 0;
int err;
struct tipc_sock *tsk;
struct sock *sk;
struct sk_buff *skb;
__skb_queue_head_init(&xmitq);
while (skb_queue_len(inputq)) {
dport = tipc_skb_peek_port(inputq, dport);
tsk = tipc_sk_lookup(net, dport);
......@@ -1788,9 +1813,14 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
if (likely(tsk)) {
sk = &tsk->sk;
if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
tipc_sk_enqueue(inputq, sk, dport);
tipc_sk_enqueue(inputq, sk, dport, &xmitq);
spin_unlock_bh(&sk->sk_lock.slock);
}
/* Send pending response/rejected messages, if any */
while ((skb = __skb_dequeue(&xmitq))) {
dnode = msg_destnode(buf_msg(skb));
tipc_node_xmit_skb(net, skb, dnode, dport);
}
sock_put(sk);
continue;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment