Commit a7e7ffac authored by Alexander Aring's avatar Alexander Aring Committed by David Teigland

fs: dlm: rename stub to local message flag

This patch renames DLM_IFL_STUB_MS to DLM_IFL_LOCAL_MS flag. The
DLM_IFL_STUB_MS flag is somewhat misnamed, it means the dlm message is
used for local message transfer only. It is used by recovery to resolve
lock states if a node got fenced.
Signed-off-by: default avatarAlexander Aring <aahringo@redhat.com>
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent 01c7a597
...@@ -203,7 +203,7 @@ struct dlm_args { ...@@ -203,7 +203,7 @@ struct dlm_args {
#define DLM_IFL_OVERLAP_CANCEL 0x00100000 #define DLM_IFL_OVERLAP_CANCEL 0x00100000
#define DLM_IFL_ENDOFLIFE 0x00200000 #define DLM_IFL_ENDOFLIFE 0x00200000
#define DLM_IFL_DEADLOCK_CANCEL 0x01000000 #define DLM_IFL_DEADLOCK_CANCEL 0x01000000
#define DLM_IFL_STUB_MS 0x02000000 /* magic number for m_flags */ #define DLM_IFL_LOCAL_MS 0x02000000 /* magic number for m_flags */
#define DLM_IFL_CB_PENDING_BIT 0 #define DLM_IFL_CB_PENDING_BIT 0
...@@ -593,9 +593,9 @@ struct dlm_ls { ...@@ -593,9 +593,9 @@ struct dlm_ls {
int ls_slots_size; int ls_slots_size;
struct dlm_slot *ls_slots; struct dlm_slot *ls_slots;
struct dlm_rsb ls_stub_rsb; /* for returning errors */ struct dlm_rsb ls_local_rsb; /* for returning errors */
struct dlm_lkb ls_stub_lkb; /* for returning errors */ struct dlm_lkb ls_local_lkb; /* for returning errors */
struct dlm_message ls_stub_ms; /* for faking a reply */ struct dlm_message ls_local_ms; /* for faking a reply */
struct dentry *ls_debug_rsb_dentry; /* debugfs */ struct dentry *ls_debug_rsb_dentry; /* debugfs */
struct dentry *ls_debug_waiters_dentry; /* debugfs */ struct dentry *ls_debug_waiters_dentry; /* debugfs */
......
...@@ -1558,7 +1558,7 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) ...@@ -1558,7 +1558,7 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
return error; return error;
} }
/* Handles situations where we might be processing a "fake" or "stub" reply in /* Handles situations where we might be processing a "fake" or "local" reply in
which we can't try to take waiters_mutex again. */ which we can't try to take waiters_mutex again. */
static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
...@@ -1566,10 +1566,10 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) ...@@ -1566,10 +1566,10 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
struct dlm_ls *ls = lkb->lkb_resource->res_ls; struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int error; int error;
if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS)) if (ms->m_flags != cpu_to_le32(DLM_IFL_LOCAL_MS))
mutex_lock(&ls->ls_waiters_mutex); mutex_lock(&ls->ls_waiters_mutex);
error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms); error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS)) if (ms->m_flags != cpu_to_le32(DLM_IFL_LOCAL_MS))
mutex_unlock(&ls->ls_waiters_mutex); mutex_unlock(&ls->ls_waiters_mutex);
return error; return error;
} }
...@@ -3486,10 +3486,10 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) ...@@ -3486,10 +3486,10 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
/* down conversions go without a reply from the master */ /* down conversions go without a reply from the master */
if (!error && down_conversion(lkb)) { if (!error && down_conversion(lkb)) {
remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
r->res_ls->ls_stub_ms.m_flags = cpu_to_le32(DLM_IFL_STUB_MS); r->res_ls->ls_local_ms.m_flags = cpu_to_le32(DLM_IFL_LOCAL_MS);
r->res_ls->ls_stub_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY); r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
r->res_ls->ls_stub_ms.m_result = 0; r->res_ls->ls_local_ms.m_result = 0;
__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms);
} }
return error; return error;
...@@ -3648,7 +3648,7 @@ static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) ...@@ -3648,7 +3648,7 @@ static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
int ret_nodeid, int rv) int ret_nodeid, int rv)
{ {
struct dlm_rsb *r = &ls->ls_stub_rsb; struct dlm_rsb *r = &ls->ls_local_rsb;
struct dlm_message *ms; struct dlm_message *ms;
struct dlm_mhandle *mh; struct dlm_mhandle *mh;
int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid); int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
...@@ -3681,7 +3681,7 @@ static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms) ...@@ -3681,7 +3681,7 @@ static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms) static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{ {
if (ms->m_flags == cpu_to_le32(DLM_IFL_STUB_MS)) if (ms->m_flags == cpu_to_le32(DLM_IFL_LOCAL_MS))
return; return;
lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags); lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags);
...@@ -3768,12 +3768,12 @@ static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, ...@@ -3768,12 +3768,12 @@ static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
return 0; return 0;
} }
/* We fill in the stub-lkb fields with the info that send_xxxx_reply() /* We fill in the local-lkb fields with the info that send_xxxx_reply()
uses to send a reply and that the remote end uses to process the reply. */ uses to send a reply and that the remote end uses to process the reply. */
static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms) static void setup_local_lkb(struct dlm_ls *ls, struct dlm_message *ms)
{ {
struct dlm_lkb *lkb = &ls->ls_stub_lkb; struct dlm_lkb *lkb = &ls->ls_local_lkb;
lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
lkb->lkb_remid = le32_to_cpu(ms->m_lkid); lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
} }
...@@ -3906,8 +3906,8 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -3906,8 +3906,8 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
le32_to_cpu(ms->m_lkid), from_nodeid, error); le32_to_cpu(ms->m_lkid), from_nodeid, error);
} }
setup_stub_lkb(ls, ms); setup_local_lkb(ls, ms);
send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
return error; return error;
} }
...@@ -3962,8 +3962,8 @@ static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -3962,8 +3962,8 @@ static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
return 0; return 0;
fail: fail:
setup_stub_lkb(ls, ms); setup_local_lkb(ls, ms);
send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
return error; return error;
} }
...@@ -4014,8 +4014,8 @@ static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -4014,8 +4014,8 @@ static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
return 0; return 0;
fail: fail:
setup_stub_lkb(ls, ms); setup_local_lkb(ls, ms);
send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
return error; return error;
} }
...@@ -4050,8 +4050,8 @@ static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -4050,8 +4050,8 @@ static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
return 0; return 0;
fail: fail:
setup_stub_lkb(ls, ms); setup_local_lkb(ls, ms);
send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
return error; return error;
} }
...@@ -4403,7 +4403,7 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) ...@@ -4403,7 +4403,7 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
if (error) if (error)
goto out; goto out;
/* stub reply can happen with waiters_mutex held */ /* local reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms); error = remove_from_waiters_ms(lkb, ms);
if (error) if (error)
goto out; goto out;
...@@ -4440,7 +4440,7 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) ...@@ -4440,7 +4440,7 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
if (error) if (error)
goto out; goto out;
/* stub reply can happen with waiters_mutex held */ /* local reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms); error = remove_from_waiters_ms(lkb, ms);
if (error) if (error)
goto out; goto out;
...@@ -4490,7 +4490,7 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) ...@@ -4490,7 +4490,7 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
if (error) if (error)
goto out; goto out;
/* stub reply can happen with waiters_mutex held */ /* local reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms); error = remove_from_waiters_ms(lkb, ms);
if (error) if (error)
goto out; goto out;
...@@ -4834,16 +4834,16 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid) ...@@ -4834,16 +4834,16 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
} }
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb, static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
struct dlm_message *ms_stub) struct dlm_message *ms_local)
{ {
if (middle_conversion(lkb)) { if (middle_conversion(lkb)) {
hold_lkb(lkb); hold_lkb(lkb);
memset(ms_stub, 0, sizeof(struct dlm_message)); memset(ms_local, 0, sizeof(struct dlm_message));
ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS); ms_local->m_flags = cpu_to_le32(DLM_IFL_LOCAL_MS);
ms_stub->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY); ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
ms_stub->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS)); ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
_receive_convert_reply(lkb, ms_stub); _receive_convert_reply(lkb, ms_local);
/* Same special case as in receive_rcom_lock_args() */ /* Same special case as in receive_rcom_lock_args() */
lkb->lkb_grmode = DLM_LOCK_IV; lkb->lkb_grmode = DLM_LOCK_IV;
...@@ -4882,12 +4882,12 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb, ...@@ -4882,12 +4882,12 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
void dlm_recover_waiters_pre(struct dlm_ls *ls) void dlm_recover_waiters_pre(struct dlm_ls *ls)
{ {
struct dlm_lkb *lkb, *safe; struct dlm_lkb *lkb, *safe;
struct dlm_message *ms_stub; struct dlm_message *ms_local;
int wait_type, stub_unlock_result, stub_cancel_result; int wait_type, local_unlock_result, local_cancel_result;
int dir_nodeid; int dir_nodeid;
ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL); ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
if (!ms_stub) if (!ms_local)
return; return;
mutex_lock(&ls->ls_waiters_mutex); mutex_lock(&ls->ls_waiters_mutex);
...@@ -4923,8 +4923,8 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) ...@@ -4923,8 +4923,8 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
continue; continue;
wait_type = lkb->lkb_wait_type; wait_type = lkb->lkb_wait_type;
stub_unlock_result = -DLM_EUNLOCK; local_unlock_result = -DLM_EUNLOCK;
stub_cancel_result = -DLM_ECANCEL; local_cancel_result = -DLM_ECANCEL;
/* Main reply may have been received leaving a zero wait_type, /* Main reply may have been received leaving a zero wait_type,
but a reply for the overlapping op may not have been but a reply for the overlapping op may not have been
...@@ -4935,17 +4935,17 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) ...@@ -4935,17 +4935,17 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
if (is_overlap_cancel(lkb)) { if (is_overlap_cancel(lkb)) {
wait_type = DLM_MSG_CANCEL; wait_type = DLM_MSG_CANCEL;
if (lkb->lkb_grmode == DLM_LOCK_IV) if (lkb->lkb_grmode == DLM_LOCK_IV)
stub_cancel_result = 0; local_cancel_result = 0;
} }
if (is_overlap_unlock(lkb)) { if (is_overlap_unlock(lkb)) {
wait_type = DLM_MSG_UNLOCK; wait_type = DLM_MSG_UNLOCK;
if (lkb->lkb_grmode == DLM_LOCK_IV) if (lkb->lkb_grmode == DLM_LOCK_IV)
stub_unlock_result = -ENOENT; local_unlock_result = -ENOENT;
} }
log_debug(ls, "rwpre overlap %x %x %d %d %d", log_debug(ls, "rwpre overlap %x %x %d %d %d",
lkb->lkb_id, lkb->lkb_flags, wait_type, lkb->lkb_id, lkb->lkb_flags, wait_type,
stub_cancel_result, stub_unlock_result); local_cancel_result, local_unlock_result);
} }
switch (wait_type) { switch (wait_type) {
...@@ -4955,28 +4955,28 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) ...@@ -4955,28 +4955,28 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
break; break;
case DLM_MSG_CONVERT: case DLM_MSG_CONVERT:
recover_convert_waiter(ls, lkb, ms_stub); recover_convert_waiter(ls, lkb, ms_local);
break; break;
case DLM_MSG_UNLOCK: case DLM_MSG_UNLOCK:
hold_lkb(lkb); hold_lkb(lkb);
memset(ms_stub, 0, sizeof(struct dlm_message)); memset(ms_local, 0, sizeof(struct dlm_message));
ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS); ms_local->m_flags = cpu_to_le32(DLM_IFL_LOCAL_MS);
ms_stub->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY); ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_unlock_result)); ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
_receive_unlock_reply(lkb, ms_stub); _receive_unlock_reply(lkb, ms_local);
dlm_put_lkb(lkb); dlm_put_lkb(lkb);
break; break;
case DLM_MSG_CANCEL: case DLM_MSG_CANCEL:
hold_lkb(lkb); hold_lkb(lkb);
memset(ms_stub, 0, sizeof(struct dlm_message)); memset(ms_local, 0, sizeof(struct dlm_message));
ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS); ms_local->m_flags = cpu_to_le32(DLM_IFL_LOCAL_MS);
ms_stub->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY); ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_cancel_result)); ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
_receive_cancel_reply(lkb, ms_stub); _receive_cancel_reply(lkb, ms_local);
dlm_put_lkb(lkb); dlm_put_lkb(lkb);
break; break;
...@@ -4987,7 +4987,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) ...@@ -4987,7 +4987,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
schedule(); schedule();
} }
mutex_unlock(&ls->ls_waiters_mutex); mutex_unlock(&ls->ls_waiters_mutex);
kfree(ms_stub); kfree(ms_local);
} }
static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
......
...@@ -529,8 +529,8 @@ static int new_lockspace(const char *name, const char *cluster, ...@@ -529,8 +529,8 @@ static int new_lockspace(const char *name, const char *cluster,
ls->ls_total_weight = 0; ls->ls_total_weight = 0;
ls->ls_node_array = NULL; ls->ls_node_array = NULL;
memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb)); memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
ls->ls_stub_rsb.res_ls = ls; ls->ls_local_rsb.res_ls = ls;
ls->ls_debug_rsb_dentry = NULL; ls->ls_debug_rsb_dentry = NULL;
ls->ls_debug_waiters_dentry = NULL; ls->ls_debug_waiters_dentry = NULL;
......
...@@ -54,7 +54,7 @@ ...@@ -54,7 +54,7 @@
{ DLM_IFL_OVERLAP_CANCEL, "OVERLAP_CANCEL" }, \ { DLM_IFL_OVERLAP_CANCEL, "OVERLAP_CANCEL" }, \
{ DLM_IFL_ENDOFLIFE, "ENDOFLIFE" }, \ { DLM_IFL_ENDOFLIFE, "ENDOFLIFE" }, \
{ DLM_IFL_DEADLOCK_CANCEL, "DEADLOCK_CANCEL" }, \ { DLM_IFL_DEADLOCK_CANCEL, "DEADLOCK_CANCEL" }, \
{ DLM_IFL_STUB_MS, "STUB_MS" }, \ { DLM_IFL_LOCAL_MS, "LOCAL_MS" }, \
{ DLM_IFL_USER, "USER" }, \ { DLM_IFL_USER, "USER" }, \
{ DLM_IFL_ORPHAN, "ORPHAN" }) { DLM_IFL_ORPHAN, "ORPHAN" })
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment