From 7f2b822b950168d5ff225c237b2b791a7f2da753 Mon Sep 17 00:00:00 2001 From: Vincent Pelletier <vincent@nexedi.com> Date: Fri, 2 Nov 2007 14:03:12 +0000 Subject: [PATCH] Allow getSecurityUidListAndRoleColumnDict to be None in groupWorklistListByCondition (part 1 of 2) - update prototype - add test - add methods to factorize code - add method calls in new code branch - indent existing code branch (to make changes in this branch obvious in next commit) git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@17364 20353a03-c40f-0410-a6d1-a30d3c3de9de --- product/ERP5Type/patches/WorkflowTool.py | 143 +++++++++++++++-------- 1 file changed, 92 insertions(+), 51 deletions(-) diff --git a/product/ERP5Type/patches/WorkflowTool.py b/product/ERP5Type/patches/WorkflowTool.py index 103e9e8906..a05e5dd24d 100644 --- a/product/ERP5Type/patches/WorkflowTool.py +++ b/product/ERP5Type/patches/WorkflowTool.py @@ -95,8 +95,38 @@ class ExclusionTuple(tuple): """ pass +def getValidCriterionDict(worklist_match_dict, acceptable_key_dict): + valid_criterion_dict = {} + metadata = None + for criterion_id, criterion_value in worklist_match_dict.iteritems(): + if criterion_id in acceptable_key_dict: + if isinstance(criterion_value, tuple): + criterion_value = list(criterion_value) + assert criterion_id not in valid_criterion_dict + valid_criterion_dict[criterion_id] = criterion_value + elif criterion_id == WORKLIST_METADATA_KEY: + metadata = criterion_value + elif criterion_id == SECURITY_PARAMETER_ID: + pass + else: + LOG('WorkflowTool_listActions', WARNING, 'Worklist %s of ' \ + 'workflow %s filters on variable %s which is not available ' \ + 'in catalog. Its value will not be checked.' % \ + (worklist_id, workflow_id, criterion_id)) + return valid_criterion_dict, metadata + +def updateWorklistSetDict(worklist_set_dict, workflow_worklist_key, valid_criterion_dict): + worklist_set_dict_key = valid_criterion_dict.keys() + if len(worklist_set_dict_key): + worklist_set_dict_key.sort() + worklist_set_dict_key = tuple(worklist_set_dict_key) + if worklist_set_dict_key not in worklist_set_dict: + worklist_set_dict[worklist_set_dict_key] = {} + worklist_set_dict[worklist_set_dict_key]\ + [workflow_worklist_key] = valid_criterion_dict + def groupWorklistListByCondition(worklist_dict, acceptable_key_dict, - getSecurityUidListAndRoleColumnDict): + getSecurityUidListAndRoleColumnDict=None): """ Get a list of dict of WorklistVariableMatchDict grouped by compatible conditions. @@ -133,56 +163,67 @@ def groupWorklistListByCondition(worklist_dict, acceptable_key_dict, for workflow_id, worklist in worklist_dict.iteritems(): for worklist_id, worklist_match_dict in worklist.iteritems(): workflow_worklist_key = '/'.join((workflow_id, worklist_id)) - security_parameter = worklist_match_dict.get(SECURITY_PARAMETER_ID, []) - security_kw = {} - if len(security_parameter): - security_kw[SECURITY_PARAMETER_ID] = security_parameter - uid_list, role_column_dict = getSecurityUidListAndRoleColumnDict( - **security_kw) - if len(uid_list): - uid_list.sort() - role_column_dict[SECURITY_COLUMN_ID] = uid_list - # Make sure every item is a list - or a tuple - for security_column_id in role_column_dict.iterkeys(): - value = role_column_dict[security_column_id] - if not isinstance(value, (tuple, list)): - role_column_dict[security_column_id] = [value] - applied_security_criterion_dict = {} - # TODO: make security criterions be examined in the same order for all - # worklists if possible at all. - for security_column_id, security_column_value in \ - role_column_dict.iteritems(): - valid_criterion_dict = {} - valid_criterion_dict.update(applied_security_criterion_dict) - # Current security criterion must be applied to all further queries - # for this worklist negated, so the a given line cannot match multiple - # times. - applied_security_criterion_dict[security_column_id] = \ - ExclusionList(security_column_value) - valid_criterion_dict[security_column_id] = security_column_value - for criterion_id, criterion_value in worklist_match_dict.iteritems(): - if criterion_id in acceptable_key_dict: - if isinstance(criterion_value, tuple): - criterion_value = list(criterion_value) - assert criterion_id not in valid_criterion_dict - valid_criterion_dict[criterion_id] = criterion_value - elif criterion_id == WORKLIST_METADATA_KEY: - metadata_dict[workflow_worklist_key] = criterion_value - elif criterion_id == SECURITY_PARAMETER_ID: - pass - else: - LOG('WorkflowTool_listActions', WARNING, 'Worklist %s of ' \ - 'workflow %s filters on variable %s which is not available ' \ - 'in catalog. Its value will not be checked.' % \ - (worklist_id, workflow_id, criterion_id)) - worklist_set_dict_key = valid_criterion_dict.keys() - if len(worklist_set_dict_key): - worklist_set_dict_key.sort() - worklist_set_dict_key = tuple(worklist_set_dict_key) - if worklist_set_dict_key not in worklist_set_dict: - worklist_set_dict[worklist_set_dict_key] = {} - worklist_set_dict[worklist_set_dict_key]\ - [workflow_worklist_key] = valid_criterion_dict + if getSecurityUidListAndRoleColumnDict is None: + valid_criterion_dict, metadata = getValidCriterionDict( + worklist_match_dict=worklist_match_dict, + acceptable_key_dict=acceptable_key_dict) + if metadata is not None: + metadata_dict[workflow_worklist_key] = metadata + updateWorklistSetDict( + worklist_set_dict=worklist_set_dict, + workflow_worklist_key=workflow_worklist_key, + valid_criterion_dict=valid_criterion_dict) + else: + security_parameter = worklist_match_dict.get(SECURITY_PARAMETER_ID, []) + security_kw = {} + if len(security_parameter): + security_kw[SECURITY_PARAMETER_ID] = security_parameter + uid_list, role_column_dict = getSecurityUidListAndRoleColumnDict( + **security_kw) + if len(uid_list): + uid_list.sort() + role_column_dict[SECURITY_COLUMN_ID] = uid_list + # Make sure every item is a list - or a tuple + for security_column_id in role_column_dict.iterkeys(): + value = role_column_dict[security_column_id] + if not isinstance(value, (tuple, list)): + role_column_dict[security_column_id] = [value] + applied_security_criterion_dict = {} + # TODO: make security criterions be examined in the same order for all + # worklists if possible at all. + for security_column_id, security_column_value in \ + role_column_dict.iteritems(): + valid_criterion_dict = {} + valid_criterion_dict.update(applied_security_criterion_dict) + # Current security criterion must be applied to all further queries + # for this worklist negated, so the a given line cannot match multiple + # times. + applied_security_criterion_dict[security_column_id] = \ + ExclusionList(security_column_value) + valid_criterion_dict[security_column_id] = security_column_value + for criterion_id, criterion_value in worklist_match_dict.iteritems(): + if criterion_id in acceptable_key_dict: + if isinstance(criterion_value, tuple): + criterion_value = list(criterion_value) + assert criterion_id not in valid_criterion_dict + valid_criterion_dict[criterion_id] = criterion_value + elif criterion_id == WORKLIST_METADATA_KEY: + metadata_dict[workflow_worklist_key] = criterion_value + elif criterion_id == SECURITY_PARAMETER_ID: + pass + else: + LOG('WorkflowTool_listActions', WARNING, 'Worklist %s of ' \ + 'workflow %s filters on variable %s which is not available ' \ + 'in catalog. Its value will not be checked.' % \ + (worklist_id, workflow_id, criterion_id)) + worklist_set_dict_key = valid_criterion_dict.keys() + if len(worklist_set_dict_key): + worklist_set_dict_key.sort() + worklist_set_dict_key = tuple(worklist_set_dict_key) + if worklist_set_dict_key not in worklist_set_dict: + worklist_set_dict[worklist_set_dict_key] = {} + worklist_set_dict[worklist_set_dict_key]\ + [workflow_worklist_key] = valid_criterion_dict return worklist_set_dict.values(), metadata_dict def generateNestedQuery(priority_list, criterion_dict, -- 2.30.9