Commit cf300184 authored by Kirill Smelkov's avatar Kirill Smelkov

Move detection of leaked processes to trun

When test run is canceled via signal, we want to send SIGTERM to trun
and let it kill/wait the tested processes. However until now we were
sending SIGTERM to trun and immediately checking whether some spawned
tested processes remain without giving any time for trun to complete first.

Fix this by moving the code that detects/cleanups leaked processes to
trun itself, and in the main nxdtest driver to only send SIGTERM to trun
and let trun do the cleanup/leaked processes detection.

This fixes erroneous "leaked process detected" as noticed by @jerome in
the following example:

    (nexedi/nxdtest!16 (comment 150835))
    $ nxdtest
    ...
    >>> sleep
    $ sleep 10
    ^C# Interrupt
    # stopping due to cancel
    # leaked pid=188877 'sleep' ['sleep', '10']		<-- "leaked" is a bit misleading here
    error   sleep   1.030s  # 1t 1e 0f 0s
    # test run canceled
    # ran 1 test case:  1·error

After the patch the output in this case becomes:

    $ nxdtest
    ...
    >>> sleep
    $ sleep 10
    ^C# Interrupt
    ok      sleep   0.604s  # 1t 0e 0f 0s
    # test run canceled
    # ran 1 test case:  1·ok

There is no more "leaked pid=...." emitted, but the change from "error" to "ok" status is wrong.
We will fix it in the next patch.

/reviewed-by @jerome
/reviewed-on nexedi/nxdtest!16
parent 548a5325
......@@ -59,13 +59,11 @@ from erp5.util.taskdistribution import TaskDistributor
from subprocess import Popen, PIPE
from time import strftime, gmtime, localtime
import os, sys, argparse, logging, traceback, re, pwd, socket
from errno import ESRCH, EPERM
from os.path import dirname
import six
from golang import b, chan, defer, func, go, select, default
from golang import errors, context, os as gos, sync, syscall, time
from golang.os import signal
import psutil
# trun.py is a helper via which we run tests.
trun_py = "%s/trun.py" % dirname(__file__)
......@@ -293,16 +291,14 @@ def main():
# In addition to kw['env'], kw['envadj'] allows users to define
# only adjustments instead of providing full env dict.
# Test command is spawned with unchanged cwd. Instance wrapper cares to set cwd before running us.
# The test is run via trun.py helper to which we delegate checking
# for leaked files/processes/mounts/... We trust trun.py not to hang.
kw = t.kw.copy()
env = kw.pop('env', os.environ)
env = env.copy()
envadj = kw.pop('envadj', {})
env.update(envadj)
# run the command in a new session, so that it is easy to find out leaked spawned subprocesses.
# TODO session -> cgroup, because a child process could create another new session.
def newsession():
os.setsid()
p = Popen([sys.executable, trun_py] + t.argv, env=env, stdin=devnull, stdout=PIPE, stderr=PIPE, bufsize=0, preexec_fn=newsession, **kw)
p = Popen([sys.executable, trun_py] + t.argv, env=env, stdin=devnull, stdout=PIPE, stderr=PIPE, bufsize=0, **kw)
except:
stdout, stderr = b'', b(traceback.format_exc())
bstderr.write(stderr)
......@@ -315,8 +311,10 @@ def main():
wg = sync.WorkGroup(ctx)
wg.go(tee, p.stdout, bstdout, buf_out)
wg.go(tee, p.stderr, bstderr, buf_err)
# wait for p to exit
# wait for trun to exit; terminate it on cancel
@func
def _(ctx):
defer(p.wait)
err = None
while 1:
done = p.poll()
......@@ -336,20 +334,6 @@ def main():
time.sleep(0.1)
# p should be done - check if it leaked processes and terminate/kill them
# kill p in the end if it does not stop from just SIGTERM.
while 1:
procv = session_proclist(sid=p.pid)
if len(procv) == 0:
break
for proc in procv:
if proc.pid != p.pid:
emit('# leaked pid=%d %r %s' % (proc.pid, proc.name(), proc.cmdline()))
proc.terminate()
gone, alive = psutil.wait_procs(procv, timeout=5)
for proc in alive:
proc.kill()
if err is not None:
raise err
wg.go(_)
......@@ -472,23 +456,6 @@ def get1(path, field, default=None):
raise KeyError('%s does not have field %r' % (path, field))
# session_proclist returns all processes that belong to specified session.
def session_proclist(sid):
procv = []
for proc in psutil.process_iter(['pid']):
try:
proc_sid = os.getsid(proc.pid)
except OSError as e:
if e.errno in (ESRCH, EPERM):
# proc either finished, or we are not allowed to retrieve its sid
# (see getsid(1) for details)
continue
raise
if proc_sid == sid:
procv.append(proc)
return procv
# LocalTestResult* handle tests runs, when master_url was not provided and tests are run locally.
class LocalTestResult:
......
......@@ -359,6 +359,7 @@ TestCase('TEST1', ['%s'])
assert "# master asks to cancel test run" in captured.out
assert "# test run canceled" in captured.out
assert "hang: terminating" in captured.out
assert "leaked pid" not in captured.out
assert captured.err == ''
......@@ -414,3 +415,4 @@ TestCase('TEST1', ['%s'])
assert b("# %s" % sigmsg) in out
assert b"# test run canceled" in out
assert b"hang: terminating" in out
assert b"leaked pid" not in out
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2021 Nexedi SA and Contributors.
# Copyright (C) 2021-2022 Nexedi SA and Contributors.
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
......@@ -20,10 +20,10 @@
""" `trun ...` - run test specified by `...`
The test is run in dedicated environment, which, after test completes, is
checked for leaked files, leaked mount entries, etc.
checked for leaked files, leaked mount entries, leaked processes, etc.
The environment is activated only if user namespaces are available(*).
If user namespaces are not available, the test is still run but without the checks.
If user namespaces are not available, the test is still run but without most of the checks.
(*) see https://man7.org/linux/man-pages/man7/user_namespaces.7.html
"""
......@@ -31,10 +31,13 @@ If user namespaces are not available, the test is still run but without the chec
from __future__ import print_function, absolute_import
import errno, os, sys, stat, difflib, pwd, grp, shutil
from subprocess import check_call as xrun, CalledProcessError
from subprocess import check_call as xrun, CalledProcessError, Popen
from os.path import join, devnull
from golang import func, defer
from golang import func, defer, chan, select, default
from golang import os as gos, syscall, time
from golang.os import signal
import prctl
import psutil
# userns_available detects if user-namespaces and necessary features are provided by OS kernel.
def userns_available(): # -> (yes|no, {details})
......@@ -99,9 +102,14 @@ def main():
# either respawned in new namespace, or entered here without respawn with in_userns=n.
# run the test via corresponding driver.
run = run_in_userns if in_userns else run_no_userns
@func
def _():
try:
xrun(sys.argv[1:])
# run the command in a new session, so that it is easy to find out leaked spawned subprocesses.
# TODO session -> cgroup, because a child process could create another new session.
def newsession():
os.setsid()
p = Popen(sys.argv[1:], preexec_fn=newsession)
except OSError as e:
if e.errno != errno.ENOENT:
raise
......@@ -109,8 +117,50 @@ def main():
print("%s: %s" % (sys.argv[1], os.strerror(e.errno)), # e.filename is also ø on py2
file=sys.stderr)
sys.exit(127)
except CalledProcessError as e:
sys.exit(e.returncode)
def _():
p.wait()
sys.exit(p.returncode)
defer(_)
# in the end: check if p leaked processes and terminate/kill them
# kill p in the end if it does not stop from just SIGTERM.
def _():
while 1:
procv = session_proclist(sid=p.pid)
if len(procv) == 0:
break
for proc in procv:
if proc.pid != p.pid:
print('# leaked pid=%d %r %s' % (proc.pid, proc.name(), proc.cmdline()))
proc.terminate()
gone, alive = psutil.wait_procs(procv, timeout=5)
for proc in alive:
proc.kill()
defer(_)
# wait for p to complete
# terminate it on any signal
sigq = chan(1, dtype=gos.Signal)
signal.Notify(sigq, syscall.SIGINT, syscall.SIGTERM)
def _():
signal.Stop(sigq)
defer(_)
while 1:
if p.poll() is not None:
break
_, _rx = select(
default, # 0
sigq.recv, # 1
)
if _ == 1:
p.terminate()
break
time.sleep(0.1)
run(_)
......@@ -227,6 +277,23 @@ def writefile(path, data):
with open(path, "w") as f:
f.write(data)
# session_proclist returns all processes that belong to specified session.
def session_proclist(sid):
procv = []
for proc in psutil.process_iter(['pid']):
try:
proc_sid = os.getsid(proc.pid)
except OSError as e:
if e.errno in (errno.ESRCH, errno.EPERM):
# proc either finished, or we are not allowed to retrieve its sid
# (see getsid(1) for details)
continue
raise
if proc_sid == sid:
procv.append(proc)
return procv
if __name__ == '__main__':
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment