Commit 13e9eaa2 authored by Jean-Paul Smets's avatar Jean-Paul Smets

Added deferred / transaction API


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@661 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 033c5022
...@@ -80,8 +80,19 @@ class Queue: ...@@ -80,8 +80,19 @@ class Queue:
self.is_initialized = 1 self.is_initialized = 1
def queueMessage(self, activity_tool, m): def queueMessage(self, activity_tool, m):
pass activity_tool.deferredQueueMessage(self, m)
m.is_queued = 1
def deleteMessage(self, activity_tool, m):
activity_tool.deferredDeleteMessage(self, m)
m.is_deleted = 1
def isDeleted(self, m):
return m.is_deleted
def isQueued(self, m):
return m.is_queued
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
pass pass
...@@ -137,8 +148,10 @@ class Queue: ...@@ -137,8 +148,10 @@ class Queue:
# Stop queue / activities in queue for given process # Stop queue / activities in queue for given process
pass pass
def loadMessage(self, s): def loadMessage(self, s, **kw):
return pickle.loads(s) m = pickle.loads(s)
m.__dict__.update(kw)
return m
def dumpMessage(self, m): def dumpMessage(self, m):
return pickle.dumps(m) return pickle.dumps(m)
...@@ -146,3 +159,16 @@ class Queue: ...@@ -146,3 +159,16 @@ class Queue:
def getMessageList(self, activity_tool, processing_node=None): def getMessageList(self, activity_tool, processing_node=None):
return [] return []
# Transaction Management
def prepareQueueMessage(self, activity_tool, m):
pass
def finishQueueMessage(self, activity_tool, m):
pass
def prepareDequeueMessage(self, activity_tool, m):
pass
def finishDequeueMessage(self, activity_tool, m):
pass
\ No newline at end of file
...@@ -45,9 +45,14 @@ class RAMDict(Queue): ...@@ -45,9 +45,14 @@ class RAMDict(Queue):
Queue.__init__(self) Queue.__init__(self)
self.dict = {} self.dict = {}
def queueMessage(self, activity_tool, m): def finishQueueMessage(self, activity_tool, m):
self.dict[(m.object_path, m.method_id)] = m self.dict[(m.object_path, m.method_id)] = m
def finishDeleteMessage(self, activity_tool, message):
for key, m in self.dict.items():
if m.object_path == message.object_path and m.method_id == message.method_id:
del self.dict[(m.object_path, m.method_id)]
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
if len(self.dict.keys()) is 0: if len(self.dict.keys()) is 0:
return 1 # Go to sleep return 1 # Go to sleep
...@@ -67,13 +72,14 @@ class RAMDict(Queue): ...@@ -67,13 +72,14 @@ class RAMDict(Queue):
def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw):
for key, m in self.dict.items(): for key, m in self.dict.items():
if m.object_path == object_path: if not m.is_deleted:
LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path)) if m.object_path == object_path:
if invoke: activity_tool.invoke(m) LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path))
del self.dict[key] if invoke: activity_tool.invoke(m)
else: self.deleteMessage(m)
pass else:
#LOG('CMFActivity RAMDict: ', 0, 'not flushing object %s' % '/'.join(m.object_path)) pass
#LOG('CMFActivity RAMDict: ', 0, 'not flushing object %s' % '/'.join(m.object_path))
def getMessageList(self, activity_tool, processing_node=None): def getMessageList(self, activity_tool, processing_node=None):
return self.dict.values() return self.dict.values()
......
...@@ -34,20 +34,31 @@ class RAMQueue(Queue): ...@@ -34,20 +34,31 @@ class RAMQueue(Queue):
""" """
A simple RAM based queue A simple RAM based queue
""" """
message_queue_id = 0
def __init__(self): def __init__(self):
Queue.__init__(self) Queue.__init__(self)
self.queue = [] self.queue = []
def queueMessage(self, activity_tool, m): def finishQueueMessage(self, activity_tool, m):
self.message_queue_id = self.message_queue_id + 1
m.message_queue_id = self.message_queue_id
self.queue.append(m) self.queue.append(m)
def finishDeleteMessage(self, activity_tool, m):
i = 0
for my_message in self.queue:
if my_message.message_queue_id == m.message_queue_id:
del self.queue[i]
return
i = i + 1
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
if len(self.queue) is 0: if len(self.queue) is 0:
return 1 # Go to sleep return 1 # Go to sleep
m = self.queue[0] m = self.queue[0]
activity_tool.invoke(m) activity_tool.invoke(m)
del self.queue[0] self.deleteMessage(m)
return 0 # Keep on ticking return 0 # Keep on ticking
def hasActivity(self, activity_tool, object, **kw): def hasActivity(self, activity_tool, object, **kw):
...@@ -58,14 +69,11 @@ class RAMQueue(Queue): ...@@ -58,14 +69,11 @@ class RAMQueue(Queue):
return 0 return 0
def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw):
new_queue = []
for m in self.queue: for m in self.queue:
if m.object_path == object_path: if not m.is_deleted:
if invoke: if m.object_path == object_path:
activity_tool.invoke(m) if invoke: activity_tool.invoke(m)
else: self.deleteMessage(m)
new_queue.append(m)
self.queue = new_queue
def getMessageList(self, activity_tool, processing_node=None): def getMessageList(self, activity_tool, processing_node=None):
new_queue = [] new_queue = []
......
...@@ -52,13 +52,19 @@ class SQLDict(RAMDict): ...@@ -52,13 +52,19 @@ class SQLDict(RAMDict):
because use of OOBTree. because use of OOBTree.
""" """
def queueMessage(self, activity_tool, m): def prepareQueueMessage(self, activity_tool, m):
activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) , activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) ,
method_id = m.method_id, method_id = m.method_id,
priority = m.activity_kw.get('priority', 1), priority = m.activity_kw.get('priority', 1),
message = self.dumpMessage(m)) message = self.dumpMessage(m))
# Also store uid of activity # Also store uid of activity
def prepareDeleteMessage(self, activity_tool, m):
# Erase all messages in a single transaction
uid_list = activity_tool.SQLDict_readUidList(path=m.object_path, method_id=m.method_id,processing_node=None)
uid_list = map(lambda x:x.uid, uid_list)
activity_tool.SQLDict_delMessage(uid = uid_list)
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
priority = random.choice(priority_weight) priority = random.choice(priority_weight)
# Try to find a message at given priority level # Try to find a message at given priority level
...@@ -144,20 +150,19 @@ class SQLDict(RAMDict): ...@@ -144,20 +150,19 @@ class SQLDict(RAMDict):
NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible
""" """
path = '/'.join(object_path) path = '/'.join(object_path)
uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id,processing_node=None)
uid_list = map(lambda x:x.uid, uid_list)
# LOG('Flush', 0, str((path, invoke, method_id))) # LOG('Flush', 0, str((path, invoke, method_id)))
if invoke: result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None)
result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None) method_dict = {}
method_dict = {} # Parse each message
# Parse each message for line in result:
for line in result: path = line.path
path = line.path method_id = line.method_id
method_id = line.method_id if not method_dict.has_key(method_id):
if not method_dict.has_key(method_id): # Only invoke once (it would be different for a queue)
# Only invoke once (it would be different for a queue) method_dict[method_id] = 1
method_dict[method_id] = 1 m = self.loadMessage(line.message)
m = self.loadMessage(line.message) self.deleteMessage(m)
if invoke:
# First Validate # First Validate
if m.validate(self, activity_tool): if m.validate(self, activity_tool):
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
...@@ -169,9 +174,6 @@ class SQLDict(RAMDict): ...@@ -169,9 +174,6 @@ class SQLDict(RAMDict):
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'The document %s does not exist' % path) 'The document %s does not exist' % path)
# Erase all messages in a single transaction
if len(uid_list) > 0:
activity_tool.SQLDict_delMessage(uid = uid_list) # Delete all "old" messages (not -1 processing)
def start(self, activity_tool, active_process=None): def start(self, activity_tool, active_process=None):
uid_list = activity_tool.SQLDict_readUidList(path=path, active_process=active_process) uid_list = activity_tool.SQLDict_readUidList(path=path, active_process=active_process)
......
...@@ -52,12 +52,16 @@ class SQLQueue(RAMQueue): ...@@ -52,12 +52,16 @@ class SQLQueue(RAMQueue):
because use of OOBTree. because use of OOBTree.
""" """
def queueMessage(self, activity_tool, m): def prepareQueueMessage(self, activity_tool, m):
activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) , activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) ,
method_id = m.method_id, method_id = m.method_id,
priority = m.activity_kw.get('priority', 1), priority = m.activity_kw.get('priority', 1),
message = self.dumpMessage(m)) message = self.dumpMessage(m))
def prepareDeleteMessage(self, activity_tool, m):
# Erase all messages in a single transaction
activity_tool.SQLQueue_delMessage(uid = m.uid)
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
priority = random.choice(priority_weight) priority = random.choice(priority_weight)
# Try to find a message at given priority level # Try to find a message at given priority level
...@@ -128,17 +132,18 @@ class SQLQueue(RAMQueue): ...@@ -128,17 +132,18 @@ class SQLQueue(RAMQueue):
return # Do nothing here to precent overlocking return # Do nothing here to precent overlocking
path = '/'.join(object_path) path = '/'.join(object_path)
# LOG('Flush', 0, str((path, invoke, method_id))) # LOG('Flush', 0, str((path, invoke, method_id)))
if invoke: result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None)
result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None) method_dict = {}
method_dict = {} # Parse each message
# Parse each message for line in result:
for line in result: path = line.path
path = line.path method_id = line.method_id
method_id = line.method_id if not method_dict.has_key(method_id):
if not method_dict.has_key(method_id): # Only invoke once (it would be different for a queue)
# Only invoke once (it would be different for a queue) method_dict[method_id] = 1
method_dict[method_id] = 1 m = self.loadMessage(line.message, uid = line.uid)
m = self.loadMessage(line.message) self.deleteMessage(m)
if invoke:
# First Validate # First Validate
if m.validate(self, activity_tool): if m.validate(self, activity_tool):
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
...@@ -150,8 +155,6 @@ class SQLQueue(RAMQueue): ...@@ -150,8 +155,6 @@ class SQLQueue(RAMQueue):
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'The document %s does not exist' % path) 'The document %s does not exist' % path)
# Erase all messages in a single transaction
activity_tool.SQLQueue_delMessage(path=path, method_id=method_id) # Delete all "old" messages (not -1 processing)
def start(self, activity_tool, active_process=None): def start(self, activity_tool, active_process=None):
uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process) uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process)
......
...@@ -35,6 +35,7 @@ from Globals import InitializeClass, DTMLFile, get_request ...@@ -35,6 +35,7 @@ from Globals import InitializeClass, DTMLFile, get_request
from Acquisition import aq_base from Acquisition import aq_base
from DateTime.DateTime import DateTime from DateTime.DateTime import DateTime
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from ActivityBuffer import ActivityBuffer
import threading import threading
from zLOG import LOG from zLOG import LOG
...@@ -82,6 +83,10 @@ class Result: ...@@ -82,6 +83,10 @@ class Result:
allow_class(Result) allow_class(Result)
class Message: class Message:
is_deleted = 0
is_queued = 0
def __init__(self, object, active_process, activity_kw, method_id, args, kw): def __init__(self, object, active_process, activity_kw, method_id, args, kw):
if type(object) is type('a'): if type(object) is type('a'):
self.object_path = object.split('/') self.object_path = object.split('/')
...@@ -203,7 +208,7 @@ class ActivityTool (Folder, UniqueObject): ...@@ -203,7 +208,7 @@ class ActivityTool (Folder, UniqueObject):
def __init__(self): def __init__(self):
return Folder.__init__(self, ActivityTool.id) return Folder.__init__(self, ActivityTool.id)
# Filter content (ZMI)) # Filter content (ZMI))
def filtered_meta_types(self, user=None): def filtered_meta_types(self, user=None):
# Filters the list of available meta types. # Filters the list of available meta types.
...@@ -304,8 +309,15 @@ class ActivityTool (Folder, UniqueObject): ...@@ -304,8 +309,15 @@ class ActivityTool (Folder, UniqueObject):
def activate(self, object, activity, active_process, **kw): def activate(self, object, activity, active_process, **kw):
global is_initialized global is_initialized
if not is_initialized: self.initialize() if not is_initialized: self.initialize()
if not hasattr(self, '_v_activity_buffer'): self._v_activity_buffer = ActivityBuffer()
return ActiveWrapper(object, activity, active_process, **kw) return ActiveWrapper(object, activity, active_process, **kw)
def deferredQueueMessage(self, activity, message):
self._v_activity_buffer.deferredQueueMessage(self, activity, message)
def deferredDeleteMessage(self, activity, message):
self._v_activity_buffer.deferredDeleteMessage(self, activity, message)
def flush(self, object, invoke=0, **kw): def flush(self, object, invoke=0, **kw):
global is_initialized global is_initialized
if not is_initialized: self.initialize() if not is_initialized: self.initialize()
...@@ -400,4 +412,4 @@ class ActivityTool (Folder, UniqueObject): ...@@ -400,4 +412,4 @@ class ActivityTool (Folder, UniqueObject):
return None return None
InitializeClass(ActivityTool) InitializeClass(ActivityTool)
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment