ActivityTool.py 62.4 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
Jean-Paul Smets's avatar
Jean-Paul Smets committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29 30 31 32
import socket
import urllib
import threading
import sys
Vincent Pelletier's avatar
Vincent Pelletier committed
33
from types import StringType
34
from collections import defaultdict
35
from cPickle import dumps, loads
36
from Products.CMFCore import permissions as CMFCorePermissions
Jean-Paul Smets's avatar
Jean-Paul Smets committed
37
from Products.ERP5Type.Core.Folder import Folder
38
from Products.CMFActivity.ActiveResult import ActiveResult
39
from Products.CMFActivity.ActiveObject import DEFAULT_ACTIVITY
40
from Products.CMFActivity.ActivityConnection import ActivityConnection
41
from Products.PythonScripts.Utility import allow_class
42
from AccessControl import ClassSecurityInfo, Permissions
Jérome Perrin's avatar
Jérome Perrin committed
43 44 45 46
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
from AccessControl.SecurityManagement import getSecurityManager
47
from AccessControl.User import system as system_user
48
from Products.CMFCore.utils import UniqueObject, _getAuthenticatedUser
49
from Products.ERP5Type.Globals import InitializeClass, DTMLFile
50
from Acquisition import aq_base, aq_inner, aq_parent
51
from ActivityBuffer import ActivityBuffer
52
from ActivityRuntimeEnvironment import BaseMessage
53
from zExceptions import ExceptionFormatter
54
from BTrees.OIBTree import OIBTree
55 56
from Zope2 import app
from Products.ERP5Type.UnrestrictedMethod import PrivilegedUser
57
from zope.site.hooks import setSite
58
import transaction
59
from App.config import getConfiguration
60

61 62 63 64
import Products.Localizer.patches
localizer_lock = Products.Localizer.patches._requests_lock
localizer_contexts = Products.Localizer.patches._requests
LocalizerContext = lambda request: request
65

66

67
from ZODB.POSException import ConflictError
68
from Products.MailHost.MailHost import MailHostError
Jean-Paul Smets's avatar
Jean-Paul Smets committed
69

70
from zLOG import LOG, INFO, WARNING, ERROR
71
import warnings
72
from time import time
73 74

try:
  from Products.TimerService import getTimerService
except ImportError:
  # TimerService is optional. Without it, provide a stub with the same
  # call signature which reports that no service is available; callers in
  # this module test the returned value for truthiness.
  def getTimerService(self):
    return None
Jean-Paul Smets's avatar
Jean-Paul Smets committed
79

80
from traceback import format_list, extract_stack
81

Jean-Paul Smets's avatar
Jean-Paul Smets committed
82 83 84 85
# Module-level (RAM) state is used instead of instance properties so that
# nothing of it gets stored in the ZODB (and so that it is reset on restart).
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)
is_initialized = False
tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
is_running_lock = threading.Lock()
currentNode = None
_server_address = None
ROLE_IDLE = 0
ROLE_PROCESSING = 1

# Activity Registration: maps an activity class name to its registered
# instance (filled by registerActivity).
activity_dict = {}

98 99 100 101 102
# Logging channel definitions
import logging
# Main logging channel
activity_logger = logging.getLogger('CMFActivity')
# Some logging subchannels
# NOTE(review): 'Tracking' is not named under the 'CMFActivity.' prefix, so
# it does not inherit the handler attached to activity_logger below -
# confirm whether this is intended.
activity_tracking_logger = logging.getLogger('Tracking')
activity_timing_logger = logging.getLogger('CMFActivity.TimingLog')

# Direct logging to "[instancehome]/log/CMFActivity.log", if this directory exists.
# Otherwise, it will end up in root logging facility (ie, event.log).
from App.config import getConfiguration
import os
instancehome = getConfiguration().instancehome
if instancehome is not None:
  log_directory = os.path.join(instancehome, 'log')
  if os.path.isdir(log_directory):
    from Signals import Signals
    from ZConfig.components.logger.loghandler import FileHandler
    log_file_handler = FileHandler(os.path.join(log_directory, 'CMFActivity.log'))
    # Default zope log format string borrowed from
    # ZConfig/components/logger/factory.xml, but without the extra "------"
    # line separating entries.
    log_file_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s", "%Y-%m-%dT%H:%M:%S"))
    # Register the handler with Zope's signal handling along with the
    # other Zope log handlers.
    Signals.registerZopeSignals([log_file_handler])
    activity_logger.addHandler(log_file_handler)
    # Records handled here must not also reach the root logger.
    activity_logger.propagate = 0

125 126 127 128 129 130 131 132
def activity_timing_method(method, args, kw):
  """Call method(*args, **kw), logging how long the call took.

  The duration is written to the timing log channel whether the call
  returns or raises. The result of the call is returned unchanged, and
  any exception it raises is propagated.
  """
  started_at = time()
  try:
    outcome = method(*args, **kw)
  finally:
    elapsed = time() - started_at
    activity_timing_logger.info(
      '%.02fs: %r(*%r, **%r)' % (elapsed, method, args, kw))
  return outcome

133 134 135
# Here go ActivityBuffer instances
# Structure:
#  global_activity_buffer[activity_tool_path][thread_id] = ActivityBuffer
# The outer level is a defaultdict, so the per-tool dict is created on
# first access.
global_activity_buffer = defaultdict(dict)
from thread import get_ident
138

Jean-Paul Smets's avatar
Jean-Paul Smets committed
139 140 141
def registerActivity(activity):
  """Instantiate the given activity class and register the instance.

  The instance is stored in the module-level activity_dict under the
  class name, replacing any instance previously registered under it.
  """
  # Must be rewritten to register
  # class and create instance for each activity
  #LOG('Init Activity', 0, str(activity.__name__))
  activity_dict[activity.__name__] = activity()

146 147 148 149
# Execution states of a Message, as accepted by Message.setExecutionState
# and returned by Message.getExecutionState.
# Default state; also set when calling the message raised.
MESSAGE_NOT_EXECUTED = 0
# Set once the method was called without raising.
MESSAGE_EXECUTED = 1
# Set when no object could be found at the message's path.
MESSAGE_NOT_EXECUTABLE = 2

150 151

class Message(BaseMessage):
  """Activity Message Class.

  Message instances are stored in an activity queue, inside the Activity Tool.
  """

  # Class-level defaults for attributes which are only set on some instances.
  active_process = None
  active_process_uid = None
  call_traceback = None
  exc_info = None
  is_executed = MESSAGE_NOT_EXECUTED
  processing = None
  traceback = None
  oid = None
  is_registered = False

  def __init__(
      self,
      url,
      oid,
      active_process,
      active_process_uid,
      activity_kw,
      method_id,
      args, kw,
      request=None,
      portal_activities=None,
    ):
    # url is stored as object_path; other methods of this class iterate over
    # it and '/'.join() it, i.e. it is used as a sequence of path elements.
    self.object_path = url
    self.oid = oid
    self.active_process = active_process
    self.active_process_uid = active_process_uid
    self.activity_kw = activity_kw
    self.method_id = method_id
    self.args = args
    self.kw = kw
    if getattr(portal_activities, 'activity_creation_trace', False):
      # Save current traceback, to make it possible to tell where a message
      # was generated.
      # Strip last stack entry, since it will always be the same.
      self.call_traceback = ''.join(format_list(extract_stack()[:-1]))
    self.user_name = str(_getAuthenticatedUser(self))
    # Store REQUEST Info
    self.request_info = {}
    if request is not None:
      if 'SERVER_URL' in request.other:
        self.request_info['SERVER_URL'] = request.other['SERVER_URL']
      if 'VirtualRootPhysicalPath' in request.other:
        self.request_info['VirtualRootPhysicalPath'] = \
          request.other['VirtualRootPhysicalPath']
      if 'HTTP_ACCEPT_LANGUAGE' in request.environ:
        self.request_info['HTTP_ACCEPT_LANGUAGE'] = \
          request.environ['HTTP_ACCEPT_LANGUAGE']
      self.request_info['_script'] = list(request._script)

  @staticmethod
  def load(s, **kw):
    # Rebuild a message from its pickled form, then set the attributes given
    # as keyword arguments on it.
    # SECURITY NOTE: this unpickles s, which is only safe on trusted data.
    self = loads(s)
    self.__dict__.update(kw)
    return self

  # Serialisation counterpart of load().
  dump = dumps

  def getGroupId(self):
    # Return the group method id and the group id joined by a NUL character.
    get = self.activity_kw.get
    group_method_id = get('group_method_id', '')
    if group_method_id is None:
      # group_method_id was explicitly given as None: use a per-method
      # placeholder instead.
      group_method_id = 'portal_activities/dummyGroupMethod/' + self.method_id
    return group_method_id + '\0' + get('group_id', '')

  def _getObject(self, activity_tool):
    # Walk down from the physical root along object_path, skipping its first
    # element. Lookup failures (e.g. KeyError) are left to the caller.
    obj = activity_tool.getPhysicalRoot()
    for id in self.object_path[1:]:
      obj = obj[id]
    return obj

  def getObject(self, activity_tool):
    """return the object referenced in this message.

    When no object exists at the message's path, the message is marked as
    MESSAGE_NOT_EXECUTABLE and None is returned.
    Raises ValueError when the object found does not have the expected OID.
    """
    try:
      obj = self._getObject(activity_tool)
    except KeyError:
      LOG('CMFActivity', WARNING, "Message dropped (no object found at path %r)"
          % (self.object_path,), error=sys.exc_info())
      self.setExecutionState(MESSAGE_NOT_EXECUTABLE)
    else:
      if (self.oid and self.oid != getattr(aq_base(obj), '_p_oid', None) and
          # XXX: BusinessTemplate must be fixed to preserve OID
          'portal_workflow' not in self.object_path):
        raise ValueError("OID mismatch for %r" % obj)
      return obj

  def getObjectList(self, activity_tool):
    """return the list of object that can be expanded from this message
    An expand method is used to expand the list of objects and to turn a
    big recursive transaction affecting many objects into multiple
    transactions affecting only one object at a time (this can prevent
    duplicated method calls)."""
    obj = self.getObject(activity_tool)
    if obj is None:
      return ()
    if 'expand_method_id' in self.activity_kw:
      return getattr(obj, self.activity_kw['expand_method_id'])()
    return obj,

  def getObjectCount(self, activity_tool):
    # Number of objects getObjectList would expand to; falls back to 1 when
    # there is no expand method or when counting fails.
    if 'expand_method_id' in self.activity_kw:
      try:
        obj = self._getObject(activity_tool)
        return len(getattr(obj, self.activity_kw['expand_method_id'])())
      except StandardError:
        pass
    return 1

  def changeUser(self, user_name, activity_tool):
    """restore the security context for the calling user.

    Returns the user object found, or None when there is no such user (in
    which case the security manager is reset).
    """
    portal = activity_tool.getPortalObject()
    portal_uf = portal.acl_users
    uf = portal_uf
    user = uf.getUserById(user_name)
    # if the user is not found, try to get it from a parent acl_users
    # XXX this is still far from perfect, because we need to store all
    # information about the user (like original user folder, roles) to
    # replay the activity with exactly the same security context as if
    # it had been executed without activity.
    if user is None:
      uf = portal.aq_parent.acl_users
      user = uf.getUserById(user_name)
    if user is None and user_name == system_user.getUserName():
      # The following logic partly comes from unrestricted_apply()
      # implementation in ERP5Type.UnrestrictedMethod but we get roles
      # from the portal to have more roles.
      uf = portal_uf
      role_list = uf.valid_roles()
      user = PrivilegedUser(user_name, None, role_list, ()).__of__(uf)
    if user is not None:
      user = user.__of__(uf)
      newSecurityManager(None, user)
      transaction.get().setUser(user_name, '/'.join(uf.getPhysicalPath()))
    else :
      LOG("CMFActivity", WARNING,
          "Unable to find user %r in the portal" % user_name)
      noSecurityManager()
    return user

  def activateResult(self, active_process, result, object):
    # Post result to active_process, wrapping it into an ActiveResult first
    # unless it already is one.
    if not isinstance(result, ActiveResult):
      result = ActiveResult(result=result)
    # XXX Allow other method_id in future
    result.edit(object_path=object, method_id=self.method_id)
    active_process.postResult(result)

  def __call__(self, activity_tool):
    # Execute the message: call method_id on the referenced object as the
    # user who created the message, and record the outcome through
    # setExecutionState. Exceptions are recorded, never propagated.
    try:
      obj = self.getObject(activity_tool)
      if obj is not None:
        old_security_manager = getSecurityManager()
        try:
          # Change user if required (TO BE DONE)
          # We will change the user only in order to execute this method
          self.changeUser(self.user_name, activity_tool)
          # XXX: There is no check to see if user is allowed to access
          #      that method !
          method = getattr(obj, self.method_id)
          transaction.get().note(
            'CMFActivity ' + '/'.join(self.object_path) + '/' + self.method_id
          )
          # Store site info
          setSite(activity_tool.getParentValue())
          if activity_tool.activity_timing_log:
            result = activity_timing_method(method, self.args, self.kw)
          else:
            result = method(*self.args, **self.kw)
        finally:
          # Use again the previous user
          setSecurityManager(old_security_manager)

        if method is not None:
          if self.active_process and result is not None:
            self.activateResult(
              activity_tool.unrestrictedTraverse(self.active_process),
              result, obj)
          self.setExecutionState(MESSAGE_EXECUTED)
    except:
      # Deliberate catch-all: whatever went wrong is stored on the message
      # (and logged) by setExecutionState instead of being raised.
      self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)

  def validate(self, activity, activity_tool, check_order_validation=1):
    # Delegate validation to the activity, passing the message's activity
    # keywords along.
    return activity.validate(activity_tool, self,
                             check_order_validation=check_order_validation,
                             **self.activity_kw)

  def notifyUser(self, activity_tool, retry=False):
    """Notify the user that the activity failed.

    A mail describing the failure is sent through the portal's MailHost.
    Failure to send it is logged and otherwise ignored.
    """
    portal = activity_tool.getPortalObject()
    user_email = portal.getProperty('email_to_address',
                       portal.getProperty('email_from_address'))
    email_from_name = portal.getProperty('email_from_name',
                       portal.getProperty('email_from_address'))
    fail_count = self.line.retry + 1
    if retry:
      message = "Pending activity already failed %s times" % fail_count
    else:
      message = "Activity failed"
    path = '/'.join(self.object_path)
    mail_text = """From: %s <%s>
To: %s
Subject: %s: %s/%s

Node: %s
Failures: %s
User name: %r
Uid: %u
Document: %s
Method: %s
Arguments: %r
Named Parameters: %r
""" % (email_from_name, activity_tool.email_from_address, user_email, message,
       path, self.method_id, activity_tool.getCurrentNode(), fail_count,
       self.user_name, self.line.uid, path, self.method_id, self.args, self.kw)
    if self.traceback:
      mail_text += '\nException:\n' + self.traceback
    if self.call_traceback:
      mail_text += '\nCreated at:\n' + self.call_traceback
    try:
      portal.MailHost.send(mail_text)
    except (socket.error, MailHostError) as error:
      LOG('ActivityTool.notifyUser', WARNING,
          'Mail containing failure information failed to be sent: %s' % error)

  def reactivate(self, activity_tool, activity=DEFAULT_ACTIVITY):
    # Reactivate the original object.
    obj = self._getObject(activity_tool)
    old_security_manager = getSecurityManager()
    try:
      # Change user if required (TO BE DONE)
      # We will change the user only in order to execute this method
      self.changeUser(self.user_name, activity_tool)
      active_obj = obj.activate(activity=activity, **self.activity_kw)
      getattr(active_obj, self.method_id)(*self.args, **self.kw)
    finally:
      # Use again the previous user
      setSecurityManager(old_security_manager)

  def setExecutionState(self, is_executed, exc_info=None, log=True, context=None):
    """
      Set message execution state.

      is_executed can be one of MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED and
      MESSAGE_NOT_EXECUTABLE (variables defined above).

      exc_info must be - if given - similar to sys.exc_info() return value.

      log must be - if given - True or False. If True, a log line will be
      emitted with failure details. This parameter should only be used when
      invoking this method on a list of messages to avoid log flood. It is
      caller's responsibility to output a log line summing up all errors, and
      to store error in Zope's error_log.

      context must be - if given - an object wrapped in acquisition context.
      It is used to access Zope's error_log object. It is not used if log is
      False.

      If given state is MESSAGE_NOT_EXECUTED, it will also store given
      exc_info. If not given, it will extract one using sys.exc_info().
      If final exc_info does not contain any exception, current stack trace
      will be stored instead: it will hopefully help understand why message
      is in an error state.
    """
    assert is_executed in (MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, MESSAGE_NOT_EXECUTABLE)
    self.is_executed = is_executed
    if is_executed == MESSAGE_NOT_EXECUTED:
      if not exc_info:
        exc_info = sys.exc_info()
      # The full exc_info is only kept when an error callback is set.
      if self.on_error_callback is not None:
        self.exc_info = exc_info
      self.exc_type = exc_info[0]
      if exc_info[0] is None:
        # Raise a dummy exception, ignore it, fetch it and use it as if it
        # was the error causing message non-execution. This will help
        # identifying the cause of this misbehaviour.
        try:
          raise Exception('Message execution failed, but there is no exception to explain it. This is a dummy exception so that one can track down why we end up here outside of an exception handling code path.')
        except Exception:
          exc_info = sys.exc_info()
      if log:
        LOG('ActivityTool', WARNING, 'Could not call method %s on object %s. Activity created at:\n%s' % (self.method_id, self.object_path, self.call_traceback), error=exc_info)
        # push the error in ZODB error_log
        error_log = getattr(context, 'error_log', None)
        if error_log is not None:
          error_log.raising(exc_info)
      # Keep the formatted exception, without its first line.
      self.traceback = ''.join(ExceptionFormatter.format_exception(*exc_info)[1:])

  def getExecutionState(self):
    # One of the MESSAGE_* constants, as last set by setExecutionState.
    return self.is_executed

Vincent Pelletier's avatar
Vincent Pelletier committed
442 443
class Method(object):
  # Callable created by ActiveWrapper for one method name. Calling it does
  # not run the method: it builds a Message from the call arguments and
  # queues it through the activity buffer of the activity tool.
  __slots__ = (
    '_portal_activities',
    '_passive_url',
    '_passive_oid',
    '_activity',
    '_active_process',
    '_active_process_uid',
    '_kw',
    '_method_id',
    '_request',
  )

  def __init__(self, portal_activities, passive_url, passive_oid, activity,
      active_process, active_process_uid, kw, method_id, request):
    self._portal_activities = portal_activities
    self._passive_url = passive_url
    self._passive_oid = passive_oid
    self._activity = activity
    self._active_process = active_process
    self._active_process_uid = active_process_uid
    self._kw = kw
    self._method_id = method_id
    self._request = request

  def __call__(self, *args, **kw):
    # Build the message describing this call and queue it. Returns None.
    portal_activities = self._portal_activities
    m = Message(
      url=self._passive_url,
      oid=self._passive_oid,
      active_process=self._active_process,
      active_process_uid=self._active_process_uid,
      activity_kw=self._kw,
      method_id=self._method_id,
      args=args,
      kw=kw,
      request=self._request,
      portal_activities=portal_activities,
    )
    if portal_activities.activity_tracking:
      activity_tracking_logger.info(
        'queuing message: activity=%s, object_path=%s, method_id=%s, args=%s, kw=%s, activity_kw=%s, user_name=%s' % (
          self._activity, '/'.join(m.object_path), m.method_id, m.args, m.kw,
          m.activity_kw, m.user_name))
    # self._activity is the name under which the activity was registered.
    portal_activities.getActivityBuffer().deferredQueueMessage(
      portal_activities, activity_dict[self._activity], m)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
485

486 487
allow_class(Method)

Vincent Pelletier's avatar
Vincent Pelletier committed
488 489 490
class ActiveWrapper(object):
  # Proxy standing for an object on which methods are to be called through
  # activities: any attribute not found on the wrapper itself is returned as
  # a Method bound to that attribute name (see __getattr__).
  __slots__ = (
    '__portal_activities',
    '__passive_url',
    '__passive_oid',
    '__activity',
    '__active_process',
    '__active_process_uid',
    '__kw',
    '__request',
  )
  # Shortcut security lookup (avoid calling __getattr__)
  __parent__ = None

  def __init__(self, portal_activities, url, oid, activity, active_process,
      active_process_uid, kw, request):
    # second parameter can be an object or an object's path
    self.__portal_activities = portal_activities
    self.__passive_url = url
    self.__passive_oid = oid
    self.__activity = activity
    self.__active_process = active_process
    self.__active_process_uid = active_process_uid
    self.__kw = kw
    self.__request = request

  def __getattr__(self, name):
    # Only reached for names which normal attribute lookup did not find;
    # no check is made that the wrapped object has such a method.
    return Method(
      self.__portal_activities,
      self.__passive_url,
      self.__passive_oid,
      self.__activity,
      self.__active_process,
      self.__active_process_uid,
      self.__kw,
      name,
      self.__request,
    )

  def __repr__(self):
    return '<%s at 0x%x to %s>' % (self.__class__.__name__, id(self),
                                   self.__passive_url)
530

531 532 533
# True when activities cannot be executing any more.
has_processed_shutdown = False

534 535 536 537 538 539 540 541
def cancelProcessShutdown():
  """
    Undo what calling "process_shutdown" on the activity tool did:
    release the "is running" lock, then clear the module-level shutdown flag.
  """
  global has_processed_shutdown
  # Release first: if the lock is not held this raises and the flag is
  # left untouched.
  is_running_lock.release()
  has_processed_shutdown = False
542

543
class ActivityTool (Folder, UniqueObject):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
544
    """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
545 546 547 548 549 550 551 552 553 554 555 556
    ActivityTool is the central point for activity management.

    Improvement to consider to reduce locks:

      Idea 1: create an SQL tool which accumulate queries and executes them at the end of a transaction,
              thus allowing all SQL transaction to happen in a very short time
              (this would also be a great way of using MyISAM tables)

      Idea 2: do the same at the level of ActivityTool

      Idea 3: do the same at the level of each activity (ie. queueMessage
              accumulates and fires messages at the end of the transaction)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
557 558 559
    """
    id = 'portal_activities'
    meta_type = 'CMF Activity Tool'
560
    portal_type = 'Activity Tool'
561
    allowed_types = ( 'CMF Active Process', )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
562 563
    security = ClassSecurityInfo()

564 565
    isIndexable = False

566 567
    manage_options = tuple(
                     [ { 'label' : 'Overview', 'action' : 'manage_overview' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
568
                     , { 'label' : 'Activities', 'action' : 'manageActivities' }
569
                     , { 'label' : 'LoadBalancing', 'action' : 'manageLoadBalancing'}
570
                     , { 'label' : 'Advanced', 'action' : 'manageActivitiesAdvanced' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
571
                     ,
572
                     ] + list(Folder.manage_options))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
573 574 575 576

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
    manageActivities = DTMLFile( 'dtml/manageActivities', globals() )

577 578 579
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivitiesAdvanced' )
    manageActivitiesAdvanced = DTMLFile( 'dtml/manageActivitiesAdvanced', globals() )

580 581
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
    manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )
582

583 584
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageLoadBalancing' )
    manageLoadBalancing = DTMLFile( 'dtml/manageLoadBalancing', globals() )
585

586 587
    distributingNode = ''
    _nodes = ()
588 589 590
    activity_creation_trace = False
    activity_tracking = False
    activity_timing_log = False
591
    cancel_and_invoke_links_hidden = False
592

593 594 595 596 597
    def SQLDict_setPriority(self, **kw):
      # Debugging wrapper around the SQLDict_setPriority found on the parent:
      # first call it with src__=1 and log what it returns (presumably the
      # rendered query source - confirm), then call it for real.
      acquired_method = self.aq_parent.SQLDict_setPriority
      LOG('ActivityTool', 0, acquired_method(src__=1, **kw))
      outcome = acquired_method(**kw)
      return outcome

598 599 600 601
    def __init__(self, id=None):
        # Fall back on the class-level id when none is given.
        tool_id = ActivityTool.id if id is None else id
        return Folder.__init__(self, tool_id)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
602

603 604 605
    # Filter content (ZMI))
    def filtered_meta_types(self, user=None):
        # Filters the list of available meta types: only those whose name is
        # listed in allowed_types are returned. The user argument is unused.
        allowed_types = self.allowed_types
        return [meta_type for meta_type in self.all_meta_types()
                if meta_type['name'] in allowed_types]

613 614 615 616 617 618 619 620 621 622 623 624 625 626
    def maybeMigrateConnectionClass(self):
      # Replace the 'cmf_activity_sql_connection' object by an
      # ActivityConnection with the same title and connection string, unless
      # it is missing or already an ActivityConnection.
      connection_id = 'cmf_activity_sql_connection'
      old_connection = getattr(self, connection_id, None)
      if old_connection is None:
        return
      if isinstance(old_connection, ActivityConnection):
        return
      # SQL Connection migration is needed
      LOG('ActivityTool', WARNING, "Migrating MySQL Connection class")
      container = aq_parent(aq_inner(old_connection))
      container._delObject(old_connection.getId())
      container._setObject(
        connection_id,
        ActivityConnection(connection_id,
                           old_connection.title,
                           old_connection.connection_string))

Jean-Paul Smets's avatar
Jean-Paul Smets committed
627 628
    def initialize(self):
      # Initialise every registered activity against this tool, migrate the
      # SQL connection class if needed, then set the module-level
      # is_initialized flag.
      global is_initialized
      # NOTE(review): the imported names are not used here; presumably the
      # import is what registers these activities - confirm before removing.
      from Activity import SQLQueue, SQLDict
      # Initialize each queue
      for activity in activity_dict.itervalues():
        activity.initialize(self)
      self.maybeMigrateConnectionClass()
      is_initialized = True
635

636 637
    security.declareProtected(Permissions.manage_properties, 'isSubscribed')
    def isSubscribed(self):
        """
        return True, if we are subscribed to TimerService.
        Otherwise return False.
        """
        service = getTimerService(self)
        if not service:
            LOG('ActivityTool', INFO, 'TimerService not available')
            return False

        # Subscriptions are looked up by physical path.
        # NOTE(review): 'lisSubscriptions' (sic) is presumably the actual
        # TimerService method name - confirm before "fixing" the spelling.
        path = '/'.join(self.getPhysicalPath())
        return path in service.lisSubscriptions()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
651

652
    security.declareProtected(Permissions.manage_properties, 'subscribe')
    def subscribe(self, REQUEST=None, RESPONSE=None):
        """ subscribe to the global Timer Service """
        service = getTimerService(self)
        if not service:
            LOG('ActivityTool', INFO, 'TimerService not available')
            message = 'TimerService not available'
        else:
            service.subscribe(self)
            message = "Subscribed to Timer Service"
        # The status message is only reported through a redirect to the
        # load balancing tab, so there is nothing to build without RESPONSE.
        if RESPONSE is not None:
            RESPONSE.redirect(
                '%s/manageLoadBalancing?manage_tabs_message=%s' % (
                    self.absolute_url(), urllib.quote(message)))
665 666

    security.declareProtected(Permissions.manage_properties, 'unsubscribe')
    def unsubscribe(self, REQUEST=None, RESPONSE=None):
        """ unsubscribe from the global Timer Service """
        service = getTimerService(self)
        if not service:
            LOG('ActivityTool', INFO, 'TimerService not available')
            message = 'TimerService not available'
        else:
            service.unsubscribe(self)
            message = "Unsubscribed from Timer Service"
        # The status message is only reported through a redirect to the
        # load balancing tab, so there is nothing to build without RESPONSE.
        if RESPONSE is not None:
            RESPONSE.redirect(
                '%s/manageLoadBalancing?manage_tabs_message=%s' % (
                    self.absolute_url(), urllib.quote(message)))
679

680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757
    security.declareProtected(Permissions.manage_properties, 'isActivityTrackingEnabled')
    def isActivityTrackingEnabled(self):
      # Tell whether queued messages are written to the tracking log.
      # NOTE(review): this method has no docstring; in Zope 2 adding one
      # would presumably make it publishable through the web - confirm
      # before adding one.
      return self.activity_tracking

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTracking')
    def manage_enableActivityTracking(self, REQUEST=None, RESPONSE=None):
        """
          Turn activity tracking on. When RESPONSE is given, redirect to
          the advanced management tab with a status message.
        """
        self.activity_tracking = True
        if RESPONSE is None:
          return
        status = urllib.quote('Tracking log enabled')
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          + status)

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTracking')
    def manage_disableActivityTracking(self, REQUEST=None, RESPONSE=None):
        """
          Turn activity tracking off. When RESPONSE is given, redirect to
          the advanced management tab with a status message.
        """
        self.activity_tracking = False
        if RESPONSE is None:
          return
        status = urllib.quote('Tracking log disabled')
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          + status)

    security.declareProtected(Permissions.manage_properties, 'isActivityTimingLoggingEnabled')
    def isActivityTimingLoggingEnabled(self):
      # Expose the current value of the activity timing log switch.
      timing_log_enabled = self.activity_timing_log
      return timing_log_enabled

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTimingLogging')
    def manage_enableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
        """
          Switch activity timing logging on.

          When RESPONSE is given, redirect it to the advanced activity
          management tab with a confirmation message.
        """
        self.activity_timing_log = True
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        RESPONSE.redirect(base_url + urllib.quote('Timing log enabled'))

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTimingLogging')
    def manage_disableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
        """
          Switch activity timing logging off.

          When RESPONSE is given, redirect it to the advanced activity
          management tab with a confirmation message.
        """
        self.activity_timing_log = False
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        RESPONSE.redirect(base_url + urllib.quote('Timing log disabled'))

    security.declareProtected(Permissions.manage_properties, 'isActivityCreationTraceEnabled')
    def isActivityCreationTraceEnabled(self):
      # Expose the current value of the activity creation trace switch.
      creation_trace_enabled = self.activity_creation_trace
      return creation_trace_enabled

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityCreationTrace')
    def manage_enableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
        """
          Switch activity creation trace on.

          When RESPONSE is given, redirect it to the advanced activity
          management tab with a confirmation message.
        """
        self.activity_creation_trace = True
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        RESPONSE.redirect(base_url + urllib.quote('Activity creation trace enabled'))

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityCreationTrace')
    def manage_disableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
        """
          Switch activity creation trace off.

          When RESPONSE is given, redirect it to the advanced activity
          management tab with a confirmation message.
        """
        self.activity_creation_trace = False
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        RESPONSE.redirect(base_url + urllib.quote('Activity creation trace disabled'))

758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781
    security.declareProtected(Permissions.manage_properties, 'isCancelAndInvokeLinksHidden')
    def isCancelAndInvokeLinksHidden(self):
      # Expose the current value of the "hide cancel and invoke links" switch.
      links_hidden = self.cancel_and_invoke_links_hidden
      return links_hidden

    security.declareProtected(Permissions.manage_properties, 'manage_hideCancelAndInvokeLinks')
    def manage_hideCancelAndInvokeLinks(self, REQUEST=None, RESPONSE=None):
        """
          Mark the cancel and invoke links as hidden.

          When RESPONSE is given, redirect it to the advanced activity
          management tab with a confirmation message.
        """
        self.cancel_and_invoke_links_hidden = True
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        RESPONSE.redirect(base_url + urllib.quote('Cancel and invoke links hidden'))

    security.declareProtected(Permissions.manage_properties, 'manage_showCancelAndInvokeLinks')
    def manage_showCancelAndInvokeLinks(self, REQUEST=None, RESPONSE=None):
        """
          Mark the cancel and invoke links as visible.

          When RESPONSE is given, redirect it to the advanced activity
          management tab with a confirmation message.
        """
        self.cancel_and_invoke_links_hidden = False
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        RESPONSE.redirect(base_url + urllib.quote('Cancel and invoke links visible'))

782 783
    def manage_beforeDelete(self, item, container):
        # Deletion hook: unsubscribe from the Timer Service first, then
        # delegate to the manage_beforeDelete implementation that Folder
        # inherits.
        self.unsubscribe()
        Folder.inheritedAttribute('manage_beforeDelete')(self, item, container)
785

786 787
    def manage_afterAdd(self, item, container):
        # Creation hook: subscribe to the Timer Service first, then delegate
        # to the manage_afterAdd implementation that Folder inherits.
        self.subscribe()
        Folder.inheritedAttribute('manage_afterAdd')(self, item, container)
789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809

    def getServerAddress(self):
        """
        Backward-compatibility code only.

        Return an "ip:port" string for this process and cache it in the
        module-level _server_address. The address comes from the first
        asyncore socket whose class is ZServer's zhttp_server; when that
        server listens on 0.0.0.0, the IP resolved from the local host name
        is used instead.
        """
        global _server_address
        if _server_address is not None:
            return _server_address
        from asyncore import socket_map
        ip = port = ''
        for server in socket_map.values():
            if not hasattr(server, 'addr'):
                continue
            # see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
            class_name = str(getattr(server, '__class__', 'unknown'))
            if class_name == 'ZServer.HTTPServer.zhttp_server':
                ip, port = server.addr
                break
        if ip == '0.0.0.0':
            ip = socket.gethostbyname(socket.gethostname())
        _server_address = '%s:%s' %(ip, port)
        return _server_address

810
    def getCurrentNode(self):
        """
        Return current node identifier.

        The identifier is cached in the module-level currentNode. It is read
        from the "node-id" key of the "cmfactivity" product configuration.
        When that is not set, a DeprecationWarning is emitted and the server
        address is used instead.
        """
        global currentNode
        if currentNode is None:
          currentNode = getattr(
            getConfiguration(),
            'product_config',
            {},
          ).get('cmfactivity', {}).get('node-id')
        if currentNode is None:
          warnings.warn('Node name auto-generation is deprecated, please add a'
            '\n'
            '<product-config CMFActivity>\n'
            '  node-id = ...\n'
            '</product-config>\n'
            'section in your zope.conf, replacing "..." with a cluster-unique '
            'node identifier.', DeprecationWarning)
          currentNode = self.getServerAddress()
        return currentNode
829

830 831 832 833 834
    security.declarePublic('getDistributingNode')
    def getDistributingNode(self):
        """ Return the identifier stored as the distributing node """
        distributing_node = self.distributingNode
        return distributing_node

835 836 837 838 839 840 841 842 843 844 845 846 847
    def getNodeList(self, role=None):
      # Return the sorted list of registered node identifiers. When a role is
      # given, only the nodes registered with that exact role are returned.
      node_dict = self.getNodeDict()
      if role is None:
        return sorted(node_dict.keys())
      return sorted(
        node_id
        for node_id, node_role in node_dict.items()
        if node_role == role
      )

    def getNodeDict(self):
      # Return the mapping of node identifier to role stored in self._nodes.
      # If self._nodes is still a tuple of node identifiers, it is migrated
      # in place to an OIBTree where every listed node gets ROLE_PROCESSING.
      nodes = self._nodes
      if isinstance(nodes, tuple):
        new_nodes = OIBTree()
        new_nodes.update([(x, ROLE_PROCESSING) for x in nodes])
        self._nodes = nodes = new_nodes
      return nodes

    def registerNode(self, node):
      # Make sure the given node is present in the node dictionary.
      # Nothing happens when the node is already registered.
      node_dict = self.getNodeDict()
      if node in node_dict:
        return
      if node_dict:
        # BBB: check if our node was known by address (processing and/or
        # distribution), and migrate it.
        server_address = self.getServerAddress()
        role = node_dict.pop(server_address, ROLE_IDLE)
        if self.distributingNode == server_address:
          self.distributingNode = node
      else:
        # We are registering the first node, make
        # it both the distributing node and a processing node.
        role = ROLE_PROCESSING
        self.distributingNode = node
      self.updateNode(node, role)

    def updateNode(self, node, role):
      # Record (or overwrite) the role of the given node.
      self.getNodeDict()[node] = role

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getProcessingNodeList')
    def getProcessingNodeList(self):
      # Sorted identifiers of the nodes registered with ROLE_PROCESSING.
      processing_node_list = self.getNodeList(role=ROLE_PROCESSING)
      return processing_node_list

877
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getIdleNodeList')
878 879
    def getIdleNodeList(self):
      # Sorted identifiers of the nodes registered with ROLE_IDLE.
      idle_node_list = self.getNodeList(role=ROLE_IDLE)
      return idle_node_list
880

881 882
    def _isValidNodeName(self, node_name) :
      """Check we have been provided a good node name (a str instance)"""
      return isinstance(node_name, str)
884

885 886
    security.declarePublic('manage_setDistributingNode')
    def manage_setDistributingNode(self, distributingNode, REQUEST=None):
        """
        Set the distributing node.

        The value is stored when it is empty (false) or accepted by
        _isValidNodeName; otherwise the stored value is left untouched.
        When REQUEST is given, its response is redirected to the load
        balancing tab with a message describing the outcome.
        """
        # NOTE(review): this state-changing method is declared public just
        # above its definition -- confirm that this is intended.
        if not distributingNode or self._isValidNodeName(distributingNode):
          self.distributingNode = distributingNode
          message = "Distributing Node successfully changed."
        else :
          message = "Malformed Distributing Node."
        if REQUEST is not None:
            REQUEST.RESPONSE.redirect(
                REQUEST.URL1 +
                '/manageLoadBalancing?manage_tabs_message=' +
                urllib.quote(message))

902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957
    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_delNode')
    def manage_delNode(self, unused_node_list=None, REQUEST=None):
      """
      Delete selected unused nodes.

      Every node of unused_node_list found in the node dictionary is removed
      from it. If the distributing node is among the listed nodes, the
      distributing node is reset to an empty string.
      When REQUEST is given, its response is redirected to the load balancing
      tab with a message describing what was done.
      """
      distributing_node = self.getDistributingNode()
      distributing_node_reset = False
      if unused_node_list is not None:
        node_dict = self.getNodeDict()
        for node in unused_node_list:
          if node in node_dict:
            del node_dict[node]
          if node == distributing_node:
            # The compared value is the distributing node, so this is the
            # attribute to clear (the code used to set an unrelated
            # "processing_node" attribute, leaving a deleted node in charge
            # of distribution).
            self.distributingNode = ''
            distributing_node_reset = True
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing deleted."
        else:
          message = "Deleted nodes %r." % (unused_node_list, )
        if distributing_node_reset:
          message += " Disabled distributing node because it was deleted."
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_addToProcessingList')
    def manage_addToProcessingList(self, unused_node_list=None, REQUEST=None):
      """
      Change one or more idle nodes into processing nodes.

      Every node of unused_node_list gets ROLE_PROCESSING.
      When REQUEST is given, its response is redirected to the load balancing
      tab with a message describing what was done.
      """
      if unused_node_list is not None:
        for node in unused_node_list:
          self.updateNode(node, ROLE_PROCESSING)
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing done."
        else:
          message = "Nodes now processing: %r." % (unused_node_list, )
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_removeFromProcessingList')
    def manage_removeFromProcessingList(self, processing_node_list=None, REQUEST=None):
      """
      Change one or more processing nodes into idle nodes.

      When REQUEST is given, its response is redirected to the load balancing
      tab with a message describing what was done.
      """
      nothing_selected = processing_node_list is None
      if not nothing_selected:
        for node_id in processing_node_list:
          self.updateNode(node_id, ROLE_IDLE)
      if REQUEST is None:
        return
      if nothing_selected:
        message = "No used node selected, nothing done."
      else:
        message = "Nodes now unused %r." % (processing_node_list, )
      target_url = REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message='
      REQUEST.RESPONSE.redirect(target_url + urllib.quote(message))
958

959 960 961 962 963
    def process_shutdown(self, phase, time_in_phase):
        """
          Prevent shutdown from happening while an activity queue is
          processing a batch.

          On the first call with phase 3, block until is_running_lock can be
          acquired. The lock is not released afterwards, and later calls do
          nothing because has_processed_shutdown is then set.
        """
        global has_processed_shutdown
        if phase == 3 and not has_processed_shutdown:
          has_processed_shutdown = True
          LOG('CMFActivity', INFO, "Shutdown: Waiting for activities to finish.")
          is_running_lock.acquire()
          LOG('CMFActivity', INFO, "Shutdown: Activities finished.")

971
    def process_timer(self, tick, interval, prev="", next=""):
      """
      Call distribute() if we are the Distributing Node and call tic()
      with our node number.
      This method is called by TimerService in the interval given
      in zope.conf. The Default is every 5 seconds.

      The call returns immediately when another call already holds
      timerservice_lock. Exceptions raised while processing are logged and
      never propagated to the caller.
      """
      # Prevent TimerService from starting multiple threads in parallel
      if not timerservice_lock.acquire(0):
        return
      try:
        # make sure our skin is set-up. On CMF 1.5 it's setup by acquisition,
        # but on 2.2 it's by traversal, and our site probably wasn't traversed
        # by the timerserver request, which goes into the Zope Control_Panel
        # calling it a second time is a harmless and cheap no-op.
        # both setupCurrentSkin and REQUEST are acquired from containers.
        self.setupCurrentSkin(self.REQUEST)
        old_sm = getSecurityManager()
        try:
          # get owner of portal_catalog, so normally we should be able to
          # have the permission to invoke all activities
          user = self.portal_catalog.getWrappedOwner()
          newSecurityManager(self.REQUEST, user)

          current_node = self.getCurrentNode()
          self.registerNode(current_node)
          processing_node_list = self.getNodeList(role=ROLE_PROCESSING)

          # only distribute when we are the distributingNode
          if self.getDistributingNode() == current_node:
            self.distribute(len(processing_node_list))

          # SkinsTool uses a REQUEST cache to store skin objects, as
          # with TimerService we have the same REQUEST over multiple
          # portals, we clear this cache to make sure the cache doesn't
          # contains skins from another portal.
          try:
            self.getPortalObject().portal_skins.changeSkin(None)
          except AttributeError:
            pass

          # call tic for the current processing_node
          # the processing_node numbers are the indices of the elements
          # in the node list +1 because processing_node starts from 1
          if current_node in processing_node_list:
            self.tic(processing_node_list.index(current_node) + 1)
        except:
          # Catch ALL exception to avoid killing timerserver.
          LOG('ActivityTool', ERROR, 'process_timer received an exception',
              error=sys.exc_info())
        finally:
          # Always restore the security manager that was active on entry.
          setSecurityManager(old_sm)
      finally:
        timerservice_lock.release()
1024

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1025 1026 1027 1028 1029 1030
    security.declarePublic('distribute')
    def distribute(self, node_count=1):
      """
        Distribute load

        Calls distribute(activity_tool, node_count) on every activity queue
        registered in activity_dict.
      """
      # Initialize if needed
      if not is_initialized:
        self.initialize()

      # Call distribute on each queue
      inner_self = aq_inner(self)
      for activity in activity_dict.itervalues():
        activity.distribute(inner_self, node_count)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1037

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1038
    security.declarePublic('tic')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1039
    def tic(self, processing_node=1, force=0):
      """
        Starts again an activity
        processing_node starts from 1 (there is not node 0)

        Raises RuntimeError('Too many threads') when max_active_threads
        calls are already running, unless force is true.
      """
      global active_threads

      # Refuse to run if the number of threads is too high, otherwise
      # increase the number of active_threads and continue.
      tic_lock.acquire()
      try:
        if active_threads >= max_active_threads and not force:
          raise RuntimeError('Too many threads')
        active_threads += 1
      finally:
        tic_lock.release()

      # Everything after the increment is covered by the try block, so that
      # active_threads is decremented even if initialization fails.
      try:
        # Initialize if needed
        if not is_initialized:
          self.initialize()

        inner_self = aq_inner(self)

        # Loop as long as there are activities. Always process the queue with
        # "highest" priority. If several queues have same highest priority, do
        # not choose one that has just been processed.
        # This algorithm is fair enough because we only have 2 queues.
        # Otherwise, a round-robin of highest-priority queues would be required.
        # XXX: We always finish by iterating over all queues, in case that
        #      getPriority does not see messages dequeueMessage would process.
        last = None
        def sort_key(activity):
          return activity.getPriority(self), activity is last
        while is_running_lock.acquire(0):
          try:
            for last in sorted(activity_dict.values(), key=sort_key):
              # Transaction processing is the responsability of the activity
              if not last.dequeueMessage(inner_self, processing_node):
                # A false result restarts the scan from the sorted queues.
                break
            else:
              # Every queue returned a true result: stop looping.
              break
          finally:
            is_running_lock.release()
      finally:
        # decrease the number of active_threads
        tic_lock.acquire()
        try:
          active_threads -= 1
        finally:
          tic_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1089

1090
    def hasActivity(self, *args, **kw):
      # Check in each queue if the object has deferred tasks.
      # The object is the first positional argument; if no argument is
      # provided, then check on self. Keyword arguments are passed through to
      # each queue's hasActivity.
      if args:
        obj = args[0]
      else:
        obj = self
      inner_self = aq_inner(self)
      for activity in activity_dict.itervalues():
        if activity.hasActivity(inner_self, obj, **kw):
          return True
      return False
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1101

1102 1103 1104 1105 1106 1107 1108 1109
    def getActivityBuffer(self, create_if_not_found=True):
      """
        Get activity buffer for this thread for this activity tool.
        If no activity buffer is found at lowest level and create_if_not_found
        is True, create one.
        Intermediate level is unconditionally created if non existent because
        chances are it will be used in the instance life.

        Returns None when no buffer exists and create_if_not_found is false.
      """
      # XXX: using a volatile attribute to cache getPhysicalPath result.
      # This cache may need invalidation if all the following is
      # simultaneously true:
      # - ActivityTool instances can be moved in object tree
      # - moved instance is used to get access to its activity buffer
      # - another instance is put in the place of the original, and used to
      #   access its activity buffer
      # ...which seems currently unlikely, and as such is left out.
      try:
        my_instance_key = self._v_physical_path
      except AttributeError:
        # Safeguard: make sure we are wrapped in acquisition context before
        # using our path as an activity tool instance-wide identifier.
        assert getattr(self, 'aq_self', None) is not None
        self._v_physical_path = my_instance_key = self.getPhysicalPath()
      thread_activity_buffer = global_activity_buffer[my_instance_key]
      my_thread_key = get_ident()
      try:
        activity_buffer = thread_activity_buffer[my_thread_key]
      except KeyError:
        activity_buffer = None
      # A None entry may have been stored by an earlier call made with
      # create_if_not_found=False. Treat it like a missing entry, so that a
      # later call asking for creation really gets a buffer.
      if activity_buffer is None and create_if_not_found:
        activity_buffer = ActivityBuffer()
        thread_activity_buffer[my_thread_key] = activity_buffer
      return activity_buffer
1136

1137
    def activateObject(self, object, activity=DEFAULT_ACTIVITY, active_process=None, **kw):
      # Build the ActiveWrapper used to post activities on the given object.
      # "object" is either a '/'-separated path string or an object providing
      # getPhysicalPath(). "active_process" is None, a path string, or an
      # object providing getUid() and getPhysicalPath().
      if not is_initialized:
        self.initialize()
      if active_process is None:
        active_process_uid = None
      elif isinstance(active_process, str):
        # TODO: deprecate
        active_process_uid = self.unrestrictedTraverse(active_process).getUid()
      else:
        active_process_uid = active_process.getUid()
        active_process = active_process.getPhysicalPath()
      if isinstance(object, str):
        oid = None
        url = tuple(object.split('/'))
      else:
        # Note that it's too early to get the OID of a newly created object,
        # so at this point, oid may still be None.
        # Falling back to None also covers objects without any _p_oid
        # attribute, for which oid used to be left unbound.
        oid = getattr(aq_base(object), '_p_oid', None)
        url = object.getPhysicalPath()
      # An explicit serialization_tag=None is the same as not passing it.
      if kw.get('serialization_tag', False) is None:
        del kw['serialization_tag']
      return ActiveWrapper(self, url, oid, activity,
                           active_process, active_process_uid, kw,
                           getattr(self, 'REQUEST', None))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1164

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1165
    def getRegisteredMessageList(self, activity):
      # Return the messages the given activity queue has registered in the
      # current activity buffer, or an empty list when no buffer is
      # available. No buffer is created by this call.
      activity_buffer = self.getActivityBuffer(create_if_not_found=False)
      if activity_buffer is None:
        return []
      #activity_buffer._register() # This is required if flush is called outside activate
      return activity.getRegisteredMessageList(activity_buffer,
                                               aq_inner(self))
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1173

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1174
    def unregisterMessage(self, activity, message):
      # Ask the given activity queue to unregister the message from the
      # current activity buffer, and return whatever the queue returns.
      activity_buffer = self.getActivityBuffer()
      #activity_buffer._register()
      return activity.unregisterMessage(activity_buffer, aq_inner(self), message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1178

1179
    def flush(self, obj, invoke=0, **kw):
      # Call flush on every activity queue for the given object.
      # "obj" is either a physical path tuple or an object providing
      # getPhysicalPath(). "invoke" and the remaining keyword arguments are
      # passed through to each queue.
      if not is_initialized:
        self.initialize()
      self.getActivityBuffer()
      if isinstance(obj, tuple):
        object_path = obj
      else:
        object_path = obj.getPhysicalPath()
      inner_self = aq_inner(self)
      for activity in activity_dict.itervalues():
        activity.flush(inner_self, object_path, invoke=invoke, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1189 1190

    def invoke(self, message):
      # Execute one message. When self is wrapped in an acquisition chain,
      # the message is called on a copy of that chain built around a cloned
      # request restored from message.request_info; otherwise it is called
      # on self directly.
      if self.activity_tracking:
        activity_tracking_logger.info('invoking message: object_path=%s, method_id=%s, args=%r, kw=%r, activity_kw=%r, user_name=%s' % ('/'.join(message.object_path), message.method_id, message.args, message.kw, message.activity_kw, message.user_name))
      # False means "Localizer context was not replaced"; None is a valid
      # saved value meaning "there was no previous context".
      old_localizer_context = False
      if getattr(self, 'aq_chain', None) is not None:
        # Grab existing acquisition chain and extract base objects.
        base_chain = [aq_base(x) for x in self.aq_chain]
        # Grab existing request (last chain item) and create a copy.
        request_container = base_chain.pop()
        request = request_container.REQUEST
        # Generate PARENTS value. Sadly, we cannot reuse base_chain since
        # PARENTS items must be wrapped in acquisition
        parents = []
        application = self.getPhysicalRoot().aq_base
        for parent in self.aq_chain:
          if parent.aq_base is application:
            break
          parents.append(parent)
        # XXX: REQUEST.clone() requires PARENTS to be set, and it's not when
        # runing unit tests. Recreate it if it does not exist.
        # NOTE(review): this looks up an *attribute* named PARENTS on
        # request.other; if request.other is a plain mapping the test is
        # always true and PARENTS is always overwritten -- confirm whether
        # request.other.get('PARENTS') was intended.
        if getattr(request.other, 'PARENTS', None) is None:
          request.other['PARENTS'] = parents
        # XXX: PATH_INFO might not be set when runing unit tests.
        if request.environ.get('PATH_INFO') is None:
          request.environ['PATH_INFO'] = '/Control_Panel/timer_service/process_timer'

        # restore request information
        new_request = request.clone()
        request_info = message.request_info
        # PARENTS is truncated by clone
        new_request.other['PARENTS'] = parents
        if '_script' in request_info:
          new_request._script = request_info['_script']
        if 'SERVER_URL' in request_info:
          new_request.other['SERVER_URL'] = request_info['SERVER_URL']
        if 'VirtualRootPhysicalPath' in request_info:
          new_request.other['VirtualRootPhysicalPath'] = request_info['VirtualRootPhysicalPath']
        if 'HTTP_ACCEPT_LANGUAGE' in request_info:
          new_request.environ['HTTP_ACCEPT_LANGUAGE'] = request_info['HTTP_ACCEPT_LANGUAGE']
          # Replace Localizer/iHotfix Context, saving existing one
          localizer_context = LocalizerContext(new_request)
          thread_id = get_ident()
          localizer_lock.acquire()
          try:
            old_localizer_context = localizer_contexts.get(thread_id)
            localizer_contexts[thread_id] = localizer_context
          finally:
            localizer_lock.release()
          # Execute Localizer/iHotfix "patch 2"
          new_request.processInputs()

        new_request_container = request_container.__class__(REQUEST=new_request)
        # Recreate acquisition chain.
        my_self = new_request_container
        base_chain.reverse()
        for item in base_chain:
          my_self = item.__of__(my_self)
      else:
        my_self = self
        LOG('CMFActivity.ActivityTool.invoke', INFO, 'Strange: invoke is called outside of acquisition context.')
      try:
        message(my_self)
      finally:
        if my_self is not self: # We rewrapped self
          # Restore default skin selection
          skinnable = self.getPortalObject()
          skinnable.changeSkin(skinnable.getSkinNameFromRequest(request))
        if old_localizer_context is not False:
          # Restore Localizer/iHotfix context
          thread_id = get_ident()
          localizer_lock.acquire()
          try:
            if old_localizer_context is None:
              del localizer_contexts[thread_id]
            else:
              localizer_contexts[thread_id] = old_localizer_context
          finally:
            localizer_lock.release()
      if self.activity_tracking:
        activity_tracking_logger.info('invoked message')
      if my_self is not self: # We rewrapped self
        # Hand over objects held by the cloned request to the real request.
        for held in my_self.REQUEST._held:
          self.REQUEST._hold(held)
1273

1274
    def invokeGroup(self, method_id, message_list, activity, merge_duplicate):
      # Invoke a group method: expand the objects of all given messages,
      # call method_id once with the whole list, then set the execution
      # state of every message according to the outcome.
      if self.activity_tracking:
        activity_tracking_logger.info(
          'invoking group messages: method_id=%s, paths=%s'
          % (method_id, ['/'.join(m.object_path) for m in message_list]))
      # Maps each message to its list of [object, args, kw, result] entries.
      message_dict = {}
      path_set = set()
      # Filter the list of messages. If an object is not available, mark its
      # message as non-executable. In addition, expand an object if necessary,
      # and make sure that no duplication happens.
      for m in message_list:
        # alternate method is used to segregate objects which cannot be grouped.
        alternate_method_id = m.activity_kw.get('alternate_method_id')
        try:
          object_list = m.getObjectList(self)
          if object_list is None:
            continue
          message_dict[m] = expanded_object_list = []
          for subobj in object_list:
            if merge_duplicate:
              path = subobj.getPath()
              if path in path_set:
                continue
              path_set.add(path)
            if alternate_method_id is not None \
               and hasattr(aq_base(subobj), alternate_method_id):
              # if this object is alternated,
              # generate a new single active object
              activity_kw = m.activity_kw.copy()
              activity_kw.pop('group_method_id', None)
              activity_kw.pop('group_id', None)
              active_obj = subobj.activate(activity=activity, **activity_kw)
              getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
            else:
              expanded_object_list.append([subobj, m.args, m.kw, None])
        except:
          # NOTE(review): if the failure happens after message_dict[m] was
          # assigned, the message stays in message_dict with a partial list
          # and its state may be overwritten below -- confirm this is wanted.
          m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)

      # Flatten all per-message lists. The inner entries are shared with
      # message_dict, so updates made by the group method are visible there.
      expanded_object_list = []
      for message_object_list in message_dict.itervalues():
        expanded_object_list.extend(message_object_list)
      try:
        if len(expanded_object_list) > 0:
          traverse = self.getPortalObject().unrestrictedTraverse
          # FIXME: how to apply security here?
          # NOTE: expanded_object_list[*][3] must be updated by the callee:
          #       it must be deleted in case of failure, or updated with the
          #       result to post on the active process otherwise.
          traverse(method_id)(expanded_object_list)
      except:
        # In this case, the group method completely failed.
        exc_info = sys.exc_info()
        for m in message_dict:
          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
        LOG('WARNING ActivityTool', 0,
            'Could not call method %s on objects %s' %
            (method_id, [x[0] for x in expanded_object_list]), error=exc_info)
        error_log = getattr(self, 'error_log', None)
        if error_log is not None:
          error_log.raising(exc_info)
      else:
        # Note there can be partial failures.
        for m, expanded_object_list in message_dict.iteritems():
          result_list = []
          for result in expanded_object_list:
            if len(result) != 4:
              break # message marked as failed by the group_method_id
            elif result[3] is not None:
              result_list.append(result)
          else:
            # No entry of this message was marked as failed.
            try:
              if result_list and m.active_process:
                active_process = traverse(m.active_process)
                for result in result_list:
                  m.activateResult(active_process, result[3], result[0])
            except:
              # Not silent: falls through to MESSAGE_NOT_EXECUTED below.
              pass
            else:
              m.setExecutionState(MESSAGE_EXECUTED, context=self)
              continue
          m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
      if self.activity_tracking:
        activity_tracking_logger.info('invoked group messages')
1356

1357 1358 1359 1360 1361
    security.declarePrivate('dummyGroupMethod')
    # Fallback "group method": it calls the requested method on every
    # message, one at a time.
    # NOTE: documented with comments only. No docstring is added, so the
    #       way Zope treats this object when publishing is left untouched.
    class dummyGroupMethod(object):
      def __bobo_traverse__(self, REQUEST, method_id):
        # Traversal hook: the traversed name is taken as the id of the
        # method to call, and a callable taking a list of messages is
        # returned.
        def group_method(message_list):
          # Every entry is indexed as [object, args, kw, result]; the result
          # slot (index 3) is overwritten in place with the return value.
          for entry in message_list:
            bound_method = getattr(entry[0], method_id)
            entry[3] = bound_method(*entry[1], **entry[2])
        return group_method
    # The class is only needed once: rebind the name to a single instance.
    dummyGroupMethod = dummyGroupMethod()

1366 1367
    def newMessage(self, activity, path, active_process,
                   activity_kw, method_id, *args, **kw):
      # Wrap the call described by the parameters in a Message and queue it
      # on the activity registered under the name `activity`.
      # NOTE: documented with comments only. No docstring is added, so the
      #       way Zope treats this method when publishing is left untouched.
      # XXX Some security checking should be made here.
      if not is_initialized:
        self.initialize()
      # The result is discarded: presumably called to make sure the activity
      # buffer exists before queuing (TODO confirm).
      self.getActivityBuffer()
      queue_message = activity_dict[activity].queueMessage
      unwrapped_tool = aq_inner(self)
      message = Message(path, active_process, activity_kw, method_id, args, kw,
        portal_activities=self)
      queue_message(unwrapped_tool, message)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1375

1376
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageInvoke' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1377 1378 1379 1380 1381 1382
    def manageInvoke(self, object_path, method_id, REQUEST=None):
      """
        Invokes all methods for object "object_path"

        object_path -- either a sequence of path elements, or a string of
                       '/'-separated path elements (converted to a tuple)
        method_id -- forwarded to flush() as its method_id parameter
        REQUEST -- when given, the browser is redirected to the
                   'manageActivities' page of this tool and the result of
                   the redirect call is returned
      """
      # isinstance rather than an exact type comparison, so that instances
      # of str subclasses are split into path elements as well.
      if isinstance(object_path, str):
        object_path = tuple(object_path.split('/'))
      self.flush(object_path, method_id=method_id, invoke=1)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1387

1388
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageRestart')
1389 1390 1391 1392
    def manageRestart(self, message_uid_list, activity, REQUEST=None):
      """
        Restart one or several messages

        message_uid_list -- a list of message uids; any value which is not
                            a list is treated as a single uid
        activity -- name of the activity whose SQL table holds the messages
        REQUEST -- when given, the browser is redirected to the 'view' page
                   of this tool and the result of the redirect is returned
      """
      uid_list = message_uid_list
      if not isinstance(uid_list, list):
        uid_list = [uid_list]
      make_available = self.SQLBase_makeMessageListAvailable
      make_available(table=activity_dict[activity].sql_table, uid=uid_list)
      if REQUEST is None:
        return
      target_url = '%s/%s' % (self.absolute_url(), 'view')
      return REQUEST.RESPONSE.redirect(target_url)

1401
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageCancel' )
1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412
    def manageCancel(self, object_path, method_id, REQUEST=None):
      """
        Cancel all methods for object "object_path"

        Deprecated: a warning pointing to "manageDelete" is logged on every
        call.

        object_path -- either a sequence of path elements, or a string of
                       '/'-separated path elements (converted to a tuple)
        method_id -- forwarded to flush() as its method_id parameter
        REQUEST -- when given, the browser is redirected to the
                   'manageActivities' page of this tool and the result of
                   the redirect call is returned
      """
      LOG('ActivityTool', WARNING,
          '"manageCancel" method is deprecated, use "manageDelete" instead.')
      # isinstance rather than an exact type comparison, so that instances
      # of str subclasses are split into path elements as well.
      if isinstance(object_path, str):
        object_path = tuple(object_path.split('/'))
      # invoke=0: flush the messages without invoking them.
      self.flush(object_path, method_id=method_id, invoke=0)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' % (
          self.absolute_url(), 'manageActivities'))
1414 1415 1416 1417 1418 1419

    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageDelete' )
    def manageDelete(self, message_uid_list, activity, REQUEST=None):
      """
        Delete one or several messages

        message_uid_list -- a list of message uids; any value which is not
                            a list is treated as a single uid
        activity -- name of the activity whose SQL table holds the messages
        REQUEST -- when given, the browser is redirected to the 'view' page
                   of this tool and the result of the redirect is returned
      """
      uid_list = message_uid_list
      if not isinstance(uid_list, list):
        uid_list = [uid_list]
      delete_message = self.SQLBase_delMessage
      delete_message(table=activity_dict[activity].sql_table, uid=uid_list)
      if REQUEST is None:
        return
      target_url = '%s/%s' % (self.absolute_url(), 'view')
      return REQUEST.RESPONSE.redirect(target_url)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1427

1428 1429
    security.declareProtected( CMFCorePermissions.ManagePortal,
                               'manageClearActivities' )
1430
    def manageClearActivities(self, keep=1, REQUEST=None):
      """
        Clear all activities and recreate tables.

        keep -- when true, pending messages are dumped before the tables
                are dropped and reactivated once they are recreated;
                when false nothing is dumped
        REQUEST -- when given, the browser is redirected to the
                   'manageActivitiesAdvanced' page with a status message
                   and the result of the redirect call is returned

        Failures to dump, drop or reactivate are logged and otherwise
        ignored, except ConflictError which is always propagated.
      """
      skin_folder = self.getPortalObject().portal_skins.activity

      # Dump the pending messages, keyed by the class name of their activity.
      dumped_message_dict = {}
      if keep:
        for queue in activity_dict.itervalues():
          if not hasattr(queue, 'dumpMessageList'):
            continue
          try:
            dumped_message_dict[queue.__class__.__name__] = \
                                  queue.dumpMessageList(self)
          except ConflictError:
            raise
          except:
            LOG('ActivityTool', WARNING,
                'could not dump messages from %s' %
                (queue,), error=sys.exc_info())

      # Drop then recreate each table. A table is only handled when the
      # skin folder provides its "create" method; a failing drop does not
      # prevent the creation attempt.
      table_method_list = (
        ('SQLDict_createMessageTable', 'SQLDict_dropMessageTable',
         'could not drop the message table'),
        ('SQLQueue_createMessageTable', 'SQLQueue_dropMessageTable',
         'could not drop the message queue table'),
      )
      for create_id, drop_id, drop_failure_message in table_method_list:
        if getattr(skin_folder, create_id, None) is None:
          continue
        try:
          getattr(skin_folder, drop_id)()
        except ConflictError:
          raise
        except:
          LOG('CMFActivity', WARNING,
              drop_failure_message,
              error=sys.exc_info())
        getattr(skin_folder, create_id)()

      # Reactivate the messages.
      for activity_name, dumped_list in dumped_message_dict.iteritems():
        for dumped_message in dumped_list:
          try:
            dumped_message.reactivate(aq_inner(self), activity=activity_name)
          except ConflictError:
            raise
          except:
            LOG('ActivityTool', WARNING,
                'could not reactivate the message %r, %r' %
                (dumped_message.object_path, dumped_message.method_id),
                error=sys.exc_info())

      if REQUEST is None:
        return
      if keep:
        message = 'Tables%20Recreated'
      else:
        message = 'Activities%20Cleared'
      return REQUEST.RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(), message))
1492

1493 1494 1495 1496 1497 1498 1499 1500 1501
    security.declarePublic('getMessageTempObjectList')
    def getMessageTempObjectList(self, **kw):
      """
        Get object list of messages waiting in queues

        kw is passed as is to getMessageList. For every message returned, a
        temporary object is created with newContent(temp_object=1) and the
        instance attributes of the message are copied onto it.
      """
      def as_temp_object(queued_message):
        temp_object = self.newContent(temp_object=1)
        temp_object.__dict__.update(**queued_message.__dict__)
        return temp_object
      return [as_temp_object(queued_message)
              for queued_message in self.getMessageList(**kw)]

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1506
    security.declarePublic('getMessageList')
1507
    def getMessageList(self, activity=None, **kw):
      """
        List messages waiting in queues

        activity -- when given, only the activity registered under this
                    name is queried and its result is returned as is;
                    otherwise the results of all activities are gathered
                    in a single list
        kw -- passed as is to getMessageList of the activities
      """
      # Initialize if needed
      if not is_initialized:
        self.initialize()
      if activity:
        return activity_dict[activity].getMessageList(aq_inner(self), **kw)

      gathered_list = []
      for queue in activity_dict.itervalues():
        try:
          gathered_list.extend(queue.getMessageList(aq_inner(self), **kw))
        except AttributeError:
          # Log and carry on with the remaining activities.
          LOG('getMessageList, could not get message from Activity:',0,queue)
      return gathered_list

1525 1526 1527 1528 1529 1530
    security.declarePublic('countMessageWithTag')
    def countMessageWithTag(self, value):
      """
        Return the number of messages which match the given tag.

        The result is the sum of countMessageWithTag over all activities.
      """
      return sum(
        queue.countMessageWithTag(aq_inner(self), value)
        for queue in activity_dict.itervalues())

    security.declarePublic('countMessage')
    def countMessage(self, **kw):
      """
        Return the number of messages which match the given parameter.

        Parameters allowed:

        method_id : the id of the method
        path : for activities on a particular object
        tag : activities with a particular tag
        message_uid : activities with a particular uid

        The parameters are passed as is to countMessage of every activity
        and the results are summed.
      """
      return sum(
        queue.countMessage(aq_inner(self), **kw)
        for queue in activity_dict.itervalues())

1552
    security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
1553
    def newActiveProcess(self, REQUEST=None, portal_type='Active Process', **kw):
      # Create a new document of the given portal type in this tool and
      # return it; extra keyword arguments are passed to newContent. When a
      # REQUEST is given, a redirect to 'manage_main' is also issued.
      # NOTE: documented with comments only. No docstring is added, so the
      #       way Zope treats this method when publishing is left untouched.
      # note: if one wants to create an Active Process without ERP5 products,
      # she can call ActiveProcess.addActiveProcess
      active_process = self.newContent(portal_type=portal_type, **kw)
      if REQUEST is None:
        return active_process
      REQUEST['RESPONSE'].redirect( 'manage_main' )
      return active_process
1560

1561
    # Active synchronisation methods
1562
    security.declarePrivate('validateOrder')
1563
    def validateOrder(self, message, validator_id, validation_value):
      # True when getDependentMessageList finds at least one message for
      # the given validator, False otherwise.
      return len(self.getDependentMessageList(
        message, validator_id, validation_value)) > 0

    security.declarePrivate('getDependentMessageList')
    def getDependentMessageList(self, message, validator_id, validation_value):
      # Ask every activity providing a "_validate_<validator_id>" method for
      # the messages matching `validation_value`, and return them all as a
      # list of (activity, message) pairs. Activities lacking that method
      # are skipped.
      if not is_initialized:
        self.initialize()
      dependent_list = []
      validator_name = "_validate_" + validator_id
      for queue in activity_dict.itervalues():
        validator = getattr(queue, validator_name, None)
        if validator is None:
          continue
        found = validator(aq_inner(self), message, validation_value)
        if found:
          dependent_list.extend((queue, m) for m in found)
      return dependent_list
1580

Yoshinori Okuji's avatar
Yoshinori Okuji committed
1581 1582
    # Required for tests (time shift)
    def timeShift(self, delay):
      # Forward the time shift of `delay` to every registered activity.
      # NOTE: documented with comments only. No docstring is added, so the
      #       way Zope treats this method when publishing is left untouched.
      if not is_initialized:
        self.initialize()
      unwrapped_tool = aq_inner(self)
      for queue in activity_dict.itervalues():
        queue.timeShift(unwrapped_tool, delay)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1587

1588
# NOTE(review): presumably Zope's InitializeClass, which applies the security
# declarations made in the class body - confirm against the file's imports.
InitializeClass(ActivityTool)