Commit 63e711b0 authored by Alexander Aring's avatar Alexander Aring Committed by David Teigland

fs: dlm: create midcomms nodes when configure

This patch puts the life of a midcomms node the same as a lowcomms
connection. The lowcomms connection lifetime was changed by commit
6f0b0b5d ("fs: dlm: remove dlm_node_addrs lookup list"). In the
future the midcomms node instances can be merged with lowcomms
connection structure as the lifetime is the same and states can be
controlled over values or flags.

Before midcomms nodes were generated during version detection. This is
not necessary anymore when the nodes are created when the cluster
manager configures DLM via configfs. When a midcomms node is created over
configfs it well set DLM_VERSION_NOT_SET as version. This indicates that
the version of the midcomms node is still unknown and need to be probed
via certain rcom messages.
Signed-off-by: default avatarAlexander Aring <aahringo@redhat.com>
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent 11519351
......@@ -664,7 +664,7 @@ static ssize_t comm_addr_store(struct config_item *item, const char *buf,
memcpy(addr, buf, len);
rv = dlm_lowcomms_addr(cm->nodeid, addr, len);
rv = dlm_midcomms_addr(cm->nodeid, addr, len);
if (rv) {
kfree(addr);
return rv;
......
......@@ -330,18 +330,23 @@ static void midcomms_node_reset(struct midcomms_node *node)
wake_up(&node->shutdown_wait);
}
static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc)
static struct midcomms_node *nodeid2node(int nodeid)
{
struct midcomms_node *node, *tmp;
int r = nodeid_hash(nodeid);
return __find_node(nodeid, nodeid_hash(nodeid));
}
node = __find_node(nodeid, r);
if (node || !alloc)
return node;
int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{
int ret, r = nodeid_hash(nodeid);
struct midcomms_node *node;
node = kmalloc(sizeof(*node), alloc);
ret = dlm_lowcomms_addr(nodeid, addr, len);
if (ret)
return ret;
node = kmalloc(sizeof(*node), GFP_NOFS);
if (!node)
return NULL;
return -ENOMEM;
node->nodeid = nodeid;
spin_lock_init(&node->state_lock);
......@@ -353,21 +358,11 @@ static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc)
midcomms_node_reset(node);
spin_lock(&nodes_lock);
/* check again if there was somebody else
* earlier here to add the node
*/
tmp = __find_node(nodeid, r);
if (tmp) {
spin_unlock(&nodes_lock);
kfree(node);
return tmp;
}
hlist_add_head_rcu(&node->hlist, &node_hash[r]);
spin_unlock(&nodes_lock);
node->debugfs = dlm_create_debug_comms_file(nodeid, node);
return node;
return 0;
}
static int dlm_send_ack(int nodeid, uint32_t seq)
......@@ -603,112 +598,6 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p,
}
}
static struct midcomms_node *
dlm_midcomms_recv_node_lookup(int nodeid, const union dlm_packet *p,
uint16_t msglen, int (*cb)(struct midcomms_node *node))
{
struct midcomms_node *node = NULL;
gfp_t allocation = 0;
int ret;
switch (p->header.h_cmd) {
case DLM_RCOM:
if (msglen < sizeof(struct dlm_rcom)) {
log_print("rcom msg too small: %u, will skip this message from node %d",
msglen, nodeid);
return NULL;
}
switch (p->rcom.rc_type) {
case cpu_to_le32(DLM_RCOM_NAMES):
fallthrough;
case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
fallthrough;
case cpu_to_le32(DLM_RCOM_STATUS):
fallthrough;
case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
node = nodeid2node(nodeid, 0);
if (node) {
spin_lock(&node->state_lock);
if (node->state != DLM_ESTABLISHED)
pr_debug("receive begin RCOM msg from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
switch (node->state) {
case DLM_CLOSED:
node->state = DLM_ESTABLISHED;
pr_debug("switch node %d to state %s\n",
node->nodeid, dlm_state_str(node->state));
break;
case DLM_ESTABLISHED:
break;
default:
spin_unlock(&node->state_lock);
return NULL;
}
spin_unlock(&node->state_lock);
}
allocation = GFP_NOFS;
break;
default:
break;
}
break;
default:
break;
}
node = nodeid2node(nodeid, allocation);
if (!node) {
switch (p->header.h_cmd) {
case DLM_OPTS:
if (msglen < sizeof(struct dlm_opts)) {
log_print("opts msg too small: %u, will skip this message from node %d",
msglen, nodeid);
return NULL;
}
log_print_ratelimited("received dlm opts message nextcmd %d from node %d in an invalid sequence",
p->opts.o_nextcmd, nodeid);
break;
default:
log_print_ratelimited("received dlm message cmd %d from node %d in an invalid sequence",
p->header.h_cmd, nodeid);
break;
}
return NULL;
}
ret = cb(node);
if (ret < 0)
return NULL;
return node;
}
static int dlm_midcomms_version_check_3_2(struct midcomms_node *node)
{
switch (node->version) {
case DLM_VERSION_NOT_SET:
node->version = DLM_VERSION_3_2;
wake_up(&node->shutdown_wait);
log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
node->nodeid);
break;
case DLM_VERSION_3_2:
break;
default:
log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
DLM_VERSION_3_2, node->nodeid, node->version);
return -1;
}
return 0;
}
static int dlm_opts_check_msglen(const union dlm_packet *p, uint16_t msglen,
int nodeid)
{
......@@ -767,11 +656,38 @@ static void dlm_midcomms_receive_buffer_3_2(const union dlm_packet *p, int nodei
int ret, idx;
idx = srcu_read_lock(&nodes_srcu);
node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
dlm_midcomms_version_check_3_2);
if (!node)
node = nodeid2node(nodeid);
if (WARN_ON_ONCE(!node))
goto out;
switch (node->version) {
case DLM_VERSION_NOT_SET:
node->version = DLM_VERSION_3_2;
wake_up(&node->shutdown_wait);
log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
node->nodeid);
spin_lock(&node->state_lock);
switch (node->state) {
case DLM_CLOSED:
node->state = DLM_ESTABLISHED;
pr_debug("switch node %d to state %s\n",
node->nodeid, dlm_state_str(node->state));
break;
default:
break;
}
spin_unlock(&node->state_lock);
break;
case DLM_VERSION_3_2:
break;
default:
log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
DLM_VERSION_3_2, node->nodeid, node->version);
goto out;
}
switch (p->header.h_cmd) {
case DLM_RCOM:
/* these rcom message we use to determine version.
......@@ -860,8 +776,19 @@ static void dlm_midcomms_receive_buffer_3_2(const union dlm_packet *p, int nodei
srcu_read_unlock(&nodes_srcu, idx);
}
static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
static void dlm_midcomms_receive_buffer_3_1(const union dlm_packet *p, int nodeid)
{
uint16_t msglen = le16_to_cpu(p->header.h_length);
struct midcomms_node *node;
int idx;
idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid);
if (WARN_ON_ONCE(!node)) {
srcu_read_unlock(&nodes_srcu, idx);
return;
}
switch (node->version) {
case DLM_VERSION_NOT_SET:
node->version = DLM_VERSION_3_1;
......@@ -874,22 +801,6 @@ static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
default:
log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
DLM_VERSION_3_1, node->nodeid, node->version);
return -1;
}
return 0;
}
static void dlm_midcomms_receive_buffer_3_1(const union dlm_packet *p, int nodeid)
{
uint16_t msglen = le16_to_cpu(p->header.h_length);
struct midcomms_node *node;
int idx;
idx = srcu_read_lock(&nodes_srcu);
node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
dlm_midcomms_version_check_3_1);
if (!node) {
srcu_read_unlock(&nodes_srcu, idx);
return;
}
......@@ -1005,8 +916,8 @@ void dlm_midcomms_unack_msg_resend(int nodeid)
int idx, ret;
idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid, 0);
if (!node) {
node = nodeid2node(nodeid);
if (WARN_ON_ONCE(!node)) {
srcu_read_unlock(&nodes_srcu, idx);
return;
}
......@@ -1092,11 +1003,9 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
int idx;
idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid, 0);
if (!node) {
WARN_ON_ONCE(1);
node = nodeid2node(nodeid);
if (WARN_ON_ONCE(!node))
goto err;
}
/* this is a bug, however we going on and hope it will be resolved */
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
......@@ -1237,8 +1146,34 @@ void dlm_midcomms_init(void)
dlm_lowcomms_init();
}
static void midcomms_node_release(struct rcu_head *rcu)
{
struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
WARN_ON_ONCE(atomic_read(&node->send_queue_cnt));
dlm_send_queue_flush(node);
kfree(node);
}
void dlm_midcomms_exit(void)
{
struct midcomms_node *node;
int i, idx;
idx = srcu_read_lock(&nodes_srcu);
for (i = 0; i < CONN_HASH_SIZE; i++) {
hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
dlm_delete_debug_comms_file(node->debugfs);
spin_lock(&nodes_lock);
hlist_del_rcu(&node->hlist);
spin_unlock(&nodes_lock);
call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
}
}
srcu_read_unlock(&nodes_srcu, idx);
dlm_lowcomms_exit();
}
......@@ -1279,8 +1214,8 @@ void dlm_midcomms_add_member(int nodeid)
int idx;
idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid, GFP_NOFS);
if (!node) {
node = nodeid2node(nodeid);
if (WARN_ON_ONCE(!node)) {
srcu_read_unlock(&nodes_srcu, idx);
return;
}
......@@ -1324,8 +1259,8 @@ void dlm_midcomms_remove_member(int nodeid)
int idx;
idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid, 0);
if (!node) {
node = nodeid2node(nodeid);
if (WARN_ON_ONCE(!node)) {
srcu_read_unlock(&nodes_srcu, idx);
return;
}
......@@ -1369,15 +1304,6 @@ void dlm_midcomms_remove_member(int nodeid)
srcu_read_unlock(&nodes_srcu, idx);
}
static void midcomms_node_release(struct rcu_head *rcu)
{
struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
WARN_ON_ONCE(atomic_read(&node->send_queue_cnt));
dlm_send_queue_flush(node);
kfree(node);
}
void dlm_midcomms_version_wait(void)
{
struct midcomms_node *node;
......@@ -1440,7 +1366,7 @@ static void midcomms_shutdown(struct midcomms_node *node)
node->state == DLM_CLOSED ||
test_bit(DLM_NODE_FLAG_CLOSE, &node->flags),
DLM_SHUTDOWN_TIMEOUT);
if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags))
if (!ret)
pr_debug("active shutdown timed out for node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
else
......@@ -1458,14 +1384,6 @@ void dlm_midcomms_shutdown(void)
for (i = 0; i < CONN_HASH_SIZE; i++) {
hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
midcomms_shutdown(node);
dlm_delete_debug_comms_file(node->debugfs);
spin_lock(&nodes_lock);
hlist_del_rcu(&node->hlist);
spin_unlock(&nodes_lock);
call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
}
}
srcu_read_unlock(&nodes_srcu, idx);
......@@ -1481,7 +1399,7 @@ int dlm_midcomms_close(int nodeid)
idx = srcu_read_lock(&nodes_srcu);
/* Abort pending close/remove operation */
node = nodeid2node(nodeid, 0);
node = nodeid2node(nodeid);
if (node) {
/* let shutdown waiters leave */
set_bit(DLM_NODE_FLAG_CLOSE, &node->flags);
......@@ -1493,7 +1411,7 @@ int dlm_midcomms_close(int nodeid)
mutex_lock(&close_lock);
idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid, 0);
node = nodeid2node(nodeid);
if (!node) {
srcu_read_unlock(&nodes_srcu, idx);
mutex_unlock(&close_lock);
......@@ -1501,10 +1419,22 @@ int dlm_midcomms_close(int nodeid)
}
ret = dlm_lowcomms_close(nodeid);
spin_lock(&node->state_lock);
midcomms_node_reset(node);
spin_unlock(&node->state_lock);
dlm_delete_debug_comms_file(node->debugfs);
spin_lock(&nodes_lock);
hlist_del_rcu(&node->hlist);
spin_unlock(&nodes_lock);
srcu_read_unlock(&nodes_srcu, idx);
/* wait that all readers left until flush send queue */
synchronize_srcu(&nodes_srcu);
/* drop all pending dlm messages, this is fine as
* this function get called when the node is fenced
*/
dlm_send_queue_flush(node);
call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
mutex_unlock(&close_lock);
return ret;
......
......@@ -20,6 +20,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
gfp_t allocation, char **ppc);
void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name,
int namelen);
int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
void dlm_midcomms_version_wait(void);
int dlm_midcomms_close(int nodeid);
int dlm_midcomms_start(void);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment