Commit 789dbca9 authored by Sebastien Robin's avatar Sebastien Robin

added test for subtransaction


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@877 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 8712bfbf
...@@ -249,9 +249,9 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -249,9 +249,9 @@ class TestCMFActivity(ERP5TypeTestCase):
def TryTwoMethodsAndFlushThem(self, activity): def TryTwoMethodsAndFlushThem(self, activity):
portal = self.getPortal() portal = self.getPortal()
def DeferredSetTitle(self,value): def DeferredSetTitle(self,value):
self.setTitle(value) self.activate(activity=activity).setTitle(value)
def DeferredSetDescription(self,value): def DeferredSetDescription(self,value):
self.setDescription(value) self.activate(activity=activity).setDescription(value)
from Products.ERP5Type.Document.Organisation import Organisation from Products.ERP5Type.Document.Organisation import Organisation
Organisation.DeferredSetTitle = DeferredSetTitle Organisation.DeferredSetTitle = DeferredSetTitle
Organisation.DeferredSetDescription = DeferredSetDescription Organisation.DeferredSetDescription = DeferredSetDescription
...@@ -270,6 +270,37 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -270,6 +270,37 @@ class TestCMFActivity(ERP5TypeTestCase):
self.assertEquals(organisation.getTitle(),self.title1) self.assertEquals(organisation.getTitle(),self.title1)
self.assertEquals(organisation.getDescription(),self.title1) self.assertEquals(organisation.getDescription(),self.title1)
def TryActivateFlushActivateTic(self, activity,second=None,commit_sub=0):
portal = self.getPortal()
def DeferredSetTitle(self,value,commit_sub=0):
if commit_sub:
get_transaction().commit(1)
self.activate(activity=second or activity,priority=4).setTitle(value)
def DeferredSetDescription(self,value,commit_sub=0):
if commit_sub:
get_transaction().commit(1)
self.activate(activity=second or activity,priority=4).setDescription(value)
from Products.ERP5Type.Document.Organisation import Organisation
Organisation.DeferredSetTitle = DeferredSetTitle
Organisation.DeferredSetDescription = DeferredSetDescription
organisation = portal.organisation._getOb(self.company_id)
organisation.setTitle(None)
organisation.setDescription(None)
organisation.activate(activity=activity).DeferredSetTitle(self.title1,commit_sub=commit_sub)
organisation.flushActivity(invoke=1)
organisation.activate(activity=activity).DeferredSetDescription(self.title1,commit_sub=commit_sub)
get_transaction().commit()
portal.portal_activities.distribute()
portal.portal_activities.tic()
get_transaction().commit()
portal.portal_activities.distribute()
portal.portal_activities.tic()
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
self.assertEquals(organisation.getTitle(),self.title1)
self.assertEquals(organisation.getDescription(),self.title1)
def TryMessageWithErrorOnActivity(self, activity): def TryMessageWithErrorOnActivity(self, activity):
portal = self.getPortal() portal = self.getPortal()
def crashThisActivity(self): def crashThisActivity(self):
...@@ -581,6 +612,89 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -581,6 +612,89 @@ class TestCMFActivity(ERP5TypeTestCase):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.TryTwoMethodsAndFlushThem('RAMQueue') self.TryTwoMethodsAndFlushThem('RAMQueue')
def test_33_TryActivateFlushActivateTicWithSQLDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Activate Flush Activate Tic With SQL Dict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActivateFlushActivateTic('SQLDict')
def test_34_TryActivateFlushActivateTicWithSQLQueue(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Activate Flush Activate Tic With SQL Queue '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActivateFlushActivateTic('SQLQueue')
def test_35_TryActivateFlushActivateTicWithRAMDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Activate Flush Activate Tic With RAM Dict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActivateFlushActivateTic('RAMDict')
def test_36_TryActivateFlushActivateTicWithRAMQueue(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Activate Flush Activate Tic With RAM Queue '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActivateFlushActivateTic('RAMQueue')
def test_37_TryActivateFlushActivateTicWithMultipleActivities(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Activate Flush Activate Tic With MultipleActivities '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActivateFlushActivateTic('SQLQueue',second='SQLDict')
self.TryActivateFlushActivateTic('SQLDict',second='SQLQueue')
def test_38_TryCommitSubTransactionWithSQLDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Commit Sub Transaction With SQL Dict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActivateFlushActivateTic('SQLDict',commit_sub=1)
def test_39_TryCommitSubTransactionWithSQLQueue(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Commit Sub Transaction With SQL Queue '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActivateFlushActivateTic('SQLQueue',commit_sub=1)
def test_40_TryCommitSubTransactionWithRAMDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Commit Sub Transaction With RAM Dict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActivateFlushActivateTic('RAMDict',commit_sub=1)
def test_41_TryCommitSubTransactionWithRAMQueue(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Commit Sub Transaction With RAM Queue '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActivateFlushActivateTic('RAMQueue',commit_sub=1)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment