Commit 04287733 authored by Jason Madden's avatar Jason Madden

Make waiters on Events that arrive while the event is ready and notifying...

Make waiters on Events that arrive while the event is ready and notifying greenlets that had to block run after they do.

Fixes #1520
parent e5b896ea
Waiters on Event and Semaphore objects that call ``wait()`` or
``acquire()``, respectively, that find the Event already set, or the
Semaphore available, no longer "cut in line" and run before any
previously scheduled greenlets. They now run in the order in which
they arrived, just as waiters that had to block in those methods do.
...@@ -42,13 +42,19 @@ cdef class AbstractLinkable(object): ...@@ -42,13 +42,19 @@ cdef class AbstractLinkable(object):
cdef list _links cdef list _links
cdef bint _notify_all cdef bint _notify_all
cpdef linkcount(self)
cpdef rawlink(self, callback) cpdef rawlink(self, callback)
cpdef bint ready(self) cpdef bint ready(self)
cpdef unlink(self, callback) cpdef unlink(self, callback)
cdef _check_and_notify(self) cdef _check_and_notify(self)
@cython.nonecheck(False)
cdef _notify_link_list(self, list links)
@cython.nonecheck(False) @cython.nonecheck(False)
cpdef _notify_links(self) cpdef _notify_links(self, list arrived_while_waiting)
cdef _wait_core(self, timeout, catch=*) cdef _wait_core(self, timeout, catch=*)
cdef _wait_return_value(self, waited, wait_success) cdef _wait_return_value(self, waited, wait_success)
cdef _wait(self, timeout=*) cdef _wait(self, timeout=*)
...@@ -81,14 +81,6 @@ class AbstractLinkable(object): ...@@ -81,14 +81,6 @@ class AbstractLinkable(object):
# Instances must define this # Instances must define this
raise NotImplementedError raise NotImplementedError
def _check_and_notify(self):
# If this object is ready to be notified, begin the process.
if self.ready() and self._links and not self._notifier:
if self.hub is None:
self.hub = get_hub()
self._notifier = self.hub.loop.run_callback(self._notify_links)
def rawlink(self, callback): def rawlink(self, callback):
""" """
Register a callback to call when this object is ready. Register a callback to call when this object is ready.
...@@ -116,45 +108,77 @@ class AbstractLinkable(object): ...@@ -116,45 +108,77 @@ class AbstractLinkable(object):
# But we can't set it to None in case it was actually running. # But we can't set it to None in case it was actually running.
self._notifier.stop() self._notifier.stop()
def _notify_links(self): def _check_and_notify(self):
# We release self._notifier here. We are called by it # If this object is ready to be notified, begin the process.
# at the end of the loop, and it is now false in a boolean way (as soon if self.ready() and self._links and not self._notifier:
# as this method returns). if self.hub is None:
self.hub = get_hub()
self._notifier = self.hub.loop.run_callback(self._notify_links, [])
def _notify_link_list(self, links):
# The core of the _notify_links method to notify
# links in order. Lets the ``links`` list be mutated,
# and only notifies up to the last item in the list, in case
# objects are added to it.
only_while_ready = not self._notify_all
final_link = links[-1]
done = set() # of ids
while links: # remember this can be mutated
if only_while_ready and not self.ready():
break
link = links.pop(0) # Cython optimizes using list internals
id_link = id(link)
if id_link not in done:
# XXX: JAM: What was I thinking? This doesn't make much sense,
# there's a good chance `link` will be deallocated, and its id() will
# be free to be reused.
done.add(id_link)
try:
link(self)
except: # pylint:disable=bare-except
# We're running in the hub, errors must not escape.
self.hub.handle_error((link, self), *sys.exc_info())
if link is final_link:
break
def _notify_links(self, arrived_while_waiting):
# ``arrived_while_waiting`` is a list of greenlet.switch methods
# to call. These were objects that called wait() while we were processing,
# and which would have run *before* those that had actually waited
# and blocked. Instead of returning True immediately, we add them to this
# list so they wait their turn.
# We release self._notifier here when done invoking links.
# The object itself becomes false in a boolean way as soon
# as this method returns.
notifier = self._notifier notifier = self._notifier
# Early links are allowed to remove later links, and links # Early links are allowed to remove later links, and links
# are allowed to add more links. # are allowed to add more links, thus we must not
# make a copy of our the ``_links`` list, we must traverse it and
# mutate in place.
# #
# We were ready() at the time this callback was scheduled; we # We were ready() at the time this callback was scheduled; we
# may not be anymore, and that status may change during # may not be anymore, and that status may change during
# callback processing. Some of our subclasses (Event) will # callback processing. Some of our subclasses (Event) will
# want to notify everyone who was registered when the status # want to notify everyone who was registered when the status
# became true that it was once true, even though it may not be # became true that it was once true, even though it may not be
# anymore. In that case, we must not keep notifying anyone that's # any more. In that case, we must not keep notifying anyone that's
# newly added after that, even if we go ready again. # newly added after that, even if we go ready again.
final_link = self._links[-1]
only_while_ready = not self._notify_all
done = set() # of ids
try: try:
while self._links: # remember this can be mutated self._notify_link_list(self._links)
if only_while_ready and not self.ready():
break # Now, those that arrived after we had begun the notification
# process. Follow the same rules, stop with those that are
link = self._links.pop(0) # Cython optimizes using list internals # added so far to prevent starvation.
if arrived_while_waiting:
id_link = id(link) self._notify_link_list(arrived_while_waiting)
if id_link not in done:
# XXX: JAM: What was I thinking? This doesn't make much sense, # Anything left needs to go back on the main list.
# there's a good chance `link` will be deallocated, and its id() will self._links.extend(arrived_while_waiting)
# be free to be reused.
done.add(id_link)
try:
link(self)
except: # pylint:disable=bare-except
# We're running in the hub, errors must not escape.
self.hub.handle_error((link, self), *sys.exc_info())
if link is final_link:
break
finally: finally:
# We should not have created a new notifier even if callbacks # We should not have created a new notifier even if callbacks
# released us because we loop through *all* of our links on the # released us because we loop through *all* of our links on the
...@@ -203,7 +227,17 @@ class AbstractLinkable(object): ...@@ -203,7 +227,17 @@ class AbstractLinkable(object):
def _wait(self, timeout=None): def _wait(self, timeout=None):
if self.ready(): if self.ready():
return self._wait_return_value(False, False) result = self._wait_return_value(False, False) # pylint:disable=assignment-from-none
if self._notifier:
# We're already notifying waiters; one of them must have run
# and switched to us.
switch = getcurrent().switch # pylint:disable=undefined-variable
self._notifier.args[0].append(switch)
switch_result = self.hub.switch()
if switch_result is not self: # pragma: no cover
raise InvalidSwitchError('Invalid switch into Event.wait(): %r' % (result, ))
return result
gotit = self._wait_core(timeout) gotit = self._wait_core(timeout)
return self._wait_return_value(True, gotit) return self._wait_return_value(True, gotit)
......
...@@ -22,31 +22,54 @@ def _get_linkable(): ...@@ -22,31 +22,54 @@ def _get_linkable():
locals()['AbstractLinkable'] = _get_linkable() locals()['AbstractLinkable'] = _get_linkable()
del _get_linkable del _get_linkable
# Sadly, something about the way we have to "import" AbstractLinkable
# breaks pylint's inference of slots, even though they're declared
# right here.
# pylint:disable=assigning-non-slot
class Event(AbstractLinkable): # pylint:disable=undefined-variable class Event(AbstractLinkable): # pylint:disable=undefined-variable
"""A synchronization primitive that allows one greenlet to wake up one or more others. """
It has the same interface as :class:`threading.Event` but works across greenlets. A synchronization primitive that allows one greenlet to wake up
one or more others. It has the same interface as
An event object manages an internal flag that can be set to true with the :class:`threading.Event` but works across greenlets.
:meth:`set` method and reset to false with the :meth:`clear` method. The :meth:`wait` method
blocks until the flag is true. An event object manages an internal flag that can be set to true
with the :meth:`set` method and reset to false with the
:meth:`clear` method. The :meth:`wait` method blocks until the
flag is true; as soon as the flag is set to true, all greenlets
that are currently blocked in a call to :meth:`wait` will be scheduled
to awaken.
Note that the flag may be cleared and set many times before
any individual greenlet runs; all the greenlet can know for sure is that the
flag was set *at least once* while it was waiting.
If the greenlet cares whether the flag is still
set, it must check with :meth:`ready` and possibly call back into
:meth:`wait` again.
.. note:: .. note::
The order and timing in which waiting greenlets are awakened is not determined.
As an implementation note, in gevent 1.1 and 1.0, waiting greenlets are awakened in a The exact order and timing in which waiting greenlets are awakened is not determined.
undetermined order sometime *after* the current greenlet yields to the event loop. Other greenlets
(those not waiting to be awakened) may run between the current greenlet yielding and Once the event is set, other greenlets may run before any waiting greenlets
the waiting greenlets being awakened. These details may change in the future. are awakened.
While the code here will awaken greenlets in the order in which they
waited, each such greenlet that runs may in turn cause other greenlets
to run.
These details may change in the future.
.. versionchanged:: 1.5a3 .. versionchanged:: 1.5a3
Waiting greenlets are now awakened in the order in which they waited.
Waiting greenlets are now awakened in
the order in which they waited.
.. versionchanged:: 1.5a3 .. versionchanged:: 1.5a3
The low-level ``rawlink`` method (most users won't use this) now automatically
unlinks waiters before calling them. The low-level ``rawlink`` method (most users won't use this) now
automatically unlinks waiters before calling them.
.. versionchanged:: NEXT
Callers to ``wait`` that find the event already set will now run
after any other waiters that had to block. See :issue:`1520`.
""" """
__slots__ = ('_flag',) __slots__ = ('_flag',)
...@@ -107,11 +130,11 @@ class Event(AbstractLinkable): # pylint:disable=undefined-variable ...@@ -107,11 +130,11 @@ class Event(AbstractLinkable): # pylint:disable=undefined-variable
def wait(self, timeout=None): def wait(self, timeout=None):
""" """
Block until the internal flag is true. Block until this object is :meth:`ready`.
If the internal flag is true on entry, return immediately. Otherwise, If the internal flag is true on entry, return immediately. Otherwise,
block until another thread (greenlet) calls :meth:`set` to set the flag to true, block until another thread (greenlet) calls :meth:`set` to set the flag to true,
or until the optional timeout occurs. or until the optional *timeout* expires.
When the *timeout* argument is present and not ``None``, it should be a When the *timeout* argument is present and not ``None``, it should be a
floating point number specifying a timeout for the operation in seconds floating point number specifying a timeout for the operation in seconds
......
...@@ -25,7 +25,7 @@ class TestEventWait(AbstractGenericWaitTestCase): ...@@ -25,7 +25,7 @@ class TestEventWait(AbstractGenericWaitTestCase):
str(Event()) str(Event())
class TestWaitEvent(AbstractGenericWaitTestCase): class TestGeventWaitOnEvent(AbstractGenericWaitTestCase):
def wait(self, timeout): def wait(self, timeout):
gevent.wait([Event()], timeout=timeout) gevent.wait([Event()], timeout=timeout)
...@@ -246,6 +246,32 @@ class TestEventBasics(greentest.TestCase): ...@@ -246,6 +246,32 @@ class TestEventBasics(greentest.TestCase):
del e del e
del r del r
def test_wait_while_notifying(self):
# If someone calls wait() on an Event that is
# ready, and notifying other waiters, that new
# waiter still runs at the end, but this does not
# require a trip around the event loop.
# See https://github.com/gevent/gevent/issues/1520
event = Event()
results = []
def wait_then_append(arg):
event.wait()
results.append(arg)
gevent.spawn(wait_then_append, 1)
gevent.spawn(wait_then_append, 2)
gevent.idle()
self.assertEqual(2, event.linkcount())
check = gevent.get_hub().loop.check()
check.start(results.append, 4)
event.set()
wait_then_append(3)
self.assertEqual(results, [1, 2, 3])
# Note that the check event DID NOT run.
check.stop()
check.close()
del AbstractGenericGetTestCase del AbstractGenericGetTestCase
del AbstractGenericWaitTestCase del AbstractGenericWaitTestCase
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment