TaskDistributionTool.py 11.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
##############################################################################
#
# Copyright (c) 2009 Nexedi SA and Contributors. All Rights Reserved.
#                    Julien Muchembled <jm@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################

import random
from AccessControl import ClassSecurityInfo
from Products.ERP5Type import Permissions, PropertySheet, Constraint, interfaces
from Products.ERP5Type.Tool.BaseTool import BaseTool
from zLOG import LOG
from xmlrpclib import Binary

class TaskDistributionTool(BaseTool):
  """
  A Task distribution tool (used for ERP5 unit test runs).
  """

  id = 'portal_task_distribution'
  meta_type = 'ERP5 Task Distribution Tool'
  portal_type = 'Task Distribution Tool'
  allowed_types = ()

  security = ClassSecurityInfo()
  security.declareObjectProtected(Permissions.AccessContentsInformation)

  security.declarePublic('getProtocolRevision')
  def getProtocolRevision(self):
    """
    """
    return 1

  def _getTestResultNode(self, test_result, node_title):
    node_list = [x for x in test_result.objectValues(
       portal_type='Test Result Node') if x.getTitle() == node_title]
    node_list_len = len(node_list)
    assert node_list_len in (0, 1)
    node = None
    if len(node_list):
      node = node_list[0]
    return node

  security.declarePublic('createTestResult')
  def createTestResult(self, name, revision, test_name_list, allow_restart,
                       test_title=None, node_title=None, project_title=None):
    """(temporary)
      - name (string)
      - revision (string representation of an integer)
      - test_name_list (list of strings)
      - allow_restart (boolean)

      XXX 'revision' should be a string representing the full revision
          of the tested code, because some projects are tested with different
          revisions of ERP5.

      -> (test_result_path, revision) or None if already completed
    """
    LOG('createTestResult', 0, (name, revision, test_title, project_title))
    portal = self.getPortalObject()
    if test_title is None:
      test_title = name
    def createNode(test_result, node_title):
      if node_title is not None:
        node = self._getTestResultNode(test_result, node_title)
        if node is None:
          node = test_result.newContent(portal_type='Test Result Node',
                                 title=node_title)
          node.start()
91 92
    def createTestResultLineList(test_result, test_name_list):
      duration_list = []
93 94 95 96 97 98 99 100 101
      previous_test_result_list = portal.test_result_module.searchFolder(
             title='=%s' % test_result.getTitle(),
             sort_on=[('creation_date','descending')],
             simulation_state='stopped',
             limit=1)
      if len(previous_test_result_list):
        previous_test_result = previous_test_result_list[0].getObject()
        for line in previous_test_result.objectValues():
          if line.getSimulationState() == 'stopped':
102 103 104
            duration_list.append((line.getTitle(),line.getProperty('duration')))
      duration_list.sort(key=lambda x: -x[1])
      sorted_test_list = [x[0] for x in duration_list]
105
      for test_name in test_name_list:
106 107 108 109 110 111
        index = 0
        if sorted_test_list:
          try:
            index = sorted_test_list.index(test_name)
          except ValueError:
            pass
112
        line = test_result.newContent(portal_type='Test Result Line',
113 114
                                      title=test_name,
                                      int_index=index)
115 116 117 118 119 120 121 122 123 124
    reference_list_string = None
    if type(revision) is str and '=' in revision:
      reference_list_string = revision
      int_index, reference = None, revision
    elif type(revision) is str:
      # backward compatibility
      int_index, reference = revision, None
    else:
      # backward compatibility
      int_index, reference = revision
125 126
    result_list = portal.test_result_module.searchFolder(
                         portal_type="Test Result",
127
                         title="=%s" % test_title,
128 129 130 131
                         sort_on=(("creation_date","descending"),),
                         limit=1)
    if result_list:
      test_result = result_list[0].getObject()
132
      if test_result is not None:
133 134 135 136 137 138 139 140 141
        last_state = test_result.getSimulationState()
        last_revision = str(test_result.getIntIndex())
        if last_state == 'started':
          createNode(test_result, node_title)
          reference = test_result.getReference()
          if reference_list_string:
            last_revision = reference
          elif reference:
            last_revision = last_revision, reference
142 143
          if len(test_result.objectValues(portal_type="Test Result Line")) == 0 \
              and len(test_name_list):
144
            test_result.serialize() # prevent duplicate test result lines
145
            createTestResultLineList(test_result, test_name_list)
146
          return test_result.getRelativeUrl(), last_revision
147
        if last_state in ('stopped',):
148
          if reference_list_string is not None:
149 150
            if reference_list_string == test_result.getReference() \
                and not allow_restart:
151 152 153 154 155 156 157
              return
          elif last_revision == int_index and not allow_restart:
            return
    test_result = portal.test_result_module.newContent(
      portal_type='Test Result',
      title=test_title,
      reference=reference,
158
      is_indexable=False)
159
    if int_index is not None:
160
      test_result._setIntIndex(int_index)
161 162 163
    if project_title is not None:
      project_list = portal.portal_catalog(portal_type='Project',
                                           title='="%s"' % project_title)
164
      if len(project_list) != 1:
165 166
        raise ValueError('found this list of project : %r for title %r' % \
                      ([x.path for x in project_list], project_title))
167
      test_result._setSourceProjectValue(project_list[0].getObject())
168 169
    test_result.updateLocalRolesOnSecurityGroups() # XXX
    test_result.start()
170 171 172 173
    del test_result.isIndexable
    test_result.immediateReindexObject()
    self.serialize() # prevent duplicate test result
    # following 2 functions only call 'newContent' on test_result
174
    createTestResultLineList(test_result, test_name_list)
175
    createNode(test_result, node_title)
176
    return test_result.getRelativeUrl(), revision
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191

  security.declarePublic('startUnitTest')
  def startUnitTest(self, test_result_path, exclude_list=()):
    """(temporary)
      - test_result_path (string)
      - exclude_list (list of strings)

      -> test_path (string), test_name (string)
         or None if finished
    """
    portal = self.getPortalObject()
    test_result = portal.restrictedTraverse(test_result_path)
    if test_result.getSimulationState() != 'started':
      return
    started_list = []
192 193
    for line in test_result.objectValues(portal_type="Test Result Line",
                                         sort_on=[("int_index","ascending")]):
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
      test = line.getTitle()
      if test not in exclude_list:
        state = line.getSimulationState()
        test = line.getRelativeUrl(), test
        if state == 'draft':
          line.start()
          return test
        # XXX Make sure we finish all tests.
        if state == 'started':
          started_list.append(test)
    if started_list:
      return random.choice(started_list)

  security.declarePublic('stopUnitTest')
  def stopUnitTest(self, test_path, status_dict):
    """(temporary)
      - test_path (string)
      - status_dict (dict)
    """
    status_dict = self._extractXMLRPCDict(status_dict)
    LOG("TaskDistributionTool.stopUnitTest", 0, repr((test_path,status_dict)))
    portal = self.getPortalObject()
    line = portal.restrictedTraverse(test_path)
    test_result = line.getParentValue()
    if test_result.getSimulationState() == 'started':
219
      if line.getSimulationState() == "started":
220
        line.stop(**status_dict)
221 222 223
      if set([x.getSimulationState() for x in test_result.objectValues(
                portal_type="Test Result Line")]) == set(["stopped"]):
        test_result.stop()
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247

  def _extractXMLRPCDict(self, xmlrpc_dict):
    """
    extract all xmlrpclib.Binary instance
    """
    return dict([(x,isinstance(y, Binary) and y.data or y) \
       for (x, y) in xmlrpc_dict.iteritems()])

  security.declarePublic('reportTaskFailure')
  def reportTaskFailure(self, test_result_path, status_dict, node_title):
    """report failure when a node can not handle task
    """
    status_dict = self._extractXMLRPCDict(status_dict)
    LOG("TaskDistributionTool.reportTaskFailure", 0, repr((test_result_path,
                                                          status_dict)))
    portal = self.getPortalObject()
    test_result = portal.restrictedTraverse(test_result_path)
    node = self._getTestResultNode(test_result, node_title)
    assert node is not None
    node.fail(**status_dict)
    for node in test_result.objectValues(portal_type='Test Result Node'):
      if node.getSimulationState() != 'failed':
        break
    else:
248 249
      if test_result.getSimulationState() not in ('failed', 'cancelled'):
        test_result.fail()
250 251 252 253 254 255 256 257 258 259 260 261 262 263

  security.declarePublic('reportTaskStatus')
  def reportTaskStatus(self, test_result_path, status_dict, node_title):
    """report status of node
    """
    status_dict = self._extractXMLRPCDict(status_dict)
    LOG("TaskDistributionTool.reportTaskStatus", 0, repr((test_result_path,
                                                          status_dict)))
    portal = self.getPortalObject()
    test_result = portal.restrictedTraverse(test_result_path)
    node = self._getTestResultNode(test_result, node_title)
    assert node is not None
    node.edit(cmdline=status_dict['command'],
              stdout=status_dict['stdout'], stderr=status_dict['stderr'])
264 265 266 267 268 269 270 271 272

  security.declarePublic('isTaskAlive')
  def isTaskAlive(self, test_result_path):
    """check status of a test suite
    """
    LOG("TaskDistributionTool.checkTaskStatus", 0, repr(test_result_path))
    portal = self.getPortalObject()
    test_result = portal.restrictedTraverse(test_result_path)
    return test_result.getSimulationState() == "started" and 1 or 0
273 274 275 276 277 278 279 280 281

  security.declareObjectProtected(Permissions.AccessContentsInformation)
  def getMemcachedDict(self):
    """ Return a dictionary used for non persistent data related to distribution
    """
    portal = self.getPortalObject()
    memcached_dict = portal.portal_memcached.getMemcachedDict(
                            "task_distribution", "default_memcached_plugin")
    return memcached_dict