From 46e300cb640fb821ed718969c9c42004e28f6ca5 Mon Sep 17 00:00:00 2001 From: Romain Courteaud <romain@nexedi.com> Date: Tue, 19 Feb 2008 17:11:51 +0000 Subject: [PATCH] Instead of generating security query which hardcoded column names, use new configuration parameter defined on the catalog tool. git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@19414 20353a03-c40f-0410-a6d1-a30d3c3de9de --- product/ERP5Catalog/CatalogTool.py | 59 ++++-- product/ERP5Catalog/tests/testERP5Catalog.py | 196 ++++++++++++++++++- 2 files changed, 239 insertions(+), 16 deletions(-) diff --git a/product/ERP5Catalog/CatalogTool.py b/product/ERP5Catalog/CatalogTool.py index aa91e32053..54bc9aeccf 100644 --- a/product/ERP5Catalog/CatalogTool.py +++ b/product/ERP5Catalog/CatalogTool.py @@ -197,7 +197,6 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject): manage_options = ({ 'label' : 'Overview', 'action' : 'manage_overview' }, ) + ZCatalog.manage_options - def __init__(self): ZCatalog.__init__(self, self.getId()) @@ -447,12 +446,16 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject): user_is_superuser = (user_str == SUPER_USER) allowedRolesAndUsers = self._listAllowedRolesAndUsers(user) role_column_dict = {} - column_map = self.getSQLCatalog(sql_catalog_id).getColumnMap() + local_role_column_dict = {} + catalog = self.getSQLCatalog(sql_catalog_id) + column_map = catalog.getColumnMap() # Patch for ERP5 by JP Smets in order # to implement worklists and search of local roles if kw.has_key('local_roles'): local_roles = kw['local_roles'] + local_role_dict = dict(catalog.getSQLCatalogLocalRoleKeysList()) + role_dict = dict(catalog.getSQLCatalogRoleKeysList()) # XXX user is not enough - we should also include groups of the user # Only consider local_roles if it is not empty if local_roles not in (None, '', []): # XXX: Maybe "if local_roles:" is enough. @@ -460,27 +463,42 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject): # Turn it into a list if necessary according to ';' separator if isinstance(local_roles, str): local_roles = local_roles.split(';') - local_roles = [x.lower() for x in local_roles] # Local roles now has precedence (since it comes from a WorkList) for user_or_group in allowedRolesAndUsers: for role in local_roles: # Performance optimisation - if role in column_map: + if local_role_dict.has_key(role): + # XXX This should be a list # If a given role exists as a column in the catalog, # then it is considered as single valued and indexed # through the catalog. if not user_is_superuser: - role_column_dict[role] = user_str # XXX This should be a list - # which also includes all user groups + # XXX This should be a list + # which also includes all user groups + column_id = local_role_dict[role] + local_role_column_dict[column_id] = user_str + if role_dict.has_key(role): + # XXX This should be a list + # If a given role exists as a column in the catalog, + # then it is considered as single valued and indexed + # through the catalog. + if not user_is_superuser: + # XXX This should be a list + # which also includes all user groups + column_id = role_dict[role] + role_column_dict[column_id] = user_str else: # Else, we use the standard approach new_allowedRolesAndUsers.append('%s:%s' % (user_or_group, role)) - allowedRolesAndUsers = new_allowedRolesAndUsers + if local_role_column_dict == {}: + allowedRolesAndUsers = new_allowedRolesAndUsers + else: # We only consider here the Owner role (since it was not indexed) # since some objects may only be visible by their owner # which was not indexed - if 'owner' in column_map: + for role, column_id in catalog.getSQLCatalogRoleKeysList(): + # XXX This should be a list if not user_is_superuser: try: # if called by an executable with proxy roles, we don't use @@ -488,11 +506,11 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject): eo = getSecurityManager()._context.stack[-1] proxy_roles = getattr(eo, '_proxy_roles', None) if not proxy_roles: - role_column_dict['owner'] = user_str + role_column_dict[column_id] = user_str except IndexError: - role_column_dict['owner'] = user_str + role_column_dict[column_id] = user_str - return allowedRolesAndUsers, role_column_dict + return allowedRolesAndUsers, role_column_dict, local_role_column_dict def getSecurityUidListAndRoleColumnDict(self, sql_catalog_id=None, **kw): """ @@ -503,7 +521,8 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject): site as long as security uids are considered consistent among all catalogs. """ - allowedRolesAndUsers, role_column_dict = self.getAllowedRolesAndUsers(**kw) + allowedRolesAndUsers, role_column_dict, local_role_column_dict = \ + self.getAllowedRolesAndUsers(**kw) catalog = self.getSQLCatalog(sql_catalog_id) method = getattr(catalog, catalog.sql_search_security, None) if allowedRolesAndUsers: @@ -534,7 +553,7 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject): security_uid_cache[cache_key] = security_uid_list else: security_uid_list = [] - return security_uid_list, role_column_dict + return security_uid_list, role_column_dict, local_role_column_dict security.declarePublic('getSecurityQuery') def getSecurityQuery(self, query=None, sql_catalog_id=None, **kw): @@ -544,7 +563,9 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject): catalogued with columns. """ original_query = query - security_uid_list, role_column_dict = self.getSecurityUidListAndRoleColumnDict(sql_catalog_id=sql_catalog_id, **kw) + security_uid_list, role_column_dict, local_role_column_dict = \ + self.getSecurityUidListAndRoleColumnDict( + sql_catalog_id=sql_catalog_id, **kw) if role_column_dict: query_list = [] for key, value in role_column_dict.items(): @@ -560,6 +581,16 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject): query, operator='OR') else: query = Query(security_uid=security_uid_list, operator='IN') + + if local_role_column_dict: + query_list = [] + for key, value in local_role_column_dict.items(): + new_query = Query(**{key : value}) + query_list.append(new_query) + operator_kw = {'operator': 'AND'} + local_role_query = ComplexQuery(*query_list, **operator_kw) + query = ComplexQuery(query, local_role_query, operator='AND') + if original_query is not None: query = ComplexQuery(query, original_query, operator='AND') return query diff --git a/product/ERP5Catalog/tests/testERP5Catalog.py b/product/ERP5Catalog/tests/testERP5Catalog.py index a92c1e64b2..c621aa352b 100644 --- a/product/ERP5Catalog/tests/testERP5Catalog.py +++ b/product/ERP5Catalog/tests/testERP5Catalog.py @@ -1896,7 +1896,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): self.assertEquals([], [x.getObject() for x in obj.searchFolder(portal_type='Bank Account')]) - def test_60_OwnerIndexing(self, quiet=quiet, run=run_all_test): + def test_60_ViewableOwnerIndexing(self, quiet=quiet, run=run_all_test): if not run: return @@ -1913,7 +1913,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): sub_portal_type = self.getPortal().portal_types._getOb(sub_portal_type_id) sql_connection = self.getSQLConnection() - sql = 'select owner from catalog where uid=%s' + sql = 'select viewable_owner as owner from catalog where uid=%s' login(self, 'super_owner') @@ -2292,6 +2292,198 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): result = query('SELECT roles_and_users.uid, roles_and_users.allowedRolesAndUsers FROM roles_and_users, catalog WHERE roles_and_users.uid = catalog.security_uid AND catalog.uid = %i AND roles_and_users.allowedRolesAndUsers LIKE "user:bar%%"' % (object.uid, )) self.assertEqual(len(result), 0, '%r: len(%r) != 0' % (getObjectDictKey(), result)) + def test_RealOwnerIndexing(self, quiet=quiet, run=run_all_test): + if not run: + return + + login = PortalTestCase.login + logout = self.logout + user1 = 'local_foo' + user2 = 'local_bar' + uf = self.getPortal().acl_users + uf._doAddUser(user1, user1, ['Member', ], []) + uf._doAddUser(user2, user2, ['Member', ], []) + + perm = 'View' + folder = self.getOrganisationModule() + folder.manage_setLocalRoles(user1, ['Author', 'Auditor']) + folder.manage_setLocalRoles(user2, ['Author', 'Auditor']) + portal_type = 'Organisation' + + sql_connection = self.getSQLConnection() + + login(self, user2) + obj2 = folder.newContent(portal_type=portal_type) + obj2.manage_setLocalRoles(user1, ['Auditor']) + obj2.manage_permission(perm, ['Owner', 'Auditor'], 0) + + login(self, user1) + + obj = folder.newContent(portal_type=portal_type) + obj.manage_setLocalRoles(user1, ['Owner', 'Auditor']) + + # Check that nothing is returned when user can not view the object + obj.manage_permission(perm, [], 0) + obj.reindexObject() + get_transaction().commit() + self.tic() + result = obj.portal_catalog(portal_type=portal_type) + self.assertSameSet([obj2, ], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner') + self.assertSameSet([], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor') + self.assertSameSet([obj2, ], [x.getObject() for x in result]) + + # Check that object is returned when he can view the object + obj.manage_permission(perm, ['Auditor'], 0) + obj.reindexObject() + get_transaction().commit() + self.tic() + result = obj.portal_catalog(portal_type=portal_type) + self.assertSameSet([obj2, obj], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner') + self.assertSameSet([], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor') + self.assertSameSet([obj2, obj], [x.getObject() for x in result]) + + # Check that object is returned when he can view the object + obj.manage_permission(perm, ['Owner'], 0) + obj.reindexObject() + get_transaction().commit() + self.tic() + result = obj.portal_catalog(portal_type=portal_type) + self.assertSameSet([obj2, obj], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner') + self.assertSameSet([obj], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor') + self.assertSameSet([obj2, ], [x.getObject() for x in result]) + + # Add a new table to the catalog + sql_catalog = self.portal.portal_catalog.getSQLCatalog() + + local_roles_table = "test_local_roles" + + create_local_role_table_sql = """ +CREATE TABLE `%s` ( + `uid` BIGINT UNSIGNED NOT NULL, + `owner_reference` varchar(32) NOT NULL default '', + PRIMARY KEY (`uid`), + KEY `version` (`owner_reference`) +) TYPE=InnoDB; + """ % local_roles_table + sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod( + id = 'z_create_%s' % local_roles_table, + title = '', + arguments = "", + connection_id = 'erp5_sql_connection', + template = create_local_role_table_sql) + + drop_local_role_table_sql = """ +DROP TABLE IF EXISTS %s + """ % local_roles_table + sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod( + id = 'z0_drop_%s' % local_roles_table, + title = '', + arguments = "", + connection_id = 'erp5_sql_connection', + template = drop_local_role_table_sql) + + catalog_local_role_sql = """ +REPLACE INTO + %s +VALUES +<dtml-in prefix="loop" expr="_.range(_.len(uid))"> +( + <dtml-sqlvar expr="uid[loop_item]" type="int">, + <dtml-sqlvar expr="Base_getOwnerId[loop_item]" type="string" optional> +) +<dtml-if sequence-end> +<dtml-else> +, +</dtml-if> +</dtml-in> + """ % local_roles_table + sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod( + id = 'z_catalog_%s_list' % local_roles_table, + title = '', + connection_id = 'erp5_sql_connection', + arguments = "\n".join(['uid', + 'Base_getOwnerId']), + template = catalog_local_role_sql) + + get_transaction().commit() + current_sql_catalog_object_list = sql_catalog.sql_catalog_object_list + sql_catalog.sql_catalog_object_list = \ + current_sql_catalog_object_list + \ + ('z_catalog_%s_list' % local_roles_table,) + current_sql_clear_catalog = sql_catalog.sql_clear_catalog + sql_catalog.sql_clear_catalog = \ + current_sql_clear_catalog + \ + ('z0_drop_%s' % local_roles_table, 'z_create_%s' % local_roles_table) + current_sql_catalog_local_role_keys = \ + sql_catalog.sql_catalog_local_role_keys + sql_catalog.sql_catalog_local_role_keys = ('Owner | %s.owner_reference' % \ + local_roles_table,) + current_sql_search_tables = sql_catalog.sql_search_tables + sql_catalog.sql_search_tables = sql_catalog.sql_search_tables + \ + [local_roles_table] + get_transaction().commit() + + try: + # Clear catalog + portal_catalog = self.getCatalogTool() + portal_catalog.manage_catalogClear() + get_transaction().commit() + self.portal.portal_caches.clearAllCache() + get_transaction().commit() + obj2.reindexObject() + + # Check that nothing is returned when user can not view the object + obj.manage_permission(perm, [], 0) + obj.reindexObject() + get_transaction().commit() + self.tic() + result = obj.portal_catalog(portal_type=portal_type) + self.assertSameSet([obj2, ], [x.getObject() for x in result]) + method = obj.portal_catalog + result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner') + self.assertSameSet([], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor') + self.assertSameSet([obj2, ], [x.getObject() for x in result]) + + # Check that object is returned when he can view the object + obj.manage_permission(perm, ['Auditor'], 0) + obj.reindexObject() + get_transaction().commit() + self.tic() + result = obj.portal_catalog(portal_type=portal_type) + self.assertSameSet([obj2, obj], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner') + self.assertSameSet([obj], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor') + self.assertSameSet([obj2, obj], [x.getObject() for x in result]) + + # Check that object is returned when he can view the object + obj.manage_permission(perm, ['Owner'], 0) + obj.reindexObject() + get_transaction().commit() + self.tic() + result = obj.portal_catalog(portal_type=portal_type) + self.assertSameSet([obj2, obj], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner') + self.assertSameSet([obj], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor') + self.assertSameSet([obj2, ], [x.getObject() for x in result]) + finally: + sql_catalog.sql_catalog_object_list = \ + current_sql_catalog_object_list + sql_catalog.sql_clear_catalog = \ + current_sql_clear_catalog + sql_catalog.sql_catalog_local_role_keys = \ + current_sql_catalog_local_role_keys + sql_catalog.sql_search_tables = current_sql_search_tables + get_transaction().commit() + def test_suite(): suite = unittest.TestSuite() suite.addTest(unittest.makeSuite(TestERP5Catalog)) -- 2.30.9