Commit 59fae4de authored by Christoph Hellwig's avatar Christoph Hellwig

IB/srpt: chain RDMA READ/WRITE requests

Remove struct rdma_iu and instead allocate the struct ib_rdma_wr array
early and fill out directly.  This allows us to chain the WRs, and thus
archives both less lock contention on the HCA workqueue as well as much
simpler error handling.
Signed-off-by: default avatarChristoph Hellwig <hch@lst.de>
Reviewed-by: default avatarBart Van Assche <bart.vanassche@sandisk.com>
parent 14d3a3b2
......@@ -93,6 +93,8 @@ MODULE_PARM_DESC(srpt_service_guid,
static struct ib_client srpt_client;
static void srpt_release_channel(struct srpt_rdma_ch *ch);
static int srpt_queue_status(struct se_cmd *cmd);
static void srpt_recv_done(struct ib_cq *cq, struct ib_wc *wc);
static void srpt_send_done(struct ib_cq *cq, struct ib_wc *wc);
/**
* opposite_dma_dir() - Swap DMA_TO_DEVICE and DMA_FROM_DEVICE.
......@@ -778,12 +780,12 @@ static int srpt_post_recv(struct srpt_device *sdev,
struct ib_recv_wr wr, *bad_wr;
BUG_ON(!sdev);
wr.wr_id = encode_wr_id(SRPT_RECV, ioctx->ioctx.index);
list.addr = ioctx->ioctx.dma;
list.length = srp_max_req_size;
list.lkey = sdev->pd->local_dma_lkey;
ioctx->ioctx.cqe.done = srpt_recv_done;
wr.wr_cqe = &ioctx->ioctx.cqe;
wr.next = NULL;
wr.sg_list = &list;
wr.num_sge = 1;
......@@ -819,8 +821,9 @@ static int srpt_post_send(struct srpt_rdma_ch *ch,
list.length = len;
list.lkey = sdev->pd->local_dma_lkey;
ioctx->ioctx.cqe.done = srpt_send_done;
wr.next = NULL;
wr.wr_id = encode_wr_id(SRPT_SEND, ioctx->ioctx.index);
wr.wr_cqe = &ioctx->ioctx.cqe;
wr.sg_list = &list;
wr.num_sge = 1;
wr.opcode = IB_WR_SEND;
......@@ -1052,13 +1055,13 @@ static void srpt_unmap_sg_to_ib_sge(struct srpt_rdma_ch *ch,
BUG_ON(!ch);
BUG_ON(!ioctx);
BUG_ON(ioctx->n_rdma && !ioctx->rdma_ius);
BUG_ON(ioctx->n_rdma && !ioctx->rdma_wrs);
while (ioctx->n_rdma)
kfree(ioctx->rdma_ius[--ioctx->n_rdma].sge);
kfree(ioctx->rdma_wrs[--ioctx->n_rdma].wr.sg_list);
kfree(ioctx->rdma_ius);
ioctx->rdma_ius = NULL;
kfree(ioctx->rdma_wrs);
ioctx->rdma_wrs = NULL;
if (ioctx->mapped_sg_count) {
sg = ioctx->sg;
......@@ -1082,7 +1085,7 @@ static int srpt_map_sg_to_ib_sge(struct srpt_rdma_ch *ch,
struct scatterlist *sg, *sg_orig;
int sg_cnt;
enum dma_data_direction dir;
struct rdma_iu *riu;
struct ib_rdma_wr *riu;
struct srp_direct_buf *db;
dma_addr_t dma_addr;
struct ib_sge *sge;
......@@ -1109,23 +1112,24 @@ static int srpt_map_sg_to_ib_sge(struct srpt_rdma_ch *ch,
ioctx->mapped_sg_count = count;
if (ioctx->rdma_ius && ioctx->n_rdma_ius)
nrdma = ioctx->n_rdma_ius;
if (ioctx->rdma_wrs && ioctx->n_rdma_wrs)
nrdma = ioctx->n_rdma_wrs;
else {
nrdma = (count + SRPT_DEF_SG_PER_WQE - 1) / SRPT_DEF_SG_PER_WQE
+ ioctx->n_rbuf;
ioctx->rdma_ius = kzalloc(nrdma * sizeof *riu, GFP_KERNEL);
if (!ioctx->rdma_ius)
ioctx->rdma_wrs = kcalloc(nrdma, sizeof(*ioctx->rdma_wrs),
GFP_KERNEL);
if (!ioctx->rdma_wrs)
goto free_mem;
ioctx->n_rdma_ius = nrdma;
ioctx->n_rdma_wrs = nrdma;
}
db = ioctx->rbufs;
tsize = cmd->data_length;
dma_len = ib_sg_dma_len(dev, &sg[0]);
riu = ioctx->rdma_ius;
riu = ioctx->rdma_wrs;
/*
* For each remote desc - calculate the #ib_sge.
......@@ -1139,9 +1143,9 @@ static int srpt_map_sg_to_ib_sge(struct srpt_rdma_ch *ch,
j < count && i < ioctx->n_rbuf && tsize > 0; ++i, ++riu, ++db) {
rsize = be32_to_cpu(db->len);
raddr = be64_to_cpu(db->va);
riu->raddr = raddr;
riu->remote_addr = raddr;
riu->rkey = be32_to_cpu(db->key);
riu->sge_cnt = 0;
riu->wr.num_sge = 0;
/* calculate how many sge required for this remote_buf */
while (rsize > 0 && tsize > 0) {
......@@ -1165,33 +1169,35 @@ static int srpt_map_sg_to_ib_sge(struct srpt_rdma_ch *ch,
rsize = 0;
}
++riu->sge_cnt;
++riu->wr.num_sge;
if (rsize > 0 && riu->sge_cnt == SRPT_DEF_SG_PER_WQE) {
if (rsize > 0 &&
riu->wr.num_sge == SRPT_DEF_SG_PER_WQE) {
++ioctx->n_rdma;
riu->sge =
kmalloc(riu->sge_cnt * sizeof *riu->sge,
GFP_KERNEL);
if (!riu->sge)
riu->wr.sg_list = kmalloc_array(riu->wr.num_sge,
sizeof(*riu->wr.sg_list),
GFP_KERNEL);
if (!riu->wr.sg_list)
goto free_mem;
++riu;
riu->sge_cnt = 0;
riu->raddr = raddr;
riu->wr.num_sge = 0;
riu->remote_addr = raddr;
riu->rkey = be32_to_cpu(db->key);
}
}
++ioctx->n_rdma;
riu->sge = kmalloc(riu->sge_cnt * sizeof *riu->sge,
GFP_KERNEL);
if (!riu->sge)
riu->wr.sg_list = kmalloc_array(riu->wr.num_sge,
sizeof(*riu->wr.sg_list),
GFP_KERNEL);
if (!riu->wr.sg_list)
goto free_mem;
}
db = ioctx->rbufs;
tsize = cmd->data_length;
riu = ioctx->rdma_ius;
riu = ioctx->rdma_wrs;
sg = sg_orig;
dma_len = ib_sg_dma_len(dev, &sg[0]);
dma_addr = ib_sg_dma_address(dev, &sg[0]);
......@@ -1200,7 +1206,7 @@ static int srpt_map_sg_to_ib_sge(struct srpt_rdma_ch *ch,
for (i = 0, j = 0;
j < count && i < ioctx->n_rbuf && tsize > 0; ++i, ++riu, ++db) {
rsize = be32_to_cpu(db->len);
sge = riu->sge;
sge = riu->wr.sg_list;
k = 0;
while (rsize > 0 && tsize > 0) {
......@@ -1232,9 +1238,9 @@ static int srpt_map_sg_to_ib_sge(struct srpt_rdma_ch *ch,
}
++k;
if (k == riu->sge_cnt && rsize > 0 && tsize > 0) {
if (k == riu->wr.num_sge && rsize > 0 && tsize > 0) {
++riu;
sge = riu->sge;
sge = riu->wr.sg_list;
k = 0;
} else if (rsize > 0 && tsize > 0)
++sge;
......@@ -1277,8 +1283,8 @@ static struct srpt_send_ioctx *srpt_get_send_ioctx(struct srpt_rdma_ch *ch)
ioctx->n_rbuf = 0;
ioctx->rbufs = NULL;
ioctx->n_rdma = 0;
ioctx->n_rdma_ius = 0;
ioctx->rdma_ius = NULL;
ioctx->n_rdma_wrs = 0;
ioctx->rdma_wrs = NULL;
ioctx->mapped_sg_count = 0;
init_completion(&ioctx->tx_done);
ioctx->queue_status_only = false;
......@@ -1380,118 +1386,44 @@ static int srpt_abort_cmd(struct srpt_send_ioctx *ioctx)
}
/**
* srpt_handle_send_err_comp() - Process an IB_WC_SEND error completion.
*/
static void srpt_handle_send_err_comp(struct srpt_rdma_ch *ch, u64 wr_id)
{
struct srpt_send_ioctx *ioctx;
enum srpt_command_state state;
u32 index;
atomic_inc(&ch->sq_wr_avail);
index = idx_from_wr_id(wr_id);
ioctx = ch->ioctx_ring[index];
state = srpt_get_cmd_state(ioctx);
WARN_ON(state != SRPT_STATE_CMD_RSP_SENT
&& state != SRPT_STATE_MGMT_RSP_SENT
&& state != SRPT_STATE_NEED_DATA
&& state != SRPT_STATE_DONE);
/* If SRP_RSP sending failed, undo the ch->req_lim change. */
if (state == SRPT_STATE_CMD_RSP_SENT
|| state == SRPT_STATE_MGMT_RSP_SENT)
atomic_dec(&ch->req_lim);
srpt_abort_cmd(ioctx);
}
/**
* srpt_handle_send_comp() - Process an IB send completion notification.
*/
static void srpt_handle_send_comp(struct srpt_rdma_ch *ch,
struct srpt_send_ioctx *ioctx)
{
enum srpt_command_state state;
atomic_inc(&ch->sq_wr_avail);
state = srpt_set_cmd_state(ioctx, SRPT_STATE_DONE);
if (WARN_ON(state != SRPT_STATE_CMD_RSP_SENT
&& state != SRPT_STATE_MGMT_RSP_SENT
&& state != SRPT_STATE_DONE))
pr_debug("state = %d\n", state);
if (state != SRPT_STATE_DONE) {
srpt_unmap_sg_to_ib_sge(ch, ioctx);
transport_generic_free_cmd(&ioctx->cmd, 0);
} else {
pr_err("IB completion has been received too late for"
" wr_id = %u.\n", ioctx->ioctx.index);
}
}
/**
* srpt_handle_rdma_comp() - Process an IB RDMA completion notification.
*
* XXX: what is now target_execute_cmd used to be asynchronous, and unmapping
* the data that has been transferred via IB RDMA had to be postponed until the
* check_stop_free() callback. None of this is necessary anymore and needs to
* be cleaned up.
*/
static void srpt_handle_rdma_comp(struct srpt_rdma_ch *ch,
struct srpt_send_ioctx *ioctx,
enum srpt_opcode opcode)
static void srpt_rdma_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
struct srpt_rdma_ch *ch = cq->cq_context;
struct srpt_send_ioctx *ioctx =
container_of(wc->wr_cqe, struct srpt_send_ioctx, ioctx.cqe);
WARN_ON(ioctx->n_rdma <= 0);
atomic_add(ioctx->n_rdma, &ch->sq_wr_avail);
if (opcode == SRPT_RDMA_READ_LAST) {
if (srpt_test_and_set_cmd_state(ioctx, SRPT_STATE_NEED_DATA,
SRPT_STATE_DATA_IN))
target_execute_cmd(&ioctx->cmd);
else
pr_err("%s[%d]: wrong state = %d\n", __func__,
__LINE__, srpt_get_cmd_state(ioctx));
} else if (opcode == SRPT_RDMA_ABORT) {
ioctx->rdma_aborted = true;
} else {
WARN(true, "unexpected opcode %d\n", opcode);
if (unlikely(wc->status != IB_WC_SUCCESS)) {
pr_info("RDMA_READ for ioctx 0x%p failed with status %d\n",
ioctx, wc->status);
srpt_abort_cmd(ioctx);
return;
}
if (srpt_test_and_set_cmd_state(ioctx, SRPT_STATE_NEED_DATA,
SRPT_STATE_DATA_IN))
target_execute_cmd(&ioctx->cmd);
else
pr_err("%s[%d]: wrong state = %d\n", __func__,
__LINE__, srpt_get_cmd_state(ioctx));
}
/**
* srpt_handle_rdma_err_comp() - Process an IB RDMA error completion.
*/
static void srpt_handle_rdma_err_comp(struct srpt_rdma_ch *ch,
struct srpt_send_ioctx *ioctx,
enum srpt_opcode opcode)
static void srpt_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
enum srpt_command_state state;
struct srpt_send_ioctx *ioctx =
container_of(wc->wr_cqe, struct srpt_send_ioctx, ioctx.cqe);
state = srpt_get_cmd_state(ioctx);
switch (opcode) {
case SRPT_RDMA_READ_LAST:
if (ioctx->n_rdma <= 0) {
pr_err("Received invalid RDMA read"
" error completion with idx %d\n",
ioctx->ioctx.index);
break;
}
atomic_add(ioctx->n_rdma, &ch->sq_wr_avail);
if (state == SRPT_STATE_NEED_DATA)
srpt_abort_cmd(ioctx);
else
pr_err("%s[%d]: wrong state = %d\n",
__func__, __LINE__, state);
break;
case SRPT_RDMA_WRITE_LAST:
break;
default:
pr_err("%s[%d]: opcode = %u\n", __func__, __LINE__, opcode);
break;
if (unlikely(wc->status != IB_WC_SUCCESS)) {
pr_info("RDMA_WRITE for ioctx 0x%p failed with status %d\n",
ioctx, wc->status);
srpt_abort_cmd(ioctx);
}
}
......@@ -1926,32 +1858,26 @@ static void srpt_handle_new_iu(struct srpt_rdma_ch *ch,
return;
}
static void srpt_process_rcv_completion(struct ib_cq *cq,
struct srpt_rdma_ch *ch,
struct ib_wc *wc)
static void srpt_recv_done(struct ib_cq *cq, struct ib_wc *wc)
{
struct srpt_device *sdev = ch->sport->sdev;
struct srpt_recv_ioctx *ioctx;
u32 index;
struct srpt_rdma_ch *ch = cq->cq_context;
struct srpt_recv_ioctx *ioctx =
container_of(wc->wr_cqe, struct srpt_recv_ioctx, ioctx.cqe);
index = idx_from_wr_id(wc->wr_id);
if (wc->status == IB_WC_SUCCESS) {
int req_lim;
req_lim = atomic_dec_return(&ch->req_lim);
if (unlikely(req_lim < 0))
pr_err("req_lim = %d < 0\n", req_lim);
ioctx = sdev->ioctx_ring[index];
srpt_handle_new_iu(ch, ioctx, NULL);
} else {
pr_info("receiving failed for idx %u with status %d\n",
index, wc->status);
pr_info("receiving failed for ioctx %p with status %d\n",
ioctx, wc->status);
}
}
/**
* srpt_process_send_completion() - Process an IB send completion.
*
* Note: Although this has not yet been observed during tests, at least in
* theory it is possible that the srpt_get_send_ioctx() call invoked by
* srpt_handle_new_iu() fails. This is possible because the req_lim_delta
......@@ -1964,108 +1890,51 @@ static void srpt_process_rcv_completion(struct ib_cq *cq,
* are queued on cmd_wait_list. The code below processes these delayed
* requests one at a time.
*/
static void srpt_process_send_completion(struct ib_cq *cq,
struct srpt_rdma_ch *ch,
struct ib_wc *wc)
static void srpt_send_done(struct ib_cq *cq, struct ib_wc *wc)
{
struct srpt_send_ioctx *send_ioctx;
uint32_t index;
enum srpt_opcode opcode;
struct srpt_rdma_ch *ch = cq->cq_context;
struct srpt_send_ioctx *ioctx =
container_of(wc->wr_cqe, struct srpt_send_ioctx, ioctx.cqe);
enum srpt_command_state state;
index = idx_from_wr_id(wc->wr_id);
opcode = opcode_from_wr_id(wc->wr_id);
send_ioctx = ch->ioctx_ring[index];
if (wc->status == IB_WC_SUCCESS) {
if (opcode == SRPT_SEND)
srpt_handle_send_comp(ch, send_ioctx);
else {
WARN_ON(opcode != SRPT_RDMA_ABORT &&
wc->opcode != IB_WC_RDMA_READ);
srpt_handle_rdma_comp(ch, send_ioctx, opcode);
}
state = srpt_set_cmd_state(ioctx, SRPT_STATE_DONE);
WARN_ON(state != SRPT_STATE_CMD_RSP_SENT &&
state != SRPT_STATE_MGMT_RSP_SENT);
atomic_inc(&ch->sq_wr_avail);
if (wc->status != IB_WC_SUCCESS) {
pr_info("sending response for ioctx 0x%p failed"
" with status %d\n", ioctx, wc->status);
atomic_dec(&ch->req_lim);
srpt_abort_cmd(ioctx);
goto out;
}
if (state != SRPT_STATE_DONE) {
srpt_unmap_sg_to_ib_sge(ch, ioctx);
transport_generic_free_cmd(&ioctx->cmd, 0);
} else {
if (opcode == SRPT_SEND) {
pr_info("sending response for idx %u failed"
" with status %d\n", index, wc->status);
srpt_handle_send_err_comp(ch, wc->wr_id);
} else if (opcode != SRPT_RDMA_MID) {
pr_info("RDMA t %d for idx %u failed with"
" status %d\n", opcode, index, wc->status);
srpt_handle_rdma_err_comp(ch, send_ioctx, opcode);
}
pr_err("IB completion has been received too late for"
" wr_id = %u.\n", ioctx->ioctx.index);
}
while (unlikely(opcode == SRPT_SEND
&& !list_empty(&ch->cmd_wait_list)
&& srpt_get_ch_state(ch) == CH_LIVE
&& (send_ioctx = srpt_get_send_ioctx(ch)) != NULL)) {
out:
while (!list_empty(&ch->cmd_wait_list) &&
srpt_get_ch_state(ch) == CH_LIVE &&
(ioctx = srpt_get_send_ioctx(ch)) != NULL) {
struct srpt_recv_ioctx *recv_ioctx;
recv_ioctx = list_first_entry(&ch->cmd_wait_list,
struct srpt_recv_ioctx,
wait_list);
list_del(&recv_ioctx->wait_list);
srpt_handle_new_iu(ch, recv_ioctx, send_ioctx);
srpt_handle_new_iu(ch, recv_ioctx, ioctx);
}
}
static void srpt_process_completion(struct ib_cq *cq, struct srpt_rdma_ch *ch)
{
struct ib_wc *const wc = ch->wc;
int i, n;
WARN_ON(cq != ch->cq);
ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
while ((n = ib_poll_cq(cq, ARRAY_SIZE(ch->wc), wc)) > 0) {
for (i = 0; i < n; i++) {
if (opcode_from_wr_id(wc[i].wr_id) == SRPT_RECV)
srpt_process_rcv_completion(cq, ch, &wc[i]);
else
srpt_process_send_completion(cq, ch, &wc[i]);
}
}
}
/**
* srpt_completion() - IB completion queue callback function.
*
* Notes:
* - It is guaranteed that a completion handler will never be invoked
* concurrently on two different CPUs for the same completion queue. See also
* Documentation/infiniband/core_locking.txt and the implementation of
* handle_edge_irq() in kernel/irq/chip.c.
* - When threaded IRQs are enabled, completion handlers are invoked in thread
* context instead of interrupt context.
*/
static void srpt_completion(struct ib_cq *cq, void *ctx)
{
struct srpt_rdma_ch *ch = ctx;
wake_up_interruptible(&ch->wait_queue);
}
static int srpt_compl_thread(void *arg)
{
struct srpt_rdma_ch *ch;
/* Hibernation / freezing of the SRPT kernel thread is not supported. */
current->flags |= PF_NOFREEZE;
ch = arg;
BUG_ON(!ch);
pr_info("Session %s: kernel thread %s (PID %d) started\n",
ch->sess_name, ch->thread->comm, current->pid);
while (!kthread_should_stop()) {
wait_event_interruptible(ch->wait_queue,
(srpt_process_completion(ch->cq, ch),
kthread_should_stop()));
}
pr_info("Session %s: kernel thread %s (PID %d) stopped\n",
ch->sess_name, ch->thread->comm, current->pid);
return 0;
}
/**
* srpt_create_ch_ib() - Create receive and send completion queues.
*/
......@@ -2075,7 +1944,6 @@ static int srpt_create_ch_ib(struct srpt_rdma_ch *ch)
struct srpt_port *sport = ch->sport;
struct srpt_device *sdev = sport->sdev;
u32 srp_sq_size = sport->port_attrib.srp_sq_size;
struct ib_cq_init_attr cq_attr = {};
int ret;
WARN_ON(ch->rq_size < 1);
......@@ -2086,9 +1954,8 @@ static int srpt_create_ch_ib(struct srpt_rdma_ch *ch)
goto out;
retry:
cq_attr.cqe = ch->rq_size + srp_sq_size;
ch->cq = ib_create_cq(sdev->device, srpt_completion, NULL, ch,
&cq_attr);
ch->cq = ib_alloc_cq(sdev->device, ch, ch->rq_size + srp_sq_size,
0 /* XXX: spread CQs */, IB_POLL_WORKQUEUE);
if (IS_ERR(ch->cq)) {
ret = PTR_ERR(ch->cq);
pr_err("failed to create CQ cqe= %d ret= %d\n",
......@@ -2131,18 +1998,6 @@ static int srpt_create_ch_ib(struct srpt_rdma_ch *ch)
if (ret)
goto err_destroy_qp;
init_waitqueue_head(&ch->wait_queue);
pr_debug("creating thread for session %s\n", ch->sess_name);
ch->thread = kthread_run(srpt_compl_thread, ch, "ib_srpt_compl");
if (IS_ERR(ch->thread)) {
pr_err("failed to create kernel thread %ld\n",
PTR_ERR(ch->thread));
ch->thread = NULL;
goto err_destroy_qp;
}
out:
kfree(qp_init);
return ret;
......@@ -2150,17 +2005,14 @@ static int srpt_create_ch_ib(struct srpt_rdma_ch *ch)
err_destroy_qp:
ib_destroy_qp(ch->qp);
err_destroy_cq:
ib_destroy_cq(ch->cq);
ib_free_cq(ch->cq);
goto out;
}
static void srpt_destroy_ch_ib(struct srpt_rdma_ch *ch)
{
if (ch->thread)
kthread_stop(ch->thread);
ib_destroy_qp(ch->qp);
ib_destroy_cq(ch->cq);
ib_free_cq(ch->cq);
}
/**
......@@ -2821,12 +2673,8 @@ static int srpt_cm_handler(struct ib_cm_id *cm_id, struct ib_cm_event *event)
static int srpt_perform_rdmas(struct srpt_rdma_ch *ch,
struct srpt_send_ioctx *ioctx)
{
struct ib_rdma_wr wr;
struct ib_send_wr *bad_wr;
struct rdma_iu *riu;
int i;
int ret;
int sq_wr_avail;
int sq_wr_avail, ret, i;
enum dma_data_direction dir;
const int n_rdma = ioctx->n_rdma;
......@@ -2842,59 +2690,32 @@ static int srpt_perform_rdmas(struct srpt_rdma_ch *ch,
}
}
ioctx->rdma_aborted = false;
ret = 0;
riu = ioctx->rdma_ius;
memset(&wr, 0, sizeof wr);
for (i = 0; i < n_rdma; ++i, ++riu) {
if (dir == DMA_FROM_DEVICE) {
wr.wr.opcode = IB_WR_RDMA_WRITE;
wr.wr.wr_id = encode_wr_id(i == n_rdma - 1 ?
SRPT_RDMA_WRITE_LAST :
SRPT_RDMA_MID,
ioctx->ioctx.index);
} else {
wr.wr.opcode = IB_WR_RDMA_READ;
wr.wr.wr_id = encode_wr_id(i == n_rdma - 1 ?
SRPT_RDMA_READ_LAST :
SRPT_RDMA_MID,
ioctx->ioctx.index);
}
wr.wr.next = NULL;
wr.remote_addr = riu->raddr;
wr.rkey = riu->rkey;
wr.wr.num_sge = riu->sge_cnt;
wr.wr.sg_list = riu->sge;
for (i = 0; i < n_rdma; i++) {
struct ib_send_wr *wr = &ioctx->rdma_wrs[i].wr;
/* only get completion event for the last rdma write */
if (i == (n_rdma - 1) && dir == DMA_TO_DEVICE)
wr.wr.send_flags = IB_SEND_SIGNALED;
wr->opcode = (dir == DMA_FROM_DEVICE) ?
IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
ret = ib_post_send(ch->qp, &wr.wr, &bad_wr);
if (ret)
break;
if (i == n_rdma - 1) {
/* only get completion event for the last rdma read */
if (dir == DMA_TO_DEVICE) {
wr->send_flags = IB_SEND_SIGNALED;
ioctx->rdma_cqe.done = srpt_rdma_read_done;
} else {
ioctx->rdma_cqe.done = srpt_rdma_write_done;
}
wr->wr_cqe = &ioctx->rdma_cqe;
wr->next = NULL;
} else {
wr->wr_cqe = NULL;
wr->next = &ioctx->rdma_wrs[i + 1].wr;
}
}
ret = ib_post_send(ch->qp, &ioctx->rdma_wrs->wr, &bad_wr);
if (ret)
pr_err("%s[%d]: ib_post_send() returned %d for %d/%d\n",
__func__, __LINE__, ret, i, n_rdma);
if (ret && i > 0) {
wr.wr.num_sge = 0;
wr.wr.wr_id = encode_wr_id(SRPT_RDMA_ABORT, ioctx->ioctx.index);
wr.wr.send_flags = IB_SEND_SIGNALED;
while (ch->state == CH_LIVE &&
ib_post_send(ch->qp, &wr.wr, &bad_wr) != 0) {
pr_info("Trying to abort failed RDMA transfer [%d]\n",
ioctx->ioctx.index);
msleep(1000);
}
while (ch->state != CH_RELEASING && !ioctx->rdma_aborted) {
pr_info("Waiting until RDMA abort finished [%d]\n",
ioctx->ioctx.index);
msleep(1000);
}
}
out:
if (unlikely(dir == DMA_TO_DEVICE && ret < 0))
atomic_add(n_rdma, &ch->sq_wr_avail);
......
......@@ -128,36 +128,6 @@ enum {
DEFAULT_MAX_RDMA_SIZE = 65536,
};
enum srpt_opcode {
SRPT_RECV,
SRPT_SEND,
SRPT_RDMA_MID,
SRPT_RDMA_ABORT,
SRPT_RDMA_READ_LAST,
SRPT_RDMA_WRITE_LAST,
};
static inline u64 encode_wr_id(u8 opcode, u32 idx)
{
return ((u64)opcode << 32) | idx;
}
static inline enum srpt_opcode opcode_from_wr_id(u64 wr_id)
{
return wr_id >> 32;
}
static inline u32 idx_from_wr_id(u64 wr_id)
{
return (u32)wr_id;
}
struct rdma_iu {
u64 raddr;
u32 rkey;
struct ib_sge *sge;
u32 sge_cnt;
int mem_id;
};
/**
* enum srpt_command_state - SCSI command state managed by SRPT.
* @SRPT_STATE_NEW: New command arrived and is being processed.
......@@ -189,6 +159,7 @@ enum srpt_command_state {
* @index: Index of the I/O context in its ioctx_ring array.
*/
struct srpt_ioctx {
struct ib_cqe cqe;
void *buf;
dma_addr_t dma;
uint32_t index;
......@@ -215,32 +186,30 @@ struct srpt_recv_ioctx {
* @sg: Pointer to sg-list associated with this I/O context.
* @sg_cnt: SG-list size.
* @mapped_sg_count: ib_dma_map_sg() return value.
* @n_rdma_ius: Number of elements in the rdma_ius array.
* @rdma_ius: Array with information about the RDMA mapping.
* @n_rdma_wrs: Number of elements in the rdma_wrs array.
* @rdma_wrs: Array with information about the RDMA mapping.
* @tag: Tag of the received SRP information unit.
* @spinlock: Protects 'state'.
* @state: I/O context state.
* @rdma_aborted: If initiating a multipart RDMA transfer failed, whether
* the already initiated transfers have finished.
* @cmd: Target core command data structure.
* @sense_data: SCSI sense data.
*/
struct srpt_send_ioctx {
struct srpt_ioctx ioctx;
struct srpt_rdma_ch *ch;
struct rdma_iu *rdma_ius;
struct ib_rdma_wr *rdma_wrs;
struct ib_cqe rdma_cqe;
struct srp_direct_buf *rbufs;
struct srp_direct_buf single_rbuf;
struct scatterlist *sg;
struct list_head free_list;
spinlock_t spinlock;
enum srpt_command_state state;
bool rdma_aborted;
struct se_cmd cmd;
struct completion tx_done;
int sg_cnt;
int mapped_sg_count;
u16 n_rdma_ius;
u16 n_rdma_wrs;
u8 n_rdma;
u8 n_rbuf;
bool queue_status_only;
......@@ -267,9 +236,6 @@ enum rdma_ch_state {
/**
* struct srpt_rdma_ch - RDMA channel.
* @wait_queue: Allows the kernel thread to wait for more work.
* @thread: Kernel thread that processes the IB queues associated with
* the channel.
* @cm_id: IB CM ID associated with the channel.
* @qp: IB queue pair used for communicating over this channel.
* @cq: IB completion queue for this channel.
......@@ -299,8 +265,6 @@ enum rdma_ch_state {
* @release_done: Enables waiting for srpt_release_channel() completion.
*/
struct srpt_rdma_ch {
wait_queue_head_t wait_queue;
struct task_struct *thread;
struct ib_cm_id *cm_id;
struct ib_qp *qp;
struct ib_cq *cq;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment