##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29 30 31 32
import socket
import urllib
import threading
import sys
Vincent Pelletier's avatar
Vincent Pelletier committed
33
from types import StringType
34
from collections import defaultdict
35
from cPickle import dumps, loads
36
from Products.CMFCore import permissions as CMFCorePermissions
Jean-Paul Smets's avatar
Jean-Paul Smets committed
37
from Products.ERP5Type.Core.Folder import Folder
38
from Products.CMFActivity.ActiveResult import ActiveResult
39
from Products.CMFActivity.ActiveObject import DEFAULT_ACTIVITY
40
from Products.CMFActivity.ActivityConnection import ActivityConnection
41
from Products.PythonScripts.Utility import allow_class
42
from AccessControl import ClassSecurityInfo, Permissions
Jérome Perrin's avatar
Jérome Perrin committed
43 44 45 46
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
from AccessControl.SecurityManagement import getSecurityManager
47
from AccessControl.User import system as system_user
48
from Products.CMFCore.utils import UniqueObject, _getAuthenticatedUser
49
from Products.ERP5Type.Globals import InitializeClass, DTMLFile
50
from Acquisition import aq_base, aq_inner, aq_parent
51
from ActivityBuffer import ActivityBuffer
52
from ActivityRuntimeEnvironment import BaseMessage
53
from zExceptions import ExceptionFormatter
54
from BTrees.OIBTree import OIBTree
55 56
from Zope2 import app
from Products.ERP5Type.UnrestrictedMethod import PrivilegedUser
57
from zope.site.hooks import setSite
58
import transaction
59
from App.config import getConfiguration
60

61 62 63 64
import Products.Localizer.patches
localizer_lock = Products.Localizer.patches._requests_lock
localizer_contexts = Products.Localizer.patches._requests
LocalizerContext = lambda request: request
65

66

67
from ZODB.POSException import ConflictError
68
from Products.MailHost.MailHost import MailHostError
Jean-Paul Smets's avatar
Jean-Paul Smets committed
69

70
from zLOG import LOG, INFO, WARNING, ERROR
71
import warnings
72
from time import time
73 74

try:
  from Products.TimerService import getTimerService
except ImportError:
  def getTimerService(self):
    """Fallback used when Products.TimerService cannot be imported.

    Always returns None, which callers in this module treat as
    "TimerService not available".
    """
    return None
Jean-Paul Smets's avatar
Jean-Paul Smets committed
79

80
from traceback import format_list, extract_stack
81

Jean-Paul Smets's avatar
Jean-Paul Smets committed
82 83 84 85
# Using a RAM property (not a property of an instance) allows
# to prevent from storing a state in the ZODB (and allows to restart...)
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)
tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
# Released by cancelProcessShutdown(); presumably held once shutdown has been
# processed -- the acquiring side is not part of this excerpt.
is_running_lock = threading.Lock()
# Process-wide values, unset at import time.
currentNode = None
_server_address = None
# Node role identifiers.
ROLE_IDLE = 0
ROLE_PROCESSING = 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
93

94 95 96 97 98
# Logging channel definitions
import logging
# Main logging channel
activity_logger = logging.getLogger('CMFActivity')
# Some logging subchannels
# NOTE(review): 'Tracking' is not a child of 'CMFActivity' in the logging
# hierarchy, so the file handler installed below does not apply to it while
# 'CMFActivity.TimingLog' inherits it -- confirm this is intended.
activity_tracking_logger = logging.getLogger('Tracking')
activity_timing_logger = logging.getLogger('CMFActivity.TimingLog')

# Direct logging to "[instancehome]/log/CMFActivity.log", if this directory exists.
# Otherwise, it will end up in root logging facility (ie, event.log).
from App.config import getConfiguration
import os
instancehome = getConfiguration().instancehome
if instancehome is not None:
  log_directory = os.path.join(instancehome, 'log')
  if os.path.isdir(log_directory):
    from Signals import Signals
    from ZConfig.components.logger.loghandler import FileHandler
    log_file_handler = FileHandler(os.path.join(log_directory, 'CMFActivity.log'))
    # Default zope log format string borrowed from
    # ZConfig/components/logger/factory.xml, but without the extra "------"
    # line separating entries.
    log_file_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s", "%Y-%m-%dT%H:%M:%S"))
    # Hand the handler over to Zope's signal handling.
    Signals.registerZopeSignals([log_file_handler])
    activity_logger.addHandler(log_file_handler)
    # Keep CMFActivity records out of the root logger once they have their
    # own file.
    activity_logger.propagate = 0

121 122 123 124 125 126 127 128
def activity_timing_method(method, args, kw):
  """Call method(*args, **kw) and log how long the call took.

  The elapsed wall-clock time is written to the timing logger whether the
  call returns or raises; the return value (or exception) of the call is
  passed through unchanged.
  """
  started_at = time()
  try:
    return method(*args, **kw)
  finally:
    finished_at = time()
    activity_timing_logger.info(
      '%.02fs: %r(*%r, **%r)' % (finished_at - started_at, method, args, kw))

129 130 131
# Here go ActivityBuffer instances
# Structure:
#  global_activity_buffer[activity_tool_path][thread_id] = ActivityBuffer
global_activity_buffer = defaultdict(dict)
from thread import get_ident

# Message execution states (see Message.setExecutionState).
MESSAGE_NOT_EXECUTED = 0
MESSAGE_EXECUTED = 1
MESSAGE_NOT_EXECUTABLE = 2

139

140 141 142 143
class SkippedMessage(Exception):
  """Exception type that Message.setExecutionState() records without logging
  it or storing a traceback."""


144
class Message(BaseMessage):
  """Activity Message Class.

  Message instances are stored in an activity queue, inside the Activity Tool.

  A message captures a deferred method call: the path (and OID) of the target
  object, the method id with its arguments, the activity parameters, the
  name of the user who created the message and a few REQUEST values.
  """

  # Class-level defaults, overridden per instance when needed.
  active_process = None
  active_process_uid = None
  call_traceback = None
  exc_info = None
  is_executed = MESSAGE_NOT_EXECUTED
  processing = None
  traceback = None
  oid = None
  is_registered = False

  def __init__(
      self,
      url,
      oid,
      active_process,
      active_process_uid,
      activity_kw,
      method_id,
      args, kw,
      request=None,
      portal_activities=None,
    ):
    """Record everything needed to replay a method call later.

    url -- path of the target object; stored as object_path and walked item
           by item from the physical root (first item skipped).
    oid -- expected _p_oid of the target, or a false value to skip the check.
    active_process, active_process_uid -- where results get posted, if any.
    activity_kw -- activity parameters (group_method_id, expand_method_id...).
    method_id, args, kw -- the call to replay.
    request -- optional REQUEST from which a few values are copied.
    portal_activities -- optional Activity Tool; when its
           activity_creation_trace attribute is true, the creation stack is
           saved in call_traceback.
    """
    self.object_path = url
    self.oid = oid
    self.active_process = active_process
    self.active_process_uid = active_process_uid
    self.activity_kw = activity_kw
    self.method_id = method_id
    self.args = args
    self.kw = kw
    if getattr(portal_activities, 'activity_creation_trace', False):
      # Save current traceback, to make it possible to tell where a message
      # was generated.
      # Strip last stack entry, since it will always be the same.
      self.call_traceback = ''.join(format_list(extract_stack()[:-1]))
    self.user_name = str(_getAuthenticatedUser(self))
    # Store REQUEST Info
    self.request_info = {}
    if request is not None:
      if 'SERVER_URL' in request.other:
        self.request_info['SERVER_URL'] = request.other['SERVER_URL']
      if 'VirtualRootPhysicalPath' in request.other:
        self.request_info['VirtualRootPhysicalPath'] = \
          request.other['VirtualRootPhysicalPath']
      if 'HTTP_ACCEPT_LANGUAGE' in request.environ:
        self.request_info['HTTP_ACCEPT_LANGUAGE'] = \
          request.environ['HTTP_ACCEPT_LANGUAGE']
      self.request_info['_script'] = list(request._script)

  @staticmethod
  def load(s, **kw):
    """Unpickle a message from string s and set the given extra attributes.

    SECURITY: this unpickles its input; s must only ever come from a trusted
    source (pickle can execute arbitrary code while loading).
    """
    message = loads(s)
    message.__dict__.update(kw)
    return message

  # Serialisation counterpart of load(): the pickle "dumps" function itself.
  dump = dumps

  def getGroupId(self):
    """Return the grouping key: group method id, a NUL byte, then group id.

    A group_method_id explicitly set to None is replaced by a dummy group
    method path derived from method_id; a missing one counts as ''.
    """
    get = self.activity_kw.get
    group_method_id = get('group_method_id', '')
    if group_method_id is None:
      group_method_id = 'portal_activities/dummyGroupMethod/' + self.method_id
    return group_method_id + '\0' + get('group_id', '')

  def _getObject(self, activity_tool):
    """Walk object_path from the physical root; KeyError if a step is missing."""
    obj = activity_tool.getPhysicalRoot()
    # First path item is skipped: traversal starts at the root itself.
    for object_id in self.object_path[1:]:
      obj = obj[object_id]
    return obj

  def getObject(self, activity_tool):
    """Return the object referenced in this message.

    When no object exists at object_path, a warning is logged, the message is
    flagged MESSAGE_NOT_EXECUTABLE and None is returned.
    Raises ValueError when the object found does not have the recorded OID
    (paths containing 'portal_workflow' are exempted).
    """
    try:
      obj = self._getObject(activity_tool)
    except KeyError:
      LOG('CMFActivity', WARNING, "Message dropped (no object found at path %r)"
          % (self.object_path,), error=sys.exc_info())
      self.setExecutionState(MESSAGE_NOT_EXECUTABLE)
    else:
      if (self.oid and self.oid != getattr(aq_base(obj), '_p_oid', None) and
          # XXX: BusinessTemplate must be fixed to preserve OID
          'portal_workflow' not in self.object_path):
        raise ValueError("OID mismatch for %r" % obj)
      return obj

  def getObjectList(self, activity_tool):
    """Return the list of objects that can be expanded from this message.

    An expand method is used to expand the list of objects and to turn a
    big recursive transaction affecting many objects into multiple
    transactions affecting only one object at a time (this can prevent
    duplicated method calls).

    Returns an empty tuple when the target object cannot be found, and a
    one-item tuple holding the object when there is no expand method.
    """
    obj = self.getObject(activity_tool)
    if obj is None:
      return ()
    if 'expand_method_id' in self.activity_kw:
      return getattr(obj, self.activity_kw['expand_method_id'])()
    return obj,

  def getObjectCount(self, activity_tool):
    """Return how many objects this message expands to (1 by default)."""
    if 'expand_method_id' in self.activity_kw:
      try:
        obj = self._getObject(activity_tool)
        return len(getattr(obj, self.activity_kw['expand_method_id'])())
      except StandardError:
        # Deliberate best effort: when the count cannot be computed, fall
        # back to 1 below.
        pass
    return 1

  def changeUser(self, user_name, activity_tool):
    """Restore the security context for the calling user.

    Returns the user object, or None when the user cannot be found (in which
    case the security manager is cleared and a warning is logged).
    """
    portal = activity_tool.getPortalObject()
    portal_uf = portal.acl_users
    uf = portal_uf
    user = uf.getUserById(user_name)
    # if the user is not found, try to get it from a parent acl_users
    # XXX this is still far from perfect, because we need to store all
    # information about the user (like original user folder, roles) to
    # replay the activity with exactly the same security context as if
    # it had been executed without activity.
    if user is None:
      uf = portal.aq_parent.acl_users
      user = uf.getUserById(user_name)
    if user is None and user_name == system_user.getUserName():
      # The following logic partly comes from unrestricted_apply()
      # implementation in ERP5Type.UnrestrictedMethod but we get roles
      # from the portal to have more roles.
      uf = portal_uf
      role_list = uf.valid_roles()
      user = PrivilegedUser(user_name, None, role_list, ()).__of__(uf)
    if user is not None:
      user = user.__of__(uf)
      newSecurityManager(None, user)
      transaction.get().setUser(user_name, '/'.join(uf.getPhysicalPath()))
    else:
      LOG("CMFActivity", WARNING,
          "Unable to find user %r in the portal" % user_name)
      noSecurityManager()
    return user

  def activateResult(self, active_process, result, object):
    """Post result to active_process, wrapping it in an ActiveResult if needed."""
    if not isinstance(result, ActiveResult):
      result = ActiveResult(result=result)
    # XXX Allow other method_id in future
    result.edit(object_path=object, method_id=self.method_id)
    active_process.postResult(result)

  def __call__(self, activity_tool):
    """Execute the recorded call and record the resulting execution state.

    Never raises: any failure ends up as MESSAGE_NOT_EXECUTED (with the
    exception details stored by setExecutionState).
    """
    try:
      obj = self.getObject(activity_tool)
      if obj is not None:
        old_security_manager = getSecurityManager()
        try:
          # Change user if required (TO BE DONE)
          # We will change the user only in order to execute this method
          self.changeUser(self.user_name, activity_tool)
          # XXX: There is no check to see if user is allowed to access
          #      that method !
          method = getattr(obj, self.method_id)
          transaction.get().note(
            'CMFActivity ' + '/'.join(self.object_path) + '/' + self.method_id
          )
          # Store site info
          setSite(activity_tool.getParentValue())
          if activity_tool.activity_timing_log:
            result = activity_timing_method(method, self.args, self.kw)
          else:
            result = method(*self.args, **self.kw)
        finally:
          setSecurityManager(old_security_manager)

        if method is not None:
          if self.active_process and result is not None:
            self.activateResult(
              activity_tool.unrestrictedTraverse(self.active_process),
              result, obj)
          self.setExecutionState(MESSAGE_EXECUTED)
    except:
      # Deliberately bare: whatever goes wrong must be recorded on the
      # message instead of propagating to the caller.
      self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)

  def validate(self, activity, activity_tool, check_order_validation=1):
    """Delegate validation to the activity, passing the activity parameters."""
    return activity.validate(activity_tool, self,
                             check_order_validation=check_order_validation,
                             **self.activity_kw)

  def notifyUser(self, activity_tool, retry=False):
    """Notify the user that the activity failed.

    A mail is sent through the portal MailHost to the portal's
    email_to_address (falling back to email_from_address). Failures to send
    are logged as warnings and otherwise ignored.
    """
    portal = activity_tool.getPortalObject()
    user_email = portal.getProperty('email_to_address',
                       portal.getProperty('email_from_address'))
    email_from_name = portal.getProperty('email_from_name',
                       portal.getProperty('email_from_address'))
    fail_count = self.line.retry + 1
    if retry:
      message = "Pending activity already failed %s times" % fail_count
    else:
      message = "Activity failed"
    path = '/'.join(self.object_path)
    mail_text = """From: %s <%s>
To: %s
Subject: %s: %s/%s

Node: %s
Failures: %s
User name: %r
Uid: %u
Document: %s
Method: %s
Arguments: %r
Named Parameters: %r
""" % (email_from_name, activity_tool.email_from_address, user_email, message,
       path, self.method_id, activity_tool.getCurrentNode(), fail_count,
       self.user_name, self.line.uid, path, self.method_id, self.args, self.kw)
    if self.traceback:
      mail_text += '\nException:\n' + self.traceback
    if self.call_traceback:
      mail_text += '\nCreated at:\n' + self.call_traceback
    try:
      portal.MailHost.send(mail_text)
    except (socket.error, MailHostError) as message:
      LOG('ActivityTool.notifyUser', WARNING,
          'Mail containing failure information failed to be sent: %s' % message)

  def reactivate(self, activity_tool, activity=DEFAULT_ACTIVITY):
    """Queue the same call again on the original object, as the original user."""
    # Reactivate the original object.
    obj = self._getObject(activity_tool)
    old_security_manager = getSecurityManager()
    try:
      # Change user if required (TO BE DONE)
      # We will change the user only in order to execute this method
      self.changeUser(self.user_name, activity_tool)
      active_obj = obj.activate(activity=activity, **self.activity_kw)
      getattr(active_obj, self.method_id)(*self.args, **self.kw)
    finally:
      # Use again the previous user
      setSecurityManager(old_security_manager)

  def setExecutionState(self, is_executed, exc_info=None, log=True, context=None):
    """
      Set message execution state.

      is_executed can be one of MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED and
      MESSAGE_NOT_EXECUTABLE (variables defined above).

      exc_info must be - if given - similar to sys.exc_info() return value.

      log must be - if given - True or False. If True, a log line will be
      emitted with failure details. This parameter should only be used when
      invoking this method on a list of messages to avoid log flood. It is
      caller's responsibility to output a log line summing up all errors, and
      to store error in Zope's error_log.

      context must be - if given - an object wrapped in acquisition context.
      It is used to access Zope's error_log object. It is not used if log is
      False.

      If given state is MESSAGE_NOT_EXECUTED, exception details are recorded
      too: exc_info is used if given, otherwise it is extracted using
      sys.exc_info(). If final exc_info does not contain any exception,
      current stack trace will be stored instead: it will hopefully help
      understand why message is in an error state.
      When the exception type is SkippedMessage, nothing is logged and no
      traceback is stored.
    """
    assert is_executed in (MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, MESSAGE_NOT_EXECUTABLE)
    self.is_executed = is_executed
    if is_executed == MESSAGE_NOT_EXECUTED:
      if not exc_info:
        exc_info = sys.exc_info()
      if self.on_error_callback is not None:
        self.exc_info = exc_info
      self.exc_type = exc_info[0]
      if exc_info[0] is None:
        # Raise a dummy exception, ignore it, fetch it and use it as if it
        # was the error causing message non-execution. This will help
        # identifying the cause of this misbehaviour.
        try:
          raise Exception('Message execution failed, but there is no exception to explain it. This is a dummy exception so that one can track down why we end up here outside of an exception handling code path.')
        except Exception:
          exc_info = sys.exc_info()
      elif exc_info[0] is SkippedMessage:
        return
      if log:
        LOG('ActivityTool', WARNING, 'Could not call method %s on object %s. Activity created at:\n%s' % (self.method_id, self.object_path, self.call_traceback), error=exc_info)
        # push the error in ZODB error_log
        error_log = getattr(context, 'error_log', None)
        if error_log is not None:
          error_log.raising(exc_info)
      self.traceback = ''.join(ExceptionFormatter.format_exception(*exc_info)[1:])

  def getExecutionState(self):
    """Return the state last stored by setExecutionState()."""
    return self.is_executed

437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455
class GroupedMessage(object):
  """Pairs a target object with its message, for use by grouping methods.

  `args` and `kw` are read-only views on the wrapped message. `result` and
  `exc_info` are slots without default value: they only exist once set.
  """
  __slots__ = 'object', '_message', 'result', 'exc_info'

  def __init__(self, object, message):
    self.object = object
    self._message = message

  args = property(lambda self: self._message.args)
  kw = property(lambda self: self._message.kw)

  def raised(self, exc_info=None):
    """Record a failure: store exc_info (default: the exception currently
    being handled) and discard any previously stored result."""
    self.exc_info = exc_info or sys.exc_info()
    try:
      del self.result
    except AttributeError:
      # No result had been set: nothing to discard.
      pass

  # XXX: Allowing restricted code to implement a grouping method is questionable
  #      but there already exist some.
  __parent__ = property(lambda self: self.object) # for object
  _guarded_writes = 1 # for result
458
allow_class(GroupedMessage)
459 460 461 462 463 464 465 466

# Activity Registration
def activity_dict():
  # Build the activity registry: one instance of each activity class, keyed
  # by its name. Each class lives in a module of the same name.
  from Activity import SQLDict, SQLQueue
  registry = {}
  for name, module in (('SQLDict', SQLDict), ('SQLQueue', SQLQueue)):
    registry[name] = getattr(module, name)()
  return registry
# The name is rebound to the registry itself; the builder is not kept.
activity_dict = activity_dict()


Vincent Pelletier's avatar
Vincent Pelletier committed
467 468
class Method(object):
  """Callable standing for one method of a passive object.

  Calling it does not run the method: it builds a Message describing the
  call and hands it to the Activity Tool's buffer for deferred queueing.
  """
  __slots__ = (
    '_portal_activities',
    '_passive_url',
    '_passive_oid',
    '_activity',
    '_active_process',
    '_active_process_uid',
    '_kw',
    '_method_id',
    '_request',
  )

  def __init__(self, portal_activities, passive_url, passive_oid, activity,
      active_process, active_process_uid, kw, method_id, request):
    self._portal_activities = portal_activities
    self._passive_url = passive_url
    self._passive_oid = passive_oid
    self._activity = activity
    self._active_process = active_process
    self._active_process_uid = active_process_uid
    self._kw = kw
    self._method_id = method_id
    self._request = request

  def __call__(self, *args, **kw):
    """Queue a message for this method call with the given arguments."""
    portal_activities = self._portal_activities
    m = Message(
      url=self._passive_url,
      oid=self._passive_oid,
      active_process=self._active_process,
      active_process_uid=self._active_process_uid,
      activity_kw=self._kw,
      method_id=self._method_id,
      args=args,
      kw=kw,
      request=self._request,
      portal_activities=portal_activities,
    )
    if portal_activities.activity_tracking:
      activity_tracking_logger.info('queuing message: activity=%s, object_path=%s, method_id=%s, args=%s, kw=%s, activity_kw=%s, user_name=%s' % (self._activity, '/'.join(m.object_path), m.method_id, m.args, m.kw, m.activity_kw, m.user_name))
    portal_activities.getActivityBuffer().deferredQueueMessage(
      portal_activities, activity_dict[self._activity], m)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
510

511 512
allow_class(Method)

Vincent Pelletier's avatar
Vincent Pelletier committed
513 514 515
class ActiveWrapper(object):
  """Proxy on which any attribute access yields a Method.

  The Method returned for attribute `name` carries the parameters given at
  construction time, with `name` as the method id.
  """
  __slots__ = (
    '__portal_activities',
    '__passive_url',
    '__passive_oid',
    '__activity',
    '__active_process',
    '__active_process_uid',
    '__kw',
    '__request',
  )
  # Shortcut security lookup (avoid calling __getattr__)
  __parent__ = None

  def __init__(self, portal_activities, url, oid, activity, active_process,
      active_process_uid, kw, request):
    # second parameter can be an object or an object's path
    self.__portal_activities = portal_activities
    self.__passive_url = url
    self.__passive_oid = oid
    self.__activity = activity
    self.__active_process = active_process
    self.__active_process_uid = active_process_uid
    self.__kw = kw
    self.__request = request

  def __getattr__(self, name):
    # Only reached for names not found the normal way: every such name is
    # treated as a method of the passive object.
    return Method(
      self.__portal_activities,
      self.__passive_url,
      self.__passive_oid,
      self.__activity,
      self.__active_process,
      self.__active_process_uid,
      self.__kw,
      name,
      self.__request,
    )

  def __repr__(self):
    return '<%s at 0x%x to %s>' % (self.__class__.__name__, id(self),
                                   self.__passive_url)
555

556 557 558
# True when activities cannot be executing any more.
has_processed_shutdown = False

559 560 561 562 563 564 565 566
def cancelProcessShutdown():
  """
    This method reverts the effect of calling "process_shutdown" on activity
    tool.

    It releases is_running_lock and resets the module-level
    has_processed_shutdown flag to False.
    NOTE(review): presumably only valid once "process_shutdown" has acquired
    is_running_lock (that code is not part of this excerpt) -- releasing an
    unlocked threading.Lock raises an error.
  """
  global has_processed_shutdown
  # The lock is released before the flag is reset.
  is_running_lock.release()
  has_processed_shutdown = False
567

568
class ActivityTool (Folder, UniqueObject):
    """
    ActivityTool is the central point for activity management.

    Improvements to consider to reduce locks:

      Idea 1: create an SQL tool which accumulates queries and executes them at the end of a transaction,
              thus allowing all SQL transactions to happen in a very short time
              (this would also be a great way of using MyISAM tables)

      Idea 2: do the same at the level of ActivityTool

      Idea 3: do the same at the level of each activity (ie. queueMessage
              accumulates and fires messages at the end of the transaction)
    """
    id = 'portal_activities'
    meta_type = 'CMF Activity Tool'
    portal_type = 'Activity Tool'
    # Only these meta types are offered by filtered_meta_types().
    allowed_types = ( 'CMF Active Process', )
    security = ClassSecurityInfo()

    isIndexable = False

    # Tool-specific management tabs come first, followed by Folder's own.
    manage_options = tuple(
                     [ { 'label' : 'Overview', 'action' : 'manage_overview' }
                     , { 'label' : 'Activities', 'action' : 'manageActivities' }
                     , { 'label' : 'LoadBalancing', 'action' : 'manageLoadBalancing'}
                     , { 'label' : 'Advanced', 'action' : 'manageActivitiesAdvanced' }
                     ,
                     ] + list(Folder.manage_options))

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
    manageActivities = DTMLFile( 'dtml/manageActivities', globals() )

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivitiesAdvanced' )
    manageActivitiesAdvanced = DTMLFile( 'dtml/manageActivitiesAdvanced', globals() )

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
    manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageLoadBalancing' )
    manageLoadBalancing = DTMLFile( 'dtml/manageLoadBalancing', globals() )

    # Class-level defaults for per-tool settings.
    distributingNode = ''
    _nodes = ()
    activity_creation_trace = False
    activity_tracking = False
    activity_timing_log = False
    cancel_and_invoke_links_hidden = False
617

618 619 620 621 622
    def SQLDict_setPriority(self, **kw):
      # Logging proxy around the SQLDict_setPriority found on the parent
      # object: the result of calling it with src__=1 is logged first, then
      # it is called for real and its result returned.
      # NOTE(review): presumably a ZSQL method, for which src__=1 returns the
      # rendered query instead of executing it -- confirm.
      parent_method = self.aq_parent.SQLDict_setPriority
      rendered = parent_method(src__=1, **kw)
      LOG('ActivityTool', 0, rendered)
      return parent_method(**kw)

623 624 625 626
    def __init__(self, id=None):
        # Default to the class-level tool id when none is given.
        tool_id = ActivityTool.id if id is None else id
        return Folder.__init__(self, tool_id)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
627

628 629 630
    # Filter content (ZMI))
    def filtered_meta_types(self, user=None):
        # Filters the list of available meta types: only those whose name is
        # listed in allowed_types are returned. `user` is accepted for
        # signature compatibility and not used.
        # (Kept as a comment: adding a docstring would make the method
        # publishable through ZPublisher.)
        allowed = self.allowed_types
        return [meta_type for meta_type in self.all_meta_types()
                if meta_type['name'] in allowed]

638 639 640 641 642 643 644 645 646 647 648 649 650 651
    def maybeMigrateConnectionClass(self):
      # Replace the 'cmf_activity_sql_connection' object by an
      # ActivityConnection carrying the same title and connection string,
      # unless it is missing or already of that class.
      connection_id = 'cmf_activity_sql_connection'
      old_connection = getattr(self, connection_id, None)
      if old_connection is None or isinstance(old_connection, ActivityConnection):
        return
      # SQL Connection migration is needed
      LOG('ActivityTool', WARNING, "Migrating MySQL Connection class")
      # Operate on the container the connection really lives in.
      container = aq_parent(aq_inner(old_connection))
      container._delObject(old_connection.getId())
      container._setObject(
        connection_id,
        ActivityConnection(connection_id,
                           old_connection.title,
                           old_connection.connection_string))

652
    security.declarePrivate('initialize')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
653
    def initialize(self):
      # Migrate the SQL connection class if needed, then initialize every
      # registered activity for this tool (with clear=False).
      self.maybeMigrateConnectionClass()
      for activity in activity_dict.itervalues():
        activity.initialize(self, clear=False)
657

658 659
    security.declareProtected(Permissions.manage_properties, 'isSubscribed')
    def isSubscribed(self):
      """
      Return True if this tool is subscribed to TimerService.
      Otherwise return False (also when TimerService is not available, in
      which case an INFO line is logged).
      """
      service = getTimerService(self)
      if not service:
        LOG('ActivityTool', INFO, 'TimerService not available')
        return False
      # Subscriptions are looked up by physical path.
      # NOTE(review): "lisSubscriptions" (sic) is the name called here; check
      # the TimerService API before "correcting" the spelling.
      return '/'.join(self.getPhysicalPath()) in service.lisSubscriptions()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
670

671
    security.declareProtected(Permissions.manage_properties, 'subscribe')
672
    def subscribe(self, REQUEST=None, RESPONSE=None):
        """
        Subscribe to the global Timer Service.

        When RESPONSE is given, redirect to the load balancing management
        screen with a message telling what happened.
        """
        service = getTimerService(self)
        url = '%s/manageLoadBalancing?manage_tabs_message=' % self.absolute_url()
        if service:
            service.subscribe(self)
            url += urllib.quote("Subscribed to Timer Service")
        else:
            LOG('ActivityTool', INFO, 'TimerService not available')
            url += urllib.quote('TimerService not available')
        if RESPONSE is not None:
            RESPONSE.redirect(url)
684 685

    security.declareProtected(Permissions.manage_properties, 'unsubscribe')
686
    def unsubscribe(self, REQUEST=None, RESPONSE=None):
        """
        Unsubscribe from the global Timer Service.

        When RESPONSE is given, redirect to the load balancing management
        screen with a message telling what happened.
        """
        service = getTimerService(self)
        url = '%s/manageLoadBalancing?manage_tabs_message=' % self.absolute_url()
        if service:
            service.unsubscribe(self)
            url += urllib.quote("Unsubscribed from Timer Service")
        else:
            LOG('ActivityTool', INFO, 'TimerService not available')
            url += urllib.quote('TimerService not available')
        if RESPONSE is not None:
            RESPONSE.redirect(url)
698

699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776
    security.declareProtected(Permissions.manage_properties, 'isActivityTrackingEnabled')
    def isActivityTrackingEnabled(self):
      # Tell whether the tracking log is enabled (activity_tracking flag).
      # When it is, Method.__call__ logs every message it queues.
      return self.activity_tracking

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTracking')
    def manage_enableActivityTracking(self, REQUEST=None, RESPONSE=None):
        """
          Turn the activity tracking log on.

          When RESPONSE is given, redirect to the advanced management screen
          with a confirmation message.
        """
        self.activity_tracking = True
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        RESPONSE.redirect(base_url + urllib.quote('Tracking log enabled'))

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTracking')
    def manage_disableActivityTracking(self, REQUEST=None, RESPONSE=None):
        """
          Switch activity tracking off and, when a RESPONSE is given,
          redirect to the advanced management tab with a status message.
        """
        self.activity_tracking = False
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        RESPONSE.redirect(base_url + urllib.quote('Tracking log disabled'))

    security.declareProtected(Permissions.manage_properties, 'isActivityTimingLoggingEnabled')
    def isActivityTimingLoggingEnabled(self):
      # Report the current value of the activity timing log flag.
      timing_log_enabled = self.activity_timing_log
      return timing_log_enabled

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTimingLogging')
    def manage_enableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
        """
          Switch activity timing logging on and, when a RESPONSE is given,
          redirect to the advanced management tab with a status message.
        """
        self.activity_timing_log = True
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        RESPONSE.redirect(base_url + urllib.quote('Timing log enabled'))

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTimingLogging')
    def manage_disableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
        """
          Switch activity timing logging off and, when a RESPONSE is given,
          redirect to the advanced management tab with a status message.
        """
        self.activity_timing_log = False
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        RESPONSE.redirect(base_url + urllib.quote('Timing log disabled'))

    security.declareProtected(Permissions.manage_properties, 'isActivityCreationTraceEnabled')
    def isActivityCreationTraceEnabled(self):
      # Report the current value of the activity creation trace flag.
      creation_trace_enabled = self.activity_creation_trace
      return creation_trace_enabled

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityCreationTrace')
    def manage_enableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
        """
          Switch the activity creation trace on and, when a RESPONSE is
          given, redirect to the advanced management tab with a status
          message.
        """
        self.activity_creation_trace = True
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        RESPONSE.redirect(base_url + urllib.quote('Activity creation trace enabled'))

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityCreationTrace')
    def manage_disableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
        """
          Switch the activity creation trace off and, when a RESPONSE is
          given, redirect to the advanced management tab with a status
          message.
        """
        self.activity_creation_trace = False
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        RESPONSE.redirect(base_url + urllib.quote('Activity creation trace disabled'))

777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800
    security.declareProtected(Permissions.manage_properties, 'isCancelAndInvokeLinksHidden')
    def isCancelAndInvokeLinksHidden(self):
      # Report the current value of the "hide cancel and invoke links" flag.
      links_hidden = self.cancel_and_invoke_links_hidden
      return links_hidden

    security.declareProtected(Permissions.manage_properties, 'manage_hideCancelAndInvokeLinks')
    def manage_hideCancelAndInvokeLinks(self, REQUEST=None, RESPONSE=None):
        """
          Mark the cancel and invoke links as hidden and, when a RESPONSE
          is given, redirect to the advanced management tab with a status
          message.
        """
        self.cancel_and_invoke_links_hidden = True
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        RESPONSE.redirect(base_url + urllib.quote('Cancel and invoke links hidden'))

    security.declareProtected(Permissions.manage_properties, 'manage_showCancelAndInvokeLinks')
    def manage_showCancelAndInvokeLinks(self, REQUEST=None, RESPONSE=None):
        """
          Mark the cancel and invoke links as visible and, when a RESPONSE
          is given, redirect to the advanced management tab with a status
          message.
        """
        self.cancel_and_invoke_links_hidden = False
        if RESPONSE is None:
          return
        base_url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        RESPONSE.redirect(base_url + urllib.quote('Cancel and invoke links visible'))

801 802
    def manage_beforeDelete(self, item, container):
        # Unsubscribe from the Timer Service first, then delegate to the
        # implementation inherited through Folder.
        self.unsubscribe()
        inherited_hook = Folder.inheritedAttribute('manage_beforeDelete')
        inherited_hook(self, item, container)
804

805 806
    def manage_afterAdd(self, item, container):
        # Subscribe to the Timer Service first, then delegate to the
        # implementation inherited through Folder.
        self.subscribe()
        inherited_hook = Folder.inheritedAttribute('manage_afterAdd')
        inherited_hook(self, item, container)
808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828

    def getServerAddress(self):
        """
        Backward-compatibility code only.

        Return an "ip:port" string taken from the first
        ZServer.HTTPServer.zhttp_server found in asyncore's socket map.
        The value is computed once and kept in the module-level
        _server_address.
        """
        global _server_address
        if _server_address is not None:
            return _server_address
        from asyncore import socket_map
        ip = port = ''
        for server in socket_map.values():
            if not hasattr(server, 'addr'):
                continue
            # see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
            class_name = str(getattr(server, '__class__', 'unknown'))
            if class_name == 'ZServer.HTTPServer.zhttp_server':
                ip, port = server.addr
                break
        # A wildcard bind address is replaced by this host's resolved address.
        if ip == '0.0.0.0':
            ip = socket.gethostbyname(socket.gethostname())
        _server_address = '%s:%s' % (ip, port)
        return _server_address

829
    def getCurrentNode(self):
        """ Return the identifier of the current node """
        global currentNode
        if currentNode is not None:
          return currentNode
        # Look for cmfactivity/node-id in the product configuration.
        product_config = getattr(getConfiguration(), 'product_config', {})
        currentNode = product_config.get('cmfactivity', {}).get('node-id')
        if currentNode is None:
          # Nothing configured: warn, then fall back on the server address.
          warnings.warn('Node name auto-generation is deprecated, please add a'
            '\n'
            '<product-config CMFActivity>\n'
            '  node-id = ...\n'
            '</product-config>\n'
            'section in your zope.conf, replacing "..." with a cluster-unique '
            'node identifier.', DeprecationWarning)
          currentNode = self.getServerAddress()
        return currentNode
848

849 850 851 852 853
    security.declarePublic('getDistributingNode')
    def getDistributingNode(self):
        """ Return the identifier stored as the distributing node """
        distributing_node = self.distributingNode
        return distributing_node

854 855 856 857 858 859 860 861 862 863 864 865 866
    def getNodeList(self, role=None):
      # Return the sorted list of known node identifiers. When a role is
      # given, only nodes registered with exactly that role are listed.
      node_dict = self.getNodeDict()
      if role is None:
        return sorted(node_dict.keys())
      return sorted(
        node_id for node_id, node_role in node_dict.items()
        if node_role == role)

    def getNodeDict(self):
      # Return the node -> role mapping stored in self._nodes. A tuple value
      # is converted on the fly to an OIBTree whose entries all get
      # ROLE_PROCESSING, and the converted mapping is stored back.
      current = self._nodes
      if not isinstance(current, tuple):
        return current
      migrated = OIBTree()
      migrated.update([(node_id, ROLE_PROCESSING) for node_id in current])
      self._nodes = migrated
      return migrated

    def registerNode(self, node):
      # Make sure the given node is present in the node mapping; nothing is
      # done when it is already known.
      known_nodes = self.getNodeDict()
      if node in known_nodes:
        return
      if known_nodes:
        # BBB: check if our node was known by address (processing and/or
        # distribution), and migrate it.
        legacy_address = self.getServerAddress()
        role = known_nodes.pop(legacy_address, ROLE_IDLE)
        if self.distributingNode == legacy_address:
          self.distributingNode = node
      else:
        # We are registering the first node, make
        # it both the distributing node and a processing node.
        role = ROLE_PROCESSING
        self.distributingNode = node
      self.updateNode(node, role)

    def updateNode(self, node, role):
      # Record (or overwrite) the role of the given node in the node mapping.
      self.getNodeDict()[node] = role

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getProcessingNodeList')
    def getProcessingNodeList(self):
      # List the nodes whose role is ROLE_PROCESSING.
      processing_node_list = self.getNodeList(role=ROLE_PROCESSING)
      return processing_node_list

896
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getIdleNodeList')
897 898
    def getIdleNodeList(self):
      # List the nodes whose role is ROLE_IDLE.
      idle_node_list = self.getNodeList(role=ROLE_IDLE)
      return idle_node_list
899

900 901
    def _isValidNodeName(self, node_name) :
      """Check we have been provided a good node name"""
902
      return isinstance(node_name, str)
903

904 905
    security.declarePublic('manage_setDistributingNode')
    def manage_setDistributingNode(self, distributingNode, REQUEST=None):
        """ Store the distributing node; an empty value is accepted as is """
        accepted = not distributingNode or self._isValidNodeName(distributingNode)
        if accepted:
          self.distributingNode = distributingNode
          message = "Distributing Node successfully changed."
        else:
          message = "Malformed Distributing Node."
        # Report the outcome only when called through the web.
        if REQUEST is not None:
            REQUEST.RESPONSE.redirect(
                REQUEST.URL1 +
                '/manageLoadBalancing?manage_tabs_message=' +
                urllib.quote(message))

921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976
    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_delNode')
    def manage_delNode(self, unused_node_list=None, REQUEST=None):
      """ Delete the selected unused nodes.

      unused_node_list -- node identifiers to remove from the node mapping,
                          or None when nothing was selected.
      REQUEST -- when given, the browser is redirected to
                 manageLoadBalancing with a status message.

      If the current distributing node is among the deleted nodes, the
      distributing node is reset to an empty string.
      """
      distributing_node = self.getDistributingNode()
      cleared_distributing_node = False
      if unused_node_list is not None:
        node_dict = self.getNodeDict()
        for node in unused_node_list:
          if node in node_dict:
            del node_dict[node]
          if node == distributing_node:
            # Clear the attribute read by getDistributingNode(), so that the
            # deleted node really stops being the distributing node.
            self.distributingNode = ''
            cleared_distributing_node = True
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing deleted."
        else:
          message = "Deleted nodes %r." % (unused_node_list, )
        if cleared_distributing_node:
          message += " Disabled distributing node because it was deleted."
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_addToProcessingList')
    def manage_addToProcessingList(self, unused_node_list=None, REQUEST=None):
      """ Turn the selected idle nodes into processing nodes """
      nothing_selected = unused_node_list is None
      if not nothing_selected:
        for node_id in unused_node_list:
          self.updateNode(node_id, ROLE_PROCESSING)
      if REQUEST is None:
        return
      if nothing_selected:
        message = "No unused node selected, nothing done."
      else:
        message = "Nodes now procesing: %r." % (unused_node_list, )
      REQUEST.RESPONSE.redirect(
        REQUEST.URL1 +
        '/manageLoadBalancing?manage_tabs_message=' +
        urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_removeFromProcessingList')
    def manage_removeFromProcessingList(self, processing_node_list=None, REQUEST=None):
      """ Turn the selected processing nodes into idle nodes """
      nothing_selected = processing_node_list is None
      if not nothing_selected:
        for node_id in processing_node_list:
          self.updateNode(node_id, ROLE_IDLE)
      if REQUEST is None:
        return
      if nothing_selected:
        message = "No used node selected, nothing done."
      else:
        message = "Nodes now unused %r." % (processing_node_list, )
      REQUEST.RESPONSE.redirect(
        REQUEST.URL1 +
        '/manageLoadBalancing?manage_tabs_message=' +
        urllib.quote(message))
977

978 979 980 981 982
    def process_shutdown(self, phase, time_in_phase):
        """
          Hold shutdown back while an activity queue is processing a
          batch: on the first call made with phase 3, block until
          is_running_lock can be acquired.
        """
        global has_processed_shutdown
        if phase != 3 or has_processed_shutdown:
          return
        # Remember that shutdown was handled, so that later calls do not try
        # to acquire the lock a second time.
        has_processed_shutdown = True
        LOG('CMFActivity', INFO, "Shutdown: Waiting for activities to finish.")
        is_running_lock.acquire()
        LOG('CMFActivity', INFO, "Shutdown: Activities finished.")

990
    def process_timer(self, tick, interval, prev="", next=""):
      """
      Call distribute() if we are the Distributing Node and call tic()
      with our node number.
      This method is called by TimerService in the interval given
      in zope.conf. The Default is every 5 seconds.

      The tick, interval, prev and next arguments are accepted but not
      used by this method. Exceptions raised while processing are logged
      and swallowed, and nothing is done at all when timerservice_lock is
      already held.
      """
      # Prevent TimerService from starting multiple threads in parallel:
      # the lock is requested without blocking, and the whole call is a
      # no-op when it cannot be obtained.
      if timerservice_lock.acquire(0):
        try:
          # make sure our skin is set-up. On CMF 1.5 it's setup by acquisition,
          # but on 2.2 it's by traversal, and our site probably wasn't traversed
          # by the timerserver request, which goes into the Zope Control_Panel
          # calling it a second time is a harmless and cheap no-op.
          # both setupCurrentSkin and REQUEST are acquired from containers.
          self.setupCurrentSkin(self.REQUEST)
          # Remember the security manager in place so that it can be
          # restored in the inner "finally" clause.
          old_sm = getSecurityManager()
          try:
            # get owner of portal_catalog, so normally we should be able to
            # have the permission to invoke all activities
            user = self.portal_catalog.getWrappedOwner()
            newSecurityManager(self.REQUEST, user)

            # Make sure this node is registered before looking up the list
            # of processing nodes.
            currentNode = self.getCurrentNode()
            self.registerNode(currentNode)
            processing_node_list = self.getNodeList(role=ROLE_PROCESSING)

            # only distribute when we are the distributingNode
            if self.getDistributingNode() == currentNode:
              self.distribute(len(processing_node_list))

            # SkinsTool uses a REQUEST cache to store skin objects, as
            # with TimerService we have the same REQUEST over multiple
            # portals, we clear this cache to make sure the cache doesn't
            # contains skins from another portal.
            try:
              self.getPortalObject().portal_skins.changeSkin(None)
            except AttributeError:
              pass

            # call tic for the current processing_node
            # the processing_node numbers are the indices of the elements
            # in the sorted processing node list +1 because processing_node
            # starts from 1
            if currentNode in processing_node_list:
              self.tic(processing_node_list.index(currentNode) + 1)
          except:
            # Catch ALL exception to avoid killing timerserver.
            LOG('ActivityTool', ERROR, 'process_timer received an exception',
                error=sys.exc_info())
          finally:
            setSecurityManager(old_sm)
        finally:
          timerservice_lock.release()
1043

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1044 1045 1046 1047 1048 1049
    security.declarePublic('distribute')
    def distribute(self, node_count=1):
      """
        Ask every activity queue to distribute its load over node_count
        nodes.
      """
      inner_self = aq_inner(self)
      for queue in activity_dict.itervalues():
        queue.distribute(inner_self, node_count)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1052

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1053
    security.declarePublic('tic')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1054
    def tic(self, processing_node=1, force=0):
      """
        Process queued activities on behalf of one processing node.

        processing_node starts from 1 (there is no node 0)
        force -- when true, run even if max_active_threads is already
                 reached.

        Raises RuntimeError when too many threads are already active and
        force is false.
      """
      global active_threads

      # raise if the number of threads is too high (unless forced)
      # else, increase the number of active_threads and continue
      tic_lock.acquire()
      too_many_threads = (active_threads >= max_active_threads)
      if not too_many_threads or force:
        active_threads += 1
      else:
        tic_lock.release()
        raise RuntimeError, 'Too many threads'
      tic_lock.release()

      inner_self = aq_inner(self)

      try:
        # Loop as long as there are activities. Always process the queue with
        # "highest" priority. If several queues have same highest priority, do
        # not choose one that has just been processed.
        # This algorithm is fair enough because we only have 2 queues.
        # Otherwise, a round-robin of highest-priority queues would be required.
        # XXX: We always finish by iterating over all queues, in case that
        #      getPriority does not see messages dequeueMessage would process.
        last = None
        def sort_key(activity):
          # On equal priority, the queue processed last sorts after the
          # others ("activity is last" is True only for that queue).
          return activity.getPriority(self), activity is last
        # The lock is requested without blocking: the loop stops as soon as
        # is_running_lock cannot be obtained.
        while is_running_lock.acquire(0):
          try:
            for last in sorted(activity_dict.values(), key=sort_key):
              # Transaction processing is the responsibility of the activity
              # A false result restarts the outer loop with a fresh ordering.
              if not last.dequeueMessage(inner_self, processing_node):
                break
            else:
              # Every queue returned a true value: stop looping.
              break
          finally:
            is_running_lock.release()
      finally:
        # decrease the number of active_threads
        tic_lock.acquire()
        active_threads -= 1
        tic_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1100

1101
    def hasActivity(self, *args, **kw):
      # Check in each queue whether the object has deferred tasks.
      # The object is the first positional argument; when no argument is
      # provided, the check is done on self.
      target = args[0] if args else self
      return any(
        queue.hasActivity(aq_inner(self), target, **kw)
        for queue in activity_dict.itervalues())
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1112

1113 1114 1115 1116 1117 1118 1119 1120
    def getActivityBuffer(self, create_if_not_found=True):
      """
        Get the activity buffer for this thread for this activity tool.

        create_if_not_found -- when true (the default), a new ActivityBuffer
          is created and remembered if this thread has none yet; when false,
          None is returned in that case and nothing is remembered.

        Intermediate level is unconditionally created if non existent because
        chances are it will be used in the instance life.
      """
      # XXX: using a volatile attribute to cache getPhysicalPath result.
      # This cache may need invalidation if all the following is
      # simultaneously true:
      # - ActivityTool instances can be moved in object tree
      # - moved instance is used to get access to its activity buffer
      # - another instance is put in the place of the original, and used to
      #   access its activity buffer
      # ...which seems currently unlikely, and as such is left out.
      try:
        my_instance_key = self._v_physical_path
      except AttributeError:
        # Safeguard: make sure we are wrapped in acquisition context before
        # using our path as an activity tool instance-wide identifier.
        assert getattr(self, 'aq_self', None) is not None
        self._v_physical_path = my_instance_key = self.getPhysicalPath()
      thread_activity_buffer = global_activity_buffer[my_instance_key]
      my_thread_key = get_ident()
      try:
        activity_buffer = thread_activity_buffer[my_thread_key]
      except KeyError:
        activity_buffer = None
      # A missing buffer is never cached as None: otherwise a lookup done
      # with create_if_not_found=False would make every later call return
      # None, even when creation is requested.
      if activity_buffer is None and create_if_not_found:
        activity_buffer = ActivityBuffer()
        thread_activity_buffer[my_thread_key] = activity_buffer
      return activity_buffer
1147

1148
    def activateObject(self, object, activity=DEFAULT_ACTIVITY, active_process=None, **kw):
      # Build an ActiveWrapper for the given object.
      #
      # object         -- either a '/'-separated path string or an object
      #                   providing getPhysicalPath().
      # activity       -- passed through to ActiveWrapper.
      # active_process -- None, a path string (traversed from self to read
      #                   its uid) or an object providing getUid() and
      #                   getPhysicalPath().
      # kw             -- passed to ActiveWrapper; a 'serialization_tag'
      #                   explicitly set to None is dropped first.
      if active_process is None:
        active_process_uid = None
      elif isinstance(active_process, str):
        # TODO: deprecate
        active_process_uid = self.unrestrictedTraverse(active_process).getUid()
      else:
        active_process_uid = active_process.getUid()
        active_process = active_process.getPhysicalPath()
      if isinstance(object, str):
        oid = None
        url = tuple(object.split('/'))
      else:
        # Note that it's too early to get the OID of a newly created object,
        # so at this point, oid may still be None.
        # Objects without any _p_oid attribute also get None, so that oid is
        # always bound when ActiveWrapper is built below.
        oid = getattr(aq_base(object), '_p_oid', None)
        url = object.getPhysicalPath()
      # The False default makes the test below succeed only when the key is
      # present with a None value.
      if kw.get('serialization_tag', False) is None:
        del kw['serialization_tag']
      return ActiveWrapper(self, url, oid, activity,
                           active_process, active_process_uid, kw,
                           getattr(self, 'REQUEST', None))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1173

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1174
    def getRegisteredMessageList(self, activity):
      # Ask the given activity for the messages registered in this thread's
      # buffer. The buffer is looked up without creating it; an empty list
      # is returned when there is none.
      thread_buffer = self.getActivityBuffer(create_if_not_found=False)
      if thread_buffer is None:
        return []
      #thread_buffer._register() # This is required if flush flush is called outside activate
      return activity.getRegisteredMessageList(thread_buffer, aq_inner(self))
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1182

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1183
    def unregisterMessage(self, activity, message):
      # Delegate to the given activity, handing it this thread's buffer.
      thread_buffer = self.getActivityBuffer()
      #thread_buffer._register()
      inner_self = aq_inner(self)
      return activity.unregisterMessage(thread_buffer, inner_self, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1187

1188
    def flush(self, obj, invoke=0, **kw):
      # Flush every activity queue for the given object, which is either a
      # physical path tuple or an object providing getPhysicalPath().
      # The buffer lookup result is not used here; the call is kept for its
      # side effect.
      self.getActivityBuffer()
      object_path = obj if isinstance(obj, tuple) else obj.getPhysicalPath()
      inner_self = aq_inner(self)
      for queue in activity_dict.itervalues():
        queue.flush(inner_self, object_path, invoke=invoke, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1196 1197

    def invoke(self, message):
      # Execute one message by calling it with an activity tool wrapped in a
      # freshly cloned request, so that request data saved in
      # message.request_info can be restored for the duration of the call.
      # When self has no acquisition chain, the message is called with self
      # directly.
      if self.activity_tracking:
        activity_tracking_logger.info('invoking message: object_path=%s, method_id=%s, args=%r, kw=%r, activity_kw=%r, user_name=%s' % ('/'.join(message.object_path), message.method_id, message.args, message.kw, message.activity_kw, message.user_name))
      # False means "Localizer context untouched"; it is replaced below by
      # the previous context (possibly None) when a new one is installed.
      old_localizer_context = False
      if getattr(self, 'aq_chain', None) is not None:
        # Grab existing acquisition chain and extract base objects.
        base_chain = [aq_base(x) for x in self.aq_chain]
        # Grab existing request (last chain item) and create a copy.
        request_container = base_chain.pop()
        request = request_container.REQUEST
        # Generate PARENTS value. Sadly, we cannot reuse base_chain since
        # PARENTS items must be wrapped in acquisition
        parents = []
        application = self.getPhysicalRoot().aq_base
        for parent in self.aq_chain:
          if parent.aq_base is application:
            break
          parents.append(parent)
        # XXX: REQUEST.clone() requires PARENTS to be set, and it's not when
        # running unit tests. Recreate it if it does not exist.
        # NOTE(review): request.other is used as a mapping on the next line,
        # while getattr() looks up an attribute, not a key; presumably this
        # test always succeeds - confirm whether a key lookup was intended.
        if getattr(request.other, 'PARENTS', None) is None:
          request.other['PARENTS'] = parents
        # XXX: PATH_INFO might not be set when running unit tests.
        if request.environ.get('PATH_INFO') is None:
          request.environ['PATH_INFO'] = '/Control_Panel/timer_service/process_timer'

        # restore request information
        new_request = request.clone()
        request_info = message.request_info
        # PARENTS is truncated by clone
        new_request.other['PARENTS'] = parents
        if '_script' in request_info:
          new_request._script = request_info['_script']
        if 'SERVER_URL' in request_info:
          new_request.other['SERVER_URL'] = request_info['SERVER_URL']
        if 'VirtualRootPhysicalPath' in request_info:
          new_request.other['VirtualRootPhysicalPath'] = request_info['VirtualRootPhysicalPath']
        if 'HTTP_ACCEPT_LANGUAGE' in request_info:
          new_request.environ['HTTP_ACCEPT_LANGUAGE'] = request_info['HTTP_ACCEPT_LANGUAGE']
          # Replace Localizer/iHotfix Context, saving existing one
          localizer_context = LocalizerContext(new_request)
          id = get_ident()
          localizer_lock.acquire()
          try:
            old_localizer_context = localizer_contexts.get(id)
            localizer_contexts[id] = localizer_context
          finally:
            localizer_lock.release()
          # Execute Localizer/iHotfix "patch 2"
          new_request.processInputs()

        new_request_container = request_container.__class__(REQUEST=new_request)
        # Recreate acquisition chain, outermost object first, on top of the
        # new request container.
        my_self = new_request_container
        base_chain.reverse()
        for item in base_chain:
          my_self = item.__of__(my_self)
      else:
        my_self = self
        LOG('CMFActivity.ActivityTool.invoke', INFO, 'Strange: invoke is called outside of acquisition context.')
      try:
        message(my_self)
      finally:
        if my_self is not self: # We rewrapped self
          # Restore default skin selection
          skinnable = self.getPortalObject()
          skinnable.changeSkin(skinnable.getSkinNameFromRequest(request))
        if old_localizer_context is not False:
          # Restore Localizer/iHotfix context
          id = get_ident()
          localizer_lock.acquire()
          try:
            # None means this thread had no context before the call.
            if old_localizer_context is None:
              del localizer_contexts[id]
            else:
              localizer_contexts[id] = old_localizer_context
          finally:
            localizer_lock.release()
      if self.activity_tracking:
        activity_tracking_logger.info('invoked message')
      if my_self is not self: # We rewrapped self
        # Hand the objects held by the cloned request over to the original
        # request.
        for held in my_self.REQUEST._held:
          self.REQUEST._hold(held)
1280

1281
    def invokeGroup(self, method_id, message_list, activity, merge_duplicate):
      # Execute a list of messages through a single call to the group method
      # found by traversing method_id from the portal, then set the
      # execution state of each message according to the outcome.
      #
      # method_id       -- path of the group method, traversed from the
      #                    portal object.
      # message_list    -- messages to execute together.
      # activity        -- activity used when re-posting objects that
      #                    provide the alternate method.
      # merge_duplicate -- when true, an object path is only handled once
      #                    across all messages.
      if self.activity_tracking:
        activity_tracking_logger.info(
          'invoking group messages: method_id=%s, paths=%s'
          % (method_id, ['/'.join(m.object_path) for m in message_list]))
      # Invoke a group method.
      # message_dict maps each retained message to its GroupedMessage list.
      message_dict = {}
      path_set = set()
      # Filter the list of messages. If an object is not available, mark its
      # message as non-executable. In addition, expand an object if necessary,
      # and make sure that no duplication happens.
      for m in message_list:
        # alternate method is used to segregate objects which cannot be grouped.
        alternate_method_id = m.activity_kw.get('alternate_method_id')
        try:
          object_list = m.getObjectList(self)
          if object_list is None:
            continue
          message_dict[m] = expanded_object_list = []
          for subobj in object_list:
            if merge_duplicate:
              path = subobj.getPath()
              if path in path_set:
                continue
              path_set.add(path)
            if alternate_method_id is not None \
               and hasattr(aq_base(subobj), alternate_method_id):
              # if this object is alternated,
              # generate a new single active object
              activity_kw = m.activity_kw.copy()
              activity_kw.pop('group_method_id', None)
              activity_kw.pop('group_id', None)
              active_obj = subobj.activate(activity=activity, **activity_kw)
              getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
            else:
              expanded_object_list.append(GroupedMessage(subobj, m))
        except:
          m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)

      # Flatten the per-message lists into the single list handed to the
      # group method.
      expanded_object_list = sum(message_dict.itervalues(), [])
      try:
        if expanded_object_list:
          traverse = self.getPortalObject().unrestrictedTraverse
          # FIXME: how to apply security here?
          # NOTE: The callee must update each processed item of
          #       expanded_object_list, by setting:
          #       - 'exc_info' in case of error
          #       - 'result' otherwise, with None or the result to post
          #          on the active process
          #       Skipped item must not be touched.
          traverse(method_id)(expanded_object_list)
      except:
        # In this case, the group method completely failed.
        exc_info = sys.exc_info()
        for m in message_dict:
          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
        LOG('WARNING ActivityTool', 0,
            'Could not call method %s on objects %s' %
            (method_id, [x.object for x in expanded_object_list]),
            error=exc_info)
        error_log = getattr(self, 'error_log', None)
        if error_log is not None:
          error_log.raising(exc_info)
      else:
        # Note there can be partial failures.
        # From here on, expanded_object_list is rebound to the list of one
        # message at a time.
        for m, expanded_object_list in message_dict.iteritems():
          result_list = []
          for result in expanded_object_list:
            try:
              if result.result is not None:
                result_list.append(result)
            except AttributeError:
              # No 'result' attribute: the item either carries 'exc_info'
              # or was left untouched by the group method.
              exc_info = getattr(result, "exc_info", (SkippedMessage,))
              break # failed or skipped message
          else:
            # Every item of this message has a result: post the non-None
            # ones on the active process, if the message has one.
            try:
              if result_list and m.active_process:
                active_process = traverse(m.active_process)
                for result in result_list:
                  m.activateResult(active_process, result.result, result.object)
            except:
              exc_info = None
            else:
              m.setExecutionState(MESSAGE_EXECUTED, context=self)
              continue
          # Reached after a failed or skipped item, or when posting results
          # raised.
          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, context=self)
      if self.activity_tracking:
        activity_tracking_logger.info('invoked group messages')
1369

1370 1371 1372 1373
    security.declarePrivate('dummyGroupMethod')
    # Helper whose single instance (created right below the class, under the
    # same name) turns any traversed name into a group method calling that
    # name on each grouped object.
    class dummyGroupMethod(object):
      def __bobo_traverse__(self, REQUEST, method_id):
        # Return a callable accepting the list of grouped messages.
        def group_method(message_list):
          user_name = None
          # Remember the security manager in place, to restore it once done.
          sm = getSecurityManager()
          try:
            for m in message_list:
              message = m._message
              # Switch user only when it differs from the previous message.
              if user_name != message.user_name:
                user_name = message.user_name
                message.changeUser(user_name, m.object)
              m.result = getattr(m.object, method_id)(*m.args, **m.kw)
          except Exception:
            # "m" is the item being processed when the exception occurred;
            # the loop is over, so the remaining items are left untouched.
            # NOTE(review): presumably m.raised() records the current
            # exception on the item - confirm against GroupedMessage.
            m.raised()
          finally:
            setSecurityManager(sm)
        return group_method
    # Replace the class by its only instance.
    dummyGroupMethod = dummyGroupMethod()

1390 1391
    def newMessage(self, activity, path, active_process,
                   activity_kw, method_id, *args, **kw):
      # Build a Message from the given call description and queue it on the
      # activity registered under the name "activity" in activity_dict.
      # Returns nothing.
      # NOTE(review): documented with comments rather than a docstring on
      # purpose -- a docstring may affect Zope publishing; confirm before
      # changing.
      # XXX Some security checking should be made here.
      self.getActivityBuffer()  # result is not used here
      queue_message = activity_dict[activity].queueMessage
      tool = aq_inner(self)
      message = Message(path, active_process, activity_kw, method_id, args, kw,
                        portal_activities=self)
      queue_message(tool, message)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1397

1398
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageInvoke' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1399 1400 1401 1402 1403 1404
    def manageInvoke(self, object_path, method_id, REQUEST=None):
      """
        Invokes all methods for object "object_path", by calling flush()
        with invoke=1.

        object_path -- either a '/'-separated string, which is split into a
                       tuple of path elements, or any other value, which is
                       passed to flush() unchanged.
        method_id   -- forwarded to flush() as its method_id argument.
        REQUEST     -- when given, the result of redirecting to the
                       'manageActivities' page of this tool is returned;
                       otherwise None is returned.
      """
      # isinstance() rather than an exact type comparison, so that str
      # subclasses are split into a path tuple as well.
      if isinstance(object_path, str):
        object_path = tuple(object_path.split('/'))
      self.flush(object_path, method_id=method_id, invoke=1)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1409

1410
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageRestart')
1411 1412 1413 1414
    def manageRestart(self, message_uid_list, activity, REQUEST=None):
      """
        Restart one or several messages.

        message_uid_list -- a list of message uids; any value that is not a
                            list is wrapped into a one-element list.
        activity         -- key into activity_dict; selects the SQL table
                            passed to SQLBase_makeMessageListAvailable.
        REQUEST          -- when given, the result of redirecting to the
                            'view' page of this tool is returned.
      """
      uid_list = message_uid_list
      if not isinstance(uid_list, list):
        uid_list = [uid_list]
      sql_table = activity_dict[activity].sql_table
      self.SQLBase_makeMessageListAvailable(table=sql_table, uid=uid_list)
      if REQUEST is None:
        return
      target_url = '%s/%s' % (self.absolute_url(), 'view')
      return REQUEST.RESPONSE.redirect(target_url)

1423
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageCancel' )
1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434
    def manageCancel(self, object_path, method_id, REQUEST=None):
      """
        Cancel all methods for object "object_path", by calling flush()
        with invoke=0.

        Deprecated: a warning pointing to "manageDelete" is logged on every
        call.

        object_path -- either a '/'-separated string, which is split into a
                       tuple of path elements, or any other value, which is
                       passed to flush() unchanged.
        method_id   -- forwarded to flush() as its method_id argument.
        REQUEST     -- when given, the result of redirecting to the
                       'manageActivities' page of this tool is returned;
                       otherwise None is returned.
      """
      LOG('ActivityTool', WARNING,
          '"manageCancel" method is deprecated, use "manageDelete" instead.')
      # isinstance() rather than an exact type comparison, so that str
      # subclasses are split into a path tuple as well.
      if isinstance(object_path, str):
        object_path = tuple(object_path.split('/'))
      self.flush(object_path, method_id=method_id, invoke=0)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' % (
          self.absolute_url(), 'manageActivities'))
1436 1437 1438 1439 1440 1441

    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageDelete' )
    def manageDelete(self, message_uid_list, activity, REQUEST=None):
      """
        Delete one or several messages.

        message_uid_list -- a list of message uids; any value that is not a
                            list is wrapped into a one-element list.
        activity         -- key into activity_dict; selects the SQL table
                            passed to SQLBase_delMessage.
        REQUEST          -- when given, the result of redirecting to the
                            'view' page of this tool is returned.
      """
      uid_list = message_uid_list
      if not isinstance(uid_list, list):
        uid_list = [uid_list]
      sql_table = activity_dict[activity].sql_table
      self.SQLBase_delMessage(table=sql_table, uid=uid_list)
      if REQUEST is None:
        return
      target_url = '%s/%s' % (self.absolute_url(), 'view')
      return REQUEST.RESPONSE.redirect(target_url)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1449

1450 1451
    security.declareProtected( CMFCorePermissions.ManagePortal,
                               'manageClearActivities' )
1452
    def manageClearActivities(self, keep=1, RESPONSE=None):
      """
        Recreate tables, clearing all activities.

        Every activity in activity_dict is initialized with clear=True.

        keep     -- not used by this method.
        RESPONSE -- when given, the result of redirecting to the
                    'manageActivitiesAdvanced' page (with a status message)
                    is returned.
      """
      for activity_queue in activity_dict.itervalues():
        activity_queue.initialize(self, clear=True)

      if RESPONSE is None:
        return
      target_url = self.absolute_url_path() + \
        '/manageActivitiesAdvanced?manage_tabs_message=Activities%20Cleared'
      return RESPONSE.redirect(target_url)
1462

1463 1464 1465 1466 1467 1468 1469 1470 1471
    security.declarePublic('getMessageTempObjectList')
    def getMessageTempObjectList(self, **kw):
      """
        Get object list of messages waiting in queues.

        Keyword arguments are passed to getMessageList(). For each message
        it returns, a temporary object is created with newContent() and the
        instance attributes of the message are copied onto it.
      """
      temp_object_list = []
      for sql_message in self.getMessageList(**kw):
        temp_object = self.newContent(temp_object=1)
        temp_object.__dict__.update(**sql_message.__dict__)
        temp_object_list.append(temp_object)
      return temp_object_list

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1476
    security.declarePublic('getMessageList')
1477
    def getMessageList(self, activity=None, **kw):
      """
        List messages waiting in queues.

        activity -- when true, only the activity registered under that key
                    in activity_dict is asked, and its answer is returned
                    as is. Otherwise the answers of all activities are
                    concatenated into one list.
        kw       -- passed to the getMessageList() of the activities.
      """
      if not activity:
        collected_message_list = []
        for activity_queue in activity_dict.itervalues():
          try:
            collected_message_list.extend(
              activity_queue.getMessageList(aq_inner(self), **kw))
          except AttributeError:
            # Log and go on with the remaining activities.
            LOG('getMessageList, could not get message from Activity:',0,activity_queue)
        return collected_message_list

      return activity_dict[activity].getMessageList(aq_inner(self), **kw)

1492 1493 1494 1495 1496 1497
    security.declarePublic('countMessageWithTag')
    def countMessageWithTag(self, value):
      """
        Return the number of messages which match the given tag.

        The result is the sum of countMessageWithTag() over every activity
        in activity_dict.
      """
      return sum(
        activity_queue.countMessageWithTag(aq_inner(self), value)
        for activity_queue in activity_dict.itervalues())

    security.declarePublic('countMessage')
    def countMessage(self, **kw):
      """
        Return the number of messages which match the given parameter.

        The result is the sum of countMessage() over every activity in
        activity_dict; keyword arguments are passed through unchanged.

        Parameters allowed:

        method_id : the id of the method
        path : for activities on a particular object
        tag : activities with a particular tag
        message_uid : activities with a particular uid
      """
      return sum(
        activity_queue.countMessage(aq_inner(self), **kw)
        for activity_queue in activity_dict.itervalues())

1519
    security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
1520
    def newActiveProcess(self, REQUEST=None, **kw):
      # Create an "Active Process" document in this tool with newContent()
      # and return it. When REQUEST is given, its response is also
      # redirected to 'manage_main'.
      # note: if one wants to create an Active Process without ERP5
      # products, she can call ActiveProcess.addActiveProcess
      active_process = self.newContent(portal_type="Active Process", **kw)
      if REQUEST is None:
        return active_process
      response = REQUEST['RESPONSE']
      response.redirect( 'manage_main' )
      return active_process
1527

1528
    # Active synchronisation methods
1529
    security.declarePrivate('validateOrder')
1530
    def validateOrder(self, message, validator_id, validation_value):
      # True when getDependentMessageList() returns at least one entry for
      # the given message / validator / value, False otherwise.
      dependent_count = len(
        self.getDependentMessageList(message, validator_id, validation_value))
      return dependent_count > 0

    security.declarePrivate('getDependentMessageList')
    def getDependentMessageList(self, message, validator_id, validation_value):
      # Ask every activity in activity_dict that defines a method named
      # "_validate_" + validator_id for its answer, and return a list of
      # (activity, message) pairs built from the non-empty answers.
      validator_method_id = "_validate_" + validator_id
      dependent_list = []
      for activity_queue in activity_dict.itervalues():
        validator = getattr(activity_queue, validator_method_id, None)
        if validator is None:
          # This activity does not support the validator.
          continue
        found = validator(aq_inner(self), message, validation_value)
        if found:
          dependent_list.extend(
            [(activity_queue, dependent) for dependent in found])
      return dependent_list
1545

Yoshinori Okuji's avatar
Yoshinori Okuji committed
1546 1547
    # Required for tests (time shift)
    def timeShift(self, delay):
      # Required for tests (time shift): forwards "delay" to the
      # timeShift() of every activity in activity_dict.
      # NOTE(review): documented with comments rather than a docstring on
      # purpose -- a docstring may affect Zope publishing; confirm before
      # changing.
      for activity_queue in activity_dict.itervalues():
        activity_queue.timeShift(aq_inner(self), delay)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1550

1551
# Finalize the ActivityTool class once its body is complete.
# NOTE(review): presumably Zope's InitializeClass, which processes the
# "security" declarations made in the class body -- confirm against the
# import at the top of the file.
InitializeClass(ActivityTool)