Commit 241eef3e authored by Andy Grover

RDS: Implement silent atomics

Signed-off-by: Andy Grover <andy.grover@oracle.com>
parent d37c9359
...@@ -336,7 +336,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits); ...@@ -336,7 +336,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted); void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted, int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
u32 *adv_credits, int need_posted, int max_posted); u32 *adv_credits, int need_posted, int max_posted);
int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op); int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm);
/* ib_stats.c */ /* ib_stats.c */
DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats); DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
......
...@@ -225,15 +225,12 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context) ...@@ -225,15 +225,12 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
/* In the error case, wc.opcode sometimes contains garbage */ /* In the error case, wc.opcode sometimes contains garbage */
switch (send->s_wr.opcode) { switch (send->s_wr.opcode) {
case IB_WR_SEND: case IB_WR_SEND:
if (send->s_rm)
rds_ib_send_unmap_rm(ic, send, wc.status);
break;
case IB_WR_RDMA_WRITE: case IB_WR_RDMA_WRITE:
case IB_WR_RDMA_READ: case IB_WR_RDMA_READ:
case IB_WR_ATOMIC_FETCH_AND_ADD: case IB_WR_ATOMIC_FETCH_AND_ADD:
case IB_WR_ATOMIC_CMP_AND_SWP: case IB_WR_ATOMIC_CMP_AND_SWP:
/* Nothing to be done - the SG list will be unmapped if (send->s_rm)
* when the SEND completes. */ rds_ib_send_unmap_rm(ic, send, wc.status);
break; break;
default: default:
if (printk_ratelimit()) if (printk_ratelimit())
...@@ -425,6 +422,21 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted) ...@@ -425,6 +422,21 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags); set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
} }
static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
struct rds_ib_send_work *send,
bool notify)
{
/*
* We want to delay signaling completions just enough to get
* the batching benefits but not so much that we create dead time
* on the wire.
*/
if (ic->i_unsignaled_wrs-- == 0 || notify) {
ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
send->s_wr.send_flags |= IB_SEND_SIGNALED;
}
}
/* /*
* This can be called multiple times for a given message. The first time * This can be called multiple times for a given message. The first time
* we see a message we map its scatterlist into the IB device so that * we see a message we map its scatterlist into the IB device so that
...@@ -517,7 +529,6 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, ...@@ -517,7 +529,6 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
rm->data.m_count = 0; rm->data.m_count = 0;
} }
ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
rds_message_addref(rm); rds_message_addref(rm);
ic->i_rm = rm; ic->i_rm = rm;
...@@ -608,15 +619,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, ...@@ -608,15 +619,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
} }
} }
/* rds_ib_set_wr_signal_state(ic, send, 0);
* We want to delay signaling completions just enough to get
* the batching benefits but not so much that we create dead time
* on the wire.
*/
if (ic->i_unsignaled_wrs-- == 0) {
ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
}
/* /*
* Always signal the last one if we're stopping due to flow control. * Always signal the last one if we're stopping due to flow control.
...@@ -656,7 +659,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, ...@@ -656,7 +659,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
/* if we finished the message then send completion owns it */ /* if we finished the message then send completion owns it */
if (scat == &rm->data.m_sg[rm->data.m_count]) { if (scat == &rm->data.m_sg[rm->data.m_count]) {
prev->s_rm = ic->i_rm; prev->s_rm = ic->i_rm;
prev->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED; prev->s_wr.send_flags |= IB_SEND_SOLICITED;
ic->i_rm = NULL; ic->i_rm = NULL;
} }
...@@ -698,9 +701,10 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, ...@@ -698,9 +701,10 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
* A simplified version of the rdma case, we always map 1 SG, and * A simplified version of the rdma case, we always map 1 SG, and
* only 8 bytes, for the return value from the atomic operation. * only 8 bytes, for the return value from the atomic operation.
*/ */
int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op) int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm)
{ {
struct rds_ib_connection *ic = conn->c_transport_data; struct rds_ib_connection *ic = conn->c_transport_data;
struct rm_atomic_op *op = &rm->atomic;
struct rds_ib_send_work *send = NULL; struct rds_ib_send_work *send = NULL;
struct ib_send_wr *failed_wr; struct ib_send_wr *failed_wr;
struct rds_ib_device *rds_ibdev; struct rds_ib_device *rds_ibdev;
...@@ -731,12 +735,20 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op) ...@@ -731,12 +735,20 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
send->s_wr.wr.atomic.compare_add = op->op_swap_add; send->s_wr.wr.atomic.compare_add = op->op_swap_add;
send->s_wr.wr.atomic.swap = 0; send->s_wr.wr.atomic.swap = 0;
} }
send->s_wr.send_flags = IB_SEND_SIGNALED; rds_ib_set_wr_signal_state(ic, send, op->op_notify);
send->s_wr.num_sge = 1; send->s_wr.num_sge = 1;
send->s_wr.next = NULL; send->s_wr.next = NULL;
send->s_wr.wr.atomic.remote_addr = op->op_remote_addr; send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
send->s_wr.wr.atomic.rkey = op->op_rkey; send->s_wr.wr.atomic.rkey = op->op_rkey;
/*
* If there is no data or rdma ops in the message, then
* we must fill in s_rm ourselves, so we properly clean up
* on completion.
*/
if (!rm->rdma.m_rdma_op.r_active && !rm->data.op_active)
send->s_rm = rm;
/* map 8 byte retval buffer to the device */ /* map 8 byte retval buffer to the device */
ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE); ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret); rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
...@@ -836,14 +848,8 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op) ...@@ -836,14 +848,8 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
for (i = 0; i < work_alloc && scat != &op->r_sg[op->r_count]; i++) { for (i = 0; i < work_alloc && scat != &op->r_sg[op->r_count]; i++) {
send->s_wr.send_flags = 0; send->s_wr.send_flags = 0;
send->s_queued = jiffies; send->s_queued = jiffies;
/*
* We want to delay signaling completions just enough to get rds_ib_set_wr_signal_state(ic, send, op->r_notify);
* the batching benefits but not so much that we create dead time on the wire.
*/
if (ic->i_unsignaled_wrs-- == 0) {
ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
send->s_wr.send_flags = IB_SEND_SIGNALED;
}
send->s_wr.opcode = op->r_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ; send->s_wr.opcode = op->r_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
send->s_wr.wr.rdma.remote_addr = remote_addr; send->s_wr.wr.rdma.remote_addr = remote_addr;
...@@ -884,10 +890,6 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op) ...@@ -884,10 +890,6 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
send = ic->i_sends; send = ic->i_sends;
} }
/* if we finished the message then send completion owns it */
if (scat == &op->r_sg[op->r_count])
prev->s_wr.send_flags = IB_SEND_SIGNALED;
if (i < work_alloc) { if (i < work_alloc) {
rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i); rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
work_alloc = i; work_alloc = i;
......
...@@ -325,6 +325,8 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iovec *first_iov, ...@@ -325,6 +325,8 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iovec *first_iov,
sg++; sg++;
} }
rm->data.op_active = 1;
out: out:
return ret; return ret;
} }
......
...@@ -341,6 +341,7 @@ struct rds_message { ...@@ -341,6 +341,7 @@ struct rds_message {
struct rds_mr *m_rdma_mr; struct rds_mr *m_rdma_mr;
} rdma; } rdma;
struct rm_data_op { struct rm_data_op {
unsigned int op_active:1;
unsigned int m_nents; unsigned int m_nents;
unsigned int m_count; unsigned int m_count;
struct scatterlist *m_sg; struct scatterlist *m_sg;
...@@ -418,7 +419,7 @@ struct rds_transport { ...@@ -418,7 +419,7 @@ struct rds_transport {
int (*xmit_cong_map)(struct rds_connection *conn, int (*xmit_cong_map)(struct rds_connection *conn,
struct rds_cong_map *map, unsigned long offset); struct rds_cong_map *map, unsigned long offset);
int (*xmit_rdma)(struct rds_connection *conn, struct rds_rdma_op *op); int (*xmit_rdma)(struct rds_connection *conn, struct rds_rdma_op *op);
int (*xmit_atomic)(struct rds_connection *conn, struct rm_atomic_op *op); int (*xmit_atomic)(struct rds_connection *conn, struct rds_message *rm);
int (*recv)(struct rds_connection *conn); int (*recv)(struct rds_connection *conn);
int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov, int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov,
size_t size); size_t size);
......
...@@ -266,7 +266,7 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -266,7 +266,7 @@ int rds_send_xmit(struct rds_connection *conn)
if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
ret = conn->c_trans->xmit_atomic(conn, &rm->atomic); ret = conn->c_trans->xmit_atomic(conn, rm);
if (ret) if (ret)
break; break;
conn->c_xmit_atomic_sent = 1; conn->c_xmit_atomic_sent = 1;
...@@ -285,13 +285,18 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -285,13 +285,18 @@ int rds_send_xmit(struct rds_connection *conn)
if (ret) if (ret)
break; break;
conn->c_xmit_rdma_sent = 1; conn->c_xmit_rdma_sent = 1;
/* rdmas need data sent, even if just the header */
rm->data.op_active = 1;
/* The transport owns the mapped memory for now. /* The transport owns the mapped memory for now.
* You can't unmap it while it's on the send queue */ * You can't unmap it while it's on the send queue */
set_bit(RDS_MSG_MAPPED, &rm->m_flags); set_bit(RDS_MSG_MAPPED, &rm->m_flags);
} }
if (conn->c_xmit_hdr_off < sizeof(struct rds_header) || if (rm->data.op_active
conn->c_xmit_sg < rm->data.m_nents) { && (conn->c_xmit_hdr_off < sizeof(struct rds_header) ||
conn->c_xmit_sg < rm->data.m_nents)) {
ret = conn->c_trans->xmit(conn, rm, ret = conn->c_trans->xmit(conn, rm,
conn->c_xmit_hdr_off, conn->c_xmit_hdr_off,
conn->c_xmit_sg, conn->c_xmit_sg,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment