Commit c791efc3 authored by Sebastien Robin

improve testnode for more reliability and better logging

- display the output of all commands launched by erp5testnode in
  the erp5testnode log
- add a thread to regularly upload ongoing logs to the master
- if the software release is not built successfully after a few
  tries, erase the software completely; this helps unblock the
  node when buildout is unable to update the software
- check whether the last test result was cancelled, in order to
  allow relaunching tests without restarting the testnode
- improve logs to give a much better understanding of what is
  going on
parent b6921405
##############################################################################
#
# Copyright (c) 2011 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import os
import re
import subprocess
import threading
import signal
import sys

class SubprocessError(EnvironmentError):
  def __init__(self, status_dict):
    self.status_dict = status_dict

  def __getattr__(self, name):
    return self.status_dict[name]

  def __str__(self):
    return 'Error %i' % self.status_code

_format_command_search = re.compile("[[\\s $({?*\\`#~';<>&|]").search
_format_command_escape = lambda s: "'%s'" % r"'\''".join(s.split("'"))

def format_command(*args, **kw):
  cmdline = []
  for k, v in sorted(kw.items()):
    if _format_command_search(v):
      v = _format_command_escape(v)
    cmdline.append('%s=%s' % (k, v))
  for v in args:
    if _format_command_search(v):
      v = _format_command_escape(v)
    cmdline.append(v)
  return ' '.join(cmdline)
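
# Example (hypothetical values, not in the original file): arguments that
# contain shell metacharacters are single-quoted, so the logged command
# line can be pasted back into a shell; keyword arguments are emitted
# first, as environment-style assignments:
#   format_command('git', 'commit', '-m', 'fix log output')
#     -> git commit -m 'fix log output'
#   format_command('make', MAKEFLAGS='-j4 -l2')
#     -> MAKEFLAGS='-j4 -l2' make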
def subprocess_capture(p, log, log_prefix, get_output=True):
  def readerthread(input, output, buffer):
    while True:
      data = input.readline()
      if not data:
        break
      if get_output:
        buffer.append(data)
      if log_prefix:
        data = "%s : " % log_prefix + data
      data = data.rstrip('\n')
      output(data)
  if p.stdout:
    stdout = []
    stdout_thread = threading.Thread(target=readerthread,
                                     args=(p.stdout, log, stdout))
    stdout_thread.daemon = True
    stdout_thread.start()
  if p.stderr:
    stderr = []
    stderr_thread = threading.Thread(target=readerthread,
                                     args=(p.stderr, log, stderr))
    stderr_thread.daemon = True
    stderr_thread.start()
  p.wait()
  if p.stdout:
    stdout_thread.join()
  if p.stderr:
    stderr_thread.join()
  return (p.stdout and ''.join(stdout),
          p.stderr and ''.join(stderr))
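
# Usage sketch (hypothetical): stream a command's output to the testnode
# log while also capturing it; each line is prefixed so interleaved output
# from several commands stays readable:
#   p = subprocess.Popen(['git', 'status'], stdout=subprocess.PIPE,
#                        stderr=subprocess.PIPE)
#   stdout, stderr = subprocess_capture(p, log, 'git')
#   # every line shows up in the log as "git : <line>"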
class ProcessManager(object):

  stdin = file(os.devnull)

  def __init__(self, log, *args, **kw):
    self.log = log
    self.process_pid_set = set()
    signal.signal(signal.SIGTERM, self.sigterm_handler)

  def spawn(self, *args, **kw):
    get_output = kw.pop('get_output', True)
    log_prefix = kw.pop('log_prefix', '')
    new_session = kw.pop('new_session', False)
    subprocess_kw = {}
    cwd = kw.pop('cwd', None)
    if cwd:
      subprocess_kw['cwd'] = cwd
    if new_session:
      subprocess_kw['preexec_fn'] = os.setsid
    raise_error_if_fail = kw.pop('raise_error_if_fail', True)
    env = kw and dict(os.environ, **kw) or None
    command = format_command(*args, **kw)
    self.log('subprocess_kw : %r' % (subprocess_kw,))
    self.log('$ ' + command)
    sys.stdout.flush()
    p = subprocess.Popen(args, stdin=self.stdin, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, env=env, **subprocess_kw)
    self.process_pid_set.add(p.pid)
    stdout, stderr = subprocess_capture(p, self.log, log_prefix,
                                        get_output=get_output)
    result = dict(status_code=p.returncode, command=command,
                  stdout=stdout, stderr=stderr)
    self.process_pid_set.remove(p.pid)
    if raise_error_if_fail and p.returncode:
      raise SubprocessError(result)
    return result

  def killPreviousRun(self):
    self.log('ProcessManager killPreviousRun, going to kill %r' % (self.process_pid_set,))
    for pgpid in self.process_pid_set:
      try:
        os.kill(pgpid, signal.SIGTERM)
      except:
        pass
    try:
      if os.path.exists(self.supervisord_pid_file):
        supervisor_pid = int(open(self.supervisord_pid_file).read().strip())
        self.log('ProcessManager killPreviousRun, going to kill supervisor with pid %r' % supervisor_pid)
        os.kill(supervisor_pid, signal.SIGTERM)
    except:
      self.log('ProcessManager killPreviousRun, exception when killing supervisor')
    self.process_pid_set.clear()

  def sigterm_handler(self, signal, frame):
    self.log('SIGTERM_HANDLER')
    sys.exit(1)
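
A minimal usage sketch of the new helper (assuming a logging callable; the
repository path is illustrative):

  import logging
  logging.basicConfig(level=logging.INFO)
  pm = ProcessManager(logging.getLogger('erp5testnode').info)
  # output is streamed to the log with a "git : " prefix, and the pid is
  # tracked in pm.process_pid_set so killPreviousRun() can clean up later
  result = pm.spawn('git', 'status', cwd='/srv/testnode/repo',
                    log_prefix='git', raise_error_if_fail=False)
  print result['status_code'], result['command']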
@@ -36,10 +36,12 @@ MAX_SR_RETRIES = 3

 class SlapOSControler(object):

-  def __init__(self, config, log, process_group_pid_set=None,
-               slapproxy_log=None):
+  def __init__(self, config, log,
+               slapproxy_log=None, process_manager=None, reset_software=False):
+    log('SlapOSControler, initialize, reset_software: %r' % reset_software)
     self.log = log
     self.config = config
+    self.process_manager = process_manager
     # By erasing everything, we make sure that we are able to "update"
     # existing profiles. This is quite a dirty way to do updates...
     if os.path.exists(config['proxy_database']):
@@ -51,7 +53,7 @@ class SlapOSControler(object):
       kwargs['stderr'] = slapproxy_log_fp
     proxy = subprocess.Popen([config['slapproxy_binary'],
       config['slapos_config']], **kwargs)
-    process_group_pid_set.add(proxy.pid)
+    process_manager.process_pid_set.add(proxy.pid)
    # XXX: dirty, giving some time for the proxy to be able to accept
     # connections
     time.sleep(10)
@@ -63,6 +65,14 @@ class SlapOSControler(object):
                             self.software_profile,
                             computer_guid=config['computer_id'])
     computer = slap.registerComputer(config['computer_id'])
+    # Reset all previously generated software if needed
+    if reset_software:
+      software_root = config['software_root']
+      log('SlapOSControler : GOING TO RESET ALL SOFTWARE')
+      if os.path.exists(software_root):
+        shutil.rmtree(software_root)
+      os.mkdir(software_root)
+      os.chmod(software_root, 0750)
     for i in range(0, MAX_PARTIONS):
       # create partition and configure computer
       # XXX: at the moment all partitions do share same virtual interface address
@@ -91,37 +101,23 @@ class SlapOSControler(object):
                        'reference': config['computer_id'],
                        'software_root': config['software_root']}))

-  def runSoftwareRelease(self, config, environment, process_group_pid_set=None,
-                         stdout=None, stderr=None):
+  def spawn(self, *args, **kw):
+    return self.process_manager.spawn(*args, **kw)
+
+  def runSoftwareRelease(self, config, environment):
     self.log("SlapOSControler.runSoftwareRelease")
     cpu_count = os.sysconf("SC_NPROCESSORS_ONLN")
     os.putenv('MAKEFLAGS', '-j%s' % cpu_count)
     os.environ['PATH'] = environment['PATH']
-    command = [config['slapgrid_software_binary'], '-v', '-c',
-               #'--buildout-parameter',"'-U -N' -o",
-               config['slapos_config']]
     # a SR may fail for a number of reasons (incl. network failures)
     # so be tolerant and run it a few times before giving up
     for runs in range(0, MAX_SR_RETRIES):
-      slapgrid = subprocess.Popen(command,
-        stdout=stdout, stderr=stderr,
-        close_fds=True, preexec_fn=os.setsid)
-      process_group_pid_set.add(slapgrid.pid)
-      slapgrid.wait()
-      stdout.seek(0)
-      stderr.seek(0)
-      process_group_pid_set.remove(slapgrid.pid)
-      status_dict = {'status_code':slapgrid.returncode,
-                     'command': repr(command),
-                     'stdout':stdout.read(),
-                     'stderr':stderr.read()}
-      stdout.close()
-      stderr.close()
+      status_dict = self.spawn(config['slapgrid_software_binary'], '-v', '-c',
                               config['slapos_config'], raise_error_if_fail=False,
+                               log_prefix='slapgrid_sr', get_output=False)
     return status_dict

   def runComputerPartition(self, config, environment,
-                           process_group_pid_set=None,
                            stdout=None, stderr=None):
     self.log("SlapOSControler.runComputerPartition")
     slap = slapos.slap.slap()
@@ -130,26 +126,12 @@ class SlapOSControler(object):
     slap.registerOpenOrder().request(self.software_profile,
       partition_reference='testing partition',
       partition_parameter_kw=config['instance_dict'])
-    command = [config['slapgrid_partition_binary'],
-               config['slapos_config'], '-c', '-v']
     # try to run for all partitions as one partition may in theory request another one
     # this is not always required but currently there is no way to know how the "tree"
     # of partitions may "expand"
     for runs in range(0, MAX_PARTIONS):
-      slapgrid = subprocess.Popen(command,
-        stdout=stdout, stderr=stderr,
-        close_fds=True, preexec_fn=os.setsid)
-      process_group_pid_set.add(slapgrid.pid)
-      slapgrid.wait()
-      process_group_pid_set.remove(slapgrid.pid)
-      stdout.seek(0)
-      stderr.seek(0)
-      status_dict = {'status_code':slapgrid.returncode,
-                     'command': repr(command),
-                     'stdout':stdout.read(),
-                     'stderr':stderr.read()}
-      stdout.close()
-      stderr.close()
+      status_dict = self.spawn(config['slapgrid_partition_binary'], '-v', '-c',
+                               config['slapos_config'], raise_error_if_fail=False,
+                               log_prefix='slapgrid_cp', get_output=False)
     return status_dict
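
The reset_software flag is computed by the caller; in this commit the
testnode passes it on every tenth consecutive software-build failure (see
testnode.py below). A hedged sketch of that policy in isolation, with a
hypothetical build_software() helper standing in for the controler call:

  retry_software_count = 0
  for attempt in range(30):  # illustrative loop, not in the commit
    reset = retry_software_count > 0 and retry_software_count % 10 == 0
    # reset is True on the 10th, 20th, ... consecutive failure, which
    # makes SlapOSControler erase software_root and rebuild from scratch
    if build_software(reset):  # hypothetical helper
      retry_software_count = 0
      break
    retry_software_count += 1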
@@ -32,69 +32,29 @@ import sys
 import threading

 from testnode import SubprocessError
+from ProcessManager import ProcessManager

 SVN_UP_REV = re.compile(r'^(?:At|Updated to) revision (\d+).$')
 SVN_CHANGED_REV = re.compile(r'^Last Changed Rev.*:\s*(\d+)', re.MULTILINE)

-_format_command_search = re.compile("[[\\s $({?*\\`#~';<>&|]").search
-_format_command_escape = lambda s: "'%s'" % r"'\''".join(s.split("'"))
-
-def format_command(*args, **kw):
-  cmdline = []
-  for k, v in sorted(kw.items()):
-    if _format_command_search(v):
-      v = _format_command_escape(v)
-    cmdline.append('%s=%s' % (k, v))
-  for v in args:
-    if _format_command_search(v):
-      v = _format_command_escape(v)
-    cmdline.append(v)
-  return ' '.join(cmdline)
-
-def subprocess_capture(p, quiet=False):
-  def readerthread(input, output, buffer):
-    while True:
-      data = input.readline()
-      if not data:
-        break
-      output(data)
-      buffer.append(data)
-  if p.stdout:
-    stdout = []
-    output = quiet and (lambda data: None) or sys.stdout.write
-    stdout_thread = threading.Thread(target=readerthread,
-                                     args=(p.stdout, output, stdout))
-    stdout_thread.setDaemon(True)
-    stdout_thread.start()
-  if p.stderr:
-    stderr = []
-    stderr_thread = threading.Thread(target=readerthread,
-                                     args=(p.stderr, sys.stderr.write, stderr))
-    stderr_thread.setDaemon(True)
-    stderr_thread.start()
-  if p.stdout:
-    stdout_thread.join()
-  if p.stderr:
-    stderr_thread.join()
-  p.wait()
-  return (p.stdout and ''.join(stdout),
-          p.stderr and ''.join(stderr))
-
 GIT_TYPE = 'git'
 SVN_TYPE = 'svn'

-class Updater(object):
+class Updater(ProcessManager):

   _git_cache = {}
   stdin = file(os.devnull)

   def __init__(self, repository_path, log, revision=None, git_binary=None,
-               realtime_output=True):
+               realtime_output=True, process_manager=None):
     self.log = log
     self.revision = revision
     self._path_list = []
     self.repository_path = repository_path
     self.git_binary = git_binary
     self.realtime_output = realtime_output
+    self.process_manager = process_manager

   def getRepositoryPath(self):
     return self.repository_path
@@ -128,25 +88,10 @@ class Updater(object):
       raise

   def spawn(self, *args, **kw):
-    quiet = kw.pop('quiet', False)
-    env = kw and dict(os.environ, **kw) or None
-    command = format_command(*args, **kw)
-    self.log('$ ' + command)
-    sys.stdout.flush()
-    p = subprocess.Popen(args, stdin=self.stdin, stdout=subprocess.PIPE,
-                         stderr=subprocess.PIPE, env=env,
-                         cwd=self.getRepositoryPath())
-    if self.realtime_output:
-      stdout, stderr = subprocess_capture(p, quiet)
-    else:
-      stdout, stderr = p.communicate()
-      self.log(stdout)
-      self.log(stderr)
-    result = dict(status_code=p.returncode, command=command,
-                  stdout=stdout, stderr=stderr)
-    if p.returncode:
-      raise SubprocessError(result)
-    return result
+    return self.process_manager.spawn(*args,
+                                      log_prefix='git',
+                                      cwd=self.getRepositoryPath(),
+                                      **kw)

   def _git(self, *args, **kw):
     return self.spawn(self.git_binary, *args, **kw)['stdout'].strip()
...
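
With the duplicated helpers gone, Updater delegates process control to the
shared ProcessManager; a usage sketch (assuming a log callable as above,
repository path illustrative):

  pm = ProcessManager(log)
  updater = Updater('/srv/testnode/erp5', log=log, git_binary='git',
                    process_manager=pm)
  updater.checkout()  # git commands now run through pm.spawn with cwd set
  revision = "-".join(updater.getRevision())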
@@ -30,7 +30,7 @@ import logging
 import os
 import pkg_resources

-import testnode
+from testnode import TestNode

 CONFIG = {
   'computer_id': 'COMPUTER',
@@ -48,15 +48,21 @@ def main(*args):
     parsed_argument = parser.parse_args(list(args))
   else:
     parsed_argument = parser.parse_args()
+  logger_format = '%(asctime)s %(name)-13s: %(levelname)-8s %(message)s'
+  formatter = logging.Formatter(logger_format)
+  logging.basicConfig(level=logging.INFO,
+                      format=logger_format)
   logger = logging.getLogger('erp5testnode')
   if parsed_argument.console or parsed_argument.logfile:
-    logger.setLevel(logging.INFO)
     if parsed_argument.console:
       logger.addHandler(logging.StreamHandler())
       logger.info('Activated console output.')
     if parsed_argument.logfile:
-      logger.addHandler(logging.FileHandler(filename=parsed_argument.logfile))
+      file_handler = logging.FileHandler(filename=parsed_argument.logfile)
+      file_handler.setFormatter(formatter)
+      logger.addHandler(file_handler)
       logger.info('Activated logfile %r output' % parsed_argument.logfile)
+      CONFIG['log_file'] = parsed_argument.logfile
   else:
     logger.addHandler(logging.NullHandler())
   CONFIG['logger'] = logger.info
@@ -118,4 +124,5 @@ def main(*args):
   else:
     instance_dict = {}
   CONFIG['instance_dict'] = instance_dict
-  testnode.run(CONFIG)
+  testnode = TestNode(logger.info, CONFIG)
+  testnode.run()
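
With the shared formatter, every logfile record now carries a timestamp and
level; an illustrative line (hypothetical time and path):

  2011-11-07 10:02:45,123 erp5testnode : INFO     Activated logfile '/var/log/erp5testnode.log' output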
@@ -24,8 +24,8 @@
 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 #
 ##############################################################################
+from datetime import datetime
 import os
-import pprint
 import signal
 import socket
 import subprocess
@@ -34,121 +34,155 @@ import time
 import xmlrpclib
 import glob
 import SlapOSControler
-import logging
+import threading

-DEFAULT_SLEEP_TIMEOUT = 120 # time in seconds to sleep
-
-class SubprocessError(EnvironmentError):
-  def __init__(self, status_dict):
-    self.status_dict = status_dict
-
-  def __getattr__(self, name):
-    return self.status_dict[name]
-
-  def __str__(self):
-    return 'Error %i' % self.status_code
-
+from ProcessManager import SubprocessError, ProcessManager
 from Updater import Updater

-supervisord_pid_file = None
-process_group_pid_set = set()
-
-def sigterm_handler(signal, frame):
-  for pgpid in process_group_pid_set:
-    try:
-      os.killpg(pgpid, signal.SIGTERM)
-    except:
-      pass
-  sys.exit(1)
-signal.signal(signal.SIGTERM, sigterm_handler)
+DEFAULT_SLEEP_TIMEOUT = 120 # time in seconds to sleep
+
+supervisord_pid_file = None
-def safeRpcCall(proxy, function_id, *args):
-  # XXX: this method will try infinite calls to the backend
-  # this can cause testnode to look "stalled"
-  retry = 64
+def safeRpcCall(log, proxy, function_id, retry, *args):
+  # this method will try infinite calls to the backend
+  # this can cause testnode to look "stalled"
+  retry_time = 64
   while True:
     try:
       # it is safer to pass proxy and function_id so we avoid httplib.ResponseNotReady
       # by trying to reconnect before server keep-alive ends and the socket closes
+      log('safeRpcCall called with method : %s' % function_id)
       function = getattr(proxy, function_id)
       return function(*args)
     except (socket.error, xmlrpclib.ProtocolError, xmlrpclib.Fault), e:
-      logging.warning(e)
-      pprint.pprint(args, file(function._Method__name, 'w'))
-      time.sleep(retry)
-      retry += retry >> 1
+      log('Exception in safeRpcCall when trying %s with %r' % (function_id, args),
+          exc_info=sys.exc_info())
+      if not(retry):
+        return
+      log('will retry safeRpcCall in %i seconds' % retry_time)
+      time.sleep(retry_time)
+      retry_time += retry_time >> 1
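
Since retry_time += retry_time >> 1 grows the delay by half each round, a
caller passing retry=True waits 64, 96, 144, 216, 324, ... seconds between
attempts, while a caller passing retry=False (as the log uploader below
does while a test is still running) gives up after the first failure.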
-def getInputOutputFileList(config, command_name):
-  stdout = open(os.path.join(
-                config['log_directory'],'%s_out' % command_name),
-                'w+')
-  stdout.write("%s\n" % command_name)
-  stderr = open(os.path.join(
-                config['log_directory'],'%s_err' % command_name),
-                'w+')
-  return (stdout, stderr)
-
-slapos_controler = None
-
-def killPreviousRun():
-  for pgpid in process_group_pid_set:
-    try:
-      os.killpg(pgpid, signal.SIGTERM)
-    except:
-      pass
-  try:
-    if os.path.exists(supervisord_pid_file):
-      os.kill(int(open(supervisord_pid_file).read().strip()), signal.SIGTERM)
-  except:
-    pass
+class RemoteLogger(object):
+
+  def __init__(self, log, log_file, test_node_title):
+    self.portal = None
+    self.test_result_path = None
+    self.test_node_title = test_node_title
+    self.log = log
+    self.log_file = log_file
+    self.finish = False
+    self.quit = False
+
+  def update(self, portal, test_result_path):
+    self.portal = portal
+    self.test_result_path = test_result_path
+
+  def getSize(self):
+    erp5testnode_log = open(self.log_file, 'r')
+    erp5testnode_log.seek(0, 2)
+    size = erp5testnode_log.tell()
+    erp5testnode_log.close()
+    return size
+
+  def __call__(self):
+    size = self.getSize()
+    while True:
+      for x in xrange(0, 60):
+        if self.quit or self.finish:
+          break
+        time.sleep(1)
+      if self.quit:
+        return
+      finish = retry = self.finish
+      if self.test_result_path is None:
+        if finish:
+          return
+        continue
+      start_size = size
+      end_size = self.getSize()
+      # file was truncated
+      if end_size < start_size:
+        size = end_size
+        continue
+      # display some previous data
+      if start_size >= 5000:
+        start_size -= 5000
+      # do not send tons of log, only last logs
+      if (end_size - start_size >= 10000):
+        start_size = end_size - 10000
+      erp5testnode_log = open(self.log_file, 'r')
+      erp5testnode_log.seek(start_size)
+      output = erp5testnode_log.read()
+      erp5testnode_log.close()
+      if end_size == size:
+        output += '%s : stucked ??' % datetime.now().strftime("%Y/%m/%d %H:%M:%S")
+      status_dict = dict(command='erp5testnode', status_code=0,
+                         stdout=''.join(output), stderr='')
+      safeRpcCall(self.log, self.portal, "reportTaskStatus", retry,
+                  self.test_result_path, status_dict, self.test_node_title)
+      size = end_size
+      if finish:
+        return
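
The uploader runs as a plain thread; a condensed sketch of the wiring that
the new TestNode code below performs (portal and test_result_path come from
the master, here set via update()):

  remote_logger = RemoteLogger(log, config['log_file'], config['test_node_title'])
  remote_logger.update(portal, test_result_path)
  remote_logger_thread = threading.Thread(target=remote_logger)
  remote_logger_thread.start()
  # ... run the test suite ...
  remote_logger.quit = True  # or .finish = True to flush logs once more
  remote_logger_thread.join()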
 PROFILE_PATH_KEY = 'profile_path'

-def run(config):
-  log = config['logger']
-  slapgrid = None
-  global supervisord_pid_file
-  supervisord_pid_file = os.path.join(config['instance_root'], 'var', 'run',
-                                      'supervisord.pid')
-  previous_revision = None
-  run_software = True
-  # Write our own software.cfg to use the local repository
-  custom_profile_path = os.path.join(config['working_directory'], 'software.cfg')
-  config['custom_profile_path'] = custom_profile_path
-  vcs_repository_list = config['vcs_repository_list']
-  profile_content = ''
-  assert len(vcs_repository_list), "we must have at least one repository"
-  try:
-    # BBB: Accept global profile_path, which is the same as setting it for the
-    # first configured repository.
-    profile_path = config.pop(PROFILE_PATH_KEY)
-  except KeyError:
-    pass
-  else:
-    vcs_repository_list[0][PROFILE_PATH_KEY] = profile_path
-  profile_path_count = 0
-  for vcs_repository in vcs_repository_list:
-    url = vcs_repository['url']
-    buildout_section_id = vcs_repository.get('buildout_section_id', None)
-    repository_id = buildout_section_id or \
-                    url.split('/')[-1].split('.')[0]
-    repository_path = os.path.join(config['working_directory'],repository_id)
-    vcs_repository['repository_id'] = repository_id
-    vcs_repository['repository_path'] = repository_path
-    try:
-      profile_path = vcs_repository[PROFILE_PATH_KEY]
-    except KeyError:
-      pass
-    else:
-      profile_path_count += 1
-      if profile_path_count > 1:
-        raise ValueError(PROFILE_PATH_KEY + ' defined more than once')
-      profile_content = """
+class TestNode(object):
+
+  def __init__(self, log, config):
+    self.log = log
+    self.config = config
+    self.process_manager = ProcessManager(log)
+    self.process_manager.supervisord_pid_file = os.path.join(config['instance_root'], 'var', 'run',
+                              'supervisord.pid')
+
+  def run(self):
+    log = self.log
+    process_manager = self.process_manager
+    config = self.config
+    slapgrid = None
+    previous_revision = None
+
+    run_software = True
+    # Write our own software.cfg to use the local repository
+    custom_profile_path = os.path.join(config['working_directory'], 'software.cfg')
+    config['custom_profile_path'] = custom_profile_path
+    vcs_repository_list = config['vcs_repository_list']
+    profile_content = ''
+    assert len(vcs_repository_list), "we must have at least one repository"
+    try:
+      # BBB: Accept global profile_path, which is the same as setting it for the
+      # first configured repository.
+      profile_path = config.pop(PROFILE_PATH_KEY)
+    except KeyError:
+      pass
+    else:
+      vcs_repository_list[0][PROFILE_PATH_KEY] = profile_path
+    profile_path_count = 0
+    for vcs_repository in vcs_repository_list:
+      url = vcs_repository['url']
+      buildout_section_id = vcs_repository.get('buildout_section_id', None)
+      repository_id = buildout_section_id or \
+                      url.split('/')[-1].split('.')[0]
+      repository_path = os.path.join(config['working_directory'],repository_id)
+      vcs_repository['repository_id'] = repository_id
+      vcs_repository['repository_path'] = repository_path
+      try:
+        profile_path = vcs_repository[PROFILE_PATH_KEY]
+      except KeyError:
+        pass
+      else:
+        profile_path_count += 1
+        if profile_path_count > 1:
+          raise ValueError(PROFILE_PATH_KEY + ' defined more than once')
+        profile_content = """
 [buildout]
 extends = %(software_config_path)s
 """ % {'software_config_path': os.path.join(repository_path, profile_path)}
-    if not(buildout_section_id is None):
-      profile_content += """
+      if not(buildout_section_id is None):
+        profile_content += """
 [%(buildout_section_id)s]
 repository = %(repository_path)s
 branch = %(branch)s
@@ -156,145 +190,168 @@ branch = %(branch)s
        'repository_path' : repository_path,
        'branch' : vcs_repository.get('branch','master')}
     if not profile_path_count:
       raise ValueError(PROFILE_PATH_KEY + ' not defined')
     custom_profile = open(custom_profile_path, 'w')
     custom_profile.write(profile_content)
     custom_profile.close()
     config['repository_path'] = repository_path
     sys.path.append(repository_path)
     test_suite_title = config['test_suite_title'] or config['test_suite']
     retry_software = False
-  try:
-    while True:
-      remote_test_result_needs_cleanup = False
-      # kill processes from previous loop if any
-      try:
-        killPreviousRun()
-        process_group_pid_set.clear()
-        full_revision_list = []
-        # Make sure we have local repository
-        for vcs_repository in vcs_repository_list:
-          repository_path = vcs_repository['repository_path']
-          repository_id = vcs_repository['repository_id']
-          if not os.path.exists(repository_path):
-            parameter_list = [config['git_binary'], 'clone',
-                              vcs_repository['url']]
-            if vcs_repository.get('branch') is not None:
-              parameter_list.extend(['-b',vcs_repository.get('branch')])
-            parameter_list.append(repository_path)
-            log(subprocess.check_output(parameter_list, stderr=subprocess.STDOUT))
-          # Make sure we have local repository
-          updater = Updater(repository_path, git_binary=config['git_binary'],
-                            log=log, realtime_output=False)
-          updater.checkout()
-          revision = "-".join(updater.getRevision())
-          full_revision_list.append('%s=%s' % (repository_id, revision))
-        revision = ','.join(full_revision_list)
-        if previous_revision == revision:
-          log('Sleeping a bit')
-          time.sleep(DEFAULT_SLEEP_TIMEOUT)
-          if not(retry_software):
-            continue
-          log('Retrying install')
-          retry_software = False
-        previous_revision = revision
-        portal_url = config['test_suite_master_url']
-        test_result_path = None
-        test_result = (test_result_path, revision)
-        if portal_url:
-          if portal_url[-1] != '/':
-            portal_url += '/'
-          portal = xmlrpclib.ServerProxy("%s%s" %
-                      (portal_url, 'portal_task_distribution'),
-                      allow_none=1)
-          assert safeRpcCall(portal, "getProtocolRevision") == 1
-          test_result = safeRpcCall(portal, "createTestResult",
-            config['test_suite'], revision, [],
-            False, test_suite_title,
-            config['test_node_title'], config['project_title'])
-          remote_test_result_needs_cleanup = True
-        log("testnode, test_result : %r" % (test_result, ))
-        if test_result:
-          test_result_path, test_revision = test_result
-          if revision != test_revision:
-            log('Disagreement on tested revision, checking out:')
-            for i, repository_revision in enumerate(test_revision.split(',')):
-              vcs_repository = vcs_repository_list[i]
-              repository_path = vcs_repository['repository_path']
-              revision = repository_revision.rsplit('-', 1)[1]
-              # other testnodes on other boxes are already ready to test another
-              # revision
-              log(' %s at %s' % (repository_path, revision))
-              updater = Updater(repository_path, git_binary=config['git_binary'],
-                                revision=revision, log=log,
-                                realtime_output=False)
-              updater.checkout()
-        # Now prepare the installation of SlapOS and create instance
-        slapproxy_log = os.path.join(config['log_directory'],
-                                     'slapproxy.log')
-        log('Configured slapproxy log to %r' % slapproxy_log)
-        slapos_controler = SlapOSControler.SlapOSControler(config,
-          process_group_pid_set=process_group_pid_set, log=log,
-          slapproxy_log=slapproxy_log)
-        for method_name in ("runSoftwareRelease", "runComputerPartition",):
-          stdout, stderr = getInputOutputFileList(config, method_name)
-          slapos_method = getattr(slapos_controler, method_name)
-          status_dict = slapos_method(config,
-                                      environment=config['environment'],
-                                      process_group_pid_set=process_group_pid_set,
-                                      stdout=stdout, stderr=stderr
-                                      )
-          if status_dict['status_code'] != 0:
-            retry_software = True
-            raise SubprocessError(status_dict)
-        # Give some time so computer partitions may start
-        # as partitions can be of any kind we have and likely will never have
-        # a reliable way to check if they are up or not ...
-        time.sleep(20)
-        run_test_suite_path_list = glob.glob("%s/*/bin/runTestSuite" %config['instance_root'])
-        if not len(run_test_suite_path_list):
-          raise ValueError('No runTestSuite provided in installed partitions.')
-        run_test_suite_path = run_test_suite_path_list[0]
-        run_test_suite_revision = revision
-        if isinstance(revision, tuple):
-          revision = ','.join(revision)
-        # Deal with Shebang size limitation
-        line = open(run_test_suite_path, 'r').readline()
-        invocation_list = []
-        if line[:2] == '#!':
-          invocation_list = line[2:].split()
-        invocation_list.extend([run_test_suite_path,
-                                '--test_suite', config['test_suite'],
-                                '--revision', revision,
-                                '--test_suite_title', test_suite_title,
-                                '--node_quantity', config['node_quantity'],
-                                '--master_url', portal_url])
-        bt5_path_list = config.get("bt5_path")
-        if bt5_path_list not in ('', None,):
-          invocation_list.extend(["--bt5_path", bt5_path_list])
-        # From this point, test runner becomes responsible for updating test
-        # result. We only do cleanup if the test runner itself is not able
-        # to run.
-        log("call process : %r", (invocation_list,))
-        run_test_suite = subprocess.Popen(invocation_list,
-          preexec_fn=os.setsid, cwd=config['test_suite_directory'],
-          stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-        process_group_pid_set.add(run_test_suite.pid)
-        log(run_test_suite.communicate()[0])
-        process_group_pid_set.remove(run_test_suite.pid)
-      except SubprocessError, e:
-        if remote_test_result_needs_cleanup:
-          safeRpcCall(portal, "reportTaskFailure",
-            test_result_path, e.status_dict, config['test_node_title'])
-        time.sleep(DEFAULT_SLEEP_TIMEOUT)
-        continue
-  finally:
-    # Nice way to kill *everything* generated by run process -- process
-    # groups working only in POSIX compliant systems
-    # Exceptions are swallowed during cleanup phase
-    killPreviousRun()
+    retry_software_count = 0
+    try:
+      while True:
+        remote_test_result_needs_cleanup = False
+        remote_logger = None
+        remote_logger_thread = None
+        same_revision_count = 0
+        try:
+          # kill processes from previous loop if any
+          process_manager.killPreviousRun()
+          full_revision_list = []
+          # Make sure we have local repository
+          for vcs_repository in vcs_repository_list:
+            repository_path = vcs_repository['repository_path']
+            repository_id = vcs_repository['repository_id']
+            if not os.path.exists(repository_path):
+              parameter_list = [config['git_binary'], 'clone',
+                                vcs_repository['url']]
+              if vcs_repository.get('branch') is not None:
+                parameter_list.extend(['-b',vcs_repository.get('branch')])
+              parameter_list.append(repository_path)
+              log(subprocess.check_output(parameter_list, stderr=subprocess.STDOUT))
+            # Make sure we have local repository
+            updater = Updater(repository_path, git_binary=config['git_binary'],
+                              log=log, process_manager=process_manager)
+            updater.checkout()
+            revision = "-".join(updater.getRevision())
+            full_revision_list.append('%s=%s' % (repository_id, revision))
+          revision = ','.join(full_revision_list)
+          if previous_revision == revision:
+            log('Same Revision Sleeping a bit')
+            time.sleep(DEFAULT_SLEEP_TIMEOUT)
+            same_revision_count += 1
+            if not(retry_software) and same_revision_count <= 2:
+              continue
+            same_revision_count = 0
+            log('Retrying install or checking if previous test was cancelled')
+            retry_software = False
+          previous_revision = revision
+          portal_url = config['test_suite_master_url']
+          test_result_path = None
+          test_result = (test_result_path, revision)
+          if portal_url:
+            if portal_url[-1] != '/':
+              portal_url += '/'
+            portal = xmlrpclib.ServerProxy("%s%s" %
+                        (portal_url, 'portal_task_distribution'),
+                        allow_none=1)
+            assert safeRpcCall(log, portal, "getProtocolRevision", True) == 1
+            test_result = safeRpcCall(log, portal, "createTestResult", True,
+              config['test_suite'], revision, [],
+              False, test_suite_title,
+              config['test_node_title'], config['project_title'])
+            remote_test_result_needs_cleanup = True
+          log("testnode, test_result : %r" % (test_result, ))
+          if test_result:
+            test_result_path, test_revision = test_result
+            if config.get('log_file'):
+              remote_logger = RemoteLogger(log, config['log_file'], config['test_node_title'])
+              remote_logger.portal = portal
+              remote_logger.test_result_path = test_result_path
+              remote_logger_thread = threading.Thread(target=remote_logger)
+              remote_logger_thread.start()
+            if revision != test_revision:
+              previous_revision = test_revision
+              log('Disagreement on tested revision, checking out:')
+              for i, repository_revision in enumerate(test_revision.split(',')):
+                vcs_repository = vcs_repository_list[i]
+                repository_path = vcs_repository['repository_path']
+                revision = repository_revision.rsplit('-', 1)[1]
+                # other testnodes on other boxes are already ready to test another
+                # revision
+                log(' %s at %s' % (repository_path, revision))
+                updater = Updater(repository_path, git_binary=config['git_binary'],
+                                  revision=revision, log=log,
+                                  process_manager=process_manager)
+                updater.checkout()
+          # Now prepare the installation of SlapOS and create instance
+          slapproxy_log = os.path.join(config['log_directory'],
+                                       'slapproxy.log')
+          log('Configured slapproxy log to %r' % slapproxy_log)
+          log('testnode, retry_software_count : %r' % retry_software_count)
+          slapos_controler = SlapOSControler.SlapOSControler(config,
+            log=log, slapproxy_log=slapproxy_log, process_manager=process_manager,
+            reset_software=(retry_software_count>0 and retry_software_count%10 == 0))
+          for method_name in ("runSoftwareRelease", "runComputerPartition",):
+            slapos_method = getattr(slapos_controler, method_name)
+            status_dict = slapos_method(config,
+                                        environment=config['environment'],
+                                        )
+            if status_dict['status_code'] != 0:
+              retry_software = True
+              retry_software_count += 1
+              raise SubprocessError(status_dict)
+            else:
+              retry_software_count = 0
+          # Give some time so computer partitions may start
+          # as partitions can be of any kind we have and likely will never have
+          # a reliable way to check if they are up or not ...
+          time.sleep(20)
+          run_test_suite_path_list = glob.glob("%s/*/bin/runTestSuite" %config['instance_root'])
+          if not len(run_test_suite_path_list):
+            raise ValueError('No runTestSuite provided in installed partitions.')
+          run_test_suite_path = run_test_suite_path_list[0]
+          run_test_suite_revision = revision
+          if isinstance(revision, tuple):
+            revision = ','.join(revision)
+          # Deal with Shebang size limitation
+          line = open(run_test_suite_path, 'r').readline()
+          invocation_list = []
+          if line[:2] == '#!':
+            invocation_list = line[2:].split()
+          invocation_list.extend([run_test_suite_path,
+                                  '--test_suite', config['test_suite'],
+                                  '--revision', revision,
+                                  '--test_suite_title', test_suite_title,
+                                  '--node_quantity', config['node_quantity'],
+                                  '--master_url', portal_url])
+          bt5_path_list = config.get("bt5_path")
+          if bt5_path_list not in ('', None,):
+            invocation_list.extend(["--bt5_path", bt5_path_list])
+          # From this point, test runner becomes responsible for updating test
+          # result. We only do cleanup if the test runner itself is not able
+          # to run.
+          process_manager.spawn(*invocation_list,
+            cwd=config['test_suite_directory'], new_session=True,
+            log_prefix='runTestSuite', get_output=False)
+          if remote_logger:
+            remote_logger.quit = True
+            remote_logger_thread.join()
+        except SubprocessError, e:
+          log("SubprocessError", exc_info=sys.exc_info())
+          if remote_logger:
+            remote_logger.finish = True
+            remote_logger_thread.join()
+          if remote_test_result_needs_cleanup:
+            safeRpcCall(log, portal, "reportTaskFailure", True,
+              test_result_path, e.status_dict, config['test_node_title'])
+          log("SubprocessError, going to sleep %s" % DEFAULT_SLEEP_TIMEOUT)
+          time.sleep(DEFAULT_SLEEP_TIMEOUT)
+          continue
+        except:
+          log("erp5testnode exception", exc_info=sys.exc_info())
+          raise
+    finally:
+      # Nice way to kill *everything* generated by run process -- process
+      # groups working only in POSIX compliant systems
+      # Exceptions are swallowed during cleanup phase
+      log('Testnode.run, finally close')
+      process_manager.killPreviousRun()
+      if remote_logger:
+        remote_logger.quit = True