Commit 0c1d7806 authored by Jason Madden's avatar Jason Madden

Adjust when and how Semaphore captures the hub.

parent a191cb49
Make gevent locks that are monkey-patched work across native Make gevent locks that are monkey-patched usually work across native
threads as well as across greenlets within a single thread. This threads as well as across greenlets within a single thread. Locks that
is expensive, and not a recommended programming pattern, but it can are only used in a single thread do not take a performance hit. While
happen when using the threadpool. Locks that are only used in a single cross-thread locking is relatively expensive, and not a recommended
thread do not take a performance hit. programming pattern, it can happen unwittingly, for example when
using the threadpool and ``logging``.
The underlying Semaphore always behaves in an atomic fashion (as if Before, cross-thread lock uses might succeed, or, if the lock was
the GIL was not released) when PURE_PYTHON is set. Previously, it only contended, raise ``greenlet.error``. Now, in the contended case, if
correctly did so on PyPy. the lock has been acquired by the main thread at least once, it should
correctly block in any thread, cooperating with the event loop of both
threads. In certain (hopefully rare) cases, it might be possible for
the contended case to raise ``LoopExit`` when previously it would have
raised ``greenlet.error``; if these cases are a practical concern,
please open an issue.
Also, the underlying Semaphore always behaves in an atomic fashion (as
if the GIL was not released) when PURE_PYTHON is set. Previously, it
only correctly did so on PyPy.
...@@ -11,6 +11,7 @@ from __future__ import print_function ...@@ -11,6 +11,7 @@ from __future__ import print_function
import sys import sys
from gevent._hub_local import get_hub_noargs as get_hub from gevent._hub_local import get_hub_noargs as get_hub
from gevent._hub_local import get_hub_if_exists
from gevent.exceptions import InvalidSwitchError from gevent.exceptions import InvalidSwitchError
from gevent.timeout import Timeout from gevent.timeout import Timeout
...@@ -84,8 +85,9 @@ class AbstractLinkable(object): ...@@ -84,8 +85,9 @@ class AbstractLinkable(object):
# we don't want to do get_hub() here to allow defining module-level objects # we don't want to do get_hub() here to allow defining module-level objects
# without initializing the hub. However, for multiple-thread safety, as soon # without initializing the hub. However, for multiple-thread safety, as soon
# as a waiting method is entered, even if it won't have to wait, we # as a waiting method is entered, even if it won't have to wait, we
# grab the hub. # need to grab the hub and assign ownership. For that reason, if the hub
self.hub = hub # is present, we'll go ahead and take it.
self.hub = hub if hub is not None else get_hub_if_exists()
def linkcount(self): def linkcount(self):
# For testing: how many objects are linked to this one? # For testing: how many objects are linked to this one?
...@@ -125,12 +127,24 @@ class AbstractLinkable(object): ...@@ -125,12 +127,24 @@ class AbstractLinkable(object):
# _notify_links method. # _notify_links method.
self._notifier.stop() self._notifier.stop()
def _capture_hub(self, create):
# Assign ``self.hub`` if it has not been assigned yet; do nothing
# when a hub is already set.
#
# *create*: when true, the hub is obtained with ``get_hub()``;
# when false, ``get_hub_if_exists()`` is used instead and, if that
# returns None, ``self.hub`` is left as None.
#
# Subclasses should call this as the first action from any
# public method that could, in theory, block and switch
# to the hub. This may release the GIL.
if self.hub is None:
# This next line might release the GIL.
current_hub = get_hub() if create else get_hub_if_exists()
if current_hub is None:
# Only reachable via the ``get_hub_if_exists()`` path per the
# expression above -- NOTE(review): assumes ``get_hub()`` never
# returns None; confirm against gevent._hub_local.
return
# We have the GIL again. Did anything change? If so,
# we lost the race.
# (Re-check before assigning so a hub stored in the meantime
# is not overwritten.)
if self.hub is None:
self.hub = current_hub
def _check_and_notify(self): def _check_and_notify(self):
# If this object is ready to be notified, begin the process. # If this object is ready to be notified, begin the process.
if self.ready() and self._links and not self._notifier: if self.ready() and self._links and not self._notifier:
if self.hub is None: self._capture_hub(True) # Must create, we need it.
self.hub = get_hub()
self._notifier = self.hub.loop.run_callback(self._notify_links, []) self._notifier = self.hub.loop.run_callback(self._notify_links, [])
def _notify_link_list(self, links): def _notify_link_list(self, links):
...@@ -153,7 +167,11 @@ class AbstractLinkable(object): ...@@ -153,7 +167,11 @@ class AbstractLinkable(object):
# be free to be reused. # be free to be reused.
done.add(id_link) done.add(id_link)
try: try:
link(self) self._drop_lock_for_switch_out()
try:
link(self)
finally:
self._acquire_lock_for_switch_in()
except: # pylint:disable=bare-except except: # pylint:disable=bare-except
# We're running in the hub, errors must not escape. # We're running in the hub, errors must not escape.
self.hub.handle_error((link, self), *sys.exc_info()) self.hub.handle_error((link, self), *sys.exc_info())
...@@ -208,7 +226,6 @@ class AbstractLinkable(object): ...@@ -208,7 +226,6 @@ class AbstractLinkable(object):
# free up thread affinity? In case of a pathological situation where # free up thread affinity? In case of a pathological situation where
# one object was used from one thread once & first, but usually is # one object was used from one thread once & first, but usually is
# used by another thread. # used by another thread.
# Now we may be ready or not ready. If we're ready, which # Now we may be ready or not ready. If we're ready, which
# could have happened during the last link we called, then we # could have happened during the last link we called, then we
# must have more links than we started with. We need to schedule the # must have more links than we started with. We need to schedule the
...@@ -267,6 +284,7 @@ class AbstractLinkable(object): ...@@ -267,6 +284,7 @@ class AbstractLinkable(object):
self.rawlink(send) self.rawlink(send)
else: else:
self._notifier.args[0].append(send) self._notifier.args[0].append(send)
watcher.start(getcurrent().switch, self) # pylint:disable=undefined-variable watcher.start(getcurrent().switch, self) # pylint:disable=undefined-variable
break break
...@@ -276,7 +294,6 @@ class AbstractLinkable(object): ...@@ -276,7 +294,6 @@ class AbstractLinkable(object):
self.rawlink(resume_this_greenlet) self.rawlink(resume_this_greenlet)
else: else:
self._notifier.args[0].append(resume_this_greenlet) self._notifier.args[0].append(resume_this_greenlet)
try: try:
self._drop_lock_for_switch_out() self._drop_lock_for_switch_out()
result = current_hub.switch() # Probably releases result = current_hub.switch() # Probably releases
...@@ -345,8 +362,7 @@ class AbstractLinkable(object): ...@@ -345,8 +362,7 @@ class AbstractLinkable(object):
the conditions laid out in the class documentation are met. the conditions laid out in the class documentation are met.
""" """
# Watch where we could potentially release the GIL. # Watch where we could potentially release the GIL.
if self.hub is None: # no release self._capture_hub(True) # Must create, we must have an owner. Might release
self.hub = get_hub() # might releases.
if self.ready(): # *might* release, if overridden in Python. if self.ready(): # *might* release, if overridden in Python.
result = self._wait_return_value(False, False) # pylint:disable=assignment-from-none result = self._wait_return_value(False, False) # pylint:disable=assignment-from-none
......
...@@ -2,6 +2,7 @@ cimport cython ...@@ -2,6 +2,7 @@ cimport cython
from gevent._gevent_c_greenlet_primitives cimport SwitchOutGreenletWithLoop from gevent._gevent_c_greenlet_primitives cimport SwitchOutGreenletWithLoop
from gevent._gevent_c_hub_local cimport get_hub_noargs as get_hub from gevent._gevent_c_hub_local cimport get_hub_noargs as get_hub
from gevent._gevent_c_hub_local cimport get_hub_if_exists
cdef InvalidSwitchError cdef InvalidSwitchError
cdef Timeout cdef Timeout
...@@ -48,8 +49,9 @@ cdef class AbstractLinkable(object): ...@@ -48,8 +49,9 @@ cdef class AbstractLinkable(object):
cpdef unlink(self, callback) cpdef unlink(self, callback)
cdef _check_and_notify(self) cdef _check_and_notify(self)
cdef void _capture_hub(self, bint create)
cdef __wait_to_be_notified(self, bint rawlink) cdef __wait_to_be_notified(self, bint rawlink)
cdef __unlink_all(self, obj) cdef void __unlink_all(self, obj) # suppress exceptions
@cython.nonecheck(False) @cython.nonecheck(False)
cdef _notify_link_list(self, list links) cdef _notify_link_list(self, list links)
......
...@@ -58,10 +58,10 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable ...@@ -58,10 +58,10 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
) )
def __init__(self, value=1, hub=None): def __init__(self, value=1, hub=None):
if value < 0: self.counter = value
if self.counter < 0: # Do the check after Cython native int conversion
raise ValueError("semaphore initial value must be >= 0") raise ValueError("semaphore initial value must be >= 0")
super(Semaphore, self).__init__(hub) super(Semaphore, self).__init__(hub)
self.counter = value
self._notify_all = False self._notify_all = False
def __str__(self): def __str__(self):
...@@ -163,6 +163,12 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable ...@@ -163,6 +163,12 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
raise a ``Timeout`` exception, if some other caller had already started a timer.) raise a ``Timeout`` exception, if some other caller had already started a timer.)
""" """
if self.counter > 0: if self.counter > 0:
# We conceptually now belong to the hub of
# the thread that called this, even though we didn't
# have to block. Note that we cannot force it to be created
# yet, because Semaphore is used by importlib.ModuleLock
# which is used when importing the hub itself!
self._capture_hub(False)
self.counter -= 1 self.counter -= 1
return True return True
......
...@@ -127,7 +127,7 @@ class _AtomicSemaphoreMixin(object): ...@@ -127,7 +127,7 @@ class _AtomicSemaphoreMixin(object):
# on exit. acquire and wait can call _wait, which must release it on entry # on exit. acquire and wait can call _wait, which must release it on entry
# and re-acquire it for them on exit. # and re-acquire it for them on exit.
# #
# Note that this does *NOT* make semaphores safe to use from multiple threads # Note that this does *NOT*, in-and-of itself, make semaphores safe to use from multiple threads
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self._lock_lock = _OwnedLock() self._lock_lock = _OwnedLock()
super(_AtomicSemaphoreMixin, self).__init__(*args, **kwargs) super(_AtomicSemaphoreMixin, self).__init__(*args, **kwargs)
...@@ -271,8 +271,12 @@ class RLock(object): ...@@ -271,8 +271,12 @@ class RLock(object):
'__weakref__', '__weakref__',
) )
def __init__(self): def __init__(self, hub=None):
self._block = Semaphore(1) """
.. versionchanged:: NEXT
Add the ``hub`` argument.
"""
self._block = Semaphore(1, hub)
self._owner = None self._owner = None
self._count = 0 self._count = 0
......
...@@ -435,7 +435,7 @@ class BaseSemaphoreTests(BaseTestCase): ...@@ -435,7 +435,7 @@ class BaseSemaphoreTests(BaseTestCase):
def test_constructor(self): def test_constructor(self):
self.assertRaises(ValueError, self.semtype, value=-1) self.assertRaises(ValueError, self.semtype, value=-1)
# Py3 doesn't have sys.maxint # Py3 doesn't have sys.maxint
self.assertRaises(ValueError, self.semtype, self.assertRaises((ValueError, OverflowError), self.semtype,
value=-getattr(sys, 'maxint', getattr(sys, 'maxsize', None))) value=-getattr(sys, 'maxint', getattr(sys, 'maxsize', None)))
def test_acquire(self): def test_acquire(self):
......
...@@ -4,13 +4,24 @@ from __future__ import print_function ...@@ -4,13 +4,24 @@ from __future__ import print_function
from gevent import lock from gevent import lock
import gevent.testing as greentest import gevent.testing as greentest
from gevent.tests import test__semaphore from gevent.tests import test__semaphore
class TestLockMultiThread(test__semaphore.TestSemaphoreMultiThread): class TestRLockMultiThread(test__semaphore.TestSemaphoreMultiThread):
def _makeOne(self): def _makeOne(self):
# If we don't set the hub before returning,
# there's a potential race condition, if the implementation
# isn't careful. If it's the background hub that winds up capturing
# the hub, it will ask the hub to switch back to itself and
# then switch to the hub, which will raise LoopExit (nothing
# for the background thread to do). What is supposed to happen
# is that the background thread realizes it's the background thread,
# starts an async watcher and then switches to the hub.
#
# So we deliberately don't set the hub to help test that condition.
return lock.RLock() return lock.RLock()
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -110,15 +110,25 @@ class TestSemaphoreMultiThread(greentest.TestCase): ...@@ -110,15 +110,25 @@ class TestSemaphoreMultiThread(greentest.TestCase):
t = threading.Thread(target=thread_main) t = threading.Thread(target=thread_main)
t.start() t.start()
while not thread_running.is_set(): thread_running.wait(10) # implausibly large time
thread_running.wait(timing.LARGE_TICK * 5)
if release: if release:
sem.release() sem.release()
# Spin the loop to be sure the release gets through. # Spin the loop to be sure the release gets through.
gevent.idle() # (Release schedules the notifier to run, and when the
thread_acquired.wait(timing.LARGE_TICK * 5) # notifier runs, it sends the async notification to the
# other thread. Depending on exactly where we are in the
# event loop, and the limit to the number of callbacks
# that get run (including time-based) the notifier may or
# may not be immediately ready to run, so this can take up
# to two iterations.)
for _ in range(3):
gevent.idle()
if thread_acquired.wait(timing.LARGE_TICK):
break
self.assertEqual(acquired, [True]) self.assertEqual(acquired, [True])
thread_acquired.wait(timing.LARGE_TICK * 5) thread_acquired.wait(timing.LARGE_TICK * 5)
try: try:
self.assertEqual(exc_info, []) self.assertEqual(exc_info, [])
......
...@@ -203,7 +203,8 @@ if hasattr(__threading__, '_CRLock'): ...@@ -203,7 +203,8 @@ if hasattr(__threading__, '_CRLock'):
# which in turn used _allocate_lock. Now, it wants to use # which in turn used _allocate_lock. Now, it wants to use
# threading._CRLock, which is imported from _thread.RLock and as such # threading._CRLock, which is imported from _thread.RLock and as such
# is implemented in C. So it bypasses our _allocate_lock function. # is implemented in C. So it bypasses our _allocate_lock function.
# Fortunately they left the Python fallback in place. # Fortunately they left the Python fallback in place and use it
# if the imported _CRLock is None; this arranges for that to be the case.
# This was also backported to PyPy 2.7-7.0 # This was also backported to PyPy 2.7-7.0
assert PY3 or PYPY, "Unsupported Python version" assert PY3 or PYPY, "Unsupported Python version"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment