Commit 1e6a1cc6 authored by Kirill Smelkov

Switch tee from threading.Thread to sync.WorkGroup

The reason is that with threading.Thread, if an exception happens in the
spawned thread, the error is not propagated to the main driver, while with
sync.WorkGroup an exception from any spawned worker is propagated back
to main. For example, with the following injected error

    --- a/nxdtest/__init__.py
    +++ b/nxdtest/__init__.py
    @@ -267,6 +267,7 @@ def main():

     # tee, similar to tee(1) utility, copies data from fin to fout appending them to buf.
     def tee(ctx, fin, fout, buf):
    +    1/0
         while 1:

before this patch nxdtest behaves like ...

    (neo) (z4-dev) (g.env) kirr@deco:~/src/wendelin/nxdtest$ nxdtest
    date:   Tue, 24 Nov 2020 14:55:08 MSK
    xnode:  kirr@deco.navytux.spb.ru
    uname:  Linux deco 5.9.0-2-amd64 #1 SMP Debian 5.9.6-1 (2020-11-08) x86_64
    cpu:    Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz

    >>> pytest
    $ python -m pytest
    Exception in thread Thread-2:
    Traceback (most recent call last):
      File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
        self.run()
      File "/usr/lib/python2.7/threading.py", line 754, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/home/kirr/src/wendelin/nxdtest/nxdtest/__init__.py", line 270, in tee
        1/0
    ZeroDivisionError: integer division or modulo by zero

    Exception in thread Thread-1:
    Traceback (most recent call last):
      File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
        self.run()
      File "/usr/lib/python2.7/threading.py", line 754, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/home/kirr/src/wendelin/nxdtest/nxdtest/__init__.py", line 270, in tee
        1/0
    ZeroDivisionError: integer division or modulo by zero

    error   pytest  0.583s  # 1t 1e 0f 0s
    (neo) (z4-dev) (g.env) kirr@deco:~/src/wendelin/nxdtest$ echo $?
    0

Here the error in the other thread is only printed, and nxdtest is not
aborted: it exits with status 0. Above it at least reported "error", but e.g.
when testing pygolang/py3 and raising an error in tee, it even reported success
( !6 (comment 121393) ):

    slapuser34@vifibcloud-rapidspace-hosting-007:~/srv/runner/instance/slappart0$ ./bin/runTestSuite
    date:   Tue, 24 Nov 2020 12:51:23 MSK
    xnode:  slapuser34@vifibcloud-rapidspace-hosting-007
    uname:  Linux vifibcloud-rapidspace-hosting-007 4.19.0-6-amd64 #1 SMP Debian 4.19.67-2+deb10u2 (2019-11-11) x86_64
    cpu:    Intel(R) Xeon(R) CPU E5-2678 v3 @ 2.50GHz

    >>> thread
    $ python -m pytest
    Exception in thread Thread-1:
    Traceback (most recent call last):
      File "/srv/slapgrid/slappart34/srv/runner/shared/python3/5497998c60d97cbbf748337ccce21db2/lib/python3.7/threading.py", line 926, in _bootstrap_inner
        self.run()
      File "/srv/slapgrid/slappart34/srv/runner/shared/python3/5497998c60d97cbbf748337ccce21db2/lib/python3.7/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
      File "/srv/slapgrid/slappart34/srv/runner/software/44fe7dd3f13ecd100894c6368a35c055/parts/nxdtest/nxdtest/__init__.py", line 268, in tee
        fout.write(data)
    TypeError: write() argument must be str, not bytes

    ok      thread  9.145s  # 1t 0e 0f 0s

    >>> gevent
    $ gpython -m pytest
    Exception in thread Thread-3:
    Traceback (most recent call last):
      File "/srv/slapgrid/slappart34/srv/runner/shared/python3/5497998c60d97cbbf748337ccce21db2/lib/python3.7/threading.py", line 926, in _bootstrap_inner
        self.run()
      File "/srv/slapgrid/slappart34/srv/runner/shared/python3/5497998c60d97cbbf748337ccce21db2/lib/python3.7/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
      File "/srv/slapgrid/slappart34/srv/runner/software/44fe7dd3f13ecd100894c6368a35c055/parts/nxdtest/nxdtest/__init__.py", line 268, in tee
        fout.write(data)
    TypeError: write() argument must be str, not bytes

    ok      gevent  21.980s # 1t 0e 0f 0s
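
The behaviour in the logs above can be reproduced with the standard library
alone. The following is a minimal sketch, not nxdtest code: `worker` and
`run_with_thread` are hypothetical names, and the `1/0` mirrors the injected
error from the commit message. The thread's traceback is printed to stderr,
but `join()` returns normally and the caller never sees the exception.

```python
import threading

def worker():
    # Injected error, mirroring the 1/0 added to tee in the commit message.
    1/0

def run_with_thread():
    t = threading.Thread(target=worker)
    t.start()
    # join() only waits for the thread to finish; it does not re-raise
    # the exception that terminated the thread.
    t.join()
    return "main continued"
```

Calling `run_with_thread()` returns normally, which is why the driver could go
on to report a result and exit with status 0.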

After this patch nxdtest correctly propagates an error originating in a
spawned thread back to the main driver and exits with a non-zero status:

    (neo) (z4-dev) (g.env) kirr@deco:~/src/wendelin/nxdtest$ nxdtest
    date:   Tue, 24 Nov 2020 14:54:19 MSK
    xnode:  kirr@deco.navytux.spb.ru
    uname:  Linux deco 5.9.0-2-amd64 #1 SMP Debian 5.9.6-1 (2020-11-08) x86_64
    cpu:    Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz

    >>> pytest
    $ python -m pytest
    Traceback (most recent call last):
      File "/home/kirr/src/wendelin/venv/z4-dev/bin/nxdtest", line 11, in <module>
        load_entry_point('nxdtest', 'console_scripts', 'nxdtest')()
      File "/home/kirr/src/wendelin/nxdtest/nxdtest/__init__.py", line 230, in main
        wg.wait()
      File "golang/_sync.pyx", line 237, in golang._sync.PyWorkGroup.wait
        pyerr_reraise(pyerr)
      File "golang/_sync.pyx", line 217, in golang._sync.PyWorkGroup.go.pyrunf
        f(pywg._pyctx, *argv, **kw)
      File "/home/kirr/src/wendelin/nxdtest/nxdtest/__init__.py", line 270, in tee
        1/0
    ZeroDivisionError: integer division or modulo by zero
    (neo) (z4-dev) (g.env) kirr@deco:~/src/wendelin/nxdtest$ echo $?
    1
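
To illustrate the propagation pattern seen in the traceback above, here is a
rough standard-library sketch. It is NOT pygolang's implementation of
sync.WorkGroup: `MiniWorkGroup` is a hypothetical stand-in, and a
`threading.Event` plays the role of the context that workers receive as their
first argument. The first worker error is remembered, the other workers are
signalled to stop, and `wait()` re-raises the error in the caller.

```python
import threading

class MiniWorkGroup:
    """Hypothetical stand-in for a work group; not the pygolang implementation."""

    def __init__(self):
        self._threads = []
        self._err = None
        self._mu = threading.Lock()
        # Set when any worker fails; workers are expected to check it.
        self.cancelled = threading.Event()

    def go(self, f, *argv):
        def run():
            try:
                f(self.cancelled, *argv)
            except Exception as e:
                with self._mu:
                    if self._err is None:   # keep only the first error
                        self._err = e
                self.cancelled.set()        # ask the other workers to stop
        t = threading.Thread(target=run)
        self._threads.append(t)
        t.start()

    def wait(self):
        for t in self._threads:
            t.join()
        if self._err is not None:
            raise self._err                 # propagate to the caller


def failing_worker(cancelled):
    1/0

def patient_worker(cancelled):
    # Returns as soon as cancellation is signalled (or after the timeout).
    cancelled.wait(timeout=5)
```

With `wg.go(failing_worker)` and `wg.go(patient_worker)`, a subsequent
`wg.wait()` raises ZeroDivisionError in the calling thread instead of only
printing it.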

NOTE sync.WorkGroup requires every worker to handle context cancellation, so
that whenever there is an error, all other workers are canceled. We add such
cancellation handling to tee, but only lightly: before blocking in
read/write syscalls we check whether ctx is canceled. However, the
proper handling would be to switch file descriptors into non-blocking mode and
to select at every IO point on both potential IO events and potential
cancellation. This is left as a TODO for the future.
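
One possible shape for that TODO, sketched with the standard library only and
under several assumptions: cancellation is delivered through an extra pipe
whose read end (`cancel_fd`, a hypothetical name) becomes readable when the
work is canceled, the platform is Unix (select on pipes), and only the read
side is covered, so `fout.write` can still block. `tee_select` is an
illustrative name, not a function in nxdtest.

```python
import os
import select

def tee_select(fin_fd, cancel_fd, fout, buf):
    """Copy data from fin_fd to fout and buf until EOF or cancellation."""
    while True:
        # Block until either input data or a cancellation signal is available.
        ready, _, _ = select.select([fin_fd, cancel_fd], [], [])
        if cancel_fd in ready:
            raise RuntimeError("canceled")
        # os.read returns whatever is available without waiting for a full line.
        data = os.read(fin_fd, 4096)
        if not data:
            return          # EOF
        fout.write(data)
        buf.append(data)
```

Compared with polling before each blocking call, waiting on both descriptors
at once means a worker blocked on an idle pipe is woken up as soon as
cancellation is signalled.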

/reviewed-on !7
parent 40e2c4ab
--- a/nxdtest/__init__.py
+++ b/nxdtest/__init__.py
@@ -58,9 +58,10 @@ from __future__ import print_function, absolute_import
 from erp5.util.taskdistribution import TaskDistributor
 from subprocess import Popen, PIPE
 from time import time, strftime, gmtime, localtime
-import os, sys, threading, argparse, logging, traceback, re, pwd, socket
+import os, sys, argparse, logging, traceback, re, pwd, socket
 import six
 from golang import b
+from golang import context, sync
 
 # loadNXDTestFile loads .nxdtest file located @path.
 def loadNXDTestFile(path): # -> TestEnv
@@ -223,13 +224,12 @@ def main():
         # (explicit teeing instead of p.communicate() to be able to see incremental progress)
         buf_out = []
         buf_err = []
-        tout = threading.Thread(target=tee, args=(p.stdout, bstdout, buf_out))
-        terr = threading.Thread(target=tee, args=(p.stderr, bstderr, buf_err))
-        tout.start()
-        terr.start()
-
-        tout.join(); stdout = b''.join(buf_out)
-        terr.join(); stderr = b''.join(buf_err)
+        wg = sync.WorkGroup(context.background())
+        wg.go(tee, p.stdout, bstdout, buf_out)
+        wg.go(tee, p.stderr, bstderr, buf_err)
+        wg.wait()
+        stdout = b''.join(buf_out)
+        stderr = b''.join(buf_err)
         p.wait()
 
         if p.returncode != 0:
@@ -266,8 +266,15 @@ def main():
         test_result_line.stop(**tres)
 
 # tee, similar to tee(1) utility, copies data from fin to fout appending them to buf.
-def tee(fin, fout, buf):
+def tee(ctx, fin, fout, buf):
     while 1:
+        # poll for cancellation periodically and stop if requested.
+        # FIXME handle ↓↓↓ os.read and .write in non-blocking way so that we
+        # are not stuck in case ctx is cancelled but we remain blocked in any of those calls.
+        e = ctx.err()
+        if e is not None:
+            raise e
+
         # NOTE use raw os.read because it does not wait for full data to be available.
         # ( we could use fin.readline(), but there are cases when e.g. progress
         # is reported via printing consequent dots on the same line and