Commit 1364db42 authored by David S. Miller's avatar David S. Miller

Merge branch 'rds-multipath-datastructures'

Sowmini Varadhan says:

====================
RDS:TCP data structure changes for multipath support

The second installment of changes to enable multipath support in
RDS-TCP. This series implements the changes in rds-tcp so that the
rds_conn_path has a pointer to the rds_tcp_connection in cp_transport_data.
Struct rds_tcp_connection keeps track of the inet_sk per path in
t_sock. The ->sk_user_data in turn is a pointer to the rds_conn_path.
With this set of changes, rds_tcp has the needed plumbing to handle
multiple paths(socket) per rds_connection.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents dc9a2002 11bb62f7
......@@ -253,9 +253,12 @@ static struct rds_connection *__rds_conn_create(struct net *net,
for (i = 0; i < RDS_MPATH_WORKERS; i++) {
cp = &conn->c_path[i];
trans->conn_free(cp->cp_transport_data);
if (!trans->t_mp_capable)
break;
/* The ->conn_alloc invocation may have
* allocated resource for all paths, so all
* of them may have to be freed here.
*/
if (cp->cp_transport_data)
trans->conn_free(cp->cp_transport_data);
}
kmem_cache_free(rds_conn_slab, conn);
conn = found;
......@@ -326,10 +329,7 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
wait_event(cp->cp_waitq,
!test_bit(RDS_RECV_REFILL, &cp->cp_flags));
if (!conn->c_trans->t_mp_capable)
conn->c_trans->conn_shutdown(conn);
else
conn->c_trans->conn_path_shutdown(cp);
conn->c_trans->conn_path_shutdown(cp);
rds_conn_path_reset(cp);
if (!rds_conn_path_transition(cp, RDS_CONN_DISCONNECTING,
......@@ -355,9 +355,7 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
rcu_read_lock();
if (!hlist_unhashed(&conn->c_hash_node)) {
rcu_read_unlock();
if (conn->c_trans->t_type != RDS_TRANS_TCP ||
cp->cp_outgoing == 1)
rds_queue_reconnect(cp);
rds_queue_reconnect(cp);
} else {
rcu_read_unlock();
}
......@@ -370,6 +368,9 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
{
struct rds_message *rm, *rtmp;
if (!cp->cp_transport_data)
return;
rds_conn_path_drop(cp);
flush_work(&cp->cp_down_w);
......@@ -401,6 +402,8 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
void rds_conn_destroy(struct rds_connection *conn)
{
unsigned long flags;
int i;
struct rds_conn_path *cp;
rdsdebug("freeing conn %p for %pI4 -> "
"%pI4\n", conn, &conn->c_laddr,
......@@ -413,18 +416,10 @@ void rds_conn_destroy(struct rds_connection *conn)
synchronize_rcu();
/* shut the connection down */
if (!conn->c_trans->t_mp_capable) {
rds_conn_path_destroy(&conn->c_path[0]);
BUG_ON(!list_empty(&conn->c_path[0].cp_retrans));
} else {
int i;
struct rds_conn_path *cp;
for (i = 0; i < RDS_MPATH_WORKERS; i++) {
cp = &conn->c_path[i];
rds_conn_path_destroy(cp);
BUG_ON(!list_empty(&cp->cp_retrans));
}
for (i = 0; i < RDS_MPATH_WORKERS; i++) {
cp = &conn->c_path[i];
rds_conn_path_destroy(cp);
BUG_ON(!list_empty(&cp->cp_retrans));
}
/*
......
......@@ -381,15 +381,15 @@ void rds_ib_exit(void)
struct rds_transport rds_ib_transport = {
.laddr_check = rds_ib_laddr_check,
.xmit_complete = rds_ib_xmit_complete,
.xmit_path_complete = rds_ib_xmit_path_complete,
.xmit = rds_ib_xmit,
.xmit_rdma = rds_ib_xmit_rdma,
.xmit_atomic = rds_ib_xmit_atomic,
.recv = rds_ib_recv,
.recv_path = rds_ib_recv_path,
.conn_alloc = rds_ib_conn_alloc,
.conn_free = rds_ib_conn_free,
.conn_connect = rds_ib_conn_connect,
.conn_shutdown = rds_ib_conn_shutdown,
.conn_path_connect = rds_ib_conn_path_connect,
.conn_path_shutdown = rds_ib_conn_path_shutdown,
.inc_copy_to_user = rds_ib_inc_copy_to_user,
.inc_free = rds_ib_inc_free,
.cm_initiate_connect = rds_ib_cm_initiate_connect,
......
......@@ -328,8 +328,8 @@ extern struct list_head ib_nodev_conns;
/* ib_cm.c */
int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp);
void rds_ib_conn_free(void *arg);
int rds_ib_conn_connect(struct rds_connection *conn);
void rds_ib_conn_shutdown(struct rds_connection *conn);
int rds_ib_conn_path_connect(struct rds_conn_path *cp);
void rds_ib_conn_path_shutdown(struct rds_conn_path *cp);
void rds_ib_state_change(struct sock *sk);
int rds_ib_listen_init(void);
void rds_ib_listen_stop(void);
......@@ -354,7 +354,7 @@ void rds_ib_mr_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc);
/* ib_recv.c */
int rds_ib_recv_init(void);
void rds_ib_recv_exit(void);
int rds_ib_recv(struct rds_connection *conn);
int rds_ib_recv_path(struct rds_conn_path *conn);
int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic);
void rds_ib_recv_free_caches(struct rds_ib_connection *ic);
void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp);
......@@ -384,7 +384,7 @@ u32 rds_ib_ring_completed(struct rds_ib_work_ring *ring, u32 wr_id, u32 oldest);
extern wait_queue_head_t rds_ib_ring_empty_wait;
/* ib_send.c */
void rds_ib_xmit_complete(struct rds_connection *conn);
void rds_ib_xmit_path_complete(struct rds_conn_path *cp);
int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
unsigned int hdr_off, unsigned int sg, unsigned int off);
void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc);
......
......@@ -685,8 +685,9 @@ int rds_ib_cm_initiate_connect(struct rdma_cm_id *cm_id)
return ret;
}
int rds_ib_conn_connect(struct rds_connection *conn)
int rds_ib_conn_path_connect(struct rds_conn_path *cp)
{
struct rds_connection *conn = cp->cp_conn;
struct rds_ib_connection *ic = conn->c_transport_data;
struct sockaddr_in src, dest;
int ret;
......@@ -731,8 +732,9 @@ int rds_ib_conn_connect(struct rds_connection *conn)
* so that it can be called at any point during startup. In fact it
* can be called multiple times for a given connection.
*/
void rds_ib_conn_shutdown(struct rds_connection *conn)
void rds_ib_conn_path_shutdown(struct rds_conn_path *cp)
{
struct rds_connection *conn = cp->cp_conn;
struct rds_ib_connection *ic = conn->c_transport_data;
int err = 0;
......
......@@ -1009,8 +1009,9 @@ void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic,
rds_ib_recv_refill(conn, 0, GFP_NOWAIT);
}
int rds_ib_recv(struct rds_connection *conn)
int rds_ib_recv_path(struct rds_conn_path *cp)
{
struct rds_connection *conn = cp->cp_conn;
struct rds_ib_connection *ic = conn->c_transport_data;
int ret = 0;
......
......@@ -980,8 +980,9 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
return ret;
}
void rds_ib_xmit_complete(struct rds_connection *conn)
void rds_ib_xmit_path_complete(struct rds_conn_path *cp)
{
struct rds_connection *conn = cp->cp_conn;
struct rds_ib_connection *ic = conn->c_transport_data;
/* We may have a pending ACK or window update we were unable
......
......@@ -102,7 +102,7 @@ static void rds_loop_inc_free(struct rds_incoming *inc)
}
/* we need to at least give the thread something to succeed */
static int rds_loop_recv(struct rds_connection *conn)
static int rds_loop_recv_path(struct rds_conn_path *cp)
{
return 0;
}
......@@ -150,13 +150,13 @@ static void rds_loop_conn_free(void *arg)
kfree(lc);
}
static int rds_loop_conn_connect(struct rds_connection *conn)
static int rds_loop_conn_path_connect(struct rds_conn_path *cp)
{
rds_connect_complete(conn);
rds_connect_complete(cp->cp_conn);
return 0;
}
static void rds_loop_conn_shutdown(struct rds_connection *conn)
static void rds_loop_conn_path_shutdown(struct rds_conn_path *cp)
{
}
......@@ -185,11 +185,11 @@ void rds_loop_exit(void)
*/
struct rds_transport rds_loop_transport = {
.xmit = rds_loop_xmit,
.recv = rds_loop_recv,
.recv_path = rds_loop_recv_path,
.conn_alloc = rds_loop_conn_alloc,
.conn_free = rds_loop_conn_free,
.conn_connect = rds_loop_conn_connect,
.conn_shutdown = rds_loop_conn_shutdown,
.conn_path_connect = rds_loop_conn_path_connect,
.conn_path_shutdown = rds_loop_conn_path_shutdown,
.inc_copy_to_user = rds_message_inc_copy_to_user,
.inc_free = rds_loop_inc_free,
.t_name = "loopback",
......
......@@ -454,18 +454,15 @@ struct rds_transport {
int (*laddr_check)(struct net *net, __be32 addr);
int (*conn_alloc)(struct rds_connection *conn, gfp_t gfp);
void (*conn_free)(void *data);
int (*conn_connect)(struct rds_connection *conn);
void (*conn_shutdown)(struct rds_connection *conn);
int (*conn_path_connect)(struct rds_conn_path *cp);
void (*conn_path_shutdown)(struct rds_conn_path *conn);
void (*xmit_prepare)(struct rds_connection *conn);
void (*xmit_path_prepare)(struct rds_conn_path *cp);
void (*xmit_complete)(struct rds_connection *conn);
void (*xmit_path_complete)(struct rds_conn_path *cp);
int (*xmit)(struct rds_connection *conn, struct rds_message *rm,
unsigned int hdr_off, unsigned int sg, unsigned int off);
int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
int (*xmit_atomic)(struct rds_connection *conn, struct rm_atomic_op *op);
int (*recv)(struct rds_connection *conn);
int (*recv_path)(struct rds_conn_path *cp);
int (*inc_copy_to_user)(struct rds_incoming *inc, struct iov_iter *to);
void (*inc_free)(struct rds_incoming *inc);
......
......@@ -226,6 +226,10 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
cp->cp_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;
if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
if (inc->i_hdr.h_sport == 0) {
rdsdebug("ignore ping with 0 sport from 0x%x\n", saddr);
goto out;
}
rds_stats_inc(s_recv_ping);
rds_send_pong(cp, inc->i_hdr.h_sport);
goto out;
......
......@@ -183,12 +183,8 @@ int rds_send_xmit(struct rds_conn_path *cp)
goto out;
}
if (conn->c_trans->t_mp_capable) {
if (conn->c_trans->xmit_path_prepare)
conn->c_trans->xmit_path_prepare(cp);
} else if (conn->c_trans->xmit_prepare) {
conn->c_trans->xmit_prepare(conn);
}
if (conn->c_trans->xmit_path_prepare)
conn->c_trans->xmit_path_prepare(cp);
/*
* spin trying to push headers and data down the connection until
......@@ -403,12 +399,8 @@ int rds_send_xmit(struct rds_conn_path *cp)
}
over_batch:
if (conn->c_trans->t_mp_capable) {
if (conn->c_trans->xmit_path_complete)
conn->c_trans->xmit_path_complete(cp);
} else if (conn->c_trans->xmit_complete) {
conn->c_trans->xmit_complete(conn);
}
if (conn->c_trans->xmit_path_complete)
conn->c_trans->xmit_path_complete(cp);
release_in_xmit(cp);
/* Nuke any messages we decided not to retransmit. */
......
......@@ -136,9 +136,9 @@ void rds_tcp_restore_callbacks(struct socket *sock,
* from being called while it isn't set.
*/
void rds_tcp_reset_callbacks(struct socket *sock,
struct rds_connection *conn)
struct rds_conn_path *cp)
{
struct rds_tcp_connection *tc = conn->c_transport_data;
struct rds_tcp_connection *tc = cp->cp_transport_data;
struct socket *osock = tc->t_sock;
if (!osock)
......@@ -148,8 +148,8 @@ void rds_tcp_reset_callbacks(struct socket *sock,
* We have an outstanding SYN to this peer, which may
* potentially have transitioned to the RDS_CONN_UP state,
* so we must quiesce any send threads before resetting
* c_transport_data. We quiesce these threads by setting
* c_state to something other than RDS_CONN_UP, and then
* cp_transport_data. We quiesce these threads by setting
* cp_state to something other than RDS_CONN_UP, and then
* waiting for any existing threads in rds_send_xmit to
* complete release_in_xmit(). (Subsequent threads entering
* rds_send_xmit() will bail on !rds_conn_up().
......@@ -164,8 +164,8 @@ void rds_tcp_reset_callbacks(struct socket *sock,
* RDS_CONN_RESETTTING, to ensure that rds_tcp_state_change
* cannot mark rds_conn_path_up() in the window before lock_sock()
*/
atomic_set(&conn->c_state, RDS_CONN_RESETTING);
wait_event(conn->c_waitq, !test_bit(RDS_IN_XMIT, &conn->c_flags));
atomic_set(&cp->cp_state, RDS_CONN_RESETTING);
wait_event(cp->cp_waitq, !test_bit(RDS_IN_XMIT, &cp->cp_flags));
lock_sock(osock->sk);
/* reset receive side state for rds_tcp_data_recv() for osock */
if (tc->t_tinc) {
......@@ -186,11 +186,12 @@ void rds_tcp_reset_callbacks(struct socket *sock,
release_sock(osock->sk);
sock_release(osock);
newsock:
rds_send_path_reset(&conn->c_path[0]);
rds_send_path_reset(cp);
lock_sock(sock->sk);
write_lock_bh(&sock->sk->sk_callback_lock);
tc->t_sock = sock;
sock->sk->sk_user_data = conn;
tc->t_cpath = cp;
sock->sk->sk_user_data = cp;
sock->sk->sk_data_ready = rds_tcp_data_ready;
sock->sk->sk_write_space = rds_tcp_write_space;
sock->sk->sk_state_change = rds_tcp_state_change;
......@@ -203,9 +204,9 @@ void rds_tcp_reset_callbacks(struct socket *sock,
* above rds_tcp_reset_callbacks for notes about synchronization
* with data path
*/
void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn)
void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp)
{
struct rds_tcp_connection *tc = conn->c_transport_data;
struct rds_tcp_connection *tc = cp->cp_transport_data;
rdsdebug("setting sock %p callbacks to tc %p\n", sock, tc);
write_lock_bh(&sock->sk->sk_callback_lock);
......@@ -221,12 +222,12 @@ void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn)
sock->sk->sk_data_ready = sock->sk->sk_user_data;
tc->t_sock = sock;
tc->conn = conn;
tc->t_cpath = cp;
tc->t_orig_data_ready = sock->sk->sk_data_ready;
tc->t_orig_write_space = sock->sk->sk_write_space;
tc->t_orig_state_change = sock->sk->sk_state_change;
sock->sk->sk_user_data = conn;
sock->sk->sk_user_data = cp;
sock->sk->sk_data_ready = rds_tcp_data_ready;
sock->sk->sk_write_space = rds_tcp_write_space;
sock->sk->sk_state_change = rds_tcp_state_change;
......@@ -284,24 +285,29 @@ static int rds_tcp_laddr_check(struct net *net, __be32 addr)
static int rds_tcp_conn_alloc(struct rds_connection *conn, gfp_t gfp)
{
struct rds_tcp_connection *tc;
int i;
tc = kmem_cache_alloc(rds_tcp_conn_slab, gfp);
if (!tc)
return -ENOMEM;
for (i = 0; i < RDS_MPATH_WORKERS; i++) {
tc = kmem_cache_alloc(rds_tcp_conn_slab, gfp);
if (!tc)
return -ENOMEM;
mutex_init(&tc->t_conn_lock);
tc->t_sock = NULL;
tc->t_tinc = NULL;
tc->t_tinc_hdr_rem = sizeof(struct rds_header);
tc->t_tinc_data_rem = 0;
mutex_init(&tc->t_conn_path_lock);
tc->t_sock = NULL;
tc->t_tinc = NULL;
tc->t_tinc_hdr_rem = sizeof(struct rds_header);
tc->t_tinc_data_rem = 0;
conn->c_transport_data = tc;
conn->c_path[i].cp_transport_data = tc;
tc->t_cpath = &conn->c_path[i];
spin_lock_irq(&rds_tcp_conn_lock);
list_add_tail(&tc->t_tcp_node, &rds_tcp_conn_list);
spin_unlock_irq(&rds_tcp_conn_lock);
spin_lock_irq(&rds_tcp_conn_lock);
list_add_tail(&tc->t_tcp_node, &rds_tcp_conn_list);
spin_unlock_irq(&rds_tcp_conn_lock);
rdsdebug("rds_conn_path [%d] tc %p\n", i,
conn->c_path[i].cp_transport_data);
}
rdsdebug("alloced tc %p\n", conn->c_transport_data);
return 0;
}
......@@ -318,6 +324,17 @@ static void rds_tcp_conn_free(void *arg)
kmem_cache_free(rds_tcp_conn_slab, tc);
}
static bool list_has_conn(struct list_head *list, struct rds_connection *conn)
{
struct rds_tcp_connection *tc, *_tc;
list_for_each_entry_safe(tc, _tc, list, t_tcp_node) {
if (tc->t_cpath->cp_conn == conn)
return true;
}
return false;
}
static void rds_tcp_destroy_conns(void)
{
struct rds_tcp_connection *tc, *_tc;
......@@ -325,29 +342,28 @@ static void rds_tcp_destroy_conns(void)
/* avoid calling conn_destroy with irqs off */
spin_lock_irq(&rds_tcp_conn_lock);
list_splice(&rds_tcp_conn_list, &tmp_list);
INIT_LIST_HEAD(&rds_tcp_conn_list);
list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) {
if (!list_has_conn(&tmp_list, tc->t_cpath->cp_conn))
list_move_tail(&tc->t_tcp_node, &tmp_list);
}
spin_unlock_irq(&rds_tcp_conn_lock);
list_for_each_entry_safe(tc, _tc, &tmp_list, t_tcp_node) {
if (tc->conn->c_passive)
rds_conn_destroy(tc->conn->c_passive);
rds_conn_destroy(tc->conn);
}
list_for_each_entry_safe(tc, _tc, &tmp_list, t_tcp_node)
rds_conn_destroy(tc->t_cpath->cp_conn);
}
static void rds_tcp_exit(void);
struct rds_transport rds_tcp_transport = {
.laddr_check = rds_tcp_laddr_check,
.xmit_prepare = rds_tcp_xmit_prepare,
.xmit_complete = rds_tcp_xmit_complete,
.xmit_path_prepare = rds_tcp_xmit_path_prepare,
.xmit_path_complete = rds_tcp_xmit_path_complete,
.xmit = rds_tcp_xmit,
.recv = rds_tcp_recv,
.recv_path = rds_tcp_recv_path,
.conn_alloc = rds_tcp_conn_alloc,
.conn_free = rds_tcp_conn_free,
.conn_connect = rds_tcp_conn_connect,
.conn_shutdown = rds_tcp_conn_shutdown,
.conn_path_connect = rds_tcp_conn_path_connect,
.conn_path_shutdown = rds_tcp_conn_path_shutdown,
.inc_copy_to_user = rds_tcp_inc_copy_to_user,
.inc_free = rds_tcp_inc_free,
.stats_info_copy = rds_tcp_stats_info_copy,
......@@ -489,10 +505,30 @@ static struct pernet_operations rds_tcp_net_ops = {
.size = sizeof(struct rds_tcp_net),
};
/* explicitly send a RST on each socket, thereby releasing any socket refcnts
* that may otherwise hold up netns deletion.
*/
static void rds_tcp_conn_paths_destroy(struct rds_connection *conn)
{
struct rds_conn_path *cp;
struct rds_tcp_connection *tc;
int i;
struct sock *sk;
for (i = 0; i < RDS_MPATH_WORKERS; i++) {
cp = &conn->c_path[i];
tc = cp->cp_transport_data;
if (!tc->t_sock)
continue;
sk = tc->t_sock->sk;
sk->sk_prot->disconnect(sk, 0);
tcp_done(sk);
}
}
static void rds_tcp_kill_sock(struct net *net)
{
struct rds_tcp_connection *tc, *_tc;
struct sock *sk;
LIST_HEAD(tmp_list);
struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid);
......@@ -501,20 +537,17 @@ static void rds_tcp_kill_sock(struct net *net)
flush_work(&rtn->rds_tcp_accept_w);
spin_lock_irq(&rds_tcp_conn_lock);
list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) {
struct net *c_net = read_pnet(&tc->conn->c_net);
struct net *c_net = read_pnet(&tc->t_cpath->cp_conn->c_net);
if (net != c_net || !tc->t_sock)
continue;
list_move_tail(&tc->t_tcp_node, &tmp_list);
if (!list_has_conn(&tmp_list, tc->t_cpath->cp_conn))
list_move_tail(&tc->t_tcp_node, &tmp_list);
}
spin_unlock_irq(&rds_tcp_conn_lock);
list_for_each_entry_safe(tc, _tc, &tmp_list, t_tcp_node) {
sk = tc->t_sock->sk;
sk->sk_prot->disconnect(sk, 0);
tcp_done(sk);
if (tc->conn->c_passive)
rds_conn_destroy(tc->conn->c_passive);
rds_conn_destroy(tc->conn);
rds_tcp_conn_paths_destroy(tc->t_cpath->cp_conn);
rds_conn_destroy(tc->t_cpath->cp_conn);
}
}
......@@ -552,12 +585,13 @@ static void rds_tcp_sysctl_reset(struct net *net)
spin_lock_irq(&rds_tcp_conn_lock);
list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) {
struct net *c_net = read_pnet(&tc->conn->c_net);
struct net *c_net = read_pnet(&tc->t_cpath->cp_conn->c_net);
if (net != c_net || !tc->t_sock)
continue;
rds_conn_drop(tc->conn); /* reconnect with new parameters */
/* reconnect with new parameters */
rds_conn_path_drop(tc->t_cpath);
}
spin_unlock_irq(&rds_tcp_conn_lock);
}
......
......@@ -11,11 +11,11 @@ struct rds_tcp_incoming {
struct rds_tcp_connection {
struct list_head t_tcp_node;
struct rds_connection *conn;
/* t_conn_lock synchronizes the connection establishment between
* rds_tcp_accept_one and rds_tcp_conn_connect
struct rds_conn_path *t_cpath;
/* t_conn_path_lock synchronizes the connection establishment between
* rds_tcp_accept_one and rds_tcp_conn_path_connect
*/
struct mutex t_conn_lock;
struct mutex t_conn_path_lock;
struct socket *t_sock;
void *t_orig_write_space;
void *t_orig_data_ready;
......@@ -49,8 +49,8 @@ struct rds_tcp_statistics {
/* tcp.c */
void rds_tcp_tune(struct socket *sock);
void rds_tcp_nonagle(struct socket *sock);
void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn);
void rds_tcp_reset_callbacks(struct socket *sock, struct rds_connection *conn);
void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp);
void rds_tcp_reset_callbacks(struct socket *sock, struct rds_conn_path *cp);
void rds_tcp_restore_callbacks(struct socket *sock,
struct rds_tcp_connection *tc);
u32 rds_tcp_snd_nxt(struct rds_tcp_connection *tc);
......@@ -60,8 +60,8 @@ extern struct rds_transport rds_tcp_transport;
void rds_tcp_accept_work(struct sock *sk);
/* tcp_connect.c */
int rds_tcp_conn_connect(struct rds_connection *conn);
void rds_tcp_conn_shutdown(struct rds_connection *conn);
int rds_tcp_conn_path_connect(struct rds_conn_path *cp);
void rds_tcp_conn_path_shutdown(struct rds_conn_path *conn);
void rds_tcp_state_change(struct sock *sk);
/* tcp_listen.c */
......@@ -75,13 +75,13 @@ int rds_tcp_keepalive(struct socket *sock);
int rds_tcp_recv_init(void);
void rds_tcp_recv_exit(void);
void rds_tcp_data_ready(struct sock *sk);
int rds_tcp_recv(struct rds_connection *conn);
int rds_tcp_recv_path(struct rds_conn_path *cp);
void rds_tcp_inc_free(struct rds_incoming *inc);
int rds_tcp_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to);
/* tcp_send.c */
void rds_tcp_xmit_prepare(struct rds_connection *conn);
void rds_tcp_xmit_complete(struct rds_connection *conn);
void rds_tcp_xmit_path_prepare(struct rds_conn_path *cp);
void rds_tcp_xmit_path_complete(struct rds_conn_path *cp);
int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
unsigned int hdr_off, unsigned int sg, unsigned int off);
void rds_tcp_write_space(struct sock *sk);
......
......@@ -41,16 +41,16 @@
void rds_tcp_state_change(struct sock *sk)
{
void (*state_change)(struct sock *sk);
struct rds_connection *conn;
struct rds_conn_path *cp;
struct rds_tcp_connection *tc;
read_lock_bh(&sk->sk_callback_lock);
conn = sk->sk_user_data;
if (!conn) {
cp = sk->sk_user_data;
if (!cp) {
state_change = sk->sk_state_change;
goto out;
}
tc = conn->c_transport_data;
tc = cp->cp_transport_data;
state_change = tc->t_orig_state_change;
rdsdebug("sock %p state_change to %d\n", tc->t_sock, sk->sk_state);
......@@ -61,12 +61,11 @@ void rds_tcp_state_change(struct sock *sk)
case TCP_SYN_RECV:
break;
case TCP_ESTABLISHED:
rds_connect_path_complete(&conn->c_path[0],
RDS_CONN_CONNECTING);
rds_connect_path_complete(cp, RDS_CONN_CONNECTING);
break;
case TCP_CLOSE_WAIT:
case TCP_CLOSE:
rds_conn_drop(conn);
rds_conn_path_drop(cp);
default:
break;
}
......@@ -75,17 +74,18 @@ void rds_tcp_state_change(struct sock *sk)
state_change(sk);
}
int rds_tcp_conn_connect(struct rds_connection *conn)
int rds_tcp_conn_path_connect(struct rds_conn_path *cp)
{
struct socket *sock = NULL;
struct sockaddr_in src, dest;
int ret;
struct rds_tcp_connection *tc = conn->c_transport_data;
struct rds_connection *conn = cp->cp_conn;
struct rds_tcp_connection *tc = cp->cp_transport_data;
mutex_lock(&tc->t_conn_lock);
mutex_lock(&tc->t_conn_path_lock);
if (rds_conn_up(conn)) {
mutex_unlock(&tc->t_conn_lock);
if (rds_conn_path_up(cp)) {
mutex_unlock(&tc->t_conn_path_lock);
return 0;
}
ret = sock_create_kern(rds_conn_net(conn), PF_INET,
......@@ -114,10 +114,11 @@ int rds_tcp_conn_connect(struct rds_connection *conn)
* once we call connect() we can start getting callbacks and they
* own the socket
*/
rds_tcp_set_callbacks(sock, conn);
rds_tcp_set_callbacks(sock, cp);
ret = sock->ops->connect(sock, (struct sockaddr *)&dest, sizeof(dest),
O_NONBLOCK);
cp->cp_outgoing = 1;
rdsdebug("connect to address %pI4 returned %d\n", &conn->c_faddr, ret);
if (ret == -EINPROGRESS)
ret = 0;
......@@ -125,11 +126,11 @@ int rds_tcp_conn_connect(struct rds_connection *conn)
rds_tcp_keepalive(sock);
sock = NULL;
} else {
rds_tcp_restore_callbacks(sock, conn->c_transport_data);
rds_tcp_restore_callbacks(sock, cp->cp_transport_data);
}
out:
mutex_unlock(&tc->t_conn_lock);
mutex_unlock(&tc->t_conn_path_lock);
if (sock)
sock_release(sock);
return ret;
......@@ -144,12 +145,13 @@ int rds_tcp_conn_connect(struct rds_connection *conn)
* callbacks to those set by TCP. Our callbacks won't execute again once we
* hold the sock lock.
*/
void rds_tcp_conn_shutdown(struct rds_connection *conn)
void rds_tcp_conn_path_shutdown(struct rds_conn_path *cp)
{
struct rds_tcp_connection *tc = conn->c_transport_data;
struct rds_tcp_connection *tc = cp->cp_transport_data;
struct socket *sock = tc->t_sock;
rdsdebug("shutting down conn %p tc %p sock %p\n", conn, tc, sock);
rdsdebug("shutting down conn %p tc %p sock %p\n",
cp->cp_conn, tc, sock);
if (sock) {
sock->ops->shutdown(sock, RCV_SHUTDOWN | SEND_SHUTDOWN);
......
......@@ -79,6 +79,7 @@ int rds_tcp_accept_one(struct socket *sock)
struct inet_sock *inet;
struct rds_tcp_connection *rs_tcp = NULL;
int conn_state;
struct rds_conn_path *cp;
if (!sock) /* module unload or netns delete in progress */
return -ENETUNREACH;
......@@ -120,8 +121,9 @@ int rds_tcp_accept_one(struct socket *sock)
* rds_tcp_state_change() will do that cleanup
*/
rs_tcp = (struct rds_tcp_connection *)conn->c_transport_data;
cp = &conn->c_path[0];
rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING);
mutex_lock(&rs_tcp->t_conn_lock);
mutex_lock(&rs_tcp->t_conn_path_lock);
conn_state = rds_conn_state(conn);
if (conn_state != RDS_CONN_CONNECTING && conn_state != RDS_CONN_UP)
goto rst_nsk;
......@@ -136,16 +138,14 @@ int rds_tcp_accept_one(struct socket *sock)
!conn->c_path[0].cp_outgoing) {
goto rst_nsk;
} else {
rds_tcp_reset_callbacks(new_sock, conn);
rds_tcp_reset_callbacks(new_sock, cp);
conn->c_path[0].cp_outgoing = 0;
/* rds_connect_path_complete() marks RDS_CONN_UP */
rds_connect_path_complete(&conn->c_path[0],
RDS_CONN_RESETTING);
rds_connect_path_complete(cp, RDS_CONN_RESETTING);
}
} else {
rds_tcp_set_callbacks(new_sock, conn);
rds_connect_path_complete(&conn->c_path[0],
RDS_CONN_CONNECTING);
rds_tcp_set_callbacks(new_sock, cp);
rds_connect_path_complete(cp, RDS_CONN_CONNECTING);
}
new_sock = NULL;
ret = 0;
......@@ -156,7 +156,7 @@ int rds_tcp_accept_one(struct socket *sock)
ret = 0;
out:
if (rs_tcp)
mutex_unlock(&rs_tcp->t_conn_lock);
mutex_unlock(&rs_tcp->t_conn_path_lock);
if (new_sock)
sock_release(new_sock);
return ret;
......
......@@ -34,7 +34,6 @@
#include <linux/slab.h>
#include <net/tcp.h>
#include "rds_single_path.h"
#include "rds.h"
#include "tcp.h"
......@@ -148,7 +147,7 @@ static void rds_tcp_cong_recv(struct rds_connection *conn,
}
struct rds_tcp_desc_arg {
struct rds_connection *conn;
struct rds_conn_path *conn_path;
gfp_t gfp;
};
......@@ -156,8 +155,8 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb,
unsigned int offset, size_t len)
{
struct rds_tcp_desc_arg *arg = desc->arg.data;
struct rds_connection *conn = arg->conn;
struct rds_tcp_connection *tc = conn->c_transport_data;
struct rds_conn_path *cp = arg->conn_path;
struct rds_tcp_connection *tc = cp->cp_transport_data;
struct rds_tcp_incoming *tinc = tc->t_tinc;
struct sk_buff *clone;
size_t left = len, to_copy;
......@@ -179,7 +178,8 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb,
}
tc->t_tinc = tinc;
rdsdebug("alloced tinc %p\n", tinc);
rds_inc_init(&tinc->ti_inc, conn, conn->c_faddr);
rds_inc_path_init(&tinc->ti_inc, cp,
cp->cp_conn->c_faddr);
/*
* XXX * we might be able to use the __ variants when
* we've already serialized at a higher level.
......@@ -229,6 +229,8 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb,
}
if (tc->t_tinc_hdr_rem == 0 && tc->t_tinc_data_rem == 0) {
struct rds_connection *conn = cp->cp_conn;
if (tinc->ti_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP)
rds_tcp_cong_recv(conn, tinc);
else
......@@ -251,15 +253,15 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb,
}
/* the caller has to hold the sock lock */
static int rds_tcp_read_sock(struct rds_connection *conn, gfp_t gfp)
static int rds_tcp_read_sock(struct rds_conn_path *cp, gfp_t gfp)
{
struct rds_tcp_connection *tc = conn->c_transport_data;
struct rds_tcp_connection *tc = cp->cp_transport_data;
struct socket *sock = tc->t_sock;
read_descriptor_t desc;
struct rds_tcp_desc_arg arg;
/* It's like glib in the kernel! */
arg.conn = conn;
arg.conn_path = cp;
arg.gfp = gfp;
desc.arg.data = &arg;
desc.error = 0;
......@@ -279,16 +281,17 @@ static int rds_tcp_read_sock(struct rds_connection *conn, gfp_t gfp)
* if we fail to allocate we're in trouble.. blindly wait some time before
* trying again to see if the VM can free up something for us.
*/
int rds_tcp_recv(struct rds_connection *conn)
int rds_tcp_recv_path(struct rds_conn_path *cp)
{
struct rds_tcp_connection *tc = conn->c_transport_data;
struct rds_tcp_connection *tc = cp->cp_transport_data;
struct socket *sock = tc->t_sock;
int ret = 0;
rdsdebug("recv worker conn %p tc %p sock %p\n", conn, tc, sock);
rdsdebug("recv worker path [%d] tc %p sock %p\n",
cp->cp_index, tc, sock);
lock_sock(sock->sk);
ret = rds_tcp_read_sock(conn, GFP_KERNEL);
ret = rds_tcp_read_sock(cp, GFP_KERNEL);
release_sock(sock->sk);
return ret;
......@@ -297,24 +300,24 @@ int rds_tcp_recv(struct rds_connection *conn)
void rds_tcp_data_ready(struct sock *sk)
{
void (*ready)(struct sock *sk);
struct rds_connection *conn;
struct rds_conn_path *cp;
struct rds_tcp_connection *tc;
rdsdebug("data ready sk %p\n", sk);
read_lock_bh(&sk->sk_callback_lock);
conn = sk->sk_user_data;
if (!conn) { /* check for teardown race */
cp = sk->sk_user_data;
if (!cp) { /* check for teardown race */
ready = sk->sk_data_ready;
goto out;
}
tc = conn->c_transport_data;
tc = cp->cp_transport_data;
ready = tc->t_orig_data_ready;
rds_tcp_stats_inc(s_tcp_data_ready_calls);
if (rds_tcp_read_sock(conn, GFP_ATOMIC) == -ENOMEM)
queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM)
queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
out:
read_unlock_bh(&sk->sk_callback_lock);
ready(sk);
......
......@@ -49,16 +49,16 @@ static void rds_tcp_cork(struct socket *sock, int val)
set_fs(oldfs);
}
void rds_tcp_xmit_prepare(struct rds_connection *conn)
void rds_tcp_xmit_path_prepare(struct rds_conn_path *cp)
{
struct rds_tcp_connection *tc = conn->c_transport_data;
struct rds_tcp_connection *tc = cp->cp_transport_data;
rds_tcp_cork(tc->t_sock, 1);
}
void rds_tcp_xmit_complete(struct rds_connection *conn)
void rds_tcp_xmit_path_complete(struct rds_conn_path *cp)
{
struct rds_tcp_connection *tc = conn->c_transport_data;
struct rds_tcp_connection *tc = cp->cp_transport_data;
rds_tcp_cork(tc->t_sock, 0);
}
......@@ -178,27 +178,27 @@ static int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack)
void rds_tcp_write_space(struct sock *sk)
{
void (*write_space)(struct sock *sk);
struct rds_connection *conn;
struct rds_conn_path *cp;
struct rds_tcp_connection *tc;
read_lock_bh(&sk->sk_callback_lock);
conn = sk->sk_user_data;
if (!conn) {
cp = sk->sk_user_data;
if (!cp) {
write_space = sk->sk_write_space;
goto out;
}
tc = conn->c_transport_data;
tc = cp->cp_transport_data;
rdsdebug("write_space for tc %p\n", tc);
write_space = tc->t_orig_write_space;
rds_tcp_stats_inc(s_tcp_write_space_calls);
rdsdebug("tcp una %u\n", rds_tcp_snd_una(tc));
tc->t_last_seen_una = rds_tcp_snd_una(tc);
rds_send_drop_acked(conn, rds_tcp_snd_una(tc), rds_tcp_is_acked);
rds_send_path_drop_acked(cp, rds_tcp_snd_una(tc), rds_tcp_is_acked);
if ((atomic_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf)
queue_delayed_work(rds_wq, &conn->c_send_w, 0);
queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
out:
read_unlock_bh(&sk->sk_callback_lock);
......
......@@ -125,6 +125,11 @@ void rds_queue_reconnect(struct rds_conn_path *cp)
conn, &conn->c_laddr, &conn->c_faddr,
cp->cp_reconnect_jiffies);
/* let peer with smaller addr initiate reconnect, to avoid duels */
if (conn->c_trans->t_type == RDS_TRANS_TCP &&
conn->c_laddr > conn->c_faddr)
return;
set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
if (cp->cp_reconnect_jiffies == 0) {
cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
......@@ -152,8 +157,9 @@ void rds_connect_worker(struct work_struct *work)
int ret;
clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
if (rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
ret = conn->c_trans->conn_connect(conn);
ret = rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING);
if (ret) {
ret = conn->c_trans->conn_path_connect(cp);
rdsdebug("conn %p for %pI4 to %pI4 dispatched, ret %d\n",
conn, &conn->c_laddr, &conn->c_faddr, ret);
......@@ -203,7 +209,7 @@ void rds_recv_worker(struct work_struct *work)
int ret;
if (rds_conn_path_state(cp) == RDS_CONN_UP) {
ret = cp->cp_conn->c_trans->recv(cp->cp_conn);
ret = cp->cp_conn->c_trans->recv_path(cp);
rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
switch (ret) {
case -EAGAIN:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment