From a3a1ef60bba1ee44f55e47ce3a6f243c2696322e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=A9rome=20Perrin?= <jerome@nexedi.com>
Date: Tue, 13 Nov 2007 09:46:58 +0000
Subject: [PATCH] Test that selections are persistent and resist to concurent
 modifications.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@17549 20353a03-c40f-0410-a6d1-a30d3c3de9de
---
 product/ERP5Form/tests/testSelectionTool.py | 126 ++++++++++++++++++++
 1 file changed, 126 insertions(+)

diff --git a/product/ERP5Form/tests/testSelectionTool.py b/product/ERP5Form/tests/testSelectionTool.py
index e9b9e5cee7..73436b0d52 100644
--- a/product/ERP5Form/tests/testSelectionTool.py
+++ b/product/ERP5Form/tests/testSelectionTool.py
@@ -27,10 +27,15 @@
 ##############################################################################
 
 import unittest
+from threading import Thread
+from thread import get_ident
+
+from Testing import ZODButil
 
 from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
 from AccessControl.SecurityManagement import newSecurityManager
 from Products.ERP5Form.Selection import Selection
+from Products.ERP5Form.SelectionTool import SelectionTool
 
 
 class TestSelectionTool(ERP5TypeTestCase):
@@ -231,7 +236,128 @@ class TestSelectionTool(ERP5TypeTestCase):
     self.assertEquals(None,
                       self.portal_selections.getSelectionIndexFor('test_selection'))
 
+
+
+class TestSelectionPersistence(unittest.TestCase):
+  """SelectionTool tests that needs a "real" FileStorage to make sure selection
+  are really persistent and supports conflict resolution.
+  """
+  def setUp(self):
+    # patch selection tool class so that we don't need a portal_membership to
+    # find the current user name
+    SelectionTool._getUserId_saved = SelectionTool._getUserId
+    SelectionTool._getUserId = lambda self: 'user'
+
+    self.db = ZODButil.makeDB()
+    self.cnx = self.db.open()
+    self.portal_selections = \
+      self.cnx.root().portal_selections = SelectionTool()
+    self.portal_selections.setSelectionFor('test_selection', Selection())
+    get_transaction().commit()
+    
+  def tearDown(self):
+    # revert the patch from setUp
+    SelectionTool._getUserId = SelectionTool._getUserId_saved
+    self.cnx.close()
+    ZODButil.cleanDB()
+  
+  def _runWithAnotherConnection(self, thread_func):
+    """runs `thread_func` with another ZODB connection
+
+    thread_func must be a callable accepting the connection object as only
+    argument.
+    """
+    t = Thread(target=thread_func, args=(self.db.open(),))
+    t.start()
+    t.join(60)
+    self.assertFalse(t.isAlive())
+
+  def testSelectionParamConflictResolution(self):
+    # same user edits the same selection with two different parameters
+    self.portal_selections.setSelectionParamsFor(
+                       'test_selection', dict(a="b"))
+    def thread_func(cnx):
+      try:
+        portal_selections = cnx.root().portal_selections
+        portal_selections.setSelectionParamsFor(
+                              'test_selection', dict(a="c"))
+        get_transaction().commit()
+      finally:
+        cnx.close()
+    self._runWithAnotherConnection(thread_func)
+
+    # This would raise a ConflictError without conflict resolution code
+    get_transaction().commit()
+    params = self.portal_selections.getSelectionParamsFor('test_selection')
+    self.assertTrue(params.get('a'))
+
+  def testSelectionNameConflictResolution(self):
+    # same user edits two different selections
+    self.portal_selections.setSelectionParamsFor(
+                       'test_selection2', dict(a="b"))
+    def thread_func(cnx):
+      try:
+        portal_selections = cnx.root().portal_selections
+        portal_selections.setSelectionParamsFor(
+                       'test_selection1', dict(a="b"))
+        get_transaction().commit()
+      finally:
+        cnx.close()
+    self._runWithAnotherConnection(thread_func)
+
+    # This would raise a ConflictError without conflict resolution code
+    get_transaction().commit()
+    params = self.portal_selections.getSelectionParamsFor('test_selection1')
+    self.assertEquals(params.get('a'), 'b')
+    params = self.portal_selections.getSelectionParamsFor('test_selection2')
+    self.assertEquals(params.get('a'), 'b')
+
+  def testDifferentUsernameConflictResolution(self):
+    # different users edits selections
+    SelectionTool._getUserId = lambda self: 'user-%s' % get_ident()
+    # Note that in current implementation, the first time we initialized a
+    # selection for a user the mapping user -> selections is modified, which
+    # will generate a conflict if we have two new users at the same time.
+    # This test just checks that once we have initialized a user it doesn't
+    # generate conflicts when another users also modifies it owns selection.
+    # So we make sure that selection container is initialized for this user
+    self.portal_selections.setSelectionParamsFor(
+                       'test_selection', dict(initialized="1"))
+    get_transaction().commit()
+
+    self.portal_selections.setSelectionParamsFor(
+                       'test_selection', dict(a="b"))
+    def thread_func(cnx):
+      try:
+        portal_selections = cnx.root().portal_selections
+        portal_selections.setSelectionParamsFor(
+                       'test_selection', dict(a="b"))
+        get_transaction().commit()
+      finally:
+        cnx.close()
+    self._runWithAnotherConnection(thread_func)
+
+    get_transaction().commit()
+    # this check is quite low level.
+    # we know that setUp stored one selection, and each of our 2 threads stored
+    # one selection.
+    self.assertEquals(3, len(self.portal_selections.selection_data.keys()))
+
+  def testPersistentSelections(self):
+    # test that selection parameters are persistent
+    self.portal_selections.setSelectionParamsFor(
+                 'test_selection', dict(key="saved_value"))
+    get_transaction().commit()
+    self.cnx.close()
+
+    self.cnx = self.db.open()
+    portal_selections = self.cnx.root().portal_selections
+    self.assertEquals('saved_value',
+        portal_selections.getSelectionParamsFor('test_selection').get('key'))
+
+
 def test_suite():
   suite = unittest.TestSuite()
   suite.addTest(unittest.makeSuite(TestSelectionTool))
+  suite.addTest(unittest.makeSuite(TestSelectionPersistence))
   return suite
-- 
2.30.9