Commit 85ae2a11 authored by Jason Madden

Progress on cross-thread locks.

Tests for semaphore are passing in both pure and compiled code. gevent.tests.test__lock.TestLockMultiThread still exposes a race condition though.
parent aa16985a
Make gevent locks that are monkey-patched work across native
threads as well as across greenlets within a single thread. This
is expensive, and not a recommended programming pattern, but it can
happen when using the threadpool. Locks that are only used in a single
thread do not take a performance hit.
The underlying Semaphore always behaves in an atomic fashion (as if
the GIL was not released) when PURE_PYTHON is set. Previously, it
did so correctly only on PyPy.
...@@ -27,6 +27,18 @@ class AbstractLinkable(object): ...@@ -27,6 +27,18 @@ class AbstractLinkable(object):
# protocol common to both repeatable events (Event, Semaphore) and # protocol common to both repeatable events (Event, Semaphore) and
# one-time events (AsyncResult). # one-time events (AsyncResult).
# #
# With a few careful exceptions, instances of this object can only
# be used from a single thread. The exception is that certain methods
# may be used from multiple threads IFF:
#
# 1. They are documented as safe for that purpose; AND
# 2a. This object is compiled with Cython and thus is holding the GIL
# for the entire duration of the method; OR
# 2b. A subclass ensures that a Python-level native thread lock is held
# for the duration of the method; this is necessary in pure-Python mode.
# The only known implementation of such
# a subclass is for Semaphore.
#
# TODO: As of gevent 1.5, we use the same datastructures and almost # TODO: As of gevent 1.5, we use the same datastructures and almost
# the same algorithm as Greenlet. See about unifying them more. # the same algorithm as Greenlet. See about unifying them more.
...@@ -70,7 +82,9 @@ class AbstractLinkable(object): ...@@ -70,7 +82,9 @@ class AbstractLinkable(object):
# If its false, we only call callbacks as long as ready() returns true. # If its false, we only call callbacks as long as ready() returns true.
self._notify_all = True self._notify_all = True
# we don't want to do get_hub() here to allow defining module-level objects # we don't want to do get_hub() here to allow defining module-level objects
# without initializing the hub # without initializing the hub. However, for multiple-thread safety, as soon
# as a waiting method is entered, even if it won't have to wait, we
# grab the hub.
self.hub = hub self.hub = hub
def linkcount(self): def linkcount(self):
...@@ -148,6 +162,10 @@ class AbstractLinkable(object): ...@@ -148,6 +162,10 @@ class AbstractLinkable(object):
break break
def _notify_links(self, arrived_while_waiting): def _notify_links(self, arrived_while_waiting):
# This method must hold the GIL, or be guarded with the lock that guards
# this object. Thus, while we are notifying objects, an object from another
# thread simply cannot arrive and mutate ``_links`` or ``arrived_while_waiting``.
# ``arrived_while_waiting`` is a list of greenlet.switch methods # ``arrived_while_waiting`` is a list of greenlet.switch methods
# to call. These were objects that called wait() while we were processing, # to call. These were objects that called wait() while we were processing,
# and which would have run *before* those that had actually waited # and which would have run *before* those that had actually waited
...@@ -170,7 +188,6 @@ class AbstractLinkable(object): ...@@ -170,7 +188,6 @@ class AbstractLinkable(object):
# became true that it was once true, even though it may not be # became true that it was once true, even though it may not be
# any more. In that case, we must not keep notifying anyone that's # any more. In that case, we must not keep notifying anyone that's
# newly added after that, even if we go ready again. # newly added after that, even if we go ready again.
try: try:
self._notify_link_list(self._links) self._notify_link_list(self._links)
# Now, those that arrived after we had begun the notification # Now, those that arrived after we had begun the notification
...@@ -187,6 +204,10 @@ class AbstractLinkable(object): ...@@ -187,6 +204,10 @@ class AbstractLinkable(object):
# same callback while self._notifier is still true. # same callback while self._notifier is still true.
assert self._notifier is notifier assert self._notifier is notifier
self._notifier = None self._notifier = None
# TODO: Maybe we should intelligently reset self.hub to
# free up thread affinity? In case of a pathological situation where
# one object was used from one thread once & first, but usually is
# used by another thread.
# Now we may be ready or not ready. If we're ready, which # Now we may be ready or not ready. If we're ready, which
# could have happened during the last link we called, then we # could have happened during the last link we called, then we
...@@ -194,32 +215,123 @@ class AbstractLinkable(object): ...@@ -194,32 +215,123 @@ class AbstractLinkable(object):
# wakeup. # wakeup.
self._check_and_notify() self._check_and_notify()
def _wait_core(self, timeout, catch=Timeout): def __unlink_all(self, obj):
# The core of the wait implementation, handling if obj is None:
# switching and linking. If *catch* is set to (), return
# a timeout that elapses will be allowed to be raised.
# Returns a true value if the wait succeeded without timing out. self.unlink(obj)
switch = getcurrent().switch # pylint:disable=undefined-variable if self._notifier is not None and self._notifier.args:
self.rawlink(switch)
with Timeout._start_new_or_dummy(timeout) as timer:
try: try:
if self.hub is None: self._notifier.args[0].remove(obj)
self.hub = get_hub() except ValueError:
result = self.hub.switch() pass
if result is not self: # pragma: no cover
raise InvalidSwitchError('Invalid switch into Event.wait(): %r' % (result, )) def __wait_to_be_notified(self, rawlink): # pylint:disable=too-many-branches
# We've got to watch where we could potentially release the GIL.
# Decisions we make based on the state of this object must be in blocks
# that cannot release the GIL.
resume_this_greenlet = None
watcher = None
current_hub = get_hub()
send = None
while 1:
my_hub = self.hub
if my_hub is current_hub:
break
# We're owned by another hub.
if my_hub.dead: # dead is a property, this could have released the GIL.
# We have the GIL back. Did anything change?
if my_hub is not self.hub:
continue # start over.
# The other hub is dead, so we can take ownership.
self.hub = current_hub
break
# Some other hub owns this object. We must ask it to wake us
# up. We can't use a Python-level ``Lock`` because
# (1) it doesn't support a timeout on all platforms; and
# (2) we don't want to block this hub from running. So we need to
# do so in a way that cooperates with *two* hubs. That's what an
# async watcher is built for.
#
# Allocating and starting the watcher *could* release the GIL.
# With the libev corecext, allocating won't, but starting briefly will.
# With other backends, allocating might, and starting might also.
# So...XXX: Race condition here, tiny though it may be.
watcher = current_hub.loop.async_()
send = watcher.send_ignoring_arg
if rawlink:
# Make direct calls to self.rawlink, the most common case,
# so cython can more easily optimize.
self.rawlink(send)
else:
self._notifier.args[0].append(send)
watcher.start(getcurrent().switch, self) # pylint:disable=undefined-variable
break
if self.hub is current_hub:
resume_this_greenlet = getcurrent().switch # pylint:disable=undefined-variable
if rawlink:
self.rawlink(resume_this_greenlet)
else:
self._notifier.args[0].append(resume_this_greenlet)
try:
self._drop_lock_for_switch_out()
result = current_hub.switch() # Probably releases
# If we got here, we were automatically unlinked already. # If we got here, we were automatically unlinked already.
resume_this_greenlet = None
if result is not self: # pragma: no cover
raise InvalidSwitchError(
'Invalid switch into %s.wait(): %r' % (
self.__class__.__name__,
result,
)
)
finally:
self._acquire_lock_for_switch_in()
self.__unlink_all(resume_this_greenlet)
self.__unlink_all(send)
if watcher is not None:
watcher.stop()
watcher.close()
def _acquire_lock_for_switch_in(self):
return
def _drop_lock_for_switch_out(self):
return
def _wait_core(self, timeout, catch=Timeout):
"""
The core of the wait implementation, handling switching and
linking.
This method is safe to call from multiple threads; it must be holding
the GIL for the entire duration, or be protected by a Python-level
lock for that to be true.
``self.hub`` must be initialized before entering this method.
The hub that is set is considered the owner and cannot be changed.
If *catch* is set to ``()``, a timeout that elapses will be
allowed to be raised.
:return: A true value if the wait succeeded without timing out.
That is, a true return value means we were notified and control
resumed in this greenlet.
"""
with Timeout._start_new_or_dummy(timeout) as timer: # Might release
try:
self.__wait_to_be_notified(True) # Use rawlink()
return True return True
except catch as ex: except catch as ex:
self.unlink(switch)
if ex is not timer: if ex is not timer:
raise raise
# test_set_and_clear and test_timeout in test_threading # test_set_and_clear and test_timeout in test_threading
# rely on the exact return values, not just truthish-ness # rely on the exact return values, not just truthish-ness
return False return False
except:
self.unlink(switch)
raise
def _wait_return_value(self, waited, wait_success): def _wait_return_value(self, waited, wait_success):
# pylint:disable=unused-argument # pylint:disable=unused-argument
...@@ -228,16 +340,21 @@ class AbstractLinkable(object): ...@@ -228,16 +340,21 @@ class AbstractLinkable(object):
return None # pragma: no cover all extent subclasses override return None # pragma: no cover all extent subclasses override
def _wait(self, timeout=None): def _wait(self, timeout=None):
if self.ready(): """
This method is safe to call from multiple threads, providing
the conditions laid out in the class documentation are met.
"""
# Watch where we could potentially release the GIL.
if self.hub is None: # no release
self.hub = get_hub() # might releases.
if self.ready(): # *might* release, if overridden in Python.
result = self._wait_return_value(False, False) # pylint:disable=assignment-from-none result = self._wait_return_value(False, False) # pylint:disable=assignment-from-none
if self._notifier: if self._notifier:
# We're already notifying waiters; one of them must have run # We're already notifying waiters; one of them must have run
# and switched to us. # and switched to this greenlet, which arrived here. Alternately,
switch = getcurrent().switch # pylint:disable=undefined-variable # we could be in a separate thread (but we're holding the GIL/object lock)
self._notifier.args[0].append(switch) self.__wait_to_be_notified(False) # Use self._notifier.args[0] instead of self.rawlink
switch_result = self.hub.switch()
if switch_result is not self: # pragma: no cover
raise InvalidSwitchError('Invalid switch into Event.wait(): %r' % (result, ))
return result return result
......
...@@ -554,6 +554,15 @@ class AsyncMixin(object): ...@@ -554,6 +554,15 @@ class AsyncMixin(object):
def send(self): def send(self):
raise NotImplementedError() raise NotImplementedError()
def send_ignoring_arg(self, _ignored):
    """
    Call :meth:`send`, discarding the single positional argument.

    This gives the watcher the same calling convention as
    ``greenlet.switch(arg)``, which is what waiters that use
    ``rawlink`` invoke their links with.

    This is an advanced method, not usually needed.

    :param _ignored: Accepted only for signature compatibility;
        never inspected.
    :return: Whatever :meth:`send` returns.
    """
    outcome = self.send()
    return outcome
@property @property
def pending(self): def pending(self):
raise NotImplementedError() raise NotImplementedError()
......
...@@ -48,6 +48,8 @@ cdef class AbstractLinkable(object): ...@@ -48,6 +48,8 @@ cdef class AbstractLinkable(object):
cpdef unlink(self, callback) cpdef unlink(self, callback)
cdef _check_and_notify(self) cdef _check_and_notify(self)
cdef __wait_to_be_notified(self, bint rawlink)
cdef __unlink_all(self, obj)
@cython.nonecheck(False) @cython.nonecheck(False)
cdef _notify_link_list(self, list links) cdef _notify_link_list(self, list links)
...@@ -55,6 +57,9 @@ cdef class AbstractLinkable(object): ...@@ -55,6 +57,9 @@ cdef class AbstractLinkable(object):
@cython.nonecheck(False) @cython.nonecheck(False)
cpdef _notify_links(self, list arrived_while_waiting) cpdef _notify_links(self, list arrived_while_waiting)
cpdef _drop_lock_for_switch_out(self)
cpdef _acquire_lock_for_switch_in(self)
cdef _wait_core(self, timeout, catch=*) cdef _wait_core(self, timeout, catch=*)
cdef _wait_return_value(self, waited, wait_success) cdef _wait_return_value(self, waited, wait_success)
cpdef _wait(self, timeout=*) cdef _wait(self, timeout=*)
...@@ -326,6 +326,9 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable ...@@ -326,6 +326,9 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
# Not ready and not blocking, so immediately timeout # Not ready and not blocking, so immediately timeout
raise Timeout() raise Timeout()
if self.hub is None: # pylint:disable=access-member-before-definition
self.hub = get_hub() # pylint:disable=attribute-defined-outside-init
# Wait, raising a timeout that elapses # Wait, raising a timeout that elapses
self._wait_core(timeout, ()) self._wait_core(timeout, ())
......
...@@ -1199,6 +1199,9 @@ cdef public class async_(watcher) [object PyGeventAsyncObject, type PyGeventAsyn ...@@ -1199,6 +1199,9 @@ cdef public class async_(watcher) [object PyGeventAsyncObject, type PyGeventAsyn
_check_loop(self.loop) _check_loop(self.loop)
libev.ev_async_send(self.loop._ptr, &self._watcher) libev.ev_async_send(self.loop._ptr, &self._watcher)
def send_ignoring_arg(self, _ignored):
    # Same calling convention as ``greenlet.switch(arg)``: the single
    # argument is accepted and dropped, and the call is forwarded to
    # ``send()`` so this watcher can be used where such a callable is
    # expected.
    outcome = self.send()
    return outcome
async = async_ async = async_
cdef start_and_stop child_ss = make_ss(<void*>libev.ev_child_start, <void*>libev.ev_child_stop) cdef start_and_stop child_ss = make_ss(<void*>libev.ev_child_start, <void*>libev.ev_child_stop)
......
...@@ -28,11 +28,14 @@ __all__ = [ ...@@ -28,11 +28,14 @@ __all__ = [
# On PyPy, we don't compile the Semaphore class with Cython. Under # On PyPy, we don't compile the Semaphore class with Cython. Under
# Cython, each individual method holds the GIL for its entire # Cython, each individual method holds the GIL for its entire
# duration, ensuring that no other thread can interrupt us in an # duration, ensuring that no other thread can interrupt us in an
# unsafe state (only when we _do_wait do we call back into Python and # unsafe state (only when we _wait do we call back into Python and
# allow switching threads). Simulate that here through the use of a manual # allow switching threads; this is broken down into the
# lock. (We use a separate lock for each semaphore to allow sys.settrace functions # _drop_lock_for_switch_out and _acquire_lock_for_switch_in methods).
# to use locks *other* than the one being traced.) This, of course, must also # Simulate that here through the use of a manual lock. (We use a
# hold for PURE_PYTHON mode when no optional C extensions are used. # separate lock for each semaphore to allow sys.settrace functions to
# use locks *other* than the one being traced.) This, of course, must
# also hold for PURE_PYTHON mode when no optional C extensions are
# used.
_allocate_lock, _get_ident = monkey.get_original( _allocate_lock, _get_ident = monkey.get_original(
('_thread', 'thread'), ('_thread', 'thread'),
...@@ -47,15 +50,17 @@ class _OwnedLock(object): ...@@ -47,15 +50,17 @@ class _OwnedLock(object):
'_locking', '_locking',
'_count', '_count',
) )
# Don't allow re-entry to these functions in a single thread, as can
# happen if a sys.settrace is used.
#
# This is essentially a variant of the (pure-Python) RLock from the
# standard library.
def __init__(self): def __init__(self):
self._owner = None self._owner = None
self._block = _allocate_lock() self._block = _allocate_lock()
self._locking = {} self._locking = {}
self._count = 0 self._count = 0
# Don't allow re-entry to these functions in a single thread, as can
# happen if a sys.settrace is used.
def __begin(self): def __begin(self):
# Return (me, count) if we should proceed, otherwise return # Return (me, count) if we should proceed, otherwise return
...@@ -89,8 +94,8 @@ class _OwnedLock(object): ...@@ -89,8 +94,8 @@ class _OwnedLock(object):
self._count += 1 self._count += 1
return return
self._owner = me
self._block.acquire() self._block.acquire()
self._owner = me
self._count = 1 self._count = 1
finally: finally:
self.__end(me, lock_count) self.__end(me, lock_count)
...@@ -108,13 +113,13 @@ class _OwnedLock(object): ...@@ -108,13 +113,13 @@ class _OwnedLock(object):
self._count = count = self._count - 1 self._count = count = self._count - 1
if not count: if not count:
self._block.release()
self._owner = None self._owner = None
self._block.release()
finally: finally:
self.__end(me, lock_count) self.__end(me, lock_count)
class _AtomicSemaphore(Semaphore): class _AtomicSemaphoreMixin(object):
# Behaves as though the GIL was held for the duration of acquire, wait, # Behaves as though the GIL was held for the duration of acquire, wait,
# and release, just as if we were in Cython. # and release, just as if we were in Cython.
# #
...@@ -123,39 +128,48 @@ class _AtomicSemaphore(Semaphore): ...@@ -123,39 +128,48 @@ class _AtomicSemaphore(Semaphore):
# and re-acquire it for them on exit. # and re-acquire it for them on exit.
# #
# Note that this does *NOT* make semaphores safe to use from multiple threads # Note that this does *NOT* make semaphores safe to use from multiple threads
__slots__ = (
'_lock_lock',
)
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self._lock_lock = _OwnedLock() self._lock_lock = _OwnedLock()
super(_AtomicSemaphoreMixin, self).__init__(*args, **kwargs)
super(_AtomicSemaphore, self).__init__(*args, **kwargs) def _acquire_lock_for_switch_in(self):
self._lock_lock.acquire()
def _wait(self, *args, **kwargs): def _drop_lock_for_switch_out(self):
self._lock_lock.release() self._lock_lock.release()
try:
return super(_AtomicSemaphore, self)._wait(*args, **kwargs) def _notify_links(self, arrived_while_waiting):
finally: with self._lock_lock:
self._lock_lock.acquire() return super(_AtomicSemaphoreMixin, self)._notify_links(arrived_while_waiting)
def release(self): def release(self):
with self._lock_lock: with self._lock_lock:
return super(_AtomicSemaphore, self).release() return super(_AtomicSemaphoreMixin, self).release()
def acquire(self, blocking=True, timeout=None): def acquire(self, blocking=True, timeout=None):
with self._lock_lock: with self._lock_lock:
return super(_AtomicSemaphore, self).acquire(blocking, timeout) return super(_AtomicSemaphoreMixin, self).acquire(blocking, timeout)
_py3k_acquire = acquire _py3k_acquire = acquire
def wait(self, timeout=None): def wait(self, timeout=None):
with self._lock_lock: with self._lock_lock:
return super(_AtomicSemaphore, self).wait(timeout) return super(_AtomicSemaphoreMixin, self).wait(timeout)
class _AtomicSemaphore(_AtomicSemaphoreMixin, Semaphore):
    # ``Semaphore`` with the locking behaviour of ``_AtomicSemaphoreMixin``
    # layered on top. The only attribute added here is the slot for the
    # ``_lock_lock`` object that the mixin's ``__init__`` assigns.
    __slots__ = ('_lock_lock',)
class _AtomicBoundedSemaphore(_AtomicSemaphoreMixin, BoundedSemaphore):
    # ``BoundedSemaphore`` with the locking behaviour of
    # ``_AtomicSemaphoreMixin`` layered on top. The only attribute added
    # here is the slot for the ``_lock_lock`` object that the mixin's
    # ``__init__`` assigns.
    __slots__ = ('_lock_lock',)
if PURE_PYTHON: if PURE_PYTHON:
Semaphore = _AtomicSemaphore Semaphore = _AtomicSemaphore
BoundedSemaphore = _AtomicBoundedSemaphore
class DummySemaphore(object): class DummySemaphore(object):
......
...@@ -5,7 +5,13 @@ from __future__ import print_function ...@@ -5,7 +5,13 @@ from __future__ import print_function
from gevent import lock from gevent import lock
import gevent.testing as greentest import gevent.testing as greentest
from gevent.tests import test__semaphore
class TestLockMultiThread(test__semaphore.TestSemaphoreMultiThread):
    # Runs the multi-thread tests inherited from the semaphore test
    # module against ``lock.RLock`` instead of the object the base
    # class builds.

    def _makeOne(self):
        # Factory hook used by the inherited tests to obtain the object
        # under test.
        rlock = lock.RLock()
        return rlock
if __name__ == '__main__': if __name__ == '__main__':
greentest.main() greentest.main()
...@@ -14,7 +14,7 @@ import gevent.exceptions ...@@ -14,7 +14,7 @@ import gevent.exceptions
from gevent.lock import Semaphore from gevent.lock import Semaphore
import gevent.testing as greentest import gevent.testing as greentest
from gevent.testing import timing
class TestSemaphore(greentest.TestCase): class TestSemaphore(greentest.TestCase):
...@@ -67,7 +67,7 @@ class TestSemaphore(greentest.TestCase): ...@@ -67,7 +67,7 @@ class TestSemaphore(greentest.TestCase):
gevent.wait([s]) gevent.wait([s])
class TestAcquireContended(greentest.TestCase): class TestSemaphoreMultiThread(greentest.TestCase):
# Tests that the object can be acquired correctly across # Tests that the object can be acquired correctly across
# multiple threads. # multiple threads.
# Used as a base class. # Used as a base class.
...@@ -81,39 +81,113 @@ class TestAcquireContended(greentest.TestCase): ...@@ -81,39 +81,113 @@ class TestAcquireContended(greentest.TestCase):
# would be from an arbitrary thread. # would be from an arbitrary thread.
return Semaphore(1, gevent.get_hub()) return Semaphore(1, gevent.get_hub())
def test_acquire_in_one_then_another(self, release=True, **thread_acquire_kwargs):
def test_acquire_in_one_then_another(self):
from gevent import monkey from gevent import monkey
self.assertFalse(monkey.is_module_patched('threading')) self.assertFalse(monkey.is_module_patched('threading'))
import sys import sys
import threading import threading
thread_running = threading.Event()
thread_acquired = threading.Event()
sem = self._makeOne() sem = self._makeOne()
# Make future acquires block # Make future acquires block
print("acquiring", sem)
sem.acquire() sem.acquire()
exc_info = [] exc_info = []
acquired = []
def thread_main(): def thread_main():
# XXX: When this is fixed, this will have to be modified thread_running.set()
# to avoid deadlock, but being careful to still test
# the initial conditions (e.g., that this doesn't throw;
# we can't pass block=False because that bypasses the part
# that would throw.)
try: try:
sem.acquire() acquired.append(
sem.acquire(**thread_acquire_kwargs)
)
except: except:
exc_info[:] = sys.exc_info() exc_info[:] = sys.exc_info()
raise # Print
finally:
thread_acquired.set()
t = threading.Thread(target=thread_main) t = threading.Thread(target=thread_main)
t.start() t.start()
t.join() while not thread_running.is_set():
thread_running.wait(timing.LARGE_TICK * 5)
if release:
sem.release()
# Spin the loop to be sure the release gets through.
gevent.idle()
thread_acquired.wait(timing.LARGE_TICK * 5)
self.assertEqual(acquired, [True])
thread_acquired.wait(timing.LARGE_TICK * 5)
try: try:
self.assertEqual(exc_info, []) self.assertEqual(exc_info, [])
finally: finally:
exc_info = None exc_info = None
return sem, acquired
def test_acquire_in_one_then_another_timed(self):
    # Reuse the basic cross-thread scenario, but never release while
    # the thread is waiting and give the thread's acquire() a short
    # timeout, so that the thread is expected to give up.
    thread_kwargs = {'timeout': timing.SMALLEST_RELIABLE_DELAY}
    sem, acquired_in_thread = self.test_acquire_in_one_then_another(
        release=False, **thread_kwargs)
    self.assertEqual([False], acquired_in_thread)

    # This doesn't, of course, notify anything, because
    # the waiter has given up.
    sem.release()
    # No notifier may be left pending on the object (objects without
    # a ``_notifier`` attribute are treated as having none).
    self.assertIsNone(getattr(sem, '_notifier', None))
def test_acquire_in_one_wait_greenlet_wait_thread_gives_up(self):
    # The waiter in the thread both arrives and gives up while
    # the notifier is already running...or at least, that's what
    # we'd like to arrange, but the _notify_links function doesn't
    # drop the GIL/object lock, so the other thread is stuck and doesn't
    # actually get to call into the acquire method.
    #
    # Expected outcome, as asserted at the bottom: the greenlet's
    # acquire() returns True, the thread's timed acquire() returns
    # False, and the thread raises nothing.
    from gevent import monkey
    # This scenario needs real native threads, so threading must not
    # be monkey-patched.
    self.assertFalse(monkey.is_module_patched('threading'))

    import sys
    import threading

    sem = self._makeOne()
    # Make future acquires block
    sem.acquire()

    def greenlet_one():
        # Blocks until the main greenlet calls sem.release() below.
        ack = sem.acquire()
        # We're running in the notifier function right now. It switched to
        # us.
        # ``thread`` is assigned in the enclosing scope before this
        # greenlet gets to run this line.
        thread.start()
        gevent.sleep(timing.LARGE_TICK)
        return ack

    exc_info = []   # filled with sys.exc_info() if the thread raises
    acquired = []   # receives the return value of the thread's acquire()

    def thread_main():
        try:
            acquired.append(
                sem.acquire(timeout=timing.LARGE_TICK)
            )
        except:
            # Record the failure for the assertion below, then re-raise.
            exc_info[:] = sys.exc_info()
            raise # Print

    glet = gevent.spawn(greenlet_one)
    # Created here but only started from inside greenlet_one.
    thread = threading.Thread(target=thread_main)
    # Yield so the spawned greenlet can run up to its blocking acquire().
    gevent.idle()
    # Releasing is what lets greenlet_one's acquire() return.
    sem.release()
    glet.join()
    # Bounded join: do not hang the test run if the thread is stuck.
    thread.join(timing.LARGE_TICK)

    self.assertEqual(glet.value, True)
    self.assertEqual([], exc_info)
    self.assertEqual([False], acquired)
# XXX: Need a test with multiple greenlets in a non-primary
# thread. Things should work, just very slowly; instead of moving through
# greenlet.switch(), they'll be moving with async watchers.
@greentest.skipOnPurePython("Needs C extension") @greentest.skipOnPurePython("Needs C extension")
class TestCExt(greentest.TestCase): class TestCExt(greentest.TestCase):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment