Commit 39548f4a authored by Jason Madden's avatar Jason Madden

Patch the join method of existing threads when patching threading. Fixes #747.

parent e788578d
......@@ -65,6 +65,9 @@
available (Python 3 and Python 2 with the ``futures`` backport
installed). This is helpful for, e.g., grpc. Reported in
:issue:`786` by Markus Padourek.
- Native threads created before monkey-patching threading can now be
joined. Previously on Python < 3.4, doing so would raise a
``LoopExit`` error. reported in :issue:`747` by Sergey Vasilyev.
1.1.0 (Mar 5, 2016)
===================
......
......@@ -292,7 +292,7 @@ def patch_thread(threading=True, _threading_local=True, Event=False, logging=Tru
Add *logging* and *existing_locks* params.
"""
# XXX: Simplify
# pylint:disable=too-many-branches
# pylint:disable=too-many-branches,too-many-locals
# Description of the hang:
# There is an incompatibility with patching 'thread' and the 'multiprocessing' module:
......@@ -322,11 +322,19 @@ def patch_thread(threading=True, _threading_local=True, Event=False, logging=Tru
# are in trouble. The latter is tricky because of the different names
# on different versions.
if threading:
__import__('threading')
threading_mod = __import__('threading')
# Capture the *real* current thread object before
# we start returning DummyThread objects, for comparison
# to the main thread.
orig_current_thread = threading_mod.current_thread()
else:
threading_mod = None
orig_current_thread = None
patch_module('thread')
if threading:
threading_mod = patch_module('threading')
patch_module('threading')
if Event:
from gevent.event import Event
......@@ -352,6 +360,36 @@ def patch_thread(threading=True, _threading_local=True, Event=False, logging=Tru
from gevent.local import local
patch_item(_threading_local, 'local', local)
def make_join_func(thread, thread_greenlet):
from gevent.hub import sleep
from time import time
def join(timeout=None):
end = None
if threading_mod.current_thread() is thread:
raise RuntimeError("Cannot join current thread")
if thread_greenlet is not None and thread_greenlet.dead:
return
if not thread.is_alive():
return
if timeout:
end = time() + timeout
while thread.is_alive():
if end is not None and time() > end:
return
sleep(0.01)
return join
if threading:
from gevent.threading import main_native_thread
for thread in threading_mod._active.values():
if thread == main_native_thread():
continue
thread.join = make_join_func(thread, None)
if sys.version_info[:2] >= (3, 4):
# Issue 18808 changes the nature of Thread.join() to use
......@@ -359,26 +397,15 @@ def patch_thread(threading=True, _threading_local=True, Event=False, logging=Tru
# (which is already running) cannot wait for the main thread---it
# hangs forever. We patch around this if possible. See also
# gevent.threading.
threading_mod = __import__('threading')
greenlet = __import__('greenlet')
if threading_mod.current_thread() == threading_mod.main_thread():
if orig_current_thread == threading_mod.main_thread():
main_thread = threading_mod.main_thread()
_greenlet = main_thread._greenlet = greenlet.getcurrent()
from gevent.hub import sleep
def join(timeout=None):
if threading_mod.current_thread() is main_thread:
raise RuntimeError("Cannot join current thread")
if _greenlet.dead or not main_thread.is_alive():
return
elif timeout:
raise ValueError("Cannot use a timeout to join the main thread")
# XXX: Make that work
else:
while main_thread.is_alive():
sleep(0.01)
main_thread.join = join
main_thread.join = make_join_func(main_thread, _greenlet)
# Patch up the ident of the main thread to match. This
# matters if threading was imported before monkey-patching
......
......@@ -94,15 +94,28 @@ class _DummyThread(_DummyThread_):
def _wait_for_tstate_lock(self, *args, **kwargs):
pass
if hasattr(__threading__, 'main_thread'): # py 3.4+
def main_native_thread():
return __threading__.main_thread() # pylint:disable=no-member
else:
_main_threads = [(_k, _v) for _k, _v in __threading__._active.items()
if isinstance(_v, __threading__._MainThread)]
assert len(_main_threads) == 1, "Too many main threads"
def main_native_thread():
return _main_threads[0][1]
# Make sure the MainThread can be found by our current greenlet ID,
# otherwise we get a new DummyThread, which cannot be joined.
# Fixes tests in test_threading_2 under PyPy, and generally makes things nicer
# when gevent.threading is imported before monkey patching or not at all
# XXX: This assumes that the import is happening in the "main" greenlet
if _get_ident() not in __threading__._active and len(__threading__._active) == 1:
_k, _v = next(iter(__threading__._active.items()))
# XXX: This assumes that the import is happening in the "main" greenlet/thread.
# XXX: We should really only be doing this from gevent.monkey.
if _get_ident() not in __threading__._active:
_v = main_native_thread()
_k = _v.ident
del __threading__._active[_k]
_v._Thread__ident = _get_ident()
_v._ident = _v._Thread__ident = _get_ident()
__threading__._active[_get_ident()] = _v
del _k
del _v
......
# If stdlib threading is imported *BEFORE* monkey patching, *and*
# there is a native thread created, we can still get the current
# (main) thread, and it's not a DummyThread.
# Joining the native thread also does not fail
import threading
from time import sleep as time_sleep
import greentest
class NativeThread(threading.Thread):
do_run = True
def run(self):
while self.do_run:
time_sleep(0.1)
def stop(self, timeout=None):
self.do_run = False
self.join(timeout=timeout)
native_thread = None
class Test(greentest.TestCase):
def test_main_thread(self):
current = threading.current_thread()
self.assertFalse(isinstance(current, threading._DummyThread))
self.assertTrue(isinstance(current, monkey.get_original('threading', 'Thread')))
# in 3.4, if the patch is incorrectly done, getting the repr
# of the thread fails
repr(current)
if hasattr(threading, 'main_thread'): # py 3.4
self.assertEqual(threading.current_thread(), threading.main_thread())
@greentest.ignores_leakcheck # because it can't be run multiple times
def test_join_native_thread(self):
self.assertTrue(native_thread.is_alive())
native_thread.stop(timeout=1)
self.assertFalse(native_thread.is_alive())
# again, idempotent
native_thread.stop()
self.assertFalse(native_thread.is_alive())
if __name__ == '__main__':
native_thread = NativeThread()
native_thread.start()
# Only patch after we're running
from gevent import monkey
monkey.patch_all()
greentest.main()
......@@ -24,6 +24,15 @@ commands =
# The real solution is probably to put "gevent" beneath a "src" directory.
usedevelop = True
[testenv:py33]
# On OS X, at least, the binary wheel for 1.5.2 is broken
deps =
greenlet
cython
coverage
psutil
cffi==1.5.1
[testenv:py27-full]
basepython = python2.7
commands =
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment