Commit 91a683cd authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'dlm-6.7' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:
 "This set of patches has some minor fixes for message handling, some
  misc cleanups, and updates the maintainers entry"

* tag 'dlm-6.7' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  MAINTAINERS: Update dlm maintainer and web page
  dlm: slow down filling up processing queue
  dlm: fix no ack after final message
  dlm: be sure we reset all nodes at forced shutdown
  dlm: fix remove member after close call
  dlm: fix creating multiple node structures
  fs: dlm: Remove some useless memset()
  fs: dlm: Fix the size of a buffer in dlm_create_debug_file()
  fs: dlm: Simplify buffer size computation in dlm_create_debug_file()
parents 17fc8084 eb53c018
...@@ -6172,11 +6172,11 @@ F: drivers/video/fbdev/udlfb.c ...@@ -6172,11 +6172,11 @@ F: drivers/video/fbdev/udlfb.c
F: include/video/udlfb.h F: include/video/udlfb.h
DISTRIBUTED LOCK MANAGER (DLM) DISTRIBUTED LOCK MANAGER (DLM)
M: Christine Caulfield <ccaulfie@redhat.com> M: Alexander Aring <aahringo@redhat.com>
M: David Teigland <teigland@redhat.com> M: David Teigland <teigland@redhat.com>
L: gfs2@lists.linux.dev L: gfs2@lists.linux.dev
S: Supported S: Supported
W: http://sources.redhat.com/cluster/ W: https://pagure.io/dlm
T: git git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm.git T: git git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm.git
F: fs/dlm/ F: fs/dlm/
......
...@@ -973,7 +973,8 @@ void dlm_delete_debug_comms_file(void *ctx) ...@@ -973,7 +973,8 @@ void dlm_delete_debug_comms_file(void *ctx)
void dlm_create_debug_file(struct dlm_ls *ls) void dlm_create_debug_file(struct dlm_ls *ls)
{ {
char name[DLM_LOCKSPACE_LEN + 8]; /* Reserve enough space for the longest file name */
char name[DLM_LOCKSPACE_LEN + sizeof("_queued_asts")];
/* format 1 */ /* format 1 */
...@@ -985,8 +986,7 @@ void dlm_create_debug_file(struct dlm_ls *ls) ...@@ -985,8 +986,7 @@ void dlm_create_debug_file(struct dlm_ls *ls)
/* format 2 */ /* format 2 */
memset(name, 0, sizeof(name)); snprintf(name, sizeof(name), "%s_locks", ls->ls_name);
snprintf(name, DLM_LOCKSPACE_LEN + 8, "%s_locks", ls->ls_name);
ls->ls_debug_locks_dentry = debugfs_create_file(name, ls->ls_debug_locks_dentry = debugfs_create_file(name,
0644, 0644,
...@@ -996,8 +996,7 @@ void dlm_create_debug_file(struct dlm_ls *ls) ...@@ -996,8 +996,7 @@ void dlm_create_debug_file(struct dlm_ls *ls)
/* format 3 */ /* format 3 */
memset(name, 0, sizeof(name)); snprintf(name, sizeof(name), "%s_all", ls->ls_name);
snprintf(name, DLM_LOCKSPACE_LEN + 8, "%s_all", ls->ls_name);
ls->ls_debug_all_dentry = debugfs_create_file(name, ls->ls_debug_all_dentry = debugfs_create_file(name,
S_IFREG | S_IRUGO, S_IFREG | S_IRUGO,
...@@ -1007,8 +1006,7 @@ void dlm_create_debug_file(struct dlm_ls *ls) ...@@ -1007,8 +1006,7 @@ void dlm_create_debug_file(struct dlm_ls *ls)
/* format 4 */ /* format 4 */
memset(name, 0, sizeof(name)); snprintf(name, sizeof(name), "%s_toss", ls->ls_name);
snprintf(name, DLM_LOCKSPACE_LEN + 8, "%s_toss", ls->ls_name);
ls->ls_debug_toss_dentry = debugfs_create_file(name, ls->ls_debug_toss_dentry = debugfs_create_file(name,
S_IFREG | S_IRUGO, S_IFREG | S_IRUGO,
...@@ -1016,8 +1014,7 @@ void dlm_create_debug_file(struct dlm_ls *ls) ...@@ -1016,8 +1014,7 @@ void dlm_create_debug_file(struct dlm_ls *ls)
ls, ls,
&format4_fops); &format4_fops);
memset(name, 0, sizeof(name)); snprintf(name, sizeof(name), "%s_waiters", ls->ls_name);
snprintf(name, DLM_LOCKSPACE_LEN + 8, "%s_waiters", ls->ls_name);
ls->ls_debug_waiters_dentry = debugfs_create_file(name, ls->ls_debug_waiters_dentry = debugfs_create_file(name,
0644, 0644,
...@@ -1027,8 +1024,7 @@ void dlm_create_debug_file(struct dlm_ls *ls) ...@@ -1027,8 +1024,7 @@ void dlm_create_debug_file(struct dlm_ls *ls)
/* format 5 */ /* format 5 */
memset(name, 0, sizeof(name)); snprintf(name, sizeof(name), "%s_queued_asts", ls->ls_name);
snprintf(name, DLM_LOCKSPACE_LEN + 8, "%s_queued_asts", ls->ls_name);
ls->ls_debug_queued_asts_dentry = debugfs_create_file(name, ls->ls_debug_queued_asts_dentry = debugfs_create_file(name,
0644, 0644,
......
...@@ -63,6 +63,7 @@ ...@@ -63,6 +63,7 @@
#include "config.h" #include "config.h"
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000) #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000)
#define DLM_MAX_PROCESS_BUFFERS 24
#define NEEDED_RMEM (4*1024*1024) #define NEEDED_RMEM (4*1024*1024)
struct connection { struct connection {
...@@ -194,6 +195,7 @@ static const struct dlm_proto_ops *dlm_proto_ops; ...@@ -194,6 +195,7 @@ static const struct dlm_proto_ops *dlm_proto_ops;
#define DLM_IO_END 1 #define DLM_IO_END 1
#define DLM_IO_EOF 2 #define DLM_IO_EOF 2
#define DLM_IO_RESCHED 3 #define DLM_IO_RESCHED 3
#define DLM_IO_FLUSH 4
static void process_recv_sockets(struct work_struct *work); static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work); static void process_send_sockets(struct work_struct *work);
...@@ -202,6 +204,7 @@ static void process_dlm_messages(struct work_struct *work); ...@@ -202,6 +204,7 @@ static void process_dlm_messages(struct work_struct *work);
static DECLARE_WORK(process_work, process_dlm_messages); static DECLARE_WORK(process_work, process_dlm_messages);
static DEFINE_SPINLOCK(processqueue_lock); static DEFINE_SPINLOCK(processqueue_lock);
static bool process_dlm_messages_pending; static bool process_dlm_messages_pending;
static atomic_t processqueue_count;
static LIST_HEAD(processqueue); static LIST_HEAD(processqueue);
bool dlm_lowcomms_is_running(void) bool dlm_lowcomms_is_running(void)
...@@ -874,6 +877,7 @@ static void process_dlm_messages(struct work_struct *work) ...@@ -874,6 +877,7 @@ static void process_dlm_messages(struct work_struct *work)
} }
list_del(&pentry->list); list_del(&pentry->list);
atomic_dec(&processqueue_count);
spin_unlock(&processqueue_lock); spin_unlock(&processqueue_lock);
for (;;) { for (;;) {
...@@ -891,6 +895,7 @@ static void process_dlm_messages(struct work_struct *work) ...@@ -891,6 +895,7 @@ static void process_dlm_messages(struct work_struct *work)
} }
list_del(&pentry->list); list_del(&pentry->list);
atomic_dec(&processqueue_count);
spin_unlock(&processqueue_lock); spin_unlock(&processqueue_lock);
} }
} }
...@@ -962,6 +967,7 @@ static int receive_from_sock(struct connection *con, int buflen) ...@@ -962,6 +967,7 @@ static int receive_from_sock(struct connection *con, int buflen)
con->rx_leftover); con->rx_leftover);
spin_lock(&processqueue_lock); spin_lock(&processqueue_lock);
ret = atomic_inc_return(&processqueue_count);
list_add_tail(&pentry->list, &processqueue); list_add_tail(&pentry->list, &processqueue);
if (!process_dlm_messages_pending) { if (!process_dlm_messages_pending) {
process_dlm_messages_pending = true; process_dlm_messages_pending = true;
...@@ -969,6 +975,9 @@ static int receive_from_sock(struct connection *con, int buflen) ...@@ -969,6 +975,9 @@ static int receive_from_sock(struct connection *con, int buflen)
} }
spin_unlock(&processqueue_lock); spin_unlock(&processqueue_lock);
if (ret > DLM_MAX_PROCESS_BUFFERS)
return DLM_IO_FLUSH;
return DLM_IO_SUCCESS; return DLM_IO_SUCCESS;
} }
...@@ -1503,6 +1512,9 @@ static void process_recv_sockets(struct work_struct *work) ...@@ -1503,6 +1512,9 @@ static void process_recv_sockets(struct work_struct *work)
wake_up(&con->shutdown_wait); wake_up(&con->shutdown_wait);
/* CF_RECV_PENDING cleared */ /* CF_RECV_PENDING cleared */
break; break;
case DLM_IO_FLUSH:
flush_workqueue(process_workqueue);
fallthrough;
case DLM_IO_RESCHED: case DLM_IO_RESCHED:
cond_resched(); cond_resched();
queue_work(io_workqueue, &con->rwork); queue_work(io_workqueue, &con->rwork);
......
...@@ -337,13 +337,21 @@ static struct midcomms_node *nodeid2node(int nodeid) ...@@ -337,13 +337,21 @@ static struct midcomms_node *nodeid2node(int nodeid)
int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{ {
int ret, r = nodeid_hash(nodeid); int ret, idx, r = nodeid_hash(nodeid);
struct midcomms_node *node; struct midcomms_node *node;
ret = dlm_lowcomms_addr(nodeid, addr, len); ret = dlm_lowcomms_addr(nodeid, addr, len);
if (ret) if (ret)
return ret; return ret;
idx = srcu_read_lock(&nodes_srcu);
node = __find_node(nodeid, r);
if (node) {
srcu_read_unlock(&nodes_srcu, idx);
return 0;
}
srcu_read_unlock(&nodes_srcu, idx);
node = kmalloc(sizeof(*node), GFP_NOFS); node = kmalloc(sizeof(*node), GFP_NOFS);
if (!node) if (!node)
return -ENOMEM; return -ENOMEM;
...@@ -1030,15 +1038,15 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, ...@@ -1030,15 +1038,15 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
break; break;
case DLM_VERSION_3_2: case DLM_VERSION_3_2:
/* send ack back if necessary */
dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD);
msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation, msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation,
ppc); ppc);
if (!msg) { if (!msg) {
dlm_free_mhandle(mh); dlm_free_mhandle(mh);
goto err; goto err;
} }
/* send ack back if necessary */
dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD);
break; break;
default: default:
dlm_free_mhandle(mh); dlm_free_mhandle(mh);
...@@ -1260,12 +1268,23 @@ void dlm_midcomms_remove_member(int nodeid) ...@@ -1260,12 +1268,23 @@ void dlm_midcomms_remove_member(int nodeid)
idx = srcu_read_lock(&nodes_srcu); idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid); node = nodeid2node(nodeid);
if (WARN_ON_ONCE(!node)) { /* in case of dlm_midcomms_close() removes node */
if (!node) {
srcu_read_unlock(&nodes_srcu, idx); srcu_read_unlock(&nodes_srcu, idx);
return; return;
} }
spin_lock(&node->state_lock); spin_lock(&node->state_lock);
/* case of dlm_midcomms_addr() created node but
* was not added before because dlm_midcomms_close()
* removed the node
*/
if (!node->users) {
spin_unlock(&node->state_lock);
srcu_read_unlock(&nodes_srcu, idx);
return;
}
node->users--; node->users--;
pr_debug("node %d users dec count %d\n", nodeid, node->users); pr_debug("node %d users dec count %d\n", nodeid, node->users);
...@@ -1386,10 +1405,16 @@ void dlm_midcomms_shutdown(void) ...@@ -1386,10 +1405,16 @@ void dlm_midcomms_shutdown(void)
midcomms_shutdown(node); midcomms_shutdown(node);
} }
} }
srcu_read_unlock(&nodes_srcu, idx);
mutex_unlock(&close_lock);
dlm_lowcomms_shutdown(); dlm_lowcomms_shutdown();
for (i = 0; i < CONN_HASH_SIZE; i++) {
hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
midcomms_node_reset(node);
}
}
srcu_read_unlock(&nodes_srcu, idx);
mutex_unlock(&close_lock);
} }
int dlm_midcomms_close(int nodeid) int dlm_midcomms_close(int nodeid)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment