Commit f046011c authored by Zach Brown's avatar Zach Brown Committed by Andy Grover

RDS/IB: track signaled sends

We're seeing bugs today where IB connection shutdown clears the send
ring while the tasklet is processing completed sends.  Implementation
details cause this to dereference a null pointer.  Shutdown needs to
wait for send completion to stop before tearing down the connection.  We
can't simply wait for the ring to empty because it may contain
unsignaled sends that will never be processed.

This patch tracks the number of signaled sends that we've posted and
waits for them to complete.  It also makes sure that the tasklet has
finished executing.
Signed-off-by: default avatarZach Brown <zach.brown@oracle.com>
parent ef87b7ea
......@@ -108,6 +108,7 @@ struct rds_ib_connection {
struct rds_header *i_send_hdrs;
u64 i_send_hdrs_dma;
struct rds_ib_send_work *i_sends;
atomic_t i_signaled_sends;
/* rx */
struct tasklet_struct i_recv_tasklet;
......
......@@ -615,11 +615,18 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
}
/*
* Don't wait for the send ring to be empty -- there may be completed
* non-signaled entries sitting on there. We unmap these below.
* We want to wait for tx and rx completion to finish
* before we tear down the connection, but we have to be
* careful not to get stuck waiting on a send ring that
* only has unsignaled sends in it. We've shutdown new
* sends before getting here so by waiting for signaled
* sends to complete we're ensured that there will be no
* more tx processing.
*/
wait_event(rds_ib_ring_empty_wait,
rds_ib_ring_empty(&ic->i_recv_ring));
rds_ib_ring_empty(&ic->i_recv_ring) &&
(atomic_read(&ic->i_signaled_sends) == 0));
tasklet_kill(&ic->i_recv_tasklet);
if (ic->i_send_hdrs)
ib_dma_free_coherent(dev,
......@@ -729,6 +736,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
#ifndef KERNEL_HAS_ATOMIC64
spin_lock_init(&ic->i_ack_lock);
#endif
atomic_set(&ic->i_signaled_sends, 0);
/*
* rds_ib_conn_shutdown() waits for these to be emptied so they
......
......@@ -219,6 +219,18 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
}
}
/*
* The only fast path caller always has a non-zero nr, so we don't
* bother testing nr before performing the atomic sub.
*/
static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
{
if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) &&
waitqueue_active(&rds_ib_ring_empty_wait))
wake_up(&rds_ib_ring_empty_wait);
BUG_ON(atomic_read(&ic->i_signaled_sends) < 0);
}
/*
* The _oldest/_free ring operations here race cleanly with the alloc/unalloc
* operations performed in the send path. As the sender allocs and potentially
......@@ -236,6 +248,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
u32 oldest;
u32 i = 0;
int ret;
int nr_sig = 0;
rdsdebug("cq %p conn %p\n", cq, conn);
rds_ib_stats_inc(s_ib_tx_cq_call);
......@@ -262,6 +275,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
for (i = 0; i < completed; i++) {
send = &ic->i_sends[oldest];
if (send->s_wr.send_flags & IB_SEND_SIGNALED)
nr_sig++;
rm = rds_ib_send_unmap_op(ic, send, wc.status);
......@@ -282,6 +297,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
}
rds_ib_ring_free(&ic->i_send_ring, completed);
rds_ib_sub_signaled(ic, nr_sig);
nr_sig = 0;
if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
test_bit(0, &conn->c_map_queued))
......@@ -440,7 +457,7 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
}
static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
struct rds_ib_send_work *send,
bool notify)
{
......@@ -452,7 +469,9 @@ static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
if (ic->i_unsignaled_wrs-- == 0 || notify) {
ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
send->s_wr.send_flags |= IB_SEND_SIGNALED;
return 1;
}
return 0;
}
/*
......@@ -488,6 +507,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
int bytes_sent = 0;
int ret;
int flow_controlled = 0;
int nr_sig = 0;
BUG_ON(off % RDS_FRAG_SIZE);
BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
......@@ -645,6 +665,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
if (ic->i_flowctl && flow_controlled && i == (work_alloc-1))
send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
if (send->s_wr.send_flags & IB_SEND_SIGNALED)
nr_sig++;
rdsdebug("send %p wr %p num_sge %u next %p\n", send,
&send->s_wr, send->s_wr.num_sge, send->s_wr.next);
......@@ -689,6 +712,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
if (ic->i_flowctl && i < credit_alloc)
rds_ib_send_add_credits(conn, credit_alloc - i);
if (nr_sig)
atomic_add(nr_sig, &ic->i_signaled_sends);
/* XXX need to worry about failed_wr and partial sends. */
failed_wr = &first->s_wr;
ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
......@@ -699,6 +725,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
"returned %d\n", &conn->c_faddr, ret);
rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
rds_ib_sub_signaled(ic, nr_sig);
if (prev->s_op) {
ic->i_data_op = prev->s_op;
prev->s_op = NULL;
......@@ -728,6 +755,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
u32 pos;
u32 work_alloc;
int ret;
int nr_sig = 0;
rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
......@@ -752,7 +780,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
send->s_wr.wr.atomic.compare_add = op->op_swap_add;
send->s_wr.wr.atomic.swap = 0;
}
rds_ib_set_wr_signal_state(ic, send, op->op_notify);
nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
send->s_wr.num_sge = 1;
send->s_wr.next = NULL;
send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
......@@ -778,6 +806,9 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
send->s_sge[0].addr, send->s_sge[0].length);
if (nr_sig)
atomic_add(nr_sig, &ic->i_signaled_sends);
failed_wr = &send->s_wr;
ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
......@@ -787,6 +818,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
"returned %d\n", &conn->c_faddr, ret);
rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
rds_ib_sub_signaled(ic, nr_sig);
goto out;
}
......@@ -817,6 +849,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
int sent;
int ret;
int num_sge;
int nr_sig = 0;
/* map the op the first time we see it */
if (!op->op_mapped) {
......@@ -859,7 +892,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
send->s_queued = jiffies;
send->s_op = NULL;
rds_ib_set_wr_signal_state(ic, send, op->op_notify);
nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
send->s_wr.wr.rdma.remote_addr = remote_addr;
......@@ -910,6 +943,9 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
work_alloc = i;
}
if (nr_sig)
atomic_add(nr_sig, &ic->i_signaled_sends);
failed_wr = &first->s_wr;
ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
......@@ -919,6 +955,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "
"returned %d\n", &conn->c_faddr, ret);
rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
rds_ib_sub_signaled(ic, nr_sig);
goto out;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment