Commit 5cb051d5 authored by Vincent Pelletier's avatar Vincent Pelletier

Offer a method to trigger a clean shutdown.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@22388 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 09113090
...@@ -73,6 +73,7 @@ max_active_threads = 1 # 2 will cause more bug to appear (he he) ...@@ -73,6 +73,7 @@ max_active_threads = 1 # 2 will cause more bug to appear (he he)
is_initialized = False is_initialized = False
tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
is_running_lock = threading.Lock()
first_run = True first_run = True
currentNode = None currentNode = None
ROLE_IDLE = 0 ROLE_IDLE = 0
...@@ -375,6 +376,11 @@ class ActivityTool (Folder, UniqueObject): ...@@ -375,6 +376,11 @@ class ActivityTool (Folder, UniqueObject):
distributingNode = '' distributingNode = ''
_nodes = () _nodes = ()
# Set to False when shutting down. Access outside of process_shutdown must
# be done under the protection of is_running_lock lock.
_is_running = True
# True when activities cannot be executing any more.
_has_processed_shutdown = False
def __init__(self): def __init__(self):
return Folder.__init__(self, ActivityTool.id) return Folder.__init__(self, ActivityTool.id)
...@@ -592,6 +598,19 @@ class ActivityTool (Folder, UniqueObject): ...@@ -592,6 +598,19 @@ class ActivityTool (Folder, UniqueObject):
'/manageLoadBalancing?manage_tabs_message=' + '/manageLoadBalancing?manage_tabs_message=' +
urllib.quote(message)) urllib.quote(message))
def process_shutdown(self, phase, time_in_phase):
"""
Prevent shutdown from happening while an activity queue is
processing a batch.
"""
self._is_running = False
if phase == 3 and not self._has_processed_shutdown:
self._has_processed_shutdown = True
LOG('CMFActivity', INFO, "Shutdown: Waiting for activities to finish.")
is_running_lock.acquire()
LOG('CMFActivity', INFO, "Shutdown: Activities finished.")
is_running_lock.release()
def process_timer(self, tick, interval, prev="", next=""): def process_timer(self, tick, interval, prev="", next=""):
""" """
Call distribute() if we are the Distributing Node and call tic() Call distribute() if we are the Distributing Node and call tic()
...@@ -706,8 +725,13 @@ class ActivityTool (Folder, UniqueObject): ...@@ -706,8 +725,13 @@ class ActivityTool (Folder, UniqueObject):
while has_awake_activity: while has_awake_activity:
has_awake_activity = 0 has_awake_activity = 0
for activity in activity_list: for activity in activity_list:
is_running_lock.acquire()
try:
if self._is_running:
activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity
has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node) has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node)
finally:
is_running_lock.release()
finally: finally:
# decrease the number of active_threads # decrease the number of active_threads
tic_lock.acquire() tic_lock.acquire()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment