Commit c36258b5 authored by David Teigland's avatar David Teigland Committed by Steven Whitehouse

[DLM] block dlm_recv in recovery transition

Introduce a per-lockspace rwsem that's held in read mode by dlm_recv
threads while working in the dlm.  This allows dlm_recv activity to be
suspended when the lockspace transitions to, from and between recovery
cycles.

The specific bug prompting this change is one where an in-progress
recovery cycle is aborted by a new recovery cycle.  While dlm_recv was
processing a recovery message, the recovery cycle was aborted and
dlm_recoverd began cleaning up.  dlm_recv decremented recover_locks_count
on an rsb after dlm_recoverd had reset it to zero.  This is fixed by
suspending dlm_recv (taking write lock on the rwsem) before aborting the
current recovery.

The transitions to/from normal and recovery modes are simplified by using
this new ability to block dlm_recv.  The switch from normal to recovery
mode means dlm_recv goes from processing locking messages, to saving them
for later, and vice versa.  Races are avoided by blocking dlm_recv when
setting the flag that switches between modes.
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
Signed-off-by: default avatarSteven Whitehouse <swhiteho@redhat.com>
parent b434eda6
...@@ -491,6 +491,7 @@ struct dlm_ls { ...@@ -491,6 +491,7 @@ struct dlm_ls {
uint64_t ls_recover_seq; uint64_t ls_recover_seq;
struct dlm_recover *ls_recover_args; struct dlm_recover *ls_recover_args;
struct rw_semaphore ls_in_recovery; /* block local requests */ struct rw_semaphore ls_in_recovery; /* block local requests */
struct rw_semaphore ls_recv_active; /* block dlm_recv */
struct list_head ls_requestqueue;/* queue remote requests */ struct list_head ls_requestqueue;/* queue remote requests */
struct mutex ls_requestqueue_mutex; struct mutex ls_requestqueue_mutex;
char *ls_recover_buf; char *ls_recover_buf;
......
...@@ -3638,55 +3638,8 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -3638,55 +3638,8 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
dlm_put_lkb(lkb); dlm_put_lkb(lkb);
} }
int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery) static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
{ {
struct dlm_message *ms = (struct dlm_message *) hd;
struct dlm_ls *ls;
int error = 0;
if (!recovery)
dlm_message_in(ms);
ls = dlm_find_lockspace_global(hd->h_lockspace);
if (!ls) {
log_print("drop message %d from %d for unknown lockspace %d",
ms->m_type, nodeid, hd->h_lockspace);
return -EINVAL;
}
/* recovery may have just ended leaving a bunch of backed-up requests
in the requestqueue; wait while dlm_recoverd clears them */
if (!recovery)
dlm_wait_requestqueue(ls);
/* recovery may have just started while there were a bunch of
in-flight requests -- save them in requestqueue to be processed
after recovery. we can't let dlm_recvd block on the recovery
lock. if dlm_recoverd is calling this function to clear the
requestqueue, it needs to be interrupted (-EINTR) if another
recovery operation is starting. */
while (1) {
if (dlm_locking_stopped(ls)) {
if (recovery) {
error = -EINTR;
goto out;
}
error = dlm_add_requestqueue(ls, nodeid, hd);
if (error == -EAGAIN)
continue;
else {
error = -EINTR;
goto out;
}
}
if (dlm_lock_recovery_try(ls))
break;
schedule();
}
switch (ms->m_type) { switch (ms->m_type) {
/* messages sent to a master node */ /* messages sent to a master node */
...@@ -3761,17 +3714,90 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery) ...@@ -3761,17 +3714,90 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
log_error(ls, "unknown message type %d", ms->m_type); log_error(ls, "unknown message type %d", ms->m_type);
} }
dlm_unlock_recovery(ls);
out:
dlm_put_lockspace(ls);
dlm_astd_wake(); dlm_astd_wake();
return error;
} }
/* If the lockspace is in recovery mode (locking stopped), then normal
messages are saved on the requestqueue for processing after recovery is
done. When not in recovery mode, we wait for dlm_recoverd to drain saved
messages off the requestqueue before we process new ones. This occurs right
after recovery completes when we transition from saving all messages on
requestqueue, to processing all the saved messages, to processing new
messages as they arrive. */
/* static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
* Recovery related int nodeid)
*/ {
if (dlm_locking_stopped(ls)) {
dlm_add_requestqueue(ls, nodeid, (struct dlm_header *) ms);
} else {
dlm_wait_requestqueue(ls);
_receive_message(ls, ms);
}
}
/* This is called by dlm_recoverd to process messages that were saved on
the requestqueue. */
void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
{
_receive_message(ls, ms);
}
/* This is called by the midcomms layer when something is received for
the lockspace. It could be either a MSG (normal message sent as part of
standard locking activity) or an RCOM (recovery message sent as part of
lockspace recovery). */
void dlm_receive_buffer(struct dlm_header *hd, int nodeid)
{
struct dlm_message *ms = (struct dlm_message *) hd;
struct dlm_rcom *rc = (struct dlm_rcom *) hd;
struct dlm_ls *ls;
int type = 0;
switch (hd->h_cmd) {
case DLM_MSG:
dlm_message_in(ms);
type = ms->m_type;
break;
case DLM_RCOM:
dlm_rcom_in(rc);
type = rc->rc_type;
break;
default:
log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
return;
}
if (hd->h_nodeid != nodeid) {
log_print("invalid h_nodeid %d from %d lockspace %x",
hd->h_nodeid, nodeid, hd->h_lockspace);
return;
}
ls = dlm_find_lockspace_global(hd->h_lockspace);
if (!ls) {
log_print("invalid h_lockspace %x from %d cmd %d type %d",
hd->h_lockspace, nodeid, hd->h_cmd, type);
if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
dlm_send_ls_not_ready(nodeid, rc);
return;
}
/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
be inactive (in this ls) before transitioning to recovery mode */
down_read(&ls->ls_recv_active);
if (hd->h_cmd == DLM_MSG)
dlm_receive_message(ls, ms, nodeid);
else
dlm_receive_rcom(ls, rc, nodeid);
up_read(&ls->ls_recv_active);
dlm_put_lockspace(ls);
}
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
{ {
......
...@@ -16,7 +16,8 @@ ...@@ -16,7 +16,8 @@
void dlm_print_rsb(struct dlm_rsb *r); void dlm_print_rsb(struct dlm_rsb *r);
void dlm_dump_rsb(struct dlm_rsb *r); void dlm_dump_rsb(struct dlm_rsb *r);
void dlm_print_lkb(struct dlm_lkb *lkb); void dlm_print_lkb(struct dlm_lkb *lkb);
int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery); void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms);
void dlm_receive_buffer(struct dlm_header *hd, int nodeid);
int dlm_modes_compat(int mode1, int mode2); int dlm_modes_compat(int mode1, int mode2);
int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen, int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
unsigned int flags, struct dlm_rsb **r_ret); unsigned int flags, struct dlm_rsb **r_ret);
......
...@@ -519,6 +519,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace, ...@@ -519,6 +519,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
ls->ls_recover_seq = 0; ls->ls_recover_seq = 0;
ls->ls_recover_args = NULL; ls->ls_recover_args = NULL;
init_rwsem(&ls->ls_in_recovery); init_rwsem(&ls->ls_in_recovery);
init_rwsem(&ls->ls_recv_active);
INIT_LIST_HEAD(&ls->ls_requestqueue); INIT_LIST_HEAD(&ls->ls_requestqueue);
mutex_init(&ls->ls_requestqueue_mutex); mutex_init(&ls->ls_requestqueue_mutex);
mutex_init(&ls->ls_clear_proc_locks); mutex_init(&ls->ls_clear_proc_locks);
......
...@@ -18,10 +18,6 @@ ...@@ -18,10 +18,6 @@
#include "rcom.h" #include "rcom.h"
#include "config.h" #include "config.h"
/*
* Following called by dlm_recoverd thread
*/
static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new) static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
{ {
struct dlm_member *memb = NULL; struct dlm_member *memb = NULL;
...@@ -250,18 +246,30 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) ...@@ -250,18 +246,30 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
return error; return error;
} }
/* /* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
* Following called from lockspace.c dlm_ls_start() is called on any of them to start the new recovery. */
*/
int dlm_ls_stop(struct dlm_ls *ls) int dlm_ls_stop(struct dlm_ls *ls)
{ {
int new; int new;
/* /*
* A stop cancels any recovery that's in progress (see RECOVERY_STOP, * Prevent dlm_recv from being in the middle of something when we do
* dlm_recovery_stopped()) and prevents any new locks from being * the stop. This includes ensuring dlm_recv isn't processing a
* processed (see RUNNING, dlm_locking_stopped()). * recovery message (rcom), while dlm_recoverd is aborting and
* resetting things from an in-progress recovery. i.e. we want
* dlm_recoverd to abort its recovery without worrying about dlm_recv
* processing an rcom at the same time. Stopping dlm_recv also makes
* it easy for dlm_receive_message() to check locking stopped and add a
* message to the requestqueue without races.
*/
down_write(&ls->ls_recv_active);
/*
* Abort any recovery that's in progress (see RECOVERY_STOP,
* dlm_recovery_stopped()) and tell any other threads running in the
* dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
*/ */
spin_lock(&ls->ls_recover_lock); spin_lock(&ls->ls_recover_lock);
...@@ -270,9 +278,15 @@ int dlm_ls_stop(struct dlm_ls *ls) ...@@ -270,9 +278,15 @@ int dlm_ls_stop(struct dlm_ls *ls)
ls->ls_recover_seq++; ls->ls_recover_seq++;
spin_unlock(&ls->ls_recover_lock); spin_unlock(&ls->ls_recover_lock);
/*
* Let dlm_recv run again, now any normal messages will be saved on the
* requestqueue for later.
*/
up_write(&ls->ls_recv_active);
/* /*
* This in_recovery lock does two things: * This in_recovery lock does two things:
*
* 1) Keeps this function from returning until all threads are out * 1) Keeps this function from returning until all threads are out
* of locking routines and locking is truely stopped. * of locking routines and locking is truely stopped.
* 2) Keeps any new requests from being processed until it's unlocked * 2) Keeps any new requests from being processed until it's unlocked
...@@ -284,9 +298,8 @@ int dlm_ls_stop(struct dlm_ls *ls) ...@@ -284,9 +298,8 @@ int dlm_ls_stop(struct dlm_ls *ls)
/* /*
* The recoverd suspend/resume makes sure that dlm_recoverd (if * The recoverd suspend/resume makes sure that dlm_recoverd (if
* running) has noticed the clearing of RUNNING above and quit * running) has noticed RECOVERY_STOP above and quit processing the
* processing the previous recovery. This will be true for all nodes * previous recovery.
* before any nodes start the new recovery.
*/ */
dlm_recoverd_suspend(ls); dlm_recoverd_suspend(ls);
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
******************************************************************************* *******************************************************************************
** **
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. ** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
** **
** This copyrighted material is made available to anyone wishing to use, ** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions ** modify, copy, or redistribute it subject to the terms and conditions
...@@ -27,7 +27,6 @@ ...@@ -27,7 +27,6 @@
#include "dlm_internal.h" #include "dlm_internal.h"
#include "lowcomms.h" #include "lowcomms.h"
#include "config.h" #include "config.h"
#include "rcom.h"
#include "lock.h" #include "lock.h"
#include "midcomms.h" #include "midcomms.h"
...@@ -117,19 +116,7 @@ int dlm_process_incoming_buffer(int nodeid, const void *base, ...@@ -117,19 +116,7 @@ int dlm_process_incoming_buffer(int nodeid, const void *base,
offset &= (limit - 1); offset &= (limit - 1);
len -= msglen; len -= msglen;
switch (msg->h_cmd) { dlm_receive_buffer(msg, nodeid);
case DLM_MSG:
dlm_receive_message(msg, nodeid, 0);
break;
case DLM_RCOM:
dlm_receive_rcom(msg, nodeid);
break;
default:
log_print("unknown msg type %x from %u: %u %u %u %u",
msg->h_cmd, nodeid, msglen, len, offset, ret);
}
} }
if (msg != (struct dlm_header *) __tmp) if (msg != (struct dlm_header *) __tmp)
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
******************************************************************************* *******************************************************************************
** **
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2005 Red Hat, Inc. All rights reserved. ** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
** **
** This copyrighted material is made available to anyone wishing to use, ** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions ** modify, copy, or redistribute it subject to the terms and conditions
...@@ -386,7 +386,10 @@ static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) ...@@ -386,7 +386,10 @@ static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
dlm_recover_process_copy(ls, rc_in); dlm_recover_process_copy(ls, rc_in);
} }
static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) /* If the lockspace doesn't exist then still send a status message
back; it's possible that it just doesn't have its global_id yet. */
int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
{ {
struct dlm_rcom *rc; struct dlm_rcom *rc;
struct rcom_config *rf; struct rcom_config *rf;
...@@ -446,28 +449,11 @@ static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc) ...@@ -446,28 +449,11 @@ static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
return rv; return rv;
} }
/* Called by dlm_recvd; corresponds to dlm_receive_message() but special /* Called by dlm_recv; corresponds to dlm_receive_message() but special
recovery-only comms are sent through here. */ recovery-only comms are sent through here. */
void dlm_receive_rcom(struct dlm_header *hd, int nodeid) void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
{ {
struct dlm_rcom *rc = (struct dlm_rcom *) hd;
struct dlm_ls *ls;
dlm_rcom_in(rc);
/* If the lockspace doesn't exist then still send a status message
back; it's possible that it just doesn't have its global_id yet. */
ls = dlm_find_lockspace_global(hd->h_lockspace);
if (!ls) {
log_print("lockspace %x from %d type %x not found",
hd->h_lockspace, nodeid, rc->rc_type);
if (rc->rc_type == DLM_RCOM_STATUS)
send_ls_not_ready(nodeid, rc);
return;
}
if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) { if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
log_debug(ls, "ignoring recovery message %x from %d", log_debug(ls, "ignoring recovery message %x from %d",
rc->rc_type, nodeid); rc->rc_type, nodeid);
...@@ -477,12 +463,6 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid) ...@@ -477,12 +463,6 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
if (is_old_reply(ls, rc)) if (is_old_reply(ls, rc))
goto out; goto out;
if (nodeid != rc->rc_header.h_nodeid) {
log_error(ls, "bad rcom nodeid %d from %d",
rc->rc_header.h_nodeid, nodeid);
goto out;
}
switch (rc->rc_type) { switch (rc->rc_type) {
case DLM_RCOM_STATUS: case DLM_RCOM_STATUS:
receive_rcom_status(ls, rc); receive_rcom_status(ls, rc);
...@@ -520,6 +500,6 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid) ...@@ -520,6 +500,6 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
DLM_ASSERT(0, printk("rc_type=%x\n", rc->rc_type);); DLM_ASSERT(0, printk("rc_type=%x\n", rc->rc_type););
} }
out: out:
dlm_put_lockspace(ls); return;
} }
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
******************************************************************************* *******************************************************************************
** **
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2005 Red Hat, Inc. All rights reserved. ** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
** **
** This copyrighted material is made available to anyone wishing to use, ** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions ** modify, copy, or redistribute it subject to the terms and conditions
...@@ -18,7 +18,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid); ...@@ -18,7 +18,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid);
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len); int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid); int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
void dlm_receive_rcom(struct dlm_header *hd, int nodeid); void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid);
int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in);
#endif #endif
...@@ -24,19 +24,28 @@ ...@@ -24,19 +24,28 @@
/* If the start for which we're re-enabling locking (seq) has been superseded /* If the start for which we're re-enabling locking (seq) has been superseded
by a newer stop (ls_recover_seq), we need to leave locking disabled. */ by a newer stop (ls_recover_seq), we need to leave locking disabled.
We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
enables locking and clears the requestqueue between a and b. */
static int enable_locking(struct dlm_ls *ls, uint64_t seq) static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{ {
int error = -EINTR; int error = -EINTR;
down_write(&ls->ls_recv_active);
spin_lock(&ls->ls_recover_lock); spin_lock(&ls->ls_recover_lock);
if (ls->ls_recover_seq == seq) { if (ls->ls_recover_seq == seq) {
set_bit(LSFL_RUNNING, &ls->ls_flags); set_bit(LSFL_RUNNING, &ls->ls_flags);
/* unblocks processes waiting to enter the dlm */
up_write(&ls->ls_in_recovery); up_write(&ls->ls_in_recovery);
error = 0; error = 0;
} }
spin_unlock(&ls->ls_recover_lock); spin_unlock(&ls->ls_recover_lock);
up_write(&ls->ls_recv_active);
return error; return error;
} }
......
/****************************************************************************** /******************************************************************************
******************************************************************************* *******************************************************************************
** **
** Copyright (C) 2005 Red Hat, Inc. All rights reserved. ** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
** **
** This copyrighted material is made available to anyone wishing to use, ** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions ** modify, copy, or redistribute it subject to the terms and conditions
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
struct rq_entry { struct rq_entry {
struct list_head list; struct list_head list;
int nodeid; int nodeid;
char request[1]; char request[0];
}; };
/* /*
...@@ -30,42 +30,39 @@ struct rq_entry { ...@@ -30,42 +30,39 @@ struct rq_entry {
* lockspace is enabled on some while still suspended on others. * lockspace is enabled on some while still suspended on others.
*/ */
int dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd) void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd)
{ {
struct rq_entry *e; struct rq_entry *e;
int length = hd->h_length; int length = hd->h_length;
int rv = 0;
e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL); e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL);
if (!e) { if (!e) {
log_print("dlm_add_requestqueue: out of memory\n"); log_print("dlm_add_requestqueue: out of memory len %d", length);
return 0; return;
} }
e->nodeid = nodeid; e->nodeid = nodeid;
memcpy(e->request, hd, length); memcpy(e->request, hd, length);
/* We need to check dlm_locking_stopped() after taking the mutex to
avoid a race where dlm_recoverd enables locking and runs
process_requestqueue between our earlier dlm_locking_stopped check
and this addition to the requestqueue. */
mutex_lock(&ls->ls_requestqueue_mutex); mutex_lock(&ls->ls_requestqueue_mutex);
if (dlm_locking_stopped(ls)) list_add_tail(&e->list, &ls->ls_requestqueue);
list_add_tail(&e->list, &ls->ls_requestqueue);
else {
log_debug(ls, "dlm_add_requestqueue skip from %d", nodeid);
kfree(e);
rv = -EAGAIN;
}
mutex_unlock(&ls->ls_requestqueue_mutex); mutex_unlock(&ls->ls_requestqueue_mutex);
return rv;
} }
/*
* Called by dlm_recoverd to process normal messages saved while recovery was
* happening. Normal locking has been enabled before this is called. dlm_recv
* upon receiving a message, will wait for all saved messages to be drained
* here before processing the message it got. If a new dlm_ls_stop() arrives
* while we're processing these saved messages, it may block trying to suspend
* dlm_recv if dlm_recv is waiting for us in dlm_wait_requestqueue. In that
* case, we don't abort since locking_stopped is still 0. If dlm_recv is not
* waiting for us, then this processing may be aborted due to locking_stopped.
*/
int dlm_process_requestqueue(struct dlm_ls *ls) int dlm_process_requestqueue(struct dlm_ls *ls)
{ {
struct rq_entry *e; struct rq_entry *e;
struct dlm_header *hd;
int error = 0; int error = 0;
mutex_lock(&ls->ls_requestqueue_mutex); mutex_lock(&ls->ls_requestqueue_mutex);
...@@ -79,14 +76,7 @@ int dlm_process_requestqueue(struct dlm_ls *ls) ...@@ -79,14 +76,7 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list); e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
mutex_unlock(&ls->ls_requestqueue_mutex); mutex_unlock(&ls->ls_requestqueue_mutex);
hd = (struct dlm_header *) e->request; dlm_receive_message_saved(ls, (struct dlm_message *)e->request);
error = dlm_receive_message(hd, e->nodeid, 1);
if (error == -EINTR) {
/* entry is left on requestqueue */
log_debug(ls, "process_requestqueue abort eintr");
break;
}
mutex_lock(&ls->ls_requestqueue_mutex); mutex_lock(&ls->ls_requestqueue_mutex);
list_del(&e->list); list_del(&e->list);
...@@ -106,10 +96,12 @@ int dlm_process_requestqueue(struct dlm_ls *ls) ...@@ -106,10 +96,12 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
/* /*
* After recovery is done, locking is resumed and dlm_recoverd takes all the * After recovery is done, locking is resumed and dlm_recoverd takes all the
* saved requests and processes them as they would have been by dlm_recvd. At * saved requests and processes them as they would have been by dlm_recv. At
* the same time, dlm_recvd will start receiving new requests from remote * the same time, dlm_recv will start receiving new requests from remote nodes.
* nodes. We want to delay dlm_recvd processing new requests until * We want to delay dlm_recv processing new requests until dlm_recoverd has
* dlm_recoverd has finished processing the old saved requests. * finished processing the old saved requests. We don't check for locking
* stopped here because dlm_ls_stop won't stop locking until it's suspended us
* (dlm_recv).
*/ */
void dlm_wait_requestqueue(struct dlm_ls *ls) void dlm_wait_requestqueue(struct dlm_ls *ls)
...@@ -118,8 +110,6 @@ void dlm_wait_requestqueue(struct dlm_ls *ls) ...@@ -118,8 +110,6 @@ void dlm_wait_requestqueue(struct dlm_ls *ls)
mutex_lock(&ls->ls_requestqueue_mutex); mutex_lock(&ls->ls_requestqueue_mutex);
if (list_empty(&ls->ls_requestqueue)) if (list_empty(&ls->ls_requestqueue))
break; break;
if (dlm_locking_stopped(ls))
break;
mutex_unlock(&ls->ls_requestqueue_mutex); mutex_unlock(&ls->ls_requestqueue_mutex);
schedule(); schedule();
} }
......
/****************************************************************************** /******************************************************************************
******************************************************************************* *******************************************************************************
** **
** Copyright (C) 2005 Red Hat, Inc. All rights reserved. ** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
** **
** This copyrighted material is made available to anyone wishing to use, ** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions ** modify, copy, or redistribute it subject to the terms and conditions
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
#ifndef __REQUESTQUEUE_DOT_H__ #ifndef __REQUESTQUEUE_DOT_H__
#define __REQUESTQUEUE_DOT_H__ #define __REQUESTQUEUE_DOT_H__
int dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd); void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd);
int dlm_process_requestqueue(struct dlm_ls *ls); int dlm_process_requestqueue(struct dlm_ls *ls);
void dlm_wait_requestqueue(struct dlm_ls *ls); void dlm_wait_requestqueue(struct dlm_ls *ls);
void dlm_purge_requestqueue(struct dlm_ls *ls); void dlm_purge_requestqueue(struct dlm_ls *ls);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment