Commit 659b3613 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'dlm-6.6' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:

 - Allow blocking posix lock requests to be interrupted while waiting.
   This requires a cancel request to be sent to the userspace daemon
   where posix lock requests are processed across the cluster.

 - Fix a posix lock patch from the previous cycle in which lock requests
   from different file systems could be mixed up.

 - Fix some long standing problems with nfs posix lock cancelation.

 - Add a new debugfs file for printing queued callbacks.

 - Stop modifying buffers that have been used to receive a message.

 - Misc cleanups and some refactoring.

* tag 'dlm-6.6' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  dlm: fix plock lookup when using multiple lockspaces
  fs: dlm: don't use RCOM_NAMES for version detection
  fs: dlm: create midcomms nodes when configure
  fs: dlm: constify receive buffer
  fs: dlm: drop rxbuf manipulation in dlm_recover_master_copy
  fs: dlm: drop rxbuf manipulation in dlm_copy_master_names
  fs: dlm: get recovery sequence number as parameter
  fs: dlm: cleanup lock order
  fs: dlm: remove clear_members_cb
  fs: dlm: add plock dev tracepoints
  fs: dlm: check on plock ops when exit dlm
  fs: dlm: debugfs for queued callbacks
  fs: dlm: remove unused processed_nodes
  fs: dlm: add missing spin_unlock
  fs: dlm: fix F_CANCELLK to cancel pending request
  fs: dlm: allow to F_SETLKW getting interrupted
  fs: dlm: remove twice newline
parents e7e9423d 7c53e847
...@@ -664,7 +664,7 @@ static ssize_t comm_addr_store(struct config_item *item, const char *buf, ...@@ -664,7 +664,7 @@ static ssize_t comm_addr_store(struct config_item *item, const char *buf,
memcpy(addr, buf, len); memcpy(addr, buf, len);
rv = dlm_lowcomms_addr(cm->nodeid, addr, len); rv = dlm_midcomms_addr(cm->nodeid, addr, len);
if (rv) { if (rv) {
kfree(addr); kfree(addr);
return rv; return rv;
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "dlm_internal.h" #include "dlm_internal.h"
#include "midcomms.h" #include "midcomms.h"
#include "lock.h" #include "lock.h"
#include "ast.h"
#define DLM_DEBUG_BUF_LEN 4096 #define DLM_DEBUG_BUF_LEN 4096
static char debug_buf[DLM_DEBUG_BUF_LEN]; static char debug_buf[DLM_DEBUG_BUF_LEN];
...@@ -365,6 +366,52 @@ static void print_format4(struct dlm_rsb *r, struct seq_file *s) ...@@ -365,6 +366,52 @@ static void print_format4(struct dlm_rsb *r, struct seq_file *s)
unlock_rsb(r); unlock_rsb(r);
} }
static void print_format5_lock(struct seq_file *s, struct dlm_lkb *lkb)
{
struct dlm_callback *cb;
/* lkb_id lkb_flags mode flags sb_status sb_flags */
spin_lock(&lkb->lkb_cb_lock);
list_for_each_entry(cb, &lkb->lkb_callbacks, list) {
seq_printf(s, "%x %x %d %x %d %x\n",
lkb->lkb_id,
dlm_iflags_val(lkb),
cb->mode,
cb->flags,
cb->sb_status,
cb->sb_flags);
}
spin_unlock(&lkb->lkb_cb_lock);
}
static void print_format5(struct dlm_rsb *r, struct seq_file *s)
{
struct dlm_lkb *lkb;
lock_rsb(r);
list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
print_format5_lock(s, lkb);
if (seq_has_overflowed(s))
goto out;
}
list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
print_format5_lock(s, lkb);
if (seq_has_overflowed(s))
goto out;
}
list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue) {
print_format5_lock(s, lkb);
if (seq_has_overflowed(s))
goto out;
}
out:
unlock_rsb(r);
}
struct rsbtbl_iter { struct rsbtbl_iter {
struct dlm_rsb *rsb; struct dlm_rsb *rsb;
unsigned bucket; unsigned bucket;
...@@ -408,6 +455,13 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr) ...@@ -408,6 +455,13 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr)
} }
print_format4(ri->rsb, seq); print_format4(ri->rsb, seq);
break; break;
case 5:
if (ri->header) {
seq_puts(seq, "lkb_id lkb_flags mode flags sb_status sb_flags\n");
ri->header = 0;
}
print_format5(ri->rsb, seq);
break;
} }
return 0; return 0;
...@@ -417,6 +471,7 @@ static const struct seq_operations format1_seq_ops; ...@@ -417,6 +471,7 @@ static const struct seq_operations format1_seq_ops;
static const struct seq_operations format2_seq_ops; static const struct seq_operations format2_seq_ops;
static const struct seq_operations format3_seq_ops; static const struct seq_operations format3_seq_ops;
static const struct seq_operations format4_seq_ops; static const struct seq_operations format4_seq_ops;
static const struct seq_operations format5_seq_ops;
static void *table_seq_start(struct seq_file *seq, loff_t *pos) static void *table_seq_start(struct seq_file *seq, loff_t *pos)
{ {
...@@ -448,6 +503,8 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos) ...@@ -448,6 +503,8 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
ri->format = 3; ri->format = 3;
if (seq->op == &format4_seq_ops) if (seq->op == &format4_seq_ops)
ri->format = 4; ri->format = 4;
if (seq->op == &format5_seq_ops)
ri->format = 5;
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep; tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
...@@ -602,10 +659,18 @@ static const struct seq_operations format4_seq_ops = { ...@@ -602,10 +659,18 @@ static const struct seq_operations format4_seq_ops = {
.show = table_seq_show, .show = table_seq_show,
}; };
static const struct seq_operations format5_seq_ops = {
.start = table_seq_start,
.next = table_seq_next,
.stop = table_seq_stop,
.show = table_seq_show,
};
static const struct file_operations format1_fops; static const struct file_operations format1_fops;
static const struct file_operations format2_fops; static const struct file_operations format2_fops;
static const struct file_operations format3_fops; static const struct file_operations format3_fops;
static const struct file_operations format4_fops; static const struct file_operations format4_fops;
static const struct file_operations format5_fops;
static int table_open1(struct inode *inode, struct file *file) static int table_open1(struct inode *inode, struct file *file)
{ {
...@@ -683,7 +748,21 @@ static int table_open4(struct inode *inode, struct file *file) ...@@ -683,7 +748,21 @@ static int table_open4(struct inode *inode, struct file *file)
struct seq_file *seq; struct seq_file *seq;
int ret; int ret;
ret = seq_open(file, &format4_seq_ops); ret = seq_open(file, &format5_seq_ops);
if (ret)
return ret;
seq = file->private_data;
seq->private = inode->i_private; /* the dlm_ls */
return 0;
}
static int table_open5(struct inode *inode, struct file *file)
{
struct seq_file *seq;
int ret;
ret = seq_open(file, &format5_seq_ops);
if (ret) if (ret)
return ret; return ret;
...@@ -725,6 +804,14 @@ static const struct file_operations format4_fops = { ...@@ -725,6 +804,14 @@ static const struct file_operations format4_fops = {
.release = seq_release .release = seq_release
}; };
static const struct file_operations format5_fops = {
.owner = THIS_MODULE,
.open = table_open5,
.read = seq_read,
.llseek = seq_lseek,
.release = seq_release
};
/* /*
* dump lkb's on the ls_waiters list * dump lkb's on the ls_waiters list
*/ */
...@@ -793,6 +880,7 @@ void dlm_delete_debug_file(struct dlm_ls *ls) ...@@ -793,6 +880,7 @@ void dlm_delete_debug_file(struct dlm_ls *ls)
debugfs_remove(ls->ls_debug_locks_dentry); debugfs_remove(ls->ls_debug_locks_dentry);
debugfs_remove(ls->ls_debug_all_dentry); debugfs_remove(ls->ls_debug_all_dentry);
debugfs_remove(ls->ls_debug_toss_dentry); debugfs_remove(ls->ls_debug_toss_dentry);
debugfs_remove(ls->ls_debug_queued_asts_dentry);
} }
static int dlm_state_show(struct seq_file *file, void *offset) static int dlm_state_show(struct seq_file *file, void *offset)
...@@ -936,6 +1024,17 @@ void dlm_create_debug_file(struct dlm_ls *ls) ...@@ -936,6 +1024,17 @@ void dlm_create_debug_file(struct dlm_ls *ls)
dlm_root, dlm_root,
ls, ls,
&waiters_fops); &waiters_fops);
/* format 5 */
memset(name, 0, sizeof(name));
snprintf(name, DLM_LOCKSPACE_LEN + 8, "%s_queued_asts", ls->ls_name);
ls->ls_debug_queued_asts_dentry = debugfs_create_file(name,
0644,
dlm_root,
ls,
&format5_fops);
} }
void __init dlm_register_debugfs(void) void __init dlm_register_debugfs(void)
......
...@@ -58,7 +58,7 @@ void dlm_recover_dir_nodeid(struct dlm_ls *ls) ...@@ -58,7 +58,7 @@ void dlm_recover_dir_nodeid(struct dlm_ls *ls)
up_read(&ls->ls_root_sem); up_read(&ls->ls_root_sem);
} }
int dlm_recover_directory(struct dlm_ls *ls) int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
{ {
struct dlm_member *memb; struct dlm_member *memb;
char *b, *last_name = NULL; char *b, *last_name = NULL;
...@@ -90,7 +90,7 @@ int dlm_recover_directory(struct dlm_ls *ls) ...@@ -90,7 +90,7 @@ int dlm_recover_directory(struct dlm_ls *ls)
} }
error = dlm_rcom_names(ls, memb->nodeid, error = dlm_rcom_names(ls, memb->nodeid,
last_name, last_len); last_name, last_len, seq);
if (error) if (error)
goto out_free; goto out_free;
...@@ -196,7 +196,8 @@ int dlm_recover_directory(struct dlm_ls *ls) ...@@ -196,7 +196,8 @@ int dlm_recover_directory(struct dlm_ls *ls)
return error; return error;
} }
static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
int len)
{ {
struct dlm_rsb *r; struct dlm_rsb *r;
uint32_t hash, bucket; uint32_t hash, bucket;
...@@ -232,7 +233,7 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) ...@@ -232,7 +233,7 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
for rsb's we're master of and whose directory node matches the requesting for rsb's we're master of and whose directory node matches the requesting
node. inbuf is the rsb name last sent, inlen is the name's length */ node. inbuf is the rsb name last sent, inlen is the name's length */
void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
char *outbuf, int outlen, int nodeid) char *outbuf, int outlen, int nodeid)
{ {
struct list_head *list; struct list_head *list;
...@@ -245,9 +246,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, ...@@ -245,9 +246,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
if (inlen > 1) { if (inlen > 1) {
r = find_rsb_root(ls, inbuf, inlen); r = find_rsb_root(ls, inbuf, inlen);
if (!r) { if (!r) {
inbuf[inlen - 1] = '\0'; log_error(ls, "copy_master_names from %d start %d %.*s",
log_error(ls, "copy_master_names from %d start %d %s", nodeid, inlen, inlen, inbuf);
nodeid, inlen, inbuf);
goto out; goto out;
} }
list = r->res_root_list.next; list = r->res_root_list.next;
......
...@@ -15,9 +15,9 @@ ...@@ -15,9 +15,9 @@
int dlm_dir_nodeid(struct dlm_rsb *rsb); int dlm_dir_nodeid(struct dlm_rsb *rsb);
int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash); int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash);
void dlm_recover_dir_nodeid(struct dlm_ls *ls); void dlm_recover_dir_nodeid(struct dlm_ls *ls);
int dlm_recover_directory(struct dlm_ls *ls); int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq);
void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
char *outbuf, int outlen, int nodeid); char *outbuf, int outlen, int nodeid);
#endif /* __DIR_DOT_H__ */ #endif /* __DIR_DOT_H__ */
...@@ -598,6 +598,7 @@ struct dlm_ls { ...@@ -598,6 +598,7 @@ struct dlm_ls {
struct dentry *ls_debug_locks_dentry; /* debugfs */ struct dentry *ls_debug_locks_dentry; /* debugfs */
struct dentry *ls_debug_all_dentry; /* debugfs */ struct dentry *ls_debug_all_dentry; /* debugfs */
struct dentry *ls_debug_toss_dentry; /* debugfs */ struct dentry *ls_debug_toss_dentry; /* debugfs */
struct dentry *ls_debug_queued_asts_dentry; /* debugfs */
wait_queue_head_t ls_uevent_wait; /* user part of join/leave */ wait_queue_head_t ls_uevent_wait; /* user part of join/leave */
int ls_uevent_result; int ls_uevent_result;
......
...@@ -86,8 +86,8 @@ static int send_remove(struct dlm_rsb *r); ...@@ -86,8 +86,8 @@ static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
struct dlm_message *ms, bool local); const struct dlm_message *ms, bool local);
static int receive_extralen(struct dlm_message *ms); static int receive_extralen(const struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid); static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void toss_rsb(struct kref *kref); static void toss_rsb(struct kref *kref);
...@@ -984,8 +984,8 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no ...@@ -984,8 +984,8 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
* . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0) * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
*/ */
int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len, int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
unsigned int flags, int *r_nodeid, int *result) int len, unsigned int flags, int *r_nodeid, int *result)
{ {
struct dlm_rsb *r = NULL; struct dlm_rsb *r = NULL;
uint32_t hash, b; uint32_t hash, b;
...@@ -1106,7 +1106,7 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) ...@@ -1106,7 +1106,7 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
} }
} }
void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len) void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
{ {
struct dlm_rsb *r = NULL; struct dlm_rsb *r = NULL;
uint32_t hash, b; uint32_t hash, b;
...@@ -1459,7 +1459,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) ...@@ -1459,7 +1459,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
set RESEND and dlm_recover_waiters_post() */ set RESEND and dlm_recover_waiters_post() */
static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
struct dlm_message *ms) const struct dlm_message *ms)
{ {
struct dlm_ls *ls = lkb->lkb_resource->res_ls; struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int overlap_done = 0; int overlap_done = 0;
...@@ -1557,8 +1557,8 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) ...@@ -1557,8 +1557,8 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
/* Handles situations where we might be processing a "fake" or "local" reply in /* Handles situations where we might be processing a "fake" or "local" reply in
which we can't try to take waiters_mutex again. */ which we can't try to take waiters_mutex again. */
static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms, static int remove_from_waiters_ms(struct dlm_lkb *lkb,
bool local) const struct dlm_message *ms, bool local)
{ {
struct dlm_ls *ls = lkb->lkb_resource->res_ls; struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int error; int error;
...@@ -1800,7 +1800,7 @@ static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) ...@@ -1800,7 +1800,7 @@ static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
/* lkb is process copy (pc) */ /* lkb is process copy (pc) */
static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
struct dlm_message *ms) const struct dlm_message *ms)
{ {
int b; int b;
...@@ -1907,7 +1907,7 @@ static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) ...@@ -1907,7 +1907,7 @@ static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
} }
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
struct dlm_message *ms) const struct dlm_message *ms)
{ {
set_lvb_lock_pc(r, lkb, ms); set_lvb_lock_pc(r, lkb, ms);
_grant_lock(r, lkb); _grant_lock(r, lkb);
...@@ -1945,7 +1945,7 @@ static void munge_demoted(struct dlm_lkb *lkb) ...@@ -1945,7 +1945,7 @@ static void munge_demoted(struct dlm_lkb *lkb)
lkb->lkb_grmode = DLM_LOCK_NL; lkb->lkb_grmode = DLM_LOCK_NL;
} }
static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms) static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
{ {
if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) && if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) { ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
...@@ -3641,8 +3641,9 @@ static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) ...@@ -3641,8 +3641,9 @@ static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv); return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
} }
static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, static int send_lookup_reply(struct dlm_ls *ls,
int ret_nodeid, int rv) const struct dlm_message *ms_in, int ret_nodeid,
int rv)
{ {
struct dlm_rsb *r = &ls->ls_local_rsb; struct dlm_rsb *r = &ls->ls_local_rsb;
struct dlm_message *ms; struct dlm_message *ms;
...@@ -3667,14 +3668,15 @@ static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, ...@@ -3667,14 +3668,15 @@ static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
of message, unlike the send side where we can safely send everything about of message, unlike the send side where we can safely send everything about
the lkb for any type of message */ the lkb for any type of message */
static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms) static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
{ {
lkb->lkb_exflags = le32_to_cpu(ms->m_exflags); lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags)); dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags)); dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
} }
static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms, static void receive_flags_reply(struct dlm_lkb *lkb,
const struct dlm_message *ms,
bool local) bool local)
{ {
if (local) if (local)
...@@ -3684,14 +3686,14 @@ static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms, ...@@ -3684,14 +3686,14 @@ static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags)); dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
} }
static int receive_extralen(struct dlm_message *ms) static int receive_extralen(const struct dlm_message *ms)
{ {
return (le16_to_cpu(ms->m_header.h_length) - return (le16_to_cpu(ms->m_header.h_length) -
sizeof(struct dlm_message)); sizeof(struct dlm_message));
} }
static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_message *ms) const struct dlm_message *ms)
{ {
int len; int len;
...@@ -3719,7 +3721,7 @@ static void fake_astfn(void *astparam) ...@@ -3719,7 +3721,7 @@ static void fake_astfn(void *astparam)
} }
static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_message *ms) const struct dlm_message *ms)
{ {
lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
lkb->lkb_ownpid = le32_to_cpu(ms->m_pid); lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
...@@ -3741,7 +3743,7 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, ...@@ -3741,7 +3743,7 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
} }
static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_message *ms) const struct dlm_message *ms)
{ {
if (lkb->lkb_status != DLM_LKSTS_GRANTED) if (lkb->lkb_status != DLM_LKSTS_GRANTED)
return -EBUSY; return -EBUSY;
...@@ -3756,7 +3758,7 @@ static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, ...@@ -3756,7 +3758,7 @@ static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
} }
static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_message *ms) const struct dlm_message *ms)
{ {
if (receive_lvb(ls, lkb, ms)) if (receive_lvb(ls, lkb, ms))
return -ENOMEM; return -ENOMEM;
...@@ -3766,7 +3768,7 @@ static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, ...@@ -3766,7 +3768,7 @@ static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
/* We fill in the local-lkb fields with the info that send_xxxx_reply() /* We fill in the local-lkb fields with the info that send_xxxx_reply()
uses to send a reply and that the remote end uses to process the reply. */ uses to send a reply and that the remote end uses to process the reply. */
static void setup_local_lkb(struct dlm_ls *ls, struct dlm_message *ms) static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
{ {
struct dlm_lkb *lkb = &ls->ls_local_lkb; struct dlm_lkb *lkb = &ls->ls_local_lkb;
lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
...@@ -3776,7 +3778,7 @@ static void setup_local_lkb(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -3776,7 +3778,7 @@ static void setup_local_lkb(struct dlm_ls *ls, struct dlm_message *ms)
/* This is called after the rsb is locked so that we can safely inspect /* This is called after the rsb is locked so that we can safely inspect
fields in the lkb. */ fields in the lkb. */
static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
{ {
int from = le32_to_cpu(ms->m_header.h_nodeid); int from = le32_to_cpu(ms->m_header.h_nodeid);
int error = 0; int error = 0;
...@@ -3828,7 +3830,7 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) ...@@ -3828,7 +3830,7 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
return error; return error;
} }
static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
{ {
struct dlm_lkb *lkb; struct dlm_lkb *lkb;
struct dlm_rsb *r; struct dlm_rsb *r;
...@@ -3907,7 +3909,7 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -3907,7 +3909,7 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
return error; return error;
} }
static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
{ {
struct dlm_lkb *lkb; struct dlm_lkb *lkb;
struct dlm_rsb *r; struct dlm_rsb *r;
...@@ -3963,7 +3965,7 @@ static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -3963,7 +3965,7 @@ static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
return error; return error;
} }
static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
{ {
struct dlm_lkb *lkb; struct dlm_lkb *lkb;
struct dlm_rsb *r; struct dlm_rsb *r;
...@@ -4015,7 +4017,7 @@ static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -4015,7 +4017,7 @@ static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
return error; return error;
} }
static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
{ {
struct dlm_lkb *lkb; struct dlm_lkb *lkb;
struct dlm_rsb *r; struct dlm_rsb *r;
...@@ -4051,7 +4053,7 @@ static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -4051,7 +4053,7 @@ static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
return error; return error;
} }
static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms) static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
{ {
struct dlm_lkb *lkb; struct dlm_lkb *lkb;
struct dlm_rsb *r; struct dlm_rsb *r;
...@@ -4082,7 +4084,7 @@ static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -4082,7 +4084,7 @@ static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
return 0; return 0;
} }
static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
{ {
struct dlm_lkb *lkb; struct dlm_lkb *lkb;
struct dlm_rsb *r; struct dlm_rsb *r;
...@@ -4110,7 +4112,7 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -4110,7 +4112,7 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
return 0; return 0;
} }
static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
{ {
int len, error, ret_nodeid, from_nodeid, our_nodeid; int len, error, ret_nodeid, from_nodeid, our_nodeid;
...@@ -4130,7 +4132,7 @@ static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -4130,7 +4132,7 @@ static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
send_lookup_reply(ls, ms, ret_nodeid, error); send_lookup_reply(ls, ms, ret_nodeid, error);
} }
static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
{ {
char name[DLM_RESNAME_MAXLEN+1]; char name[DLM_RESNAME_MAXLEN+1];
struct dlm_rsb *r; struct dlm_rsb *r;
...@@ -4218,12 +4220,13 @@ static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -4218,12 +4220,13 @@ static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
} }
} }
static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
{ {
do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid)); do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
} }
static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) static int receive_request_reply(struct dlm_ls *ls,
const struct dlm_message *ms)
{ {
struct dlm_lkb *lkb; struct dlm_lkb *lkb;
struct dlm_rsb *r; struct dlm_rsb *r;
...@@ -4345,7 +4348,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -4345,7 +4348,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
} }
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
struct dlm_message *ms, bool local) const struct dlm_message *ms, bool local)
{ {
/* this is the value returned from do_convert() on the master */ /* this is the value returned from do_convert() on the master */
switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
...@@ -4388,8 +4391,8 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, ...@@ -4388,8 +4391,8 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
} }
} }
static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms, static void _receive_convert_reply(struct dlm_lkb *lkb,
bool local) const struct dlm_message *ms, bool local)
{ {
struct dlm_rsb *r = lkb->lkb_resource; struct dlm_rsb *r = lkb->lkb_resource;
int error; int error;
...@@ -4412,7 +4415,8 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms, ...@@ -4412,7 +4415,8 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
put_rsb(r); put_rsb(r);
} }
static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) static int receive_convert_reply(struct dlm_ls *ls,
const struct dlm_message *ms)
{ {
struct dlm_lkb *lkb; struct dlm_lkb *lkb;
int error; int error;
...@@ -4426,8 +4430,8 @@ static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -4426,8 +4430,8 @@ static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
return 0; return 0;
} }
static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms, static void _receive_unlock_reply(struct dlm_lkb *lkb,
bool local) const struct dlm_message *ms, bool local)
{ {
struct dlm_rsb *r = lkb->lkb_resource; struct dlm_rsb *r = lkb->lkb_resource;
int error; int error;
...@@ -4463,7 +4467,8 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms, ...@@ -4463,7 +4467,8 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
put_rsb(r); put_rsb(r);
} }
static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) static int receive_unlock_reply(struct dlm_ls *ls,
const struct dlm_message *ms)
{ {
struct dlm_lkb *lkb; struct dlm_lkb *lkb;
int error; int error;
...@@ -4477,8 +4482,8 @@ static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -4477,8 +4482,8 @@ static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
return 0; return 0;
} }
static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms, static void _receive_cancel_reply(struct dlm_lkb *lkb,
bool local) const struct dlm_message *ms, bool local)
{ {
struct dlm_rsb *r = lkb->lkb_resource; struct dlm_rsb *r = lkb->lkb_resource;
int error; int error;
...@@ -4515,7 +4520,8 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms, ...@@ -4515,7 +4520,8 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
put_rsb(r); put_rsb(r);
} }
static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) static int receive_cancel_reply(struct dlm_ls *ls,
const struct dlm_message *ms)
{ {
struct dlm_lkb *lkb; struct dlm_lkb *lkb;
int error; int error;
...@@ -4529,7 +4535,8 @@ static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -4529,7 +4535,8 @@ static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
return 0; return 0;
} }
static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) static void receive_lookup_reply(struct dlm_ls *ls,
const struct dlm_message *ms)
{ {
struct dlm_lkb *lkb; struct dlm_lkb *lkb;
struct dlm_rsb *r; struct dlm_rsb *r;
...@@ -4608,7 +4615,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -4608,7 +4615,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
dlm_put_lkb(lkb); dlm_put_lkb(lkb);
} }
static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
uint32_t saved_seq) uint32_t saved_seq)
{ {
int error = 0, noent = 0; int error = 0, noent = 0;
...@@ -4744,7 +4751,7 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, ...@@ -4744,7 +4751,7 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
requestqueue, to processing all the saved messages, to processing new requestqueue, to processing all the saved messages, to processing new
messages as they arrive. */ messages as they arrive. */
static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
int nodeid) int nodeid)
{ {
if (dlm_locking_stopped(ls)) { if (dlm_locking_stopped(ls)) {
...@@ -4767,7 +4774,7 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, ...@@ -4767,7 +4774,7 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
/* This is called by dlm_recoverd to process messages that were saved on /* This is called by dlm_recoverd to process messages that were saved on
the requestqueue. */ the requestqueue. */
void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
uint32_t saved_seq) uint32_t saved_seq)
{ {
_receive_message(ls, ms, saved_seq); _receive_message(ls, ms, saved_seq);
...@@ -4778,9 +4785,9 @@ void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, ...@@ -4778,9 +4785,9 @@ void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
standard locking activity) or an RCOM (recovery message sent as part of standard locking activity) or an RCOM (recovery message sent as part of
lockspace recovery). */ lockspace recovery). */
void dlm_receive_buffer(union dlm_packet *p, int nodeid) void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
{ {
struct dlm_header *hd = &p->header; const struct dlm_header *hd = &p->header;
struct dlm_ls *ls; struct dlm_ls *ls;
int type = 0; int type = 0;
...@@ -5334,7 +5341,7 @@ static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid, ...@@ -5334,7 +5341,7 @@ static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
/* needs at least dlm_rcom + rcom_lock */ /* needs at least dlm_rcom + rcom_lock */
static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_rsb *r, struct dlm_rcom *rc) struct dlm_rsb *r, const struct dlm_rcom *rc)
{ {
struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
...@@ -5384,7 +5391,8 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, ...@@ -5384,7 +5391,8 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
back the rcom_lock struct we got but with the remid field filled in. */ back the rcom_lock struct we got but with the remid field filled in. */
/* needs at least dlm_rcom + rcom_lock */ /* needs at least dlm_rcom + rcom_lock */
int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
__le32 *rl_remid, __le32 *rl_result)
{ {
struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
struct dlm_rsb *r; struct dlm_rsb *r;
...@@ -5393,6 +5401,9 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) ...@@ -5393,6 +5401,9 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid); int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
int error; int error;
/* init rl_remid with rcom lock rl_remid */
*rl_remid = rl->rl_remid;
if (rl->rl_parent_lkid) { if (rl->rl_parent_lkid) {
error = -EOPNOTSUPP; error = -EOPNOTSUPP;
goto out; goto out;
...@@ -5448,7 +5459,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) ...@@ -5448,7 +5459,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
out_remid: out_remid:
/* this is the new value returned to the lock holder for /* this is the new value returned to the lock holder for
saving in its process-copy lkb */ saving in its process-copy lkb */
rl->rl_remid = cpu_to_le32(lkb->lkb_id); *rl_remid = cpu_to_le32(lkb->lkb_id);
lkb->lkb_recover_seq = ls->ls_recover_seq; lkb->lkb_recover_seq = ls->ls_recover_seq;
...@@ -5459,12 +5470,13 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) ...@@ -5459,12 +5470,13 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
if (error && error != -EEXIST) if (error && error != -EEXIST)
log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d", log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
from_nodeid, remid, error); from_nodeid, remid, error);
rl->rl_result = cpu_to_le32(error); *rl_result = cpu_to_le32(error);
return error; return error;
} }
/* needs at least dlm_rcom + rcom_lock */ /* needs at least dlm_rcom + rcom_lock */
int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
uint64_t seq)
{ {
struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
struct dlm_rsb *r; struct dlm_rsb *r;
...@@ -5509,7 +5521,7 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) ...@@ -5509,7 +5521,7 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
result); result);
dlm_send_rcom_lock(r, lkb); dlm_send_rcom_lock(r, lkb, seq);
goto out; goto out;
case -EEXIST: case -EEXIST:
case 0: case 0:
......
...@@ -12,11 +12,11 @@ ...@@ -12,11 +12,11 @@
#define __LOCK_DOT_H__ #define __LOCK_DOT_H__
void dlm_dump_rsb(struct dlm_rsb *r); void dlm_dump_rsb(struct dlm_rsb *r);
void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len); void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len);
void dlm_print_lkb(struct dlm_lkb *lkb); void dlm_print_lkb(struct dlm_lkb *lkb);
void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
uint32_t saved_seq); uint32_t saved_seq);
void dlm_receive_buffer(union dlm_packet *p, int nodeid); void dlm_receive_buffer(const union dlm_packet *p, int nodeid);
int dlm_modes_compat(int mode1, int mode2); int dlm_modes_compat(int mode1, int mode2);
void dlm_put_rsb(struct dlm_rsb *r); void dlm_put_rsb(struct dlm_rsb *r);
void dlm_hold_rsb(struct dlm_rsb *r); void dlm_hold_rsb(struct dlm_rsb *r);
...@@ -25,8 +25,8 @@ void dlm_scan_rsbs(struct dlm_ls *ls); ...@@ -25,8 +25,8 @@ void dlm_scan_rsbs(struct dlm_ls *ls);
int dlm_lock_recovery_try(struct dlm_ls *ls); int dlm_lock_recovery_try(struct dlm_ls *ls);
void dlm_unlock_recovery(struct dlm_ls *ls); void dlm_unlock_recovery(struct dlm_ls *ls);
int dlm_master_lookup(struct dlm_ls *ls, int nodeid, char *name, int len, int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
unsigned int flags, int *r_nodeid, int *result); int len, unsigned int flags, int *r_nodeid, int *result);
int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len, int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
struct dlm_rsb **r_ret); struct dlm_rsb **r_ret);
...@@ -36,8 +36,10 @@ void dlm_purge_mstcpy_locks(struct dlm_rsb *r); ...@@ -36,8 +36,10 @@ void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
void dlm_recover_grant(struct dlm_ls *ls); void dlm_recover_grant(struct dlm_ls *ls);
int dlm_recover_waiters_post(struct dlm_ls *ls); int dlm_recover_waiters_post(struct dlm_ls *ls);
void dlm_recover_waiters_pre(struct dlm_ls *ls); void dlm_recover_waiters_pre(struct dlm_ls *ls);
int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc); int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc); __le32 *rl_remid, __le32 *rl_result);
int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
uint64_t seq);
int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, int mode, int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, int mode,
uint32_t flags, void *name, unsigned int namelen); uint32_t flags, void *name, unsigned int namelen);
......
...@@ -863,7 +863,6 @@ struct dlm_processed_nodes { ...@@ -863,7 +863,6 @@ struct dlm_processed_nodes {
static void process_dlm_messages(struct work_struct *work) static void process_dlm_messages(struct work_struct *work)
{ {
struct processqueue_entry *pentry; struct processqueue_entry *pentry;
LIST_HEAD(processed_nodes);
spin_lock(&processqueue_lock); spin_lock(&processqueue_lock);
pentry = list_first_entry_or_null(&processqueue, pentry = list_first_entry_or_null(&processqueue,
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "midcomms.h" #include "midcomms.h"
#include "lowcomms.h" #include "lowcomms.h"
int dlm_slots_version(struct dlm_header *h) int dlm_slots_version(const struct dlm_header *h)
{ {
if ((le32_to_cpu(h->h_version) & 0x0000FFFF) < DLM_HEADER_SLOTS) if ((le32_to_cpu(h->h_version) & 0x0000FFFF) < DLM_HEADER_SLOTS)
return 0; return 0;
...@@ -393,14 +393,9 @@ static void remove_remote_member(int nodeid) ...@@ -393,14 +393,9 @@ static void remove_remote_member(int nodeid)
dlm_midcomms_remove_member(nodeid); dlm_midcomms_remove_member(nodeid);
} }
static void clear_members_cb(int nodeid)
{
remove_remote_member(nodeid);
}
void dlm_clear_members(struct dlm_ls *ls) void dlm_clear_members(struct dlm_ls *ls)
{ {
clear_memb_list(&ls->ls_nodes, clear_members_cb); clear_memb_list(&ls->ls_nodes, remove_remote_member);
ls->ls_num_nodes = 0; ls->ls_num_nodes = 0;
} }
...@@ -454,7 +449,7 @@ static void make_member_array(struct dlm_ls *ls) ...@@ -454,7 +449,7 @@ static void make_member_array(struct dlm_ls *ls)
/* send a status request to all members just to establish comms connections */ /* send a status request to all members just to establish comms connections */
static int ping_members(struct dlm_ls *ls) static int ping_members(struct dlm_ls *ls, uint64_t seq)
{ {
struct dlm_member *memb; struct dlm_member *memb;
int error = 0; int error = 0;
...@@ -464,7 +459,7 @@ static int ping_members(struct dlm_ls *ls) ...@@ -464,7 +459,7 @@ static int ping_members(struct dlm_ls *ls)
error = -EINTR; error = -EINTR;
break; break;
} }
error = dlm_rcom_status(ls, memb->nodeid, 0); error = dlm_rcom_status(ls, memb->nodeid, 0, seq);
if (error) if (error)
break; break;
} }
...@@ -612,7 +607,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) ...@@ -612,7 +607,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
make_member_array(ls); make_member_array(ls);
*neg_out = neg; *neg_out = neg;
error = ping_members(ls); error = ping_members(ls, rv->seq);
log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes); log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
return error; return error;
} }
......
...@@ -18,7 +18,7 @@ void dlm_clear_members_gone(struct dlm_ls *ls); ...@@ -18,7 +18,7 @@ void dlm_clear_members_gone(struct dlm_ls *ls);
int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out); int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out);
int dlm_is_removed(struct dlm_ls *ls, int nodeid); int dlm_is_removed(struct dlm_ls *ls, int nodeid);
int dlm_is_member(struct dlm_ls *ls, int nodeid); int dlm_is_member(struct dlm_ls *ls, int nodeid);
int dlm_slots_version(struct dlm_header *h); int dlm_slots_version(const struct dlm_header *h);
void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc, void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
struct dlm_member *memb); struct dlm_member *memb);
void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc); void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc);
......
...@@ -330,18 +330,23 @@ static void midcomms_node_reset(struct midcomms_node *node) ...@@ -330,18 +330,23 @@ static void midcomms_node_reset(struct midcomms_node *node)
wake_up(&node->shutdown_wait); wake_up(&node->shutdown_wait);
} }
static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc) static struct midcomms_node *nodeid2node(int nodeid)
{ {
struct midcomms_node *node, *tmp; return __find_node(nodeid, nodeid_hash(nodeid));
int r = nodeid_hash(nodeid); }
int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{
int ret, r = nodeid_hash(nodeid);
struct midcomms_node *node;
node = __find_node(nodeid, r); ret = dlm_lowcomms_addr(nodeid, addr, len);
if (node || !alloc) if (ret)
return node; return ret;
node = kmalloc(sizeof(*node), alloc); node = kmalloc(sizeof(*node), GFP_NOFS);
if (!node) if (!node)
return NULL; return -ENOMEM;
node->nodeid = nodeid; node->nodeid = nodeid;
spin_lock_init(&node->state_lock); spin_lock_init(&node->state_lock);
...@@ -353,21 +358,11 @@ static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc) ...@@ -353,21 +358,11 @@ static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc)
midcomms_node_reset(node); midcomms_node_reset(node);
spin_lock(&nodes_lock); spin_lock(&nodes_lock);
/* check again if there was somebody else
* earlier here to add the node
*/
tmp = __find_node(nodeid, r);
if (tmp) {
spin_unlock(&nodes_lock);
kfree(node);
return tmp;
}
hlist_add_head_rcu(&node->hlist, &node_hash[r]); hlist_add_head_rcu(&node->hlist, &node_hash[r]);
spin_unlock(&nodes_lock); spin_unlock(&nodes_lock);
node->debugfs = dlm_create_debug_comms_file(nodeid, node); node->debugfs = dlm_create_debug_comms_file(nodeid, node);
return node; return 0;
} }
static int dlm_send_ack(int nodeid, uint32_t seq) static int dlm_send_ack(int nodeid, uint32_t seq)
...@@ -499,7 +494,8 @@ static void dlm_pas_fin_ack_rcv(struct midcomms_node *node) ...@@ -499,7 +494,8 @@ static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
spin_unlock(&node->state_lock); spin_unlock(&node->state_lock);
} }
static void dlm_receive_buffer_3_2_trace(uint32_t seq, union dlm_packet *p) static void dlm_receive_buffer_3_2_trace(uint32_t seq,
const union dlm_packet *p)
{ {
switch (p->header.h_cmd) { switch (p->header.h_cmd) {
case DLM_MSG: case DLM_MSG:
...@@ -513,7 +509,7 @@ static void dlm_receive_buffer_3_2_trace(uint32_t seq, union dlm_packet *p) ...@@ -513,7 +509,7 @@ static void dlm_receive_buffer_3_2_trace(uint32_t seq, union dlm_packet *p)
} }
} }
static void dlm_midcomms_receive_buffer(union dlm_packet *p, static void dlm_midcomms_receive_buffer(const union dlm_packet *p,
struct midcomms_node *node, struct midcomms_node *node,
uint32_t seq) uint32_t seq)
{ {
...@@ -602,113 +598,8 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, ...@@ -602,113 +598,8 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
} }
} }
static struct midcomms_node * static int dlm_opts_check_msglen(const union dlm_packet *p, uint16_t msglen,
dlm_midcomms_recv_node_lookup(int nodeid, const union dlm_packet *p, int nodeid)
uint16_t msglen, int (*cb)(struct midcomms_node *node))
{
struct midcomms_node *node = NULL;
gfp_t allocation = 0;
int ret;
switch (p->header.h_cmd) {
case DLM_RCOM:
if (msglen < sizeof(struct dlm_rcom)) {
log_print("rcom msg too small: %u, will skip this message from node %d",
msglen, nodeid);
return NULL;
}
switch (p->rcom.rc_type) {
case cpu_to_le32(DLM_RCOM_NAMES):
fallthrough;
case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
fallthrough;
case cpu_to_le32(DLM_RCOM_STATUS):
fallthrough;
case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
node = nodeid2node(nodeid, 0);
if (node) {
spin_lock(&node->state_lock);
if (node->state != DLM_ESTABLISHED)
pr_debug("receive begin RCOM msg from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
switch (node->state) {
case DLM_CLOSED:
node->state = DLM_ESTABLISHED;
pr_debug("switch node %d to state %s\n",
node->nodeid, dlm_state_str(node->state));
break;
case DLM_ESTABLISHED:
break;
default:
spin_unlock(&node->state_lock);
return NULL;
}
spin_unlock(&node->state_lock);
}
allocation = GFP_NOFS;
break;
default:
break;
}
break;
default:
break;
}
node = nodeid2node(nodeid, allocation);
if (!node) {
switch (p->header.h_cmd) {
case DLM_OPTS:
if (msglen < sizeof(struct dlm_opts)) {
log_print("opts msg too small: %u, will skip this message from node %d",
msglen, nodeid);
return NULL;
}
log_print_ratelimited("received dlm opts message nextcmd %d from node %d in an invalid sequence",
p->opts.o_nextcmd, nodeid);
break;
default:
log_print_ratelimited("received dlm message cmd %d from node %d in an invalid sequence",
p->header.h_cmd, nodeid);
break;
}
return NULL;
}
ret = cb(node);
if (ret < 0)
return NULL;
return node;
}
static int dlm_midcomms_version_check_3_2(struct midcomms_node *node)
{
switch (node->version) {
case DLM_VERSION_NOT_SET:
node->version = DLM_VERSION_3_2;
wake_up(&node->shutdown_wait);
log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
node->nodeid);
break;
case DLM_VERSION_3_2:
break;
default:
log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
DLM_VERSION_3_2, node->nodeid, node->version);
return -1;
}
return 0;
}
static int dlm_opts_check_msglen(union dlm_packet *p, uint16_t msglen, int nodeid)
{ {
int len = msglen; int len = msglen;
...@@ -757,7 +648,7 @@ static int dlm_opts_check_msglen(union dlm_packet *p, uint16_t msglen, int nodei ...@@ -757,7 +648,7 @@ static int dlm_opts_check_msglen(union dlm_packet *p, uint16_t msglen, int nodei
return 0; return 0;
} }
static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid) static void dlm_midcomms_receive_buffer_3_2(const union dlm_packet *p, int nodeid)
{ {
uint16_t msglen = le16_to_cpu(p->header.h_length); uint16_t msglen = le16_to_cpu(p->header.h_length);
struct midcomms_node *node; struct midcomms_node *node;
...@@ -765,10 +656,37 @@ static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid) ...@@ -765,10 +656,37 @@ static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid)
int ret, idx; int ret, idx;
idx = srcu_read_lock(&nodes_srcu); idx = srcu_read_lock(&nodes_srcu);
node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen, node = nodeid2node(nodeid);
dlm_midcomms_version_check_3_2); if (WARN_ON_ONCE(!node))
if (!node) goto out;
switch (node->version) {
case DLM_VERSION_NOT_SET:
node->version = DLM_VERSION_3_2;
wake_up(&node->shutdown_wait);
log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
node->nodeid);
spin_lock(&node->state_lock);
switch (node->state) {
case DLM_CLOSED:
node->state = DLM_ESTABLISHED;
pr_debug("switch node %d to state %s\n",
node->nodeid, dlm_state_str(node->state));
break;
default:
break;
}
spin_unlock(&node->state_lock);
break;
case DLM_VERSION_3_2:
break;
default:
log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
DLM_VERSION_3_2, node->nodeid, node->version);
goto out; goto out;
}
switch (p->header.h_cmd) { switch (p->header.h_cmd) {
case DLM_RCOM: case DLM_RCOM:
...@@ -858,8 +776,19 @@ static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid) ...@@ -858,8 +776,19 @@ static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid)
srcu_read_unlock(&nodes_srcu, idx); srcu_read_unlock(&nodes_srcu, idx);
} }
static int dlm_midcomms_version_check_3_1(struct midcomms_node *node) static void dlm_midcomms_receive_buffer_3_1(const union dlm_packet *p, int nodeid)
{ {
uint16_t msglen = le16_to_cpu(p->header.h_length);
struct midcomms_node *node;
int idx;
idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid);
if (WARN_ON_ONCE(!node)) {
srcu_read_unlock(&nodes_srcu, idx);
return;
}
switch (node->version) { switch (node->version) {
case DLM_VERSION_NOT_SET: case DLM_VERSION_NOT_SET:
node->version = DLM_VERSION_3_1; node->version = DLM_VERSION_3_1;
...@@ -872,22 +801,6 @@ static int dlm_midcomms_version_check_3_1(struct midcomms_node *node) ...@@ -872,22 +801,6 @@ static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
default: default:
log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x", log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
DLM_VERSION_3_1, node->nodeid, node->version); DLM_VERSION_3_1, node->nodeid, node->version);
return -1;
}
return 0;
}
static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid)
{
uint16_t msglen = le16_to_cpu(p->header.h_length);
struct midcomms_node *node;
int idx;
idx = srcu_read_lock(&nodes_srcu);
node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
dlm_midcomms_version_check_3_1);
if (!node) {
srcu_read_unlock(&nodes_srcu, idx); srcu_read_unlock(&nodes_srcu, idx);
return; return;
} }
...@@ -977,10 +890,10 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len) ...@@ -977,10 +890,10 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
switch (hd->h_version) { switch (hd->h_version) {
case cpu_to_le32(DLM_VERSION_3_1): case cpu_to_le32(DLM_VERSION_3_1):
dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid); dlm_midcomms_receive_buffer_3_1((const union dlm_packet *)ptr, nodeid);
break; break;
case cpu_to_le32(DLM_VERSION_3_2): case cpu_to_le32(DLM_VERSION_3_2):
dlm_midcomms_receive_buffer_3_2((union dlm_packet *)ptr, nodeid); dlm_midcomms_receive_buffer_3_2((const union dlm_packet *)ptr, nodeid);
break; break;
default: default:
log_print("received invalid version header: %u from node %d, will skip this message", log_print("received invalid version header: %u from node %d, will skip this message",
...@@ -1003,8 +916,8 @@ void dlm_midcomms_unack_msg_resend(int nodeid) ...@@ -1003,8 +916,8 @@ void dlm_midcomms_unack_msg_resend(int nodeid)
int idx, ret; int idx, ret;
idx = srcu_read_lock(&nodes_srcu); idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid, 0); node = nodeid2node(nodeid);
if (!node) { if (WARN_ON_ONCE(!node)) {
srcu_read_unlock(&nodes_srcu, idx); srcu_read_unlock(&nodes_srcu, idx);
return; return;
} }
...@@ -1090,11 +1003,9 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, ...@@ -1090,11 +1003,9 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
int idx; int idx;
idx = srcu_read_lock(&nodes_srcu); idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid, 0); node = nodeid2node(nodeid);
if (!node) { if (WARN_ON_ONCE(!node))
WARN_ON_ONCE(1);
goto err; goto err;
}
/* this is a bug, however we going on and hope it will be resolved */ /* this is a bug, however we going on and hope it will be resolved */
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags)); WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
...@@ -1235,8 +1146,34 @@ void dlm_midcomms_init(void) ...@@ -1235,8 +1146,34 @@ void dlm_midcomms_init(void)
dlm_lowcomms_init(); dlm_lowcomms_init();
} }
static void midcomms_node_release(struct rcu_head *rcu)
{
struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
WARN_ON_ONCE(atomic_read(&node->send_queue_cnt));
dlm_send_queue_flush(node);
kfree(node);
}
void dlm_midcomms_exit(void) void dlm_midcomms_exit(void)
{ {
struct midcomms_node *node;
int i, idx;
idx = srcu_read_lock(&nodes_srcu);
for (i = 0; i < CONN_HASH_SIZE; i++) {
hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
dlm_delete_debug_comms_file(node->debugfs);
spin_lock(&nodes_lock);
hlist_del_rcu(&node->hlist);
spin_unlock(&nodes_lock);
call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
}
}
srcu_read_unlock(&nodes_srcu, idx);
dlm_lowcomms_exit(); dlm_lowcomms_exit();
} }
...@@ -1277,8 +1214,8 @@ void dlm_midcomms_add_member(int nodeid) ...@@ -1277,8 +1214,8 @@ void dlm_midcomms_add_member(int nodeid)
int idx; int idx;
idx = srcu_read_lock(&nodes_srcu); idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid, GFP_NOFS); node = nodeid2node(nodeid);
if (!node) { if (WARN_ON_ONCE(!node)) {
srcu_read_unlock(&nodes_srcu, idx); srcu_read_unlock(&nodes_srcu, idx);
return; return;
} }
...@@ -1322,8 +1259,8 @@ void dlm_midcomms_remove_member(int nodeid) ...@@ -1322,8 +1259,8 @@ void dlm_midcomms_remove_member(int nodeid)
int idx; int idx;
idx = srcu_read_lock(&nodes_srcu); idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid, 0); node = nodeid2node(nodeid);
if (!node) { if (WARN_ON_ONCE(!node)) {
srcu_read_unlock(&nodes_srcu, idx); srcu_read_unlock(&nodes_srcu, idx);
return; return;
} }
...@@ -1367,15 +1304,6 @@ void dlm_midcomms_remove_member(int nodeid) ...@@ -1367,15 +1304,6 @@ void dlm_midcomms_remove_member(int nodeid)
srcu_read_unlock(&nodes_srcu, idx); srcu_read_unlock(&nodes_srcu, idx);
} }
static void midcomms_node_release(struct rcu_head *rcu)
{
struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
WARN_ON_ONCE(atomic_read(&node->send_queue_cnt));
dlm_send_queue_flush(node);
kfree(node);
}
void dlm_midcomms_version_wait(void) void dlm_midcomms_version_wait(void)
{ {
struct midcomms_node *node; struct midcomms_node *node;
...@@ -1438,7 +1366,7 @@ static void midcomms_shutdown(struct midcomms_node *node) ...@@ -1438,7 +1366,7 @@ static void midcomms_shutdown(struct midcomms_node *node)
node->state == DLM_CLOSED || node->state == DLM_CLOSED ||
test_bit(DLM_NODE_FLAG_CLOSE, &node->flags), test_bit(DLM_NODE_FLAG_CLOSE, &node->flags),
DLM_SHUTDOWN_TIMEOUT); DLM_SHUTDOWN_TIMEOUT);
if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags)) if (!ret)
pr_debug("active shutdown timed out for node %d with state %s\n", pr_debug("active shutdown timed out for node %d with state %s\n",
node->nodeid, dlm_state_str(node->state)); node->nodeid, dlm_state_str(node->state));
else else
...@@ -1456,14 +1384,6 @@ void dlm_midcomms_shutdown(void) ...@@ -1456,14 +1384,6 @@ void dlm_midcomms_shutdown(void)
for (i = 0; i < CONN_HASH_SIZE; i++) { for (i = 0; i < CONN_HASH_SIZE; i++) {
hlist_for_each_entry_rcu(node, &node_hash[i], hlist) { hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
midcomms_shutdown(node); midcomms_shutdown(node);
dlm_delete_debug_comms_file(node->debugfs);
spin_lock(&nodes_lock);
hlist_del_rcu(&node->hlist);
spin_unlock(&nodes_lock);
call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
} }
} }
srcu_read_unlock(&nodes_srcu, idx); srcu_read_unlock(&nodes_srcu, idx);
...@@ -1479,7 +1399,7 @@ int dlm_midcomms_close(int nodeid) ...@@ -1479,7 +1399,7 @@ int dlm_midcomms_close(int nodeid)
idx = srcu_read_lock(&nodes_srcu); idx = srcu_read_lock(&nodes_srcu);
/* Abort pending close/remove operation */ /* Abort pending close/remove operation */
node = nodeid2node(nodeid, 0); node = nodeid2node(nodeid);
if (node) { if (node) {
/* let shutdown waiters leave */ /* let shutdown waiters leave */
set_bit(DLM_NODE_FLAG_CLOSE, &node->flags); set_bit(DLM_NODE_FLAG_CLOSE, &node->flags);
...@@ -1489,20 +1409,32 @@ int dlm_midcomms_close(int nodeid) ...@@ -1489,20 +1409,32 @@ int dlm_midcomms_close(int nodeid)
synchronize_srcu(&nodes_srcu); synchronize_srcu(&nodes_srcu);
idx = srcu_read_lock(&nodes_srcu);
mutex_lock(&close_lock); mutex_lock(&close_lock);
node = nodeid2node(nodeid, 0); idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid);
if (!node) { if (!node) {
mutex_unlock(&close_lock);
srcu_read_unlock(&nodes_srcu, idx); srcu_read_unlock(&nodes_srcu, idx);
mutex_unlock(&close_lock);
return dlm_lowcomms_close(nodeid); return dlm_lowcomms_close(nodeid);
} }
ret = dlm_lowcomms_close(nodeid); ret = dlm_lowcomms_close(nodeid);
spin_lock(&node->state_lock); dlm_delete_debug_comms_file(node->debugfs);
midcomms_node_reset(node);
spin_unlock(&node->state_lock); spin_lock(&nodes_lock);
hlist_del_rcu(&node->hlist);
spin_unlock(&nodes_lock);
srcu_read_unlock(&nodes_srcu, idx); srcu_read_unlock(&nodes_srcu, idx);
/* wait that all readers left until flush send queue */
synchronize_srcu(&nodes_srcu);
/* drop all pending dlm messages, this is fine as
* this function get called when the node is fenced
*/
dlm_send_queue_flush(node);
call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
mutex_unlock(&close_lock); mutex_unlock(&close_lock);
return ret; return ret;
......
...@@ -20,6 +20,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, ...@@ -20,6 +20,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
gfp_t allocation, char **ppc); gfp_t allocation, char **ppc);
void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name, void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name,
int namelen); int namelen);
int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
void dlm_midcomms_version_wait(void); void dlm_midcomms_version_wait(void);
int dlm_midcomms_close(int nodeid); int dlm_midcomms_close(int nodeid);
int dlm_midcomms_start(void); int dlm_midcomms_start(void);
......
...@@ -11,6 +11,8 @@ ...@@ -11,6 +11,8 @@
#include <linux/dlm_plock.h> #include <linux/dlm_plock.h>
#include <linux/slab.h> #include <linux/slab.h>
#include <trace/events/dlm.h>
#include "dlm_internal.h" #include "dlm_internal.h"
#include "lockspace.h" #include "lockspace.h"
...@@ -42,6 +44,27 @@ static inline void set_version(struct dlm_plock_info *info) ...@@ -42,6 +44,27 @@ static inline void set_version(struct dlm_plock_info *info)
info->version[2] = DLM_PLOCK_VERSION_PATCH; info->version[2] = DLM_PLOCK_VERSION_PATCH;
} }
static struct plock_op *plock_lookup_waiter(const struct dlm_plock_info *info)
{
struct plock_op *op = NULL, *iter;
list_for_each_entry(iter, &recv_list, list) {
if (iter->info.fsid == info->fsid &&
iter->info.number == info->number &&
iter->info.owner == info->owner &&
iter->info.pid == info->pid &&
iter->info.start == info->start &&
iter->info.end == info->end &&
iter->info.ex == info->ex &&
iter->info.wait) {
op = iter;
break;
}
}
return op;
}
static int check_version(struct dlm_plock_info *info) static int check_version(struct dlm_plock_info *info)
{ {
if ((DLM_PLOCK_VERSION_MAJOR != info->version[0]) || if ((DLM_PLOCK_VERSION_MAJOR != info->version[0]) ||
...@@ -74,30 +97,26 @@ static void send_op(struct plock_op *op) ...@@ -74,30 +97,26 @@ static void send_op(struct plock_op *op)
wake_up(&send_wq); wake_up(&send_wq);
} }
/* If a process was killed while waiting for the only plock on a file, static int do_lock_cancel(const struct dlm_plock_info *orig_info)
locks_remove_posix will not see any lock on the file so it won't
send an unlock-close to us to pass on to userspace to clean up the
abandoned waiter. So, we have to insert the unlock-close when the
lock call is interrupted. */
static void do_unlock_close(const struct dlm_plock_info *info)
{ {
struct plock_op *op; struct plock_op *op;
int rv;
op = kzalloc(sizeof(*op), GFP_NOFS); op = kzalloc(sizeof(*op), GFP_NOFS);
if (!op) if (!op)
return; return -ENOMEM;
op->info = *orig_info;
op->info.optype = DLM_PLOCK_OP_CANCEL;
op->info.wait = 0;
op->info.optype = DLM_PLOCK_OP_UNLOCK;
op->info.pid = info->pid;
op->info.fsid = info->fsid;
op->info.number = info->number;
op->info.start = 0;
op->info.end = OFFSET_MAX;
op->info.owner = info->owner;
op->info.flags |= DLM_PLOCK_FL_CLOSE;
send_op(op); send_op(op);
wait_event(recv_wq, (op->done != 0));
rv = op->info.rv;
dlm_release_plock_op(op);
return rv;
} }
int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file, int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file,
...@@ -156,7 +175,7 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file, ...@@ -156,7 +175,7 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file,
send_op(op); send_op(op);
if (op->info.wait) { if (op->info.wait) {
rv = wait_event_killable(recv_wq, (op->done != 0)); rv = wait_event_interruptible(recv_wq, (op->done != 0));
if (rv == -ERESTARTSYS) { if (rv == -ERESTARTSYS) {
spin_lock(&ops_lock); spin_lock(&ops_lock);
/* recheck under ops_lock if we got a done != 0, /* recheck under ops_lock if we got a done != 0,
...@@ -166,17 +185,37 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file, ...@@ -166,17 +185,37 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file,
spin_unlock(&ops_lock); spin_unlock(&ops_lock);
goto do_lock_wait; goto do_lock_wait;
} }
list_del(&op->list);
spin_unlock(&ops_lock); spin_unlock(&ops_lock);
rv = do_lock_cancel(&op->info);
switch (rv) {
case 0:
/* waiter was deleted in user space, answer will never come
* remove original request. The original request must be
* on recv_list because the answer of do_lock_cancel()
* synchronized it.
*/
spin_lock(&ops_lock);
list_del(&op->list);
spin_unlock(&ops_lock);
rv = -EINTR;
break;
case -ENOENT:
/* cancellation wasn't successful but op should be done */
fallthrough;
default:
/* internal error doing cancel we need to wait */
goto wait;
}
log_debug(ls, "%s: wait interrupted %x %llx pid %d", log_debug(ls, "%s: wait interrupted %x %llx pid %d",
__func__, ls->ls_global_id, __func__, ls->ls_global_id,
(unsigned long long)number, op->info.pid); (unsigned long long)number, op->info.pid);
do_unlock_close(&op->info);
dlm_release_plock_op(op); dlm_release_plock_op(op);
goto out; goto out;
} }
} else { } else {
wait:
wait_event(recv_wq, (op->done != 0)); wait_event(recv_wq, (op->done != 0));
} }
...@@ -240,8 +279,8 @@ static int dlm_plock_callback(struct plock_op *op) ...@@ -240,8 +279,8 @@ static int dlm_plock_callback(struct plock_op *op)
rv = notify(fl, 0); rv = notify(fl, 0);
if (rv) { if (rv) {
/* XXX: We need to cancel the fs lock here: */ /* XXX: We need to cancel the fs lock here: */
log_print("dlm_plock_callback: lock granted after lock request " log_print("%s: lock granted after lock request failed; dangling lock!",
"failed; dangling lock!\n"); __func__);
goto out; goto out;
} }
...@@ -318,6 +357,75 @@ int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file, ...@@ -318,6 +357,75 @@ int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file,
} }
EXPORT_SYMBOL_GPL(dlm_posix_unlock); EXPORT_SYMBOL_GPL(dlm_posix_unlock);
/*
* NOTE: This implementation can only handle async lock requests as nfs
* do it. It cannot handle cancellation of a pending lock request sitting
* in wait_event(), but for now only nfs is the only user local kernel
* user.
*/
int dlm_posix_cancel(dlm_lockspace_t *lockspace, u64 number, struct file *file,
struct file_lock *fl)
{
struct dlm_plock_info info;
struct plock_op *op;
struct dlm_ls *ls;
int rv;
/* this only works for async request for now and nfs is the only
* kernel user right now.
*/
if (WARN_ON_ONCE(!fl->fl_lmops || !fl->fl_lmops->lm_grant))
return -EOPNOTSUPP;
ls = dlm_find_lockspace_local(lockspace);
if (!ls)
return -EINVAL;
memset(&info, 0, sizeof(info));
info.pid = fl->fl_pid;
info.ex = (fl->fl_type == F_WRLCK);
info.fsid = ls->ls_global_id;
dlm_put_lockspace(ls);
info.number = number;
info.start = fl->fl_start;
info.end = fl->fl_end;
info.owner = (__u64)fl->fl_pid;
rv = do_lock_cancel(&info);
switch (rv) {
case 0:
spin_lock(&ops_lock);
/* lock request to cancel must be on recv_list because
* do_lock_cancel() synchronizes it.
*/
op = plock_lookup_waiter(&info);
if (WARN_ON_ONCE(!op)) {
spin_unlock(&ops_lock);
rv = -ENOLCK;
break;
}
list_del(&op->list);
spin_unlock(&ops_lock);
WARN_ON(op->info.optype != DLM_PLOCK_OP_LOCK);
op->data->callback(op->data->fl, -EINTR);
dlm_release_plock_op(op);
rv = -EINTR;
break;
case -ENOENT:
/* if cancel wasn't successful we probably were to late
* or it was a non-blocking lock request, so just unlock it.
*/
rv = dlm_posix_unlock(lockspace, number, file, fl);
break;
default:
break;
}
return rv;
}
EXPORT_SYMBOL_GPL(dlm_posix_cancel);
int dlm_posix_get(dlm_lockspace_t *lockspace, u64 number, struct file *file, int dlm_posix_get(dlm_lockspace_t *lockspace, u64 number, struct file *file,
struct file_lock *fl) struct file_lock *fl)
{ {
...@@ -403,6 +511,8 @@ static ssize_t dev_read(struct file *file, char __user *u, size_t count, ...@@ -403,6 +511,8 @@ static ssize_t dev_read(struct file *file, char __user *u, size_t count,
if (!op) if (!op)
return -EAGAIN; return -EAGAIN;
trace_dlm_plock_read(&info);
/* there is no need to get a reply from userspace for unlocks /* there is no need to get a reply from userspace for unlocks
that were generated by the vfs cleaning up for a close that were generated by the vfs cleaning up for a close
(the process did not make an unlock call). */ (the process did not make an unlock call). */
...@@ -430,6 +540,8 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count, ...@@ -430,6 +540,8 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count,
if (copy_from_user(&info, u, sizeof(info))) if (copy_from_user(&info, u, sizeof(info)))
return -EFAULT; return -EFAULT;
trace_dlm_plock_write(&info);
if (check_version(&info)) if (check_version(&info))
return -EINVAL; return -EINVAL;
...@@ -441,22 +553,11 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count, ...@@ -441,22 +553,11 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count,
*/ */
spin_lock(&ops_lock); spin_lock(&ops_lock);
if (info.wait) { if (info.wait) {
list_for_each_entry(iter, &recv_list, list) { op = plock_lookup_waiter(&info);
if (iter->info.fsid == info.fsid &&
iter->info.number == info.number &&
iter->info.owner == info.owner &&
iter->info.pid == info.pid &&
iter->info.start == info.start &&
iter->info.end == info.end &&
iter->info.ex == info.ex &&
iter->info.wait) {
op = iter;
break;
}
}
} else { } else {
list_for_each_entry(iter, &recv_list, list) { list_for_each_entry(iter, &recv_list, list) {
if (!iter->info.wait) { if (!iter->info.wait &&
iter->info.fsid == info.fsid) {
op = iter; op = iter;
break; break;
} }
...@@ -468,8 +569,7 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count, ...@@ -468,8 +569,7 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count,
if (info.wait) if (info.wait)
WARN_ON(op->info.optype != DLM_PLOCK_OP_LOCK); WARN_ON(op->info.optype != DLM_PLOCK_OP_LOCK);
else else
WARN_ON(op->info.fsid != info.fsid || WARN_ON(op->info.number != info.number ||
op->info.number != info.number ||
op->info.owner != info.owner || op->info.owner != info.owner ||
op->info.optype != info.optype); op->info.optype != info.optype);
...@@ -534,5 +634,7 @@ int dlm_plock_init(void) ...@@ -534,5 +634,7 @@ int dlm_plock_init(void)
void dlm_plock_exit(void) void dlm_plock_exit(void)
{ {
misc_deregister(&plock_dev_misc); misc_deregister(&plock_dev_misc);
WARN_ON(!list_empty(&send_list));
WARN_ON(!list_empty(&recv_list));
} }
...@@ -28,7 +28,8 @@ static int rcom_response(struct dlm_ls *ls) ...@@ -28,7 +28,8 @@ static int rcom_response(struct dlm_ls *ls)
} }
static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
struct dlm_rcom **rc_ret, char *mb, int mb_len) struct dlm_rcom **rc_ret, char *mb, int mb_len,
uint64_t seq)
{ {
struct dlm_rcom *rc; struct dlm_rcom *rc;
...@@ -41,16 +42,14 @@ static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, ...@@ -41,16 +42,14 @@ static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
rc->rc_header.h_cmd = DLM_RCOM; rc->rc_header.h_cmd = DLM_RCOM;
rc->rc_type = cpu_to_le32(type); rc->rc_type = cpu_to_le32(type);
rc->rc_seq = cpu_to_le64(seq);
spin_lock(&ls->ls_recover_lock);
rc->rc_seq = cpu_to_le64(ls->ls_recover_seq);
spin_unlock(&ls->ls_recover_lock);
*rc_ret = rc; *rc_ret = rc;
} }
static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret) struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret,
uint64_t seq)
{ {
int mb_len = sizeof(struct dlm_rcom) + len; int mb_len = sizeof(struct dlm_rcom) + len;
struct dlm_mhandle *mh; struct dlm_mhandle *mh;
...@@ -63,14 +62,14 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, ...@@ -63,14 +62,14 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
return -ENOBUFS; return -ENOBUFS;
} }
_create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len); _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq);
*mh_ret = mh; *mh_ret = mh;
return 0; return 0;
} }
static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type, static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
int len, struct dlm_rcom **rc_ret, int len, struct dlm_rcom **rc_ret,
struct dlm_msg **msg_ret) struct dlm_msg **msg_ret, uint64_t seq)
{ {
int mb_len = sizeof(struct dlm_rcom) + len; int mb_len = sizeof(struct dlm_rcom) + len;
struct dlm_msg *msg; struct dlm_msg *msg;
...@@ -84,7 +83,7 @@ static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type, ...@@ -84,7 +83,7 @@ static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
return -ENOBUFS; return -ENOBUFS;
} }
_create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len); _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq);
*msg_ret = msg; *msg_ret = msg;
return 0; return 0;
} }
...@@ -170,7 +169,8 @@ static void disallow_sync_reply(struct dlm_ls *ls) ...@@ -170,7 +169,8 @@ static void disallow_sync_reply(struct dlm_ls *ls)
* node's rcom_config. * node's rcom_config.
*/ */
int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags) int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags,
uint64_t seq)
{ {
struct dlm_rcom *rc; struct dlm_rcom *rc;
struct dlm_msg *msg; struct dlm_msg *msg;
...@@ -186,7 +186,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags) ...@@ -186,7 +186,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
retry: retry:
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS, error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS,
sizeof(struct rcom_status), &rc, &msg); sizeof(struct rcom_status), &rc, &msg,
seq);
if (error) if (error)
goto out; goto out;
...@@ -220,7 +221,9 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags) ...@@ -220,7 +221,9 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
return error; return error;
} }
static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in) static void receive_rcom_status(struct dlm_ls *ls,
const struct dlm_rcom *rc_in,
uint64_t seq)
{ {
struct dlm_rcom *rc; struct dlm_rcom *rc;
struct rcom_status *rs; struct rcom_status *rs;
...@@ -251,7 +254,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in) ...@@ -251,7 +254,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
do_create: do_create:
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY, error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY,
len, &rc, &msg); len, &rc, &msg, seq);
if (error) if (error)
return; return;
...@@ -281,7 +284,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in) ...@@ -281,7 +284,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
send_rcom_stateless(msg, rc); send_rcom_stateless(msg, rc);
} }
static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
{ {
spin_lock(&ls->ls_rcom_spin); spin_lock(&ls->ls_rcom_spin);
if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) || if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
...@@ -302,17 +305,18 @@ static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) ...@@ -302,17 +305,18 @@ static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
spin_unlock(&ls->ls_rcom_spin); spin_unlock(&ls->ls_rcom_spin);
} }
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
int last_len, uint64_t seq)
{ {
struct dlm_mhandle *mh;
struct dlm_rcom *rc; struct dlm_rcom *rc;
struct dlm_msg *msg;
int error = 0; int error = 0;
ls->ls_recover_nodeid = nodeid; ls->ls_recover_nodeid = nodeid;
retry: retry:
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES, last_len, error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len,
&rc, &msg); &rc, &mh, seq);
if (error) if (error)
goto out; goto out;
memcpy(rc->rc_buf, last_name, last_len); memcpy(rc->rc_buf, last_name, last_len);
...@@ -320,7 +324,7 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) ...@@ -320,7 +324,7 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
allow_sync_reply(ls, &rc->rc_id); allow_sync_reply(ls, &rc->rc_id);
memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE); memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
send_rcom_stateless(msg, rc); send_rcom(mh, rc);
error = dlm_wait_function(ls, &rcom_response); error = dlm_wait_function(ls, &rcom_response);
disallow_sync_reply(ls); disallow_sync_reply(ls);
...@@ -330,19 +334,20 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) ...@@ -330,19 +334,20 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
return error; return error;
} }
static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in) static void receive_rcom_names(struct dlm_ls *ls, const struct dlm_rcom *rc_in,
uint64_t seq)
{ {
struct dlm_mhandle *mh;
struct dlm_rcom *rc; struct dlm_rcom *rc;
int error, inlen, outlen, nodeid; int error, inlen, outlen, nodeid;
struct dlm_msg *msg;
nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid); nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
inlen = le16_to_cpu(rc_in->rc_header.h_length) - inlen = le16_to_cpu(rc_in->rc_header.h_length) -
sizeof(struct dlm_rcom); sizeof(struct dlm_rcom);
outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom); outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom);
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen,
&rc, &msg); &rc, &mh, seq);
if (error) if (error)
return; return;
rc->rc_id = rc_in->rc_id; rc->rc_id = rc_in->rc_id;
...@@ -350,10 +355,10 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in) ...@@ -350,10 +355,10 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen, dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
nodeid); nodeid);
send_rcom_stateless(msg, rc); send_rcom(mh, rc);
} }
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq)
{ {
struct dlm_rcom *rc; struct dlm_rcom *rc;
struct dlm_mhandle *mh; struct dlm_mhandle *mh;
...@@ -361,7 +366,7 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) ...@@ -361,7 +366,7 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
int error; int error;
error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length, error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
&rc, &mh); &rc, &mh, seq);
if (error) if (error)
goto out; goto out;
memcpy(rc->rc_buf, r->res_name, r->res_length); memcpy(rc->rc_buf, r->res_name, r->res_length);
...@@ -372,7 +377,8 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) ...@@ -372,7 +377,8 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
return error; return error;
} }
static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) static void receive_rcom_lookup(struct dlm_ls *ls,
const struct dlm_rcom *rc_in, uint64_t seq)
{ {
struct dlm_rcom *rc; struct dlm_rcom *rc;
struct dlm_mhandle *mh; struct dlm_mhandle *mh;
...@@ -387,7 +393,8 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) ...@@ -387,7 +393,8 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
return; return;
} }
error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh); error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh,
seq);
if (error) if (error)
return; return;
...@@ -402,7 +409,8 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) ...@@ -402,7 +409,8 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
send_rcom(mh, rc); send_rcom(mh, rc);
} }
static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) static void receive_rcom_lookup_reply(struct dlm_ls *ls,
const struct dlm_rcom *rc_in)
{ {
dlm_recover_master_reply(ls, rc_in); dlm_recover_master_reply(ls, rc_in);
} }
...@@ -437,7 +445,7 @@ static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, ...@@ -437,7 +445,7 @@ static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
} }
int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq)
{ {
struct dlm_ls *ls = r->res_ls; struct dlm_ls *ls = r->res_ls;
struct dlm_rcom *rc; struct dlm_rcom *rc;
...@@ -448,7 +456,8 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) ...@@ -448,7 +456,8 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
if (lkb->lkb_lvbptr) if (lkb->lkb_lvbptr)
len += ls->ls_lvblen; len += ls->ls_lvblen;
error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh); error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh,
seq);
if (error) if (error)
goto out; goto out;
...@@ -462,23 +471,28 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) ...@@ -462,23 +471,28 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
} }
/* needs at least dlm_rcom + rcom_lock */ /* needs at least dlm_rcom + rcom_lock */
static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in) static void receive_rcom_lock(struct dlm_ls *ls, const struct dlm_rcom *rc_in,
uint64_t seq)
{ {
__le32 rl_remid, rl_result;
struct rcom_lock *rl;
struct dlm_rcom *rc; struct dlm_rcom *rc;
struct dlm_mhandle *mh; struct dlm_mhandle *mh;
int error, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid); int error, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
dlm_recover_master_copy(ls, rc_in); dlm_recover_master_copy(ls, rc_in, &rl_remid, &rl_result);
error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY, error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
sizeof(struct rcom_lock), &rc, &mh); sizeof(struct rcom_lock), &rc, &mh, seq);
if (error) if (error)
return; return;
/* We send back the same rcom_lock struct we received, but
dlm_recover_master_copy() has filled in rl_remid and rl_result */
memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock)); memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
rl = (struct rcom_lock *)rc->rc_buf;
/* set rl_remid and rl_result from dlm_recover_master_copy() */
rl->rl_remid = rl_remid;
rl->rl_result = rl_result;
rc->rc_id = rc_in->rc_id; rc->rc_id = rc_in->rc_id;
rc->rc_seq_reply = rc_in->rc_seq; rc->rc_seq_reply = rc_in->rc_seq;
...@@ -488,7 +502,7 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in) ...@@ -488,7 +502,7 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
/* If the lockspace doesn't exist then still send a status message /* If the lockspace doesn't exist then still send a status message
back; it's possible that it just doesn't have its global_id yet. */ back; it's possible that it just doesn't have its global_id yet. */
int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in)
{ {
struct dlm_rcom *rc; struct dlm_rcom *rc;
struct rcom_config *rf; struct rcom_config *rf;
...@@ -566,7 +580,7 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) ...@@ -566,7 +580,7 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
/* Called by dlm_recv; corresponds to dlm_receive_message() but special /* Called by dlm_recv; corresponds to dlm_receive_message() but special
recovery-only comms are sent through here. */ recovery-only comms are sent through here. */
void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, int nodeid)
{ {
int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock); int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
int stop, reply = 0, names = 0, lookup = 0, lock = 0; int stop, reply = 0, names = 0, lookup = 0, lock = 0;
...@@ -620,21 +634,21 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) ...@@ -620,21 +634,21 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
switch (rc->rc_type) { switch (rc->rc_type) {
case cpu_to_le32(DLM_RCOM_STATUS): case cpu_to_le32(DLM_RCOM_STATUS):
receive_rcom_status(ls, rc); receive_rcom_status(ls, rc, seq);
break; break;
case cpu_to_le32(DLM_RCOM_NAMES): case cpu_to_le32(DLM_RCOM_NAMES):
receive_rcom_names(ls, rc); receive_rcom_names(ls, rc, seq);
break; break;
case cpu_to_le32(DLM_RCOM_LOOKUP): case cpu_to_le32(DLM_RCOM_LOOKUP):
receive_rcom_lookup(ls, rc); receive_rcom_lookup(ls, rc, seq);
break; break;
case cpu_to_le32(DLM_RCOM_LOCK): case cpu_to_le32(DLM_RCOM_LOCK):
if (le16_to_cpu(rc->rc_header.h_length) < lock_size) if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
goto Eshort; goto Eshort;
receive_rcom_lock(ls, rc); receive_rcom_lock(ls, rc, seq);
break; break;
case cpu_to_le32(DLM_RCOM_STATUS_REPLY): case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
...@@ -652,7 +666,7 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) ...@@ -652,7 +666,7 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
case cpu_to_le32(DLM_RCOM_LOCK_REPLY): case cpu_to_le32(DLM_RCOM_LOCK_REPLY):
if (le16_to_cpu(rc->rc_header.h_length) < lock_size) if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
goto Eshort; goto Eshort;
dlm_recover_process_copy(ls, rc); dlm_recover_process_copy(ls, rc, seq);
break; break;
default: default:
......
...@@ -12,12 +12,15 @@ ...@@ -12,12 +12,15 @@
#ifndef __RCOM_DOT_H__ #ifndef __RCOM_DOT_H__
#define __RCOM_DOT_H__ #define __RCOM_DOT_H__
int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags); int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags,
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len); uint64_t seq);
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid); int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); int last_len, uint64_t seq);
void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid); int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq);
int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in); int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq);
void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc,
int nodeid);
int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in);
#endif #endif
...@@ -93,7 +93,7 @@ void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status) ...@@ -93,7 +93,7 @@ void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
} }
static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status, static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
int save_slots) int save_slots, uint64_t seq)
{ {
struct dlm_rcom *rc = ls->ls_recover_buf; struct dlm_rcom *rc = ls->ls_recover_buf;
struct dlm_member *memb; struct dlm_member *memb;
...@@ -107,7 +107,7 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status, ...@@ -107,7 +107,7 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
goto out; goto out;
} }
error = dlm_rcom_status(ls, memb->nodeid, 0); error = dlm_rcom_status(ls, memb->nodeid, 0, seq);
if (error) if (error)
goto out; goto out;
...@@ -126,7 +126,7 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status, ...@@ -126,7 +126,7 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
} }
static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status, static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
uint32_t status_flags) uint32_t status_flags, uint64_t seq)
{ {
struct dlm_rcom *rc = ls->ls_recover_buf; struct dlm_rcom *rc = ls->ls_recover_buf;
int error = 0, delay = 0, nodeid = ls->ls_low_nodeid; int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;
...@@ -137,7 +137,7 @@ static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status, ...@@ -137,7 +137,7 @@ static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
goto out; goto out;
} }
error = dlm_rcom_status(ls, nodeid, status_flags); error = dlm_rcom_status(ls, nodeid, status_flags, seq);
if (error) if (error)
break; break;
...@@ -151,22 +151,22 @@ static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status, ...@@ -151,22 +151,22 @@ static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
return error; return error;
} }
static int wait_status(struct dlm_ls *ls, uint32_t status) static int wait_status(struct dlm_ls *ls, uint32_t status, uint64_t seq)
{ {
uint32_t status_all = status << 1; uint32_t status_all = status << 1;
int error; int error;
if (ls->ls_low_nodeid == dlm_our_nodeid()) { if (ls->ls_low_nodeid == dlm_our_nodeid()) {
error = wait_status_all(ls, status, 0); error = wait_status_all(ls, status, 0, seq);
if (!error) if (!error)
dlm_set_recover_status(ls, status_all); dlm_set_recover_status(ls, status_all);
} else } else
error = wait_status_low(ls, status_all, 0); error = wait_status_low(ls, status_all, 0, seq);
return error; return error;
} }
int dlm_recover_members_wait(struct dlm_ls *ls) int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq)
{ {
struct dlm_member *memb; struct dlm_member *memb;
struct dlm_slot *slots; struct dlm_slot *slots;
...@@ -180,7 +180,7 @@ int dlm_recover_members_wait(struct dlm_ls *ls) ...@@ -180,7 +180,7 @@ int dlm_recover_members_wait(struct dlm_ls *ls)
} }
if (ls->ls_low_nodeid == dlm_our_nodeid()) { if (ls->ls_low_nodeid == dlm_our_nodeid()) {
error = wait_status_all(ls, DLM_RS_NODES, 1); error = wait_status_all(ls, DLM_RS_NODES, 1, seq);
if (error) if (error)
goto out; goto out;
...@@ -199,7 +199,8 @@ int dlm_recover_members_wait(struct dlm_ls *ls) ...@@ -199,7 +199,8 @@ int dlm_recover_members_wait(struct dlm_ls *ls)
dlm_set_recover_status(ls, DLM_RS_NODES_ALL); dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
} }
} else { } else {
error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS); error = wait_status_low(ls, DLM_RS_NODES_ALL,
DLM_RSF_NEED_SLOTS, seq);
if (error) if (error)
goto out; goto out;
...@@ -209,19 +210,19 @@ int dlm_recover_members_wait(struct dlm_ls *ls) ...@@ -209,19 +210,19 @@ int dlm_recover_members_wait(struct dlm_ls *ls)
return error; return error;
} }
int dlm_recover_directory_wait(struct dlm_ls *ls) int dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq)
{ {
return wait_status(ls, DLM_RS_DIR); return wait_status(ls, DLM_RS_DIR, seq);
} }
int dlm_recover_locks_wait(struct dlm_ls *ls) int dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq)
{ {
return wait_status(ls, DLM_RS_LOCKS); return wait_status(ls, DLM_RS_LOCKS, seq);
} }
int dlm_recover_done_wait(struct dlm_ls *ls) int dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq)
{ {
return wait_status(ls, DLM_RS_DONE); return wait_status(ls, DLM_RS_DONE, seq);
} }
/* /*
...@@ -441,7 +442,7 @@ static void set_new_master(struct dlm_rsb *r) ...@@ -441,7 +442,7 @@ static void set_new_master(struct dlm_rsb *r)
* equals our_nodeid below). * equals our_nodeid below).
*/ */
static int recover_master(struct dlm_rsb *r, unsigned int *count) static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq)
{ {
struct dlm_ls *ls = r->res_ls; struct dlm_ls *ls = r->res_ls;
int our_nodeid, dir_nodeid; int our_nodeid, dir_nodeid;
...@@ -472,7 +473,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count) ...@@ -472,7 +473,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count)
error = 0; error = 0;
} else { } else {
recover_idr_add(r); recover_idr_add(r);
error = dlm_send_rcom_lookup(r, dir_nodeid); error = dlm_send_rcom_lookup(r, dir_nodeid, seq);
} }
(*count)++; (*count)++;
...@@ -520,7 +521,7 @@ static int recover_master_static(struct dlm_rsb *r, unsigned int *count) ...@@ -520,7 +521,7 @@ static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
* the correct dir node. * the correct dir node.
*/ */
int dlm_recover_masters(struct dlm_ls *ls) int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
{ {
struct dlm_rsb *r; struct dlm_rsb *r;
unsigned int total = 0; unsigned int total = 0;
...@@ -542,7 +543,7 @@ int dlm_recover_masters(struct dlm_ls *ls) ...@@ -542,7 +543,7 @@ int dlm_recover_masters(struct dlm_ls *ls)
if (nodir) if (nodir)
error = recover_master_static(r, &count); error = recover_master_static(r, &count);
else else
error = recover_master(r, &count); error = recover_master(r, &count, seq);
unlock_rsb(r); unlock_rsb(r);
cond_resched(); cond_resched();
total++; total++;
...@@ -563,7 +564,7 @@ int dlm_recover_masters(struct dlm_ls *ls) ...@@ -563,7 +564,7 @@ int dlm_recover_masters(struct dlm_ls *ls)
return error; return error;
} }
int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc)
{ {
struct dlm_rsb *r; struct dlm_rsb *r;
int ret_nodeid, new_master; int ret_nodeid, new_master;
...@@ -614,13 +615,14 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) ...@@ -614,13 +615,14 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
* an equal number of replies then recovery for the rsb is done * an equal number of replies then recovery for the rsb is done
*/ */
static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head) static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head,
uint64_t seq)
{ {
struct dlm_lkb *lkb; struct dlm_lkb *lkb;
int error = 0; int error = 0;
list_for_each_entry(lkb, head, lkb_statequeue) { list_for_each_entry(lkb, head, lkb_statequeue) {
error = dlm_send_rcom_lock(r, lkb); error = dlm_send_rcom_lock(r, lkb, seq);
if (error) if (error)
break; break;
r->res_recover_locks_count++; r->res_recover_locks_count++;
...@@ -629,7 +631,7 @@ static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head) ...@@ -629,7 +631,7 @@ static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head)
return error; return error;
} }
static int recover_locks(struct dlm_rsb *r) static int recover_locks(struct dlm_rsb *r, uint64_t seq)
{ {
int error = 0; int error = 0;
...@@ -637,13 +639,13 @@ static int recover_locks(struct dlm_rsb *r) ...@@ -637,13 +639,13 @@ static int recover_locks(struct dlm_rsb *r)
DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r);); DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r););
error = recover_locks_queue(r, &r->res_grantqueue); error = recover_locks_queue(r, &r->res_grantqueue, seq);
if (error) if (error)
goto out; goto out;
error = recover_locks_queue(r, &r->res_convertqueue); error = recover_locks_queue(r, &r->res_convertqueue, seq);
if (error) if (error)
goto out; goto out;
error = recover_locks_queue(r, &r->res_waitqueue); error = recover_locks_queue(r, &r->res_waitqueue, seq);
if (error) if (error)
goto out; goto out;
...@@ -656,7 +658,7 @@ static int recover_locks(struct dlm_rsb *r) ...@@ -656,7 +658,7 @@ static int recover_locks(struct dlm_rsb *r)
return error; return error;
} }
int dlm_recover_locks(struct dlm_ls *ls) int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq)
{ {
struct dlm_rsb *r; struct dlm_rsb *r;
int error, count = 0; int error, count = 0;
...@@ -677,7 +679,7 @@ int dlm_recover_locks(struct dlm_ls *ls) ...@@ -677,7 +679,7 @@ int dlm_recover_locks(struct dlm_ls *ls)
goto out; goto out;
} }
error = recover_locks(r); error = recover_locks(r, seq);
if (error) { if (error) {
up_read(&ls->ls_root_sem); up_read(&ls->ls_root_sem);
goto out; goto out;
......
...@@ -15,13 +15,13 @@ ...@@ -15,13 +15,13 @@
int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls)); int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls));
uint32_t dlm_recover_status(struct dlm_ls *ls); uint32_t dlm_recover_status(struct dlm_ls *ls);
void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status); void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status);
int dlm_recover_members_wait(struct dlm_ls *ls); int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_directory_wait(struct dlm_ls *ls); int dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_locks_wait(struct dlm_ls *ls); int dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_done_wait(struct dlm_ls *ls); int dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_masters(struct dlm_ls *ls); int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc); int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc);
int dlm_recover_locks(struct dlm_ls *ls); int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq);
void dlm_recovered_lock(struct dlm_rsb *r); void dlm_recovered_lock(struct dlm_rsb *r);
int dlm_create_root_list(struct dlm_ls *ls); int dlm_create_root_list(struct dlm_ls *ls);
void dlm_release_root_list(struct dlm_ls *ls); void dlm_release_root_list(struct dlm_ls *ls);
......
...@@ -90,7 +90,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) ...@@ -90,7 +90,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_set_recover_status(ls, DLM_RS_NODES); dlm_set_recover_status(ls, DLM_RS_NODES);
error = dlm_recover_members_wait(ls); error = dlm_recover_members_wait(ls, rv->seq);
if (error) { if (error) {
log_rinfo(ls, "dlm_recover_members_wait error %d", error); log_rinfo(ls, "dlm_recover_members_wait error %d", error);
goto fail; goto fail;
...@@ -103,7 +103,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) ...@@ -103,7 +103,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
* nodes their master rsb names that hash to us. * nodes their master rsb names that hash to us.
*/ */
error = dlm_recover_directory(ls); error = dlm_recover_directory(ls, rv->seq);
if (error) { if (error) {
log_rinfo(ls, "dlm_recover_directory error %d", error); log_rinfo(ls, "dlm_recover_directory error %d", error);
goto fail; goto fail;
...@@ -111,7 +111,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) ...@@ -111,7 +111,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_set_recover_status(ls, DLM_RS_DIR); dlm_set_recover_status(ls, DLM_RS_DIR);
error = dlm_recover_directory_wait(ls); error = dlm_recover_directory_wait(ls, rv->seq);
if (error) { if (error) {
log_rinfo(ls, "dlm_recover_directory_wait error %d", error); log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
goto fail; goto fail;
...@@ -145,7 +145,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) ...@@ -145,7 +145,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
* departed nodes. * departed nodes.
*/ */
error = dlm_recover_masters(ls); error = dlm_recover_masters(ls, rv->seq);
if (error) { if (error) {
log_rinfo(ls, "dlm_recover_masters error %d", error); log_rinfo(ls, "dlm_recover_masters error %d", error);
goto fail; goto fail;
...@@ -155,7 +155,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) ...@@ -155,7 +155,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
* Send our locks on remastered rsb's to the new masters. * Send our locks on remastered rsb's to the new masters.
*/ */
error = dlm_recover_locks(ls); error = dlm_recover_locks(ls, rv->seq);
if (error) { if (error) {
log_rinfo(ls, "dlm_recover_locks error %d", error); log_rinfo(ls, "dlm_recover_locks error %d", error);
goto fail; goto fail;
...@@ -163,7 +163,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) ...@@ -163,7 +163,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_set_recover_status(ls, DLM_RS_LOCKS); dlm_set_recover_status(ls, DLM_RS_LOCKS);
error = dlm_recover_locks_wait(ls); error = dlm_recover_locks_wait(ls, rv->seq);
if (error) { if (error) {
log_rinfo(ls, "dlm_recover_locks_wait error %d", error); log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
goto fail; goto fail;
...@@ -187,7 +187,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) ...@@ -187,7 +187,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
*/ */
dlm_set_recover_status(ls, DLM_RS_LOCKS); dlm_set_recover_status(ls, DLM_RS_LOCKS);
error = dlm_recover_locks_wait(ls); error = dlm_recover_locks_wait(ls, rv->seq);
if (error) { if (error) {
log_rinfo(ls, "dlm_recover_locks_wait error %d", error); log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
goto fail; goto fail;
...@@ -206,7 +206,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) ...@@ -206,7 +206,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_set_recover_status(ls, DLM_RS_DONE); dlm_set_recover_status(ls, DLM_RS_DONE);
error = dlm_recover_done_wait(ls); error = dlm_recover_done_wait(ls, rv->seq);
if (error) { if (error) {
log_rinfo(ls, "dlm_recover_done_wait error %d", error); log_rinfo(ls, "dlm_recover_done_wait error %d", error);
goto fail; goto fail;
......
...@@ -30,7 +30,8 @@ struct rq_entry { ...@@ -30,7 +30,8 @@ struct rq_entry {
* lockspace is enabled on some while still suspended on others. * lockspace is enabled on some while still suspended on others.
*/ */
void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms) void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid,
const struct dlm_message *ms)
{ {
struct rq_entry *e; struct rq_entry *e;
int length = le16_to_cpu(ms->m_header.h_length) - int length = le16_to_cpu(ms->m_header.h_length) -
......
...@@ -11,7 +11,8 @@ ...@@ -11,7 +11,8 @@
#ifndef __REQUESTQUEUE_DOT_H__ #ifndef __REQUESTQUEUE_DOT_H__
#define __REQUESTQUEUE_DOT_H__ #define __REQUESTQUEUE_DOT_H__
void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms); void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid,
const struct dlm_message *ms);
int dlm_process_requestqueue(struct dlm_ls *ls); int dlm_process_requestqueue(struct dlm_ls *ls);
void dlm_wait_requestqueue(struct dlm_ls *ls); void dlm_wait_requestqueue(struct dlm_ls *ls);
void dlm_purge_requestqueue(struct dlm_ls *ls); void dlm_purge_requestqueue(struct dlm_ls *ls);
......
...@@ -1436,17 +1436,14 @@ static int gfs2_lock(struct file *file, int cmd, struct file_lock *fl) ...@@ -1436,17 +1436,14 @@ static int gfs2_lock(struct file *file, int cmd, struct file_lock *fl)
if (!(fl->fl_flags & FL_POSIX)) if (!(fl->fl_flags & FL_POSIX))
return -ENOLCK; return -ENOLCK;
if (cmd == F_CANCELLK) {
/* Hack: */
cmd = F_SETLK;
fl->fl_type = F_UNLCK;
}
if (unlikely(gfs2_withdrawn(sdp))) { if (unlikely(gfs2_withdrawn(sdp))) {
if (fl->fl_type == F_UNLCK) if (fl->fl_type == F_UNLCK)
locks_lock_file_wait(file, fl); locks_lock_file_wait(file, fl);
return -EIO; return -EIO;
} }
if (IS_GETLK(cmd)) if (cmd == F_CANCELLK)
return dlm_posix_cancel(ls->ls_dlm, ip->i_no_addr, file, fl);
else if (IS_GETLK(cmd))
return dlm_posix_get(ls->ls_dlm, ip->i_no_addr, file, fl); return dlm_posix_get(ls->ls_dlm, ip->i_no_addr, file, fl);
else if (fl->fl_type == F_UNLCK) else if (fl->fl_type == F_UNLCK)
return dlm_posix_unlock(ls->ls_dlm, ip->i_no_addr, file, fl); return dlm_posix_unlock(ls->ls_dlm, ip->i_no_addr, file, fl);
......
...@@ -738,18 +738,11 @@ static int user_plock(struct ocfs2_cluster_connection *conn, ...@@ -738,18 +738,11 @@ static int user_plock(struct ocfs2_cluster_connection *conn,
* *
* Internally, fs/dlm will pass these to a misc device, which * Internally, fs/dlm will pass these to a misc device, which
* a userspace daemon will read and write to. * a userspace daemon will read and write to.
*
* For now, cancel requests (which happen internally only),
* are turned into unlocks. Most of this function taken from
* gfs2_lock.
*/ */
if (cmd == F_CANCELLK) { if (cmd == F_CANCELLK)
cmd = F_SETLK; return dlm_posix_cancel(conn->cc_lockspace, ino, file, fl);
fl->fl_type = F_UNLCK; else if (IS_GETLK(cmd))
}
if (IS_GETLK(cmd))
return dlm_posix_get(conn->cc_lockspace, ino, file, fl); return dlm_posix_get(conn->cc_lockspace, ino, file, fl);
else if (fl->fl_type == F_UNLCK) else if (fl->fl_type == F_UNLCK)
return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl); return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl);
......
...@@ -11,6 +11,8 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file, ...@@ -11,6 +11,8 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file,
int cmd, struct file_lock *fl); int cmd, struct file_lock *fl);
int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file, int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file,
struct file_lock *fl); struct file_lock *fl);
int dlm_posix_cancel(dlm_lockspace_t *lockspace, u64 number, struct file *file,
struct file_lock *fl);
int dlm_posix_get(dlm_lockspace_t *lockspace, u64 number, struct file *file, int dlm_posix_get(dlm_lockspace_t *lockspace, u64 number, struct file *file,
struct file_lock *fl); struct file_lock *fl);
#endif #endif
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include <linux/dlm.h> #include <linux/dlm.h>
#include <linux/dlmconstants.h> #include <linux/dlmconstants.h>
#include <uapi/linux/dlm_plock.h>
#include <linux/tracepoint.h> #include <linux/tracepoint.h>
#include "../../../fs/dlm/dlm_internal.h" #include "../../../fs/dlm/dlm_internal.h"
...@@ -585,6 +586,56 @@ TRACE_EVENT(dlm_recv_message, ...@@ -585,6 +586,56 @@ TRACE_EVENT(dlm_recv_message,
); );
DECLARE_EVENT_CLASS(dlm_plock_template,
TP_PROTO(const struct dlm_plock_info *info),
TP_ARGS(info),
TP_STRUCT__entry(
__field(uint8_t, optype)
__field(uint8_t, ex)
__field(uint8_t, wait)
__field(uint8_t, flags)
__field(uint32_t, pid)
__field(int32_t, nodeid)
__field(int32_t, rv)
__field(uint32_t, fsid)
__field(uint64_t, number)
__field(uint64_t, start)
__field(uint64_t, end)
__field(uint64_t, owner)
),
TP_fast_assign(
__entry->optype = info->optype;
__entry->ex = info->ex;
__entry->wait = info->wait;
__entry->flags = info->flags;
__entry->pid = info->pid;
__entry->nodeid = info->nodeid;
__entry->rv = info->rv;
__entry->fsid = info->fsid;
__entry->number = info->number;
__entry->start = info->start;
__entry->end = info->end;
__entry->owner = info->owner;
),
TP_printk("fsid=%u number=%llx owner=%llx optype=%d ex=%d wait=%d flags=%x pid=%u nodeid=%d rv=%d start=%llx end=%llx",
__entry->fsid, __entry->number, __entry->owner,
__entry->optype, __entry->ex, __entry->wait,
__entry->flags, __entry->pid, __entry->nodeid,
__entry->rv, __entry->start, __entry->end)
);
DEFINE_EVENT(dlm_plock_template, dlm_plock_read,
TP_PROTO(const struct dlm_plock_info *info), TP_ARGS(info));
DEFINE_EVENT(dlm_plock_template, dlm_plock_write,
TP_PROTO(const struct dlm_plock_info *info), TP_ARGS(info));
TRACE_EVENT(dlm_send, TRACE_EVENT(dlm_send,
TP_PROTO(int nodeid, int ret), TP_PROTO(int nodeid, int ret),
......
...@@ -22,6 +22,7 @@ enum { ...@@ -22,6 +22,7 @@ enum {
DLM_PLOCK_OP_LOCK = 1, DLM_PLOCK_OP_LOCK = 1,
DLM_PLOCK_OP_UNLOCK, DLM_PLOCK_OP_UNLOCK,
DLM_PLOCK_OP_GET, DLM_PLOCK_OP_GET,
DLM_PLOCK_OP_CANCEL,
}; };
#define DLM_PLOCK_FL_CLOSE 1 #define DLM_PLOCK_FL_CLOSE 1
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment