Commit 6115e431 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1488 from gevent/issue1487

Make Semaphores fair.
parents d80b3687 9d7a12c0
...@@ -36,6 +36,13 @@ ...@@ -36,6 +36,13 @@
Using spin locks is not recommended, but may have been done in code Using spin locks is not recommended, but may have been done in code
written for threads, especially on Python 3. See :issue:`1464`. written for threads, especially on Python 3. See :issue:`1464`.
- Fix Semaphore (and monkey-patched threading locks) to be fair. This
eliminates the rare potential for starvation of greenlets. As part
of this change, the low-level method ``rawlink`` of Semaphore,
Event, and AsyncResult now always remove the link object when
calling it, so ``unlink`` can sometimes be optimized out. See
:issue:`1487`.
1.5a2 (2019-10-21) 1.5a2 (2019-10-21)
================== ==================
......
...@@ -39,7 +39,7 @@ cdef class AbstractLinkable(object): ...@@ -39,7 +39,7 @@ cdef class AbstractLinkable(object):
cdef readonly SwitchOutGreenletWithLoop hub cdef readonly SwitchOutGreenletWithLoop hub
cdef _notifier cdef _notifier
cdef set _links cdef list _links
cdef bint _notify_all cdef bint _notify_all
cpdef rawlink(self, callback) cpdef rawlink(self, callback)
...@@ -47,6 +47,7 @@ cdef class AbstractLinkable(object): ...@@ -47,6 +47,7 @@ cdef class AbstractLinkable(object):
cpdef unlink(self, callback) cpdef unlink(self, callback)
cdef _check_and_notify(self) cdef _check_and_notify(self)
@cython.nonecheck(False)
cpdef _notify_links(self) cpdef _notify_links(self)
cdef _wait_core(self, timeout, catch=*) cdef _wait_core(self, timeout, catch=*)
cdef _wait_return_value(self, waited, wait_success) cdef _wait_return_value(self, waited, wait_success)
......
This diff is collapsed.
...@@ -5,19 +5,13 @@ __all__ = [ ...@@ -5,19 +5,13 @@ __all__ = [
] ]
# For times when *args is captured but often not passed (empty),
# we can avoid keeping the new tuple that was created for *args
# around by using a constant.
_NOARGS = ()
class callback(object): class callback(object):
__slots__ = ('callback', 'args') __slots__ = ('callback', 'args')
def __init__(self, cb, args): def __init__(self, cb, args):
self.callback = cb self.callback = cb
self.args = args or _NOARGS self.args = args
def stop(self): def stop(self):
self.callback = None self.callback = None
......
...@@ -36,6 +36,12 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable ...@@ -36,6 +36,12 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
The order in which waiters are awakened is not specified. It was not The order in which waiters are awakened is not specified. It was not
specified previously, but usually went in FIFO order. specified previously, but usually went in FIFO order.
.. versionchanged:: 1.5a3
Waiting greenlets are now awakened in the order in which they waited.
.. versionchanged:: 1.5a3
The low-level ``rawlink`` method (most users won't use this) now automatically
unlinks waiters before calling them.
""" """
def __init__(self, value=1): def __init__(self, value=1):
......
...@@ -166,6 +166,7 @@ class BaseServer(object): ...@@ -166,6 +166,7 @@ class BaseServer(object):
raise TypeError("'handle' must be provided") raise TypeError("'handle' must be provided")
def _start_accepting_if_started(self, _event=None): def _start_accepting_if_started(self, _event=None):
print("Begin accepting. Already started?", self.started)
if self.started: if self.started:
self.start_accepting() self.start_accepting()
...@@ -209,6 +210,8 @@ class BaseServer(object): ...@@ -209,6 +210,8 @@ class BaseServer(object):
for _ in xrange(self.max_accept): for _ in xrange(self.max_accept):
if self.full(): if self.full():
self.stop_accepting() self.stop_accepting()
if self.pool is not None:
self.pool._semaphore.rawlink(self._start_accepting_if_started)
return return
try: try:
args = self.do_read() args = self.do_read()
......
...@@ -38,9 +38,15 @@ class Event(AbstractLinkable): # pylint:disable=undefined-variable ...@@ -38,9 +38,15 @@ class Event(AbstractLinkable): # pylint:disable=undefined-variable
.. note:: .. note::
The order and timing in which waiting greenlets are awakened is not determined. The order and timing in which waiting greenlets are awakened is not determined.
As an implementation note, in gevent 1.1 and 1.0, waiting greenlets are awakened in a As an implementation note, in gevent 1.1 and 1.0, waiting greenlets are awakened in a
undetermined order sometime *after* the current greenlet yields to the event loop. Other greenlets undetermined order sometime *after* the current greenlet yields to the event loop. Other greenlets
(those not waiting to be awakened) may run between the current greenlet yielding and (those not waiting to be awakened) may run between the current greenlet yielding and
the waiting greenlets being awakened. These details may change in the future. the waiting greenlets being awakened. These details may change in the future.
.. versionchanged:: 1.5a3
Waiting greenlets are now awakened in the order in which they waited.
.. versionchanged:: 1.5a3
The low-level ``rawlink`` method (most users won't use this) now automatically
unlinks waiters before calling them.
""" """
__slots__ = ('_flag',) __slots__ = ('_flag',)
...@@ -181,6 +187,11 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable ...@@ -181,6 +187,11 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
.. versionchanged:: 1.1 .. versionchanged:: 1.1
Callbacks :meth:`linked <rawlink>` to this object are required to be hashable, and duplicates are Callbacks :meth:`linked <rawlink>` to this object are required to be hashable, and duplicates are
merged. merged.
.. versionchanged:: 1.5a3
Waiting greenlets are now awakened in the order in which they waited.
.. versionchanged:: 1.5a3
The low-level ``rawlink`` method (most users won't use this) now automatically
unlinks waiters before calling them.
""" """
__slots__ = ('_value', '_exc_info', '_imap_task_index') __slots__ = ('_value', '_exc_info', '_imap_task_index')
......
import gevent.testing as greentest from __future__ import print_function
from __future__ import absolute_import
import weakref
import gevent import gevent
import gevent.exceptions
from gevent.lock import Semaphore from gevent.lock import Semaphore
from gevent.thread import allocate_lock from gevent.thread import allocate_lock
import weakref
import gevent.testing as greentest
try: try:
from _thread import allocate_lock as std_allocate_lock from _thread import allocate_lock as std_allocate_lock
except ImportError: # Py2 except ImportError: # Py2
...@@ -34,6 +41,7 @@ class TestSemaphore(greentest.TestCase): ...@@ -34,6 +41,7 @@ class TestSemaphore(greentest.TestCase):
r = weakref.ref(s) r = weakref.ref(s)
self.assertEqual(s, r()) self.assertEqual(s, r())
@greentest.ignores_leakcheck
def test_semaphore_in_class_with_del(self): def test_semaphore_in_class_with_del(self):
# Issue #704. This used to crash the process # Issue #704. This used to crash the process
# under PyPy through at least 4.0.1 if the Semaphore # under PyPy through at least 4.0.1 if the Semaphore
...@@ -50,7 +58,6 @@ class TestSemaphore(greentest.TestCase): ...@@ -50,7 +58,6 @@ class TestSemaphore(greentest.TestCase):
gc.collect() gc.collect()
gc.collect() gc.collect()
test_semaphore_in_class_with_del.ignore_leakcheck = True
def test_rawlink_on_unacquired_runs_notifiers(self): def test_rawlink_on_unacquired_runs_notifiers(self):
# https://github.com/gevent/gevent/issues/1287 # https://github.com/gevent/gevent/issues/1287
...@@ -87,5 +94,81 @@ class TestCExt(greentest.TestCase): ...@@ -87,5 +94,81 @@ class TestCExt(greentest.TestCase):
'gevent.__semaphore') 'gevent.__semaphore')
class SwitchWithFixedHash(object):
# Replaces greenlet.switch with a callable object
# with a hash code we control.
def __init__(self, greenlet, hashcode):
self.switch = greenlet.switch
self.hashcode = hashcode
def __hash__(self):
return self.hashcode
def __eq__(self, other):
return self is other
def __call__(self, *args, **kwargs):
return self.switch(*args, **kwargs)
def __repr__(self):
return repr(self.switch)
class FirstG(gevent.Greenlet):
# A greenlet whose switch method will have a low hashcode.
hashcode = 10
def __init__(self, *args, **kwargs):
gevent.Greenlet.__init__(self, *args, **kwargs)
self.switch = SwitchWithFixedHash(self, self.hashcode)
class LastG(FirstG):
# A greenlet whose switch method will have a high hashcode.
hashcode = 12
def acquire_then_exit(sem, should_quit):
sem.acquire()
should_quit.append(True)
def acquire_then_spawn(sem, should_quit):
if should_quit:
return
sem.acquire()
g = FirstG.spawn(release_then_spawn, sem, should_quit)
g.join()
def release_then_spawn(sem, should_quit):
sem.release()
if should_quit:
return
g = FirstG.spawn(acquire_then_spawn, sem, should_quit)
g.join()
class TestSemaphoreFair(greentest.TestCase):
@greentest.ignores_leakcheck
def test_fair_or_hangs(self):
# If the lock isn't fair, this hangs, spinning between
# the last two greenlets.
# See https://github.com/gevent/gevent/issues/1487
sem = Semaphore()
should_quit = []
keep_going1 = FirstG.spawn(acquire_then_spawn, sem, should_quit)
keep_going2 = FirstG.spawn(acquire_then_spawn, sem, should_quit)
exiting = LastG.spawn(acquire_then_exit, sem, should_quit)
with self.assertRaises(gevent.exceptions.LoopExit):
gevent.joinall([keep_going1, keep_going2, exiting])
self.assertTrue(exiting.dead, exiting)
self.assertTrue(keep_going2.dead, keep_going2)
self.assertFalse(keep_going1.dead, keep_going1)
if __name__ == '__main__': if __name__ == '__main__':
greentest.main() greentest.main()
...@@ -449,13 +449,11 @@ else: ...@@ -449,13 +449,11 @@ else:
fn(future) fn(future)
except Exception: # pylint: disable=broad-except except Exception: # pylint: disable=broad-except
future.hub.print_exception((fn, future), *sys.exc_info()) future.hub.print_exception((fn, future), *sys.exc_info())
cbwrap.auto_unlink = True
return cbwrap return cbwrap
def _wrap(future, fn): def _wrap(future, fn):
def f(_): def f(_):
fn(future) fn(future)
f.auto_unlink = True
return f return f
class _FutureProxy(object): class _FutureProxy(object):
...@@ -490,8 +488,6 @@ else: ...@@ -490,8 +488,6 @@ else:
else: else:
w.add_exception(self) w.add_exception(self)
__when_done.auto_unlink = True
@property @property
def _state(self): def _state(self):
if self.done(): if self.done():
......
...@@ -427,7 +427,12 @@ class GreenletTree(object): ...@@ -427,7 +427,12 @@ class GreenletTree(object):
tree.child_multidata(pprint.pformat(tree_locals)) tree.child_multidata(pprint.pformat(tree_locals))
self.__render_locals(tree) self.__render_locals(tree)
self.__render_children(tree) try:
self.__render_children(tree)
except RuntimeError:
# If the tree is exceptionally deep, we can hit the recursion error.
# Usually it's several levels down so we can make a print call.
print("When rendering children", *sys.exc_info())
return tree.lines return tree.lines
def __render_children(self, tree): def __render_children(self, tree):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment