ActivityTool.py 56.2 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
Jean-Paul Smets's avatar
Jean-Paul Smets committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29 30 31 32
import socket
import urllib
import threading
import sys
Vincent Pelletier's avatar
Vincent Pelletier committed
33
from types import StringType
34 35
import re

Jean-Paul Smets's avatar
Jean-Paul Smets committed
36
from Products.CMFCore import CMFCorePermissions
Jean-Paul Smets's avatar
Jean-Paul Smets committed
37
from Products.ERP5Type.Core.Folder import Folder
38
from Products.CMFActivity.ActiveResult import ActiveResult
39
from Products.PythonScripts.Utility import allow_class
40
from AccessControl import ClassSecurityInfo, Permissions
Jérome Perrin's avatar
Jérome Perrin committed
41 42 43 44
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
from AccessControl.SecurityManagement import getSecurityManager
45 46
from Products.CMFCore.utils import UniqueObject, _getAuthenticatedUser, getToolByName
from Globals import InitializeClass, DTMLFile
Jean-Paul Smets's avatar
Jean-Paul Smets committed
47
from Acquisition import aq_base
48
from Acquisition import aq_inner
49
from ActivityBuffer import ActivityBuffer
50
from zExceptions import ExceptionFormatter
51
from BTrees.OIBTree import OIBTree
52 53 54 55 56 57 58 59 60 61 62 63 64

try:
  from Products import iHotfix
  localizer_lock = iHotfix._the_lock
  localizer_contexts = iHotfix.contexts
  LocalizerContext = iHotfix.Context
except ImportError:
  # Localizer 1.2 includes iHotFix patches
  import Products.Localizer.patches
  localizer_lock = Products.Localizer.patches._requests_lock
  localizer_contexts = Products.Localizer.patches._requests
  LocalizerContext = lambda request: request

65

66
from ZODB.POSException import ConflictError
67
from Products.MailHost.MailHost import MailHostError
Jean-Paul Smets's avatar
Jean-Paul Smets committed
68

69
from zLOG import LOG, INFO, WARNING, ERROR
70
from warnings import warn
71
from time import time
72 73

try:
74
  from Products.TimerService import getTimerService
75
except ImportError:
76 77
  def getTimerService(self):
    pass
Jean-Paul Smets's avatar
Jean-Paul Smets committed
78

79 80 81 82 83
try:
  from traceback import format_list, extract_stack
except ImportError:
  format_list = extract_stack = None

84
# Minimal "IP:port" matcher: four dot-separated digit groups, a colon and a
# port number. Neither the octet values nor the port range are validated.
# Written as a raw string so the backslash escapes reach the regexp engine
# untouched.
NODE_RE = re.compile(r'^\d+\.\d+\.\d+\.\d+:\d+$')
86

Jean-Paul Smets's avatar
Jean-Paul Smets committed
87 88 89 90
# Using a RAM property (not a property of an instance) allows
# to prevent from storing a state in the ZODB (and allows to restart...)
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)
Vincent Pelletier's avatar
Vincent Pelletier committed
91
is_initialized = False
92 93
tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
94
is_running_lock = threading.Lock()
95
first_run = True
96 97 98
currentNode = None
ROLE_IDLE = 0
ROLE_PROCESSING = 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
99 100 101 102

# Activity Registration
activity_dict = {}

103 104 105 106 107
# Logging channel definitions
import logging
# Main logging channel
activity_logger = logging.getLogger('CMFActivity')
# Some logging subchannels
108
activity_tracking_logger = logging.getLogger('Tracking')
109
activity_timing_logger = logging.getLogger('CMFActivity.TimingLog')
110 111 112 113 114 115 116 117 118 119

# Direct logging to "[instancehome]/log/CMFActivity.log", if this directory exists.
# Otherwise, it will end up in root logging facility (ie, event.log).
from App.config import getConfiguration
import os
instancehome = getConfiguration().instancehome
if instancehome is not None:
  log_directory = os.path.join(instancehome, 'log')
  if os.path.isdir(log_directory):
    from Signals import Signals
120 121
    from ZConfig.components.logger.loghandler import FileHandler
    log_file_handler = FileHandler(os.path.join(log_directory, 'CMFActivity.log'))
122 123 124 125 126 127 128 129
    # Default zope log format string borrowed from
    # ZConfig/components/logger/factory.xml, but without the extra "------"
    # line separating entries.
    log_file_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s", "%Y-%m-%dT%H:%M:%S"))
    Signals.registerZopeSignals([log_file_handler])
    activity_logger.addHandler(log_file_handler)
    activity_logger.propagate = 0

130 131 132 133 134 135 136 137
def activity_timing_method(method, args, kw):
  """Call method(*args, **kw) and log how long the call took.

  The elapsed time is sent to activity_timing_logger whether the call
  returns or raises; the result (or the exception) is passed through
  unchanged.
  """
  started_at = time()
  try:
    return method(*args, **kw)
  finally:
    elapsed = time() - started_at
    activity_timing_logger.info(
      '%.02fs: %r(*%r, **%r)' % (elapsed, method, args, kw))

138 139 140 141 142 143 144
# Here go ActivityBuffer instances
# Structure:
#  global_activity_buffer[activity_tool_path][thread_id] = ActivityBuffer
global_activity_buffer = {}
from thread import get_ident, allocate_lock
global_activity_buffer_lock = allocate_lock()

Jean-Paul Smets's avatar
Jean-Paul Smets committed
145 146 147
def registerActivity(activity):
  """Instantiate the given activity class and register the instance.

  The instance is stored in the module-level activity_dict under the name
  of the class, replacing whatever was registered under that name before.
  """
  # TODO: must be rewritten to register the class and create an instance
  # for each activity.
  activity_dict[activity.__name__] = activity()

152 153 154 155
MESSAGE_NOT_EXECUTED = 0
MESSAGE_EXECUTED = 1
MESSAGE_NOT_EXECUTABLE = 2

Jean-Paul Smets's avatar
Jean-Paul Smets committed
156
class Message:
157
  """Activity Message Class.
158

159 160
  Message instances are stored in an activity queue, inside the Activity Tool.
  """
161 162 163

  active_process_uid = None

164 165
  def __init__(self, obj, active_process, activity_kw, method_id, args, kw):
    """Record everything needed to call a method on an object later.

    obj -- the target object, or its physical path as a '/'-separated
           string.
    active_process -- None, an active process object, or the path of one as
                      a '/'-separated string.
    activity_kw -- activity parameters. The dict is kept by reference; a
                   'serialization_tag' entry whose value is None is removed
                   from it in place.
    method_id -- name of the method to call on the target.
    args, kw -- positional and named arguments for that call.
    """
    if isinstance(obj, str):
      self.object_path = tuple(obj.split('/'))
      # Only a path is available: there is no object to reach the activity
      # tool setting from, so creation traces are disabled.
      activity_creation_trace = False
    else:
      self.object_path = obj.getPhysicalPath()
      activity_creation_trace = obj.getPortalObject().portal_activities.activity_creation_trace
    # isinstance rather than an exact type comparison, so that str subclasses
    # are handled as paths exactly like obj is above.
    if isinstance(active_process, str):
      self.active_process = active_process.split('/')
    elif active_process is None:
      self.active_process = None
    else:
      self.active_process = active_process.getPhysicalPath()
      self.active_process_uid = active_process.getUid()
    if activity_kw.get('serialization_tag', False) is None:
      # Remove serialization_tag if it's None.
      del activity_kw['serialization_tag']
    self.activity_kw = activity_kw
    self.method_id = method_id
    self.args = args
    self.kw = kw
    self.is_executed = MESSAGE_NOT_EXECUTED
    self.exc_type = None
    self.exc_value = None
    self.traceback = None
    if activity_creation_trace and format_list is not None:
      # Save current traceback, to make it possible to tell where a message
      # was generated.
      # Strip last stack entry, since it will always be the same.
      self.call_traceback = ''.join(format_list(extract_stack()[:-1]))
    else:
      self.call_traceback = None
    self.processing = None
    self.user_name = str(_getAuthenticatedUser(self))
    # Store REQUEST Info
    self.request_info = {}
    request = getattr(obj, 'REQUEST', None)
    if request is not None:
      if 'SERVER_URL' in request.other:
        self.request_info['SERVER_URL'] = request.other['SERVER_URL']
      if 'VirtualRootPhysicalPath' in request.other:
        self.request_info['VirtualRootPhysicalPath'] = \
          request.other['VirtualRootPhysicalPath']
      if 'HTTP_ACCEPT_LANGUAGE' in request.environ:
        self.request_info['HTTP_ACCEPT_LANGUAGE'] = \
          request.environ['HTTP_ACCEPT_LANGUAGE']
      # Copied into a new list so the message does not share it with the
      # request.
      self.request_info['_script'] = list(request._script)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
211

212
  def getObject(self, activity_tool):
213
    """return the object referenced in this message."""
214
    return activity_tool.unrestrictedTraverse(self.object_path)
215

216
  def getObjectList(self, activity_tool):
217
    """return the list of object that can be expanded from this message."""
218
    object_list = []
219
    try:
220
      object_list.append(self.getObject(activity_tool))
221
    except KeyError:
222 223 224 225 226 227
      pass
    else:
      if self.hasExpandMethod():
        expand_method_id = self.activity_kw['expand_method_id']
        # FIXME: how to pass parameters?
        object_list = getattr(object_list[0], expand_method_id)()
228
    return object_list
229

230
  def hasExpandMethod(self):
231 232 233 234 235
    """return true if the message has an expand method.
    An expand method is used to expand the list of objects and to turn a
    big recursive transaction affecting many objects into multiple
    transactions affecting only one object at a time (this can prevent
    duplicated method calls)."""
236
    return self.activity_kw.has_key('expand_method_id')
237

238
  def changeUser(self, user_name, activity_tool):
    """Switch the security context to the user called user_name.

    The user is looked up in the portal's acl_users, then in the acl_users
    of the portal's parent. When found, a new security manager is installed
    for it and the wrapped user is returned. Otherwise a warning is logged,
    the security manager is cleared and None is returned.
    """
    uf = activity_tool.getPortalObject().acl_users
    user = uf.getUserById(user_name)
    if user is None:
      # if the user is not found, try to get it from a parent acl_users
      # XXX this is still far from perfect, because we need to store all
      # informations about the user (like original user folder, roles) to
      # replay the activity with exactly the same security context as if
      # it had been executed without activity.
      uf = activity_tool.getPortalObject().aq_parent.acl_users
      user = uf.getUserById(user_name)
    if user is None:
      LOG("CMFActivity", WARNING,
          "Unable to find user %r in the portal" % user_name)
      noSecurityManager()
      return user
    # Wrap the user in the user folder it was found in before using it.
    user = user.__of__(uf)
    newSecurityManager(None, user)
    return user

  def activateResult(self, activity_tool, result, object):
    if self.active_process is not None:
      active_process = activity_tool.unrestrictedTraverse(self.active_process)
262
      if isinstance(result, ActiveResult):
263 264
        result.edit(object_path=object)
        result.edit(method_id=self.method_id)
265 266
        # XXX Allow other method_id in future
        active_process.activateResult(result)
267
      else:
268
        active_process.activateResult(
269
                    ActiveResult(object_path=object,
270 271
                                 method_id=self.method_id,
                                 result=result)) # XXX Allow other method_id in future
272

Jean-Paul Smets's avatar
Jean-Paul Smets committed
273
  def __call__(self, activity_tool):
    """Execute the message and record the outcome as its execution state.

    MESSAGE_NOT_EXECUTABLE is recorded when the target object lookup raises
    KeyError or when fetching the method from it fails,
    MESSAGE_NOT_EXECUTED when anything else raises, and MESSAGE_EXECUTED
    once the call and the hand-over of its result have succeeded.
    Exceptions are recorded on the message, not propagated.
    """
    try:
      obj = self.getObject(activity_tool)
    except KeyError:
      # Recorded from inside the handler so the KeyError is still the
      # current exception.
      self.setExecutionState(MESSAGE_NOT_EXECUTABLE, context=activity_tool)
      return
    try:
      previous_security_manager = getSecurityManager()
      # Change user if required (TO BE DONE)
      # We will change the user only in order to execute this method
      self.changeUser(self.user_name, activity_tool)
      try:
        try:
          # XXX: There is no check to see if user is allowed to access
          # that method !
          method = getattr(obj, self.method_id)
        except:
          method = None
          self.setExecutionState(MESSAGE_NOT_EXECUTABLE, context=activity_tool)
        else:
          if activity_tool.activity_timing_log:
            result = activity_timing_method(method, self.args, self.kw)
          else:
            result = method(*self.args, **self.kw)
      finally:
        # Always give the original security context back.
        setSecurityManager(previous_security_manager)

      if method is not None:
        self.activateResult(activity_tool, result, obj)
        self.setExecutionState(MESSAGE_EXECUTED)
    except:
      self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
305

306 307 308 309 310 311 312
  def validate(self, activity, activity_tool, check_order_validation=1):
    return activity.validate(activity_tool, self,
                             check_order_validation=check_order_validation,
                             **self.activity_kw)

  def getDependentMessageList(self, activity, activity_tool):
    return activity.getDependentMessageList(activity_tool, self, **self.activity_kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
313

314
  def notifyUser(self, activity_tool, message="Failed Processing Activity"):
315 316
    """Notify the user that the activity failed."""
    portal = activity_tool.getPortalObject()
317
    user_email = portal.getProperty('email_to_address',
318
                       portal.getProperty('email_from_address'))
319

320 321
    email_from_name = portal.getProperty('email_from_name',
                       portal.getProperty('email_from_address'))
322 323 324 325
    call_traceback = ''
    if self.call_traceback:
      call_traceback = 'Created at:\n%s' % self.call_traceback

326
    mail_text = """From: %s <%s>
327 328 329 330 331
To: %s
Subject: %s

%s

332
Node: %s
333
User name: %r
334 335
Document: %s
Method: %s
336 337
Arguments: %r
Named Parameters: %r
338 339
%s

Vincent Pelletier's avatar
Vincent Pelletier committed
340
Exception: %s %s
341

342
%s
343 344
""" % (email_from_name, activity_tool.email_from_address, 
       user_email, message, message,
345
       activity_tool.getCurrentNode(), self.user_name,
346
       '/'.join(self.object_path), self.method_id, self.args, self.kw,
347
       call_traceback, self.exc_type, self.exc_value, self.traceback)
348 349
    try:
      activity_tool.MailHost.send( mail_text )
Vincent Pelletier's avatar
Vincent Pelletier committed
350 351
    except (socket.error, MailHostError), message:
      LOG('ActivityTool.notifyUser', WARNING, 'Mail containing failure information failed to be sent: %s. Exception was: %s %s\n%s' % (message, self.exc_type, self.exc_value, self.traceback))
352

353 354 355 356 357 358 359 360 361 362 363 364 365 366 367
  def reactivate(self, activity_tool):
    """Queue the same call again, as the user who created this message.

    The target object is activated with this message's activity_kw, then
    the stored method is invoked on the active wrapper with the stored
    arguments. If the user switch found a user, the user that was current
    on entry is switched back to afterwards, even on error.
    """
    # Reactivate the original object.
    target = self.getObject(activity_tool)
    # Change user if required (TO BE DONE)
    # We will change the user only in order to execute this method
    previous_user_name = str(_getAuthenticatedUser(self))
    message_user = self.changeUser(self.user_name, activity_tool)
    try:
      active_target = target.activate(**self.activity_kw)
      queue_call = getattr(active_target, self.method_id)
      queue_call(*self.args, **self.kw)
    finally:
      # Use again the previous user
      if message_user is not None:
        self.changeUser(previous_user_name, activity_tool)

368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397
  def setExecutionState(self, is_executed, exc_info=None, log=True, context=None):
    """
      Set message execution state.

      is_executed can be one of MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED and
      MESSAGE_NOT_EXECUTABLE (variables defined above).

      exc_info must be - if given - similar to sys.exc_info() return value.

      log must be - if given - True or False. If True, a log line will be
      emitted with failure details. This parameter should only be used when
      invoking this method on a list of messages to avoid log flood. It is
      caller's responsibility to output a log line summing up all errors, and
      to store error in Zope's error_log.

      context must be - if given - an object wrapped in acquisition context.
      It is used to access Zope's error_log object. It is not used if log is
      False.

      If given state is not MESSAGE_EXECUTED, it will also store given
      exc_info. If not given, it will extract one using sys.exc_info().
      If final exc_info does not contain any exception, current stack trace
      will be stored instead: it will hopefully help understand why message
      is in an error state.
    """
    assert is_executed in (MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, MESSAGE_NOT_EXECUTABLE)
    self.is_executed = is_executed
    if is_executed != MESSAGE_EXECUTED:
      if exc_info is None:
        exc_info = sys.exc_info()
      if exc_info == (None, None, None):
        # Raise a dummy exception, catch it and use it as if it was the error
        # causing message non-execution. This will help identifying the cause
        # of this misbehaviour.
        try:
          raise Exception('Message execution failed, but there is no exception to explain it. This is a dummy exception so that one can track down why we end up here outside of an exception handling code path.')
        except Exception:
          # Captured inside the handler: once the except block has ended,
          # sys.exc_info() is not guaranteed to still report this exception.
          exc_info = sys.exc_info()
      if log:
        LOG('ActivityTool', WARNING, 'Could not call method %s on object %s. Activity created at:\n%s' % (self.method_id, self.object_path, self.call_traceback), error=exc_info)
        # push the error in ZODB error_log
        error_log = getattr(context, 'error_log', None)
        if error_log is not None:
          error_log.raising(exc_info)
      self.exc_type = exc_info[0]
      self.exc_value = str(exc_info[1])
      self.traceback = ''.join(ExceptionFormatter.format_exception(*exc_info))
414 415 416 417

  def getExecutionState(self):
    return self.is_executed

Jean-Paul Smets's avatar
Jean-Paul Smets committed
418 419
class Method:
  # Callable standing for one method of an object reached through an
  # ActiveWrapper: calling it does not run the method, it builds a Message
  # and queues it on the activity selected when the wrapper was created.

  def __init__(self, passive_self, activity, active_process, kw, method_id):
    # passive_self: the wrapped object; activity: key into activity_dict;
    # kw: activity parameters handed to every Message built by this Method.
    self.__passive_self = passive_self
    self.__activity = activity
    self.__active_process = active_process
    self.__kw = kw
    self.__method_id = method_id

  def __call__(self, *args, **kw):
    message = Message(self.__passive_self, self.__active_process, self.__kw,
                      self.__method_id, args, kw)
    portal_activities = self.__passive_self.portal_activities
    if portal_activities.activity_tracking:
      tracked_values = (self.__activity, '/'.join(message.object_path),
                        message.method_id, message.args, message.kw,
                        message.activity_kw, message.user_name)
      activity_tracking_logger.info('queuing message: activity=%s, object_path=%s, method_id=%s, args=%s, kw=%s, activity_kw=%s, user_name=%s' % tracked_values)
    queue = activity_dict[self.__activity]
    queue.queueMessage(portal_activities, message)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
433

434 435
allow_class(Method)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
436 437
class ActiveWrapper:
  # Stand-in for an object: any attribute that is not found on the wrapper
  # itself is answered with a Method bound to the wrapped object, the chosen
  # activity, the active process and the activity parameters.
  # The wrapper state is kept under literal '__...' keys of the instance
  # __dict__.

  def __init__(self, passive_self, activity, active_process, **kw):
    state = self.__dict__
    state['__passive_self'] = passive_self
    state['__activity'] = activity
    state['__active_process'] = active_process
    state['__kw'] = kw

  def __getattr__(self, id):
    state = self.__dict__
    return Method(state['__passive_self'], state['__activity'],
                  state['__active_process'], state['__kw'], id)

  def __repr__(self):
    class_name = self.__class__.__name__
    wrapped = self.__dict__['__passive_self']
    return '<%s at 0x%x to %r>' % (class_name, id(self), wrapped)

453 454 455
# True when activities cannot be executing any more.
has_processed_shutdown = False

456 457 458 459 460 461 462 463
def cancelProcessShutdown():
  """
    This method reverts the effect of calling "process_shutdown" on activity
    tool.

    It releases the module-level is_running_lock, then resets the
    module-level flag has_processed_shutdown to False. The lock is released
    first, so if the release raises, the flag is left unchanged.
  """
  global has_processed_shutdown
  is_running_lock.release()
  has_processed_shutdown = False
464

465
class ActivityTool (Folder, UniqueObject):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
466
    """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
467 468 469 470 471 472 473 474 475 476 477 478
    ActivityTool is the central point for activity management.

    Improvement to consider to reduce locks:

      Idea 1: create an SQL tool which accumulate queries and executes them at the end of a transaction,
              thus allowing all SQL transaction to happen in a very short time
              (this would also be a great way of using MyISAM tables)

      Idea 2: do the same at the level of ActivityTool

      Idea 3: do the same at the level of each activity (ie. queueMessage
              accumulates and fires messages at the end of the transaction)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
479 480 481
    """
    id = 'portal_activities'
    meta_type = 'CMF Activity Tool'
482
    portal_type = 'Activity Tool'
483
    allowed_types = ( 'CMF Active Process', )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
484 485
    security = ClassSecurityInfo()

486 487
    manage_options = tuple(
                     [ { 'label' : 'Overview', 'action' : 'manage_overview' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
488
                     , { 'label' : 'Activities', 'action' : 'manageActivities' }
489
                     , { 'label' : 'LoadBalancing', 'action' : 'manageLoadBalancing'}
490
                     , { 'label' : 'Advanced', 'action' : 'manageActivitiesAdvanced' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
491
                     ,
492
                     ] + list(Folder.manage_options))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
493 494 495 496

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
    manageActivities = DTMLFile( 'dtml/manageActivities', globals() )

497 498 499
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivitiesAdvanced' )
    manageActivitiesAdvanced = DTMLFile( 'dtml/manageActivitiesAdvanced', globals() )

500 501
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
    manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )
502 503 504 505 506 507
    
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageLoadBalancing' )
    manageLoadBalancing = DTMLFile( 'dtml/manageLoadBalancing', globals() )
    
    distributingNode = ''
    _nodes = ()
508 509 510
    activity_creation_trace = False
    activity_tracking = False
    activity_timing_log = False
511

512 513 514 515 516
    def SQLDict_setPriority(self, **kw):
      # Logging wrapper around the SQLDict_setPriority found on the parent of
      # this tool: the parent method is called once with src__=1 and the
      # value it returns is logged, then it is called again without src__ and
      # that result is returned.
      # NOTE(review): src__=1 presumably makes a ZSQL method return its
      # rendered SQL instead of running it - confirm.
      parent_method = getattr(self.aq_parent, 'SQLDict_setPriority')
      logged_value = parent_method(src__=1, **kw)
      LOG('ActivityTool', 0, logged_value)
      return parent_method(**kw)

517 518
    def __init__(self):
        # The tool is always created under the fixed id declared on the class.
        tool_id = ActivityTool.id
        return Folder.__init__(self, tool_id)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
519

520 521 522 523 524 525 526 527 528 529
    # Filter content (ZMI))
    def filtered_meta_types(self, user=None):
        # Filters the list of available meta types: only the entries of
        # all_meta_types() whose 'name' is listed in allowed_types are kept.
        # The user argument is accepted but not used. The inherited
        # implementation used to be called here with its result thrown away
        # (bound to a local named "all", shadowing the builtin); that call
        # has been dropped.
        return [meta_type for meta_type in self.all_meta_types()
                if meta_type['name'] in self.allowed_types]

Jean-Paul Smets's avatar
Jean-Paul Smets committed
530 531
    def initialize(self):
      # Initialise every registered activity against this tool, then set the
      # module-level is_initialized flag.
      global is_initialized
      # The imported names are not used below.
      # NOTE(review): presumably importing them is what registers the
      # activities in activity_dict - confirm.
      from Activity import RAMQueue, RAMDict, SQLQueue, SQLDict
      # Initialize each queue
      for registered_activity in activity_dict.itervalues():
        registered_activity.initialize(self)
      is_initialized = True
537 538 539
      
    security.declareProtected(Permissions.manage_properties, 'isSubscribed')
    def isSubscribed(self):
        """
        Tell whether this tool is subscribed to the TimerService.

        Returns False (and logs it) when no TimerService is available,
        otherwise whether the physical path of the tool is among the
        service's subscriptions.
        """
        service = getTimerService(self)
        if not service:
            LOG('ActivityTool', INFO, 'TimerService not available')
            return False

        own_path = '/'.join(self.getPhysicalPath())
        return own_path in service.lisSubscriptions()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
553

554
    security.declareProtected(Permissions.manage_properties, 'subscribe')
555
    def subscribe(self, REQUEST=None, RESPONSE=None):
        """ Subscribe this tool to the global Timer Service.

        When RESPONSE is given, it is redirected to the load balancing
        management page with a message telling what happened.
        """
        service = getTimerService(self)
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
        if service:
            service.subscribe(self)
            status = "Subscribed to Timer Service"
        else:
            LOG('ActivityTool', INFO, 'TimerService not available')
            status = 'TimerService not available'
        url += urllib.quote(status)
        if RESPONSE is not None:
            RESPONSE.redirect(url)
567 568

    security.declareProtected(Permissions.manage_properties, 'unsubscribe')
569
    def unsubscribe(self, REQUEST=None, RESPONSE=None):
        """ Unsubscribe this tool from the global Timer Service.

        When RESPONSE is given, it is redirected to the load balancing
        management page with a message telling what happened.
        """
        service = getTimerService(self)
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
        if service:
            service.unsubscribe(self)
            status = "Unsubscribed from Timer Service"
        else:
            LOG('ActivityTool', INFO, 'TimerService not available')
            status = 'TimerService not available'
        url += urllib.quote(status)
        if RESPONSE is not None:
            RESPONSE.redirect(url)
581

582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659
    security.declareProtected(Permissions.manage_properties, 'isActivityTrackingEnabled')
    def isActivityTrackingEnabled(self):
      # Current value of the activity_tracking flag.
      tracking_enabled = self.activity_tracking
      return tracking_enabled

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTracking')
    def manage_enableActivityTracking(self, REQUEST=None, RESPONSE=None):
        """
          Turn the activity tracking log on.

          When RESPONSE is given, it is redirected to the advanced
          management page with a confirmation message.
        """
        self.activity_tracking = True
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        redirect_url = base_url + urllib.quote('Tracking log enabled')
        RESPONSE.redirect(redirect_url)

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTracking')
    def manage_disableActivityTracking(self, REQUEST=None, RESPONSE=None):
        """
          Turn the activity tracking log off.

          When RESPONSE is given, it is redirected to the advanced
          management page with a confirmation message.
        """
        self.activity_tracking = False
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        redirect_url = base_url + urllib.quote('Tracking log disabled')
        RESPONSE.redirect(redirect_url)

    security.declareProtected(Permissions.manage_properties, 'isActivityTimingLoggingEnabled')
    def isActivityTimingLoggingEnabled(self):
      # Current value of the activity_timing_log flag.
      timing_log_enabled = self.activity_timing_log
      return timing_log_enabled

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTimingLogging')
    def manage_enableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
        """
          Turn the activity timing log on.

          When RESPONSE is given, it is redirected to the advanced
          management page with a confirmation message.
        """
        self.activity_timing_log = True
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        redirect_url = base_url + urllib.quote('Timing log enabled')
        RESPONSE.redirect(redirect_url)

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTimingLogging')
    def manage_disableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
        """
          Turn the activity timing log off.

          When RESPONSE is given, it is redirected to the advanced
          management page with a confirmation message.
        """
        self.activity_timing_log = False
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        redirect_url = base_url + urllib.quote('Timing log disabled')
        RESPONSE.redirect(redirect_url)

    security.declareProtected(Permissions.manage_properties, 'isActivityCreationTraceEnabled')
    def isActivityCreationTraceEnabled(self):
      # Current value of the activity_creation_trace flag.
      creation_trace_enabled = self.activity_creation_trace
      return creation_trace_enabled

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityCreationTrace')
    def manage_enableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
        """
          Turn the activity creation trace on.

          When RESPONSE is given, it is redirected to the advanced
          management page with a confirmation message.
        """
        self.activity_creation_trace = True
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        redirect_url = base_url + urllib.quote('Activity creation trace enabled')
        RESPONSE.redirect(redirect_url)

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityCreationTrace')
    def manage_disableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
        """
          Turn the activity creation trace off.

          When RESPONSE is given, it is redirected to the advanced
          management page with a confirmation message.
        """
        self.activity_creation_trace = False
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        redirect_url = base_url + urllib.quote('Activity creation trace disabled')
        RESPONSE.redirect(redirect_url)

660 661
    def manage_beforeDelete(self, item, container):
        # Leave the timer service before running the inherited hook.
        self.unsubscribe()
        # NOTE(review): the hook is looked up with Folder.inheritedAttribute;
        # confirm that this is the intended starting point of the lookup.
        inherited_hook = Folder.inheritedAttribute('manage_beforeDelete')
        inherited_hook(self, item, container)
    
664 665
    def manage_afterAdd(self, item, container):
        # Creation hook: subscribe first, then delegate to the
        # manage_afterAdd implementation inherited through Folder.
        # (Kept as a comment: the original method has no docstring.)
        self.subscribe()
        Folder.inheritedAttribute('manage_afterAdd')(self, item, container)
       
668 669
    def getCurrentNode(self):
        """ Return current node in form ip:port

        The value is computed once and then kept in the module-level
        currentNode variable. The port is taken from the first entry of
        asyncore's socket_map whose class is reported as
        'ZServer.HTTPServer.zhttp_server'; it stays '' when no such entry
        is found.
        """
        global currentNode
        if currentNode is None:
          port = ''
          from asyncore import socket_map
          for server in socket_map.values():
              if hasattr(server, 'port'):
                  # see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
                  server_class = str(getattr(server, '__class__', 'unknown'))
                  if server_class == 'ZServer.HTTPServer.zhttp_server':
                      port = server.port
                      break
          ip = socket.gethostbyname(socket.gethostname())
          currentNode = '%s:%s' % (ip, port)
        return currentNode
        
    security.declarePublic('getDistributingNode')
    def getDistributingNode(self):
        """ Return the value stored as distributingNode """
        distributing_node = self.distributingNode
        return distributing_node

690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726
    def getNodeList(self, role=None):
      # Return the sorted list of registered node ids.
      # With role=None every node is listed; otherwise only the nodes
      # whose stored role equals `role`.
      registry = self.getNodeDict()
      if role is None:
        selected = list(registry.keys())
      else:
        selected = []
        for name, assigned_role in registry.items():
          if assigned_role == role:
            selected.append(name)
      selected.sort()
      return selected

    def getNodeDict(self):
      # Return the node -> role mapping kept in self._nodes.
      # When self._nodes is still a tuple it is converted in place: it is
      # replaced by an OIBTree in which every listed node gets
      # ROLE_PROCESSING, and that new mapping is returned.
      current = self._nodes
      if not isinstance(current, tuple):
        return current
      converted = OIBTree()
      converted.update([(node, ROLE_PROCESSING) for node in current])
      self._nodes = converted
      return converted

    def registerNode(self, node):
      # Add `node` to the node registry unless it is already there.
      # The very first node registered becomes both the distributing node
      # and a processing node; any later node starts out idle.
      node_dict = self.getNodeDict()
      if node_dict.has_key(node):
        return
      is_first_node = (len(node_dict) == 0)
      if is_first_node:
        self.distributingNode = node
        role = ROLE_PROCESSING
      else:
        role = ROLE_IDLE
      self.updateNode(node, role)

    def updateNode(self, node, role):
      # Store `role` for `node` in the node registry, adding the node if
      # it is not registered yet.
      self.getNodeDict()[node] = role

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getProcessingNodeList')
    def getProcessingNodeList(self):
      # Sorted ids of the nodes whose role is ROLE_PROCESSING.
      processing_node_list = self.getNodeList(role=ROLE_PROCESSING)
      return processing_node_list

727
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getIdleNodeList')
728 729
    def getIdleNodeList(self):
      # Sorted ids of the nodes whose role is ROLE_IDLE.
      idle_node_list = self.getNodeList(role=ROLE_IDLE)
      return idle_node_list
730

731 732 733 734
    def _isValidNodeName(self, node_name) :
      """Tell whether node_name is a str that NODE_RE matches.

      Returns False for anything that is not a str; otherwise returns the
      result of NODE_RE.match (a match object, or None when it fails)."""
      if not isinstance(node_name, str):
        return False
      return NODE_RE.match(node_name)
      
735 736
    security.declarePublic('manage_setDistributingNode')
    def manage_setDistributingNode(self, distributingNode, REQUEST=None):
        """ set the distributing node

        An empty value (which disables distribution) or a name accepted by
        _isValidNodeName is stored; anything else is rejected and the
        current setting is left untouched. When REQUEST is given, the
        browser is redirected to the load balancing screen with a message
        describing the outcome.
        """
        if not distributingNode or self._isValidNodeName(distributingNode):
          self.distributingNode = distributingNode
          message = "Distributing Node successfully changed."
        else:
          message = "Malformed Distributing Node."
        if REQUEST is not None:
          REQUEST.RESPONSE.redirect(
              REQUEST.URL1 +
              '/manageLoadBalancing?manage_tabs_message=' +
              urllib.quote(message))

752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809
    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_delNode')
    def manage_delNode(self, unused_node_list=None, REQUEST=None):
      """ delete selected unused nodes

      Every node named in unused_node_list is removed from the node
      registry (names that are not registered are ignored). If the
      distributing node is among them, the distributing node setting is
      reset to '' so that it no longer points at a deleted node.
      When REQUEST is given, the browser is redirected to the load
      balancing screen with a message describing what was done.
      """
      distributing_node = self.getDistributingNode()
      distributing_node_deleted = False
      if unused_node_list is not None:
        node_dict = self.getNodeDict()
        for node in unused_node_list:
          if node in node_dict:
            del node_dict[node]
          if node == distributing_node:
            # This must clear distributingNode (the attribute read by
            # getDistributingNode), not an unrelated attribute.
            self.distributingNode = ''
            distributing_node_deleted = True
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing deleted."
        else:
          message = "Deleted nodes %r." % (unused_node_list, )
        if distributing_node_deleted:
          message += " Disabled distributing node because it was deleted."
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_addToProcessingList')
    def manage_addToProcessingList(self, unused_node_list=None, REQUEST=None):
      """ Give the ROLE_PROCESSING role to each node in unused_node_list.

      Nothing is changed when unused_node_list is None. When REQUEST is
      given, the browser is redirected to the load balancing screen with
      a message describing the outcome.
      """
      nothing_selected = unused_node_list is None
      if not nothing_selected:
        # The registry is fetched here as in the original code, although
        # updateNode fetches it again on its own.
        self.getNodeDict()
        for node in unused_node_list:
          self.updateNode(node, ROLE_PROCESSING)
      if REQUEST is None:
        return
      if nothing_selected:
        message = "No unused node selected, nothing done."
      else:
        message = "Nodes now procesing: %r." % (unused_node_list, )
      REQUEST.RESPONSE.redirect(
        REQUEST.URL1 +
        '/manageLoadBalancing?manage_tabs_message=' +
        urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_removeFromProcessingList')
    def manage_removeFromProcessingList(self, processing_node_list=None, REQUEST=None):
      """ Give the ROLE_IDLE role to each node in processing_node_list.

      Nothing is changed when processing_node_list is None. When REQUEST
      is given, the browser is redirected to the load balancing screen
      with a message describing the outcome.
      """
      nothing_selected = processing_node_list is None
      if not nothing_selected:
        # The registry is fetched here as in the original code, although
        # updateNode fetches it again on its own.
        self.getNodeDict()
        for node in processing_node_list:
          self.updateNode(node, ROLE_IDLE)
      if REQUEST is None:
        return
      if nothing_selected:
        message = "No used node selected, nothing done."
      else:
        message = "Nodes now unused %r." % (processing_node_list, )
      REQUEST.RESPONSE.redirect(
        REQUEST.URL1 +
        '/manageLoadBalancing?manage_tabs_message=' +
        urllib.quote(message))
810

811 812 813 814 815
    def process_shutdown(self, phase, time_in_phase):
        """
          Prevent shutdown from happening while an activity queue is
          processing a batch.

          Only the first call made with phase == 3 does anything: it
          blocks on is_running_lock, the lock tic() holds while an
          activity is being processed, and never releases it.
          time_in_phase is not used.
        """
        global has_processed_shutdown
        if phase == 3 and not has_processed_shutdown:
          has_processed_shutdown = True
          LOG('CMFActivity', INFO, "Shutdown: Waiting for activities to finish.")
          is_running_lock.acquire()
          LOG('CMFActivity', INFO, "Shutdown: Activities finished.")

823
    def process_timer(self, tick, interval, prev="", next=""):
        """
        Call distribute() if we are the Distributing Node and call tic()
        with our node number.
        This method is called by TimerService in the interval given
        in zope.conf. The Default is every 5 seconds.

        tick, interval, prev and next are accepted but not used here.
        Any exception raised while processing is logged and swallowed so
        that it cannot propagate to the caller.
        """
        # Prevent TimerService from starting multiple threads in parallel
        acquired = timerservice_lock.acquire(0)
        if not acquired:
          return

        try:
          old_sm = getSecurityManager()
          try:
            try:
              # get owner of portal_catalog, so normally we should be able to
              # have the permission to invoke all activities
              user = self.portal_catalog.getWrappedOwner()
              newSecurityManager(self.REQUEST, user)

              currentNode = self.getCurrentNode()
              self.registerNode(currentNode)
              processing_node_list = self.getNodeList(role=ROLE_PROCESSING)

              # only distribute when we are the distributingNode (an empty
              # distributing node setting therefore disables distribution)
              if (self.getDistributingNode() == currentNode):
                self.distribute(len(processing_node_list))

              # SkinsTool uses a REQUEST cache to store skin objects, as
              # with TimerService we have the same REQUEST over multiple
              # portals, we clear this cache to make sure the cache doesn't
              # contains skins from another portal.
              stool = getToolByName(self, 'portal_skins', None)
              if stool is not None:
                stool.changeSkin(None)

              # call tic for the current processing_node
              # the processing_node numbers are the indices of the elements
              # in the sorted processing node list +1, because
              # processing_node starts from 1
              if currentNode in processing_node_list:
                self.tic(processing_node_list.index(currentNode)+1)
            except:
              # Catch ALL exception to avoid killing timerserver.
              LOG('ActivityTool', ERROR, 'process_timer received an exception', error=sys.exc_info())
          finally:
            setSecurityManager(old_sm)
        finally:
          timerservice_lock.release()
872

Jean-Paul Smets's avatar
Jean-Paul Smets committed
873 874 875 876 877 878
    security.declarePublic('distribute')
    def distribute(self, node_count=1):
      """
        Distribute load

        Asks every registered activity queue to distribute its messages
        over node_count processing nodes.
      """
      # Initialize if needed
      if not is_initialized:
        self.initialize()

      # Call distribute on each queue
      for activity in activity_dict.itervalues():
        activity.distribute(aq_inner(self), node_count)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
885

Jean-Paul Smets's avatar
Jean-Paul Smets committed
886
    security.declarePublic('tic')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
887
    def tic(self, processing_node=1, force=0):
      """
        Starts again an activity
        processing_node starts from 1 (there is not node 0)

        Wakes up every activity queue for processing_node, then lets the
        queues process messages in round robin until none reports being
        awake.

        Raises RuntimeError('Too many threads') when max_active_threads
        calls are already running, unless force is true.
      """
      global active_threads, first_run

      # return if the number of threads is too high
      # else, increase the number of active_threads and continue
      tic_lock.acquire()
      try:
        if active_threads >= max_active_threads and not force:
          raise RuntimeError('Too many threads')
        active_threads += 1
      finally:
        tic_lock.release()

      # From here on the thread slot is taken: everything, including the
      # initialization steps, runs inside the try block so that the slot
      # is always given back, even when initialization fails.
      try:
        # Initialize if needed
        if not is_initialized:
          self.initialize()

        inner_self = aq_inner(self)

        # If this is the first tic after zope is started, reset the processing
        # flag for activities of this node
        if first_run:
          inner_self.SQLDict_clearProcessingFlag(
                                  processing_node=processing_node)
          inner_self.SQLQueue_clearProcessingFlag(
                                  processing_node=processing_node)
          first_run = False

        #Sort activity list by priority
        activity_list = activity_dict.values()
        # Sort method must be local to access "self"
        def cmpActivities(activity_1, activity_2):
          return cmp(activity_1.getPriority(self), activity_2.getPriority(self))
        activity_list.sort(cmpActivities)

        # Wakeup each queue
        for activity in activity_list:
          activity.wakeup(inner_self, processing_node)

        # Process messages on each queue in round robin
        has_awake_activity = 1
        while has_awake_activity:
          has_awake_activity = 0
          for activity in activity_list:
            # Non-blocking: when the lock is held elsewhere (another tic,
            # or process_shutdown) this queue is skipped for this round.
            acquired = is_running_lock.acquire(0)
            if acquired:
              try:
                activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity
                has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node)
              finally:
                is_running_lock.release()
      finally:
        # decrease the number of active_threads
        tic_lock.acquire()
        try:
          active_threads -= 1
        finally:
          tic_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
949

950
    def hasActivity(self, *args, **kw):
      # Check in each queue if the object has deferred tasks.
      # The object is the first positional argument; if no argument is
      # provided, then check on self. Keyword arguments are passed on to
      # each queue's hasActivity.
      # Returns True as soon as one queue reports an activity, else False.
      if len(args) > 0:
        obj = args[0]
      else:
        obj = self
      for activity in activity_dict.itervalues():
        if activity.hasActivity(aq_inner(self), obj, **kw):
          return True
      return False
Jean-Paul Smets's avatar
Jean-Paul Smets committed
961

962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977
    def getActivityBuffer(self, create_if_not_found=True):
      """
        Get activity buffer for this thread for this activity tool.
        If no activity buffer is found at lowest level and create_if_not_found
        is True, create one and remember it; if create_if_not_found is
        False, return None without remembering anything, so that a later
        call can still create the buffer.
        Intermediate level is unconditionally created if non existent because
        chances are it will be used in the instance life.
        Lock is held when checking for intermediate level existence
        because:
         - intermediate level dict must not be created in 2 threads at the
           same time, since one creation would destroy the existing one.
        It's released after that step because:
         - lower level access is at thread scope, thus by definition there
           can be only one access at a time to a key
         - GIL protects us when accessing python instances
      """
      # Safeguard: make sure we are wrapped in  acquisition context before
      # using our path as an activity tool instance-wide identifier.
      assert getattr(self, 'aq_self', None) is not None
      my_instance_key = self.getPhysicalPath()
      my_thread_key = get_ident()
      global_activity_buffer_lock.acquire()
      try:
        if my_instance_key not in global_activity_buffer:
          global_activity_buffer[my_instance_key] = {}
      finally:
        global_activity_buffer_lock.release()
      thread_activity_buffer = global_activity_buffer[my_instance_key]
      # .get() also covers a None left under this key: such an entry is
      # treated exactly like a missing one.
      activity_buffer = thread_activity_buffer.get(my_thread_key)
      if activity_buffer is None and create_if_not_found:
        activity_buffer = ActivityBuffer(activity_tool=self)
        thread_activity_buffer[my_thread_key] = activity_buffer
      return activity_buffer

999 1000
    security.declarePrivate('activateObject')
    def activateObject(self, object, activity, active_process, **kw):
      # Return an ActiveWrapper built from the given object, activity,
      # active process and keyword arguments.
      # Beforehand, initialize the tool if needed and make sure this
      # thread has an activity buffer.
      if not is_initialized:
        self.initialize()
      self.getActivityBuffer()
      return ActiveWrapper(object, activity, active_process, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1005

1006
    def deferredQueueMessage(self, activity, message):
      # Hand the message over to this thread's activity buffer (created
      # on demand) through the buffer's deferredQueueMessage.
      activity_buffer = self.getActivityBuffer()
      activity_buffer.deferredQueueMessage(self, activity, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1009

1010
    def deferredDeleteMessage(self, activity, message):
      # Hand the message over to this thread's activity buffer (created
      # on demand) through the buffer's deferredDeleteMessage.
      activity_buffer = self.getActivityBuffer()
      activity_buffer.deferredDeleteMessage(self, activity, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1013

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1014
    def getRegisteredMessageList(self, activity):
      # Return the messages `activity` reports as registered in this
      # thread's activity buffer. No buffer is created for the occasion:
      # when the thread has none, the result is an empty list.
      activity_buffer = self.getActivityBuffer(create_if_not_found=False)
      if activity_buffer is not None:
        #activity_buffer._register() # This is required if flush is called outside activate
        return activity.getRegisteredMessageList(activity_buffer,
                                                 aq_inner(self))
      else:
        return []
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1022

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1023
    def unregisterMessage(self, activity, message):
      # Ask `activity` to unregister `message` from this thread's
      # activity buffer (created on demand) and return its result.
      activity_buffer = self.getActivityBuffer()
      #activity_buffer._register()
      return activity.unregisterMessage(activity_buffer, aq_inner(self), message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1027

1028
    def flush(self, obj, invoke=0, **kw):
      # Call flush on every activity queue for one object.
      # obj is either a physical path given as a tuple, or an object whose
      # getPhysicalPath() provides it. invoke and the extra keyword
      # arguments are passed on to each queue's flush unchanged.
      if not is_initialized:
        self.initialize()
      self.getActivityBuffer()
      if isinstance(obj, tuple):
        object_path = obj
      else:
        object_path = obj.getPhysicalPath()
      for activity in activity_dict.itervalues():
        activity.flush(aq_inner(self), object_path, invoke=invoke, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1038

1039
    def start(self, **kw):
      # Call start on every activity queue, passing the keyword arguments
      # on unchanged; the tool is initialized first if needed.
      if not is_initialized:
        self.initialize()
      for activity in activity_dict.itervalues():
        activity.start(aq_inner(self), **kw)
1044 1045

    def stop(self, **kw):
      # Call stop on every activity queue, passing the keyword arguments
      # on unchanged; the tool is initialized first if needed.
      if not is_initialized:
        self.initialize()
      for activity in activity_dict.itervalues():
        activity.stop(aq_inner(self), **kw)
1050

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1051
    def invoke(self, message):
      # Execute one activity message.
      # When the tool is acquisition-wrapped, the message is called on a
      # copy of the tool re-wrapped around a cloned request into which the
      # request information recorded in the message has been restored.
      # Afterwards the skin selection and the Localizer/iHotfix context
      # are restored, and objects held by the cloned request are handed
      # over to the real request.
      if self.activity_tracking:
        activity_tracking_logger.info('invoking message: object_path=%s, method_id=%s, args=%r, kw=%r, activity_kw=%r, user_name=%s' % ('/'.join(message.object_path), message.method_id, message.args, message.kw, message.activity_kw, message.user_name))
      # False: the Localizer context was not replaced. Any other value
      # (including None, meaning "there was none") is what to restore.
      old_localizer_context = False
      if getattr(self, 'aq_chain', None) is not None:
        # Grab existing acquisition chain and extract base objects.
        base_chain = [aq_base(x) for x in self.aq_chain]
        # Grab existing request (last chain item) and create a copy.
        request_container = base_chain.pop()
        request = request_container.REQUEST
        # Generate PARENTS value. Sadly, we cannot reuse base_chain since
        # PARENTS items must be wrapped in acquisition
        parents = []
        application = self.getPhysicalRoot().aq_base
        for parent in self.aq_chain:
          if parent.aq_base is application:
            break
          parents.append(parent)
        # XXX: REQUEST.clone() requires PARENTS to be set, and it's not when
        # running unit tests. Recreate it if it does not exist.
        # request.other is a mapping, so this has to be a key lookup: an
        # attribute lookup never finds PARENTS and would overwrite it on
        # every call.
        if not request.other.get('PARENTS'):
          request.other['PARENTS'] = parents
        # XXX: itools (used by Localizer) requires PATH_INFO to be set, and it's
        # not when running unit tests. Recreate it if it does not exist.
        if request.environ.get('PATH_INFO') is None:
          request.environ['PATH_INFO'] = '/Control_Panel/timer_service/process_timer'

        # restore request information
        new_request = request.clone()
        request_info = message.request_info
        # PARENTS is truncated by clone
        new_request.other['PARENTS'] = parents
        new_request._script = request_info['_script']
        if 'SERVER_URL' in request_info:
          new_request.other['SERVER_URL'] = request_info['SERVER_URL']
        if 'VirtualRootPhysicalPath' in request_info:
          new_request.other['VirtualRootPhysicalPath'] = request_info['VirtualRootPhysicalPath']
        if 'HTTP_ACCEPT_LANGUAGE' in request_info:
          new_request.environ['HTTP_ACCEPT_LANGUAGE'] = request_info['HTTP_ACCEPT_LANGUAGE']
          # Replace Localizer/iHotfix Context, saving existing one
          localizer_context = LocalizerContext(new_request)
          thread_id = get_ident()
          localizer_lock.acquire()
          try:
            old_localizer_context = localizer_contexts.get(thread_id)
            localizer_contexts[thread_id] = localizer_context
          finally:
            localizer_lock.release()
          # Execute Localizer/iHotfix "patch 2"
          new_request.processInputs()

        new_request_container = request_container.__class__(REQUEST=new_request)
        # Recreate acquisition chain.
        my_self = new_request_container
        base_chain.reverse()
        for item in base_chain:
          my_self = item.__of__(my_self)
      else:
        my_self = self
        LOG('CMFActivity.ActivityTool.invoke', INFO, 'Strange: invoke is called outside of acquisition context.')
      try:
        message(my_self)
      finally:
        if my_self is not self: # We rewrapped self
          # Restore default skin selection
          skinnable = self.getPortalObject()
          skinnable.changeSkin(skinnable.getSkinNameFromRequest(request))
        if old_localizer_context is not False:
          # Restore Localizer/iHotfix context
          thread_id = get_ident()
          localizer_lock.acquire()
          try:
            if old_localizer_context is None:
              del localizer_contexts[thread_id]
            else:
              localizer_contexts[thread_id] = old_localizer_context
          finally:
            localizer_lock.release()
      if self.activity_tracking:
        activity_tracking_logger.info('invoked message')
      if my_self is not self: # We rewrapped self
        # Objects held by the temporary request are re-held by the real one.
        for held in my_self.REQUEST._held:
          self.REQUEST._hold(held)
1134

1135
    def invokeGroup(self, method_id, message_list):
      # Invoke a group method: call the method found at method_id once
      # with the list of objects gathered from all messages, then set the
      # execution state of every message according to the outcome.
      if self.activity_tracking:
        activity_tracking_logger.info('invoking group messages: method_id=%s, paths=%s' % (method_id, ['/'.join(m.object_path) for m in message_list]))
      object_list = []
      expanded_object_list = []
      new_message_list = []
      path_dict = {}

      def collect(candidate, message, alternate_method_id, index):
        # Record `candidate` once per path. It is either activated on its
        # own through the alternate method, or added to the objects that
        # will be passed to the group method.
        path = candidate.getPath()
        if path in path_dict:
          return
        path_dict[path] = index
        if alternate_method_id is not None \
           and hasattr(aq_base(candidate), alternate_method_id):
          # if this object is alternated, generate a new single active object.
          activity_kw = message.activity_kw.copy()
          if 'group_method_id' in activity_kw:
            del activity_kw['group_method_id']
          if 'group_id' in activity_kw:
            del activity_kw['group_id']
          active_obj = candidate.activate(**activity_kw)
          getattr(active_obj, alternate_method_id)(*message.args, **message.kw)
        else:
          expanded_object_list.append(candidate)

      # Filter the list of messages. If an object is not available, mark its message as non-executable.
      # In addition, expand an object if necessary, and make sure that no duplication happens.
      for m in message_list:
        # alternate method is used to segregate objects which cannot be grouped.
        alternate_method_id = m.activity_kw.get('alternate_method_id')
        try:
          obj = m.getObject(self)
        except KeyError:
          m.setExecutionState(MESSAGE_NOT_EXECUTABLE, context=self)
          continue
        try:
          i = len(new_message_list) # This is an index of this message in new_message_list.
          if m.hasExpandMethod():
            for subobj in m.getObjectList(self):
              collect(subobj, m, alternate_method_id, i)
          else:
            collect(obj, m, alternate_method_id, i)
          object_list.append(obj)
          new_message_list.append(m)
        except:
          m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)

      try:
        if len(expanded_object_list) > 0:
          method = self.unrestrictedTraverse(method_id)
          # FIXME: how to apply security here?
          # NOTE: expanded_object_list must be set to failed objects by the callee.
          #       If it fully succeeds, expanded_object_list must be empty when returning.
          # NOTE: `m` is the last message iterated over above; its keyword
          #       arguments are the ones passed to the group method.
          result = method(expanded_object_list, **m.kw)
        else:
          result = None
      except:
        # In this case, the group method completely failed.
        exc_info = sys.exc_info()
        for m in new_message_list:
          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
        LOG('WARNING ActivityTool', 0,
            'Could not call method %s on objects %s' %
            (method_id, expanded_object_list), error=exc_info)
        error_log = getattr(self, 'error_log', None)
        if error_log is not None:
          error_log.raising(exc_info)
      else:
        # Obtain all indices of failed messages. Note that this can be a partial failure.
        failed_message_dict = {}
        for obj in expanded_object_list:
          path = obj.getPath()
          i = path_dict[path]
          failed_message_dict[i] = None

        # Only for succeeded messages, an activity process is invoked (if any).
        for i in xrange(len(object_list)):
          object = object_list[i]
          m = new_message_list[i]
          if i in failed_message_dict:
            m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
          else:
            try:
              m.activateResult(self, result, object)
            except:
              m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
            else:
              m.setExecutionState(MESSAGE_EXECUTED, context=self)
      if self.activity_tracking:
        activity_tracking_logger.info('invoked group messages')
1236

1237 1238
    def newMessage(self, activity, path, active_process,
                   activity_kw, method_id, *args, **kw):
      # Build a Message from the arguments and queue it on the activity
      # registered under the name `activity`.
      # Some Security Checking should be made here XXX
      if not is_initialized:
        self.initialize()
      self.getActivityBuffer()
      activity_dict[activity].queueMessage(aq_inner(self),
        Message(path, active_process, activity_kw, method_id, args, kw))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1245

1246
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageInvoke' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1247 1248 1249 1250 1251 1252
    def manageInvoke(self, object_path, method_id, REQUEST=None):
      """
        Invokes all methods for object "object_path"

        object_path is a path tuple or a '/'-separated string. The work
        is done by flush() with invoke=1, restricted to method_id.
        When REQUEST is given, redirects to manageActivities.
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
      self.flush(object_path,method_id=method_id,invoke=1)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1257

1258
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageCancel' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1259 1260 1261 1262 1263 1264
    def manageCancel(self, object_path, method_id, REQUEST=None):
      """
        Cancel all methods for object "object_path"

        object_path is a path tuple or a '/'-separated string. The work
        is done by flush() with invoke=0, restricted to method_id.
        When REQUEST is given, redirects to manageActivities.
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
      self.flush(object_path,method_id=method_id,invoke=0)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1269

1270 1271
    security.declareProtected( CMFCorePermissions.ManagePortal,
                               'manageClearActivities' )
1272
    def manageClearActivities(self, keep=1, REQUEST=None):
      """
        Clear all activities and recreate tables.

        With keep true, pending messages are first dumped from every
        activity that provides dumpMessageList, and reactivated once the
        tables have been recreated. Failures to dump, drop or reactivate
        are logged and skipped, except ConflictError which is re-raised.
        When REQUEST is given, redirects to manageActivitiesAdvanced.
      """
      folder = getToolByName(self, 'portal_skins').activity

      # Obtain all pending messages.
      message_list = []
      if keep:
        for activity in activity_dict.itervalues():
          if hasattr(activity, 'dumpMessageList'):
            try:
              message_list.extend(activity.dumpMessageList(self))
            except ConflictError:
              raise
            except:
              LOG('ActivityTool', WARNING,
                  'could not dump messages from %s' %
                  (activity,), error=sys.exc_info())

      # Drop (best effort) and recreate the SQLDict table.
      if getattr(folder, 'SQLDict_createMessageTable', None) is not None:
        try:
          folder.SQLDict_dropMessageTable()
        except ConflictError:
          raise
        except:
          LOG('CMFActivity', WARNING,
              'could not drop the message table',
              error=sys.exc_info())
        folder.SQLDict_createMessageTable()

      # Drop (best effort) and recreate the SQLQueue table.
      if getattr(folder, 'SQLQueue_createMessageTable', None) is not None:
        try:
          folder.SQLQueue_dropMessageTable()
        except ConflictError:
          raise
        except:
          LOG('CMFActivity', WARNING,
              'could not drop the message queue table',
              error=sys.exc_info())
        folder.SQLQueue_createMessageTable()

      # Reactivate the messages.
      for m in message_list:
        try:
          m.reactivate(aq_inner(self))
        except ConflictError:
          raise
        except:
          LOG('ActivityTool', WARNING,
              'could not reactivate the message %r, %r' %
              (m.object_path, m.method_id), error=sys.exc_info())

      if REQUEST is not None:
        message = 'Activities%20Cleared'
        if keep:
          message = 'Tables%20Recreated'
        return REQUEST.RESPONSE.redirect(
            '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
              self.absolute_url(), message))
1332

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1333
    security.declarePublic('getMessageList')
1334
    def getMessageList(self,**kw):
      """
        List messages waiting in queues

        Keyword arguments are forwarded unchanged to the getMessageList
        method of every registered activity and the results are
        concatenated into a single list, which is returned.

        An activity whose call raises AttributeError is skipped: the
        failure is logged as a warning together with its traceback and
        the remaining activities are still queried.
      """
      # Initialize if needed
      if not is_initialized:
        self.initialize()

      tool = aq_inner(self)
      message_list = []
      for activity in activity_dict.itervalues():
        try:
          message_list += activity.getMessageList(tool, **kw)
        except AttributeError:
          # Best effort: keep going with the other activities.  The log
          # entry uses the (subsystem, severity, summary) layout of the
          # other LOG calls in this tool and carries the traceback, so an
          # AttributeError raised from inside an activity can be told
          # apart from an activity that simply lacks getMessageList.
          LOG('ActivityTool', WARNING,
              'could not get messages from %r' % (activity,),
              error=sys.exc_info())
      return message_list

1350 1351 1352 1353 1354 1355
    security.declarePublic('countMessageWithTag')
    def countMessageWithTag(self, value):
      """
        Return the number of messages which match the given tag,
        added up over every registered activity.

        NOTE(review): unlike getMessageList, this does not call
        self.initialize() when is_initialized is false -- confirm
        that this is intended.
      """
      tool = aq_inner(self)
      return sum(activity.countMessageWithTag(tool, value)
                 for activity in activity_dict.itervalues())

    security.declarePublic('countMessage')
    def countMessage(self, **kw):
      """
        Return the number of messages which match the given parameters,
        added up over every registered activity.  The keyword arguments
        are forwarded unchanged to each activity's countMessage.

        Parameters allowed:

        method_id : the id of the method
        path : for activities on a particular object
        tag : activities with a particular tag
        message_uid : activities with a particular uid

        NOTE(review): unlike getMessageList, this does not call
        self.initialize() when is_initialized is false -- confirm
        that this is intended.
      """
      tool = aq_inner(self)
      return sum(activity.countMessage(tool, **kw)
                 for activity in activity_dict.itervalues())

1377
    security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
1378
    def newActiveProcess(self, **kw):
      # Calls addActiveProcess on this tool with a freshly generated id
      # (converted to a string) plus the caller's keyword arguments, and
      # returns whatever addActiveProcess returns.
      #
      # NOTE(review): documented with comments on purpose.  The method has
      # no docstring, and in Zope the presence of a docstring is part of
      # what makes an object publishable -- confirm before adding one.
      #
      # The import is done at call time, as in the original code.
      from ActiveProcess import addActiveProcess
      return addActiveProcess(self, str(self.generateNewId()), **kw)
1382

1383
    # Active synchronisation methods
1384
    security.declarePrivate('validateOrder')
1385
    def validateOrder(self, message, validator_id, validation_value):
      # True when getDependentMessageList returns at least one entry for
      # this message / validator / value combination, False otherwise.
      # All three arguments are passed through untouched.
      return len(self.getDependentMessageList(
          message, validator_id, validation_value)) > 0

    security.declarePrivate('getDependentMessageList')
    def getDependentMessageList(self, message, validator_id, validation_value):
      # Returns a list of (activity, message) pairs.  Each registered
      # activity that has a method named "_validate_<validator_id>" is
      # called with (tool, message, validation_value); every item of a
      # true result is paired with the activity that produced it.
      # Activities without that method, or returning a false result,
      # contribute nothing.
      if not is_initialized:
        self.initialize()

      validator_name = "_validate_%s" % validator_id
      tool = aq_inner(self)
      dependent_list = []
      for activity in activity_dict.itervalues():
        validator = getattr(activity, validator_name, None)
        if validator is None:
          continue
        found = validator(tool, message, validation_value)
        if not found:
          continue
        for dependent in found:
          dependent_list.append((activity, dependent))
      return dependent_list
1402

Yoshinori Okuji's avatar
Yoshinori Okuji committed
1403 1404
    # Required for tests (time shift)
    def timeShift(self, delay):
      # Forwards `delay` to the timeShift method of every registered
      # activity, initializing the tool first when needed.  Returns None.
      #
      # NOTE(review): documented with comments on purpose.  This helper has
      # no security declaration and no docstring; in Zope a docstring is
      # part of what makes an object publishable -- confirm before adding
      # one.
      if not is_initialized:
        self.initialize()

      tool = aq_inner(self)
      for registered_activity in activity_dict.itervalues():
        registered_activity.timeShift(tool, delay)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1409

1410
# The class body is complete at this point: hand ActivityTool over to
# InitializeClass.
# NOTE(review): presumably Zope's InitializeClass, which applies the
# security.declare* assertions made inside the class -- confirm.
InitializeClass(ActivityTool)