Commit 049ee3f5 authored by Andy Grover

RDS: Change send lock from a mutex to a spinlock

This change allows us to call rds_send_xmit() from a tasklet,
which is crucial to our new operating model.
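
Background: tasklets run in softirq (atomic) context, which must not
sleep. mutex_lock() may sleep, so the old mutex could never be taken
from a tasklet; a spinlock can be. A minimal sketch of the kind of
tasklet caller this enables (the tasklet field and function names here
are hypothetical, not part of this patch):

	#include <linux/interrupt.h>

	/* Hypothetical softirq-context driver of the send path. */
	static void demo_send_tasklet_fn(unsigned long data)
	{
		struct rds_connection *conn = (struct rds_connection *)data;

		/* Safe only because c_send_lock is now a spinlock; taking
		 * a mutex here could sleep, which softirq context forbids. */
		rds_send_xmit(conn);
	}

	/* Setup, e.g. at connection-create time (c_send_tasklet is a
	 * made-up field for this sketch): */
	tasklet_init(&conn->c_send_tasklet, demo_send_tasklet_fn,
		     (unsigned long)conn);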

* Change c_send_lock to a spinlock
* Rename stats fields from "sem" to "lock"
* Remove unneeded rds_conn_is_sending()

About locking between shutdown and send: send first checks that the
connection is up. Shutdown puts the connection into
DISCONNECTING; after that, every thread entering send exits
immediately. However, a thread could already be *in* rds_send_xmit(),
so shutdown acquires and releases c_send_lock to ensure everyone is
out before proceeding with connection teardown (see the sketch below).
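
A runnable userspace sketch of this handshake, with pthread spinlocks
and a plain atomic standing in for the kernel primitives (all demo_*
names are invented for illustration):

	#include <pthread.h>
	#include <stdatomic.h>
	#include <stdio.h>

	enum { DEMO_CONN_UP, DEMO_CONN_DISCONNECTING };

	struct demo_conn {
		atomic_int         state;
		pthread_spinlock_t send_lock;
	};

	/* Send path: bail out unless the connection is up, then trylock
	 * so only one task feeds the connection at a time. */
	static int demo_send_xmit(struct demo_conn *conn)
	{
		if (atomic_load(&conn->state) != DEMO_CONN_UP)
			return -1;
		if (pthread_spin_trylock(&conn->send_lock) != 0)
			return -1;              /* someone else is sending */
		/* ... walk the send queue here ... */
		pthread_spin_unlock(&conn->send_lock);
		return 0;
	}

	/* Shutdown path: flip the state so new senders exit at the check
	 * above, then acquire and release send_lock to drain any sender
	 * already inside demo_send_xmit(). */
	static void demo_conn_shutdown(struct demo_conn *conn)
	{
		atomic_store(&conn->state, DEMO_CONN_DISCONNECTING);
		pthread_spin_lock(&conn->send_lock);
		pthread_spin_unlock(&conn->send_lock);
		/* everybody is out; safe to tear the connection down */
	}

	int main(void)
	{
		struct demo_conn conn = { .state = DEMO_CONN_UP };

		pthread_spin_init(&conn.send_lock, PTHREAD_PROCESS_PRIVATE);
		demo_send_xmit(&conn);
		demo_conn_shutdown(&conn);
		puts("drained cleanly");
		return 0;
	}

(Build with `cc demo.c -lpthread`.)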
Signed-off-by: Andy Grover <andy.grover@oracle.com>
parent f17a1a55
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -62,18 +62,6 @@ static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr)
 		var |= RDS_INFO_CONNECTION_FLAG_##suffix;	\
 } while (0)
 
-static inline int rds_conn_is_sending(struct rds_connection *conn)
-{
-	int ret = 0;
-
-	if (!mutex_trylock(&conn->c_send_lock))
-		ret = 1;
-	else
-		mutex_unlock(&conn->c_send_lock);
-
-	return ret;
-}
-
 static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
 					      __be32 laddr, __be32 faddr,
 					      struct rds_transport *trans)
@@ -158,7 +146,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
 	spin_lock_init(&conn->c_lock);
 	conn->c_next_tx_seq = 1;
 
-	mutex_init(&conn->c_send_lock);
+	spin_lock_init(&conn->c_send_lock);
 	INIT_LIST_HEAD(&conn->c_send_queue);
 	INIT_LIST_HEAD(&conn->c_retrans);
@@ -283,10 +271,12 @@ void rds_conn_shutdown(struct rds_connection *conn)
 		}
 		mutex_unlock(&conn->c_cm_lock);
 
-		mutex_lock(&conn->c_send_lock);
+		/* verify everybody's out of rds_send_xmit() */
+		spin_lock_irq(&conn->c_send_lock);
+		spin_unlock_irq(&conn->c_send_lock);
+
 		conn->c_trans->conn_shutdown(conn);
 		rds_conn_reset(conn);
-		mutex_unlock(&conn->c_send_lock);
 
 		if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) {
 			/* This can happen - eg when we're in the middle of tearing
@@ -476,7 +466,7 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
 	cinfo->flags = 0;
 	rds_conn_info_set(cinfo->flags,
-			  rds_conn_is_sending(conn), SENDING);
+			  spin_is_locked(&conn->c_send_lock), SENDING);
 	/* XXX Future: return the state rather than these funky bits */
 	rds_conn_info_set(cinfo->flags,
 			  atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -91,7 +91,7 @@ struct rds_connection {
 	struct rds_cong_map	*c_lcong;
 	struct rds_cong_map	*c_fcong;
 
-	struct mutex		c_send_lock;	/* protect send ring */
+	spinlock_t		c_send_lock;	/* protect send ring */
 	struct rds_message	*c_xmit_rm;
 	unsigned long		c_xmit_sg;
 	unsigned int		c_xmit_hdr_off;
@@ -548,8 +548,8 @@ struct rds_statistics {
 	uint64_t	s_recv_ping;
 	uint64_t	s_send_queue_empty;
 	uint64_t	s_send_queue_full;
-	uint64_t	s_send_sem_contention;
-	uint64_t	s_send_sem_queue_raced;
+	uint64_t	s_send_lock_contention;
+	uint64_t	s_send_lock_queue_raced;
 	uint64_t	s_send_immediate_retry;
 	uint64_t	s_send_delayed_retry;
 	uint64_t	s_send_drop_acked;
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -116,19 +116,18 @@ int rds_send_xmit(struct rds_connection *conn)
 	int was_empty = 0;
 	LIST_HEAD(to_be_dropped);
 
+	if (!rds_conn_up(conn))
+		goto out;
+
 	/*
 	 * sendmsg calls here after having queued its message on the send
 	 * queue.  We only have one task feeding the connection at a time.  If
 	 * another thread is already feeding the queue then we back off.  This
 	 * avoids blocking the caller and trading per-connection data between
 	 * caches per message.
-	 *
-	 * The sem holder will issue a retry if they notice that someone queued
-	 * a message after they stopped walking the send queue but before they
-	 * dropped the sem.
 	 */
-	if (!mutex_trylock(&conn->c_send_lock)) {
-		rds_stats_inc(s_send_sem_contention);
+	if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
+		rds_stats_inc(s_send_lock_contention);
 		ret = -ENOMEM;
 		goto out;
 	}
@@ -346,7 +345,7 @@ int rds_send_xmit(struct rds_connection *conn)
 	 * stop processing the loop when the transport hasn't taken
 	 * responsibility for forward progress.
 	 */
-	mutex_unlock(&conn->c_send_lock);
+	spin_unlock_irqrestore(&conn->c_send_lock, flags);
 
 	if (send_quota == 0 && !was_empty) {
 		/* We exhausted the send quota, but there's work left to
@@ -360,7 +359,7 @@ int rds_send_xmit(struct rds_connection *conn)
 		 * spin lock */
 		spin_lock_irqsave(&conn->c_lock, flags);
 		if (!list_empty(&conn->c_send_queue)) {
-			rds_stats_inc(s_send_sem_queue_raced);
+			rds_stats_inc(s_send_lock_queue_raced);
 			ret = -EAGAIN;
 		}
 		spin_unlock_irqrestore(&conn->c_lock, flags);
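
The last two hunks above implement the retry-on-race check that
replaces the old "sem holder" comment: after dropping c_send_lock, the
sender re-checks the queue under c_lock and returns -EAGAIN if a
producer queued a message in the window. A userspace sketch of that
pattern (demo_* names invented, pthread spinlocks standing in for the
kernel ones):

	#include <errno.h>
	#include <pthread.h>
	#include <stdbool.h>

	struct demo_conn {
		pthread_spinlock_t send_lock;   /* serializes the send path */
		pthread_spinlock_t queue_lock;  /* protects the send queue */
		bool               queue_empty;
	};

	/* Tail of the send path; entered with send_lock held. */
	static int demo_xmit_finish(struct demo_conn *conn, bool was_empty)
	{
		int ret = 0;

		pthread_spin_unlock(&conn->send_lock);

		if (was_empty) {
			/* We emptied the queue while holding send_lock, but
			 * a producer may have queued a message after we
			 * stopped walking it and before we dropped the lock. */
			pthread_spin_lock(&conn->queue_lock);
			if (!conn->queue_empty)
				ret = -EAGAIN;  /* caller reschedules the send */
			pthread_spin_unlock(&conn->queue_lock);
		}
		return ret;
	}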
--- a/net/rds/stats.c
+++ b/net/rds/stats.c
@@ -57,8 +57,8 @@ static const char *const rds_stat_names[] = {
 	"recv_ping",
 	"send_queue_empty",
 	"send_queue_full",
-	"send_sem_contention",
-	"send_sem_queue_raced",
+	"send_lock_contention",
+	"send_lock_queue_raced",
 	"send_immediate_retry",
 	"send_delayed_retry",
 	"send_drop_acked",