Commit 10311020 authored by Jason Madden's avatar Jason Madden

Add kludge to make the default threadpool and threaded resolver work in forked...

Add kludge to make the default threadpool and threaded resolver work in forked children. The real issue seems to be that fork watchers aren't firing; there doesn't seem to be a test case for that. Fixes #230.
parent 6fe727c0
...@@ -45,6 +45,10 @@ Unreleased ...@@ -45,6 +45,10 @@ Unreleased
- ``gevent.iwait`` no longer throws ``LoopExit`` if the caller - ``gevent.iwait`` no longer throws ``LoopExit`` if the caller
switches greenlets between return values. Reported and initial patch switches greenlets between return values. Reported and initial patch
in :pr:`467` by Alexey Borzenkov. in :pr:`467` by Alexey Borzenkov.
- The default threadpool and default threaded resolver work in a
forked child process, such as with ``multiprocessing.Process``.
Previously the child process would hang indefinitely. Reported in
:issue:`230` by Lx Yu.
1.1a1 (Jun 29, 2015) 1.1a1 (Jun 29, 2015)
==================== ====================
......
...@@ -153,6 +153,21 @@ def reinit(): ...@@ -153,6 +153,21 @@ def reinit():
hub = _get_hub() hub = _get_hub()
if hub is not None: if hub is not None:
hub.loop.reinit() hub.loop.reinit()
# XXX: libev's fork watchers seem not to be firing for some reason
# in both the cython (core.ppyx) and CFFI (corecffi.py) implementations
# (at least on OS X; confirm on other platforms)
# This breaks the threadpool and anything that uses it, including
# resolver_thread in the forked process (if there was already one thread
# in the pool before fork, adding an additional task will hang forever post-fork)
# The below is a kludge. The correct fix is to figure out why the fork watchers
# don't work. Fortunately, both of these methods are idempotent and can be called
# multiple times following a fork if the suddenly started working, or were already
# working on some platforms.
if hasattr(hub.threadpool, '_on_fork'):
hub.threadpool._on_fork()
# resolver_ares also has a fork watcher that's not firing
if hasattr(hub.resolver, '_on_fork'):
hub.resolver._on_fork()
def get_hub_class(): def get_hub_class():
......
...@@ -36,6 +36,7 @@ class Resolver(object): ...@@ -36,6 +36,7 @@ class Resolver(object):
return '<gevent.resolver_ares.Resolver at 0x%x ares=%r>' % (id(self), self.ares) return '<gevent.resolver_ares.Resolver at 0x%x ares=%r>' % (id(self), self.ares)
def _on_fork(self): def _on_fork(self):
# NOTE: See comment in gevent.hub.reinit.
pid = os.getpid() pid = os.getpid()
if pid != self.pid: if pid != self.pid:
self.hub.loop.run_callback(self.ares.destroy) self.hub.loop.run_callback(self.ares.destroy)
......
...@@ -86,7 +86,8 @@ class ThreadPool(GroupMappingMixin): ...@@ -86,7 +86,8 @@ class ThreadPool(GroupMappingMixin):
def _on_fork(self): def _on_fork(self):
# fork() only leaves one thread; also screws up locks; # fork() only leaves one thread; also screws up locks;
# let's re-create locks and threads # let's re-create locks and threads.
# NOTE: See comment in gevent.hub.reinit.
pid = os.getpid() pid = os.getpid()
if pid != self.pid: if pid != self.pid:
self.pid = pid self.pid = pid
......
import gevent.monkey
gevent.monkey.patch_all()
import socket
import multiprocessing
# Make sure that using the resolver in a forked process
# doesn't hang forever.
def block():
socket.getaddrinfo('localhost', 8001)
def main():
socket.getaddrinfo('localhost', 8001)
p = multiprocessing.Process(target=block)
p.start()
p.join()
if __name__ == '__main__':
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment