Commit afcebf01 authored by Jason Madden's avatar Jason Madden

Updates for gevent.os.fork/waitpid, mostly documentation, but also make it...

Updates for gevent.os.fork/waitpid, mostly documentation, but also make it possible to switch off the waitpid behaviour and access the 'basic' fork function.
parent 2fc52c0e
...@@ -136,7 +136,12 @@ def patch_sys(stdin=True, stdout=True, stderr=True): ...@@ -136,7 +136,12 @@ def patch_sys(stdin=True, stdout=True, stderr=True):
def patch_os(): def patch_os():
"""Replace :func:`os.fork` with :func:`gevent.fork`. Does nothing if fork is not available.""" """
Replace :func:`os.fork` with :func:`gevent.fork`, and, on POSIX,
:func:`os.waitpid` with :func:`gevent.os.waitpid` (if the
environment variable ``GEVENT_NOWAITPID`` is not defined). Does
nothing if fork is not available.
"""
patch_module('os') patch_module('os')
......
""" """
This module provides cooperative versions of os.read() and os.write(). Low-level operating system functions from :mod:`os`.
On Posix platforms this uses non-blocking IO, on Windows a threadpool Cooperative I/O
is used. ===============
This module provides cooperative versions of :func:`os.read` and
:func:`os.write`. These functions are *not* monkey-patched; you
must explicitly call them or monkey patch them yourself.
POSIX functions
---------------
On POSIX, non-blocking IO is available.
- :func:`nb_read`
- :func:`nb_write`
- :func:`make_nonblocking`
All Platforms
-------------
On non-POSIX platforms (e.g., Windows), non-blocking IO is not
available. On those platforms (and on POSIX), cooperative IO can
be done with the threadpool.
- :func:`tp_read`
- :func:`tb_write`
Child Processes
===============
The functions :func:`fork` and (on POSIX) :func:`waitpid` can be used
to manage child processes.
.. warning::
Forking a process that uses greenlets does not eliminate all non-running
greenlets. Any that were scheduled in the hub of the forking thread in the parent
remain scheduled in the child; compare this to how normal threads operate. (This behaviour
may change is a subsequent major release.)
""" """
from __future__ import absolute_import from __future__ import absolute_import
...@@ -104,14 +140,39 @@ def tp_write(fd, buf): ...@@ -104,14 +140,39 @@ def tp_write(fd, buf):
if hasattr(os, 'fork'): if hasattr(os, 'fork'):
_fork = os.fork _raw_fork = os.fork
def fork(): def fork_gevent():
result = _fork() """
Forks the process using :func:`os.fork` and prepares the
child process to continue using gevent before returning.
.. note::
The PID returned by this function may not be
waitable with either :func:`os.waitpid` or :func:`waitpid`
if libev child watchers are in use. For example, the
:mod:`gevent.subprocess` module uses libev child watchers
(which parts of gevent use libev child watchers is subject to change
at any time). Most applications should use :func:`fork_and_watch`,
which is monkey-patched as the default replacement for :func:`os.fork`
and implements the ``fork`` function of this module by default, unless
the environment variable ``GEVENT_NOWAITPID`` is defined before this
module is imported.
.. versionadded:: 1.1b2
"""
result = _raw_fork()
if not result: if not result:
reinit() reinit()
return result return result
def fork():
"""
A wrapper for :func:`fork_gevent` for non-POSIX platforms.
"""
return fork_gevent()
if hasattr(os, 'WNOWAIT') or hasattr(os, 'WNOHANG'): if hasattr(os, 'WNOWAIT') or hasattr(os, 'WNOHANG'):
# We can only do this on POSIX # We can only do this on POSIX
import time import time
...@@ -134,7 +195,20 @@ if hasattr(os, 'fork'): ...@@ -134,7 +195,20 @@ if hasattr(os, 'fork'):
def _reap_children(timeout=60): def _reap_children(timeout=60):
# Remove all the dead children that haven't been waited on # Remove all the dead children that haven't been waited on
# for the *timeout* # for the *timeout* seconds.
# Some platforms queue delivery of SIGCHLD for all children that die;
# in that case, a well-behaved application should call waitpid() for each
# signal.
# Some platforms (linux) only guarantee one delivery if multiple children
# die. On that platform, the well-behave application calls waitpid() in a loop
# until it gets back -1, indicating no more dead children need to be waited for.
# In either case, waitpid should be called the same number of times as dead children,
# thus removing all the watchers when a SIGCHLD arrives. The (generous) timeout
# is to work with applications that neglect to call waitpid and prevent "unlimited"
# growth.
# Note that we don't watch for the case of pid wraparound. That is, we fork a new
# child with the same pid as an existing watcher, but the child is already dead,
# just not waited on yet.
now = time.time() now = time.time()
oldest_allowed = now - timeout oldest_allowed = now - timeout
dead = [pid for pid, val dead = [pid for pid, val
...@@ -144,6 +218,20 @@ if hasattr(os, 'fork'): ...@@ -144,6 +218,20 @@ if hasattr(os, 'fork'):
del _watched_children[pid] del _watched_children[pid]
def waitpid(pid, options): def waitpid(pid, options):
"""
Wait for a child process to finish.
If the child process was spawned using :func:`fork_and_watch`, then this
function behaves cooperatively. If not, it *may* have race conditions; see
:func:`fork_gevent` for more information.
The arguments are as for the underlying :func:`os.waitpid`. Some combinations
of *options* may not be supported (as of 1.1 that includes WUNTRACED).
Availability: POSIX.
.. versionadded: 1.1a3
"""
# XXX Does not handle tracing children # XXX Does not handle tracing children
if pid <= 0: if pid <= 0:
# magic functions for multiple children. Pass. # magic functions for multiple children. Pass.
...@@ -176,27 +264,24 @@ if hasattr(os, 'fork'): ...@@ -176,27 +264,24 @@ if hasattr(os, 'fork'):
# we're not watching it # we're not watching it
return _waitpid(pid, options) return _waitpid(pid, options)
def fork_and_watch(callback=None, loop=None, ref=False, fork=fork): def fork_and_watch(callback=None, loop=None, ref=False, fork=fork_gevent):
""" """
Fork a child process and start a child watcher for it in the parent process. Fork a child process and start a child watcher for it in the parent process.
This call cooperates with the :func:`gevent.os.waitpid` to enable cooperatively waiting This call cooperates with :func:`waitpid` to enable cooperatively waiting
for children to finish. When monkey-patching, these functions are patched in as for children to finish. When monkey-patching, these functions are patched in as
:func:`os.fork` and :func:`os.waitpid`, respectively. :func:`os.fork` and :func:`os.waitpid`, respectively.
In the child process, this function calls :func:`gevent.hub.reinit` before returning. In the child process, this function calls :func:`gevent.hub.reinit` before returning.
.. warning:: Forking a process that uses greenlets does not eliminate all non-running Availability: POSIX.
greenlets. Any that were scheduled in the hub of the forking thread in the parent
remain scheduled in the child; compare this to how normal threads operate. (This behaviour
may change is a subsequent major release.)
:keyword callback: If given, a callable that will be called with the child watcher :keyword callback: If given, a callable that will be called with the child watcher
when the child finishes. when the child finishes.
:keyword loop: The loop to start the watcher in. Defaults to the :keyword loop: The loop to start the watcher in. Defaults to the
loop of the current hub. loop of the current hub.
:keyword fork: The fork function. Defaults to the one defined in this :keyword fork: The fork function. Defaults to :func:`the one defined in this
module (which automatically calls :func:`gevent.hub.reinit`). module <gevent_fork>` (which automatically calls :func:`gevent.hub.reinit`).
Pass the builtin :func:`os.fork` function if you do not need to Pass the builtin :func:`os.fork` function if you do not need to
initialize gevent in the child process. initialize gevent in the child process.
...@@ -211,11 +296,35 @@ if hasattr(os, 'fork'): ...@@ -211,11 +296,35 @@ if hasattr(os, 'fork'):
watcher.start(_on_child, watcher, callback) watcher.start(_on_child, watcher, callback)
return pid return pid
# Watch children by default
fork = fork_and_watch
__extensions__.append('fork_and_watch') __extensions__.append('fork_and_watch')
__implements__.append("waitpid") __extensions__.append('fork_gevent')
# Watch children by default
if not os.getenv('GEVENT_NOWAITPID'):
def fork(*args, **kwargs):
"""
Forks a child process and starts a child watcher for it in the
parent process.
This implementation of ``fork`` is a wrapper for :func:`fork_and_watch`
when the environment variable ``GEVENT_NOWAITPID`` is *not* defined.
This is the default and should be used by most applications.
"""
# take any args to match fork_and_watch
return fork_and_watch(*args, **kwargs)
__implements__.append("waitpid")
else:
def fork():
"""
Forks a child process, initializes gevent in the child,
but *does not* prepare the parent to wait for the child.
This implementation of ``fork`` is a wrapper for :func:`fork_gevent`
when the environment variable ``GEVENT_NOWAITPID`` *is* defined.
This is not recommended for most applications.
"""
return fork_gevent()
__extensions__.append("waitpid")
else: else:
__implements__.remove('fork') __implements__.remove('fork')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment