Commit 4fbe8187 authored by Julien Muchembled, committed by Cédric Le Ninivin

CMFActivity: drop DTML completely and use consecutive uids when possible

This moves the remaining DTML queries to Python, dropping the 'activity' skin.

Dealing with uid conflicts is easier if the inserted uids are consecutive:
now only one random value is generated, as the base uid. This also preserves the
order of insertion, which is desirable for performance reasons:
- No more random write in the primary index.
- When modifying several lines of several documents, with one document being
  processed at a time, we want any grouped activity (usually indexation) to
  follow the same order, so that a processing node prefers many lines from a
  few documents rather than mixing lines from too many documents at once.
  This is usually better for caches.
parent 042d2f9c
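The commit message above describes the new allocation scheme; the following is a minimal standalone sketch of it, not the actual CMFActivity code. The helper `allocate_consecutive_uids`, the in-memory `taken` set and the constant values are illustrative only; in the real code below, the collision is detected by MySQL raising an IntegrityError with the DUP_ENTRY code on the batched INSERT, after a single `SET @uid := <random>` per batch.

```python
# Sketch only: one random base uid per batch, rows get base+0, base+1, ...,
# and a collision restarts the whole batch with a fresh base.
from random import getrandbits

UID_SAFE_BITSIZE = 64          # same constant name as in SQLBase.py
UID_ALLOCATION_TRY_COUNT = 10  # assumed retry bound

def allocate_consecutive_uids(count, taken):
    """Return `count` consecutive uids that do not collide with `taken`."""
    for _ in range(UID_ALLOCATION_TRY_COUNT):
        base = getrandbits(UID_SAFE_BITSIZE)       # one random value per batch
        uids = [base + i for i in range(count)]    # rows become @uid+0, @uid+1, ...
        if not any(uid in taken for uid in uids):  # simplified DUP_ENTRY check
            taken.update(uids)
            return uids
        # collision: forget this base and retry with a fresh one
    raise RuntimeError("Maximum retry for uid allocation reached")

if __name__ == '__main__':
    taken = set()
    print(allocate_consecutive_uids(3, taken))     # e.g. [base, base+1, base+2]
```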
@@ -33,7 +33,6 @@ import MySQLdb
 from MySQLdb.constants.ER import DUP_ENTRY
 from DateTime import DateTime
 from Shared.DC.ZRDB.Results import Results
-from Shared.DC.ZRDB.DA import DatabaseError
 from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
 from ZODB.POSException import ConflictError
 from Products.CMFActivity.ActivityTool import (
@@ -70,13 +69,14 @@ def sort_message_key(message):
 _DequeueMessageException = Exception()

+def render_datetime(x):
+  return "%.4d-%.2d-%.2d %.2d:%.2d:%09.6f" % x.toZone('UTC').parts()[:6]
+
 # sqltest_dict ({'condition_name': <render_function>}) defines how to render
 # condition statements in the SQL query used by SQLBase.getMessageList
 def sqltest_dict():
   sqltest_dict = {}
   no_quote_type = int, float, long
-  def render_datetime(x):
-    return "%.4d-%.2d-%.2d %.2d:%.2d:%09.6f" % x.toZone('UTC').parts()[:6]
   def _(name, column=None, op="="):
     if column is None:
       column = name
@@ -126,22 +126,41 @@ class SQLBase(Queue):
   """
     Define a set of common methods for SQL-based storage of activities.
   """
-  _createMessageTable = 'SQLBase_createMessageTable'
+  def createTableSQL(self):
+    return """\
+CREATE TABLE %s (
+  `uid` BIGINT UNSIGNED NOT NULL,
+  `date` DATETIME(6) NOT NULL,
+  `path` VARCHAR(255) NOT NULL,
+  `active_process_uid` INT UNSIGNED NULL,
+  `method_id` VARCHAR(255) NOT NULL,
+  `processing_node` SMALLINT NOT NULL DEFAULT -1,
+  `priority` TINYINT NOT NULL DEFAULT 0,
+  `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
+  `tag` VARCHAR(255) NOT NULL,
+  `serialization_tag` VARCHAR(255) NOT NULL,
+  `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
+  `message` LONGBLOB NOT NULL,
+  PRIMARY KEY (`uid`),
+  KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
+  KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
+  KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
+  KEY (`path`),
+  KEY (`active_process_uid`),
+  KEY (`method_id`),
+  KEY (`priority`),
+  KEY (`tag`)
+) ENGINE=InnoDB""" % self.sql_table

   def initialize(self, activity_tool, clear):
-    folder = activity_tool.getPortalObject().portal_skins.activity
-    try:
-      createMessageTable = getattr(folder, self._createMessageTable)
-    except AttributeError:
-      return
+    db = activity_tool.getSQLConnection()
+    create = self.createTableSQL()
     if clear:
-      activity_tool.getSQLConnection().query(
-        "DROP TABLE IF EXISTS " + self.sql_table)
-      createMessageTable(table=self.sql_table)
+      db.query("DROP TABLE IF EXISTS " + self.sql_table)
+      db.query(create)
     else:
-      src = createMessageTable._upgradeSchema(create_if_not_exists=1,
-                                              initialize=self._initialize,
-                                              table=self.sql_table)
+      src = db.upgradeSchema(create, create_if_not_exists=1,
+                             initialize=self._initialize)
       if src:
         LOG('CMFActivity', INFO, "%r table upgraded\n%s"
             % (self.sql_table, src))
@@ -152,48 +171,56 @@ class SQLBase(Queue):
           % (self.sql_table, ", ".join(column_list)))

   def prepareQueueMessageList(self, activity_tool, message_list):
-    registered_message_list = [m for m in message_list if m.is_registered]
-    portal = activity_tool.getPortalObject()
-    for i in xrange(0, len(registered_message_list), MAX_MESSAGE_LIST_SIZE):
-      message_list = registered_message_list[i:i+MAX_MESSAGE_LIST_SIZE]
-      path_list = ['/'.join(m.object_path) for m in message_list]
-      active_process_uid_list = [m.active_process_uid for m in message_list]
-      method_id_list = [m.method_id for m in message_list]
-      priority_list = [m.activity_kw.get('priority', 1) for m in message_list]
-      date_list = [m.activity_kw.get('at_date') for m in message_list]
-      group_method_id_list = [m.getGroupId() for m in message_list]
-      tag_list = [m.activity_kw.get('tag', '') for m in message_list]
-      serialization_tag_list = [m.activity_kw.get('serialization_tag', '')
-                                for m in message_list]
-      processing_node_list = []
-      for m in message_list:
-        m.order_validation_text = x = self.getOrderValidationText(m)
-        processing_node_list.append(0 if x == 'none' else -1)
+    db = activity_tool.getSQLConnection()
+    quote = db.string_literal
+    def insert(reset_uid):
+      values = "),\n(".join(values_list)
+      del values_list[:]
       for _ in xrange(UID_ALLOCATION_TRY_COUNT):
+        if reset_uid:
+          reset_uid = False
+          # Overflow will result into IntegrityError.
+          db.query("SET @uid := %s" % getrandbits(UID_SAFE_BITSIZE))
         try:
-          portal.SQLBase_writeMessageList(
-            table=self.sql_table,
-            uid_list=[
-              getrandbits(UID_SAFE_BITSIZE)
-              for _ in xrange(len(message_list))
-            ],
-            path_list=path_list,
-            active_process_uid_list=active_process_uid_list,
-            method_id_list=method_id_list,
-            priority_list=priority_list,
-            message_list=map(Message.dump, message_list),
-            group_method_id_list=group_method_id_list,
-            date_list=date_list,
-            tag_list=tag_list,
-            processing_node_list=processing_node_list,
-            serialization_tag_list=serialization_tag_list)
+          db.query("INSERT INTO %s (uid,"
+            " path, active_process_uid, date, method_id, processing_node,"
+            " priority, group_method_id, tag, serialization_tag,"
+            " message) VALUES\n(%s)" % (self.sql_table, values))
         except MySQLdb.IntegrityError, (code, _):
           if code != DUP_ENTRY:
             raise
+          reset_uid = True
         else:
           break
       else:
-        raise ValueError("Maximum retry for SQLBase_writeMessageList reached")
+        raise RuntimeError("Maximum retry for prepareQueueMessageList reached")
+    i = 0
+    reset_uid = True
+    values_list = []
+    for m in message_list:
+      if m.is_registered:
+        active_process_uid = m.active_process_uid
+        order_validation_text = m.order_validation_text = \
+          self.getOrderValidationText(m)
+        date = m.activity_kw.get('at_date')
+        values_list.append(','.join((
+          '@uid+%s' % i,
+          quote('/'.join(m.object_path)),
+          'NULL' if active_process_uid is None else str(active_process_uid),
+          "UTC_TIMESTAMP(6)" if date is None else quote(render_datetime(date)),
+          quote(m.method_id),
+          '0' if order_validation_text == 'none' else '-1',
+          str(m.activity_kw.get('priority', 1)),
+          quote(m.getGroupId()),
+          quote(m.activity_kw.get('tag', '')),
+          quote(m.activity_kw.get('serialization_tag', '')),
+          quote(Message.dump(m)))))
+        i += 1
+        if not i % MAX_MESSAGE_LIST_SIZE:
+          insert(reset_uid)
+          reset_uid = False
+    if values_list:
+      insert(reset_uid)

   def _getMessageList(self, db, count=1000, src__=0, **kw):
     # XXX: Because most columns have NOT NULL constraint, conditions with None
...
@@ -45,56 +45,89 @@ class SQLJoblib(SQLDict):
   sql_table = 'message_job'
   uid_group = 'portal_activity_job'

-  _createMessageTable = 'SQLJoblib_createMessageTable'
+  def createTableSQL(self):
+    return """\
+CREATE TABLE %s (
+  `uid` BIGINT UNSIGNED NOT NULL,
+  `date` DATETIME(6) NOT NULL,
+  `path` VARCHAR(255) NOT NULL,
+  `active_process_uid` INT UNSIGNED NULL,
+  `method_id` VARCHAR(255) NOT NULL,
+  `processing_node` SMALLINT NOT NULL DEFAULT -1,
+  `priority` TINYINT NOT NULL DEFAULT 0,
+  `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
+  `tag` VARCHAR(255) NOT NULL,
+  `signature` BINARY(16) NOT NULL,
+  `serialization_tag` VARCHAR(255) NOT NULL,
+  `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
+  `message` LONGBLOB NOT NULL,
+  PRIMARY KEY (`uid`),
+  KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
+  KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
+  KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
+  KEY (`path`),
+  KEY (`active_process_uid`),
+  KEY (`method_id`),
+  KEY (`priority`),
+  KEY (`tag`)
+) ENGINE=InnoDB""" % self.sql_table

   def generateMessageUID(self, m):
     return (tuple(m.object_path), m.method_id, m.activity_kw.get('signature'),
             m.activity_kw.get('tag'), m.activity_kw.get('group_id'))

   def prepareQueueMessageList(self, activity_tool, message_list):
-    registered_message_list = [m for m in message_list if m.is_registered]
-    portal = activity_tool.getPortalObject()
-    for i in xrange(0, len(registered_message_list), MAX_MESSAGE_LIST_SIZE):
-      message_list = registered_message_list[i:i+MAX_MESSAGE_LIST_SIZE]
-      path_list = ['/'.join(m.object_path) for m in message_list]
-      active_process_uid_list = [m.active_process_uid for m in message_list]
-      method_id_list = [m.method_id for m in message_list]
-      priority_list = [m.activity_kw.get('priority', 1) for m in message_list]
-      date_list = [m.activity_kw.get('at_date') for m in message_list]
-      group_method_id_list = [m.getGroupId() for m in message_list]
-      tag_list = [m.activity_kw.get('tag', '') for m in message_list]
-      signature_list=[m.activity_kw.get('signature', '') for m in message_list]
-      serialization_tag_list = [m.activity_kw.get('serialization_tag', '')
-                                for m in message_list]
-      processing_node_list = []
-      for m in message_list:
-        m.order_validation_text = x = self.getOrderValidationText(m)
-        processing_node_list.append(0 if x == 'none' else -1)
+    db = activity_tool.getSQLConnection()
+    quote = db.string_literal
+    def insert(reset_uid):
+      values = "),\n(".join(values_list)
+      del values_list[:]
       for _ in xrange(UID_ALLOCATION_TRY_COUNT):
+        if reset_uid:
+          reset_uid = False
+          # Overflow will result into IntegrityError.
+          db.query("SET @uid := %s" % getrandbits(UID_SAFE_BITSIZE))
         try:
-          portal.SQLJoblib_writeMessage(
-            uid_list=[
-              getrandbits(UID_SAFE_BITSIZE)
-              for _ in xrange(len(message_list))
-            ],
-            path_list=path_list,
-            active_process_uid_list=active_process_uid_list,
-            method_id_list=method_id_list,
-            priority_list=priority_list,
-            message_list=map(Message.dump, message_list),
-            group_method_id_list=group_method_id_list,
-            date_list=date_list,
-            tag_list=tag_list,
-            processing_node_list=processing_node_list,
-            signature_list=signature_list,
-            serialization_tag_list=serialization_tag_list)
+          db.query("INSERT INTO %s (uid,"
+            " path, active_process_uid, date, method_id, processing_node,"
+            " priority, group_method_id, tag, signature, serialization_tag,"
+            " message) VALUES\n(%s)" % (self.sql_table, values))
         except MySQLdb.IntegrityError, (code, _):
           if code != DUP_ENTRY:
             raise
+          reset_uid = True
         else:
           break
       else:
-        raise ValueError("Maximum retry for SQLBase_writeMessageList reached")
+        raise ValueError("Maximum retry for prepareQueueMessageList reached")
+    i = 0
+    reset_uid = True
+    values_list = []
+    for m in message_list:
+      if m.is_registered:
+        active_process_uid = m.active_process_uid
+        order_validation_text = m.order_validation_text = \
+          self.getOrderValidationText(m)
+        date = m.activity_kw.get('at_date')
+        values_list.append(','.join((
+          '@uid+%s' % i,
+          quote('/'.join(m.object_path)),
+          'NULL' if active_process_uid is None else str(active_process_uid),
+          "UTC_TIMESTAMP(6)" if date is None else render_datetime(date),
+          quote(m.method_id),
+          '0' if order_validation_text == 'none' else '-1',
+          str(m.activity_kw.get('priority', 1)),
+          quote(m.getGroupId()),
+          quote(m.activity_kw.get('tag', '')),
+          quote(m.activity_kw.get('signature', '')),
+          quote(m.activity_kw.get('serialization_tag', '')),
+          quote(Message.dump(m)))))
+        i += 1
+        if not i % MAX_MESSAGE_LIST_SIZE:
+          insert(reset_uid)
+          reset_uid = False
+    if values_list:
+      insert(reset_uid)

   def getProcessableMessageLoader(self, db, processing_node):
     path_and_method_id_dict = {}
...
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table</params>
CREATE TABLE <dtml-var table> (
`uid` BIGINT UNSIGNED NOT NULL,
`date` DATETIME(6) NOT NULL,
`path` VARCHAR(255) NOT NULL,
`active_process_uid` INT UNSIGNED NULL,
`method_id` VARCHAR(255) NOT NULL,
`processing_node` SMALLINT NOT NULL DEFAULT -1,
`priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`message` LONGBLOB NOT NULL,
PRIMARY KEY (`uid`),
KEY (`path`),
KEY (`active_process_uid`),
KEY (`method_id`),
KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
KEY (`priority`),
KEY (`tag`)
) ENGINE=InnoDB
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
uid_list
path_list
active_process_uid_list
method_id_list
message_list
priority_list
processing_node_list
date_list
group_method_id_list
tag_list
serialization_tag_list
</params>
INSERT INTO <dtml-var table>
(uid, path, active_process_uid, date, method_id, processing_node, priority, group_method_id, tag, serialization_tag, message)
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(path_list))">
<dtml-if sequence-start><dtml-else>,</dtml-if>
(
<dtml-sqlvar expr="uid_list[loop_item]" type="int">,
<dtml-sqlvar expr="path_list[loop_item]" type="string">,
<dtml-sqlvar expr="active_process_uid_list[loop_item]" type="int" optional>,
<dtml-if expr="date_list[loop_item] is not None"><dtml-sqlvar expr="date_list[loop_item]" type="datetime(6)"><dtml-else>UTC_TIMESTAMP(6)</dtml-if>,
<dtml-sqlvar expr="method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="processing_node_list[loop_item]" type="int">,
<dtml-sqlvar expr="priority_list[loop_item]" type="int">,
<dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="serialization_tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="message_list[loop_item]" type="string">
)
</dtml-in>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params></params>
CREATE TABLE message_job (
`uid` BIGINT UNSIGNED NOT NULL,
`date` DATETIME(6) NOT NULL,
`path` VARCHAR(255) NOT NULL,
`active_process_uid` INT UNSIGNED NULL,
`method_id` VARCHAR(255) NOT NULL,
`processing_node` SMALLINT NOT NULL DEFAULT -1,
`priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
`signature` BINARY(16) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`message` LONGBLOB NOT NULL,
PRIMARY KEY (`uid`),
KEY (`path`),
KEY (`active_process_uid`),
KEY (`method_id`),
KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
KEY (`priority`),
KEY (`tag`)
) ENGINE=InnoDB
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
uid_list
path_list
active_process_uid_list
method_id_list
message_list
priority_list
processing_node_list
date_list
group_method_id_list
tag_list
signature_list
serialization_tag_list
</params>
INSERT INTO message_job
(uid, path, active_process_uid, date, method_id, processing_node, priority, group_method_id, tag, signature, serialization_tag, message)
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(path_list))">
<dtml-if sequence-start><dtml-else>,</dtml-if>
(
<dtml-sqlvar expr="uid_list[loop_item]" type="int">,
<dtml-sqlvar expr="path_list[loop_item]" type="string">,
<dtml-sqlvar expr="active_process_uid_list[loop_item]" type="int" optional>,
<dtml-if expr="date_list[loop_item] is not None"><dtml-sqlvar expr="date_list[loop_item]" type="datetime(6)"><dtml-else>UTC_TIMESTAMP(6)</dtml-if>,
<dtml-sqlvar expr="method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="processing_node_list[loop_item]" type="int">,
<dtml-sqlvar expr="priority_list[loop_item]" type="int">,
<dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="signature_list[loop_item]" type="string">,
<dtml-sqlvar expr="serialization_tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="message_list[loop_item]" type="string">
)
</dtml-in>
@@ -2211,11 +2211,8 @@ class ERP5Generator(PortalGenerator):
       createDirectoryView(ps, reg_key)

   def setupDefaultSkins(self, p):
-    from Products.CMFCore.DirectoryView import addDirectoryViews
-    from Products.CMFActivity import cmfactivity_globals
     ps = p.portal_skins
     self.addCMFDefaultDirectoryViews(p)
-    addDirectoryViews(ps, 'skins', cmfactivity_globals)
     ps.manage_addProduct['OFSP'].manage_addFolder(id='external_method')
     ps.manage_addProduct['OFSP'].manage_addFolder(id='custom')
     # Set the 'custom' layer a high priority, so it remains the first
@@ -2223,7 +2220,6 @@ class ERP5Generator(PortalGenerator):
     ps['custom'].manage_addProperty("business_template_skin_layer_priority", 100.0, "float")
     skin_folder_list = [ 'custom'
                        , 'external_method'
-                       , 'activity'
                        ] + self.CMFDEFAULT_FOLDER_LIST
     skin_folders = ', '.join(skin_folder_list)
     ps.addSkinSelection( 'View'
...