Commit a490a99a authored by Łukasz Nowak's avatar Łukasz Nowak

Drop VifibMixin

parent 356b842a
...@@ -29,9 +29,14 @@ ...@@ -29,9 +29,14 @@
import random import random
import transaction import transaction
import unittest import unittest
import Products.Vifib.tests.VifibMixin from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
import functools import functools
from Products.ERP5Type.tests.utils import DummyMailHost from Products.ERP5Type.tests.utils import DummyMailHost
from Products.ERP5Type.Utils import convertToUpperCase
import os
from AccessControl.SecurityManagement import getSecurityManager, \
setSecurityManager
def withAbort(func): def withAbort(func):
@functools.wraps(func) @functools.wraps(func)
...@@ -42,7 +47,67 @@ def withAbort(func): ...@@ -42,7 +47,67 @@ def withAbort(func):
transaction.abort() transaction.abort()
return wrapped return wrapped
class testSlapOSMixin(Products.Vifib.tests.VifibMixin.testVifibMixin): class testSlapOSMixin(ERP5TypeTestCase):
def clearCache(self):
self.portal.portal_caches.clearAllCache()
self.portal.portal_workflow.refreshWorklistCache()
def getDefaultSitePreferenceId(self):
"""Default id, usefull method to override
"""
return "slapos_default_system_preference"
def setUpMemcached(self):
from Products.ERP5Type.tests.ERP5TypeTestCase import\
_getVolatileMemcachedServerDict, _getPersistentMemcachedServerDict
memcached_tool = self.getPortal().portal_memcached
# setup default volatile distributed memcached
connection_dict = _getVolatileMemcachedServerDict()
url_string = '%(hostname)s:%(port)s' % connection_dict
memcached_tool.default_memcached_plugin.setUrlString(url_string)
# setup default persistent distributed memcached
connection_dict = _getPersistentMemcachedServerDict()
url_string = '%(hostname)s:%(port)s' % connection_dict
memcached_tool.persistent_memcached_plugin.setUrlString(url_string)
def createAlarmStep(self):
def makeCallAlarm(alarm):
def callAlarm(*args, **kwargs):
sm = getSecurityManager()
self.login()
try:
alarm.activeSense(params=kwargs)
transaction.commit()
finally:
setSecurityManager(sm)
return callAlarm
for alarm in self.portal.portal_alarms.contentValues():
if alarm.isEnabled():
setattr(self, 'stepCall' + convertToUpperCase(alarm.getId()) \
+ 'Alarm', makeCallAlarm(alarm))
def setupPortalCertificateAuthority(self):
"""Sets up portal_certificate_authority"""
if not self.portal.hasObject('portal_certificate_authority'):
self.portal.manage_addProduct['ERP5'].manage_addTool(
'ERP5 Certificate Authority Tool', None)
self.portal.portal_certificate_authority.certificate_authority_path = \
os.environ['TEST_CA_PATH']
transaction.commit()
# reset test CA to have it always count from 0
open(os.path.join(os.environ['TEST_CA_PATH'], 'serial'), 'w').write('01')
open(os.path.join(os.environ['TEST_CA_PATH'], 'crlnumber'), 'w').write(
'01')
open(os.path.join(os.environ['TEST_CA_PATH'], 'index.txt'), 'w').write('')
def setupPortalAlarms(self):
if not self.portal.portal_alarms.isSubscribed():
self.portal.portal_alarms.subscribe()
self.assertTrue(self.portal.portal_alarms.isSubscribed())
def isLiveTest(self):
return 'ERP5TypeLiveTestCase' in [q.__name__ for q in self.__class__.mro()]
def _setUpDummyMailHost(self): def _setUpDummyMailHost(self):
"""Do not play with NON persistent replacement of MailHost""" """Do not play with NON persistent replacement of MailHost"""
...@@ -59,6 +124,12 @@ class testSlapOSMixin(Products.Vifib.tests.VifibMixin.testVifibMixin): ...@@ -59,6 +124,12 @@ class testSlapOSMixin(Products.Vifib.tests.VifibMixin.testVifibMixin):
self.deSetUpPersistentDummyMailHost() self.deSetUpPersistentDummyMailHost()
return return
def getUserFolder(self):
"""
Return the user folder
"""
return getattr(self.getPortal(), 'acl_users', None)
def afterSetUp(self): def afterSetUp(self):
self.login() self.login()
self.createAlarmStep() self.createAlarmStep()
......
This diff is collapsed.
...@@ -109,6 +109,128 @@ class TestVifibSlapWebServiceMixin(testVifibMixin): ...@@ -109,6 +109,128 @@ class TestVifibSlapWebServiceMixin(testVifibMixin):
assertUserCanAccessDocument =\ assertUserCanAccessDocument =\
AssertPermissionMethod(Permissions.AccessContentsInformation) AssertPermissionMethod(Permissions.AccessContentsInformation)
def stepTic(self, **kw):
def activateAlarm():
sm = getSecurityManager()
self.login()
try:
for alarm in self.portal.portal_alarms.contentValues():
if alarm.isEnabled() and (alarm.getId() not in \
('vifib_check_consistency',)):
alarm.activeSense()
finally:
setSecurityManager(sm)
if kw.get('sequence', None) is None:
# in case of using not in sequence commit transaction
transaction.commit()
# trigger activateAlarm before tic
activateAlarm()
transaction.commit()
self.tic()
# retrigger activateAlarm after tic
activateAlarm()
transaction.commit()
# tic after activateAlarm
self.tic()
try:
self.checkDivergency()
except AssertionError:
# try last time to solve deliveries
sm = getSecurityManager()
self.login()
try:
self.portal.portal_alarms.vifib_update_delivery_causality_state\
.activeSense()
transaction.commit()
self.tic()
self.portal.portal_alarms.vifib_solve_automatically.activeSense()
transaction.commit()
self.tic()
finally:
setSecurityManager(sm)
self.checkDivergency()
def stepCleanTic(self, **kw):
self.tic()
def stepLogout(self, **kw):
self.logout()
# access related steps
def stepLoginDefaultUser(self, **kw):
self.login('default_user')
def stepLoginTestHrAdmin(self, **kw):
self.login('test_hr_admin')
def stepLoginTestUpdatedVifibUser(self, **kw):
self.login('test_updated_vifib_user')
def stepLoginTestVifibAdmin(self, **kw):
self.login('test_vifib_admin')
def stepLoginTestVifibCustomer(self, **kw):
self.login('test_vifib_customer')
def stepLoginTestVifibCustomerA(self, **kw):
self.login('test_vifib_customer_a')
def stepLoginTestVifibDeveloper(self, **kw):
self.login('test_vifib_developer')
def stepLoginTestVifibMember(self, **kw):
self.login('test_vifib_member')
def stepLoginTestVifibUserAdmin(self, **kw):
self.login('test_vifib_user_admin')
def stepLoginTestVifibUserDeveloper(self, **kw):
self.login('test_vifib_user_developer')
def stepLoginERP5TypeTestCase(self, **kw):
self.login('ERP5TypeTestCase')
def clearCache(self):
self.portal.portal_caches.clearAllCache()
self.portal.portal_workflow.refreshWorklistCache()
def checkDivergency(self):
# there shall be no divergency
current_skin = self.app.REQUEST.get('portal_skin', 'View')
try:
# Note: Worklists are cached, so in order to have next correct result
# clear cache
self.clearCache()
self.changeSkin('RSS')
diverged_document_list = self.portal.portal_catalog(
portal_type=self.portal.getPortalDeliveryTypeList(),
causality_state='!= solved'
)
self.assertFalse('to Solve' in self.portal.ERP5Site_viewWorklist(),
'There are unsolved deliveries: %s' % ','.join([
' '.join((q.getTitle(), q.getPath(), q.getCausalityState())) \
for q in diverged_document_list]))
finally:
self.changeSkin(current_skin)
def stepCheckSiteConsistency(self, **kw):
self.portal.portal_alarms.vifib_check_consistency.activeSense()
transaction.commit()
self.tic()
self.assertEqual([], self.portal.portal_alarms.vifib_check_consistency\
.Alarm_getConsistencyCheckReportLineList())
self.assertFalse(self.portal.portal_alarms.vifib_check_consistency.sense())
self.checkDivergency()
def markManualCreation(self, document):
self.portal.portal_workflow.doActionFor(document, 'edit_action',
comment='Manually created by test.')
def fakeSlapAuth(self): def fakeSlapAuth(self):
fakeSlapAuth() fakeSlapAuth()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment