Commit d00725ca authored by Alexander Aring's avatar Alexander Aring Committed by David Teigland

fs: dlm: handle sequence numbers as atomic

Currently seq_next is only be read on the receive side which processed
in an ordered way. The seq_send is being protected by locks. To being
able to read the seq_next value on send side as well we convert it to an
atomic_t value. The atomic_cmpxchg() is probably not necessary, however
the atomic_inc() depends on a if coniditional and this should be handled
in an atomic context.
Signed-off-by: default avatarAlexander Aring <aahringo@redhat.com>
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent 75a7d601
...@@ -152,8 +152,8 @@ ...@@ -152,8 +152,8 @@
struct midcomms_node { struct midcomms_node {
int nodeid; int nodeid;
uint32_t version; uint32_t version;
uint32_t seq_send; atomic_t seq_send;
uint32_t seq_next; atomic_t seq_next;
/* These queues are unbound because we cannot drop any message in dlm. /* These queues are unbound because we cannot drop any message in dlm.
* We could send a fence signal for a specific node to the cluster * We could send a fence signal for a specific node to the cluster
* manager if queues hits some maximum value, however this handling * manager if queues hits some maximum value, however this handling
...@@ -317,8 +317,8 @@ static void midcomms_node_reset(struct midcomms_node *node) ...@@ -317,8 +317,8 @@ static void midcomms_node_reset(struct midcomms_node *node)
{ {
pr_debug("reset node %d\n", node->nodeid); pr_debug("reset node %d\n", node->nodeid);
node->seq_next = DLM_SEQ_INIT; atomic_set(&node->seq_next, DLM_SEQ_INIT);
node->seq_send = DLM_SEQ_INIT; atomic_set(&node->seq_send, DLM_SEQ_INIT);
node->version = DLM_VERSION_NOT_SET; node->version = DLM_VERSION_NOT_SET;
node->flags = 0; node->flags = 0;
...@@ -492,9 +492,19 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, ...@@ -492,9 +492,19 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
struct midcomms_node *node, struct midcomms_node *node,
uint32_t seq) uint32_t seq)
{ {
if (seq == node->seq_next) { bool is_expected_seq;
node->seq_next++; uint32_t oval, nval;
do {
oval = atomic_read(&node->seq_next);
is_expected_seq = (oval == seq);
if (!is_expected_seq)
break;
nval = oval + 1;
} while (atomic_cmpxchg(&node->seq_next, oval, nval) != oval);
if (is_expected_seq) {
switch (p->header.h_cmd) { switch (p->header.h_cmd) {
case DLM_FIN: case DLM_FIN:
spin_lock(&node->state_lock); spin_lock(&node->state_lock);
...@@ -503,7 +513,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, ...@@ -503,7 +513,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
switch (node->state) { switch (node->state) {
case DLM_ESTABLISHED: case DLM_ESTABLISHED:
dlm_send_ack(node->nodeid, node->seq_next); dlm_send_ack(node->nodeid, nval);
/* passive shutdown DLM_LAST_ACK case 1 /* passive shutdown DLM_LAST_ACK case 1
* additional we check if the node is used by * additional we check if the node is used by
...@@ -522,14 +532,14 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, ...@@ -522,14 +532,14 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
} }
break; break;
case DLM_FIN_WAIT1: case DLM_FIN_WAIT1:
dlm_send_ack(node->nodeid, node->seq_next); dlm_send_ack(node->nodeid, nval);
node->state = DLM_CLOSING; node->state = DLM_CLOSING;
set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags); set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
pr_debug("switch node %d to state %s\n", pr_debug("switch node %d to state %s\n",
node->nodeid, dlm_state_str(node->state)); node->nodeid, dlm_state_str(node->state));
break; break;
case DLM_FIN_WAIT2: case DLM_FIN_WAIT2:
dlm_send_ack(node->nodeid, node->seq_next); dlm_send_ack(node->nodeid, nval);
midcomms_node_reset(node); midcomms_node_reset(node);
pr_debug("switch node %d to state %s\n", pr_debug("switch node %d to state %s\n",
node->nodeid, dlm_state_str(node->state)); node->nodeid, dlm_state_str(node->state));
...@@ -557,11 +567,11 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, ...@@ -557,11 +567,11 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
/* retry to ack message which we already have by sending back /* retry to ack message which we already have by sending back
* current node->seq_next number as ack. * current node->seq_next number as ack.
*/ */
if (seq < node->seq_next) if (seq < oval)
dlm_send_ack(node->nodeid, node->seq_next); dlm_send_ack(node->nodeid, oval);
log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d", log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d",
seq, node->seq_next, node->nodeid); seq, oval, node->nodeid);
} }
} }
...@@ -992,7 +1002,7 @@ void dlm_midcomms_receive_done(int nodeid) ...@@ -992,7 +1002,7 @@ void dlm_midcomms_receive_done(int nodeid)
switch (node->state) { switch (node->state) {
case DLM_ESTABLISHED: case DLM_ESTABLISHED:
spin_unlock(&node->state_lock); spin_unlock(&node->state_lock);
dlm_send_ack(node->nodeid, node->seq_next); dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
break; break;
default: default:
spin_unlock(&node->state_lock); spin_unlock(&node->state_lock);
...@@ -1058,7 +1068,7 @@ static void midcomms_new_msg_cb(void *data) ...@@ -1058,7 +1068,7 @@ static void midcomms_new_msg_cb(void *data)
list_add_tail_rcu(&mh->list, &mh->node->send_queue); list_add_tail_rcu(&mh->list, &mh->node->send_queue);
spin_unlock_bh(&mh->node->send_queue_lock); spin_unlock_bh(&mh->node->send_queue_lock);
mh->seq = mh->node->seq_send++; mh->seq = atomic_fetch_inc(&mh->node->seq_send);
} }
static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid, static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
...@@ -1530,7 +1540,7 @@ static void midcomms_new_rawmsg_cb(void *data) ...@@ -1530,7 +1540,7 @@ static void midcomms_new_rawmsg_cb(void *data)
switch (h->h_cmd) { switch (h->h_cmd) {
case DLM_OPTS: case DLM_OPTS:
if (!h->u.h_seq) if (!h->u.h_seq)
h->u.h_seq = cpu_to_le32(rd->node->seq_send++); h->u.h_seq = cpu_to_le32(atomic_fetch_inc(&rd->node->seq_send));
break; break;
default: default:
break; break;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment