Commit d580c7e0 authored by Jérome Perrin's avatar Jérome Perrin

Merge remote-tracking branch 'upstream/master' into zope4py2

parents b302c754 62af8254
......@@ -32,7 +32,7 @@ from six.moves import xrange
from Products.ERP5Type.Utils import ensure_list, str2bytes
from collections import defaultdict
from contextlib import contextmanager
from itertools import product
from itertools import product, chain
import operator
import sys
import transaction
......@@ -412,25 +412,59 @@ CREATE TABLE %s (
str2bytes(self.sql_table), b" AND ".join(where) or b"1")
def getPriority(self, activity_tool, processing_node, node_set=None):
query = activity_tool.getSQLConnection().query
if node_set is None:
q = (b"SELECT 3*priority, date FROM %s"
b" WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
b" ORDER BY priority, date LIMIT 1" % str2bytes(self.sql_table))
result = query(
b"SELECT 3*priority, date"
b" FROM %s"
b" WHERE"
b" processing_node=0 AND"
b" date <= UTC_TIMESTAMP(6)"
b" ORDER BY priority, date"
b" LIMIT 1" % str2bytes(self.sql_table),
0,
)[1]
else:
subquery = ("(SELECT 3*priority{} as effective_priority, date FROM %s"
" WHERE {} AND processing_node=0 AND date <= UTC_TIMESTAMP(6)"
" ORDER BY priority, date LIMIT 1)" % self.sql_table).format
node = 'node=%s' % processing_node
# "ALL" on all but one, to incur deduplication cost only once.
# "UNION ALL" between the two naturally distinct sets.
q = (b"SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
b" ORDER BY effective_priority, date LIMIT 1" % (
str2bytes(subquery(-1, node)),
str2bytes(subquery('', 'node=0')),
str2bytes(subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0')),
b' UNION ALL ' + str2bytes(subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set)))) if node_set else b'',
))
result = activity_tool.getSQLConnection().query(q, 0)[1]
subquery = (b"("
b"SELECT 3*priority{} AS effective_priority, date"
b" FROM %s"
b" WHERE"
b" {} AND"
b" processing_node=0 AND"
b" date <= UTC_TIMESTAMP(6)"
b" ORDER BY priority, date"
b" LIMIT 1"
b")" % self.sql_table).format
result = query(
b"SELECT *"
b" FROM (%s) AS t"
b" ORDER BY effective_priority, date"
b" LIMIT 1" % (
b" UNION ALL ".join(
chain(
(
subquery(b'-1', b'node = %i' % processing_node),
subquery(b'', b'node=0'),
),
(
subquery(b'-1', b'node = %i' % x)
for x in node_set
),
),
)
),
0,
)[1]
if not result:
# We did not find any activity matching our node (by number nor by
# family), nor by having no node preference. Look for any other
# activity we could be allowed to execute.
# This is slower than the above queries, because it does a range
# scan, either on the "node" column to sort the set, or on the
# sorted set to filter negative node values.
# This is why this query is only executed when the previous one
# did not find anything.
result = query(subquery(b'+1', b'node>0'), 0)[1]
if result:
return result[0]
return Queue.getPriority(self, activity_tool, processing_node, node_set)
......@@ -697,9 +731,16 @@ CREATE TABLE %s (
assert limit
quote = db.string_literal
query = db.query
args = (str2bytes(self.sql_table), sqltest_dict['to_date'](date, quote),
args = (
str2bytes(self.sql_table),
sqltest_dict['to_date'](date, quote),
(
b' AND group_method_id=' + quote(group_method_id)
if group_method_id else b'' , limit)
if group_method_id else
b''
),
limit,
)
# Note: Not all write accesses to our table are protected by this lock.
# This lock is not here for data consistency reasons, but to avoid wasting
......@@ -728,26 +769,59 @@ CREATE TABLE %s (
# time).
if node_set is None:
result = Results(query(
b"SELECT * FROM %s WHERE processing_node=0 AND %s%s"
b" ORDER BY priority, date LIMIT %d FOR UPDATE" % args, 0))
b"SELECT *"
b" FROM %s"
b" WHERE"
b" processing_node=0 AND"
b" %s%s"
b" ORDER BY priority, date"
b" LIMIT %i"
b" FOR UPDATE" % args,
0,
))
else:
# We'd like to write
# ORDER BY priority, IF(node, IF(node={node}, -1, 1), 0), date
# but this makes indices inefficient.
subquery = (b"(SELECT *, 3*priority%%s as effective_priority FROM %s"
b" WHERE %%s AND processing_node=0 AND %s%s"
b" ORDER BY priority, date LIMIT %d FOR UPDATE)" % args)
node = b'node=%d' % processing_node
subquery = (b"("
b"SELECT *, 3*priority{} AS effective_priority"
b" FROM %s"
b" WHERE"
b" {} AND"
b" processing_node=0 AND"
b" %s%s"
b" ORDER BY priority, date"
b" LIMIT %i"
b" FOR UPDATE"
b")" % args).format
result = Results(query(
# "ALL" on all but one, to incur deduplication cost only once.
# "UNION ALL" between the two naturally distinct sets.
b"SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
b" ORDER BY effective_priority, date LIMIT %d"% (
subquery % (b'-1', node),
subquery % (b'', b'node=0'),
subquery % (b'+IF(node, IF(%s, -1, 1), 0)' % node, b'node>=0'),
b' UNION ALL ' + subquery % (str2bytes(str(-1)), str2bytes('node IN (%s)' % ','.join(map(str, node_set)))) if node_set else b'',
limit), 0))
b"SELECT *"
b" FROM (%s) AS t"
b" ORDER BY effective_priority, date"
b" LIMIT %i" % (
b" UNION ALL ".join(
chain(
(
subquery(b'-1', b'node = %i' % processing_node),
subquery(b'', b'node=0'),
),
(
subquery(b'-1', b'node = %i' % x)
for x in node_set
),
),
),
limit,
),
0,
))
if not result:
# We did not find any activity matching our node (by number nor by
# family), nor by having no node preference. Look for any other
# activity we could be allowed to execute.
# This is slower than the above queries, because it does a range
# scan, either on the "node" column to sort the set, or on the
# sorted set to filter negative node values.
# This is why this query is only executed when the previous one
# did not find anything.
result = Results(query(subquery(b'+1', b'node>0'), 0))
if result:
# Reserve messages.
uid_list = [x.uid for x in result]
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment