Commit 9a22eac9 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1718 from gevent/issue1698

Rework the way Semaphore deals with cross-thread usage
parents 00337094 c79a1978
Improve the ability to use monkey-patched locks, and
`gevent.lock.BoundedSemaphore`, across threads, especially when the
various threads might not have a gevent hub or any other active
greenlets. In particular, this handles some cases that previously
raised ``LoopExit`` or would hang. Note that this may not be reliable
on PyPy on Windows; such an environment is not currently recommended.
The semaphore tries to avoid creating a hub if it seems unnecessary,
automatically creating one in the single-threaded case when it would
block, but not in the multi-threaded case. While the differences
should be correctly detected, it's possible there are corner cases
where they might not be.
If your application appears to hang acquiring semaphores, but adding a
call to ``gevent.get_hub()`` in the thread attempting to acquire the
semaphore before doing so fixes it, please file an issue.
......@@ -45,9 +45,17 @@ else
echo "Compiling with -Ofast"
export CFLAGS="-Ofast $GEVENT_WARNFLAGS"
fi
# Needed for clock_gettime libc support on this version. This used to be spelled with LDFLAGS,
# but that is deprecated and produces a warning.
export LIBS="-lrt"
# -lrt: Needed for clock_gettime libc support on this version.
# -pthread: Needed for pthread_atfork (cffi).
# This used to be spelled with LDFLAGS, but that is deprecated and
# produces a warning on the 2014 image (?). Still needed on the
# 2010 image.
export LIBS="-lrt -pthread"
export LDFLAGS="$LIBS"
# Be sure that we get the loop we expect by default, and not
# a fallback loop.
export GEVENT_LOOP="libev-cext"
if [ -d /gevent -a -d /opt/python ]; then
# Running inside docker
......@@ -117,7 +125,10 @@ if [ -d /gevent -a -d /opt/python ]; then
python -c 'from __future__ import print_function; import gevent; print(gevent, gevent.__version__)'
python -c 'from __future__ import print_function; from gevent._compat import get_clock_info; print("clock info", get_clock_info("perf_counter"))'
python -c 'from __future__ import print_function; import greenlet; print(greenlet, greenlet.__version__)'
python -c 'from __future__ import print_function; import gevent.core; print("loop", gevent.core.loop)'
python -c 'from __future__ import print_function; import gevent.core; print("default loop", gevent.core.loop)'
# Other loops we should have
GEVENT_LOOP=libuv python -c 'from __future__ import print_function; import gevent.core; print("libuv loop", gevent.core.loop)'
GEVENT_LOOP=libev-cffi python -c 'from __future__ import print_function; import gevent.core; print("libev-cffi loop", gevent.core.loop)'
if [ -z "$GEVENTSETUP_DISABLE_ARES" ]; then
python -c 'from __future__ import print_function; import gevent.ares; print("ares", gevent.ares)'
fi
......
This diff is collapsed.
......@@ -763,6 +763,7 @@ class AbstractLoop(object):
msg += ' fileno=' + repr(fileno)
#if sigfd is not None and sigfd != -1:
# msg += ' sigfd=' + repr(sigfd)
msg += ' callbacks=' + str(len(self._callbacks))
return msg
def fileno(self):
......
......@@ -5,7 +5,9 @@ from gevent._gevent_c_hub_local cimport get_hub_noargs as get_hub
from gevent._gevent_c_hub_local cimport get_hub_if_exists
cdef InvalidSwitchError
cdef InvalidThreadUseError
cdef Timeout
cdef _get_thread_ident
cdef bint _greenlet_imported
cdef extern from "greenlet/greenlet.h":
......@@ -30,6 +32,9 @@ cdef inline void greenlet_init():
cdef void _init()
cdef class _FakeNotifier(object):
cdef bint pending
cdef class AbstractLinkable(object):
# We declare the __weakref__ here in the base (even though
# that's not really what we want) as a workaround for a Cython
......@@ -49,12 +54,14 @@ cdef class AbstractLinkable(object):
cpdef unlink(self, callback)
cdef _check_and_notify(self)
cdef void _capture_hub(self, bint create)
cdef SwitchOutGreenletWithLoop _capture_hub(self, bint create)
cdef __wait_to_be_notified(self, bint rawlink)
cdef void __unlink_all(self, obj) # suppress exceptions
cdef void _quiet_unlink_all(self, obj) # suppress exceptions
cdef int _switch_to_hub(self, the_hub) except -1
@cython.nonecheck(False)
cdef _notify_link_list(self, list links)
cdef list _notify_link_list(self, list links)
@cython.nonecheck(False)
cpdef _notify_links(self, list arrived_while_waiting)
......@@ -63,5 +70,10 @@ cdef class AbstractLinkable(object):
cpdef _acquire_lock_for_switch_in(self)
cdef _wait_core(self, timeout, catch=*)
cdef _wait_return_value(self, waited, wait_success)
cdef _wait_return_value(self, bint waited, bint wait_success)
cdef _wait(self, timeout=*)
# Unreleated utilities
cdef _allocate_lock(self)
cdef greenlet _getcurrent(self)
cdef _get_thread_ident(self)
cimport cython
from gevent._gevent_c_greenlet_primitives cimport SwitchOutGreenletWithLoop
from gevent._gevent_c_abstract_linkable cimport AbstractLinkable
from gevent._gevent_c_hub_local cimport get_hub_if_exists
from gevent._gevent_c_hub_local cimport get_hub_noargs as get_hub
cdef InvalidThreadUseError
cdef LoopExit
cdef Timeout
cdef _native_sleep
cdef monotonic
cdef spawn_raw
cdef class _LockReleaseLink(object):
cdef object lock
cdef class Semaphore(AbstractLinkable):
cdef public int counter
cdef long _multithreaded
cpdef bint locked(self)
cpdef int release(self) except -1000
# We don't really want this to be public, but
# threadpool uses it
cpdef _start_notify(self)
cpdef int wait(self, object timeout=*) except -1000
@cython.locals(
success=bint,
e=Exception,
ex=Exception,
args=tuple,
)
cpdef bint acquire(self, bint blocking=*, object timeout=*) except -1000
cpdef __enter__(self)
cpdef __exit__(self, object t, object v, object tb)
@cython.locals(
hub_for_this_thread=SwitchOutGreenletWithLoop,
owning_hub=SwitchOutGreenletWithLoop,
)
cdef __acquire_from_other_thread(self, tuple args, bint blocking, timeout)
cpdef __acquire_from_other_thread_cb(self, list results, bint blocking, timeout, thread_lock)
cdef __add_link(self, link)
cdef __acquire_using_two_hubs(self,
SwitchOutGreenletWithLoop hub_for_this_thread,
current_greenlet,
timeout)
cdef __acquire_using_other_hub(self, SwitchOutGreenletWithLoop owning_hub, timeout)
cdef bint __acquire_without_hubs(self, timeout)
cdef bint __spin_on_native_lock(self, thread_lock, timeout)
cdef class BoundedSemaphore(Semaphore):
cdef readonly int _initial_value
......
This diff is collapsed.
......@@ -36,12 +36,28 @@ class LoopExit(Exception):
"""
@property
def hub(self):
"""
The (optional) hub that raised the error.
.. versionadded:: NEXT
"""
# XXX: Note that semaphore.py does this manually.
if len(self.args) == 3: # From the hub
return self.args[1]
def __repr__(self):
# pylint:disable=unsubscriptable-object
if len(self.args) == 3: # From the hub
import pprint
return "%s\n\tHub: %s\n\tHandles:\n%s" % (
self.args[0], self.args[1],
return (
"%s\n"
"\tHub: %s\n"
"\tHandles:\n%s"
) % (
self.args[0],
self.args[1],
pprint.pformat(self.args[2])
)
return Exception.__repr__(self)
......
......@@ -294,6 +294,10 @@ class loop(AbstractLoop):
libev.ev_timer_stop(self._timer0)
def _setup_for_run_callback(self):
# XXX: libuv needs to start the callback timer to be sure
# that the loop wakes up and calls this. Our C version doesn't
# do this.
# self._start_callback_timer()
self.ref() # we should go through the loop now
def destroy(self):
......
......@@ -8,6 +8,7 @@ infinite bounds (:class:`DummySemaphore`), along with a reentrant lock
(:class:`RLock`) with the same API as :class:`threading.RLock`.
"""
from __future__ import absolute_import
from __future__ import print_function
from gevent.hub import getcurrent
from gevent._compat import PURE_PYTHON
......@@ -43,82 +44,77 @@ _allocate_lock, _get_ident = monkey.get_original(
('allocate_lock', 'get_ident')
)
def atomic(meth):
def m(self, *args):
with self._atomic:
return meth(self, *args)
return m
class _OwnedLock(object):
class _GILLock(object):
__slots__ = (
'_owner',
'_block',
'_locking',
'_count',
'_owned_thread_id',
'_gil',
'_atomic',
'_recursion_depth',
)
# Don't allow re-entry to these functions in a single thread, as can
# happen if a sys.settrace is used.
# Don't allow re-entry to these functions in a single thread, as
# can happen if a sys.settrace is used. (XXX: What does that even
# mean? Our original implementation that did that has been
# replaced by something more robust)
#
# This is essentially a variant of the (pure-Python) RLock from the
# standard library.
def __init__(self):
self._owner = None
self._block = _allocate_lock()
self._locking = {}
self._count = 0
def __begin(self):
# Return (me, count) if we should proceed, otherwise return
# None. The function should exit in that case.
# In either case, it must call __end.
me = _get_ident()
try:
count = self._locking[me]
except KeyError:
count = self._locking[me] = 1
else:
count = self._locking[me] = count + 1
return (me, count) if not count else (None, None)
def __end(self, me, count):
if me is None:
return
count = count - 1
if not count:
del self._locking[me]
else:
self._locking[me] = count
self._owned_thread_id = None
self._gil = _allocate_lock()
self._atomic = _allocate_lock()
self._recursion_depth = 0
@atomic
def acquire(self):
current_tid = _get_ident()
if self._owned_thread_id == current_tid:
self._recursion_depth += 1
return True
def __enter__(self):
me, lock_count = self.__begin()
# Not owned by this thread. Only one thread will make it through this point.
while 1:
self._atomic.release()
try:
if me is None:
return
self._gil.acquire()
finally:
self._atomic.acquire()
if self._owned_thread_id is None:
break
if self._owner == me:
self._count += 1
return
self._owned_thread_id = current_tid
self._recursion_depth = 1
return True
self._block.acquire()
self._owner = me
self._count = 1
finally:
self.__end(me, lock_count)
@atomic
def release(self):
current_tid = _get_ident()
if current_tid != self._owned_thread_id:
raise RuntimeError("%s: Releasing lock not owned by you. You: 0x%x; Owner: 0x%x" % (
self,
current_tid, self._owned_thread_id or 0,
))
def __exit__(self, t, v, tb):
self.release()
self._recursion_depth -= 1
acquire = __enter__
if not self._recursion_depth:
self._owned_thread_id = None
self._gil.release()
def release(self):
me, lock_count = self.__begin()
try:
if me is None:
return
def __enter__(self):
self.acquire()
self._count = count = self._count - 1
if not count:
self._owner = None
self._block.release()
finally:
self.__end(me, lock_count)
def __exit__(self, t, v, tb):
self.release()
def locked(self):
return self._gil.locked()
class _AtomicSemaphoreMixin(object):
# Behaves as though the GIL was held for the duration of acquire, wait,
......@@ -131,7 +127,7 @@ class _AtomicSemaphoreMixin(object):
# Note that this does *NOT*, in-and-of itself, make semaphores safe to use from multiple threads
__slots__ = ()
def __init__(self, *args, **kwargs):
self._lock_lock = _OwnedLock() # pylint:disable=assigning-non-slot
self._lock_lock = _GILLock() # pylint:disable=assigning-non-slot
super(_AtomicSemaphoreMixin, self).__init__(*args, **kwargs)
def _acquire_lock_for_switch_in(self):
......@@ -347,7 +343,9 @@ class RLock(object):
release it.
"""
if self._owner is not getcurrent():
raise RuntimeError("cannot release un-acquired lock")
raise RuntimeError("cannot release un-acquired lock. Owner: %r Current: %r" % (
self._owner, getcurrent()
))
self._count = count = self._count - 1
if not count:
self._owner = None
......
......@@ -424,6 +424,8 @@ def _check_availability(name):
:raise ImportError: If the source or target cannot be imported.
:return: The tuple ``(gevent_module, target_module, target_module_name)``
"""
# Always import the gevent module first. This helps us be sure we can
# use regular imports in gevent files (when we can't use gevent.monkey.get_original())
gevent_module = getattr(__import__('gevent.' + name), name)
target_module_name = getattr(gevent_module, '__target__', name)
target_module = __import__(target_module_name)
......@@ -1216,10 +1218,13 @@ def patch_all(socket=True, dns=True, time=True, select=True, thread=True, os=Tru
# order is important
if os:
patch_os()
if time:
patch_time()
if thread:
patch_thread(Event=Event, _warnings=_warnings)
if time:
# time must be patched after thread, some modules used by thread
# need access to the real time.sleep function.
patch_time()
# sys must be patched after thread. in other cases threading._shutdown will be
# initiated to _MainThread with real thread ident
if sys:
......
......@@ -93,6 +93,7 @@ from .skipping import skipOnCI
from .skipping import skipOnPyPy3OnCI
from .skipping import skipOnPyPy
from .skipping import skipOnPyPyOnCI
from .skipping import skipOnPyPyOnWindows
from .skipping import skipOnPyPy3
from .skipping import skipIf
from .skipping import skipUnless
......
......@@ -22,31 +22,36 @@ from functools import wraps
def wrap_error_fatal(method):
import gevent
system_error = gevent.get_hub().SYSTEM_ERROR
from gevent._hub_local import get_hub_class
system_error = get_hub_class().SYSTEM_ERROR
@wraps(method)
def wrapper(self, *args, **kwargs):
# XXX should also be able to do gevent.SYSTEM_ERROR = object
# which is a global default to all hubs
gevent.get_hub().SYSTEM_ERROR = object
get_hub_class().SYSTEM_ERROR = object
try:
return method(self, *args, **kwargs)
finally:
gevent.get_hub().SYSTEM_ERROR = system_error
get_hub_class().SYSTEM_ERROR = system_error
return wrapper
def wrap_restore_handle_error(method):
import gevent
old = gevent.get_hub().handle_error
from gevent._hub_local import get_hub_if_exists
from gevent import getcurrent
@wraps(method)
def wrapper(self, *args, **kwargs):
try:
return method(self, *args, **kwargs)
finally:
gevent.get_hub().handle_error = old
# Remove any customized handle_error, if set on the
# instance.
try:
del get_hub_if_exists().handle_error
except AttributeError:
pass
if self.peek_error()[0] is not None:
gevent.getcurrent().throw(*self.peek_error()[1:])
getcurrent().throw(*self.peek_error()[1:])
return wrapper
......@@ -24,5 +24,10 @@ class TestRLockMultiThread(test__semaphore.TestSemaphoreMultiThread):
# So we deliberately don't set the hub to help test that condition.
return lock.RLock()
def assertOneHasNoHub(self, sem):
self.assertIsNone(sem._block.hub)
if __name__ == '__main__':
greentest.main()
......@@ -12,6 +12,7 @@ import weakref
import gevent
import gevent.exceptions
from gevent.lock import Semaphore
from gevent.lock import BoundedSemaphore
import gevent.testing as greentest
from gevent.testing import timing
......@@ -74,12 +75,15 @@ class TestSemaphoreMultiThread(greentest.TestCase):
# See https://github.com/gevent/gevent/issues/1437
def _getTargetClass(self):
return Semaphore
def _makeOne(self):
# Create an object that is associated with the current hub. If
# we don't do this now, it gets initialized lazily the first
# time it would have to block, which, in the event of threads,
# would be from an arbitrary thread.
return Semaphore(1, gevent.get_hub())
return self._getTargetClass()(1)
def _makeThreadMain(self, thread_running, thread_acquired, sem,
acquired, exc_info,
......@@ -104,7 +108,12 @@ class TestSemaphoreMultiThread(greentest.TestCase):
thread_acquired.set()
return thread_main
def _do_test_acquire_in_one_then_another(self, release=True, **thread_acquire_kwargs):
IDLE_ITERATIONS = 5
def _do_test_acquire_in_one_then_another(self,
release=True,
require_thread_acquired_to_finish=False,
**thread_acquire_kwargs):
from gevent import monkey
self.assertFalse(monkey.is_module_patched('threading'))
......@@ -124,6 +133,7 @@ class TestSemaphoreMultiThread(greentest.TestCase):
acquired, exc_info,
**thread_acquire_kwargs
))
t.daemon = True
t.start()
thread_running.wait(10) # implausibly large time
if release:
......@@ -136,14 +146,25 @@ class TestSemaphoreMultiThread(greentest.TestCase):
# that get run (including time-based) the notifier may or
# may not be immediately ready to run, so this can take up
# to two iterations.)
for _ in range(3):
for _ in range(self.IDLE_ITERATIONS):
gevent.idle()
if thread_acquired.wait(timing.LARGE_TICK):
break
self.assertEqual(acquired, [True])
if not release and thread_acquire_kwargs.get("timeout"):
# Spin the loop to be sure that the timeout has a chance to
# process. Interleave this with something that drops the GIL
# so the background thread has a chance to notice that.
for _ in range(self.IDLE_ITERATIONS):
gevent.idle()
if thread_acquired.wait(timing.LARGE_TICK):
break
thread_acquired.wait(timing.LARGE_TICK * 5)
if require_thread_acquired_to_finish:
self.assertTrue(thread_acquired.is_set())
try:
self.assertEqual(exc_info, [])
finally:
......@@ -157,6 +178,7 @@ class TestSemaphoreMultiThread(greentest.TestCase):
def test_acquire_in_one_then_another_timed(self):
sem, acquired_in_thread = self._do_test_acquire_in_one_then_another(
release=False,
require_thread_acquired_to_finish=True,
timeout=timing.SMALLEST_RELIABLE_DELAY)
self.assertEqual([False], acquired_in_thread)
# This doesn't, of course, notify anything, because
......@@ -177,7 +199,6 @@ class TestSemaphoreMultiThread(greentest.TestCase):
import threading
sem = self._makeOne()
# Make future acquires block
sem.acquire()
......@@ -193,8 +214,6 @@ class TestSemaphoreMultiThread(greentest.TestCase):
exc_info = []
acquired = []
glet = gevent.spawn(greenlet_one)
thread = threading.Thread(target=self._makeThreadMain(
threading.Event(), threading.Event(),
......@@ -202,19 +221,112 @@ class TestSemaphoreMultiThread(greentest.TestCase):
acquired, exc_info,
timeout=timing.LARGE_TICK
))
thread.daemon = True
gevent.idle()
sem.release()
glet.join()
for _ in range(3):
gevent.idle()
thread.join(timing.LARGE_TICK)
self.assertEqual(glet.value, True)
self.assertEqual([], exc_info)
self.assertEqual([False], acquired)
self.assertTrue(glet.dead, glet)
glet = None
def assertOneHasNoHub(self, sem):
self.assertIsNone(sem.hub, sem)
@greentest.skipOnPyPyOnWindows("Flaky there; can't reproduce elsewhere")
def test_dueling_threads(self, acquire_args=(), create_hub=None):
# pylint:disable=too-many-locals,too-many-statements
# Threads doing nothing but acquiring and releasing locks, without
# having any other greenlets to switch to.
# https://github.com/gevent/gevent/issues/1698
from gevent import monkey
from gevent._hub_local import get_hub_if_exists
self.assertFalse(monkey.is_module_patched('threading'))
import threading
from time import sleep as native_sleep
sem = self._makeOne()
self.assertOneHasNoHub(sem)
count = 10000
results = [-1, -1]
run = True
def do_it(ix):
if create_hub:
gevent.get_hub()
try:
for i in range(count):
if not run:
break
sem.acquire(*acquire_args)
sem.release()
results[ix] = i
if not create_hub:
# We don't artificially create the hub.
self.assertIsNone(
get_hub_if_exists(),
(get_hub_if_exists(), ix, i)
)
if create_hub and i % 10 == 0:
gevent.sleep(timing.SMALLEST_RELIABLE_DELAY)
elif i % 100 == 0:
native_sleep(timing.SMALLEST_RELIABLE_DELAY)
except Exception as ex: # pylint:disable=broad-except
import traceback; traceback.print_exc()
results[ix] = str(ex)
ex = None
finally:
hub = get_hub_if_exists()
if hub is not None:
hub.join()
hub.destroy(destroy_loop=True)
t1 = threading.Thread(target=do_it, args=(0,))
t1.daemon = True
t2 = threading.Thread(target=do_it, args=(1,))
t2.daemon = True
t1.start()
t2.start()
t1.join(1)
t2.join(1)
while t1.is_alive() or t2.is_alive():
cur = list(results)
t1.join(7)
t2.join(7)
if cur == results:
# Hmm, after two seconds, no progress
run = False
break
self.assertEqual(results, [count - 1, count - 1])
def test_dueling_threads_timeout(self):
self.test_dueling_threads((True, 4))
def test_dueling_threads_with_hub(self):
self.test_dueling_threads(create_hub=True)
# XXX: Need a test with multiple greenlets in a non-primary
# thread. Things should work, just very slowly; instead of moving through
# greenlet.switch(), they'll be moving with async watchers.
class TestBoundedSemaphoreMultiThread(TestSemaphoreMultiThread):
def _getTargetClass(self):
return BoundedSemaphore
@greentest.skipOnPurePython("Needs C extension")
class TestCExt(greentest.TestCase):
......
......@@ -60,6 +60,7 @@ from gevent._hub_local import get_hub_if_exists
from gevent.greenlet import Greenlet
from gevent.lock import BoundedSemaphore
from gevent.local import local as _local
from gevent.exceptions import LoopExit
if hasattr(__thread__, 'RLock'):
assert PY3 or PYPY
......@@ -115,7 +116,18 @@ class LockType(BoundedSemaphore):
if timeout > self._TIMEOUT_MAX:
raise OverflowError('timeout value is too large')
try:
acquired = BoundedSemaphore.acquire(self, blocking, timeout)
except LoopExit:
# Raised when the semaphore was not trivially ours, and we needed
# to block. Some other thread presumably owns the semaphore, and there are no greenlets
# running in this thread to switch to. So the best we can do is
# release the GIL and try again later.
if blocking: # pragma: no cover
raise
acquired = False
if not acquired and not blocking and getcurrent() is not get_hub_if_exists():
# Run other callbacks. This makes spin locks works.
# We can't do this if we're in the hub, which we could easily be:
......
......@@ -162,7 +162,6 @@ def _format_thread_info(lines, thread_stacks, limit, current_thread_ident):
import threading
threads = {th.ident: th for th in threading.enumerate()}
lines.append('*' * 80)
lines.append('* Threads')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment