Commit c8c5b287 authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc_net-next' of git://git.kernel.org/pub/scm/linux/kernel/git/paulg/linux

Paul Gortmaker says:

====================
Two relatively small cleanup patches here, plus a reimplementation
of the patch Neil had questions about[1] in the last development
cycle.

Tested on today's net-next, between 32 and 64 bit x86 machines using
the server/client in tipc-utils, as usual.

[1] http://patchwork.ozlabs.org/patch/204507/
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents efd9450e 97f8b87e
...@@ -43,7 +43,8 @@ ...@@ -43,7 +43,8 @@
#define SS_LISTENING -1 /* socket is listening */ #define SS_LISTENING -1 /* socket is listening */
#define SS_READY -2 /* socket is connectionless */ #define SS_READY -2 /* socket is connectionless */
#define OVERLOAD_LIMIT_BASE 10000 #define CONN_OVERLOAD_LIMIT ((TIPC_FLOW_CONTROL_WIN * 2 + 1) * \
SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ #define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
struct tipc_sock { struct tipc_sock {
...@@ -128,19 +129,6 @@ static void advance_rx_queue(struct sock *sk) ...@@ -128,19 +129,6 @@ static void advance_rx_queue(struct sock *sk)
kfree_skb(__skb_dequeue(&sk->sk_receive_queue)); kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
} }
/**
* discard_rx_queue - discard all buffers in socket receive queue
*
* Caller must hold socket lock
*/
static void discard_rx_queue(struct sock *sk)
{
struct sk_buff *buf;
while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
kfree_skb(buf);
}
/** /**
* reject_rx_queue - reject all buffers in socket receive queue * reject_rx_queue - reject all buffers in socket receive queue
* *
...@@ -215,7 +203,6 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol, ...@@ -215,7 +203,6 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
sock_init_data(sock, sk); sock_init_data(sock, sk);
sk->sk_backlog_rcv = backlog_rcv; sk->sk_backlog_rcv = backlog_rcv;
sk->sk_rcvbuf = TIPC_FLOW_CONTROL_WIN * 2 * TIPC_MAX_USER_MSG_SIZE * 2;
sk->sk_data_ready = tipc_data_ready; sk->sk_data_ready = tipc_data_ready;
sk->sk_write_space = tipc_write_space; sk->sk_write_space = tipc_write_space;
tipc_sk(sk)->p = tp_ptr; tipc_sk(sk)->p = tp_ptr;
...@@ -292,7 +279,7 @@ static int release(struct socket *sock) ...@@ -292,7 +279,7 @@ static int release(struct socket *sock)
res = tipc_deleteport(tport->ref); res = tipc_deleteport(tport->ref);
/* Discard any remaining (connection-based) messages in receive queue */ /* Discard any remaining (connection-based) messages in receive queue */
discard_rx_queue(sk); __skb_queue_purge(&sk->sk_receive_queue);
/* Reject any messages that accumulated in backlog queue */ /* Reject any messages that accumulated in backlog queue */
sock->state = SS_DISCONNECTING; sock->state = SS_DISCONNECTING;
...@@ -516,8 +503,7 @@ static int send_msg(struct kiocb *iocb, struct socket *sock, ...@@ -516,8 +503,7 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,
if (unlikely((m->msg_namelen < sizeof(*dest)) || if (unlikely((m->msg_namelen < sizeof(*dest)) ||
(dest->family != AF_TIPC))) (dest->family != AF_TIPC)))
return -EINVAL; return -EINVAL;
if ((total_len > TIPC_MAX_USER_MSG_SIZE) || if (total_len > TIPC_MAX_USER_MSG_SIZE)
(m->msg_iovlen > (unsigned int)INT_MAX))
return -EMSGSIZE; return -EMSGSIZE;
if (iocb) if (iocb)
...@@ -625,8 +611,7 @@ static int send_packet(struct kiocb *iocb, struct socket *sock, ...@@ -625,8 +611,7 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,
if (unlikely(dest)) if (unlikely(dest))
return send_msg(iocb, sock, m, total_len); return send_msg(iocb, sock, m, total_len);
if ((total_len > TIPC_MAX_USER_MSG_SIZE) || if (total_len > TIPC_MAX_USER_MSG_SIZE)
(m->msg_iovlen > (unsigned int)INT_MAX))
return -EMSGSIZE; return -EMSGSIZE;
if (iocb) if (iocb)
...@@ -711,8 +696,7 @@ static int send_stream(struct kiocb *iocb, struct socket *sock, ...@@ -711,8 +696,7 @@ static int send_stream(struct kiocb *iocb, struct socket *sock,
goto exit; goto exit;
} }
if ((total_len > (unsigned int)INT_MAX) || if (total_len > (unsigned int)INT_MAX) {
(m->msg_iovlen > (unsigned int)INT_MAX)) {
res = -EMSGSIZE; res = -EMSGSIZE;
goto exit; goto exit;
} }
...@@ -1154,34 +1138,6 @@ static void tipc_data_ready(struct sock *sk, int len) ...@@ -1154,34 +1138,6 @@ static void tipc_data_ready(struct sock *sk, int len)
rcu_read_unlock(); rcu_read_unlock();
} }
/**
* rx_queue_full - determine if receive queue can accept another message
* @msg: message to be added to queue
* @queue_size: current size of queue
* @base: nominal maximum size of queue
*
* Returns 1 if queue is unable to accept message, 0 otherwise
*/
static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base)
{
u32 threshold;
u32 imp = msg_importance(msg);
if (imp == TIPC_LOW_IMPORTANCE)
threshold = base;
else if (imp == TIPC_MEDIUM_IMPORTANCE)
threshold = base * 2;
else if (imp == TIPC_HIGH_IMPORTANCE)
threshold = base * 100;
else
return 0;
if (msg_connected(msg))
threshold *= 4;
return queue_size >= threshold;
}
/** /**
* filter_connect - Handle all incoming messages for a connection-based socket * filter_connect - Handle all incoming messages for a connection-based socket
* @tsock: TIPC socket * @tsock: TIPC socket
...@@ -1259,6 +1215,36 @@ static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf) ...@@ -1259,6 +1215,36 @@ static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf)
return retval; return retval;
} }
/**
* rcvbuf_limit - get proper overload limit of socket receive queue
* @sk: socket
* @buf: message
*
* For all connection oriented messages, irrespective of importance,
* the default overload value (i.e. 67MB) is set as limit.
*
* For all connectionless messages, by default new queue limits are
* as belows:
*
* TIPC_LOW_IMPORTANCE (5MB)
* TIPC_MEDIUM_IMPORTANCE (10MB)
* TIPC_HIGH_IMPORTANCE (20MB)
* TIPC_CRITICAL_IMPORTANCE (40MB)
*
* Returns overload limit according to corresponding message importance
*/
static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
unsigned int limit;
if (msg_connected(msg))
limit = CONN_OVERLOAD_LIMIT;
else
limit = sk->sk_rcvbuf << (msg_importance(msg) + 5);
return limit;
}
/** /**
* filter_rcv - validate incoming message * filter_rcv - validate incoming message
* @sk: socket * @sk: socket
...@@ -1275,7 +1261,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) ...@@ -1275,7 +1261,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
{ {
struct socket *sock = sk->sk_socket; struct socket *sock = sk->sk_socket;
struct tipc_msg *msg = buf_msg(buf); struct tipc_msg *msg = buf_msg(buf);
u32 recv_q_len; unsigned int limit = rcvbuf_limit(sk, buf);
u32 res = TIPC_OK; u32 res = TIPC_OK;
/* Reject message if it is wrong sort of message for socket */ /* Reject message if it is wrong sort of message for socket */
...@@ -1292,15 +1278,13 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) ...@@ -1292,15 +1278,13 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
} }
/* Reject message if there isn't room to queue it */ /* Reject message if there isn't room to queue it */
recv_q_len = skb_queue_len(&sk->sk_receive_queue); if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) {
if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2))
return TIPC_ERR_OVERLOAD; return TIPC_ERR_OVERLOAD;
}
/* Enqueue message (finally!) */ /* Enqueue message */
TIPC_SKB_CB(buf)->handle = 0; TIPC_SKB_CB(buf)->handle = 0;
__skb_queue_tail(&sk->sk_receive_queue, buf); __skb_queue_tail(&sk->sk_receive_queue, buf);
skb_set_owner_r(buf, sk);
sk->sk_data_ready(sk, 0); sk->sk_data_ready(sk, 0);
return TIPC_OK; return TIPC_OK;
...@@ -1349,7 +1333,7 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf) ...@@ -1349,7 +1333,7 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
if (!sock_owned_by_user(sk)) { if (!sock_owned_by_user(sk)) {
res = filter_rcv(sk, buf); res = filter_rcv(sk, buf);
} else { } else {
if (sk_add_backlog(sk, buf, sk->sk_rcvbuf)) if (sk_add_backlog(sk, buf, rcvbuf_limit(sk, buf)))
res = TIPC_ERR_OVERLOAD; res = TIPC_ERR_OVERLOAD;
else else
res = TIPC_OK; res = TIPC_OK;
...@@ -1583,6 +1567,7 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags) ...@@ -1583,6 +1567,7 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
} else { } else {
__skb_dequeue(&sk->sk_receive_queue); __skb_dequeue(&sk->sk_receive_queue);
__skb_queue_head(&new_sk->sk_receive_queue, buf); __skb_queue_head(&new_sk->sk_receive_queue, buf);
skb_set_owner_r(buf, new_sk);
} }
release_sock(new_sk); release_sock(new_sk);
...@@ -1637,7 +1622,7 @@ static int shutdown(struct socket *sock, int how) ...@@ -1637,7 +1622,7 @@ static int shutdown(struct socket *sock, int how)
case SS_DISCONNECTING: case SS_DISCONNECTING:
/* Discard any unreceived messages */ /* Discard any unreceived messages */
discard_rx_queue(sk); __skb_queue_purge(&sk->sk_receive_queue);
/* Wake up anyone sleeping in poll */ /* Wake up anyone sleeping in poll */
sk->sk_state_change(sk); sk->sk_state_change(sk);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment