Commit 45dc74d8 authored by Sebastien Robin's avatar Sebastien Robin

erp5testnode: fixed some broken logic and other improvements

- some variables were designed for a single test suite, so create
  NodeTestSuite objects to store them (some logic was broken after
  the split of the long run method into several methods)
- remove some optimization that were trying to reduce a bit the
  load on master. This was adding complexity for little benefit
- take into account new status code from slapos.core
parent df31be0a
...@@ -125,6 +125,8 @@ class SlapOSControler(object): ...@@ -125,6 +125,8 @@ class SlapOSControler(object):
status_dict = self.spawn(config['slapgrid_software_binary'], '-v', '-c', status_dict = self.spawn(config['slapgrid_software_binary'], '-v', '-c',
config['slapos_config'], raise_error_if_fail=False, config['slapos_config'], raise_error_if_fail=False,
log_prefix='slapgrid_sr', get_output=False) log_prefix='slapgrid_sr', get_output=False)
if status_dict['status_code'] == 0:
break
return status_dict return status_dict
def runComputerPartition(self, config, environment, def runComputerPartition(self, config, environment,
...@@ -150,4 +152,8 @@ class SlapOSControler(object): ...@@ -150,4 +152,8 @@ class SlapOSControler(object):
config['slapos_config'], raise_error_if_fail=False, config['slapos_config'], raise_error_if_fail=False,
log_prefix='slapgrid_cp', get_output=False) log_prefix='slapgrid_cp', get_output=False)
self.log('slapgrid_cp status_dict : %r' % (status_dict,)) self.log('slapgrid_cp status_dict : %r' % (status_dict,))
# we can continue if status code is ok (0) or if we have just
# a promise failure (2)
if status_dict['status_code'] in (0, 2):
break
return status_dict return status_dict
...@@ -51,12 +51,23 @@ class DummyLogger(object): ...@@ -51,12 +51,23 @@ class DummyLogger(object):
'critical', 'fatal'): 'critical', 'fatal'):
setattr(self, name, func) setattr(self, name, func)
class NodeTestSuite(object):
def __init__(self, reference=None):
self.reference = reference
self.retry_software_count = 0
self.retry = False
def edit(self, **kw):
self.__dict__.update(**kw)
class TestNode(object): class TestNode(object):
def __init__(self, log, config): def __init__(self, log, config):
self.log = log self.log = log
self.config = config self.config = config
self.process_manager = ProcessManager(log) self.process_manager = ProcessManager(log)
self.node_test_suite_dict = {}
def checkOldTestSuite(self,test_suite_data): def checkOldTestSuite(self,test_suite_data):
config = self.config config = self.config
...@@ -66,24 +77,36 @@ class TestNode(object): ...@@ -66,24 +77,36 @@ class TestNode(object):
wished_reference_set) wished_reference_set)
for y in to_remove_reference_set: for y in to_remove_reference_set:
fpath = os.path.join(config['slapos_directory'],y) fpath = os.path.join(config['slapos_directory'],y)
self.delNodeTestSuite(y)
if os.path.isdir(fpath): if os.path.isdir(fpath):
shutil.rmtree(fpath) shutil.rmtree(fpath)
else: else:
os.remove(fpath) os.remove(fpath)
def getNodeTestSuite(self, reference):
node_test_suite = self.node_test_suite_dict.get(reference)
if node_test_suite is None:
node_test_suite = self.node_test_suite_dict[reference] = NodeTestSuite(
reference=reference)
return node_test_suite
def delNodeTestSuite(self, reference):
if self.node_test_suite_dict.haskey(reference):
self.node_test_suite_dict.pop(reference)
def updateConfigForTestSuite(self, test_suite_data): def updateConfigForTestSuite(self, test_suite_data):
config = self.config config = self.config
config["project_title"] = test_suite_data["project_title"] node_test_suite = self.getNodeTestSuite(test_suite_data["test_suite_reference"])
config["test_suite"] = test_suite_data["test_suite"] node_test_suite.edit(project_title=test_suite_data["project_title"],
config["test_suite_title"] = test_suite_data["test_suite_title"] test_suite=test_suite_data["test_suite"],
config["test_suite_reference"] = test_suite_data["test_suite_reference"] test_suite_title=test_suite_data["test_suite_title"])
try: try:
config["additional_bt5_repository_id"] = test_suite_data["additional-bt5-repository-id"] config["additional_bt5_repository_id"] = test_suite_data["additional-bt5-repository-id"]
except KeyError: except KeyError:
pass pass
config["vcs_repository_list"] = test_suite_data["vcs_repository_list"] config["vcs_repository_list"] = test_suite_data["vcs_repository_list"]
config['working_directory'] = os.path.join(config['slapos_directory'], config['working_directory'] = os.path.join(config['slapos_directory'],
config['test_suite_reference']) node_test_suite.reference)
if not(os.path.exists(config['working_directory'])): if not(os.path.exists(config['working_directory'])):
os.mkdir(config['working_directory']) os.mkdir(config['working_directory'])
config['instance_root'] = os.path.join(config['working_directory'], config['instance_root'] = os.path.join(config['working_directory'],
...@@ -155,11 +178,10 @@ branch = %(branch)s ...@@ -155,11 +178,10 @@ branch = %(branch)s
sys.path.append(repository_path) sys.path.append(repository_path)
return vcs_repository_list return vcs_repository_list
def getFullRevisionList(self, revision_dict): def getAndUpdateFullRevisionList(self, node_test_suite):
full_revision_list = [] full_revision_list = []
config = self.config config = self.config
log = self.log log = self.log
test_suite_title = config['test_suite_title']
process_manager = self.process_manager process_manager = self.process_manager
vcs_repository_list = self.config['vcs_repository_list'] vcs_repository_list = self.config['vcs_repository_list']
for vcs_repository in vcs_repository_list: for vcs_repository in vcs_repository_list:
...@@ -176,9 +198,9 @@ branch = %(branch)s ...@@ -176,9 +198,9 @@ branch = %(branch)s
updater = Updater(repository_path, git_binary=config['git_binary'], updater = Updater(repository_path, git_binary=config['git_binary'],
log=log, process_manager=process_manager) log=log, process_manager=process_manager)
updater.checkout() updater.checkout()
revision_dict[test_suite_title] = "-".join(updater.getRevision()) revision = "-".join(updater.getRevision())
full_revision_list.append('%s=%s' % (repository_id, revision_dict[test_suite_title])) full_revision_list.append('%s=%s' % (repository_id, revision))
revision_dict[test_suite_title] = ','.join(full_revision_list) node_test_suite.revision = ','.join(full_revision_list)
return full_revision_list return full_revision_list
def addWatcher(self,test_result): def addWatcher(self,test_result):
...@@ -191,34 +213,32 @@ branch = %(branch)s ...@@ -191,34 +213,32 @@ branch = %(branch)s
test_result.addWatch(log_file_name,log_file,max_history_bytes=10000) test_result.addWatch(log_file_name,log_file,max_history_bytes=10000)
return log_file_name return log_file_name
def checkRevision(self,test_result,revision_dict,previous_revision_dict, def checkRevision(self, test_result, node_test_suite, vcs_repository_list):
vcs_repository_list):
config = self.config config = self.config
log = self.log log = self.log
process_manager = self.process_manager process_manager = self.process_manager
test_suite_title = config['test_suite_title'] if node_test_suite.revision != test_result.revision:
if revision_dict[test_suite_title] != test_result.revision:
previous_revision_dict[test_suite_title] = test_result.revision
log('Disagreement on tested revision, checking out:') log('Disagreement on tested revision, checking out:')
for i, repository_revision in enumerate(previous_revision_dict[test_suite_title].split(',')): for i, repository_revision in enumerate(test_result.revision.split(',')):
vcs_repository = vcs_repository_list[i] vcs_repository = vcs_repository_list[i]
repository_path = vcs_repository['repository_path'] repository_path = vcs_repository['repository_path']
revision_dict[test_suite_title] = repository_revision.rsplit('-', 1)[1] node_test_suite.revision = repository_revision.rsplit('-', 1)[1]
# other testnodes on other boxes are already ready to test another # other testnodes on other boxes are already ready to test another
# revision # revision
log(' %s at %s' % (repository_path, revision_dict[test_suite_title])) log(' %s at %s' % (repository_path, node_test_suite.revision))
updater = Updater(repository_path, git_binary=config['git_binary'], updater = Updater(repository_path, git_binary=config['git_binary'],
revision=revision_dict[test_suite_title], log=log, revision=node_test_suite.revision, log=log,
process_manager=process_manager) process_manager=process_manager)
updater.checkout() updater.checkout()
def prepareSlapOS(self,retry_software_count=0,retry=False): def prepareSlapOS(self,node_test_suite):
config = self.config config = self.config
log = self.log log = self.log
process_manager = self.process_manager process_manager = self.process_manager
slapproxy_log = os.path.join(config['log_directory'], slapproxy_log = os.path.join(config['log_directory'],
'slapproxy.log') 'slapproxy.log')
log('Configured slapproxy log to %r' % slapproxy_log) log('Configured slapproxy log to %r' % slapproxy_log)
retry_software_count = node_test_suite.retry_software_count
log('testnode, retry_software_count : %r' % retry_software_count) log('testnode, retry_software_count : %r' % retry_software_count)
slapos_controler = SlapOSControler.SlapOSControler(config, slapos_controler = SlapOSControler.SlapOSControler(config,
log=log, slapproxy_log=slapproxy_log, process_manager=process_manager, log=log, slapproxy_log=slapproxy_log, process_manager=process_manager,
...@@ -228,12 +248,12 @@ branch = %(branch)s ...@@ -228,12 +248,12 @@ branch = %(branch)s
status_dict = slapos_method(config, status_dict = slapos_method(config,
environment=config['environment'], environment=config['environment'],
) )
if status_dict['status_code'] != 0: if status_dict['status_code'] not in (0, 2):
retry = True node_test_suite.retry = True
retry_software_count += 1 node_test_suite.retry_software_count += 1
raise SubprocessError(status_dict) raise SubprocessError(status_dict)
else: else:
retry_software_count = 0 node_test_suite.retry_software_count = 0
return status_dict return status_dict
def _dealShebang(self,run_test_suite_path): def _dealShebang(self,run_test_suite_path):
...@@ -243,23 +263,20 @@ branch = %(branch)s ...@@ -243,23 +263,20 @@ branch = %(branch)s
invocation_list = line[2:].split() invocation_list = line[2:].split()
return invocation_list return invocation_list
def runTestSuite(self,revision_dict,portal_url): def runTestSuite(self, node_test_suite, portal_url):
config = self.config config = self.config
test_suite_title = config['test_suite_title']
run_test_suite_path_list = glob.glob("%s/*/bin/runTestSuite" %config['instance_root']) run_test_suite_path_list = glob.glob("%s/*/bin/runTestSuite" %config['instance_root'])
if not len(run_test_suite_path_list): if not len(run_test_suite_path_list):
raise ValueError('No runTestSuite provided in installed partitions.') raise ValueError('No runTestSuite provided in installed partitions.')
run_test_suite_path = run_test_suite_path_list[0] run_test_suite_path = run_test_suite_path_list[0]
run_test_suite_revision = revision_dict[test_suite_title] run_test_suite_revision = node_test_suite.revision
if isinstance(revision_dict[test_suite_title], tuple):
revision_dict[test_suite_title] = ','.join(revision_dict[test_suite_title])
# Deal with Shebang size limitation # Deal with Shebang size limitation
invocation_list = self._dealShebang(run_test_suite_path) invocation_list = self._dealShebang(run_test_suite_path)
invocation_list.extend([run_test_suite_path, invocation_list.extend([run_test_suite_path,
'--test_suite', config['test_suite'], '--test_suite', node_test_suite.test_suite,
'--revision', revision_dict[test_suite_title], '--revision', node_test_suite.revision,
'--test_suite_title', test_suite_title, '--test_suite_title', node_test_suite.test_suite_title,
'--node_quantity', config['node_quantity'], '--node_quantity', config['node_quantity'],
'--master_url', portal_url]) '--master_url', portal_url])
firefox_bin_list = glob.glob("%s/*/parts/firefox/firefox-slapos" % config["software_root"]) firefox_bin_list = glob.glob("%s/*/parts/firefox/firefox-slapos" % config["software_root"])
...@@ -274,7 +291,7 @@ branch = %(branch)s ...@@ -274,7 +291,7 @@ branch = %(branch)s
# From this point, test runner becomes responsible for updating test # From this point, test runner becomes responsible for updating test
# result. We only do cleanup if the test runner itself is not able # result. We only do cleanup if the test runner itself is not able
# to run. # to run.
process_manager.spawn(*invocation_list, self.process_manager.spawn(*invocation_list,
cwd=config['test_suite_directory'], cwd=config['test_suite_directory'],
log_prefix='runTestSuite', get_output=False) log_prefix='runTestSuite', get_output=False)
...@@ -300,7 +317,6 @@ branch = %(branch)s ...@@ -300,7 +317,6 @@ branch = %(branch)s
try: try:
while True: while True:
try: try:
now = time.time()
begin = time.time() begin = time.time()
portal_url = config['test_suite_master_url'] portal_url = config['test_suite_master_url']
portal = taskdistribution.TaskDistributionTool(portal_url, logger=DummyLogger(log)) portal = taskdistribution.TaskDistributionTool(portal_url, logger=DummyLogger(log))
...@@ -313,50 +329,32 @@ branch = %(branch)s ...@@ -313,50 +329,32 @@ branch = %(branch)s
self.checkOldTestSuite(test_suite_data) self.checkOldTestSuite(test_suite_data)
for test_suite in test_suite_data: for test_suite in test_suite_data:
self.updateConfigForTestSuite(test_suite) self.updateConfigForTestSuite(test_suite)
node_test_suite = self.getNodeTestSuite(test_suite["test_suite_reference"])
run_software = True run_software = True
self.process_manager.supervisord_pid_file = os.path.join(config['instance_root'], 'var', 'run', self.process_manager.supervisord_pid_file = os.path.join(config['instance_root'], 'var', 'run',
'supervisord.pid') 'supervisord.pid')
# Write our own software.cfg to use the local repository # Write our own software.cfg to use the local repository
vcs_repository_list = self.constructProfile() vcs_repository_list = self.constructProfile()
retry = False
retry_software_count = 0
same_revision_count = 0
test_suite_title = config['test_suite_title']
# kill processes from previous loop if any # kill processes from previous loop if any
process_manager.killPreviousRun() process_manager.killPreviousRun()
full_revision_list = self.getFullRevisionList(revision_dict) self.getAndUpdateFullRevisionList(node_test_suite)
# Make sure we have local repository # Make sure we have local repository
now = time.time() test_result = portal.createTestResult(node_test_suite.revision, [],
try: config['test_node_title'], False,
if previous_revision_dict[test_suite_title] == revision_dict[test_suite_title]: node_test_suite.test_suite_title,
log('Same Revision') node_test_suite.project_title)
same_revision_count += 1
if not(retry) and same_revision_count <= 2:
log('Sleeping a bit since same revision')
time.sleep(DEFAULT_SLEEP_TIMEOUT)
continue
same_revision_count = 0
log('Retrying install or checking if previous test was cancelled')
except KeyError:
pass
retry = False
previous_revision_dict[test_suite_title] = revision_dict[test_suite_title]
now = time.time()
test_result = portal.createTestResult(revision_dict[test_suite_title],[],
config['test_node_title'],False,test_suite_title,config['project_title'])
remote_test_result_needs_cleanup = True remote_test_result_needs_cleanup = True
log("testnode, test_result : %r" % (test_result, )) log("testnode, test_result : %r" % (test_result, ))
if test_result is not None: if test_result is not None:
log_file_name = self.addWatcher(test_result) log_file_name = self.addWatcher(test_result)
self.checkRevision(test_result,revision_dict,previous_revision_dict, self.checkRevision(test_result,node_test_suite,vcs_repository_list)
vcs_repository_list)
# Now prepare the installation of SlapOS and create instance # Now prepare the installation of SlapOS and create instance
status_dict = self.prepareSlapOS(retry_software_count,retry) status_dict = self.prepareSlapOS(node_test_suite)
# Give some time so computer partitions may start # Give some time so computer partitions may start
# as partitions can be of any kind we have and likely will never have # as partitions can be of any kind we have and likely will never have
# a reliable way to check if they are up or not ... # a reliable way to check if they are up or not ...
time.sleep(20) time.sleep(20)
self.runTestSuite(revision_dict,portal_url) self.runTestSuite(node_test_suite,portal_url)
test_result.removeWatch(log_file_name) test_result.removeWatch(log_file_name)
except (SubprocessError, CalledProcessError) as e: except (SubprocessError, CalledProcessError) as e:
log("SubprocessError", exc_info=sys.exc_info()) log("SubprocessError", exc_info=sys.exc_info())
...@@ -375,11 +373,12 @@ branch = %(branch)s ...@@ -375,11 +373,12 @@ branch = %(branch)s
except CancellationError, e: except CancellationError, e:
log("CancellationError", exc_info=sys.exc_info()) log("CancellationError", exc_info=sys.exc_info())
process_manager.under_cancellation = False process_manager.under_cancellation = False
retry = True node_test_suite.retry = True
continue continue
except: except:
log("erp5testnode exception", exc_info=sys.exc_info()) log("erp5testnode exception", exc_info=sys.exc_info())
raise raise
now = time.time()
if (now-begin) < 120: if (now-begin) < 120:
sleep_time = 120 - (now-begin) sleep_time = 120 - (now-begin)
log("End of processing, going to sleep %s" % sleep_time) log("End of processing, going to sleep %s" % sleep_time)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment