SQLQueue.py 16.1 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
import random
Jean-Paul Smets's avatar
Jean-Paul Smets committed
30
from Products.CMFActivity.ActivityTool import registerActivity
31
from RAMQueue import RAMQueue
32
from DateTime import DateTime
33
from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY
34
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
Romain Courteaud's avatar
Romain Courteaud committed
35
from ZODB.POSException import ConflictError
Jean-Paul Smets's avatar
Jean-Paul Smets committed
36

37 38 39 40 41
try:
  from transaction import get as get_transaction
except ImportError:
  pass

Jean-Paul Smets's avatar
Jean-Paul Smets committed
42 43
from zLOG import LOG

44 45 46 47 48 49 50 51 52 53 54 55 56
MAX_PRIORITY = 5

priority_weight = \
  [1] * 64 + \
  [2] * 20 + \
  [3] * 10 + \
  [4] * 5 + \
  [5] * 1

class ActivityFlushError(Exception):
    """Error during active message flush"""

class SQLQueue(RAMQueue):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
57
  """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
58 59 60
    A simple OOBTree based queue. It should be compatible with transactions
    and provide sequentiality. Should not create conflict
    because use of OOBTree.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
61
  """
62
  def prepareQueueMessage(self, activity_tool, m):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
63 64 65 66
    if m.is_registered:
      activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) ,
                                          method_id = m.method_id,
                                          priority = m.activity_kw.get('priority', 1),
67
                                          broadcast = m.activity_kw.get('broadcast', 0),
68
                                          message = self.dumpMessage(m),
69 70
                                          date = m.activity_kw.get('at_date', DateTime()),
                                          tag = m.activity_kw.get('tag', ''))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
71

72 73
  def prepareDeleteMessage(self, activity_tool, m):
    # Erase all messages in a single transaction
74
    #LOG("prepareDeleteMessage", 0, str(m.__dict__))
75
    activity_tool.SQLQueue_delMessage(uid = m.uid)
76

Jean-Paul Smets's avatar
Jean-Paul Smets committed
77
  def dequeueMessage(self, activity_tool, processing_node):
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
    readMessage = getattr(activity_tool, 'SQLQueue_readMessage', None)
    if readMessage is None:
      return 1

    now_date = DateTime()
    # Next processing date in case of error
    next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400
    priority = random.choice(priority_weight)
    # Try to find a message at given priority level
    result = readMessage(processing_node=processing_node, priority=priority,
                         to_date=now_date)
    if len(result) == 0:
      # If empty, take any message
      result = readMessage(processing_node=processing_node, priority=None,to_date=now_date)
    if len(result) > 0:
      line = result[0]
      path = line.path
      method_id = line.method_id
      # Make sure message can not be processed anylonger
      activity_tool.SQLQueue_processMessage(uid=line.uid)
      get_transaction().commit() # Release locks before starting a potentially long calculation
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116

      # At this point, the message is marked as processed.
      try:
        m = self.loadMessage(line.message)
        # Make sure object exists
        validation_state = m.validate(self, activity_tool)
        if validation_state is not VALID:
          if validation_state in (EXCEPTION, INVALID_PATH):
            if line.priority > MAX_PRIORITY:
              # This is an error.
              # Assign message back to 'error' state.
              activity_tool.SQLQueue_assignMessage(uid=line.uid,
                                                   processing_node = VALIDATE_ERROR_STATE)
              get_transaction().commit()                                        # and commit
            else:
              # Lower priority
              activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
              get_transaction().commit() # Release locks before starting a potentially long calculation
117
          else:
118 119 120
            # We do not lower priority for INVALID_ORDER errors but we do postpone execution
            activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
                                                priority = line.priority)
121
            get_transaction().commit() # Release locks before starting a potentially long calculation
122
          return 0
123 124 125 126 127

        # Try to invoke
        activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
        if m.is_executed:                                          # Make sure message could be invoked
          get_transaction().commit()                                        # If successful, commit
128 129
      except ConflictError:
        # If a conflict occurs, catch it and delay the operation.
130 131 132 133 134
        get_transaction().abort()
        activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
                                           priority = line.priority)
        get_transaction().commit()
        return 0
135 136 137 138 139 140 141
      except:
        # For the other exceptions, put it into an error state.
        get_transaction().abort()
        activity_tool.SQLQueue_assignMessage(uid = line.uid, processing_node = INVOKE_ERROR_STATE)
        get_transaction().commit()
        return 0

142

143
      if m.is_executed:
144
        activity_tool.SQLQueue_delMessage(uid=line.uid)  # Delete it
145
      else:
146 147 148 149 150 151
        get_transaction().abort()                                         # If not, abort transaction and start a new one
        if line.priority > MAX_PRIORITY:
          # This is an error
          activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE)
                                                                            # Assign message back to 'error' state
          m.notifyUser(activity_tool)                                       # Notify Error
152
        else:
153 154 155
          # Lower priority
          activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
                                              priority = line.priority + 1)
156
      get_transaction().commit()
157 158
      return 0
    get_transaction().commit() # Release locks before starting a potentially long calculation
Jean-Paul Smets's avatar
Jean-Paul Smets committed
159
    return 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
160

161
  def hasActivity(self, activity_tool, object, **kw):
162 163
    hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
    if hasMessage is not None:
164 165
      if object is not None:
        my_object_path = '/'.join(object.getPhysicalPath())
166
        result = hasMessage(path=my_object_path, **kw)
167 168 169 170
        if len(result) > 0:
          return result[0].message_count > 0
      else:
        return 1 # Default behaviour if no object specified is to return 1 until active_process implemented
Jean-Paul Smets's avatar
Jean-Paul Smets committed
171
    return 0
Jean-Paul Smets's avatar
Jean-Paul Smets committed
172

173
  def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
174 175
    """
      object_path is a tuple
176 177 178 179 180 181 182 183

      commit allows to choose mode
        - if we commit, then we make sure no locks are taken for too long
        - if we do not commit, then we can use flush in a larger transaction

      commit should in general not be used

      NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible
Jean-Paul Smets's avatar
Jean-Paul Smets committed
184
    """
185 186
    readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
    if readMessageList is not None:
187 188 189 190 191 192 193 194 195
      #return # Do nothing here to precent overlocking
      path = '/'.join(object_path)
      # Parse each message in registered
      for m in activity_tool.getRegisteredMessageList(self):
        if object_path == m.object_path and (method_id is None or method_id == m.method_id):
          if invoke: activity_tool.invoke(m)
          activity_tool.unregisterMessage(self, m)
      # Parse each message in SQL queue
      #LOG('Flush', 0, str((path, invoke, method_id)))
196
      result = readMessageList(path=path, method_id=method_id,processing_node=None)
197 198 199 200 201 202 203 204 205 206 207 208
      #LOG('Flush', 0, str(len(result)))
      method_dict = {}
      for line in result:
        path = line.path
        method_id = line.method_id
        if not method_dict.has_key(method_id):
          # Only invoke once (it would be different for a queue)
          method_dict[method_id] = 1
          m = self.loadMessage(line.message, uid = line.uid)
          self.deleteMessage(activity_tool, m)
          if invoke:
            # First Validate
209
            if m.validate(self, activity_tool) is VALID:
210 211 212 213 214 215
              activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
              if not m.is_executed:                                                 # Make sure message could be invoked
                # The message no longer exists
                raise ActivityFlushError, (
                    'Could not evaluate %s on %s' % (method_id , path))
            else:
216 217
              # The message no longer exists
              raise ActivityFlushError, (
218
                  'The document %s does not exist' % path)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
219

220 221 222
  # def start(self, activity_tool, active_process=None):
  #   uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process)
  #   activity_tool.SQLQueue_assignMessage(uid = uid_list, processing_node = DISTRIBUTABLE_STATE)
223

224 225 226
  # def stop(self, activity_tool, active_process=None):
  #   uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process)
  #   activity_tool.SQLQueue_assignMessage(uid = uid_list, processing_node = STOP_STATE)
227

228
  def getMessageList(self, activity_tool, processing_node=None,**kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
229
    message_list = []
230 231 232
    readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
    if readMessageList is not None:
      result = readMessageList(path=None, method_id=None, processing_node=None)
233 234 235 236 237
      for line in result:
        m = self.loadMessage(line.message)
        m.processing_node = line.processing_node
        m.priority = line.priority
        message_list.append(m)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
238 239
    return message_list

240 241 242
  def dumpMessageList(self, activity_tool):
    # Dump all messages in the table.
    message_list = []
243 244 245
    dumpMessageList = getattr(activity_tool, 'SQLQueue_dumpMessageList', None)
    if dumpMessageList is not None:
      result = dumpMessageList()
246 247 248 249
      for line in result:
        m = self.loadMessage(line.message, uid = line.uid)
        message_list.append(m)
    return message_list
250

251 252
  def distribute(self, activity_tool, node_count):
    processing_node = 1
253 254 255
    readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
    if readMessageList is not None:
      result = readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages
256 257 258
      #LOG('distribute count',0,str(len(result)) )
      #LOG('distribute count',0,str(map(lambda x:x.uid, result)))
      #get_transaction().commit() # Release locks before starting a potentially long calculation
259 260 261 262 263 264 265
      result = list(result)[0:100]
      for line in result:
        broadcast = line.broadcast
        uid = line.uid
        if broadcast:
          # Broadcast messages must be distributed into all nodes.
          activity_tool.SQLQueue_assignMessage(processing_node=1, uid=uid)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
266 267 268 269 270 271 272 273 274
          if node_count > 1:
            for node in range(2, node_count+1):
              activity_tool.SQLQueue_writeMessage( path = line.path,
                                                  method_id = line.method_id,
                                                  priority = line.priority,
                                                  broadcast = 1,
                                                  processing_node = node,
                                                  message = line.message,
                                                  date = line.date)
275 276 277 278 279 280 281
        else:
          #LOG("distribute", 0, "assign %s" % uid)
          activity_tool.SQLQueue_assignMessage(uid=uid, processing_node=processing_node)
          #get_transaction().commit() # Release locks immediately to allow processing of messages
          processing_node = processing_node + 1
          if processing_node > node_count:
            processing_node = 1 # Round robin
282

283 284 285
  # Validation private methods
  def _validate_after_method_id(self, activity_tool, message, value):
    # Count number of occurances of method_id
286 287 288
    #get_transaction().commit()
    if type(value) == type(''):
      value = [value]
289
    result = activity_tool.SQLQueue_validateMessageList(method_id=value, message_uid=None, path=None)
290 291
    #LOG('SQLQueue._validate_after_method_id, method_id',0,value)
    #LOG('SQLQueue._validate_after_method_id, result[0].uid_count',0,result[0].uid_count)
292 293 294
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID
295

296 297
  def _validate_after_path(self, activity_tool, message, value):
    # Count number of occurances of path
298 299
    if type(value) == type(''):
      value = [value]
300 301 302 303
    result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=None, path=value)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID
304

305 306 307 308 309 310 311
  def _validate_after_message_uid(self, activity_tool, message, value):
    # Count number of occurances of message_uid
    result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=value, path=None)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID

312 313 314 315 316 317 318 319 320 321 322 323 324 325 326
  def _validate_after_path_and_method_id(self, activity_tool, message, value):
    # Count number of occurances of method_id and path
    if (type(value) != type( (0,) ) and type(value) != type ([])) or len(value)<2:
      LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_path_and_method : %s' % repr(value))
      return VALID
    path = value[0]
    method = value[1]
    if type(path) == type(''):
      path = [path]
    if type(method) == type(''):
      method = [method]
    result = activity_tool.SQLQueue_validateMessageList(method_id=method, message_uid=None, path=path)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID
327

328 329 330 331 332 333 334 335
  def _validate_after_tag(self, activity_tool, message, value):
    # Count number of occurances of tag
    if type(value) == type(''):
      value = [value]
    result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=None, tag=value)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID
336

337 338 339 340 341 342 343 344 345 346 347 348 349 350 351
  def _validate_after_tag_and_method_id(self, activity_tool, message, value):
    # Count number of occurances of tag and method_id
    if (type(value) != type ( (0,) ) and type(value) != type([])) or len(value)<2:
      LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_tag_and_method_id : %s' % repr(value))
      return VALID
    tag = value[0]
    method = value[1]
    if type(tag) == type(''):
      tag = [tag]
    if type(method) == type(''):
      method = [method]
    result = activity_tool.SQLQueue_validateMessageList(method_id=method, message_uid=None, tag=tag)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID
352

353
  # Required for tests (time shift)
354
  def timeShift(self, activity_tool, delay, processing_node = None):
355 356 357 358
    """
      To simulate timeShift, we simply substract delay from
      all dates in SQLDict message table
    """
359
    activity_tool.SQLQueue_timeShift(delay = delay, processing_node = processing_node)
360

Jean-Paul Smets's avatar
Jean-Paul Smets committed
361
registerActivity(SQLQueue)