Commit ba2bf218 authored by Kurt Hackel's avatar Kurt Hackel Committed by Mark Fasheh

ocfs2_dlm: fix cluster-wide refcounting of lock resources

This was previously broken and migration of some locks had to be temporarily
disabled. We use a new (and backward-incompatible) set of network messages
to account for all references to a lock resources held across the cluster.
once these are all freed, the master node may then free the lock resource
memory once its local references are dropped.
Signed-off-by: default avatarKurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: default avatarMark Fasheh <mark.fasheh@oracle.com>
parent 5331be09
...@@ -38,6 +38,9 @@ ...@@ -38,6 +38,9 @@
* locking semantics of the file system using the protocol. It should * locking semantics of the file system using the protocol. It should
* be somewhere else, I'm sure, but right now it isn't. * be somewhere else, I'm sure, but right now it isn't.
* *
* New in version 6:
* - DLM lockres remote refcount fixes.
*
* New in version 5: * New in version 5:
* - Network timeout checking protocol * - Network timeout checking protocol
* *
...@@ -51,7 +54,7 @@ ...@@ -51,7 +54,7 @@
* - full 64 bit i_size in the metadata lock lvbs * - full 64 bit i_size in the metadata lock lvbs
* - introduction of "rw" lock and pushing meta/data locking down * - introduction of "rw" lock and pushing meta/data locking down
*/ */
#define O2NET_PROTOCOL_VERSION 5ULL #define O2NET_PROTOCOL_VERSION 6ULL
struct o2net_handshake { struct o2net_handshake {
__be64 protocol_version; __be64 protocol_version;
__be64 connector_id; __be64 connector_id;
......
...@@ -222,6 +222,7 @@ static inline void __dlm_set_joining_node(struct dlm_ctxt *dlm, ...@@ -222,6 +222,7 @@ static inline void __dlm_set_joining_node(struct dlm_ctxt *dlm,
#define DLM_LOCK_RES_DIRTY 0x00000008 #define DLM_LOCK_RES_DIRTY 0x00000008
#define DLM_LOCK_RES_IN_PROGRESS 0x00000010 #define DLM_LOCK_RES_IN_PROGRESS 0x00000010
#define DLM_LOCK_RES_MIGRATING 0x00000020 #define DLM_LOCK_RES_MIGRATING 0x00000020
#define DLM_LOCK_RES_DROPPING_REF 0x00000040
/* max milliseconds to wait to sync up a network failure with a node death */ /* max milliseconds to wait to sync up a network failure with a node death */
#define DLM_NODE_DEATH_WAIT_MAX (5 * 1000) #define DLM_NODE_DEATH_WAIT_MAX (5 * 1000)
...@@ -265,6 +266,8 @@ struct dlm_lock_resource ...@@ -265,6 +266,8 @@ struct dlm_lock_resource
u8 owner; //node which owns the lock resource, or unknown u8 owner; //node which owns the lock resource, or unknown
u16 state; u16 state;
char lvb[DLM_LVB_LEN]; char lvb[DLM_LVB_LEN];
unsigned int inflight_locks;
unsigned long refmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
}; };
struct dlm_migratable_lock struct dlm_migratable_lock
...@@ -367,7 +370,7 @@ enum { ...@@ -367,7 +370,7 @@ enum {
DLM_CONVERT_LOCK_MSG, /* 504 */ DLM_CONVERT_LOCK_MSG, /* 504 */
DLM_PROXY_AST_MSG, /* 505 */ DLM_PROXY_AST_MSG, /* 505 */
DLM_UNLOCK_LOCK_MSG, /* 506 */ DLM_UNLOCK_LOCK_MSG, /* 506 */
DLM_UNUSED_MSG2, /* 507 */ DLM_DEREF_LOCKRES_MSG, /* 507 */
DLM_MIGRATE_REQUEST_MSG, /* 508 */ DLM_MIGRATE_REQUEST_MSG, /* 508 */
DLM_MIG_LOCKRES_MSG, /* 509 */ DLM_MIG_LOCKRES_MSG, /* 509 */
DLM_QUERY_JOIN_MSG, /* 510 */ DLM_QUERY_JOIN_MSG, /* 510 */
...@@ -417,6 +420,9 @@ struct dlm_master_request ...@@ -417,6 +420,9 @@ struct dlm_master_request
u8 name[O2NM_MAX_NAME_LEN]; u8 name[O2NM_MAX_NAME_LEN];
}; };
#define DLM_ASSERT_RESPONSE_REASSERT 0x00000001
#define DLM_ASSERT_RESPONSE_MASTERY_REF 0x00000002
#define DLM_ASSERT_MASTER_MLE_CLEANUP 0x00000001 #define DLM_ASSERT_MASTER_MLE_CLEANUP 0x00000001
#define DLM_ASSERT_MASTER_REQUERY 0x00000002 #define DLM_ASSERT_MASTER_REQUERY 0x00000002
#define DLM_ASSERT_MASTER_FINISH_MIGRATION 0x00000004 #define DLM_ASSERT_MASTER_FINISH_MIGRATION 0x00000004
...@@ -430,6 +436,8 @@ struct dlm_assert_master ...@@ -430,6 +436,8 @@ struct dlm_assert_master
u8 name[O2NM_MAX_NAME_LEN]; u8 name[O2NM_MAX_NAME_LEN];
}; };
#define DLM_MIGRATE_RESPONSE_MASTERY_REF 0x00000001
struct dlm_migrate_request struct dlm_migrate_request
{ {
u8 master; u8 master;
...@@ -648,6 +656,16 @@ struct dlm_finalize_reco ...@@ -648,6 +656,16 @@ struct dlm_finalize_reco
__be32 pad2; __be32 pad2;
}; };
struct dlm_deref_lockres
{
u32 pad1;
u16 pad2;
u8 node_idx;
u8 namelen;
u8 name[O2NM_MAX_NAME_LEN];
};
static inline enum dlm_status static inline enum dlm_status
__dlm_lockres_state_to_status(struct dlm_lock_resource *res) __dlm_lockres_state_to_status(struct dlm_lock_resource *res)
{ {
...@@ -721,8 +739,8 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm, ...@@ -721,8 +739,8 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res); struct dlm_lock_resource *res);
void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res); struct dlm_lock_resource *res);
void dlm_purge_lockres(struct dlm_ctxt *dlm, int dlm_purge_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *lockres); struct dlm_lock_resource *lockres);
static inline void dlm_lockres_get(struct dlm_lock_resource *res) static inline void dlm_lockres_get(struct dlm_lock_resource *res)
{ {
/* This is called on every lookup, so it might be worth /* This is called on every lookup, so it might be worth
...@@ -733,6 +751,10 @@ void dlm_lockres_put(struct dlm_lock_resource *res); ...@@ -733,6 +751,10 @@ void dlm_lockres_put(struct dlm_lock_resource *res);
void __dlm_unhash_lockres(struct dlm_lock_resource *res); void __dlm_unhash_lockres(struct dlm_lock_resource *res);
void __dlm_insert_lockres(struct dlm_ctxt *dlm, void __dlm_insert_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res); struct dlm_lock_resource *res);
struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
const char *name,
unsigned int len,
unsigned int hash);
struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
const char *name, const char *name,
unsigned int len, unsigned int len,
...@@ -753,6 +775,47 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, ...@@ -753,6 +775,47 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
const char *name, const char *name,
unsigned int namelen); unsigned int namelen);
#define dlm_lockres_set_refmap_bit(bit,res) \
__dlm_lockres_set_refmap_bit(bit,res,__FILE__,__LINE__)
#define dlm_lockres_clear_refmap_bit(bit,res) \
__dlm_lockres_clear_refmap_bit(bit,res,__FILE__,__LINE__)
static inline void __dlm_lockres_set_refmap_bit(int bit,
struct dlm_lock_resource *res,
const char *file,
int line)
{
//printk("%s:%d:%.*s: setting bit %d\n", file, line,
// res->lockname.len, res->lockname.name, bit);
set_bit(bit, res->refmap);
}
static inline void __dlm_lockres_clear_refmap_bit(int bit,
struct dlm_lock_resource *res,
const char *file,
int line)
{
//printk("%s:%d:%.*s: clearing bit %d\n", file, line,
// res->lockname.len, res->lockname.name, bit);
clear_bit(bit, res->refmap);
}
void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
const char *file,
int line);
void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
int new_lockres,
const char *file,
int line);
#define dlm_lockres_drop_inflight_ref(d,r) \
__dlm_lockres_drop_inflight_ref(d,r,__FILE__,__LINE__)
#define dlm_lockres_grab_inflight_ref(d,r) \
__dlm_lockres_grab_inflight_ref(d,r,0,__FILE__,__LINE__)
#define dlm_lockres_grab_inflight_ref_new(d,r) \
__dlm_lockres_grab_inflight_ref(d,r,1,__FILE__,__LINE__)
void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock); void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock); void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
void dlm_do_local_ast(struct dlm_ctxt *dlm, void dlm_do_local_ast(struct dlm_ctxt *dlm,
...@@ -805,6 +868,7 @@ int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); ...@@ -805,6 +868,7 @@ int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
int dlm_migrate_lockres(struct dlm_ctxt *dlm, int dlm_migrate_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res, struct dlm_lock_resource *res,
u8 target); u8 target);
int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
int dlm_finish_migration(struct dlm_ctxt *dlm, int dlm_finish_migration(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res, struct dlm_lock_resource *res,
u8 old_master); u8 old_master);
...@@ -814,6 +878,7 @@ void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res); ...@@ -814,6 +878,7 @@ void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res);
int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data); int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data); int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data); int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data); int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data); int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data);
...@@ -856,10 +921,12 @@ static inline void __dlm_wait_on_lockres(struct dlm_lock_resource *res) ...@@ -856,10 +921,12 @@ static inline void __dlm_wait_on_lockres(struct dlm_lock_resource *res)
int dlm_init_mle_cache(void); int dlm_init_mle_cache(void);
void dlm_destroy_mle_cache(void); void dlm_destroy_mle_cache(void);
void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up); void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up);
int dlm_drop_lockres_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
void dlm_clean_master_list(struct dlm_ctxt *dlm, void dlm_clean_master_list(struct dlm_ctxt *dlm,
u8 dead_node); u8 dead_node);
int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock); int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock);
int __dlm_lockres_has_locks(struct dlm_lock_resource *res);
int __dlm_lockres_unused(struct dlm_lock_resource *res); int __dlm_lockres_unused(struct dlm_lock_resource *res);
static inline const char * dlm_lock_mode_name(int mode) static inline const char * dlm_lock_mode_name(int mode)
......
...@@ -53,6 +53,23 @@ void dlm_print_one_lock_resource(struct dlm_lock_resource *res) ...@@ -53,6 +53,23 @@ void dlm_print_one_lock_resource(struct dlm_lock_resource *res)
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
} }
static void dlm_print_lockres_refmap(struct dlm_lock_resource *res)
{
int bit;
assert_spin_locked(&res->spinlock);
mlog(ML_NOTICE, " refmap nodes: [ ");
bit = 0;
while (1) {
bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
if (bit >= O2NM_MAX_NODES)
break;
printk("%u ", bit);
bit++;
}
printk("], inflight=%u\n", res->inflight_locks);
}
void __dlm_print_one_lock_resource(struct dlm_lock_resource *res) void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
{ {
struct list_head *iter2; struct list_head *iter2;
...@@ -65,6 +82,7 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res) ...@@ -65,6 +82,7 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
res->owner, res->state); res->owner, res->state);
mlog(ML_NOTICE, " last used: %lu, on purge list: %s\n", mlog(ML_NOTICE, " last used: %lu, on purge list: %s\n",
res->last_used, list_empty(&res->purge) ? "no" : "yes"); res->last_used, list_empty(&res->purge) ? "no" : "yes");
dlm_print_lockres_refmap(res);
mlog(ML_NOTICE, " granted queue: \n"); mlog(ML_NOTICE, " granted queue: \n");
list_for_each(iter2, &res->granted) { list_for_each(iter2, &res->granted) {
lock = list_entry(iter2, struct dlm_lock, list); lock = list_entry(iter2, struct dlm_lock, list);
......
...@@ -125,10 +125,10 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm, ...@@ -125,10 +125,10 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm,
hlist_add_head(&res->hash_node, bucket); hlist_add_head(&res->hash_node, bucket);
} }
struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
const char *name, const char *name,
unsigned int len, unsigned int len,
unsigned int hash) unsigned int hash)
{ {
struct hlist_head *bucket; struct hlist_head *bucket;
struct hlist_node *list; struct hlist_node *list;
...@@ -154,6 +154,37 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, ...@@ -154,6 +154,37 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
return NULL; return NULL;
} }
/* intended to be called by functions which do not care about lock
* resources which are being purged (most net _handler functions).
* this will return NULL for any lock resource which is found but
* currently in the process of dropping its mastery reference.
* use __dlm_lookup_lockres_full when you need the lock resource
* regardless (e.g. dlm_get_lock_resource) */
struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
const char *name,
unsigned int len,
unsigned int hash)
{
struct dlm_lock_resource *res = NULL;
mlog_entry("%.*s\n", len, name);
assert_spin_locked(&dlm->spinlock);
res = __dlm_lookup_lockres_full(dlm, name, len, hash);
if (res) {
spin_lock(&res->spinlock);
if (res->state & DLM_LOCK_RES_DROPPING_REF) {
spin_unlock(&res->spinlock);
dlm_lockres_put(res);
return NULL;
}
spin_unlock(&res->spinlock);
}
return res;
}
struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
const char *name, const char *name,
unsigned int len) unsigned int len)
...@@ -330,43 +361,60 @@ static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm) ...@@ -330,43 +361,60 @@ static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
wake_up(&dlm_domain_events); wake_up(&dlm_domain_events);
} }
static void dlm_migrate_all_locks(struct dlm_ctxt *dlm) static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
{ {
int i; int i, num, n, ret = 0;
struct dlm_lock_resource *res; struct dlm_lock_resource *res;
struct hlist_node *iter;
struct hlist_head *bucket;
int dropped;
mlog(0, "Migrating locks from domain %s\n", dlm->name); mlog(0, "Migrating locks from domain %s\n", dlm->name);
restart:
num = 0;
spin_lock(&dlm->spinlock); spin_lock(&dlm->spinlock);
for (i = 0; i < DLM_HASH_BUCKETS; i++) { for (i = 0; i < DLM_HASH_BUCKETS; i++) {
while (!hlist_empty(dlm_lockres_hash(dlm, i))) { redo_bucket:
res = hlist_entry(dlm_lockres_hash(dlm, i)->first, n = 0;
struct dlm_lock_resource, hash_node); bucket = dlm_lockres_hash(dlm, i);
/* need reference when manually grabbing lockres */ iter = bucket->first;
while (iter) {
n++;
res = hlist_entry(iter, struct dlm_lock_resource,
hash_node);
dlm_lockres_get(res); dlm_lockres_get(res);
/* this should unhash the lockres /* migrate, if necessary. this will drop the dlm
* and exit with dlm->spinlock */ * spinlock and retake it if it does migration. */
mlog(0, "purging res=%p\n", res); dropped = dlm_empty_lockres(dlm, res);
if (dlm_lockres_is_dirty(dlm, res)) {
/* HACK! this should absolutely go. spin_lock(&res->spinlock);
* need to figure out why some empty __dlm_lockres_calc_usage(dlm, res);
* lockreses are still marked dirty */ iter = res->hash_node.next;
mlog(ML_ERROR, "lockres %.*s dirty!\n", spin_unlock(&res->spinlock);
res->lockname.len, res->lockname.name);
spin_unlock(&dlm->spinlock);
dlm_kick_thread(dlm, res);
wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
dlm_lockres_put(res);
goto restart;
}
dlm_purge_lockres(dlm, res);
dlm_lockres_put(res); dlm_lockres_put(res);
cond_resched_lock(&dlm->spinlock);
if (dropped)
goto redo_bucket;
} }
num += n;
mlog(0, "%s: touched %d lockreses in bucket %d "
"(tot=%d)\n", dlm->name, n, i, num);
} }
spin_unlock(&dlm->spinlock); spin_unlock(&dlm->spinlock);
wake_up(&dlm->dlm_thread_wq);
/* let the dlm thread take care of purging, keep scanning until
* nothing remains in the hash */
if (num) {
mlog(0, "%s: %d lock resources in hash last pass\n",
dlm->name, num);
ret = -EAGAIN;
}
mlog(0, "DONE Migrating locks from domain %s\n", dlm->name); mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
return ret;
} }
static int dlm_no_joining_node(struct dlm_ctxt *dlm) static int dlm_no_joining_node(struct dlm_ctxt *dlm)
...@@ -571,7 +619,9 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm) ...@@ -571,7 +619,9 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm)
/* We changed dlm state, notify the thread */ /* We changed dlm state, notify the thread */
dlm_kick_thread(dlm, NULL); dlm_kick_thread(dlm, NULL);
dlm_migrate_all_locks(dlm); while (dlm_migrate_all_locks(dlm)) {
mlog(0, "%s: more migration to do\n", dlm->name);
}
dlm_mark_domain_leaving(dlm); dlm_mark_domain_leaving(dlm);
dlm_leave_domain(dlm); dlm_leave_domain(dlm);
dlm_complete_dlm_shutdown(dlm); dlm_complete_dlm_shutdown(dlm);
...@@ -1082,6 +1132,13 @@ static int dlm_register_domain_handlers(struct dlm_ctxt *dlm) ...@@ -1082,6 +1132,13 @@ static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
if (status) if (status)
goto bail; goto bail;
status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
sizeof(struct dlm_deref_lockres),
dlm_deref_lockres_handler,
dlm, &dlm->dlm_domain_handlers);
if (status)
goto bail;
status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key, status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
sizeof(struct dlm_migrate_request), sizeof(struct dlm_migrate_request),
dlm_migrate_request_handler, dlm_migrate_request_handler,
......
...@@ -163,6 +163,10 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm, ...@@ -163,6 +163,10 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
kick_thread = 1; kick_thread = 1;
} }
} }
/* reduce the inflight count, this may result in the lockres
* being purged below during calc_usage */
if (lock->ml.node == dlm->node_num)
dlm_lockres_drop_inflight_ref(dlm, res);
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
wake_up(&res->wq); wake_up(&res->wq);
......
This diff is collapsed.
...@@ -1129,6 +1129,11 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm, ...@@ -1129,6 +1129,11 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
if (total_locks == mres_total_locks) if (total_locks == mres_total_locks)
mres->flags |= DLM_MRES_ALL_DONE; mres->flags |= DLM_MRES_ALL_DONE;
mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",
dlm->name, res->lockname.len, res->lockname.name,
orig_flags & DLM_MRES_MIGRATION ? "migrate" : "recovery",
send_to);
/* send it */ /* send it */
ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres, ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
sz, send_to, &status); sz, send_to, &status);
...@@ -1213,6 +1218,34 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock, ...@@ -1213,6 +1218,34 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
return 0; return 0;
} }
static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
struct dlm_migratable_lockres *mres)
{
struct dlm_lock dummy;
memset(&dummy, 0, sizeof(dummy));
dummy.ml.cookie = 0;
dummy.ml.type = LKM_IVMODE;
dummy.ml.convert_type = LKM_IVMODE;
dummy.ml.highest_blocked = LKM_IVMODE;
dummy.lksb = NULL;
dummy.ml.node = dlm->node_num;
dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
}
static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
struct dlm_migratable_lock *ml,
u8 *nodenum)
{
if (unlikely(ml->cookie == 0 &&
ml->type == LKM_IVMODE &&
ml->convert_type == LKM_IVMODE &&
ml->highest_blocked == LKM_IVMODE &&
ml->list == DLM_BLOCKED_LIST)) {
*nodenum = ml->node;
return 1;
}
return 0;
}
int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
struct dlm_migratable_lockres *mres, struct dlm_migratable_lockres *mres,
...@@ -1260,6 +1293,14 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, ...@@ -1260,6 +1293,14 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
goto error; goto error;
} }
} }
if (total_locks == 0) {
/* send a dummy lock to indicate a mastery reference only */
mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",
dlm->name, res->lockname.len, res->lockname.name,
send_to, flags & DLM_MRES_RECOVERY ? "recovery" :
"migration");
dlm_add_dummy_lock(dlm, mres);
}
/* flush any remaining locks */ /* flush any remaining locks */
ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
if (ret < 0) if (ret < 0)
...@@ -1386,13 +1427,16 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data) ...@@ -1386,13 +1427,16 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
/* add an extra ref for just-allocated lockres /* add an extra ref for just-allocated lockres
* otherwise the lockres will be purged immediately */ * otherwise the lockres will be purged immediately */
dlm_lockres_get(res); dlm_lockres_get(res);
} }
/* at this point we have allocated everything we need, /* at this point we have allocated everything we need,
* and we have a hashed lockres with an extra ref and * and we have a hashed lockres with an extra ref and
* the proper res->state flags. */ * the proper res->state flags. */
ret = 0; ret = 0;
spin_lock(&res->spinlock);
/* drop this either when master requery finds a different master
* or when a lock is added by the recovery worker */
dlm_lockres_grab_inflight_ref(dlm, res);
if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) { if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
/* migration cannot have an unknown master */ /* migration cannot have an unknown master */
BUG_ON(!(mres->flags & DLM_MRES_RECOVERY)); BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
...@@ -1400,10 +1444,11 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data) ...@@ -1400,10 +1444,11 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
"unknown owner.. will need to requery: " "unknown owner.. will need to requery: "
"%.*s\n", mres->lockname_len, mres->lockname); "%.*s\n", mres->lockname_len, mres->lockname);
} else { } else {
spin_lock(&res->spinlock); /* take a reference now to pin the lockres, drop it
* when locks are added in the worker */
dlm_change_lockres_owner(dlm, res, dlm->node_num); dlm_change_lockres_owner(dlm, res, dlm->node_num);
spin_unlock(&res->spinlock);
} }
spin_unlock(&res->spinlock);
/* queue up work for dlm_mig_lockres_worker */ /* queue up work for dlm_mig_lockres_worker */
dlm_grab(dlm); /* get an extra ref for the work item */ dlm_grab(dlm); /* get an extra ref for the work item */
...@@ -1459,6 +1504,9 @@ static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data) ...@@ -1459,6 +1504,9 @@ static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data)
"this node will take it.\n", "this node will take it.\n",
res->lockname.len, res->lockname.name); res->lockname.len, res->lockname.name);
} else { } else {
spin_lock(&res->spinlock);
dlm_lockres_drop_inflight_ref(dlm, res);
spin_unlock(&res->spinlock);
mlog(0, "master needs to respond to sender " mlog(0, "master needs to respond to sender "
"that node %u still owns %.*s\n", "that node %u still owns %.*s\n",
real_master, res->lockname.len, real_master, res->lockname.len,
...@@ -1666,10 +1714,25 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, ...@@ -1666,10 +1714,25 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
int i, bad; int i, bad;
struct list_head *iter; struct list_head *iter;
struct dlm_lock *lock = NULL; struct dlm_lock *lock = NULL;
u8 from = O2NM_MAX_NODES;
unsigned int added = 0;
mlog(0, "running %d locks for this lockres\n", mres->num_locks); mlog(0, "running %d locks for this lockres\n", mres->num_locks);
for (i=0; i<mres->num_locks; i++) { for (i=0; i<mres->num_locks; i++) {
ml = &(mres->ml[i]); ml = &(mres->ml[i]);
if (dlm_is_dummy_lock(dlm, ml, &from)) {
/* placeholder, just need to set the refmap bit */
BUG_ON(mres->num_locks != 1);
mlog(0, "%s:%.*s: dummy lock for %u\n",
dlm->name, mres->lockname_len, mres->lockname,
from);
spin_lock(&res->spinlock);
dlm_lockres_set_refmap_bit(from, res);
spin_unlock(&res->spinlock);
added++;
break;
}
BUG_ON(ml->highest_blocked != LKM_IVMODE); BUG_ON(ml->highest_blocked != LKM_IVMODE);
newlock = NULL; newlock = NULL;
lksb = NULL; lksb = NULL;
...@@ -1711,6 +1774,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, ...@@ -1711,6 +1774,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
/* do not alter lock refcount. switching lists. */ /* do not alter lock refcount. switching lists. */
list_move_tail(&lock->list, queue); list_move_tail(&lock->list, queue);
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
added++;
mlog(0, "just reordered a local lock!\n"); mlog(0, "just reordered a local lock!\n");
continue; continue;
...@@ -1817,12 +1881,24 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, ...@@ -1817,12 +1881,24 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
if (!bad) { if (!bad) {
dlm_lock_get(newlock); dlm_lock_get(newlock);
list_add_tail(&newlock->list, queue); list_add_tail(&newlock->list, queue);
mlog(0, "%s:%.*s: added lock for node %u, "
"setting refmap bit\n", dlm->name,
res->lockname.len, res->lockname.name, ml->node);
dlm_lockres_set_refmap_bit(ml->node, res);
added++;
} }
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
} }
mlog(0, "done running all the locks\n"); mlog(0, "done running all the locks\n");
leave: leave:
/* balance the ref taken when the work was queued */
if (added > 0) {
spin_lock(&res->spinlock);
dlm_lockres_drop_inflight_ref(dlm, res);
spin_unlock(&res->spinlock);
}
if (ret < 0) { if (ret < 0) {
mlog_errno(ret); mlog_errno(ret);
if (newlock) if (newlock)
...@@ -1935,9 +2011,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, ...@@ -1935,9 +2011,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
if (res->owner == dead_node) { if (res->owner == dead_node) {
list_del_init(&res->recovering); list_del_init(&res->recovering);
spin_lock(&res->spinlock); spin_lock(&res->spinlock);
/* new_master has our reference from
* the lock state sent during recovery */
dlm_change_lockres_owner(dlm, res, new_master); dlm_change_lockres_owner(dlm, res, new_master);
res->state &= ~DLM_LOCK_RES_RECOVERING; res->state &= ~DLM_LOCK_RES_RECOVERING;
if (!__dlm_lockres_unused(res)) if (__dlm_lockres_has_locks(res))
__dlm_dirty_lockres(dlm, res); __dlm_dirty_lockres(dlm, res);
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
wake_up(&res->wq); wake_up(&res->wq);
...@@ -1977,9 +2055,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, ...@@ -1977,9 +2055,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
dlm_lockres_put(res); dlm_lockres_put(res);
} }
spin_lock(&res->spinlock); spin_lock(&res->spinlock);
/* new_master has our reference from
* the lock state sent during recovery */
dlm_change_lockres_owner(dlm, res, new_master); dlm_change_lockres_owner(dlm, res, new_master);
res->state &= ~DLM_LOCK_RES_RECOVERING; res->state &= ~DLM_LOCK_RES_RECOVERING;
if (!__dlm_lockres_unused(res)) if (__dlm_lockres_has_locks(res))
__dlm_dirty_lockres(dlm, res); __dlm_dirty_lockres(dlm, res);
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
wake_up(&res->wq); wake_up(&res->wq);
...@@ -2048,6 +2128,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm, ...@@ -2048,6 +2128,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
{ {
struct list_head *iter, *tmpiter; struct list_head *iter, *tmpiter;
struct dlm_lock *lock; struct dlm_lock *lock;
unsigned int freed = 0;
/* this node is the lockres master: /* this node is the lockres master:
* 1) remove any stale locks for the dead node * 1) remove any stale locks for the dead node
...@@ -2062,6 +2143,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm, ...@@ -2062,6 +2143,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
if (lock->ml.node == dead_node) { if (lock->ml.node == dead_node) {
list_del_init(&lock->list); list_del_init(&lock->list);
dlm_lock_put(lock); dlm_lock_put(lock);
freed++;
} }
} }
list_for_each_safe(iter, tmpiter, &res->converting) { list_for_each_safe(iter, tmpiter, &res->converting) {
...@@ -2069,6 +2151,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm, ...@@ -2069,6 +2151,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
if (lock->ml.node == dead_node) { if (lock->ml.node == dead_node) {
list_del_init(&lock->list); list_del_init(&lock->list);
dlm_lock_put(lock); dlm_lock_put(lock);
freed++;
} }
} }
list_for_each_safe(iter, tmpiter, &res->blocked) { list_for_each_safe(iter, tmpiter, &res->blocked) {
...@@ -2076,9 +2159,23 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm, ...@@ -2076,9 +2159,23 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
if (lock->ml.node == dead_node) { if (lock->ml.node == dead_node) {
list_del_init(&lock->list); list_del_init(&lock->list);
dlm_lock_put(lock); dlm_lock_put(lock);
freed++;
} }
} }
if (freed) {
mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
"dropping ref from lockres\n", dlm->name,
res->lockname.len, res->lockname.name, freed, dead_node);
BUG_ON(!test_bit(dead_node, res->refmap));
dlm_lockres_clear_refmap_bit(dead_node, res);
} else if (test_bit(dead_node, res->refmap)) {
mlog(0, "%s:%.*s: dead node %u had a ref, but had "
"no locks and had not purged before dying\n", dlm->name,
res->lockname.len, res->lockname.name, dead_node);
dlm_lockres_clear_refmap_bit(dead_node, res);
}
/* do not kick thread yet */ /* do not kick thread yet */
__dlm_dirty_lockres(dlm, res); __dlm_dirty_lockres(dlm, res);
} }
...@@ -2141,9 +2238,21 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) ...@@ -2141,9 +2238,21 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
spin_lock(&res->spinlock); spin_lock(&res->spinlock);
/* zero the lvb if necessary */ /* zero the lvb if necessary */
dlm_revalidate_lvb(dlm, res, dead_node); dlm_revalidate_lvb(dlm, res, dead_node);
if (res->owner == dead_node) if (res->owner == dead_node) {
if (res->state & DLM_LOCK_RES_DROPPING_REF)
mlog(0, "%s:%.*s: owned by "
"dead node %u, this node was "
"dropping its ref when it died. "
"continue, dropping the flag.\n",
dlm->name, res->lockname.len,
res->lockname.name, dead_node);
/* the wake_up for this will happen when the
* RECOVERING flag is dropped later */
res->state &= ~DLM_LOCK_RES_DROPPING_REF;
dlm_move_lockres_to_recovery_list(dlm, res); dlm_move_lockres_to_recovery_list(dlm, res);
else if (res->owner == dlm->node_num) { } else if (res->owner == dlm->node_num) {
dlm_free_dead_locks(dlm, res, dead_node); dlm_free_dead_locks(dlm, res, dead_node);
__dlm_lockres_calc_usage(dlm, res); __dlm_lockres_calc_usage(dlm, res);
} }
......
...@@ -54,9 +54,6 @@ ...@@ -54,9 +54,6 @@
#include "cluster/masklog.h" #include "cluster/masklog.h"
static int dlm_thread(void *data); static int dlm_thread(void *data);
static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
struct dlm_lock_resource *lockres);
static void dlm_flush_asts(struct dlm_ctxt *dlm); static void dlm_flush_asts(struct dlm_ctxt *dlm);
#define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num) #define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num)
...@@ -82,14 +79,33 @@ void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags) ...@@ -82,14 +79,33 @@ void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags)
current->state = TASK_RUNNING; current->state = TASK_RUNNING;
} }
int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
int __dlm_lockres_unused(struct dlm_lock_resource *res)
{ {
if (list_empty(&res->granted) && if (list_empty(&res->granted) &&
list_empty(&res->converting) && list_empty(&res->converting) &&
list_empty(&res->blocked) && list_empty(&res->blocked))
list_empty(&res->dirty)) return 0;
return 1; return 1;
}
/* "unused": the lockres has no locks, is not on the dirty list,
* has no inflight locks (in the gap between mastery and acquiring
* the first lock), and has no bits in its refmap.
* truly ready to be freed. */
int __dlm_lockres_unused(struct dlm_lock_resource *res)
{
if (!__dlm_lockres_has_locks(res) &&
list_empty(&res->dirty)) {
/* try not to scan the bitmap unless the first two
* conditions are already true */
int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
if (bit >= O2NM_MAX_NODES) {
/* since the bit for dlm->node_num is not
* set, inflight_locks better be zero */
BUG_ON(res->inflight_locks != 0);
return 1;
}
}
return 0; return 0;
} }
...@@ -106,46 +122,21 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm, ...@@ -106,46 +122,21 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
assert_spin_locked(&res->spinlock); assert_spin_locked(&res->spinlock);
if (__dlm_lockres_unused(res)){ if (__dlm_lockres_unused(res)){
/* For now, just keep any resource we master */
if (res->owner == dlm->node_num)
{
if (!list_empty(&res->purge)) {
mlog(0, "we master %s:%.*s, but it is on "
"the purge list. Removing\n",
dlm->name, res->lockname.len,
res->lockname.name);
list_del_init(&res->purge);
dlm->purge_count--;
}
return;
}
if (list_empty(&res->purge)) { if (list_empty(&res->purge)) {
mlog(0, "putting lockres %.*s from purge list\n", mlog(0, "putting lockres %.*s:%p onto purge list\n",
res->lockname.len, res->lockname.name); res->lockname.len, res->lockname.name, res);
res->last_used = jiffies; res->last_used = jiffies;
dlm_lockres_get(res);
list_add_tail(&res->purge, &dlm->purge_list); list_add_tail(&res->purge, &dlm->purge_list);
dlm->purge_count++; dlm->purge_count++;
/* if this node is not the owner, there is
* no way to keep track of who the owner could be.
* unhash it to avoid serious problems. */
if (res->owner != dlm->node_num) {
mlog(0, "%s:%.*s: doing immediate "
"purge of lockres owned by %u\n",
dlm->name, res->lockname.len,
res->lockname.name, res->owner);
dlm_purge_lockres_now(dlm, res);
}
} }
} else if (!list_empty(&res->purge)) { } else if (!list_empty(&res->purge)) {
mlog(0, "removing lockres %.*s from purge list, " mlog(0, "removing lockres %.*s:%p from purge list, owner=%u\n",
"owner=%u\n", res->lockname.len, res->lockname.name, res->lockname.len, res->lockname.name, res, res->owner);
res->owner);
list_del_init(&res->purge); list_del_init(&res->purge);
dlm_lockres_put(res);
dlm->purge_count--; dlm->purge_count--;
} }
} }
...@@ -163,68 +154,60 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, ...@@ -163,68 +154,60 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
spin_unlock(&dlm->spinlock); spin_unlock(&dlm->spinlock);
} }
/* TODO: Eventual API: Called with the dlm spinlock held, may drop it int dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
* to do migration, but will re-acquire before exit. */
void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres)
{ {
int master; int master;
int ret; int ret = 0;
spin_lock(&lockres->spinlock);
master = lockres->owner == dlm->node_num;
spin_unlock(&lockres->spinlock);
mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len,
lockres->lockname.name, master);
/* Non master is the easy case -- no migration required, just spin_lock(&res->spinlock);
* quit. */ if (!__dlm_lockres_unused(res)) {
spin_unlock(&res->spinlock);
mlog(0, "%s:%.*s: tried to purge but not unused\n",
dlm->name, res->lockname.len, res->lockname.name);
return -ENOTEMPTY;
}
master = (res->owner == dlm->node_num);
if (!master) if (!master)
goto finish; res->state |= DLM_LOCK_RES_DROPPING_REF;
spin_unlock(&res->spinlock);
/* Wheee! Migrate lockres here! */
spin_unlock(&dlm->spinlock);
again:
ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES); mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
if (ret == -ENOTEMPTY) { res->lockname.name, master);
mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
lockres->lockname.len, lockres->lockname.name);
BUG(); if (!master) {
} else if (ret < 0) { /* drop spinlock to do messaging, retake below */
mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n", spin_unlock(&dlm->spinlock);
lockres->lockname.len, lockres->lockname.name); /* clear our bit from the master's refmap, ignore errors */
msleep(100); ret = dlm_drop_lockres_ref(dlm, res);
goto again; if (ret < 0) {
mlog_errno(ret);
if (!dlm_is_host_down(ret))
BUG();
}
mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
dlm->name, res->lockname.len, res->lockname.name, ret);
spin_lock(&dlm->spinlock);
} }
spin_lock(&dlm->spinlock); if (!list_empty(&res->purge)) {
mlog(0, "removing lockres %.*s:%p from purgelist, "
finish: "master = %d\n", res->lockname.len, res->lockname.name,
if (!list_empty(&lockres->purge)) { res, master);
list_del_init(&lockres->purge); list_del_init(&res->purge);
dlm_lockres_put(res);
dlm->purge_count--; dlm->purge_count--;
} }
__dlm_unhash_lockres(lockres); __dlm_unhash_lockres(res);
}
/* make an unused lockres go away immediately.
* as soon as the dlm spinlock is dropped, this lockres
* will not be found. kfree still happens on last put. */
static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
struct dlm_lock_resource *lockres)
{
assert_spin_locked(&dlm->spinlock);
assert_spin_locked(&lockres->spinlock);
BUG_ON(!__dlm_lockres_unused(lockres));
if (!list_empty(&lockres->purge)) { /* lockres is not in the hash now. drop the flag and wake up
list_del_init(&lockres->purge); * any processes waiting in dlm_get_lock_resource. */
dlm->purge_count--; if (!master) {
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_DROPPING_REF;
spin_unlock(&res->spinlock);
wake_up(&res->wq);
} }
__dlm_unhash_lockres(lockres); return 0;
} }
static void dlm_run_purge_list(struct dlm_ctxt *dlm, static void dlm_run_purge_list(struct dlm_ctxt *dlm,
...@@ -268,13 +251,17 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm, ...@@ -268,13 +251,17 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
break; break;
} }
mlog(0, "removing lockres %.*s:%p from purgelist\n",
lockres->lockname.len, lockres->lockname.name, lockres);
list_del_init(&lockres->purge); list_del_init(&lockres->purge);
dlm_lockres_put(lockres);
dlm->purge_count--; dlm->purge_count--;
/* This may drop and reacquire the dlm spinlock if it /* This may drop and reacquire the dlm spinlock if it
* has to do migration. */ * has to do migration. */
mlog(0, "calling dlm_purge_lockres!\n"); mlog(0, "calling dlm_purge_lockres!\n");
dlm_purge_lockres(dlm, lockres); if (dlm_purge_lockres(dlm, lockres))
BUG();
mlog(0, "DONE calling dlm_purge_lockres!\n"); mlog(0, "DONE calling dlm_purge_lockres!\n");
/* Avoid adding any scheduling latencies */ /* Avoid adding any scheduling latencies */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment