From bf35001bb5f27079737329895cdb15349e312696 Mon Sep 17 00:00:00 2001
From: Vincent Pelletier <vincent@nexedi.com>
Date: Tue, 14 May 2019 15:02:13 +0900
Subject: [PATCH] CMFActivity: Implement node families.

The intent is to be able to tell that an independently-defined group of
activity nodes may execute given activity, and no other node.
This allows more flexible parallelism control than serialization_tag.
---
 product/CMFActivity/ActiveObject.py           |   2 +
 product/CMFActivity/Activity/Queue.py         |   5 +-
 product/CMFActivity/Activity/SQLBase.py       |  41 ++--
 product/CMFActivity/Activity/SQLJoblib.py     |   6 +-
 product/CMFActivity/ActivityTool.py           | 215 +++++++++++++++++-
 .../CMFActivity/dtml/manageLoadBalancing.dtml |  41 ++++
 product/CMFActivity/tests/testCMFActivity.py  | 102 +++++++++
 7 files changed, 387 insertions(+), 25 deletions(-)

diff --git a/product/CMFActivity/ActiveObject.py b/product/CMFActivity/ActiveObject.py
index f5126be48f..56d3b478cb 100644
--- a/product/CMFActivity/ActiveObject.py
+++ b/product/CMFActivity/ActiveObject.py
@@ -70,6 +70,8 @@ class ActiveObject(ExtensionClass.Base):
         - "": no node preference
         - None (default): let CMFActivity decide between the above 2 choice
                           (see ActivityTool.activateObject)
+        - the name of a family: Only processing nodes member of that family
+                                may execute this activity.
 
       at_date           --  request execution date for this activate call
                             (default: date of commit)
diff --git a/product/CMFActivity/Activity/Queue.py b/product/CMFActivity/Activity/Queue.py
index 5bdc08365f..c5c14e9fdb 100644
--- a/product/CMFActivity/Activity/Queue.py
+++ b/product/CMFActivity/Activity/Queue.py
@@ -82,7 +82,8 @@ class Queue(object):
       activity_tool.deferredDeleteMessage(self, m)
     m.is_deleted = 1
 
-  def dequeueMessage(self, activity_tool, processing_node):
+  def dequeueMessage(self, activity_tool, processing_node,
+                     node_family_id_list):
     raise NotImplementedError
 
   def distribute(self, activity_tool, node_count):
@@ -187,7 +188,7 @@ class Queue(object):
     """
     pass
 
-  def getPriority(self, activity_tool, node):
+  def getPriority(self, activity_tool, processing_node, node_set):
     """
       Get priority from this queue.
       Lower number means higher priority value.
diff --git a/product/CMFActivity/Activity/SQLBase.py b/product/CMFActivity/Activity/SQLBase.py
index 600f886ab0..ecd3863b27 100644
--- a/product/CMFActivity/Activity/SQLBase.py
+++ b/product/CMFActivity/Activity/SQLBase.py
@@ -278,8 +278,8 @@ CREATE TABLE %s (
     return "SELECT 1 FROM %s WHERE %s LIMIT 1" % (
       self.sql_table, " AND ".join(where) or "1")
 
-  def getPriority(self, activity_tool, node=None):
-    if node is None:
+  def getPriority(self, activity_tool, processing_node, node_set=None):
+    if node_set is None:
       q = ("SELECT 3*priority, date FROM %s"
         " WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
         " ORDER BY priority, date LIMIT 1" % self.sql_table)
@@ -287,17 +287,20 @@ CREATE TABLE %s (
       subquery = ("(SELECT 3*priority{} as effective_priority, date FROM %s"
         " WHERE {} AND processing_node=0 AND date <= UTC_TIMESTAMP(6)"
         " ORDER BY priority, date LIMIT 1)" % self.sql_table).format
-      node = 'node=%s' % node
-      q = ("SELECT * FROM (%s UNION ALL %s UNION %s) as t"
+      node = 'node=%s' % processing_node
+      # "ALL" on all but one, to incur deduplication cost only once.
+      # "UNION ALL" between the two naturally distinct sets.
+      q = ("SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
         " ORDER BY effective_priority, date LIMIT 1" % (
           subquery(-1, node),
           subquery('', 'node=0'),
-          subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 1),
+          subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'),
+          ' UNION ALL ' + subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set))) if node_set else '',
         ))
     result = activity_tool.getSQLConnection().query(q, 0)[1]
     if result:
       return result[0]
-    return Queue.getPriority(self, activity_tool, node)
+    return Queue.getPriority(self, activity_tool, processing_node, node_set)
 
   def _retryOnLockError(self, method, args=(), kw={}):
     while True:
@@ -416,7 +419,7 @@ CREATE TABLE %s (
       where_kw['above_uid'] = line.uid
 
   def getReservedMessageList(self, db, date, processing_node, limit,
-                             group_method_id=None, node=None):
+                             group_method_id=None, node_set=None):
     """
       Get and reserve a list of messages.
       limit
@@ -436,7 +439,7 @@ CREATE TABLE %s (
     # for users and reduce the probability to do the same work several times
     # (think of an object that is modified several times in a short period of
     # time).
-    if node is None:
+    if node_set is None:
       result = Results(query(
         "SELECT * FROM %s WHERE processing_node=0 AND %s%s"
         " ORDER BY priority, date LIMIT %s FOR UPDATE" % args, 0))
@@ -447,13 +450,16 @@ CREATE TABLE %s (
       subquery = ("(SELECT *, 3*priority{} as effective_priority FROM %s"
         " WHERE {} AND processing_node=0 AND %s%s"
         " ORDER BY priority, date LIMIT %s FOR UPDATE)" % args).format
-      node = 'node=%s' % node
+      node = 'node=%s' % processing_node
       result = Results(query(
-        "SELECT * FROM (%s UNION ALL %s UNION %s) as t"
+        # "ALL" on all but one, to incur deduplication cost only once.
+        # "UNION ALL" between the two naturally distinct sets.
+        "SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
         " ORDER BY effective_priority, date LIMIT %s"% (
             subquery(-1, node),
             subquery('', 'node=0'),
-            subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 1),
+            subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'),
+            ' UNION ALL ' + subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set))) if node_set else '',
             limit), 0))
     if result:
       # Reserve messages.
@@ -478,7 +484,8 @@ CREATE TABLE %s (
       return m, uid, ()
     return load
 
-  def getProcessableMessageList(self, activity_tool, processing_node):
+  def getProcessableMessageList(self, activity_tool, processing_node,
+                                node_family_id_list):
     """
       Always true:
         For each reserved message, delete redundant messages when it gets
@@ -523,7 +530,7 @@ CREATE TABLE %s (
           result = Results(result)
         else:
           result = self.getReservedMessageList(db, now_date, processing_node,
-                                               1, node=processing_node)
+                                               1, node_set=node_family_id_list)
           if not result:
             break
         load = self.getProcessableMessageLoader(db, processing_node)
@@ -552,7 +559,7 @@ CREATE TABLE %s (
                 # adding more results from getReservedMessageList if the
                 # limit is not reached.
               or self.getReservedMessageList(db, now_date, processing_node,
-                limit, group_method_id, processing_node))
+                limit, group_method_id, node_family_id_list))
             for line in result:
               if line.uid in uid_to_duplicate_uid_list_dict:
                 continue
@@ -597,9 +604,11 @@ CREATE TABLE %s (
       raise
 
   # Queue semantic
-  def dequeueMessage(self, activity_tool, processing_node):
+  def dequeueMessage(self, activity_tool, processing_node,
+                     node_family_id_list):
     message_list, group_method_id, uid_to_duplicate_uid_list_dict = \
-      self.getProcessableMessageList(activity_tool, processing_node)
+      self.getProcessableMessageList(activity_tool, processing_node,
+        node_family_id_list)
     if message_list:
       # Remove group_id parameter from group_method_id
       if group_method_id is not None:
diff --git a/product/CMFActivity/Activity/SQLJoblib.py b/product/CMFActivity/Activity/SQLJoblib.py
index fd308bcaa3..949a378c65 100644
--- a/product/CMFActivity/Activity/SQLJoblib.py
+++ b/product/CMFActivity/Activity/SQLJoblib.py
@@ -179,10 +179,10 @@ CREATE TABLE %s (
       return None, original_uid, [uid]
     return load
 
-  def getPriority(self, activity_tool, node):
-    return SQLDict.getPriority(self, activity_tool)
+  def getPriority(self, activity_tool, processing_node, node_set):
+    return SQLDict.getPriority(self, activity_tool, processing_node)
 
   def getReservedMessageList(self, db, date, processing_node,
-                             limit=None, group_method_id=None, node=None):
+                             limit=None, group_method_id=None, node_set=None):
     return SQLDict.getReservedMessageList(self, db,
       date, processing_node, limit, group_method_id)
diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py
index 33a17a36e3..6f423f9be5 100644
--- a/product/CMFActivity/ActivityTool.py
+++ b/product/CMFActivity/ActivityTool.py
@@ -50,8 +50,9 @@ from Products.ERP5Type.Globals import InitializeClass, DTMLFile
 from Acquisition import aq_base, aq_inner, aq_parent
 from ActivityBuffer import ActivityBuffer
 from ActivityRuntimeEnvironment import BaseMessage
-from zExceptions import ExceptionFormatter
+from zExceptions import ExceptionFormatter, Redirect
 from BTrees.OIBTree import OIBTree
+from BTrees.OOBTree import OOBTree
 from Zope2 import app
 from Products.ERP5Type.UnrestrictedMethod import PrivilegedUser
 from zope.site.hooks import setSite
@@ -650,6 +651,8 @@ class ActivityTool (BaseTool):
 
     distributingNode = ''
     _nodes = ()
+    _family_list = ()
+    _node_family_dict = None
     activity_creation_trace = False
     activity_tracking = False
     activity_timing_log = False
@@ -904,6 +907,205 @@ class ActivityTool (BaseTool):
         self._nodes = nodes = new_nodes
       return nodes
 
+    def _getNodeFamilyIdDict(self):
+      result = self._node_family_dict
+      if result is None:
+        result = self._node_family_dict = OOBTree()
+      return result
+
+    security.declareProtected(CMFCorePermissions.ManagePortal, 'getCurrentNodeFamilyIdSet')
+    def getCurrentNodeFamilyIdSet(self):
+      """
+      Returns the tuple of family ids current node is member of.
+      """
+      return self._getNodeFamilyIdDict().get(getCurrentNode(), ())
+
+    security.declareProtected(CMFCorePermissions.ManagePortal, 'getCurrentNodeFamilyNameSet')
+    def getCurrentNodeFamilyNameSet(self):
+      """
+      Returns the tuple of family names current node is member of.
+      """
+      return [
+        self._family_list[-x - 1]
+        for x in self._getNodeFamilyIdDict().get(getCurrentNode(), ())
+      ]
+
+    security.declareProtected(CMFCorePermissions.ManagePortal, 'getFamilyId')
+    def getFamilyId(self, name):
+      """
+      Raises ValueError for unknown family names.
+      """
+      # First family is -1, second is -2, etc.
+      return -self._family_list.index(name) - 1
+
+    security.declareProtected(CMFCorePermissions.ManagePortal, 'addNodeToFamily')
+    def addNodeToFamily(self, node_id, family_name):
+      """
+      Silently does nothing if node is already a member of family_name.
+      """
+      family_id = self.getFamilyId(family_name)
+      node_family_id_dict = self._getNodeFamilyIdDict()
+      family_id_list = node_family_id_dict.get(node_id, ())
+      if family_id not in family_id_list:
+        node_family_id_dict[node_id] = family_id_list + (family_id, )
+
+    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_addNodeSetToFamily')
+    def manage_addNodeSetToFamily(self, family_new_node_list, REQUEST):
+      """
+      Add selected nodes to family.
+      """
+      family_name = REQUEST['manage_addNodeSetToFamily']
+      if isinstance(family_new_node_list, basestring):
+        family_new_node_list = [family_new_node_list]
+      for node_id in family_new_node_list:
+        self.addNodeToFamily(node_id, family_name)
+      REQUEST.RESPONSE.redirect(
+        REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message=' +
+        urllib.quote('Nodes added to family.'),
+      )
+
+    security.declareProtected(CMFCorePermissions.ManagePortal, 'removeNodeFromFamily')
+    def removeNodeFromFamily(self, node_id, family_name):
+      """
+      Silently does nothing if node is not member of family_name.
+      """
+      family_id = self.getFamilyId(family_name)
+      node_family_id_dict = self._getNodeFamilyIdDict()
+      family_id_list = node_family_id_dict.get(node_id, ())
+      if family_id in family_id_list:
+        node_family_id_dict[node_id] = tuple(
+          x
+          for x in family_id_list
+          if x != family_id
+        )
+
+    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_removeNodeSetFromFamily')
+    def manage_removeNodeSetFromFamily(self, REQUEST):
+      """
+      Remove selected nodes from family.
+      """
+      family_name = REQUEST['manage_removeNodeSetFromFamily']
+      node_to_remove_list = REQUEST['family_member_set_' + family_name]
+      if isinstance(node_to_remove_list, basestring):
+        node_to_remove_list = [node_to_remove_list]
+      for node_id in node_to_remove_list:
+        self.removeNodeFromFamily(node_id, family_name)
+      REQUEST.RESPONSE.redirect(
+        REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message=' +
+        urllib.quote('Nodes removed from family.'),
+      )
+
+    def _checkFamilyName(self, name):
+      if not isinstance(name, basestring):
+        raise TypeError('Name must be a string')
+      if name in self._family_list:
+        raise ValueError('Already in use')
+      if name in ('', 'same'):
+        raise ValueError('Reserved family name')
+
+    security.declareProtected(CMFCorePermissions.ManagePortal, 'createFamily')
+    def createFamily(self, name):
+      """
+      Raises ValueError if family already exists.
+      """
+      self._checkFamilyName(name)
+      new_family_list = []
+      for existing_name in self._family_list:
+        if existing_name is None and name is not None:
+          new_family_list.append(name)
+          name = None
+        else:
+          new_family_list.append(existing_name)
+      if name is None:
+        # A free spot has been recycled.
+        self._family_list = tuple(new_family_list)
+      else:
+        # No free spot, append.
+        self._family_list += (name, )
+
+    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_createFamily')
+    def manage_createFamily(self, new_family_name, family_new_node_list=None, REQUEST):
+      """Create a family"""
+      redirect_url = REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message='
+      if family_new_node_list is None:
+        family_new_node_list = []
+      elif isinstance(family_new_node_list, basestring):
+        family_new_node_list = [family_new_node_list]
+      try:
+        self.createFamily(new_family_name)
+        for node_id in family_new_node_list:
+          self.addNodeToFamily(node_id, new_family_name)
+      except ValueError as exc:
+        raise Redirect(redirect_url + urllib.quote(str(exc)))
+      REQUEST.RESPONSE.redirect(redirect_url + urllib.quote('Family created.'))
+
+    security.declareProtected(CMFCorePermissions.ManagePortal, 'renameFamily')
+    def renameFamily(self, old_name, new_name):
+      """
+      Raises ValueError if old_name does not exist.
+      """
+      self._checkFamilyName(new_name)
+      family_list = self._family_list
+      if old_name not in family_list:
+        raise ValueError('Unknown family')
+      self._family_list = tuple(
+        new_name if x == old_name else x
+        for x in family_list
+      )
+
+    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_renameFamily')
+    def manage_renameFamily(self, REQUEST):
+      """Rename a family"""
+      redirect_url = REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message='
+      old_family_name = REQUEST['manage_renameFamily']
+      new_family_name = REQUEST['family_new_name_' + old_family_name]
+      try:
+        self.renameFamily(old_family_name, new_family_name)
+      except ValueError as exc:
+        raise Redirect(redirect_url + urllib.quote(str(exc)))
+      REQUEST.RESPONSE.redirect(redirect_url + urllib.quote('Family renamed.'))
+
+    security.declareProtected(CMFCorePermissions.ManagePortal, 'deleteFamily')
+    def deleteFamily(self, name):
+      """
+      Raises ValueError if name does not exist.
+      """
+      for node_id in self._getNodeFamilyIdDict():
+        self.removeNodeFromFamily(node_id, name)
+      self._family_list = tuple(
+        None if x == name else x
+        for x in self._family_list
+      )
+
+    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_deleteFamily')
+    def manage_deleteFamily(self, REQUEST):
+      """Delete families"""
+      redirect_url = REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message='
+      family_name = REQUEST['manage_deleteFamily']
+      try:
+        self.deleteFamily(family_name)
+      except ValueError as exc:
+        raise Redirect(redirect_url + urllib.quote(str(exc)))
+      REQUEST.RESPONSE.redirect(redirect_url + urllib.quote('Family deleted'))
+
+    security.declareProtected(CMFCorePermissions.ManagePortal, 'getFamilyNameList')
+    def getFamilyNameList(self):
+      """
+      Return the list of existing family names.
+      """
+      return [x for x in self._family_list if x is not None]
+
+    def getFamilyNodeList(self, family_name):
+      """
+      Return the list of node names in given family.
+      """
+      family_id = self.getFamilyId(family_name)
+      return [
+        x
+        for x, y in self._getNodeFamilyIdDict().items()
+        if family_id in y
+      ]
+
     def registerNode(self, node):
       node_dict = self.getNodeDict()
       if node not in node_dict:
@@ -1117,13 +1319,17 @@ class ActivityTool (BaseTool):
         #      getPriority does not see messages dequeueMessage would process.
         activity_list = activity_dict.values()
         def sort_key(activity):
-          return activity.getPriority(self, processing_node)
+          return activity.getPriority(self, processing_node,
+            node_family_id_set)
         while is_running_lock.acquire(0):
+          # May have changed since previous iteration.
+          node_family_id_set = self.getCurrentNodeFamilyIdSet()
           try:
             activity_list.sort(key=sort_key) # stable sort
             for i, activity in enumerate(activity_list):
               # Transaction processing is the responsability of the activity
-              if not activity.dequeueMessage(inner_self, processing_node):
+              if not activity.dequeueMessage(inner_self, processing_node,
+                node_family_id_set):
                 activity_list.append(activity_list.pop(i))
                 break
             else:
@@ -1222,7 +1428,8 @@ class ActivityTool (BaseTool):
         elif node == '':
           break
         elif node != 'same':
-          raise ValueError("Invalid node argument %r" % node)
+          kw['node'] = self.getFamilyId(node)
+          break
         try:
           kw['node'] = 1 + self.getNodeList(
             role=ROLE_PROCESSING).index(getCurrentNode())
diff --git a/product/CMFActivity/dtml/manageLoadBalancing.dtml b/product/CMFActivity/dtml/manageLoadBalancing.dtml
index 0c68d0e440..b6566147a5 100644
--- a/product/CMFActivity/dtml/manageLoadBalancing.dtml
+++ b/product/CMFActivity/dtml/manageLoadBalancing.dtml
@@ -217,6 +217,47 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
     <td>&nbsp;</td>
      <td>&nbsp;</td>
   </tr>
+  <tr class="list-header">
+    <td align="left" valign="top" colspan=2>
+      <div class="form-label">Node Families</div>
+    </td>
+  </tr>
+  <tr>
+    <td colspan=2>
+      <div style="float: left;">
+        Available Nodes:<br />
+        <select name="family_new_node_list" size="10" multiple="multiple" style="width: 100%;">
+        <dtml-in getNodeList prefix="node">
+          <option value="<dtml-var node_item>"><dtml-var node_item></option>
+        </dtml-in>
+        </select><br/>
+        <input name="new_family_name" />
+        <button type="submit" class="form-element" name="manage_createFamily:method">Create Family</button>
+      </div>
+      <dtml-in getFamilyNameList prefix="family">
+      <div style="float: left; margin-left: 1em;">
+        <input value="<dtml-var family_item>" name="family_new_name_<dtml-var family_item>" />(#<dtml-var expr="getFamilyId(family_item)">)<br />
+        <select name="family_member_set_<dtml-var family_item>" size="10" multiple="multiple" style="width: 100%;">
+        <dtml-in expr="getFamilyNodeList(family_item) or ['(no members)']" prefix="node">
+          <option value="<dtml-var node_item>"><dtml-var node_item></option>
+        </dtml-in>
+        </select><br />
+        <button type="submit" class="form-element" name="manage_removeNodeSetFromFamily:method" value="<dtml-var family_item>">Remove Nodes</button>
+        <button type="submit" class="form-element" name="manage_addNodeSetToFamily:method" value="<dtml-var family_item>">Add Nodes</button>
+        <div style="border: 1px solid #F00;">
+        Danger zone:<br />
+        <button type="submit" class="form-element" name="manage_renameFamily:method" value="<dtml-var family_item>">Rename</button>
+        <button type="submit" class="form-element" name="manage_deleteFamily:method" value="<dtml-var family_item>">Delete</button>
+        </div>
+      </div>
+      </dtml-in>
+      <div style="clear: both"></div>
+    </td>
+  </tr>
+  <tr>
+    <td>&nbsp;</td>
+    <td>&nbsp;</td>
+  </tr>
   <tr class="list-header">
     <td align="left" valign="top" colspan=2>
       <div class="form-label">Subscribe/Unsubscribe from Timer Service</div>
diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py
index 84bed8f04a..25a48940da 100644
--- a/product/CMFActivity/tests/testCMFActivity.py
+++ b/product/CMFActivity/tests/testCMFActivity.py
@@ -2499,6 +2499,108 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
           self.assertEqual(o.getTitle(), title, (activities, expected))
         self.assertFalse(activity_tool.getMessageList())
 
+  def test_nodeFamilies(self):
+    """
+    Test node families, i.e. 'node' parameter of activate() beyond "", "same"
+    and None.
+    """
+    activity_tool = self.portal.portal_activities
+    node_id, = activity_tool.getNodeDict()
+    other = 'boo'
+    member = 'foo'
+    non_member = 'bar'
+    does_not_exist = 'baz'
+
+    # Family declaration API
+    self.assertItemsEqual(activity_tool.getFamilyNameList(), [])
+    self.assertRaises(
+        ValueError,
+        activity_tool.createFamily, 'same', # Reserved name
+    )
+    self.assertRaises(
+        TypeError,
+        activity_tool.createFamily, -5, # Not a string
+    )
+    activity_tool.createFamily(other)
+    self.assertRaises(
+        ValueError,
+        activity_tool.createFamily, other, # Exists
+    )
+    activity_tool.createFamily(member)
+    self.assertRaises(
+        ValueError,
+        activity_tool.renameFamily, other, member, # New name exists
+    )
+    self.assertRaises(
+        ValueError,
+        activity_tool.renameFamily, does_not_exist, member, # Old name does not exist
+    )
+    self.assertRaises(
+        TypeError,
+        activity_tool.renameFamily, other, -4, # New name not a string
+    )
+    activity_tool.deleteFamily(member)
+    # Silent success
+    activity_tool.deleteFamily(member)
+    activity_tool.createFamily(non_member)
+    self.assertItemsEqual(activity_tool.getFamilyNameList(), [other, non_member])
+
+    # API for node a-/di-ssociation with/from families
+    self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [])
+    activity_tool.addNodeToFamily(node_id, other)
+    self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
+    # Silent success
+    activity_tool.addNodeToFamily(node_id, other)
+    self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
+    activity_tool.addNodeToFamily(node_id, non_member)
+    self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other, non_member])
+    activity_tool.removeNodeFromFamily(node_id, non_member)
+    self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
+    # Silent success
+    activity_tool.removeNodeFromFamily(node_id, non_member)
+    self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
+    activity_tool.createFamily(does_not_exist)
+    activity_tool.addNodeToFamily(node_id, does_not_exist)
+    self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other, does_not_exist])
+    activity_tool.deleteFamily(does_not_exist)
+    self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
+    self.assertItemsEqual(activity_tool.getFamilyNameList(), [other, non_member])
+    activity_tool.renameFamily(other, member)
+    self.assertItemsEqual(activity_tool.getFamilyNameList(), [member, non_member])
+    self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [member])
+    activity_tool.createFamily(other)
+    activity_tool.addNodeToFamily(node_id, other)
+    self.assertItemsEqual(activity_tool.getFamilyNameList(), [member, non_member, other])
+    self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [member, other])
+    activity_tool.deleteFamily(other)
+
+    self.assertItemsEqual(activity_tool.getFamilyNameList(), [member, non_member])
+    self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [member])
+    o = self.getOrganisation()
+    for activity in 'SQLDict', 'SQLQueue':
+      # Sanity check.
+      self.assertEqual(self.getMessageList(activity), [])
+      self.assertRaises(
+        ValueError,
+        o.activate, activity=activity, node=does_not_exist,
+      )
+      for node, expected in (member, '1'), (non_member, '0'), ('', '1'), ('same', '1'):
+        o._setTitle('0')
+        o.activate(activity=activity, node=node)._setTitle('1')
+        self.commit()
+        self.ticOnce()
+        self.assertEqual(
+          o.getTitle(),
+          expected,
+          (activity, o.getTitle(), expected),
+        )
+        if expected == '0':
+          # The activity must still exist, waiting for a node of the
+          # appropriate family.
+          result = self.getMessageList(activity)
+          self.assertEqual(len(result), 1)
+          self.deleteMessageList(activity, result)
+
 def test_suite():
   suite = unittest.TestSuite()
   suite.addTest(unittest.makeSuite(TestCMFActivity))
-- 
2.30.9