ActivityTool.py 35.8 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
Jean-Paul Smets's avatar
Jean-Paul Smets committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29 30 31 32 33 34 35 36
import socket
import urllib
import traceback
import threading
import sys
from types import TupleType, StringType
import re

Jean-Paul Smets's avatar
Jean-Paul Smets committed
37
from Products.CMFCore import CMFCorePermissions
Jean-Paul Smets's avatar
Jean-Paul Smets committed
38
from Products.ERP5Type.Core.Folder import Folder
39
from Products.CMFActivity.ActiveResult import ActiveResult
40
from Products.PythonScripts.Utility import allow_class
41
from AccessControl import ClassSecurityInfo, Permissions
Jérome Perrin's avatar
Jérome Perrin committed
42 43 44 45
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
from AccessControl.SecurityManagement import getSecurityManager
46 47
from Products.CMFCore.utils import UniqueObject, _getAuthenticatedUser, getToolByName
from Globals import InitializeClass, DTMLFile
Jean-Paul Smets's avatar
Jean-Paul Smets committed
48
from Acquisition import aq_base
49
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
50
from ActivityBuffer import ActivityBuffer
51

52
from ZODB.POSException import ConflictError
Jean-Paul Smets's avatar
Jean-Paul Smets committed
53

54
from zLOG import LOG, INFO, WARNING
55 56

try:
57
  from Products.TimerService import getTimerService
58
except ImportError:
59 60
  def getTimerService(self):
    pass
Jean-Paul Smets's avatar
Jean-Paul Smets committed
61

62
# minimal IP:Port regexp
Alexandre Boeglin's avatar
Alexandre Boeglin committed
63
NODE_RE = re.compile('\d+\.\d+\.\d+\.\d+:\d+')
64

Jean-Paul Smets's avatar
Jean-Paul Smets committed
65 66 67 68 69
# Using a RAM property (not a property of an instance) allows
# to prevent from storing a state in the ZODB (and allows to restart...)
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)
is_initialized = 0
70 71
tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
72
first_run = 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
73 74 75 76 77 78 79 80

# Activity Registration
activity_dict = {}
activity_list = []

def registerActivity(activity):
  # Must be rewritten to register
  # class and create instance for each activity
81
  #LOG('Init Activity', 0, str(activity.__name__))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
82 83 84 85 86
  activity_instance = activity()
  activity_list.append(activity_instance)
  activity_dict[activity.__name__] = activity_instance

class Message:
87 88 89 90
  """Activity Message Class.
  
  Message instances are stored in an activity queue, inside the Activity Tool.
  """
91
  def __init__(self, object, active_process, activity_kw, method_id, args, kw):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
92
    if type(object) is StringType:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
93 94 95
      self.object_path = object.split('/')
    else:
      self.object_path = object.getPhysicalPath()
Yoshinori Okuji's avatar
Yoshinori Okuji committed
96
    if type(active_process) is StringType:
97 98 99 100 101
      self.active_process = active_process.split('/')
    elif active_process is None:
      self.active_process = None
    else:
      self.active_process = active_process.getPhysicalPath()
102
      self.active_process_uid = active_process.getUid()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
103 104 105 106
    self.activity_kw = activity_kw
    self.method_id = method_id
    self.args = args
    self.kw = kw
Jean-Paul Smets's avatar
Jean-Paul Smets committed
107
    self.is_executed = 0
108
    self.exc_type = None
109
    self.processing = None
110 111
    self.user_name = str(_getAuthenticatedUser(self))
    # Store REQUEST Info ?
Jean-Paul Smets's avatar
Jean-Paul Smets committed
112

113
  def getObject(self, activity_tool):
114
    """return the object referenced in this message."""
115 116 117
    return activity_tool.unrestrictedTraverse(self.object_path)
    
  def getObjectList(self, activity_tool):
118
    """return the list of object that can be expanded from this message."""
119 120 121 122 123 124 125
    try:
      expand_method_id = self.activity_kw['expand_method_id']
      obj = self.getObject(activity_tool)
      # FIXME: how to pass parameters?
      object_list = getattr(obj, expand_method_id)()
    except KeyError:
      object_list = [self.getObject(activity_tool)]
126
      
127
    return object_list
128 129
      
  def hasExpandMethod(self):
130 131 132 133 134
    """return true if the message has an expand method.
    An expand method is used to expand the list of objects and to turn a
    big recursive transaction affecting many objects into multiple
    transactions affecting only one object at a time (this can prevent
    duplicated method calls)."""
135 136 137
    return self.activity_kw.has_key('expand_method_id')
    
  def changeUser(self, user_name, activity_tool):
138
    """restore the security context for the calling user."""
139 140
    uf = activity_tool.getPortalObject().acl_users
    user = uf.getUserById(user_name)
141
    # if the user is not found, try to get it from a parent acl_users
142 143 144 145
    # XXX this is still far from perfect, because we need to store all
    # informations about the user (like original user folder, roles) to
    # replay the activity with exactly the same security context as if
    # it had been executed without activity.
146 147 148
    if user is None:
      uf = activity_tool.getPortalObject().aq_parent.acl_users
      user = uf.getUserById(user_name)
149 150 151
    if user is not None:
      user = user.__of__(uf)
      newSecurityManager(None, user)
152
    else :
153 154
      LOG("CMFActivity", WARNING,
          "Unable to find user %s in the portal" % user_name)
155
      noSecurityManager()
156 157 158 159 160
    return user

  def activateResult(self, activity_tool, result, object):
    if self.active_process is not None:
      active_process = activity_tool.unrestrictedTraverse(self.active_process)
161
      if isinstance(result,ActiveResult):
162 163
        result.edit(object_path=object)
        result.edit(method_id=self.method_id)
164 165
        # XXX Allow other method_id in future
        active_process.activateResult(result)
166
      else:
167
        active_process.activateResult(
168
                    ActiveResult(object_path=object,
169 170
                          method_id=self.method_id,
                          result=result)) # XXX Allow other method_id in future
171
  
Jean-Paul Smets's avatar
Jean-Paul Smets committed
172
  def __call__(self, activity_tool):
173
    try:
174
      obj = self.getObject(activity_tool)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
175
      # Change user if required (TO BE DONE)
176 177
      # We will change the user only in order to execute this method
      current_user = str(_getAuthenticatedUser(self))
178
      user = self.changeUser(self.user_name, activity_tool)
179 180 181 182 183 184 185
      try:
        result = getattr(obj, self.method_id)(*self.args, **self.kw)
      finally:
        # Use again the previous user
        if user is not None:
          self.changeUser(current_user, activity_tool)
      self.activateResult(activity_tool, result, obj)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
186
      self.is_executed = 1
187
    except:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
188
      self.is_executed = 0
189
      self.exc_type = sys.exc_info()[0]
190
      self.exc_value = str(sys.exc_info()[1])
191 192
      self.traceback = ''.join(traceback.format_tb(sys.exc_info()[2]))
      LOG('ActivityTool', WARNING,
193
          'Could not call method %s on object %s' % (
194
          self.method_id, self.object_path), error=sys.exc_info())
195 196 197
      # push the error in ZODB error_log
      if hasattr(activity_tool, 'error_log'):
        activity_tool.error_log.raising(sys.exc_info())
Jean-Paul Smets's avatar
Jean-Paul Smets committed
198 199 200 201

  def validate(self, activity, activity_tool):
    return activity.validate(activity_tool, self, **self.activity_kw)

202
  def notifyUser(self, activity_tool, message="Failed Processing Activity"):
203 204 205 206 207 208
    """Notify the user that the activity failed."""
    portal = activity_tool.getPortalObject()
    user_email = None
    user = portal.portal_membership.getMemberById(self.user_name)
    if user is not None:
      user_email = user.getProperty('email')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
209
    if user_email in ('', None):
210 211
      user_email = portal.getProperty('email_to_address',
                       portal.getProperty('email_from_address'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
212
    mail_text = """From: %s
213 214 215 216 217 218 219
To: %s
Subject: %s

%s

Document: %s
Method: %s
220
Exception: %s %s
221 222
Traceback:
%s
223
""" % (activity_tool.email_from_address, user_email, message,
224 225
       message, '/'.join(self.object_path), self.method_id,
       self.exc_type, self.exc_value, self.traceback)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
226
    activity_tool.MailHost.send( mail_text )
227

228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
  def reactivate(self, activity_tool):
    # Reactivate the original object.
    obj= self.getObject(activity_tool)
    # Change user if required (TO BE DONE)
    # We will change the user only in order to execute this method
    current_user = str(_getAuthenticatedUser(self))
    user = self.changeUser(self.user_name, activity_tool)
    try:
      active_obj = obj.activate(**self.activity_kw)
      getattr(active_obj, self.method_id)(*self.args, **self.kw)
    finally:
      # Use again the previous user
      if user is not None:
        self.changeUser(current_user, activity_tool)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
243 244
class Method:

245
  def __init__(self, passive_self, activity, active_process, kw, method_id):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
246 247
    self.__passive_self = passive_self
    self.__activity = activity
248
    self.__active_process = active_process
Jean-Paul Smets's avatar
Jean-Paul Smets committed
249 250 251 252
    self.__kw = kw
    self.__method_id = method_id

  def __call__(self, *args, **kw):
253
    m = Message(self.__passive_self, self.__active_process, self.__kw, self.__method_id, args, kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
254 255
    activity_dict[self.__activity].queueMessage(self.__passive_self.portal_activities, m)

256 257
allow_class(Method)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
258 259
class ActiveWrapper:

260
  def __init__(self, passive_self, activity, active_process, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
261 262
    self.__dict__['__passive_self'] = passive_self
    self.__dict__['__activity'] = activity
263
    self.__dict__['__active_process'] = active_process
Jean-Paul Smets's avatar
Jean-Paul Smets committed
264 265 266 267
    self.__dict__['__kw'] = kw

  def __getattr__(self, id):
    return Method(self.__dict__['__passive_self'], self.__dict__['__activity'],
268
                  self.__dict__['__active_process'],
Jean-Paul Smets's avatar
Jean-Paul Smets committed
269 270
                  self.__dict__['__kw'], id)

271
class ActivityTool (Folder, UniqueObject):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
272
    """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
273 274 275 276 277 278 279 280 281 282 283 284
    ActivityTool is the central point for activity management.

    Improvement to consider to reduce locks:

      Idea 1: create an SQL tool which accumulate queries and executes them at the end of a transaction,
              thus allowing all SQL transaction to happen in a very short time
              (this would also be a great way of using MyISAM tables)

      Idea 2: do the same at the level of ActivityTool

      Idea 3: do the same at the level of each activity (ie. queueMessage
              accumulates and fires messages at the end of the transactino)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
285 286 287
    """
    id = 'portal_activities'
    meta_type = 'CMF Activity Tool'
288
    portal_type = 'Activity Tool'
289
    allowed_types = ( 'CMF Active Process', )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
290 291
    security = ClassSecurityInfo()

292 293 294
    _distributingNode = ''
    _nodes = ()

295 296
    manage_options = tuple(
                     [ { 'label' : 'Overview', 'action' : 'manage_overview' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
297
                     , { 'label' : 'Activities', 'action' : 'manageActivities' }
298
                     , { 'label' : 'LoadBalancing', 'action' : 'manageLoadBalancing'}
299
                     , { 'label' : 'Advanced', 'action' : 'manageActivitiesAdvanced' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
300
                     ,
301
                     ] + list(Folder.manage_options))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
302 303 304 305

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
    manageActivities = DTMLFile( 'dtml/manageActivities', globals() )

306 307 308
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivitiesAdvanced' )
    manageActivitiesAdvanced = DTMLFile( 'dtml/manageActivitiesAdvanced', globals() )

309 310
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
    manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )
311 312 313 314 315 316
    
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageLoadBalancing' )
    manageLoadBalancing = DTMLFile( 'dtml/manageLoadBalancing', globals() )
    
    distributingNode = ''
    _nodes = ()
317 318 319

    def __init__(self):
        return Folder.__init__(self, ActivityTool.id)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
320

321 322 323 324 325 326 327 328 329 330
    # Filter content (ZMI))
    def filtered_meta_types(self, user=None):
        # Filters the list of available meta types.
        all = ActivityTool.inheritedAttribute('filtered_meta_types')(self)
        meta_types = []
        for meta_type in self.all_meta_types():
            if meta_type['name'] in self.allowed_types:
                meta_types.append(meta_type)
        return meta_types

Jean-Paul Smets's avatar
Jean-Paul Smets committed
331 332
    def initialize(self):
      global is_initialized
Sebastien Robin's avatar
Sebastien Robin committed
333
      from Activity import RAMQueue, RAMDict, SQLQueue, SQLDict
Jean-Paul Smets's avatar
Jean-Paul Smets committed
334 335 336 337
      # Initialize each queue
      for activity in activity_list:
        activity.initialize(self)
      is_initialized = 1
338 339 340
      
    security.declareProtected(Permissions.manage_properties, 'isSubscribed')
    def isSubscribed(self):
Aurel's avatar
Aurel committed
341
        """
342 343 344 345 346 347 348 349 350 351 352 353
        return True, if we are subscribed to TimerService.
        Otherwise return False.
        """
        service = getTimerService(self)
        if not service:
            LOG('ActivityTool', INFO, 'TimerService not available')
            return False
        
        path = '/'.join(self.getPhysicalPath())
        if path in service.lisSubscriptions():
            return True
        return False
Jean-Paul Smets's avatar
Jean-Paul Smets committed
354

355
    security.declareProtected(Permissions.manage_properties, 'subscribe')
356
    def subscribe(self, REQUEST=None, RESPONSE=None):
357 358
        """ subscribe to the global Timer Service """
        service = getTimerService(self)
359
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
360
        if not service:
361
            LOG('ActivityTool', INFO, 'TimerService not available')
362 363 364 365
            url += urllib.quote('TimerService not available')
        else:
            service.subscribe(self)
            url += urllib.quote("Subscribed to Timer Service")
366 367
        if RESPONSE is not None:
            RESPONSE.redirect(url)
368 369

    security.declareProtected(Permissions.manage_properties, 'unsubscribe')
370
    def unsubscribe(self, REQUEST=None, RESPONSE=None):
371 372
        """ unsubscribe from the global Timer Service """
        service = getTimerService(self)
373
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
374
        if not service:
375
            LOG('ActivityTool', INFO, 'TimerService not available')
376 377 378 379
            url += urllib.quote('TimerService not available')
        else:
            service.unsubscribe(self)
            url += urllib.quote("Unsubscribed from Timer Service")
380 381
        if RESPONSE is not None:
            RESPONSE.redirect(url)
382 383 384

    def manage_beforeDelete(self, item, container):
        self.unsubscribe()
385 386
        Folder.inheritedAttribute('manage_beforeDelete')(self, item, container)
    
387 388
    def manage_afterAdd(self, item, container):
        self.subscribe()
389 390
        Folder.inheritedAttribute('manage_afterAdd')(self, item, container)
       
391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410
    def getCurrentNode(self):
        """ Return current node in form ip:port """
        port = ''
        from asyncore import socket_map
        for k, v in socket_map.items():
            if hasattr(v, 'port'):
                # see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
                type = str(getattr(v, '__class__', 'unknown'))
                if type == 'ZServer.HTTPServer.zhttp_server':
                    port = v.port
                    break
        ip = socket.gethostbyname(socket.gethostname())
        currentNode = '%s:%s' %(ip, port)
        return currentNode
        
    security.declarePublic('getDistributingNode')
    def getDistributingNode(self):
        """ Return the distributingNode """
        return self.distributingNode

411
    security.declarePublic('getNodeList getNodes')
412 413 414
    def getNodes(self):
        """ Return all nodes """
        return self._nodes
415
    getNodeList = getNodes
416

417 418 419 420
    def _isValidNodeName(self, node_name) :
      """Check we have been provided a good node name"""
      return isinstance(node_name, str) and NODE_RE.match(node_name)
      
421 422
    security.declarePublic('manage_setDistributingNode')
    def manage_setDistributingNode(self, distributingNode, REQUEST=None):
423
        """ set the distributing node """   
424
        if not distributingNode or self._isValidNodeName(distributingNode):
425 426 427 428 429 430 431 432 433 434 435 436 437
          self.distributingNode = distributingNode
          if REQUEST is not None:
              REQUEST.RESPONSE.redirect(
                  REQUEST.URL1 +
                  '/manageLoadBalancing?manage_tabs_message=' +
                  urllib.quote("Distributing Node successfully changed."))
        else :
          if REQUEST is not None:
              REQUEST.RESPONSE.redirect(
                  REQUEST.URL1 +
                  '/manageLoadBalancing?manage_tabs_message=' +
                  urllib.quote("Malformed Distributing Node."))

438 439 440
    security.declarePublic('manage_addNode')
    def manage_addNode(self, node, REQUEST=None):
        """ add a node """
441 442 443 444 445 446 447 448
        if not self._isValidNodeName(node) :
            if REQUEST is not None:
                REQUEST.RESPONSE.redirect(
                    REQUEST.URL1 +
                    '/manageLoadBalancing?manage_tabs_message=' +
                    urllib.quote("Malformed node."))
            return
        
449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478
        if node in self._nodes:
            if REQUEST is not None:
                REQUEST.RESPONSE.redirect(
                    REQUEST.URL1 +
                    '/manageLoadBalancing?manage_tabs_message=' +
                    urllib.quote("Node exists already."))
            return
            
        self._nodes = self._nodes + (node,)
        
        if REQUEST is not None:
            REQUEST.RESPONSE.redirect(
                REQUEST.URL1 +
                '/manageLoadBalancing?manage_tabs_message=' +
                urllib.quote("Node successfully added."))
                        
    security.declarePublic('manage_delNode')
    def manage_delNode(self, deleteNodes, REQUEST=None):
        """ delete nodes """
        nodeList = list(self._nodes)
        for node in deleteNodes:
            if node in self._nodes:
                nodeList.remove(node)
        self._nodes = tuple(nodeList)
        if REQUEST is not None:
            REQUEST.RESPONSE.redirect(
                REQUEST.URL1 +
                '/manageLoadBalancing?manage_tabs_message=' +
                urllib.quote("Node(s) successfully deleted."))
        
479
    def process_timer(self, tick, interval, prev="", next=""):
480 481 482 483 484 485
        """ 
        Call distribute() if we are the Distributing Node and call tic()
        with our node number.
        This method is called by TimerService in the interval given
        in zope.conf. The Default is every 5 seconds.
        """
486 487 488 489
        # Prevent TimerService from starting multiple threads in parallel
        acquired = timerservice_lock.acquire(0)
        if not acquired:
          return
Jérome Perrin's avatar
Jérome Perrin committed
490 491
        
        old_sm = getSecurityManager()
492 493 494
        try:
          # get owner of portal_catalog, so normally we should be able to
          # have the permission to invoke all activities
Jérome Perrin's avatar
Jérome Perrin committed
495
          user = self.portal_catalog.getWrappedOwner()
496 497 498 499 500 501
          newSecurityManager(self.REQUEST, user)
          
          currentNode = self.getCurrentNode()
          
          # only distribute when we are the distributingNode or if it's empty
          if (self.distributingNode == self.getCurrentNode()):
Jérome Perrin's avatar
Jérome Perrin committed
502
            self.distribute(len(self._nodes))
503 504

          elif not self.distributingNode:
Jérome Perrin's avatar
Jérome Perrin committed
505
            self.distribute(1)
506
          
507 508 509 510 511 512 513 514
          # SkinsTool uses a REQUEST cache to store skin objects, as
          # with TimerService we have the same REQUEST over multiple
          # portals, we clear this cache to make sure the cache doesn't
          # contains skins from another portal.
          stool = getToolByName(self, 'portal_skins', None)
          if stool is not None:
            stool.changeSkin(None)
          
515 516 517 518
          # call tic for the current processing_node
          # the processing_node numbers are the indices of the elements in the node tuple +1
          # because processing_node starts form 1
          if currentNode in self._nodes:
Jérome Perrin's avatar
Jérome Perrin committed
519
            self.tic(list(self._nodes).index(currentNode)+1)
520 521
              
          elif len(self._nodes) == 0:
Jérome Perrin's avatar
Jérome Perrin committed
522
            self.tic(1)
523

Jérome Perrin's avatar
Jérome Perrin committed
524
        finally:
525
          timerservice_lock.release()
Jérome Perrin's avatar
Jérome Perrin committed
526
          setSecurityManager(old_sm)
527

Jean-Paul Smets's avatar
Jean-Paul Smets committed
528 529 530 531 532 533
    security.declarePublic('distribute')
    def distribute(self, node_count=1):
      """
        Distribute load
      """
      # Initialize if needed
534
      global is_initialized
Jean-Paul Smets's avatar
Jean-Paul Smets committed
535 536 537 538
      if not is_initialized: self.initialize()

      # Call distribute on each queue
      for activity in activity_list:
539
        try:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
540
          activity.distribute(self, node_count)
541 542
        except ConflictError:
          raise
543
        except:
Yoshinori Okuji's avatar
Yoshinori Okuji committed
544
          LOG('CMFActivity:', 100, 'Core call to distribute failed for activity %s' % activity, error=sys.exc_info())
Jean-Paul Smets's avatar
Jean-Paul Smets committed
545

Jean-Paul Smets's avatar
Jean-Paul Smets committed
546
    security.declarePublic('tic')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
547
    def tic(self, processing_node=1, force=0):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
548 549
      """
        Starts again an activity
Jean-Paul Smets's avatar
Jean-Paul Smets committed
550
        processing_node starts from 1 (there is not node 0)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
551
      """
552
      global active_threads, is_initialized, first_run
Jean-Paul Smets's avatar
Jean-Paul Smets committed
553 554

      # return if the number of threads is too high
555
      # else, increase the number of active_threads and continue
556 557
      tic_lock.acquire()
      too_many_threads = (active_threads >= max_active_threads)
558
      if not too_many_threads or force:
559
        active_threads += 1
560 561 562
      else:
        tic_lock.release()
        raise RuntimeError, 'Too many threads'
563
      tic_lock.release()
564

Jean-Paul Smets's avatar
Jean-Paul Smets committed
565 566 567
      # Initialize if needed
      if not is_initialized: self.initialize()

568 569 570 571 572 573 574
      # If this is the first tic after zope is started, reset the processing
      # flag for activities of this node
      if first_run:
        self.SQLDict_clearProcessingFlag(processing_node=processing_node)
        self.SQLQueue_clearProcessingFlag(processing_node=processing_node)
        first_run = 0

575 576
      try:
        # Wakeup each queue
Jean-Paul Smets's avatar
Jean-Paul Smets committed
577
        for activity in activity_list:
578
          try:
579 580 581
            activity.wakeup(self, processing_node)
          except ConflictError:
            raise
582
          except:
583 584 585 586 587 588 589 590 591 592
            LOG('CMFActivity:', 100, 'Core call to wakeup failed for activity %s' % activity)
  
        # Process messages on each queue in round robin
        has_awake_activity = 1
        while has_awake_activity:
          has_awake_activity = 0
          for activity in activity_list:
            try:
              activity.tic(self, processing_node) # Transaction processing is the responsability of the activity
              has_awake_activity = has_awake_activity or activity.isAwake(self, processing_node)
593
              #LOG('ActivityTool tic', 0, 'has_awake_activity = %r, activity = %r, activity.isAwake(self, processing_node) = %r' % (has_awake_activity, activity, activity.isAwake(self, processing_node)))
594 595 596 597 598 599 600 601 602
            except ConflictError:
              raise
            except:
              LOG('CMFActivity:', 100, 'Core call to tic or isAwake failed for activity %s' % activity, error=sys.exc_info())
      finally:
        # decrease the number of active_threads
        tic_lock.acquire()
        active_threads -= 1
        tic_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
603

604
    def hasActivity(self, *args, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
605
      # Check in each queue if the object has deferred tasks
606 607
      # if not argument is provided, then check on self
      if len(args) > 0:
608
        obj = args[0]
609
      else:
610
        obj = self
Jean-Paul Smets's avatar
Jean-Paul Smets committed
611
      for activity in activity_list:
612
        if activity.hasActivity(self, obj, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
613 614 615
          return 1
      return 0

616 617
    security.declarePrivate('activateObject')
    def activateObject(self, object, activity, active_process, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
618 619
      global is_initialized
      if not is_initialized: self.initialize()
620
      if getattr(self, '_v_activity_buffer', None) is None:
621
        self._v_activity_buffer = ActivityBuffer(activity_tool=self)
622
      return ActiveWrapper(object, activity, active_process, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
623

624 625
    def deferredQueueMessage(self, activity, message):
      self._v_activity_buffer.deferredQueueMessage(self, activity, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
626

627
    def deferredDeleteMessage(self, activity, message):
628
      if getattr(self, '_v_activity_buffer', None) is None:
629
        self._v_activity_buffer = ActivityBuffer(activity_tool=self)
630
      self._v_activity_buffer.deferredDeleteMessage(self, activity, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
631

Jean-Paul Smets's avatar
Jean-Paul Smets committed
632
    def getRegisteredMessageList(self, activity):
633 634 635
      activity_buffer = getattr(self, '_v_activity_buffer', None)
      if activity_buffer is not None:
        activity_buffer._register() # This is required if flush flush is called outside activate
636 637 638
        return activity.getRegisteredMessageList(self._v_activity_buffer, self)
      else:
        return []
Yoshinori Okuji's avatar
Yoshinori Okuji committed
639

Jean-Paul Smets's avatar
Jean-Paul Smets committed
640
    def unregisterMessage(self, activity, message):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
641
      self._v_activity_buffer._register() # Required if called by flush, outside activate
Jean-Paul Smets's avatar
Jean-Paul Smets committed
642
      return activity.unregisterMessage(self._v_activity_buffer, self, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
643

Jean-Paul Smets's avatar
Jean-Paul Smets committed
644 645 646
    def flush(self, object, invoke=0, **kw):
      global is_initialized
      if not is_initialized: self.initialize()
647
      if getattr(self, '_v_activity_buffer', None) is None:
648
        self._v_activity_buffer = ActivityBuffer(activity_tool=self)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
649
      if type(object) is TupleType:
650 651 652
        object_path = object
      else:
        object_path = object.getPhysicalPath()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
653 654 655
      for activity in activity_list:
        activity.flush(self, object_path, invoke=invoke, **kw)

656 657 658 659 660 661 662 663 664 665 666 667
    def start(self, **kw):
      global is_initialized
      if not is_initialized: self.initialize()
      for activity in activity_list:
        activity.start(self, **kw)

    def stop(self, **kw):
      global is_initialized
      if not is_initialized: self.initialize()
      for activity in activity_list:
        activity.stop(self, **kw)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
668 669
    def invoke(self, message):
      message(self)
670
 
671 672 673 674 675 676 677 678 679
    def invokeGroup(self, method_id, message_list):
      # Invoke a group method.
      object_list = []
      expanded_object_list = []
      new_message_list = []
      path_dict = {}
      # Filter the list of messages. If an object is not available, ignore such a message.
      # In addition, expand an object if necessary, and make sure that no duplication happens.
      for m in message_list:
680 681
        # alternate method is used to segregate objects which cannot be grouped.
        alternate_method_id = m.activity_kw.get('alternate_method_id')
682 683
        try:
          obj = m.getObject(self)
684
          i = len(new_message_list) # This is an index of this message in new_message_list.
685
          if m.hasExpandMethod():
686 687
            for subobj in m.getObjectList(self):
              path = subobj.getPath()
688
              if path not in path_dict:
689
                path_dict[path] = i
690 691 692 693 694 695 696 697 698 699
                if alternate_method_id is not None \
                   and hasattr(aq_base(subobj), alternate_method_id):
                  # if this object is alternated, generate a new single active object.
                  activity_kw = m.activity_kw.copy()
                  if 'group_method_id' in activity_kw:
                    del activity_kw['group_method_id']
                  active_obj = subobj.activate(**activity_kw)
                  getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
                else:
                  expanded_object_list.append(subobj)
700 701 702
          else:
            path = obj.getPath()
            if path not in path_dict:
703
              path_dict[path] = i
704 705 706 707 708 709 710 711 712 713
              if alternate_method_id is not None \
                  and hasattr(aq_base(obj), alternate_method_id):
                # if this object is alternated, generate a new single active object.
                activity_kw = m.activity_kw.copy()
                if 'group_method_id' in activity_kw:
                  del activity_kw['group_method_id']
                active_obj = obj.activate(**activity_kw)
                getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
              else:
                expanded_object_list.append(obj)
714
          object_list.append(obj)
715 716 717
          new_message_list.append(m)
        except:
          m.is_executed = 0
718
          m.exc_type = sys.exc_info()[0]
719
          LOG('WARNING ActivityTool', 0,
720 721
              'Could not call method %s on object %s' %
              (m.method_id, m.object_path), error=sys.exc_info())
722

723 724
      try:
        if len(expanded_object_list) > 0:
725 726
          method = self.unrestrictedTraverse(method_id)
          # FIXME: how to apply security here?
727 728
          # NOTE: expanded_object_list must be set to failed objects by the callee.
          #       If it fully succeeds, expanded_object_list must be empty when returning.
729
          result = method(expanded_object_list, **m.kw)
730
        else:
731 732 733 734 735 736 737
          result = None
      except:
        # In this case, the group method completely failed.
        for m in new_message_list:
          m.is_executed = 0
          m.exc_type = sys.exc_info()[0]
        LOG('WARNING ActivityTool', 0,
738 739
            'Could not call method %s on objects %s' %
            (method_id, expanded_object_list), error=sys.exc_info())
740 741 742 743 744 745 746 747 748 749 750 751 752 753 754
      else:
        # Obtain all indices of failed messages. Note that this can be a partial failure.
        failed_message_dict = {}
        for obj in expanded_object_list:
          path = obj.getPath()
          i = path_dict[path]
          failed_message_dict[i] = None

        # Only for succeeded messages, an activity process is invoked (if any).
        for i in xrange(len(object_list)):
          object = object_list[i]
          m = new_message_list[i]
          if i in failed_message_dict:
            m.is_executed = 0
            LOG('ActivityTool', WARNING,
755 756
                'the method %s partially failed on object %s' %
                (m.method_id, m.object_path,))
757 758 759 760 761
          else:
            try:
              m.activateResult(self, result, object)
              m.is_executed = 1
            except:
762
              m.is_executed = 0
763
              m.exc_type = sys.exc_info()[0]
764
              LOG('ActivityTool', WARNING,
765 766
                  'Could not call method %s on object %s' % (
                  m.method_id, m.object_path), error=sys.exc_info())
767
 
768 769
    def newMessage(self, activity, path, active_process,
                   activity_kw, method_id, *args, **kw):
770
      # Some Security Cheking should be made here XXX
Jean-Paul Smets's avatar
Jean-Paul Smets committed
771 772
      global is_initialized
      if not is_initialized: self.initialize()
773
      if getattr(self, '_v_activity_buffer', None) is None:
774
        self._v_activity_buffer = ActivityBuffer(activity_tool=self)
775 776
      activity_dict[activity].queueMessage(self,
        Message(path, active_process, activity_kw, method_id, args, kw))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
777

778
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageInvoke' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
779 780 781 782 783 784
    def manageInvoke(self, object_path, method_id, REQUEST=None):
      """
        Invokes all methods for object "object_path"
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
785
      self.flush(object_path,method_id=method_id,invoke=1)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
786
      if REQUEST is not None:
787 788
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
789

790
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageCancel' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
791 792 793 794 795 796
    def manageCancel(self, object_path, method_id, REQUEST=None):
      """
        Cancel all methods for object "object_path"
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
797
      self.flush(object_path,method_id=method_id,invoke=0)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
798
      if REQUEST is not None:
799 800
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
801

802 803
    security.declareProtected( CMFCorePermissions.ManagePortal,
                               'manageClearActivities' )
804
    def manageClearActivities(self, keep=1, REQUEST=None):
805 806 807 808 809
      """
        Clear all activities and recreate tables.
      """
      folder = getToolByName(self, 'portal_skins').activity

810 811
      # Obtain all pending messages.
      message_list = []
812 813 814 815 816 817 818 819
      if keep:
        for activity in activity_list:
          if hasattr(activity, 'dumpMessageList'):
            try:
              message_list.extend(activity.dumpMessageList(self))
            except ConflictError:
              raise
            except:
820 821 822
              LOG('ActivityTool', WARNING,
                  'could not dump messages from %s' %
                  (activity,), error=sys.exc_info())
823
            
824 825 826 827 828 829
      if hasattr(folder, 'SQLDict_createMessageTable'):
        try:
          folder.SQLDict_dropMessageTable()
        except ConflictError:
          raise
        except:
830
          LOG('CMFActivity', WARNING,
831
              'could not drop the message table',
832 833 834 835 836 837 838 839 840
              error=sys.exc_info())
        folder.SQLDict_createMessageTable()

      if hasattr(folder, 'SQLQueue_createMessageTable'):
        try:
          folder.SQLQueue_dropMessageTable()
        except ConflictError:
          raise
        except:
841
          LOG('CMFActivity', WARNING,
842
              'could not drop the message queue table',
843 844 845
              error=sys.exc_info())
        folder.SQLQueue_createMessageTable()

846 847 848 849 850 851 852 853
      # Reactivate the messages.
      for m in message_list:
        try:
          m.reactivate(self)
        except ConflictError:
          raise
        except:
          LOG('ActivityTool', WARNING,
854 855
              'could not reactivate the message %r, %r' %
              (m.object_path, m.method_id), error=sys.exc_info())
856

857
      if REQUEST is not None:
858 859
        return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(),
          'manageActivitiesAdvanced?manage_tabs_message=Activities%20Cleared'))
860

Jean-Paul Smets's avatar
Jean-Paul Smets committed
861
    security.declarePublic('getMessageList')
862
    def getMessageList(self,**kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
863 864 865
      """
        List messages waiting in queues
      """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
866 867 868
      # Initialize if needed
      if not is_initialized: self.initialize()

Jean-Paul Smets's avatar
Jean-Paul Smets committed
869 870
      message_list = []
      for activity in activity_list:
Sebastien Robin's avatar
Sebastien Robin committed
871
        try:
872
          message_list += activity.getMessageList(self,**kw)
Sebastien Robin's avatar
Sebastien Robin committed
873 874
        except AttributeError:
          LOG('getMessageList, could not get message from Activity:',0,activity)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
875 876
      return message_list

877 878 879 880 881 882 883
    security.declarePublic('countMessageWithTag')
    def countMessageWithTag(self, value):
      """
        Return the number of messages which match the given tag.
      """
      message_count = 0
      for activity in activity_list:
Sebastien Robin's avatar
Sebastien Robin committed
884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901
        message_count += activity.countMessageWithTag(self, value)
      return message_count

    security.declarePublic('countMessage')
    def countMessage(self, **kw):
      """
        Return the number of messages which match the given parameter.

        Parameters allowed:

        method_id : the id of the method
        path : for activities on an particular object
        tag : activities with a particular tag
        message_uid : activities with a particular uid
      """
      message_count = 0
      for activity in activity_list:
        message_count += activity.countMessage(self, **kw)
902 903
      return message_count

904
    security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
905
    def newActiveProcess(self, **kw):
906 907 908
      from ActiveProcess import addActiveProcess
      new_id = str(self.generateNewId())
      addActiveProcess(self, new_id)
909 910 911
      active_process = self._getOb(new_id)
      active_process.edit(**kw)
      return active_process
912 913 914 915

    def reindexObject(self):
      self.immediateReindexObject()

916 917 918 919 920 921 922
    # Active synchronisation methods
    def validateOrder(self, message, validator_id, validation_value):
      global is_initialized
      if not is_initialized: self.initialize()
      for activity in activity_list:
        method_id = "_validate_%s" % validator_id
        if hasattr(activity, method_id):
923
#           LOG('CMFActivity: ', 0, 'validateOrder calling method_id %s' % method_id)
924 925 926
          if getattr(activity,method_id)(self, message, validation_value):
            return 1
      return 0
927

Yoshinori Okuji's avatar
Yoshinori Okuji committed
928 929
    # Required for tests (time shift)
    def timeShift(self, delay):
930 931 932 933
      global is_initialized
      if not is_initialized: self.initialize()
      for activity in activity_list:
        activity.timeShift(self, delay)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
934

935
InitializeClass(ActivityTool)