Commit eca363aa authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: drop useless 'order_validation_text' column from 'message' table

For even more refactoring between SQLDict & SQLQueue,
which now uses SQL tables with the same schema.
parent 3d644bde
...@@ -42,6 +42,9 @@ from Products.CMFActivity.ActivityRuntimeEnvironment import ( ...@@ -42,6 +42,9 @@ from Products.CMFActivity.ActivityRuntimeEnvironment import (
from Queue import Queue, VALIDATION_ERROR_DELAY, VALID, INVALID_PATH from Queue import Queue, VALIDATION_ERROR_DELAY, VALID, INVALID_PATH
from Products.CMFActivity.Errors import ActivityFlushError from Products.CMFActivity.Errors import ActivityFlushError
# TODO: Limit by size in bytes instead of number of rows.
MAX_MESSAGE_LIST_SIZE = 100
def sort_message_key(message): def sort_message_key(message):
# same sort key as in SQLBase.getMessageList # same sort key as in SQLBase.getMessageList
return message.line.priority, message.line.date, message.uid return message.line.priority, message.line.date, message.uid
...@@ -95,8 +98,7 @@ class SQLBase(Queue): ...@@ -95,8 +98,7 @@ class SQLBase(Queue):
def initialize(self, activity_tool, clear): def initialize(self, activity_tool, clear):
folder = activity_tool.getPortalObject().portal_skins.activity folder = activity_tool.getPortalObject().portal_skins.activity
try: try:
createMessageTable = getattr(folder, createMessageTable = folder.SQLBase_createMessageTable
self.__class__.__name__ + '_createMessageTable')
except AttributeError: except AttributeError:
return return
if clear: if clear:
...@@ -105,7 +107,8 @@ class SQLBase(Queue): ...@@ -105,7 +107,8 @@ class SQLBase(Queue):
column_list = [] column_list = []
try: try:
src = createMessageTable._upgradeSchema(added_list=column_list, src = createMessageTable._upgradeSchema(added_list=column_list,
modified_list=column_list) modified_list=column_list,
table=self.sql_table)
except ProgrammingError, e: except ProgrammingError, e:
if e[0] != NO_SUCH_TABLE: if e[0] != NO_SUCH_TABLE:
raise raise
...@@ -118,7 +121,41 @@ class SQLBase(Queue): ...@@ -118,7 +121,41 @@ class SQLBase(Queue):
LOG('CMFActivity', INFO, "%r table upgraded\n%s" LOG('CMFActivity', INFO, "%r table upgraded\n%s"
% (self.sql_table, src)) % (self.sql_table, src))
return return
createMessageTable() createMessageTable(table=self.sql_table)
def prepareQueueMessageList(self, activity_tool, message_list):
registered_message_list = [m for m in message_list if m.is_registered]
portal = activity_tool.getPortalObject()
for i in xrange(0, len(registered_message_list), MAX_MESSAGE_LIST_SIZE):
message_list = registered_message_list[i:i+MAX_MESSAGE_LIST_SIZE]
uid_list = portal.portal_ids.generateNewIdList(self.uid_group,
id_count=len(message_list), id_generator='uid')
path_list = ['/'.join(m.object_path) for m in message_list]
active_process_uid_list = [m.active_process_uid for m in message_list]
method_id_list = [m.method_id for m in message_list]
priority_list = [m.activity_kw.get('priority', 1) for m in message_list]
date_list = [m.activity_kw.get('at_date') for m in message_list]
group_method_id_list = [m.getGroupId() for m in message_list]
tag_list = [m.activity_kw.get('tag', '') for m in message_list]
serialization_tag_list = [m.activity_kw.get('serialization_tag', '')
for m in message_list]
processing_node_list = []
for m in message_list:
m.order_validation_text = x = self.getOrderValidationText(m)
processing_node_list.append(0 if x == 'none' else -1)
portal.SQLBase_writeMessageList(
table=self.sql_table,
uid_list=uid_list,
path_list=path_list,
active_process_uid_list=active_process_uid_list,
method_id_list=method_id_list,
priority_list=priority_list,
message_list=map(Message.dump, message_list),
group_method_id_list=group_method_id_list,
date_list=date_list,
tag_list=tag_list,
processing_node_list=processing_node_list,
serialization_tag_list=serialization_tag_list)
def getNow(self, context): def getNow(self, context):
""" """
......
...@@ -40,8 +40,6 @@ MAX_VALIDATED_LIMIT = 1000 ...@@ -40,8 +40,6 @@ MAX_VALIDATED_LIMIT = 1000
# Read up to this number of messages to validate. # Read up to this number of messages to validate.
READ_MESSAGE_LIMIT = 1000 READ_MESSAGE_LIMIT = 1000
MAX_MESSAGE_LIST_SIZE = 100
class SQLDict(SQLBase): class SQLDict(SQLBase):
""" """
A simple OOBTree based queue. It should be compatible with transactions A simple OOBTree based queue. It should be compatible with transactions
...@@ -49,48 +47,7 @@ class SQLDict(SQLBase): ...@@ -49,48 +47,7 @@ class SQLDict(SQLBase):
because use of OOBTree. because use of OOBTree.
""" """
sql_table = 'message' sql_table = 'message'
uid_group = 'portal_activity'
# Transaction commit methods
def prepareQueueMessageList(self, activity_tool, message_list):
registered_message_list = [m for m in message_list if m.is_registered]
for i in xrange(0, len(registered_message_list), MAX_MESSAGE_LIST_SIZE):
message_list = registered_message_list[i:i + MAX_MESSAGE_LIST_SIZE]
path_list = ['/'.join(m.object_path) for m in message_list]
active_process_uid_list = [m.active_process_uid for m in message_list]
method_id_list = [m.method_id for m in message_list]
priority_list = [m.activity_kw.get('priority', 1) for m in message_list]
date_list = [m.activity_kw.get('at_date') for m in message_list]
group_method_id_list = [m.getGroupId() for m in message_list]
tag_list = [m.activity_kw.get('tag', '') for m in message_list]
serialization_tag_list = [m.activity_kw.get('serialization_tag', '')
for m in message_list]
order_validation_text_list = []
processing_node_list = []
for m in message_list:
m.order_validation_text = x = self.getOrderValidationText(m)
# BBB: 'order_validation_text' SQL column is now useless.
# If we remove it, 'message' & 'message_queue' can have the same
# schema, and much code can be merged into SQLBase.
order_validation_text_list.append(x)
processing_node_list.append(0 if x == 'none' else -1)
dumped_message_list = map(Message.dump, message_list)
# The uid_list also is store in the ZODB
uid_list = activity_tool.getPortalObject().portal_ids.generateNewIdList(
id_generator='uid', id_group='portal_activity',
id_count=len(message_list))
activity_tool.SQLDict_writeMessageList(
uid_list=uid_list,
path_list=path_list,
active_process_uid_list=active_process_uid_list,
method_id_list=method_id_list,
priority_list=priority_list,
message_list=dumped_message_list,
date_list=date_list,
group_method_id_list=group_method_id_list,
tag_list=tag_list,
serialization_tag_list=serialization_tag_list,
processing_node_list=processing_node_list,
order_validation_text_list=order_validation_text_list)
def generateMessageUID(self, m): def generateMessageUID(self, m):
return (tuple(m.object_path), m.method_id, m.activity_kw.get('tag'), m.activity_kw.get('group_id')) return (tuple(m.object_path), m.method_id, m.activity_kw.get('tag'), m.activity_kw.get('group_id'))
......
...@@ -38,8 +38,6 @@ MAX_VALIDATED_LIMIT = 1000 ...@@ -38,8 +38,6 @@ MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate. # Read this many messages to validate.
READ_MESSAGE_LIMIT = 1000 READ_MESSAGE_LIMIT = 1000
MAX_MESSAGE_LIST_SIZE = 100
class SQLQueue(SQLBase): class SQLQueue(SQLBase):
""" """
A simple OOBTree based queue. It should be compatible with transactions A simple OOBTree based queue. It should be compatible with transactions
...@@ -47,41 +45,7 @@ class SQLQueue(SQLBase): ...@@ -47,41 +45,7 @@ class SQLQueue(SQLBase):
because use of OOBTree. because use of OOBTree.
""" """
sql_table = 'message_queue' sql_table = 'message_queue'
uid_group = 'portal_activity_queue'
def prepareQueueMessageList(self, activity_tool, message_list):
registered_message_list = [m for m in message_list if m.is_registered]
for i in xrange(0, len(registered_message_list), MAX_MESSAGE_LIST_SIZE):
message_list = registered_message_list[i:i + MAX_MESSAGE_LIST_SIZE]
# The uid_list also is store in the ZODB
uid_list = activity_tool.getPortalObject().portal_ids.generateNewIdList(
id_generator='uid', id_group='portal_activity_queue',
id_count=len(message_list))
path_list = ['/'.join(m.object_path) for m in message_list]
active_process_uid_list = [m.active_process_uid for m in message_list]
method_id_list = [m.method_id for m in message_list]
priority_list = [m.activity_kw.get('priority', 1) for m in message_list]
date_list = [m.activity_kw.get('at_date') for m in message_list]
group_method_id_list = [m.getGroupId() for m in message_list]
tag_list = [m.activity_kw.get('tag', '') for m in message_list]
serialization_tag_list = [m.activity_kw.get('serialization_tag', '')
for m in message_list]
processing_node_list = []
for m in message_list:
m.order_validation_text = x = self.getOrderValidationText(m)
processing_node_list.append(0 if x == 'none' else -1)
dumped_message_list = map(Message.dump, message_list)
activity_tool.SQLQueue_writeMessageList(
uid_list=uid_list,
path_list=path_list,
active_process_uid_list=active_process_uid_list,
method_id_list=method_id_list,
priority_list=priority_list,
message_list=dumped_message_list,
group_method_id_list=group_method_id_list,
date_list=date_list,
tag_list=tag_list,
processing_node_list=processing_node_list,
serialization_tag_list=serialization_tag_list)
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
offset = 0 offset = 0
......
...@@ -583,7 +583,7 @@ class ActivityTool (Folder, UniqueObject): ...@@ -583,7 +583,7 @@ class ActivityTool (Folder, UniqueObject):
distributingNode = '' distributingNode = ''
_nodes = () _nodes = ()
activity_creation_trace = False activity_creation_trace = False
activity_tracking = False activity_tracking = True
activity_timing_log = False activity_timing_log = False
cancel_and_invoke_links_hidden = False cancel_and_invoke_links_hidden = False
......
...@@ -7,8 +7,8 @@ cache_time:0 ...@@ -7,8 +7,8 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params></params> <params>table</params>
CREATE TABLE `message_queue` ( CREATE TABLE <dtml-var table> (
`uid` INT UNSIGNED NOT NULL, `uid` INT UNSIGNED NOT NULL,
`date` DATETIME NOT NULL, `date` DATETIME NOT NULL,
`path` VARCHAR(255) NOT NULL, `path` VARCHAR(255) NOT NULL,
......
...@@ -7,7 +7,8 @@ cache_time:0 ...@@ -7,7 +7,8 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>uid_list <params>table
uid_list
path_list path_list
active_process_uid_list active_process_uid_list
method_id_list method_id_list
...@@ -19,7 +20,7 @@ group_method_id_list ...@@ -19,7 +20,7 @@ group_method_id_list
tag_list tag_list
serialization_tag_list serialization_tag_list
</params> </params>
INSERT INTO message_queue INSERT INTO <dtml-var table>
(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, message) (uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, message)
VALUES VALUES
<dtml-in prefix="loop" expr="_.range(_.len(path_list))"> <dtml-in prefix="loop" expr="_.range(_.len(path_list))">
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params></params>
CREATE TABLE `message` (
`uid` INT UNSIGNED NOT NULL,
`date` DATETIME NOT NULL,
`path` VARCHAR(255) NOT NULL,
`active_process_uid` INT UNSIGNED NULL,
`method_id` VARCHAR(255) NOT NULL,
`processing_node` SMALLINT NOT NULL DEFAULT -1,
`processing` TINYINT NOT NULL DEFAULT 0,
`processing_date` DATETIME,
`priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`order_validation_text` VARCHAR(255) NOT NULL,
`message` LONGBLOB NOT NULL,
PRIMARY KEY (`uid`),
KEY (`path`),
KEY (`active_process_uid`),
KEY (`method_id`),
KEY `processing_node_processing` (`processing_node`, `processing`),
KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
KEY (`priority`),
KEY (`tag`),
KEY (`order_validation_text`)
) ENGINE=InnoDB
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>uid_list
path_list
active_process_uid_list
method_id_list
message_list
priority_list
date_list
processing_node_list
group_method_id_list
tag_list
serialization_tag_list
order_validation_text_list</params>
INSERT INTO message
(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, order_validation_text, message)
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(path_list))">
<dtml-if sequence-start><dtml-else>,</dtml-if>
(
<dtml-sqlvar expr="uid_list[loop_item]" type="int">,
<dtml-sqlvar expr="path_list[loop_item]" type="string">,
<dtml-sqlvar expr="active_process_uid_list[loop_item]" type="int" optional>,
<dtml-if expr="date_list is not None"><dtml-if expr="date_list[loop_item] is not None"><dtml-sqlvar expr="date_list[loop_item]" type="datetime"><dtml-else>UTC_TIMESTAMP()</dtml-if><dtml-else>UTC_TIMESTAMP()</dtml-if>,
<dtml-sqlvar expr="method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="processing_node_list[loop_item]" type="int">,
0,
<dtml-sqlvar expr="priority_list[loop_item]" type="int">,
<dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="serialization_tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="order_validation_text_list[loop_item]" type="string">,
<dtml-sqlvar expr="message_list[loop_item]" type="string">
)
</dtml-in>
...@@ -2682,56 +2682,29 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2682,56 +2682,29 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
DB.query = DB.original_query DB.query = DB.original_query
del DB.original_query del DB.original_query
def test_MAX_MESSAGE_LIST_SIZE_SQLQueue(self): def test_MAX_MESSAGE_LIST_SIZE(self):
from Products.CMFActivity.Activity import SQLQueue from Products.CMFActivity.Activity import SQLBase
old_MAX_MESSAGE_LIST_SIZE = SQLQueue.MAX_MESSAGE_LIST_SIZE MAX_MESSAGE_LIST_SIZE = SQLBase.MAX_MESSAGE_LIST_SIZE
SQLQueue.MAX_MESSAGE_LIST_SIZE = 3
try:
global call_count
call_count = 0
def dummy_counter(self):
global call_count
call_count += 1
Organisation.dummy_counter = dummy_counter
o = self.portal.organisation_module.newContent(portal_type='Organisation',)
for i in range(10):
o.activate(activity='SQLQueue').dummy_counter()
self.flushAllActivities()
self.assertEqual(call_count, 10)
finally:
SQLQueue.MAX_MESSAGE_LIST_SIZE = old_MAX_MESSAGE_LIST_SIZE
del Organisation.dummy_counter
def test_MAX_MESSAGE_LIST_SIZE_SQLDict(self):
from Products.CMFActivity.Activity import SQLDict
old_MAX_MESSAGE_LIST_SIZE = SQLDict.MAX_MESSAGE_LIST_SIZE
SQLDict.MAX_MESSAGE_LIST_SIZE = 3
try: try:
global call_count SQLBase.MAX_MESSAGE_LIST_SIZE = 3
call_count = 0 def dummy_counter(o):
def dummy_counter(self): self.__call_count += 1
global call_count o = self.portal.organisation_module.newContent(portal_type='Organisation')
call_count += 1
o = self.portal.organisation_module.newContent(portal_type='Organisation',)
for i in range(10): for activity in "SQLDict", "SQLQueue":
method_name = 'dummy_counter_%s' % i self.__call_count = 0
setattr(Organisation, method_name, dummy_counter) try:
getattr(o.activate(activity='SQLDict'), method_name)() for i in xrange(10):
method_name = 'dummy_counter_%s' % i
self.flushAllActivities() getattr(o.activate(activity=activity), method_name)()
self.assertEqual(call_count, 10) setattr(Organisation, method_name, dummy_counter)
self.flushAllActivities()
finally:
for i in xrange(10):
delattr(Organisation, 'dummy_counter_%s' % i)
self.assertEqual(self.__call_count, 10)
finally: finally:
SQLDict.MAX_MESSAGE_LIST_SIZE = old_MAX_MESSAGE_LIST_SIZE SQLBase.MAX_MESSAGE_LIST_SIZE = MAX_MESSAGE_LIST_SIZE
for i in range(10):
method_name = 'dummy_counter_%s' % i
delattr(Organisation, method_name)
def test_115_TestSerializationTagSQLDictPreventsParallelExecution(self, quiet=0, run=run_all_test): def test_115_TestSerializationTagSQLDictPreventsParallelExecution(self, quiet=0, run=run_all_test):
""" """
...@@ -3032,10 +3005,10 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -3032,10 +3005,10 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
When an activity creates several activities, make sure that all newly When an activity creates several activities, make sure that all newly
created activities are not commited if there is ZODB Conflict error created activities are not commited if there is ZODB Conflict error
""" """
from Products.CMFActivity.Activity import SQLQueue from Products.CMFActivity.Activity import SQLBase
old_MAX_MESSAGE_LIST_SIZE = SQLQueue.MAX_MESSAGE_LIST_SIZE MAX_MESSAGE_LIST_SIZE = SQLBase.MAX_MESSAGE_LIST_SIZE
SQLQueue.MAX_MESSAGE_LIST_SIZE = 1
try: try:
SQLBase.MAX_MESSAGE_LIST_SIZE = 1
activity_tool = self.getPortal().portal_activities activity_tool = self.getPortal().portal_activities
def doSomething(self): def doSomething(self):
self.serialize() self.serialize()
...@@ -3057,7 +3030,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -3057,7 +3030,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(['doSomething'],[x.method_id for x in message_list]) self.assertEqual(['doSomething'],[x.method_id for x in message_list])
activity_tool.manageClearActivities() activity_tool.manageClearActivities()
finally: finally:
SQLQueue.MAX_MESSAGE_LIST_SIZE = old_MAX_MESSAGE_LIST_SIZE SQLBase.MAX_MESSAGE_LIST_SIZE = MAX_MESSAGE_LIST_SIZE
def test_125_CheckDistributeWithSerializationTagAndGroupMethodId(self): def test_125_CheckDistributeWithSerializationTagAndGroupMethodId(self):
activity_tool = self.portal.portal_activities activity_tool = self.portal.portal_activities
......
...@@ -276,9 +276,9 @@ _create_search = re.compile(r'\bCREATE\s+TABLE\s+(`?)(\w+)\1\s+', re.I).search ...@@ -276,9 +276,9 @@ _create_search = re.compile(r'\bCREATE\s+TABLE\s+(`?)(\w+)\1\s+', re.I).search
_key_search = re.compile(r'\bKEY\s+(`[^`]+`)\s+(.+)').search _key_search = re.compile(r'\bKEY\s+(`[^`]+`)\s+(.+)').search
def DA_upgradeSchema(self, connection_id=None, added_list=None, def DA_upgradeSchema(self, connection_id=None, added_list=None,
modified_list=None, src__=0): modified_list=None, src__=0, **kw):
query = self.getPortalObject()[connection_id or self.connection_id]().query query = self.getPortalObject()[connection_id or self.connection_id]().query
src = self(src__=1) src = self(src__=1, **kw)
m = _create_search(src) m = _create_search(src)
if m is None: if m is None:
return return
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment