Commit e0d9b60a authored by Jason Madden's avatar Jason Madden

A cooperative version of waitpid that plays nicely with child watchers and...

A cooperative version of waitpid that plays nicely with child watchers and gevent.subprocess. Fixes #600. Ref #452.
parent 7b95fe72
...@@ -16,6 +16,12 @@ Unreleased ...@@ -16,6 +16,12 @@ Unreleased
- Fixed regression that failed to set the ``successful`` value to - Fixed regression that failed to set the ``successful`` value to
False when killing a greenlet before it ran with a non-default False when killing a greenlet before it ran with a non-default
exception. Fixed in :pr:`608` by Heungsub Lee. exception. Fixed in :pr:`608` by Heungsub Lee.
- libev's child watchers caused ``os.waitpid`` to become unreliable
due to the use of signals on POSIX platforms. This was especially
noticeable when using ``gevent.subprocess`` in combination with
``multiprocessing``. Now, the monkey-patched ``os`` module provides
a ``waitpid`` function that seeks to ameliorate this. Reported in
:issue:`600` by champax and :issue:`452` by Łukasz Kawczyński.
1.1a2 (Jul 8, 2015) 1.1a2 (Jul 8, 2015)
=================== ===================
......
...@@ -2,7 +2,7 @@ cdef class Semaphore: ...@@ -2,7 +2,7 @@ cdef class Semaphore:
cdef public int counter cdef public int counter
cdef readonly object _links cdef readonly object _links
cdef readonly object _notifier cdef readonly object _notifier
cdef readonly int _dirty cdef public int _dirty
cpdef locked(self) cpdef locked(self)
cpdef release(self) cpdef release(self)
......
...@@ -343,6 +343,10 @@ class Hub(greenlet): ...@@ -343,6 +343,10 @@ class Hub(greenlet):
return result + '>' return result + '>'
def handle_error(self, context, type, value, tb): def handle_error(self, context, type, value, tb):
if isinstance(value, str):
# Cython can raise errors where the value is a plain string
# e.g., AttributeError, "_semaphore.Semaphore has no attr", <traceback>
value = type(value)
if not issubclass(type, self.NOT_ERROR): if not issubclass(type, self.NOT_ERROR):
self.print_exception(context, type, value, tb) self.print_exception(context, type, value, tb)
if context is None or issubclass(type, self.SYSTEM_ERROR): if context is None or issubclass(type, self.SYSTEM_ERROR):
......
...@@ -102,6 +102,99 @@ if hasattr(os, 'fork'): ...@@ -102,6 +102,99 @@ if hasattr(os, 'fork'):
reinit() reinit()
return result return result
if hasattr(os, 'WNOWAIT') or hasattr(os, 'WNOHANG'):
# We can only do this on POSIX
import time
_waitpid = os.waitpid
_WNOHANG = os.WNOHANG
# {pid -> watcher or tuple(pid, rstatus, timestamp)}
_watched_children = {}
def _on_child(watcher, callback):
# XXX: Could handle tracing here by not stopping
# until the pid is terminated
watcher.stop()
_watched_children[watcher.pid] = (watcher.pid, watcher.rstatus, time.time())
if callback:
callback(watcher)
# now is as good a time as any to reap children
_reap_children()
def _reap_children(timeout=60):
# Remove all the dead children that haven't been waited on
# for the *timeout*
now = time.time()
oldest_allowed = now - timeout
for pid in _watched_children.keys():
val = _watched_children[pid]
if isinstance(val, tuple) and val[2] < oldest_allowed:
del _watched_children[pid]
def waitpid(pid, options):
# XXX Does not handle tracing children
if pid <= 0:
# magic functions for multiple children. Pass.
return _waitpid(pid, options)
if pid in _watched_children:
# yes, we're watching it
if options & _WNOHANG or isinstance(_watched_children[pid], tuple):
# We're either asked not to block, or it already finished, in which
# case blocking doesn't matter
result = _watched_children[pid]
if isinstance(result, tuple):
# it finished. libev child watchers
# are one-shot
del _watched_children[pid]
return result[:2]
# it's not finished
return (0, 0)
else:
# we should block. Let the underlying OS call block; it should
# eventually die with OSError, depending on signal delivery
try:
return _waitpid(pid, options)
except OSError:
if pid in _watched_children and isinstance(_watched_children, tuple):
result = _watched_children[pid]
del _watched_children[pid]
return result[:2]
raise
# we're not watching it
return _waitpid(pid, options)
def fork_and_watch(callback=None, loop=None, fork=fork):
"""
Fork a child process and start a child watcher for it in the parent process.
This call cooperates with the :func:`waitpid`` function defined in this module.
:keyword callback: If given, a callable that will be called with the child watcher
when the child finishes.
:keyword loop: The loop to start the watcher in. Defaults to the
current loop.
:keyword fork: The fork function. Defaults to the one defined in this
module.
.. versionadded: 1.1a3
"""
pid = fork()
if pid:
# parent
loop = loop or get_hub().loop
watcher = loop.child(pid)
_watched_children[pid] = watcher
watcher.start(_on_child, watcher, callback)
return pid
# Watch children by default
fork = fork_and_watch
__extensions__.append('fork_and_watch')
__implements__.append("waitpid")
else: else:
__implements__.remove('fork') __implements__.remove('fork')
......
...@@ -112,6 +112,7 @@ else: ...@@ -112,6 +112,7 @@ else:
import pickle import pickle
from gevent import monkey from gevent import monkey
fork = monkey.get_original('os', 'fork') fork = monkey.get_original('os', 'fork')
from gevent.os import fork_and_watch
if PY3: if PY3:
def call(*popenargs, **kwargs): def call(*popenargs, **kwargs):
...@@ -934,7 +935,7 @@ class Popen(object): ...@@ -934,7 +935,7 @@ class Popen(object):
# write to stderr -> hang. http://bugs.python.org/issue1336 # write to stderr -> hang. http://bugs.python.org/issue1336
gc.disable() gc.disable()
try: try:
self.pid = fork() self.pid = fork_and_watch(self._on_child, self._loop, fork)
except: except:
if gc_was_enabled: if gc_was_enabled:
gc.enable() gc.enable()
...@@ -1040,8 +1041,6 @@ class Popen(object): ...@@ -1040,8 +1041,6 @@ class Popen(object):
os._exit(1) os._exit(1)
# Parent # Parent
self._watcher = self._loop.child(self.pid)
self._watcher.start(self._on_child, self._watcher)
if gc_was_enabled: if gc_was_enabled:
gc.enable() gc.enable()
......
# Make sure that libev child watchers, implicitly installed through the use
# of subprocess, do not cause waitpid() to fail to poll for processes.
# NOTE: This was only reproducible under python 2.
import gevent
from gevent import monkey
monkey.patch_all()
from multiprocessing import Process
from gevent.subprocess import Popen, PIPE
def test_invoke():
# Run a subprocess through Popen to make sure
# libev is handling SIGCHLD. This could *probably* be simplified to use
# just hub.loop.install_sigchld
p = Popen("true", stdout=PIPE, stderr=PIPE)
gevent.sleep(0)
p.communicate()
gevent.sleep(0)
def f(sleep_sec):
gevent.sleep(sleep_sec)
def test_process():
# Launch
p = Process(target=f, args=(1.0,))
p.start()
with gevent.Timeout(3):
# Poll for up to 10 seconds. If the bug exists,
# this will timeout because our subprocess should
# be long gone by now
p.join(10)
if __name__ == '__main__':
# do a subprocess open
test_invoke()
test_process()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment