Commit 18b5e4ed authored by Vincent Pelletier's avatar Vincent Pelletier

CMFActivity.Activity.SQLBase: Reduce the number of deadlocks

MariaDB seems to be using inconsistent lock acquisition order when
executing the activity reservation queries. As a consequence, it produces
internal deadlocks, which it detects. Upon detection, it kills one of the
involved query, which causes message reservation to fail, despite the
presence of executable activities.
To avoid depending on MariaDB internal lock acquisition order, acquire an
explicit table-scoped lock before running the activity reservation queries.

On an otherwise-idle 31 processing node cluster with the following
activities spawned, designed to stress activity reservation queries
(many ultra-short activities being executed one at a time):
  active_getTitle = context.getPortalObject().portal_catalog.activate(
    activity='SQLQueue',
    priority=5,
    tag='foo',
  ).getTitle
  for _ in xrange(40000):
    active_getTitle()
the results are:
- a 26% shorter activity execution time: from 206s with the original code
  to 152s
- a 100% reduction in reported deadlocks from 300 with the original code
  to 0

There is room for further improvements at a later time:
- tweaking the amount of time spent waiting for this new lock to be
  available, set for now at 1s.
- possibly bypassing this lock altogether when there are too few processing
  nodes simultaneously enabled, or even in an adaptive reaction to
  deadlock errors actually happening.
- cover more write accesses to these tables with the same lock

From a production environment, it appears that the getReservedMessageList
method alone is involved in 95% of these deadlocks, so for now this change
only targets this method.
parent 08960254
Pipeline #17964 failed with stage
in 0 seconds
...@@ -28,6 +28,7 @@ from __future__ import absolute_import ...@@ -28,6 +28,7 @@ from __future__ import absolute_import
############################################################################## ##############################################################################
from collections import defaultdict from collections import defaultdict
from contextlib import contextmanager
from itertools import product from itertools import product
import operator import operator
import sys import sys
...@@ -83,6 +84,24 @@ def render_datetime(x): ...@@ -83,6 +84,24 @@ def render_datetime(x):
_SQLTEST_NO_QUOTE_TYPE_SET = int, float, long _SQLTEST_NO_QUOTE_TYPE_SET = int, float, long
_SQLTEST_NON_SEQUENCE_TYPE_SET = _SQLTEST_NO_QUOTE_TYPE_SET + (DateTime, basestring) _SQLTEST_NON_SEQUENCE_TYPE_SET = _SQLTEST_NO_QUOTE_TYPE_SET + (DateTime, basestring)
@contextmanager
def SQLLock(db, lock_name, timeout):
"""
Attemp to acquire a named SQL lock. The outcome of this acquisition is
returned to the context statement and MUST be checked:
1: lock acquired
0: timeout
"""
lock_name = db.string_literal(lock_name)
query = db.query
(_, ((acquired, ), )) = query('SELECT GET_LOCK(%s, %f)' % (lock_name, timeout))
if acquired is None:
raise ValueError('Error acquiring lock')
try:
yield acquired
finally:
if acquired:
query('SELECT RELEASE_LOCK(%s)' % (lock_name, ))
# sqltest_dict ({'condition_name': <render_function>}) defines how to render # sqltest_dict ({'condition_name': <render_function>}) defines how to render
# condition statements in the SQL query used by SQLBase.getMessageList # condition statements in the SQL query used by SQLBase.getMessageList
def sqltest_dict(): def sqltest_dict():
...@@ -648,39 +667,59 @@ CREATE TABLE %s ( ...@@ -648,39 +667,59 @@ CREATE TABLE %s (
' AND group_method_id=' + quote(group_method_id) ' AND group_method_id=' + quote(group_method_id)
if group_method_id else '' , limit) if group_method_id else '' , limit)
# Get reservable messages. # Note: Not all write accesses to our table are protected by this lock.
# During normal operation, sorting by date (as last criteria) is fairer # This lock is not here for data consistency reasons, but to avoid wasting
# for users and reduce the probability to do the same work several times # time on SQL deadlocks caused by the varied lock ordering chosen by the
# (think of an object that is modified several times in a short period of # database. These queries specifically seem to be extremely prone to such
# time). # deadlocks, so prevent them from attempting to run in parallel on a given
if node_set is None: # activity table.
result = Results(query( # If more accesses are found to cause a significant waste of time because
"SELECT * FROM %s WHERE processing_node=0 AND %s%s" # of deadlocks, then they should acquire such lock as well. But
" ORDER BY priority, date LIMIT %s FOR UPDATE" % args, 0)) # preemptively applying such lock everywhere without checking the amount
else: # of waste is unlikely to produce a net gain.
# We'd like to write # XXX: timeout may benefit from being tweaked, but one second seem like a
# ORDER BY priority, IF(node, IF(node={node}, -1, 1), 0), date # reasonable starting point.
# but this makes indices inefficient. # XXX: locking could probably be skipped altogether on clusters with few
subquery = ("(SELECT *, 3*priority{} as effective_priority FROM %s" # enough processing nodes, as there should be little deadlocks and the
" WHERE {} AND processing_node=0 AND %s%s" # tradeoff becomes unfavorable to explicit locks. What threshold to
" ORDER BY priority, date LIMIT %s FOR UPDATE)" % args).format # choose ?
node = 'node=%s' % processing_node with SQLLock(db, self.sql_table, timeout=1) as acquired:
result = Results(query( if not acquired:
# "ALL" on all but one, to incur deduplication cost only once. # This table is busy, check for work to do elsewhere
# "UNION ALL" between the two naturally distinct sets. return ()
"SELECT * FROM (%s UNION ALL %s UNION %s%s) as t" # Get reservable messages.
" ORDER BY effective_priority, date LIMIT %s"% ( # During normal operation, sorting by date (as last criteria) is fairer
subquery(-1, node), # for users and reduce the probability to do the same work several times
subquery('', 'node=0'), # (think of an object that is modified several times in a short period of
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'), # time).
' UNION ALL ' + subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set))) if node_set else '', if node_set is None:
limit), 0)) result = Results(query(
if result: "SELECT * FROM %s WHERE processing_node=0 AND %s%s"
# Reserve messages. " ORDER BY priority, date LIMIT %s FOR UPDATE" % args, 0))
uid_list = [x.uid for x in result] else:
self.assignMessageList(db, processing_node, uid_list) # We'd like to write
self._log(TRACE, 'Reserved messages: %r' % uid_list) # ORDER BY priority, IF(node, IF(node={node}, -1, 1), 0), date
return result # but this makes indices inefficient.
subquery = ("(SELECT *, 3*priority{} as effective_priority FROM %s"
" WHERE {} AND processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE)" % args).format
node = 'node=%s' % processing_node
result = Results(query(
# "ALL" on all but one, to incur deduplication cost only once.
# "UNION ALL" between the two naturally distinct sets.
"SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
" ORDER BY effective_priority, date LIMIT %s"% (
subquery(-1, node),
subquery('', 'node=0'),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'),
' UNION ALL ' + subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set))) if node_set else '',
limit), 0))
if result:
# Reserve messages.
uid_list = [x.uid for x in result]
self.assignMessageList(db, processing_node, uid_list)
self._log(TRACE, 'Reserved messages: %r' % uid_list)
return result
return () return ()
def assignMessageList(self, db, state, uid_list): def assignMessageList(self, db, state, uid_list):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment