Commit 5986a2ec authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'upstream-linus' of master.kernel.org:/pub/scm/linux/kernel/git/mfasheh/ocfs2

* 'upstream-linus' of master.kernel.org:/pub/scm/linux/kernel/git/mfasheh/ocfs2: (22 commits)
  configfs: Zero terminate data in configfs attribute writes.
  [PATCH] ocfs2 heartbeat: clean up bio submission code
  ocfs2: introduce sc->sc_send_lock to protect outbound outbound messages
  [PATCH] ocfs2: drop INET from Kconfig, not needed
  ocfs2_dlm: Add timeout to dlm join domain
  ocfs2_dlm: Silence some messages during join domain
  ocfs2_dlm: disallow a domain join if node maps mismatch
  ocfs2_dlm: Ensure correct ordering of set/clear refmap bit on lockres
  ocfs2: Binds listener to the configured ip address
  ocfs2_dlm: Calling post handler function in assert master handler
  ocfs2: Added post handler callable function in o2net message handler
  ocfs2_dlm: Cookies in locks not being printed correctly in error messages
  ocfs2_dlm: Silence a failed convert
  ocfs2_dlm: wake up sleepers on the lockres waitqueue
  ocfs2_dlm: Dlm dispatch was stopping too early
  ocfs2_dlm: Drop inflight refmap even if no locks found on the lockres
  ocfs2_dlm: Flush dlm workqueue before starting to migrate
  ocfs2_dlm: Fix migrate lockres handler queue scanning
  ocfs2_dlm: Make dlmunlock() wait for migration to complete
  ocfs2_dlm: Fixes race between migrate and dirty
  ...
parents 43187902 ff05d1c4
......@@ -426,7 +426,6 @@ config OCFS2_FS
select CONFIGFS_FS
select JBD
select CRC32
select INET
help
OCFS2 is a general purpose extent based shared disk cluster file
system with many similarities to ext3. It supports 64 bit inode
......
......@@ -162,14 +162,17 @@ fill_write_buffer(struct configfs_buffer * buffer, const char __user * buf, size
int error;
if (!buffer->page)
buffer->page = (char *)get_zeroed_page(GFP_KERNEL);
buffer->page = (char *)__get_free_pages(GFP_KERNEL, 0);
if (!buffer->page)
return -ENOMEM;
if (count > PAGE_SIZE)
count = PAGE_SIZE;
if (count >= PAGE_SIZE)
count = PAGE_SIZE - 1;
error = copy_from_user(buffer->page,buf,count);
buffer->needs_read_fill = 1;
/* if buf is assumed to contain a string, terminate it by \0,
* so e.g. sscanf() can scan the string easily */
buffer->page[count] = 0;
return error ? -EFAULT : count;
}
......
......@@ -184,10 +184,9 @@ static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
flush_scheduled_work();
}
static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc,
unsigned int num_ios)
static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
{
atomic_set(&wc->wc_num_reqs, num_ios);
atomic_set(&wc->wc_num_reqs, 1);
init_completion(&wc->wc_io_complete);
wc->wc_error = 0;
}
......@@ -212,6 +211,7 @@ static void o2hb_wait_on_io(struct o2hb_region *reg,
struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;
blk_run_address_space(mapping);
o2hb_bio_wait_dec(wc, 1);
wait_for_completion(&wc->wc_io_complete);
}
......@@ -231,6 +231,7 @@ static int o2hb_bio_end_io(struct bio *bio,
return 1;
o2hb_bio_wait_dec(wc, 1);
bio_put(bio);
return 0;
}
......@@ -238,23 +239,22 @@ static int o2hb_bio_end_io(struct bio *bio,
* start_slot. */
static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
struct o2hb_bio_wait_ctxt *wc,
unsigned int start_slot,
unsigned int num_slots)
unsigned int *current_slot,
unsigned int max_slots)
{
int i, nr_vecs, len, first_page, last_page;
int len, current_page;
unsigned int vec_len, vec_start;
unsigned int bits = reg->hr_block_bits;
unsigned int spp = reg->hr_slots_per_page;
unsigned int cs = *current_slot;
struct bio *bio;
struct page *page;
nr_vecs = (num_slots + spp - 1) / spp;
/* Testing has shown this allocation to take long enough under
* GFP_KERNEL that the local node can get fenced. It would be
* nicest if we could pre-allocate these bios and avoid this
* all together. */
bio = bio_alloc(GFP_ATOMIC, nr_vecs);
bio = bio_alloc(GFP_ATOMIC, 16);
if (!bio) {
mlog(ML_ERROR, "Could not alloc slots BIO!\n");
bio = ERR_PTR(-ENOMEM);
......@@ -262,137 +262,53 @@ static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
}
/* Must put everything in 512 byte sectors for the bio... */
bio->bi_sector = (reg->hr_start_block + start_slot) << (bits - 9);
bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9);
bio->bi_bdev = reg->hr_bdev;
bio->bi_private = wc;
bio->bi_end_io = o2hb_bio_end_io;
first_page = start_slot / spp;
last_page = first_page + nr_vecs;
vec_start = (start_slot << bits) % PAGE_CACHE_SIZE;
for(i = first_page; i < last_page; i++) {
page = reg->hr_slot_data[i];
vec_start = (cs << bits) % PAGE_CACHE_SIZE;
while(cs < max_slots) {
current_page = cs / spp;
page = reg->hr_slot_data[current_page];
vec_len = PAGE_CACHE_SIZE;
/* last page might be short */
if (((i + 1) * spp) > (start_slot + num_slots))
vec_len = ((num_slots + start_slot) % spp) << bits;
vec_len -= vec_start;
vec_len = min(PAGE_CACHE_SIZE,
(max_slots-cs) * (PAGE_CACHE_SIZE/spp) );
mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
i, vec_len, vec_start);
current_page, vec_len, vec_start);
len = bio_add_page(bio, page, vec_len, vec_start);
if (len != vec_len) {
bio_put(bio);
bio = ERR_PTR(-EIO);
mlog(ML_ERROR, "Error adding page to bio i = %d, "
"vec_len = %u, len = %d\n, start = %u\n",
i, vec_len, len, vec_start);
goto bail;
}
if (len != vec_len) break;
cs += vec_len / (PAGE_CACHE_SIZE/spp);
vec_start = 0;
}
bail:
*current_slot = cs;
return bio;
}
/*
* Compute the maximum number of sectors the bdev can handle in one bio,
* as a power of two.
*
* Stolen from oracleasm, thanks Joel!
*/
static int compute_max_sectors(struct block_device *bdev)
{
int max_pages, max_sectors, pow_two_sectors;
struct request_queue *q;
q = bdev_get_queue(bdev);
max_pages = q->max_sectors >> (PAGE_SHIFT - 9);
if (max_pages > BIO_MAX_PAGES)
max_pages = BIO_MAX_PAGES;
if (max_pages > q->max_phys_segments)
max_pages = q->max_phys_segments;
if (max_pages > q->max_hw_segments)
max_pages = q->max_hw_segments;
max_pages--; /* Handle I/Os that straddle a page */
if (max_pages) {
max_sectors = max_pages << (PAGE_SHIFT - 9);
} else {
/* If BIO contains 1 or less than 1 page. */
max_sectors = q->max_sectors;
}
/* Why is fls() 1-based???? */
pow_two_sectors = 1 << (fls(max_sectors) - 1);
return pow_two_sectors;
}
static inline void o2hb_compute_request_limits(struct o2hb_region *reg,
unsigned int num_slots,
unsigned int *num_bios,
unsigned int *slots_per_bio)
{
unsigned int max_sectors, io_sectors;
max_sectors = compute_max_sectors(reg->hr_bdev);
io_sectors = num_slots << (reg->hr_block_bits - 9);
*num_bios = (io_sectors + max_sectors - 1) / max_sectors;
*slots_per_bio = max_sectors >> (reg->hr_block_bits - 9);
mlog(ML_HB_BIO, "My io size is %u sectors for %u slots. This "
"device can handle %u sectors of I/O\n", io_sectors, num_slots,
max_sectors);
mlog(ML_HB_BIO, "Will need %u bios holding %u slots each\n",
*num_bios, *slots_per_bio);
}
static int o2hb_read_slots(struct o2hb_region *reg,
unsigned int max_slots)
{
unsigned int num_bios, slots_per_bio, start_slot, num_slots;
int i, status;
unsigned int current_slot=0;
int status;
struct o2hb_bio_wait_ctxt wc;
struct bio **bios;
struct bio *bio;
o2hb_compute_request_limits(reg, max_slots, &num_bios, &slots_per_bio);
o2hb_bio_wait_init(&wc);
bios = kcalloc(num_bios, sizeof(struct bio *), GFP_KERNEL);
if (!bios) {
status = -ENOMEM;
mlog_errno(status);
return status;
}
o2hb_bio_wait_init(&wc, num_bios);
num_slots = slots_per_bio;
for(i = 0; i < num_bios; i++) {
start_slot = i * slots_per_bio;
/* adjust num_slots at last bio */
if (max_slots < (start_slot + num_slots))
num_slots = max_slots - start_slot;
bio = o2hb_setup_one_bio(reg, &wc, start_slot, num_slots);
while(current_slot < max_slots) {
bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);
if (IS_ERR(bio)) {
o2hb_bio_wait_dec(&wc, num_bios - i);
status = PTR_ERR(bio);
mlog_errno(status);
goto bail_and_wait;
}
bios[i] = bio;
atomic_inc(&wc.wc_num_reqs);
submit_bio(READ, bio);
}
......@@ -403,38 +319,30 @@ static int o2hb_read_slots(struct o2hb_region *reg,
if (wc.wc_error && !status)
status = wc.wc_error;
if (bios) {
for(i = 0; i < num_bios; i++)
if (bios[i])
bio_put(bios[i]);
kfree(bios);
}
return status;
}
static int o2hb_issue_node_write(struct o2hb_region *reg,
struct bio **write_bio,
struct o2hb_bio_wait_ctxt *write_wc)
{
int status;
unsigned int slot;
struct bio *bio;
o2hb_bio_wait_init(write_wc, 1);
o2hb_bio_wait_init(write_wc);
slot = o2nm_this_node();
bio = o2hb_setup_one_bio(reg, write_wc, slot, 1);
bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);
if (IS_ERR(bio)) {
status = PTR_ERR(bio);
mlog_errno(status);
goto bail;
}
atomic_inc(&write_wc->wc_num_reqs);
submit_bio(WRITE, bio);
*write_bio = bio;
status = 0;
bail:
return status;
......@@ -826,7 +734,6 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
{
int i, ret, highest_node, change = 0;
unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
struct bio *write_bio;
struct o2hb_bio_wait_ctxt write_wc;
ret = o2nm_configured_node_map(configured_nodes,
......@@ -864,7 +771,7 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
/* And fire off the write. Note that we don't wait on this I/O
* until later. */
ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
ret = o2hb_issue_node_write(reg, &write_wc);
if (ret < 0) {
mlog_errno(ret);
return ret;
......@@ -882,7 +789,6 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
* people we find in our steady state have seen us.
*/
o2hb_wait_on_io(reg, &write_wc);
bio_put(write_bio);
if (write_wc.wc_error) {
/* Do not re-arm the write timeout on I/O error - we
* can't be sure that the new block ever made it to
......@@ -943,7 +849,6 @@ static int o2hb_thread(void *data)
{
int i, ret;
struct o2hb_region *reg = data;
struct bio *write_bio;
struct o2hb_bio_wait_ctxt write_wc;
struct timeval before_hb, after_hb;
unsigned int elapsed_msec;
......@@ -993,10 +898,9 @@ static int o2hb_thread(void *data)
*
* XXX: Should we skip this on unclean_stop? */
o2hb_prepare_block(reg, 0);
ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
ret = o2hb_issue_node_write(reg, &write_wc);
if (ret == 0) {
o2hb_wait_on_io(reg, &write_wc);
bio_put(write_bio);
} else {
mlog_errno(ret);
}
......
......@@ -556,6 +556,8 @@ static void o2net_register_callbacks(struct sock *sk,
sk->sk_data_ready = o2net_data_ready;
sk->sk_state_change = o2net_state_change;
mutex_init(&sc->sc_send_lock);
write_unlock_bh(&sk->sk_callback_lock);
}
......@@ -688,6 +690,7 @@ static void o2net_handler_put(struct o2net_msg_handler *nmh)
* be given to the handler if their payload is longer than the max. */
int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
o2net_msg_handler_func *func, void *data,
o2net_post_msg_handler_func *post_func,
struct list_head *unreg_list)
{
struct o2net_msg_handler *nmh = NULL;
......@@ -722,6 +725,7 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
nmh->nh_func = func;
nmh->nh_func_data = data;
nmh->nh_post_func = post_func;
nmh->nh_msg_type = msg_type;
nmh->nh_max_len = max_len;
nmh->nh_key = key;
......@@ -856,10 +860,12 @@ static void o2net_sendpage(struct o2net_sock_container *sc,
ssize_t ret;
mutex_lock(&sc->sc_send_lock);
ret = sc->sc_sock->ops->sendpage(sc->sc_sock,
virt_to_page(kmalloced_virt),
(long)kmalloced_virt & ~PAGE_MASK,
size, MSG_DONTWAIT);
mutex_unlock(&sc->sc_send_lock);
if (ret != size) {
mlog(ML_ERROR, "sendpage of size %zu to " SC_NODEF_FMT
" failed with %zd\n", size, SC_NODEF_ARGS(sc), ret);
......@@ -974,8 +980,10 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec,
/* finally, convert the message header to network byte-order
* and send */
mutex_lock(&sc->sc_send_lock);
ret = o2net_send_tcp_msg(sc->sc_sock, vec, veclen,
sizeof(struct o2net_msg) + caller_bytes);
mutex_unlock(&sc->sc_send_lock);
msglog(msg, "sending returned %d\n", ret);
if (ret < 0) {
mlog(0, "error returned from o2net_send_tcp_msg=%d\n", ret);
......@@ -1049,6 +1057,7 @@ static int o2net_process_message(struct o2net_sock_container *sc,
int ret = 0, handler_status;
enum o2net_system_error syserr;
struct o2net_msg_handler *nmh = NULL;
void *ret_data = NULL;
msglog(hdr, "processing message\n");
......@@ -1101,17 +1110,26 @@ static int o2net_process_message(struct o2net_sock_container *sc,
sc->sc_msg_type = be16_to_cpu(hdr->msg_type);
handler_status = (nmh->nh_func)(hdr, sizeof(struct o2net_msg) +
be16_to_cpu(hdr->data_len),
nmh->nh_func_data);
nmh->nh_func_data, &ret_data);
do_gettimeofday(&sc->sc_tv_func_stop);
out_respond:
/* this destroys the hdr, so don't use it after this */
mutex_lock(&sc->sc_send_lock);
ret = o2net_send_status_magic(sc->sc_sock, hdr, syserr,
handler_status);
mutex_unlock(&sc->sc_send_lock);
hdr = NULL;
mlog(0, "sending handler status %d, syserr %d returned %d\n",
handler_status, syserr, ret);
if (nmh) {
BUG_ON(ret_data != NULL && nmh->nh_post_func == NULL);
if (nmh->nh_post_func)
(nmh->nh_post_func)(handler_status, nmh->nh_func_data,
ret_data);
}
out:
if (nmh)
o2net_handler_put(nmh);
......@@ -1795,13 +1813,13 @@ static void o2net_listen_data_ready(struct sock *sk, int bytes)
ready(sk, bytes);
}
static int o2net_open_listening_sock(__be16 port)
static int o2net_open_listening_sock(__be32 addr, __be16 port)
{
struct socket *sock = NULL;
int ret;
struct sockaddr_in sin = {
.sin_family = PF_INET,
.sin_addr = { .s_addr = (__force u32)htonl(INADDR_ANY) },
.sin_addr = { .s_addr = (__force u32)addr },
.sin_port = (__force u16)port,
};
......@@ -1824,15 +1842,15 @@ static int o2net_open_listening_sock(__be16 port)
sock->sk->sk_reuse = 1;
ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
if (ret < 0) {
mlog(ML_ERROR, "unable to bind socket to port %d, ret=%d\n",
ntohs(port), ret);
mlog(ML_ERROR, "unable to bind socket at %u.%u.%u.%u:%u, "
"ret=%d\n", NIPQUAD(addr), ntohs(port), ret);
goto out;
}
ret = sock->ops->listen(sock, 64);
if (ret < 0) {
mlog(ML_ERROR, "unable to listen on port %d, ret=%d\n",
ntohs(port), ret);
mlog(ML_ERROR, "unable to listen on %u.%u.%u.%u:%u, ret=%d\n",
NIPQUAD(addr), ntohs(port), ret);
}
out:
......@@ -1865,7 +1883,8 @@ int o2net_start_listening(struct o2nm_node *node)
return -ENOMEM; /* ? */
}
ret = o2net_open_listening_sock(node->nd_ipv4_port);
ret = o2net_open_listening_sock(node->nd_ipv4_address,
node->nd_ipv4_port);
if (ret) {
destroy_workqueue(o2net_wq);
o2net_wq = NULL;
......
......@@ -50,7 +50,10 @@ struct o2net_msg
__u8 buf[0];
};
typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data);
typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
typedef void (o2net_post_msg_handler_func)(int status, void *data,
void *ret_data);
#define O2NET_MAX_PAYLOAD_BYTES (4096 - sizeof(struct o2net_msg))
......@@ -99,6 +102,7 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *vec,
int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
o2net_msg_handler_func *func, void *data,
o2net_post_msg_handler_func *post_func,
struct list_head *unreg_list);
void o2net_unregister_handler_list(struct list_head *list);
......
......@@ -38,6 +38,12 @@
* locking semantics of the file system using the protocol. It should
* be somewhere else, I'm sure, but right now it isn't.
*
* New in version 7:
* - DLM join domain includes the live nodemap
*
* New in version 6:
* - DLM lockres remote refcount fixes.
*
* New in version 5:
* - Network timeout checking protocol
*
......@@ -51,7 +57,7 @@
* - full 64 bit i_size in the metadata lock lvbs
* - introduction of "rw" lock and pushing meta/data locking down
*/
#define O2NET_PROTOCOL_VERSION 5ULL
#define O2NET_PROTOCOL_VERSION 7ULL
struct o2net_handshake {
__be64 protocol_version;
__be64 connector_id;
......@@ -149,6 +155,8 @@ struct o2net_sock_container {
struct timeval sc_tv_func_stop;
u32 sc_msg_key;
u16 sc_msg_type;
struct mutex sc_send_lock;
};
struct o2net_msg_handler {
......@@ -158,6 +166,8 @@ struct o2net_msg_handler {
u32 nh_key;
o2net_msg_handler_func *nh_func;
o2net_msg_handler_func *nh_func_data;
o2net_post_msg_handler_func
*nh_post_func;
struct kref nh_kref;
struct list_head nh_unregister_item;
};
......
......@@ -263,7 +263,8 @@ void dlm_do_local_bast(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data)
int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
int ret;
unsigned int locklen;
......@@ -311,8 +312,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data)
past->type != DLM_BAST) {
mlog(ML_ERROR, "Unknown ast type! %d, cookie=%u:%llu"
"name=%.*s\n", past->type,
dlm_get_lock_cookie_node(cookie),
dlm_get_lock_cookie_seq(cookie),
dlm_get_lock_cookie_node(be64_to_cpu(cookie)),
dlm_get_lock_cookie_seq(be64_to_cpu(cookie)),
locklen, name);
ret = DLM_IVLOCKID;
goto leave;
......@@ -323,8 +324,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data)
mlog(0, "got %sast for unknown lockres! "
"cookie=%u:%llu, name=%.*s, namelen=%u\n",
past->type == DLM_AST ? "" : "b",
dlm_get_lock_cookie_node(cookie),
dlm_get_lock_cookie_seq(cookie),
dlm_get_lock_cookie_node(be64_to_cpu(cookie)),
dlm_get_lock_cookie_seq(be64_to_cpu(cookie)),
locklen, name, locklen);
ret = DLM_IVLOCKID;
goto leave;
......@@ -369,7 +370,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data)
mlog(0, "got %sast for unknown lock! cookie=%u:%llu, "
"name=%.*s, namelen=%u\n", past->type == DLM_AST ? "" : "b",
dlm_get_lock_cookie_node(cookie), dlm_get_lock_cookie_seq(cookie),
dlm_get_lock_cookie_node(be64_to_cpu(cookie)),
dlm_get_lock_cookie_seq(be64_to_cpu(cookie)),
locklen, name, locklen);
ret = DLM_NORMAL;
......
......@@ -180,6 +180,11 @@ struct dlm_assert_master_priv
unsigned ignore_higher:1;
};
struct dlm_deref_lockres_priv
{
struct dlm_lock_resource *deref_res;
u8 deref_node;
};
struct dlm_work_item
{
......@@ -191,6 +196,7 @@ struct dlm_work_item
struct dlm_request_all_locks_priv ral;
struct dlm_mig_lockres_priv ml;
struct dlm_assert_master_priv am;
struct dlm_deref_lockres_priv dl;
} u;
};
......@@ -222,6 +228,9 @@ static inline void __dlm_set_joining_node(struct dlm_ctxt *dlm,
#define DLM_LOCK_RES_DIRTY 0x00000008
#define DLM_LOCK_RES_IN_PROGRESS 0x00000010
#define DLM_LOCK_RES_MIGRATING 0x00000020
#define DLM_LOCK_RES_DROPPING_REF 0x00000040
#define DLM_LOCK_RES_BLOCK_DIRTY 0x00001000
#define DLM_LOCK_RES_SETREF_INPROG 0x00002000
/* max milliseconds to wait to sync up a network failure with a node death */
#define DLM_NODE_DEATH_WAIT_MAX (5 * 1000)
......@@ -265,6 +274,8 @@ struct dlm_lock_resource
u8 owner; //node which owns the lock resource, or unknown
u16 state;
char lvb[DLM_LVB_LEN];
unsigned int inflight_locks;
unsigned long refmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
};
struct dlm_migratable_lock
......@@ -367,7 +378,7 @@ enum {
DLM_CONVERT_LOCK_MSG, /* 504 */
DLM_PROXY_AST_MSG, /* 505 */
DLM_UNLOCK_LOCK_MSG, /* 506 */
DLM_UNUSED_MSG2, /* 507 */
DLM_DEREF_LOCKRES_MSG, /* 507 */
DLM_MIGRATE_REQUEST_MSG, /* 508 */
DLM_MIG_LOCKRES_MSG, /* 509 */
DLM_QUERY_JOIN_MSG, /* 510 */
......@@ -417,6 +428,9 @@ struct dlm_master_request
u8 name[O2NM_MAX_NAME_LEN];
};
#define DLM_ASSERT_RESPONSE_REASSERT 0x00000001
#define DLM_ASSERT_RESPONSE_MASTERY_REF 0x00000002
#define DLM_ASSERT_MASTER_MLE_CLEANUP 0x00000001
#define DLM_ASSERT_MASTER_REQUERY 0x00000002
#define DLM_ASSERT_MASTER_FINISH_MIGRATION 0x00000004
......@@ -430,6 +444,8 @@ struct dlm_assert_master
u8 name[O2NM_MAX_NAME_LEN];
};
#define DLM_MIGRATE_RESPONSE_MASTERY_REF 0x00000001
struct dlm_migrate_request
{
u8 master;
......@@ -609,12 +625,16 @@ struct dlm_begin_reco
};
#define BITS_PER_BYTE 8
#define BITS_TO_BYTES(bits) (((bits)+BITS_PER_BYTE-1)/BITS_PER_BYTE)
struct dlm_query_join_request
{
u8 node_idx;
u8 pad1[2];
u8 name_len;
u8 domain[O2NM_MAX_NAME_LEN];
u8 node_map[BITS_TO_BYTES(O2NM_MAX_NODES)];
};
struct dlm_assert_joined
......@@ -648,6 +668,16 @@ struct dlm_finalize_reco
__be32 pad2;
};
struct dlm_deref_lockres
{
u32 pad1;
u16 pad2;
u8 node_idx;
u8 namelen;
u8 name[O2NM_MAX_NAME_LEN];
};
static inline enum dlm_status
__dlm_lockres_state_to_status(struct dlm_lock_resource *res)
{
......@@ -688,16 +718,20 @@ void dlm_lock_put(struct dlm_lock *lock);
void dlm_lock_attach_lockres(struct dlm_lock *lock,
struct dlm_lock_resource *res);
int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
void dlm_revert_pending_convert(struct dlm_lock_resource *res,
struct dlm_lock *lock);
void dlm_revert_pending_lock(struct dlm_lock_resource *res,
struct dlm_lock *lock);
int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
void dlm_commit_pending_cancel(struct dlm_lock_resource *res,
struct dlm_lock *lock);
void dlm_commit_pending_unlock(struct dlm_lock_resource *res,
......@@ -721,8 +755,6 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
void dlm_purge_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *lockres);
static inline void dlm_lockres_get(struct dlm_lock_resource *res)
{
/* This is called on every lookup, so it might be worth
......@@ -733,6 +765,10 @@ void dlm_lockres_put(struct dlm_lock_resource *res);
void __dlm_unhash_lockres(struct dlm_lock_resource *res);
void __dlm_insert_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
const char *name,
unsigned int len,
unsigned int hash);
struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
const char *name,
unsigned int len,
......@@ -753,6 +789,47 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
const char *name,
unsigned int namelen);
#define dlm_lockres_set_refmap_bit(bit,res) \
__dlm_lockres_set_refmap_bit(bit,res,__FILE__,__LINE__)
#define dlm_lockres_clear_refmap_bit(bit,res) \
__dlm_lockres_clear_refmap_bit(bit,res,__FILE__,__LINE__)
static inline void __dlm_lockres_set_refmap_bit(int bit,
struct dlm_lock_resource *res,
const char *file,
int line)
{
//printk("%s:%d:%.*s: setting bit %d\n", file, line,
// res->lockname.len, res->lockname.name, bit);
set_bit(bit, res->refmap);
}
static inline void __dlm_lockres_clear_refmap_bit(int bit,
struct dlm_lock_resource *res,
const char *file,
int line)
{
//printk("%s:%d:%.*s: clearing bit %d\n", file, line,
// res->lockname.len, res->lockname.name, bit);
clear_bit(bit, res->refmap);
}
void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
const char *file,
int line);
void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
int new_lockres,
const char *file,
int line);
#define dlm_lockres_drop_inflight_ref(d,r) \
__dlm_lockres_drop_inflight_ref(d,r,__FILE__,__LINE__)
#define dlm_lockres_grab_inflight_ref(d,r) \
__dlm_lockres_grab_inflight_ref(d,r,0,__FILE__,__LINE__)
#define dlm_lockres_grab_inflight_ref_new(d,r) \
__dlm_lockres_grab_inflight_ref(d,r,1,__FILE__,__LINE__)
void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
void dlm_do_local_ast(struct dlm_ctxt *dlm,
......@@ -801,10 +878,7 @@ int dlm_heartbeat_init(struct dlm_ctxt *dlm);
void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data);
void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data);
int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
int dlm_migrate_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
u8 target);
int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
int dlm_finish_migration(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
u8 old_master);
......@@ -812,15 +886,27 @@ void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res);
int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
void dlm_assert_master_post_handler(int status, void *data, void *ret_data);
int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
u8 nodenum, u8 *real_master);
......@@ -856,10 +942,12 @@ static inline void __dlm_wait_on_lockres(struct dlm_lock_resource *res)
int dlm_init_mle_cache(void);
void dlm_destroy_mle_cache(void);
void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up);
int dlm_drop_lockres_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
void dlm_clean_master_list(struct dlm_ctxt *dlm,
u8 dead_node);
int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock);
int __dlm_lockres_has_locks(struct dlm_lock_resource *res);
int __dlm_lockres_unused(struct dlm_lock_resource *res);
static inline const char * dlm_lock_mode_name(int mode)
......
......@@ -286,8 +286,8 @@ enum dlm_status dlmconvert_remote(struct dlm_ctxt *dlm,
__dlm_print_one_lock_resource(res);
mlog(ML_ERROR, "converting a remote lock that is already "
"converting! (cookie=%u:%llu, conv=%d)\n",
dlm_get_lock_cookie_node(lock->ml.cookie),
dlm_get_lock_cookie_seq(lock->ml.cookie),
dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
lock->ml.convert_type);
status = DLM_DENIED;
goto bail;
......@@ -418,7 +418,8 @@ static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
* returns: DLM_NORMAL, DLM_IVLOCKID, DLM_BADARGS,
* status from __dlmconvert_master
*/
int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_convert_lock *cnv = (struct dlm_convert_lock *)msg->buf;
......@@ -428,7 +429,7 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
struct dlm_lockstatus *lksb;
enum dlm_status status = DLM_NORMAL;
u32 flags;
int call_ast = 0, kick_thread = 0, ast_reserved = 0;
int call_ast = 0, kick_thread = 0, ast_reserved = 0, wake = 0;
if (!dlm_grab(dlm)) {
dlm_error(DLM_REJECTED);
......@@ -479,25 +480,14 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
}
lock = NULL;
}
if (!lock) {
__dlm_print_one_lock_resource(res);
list_for_each(iter, &res->granted) {
lock = list_entry(iter, struct dlm_lock, list);
if (lock->ml.node == cnv->node_idx) {
mlog(ML_ERROR, "There is something here "
"for node %u, lock->ml.cookie=%llu, "
"cnv->cookie=%llu\n", cnv->node_idx,
(unsigned long long)lock->ml.cookie,
(unsigned long long)cnv->cookie);
break;
}
}
lock = NULL;
}
spin_unlock(&res->spinlock);
if (!lock) {
status = DLM_IVLOCKID;
dlm_error(status);
mlog(ML_ERROR, "did not find lock to convert on grant queue! "
"cookie=%u:%llu\n",
dlm_get_lock_cookie_node(be64_to_cpu(cnv->cookie)),
dlm_get_lock_cookie_seq(be64_to_cpu(cnv->cookie)));
__dlm_print_one_lock_resource(res);
goto leave;
}
......@@ -524,8 +514,11 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
cnv->requested_type,
&call_ast, &kick_thread);
res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
wake = 1;
}
spin_unlock(&res->spinlock);
if (wake)
wake_up(&res->wq);
if (status != DLM_NORMAL) {
if (status != DLM_NOTQUEUED)
......@@ -534,12 +527,7 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
}
leave:
if (!lock)
mlog(ML_ERROR, "did not find lock to convert on grant queue! "
"cookie=%u:%llu\n",
dlm_get_lock_cookie_node(cnv->cookie),
dlm_get_lock_cookie_seq(cnv->cookie));
else
if (lock)
dlm_lock_put(lock);
/* either queue the ast or release it, if reserved */
......
......@@ -53,6 +53,23 @@ void dlm_print_one_lock_resource(struct dlm_lock_resource *res)
spin_unlock(&res->spinlock);
}
static void dlm_print_lockres_refmap(struct dlm_lock_resource *res)
{
int bit;
assert_spin_locked(&res->spinlock);
mlog(ML_NOTICE, " refmap nodes: [ ");
bit = 0;
while (1) {
bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
if (bit >= O2NM_MAX_NODES)
break;
printk("%u ", bit);
bit++;
}
printk("], inflight=%u\n", res->inflight_locks);
}
void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
{
struct list_head *iter2;
......@@ -65,6 +82,7 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
res->owner, res->state);
mlog(ML_NOTICE, " last used: %lu, on purge list: %s\n",
res->last_used, list_empty(&res->purge) ? "no" : "yes");
dlm_print_lockres_refmap(res);
mlog(ML_NOTICE, " granted queue: \n");
list_for_each(iter2, &res->granted) {
lock = list_entry(iter2, struct dlm_lock, list);
......@@ -72,8 +90,8 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, "
"cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n",
lock->ml.type, lock->ml.convert_type, lock->ml.node,
dlm_get_lock_cookie_node(lock->ml.cookie),
dlm_get_lock_cookie_seq(lock->ml.cookie),
dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
list_empty(&lock->ast_list) ? 'y' : 'n',
lock->ast_pending ? 'y' : 'n',
list_empty(&lock->bast_list) ? 'y' : 'n',
......@@ -87,8 +105,8 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, "
"cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n",
lock->ml.type, lock->ml.convert_type, lock->ml.node,
dlm_get_lock_cookie_node(lock->ml.cookie),
dlm_get_lock_cookie_seq(lock->ml.cookie),
dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
list_empty(&lock->ast_list) ? 'y' : 'n',
lock->ast_pending ? 'y' : 'n',
list_empty(&lock->bast_list) ? 'y' : 'n',
......@@ -102,8 +120,8 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, "
"cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n",
lock->ml.type, lock->ml.convert_type, lock->ml.node,
dlm_get_lock_cookie_node(lock->ml.cookie),
dlm_get_lock_cookie_seq(lock->ml.cookie),
dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
list_empty(&lock->ast_list) ? 'y' : 'n',
lock->ast_pending ? 'y' : 'n',
list_empty(&lock->bast_list) ? 'y' : 'n',
......
......@@ -48,6 +48,36 @@
#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
#include "cluster/masklog.h"
/*
* ocfs2 node maps are array of long int, which limits to send them freely
* across the wire due to endianness issues. To workaround this, we convert
* long ints to byte arrays. Following 3 routines are helper functions to
* set/test/copy bits within those array of bytes
*/
static inline void byte_set_bit(u8 nr, u8 map[])
{
map[nr >> 3] |= (1UL << (nr & 7));
}
static inline int byte_test_bit(u8 nr, u8 map[])
{
return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0;
}
static inline void byte_copymap(u8 dmap[], unsigned long smap[],
unsigned int sz)
{
unsigned int nn;
if (!sz)
return;
memset(dmap, 0, ((sz + 7) >> 3));
for (nn = 0 ; nn < sz; nn++)
if (test_bit(nn, smap))
byte_set_bit(nn, dmap);
}
static void dlm_free_pagevec(void **vec, int pages)
{
while (pages--)
......@@ -95,10 +125,14 @@ static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
#define DLM_DOMAIN_BACKOFF_MS 200
static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data);
static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data);
static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data);
static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data);
static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data);
static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
......@@ -125,10 +159,10 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm,
hlist_add_head(&res->hash_node, bucket);
}
struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
const char *name,
unsigned int len,
unsigned int hash)
struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
const char *name,
unsigned int len,
unsigned int hash)
{
struct hlist_head *bucket;
struct hlist_node *list;
......@@ -154,6 +188,37 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
return NULL;
}
/* intended to be called by functions which do not care about lock
* resources which are being purged (most net _handler functions).
* this will return NULL for any lock resource which is found but
* currently in the process of dropping its mastery reference.
* use __dlm_lookup_lockres_full when you need the lock resource
* regardless (e.g. dlm_get_lock_resource) */
struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
const char *name,
unsigned int len,
unsigned int hash)
{
struct dlm_lock_resource *res = NULL;
mlog_entry("%.*s\n", len, name);
assert_spin_locked(&dlm->spinlock);
res = __dlm_lookup_lockres_full(dlm, name, len, hash);
if (res) {
spin_lock(&res->spinlock);
if (res->state & DLM_LOCK_RES_DROPPING_REF) {
spin_unlock(&res->spinlock);
dlm_lockres_put(res);
return NULL;
}
spin_unlock(&res->spinlock);
}
return res;
}
struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
const char *name,
unsigned int len)
......@@ -330,43 +395,60 @@ static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
wake_up(&dlm_domain_events);
}
static void dlm_migrate_all_locks(struct dlm_ctxt *dlm)
static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
{
int i;
int i, num, n, ret = 0;
struct dlm_lock_resource *res;
struct hlist_node *iter;
struct hlist_head *bucket;
int dropped;
mlog(0, "Migrating locks from domain %s\n", dlm->name);
restart:
num = 0;
spin_lock(&dlm->spinlock);
for (i = 0; i < DLM_HASH_BUCKETS; i++) {
while (!hlist_empty(dlm_lockres_hash(dlm, i))) {
res = hlist_entry(dlm_lockres_hash(dlm, i)->first,
struct dlm_lock_resource, hash_node);
/* need reference when manually grabbing lockres */
redo_bucket:
n = 0;
bucket = dlm_lockres_hash(dlm, i);
iter = bucket->first;
while (iter) {
n++;
res = hlist_entry(iter, struct dlm_lock_resource,
hash_node);
dlm_lockres_get(res);
/* this should unhash the lockres
* and exit with dlm->spinlock */
mlog(0, "purging res=%p\n", res);
if (dlm_lockres_is_dirty(dlm, res)) {
/* HACK! this should absolutely go.
* need to figure out why some empty
* lockreses are still marked dirty */
mlog(ML_ERROR, "lockres %.*s dirty!\n",
res->lockname.len, res->lockname.name);
spin_unlock(&dlm->spinlock);
dlm_kick_thread(dlm, res);
wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
dlm_lockres_put(res);
goto restart;
}
dlm_purge_lockres(dlm, res);
/* migrate, if necessary. this will drop the dlm
* spinlock and retake it if it does migration. */
dropped = dlm_empty_lockres(dlm, res);
spin_lock(&res->spinlock);
__dlm_lockres_calc_usage(dlm, res);
iter = res->hash_node.next;
spin_unlock(&res->spinlock);
dlm_lockres_put(res);
cond_resched_lock(&dlm->spinlock);
if (dropped)
goto redo_bucket;
}
num += n;
mlog(0, "%s: touched %d lockreses in bucket %d "
"(tot=%d)\n", dlm->name, n, i, num);
}
spin_unlock(&dlm->spinlock);
wake_up(&dlm->dlm_thread_wq);
/* let the dlm thread take care of purging, keep scanning until
* nothing remains in the hash */
if (num) {
mlog(0, "%s: %d lock resources in hash last pass\n",
dlm->name, num);
ret = -EAGAIN;
}
mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
return ret;
}
static int dlm_no_joining_node(struct dlm_ctxt *dlm)
......@@ -418,7 +500,8 @@ static void __dlm_print_nodes(struct dlm_ctxt *dlm)
printk("\n");
}
static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data)
static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
struct dlm_ctxt *dlm = data;
unsigned int node;
......@@ -571,7 +654,9 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm)
/* We changed dlm state, notify the thread */
dlm_kick_thread(dlm, NULL);
dlm_migrate_all_locks(dlm);
while (dlm_migrate_all_locks(dlm)) {
mlog(0, "%s: more migration to do\n", dlm->name);
}
dlm_mark_domain_leaving(dlm);
dlm_leave_domain(dlm);
dlm_complete_dlm_shutdown(dlm);
......@@ -580,11 +665,13 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm)
}
EXPORT_SYMBOL_GPL(dlm_unregister_domain);
static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
struct dlm_query_join_request *query;
enum dlm_query_join_response response;
struct dlm_ctxt *dlm = NULL;
u8 nodenum;
query = (struct dlm_query_join_request *) msg->buf;
......@@ -608,6 +695,28 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
spin_lock(&dlm_domain_lock);
dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
if (!dlm)
goto unlock_respond;
/*
* There is a small window where the joining node may not see the
* node(s) that just left but still part of the cluster. DISALLOW
* join request if joining node has different node map.
*/
nodenum=0;
while (nodenum < O2NM_MAX_NODES) {
if (test_bit(nodenum, dlm->domain_map)) {
if (!byte_test_bit(nodenum, query->node_map)) {
mlog(0, "disallow join as node %u does not "
"have node %u in its nodemap\n",
query->node_idx, nodenum);
response = JOIN_DISALLOW;
goto unlock_respond;
}
}
nodenum++;
}
/* Once the dlm ctxt is marked as leaving then we don't want
* to be put in someone's domain map.
* Also, explicitly disallow joining at certain troublesome
......@@ -626,15 +735,15 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
/* Disallow parallel joins. */
response = JOIN_DISALLOW;
} else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
mlog(ML_NOTICE, "node %u trying to join, but recovery "
mlog(0, "node %u trying to join, but recovery "
"is ongoing.\n", bit);
response = JOIN_DISALLOW;
} else if (test_bit(bit, dlm->recovery_map)) {
mlog(ML_NOTICE, "node %u trying to join, but it "
mlog(0, "node %u trying to join, but it "
"still needs recovery.\n", bit);
response = JOIN_DISALLOW;
} else if (test_bit(bit, dlm->domain_map)) {
mlog(ML_NOTICE, "node %u trying to join, but it "
mlog(0, "node %u trying to join, but it "
"is still in the domain! needs recovery?\n",
bit);
response = JOIN_DISALLOW;
......@@ -649,6 +758,7 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
spin_unlock(&dlm->spinlock);
}
unlock_respond:
spin_unlock(&dlm_domain_lock);
respond:
......@@ -657,7 +767,8 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
return response;
}
static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data)
static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
struct dlm_assert_joined *assert;
struct dlm_ctxt *dlm = NULL;
......@@ -694,7 +805,8 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data)
return 0;
}
static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data)
static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
struct dlm_cancel_join *cancel;
struct dlm_ctxt *dlm = NULL;
......@@ -796,6 +908,9 @@ static int dlm_request_join(struct dlm_ctxt *dlm,
join_msg.name_len = strlen(dlm->name);
memcpy(join_msg.domain, dlm->name, join_msg.name_len);
/* copy live node map to join message */
byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
sizeof(join_msg), node, &retval);
if (status < 0 && status != -ENOPROTOOPT) {
......@@ -1036,98 +1151,106 @@ static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
sizeof(struct dlm_master_request),
dlm_master_request_handler,
dlm, &dlm->dlm_domain_handlers);
dlm, NULL, &dlm->dlm_domain_handlers);
if (status)
goto bail;
status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
sizeof(struct dlm_assert_master),
dlm_assert_master_handler,
dlm, &dlm->dlm_domain_handlers);
dlm, dlm_assert_master_post_handler,
&dlm->dlm_domain_handlers);
if (status)
goto bail;
status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
sizeof(struct dlm_create_lock),
dlm_create_lock_handler,
dlm, &dlm->dlm_domain_handlers);
dlm, NULL, &dlm->dlm_domain_handlers);
if (status)
goto bail;
status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
DLM_CONVERT_LOCK_MAX_LEN,
dlm_convert_lock_handler,
dlm, &dlm->dlm_domain_handlers);
dlm, NULL, &dlm->dlm_domain_handlers);
if (status)
goto bail;
status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
DLM_UNLOCK_LOCK_MAX_LEN,
dlm_unlock_lock_handler,
dlm, &dlm->dlm_domain_handlers);
dlm, NULL, &dlm->dlm_domain_handlers);
if (status)
goto bail;
status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
DLM_PROXY_AST_MAX_LEN,
dlm_proxy_ast_handler,
dlm, &dlm->dlm_domain_handlers);
dlm, NULL, &dlm->dlm_domain_handlers);
if (status)
goto bail;
status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
sizeof(struct dlm_exit_domain),
dlm_exit_domain_handler,
dlm, &dlm->dlm_domain_handlers);
dlm, NULL, &dlm->dlm_domain_handlers);
if (status)
goto bail;
status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
sizeof(struct dlm_deref_lockres),
dlm_deref_lockres_handler,
dlm, NULL, &dlm->dlm_domain_handlers);
if (status)
goto bail;
status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
sizeof(struct dlm_migrate_request),
dlm_migrate_request_handler,
dlm, &dlm->dlm_domain_handlers);
dlm, NULL, &dlm->dlm_domain_handlers);
if (status)
goto bail;
status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
DLM_MIG_LOCKRES_MAX_LEN,
dlm_mig_lockres_handler,
dlm, &dlm->dlm_domain_handlers);
dlm, NULL, &dlm->dlm_domain_handlers);
if (status)
goto bail;
status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
sizeof(struct dlm_master_requery),
dlm_master_requery_handler,
dlm, &dlm->dlm_domain_handlers);
dlm, NULL, &dlm->dlm_domain_handlers);
if (status)
goto bail;
status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
sizeof(struct dlm_lock_request),
dlm_request_all_locks_handler,
dlm, &dlm->dlm_domain_handlers);
dlm, NULL, &dlm->dlm_domain_handlers);
if (status)
goto bail;
status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
sizeof(struct dlm_reco_data_done),
dlm_reco_data_done_handler,
dlm, &dlm->dlm_domain_handlers);
dlm, NULL, &dlm->dlm_domain_handlers);
if (status)
goto bail;
status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
sizeof(struct dlm_begin_reco),
dlm_begin_reco_handler,
dlm, &dlm->dlm_domain_handlers);
dlm, NULL, &dlm->dlm_domain_handlers);
if (status)
goto bail;
status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
sizeof(struct dlm_finalize_reco),
dlm_finalize_reco_handler,
dlm, &dlm->dlm_domain_handlers);
dlm, NULL, &dlm->dlm_domain_handlers);
if (status)
goto bail;
......@@ -1141,6 +1264,8 @@ static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
static int dlm_join_domain(struct dlm_ctxt *dlm)
{
int status;
unsigned int backoff;
unsigned int total_backoff = 0;
BUG_ON(!dlm);
......@@ -1172,18 +1297,27 @@ static int dlm_join_domain(struct dlm_ctxt *dlm)
}
do {
unsigned int backoff;
status = dlm_try_to_join_domain(dlm);
/* If we're racing another node to the join, then we
* need to back off temporarily and let them
* complete. */
#define DLM_JOIN_TIMEOUT_MSECS 90000
if (status == -EAGAIN) {
if (signal_pending(current)) {
status = -ERESTARTSYS;
goto bail;
}
if (total_backoff >
msecs_to_jiffies(DLM_JOIN_TIMEOUT_MSECS)) {
status = -ERESTARTSYS;
mlog(ML_NOTICE, "Timed out joining dlm domain "
"%s after %u msecs\n", dlm->name,
jiffies_to_msecs(total_backoff));
goto bail;
}
/*
* <chip> After you!
* <dale> No, after you!
......@@ -1193,6 +1327,7 @@ static int dlm_join_domain(struct dlm_ctxt *dlm)
*/
backoff = (unsigned int)(jiffies & 0x3);
backoff *= DLM_DOMAIN_BACKOFF_MS;
total_backoff += backoff;
mlog(0, "backoff %d\n", backoff);
msleep(backoff);
}
......@@ -1421,21 +1556,21 @@ static int dlm_register_net_handlers(void)
status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
sizeof(struct dlm_query_join_request),
dlm_query_join_handler,
NULL, &dlm_join_handlers);
NULL, NULL, &dlm_join_handlers);
if (status)
goto bail;
status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
sizeof(struct dlm_assert_joined),
dlm_assert_joined_handler,
NULL, &dlm_join_handlers);
NULL, NULL, &dlm_join_handlers);
if (status)
goto bail;
status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
sizeof(struct dlm_cancel_join),
dlm_cancel_join_handler,
NULL, &dlm_join_handlers);
NULL, NULL, &dlm_join_handlers);
bail:
if (status < 0)
......
......@@ -163,6 +163,10 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
kick_thread = 1;
}
}
/* reduce the inflight count, this may result in the lockres
* being purged below during calc_usage */
if (lock->ml.node == dlm->node_num)
dlm_lockres_drop_inflight_ref(dlm, res);
spin_unlock(&res->spinlock);
wake_up(&res->wq);
......@@ -437,7 +441,8 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
* held on exit: none
* returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED
*/
int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data)
int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf;
......
......@@ -99,9 +99,10 @@ static void dlm_mle_node_up(struct dlm_ctxt *dlm,
int idx);
static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
unsigned int namelen, void *nodemap,
u32 flags);
static int dlm_do_assert_master(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
void *nodemap, u32 flags);
static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
struct dlm_master_list_entry *mle,
......@@ -237,7 +238,8 @@ static int dlm_find_mle(struct dlm_ctxt *dlm,
struct dlm_master_list_entry **mle,
char *name, unsigned int namelen);
static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to);
static int dlm_do_master_request(struct dlm_lock_resource *res,
struct dlm_master_list_entry *mle, int to);
static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
......@@ -687,6 +689,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
INIT_LIST_HEAD(&res->purge);
atomic_set(&res->asts_reserved, 0);
res->migration_pending = 0;
res->inflight_locks = 0;
kref_init(&res->refs);
......@@ -700,6 +703,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
res->last_used = 0;
memset(res->lvb, 0, DLM_LVB_LEN);
memset(res->refmap, 0, sizeof(res->refmap));
}
struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
......@@ -722,6 +726,42 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
return res;
}
void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
int new_lockres,
const char *file,
int line)
{
if (!new_lockres)
assert_spin_locked(&res->spinlock);
if (!test_bit(dlm->node_num, res->refmap)) {
BUG_ON(res->inflight_locks != 0);
dlm_lockres_set_refmap_bit(dlm->node_num, res);
}
res->inflight_locks++;
mlog(0, "%s:%.*s: inflight++: now %u\n",
dlm->name, res->lockname.len, res->lockname.name,
res->inflight_locks);
}
void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
const char *file,
int line)
{
assert_spin_locked(&res->spinlock);
BUG_ON(res->inflight_locks == 0);
res->inflight_locks--;
mlog(0, "%s:%.*s: inflight--: now %u\n",
dlm->name, res->lockname.len, res->lockname.name,
res->inflight_locks);
if (res->inflight_locks == 0)
dlm_lockres_clear_refmap_bit(dlm->node_num, res);
wake_up(&res->wq);
}
/*
* lookup a lock resource by name.
* may already exist in the hashtable.
......@@ -752,6 +792,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
unsigned int hash;
int tries = 0;
int bit, wait_on_recovery = 0;
int drop_inflight_if_nonlocal = 0;
BUG_ON(!lockid);
......@@ -761,9 +802,30 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
lookup:
spin_lock(&dlm->spinlock);
tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash);
tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
if (tmpres) {
int dropping_ref = 0;
spin_lock(&tmpres->spinlock);
if (tmpres->owner == dlm->node_num) {
BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
dlm_lockres_grab_inflight_ref(dlm, tmpres);
} else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
dropping_ref = 1;
spin_unlock(&tmpres->spinlock);
spin_unlock(&dlm->spinlock);
/* wait until done messaging the master, drop our ref to allow
* the lockres to be purged, start over. */
if (dropping_ref) {
spin_lock(&tmpres->spinlock);
__dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
spin_unlock(&tmpres->spinlock);
dlm_lockres_put(tmpres);
tmpres = NULL;
goto lookup;
}
mlog(0, "found in hash!\n");
if (res)
dlm_lockres_put(res);
......@@ -793,6 +855,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
spin_lock(&res->spinlock);
dlm_change_lockres_owner(dlm, res, dlm->node_num);
__dlm_insert_lockres(dlm, res);
dlm_lockres_grab_inflight_ref(dlm, res);
spin_unlock(&res->spinlock);
spin_unlock(&dlm->spinlock);
/* lockres still marked IN_PROGRESS */
......@@ -805,29 +868,40 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
/* if we found a block, wait for lock to be mastered by another node */
blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
if (blocked) {
int mig;
if (mle->type == DLM_MLE_MASTER) {
mlog(ML_ERROR, "master entry for nonexistent lock!\n");
BUG();
} else if (mle->type == DLM_MLE_MIGRATION) {
/* migration is in progress! */
/* the good news is that we now know the
* "current" master (mle->master). */
}
mig = (mle->type == DLM_MLE_MIGRATION);
/* if there is a migration in progress, let the migration
* finish before continuing. we can wait for the absence
* of the MIGRATION mle: either the migrate finished or
* one of the nodes died and the mle was cleaned up.
* if there is a BLOCK here, but it already has a master
* set, we are too late. the master does not have a ref
* for us in the refmap. detach the mle and drop it.
* either way, go back to the top and start over. */
if (mig || mle->master != O2NM_MAX_NODES) {
BUG_ON(mig && mle->master == dlm->node_num);
/* we arrived too late. the master does not
* have a ref for us. retry. */
mlog(0, "%s:%.*s: late on %s\n",
dlm->name, namelen, lockid,
mig ? "MIGRATION" : "BLOCK");
spin_unlock(&dlm->master_lock);
assert_spin_locked(&dlm->spinlock);
/* set the lockres owner and hash it */
spin_lock(&res->spinlock);
dlm_set_lockres_owner(dlm, res, mle->master);
__dlm_insert_lockres(dlm, res);
spin_unlock(&res->spinlock);
spin_unlock(&dlm->spinlock);
/* master is known, detach */
dlm_mle_detach_hb_events(dlm, mle);
if (!mig)
dlm_mle_detach_hb_events(dlm, mle);
dlm_put_mle(mle);
mle = NULL;
goto wake_waiters;
/* this is lame, but we cant wait on either
* the mle or lockres waitqueue here */
if (mig)
msleep(100);
goto lookup;
}
} else {
/* go ahead and try to master lock on this node */
......@@ -858,6 +932,13 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
/* finally add the lockres to its hash bucket */
__dlm_insert_lockres(dlm, res);
/* since this lockres is new it doesnt not require the spinlock */
dlm_lockres_grab_inflight_ref_new(dlm, res);
/* if this node does not become the master make sure to drop
* this inflight reference below */
drop_inflight_if_nonlocal = 1;
/* get an extra ref on the mle in case this is a BLOCK
* if so, the creator of the BLOCK may try to put the last
* ref at this time in the assert master handler, so we
......@@ -910,7 +991,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
ret = -EINVAL;
dlm_node_iter_init(mle->vote_map, &iter);
while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
ret = dlm_do_master_request(mle, nodenum);
ret = dlm_do_master_request(res, mle, nodenum);
if (ret < 0)
mlog_errno(ret);
if (mle->master != O2NM_MAX_NODES) {
......@@ -960,6 +1041,8 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
wake_waiters:
spin_lock(&res->spinlock);
if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
dlm_lockres_drop_inflight_ref(dlm, res);
res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
spin_unlock(&res->spinlock);
wake_up(&res->wq);
......@@ -998,7 +1081,7 @@ static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
/* this will cause the master to re-assert across
* the whole cluster, freeing up mles */
if (res->owner != dlm->node_num) {
ret = dlm_do_master_request(mle, res->owner);
ret = dlm_do_master_request(res, mle, res->owner);
if (ret < 0) {
/* give recovery a chance to run */
mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
......@@ -1062,6 +1145,8 @@ static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
* now tell other nodes that I am
* mastering this. */
mle->master = dlm->node_num;
/* ref was grabbed in get_lock_resource
* will be dropped in dlmlock_master */
assert = 1;
sleep = 0;
}
......@@ -1087,7 +1172,8 @@ static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
(atomic_read(&mle->woken) == 1),
timeo);
if (res->owner == O2NM_MAX_NODES) {
mlog(0, "waiting again\n");
mlog(0, "%s:%.*s: waiting again\n", dlm->name,
res->lockname.len, res->lockname.name);
goto recheck;
}
mlog(0, "done waiting, master is %u\n", res->owner);
......@@ -1100,8 +1186,7 @@ static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
m = dlm->node_num;
mlog(0, "about to master %.*s here, this=%u\n",
res->lockname.len, res->lockname.name, m);
ret = dlm_do_assert_master(dlm, res->lockname.name,
res->lockname.len, mle->vote_map, 0);
ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
if (ret) {
/* This is a failure in the network path,
* not in the response to the assert_master
......@@ -1117,6 +1202,8 @@ static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
/* set the lockres owner */
spin_lock(&res->spinlock);
/* mastery reference obtained either during
* assert_master_handler or in get_lock_resource */
dlm_change_lockres_owner(dlm, res, m);
spin_unlock(&res->spinlock);
......@@ -1283,7 +1370,8 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
*
*/
static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
static int dlm_do_master_request(struct dlm_lock_resource *res,
struct dlm_master_list_entry *mle, int to)
{
struct dlm_ctxt *dlm = mle->dlm;
struct dlm_master_request request;
......@@ -1339,6 +1427,9 @@ static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
case DLM_MASTER_RESP_YES:
set_bit(to, mle->response_map);
mlog(0, "node %u is the master, response=YES\n", to);
mlog(0, "%s:%.*s: master node %u now knows I have a "
"reference\n", dlm->name, res->lockname.len,
res->lockname.name, to);
mle->master = to;
break;
case DLM_MASTER_RESP_NO:
......@@ -1379,7 +1470,8 @@ static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
*
* if possible, TRIM THIS DOWN!!!
*/
int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
u8 response = DLM_MASTER_RESP_MAYBE;
struct dlm_ctxt *dlm = data;
......@@ -1417,10 +1509,11 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
/* take care of the easy cases up front */
spin_lock(&res->spinlock);
if (res->state & DLM_LOCK_RES_RECOVERING) {
if (res->state & (DLM_LOCK_RES_RECOVERING|
DLM_LOCK_RES_MIGRATING)) {
spin_unlock(&res->spinlock);
mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
"being recovered\n");
"being recovered/migrated\n");
response = DLM_MASTER_RESP_ERROR;
if (mle)
kmem_cache_free(dlm_mle_cache, mle);
......@@ -1428,8 +1521,10 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
}
if (res->owner == dlm->node_num) {
mlog(0, "%s:%.*s: setting bit %u in refmap\n",
dlm->name, namelen, name, request->node_idx);
dlm_lockres_set_refmap_bit(request->node_idx, res);
spin_unlock(&res->spinlock);
// mlog(0, "this node is the master\n");
response = DLM_MASTER_RESP_YES;
if (mle)
kmem_cache_free(dlm_mle_cache, mle);
......@@ -1477,7 +1572,6 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
mlog(0, "node %u is master, but trying to migrate to "
"node %u.\n", tmpmle->master, tmpmle->new_master);
if (tmpmle->master == dlm->node_num) {
response = DLM_MASTER_RESP_YES;
mlog(ML_ERROR, "no owner on lockres, but this "
"node is trying to migrate it to %u?!\n",
tmpmle->new_master);
......@@ -1494,6 +1588,10 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
* go back and clean the mles on any
* other nodes */
dispatch_assert = 1;
dlm_lockres_set_refmap_bit(request->node_idx, res);
mlog(0, "%s:%.*s: setting bit %u in refmap\n",
dlm->name, namelen, name,
request->node_idx);
} else
response = DLM_MASTER_RESP_NO;
} else {
......@@ -1607,17 +1705,24 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
* can periodically run all locks owned by this node
* and re-assert across the cluster...
*/
static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
unsigned int namelen, void *nodemap,
u32 flags)
int dlm_do_assert_master(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
void *nodemap, u32 flags)
{
struct dlm_assert_master assert;
int to, tmpret;
struct dlm_node_iter iter;
int ret = 0;
int reassert;
const char *lockname = res->lockname.name;
unsigned int namelen = res->lockname.len;
BUG_ON(namelen > O2NM_MAX_NAME_LEN);
spin_lock(&res->spinlock);
res->state |= DLM_LOCK_RES_SETREF_INPROG;
spin_unlock(&res->spinlock);
again:
reassert = 0;
......@@ -1647,6 +1752,7 @@ static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
mlog(0, "link to %d went down!\n", to);
/* any nonzero status return will do */
ret = tmpret;
r = 0;
} else if (r < 0) {
/* ok, something horribly messed. kill thyself. */
mlog(ML_ERROR,"during assert master of %.*s to %u, "
......@@ -1661,17 +1767,39 @@ static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
spin_unlock(&dlm->master_lock);
spin_unlock(&dlm->spinlock);
BUG();
} else if (r == EAGAIN) {
}
if (r & DLM_ASSERT_RESPONSE_REASSERT &&
!(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
mlog(ML_ERROR, "%.*s: very strange, "
"master MLE but no lockres on %u\n",
namelen, lockname, to);
}
if (r & DLM_ASSERT_RESPONSE_REASSERT) {
mlog(0, "%.*s: node %u create mles on other "
"nodes and requests a re-assert\n",
namelen, lockname, to);
reassert = 1;
}
if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
mlog(0, "%.*s: node %u has a reference to this "
"lockres, set the bit in the refmap\n",
namelen, lockname, to);
spin_lock(&res->spinlock);
dlm_lockres_set_refmap_bit(to, res);
spin_unlock(&res->spinlock);
}
}
if (reassert)
goto again;
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
spin_unlock(&res->spinlock);
wake_up(&res->wq);
return ret;
}
......@@ -1684,7 +1812,8 @@ static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
*
* if possible, TRIM THIS DOWN!!!
*/
int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_master_list_entry *mle = NULL;
......@@ -1693,7 +1822,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
char *name;
unsigned int namelen, hash;
u32 flags;
int master_request = 0;
int master_request = 0, have_lockres_ref = 0;
int ret = 0;
if (!dlm_grab(dlm))
......@@ -1851,6 +1980,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
spin_unlock(&mle->spinlock);
if (res) {
int wake = 0;
spin_lock(&res->spinlock);
if (mle->type == DLM_MLE_MIGRATION) {
mlog(0, "finishing off migration of lockres %.*s, "
......@@ -1858,12 +1988,16 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
res->lockname.len, res->lockname.name,
dlm->node_num, mle->new_master);
res->state &= ~DLM_LOCK_RES_MIGRATING;
wake = 1;
dlm_change_lockres_owner(dlm, res, mle->new_master);
BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
} else {
dlm_change_lockres_owner(dlm, res, mle->master);
}
spin_unlock(&res->spinlock);
have_lockres_ref = 1;
if (wake)
wake_up(&res->wq);
}
/* master is known, detach if not already detached.
......@@ -1913,12 +2047,28 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
done:
ret = 0;
if (res)
dlm_lockres_put(res);
if (res) {
spin_lock(&res->spinlock);
res->state |= DLM_LOCK_RES_SETREF_INPROG;
spin_unlock(&res->spinlock);
*ret_data = (void *)res;
}
dlm_put(dlm);
if (master_request) {
mlog(0, "need to tell master to reassert\n");
ret = EAGAIN; // positive. negative would shoot down the node.
/* positive. negative would shoot down the node. */
ret |= DLM_ASSERT_RESPONSE_REASSERT;
if (!have_lockres_ref) {
mlog(ML_ERROR, "strange, got assert from %u, MASTER "
"mle present here for %s:%.*s, but no lockres!\n",
assert->node_idx, dlm->name, namelen, name);
}
}
if (have_lockres_ref) {
/* let the master know we have a reference to the lockres */
ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
dlm->name, namelen, name, assert->node_idx);
}
return ret;
......@@ -1929,11 +2079,25 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
__dlm_print_one_lock_resource(res);
spin_unlock(&res->spinlock);
spin_unlock(&dlm->spinlock);
dlm_lockres_put(res);
*ret_data = (void *)res;
dlm_put(dlm);
return -EINVAL;
}
void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
{
struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
if (ret_data) {
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
spin_unlock(&res->spinlock);
wake_up(&res->wq);
dlm_lockres_put(res);
}
return;
}
int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
int ignore_higher, u8 request_from, u32 flags)
......@@ -2023,9 +2187,7 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
* even if one or more nodes die */
mlog(0, "worker about to master %.*s here, this=%u\n",
res->lockname.len, res->lockname.name, dlm->node_num);
ret = dlm_do_assert_master(dlm, res->lockname.name,
res->lockname.len,
nodemap, flags);
ret = dlm_do_assert_master(dlm, res, nodemap, flags);
if (ret < 0) {
/* no need to restart, we are done */
if (!dlm_is_host_down(ret))
......@@ -2097,14 +2259,180 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
return ret;
}
/*
* DLM_DEREF_LOCKRES_MSG
*/
int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
struct dlm_deref_lockres deref;
int ret = 0, r;
const char *lockname;
unsigned int namelen;
lockname = res->lockname.name;
namelen = res->lockname.len;
BUG_ON(namelen > O2NM_MAX_NAME_LEN);
mlog(0, "%s:%.*s: sending deref to %d\n",
dlm->name, namelen, lockname, res->owner);
memset(&deref, 0, sizeof(deref));
deref.node_idx = dlm->node_num;
deref.namelen = namelen;
memcpy(deref.name, lockname, namelen);
ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
&deref, sizeof(deref), res->owner, &r);
if (ret < 0)
mlog_errno(ret);
else if (r < 0) {
/* BAD. other node says I did not have a ref. */
mlog(ML_ERROR,"while dropping ref on %s:%.*s "
"(master=%u) got %d.\n", dlm->name, namelen,
lockname, res->owner, r);
dlm_print_one_lock_resource(res);
BUG();
}
return ret;
}
int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
struct dlm_lock_resource *res = NULL;
char *name;
unsigned int namelen;
int ret = -EINVAL;
u8 node;
unsigned int hash;
struct dlm_work_item *item;
int cleared = 0;
int dispatch = 0;
if (!dlm_grab(dlm))
return 0;
name = deref->name;
namelen = deref->namelen;
node = deref->node_idx;
if (namelen > DLM_LOCKID_NAME_MAX) {
mlog(ML_ERROR, "Invalid name length!");
goto done;
}
if (deref->node_idx >= O2NM_MAX_NODES) {
mlog(ML_ERROR, "Invalid node number: %u\n", node);
goto done;
}
hash = dlm_lockid_hash(name, namelen);
spin_lock(&dlm->spinlock);
res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
if (!res) {
spin_unlock(&dlm->spinlock);
mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
dlm->name, namelen, name);
goto done;
}
spin_unlock(&dlm->spinlock);
spin_lock(&res->spinlock);
if (res->state & DLM_LOCK_RES_SETREF_INPROG)
dispatch = 1;
else {
BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
if (test_bit(node, res->refmap)) {
dlm_lockres_clear_refmap_bit(node, res);
cleared = 1;
}
}
spin_unlock(&res->spinlock);
if (!dispatch) {
if (cleared)
dlm_lockres_calc_usage(dlm, res);
else {
mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
"but it is already dropped!\n", dlm->name,
res->lockname.len, res->lockname.name, node);
__dlm_print_one_lock_resource(res);
}
ret = 0;
goto done;
}
item = kzalloc(sizeof(*item), GFP_NOFS);
if (!item) {
ret = -ENOMEM;
mlog_errno(ret);
goto done;
}
dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
item->u.dl.deref_res = res;
item->u.dl.deref_node = node;
spin_lock(&dlm->work_lock);
list_add_tail(&item->list, &dlm->work_list);
spin_unlock(&dlm->work_lock);
queue_work(dlm->dlm_worker, &dlm->dispatched_work);
return 0;
done:
if (res)
dlm_lockres_put(res);
dlm_put(dlm);
return ret;
}
static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
{
struct dlm_ctxt *dlm;
struct dlm_lock_resource *res;
u8 node;
u8 cleared = 0;
dlm = item->dlm;
res = item->u.dl.deref_res;
node = item->u.dl.deref_node;
spin_lock(&res->spinlock);
BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
if (test_bit(node, res->refmap)) {
__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
dlm_lockres_clear_refmap_bit(node, res);
cleared = 1;
}
spin_unlock(&res->spinlock);
if (cleared) {
mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
dlm->name, res->lockname.len, res->lockname.name, node);
dlm_lockres_calc_usage(dlm, res);
} else {
mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
"but it is already dropped!\n", dlm->name,
res->lockname.len, res->lockname.name, node);
__dlm_print_one_lock_resource(res);
}
dlm_lockres_put(res);
}
/*
* DLM_MIGRATE_LOCKRES
*/
int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
u8 target)
static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
u8 target)
{
struct dlm_master_list_entry *mle = NULL;
struct dlm_master_list_entry *oldmle = NULL;
......@@ -2116,7 +2444,7 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
struct list_head *queue, *iter;
int i;
struct dlm_lock *lock;
int empty = 1;
int empty = 1, wake = 0;
if (!dlm_grab(dlm))
return -EINVAL;
......@@ -2241,6 +2569,7 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
res->lockname.name, target);
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_MIGRATING;
wake = 1;
spin_unlock(&res->spinlock);
ret = -EINVAL;
}
......@@ -2268,6 +2597,9 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
* the lockres
*/
/* now that remote nodes are spinning on the MIGRATING flag,
* ensure that all assert_master work is flushed. */
flush_workqueue(dlm->dlm_worker);
/* get an extra reference on the mle.
* otherwise the assert_master from the new
......@@ -2296,6 +2628,7 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
dlm_put_mle_inuse(mle);
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_MIGRATING;
wake = 1;
spin_unlock(&res->spinlock);
goto leave;
}
......@@ -2322,7 +2655,8 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
res->owner == target)
break;
mlog(0, "timed out during migration\n");
mlog(0, "%s:%.*s: timed out during migration\n",
dlm->name, res->lockname.len, res->lockname.name);
/* avoid hang during shutdown when migrating lockres
* to a node which also goes down */
if (dlm_is_node_dead(dlm, target)) {
......@@ -2330,20 +2664,20 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
"target %u is no longer up, restarting\n",
dlm->name, res->lockname.len,
res->lockname.name, target);
ret = -ERESTARTSYS;
ret = -EINVAL;
/* migration failed, detach and clean up mle */
dlm_mle_detach_hb_events(dlm, mle);
dlm_put_mle(mle);
dlm_put_mle_inuse(mle);
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_MIGRATING;
wake = 1;
spin_unlock(&res->spinlock);
goto leave;
}
}
if (ret == -ERESTARTSYS) {
/* migration failed, detach and clean up mle */
dlm_mle_detach_hb_events(dlm, mle);
dlm_put_mle(mle);
dlm_put_mle_inuse(mle);
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_MIGRATING;
spin_unlock(&res->spinlock);
goto leave;
}
/* TODO: if node died: stop, clean up, return error */
} else
mlog(0, "%s:%.*s: caught signal during migration\n",
dlm->name, res->lockname.len, res->lockname.name);
}
/* all done, set the owner, clear the flag */
......@@ -2366,6 +2700,11 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
if (ret < 0)
dlm_kick_thread(dlm, res);
/* wake up waiters if the MIGRATING flag got set
* but migration failed */
if (wake)
wake_up(&res->wq);
/* TODO: cleanup */
if (mres)
free_page((unsigned long)mres);
......@@ -2376,6 +2715,53 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
return ret;
}
#define DLM_MIGRATION_RETRY_MS 100
/* Should be called only after beginning the domain leave process.
* There should not be any remaining locks on nonlocal lock resources,
* and there should be no local locks left on locally mastered resources.
*
* Called with the dlm spinlock held, may drop it to do migration, but
* will re-acquire before exit.
*
* Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */
int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
int ret;
int lock_dropped = 0;
if (res->owner != dlm->node_num) {
if (!__dlm_lockres_unused(res)) {
mlog(ML_ERROR, "%s:%.*s: this node is not master, "
"trying to free this but locks remain\n",
dlm->name, res->lockname.len, res->lockname.name);
}
goto leave;
}
/* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
spin_unlock(&dlm->spinlock);
lock_dropped = 1;
while (1) {
ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
if (ret >= 0)
break;
if (ret == -ENOTEMPTY) {
mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
res->lockname.len, res->lockname.name);
BUG();
}
mlog(0, "lockres %.*s: migrate failed, "
"retrying\n", res->lockname.len,
res->lockname.name);
msleep(DLM_MIGRATION_RETRY_MS);
}
spin_lock(&dlm->spinlock);
leave:
return lock_dropped;
}
int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
{
int ret;
......@@ -2405,7 +2791,8 @@ static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
return can_proceed;
}
int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res)
{
int ret;
spin_lock(&res->spinlock);
......@@ -2434,8 +2821,15 @@ static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
__dlm_lockres_reserve_ast(res);
spin_unlock(&res->spinlock);
/* now flush all the pending asts.. hang out for a bit */
/* now flush all the pending asts */
dlm_kick_thread(dlm, res);
/* before waiting on DIRTY, block processes which may
* try to dirty the lockres before MIGRATING is set */
spin_lock(&res->spinlock);
BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
spin_unlock(&res->spinlock);
/* now wait on any pending asts and the DIRTY state */
wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
dlm_lockres_release_ast(dlm, res);
......@@ -2461,6 +2855,13 @@ static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
mlog(0, "trying again...\n");
goto again;
}
/* now that we are sure the MIGRATING state is there, drop
* the unneded state which blocked threads trying to DIRTY */
spin_lock(&res->spinlock);
BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
spin_unlock(&res->spinlock);
/* did the target go down or die? */
spin_lock(&dlm->spinlock);
......@@ -2490,7 +2891,7 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
{
struct list_head *iter, *iter2;
struct list_head *queue = &res->granted;
int i;
int i, bit;
struct dlm_lock *lock;
assert_spin_locked(&res->spinlock);
......@@ -2508,12 +2909,28 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
BUG_ON(!list_empty(&lock->bast_list));
BUG_ON(lock->ast_pending);
BUG_ON(lock->bast_pending);
dlm_lockres_clear_refmap_bit(lock->ml.node, res);
list_del_init(&lock->list);
dlm_lock_put(lock);
}
}
queue++;
}
bit = 0;
while (1) {
bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
if (bit >= O2NM_MAX_NODES)
break;
/* do not clear the local node reference, if there is a
* process holding this, let it drop the ref itself */
if (bit != dlm->node_num) {
mlog(0, "%s:%.*s: node %u had a ref to this "
"migrating lockres, clearing\n", dlm->name,
res->lockname.len, res->lockname.name, bit);
dlm_lockres_clear_refmap_bit(bit, res);
}
bit++;
}
}
/* for now this is not too intelligent. we will
......@@ -2601,6 +3018,16 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
mlog(0, "migrate request (node %u) returned %d!\n",
nodenum, status);
ret = status;
} else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
/* during the migration request we short-circuited
* the mastery of the lockres. make sure we have
* a mastery ref for nodenum */
mlog(0, "%s:%.*s: need ref for node %u\n",
dlm->name, res->lockname.len, res->lockname.name,
nodenum);
spin_lock(&res->spinlock);
dlm_lockres_set_refmap_bit(nodenum, res);
spin_unlock(&res->spinlock);
}
}
......@@ -2619,7 +3046,8 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
* we will have no mle in the list to start with. now we can add an mle for
* the migration and this should be the only one found for those scanning the
* list. */
int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_lock_resource *res = NULL;
......@@ -2745,7 +3173,13 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
/* remove it from the list so that only one
* mle will be found */
list_del_init(&tmp->list);
__dlm_mle_detach_hb_events(dlm, mle);
/* this was obviously WRONG. mle is uninited here. should be tmp. */
__dlm_mle_detach_hb_events(dlm, tmp);
ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
"telling master to get ref for cleared out mle "
"during migration\n", dlm->name, namelen, name,
master, new_master);
}
spin_unlock(&tmp->spinlock);
}
......@@ -2753,6 +3187,8 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
/* now add a migration mle to the tail of the list */
dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
mle->new_master = new_master;
/* the new master will be sending an assert master for this.
* at that point we will get the refmap reference */
mle->master = master;
/* do this for consistency with other mle types */
set_bit(new_master, mle->maybe_map);
......@@ -2902,6 +3338,13 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
clear_bit(dlm->node_num, iter.node_map);
spin_unlock(&dlm->spinlock);
/* ownership of the lockres is changing. account for the
* mastery reference here since old_master will briefly have
* a reference after the migration completes */
spin_lock(&res->spinlock);
dlm_lockres_set_refmap_bit(old_master, res);
spin_unlock(&res->spinlock);
mlog(0, "now time to do a migrate request to other nodes\n");
ret = dlm_do_migrate_request(dlm, res, old_master,
dlm->node_num, &iter);
......@@ -2914,8 +3357,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
res->lockname.len, res->lockname.name);
/* this call now finishes out the nodemap
* even if one or more nodes die */
ret = dlm_do_assert_master(dlm, res->lockname.name,
res->lockname.len, iter.node_map,
ret = dlm_do_assert_master(dlm, res, iter.node_map,
DLM_ASSERT_MASTER_FINISH_MIGRATION);
if (ret < 0) {
/* no longer need to retry. all living nodes contacted. */
......@@ -2927,8 +3369,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
set_bit(old_master, iter.node_map);
mlog(0, "doing assert master of %.*s back to %u\n",
res->lockname.len, res->lockname.name, old_master);
ret = dlm_do_assert_master(dlm, res->lockname.name,
res->lockname.len, iter.node_map,
ret = dlm_do_assert_master(dlm, res, iter.node_map,
DLM_ASSERT_MASTER_FINISH_MIGRATION);
if (ret < 0) {
mlog(0, "assert master to original master failed "
......
......@@ -163,9 +163,6 @@ void dlm_dispatch_work(struct work_struct *work)
dlm_workfunc_t *workfunc;
int tot=0;
if (!dlm_joined(dlm))
return;
spin_lock(&dlm->work_lock);
list_splice_init(&dlm->work_list, &tmp_list);
spin_unlock(&dlm->work_lock);
......@@ -821,7 +818,8 @@ static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from,
}
int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf;
......@@ -978,7 +976,8 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to)
}
int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data)
int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf;
......@@ -1129,6 +1128,11 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
if (total_locks == mres_total_locks)
mres->flags |= DLM_MRES_ALL_DONE;
mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",
dlm->name, res->lockname.len, res->lockname.name,
orig_flags & DLM_MRES_MIGRATION ? "migrate" : "recovery",
send_to);
/* send it */
ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
sz, send_to, &status);
......@@ -1213,6 +1217,34 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
return 0;
}
static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
struct dlm_migratable_lockres *mres)
{
struct dlm_lock dummy;
memset(&dummy, 0, sizeof(dummy));
dummy.ml.cookie = 0;
dummy.ml.type = LKM_IVMODE;
dummy.ml.convert_type = LKM_IVMODE;
dummy.ml.highest_blocked = LKM_IVMODE;
dummy.lksb = NULL;
dummy.ml.node = dlm->node_num;
dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
}
static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
struct dlm_migratable_lock *ml,
u8 *nodenum)
{
if (unlikely(ml->cookie == 0 &&
ml->type == LKM_IVMODE &&
ml->convert_type == LKM_IVMODE &&
ml->highest_blocked == LKM_IVMODE &&
ml->list == DLM_BLOCKED_LIST)) {
*nodenum = ml->node;
return 1;
}
return 0;
}
int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
struct dlm_migratable_lockres *mres,
......@@ -1260,6 +1292,14 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
goto error;
}
}
if (total_locks == 0) {
/* send a dummy lock to indicate a mastery reference only */
mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",
dlm->name, res->lockname.len, res->lockname.name,
send_to, flags & DLM_MRES_RECOVERY ? "recovery" :
"migration");
dlm_add_dummy_lock(dlm, mres);
}
/* flush any remaining locks */
ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
if (ret < 0)
......@@ -1293,7 +1333,8 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
* do we spin? returning an error only delays the problem really
*/
int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_migratable_lockres *mres =
......@@ -1382,17 +1423,21 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
spin_unlock(&res->spinlock);
wake_up(&res->wq);
/* add an extra ref for just-allocated lockres
* otherwise the lockres will be purged immediately */
dlm_lockres_get(res);
}
/* at this point we have allocated everything we need,
* and we have a hashed lockres with an extra ref and
* the proper res->state flags. */
ret = 0;
spin_lock(&res->spinlock);
/* drop this either when master requery finds a different master
* or when a lock is added by the recovery worker */
dlm_lockres_grab_inflight_ref(dlm, res);
if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
/* migration cannot have an unknown master */
BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
......@@ -1400,10 +1445,11 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
"unknown owner.. will need to requery: "
"%.*s\n", mres->lockname_len, mres->lockname);
} else {
spin_lock(&res->spinlock);
/* take a reference now to pin the lockres, drop it
* when locks are added in the worker */
dlm_change_lockres_owner(dlm, res, dlm->node_num);
spin_unlock(&res->spinlock);
}
spin_unlock(&res->spinlock);
/* queue up work for dlm_mig_lockres_worker */
dlm_grab(dlm); /* get an extra ref for the work item */
......@@ -1459,6 +1505,9 @@ static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data)
"this node will take it.\n",
res->lockname.len, res->lockname.name);
} else {
spin_lock(&res->spinlock);
dlm_lockres_drop_inflight_ref(dlm, res);
spin_unlock(&res->spinlock);
mlog(0, "master needs to respond to sender "
"that node %u still owns %.*s\n",
real_master, res->lockname.len,
......@@ -1578,7 +1627,8 @@ int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
/* this function cannot error, so unless the sending
* or receiving of the message failed, the owner can
* be trusted */
int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
......@@ -1660,21 +1710,38 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
{
struct dlm_migratable_lock *ml;
struct list_head *queue;
struct list_head *tmpq = NULL;
struct dlm_lock *newlock = NULL;
struct dlm_lockstatus *lksb = NULL;
int ret = 0;
int i, bad;
int i, j, bad;
struct list_head *iter;
struct dlm_lock *lock = NULL;
u8 from = O2NM_MAX_NODES;
unsigned int added = 0;
mlog(0, "running %d locks for this lockres\n", mres->num_locks);
for (i=0; i<mres->num_locks; i++) {
ml = &(mres->ml[i]);
if (dlm_is_dummy_lock(dlm, ml, &from)) {
/* placeholder, just need to set the refmap bit */
BUG_ON(mres->num_locks != 1);
mlog(0, "%s:%.*s: dummy lock for %u\n",
dlm->name, mres->lockname_len, mres->lockname,
from);
spin_lock(&res->spinlock);
dlm_lockres_set_refmap_bit(from, res);
spin_unlock(&res->spinlock);
added++;
break;
}
BUG_ON(ml->highest_blocked != LKM_IVMODE);
newlock = NULL;
lksb = NULL;
queue = dlm_list_num_to_pointer(res, ml->list);
tmpq = NULL;
/* if the lock is for the local node it needs to
* be moved to the proper location within the queue.
......@@ -1684,11 +1751,16 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
BUG_ON(!(mres->flags & DLM_MRES_MIGRATION));
spin_lock(&res->spinlock);
list_for_each(iter, queue) {
lock = list_entry (iter, struct dlm_lock, list);
if (lock->ml.cookie != ml->cookie)
lock = NULL;
else
for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++) {
tmpq = dlm_list_idx_to_ptr(res, j);
list_for_each(iter, tmpq) {
lock = list_entry (iter, struct dlm_lock, list);
if (lock->ml.cookie != ml->cookie)
lock = NULL;
else
break;
}
if (lock)
break;
}
......@@ -1698,12 +1770,20 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
u64 c = ml->cookie;
mlog(ML_ERROR, "could not find local lock "
"with cookie %u:%llu!\n",
dlm_get_lock_cookie_node(c),
dlm_get_lock_cookie_seq(c));
dlm_get_lock_cookie_node(be64_to_cpu(c)),
dlm_get_lock_cookie_seq(be64_to_cpu(c)));
__dlm_print_one_lock_resource(res);
BUG();
}
BUG_ON(lock->ml.node != ml->node);
if (tmpq != queue) {
mlog(0, "lock was on %u instead of %u for %.*s\n",
j, ml->list, res->lockname.len, res->lockname.name);
spin_unlock(&res->spinlock);
continue;
}
/* see NOTE above about why we do not update
* to match the master here */
......@@ -1711,6 +1791,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
/* do not alter lock refcount. switching lists. */
list_move_tail(&lock->list, queue);
spin_unlock(&res->spinlock);
added++;
mlog(0, "just reordered a local lock!\n");
continue;
......@@ -1799,14 +1880,14 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
"exists on this lockres!\n", dlm->name,
res->lockname.len, res->lockname.name,
dlm_get_lock_cookie_node(c),
dlm_get_lock_cookie_seq(c));
dlm_get_lock_cookie_node(be64_to_cpu(c)),
dlm_get_lock_cookie_seq(be64_to_cpu(c)));
mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
"node=%u, cookie=%u:%llu, queue=%d\n",
ml->type, ml->convert_type, ml->node,
dlm_get_lock_cookie_node(ml->cookie),
dlm_get_lock_cookie_seq(ml->cookie),
dlm_get_lock_cookie_node(be64_to_cpu(ml->cookie)),
dlm_get_lock_cookie_seq(be64_to_cpu(ml->cookie)),
ml->list);
__dlm_print_one_lock_resource(res);
......@@ -1817,12 +1898,22 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
if (!bad) {
dlm_lock_get(newlock);
list_add_tail(&newlock->list, queue);
mlog(0, "%s:%.*s: added lock for node %u, "
"setting refmap bit\n", dlm->name,
res->lockname.len, res->lockname.name, ml->node);
dlm_lockres_set_refmap_bit(ml->node, res);
added++;
}
spin_unlock(&res->spinlock);
}
mlog(0, "done running all the locks\n");
leave:
/* balance the ref taken when the work was queued */
spin_lock(&res->spinlock);
dlm_lockres_drop_inflight_ref(dlm, res);
spin_unlock(&res->spinlock);
if (ret < 0) {
mlog_errno(ret);
if (newlock)
......@@ -1935,9 +2026,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
if (res->owner == dead_node) {
list_del_init(&res->recovering);
spin_lock(&res->spinlock);
/* new_master has our reference from
* the lock state sent during recovery */
dlm_change_lockres_owner(dlm, res, new_master);
res->state &= ~DLM_LOCK_RES_RECOVERING;
if (!__dlm_lockres_unused(res))
if (__dlm_lockres_has_locks(res))
__dlm_dirty_lockres(dlm, res);
spin_unlock(&res->spinlock);
wake_up(&res->wq);
......@@ -1977,9 +2070,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
dlm_lockres_put(res);
}
spin_lock(&res->spinlock);
/* new_master has our reference from
* the lock state sent during recovery */
dlm_change_lockres_owner(dlm, res, new_master);
res->state &= ~DLM_LOCK_RES_RECOVERING;
if (!__dlm_lockres_unused(res))
if (__dlm_lockres_has_locks(res))
__dlm_dirty_lockres(dlm, res);
spin_unlock(&res->spinlock);
wake_up(&res->wq);
......@@ -2048,6 +2143,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
{
struct list_head *iter, *tmpiter;
struct dlm_lock *lock;
unsigned int freed = 0;
/* this node is the lockres master:
* 1) remove any stale locks for the dead node
......@@ -2062,6 +2158,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
if (lock->ml.node == dead_node) {
list_del_init(&lock->list);
dlm_lock_put(lock);
freed++;
}
}
list_for_each_safe(iter, tmpiter, &res->converting) {
......@@ -2069,6 +2166,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
if (lock->ml.node == dead_node) {
list_del_init(&lock->list);
dlm_lock_put(lock);
freed++;
}
}
list_for_each_safe(iter, tmpiter, &res->blocked) {
......@@ -2076,9 +2174,23 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
if (lock->ml.node == dead_node) {
list_del_init(&lock->list);
dlm_lock_put(lock);
freed++;
}
}
if (freed) {
mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
"dropping ref from lockres\n", dlm->name,
res->lockname.len, res->lockname.name, freed, dead_node);
BUG_ON(!test_bit(dead_node, res->refmap));
dlm_lockres_clear_refmap_bit(dead_node, res);
} else if (test_bit(dead_node, res->refmap)) {
mlog(0, "%s:%.*s: dead node %u had a ref, but had "
"no locks and had not purged before dying\n", dlm->name,
res->lockname.len, res->lockname.name, dead_node);
dlm_lockres_clear_refmap_bit(dead_node, res);
}
/* do not kick thread yet */
__dlm_dirty_lockres(dlm, res);
}
......@@ -2141,9 +2253,21 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
spin_lock(&res->spinlock);
/* zero the lvb if necessary */
dlm_revalidate_lvb(dlm, res, dead_node);
if (res->owner == dead_node)
if (res->owner == dead_node) {
if (res->state & DLM_LOCK_RES_DROPPING_REF)
mlog(0, "%s:%.*s: owned by "
"dead node %u, this node was "
"dropping its ref when it died. "
"continue, dropping the flag.\n",
dlm->name, res->lockname.len,
res->lockname.name, dead_node);
/* the wake_up for this will happen when the
* RECOVERING flag is dropped later */
res->state &= ~DLM_LOCK_RES_DROPPING_REF;
dlm_move_lockres_to_recovery_list(dlm, res);
else if (res->owner == dlm->node_num) {
} else if (res->owner == dlm->node_num) {
dlm_free_dead_locks(dlm, res, dead_node);
__dlm_lockres_calc_usage(dlm, res);
}
......@@ -2480,7 +2604,8 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
return ret;
}
int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_begin_reco *br = (struct dlm_begin_reco *)msg->buf;
......@@ -2608,7 +2733,8 @@ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
return ret;
}
int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data)
int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf;
......
......@@ -54,9 +54,6 @@
#include "cluster/masklog.h"
static int dlm_thread(void *data);
static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
struct dlm_lock_resource *lockres);
static void dlm_flush_asts(struct dlm_ctxt *dlm);
#define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num)
......@@ -82,14 +79,33 @@ void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags)
current->state = TASK_RUNNING;
}
int __dlm_lockres_unused(struct dlm_lock_resource *res)
int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
{
if (list_empty(&res->granted) &&
list_empty(&res->converting) &&
list_empty(&res->blocked) &&
list_empty(&res->dirty))
return 1;
list_empty(&res->blocked))
return 0;
return 1;
}
/* "unused": the lockres has no locks, is not on the dirty list,
* has no inflight locks (in the gap between mastery and acquiring
* the first lock), and has no bits in its refmap.
* truly ready to be freed. */
int __dlm_lockres_unused(struct dlm_lock_resource *res)
{
if (!__dlm_lockres_has_locks(res) &&
(list_empty(&res->dirty) && !(res->state & DLM_LOCK_RES_DIRTY))) {
/* try not to scan the bitmap unless the first two
* conditions are already true */
int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
if (bit >= O2NM_MAX_NODES) {
/* since the bit for dlm->node_num is not
* set, inflight_locks better be zero */
BUG_ON(res->inflight_locks != 0);
return 1;
}
}
return 0;
}
......@@ -106,46 +122,21 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
assert_spin_locked(&res->spinlock);
if (__dlm_lockres_unused(res)){
/* For now, just keep any resource we master */
if (res->owner == dlm->node_num)
{
if (!list_empty(&res->purge)) {
mlog(0, "we master %s:%.*s, but it is on "
"the purge list. Removing\n",
dlm->name, res->lockname.len,
res->lockname.name);
list_del_init(&res->purge);
dlm->purge_count--;
}
return;
}
if (list_empty(&res->purge)) {
mlog(0, "putting lockres %.*s from purge list\n",
res->lockname.len, res->lockname.name);
mlog(0, "putting lockres %.*s:%p onto purge list\n",
res->lockname.len, res->lockname.name, res);
res->last_used = jiffies;
dlm_lockres_get(res);
list_add_tail(&res->purge, &dlm->purge_list);
dlm->purge_count++;
/* if this node is not the owner, there is
* no way to keep track of who the owner could be.
* unhash it to avoid serious problems. */
if (res->owner != dlm->node_num) {
mlog(0, "%s:%.*s: doing immediate "
"purge of lockres owned by %u\n",
dlm->name, res->lockname.len,
res->lockname.name, res->owner);
dlm_purge_lockres_now(dlm, res);
}
}
} else if (!list_empty(&res->purge)) {
mlog(0, "removing lockres %.*s from purge list, "
"owner=%u\n", res->lockname.len, res->lockname.name,
res->owner);
mlog(0, "removing lockres %.*s:%p from purge list, owner=%u\n",
res->lockname.len, res->lockname.name, res, res->owner);
list_del_init(&res->purge);
dlm_lockres_put(res);
dlm->purge_count--;
}
}
......@@ -163,68 +154,65 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
spin_unlock(&dlm->spinlock);
}
/* TODO: Eventual API: Called with the dlm spinlock held, may drop it
* to do migration, but will re-acquire before exit. */
void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres)
static int dlm_purge_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res)
{
int master;
int ret;
spin_lock(&lockres->spinlock);
master = lockres->owner == dlm->node_num;
spin_unlock(&lockres->spinlock);
int ret = 0;
mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len,
lockres->lockname.name, master);
/* Non master is the easy case -- no migration required, just
* quit. */
spin_lock(&res->spinlock);
if (!__dlm_lockres_unused(res)) {
spin_unlock(&res->spinlock);
mlog(0, "%s:%.*s: tried to purge but not unused\n",
dlm->name, res->lockname.len, res->lockname.name);
return -ENOTEMPTY;
}
master = (res->owner == dlm->node_num);
if (!master)
goto finish;
/* Wheee! Migrate lockres here! */
spin_unlock(&dlm->spinlock);
again:
res->state |= DLM_LOCK_RES_DROPPING_REF;
spin_unlock(&res->spinlock);
ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES);
if (ret == -ENOTEMPTY) {
mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
lockres->lockname.len, lockres->lockname.name);
mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
res->lockname.name, master);
BUG();
} else if (ret < 0) {
mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n",
lockres->lockname.len, lockres->lockname.name);
msleep(100);
goto again;
if (!master) {
spin_lock(&res->spinlock);
/* This ensures that clear refmap is sent after the set */
__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
spin_unlock(&res->spinlock);
/* drop spinlock to do messaging, retake below */
spin_unlock(&dlm->spinlock);
/* clear our bit from the master's refmap, ignore errors */
ret = dlm_drop_lockres_ref(dlm, res);
if (ret < 0) {
mlog_errno(ret);
if (!dlm_is_host_down(ret))
BUG();
}
mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
dlm->name, res->lockname.len, res->lockname.name, ret);
spin_lock(&dlm->spinlock);
}
spin_lock(&dlm->spinlock);
finish:
if (!list_empty(&lockres->purge)) {
list_del_init(&lockres->purge);
if (!list_empty(&res->purge)) {
mlog(0, "removing lockres %.*s:%p from purgelist, "
"master = %d\n", res->lockname.len, res->lockname.name,
res, master);
list_del_init(&res->purge);
dlm_lockres_put(res);
dlm->purge_count--;
}
__dlm_unhash_lockres(lockres);
}
/* make an unused lockres go away immediately.
* as soon as the dlm spinlock is dropped, this lockres
* will not be found. kfree still happens on last put. */
static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
struct dlm_lock_resource *lockres)
{
assert_spin_locked(&dlm->spinlock);
assert_spin_locked(&lockres->spinlock);
__dlm_unhash_lockres(res);
BUG_ON(!__dlm_lockres_unused(lockres));
if (!list_empty(&lockres->purge)) {
list_del_init(&lockres->purge);
dlm->purge_count--;
/* lockres is not in the hash now. drop the flag and wake up
* any processes waiting in dlm_get_lock_resource. */
if (!master) {
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_DROPPING_REF;
spin_unlock(&res->spinlock);
wake_up(&res->wq);
}
__dlm_unhash_lockres(lockres);
return 0;
}
static void dlm_run_purge_list(struct dlm_ctxt *dlm,
......@@ -268,13 +256,17 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
break;
}
mlog(0, "removing lockres %.*s:%p from purgelist\n",
lockres->lockname.len, lockres->lockname.name, lockres);
list_del_init(&lockres->purge);
dlm_lockres_put(lockres);
dlm->purge_count--;
/* This may drop and reacquire the dlm spinlock if it
* has to do migration. */
mlog(0, "calling dlm_purge_lockres!\n");
dlm_purge_lockres(dlm, lockres);
if (dlm_purge_lockres(dlm, lockres))
BUG();
mlog(0, "DONE calling dlm_purge_lockres!\n");
/* Avoid adding any scheduling latencies */
......@@ -467,12 +459,17 @@ void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
assert_spin_locked(&res->spinlock);
/* don't shuffle secondary queues */
if ((res->owner == dlm->node_num) &&
!(res->state & DLM_LOCK_RES_DIRTY)) {
/* ref for dirty_list */
dlm_lockres_get(res);
list_add_tail(&res->dirty, &dlm->dirty_list);
res->state |= DLM_LOCK_RES_DIRTY;
if ((res->owner == dlm->node_num)) {
if (res->state & (DLM_LOCK_RES_MIGRATING |
DLM_LOCK_RES_BLOCK_DIRTY))
return;
if (list_empty(&res->dirty)) {
/* ref for dirty_list */
dlm_lockres_get(res);
list_add_tail(&res->dirty, &dlm->dirty_list);
res->state |= DLM_LOCK_RES_DIRTY;
}
}
}
......@@ -651,7 +648,7 @@ static int dlm_thread(void *data)
dlm_lockres_get(res);
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_DIRTY;
/* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */
list_del_init(&res->dirty);
spin_unlock(&res->spinlock);
spin_unlock(&dlm->spinlock);
......@@ -675,10 +672,11 @@ static int dlm_thread(void *data)
/* it is now ok to move lockreses in these states
* to the dirty list, assuming that they will only be
* dirty for a short while. */
BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
DLM_LOCK_RES_MIGRATING |
DLM_LOCK_RES_RECOVERING)) {
/* move it to the tail and keep going */
res->state &= ~DLM_LOCK_RES_DIRTY;
spin_unlock(&res->spinlock);
mlog(0, "delaying list shuffling for in-"
"progress lockres %.*s, state=%d\n",
......@@ -699,6 +697,7 @@ static int dlm_thread(void *data)
/* called while holding lockres lock */
dlm_shuffle_lists(dlm, res);
res->state &= ~DLM_LOCK_RES_DIRTY;
spin_unlock(&res->spinlock);
dlm_lockres_calc_usage(dlm, res);
......@@ -709,11 +708,8 @@ static int dlm_thread(void *data)
/* if the lock was in-progress, stick
* it on the back of the list */
if (delay) {
/* ref for dirty_list */
dlm_lockres_get(res);
spin_lock(&res->spinlock);
list_add_tail(&res->dirty, &dlm->dirty_list);
res->state |= DLM_LOCK_RES_DIRTY;
__dlm_dirty_lockres(dlm, res);
spin_unlock(&res->spinlock);
}
dlm_lockres_put(res);
......
......@@ -147,6 +147,10 @@ static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm,
goto leave;
}
if (res->state & DLM_LOCK_RES_MIGRATING) {
status = DLM_MIGRATING;
goto leave;
}
/* see above for what the spec says about
* LKM_CANCEL and the lock queue state */
......@@ -244,8 +248,8 @@ static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm,
/* this should always be coupled with list removal */
BUG_ON(!(actions & DLM_UNLOCK_REMOVE_LOCK));
mlog(0, "lock %u:%llu should be gone now! refs=%d\n",
dlm_get_lock_cookie_node(lock->ml.cookie),
dlm_get_lock_cookie_seq(lock->ml.cookie),
dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
atomic_read(&lock->lock_refs.refcount)-1);
dlm_lock_put(lock);
}
......@@ -379,7 +383,8 @@ static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm,
* returns: DLM_NORMAL, DLM_BADARGS, DLM_IVLOCKID,
* return value from dlmunlock_master
*/
int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data)
int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data,
void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_unlock_lock *unlock = (struct dlm_unlock_lock *)msg->buf;
......@@ -502,8 +507,8 @@ int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data)
if (!found)
mlog(ML_ERROR, "failed to find lock to unlock! "
"cookie=%u:%llu\n",
dlm_get_lock_cookie_node(unlock->cookie),
dlm_get_lock_cookie_seq(unlock->cookie));
dlm_get_lock_cookie_node(be64_to_cpu(unlock->cookie)),
dlm_get_lock_cookie_seq(be64_to_cpu(unlock->cookie)));
else
dlm_lock_put(lock);
......
......@@ -887,7 +887,7 @@ static inline int ocfs2_translate_response(int response)
static int ocfs2_handle_response_message(struct o2net_msg *msg,
u32 len,
void *data)
void *data, void **ret_data)
{
unsigned int response_id, node_num;
int response_status;
......@@ -943,7 +943,7 @@ static int ocfs2_handle_response_message(struct o2net_msg *msg,
static int ocfs2_handle_vote_message(struct o2net_msg *msg,
u32 len,
void *data)
void *data, void **ret_data)
{
int status;
struct ocfs2_super *osb = data;
......@@ -1007,7 +1007,7 @@ int ocfs2_register_net_handlers(struct ocfs2_super *osb)
osb->net_key,
sizeof(struct ocfs2_response_msg),
ocfs2_handle_response_message,
osb, &osb->osb_net_handlers);
osb, NULL, &osb->osb_net_handlers);
if (status) {
mlog_errno(status);
goto bail;
......@@ -1017,7 +1017,7 @@ int ocfs2_register_net_handlers(struct ocfs2_super *osb)
osb->net_key,
sizeof(struct ocfs2_vote_msg),
ocfs2_handle_vote_message,
osb, &osb->osb_net_handlers);
osb, NULL, &osb->osb_net_handlers);
if (status) {
mlog_errno(status);
goto bail;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment