Commit e3667c71 authored by Jason Madden's avatar Jason Madden

Rework the way Semaphore handles cross-thread locking.

This should ultimately be far more robust, and it handles LoopExit cases much better.

Fixes #1698

Not yet complete; On Python 2, I sometimes get ValueError: over-released the semaphore for reasons I don't understand.

Also, Python 2 in general, and PyPy in particular, sometimes fails to make progress on the semaphore tests (one of the background threads doesn't complete for some reason). The RLock tests are much worse on Python 2 also. I don't understand why. Then again, I don't understand how the normal RLock could possibly actually be thread safe either: I think it must lean heavily on the GIL.
parent 00337094
Improve the ability to use monkey-patched locks, and
`gevent.lock.BoundedSemaphore`, across threads, especially when the
various threads might not have a gevent hub or any other active
greenlets. In particular, this handles some cases that previously
raised ``LoopExit``.
This diff is collapsed.
......@@ -763,6 +763,7 @@ class AbstractLoop(object):
msg += ' fileno=' + repr(fileno)
#if sigfd is not None and sigfd != -1:
# msg += ' sigfd=' + repr(sigfd)
msg += ' callbacks=' + str(len(self._callbacks))
return msg
def fileno(self):
......
......@@ -5,7 +5,9 @@ from gevent._gevent_c_hub_local cimport get_hub_noargs as get_hub
from gevent._gevent_c_hub_local cimport get_hub_if_exists
cdef InvalidSwitchError
cdef InvalidThreadUseError
cdef Timeout
cdef _get_thread_ident
cdef bint _greenlet_imported
cdef extern from "greenlet/greenlet.h":
......@@ -49,9 +51,14 @@ cdef class AbstractLinkable(object):
cpdef unlink(self, callback)
cdef _check_and_notify(self)
cdef void _capture_hub(self, bint create)
cdef int _capture_hub(self, bint create) except -1
cdef __wait_to_be_notified(self, bint rawlink)
cdef void __unlink_all(self, obj) # suppress exceptions
cdef void _quiet_unlink_all(self, obj) # suppress exceptions
cdef _allocate_lock(self)
cdef greenlet _getcurrent(self)
cdef int _switch_to_hub(self, the_hub) except -1
@cython.nonecheck(False)
cdef _notify_link_list(self, list links)
......@@ -63,5 +70,5 @@ cdef class AbstractLinkable(object):
cpdef _acquire_lock_for_switch_in(self)
cdef _wait_core(self, timeout, catch=*)
cdef _wait_return_value(self, waited, wait_success)
cdef _wait_return_value(self, bint waited, bint wait_success)
cdef _wait(self, timeout=*)
cimport cython
from gevent._gevent_c_greenlet_primitives cimport SwitchOutGreenletWithLoop
from gevent._gevent_c_abstract_linkable cimport AbstractLinkable
from gevent._gevent_c_hub_local cimport get_hub_if_exists
from gevent._gevent_c_hub_local cimport get_hub_noargs as get_hub
cdef Timeout
cdef InvalidThreadUseError
cdef LoopExit
cdef spawn_raw
cdef _native_sleep
cdef class Semaphore(AbstractLinkable):
......@@ -13,10 +21,23 @@ cdef class Semaphore(AbstractLinkable):
# threadpool uses it
cpdef _start_notify(self)
cpdef int wait(self, object timeout=*) except -1000
@cython.locals(
success=bint,
e=Exception,
ex=Exception,
args=tuple,
)
cpdef bint acquire(self, bint blocking=*, object timeout=*) except -1000
cpdef __enter__(self)
cpdef __exit__(self, object t, object v, object tb)
@cython.locals(
hub_for_this_thread=SwitchOutGreenletWithLoop,
owning_hub=SwitchOutGreenletWithLoop,
)
cdef __acquire_from_other_thread(self, tuple args, bint blocking, timeout)
cpdef __acquire_from_other_thread_cb(self, list results, bint blocking, timeout, thread_lock)
cdef class BoundedSemaphore(Semaphore):
cdef readonly int _initial_value
......
This diff is collapsed.
......@@ -36,12 +36,28 @@ class LoopExit(Exception):
"""
@property
def hub(self):
"""
The (optional) hub that raised the error.
.. versionadded:: NEXT
"""
# XXX: Note that semaphore.py does this manually.
if len(self.args) == 3: # From the hub
return self.args[1]
def __repr__(self):
# pylint:disable=unsubscriptable-object
if len(self.args) == 3: # From the hub
import pprint
return "%s\n\tHub: %s\n\tHandles:\n%s" % (
self.args[0], self.args[1],
return (
"%s\n"
"\tHub: %s\n"
"\tHandles:\n%s"
) % (
self.args[0],
self.args[1],
pprint.pformat(self.args[2])
)
return Exception.__repr__(self)
......
......@@ -8,6 +8,7 @@ infinite bounds (:class:`DummySemaphore`), along with a reentrant lock
(:class:`RLock`) with the same API as :class:`threading.RLock`.
"""
from __future__ import absolute_import
from __future__ import print_function
from gevent.hub import getcurrent
from gevent._compat import PURE_PYTHON
......@@ -43,82 +44,77 @@ _allocate_lock, _get_ident = monkey.get_original(
('allocate_lock', 'get_ident')
)
def atomic(meth):
def m(self, *args):
with self._atomic:
return meth(self, *args)
return m
class _OwnedLock(object):
class _GILLock(object):
__slots__ = (
'_owner',
'_block',
'_locking',
'_count',
'_owned_thread_id',
'_gil',
'_atomic',
'_recursion_depth',
)
# Don't allow re-entry to these functions in a single thread, as can
# happen if a sys.settrace is used.
# Don't allow re-entry to these functions in a single thread, as
# can happen if a sys.settrace is used. (XXX: What does that even
# mean? Our original implementation that did that has been
# replaced by something more robust)
#
# This is essentially a variant of the (pure-Python) RLock from the
# standard library.
def __init__(self):
self._owner = None
self._block = _allocate_lock()
self._locking = {}
self._count = 0
self._owned_thread_id = None
self._gil = _allocate_lock()
self._atomic = _allocate_lock()
self._recursion_depth = 0
@atomic
def acquire(self):
current_tid = _get_ident()
if self._owned_thread_id == current_tid:
self._recursion_depth += 1
return True
# Not owned by this thread. Only one thread will make it through this point.
while 1:
self._atomic.release()
try:
self._gil.acquire()
finally:
self._atomic.acquire()
if self._owned_thread_id is None:
break
self._owned_thread_id = current_tid
self._recursion_depth = 1
return True
@atomic
def release(self):
current_tid = _get_ident()
if current_tid != self._owned_thread_id:
raise RuntimeError("%s: Releasing lock not owned by you. You: 0x%x; Owner: 0x%x" % (
self,
current_tid, self._owned_thread_id or 0,
))
def __begin(self):
# Return (me, count) if we should proceed, otherwise return
# None. The function should exit in that case.
# In either case, it must call __end.
me = _get_ident()
try:
count = self._locking[me]
except KeyError:
count = self._locking[me] = 1
else:
count = self._locking[me] = count + 1
return (me, count) if not count else (None, None)
def __end(self, me, count):
if me is None:
return
count = count - 1
if not count:
del self._locking[me]
else:
self._locking[me] = count
self._recursion_depth -= 1
def __enter__(self):
me, lock_count = self.__begin()
try:
if me is None:
return
if not self._recursion_depth:
self._owned_thread_id = None
self._gil.release()
if self._owner == me:
self._count += 1
return
self._block.acquire()
self._owner = me
self._count = 1
finally:
self.__end(me, lock_count)
def __enter__(self):
self.acquire()
def __exit__(self, t, v, tb):
self.release()
acquire = __enter__
def release(self):
me, lock_count = self.__begin()
try:
if me is None:
return
self._count = count = self._count - 1
if not count:
self._owner = None
self._block.release()
finally:
self.__end(me, lock_count)
def locked(self):
return self._gil.locked()
class _AtomicSemaphoreMixin(object):
# Behaves as though the GIL was held for the duration of acquire, wait,
......@@ -131,7 +127,7 @@ class _AtomicSemaphoreMixin(object):
# Note that this does *NOT*, in-and-of itself, make semaphores safe to use from multiple threads
__slots__ = ()
def __init__(self, *args, **kwargs):
self._lock_lock = _OwnedLock() # pylint:disable=assigning-non-slot
self._lock_lock = _GILLock() # pylint:disable=assigning-non-slot
super(_AtomicSemaphoreMixin, self).__init__(*args, **kwargs)
def _acquire_lock_for_switch_in(self):
......@@ -347,7 +343,9 @@ class RLock(object):
release it.
"""
if self._owner is not getcurrent():
raise RuntimeError("cannot release un-acquired lock")
raise RuntimeError("cannot release un-acquired lock. Owner: %r Current: %r" % (
self._owner, getcurrent()
))
self._count = count = self._count - 1
if not count:
self._owner = None
......
......@@ -424,6 +424,8 @@ def _check_availability(name):
:raise ImportError: If the source or target cannot be imported.
:return: The tuple ``(gevent_module, target_module, target_module_name)``
"""
# Always import the gevent module first. This helps us be sure we can
# use regular imports in gevent files (when we can't use gevent.monkey.get_original())
gevent_module = getattr(__import__('gevent.' + name), name)
target_module_name = getattr(gevent_module, '__target__', name)
target_module = __import__(target_module_name)
......@@ -1216,10 +1218,13 @@ def patch_all(socket=True, dns=True, time=True, select=True, thread=True, os=Tru
# order is important
if os:
patch_os()
if time:
patch_time()
if thread:
patch_thread(Event=Event, _warnings=_warnings)
if time:
# time must be patched after thread, some modules used by thread
# need access to the real time.sleep function.
patch_time()
# sys must be patched after thread. in other cases threading._shutdown will be
# initiated to _MainThread with real thread ident
if sys:
......
......@@ -11,6 +11,9 @@ from gevent.tests import test__semaphore
class TestRLockMultiThread(test__semaphore.TestSemaphoreMultiThread):
# For some reason, the tests are extremely flaky on Python 2.
dueling_thread_tests_are_too_flaky = greentest.PY2
def _makeOne(self):
# If we don't set the hub before returning,
# there's a potential race condition, if the implementation
......@@ -24,5 +27,10 @@ class TestRLockMultiThread(test__semaphore.TestSemaphoreMultiThread):
# So we deliberately don't set the hub to help test that condition.
return lock.RLock()
def assertOneHasNoHub(self, sem):
self.assertIsNone(sem._block.hub)
if __name__ == '__main__':
greentest.main()
......@@ -12,9 +12,11 @@ import weakref
import gevent
import gevent.exceptions
from gevent.lock import Semaphore
from gevent.lock import BoundedSemaphore
import gevent.testing as greentest
from gevent.testing import timing
from gevent.testing import flaky
class TestSemaphore(greentest.TestCase):
......@@ -74,12 +76,15 @@ class TestSemaphoreMultiThread(greentest.TestCase):
# See https://github.com/gevent/gevent/issues/1437
def _getTargetClass(self):
return Semaphore
def _makeOne(self):
# Create an object that is associated with the current hub. If
# we don't do this now, it gets initialized lazily the first
# time it would have to block, which, in the event of threads,
# would be from an arbitrary thread.
return Semaphore(1, gevent.get_hub())
return self._getTargetClass()(1)
def _makeThreadMain(self, thread_running, thread_acquired, sem,
acquired, exc_info,
......@@ -104,7 +109,10 @@ class TestSemaphoreMultiThread(greentest.TestCase):
thread_acquired.set()
return thread_main
def _do_test_acquire_in_one_then_another(self, release=True, **thread_acquire_kwargs):
def _do_test_acquire_in_one_then_another(self,
release=True,
require_thread_acquired_to_finish=False,
**thread_acquire_kwargs):
from gevent import monkey
self.assertFalse(monkey.is_module_patched('threading'))
......@@ -144,6 +152,8 @@ class TestSemaphoreMultiThread(greentest.TestCase):
self.assertEqual(acquired, [True])
thread_acquired.wait(timing.LARGE_TICK * 5)
if require_thread_acquired_to_finish:
self.assertTrue(thread_acquired.is_set())
try:
self.assertEqual(exc_info, [])
finally:
......@@ -157,6 +167,7 @@ class TestSemaphoreMultiThread(greentest.TestCase):
def test_acquire_in_one_then_another_timed(self):
sem, acquired_in_thread = self._do_test_acquire_in_one_then_another(
release=False,
require_thread_acquired_to_finish=True,
timeout=timing.SMALLEST_RELIABLE_DELAY)
self.assertEqual([False], acquired_in_thread)
# This doesn't, of course, notify anything, because
......@@ -177,7 +188,6 @@ class TestSemaphoreMultiThread(greentest.TestCase):
import threading
sem = self._makeOne()
# Make future acquires block
sem.acquire()
......@@ -193,8 +203,6 @@ class TestSemaphoreMultiThread(greentest.TestCase):
exc_info = []
acquired = []
glet = gevent.spawn(greenlet_one)
thread = threading.Thread(target=self._makeThreadMain(
threading.Event(), threading.Event(),
......@@ -210,11 +218,107 @@ class TestSemaphoreMultiThread(greentest.TestCase):
self.assertEqual(glet.value, True)
self.assertEqual([], exc_info)
self.assertEqual([False], acquired)
self.assertTrue(glet.dead, glet)
glet = None
def assertOneHasNoHub(self, sem):
self.assertIsNone(sem.hub, sem)
dueling_thread_tests_are_too_flaky = False
def test_dueling_threads(self, acquire_args=(), create_hub=None):
# pylint:disable=too-many-locals,too-many-statements
if self.dueling_thread_tests_are_too_flaky:
self.skipTest("Described as too flaky")
# Threads doing nothing but acquiring and releasing locks, without
# having any other greenlets to switch to.
# https://github.com/gevent/gevent/issues/1698
from gevent import monkey
from gevent._hub_local import get_hub_if_exists
self.assertFalse(monkey.is_module_patched('threading'))
import threading
from time import sleep as native_sleep
sem = self._makeOne()
self.assertOneHasNoHub(sem)
count = 10000
results = [-1, -1]
run = True
def do_it(ix):
if create_hub:
gevent.get_hub()
try:
for i in range(count):
if not run:
break
sem.acquire(*acquire_args)
sem.release()
results[ix] = i
if create_hub and i % 10 == 0:
gevent.sleep(timing.SMALLEST_RELIABLE_DELAY)
elif i % 100 == 0:
native_sleep(timing.SMALLEST_RELIABLE_DELAY)
except Exception as ex: # pylint:disable=broad-except
import traceback; traceback.print_exc()
results[ix] = ex
finally:
hub = get_hub_if_exists()
if hub is not None:
hub.join()
hub.destroy(destroy_loop=True)
t1 = threading.Thread(target=do_it, args=(0,))
t1.daemon = True
t2 = threading.Thread(target=do_it, args=(1,))
t2.daemon = True
t1.start()
t2.start()
t1.join(1)
t2.join(1)
while t1.is_alive() or t2.is_alive():
cur = list(results)
t1.join(2)
t2.join(2)
if cur == results:
# Hmm, after two seconds, no progress
print("No progress!", cur, results, t1, t2)
from gevent.util import print_run_info
print_run_info()
run = False
break
try:
self.assertEqual(results, [count - 1, count - 1])
except AssertionError:
if greentest.PY2:
flaky.reraiseFlakyTestRaceCondition()
else:
raise
def test_dueling_threads_timeout(self):
self.test_dueling_threads((True, 4))
def test_dueling_threads_with_hub(self):
self.test_dueling_threads(create_hub=True)
# XXX: Need a test with multiple greenlets in a non-primary
# thread. Things should work, just very slowly; instead of moving through
# greenlet.switch(), they'll be moving with async watchers.
class TestBoundedSemaphoreMultiThread(TestSemaphoreMultiThread):
def _getTargetClass(self):
return BoundedSemaphore
@greentest.skipOnPurePython("Needs C extension")
class TestCExt(greentest.TestCase):
......
......@@ -60,6 +60,7 @@ from gevent._hub_local import get_hub_if_exists
from gevent.greenlet import Greenlet
from gevent.lock import BoundedSemaphore
from gevent.local import local as _local
from gevent.exceptions import LoopExit
if hasattr(__thread__, 'RLock'):
assert PY3 or PYPY
......@@ -115,7 +116,25 @@ class LockType(BoundedSemaphore):
if timeout > self._TIMEOUT_MAX:
raise OverflowError('timeout value is too large')
acquired = BoundedSemaphore.acquire(self, blocking, timeout)
acquired = BoundedSemaphore.acquire(self, 0)
if not acquired and getcurrent() is not get_hub_if_exists() and blocking and not timeout:
# If we would block forever, and we're not in the hub, and a trivial non-blocking
# check didn't get us the lock, then try to run pending callbacks that might
# release the lock.
sleep()
if not acquired:
try:
acquired = BoundedSemaphore.acquire(self, blocking, timeout)
except LoopExit:
# Raised when the semaphore was not trivially ours, and we needed
# to block. Some other thread presumably owns the semaphore, and there are no greenlets
# running in this thread to switch to. So the best we can do is
# release the GIL and try again later.
if blocking: # pragma: no cover
raise
acquired = False
if not acquired and not blocking and getcurrent() is not get_hub_if_exists():
# Run other callbacks. This makes spin locks works.
# We can't do this if we're in the hub, which we could easily be:
......
......@@ -162,7 +162,6 @@ def _format_thread_info(lines, thread_stacks, limit, current_thread_ident):
import threading
threads = {th.ident: th for th in threading.enumerate()}
lines.append('*' * 80)
lines.append('* Threads')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment