Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
erp5
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Labels
Merge Requests
140
Merge Requests
140
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Jobs
Commits
Open sidebar
nexedi
erp5
Commits
f9ad726b
Commit
f9ad726b
authored
Sep 03, 2012
by
Rafael Monnerat
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Drop code duplication and use code from erp5.util.
parent
b3d373cc
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
1 addition
and
207 deletions
+1
-207
product/ERP5Type/tests/ERP5TypeTestSuite.py
product/ERP5Type/tests/ERP5TypeTestSuite.py
+1
-207
No files found.
product/ERP5Type/tests/ERP5TypeTestSuite.py
View file @
f9ad726b
import
re
,
imp
,
sys
,
os
,
shlex
,
shutil
,
glob
,
random
try
:
from
erp5.util.testsuite
import
TestSuite
,
SubprocessError
except
ImportError
:
# erp5.util.testsuite should be updated
print
"Please update erp5.util to version >= 0.4.3"
import
threading
,
subprocess
import
traceback
import
errno
from
pprint
import
pprint
_format_command_search
=
re
.
compile
(
"[[
\
\
s $({?*
\
\
`#~';<>&|]"
).
search
_format_command_escape
=
lambda
s
:
"'%s'"
%
r"'\''"
.
join
(
s
.
split
(
"'"
))
def
format_command
(
*
args
,
**
kw
):
cmdline
=
[]
for
k
,
v
in
sorted
(
kw
.
items
()):
if
_format_command_search
(
v
):
v
=
_format_command_escape
(
v
)
cmdline
.
append
(
'%s=%s'
%
(
k
,
v
))
for
v
in
args
:
if
_format_command_search
(
v
):
v
=
_format_command_escape
(
v
)
cmdline
.
append
(
v
)
return
' '
.
join
(
cmdline
)
def
subprocess_capture
(
p
,
quiet
=
False
):
def
readerthread
(
input
,
output
,
buffer
):
while
True
:
data
=
input
.
readline
()
if
not
data
:
break
output
(
data
)
buffer
.
append
(
data
)
if
p
.
stdout
:
stdout
=
[]
output
=
quiet
and
(
lambda
data
:
None
)
or
sys
.
stdout
.
write
stdout_thread
=
threading
.
Thread
(
target
=
readerthread
,
args
=
(
p
.
stdout
,
output
,
stdout
))
stdout_thread
.
setDaemon
(
True
)
stdout_thread
.
start
()
if
p
.
stderr
:
stderr
=
[]
stderr_thread
=
threading
.
Thread
(
target
=
readerthread
,
args
=
(
p
.
stderr
,
sys
.
stderr
.
write
,
stderr
))
stderr_thread
.
setDaemon
(
True
)
stderr_thread
.
start
()
if
p
.
stdout
:
stdout_thread
.
join
()
if
p
.
stderr
:
stderr_thread
.
join
()
p
.
wait
()
return
(
p
.
stdout
and
''
.
join
(
stdout
),
p
.
stderr
and
''
.
join
(
stderr
))
class
SubprocessError
(
EnvironmentError
):
def
__init__
(
self
,
status_dict
):
self
.
status_dict
=
status_dict
def
__getattr__
(
self
,
name
):
return
self
.
status_dict
[
name
]
def
__str__
(
self
):
return
'Error %i'
%
self
.
status_code
class
Persistent
(
object
):
"""Very simple persistent data storage for optimization purpose
This tool should become a standalone daemon communicating only with an ERP5
instance. But for the moment, it only execute 1 test suite and exists,
and test suite classes may want some information from previous runs.
"""
def
__init__
(
self
,
filename
):
self
.
_filename
=
filename
def
__getattr__
(
self
,
attr
):
if
attr
==
'_db'
:
try
:
db
=
file
(
self
.
_filename
,
'r+'
)
except
IOError
,
e
:
if
e
.
errno
!=
errno
.
ENOENT
:
raise
db
=
file
(
self
.
_filename
,
'w+'
)
else
:
try
:
self
.
__dict__
.
update
(
eval
(
db
.
read
()))
except
StandardError
:
pass
self
.
_db
=
db
return
db
self
.
_db
return
super
(
Persistent
,
self
).
__getattribute__
(
attr
)
def
sync
(
self
):
self
.
_db
.
seek
(
0
)
db
=
dict
(
x
for
x
in
self
.
__dict__
.
iteritems
()
if
x
[
0
][:
1
]
!=
'_'
)
pprint
.
pprint
(
db
,
self
.
_db
)
self
.
_db
.
truncate
()
class
TestSuite
(
object
):
"""
Subclasses may redefine the following properties:
mysql_db_count (integer, >=1)
Maximum number of SQL databases connection strings needed by any tests ran
in this suite. Tests will get mysql_db_count - 1 connection strings in
extra_sql_connection_string_list environment variable.
"""
RUN_RE
=
re
.
compile
(
r'Ran (?P<all_tests>\
d+)
tests? in (?P<seconds>\
d+
\.\
d+)s
',
re.DOTALL)
STATUS_RE = re.compile(r"""
(OK|FAILED)\
s+
\(
(failures=(?P<failures>\
d+),?
\s*)?
(errors=(?P<errors>\
d+),?
\s*)?
(skipped=(?P<skips>\
d+),?
\s*)?
(expected\
s+
failures=(?P<expected_failures>\
d+),?
\s*)?
(unexpected\
s+successes=(?P<u
nexpected_successes>\
d+),?
\s*)?
\
)
""", re.DOTALL | re.VERBOSE)
mysql_db_count = 1
allow_restart = False
realtime_output = True
stdin = file(os.devnull)
def __init__(self, max_instance_count, **kw):
self.__dict__.update(kw)
self._path_list = ['
tests
']
pool = threading.Semaphore(max_instance_count)
self.acquire = pool.acquire
self.release = pool.release
self._instance = threading.local()
self._pool = max_instance_count == 1 and [None] or
\
range(1, max_instance_count + 1)
self._ready = set()
self.running = {}
if max_instance_count != 1:
self.realtime_output = False
elif os.isatty(1):
self.realtime_output = True
self.persistent = Persistent('
run_test_suite
-%
s
.
tmp
'
% self.__class__.__name__)
instance = property(lambda self: self._instance.id)
def start(self, test, on_stop=None):
assert test not in self.running
self.running[test] = instance = self._pool.pop(0)
def run():
try:
self._instance.id = instance
if instance not in self._ready:
self._ready.add(instance)
self.setup()
status_dict = self.run(test)
if on_stop is not None:
on_stop(status_dict)
self._pool.append(self.running.pop(test))
finally:
self.release()
thread = threading.Thread(target=run)
thread.setDaemon(True)
thread.start()
def update(self):
self.checkout() # by default, update everything
def setup(self):
pass
def run(self, test):
raise NotImplementedError
def getTestList(self):
raise NotImplementedError
def spawn(self, *args, **kw):
quiet = kw.pop('
quiet
', False)
env = kw and dict(os.environ, **kw) or None
command = format_command(*args, **kw)
print '
\
n
$
' + command
sys.stdout.flush()
try:
p = subprocess.Popen(args, stdin=self.stdin, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, env=env)
except Exception:
# Catch any exception here, to warn user instead of beeing silent,
# by generating fake error result
result = dict(status_code=-1,
command=command,
stderr=traceback.format_exc(),
stdout='')
raise SubprocessError(result)
if self.realtime_output:
stdout, stderr = subprocess_capture(p, quiet)
else:
stdout, stderr = p.communicate()
if not quiet:
sys.stdout.write(stdout)
sys.stderr.write(stderr)
result = dict(status_code=p.returncode, command=command,
stdout=stdout, stderr=stderr)
if p.returncode:
raise SubprocessError(result)
return result
from
erp5.util.testsuite
import
TestSuite
,
SubprocessError
class
ERP5TypeTestSuite
(
TestSuite
):
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment