Commit 932d2d1f authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'dlm-6.12' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:

 - Remove some unnecessary hold/unhold rsb refcounting in cases where an
   existing refcount is known to exist

 - Remove some unnecessary checking for zero nodeids, which should never
   exist, and add some warning if they do

 - Make the slow freeing of structs in release_lockspace() async, run
   from a workqueue

 - Prior rcu freeing allows some further struct lookups to run without a
   lock

 - Use blocking kernel_connect on sockets to avoid EINPROGRESS

* tag 'dlm-6.12' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  dlm: add missing -ENOMEM if alloc_workqueue() fails
  dlm: do synchronized socket connect call
  dlm: move lkb xarray lookup out of lock
  dlm: move dlm_search_rsb_tree() out of lock
  dlm: use RSB_HASHED to avoid lookup twice
  dlm: async freeing of lockspace resources
  dlm: drop kobject release callback handling
  dlm: warn about invalid nodeid comparsions
  dlm: never return invalid nodeid by dlm_our_nodeid()
  dlm: remove unnecessary refcounts
  dlm: cleanup memory allocation helpers
parents 8751b21a 652b0ae6
......@@ -928,7 +928,7 @@ int dlm_comm_seq(int nodeid, uint32_t *seq)
/*
 * Return the nodeid of the local node.
 *
 * The diff scrape left both the old ternary and the new return fused
 * together, making the second return unreachable; keep only the
 * post-merge form ("dlm: never return invalid nodeid by
 * dlm_our_nodeid()"), which requires local_comm to be configured
 * before any caller asks for the local nodeid.
 */
int dlm_our_nodeid(void)
{
	return local_comm->nodeid;
}
/* num 0 is first addr, num 1 is second addr */
......
......@@ -295,6 +295,7 @@ struct dlm_lkb {
void *lkb_astparam; /* caller's ast arg */
struct dlm_user_args *lkb_ua;
};
struct rcu_head rcu;
};
/*
......@@ -660,6 +661,8 @@ struct dlm_ls {
const struct dlm_lockspace_ops *ls_ops;
void *ls_ops_arg;
struct work_struct ls_free_work;
int ls_namelen;
char ls_name[DLM_LOCKSPACE_LEN + 1];
};
......@@ -803,6 +806,8 @@ static inline void dlm_set_sbflags_val(struct dlm_lkb *lkb, uint32_t val)
__DLM_SBF_MAX_BIT);
}
extern struct workqueue_struct *dlm_wq;
int dlm_plock_init(void);
void dlm_plock_exit(void);
......
......@@ -600,7 +600,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
{
struct dlm_rsb *r;
r = dlm_allocate_rsb(ls);
r = dlm_allocate_rsb();
if (!r)
return -ENOMEM;
......@@ -733,11 +733,13 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
}
retry:
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error)
goto do_new;
/* check if the rsb is active under read lock - likely path */
read_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error) {
if (!rsb_flag(r, RSB_HASHED)) {
read_unlock_bh(&ls->ls_rsbtbl_lock);
goto do_new;
}
......@@ -918,11 +920,13 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
int error;
retry:
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error)
goto do_new;
/* check if the rsb is in active state under read lock - likely path */
read_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error) {
if (!rsb_flag(r, RSB_HASHED)) {
read_unlock_bh(&ls->ls_rsbtbl_lock);
goto do_new;
}
......@@ -1151,7 +1155,7 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
r->res_dir_nodeid = our_nodeid;
}
if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) {
/* Recovery uses this function to set a new master when
* the previous master failed. Setting NEW_MASTER will
* force dlm_recover_masters to call recover_master on this
......@@ -1276,43 +1280,45 @@ static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *na
}
retry:
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error)
goto not_found;
/* check if the rsb is active under read lock - likely path */
read_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (!error) {
if (rsb_flag(r, RSB_INACTIVE)) {
read_unlock_bh(&ls->ls_rsbtbl_lock);
goto do_inactive;
}
/* because the rsb is active, we need to lock_rsb before
* checking/changing re_master_nodeid
*/
if (!rsb_flag(r, RSB_HASHED)) {
read_unlock_bh(&ls->ls_rsbtbl_lock);
goto not_found;
}
hold_rsb(r);
if (rsb_flag(r, RSB_INACTIVE)) {
read_unlock_bh(&ls->ls_rsbtbl_lock);
lock_rsb(r);
goto do_inactive;
}
__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
flags, r_nodeid, result);
/* because the rsb is active, we need to lock_rsb before
* checking/changing re_master_nodeid
*/
/* the rsb was active */
unlock_rsb(r);
put_rsb(r);
hold_rsb(r);
read_unlock_bh(&ls->ls_rsbtbl_lock);
lock_rsb(r);
return 0;
} else {
read_unlock_bh(&ls->ls_rsbtbl_lock);
goto not_found;
}
__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
flags, r_nodeid, result);
/* the rsb was active */
unlock_rsb(r);
put_rsb(r);
return 0;
do_inactive:
/* unlikely path - relookup under write */
/* unlikely path - check if still part of ls_rsbtbl */
write_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (!error) {
/* see comment in find_rsb_dir */
if (rsb_flag(r, RSB_HASHED)) {
if (!rsb_flag(r, RSB_INACTIVE)) {
write_unlock_bh(&ls->ls_rsbtbl_lock);
/* something as changed, very unlikely but
......@@ -1403,14 +1409,14 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
struct dlm_rsb *r = NULL;
int error;
read_lock_bh(&ls->ls_rsbtbl_lock);
rcu_read_lock();
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (!error)
goto out;
dlm_dump_rsb(r);
out:
read_unlock_bh(&ls->ls_rsbtbl_lock);
rcu_read_unlock();
}
static void deactivate_rsb(struct kref *kref)
......@@ -1442,18 +1448,6 @@ static void deactivate_rsb(struct kref *kref)
}
}
/* See comment for unhold_lkb */
/* Drop a reference on an active rsb that is known not to be the last
 * one: if kref_put() ever releases the final reference here (rv != 0),
 * that is a refcounting bug and the assert dumps the rsb.
 */
static void unhold_rsb(struct dlm_rsb *r)
{
int rv;
/* inactive rsbs are not ref counted */
WARN_ON(rsb_flag(r, RSB_INACTIVE));
rv = kref_put(&r->res_ref, deactivate_rsb);
DLM_ASSERT(!rv, dlm_dump_rsb(r););
}
void free_inactive_rsb(struct dlm_rsb *r)
{
WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));
......@@ -1497,7 +1491,7 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
limit.max = end;
limit.min = start;
lkb = dlm_allocate_lkb(ls);
lkb = dlm_allocate_lkb();
if (!lkb)
return -ENOMEM;
......@@ -1533,11 +1527,21 @@ static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
struct dlm_lkb *lkb;
read_lock_bh(&ls->ls_lkbxa_lock);
rcu_read_lock();
lkb = xa_load(&ls->ls_lkbxa, lkid);
if (lkb)
kref_get(&lkb->lkb_ref);
read_unlock_bh(&ls->ls_lkbxa_lock);
if (lkb) {
/* check if lkb is still part of lkbxa under lkbxa_lock as
* the lkb_ref is tight to the lkbxa data structure, see
* __put_lkb().
*/
read_lock_bh(&ls->ls_lkbxa_lock);
if (kref_read(&lkb->lkb_ref))
kref_get(&lkb->lkb_ref);
else
lkb = NULL;
read_unlock_bh(&ls->ls_lkbxa_lock);
}
rcu_read_unlock();
*lkb_ret = lkb;
return lkb ? 0 : -ENOENT;
......@@ -1675,10 +1679,8 @@ static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
hold_lkb(lkb);
del_lkb(r, lkb);
add_lkb(r, lkb, sts);
unhold_lkb(lkb);
}
static int msg_reply_type(int mstype)
......@@ -4323,16 +4325,27 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
memset(name, 0, sizeof(name));
memcpy(name, ms->m_extra, len);
write_lock_bh(&ls->ls_rsbtbl_lock);
rcu_read_lock();
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (rv) {
rcu_read_unlock();
/* should not happen */
log_error(ls, "%s from %d not found %s", __func__,
from_nodeid, name);
return;
}
write_lock_bh(&ls->ls_rsbtbl_lock);
if (!rsb_flag(r, RSB_HASHED)) {
rcu_read_unlock();
write_unlock_bh(&ls->ls_rsbtbl_lock);
/* should not happen */
log_error(ls, "%s from %d got removed during removal %s",
__func__, from_nodeid, name);
return;
}
/* at this stage the rsb can only being freed here */
rcu_read_unlock();
if (!rsb_flag(r, RSB_INACTIVE)) {
if (r->res_master_nodeid != from_nodeid) {
......@@ -5297,7 +5310,7 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
case DLM_MSG_LOOKUP:
case DLM_MSG_REQUEST:
_request_lock(r, lkb);
if (is_master(r))
if (r->res_nodeid != -1 && is_master(r))
confirm_master(r, 0);
break;
case DLM_MSG_CONVERT:
......@@ -5409,9 +5422,8 @@ void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
return;
list_for_each_entry(r, root_list, res_root_list) {
hold_rsb(r);
lock_rsb(r);
if (is_master(r)) {
if (r->res_nodeid != -1 && is_master(r)) {
purge_dead_list(ls, r, &r->res_grantqueue,
nodeid_gone, &lkb_count);
purge_dead_list(ls, r, &r->res_convertqueue,
......@@ -5420,7 +5432,7 @@ void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
nodeid_gone, &lkb_count);
}
unlock_rsb(r);
unhold_rsb(r);
cond_resched();
}
......
......@@ -66,6 +66,8 @@ int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
/* The local node masters an rsb when res_nodeid is zero. A value of
 * -1 means the nodeid was never set; warn if we are asked in that
 * state, since the answer is meaningless.
 */
static inline int is_master(struct dlm_rsb *r)
{
	WARN_ON_ONCE(r->res_nodeid == -1);

	return r->res_nodeid == 0;
}
......
......@@ -174,12 +174,6 @@ static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
return a->store ? a->store(ls, buf, len) : len;
}
/* kobject release callback: the dlm_ls embeds its kobject, so the
 * final kobject_put() frees the whole lockspace structure here.
 */
static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls;

	ls = container_of(k, struct dlm_ls, ls_kobj);
	kfree(ls);
}
static const struct sysfs_ops dlm_attr_ops = {
.show = dlm_attr_show,
.store = dlm_attr_store,
......@@ -188,7 +182,6 @@ static const struct sysfs_ops dlm_attr_ops = {
static struct kobj_type dlm_ktype = {
.default_groups = dlm_groups,
.sysfs_ops = &dlm_attr_ops,
.release = lockspace_kobj_release,
};
static struct kset *dlm_kset;
......@@ -322,13 +315,50 @@ static int threads_start(void)
return error;
}
/* Free a single lkb during lockspace teardown. A master-copy lkb
 * (MSTCPY) owns its lock value block, so free that first.
 */
static int lkb_idr_free(struct dlm_lkb *lkb)
{
	if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags) && lkb->lkb_lvbptr)
		dlm_free_lvb(lkb->lkb_lvbptr);

	dlm_free_lkb(lkb);
	return 0;
}
/* rhashtable_free_and_destroy() callback: free one rsb entry.
 * The void pointer converts implicitly to struct dlm_rsb *.
 */
static void rhash_free_rsb(void *ptr, void *arg)
{
	dlm_free_rsb(ptr);
}
/* Deferred lockspace teardown, run from dlm_wq (queued by
 * release_lockspace()). Frees every lkb in the xarray, then every rsb
 * in the hash table, then the lockspace itself. lkbs go first since
 * they reference rsbs; ls is freed last because both containers are
 * embedded in it.
 */
static void free_lockspace(struct work_struct *work)
{
struct dlm_ls *ls = container_of(work, struct dlm_ls, ls_free_work);
struct dlm_lkb *lkb;
unsigned long id;
/*
* Free all lkb's in xa
*/
xa_for_each(&ls->ls_lkbxa, id, lkb) {
lkb_idr_free(lkb);
}
xa_destroy(&ls->ls_lkbxa);
/*
* Free all rsb's on rsbtbl
*/
rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
kfree(ls);
}
static int new_lockspace(const char *name, const char *cluster,
uint32_t flags, int lvblen,
const struct dlm_lockspace_ops *ops, void *ops_arg,
int *ops_result, dlm_lockspace_t **lockspace)
{
struct dlm_ls *ls;
int do_unreg = 0;
int namelen = strlen(name);
int error;
......@@ -453,6 +483,8 @@ static int new_lockspace(const char *name, const char *cluster,
spin_lock_init(&ls->ls_cb_lock);
INIT_LIST_HEAD(&ls->ls_cb_delay);
INIT_WORK(&ls->ls_free_work, free_lockspace);
ls->ls_recoverd_task = NULL;
mutex_init(&ls->ls_recoverd_active);
spin_lock_init(&ls->ls_recover_lock);
......@@ -530,9 +562,6 @@ static int new_lockspace(const char *name, const char *cluster,
wait_event(ls->ls_recover_lock_wait,
test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
/* let kobject handle freeing of ls if there's an error */
do_unreg = 1;
ls->ls_kobj.kset = dlm_kset;
error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
"%s", ls->ls_name);
......@@ -580,10 +609,8 @@ static int new_lockspace(const char *name, const char *cluster,
xa_destroy(&ls->ls_lkbxa);
rhashtable_destroy(&ls->ls_rsbtbl);
out_lsfree:
if (do_unreg)
kobject_put(&ls->ls_kobj);
else
kfree(ls);
kobject_put(&ls->ls_kobj);
kfree(ls);
out:
module_put(THIS_MODULE);
return error;
......@@ -640,15 +667,6 @@ int dlm_new_user_lockspace(const char *name, const char *cluster,
ops_arg, ops_result, lockspace);
}
/* Free one lkb from the lockspace lkb xarray; a master-copy lkb
 * (MSTCPY) also owns its lock value block, which is freed first.
 */
static int lkb_idr_free(struct dlm_lkb *lkb)
{
if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
dlm_free_lvb(lkb->lkb_lvbptr);
dlm_free_lkb(lkb);
return 0;
}
/* NOTE: We check the lkbxa here rather than the resource table.
This is because there may be LKBs queued as ASTs that have been unlinked
from their RSBs and are pending deletion once the AST has been delivered */
......@@ -680,17 +698,8 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
return rv;
}
/* Callback for rhashtable_free_and_destroy(): free one rsb entry */
static void rhash_free_rsb(void *ptr, void *arg)
{
struct dlm_rsb *rsb = ptr;
dlm_free_rsb(rsb);
}
static int release_lockspace(struct dlm_ls *ls, int force)
{
struct dlm_lkb *lkb;
unsigned long id;
int busy, rv;
busy = lockspace_busy(ls, force);
......@@ -743,22 +752,11 @@ static int release_lockspace(struct dlm_ls *ls, int force)
dlm_delete_debug_file(ls);
kobject_put(&ls->ls_kobj);
xa_destroy(&ls->ls_recover_xa);
kfree(ls->ls_recover_buf);
/*
* Free all lkb's in xa
*/
xa_for_each(&ls->ls_lkbxa, id, lkb) {
lkb_idr_free(lkb);
}
xa_destroy(&ls->ls_lkbxa);
/*
* Free all rsb's on rsbtbl
*/
rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
/*
* Free structures on any other lists
*/
......@@ -768,10 +766,11 @@ static int release_lockspace(struct dlm_ls *ls, int force)
dlm_clear_members(ls);
dlm_clear_members_gone(ls);
kfree(ls->ls_node_array);
log_rinfo(ls, "release_lockspace final free");
kobject_put(&ls->ls_kobj);
/* The ls structure will be freed when the kobject is done with */
log_rinfo(ls, "%s final free", __func__);
/* delayed free of data structures see free_lockspace() */
queue_work(dlm_wq, &ls->ls_free_work);
module_put(THIS_MODULE);
return 0;
}
......
......@@ -161,8 +161,6 @@ struct dlm_proto_ops {
const char *name;
int proto;
int (*connect)(struct connection *con, struct socket *sock,
struct sockaddr *addr, int addr_len);
void (*sockopts)(struct socket *sock);
int (*bind)(struct socket *sock);
int (*listen_validate)(void);
......@@ -1599,8 +1597,7 @@ static int dlm_connect(struct connection *con)
log_print_ratelimited("connecting to %d", con->nodeid);
make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
addr_len);
result = kernel_connect(sock, (struct sockaddr *)&addr, addr_len, 0);
switch (result) {
case -EINPROGRESS:
/* not an error */
......@@ -1634,13 +1631,6 @@ static void process_send_sockets(struct work_struct *work)
switch (ret) {
case 0:
break;
case -EINPROGRESS:
/* avoid spamming resched on connection
* we might can switch to a state_change
* event based mechanism if established
*/
msleep(100);
break;
default:
/* CF_SEND_PENDING not cleared */
up_write(&con->sock_lock);
......@@ -1831,12 +1821,6 @@ static int dlm_tcp_bind(struct socket *sock)
return 0;
}
/* Issue a non-blocking TCP connect; completion is handled by the
 * caller (an -EINPROGRESS result is not treated as an error there).
 */
static int dlm_tcp_connect(struct connection *con, struct socket *sock,
			   struct sockaddr *addr, int addr_len)
{
	int rv;

	rv = kernel_connect(sock, addr, addr_len, O_NONBLOCK);
	return rv;
}
static int dlm_tcp_listen_validate(void)
{
/* We don't support multi-homed hosts */
......@@ -1873,7 +1857,6 @@ static int dlm_tcp_listen_bind(struct socket *sock)
static const struct dlm_proto_ops dlm_tcp_ops = {
.name = "TCP",
.proto = IPPROTO_TCP,
.connect = dlm_tcp_connect,
.sockopts = dlm_tcp_sockopts,
.bind = dlm_tcp_bind,
.listen_validate = dlm_tcp_listen_validate,
......@@ -1886,22 +1869,6 @@ static int dlm_sctp_bind(struct socket *sock)
return sctp_bind_addrs(sock, 0);
}
/* Connect an SCTP association. O_NONBLOCK does not work for SCTP
 * connect here, so instead bound the blocking kernel_connect() with a
 * temporary 5 second send timeout and restore the default (0, no
 * timeout) afterwards.
 */
static int dlm_sctp_connect(struct connection *con, struct socket *sock,
			    struct sockaddr *addr, int addr_len)
{
	int rv;

	sock_set_sndtimeo(sock->sk, 5);
	rv = kernel_connect(sock, addr, addr_len, 0);
	sock_set_sndtimeo(sock->sk, 0);

	return rv;
}
static int dlm_sctp_listen_validate(void)
{
if (!IS_ENABLED(CONFIG_IP_SCTP)) {
......@@ -1929,7 +1896,6 @@ static const struct dlm_proto_ops dlm_sctp_ops = {
.name = "SCTP",
.proto = IPPROTO_SCTP,
.try_new_addr = true,
.connect = dlm_sctp_connect,
.sockopts = dlm_sctp_sockopts,
.bind = dlm_sctp_bind,
.listen_validate = dlm_sctp_listen_validate,
......
......@@ -22,6 +22,8 @@
#define CREATE_TRACE_POINTS
#include <trace/events/dlm.h>
struct workqueue_struct *dlm_wq;
static int __init init_dlm(void)
{
int error;
......@@ -50,10 +52,18 @@ static int __init init_dlm(void)
if (error)
goto out_user;
dlm_wq = alloc_workqueue("dlm_wq", 0, 0);
if (!dlm_wq) {
error = -ENOMEM;
goto out_plock;
}
printk("DLM installed\n");
return 0;
out_plock:
dlm_plock_exit();
out_user:
dlm_user_exit();
out_debug:
......@@ -70,6 +80,8 @@ static int __init init_dlm(void)
static void __exit exit_dlm(void)
{
/* be sure every pending work e.g. freeing is done */
destroy_workqueue(dlm_wq);
dlm_plock_exit();
dlm_user_exit();
dlm_config_exit();
......
......@@ -366,6 +366,8 @@ int dlm_is_member(struct dlm_ls *ls, int nodeid)
int dlm_is_removed(struct dlm_ls *ls, int nodeid)
{
WARN_ON_ONCE(!nodeid || nodeid == -1);
if (find_memb(&ls->ls_nodes_gone, nodeid))
return 1;
return 0;
......
......@@ -84,10 +84,7 @@ void dlm_memory_exit(void)
/* Allocate a zero-filled lock value block of ls->ls_lvblen bytes.
 * Returns NULL on allocation failure. The diff scrape fused the old
 * temporary-variable form with the new single-return form, leaving an
 * unreachable second return; keep only the simplified version.
 */
char *dlm_allocate_lvb(struct dlm_ls *ls)
{
	return kzalloc(ls->ls_lvblen, GFP_ATOMIC);
}
void dlm_free_lvb(char *p)
......@@ -95,12 +92,9 @@ void dlm_free_lvb(char *p)
kfree(p);
}
struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls)
struct dlm_rsb *dlm_allocate_rsb(void)
{
struct dlm_rsb *r;
r = kmem_cache_zalloc(rsb_cache, GFP_ATOMIC);
return r;
return kmem_cache_zalloc(rsb_cache, GFP_ATOMIC);
}
static void __free_rsb_rcu(struct rcu_head *rcu)
......@@ -116,16 +110,15 @@ void dlm_free_rsb(struct dlm_rsb *r)
call_rcu(&r->rcu, __free_rsb_rcu);
}
struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls)
struct dlm_lkb *dlm_allocate_lkb(void)
{
struct dlm_lkb *lkb;
lkb = kmem_cache_zalloc(lkb_cache, GFP_ATOMIC);
return lkb;
return kmem_cache_zalloc(lkb_cache, GFP_ATOMIC);
}
void dlm_free_lkb(struct dlm_lkb *lkb)
static void __free_lkb_rcu(struct rcu_head *rcu)
{
struct dlm_lkb *lkb = container_of(rcu, struct dlm_lkb, rcu);
if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
struct dlm_user_args *ua;
ua = lkb->lkb_ua;
......@@ -138,6 +131,11 @@ void dlm_free_lkb(struct dlm_lkb *lkb)
kmem_cache_free(lkb_cache, lkb);
}
/* Free an lkb after an RCU grace period; lockless readers such as the
 * rcu_read_lock()-protected xa_load() in find_lkb() may still hold a
 * pointer to it until the grace period ends.
 */
void dlm_free_lkb(struct dlm_lkb *lkb)
{
call_rcu(&lkb->rcu, __free_lkb_rcu);
}
struct dlm_mhandle *dlm_allocate_mhandle(void)
{
return kmem_cache_alloc(mhandle_cache, GFP_ATOMIC);
......
......@@ -14,9 +14,9 @@
int dlm_memory_init(void);
void dlm_memory_exit(void);
struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls);
struct dlm_rsb *dlm_allocate_rsb(void);
void dlm_free_rsb(struct dlm_rsb *r);
struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls);
struct dlm_lkb *dlm_allocate_lkb(void);
void dlm_free_lkb(struct dlm_lkb *l);
char *dlm_allocate_lvb(struct dlm_ls *ls);
void dlm_free_lvb(char *l);
......
......@@ -452,10 +452,11 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq)
int is_removed = 0;
int error;
if (is_master(r))
if (r->res_nodeid != -1 && is_master(r))
return 0;
is_removed = dlm_is_removed(ls, r->res_nodeid);
if (r->res_nodeid != -1)
is_removed = dlm_is_removed(ls, r->res_nodeid);
if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
return 0;
......@@ -664,7 +665,7 @@ int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq,
int error, count = 0;
list_for_each_entry(r, root_list, res_root_list) {
if (is_master(r)) {
if (r->res_nodeid != -1 && is_master(r)) {
rsb_clear_flag(r, RSB_NEW_MASTER);
continue;
}
......@@ -858,7 +859,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list)
list_for_each_entry(r, root_list, res_root_list) {
lock_rsb(r);
if (is_master(r)) {
if (r->res_nodeid != -1 && is_master(r)) {
if (rsb_flag(r, RSB_RECOVER_CONVERT))
recover_conversion(r);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment