Commit 627642f0 authored by David S. Miller's avatar David S. Miller

Merge branch 'net-smc-add-failover-processing'

Karsten Graul says:

====================
net/smc: add failover processing

This patch series adds the actual SMC-R link failover processing and
improved link group termination. There will be one more (very small)
series after this which will complete the SMC-R link failover support.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents fc99584e 649758ff
......@@ -390,6 +390,7 @@ static int smcr_clnt_conf_first_link(struct smc_sock *smc)
SMC_CLC_DECLINE, CLC_WAIT_TIME_SHORT);
return rc == -EAGAIN ? SMC_CLC_DECL_TIMEOUT_CL : rc;
}
smc_llc_save_peer_uid(qentry);
rc = smc_llc_eval_conf_link(qentry, SMC_LLC_REQ);
smc_llc_flow_qentry_del(&link->lgr->llc_flow_lcl);
if (rc)
......@@ -1056,6 +1057,7 @@ static int smcr_serv_conf_first_link(struct smc_sock *smc)
SMC_CLC_DECLINE, CLC_WAIT_TIME_SHORT);
return rc == -EAGAIN ? SMC_CLC_DECL_TIMEOUT_CL : rc;
}
smc_llc_save_peer_uid(qentry);
rc = smc_llc_eval_conf_link(qentry, SMC_LLC_RESP);
smc_llc_flow_qentry_del(&link->lgr->llc_flow_lcl);
if (rc)
......
......@@ -143,6 +143,9 @@ struct smc_connection {
* .prod cf. TCP snd_nxt
* .cons cf. TCP sends ack
*/
union smc_host_cursor local_tx_ctrl_fin;
/* prod crsr - confirmed by peer
*/
union smc_host_cursor tx_curs_prep; /* tx - prepared data
* snd_max..wmem_alloc
*/
......@@ -154,6 +157,7 @@ struct smc_connection {
*/
atomic_t sndbuf_space; /* remaining space in sndbuf */
u16 tx_cdc_seq; /* sequence # for CDC send */
u16 tx_cdc_seq_fin; /* sequence # - tx completed */
spinlock_t send_lock; /* protect wr_sends */
struct delayed_work tx_work; /* retry of smc_cdc_msg_send */
u32 tx_off; /* base offset in peer rmb */
......@@ -184,12 +188,14 @@ struct smc_connection {
spinlock_t acurs_lock; /* protect cursors */
#endif
struct work_struct close_work; /* peer sent some closing */
struct work_struct abort_work; /* abort the connection */
struct tasklet_struct rx_tsklet; /* Receiver tasklet for SMC-D */
u8 rx_off; /* receive offset:
* 0 for SMC-R, 32 for SMC-D
*/
u64 peer_token; /* SMC-D token of peer */
u8 killed : 1; /* abnormal termination */
u8 out_of_sync : 1; /* out of sync with peer */
};
struct smc_sock { /* smc sock container */
......
......@@ -47,17 +47,20 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
/* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
smp_mb__after_atomic();
smc_curs_copy(&conn->tx_curs_fin, &cdcpend->cursor, conn);
smc_curs_copy(&conn->local_tx_ctrl_fin, &cdcpend->p_cursor,
conn);
conn->tx_cdc_seq_fin = cdcpend->ctrl_seq;
}
smc_tx_sndbuf_nonfull(smc);
bh_unlock_sock(&smc->sk);
}
int smc_cdc_get_free_slot(struct smc_connection *conn,
struct smc_link *link,
struct smc_wr_buf **wr_buf,
struct smc_rdma_wr **wr_rdma_buf,
struct smc_cdc_tx_pend **pend)
{
struct smc_link *link = conn->lnk;
int rc;
rc = smc_wr_tx_get_free_slot(link, smc_cdc_tx_handler, wr_buf,
......@@ -104,22 +107,64 @@ int smc_cdc_msg_send(struct smc_connection *conn,
if (!rc) {
smc_curs_copy(&conn->rx_curs_confirmed, &cfed, conn);
conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
} else {
conn->tx_cdc_seq--;
conn->local_tx_ctrl.seqno = conn->tx_cdc_seq;
}
return rc;
}
/* send a validation msg indicating the move of a conn to an other QP link */
int smcr_cdc_msg_send_validation(struct smc_connection *conn)
{
struct smc_host_cdc_msg *local = &conn->local_tx_ctrl;
struct smc_link *link = conn->lnk;
struct smc_cdc_tx_pend *pend;
struct smc_wr_buf *wr_buf;
struct smc_cdc_msg *peer;
int rc;
rc = smc_cdc_get_free_slot(conn, link, &wr_buf, NULL, &pend);
if (rc)
return rc;
peer = (struct smc_cdc_msg *)wr_buf;
peer->common.type = local->common.type;
peer->len = local->len;
peer->seqno = htons(conn->tx_cdc_seq_fin); /* seqno last compl. tx */
peer->token = htonl(local->token);
peer->prod_flags.failover_validation = 1;
rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend);
return rc;
}
static int smcr_cdc_get_slot_and_msg_send(struct smc_connection *conn)
{
struct smc_cdc_tx_pend *pend;
struct smc_wr_buf *wr_buf;
struct smc_link *link;
bool again = false;
int rc;
rc = smc_cdc_get_free_slot(conn, &wr_buf, NULL, &pend);
again:
link = conn->lnk;
rc = smc_cdc_get_free_slot(conn, link, &wr_buf, NULL, &pend);
if (rc)
return rc;
spin_lock_bh(&conn->send_lock);
if (link != conn->lnk) {
/* link of connection changed, try again one time*/
spin_unlock_bh(&conn->send_lock);
smc_wr_tx_put_slot(link,
(struct smc_wr_tx_pend_priv *)pend);
if (again)
return -ENOLINK;
again = true;
goto again;
}
rc = smc_cdc_msg_send(conn, wr_buf, pend);
spin_unlock_bh(&conn->send_lock);
return rc;
......@@ -237,6 +282,28 @@ static void smc_cdc_handle_urg_data_arrival(struct smc_sock *smc,
sk_send_sigurg(&smc->sk);
}
static void smc_cdc_msg_validate(struct smc_sock *smc, struct smc_cdc_msg *cdc,
struct smc_link *link)
{
struct smc_connection *conn = &smc->conn;
u16 recv_seq = ntohs(cdc->seqno);
s16 diff;
/* check that seqnum was seen before */
diff = conn->local_rx_ctrl.seqno - recv_seq;
if (diff < 0) { /* diff larger than 0x7fff */
/* drop connection */
conn->out_of_sync = 1; /* prevent any further receives */
spin_lock_bh(&conn->send_lock);
conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
conn->lnk = link;
spin_unlock_bh(&conn->send_lock);
sock_hold(&smc->sk); /* sock_put in abort_work */
if (!schedule_work(&conn->abort_work))
sock_put(&smc->sk);
}
}
static void smc_cdc_msg_recv_action(struct smc_sock *smc,
struct smc_cdc_msg *cdc)
{
......@@ -367,16 +434,19 @@ static void smc_cdc_rx_handler(struct ib_wc *wc, void *buf)
read_lock_bh(&lgr->conns_lock);
conn = smc_lgr_find_conn(ntohl(cdc->token), lgr);
read_unlock_bh(&lgr->conns_lock);
if (!conn)
if (!conn || conn->out_of_sync)
return;
smc = container_of(conn, struct smc_sock, conn);
if (!cdc->prod_flags.failover_validation) {
if (cdc->prod_flags.failover_validation) {
smc_cdc_msg_validate(smc, cdc, link);
return;
}
if (smc_cdc_before(ntohs(cdc->seqno),
conn->local_rx_ctrl.seqno))
/* received seqno is old */
return;
}
smc_cdc_msg_recv(smc, cdc);
}
......
......@@ -304,6 +304,7 @@ struct smc_cdc_tx_pend {
};
int smc_cdc_get_free_slot(struct smc_connection *conn,
struct smc_link *link,
struct smc_wr_buf **wr_buf,
struct smc_rdma_wr **wr_rdma_buf,
struct smc_cdc_tx_pend **pend);
......@@ -312,6 +313,7 @@ int smc_cdc_msg_send(struct smc_connection *conn, struct smc_wr_buf *wr_buf,
struct smc_cdc_tx_pend *pend);
int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn);
int smcd_cdc_msg_send(struct smc_connection *conn);
int smcr_cdc_msg_send_validation(struct smc_connection *conn);
int smc_cdc_init(void) __init;
void smcd_cdc_rx_init(struct smc_connection *conn);
......
This diff is collapsed.
......@@ -70,6 +70,8 @@ struct smc_rdma_wr { /* work requests per message
struct ib_rdma_wr wr_tx_rdma[SMC_MAX_RDMA_WRITES];
};
#define SMC_LGR_ID_SIZE 4
struct smc_link {
struct smc_ib_device *smcibdev; /* ib-device */
u8 ibport; /* port - values 1 | 2 */
......@@ -85,6 +87,7 @@ struct smc_link {
struct smc_rdma_sges *wr_tx_rdma_sges;/*RDMA WRITE gather meta data*/
struct smc_rdma_wr *wr_tx_rdmas; /* WR RDMA WRITE */
struct smc_wr_tx_pend *wr_tx_pends; /* WR send waiting for CQE */
struct completion *wr_tx_compl; /* WR send CQE completion */
/* above four vectors have wr_tx_cnt elements and use the same index */
dma_addr_t wr_tx_dma_addr; /* DMA address of wr_tx_bufs */
atomic_long_t wr_tx_id; /* seq # of last sent WR */
......@@ -115,7 +118,10 @@ struct smc_link {
u8 peer_mac[ETH_ALEN]; /* = gid[8:10||13:15] */
u8 peer_gid[SMC_GID_SIZE]; /* gid of peer*/
u8 link_id; /* unique # within link group */
u8 link_uid[SMC_LGR_ID_SIZE]; /* unique lnk id */
u8 peer_link_uid[SMC_LGR_ID_SIZE]; /* peer uid */
u8 link_idx; /* index in lgr link array */
u8 link_is_asym; /* is link asymmetric? */
struct smc_link_group *lgr; /* parent link group */
struct work_struct link_down_wrk; /* wrk to bring link down */
......@@ -176,7 +182,6 @@ struct smc_rtoken { /* address/key of remote RMB */
u32 rkey;
};
#define SMC_LGR_ID_SIZE 4
#define SMC_BUF_MIN_SIZE 16384 /* minimum size of an RMB */
#define SMC_RMBE_SIZES 16 /* number of distinct RMBE sizes */
/* theoretically, the RFC states that largest size would be 512K,
......@@ -269,6 +274,8 @@ struct smc_link_group {
/* protects llc flow */
int llc_testlink_time;
/* link keep alive time */
u32 llc_termination_rsn;
/* rsn code for termination */
};
struct { /* SMC-D */
u64 peer_gid;
......@@ -379,7 +386,12 @@ int smcr_link_init(struct smc_link_group *lgr, struct smc_link *lnk,
void smcr_link_clear(struct smc_link *lnk);
int smcr_buf_map_lgr(struct smc_link *lnk);
int smcr_buf_reg_lgr(struct smc_link *lnk);
void smcr_lgr_set_type(struct smc_link_group *lgr, enum smc_lgr_type new_type);
void smcr_lgr_set_type_asym(struct smc_link_group *lgr,
enum smc_lgr_type new_type, int asym_lnk_idx);
int smcr_link_reg_rmb(struct smc_link *link, struct smc_buf_desc *rmb_desc);
struct smc_link *smc_switch_conns(struct smc_link_group *lgr,
struct smc_link *from_lnk, bool is_dev_err);
void smcr_link_down_cond(struct smc_link *lnk);
void smcr_link_down_cond_sched(struct smc_link *lnk);
......
......@@ -361,7 +361,6 @@ static int smc_llc_add_pending_send(struct smc_link *link,
int smc_llc_send_confirm_link(struct smc_link *link,
enum smc_llc_reqresp reqresp)
{
struct smc_link_group *lgr = smc_get_lgr(link);
struct smc_llc_msg_confirm_link *confllc;
struct smc_wr_tx_pend_priv *pend;
struct smc_wr_buf *wr_buf;
......@@ -382,7 +381,7 @@ int smc_llc_send_confirm_link(struct smc_link *link,
memcpy(confllc->sender_gid, link->gid, SMC_GID_SIZE);
hton24(confllc->sender_qp_num, link->roce_qp->qp_num);
confllc->link_num = link->link_id;
memcpy(confllc->link_uid, lgr->id, SMC_LGR_ID_SIZE);
memcpy(confllc->link_uid, link->link_uid, SMC_LGR_ID_SIZE);
confllc->max_links = SMC_LLC_ADD_LNK_MAX_LINKS;
/* send llc message */
rc = smc_wr_tx_send(link, pend);
......@@ -560,6 +559,25 @@ static int smc_llc_send_message(struct smc_link *link, void *llcbuf)
return smc_wr_tx_send(link, pend);
}
/* schedule an llc send on link, may wait for buffers,
* and wait for send completion notification.
* @return 0 on success
*/
static int smc_llc_send_message_wait(struct smc_link *link, void *llcbuf)
{
struct smc_wr_tx_pend_priv *pend;
struct smc_wr_buf *wr_buf;
int rc;
if (!smc_link_usable(link))
return -ENOLINK;
rc = smc_llc_add_pending_send(link, &wr_buf, &pend);
if (rc)
return rc;
memcpy(wr_buf, llcbuf, sizeof(union smc_llc_msg));
return smc_wr_tx_send_wait(link, pend, SMC_LLC_WAIT_TIME);
}
/********************************* receive ***********************************/
static int smc_llc_alloc_alt_link(struct smc_link_group *lgr,
......@@ -752,6 +770,7 @@ static int smc_llc_cli_conf_link(struct smc_link *link,
smc_llc_flow_qentry_del(&lgr->llc_flow_lcl);
return -ENOLINK;
}
smc_llc_save_peer_uid(qentry);
smc_llc_flow_qentry_del(&lgr->llc_flow_lcl);
rc = smc_ib_modify_qp_rts(link_new);
......@@ -777,7 +796,11 @@ static int smc_llc_cli_conf_link(struct smc_link *link,
return -ENOLINK;
}
smc_llc_link_active(link_new);
lgr->type = lgr_new_t;
if (lgr_new_t == SMC_LGR_ASYMMETRIC_LOCAL ||
lgr_new_t == SMC_LGR_ASYMMETRIC_PEER)
smcr_lgr_set_type_asym(lgr, lgr_new_t, link_new->link_idx);
else
smcr_lgr_set_type(lgr, lgr_new_t);
return 0;
}
......@@ -822,7 +845,8 @@ int smc_llc_cli_add_link(struct smc_link *link, struct smc_llc_qentry *qentry)
if (rc)
goto out_reject;
smc_llc_save_add_link_info(lnk_new, llc);
lnk_new->link_id = llc->link_num;
lnk_new->link_id = llc->link_num; /* SMC server assigns link id */
smc_llc_link_set_uid(lnk_new);
rc = smc_ib_ready_link(lnk_new);
if (rc)
......@@ -933,7 +957,7 @@ static void smc_llc_delete_asym_link(struct smc_link_group *lgr)
return; /* no asymmetric link */
if (!smc_link_downing(&lnk_asym->state))
return;
/* tbd: lnk_new = smc_switch_conns(lgr, lnk_asym, false); */
lnk_new = smc_switch_conns(lgr, lnk_asym, false);
smc_wr_tx_wait_no_pending_sends(lnk_asym);
if (!lnk_new)
goto out_free;
......@@ -1018,8 +1042,13 @@ static int smc_llc_srv_conf_link(struct smc_link *link,
false, SMC_LLC_DEL_LOST_PATH);
return -ENOLINK;
}
smc_llc_save_peer_uid(qentry);
smc_llc_link_active(link_new);
lgr->type = lgr_new_t;
if (lgr_new_t == SMC_LGR_ASYMMETRIC_LOCAL ||
lgr_new_t == SMC_LGR_ASYMMETRIC_PEER)
smcr_lgr_set_type_asym(lgr, lgr_new_t, link_new->link_idx);
else
smcr_lgr_set_type(lgr, lgr_new_t);
smc_llc_flow_qentry_del(&lgr->llc_flow_lcl);
return 0;
}
......@@ -1195,7 +1224,7 @@ static void smc_llc_process_cli_delete_link(struct smc_link_group *lgr)
smc_llc_send_message(lnk, &qentry->msg); /* response */
if (smc_link_downing(&lnk_del->state)) {
/* tbd: call smc_switch_conns(lgr, lnk_del, false); */
smc_switch_conns(lgr, lnk_del, false);
smc_wr_tx_wait_no_pending_sends(lnk_del);
}
smcr_link_clear(lnk_del);
......@@ -1204,9 +1233,9 @@ static void smc_llc_process_cli_delete_link(struct smc_link_group *lgr)
if (lnk_del == lnk_asym) {
/* expected deletion of asym link, don't change lgr state */
} else if (active_links == 1) {
lgr->type = SMC_LGR_SINGLE;
smcr_lgr_set_type(lgr, SMC_LGR_SINGLE);
} else if (!active_links) {
lgr->type = SMC_LGR_NONE;
smcr_lgr_set_type(lgr, SMC_LGR_NONE);
smc_lgr_terminate_sched(lgr);
}
out_unlock:
......@@ -1215,6 +1244,29 @@ static void smc_llc_process_cli_delete_link(struct smc_link_group *lgr)
kfree(qentry);
}
/* try to send a DELETE LINK ALL request on any active link,
* waiting for send completion
*/
void smc_llc_send_link_delete_all(struct smc_link_group *lgr, bool ord, u32 rsn)
{
struct smc_llc_msg_del_link delllc = {0};
int i;
delllc.hd.common.type = SMC_LLC_DELETE_LINK;
delllc.hd.length = sizeof(delllc);
if (ord)
delllc.hd.flags |= SMC_LLC_FLAG_DEL_LINK_ORDERLY;
delllc.hd.flags |= SMC_LLC_FLAG_DEL_LINK_ALL;
delllc.reason = htonl(rsn);
for (i = 0; i < SMC_LINKS_PER_LGR_MAX; i++) {
if (!smc_link_usable(&lgr->lnk[i]))
continue;
if (!smc_llc_send_message_wait(&lgr->lnk[i], &delllc))
break;
}
}
static void smc_llc_process_srv_delete_link(struct smc_link_group *lgr)
{
struct smc_llc_msg_del_link *del_llc;
......@@ -1230,6 +1282,8 @@ static void smc_llc_process_srv_delete_link(struct smc_link_group *lgr)
if (qentry->msg.delete_link.hd.flags & SMC_LLC_FLAG_DEL_LINK_ALL) {
/* delete entire lgr */
smc_llc_send_link_delete_all(lgr, true, ntohl(
qentry->msg.delete_link.reason));
smc_lgr_terminate_sched(lgr);
goto out;
}
......@@ -1245,7 +1299,7 @@ static void smc_llc_process_srv_delete_link(struct smc_link_group *lgr)
goto out; /* asymmetric link already deleted */
if (smc_link_downing(&lnk_del->state)) {
/* tbd: call smc_switch_conns(lgr, lnk_del, false); */
smc_switch_conns(lgr, lnk_del, false);
smc_wr_tx_wait_no_pending_sends(lnk_del);
}
if (!list_empty(&lgr->list)) {
......@@ -1270,9 +1324,9 @@ static void smc_llc_process_srv_delete_link(struct smc_link_group *lgr)
active_links = smc_llc_active_link_count(lgr);
if (active_links == 1) {
lgr->type = SMC_LGR_SINGLE;
smcr_lgr_set_type(lgr, SMC_LGR_SINGLE);
} else if (!active_links) {
lgr->type = SMC_LGR_NONE;
smcr_lgr_set_type(lgr, SMC_LGR_NONE);
smc_lgr_terminate_sched(lgr);
}
......@@ -1368,6 +1422,14 @@ static void smc_llc_rmt_delete_rkey(struct smc_link_group *lgr)
smc_llc_flow_qentry_del(&lgr->llc_flow_rmt);
}
static void smc_llc_protocol_violation(struct smc_link_group *lgr, u8 type)
{
pr_warn_ratelimited("smc: SMC-R lg %*phN LLC protocol violation: "
"llc_type %d\n", SMC_LGR_ID_SIZE, &lgr->id, type);
smc_llc_set_termination_rsn(lgr, SMC_LLC_DEL_PROT_VIOL);
smc_lgr_terminate_sched(lgr);
}
/* flush the llc event queue */
static void smc_llc_event_flush(struct smc_link_group *lgr)
{
......@@ -1468,6 +1530,9 @@ static void smc_llc_event_handler(struct smc_llc_qentry *qentry)
smc_llc_flow_stop(lgr, &lgr->llc_flow_rmt);
}
return;
default:
smc_llc_protocol_violation(lgr, llc->raw.hdr.common.type);
break;
}
out:
kfree(qentry);
......@@ -1527,6 +1592,9 @@ static void smc_llc_rx_response(struct smc_link *link,
case SMC_LLC_CONFIRM_RKEY_CONT:
/* not used because max links is 3 */
break;
default:
smc_llc_protocol_violation(link->lgr, llc_type);
break;
}
kfree(qentry);
}
......@@ -1709,12 +1777,29 @@ int smc_llc_do_delete_rkey(struct smc_link_group *lgr,
return rc;
}
void smc_llc_link_set_uid(struct smc_link *link)
{
__be32 link_uid;
link_uid = htonl(*((u32 *)link->lgr->id) + link->link_id);
memcpy(link->link_uid, &link_uid, SMC_LGR_ID_SIZE);
}
/* save peers link user id, used for debug purposes */
void smc_llc_save_peer_uid(struct smc_llc_qentry *qentry)
{
memcpy(qentry->link->peer_link_uid, qentry->msg.confirm_link.link_uid,
SMC_LGR_ID_SIZE);
}
/* evaluate confirm link request or response */
int smc_llc_eval_conf_link(struct smc_llc_qentry *qentry,
enum smc_llc_reqresp type)
{
if (type == SMC_LLC_REQ) /* SMC server assigns link_id */
if (type == SMC_LLC_REQ) { /* SMC server assigns link_id */
qentry->link->link_id = qentry->msg.confirm_link.link_num;
smc_llc_link_set_uid(qentry->link);
}
if (!(qentry->msg.raw.hdr.flags & SMC_LLC_FLAG_NO_RMBE_EYEC))
return -ENOTSUPP;
return 0;
......
......@@ -60,6 +60,14 @@ static inline struct smc_link *smc_llc_usable_link(struct smc_link_group *lgr)
return NULL;
}
/* set the termination reason code for the link group */
static inline void smc_llc_set_termination_rsn(struct smc_link_group *lgr,
u32 rsn)
{
if (!lgr->llc_termination_rsn)
lgr->llc_termination_rsn = rsn;
}
/* transmit */
int smc_llc_send_confirm_link(struct smc_link *lnk,
enum smc_llc_reqresp reqresp);
......@@ -84,11 +92,15 @@ int smc_llc_flow_initiate(struct smc_link_group *lgr,
void smc_llc_flow_stop(struct smc_link_group *lgr, struct smc_llc_flow *flow);
int smc_llc_eval_conf_link(struct smc_llc_qentry *qentry,
enum smc_llc_reqresp type);
void smc_llc_link_set_uid(struct smc_link *link);
void smc_llc_save_peer_uid(struct smc_llc_qentry *qentry);
struct smc_llc_qentry *smc_llc_wait(struct smc_link_group *lgr,
struct smc_link *lnk,
int time_out, u8 exp_msg);
struct smc_llc_qentry *smc_llc_flow_qentry_clr(struct smc_llc_flow *flow);
void smc_llc_flow_qentry_del(struct smc_llc_flow *flow);
void smc_llc_send_link_delete_all(struct smc_link_group *lgr, bool ord,
u32 rsn);
int smc_llc_cli_add_link(struct smc_link *link, struct smc_llc_qentry *qentry);
int smc_llc_srv_add_link(struct smc_link *link);
void smc_llc_srv_add_link_local(struct smc_link *link);
......
......@@ -482,12 +482,13 @@ static int smc_tx_rdma_writes(struct smc_connection *conn,
static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
{
struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags;
struct smc_link *link = conn->lnk;
struct smc_rdma_wr *wr_rdma_buf;
struct smc_cdc_tx_pend *pend;
struct smc_wr_buf *wr_buf;
int rc;
rc = smc_cdc_get_free_slot(conn, &wr_buf, &wr_rdma_buf, &pend);
rc = smc_cdc_get_free_slot(conn, link, &wr_buf, &wr_rdma_buf, &pend);
if (rc < 0) {
if (rc == -EBUSY) {
struct smc_sock *smc =
......@@ -505,10 +506,17 @@ static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
}
spin_lock_bh(&conn->send_lock);
if (link != conn->lnk) {
/* link of connection changed, tx_work will restart */
smc_wr_tx_put_slot(link,
(struct smc_wr_tx_pend_priv *)pend);
rc = -ENOLINK;
goto out_unlock;
}
if (!pflags->urg_data_present) {
rc = smc_tx_rdma_writes(conn, wr_rdma_buf);
if (rc) {
smc_wr_tx_put_slot(conn->lnk,
smc_wr_tx_put_slot(link,
(struct smc_wr_tx_pend_priv *)pend);
goto out_unlock;
}
......
......@@ -44,6 +44,7 @@ struct smc_wr_tx_pend { /* control data for a pending send request */
struct smc_link *link;
u32 idx;
struct smc_wr_tx_pend_priv priv;
u8 compl_requested;
};
/******************************** send queue *********************************/
......@@ -103,6 +104,8 @@ static inline void smc_wr_tx_process_cqe(struct ib_wc *wc)
if (pnd_snd_idx == link->wr_tx_cnt)
return;
link->wr_tx_pends[pnd_snd_idx].wc_status = wc->status;
if (link->wr_tx_pends[pnd_snd_idx].compl_requested)
complete(&link->wr_tx_compl[pnd_snd_idx]);
memcpy(&pnd_snd, &link->wr_tx_pends[pnd_snd_idx], sizeof(pnd_snd));
/* clear the full struct smc_wr_tx_pend including .priv */
memset(&link->wr_tx_pends[pnd_snd_idx], 0,
......@@ -275,6 +278,33 @@ int smc_wr_tx_send(struct smc_link *link, struct smc_wr_tx_pend_priv *priv)
return rc;
}
/* Send prepared WR slot via ib_post_send and wait for send completion
* notification.
* @priv: pointer to smc_wr_tx_pend_priv identifying prepared message buffer
*/
int smc_wr_tx_send_wait(struct smc_link *link, struct smc_wr_tx_pend_priv *priv,
unsigned long timeout)
{
struct smc_wr_tx_pend *pend;
int rc;
pend = container_of(priv, struct smc_wr_tx_pend, priv);
pend->compl_requested = 1;
init_completion(&link->wr_tx_compl[pend->idx]);
rc = smc_wr_tx_send(link, priv);
if (rc)
return rc;
/* wait for completion by smc_wr_tx_process_cqe() */
rc = wait_for_completion_interruptible_timeout(
&link->wr_tx_compl[pend->idx], timeout);
if (rc <= 0)
rc = -ENODATA;
if (rc > 0)
rc = 0;
return rc;
}
/* Register a memory region and wait for result. */
int smc_wr_reg_send(struct smc_link *link, struct ib_mr *mr)
{
......@@ -555,6 +585,8 @@ void smc_wr_free_link(struct smc_link *lnk)
void smc_wr_free_link_mem(struct smc_link *lnk)
{
kfree(lnk->wr_tx_compl);
lnk->wr_tx_compl = NULL;
kfree(lnk->wr_tx_pends);
lnk->wr_tx_pends = NULL;
kfree(lnk->wr_tx_mask);
......@@ -625,8 +657,15 @@ int smc_wr_alloc_link_mem(struct smc_link *link)
GFP_KERNEL);
if (!link->wr_tx_pends)
goto no_mem_wr_tx_mask;
link->wr_tx_compl = kcalloc(SMC_WR_BUF_CNT,
sizeof(link->wr_tx_compl[0]),
GFP_KERNEL);
if (!link->wr_tx_compl)
goto no_mem_wr_tx_pends;
return 0;
no_mem_wr_tx_pends:
kfree(link->wr_tx_pends);
no_mem_wr_tx_mask:
kfree(link->wr_tx_mask);
no_mem_wr_rx_sges:
......
......@@ -101,6 +101,8 @@ int smc_wr_tx_put_slot(struct smc_link *link,
struct smc_wr_tx_pend_priv *wr_pend_priv);
int smc_wr_tx_send(struct smc_link *link,
struct smc_wr_tx_pend_priv *wr_pend_priv);
int smc_wr_tx_send_wait(struct smc_link *link, struct smc_wr_tx_pend_priv *priv,
unsigned long timeout);
void smc_wr_tx_cq_handler(struct ib_cq *ib_cq, void *cq_context);
void smc_wr_tx_dismiss_slots(struct smc_link *lnk, u8 wr_rx_hdr_type,
smc_wr_tx_filter filter,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment