Commit 6778a6be authored by Karsten Graul's avatar Karsten Graul Committed by David S. Miller

net/smc: separate LLC wait queues for flow and messages

There might be races in scenarios where both SMC link groups are on the
same system. Prevent that by creating separate wait queues for LLC flows
and messages. Switch to non-interruptable versions of wait_event() and
wake_up() for the llc flow waiter to make sure the waiters get control
sequentially. Fine tune the llc_flow_lock to include the assignment of
the message. Write to system log when an unexpected message was
dropped. And remove an extra indirection and use the existing local
variable lgr in smc_llc_enqueue().

Fixes: 555da9af ("net/smc: add event-based llc_flow framework")
Reviewed-by: default avatarUrsula Braun <ubraun@linux.ibm.com>
Signed-off-by: default avatarKarsten Graul <kgraul@linux.ibm.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent a42e6aee
......@@ -247,7 +247,8 @@ static void smcr_lgr_link_deactivate_all(struct smc_link_group *lgr)
if (smc_link_usable(lnk))
lnk->state = SMC_LNK_INACTIVE;
}
wake_up_interruptible_all(&lgr->llc_waiter);
wake_up_all(&lgr->llc_msg_waiter);
wake_up_all(&lgr->llc_flow_waiter);
}
static void smc_lgr_free(struct smc_link_group *lgr);
......@@ -1130,18 +1131,19 @@ static void smcr_link_up(struct smc_link_group *lgr,
return;
if (lgr->llc_flow_lcl.type != SMC_LLC_FLOW_NONE) {
/* some other llc task is ongoing */
wait_event_interruptible_timeout(lgr->llc_waiter,
(lgr->llc_flow_lcl.type == SMC_LLC_FLOW_NONE),
wait_event_timeout(lgr->llc_flow_waiter,
(list_empty(&lgr->list) ||
lgr->llc_flow_lcl.type == SMC_LLC_FLOW_NONE),
SMC_LLC_WAIT_TIME);
}
if (list_empty(&lgr->list) ||
!smc_ib_port_active(smcibdev, ibport))
return; /* lgr or device no longer active */
link = smc_llc_usable_link(lgr);
if (!link)
return;
smc_llc_send_add_link(link, smcibdev->mac[ibport - 1], gid,
NULL, SMC_LLC_REQ);
/* lgr or device no longer active? */
if (!list_empty(&lgr->list) &&
smc_ib_port_active(smcibdev, ibport))
link = smc_llc_usable_link(lgr);
if (link)
smc_llc_send_add_link(link, smcibdev->mac[ibport - 1],
gid, NULL, SMC_LLC_REQ);
wake_up(&lgr->llc_flow_waiter); /* wake up next waiter */
}
}
......@@ -1195,13 +1197,17 @@ static void smcr_link_down(struct smc_link *lnk)
if (lgr->llc_flow_lcl.type != SMC_LLC_FLOW_NONE) {
/* another llc task is ongoing */
mutex_unlock(&lgr->llc_conf_mutex);
wait_event_interruptible_timeout(lgr->llc_waiter,
(lgr->llc_flow_lcl.type == SMC_LLC_FLOW_NONE),
wait_event_timeout(lgr->llc_flow_waiter,
(list_empty(&lgr->list) ||
lgr->llc_flow_lcl.type == SMC_LLC_FLOW_NONE),
SMC_LLC_WAIT_TIME);
mutex_lock(&lgr->llc_conf_mutex);
}
smc_llc_send_delete_link(to_lnk, del_link_id, SMC_LLC_REQ, true,
SMC_LLC_DEL_LOST_PATH);
if (!list_empty(&lgr->list))
smc_llc_send_delete_link(to_lnk, del_link_id,
SMC_LLC_REQ, true,
SMC_LLC_DEL_LOST_PATH);
wake_up(&lgr->llc_flow_waiter); /* wake up next waiter */
}
}
......@@ -1262,7 +1268,7 @@ static void smc_link_down_work(struct work_struct *work)
if (list_empty(&lgr->list))
return;
wake_up_interruptible_all(&lgr->llc_waiter);
wake_up_all(&lgr->llc_msg_waiter);
mutex_lock(&lgr->llc_conf_mutex);
smcr_link_down(link);
mutex_unlock(&lgr->llc_conf_mutex);
......
......@@ -262,8 +262,10 @@ struct smc_link_group {
struct work_struct llc_del_link_work;
struct work_struct llc_event_work;
/* llc event worker */
wait_queue_head_t llc_waiter;
wait_queue_head_t llc_flow_waiter;
/* w4 next llc event */
wait_queue_head_t llc_msg_waiter;
/* w4 next llc msg */
struct smc_llc_flow llc_flow_lcl;
/* llc local control field */
struct smc_llc_flow llc_flow_rmt;
......
......@@ -186,6 +186,26 @@ static inline void smc_llc_flow_qentry_set(struct smc_llc_flow *flow,
flow->qentry = qentry;
}
static void smc_llc_flow_parallel(struct smc_link_group *lgr, u8 flow_type,
struct smc_llc_qentry *qentry)
{
u8 msg_type = qentry->msg.raw.hdr.common.type;
if ((msg_type == SMC_LLC_ADD_LINK || msg_type == SMC_LLC_DELETE_LINK) &&
flow_type != msg_type && !lgr->delayed_event) {
lgr->delayed_event = qentry;
return;
}
/* drop parallel or already-in-progress llc requests */
if (flow_type != msg_type)
pr_warn_once("smc: SMC-R lg %*phN dropped parallel "
"LLC msg: msg %d flow %d role %d\n",
SMC_LGR_ID_SIZE, &lgr->id,
qentry->msg.raw.hdr.common.type,
flow_type, lgr->role);
kfree(qentry);
}
/* try to start a new llc flow, initiated by an incoming llc msg */
static bool smc_llc_flow_start(struct smc_llc_flow *flow,
struct smc_llc_qentry *qentry)
......@@ -195,14 +215,7 @@ static bool smc_llc_flow_start(struct smc_llc_flow *flow,
spin_lock_bh(&lgr->llc_flow_lock);
if (flow->type) {
/* a flow is already active */
if ((qentry->msg.raw.hdr.common.type == SMC_LLC_ADD_LINK ||
qentry->msg.raw.hdr.common.type == SMC_LLC_DELETE_LINK) &&
!lgr->delayed_event) {
lgr->delayed_event = qentry;
} else {
/* forget this llc request */
kfree(qentry);
}
smc_llc_flow_parallel(lgr, flow->type, qentry);
spin_unlock_bh(&lgr->llc_flow_lock);
return false;
}
......@@ -222,8 +235,8 @@ static bool smc_llc_flow_start(struct smc_llc_flow *flow,
}
if (qentry == lgr->delayed_event)
lgr->delayed_event = NULL;
spin_unlock_bh(&lgr->llc_flow_lock);
smc_llc_flow_qentry_set(flow, qentry);
spin_unlock_bh(&lgr->llc_flow_lock);
return true;
}
......@@ -251,11 +264,11 @@ int smc_llc_flow_initiate(struct smc_link_group *lgr,
return 0;
}
spin_unlock_bh(&lgr->llc_flow_lock);
rc = wait_event_interruptible_timeout(lgr->llc_waiter,
(lgr->llc_flow_lcl.type == SMC_LLC_FLOW_NONE &&
(lgr->llc_flow_rmt.type == SMC_LLC_FLOW_NONE ||
lgr->llc_flow_rmt.type == allowed_remote)),
SMC_LLC_WAIT_TIME);
rc = wait_event_timeout(lgr->llc_flow_waiter, (list_empty(&lgr->list) ||
(lgr->llc_flow_lcl.type == SMC_LLC_FLOW_NONE &&
(lgr->llc_flow_rmt.type == SMC_LLC_FLOW_NONE ||
lgr->llc_flow_rmt.type == allowed_remote))),
SMC_LLC_WAIT_TIME * 10);
if (!rc)
return -ETIMEDOUT;
goto again;
......@@ -272,7 +285,7 @@ void smc_llc_flow_stop(struct smc_link_group *lgr, struct smc_llc_flow *flow)
flow == &lgr->llc_flow_lcl)
schedule_work(&lgr->llc_event_work);
else
wake_up_interruptible(&lgr->llc_waiter);
wake_up(&lgr->llc_flow_waiter);
}
/* lnk is optional and used for early wakeup when link goes down, useful in
......@@ -283,26 +296,32 @@ struct smc_llc_qentry *smc_llc_wait(struct smc_link_group *lgr,
int time_out, u8 exp_msg)
{
struct smc_llc_flow *flow = &lgr->llc_flow_lcl;
u8 rcv_msg;
wait_event_interruptible_timeout(lgr->llc_waiter,
(flow->qentry ||
(lnk && !smc_link_usable(lnk)) ||
list_empty(&lgr->list)),
time_out);
wait_event_timeout(lgr->llc_msg_waiter,
(flow->qentry ||
(lnk && !smc_link_usable(lnk)) ||
list_empty(&lgr->list)),
time_out);
if (!flow->qentry ||
(lnk && !smc_link_usable(lnk)) || list_empty(&lgr->list)) {
smc_llc_flow_qentry_del(flow);
goto out;
}
if (exp_msg && flow->qentry->msg.raw.hdr.common.type != exp_msg) {
rcv_msg = flow->qentry->msg.raw.hdr.common.type;
if (exp_msg && rcv_msg != exp_msg) {
if (exp_msg == SMC_LLC_ADD_LINK &&
flow->qentry->msg.raw.hdr.common.type ==
SMC_LLC_DELETE_LINK) {
rcv_msg == SMC_LLC_DELETE_LINK) {
/* flow_start will delay the unexpected msg */
smc_llc_flow_start(&lgr->llc_flow_lcl,
smc_llc_flow_qentry_clr(flow));
return NULL;
}
pr_warn_once("smc: SMC-R lg %*phN dropped unexpected LLC msg: "
"msg %d exp %d flow %d role %d flags %x\n",
SMC_LGR_ID_SIZE, &lgr->id, rcv_msg, exp_msg,
flow->type, lgr->role,
flow->qentry->msg.raw.hdr.flags);
smc_llc_flow_qentry_del(flow);
}
out:
......@@ -1459,7 +1478,7 @@ static void smc_llc_event_handler(struct smc_llc_qentry *qentry)
/* a flow is waiting for this message */
smc_llc_flow_qentry_set(&lgr->llc_flow_lcl,
qentry);
wake_up_interruptible(&lgr->llc_waiter);
wake_up(&lgr->llc_msg_waiter);
} else if (smc_llc_flow_start(&lgr->llc_flow_lcl,
qentry)) {
schedule_work(&lgr->llc_add_link_work);
......@@ -1474,7 +1493,7 @@ static void smc_llc_event_handler(struct smc_llc_qentry *qentry)
if (lgr->llc_flow_lcl.type != SMC_LLC_FLOW_NONE) {
/* a flow is waiting for this message */
smc_llc_flow_qentry_set(&lgr->llc_flow_lcl, qentry);
wake_up_interruptible(&lgr->llc_waiter);
wake_up(&lgr->llc_msg_waiter);
return;
}
break;
......@@ -1485,7 +1504,7 @@ static void smc_llc_event_handler(struct smc_llc_qentry *qentry)
/* DEL LINK REQ during ADD LINK SEQ */
smc_llc_flow_qentry_set(&lgr->llc_flow_lcl,
qentry);
wake_up_interruptible(&lgr->llc_waiter);
wake_up(&lgr->llc_msg_waiter);
} else if (smc_llc_flow_start(&lgr->llc_flow_lcl,
qentry)) {
schedule_work(&lgr->llc_del_link_work);
......@@ -1496,7 +1515,7 @@ static void smc_llc_event_handler(struct smc_llc_qentry *qentry)
/* DEL LINK REQ during ADD LINK SEQ */
smc_llc_flow_qentry_set(&lgr->llc_flow_lcl,
qentry);
wake_up_interruptible(&lgr->llc_waiter);
wake_up(&lgr->llc_msg_waiter);
} else if (smc_llc_flow_start(&lgr->llc_flow_lcl,
qentry)) {
schedule_work(&lgr->llc_del_link_work);
......@@ -1581,7 +1600,7 @@ static void smc_llc_rx_response(struct smc_link *link,
case SMC_LLC_DELETE_RKEY:
/* assign responses to the local flow, we requested them */
smc_llc_flow_qentry_set(&link->lgr->llc_flow_lcl, qentry);
wake_up_interruptible(&link->lgr->llc_waiter);
wake_up(&link->lgr->llc_msg_waiter);
return;
case SMC_LLC_CONFIRM_RKEY_CONT:
/* not used because max links is 3 */
......@@ -1616,7 +1635,7 @@ static void smc_llc_enqueue(struct smc_link *link, union smc_llc_msg *llc)
spin_lock_irqsave(&lgr->llc_event_q_lock, flags);
list_add_tail(&qentry->list, &lgr->llc_event_q);
spin_unlock_irqrestore(&lgr->llc_event_q_lock, flags);
schedule_work(&link->lgr->llc_event_work);
schedule_work(&lgr->llc_event_work);
}
/* copy received msg and add it to the event queue */
......@@ -1677,7 +1696,8 @@ void smc_llc_lgr_init(struct smc_link_group *lgr, struct smc_sock *smc)
INIT_LIST_HEAD(&lgr->llc_event_q);
spin_lock_init(&lgr->llc_event_q_lock);
spin_lock_init(&lgr->llc_flow_lock);
init_waitqueue_head(&lgr->llc_waiter);
init_waitqueue_head(&lgr->llc_flow_waiter);
init_waitqueue_head(&lgr->llc_msg_waiter);
mutex_init(&lgr->llc_conf_mutex);
lgr->llc_testlink_time = net->ipv4.sysctl_tcp_keepalive_time;
}
......@@ -1686,7 +1706,8 @@ void smc_llc_lgr_init(struct smc_link_group *lgr, struct smc_sock *smc)
void smc_llc_lgr_clear(struct smc_link_group *lgr)
{
smc_llc_event_flush(lgr);
wake_up_interruptible_all(&lgr->llc_waiter);
wake_up_all(&lgr->llc_flow_waiter);
wake_up_all(&lgr->llc_msg_waiter);
cancel_work_sync(&lgr->llc_event_work);
cancel_work_sync(&lgr->llc_add_link_work);
cancel_work_sync(&lgr->llc_del_link_work);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment