Commit 62af8254 authored by Vincent Pelletier's avatar Vincent Pelletier

Products.CMFActivity: Fix poor performance with many family-bound activities

When there are many simultaneously-pending activities attached to any
processing node family, the
  node>=0
subquery becomes dominant (taking hundreds of time longer than the other
subqueries). As a consequence, this starves processing nodes of activities
and increases the CPU needs of the mariadb process hosting the activity
tables.
So, move this subquery out of the regular codepath, and only run it if no
other subquery found any activity:
- there is no activity preferentially targeting the current node
- there is no activity bound to any of the current node's families
- there is no activity without any node preference at all
Also, simplify the content of that subquery: the effective priority can
only be 3 * priority + 1 when this query is run, and node=0 rows can be
excluded (they should not exist in the current database view).
Also, factorise the logic producing "node=processing_node" and
"node IN node_set" subqueries, for simplicity. In turn, this makes all
family-dependent subqueries use a simple equality test, ensuring a stable
query plan independently from the number of families the current node is
member of.
Also, use "UNION ALL" always, as now:
- all subqueries have stritly distinct result sets
- as per mariadb documentation, "UNION [DISTINCT] applies to all UNIONs
  on the left", so the original comment about where ALL is used was
  incorrect in assuming it was improving the effective query performance
Also, line-split SQL queries as visible in the python source to be more
readable, without effect on the produced SQL.
Also, line-split a few non-trivial python expression to make their
internal structure immediately apparent.

Another effect of this change this change is to reduce activity theft
(activities to be preferentially executed by one node being executed by
another), potentially improving object cache hit-rate and hence decreasing
I/O pressure on the ZODB.
parent 2e67044a
...@@ -32,7 +32,7 @@ from six.moves import xrange ...@@ -32,7 +32,7 @@ from six.moves import xrange
from Products.ERP5Type.Utils import ensure_list, str2bytes from Products.ERP5Type.Utils import ensure_list, str2bytes
from collections import defaultdict from collections import defaultdict
from contextlib import contextmanager from contextlib import contextmanager
from itertools import product from itertools import product, chain
import operator import operator
import sys import sys
import transaction import transaction
...@@ -412,25 +412,59 @@ CREATE TABLE %s ( ...@@ -412,25 +412,59 @@ CREATE TABLE %s (
str2bytes(self.sql_table), b" AND ".join(where) or b"1") str2bytes(self.sql_table), b" AND ".join(where) or b"1")
def getPriority(self, activity_tool, processing_node, node_set=None): def getPriority(self, activity_tool, processing_node, node_set=None):
query = activity_tool.getSQLConnection().query
if node_set is None: if node_set is None:
q = (b"SELECT 3*priority, date FROM %s" result = query(
b" WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)" b"SELECT 3*priority, date"
b" ORDER BY priority, date LIMIT 1" % str2bytes(self.sql_table)) b" FROM %s"
b" WHERE"
b" processing_node=0 AND"
b" date <= UTC_TIMESTAMP(6)"
b" ORDER BY priority, date"
b" LIMIT 1" % str2bytes(self.sql_table),
0,
)[1]
else: else:
subquery = ("(SELECT 3*priority{} as effective_priority, date FROM %s" subquery = (b"("
" WHERE {} AND processing_node=0 AND date <= UTC_TIMESTAMP(6)" b"SELECT 3*priority{} AS effective_priority, date"
" ORDER BY priority, date LIMIT 1)" % self.sql_table).format b" FROM %s"
node = 'node=%s' % processing_node b" WHERE"
# "ALL" on all but one, to incur deduplication cost only once. b" {} AND"
# "UNION ALL" between the two naturally distinct sets. b" processing_node=0 AND"
q = (b"SELECT * FROM (%s UNION ALL %s UNION %s%s) as t" b" date <= UTC_TIMESTAMP(6)"
b" ORDER BY effective_priority, date LIMIT 1" % ( b" ORDER BY priority, date"
str2bytes(subquery(-1, node)), b" LIMIT 1"
str2bytes(subquery('', 'node=0')), b")" % self.sql_table).format
str2bytes(subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0')), result = query(
b' UNION ALL ' + str2bytes(subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set)))) if node_set else b'', b"SELECT *"
)) b" FROM (%s) AS t"
result = activity_tool.getSQLConnection().query(q, 0)[1] b" ORDER BY effective_priority, date"
b" LIMIT 1" % (
b" UNION ALL ".join(
chain(
(
subquery(b'-1', b'node = %i' % processing_node),
subquery(b'', b'node=0'),
),
(
subquery(b'-1', b'node = %i' % x)
for x in node_set
),
),
)
),
0,
)[1]
if not result:
# We did not find any activity matching our node (by number nor by
# family), nor by having no node preference. Look for any other
# activity we could be allowed to execute.
# This is slower than the above queries, because it does a range
# scan, either on the "node" column to sort the set, or on the
# sorted set to filter negative node values.
# This is why this query is only executed when the previous one
# did not find anything.
result = query(subquery(b'+1', b'node>0'), 0)[1]
if result: if result:
return result[0] return result[0]
return Queue.getPriority(self, activity_tool, processing_node, node_set) return Queue.getPriority(self, activity_tool, processing_node, node_set)
...@@ -697,9 +731,16 @@ CREATE TABLE %s ( ...@@ -697,9 +731,16 @@ CREATE TABLE %s (
assert limit assert limit
quote = db.string_literal quote = db.string_literal
query = db.query query = db.query
args = (str2bytes(self.sql_table), sqltest_dict['to_date'](date, quote), args = (
b' AND group_method_id=' + quote(group_method_id) str2bytes(self.sql_table),
if group_method_id else b'' , limit) sqltest_dict['to_date'](date, quote),
(
b' AND group_method_id=' + quote(group_method_id)
if group_method_id else
b''
),
limit,
)
# Note: Not all write accesses to our table are protected by this lock. # Note: Not all write accesses to our table are protected by this lock.
# This lock is not here for data consistency reasons, but to avoid wasting # This lock is not here for data consistency reasons, but to avoid wasting
...@@ -728,26 +769,59 @@ CREATE TABLE %s ( ...@@ -728,26 +769,59 @@ CREATE TABLE %s (
# time). # time).
if node_set is None: if node_set is None:
result = Results(query( result = Results(query(
b"SELECT * FROM %s WHERE processing_node=0 AND %s%s" b"SELECT *"
b" ORDER BY priority, date LIMIT %d FOR UPDATE" % args, 0)) b" FROM %s"
b" WHERE"
b" processing_node=0 AND"
b" %s%s"
b" ORDER BY priority, date"
b" LIMIT %i"
b" FOR UPDATE" % args,
0,
))
else: else:
# We'd like to write subquery = (b"("
# ORDER BY priority, IF(node, IF(node={node}, -1, 1), 0), date b"SELECT *, 3*priority{} AS effective_priority"
# but this makes indices inefficient. b" FROM %s"
subquery = (b"(SELECT *, 3*priority%%s as effective_priority FROM %s" b" WHERE"
b" WHERE %%s AND processing_node=0 AND %s%s" b" {} AND"
b" ORDER BY priority, date LIMIT %d FOR UPDATE)" % args) b" processing_node=0 AND"
node = b'node=%d' % processing_node b" %s%s"
b" ORDER BY priority, date"
b" LIMIT %i"
b" FOR UPDATE"
b")" % args).format
result = Results(query( result = Results(query(
# "ALL" on all but one, to incur deduplication cost only once. b"SELECT *"
# "UNION ALL" between the two naturally distinct sets. b" FROM (%s) AS t"
b"SELECT * FROM (%s UNION ALL %s UNION %s%s) as t" b" ORDER BY effective_priority, date"
b" ORDER BY effective_priority, date LIMIT %d"% ( b" LIMIT %i" % (
subquery % (b'-1', node), b" UNION ALL ".join(
subquery % (b'', b'node=0'), chain(
subquery % (b'+IF(node, IF(%s, -1, 1), 0)' % node, b'node>=0'), (
b' UNION ALL ' + subquery % (str2bytes(str(-1)), str2bytes('node IN (%s)' % ','.join(map(str, node_set)))) if node_set else b'', subquery(b'-1', b'node = %i' % processing_node),
limit), 0)) subquery(b'', b'node=0'),
),
(
subquery(b'-1', b'node = %i' % x)
for x in node_set
),
),
),
limit,
),
0,
))
if not result:
# We did not find any activity matching our node (by number nor by
# family), nor by having no node preference. Look for any other
# activity we could be allowed to execute.
# This is slower than the above queries, because it does a range
# scan, either on the "node" column to sort the set, or on the
# sorted set to filter negative node values.
# This is why this query is only executed when the previous one
# did not find anything.
result = Results(query(subquery(b'+1', b'node>0'), 0))
if result: if result:
# Reserve messages. # Reserve messages.
uid_list = [x.uid for x in result] uid_list = [x.uid for x in result]
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment