From 631dff69f06054fbc8d0ff4f7f9f9f47cf3f7d09 Mon Sep 17 00:00:00 2001
From: Julien Muchembled <jm@nexedi.com>
Date: Mon, 17 May 2010 10:10:41 +0000
Subject: [PATCH] Add support for ZEO-based unit tests with parallel execution
 of activities

The most simple way to use this feature is to use --activity_node option only,
but it is also possible to:
- run only a ZEO server (--activity_node=0)
- run only ZEO clients
- run only activity nodes, by specifying no test
- specify HOST:PORT to listen/connect for ZEO storage and ZServer

Load/save of catalog is done by the process running the ZEO server.
Load of static files is done by all processes. Save of static files is done by
the process running unit test.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@35374 20353a03-c40f-0410-a6d1-a30d3c3de9de
---
 product/CMFActivity/ActivityTool.py           |   2 +-
 product/ERP5Type/ZopePatch.py                 |   1 +
 product/ERP5Type/patches/DemoStorage.py       | 141 +++++++++
 product/ERP5Type/tests/ERP5TypeTestCase.py    | 128 +++-----
 .../ERP5Type/tests/ProcessingNodeTestCase.py  | 173 +++++++++++
 product/ERP5Type/tests/backportUnittest.py    |  13 +-
 product/ERP5Type/tests/custom_zodb.py         | 102 ++++++-
 product/ERP5Type/tests/runUnitTest.py         | 280 ++++++++++--------
 product/ERP5Type/tests/utils.py               |  58 +++-
 9 files changed, 664 insertions(+), 234 deletions(-)
 create mode 100644 product/ERP5Type/patches/DemoStorage.py
 create mode 100644 product/ERP5Type/tests/ProcessingNodeTestCase.py

diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py
index d17dd0f1063..1fe991e082b 100644
--- a/product/CMFActivity/ActivityTool.py
+++ b/product/CMFActivity/ActivityTool.py
@@ -540,7 +540,7 @@ class ActivityTool (Folder, UniqueObject):
     # Filter content (ZMI))
     def filtered_meta_types(self, user=None):
         # Filters the list of available meta types.
-        all = ActivityTool.inheritedAttribute('filtered_meta_types')(self)
+        all = Folder.filtered_meta_types(self)
         meta_types = []
         for meta_type in self.all_meta_types():
             if meta_type['name'] in self.allowed_types:
diff --git a/product/ERP5Type/ZopePatch.py b/product/ERP5Type/ZopePatch.py
index 93cda8bd02f..6eadc75e26b 100644
--- a/product/ERP5Type/ZopePatch.py
+++ b/product/ERP5Type/ZopePatch.py
@@ -59,6 +59,7 @@ from Products.ERP5Type.patches import StateChangeInfoPatch
 from Products.ERP5Type.patches import transforms
 from Products.ERP5Type.patches import OFSPdata
 from Products.ERP5Type.patches import make_hidden_input
+from Products.ERP5Type.patches import DemoStorage
 # BACK: Forward Compatibility with Zope 2.12 or CMF 2.2. Remove when we've
 # dropped support for older versions.
 from Products.ERP5Type.patches import TransactionAddBeforeCommitHook
diff --git a/product/ERP5Type/patches/DemoStorage.py b/product/ERP5Type/patches/DemoStorage.py
new file mode 100644
index 00000000000..bcc4ee21dbd
--- /dev/null
+++ b/product/ERP5Type/patches/DemoStorage.py
@@ -0,0 +1,141 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# Copyright (c) 2010 Nexedi SARL and Contributors. All Rights Reserved.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+
+from ZODB.DemoStorage import DemoStorage
+
+try:
+    loadEx = DemoStorage.loadEx
+except AttributeError:
+    pass # XXX Zope 2.12 ?
+else:
+    ##
+    # Fix bug in DemoStorage.loadEx (it uses 'load' instead of 'loadEx')
+    #
+    DemoStorage.loadEx = lambda *args: (loadEx(*args) + ('',))[:3]
+
+    ##
+    # Implemenent conflict resolution for DemoStorage
+    #
+    from ZODB import POSException
+    from ZODB.ConflictResolution import tryToResolveConflict, ResolvedSerial
+
+    # copied from ZODB/DemoStorage.py and patched
+    def store(self, oid, serial, data, version, transaction):
+        if transaction is not self._transaction:
+            raise POSException.StorageTransactionError(self, transaction)
+
+        self._lock_acquire()
+        try:
+            old = self._index.get(oid, None)
+            if old is None:
+                # Hm, nothing here, check the base version:
+                if self._base:
+                    try:
+                        p, tid = self._base.load(oid, '')
+                    except KeyError:
+                        pass
+                    else:
+                        old = oid, None, None, p, tid
+
+            nv=None
+            if old:
+                oid, pre, vdata, p, tid = old
+
+                if vdata:
+                    if vdata[0] != version:
+                        raise POSException.VersionLockError, oid
+
+                    nv=vdata[1]
+                else:
+                    nv=old
+
+                if serial != tid:
+                  # <patch>
+                  rdata = tryToResolveConflict(self, oid, tid, serial, data)
+                  if rdata is None:
+                    raise POSException.ConflictError(
+                        oid=oid, serials=(tid, serial), data=data)
+                  data = rdata
+                  # </patch>
+
+            r = [oid, old, version and (version, nv) or None, data, self._tid]
+            self._tindex.append(r)
+
+            s=self._tsize
+            s=s+72+(data and (16+len(data)) or 4)
+            if version: s=s+32+len(version)
+
+            if self._quota is not None and s > self._quota:
+                raise POSException.StorageError, (
+                    '''<b>Quota Exceeded</b><br>
+                    The maximum quota for this demonstration storage
+                    has been exceeded.<br>Have a nice day.''')
+
+        finally: self._lock_release()
+        # <patch>
+        if old and serial != tid:
+            return ResolvedSerial
+        # </patch>
+        return self._tid
+
+    DemoStorage.store = store
+
+    def loadSerial(self, oid, serial):
+        # XXX should I use self._lock_acquire and self._lock_release ?
+        pre = self._index.get(oid)
+        while pre:
+            oid, pre, vdata, p, tid = pre
+            if tid == serial:
+                return p
+        return self._base.loadSerial(oid, serial)
+
+    DemoStorage.loadSerial = loadSerial
+
+    def loadBefore(self, oid, tid):
+        # XXX should I use self._lock_acquire and self._lock_release ?
+        end_time = None
+        pre = self._index.get(oid)
+        while pre:
+            oid, pre, vdata, p, start_time = pre
+            if start_time < tid:
+                return p, start_time, end_time
+            end_time = start_time
+        base = self._base.loadBefore(oid, tid)
+        if base:
+            p, start_time, base_end_time = base
+            return p, start_time, base_end_time or end_time
+
+    DemoStorage.loadBefore = loadBefore
+
+    def history(self, oid, version=None, length=1, filter=None):
+        assert not version
+        self._lock_acquire()
+        try:
+            r = []
+            pre = self._index.get(oid)
+            while length and pre:
+                oid, pre, vdata, p, tid = pre
+                assert vdata is None
+                d = {'tid': tid, 'size': len(p), 'version': ''}
+                if filter is None or filter(d):
+                    r.append(d)
+                    length -= 1
+            if length:
+                r += self._base.history(oid, version, length, filter)
+            return r
+        finally:
+            self._lock_release()
+
+    DemoStorage.history = history
diff --git a/product/ERP5Type/tests/ERP5TypeTestCase.py b/product/ERP5Type/tests/ERP5TypeTestCase.py
index 98f07de82ab..8e62163f189 100644
--- a/product/ERP5Type/tests/ERP5TypeTestCase.py
+++ b/product/ERP5Type/tests/ERP5TypeTestCase.py
@@ -61,14 +61,15 @@ except ImportError:
 
 import transaction
 from Testing import ZopeTestCase
-from Testing.ZopeTestCase.PortalTestCase import PortalTestCase, user_name
+from Testing.ZopeTestCase import PortalTestCase, user_name
 from Products.CMFCore.utils import getToolByName
 from Products.DCWorkflow.DCWorkflow import ValidationFailed
 from Products.ERP5Type.Base import _aq_reset
 from Products.ERP5Type.Accessor.Constant import PropertyGetter as ConstantGetter
 from zLOG import LOG, DEBUG
 
-import backportUnittest
+from Products.ERP5Type.tests.backportUnittest import SetupSiteError
+from Products.ERP5Type.tests.utils import DummyMailHost, parseListeningAddress
 
 # Quiet messages when installing products
 install_product_quiet = 1
@@ -153,6 +154,9 @@ try:
 except ImportError:
   pass
 
+from Products.ERP5Type.tests.ProcessingNodeTestCase import \
+  ProcessingNodeTestCase
+
 ZopeTestCase.installProduct('TimerService', quiet=install_product_quiet)
 
 # CMF
@@ -268,7 +272,7 @@ def profile_if_environ(environment_var_name):
       # No profiling, return identity decorator
       return lambda self, method: method
 
-class ERP5TypeTestCase(backportUnittest.TestCase, PortalTestCase):
+class ERP5TypeTestCase(ProcessingNodeTestCase, PortalTestCase):
     """TestCase for ERP5 based tests.
 
     This TestCase setups an ERP5Site and installs business templates.
@@ -566,7 +570,6 @@ class ERP5TypeTestCase(backportUnittest.TestCase, PortalTestCase):
     def _setUpDummyMailHost(self):
       """Replace Original Mail Host by Dummy Mail Host.
       """
-      from Products.ERP5Type.tests.utils import DummyMailHost
       if 'MailHost' in self.portal.objectIds():
         self.portal.manage_delObjects(['MailHost'])
         self.portal._setObject('MailHost', DummyMailHost('MailHost'))
@@ -713,50 +716,6 @@ class ERP5TypeTestCase(backportUnittest.TestCase, PortalTestCase):
         if rule.getValidationState() != 'validated':
           rule.validate()
 
-    def tic(self, verbose=0):
-      """
-      Start all messages
-      """
-      portal_activities = getattr(self.getPortal(),'portal_activities',None)
-      if portal_activities is not None:
-        if verbose:
-          ZopeTestCase._print('Executing pending activities ...')
-          old_message_count = 0
-          start = time.time()
-        count = 1000
-        message_count = len(portal_activities.getMessageList())
-        while message_count:
-          if verbose and old_message_count != message_count:
-            ZopeTestCase._print(' %i' % message_count)
-            old_message_count = message_count
-          portal_activities.process_timer(None, None)
-          message_count = len(portal_activities.getMessageList())
-          # This prevents an infinite loop.
-          count -= 1
-          if count == 0:
-            # Get the last error message from error_log.
-            error_message = ''
-            error_log = self.getPortal().error_log._getLog()
-            if len(error_log):
-              last_log = error_log[-1]
-              error_message = '\nLast error message:\n%s\n%s\n%s\n' % (
-                last_log['type'],
-                last_log['value'],
-                last_log['tb_text'],
-                )
-            raise RuntimeError,\
-              'tic is looping forever. These messages are pending: %r %s' % (
-            [('/'.join(m.object_path), m.method_id, m.processing_node, m.retry)
-            for m in portal_activities.getMessageList()],
-            error_message
-            )
-          # This give some time between messages
-          if count % 10 == 0:
-            from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
-            portal_activities.timeShift(3 * VALIDATION_ERROR_DELAY)
-        if verbose:
-          ZopeTestCase._print(' done (%.3fs)\n' % (time.time() - start))
-
     def createSimpleUser(self, title, reference, function):
       """
         Helper function to create a Simple ERP5 User.
@@ -832,37 +791,6 @@ class ERP5TypeTestCase(backportUnittest.TestCase, PortalTestCase):
       self.assertEqual(method(), reference_workflow_state)
       return workflow_error_message
 
-    def startZServer(self):
-      """Starts an HTTP ZServer thread."""
-      from Testing.ZopeTestCase import threadutils, utils
-      if utils._Z2HOST is None:
-        randint = random.Random(hash(os.environ['INSTANCE_HOME'])).randint
-        def zserverRunner():
-          try:
-            threadutils.zserverRunner(utils._Z2HOST, utils._Z2PORT)
-          except socket.error, e:
-            if e.args[0] != errno.EADDRINUSE:
-              raise
-            utils._Z2HOST = None
-        from ZServer import setNumberOfThreads
-        setNumberOfThreads(1)
-        port_list = []
-        for i in range(3):
-          utils._Z2HOST = '127.0.0.1'
-          utils._Z2PORT = randint(55000, 55500)
-          t = threadutils.QuietThread(target=zserverRunner)
-          t.setDaemon(1)
-          t.start()
-          time.sleep(0.1)
-          if utils._Z2HOST:
-            ZopeTestCase._print("Running ZServer on port %i\n" % utils._Z2PORT)
-            break
-          port_list.append(str(utils._Z2PORT))
-        else:
-          ZopeTestCase._print("Can't find free port to start ZServer"
-                              " (tried ports %s)\n" % ', '.join(port_list))
-      return utils._Z2HOST, utils._Z2PORT
-
     def _installBusinessTemplateList(self, business_template_list,
                                      light_install=True,
                                      quiet=True):
@@ -921,7 +849,7 @@ class ERP5TypeTestCase(backportUnittest.TestCase, PortalTestCase):
       title = self.getTitle()
       from Products.ERP5Type.Base import _aq_reset
       if portal_name in failed_portal_installation:
-        raise backportUnittest.SetupSiteError(
+        raise SetupSiteError(
             'Installation of %s already failed, giving up' % portal_name)
       try:
         if app is None:
@@ -930,6 +858,7 @@ class ERP5TypeTestCase(backportUnittest.TestCase, PortalTestCase):
         # make it's REQUEST available during setup
         global current_app
         current_app = app
+        app.test_portal_name = portal_name
 
         global setup_done
         if not (hasattr(aq_base(app), portal_name) and
@@ -992,6 +921,7 @@ class ERP5TypeTestCase(backportUnittest.TestCase, PortalTestCase):
               except ImportError:
                 pass
               self.serverhost, self.serverport = self.startZServer()
+              self._registerNode(distributing=1, processing=1)
 
             self._updateConversionServerConfiguration()
             self._updateConnectionStrings()
@@ -1073,11 +1003,6 @@ class ERP5TypeTestCase(backportUnittest.TestCase, PortalTestCase):
         if count:
           LOG('Products.ERP5Type.tests.ERP5TypeTestCase.beforeClose', DEBUG,
               'dropped %d left-over activity messages' % (count,))
-        # portal_activities.process_timer automatically registers current node
-        # (localhost:<random_port>). We must unregister it so that Data.fs can
-        # be reused without reconfiguring portal_activities.
-        del portal_activities.distributingNode
-        del portal_activities._nodes
         transaction.commit()
       except AttributeError:
         pass
@@ -1248,11 +1173,6 @@ class ERP5ReportTestCase(ERP5TypeTestCase):
     if diff_list:
       self.fail('Lines differs:\n' + '\n'.join(diff_list))
 
-from unittest import _makeLoader, TestSuite
-
-def dummy_makeSuite(testCaseClass, prefix='dummy_test', sortUsing=cmp, suiteClass=TestSuite):
-  return _makeLoader(prefix, sortUsing, suiteClass).loadTestsFromTestCase(testCaseClass)
-
 def dummy_setUp(self):
   '''
   This one is overloaded so that it dos not execute beforeSetUp and afterSetUp
@@ -1276,6 +1196,34 @@ def dummy_tearDown(self):
   '''
   self._clear(1)
 
+class ZEOServerTestCase(ERP5TypeTestCase):
+  """TestCase class to run a ZEO storage
+
+  Main method is 'asyncore_loop' (inherited) since there is nothing to do
+  except processing I/O.
+  """
+
+  def setUp(self):
+    # Start ZEO storage and send address to parent process if any.
+    from Zope2.custom_zodb import zeo_client, Storage
+    from ZEO.StorageServer import StorageServer
+    storage = {'1': Storage}
+    for host_port in parseListeningAddress(os.environ.get('zeo_server')):
+      try:
+        self.zeo_server = StorageServer(host_port, storage)
+        break
+      except socket.error, e:
+        if e[0] != errno.EADDRINUSE:
+          raise
+    if zeo_client:
+      os.write(zeo_client, repr(host_port))
+      os.close(zeo_client)
+    ZopeTestCase._print("\nZEO Storage started at %s:%s ... " % host_port)
+
+  def tearDown(self):
+    self.zeo_server.close_server()
+
+
 @onsetup
 def optimize():
   '''Significantly reduces portal creation time.'''
diff --git a/product/ERP5Type/tests/ProcessingNodeTestCase.py b/product/ERP5Type/tests/ProcessingNodeTestCase.py
new file mode 100644
index 00000000000..88bccbdd2d2
--- /dev/null
+++ b/product/ERP5Type/tests/ProcessingNodeTestCase.py
@@ -0,0 +1,173 @@
+# This module must be imported before CMFActivity product is installed.
+
+import base64, errno, select, socket, time
+from threading import Thread
+import Lifetime
+import transaction
+from BTrees.OIBTree import OIBTree
+from Testing import ZopeTestCase
+from Products.CMFActivity import ActivityTool as _ActivityTool
+from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
+from Products.ERP5Type.tests import backportUnittest
+from Products.ERP5Type.tests.utils import createZServer
+
+
+class ActivityTool(_ActivityTool.ActivityTool):
+  """Class redefining CMFActivity.ActivityTool.ActivityTool for unit tests
+  """
+
+  # When a ZServer can't be started, the node name ends with ':' (no port).
+  def _isValidNodeName(self, node_name):
+    return True
+
+  # Divert location to register processing and distributing nodes.
+  # Load balancing is configured at the root instead of the activity tool,
+  # so that additional can register even if there is no portal set up yet.
+  # Properties at the root are:
+  # - 'test_processing_nodes' to list processing nodes
+  # - 'test_distributing_node' to select the distributing node
+  def getNodeDict(self):
+    app = self.getPhysicalRoot()
+    if getattr(app, 'test_processing_nodes', None) is None:
+      app.test_processing_nodes = OIBTree()
+    return app.test_processing_nodes
+
+  def getDistributingNode(self):
+    return self.getPhysicalRoot().test_distributing_node
+
+  def manage_setDistributingNode(self, distributingNode, REQUEST=None):
+    # A property to catch setattr on 'distributingNode' doesn't work
+    # because self would lose all acquisition wrappers.
+    previous_node = self.distributingNode
+    try:
+      super(ActivityTool, self).manage_setDistributingNode(distributingNode,
+                                                           REQUEST=REQUEST)
+      self.getPhysicalRoot().test_distributing_node = self.distributingNode
+    finally:
+      self.distributingNode = previous_node
+
+  # When there is more than 1 node, prevent the distributing node from
+  # processing activities.
+  def tic(self, processing_node=1, force=0):
+    processing_node_list = self.getProcessingNodeList()
+    if len(processing_node_list) > 1 and \
+       self.getCurrentNode() == self.getDistributingNode():
+      # Sleep between each distribute.
+      time.sleep(0.3)
+      transaction.commit()
+    else:
+      super(ActivityTool, self).tic(processing_node, force)
+
+_ActivityTool.ActivityTool = ActivityTool
+
+
+class ProcessingNodeTestCase(backportUnittest.TestCase, ZopeTestCase.TestCase):
+  """Minimal ERP5 TestCase class to process activities
+
+  When a processing node starts, the portal may not exist yet, or its name is
+  unknown, so an additional 'test_portal_name' property at the root is set by
+  the node running the unit tests to tell other nodes on which portal activities
+  should be processed.
+  """
+
+  @staticmethod
+  def asyncore_loop():
+    try:
+      Lifetime.lifetime_loop()
+    except KeyboardInterrupt:
+      pass
+    Lifetime.graceful_shutdown_loop()
+
+  def startZServer(self):
+    """Start HTTP ZServer in background"""
+    utils = ZopeTestCase.utils
+    if utils._Z2HOST is None:
+      try:
+        hs = createZServer()
+      except RuntimeError, e:
+        ZopeTestCase._print(str(e))
+      else:
+        utils._Z2HOST, utils._Z2PORT = hs.server_name, hs.server_port
+        t = Thread(target=Lifetime.loop)
+        t.setDaemon(1)
+        t.start()
+    return utils._Z2HOST, utils._Z2PORT
+
+  def _registerNode(self, distributing, processing):
+    """Register node to process and/or distribute activities"""
+    try:
+      activity_tool = self.portal.portal_activities
+    except AttributeError:
+      activity_tool = ActivityTool().__of__(self.app)
+    currentNode = activity_tool.getCurrentNode()
+    if distributing:
+      activity_tool.manage_setDistributingNode(currentNode)
+    if processing:
+      activity_tool.manage_addToProcessingList((currentNode,))
+    else:
+      activity_tool.manage_removeFromProcessingList((currentNode,))
+
+  def tic(self, verbose=0):
+    """Execute pending activities"""
+    portal_activities = self.portal.portal_activities
+    if 1:
+      if verbose:
+        ZopeTestCase._print('Executing pending activities ...')
+        old_message_count = 0
+        start = time.time()
+      count = 1000
+      getMessageList = portal_activities.getMessageList
+      message_count = len(getMessageList(include_processing=1))
+      while message_count:
+        if verbose and old_message_count != message_count:
+          ZopeTestCase._print(' %i' % message_count)
+          old_message_count = message_count
+        portal_activities.process_timer(None, None)
+        if Lifetime._shutdown_phase:
+          # XXX CMFActivity contains bare excepts
+          raise KeyboardInterrupt
+        message_count = len(getMessageList(include_processing=1))
+        # This prevents an infinite loop.
+        count -= 1
+        if count == 0:
+          # Get the last error message from error_log.
+          error_message = ''
+          error_log = self.portal.error_log._getLog()
+          if len(error_log):
+            last_log = error_log[-1]
+            error_message = '\nLast error message:\n%s\n%s\n%s\n' % (
+              last_log['type'],
+              last_log['value'],
+              last_log['tb_text'],
+              )
+          raise RuntimeError,\
+            'tic is looping forever. These messages are pending: %r %s' % (
+          [('/'.join(m.object_path), m.method_id, m.processing_node, m.retry)
+          for m in portal_activities.getMessageList()],
+          error_message
+          )
+        # This give some time between messages
+        if count % 10 == 0:
+          portal_activities.timeShift(3 * VALIDATION_ERROR_DELAY)
+      if verbose:
+        ZopeTestCase._print(' done (%.3fs)\n' % (time.time() - start))
+
+  def afterSetUp(self):
+    """Initialize a node that will only process activities"""
+    createZServer() #self.startZServer()
+    self._registerNode(distributing=0, processing=1)
+    transaction.commit()
+
+  def processing_node(self):
+    """Main loop for nodes that process activities"""
+    try:
+      while not Lifetime._shutdown_phase:
+        time.sleep(.3)
+        transaction.begin()
+        try:
+          portal = self.app[self.app.test_portal_name]
+        except AttributeError:
+          continue
+        portal.portal_activities.process_timer(None, None)
+    except KeyboardInterrupt:
+      pass
diff --git a/product/ERP5Type/tests/backportUnittest.py b/product/ERP5Type/tests/backportUnittest.py
index a25e2eb2c36..06920f8130f 100644
--- a/product/ERP5Type/tests/backportUnittest.py
+++ b/product/ERP5Type/tests/backportUnittest.py
@@ -107,8 +107,6 @@ class TestCase(unittest.TestCase):
       _testMethodDoc = property(lambda self: self.__testMethodDoc)
 
     def run(self, result=None):
-        import pdb
-        #pdb.set_trace()
         orig_result = result
         if result is None:
             result = self.defaultTestResult()
@@ -138,6 +136,8 @@ class TestCase(unittest.TestCase):
                 result.addSkip(self, str(e))
             except SetupSiteError, e:
                 result.errors.append(None)
+            except (KeyboardInterrupt, SystemExit): # BACK: Not needed for
+                raise                               #       Python >= 2.5
             except Exception:
                 result.addError(self, sys.exc_info())
             else:
@@ -151,6 +151,8 @@ class TestCase(unittest.TestCase):
                     result.addUnexpectedSuccess(self)
                 except SkipTest, e:
                     result.addSkip(self, str(e))
+                except (KeyboardInterrupt, SystemExit): # BACK: Not needed for
+                    raise                               #       Python >= 2.5
                 except Exception:
                     result.addError(self, sys.exc_info())
                 else:
@@ -158,6 +160,8 @@ class TestCase(unittest.TestCase):
 
                 try:
                     self.tearDown()
+                except (KeyboardInterrupt, SystemExit): # BACK: Not needed for
+                    raise                               #       Python >= 2.5
                 except Exception:
                     result.addError(self, sys.exc_info())
                     success = False
@@ -252,7 +256,10 @@ class TextTestRunner(unittest.TextTestRunner):
         result = self._makeResult()
         startTime = time.time()
         # BACK: 2.7 implementation wraps run with result.(start|stop)TestRun
-        test(result)
+        try:
+          test(result)
+        except KeyboardInterrupt:
+          pass
         stopTime = time.time()
         timeTaken = stopTime - startTime
         result.printErrors()
diff --git a/product/ERP5Type/tests/custom_zodb.py b/product/ERP5Type/tests/custom_zodb.py
index 18b248d274d..51f26d2efea 100644
--- a/product/ERP5Type/tests/custom_zodb.py
+++ b/product/ERP5Type/tests/custom_zodb.py
@@ -1,48 +1,120 @@
 import os
 import shutil
+import socket
 import sys
 import glob
+import threading
 import ZODB
+from asyncore import socket_map
 from ZODB.DemoStorage import DemoStorage
 from ZODB.FileStorage import FileStorage
-from Products.ERP5Type.tests.utils import getMySQLArguments
+from ZEO.ClientStorage import ClientStorage
+from Products.ERP5Type.tests.utils import getMySQLArguments, instance_random
 from Products.ERP5Type.tests.runUnitTest import instance_home, static_dir_list
 
 def _print(message):
   sys.stderr.write(message + "\n")
 
+zserver_list = os.environ.get('zserver', '').split(',')
+os.environ['zserver'] = zserver_list[0]
+
+zeo_client = os.environ.get('zeo_client')
+if zeo_client:
+  zeo_client = zeo_client.rsplit(':', 1)
+  zeo_client = (len(zeo_client) == 1 and 'localhost' or zeo_client[0],
+                int(zeo_client[-1]))
+try:
+  activity_node = int(os.environ['activity_node'])
+except KeyError:
+  activity_node = (zeo_client or 'zeo_server' in os.environ) and 1 or None
+
 data_fs_path = os.environ.get('erp5_tests_data_fs_path',
                               os.path.join(instance_home, 'Data.fs'))
 load = int(os.environ.get('erp5_load_data_fs', 0))
 save = int(os.environ.get('erp5_save_data_fs', 0))
 
+save_mysql = None
+if not zeo_client:
+  def save_mysql(verbosity=1):
+    # The output of mysqldump needs to merge many lines at a time
+    # for performance reasons (merging lines is at most 10 times
+    # faster, so this produce somewhat not nice to read sql
+    command = 'mysqldump %s > dump.sql' % getMySQLArguments()
+    if verbosity:
+      _print('Dumping MySQL database with %s...' % command)
+    os.system(command)
+
 _print("Cleaning static files ... ")
 for dir in static_dir_list:
   for f in glob.glob(os.path.join(instance_home, dir, '*')):
     os.remove(f)
 
 if load:
-  dump_sql = os.path.join(instance_home, 'dump.sql')
-  if os.path.exists(dump_sql):
-    _print("Restoring MySQL database ... ")
-    ret = os.system("mysql %s < %s" % (getMySQLArguments(), dump_sql))
-    assert not ret
-  else:
-    os.environ['erp5_tests_recreate_catalog'] = '1'
+  if save_mysql:
+    dump_sql = os.path.join(instance_home, 'dump.sql')
+    if os.path.exists(dump_sql):
+      _print("Restoring MySQL database ... ")
+      ret = os.system("mysql %s < %s" % (getMySQLArguments(), dump_sql))
+      assert not ret
+    else:
+      os.environ['erp5_tests_recreate_catalog'] = '1'
   _print("Restoring static files ... ")
   for dir in static_dir_list:
     full_path = os.path.join(instance_home, dir)
     if os.path.exists(full_path + '.bak'):
       os.rmdir(full_path)
       shutil.copytree(full_path + '.bak', full_path, symlinks=True)
-elif save and os.path.exists(data_fs_path):
+elif save and not zeo_client and os.path.exists(data_fs_path):
   os.remove(data_fs_path)
 
-if save:
-  Storage = FileStorage(data_fs_path)
-elif load:
-  Storage = DemoStorage(base=FileStorage(data_fs_path))
+zeo_server_pid = None
+zeo_client_pid_list = []
+
+ZEvent = sys.modules.get('ZServer.PubCore.ZEvent')
+def fork():
+  pid = os.fork()
+  if pid:
+    # recreate the event pipe if it already exists
+    for obj in socket_map.values():
+      assert obj is ZEvent.the_trigger
+      obj.close()
+      ZEvent.the_trigger = ZEvent.simple_trigger()
+    # make sure parent and child have 2 different RNG
+    instance_random.seed(instance_random.random())
+  return pid
+
+while not zeo_client:
+  if activity_node:
+    r, zeo_client = os.pipe()
+    zeo_server_pid = fork()
+    if zeo_server_pid:
+      save_mysql = None
+      os.close(zeo_client)
+      zeo_client = eval(os.fdopen(r).read())
+      continue
+    else:
+      zeo_client_pid_list = activity_node = None
+      os.close(r)
+  elif activity_node is not None:
+    # run ZEO server but no need to fork
+    zeo_server_pid = 0
+
+  if save:
+    Storage = FileStorage(data_fs_path)
+  elif load:
+    Storage = DemoStorage(base=FileStorage(data_fs_path))
+  else:
+    Storage = DemoStorage()
+  break
 else:
-  Storage = DemoStorage()
+  for i in xrange(1, activity_node):
+    pid = fork()
+    if not pid:
+      zeo_client_pid_list = None
+      os.environ['zserver'] = i < len(zserver_list) and zserver_list[i] or ''
+      break
+    zeo_client_pid_list.append(pid)
+  Storage = ClientStorage(zeo_client)
 
-_print("Instance at %r loaded ... " % instance_home)
+if zeo_client_pid_list is not None:
+  _print("Instance at %r loaded ... " % instance_home)
diff --git a/product/ERP5Type/tests/runUnitTest.py b/product/ERP5Type/tests/runUnitTest.py
index 56e2f14019d..2dcf99e27ae 100755
--- a/product/ERP5Type/tests/runUnitTest.py
+++ b/product/ERP5Type/tests/runUnitTest.py
@@ -6,6 +6,7 @@ import re
 import time
 import getopt
 import unittest
+import signal
 import shutil
 import errno
 import random
@@ -58,52 +59,52 @@ Options:
                              dependency provider (ie, the name of the BT
                              containing ZSQLMethods matching the desired
                              catalog storage).
-  --run_only=STRING
-                             Run only specified test methods delimited with
+  --run_only=STRING          Run only specified test methods delimited with
                              commas (e.g. testFoo,testBar). This can be regular
                              expressions.
-  -D
-                             Invoke debugger on errors / failures.
+  -D                         Invoke debugger on errors / failures.
   --update_business_templates
                              Update all business templates prior to runing
                              tests. This only has a meaning when doing
                              upgratability checks, in conjunction with --load.
                              --update_only can be use to restrict the list of
                              templates to update.
-  --update_only=STRING
-                             Specify the list of business template to update if
+  --update_only=STRING       Specify the list of business template to update if
                              you don't want to update them all. You can give a list
                              delimited with commas (e.g. erp5_core,erp5_xhtml_style).
                              This can be regular expressions. 
-
   --enable_full_indexing=STRING
                              By default, unit test do not reindex everything
                              for performance reasons. Provide list of documents
                              (delimited with comas) for which we want to force
                              indexing. This can only be for now 'portal_types'
-
   --conversion_server_hostname=STRING
                              Hostname used to connect to conversion server (Oood),
                              this value will stored at default preference.
                              By default localhost is used.
-
   --conversion_server_port=STRING
                              Port number used to connect to conversion server
                              (Oood), the value will be stored at default preference.
                              By default 8008 is used.
-
-  --use_dummy_mail_host
-                             Replace the MailHost by DummyMailHost.
+  --use_dummy_mail_host      Replace the MailHost by DummyMailHost.
                              This prevent the instance send emails.
                              By default Original MailHost is used.
-
   --random_activity_priority=[SEED]
                              Force activities to have a random priority, to make
                              random failures (due to bad activity dependencies)
                              almost always reproducible. A random number
                              generator with the specified seed (or a random one
                              otherwise) is created for this purpose.
-
+  --activity_node=NUMBER     Create given number of ZEO clients, to process
+                             activities.
+  --zeo_server=[[HOST:]PORT] Bind the ZEO server to the given host/port.
+  --zeo_client=[HOST:]PORT   Use specified ZEO server as storage.
+  --zserver=[HOST:]PORT[,...]
+                             Make ZServer listen on given host:port
+                             If used with --activity_node=, this can be a
+                             comma-separated list of addresses.
+
+When no unit test is specified, only activities are processed.
 """
 
 # This script is usually executed directly, and is also imported using its full
@@ -260,17 +261,15 @@ tests_home = os.path.join(instance_home, 'tests')
 initializeInstanceHome(tests_framework_home, real_instance_home, instance_home)
 
 
-class FilteredTestSuite(unittest.TestSuite):
-  """Marker class to identify TestSuites that we have already filtered"""
-  pass
-
 class ERP5TypeTestLoader(unittest.TestLoader):
   """Load test cases from the name passed on the command line.
   """
-  def __init__(self, test_pattern_list=None):
-    super(ERP5TypeTestLoader, self).__init__()
-    if test_pattern_list is not None:
-      self.test_pattern_list = map(re.compile, test_pattern_list)
+  filter_test_list = None
+  _testMethodPrefix = 'test'
+
+  testMethodPrefix = property(
+    lambda self: self._testMethodPrefix,
+    lambda self, value: None)
 
   def loadTestsFromName(self, name, module=None):
     """This method is here for compatibility with old style arguments.
@@ -281,47 +280,32 @@ class ERP5TypeTestLoader(unittest.TestLoader):
     if name.endswith('.py'):
       name = name[:-3]
     name = name.replace(':', '.')
-    return unittest.TestLoader.loadTestsFromName(self, name, module)
+    return super(ERP5TypeTestLoader, self).loadTestsFromName(name, module)
 
   def loadTestsFromModule(self, module):
     """ERP5Type test loader supports a function named 'test_suite'
     """
     if hasattr(module, 'test_suite'):
       return self.suiteClass(module.test_suite())
-    return unittest.TestLoader.loadTestsFromModule(self, module)
-
-  def _filterTestList(self, test_list):
-    """test_list is a list of TestCase or TestSuite instances.
-    Returns a list of objects that contain only TestCase/TestSuite
-    matching --run_only"""
-    filtered = []
-
-    # Using FilteredTestSuite as a marker ensures that each Test
-    # is checked only once.
-    for item in test_list:
-      if isinstance(item, unittest.TestCase):
-        test_method_name = item.id().rsplit('.', 1)[-1]
-        for valid_test_method_name_re in self.test_pattern_list:
-          if valid_test_method_name_re.search(test_method_name):
-            filtered.append(item)
-      elif isinstance(item, FilteredTestSuite):
-        # has already been filtered, dont check it again
-        filtered.append(item)
-      elif isinstance(item, unittest.TestSuite):
-        filtered.append(FilteredTestSuite(self._filterTestList(item)))
-      else:
-        # should not happen.
-        raise ValueError, "What what what?"
-
-    return filtered
+    return super(ERP5TypeTestLoader, self).loadTestsFromModule(module)
 
-  def suiteClass(self, test_list):
-    """Constructs a Test Suite from test lists.
-    Keep only tests matching commandline parameter --run_only"""
-    if hasattr(self, 'test_pattern_list'):
-      test_list = self._filterTestList(test_list)
+  def getTestCaseNames(self, testCaseClass):
+    """Return a sorted sequence of method names found within testCaseClass
 
-    return FilteredTestSuite(test_list)
+    The returned list only contain names matching --run_only
+    """
+    name_list = super(ERP5TypeTestLoader, self).getTestCaseNames(testCaseClass)
+    if self.filter_test_list:
+      filtered_name_list = []
+      for name in name_list:
+        for test in self.filter_test_list:
+          if test(name):
+            filtered_name_list.append(name)
+            break
+      return filtered_name_list
+    return name_list
+
+unittest.TestLoader = ERP5TypeTestLoader
 
 class DebugTestResult:
   """Wrap an unittest.TestResult, invoking pdb on errors / failures
@@ -356,9 +340,10 @@ class DebugTestResult:
 _print = sys.stderr.write
 
 def runUnitTestList(test_list, verbosity=1, debug=0):
-  if not test_list:
-    _print("No test to run, exiting immediately.\n")
-    return
+  if "zeo_client" in os.environ and "zeo_server" in os.environ:
+    _print("conflicting options: --zeo_client and --zeo_server")
+    sys.exit(1)
+
   os.environ.setdefault('INSTANCE_HOME', instance_home)
   os.environ.setdefault('SOFTWARE_HOME', software_home)
   os.environ.setdefault('COPY_OF_INSTANCE_HOME', instance_home)
@@ -449,41 +434,6 @@ def runUnitTestList(test_list, verbosity=1, debug=0):
   # it is then possible to run the debugger by "import pdb; pdb.set_trace()"
   sys.path.insert(0, tests_framework_home)
 
-
-  save = int(os.environ.get('erp5_save_data_fs', 0))
-  dummy_test = save and (int(os.environ.get('update_business_templates', 0))
-                         or not int(os.environ.get('erp5_load_data_fs', 0)))
-  
-  TestRunner = backportUnittest.TextTestRunner
-
-  if dummy_test:
-    # Skip all tests in save mode and monkeypatch PortalTestCase.setUp
-    # to skip beforeSetUp and afterSetUp. Also patch unittest.makeSuite,
-    # as it's used in test_suite function in test cases.
-    from Products.ERP5Type.tests.ERP5TypeTestCase import \
-                  dummy_makeSuite, dummy_setUp, dummy_tearDown
-    from Testing.ZopeTestCase.PortalTestCase import PortalTestCase
-    unittest.makeSuite = dummy_makeSuite
-    PortalTestCase.setUp = dummy_setUp
-    PortalTestCase.tearDown = dummy_tearDown
-    test_loader = ERP5TypeTestLoader()
-    test_loader.testMethodPrefix = 'dummy_test'
-  else:
-    # Hack the profiler to run only specified test methods, and wrap results when
-    # running in debug mode.
-    if debug:
-      class DebugTextTestRunner(TestRunner):
-        def _makeResult(self):
-          result = super(DebugTextTestRunner, self)._makeResult()
-          return DebugTestResult(result)
-
-      TestRunner = DebugTextTestRunner
-
-    test_method_list = os.environ.get('run_only', '').split(',')
-    test_loader = ERP5TypeTestLoader(test_method_list)
-
-  suite = test_loader.loadTestsFromNames(test_list)
-
   # change current directory to the test home, to create zLOG.log in this dir.
   os.chdir(tests_home)
   try:
@@ -499,28 +449,105 @@ def runUnitTestList(test_list, verbosity=1, debug=0):
     # ourselves
     layer.ZopeLite.setUp()
 
-  _print('done (%.3fs)' % (time.time() - _start))
-  result = TestRunner(verbosity=verbosity).run(suite)
+  TestRunner = backportUnittest.TextTestRunner
+
+  import Lifetime
+  from ZEO.ClientStorage import ClientStorage
+  from Zope2.custom_zodb import \
+      save_mysql, zeo_server_pid, zeo_client_pid_list, Storage
+  from Products.ERP5Type.tests.ERP5TypeTestCase import \
+      ProcessingNodeTestCase, ZEOServerTestCase, dummy_setUp, dummy_tearDown
+  def shutdown(signum, frame, signum_set=set()):
+    Lifetime.shutdown(0)
+    signum_set.add(signum)
+    if zeo_client_pid_list is None and len(signum_set) > 1:
+      # in case of ^C, a child should also receive a SIGHUP from the parent,
+      # so we merge the first 2 different signals in a single exception
+      signum_set.remove(signal.SIGHUP)
+    else:
+      raise KeyboardInterrupt
+  signal.signal(signal.SIGINT, shutdown)
+  signal.signal(signal.SIGHUP, shutdown)
+
+  try:
+    save = int(os.environ.get('erp5_save_data_fs', 0))
+    load = int(os.environ.get('erp5_load_data_fs', 0))
+    dummy = save and (int(os.environ.get('update_business_templates', 0))
+                      or not load)
+    if zeo_server_pid == 0:
+      suite = ZEOServerTestCase('asyncore_loop')
+    elif zeo_client_pid_list is None or not test_list:
+      suite = ProcessingNodeTestCase('processing_node')
+      if not (dummy or load):
+        _print('WARNING: either --save or --load should be used because static'
+               ' files are only reloaded by the node installing business'
+               ' templates.')
+    else:
+      if dummy:
+        # Skip all tests and monkeypatch PortalTestCase to skip
+        # afterSetUp/beforeTearDown.
+        ERP5TypeTestLoader._testMethodPrefix = 'dummy_test'
+        ZopeTestCase.PortalTestCase.setUp = dummy_setUp
+        ZopeTestCase.PortalTestCase.tearDown = dummy_tearDown
+      elif debug:
+        # Hack the profiler to run only specified test methods,
+        # and wrap results when running in debug mode.
+        class DebugTextTestRunner(TestRunner):
+          def _makeResult(self):
+            result = super(DebugTextTestRunner, self)._makeResult()
+            return DebugTestResult(result)
+        TestRunner = DebugTextTestRunner
+      suite = ERP5TypeTestLoader().loadTestsFromNames(test_list)
+
+    if not isinstance(Storage, ClientStorage):
+      # Remove nodes that were registered during previous execution.
+      # Set an empty dict (instead of delete the property)
+      # in order to avoid conflicts on / when several ZEO clients registers.
+      from BTrees.OIBTree import OIBTree
+      app = ZopeTestCase.app()
+      app.test_processing_nodes = OIBTree()
+      import transaction
+      transaction.commit()
+      ZopeTestCase.close(app)
+
+    if zeo_client_pid_list is None:
+      result = suite()
+    else:
+      _print('done (%.3fs)' % (time.time() - _start))
+      result = TestRunner(verbosity=verbosity).run(suite)
+  finally:
+    Storage.close()
+    if zeo_client_pid_list is not None:
+      # Wait that child processes exit. Stop ZEO storage (if any) after all
+      # other nodes disconnected.
+      for pid in zeo_client_pid_list:
+        os.kill(pid, signal.SIGHUP)
+      for pid in zeo_client_pid_list:
+        os.waitpid(pid, 0)
+      if zeo_server_pid:
+        os.kill(zeo_server_pid, signal.SIGHUP)
+        os.waitpid(zeo_server_pid, 0)
 
   if save:
     os.chdir(instance_home)
-    from Products.ERP5Type.tests.utils import getMySQLArguments
-    # The output of mysqldump needs to merge many lines at a time
-    # for performance reasons (merging lines is at most 10 times
-    # faster, so this produce somewhat not nice to read sql
-    command = 'mysqldump %s > dump.sql' % getMySQLArguments()
-    if verbosity:
-      _print('Dumping MySQL database with %s...\n' % command)
-    os.system(command)
-    if verbosity:
-      _print('Dumping static files...\n')
-    for static_dir in static_dir_list:
-      try:
-        shutil.rmtree(static_dir + '.bak')
-      except OSError, e:
-        if e.errno != errno.ENOENT:
-          raise
-      shutil.copytree(static_dir, static_dir + '.bak', symlinks=True)
+    if save_mysql:
+      save_mysql(verbosity)
+    if suite not in (ProcessingNodeTestCase, ZEOServerTestCase):
+      # Static files are modified by the node installing business templates,
+      # i.e. by the node running the unit test. There is no point saving them
+      # on a ZEO server, or on nodes that only process activities: this has to
+      # be done manually.
+      if verbosity:
+        _print('Dumping static files...\n')
+      for static_dir in static_dir_list:
+        try:
+          shutil.rmtree(static_dir + '.bak')
+        except OSError, e:
+          if e.errno != errno.ENOENT:
+            raise
+        shutil.copytree(static_dir, static_dir + '.bak', symlinks=True)
+    elif zeo_client_pid_list is not None:
+      _print('WARNING: No static files saved. You will have to do it manually.')
 
   return result
 
@@ -551,6 +578,10 @@ def main():
         "use_dummy_mail_host",
         "update_business_templates",
         "random_activity_priority=",
+        "activity_node=",
+        "zeo_client=",
+        "zeo_server=",
+        "zserver=",
         ])
   except getopt.GetoptError, msg:
     usage(sys.stderr, msg)
@@ -605,7 +636,8 @@ def main():
     elif opt == "--erp5_catalog_storage":
       os.environ["erp5_catalog_storage"] = arg
     elif opt == "--run_only":
-      os.environ["run_only"] = arg
+      ERP5TypeTestLoader.filter_test_list = [re.compile(x).search
+                                             for x in arg.split(',')]
     elif opt == "--update_only":
       os.environ["update_only"] = arg
       os.environ["update_business_templates"] = "1"
@@ -620,13 +652,16 @@ def main():
     elif opt == "--random_activity_priority":
       os.environ["random_activity_priority"] = arg or \
         str(random.randrange(0, 1<<16))
-
-  test_list = args
-  if not test_list:
-    _print("No test to run, exiting immediately.\n")
-    sys.exit(1)
-
-  result = runUnitTestList(test_list=test_list,
+    elif opt == "--activity_node":
+      os.environ["activity_node"] = arg
+    elif opt == "--zeo_client":
+      os.environ["zeo_client"] = arg
+    elif opt == "--zeo_server":
+      os.environ["zeo_server"] = arg
+    elif opt == "--zserver":
+      os.environ["zserver"] = arg
+
+  result = runUnitTestList(test_list=args,
                            verbosity=verbosity,
                            debug=debug)
   try:
@@ -636,7 +671,7 @@ def main():
       _print("Profiler support is not available from ZopeTestCase in Zope 2.12\n")
   else:
     profiler.print_stats()
-  sys.exit(len(result.failures) + len(result.errors))
+  return result and len(result.failures) + len(result.errors) or 0
 
 if __name__ == '__main__':
   # Force stdout to be totally unbuffered.
@@ -644,5 +679,4 @@ if __name__ == '__main__':
     sys.stdout = os.fdopen(1, "wb", 0)
   except OSError:
     pass
-
-  main()
+  sys.exit(main())
diff --git a/product/ERP5Type/tests/utils.py b/product/ERP5Type/tests/utils.py
index 06a84e4843a..2f998972ddd 100644
--- a/product/ERP5Type/tests/utils.py
+++ b/product/ERP5Type/tests/utils.py
@@ -28,10 +28,12 @@
 
 """Utility functions and classes for unit testing
 """
-
+import errno
 import os
 import logging
-
+import random
+import socket
+import sys
 import transaction
 import zLOG
 import Products.ERP5Type
@@ -249,6 +251,58 @@ def getExtraSqlConnectionStringList():
   return os.environ.get('extra_sql_connection_string_list',
                         'test2 test2:test3 test3').split(':')
 
+instance_random = random.Random(hash(os.environ['INSTANCE_HOME']))
+
+def parseListeningAddress(host_port=None, default_host='127.0.0.1'):
+  """Parse string specifying the address to bind to
+
+  If the specified address is incomplete or missing, several (host, random_port)
+  will be returned. It must be used as follows (an appropriate error is raised
+  if all returned values failed):
+
+    for host, port in parseListeningAddress(os.environ.get('some_address')):
+      try:
+        s.bind((host, port))
+        break
+      except socket.error, e:
+        if e[0] != errno.EADDRINUSE:
+          raise
+  """
+  if host_port:
+    host_port = host_port.rsplit(':', 1)
+    if len(host_port) == 1:
+      host_port = default_host, host_port[0]
+    try:
+      yield host_port[0], int(host_port[1])
+      raise RuntimeError("Can't bind to %s:%s" % host_port)
+    except ValueError:
+      default_host = host_port[1]
+  port_list = []
+  for i in xrange(3):
+    port_list.append(instance_random.randint(55000, 55500))
+    yield default_host, port_list[-1]
+  raise RuntimeError("Can't find free port (tried ports %s)\n"
+                     % ', '.join(map(str, port_list)))
+
+def createZServer(log=os.devnull):
+  from ZServer import logger, zhttp_server, zhttp_handler
+  lg = logger.file_logger(log)
+  class new_zhttp_server:
+    # I can't use __new__ because zhttp_handler is an old-style class :(
+    def __init__(self):
+      self.__class__ = zhttp_server
+  for ip, port in parseListeningAddress(os.environ.get('zserver')):
+    hs = new_zhttp_server()
+    try:
+      hs.__init__(ip, port, resolver=None, logger_object=lg)
+      hs.install_handler(zhttp_handler(module='Zope2', uri_base=''))
+      sys.stderr.write("Running ZServer at %s:%s\n" % (ip, port))
+      return hs
+    except socket.error, e:
+      if e[0] != errno.EADDRINUSE:
+        raise
+      hs.close()
+
 # decorators
 class reindex(object):
   """Decorator to commit transaction and flush activities after the method is
-- 
2.30.9