Commit 161cd45f authored by David S. Miller's avatar David S. Miller

Merge branch 'rds-mprds-foundations'

Sowmini Varadhan says:

====================
RDS: multiple connection paths for scaling

Today RDS-over-TCP is implemented by demux-ing multiple PF_RDS sockets
between any 2 endpoints (where endpoint == [IP address, port]) over a
single TCP socket between the 2 IP addresses involved. This has the
limitation that it ends up funneling multiple RDS flows over a single
TCP flow, thus the rds/tcp connection is
   (a) upper-bounded to the single-flow bandwidth,
   (b) suffers from head-of-line blocking for the RDS sockets.

Better throughput (for a fixed small packet size, MTU) can be achieved
by having multiple TCP/IP flows per rds/tcp connection, i.e., multipathed
RDS (mprds).  Each such TCP/IP flow constitutes a path for the rds/tcp
connection. RDS sockets will be attached to a path based on some hash
(e.g., of local address and RDS port number) and packets for that RDS
socket will be sent over the attached path using TCP to segment/reassemble
RDS datagrams on that path.

The table below, generated using a prototype that implements mprds,
shows that this is significant for scaling to 40G.  Packet sizes
used were: 8K byte req, 256 byte resp. MTU: 1500.  The parameters for
RDS-concurrency used below are described in the rds-stress(1) man page-
the number listed is proportional to the number of threads at which max
throughput was attained.

  -------------------------------------------------------------------
     RDS-concurrency   Num of       tx+rx K/s (iops)       throughput
     (-t N -d N)       TCP paths
  -------------------------------------------------------------------
        16             1             600K -  700K            4 Gbps
        28             8            5000K - 6000K           32 Gbps
  -------------------------------------------------------------------

FAQ: what is the relation between mprds and mptcp?
  mprds is orthogonal to mptcp. Whereas mptcp creates
  sub-flows for a single TCP connection, mprds parallelizes tx/rx
  at the RDS layer. MPRDS with N paths will allow N datagrams to
  be sent in parallel; each path will continue to send one
  datagram at a time, with sender and receiver keeping track of
  the retransmit and dgram-assembly state based on the RDS header.
  If desired, mptcp can additionally be used to speed up each TCP
  path. That acceleration is orthogonal to the parallelization benefits
  of mprds.

This patch series lays down the foundational data-structures to support
mprds in the kernel. It implements the changes to split up the
rds_connection structure into a common (to all paths) part,
and a per-path rds_conn_path. All I/O workqs are driven from
the rds_conn_path.

Note that this patchset does not (yet) actually enable multipathing
for any of the transports; all transports will continue to use a
single path with the refactored data-structures. A subsequent patchset
will  add the changes to the rds-tcp module to actually use mprds
in rds-tcp.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents dcf1158b 3ecc5693
......@@ -235,7 +235,8 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
* therefore trigger warnings.
* Defer the xmit to rds_send_worker() instead.
*/
queue_delayed_work(rds_wq, &conn->c_send_w, 0);
queue_delayed_work(rds_wq,
&conn->c_path[0].cp_send_w, 0);
}
}
......
......@@ -95,14 +95,16 @@ static struct rds_connection *rds_conn_lookup(struct net *net,
* and receiving over this connection again in the future. It is up to
* the transport to have serialized this call with its send and recv.
*/
static void rds_conn_reset(struct rds_connection *conn)
static void rds_conn_path_reset(struct rds_conn_path *cp)
{
struct rds_connection *conn = cp->cp_conn;
rdsdebug("connection %pI4 to %pI4 reset\n",
&conn->c_laddr, &conn->c_faddr);
rds_stats_inc(s_conn_reset);
rds_send_reset(conn);
conn->c_flags = 0;
rds_send_path_reset(cp);
cp->cp_flags = 0;
/* Do not clear next_rx_seq here, else we cannot distinguish
* retransmitted packets from new packets, and will hand all
......@@ -110,6 +112,32 @@ static void rds_conn_reset(struct rds_connection *conn)
* reliability guarantees of RDS. */
}
static void __rds_conn_path_init(struct rds_connection *conn,
struct rds_conn_path *cp, bool is_outgoing)
{
spin_lock_init(&cp->cp_lock);
cp->cp_next_tx_seq = 1;
init_waitqueue_head(&cp->cp_waitq);
INIT_LIST_HEAD(&cp->cp_send_queue);
INIT_LIST_HEAD(&cp->cp_retrans);
cp->cp_conn = conn;
atomic_set(&cp->cp_state, RDS_CONN_DOWN);
cp->cp_send_gen = 0;
/* cp_outgoing is per-path. So we can only set it here
* for the single-path transports.
*/
if (!conn->c_trans->t_mp_capable)
cp->cp_outgoing = (is_outgoing ? 1 : 0);
cp->cp_reconnect_jiffies = 0;
INIT_DELAYED_WORK(&cp->cp_send_w, rds_send_worker);
INIT_DELAYED_WORK(&cp->cp_recv_w, rds_recv_worker);
INIT_DELAYED_WORK(&cp->cp_conn_w, rds_connect_worker);
INIT_WORK(&cp->cp_down_w, rds_shutdown_worker);
mutex_init(&cp->cp_cm_lock);
cp->cp_flags = 0;
}
/*
* There is only every one 'conn' for a given pair of addresses in the
* system at a time. They contain messages to be retransmitted and so
......@@ -153,13 +181,8 @@ static struct rds_connection *__rds_conn_create(struct net *net,
INIT_HLIST_NODE(&conn->c_hash_node);
conn->c_laddr = laddr;
conn->c_faddr = faddr;
spin_lock_init(&conn->c_lock);
conn->c_next_tx_seq = 1;
rds_conn_net_set(conn, net);
init_waitqueue_head(&conn->c_waitq);
INIT_LIST_HEAD(&conn->c_send_queue);
INIT_LIST_HEAD(&conn->c_retrans);
rds_conn_net_set(conn, net);
ret = rds_cong_get_maps(conn);
if (ret) {
......@@ -195,17 +218,6 @@ static struct rds_connection *__rds_conn_create(struct net *net,
goto out;
}
atomic_set(&conn->c_state, RDS_CONN_DOWN);
conn->c_send_gen = 0;
conn->c_outgoing = (is_outgoing ? 1 : 0);
conn->c_reconnect_jiffies = 0;
INIT_DELAYED_WORK(&conn->c_send_w, rds_send_worker);
INIT_DELAYED_WORK(&conn->c_recv_w, rds_recv_worker);
INIT_DELAYED_WORK(&conn->c_conn_w, rds_connect_worker);
INIT_WORK(&conn->c_down_w, rds_shutdown_worker);
mutex_init(&conn->c_cm_lock);
conn->c_flags = 0;
rdsdebug("allocated conn %p for %pI4 -> %pI4 over %s %s\n",
conn, &laddr, &faddr,
trans->t_name ? trans->t_name : "[unknown]",
......@@ -222,7 +234,7 @@ static struct rds_connection *__rds_conn_create(struct net *net,
if (parent) {
/* Creating passive conn */
if (parent->c_passive) {
trans->conn_free(conn->c_transport_data);
trans->conn_free(conn->c_path[0].cp_transport_data);
kmem_cache_free(rds_conn_slab, conn);
conn = parent->c_passive;
} else {
......@@ -236,10 +248,26 @@ static struct rds_connection *__rds_conn_create(struct net *net,
found = rds_conn_lookup(net, head, laddr, faddr, trans);
if (found) {
trans->conn_free(conn->c_transport_data);
struct rds_conn_path *cp;
int i;
for (i = 0; i < RDS_MPATH_WORKERS; i++) {
cp = &conn->c_path[i];
trans->conn_free(cp->cp_transport_data);
if (!trans->t_mp_capable)
break;
}
kmem_cache_free(rds_conn_slab, conn);
conn = found;
} else {
int i;
for (i = 0; i < RDS_MPATH_WORKERS; i++) {
__rds_conn_path_init(conn, &conn->c_path[i],
is_outgoing);
conn->c_path[i].cp_index = i;
}
hlist_add_head_rcu(&conn->c_hash_node, head);
rds_cong_add_conn(conn);
rds_conn_count++;
......@@ -267,10 +295,12 @@ struct rds_connection *rds_conn_create_outgoing(struct net *net,
}
EXPORT_SYMBOL_GPL(rds_conn_create_outgoing);
void rds_conn_shutdown(struct rds_connection *conn)
void rds_conn_shutdown(struct rds_conn_path *cp)
{
struct rds_connection *conn = cp->cp_conn;
/* shut it down unless it's down already */
if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_DOWN)) {
if (!rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_DOWN)) {
/*
* Quiesce the connection mgmt handlers before we start tearing
* things down. We don't hold the mutex for the entire
......@@ -278,35 +308,41 @@ void rds_conn_shutdown(struct rds_connection *conn)
* deadlocking with the CM handler. Instead, the CM event
* handler is supposed to check for state DISCONNECTING
*/
mutex_lock(&conn->c_cm_lock);
if (!rds_conn_transition(conn, RDS_CONN_UP, RDS_CONN_DISCONNECTING)
&& !rds_conn_transition(conn, RDS_CONN_ERROR, RDS_CONN_DISCONNECTING)) {
rds_conn_error(conn, "shutdown called in state %d\n",
atomic_read(&conn->c_state));
mutex_unlock(&conn->c_cm_lock);
mutex_lock(&cp->cp_cm_lock);
if (!rds_conn_path_transition(cp, RDS_CONN_UP,
RDS_CONN_DISCONNECTING) &&
!rds_conn_path_transition(cp, RDS_CONN_ERROR,
RDS_CONN_DISCONNECTING)) {
rds_conn_path_error(cp,
"shutdown called in state %d\n",
atomic_read(&cp->cp_state));
mutex_unlock(&cp->cp_cm_lock);
return;
}
mutex_unlock(&conn->c_cm_lock);
mutex_unlock(&cp->cp_cm_lock);
wait_event(conn->c_waitq,
!test_bit(RDS_IN_XMIT, &conn->c_flags));
wait_event(conn->c_waitq,
!test_bit(RDS_RECV_REFILL, &conn->c_flags));
wait_event(cp->cp_waitq,
!test_bit(RDS_IN_XMIT, &cp->cp_flags));
wait_event(cp->cp_waitq,
!test_bit(RDS_RECV_REFILL, &cp->cp_flags));
conn->c_trans->conn_shutdown(conn);
rds_conn_reset(conn);
if (!conn->c_trans->t_mp_capable)
conn->c_trans->conn_shutdown(conn);
else
conn->c_trans->conn_path_shutdown(cp);
rds_conn_path_reset(cp);
if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) {
if (!rds_conn_path_transition(cp, RDS_CONN_DISCONNECTING,
RDS_CONN_DOWN)) {
/* This can happen - eg when we're in the middle of tearing
* down the connection, and someone unloads the rds module.
* Quite reproduceable with loopback connections.
* Mostly harmless.
*/
rds_conn_error(conn,
"%s: failed to transition to state DOWN, "
"current state is %d\n",
__func__,
atomic_read(&conn->c_state));
rds_conn_path_error(cp, "%s: failed to transition "
"to state DOWN, current state "
"is %d\n", __func__,
atomic_read(&cp->cp_state));
return;
}
}
......@@ -315,18 +351,46 @@ void rds_conn_shutdown(struct rds_connection *conn)
* The passive side of an IB loopback connection is never added
* to the conn hash, so we never trigger a reconnect on this
* conn - the reconnect is always triggered by the active peer. */
cancel_delayed_work_sync(&conn->c_conn_w);
cancel_delayed_work_sync(&cp->cp_conn_w);
rcu_read_lock();
if (!hlist_unhashed(&conn->c_hash_node)) {
rcu_read_unlock();
if (conn->c_trans->t_type != RDS_TRANS_TCP ||
conn->c_outgoing == 1)
rds_queue_reconnect(conn);
cp->cp_outgoing == 1)
rds_queue_reconnect(cp);
} else {
rcu_read_unlock();
}
}
/* destroy a single rds_conn_path. rds_conn_destroy() iterates over
* all paths using rds_conn_path_destroy()
*/
static void rds_conn_path_destroy(struct rds_conn_path *cp)
{
struct rds_message *rm, *rtmp;
rds_conn_path_drop(cp);
flush_work(&cp->cp_down_w);
/* make sure lingering queued work won't try to ref the conn */
cancel_delayed_work_sync(&cp->cp_send_w);
cancel_delayed_work_sync(&cp->cp_recv_w);
/* tear down queued messages */
list_for_each_entry_safe(rm, rtmp,
&cp->cp_send_queue,
m_conn_item) {
list_del_init(&rm->m_conn_item);
BUG_ON(!list_empty(&rm->m_sock_item));
rds_message_put(rm);
}
if (cp->cp_xmit_rm)
rds_message_put(cp->cp_xmit_rm);
cp->cp_conn->c_trans->conn_free(cp->cp_transport_data);
}
/*
* Stop and free a connection.
*
......@@ -336,7 +400,6 @@ void rds_conn_shutdown(struct rds_connection *conn)
*/
void rds_conn_destroy(struct rds_connection *conn)
{
struct rds_message *rm, *rtmp;
unsigned long flags;
rdsdebug("freeing conn %p for %pI4 -> "
......@@ -350,25 +413,19 @@ void rds_conn_destroy(struct rds_connection *conn)
synchronize_rcu();
/* shut the connection down */
rds_conn_drop(conn);
flush_work(&conn->c_down_w);
/* make sure lingering queued work won't try to ref the conn */
cancel_delayed_work_sync(&conn->c_send_w);
cancel_delayed_work_sync(&conn->c_recv_w);
if (!conn->c_trans->t_mp_capable) {
rds_conn_path_destroy(&conn->c_path[0]);
BUG_ON(!list_empty(&conn->c_path[0].cp_retrans));
} else {
int i;
struct rds_conn_path *cp;
/* tear down queued messages */
list_for_each_entry_safe(rm, rtmp,
&conn->c_send_queue,
m_conn_item) {
list_del_init(&rm->m_conn_item);
BUG_ON(!list_empty(&rm->m_sock_item));
rds_message_put(rm);
for (i = 0; i < RDS_MPATH_WORKERS; i++) {
cp = &conn->c_path[i];
rds_conn_path_destroy(cp);
BUG_ON(!list_empty(&cp->cp_retrans));
}
}
if (conn->c_xmit_rm)
rds_message_put(conn->c_xmit_rm);
conn->c_trans->conn_free(conn->c_transport_data);
/*
* The congestion maps aren't freed up here. They're
......@@ -377,7 +434,6 @@ void rds_conn_destroy(struct rds_connection *conn)
*/
rds_cong_remove_conn(conn);
BUG_ON(!list_empty(&conn->c_retrans));
kmem_cache_free(rds_conn_slab, conn);
spin_lock_irqsave(&rds_conn_lock, flags);
......@@ -398,6 +454,7 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
unsigned int total = 0;
unsigned long flags;
size_t i;
int j;
len /= sizeof(struct rds_info_message);
......@@ -406,23 +463,32 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
i++, head++) {
hlist_for_each_entry_rcu(conn, head, c_hash_node) {
if (want_send)
list = &conn->c_send_queue;
else
list = &conn->c_retrans;
spin_lock_irqsave(&conn->c_lock, flags);
/* XXX too lazy to maintain counts.. */
list_for_each_entry(rm, list, m_conn_item) {
total++;
if (total <= len)
rds_inc_info_copy(&rm->m_inc, iter,
conn->c_laddr,
conn->c_faddr, 0);
struct rds_conn_path *cp;
for (j = 0; j < RDS_MPATH_WORKERS; j++) {
cp = &conn->c_path[j];
if (want_send)
list = &cp->cp_send_queue;
else
list = &cp->cp_retrans;
spin_lock_irqsave(&cp->cp_lock, flags);
/* XXX too lazy to maintain counts.. */
list_for_each_entry(rm, list, m_conn_item) {
total++;
if (total <= len)
rds_inc_info_copy(&rm->m_inc,
iter,
conn->c_laddr,
conn->c_faddr,
0);
}
spin_unlock_irqrestore(&cp->cp_lock, flags);
if (!conn->c_trans->t_mp_capable)
break;
}
spin_unlock_irqrestore(&conn->c_lock, flags);
}
}
rcu_read_unlock();
......@@ -484,27 +550,72 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
}
EXPORT_SYMBOL_GPL(rds_for_each_conn_info);
static int rds_conn_info_visitor(struct rds_connection *conn,
void *buffer)
void rds_walk_conn_path_info(struct socket *sock, unsigned int len,
struct rds_info_iterator *iter,
struct rds_info_lengths *lens,
int (*visitor)(struct rds_conn_path *, void *),
size_t item_len)
{
u64 buffer[(item_len + 7) / 8];
struct hlist_head *head;
struct rds_connection *conn;
size_t i;
int j;
rcu_read_lock();
lens->nr = 0;
lens->each = item_len;
for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
i++, head++) {
hlist_for_each_entry_rcu(conn, head, c_hash_node) {
struct rds_conn_path *cp;
for (j = 0; j < RDS_MPATH_WORKERS; j++) {
cp = &conn->c_path[j];
/* XXX no cp_lock usage.. */
if (!visitor(cp, buffer))
continue;
if (!conn->c_trans->t_mp_capable)
break;
}
/* We copy as much as we can fit in the buffer,
* but we count all items so that the caller
* can resize the buffer.
*/
if (len >= item_len) {
rds_info_copy(iter, buffer, item_len);
len -= item_len;
}
lens->nr++;
}
}
rcu_read_unlock();
}
static int rds_conn_info_visitor(struct rds_conn_path *cp, void *buffer)
{
struct rds_info_connection *cinfo = buffer;
cinfo->next_tx_seq = conn->c_next_tx_seq;
cinfo->next_rx_seq = conn->c_next_rx_seq;
cinfo->laddr = conn->c_laddr;
cinfo->faddr = conn->c_faddr;
strncpy(cinfo->transport, conn->c_trans->t_name,
cinfo->next_tx_seq = cp->cp_next_tx_seq;
cinfo->next_rx_seq = cp->cp_next_rx_seq;
cinfo->laddr = cp->cp_conn->c_laddr;
cinfo->faddr = cp->cp_conn->c_faddr;
strncpy(cinfo->transport, cp->cp_conn->c_trans->t_name,
sizeof(cinfo->transport));
cinfo->flags = 0;
rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &cp->cp_flags),
SENDING);
/* XXX Future: return the state rather than these funky bits */
rds_conn_info_set(cinfo->flags,
atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
atomic_read(&cp->cp_state) == RDS_CONN_CONNECTING,
CONNECTING);
rds_conn_info_set(cinfo->flags,
atomic_read(&conn->c_state) == RDS_CONN_UP,
atomic_read(&cp->cp_state) == RDS_CONN_UP,
CONNECTED);
return 1;
}
......@@ -513,7 +624,7 @@ static void rds_conn_info(struct socket *sock, unsigned int len,
struct rds_info_iterator *iter,
struct rds_info_lengths *lens)
{
rds_for_each_conn_info(sock, len, iter, lens,
rds_walk_conn_path_info(sock, len, iter, lens,
rds_conn_info_visitor,
sizeof(struct rds_info_connection));
}
......@@ -553,10 +664,16 @@ void rds_conn_exit(void)
/*
* Force a disconnect
*/
void rds_conn_path_drop(struct rds_conn_path *cp)
{
atomic_set(&cp->cp_state, RDS_CONN_ERROR);
queue_work(rds_wq, &cp->cp_down_w);
}
EXPORT_SYMBOL_GPL(rds_conn_path_drop);
void rds_conn_drop(struct rds_connection *conn)
{
atomic_set(&conn->c_state, RDS_CONN_ERROR);
queue_work(rds_wq, &conn->c_down_w);
rds_conn_path_drop(&conn->c_path[0]);
}
EXPORT_SYMBOL_GPL(rds_conn_drop);
......@@ -564,11 +681,17 @@ EXPORT_SYMBOL_GPL(rds_conn_drop);
* If the connection is down, trigger a connect. We may have scheduled a
* delayed reconnect however - in this case we should not interfere.
*/
void rds_conn_path_connect_if_down(struct rds_conn_path *cp)
{
if (rds_conn_path_state(cp) == RDS_CONN_DOWN &&
!test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags))
queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
}
void rds_conn_connect_if_down(struct rds_connection *conn)
{
if (rds_conn_state(conn) == RDS_CONN_DOWN &&
!test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags))
queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
WARN_ON(conn->c_trans->t_mp_capable);
rds_conn_path_connect_if_down(&conn->c_path[0]);
}
EXPORT_SYMBOL_GPL(rds_conn_connect_if_down);
......@@ -586,3 +709,15 @@ __rds_conn_error(struct rds_connection *conn, const char *fmt, ...)
rds_conn_drop(conn);
}
void
__rds_conn_path_error(struct rds_conn_path *cp, const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
vprintk(fmt, ap);
va_end(ap);
rds_conn_path_drop(cp);
}
......@@ -40,6 +40,7 @@
#include <linux/slab.h>
#include <linux/module.h>
#include "rds_single_path.h"
#include "rds.h"
#include "ib.h"
#include "ib_mr.h"
......
......@@ -36,6 +36,7 @@
#include <linux/vmalloc.h>
#include <linux/ratelimit.h>
#include "rds_single_path.h"
#include "rds.h"
#include "ib.h"
......@@ -273,7 +274,7 @@ static void rds_ib_tasklet_fn_send(unsigned long data)
if (rds_conn_up(conn) &&
(!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
test_bit(0, &conn->c_map_queued)))
rds_send_xmit(ic->conn);
rds_send_xmit(&ic->conn->c_path[0]);
}
static void poll_rcq(struct rds_ib_connection *ic, struct ib_cq *cq,
......
......@@ -35,6 +35,7 @@
#include <linux/rculist.h>
#include <linux/llist.h>
#include "rds_single_path.h"
#include "ib_mr.h"
struct workqueue_struct *rds_ib_mr_wq;
......
......@@ -36,6 +36,7 @@
#include <linux/dma-mapping.h>
#include <rdma/rdma_cm.h>
#include "rds_single_path.h"
#include "rds.h"
#include "ib.h"
......
......@@ -36,6 +36,7 @@
#include <linux/dmapool.h>
#include <linux/ratelimit.h>
#include "rds_single_path.h"
#include "rds.h"
#include "ib.h"
......
......@@ -34,6 +34,7 @@
#include <linux/slab.h>
#include <linux/in.h>
#include "rds_single_path.h"
#include "rds.h"
#include "loop.h"
......
......@@ -33,6 +33,7 @@
#include <linux/module.h>
#include <rdma/rdma_cm.h>
#include "rds_single_path.h"
#include "rdma_transport.h"
#include "ib.h"
......
......@@ -84,56 +84,69 @@ enum {
#define RDS_IN_XMIT 2
#define RDS_RECV_REFILL 3
/* Max number of multipaths per RDS connection. Must be a power of 2 */
#define RDS_MPATH_WORKERS 1
/* Per mpath connection state */
struct rds_conn_path {
struct rds_connection *cp_conn;
struct rds_message *cp_xmit_rm;
unsigned long cp_xmit_sg;
unsigned int cp_xmit_hdr_off;
unsigned int cp_xmit_data_off;
unsigned int cp_xmit_atomic_sent;
unsigned int cp_xmit_rdma_sent;
unsigned int cp_xmit_data_sent;
spinlock_t cp_lock; /* protect msg queues */
u64 cp_next_tx_seq;
struct list_head cp_send_queue;
struct list_head cp_retrans;
u64 cp_next_rx_seq;
void *cp_transport_data;
atomic_t cp_state;
unsigned long cp_send_gen;
unsigned long cp_flags;
unsigned long cp_reconnect_jiffies;
struct delayed_work cp_send_w;
struct delayed_work cp_recv_w;
struct delayed_work cp_conn_w;
struct work_struct cp_down_w;
struct mutex cp_cm_lock; /* protect cp_state & cm */
wait_queue_head_t cp_waitq;
unsigned int cp_unacked_packets;
unsigned int cp_unacked_bytes;
unsigned int cp_outgoing:1,
cp_pad_to_32:31;
unsigned int cp_index;
};
/* One rds_connection per RDS address pair */
struct rds_connection {
struct hlist_node c_hash_node;
__be32 c_laddr;
__be32 c_faddr;
unsigned int c_loopback:1,
c_outgoing:1,
c_pad_to_32:30;
c_pad_to_32:31;
int c_npaths;
struct rds_connection *c_passive;
struct rds_transport *c_trans;
struct rds_cong_map *c_lcong;
struct rds_cong_map *c_fcong;
struct rds_message *c_xmit_rm;
unsigned long c_xmit_sg;
unsigned int c_xmit_hdr_off;
unsigned int c_xmit_data_off;
unsigned int c_xmit_atomic_sent;
unsigned int c_xmit_rdma_sent;
unsigned int c_xmit_data_sent;
spinlock_t c_lock; /* protect msg queues */
u64 c_next_tx_seq;
struct list_head c_send_queue;
struct list_head c_retrans;
u64 c_next_rx_seq;
struct rds_transport *c_trans;
void *c_transport_data;
atomic_t c_state;
unsigned long c_send_gen;
unsigned long c_flags;
unsigned long c_reconnect_jiffies;
struct delayed_work c_send_w;
struct delayed_work c_recv_w;
struct delayed_work c_conn_w;
struct work_struct c_down_w;
struct mutex c_cm_lock; /* protect conn state & cm */
wait_queue_head_t c_waitq;
/* Protocol version */
unsigned int c_version;
possible_net_t c_net;
struct list_head c_map_item;
unsigned long c_map_queued;
unsigned int c_unacked_packets;
unsigned int c_unacked_bytes;
/* Protocol version */
unsigned int c_version;
possible_net_t c_net;
struct rds_conn_path c_path[RDS_MPATH_WORKERS];
};
static inline
......@@ -218,6 +231,7 @@ struct rds_incoming {
atomic_t i_refcount;
struct list_head i_item;
struct rds_connection *i_conn;
struct rds_conn_path *i_conn_path;
struct rds_header i_hdr;
unsigned long i_rx_jiffies;
__be32 i_saddr;
......@@ -433,7 +447,8 @@ struct rds_transport {
char t_name[TRANSNAMSIZ];
struct list_head t_item;
struct module *t_owner;
unsigned int t_prefer_loopback:1;
unsigned int t_prefer_loopback:1,
t_mp_capable:1;
unsigned int t_type;
int (*laddr_check)(struct net *net, __be32 addr);
......@@ -441,8 +456,11 @@ struct rds_transport {
void (*conn_free)(void *data);
int (*conn_connect)(struct rds_connection *conn);
void (*conn_shutdown)(struct rds_connection *conn);
void (*conn_path_shutdown)(struct rds_conn_path *conn);
void (*xmit_prepare)(struct rds_connection *conn);
void (*xmit_path_prepare)(struct rds_conn_path *cp);
void (*xmit_complete)(struct rds_connection *conn);
void (*xmit_path_complete)(struct rds_conn_path *cp);
int (*xmit)(struct rds_connection *conn, struct rds_message *rm,
unsigned int hdr_off, unsigned int sg, unsigned int off);
int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
......@@ -636,10 +654,12 @@ struct rds_connection *rds_conn_create(struct net *net,
struct rds_connection *rds_conn_create_outgoing(struct net *net,
__be32 laddr, __be32 faddr,
struct rds_transport *trans, gfp_t gfp);
void rds_conn_shutdown(struct rds_connection *conn);
void rds_conn_shutdown(struct rds_conn_path *cpath);
void rds_conn_destroy(struct rds_connection *conn);
void rds_conn_drop(struct rds_connection *conn);
void rds_conn_path_drop(struct rds_conn_path *cpath);
void rds_conn_connect_if_down(struct rds_connection *conn);
void rds_conn_path_connect_if_down(struct rds_conn_path *cp);
void rds_for_each_conn_info(struct socket *sock, unsigned int len,
struct rds_info_iterator *iter,
struct rds_info_lengths *lens,
......@@ -650,28 +670,60 @@ void __rds_conn_error(struct rds_connection *conn, const char *, ...);
#define rds_conn_error(conn, fmt...) \
__rds_conn_error(conn, KERN_WARNING "RDS: " fmt)
void __rds_conn_path_error(struct rds_conn_path *cp, const char *, ...);
#define rds_conn_path_error(cp, fmt...) \
__rds_conn_path_error(cp, KERN_WARNING "RDS: " fmt)
static inline int
rds_conn_path_transition(struct rds_conn_path *cp, int old, int new)
{
return atomic_cmpxchg(&cp->cp_state, old, new) == old;
}
static inline int
rds_conn_transition(struct rds_connection *conn, int old, int new)
{
return atomic_cmpxchg(&conn->c_state, old, new) == old;
WARN_ON(conn->c_trans->t_mp_capable);
return rds_conn_path_transition(&conn->c_path[0], old, new);
}
static inline int
rds_conn_path_state(struct rds_conn_path *cp)
{
return atomic_read(&cp->cp_state);
}
static inline int
rds_conn_state(struct rds_connection *conn)
{
return atomic_read(&conn->c_state);
WARN_ON(conn->c_trans->t_mp_capable);
return rds_conn_path_state(&conn->c_path[0]);
}
static inline int
rds_conn_path_up(struct rds_conn_path *cp)
{
return atomic_read(&cp->cp_state) == RDS_CONN_UP;
}
static inline int
rds_conn_up(struct rds_connection *conn)
{
return atomic_read(&conn->c_state) == RDS_CONN_UP;
WARN_ON(conn->c_trans->t_mp_capable);
return rds_conn_path_up(&conn->c_path[0]);
}
static inline int
rds_conn_path_connecting(struct rds_conn_path *cp)
{
return atomic_read(&cp->cp_state) == RDS_CONN_CONNECTING;
}
static inline int
rds_conn_connecting(struct rds_connection *conn)
{
return atomic_read(&conn->c_state) == RDS_CONN_CONNECTING;
WARN_ON(conn->c_trans->t_mp_capable);
return rds_conn_path_connecting(&conn->c_path[0]);
}
/* message.c */
......@@ -720,6 +772,8 @@ void rds_page_exit(void);
/* recv.c */
void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
__be32 saddr);
void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *conn,
__be32 saddr);
void rds_inc_put(struct rds_incoming *inc);
void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
struct rds_incoming *inc, gfp_t gfp);
......@@ -733,16 +787,16 @@ void rds_inc_info_copy(struct rds_incoming *inc,
/* send.c */
int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len);
void rds_send_reset(struct rds_connection *conn);
int rds_send_xmit(struct rds_connection *conn);
void rds_send_path_reset(struct rds_conn_path *conn);
int rds_send_xmit(struct rds_conn_path *cp);
struct sockaddr_in;
void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest);
typedef int (*is_acked_func)(struct rds_message *rm, uint64_t ack);
void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
is_acked_func is_acked);
int rds_send_pong(struct rds_connection *conn, __be16 dport);
struct rds_message *rds_send_get_message(struct rds_connection *,
struct rm_rdma_op *);
void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
is_acked_func is_acked);
int rds_send_pong(struct rds_conn_path *cp, __be16 dport);
/* rdma.c */
void rds_rdma_unuse(struct rds_sock *rs, u32 r_key, int force);
......@@ -809,12 +863,12 @@ extern unsigned int rds_sysctl_trace_level;
int rds_threads_init(void);
void rds_threads_exit(void);
extern struct workqueue_struct *rds_wq;
void rds_queue_reconnect(struct rds_connection *conn);
void rds_queue_reconnect(struct rds_conn_path *cp);
void rds_connect_worker(struct work_struct *);
void rds_shutdown_worker(struct work_struct *);
void rds_send_worker(struct work_struct *);
void rds_recv_worker(struct work_struct *);
void rds_connect_path_complete(struct rds_connection *conn, int curr);
void rds_connect_path_complete(struct rds_conn_path *conn, int curr);
void rds_connect_complete(struct rds_connection *conn);
/* transport.c */
......
#ifndef _RDS_RDS_SINGLE_H
#define _RDS_RDS_SINGLE_H
#define c_xmit_rm c_path[0].cp_xmit_rm
#define c_xmit_sg c_path[0].cp_xmit_sg
#define c_xmit_hdr_off c_path[0].cp_xmit_hdr_off
#define c_xmit_data_off c_path[0].cp_xmit_data_off
#define c_xmit_atomic_sent c_path[0].cp_xmit_atomic_sent
#define c_xmit_rdma_sent c_path[0].cp_xmit_rdma_sent
#define c_xmit_data_sent c_path[0].cp_xmit_data_sent
#define c_lock c_path[0].cp_lock
#define c_next_tx_seq c_path[0].cp_next_tx_seq
#define c_send_queue c_path[0].cp_send_queue
#define c_retrans c_path[0].cp_retrans
#define c_next_rx_seq c_path[0].cp_next_rx_seq
#define c_transport_data c_path[0].cp_transport_data
#define c_state c_path[0].cp_state
#define c_send_gen c_path[0].cp_send_gen
#define c_flags c_path[0].cp_flags
#define c_reconnect_jiffies c_path[0].cp_reconnect_jiffies
#define c_send_w c_path[0].cp_send_w
#define c_recv_w c_path[0].cp_recv_w
#define c_conn_w c_path[0].cp_conn_w
#define c_down_w c_path[0].cp_down_w
#define c_cm_lock c_path[0].cp_cm_lock
#define c_waitq c_path[0].cp_waitq
#define c_unacked_packets c_path[0].cp_unacked_packets
#define c_unacked_bytes c_path[0].cp_unacked_bytes
#endif /* _RDS_RDS_SINGLE_H */
......@@ -53,6 +53,20 @@ void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
}
EXPORT_SYMBOL_GPL(rds_inc_init);
void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *cp,
__be32 saddr)
{
atomic_set(&inc->i_refcount, 1);
INIT_LIST_HEAD(&inc->i_item);
inc->i_conn = cp->cp_conn;
inc->i_conn_path = cp;
inc->i_saddr = saddr;
inc->i_rdma_cookie = 0;
inc->i_rx_tstamp.tv_sec = 0;
inc->i_rx_tstamp.tv_usec = 0;
}
EXPORT_SYMBOL_GPL(rds_inc_path_init);
static void rds_inc_addref(struct rds_incoming *inc)
{
rdsdebug("addref inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));
......@@ -164,13 +178,18 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
struct rds_sock *rs = NULL;
struct sock *sk;
unsigned long flags;
struct rds_conn_path *cp;
inc->i_conn = conn;
inc->i_rx_jiffies = jiffies;
if (conn->c_trans->t_mp_capable)
cp = inc->i_conn_path;
else
cp = &conn->c_path[0];
rdsdebug("conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
"flags 0x%x rx_jiffies %lu\n", conn,
(unsigned long long)conn->c_next_rx_seq,
(unsigned long long)cp->cp_next_rx_seq,
inc,
(unsigned long long)be64_to_cpu(inc->i_hdr.h_sequence),
be32_to_cpu(inc->i_hdr.h_len),
......@@ -199,16 +218,16 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
* XXX we could spend more on the wire to get more robust failure
* detection, arguably worth it to avoid data corruption.
*/
if (be64_to_cpu(inc->i_hdr.h_sequence) < conn->c_next_rx_seq &&
if (be64_to_cpu(inc->i_hdr.h_sequence) < cp->cp_next_rx_seq &&
(inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
rds_stats_inc(s_recv_drop_old_seq);
goto out;
}
conn->c_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;
cp->cp_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;
if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
rds_stats_inc(s_recv_ping);
rds_send_pong(conn, inc->i_hdr.h_sport);
rds_send_pong(cp, inc->i_hdr.h_sport);
goto out;
}
......
......@@ -62,14 +62,14 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status);
* Reset the send state. Callers must ensure that this doesn't race with
* rds_send_xmit().
*/
void rds_send_reset(struct rds_connection *conn)
void rds_send_path_reset(struct rds_conn_path *cp)
{
struct rds_message *rm, *tmp;
unsigned long flags;
if (conn->c_xmit_rm) {
rm = conn->c_xmit_rm;
conn->c_xmit_rm = NULL;
if (cp->cp_xmit_rm) {
rm = cp->cp_xmit_rm;
cp->cp_xmit_rm = NULL;
/* Tell the user the RDMA op is no longer mapped by the
* transport. This isn't entirely true (it's flushed out
* independently) but as the connection is down, there's
......@@ -78,37 +78,37 @@ void rds_send_reset(struct rds_connection *conn)
rds_message_put(rm);
}
conn->c_xmit_sg = 0;
conn->c_xmit_hdr_off = 0;
conn->c_xmit_data_off = 0;
conn->c_xmit_atomic_sent = 0;
conn->c_xmit_rdma_sent = 0;
conn->c_xmit_data_sent = 0;
cp->cp_xmit_sg = 0;
cp->cp_xmit_hdr_off = 0;
cp->cp_xmit_data_off = 0;
cp->cp_xmit_atomic_sent = 0;
cp->cp_xmit_rdma_sent = 0;
cp->cp_xmit_data_sent = 0;
conn->c_map_queued = 0;
cp->cp_conn->c_map_queued = 0;
conn->c_unacked_packets = rds_sysctl_max_unacked_packets;
conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes;
cp->cp_unacked_packets = rds_sysctl_max_unacked_packets;
cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes;
/* Mark messages as retransmissions, and move them to the send q */
spin_lock_irqsave(&conn->c_lock, flags);
list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
spin_lock_irqsave(&cp->cp_lock, flags);
list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
}
list_splice_init(&conn->c_retrans, &conn->c_send_queue);
spin_unlock_irqrestore(&conn->c_lock, flags);
list_splice_init(&cp->cp_retrans, &cp->cp_send_queue);
spin_unlock_irqrestore(&cp->cp_lock, flags);
}
EXPORT_SYMBOL_GPL(rds_send_reset);
EXPORT_SYMBOL_GPL(rds_send_path_reset);
static int acquire_in_xmit(struct rds_connection *conn)
static int acquire_in_xmit(struct rds_conn_path *cp)
{
return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0;
}
static void release_in_xmit(struct rds_connection *conn)
static void release_in_xmit(struct rds_conn_path *cp)
{
clear_bit(RDS_IN_XMIT, &conn->c_flags);
clear_bit(RDS_IN_XMIT, &cp->cp_flags);
smp_mb__after_atomic();
/*
* We don't use wait_on_bit()/wake_up_bit() because our waking is in a
......@@ -116,8 +116,8 @@ static void release_in_xmit(struct rds_connection *conn)
* the system-wide hashed waitqueue buckets in the fast path only to
* almost never find waiters.
*/
if (waitqueue_active(&conn->c_waitq))
wake_up_all(&conn->c_waitq);
if (waitqueue_active(&cp->cp_waitq))
wake_up_all(&cp->cp_waitq);
}
/*
......@@ -134,8 +134,9 @@ static void release_in_xmit(struct rds_connection *conn)
* - small message latency is higher behind queued large messages
* - large message latency isn't starved by intervening small sends
*/
int rds_send_xmit(struct rds_connection *conn)
int rds_send_xmit(struct rds_conn_path *cp)
{
struct rds_connection *conn = cp->cp_conn;
struct rds_message *rm;
unsigned long flags;
unsigned int tmp;
......@@ -155,7 +156,7 @@ int rds_send_xmit(struct rds_connection *conn)
* avoids blocking the caller and trading per-connection data between
* caches per message.
*/
if (!acquire_in_xmit(conn)) {
if (!acquire_in_xmit(cp)) {
rds_stats_inc(s_send_lock_contention);
ret = -ENOMEM;
goto out;
......@@ -169,21 +170,25 @@ int rds_send_xmit(struct rds_connection *conn)
* The acquire_in_xmit() check above ensures that only one
* caller can increment c_send_gen at any time.
*/
conn->c_send_gen++;
send_gen = conn->c_send_gen;
cp->cp_send_gen++;
send_gen = cp->cp_send_gen;
/*
* rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
* we do the opposite to avoid races.
*/
if (!rds_conn_up(conn)) {
release_in_xmit(conn);
if (!rds_conn_path_up(cp)) {
release_in_xmit(cp);
ret = 0;
goto out;
}
if (conn->c_trans->xmit_prepare)
if (conn->c_trans->t_mp_capable) {
if (conn->c_trans->xmit_path_prepare)
conn->c_trans->xmit_path_prepare(cp);
} else if (conn->c_trans->xmit_prepare) {
conn->c_trans->xmit_prepare(conn);
}
/*
* spin trying to push headers and data down the connection until
......@@ -191,7 +196,7 @@ int rds_send_xmit(struct rds_connection *conn)
*/
while (1) {
rm = conn->c_xmit_rm;
rm = cp->cp_xmit_rm;
/*
* If between sending messages, we can send a pending congestion
......@@ -204,14 +209,16 @@ int rds_send_xmit(struct rds_connection *conn)
break;
}
rm->data.op_active = 1;
rm->m_inc.i_conn_path = cp;
rm->m_inc.i_conn = cp->cp_conn;
conn->c_xmit_rm = rm;
cp->cp_xmit_rm = rm;
}
/*
* If not already working on one, grab the next message.
*
* c_xmit_rm holds a ref while we're sending this message down
* cp_xmit_rm holds a ref while we're sending this message down
* the connction. We can use this ref while holding the
* send_sem.. rds_send_reset() is serialized with it.
*/
......@@ -228,10 +235,10 @@ int rds_send_xmit(struct rds_connection *conn)
if (batch_count >= send_batch_count)
goto over_batch;
spin_lock_irqsave(&conn->c_lock, flags);
spin_lock_irqsave(&cp->cp_lock, flags);
if (!list_empty(&conn->c_send_queue)) {
rm = list_entry(conn->c_send_queue.next,
if (!list_empty(&cp->cp_send_queue)) {
rm = list_entry(cp->cp_send_queue.next,
struct rds_message,
m_conn_item);
rds_message_addref(rm);
......@@ -240,10 +247,11 @@ int rds_send_xmit(struct rds_connection *conn)
* Move the message from the send queue to the retransmit
* list right away.
*/
list_move_tail(&rm->m_conn_item, &conn->c_retrans);
list_move_tail(&rm->m_conn_item,
&cp->cp_retrans);
}
spin_unlock_irqrestore(&conn->c_lock, flags);
spin_unlock_irqrestore(&cp->cp_lock, flags);
if (!rm)
break;
......@@ -257,32 +265,34 @@ int rds_send_xmit(struct rds_connection *conn)
*/
if (rm->rdma.op_active &&
test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
spin_lock_irqsave(&conn->c_lock, flags);
spin_lock_irqsave(&cp->cp_lock, flags);
if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
list_move(&rm->m_conn_item, &to_be_dropped);
spin_unlock_irqrestore(&conn->c_lock, flags);
spin_unlock_irqrestore(&cp->cp_lock, flags);
continue;
}
/* Require an ACK every once in a while */
len = ntohl(rm->m_inc.i_hdr.h_len);
if (conn->c_unacked_packets == 0 ||
conn->c_unacked_bytes < len) {
if (cp->cp_unacked_packets == 0 ||
cp->cp_unacked_bytes < len) {
__set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
conn->c_unacked_packets = rds_sysctl_max_unacked_packets;
conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes;
cp->cp_unacked_packets =
rds_sysctl_max_unacked_packets;
cp->cp_unacked_bytes =
rds_sysctl_max_unacked_bytes;
rds_stats_inc(s_send_ack_required);
} else {
conn->c_unacked_bytes -= len;
conn->c_unacked_packets--;
cp->cp_unacked_bytes -= len;
cp->cp_unacked_packets--;
}
conn->c_xmit_rm = rm;
cp->cp_xmit_rm = rm;
}
/* The transport either sends the whole rdma or none of it */
if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
rm->m_final_op = &rm->rdma;
/* The transport owns the mapped memory for now.
* You can't unmap it while it's on the send queue
......@@ -294,11 +304,11 @@ int rds_send_xmit(struct rds_connection *conn)
wake_up_interruptible(&rm->m_flush_wait);
break;
}
conn->c_xmit_rdma_sent = 1;
cp->cp_xmit_rdma_sent = 1;
}
if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
rm->m_final_op = &rm->atomic;
/* The transport owns the mapped memory for now.
* You can't unmap it while it's on the send queue
......@@ -310,7 +320,7 @@ int rds_send_xmit(struct rds_connection *conn)
wake_up_interruptible(&rm->m_flush_wait);
break;
}
conn->c_xmit_atomic_sent = 1;
cp->cp_xmit_atomic_sent = 1;
}
......@@ -336,41 +346,42 @@ int rds_send_xmit(struct rds_connection *conn)
rm->data.op_active = 0;
}
if (rm->data.op_active && !conn->c_xmit_data_sent) {
if (rm->data.op_active && !cp->cp_xmit_data_sent) {
rm->m_final_op = &rm->data;
ret = conn->c_trans->xmit(conn, rm,
conn->c_xmit_hdr_off,
conn->c_xmit_sg,
conn->c_xmit_data_off);
cp->cp_xmit_hdr_off,
cp->cp_xmit_sg,
cp->cp_xmit_data_off);
if (ret <= 0)
break;
if (conn->c_xmit_hdr_off < sizeof(struct rds_header)) {
if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
tmp = min_t(int, ret,
sizeof(struct rds_header) -
conn->c_xmit_hdr_off);
conn->c_xmit_hdr_off += tmp;
cp->cp_xmit_hdr_off);
cp->cp_xmit_hdr_off += tmp;
ret -= tmp;
}
sg = &rm->data.op_sg[conn->c_xmit_sg];
sg = &rm->data.op_sg[cp->cp_xmit_sg];
while (ret) {
tmp = min_t(int, ret, sg->length -
conn->c_xmit_data_off);
conn->c_xmit_data_off += tmp;
cp->cp_xmit_data_off);
cp->cp_xmit_data_off += tmp;
ret -= tmp;
if (conn->c_xmit_data_off == sg->length) {
conn->c_xmit_data_off = 0;
if (cp->cp_xmit_data_off == sg->length) {
cp->cp_xmit_data_off = 0;
sg++;
conn->c_xmit_sg++;
BUG_ON(ret != 0 &&
conn->c_xmit_sg == rm->data.op_nents);
cp->cp_xmit_sg++;
BUG_ON(ret != 0 && cp->cp_xmit_sg ==
rm->data.op_nents);
}
}
if (conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
(conn->c_xmit_sg == rm->data.op_nents))
conn->c_xmit_data_sent = 1;
if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
(cp->cp_xmit_sg == rm->data.op_nents))
cp->cp_xmit_data_sent = 1;
}
/*
......@@ -378,23 +389,27 @@ int rds_send_xmit(struct rds_connection *conn)
* if there is a data op. Thus, if the data is sent (or there was
* none), then we're done with the rm.
*/
if (!rm->data.op_active || conn->c_xmit_data_sent) {
conn->c_xmit_rm = NULL;
conn->c_xmit_sg = 0;
conn->c_xmit_hdr_off = 0;
conn->c_xmit_data_off = 0;
conn->c_xmit_rdma_sent = 0;
conn->c_xmit_atomic_sent = 0;
conn->c_xmit_data_sent = 0;
if (!rm->data.op_active || cp->cp_xmit_data_sent) {
cp->cp_xmit_rm = NULL;
cp->cp_xmit_sg = 0;
cp->cp_xmit_hdr_off = 0;
cp->cp_xmit_data_off = 0;
cp->cp_xmit_rdma_sent = 0;
cp->cp_xmit_atomic_sent = 0;
cp->cp_xmit_data_sent = 0;
rds_message_put(rm);
}
}
over_batch:
if (conn->c_trans->xmit_complete)
if (conn->c_trans->t_mp_capable) {
if (conn->c_trans->xmit_path_complete)
conn->c_trans->xmit_path_complete(cp);
} else if (conn->c_trans->xmit_complete) {
conn->c_trans->xmit_complete(conn);
release_in_xmit(conn);
}
release_in_xmit(cp);
/* Nuke any messages we decided not to retransmit. */
if (!list_empty(&to_be_dropped)) {
......@@ -422,12 +437,12 @@ int rds_send_xmit(struct rds_connection *conn)
if (ret == 0) {
smp_mb();
if ((test_bit(0, &conn->c_map_queued) ||
!list_empty(&conn->c_send_queue)) &&
send_gen == conn->c_send_gen) {
!list_empty(&cp->cp_send_queue)) &&
send_gen == cp->cp_send_gen) {
rds_stats_inc(s_send_lock_queue_raced);
if (batch_count < send_batch_count)
goto restart;
queue_delayed_work(rds_wq, &conn->c_send_w, 1);
queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
}
}
out:
......@@ -559,42 +574,6 @@ __rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
/* No need to wake the app - caller does this */
}
/*
* This is called from the IB send completion when we detect
* a RDMA operation that failed with remote access error.
* So speed is not an issue here.
*/
struct rds_message *rds_send_get_message(struct rds_connection *conn,
struct rm_rdma_op *op)
{
struct rds_message *rm, *tmp, *found = NULL;
unsigned long flags;
spin_lock_irqsave(&conn->c_lock, flags);
list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
if (&rm->rdma == op) {
atomic_inc(&rm->m_refcount);
found = rm;
goto out;
}
}
list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) {
if (&rm->rdma == op) {
atomic_inc(&rm->m_refcount);
found = rm;
break;
}
}
out:
spin_unlock_irqrestore(&conn->c_lock, flags);
return found;
}
EXPORT_SYMBOL_GPL(rds_send_get_message);
/*
* This removes messages from the socket's list if they're on it. The list
* argument must be private to the caller, we must be able to modify it
......@@ -685,16 +664,16 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status)
* assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
* checks the RDS_MSG_HAS_ACK_SEQ bit.
*/
void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
is_acked_func is_acked)
void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
is_acked_func is_acked)
{
struct rds_message *rm, *tmp;
unsigned long flags;
LIST_HEAD(list);
spin_lock_irqsave(&conn->c_lock, flags);
spin_lock_irqsave(&cp->cp_lock, flags);
list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
if (!rds_send_is_acked(rm, ack, is_acked))
break;
......@@ -706,17 +685,26 @@ void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
if (!list_empty(&list))
smp_mb__after_atomic();
spin_unlock_irqrestore(&conn->c_lock, flags);
spin_unlock_irqrestore(&cp->cp_lock, flags);
/* now remove the messages from the sock list as needed */
rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
}
EXPORT_SYMBOL_GPL(rds_send_path_drop_acked);
void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
is_acked_func is_acked)
{
WARN_ON(conn->c_trans->t_mp_capable);
rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked);
}
EXPORT_SYMBOL_GPL(rds_send_drop_acked);
void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
{
struct rds_message *rm, *tmp;
struct rds_connection *conn;
struct rds_conn_path *cp;
unsigned long flags;
LIST_HEAD(list);
......@@ -745,22 +733,26 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
list_for_each_entry(rm, &list, m_sock_item) {
conn = rm->m_inc.i_conn;
if (conn->c_trans->t_mp_capable)
cp = rm->m_inc.i_conn_path;
else
cp = &conn->c_path[0];
spin_lock_irqsave(&conn->c_lock, flags);
spin_lock_irqsave(&cp->cp_lock, flags);
/*
* Maybe someone else beat us to removing rm from the conn.
* If we race with their flag update we'll get the lock and
* then really see that the flag has been cleared.
*/
if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
spin_unlock_irqrestore(&conn->c_lock, flags);
spin_unlock_irqrestore(&cp->cp_lock, flags);
spin_lock_irqsave(&rm->m_rs_lock, flags);
rm->m_rs = NULL;
spin_unlock_irqrestore(&rm->m_rs_lock, flags);
continue;
}
list_del_init(&rm->m_conn_item);
spin_unlock_irqrestore(&conn->c_lock, flags);
spin_unlock_irqrestore(&cp->cp_lock, flags);
/*
* Couldn't grab m_rs_lock in top loop (lock ordering),
......@@ -809,6 +801,7 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
* message from the flow with RDS_CANCEL_SENT_TO.
*/
static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
struct rds_conn_path *cp,
struct rds_message *rm, __be16 sport,
__be16 dport, int *queued)
{
......@@ -852,13 +845,14 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
trying to minimize the time we hold c_lock */
rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
rm->m_inc.i_conn = conn;
rm->m_inc.i_conn_path = cp;
rds_message_addref(rm);
spin_lock(&conn->c_lock);
rm->m_inc.i_hdr.h_sequence = cpu_to_be64(conn->c_next_tx_seq++);
list_add_tail(&rm->m_conn_item, &conn->c_send_queue);
spin_lock(&cp->cp_lock);
rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
spin_unlock(&conn->c_lock);
spin_unlock(&cp->cp_lock);
rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
rm, len, rs, rs->rs_snd_bytes,
......@@ -990,6 +984,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
int queued = 0, allocated_mr = 0;
int nonblock = msg->msg_flags & MSG_DONTWAIT;
long timeo = sock_sndtimeo(sk, nonblock);
struct rds_conn_path *cpath;
/* Mirror Linux UDP mirror of BSD error message compatibility */
/* XXX: Perhaps MSG_MORE someday */
......@@ -1088,15 +1083,16 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
goto out;
}
rds_conn_connect_if_down(conn);
cpath = &conn->c_path[0];
rds_conn_path_connect_if_down(cpath);
ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
if (ret) {
rs->rs_seen_congestion = 1;
goto out;
}
while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
dport, &queued)) {
rds_stats_inc(s_send_queue_full);
......@@ -1106,7 +1102,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
}
timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
rds_send_queue_rm(rs, conn, rm,
rds_send_queue_rm(rs, conn, cpath, rm,
rs->rs_bound_port,
dport,
&queued),
......@@ -1127,9 +1123,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
*/
rds_stats_inc(s_send_queued);
ret = rds_send_xmit(conn);
ret = rds_send_xmit(cpath);
if (ret == -ENOMEM || ret == -EAGAIN)
queue_delayed_work(rds_wq, &conn->c_send_w, 1);
queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
rds_message_put(rm);
return payload_len;
......@@ -1150,7 +1146,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
* Reply to a ping packet.
*/
int
rds_send_pong(struct rds_connection *conn, __be16 dport)
rds_send_pong(struct rds_conn_path *cp, __be16 dport)
{
struct rds_message *rm;
unsigned long flags;
......@@ -1162,31 +1158,32 @@ rds_send_pong(struct rds_connection *conn, __be16 dport)
goto out;
}
rm->m_daddr = conn->c_faddr;
rm->m_daddr = cp->cp_conn->c_faddr;
rm->data.op_active = 1;
rds_conn_connect_if_down(conn);
rds_conn_path_connect_if_down(cp);
ret = rds_cong_wait(conn->c_fcong, dport, 1, NULL);
ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL);
if (ret)
goto out;
spin_lock_irqsave(&conn->c_lock, flags);
list_add_tail(&rm->m_conn_item, &conn->c_send_queue);
spin_lock_irqsave(&cp->cp_lock, flags);
list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
rds_message_addref(rm);
rm->m_inc.i_conn = conn;
rm->m_inc.i_conn = cp->cp_conn;
rm->m_inc.i_conn_path = cp;
rds_message_populate_header(&rm->m_inc.i_hdr, 0, dport,
conn->c_next_tx_seq);
conn->c_next_tx_seq++;
spin_unlock_irqrestore(&conn->c_lock, flags);
cp->cp_next_tx_seq);
cp->cp_next_tx_seq++;
spin_unlock_irqrestore(&cp->cp_lock, flags);
rds_stats_inc(s_send_queued);
rds_stats_inc(s_send_pong);
/* schedule the send work on rds_wq */
queue_delayed_work(rds_wq, &conn->c_send_w, 1);
queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
rds_message_put(rm);
return 0;
......
......@@ -38,6 +38,7 @@
#include <net/net_namespace.h>
#include <net/netns/generic.h>
#include "rds_single_path.h"
#include "rds.h"
#include "tcp.h"
......@@ -185,7 +186,7 @@ void rds_tcp_reset_callbacks(struct socket *sock,
release_sock(osock->sk);
sock_release(osock);
newsock:
rds_send_reset(conn);
rds_send_path_reset(&conn->c_path[0]);
lock_sock(sock->sk);
write_lock_bh(&sock->sk->sk_callback_lock);
tc->t_sock = sock;
......
......@@ -34,6 +34,7 @@
#include <linux/in.h>
#include <net/tcp.h>
#include "rds_single_path.h"
#include "rds.h"
#include "tcp.h"
......@@ -60,7 +61,8 @@ void rds_tcp_state_change(struct sock *sk)
case TCP_SYN_RECV:
break;
case TCP_ESTABLISHED:
rds_connect_path_complete(conn, RDS_CONN_CONNECTING);
rds_connect_path_complete(&conn->c_path[0],
RDS_CONN_CONNECTING);
break;
case TCP_CLOSE_WAIT:
case TCP_CLOSE:
......
......@@ -35,6 +35,7 @@
#include <linux/in.h>
#include <net/tcp.h>
#include "rds_single_path.h"
#include "rds.h"
#include "tcp.h"
......@@ -132,17 +133,19 @@ int rds_tcp_accept_one(struct socket *sock)
* c_transport_data.
*/
if (ntohl(inet->inet_saddr) < ntohl(inet->inet_daddr) ||
!conn->c_outgoing) {
!conn->c_path[0].cp_outgoing) {
goto rst_nsk;
} else {
rds_tcp_reset_callbacks(new_sock, conn);
conn->c_outgoing = 0;
conn->c_path[0].cp_outgoing = 0;
/* rds_connect_path_complete() marks RDS_CONN_UP */
rds_connect_path_complete(conn, RDS_CONN_DISCONNECTING);
rds_connect_path_complete(&conn->c_path[0],
RDS_CONN_DISCONNECTING);
}
} else {
rds_tcp_set_callbacks(new_sock, conn);
rds_connect_path_complete(conn, RDS_CONN_CONNECTING);
rds_connect_path_complete(&conn->c_path[0],
RDS_CONN_CONNECTING);
}
new_sock = NULL;
ret = 0;
......
......@@ -34,6 +34,7 @@
#include <linux/slab.h>
#include <net/tcp.h>
#include "rds_single_path.h"
#include "rds.h"
#include "tcp.h"
......
......@@ -34,6 +34,7 @@
#include <linux/in.h>
#include <net/tcp.h>
#include "rds_single_path.h"
#include "rds.h"
#include "tcp.h"
......
......@@ -71,30 +71,30 @@
struct workqueue_struct *rds_wq;
EXPORT_SYMBOL_GPL(rds_wq);
void rds_connect_path_complete(struct rds_connection *conn, int curr)
void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
{
if (!rds_conn_transition(conn, curr, RDS_CONN_UP)) {
if (!rds_conn_path_transition(cp, curr, RDS_CONN_UP)) {
printk(KERN_WARNING "%s: Cannot transition to state UP, "
"current state is %d\n",
__func__,
atomic_read(&conn->c_state));
rds_conn_drop(conn);
atomic_read(&cp->cp_state));
rds_conn_path_drop(cp);
return;
}
rdsdebug("conn %p for %pI4 to %pI4 complete\n",
conn, &conn->c_laddr, &conn->c_faddr);
cp->cp_conn, &cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr);
conn->c_reconnect_jiffies = 0;
set_bit(0, &conn->c_map_queued);
queue_delayed_work(rds_wq, &conn->c_send_w, 0);
queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
cp->cp_reconnect_jiffies = 0;
set_bit(0, &cp->cp_conn->c_map_queued);
queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
}
EXPORT_SYMBOL_GPL(rds_connect_path_complete);
void rds_connect_complete(struct rds_connection *conn)
{
rds_connect_path_complete(conn, RDS_CONN_CONNECTING);
rds_connect_path_complete(&conn->c_path[0], RDS_CONN_CONNECTING);
}
EXPORT_SYMBOL_GPL(rds_connect_complete);
......@@ -116,70 +116,79 @@ EXPORT_SYMBOL_GPL(rds_connect_complete);
* We should *always* start with a random backoff; otherwise a broken connection
* will always take several iterations to be re-established.
*/
void rds_queue_reconnect(struct rds_connection *conn)
void rds_queue_reconnect(struct rds_conn_path *cp)
{
unsigned long rand;
struct rds_connection *conn = cp->cp_conn;
rdsdebug("conn %p for %pI4 to %pI4 reconnect jiffies %lu\n",
conn, &conn->c_laddr, &conn->c_faddr,
conn->c_reconnect_jiffies);
cp->cp_reconnect_jiffies);
set_bit(RDS_RECONNECT_PENDING, &conn->c_flags);
if (conn->c_reconnect_jiffies == 0) {
conn->c_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
if (cp->cp_reconnect_jiffies == 0) {
cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
return;
}
get_random_bytes(&rand, sizeof(rand));
rdsdebug("%lu delay %lu ceil conn %p for %pI4 -> %pI4\n",
rand % conn->c_reconnect_jiffies, conn->c_reconnect_jiffies,
rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies,
conn, &conn->c_laddr, &conn->c_faddr);
queue_delayed_work(rds_wq, &conn->c_conn_w,
rand % conn->c_reconnect_jiffies);
queue_delayed_work(rds_wq, &cp->cp_conn_w,
rand % cp->cp_reconnect_jiffies);
conn->c_reconnect_jiffies = min(conn->c_reconnect_jiffies * 2,
cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2,
rds_sysctl_reconnect_max_jiffies);
}
void rds_connect_worker(struct work_struct *work)
{
struct rds_connection *conn = container_of(work, struct rds_connection, c_conn_w.work);
struct rds_conn_path *cp = container_of(work,
struct rds_conn_path,
cp_conn_w.work);
struct rds_connection *conn = cp->cp_conn;
int ret;
clear_bit(RDS_RECONNECT_PENDING, &conn->c_flags);
if (rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
if (rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
ret = conn->c_trans->conn_connect(conn);
rdsdebug("conn %p for %pI4 to %pI4 dispatched, ret %d\n",
conn, &conn->c_laddr, &conn->c_faddr, ret);
if (ret) {
if (rds_conn_transition(conn, RDS_CONN_CONNECTING, RDS_CONN_DOWN))
rds_queue_reconnect(conn);
if (rds_conn_path_transition(cp,
RDS_CONN_CONNECTING,
RDS_CONN_DOWN))
rds_queue_reconnect(cp);
else
rds_conn_error(conn, "RDS: connect failed\n");
rds_conn_path_error(cp,
"RDS: connect failed\n");
}
}
}
void rds_send_worker(struct work_struct *work)
{
struct rds_connection *conn = container_of(work, struct rds_connection, c_send_w.work);
struct rds_conn_path *cp = container_of(work,
struct rds_conn_path,
cp_send_w.work);
int ret;
if (rds_conn_state(conn) == RDS_CONN_UP) {
clear_bit(RDS_LL_SEND_FULL, &conn->c_flags);
ret = rds_send_xmit(conn);
if (rds_conn_path_state(cp) == RDS_CONN_UP) {
clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
ret = rds_send_xmit(cp);
cond_resched();
rdsdebug("conn %p ret %d\n", conn, ret);
rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
switch (ret) {
case -EAGAIN:
rds_stats_inc(s_send_immediate_retry);
queue_delayed_work(rds_wq, &conn->c_send_w, 0);
queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
break;
case -ENOMEM:
rds_stats_inc(s_send_delayed_retry);
queue_delayed_work(rds_wq, &conn->c_send_w, 2);
queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
default:
break;
}
......@@ -188,20 +197,22 @@ void rds_send_worker(struct work_struct *work)
void rds_recv_worker(struct work_struct *work)
{
struct rds_connection *conn = container_of(work, struct rds_connection, c_recv_w.work);
struct rds_conn_path *cp = container_of(work,
struct rds_conn_path,
cp_recv_w.work);
int ret;
if (rds_conn_state(conn) == RDS_CONN_UP) {
ret = conn->c_trans->recv(conn);
rdsdebug("conn %p ret %d\n", conn, ret);
if (rds_conn_path_state(cp) == RDS_CONN_UP) {
ret = cp->cp_conn->c_trans->recv(cp->cp_conn);
rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
switch (ret) {
case -EAGAIN:
rds_stats_inc(s_recv_immediate_retry);
queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
break;
case -ENOMEM:
rds_stats_inc(s_recv_delayed_retry);
queue_delayed_work(rds_wq, &conn->c_recv_w, 2);
queue_delayed_work(rds_wq, &cp->cp_recv_w, 2);
default:
break;
}
......@@ -210,9 +221,11 @@ void rds_recv_worker(struct work_struct *work)
void rds_shutdown_worker(struct work_struct *work)
{
struct rds_connection *conn = container_of(work, struct rds_connection, c_down_w);
struct rds_conn_path *cp = container_of(work,
struct rds_conn_path,
cp_down_w);
rds_conn_shutdown(conn);
rds_conn_shutdown(cp);
}
void rds_threads_exit(void)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment