Commit 1e5f1bbc authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1743 from gevent/issue1739.py

Make AsyncResult behave more like in older versions in cross-thread usage
parents 33de7406 7eb9b595
Make ``AsyncResult`` print a warning when it detects improper
cross-thread usage instead of hanging.
``AsyncResult`` has *never* been safe to use from multiple threads.
It, like most gevent objects, is intended to work with greenlets from
a single thread. Using ``AsyncResult`` from multiple threads has
undefined semantics. The safest way to communicate between threads is
using an event loop async watcher.
Those undefined semantics changed in recent gevent versions, making it
more likely that an abused ``AsyncResult`` would misbehave in ways
that could cause the program to hang.
Now, when ``AsyncResult`` detects a situation that would hang, it
prints a warning to stderr. Note that this is best-effort, and hangs
are still possible, especially under PyPy 7.3.3.
At the same time, ``AsyncResult`` is tuned to behave more like it did
in older versions, meaning that the hang is once again much less
likely. If you were getting lucky and using ``AsyncResult``
successfully across threads, this may restore your luck.
......@@ -9,7 +9,9 @@ from __future__ import division
from __future__ import print_function
import sys
from gc import get_objects
from greenlet import greenlet
from greenlet import error as greenlet_error
from gevent._compat import thread_mod_name
......@@ -40,6 +42,15 @@ class _FakeNotifier(object):
def __init__(self):
self.pending = False
def get_roots_and_hubs():
from gevent.hub import Hub # delay import
return {
x.parent: x
for x in get_objects()
if isinstance(x, Hub)
}
class AbstractLinkable(object):
# Encapsulates the standard parts of the linking and notifying
# protocol common to both repeatable events (Event, Semaphore) and
......@@ -328,13 +339,82 @@ class AbstractLinkable(object):
# must have more links than we started with. We need to schedule the
# wakeup.
self._check_and_notify()
# If we added unswitched greenlets, however, don't add them back to the links yet.
# We wouldn't be able to call them in this hub anyway.
# TODO: Instead of just adding these back to self._links, we should try to detect their
# "home" hub and mode the callback to that hub. As it stands, there's a chance that
# if no greenlet tries to acquire/release this object in that hub, these objects
# will never get to run.
self._links.extend(unswitched)
if unswitched:
self._handle_unswitched_notifications(unswitched)
def _handle_unswitched_notifications(self, unswitched):
# Given a list of callable objects that raised
# ``greenlet.error`` when we called them: If we can determine
# that it is a parked greenlet (the callablle is a
# ``greenlet.switch`` method) and we can determine the hub
# that the greenlet belongs to (either its parent, or, in the
# case of a main greenlet, find a hub with the same parent as
# this greenlet object) then:
# Move this to be a callback in that thread.
# (This relies on holding the GIL *or* ``Hub.loop.run_callback`` being
# thread-safe! Note that the CFFI implementations are definitely
# NOT thread-safe. TODO: Make them? Or an alternative?)
#
# Otherwise, print some error messages.
# TODO: Inline this for individual links. That handles the
# "only while ready" case automatically. Be careful about locking in that case.
#
# TODO: Add a 'strict' mode that prevents doing this dance, since it's
# inherently not safe.
root_greenlets = None
printed_tb = False
only_while_ready = not self._notify_all
while unswitched:
if only_while_ready and not self.ready():
self.__print_unswitched_warning(unswitched, printed_tb)
break
link = unswitched.pop(0)
hub = None # Also serves as a "handled?" flag
# Is it a greenlet.switch method?
if (getattr(link, '__name__', None) == 'switch'
and isinstance(getattr(link, '__self__', None), greenlet)):
glet = link.__self__
parent = glet.parent
while parent is not None:
if hasattr(parent, 'loop'): # Assuming the hub.
hub = glet.parent
break
parent = glet.parent
if hub is None:
if root_greenlets is None:
root_greenlets = get_roots_and_hubs()
hub = root_greenlets.get(glet)
if hub is not None:
hub.loop.run_callback(link, self)
if hub is None:
# We couldn't handle it
self.__print_unswitched_warning(link, printed_tb)
printed_tb = True
def __print_unswitched_warning(self, link, printed_tb):
print('gevent: error: Unable to switch to greenlet', link,
'from', self, '; crossing thread boundaries is not allowed.',
file=sys.stderr)
if not printed_tb:
printed_tb = True
print(
'gevent: error: '
'This is a result of using gevent objects from multiple threads,',
'and is a bug in the calling code.', file=sys.stderr)
import traceback
traceback.print_stack()
def _quiet_unlink_all(self, obj):
if obj is None:
......
......@@ -9,6 +9,7 @@ cdef InvalidThreadUseError
cdef Timeout
cdef _get_thread_ident
cdef bint _greenlet_imported
cdef get_objects
cdef extern from "greenlet/greenlet.h":
......@@ -32,6 +33,8 @@ cdef inline void greenlet_init():
cdef void _init()
cdef dict get_roots_and_hubs()
cdef class _FakeNotifier(object):
cdef bint pending
......@@ -66,6 +69,9 @@ cdef class AbstractLinkable(object):
@cython.nonecheck(False)
cpdef _notify_links(self, list arrived_while_waiting)
cdef _handle_unswitched_notifications(self, list unswitched)
cdef __print_unswitched_warning(self, link, bint printed_tb)
cpdef _drop_lock_for_switch_out(self)
cpdef _acquire_lock_for_switch_in(self)
......
......@@ -281,6 +281,31 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
def __exit__(self, t, v, tb):
self.release()
def _handle_unswitched_notifications(self, unswitched):
# If we fail to switch to a greenlet in another thread to send
# a notification, just re-queue it, in the hopes that the
# other thread will eventually run notifications itself.
#
# We CANNOT do what the ``super()`` does and actually allow
# this notification to get run sometime in the future by
# scheduling a callback in the other thread. The algorithm
# that we use to handle cross-thread locking/unlocking was
# designed before the schedule-a-callback mechanism was
# implemented. If we allow this to be run as a callback, we
# can find ourself the victim of ``InvalidSwitchError`` (or
# worse, silent corruption) because the switch can come at an
# unexpected time: *after* the destination thread has already
# acquired the lock.
#
# This manifests in a fairly reliable test failure,
# ``gevent.tests.test__semaphore``
# ``TestSemaphoreMultiThread.test_dueling_threads_with_hub``,
# but ONLY when running in PURE_PYTHON mode.
#
# TODO: Maybe we can rewrite that part of the algorithm to be friendly to
# running the callbacks?
self._links.extend(unswitched)
def __add_link(self, link):
if not self._notifier:
self.rawlink(link)
......
......@@ -29,6 +29,10 @@ class Event(AbstractLinkable): # pylint:disable=undefined-variable
one or more others. It has the same interface as
:class:`threading.Event` but works across greenlets.
.. important::
This object is for communicating among greenlets within the
same thread *only*! Do not try to use it to communicate across threads.
An event object manages an internal flag that can be set to true
with the :meth:`set` method and reset to false with the
:meth:`clear` method. The :meth:`wait` method blocks until the
......@@ -166,14 +170,21 @@ class Event(AbstractLinkable): # pylint:disable=undefined-variable
class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
"""A one-time event that stores a value or an exception.
"""
A one-time event that stores a value or an exception.
Like :class:`Event` it wakes up all the waiters when :meth:`set`
or :meth:`set_exception` is called. Waiters may receive the passed
value or exception by calling :meth:`get` instead of :meth:`wait`.
An :class:`AsyncResult` instance cannot be reset.
Like :class:`Event` it wakes up all the waiters when :meth:`set` or :meth:`set_exception`
is called. Waiters may receive the passed value or exception by calling :meth:`get`
instead of :meth:`wait`. An :class:`AsyncResult` instance cannot be reset.
.. important::
This object is for communicating among greenlets within the
same thread *only*! Do not try to use it to communicate across threads.
To pass a value call :meth:`set`. Calls to :meth:`get` (those that are currently blocking as well as
those made in the future) will return the value:
To pass a value call :meth:`set`. Calls to :meth:`get` (those that
are currently blocking as well as those made in the future) will
return the value::
>>> from gevent.event import AsyncResult
>>> result = AsyncResult()
......@@ -181,7 +192,8 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
>>> result.get()
100
To pass an exception call :meth:`set_exception`. This will cause :meth:`get` to raise that exception:
To pass an exception call :meth:`set_exception`. This will cause
:meth:`get` to raise that exception::
>>> result = AsyncResult()
>>> result.set_exception(RuntimeError('failure'))
......@@ -190,7 +202,8 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
...
RuntimeError: failure
:class:`AsyncResult` implements :meth:`__call__` and thus can be used as :meth:`link` target:
:class:`AsyncResult` implements :meth:`__call__` and thus can be
used as :meth:`link` target::
>>> import gevent
>>> result = AsyncResult()
......@@ -202,6 +215,7 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
ZeroDivisionError
.. note::
The order and timing in which waiting greenlets are awakened is not determined.
As an implementation note, in gevent 1.1 and 1.0, waiting greenlets are awakened in a
undetermined order sometime *after* the current greenlet yields to the event loop. Other greenlets
......@@ -209,16 +223,25 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
the waiting greenlets being awakened. These details may change in the future.
.. versionchanged:: 1.1
The exact order in which waiting greenlets are awakened is not the same
as in 1.0.
The exact order in which waiting greenlets
are awakened is not the same as in 1.0.
.. versionchanged:: 1.1
Callbacks :meth:`linked <rawlink>` to this object are required to be hashable, and duplicates are
merged.
Callbacks :meth:`linked <rawlink>` to this object are required to
be hashable, and duplicates are merged.
.. versionchanged:: 1.5a3
Waiting greenlets are now awakened in the order in which they waited.
Waiting greenlets are now awakened in the order in which they
waited.
.. versionchanged:: 1.5a3
The low-level ``rawlink`` method (most users won't use this) now automatically
unlinks waiters before calling them.
The low-level ``rawlink`` method
(most users won't use this) now automatically unlinks waiters
before calling them.
"""
__slots__ = ('_value', '_exc_info', '_imap_task_index')
......
......@@ -201,6 +201,8 @@ def _ignores_DoNotPatch(func):
# maps module name -> {attribute name: original item}
# e.g. "time" -> {"sleep": built-in function sleep}
# NOT A PUBLIC API. However, third-party monkey-patchers may be using
# it? TODO: Provide better API for them.
saved = {}
......@@ -229,6 +231,18 @@ def is_object_patched(mod_name, item_name):
return is_module_patched(mod_name) and item_name in saved[mod_name]
def is_anything_patched():
# Check if this module has done any patching in the current process.
# This is currently only used in gevent tests.
#
# Not currently a documented, public API, because I'm not convinced
# it is 100% reliable in the event of third-party patch functions that
# don't use ``saved``.
#
# .. versionadded:: NEXT
return bool(saved)
def _get_original(name, items):
d = saved.get(name, {})
values = []
......
......@@ -435,3 +435,7 @@ class TestCase(TestCaseMetaClass("NewBase",
def assertStartsWith(self, it, has_prefix):
self.assertTrue(it.startswith(has_prefix), (it, has_prefix))
def assertNotMonkeyPatched(self):
from gevent import monkey
self.assertFalse(monkey.is_anything_patched())
from __future__ import absolute_import, print_function, division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
import weakref
......@@ -138,6 +140,88 @@ class TestAsyncResult(greentest.TestCase):
self.assertRaises(gevent.Timeout, ar.get, block=False)
self.assertRaises(gevent.Timeout, ar.get_nowait)
@greentest.ignores_leakcheck
def test_cross_thread_use(self, timed_wait=False, wait_in_bg=False):
# Issue 1739.
# AsyncResult has *never* been thread safe, and using it from one
# thread to another is not safe. However, in some very careful use cases
# that can actually work.
#
# This test makes sure it doesn't hang in one careful use
# scenario.
self.assertNotMonkeyPatched() # Need real threads, event objects
from threading import Thread as NativeThread
from threading import Event as NativeEvent
# On PyPy 7.3.3, switching to the main greenlet of a thread from a
# different thread silently does nothing. We can't detect the cross-thread
# switch, and so this test breaks
# https://foss.heptapod.net/pypy/pypy/-/issues/3381
if not wait_in_bg and greentest.PYPY:
import sys
if sys.pypy_version_info[:3] <= (7, 3, 3): # pylint:disable=no-member
self.skipTest("PyPy bug: https://foss.heptapod.net/pypy/pypy/-/issues/3381")
class Thread(NativeThread):
def __init__(self):
NativeThread.__init__(self)
self.daemon = True
self.running_event = NativeEvent()
self.finished_event = NativeEvent()
self.async_result = AsyncResult()
self.result = '<never set>'
def run(self):
# Give the loop in this thread something to do
g_event = Event()
def spin():
while not g_event.is_set():
g_event.wait(DELAY * 2)
glet = gevent.spawn(spin)
def work():
self.running_event.set()
# XXX: If we use a timed wait(), the bug doesn't manifest.
# Why not?
if timed_wait:
self.result = self.async_result.wait(DELAY * 5)
else:
self.result = self.async_result.wait()
if wait_in_bg:
# This results in a separate code path
worker = gevent.spawn(work)
worker.join()
del worker
else:
work()
g_event.set()
glet.join()
del glet
self.finished_event.set()
gevent.get_hub().destroy(destroy_loop=True)
thread = Thread()
thread.start()
try:
thread.running_event.wait()
thread.async_result.set('from main')
thread.finished_event.wait(DELAY * 5)
finally:
thread.join(DELAY * 15)
self.assertEqual(thread.result, 'from main')
def test_cross_thread_use_bg(self):
self.test_cross_thread_use(timed_wait=False, wait_in_bg=True)
def test_cross_thread_use_timed(self):
self.test_cross_thread_use(timed_wait=True, wait_in_bg=False)
def test_cross_thread_use_timed_bg(self):
self.test_cross_thread_use(timed_wait=True, wait_in_bg=True)
class TestAsyncResultAsLinkTarget(greentest.TestCase):
error_fatal = False
......
......@@ -267,7 +267,9 @@ class TestSemaphoreMultiThread(greentest.TestCase):
if not run:
break
sem.acquire(*acquire_args)
acquired = sem.acquire(*acquire_args)
assert acquire_args or acquired
if acquired:
sem.release()
results[ix] = i
if not create_hub:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment