Switch tee from threading.Thread to sync.WorkGroup
The reason is that with threading.Thread if exception happens in that spawned thread, this error is not propagated to main driver, while with sync.WorkGroup an exception from any spawned worker is propagated back to main. For example with the following injected error
--- a/nxdtest/__init__.py
+++ b/nxdtest/__init__.py
@@ -267,6 +267,7 @@ def main():
# tee, similar to tee(1) utility, copies data from fin to fout appending them to buf.
def tee(ctx, fin, fout, buf):
+ 1/0
while 1:
before this patch nxdtest behaves like ...
(neo) (z4-dev) (g.env) kirr@deco:~/src/wendelin/nxdtest$ nxdtest
date: Tue, 24 Nov 2020 14:55:08 MSK
xnode: kirr@deco.navytux.spb.ru
uname: Linux deco 5.9.0-2-amd64 #1 SMP Debian 5.9.6-1 (2020-11-08) x86_64
cpu: Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz
>>> pytest
$ python -m pytest
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 754, in run
self.__target(*self.__args, **self.__kwargs)
File "/home/kirr/src/wendelin/nxdtest/nxdtest/__init__.py", line 270, in tee
1/0
ZeroDivisionError: integer division or modulo by zero
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 754, in run
self.__target(*self.__args, **self.__kwargs)
File "/home/kirr/src/wendelin/nxdtest/nxdtest/__init__.py", line 270, in tee
1/0
ZeroDivisionError: integer division or modulo by zero
error pytest 0.583s # 1t 1e 0f 0s
(neo) (z4-dev) (g.env) kirr@deco:~/src/wendelin/nxdtest$ echo $?
0
Here the error in another thread is only printed, but nxdtest is not aborted. Above it reported "error", but e.g. when testing pygolang/py3 and raising an error in tee it even reported it was succeeding ( !6 (comment 121393) ):
slapuser34@vifibcloud-rapidspace-hosting-007:~/srv/runner/instance/slappart0$ ./bin/runTestSuite
date: Tue, 24 Nov 2020 12:51:23 MSK
xnode: slapuser34@vifibcloud-rapidspace-hosting-007
uname: Linux vifibcloud-rapidspace-hosting-007 4.19.0-6-amd64 #1 SMP Debian 4.19.67-2+deb10u2 (2019-11-11) x86_64
cpu: Intel(R) Xeon(R) CPU E5-2678 v3 @ 2.50GHz
>>> thread
$ python -m pytest
Exception in thread Thread-1:
Traceback (most recent call last):
File "/srv/slapgrid/slappart34/srv/runner/shared/python3/5497998c60d97cbbf748337ccce21db2/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/srv/slapgrid/slappart34/srv/runner/shared/python3/5497998c60d97cbbf748337ccce21db2/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/srv/slapgrid/slappart34/srv/runner/software/44fe7dd3f13ecd100894c6368a35c055/parts/nxdtest/nxdtest/__init__.py", line 268, in tee
fout.write(data)
TypeError: write() argument must be str, not bytes
ok thread 9.145s # 1t 0e 0f 0s
>>> gevent
$ gpython -m pytest
Exception in thread Thread-3:
Traceback (most recent call last):
File "/srv/slapgrid/slappart34/srv/runner/shared/python3/5497998c60d97cbbf748337ccce21db2/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/srv/slapgrid/slappart34/srv/runner/shared/python3/5497998c60d97cbbf748337ccce21db2/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/srv/slapgrid/slappart34/srv/runner/software/44fe7dd3f13ecd100894c6368a35c055/parts/nxdtest/nxdtest/__init__.py", line 268, in tee
fout.write(data)
TypeError: write() argument must be str, not bytes
ok gevent 21.980s # 1t 0e 0f 0s
After this patch nxdtest correctly handles and propagates an error originating in spawned thread back to main driver:
(neo) (z4-dev) (g.env) kirr@deco:~/src/wendelin/nxdtest$ nxdtest
date: Tue, 24 Nov 2020 14:54:19 MSK
xnode: kirr@deco.navytux.spb.ru
uname: Linux deco 5.9.0-2-amd64 #1 SMP Debian 5.9.6-1 (2020-11-08) x86_64
cpu: Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz
>>> pytest
$ python -m pytest
Traceback (most recent call last):
File "/home/kirr/src/wendelin/venv/z4-dev/bin/nxdtest", line 11, in <module>
load_entry_point('nxdtest', 'console_scripts', 'nxdtest')()
File "/home/kirr/src/wendelin/nxdtest/nxdtest/__init__.py", line 230, in main
wg.wait()
File "golang/_sync.pyx", line 237, in golang._sync.PyWorkGroup.wait
pyerr_reraise(pyerr)
File "golang/_sync.pyx", line 217, in golang._sync.PyWorkGroup.go.pyrunf
f(pywg._pyctx, *argv, **kw)
File "/home/kirr/src/wendelin/nxdtest/nxdtest/__init__.py", line 270, in tee
1/0
ZeroDivisionError: integer division or modulo by zero
(neo) (z4-dev) (g.env) kirr@deco:~/src/wendelin/nxdtest$ echo $?
1
NOTE sync.WorkGroup requires every worker to handle context cancellation, so that whenever there is an error, all other workers are canceled. We add such cancellation handling to tee but only lightly: before going to block in read/write syscalls we check for whether ctx is canceled or not. However the proper handling would be to switch file descriptors into non-block mode and to select at every IO point on both potential IO events and potential cancellation. This is left as TODO for the future.
mentioned in merge request !6 (merged)
Thanks @kirr
So that I understand, the plan about this TODO, it would be something more or less like this ?
index 838cbec..5f40379 100755 --- a/nxdtest/__init__.py +++ b/nxdtest/__init__.py @@ -58,7 +58,7 @@ from __future__ import print_function, absolute_import from erp5.util.taskdistribution import TaskDistributor from subprocess import Popen, PIPE from time import time, strftime, gmtime, localtime -import os, sys, argparse, logging, traceback, re, pwd, socket +import os, sys, argparse, logging, traceback, re, pwd, socket, fcntl, select import six from golang import b from golang import context, sync @@ -267,6 +267,12 @@ def main(): # tee, similar to tee(1) utility, copies data from fin to fout appending them to buf. def tee(ctx, fin, fout, buf): + # set file descriptors non blocking + for f in (fin, fout): + fd = f.fileno() + fl = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + while 1: # poll for cancellation periodically and stop if requested. # FIXME handle ↓↓↓ os.read and .write in non-blocking way so that we @@ -282,13 +288,23 @@ def tee(ctx, fin, fout, buf): # # besides when a lot of output is available it would be a waste to # read/flush it line-by-line. ) - data = os.read(fin.fileno(), 4096) - if not(data): - return # EOF - - fout.write(data) - fout.flush() - buf.append(data) + data = None + readable, _, _ = select.select([fin], [], [], 0) + if fin in readable: + data = os.read(fin.fileno(), 4096) + if not(data): + return # EOF + while 1: + e = ctx.err() + if e is not None: + raise e + _, writable, _ = select.select([], [fout], [], 0) + if fout in writable: + fout.write(data) + fout.flush() + break + + buf.append(data)
I'm not saying "let's do this", some tests are failing because they use BytesIO (with pytest's capsys), I just want to make sure I generally understood the idea.
@jerome, yes, you understood this idea correctly.
My preference is 1) indeed "let's not do this now" and 2) for library support for such IO to reside in pygolang runtime and be represented to user as e.g.
xio.read(ctx, f)
andxio.write(ctx, f, data)
utilities. Note also that in general it is not as easy as it sounds: on Linux select/epoll always reports IO as being ready for regular files, and it is only sockets and pipes that support polling for real.Once again, even if with adding this TODO/FIXME in place we are not doing any step backwards: the previous version of the code could also block forever in read/write - only there was no any cancellation attempt nor any comment about that.
P.S. for IO that supports contexts on Go side please see e.g. https://pkg.go.dev/lab.nexedi.com/kirr/go123@v0.0.0-20201018154837-96046edf8211/xio , kirr/go123@7ad867a3 and kirr/go123@d2dc6c09.
P.P.S. standard Go runtime automatically supports polling for sockets and pipes, but there is no (yet ?) user-visible API for contexts.
https://golang.org/doc/go1.11#os
https://golang.org/pkg/internal/poll/See also
Thanks @kirr I share your point of view on this, we can merge this now. I was not posting this patch as a way of suggesting "let's implement the TODO now, it's easy", I was curious so I tried to implement this a bit, to make sure I understood your idea and see how complex it would be and when trying this I also had the feeling that an higher level API taking care of cancellation would be good.
The links are interesting, thanks. I really need to spend time learning these aspects of golang
Thanks, @jerome, I understood your posting in exactly the way you describe it - there is no problem here and it is always good to learn and investigate trying to recheck ones understanding. As you say the work you did with this patch won't be lost, even if we don't merge it right now and in this exact form.
mentioned in commit 1e6a1cc6
I applied the patch to master as 1e6a1cc6.
@jerome, do you think we can already merge to slapos master bits from slapos!862 (merged) that setup nxdtest component and tests for it, so that we can setup lab pipelining for merge-requests on nxdtest itself?
This is not strictly neccessary, but would be nice. Also it will allow to merge changes from that slapos!862 (merged) step-by-step.
@jerome, do you think we can already merge to slapos master bits from slapos!862 (merged) that setup nxdtest component and tests for it, so that we can setup lab pipelining for merge-requests on nxdtest itself?
Yes, I was thinking the same. I applied commits and made test suites https://erp5.nexedi.net/test_suite_module/1138/ https://erp5.nexedi.net/test_suite_module/1128/
thanks !
You are welcome, and thanks, @jerome.