From a4b9816a91dfadbcc11fddcf86f5b7da5efce812 Mon Sep 17 00:00:00 2001 From: Sebastien Robin <seb@nexedi.com> Date: Tue, 19 Apr 2011 15:42:45 +0000 Subject: [PATCH] This is mostly a duplicate of the products/ERP5/bin/run_test_suite script made by Julien. There is following modifications : - code is moved to ERP5Type/tests - updater is not kept since the update of code will be managed at upper level (erp5 testnode) - use argparse instead of optparse runTestSuite then will be called by erp5 testnode or can be called manually git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@45579 20353a03-c40f-0410-a6d1-a30d3c3de9de --- .../tests/DummyTaskDistributionTool.py | 27 ++ product/ERP5Type/tests/ERP5TypeTestSuite.py | 232 ++++++++++++++++++ product/ERP5Type/tests/runTestSuite.py | 85 +++++++ 3 files changed, 344 insertions(+) create mode 100644 product/ERP5Type/tests/DummyTaskDistributionTool.py create mode 100644 product/ERP5Type/tests/ERP5TypeTestSuite.py create mode 100644 product/ERP5Type/tests/runTestSuite.py diff --git a/product/ERP5Type/tests/DummyTaskDistributionTool.py b/product/ERP5Type/tests/DummyTaskDistributionTool.py new file mode 100644 index 0000000000..c9b686cc17 --- /dev/null +++ b/product/ERP5Type/tests/DummyTaskDistributionTool.py @@ -0,0 +1,27 @@ +import threading + +class DummyTaskDistributionTool(object): + + def __init__(self): + self.lock = threading.Lock() + + def createTestResult(self, name, revision, test_name_list, allow_restart): + self.test_name_list = list(test_name_list) + return None, revision + + def updateTestResult(self, name, revision, test_name_list): + self.test_name_list = list(test_name_list) + return None, revision + + def startUnitTest(self, test_result_path, exclude_list=()): + self.lock.acquire() + try: + for i, test in enumerate(self.test_name_list): + if test not in exclude_list: + del self.test_name_list[i] + return None, test + finally: + self.lock.release() + + def stopUnitTest(self, test_path, status_dict): + pass \ No newline at end of file diff --git a/product/ERP5Type/tests/ERP5TypeTestSuite.py b/product/ERP5Type/tests/ERP5TypeTestSuite.py new file mode 100644 index 0000000000..ed9f92d8b4 --- /dev/null +++ b/product/ERP5Type/tests/ERP5TypeTestSuite.py @@ -0,0 +1,232 @@ +import re, imp, sys, threading, os, shlex, subprocess, shutil + +# The content of this file might be partially moved to an egg +# in order to allows parallel tests without the code of ERP5 + +_format_command_search = re.compile("[[\\s $({?*\\`#~';<>&|]").search +_format_command_escape = lambda s: "'%s'" % r"'\''".join(s.split("'")) +def format_command(*args, **kw): + cmdline = [] + for k, v in sorted(kw.items()): + if _format_command_search(v): + v = _format_command_escape(v) + cmdline.append('%s=%s' % (k, v)) + for v in args: + if _format_command_search(v): + v = _format_command_escape(v) + cmdline.append(v) + return ' '.join(cmdline) + +def subprocess_capture(p, quiet=False): + def readerthread(input, output, buffer): + while True: + data = input.readline() + if not data: + break + output(data) + buffer.append(data) + if p.stdout: + stdout = [] + output = quiet and (lambda data: None) or sys.stdout.write + stdout_thread = threading.Thread(target=readerthread, + args=(p.stdout, output, stdout)) + stdout_thread.setDaemon(True) + stdout_thread.start() + if p.stderr: + stderr = [] + stderr_thread = threading.Thread(target=readerthread, + args=(p.stderr, sys.stderr.write, stderr)) + stderr_thread.setDaemon(True) + stderr_thread.start() + if p.stdout: + stdout_thread.join() + if p.stderr: + stderr_thread.join() + p.wait() + return (p.stdout and ''.join(stdout), + p.stderr and ''.join(stderr)) + +class Persistent(object): + """Very simple persistent data storage for optimization purpose + + This tool should become a standalone daemon communicating only with an ERP5 + instance. But for the moment, it only execute 1 test suite and exists, + and test suite classes may want some information from previous runs. + """ + + def __init__(self, filename): + self._filename = filename + + def __getattr__(self, attr): + if attr == '_db': + try: + db = file(self._filename, 'r+') + except IOError, e: + if e.errno != errno.ENOENT: + raise + db = file(self._filename, 'w+') + else: + try: + self.__dict__.update(eval(db.read())) + except StandardError: + pass + self._db = db + return db + self._db + return super(Persistent, self).__getattribute__(attr) + + def sync(self): + self._db.seek(0) + db = dict(x for x in self.__dict__.iteritems() if x[0][:1] != '_') + pprint.pprint(db, self._db) + self._db.truncate() + +class TestSuite(object): + + mysql_db_count = 1 + allow_restart = False + realtime_output = True + stdin = file(os.devnull) + + def __init__(self, max_instance_count, **kw): + self.__dict__.update(kw) + self._path_list = ['tests'] + pool = threading.Semaphore(max_instance_count) + self.acquire = pool.acquire + self.release = pool.release + self._instance = threading.local() + self._pool = max_instance_count == 1 and [None] or \ + range(1, max_instance_count + 1) + self._ready = set() + self.running = {} + if max_instance_count != 1: + self.realtime_output = False + elif os.isatty(1): + self.realtime_output = True + self.persistent = Persistent('run_test_suite-%s.tmp' + % self.__class__.__name__) + + instance = property(lambda self: self._instance.id) + + def start(self, test, on_stop=None): + assert test not in self.running + self.running[test] = instance = self._pool.pop(0) + def run(): + self._instance.id = instance + if instance not in self._ready: + self._ready.add(instance) + self.setup() + status_dict = self.run(test) + if on_stop is not None: + on_stop(status_dict) + self._pool.append(self.running.pop(test)) + self.release() + thread = threading.Thread(target=run) + thread.setDaemon(True) + thread.start() + + def update(self): + self.checkout() # by default, update everything + + def setup(self): + pass + + def run(self, test): + raise NotImplementedError + + def getTestList(self): + raise NotImplementedError + + def spawn(self, *args, **kw): + quiet = kw.pop('quiet', False) + env = kw and dict(os.environ, **kw) or None + command = format_command(*args, **kw) + print '\n$ ' + command + sys.stdout.flush() + p = subprocess.Popen(args, stdin=self.stdin, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, env=env) + if self.realtime_output: + stdout, stderr = subprocess_capture(p, quiet) + else: + stdout, stderr = p.communicate() + if not quiet: + sys.stdout.write(stdout) + sys.stderr.write(stderr) + result = dict(status_code=p.returncode, command=command, + stdout=stdout, stderr=stderr) + if p.returncode: + raise SubprocessError(result) + return result + +class ERP5TypeTestSuite(TestSuite): + + RUN_RE = re.compile( + r'Ran (?P<all_tests>\d+) tests? in (?P<seconds>\d+\.\d+)s', + re.DOTALL) + + STATUS_RE = re.compile(r""" + (OK|FAILED)\s+\( + (failures=(?P<failures>\d+),?\s*)? + (errors=(?P<errors>\d+),?\s*)? + (skipped=(?P<skips>\d+),?\s*)? + (expected\s+failures=(?P<expected_failures>\d+),?\s*)? + (unexpected\s+successes=(?P<unexpected_successes>\d+),?\s*)? + \) + """, re.DOTALL | re.VERBOSE) + + def setup(self): + instance_home = self.instance and 'unit_test.%u' % self.instance \ + or 'unit_test' + tests = os.path.join(instance_home, 'tests') + if os.path.exists(tests): + shutil.rmtree(instance_home + '.previous', True) + shutil.move(tests, instance_home + '.previous') + + def run(self, test): + return self.runUnitTest(test) + + def runUnitTest(self, *args, **kw): + if self.instance: + args = ('--instance_home=unit_test.%u' % self.instance,) + args + instance_number = self.instance or 1 + mysql_db_list = self.mysql_db_list[ + (instance_number-1) * self.mysql_db_count: + (instance_number) * self.mysql_db_count] + if len(mysql_db_list) > 1: + kw['extra_sql_connection_string_list'] = ','.join(mysql_db_list[1:]) + try: + runUnitTest = os.environ.get('RUN_UNIT_TEST', + 'runUnitTest') + args = tuple(shlex.split(runUnitTest)) \ + + ('--verbose', '--erp5_sql_connection_string=' + mysql_db_list[0]) \ + + args + status_dict = self.spawn(*args, **kw) + except SubprocessError, e: + status_dict = e.status_dict + test_log = status_dict['stderr'] + search = self.RUN_RE.search(test_log) + if search: + groupdict = search.groupdict() + status_dict.update(duration=float(groupdict['seconds']), + test_count=int(groupdict['all_tests'])) + search = self.STATUS_RE.search(test_log) + if search: + groupdict = search.groupdict() + status_dict.update(error_count=int(groupdict['errors'] or 0), + failure_count=int(groupdict['failures'] or 0), + skip_count=int(groupdict['skips'] or 0) + +int(groupdict['expected_failures'] or 0) + +int(groupdict['unexpected_successes'] or 0)) + return status_dict + +class SubprocessError(EnvironmentError): + def __init__(self, status_dict): + self.status_dict = status_dict + def __getattr__(self, name): + return self.status_dict[name] + def __str__(self): + return 'Error %i' % self.status_code + +sys.modules['test_suite'] = module = imp.new_module('test_suite') +for var in SubprocessError, TestSuite, ERP5TypeTestSuite: + setattr(module, var.__name__, var) \ No newline at end of file diff --git a/product/ERP5Type/tests/runTestSuite.py b/product/ERP5Type/tests/runTestSuite.py new file mode 100644 index 0000000000..af2981cbf8 --- /dev/null +++ b/product/ERP5Type/tests/runTestSuite.py @@ -0,0 +1,85 @@ +#!/usr/bin/python2.6 +import argparse, pprint, socket, sys, time, xmlrpclib +from DummyTaskDistributionTool import DummyTaskDistributionTool +from ERP5TypeTestSuite import ERP5TypeTestSuite + +def makeSuite(node_quantity=None, test_suite=None, revision=None, + db_list=None): + for k in sys.modules.keys(): + if k == 'tests' or k.startswith('tests.'): + del sys.modules[k] + module_name, class_name = ('tests.' + \ + test_suite).rsplit('.', 1) + + try: + suite_class = getattr(__import__(module_name, None, None, [class_name]), + class_name) + except (AttributeError, ImportError): + raise + suite = suite_class(revision=revision, + max_instance_count=node_quantity, + mysql_db_list=db_list.split(','), + ) + return suite + +def safeRpcCall(function, *args): + retry = 64 + while True: + try: + return function(*args) + except (socket.error, xmlrpclib.ProtocolError), e: + print >>sys.stderr, e + pprint.pprint(args, file(function._Method__name, 'w')) + time.sleep(retry) + retry += retry >> 1 + +def main(): + parser = argparse.ArgumentParser(description='Run a test suite.') + parser.add_argument('--test_suite', help='The test suite name') + parser.add_argument('--revision', help='The revision to test', + default='dummy_revision') + parser.add_argument('--node_quantity', help='Number of parallel tests to run', + default=1, type=int) + parser.add_argument('--master_url', + help='The Url of Master controling many suites', + default=None) + parser.add_argument('--db_list', help='A list of sql connection strings') + # parameters that needs to be passed to runUnitTest + parser.add_argument('--conversion_server_hostname', default=None) + parser.add_argument('--conversion_server_port', default=None) + parser.add_argument('--volatile_memcached_server_hostname', default=None) + parser.add_argument('--volatile_memcached_server_port', default=None) + parser.add_argument('--persistent_memcached_server_hostname', default=None) + parser.add_argument('--persistent_memcached_server_port', default=None) + + args = parser.parse_args() + if args.master_url is not None: + master_url = args.master_url + if master_url[-1] != '/': + master_url += '/' + master = xmlrpclib.ServerProxy("%s%s" % + (master_url, 'portal_task_distribution'), + allow_none=1) + assert master.getProtocolRevision() == 1 + else: + master = DummyTaskDistributionTool() + revision = args.revision + if ',' in revision: + revision = revision.split(',') + suite = makeSuite(test_suite=args.test_suite, + node_quantity=args.node_quantity, + revision=revision, + db_list=args.db_list) + test_result = safeRpcCall(master.createTestResult, + args.test_suite, revision, suite.getTestList(), + suite.allow_restart) + if test_result: + test_result_path, test_revision = test_result + while suite.acquire(): + test = safeRpcCall(master.startUnitTest, test_result_path, + suite.running.keys()) + if test: + suite.start(test[1], lambda status_dict, __test_path=test[0]: + safeRpcCall(master.stopUnitTest, __test_path, status_dict)) + elif not suite.running: + break \ No newline at end of file -- 2.30.9