Commit a016ed04 authored by Vincent Pelletier's avatar Vincent Pelletier

CMFActivity: Optimise validation queries.

See SQLBase._getExecutableMessageSet for operation principle.
Removes the notion of order_validation_text: activity validation is no
longer evaluated per-activity , but per-dependency for multiple activities
at a time. In this context, order_validation_text does not make sense as
it flattens all dependency types for a given activity.
Rework activity-dependency-to-SQL methods: use a dict rather
dynamically-generated method names.
Based on initial work by Julien Muchembled.
parent f09e1a36
Pipeline #13170 failed with stage
...@@ -89,74 +89,9 @@ class Queue(object): ...@@ -89,74 +89,9 @@ class Queue(object):
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
raise NotImplementedError raise NotImplementedError
def getExecutableMessageList(self, activity_tool, message, message_dict,
validation_text_dict, now_date=None):
"""Get messages which have no dependent message, and store them in the dictionary.
If the passed message itself is executable, simply store only that message.
Otherwise, try to find at least one message executable from dependent messages.
This may result in no new message, if all dependent messages are already present
in the dictionary, if all dependent messages are in different activities, or if
the message has a circular dependency.
The validation text dictionary is used only to cache the results of validations,
in order to reduce the number of SQL queries.
"""
if message.uid in message_dict:
# Nothing to do. But detect a circular dependency.
if message_dict[message.uid] is None:
LOG('CMFActivity', ERROR,
'message uid %r has a circular dependency' % (message.uid,))
return
cached_result = validation_text_dict.get(message.order_validation_text)
if cached_result is None:
message_list = activity_tool.getDependentMessageList(message, self)
if message_list:
# The result is not empty, so this message is not executable.
validation_text_dict[message.order_validation_text] = 0
if now_date is None:
now_date = DateTime()
for activity, m in message_list:
# Note that the messages may contain ones which are already assigned or not
# executable yet.
if activity is self and m.processing_node == -1 and m.date <= now_date:
# Call recursively. Set None as a marker to detect a circular dependency.
message_dict[message.uid] = None
try:
self.getExecutableMessageList(activity_tool, m, message_dict,
validation_text_dict, now_date=now_date)
finally:
del message_dict[message.uid]
else:
validation_text_dict[message.order_validation_text] = 1
message_dict[message.uid] = message
elif cached_result:
message_dict[message.uid] = message
def flush(self, activity_tool, object, **kw): def flush(self, activity_tool, object, **kw):
pass pass
def getOrderValidationText(self, message):
# Return an identifier of validators related to ordering.
order_validation_item_list = [
(key, value)
for key, value in sorted(
message.activity_kw.iteritems(), key=lambda x: x[0],
)
if value is not None and
getattr(self, "_validate_" + key, None) is not None
]
if order_validation_item_list:
return sha1(repr(order_validation_item_list)).hexdigest()
# When no order validation argument is specified, skip the computation
# of the checksum for speed. Here, 'none' is used, because this never be
# identical to SHA1 hexdigest (which is always 40 characters), and 'none'
# is true in Python. This is important, because dtml-if assumes that an empty
# string is false, so we must use a non-empty string for this.
return 'none'
def getMessageList(self, activity_tool, processing_node=None,**kw): def getMessageList(self, activity_tool, processing_node=None,**kw):
return [] return []
......
...@@ -26,6 +26,8 @@ ...@@ -26,6 +26,8 @@
# #
############################################################################## ##############################################################################
from collections import defaultdict
from itertools import product
import sys import sys
import transaction import transaction
from random import getrandbits from random import getrandbits
...@@ -130,6 +132,63 @@ def sqltest_dict(): ...@@ -130,6 +132,63 @@ def sqltest_dict():
return sqltest_dict return sqltest_dict
sqltest_dict = sqltest_dict() sqltest_dict = sqltest_dict()
def _validate_after_path_and_method_id(value, render_string):
path, method_id = value
return (
sqltest_dict['method_id'](method_id, render_string) +
' AND ' +
sqltest_dict['path'](path, render_string)
)
def _validate_after_tag_and_method_id(value, render_string):
tag, method_id = value
return (
sqltest_dict['method_id'](method_id, render_string) +
' AND ' +
sqltest_dict['tag'](tag, render_string)
)
# Definition of activity dependencies
# key: dependency name (as passed to ActiveObject.activate() & friends)
# value:
# - tuple of column names. If there is more than one, they must be in the
# same order as the dependency value items expected by the next item
# - callable rendering given values into an SQL condition
# (value, render_string) -> str
_DEPENDENCY_TESTER_DICT = {
'after_method_id': (
('method_id', ),
sqltest_dict['method_id'],
),
'after_path': (
('path', ),
sqltest_dict['path'],
),
'after_message_uid': (
('uid', ),
sqltest_dict['uid'],
),
'after_path_and_method_id': (
('path', 'method_id'),
_validate_after_path_and_method_id,
),
'after_tag': (
('tag', ),
sqltest_dict['tag'],
),
'after_tag_and_method_id': (
('tag', 'method_id'),
_validate_after_tag_and_method_id,
),
'serialization_tag': (
('serialization_tag', ),
lambda value, render_string: (
'processing_node > -1 AND ' +
sqltest_dict['serialization_tag'](value, render_string)
),
),
}
def getNow(db): def getNow(db):
""" """
Return the UTC date from the point of view of the SQL server. Return the UTC date from the point of view of the SQL server.
...@@ -164,10 +223,10 @@ CREATE TABLE %s ( ...@@ -164,10 +223,10 @@ CREATE TABLE %s (
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`), KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `node2_group_priority_date` (`processing_node`, `node`, `group_method_id`, `priority`, `date`), KEY `node2_group_priority_date` (`processing_node`, `node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`), KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
KEY (`path`), KEY (`path`, `processing_node`),
KEY (`active_process_uid`), KEY (`active_process_uid`),
KEY (`method_id`), KEY (`method_id`, `processing_node`),
KEY (`tag`) KEY (`tag`, `processing_node`)
) ENGINE=InnoDB""" % self.sql_table ) ENGINE=InnoDB""" % self.sql_table
def initialize(self, activity_tool, clear): def initialize(self, activity_tool, clear):
...@@ -197,6 +256,17 @@ CREATE TABLE %s ( ...@@ -197,6 +256,17 @@ CREATE TABLE %s (
" message) VALUES\n(%s)") " message) VALUES\n(%s)")
_insert_separator = "),\n(" _insert_separator = "),\n("
def _hasDependency(
self,
message,
_DEPENDENCY_NAME_SET=frozenset(_DEPENDENCY_TESTER_DICT),
):
get = message.activity_kw.get
return any(
get(x) is not None
for x in _DEPENDENCY_NAME_SET
)
def prepareQueueMessageList(self, activity_tool, message_list): def prepareQueueMessageList(self, activity_tool, message_list):
db = activity_tool.getSQLConnection() db = activity_tool.getSQLConnection()
quote = db.string_literal quote = db.string_literal
...@@ -223,11 +293,10 @@ CREATE TABLE %s ( ...@@ -223,11 +293,10 @@ CREATE TABLE %s (
values_list = [] values_list = []
max_payload = self._insert_max_payload max_payload = self._insert_max_payload
sep_len = len(self._insert_separator) sep_len = len(self._insert_separator)
hasDependency = self._hasDependency
for m in message_list: for m in message_list:
if m.is_registered: if m.is_registered:
active_process_uid = m.active_process_uid active_process_uid = m.active_process_uid
order_validation_text = m.order_validation_text = \
self.getOrderValidationText(m)
date = m.activity_kw.get('at_date') date = m.activity_kw.get('at_date')
row = ','.join(( row = ','.join((
'@uid+%s' % i, '@uid+%s' % i,
...@@ -235,7 +304,7 @@ CREATE TABLE %s ( ...@@ -235,7 +304,7 @@ CREATE TABLE %s (
'NULL' if active_process_uid is None else str(active_process_uid), 'NULL' if active_process_uid is None else str(active_process_uid),
"UTC_TIMESTAMP(6)" if date is None else quote(render_datetime(date)), "UTC_TIMESTAMP(6)" if date is None else quote(render_datetime(date)),
quote(m.method_id), quote(m.method_id),
'0' if order_validation_text == 'none' else '-1', '-1' if hasDependency(m) else '0',
str(m.activity_kw.get('priority', 1)), str(m.activity_kw.get('priority', 1)),
str(m.activity_kw.get('node', 0)), str(m.activity_kw.get('node', 0)),
quote(m.getGroupId()), quote(m.getGroupId()),
...@@ -329,60 +398,179 @@ CREATE TABLE %s ( ...@@ -329,60 +398,179 @@ CREATE TABLE %s (
# a lock error into a conflict error. # a lock error into a conflict error.
LOG('SQLBase', INFO, 'Got a lock error, retrying...') LOG('SQLBase', INFO, 'Got a lock error, retrying...')
# Validation private methods
def getValidationSQL(self, quote, activate_kw, same_queue):
validate_list = []
for k, v in activate_kw.iteritems():
if v is not None:
try:
method = getattr(self, '_validate_' + k, None)
if method:
validate_list.append(' AND '.join(method(v, quote)))
except Exception:
LOG('CMFActivity', WARNING, 'invalid %s value: %r' % (k, v),
error=True)
# Prevent validation by depending on anything, at least itself.
validate_list = '1',
same_queue = False
break
if validate_list:
return ("SELECT '%s' as activity, uid, date, processing_node,"
" priority, group_method_id, message FROM %s"
" WHERE processing_node > %d AND (%s) LIMIT %s" % (
type(self).__name__, self.sql_table,
DEPENDENCY_IGNORED_ERROR_STATE,
' OR '.join(validate_list),
READ_MESSAGE_LIMIT if same_queue else 1))
def _validate_after_method_id(self, *args):
return sqltest_dict['method_id'](*args),
def _validate_after_path(self, *args):
return sqltest_dict['path'](*args),
def _validate_after_message_uid(self, *args):
return sqltest_dict['uid'](*args),
def _validate_after_path_and_method_id(self, value, quote):
path, method_id = value
return (sqltest_dict['method_id'](method_id, quote),
sqltest_dict['path'](path, quote))
def _validate_after_tag(self, *args):
return sqltest_dict['tag'](*args),
def _validate_after_tag_and_method_id(self, value, quote):
tag, method_id = value
return (sqltest_dict['method_id'](method_id, quote),
sqltest_dict['tag'](tag, quote))
def _validate_serialization_tag(self, *args):
return 'processing_node > -1', sqltest_dict['serialization_tag'](*args)
def _log(self, severity, summary): def _log(self, severity, summary):
LOG(self.__class__.__name__, severity, summary, LOG(self.__class__.__name__, severity, summary,
error=severity > INFO) error=severity > INFO)
def _getExecutableMessageSet(self, activity_tool, db, message_list):
"""
Return, from given message list, the set of messages which have all their
dependencies satisfied.
"""
# Principle of operation:
# For each dependency type used in given message list, find all messages
# matching any of the dependency values used in given message list.
# This provides the SQL database with structurally simple queries that it
# should be able to optimise easily.
# Further refinements:
# - Any blocked message is ignored in further dendency type lookups (we
# already know it is blocked, no point in checking further).
# - Test the most popular dependency types first, with the expectation
# that these will find most of the blockers, reducing the set of
# activities left to test and (with the refinement above) reducing the
# probability of having to run further queries (if there are other
# dependency types to test)
dependency_tester_dict = _DEPENDENCY_TESTER_DICT
# dependency_name (str): Something like 'serialization_tag', etc
# dependency_value (any): dependency_name-dependent structure and meaning.
# dependency_dict: define the dependencies to check, and which messages are
# blocked by each found blocker.
# [dependency_name][dependency_value] -> message set
dependency_dict = defaultdict(lambda: defaultdict(set))
# message_dependency_dict: define which message has which dependencies, to
# efficiently remove a message from dependency_dict once it is found to be
# blocked.
# [message][dependency_name] -> dependency_value
message_dependency_dict = defaultdict(dict)
for message in message_list:
for (
dependency_name,
dependency_value,
) in message.activity_kw.iteritems():
try:
column_list, _ = dependency_tester_dict[dependency_name]
except KeyError:
continue
# There are 2 types of dependencies:
# - monovalued (most), which accepts a single value and a vector of
# values.
# - 2-valued (after_path_and_method_id and after_tag_and_method_id)
# which accept a 2-vector, each item being a single value or a vector
# of values.
if len(column_list) == 1:
dependency_value_list = [
x
for x in (
dependency_value
if isinstance(dependency_value, (tuple, list)) else
(dependency_value, )
)
if x is not None
]
else:
dependency_value_list = list(product(*(
(
x
if isinstance(x, (tuple, list)) else
(x, )
)
for x in dependency_value
if x is not None
)))
if not dependency_value_list:
continue
message_dependency_dict[message][dependency_name] = dependency_value_list
dependency_value_dict = dependency_dict[dependency_name]
for dependency_value in dependency_value_list:
dependency_value_dict[dependency_value].add(message)
# Messages are supposed valid until blockage is found.
result = set(message_list)
# Messages for which a blockage is found, so removal of this message from
# further dependency processing is delayed to the next iteration, to avoid
# doing such work if there is no such further iteration.
new_blocked_message_set = set()
quote = db.string_literal
table_name_list = activity_tool.getSQLQueueTableNameSet()
for (
dependency_name,
dependency_value_dict,
) in sorted(
dependency_dict.iteritems(),
# Test first the condition with the most values.
# XXX: after_path=('foo', 'bar') counts as 2 points for after_path
# despite being a single activity. Is there a fairer (while cheap) way ?
key=lambda dependency_dict_item: sum(
len(message_set)
for message_set in dependency_dict_item[1].itervalues()
),
reverse=True,
):
# Previous iteration found blocked messages.
# Find which activities, and remove their values from dependency_dict
# so these activities are not tested in further queries (we already
# know they are blocked).
while new_blocked_message_set:
blocked_message = new_blocked_message_set.pop()
for (
message_dependency_name,
message_dependency_value_list,
) in message_dependency_dict[blocked_message].iteritems():
message_dependency_value_dict = dependency_dict[message_dependency_name]
if not message_dependency_value_dict:
# This dependency was already dropped or evaluated, nothing to
# cleanup here.
continue
for message_dependency_value in message_dependency_value_list:
message_set = message_dependency_value_dict[message_dependency_value]
message_set.remove(blocked_message)
if not message_set:
# No more message wait for this value for this dependency, drop
# the entry.
del message_dependency_value_dict[message_dependency_value]
# Note: no point in editing dependency_dict if
# message_dependency_value_dict is empty, the outer loop is working
# on a copy.
if not dependency_value_dict:
# No more non-blocked message for this dependency, skip it.
continue
column_list, to_sql = dependency_tester_dict[dependency_name]
if len(column_list) == 1:
def row2key(row):
key, = row
return key
dependency_sql = to_sql(dependency_value_dict.keys(), quote)
else:
row2key = lambda x: x
# XXX: generated SQL could be simpler: for example, a dependency input
# as
# ('foo', ('bar', 'baz'))
# will become
# (... = 'foo' AND ... = 'bar') OR (... = 'foo' AND ... = 'baz')
# This is the correct condition, but it could be expressed with shorter
# SQL. But I'm not sure this makes much of a difference for the query
# planner, it would likely increase the complexity here a lot, and
# anyway these multi-column dependencies should rather be replaced with
# tags (as it often possible and produces better overall activity
# behaviour).
dependency_sql = ' OR '.join(
'(' + to_sql(dependency_value, quote) + ')'
for dependency_value in dependency_value_dict
)
base_sql_prefix = '(SELECT DISTINCT %s FROM ' % (
','.join(column_list),
)
base_sql_suffix = ' WHERE processing_node > %i AND (%s))' % (
DEPENDENCY_IGNORED_ERROR_STATE,
dependency_sql,
)
for row in db.query(
' UNION '.join(
base_sql_prefix + table_name + base_sql_suffix
for table_name in table_name_list
),
max_rows=0,
)[1]:
# Each row is a value which blocks some activities.
dependent_message_set = dependency_value_dict[row2key(row)]
# queue blocked messages for processing in the beginning of next
# outermost iteration.
new_blocked_message_set.update(dependent_message_set)
# ...but update result immediately, in case there is no next
# outermost iteration.
result.difference_update(dependent_message_set)
dependency_value_dict.clear()
return result
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
db = activity_tool.getSQLConnection() db = activity_tool.getSQLConnection()
now_date = getNow(db) now_date = getNow(db)
...@@ -397,20 +585,14 @@ CREATE TABLE %s ( ...@@ -397,20 +585,14 @@ CREATE TABLE %s (
if not result: if not result:
return return
transaction.commit() transaction.commit()
message_list = [Message.load(line.message, uid=line.uid, line=line)
validation_text_dict = {'none': 1} for line in result]
message_dict = {} message_set = self._getExecutableMessageSet(activity_tool, db, message_list)
for line in result:
message = Message.load(line.message, uid=line.uid, line=line)
if not hasattr(message, 'order_validation_text'): # BBB
message.order_validation_text = self.getOrderValidationText(message)
self.getExecutableMessageList(activity_tool, message, message_dict,
validation_text_dict, now_date=now_date)
transaction.commit() transaction.commit()
if message_dict: if message_set:
distributable_uid_set = set() distributable_uid_set = set()
serialization_tag_dict = {} serialization_tag_dict = {}
for message in message_dict.itervalues(): for message in message_set:
serialization_tag = message.activity_kw.get('serialization_tag') serialization_tag = message.activity_kw.get('serialization_tag')
if serialization_tag is None: if serialization_tag is None:
distributable_uid_set.add(message.uid) distributable_uid_set.add(message.uid)
...@@ -433,6 +615,7 @@ CREATE TABLE %s ( ...@@ -433,6 +615,7 @@ CREATE TABLE %s (
validated_count += distributable_count validated_count += distributable_count
if validated_count >= MAX_VALIDATED_LIMIT: if validated_count >= MAX_VALIDATED_LIMIT:
return return
line = result[-1]
where_kw['above_priority_date_uid'] = (line.priority, line.date, line.uid) where_kw['above_priority_date_uid'] = (line.priority, line.date, line.uid)
def getReservedMessageList(self, db, date, processing_node, limit, def getReservedMessageList(self, db, date, processing_node, limit,
...@@ -818,6 +1001,7 @@ CREATE TABLE %s ( ...@@ -818,6 +1001,7 @@ CREATE TABLE %s (
""" """
object_path is a tuple object_path is a tuple
""" """
db = activity_tool.getSQLConnection()
path = '/'.join(object_path) path = '/'.join(object_path)
if invoke: if invoke:
invoked = set() invoked = set()
...@@ -831,7 +1015,7 @@ CREATE TABLE %s ( ...@@ -831,7 +1015,7 @@ CREATE TABLE %s (
pass pass
line = getattr(message, 'line', None) line = getattr(message, 'line', None)
if (line and line.processing_node != -1 or if (line and line.processing_node != -1 or
not activity_tool.getDependentMessageList(message)): self._getExecutableMessageSet(activity_tool, db, [message])):
# Try to invoke the message - what happens if invoke calls flushActivity ?? # Try to invoke the message - what happens if invoke calls flushActivity ??
with ActivityRuntimeEnvironment(message): with ActivityRuntimeEnvironment(message):
activity_tool.invoke(message) activity_tool.invoke(message)
...@@ -848,7 +1032,6 @@ CREATE TABLE %s ( ...@@ -848,7 +1032,6 @@ CREATE TABLE %s (
invoke(m) invoke(m)
activity_tool.unregisterMessage(self, m) activity_tool.unregisterMessage(self, m)
uid_list = [] uid_list = []
db = activity_tool.getSQLConnection()
for line in self._getMessageList(db, path=path, for line in self._getMessageList(db, path=path,
**({'method_id': method_id} if method_id else {})): **({'method_id': method_id} if method_id else {})):
if only_safe and line.processing_node > -2: if only_safe and line.processing_node > -2:
......
...@@ -106,11 +106,10 @@ CREATE TABLE %s ( ...@@ -106,11 +106,10 @@ CREATE TABLE %s (
values_list = [] values_list = []
max_payload = self._insert_max_payload max_payload = self._insert_max_payload
sep_len = len(self._insert_separator) sep_len = len(self._insert_separator)
hasDependency = self._hasDependency
for m in message_list: for m in message_list:
if m.is_registered: if m.is_registered:
active_process_uid = m.active_process_uid active_process_uid = m.active_process_uid
order_validation_text = m.order_validation_text = \
self.getOrderValidationText(m)
date = m.activity_kw.get('at_date') date = m.activity_kw.get('at_date')
row = ','.join(( row = ','.join((
'@uid+%s' % i, '@uid+%s' % i,
...@@ -118,7 +117,7 @@ CREATE TABLE %s ( ...@@ -118,7 +117,7 @@ CREATE TABLE %s (
'NULL' if active_process_uid is None else str(active_process_uid), 'NULL' if active_process_uid is None else str(active_process_uid),
"UTC_TIMESTAMP(6)" if date is None else quote(render_datetime(date)), "UTC_TIMESTAMP(6)" if date is None else quote(render_datetime(date)),
quote(m.method_id), quote(m.method_id),
'0' if order_validation_text == 'none' else '-1', '-1' if hasDependency(m) else '0',
str(m.activity_kw.get('priority', 1)), str(m.activity_kw.get('priority', 1)),
quote(m.getGroupId()), quote(m.getGroupId()),
quote(m.activity_kw.get('tag', '')), quote(m.activity_kw.get('tag', '')),
......
...@@ -1811,31 +1811,9 @@ class ActivityTool (BaseTool): ...@@ -1811,31 +1811,9 @@ class ActivityTool (BaseTool):
REQUEST['RESPONSE'].redirect( 'manage_main' ) REQUEST['RESPONSE'].redirect( 'manage_main' )
return obj return obj
security.declarePrivate('getDependentMessageList') security.declarePrivate('getSQLQueueTableNameSet')
def getDependentMessageList(self, message, validating_queue=None): def getSQLQueueTableNameSet(self):
activity_kw = message.activity_kw return [x.sql_table for x in activity_dict.itervalues()]
db = self.getSQLConnection()
quote = db.string_literal
queries = []
for activity in activity_dict.itervalues():
q = activity.getValidationSQL(
quote, activity_kw, activity is validating_queue)
if q:
queries.append(q)
if queries:
message_list = []
for line in Results(db.query("(%s)" % ") UNION ALL (".join(queries))):
activity = activity_dict[line.activity]
m = Message.load(line.message,
line=line,
uid=line.uid,
date=line.date,
processing_node=line.processing_node)
if not hasattr(m, 'order_validation_text'): # BBB
m.order_validation_text = activity.getOrderValidationText(m)
message_list.append((activity, m))
return message_list
return ()
# Required for tests (time shift) # Required for tests (time shift)
def timeShift(self, delay): def timeShift(self, delay):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment