Commit 0349a2b6 authored by Jérome Perrin's avatar Jérome Perrin

TimerService/CMFActivity: remove "process_shutdown"

This was something supported only in ClockServer. Zope's ZServer had an
API to control the shutdown sequence, by allowing servers to implement a
clean_shutdown_control method that was be called at every phase of the
shutdown:

https://github.com/zopefoundation/ZServer/blob/0259e288/src/Lifetime/__init__.py#L20-L38

ClockServer was implementing clean_shutdown_control method:
https://lab.nexedi.com/nexedi/erp5/blob/07a136ec3/product/ClockServer/ClockServer.py#L57

and as described in the README ( https://lab.nexedi.com/nexedi/erp5/blob/07a136ec3/product/ClockServer/README#L32 ),
it was expected to be configured to call TimerService's process_shutdown.

It seems we were using a mix of ClockServer + TimerService at the time,
but this was maybe never used.

In 49400d40 (we don't use ClockServer. we use TimerService instead.,
2013-10-07) we removed ClockServer. Since this day this code is not used.

In WSGI mode, there's no "clean_shutdown_control" API, so this
parent 00fe60e7
...@@ -597,18 +597,6 @@ class ActiveWrapper(object): ...@@ -597,18 +597,6 @@ class ActiveWrapper(object):
return '<%s at 0x%x to %s>' % (self.__class__.__name__, id(self), return '<%s at 0x%x to %s>' % (self.__class__.__name__, id(self),
self.__passive_url) self.__passive_url)
# True when activities cannot be executing any more.
has_processed_shutdown = False
def cancelProcessShutdown():
"""
This method reverts the effect of calling "process_shutdown" on activity
tool.
"""
global has_processed_shutdown
is_running_lock.release()
has_processed_shutdown = False
# Due to a circular import dependency between this module and # Due to a circular import dependency between this module and
# Products.ERP5Type.Core.Folder, both modules must import after the definitions # Products.ERP5Type.Core.Folder, both modules must import after the definitions
# of getCurrentNode and Folder (the later is a base class of BaseTool). # of getCurrentNode and Folder (the later is a base class of BaseTool).
...@@ -1252,19 +1240,6 @@ class ActivityTool (BaseTool): ...@@ -1252,19 +1240,6 @@ class ActivityTool (BaseTool):
'/manageLoadBalancing?manage_tabs_message=' + '/manageLoadBalancing?manage_tabs_message=' +
urllib.quote(message)) urllib.quote(message))
security.declarePrivate('process_shutdown')
def process_shutdown(self, phase, time_in_phase):
"""
Prevent shutdown from happening while an activity queue is
processing a batch.
"""
global has_processed_shutdown
if phase == 3 and not has_processed_shutdown:
has_processed_shutdown = True
LOG('CMFActivity', INFO, "Shutdown: Waiting for activities to finish.")
is_running_lock.acquire()
LOG('CMFActivity', INFO, "Shutdown: Activities finished.")
security.declareProtected(CMFCorePermissions.ManagePortal, 'process_timer') security.declareProtected(CMFCorePermissions.ManagePortal, 'process_timer')
def process_timer(self, tick, interval, prev="", next=""): def process_timer(self, tick, interval, prev="", next=""):
""" """
......
...@@ -1640,112 +1640,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1640,112 +1640,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
del Organisation.firstTest del Organisation.firstTest
del Organisation.secondTest del Organisation.secondTest
def test_115_checkProcessShutdown(self):
# Thread execution plan for this test:
# main ActivityThread ProcessShutdownThread
# start ActivityThread None None
# wait for rendez_vous_lock (run) None
# wait for rendez_vous_lock release rendez_vous_lock None
# start ProcessShutdownThread wait for activity_lock None
# release activity_lock wait for activity_lock internal wait
# wait for activity_thread (finish) internal wait
# wait for process_shutdown_thread None (finish)
#
# This test only checks that:
# - activity tool can exit between 2 processable activity batches
# - activity tool won't process activities after process_shutdown was called
# - process_shutdown returns before Activity.tic()
# This is not perfect though, since it would require to have access to
# the waiting queue of CMFActivity's internal lock (is_running_lock) to
# make sure that it's what is preventing process_shutdown from returning.
activity_tool = self.getActivityTool()
organisation = self.portal.organisation_module.newContent(
portal_type='Organisation')
self.tic()
activity_event = threading.Event()
rendez_vous_event = threading.Event()
def waitingActivity(context):
# Inform test that we arrived at rendez-vous.
rendez_vous_event.set()
# When this event is available, it means test has called process_shutdown.
assert activity_event.wait(10)
original_dequeue = SQLDict.dequeueMessage
queue_tic_test_dict = {}
def dequeueMessage(self, activity_tool, processing_node, node_family_id_set):
# This is a one-shot method, revert after execution
SQLDict.dequeueMessage = original_dequeue
result = self.dequeueMessage(activity_tool, processing_node, node_family_id_set)
queue_tic_test_dict['isAlive'] = process_shutdown_thread.isAlive()
return result
SQLDict.dequeueMessage = dequeueMessage
Organisation.waitingActivity = waitingActivity
try:
# Use SQLDict with no group method so that both activities won't be
# executed in the same batch, letting activity tool a chance to check
# if execution should stop processing activities.
organisation.activate(activity='SQLDict', tag='foo').waitingActivity()
organisation.activate(activity='SQLDict', after_tag='foo').getTitle()
self.commit()
self.assertEqual(len(activity_tool.getMessageList()), 2)
activity_tool.distribute()
self.commit()
# Start a tic in another thread, so they can meet at rendez-vous.
class ActivityThread(threading.Thread):
def run(self):
# Call changeskin, since skin selection depend on thread id, and we
# are in a new thread.
activity_tool.changeSkin(None)
activity_tool.tic()
activity_thread = ActivityThread()
# Do not try to outlive main thread.
activity_thread.setDaemon(True)
# Call process_shutdown in yet another thread because it will wait for
# running activity to complete before returning, and we need to unlock
# activity *after* calling process_shutdown to make sure the next
# activity won't be executed.
class ProcessShutdownThread(threading.Thread):
def run(self):
activity_tool.process_shutdown(3, 0)
process_shutdown_thread = ProcessShutdownThread()
# Do not try to outlive main thread.
process_shutdown_thread.setDaemon(True)
activity_thread.start()
# Wait at rendez-vous for activity to arrive.
assert rendez_vous_event.wait(10)
# Initiate shutdown
process_shutdown_thread.start()
try:
# Let waiting activity finish and wait for thread exit
activity_event.set()
activity_thread.join(10)
assert not activity_thread.is_alive()
process_shutdown_thread.join(10)
assert not process_shutdown_thread.is_alive()
# Check that there is still one activity pending
message_list = activity_tool.getMessageList()
self.assertEqual(len(message_list), 1)
self.assertEqual(message_list[0].method_id, 'getTitle')
# Check that process_shutdown_thread was still runing when Queue_tic returned.
self.assertTrue(queue_tic_test_dict.get('isAlive'), repr(queue_tic_test_dict))
# Call tic in foreground. This must not lead to activity execution.
activity_tool.tic()
self.assertEqual(len(activity_tool.getMessageList()), 1)
finally:
# Put activity tool back in a working state
try:
cancelProcessShutdown()
except StandardException:
# If something failed in process_shutdown, shutdown lock might not
# be taken in CMFActivity, leading to a new esception here hiding
# test error.
pass
finally:
del Organisation.waitingActivity
SQLDict.dequeueMessage = original_dequeue
self.tic()
def test_hasActivity(self): def test_hasActivity(self):
active_object = self.portal.organisation_module.newContent( active_object = self.portal.organisation_module.newContent(
portal_type='Organisation') portal_type='Organisation')
......
...@@ -50,27 +50,6 @@ class TimerService(SimpleItem): ...@@ -50,27 +50,6 @@ class TimerService(SimpleItem):
self._subscribers = [] self._subscribers = []
self._version = 1 self._version = 1
security.declarePublic('process_shutdown')
def process_shutdown(self, phase, time_in_phase):
""" """
subscriptions = []
for path in self._subscribers:
try:
subscriptions.append(self.unrestrictedTraverse(path))
except KeyError:
pass
for subscriber in subscriptions:
process_shutdown = getattr(subscriber, 'process_shutdown', None)
if process_shutdown is not None:
try:
subscriber.process_shutdown(phase=phase,
time_in_phase=time_in_phase)
except:
LOG('TimerService', ERROR, 'Process shutdown error',
error = sys.exc_info())
raise
security.declarePublic('process_timer') security.declarePublic('process_timer')
def process_timer(self, interval): def process_timer(self, interval):
""" """ """ """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment