Commit 6b52ea79 authored by Alexander Aring's avatar Alexander Aring Committed by David Teigland

dlm: drop mutex use in waiters recovery

The waiters_mutex no longer needs to be used in the waiters recovery
functions dlm_recover_waiters_pre() and dlm_recover_waiters_pre().
During recovery, ordinary locking operations are paused, and the
recovery thread is the only context accessing the waiters list,
so the lock is not needed.

Access to the waiters list from debugfs functions is avoided by
taking the top level recovery lock in the debugfs dump function.
Signed-off-by: default avatarAlexander Aring <aahringo@redhat.com>
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent 3ae67760
...@@ -737,6 +737,12 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf, ...@@ -737,6 +737,12 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
size_t len = DLM_DEBUG_BUF_LEN, pos = 0, ret, rv; size_t len = DLM_DEBUG_BUF_LEN, pos = 0, ret, rv;
mutex_lock(&debug_buf_lock); mutex_lock(&debug_buf_lock);
ret = dlm_lock_recovery_try(ls);
if (!ret) {
rv = -EAGAIN;
goto out;
}
mutex_lock(&ls->ls_waiters_mutex); mutex_lock(&ls->ls_waiters_mutex);
memset(debug_buf, 0, sizeof(debug_buf)); memset(debug_buf, 0, sizeof(debug_buf));
...@@ -749,8 +755,10 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf, ...@@ -749,8 +755,10 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
pos += ret; pos += ret;
} }
mutex_unlock(&ls->ls_waiters_mutex); mutex_unlock(&ls->ls_waiters_mutex);
dlm_unlock_recovery(ls);
rv = simple_read_from_buffer(userbuf, count, ppos, debug_buf, pos); rv = simple_read_from_buffer(userbuf, count, ppos, debug_buf, pos);
out:
mutex_unlock(&debug_buf_lock); mutex_unlock(&debug_buf_lock);
return rv; return rv;
} }
...@@ -772,7 +780,12 @@ static ssize_t waiters_write(struct file *file, const char __user *user_buf, ...@@ -772,7 +780,12 @@ static ssize_t waiters_write(struct file *file, const char __user *user_buf,
if (n != 3) if (n != 3)
return -EINVAL; return -EINVAL;
error = dlm_lock_recovery_try(ls);
if (!error)
return -EAGAIN;
error = dlm_debug_add_lkb_to_waiters(ls, lkb_id, mstype, to_nodeid); error = dlm_debug_add_lkb_to_waiters(ls, lkb_id, mstype, to_nodeid);
dlm_unlock_recovery(ls);
if (error) if (error)
return error; return error;
......
...@@ -201,7 +201,7 @@ void dlm_dump_rsb(struct dlm_rsb *r) ...@@ -201,7 +201,7 @@ void dlm_dump_rsb(struct dlm_rsb *r)
/* Threads cannot use the lockspace while it's being recovered */ /* Threads cannot use the lockspace while it's being recovered */
static inline void dlm_lock_recovery(struct dlm_ls *ls) void dlm_lock_recovery(struct dlm_ls *ls)
{ {
down_read(&ls->ls_in_recovery); down_read(&ls->ls_in_recovery);
} }
...@@ -1556,7 +1556,11 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) ...@@ -1556,7 +1556,11 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
} }
/* Handles situations where we might be processing a "fake" or "local" reply in /* Handles situations where we might be processing a "fake" or "local" reply in
which we can't try to take waiters_mutex again. */ * the recovery context which stops any locking activity. Only debugfs might
* change the lockspace waiters but they will held the recovery lock to ensure
* remove_from_waiters_ms() in local case will be the only user manipulating the
* lockspace waiters in recovery context.
*/
static int remove_from_waiters_ms(struct dlm_lkb *lkb, static int remove_from_waiters_ms(struct dlm_lkb *lkb,
const struct dlm_message *ms, bool local) const struct dlm_message *ms, bool local)
...@@ -1566,6 +1570,9 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, ...@@ -1566,6 +1570,9 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb,
if (!local) if (!local)
mutex_lock(&ls->ls_waiters_mutex); mutex_lock(&ls->ls_waiters_mutex);
else
WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
!dlm_locking_stopped(ls));
error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms); error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
if (!local) if (!local)
mutex_unlock(&ls->ls_waiters_mutex); mutex_unlock(&ls->ls_waiters_mutex);
...@@ -4398,7 +4405,6 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, ...@@ -4398,7 +4405,6 @@ static void _receive_convert_reply(struct dlm_lkb *lkb,
if (error) if (error)
goto out; goto out;
/* local reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms, local); error = remove_from_waiters_ms(lkb, ms, local);
if (error) if (error)
goto out; goto out;
...@@ -4437,7 +4443,6 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, ...@@ -4437,7 +4443,6 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb,
if (error) if (error)
goto out; goto out;
/* local reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms, local); error = remove_from_waiters_ms(lkb, ms, local);
if (error) if (error)
goto out; goto out;
...@@ -4489,7 +4494,6 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, ...@@ -4489,7 +4494,6 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb,
if (error) if (error)
goto out; goto out;
/* local reply can happen with waiters_mutex held */
error = remove_from_waiters_ms(lkb, ms, local); error = remove_from_waiters_ms(lkb, ms, local);
if (error) if (error)
goto out; goto out;
...@@ -4890,8 +4894,6 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) ...@@ -4890,8 +4894,6 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
if (!ms_local) if (!ms_local)
return; return;
mutex_lock(&ls->ls_waiters_mutex);
list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource); dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
...@@ -4984,7 +4986,6 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) ...@@ -4984,7 +4986,6 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
} }
schedule(); schedule();
} }
mutex_unlock(&ls->ls_waiters_mutex);
kfree(ms_local); kfree(ms_local);
} }
......
...@@ -23,6 +23,7 @@ void dlm_hold_rsb(struct dlm_rsb *r); ...@@ -23,6 +23,7 @@ void dlm_hold_rsb(struct dlm_rsb *r);
int dlm_put_lkb(struct dlm_lkb *lkb); int dlm_put_lkb(struct dlm_lkb *lkb);
void dlm_scan_rsbs(struct dlm_ls *ls); void dlm_scan_rsbs(struct dlm_ls *ls);
int dlm_lock_recovery_try(struct dlm_ls *ls); int dlm_lock_recovery_try(struct dlm_ls *ls);
void dlm_lock_recovery(struct dlm_ls *ls);
void dlm_unlock_recovery(struct dlm_ls *ls); void dlm_unlock_recovery(struct dlm_ls *ls);
int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment