Commit ad7ee9aa authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: move most SQL queries from DTML to Python

parent 707058d7
...@@ -189,9 +189,6 @@ class Queue(object): ...@@ -189,9 +189,6 @@ class Queue(object):
elif cached_result: elif cached_result:
message_dict[message.uid] = message message_dict[message.uid] = message
def hasActivity(self, activity_tool, object, processing_node=None, active_process=None, **kw):
return 0
def flush(self, activity_tool, object, **kw): def flush(self, activity_tool, object, **kw):
pass pass
...@@ -216,14 +213,6 @@ class Queue(object): ...@@ -216,14 +213,6 @@ class Queue(object):
def getMessageList(self, activity_tool, processing_node=None,**kw): def getMessageList(self, activity_tool, processing_node=None,**kw):
return [] return []
def countMessage(self, activity_tool,**kw):
return 0
def countMessageWithTag(self, activity_tool,value):
"""Return the number of messages which match the given tag.
"""
return self.countMessage(activity_tool, tag=value)
# Transaction Management # Transaction Management
def prepareQueueMessageList(self, activity_tool, message_list): def prepareQueueMessageList(self, activity_tool, message_list):
# Called to prepare transaction commit for queued messages # Called to prepare transaction commit for queued messages
......
...@@ -115,18 +115,29 @@ def sqltest_dict(): ...@@ -115,18 +115,29 @@ def sqltest_dict():
return sqltest_dict return sqltest_dict
sqltest_dict = sqltest_dict() sqltest_dict = sqltest_dict()
def getNow(db):
"""
Return the UTC date from the point of view of the SQL server.
Note that this value is not cached, and is not transactionnal on MySQL
side.
"""
return db.query("SELECT UTC_TIMESTAMP(6)", 0)[1][0][0]
class SQLBase(Queue): class SQLBase(Queue):
""" """
Define a set of common methods for SQL-based storage of activities. Define a set of common methods for SQL-based storage of activities.
""" """
_createMessageTable = 'SQLBase_createMessageTable'
def initialize(self, activity_tool, clear): def initialize(self, activity_tool, clear):
folder = activity_tool.getPortalObject().portal_skins.activity folder = activity_tool.getPortalObject().portal_skins.activity
try: try:
createMessageTable = folder.SQLBase_createMessageTable createMessageTable = getattr(folder, self._createMessageTable)
except AttributeError: except AttributeError:
return return
if clear: if clear:
folder.SQLBase_dropMessageTable(table=self.sql_table) activity_tool.getSQLConnection().query(
"DROP TABLE IF EXISTS " + self.sql_table)
createMessageTable(table=self.sql_table) createMessageTable(table=self.sql_table)
else: else:
src = createMessageTable._upgradeSchema(create_if_not_exists=1, src = createMessageTable._upgradeSchema(create_if_not_exists=1,
...@@ -185,33 +196,21 @@ class SQLBase(Queue): ...@@ -185,33 +196,21 @@ class SQLBase(Queue):
else: else:
raise ValueError("Maximum retry for SQLBase_writeMessageList reached") raise ValueError("Maximum retry for SQLBase_writeMessageList reached")
def getNow(self, context): def _getMessageList(self, db, count=1000, src__=0, **kw):
"""
Return the current value for SQL server's NOW().
Note that this value is not cached, and is not transactionnal on MySQL
side.
"""
result = context.SQLBase_getNow()
assert len(result) == 1
assert len(result[0]) == 1
return result[0][0]
def _getMessageList(self, activity_tool, count=1000, src__=0, **kw):
# XXX: Because most columns have NOT NULL constraint, conditions with None # XXX: Because most columns have NOT NULL constraint, conditions with None
# value should be ignored, instead of trying to render them # value should be ignored, instead of trying to render them
# (with comparisons with NULL). # (with comparisons with NULL).
sql_connection = activity_tool.getPortalObject().cmf_activity_sql_connection q = db.string_literal
q = sql_connection.sql_quote__
sql = '\n AND '.join(sqltest_dict[k](v, q) for k, v in kw.iteritems()) sql = '\n AND '.join(sqltest_dict[k](v, q) for k, v in kw.iteritems())
sql = "SELECT * FROM %s%s\nORDER BY priority, date, uid%s" % ( sql = "SELECT * FROM %s%s\nORDER BY priority, date, uid%s" % (
self.sql_table, self.sql_table,
sql and '\nWHERE ' + sql, sql and '\nWHERE ' + sql,
'' if count is None else '\nLIMIT %d' % count, '' if count is None else '\nLIMIT %d' % count,
) )
return sql if src__ else Results(sql_connection().query(sql, max_rows=0)) return sql if src__ else Results(db.query(sql, max_rows=0))
def getMessageList(self, *args, **kw): def getMessageList(self, activity_tool, *args, **kw):
result = self._getMessageList(*args, **kw) result = self._getMessageList(activity_tool.getSQLConnection(), *args, **kw)
if type(result) is str: # src__ == 1 if type(result) is str: # src__ == 1
return result, return result,
class_name = self.__class__.__name__ class_name = self.__class__.__name__
...@@ -223,57 +222,27 @@ class SQLBase(Queue): ...@@ -223,57 +222,27 @@ class SQLBase(Queue):
processing=line.processing) processing=line.processing)
for line in result] for line in result]
def countMessage(self, activity_tool, tag=None, path=None, def countMessageSQL(self, quote, **kw):
method_id=None, message_uid=None, **kw): return "SELECT count(*) FROM %s WHERE processing_node > -10 AND %s" % (
"""Return the number of messages which match the given parameters. self.sql_table, " AND ".join(
""" sqltest_dict[k](v, quote) for (k, v) in kw.iteritems() if v
if isinstance(tag, str): ) or "1")
tag = [tag]
if isinstance(path, str): def hasActivitySQL(self, quote, only_valid=False, only_invalid=False, **kw):
path = [path] where = [sqltest_dict[k](v, quote) for (k, v) in kw.iteritems() if v]
if isinstance(method_id, str): if only_valid:
method_id = [method_id] where.append('processing_node > -2')
result = activity_tool.SQLBase_validateMessageList(table=self.sql_table, if only_invalid:
method_id=method_id, where.append('processing_node < -1')
path=path, return "SELECT 1 FROM %s WHERE %s LIMIT 1" % (
message_uid=message_uid, self.sql_table, " AND ".join(where) or "1")
tag=tag,
serialization_tag=None,
count=1)
return result[0].uid_count
def hasActivity(self, activity_tool, object, method_id=None, only_valid=None,
active_process_uid=None,
only_invalid=False):
hasMessage = getattr(activity_tool, 'SQLBase_hasMessage', None)
if hasMessage is not None:
if object is None:
path = None
else:
path = '/'.join(object.getPhysicalPath())
try:
result = hasMessage(table=self.sql_table, path=path, method_id=method_id,
only_valid=only_valid, active_process_uid=active_process_uid,
only_invalid=only_invalid)
except DatabaseError:
LOG(
'SQLBase',
ERROR,
'%r raised, considering there are no activities' % (
hasMessage,
),
error=True,
)
else:
return result[0].message_count > 0
return 0
def getPriority(self, activity_tool): def getPriority(self, activity_tool):
result = activity_tool.SQLBase_getPriority(table=self.sql_table) result = activity_tool.getSQLConnection().query(
if result: "SELECT priority, date FROM %s"
result, = result " WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
return result['priority'], result['date'] " ORDER BY priority, date LIMIT 1" % self.sql_table, 0)[1]
return Queue.getPriority(self, activity_tool) return result[0] if result else Queue.getPriority(self, activity_tool)
def _retryOnLockError(self, method, args=(), kw={}): def _retryOnLockError(self, method, args=(), kw={}):
while True: while True:
...@@ -349,10 +318,8 @@ class SQLBase(Queue): ...@@ -349,10 +318,8 @@ class SQLBase(Queue):
error=severity>INFO and sys.exc_info() or None) error=severity>INFO and sys.exc_info() or None)
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None) db = activity_tool.getSQLConnection()
if assignMessage is None: now_date = getNow(db)
return
now_date = self.getNow(activity_tool)
where_kw = { where_kw = {
'processing_node': -1, 'processing_node': -1,
'to_date': now_date, 'to_date': now_date,
...@@ -360,7 +327,7 @@ class SQLBase(Queue): ...@@ -360,7 +327,7 @@ class SQLBase(Queue):
} }
validated_count = 0 validated_count = 0
while 1: while 1:
result = self._getMessageList(activity_tool, **where_kw) result = self._getMessageList(db, **where_kw)
if not result: if not result:
return return
transaction.commit() transaction.commit()
...@@ -395,8 +362,7 @@ class SQLBase(Queue): ...@@ -395,8 +362,7 @@ class SQLBase(Queue):
distributable_uid_set.add(message.uid) distributable_uid_set.add(message.uid)
distributable_count = len(distributable_uid_set) distributable_count = len(distributable_uid_set)
if distributable_count: if distributable_count:
assignMessage(table=self.sql_table, self.unreserveMessageList(db, 0, distributable_uid_set)
processing_node=0, uid=tuple(distributable_uid_set))
validated_count += distributable_count validated_count += distributable_count
if validated_count >= MAX_VALIDATED_LIMIT: if validated_count >= MAX_VALIDATED_LIMIT:
return return
...@@ -404,7 +370,7 @@ class SQLBase(Queue): ...@@ -404,7 +370,7 @@ class SQLBase(Queue):
where_kw['from_date'] = line.date where_kw['from_date'] = line.date
where_kw['above_uid'] = line.uid where_kw['above_uid'] = line.uid
def getReservedMessageList(self, activity_tool, date, processing_node, def getReservedMessageList(self, db, date, processing_node,
limit=None, group_method_id=None): limit=None, group_method_id=None):
""" """
Get and reserve a list of messages. Get and reserve a list of messages.
...@@ -418,29 +384,34 @@ class SQLBase(Queue): ...@@ -418,29 +384,34 @@ class SQLBase(Queue):
If None (or not given) no limit apply. If None (or not given) no limit apply.
""" """
assert limit assert limit
quote = db.string_literal
query = db.query
sql_group = ('' if group_method_id is None else
' AND group_method_id=' + quote(group_method_id))
# Select reserved messages.
# Do not check already-assigned messages when trying to reserve more # Do not check already-assigned messages when trying to reserve more
# activities, because in such case we will find one reserved activity. # activities, because in such case we will find one reserved activity.
result = activity_tool.SQLBase_selectReservedMessageList( result = Results(query(
table=self.sql_table, "SELECT * FROM %s WHERE processing_node=%s%s LIMIT %s" % (
count=limit, self.sql_table, processing_node, sql_group, limit), 0))
processing_node=processing_node,
group_method_id=group_method_id,
)
limit -= len(result) limit -= len(result)
if limit: if limit:
reservable = activity_tool.SQLBase_getReservableMessageList( # Get reservable messages.
table=self.sql_table, # During normal operation, sorting by date (as last criteria) is fairer
count=limit, # for users and reduce the probability to do the same work several times
processing_node=processing_node, # (think of an object that is modified several times in a short period of
to_date=date, # time).
group_method_id=group_method_id, reservable = Results(query(
) "SELECT * FROM %s WHERE processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE" % (
self.sql_table, sqltest_dict['to_date'](date, quote), sql_group,
limit), 0))
if reservable: if reservable:
activity_tool.SQLBase_reserveMessageList( # Reserve messages.
uid=[x.uid for x in reservable], query("UPDATE %s SET processing_node=%s WHERE uid IN (%s)\0COMMIT" % (
table=self.sql_table, self.sql_table, processing_node,
processing_node=processing_node, ','.join(str(x.uid) for x in reservable)))
)
# DC.ZRDB.Results.Results does not implement concatenation # DC.ZRDB.Results.Results does not implement concatenation
# Implement an imperfect (but cheap) concatenation. Do not update # Implement an imperfect (but cheap) concatenation. Do not update
# __items__ nor _data_dictionary. # __items__ nor _data_dictionary.
...@@ -449,15 +420,15 @@ class SQLBase(Queue): ...@@ -449,15 +420,15 @@ class SQLBase(Queue):
result._data += reservable._data result._data += reservable._data
return result return result
def makeMessageListAvailable(self, activity_tool, uid_list): def unreserveMessageList(self, db, state, uid_list):
""" """
Put messages back in processing_node=0 . Put messages back in given processing_node.
""" """
if len(uid_list): db.query(
activity_tool.SQLBase_makeMessageListAvailable(table=self.sql_table, "UPDATE %s SET processing_node=%s, processing=0 WHERE uid IN (%s)\0"
uid=uid_list) "COMMIT" % (self.sql_table, state, ','.join(map(str, uid_list))))
def getProcessableMessageLoader(self, activity_tool, processing_node): def getProcessableMessageLoader(self, db, processing_node):
# do not merge anything # do not merge anything
def load(line): def load(line):
uid = line.uid uid = line.uid
...@@ -494,8 +465,9 @@ class SQLBase(Queue): ...@@ -494,8 +465,9 @@ class SQLBase(Queue):
- group_method_id - group_method_id
- uid_to_duplicate_uid_list_dict - uid_to_duplicate_uid_list_dict
""" """
db = activity_tool.getSQLConnection()
def getReservedMessageList(limit, group_method_id=None): def getReservedMessageList(limit, group_method_id=None):
line_list = self.getReservedMessageList(activity_tool=activity_tool, line_list = self.getReservedMessageList(db,
date=now_date, date=now_date,
processing_node=processing_node, processing_node=processing_node,
limit=limit, limit=limit,
...@@ -503,12 +475,12 @@ class SQLBase(Queue): ...@@ -503,12 +475,12 @@ class SQLBase(Queue):
if line_list: if line_list:
self._log(TRACE, 'Reserved messages: %r' % [x.uid for x in line_list]) self._log(TRACE, 'Reserved messages: %r' % [x.uid for x in line_list])
return line_list return line_list
now_date = self.getNow(activity_tool) now_date = getNow(db)
uid_to_duplicate_uid_list_dict = {} uid_to_duplicate_uid_list_dict = {}
try: try:
result = getReservedMessageList(1) result = getReservedMessageList(1)
if result: if result:
load = self.getProcessableMessageLoader(activity_tool, processing_node) load = self.getProcessableMessageLoader(db, processing_node)
m, uid, uid_list = load(result[0]) m, uid, uid_list = load(result[0])
message_list = [m] message_list = [m]
uid_to_duplicate_uid_list_dict[uid] = uid_list uid_to_duplicate_uid_list_dict[uid] = uid_list
...@@ -538,10 +510,14 @@ class SQLBase(Queue): ...@@ -538,10 +510,14 @@ class SQLBase(Queue):
message_list.append(m) message_list.append(m)
if cost >= 1: if cost >= 1:
# Unreserve extra messages as soon as possible. # Unreserve extra messages as soon as possible.
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list = [line.uid for line in result if line.uid != uid]
uid_list=[line.uid for line in result if line.uid != uid]) if uid_list:
activity_tool.SQLBase_processMessage(table=self.sql_table, self.unreserveMessageList(db, 0, uid_list)
uid=uid_to_duplicate_uid_list_dict.keys()) # Process messages.
db.query("UPDATE %s"
" SET processing=1, processing_date=UTC_TIMESTAMP(6)"
" WHERE uid IN (%s)\0COMMIT" % (
self.sql_table, ','.join(map(str, uid_to_duplicate_uid_list_dict))))
return message_list, group_method_id, uid_to_duplicate_uid_list_dict return message_list, group_method_id, uid_to_duplicate_uid_list_dict
except: except:
self._log(WARNING, 'Exception while reserving messages.') self._log(WARNING, 'Exception while reserving messages.')
...@@ -550,8 +526,7 @@ class SQLBase(Queue): ...@@ -550,8 +526,7 @@ class SQLBase(Queue):
for uid_list in uid_to_duplicate_uid_list_dict.itervalues(): for uid_list in uid_to_duplicate_uid_list_dict.itervalues():
to_free_uid_list += uid_list to_free_uid_list += uid_list
try: try:
self.makeMessageListAvailable(activity_tool=activity_tool, self.unreserveMessageList(db, 0, to_free_uid_list)
uid_list=to_free_uid_list)
except: except:
self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list) self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
else: else:
...@@ -636,6 +611,18 @@ class SQLBase(Queue): ...@@ -636,6 +611,18 @@ class SQLBase(Queue):
transaction.commit() transaction.commit()
return not message_list return not message_list
def deleteMessageList(self, db, uid_list):
db.query("DELETE FROM %s WHERE uid IN (%s)" % (
self.sql_table, ','.join(map(str, uid_list))))
def reactivateMessageList(self, db, uid_list, delay, retry):
db.query("UPDATE %s SET"
" date = DATE_ADD(UTC_TIMESTAMP(6), INTERVAL %s SECOND)"
"%s WHERE uid IN (%s)" % (
self.sql_table, delay,
", priority = priority + 1, retry = retry + 1" if retry else "",
",".join(map(str, uid_list))))
def finalizeMessageExecution(self, activity_tool, message_list, def finalizeMessageExecution(self, activity_tool, message_list,
uid_to_duplicate_uid_list_dict=None): uid_to_duplicate_uid_list_dict=None):
""" """
...@@ -648,6 +635,7 @@ class SQLBase(Queue): ...@@ -648,6 +635,7 @@ class SQLBase(Queue):
be put in a permanent-error state. be put in a permanent-error state.
- In all other cases, retry count is increased and message is delayed. - In all other cases, retry count is increased and message is delayed.
""" """
db = activity_tool.getSQLConnection()
deletable_uid_list = [] deletable_uid_list = []
delay_uid_list = [] delay_uid_list = []
final_error_uid_list = [] final_error_uid_list = []
...@@ -692,10 +680,7 @@ class SQLBase(Queue): ...@@ -692,10 +680,7 @@ class SQLBase(Queue):
delay = VALIDATION_ERROR_DELAY * (retry * retry + 1) * 2 delay = VALIDATION_ERROR_DELAY * (retry * retry + 1) * 2
try: try:
# Immediately update, because values different for every message # Immediately update, because values different for every message
activity_tool.SQLBase_reactivate(table=self.sql_table, self.reactivateMessageList(db, (uid,), delay, True)
uid=[uid],
delay=delay,
retry=1)
except: except:
self._log(WARNING, 'Failed to reactivate %r' % uid) self._log(WARNING, 'Failed to reactivate %r' % uid)
make_available_uid_list.append(uid) make_available_uid_list.append(uid)
...@@ -709,9 +694,7 @@ class SQLBase(Queue): ...@@ -709,9 +694,7 @@ class SQLBase(Queue):
deletable_uid_list.append(uid) deletable_uid_list.append(uid)
if deletable_uid_list: if deletable_uid_list:
try: try:
self._retryOnLockError(activity_tool.SQLBase_delMessage, self._retryOnLockError(self.deleteMessageList, (db, deletable_uid_list))
kw={'table': self.sql_table,
'uid': deletable_uid_list})
except: except:
self._log(ERROR, 'Failed to delete messages %r' % deletable_uid_list) self._log(ERROR, 'Failed to delete messages %r' % deletable_uid_list)
else: else:
...@@ -719,21 +702,19 @@ class SQLBase(Queue): ...@@ -719,21 +702,19 @@ class SQLBase(Queue):
if delay_uid_list: if delay_uid_list:
try: try:
# If this is a conflict error, do not increase 'retry' but only delay. # If this is a conflict error, do not increase 'retry' but only delay.
activity_tool.SQLBase_reactivate(table=self.sql_table, self.reactivateMessageList(db, delay_uid_list,
uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY, retry=None) VALIDATION_ERROR_DELAY, False)
except: except:
self._log(ERROR, 'Failed to delay %r' % delay_uid_list) self._log(ERROR, 'Failed to delay %r' % delay_uid_list)
if final_error_uid_list: if final_error_uid_list:
try: try:
activity_tool.SQLBase_assignMessage(table=self.sql_table, self.unreserveMessageList(db, INVOKE_ERROR_STATE, final_error_uid_list)
uid=final_error_uid_list, processing_node=INVOKE_ERROR_STATE)
except: except:
self._log(ERROR, 'Failed to set message to error state for %r' self._log(ERROR, 'Failed to set message to error state for %r'
% final_error_uid_list) % final_error_uid_list)
if make_available_uid_list: if make_available_uid_list:
try: try:
self.makeMessageListAvailable(activity_tool=activity_tool, self.unreserveMessageList(db, 0, make_available_uid_list)
uid_list=make_available_uid_list)
except: except:
self._log(ERROR, 'Failed to unreserve %r' % make_available_uid_list) self._log(ERROR, 'Failed to unreserve %r' % make_available_uid_list)
else: else:
...@@ -783,13 +764,14 @@ class SQLBase(Queue): ...@@ -783,13 +764,14 @@ class SQLBase(Queue):
invoke(m) invoke(m)
activity_tool.unregisterMessage(self, m) activity_tool.unregisterMessage(self, m)
uid_list = [] uid_list = []
for line in self._getMessageList(activity_tool, path=path, processing=0, db = activity_tool.getSQLConnection()
for line in self._getMessageList(db, path=path, processing=0,
**({'method_id': method_id} if method_id else {})): **({'method_id': method_id} if method_id else {})):
uid_list.append(line.uid) uid_list.append(line.uid)
if invoke: if invoke:
invoke(Message.load(line.message, uid=line.uid, line=line)) invoke(Message.load(line.message, uid=line.uid, line=line))
if uid_list: if uid_list:
activity_tool.SQLBase_delMessage(table=self.sql_table, uid=uid_list) self.deleteMessageList(db, uid_list)
# Required for tests # Required for tests
def timeShift(self, activity_tool, delay, processing_node=None): def timeShift(self, activity_tool, delay, processing_node=None):
...@@ -797,5 +779,9 @@ class SQLBase(Queue): ...@@ -797,5 +779,9 @@ class SQLBase(Queue):
To simulate time shift, we simply substract delay from To simulate time shift, we simply substract delay from
all dates in message(_queue) table all dates in message(_queue) table
""" """
activity_tool.SQLBase_timeShift(table=self.sql_table, delay=delay, activity_tool.getSQLConnection().query("UPDATE %s SET"
processing_node=processing_node) " date = DATE_SUB(date, INTERVAL %s SECOND),"
" processing_date = DATE_SUB(processing_date, INTERVAL %s SECOND)"
% (self.sql_table, delay, delay)
+ ('' if processing_node is None else
"WHERE processing_node=%s" % processing_node))
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
# #
############################################################################## ##############################################################################
from Shared.DC.ZRDB.Results import Results
from Products.CMFActivity.ActivityTool import Message from Products.CMFActivity.ActivityTool import Message
import sys import sys
#from time import time #from time import time
...@@ -74,8 +75,9 @@ class SQLDict(SQLBase): ...@@ -74,8 +75,9 @@ class SQLDict(SQLBase):
message_list = activity_buffer.getMessageList(self) message_list = activity_buffer.getMessageList(self)
return [m for m in message_list if m.is_registered] return [m for m in message_list if m.is_registered]
def getProcessableMessageLoader(self, activity_tool, processing_node): def getProcessableMessageLoader(self, db, processing_node):
path_and_method_id_dict = {} path_and_method_id_dict = {}
quote = db.string_literal
def load(line): def load(line):
# getProcessableMessageList already fetch messages with the same # getProcessableMessageList already fetch messages with the same
# group_method_id, so what remains to be filtered on are path and # group_method_id, so what remains to be filtered on are path and
...@@ -87,6 +89,8 @@ class SQLDict(SQLBase): ...@@ -87,6 +89,8 @@ class SQLDict(SQLBase):
uid = line.uid uid = line.uid
original_uid = path_and_method_id_dict.get(key) original_uid = path_and_method_id_dict.get(key)
if original_uid is None: if original_uid is None:
sql_method_id = " AND method_id = %s AND group_method_id = %s" % (
quote(method_id), quote(line.group_method_id))
m = Message.load(line.message, uid=uid, line=line) m = Message.load(line.message, uid=uid, line=line)
merge_parent = m.activity_kw.get('merge_parent') merge_parent = m.activity_kw.get('merge_parent')
try: try:
...@@ -101,11 +105,14 @@ class SQLDict(SQLBase): ...@@ -101,11 +105,14 @@ class SQLDict(SQLBase):
path_list.append(path) path_list.append(path)
uid_list = [] uid_list = []
if path_list: if path_list:
result = activity_tool.SQLDict_selectParentMessage( # Select parent messages.
path=path_list, result = Results(db.query("SELECT * FROM message"
method_id=method_id, " WHERE processing_node IN (0, %s) AND path IN (%s)%s"
group_method_id=line.group_method_id, " ORDER BY path LIMIT 1 FOR UPDATE" % (
processing_node=processing_node) processing_node,
','.join(map(quote, path_list)),
sql_method_id,
), 0))
if result: # found a parent if result: # found a parent
# mark child as duplicate # mark child as duplicate
uid_list.append(uid) uid_list.append(uid)
...@@ -115,29 +122,34 @@ class SQLDict(SQLBase): ...@@ -115,29 +122,34 @@ class SQLDict(SQLBase):
uid = line.uid uid = line.uid
m = Message.load(line.message, uid=uid, line=line) m = Message.load(line.message, uid=uid, line=line)
# return unreserved similar children # return unreserved similar children
result = activity_tool.SQLDict_selectChildMessageList( path = line.path
path=line.path, result = db.query("SELECT uid FROM message"
method_id=method_id, " WHERE processing_node = 0 AND (path = %s OR path LIKE %s)"
group_method_id=line.group_method_id) "%s FOR UPDATE" % (
reserve_uid_list = [x.uid for x in result] quote(path), quote(path.replace('_', r'\_') + '/%'),
sql_method_id,
), 0)[1]
reserve_uid_list = [x for x, in result]
uid_list += reserve_uid_list uid_list += reserve_uid_list
if not line.processing_node: if not line.processing_node:
# reserve found parent # reserve found parent
reserve_uid_list.append(uid) reserve_uid_list.append(uid)
else: else:
result = activity_tool.SQLDict_selectDuplicatedLineList( # Select duplicates.
path=path, result = db.query("SELECT uid FROM message"
method_id=method_id, " WHERE processing_node = 0 AND path = %s%s FOR UPDATE" % (
group_method_id=line.group_method_id) quote(path), sql_method_id,
reserve_uid_list = uid_list = [x.uid for x in result] ), 0)[1]
reserve_uid_list = uid_list = [x for x, in result]
if reserve_uid_list: if reserve_uid_list:
activity_tool.SQLDict_reserveDuplicatedLineList( db.query(
processing_node=processing_node, uid=reserve_uid_list) "UPDATE message SET processing_node=%s WHERE uid IN (%s)" % (
else: processing_node, ','.join(map(str, reserve_uid_list)),
activity_tool.SQLDict_commit() # release locks ))
db.query("COMMIT")
except: except:
self._log(WARNING, 'getDuplicateMessageUidList got an exception') self._log(WARNING, 'Failed to reserve duplicates')
activity_tool.SQLDict_rollback() # release locks db.query("ROLLBACK")
raise raise
if uid_list: if uid_list:
self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list) self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list)
......
...@@ -45,25 +45,7 @@ class SQLJoblib(SQLDict): ...@@ -45,25 +45,7 @@ class SQLJoblib(SQLDict):
sql_table = 'message_job' sql_table = 'message_job'
uid_group = 'portal_activity_job' uid_group = 'portal_activity_job'
def initialize(self, activity_tool, clear): _createMessageTable = 'SQLJoblib_createMessageTable'
"""
Initialize the message table using MYISAM Engine
"""
folder = activity_tool.getPortalObject().portal_skins.activity
try:
createMessageTable = folder.SQLJoblib_createMessageTable
except AttributeError:
return
if clear:
folder.SQLBase_dropMessageTable(table=self.sql_table)
createMessageTable()
else:
src = createMessageTable._upgradeSchema(create_if_not_exists=1,
initialize=self._initialize,
table=self.sql_table)
if src:
LOG('CMFActivity', INFO, "%r table upgraded\n%s"
% (self.sql_table, src))
def generateMessageUID(self, m): def generateMessageUID(self, m):
return (tuple(m.object_path), m.method_id, m.activity_kw.get('signature'), return (tuple(m.object_path), m.method_id, m.activity_kw.get('signature'),
...@@ -114,8 +96,9 @@ class SQLJoblib(SQLDict): ...@@ -114,8 +96,9 @@ class SQLJoblib(SQLDict):
else: else:
raise ValueError("Maximum retry for SQLBase_writeMessageList reached") raise ValueError("Maximum retry for SQLBase_writeMessageList reached")
def getProcessableMessageLoader(self, activity_tool, processing_node): def getProcessableMessageLoader(self, db, processing_node):
path_and_method_id_dict = {} path_and_method_id_dict = {}
quote = db.string_literal
def load(line): def load(line):
# getProcessableMessageList already fetch messages with the same # getProcessableMessageList already fetch messages with the same
# group_method_id, so what remains to be filtered on are path, method_id # group_method_id, so what remains to be filtered on are path, method_id
...@@ -128,19 +111,23 @@ class SQLJoblib(SQLDict): ...@@ -128,19 +111,23 @@ class SQLJoblib(SQLDict):
if original_uid is None: if original_uid is None:
m = Message.load(line.message, uid=uid, line=line) m = Message.load(line.message, uid=uid, line=line)
try: try:
result = activity_tool.SQLJoblib_selectDuplicatedLineList( # Select duplicates.
path=path, result = db.query("SELECT uid FROM message_job"
method_id=method_id, " WHERE processing_node = 0 AND path = %s AND signature = %s"
group_method_id=line.group_method_id, " AND method_id = %s AND group_method_id = %s FOR UPDATE" % (
signature=line.signature) quote(path), quote(line.signature),
reserve_uid_list = uid_list = [x.uid for x in result] quote(method_id), quote(line.group_method_id),
if reserve_uid_list: ), 0)[1]
activity_tool.SQLBase_reserveMessageList( uid_list = [x for x, in result]
table=self.sql_table, if uid_list:
processing_node=processing_node, db.query(
uid=reserve_uid_list) "UPDATE message_job SET processing_node=%s WHERE uid IN (%s)" % (
processing_node, ','.join(map(str, uid_list)),
))
db.query("COMMIT")
except: except:
self._log(WARNING, 'getDuplicateMessageUidList got an exception') self._log(WARNING, 'Failed to reserve duplicates')
db.query("ROLLBACK")
raise raise
if uid_list: if uid_list:
self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list) self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list)
......
...@@ -655,11 +655,6 @@ class ActivityTool (BaseTool): ...@@ -655,11 +655,6 @@ class ActivityTool (BaseTool):
activity_timing_log = False activity_timing_log = False
cancel_and_invoke_links_hidden = False cancel_and_invoke_links_hidden = False
def SQLDict_setPriority(self, **kw):
real_SQLDict_setPriority = getattr(self.aq_parent, 'SQLDict_setPriority')
LOG('ActivityTool', 0, real_SQLDict_setPriority(src__=1, **kw))
return real_SQLDict_setPriority(**kw)
# Filter content (ZMI)) # Filter content (ZMI))
def filtered_meta_types(self, user=None): def filtered_meta_types(self, user=None):
# Filters the list of available meta types. # Filters the list of available meta types.
...@@ -670,6 +665,9 @@ class ActivityTool (BaseTool): ...@@ -670,6 +665,9 @@ class ActivityTool (BaseTool):
meta_types.append(meta_type) meta_types.append(meta_type)
return meta_types return meta_types
def getSQLConnection(self):
return self.aq_inner.aq_parent.cmf_activity_sql_connection()
def maybeMigrateConnectionClass(self): def maybeMigrateConnectionClass(self):
connection_id = 'cmf_activity_sql_connection' connection_id = 'cmf_activity_sql_connection'
sql_connection = getattr(self, connection_id, None) sql_connection = getattr(self, connection_id, None)
...@@ -1127,14 +1125,16 @@ class ActivityTool (BaseTool): ...@@ -1127,14 +1125,16 @@ class ActivityTool (BaseTool):
def hasActivity(self, *args, **kw): def hasActivity(self, *args, **kw):
# Check in each queue if the object has deferred tasks # Check in each queue if the object has deferred tasks
# if not argument is provided, then check on self # if not argument is provided, then check on self
if len(args) > 0: if args:
obj = args[0] obj, = args
else: else:
obj = self obj = self
for activity in activity_dict.itervalues(): path = None if obj is None else '/'.join(obj.getPhysicalPath())
if activity.hasActivity(aq_inner(self), obj, **kw): db = self.getSQLConnection()
return True quote = db.string_literal
return False return bool(db.query("(%s)" % ") UNION ALL (".join(
activity.hasActivitySQL(quote, path=path, **kw)
for activity in activity_dict.itervalues()))[1])
security.declarePrivate('getActivityBuffer') security.declarePrivate('getActivityBuffer')
def getActivityBuffer(self, create_if_not_found=True): def getActivityBuffer(self, create_if_not_found=True):
...@@ -1443,8 +1443,9 @@ class ActivityTool (BaseTool): ...@@ -1443,8 +1443,9 @@ class ActivityTool (BaseTool):
""" """
if not(isinstance(message_uid_list, list)): if not(isinstance(message_uid_list, list)):
message_uid_list = [message_uid_list] message_uid_list = [message_uid_list]
self.SQLBase_makeMessageListAvailable(table=activity_dict[activity].sql_table, if message_uid_list:
uid=message_uid_list) activity_dict[activity].unreserveMessageList(self.getSQLConnection(),
0, message_uid_list)
if REQUEST is not None: if REQUEST is not None:
return REQUEST.RESPONSE.redirect('%s/%s' % ( return REQUEST.RESPONSE.redirect('%s/%s' % (
self.absolute_url(), 'view')) self.absolute_url(), 'view'))
...@@ -1470,8 +1471,8 @@ class ActivityTool (BaseTool): ...@@ -1470,8 +1471,8 @@ class ActivityTool (BaseTool):
""" """
if not(isinstance(message_uid_list, list)): if not(isinstance(message_uid_list, list)):
message_uid_list = [message_uid_list] message_uid_list = [message_uid_list]
self.SQLBase_delMessage(table=activity_dict[activity].sql_table, activity_dict[activity].deleteMessageList(
uid=message_uid_list) self.getSQLConnection(), message_uid_list)
if REQUEST is not None: if REQUEST is not None:
return REQUEST.RESPONSE.redirect('%s/%s' % ( return REQUEST.RESPONSE.redirect('%s/%s' % (
self.absolute_url(), 'view')) self.absolute_url(), 'view'))
...@@ -1523,10 +1524,7 @@ class ActivityTool (BaseTool): ...@@ -1523,10 +1524,7 @@ class ActivityTool (BaseTool):
""" """
Return the number of messages which match the given tag. Return the number of messages which match the given tag.
""" """
message_count = 0 return self.countMessage(tag=value)
for activity in activity_dict.itervalues():
message_count += activity.countMessageWithTag(aq_inner(self), value)
return message_count
security.declarePublic('countMessage') security.declarePublic('countMessage')
def countMessage(self, **kw): def countMessage(self, **kw):
...@@ -1540,10 +1538,11 @@ class ActivityTool (BaseTool): ...@@ -1540,10 +1538,11 @@ class ActivityTool (BaseTool):
tag : activities with a particular tag tag : activities with a particular tag
message_uid : activities with a particular uid message_uid : activities with a particular uid
""" """
message_count = 0 db = self.getSQLConnection()
for activity in activity_dict.itervalues(): quote = db.string_literal
message_count += activity.countMessage(aq_inner(self), **kw) return sum(x for x, in db.query("(%s)" % ") UNION ALL (".join(
return message_count activity.countMessageSQL(quote, **kw)
for activity in activity_dict.itervalues()))[1])
security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' ) security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
def newActiveProcess(self, REQUEST=None, **kw): def newActiveProcess(self, REQUEST=None, **kw):
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
processing_node
uid:list
</params>
UPDATE
<dtml-var table>
SET
processing_node=<dtml-sqlvar processing_node type="int">,
processing=0
WHERE
<dtml-sqltest uid type="int" multiple>
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
uid:list
</params>
DELETE FROM
<dtml-var table>
WHERE
<dtml-sqltest uid type="int" multiple>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table</params>
DROP TABLE IF EXISTS <dtml-var table>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params></params>
SELECT UTC_TIMESTAMP(6)
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
</params>
SELECT `priority`, `date` FROM
<dtml-var table>
WHERE
processing_node = 0
AND date <= UTC_TIMESTAMP(6)
ORDER BY priority, date
LIMIT 1
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
processing_node
to_date
count
group_method_id
</params>
SELECT
*
FROM
<dtml-var table>
WHERE
processing_node=0
AND date <= <dtml-sqlvar to_date type="datetime(6)">
<dtml-if expr="group_method_id is not None">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
</dtml-if>
ORDER BY
<dtml-comment>
During normal operation, sorting by date (as 2nd criteria) is fairer
for users and reduce the probability to do the same work several times
(think of an object that is modified several times in a short period of time).
</dtml-comment>
priority, date
LIMIT <dtml-sqlvar count type="int">
FOR UPDATE
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
path
method_id
active_process_uid
only_valid
only_invalid</params>
SELECT count(path) as message_count FROM
<dtml-var table>
WHERE 1 = 1
<dtml-if expr="path is not None">AND path = <dtml-sqlvar path type="string"> </dtml-if>
<dtml-if expr="method_id is not None">AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
<dtml-if expr="only_valid">AND processing_node > -2</dtml-if>
<dtml-if expr="only_invalid">AND processing_node < -1</dtml-if>
<dtml-if expr="active_process_uid is not None"> AND active_process_uid = <dtml-sqlvar active_process_uid type="int"> </dtml-if>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
uid</params>
UPDATE
<dtml-var table>
SET
processing_node=0,
processing=0
WHERE
<dtml-sqltest uid type="int" multiple>
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
uid</params>
UPDATE
<dtml-var table>
SET
processing_date = UTC_TIMESTAMP(6),
processing = 1
WHERE
<dtml-sqltest uid type="int" multiple>
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
uid:list
retry
delay
</params>
UPDATE
<dtml-var table>
SET
date = DATE_ADD(UTC_TIMESTAMP(6), INTERVAL
<dtml-sqlvar delay type="int"> SECOND)
<dtml-if expr="retry is not None">
, priority = priority + <dtml-sqlvar retry type="int">
, retry = retry + <dtml-sqlvar retry type="int">
</dtml-if>
WHERE
<dtml-sqltest uid type="int" multiple>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
processing_node
uid
</params>
UPDATE
<dtml-var table>
SET
processing_node=<dtml-sqlvar processing_node type="int">
WHERE
<dtml-sqltest uid type="int" multiple>
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
processing_node
group_method_id
count</params>
SELECT
*
FROM
<dtml-var table>
WHERE
processing_node = <dtml-sqlvar processing_node type="int">
<dtml-if expr="group_method_id is not None">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
</dtml-if>
<dtml-if expr="count is not None">
LIMIT <dtml-sqlvar count type="int">
</dtml-if>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
delay
processing_node</params>
UPDATE
<dtml-var table>
SET
date = DATE_SUB(date, INTERVAL <dtml-sqlvar delay type="int"> SECOND),
processing_date = DATE_SUB(processing_date, INTERVAL <dtml-sqlvar delay type="int"> SECOND)
<dtml-if expr="processing_node is not None">
WHERE <dtml-sqltest processing_node type="int">
</dtml-if>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params></params>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
processing_node
uid
</params>
UPDATE
message
SET
processing_node=<dtml-sqlvar processing_node type="int">
WHERE
<dtml-sqltest uid type="int" multiple>
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params></params>
ROLLBACK
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
path
method_id
group_method_id
</params>
SELECT uid FROM
message
WHERE
processing_node = 0
AND (path = <dtml-sqlvar path type="string">
OR path LIKE <dtml-sqlvar type="string"
expr="path.replace('_', r'\_') + '/%'">)
AND method_id = <dtml-sqlvar method_id type="string">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
FOR UPDATE
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
path
method_id
group_method_id
</params>
SELECT uid FROM
message
WHERE
processing_node = 0
AND path = <dtml-sqlvar path type="string">
AND method_id = <dtml-sqlvar method_id type="string">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
FOR UPDATE
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
path
method_id
group_method_id
processing_node
</params>
SELECT * FROM
message
WHERE
processing_node IN (0, <dtml-sqlvar processing_node type="int">)
AND <dtml-sqltest path type="string" multiple>
AND method_id = <dtml-sqlvar method_id type="string">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
ORDER BY path
LIMIT 1
FOR UPDATE
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
path
method_id
group_method_id
signature
</params>
SELECT uid FROM
message_job
WHERE
processing_node = 0
AND path = <dtml-sqlvar path type="string">
AND method_id = <dtml-sqlvar method_id type="string">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
AND signature = <dtml-sqlvar signature type="string">
FOR UPDATE
...@@ -2184,7 +2184,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2184,7 +2184,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity.getProcessableMessageList(activity_tool, 3) activity.getProcessableMessageList(activity_tool, 3)
self.commit() self.commit()
result = activity._getMessageList(activity_tool) result = activity._getMessageList(activity_tool.getSQLConnection())
try: try:
self.assertEqual(len([message self.assertEqual(len([message
for message in result for message in result
...@@ -2205,8 +2205,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2205,8 +2205,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
1) 1)
finally: finally:
# Clear activities from all nodes # Clear activities from all nodes
activity_tool.SQLBase_delMessage(table=SQLDict.sql_table, activity.deleteMessageList(activity_tool.getSQLConnection(),
uid=[message.uid for message in result]) [message.uid for message in result])
self.commit() self.commit()
def test_116_RaiseInCommitBeforeMessageExecution(self): def test_116_RaiseInCommitBeforeMessageExecution(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment