Commit fcc5450c authored by Andy Grover

RDS: Remove send_quota from send_xmit()

The purpose of the send quota was really to give fairness
when different connections were all using the same
workq thread to send backlogged msgs -- they could only send
so many before another connection could make progress.

Now that each connection is pushing the backlog from its
completion handler, they are all guaranteed to make progress
and the quota isn't needed any longer.

A thread *will* have to send all previously queued data, as well
as any further msgs placed on the queue while c_send_lock
was held. In a pathological case a single process can get
roped into doing this for long periods while other threads
get off free. But, since it can only do this until the transport
reports full, this is a bounded scenario.
Signed-off-by: Andy Grover <andy.grover@oracle.com>
parent 51e2cba8
...@@ -110,12 +110,11 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -110,12 +110,11 @@ int rds_send_xmit(struct rds_connection *conn)
struct rds_message *rm; struct rds_message *rm;
unsigned long flags; unsigned long flags;
unsigned int tmp; unsigned int tmp;
unsigned int send_quota = send_batch_count;
struct scatterlist *sg; struct scatterlist *sg;
int ret = 0; int ret = 0;
int was_empty = 0;
LIST_HEAD(to_be_dropped); LIST_HEAD(to_be_dropped);
restart:
if (!rds_conn_up(conn)) if (!rds_conn_up(conn))
goto out; goto out;
...@@ -139,7 +138,7 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -139,7 +138,7 @@ int rds_send_xmit(struct rds_connection *conn)
* spin trying to push headers and data down the connection until * spin trying to push headers and data down the connection until
* the connection doesn't make forward progress. * the connection doesn't make forward progress.
*/ */
while (--send_quota) { while (1) {
rm = conn->c_xmit_rm; rm = conn->c_xmit_rm;
...@@ -185,10 +184,8 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -185,10 +184,8 @@ int rds_send_xmit(struct rds_connection *conn)
spin_unlock(&conn->c_lock); spin_unlock(&conn->c_lock);
if (!rm) { if (!rm)
was_empty = 1;
break; break;
}
/* Unfortunately, the way Infiniband deals with /* Unfortunately, the way Infiniband deals with
* RDMA to a bad MR key is by moving the entire * RDMA to a bad MR key is by moving the entire
...@@ -350,20 +347,23 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -350,20 +347,23 @@ int rds_send_xmit(struct rds_connection *conn)
rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED); rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
} }
if (send_quota == 0 && !was_empty) { /*
/* We exhausted the send quota, but there's work left to * Other senders will see we have c_send_lock and exit. We
* do. Return and (re-)schedule the send worker. * need to recheck the send queue and race again for c_send_lock
* to make sure messages don't just sit on the send queue.
*
* If the transport cannot continue (i.e ret != 0), then it must
* call us when more room is available, such as from the tx
* completion handler.
*/ */
ret = -EAGAIN; if (ret == 0) {
}
if (ret == 0 && was_empty) {
/* A simple bit test would be way faster than taking the /* A simple bit test would be way faster than taking the
* spin lock */ * spin lock */
spin_lock_irqsave(&conn->c_lock, flags); spin_lock_irqsave(&conn->c_lock, flags);
if (!list_empty(&conn->c_send_queue)) { if (!list_empty(&conn->c_send_queue)) {
rds_stats_inc(s_send_lock_queue_raced); rds_stats_inc(s_send_lock_queue_raced);
ret = -EAGAIN; spin_unlock_irqrestore(&conn->c_lock, flags);
goto restart;
} }
spin_unlock_irqrestore(&conn->c_lock, flags); spin_unlock_irqrestore(&conn->c_lock, flags);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment