Commit 6f90a906 authored by Nicolas Wavrant's avatar Nicolas Wavrant

tests: add StoredSequence to restore a test state

parent a72c1dd8
...@@ -217,6 +217,10 @@ DateTime._parse_args = _parse_args ...@@ -217,6 +217,10 @@ DateTime._parse_args = _parse_args
class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase): class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase):
"""Mixin class for ERP5 based tests. """Mixin class for ERP5 based tests.
""" """
def __init__(self, *args, **kw):
super(ERP5TypeTestCaseMixin, self).__init__(*args, **kw)
self.sequence_string_registry = {}
def dummy_test(self): def dummy_test(self):
ZopeTestCase._print('All tests are skipped when --save option is passed ' ZopeTestCase._print('All tests are skipped when --save option is passed '
'with --update_business_templates or without --load') 'with --update_business_templates or without --load')
...@@ -864,6 +868,53 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase): ...@@ -864,6 +868,53 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase):
setup_once() setup_once()
ZopeTestCase._print('done (%.3fs)\n' % (time.time() - start)) ZopeTestCase._print('done (%.3fs)\n' % (time.time() - start))
def _getCleanupDict(self):
"""
You must override this. Return the documents that should be
stored while saving/restoring a StoredSequence as a dict,
the keys being the module containing them, and the values
the list of ids of documents
"""
return {}
def registerSequenceString(self, sequence_title, sequence_string):
self.sequence_string_registry[sequence_title] = sequence_string
def getSequenceString(self, sequence_title):
return self.sequence_string_registry[sequence_title]
def stepStoreSequence(self, sequence):
sequence_title = "sequence_title"
document_dict = self._getCleanupDict()
if sequence_title in self.portal.portal_trash:
self.portal.portal_trash.manage_delObjects(ids=[sequence_title])
trashbin_value = self.portal.portal_trash.newContent(
portal_type="Trash Bin",
id=sequence_title,
title=sequence_title,
serialized_sequence=sequence.serializeSequenceDict(),
document_dict=document_dict,
)
for module_id, object_id_list in document_dict.iteritems():
for object_id in object_id_list:
self.portal.portal_trash.backupObject(
trashbin_value, [module_id], object_id, save=True, keep_subobjects=True
)
def stepRestoreSequence(self, sequence):
sequence_title = "sequence_title"
trashbin_value = self.portal.portal_trash[sequence_title]
document_dict = trashbin_value.getProperty('document_dict')
for module_id, object_id_list in document_dict.iteritems():
for object_id in object_id_list:
self.portal.portal_trash.restoreObject(
trashbin_value, [module_id], object_id, pass_if_exist=True
)
sequence.deserializeSequenceDict(
trashbin_value.getProperty("serialized_sequence"),
)
self.tic()
class ERP5TypeCommandLineTestCase(ERP5TypeTestCaseMixin): class ERP5TypeCommandLineTestCase(ERP5TypeTestCaseMixin):
__original_ZMySQLDA_connect = None __original_ZMySQLDA_connect = None
......
...@@ -175,6 +175,109 @@ class Sequence: ...@@ -175,6 +175,109 @@ class Sequence:
step = step[4:] step = step[4:]
self.addStep(step) self.addStep(step)
class StoredSequence(Sequence):
"""A StoredSequence is a Sequence that can store an ERP5's state into
a Trash Bin and restore it before before being played. If the state is
not stored yet, then it will create it then store it.
This capability is interesting when multiple tests share a same initial
state, as the state needs to be generated only once and can be reused
for all of them.
"""
def __init__(self, context, id):
Sequence.__init__(self, context)
self._id = id
def serialiseSequenceDict(self):
def _serialise(key, value):
result_dict = {'key': key}
if (
isinstance(value, str) or
isinstance(value, int) or
isinstance(value, float) or
isinstance(value, dict) or
value is None
):
result_dict['type'] = "raw"
result_dict['value'] = value
elif isinstance(value, list):
result_dict['type'] = "list"
result_dict['value'] = [_serialise(key, x) for x in value]
else:
result_dict['type'] = "erp5_object"
result_dict['value'] = value.getRelativeUrl()
return result_dict
result_list = []
for key, value in self._dict.iteritems():
result_list.append(_serialise(key, value))
return result_list
def deserialiseSequenceDict(self, data):
portal = self._context.getPortalObject()
def _deserialise(serialised_dict):
if serialised_dict["type"] == "raw":
return serialised_dict["value"]
elif serialised_dict["type"] == "list":
return [_deserialise(x) for x in serialised_dict["value"]]
elif serialised_dict["type"] == "erp5_object":
return portal.restrictedTraverse(serialised_dict['value'])
else:
raise TypeError("Unknown serialised type %s", serialised_dict["type"])
for serialised_dict in data:
self._dict[serialised_dict['key']] = _deserialise(serialised_dict)
def store(self, context):
context.login()
document_dict = context._getCleanupDict()
if self._id in context.portal.portal_trash:
context.portal.portal_trash.manage_delObjects(ids=[self._id])
trashbin_value = context.portal.portal_trash.newContent(
portal_type="Trash Bin",
id=self._id,
title=self._id,
serialised_sequence=self.serialiseSequenceDict(),
document_dict=document_dict,
)
for module_id, object_id_list in document_dict.iteritems():
for object_id in object_id_list:
context.portal.portal_trash.backupObject(
trashbin_value, [module_id], object_id, save=True, keep_subobjects=True
)
context.tic()
context.logout()
def restore(self, context):
context.login()
trashbin_value = context.portal.portal_trash[self._id]
document_dict = trashbin_value.getProperty('document_dict')
for module_id, object_id_list in document_dict.iteritems():
for object_id in object_id_list:
context.portal.portal_trash.restoreObject(
trashbin_value, [module_id], object_id, pass_if_exist=True
)
self.deserialiseSequenceDict(
trashbin_value.getProperty("serialised_sequence"),
)
context.tic()
context.logout()
def play(self, context, **kw):
portal = self._context.getPortal()
if getattr(portal.portal_trash, self._id, None) is None:
ZopeTestCase._print('\nRunning and saving stored sequence \"%s\" ...' % self._id)
sequence = Sequence()
sequence.setSequenceString(context.getSequenceString(self._id))
sequence.play(context)
self._dict = sequence._dict.copy()
self.store(context)
else:
ZopeTestCase._print('\nRestoring stored sequence \"%s\" ...' % self._id)
self.restore(context)
Sequence.play(self, context, **kw)
class SequenceList: class SequenceList:
def __init__(self): def __init__(self):
......
##############################################################################
#
# Copyright (c) 2021 Nexedi SARL and Contributors. All Rights Reserved.
# Nicolas Wavrant <nicolas.wavrant@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import unittest
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.Sequence import SequenceList, StoredSequence
class TestStoredSequence(ERP5TypeTestCase):
def afterSetUp(self):
self.portal = self.getPortalObject()
def getBusinessTemplateList(self):
return ('erp5_base',)
registerSequenceString = ERP5TypeTestCase.registerSequenceString
def _getCleanupDict(self):
return {
"person_module": list(self.portal.person_module.objectIds()),
}
def stepLogin(self, sequence):
self.login()
def stepCreatePerson(self, sequence):
sequence['person'] = self.portal.person_module.newContent(
id="person",
title="Person",
)
def stepUpdatePerson1(self, sequence):
sequence['person'].setTitle(sequence['person'].getTitle() + " 1")
def stepUpdatePerson2(self, sequence):
sequence['person'].setTitle(sequence['person'].getTitle() + " 2")
def stepFillSequenceDict(self, sequence):
sequence["string"] = "a string"
sequence["int"] = 10
sequence["float"] = 3.14
sequence["erp5_document"] = self.portal.person_module.newContent(
portal_type="Person",
id="erp5_document_0",
)
sequence["list_of_int"] = [1, 2]
sequence["list_of_erp5_document"] = [
self.portal.person_module.newContent(
portal_type="Person",
id="erp5_document_%d" % i,
) for i in range(1, 3)
]
def test_storedSequenceCanRestoreAState(self):
sequence_id = "sequence_can_restore"
self.registerSequenceString(sequence_id, """
stepCreatePerson
""")
sequence = StoredSequence(self, sequence_id)
sequence.setSequenceString("stepUpdatePerson1")
sequence_list = SequenceList()
sequence_list.addSequence(sequence)
sequence_list.play(self)
self.assertEqual(self.portal.person_module.person.getTitle(), "Person 1")
trashbin_value = self.portal.portal_trash[sequence_id]
self.assertEqual(trashbin_value.person_module.person.getTitle(), "Person")
self.assertEqual(
trashbin_value.getProperty("serialised_sequence"),
({"key": "person", "type": "erp5_object", "value": "person_module/person"},)
)
self.portal.person_module.manage_delObjects(ids=["person"])
# Run new sequence, with same base sequence.
# Update the title of the person document in the trashbin to be
# sure it has been restored from trash and not created
trashbin_value.person_module.person.setTitle("Trash Person")
sequence = StoredSequence(self, sequence_id)
sequence.setSequenceString("stepUpdatePerson2")
sequence_list = SequenceList()
sequence_list.addSequence(sequence)
sequence_list.play(self)
self.assertEqual(trashbin_value.person_module.person.getTitle(), "Trash Person")
self.assertEqual(self.portal.person_module.person.getTitle(), "Trash Person 2")
def test_serialisationOfSequenceDict(self):
sequence_id = "serialisation"
self.registerSequenceString(sequence_id, "stepFillSequenceDict")
sequence = StoredSequence(self, sequence_id)
sequence.setSequenceString("stepLogin")
sequence_list = SequenceList()
sequence_list.addSequence(sequence)
sequence_list.play(self)
sequence_dict = sequence._dict
# sequence._dict will be recalculated
sequence.deserialiseSequenceDict(self.portal.portal_trash[sequence_id].serialised_sequence)
self.assertEqual(
sequence_dict,
sequence._dict,
)
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestStoredSequence))
return suite
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment