ActivityTool.py 43.3 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
Jean-Paul Smets's avatar
Jean-Paul Smets committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29 30 31 32
import socket
import urllib
import threading
import sys
Vincent Pelletier's avatar
Vincent Pelletier committed
33
from types import StringType
34 35
import re

Jean-Paul Smets's avatar
Jean-Paul Smets committed
36
from Products.CMFCore import CMFCorePermissions
Jean-Paul Smets's avatar
Jean-Paul Smets committed
37
from Products.ERP5Type.Core.Folder import Folder
38
from Products.CMFActivity.ActiveResult import ActiveResult
39
from Products.PythonScripts.Utility import allow_class
40
from AccessControl import ClassSecurityInfo, Permissions
Jérome Perrin's avatar
Jérome Perrin committed
41 42 43 44
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
from AccessControl.SecurityManagement import getSecurityManager
45 46
from Products.CMFCore.utils import UniqueObject, _getAuthenticatedUser, getToolByName
from Globals import InitializeClass, DTMLFile
Jean-Paul Smets's avatar
Jean-Paul Smets committed
47
from Acquisition import aq_base
48
from Acquisition import aq_inner
49
from ActivityBuffer import ActivityBuffer
50
from zExceptions import ExceptionFormatter
51
from BTrees.OIBTree import OIBTree
52

53
from ZODB.POSException import ConflictError
54
from Products.MailHost.MailHost import MailHostError
Jean-Paul Smets's avatar
Jean-Paul Smets committed
55

56
from zLOG import LOG, INFO, WARNING, ERROR, DEBUG
57 58

try:
59
  from Products.TimerService import getTimerService
60
except ImportError:
61 62
  def getTimerService(self):
    pass
Jean-Paul Smets's avatar
Jean-Paul Smets committed
63

64
# minimal IP:Port regexp
65
NODE_RE = re.compile('^\d+\.\d+\.\d+\.\d+:\d+$')
66

Jean-Paul Smets's avatar
Jean-Paul Smets committed
67 68 69 70
# Using a RAM property (not a property of an instance) allows
# to prevent from storing a state in the ZODB (and allows to restart...)
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)
Vincent Pelletier's avatar
Vincent Pelletier committed
71
is_initialized = False
72 73
tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
74
first_run = True
75 76 77
currentNode = None
ROLE_IDLE = 0
ROLE_PROCESSING = 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
78 79 80 81

# Activity Registration
activity_dict = {}

82
logging = False
83 84 85 86 87 88 89 90 91

def enableLogging():
  global logging
  logging = True

def disableLogging():
  global logging
  logging = False

92 93 94 95 96 97 98
# Here go ActivityBuffer instances
# Structure:
#  global_activity_buffer[activity_tool_path][thread_id] = ActivityBuffer
global_activity_buffer = {}
from thread import get_ident, allocate_lock
global_activity_buffer_lock = allocate_lock()

Jean-Paul Smets's avatar
Jean-Paul Smets committed
99 100 101
def registerActivity(activity):
  # Must be rewritten to register
  # class and create instance for each activity
102
  #LOG('Init Activity', 0, str(activity.__name__))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
103 104 105 106
  activity_instance = activity()
  activity_dict[activity.__name__] = activity_instance

class Message:
107
  """Activity Message Class.
108

109 110
  Message instances are stored in an activity queue, inside the Activity Tool.
  """
111 112
  def __init__(self, obj, active_process, activity_kw, method_id, args, kw):
    if isinstance(obj, str):
113
      self.object_path = tuple(obj.split('/'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
114
    else:
115
      self.object_path = obj.getPhysicalPath()
Yoshinori Okuji's avatar
Yoshinori Okuji committed
116
    if type(active_process) is StringType:
117 118 119 120 121
      self.active_process = active_process.split('/')
    elif active_process is None:
      self.active_process = None
    else:
      self.active_process = active_process.getPhysicalPath()
122
      self.active_process_uid = active_process.getUid()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
123 124 125 126
    self.activity_kw = activity_kw
    self.method_id = method_id
    self.args = args
    self.kw = kw
Jean-Paul Smets's avatar
Jean-Paul Smets committed
127
    self.is_executed = 0
Vincent Pelletier's avatar
Vincent Pelletier committed
128 129 130
    self.exc_type = None
    self.exc_value = None
    self.traceback = None
131
    self.processing = None
132 133
    self.user_name = str(_getAuthenticatedUser(self))
    # Store REQUEST Info ?
Jean-Paul Smets's avatar
Jean-Paul Smets committed
134

135
  def getObject(self, activity_tool):
136
    """return the object referenced in this message."""
137
    return activity_tool.unrestrictedTraverse(self.object_path)
138

139
  def getObjectList(self, activity_tool):
140
    """return the list of object that can be expanded from this message."""
141
    object_list = []
142
    try:
143
      object_list.append(self.getObject(activity_tool))
144
    except KeyError:
145 146 147 148 149 150
      pass
    else:
      if self.hasExpandMethod():
        expand_method_id = self.activity_kw['expand_method_id']
        # FIXME: how to pass parameters?
        object_list = getattr(object_list[0], expand_method_id)()
151
    return object_list
152

153
  def hasExpandMethod(self):
154 155 156 157 158
    """return true if the message has an expand method.
    An expand method is used to expand the list of objects and to turn a
    big recursive transaction affecting many objects into multiple
    transactions affecting only one object at a time (this can prevent
    duplicated method calls)."""
159
    return self.activity_kw.has_key('expand_method_id')
160

161
  def changeUser(self, user_name, activity_tool):
162
    """restore the security context for the calling user."""
163 164
    uf = activity_tool.getPortalObject().acl_users
    user = uf.getUserById(user_name)
165
    # if the user is not found, try to get it from a parent acl_users
166 167 168 169
    # XXX this is still far from perfect, because we need to store all
    # informations about the user (like original user folder, roles) to
    # replay the activity with exactly the same security context as if
    # it had been executed without activity.
170 171 172
    if user is None:
      uf = activity_tool.getPortalObject().aq_parent.acl_users
      user = uf.getUserById(user_name)
173 174 175
    if user is not None:
      user = user.__of__(uf)
      newSecurityManager(None, user)
176
    else :
177 178
      LOG("CMFActivity", WARNING,
          "Unable to find user %s in the portal" % user_name)
179
      noSecurityManager()
180 181 182 183 184
    return user

  def activateResult(self, activity_tool, result, object):
    if self.active_process is not None:
      active_process = activity_tool.unrestrictedTraverse(self.active_process)
185
      if isinstance(result, ActiveResult):
186 187
        result.edit(object_path=object)
        result.edit(method_id=self.method_id)
188 189
        # XXX Allow other method_id in future
        active_process.activateResult(result)
190
      else:
191
        active_process.activateResult(
192
                    ActiveResult(object_path=object,
193 194
                                 method_id=self.method_id,
                                 result=result)) # XXX Allow other method_id in future
195

Jean-Paul Smets's avatar
Jean-Paul Smets committed
196
  def __call__(self, activity_tool):
197
    try:
198
      obj = self.getObject(activity_tool)
199
      old_security_manager = getSecurityManager()
200
      # Change user if required (TO BE DONE)
201
      # We will change the user only in order to execute this method
202
      user = self.changeUser(self.user_name, activity_tool)
203 204 205
      try:
        result = getattr(obj, self.method_id)(*self.args, **self.kw)
      finally:
206 207
        setSecurityManager(old_security_manager)

208
      self.activateResult(activity_tool, result, obj)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
209
      self.is_executed = 1
210
    except:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
211
      self.is_executed = 0
212 213 214
      exc_info = sys.exc_info()
      self.exc_type = exc_info[0]
      self.exc_value = str(exc_info[1])
Vincent Pelletier's avatar
Vincent Pelletier committed
215
      self.traceback = ''.join(ExceptionFormatter.format_exception(
216
                               *exc_info))
217
      LOG('ActivityTool', WARNING,
218
          'Could not call method %s on object %s' % (
219
          self.method_id, self.object_path), error=exc_info)
220
      # push the error in ZODB error_log
221
      if getattr(activity_tool, 'error_log', None) is not None:
222
        activity_tool.error_log.raising(exc_info)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
223

224 225 226 227 228 229 230
  def validate(self, activity, activity_tool, check_order_validation=1):
    return activity.validate(activity_tool, self,
                             check_order_validation=check_order_validation,
                             **self.activity_kw)

  def getDependentMessageList(self, activity, activity_tool):
    return activity.getDependentMessageList(activity_tool, self, **self.activity_kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
231

232
  def notifyUser(self, activity_tool, message="Failed Processing Activity"):
233 234 235 236 237 238
    """Notify the user that the activity failed."""
    portal = activity_tool.getPortalObject()
    user_email = None
    user = portal.portal_membership.getMemberById(self.user_name)
    if user is not None:
      user_email = user.getProperty('email')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
239
    if user_email in ('', None):
240 241
      user_email = portal.getProperty('email_to_address',
                       portal.getProperty('email_from_address'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
242
    mail_text = """From: %s
243 244 245 246 247 248 249
To: %s
Subject: %s

%s

Document: %s
Method: %s
Vincent Pelletier's avatar
Vincent Pelletier committed
250
Exception: %s %s
251

252
%s
253
""" % (activity_tool.email_from_address, user_email, message,
254
       message, '/'.join(self.object_path), self.method_id,
Vincent Pelletier's avatar
Vincent Pelletier committed
255
       self.exc_type, self.exc_value, self.traceback)
256 257
    try:
      activity_tool.MailHost.send( mail_text )
Vincent Pelletier's avatar
Vincent Pelletier committed
258 259
    except (socket.error, MailHostError), message:
      LOG('ActivityTool.notifyUser', WARNING, 'Mail containing failure information failed to be sent: %s. Exception was: %s %s\n%s' % (message, self.exc_type, self.exc_value, self.traceback))
260

261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
  def reactivate(self, activity_tool):
    # Reactivate the original object.
    obj= self.getObject(activity_tool)
    # Change user if required (TO BE DONE)
    # We will change the user only in order to execute this method
    current_user = str(_getAuthenticatedUser(self))
    user = self.changeUser(self.user_name, activity_tool)
    try:
      active_obj = obj.activate(**self.activity_kw)
      getattr(active_obj, self.method_id)(*self.args, **self.kw)
    finally:
      # Use again the previous user
      if user is not None:
        self.changeUser(current_user, activity_tool)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
276 277
class Method:

278
  def __init__(self, passive_self, activity, active_process, kw, method_id):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
279 280
    self.__passive_self = passive_self
    self.__activity = activity
281
    self.__active_process = active_process
Jean-Paul Smets's avatar
Jean-Paul Smets committed
282 283 284 285
    self.__kw = kw
    self.__method_id = method_id

  def __call__(self, *args, **kw):
286
    m = Message(self.__passive_self, self.__active_process, self.__kw, self.__method_id, args, kw)
287 288
    if logging:
      LOG('Activity Tracking', DEBUG, 'queuing message: activity=%s, object_path=%s, method_id=%s, args=%s, kw=%s, activity_kw=%s, user_name=%s' % (self.__activity, '/'.join(m.object_path), m.method_id, m.args, m.kw, m.activity_kw, m.user_name))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
289 290
    activity_dict[self.__activity].queueMessage(self.__passive_self.portal_activities, m)

291 292
allow_class(Method)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
293 294
class ActiveWrapper:

295
  def __init__(self, passive_self, activity, active_process, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
296 297
    self.__dict__['__passive_self'] = passive_self
    self.__dict__['__activity'] = activity
298
    self.__dict__['__active_process'] = active_process
Jean-Paul Smets's avatar
Jean-Paul Smets committed
299 300 301 302
    self.__dict__['__kw'] = kw

  def __getattr__(self, id):
    return Method(self.__dict__['__passive_self'], self.__dict__['__activity'],
303
                  self.__dict__['__active_process'],
Jean-Paul Smets's avatar
Jean-Paul Smets committed
304 305
                  self.__dict__['__kw'], id)

306
class ActivityTool (Folder, UniqueObject):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
307
    """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
308 309 310 311 312 313 314 315 316 317 318 319
    ActivityTool is the central point for activity management.

    Improvement to consider to reduce locks:

      Idea 1: create an SQL tool which accumulate queries and executes them at the end of a transaction,
              thus allowing all SQL transaction to happen in a very short time
              (this would also be a great way of using MyISAM tables)

      Idea 2: do the same at the level of ActivityTool

      Idea 3: do the same at the level of each activity (ie. queueMessage
              accumulates and fires messages at the end of the transactino)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
320 321 322
    """
    id = 'portal_activities'
    meta_type = 'CMF Activity Tool'
323
    portal_type = 'Activity Tool'
324
    allowed_types = ( 'CMF Active Process', )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
325 326
    security = ClassSecurityInfo()

327 328
    manage_options = tuple(
                     [ { 'label' : 'Overview', 'action' : 'manage_overview' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
329
                     , { 'label' : 'Activities', 'action' : 'manageActivities' }
330
                     , { 'label' : 'LoadBalancing', 'action' : 'manageLoadBalancing'}
331
                     , { 'label' : 'Advanced', 'action' : 'manageActivitiesAdvanced' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
332
                     ,
333
                     ] + list(Folder.manage_options))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
334 335 336 337

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
    manageActivities = DTMLFile( 'dtml/manageActivities', globals() )

338 339 340
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivitiesAdvanced' )
    manageActivitiesAdvanced = DTMLFile( 'dtml/manageActivitiesAdvanced', globals() )

341 342
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
    manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )
343 344 345 346 347 348
    
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageLoadBalancing' )
    manageLoadBalancing = DTMLFile( 'dtml/manageLoadBalancing', globals() )
    
    distributingNode = ''
    _nodes = ()
349 350 351

    def __init__(self):
        return Folder.__init__(self, ActivityTool.id)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
352

353 354 355 356 357 358 359 360 361 362
    # Filter content (ZMI))
    def filtered_meta_types(self, user=None):
        # Filters the list of available meta types.
        all = ActivityTool.inheritedAttribute('filtered_meta_types')(self)
        meta_types = []
        for meta_type in self.all_meta_types():
            if meta_type['name'] in self.allowed_types:
                meta_types.append(meta_type)
        return meta_types

Jean-Paul Smets's avatar
Jean-Paul Smets committed
363 364
    def initialize(self):
      global is_initialized
Sebastien Robin's avatar
Sebastien Robin committed
365
      from Activity import RAMQueue, RAMDict, SQLQueue, SQLDict
Jean-Paul Smets's avatar
Jean-Paul Smets committed
366
      # Initialize each queue
367
      for activity in activity_dict.itervalues():
Jean-Paul Smets's avatar
Jean-Paul Smets committed
368
        activity.initialize(self)
Vincent Pelletier's avatar
Vincent Pelletier committed
369
      is_initialized = True
370 371 372
      
    security.declareProtected(Permissions.manage_properties, 'isSubscribed')
    def isSubscribed(self):
Aurel's avatar
Aurel committed
373
        """
374 375 376 377 378 379 380 381 382 383 384 385
        return True, if we are subscribed to TimerService.
        Otherwise return False.
        """
        service = getTimerService(self)
        if not service:
            LOG('ActivityTool', INFO, 'TimerService not available')
            return False
        
        path = '/'.join(self.getPhysicalPath())
        if path in service.lisSubscriptions():
            return True
        return False
Jean-Paul Smets's avatar
Jean-Paul Smets committed
386

387
    security.declareProtected(Permissions.manage_properties, 'subscribe')
388
    def subscribe(self, REQUEST=None, RESPONSE=None):
389 390
        """ subscribe to the global Timer Service """
        service = getTimerService(self)
391
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
392
        if not service:
393
            LOG('ActivityTool', INFO, 'TimerService not available')
394 395 396 397
            url += urllib.quote('TimerService not available')
        else:
            service.subscribe(self)
            url += urllib.quote("Subscribed to Timer Service")
398 399
        if RESPONSE is not None:
            RESPONSE.redirect(url)
400 401

    security.declareProtected(Permissions.manage_properties, 'unsubscribe')
402
    def unsubscribe(self, REQUEST=None, RESPONSE=None):
403 404
        """ unsubscribe from the global Timer Service """
        service = getTimerService(self)
405
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
406
        if not service:
407
            LOG('ActivityTool', INFO, 'TimerService not available')
408 409 410 411
            url += urllib.quote('TimerService not available')
        else:
            service.unsubscribe(self)
            url += urllib.quote("Unsubscribed from Timer Service")
412 413
        if RESPONSE is not None:
            RESPONSE.redirect(url)
414 415 416

    def manage_beforeDelete(self, item, container):
        self.unsubscribe()
417 418
        Folder.inheritedAttribute('manage_beforeDelete')(self, item, container)
    
419 420
    def manage_afterAdd(self, item, container):
        self.subscribe()
421 422
        Folder.inheritedAttribute('manage_afterAdd')(self, item, container)
       
423 424
    def getCurrentNode(self):
        """ Return current node in form ip:port """
425 426 427 428 429 430 431 432 433 434 435 436 437
        global currentNode
        if currentNode is None:
          port = ''
          from asyncore import socket_map
          for k, v in socket_map.items():
              if hasattr(v, 'port'):
                  # see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
                  type = str(getattr(v, '__class__', 'unknown'))
                  if type == 'ZServer.HTTPServer.zhttp_server':
                      port = v.port
                      break
          ip = socket.gethostbyname(socket.gethostname())
          currentNode = '%s:%s' %(ip, port)
438 439 440 441 442 443 444
        return currentNode
        
    security.declarePublic('getDistributingNode')
    def getDistributingNode(self):
        """ Return the distributingNode """
        return self.distributingNode

445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481
    def getNodeList(self, role=None):
      node_dict = self.getNodeDict()
      if role is None:
        result = [x for x in node_dict.keys()]
      else:
        result = [node_id for node_id, node_role in node_dict.items() if node_role == role]
      result.sort()
      return result

    def getNodeDict(self):
      nodes = self._nodes
      if isinstance(nodes, tuple):
        new_nodes = OIBTree()
        new_nodes.update([(x, ROLE_PROCESSING) for x in self._nodes])
        self._nodes = nodes = new_nodes
      return nodes

    def registerNode(self, node):
      node_dict = self.getNodeDict()
      if not node_dict.has_key(node):
        if len(node_dict) == 0: # If we are registering the first node, make
                                # it both the distributing node and a processing
                                # node.
          role = ROLE_PROCESSING
          self.distributingNode = node
        else:
          role = ROLE_IDLE
        self.updateNode(node, role)

    def updateNode(self, node, role):
      node_dict = self.getNodeDict()
      node_dict[node] = role

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getProcessingNodeList')
    def getProcessingNodeList(self):
      return self.getNodeList(role=ROLE_PROCESSING)

482
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getIdleNodeList')
483 484
    def getIdleNodeList(self):
      return self.getNodeList(role=ROLE_IDLE)
485

486 487 488 489
    def _isValidNodeName(self, node_name) :
      """Check we have been provided a good node name"""
      return isinstance(node_name, str) and NODE_RE.match(node_name)
      
490 491
    security.declarePublic('manage_setDistributingNode')
    def manage_setDistributingNode(self, distributingNode, REQUEST=None):
492
        """ set the distributing node """   
493
        if not distributingNode or self._isValidNodeName(distributingNode):
494 495 496 497 498 499 500 501 502 503 504 505 506
          self.distributingNode = distributingNode
          if REQUEST is not None:
              REQUEST.RESPONSE.redirect(
                  REQUEST.URL1 +
                  '/manageLoadBalancing?manage_tabs_message=' +
                  urllib.quote("Distributing Node successfully changed."))
        else :
          if REQUEST is not None:
              REQUEST.RESPONSE.redirect(
                  REQUEST.URL1 +
                  '/manageLoadBalancing?manage_tabs_message=' +
                  urllib.quote("Malformed Distributing Node."))

507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564
    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_delNode')
    def manage_delNode(self, unused_node_list=None, REQUEST=None):
      """ delete selected unused nodes """
      processing_node = self.getDistributingNode()
      updated_processing_node = False
      if unused_node_list is not None:
        node_dict = self.getNodeDict()
        for node in unused_node_list:
          if node in node_dict:
            del node_dict[node]
          if node == processing_node:
            self.processing_node = ''
            updated_processing_node = True
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing deleted."
        else:
          message = "Deleted nodes %r." % (unused_node_list, )
        if updated_processing_node:
          message += "Disabled distributing node because it was deleted."
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_addToProcessingList')
    def manage_addToProcessingList(self, unused_node_list=None, REQUEST=None):
      """ Change one or more idle nodes into processing nodes """
      if unused_node_list is not None:
        node_dict = self.getNodeDict()
        for node in unused_node_list:
          self.updateNode(node, ROLE_PROCESSING)
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing done."
        else:
          message = "Nodes now procesing: %r." % (unused_node_list, )
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_removeFromProcessingList')
    def manage_removeFromProcessingList(self, processing_node_list=None, REQUEST=None):
      """ Change one or more procesing nodes into idle nodes """
      if processing_node_list is not None:
        node_dict = self.getNodeDict()
        for node in processing_node_list:
          self.updateNode(node, ROLE_IDLE)
      if REQUEST is not None:
        if processing_node_list is None:
          message = "No used node selected, nothing done."
        else:
          message = "Nodes now unused %r." % (processing_node_list, )
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))
565

566
    def process_timer(self, tick, interval, prev="", next=""):
567
        """
568 569 570 571 572
        Call distribute() if we are the Distributing Node and call tic()
        with our node number.
        This method is called by TimerService in the interval given
        in zope.conf. The Default is every 5 seconds.
        """
573 574 575 576
        # Prevent TimerService from starting multiple threads in parallel
        acquired = timerservice_lock.acquire(0)
        if not acquired:
          return
577

578
        try:
579
          old_sm = getSecurityManager()
580
          try:
581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612
            try:
              # get owner of portal_catalog, so normally we should be able to
              # have the permission to invoke all activities
              user = self.portal_catalog.getWrappedOwner()
              newSecurityManager(self.REQUEST, user)

              currentNode = self.getCurrentNode()
              self.registerNode(currentNode)
              processing_node_list = self.getNodeList(role=ROLE_PROCESSING)

              # only distribute when we are the distributingNode or if it's empty
              if (self.getDistributingNode() == currentNode):
                self.distribute(len(processing_node_list))

              # SkinsTool uses a REQUEST cache to store skin objects, as
              # with TimerService we have the same REQUEST over multiple
              # portals, we clear this cache to make sure the cache doesn't
              # contains skins from another portal.
              stool = getToolByName(self, 'portal_skins', None)
              if stool is not None:
                stool.changeSkin(None)

              # call tic for the current processing_node
              # the processing_node numbers are the indices of the elements in the node tuple +1
              # because processing_node starts form 1
              if currentNode in processing_node_list:
                self.tic(processing_node_list.index(currentNode)+1)
            except:
              # Catch ALL exception to avoid killing timerserver.
              LOG('ActivityTool', ERROR, 'process_timer received an exception', error=sys.exc_info())
          finally:
            setSecurityManager(old_sm)
Jérome Perrin's avatar
Jérome Perrin committed
613
        finally:
614
          timerservice_lock.release()
615

Jean-Paul Smets's avatar
Jean-Paul Smets committed
616 617 618 619 620 621
    security.declarePublic('distribute')
    def distribute(self, node_count=1):
      """
        Distribute load
      """
      # Initialize if needed
Vincent Pelletier's avatar
Vincent Pelletier committed
622 623
      if not is_initialized:
        self.initialize()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
624 625

      # Call distribute on each queue
626
      for activity in activity_dict.itervalues():
627
        activity.distribute(aq_inner(self), node_count)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
628

Jean-Paul Smets's avatar
Jean-Paul Smets committed
629
    security.declarePublic('tic')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
630
    def tic(self, processing_node=1, force=0):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
631 632
      """
        Starts again an activity
Jean-Paul Smets's avatar
Jean-Paul Smets committed
633
        processing_node starts from 1 (there is not node 0)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
634
      """
Vincent Pelletier's avatar
Vincent Pelletier committed
635
      global active_threads, first_run
Jean-Paul Smets's avatar
Jean-Paul Smets committed
636 637

      # return if the number of threads is too high
638
      # else, increase the number of active_threads and continue
639 640
      tic_lock.acquire()
      too_many_threads = (active_threads >= max_active_threads)
641
      if not too_many_threads or force:
642
        active_threads += 1
643 644 645
      else:
        tic_lock.release()
        raise RuntimeError, 'Too many threads'
646
      tic_lock.release()
647

Jean-Paul Smets's avatar
Jean-Paul Smets committed
648
      # Initialize if needed
Vincent Pelletier's avatar
Vincent Pelletier committed
649 650
      if not is_initialized:
        self.initialize()
651

652
      inner_self = aq_inner(self)
653

654 655 656
      # If this is the first tic after zope is started, reset the processing
      # flag for activities of this node
      if first_run:
657 658 659 660
        inner_self.SQLDict_clearProcessingFlag(
                                processing_node=processing_node)
        inner_self.SQLQueue_clearProcessingFlag(
                                processing_node=processing_node)
661
        first_run = False
662

663 664
      try:
        # Wakeup each queue
665
        for activity in activity_dict.itervalues():
666
          activity.wakeup(inner_self, processing_node)
667

668 669 670 671
        # Process messages on each queue in round robin
        has_awake_activity = 1
        while has_awake_activity:
          has_awake_activity = 0
672
          for activity in activity_dict.itervalues():
673 674
            activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity
            has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node)
675 676 677 678 679
      finally:
        # decrease the number of active_threads
        tic_lock.acquire()
        active_threads -= 1
        tic_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
680

681
    def hasActivity(self, *args, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
682
      # Check in each queue if the object has deferred tasks
683 684
      # if not argument is provided, then check on self
      if len(args) > 0:
685
        obj = args[0]
686
      else:
687
        obj = self
688
      for activity in activity_dict.itervalues():
689
        if activity.hasActivity(aq_inner(self), obj, **kw):
690 691
          return True
      return False
Jean-Paul Smets's avatar
Jean-Paul Smets committed
692

693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708
    def getActivityBuffer(self, create_if_not_found=True):
      """
        Get activtity buffer for this thread for this activity tool.
        If no activity buffer is found at lowest level and create_if_not_found
        is True, create one.
        Intermediate level is unconditionaly created if non existant because
        chances are it will be used in the instance life.
        Lock is held when checking for intermediate level existance
        because:
         - intermediate level dict must not be created in 2 threads at the
           same time, since one creation would destroy the existing one.
        It's released after that step because:
         - lower level access is at thread scope, thus by definition there
           can be only one access at a time to a key
         - GIL protects us when accessing python instances
      """
709 710
      # Safeguard: make sure we are wrapped in  acquisition context before
      # using our path as an activity tool instance-wide identifier.
711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729
      assert getattr(self, 'aq_self', None) is not None
      my_instance_key = self.getPhysicalPath()
      my_thread_key = get_ident()
      global_activity_buffer_lock.acquire()
      try:
        if my_instance_key not in global_activity_buffer:
          global_activity_buffer[my_instance_key] = {}
      finally:
        global_activity_buffer_lock.release()
      thread_activity_buffer = global_activity_buffer[my_instance_key]
      if my_thread_key not in thread_activity_buffer:
        if create_if_not_found:
          buffer = ActivityBuffer(activity_tool=self)
        else:
          buffer = None
        thread_activity_buffer[my_thread_key] = buffer
      activity_buffer = thread_activity_buffer[my_thread_key]
      return activity_buffer

730 731
    security.declarePrivate('activateObject')
    def activateObject(self, object, activity, active_process, **kw):
Vincent Pelletier's avatar
Vincent Pelletier committed
732 733
      if not is_initialized:
        self.initialize()
734
      self.getActivityBuffer()
735
      return ActiveWrapper(object, activity, active_process, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
736

737
    def deferredQueueMessage(self, activity, message):
738 739
      activity_buffer = self.getActivityBuffer()
      activity_buffer.deferredQueueMessage(self, activity, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
740

741
    def deferredDeleteMessage(self, activity, message):
742 743
      activity_buffer = self.getActivityBuffer()
      activity_buffer.deferredDeleteMessage(self, activity, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
744

Jean-Paul Smets's avatar
Jean-Paul Smets committed
745
    def getRegisteredMessageList(self, activity):
746
      activity_buffer = self.getActivityBuffer(create_if_not_found=False)
747
      if activity_buffer is not None:
748 749
        #activity_buffer._register() # This is required if flush flush is called outside activate
        return activity.getRegisteredMessageList(activity_buffer,
750
                                                 aq_inner(self))
751 752
      else:
        return []
Yoshinori Okuji's avatar
Yoshinori Okuji committed
753

Jean-Paul Smets's avatar
Jean-Paul Smets committed
754
    def unregisterMessage(self, activity, message):
755 756 757
      activity_buffer = self.getActivityBuffer()
      #activity_buffer._register()
      return activity.unregisterMessage(activity_buffer, aq_inner(self), message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
758

759
    def flush(self, obj, invoke=0, **kw):
Vincent Pelletier's avatar
Vincent Pelletier committed
760 761
      if not is_initialized:
        self.initialize()
762
      self.getActivityBuffer()
763 764
      if isinstance(obj, tuple):
        object_path = obj
765
      else:
766
        object_path = obj.getPhysicalPath()
767
      for activity in activity_dict.itervalues():
768
        activity.flush(aq_inner(self), object_path, invoke=invoke, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
769

770
    def start(self, **kw):
Vincent Pelletier's avatar
Vincent Pelletier committed
771 772
      if not is_initialized:
        self.initialize()
773
      for activity in activity_dict.itervalues():
774
        activity.start(aq_inner(self), **kw)
775 776

    def stop(self, **kw):
Vincent Pelletier's avatar
Vincent Pelletier committed
777 778
      if not is_initialized:
        self.initialize()
779
      for activity in activity_dict.itervalues():
780
        activity.stop(aq_inner(self), **kw)
781

Jean-Paul Smets's avatar
Jean-Paul Smets committed
782
    def invoke(self, message):
783 784
      if logging:
        LOG('Activity Tracking', DEBUG, 'invoking message: object_path=%s, method_id=%s, args=%s, kw=%s, activity_kw=%s, user_name=%s' % ('/'.join(message.object_path), message.method_id, message.args, message.kw, message.activity_kw, message.user_name))
785 786
      if getattr(self, 'aq_chain', None) is not None:
        # Grab existing acquisition chain and extrach base objects.
787
        base_chain = [aq_base(x) for x in self.aq_chain]
788 789 790 791 792 793 794 795
        # Grab existig request (last chain item) and create a copy.
        request_container = base_chain.pop()
        request = request_container.REQUEST
        # XXX: REQUEST.clone() requires PARENTS to be set, and it's not when
        # runing unit tests. Recreate it if it does not exist.
        parents = getattr(request, 'PARENTS', None)
        if parents is None:
          LOG('CMFActivity.ActivityTool.invoke', INFO, 'PARENTS is not defined in REQUEST. It should only happen in unit tests.')
796
          request['PARENTS'] = self.aq_chain[:]
797 798 799 800 801 802 803 804 805 806
        new_request_container = request_container.__class__(REQUEST=request.clone())
        # Recreate acquisition chain.
        my_self = new_request_container
        base_chain.reverse()
        for item in base_chain:
          my_self = item.__of__(my_self)
      else:
        my_self = self
        LOG('CMFActivity.ActivityTool.invoke', INFO, 'Strange: invoke is called outside of acquisition context.')
      message(my_self)
807 808
      if logging:
        LOG('Activity Tracking', DEBUG, 'invoked message')
809 810 811
      if my_self is not self: # We rewrapped self
        for held in my_self.REQUEST._held:
          self.REQUEST._hold(held)
812

813
    def invokeGroup(self, method_id, message_list):
814 815
      if logging:
        LOG('Activity Tracking', DEBUG, 'invoking group messages: method_id=%s, paths=%s' % (method_id, ['/'.join(m.object_path) for m in message_list]))
816 817 818 819 820 821 822 823
      # Invoke a group method.
      object_list = []
      expanded_object_list = []
      new_message_list = []
      path_dict = {}
      # Filter the list of messages. If an object is not available, ignore such a message.
      # In addition, expand an object if necessary, and make sure that no duplication happens.
      for m in message_list:
824 825
        # alternate method is used to segregate objects which cannot be grouped.
        alternate_method_id = m.activity_kw.get('alternate_method_id')
826 827
        try:
          obj = m.getObject(self)
828
          i = len(new_message_list) # This is an index of this message in new_message_list.
829
          if m.hasExpandMethod():
830 831
            for subobj in m.getObjectList(self):
              path = subobj.getPath()
832
              if path not in path_dict:
833
                path_dict[path] = i
834 835 836 837 838 839
                if alternate_method_id is not None \
                   and hasattr(aq_base(subobj), alternate_method_id):
                  # if this object is alternated, generate a new single active object.
                  activity_kw = m.activity_kw.copy()
                  if 'group_method_id' in activity_kw:
                    del activity_kw['group_method_id']
840 841
                  if 'group_id' in activity_kw:
                    del activity_kw['group_id']                    
842 843 844 845
                  active_obj = subobj.activate(**activity_kw)
                  getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
                else:
                  expanded_object_list.append(subobj)
846 847 848
          else:
            path = obj.getPath()
            if path not in path_dict:
849
              path_dict[path] = i
850 851 852 853 854 855
              if alternate_method_id is not None \
                  and hasattr(aq_base(obj), alternate_method_id):
                # if this object is alternated, generate a new single active object.
                activity_kw = m.activity_kw.copy()
                if 'group_method_id' in activity_kw:
                  del activity_kw['group_method_id']
856 857
                if 'group_id' in activity_kw:
                  del activity_kw['group_id']
858 859 860 861
                active_obj = obj.activate(**activity_kw)
                getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
              else:
                expanded_object_list.append(obj)
862
          object_list.append(obj)
863 864 865
          new_message_list.append(m)
        except:
          m.is_executed = 0
866 867 868 869 870
          exc_info = sys.exc_info()
          m.exc_type = exc_info[0]
          m.exc_value = str(exc_info[1])
          m.traceback = ''.join(ExceptionFormatter.format_exception(
                                *exc_info))
871
          LOG('WARNING ActivityTool', 0,
872
              'Could not call method %s on object %s' %
873
              (m.method_id, m.object_path), error=exc_info)
874

875 876
      try:
        if len(expanded_object_list) > 0:
877 878
          method = self.unrestrictedTraverse(method_id)
          # FIXME: how to apply security here?
879 880
          # NOTE: expanded_object_list must be set to failed objects by the callee.
          #       If it fully succeeds, expanded_object_list must be empty when returning.
881
          result = method(expanded_object_list, **m.kw)
882
        else:
883 884 885
          result = None
      except:
        # In this case, the group method completely failed.
886 887 888 889 890
        exc_info = sys.exc_info()
        exc_type = exc_info[0]
        exc_value = str(exc_info[1])
        traceback = ''.join(ExceptionFormatter.format_exception(
                            *exc_info))
891 892
        for m in new_message_list:
          m.is_executed = 0
893 894 895
          m.exc_type = exc_type
          m.exc_value = exc_value
          m.traceback = traceback
896
        LOG('WARNING ActivityTool', 0,
897
            'Could not call method %s on objects %s' %
898
            (method_id, expanded_object_list), error=exc_info)
899 900 901 902 903 904 905 906 907 908 909 910 911 912 913
      else:
        # Obtain all indices of failed messages. Note that this can be a partial failure.
        failed_message_dict = {}
        for obj in expanded_object_list:
          path = obj.getPath()
          i = path_dict[path]
          failed_message_dict[i] = None

        # Only for succeeded messages, an activity process is invoked (if any).
        for i in xrange(len(object_list)):
          object = object_list[i]
          m = new_message_list[i]
          if i in failed_message_dict:
            m.is_executed = 0
            LOG('ActivityTool', WARNING,
914 915
                'the method %s partially failed on object %s' %
                (m.method_id, m.object_path,))
916 917 918 919 920
          else:
            try:
              m.activateResult(self, result, object)
              m.is_executed = 1
            except:
921
              m.is_executed = 0
Vincent Pelletier's avatar
Vincent Pelletier committed
922
              m.exc_type = sys.exc_info()[0]
923
              LOG('ActivityTool', WARNING,
924
                  'Could not call method %s on object %s' % (
Vincent Pelletier's avatar
Vincent Pelletier committed
925
                  m.method_id, m.object_path), error=sys.exc_info())
926 927
      if logging:
        LOG('Activity Tracking', DEBUG, 'invoked group messages')
928

929 930
    def newMessage(self, activity, path, active_process,
                   activity_kw, method_id, *args, **kw):
931
      # Some Security Cheking should be made here XXX
Vincent Pelletier's avatar
Vincent Pelletier committed
932 933
      if not is_initialized:
        self.initialize()
934
      self.getActivityBuffer()
935
      activity_dict[activity].queueMessage(aq_inner(self),
936
        Message(path, active_process, activity_kw, method_id, args, kw))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
937

938
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageInvoke' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
939 940 941 942 943 944
    def manageInvoke(self, object_path, method_id, REQUEST=None):
      """
        Invokes all methods for object "object_path"
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
945
      self.flush(object_path,method_id=method_id,invoke=1)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
946
      if REQUEST is not None:
947 948
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
949

950
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageCancel' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
951 952 953 954 955 956
    def manageCancel(self, object_path, method_id, REQUEST=None):
      """
        Cancel all methods for object "object_path"
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
957
      self.flush(object_path,method_id=method_id,invoke=0)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
958
      if REQUEST is not None:
959 960
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
961

962 963
    security.declareProtected( CMFCorePermissions.ManagePortal,
                               'manageClearActivities' )
964
    def manageClearActivities(self, keep=1, REQUEST=None):
965 966 967 968 969
      """
        Clear all activities and recreate tables.
      """
      folder = getToolByName(self, 'portal_skins').activity

970 971
      # Obtain all pending messages.
      message_list = []
972
      if keep:
973
        for activity in activity_dict.itervalues():
974 975 976 977 978 979
          if hasattr(activity, 'dumpMessageList'):
            try:
              message_list.extend(activity.dumpMessageList(self))
            except ConflictError:
              raise
            except:
980 981 982
              LOG('ActivityTool', WARNING,
                  'could not dump messages from %s' %
                  (activity,), error=sys.exc_info())
983 984

      if getattr(folder, 'SQLDict_createMessageTable', None) is not None:
985 986 987 988 989
        try:
          folder.SQLDict_dropMessageTable()
        except ConflictError:
          raise
        except:
990
          LOG('CMFActivity', WARNING,
991
              'could not drop the message table',
992 993 994
              error=sys.exc_info())
        folder.SQLDict_createMessageTable()

995
      if getattr(folder, 'SQLQueue_createMessageTable', None) is not None:
996 997 998 999 1000
        try:
          folder.SQLQueue_dropMessageTable()
        except ConflictError:
          raise
        except:
1001
          LOG('CMFActivity', WARNING,
1002
              'could not drop the message queue table',
1003 1004 1005
              error=sys.exc_info())
        folder.SQLQueue_createMessageTable()

1006 1007 1008
      # Reactivate the messages.
      for m in message_list:
        try:
1009
          m.reactivate(aq_inner(self))
1010 1011 1012 1013
        except ConflictError:
          raise
        except:
          LOG('ActivityTool', WARNING,
1014 1015
              'could not reactivate the message %r, %r' %
              (m.object_path, m.method_id), error=sys.exc_info())
1016

1017
      if REQUEST is not None:
1018 1019
        return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(),
          'manageActivitiesAdvanced?manage_tabs_message=Activities%20Cleared'))
1020

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1021
    security.declarePublic('getMessageList')
1022
    def getMessageList(self,**kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1023 1024 1025
      """
        List messages waiting in queues
      """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1026
      # Initialize if needed
Vincent Pelletier's avatar
Vincent Pelletier committed
1027 1028
      if not is_initialized:
        self.initialize()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1029

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1030
      message_list = []
1031
      for activity in activity_dict.itervalues():
Sebastien Robin's avatar
Sebastien Robin committed
1032
        try:
1033
          message_list += activity.getMessageList(aq_inner(self),**kw)
Sebastien Robin's avatar
Sebastien Robin committed
1034 1035
        except AttributeError:
          LOG('getMessageList, could not get message from Activity:',0,activity)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1036 1037
      return message_list

1038 1039 1040 1041 1042 1043
    security.declarePublic('countMessageWithTag')
    def countMessageWithTag(self, value):
      """
        Return the number of messages which match the given tag.
      """
      message_count = 0
1044
      for activity in activity_dict.itervalues():
1045
        message_count += activity.countMessageWithTag(aq_inner(self), value)
Sebastien Robin's avatar
Sebastien Robin committed
1046 1047 1048 1049 1050 1051 1052 1053 1054 1055
      return message_count

    security.declarePublic('countMessage')
    def countMessage(self, **kw):
      """
        Return the number of messages which match the given parameter.

        Parameters allowed:

        method_id : the id of the method
Jérome Perrin's avatar
Jérome Perrin committed
1056
        path : for activities on a particular object
Sebastien Robin's avatar
Sebastien Robin committed
1057 1058 1059 1060
        tag : activities with a particular tag
        message_uid : activities with a particular uid
      """
      message_count = 0
1061
      for activity in activity_dict.itervalues():
1062
        message_count += activity.countMessage(aq_inner(self), **kw)
1063 1064
      return message_count

1065
    security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
1066
    def newActiveProcess(self, **kw):
1067 1068 1069
      from ActiveProcess import addActiveProcess
      new_id = str(self.generateNewId())
      addActiveProcess(self, new_id)
1070 1071 1072
      active_process = self._getOb(new_id)
      active_process.edit(**kw)
      return active_process
1073 1074 1075 1076

    def reindexObject(self):
      self.immediateReindexObject()

1077
    # Active synchronisation methods
1078
    security.declarePrivate('validateOrder')
1079
    def validateOrder(self, message, validator_id, validation_value):
1080 1081 1082 1083 1084
      message_list = self.getDependentMessageList(message, validator_id, validation_value)
      return len(message_list) > 0

    security.declarePrivate('getDependentMessageList')
    def getDependentMessageList(self, message, validator_id, validation_value):
Vincent Pelletier's avatar
Vincent Pelletier committed
1085 1086
      if not is_initialized:
        self.initialize()
1087
      message_list = []
Vincent Pelletier's avatar
Vincent Pelletier committed
1088
      method_id = "_validate_%s" % validator_id
1089
      for activity in activity_dict.itervalues():
1090 1091 1092 1093 1094 1095
        method = getattr(activity, method_id, None)
        if method is not None:
          result = method(aq_inner(self), message, validation_value)
          if result:
            message_list.extend([(activity, m) for m in result])
      return message_list
1096

Yoshinori Okuji's avatar
Yoshinori Okuji committed
1097 1098
    # Required for tests (time shift)
    def timeShift(self, delay):
Vincent Pelletier's avatar
Vincent Pelletier committed
1099 1100
      if not is_initialized:
        self.initialize()
1101
      for activity in activity_dict.itervalues():
1102
        activity.timeShift(aq_inner(self), delay)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1103

1104
InitializeClass(ActivityTool)