From dba0b9ac64749b13997976d6a3f2a810f898f37a Mon Sep 17 00:00:00 2001 From: Kazuhiko Shiozaki <kazuhiko@nexedi.com> Date: Mon, 25 Feb 2008 19:32:32 +0000 Subject: [PATCH] support setUpOnce() method that is only called once par setting up ERP5 site. its result will be saved with --save. git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@19499 20353a03-c40f-0410-a6d1-a30d3c3de9de --- product/ERP5Type/tests/ERP5TypeTestCase.py | 435 +++++++++++---------- 1 file changed, 224 insertions(+), 211 deletions(-) diff --git a/product/ERP5Type/tests/ERP5TypeTestCase.py b/product/ERP5Type/tests/ERP5TypeTestCase.py index d6766c9509..deef15ed06 100644 --- a/product/ERP5Type/tests/ERP5TypeTestCase.py +++ b/product/ERP5Type/tests/ERP5TypeTestCase.py @@ -351,14 +351,12 @@ class ERP5TypeTestCase(PortalTestCase): light_install = self.enableLightInstall() create_activities = self.enableActivityTool() hot_reindexing = self.enableHotReindexing() - setupERP5Site(business_template_list=new_template_list, - light_install=light_install, - portal_name=self.getPortalName(), - title=self.getTitle(), - create_activities=create_activities, - quiet=install_bt5_quiet, - hot_reindexing=hot_reindexing, - erp5_catalog_storage=erp5_catalog_storage) + self.setUpERP5Site(business_template_list=new_template_list, + light_install=light_install, + create_activities=create_activities, + quiet=install_bt5_quiet, + hot_reindexing=hot_reindexing, + erp5_catalog_storage=erp5_catalog_storage) PortalTestCase.setUp(self) global current_app current_app = self.app @@ -557,224 +555,239 @@ class ERP5TypeTestCase(PortalTestCase): self.assertEqual(object.getSimulationState(), reference_workflow_state) return workflow_error_message -def setupERP5Site( business_template_list=(), - app=None, - portal_name=portal_name, - title='', - quiet=0, - light_install=1, - create_activities=1, - hot_reindexing=1, - erp5_catalog_storage='erp5_mysql_innodb_catalog'): - ''' - Creates an ERP5 site. - business_template_list must be specified correctly - (e.g. '("erp5_base", )'). - ''' - from Products.ERP5Type.Base import _aq_reset - if portal_name in failed_portal_installation: - raise RuntimeError, 'Installation of %s already failed, giving up'\ - % portal_name - try: - if app is None: - app = ZopeTestCase.app() - # this app will be closed after setUp, but keep an reference anyway, to - # make it's REQUEST available during setup - global current_app - current_app = app - - global setup_done - if not (hasattr(aq_base(app), portal_name) and - setup_done.get(tuple(business_template_list))): - setup_done[tuple(business_template_list)] = 1 - try: - _start = time.time() - # Add user and log in - if not quiet: - ZopeTestCase._print('\nAdding ERP5TypeTestCase user ... \n') - uf = app.acl_users - uf._doAddUser('ERP5TypeTestCase', '', ['Manager', 'Member', 'Assignee', - 'Assignor', 'Author', 'Auditor', 'Associate'], []) - user = uf.getUserById('ERP5TypeTestCase').__of__(uf) - newSecurityManager(None, user) - # Add ERP5 Site - reindex = 1 - if hot_reindexing: - setattr(app, 'isIndexable', 0) - reindex = 0 - - portal = getattr(app, portal_name, None) - if portal is None: + def setUpERP5Site(self, + business_template_list=(), + app=None, + quiet=0, + light_install=1, + create_activities=1, + hot_reindexing=1, + erp5_catalog_storage='erp5_mysql_innodb_catalog'): + ''' + Creates an ERP5 site. + business_template_list must be specified correctly + (e.g. '("erp5_base", )'). + ''' + portal_name = self.getPortalName() + title = self.getTitle() + from Products.ERP5Type.Base import _aq_reset + if portal_name in failed_portal_installation: + raise RuntimeError, 'Installation of %s already failed, giving up'\ + % portal_name + try: + if app is None: + app = ZopeTestCase.app() + # this app will be closed after setUp, but keep an reference anyway, to + # make it's REQUEST available during setup + global current_app + current_app = app + + global setup_done + if not (hasattr(aq_base(app), portal_name) and + setup_done.get(tuple(business_template_list))): + setup_done[tuple(business_template_list)] = 1 + try: + _start = time.time() + # Add user and log in if not quiet: - ZopeTestCase._print('Adding %s ERP5 Site ... ' % portal_name) - - extra_constructor_kw = _getConnectionStringDict() - email_from_address = os.environ.get('email_from_address') - if email_from_address is not None: - extra_constructor_kw['email_from_address'] = email_from_address - - factory = app.manage_addProduct['ERP5'] - factory.manage_addERP5Site(portal_name, - erp5_catalog_storage=erp5_catalog_storage, - light_install=light_install, - reindex=reindex, - create_activities=create_activities, - **extra_constructor_kw ) - portal = app[portal_name] + ZopeTestCase._print('\nAdding ERP5TypeTestCase user ... \n') + uf = app.acl_users + uf._doAddUser('ERP5TypeTestCase', '', ['Manager', 'Member', 'Assignee', + 'Assignor', 'Author', 'Auditor', 'Associate'], []) + user = uf.getUserById('ERP5TypeTestCase').__of__(uf) + newSecurityManager(None, user) + # Add ERP5 Site + reindex = 1 + if hot_reindexing: + setattr(app, 'isIndexable', 0) + reindex = 0 + + portal = getattr(app, portal_name, None) + if portal is None: + if not quiet: + ZopeTestCase._print('Adding %s ERP5 Site ... ' % portal_name) + + extra_constructor_kw = _getConnectionStringDict() + email_from_address = os.environ.get('email_from_address') + if email_from_address is not None: + extra_constructor_kw['email_from_address'] = email_from_address + + factory = app.manage_addProduct['ERP5'] + factory.manage_addERP5Site(portal_name, + erp5_catalog_storage=erp5_catalog_storage, + light_install=light_install, + reindex=reindex, + create_activities=create_activities, + **extra_constructor_kw ) + portal = app[portal_name] - if not quiet: - ZopeTestCase._print('done (%.3fs)\n' % (time.time() - _start)) - # Release locks - get_transaction().commit() + if not quiet: + ZopeTestCase._print('done (%.3fs)\n' % (time.time() - _start)) + # Release locks + get_transaction().commit() - if os.environ.get('erp5_load_data_fs'): - # Import local PropertySheets, Documents - for id_ in getLocalPropertySheetList(): - importLocalPropertySheet(id_) - for id_ in getLocalDocumentList(): - importLocalDocument(id_) - for id_ in getLocalConstraintList(): - importLocalConstraint(id_) - else: - # Remove all local PropertySheets, Documents - for id_ in getLocalPropertySheetList(): - removeLocalPropertySheet(id_) - for id_ in getLocalDocumentList(): - removeLocalDocument(id_) - for id_ in getLocalConstraintList(): - removeLocalConstraint(id_) - - update_business_templates = os.environ.get('update_business_templates') is not None - BusinessTemplate_getModifiedObject = aq_base(getattr(portal, 'BusinessTemplate_getModifiedObject', None)) - - # Disable reindexing before adding templates - # VERY IMPORTANT: Add some business templates - for url, bt_title in business_template_list: - start = time.time() - get_install_kw = False - if bt_title in [x.getTitle() for x in portal.portal_templates.getInstalledBusinessTemplateList()]: - if update_business_templates: - if not quiet: - ZopeTestCase._print('Updating %s business template ... ' % bt_title) - if BusinessTemplate_getModifiedObject is not None: - get_install_kw = True - else: - continue + if os.environ.get('erp5_load_data_fs'): + # Import local PropertySheets, Documents + for id_ in getLocalPropertySheetList(): + importLocalPropertySheet(id_) + for id_ in getLocalDocumentList(): + importLocalDocument(id_) + for id_ in getLocalConstraintList(): + importLocalConstraint(id_) else: + # Remove all local PropertySheets, Documents + for id_ in getLocalPropertySheetList(): + removeLocalPropertySheet(id_) + for id_ in getLocalDocumentList(): + removeLocalDocument(id_) + for id_ in getLocalConstraintList(): + removeLocalConstraint(id_) + + update_business_templates = os.environ.get('update_business_templates') is not None + BusinessTemplate_getModifiedObject = aq_base(getattr(portal, 'BusinessTemplate_getModifiedObject', None)) + + # Disable reindexing before adding templates + # VERY IMPORTANT: Add some business templates + for url, bt_title in business_template_list: + start = time.time() + get_install_kw = False + if bt_title in [x.getTitle() for x in portal.portal_templates.getInstalledBusinessTemplateList()]: + if update_business_templates: + if not quiet: + ZopeTestCase._print('Updating %s business template ... ' % bt_title) + if BusinessTemplate_getModifiedObject is not None: + get_install_kw = True + else: + continue + else: + if not quiet: + ZopeTestCase._print('Adding %s business template ... ' % bt_title) + bt = portal.portal_templates.download(url) if not quiet: - ZopeTestCase._print('Adding %s business template ... ' % bt_title) - bt = portal.portal_templates.download(url) - if not quiet: - ZopeTestCase._print('(imported in %.3fs) ' % (time.time() - start)) - install_kw = None - if get_install_kw: - listbox_object_list = BusinessTemplate_getModifiedObject.__of__(bt)() - for listbox_line in listbox_object_list: - install_kw[listbox_line.object_id] = listbox_line.choice_item_list[0][1] - bt.install(light_install=light_install, object_to_update=install_kw) - # Release locks + ZopeTestCase._print('(imported in %.3fs) ' % (time.time() - start)) + install_kw = None + if get_install_kw: + listbox_object_list = BusinessTemplate_getModifiedObject.__of__(bt)() + for listbox_line in listbox_object_list: + install_kw[listbox_line.object_id] = listbox_line.choice_item_list[0][1] + bt.install(light_install=light_install, object_to_update=install_kw) + # Release locks + get_transaction().commit() + if not quiet: + ZopeTestCase._print('done (%.3fs)\n' % (time.time() - start)) + + setup_once = getattr(self, 'setUpOnce', None) + if setup_once is not None and \ + not getattr(portal, 'set_up_once_called', 0): + portal.set_up_once_called = 1 + if not quiet: + ZopeTestCase._print('Executing setUpOnce ... ') + start = time.time() + # setUpOnce method may use self.app and self.portal + self.app = app + self.portal = portal + setup_once() + if not quiet: + ZopeTestCase._print('done (%.3fs)\n' % (time.time() - start)) + + # Enable reindexing + # Do hot reindexing # Does not work + if hot_reindexing: + setattr(app,'isIndexable', 1) + portal.portal_catalog.manage_hotReindexAll() + get_transaction().commit() - if not quiet: - ZopeTestCase._print('done (%.3fs)\n' % (time.time() - start)) - # Enable reindexing - # Do hot reindexing # Does not work - if hot_reindexing: - setattr(app,'isIndexable', 1) - portal.portal_catalog.manage_hotReindexAll() + portal_activities = getattr(portal, 'portal_activities', None) + if portal_activities is not None: + if not quiet: + ZopeTestCase._print('Executing pending activities ... ') + start = time.time() + count = 1000 + message_count = len(portal_activities.getMessageList()) + while message_count > 0: + portal_activities.distribute() + portal_activities.tic() + get_transaction().commit() + new_message_count = len(portal_activities.getMessageList()) + if new_message_count != message_count: + if not quiet: + ZopeTestCase._print('%i ' % (message_count, )) + message_count = new_message_count + count -= 1 + if count == 0: + raise RuntimeError, \ + 'tic is looping forever. These messages are pending: %r' % ( + [('/'.join(m.object_path), m.method_id, + m.processing_node, m.priority) + for m in portal_activities.getMessageList()],) + if not quiet: + ZopeTestCase._print('done (%.3fs)\n' % (time.time() - start)) - get_transaction().commit() + # Reset aq dynamic, so all unit tests will start again + _aq_reset() - portal_activities = getattr(portal, 'portal_activities', None) - if portal_activities is not None: - if not quiet: - ZopeTestCase._print('Executing pending activities ... ') - start = time.time() - count = 1000 - message_count = len(portal_activities.getMessageList()) - while message_count > 0: - portal_activities.distribute() - portal_activities.tic() + if os.environ.get('erp5_save_data_fs'): + # Quit the test in order to get a clean site + if not quiet: + ZopeTestCase._print('done (%.3fs)\n' % (time.time()-_start,)) get_transaction().commit() - new_message_count = len(portal_activities.getMessageList()) - if new_message_count != message_count: - if not quiet: - ZopeTestCase._print('%i ' % (message_count, )) - message_count = new_message_count - count -= 1 - if count == 0: - raise RuntimeError, \ - 'tic is looping forever. These messages are pending: %r' % ( - [('/'.join(m.object_path), m.method_id, - m.processing_node, m.priority) - for m in portal_activities.getMessageList()],) - if not quiet: - ZopeTestCase._print('done (%.3fs)\n' % (time.time() - start)) - - # Reset aq dynamic, so all unit tests will start again - _aq_reset() + ZopeTestCase.close(app) + if not quiet: + ZopeTestCase._print('Data.fs created\n') + get_transaction().commit() + ZopeTestCase.close(app) + instance_home = os.environ['INSTANCE_HOME'] + command = 'mysqldump %s > %s/dump.sql' \ + % (getMySQLArguments(), instance_home) + if not quiet: + ZopeTestCase._print('Dumping MySQL database with %s... ' \ + % command) + os.system(command) + if not quiet: + ZopeTestCase._print('done\n') + if not quiet: + ZopeTestCase._print('Dumping static files... ') + for dir in ('Constraint', 'Document', 'PropertySheet'): + os.system('rm -rf %s/%s.bak' % (instance_home, dir)) + os.system('cp -ar %s/%s %s/%s.bak' % (instance_home, dir, instance_home, dir)) + if not quiet: + ZopeTestCase._print('done\n') - if os.environ.get('erp5_save_data_fs'): - # Quit the test in order to get a clean site + # Log out if not quiet: - ZopeTestCase._print('done (%.3fs)\n' % (time.time()-_start,)) - get_transaction().commit() - ZopeTestCase.close(app) + ZopeTestCase._print('Logout ... \n') + noSecurityManager() if not quiet: - ZopeTestCase._print('Data.fs created\n') + ZopeTestCase._print('done (%.3fs)\n' % (time.time()-_start,)) + ZopeTestCase._print('Ran Unit test of %s\n' % title) + except: + get_transaction().abort() + raise + else: get_transaction().commit() ZopeTestCase.close(app) - instance_home = os.environ['INSTANCE_HOME'] - command = 'mysqldump %s > %s/dump.sql' \ - % (getMySQLArguments(), instance_home) - if not quiet: - ZopeTestCase._print('Dumping MySQL database with %s... ' \ - % command) - os.system(command) - if not quiet: - ZopeTestCase._print('done\n') - if not quiet: - ZopeTestCase._print('Dumping static files... ') - for dir in ('Constraint', 'Document', 'PropertySheet'): - os.system('rm -rf %s/%s.bak' % (instance_home, dir)) - os.system('cp -ar %s/%s %s/%s.bak' % (instance_home, dir, instance_home, dir)) - if not quiet: - ZopeTestCase._print('done\n') - - # Log out - if not quiet: - ZopeTestCase._print('Logout ... \n') - noSecurityManager() - if not quiet: - ZopeTestCase._print('done (%.3fs)\n' % (time.time()-_start,)) - ZopeTestCase._print('Ran Unit test of %s\n' % title) - except: - get_transaction().abort() - raise - else: - get_transaction().commit() - ZopeTestCase.close(app) - - if os.environ.get('erp5_load_data_fs'): - # Import local PropertySheets, Documents - # when loading an environnement - for id_ in getLocalPropertySheetList(): - importLocalPropertySheet(id_) - for id_ in getLocalDocumentList(): - importLocalDocument(id_) - for id_ in getLocalConstraintList(): - importLocalConstraint(id_) - _aq_reset() - - except: - f = StringIO() - traceback.print_exc(file=f) - ZopeTestCase._print(f.getvalue()) - f.close() - failed_portal_installation[portal_name] = 1 - ZopeTestCase._print('Ran Unit test of %s (installation failed)\n' - % title) # run_unit_test depends on this string. - raise + + if os.environ.get('erp5_load_data_fs'): + # Import local PropertySheets, Documents + # when loading an environnement + for id_ in getLocalPropertySheetList(): + importLocalPropertySheet(id_) + for id_ in getLocalDocumentList(): + importLocalDocument(id_) + for id_ in getLocalConstraintList(): + importLocalConstraint(id_) + _aq_reset() + + except: + f = StringIO() + traceback.print_exc(file=f) + ZopeTestCase._print(f.getvalue()) + f.close() + failed_portal_installation[portal_name] = 1 + ZopeTestCase._print('Ran Unit test of %s (installation failed)\n' + % title) # run_unit_test depends on this string. + raise from unittest import _makeLoader, TestSuite -- 2.30.9