Commit 0c28c045 authored by Santosh Shilimkar

RDS: IB: split send completion handling and do batch ack

Similar to what we did with receive CQ completion handling, we split
the transmit completion handler so that it lets us implement batched
work completion handling.

We re-use the poll_cq routine and make use of RDS_IB_SEND_OP to
distinguish send from receive completion event handler invocations.
Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
parent f4f943c9
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#define RDS_IB_RECYCLE_BATCH_COUNT 32 #define RDS_IB_RECYCLE_BATCH_COUNT 32
#define RDS_IB_WC_MAX 32 #define RDS_IB_WC_MAX 32
#define RDS_IB_SEND_OP BIT_ULL(63)
extern struct rw_semaphore rds_ib_devices_lock; extern struct rw_semaphore rds_ib_devices_lock;
extern struct list_head rds_ib_devices; extern struct list_head rds_ib_devices;
...@@ -118,9 +119,11 @@ struct rds_ib_connection { ...@@ -118,9 +119,11 @@ struct rds_ib_connection {
struct ib_pd *i_pd; struct ib_pd *i_pd;
struct ib_cq *i_send_cq; struct ib_cq *i_send_cq;
struct ib_cq *i_recv_cq; struct ib_cq *i_recv_cq;
struct ib_wc i_send_wc[RDS_IB_WC_MAX];
struct ib_wc i_recv_wc[RDS_IB_WC_MAX]; struct ib_wc i_recv_wc[RDS_IB_WC_MAX];
/* interrupt handling */ /* interrupt handling */
struct tasklet_struct i_send_tasklet;
struct tasklet_struct i_recv_tasklet; struct tasklet_struct i_recv_tasklet;
/* tx */ /* tx */
...@@ -217,7 +220,6 @@ struct rds_ib_device { ...@@ -217,7 +220,6 @@ struct rds_ib_device {
struct rds_ib_statistics { struct rds_ib_statistics {
uint64_t s_ib_connect_raced; uint64_t s_ib_connect_raced;
uint64_t s_ib_listen_closed_stale; uint64_t s_ib_listen_closed_stale;
uint64_t s_ib_tx_cq_call;
uint64_t s_ib_evt_handler_call; uint64_t s_ib_evt_handler_call;
uint64_t s_ib_tasklet_call; uint64_t s_ib_tasklet_call;
uint64_t s_ib_tx_cq_event; uint64_t s_ib_tx_cq_event;
...@@ -371,7 +373,7 @@ extern wait_queue_head_t rds_ib_ring_empty_wait; ...@@ -371,7 +373,7 @@ extern wait_queue_head_t rds_ib_ring_empty_wait;
void rds_ib_xmit_complete(struct rds_connection *conn); void rds_ib_xmit_complete(struct rds_connection *conn);
int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
unsigned int hdr_off, unsigned int sg, unsigned int off); unsigned int hdr_off, unsigned int sg, unsigned int off);
void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context); void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc);
void rds_ib_send_init_ring(struct rds_ib_connection *ic); void rds_ib_send_init_ring(struct rds_ib_connection *ic);
void rds_ib_send_clear_ring(struct rds_ib_connection *ic); void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op); int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op);
......
...@@ -250,11 +250,34 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq, ...@@ -250,11 +250,34 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n", rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
(unsigned long long)wc->wr_id, wc->status, (unsigned long long)wc->wr_id, wc->status,
wc->byte_len, be32_to_cpu(wc->ex.imm_data)); wc->byte_len, be32_to_cpu(wc->ex.imm_data));
if (wc->wr_id & RDS_IB_SEND_OP)
rds_ib_send_cqe_handler(ic, wc);
else
rds_ib_recv_cqe_handler(ic, wc, ack_state); rds_ib_recv_cqe_handler(ic, wc, ack_state);
} }
} }
} }
static void rds_ib_tasklet_fn_send(unsigned long data)
{
struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
struct rds_connection *conn = ic->conn;
struct rds_ib_ack_state state;
rds_ib_stats_inc(s_ib_tasklet_call);
memset(&state, 0, sizeof(state));
poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
if (rds_conn_up(conn) &&
(!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
test_bit(0, &conn->c_map_queued)))
rds_send_xmit(ic->conn);
}
static void rds_ib_tasklet_fn_recv(unsigned long data) static void rds_ib_tasklet_fn_recv(unsigned long data)
{ {
struct rds_ib_connection *ic = (struct rds_ib_connection *)data; struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
...@@ -304,6 +327,18 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data) ...@@ -304,6 +327,18 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
} }
} }
/*
 * Completion callback registered for the send CQ.
 *
 * @cq:      the completion queue that raised the event (only logged here).
 * @context: the struct rds_connection supplied when the CQ was created.
 *
 * Does no completion processing itself: it bumps the event-handler
 * statistic and defers the work to the connection's send tasklet.
 */
static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context)
{
	struct rds_connection *conn = context;
	struct rds_ib_connection *ic;

	rdsdebug("conn %p cq %p\n", conn, cq);

	rds_ib_stats_inc(s_ib_evt_handler_call);

	ic = conn->c_transport_data;
	tasklet_schedule(&ic->i_send_tasklet);
}
/* /*
* This needs to be very careful to not leave IS_ERR pointers around for * This needs to be very careful to not leave IS_ERR pointers around for
* cleanup to trip over. * cleanup to trip over.
...@@ -337,7 +372,8 @@ static int rds_ib_setup_qp(struct rds_connection *conn) ...@@ -337,7 +372,8 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
ic->i_pd = rds_ibdev->pd; ic->i_pd = rds_ibdev->pd;
cq_attr.cqe = ic->i_send_ring.w_nr + 1; cq_attr.cqe = ic->i_send_ring.w_nr + 1;
ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler,
ic->i_send_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_send,
rds_ib_cq_event_handler, conn, rds_ib_cq_event_handler, conn,
&cq_attr); &cq_attr);
if (IS_ERR(ic->i_send_cq)) { if (IS_ERR(ic->i_send_cq)) {
...@@ -703,6 +739,7 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) ...@@ -703,6 +739,7 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
wait_event(rds_ib_ring_empty_wait, wait_event(rds_ib_ring_empty_wait,
rds_ib_ring_empty(&ic->i_recv_ring) && rds_ib_ring_empty(&ic->i_recv_ring) &&
(atomic_read(&ic->i_signaled_sends) == 0)); (atomic_read(&ic->i_signaled_sends) == 0));
tasklet_kill(&ic->i_send_tasklet);
tasklet_kill(&ic->i_recv_tasklet); tasklet_kill(&ic->i_recv_tasklet);
/* first destroy the ib state that generates callbacks */ /* first destroy the ib state that generates callbacks */
...@@ -809,8 +846,10 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp) ...@@ -809,8 +846,10 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
} }
INIT_LIST_HEAD(&ic->ib_node); INIT_LIST_HEAD(&ic->ib_node);
tasklet_init(&ic->i_send_tasklet, rds_ib_tasklet_fn_send,
(unsigned long)ic);
tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv, tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv,
(unsigned long) ic); (unsigned long)ic);
mutex_init(&ic->i_recv_mutex); mutex_init(&ic->i_recv_mutex);
#ifndef KERNEL_HAS_ATOMIC64 #ifndef KERNEL_HAS_ATOMIC64
spin_lock_init(&ic->i_ack_lock); spin_lock_init(&ic->i_ack_lock);
......
...@@ -195,7 +195,7 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic) ...@@ -195,7 +195,7 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
send->s_op = NULL; send->s_op = NULL;
send->s_wr.wr_id = i; send->s_wr.wr_id = i | RDS_IB_SEND_OP;
send->s_wr.sg_list = send->s_sge; send->s_wr.sg_list = send->s_sge;
send->s_wr.ex.imm_data = 0; send->s_wr.ex.imm_data = 0;
...@@ -237,57 +237,51 @@ static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr) ...@@ -237,57 +237,51 @@ static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
* unallocs the next free entry in the ring it doesn't alter which is * unallocs the next free entry in the ring it doesn't alter which is
* the next to be freed, which is what this is concerned with. * the next to be freed, which is what this is concerned with.
*/ */
void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context) void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
{ {
struct rds_connection *conn = context;
struct rds_ib_connection *ic = conn->c_transport_data;
struct rds_message *rm = NULL; struct rds_message *rm = NULL;
struct ib_wc wc; struct rds_connection *conn = ic->conn;
struct rds_ib_send_work *send; struct rds_ib_send_work *send;
u32 completed; u32 completed;
u32 oldest; u32 oldest;
u32 i = 0; u32 i = 0;
int ret;
int nr_sig = 0; int nr_sig = 0;
rdsdebug("cq %p conn %p\n", cq, conn);
rds_ib_stats_inc(s_ib_tx_cq_call);
ret = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
if (ret)
rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
while (ib_poll_cq(cq, 1, &wc) > 0) {
rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n", rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
(unsigned long long)wc.wr_id, wc.status, (unsigned long long)wc->wr_id, wc->status,
ib_wc_status_msg(wc.status), wc.byte_len, ib_wc_status_msg(wc->status), wc->byte_len,
be32_to_cpu(wc.ex.imm_data)); be32_to_cpu(wc->ex.imm_data));
rds_ib_stats_inc(s_ib_tx_cq_event); rds_ib_stats_inc(s_ib_tx_cq_event);
if (wc.wr_id == RDS_IB_ACK_WR_ID) { if (wc->wr_id == RDS_IB_ACK_WR_ID) {
if (time_after(jiffies, ic->i_ack_queued + HZ/2)) if (time_after(jiffies, ic->i_ack_queued + HZ / 2))
rds_ib_stats_inc(s_ib_tx_stalled); rds_ib_stats_inc(s_ib_tx_stalled);
rds_ib_ack_send_complete(ic); rds_ib_ack_send_complete(ic);
continue; return;
} }
oldest = rds_ib_ring_oldest(&ic->i_send_ring); oldest = rds_ib_ring_oldest(&ic->i_send_ring);
completed = rds_ib_ring_completed(&ic->i_send_ring, wc.wr_id, oldest); completed = rds_ib_ring_completed(&ic->i_send_ring,
(wc->wr_id & ~RDS_IB_SEND_OP),
oldest);
for (i = 0; i < completed; i++) { for (i = 0; i < completed; i++) {
send = &ic->i_sends[oldest]; send = &ic->i_sends[oldest];
if (send->s_wr.send_flags & IB_SEND_SIGNALED) if (send->s_wr.send_flags & IB_SEND_SIGNALED)
nr_sig++; nr_sig++;
rm = rds_ib_send_unmap_op(ic, send, wc.status); rm = rds_ib_send_unmap_op(ic, send, wc->status);
if (time_after(jiffies, send->s_queued + HZ/2)) if (time_after(jiffies, send->s_queued + HZ / 2))
rds_ib_stats_inc(s_ib_tx_stalled); rds_ib_stats_inc(s_ib_tx_stalled);
if (send->s_op) { if (send->s_op) {
if (send->s_op == rm->m_final_op) { if (send->s_op == rm->m_final_op) {
/* If anyone waited for this message to get flushed out, wake /* If anyone waited for this message to get
* them up now */ * flushed out, wake them up now
*/
rds_message_unmapped(rm); rds_message_unmapped(rm);
} }
rds_message_put(rm); rds_message_put(rm);
...@@ -306,12 +300,10 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context) ...@@ -306,12 +300,10 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
queue_delayed_work(rds_wq, &conn->c_send_w, 0); queue_delayed_work(rds_wq, &conn->c_send_w, 0);
/* We expect errors as the qp is drained during shutdown */ /* We expect errors as the qp is drained during shutdown */
if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) { if (wc->status != IB_WC_SUCCESS && rds_conn_up(conn)) {
rds_ib_conn_error(conn, "send completion on %pI4 had status " rds_ib_conn_error(conn, "send completion on %pI4 had status %u (%s), disconnecting and reconnecting\n",
"%u (%s), disconnecting and reconnecting\n", &conn->c_faddr, wc->status,
&conn->c_faddr, wc.status, ib_wc_status_msg(wc->status));
ib_wc_status_msg(wc.status));
}
} }
} }
......
...@@ -43,7 +43,6 @@ static const char *const rds_ib_stat_names[] = { ...@@ -43,7 +43,6 @@ static const char *const rds_ib_stat_names[] = {
"ib_connect_raced", "ib_connect_raced",
"ib_listen_closed_stale", "ib_listen_closed_stale",
"s_ib_evt_handler_call", "s_ib_evt_handler_call",
"ib_tx_cq_call",
"ib_tasklet_call", "ib_tasklet_call",
"ib_tx_cq_event", "ib_tx_cq_event",
"ib_tx_ring_full", "ib_tx_ring_full",
......
...@@ -432,6 +432,7 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -432,6 +432,7 @@ int rds_send_xmit(struct rds_connection *conn)
out: out:
return ret; return ret;
} }
EXPORT_SYMBOL_GPL(rds_send_xmit);
static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm) static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment