Commit d1023052 authored by Julien Muchembled's avatar Julien Muchembled

* Fix regression causing worklist cache to not work at all.

* Clean testWorkflow and add unit test for the use of related keys in worklist.

Because related keys are not supported when using erp5_worklist_sql, testSQLCachedWorklist will fail.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@24770 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 7d89665a
......@@ -57,7 +57,7 @@ class TestWorklist(ERP5TypeTestCase):
worklist_wrong_state_id = '%s_wrong_state' % worklist_owner_id
actbox_wrong_state = '%s_wrong_state' % actbox_owner_name
worklist_int_variable_id = 'int_value_workflist'
worklist_int_variable_id = 'int_value_worklist'
actbox_int_variable_name = 'int_value_todo'
int_catalogued_variable_id = 'int_index'
int_value = 1
......@@ -130,16 +130,9 @@ class TestWorklist(ERP5TypeTestCase):
}
return user_dict
def stepLoginAsFoo(self, sequence=None, sequence_list=None, **kw):
self.login("foo")
def stepLoginAsBar(self, sequence=None,
sequence_list=None, **kw):
self.login("bar")
def createDocument(self):
def createDocument(self, **kw):
module = self.getPortal().getDefaultModule(self.checked_portal_type)
result = module.newContent(portal_type=self.checked_portal_type)
result = module.newContent(portal_type=self.checked_portal_type, **kw)
result.setProperty(self.int_catalogued_variable_id, self.int_value)
assert result.getValidationState() == self.checked_validation_state
return result
......@@ -158,15 +151,20 @@ class TestWorklist(ERP5TypeTestCase):
# reset aq_dynamic cache
_aq_reset()
def addWorkflowCataloguedVariable(self):
workflow = self.getWorkflowTool()[self.checked_workflow]
workflow.variables.addVariable(self.int_catalogued_variable_id)
variable = getattr(workflow.variables, self.int_catalogued_variable_id)
def addWorkflowCataloguedVariable(self, workflow_id, variable_id):
variables = self.getWorkflowTool()[workflow_id].variables
variables.addVariable(variable_id)
assert variables[variable_id].for_catalog == 1
def createWorklist(self):
workflow = self.getWorkflowTool()[self.checked_workflow]
worklists = workflow.worklists
def createWorklist(self, workflow_id, worklist_id, actbox_name, **kw):
worklists = self.getWorkflowTool()[workflow_id].worklists
worklists.addWorklist(worklist_id)
worklists._getOb(worklist_id).setProperties('',
actbox_name='%s (%%(count)s)' % actbox_name, props=dict(
(k.startswith('guard_') and k or 'var_match_'+k, v)
for k, v in kw.iteritems()))
def createWorklists(self):
for worklist_id, actbox_name, role, expr, state, int_variable in [
(self.worklist_assignor_id, self.actbox_assignor_name,
'Assignor', None, self.checked_validation_state, None),
......@@ -181,21 +179,22 @@ class TestWorklist(ERP5TypeTestCase):
(self.worklist_int_variable_id, self.actbox_int_variable_name,
None, None, None, str(self.int_value)),
]:
worklists.addWorklist(worklist_id)
worklist_definition = worklists._getOb(worklist_id)
worklist_definition.setProperties('',
actbox_name='%s (%%(count)s)' % (actbox_name, ),
props={'guard_roles': role,
'var_match_portal_type': self.checked_portal_type,
'var_match_validation_state': state,
# Variable value is saved as string in worklist
'var_match_%s' % self.int_catalogued_variable_id: \
int_variable,
'guard_expr': expr})
self.createWorklist(self.checked_workflow, worklist_id, actbox_name,
guard_roles=role, guard_expr=expr,
portal_type=self.checked_portal_type,
validation_state=state,
**{self.int_catalogued_variable_id: int_variable})
def clearCache(self):
self.portal.portal_caches.clearAllCache()
def checkWorklist(self, result, name, count):
entry_list = [x for x in result if x['name'].startswith(name)]
self.assertEquals(len(entry_list), count and 1)
if count:
self.assertEquals(count,
self.getWorklistDocumentCountFromActionName(entry_list[0]['name']))
def test_01_worklist(self, quiet=0, run=run_all_test):
"""
Test the permission of the building module.
......@@ -205,13 +204,14 @@ class TestWorklist(ERP5TypeTestCase):
workflow_tool = self.portal.portal_workflow
self.logMessage("Create Manager")
self.logMessage("Create users")
self.createManagerAndLogin()
self.createUsers()
self.logMessage("Create worklist")
self.logMessage("Create worklists")
self.associatePropertySheet()
self.addWorkflowCataloguedVariable()
self.createWorklist()
self.addWorkflowCataloguedVariable(self.checked_workflow,
self.int_catalogued_variable_id)
self.createWorklists()
self.logMessage("Create document as Manager")
document = self.createDocument()
......@@ -220,156 +220,43 @@ class TestWorklist(ERP5TypeTestCase):
self.clearCache()
result = workflow_tool.listActions(object=document)
self.logout()
# Users can not see worklist as they are not Assignor
for user_id in ('manager', ):
self.login(user_id)
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist as Assignor" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_assignor_name)]
self.assertEquals(len(entry_list), 0)
self.checkWorklist(result, self.actbox_assignor_name, 0)
self.logMessage("Check %s worklist as Owner" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_owner_name)]
self.assertEquals(len(entry_list), 1)
self.assertEquals(
self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
self.logout()
self.checkWorklist(result, self.actbox_owner_name, 1)
for user_id in ('foo', 'bar'):
self.logMessage("Check %s worklist" % user_id)
self.login(user_id)
result = workflow_tool.listActions(object=document)
self.assertEquals(result, [])
self.logout()
# Define foo as Assignor
self.login('manager')
self.logMessage("Give foo Assignor role")
document.manage_addLocalRoles('foo', ['Assignor'])
self.logMessage("Give manager Assignor role")
document.manage_addLocalRoles('manager', ['Assignor'])
document.reindexObject()
get_transaction().commit()
self.tic()
self.clearCache()
self.logout()
for user_id in ('manager', ):
self.login(user_id)
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist as Assignor" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_assignor_name)]
self.assertEquals(len(entry_list), 1)
self.assertEquals(
self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
self.logMessage("Check %s worklist as Owner" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_owner_name)]
self.assertEquals(len(entry_list), 1)
self.assertEquals(
self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
self.logMessage("Check %s worklist as Owner and Assignor" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_assignor_owner_name)]
self.assertEquals(len(entry_list), 1)
self.assertEquals(
self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
self.logout()
for user_id in ('bar', ):
self.login(user_id)
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist as Assignor" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_assignor_name)]
self.assertEquals(len(entry_list), 0)
self.logMessage("Check %s worklist as Owner" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_owner_name)]
self.assertEquals(len(entry_list), 0)
self.logMessage("Check %s worklist as Owner and Assignor" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_assignor_owner_name)]
self.assertEquals(len(entry_list), 0)
self.logout()
for user_id in ('foo', ):
self.login(user_id)
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist as Assignor" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_assignor_name)]
self.assertEquals(len(entry_list), 1)
self.assertTrue(
self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
self.logMessage("Check %s worklist as Owner" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_owner_name)]
self.assertEquals(len(entry_list), 0)
self.logMessage("Check %s worklist as Owner and Assignor" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_assignor_owner_name)]
self.assertEquals(len(entry_list), 1)
self.assertEquals(
self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
self.logout()
# Define foo and bar as Assignee
self.login('manager')
self.logMessage("Give foo Assignee role")
document.manage_addLocalRoles('foo', ['Assignee'])
self.logMessage("Give bar Assignee role")
document.manage_addLocalRoles('bar', ['Assignee'])
document.reindexObject()
get_transaction().commit()
self.tic()
self.clearCache()
self.logout()
for role, user_id_list in (('Assignor', ('foo', 'manager')),
('Assignee', ('foo', 'bar'))):
self.login('manager')
for user_id in user_id_list:
self.logMessage("Give %s %s role" % (user_id, role))
document.manage_addLocalRoles(user_id, [role])
document.reindexObject()
get_transaction().commit()
self.tic()
self.clearCache()
# Users can not see worklist as they are not Assignor
for user_id in ('manager', ):
self.login(user_id)
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist as Assignor" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_assignor_name)]
self.assertEquals(len(entry_list), 1)
self.assertTrue(
self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
self.logMessage("Check %s worklist as Owner" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_owner_name)]
self.assertEquals(len(entry_list), 1)
self.assertTrue(
self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
self.logout()
for user_id in ('bar', ):
self.login(user_id)
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist as Assignor" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_assignor_name)]
self.assertEquals(len(entry_list), 0)
self.logMessage("Check %s worklist as Owner" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_owner_name)]
self.assertEquals(len(entry_list), 0)
self.logout()
for user_id in ('foo', ):
self.login(user_id)
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist as Assignor" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_assignor_name)]
self.assertEquals(len(entry_list), 1)
self.assertTrue(
self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
self.logMessage("Check %s worklist as Owner" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_owner_name)]
self.assertEquals(len(entry_list), 0)
self.logout()
for user_id, assignor, owner, both in (('manager', 1, 1, 1),
('bar' , 0, 0, 0),
('foo' , 1, 0, 1)):
self.login(user_id)
result = workflow_tool.listActions(object=document)
self.logMessage(" Check %s worklist as Assignor" % user_id)
self.checkWorklist(result, self.actbox_assignor_name, assignor)
self.logMessage(" Check %s worklist as Owner" % user_id)
self.checkWorklist(result, self.actbox_owner_name, owner)
self.logMessage(" Check %s worklist as Owner and Assignor" % user_id)
self.checkWorklist(result, self.actbox_assignor_owner_name, both)
# Check if int variable are managed by the worklist
user_id = 'manager'
......@@ -377,9 +264,7 @@ class TestWorklist(ERP5TypeTestCase):
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist with int value as %s" % \
(user_id, self.int_value))
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_int_variable_name)]
self.assertEquals(len(entry_list), 1)
self.checkWorklist(result, self.actbox_int_variable_name, 1)
# Change int value on document
new_value = self.int_value + 1
......@@ -391,11 +276,7 @@ class TestWorklist(ERP5TypeTestCase):
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist with int value as %s" % \
(user_id, new_value))
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_int_variable_name)]
self.assertEquals(len(entry_list), 0)
self.logout()
self.checkWorklist(result, self.actbox_int_variable_name, 0)
#
# Check monovalued security role
......@@ -413,65 +294,27 @@ class TestWorklist(ERP5TypeTestCase):
bar_assignee_document.manage_setLocalRoles('manager', ['Assignee'])
bar_assignee_document.manage_permission('View', ['Owner', 'Assignee'], 0)
self.logout()
self.login('manager')
user_id = 'manager'
self.login(user_id)
module.manage_delLocalRoles('bar')
# User can not see worklist as user can not view the document
document.manage_setLocalRoles('manager', ['Owner', 'Assignee'])
document.manage_permission('View', [], 0)
document.reindexObject()
get_transaction().commit()
self.tic()
self.clearCache()
self.logout()
def test(*count_list):
local_role_list = 'Assignee', 'Owner'
document.manage_setLocalRoles('manager', local_role_list)
for user_id in ('manager', ):
self.login(user_id)
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist as Owner" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_owner_name)]
self.assertEquals(len(entry_list), 0)
self.logout()
# User can not see worklist as Owner can not view the document
document.manage_setLocalRoles('manager', ['Owner', 'Assignee'])
document.manage_permission('View', ['Assignee'], 0)
document.reindexObject()
get_transaction().commit()
self.tic()
self.clearCache()
self.logout()
for i, count in enumerate(count_list):
document.manage_permission('View', local_role_list[:i], 0)
document.reindexObject()
get_transaction().commit()
self.tic()
self.clearCache()
for user_id in ('manager', ):
self.login(user_id)
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist as Owner" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_owner_name)]
self.assertEquals(len(entry_list), 0)
self.logout()
# User can see worklist as Owner can view the document
document.manage_permission('View', ['Owner', 'Assignee'], 0)
document.reindexObject()
get_transaction().commit()
self.tic()
self.clearCache()
self.logout()
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist as Owner (%s)" % (user_id, count))
self.checkWorklist(result, self.actbox_owner_name, count)
for user_id in ('manager', ):
self.login(user_id)
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist as Owner" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_owner_name)]
self.assertEquals(len(entry_list), 1)
self.assertTrue(
self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
self.logout()
test(0, 0, 1)
# Define a local role key
sql_catalog = self.portal.portal_catalog.getSQLCatalog()
......@@ -482,67 +325,56 @@ class TestWorklist(ERP5TypeTestCase):
self.portal.portal_caches.clearAllCache()
try:
# User can not see worklist as user can not view the document
document.manage_setLocalRoles('manager', ['Owner', 'Assignee'])
document.manage_permission('View', [], 0)
document.reindexObject()
get_transaction().commit()
self.tic()
self.clearCache()
self.logout()
for user_id in ('manager', ):
self.login(user_id)
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist as Owner" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_owner_name)]
self.assertEquals(len(entry_list), 0)
self.logout()
# User can see worklist as Assignee can view the document
document.manage_permission('View', ['Assignee'], 0)
document.reindexObject()
get_transaction().commit()
self.tic()
self.clearCache()
self.logout()
for user_id in ('manager', ):
self.login(user_id)
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist as Owner" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_owner_name)]
self.assertEquals(len(entry_list), 1)
self.assertTrue(
self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
self.logout()
# User can see worklist as Owner can view the document
document.manage_permission('View', ['Owner', 'Assignee'], 0)
document.reindexObject()
get_transaction().commit()
self.tic()
self.clearCache()
self.logout()
for user_id in ('manager', ):
self.login(user_id)
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist as Owner" % user_id)
entry_list = [x for x in result \
if x['name'].startswith(self.actbox_owner_name)]
self.assertEquals(len(entry_list), 1)
self.assertTrue(
self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
self.logout()
test(0, 1, 1)
finally:
sql_catalog.sql_catalog_local_role_keys = \
current_sql_catalog_local_role_keys
get_transaction().commit()
#
# Test related keys
#
self.logMessage("Test related keys")
self.addWorkflowCataloguedVariable(self.checked_workflow,
'base_category_id')
for base_category, category_list in (
('region', ('somewhere', 'elsewhere')),
('role', ('client', 'supplier'))):
newContent = self.getCategoryTool()[base_category].newContent
for category in category_list:
newContent(portal_type='Category', id=category)
self.createWorklist(self.checked_workflow, 'region_worklist', 'has_region',
portal_type=self.checked_portal_type,
base_category_id='region')
self.createWorklist(self.checked_workflow, 'role_worklist', 'has_role',
portal_type=self.checked_portal_type,
base_category_id='role')
get_transaction().commit()
self.tic()
self.clearCache()
self.logMessage(" Check no document has region/role categories defined")
result = workflow_tool.listActions(object=document)
self.checkWorklist(result, 'has_region', 0)
self.checkWorklist(result, 'has_role', 0)
self.logMessage(" Creates documents with region/role categories defined")
self.createDocument(role='client')
self.createDocument(region='somewhere')
self.createDocument(region='elsewhere')
get_transaction().commit()
self.tic()
self.clearCache()
self.logMessage(
" Check there are documents with region/role categories defined")
result = workflow_tool.listActions(object=document)
self.checkWorklist(result, 'has_region', 2)
self.checkWorklist(result, 'has_role', 1)
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestWorklist))
......
......@@ -449,11 +449,12 @@ def WorkflowTool_listActions(self, info=None, object=None, src__=False):
portal_url = getToolByName(self, 'portal_url')()
portal_catalog = getToolByName(self, 'portal_catalog')
search_result = getattr(self, "Base_getCountFromWorklistTable", None)
if search_result is None:
use_cache = search_result is not None
if use_cache:
select_expression_prefix = 'sum(`%s`) as %s' % (COUNT_COLUMN_TITLE, COUNT_COLUMN_TITLE)
else:
search_result = portal_catalog.unrestrictedSearchResults
select_expression_prefix = 'count(*) as %s' % (COUNT_COLUMN_TITLE, )
else:
select_expression_prefix = 'sum(`%s`) as %s' % (COUNT_COLUMN_TITLE, COUNT_COLUMN_TITLE)
getSecurityUidListAndRoleColumnDict = \
portal_catalog.getSecurityUidListAndRoleColumnDict
security_query_cache_dict = {}
......@@ -486,7 +487,10 @@ def WorkflowTool_listActions(self, info=None, object=None, src__=False):
select_expression = [select_expression_prefix]
for criterion_id in total_criterion_id_list:
mapped_key = acceptable_key_dict[criterion_id]
if isinstance(mapped_key, str): # related key
if use_cache: # no support for related keys
select_expression.append(criterion_id)
continue
elif isinstance(mapped_key, str): # related key
mapped_key = mapped_key.split('/')
related_table_map_dict[criterion_id] = table_alias_list = tuple(
(table_id, '%s_%s' % (criterion_id, i))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment