ActivityTool.py 35.1 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
Jean-Paul Smets's avatar
Jean-Paul Smets committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29 30 31 32 33 34 35 36
import socket
import urllib
import traceback
import threading
import sys
from types import TupleType, StringType
import re

Jean-Paul Smets's avatar
Jean-Paul Smets committed
37
from Products.CMFCore import CMFCorePermissions
38
from Products.ERP5Type.Document.Folder import Folder
39
from Products.CMFActivity.ActiveResult import ActiveResult
40
from Products.PythonScripts.Utility import allow_class
41
from AccessControl import ClassSecurityInfo, Permissions
42
from AccessControl.SecurityManagement import newSecurityManager, noSecurityManager
43 44
from Products.CMFCore.utils import UniqueObject, _getAuthenticatedUser, getToolByName
from Globals import InitializeClass, DTMLFile
Jean-Paul Smets's avatar
Jean-Paul Smets committed
45
from Acquisition import aq_base
46
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
47
from ActivityBuffer import ActivityBuffer
48

49
from ZODB.POSException import ConflictError
Jean-Paul Smets's avatar
Jean-Paul Smets committed
50

51
from zLOG import LOG, INFO, WARNING
52 53

# TimerService is optional: when the product cannot be imported, a stub
# returning None stands in for it (callers treat a false value as
# "TimerService not available").
try:
  from Products.TimerService import getTimerService
except ImportError:
  def getTimerService(self):
    return None
Jean-Paul Smets's avatar
Jean-Paul Smets committed
58

59
# minimal IP:Port regexp
# (raw string: the backslashes are regexp escapes, not string escapes)
NODE_RE = re.compile(r'\d+\.\d+\.\d+\.\d+:\d+')
61

Jean-Paul Smets's avatar
Jean-Paul Smets committed
62 63 64 65 66
# The state below lives in RAM (module globals) rather than on a persistent
# instance: this avoids storing it in the ZODB, and it is reset every time
# the process is restarted.
is_initialized = 0
first_run = 1
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)
# A RAM based lock to prevent too many concurrent tic() calls
tic_lock = threading.Lock()
# A RAM based lock to prevent TimerService spamming when busy
timerservice_lock = threading.Lock()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
70 71 72 73 74 75 76 77

# Activity Registration
# activity_list keeps the instances in registration order, activity_dict
# gives access to the same instances by class name.
activity_dict = {}
activity_list = []

def registerActivity(activity):
  """Instantiate the given activity class and record the instance in
  activity_list and in activity_dict (keyed by the class name)."""
  # XXX Must be rewritten to register
  # class and create instance for each activity
  instance = activity()
  activity_list.append(instance)
  activity_dict[activity.__name__] = instance

class Message:
  """Activity Message Class.

  Message instances are stored in an activity queue, inside the Activity Tool.
  """

  # Class level defaults. They guarantee that these attributes can always be
  # read: active_process_uid is only assigned when an active process object
  # is given, and exc_value / traceback are only assigned when a call fails.
  active_process_uid = None
  exc_type = None
  exc_value = None
  traceback = None

  def __init__(self, object, active_process, activity_kw, method_id, args, kw):
    # object and active_process are given either as a '/' separated path
    # string or as an object providing getPhysicalPath(); active_process
    # may also be None.
    if isinstance(object, StringType):
      self.object_path = object.split('/')
    else:
      self.object_path = object.getPhysicalPath()
    if active_process is None:
      self.active_process = None
    elif isinstance(active_process, StringType):
      self.active_process = active_process.split('/')
    else:
      self.active_process = active_process.getPhysicalPath()
      self.active_process_uid = active_process.getUid()
    self.activity_kw = activity_kw
    self.method_id = method_id
    self.args = args
    self.kw = kw
    self.is_executed = 0
    self.exc_type = None
    self.processing = None
    # Remember who created the message so that it can be replayed later
    # under the same user (see changeUser).
    self.user_name = str(_getAuthenticatedUser(self))
    # Store REQUEST Info ?

  def getObject(self, activity_tool):
    """return the object referenced in this message."""
    return activity_tool.unrestrictedTraverse(self.object_path)

  def getObjectList(self, activity_tool):
    """return the list of object that can be expanded from this message."""
    obj = self.getObject(activity_tool)
    if not self.hasExpandMethod():
      return [obj]
    # The expand method is called outside of any KeyError handler, so that
    # a KeyError raised by the traversal or by the expand method itself is
    # not mistaken for "this message has no expand method".
    # FIXME: how to pass parameters?
    return getattr(obj, self.activity_kw['expand_method_id'])()

  def hasExpandMethod(self):
    """return true if the message has an expand method.
    An expand method is used to expand the list of objects and to turn a
    big recursive transaction affecting many objects into multiple
    transactions affecting only one object at a time (this can prevent
    duplicated method calls)."""
    return 'expand_method_id' in self.activity_kw

  def changeUser(self, user_name, activity_tool):
    """restore the security context for the calling user."""
    uf = activity_tool.getPortalObject().acl_users
    user = uf.getUserById(user_name)
    # if the user is not found, try to get it from a parent acl_users
    # XXX this is still far from perfect, because we need to store all
    # informations about the user (like original user folder, roles) to
    # replay the activity with exactly the same security context as if
    # it had been executed without activity.
    if user is None:
      uf = activity_tool.getPortalObject().aq_parent.acl_users
      user = uf.getUserById(user_name)
    if user is not None:
      user = user.__of__(uf)
      newSecurityManager(None, user)
    else :
      LOG("CMFActivity", WARNING,
          "Unable to find user %s in the portal" % user_name)
      noSecurityManager()
    return user

  def activateResult(self, activity_tool, result, object):
    # Hand the result of the call over to the active process, if any.
    # A result which is not already an ActiveResult is wrapped into one.
    if self.active_process is not None:
      active_process = activity_tool.unrestrictedTraverse(self.active_process)
      if isinstance(result,ActiveResult):
        result.edit(object_path=object)
        result.edit(method_id=self.method_id)
        # XXX Allow other method_id in future
        active_process.activateResult(result)
      else:
        active_process.activateResult(
                    ActiveResult(object_path=object,
                          method_id=self.method_id,
                          result=result)) # XXX Allow other method_id in future

  def __call__(self, activity_tool):
    # Execute the message: call method_id on the referenced object under
    # the user who created the message. Failures are caught and recorded
    # on the message (is_executed, exc_type, exc_value, traceback).
    try:
      obj = self.getObject(activity_tool)
      # Change user if required (TO BE DONE)
      # We will change the user only in order to execute this method
      current_user = str(_getAuthenticatedUser(self))
      user = self.changeUser(self.user_name, activity_tool)
      try:
        result = getattr(obj, self.method_id)(*self.args, **self.kw)
      finally:
        # Use again the previous user
        if user is not None:
          self.changeUser(current_user, activity_tool)
      self.activateResult(activity_tool, result, obj)
      self.is_executed = 1
    except:
      # Deliberately catches everything: the outcome is reported through
      # is_executed instead of an exception.
      self.is_executed = 0
      # Read the exception once, so that every record below describes the
      # same failure.
      exc_info = sys.exc_info()
      try:
        self.exc_type = exc_info[0]
        self.exc_value = str(exc_info[1])
        self.traceback = ''.join(traceback.format_tb(exc_info[2]))
        LOG('ActivityTool', WARNING,
            'Could not call method %s on object %s' % (
            self.method_id, self.object_path), error=exc_info)
        # push the error in ZODB error_log
        if hasattr(activity_tool, 'error_log'):
          activity_tool.error_log.raising(exc_info)
      finally:
        # Do not keep the traceback (and its frames) referenced from here.
        del exc_info

  def validate(self, activity, activity_tool):
    # Delegate validation to the activity, passing the activity keywords.
    return activity.validate(activity_tool, self, **self.activity_kw)

  def notifyUser(self, activity_tool, message="Failed Processing Activity"):
    """Notify the user that the activity failed."""
    portal = activity_tool.getPortalObject()
    user_email = None
    user = portal.portal_membership.getMemberById(self.user_name)
    if user is not None:
      user_email = user.getProperty('email')
    if user_email in ('', None):
      # Fall back on the addresses configured on the portal.
      user_email = portal.getProperty('email_to_address',
                       portal.getProperty('email_from_address'))
    mail_text = """From: %s
To: %s
Subject: %s

%s

Document: %s
Method: %s
Exception: %s %s
Traceback:
%s
""" % (activity_tool.email_from_address, user_email, message,
       message, '/'.join(self.object_path), self.method_id,
       self.exc_type, self.exc_value, self.traceback)
    activity_tool.MailHost.send( mail_text )

  def reactivate(self, activity_tool):
    # Reactivate the original object.
    obj = self.getObject(activity_tool)
    # Change user if required (TO BE DONE)
    # We will change the user only in order to execute this method
    current_user = str(_getAuthenticatedUser(self))
    user = self.changeUser(self.user_name, activity_tool)
    try:
      active_obj = obj.activate(**self.activity_kw)
      getattr(active_obj, self.method_id)(*self.args, **self.kw)
    finally:
      # Use again the previous user
      if user is not None:
        self.changeUser(current_user, activity_tool)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
240 241
class Method:
  # Callable bound to a passive object, an activity name, an active process,
  # activation keywords and a method id. Calling it does not run the method:
  # it builds a Message from the call arguments and queues that message on
  # the activity registered under the given name.

  def __init__(self, passive_self, activity, active_process, kw, method_id):
    self.__passive_self = passive_self
    self.__activity = activity
    self.__active_process = active_process
    self.__kw = kw
    self.__method_id = method_id

  def __call__(self, *args, **kw):
    passive_self = self.__passive_self
    message = Message(passive_self, self.__active_process, self.__kw,
                      self.__method_id, args, kw)
    queue = activity_dict[self.__activity]
    queue.queueMessage(passive_self.portal_activities, message)

253 254
allow_class(Method)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
255 256
class ActiveWrapper:
  # Wrapper returned by activate(): every attribute looked up on it which is
  # not found the normal way is turned into a Method bound to the wrapped
  # object, so that calling it queues a message.

  def __init__(self, passive_self, activity, active_process, **kw):
    # The state is written straight into the instance dictionary, under
    # literal string keys (string keys are not name-mangled).
    state = self.__dict__
    state['__passive_self'] = passive_self
    state['__activity'] = activity
    state['__active_process'] = active_process
    state['__kw'] = kw

  def __getattr__(self, id):
    state = self.__dict__
    return Method(state['__passive_self'],
                  state['__activity'],
                  state['__active_process'],
                  state['__kw'],
                  id)

268
class ActivityTool (Folder, UniqueObject):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
269
    """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
270 271 272 273 274 275 276 277 278 279 280 281
    ActivityTool is the central point for activity management.

    Improvement to consider to reduce locks:

      Idea 1: create an SQL tool which accumulate queries and executes them at the end of a transaction,
              thus allowing all SQL transaction to happen in a very short time
              (this would also be a great way of using MyISAM tables)

      Idea 2: do the same at the level of ActivityTool

      Idea 3: do the same at the level of each activity (ie. queueMessage
              accumulates and fires messages at the end of the transaction)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
282 283 284
    """
    id = 'portal_activities'
    meta_type = 'CMF Activity Tool'
285
    portal_type = 'Activity Tool'
286
    allowed_types = ( 'CMF Active Process', )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
287 288
    security = ClassSecurityInfo()

289 290 291
    _distributingNode = ''
    _nodes = ()

292 293
    manage_options = tuple(
                     [ { 'label' : 'Overview', 'action' : 'manage_overview' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
294
                     , { 'label' : 'Activities', 'action' : 'manageActivities' }
295
                     , { 'label' : 'LoadBalancing', 'action' : 'manageLoadBalancing'}
296
                     , { 'label' : 'Advanced', 'action' : 'manageActivitiesAdvanced' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
297
                     ,
298
                     ] + list(Folder.manage_options))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
299 300 301 302

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
    manageActivities = DTMLFile( 'dtml/manageActivities', globals() )

303 304 305
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivitiesAdvanced' )
    manageActivitiesAdvanced = DTMLFile( 'dtml/manageActivitiesAdvanced', globals() )

306 307
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
    manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )
308 309 310 311 312 313
    
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageLoadBalancing' )
    manageLoadBalancing = DTMLFile( 'dtml/manageLoadBalancing', globals() )
    
    distributingNode = ''
    _nodes = ()
314 315 316

    def __init__(self):
        # The tool is always created under the id declared on the class.
        tool_id = ActivityTool.id
        return Folder.__init__(self, tool_id)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
317

318 319 320 321 322 323 324 325 326 327
    # Filter content (ZMI))
    def filtered_meta_types(self, user=None):
        # Filters the list of available meta types: only the meta types
        # whose name is listed in allowed_types are returned.
        # The user parameter is not used.
        allowed_types = self.allowed_types
        return [meta_type for meta_type in self.all_meta_types()
                if meta_type['name'] in allowed_types]

Jean-Paul Smets's avatar
Jean-Paul Smets committed
328 329
    def initialize(self):
      # Initialize every registered queue against this tool, then record
      # in the module level flag that initialization has been done.
      global is_initialized
      # NOTE(review): the imported names are not used below; presumably
      # importing them is what registers the activities - confirm.
      from Activity import RAMQueue, RAMDict, SQLQueue, SQLDict
      for queue in activity_list:
        queue.initialize(self)
      is_initialized = 1
335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
      
    security.declareProtected(Permissions.manage_properties, 'isSubscribed')
    def isSubscribed(self):
        """
        Return True if we are subscribed to TimerService.
        Otherwise return False.
        """
        service = getTimerService(self)
        if service:
            # Subscriptions are looked up by the physical path of the tool
            path = '/'.join(self.getPhysicalPath())
            return path in service.lisSubscriptions()
        LOG('ActivityTool', INFO, 'TimerService not available')
        return False
Jean-Paul Smets's avatar
Jean-Paul Smets committed
351

352 353 354 355
    security.declareProtected(Permissions.manage_properties, 'subscribe')
    def subscribe(self, REQUEST=None, RESPONSE=None):
        """ subscribe to the global Timer Service

        The browser is redirected to the load balancing screen only when
        a RESPONSE is given, as unsubscribe() does. This keeps calls made
        without a response (such as the one in manage_afterAdd) from
        redirecting the request that triggered them.
        """
        service = getTimerService(self)
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
        if not service:
            LOG('ActivityTool', INFO, 'TimerService not available')
            url += urllib.quote('TimerService not available')
        else:
            service.subscribe(self)
            url += urllib.quote("Subscribed to Timer Service")
        if RESPONSE is not None:
            return RESPONSE.redirect(url)
364 365

    security.declareProtected(Permissions.manage_properties, 'unsubscribe')
366
    def unsubscribe(self, REQUEST=None, RESPONSE=None):
        """ unsubscribe from the global Timer Service """
        service = getTimerService(self)
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
        if service:
            service.unsubscribe(self)
            message = "Unsubscribed from Timer Service"
        else:
            LOG('ActivityTool', INFO, 'TimerService not available')
            message = 'TimerService not available'
        url += urllib.quote(message)
        # Only redirect when a response was handed in by the caller
        if RESPONSE is not None:
            RESPONSE.redirect(url)
378 379 380

    def manage_beforeDelete(self, item, container):
        # Unsubscribe from the Timer Service, then run the hook obtained
        # through Folder.inheritedAttribute.
        self.unsubscribe()
        inherited_hook = Folder.inheritedAttribute('manage_beforeDelete')
        inherited_hook(self, item, container)
    
383 384
    def manage_afterAdd(self, item, container):
        # Subscribe to the Timer Service, then run the hook obtained
        # through Folder.inheritedAttribute.
        self.subscribe()
        inherited_hook = Folder.inheritedAttribute('manage_afterAdd')
        inherited_hook(self, item, container)
       
387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406
    def getCurrentNode(self):
        """ Return current node in form ip:port """
        from asyncore import socket_map
        # The port stays empty when no zhttp_server is found in socket_map
        port = ''
        for server in socket_map.values():
            if not hasattr(server, 'port'):
                continue
            # see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
            class_name = str(getattr(server, '__class__', 'unknown'))
            if class_name == 'ZServer.HTTPServer.zhttp_server':
                port = server.port
                break
        ip = socket.gethostbyname(socket.gethostname())
        return '%s:%s' %(ip, port)
        
    security.declarePublic('getDistributingNode')
    def getDistributingNode(self):
        """ Return the distributingNode (an empty string when none is set) """
        distributing_node = self.distributingNode
        return distributing_node

407
    # Each name is a separate argument: a single 'getNodeList getNodes'
    # string would declare one name which matches no attribute at all.
    security.declarePublic('getNodeList', 'getNodes')
    def getNodes(self):
        """ Return all nodes """
        return self._nodes
    # getNodeList is an alias of getNodes
    getNodeList = getNodes
412

413 414 415 416
    def _isValidNodeName(self, node_name) :
      """Check we have been provided a good node name

      Return True only if node_name is a string which matches NODE_RE
      from its first to its last character.
      """
      if not isinstance(node_name, str):
        return False
      match = NODE_RE.match(node_name)
      # match() only anchors the beginning of the string: also require the
      # match to reach the end, so that trailing garbage is rejected.
      return match is not None and match.end() == len(node_name)
      
417 418
    security.declarePublic('manage_setDistributingNode')
    def manage_setDistributingNode(self, distributingNode, REQUEST=None):
        """ set the distributing node """
        # NOTE(review): this mutator is declared public by the security
        # declaration preceding it - confirm that this is intended.
        # An empty value is accepted: it clears the distributing node.
        accepted = not distributingNode or self._isValidNodeName(distributingNode)
        if accepted:
            self.distributingNode = distributingNode
            message = "Distributing Node successfully changed."
        else:
            message = "Malformed Distributing Node."
        if REQUEST is not None:
            REQUEST.RESPONSE.redirect(
                REQUEST.URL1 +
                '/manageLoadBalancing?manage_tabs_message=' +
                urllib.quote(message))

434 435 436
    security.declarePublic('manage_addNode')
    def manage_addNode(self, node, REQUEST=None):
        """ add a node """
        # NOTE(review): this mutator is declared public by the security
        # declaration preceding it - confirm that this is intended.
        if not self._isValidNodeName(node) :
            message = "Malformed node."
        elif node in self._nodes:
            message = "Node exists already."
        else:
            # _nodes is replaced by a new tuple rather than modified
            self._nodes = self._nodes + (node,)
            message = "Node successfully added."

        if REQUEST is not None:
            REQUEST.RESPONSE.redirect(
                REQUEST.URL1 +
                '/manageLoadBalancing?manage_tabs_message=' +
                urllib.quote(message))
                        
    security.declarePublic('manage_delNode')
    def manage_delNode(self, deleteNodes, REQUEST=None):
        """ delete nodes

        deleteNodes is a sequence of node names; a single node name given
        as a plain string is accepted too. Unknown names are ignored.
        """
        # NOTE(review): this mutator is declared public by the security
        # declaration preceding it - confirm that this is intended.
        # A bare string would otherwise be iterated character by character
        # and never match any node.
        if isinstance(deleteNodes, str):
            deleteNodes = [deleteNodes]
        nodeList = list(self._nodes)
        for node in deleteNodes:
            # Test against the working copy, so that a name listed twice
            # is only removed once instead of making remove() fail.
            if node in nodeList:
                nodeList.remove(node)
        self._nodes = tuple(nodeList)
        if REQUEST is not None:
            REQUEST.RESPONSE.redirect(
                REQUEST.URL1 +
                '/manageLoadBalancing?manage_tabs_message=' +
                urllib.quote("Node(s) successfully deleted."))
        
475
    def process_timer(self, tick, interval, prev="", next=""):
        """ 
        Call distribute() if we are the Distributing Node and call tic()
        with our node number.
        This method is called by TimerService in the interval given
        in zope.conf. The Default is every 5 seconds.
        """
        # Prevent TimerService from starting multiple threads in parallel:
        # the lock is taken without blocking, and the call is simply
        # dropped when another one is still running.
        acquired = timerservice_lock.acquire(0)
        if not acquired:
          return

        try:
          # get owner of portal_catalog, so normally we should be able to
          # have the permission to invoke all activities
          user = self.portal_catalog.getOwner()
          newSecurityManager(self.REQUEST, user)

          # Computed once per call and reused for both decisions below
          currentNode = self.getCurrentNode()

          # only distribute when we are the distributingNode or if it's empty
          if self.distributingNode == currentNode:
              self.distribute(len(self._nodes))
              #LOG('CMFActivity:', INFO, 'self.distribute(node_count=%s)' %len(self._nodes))

          elif not self.distributingNode:
              self.distribute(1)
              #LOG('CMFActivity:', INFO, 'distributingNodes empty! Calling distribute(1)')

          # call tic for the current processing_node
          # the processing_node numbers are the indices of the elements in the node tuple +1
          # because processing_node starts form 1
          if currentNode in self._nodes:
              self.tic(list(self._nodes).index(currentNode)+1)
              #LOG('CMFActivity:', INFO, 'self.tic(processing_node=%s)' %str(list(self._nodes).index(currentNode)+1))

          elif len(self._nodes) == 0:
              self.tic(1)
              #LOG('CMFActivity:', INFO, 'Node List is empty! Calling tic(1)')

        finally:
          # Released on success and on error alike; errors still propagate
          timerservice_lock.release()
520

Jean-Paul Smets's avatar
Jean-Paul Smets committed
521 522 523 524 525 526
    security.declarePublic('distribute')
    def distribute(self, node_count=1):
      """
        Distribute load
      """
      # Initialize if needed
      if not is_initialized:
        self.initialize()

      # Call distribute on each queue. A failing queue is logged and does
      # not prevent the next ones from running; only ConflictError is
      # allowed to propagate.
      for queue in activity_list:
        try:
          queue.distribute(self, node_count)
        except ConflictError:
          raise
        except:
          LOG('CMFActivity:', 100, 'Core call to distribute failed for activity %s' % queue, error=sys.exc_info())
Jean-Paul Smets's avatar
Jean-Paul Smets committed
538

Jean-Paul Smets's avatar
Jean-Paul Smets committed
539
    security.declarePublic('tic')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
540
    def tic(self, processing_node=1, force=0):
      """
        Starts again an activity
        processing_node starts from 1 (there is not node 0)

        Raises RuntimeError when max_active_threads calls are already
        running, unless force is true.
      """
      global active_threads, first_run

      # return if the number of threads is too high
      # else, increase the number of active_threads and continue
      tic_lock.acquire()
      try:
        if active_threads >= max_active_threads and not force:
          raise RuntimeError('Too many threads')
        active_threads += 1
      finally:
        tic_lock.release()

      # Everything after the increment runs inside try/finally, so that the
      # counter is decreased even if initialization or the first run
      # cleanup fails. Otherwise a single failure would leave the counter
      # raised and every later call would be refused.
      try:
        # Initialize if needed
        if not is_initialized: self.initialize()

        # If this is the first tic after zope is started, reset the processing
        # flag for activities of this node
        if first_run:
          self.SQLDict_clearProcessingFlag(processing_node=processing_node)
          self.SQLQueue_clearProcessingFlag(processing_node=processing_node)
          first_run = 0

        # Wakeup each queue
        for activity in activity_list:
          try:
            activity.wakeup(self, processing_node)
          except ConflictError:
            raise
          except:
            LOG('CMFActivity:', 100, 'Core call to wakeup failed for activity %s' % activity, error=sys.exc_info())

        # Process messages on each queue in round robin
        has_awake_activity = 1
        while has_awake_activity:
          has_awake_activity = 0
          for activity in activity_list:
            try:
              activity.tic(self, processing_node) # Transaction processing is the responsability of the activity
              has_awake_activity = has_awake_activity or activity.isAwake(self, processing_node)
              #LOG('ActivityTool tic', 0, 'has_awake_activity = %r, activity = %r, activity.isAwake(self, processing_node) = %r' % (has_awake_activity, activity, activity.isAwake(self, processing_node)))
            except ConflictError:
              raise
            except:
              LOG('CMFActivity:', 100, 'Core call to tic or isAwake failed for activity %s' % activity, error=sys.exc_info())
      finally:
        # decrease the number of active_threads
        tic_lock.acquire()
        try:
          active_threads -= 1
        finally:
          tic_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
596

597
    def hasActivity(self, *args, **kw):
      # Check in each queue if the object has deferred tasks.
      # The object is the first positional argument; if no argument is
      # provided, then check on self. Returns 1 or 0.
      if args:
        target = args[0]
      else:
        target = self
      found = 0
      for queue in activity_list:
        if queue.hasActivity(self, target, **kw):
          found = 1
          break
      return found

609
    def activate(self, object, activity, active_process, **kw):
      # Return a wrapper around object on which method calls are queued on
      # the named activity instead of being executed.
      if not is_initialized:
        self.initialize()
      # Create the volatile activity buffer on first use
      activity_buffer = getattr(self, '_v_activity_buffer', None)
      if activity_buffer is None:
        self._v_activity_buffer = ActivityBuffer(activity_tool=self)
      return ActiveWrapper(object, activity, active_process, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
615

616 617
    def deferredQueueMessage(self, activity, message):
      # Ask the activity buffer to queue the message.
      # The buffer is a volatile attribute: create it if it is missing, as
      # deferredDeleteMessage does, instead of failing on the lookup.
      if getattr(self, '_v_activity_buffer', None) is None:
        self._v_activity_buffer = ActivityBuffer(activity_tool=self)
      self._v_activity_buffer.deferredQueueMessage(self, activity, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
618

619
    def deferredDeleteMessage(self, activity, message):
      # Ask the activity buffer to delete the message, creating the
      # volatile buffer first if it does not exist yet.
      activity_buffer = getattr(self, '_v_activity_buffer', None)
      if activity_buffer is None:
        self._v_activity_buffer = ActivityBuffer(activity_tool=self)
      self._v_activity_buffer.deferredDeleteMessage(self, activity, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
623

Jean-Paul Smets's avatar
Jean-Paul Smets committed
624
    def getRegisteredMessageList(self, activity):
      # Return the messages registered for the activity in the activity
      # buffer, or an empty list when there is no buffer.
      activity_buffer = getattr(self, '_v_activity_buffer', None)
      if activity_buffer is None:
        return []
      activity_buffer._register() # This is required if flush flush is called outside activate
      return activity.getRegisteredMessageList(self._v_activity_buffer, self)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
631

Jean-Paul Smets's avatar
Jean-Paul Smets committed
632
    def unregisterMessage(self, activity, message):
      # Ask the activity to unregister the message from the activity
      # buffer. The buffer must already exist.
      activity_buffer = self._v_activity_buffer
      activity_buffer._register() # Required if called by flush, outside activate
      return activity.unregisterMessage(self._v_activity_buffer, self, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
635

Jean-Paul Smets's avatar
Jean-Paul Smets committed
636 637 638
    def flush(self, object, invoke=0, **kw):
      # Flush the messages of an object on every queue. object is either
      # a path tuple or an object providing getPhysicalPath().
      if not is_initialized:
        self.initialize()
      # Create the volatile activity buffer on first use
      activity_buffer = getattr(self, '_v_activity_buffer', None)
      if activity_buffer is None:
        self._v_activity_buffer = ActivityBuffer(activity_tool=self)
      if type(object) is TupleType:
        object_path = object
      else:
        object_path = object.getPhysicalPath()
      for queue in activity_list:
        queue.flush(self, object_path, invoke=invoke, **kw)

648 649 650 651 652 653 654 655 656 657 658 659
    def start(self, **kw):
      # Call start on every registered queue, initializing first if needed.
      if not is_initialized:
        self.initialize()
      for queue in activity_list:
        queue.start(self, **kw)

    def stop(self, **kw):
      """
        Call stop on every activity of activity_list, passing this tool
        and the given keyword arguments. The tool is initialized first
        if needed.
      """
      if not is_initialized:
        self.initialize()
      for each_activity in activity_list:
        each_activity.stop(self, **kw)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
660 661
    def invoke(self, message):
      """
        Invoke a single message by calling it with this tool as its only
        argument. Nothing is returned.
      """
      activity_tool = self
      message(activity_tool)
662
 
663 664 665 666 667 668 669 670 671
    def invokeGroup(self, method_id, message_list):
      """
        Invoke the group method "method_id" once for the objects of all
        messages in "message_list".

        Messages whose object cannot be obtained are marked as not
        executed and ignored. Objects are expanded when the message has
        an expand method, and every path is passed to the group method at
        most once. Objects providing the message's 'alternate_method_id'
        are not grouped: a single activity calling that method is
        generated for them instead.

        On return every message has is_executed set (1 on success, 0 on
        failure) and, when an exception was caught, exc_type as well.
      """
      object_list = []           # object of each accepted message
      expanded_object_list = []  # objects passed to the group method
      new_message_list = []      # accepted messages, parallel to object_list
      path_dict = {}             # path -> index of its message in new_message_list
      # Filter the list of messages. If an object is not available, ignore such a message.
      # In addition, expand an object if necessary, and make sure that no duplication happens.
      for m in message_list:
        # alternate method is used to segregate objects which cannot be grouped.
        alternate_method_id = m.activity_kw.get('alternate_method_id')
        try:
          obj = m.getObject(self)
          # This is an index of this message in new_message_list.
          message_index = len(new_message_list)
          if m.hasExpandMethod():
            candidate_list = m.getObjectList(self)
          else:
            candidate_list = (obj,)
          for candidate in candidate_list:
            path = candidate.getPath()
            if path in path_dict:
              # Already handled through an earlier message.
              continue
            path_dict[path] = message_index
            if alternate_method_id is not None \
               and hasattr(aq_base(candidate), alternate_method_id):
              # if this object is alternated, generate a new single active object.
              activity_kw = m.activity_kw.copy()
              if 'group_method_id' in activity_kw:
                del activity_kw['group_method_id']
              active_obj = candidate.activate(**activity_kw)
              getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
            else:
              expanded_object_list.append(candidate)
          object_list.append(obj)
          new_message_list.append(m)
        except:
          m.is_executed = 0
          m.exc_type = sys.exc_info()[0]
          LOG('WARNING ActivityTool', 0,
              'Could not call method %s on object %s' %
              (m.method_id, m.object_path), error=sys.exc_info())

      result = None
      try:
        if expanded_object_list:
          method = self.unrestrictedTraverse(method_id)
          # FIXME: how to pass parameters?
          # FIXME: how to apply security here?
          # NOTE: expanded_object_list must be set to failed objects by the callee.
          #       If it fully succeeds, expanded_object_list must be empty when returning.
          result = method(expanded_object_list)
      except:
        # In this case, the group method completely failed.
        for m in new_message_list:
          m.is_executed = 0
          m.exc_type = sys.exc_info()[0]
        LOG('WARNING ActivityTool', 0,
            'Could not call method %s on objects %s' %
            (method_id, expanded_object_list), error=sys.exc_info())
      else:
        # Obtain all indices of failed messages. Note that this can be a partial failure.
        failed_message_dict = {}
        for failed_obj in expanded_object_list:
          failed_message_dict[path_dict[failed_obj.getPath()]] = None

        # Only for succeeded messages, an activity process is invoked (if any).
        for message_index, (obj, m) in enumerate(zip(object_list,
                                                     new_message_list)):
          if message_index in failed_message_dict:
            m.is_executed = 0
            LOG('ActivityTool', WARNING,
                'the method %s partially failed on object %s' %
                (m.method_id, m.object_path,))
            continue
          try:
            m.activateResult(self, result, obj)
            m.is_executed = 1
          except:
            m.is_executed = 0
            m.exc_type = sys.exc_info()[0]
            LOG('ActivityTool', WARNING,
                'Could not call method %s on object %s' % (
                m.method_id, m.object_path), error=sys.exc_info())
760
 
761 762
    def newMessage(self, activity, path, active_process,
                   activity_kw, method_id, *args, **kw):
      """
        Build a Message from the given parameters and queue it on the
        activity registered under the key "activity" in activity_dict.

        The tool is initialized and the volatile activity buffer is
        created first if needed.
      """
      # Some security checking should be made here XXX
      global is_initialized
      if not is_initialized: self.initialize()
      if getattr(self, '_v_activity_buffer', None) is None:
        self._v_activity_buffer = ActivityBuffer(activity_tool=self)
      activity_dict[activity].queueMessage(self,
        Message(path, active_process, activity_kw, method_id, args, kw))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
770

771
    # Declare 'manageInvoke' as protected by the ManagePortal permission.
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageInvoke' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
772 773 774 775 776 777
    def manageInvoke(self, object_path, method_id, REQUEST=None):
      """
        Invokes all methods for object "object_path"

        object_path -- a path tuple, or a '/'-separated string which is
                       split into a tuple
        method_id   -- passed to flush as the method_id keyword
        REQUEST     -- when given, the response is redirected to the
                       'manageActivities' page of this tool
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
      self.flush(object_path,method_id=method_id,invoke=1)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
782

783
    # Declare 'manageCancel' as protected by the ManagePortal permission.
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageCancel' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
784 785 786 787 788 789
    def manageCancel(self, object_path, method_id, REQUEST=None):
      """
        Cancel all methods for object "object_path"

        object_path -- a path tuple, or a '/'-separated string which is
                       split into a tuple
        method_id   -- passed to flush as the method_id keyword
        REQUEST     -- when given, the response is redirected to the
                       'manageActivities' page of this tool
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
      # invoke=0: flush the messages without invoking them.
      self.flush(object_path,method_id=method_id,invoke=0)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
794

795 796
    # Declare 'manageClearActivities' as protected by the ManagePortal
    # permission.
    security.declareProtected( CMFCorePermissions.ManagePortal,
                               'manageClearActivities' )
797
    def manageClearActivities(self, keep=1, REQUEST=None):
      """
        Clear all activities and recreate tables.

        keep    -- when true, pending messages are dumped from the
                   activities before the tables are dropped and are
                   reactivated once the tables have been recreated
        REQUEST -- when given, the response is redirected to the
                   'manageActivitiesAdvanced' page of this tool
      """
      folder = getToolByName(self, 'portal_skins').activity

      # Obtain all pending messages.
      message_list = []
      if keep:
        for activity in activity_list:
          if hasattr(activity, 'dumpMessageList'):
            try:
              message_list.extend(activity.dumpMessageList(self))
            except ConflictError:
              raise
            except:
              LOG('ActivityTool', WARNING,
                  'could not dump messages from %s' %
                  (activity,), error=sys.exc_info())

      # For each table the skin folder knows how to create: try to drop it
      # (a failure to drop is only logged), then create it.
      table_method_list = (
        ('SQLDict_createMessageTable', 'SQLDict_dropMessageTable',
         'could not drop the message table'),
        ('SQLQueue_createMessageTable', 'SQLQueue_dropMessageTable',
         'could not drop the message queue table'),
      )
      for create_id, drop_id, drop_error in table_method_list:
        if not hasattr(folder, create_id):
          continue
        try:
          getattr(folder, drop_id)()
        except ConflictError:
          raise
        except:
          LOG('CMFActivity', WARNING,
              drop_error,
              error=sys.exc_info())
        getattr(folder, create_id)()

      # Reactivate the messages.
      for m in message_list:
        try:
          m.reactivate(self)
        except ConflictError:
          raise
        except:
          LOG('ActivityTool', WARNING,
              'could not reactivate the message %r, %r' %
              (m.object_path, m.method_id), error=sys.exc_info())

      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(),
          'manageActivitiesAdvanced?manage_tabs_message=Activities%20Cleared'))
853

Jean-Paul Smets's avatar
Jean-Paul Smets committed
854
    # Declare 'getMessageList' as public.
    security.declarePublic('getMessageList')
855
    def getMessageList(self,**kw):
      """
        List messages waiting in queues

        Keyword arguments are passed through to getMessageList of every
        activity in activity_list. An activity raising AttributeError is
        logged and skipped.
      """
      # Initialize if needed
      if not is_initialized: self.initialize()

      message_list = []
      for activity in activity_list:
        try:
          message_list += activity.getMessageList(self,**kw)
        except AttributeError:
          LOG('getMessageList, could not get message from Activity:',0,activity)
      return message_list

870 871 872 873 874 875 876 877 878 879 880 881 882
    # Declare 'countMessageWithTag' as public.
    security.declarePublic('countMessageWithTag')
    def countMessageWithTag(self, value):
      """
        Return the number of messages which match the given tag.

        The count is the sum of countMessageWithTag over every activity
        in activity_list. An activity raising AttributeError is logged
        and contributes nothing.
      """
      # Initialize if needed, as getMessageList does.
      if not is_initialized: self.initialize()

      message_count = 0
      for activity in activity_list:
        try:
          message_count += activity.countMessageWithTag(self, value)
        except AttributeError:
          LOG('countMessageWithTag, could not count messages from Activity:',
              0, activity)
      return message_count

883
    # Declare 'newActiveProcess' as protected by the ManagePortal permission.
    security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
884
    def newActiveProcess(self, **kw):
      """
        Add a new active process to this tool under a newly generated
        id, edit it with the given keyword arguments and return it.
      """
      from ActiveProcess import addActiveProcess
      new_id = str(self.generateNewId())
      addActiveProcess(self, new_id)
      active_process = self._getOb(new_id)
      active_process.edit(**kw)
      return active_process
891 892 893 894

    def reindexObject(self):
      """
        Reindex this tool by delegating to immediateReindexObject.
        Nothing is returned.
      """
      reindex_now = self.immediateReindexObject
      reindex_now()

895 896 897 898 899 900 901
    # Active synchronisation methods
    def validateOrder(self, message, validator_id, validation_value):
      """
        Ask each activity providing a "_validate_<validator_id>" method
        to validate "message" against "validation_value".

        Return 1 as soon as one activity answers with a true value,
        0 otherwise.
      """
      global is_initialized
      if not is_initialized: self.initialize()
      # The validator method name does not depend on the activity.
      method_id = "_validate_%s" % validator_id
      for activity in activity_list:
        if hasattr(activity, method_id):
          if getattr(activity,method_id)(self, message, validation_value):
            return 1
      return 0
906

Yoshinori Okuji's avatar
Yoshinori Okuji committed
907 908
    # Required for tests (time shift)
    def timeShift(self, delay):
      """
        Call timeShift on every activity of activity_list, passing this
        tool and "delay". The tool is initialized first if needed.
      """
      global is_initialized
      if not is_initialized: self.initialize()
      for activity in activity_list:
        activity.timeShift(self, delay)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
913

914
# Run class initialization on ActivityTool.
# NOTE(review): presumably this is what makes the security declarations of
# the class body effective - confirm against the InitializeClass docs.
InitializeClass(ActivityTool)