Commit 821e7fc8 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1155 from gevent/cython-waiter

Compile gevent.queue and gevent.hub.waiter with Cython
parents b61f9e91 870e8e13
......@@ -13,6 +13,9 @@ src/gevent/greenlet.c
src/gevent/_ident.c
src/gevent/_imap.c
src/gevent/event.c
src/gevent/_hub_local.c
src/gevent/_waiter.c
src/gevent/queue.c
src/gevent/libev/corecext.c
src/gevent/libev/corecext.h
src/gevent/libev/_corecffi.c
......
......@@ -50,7 +50,8 @@ Enhancements
reducing the overhead of ``[Thread]Pool.imap``.
- The classes `gevent.event.Event` and `gevent.event.AsyncResult`
are compiled with Cython for improved performance. Please report any
are compiled with Cython for improved performance, as is the
``gevent.queue`` module and ``gevent.hub.Waiter``. Please report any
compatibility issues.
Monitoring and Debugging
......
......@@ -20,9 +20,9 @@ def _b_no_block(q):
for i in range(N):
j = q.get()
assert i == j
assert i == j, (i, j)
def bench_unbounded_queue_noblock(kind=queue.Queue):
def bench_unbounded_queue_noblock(kind=queue.UnboundQueue):
_b_no_block(kind())
def bench_bounded_queue_noblock(kind=queue.Queue):
......
......@@ -104,6 +104,21 @@ EVENT = Extension(name="gevent._event",
depends=['src/gevent/_event.pxd'],
include_dirs=include_dirs)
QUEUE = Extension(name="gevent._queue",
sources=["src/gevent/queue.py"],
depends=['src/gevent/_queue.pxd'],
include_dirs=include_dirs)
HUB_LOCAL = Extension(name="gevent.__hub_local",
sources=["src/gevent/_hub_local.py"],
depends=['src/gevent/__hub_local.pxd'],
include_dirs=include_dirs)
WAITER = Extension(name="gevent.__waiter",
sources=["src/gevent/_waiter.py"],
depends=['src/gevent/__waiter.pxd'],
include_dirs=include_dirs)
_to_cythonize = [
SEMAPHORE,
......@@ -112,6 +127,9 @@ _to_cythonize = [
IDENT,
IMAP,
EVENT,
QUEUE,
HUB_LOCAL,
WAITER,
]
EXT_MODULES = [
......@@ -123,6 +141,9 @@ EXT_MODULES = [
IDENT,
IMAP,
EVENT,
QUEUE,
HUB_LOCAL,
WAITER,
]
LIBEV_CFFI_MODULE = 'src/gevent/libev/_corecffi_build.py:ffi'
......@@ -191,6 +212,14 @@ if PYPY:
EXT_MODULES.remove(EVENT)
_to_cythonize.remove(EVENT)
EXT_MODULES.remove(QUEUE)
_to_cythonize.remove(QUEUE)
EXT_MODULES.remove(HUB_LOCAL)
_to_cythonize.remove(HUB_LOCAL)
EXT_MODULES.remove(WAITER)
_to_cythonize.remove(WAITER)
for mod in _to_cythonize:
EXT_MODULES.remove(mod)
......
cdef _threadlocal
cpdef get_hub_class()
cpdef get_hub_if_exists()
cpdef set_hub(hub)
cpdef get_loop()
cpdef set_loop(loop)
# XXX: TODO: Move the definition of TrackedRawGreenlet
# into a file that can be cython compiled so get_hub can
# return that.
cpdef get_hub_noargs()
cimport cython
from gevent._greenlet cimport Greenlet
from gevent.__semaphore cimport Semaphore
from gevent._queue cimport UnboundQueue
@cython.freelist(100)
@cython.internal
@cython.final
cdef class Failure:
cdef readonly exc
cdef raise_exception
......@@ -17,10 +20,8 @@ cdef class IMapUnordered(Greenlet):
cdef Semaphore _result_semaphore
cdef int _outstanding_tasks
cdef int _max_index
cdef _queue_get
cdef _queue_put
cdef readonly queue
cdef readonly UnboundQueue queue
cdef readonly bint finished
cdef _inext(self)
......
# cython: auto_pickle=False
from gevent.__hub_local cimport get_hub_noargs as get_hub
cdef Timeout
from _greenlet cimport get_hub
cdef bint _greenlet_imported
......
cimport cython
cdef sys
cdef ConcurrentObjectUseError
from gevent.__hub_local cimport get_hub_noargs as get_hub
cdef bint _greenlet_imported
cdef _NONE
cdef extern from "greenlet/greenlet.h":
ctypedef class greenlet.greenlet [object PyGreenlet]:
pass
# These are actually macros and so much be included
# (defined) in each .pxd, as are the two functions
# that call them.
greenlet PyGreenlet_GetCurrent()
void PyGreenlet_Import()
cdef inline greenlet getcurrent():
return PyGreenlet_GetCurrent()
cdef inline void greenlet_init():
global _greenlet_imported
if not _greenlet_imported:
PyGreenlet_Import()
_greenlet_imported = True
cdef class Waiter:
cdef readonly hub
cdef readonly greenlet
cdef readonly value
cdef _exception
@cython.final
@cython.internal
cdef class MultipleWaiter(Waiter):
cdef list _values
......@@ -2,10 +2,13 @@
cimport cython
from gevent.__ident cimport IdentRegistry
cdef bint _greenlet_imported
from gevent.__hub_local cimport get_hub_noargs as get_hub
from gevent.__waiter cimport Waiter
cdef bint _PYPY
cdef sys_getframe
cdef sys_exc_info
cdef Timeout
cdef extern from "greenlet/greenlet.h":
......@@ -18,9 +21,12 @@ cdef extern from "greenlet/greenlet.h":
greenlet PyGreenlet_GetCurrent()
void PyGreenlet_Import()
@cython.final
cdef inline greenlet getcurrent():
return PyGreenlet_GetCurrent()
cdef bint _greenlet_imported
cdef inline void greenlet_init():
global _greenlet_imported
if not _greenlet_imported:
......@@ -125,11 +131,6 @@ cdef class Greenlet(greenlet):
# cpdef _raise_exception(self)
@cython.final
cdef greenlet get_hub()
# XXX: TODO: Move the definition of TrackedRawGreenlet
# into a file that can be cython compiled so get_hub can
# return that.
# Declare a bunch of imports as cdefs so they can
# be accessed directly as static vars without
......
# -*- coding: utf-8 -*-
# copyright 2018 gevent. See LICENSE
"""
Maintains the thread local hub.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from gevent.monkey import get_original
from gevent._compat import thread_mod_name
__all__ = [
'get_hub',
'get_hub_noargs',
'get_hub_if_exists',
]
# These must be the "real" native thread versions,
# not monkey-patched.
class _Threadlocal(get_original(thread_mod_name, '_local')):
def __init__(self):
# Use a class with an initializer so that we can test
# for 'is None' instead of catching AttributeError, making
# the code cleaner and possibly solving some corner cases
# (like #687)
super(_Threadlocal, self).__init__()
self.Hub = None
self.loop = None
self.hub = None
_threadlocal = _Threadlocal()
Hub = None # Set when gevent.hub is imported
def get_hub_class():
"""Return the type of hub to use for the current thread.
If there's no type of hub for the current thread yet, 'gevent.hub.Hub' is used.
"""
hubtype = _threadlocal.Hub
if hubtype is None:
hubtype = _threadlocal.Hub = Hub
return hubtype
def set_default_hub_class(hubtype):
global Hub
Hub = hubtype
def get_hub(*args, **kwargs):
"""
Return the hub for the current thread.
If a hub does not exist in the current thread, a new one is
created of the type returned by :func:`get_hub_class`.
.. deprecated:: 1.3b1
The ``*args`` and ``**kwargs`` arguments are deprecated. They were
only used when the hub was created, and so were non-deterministic---to be
sure they were used, *all* callers had to pass them, or they were order-dependent.
Use ``set_hub`` instead.
"""
hub = _threadlocal.hub
if hub is None:
hubtype = get_hub_class()
hub = _threadlocal.hub = hubtype(*args, **kwargs)
return hub
def get_hub_noargs():
# Just like get_hub, but cheaper to call because it
# takes no arguments or kwargs. See also a copy in
# gevent/greenlet.py
hub = _threadlocal.hub
if hub is None:
hubtype = get_hub_class()
hub = _threadlocal.hub = hubtype()
return hub
def get_hub_if_exists():
"""Return the hub for the current thread.
Return ``None`` if no hub has been created yet.
"""
return _threadlocal.hub
def set_hub(hub):
_threadlocal.hub = hub
def get_loop():
return _threadlocal.loop
def set_loop(loop):
_threadlocal.loop = loop
from gevent._util import import_c_accel
import_c_accel(globals(), 'gevent.__hub_local')
......@@ -12,6 +12,7 @@ from __future__ import print_function
from gevent import _semaphore
from gevent import queue
__all__ = [
......@@ -21,6 +22,7 @@ __all__ = [
locals()['Greenlet'] = __import__('gevent').Greenlet
locals()['Semaphore'] = _semaphore.Semaphore
locals()['UnboundQueue'] = queue.UnboundQueue
class Failure(object):
......@@ -58,15 +60,14 @@ class IMapUnordered(Greenlet): # pylint:disable=undefined-variable
.. versionchanged:: 1.1b3
Added the *maxsize* parameter.
"""
from gevent.queue import Queue
super(IMapUnordered, self).__init__()
Greenlet.__init__(self) # pylint:disable=undefined-variable
self.spawn = spawn
self._zipped = _zipped
self.func = func
self.iterable = iterable
self.queue = Queue()
self._queue_get = self.queue.get
self._queue_put = self.queue.put
self.queue = UnboundQueue() # pylint:disable=undefined-variable
if maxsize:
# Bounding the queue is not enough if we want to keep from
# accumulating objects; the result value will be around as
......@@ -109,7 +110,7 @@ class IMapUnordered(Greenlet): # pylint:disable=undefined-variable
next = __next__ # Py2
def _inext(self):
return self._queue_get()
return self.queue.get()
def _ispawn(self, func, item, item_index):
if self._result_semaphore is not None:
......@@ -149,12 +150,12 @@ class IMapUnordered(Greenlet): # pylint:disable=undefined-variable
put_finished = True
if greenlet.successful():
self._queue_put(self._iqueue_value_for_success(greenlet))
self.queue.put(self._iqueue_value_for_success(greenlet))
else:
self._queue_put(self._iqueue_value_for_failure(greenlet))
self.queue.put(self._iqueue_value_for_failure(greenlet))
if put_finished:
self._queue_put(self._iqueue_value_for_self_finished())
self.queue.put(self._iqueue_value_for_self_finished())
def _on_finish(self, exception):
# Called in this greenlet.
......@@ -163,12 +164,12 @@ class IMapUnordered(Greenlet): # pylint:disable=undefined-variable
if exception is not None:
self.finished = True
self._queue_put(self._iqueue_value_for_self_failure(exception))
self.queue.put(self._iqueue_value_for_self_failure(exception))
return
if self._outstanding_tasks <= 0:
self.finished = True
self._queue_put(self._iqueue_value_for_self_finished())
self.queue.put(self._iqueue_value_for_self_finished())
def _iqueue_value_for_success(self, greenlet):
return greenlet.value
......@@ -202,7 +203,7 @@ class IMap(IMapUnordered):
except KeyError:
# Wait for our index to finish.
while 1:
index, value = self._queue_get()
index, value = self.queue.get()
if index == self.index:
break
else:
......
cimport cython
from gevent.__waiter cimport Waiter
from gevent._event cimport Event
cdef _heappush
cdef _heappop
cdef _heapify
@cython.final
cdef _safe_remove(deq, item)
@cython.final
@cython.internal
cdef class ItemWaiter(Waiter):
cdef readonly item
cdef readonly queue
cdef class Queue:
cdef readonly hub
cdef readonly queue
cdef Py_ssize_t _maxsize
cdef getters
cdef putters
cdef _event_unlock
cpdef _get(self)
cpdef _put(self, item)
cpdef _peek(self)
cpdef Py_ssize_t qsize(self)
cpdef bint empty(self)
cpdef bint full(self)
cpdef put(self, item, block=*, timeout=*)
cpdef put_nowait(self, item)
cdef __get_or_peek(self, method, block, timeout)
cpdef get(self, block=*, timeout=*)
cpdef get_nowait(self)
cpdef peek(self, block=*, timeout=*)
cpdef peek_nowait(self)
cdef _schedule_unlock(self)
@cython.final
cdef class UnboundQueue(Queue):
pass
cdef class PriorityQueue(Queue):
pass
cdef class LifoQueue(Queue):
pass
cdef class JoinableQueue(Queue):
cdef Event _cond
cdef readonly int unfinished_tasks
cdef class Channel:
cdef readonly getters
cdef readonly putters
cdef readonly hub
cdef _event_unlock
cpdef get(self, block=*, timeout=*)
cpdef get_nowait(self)
cdef _schedule_unlock(self)
......@@ -70,8 +70,8 @@ __imports__.extend(__py3_imports__)
import time
import sys
from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import ConcurrentObjectUseError
from gevent._hub_local import get_hub_noargs as get_hub
from gevent.exceptions import ConcurrentObjectUseError
from gevent.timeout import Timeout
from gevent._compat import string_types, integer_types, PY3
from gevent._util import copy_globals
......
# -*- coding: utf-8 -*-
# copyright 2018 gevent
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False
"""
Low-level waiting primitives.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
from gevent._hub_local import get_hub_noargs as get_hub
from gevent.exceptions import ConcurrentObjectUseError
__all__ = [
'Waiter',
]
_NONE = object()
locals()['getcurrent'] = __import__('greenlet').getcurrent
locals()['greenlet_init'] = lambda: None
class Waiter(object):
"""
A low level communication utility for greenlets.
Waiter is a wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them somewhat safer:
* switching will occur only if the waiting greenlet is executing :meth:`get` method currently;
* any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
* if :meth:`switch`/:meth:`throw` is called before the receiver calls :meth:`get`, then :class:`Waiter`
will store the value/exception. The following :meth:`get` will return the value/raise the exception.
The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet.
The :meth:`get` method must be called from a greenlet other than :class:`Hub`.
>>> result = Waiter()
>>> timer = get_hub().loop.timer(0.1)
>>> timer.start(result.switch, 'hello from Waiter')
>>> result.get() # blocks for 0.1 seconds
'hello from Waiter'
>>> timer.close()
If switch is called before the greenlet gets a chance to call :meth:`get` then
:class:`Waiter` stores the value.
>>> result = Waiter()
>>> timer = get_hub().loop.timer(0.1)
>>> timer.start(result.switch, 'hi from Waiter')
>>> sleep(0.2)
>>> result.get() # returns immediately without blocking
'hi from Waiter'
>>> timer.close()
.. warning::
This a limited and dangerous way to communicate between
greenlets. It can easily leave a greenlet unscheduled forever
if used incorrectly. Consider using safer classes such as
:class:`gevent.event.Event`, :class:`gevent.event.AsyncResult`,
or :class:`gevent.queue.Queue`.
"""
__slots__ = ['hub', 'greenlet', 'value', '_exception']
def __init__(self, hub=None):
self.hub = get_hub() if hub is None else hub
self.greenlet = None
self.value = None
self._exception = _NONE
def clear(self):
self.greenlet = None
self.value = None
self._exception = _NONE
def __str__(self):
if self._exception is _NONE:
return '<%s greenlet=%s>' % (type(self).__name__, self.greenlet)
if self._exception is None:
return '<%s greenlet=%s value=%r>' % (type(self).__name__, self.greenlet, self.value)
return '<%s greenlet=%s exc_info=%r>' % (type(self).__name__, self.greenlet, self.exc_info)
def ready(self):
"""Return true if and only if it holds a value or an exception"""
return self._exception is not _NONE
def successful(self):
"""Return true if and only if it is ready and holds a value"""
return self._exception is None
@property
def exc_info(self):
"Holds the exception info passed to :meth:`throw` if :meth:`throw` was called. Otherwise ``None``."
if self._exception is not _NONE:
return self._exception
def switch(self, value=None):
"""Switch to the greenlet if one's available. Otherwise store the value."""
greenlet = self.greenlet
if greenlet is None:
self.value = value
self._exception = None
else:
if getcurrent() is not self.hub: # pylint:disable=undefined-variable
raise AssertionError("Can only use Waiter.switch method from the Hub greenlet")
switch = greenlet.switch
try:
switch(value)
except: # pylint:disable=bare-except
self.hub.handle_error(switch, *sys.exc_info())
def switch_args(self, *args):
return self.switch(args)
def throw(self, *throw_args):
"""Switch to the greenlet with the exception. If there's no greenlet, store the exception."""
greenlet = self.greenlet
if greenlet is None:
self._exception = throw_args
else:
if getcurrent() is not self.hub: # pylint:disable=undefined-variable
raise AssertionError("Can only use Waiter.switch method from the Hub greenlet")
throw = greenlet.throw
try:
throw(*throw_args)
except: # pylint:disable=bare-except
self.hub.handle_error(throw, *sys.exc_info())
def get(self):
"""If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
if self._exception is not _NONE:
if self._exception is None:
return self.value
else:
getcurrent().throw(*self._exception) # pylint:disable=undefined-variable
else:
if self.greenlet is not None:
raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
self.greenlet = getcurrent() # pylint:disable=undefined-variable
try:
return self.hub.switch()
finally:
self.greenlet = None
def __call__(self, source):
if source.exception is None:
self.switch(source.value)
else:
self.throw(source.exception)
# can also have a debugging version, that wraps the value in a tuple (self, value) in switch()
# and unwraps it in wait() thus checking that switch() was indeed called
class MultipleWaiter(Waiter):
"""
An internal extension of Waiter that can be used if multiple objects
must be waited on, and there is a chance that in between waits greenlets
might be switched out. All greenlets that switch to this waiter
will have their value returned.
This does not handle exceptions or throw methods.
"""
__slots__ = ['_values']
def __init__(self, hub=None):
Waiter.__init__(self, hub)
# we typically expect a relatively small number of these to be outstanding.
# since we pop from the left, a deque might be slightly
# more efficient, but since we're in the hub we avoid imports if
# we can help it to better support monkey-patching, and delaying the import
# here can be impractical (see https://github.com/gevent/gevent/issues/652)
self._values = list()
def switch(self, value): # pylint:disable=signature-differs
self._values.append(value)
Waiter.switch(self, True)
def get(self):
if not self._values:
Waiter.get(self)
Waiter.clear(self)
return self._values.pop(0)
def _init():
greenlet_init() # pylint:disable=undefined-variable
_init()
from gevent._util import import_c_accel
import_c_accel(globals(), 'gevent.__waiter')
# -*- coding: utf-8 -*-
# copyright 2018 gevent
"""
Exceptions.
.. versionadded:: 1.3b1
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
__all__ = [
'LoopExit',
]
class LoopExit(Exception):
"""
Exception thrown when the hub finishes running.
In a normal application, this is never thrown or caught
explicitly. The internal implementation of functions like
:func:`join` and :func:`joinall` may catch it, but user code
generally should not.
.. caution::
Errors in application programming can also lead to this exception being
raised. Some examples include (but are not limited too):
- greenlets deadlocking on a lock;
- using a socket or other gevent object with native thread
affinity from a different thread
"""
class BlockingSwitchOutError(AssertionError):
pass
class InvalidSwitchError(AssertionError):
pass
class ConcurrentObjectUseError(AssertionError):
# raised when an object is used (waited on) by two greenlets
# independently, meaning the object was entered into a blocking
# state by one greenlet and then another while still blocking in the
# first one
pass
......@@ -17,9 +17,6 @@ from gevent._tblib import load_traceback
from gevent.hub import GreenletExit
from gevent.hub import InvalidSwitchError
from gevent.hub import Waiter
from gevent.hub import _threadlocal
from gevent.hub import get_hub_class
from gevent.hub import iwait
from gevent.hub import wait
......@@ -28,6 +25,8 @@ from gevent.timeout import Timeout
from gevent._config import config as GEVENT_CONFIG
from gevent._util import Lazy
from gevent._util import readproperty
from gevent._hub_local import get_hub_noargs as get_hub
from gevent import _waiter
__all__ = [
......@@ -37,23 +36,13 @@ __all__ = [
]
# In Cython, we define these as 'cdef inline' functions. The
# compilation unit cannot have a direct assignment to them (import
# is assignment) without generating a 'lvalue is not valid target'
# error.
locals()['getcurrent'] = __import__('greenlet').getcurrent
locals()['greenlet_init'] = lambda: None
def get_hub():
# This is identical to gevent.hub._get_hub_noargs so that it
# can be inlined for greenlet spawning by cython.
# It is also cimported in semaphore.pxd
hub = _threadlocal.hub
if hub is None:
hubtype = get_hub_class()
hub = _threadlocal.hub = hubtype()
return hub
locals()['Waiter'] = _waiter.Waiter
if _PYPY:
......@@ -649,7 +638,7 @@ class Greenlet(greenlet):
if self.dead:
self.__handle_death_before_start((exception,))
else:
waiter = Waiter() if block else None
waiter = Waiter() if block else None # pylint:disable=undefined-variable
self.parent.loop.run_callback(_kill, self, exception, waiter)
if block:
waiter.get()
......@@ -966,7 +955,7 @@ def killall(greenlets, exception=GreenletExit, block=True, timeout=None):
return
loop = greenlets[0].loop
if block:
waiter = Waiter()
waiter = Waiter() # pylint:disable=undefined-variable
loop.run_callback(_killall3, greenlets, exception, waiter)
t = Timeout._start_new_or_dummy(timeout)
try:
......
......@@ -6,7 +6,7 @@ from __future__ import absolute_import, print_function
# XXX: FIXME: Refactor to make this smaller
# pylint:disable=too-many-lines
from functools import partial as _functools_partial
import os
import sys
import traceback
from weakref import ref as wref
......@@ -31,7 +31,6 @@ __all__ = [
]
from gevent._config import config as GEVENT_CONFIG
from gevent._compat import string_types
from gevent._compat import xrange
from gevent._compat import thread_mod_name
from gevent._util import _NONE
......@@ -40,65 +39,27 @@ from gevent._util import Lazy
from gevent._util import gmctime
from gevent._ident import IdentRegistry
from gevent.monkey import get_original
from gevent._hub_local import get_hub
from gevent._hub_local import get_loop
from gevent._hub_local import set_hub
from gevent._hub_local import set_loop
from gevent._hub_local import get_hub_if_exists as _get_hub
from gevent._hub_local import get_hub_noargs as _get_hub_noargs
from gevent._hub_local import set_default_hub_class
from gevent.monkey import get_original
# These must be the "real" native thread versions,
# not monkey-patched.
class _Threadlocal(get_original(thread_mod_name, '_local')):
from gevent.exceptions import LoopExit
from gevent.exceptions import BlockingSwitchOutError
from gevent.exceptions import InvalidSwitchError
def __init__(self):
# Use a class with an initializer so that we can test
# for 'is None' instead of catching AttributeError, making
# the code cleaner and possibly solving some corner cases
# (like #687)
super(_Threadlocal, self).__init__()
self.Hub = None
self.loop = None
self.hub = None
_threadlocal = _Threadlocal()
from gevent._waiter import Waiter
from gevent._waiter import MultipleWaiter as _MultipleWaiter
get_thread_ident = get_original(thread_mod_name, 'get_ident')
MAIN_THREAD_IDENT = get_thread_ident() # XXX: Assuming import is done on the main thread.
class LoopExit(Exception):
"""
Exception thrown when the hub finishes running.
In a normal application, this is never thrown or caught
explicitly. The internal implementation of functions like
:func:`join` and :func:`joinall` may catch it, but user code
generally should not.
.. caution::
Errors in application programming can also lead to this exception being
raised. Some examples include (but are not limited too):
- greenlets deadlocking on a lock;
- using a socket or other gevent object with native thread
affinity from a different thread
"""
class BlockingSwitchOutError(AssertionError):
pass
class InvalidSwitchError(AssertionError):
pass
class ConcurrentObjectUseError(AssertionError):
# raised when an object is used (waited on) by two greenlets
# independently, meaning the object was entered into a blocking
# state by one greenlet and then another while still blocking in the
# first one
pass
class TrackedRawGreenlet(RawGreenlet):
def __init__(self, function, parent):
......@@ -201,7 +162,7 @@ def sleep(seconds=0, ref=True):
hub = _get_hub_noargs()
loop = hub.loop
if seconds <= 0:
waiter = Waiter()
waiter = Waiter(hub)
loop.run_callback(waiter.switch)
waiter.get()
else:
......@@ -387,56 +348,6 @@ def reinit():
#sleep(0.00001)
def get_hub_class():
"""Return the type of hub to use for the current thread.
If there's no type of hub for the current thread yet, 'gevent.hub.Hub' is used.
"""
hubtype = _threadlocal.Hub
if hubtype is None:
hubtype = _threadlocal.Hub = Hub
return hubtype
def get_hub(*args, **kwargs):
"""
Return the hub for the current thread.
If a hub does not exist in the current thread, a new one is
created of the type returned by :func:`get_hub_class`.
.. deprecated:: 1.3b1
The ``*args`` and ``**kwargs`` arguments are deprecated. They were
only used when the hub was created, and so were non-deterministic---to be
sure they were used, *all* callers had to pass them, or they were order-dependent.
Use ``set_hub`` instead.
"""
hub = _threadlocal.hub
if hub is None:
hubtype = get_hub_class()
hub = _threadlocal.hub = hubtype(*args, **kwargs)
return hub
def _get_hub_noargs():
# Just like get_hub, but cheaper to call because it
# takes no arguments or kwargs. See also a copy in
# gevent/greenlet.py
hub = _threadlocal.hub
if hub is None:
hubtype = get_hub_class()
hub = _threadlocal.hub = hubtype()
return hub
def _get_hub():
"""Return the hub for the current thread.
Return ``None`` if no hub has been created yet.
"""
return _threadlocal.hub
def set_hub(hub):
_threadlocal.hub = hub
class _dummy_greenlet(object):
......@@ -446,12 +357,6 @@ class _dummy_greenlet(object):
_dummy_greenlet = _dummy_greenlet()
def _config(default, envvar):
result = os.environ.get(envvar) or default # absolute import gets confused pylint: disable=no-member
if isinstance(result, string_types):
return result.split(',')
return result
hub_ident_registry = IdentRegistry()
class Hub(TrackedRawGreenlet):
......@@ -501,11 +406,11 @@ class Hub(TrackedRawGreenlet):
if default is not None:
raise TypeError("Unexpected argument: default")
self.loop = loop
elif _threadlocal.loop is not None:
elif get_loop() is not None:
# Reuse a loop instance previously set by
# destroying a hub without destroying the associated
# loop. See #237 and #238.
self.loop = _threadlocal.loop
self.loop = get_loop()
else:
if default is None and self.thread_ident != MAIN_THREAD_IDENT:
default = False
......@@ -539,7 +444,6 @@ class Hub(TrackedRawGreenlet):
"""
return self.thread_ident == MAIN_THREAD_IDENT
def __repr__(self):
if self.loop is None:
info = 'destroyed'
......@@ -768,7 +672,7 @@ class Hub(TrackedRawGreenlet):
if self.dead:
return True
waiter = Waiter()
waiter = Waiter(self)
if timeout is not None:
timeout = self.loop.timer(timeout, ref=False)
......@@ -798,18 +702,20 @@ class Hub(TrackedRawGreenlet):
if destroy_loop is None:
destroy_loop = not self.loop.default
if destroy_loop:
if _threadlocal.loop is self.loop:
if get_loop() is self.loop:
# Don't let anyone try to reuse this
_threadlocal.loop = None
set_loop(None)
self.loop.destroy()
else:
# Store in case another hub is created for this
# thread.
_threadlocal.loop = self.loop
set_loop(self.loop)
self.loop = None
if _threadlocal.hub is self:
_threadlocal.hub = None
if _get_hub() is self:
set_hub(None)
# XXX: We can probably simplify the resolver and threadpool properties.
......@@ -850,167 +756,7 @@ class Hub(TrackedRawGreenlet):
threadpool = property(_get_threadpool, _set_threadpool, _del_threadpool)
class Waiter(object):
"""
A low level communication utility for greenlets.
Waiter is a wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them somewhat safer:
* switching will occur only if the waiting greenlet is executing :meth:`get` method currently;
* any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
* if :meth:`switch`/:meth:`throw` is called before the receiver calls :meth:`get`, then :class:`Waiter`
will store the value/exception. The following :meth:`get` will return the value/raise the exception.
The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet.
The :meth:`get` method must be called from a greenlet other than :class:`Hub`.
>>> result = Waiter()
>>> timer = get_hub().loop.timer(0.1)
>>> timer.start(result.switch, 'hello from Waiter')
>>> result.get() # blocks for 0.1 seconds
'hello from Waiter'
>>> timer.close()
If switch is called before the greenlet gets a chance to call :meth:`get` then
:class:`Waiter` stores the value.
>>> result = Waiter()
>>> timer = get_hub().loop.timer(0.1)
>>> timer.start(result.switch, 'hi from Waiter')
>>> sleep(0.2)
>>> result.get() # returns immediately without blocking
'hi from Waiter'
>>> timer.close()
.. warning::
This a limited and dangerous way to communicate between
greenlets. It can easily leave a greenlet unscheduled forever
if used incorrectly. Consider using safer classes such as
:class:`gevent.event.Event`, :class:`gevent.event.AsyncResult`,
or :class:`gevent.queue.Queue`.
"""
__slots__ = ['hub', 'greenlet', 'value', '_exception']
def __init__(self, hub=None):
self.hub = _get_hub_noargs() if hub is None else hub
self.greenlet = None
self.value = None
self._exception = _NONE
def clear(self):
self.greenlet = None
self.value = None
self._exception = _NONE
def __str__(self):
if self._exception is _NONE:
return '<%s greenlet=%s>' % (type(self).__name__, self.greenlet)
if self._exception is None:
return '<%s greenlet=%s value=%r>' % (type(self).__name__, self.greenlet, self.value)
return '<%s greenlet=%s exc_info=%r>' % (type(self).__name__, self.greenlet, self.exc_info)
def ready(self):
"""Return true if and only if it holds a value or an exception"""
return self._exception is not _NONE
def successful(self):
"""Return true if and only if it is ready and holds a value"""
return self._exception is None
@property
def exc_info(self):
"Holds the exception info passed to :meth:`throw` if :meth:`throw` was called. Otherwise ``None``."
if self._exception is not _NONE:
return self._exception
def switch(self, value=None):
"""Switch to the greenlet if one's available. Otherwise store the value."""
greenlet = self.greenlet
if greenlet is None:
self.value = value
self._exception = None
else:
assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
switch = greenlet.switch
try:
switch(value)
except: # pylint:disable=bare-except
self.hub.handle_error(switch, *sys.exc_info())
def switch_args(self, *args):
return self.switch(args)
def throw(self, *throw_args):
"""Switch to the greenlet with the exception. If there's no greenlet, store the exception."""
greenlet = self.greenlet
if greenlet is None:
self._exception = throw_args
else:
assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
throw = greenlet.throw
try:
throw(*throw_args)
except: # pylint:disable=bare-except
self.hub.handle_error(throw, *sys.exc_info())
def get(self):
"""If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
if self._exception is not _NONE:
if self._exception is None:
return self.value
else:
getcurrent().throw(*self._exception)
else:
if self.greenlet is not None:
raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
self.greenlet = getcurrent()
try:
return self.hub.switch()
finally:
self.greenlet = None
def __call__(self, source):
if source.exception is None:
self.switch(source.value)
else:
self.throw(source.exception)
# can also have a debugging version, that wraps the value in a tuple (self, value) in switch()
# and unwraps it in wait() thus checking that switch() was indeed called
class _MultipleWaiter(Waiter):
"""
An internal extension of Waiter that can be used if multiple objects
must be waited on, and there is a chance that in between waits greenlets
might be switched out. All greenlets that switch to this waiter
will have their value returned.
This does not handle exceptions or throw methods.
"""
__slots__ = ['_values']
def __init__(self, *args, **kwargs):
Waiter.__init__(self, *args, **kwargs)
# we typically expect a relatively small number of these to be outstanding.
# since we pop from the left, a deque might be slightly
# more efficient, but since we're in the hub we avoid imports if
# we can help it to better support monkey-patching, and delaying the import
# here can be impractical (see https://github.com/gevent/gevent/issues/652)
self._values = list()
def switch(self, value): # pylint:disable=signature-differs
self._values.append(value)
Waiter.switch(self, True)
def get(self):
if not self._values:
Waiter.get(self)
Waiter.clear(self)
return self._values.pop(0)
set_default_hub_class(Hub)
def iwait(objects, timeout=None, count=None):
......@@ -1036,16 +782,17 @@ def iwait(objects, timeout=None, count=None):
in between items yielded by this function.
"""
# QQQ would be nice to support iterable here that can be generated slowly (why?)
hub = _get_hub_noargs()
if objects is None:
yield _get_hub_noargs().join(timeout=timeout)
yield hub.join(timeout=timeout)
return
count = len(objects) if count is None else min(count, len(objects))
waiter = _MultipleWaiter()
waiter = _MultipleWaiter(hub)
switch = waiter.switch
if timeout is not None:
timer = _get_hub_noargs().loop.timer(timeout, priority=-1)
timer = hub.loop.timer(timeout, priority=-1)
timer.start(switch, _NONE)
try:
......
# Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details.
# copyright (c) 2018 gevent
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False
"""
Synchronized queues.
......@@ -27,7 +29,9 @@ class, not an instance or subclass).
from __future__ import absolute_import
import sys
import heapq
from heapq import heappush as _heappush
from heapq import heappop as _heappop
from heapq import heapify as _heapify
import collections
if sys.version_info[0] == 2:
......@@ -38,10 +42,9 @@ Full = __queue__.Full
Empty = __queue__.Empty
from gevent.timeout import Timeout
from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import Waiter
from gevent.hub import getcurrent
from gevent.hub import InvalidSwitchError
from gevent._hub_local import get_hub_noargs as get_hub
from greenlet import getcurrent
from gevent.exceptions import InvalidSwitchError
__implements__ = ['Queue', 'PriorityQueue', 'LifoQueue']
......@@ -61,12 +64,18 @@ def _safe_remove(deq, item):
except ValueError:
pass
import gevent._waiter
locals()['Waiter'] = gevent._waiter.Waiter
class ItemWaiter(Waiter):
__slots__ = ['item', 'queue']
class ItemWaiter(Waiter): # pylint:disable=undefined-variable
# pylint:disable=assigning-non-slot
__slots__ = (
'item',
'queue',
)
def __init__(self, item, queue):
Waiter.__init__(self)
Waiter.__init__(self) # pylint:disable=undefined-variable
self.item = item
self.queue = queue
......@@ -97,19 +106,26 @@ class Queue(object):
previously anyway, but that wasn't the case for PyPy.
"""
_warn_depth = 2
__slots__ = (
'_maxsize',
'getters',
'putters',
'hub',
'_event_unlock',
'queue',
)
def __init__(self, maxsize=None, items=None):
def __init__(self, maxsize=None, items=(), _warn_depth=2):
if maxsize is not None and maxsize <= 0:
self.maxsize = None
if maxsize == 0:
import warnings
warnings.warn(
'Queue(0) now equivalent to Queue(None); if you want a channel, use Channel',
DeprecationWarning,
stacklevel=self._warn_depth)
else:
self.maxsize = maxsize
stacklevel=_warn_depth)
maxsize = None
self._maxsize = maxsize if maxsize is not None else -1
# Explicitly maintain order for getters and putters that block
# so that callers can consistently rely on getting things out
# in the apparent order they went in. This was once required by
......@@ -126,23 +142,25 @@ class Queue(object):
self.putters = collections.deque()
self.hub = get_hub()
self._event_unlock = None
if items:
self._init(maxsize, items)
else:
self._init(maxsize)
self.queue = self._create_queue(items)
# QQQ make maxsize into a property with setter that schedules unlock if necessary
@property
def maxsize(self):
return self._maxsize if self._maxsize > 0 else None
@maxsize.setter
def maxsize(self, nv):
# QQQ make maxsize into a property with setter that schedules unlock if necessary
if nv is None or nv <= 0:
self._maxsize = -1
else:
self._maxsize = nv
def copy(self):
return type(self)(self.maxsize, self.queue)
def _init(self, maxsize, items=None):
# FIXME: Why is maxsize unused or even passed?
# pylint:disable=unused-argument
if items:
self.queue = collections.deque(items)
else:
self.queue = collections.deque()
def _create_queue(self, items=()):
return collections.deque(items)
def _get(self):
return self.queue.popleft()
......@@ -198,7 +216,12 @@ class Queue(object):
to return True for backwards compatibility.
"""
return True
__nonzero__ = __bool__
def __nonzero__(self):
# Py2.
# For Cython; __bool__ becomes a special method that we can't
# get by name.
return True
def empty(self):
"""Return ``True`` if the queue is empty, ``False`` otherwise."""
......@@ -209,7 +232,7 @@ class Queue(object):
``Queue(None)`` is never full.
"""
return self.maxsize is not None and self.qsize() >= self.maxsize
return self._maxsize > 0 and self.qsize() >= self._maxsize
def put(self, item, block=True, timeout=None):
"""Put an item into the queue.
......@@ -222,7 +245,7 @@ class Queue(object):
is immediately available, else raise the :class:`Full` exception (*timeout*
is ignored in that case).
"""
if self.maxsize is None or self.qsize() < self.maxsize:
if self._maxsize == -1 or self.qsize() < self._maxsize:
# there's a free slot, put an item right away
self._put(item)
if self.getters:
......@@ -230,10 +253,10 @@ class Queue(object):
elif self.hub is getcurrent():
# We're in the mainloop, so we cannot wait; we can switch to other greenlets though.
# Check if possible to get a free slot in the queue.
while self.getters and self.qsize() and self.qsize() >= self.maxsize:
while self.getters and self.qsize() and self.qsize() >= self._maxsize:
getter = self.getters.popleft()
getter.switch(getter)
if self.qsize() < self.maxsize:
if self.qsize() < self._maxsize:
self._put(item)
return
raise Full
......@@ -283,7 +306,7 @@ class Queue(object):
# to return. No choice...
raise Empty()
waiter = Waiter()
waiter = Waiter() # pylint:disable=undefined-variable
timeout = Timeout._start_new_or_dummy(timeout, Empty)
try:
self.getters.append(waiter)
......@@ -349,7 +372,7 @@ class Queue(object):
def _unlock(self):
while True:
repeat = False
if self.putters and (self.maxsize is None or self.qsize() < self.maxsize):
if self.putters and (self._maxsize == -1 or self.qsize() < self._maxsize):
repeat = True
try:
putter = self.putters.popleft()
......@@ -372,16 +395,31 @@ class Queue(object):
def __iter__(self):
return self
def next(self):
def __next__(self):
result = self.get()
if result is StopIteration:
raise result
return result
__next__ = next
next = __next__ # Py2
class UnboundQueue(Queue):
# A specialization of Queue that knows it can never
# be bound. Changing its maxsize has no effect.
__slots__ = ()
def __init__(self, maxsize=None, items=()):
if maxsize is not None:
raise ValueError("UnboundQueue has no maxsize")
Queue.__init__(self, maxsize, items)
self.putters = None # Will never be used.
def put(self, item, block=True, timeout=None):
self._put(item)
if self.getters:
self._schedule_unlock()
class PriorityQueue(Queue):
......@@ -395,30 +433,27 @@ class PriorityQueue(Queue):
Previously it was just assumed that they were already a heap.
'''
def _init(self, maxsize, items=None):
if items:
self.queue = list(items)
heapq.heapify(self.queue)
else:
self.queue = []
__slots__ = ()
def _create_queue(self, items=()):
q = list(items)
_heapify(q)
return q
def _put(self, item, heappush=heapq.heappush):
# pylint:disable=arguments-differ
heappush(self.queue, item)
def _put(self, item):
_heappush(self.queue, item)
def _get(self, heappop=heapq.heappop):
# pylint:disable=arguments-differ
return heappop(self.queue)
def _get(self):
return _heappop(self.queue)
class LifoQueue(Queue):
'''A subclass of :class:`Queue` that retrieves most recently added entries first.'''
def _init(self, maxsize, items=None):
if items:
self.queue = list(items)
else:
self.queue = []
__slots__ = ()
def _create_queue(self, items=()):
return list(items)
def _put(self, item):
self.queue.append(item)
......@@ -436,9 +471,12 @@ class JoinableQueue(Queue):
:meth:`task_done` and :meth:`join` methods.
"""
_warn_depth = 3
__slots__ = (
'_cond',
'unfinished_tasks',
)
def __init__(self, maxsize=None, items=None, unfinished_tasks=None):
def __init__(self, maxsize=None, items=(), unfinished_tasks=None):
"""
.. versionchanged:: 1.1a1
......@@ -446,8 +484,9 @@ class JoinableQueue(Queue):
(if any) will be considered unfinished.
"""
Queue.__init__(self, maxsize, items, _warn_depth=3)
from gevent.event import Event
Queue.__init__(self, maxsize, items)
self._cond = Event()
self._cond.set()
......@@ -514,6 +553,13 @@ class JoinableQueue(Queue):
class Channel(object):
__slots__ = (
'getters',
'putters',
'hub',
'_event_unlock',
)
def __init__(self, maxsize=1):
# We take maxsize to simplify certain kinds of code
if maxsize != 1:
......@@ -561,7 +607,7 @@ class Channel(object):
if not block:
timeout = 0
waiter = Waiter()
waiter = Waiter() # pylint:disable=undefined-variable
item = (item, waiter)
self.putters.append(item)
timeout = Timeout._start_new_or_dummy(timeout, Full)
......@@ -590,7 +636,7 @@ class Channel(object):
if not block:
timeout = 0
waiter = Waiter()
waiter = Waiter() # pylint:disable=undefined-variable
timeout = Timeout._start_new_or_dummy(timeout, Empty)
try:
self.getters.append(waiter)
......@@ -620,10 +666,13 @@ class Channel(object):
def __iter__(self):
return self
def next(self):
def __next__(self):
result = self.get()
if result is StopIteration:
raise result
return result
__next__ = next # py3
next = __next__ # Py2
from gevent._util import import_c_accel
import_c_accel(globals(), 'gevent._queue')
......@@ -101,7 +101,8 @@ main = unittest.main
from greentest.hub import QuietHub
import gevent.hub
gevent.hub.Hub = QuietHub
gevent.hub.set_default_hub_class(QuietHub)
from greentest.sockets import bind_and_listen
......
......@@ -42,6 +42,10 @@ NO_ALL = [
'gevent._patcher',
]
ALLOW_IMPLEMENTS = [
'gevent._queue',
]
# A list of modules that may contain things that aren't actually, technically,
# extensions, but that need to be in __extensions__ anyway due to the way,
# for example, monkey patching, needs to work.
......@@ -76,6 +80,8 @@ class Test(unittest.TestCase):
def check_implements_presence_justified(self):
"Check that __implements__ is present only if the module is modeled after a module from stdlib (like gevent.socket)."
if self.modname in ALLOW_IMPLEMENTS:
return
if self.__implements__ is not None and self.stdlib_module is None:
raise AssertionError('%r has __implements__ but no stdlib counterpart (%s)'
% (self.modname, self.stdlib_name))
......
......@@ -31,7 +31,9 @@ class TestDestroyDefaultLoop(unittest.TestCase):
# crash only happened when that greenlet object
# was collected at exit time, which was most common
# in CPython 3.5)
del gevent.hub._threadlocal.hub
from gevent._hub_local import set_hub
set_hub(None)
def test_destroy_two(self):
......
......@@ -13,6 +13,7 @@ import greentest
import gevent
from gevent import util
from gevent import local
from greenlet import getcurrent
from gevent._compat import NativeStrIO
......@@ -84,10 +85,12 @@ class TestTree(greentest.TestCase):
assert l
def s(f):
str(getcurrent())
g = gevent.spawn(f)
# Access this in spawning order for consistent sorting
# at print time in the test case.
getattr(g, 'minimal_ident')
str(g)
return g
def t1():
......@@ -101,6 +104,7 @@ class TestTree(greentest.TestCase):
return g
s1 = s(t2)
#self.assertEqual(0, s1.minimal_ident)
s1.join()
glets.append(s(t2))
......@@ -130,6 +134,7 @@ class TestTree(greentest.TestCase):
value = value.replace('test__util', '__main__')
value = re.compile(' fileno=.').sub('', value)
value = value.replace('ref=-1', 'ref=0')
value = value.replace("type.current_tree", 'GreenletTree.current_tree')
return value
@greentest.ignores_leakcheck
......@@ -149,23 +154,23 @@ class TestTree(greentest.TestCase):
: {'foo': 42}
+--- <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <greenlet.greenlet object at X>
+--- <Greenlet "Greenlet-1" at X: _run>; finished with value <Greenlet "CustomName-0" at 0x
+--- <Greenlet "Greenlet-1" at X: t2>; finished with value <Greenlet "CustomName-0" at 0x
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
| +--- <Greenlet "CustomName-0" at X: _run>; finished with exception ExpectedException()
| +--- <Greenlet "CustomName-0" at X: t1>; finished with exception ExpectedException()
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
+--- <Greenlet "Greenlet-2" at X: _run>; finished with value <Greenlet "CustomName-4" at 0x
+--- <Greenlet "Greenlet-2" at X: t2>; finished with value <Greenlet "CustomName-4" at 0x
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
| +--- <Greenlet "CustomName-4" at X: _run>; finished with exception ExpectedException()
| +--- <Greenlet "CustomName-4" at X: t1>; finished with exception ExpectedException()
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
+--- <Greenlet "Greenlet-3" at X: _run>; finished with value <Greenlet "Greenlet-5" at X
+--- <Greenlet "Greenlet-3" at X: t3>; finished with value <Greenlet "Greenlet-5" at X
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Spawn Tree Locals
: {'stl': 'STL'}
| +--- <Greenlet "Greenlet-5" at X: _run>; finished with value <Greenlet "CustomName-6" at 0x
| +--- <Greenlet "Greenlet-5" at X: t2>; finished with value <Greenlet "CustomName-6" at 0x
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
| +--- <Greenlet "CustomName-6" at X: _run>; finished with exception ExpectedException()
| +--- <Greenlet "CustomName-6" at X: t1>; finished with exception ExpectedException()
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
+--- <Greenlet "Greenlet-7" at X: _run>; finished with value <gevent.util.GreenletTree obje
+--- <Greenlet "Greenlet-7" at X: <bound method GreenletTree.current_tree of <class 'gevent.util.GreenletTree'>>>; finished with value <gevent.util.GreenletTree obje
Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
""".strip()
self.assertEqual(expected, value)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment