Commit a2b1da09 authored by Ilya Dryomov's avatar Ilya Dryomov

rbd: lock should be quiesced on reacquire

Quiesce exclusive lock at the top of rbd_reacquire_lock() instead
of only when ceph_cls_set_cookie() fails.  This avoids a deadlock on
rbd_dev->lock_rwsem.

If rbd_dev->lock_rwsem is needed for I/O completion, set_cookie can
hang ceph-msgr worker thread if set_cookie reply ends up behind an I/O
reply, because, like lock and unlock requests, set_cookie is sent and
waited upon with rbd_dev->lock_rwsem held for write.
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
Reviewed-by: default avatarDongsheng Yang <dongsheng.yang@easystack.cn>
parent 793333a3
...@@ -3004,6 +3004,7 @@ static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie) ...@@ -3004,6 +3004,7 @@ static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
{ {
struct rbd_client_id cid = rbd_get_cid(rbd_dev); struct rbd_client_id cid = rbd_get_cid(rbd_dev);
rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
strcpy(rbd_dev->lock_cookie, cookie); strcpy(rbd_dev->lock_cookie, cookie);
rbd_set_owner_cid(rbd_dev, &cid); rbd_set_owner_cid(rbd_dev, &cid);
queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work); queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
...@@ -3028,7 +3029,6 @@ static int rbd_lock(struct rbd_device *rbd_dev) ...@@ -3028,7 +3029,6 @@ static int rbd_lock(struct rbd_device *rbd_dev)
if (ret) if (ret)
return ret; return ret;
rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
__rbd_lock(rbd_dev, cookie); __rbd_lock(rbd_dev, cookie);
return 0; return 0;
} }
...@@ -3411,13 +3411,11 @@ static void rbd_acquire_lock(struct work_struct *work) ...@@ -3411,13 +3411,11 @@ static void rbd_acquire_lock(struct work_struct *work)
} }
} }
/* static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
* lock_rwsem must be held for write
*/
static bool rbd_release_lock(struct rbd_device *rbd_dev)
{ {
dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, dout("%s rbd_dev %p\n", __func__, rbd_dev);
rbd_dev->lock_state); lockdep_assert_held_exclusive(&rbd_dev->lock_rwsem);
if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
return false; return false;
...@@ -3433,12 +3431,22 @@ static bool rbd_release_lock(struct rbd_device *rbd_dev) ...@@ -3433,12 +3431,22 @@ static bool rbd_release_lock(struct rbd_device *rbd_dev)
up_read(&rbd_dev->lock_rwsem); up_read(&rbd_dev->lock_rwsem);
down_write(&rbd_dev->lock_rwsem); down_write(&rbd_dev->lock_rwsem);
dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
rbd_dev->lock_state);
if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING) if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
return false; return false;
return true;
}
/*
* lock_rwsem must be held for write
*/
static void rbd_release_lock(struct rbd_device *rbd_dev)
{
if (!rbd_quiesce_lock(rbd_dev))
return;
rbd_unlock(rbd_dev); rbd_unlock(rbd_dev);
/* /*
* Give others a chance to grab the lock - we would re-acquire * Give others a chance to grab the lock - we would re-acquire
* almost immediately if we got new IO during ceph_osdc_sync() * almost immediately if we got new IO during ceph_osdc_sync()
...@@ -3447,7 +3455,6 @@ static bool rbd_release_lock(struct rbd_device *rbd_dev) ...@@ -3447,7 +3455,6 @@ static bool rbd_release_lock(struct rbd_device *rbd_dev)
* after wake_requests() in rbd_handle_released_lock(). * after wake_requests() in rbd_handle_released_lock().
*/ */
cancel_delayed_work(&rbd_dev->lock_dwork); cancel_delayed_work(&rbd_dev->lock_dwork);
return true;
} }
static void rbd_release_lock_work(struct work_struct *work) static void rbd_release_lock_work(struct work_struct *work)
...@@ -3795,7 +3802,8 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev) ...@@ -3795,7 +3802,8 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
char cookie[32]; char cookie[32];
int ret; int ret;
WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); if (!rbd_quiesce_lock(rbd_dev))
return;
format_lock_cookie(rbd_dev, cookie); format_lock_cookie(rbd_dev, cookie);
ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid, ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
...@@ -3811,9 +3819,8 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev) ...@@ -3811,9 +3819,8 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
* Lock cookie cannot be updated on older OSDs, so do * Lock cookie cannot be updated on older OSDs, so do
* a manual release and queue an acquire. * a manual release and queue an acquire.
*/ */
if (rbd_release_lock(rbd_dev)) rbd_unlock(rbd_dev);
queue_delayed_work(rbd_dev->task_wq, queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
&rbd_dev->lock_dwork, 0);
} else { } else {
__rbd_lock(rbd_dev, cookie); __rbd_lock(rbd_dev, cookie);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment