ActivityTool.py 42 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
Jean-Paul Smets's avatar
Jean-Paul Smets committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29 30 31 32 33 34 35
import socket
import urllib
import threading
import sys
from types import TupleType, StringType
import re

Jean-Paul Smets's avatar
Jean-Paul Smets committed
36
from Products.CMFCore import CMFCorePermissions
Jean-Paul Smets's avatar
Jean-Paul Smets committed
37
from Products.ERP5Type.Core.Folder import Folder
38
from Products.CMFActivity.ActiveResult import ActiveResult
39
from Products.PythonScripts.Utility import allow_class
40
from AccessControl import ClassSecurityInfo, Permissions
Jérome Perrin's avatar
Jérome Perrin committed
41 42 43 44
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
from AccessControl.SecurityManagement import getSecurityManager
45 46
from Products.CMFCore.utils import UniqueObject, _getAuthenticatedUser, getToolByName
from Globals import InitializeClass, DTMLFile
Jean-Paul Smets's avatar
Jean-Paul Smets committed
47
from Acquisition import aq_base
48
from Acquisition import aq_inner
49
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
50
from ActivityBuffer import ActivityBuffer
51
from zExceptions import ExceptionFormatter
52
from BTrees.OIBTree import OIBTree
53

54
from ZODB.POSException import ConflictError
55
from Products.MailHost.MailHost import MailHostError
Jean-Paul Smets's avatar
Jean-Paul Smets committed
56

57
from zLOG import LOG, INFO, WARNING
58 59

try:
  from Products.TimerService import getTimerService
except ImportError:
  # TimerService could not be imported: provide a stand-in with the same
  # call signature which always answers None ("no service").
  def getTimerService(self):
    return None
Jean-Paul Smets's avatar
Jean-Paul Smets committed
64

65
# Minimal "IP:port" pattern: four dot-separated groups of digits, a colon,
# then a group of digits. Written as a raw string so that the backslash
# escapes reach the regex engine untouched.
NODE_RE = re.compile(r'^\d+\.\d+\.\d+\.\d+:\d+$')
67

Jean-Paul Smets's avatar
Jean-Paul Smets committed
68 69 70 71 72
# Process-wide state held in module variables (RAM) rather than on an
# instance, so that no state is stored in the ZODB (and allows to restart...)
is_initialized = 0
first_run = 1
currentNode = None

# Thread accounting
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)

# RAM based locks
tic_lock = threading.Lock() # prevents too many concurrent tic() calls
timerservice_lock = threading.Lock() # prevents TimerService spamming when busy

# Node roles
ROLE_IDLE, ROLE_PROCESSING = 0, 1

# Activity Registration
activity_dict = {}
activity_list = []

84 85 86 87 88 89 90
# Here go ActivityBuffer instances (kept in RAM, in a module-level dict).
# Structure:
#  global_activity_buffer[activity_tool_path][thread_id] = ActivityBuffer
global_activity_buffer = {}
from thread import get_ident, allocate_lock
# Lock named after global_activity_buffer; the code using it is not part of
# this section of the file.
global_activity_buffer_lock = allocate_lock()

Jean-Paul Smets's avatar
Jean-Paul Smets committed
91 92 93
def registerActivity(activity):
  """Instantiate `activity` and record the instance in the module registries.

  The single instance is appended to activity_list and stored in
  activity_dict under the name of the class (activity.__name__).
  """
  # Must be rewritten to register
  # class and create instance for each activity
  #LOG('Init Activity', 0, str(activity.__name__))
  instance = activity()
  activity_list.append(instance)
  activity_dict[activity.__name__] = instance

class Message:
  """Activity Message Class.

  Message instances are stored in an activity queue, inside the Activity Tool.
  """

  # Class-level defaults for attributes which are only assigned on some code
  # paths (active_process_uid when an active process object is given,
  # traceback when __call__ fails). Readers such as notifyUser() would
  # otherwise raise AttributeError on a message which never took those paths.
  active_process_uid = None
  traceback = None

  def __init__(self, obj, active_process, activity_kw, method_id, args, kw):
    """Record what to call, on which object, and on behalf of which user.

    obj -- the target object, or its path as a '/'-separated string
    active_process -- an active process object, its path as a string,
                      or None
    activity_kw -- keyword arguments describing the activity itself
    method_id, args, kw -- the method to call and its arguments
    """
    if isinstance(obj, str):
      self.object_path = tuple(obj.split('/'))
    else:
      self.object_path = obj.getPhysicalPath()
    if type(active_process) is StringType:
      self.active_process = active_process.split('/')
    elif active_process is None:
      self.active_process = None
    else:
      self.active_process = active_process.getPhysicalPath()
      self.active_process_uid = active_process.getUid()
    self.activity_kw = activity_kw
    self.method_id = method_id
    self.args = args
    self.kw = kw
    self.is_executed = 0
    self.exc_type = None
    self.exc_value = None
    self.processing = None
    self.user_name = str(_getAuthenticatedUser(self))
    # Store REQUEST Info ?

  def getObject(self, activity_tool):
    """return the object referenced in this message."""
    return activity_tool.unrestrictedTraverse(self.object_path)

  def getObjectList(self, activity_tool):
    """return the list of object that can be expanded from this message."""
    object_list = []
    try:
      object_list.append(self.getObject(activity_tool))
    except KeyError:
      # The object could not be traversed to: answer an empty list.
      pass
    else:
      if self.hasExpandMethod():
        expand_method_id = self.activity_kw['expand_method_id']
        # FIXME: how to pass parameters?
        object_list = getattr(object_list[0], expand_method_id)()
    return object_list

  def hasExpandMethod(self):
    """return true if the message has an expand method.
    An expand method is used to expand the list of objects and to turn a
    big recursive transaction affecting many objects into multiple
    transactions affecting only one object at a time (this can prevent
    duplicated method calls)."""
    return 'expand_method_id' in self.activity_kw

  def changeUser(self, user_name, activity_tool):
    """restore the security context for the calling user.

    Returns the user found (wrapped in its user folder), or None when no
    such user exists; in that case the security manager is cleared.
    """
    uf = activity_tool.getPortalObject().acl_users
    user = uf.getUserById(user_name)
    # if the user is not found, try to get it from a parent acl_users
    # XXX this is still far from perfect, because we need to store all
    # informations about the user (like original user folder, roles) to
    # replay the activity with exactly the same security context as if
    # it had been executed without activity.
    if user is None:
      uf = activity_tool.getPortalObject().aq_parent.acl_users
      user = uf.getUserById(user_name)
    if user is not None:
      user = user.__of__(uf)
      newSecurityManager(None, user)
    else :
      LOG("CMFActivity", WARNING,
          "Unable to find user %s in the portal" % user_name)
      noSecurityManager()
    return user

  def activateResult(self, activity_tool, result, object):
    """Hand `result` over to the active process, if this message has one.

    A result which is not already an ActiveResult is wrapped into one.
    """
    if self.active_process is not None:
      active_process = activity_tool.unrestrictedTraverse(self.active_process)
      if isinstance(result,ActiveResult):
        result.edit(object_path=object)
        result.edit(method_id=self.method_id)
        # XXX Allow other method_id in future
        active_process.activateResult(result)
      else:
        active_process.activateResult(
                    ActiveResult(object_path=object,
                          method_id=self.method_id,
                          result=result)) # XXX Allow other method_id in future

  def __call__(self, activity_tool):
    """Execute the message as its original user.

    Nothing is raised to the caller: on failure is_executed is reset to 0
    and exc_type, exc_value and traceback are filled in; on success
    is_executed is set to 1.
    """
    try:
      obj = self.getObject(activity_tool)
      old_security_manager = getSecurityManager()
      # Change user if required (TO BE DONE)
      # We will change the user only in order to execute this method
      self.changeUser(self.user_name, activity_tool)
      try:
        result = getattr(obj, self.method_id)(*self.args, **self.kw)
      finally:
        setSecurityManager(old_security_manager)

      self.activateResult(activity_tool, result, obj)
      self.is_executed = 1
    except:
      # Deliberately catches everything: the failure is recorded on the
      # message and logged instead of propagating.
      self.is_executed = 0
      self.exc_type = sys.exc_info()[0]
      self.exc_value = str(sys.exc_info()[1])
      self.traceback = ''.join(ExceptionFormatter.format_exception(
                               *sys.exc_info()))
      LOG('ActivityTool', WARNING,
          'Could not call method %s on object %s' % (
          self.method_id, self.object_path), error=sys.exc_info())
      # push the error in ZODB error_log
      if getattr(activity_tool, 'error_log', None) is not None:
        activity_tool.error_log.raising(sys.exc_info())

  def validate(self, activity, activity_tool, check_order_validation=1):
    """Delegate validation of this message to `activity`."""
    return activity.validate(activity_tool, self,
                             check_order_validation=check_order_validation,
                             **self.activity_kw)

  def getDependentMessageList(self, activity, activity_tool):
    """Delegate the lookup of dependent messages to `activity`."""
    return activity.getDependentMessageList(activity_tool, self, **self.activity_kw)

  def notifyUser(self, activity_tool, message="Failed Processing Activity"):
    """Notify the user that the activity failed."""
    portal = activity_tool.getPortalObject()
    user_email = None
    user = portal.portal_membership.getMemberById(self.user_name)
    if user is not None:
      user_email = user.getProperty('email')
    if user_email in ('', None):
      # No usable address for the member: fall back on portal properties.
      user_email = portal.getProperty('email_to_address',
                       portal.getProperty('email_from_address'))
    mail_text = """From: %s
To: %s
Subject: %s

%s

Document: %s
Method: %s
Exception: %s %s

%s
""" % (activity_tool.email_from_address, user_email, message,
       message, '/'.join(self.object_path), self.method_id,
       self.exc_type, self.exc_value, self.traceback)
    try:
      activity_tool.MailHost.send( mail_text )
    except (socket.error, MailHostError):
      # Sending is best effort: log and carry on. The exception is read
      # from sys.exc_info() so that the `message` parameter is not rebound.
      error = sys.exc_info()[1]
      LOG('ActivityTool.notifyUser', WARNING, 'Mail containing failure information failed to be sent: %s' % (error, ))

  def reactivate(self, activity_tool):
    """Queue the same call again on the original object."""
    # Reactivate the original object.
    obj= self.getObject(activity_tool)
    # Change user if required (TO BE DONE)
    # We will change the user only in order to execute this method
    current_user = str(_getAuthenticatedUser(self))
    user = self.changeUser(self.user_name, activity_tool)
    try:
      active_obj = obj.activate(**self.activity_kw)
      getattr(active_obj, self.method_id)(*self.args, **self.kw)
    finally:
      # Use again the previous user
      if user is not None:
        self.changeUser(current_user, activity_tool)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
267 268
class Method:
  # Deferred call: remembers the target object, the activity name, the
  # active process and the activity keywords. Calling the instance builds a
  # Message out of them and queues it on the activity registered under that
  # name, instead of executing anything itself.

  def __init__(self, passive_self, activity, active_process, kw, method_id):
    self.__method_id = method_id
    self.__kw = kw
    self.__active_process = active_process
    self.__activity = activity
    self.__passive_self = passive_self

  def __call__(self, *args, **kw):
    target = self.__passive_self
    message = Message(target, self.__active_process, self.__kw,
                      self.__method_id, args, kw)
    queue_message = activity_dict[self.__activity].queueMessage
    queue_message(target.portal_activities, message)

allow_class(Method)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
282 283
class ActiveWrapper:
  # Stand-in for an object: every attribute looked up on it (and not found
  # by normal lookup) comes back as a Method carrying the wrapped object and
  # the activity settings. State is written straight into __dict__ under
  # literal string keys.

  def __init__(self, passive_self, activity, active_process, **kw):
    state = self.__dict__
    state['__passive_self'] = passive_self
    state['__activity'] = activity
    state['__active_process'] = active_process
    state['__kw'] = kw

  def __getattr__(self, id):
    state = self.__dict__
    return Method(state['__passive_self'], state['__activity'],
                  state['__active_process'], state['__kw'], id)

295
class ActivityTool (Folder, UniqueObject):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
296
    """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
297 298 299 300 301 302 303 304 305 306 307 308
    ActivityTool is the central point for activity management.

    Improvement to consider to reduce locks:

      Idea 1: create an SQL tool which accumulate queries and executes them at the end of a transaction,
              thus allowing all SQL transaction to happen in a very short time
              (this would also be a great way of using MyISAM tables)

      Idea 2: do the same at the level of ActivityTool

      Idea 3: do the same at the level of each activity (ie. queueMessage
              accumulates and fires messages at the end of the transaction)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
309 310 311
    """
    id = 'portal_activities'
    meta_type = 'CMF Activity Tool'
312
    portal_type = 'Activity Tool'
313
    allowed_types = ( 'CMF Active Process', )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
314 315
    security = ClassSecurityInfo()

316 317
    manage_options = tuple(
                     [ { 'label' : 'Overview', 'action' : 'manage_overview' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
318
                     , { 'label' : 'Activities', 'action' : 'manageActivities' }
319
                     , { 'label' : 'LoadBalancing', 'action' : 'manageLoadBalancing'}
320
                     , { 'label' : 'Advanced', 'action' : 'manageActivitiesAdvanced' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
321
                     ,
322
                     ] + list(Folder.manage_options))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
323 324 325 326

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
    manageActivities = DTMLFile( 'dtml/manageActivities', globals() )

327 328 329
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivitiesAdvanced' )
    manageActivitiesAdvanced = DTMLFile( 'dtml/manageActivitiesAdvanced', globals() )

330 331
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
    manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )
332 333 334 335 336 337
    
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageLoadBalancing' )
    manageLoadBalancing = DTMLFile( 'dtml/manageLoadBalancing', globals() )
    
    distributingNode = ''
    _nodes = ()
338 339 340

    def __init__(self):
        # The tool is always created under the id fixed on the class.
        tool_id = ActivityTool.id
        return Folder.__init__(self, tool_id)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
341

342 343 344 345 346 347 348 349 350 351
    # Filter content (ZMI))
    def filtered_meta_types(self, user=None):
        # Filters the list of available meta types: only the entries of
        # all_meta_types() whose 'name' is listed in allowed_types are kept.
        # `user` is accepted but not used.
        # The inherited implementation is no longer called here: its result
        # was bound to a local named `all` (shadowing the builtin) and never
        # read.
        allowed_types = self.allowed_types
        return [meta_type for meta_type in self.all_meta_types()
                if meta_type['name'] in allowed_types]

Jean-Paul Smets's avatar
Jean-Paul Smets committed
352 353
    def initialize(self):
      # Initialize every registered activity, then raise the module-level
      # is_initialized flag.
      global is_initialized
      # None of the imported names is used below; presumably the import is
      # what gets the activity classes registered - TODO confirm.
      from Activity import RAMQueue, RAMDict, SQLQueue, SQLDict
      # Initialize each queue
      for queue in activity_list:
        queue.initialize(self)
      is_initialized = 1
359 360 361
      
    security.declareProtected(Permissions.manage_properties, 'isSubscribed')
    def isSubscribed(self):
        """
        return True, if we are subscribed to TimerService.
        Otherwise return False.
        """
        timer_service = getTimerService(self)
        if not timer_service:
            LOG('ActivityTool', INFO, 'TimerService not available')
            return False

        # Subscriptions are matched on the physical path of this tool.
        # 'lisSubscriptions' (sic) is the method name called on the service.
        own_path = '/'.join(self.getPhysicalPath())
        return own_path in timer_service.lisSubscriptions()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
375

376
    security.declareProtected(Permissions.manage_properties, 'subscribe')
377
    def subscribe(self, REQUEST=None, RESPONSE=None):
        """ subscribe to the global Timer Service """
        timer_service = getTimerService(self)
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
        if timer_service:
            timer_service.subscribe(self)
            status = "Subscribed to Timer Service"
        else:
            LOG('ActivityTool', INFO, 'TimerService not available')
            status = 'TimerService not available'
        url += urllib.quote(status)
        # Only redirect when called with a RESPONSE.
        if RESPONSE is not None:
            RESPONSE.redirect(url)
389 390

    security.declareProtected(Permissions.manage_properties, 'unsubscribe')
391
    def unsubscribe(self, REQUEST=None, RESPONSE=None):
        """ unsubscribe from the global Timer Service """
        timer_service = getTimerService(self)
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
        if timer_service:
            timer_service.unsubscribe(self)
            status = "Unsubscribed from Timer Service"
        else:
            LOG('ActivityTool', INFO, 'TimerService not available')
            status = 'TimerService not available'
        url += urllib.quote(status)
        # Only redirect when called with a RESPONSE.
        if RESPONSE is not None:
            RESPONSE.redirect(url)
403 404 405

    def manage_beforeDelete(self, item, container):
        # Unsubscribe from the TimerService first, then run the
        # implementation returned by Folder.inheritedAttribute.
        self.unsubscribe()
        inherited_hook = Folder.inheritedAttribute('manage_beforeDelete')
        inherited_hook(self, item, container)
    
408 409
    def manage_afterAdd(self, item, container):
        # Subscribe to the TimerService first, then run the implementation
        # returned by Folder.inheritedAttribute.
        self.subscribe()
        inherited_hook = Folder.inheritedAttribute('manage_afterAdd')
        inherited_hook(self, item, container)
       
412 413
    def getCurrentNode(self):
        """ Return current node in form ip:port """
        global currentNode
        if currentNode is not None:
          return currentNode
        # Not computed yet in this process: work the value out once and keep
        # it in the module-level variable.
        from asyncore import socket_map
        port = ''
        for server in socket_map.values():
            if not hasattr(server, 'port'):
                continue
            # see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
            class_name = str(getattr(server, '__class__', 'unknown'))
            if class_name == 'ZServer.HTTPServer.zhttp_server':
                port = server.port
                break
        ip = socket.gethostbyname(socket.gethostname())
        currentNode = '%s:%s' %(ip, port)
        return currentNode
        
    security.declarePublic('getDistributingNode')
    def getDistributingNode(self):
        """ Return the distributingNode """
        # Plain attribute read; the class defines distributingNode = '' as
        # the default value.
        return self.distributingNode

434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470
    def getNodeList(self, role=None):
      # Sorted list of the known node ids. When `role` is given, only the
      # nodes recorded with exactly that role are listed.
      node_dict = self.getNodeDict()
      if role is not None:
        node_id_list = [node_id for node_id, node_role in node_dict.items()
                        if node_role == role]
      else:
        node_id_list = list(node_dict.keys())
      node_id_list.sort()
      return node_id_list

    def getNodeDict(self):
      # Mapping node id -> role. When self._nodes is still a tuple of node
      # ids, it is converted into an OIBTree in which every node gets
      # ROLE_PROCESSING, and the converted mapping replaces self._nodes.
      nodes = self._nodes
      if not isinstance(nodes, tuple):
        return nodes
      converted = OIBTree()
      converted.update([(node_id, ROLE_PROCESSING) for node_id in nodes])
      self._nodes = converted
      return converted

    def registerNode(self, node):
      # Record `node` unless it is already known.
      node_dict = self.getNodeDict()
      if node_dict.has_key(node):
        return
      role = ROLE_IDLE
      if len(node_dict) == 0:
        # If we are registering the first node, make it both the
        # distributing node and a processing node.
        role = ROLE_PROCESSING
        self.distributingNode = node
      self.updateNode(node, role)

    def updateNode(self, node, role):
      # Store (or overwrite) the role recorded for `node`.
      self.getNodeDict()[node] = role

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getProcessingNodeList')
    def getProcessingNodeList(self):
      # Node ids whose recorded role is ROLE_PROCESSING, in the sorted order
      # produced by getNodeList().
      return self.getNodeList(role=ROLE_PROCESSING)

471
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getIdleNodeList')
472 473
    def getIdleNodeList(self):
      # Node ids whose recorded role is ROLE_IDLE, in the sorted order
      # produced by getNodeList().
      return self.getNodeList(role=ROLE_IDLE)
474

475 476 477 478
    def _isValidNodeName(self, node_name) :
      """Check we have been provided a good node name"""
      # The result is meant to be truth-tested: False for anything which is
      # not a str, otherwise whatever NODE_RE.match() returns.
      if not isinstance(node_name, str):
        return False
      return NODE_RE.match(node_name)
      
479 480
    security.declarePublic('manage_setDistributingNode')
    def manage_setDistributingNode(self, distributingNode, REQUEST=None):
        """ set the distributing node """
        # NOTE(review): this setter is declared public just above, whereas
        # the other manage_* methods here require ManagePortal - confirm
        # that this is intended.
        # An empty value is accepted as is; anything else must look like a
        # node name.
        if not distributingNode or self._isValidNodeName(distributingNode):
          self.distributingNode = distributingNode
          status = "Distributing Node successfully changed."
        else :
          status = "Malformed Distributing Node."
        if REQUEST is not None:
            REQUEST.RESPONSE.redirect(
                REQUEST.URL1 +
                '/manageLoadBalancing?manage_tabs_message=' +
                urllib.quote(status))

496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553
    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_delNode')
    def manage_delNode(self, unused_node_list=None, REQUEST=None):
      """ delete selected unused nodes """
      distributing_node = self.getDistributingNode()
      distributing_node_deleted = False
      if unused_node_list is not None:
        node_dict = self.getNodeDict()
        for node in unused_node_list:
          if node in node_dict:
            del node_dict[node]
          if node == distributing_node:
            # Clear the attribute which getDistributingNode() reads (this
            # used to write an unrelated 'processing_node' attribute, which
            # left the deleted node in place as distributing node).
            self.distributingNode = ''
            distributing_node_deleted = True
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing deleted."
        else:
          message = "Deleted nodes %r." % (unused_node_list, )
        if distributing_node_deleted:
          # Leading space keeps the two sentences apart.
          message += " Disabled distributing node because it was deleted."
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_addToProcessingList')
    def manage_addToProcessingList(self, unused_node_list=None, REQUEST=None):
      """ Change one or more idle nodes into processing nodes """
      if unused_node_list is not None:
        # updateNode() fetches the node mapping itself, so no local copy of
        # it is needed here.
        for node in unused_node_list:
          self.updateNode(node, ROLE_PROCESSING)
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing done."
        else:
          message = "Nodes now processing: %r." % (unused_node_list, )
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_removeFromProcessingList')
    def manage_removeFromProcessingList(self, processing_node_list=None, REQUEST=None):
      """ Change one or more processing nodes into idle nodes """
      if processing_node_list is not None:
        # updateNode() fetches the node mapping itself, so no local copy of
        # it is needed here.
        for node in processing_node_list:
          self.updateNode(node, ROLE_IDLE)
      if REQUEST is not None:
        if processing_node_list is None:
          message = "No used node selected, nothing done."
        else:
          message = "Nodes now unused %r." % (processing_node_list, )
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))
554

555
    def process_timer(self, tick, interval, prev="", next=""):
        """
        Call distribute() if we are the Distributing Node and call tic()
        with our node number.
        This method is called by TimerService in the interval given
        in zope.conf. The Default is every 5 seconds.
        """
        # Prevent TimerService from starting multiple threads in parallel:
        # the acquire is non-blocking, so a call arriving while another one
        # is still running simply returns.
        if not timerservice_lock.acquire(0):
          return

        # Everything after the acquire sits inside this try, so the lock is
        # released whatever fails below.
        try:
          old_sm = getSecurityManager()
          try:
            # get owner of portal_catalog, so normally we should be able to
            # have the permission to invoke all activities
            user = self.portal_catalog.getWrappedOwner()
            newSecurityManager(self.REQUEST, user)

            current_node = self.getCurrentNode()
            self.registerNode(current_node)
            processing_node_list = self.getNodeList(role=ROLE_PROCESSING)

            # only distribute when we are the distributing node
            if self.getDistributingNode() == current_node:
              self.distribute(len(processing_node_list))

            # SkinsTool uses a REQUEST cache to store skin objects, as
            # with TimerService we have the same REQUEST over multiple
            # portals, we clear this cache to make sure the cache doesn't
            # contains skins from another portal.
            stool = getToolByName(self, 'portal_skins', None)
            if stool is not None:
              stool.changeSkin(None)

            # call tic for the current processing_node
            # the processing_node numbers are the indices of the elements
            # in the processing node list +1 because processing_node
            # starts from 1
            if current_node in processing_node_list:
              self.tic(processing_node_list.index(current_node)+1)
          finally:
            setSecurityManager(old_sm)
        finally:
          timerservice_lock.release()
599

Jean-Paul Smets's avatar
Jean-Paul Smets committed
600 601 602 603 604 605
    security.declarePublic('distribute')
    def distribute(self, node_count=1):
      """
        Distribute load
      """
      # Initialize if needed
      if not is_initialized:
        self.initialize()

      # Call distribute on each queue. A ConflictError goes through
      # untouched; any other failure is logged and the loop moves on to the
      # next queue.
      for queue in activity_list:
        try:
          queue.distribute(aq_inner(self), node_count)
        except ConflictError:
          raise
        except:
          LOG('CMFActivity:', 100, 'Core call to distribute failed for activity %s' % queue, error=sys.exc_info())
Jean-Paul Smets's avatar
Jean-Paul Smets committed
617

Jean-Paul Smets's avatar
Jean-Paul Smets committed
618
    security.declarePublic('tic')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
619
    def tic(self, processing_node=1, force=0):
      """
        Starts again an activity
        processing_node starts from 1 (there is not node 0)

        Raises RuntimeError when max_active_threads calls are already
        running, unless force is true.
      """
      global active_threads, first_run

      # return if the number of threads is too high
      # else, increase the number of active_threads and continue
      tic_lock.acquire()
      try:
        if active_threads >= max_active_threads and not force:
          raise RuntimeError('Too many threads')
        active_threads += 1
      finally:
        tic_lock.release()

      # From here on the slot counted above must be given back whatever
      # happens, so initialization is inside the try as well: a failure in
      # it would otherwise leave active_threads incremented for good.
      try:
        # Initialize if needed
        if not is_initialized: self.initialize()

        inner_self = aq_inner(self)

        # If this is the first tic after zope is started, reset the processing
        # flag for activities of this node
        if first_run:
          inner_self.SQLDict_clearProcessingFlag(
                                  processing_node=processing_node)
          inner_self.SQLQueue_clearProcessingFlag(
                                  processing_node=processing_node)
          first_run = 0

        # Wakeup each queue
        for activity in activity_list:
          try:
            activity.wakeup(inner_self, processing_node)
          except ConflictError:
            raise
          except:
            LOG('CMFActivity:', 100, 'Core call to wakeup failed for activity %s' % activity, error=sys.exc_info())

        # Process messages on each queue in round robin
        has_awake_activity = 1
        while has_awake_activity:
          has_awake_activity = 0
          for activity in activity_list:
            try:
              activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity
              has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node)
            except ConflictError:
              raise
            except:
              LOG('CMFActivity:', 100, 'Core call to tic or isAwake failed for activity %s' % activity, error=sys.exc_info())
      finally:
        # decrease the number of active_threads
        tic_lock.acquire()
        try:
          active_threads -= 1
        finally:
          tic_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
678

679
    def hasActivity(self, *args, **kw):
      # Tell whether at least one activity queue reports deferred tasks for
      # an object. The object is the first positional argument when one is
      # given, otherwise the activity tool itself. Keyword arguments are
      # forwarded untouched to every queue.
      # Returns 1 as soon as a queue answers true, 0 when none does.
      target = self
      if args:
        target = args[0]
      for queue in activity_list:
        if not queue.hasActivity(aq_inner(self), target, **kw):
          continue
        return 1
      return 0

691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727
    def getActivityBuffer(self, create_if_not_found=True):
      """
        Get the activity buffer of the current thread for this activity tool.

        Buffers are kept in a two-level mapping: first level is keyed by the
        physical path of the activity tool, second level by the thread
        identifier.

        create_if_not_found -- when true (default) and the current thread
          has no buffer yet, an ActivityBuffer is created, stored and
          returned. When false and there is no buffer, None is returned and
          nothing is stored, so that a later call can still create one.

        The intermediate (per activity tool) level is unconditionally
        created if missing, because chances are it will be used during the
        instance life.
        The lock is held while checking for the intermediate level because
        the intermediate dict must not be created in 2 threads at the same
        time: one creation would destroy the other one.
        It is released after that step because:
         - lower level access is at thread scope, thus by definition there
           can be only one access at a time to a key
         - GIL protects us when accessing python instances
      """
      # NOTE(review): presumably guards against being called outside of an
      # acquisition context, where getPhysicalPath would not identify the
      # tool -- confirm.
      assert getattr(self, 'aq_self', None) is not None
      my_instance_key = self.getPhysicalPath()
      my_thread_key = get_ident()
      global_activity_buffer_lock.acquire()
      try:
        if my_instance_key not in global_activity_buffer:
          global_activity_buffer[my_instance_key] = {}
        thread_activity_buffer = global_activity_buffer[my_instance_key]
      finally:
        global_activity_buffer_lock.release()
      # A missing entry and an entry holding None are both "not found":
      # a None left behind by an earlier lookup must not hide the need to
      # create a real buffer now.
      activity_buffer = thread_activity_buffer.get(my_thread_key)
      if activity_buffer is None and create_if_not_found:
        activity_buffer = ActivityBuffer(activity_tool=self)
        thread_activity_buffer[my_thread_key] = activity_buffer
      return activity_buffer

728 729
    security.declarePrivate('activateObject')
    def activateObject(self, object, activity, active_process, **kw):
      # Build and return the ActiveWrapper for `object`, bound to the given
      # activity and active process; keyword arguments are handed over to
      # the wrapper as they are.
      # The tool is initialized first when it has not been yet.
      if not is_initialized:
        self.initialize()
      # Called for its side effect only (the returned buffer is unused).
      self.getActivityBuffer()
      wrapper = ActiveWrapper(object, activity, active_process, **kw)
      return wrapper
Jean-Paul Smets's avatar
Jean-Paul Smets committed
734

735
    def deferredQueueMessage(self, activity, message):
      # Delegate to deferredQueueMessage of the activity buffer returned by
      # getActivityBuffer for the current thread, passing this tool along
      # with the activity and the message.
      self.getActivityBuffer().deferredQueueMessage(self, activity, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
738

739
    def deferredDeleteMessage(self, activity, message):
      # Delegate to deferredDeleteMessage of the activity buffer returned by
      # getActivityBuffer for the current thread, passing this tool along
      # with the activity and the message.
      self.getActivityBuffer().deferredDeleteMessage(self, activity, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
742

Jean-Paul Smets's avatar
Jean-Paul Smets committed
743
    def getRegisteredMessageList(self, activity):
      # Return what the given activity reports as registered messages for
      # the current thread's activity buffer. The buffer is looked up
      # without asking for its creation; when there is none the result is
      # an empty list.
      thread_buffer = self.getActivityBuffer(create_if_not_found=False)
      if thread_buffer is None:
        return []
      # NOTE: registering the buffer here (activity_buffer._register()) is
      # disabled; it is required if flush is called outside activate.
      return activity.getRegisteredMessageList(thread_buffer, aq_inner(self))
Yoshinori Okuji's avatar
Yoshinori Okuji committed
751

Jean-Paul Smets's avatar
Jean-Paul Smets committed
752
    def unregisterMessage(self, activity, message):
      # Ask the given activity to unregister `message` from the current
      # thread's activity buffer, and return whatever the activity returns.
      # NOTE: registering the buffer here (activity_buffer._register()) is
      # disabled.
      thread_buffer = self.getActivityBuffer()
      unregister = activity.unregisterMessage
      return unregister(thread_buffer, aq_inner(self), message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
756

757
    def flush(self, obj, invoke=0, **kw):
      # Call flush on every activity queue for one object.
      # `obj` is either the object itself (its physical path is then read
      # with getPhysicalPath) or directly a path given as a tuple.
      # `invoke` and the extra keyword arguments are forwarded to each queue.
      if not is_initialized:
        self.initialize()
      # Called for its side effect only (the returned buffer is unused).
      self.getActivityBuffer()
      if not isinstance(obj, tuple):
        object_path = obj.getPhysicalPath()
      else:
        object_path = obj
      for queue in activity_list:
        queue.flush(aq_inner(self), object_path, invoke=invoke, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
767

768 769 770 771
    def start(self, **kw):
      # Initialize the tool when needed, then call start on every activity
      # queue with this tool and the given keyword arguments.
      if not is_initialized:
        self.initialize()
      for queue in activity_list:
        queue.start(aq_inner(self), **kw)
773 774 775 776 777

    def stop(self, **kw):
      # Initialize the tool when needed, then call stop on every activity
      # queue with this tool and the given keyword arguments.
      if not is_initialized:
        self.initialize()
      for queue in activity_list:
        queue.stop(aq_inner(self), **kw)
779

Jean-Paul Smets's avatar
Jean-Paul Smets committed
780
    def invoke(self, message):
      # Call `message` with this activity tool as its only argument.
      # When the tool has an acquisition chain, the message receives a
      # rebuilt version of it instead: same base objects, but ending with a
      # new request container holding a clone of the current REQUEST.
      aq_chain = getattr(self, 'aq_chain', None)
      if aq_chain is not None:
        # Grab the existing acquisition chain and extract base objects.
        # The chain must be read from self: the `object` builtin has no
        # aq_chain attribute.
        base_chain = [aq_base(x) for x in aq_chain]
        # Grab the existing request container (last chain item); a copy of
        # its REQUEST is created below.
        request_container = base_chain.pop()
        request = request_container.REQUEST
        # XXX: REQUEST.clone() requires PARENTS to be set, and it's not when
        # running unit tests. Recreate it if it does not exist.
        parents = getattr(request, 'PARENTS', None)
        if parents is None:
          LOG('CMFActivity.ActivityTool.invoke', INFO, 'PARENTS is not defined in REQUEST. It should only happen in unit tests.')
          request['PARENTS'] = aq_chain[:]
        new_request_container = request_container.__class__(REQUEST=request.clone())
        # Recreate the acquisition chain, wrapping each base object in turn,
        # starting from the one next to the request container.
        my_self = new_request_container
        base_chain.reverse()
        for item in base_chain:
          my_self = item.__of__(my_self)
      else:
        my_self = self
        LOG('CMFActivity.ActivityTool.invoke', INFO, 'Strange: invoke is called outside of acquisition context.')
      message(my_self)
803

804 805 806 807 808 809 810 811 812
    def invokeGroup(self, method_id, message_list):
      # Invoke a group method: `method_id` is traversed from this tool and
      # called once with the list of objects gathered from all messages of
      # `message_list`. Each message then gets is_executed set (and exc_type
      # on error) according to what happened to it.
      object_list = []
      expanded_object_list = []
      new_message_list = []
      path_dict = {}

      def register_target(message, target, message_index,
                          alternate_method_id):
        # Record one target object, unless its path has already been seen.
        path = target.getPath()
        if path in path_dict:
          return
        path_dict[path] = message_index
        if alternate_method_id is None \
            or not hasattr(aq_base(target), alternate_method_id):
          expanded_object_list.append(target)
          return
        # This object is alternated: generate a new single active object
        # instead of handing it to the group method.
        single_kw = message.activity_kw.copy()
        for grouping_key in ('group_method_id', 'group_id'):
          if grouping_key in single_kw:
            del single_kw[grouping_key]
        active_obj = target.activate(**single_kw)
        getattr(active_obj, alternate_method_id)(*message.args, **message.kw)

      # Filter the list of messages. If an object is not available, ignore
      # such a message. In addition, expand an object if necessary, and make
      # sure that no duplication happens.
      last_message = None
      for m in message_list:
        last_message = m
        # The alternate method is used to segregate objects which cannot be
        # grouped.
        alternate_method_id = m.activity_kw.get('alternate_method_id')
        try:
          obj = m.getObject(self)
          # Index this message will have in new_message_list.
          message_index = len(new_message_list)
          if m.hasExpandMethod():
            target_list = m.getObjectList(self)
          else:
            target_list = [obj]
          for target in target_list:
            register_target(m, target, message_index, alternate_method_id)
          object_list.append(obj)
          new_message_list.append(m)
        except:
          # Deliberately broad: any failure only marks this message.
          m.is_executed = 0
          m.exc_type = sys.exc_info()[0]
          LOG('WARNING ActivityTool', 0,
              'Could not call method %s on object %s' %
              (m.method_id, m.object_path), error=sys.exc_info())

      try:
        result = None
        if expanded_object_list:
          method = self.unrestrictedTraverse(method_id)
          # FIXME: how to apply security here?
          # NOTE: expanded_object_list must be set to failed objects by the
          #       callee. If it fully succeeds, expanded_object_list must be
          #       empty when returning.
          # The keyword arguments are those of the last message iterated
          # over above, whether or not that message was kept.
          result = method(expanded_object_list, **last_message.kw)
      except:
        # In this case, the group method completely failed.
        for m in new_message_list:
          m.is_executed = 0
          m.exc_type = sys.exc_info()[0]
        LOG('WARNING ActivityTool', 0,
            'Could not call method %s on objects %s' %
            (method_id, expanded_object_list), error=sys.exc_info())
      else:
        # Obtain all indices of failed messages. Note that this can be a
        # partial failure.
        failed_index_set = set(
          [path_dict[failed.getPath()] for failed in expanded_object_list])

        # Only for succeeded messages, an activity process is invoked
        # (if any).
        for index, (target, m) in enumerate(zip(object_list,
                                                new_message_list)):
          if index in failed_index_set:
            m.is_executed = 0
            LOG('ActivityTool', WARNING,
                'the method %s partially failed on object %s' %
                (m.method_id, m.object_path,))
            continue
          try:
            m.activateResult(self, result, target)
            m.is_executed = 1
          except:
            m.is_executed = 0
            m.exc_type = sys.exc_info()[0]
            LOG('ActivityTool', WARNING,
                'Could not call method %s on object %s' % (
                m.method_id, m.object_path), error=sys.exc_info())
904

905 906
    def newMessage(self, activity, path, active_process,
                   activity_kw, method_id, *args, **kw):
      # Build a Message from the given parameters and queue it on the
      # activity registered under `activity` in activity_dict.
      # Some Security Cheking should be made here XXX
      if not is_initialized:
        self.initialize()
      # Called for its side effect only (the returned buffer is unused).
      self.getActivityBuffer()
      queue_message = activity_dict[activity].queueMessage
      queue_message(aq_inner(self),
        Message(path, active_process, activity_kw, method_id, args, kw))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
913

914
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageInvoke' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
915 916 917 918 919 920
    def manageInvoke(self, object_path, method_id, REQUEST=None):
      """
        Invokes all methods for object "object_path"

        object_path -- path of the object, either as a tuple or as a
          '/' separated string (which is split into a tuple)
        method_id -- forwarded to flush as the method_id keyword argument
        REQUEST -- when given, its response is redirected to the
          manageActivities page of this tool and the redirect result is
          returned; otherwise None is returned
      """
      # isinstance rather than an exact type comparison, so that str
      # subclasses are converted too.
      if isinstance(object_path, str):
        object_path = tuple(object_path.split('/'))
      self.flush(object_path, method_id=method_id, invoke=1)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
925

926
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageCancel' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
927 928 929 930 931 932
    def manageCancel(self, object_path, method_id, REQUEST=None):
      """
        Cancel all methods for object "object_path"

        object_path -- path of the object, either as a tuple or as a
          '/' separated string (which is split into a tuple)
        method_id -- forwarded to flush as the method_id keyword argument
        REQUEST -- when given, its response is redirected to the
          manageActivities page of this tool and the redirect result is
          returned; otherwise None is returned
      """
      # isinstance rather than an exact type comparison, so that str
      # subclasses are converted too.
      if isinstance(object_path, str):
        object_path = tuple(object_path.split('/'))
      self.flush(object_path, method_id=method_id, invoke=0)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
937

938 939
    security.declareProtected( CMFCorePermissions.ManagePortal,
                               'manageClearActivities' )
940
    def manageClearActivities(self, keep=1, REQUEST=None):
      """
        Clear all activities and recreate tables.

        keep -- when true (default), messages dumped from the activities
          providing dumpMessageList are reactivated once the tables have
          been recreated
        REQUEST -- when given, its response is redirected to the
          manageActivitiesAdvanced page with an "Activities Cleared"
          message
      """
      folder = getToolByName(self, 'portal_skins').activity

      # Obtain all pending messages.
      message_list = []
      if keep:
        for activity in activity_list:
          if not hasattr(activity, 'dumpMessageList'):
            continue
          try:
            message_list.extend(activity.dumpMessageList(self))
          except ConflictError:
            raise
          except:
            LOG('ActivityTool', WARNING,
                'could not dump messages from %s' %
                (activity,), error=sys.exc_info())

      def recreate_table(drop_id, create_id, drop_failure_message):
        # Drop (best effort) then create one table, only when the folder
        # provides the creation method.
        if getattr(folder, create_id, None) is None:
          return
        try:
          getattr(folder, drop_id)()
        except ConflictError:
          raise
        except:
          LOG('CMFActivity', WARNING,
              drop_failure_message,
              error=sys.exc_info())
        getattr(folder, create_id)()

      recreate_table('SQLDict_dropMessageTable',
                     'SQLDict_createMessageTable',
                     'could not drop the message table')
      recreate_table('SQLQueue_dropMessageTable',
                     'SQLQueue_createMessageTable',
                     'could not drop the message queue table')

      # Reactivate the messages.
      tool = aq_inner(self)
      for m in message_list:
        try:
          m.reactivate(tool)
        except ConflictError:
          raise
        except:
          LOG('ActivityTool', WARNING,
              'could not reactivate the message %r, %r' %
              (m.object_path, m.method_id), error=sys.exc_info())

      if REQUEST is None:
        return
      return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(),
        'manageActivitiesAdvanced?manage_tabs_message=Activities%20Cleared'))
996

Jean-Paul Smets's avatar
Jean-Paul Smets committed
997
    security.declarePublic('getMessageList')
998
    def getMessageList(self, **kw):
      """
        Return the messages waiting in all activity queues, as one list.

        Keyword arguments are forwarded to getMessageList of each queue.
        A queue raising AttributeError is logged and skipped.
      """
      # Initialize if needed
      if not is_initialized:
        self.initialize()

      collected = []
      for queue in activity_list:
        try:
          collected.extend(queue.getMessageList(aq_inner(self), **kw))
        except AttributeError:
          LOG('getMessageList, could not get message from Activity:',0,queue)
      return collected

1013 1014 1015 1016 1017 1018 1019
    security.declarePublic('countMessageWithTag')
    def countMessageWithTag(self, value):
      """
        Return the number of messages which match the given tag,
        added up over all activity queues.
      """
      return sum([queue.countMessageWithTag(aq_inner(self), value)
                  for queue in activity_list])

    security.declarePublic('countMessage')
    def countMessage(self, **kw):
      """
        Return the number of messages which match the given parameters,
        added up over all activity queues.

        Parameters allowed:

        method_id : the id of the method
        path : for activities on a particular object
        tag : activities with a particular tag
        message_uid : activities with a particular uid
      """
      return sum([queue.countMessage(aq_inner(self), **kw)
                  for queue in activity_list])

1040
    security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
1041
    def newActiveProcess(self, **kw):
      # Add an active process to this tool under a newly generated id,
      # edit it with the given keyword arguments and return it.
      from ActiveProcess import addActiveProcess
      process_id = str(self.generateNewId())
      addActiveProcess(self, process_id)
      process = self._getOb(process_id)
      process.edit(**kw)
      return process
1048 1049 1050 1051

    def reindexObject(self):
      # Reindexing the tool is delegated as-is to immediateReindexObject.
      # NOTE(review): presumably so that indexing the activity tool never
      # goes through an activity itself -- confirm.
      self.immediateReindexObject()

1052
    # Active synchronisation methods
1053
    security.declarePrivate('validateOrder')
1054
    def validateOrder(self, message, validator_id, validation_value):
      # True when getDependentMessageList finds at least one message for
      # the given validator and value, False otherwise.
      return len(self.getDependentMessageList(
                   message, validator_id, validation_value)) > 0

    security.declarePrivate('getDependentMessageList')
    def getDependentMessageList(self, message, validator_id, validation_value):
      # Collect, over all activity queues, the messages returned by the
      # queue validator named "_validate_<validator_id>" for the given
      # message and validation value. Queues without such a validator are
      # skipped. The result is a list of (activity, message) pairs.
      if not is_initialized:
        self.initialize()
      validator_method_id = "_validate_%s" % validator_id
      dependent_list = []
      for queue in activity_list:
        validate = getattr(queue, validator_method_id, None)
        if validate is None:
          continue
        found = validate(aq_inner(self), message, validation_value)
        if not found:
          continue
        for dependent in found:
          dependent_list.append((queue, dependent))
      return dependent_list
1071

Yoshinori Okuji's avatar
Yoshinori Okuji committed
1072 1073
    # Required for tests (time shift)
    def timeShift(self, delay):
      # Initialize the tool when needed, then call timeShift on every
      # activity queue with this tool and the given delay.
      if not is_initialized:
        self.initialize()
      for queue in activity_list:
        queue.timeShift(aq_inner(self), delay)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1078

1079
# NOTE(review): presumably applies the `security` declarations made in the
# ActivityTool class body -- confirm against the InitializeClass import.
InitializeClass(ActivityTool)