Commit 7f2b822b authored by Vincent Pelletier's avatar Vincent Pelletier

Allow getSecurityUidListAndRoleColumnDict to be None in groupWorklistListByCondition (part 1 of 2)

 - update prototype
 - add test
 - add methods to factorize code
 - add method calls in new code branch
 - indent existing code branch (to make changes in this branch obvious in next commit)


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@17364 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent e00cc9dd
...@@ -95,8 +95,38 @@ class ExclusionTuple(tuple): ...@@ -95,8 +95,38 @@ class ExclusionTuple(tuple):
""" """
pass pass
def getValidCriterionDict(worklist_match_dict, acceptable_key_dict):
valid_criterion_dict = {}
metadata = None
for criterion_id, criterion_value in worklist_match_dict.iteritems():
if criterion_id in acceptable_key_dict:
if isinstance(criterion_value, tuple):
criterion_value = list(criterion_value)
assert criterion_id not in valid_criterion_dict
valid_criterion_dict[criterion_id] = criterion_value
elif criterion_id == WORKLIST_METADATA_KEY:
metadata = criterion_value
elif criterion_id == SECURITY_PARAMETER_ID:
pass
else:
LOG('WorkflowTool_listActions', WARNING, 'Worklist %s of ' \
'workflow %s filters on variable %s which is not available ' \
'in catalog. Its value will not be checked.' % \
(worklist_id, workflow_id, criterion_id))
return valid_criterion_dict, metadata
def updateWorklistSetDict(worklist_set_dict, workflow_worklist_key, valid_criterion_dict):
worklist_set_dict_key = valid_criterion_dict.keys()
if len(worklist_set_dict_key):
worklist_set_dict_key.sort()
worklist_set_dict_key = tuple(worklist_set_dict_key)
if worklist_set_dict_key not in worklist_set_dict:
worklist_set_dict[worklist_set_dict_key] = {}
worklist_set_dict[worklist_set_dict_key]\
[workflow_worklist_key] = valid_criterion_dict
def groupWorklistListByCondition(worklist_dict, acceptable_key_dict, def groupWorklistListByCondition(worklist_dict, acceptable_key_dict,
getSecurityUidListAndRoleColumnDict): getSecurityUidListAndRoleColumnDict=None):
""" """
Get a list of dict of WorklistVariableMatchDict grouped by compatible Get a list of dict of WorklistVariableMatchDict grouped by compatible
conditions. conditions.
...@@ -133,56 +163,67 @@ def groupWorklistListByCondition(worklist_dict, acceptable_key_dict, ...@@ -133,56 +163,67 @@ def groupWorklistListByCondition(worklist_dict, acceptable_key_dict,
for workflow_id, worklist in worklist_dict.iteritems(): for workflow_id, worklist in worklist_dict.iteritems():
for worklist_id, worklist_match_dict in worklist.iteritems(): for worklist_id, worklist_match_dict in worklist.iteritems():
workflow_worklist_key = '/'.join((workflow_id, worklist_id)) workflow_worklist_key = '/'.join((workflow_id, worklist_id))
security_parameter = worklist_match_dict.get(SECURITY_PARAMETER_ID, []) if getSecurityUidListAndRoleColumnDict is None:
security_kw = {} valid_criterion_dict, metadata = getValidCriterionDict(
if len(security_parameter): worklist_match_dict=worklist_match_dict,
security_kw[SECURITY_PARAMETER_ID] = security_parameter acceptable_key_dict=acceptable_key_dict)
uid_list, role_column_dict = getSecurityUidListAndRoleColumnDict( if metadata is not None:
**security_kw) metadata_dict[workflow_worklist_key] = metadata
if len(uid_list): updateWorklistSetDict(
uid_list.sort() worklist_set_dict=worklist_set_dict,
role_column_dict[SECURITY_COLUMN_ID] = uid_list workflow_worklist_key=workflow_worklist_key,
# Make sure every item is a list - or a tuple valid_criterion_dict=valid_criterion_dict)
for security_column_id in role_column_dict.iterkeys(): else:
value = role_column_dict[security_column_id] security_parameter = worklist_match_dict.get(SECURITY_PARAMETER_ID, [])
if not isinstance(value, (tuple, list)): security_kw = {}
role_column_dict[security_column_id] = [value] if len(security_parameter):
applied_security_criterion_dict = {} security_kw[SECURITY_PARAMETER_ID] = security_parameter
# TODO: make security criterions be examined in the same order for all uid_list, role_column_dict = getSecurityUidListAndRoleColumnDict(
# worklists if possible at all. **security_kw)
for security_column_id, security_column_value in \ if len(uid_list):
role_column_dict.iteritems(): uid_list.sort()
valid_criterion_dict = {} role_column_dict[SECURITY_COLUMN_ID] = uid_list
valid_criterion_dict.update(applied_security_criterion_dict) # Make sure every item is a list - or a tuple
# Current security criterion must be applied to all further queries for security_column_id in role_column_dict.iterkeys():
# for this worklist negated, so the a given line cannot match multiple value = role_column_dict[security_column_id]
# times. if not isinstance(value, (tuple, list)):
applied_security_criterion_dict[security_column_id] = \ role_column_dict[security_column_id] = [value]
ExclusionList(security_column_value) applied_security_criterion_dict = {}
valid_criterion_dict[security_column_id] = security_column_value # TODO: make security criterions be examined in the same order for all
for criterion_id, criterion_value in worklist_match_dict.iteritems(): # worklists if possible at all.
if criterion_id in acceptable_key_dict: for security_column_id, security_column_value in \
if isinstance(criterion_value, tuple): role_column_dict.iteritems():
criterion_value = list(criterion_value) valid_criterion_dict = {}
assert criterion_id not in valid_criterion_dict valid_criterion_dict.update(applied_security_criterion_dict)
valid_criterion_dict[criterion_id] = criterion_value # Current security criterion must be applied to all further queries
elif criterion_id == WORKLIST_METADATA_KEY: # for this worklist negated, so the a given line cannot match multiple
metadata_dict[workflow_worklist_key] = criterion_value # times.
elif criterion_id == SECURITY_PARAMETER_ID: applied_security_criterion_dict[security_column_id] = \
pass ExclusionList(security_column_value)
else: valid_criterion_dict[security_column_id] = security_column_value
LOG('WorkflowTool_listActions', WARNING, 'Worklist %s of ' \ for criterion_id, criterion_value in worklist_match_dict.iteritems():
'workflow %s filters on variable %s which is not available ' \ if criterion_id in acceptable_key_dict:
'in catalog. Its value will not be checked.' % \ if isinstance(criterion_value, tuple):
(worklist_id, workflow_id, criterion_id)) criterion_value = list(criterion_value)
worklist_set_dict_key = valid_criterion_dict.keys() assert criterion_id not in valid_criterion_dict
if len(worklist_set_dict_key): valid_criterion_dict[criterion_id] = criterion_value
worklist_set_dict_key.sort() elif criterion_id == WORKLIST_METADATA_KEY:
worklist_set_dict_key = tuple(worklist_set_dict_key) metadata_dict[workflow_worklist_key] = criterion_value
if worklist_set_dict_key not in worklist_set_dict: elif criterion_id == SECURITY_PARAMETER_ID:
worklist_set_dict[worklist_set_dict_key] = {} pass
worklist_set_dict[worklist_set_dict_key]\ else:
[workflow_worklist_key] = valid_criterion_dict LOG('WorkflowTool_listActions', WARNING, 'Worklist %s of ' \
'workflow %s filters on variable %s which is not available ' \
'in catalog. Its value will not be checked.' % \
(worklist_id, workflow_id, criterion_id))
worklist_set_dict_key = valid_criterion_dict.keys()
if len(worklist_set_dict_key):
worklist_set_dict_key.sort()
worklist_set_dict_key = tuple(worklist_set_dict_key)
if worklist_set_dict_key not in worklist_set_dict:
worklist_set_dict[worklist_set_dict_key] = {}
worklist_set_dict[worklist_set_dict_key]\
[workflow_worklist_key] = valid_criterion_dict
return worklist_set_dict.values(), metadata_dict return worklist_set_dict.values(), metadata_dict
def generateNestedQuery(priority_list, criterion_dict, def generateNestedQuery(priority_list, criterion_dict,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment