Commit 16f437c6 authored by Julien Muchembled's avatar Julien Muchembled

Define test suites for ERP5 project in /tests

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@41845 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 8cd41163
#!/usr/bin/python
import atexit, errno, imp, os, pprint, random, re, socket, shlex, shutil
import signal, string, subprocess, sys, threading, time, urlparse, xmlrpclib
SVN_UP_REV=re.compile(r'^(?:At|Updated to) revision (\d+).$')
SVN_CHANGED_REV=re.compile(r'^Last Changed Rev.*:\s*(\d+)', re.MULTILINE)
def killallIfParentDies():
os.setsid()
atexit.register(lambda: os.kill(0, 9))
from ctypes import cdll
libc = cdll.LoadLibrary('libc.so.6')
def PR_SET_PDEATHSIG(sig):
if libc.prctl(1, sig):
raise OSError
PR_SET_PDEATHSIG(signal.SIGINT)
_format_command_search = re.compile("[[\\s $({?*\\`#~';<>&|]").search
_format_command_escape = lambda s: "'%s'" % r"'\''".join(s.split("'"))
def format_command(*args, **kw):
cmdline = []
for k, v in sorted(kw.items()):
if _format_command_search(v):
v = _format_command_escape(v)
cmdline.append('%s=%s' % (k, v))
for v in args:
if _format_command_search(v):
v = _format_command_escape(v)
cmdline.append(v)
return ' '.join(cmdline)
def subprocess_capture(p):
def readerthread(input, output, buffer):
while True:
data = input.readline()
if not data:
break
output.write(data)
buffer.append(data)
if p.stdout:
stdout = []
stdout_thread = threading.Thread(target=readerthread,
args=(p.stdout, sys.stdout, stdout))
stdout_thread.setDaemon(True)
stdout_thread.start()
if p.stderr:
stderr = []
stderr_thread = threading.Thread(target=readerthread,
args=(p.stderr, sys.stderr, stderr))
stderr_thread.setDaemon(True)
stderr_thread.start()
if p.stdout:
stdout_thread.join()
if p.stderr:
stderr_thread.join()
p.wait()
return (p.stdout and ''.join(stdout),
p.stderr and ''.join(stderr))
class SubprocessError(EnvironmentError):
def __init__(self, status_dict):
self.status_dict = status_dict
def __getattr__(self, name):
return self.status_dict[name]
def __str__(self):
return 'Error %i' % self.status_code
class Updater(object):
realtime_output = True
stdin = file(os.devnull)
def __init__(self, revision=None):
self.revision = revision
self._path_list = []
def deletePycFiles(self, path):
"""Delete *.pyc files so that deleted/moved files can not be imported"""
for path, dir_list, file_list in os.walk(path):
for file in file_list:
if file[-4:] in ('.pyc', '.pyo'):
# allow several processes clean the same folder at the same time
try:
os.remove(os.path.join(path, file))
except OSError, e:
if e.errno != errno.ENOENT:
raise
def spawn(self, *args, **kw):
env = kw and dict(os.environ, **kw) or None
command = format_command(*args, **kw)
print '\n$ ' + command
sys.stdout.flush()
p = subprocess.Popen(args, stdin=self.stdin, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, env=env)
if self.realtime_output:
stdout, stderr = subprocess_capture(p)
else:
stdout, stderr = p.communicate()
sys.stdout.write(stdout)
sys.stderr.write(stderr)
result = dict(status_code=p.returncode, command=command,
stdout=stdout, stderr=stderr)
if p.returncode:
raise SubprocessError(result)
return result
def _git(self, *args, **kw):
return self.spawn('git', *args, **kw)['stdout'].strip()
def getRevision(self):
if os.path.isdir('.git'):
h = self._git('log', '-1', '--format=%H', *self._path_list)
return str(int(self._git('svn', 'find-rev', h)))
if os.path.isdir('.svn'):
stdout = self.spawn('svn', 'info', *self._path_list)['stdout']
return str(max(map(int, SVN_CHANGED_REV.findall(stdout))))
raise NotImplementedError
def checkout(self, *path_list):
revision = self.revision
if os.path.isdir('.git'):
# edit .git/info/sparse-checkout if you want sparse checkout
if revision:
h = self._git('svn', 'find-rev', 'r%s' % revision)
assert h
if h != self._git('rev-parse', 'HEAD'):
self.deletePycFiles('.')
self._git('reset', '--merge', h)
else:
self.deletePycFiles('.')
self._git('svn', 'rebase')
self.revision = str(int(self._git('svn', 'find-rev', 'HEAD')))
elif os.path.isdir('.svn'):
# following code allows sparse checkout
def svn_mkdirs(path):
path = os.path.dirname(path)
if path and not os.path.isdir(path):
svn_mkdirs(path)
self.spawn(*(args + ['--depth=empty', path]))
for path in path_list or ('.',):
args = ['svn', 'up', '--force', '--non-interactive']
if revision:
args.append('-r%s' % revision)
svn_mkdirs(path)
args += '--depth=infinity', path
self.deletePycFiles(path)
try:
status_dict = self.spawn(*args)
except SubprocessError, e:
if 'cleanup' not in e.stderr:
raise
self.spawn('svn', 'cleanup', path)
status_dict = self.spawn(*args)
if not revision:
self.revision = revision = SVN_UP_REV.findall(
status_dict['stdout'].splitlines()[-1])[0]
else:
raise NotImplementedError
self._path_list += path_list
class TestSuite(Updater):
mysql_db_count = 1
allow_restart = False
def __init__(self, max_instance_count, **kw):
self.__dict__.update(kw)
self._path_list = ['tests']
pool = threading.Semaphore(max_instance_count)
self.acquire = pool.acquire
self.release = pool.release
self._instance = threading.local()
self._pool = max_instance_count == 1 and [None] or \
range(1, max_instance_count + 1)
self._ready = set()
self.running = {}
if max_instance_count != 1:
self.realtime_output = False
elif os.isatty(1):
self.realtime_output = True
instance = property(lambda self: self._instance.id)
def start(self, test, on_stop=None):
assert test not in self.running
self.running[test] = instance = self._pool.pop(0)
def run():
self._instance.id = instance
if instance not in self._ready:
self._ready.add(instance)
self.setup()
status_dict = self.run(test)
if on_stop is not None:
on_stop(status_dict)
self._pool.append(self.running.pop(test))
self.release()
threading.Thread(target=run).start()
def update(self):
self.checkout() # by default, update everything
def setup(self):
pass
def run(self, test):
raise NotImplementedError
def getTestList(self):
raise NotImplementedError
class ERP5TypeTestSuite(TestSuite):
RUN_RE = re.compile(
r'Ran (?P<all_tests>\d+) tests? in (?P<seconds>\d+\.\d+)s',
re.DOTALL)
STATUS_RE = re.compile(r"""
(OK|FAILED)\s+\(
(failures=(?P<failures>\d+),?\s*)?
(errors=(?P<errors>\d+),?\s*)?
(skipped=(?P<skips>\d+),?\s*)?
(expected\s+failures=(?P<expected_failures>\d+),?\s*)?
(unexpected\s+successes=(?P<unexpected_successes>\d+),?\s*)?
\)
""", re.DOTALL | re.VERBOSE)
def setup(self):
instance_home = self.instance and 'unit_test.%u' % self.instance \
or 'unit_test'
tests = os.path.join(instance_home, 'tests')
if os.path.exists(tests):
shutil.rmtree(instance_home + '.previous', True)
shutil.move(tests, instance_home + '.previous')
def run(self, test):
return self.runUnitTest(test)
def runUnitTest(self, *args, **kw):
if self.instance:
args = ('--instance_home=unit_test.%u' % self.instance,) + args
mysql_db_list = [string.Template(x).substitute(I=self.instance or '')
for x in self.mysql_db_list]
if len(mysql_db_list) > 1:
kw['extra_sql_connection_string_list'] = ','.join(mysql_db_list[1:])
try:
runUnitTest = os.environ.get('RUN_UNIT_TEST',
'Products/ERP5Type/tests/runUnitTest.py')
args = tuple(shlex.split(runUnitTest)) \
+ ('--verbose', '--erp5_sql_connection_string=' + mysql_db_list[0]) \
+ args
status_dict = self.spawn(*args, **kw)
except SubprocessError, e:
status_dict = e.status_dict
test_log = status_dict['stderr']
search = self.RUN_RE.search(test_log)
if search:
groupdict = search.groupdict()
status_dict.update(duration=float(groupdict['seconds']),
test_count=int(groupdict['all_tests']))
search = self.STATUS_RE.search(test_log)
if search:
groupdict = search.groupdict()
status_dict.update(error_count=int(groupdict['errors'] or 0),
failure_count=int(groupdict['failures'] or 0),
skip_count=int(groupdict['skips'] or 0)
+int(groupdict['expected_failures'] or 0)
+int(groupdict['unexpected_successes'] or 0))
return status_dict
#class LoadSaveExample(ERP5TypeTestSuite):
# def getTestList(self):
# return [test_path.split(os.sep)[-1][:-3]
# for test_path in glob.glob('tests/test*.py')]
#
# def setup(self):
# TestSuite.setup(self)
# return self.runUnitTest(self, '--save', 'testFoo')
#
# def run(self, test):
# return self.runUnitTest(self, '--load', test)
sys.modules['test_suite'] = module = imp.new_module('test_suite')
for var in TestSuite, ERP5TypeTestSuite:
setattr(module, var.__name__, var)
def safeRpcCall(function, *args):
retry = 64
while True:
try:
return function(*args)
except (socket.error, xmlrpclib.ProtocolError), e:
print >>sys.stderr, e
pprint.pprint(args, file(function._Method__name, 'w'))
time.sleep(retry)
retry += retry >> 1
def getOptionParser():
from optparse import OptionParser
parser = OptionParser(usage="%prog [options] <SUITE>[=<MAX_INSTANCES>]")
_ = parser.add_option
_("--master", help="URL of ERP5 instance, used as master node")
_("--mysql_db_list", help="comma-separated list of connection strings")
return parser
def main():
os.environ['LC_ALL'] = 'C'
parser = getOptionParser()
options, args = parser.parse_args()
try:
name, = args
if '=' in name:
name, max_instance_count = name.split('=')
max_instance_count = int(max_instance_count)
else:
max_instance_count = 1
except ValueError:
parser.error("invalid arguments")
db_list = options.mysql_db_list
if db_list:
db_list = db_list.split(',')
multi = max_instance_count != 1
try:
for db in db_list:
if db == string.Template(db).substitute(I=1) and multi:
raise KeyError
except KeyError:
parser.error("invalid value for --mysql_db_list")
else:
db_list = (max_instance_count == 1 and 'test test' or 'test$I test'),
def makeSuite(revision=None):
updater = Updater(revision)
updater.checkout('tests')
tests = imp.load_module('tests', *imp.find_module('tests', ['.']))
try:
suite_class = getattr(tests, name)
except AttributeError:
parser.error("unknown test suite")
if len(db_list) < suite_class.mysql_db_count:
parser.error("%r suite needs %u DB (only %u given)" %
(name, suite_class.mysql_db_count, len(db_list)))
suite = suite_class(revision=updater.revision,
max_instance_count=max_instance_count,
mysql_db_list=db_list[:suite_class.mysql_db_count])
suite.update()
return suite
portal_url = options.master
if portal_url[-1] != '/':
portal_url += '/'
portal = xmlrpclib.ServerProxy(portal_url, allow_none=1)
master = portal.portal_task_distribution
assert master.getProtocolRevision() == 1
suite = makeSuite()
revision = suite.getRevision()
test_result = safeRpcCall(master.createTestResult,
name, revision, suite.getTestList(), suite.allow_restart)
if test_result:
test_result_path, test_revision = test_result
url_parts = list(urlparse.urlparse(portal_url + test_result_path))
url_parts[1] = url_parts[1].split('@')[-1]
print 'ERP5_TEST_URL %s OK' % urlparse.urlunparse(url_parts) # for buildbot
while suite.acquire():
test = safeRpcCall(master.startUnitTest, test_result_path,
suite.running.keys())
if test:
if revision != test_revision:
suite = makeSuite(test_revision)
revision = test_revision
suite.acquire()
suite.start(test[1], lambda status_dict, __test_path=test[0]:
safeRpcCall(master.stopUnitTest, __test_path, status_dict))
elif not suite.running:
break
# We are finishing the suite. Let's disable idle nodes.
if __name__ == '__main__':
if not os.isatty(0):
killallIfParentDies()
sys.exit(main())
import glob, os, subprocess
# test_suite is provided by 'run_test_suite'
from test_suite import ERP5TypeTestSuite
class _ERP5(ERP5TypeTestSuite):
realtime_output = False
enabled_product_list = ('CMFActivity', 'CMFCategory', 'ERP5', 'ERP5Catalog',
'ERP5eGovSecurity', 'ERP5Form', 'ERP5Legacy',
'ERP5OOo', 'ERP5PropertySheetLegacy', 'ERP5Security',
'ERP5Subversion', 'ERP5SyncML', 'ERP5Type',
'ERP5Wizard', 'Formulator', 'HBTreeFolder2',
'MailTemplates', 'PortalTransforms', 'TimerService',
'ZLDAPConnection', 'ZLDAPMethods', 'ZMySQLDA',
'ZMySQLDDA', 'ZSQLCatalog')
def enableProducts(self):
product_set = set(self.enabled_product_list)
try:
dir_set = set(os.walk('Products').next()[1])
for product in dir_set - product_set:
os.unlink(os.path.join('Products', product))
product_set -= dir_set
except StopIteration:
os.mkdir('Products')
for product in product_set:
os.symlink(os.path.join('..', 'products', product),
os.path.join('Products', product))
def update(self, working_copy_list=None):
self.checkout('products', 'bt5')
self.enableProducts()
class PERF(_ERP5):
allow_restart = True
def getTestList(self):
return ('testPerformance',) * 3
def update(self):
self.checkout('products', 'bt5/erp5_base', 'bt5/erp5_ui_test')
self.enableProducts()
class ERP5(_ERP5):
mysql_db_count = 3
def getTestList(self):
test_list = []
for test_path in glob.glob('Products/*/tests/test*.py') + \
glob.glob('bt5/*/TestTemplateItem/test*.py'):
test_case = test_path.split(os.sep)[-1][:-3] # remove .py
product = test_path.split(os.sep)[-3]
# don't test 3rd party products
if product in ('PortalTransforms', 'MailTemplates'):
continue
# skip some tests
if test_case.startswith('testLive') or test_case.startswith('testVifib') \
or test_case in ('testPerformance', 'testSimulationPerformance'):
continue
test_list.append(test_case)
return test_list
def run(self, test):
if test in ('testConflictResolution', 'testInvalidationBug'):
status_dict = self.runUnitTest('--save', test)
if not status_dict['status_code']:
status_dict = self.runUnitTest('--load', '--activity_node=2', test)
return status_dict
return super(ERP5, self).run(test)
class ERP5_simulation(_ERP5):
def getTestList(self):
p = subprocess.Popen(('grep', '-lr', '--include=test*.py',
'-e', '@newSimulationExpectedFailure',
'-e', 'erp5_report_new_simulation_failures',
'Products/ERP5/tests'),
stdout=subprocess.PIPE)
return sorted(os.path.basename(x)[:-3]
for x in p.communicate()[0].splitlines())
def runUnitTest(self, *args, **kw):
return super(ERP5_simulation, self).runUnitTest(
erp5_report_new_simulation_failures='1', *args, **kw)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment