Commit e2a8b1ea authored by Rafael Monnerat's avatar Rafael Monnerat

Initial implementation of erp5.util.taskdistribution

Implement a common library to connect to ERP5 Task Distribution Tool, this should be
the central place for use this.
parent 09cb059a
taskdistribution
----------------
This library contains the API to use ERP5 Task Distribution as Client.
Basic Usage Example:
>>> from erp5.util.taskdistribution import TaskDistributionClient
>>> client = TaskDistributionClient("http://www.url.master.node.com/")
>>> print client.getProtocolRevision()
safeRpcCall called with method : getProtocolRevision
1
>>> client.isTaskAlive("test_result_module/20120425-725D3B5/290")
safeRpcCall called with method : isTaskAlive
0
Using RemoteLogger to keep updated the information on Task Distribution Module:
1) First you need a log function
>>> from erp5.util.log import DefaultLogger
>>> default_log = DefaultLogger("TestLogger")
>>> default_log.enableFileLog("/tmp/default_log")
2) Create the Remote Logger and invoke it
>>> logger = RemoteLogger(default_log, "/tmp/default_log", "ERP5-RAFAEL", None)
3) Update with Master URL and test_path
>>> logger.update("http://www.url.master.node.com/", "test_result_module/20120425-725D3B5")
4) Call it, and this will handle the update of the information for you, you can use
thread to monitor in parallel some process group.
>>> logger()
##############################################################################
#
# Copyright (c) 2011 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from datetime import datetime
import time
from erp5.util.taskdistribution import TaskDistributionClient
class RemoteLogger(object):
def __init__(self, log, log_file, test_node_title, process_manager=None):
self.portal = None
self.test_result_path = None
self.test_node_title = test_node_title
self.log = log
self.log_file = log_file
self.process_manager = process_manager
self.finish = False
self.quit = False
def update(self, portal, test_result_path):
self.portal = portal
self.test_result_path = test_result_path
def getSize(self):
erp5testnode_log = open(self.log_file, 'r')
try:
erp5testnode_log.seek(0, 2)
size = erp5testnode_log.tell()
finally:
erp5testnode_log.close()
return size
def __call__(self):
size = self.getSize()
while True:
for x in xrange(0,60):
if self.quit or self.finish:
break
time.sleep(1)
if self.quit:
return
finish = retry = self.finish
if self.test_result_path is None:
if finish:
return
continue
start_size = size
end_size = self.getSize()
# file was truncated
if end_size < start_size:
size = end_size
continue
# display some previous data
if start_size >= 5000:
start_size -= 5000
# do not send tons of log, only last logs
if (end_size-start_size >= 10000):
start_size = end_size - 10000
erp5testnode_log = open(self.log_file, 'r')
try:
erp5testnode_log.seek(start_size)
output = erp5testnode_log.read()
finally:
erp5testnode_log.close()
if end_size == size:
output += '%s : stucked ??' % datetime.now().strftime("%Y/%m/%d %H:%M:%S")
# check if the test result is still alive
task_distribution_client = TaskDistributionClient(self.portal, self.log)
is_alive = task_distribution_client.isTaskAlive(self.test_result_path, retry=False)
self.log('isTaskAlive result %r' % is_alive)
if is_alive is not None and is_alive == 0:
self.log('Test Result cancelled on server side, stop current test')
if self.process_manager is None:
self.log('Unable to invoke kill previous run on ProcessManager, ' +\
'self.process_manager is None.')
return
self.process_manager.killPreviousRun(cancellation=True)
return
status_dict = dict(command='erp5testnode', status_code=0,
stdout=''.join(output), stderr='')
task_distribution_client.reportTaskStatus(self.test_result_path, status_dict,
self.test_node_title, retry=retry)
size = end_size
if finish:
return
import socket
import sys
import xmlrpclib
import time
from erp5.util.log import DefaultLogger
def safeRpcCall(log, proxy, function_id, retry, *args):
# this method will try infinitive calls to backend
# this can cause testnode to looked "stalled"
retry_time = 64
while True:
try:
# it safer to pass proxy and function_id so we avoid httplib.ResponseNotReady
# by trying reconnect before server keep-alive ends and the socket closes
log('safeRpcCall called with method : %s' % function_id)
function = getattr(proxy, function_id)
return function(*args)
except (socket.error, xmlrpclib.ProtocolError, xmlrpclib.Fault), e:
log('Exception in safeRpcCall when trying %s with %r' % (function_id, args),
exc_info=sys.exc_info())
if not(retry):
return
log('will retry safeRpcCall in %i seconds' % retry_time)
time.sleep(retry_time)
retry_time += retry_time >> 1
class TaskDistributionClient(object):
def __init__(self, master_url, log=None):
"""
Create one xmlrpclib connection to an ERP5 which
contains the
"""
assert master_url is not None
if master_url[-1] != "/":
master_url += '/'
self.master_url = "%sportal_task_distribution" % master_url
if log is None:
log = DefaultLogger("TaskDistributionClient")
log.enableConsole()
self.log = log
def __safeRpcCall(self, function_id, retry, *kw):
"""
Invoke safeRpcCall using self.log
"""
master = self.__getConnection()
return safeRpcCall(self.log, master, function_id, retry, *kw)
def __getConnection(self):
"""
Create one xmlrpclib connection to an ERP5 which
contains the portal_task_distribution installed.
"""
return xmlrpclib.ServerProxy(self.master_url, allow_none=1)
def reportTaskFailure(self, test_result_path, status_dict, node_title, retry=False):
""" report failure when a node can not handle task"""
return self.__safeRpcCall("reportTaskFailure", retry,
test_result_path, status_dict, node_title)
def reportTaskStatus(self, test_result_path, status_dict, node_title, retry=False):
""" report task current status """
return self.__safeRpcCall("reportTaskStatus", retry,
test_result_path, status_dict, node_title)
def createTestResult(self, name, revision, test_name_list, allow_restart,
test_title=None, node_title=None,
project_title=None, retry=False):
""" Request the creation of a new Test Result """
return self.__safeRpcCall("createTestResult", retry,
name, revision, test_name_list, allow_restart,
test_title, node_title, project_title)
def getProtocolRevision(self, retry=False):
""" Get Protocol Revision """
return self.__safeRpcCall("getProtocolRevision", retry)
def startUnitTest(self, test_result_path, exclude_list=(), retry=False):
""" Report the Unit Test Start """
return self.__safeRpcCall("startUnitTest", retry,
test_result_path, exclude_list)
def stopUnitTest(self, test_path, status_dict, retry=False):
""" Report the Unit Test Stop """
return self.__safeRpcCall("stopUnitTest", retry, test_path, status_dict)
def isTaskAlive(self, test_path, retry=False):
""" Verify if Task still alive """
return self.__safeRpcCall("isTaskAlive", retry, test_path)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment