Commit 487cc0d4 authored by Vincent Pelletier's avatar Vincent Pelletier

Fix process_shutdown and test it.

  Add missing "global" declarations.
  Simplify: there is no need for both a global variable and a lock do just carry a boolean with atomic access.
  Add the possibility to put activity tool back in a working state after process_shutdown has been called.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@24988 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 712bd50b
...@@ -432,12 +432,17 @@ class ActiveWrapper: ...@@ -432,12 +432,17 @@ class ActiveWrapper:
return '<%s at 0x%x to %r>' % (self.__class__.__name__, id(self), return '<%s at 0x%x to %r>' % (self.__class__.__name__, id(self),
self.__dict__['__passive_self']) self.__dict__['__passive_self'])
# Set to False when shutting down. Access outside of process_shutdown must
# be done under the protection of is_running_lock lock.
is_running = True
# True when activities cannot be executing any more. # True when activities cannot be executing any more.
has_processed_shutdown = False has_processed_shutdown = False
def cancelProcessShutdown():
"""
This method reverts the effect of calling "process_shutdown" on activity
tool.
"""
global has_processed_shutdown
is_running_lock.release()
has_processed_shutdown = False
class ActivityTool (Folder, UniqueObject): class ActivityTool (Folder, UniqueObject):
""" """
...@@ -785,13 +790,12 @@ class ActivityTool (Folder, UniqueObject): ...@@ -785,13 +790,12 @@ class ActivityTool (Folder, UniqueObject):
Prevent shutdown from happening while an activity queue is Prevent shutdown from happening while an activity queue is
processing a batch. processing a batch.
""" """
is_running = False global has_processed_shutdown
if phase == 3 and not has_processed_shutdown: if phase == 3 and not has_processed_shutdown:
has_processed_shutdown = True has_processed_shutdown = True
LOG('CMFActivity', INFO, "Shutdown: Waiting for activities to finish.") LOG('CMFActivity', INFO, "Shutdown: Waiting for activities to finish.")
is_running_lock.acquire() is_running_lock.acquire()
LOG('CMFActivity', INFO, "Shutdown: Activities finished.") LOG('CMFActivity', INFO, "Shutdown: Activities finished.")
is_running_lock.release()
def process_timer(self, tick, interval, prev="", next=""): def process_timer(self, tick, interval, prev="", next=""):
""" """
...@@ -907,13 +911,13 @@ class ActivityTool (Folder, UniqueObject): ...@@ -907,13 +911,13 @@ class ActivityTool (Folder, UniqueObject):
while has_awake_activity: while has_awake_activity:
has_awake_activity = 0 has_awake_activity = 0
for activity in activity_list: for activity in activity_list:
is_running_lock.acquire() acquired = is_running_lock.acquire(0)
try: if acquired:
if is_running: try:
activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity
has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node) has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node)
finally: finally:
is_running_lock.release() is_running_lock.release()
finally: finally:
# decrease the number of active_threads # decrease the number of active_threads
tic_lock.acquire() tic_lock.acquire()
......
...@@ -44,6 +44,7 @@ import cPickle as pickle ...@@ -44,6 +44,7 @@ import cPickle as pickle
from Products.CMFActivity.ActivityTool import Message from Products.CMFActivity.ActivityTool import Message
import random import random
from Products.CMFActivity.ActivityRuntimeEnvironment import setActivityRuntimeValue, clearActivityRuntimeEnvironment from Products.CMFActivity.ActivityRuntimeEnvironment import setActivityRuntimeValue, clearActivityRuntimeEnvironment
import threading
try: try:
from transaction import get as get_transaction from transaction import get as get_transaction
...@@ -3074,6 +3075,124 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -3074,6 +3075,124 @@ class TestCMFActivity(ERP5TypeTestCase):
delattr(Organisation, 'firstTest') delattr(Organisation, 'firstTest')
delattr(Organisation, 'secondTest') delattr(Organisation, 'secondTest')
def test_115_checkProcessShutdown(self, quiet=0, run=run_all_test):
if not run: return
if not quiet:
message = '\nCheck that no activity is executed after process_shutdown has been called'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
# Thread execution plan for this test:
# main ActivityThread ProcessShutdownThread
# start ActivityThread None None
# wait for rendez_vous_lock (run) None
# wait for rendez_vous_lock release rendez_vous_lock None
# start ProcessShutdownThread wait for activity_lock None
# release activity_lock wait for activity_lock internal wait
# wait for activity_thread (finish) internal wait
# wait for process_shutdown_thread None (finish)
#
# This test only checks that:
# - activity tool can exit between 2 processable activity batches
# - activity tool won't process activities after process_shutdown was called
# - process_shutdown returns before Activity.tic()
# This is not perect though, since it would require to have access to
# the waiting queue of CMFActivity's internal lock (is_running_lock) to
# make sure that it's what is preventing process_shutdown from returning.
portal = self.getPortalObject()
activity_tool = self.getActivityTool()
organisation = portal.organisation_module.newContent(portal_type='Organisation')
get_transaction().commit()
self.tic()
activity_lock = threading.Lock()
activity_lock.acquire()
rendez_vous_lock = threading.Lock()
rendez_vous_lock.acquire()
def waitingActivity(context):
# Inform test that we arrived at rendez-vous.
rendez_vous_lock.release()
# When this lock is available, it means test has called process_shutdown.
activity_lock.acquire()
activity_lock.release()
from Products.CMFActivity.Activity.Queue import Queue
original_queue_tic = Queue.tic
queue_tic_test_dict = {}
def Queue_tic(self, activity_tool, processing_node):
result = original_queue_tic(self, activity_tool, processing_node)
queue_tic_test_dict['isAlive'] = process_shutdown_thread.isAlive()
# This is a one-shot method, revert after execution
Queue.tic = original_queue_tic
return result
Queue.tic = Queue_tic
Organisation.waitingActivity = waitingActivity
try:
# Use SQLDict with no gorup method so that both activities won't be
# executed in the same batch, letting activity tool a chance to check
# if execution should stop processing activities.
organisation.activate(activity='SQLDict', priority=1).waitingActivity()
organisation.activate(activity='SQLDict', priority=2).getTitle()
get_transaction().commit()
self.assertEqual(len(activity_tool.getMessageList()), 2)
activity_tool.distribute()
get_transaction().commit()
# Start a tic in another thread, so they can meet at rendez-vous.
class ActivityThread(threading.Thread):
def run(self):
# Call changeskin, since skin selection depend on thread id, and we
# are in a new thread.
activity_tool.changeSkin(None)
activity_tool.tic()
activity_thread = ActivityThread()
# Do not try to outlive main thread.
activity_thread.setDaemon(True)
# Call process_shutdown in yet another thread because it will wait for
# running activity to complete before returning, and we need to unlock
# activity *after* calling process_shutdown to make sure the next
# activity won't be executed.
class ProcessShutdownThread(threading.Thread):
def run(self):
activity_tool.process_shutdown(3, 0)
process_shutdown_thread = ProcessShutdownThread()
# Do not try to outlive main thread.
process_shutdown_thread.setDaemon(True)
activity_thread.start()
# Wait at rendez-vous for activity to arrive.
arrived = False
while (not arrived) and activity_thread.isAlive():
arrived = rendez_vous_lock.acquire(1)
if not arrived:
raise Exception, 'Something wrong happened in activity thread.'
# Initiate shutdown
process_shutdown_thread.start()
try:
# Let waiting activity finish and wait for thread exit
activity_lock.release()
activity_thread.join()
process_shutdown_thread.join()
# Check that there is still one activity pending
message_list = activity_tool.getMessageList()
self.assertEqual(len(message_list), 1)
self.assertEqual(message_list[0].method_id, 'getTitle')
# Check that process_shutdown_thread was still runing when Queue_tic returned.
self.assertTrue(queue_tic_test_dict.get('isAlive'), repr(queue_tic_test_dict))
# Call tic in foreground. This must not lead to activity execution.
activity_tool.tic()
self.assertEqual(len(activity_tool.getMessageList()), 1)
finally:
# Put activity tool back in a working state
from Products.CMFActivity.ActivityTool import cancelProcessShutdown
try:
cancelProcessShutdown()
except:
# If something failed in process_shutdown, shutdown lock might not
# be taken in CMFActivity, leading to a new esception here hiding
# test error.
pass
finally:
delattr(Organisation, 'waitingActivity')
Queue.tic = original_queue_tic
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity)) suite.addTest(unittest.makeSuite(TestCMFActivity))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment