Commit f5b0eb1a authored by Sebastien Robin's avatar Sebastien Robin

kill ongoing tests if the test result was cancelled on server side

parent c791efc3
...@@ -39,6 +39,9 @@ class SubprocessError(EnvironmentError): ...@@ -39,6 +39,9 @@ class SubprocessError(EnvironmentError):
def __str__(self): def __str__(self):
return 'Error %i' % self.status_code return 'Error %i' % self.status_code
class CancellationError(EnvironmentError):
pass
_format_command_search = re.compile("[[\\s $({?*\\`#~';<>&|]").search _format_command_search = re.compile("[[\\s $({?*\\`#~';<>&|]").search
_format_command_escape = lambda s: "'%s'" % r"'\''".join(s.split("'")) _format_command_escape = lambda s: "'%s'" % r"'\''".join(s.split("'"))
...@@ -94,8 +97,11 @@ class ProcessManager(object): ...@@ -94,8 +97,11 @@ class ProcessManager(object):
self.log = log self.log = log
self.process_pid_set = set() self.process_pid_set = set()
signal.signal(signal.SIGTERM, self.sigterm_handler) signal.signal(signal.SIGTERM, self.sigterm_handler)
self.under_cancellation = False
def spawn(self, *args, **kw): def spawn(self, *args, **kw):
if self.under_cancellation:
raise CancellationError("Test Result was cancelled")
get_output = kw.pop('get_output', True) get_output = kw.pop('get_output', True)
log_prefix = kw.pop('log_prefix', '') log_prefix = kw.pop('log_prefix', '')
new_session = kw.pop('new_session', False) new_session = kw.pop('new_session', False)
...@@ -118,13 +124,17 @@ class ProcessManager(object): ...@@ -118,13 +124,17 @@ class ProcessManager(object):
get_output=get_output) get_output=get_output)
result = dict(status_code=p.returncode, command=command, result = dict(status_code=p.returncode, command=command,
stdout=stdout, stderr=stderr) stdout=stdout, stderr=stderr)
self.process_pid_set.remove(p.pid) self.process_pid_set.discard(p.pid)
if self.under_cancellation:
raise CancellationError("Test Result was cancelled")
if raise_error_if_fail and p.returncode: if raise_error_if_fail and p.returncode:
raise SubprocessError(result) raise SubprocessError(result)
return result return result
def killPreviousRun(self): def killPreviousRun(self, cancellation=False):
self.log('ProcessManager killPreviousRun, going to kill %r' % (self.process_pid_set,)) self.log('ProcessManager killPreviousRun, going to kill %r' % (self.process_pid_set,))
if cancellation:
self.under_cancellation = True
for pgpid in self.process_pid_set: for pgpid in self.process_pid_set:
try: try:
os.kill(pgpid, signal.SIGTERM) os.kill(pgpid, signal.SIGTERM)
......
...@@ -36,7 +36,7 @@ import glob ...@@ -36,7 +36,7 @@ import glob
import SlapOSControler import SlapOSControler
import threading import threading
from ProcessManager import SubprocessError, ProcessManager from ProcessManager import SubprocessError, ProcessManager, CancellationError
from Updater import Updater from Updater import Updater
DEFAULT_SLEEP_TIMEOUT = 120 # time in seconds to sleep DEFAULT_SLEEP_TIMEOUT = 120 # time in seconds to sleep
...@@ -65,12 +65,13 @@ def safeRpcCall(log, proxy, function_id, retry, *args): ...@@ -65,12 +65,13 @@ def safeRpcCall(log, proxy, function_id, retry, *args):
class RemoteLogger(object): class RemoteLogger(object):
def __init__(self, log, log_file, test_node_title): def __init__(self, log, log_file, test_node_title, process_manager):
self.portal = None self.portal = None
self.test_result_path = None self.test_result_path = None
self.test_node_title = test_node_title self.test_node_title = test_node_title
self.log = log self.log = log
self.log_file = log_file self.log_file = log_file
self.process_manager = process_manager
self.finish = False self.finish = False
self.quit = False self.quit = False
...@@ -117,6 +118,14 @@ class RemoteLogger(object): ...@@ -117,6 +118,14 @@ class RemoteLogger(object):
erp5testnode_log.close() erp5testnode_log.close()
if end_size == size: if end_size == size:
output += '%s : stucked ??' % datetime.now().strftime("%Y/%m/%d %H:%M:%S") output += '%s : stucked ??' % datetime.now().strftime("%Y/%m/%d %H:%M:%S")
# check if the test result is still alive
is_alive = safeRpcCall(self.log, self.portal, "isTaskAlive", False,
self.test_result_path)
self.log('isTaskAlive result %r' % is_alive)
if is_alive is not None and is_alive == 0:
self.log('Test Result cancelled on server side, stop current test')
self.process_manager.killPreviousRun(cancellation=True)
return
status_dict = dict(command='erp5testnode', status_code=0, status_dict = dict(command='erp5testnode', status_code=0,
stdout=''.join(output), stderr='') stdout=''.join(output), stderr='')
safeRpcCall(self.log, self.portal, "reportTaskStatus", retry, safeRpcCall(self.log, self.portal, "reportTaskStatus", retry,
...@@ -199,7 +208,7 @@ branch = %(branch)s ...@@ -199,7 +208,7 @@ branch = %(branch)s
sys.path.append(repository_path) sys.path.append(repository_path)
test_suite_title = config['test_suite_title'] or config['test_suite'] test_suite_title = config['test_suite_title'] or config['test_suite']
retry_software = False retry = False
retry_software_count = 0 retry_software_count = 0
try: try:
while True: while True:
...@@ -230,14 +239,15 @@ branch = %(branch)s ...@@ -230,14 +239,15 @@ branch = %(branch)s
full_revision_list.append('%s=%s' % (repository_id, revision)) full_revision_list.append('%s=%s' % (repository_id, revision))
revision = ','.join(full_revision_list) revision = ','.join(full_revision_list)
if previous_revision == revision: if previous_revision == revision:
log('Same Revision Sleeping a bit') log('Same Revision')
time.sleep(DEFAULT_SLEEP_TIMEOUT)
same_revision_count += 1 same_revision_count += 1
if not(retry_software) and same_revision_count <= 2: if not(retry) and same_revision_count <= 2:
log('Sleeping a bit since same revision')
time.sleep(DEFAULT_SLEEP_TIMEOUT)
continue continue
same_revision_count = 0 same_revision_count = 0
log('Retrying install or checking if previous test was cancelled') log('Retrying install or checking if previous test was cancelled')
retry_software = False retry = False
previous_revision = revision previous_revision = revision
portal_url = config['test_suite_master_url'] portal_url = config['test_suite_master_url']
test_result_path = None test_result_path = None
...@@ -259,7 +269,9 @@ branch = %(branch)s ...@@ -259,7 +269,9 @@ branch = %(branch)s
if test_result: if test_result:
test_result_path, test_revision = test_result test_result_path, test_revision = test_result
if config.get('log_file'): if config.get('log_file'):
remote_logger = RemoteLogger(log, config['log_file'], config['test_node_title']) remote_logger = RemoteLogger(log, config['log_file'],
config['test_node_title'],
process_manager)
remote_logger.portal = portal remote_logger.portal = portal
remote_logger.test_result_path = test_result_path remote_logger.test_result_path = test_result_path
remote_logger_thread = threading.Thread(target=remote_logger) remote_logger_thread = threading.Thread(target=remote_logger)
...@@ -293,7 +305,7 @@ branch = %(branch)s ...@@ -293,7 +305,7 @@ branch = %(branch)s
environment=config['environment'], environment=config['environment'],
) )
if status_dict['status_code'] != 0: if status_dict['status_code'] != 0:
retry_software = True retry = True
retry_software_count += 1 retry_software_count += 1
raise SubprocessError(status_dict) raise SubprocessError(status_dict)
else: else:
...@@ -344,6 +356,11 @@ branch = %(branch)s ...@@ -344,6 +356,11 @@ branch = %(branch)s
log("SubprocessError, going to sleep %s" % DEFAULT_SLEEP_TIMEOUT) log("SubprocessError, going to sleep %s" % DEFAULT_SLEEP_TIMEOUT)
time.sleep(DEFAULT_SLEEP_TIMEOUT) time.sleep(DEFAULT_SLEEP_TIMEOUT)
continue continue
except CancellationError, e:
log("CancellationError", exc_info=sys.exc_info())
process_manager.under_cancellation = False
retry = True
continue
except: except:
log("erp5testnode exception", exc_info=sys.exc_info()) log("erp5testnode exception", exc_info=sys.exc_info())
raise raise
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment