Commit ad2c7960 authored by Jean-Paul Smets's avatar Jean-Paul Smets

Added priority implementation


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@344 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 5c2172dd
...@@ -26,17 +26,25 @@ ...@@ -26,17 +26,25 @@
# #
############################################################################## ##############################################################################
import random
from Products.CMFActivity.ActivityTool import registerActivity from Products.CMFActivity.ActivityTool import registerActivity
from RAMDict import RAMDict from RAMDict import RAMDict
from zLOG import LOG from zLOG import LOG
MAX_RETRY = 10 MAX_RETRY = 5
DISTRIBUTABLE_STATE = -1 DISTRIBUTABLE_STATE = -1
INVOKE_ERROR_STATE = -2 INVOKE_ERROR_STATE = -2
VALIDATE_ERROR_STATE = -3 VALIDATE_ERROR_STATE = -3
priority_weight = \
[1] * 64 + \
[2] * 20 + \
[3] * 10 + \
[4] * 5 + \
[5] * 1
class SQLDict(RAMDict): class SQLDict(RAMDict):
""" """
A simple OOBTree based queue. It should be compatible with transactions A simple OOBTree based queue. It should be compatible with transactions
...@@ -45,10 +53,18 @@ class SQLDict(RAMDict): ...@@ -45,10 +53,18 @@ class SQLDict(RAMDict):
""" """
def queueMessage(self, activity_tool, m): def queueMessage(self, activity_tool, m):
activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) , method_id = m.method_id, message = self.dumpMessage(m)) activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) ,
method_id = m.method_id,
priority = m.activity_kw.get('priority', 1),
message = self.dumpMessage(m))
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
result = activity_tool.SQLDict_readMessage(processing_node=processing_node) priority = random.choice(priority_weight)
# Try to find a message at given priority level
result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority)
if len(result) == 0:
# If empty, take any message
result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=None)
if len(result) > 0: if len(result) > 0:
line = result[0] line = result[0]
path = line.path path = line.path
...@@ -57,7 +73,7 @@ class SQLDict(RAMDict): ...@@ -57,7 +73,7 @@ class SQLDict(RAMDict):
activity_tool.SQLDict_processMessage(path=path, method_id=method_id, processing_node = processing_node) activity_tool.SQLDict_processMessage(path=path, method_id=method_id, processing_node = processing_node)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
m = self.loadMessage(line.message) m = self.loadMessage(line.message)
if m.validate(self, activity_tool): if m.validate(self, activity_tool): # We should validate each time XXX in case someone is deleting it at the same time
retry = 0 retry = 0
while retry < MAX_RETRY: while retry < MAX_RETRY:
activity_tool.invoke(m) # Try to invoke the message activity_tool.invoke(m) # Try to invoke the message
...@@ -122,7 +138,7 @@ class SQLDict(RAMDict): ...@@ -122,7 +138,7 @@ class SQLDict(RAMDict):
get_transaction().abort() # Abort and retry get_transaction().abort() # Abort and retry
retry = retry + 1 retry = retry + 1
if m.is_executed: # Make sure message could be invoked if m.is_executed: # Make sure message could be invoked
activity_tool.SQLDict_delMessage(path=path, method_id=method_id, processing_node=processing_node) # Delete it activity_tool.SQLDict_delMessage(path=path, method_id=method_id, processing_node=None) # Delete it
if commit: get_transaction().commit() # If successful, commit if commit: get_transaction().commit() # If successful, commit
else: else:
if commit: get_transaction().abort() # If not, abort transaction and start a new one if commit: get_transaction().abort() # If not, abort transaction and start a new one
......
...@@ -13,9 +13,11 @@ CREATE TABLE `message` ( ...@@ -13,9 +13,11 @@ CREATE TABLE `message` (
`method_id` VARCHAR(40), `method_id` VARCHAR(40),
`processing_node` INT DEFAULT -1, `processing_node` INT DEFAULT -1,
`processing` INT DEFAULT 0, `processing` INT DEFAULT 0,
`priority` INT DEFAULT 0,
`message` BLOB, `message` BLOB,
KEY `path` (`path`), KEY `path` (`path`),
KEY `method_id` (`method_id`), KEY `method_id` (`method_id`),
KEY `processing_node` (`processing_node`), KEY `processing_node` (`processing_node`),
KEY `processing` (`processing`), KEY `processing` (`processing`),
KEY `priority` (`priority`),
) TYPE = InnoDB; ) TYPE = InnoDB;
...@@ -7,11 +7,12 @@ cache_time:0 ...@@ -7,11 +7,12 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>processing_node</params> <params>processing_node
priority</params>
SELECT * FROM SELECT * FROM
message message
WHERE WHERE
processing = 0 processing = 0
<dtml-if processing_node> <dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"></dtml-if>
AND processing_node = <dtml-sqlvar processing_node type="int"> <dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
</dtml-if>
\ No newline at end of file
...@@ -9,11 +9,13 @@ class_file: ...@@ -9,11 +9,13 @@ class_file:
</dtml-comment> </dtml-comment>
<params>path <params>path
method_id method_id
processing_node</params> processing_node
priority</params>
SELECT * FROM SELECT * FROM
message message
WHERE WHERE
processing = 0 processing = 0
<dtml-if processing_node>AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if> <dtml-if processing_node>AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
<dtml-if priority>AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
<dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if> <dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if>
<dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if> <dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
...@@ -9,7 +9,8 @@ class_file: ...@@ -9,7 +9,8 @@ class_file:
</dtml-comment> </dtml-comment>
<params>path <params>path
method_id method_id
priority
message</params> message</params>
INSERT INTO message INSERT INTO message
VALUES VALUES
(<dtml-sqlvar path type="string">,<dtml-sqlvar method_id type="string">,-1,0,<dtml-sqlvar message type="string">); (<dtml-sqlvar path type="string">,<dtml-sqlvar method_id type="string">,-1,0,<dtml-sqlvar priority type="int">,<dtml-sqlvar message type="string">);
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment