Commit 4152f909 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1617 from gevent/issue1437

Fix issue 1437 by detecting cross-thread lock usage and using an async watcher
parents 0321105e 738ca29f
#!/bin/sh #!/bin/sh
set -e -x set -e -x
PYTHON=${PYTHON:=python} PYTHON=${PYTHON:=python}
$PYTHON -mperf timeit -s'from gevent.lock import Semaphore; s = Semaphore()' 's.release()' $PYTHON -mpyperf timeit -s'from gevent.lock import Semaphore; s = Semaphore()' 's.release()'
$PYTHON -mperf timeit -s'from gevent.lock import Semaphore; from gevent import spawn_raw; s = Semaphore(0)' 'spawn_raw(s.release); s.acquire()' $PYTHON -mpyperf timeit -s'from gevent.lock import Semaphore; from gevent import spawn_raw; s = Semaphore(0)' 'spawn_raw(s.release); s.acquire()'
$PYTHON -mperf timeit -s'from gevent.lock import Semaphore; from gevent import spawn_raw; s = Semaphore(0)' 'spawn_raw(s.release); spawn_raw(s.release); spawn_raw(s.release); spawn_raw(s.release); s.acquire(); s.acquire(); s.acquire(); s.acquire()' $PYTHON -mpyperf timeit -s'from gevent.lock import Semaphore; from gevent import spawn_raw; s = Semaphore(0)' 'spawn_raw(s.release); spawn_raw(s.release); spawn_raw(s.release); spawn_raw(s.release); s.acquire(); s.acquire(); s.acquire(); s.acquire()'
Make gevent locks that are monkey-patched usually work across native
threads as well as across greenlets within a single thread. Locks that
are only used in a single thread do not take a performance hit. While
cross-thread locking is relatively expensive, and not a recommended
programming pattern, it can happen unwittingly, for example when
using the threadpool and ``logging``.
Before, cross-thread lock uses might succeed, or, if the lock was
contended, raise ``greenlet.error``. Now, in the contended case, if
the lock has been acquired by the main thread at least once, it should
correctly block in any thread, cooperating with the event loop of both
threads. In certain (hopefully rare) cases, it might be possible for
contended case to raise ``LoopExit`` when previously it would have
raised ``greenlet.error``; if these cases are a practical concern,
please open an issue.
Also, the underlying Semaphore always behaves in an atomic fashion (as
if the GIL was not released) when PURE_PYTHON is set. Previously, it
only correctly did so on PyPy.
This diff is collapsed.
...@@ -554,6 +554,15 @@ class AsyncMixin(object): ...@@ -554,6 +554,15 @@ class AsyncMixin(object):
def send(self): def send(self):
raise NotImplementedError() raise NotImplementedError()
def send_ignoring_arg(self, _ignored):
"""
Calling compatibility with ``greenlet.switch(arg)``
as used by waiters that have ``rawlink``.
This is an advanced method, not usually needed.
"""
return self.send()
@property @property
def pending(self): def pending(self):
raise NotImplementedError() raise NotImplementedError()
......
...@@ -2,6 +2,7 @@ cimport cython ...@@ -2,6 +2,7 @@ cimport cython
from gevent._gevent_c_greenlet_primitives cimport SwitchOutGreenletWithLoop from gevent._gevent_c_greenlet_primitives cimport SwitchOutGreenletWithLoop
from gevent._gevent_c_hub_local cimport get_hub_noargs as get_hub from gevent._gevent_c_hub_local cimport get_hub_noargs as get_hub
from gevent._gevent_c_hub_local cimport get_hub_if_exists
cdef InvalidSwitchError cdef InvalidSwitchError
cdef Timeout cdef Timeout
...@@ -48,6 +49,9 @@ cdef class AbstractLinkable(object): ...@@ -48,6 +49,9 @@ cdef class AbstractLinkable(object):
cpdef unlink(self, callback) cpdef unlink(self, callback)
cdef _check_and_notify(self) cdef _check_and_notify(self)
cdef void _capture_hub(self, bint create)
cdef __wait_to_be_notified(self, bint rawlink)
cdef void __unlink_all(self, obj) # suppress exceptions
@cython.nonecheck(False) @cython.nonecheck(False)
cdef _notify_link_list(self, list links) cdef _notify_link_list(self, list links)
...@@ -55,6 +59,9 @@ cdef class AbstractLinkable(object): ...@@ -55,6 +59,9 @@ cdef class AbstractLinkable(object):
@cython.nonecheck(False) @cython.nonecheck(False)
cpdef _notify_links(self, list arrived_while_waiting) cpdef _notify_links(self, list arrived_while_waiting)
cpdef _drop_lock_for_switch_out(self)
cpdef _acquire_lock_for_switch_in(self)
cdef _wait_core(self, timeout, catch=*) cdef _wait_core(self, timeout, catch=*)
cdef _wait_return_value(self, waited, wait_success) cdef _wait_return_value(self, waited, wait_success)
cdef _wait(self, timeout=*) cdef _wait(self, timeout=*)
...@@ -11,7 +11,7 @@ from __future__ import division ...@@ -11,7 +11,7 @@ from __future__ import division
from __future__ import print_function from __future__ import print_function
from gevent import _semaphore from gevent import lock
from gevent import queue from gevent import queue
...@@ -21,7 +21,7 @@ __all__ = [ ...@@ -21,7 +21,7 @@ __all__ = [
] ]
locals()['Greenlet'] = __import__('gevent').Greenlet locals()['Greenlet'] = __import__('gevent').Greenlet
locals()['Semaphore'] = _semaphore.Semaphore locals()['Semaphore'] = lock.Semaphore
locals()['UnboundQueue'] = queue.UnboundQueue locals()['UnboundQueue'] = queue.UnboundQueue
......
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False # cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False
###
# This file is ``gevent._semaphore`` so that it can be compiled by Cython
# individually. However, this is not the place to import from. Everyone,
# gevent internal code included, must import from ``gevent.lock``.
# The only exception are .pxd files which need access to the
# C code; the PURE_PYTHON things that have to happen and which are
# handled in ``gevent.lock``, do not apply to them.
###
from __future__ import print_function, absolute_import, division from __future__ import print_function, absolute_import, division
__all__ = [ __all__ = [
...@@ -45,11 +53,15 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable ...@@ -45,11 +53,15 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
unlinks waiters before calling them. unlinks waiters before calling them.
""" """
__slots__ = (
'counter',
)
def __init__(self, value=1, hub=None): def __init__(self, value=1, hub=None):
if value < 0: self.counter = value
if self.counter < 0: # Do the check after Cython native int conversion
raise ValueError("semaphore initial value must be >= 0") raise ValueError("semaphore initial value must be >= 0")
super(Semaphore, self).__init__(hub) super(Semaphore, self).__init__(hub)
self.counter = value
self._notify_all = False self._notify_all = False
def __str__(self): def __str__(self):
...@@ -151,6 +163,12 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable ...@@ -151,6 +163,12 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
raise a ``Timeout`` exception, if some other caller had already started a timer.) raise a ``Timeout`` exception, if some other caller had already started a timer.)
""" """
if self.counter > 0: if self.counter > 0:
# We conceptually now belong to the hub of
# the thread that called this, even though we didn't
# have to block. Note that we cannot force it to be created
# yet, because Semaphore is used by importlib.ModuleLock
# which is used when importing the hub itself!
self._capture_hub(False)
self.counter -= 1 self.counter -= 1
return True return True
......
...@@ -326,6 +326,8 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable ...@@ -326,6 +326,8 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
# Not ready and not blocking, so immediately timeout # Not ready and not blocking, so immediately timeout
raise Timeout() raise Timeout()
self._capture_hub(True)
# Wait, raising a timeout that elapses # Wait, raising a timeout that elapses
self._wait_core(timeout, ()) self._wait_core(timeout, ())
......
...@@ -1199,6 +1199,9 @@ cdef public class async_(watcher) [object PyGeventAsyncObject, type PyGeventAsyn ...@@ -1199,6 +1199,9 @@ cdef public class async_(watcher) [object PyGeventAsyncObject, type PyGeventAsyn
_check_loop(self.loop) _check_loop(self.loop)
libev.ev_async_send(self.loop._ptr, &self._watcher) libev.ev_async_send(self.loop._ptr, &self._watcher)
def send_ignoring_arg(self, _ignored):
return self.send()
async = async_ async = async_
cdef start_and_stop child_ss = make_ss(<void*>libev.ev_child_start, <void*>libev.ev_child_stop) cdef start_and_stop child_ss = make_ss(<void*>libev.ev_child_start, <void*>libev.ev_child_stop)
......
...@@ -10,8 +10,12 @@ infinite bounds (:class:`DummySemaphore`), along with a reentrant lock ...@@ -10,8 +10,12 @@ infinite bounds (:class:`DummySemaphore`), along with a reentrant lock
from __future__ import absolute_import from __future__ import absolute_import
from gevent.hub import getcurrent from gevent.hub import getcurrent
from gevent._compat import PYPY from gevent._compat import PURE_PYTHON
from gevent._semaphore import Semaphore, BoundedSemaphore # pylint:disable=no-name-in-module,import-error # This is the one exception to the rule of where to
# import Semaphore, obviously
from gevent import monkey
from gevent._semaphore import Semaphore
from gevent._semaphore import BoundedSemaphore
__all__ = [ __all__ = [
...@@ -24,112 +28,148 @@ __all__ = [ ...@@ -24,112 +28,148 @@ __all__ = [
# On PyPy, we don't compile the Semaphore class with Cython. Under # On PyPy, we don't compile the Semaphore class with Cython. Under
# Cython, each individual method holds the GIL for its entire # Cython, each individual method holds the GIL for its entire
# duration, ensuring that no other thread can interrupt us in an # duration, ensuring that no other thread can interrupt us in an
# unsafe state (only when we _do_wait do we call back into Python and # unsafe state (only when we _wait do we call back into Python and
# allow switching threads). Simulate that here through the use of a manual # allow switching threads; this is broken down into the
# lock. (We use a separate lock for each semaphore to allow sys.settrace functions # _drop_lock_for_switch_out and _acquire_lock_for_switch_in methods).
# to use locks *other* than the one being traced.) # Simulate that here through the use of a manual lock. (We use a
if PYPY: # separate lock for each semaphore to allow sys.settrace functions to
# TODO: Need to use monkey.get_original? # use locks *other* than the one being traced.) This, of course, must
try: # also hold for PURE_PYTHON mode when no optional C extensions are
from _thread import allocate_lock as _allocate_lock # pylint:disable=import-error,useless-suppression # used.
from _thread import get_ident as _get_ident # pylint:disable=import-error,useless-suppression
except ImportError: _allocate_lock, _get_ident = monkey.get_original(
# Python 2 ('_thread', 'thread'),
from thread import allocate_lock as _allocate_lock # pylint:disable=import-error,useless-suppression ('allocate_lock', 'get_ident')
from thread import get_ident as _get_ident # pylint:disable=import-error,useless-suppression )
_sem_lock = _allocate_lock()
def untraceable(f): class _OwnedLock(object):
# Don't allow re-entry to these functions in a single thread, as can __slots__ = (
# happen if a sys.settrace is used '_owner',
def wrapper(self): '_block',
me = _get_ident() '_locking',
try: '_count',
count = self._locking[me] )
except KeyError: # Don't allow re-entry to these functions in a single thread, as can
count = self._locking[me] = 1 # happen if a sys.settrace is used.
else: #
count = self._locking[me] = count + 1 # This is essentially a variant of the (pure-Python) RLock from the
if count: # standard library.
return def __init__(self):
self._owner = None
self._block = _allocate_lock()
self._locking = {}
self._count = 0
try:
return f(self)
finally:
count = count - 1
if not count:
del self._locking[me]
else:
self._locking[me] = count
return wrapper
class _OwnedLock(object): def __begin(self):
# Return (me, count) if we should proceed, otherwise return
# None. The function should exit in that case.
# In either case, it must call __end.
me = _get_ident()
try:
count = self._locking[me]
except KeyError:
count = self._locking[me] = 1
else:
count = self._locking[me] = count + 1
return (me, count) if not count else (None, None)
def __end(self, me, count):
if me is None:
return
count = count - 1
if not count:
del self._locking[me]
else:
self._locking[me] = count
def __init__(self): def __enter__(self):
self._owner = None me, lock_count = self.__begin()
self._block = _allocate_lock() try:
self._locking = {} if me is None:
self._count = 0 return
@untraceable
def acquire(self):
me = _get_ident()
if self._owner == me: if self._owner == me:
self._count += 1 self._count += 1
return return
self._owner = me
self._block.acquire() self._block.acquire()
self._owner = me
self._count = 1 self._count = 1
finally:
self.__end(me, lock_count)
def __exit__(self, t, v, tb):
self.release()
acquire = __enter__
def release(self):
me, lock_count = self.__begin()
try:
if me is None:
return
@untraceable
def release(self):
self._count = count = self._count - 1 self._count = count = self._count - 1
if not count: if not count:
self._block.release()
self._owner = None self._owner = None
self._block.release()
finally:
self.__end(me, lock_count)
class _AtomicSemaphoreMixin(object):
# Behaves as though the GIL was held for the duration of acquire, wait,
# and release, just as if we were in Cython.
#
# acquire, wait, and release all acquire the lock on entry and release it # acquire, wait, and release all acquire the lock on entry and release it
# on exit. acquire and wait can call _do_wait, which must release it on entry # on exit. acquire and wait can call _wait, which must release it on entry
# and re-acquire it for them on exit. # and re-acquire it for them on exit.
class _around(object): #
__slots__ = ('before', 'after') # Note that this does *NOT*, in-and-of itself, make semaphores safe to use from multiple threads
def __init__(self, *args, **kwargs):
self._lock_lock = _OwnedLock()
super(_AtomicSemaphoreMixin, self).__init__(*args, **kwargs)
def __init__(self, before, after): def _acquire_lock_for_switch_in(self):
self.before = before self._lock_lock.acquire()
self.after = after
def __enter__(self): def _drop_lock_for_switch_out(self):
self.before() self._lock_lock.release()
def __exit__(self, t, v, tb): def _notify_links(self, arrived_while_waiting):
self.after() with self._lock_lock:
return super(_AtomicSemaphoreMixin, self)._notify_links(arrived_while_waiting)
def release(self):
with self._lock_lock:
return super(_AtomicSemaphoreMixin, self).release()
def _decorate(func, cmname): def acquire(self, blocking=True, timeout=None):
# functools.wrap? with self._lock_lock:
def wrapped(self, *args, **kwargs): return super(_AtomicSemaphoreMixin, self).acquire(blocking, timeout)
with getattr(self, cmname):
return func(self, *args, **kwargs)
return wrapped
Semaphore._py3k_acquire = Semaphore.acquire = _decorate(Semaphore.acquire, '_lock_locked') _py3k_acquire = acquire
Semaphore.release = _decorate(Semaphore.release, '_lock_locked')
Semaphore.wait = _decorate(Semaphore.wait, '_lock_locked')
Semaphore._wait = _decorate(Semaphore._wait, '_lock_unlocked')
_Sem_init = Semaphore.__init__ def wait(self, timeout=None):
with self._lock_lock:
return super(_AtomicSemaphoreMixin, self).wait(timeout)
def __init__(self, *args, **kwargs): class _AtomicSemaphore(_AtomicSemaphoreMixin, Semaphore):
l = self._lock_lock = _OwnedLock() __slots__ = (
self._lock_locked = _around(l.acquire, l.release) '_lock_lock',
self._lock_unlocked = _around(l.release, l.acquire) )
_Sem_init(self, *args, **kwargs) class _AtomicBoundedSemaphore(_AtomicSemaphoreMixin, BoundedSemaphore):
__slots__ = (
'_lock_lock',
)
Semaphore.__init__ = __init__
del _decorate if PURE_PYTHON:
del untraceable Semaphore = _AtomicSemaphore
BoundedSemaphore = _AtomicBoundedSemaphore
class DummySemaphore(object): class DummySemaphore(object):
...@@ -231,8 +271,12 @@ class RLock(object): ...@@ -231,8 +271,12 @@ class RLock(object):
'__weakref__', '__weakref__',
) )
def __init__(self): def __init__(self, hub=None):
self._block = Semaphore(1) """
.. versionchanged:: NEXT
Add the ``hub`` argument.
"""
self._block = Semaphore(1, hub)
self._owner = None self._owner = None
self._count = 0 self._count = 0
......
...@@ -250,7 +250,9 @@ def get_original(mod_name, item_name): ...@@ -250,7 +250,9 @@ def get_original(mod_name, item_name):
retrieved. retrieved.
:param str mod_name: The name of the standard library module, :param str mod_name: The name of the standard library module,
e.g., ``'socket'``. e.g., ``'socket'``. Can also be a sequence of standard library
modules giving alternate names to try, e.g., ``('thread', '_thread')``;
the first importable module will supply all *item_name* items.
:param item_name: A string or sequence of strings naming the :param item_name: A string or sequence of strings naming the
attribute(s) on the module ``mod_name`` to return. attribute(s) on the module ``mod_name`` to return.
...@@ -258,10 +260,22 @@ def get_original(mod_name, item_name): ...@@ -258,10 +260,22 @@ def get_original(mod_name, item_name):
``item_name`` or a sequence of original values if a ``item_name`` or a sequence of original values if a
sequence was passed. sequence was passed.
""" """
mod_names = [mod_name] if isinstance(mod_name, string_types) else mod_name
if isinstance(item_name, string_types): if isinstance(item_name, string_types):
return _get_original(mod_name, [item_name])[0] item_names = [item_name]
return _get_original(mod_name, item_name) unpack = True
else:
item_names = item_name
unpack = False
for mod in mod_names:
try:
result = _get_original(mod, item_names)
except ImportError:
if mod is mod_names[-1]:
raise
else:
return result[0] if unpack else result
_NONE = object() _NONE = object()
......
...@@ -89,7 +89,6 @@ class _RefCountChecker(object): ...@@ -89,7 +89,6 @@ class _RefCountChecker(object):
return False return False
return True return True
def _growth(self): def _growth(self):
return objgraph.growth(limit=None, peak_stats=self.peak_stats, filter=self._ignore_object_p) return objgraph.growth(limit=None, peak_stats=self.peak_stats, filter=self._ignore_object_p)
...@@ -198,13 +197,15 @@ class _RefCountChecker(object): ...@@ -198,13 +197,15 @@ class _RefCountChecker(object):
def wrap_refcount(method): def wrap_refcount(method):
if objgraph is None:
import warnings
warnings.warn("objgraph not available, leakchecks disabled")
return method
if getattr(method, 'ignore_leakcheck', False): if objgraph is None or getattr(method, 'ignore_leakcheck', False):
return method if objgraph is None:
import warnings
warnings.warn("objgraph not available, leakchecks disabled")
@wraps(method)
def _method_skipped_during_leakcheck(self, *_args, **_kwargs):
self.skipTest("This method ignored during leakchecks")
return _method_skipped_during_leakcheck
@wraps(method) @wraps(method)
......
...@@ -435,7 +435,7 @@ class BaseSemaphoreTests(BaseTestCase): ...@@ -435,7 +435,7 @@ class BaseSemaphoreTests(BaseTestCase):
def test_constructor(self): def test_constructor(self):
self.assertRaises(ValueError, self.semtype, value=-1) self.assertRaises(ValueError, self.semtype, value=-1)
# Py3 doesn't have sys.maxint # Py3 doesn't have sys.maxint
self.assertRaises(ValueError, self.semtype, self.assertRaises((ValueError, OverflowError), self.semtype,
value=-getattr(sys, 'maxint', getattr(sys, 'maxsize', None))) value=-getattr(sys, 'maxint', getattr(sys, 'maxsize', None)))
def test_acquire(self): def test_acquire(self):
......
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from gevent import lock
import gevent.testing as greentest
from gevent.tests import test__semaphore
class TestRLockMultiThread(test__semaphore.TestSemaphoreMultiThread):
def _makeOne(self):
# If we don't set the hub before returning,
# there's a potential race condition, if the implementation
# isn't careful. If it's the background hub that winds up capturing
# the hub, it will ask the hub to switch back to itself and
# then switch to the hub, which will raise LoopExit (nothing
# for the background thread to do). What is supposed to happen
# is that the background thread realizes it's the background thread,
# starts an async watcher and then switches to the hub.
#
# So we deliberately don't set the hub to help test that condition.
return lock.RLock()
if __name__ == '__main__':
greentest.main()
###
# This file is test__semaphore.py only for organization purposes.
# The public API,
# and the *only* correct place to import Semaphore --- even in tests ---
# is ``gevent.lock``, never ``gevent._semaphore``.
##
from __future__ import print_function from __future__ import print_function
from __future__ import absolute_import from __future__ import absolute_import
...@@ -6,16 +12,9 @@ import weakref ...@@ -6,16 +12,9 @@ import weakref
import gevent import gevent
import gevent.exceptions import gevent.exceptions
from gevent.lock import Semaphore from gevent.lock import Semaphore
from gevent.thread import allocate_lock
import gevent.testing as greentest import gevent.testing as greentest
from gevent.testing import timing
try:
from _thread import allocate_lock as std_allocate_lock
except ImportError: # Py2
from thread import allocate_lock as std_allocate_lock
# pylint:disable=broad-except
class TestSemaphore(greentest.TestCase): class TestSemaphore(greentest.TestCase):
...@@ -67,24 +66,154 @@ class TestSemaphore(greentest.TestCase): ...@@ -67,24 +66,154 @@ class TestSemaphore(greentest.TestCase):
s = Semaphore() s = Semaphore()
gevent.wait([s]) gevent.wait([s])
class TestLock(greentest.TestCase):
def test_release_unheld_lock(self):
std_lock = std_allocate_lock()
g_lock = allocate_lock()
try:
std_lock.release()
self.fail("Should have thrown an exception")
except Exception as e:
std_exc = e
class TestSemaphoreMultiThread(greentest.TestCase):
# Tests that the object can be acquired correctly across
# multiple threads.
# Used as a base class.
# See https://github.com/gevent/gevent/issues/1437
def _makeOne(self):
# Create an object that is associated with the current hub. If
# we don't do this now, it gets initialized lazily the first
# time it would have to block, which, in the event of threads,
# would be from an arbitrary thread.
return Semaphore(1, gevent.get_hub())
def _makeThreadMain(self, thread_running, thread_acquired, sem,
acquired, exc_info,
**thread_acquire_kwargs):
from gevent._hub_local import get_hub_if_exists
import sys
def thread_main():
thread_running.set()
try:
acquired.append(
sem.acquire(**thread_acquire_kwargs)
)
except:
exc_info[:] = sys.exc_info()
raise # Print
finally:
hub = get_hub_if_exists()
if hub is not None:
hub.join()
hub.destroy(destroy_loop=True)
thread_acquired.set()
return thread_main
def _do_test_acquire_in_one_then_another(self, release=True, **thread_acquire_kwargs):
from gevent import monkey
self.assertFalse(monkey.is_module_patched('threading'))
import threading
thread_running = threading.Event()
thread_acquired = threading.Event()
sem = self._makeOne()
# Make future acquires block
sem.acquire()
exc_info = []
acquired = []
t = threading.Thread(target=self._makeThreadMain(
thread_running, thread_acquired, sem,
acquired, exc_info,
**thread_acquire_kwargs
))
t.start()
thread_running.wait(10) # implausibly large time
if release:
sem.release()
# Spin the loop to be sure the release gets through.
# (Release schedules the notifier to run, and when the
# notifier run it sends the async notification to the
# other thread. Depending on exactly where we are in the
# event loop, and the limit to the number of callbacks
# that get run (including time-based) the notifier may or
# may not be immediately ready to run, so this can take up
# to two iterations.)
for _ in range(3):
gevent.idle()
if thread_acquired.wait(timing.LARGE_TICK):
break
self.assertEqual(acquired, [True])
thread_acquired.wait(timing.LARGE_TICK * 5)
try: try:
g_lock.release() self.assertEqual(exc_info, [])
self.fail("Should have thrown an exception") finally:
except Exception as e: exc_info = None
g_exc = e
self.assertIsInstance(g_exc, type(std_exc)) return sem, acquired
def test_acquire_in_one_then_another(self):
self._do_test_acquire_in_one_then_another(release=True)
def test_acquire_in_one_then_another_timed(self):
sem, acquired_in_thread = self._do_test_acquire_in_one_then_another(
release=False,
timeout=timing.SMALLEST_RELIABLE_DELAY)
self.assertEqual([False], acquired_in_thread)
# This doesn't, of course, notify anything, because
# the waiter has given up.
sem.release()
notifier = getattr(sem, '_notifier', None)
self.assertIsNone(notifier)
def test_acquire_in_one_wait_greenlet_wait_thread_gives_up(self):
# The waiter in the thread both arrives and gives up while
# the notifier is already running...or at least, that's what
# we'd like to arrange, but the _notify_links function doesn't
# drop the GIL/object lock, so the other thread is stuck and doesn't
# actually get to call into the acquire method.
from gevent import monkey
self.assertFalse(monkey.is_module_patched('threading'))
import threading
sem = self._makeOne()
# Make future acquires block
sem.acquire()
def greenlet_one():
ack = sem.acquire()
# We're running in the notifier function right now. It switched to
# us.
thread.start()
gevent.sleep(timing.LARGE_TICK)
return ack
exc_info = []
acquired = []
glet = gevent.spawn(greenlet_one)
thread = threading.Thread(target=self._makeThreadMain(
threading.Event(), threading.Event(),
sem,
acquired, exc_info,
timeout=timing.LARGE_TICK
))
gevent.idle()
sem.release()
glet.join()
thread.join(timing.LARGE_TICK)
self.assertEqual(glet.value, True)
self.assertEqual([], exc_info)
self.assertEqual([False], acquired)
# XXX: Need a test with multiple greenlets in a non-primary
# thread. Things should work, just very slowly; instead of moving through
# greenlet.switch(), they'll be moving with async watchers.
@greentest.skipOnPurePython("Needs C extension") @greentest.skipOnPurePython("Needs C extension")
class TestCExt(greentest.TestCase): class TestCExt(greentest.TestCase):
...@@ -153,7 +282,6 @@ def release_then_spawn(sem, should_quit): ...@@ -153,7 +282,6 @@ def release_then_spawn(sem, should_quit):
class TestSemaphoreFair(greentest.TestCase): class TestSemaphoreFair(greentest.TestCase):
@greentest.ignores_leakcheck
def test_fair_or_hangs(self): def test_fair_or_hangs(self):
# If the lock isn't fair, this hangs, spinning between # If the lock isn't fair, this hangs, spinning between
# the last two greenlets. # the last two greenlets.
...@@ -172,6 +300,12 @@ class TestSemaphoreFair(greentest.TestCase): ...@@ -172,6 +300,12 @@ class TestSemaphoreFair(greentest.TestCase):
self.assertTrue(keep_going2.dead, keep_going2) self.assertTrue(keep_going2.dead, keep_going2)
self.assertFalse(keep_going1.dead, keep_going1) self.assertFalse(keep_going1.dead, keep_going1)
sem.release()
keep_going1.kill()
keep_going2.kill()
exiting.kill()
gevent.idle()
if __name__ == '__main__': if __name__ == '__main__':
greentest.main() greentest.main()
from __future__ import print_function
from __future__ import absolute_import
from gevent.thread import allocate_lock
import gevent.testing as greentest
try:
from _thread import allocate_lock as std_allocate_lock
except ImportError: # Py2
from thread import allocate_lock as std_allocate_lock
class TestLock(greentest.TestCase):
def test_release_unheld_lock(self):
std_lock = std_allocate_lock()
g_lock = allocate_lock()
with self.assertRaises(Exception) as exc:
std_lock.release()
std_exc = exc.exception
with self.assertRaises(Exception) as exc:
g_lock.release()
g_exc = exc.exception
self.assertIsInstance(g_exc, type(std_exc))
if __name__ == '__main__':
greentest.main()
...@@ -23,10 +23,17 @@ native_thread = None ...@@ -23,10 +23,17 @@ native_thread = None
class Test(greentest.TestCase): class Test(greentest.TestCase):
@classmethod
def tearDownClass(cls):
global native_thread
if native_thread is not None:
native_thread.stop(1)
native_thread = None
def test_main_thread(self): def test_main_thread(self):
current = threading.current_thread() current = threading.current_thread()
self.assertFalse(isinstance(current, threading._DummyThread)) self.assertNotIsInstance(current, threading._DummyThread)
self.assertTrue(isinstance(current, monkey.get_original('threading', 'Thread'))) self.assertIsInstance(current, monkey.get_original('threading', 'Thread'))
# in 3.4, if the patch is incorrectly done, getting the repr # in 3.4, if the patch is incorrectly done, getting the repr
# of the thread fails # of the thread fails
repr(current) repr(current)
...@@ -36,6 +43,9 @@ class Test(greentest.TestCase): ...@@ -36,6 +43,9 @@ class Test(greentest.TestCase):
@greentest.ignores_leakcheck # because it can't be run multiple times @greentest.ignores_leakcheck # because it can't be run multiple times
def test_join_native_thread(self): def test_join_native_thread(self):
if native_thread is None or not native_thread.do_run: # pragma: no cover
self.skipTest("native_thread already closed")
self.assertTrue(native_thread.is_alive()) self.assertTrue(native_thread.is_alive())
native_thread.stop(timeout=1) native_thread.stop(timeout=1)
...@@ -48,6 +58,7 @@ class Test(greentest.TestCase): ...@@ -48,6 +58,7 @@ class Test(greentest.TestCase):
if __name__ == '__main__': if __name__ == '__main__':
native_thread = NativeThread() native_thread = NativeThread()
native_thread.daemon = True
native_thread.start() native_thread.start()
# Only patch after we're running # Only patch after we're running
......
...@@ -203,7 +203,8 @@ if hasattr(__threading__, '_CRLock'): ...@@ -203,7 +203,8 @@ if hasattr(__threading__, '_CRLock'):
# which in turn used _allocate_lock. Now, it wants to use # which in turn used _allocate_lock. Now, it wants to use
# threading._CRLock, which is imported from _thread.RLock and as such # threading._CRLock, which is imported from _thread.RLock and as such
# is implemented in C. So it bypasses our _allocate_lock function. # is implemented in C. So it bypasses our _allocate_lock function.
# Fortunately they left the Python fallback in place. # Fortunately they left the Python fallback in place and use it
# if the imported _CRLock is None; this arranges for that to be the case.
# This was also backported to PyPy 2.7-7.0 # This was also backported to PyPy 2.7-7.0
assert PY3 or PYPY, "Unsupported Python version" assert PY3 or PYPY, "Unsupported Python version"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment