Commit eacc8d33 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1746 from gevent/issue1735

Add loop.run_callback_threadsafe and use it in AbstractLinkable
parents 1e5f1bbc 06129943
......@@ -30,6 +30,7 @@ All implementations of the loop provide a common minimum interface.
.. autointerface:: gevent._interfaces.ILoop
.. autointerface:: gevent._interfaces.IWatcher
.. autointerface:: gevent._interfaces.ICallback
Utilities
=========
......
Make ``AsyncResult`` print a warning when it detects improper
Make ``gevent.event.AsyncResult`` print a warning when it detects improper
cross-thread usage instead of hanging.
``AsyncResult`` has *never* been safe to use from multiple threads.
......@@ -18,4 +18,8 @@ are still possible, especially under PyPy 7.3.3.
At the same time, ``AsyncResult`` is tuned to behave more like it did
in older versions, meaning that the hang is once again much less
likely. If you were getting lucky and using ``AsyncResult``
successfully across threads, this may restore your luck.
successfully across threads, this may restore your luck. In addition,
cross-thread wakeups are faster. Note that the gevent hub now uses an
extra file descriptor to implement this.
Similar changes apply to ``gevent.event.Event`` (see :issue:`1735`).
......@@ -406,6 +406,7 @@ def run_setup(ext_modules):
'docs': [
'repoze.sphinx.autointerface',
'sphinxcontrib-programoutput',
'zope.schema',
],
# To the extent possible, we should work to make sure
# our tests run, at least a basic set, without any of
......
......@@ -47,7 +47,11 @@ def get_roots_and_hubs():
return {
x.parent: x
for x in get_objects()
if isinstance(x, Hub)
# Make sure to only find hubs that have a loop
# and aren't destroyed. If we don't do that, we can
# get an old hub that no longer works leading to issues in
# combined test cases.
if isinstance(x, Hub) and x.loop is not None
}
......@@ -393,9 +397,9 @@ class AbstractLinkable(object):
root_greenlets = get_roots_and_hubs()
hub = root_greenlets.get(glet)
if hub is not None:
hub.loop.run_callback(link, self)
if hub is None:
if hub is not None and hub.loop is not None:
hub.loop.run_callback_threadsafe(link, self)
if hub is None or hub.loop is None:
# We couldn't handle it
self.__print_unswitched_warning(link, printed_tb)
printed_tb = True
......
from __future__ import absolute_import, print_function
from __future__ import absolute_import
from __future__ import print_function
from zope.interface import implementer
from gevent._interfaces import ICallback
__all__ = [
'callback',
]
@implementer(ICallback)
class callback(object):
__slots__ = ('callback', 'args')
......
......@@ -392,6 +392,7 @@ class AbstractLoop(object):
_default = None
_keepaliveset = _DiscardedSet()
_threadsafe_async = None
def __init__(self, ffi, lib, watchers, flags=None, default=None):
self._ffi = ffi
......@@ -405,7 +406,6 @@ class AbstractLoop(object):
self._keepaliveset = set()
self._init_loop_and_aux_watchers(flags, default)
def _init_loop_and_aux_watchers(self, flags=None, default=None):
self._ptr = self._init_loop(flags, default)
......@@ -436,6 +436,8 @@ class AbstractLoop(object):
self._timer0.data = self._handle_to_self
self._init_callback_timer()
self._threadsafe_async = self.async_(ref=False)
self._threadsafe_async.start(lambda: None)
# TODO: We may be able to do something nicer and use the existing python_callback
# combined with onerror and the class check/timer/prepare to simplify things
# and unify our handling
......@@ -546,7 +548,9 @@ class AbstractLoop(object):
self.starting_timer_may_update_loop_time = False
def _stop_aux_watchers(self):
raise NotImplementedError()
if self._threadsafe_async is not None:
self._threadsafe_async.close()
self._threadsafe_async = None
def destroy(self):
ptr = self.ptr
......@@ -739,9 +743,13 @@ class AbstractLoop(object):
# _run_callbacks), this could happen almost immediately,
# without the loop cycling.
cb = callback(func, args)
self._callbacks.append(cb)
self._setup_for_run_callback()
self._callbacks.append(cb) # Relying on the GIL for this to be threadsafe
self._setup_for_run_callback() # XXX: This may not be threadsafe.
return cb
def run_callback_threadsafe(self, func, *args):
cb = self.run_callback(func, *args)
self._threadsafe_async.send()
return cb
def _format(self):
......
......@@ -69,6 +69,7 @@ cdef class AbstractLinkable(object):
@cython.nonecheck(False)
cpdef _notify_links(self, list arrived_while_waiting)
@cython.locals(hub=SwitchOutGreenletWithLoop)
cdef _handle_unswitched_notifications(self, list unswitched)
cdef __print_unswitched_warning(self, link, bint printed_tb)
......
......@@ -19,12 +19,31 @@ import sys
from zope.interface import Interface
from zope.interface import Attribute
try:
from zope import schema
except ImportError: # pragma: no cover
class _Field(Attribute):
__allowed_kw__ = ('readonly', 'min',)
def __init__(self, description, required=False, **kwargs):
description = "%s (required? %s)" % (description, required)
for k in self.__allowed_kw__:
kwargs.pop(k, None)
if kwargs:
raise TypeError("Unexpected keyword arguments: %r" % (kwargs,))
Attribute.__init__(self, description)
class schema(object):
Bool = _Field
Float = _Field
# pylint:disable=no-method-argument, unused-argument, no-self-argument
# pylint:disable=inherit-non-class
__all__ = [
'ILoop',
'IWatcher',
'ICallback',
]
class ILoop(Interface):
......@@ -46,13 +65,20 @@ class ILoop(Interface):
this watcher is still started. *priority* is event loop specific.
"""
default = Attribute("Boolean indicating whether this is the default loop")
default = schema.Bool(
description=u"Boolean indicating whether this is the default loop",
required=True,
readonly=True,
)
approx_timer_resolution = Attribute(
"Floating point number of seconds giving (approximately) the minimum "
approx_timer_resolution = schema.Float(
description=u"Floating point number of seconds giving (approximately) the minimum "
"resolution of a timer (and hence the minimun value the sleep can sleep for). "
"On libuv, this is fixed by the library, but on libev it is just a guess "
"and the actual value is system dependent."
"and the actual value is system dependent.",
required=True,
min=0.0,
readonly=True,
)
def run(nowait=False, once=False):
......@@ -160,7 +186,7 @@ class ILoop(Interface):
"""
Create a watcher that fires when the process forks.
Availability: POSIX
Availability: Unix.
"""
def async_(ref=True, priority=None):
......@@ -182,6 +208,8 @@ class ILoop(Interface):
Create a watcher that fires for events on the child with process ID *pid*.
This is platform specific and not available on Windows.
Availability: Unix.
"""
def stat(path, interval=0.0, ref=True, priority=None):
......@@ -196,8 +224,29 @@ class ILoop(Interface):
"""
Run the *func* passing it *args* at the next opportune moment.
This is a way of handing control to the event loop and deferring
an action.
The next opportune moment may be the next iteration of the event loop,
the current iteration, or some other time in the future.
Returns a :class:`ICallback` object. See that documentation for
important caveats.
.. seealso:: :meth:`asyncio.loop.call_soon`
The :mod:`asyncio` equivalent.
"""
def run_callback_threadsafe(func, *args):
"""
Like :meth:`run_callback`, but for use from *outside* the
thread that is running this loop.
This not only schedules the *func* to run, it also causes the
loop to notice that the *func* has been scheduled (e.g., it causes
the loop to wake up).
.. versionadded:: NEXT
.. seealso:: :meth:`asyncio.loop.call_soon_threadsafe`
The :mod:`asyncio` equivalent.
"""
class IWatcher(Interface):
......@@ -242,3 +291,25 @@ class IWatcher(Interface):
undefined. You should dispose of any references you have to it
after calling this method.
"""
class ICallback(Interface):
"""
Represents a function that will be run some time in the future.
Callback functions run in the hub, and as such they cannot use
gevent's blocking API; any exception they raise cannot be caught.
"""
pending = schema.Bool(description="Has this callback run yet?",
readonly=True)
def stop():
"""
If this object is still `pending`, cause it to
no longer be `pending`; the function will not be run.
"""
def close():
"""
An alias of `stop`.
"""
......@@ -405,7 +405,7 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
thread_lock.acquire()
results = []
owning_hub.loop.run_callback(
owning_hub.loop.run_callback_threadsafe(
spawn_raw,
self.__acquire_from_other_thread_cb,
results,
......
......@@ -392,6 +392,7 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
## embedded struct members
cdef libev.ev_prepare _prepare
cdef libev.ev_timer _timer0
cdef libev.ev_async _threadsafe_async
# We'll only actually start this timer if we're on Windows,
# but it doesn't hurt to compile it in on all platforms.
cdef libev.ev_timer _periodic_signal_checker
......@@ -421,6 +422,8 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
libev.ev_timer_init(&self._timer0,
<void*>gevent_noop,
0.0, 0.0)
libev.ev_async_init(&self._threadsafe_async,
<void*>gevent_noop)
cdef unsigned int c_flags
if ptr:
......@@ -454,6 +457,10 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
libev.ev_prepare_start(self._ptr, &self._prepare)
libev.ev_unref(self._ptr)
libev.ev_async_start(self._ptr, &self._threadsafe_async)
libev.ev_unref(self._ptr)
def __init__(self, object flags=None, object default=None, libev.intptr_t ptr=0):
self._callbacks = CallbackFIFO()
# See libev.corecffi for this attribute.
......@@ -507,6 +514,9 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
if libev.ev_is_active(&self._periodic_signal_checker):
libev.ev_ref(ptr)
libev.ev_timer_stop(ptr, &self._periodic_signal_checker)
if libev.ev_is_active(&self._threadsafe_async):
libev.ev_ref(ptr)
libev.ev_async_stop(ptr, &self._threadsafe_async)
def destroy(self):
cdef libev.ev_loop* ptr = self._ptr
......@@ -520,8 +530,8 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
# else with it will likely cause a crash.
return
# Mark as destroyed
libev.ev_set_userdata(ptr, NULL)
self._stop_watchers(ptr)
libev.ev_set_userdata(ptr, NULL)
if SYSERR_CALLBACK == self._handle_syserr:
set_syserr_cb(None)
libev.ev_loop_destroy(ptr)
......@@ -715,6 +725,12 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
libev.ev_ref(self._ptr)
return cb
def run_callback_threadsafe(self, func, *args):
# We rely on the GIL to make this threadsafe.
cb = self.run_callback(func, *args)
libev.ev_async_send(self._ptr, &self._threadsafe_async)
return cb
def _format(self):
if not self._ptr:
return 'destroyed'
......@@ -775,15 +791,17 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
# Explicitly not EV_USE_SIGNALFD
raise AttributeError("sigfd")
try:
from zope.interface import classImplements
except ImportError:
pass
else:
# XXX: This invokes the side-table lookup, we would
# prefer to have it stored directly on the class.
from gevent._interfaces import ILoop
classImplements(loop, ILoop)
from zope.interface import classImplements
# XXX: This invokes the side-table lookup, we would
# prefer to have it stored directly on the class. That means we
# need a class variable ``__implemented__``, but that's hard in
# Cython
from gevent._interfaces import ILoop
from gevent._interfaces import ICallback
classImplements(loop, ILoop)
classImplements(callback, ICallback)
# about readonly _flags attribute:
# bit #1 set if object owns Python reference to itself (Py_INCREF was
......
......@@ -284,6 +284,7 @@ class loop(AbstractLoop):
libev.ev_timer_start(self._ptr, self._timer0)
def _stop_aux_watchers(self):
super(loop, self)._stop_aux_watchers()
if libev.ev_is_active(self._prepare):
self.ref()
libev.ev_prepare_stop(self._ptr, self._prepare)
......
......@@ -334,6 +334,7 @@ class loop(AbstractLoop):
def _stop_aux_watchers(self):
super(loop, self)._stop_aux_watchers()
assert self._prepare
assert self._check
assert self._signal_idle
......@@ -456,7 +457,8 @@ class loop(AbstractLoop):
pass
def break_(self, how=None):
libuv.uv_stop(self.ptr)
if self.ptr:
libuv.uv_stop(self.ptr)
def reinit(self):
# TODO: How to implement? We probably have to simply
......
......@@ -140,6 +140,26 @@ class TestAsyncResult(greentest.TestCase):
self.assertRaises(gevent.Timeout, ar.get, block=False)
self.assertRaises(gevent.Timeout, ar.get_nowait)
class TestAsyncResultCrossThread(greentest.TestCase):
def _makeOne(self):
return AsyncResult()
def _setOne(self, one):
one.set('from main')
BG_WAIT_DELAY = 60
def _check_pypy_switch(self):
# On PyPy 7.3.3, switching to the main greenlet of a thread from a
# different thread silently does nothing. We can't detect the cross-thread
# switch, and so this test breaks
# https://foss.heptapod.net/pypy/pypy/-/issues/3381
if greentest.PYPY:
import sys
if sys.pypy_version_info[:3] <= (7, 3, 3): # pylint:disable=no-member
self.skipTest("PyPy bug: https://foss.heptapod.net/pypy/pypy/-/issues/3381")
@greentest.ignores_leakcheck
def test_cross_thread_use(self, timed_wait=False, wait_in_bg=False):
# Issue 1739.
......@@ -153,15 +173,10 @@ class TestAsyncResult(greentest.TestCase):
from threading import Thread as NativeThread
from threading import Event as NativeEvent
# On PyPy 7.3.3, switching to the main greenlet of a thread from a
# different thread silently does nothing. We can't detect the cross-thread
# switch, and so this test breaks
# https://foss.heptapod.net/pypy/pypy/-/issues/3381
if not wait_in_bg and greentest.PYPY:
import sys
if sys.pypy_version_info[:3] <= (7, 3, 3): # pylint:disable=no-member
self.skipTest("PyPy bug: https://foss.heptapod.net/pypy/pypy/-/issues/3381")
if not wait_in_bg:
self._check_pypy_switch()
test = self
class Thread(NativeThread):
def __init__(self):
NativeThread.__init__(self)
......@@ -169,7 +184,7 @@ class TestAsyncResult(greentest.TestCase):
self.running_event = NativeEvent()
self.finished_event = NativeEvent()
self.async_result = AsyncResult()
self.async_result = test._makeOne()
self.result = '<never set>'
def run(self):
......@@ -182,10 +197,12 @@ class TestAsyncResult(greentest.TestCase):
def work():
self.running_event.set()
# XXX: If we use a timed wait(), the bug doesn't manifest.
# Why not?
# If we use a timed wait(), the bug doesn't manifest.
# This is probably because the loop wakes up to handle the timer,
# and notices the callback.
# See https://github.com/gevent/gevent/issues/1735
if timed_wait:
self.result = self.async_result.wait(DELAY * 5)
self.result = self.async_result.wait(test.BG_WAIT_DELAY)
else:
self.result = self.async_result.wait()
......@@ -207,12 +224,15 @@ class TestAsyncResult(greentest.TestCase):
thread.start()
try:
thread.running_event.wait()
thread.async_result.set('from main')
self._setOne(thread.async_result)
thread.finished_event.wait(DELAY * 5)
finally:
thread.join(DELAY * 15)
self.assertEqual(thread.result, 'from main')
self._check_result(thread.result)
def _check_result(self, result):
self.assertEqual(result, 'from main')
def test_cross_thread_use_bg(self):
self.test_cross_thread_use(timed_wait=False, wait_in_bg=True)
......@@ -223,6 +243,57 @@ class TestAsyncResult(greentest.TestCase):
def test_cross_thread_use_timed_bg(self):
self.test_cross_thread_use(timed_wait=True, wait_in_bg=True)
@greentest.ignores_leakcheck
def test_cross_thread_use_set_in_bg(self):
self.assertNotMonkeyPatched() # Need real threads, event objects
from threading import Thread as NativeThread
from threading import Event as NativeEvent
self._check_pypy_switch()
test = self
class Thread(NativeThread):
def __init__(self):
NativeThread.__init__(self)
self.daemon = True
self.running_event = NativeEvent()
self.finished_event = NativeEvent()
self.async_result = test._makeOne()
self.result = '<never set>'
def run(self):
self.running_event.set()
test._setOne(self.async_result)
self.finished_event.set()
gevent.get_hub().destroy(destroy_loop=True)
thread = Thread()
try:
glet = gevent.spawn(thread.start)
result = thread.async_result.wait(self.BG_WAIT_DELAY)
finally:
thread.join(DELAY * 15)
glet.join(DELAY)
self._check_result(result)
@greentest.ignores_leakcheck
def test_cross_thread_use_set_in_bg2(self):
# Do it again to make sure it works multiple times.
self.test_cross_thread_use_set_in_bg()
class TestEventCrossThread(TestAsyncResultCrossThread):
def _makeOne(self):
return Event()
def _setOne(self, one):
one.set()
def _check_result(self, result):
self.assertTrue(result)
class TestAsyncResultAsLinkTarget(greentest.TestCase):
error_fatal = False
......
......@@ -337,6 +337,24 @@ class TestLoopInterface(unittest.TestCase):
verify.verifyObject(ILoop, loop)
def test_callback_implements_ICallback(self):
from gevent.testing import verify
from gevent._interfaces import ICallback
loop = get_hub().loop
cb = loop.run_callback(lambda: None)
verify.verifyObject(ICallback, cb)
def test_callback_ts_implements_ICallback(self):
from gevent.testing import verify
from gevent._interfaces import ICallback
loop = get_hub().loop
cb = loop.run_callback_threadsafe(lambda: None)
verify.verifyObject(ICallback, cb)
class TestHandleError(unittest.TestCase):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment