Commit e5a85307 authored by Jason Madden's avatar Jason Madden

Import the 3.4 version of test_subprocess and make it (mostly) pass. Includes...

Import the 3.4 version of test_subprocess and make it (mostly) pass. Includes a test for #374; fixes #374.
parent 9358b0ef
[pep8] [pep8]
ignore=E702,E265,E402,E731,E266,E261,W503,E129 ignore=E702,E265,E402,E731,E266,E261,W503,E129
max_line_length=160 max_line_length=160
exclude=.tox,.git,build,2.6,2.7,2.7pypy,3.3,test_support.py,test_queue.py,patched_tests_setup.py,test_threading_2.py,lock_tests.py,_sslgte279.py exclude=.tox,.git,build,2.6,2.7,2.7pypy,3.3,test_support.py,test_queue.py,patched_tests_setup.py,test_threading_2.py,lock_tests.py,_sslgte279.py,3.4
...@@ -26,11 +26,17 @@ Unreleased ...@@ -26,11 +26,17 @@ Unreleased
Python 3, using a gevent SSL socket could cause the greenlet to Python 3, using a gevent SSL socket could cause the greenlet to
block. See :issue:`597` by David Ford. block. See :issue:`597` by David Ford.
- ``gevent.socket.socket.sendall`` supports arbitrary objects that - ``gevent.socket.socket.sendall`` supports arbitrary objects that
implement the buffer protocol (such as ctypes structurs), just like implement the buffer protocol (such as ctypes structures), just like
native sockets. Reported in :issue:`466` by tzickel. native sockets. Reported in :issue:`466` by tzickel.
- Added support for the ``onerror`` attribute present in CFFI 1.2.0 - Added support for the ``onerror`` attribute present in CFFI 1.2.0
for better signal handling under PyPy. Thanks to Armin Rigo and Omer for better signal handling under PyPy. Thanks to Armin Rigo and Omer
Katz. (See https://bitbucket.org/cffi/cffi/issue/152/handling-errors-from-signal-handlers-in) Katz. (See https://bitbucket.org/cffi/cffi/issue/152/handling-errors-from-signal-handlers-in)
- The ``gevent.subprocess`` module is closer in behaviour to the
standard library under Python 3, at least on POSIX. The
``pass_fds``, ``restore_signals``, and ``start_new_session``
arguments are still unimplemented.
- An exception starting a child process with the ``gevent.subprocess``
module no longer leaks file descriptors. Reported in :pr:`374` by 陈小玉.
1.1a1 (Jun 29, 2015) 1.1a1 (Jun 29, 2015)
==================== ====================
......
...@@ -451,15 +451,26 @@ def _kill(greenlet, exception, waiter): ...@@ -451,15 +451,26 @@ def _kill(greenlet, exception, waiter):
def joinall(greenlets, timeout=None, raise_error=False, count=None): def joinall(greenlets, timeout=None, raise_error=False, count=None):
"""
Wait for the ``greenlets`` to finish.
:param greenlets: A sequence of greenlets to wait for.
:keyword float timeout: If given, the maximum number of seconds to wait.
:return: A sequence of the greenlets that finished before the timeout (if any)
expired.
"""
if not raise_error: if not raise_error:
wait(greenlets, timeout=timeout, count=count) return wait(greenlets, timeout=timeout, count=count)
else:
done = []
for obj in iwait(greenlets, timeout=timeout, count=count): for obj in iwait(greenlets, timeout=timeout, count=count):
if getattr(obj, 'exception', None) is not None: if getattr(obj, 'exception', None) is not None:
if hasattr(obj, '_raise_exception'): if hasattr(obj, '_raise_exception'):
obj._raise_exception() obj._raise_exception()
else: else:
raise obj.exception raise obj.exception
done.append(obj)
return done
def _killall3(greenlets, exception, waiter): def _killall3(greenlets, exception, waiter):
......
from __future__ import absolute_import from __future__ import absolute_import
import sys
import os
import errno import errno
import gc import gc
import io
import os
import signal import signal
import sys
import traceback import traceback
from gevent.event import AsyncResult from gevent.event import AsyncResult
from gevent.hub import get_hub, linkproxy, sleep, getcurrent, integer_types, string_types, xrange from gevent.hub import get_hub, linkproxy, sleep, getcurrent, integer_types, string_types, xrange
from gevent.hub import PY3
from gevent.fileobject import FileObject from gevent.fileobject import FileObject
from gevent.greenlet import Greenlet, joinall from gevent.greenlet import Greenlet, joinall
spawn = Greenlet.spawn spawn = Greenlet.spawn
...@@ -18,6 +20,9 @@ __implements__ = ['Popen', ...@@ -18,6 +20,9 @@ __implements__ = ['Popen',
'call', 'call',
'check_call', 'check_call',
'check_output'] 'check_output']
if PY3:
__implements__.append("_posixsubprocess")
_posixsubprocess = None
# Standard functions and classes that this module re-imports. # Standard functions and classes that this module re-imports.
...@@ -59,7 +64,8 @@ __extra__ = ['MAXFD', ...@@ -59,7 +64,8 @@ __extra__ = ['MAXFD',
if sys.version_info[:2] >= (3, 3): if sys.version_info[:2] >= (3, 3):
__imports__ += ['DEVNULL', __imports__ += ['DEVNULL',
'getstatusoutput', 'getstatusoutput',
'getoutput'] 'getoutput',
'TimeoutExpired']
for name in __imports__[:]: for name in __imports__[:]:
try: try:
...@@ -106,8 +112,23 @@ else: ...@@ -106,8 +112,23 @@ else:
from gevent import monkey from gevent import monkey
fork = monkey.get_original('os', 'fork') fork = monkey.get_original('os', 'fork')
if PY3:
def call(*popenargs, **kwargs): def call(*popenargs, **kwargs):
"""Run command with arguments. Wait for command to complete or
timeout, then return the returncode attribute.
The arguments are the same as for the Popen constructor. Example:
retcode = call(["ls", "-l"])
"""
timeout = kwargs.pop('timeout', None)
with Popen(*popenargs, **kwargs) as p:
try:
return p.wait(timeout=timeout)
except:
p.kill()
p.wait()
raise
else:
def call(*popenargs, **kwargs):
"""Run command with arguments. Wait for command to complete, then """Run command with arguments. Wait for command to complete, then
return the returncode attribute. return the returncode attribute.
...@@ -136,8 +157,59 @@ def check_call(*popenargs, **kwargs): ...@@ -136,8 +157,59 @@ def check_call(*popenargs, **kwargs):
raise CalledProcessError(retcode, cmd) raise CalledProcessError(retcode, cmd)
return 0 return 0
if PY3:
def check_output(*popenargs, **kwargs): def check_output(*popenargs, **kwargs):
r"""Run command with arguments and return its output.
If the exit code was non-zero it raises a CalledProcessError. The
CalledProcessError object will have the return code in the returncode
attribute and output in the output attribute.
The arguments are the same as for the Popen constructor. Example:
>>> check_output(["ls", "-l", "/dev/null"])
b'crw-rw-rw- 1 root root 1, 3 Oct 18 2007 /dev/null\n'
The stdout argument is not allowed as it is used internally.
To capture standard error in the result, use stderr=STDOUT.
>>> check_output(["/bin/sh", "-c",
... "ls -l non_existent_file ; exit 0"],
... stderr=STDOUT)
b'ls: non_existent_file: No such file or directory\n'
There is an additional optional argument, "input", allowing you to
pass a string to the subprocess's stdin. If you use this argument
you may not also use the Popen constructor's "stdin" argument, as
it too will be used internally. Example:
>>> check_output(["sed", "-e", "s/foo/bar/"],
... input=b"when in the course of fooman events\n")
b'when in the course of barman events\n'
If universal_newlines=True is passed, the return value will be a
string rather than bytes.
"""
timeout = kwargs.pop('timeout', None)
if 'stdout' in kwargs:
raise ValueError('stdout argument not allowed, it will be overridden.')
if 'input' in kwargs:
if 'stdin' in kwargs:
raise ValueError('stdin and input arguments may not both be used.')
inputdata = kwargs['input']
del kwargs['input']
kwargs['stdin'] = PIPE
else:
inputdata = None
with Popen(*popenargs, stdout=PIPE, **kwargs) as process:
try:
output, unused_err = process.communicate(inputdata, timeout=timeout)
except TimeoutExpired:
process.kill()
output, unused_err = process.communicate()
raise TimeoutExpired(process.args, timeout, output=output)
except:
process.kill()
process.wait()
raise
retcode = process.poll()
if retcode:
raise CalledProcessError(retcode, process.args, output=output)
return output
else:
def check_output(*popenargs, **kwargs):
r"""Run command with arguments and return its output as a byte string. r"""Run command with arguments and return its output as a byte string.
If the exit code was non-zero it raises a CalledProcessError. The If the exit code was non-zero it raises a CalledProcessError. The
...@@ -181,9 +253,18 @@ class Popen(object): ...@@ -181,9 +253,18 @@ class Popen(object):
cwd=None, env=None, universal_newlines=False, cwd=None, env=None, universal_newlines=False,
startupinfo=None, creationflags=0, threadpool=None): startupinfo=None, creationflags=0, threadpool=None):
"""Create new Popen instance.""" """Create new Popen instance."""
# XXX: On Python 3, we don't implement these keyword arguments:
# (see patched_tests_setup)
# - pass_fds,
# - start_new_session,
# - restore_signals
hub = get_hub()
if bufsize is None and PY3:
bufsize = -1 # restore default
if not isinstance(bufsize, integer_types): if not isinstance(bufsize, integer_types):
raise TypeError("bufsize must be an integer") raise TypeError("bufsize must be an integer")
hub = get_hub()
if mswindows: if mswindows:
if preexec_fn is not None: if preexec_fn is not None:
...@@ -208,6 +289,8 @@ class Popen(object): ...@@ -208,6 +289,8 @@ class Popen(object):
assert threadpool is None assert threadpool is None
self._loop = hub.loop self._loop = hub.loop
if PY3:
self.args = args
self.stdin = None self.stdin = None
self.stdout = None self.stdout = None
self.stderr = None self.stderr = None
...@@ -235,12 +318,40 @@ class Popen(object): ...@@ -235,12 +318,40 @@ class Popen(object):
c2pread, c2pwrite, c2pread, c2pwrite,
errread, errwrite) = self._get_handles(stdin, stdout, stderr) errread, errwrite) = self._get_handles(stdin, stdout, stderr)
self._closed_child_pipe_fds = False
try:
self._execute_child(args, executable, preexec_fn, close_fds, self._execute_child(args, executable, preexec_fn, close_fds,
cwd, env, universal_newlines, cwd, env, universal_newlines,
startupinfo, creationflags, shell, startupinfo, creationflags, shell,
p2cread, p2cwrite, p2cread, p2cwrite,
c2pread, c2pwrite, c2pread, c2pwrite,
errread, errwrite) errread, errwrite)
except:
# Cleanup if the child failed starting.
# (gevent: New in python3, but reported as gevent bug in #347)
for f in filter(None, (self.stdin, self.stdout, self.stderr)):
try:
f.close()
except OSError:
pass # Ignore EBADF or other errors.
if not self._closed_child_pipe_fds:
to_close = []
if stdin == PIPE:
to_close.append(p2cread)
if stdout == PIPE:
to_close.append(c2pwrite)
if stderr == PIPE:
to_close.append(errwrite)
if hasattr(self, '_devnull'):
to_close.append(self._devnull)
for fd in to_close:
try:
os.close(fd)
except OSError:
pass
raise
if mswindows: if mswindows:
if p2cwrite is not None: if p2cwrite is not None:
...@@ -251,6 +362,12 @@ class Popen(object): ...@@ -251,6 +362,12 @@ class Popen(object):
errread = msvcrt.open_osfhandle(errread.Detach(), 0) errread = msvcrt.open_osfhandle(errread.Detach(), 0)
if p2cwrite is not None: if p2cwrite is not None:
if PY3 and universal_newlines:
self.stdin = FileObject(p2cwrite, 'w', bufsize)
self.stdin._tranlate = True
self.stdin.io = io.TextIOWrapper(self.stdin.io, write_through=True,
line_buffering=(bufsize == 1))
else:
self.stdin = FileObject(p2cwrite, 'wb', bufsize) self.stdin = FileObject(p2cwrite, 'wb', bufsize)
if c2pread is not None: if c2pread is not None:
if universal_newlines: if universal_newlines:
...@@ -275,44 +392,115 @@ class Popen(object): ...@@ -275,44 +392,115 @@ class Popen(object):
self.returncode = os.WEXITSTATUS(status) self.returncode = os.WEXITSTATUS(status)
self.result.set(self.returncode) self.result.set(self.returncode)
def communicate(self, input=None): def _get_devnull(self):
if not hasattr(self, '_devnull'):
self._devnull = os.open(os.devnull, os.O_RDWR)
return self._devnull
_stdout_buffer = None
_stderr_buffer = None
def communicate(self, input=None, timeout=None):
"""Interact with process: Send data to stdin. Read data from """Interact with process: Send data to stdin. Read data from
stdout and stderr, until end-of-file is reached. Wait for stdout and stderr, until end-of-file is reached. Wait for
process to terminate. The optional input argument should be a process to terminate. The optional input argument should be a
string to be sent to the child process, or None, if no data string to be sent to the child process, or None, if no data
should be sent to the child. should be sent to the child.
communicate() returns a tuple (stdout, stderr).""" communicate() returns a tuple (stdout, stderr).
:keyword timeout: Under Python 2, this is a gevent extension; if
given and it expires, we will raise :class:`gevent.timeout.Timeout`"""
greenlets = [] greenlets = []
if self.stdin: if self.stdin:
greenlets.append(spawn(write_and_close, self.stdin, input)) greenlets.append(spawn(write_and_close, self.stdin, input))
# If the timeout parameter is used, and the caller calls back after
# getting a TimeoutExpired exception, we can wind up with multiple
# greenlets trying to run and read from and close stdout/stderr.
# That's bad because it can lead to 'RuntimeError: reentrant call in io.BufferedReader'.
# We can't just kill the previous greenlets when a timeout happens,
# though, because we risk losing the output collected by that greenlet
# (and Python 3, where timeout is an official parameter, explicitly says
# that no output should be lost in the event of a timeout.) Instead, we're
# watching for the exception and ignoring it. It's not elegant,
# but it works
if self.stdout: if self.stdout:
stdout = spawn(self.stdout.read) def _read_out():
try:
data = self.stdout.read()
except RuntimeError:
return
if self._stdout_buffer is not None:
self._stdout_buffer += data
else:
self._stdout_buffer = data
stdout = spawn(_read_out)
greenlets.append(stdout) greenlets.append(stdout)
else: else:
stdout = None stdout = None
if self.stderr: if self.stderr:
stderr = spawn(self.stderr.read) def _read_err():
try:
data = self.stderr.read()
except RuntimeError:
return
if self._stderr_buffer is not None:
self._stderr_buffer += data
else:
self._stderr_buffer = data
stderr = spawn(_read_err)
greenlets.append(stderr) greenlets.append(stderr)
else: else:
stderr = None stderr = None
joinall(greenlets) done = joinall(greenlets, timeout=timeout)
if timeout is not None and len(done) != len(greenlets):
if PY3:
raise TimeoutExpired(self.args, timeout)
from gevent.timeout import Timeout
raise Timeout(timeout)
if self.stdout: if self.stdout:
try:
self.stdout.close() self.stdout.close()
except RuntimeError:
pass
if self.stderr: if self.stderr:
try:
self.stderr.close() self.stderr.close()
except RuntimeError:
pass
self.wait() self.wait()
return (None if stdout is None else stdout.value or b'', stdout_value = self._stdout_buffer
None if stderr is None else stderr.value or b'') self._stdout_buffer = None
stderr_value = self._stderr_buffer
self._stderr_buffer = None
# XXX: Under python 3 in universal newlines mode we should be
# returning str, not bytes
return (None if stdout is None else stdout_value or b'',
None if stderr is None else stderr_value or b'')
def poll(self): def poll(self):
return self._internal_poll() return self._internal_poll()
if PY3:
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
if self.stdout:
self.stdout.close()
if self.stderr:
self.stderr.close()
try: # Flushing a BufferedWriter may raise an error
if self.stdin:
self.stdin.close()
finally:
# Wait for the process to terminate, to avoid zombies.
self.wait()
if mswindows: if mswindows:
# #
# Windows methods # Windows methods
...@@ -539,10 +727,19 @@ class Popen(object): ...@@ -539,10 +727,19 @@ class Popen(object):
c2pread, c2pwrite = None, None c2pread, c2pwrite = None, None
errread, errwrite = None, None errread, errwrite = None, None
try:
DEVNULL
except NameError:
_devnull = object()
else:
_devnull = DEVNULL
if stdin is None: if stdin is None:
pass pass
elif stdin == PIPE: elif stdin == PIPE:
p2cread, p2cwrite = self.pipe_cloexec() p2cread, p2cwrite = self.pipe_cloexec()
elif stdin == _devnull:
p2cread = self._get_devnull()
elif isinstance(stdin, int): elif isinstance(stdin, int):
p2cread = stdin p2cread = stdin
else: else:
...@@ -553,6 +750,8 @@ class Popen(object): ...@@ -553,6 +750,8 @@ class Popen(object):
pass pass
elif stdout == PIPE: elif stdout == PIPE:
c2pread, c2pwrite = self.pipe_cloexec() c2pread, c2pwrite = self.pipe_cloexec()
elif stdout == _devnull:
c2pwrite = self._get_devnull()
elif isinstance(stdout, int): elif isinstance(stdout, int):
c2pwrite = stdout c2pwrite = stdout
else: else:
...@@ -565,6 +764,8 @@ class Popen(object): ...@@ -565,6 +764,8 @@ class Popen(object):
errread, errwrite = self.pipe_cloexec() errread, errwrite = self.pipe_cloexec()
elif stderr == STDOUT: elif stderr == STDOUT:
errwrite = c2pwrite errwrite = c2pwrite
elif stderr == _devnull:
errwrite = self._get_devnull()
elif isinstance(stderr, int): elif isinstance(stderr, int):
errwrite = stderr errwrite = stderr
else: else:
...@@ -623,7 +824,9 @@ class Popen(object): ...@@ -623,7 +824,9 @@ class Popen(object):
errread, errwrite): errread, errwrite):
"""Execute program (POSIX version)""" """Execute program (POSIX version)"""
if isinstance(args, string_types): if PY3 and isinstance(args, (str, bytes)):
args = [args]
elif not PY3 and isinstance(args, string_types):
args = [args] args = [args]
else: else:
args = list(args) args = list(args)
...@@ -642,6 +845,13 @@ class Popen(object): ...@@ -642,6 +845,13 @@ class Popen(object):
# The first char specifies the exception type: 0 means # The first char specifies the exception type: 0 means
# OSError, 1 means some other error. # OSError, 1 means some other error.
errpipe_read, errpipe_write = self.pipe_cloexec() errpipe_read, errpipe_write = self.pipe_cloexec()
# errpipe_write must not be in the standard io 0, 1, or 2 fd range.
low_fds_to_close = []
while errpipe_write < 3:
low_fds_to_close.append(errpipe_write)
errpipe_write = os.dup(errpipe_write)
for low_fd in low_fds_to_close:
os.close(low_fd)
try: try:
try: try:
gc_was_enabled = gc.isenabled() gc_was_enabled = gc.isenabled()
...@@ -736,12 +946,18 @@ class Popen(object): ...@@ -736,12 +946,18 @@ class Popen(object):
# be sure the FD is closed no matter what # be sure the FD is closed no matter what
os.close(errpipe_write) os.close(errpipe_write)
if p2cread is not None and p2cwrite is not None: # self._devnull is not always defined.
devnull_fd = getattr(self, '_devnull', None)
if p2cread is not None and p2cwrite is not None and p2cread != devnull_fd:
os.close(p2cread) os.close(p2cread)
if c2pwrite is not None and c2pread is not None: if c2pwrite is not None and c2pread is not None and c2pwrite != devnull_fd:
os.close(c2pwrite) os.close(c2pwrite)
if errwrite is not None and errread is not None: if errwrite is not None and errread is not None and errwrite != devnull_fd:
os.close(errwrite) os.close(errwrite)
if devnull_fd is not None:
os.close(devnull_fd)
# Prevent a double close of these fds from __init__ on error.
self._closed_child_pipe_fds = True
# Wait for exec to fail or succeed; possibly raising exception # Wait for exec to fail or succeed; possibly raising exception
errpipe_read = FileObject(errpipe_read, 'rb') errpipe_read = FileObject(errpipe_read, 'rb')
...@@ -782,8 +998,16 @@ class Popen(object): ...@@ -782,8 +998,16 @@ class Popen(object):
def wait(self, timeout=None): def wait(self, timeout=None):
"""Wait for child process to terminate. Returns returncode """Wait for child process to terminate. Returns returncode
attribute.""" attribute.
return self.result.wait(timeout=timeout)
:keyword timeout: The floating point number of seconds to wait.
Under Python 2, this is a gevent extension. Under Python 3,
if this time elapses without finishing the process, TimeoutExpired
is raised."""
result = self.result.wait(timeout=timeout)
if PY3 and timeout is not None and not self.result.ready():
raise TimeoutExpired(self.args, timeout)
return result
def send_signal(self, sig): def send_signal(self, sig):
"""Send a signal to the process """Send a signal to the process
......
This source diff could not be displayed because it is too large. You can view the blob instead.
...@@ -197,6 +197,55 @@ if hasattr(sys, 'pypy_version_info'): ...@@ -197,6 +197,55 @@ if hasattr(sys, 'pypy_version_info'):
# https://bitbucket.org/cffi/cffi/issue/152/handling-errors-from-signal-handlers-in # https://bitbucket.org/cffi/cffi/issue/152/handling-errors-from-signal-handlers-in
] ]
if sys.version_info[:2] >= (3, 4):
disabled_tests += [
'test_subprocess.ProcessTestCase.test_threadsafe_wait',
# XXX: It seems that threading.Timer is not being greened properly, possibly
# due to a similar issue to what gevent.threading documents for normal threads.
# In any event, this test hangs forever
'test_subprocess.ProcessTestCase.test_io_buffered_by_default',
'test_subprocess.ProcessTestCase.test_io_unbuffered_works',
# These tests want to assert on the type of the class that implements
# `Popen.stdin`; we use a FileObject, but they expect different subclasses
# from the `io` module
'test_subprocess.POSIXProcessTestCase.test_terminate_dead',
'test_subprocess.POSIXProcessTestCase.test_send_signal_dead',
'test_subprocess.POSIXProcessTestCase.test_kill_dead',
# With our monkey patch in place,
# they fail because the process they're looking for has been allowed to exit.
# Our monkey patch waits for the process with a watcher and so detects
# the exit before the normal polling mechanism would
'test_subprocess.POSIXProcessTestCase.test_close_fds',
'test_subprocess.POSIXProcessTestCase.test_pass_fds',
'test_subprocess.POSIXProcessTestCase.test_pass_fds_inheritable',
# XXX: We don't implement the pass_fds option yet
'test_subprocess.POSIXProcessTestCase.test_restore_signals',
# XXX: We don't implement the restore_signals option yet
'test_subprocess.POSIXProcessTestCase.test_start_new_session',
# XXX: We don't implement the start_new_session option yet
'test_subprocess.POSIXProcessTestCase.test_exception_bad_args_0',
'test_subprocess.POSIXProcessTestCase.test_exception_bad_executable',
'test_subprocess.POSIXProcessTestCase.test_exception_cwd',
# These all want to inspect the string value of an exception raised
# by the exec() call in the child. The _posixsubprocess module arranges
# for better exception handling and printing than we do.
'test_subprocess.POSIXProcessTestCase.test_preexec_errpipe_does_not_double_close_pipes',
# Subclasses Popen, and overrides _execute_child. Expects things to be done
# in a particular order in an exception case, but we don't follow that
# exact order
'test_subprocess.POSIXProcessTestCase.test_small_errpipe_write_fd',
# Python 3 fixed a bug if the stdio file descriptors were closed;
# we still have that bug
]
# if 'signalfd' in os.environ.get('GEVENT_BACKEND', ''): # if 'signalfd' in os.environ.get('GEVENT_BACKEND', ''):
# # tests that don't interact well with signalfd # # tests that don't interact well with signalfd
# disabled_tests.extend([ # disabled_tests.extend([
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment