Commit 99541fd6 authored by Jason Madden's avatar Jason Madden

Compile gevent.queue and gevent.hub.waiter with Cython

This gives massive performance benefits to queues:

| Benchmark                              | 27_queue_master | 27_queue_cython2             |
|----------------------------------------|-----------------|------------------------------|
| bench_unbounded_queue_noblock          | 2.09 us         | 622 ns: 3.37x faster (-70%)  |
| bench_bounded_queue_noblock            | 2.55 us         | 634 ns: 4.02x faster (-75%)  |
| bench_bounded_queue_block              | 36.1 us         | 7.29 us: 4.95x faster (-80%) |
| bench_channel                          | 15.4 us         | 6.40 us: 2.40x faster (-58%) |
| bench_bounded_queue_block_hub          | 13.6 us         | 3.89 us: 3.48x faster (-71%) |
| bench_channel_hub                      | 7.55 us         | 3.38 us: 2.24x faster (-55%) |
| bench_unbounded_priority_queue_noblock | 5.02 us         | 3.18 us: 1.58x faster (-37%) |
| bench_bounded_priority_queue_noblock   | 5.48 us         | 3.22 us: 1.70x faster (-41%) |

In a "real" use caes (pool.imap) it shows up as a 10-20% improvement:

| Benchmark          | 36_pool_event5 | 36_pool_ubq_cython          |
|--------------------|----------------|-----------------------------|
| imap_unordered_seq | 553 us         | 461 us: 1.20x faster (-17%) |
| imap_unordered_par | 301 us         | 265 us: 1.14x faster (-12%) |
| imap_seq           | 587 us         | 497 us: 1.18x faster (-15%) |
| imap_par           | 326 us         | 275 us: 1.19x faster (-16%) |
| spawn              | 310 us         | 284 us: 1.09x faster (-8%)  |

Not significant (3): map_seq; map_par; apply
parent b61f9e91
...@@ -13,6 +13,9 @@ src/gevent/greenlet.c ...@@ -13,6 +13,9 @@ src/gevent/greenlet.c
src/gevent/_ident.c src/gevent/_ident.c
src/gevent/_imap.c src/gevent/_imap.c
src/gevent/event.c src/gevent/event.c
src/gevent/_hub_local.c
src/gevent/_waiter.c
src/gevent/queue.c
src/gevent/libev/corecext.c src/gevent/libev/corecext.c
src/gevent/libev/corecext.h src/gevent/libev/corecext.h
src/gevent/libev/_corecffi.c src/gevent/libev/_corecffi.c
......
...@@ -50,7 +50,8 @@ Enhancements ...@@ -50,7 +50,8 @@ Enhancements
reducing the overhead of ``[Thread]Pool.imap``. reducing the overhead of ``[Thread]Pool.imap``.
- The classes `gevent.event.Event` and `gevent.event.AsyncResult` - The classes `gevent.event.Event` and `gevent.event.AsyncResult`
are compiled with Cython for improved performance. Please report any are compiled with Cython for improved performance, as is the
``gevent.queue`` module and ``gevent.hub.Waiter``. Please report any
compatibility issues. compatibility issues.
Monitoring and Debugging Monitoring and Debugging
......
...@@ -20,9 +20,9 @@ def _b_no_block(q): ...@@ -20,9 +20,9 @@ def _b_no_block(q):
for i in range(N): for i in range(N):
j = q.get() j = q.get()
assert i == j assert i == j, (i, j)
def bench_unbounded_queue_noblock(kind=queue.Queue): def bench_unbounded_queue_noblock(kind=queue.UnboundQueue):
_b_no_block(kind()) _b_no_block(kind())
def bench_bounded_queue_noblock(kind=queue.Queue): def bench_bounded_queue_noblock(kind=queue.Queue):
......
...@@ -104,6 +104,21 @@ EVENT = Extension(name="gevent._event", ...@@ -104,6 +104,21 @@ EVENT = Extension(name="gevent._event",
depends=['src/gevent/_event.pxd'], depends=['src/gevent/_event.pxd'],
include_dirs=include_dirs) include_dirs=include_dirs)
QUEUE = Extension(name="gevent._queue",
sources=["src/gevent/queue.py"],
depends=['src/gevent/_queue.pxd'],
include_dirs=include_dirs)
HUB_LOCAL = Extension(name="gevent.__hub_local",
sources=["src/gevent/_hub_local.py"],
depends=['src/gevent/__hub_local.pxd'],
include_dirs=include_dirs)
WAITER = Extension(name="gevent.__waiter",
sources=["src/gevent/_waiter.py"],
depends=['src/gevent/__waiter.pxd'],
include_dirs=include_dirs)
_to_cythonize = [ _to_cythonize = [
SEMAPHORE, SEMAPHORE,
...@@ -112,6 +127,9 @@ _to_cythonize = [ ...@@ -112,6 +127,9 @@ _to_cythonize = [
IDENT, IDENT,
IMAP, IMAP,
EVENT, EVENT,
QUEUE,
HUB_LOCAL,
WAITER,
] ]
EXT_MODULES = [ EXT_MODULES = [
...@@ -123,6 +141,9 @@ EXT_MODULES = [ ...@@ -123,6 +141,9 @@ EXT_MODULES = [
IDENT, IDENT,
IMAP, IMAP,
EVENT, EVENT,
QUEUE,
HUB_LOCAL,
WAITER,
] ]
LIBEV_CFFI_MODULE = 'src/gevent/libev/_corecffi_build.py:ffi' LIBEV_CFFI_MODULE = 'src/gevent/libev/_corecffi_build.py:ffi'
...@@ -191,6 +212,14 @@ if PYPY: ...@@ -191,6 +212,14 @@ if PYPY:
EXT_MODULES.remove(EVENT) EXT_MODULES.remove(EVENT)
_to_cythonize.remove(EVENT) _to_cythonize.remove(EVENT)
EXT_MODULES.remove(QUEUE)
_to_cythonize.remove(QUEUE)
EXT_MODULES.remove(HUB_LOCAL)
_to_cythonize.remove(HUB_LOCAL)
EXT_MODULES.remove(WAITER)
_to_cythonize.remove(WAITER)
for mod in _to_cythonize: for mod in _to_cythonize:
EXT_MODULES.remove(mod) EXT_MODULES.remove(mod)
......
cdef _threadlocal
cpdef get_hub_class()
cpdef get_hub_if_exists()
cpdef set_hub(hub)
cpdef get_loop()
cpdef set_loop(loop)
# XXX: TODO: Move the definition of TrackedRawGreenlet
# into a file that can be cython compiled so get_hub can
# return that.
cpdef get_hub_noargs()
cimport cython cimport cython
from gevent._greenlet cimport Greenlet from gevent._greenlet cimport Greenlet
from gevent.__semaphore cimport Semaphore from gevent.__semaphore cimport Semaphore
from gevent._queue cimport UnboundQueue
@cython.freelist(100) @cython.freelist(100)
@cython.internal
@cython.final
cdef class Failure: cdef class Failure:
cdef readonly exc cdef readonly exc
cdef raise_exception cdef raise_exception
...@@ -17,10 +20,8 @@ cdef class IMapUnordered(Greenlet): ...@@ -17,10 +20,8 @@ cdef class IMapUnordered(Greenlet):
cdef Semaphore _result_semaphore cdef Semaphore _result_semaphore
cdef int _outstanding_tasks cdef int _outstanding_tasks
cdef int _max_index cdef int _max_index
cdef _queue_get
cdef _queue_put
cdef readonly queue cdef readonly UnboundQueue queue
cdef readonly bint finished cdef readonly bint finished
cdef _inext(self) cdef _inext(self)
......
# cython: auto_pickle=False # cython: auto_pickle=False
from gevent.__hub_local cimport get_hub_noargs as get_hub
cdef Timeout cdef Timeout
from _greenlet cimport get_hub
cdef bint _greenlet_imported cdef bint _greenlet_imported
......
cimport cython
cdef sys
cdef ConcurrentObjectUseError
from gevent.__hub_local cimport get_hub_noargs as get_hub
cdef bint _greenlet_imported
cdef extern from "greenlet/greenlet.h":
ctypedef class greenlet.greenlet [object PyGreenlet]:
pass
# These are actually macros and so much be included
# (defined) in each .pxd, as are the two functions
# that call them.
greenlet PyGreenlet_GetCurrent()
void PyGreenlet_Import()
cdef inline greenlet getcurrent():
return PyGreenlet_GetCurrent()
cdef inline void greenlet_init():
global _greenlet_imported
if not _greenlet_imported:
PyGreenlet_Import()
_greenlet_imported = True
cdef class Waiter:
cdef readonly hub
cdef readonly greenlet
cdef readonly value
cdef _exception
@cython.final
@cython.internal
cdef class MultipleWaiter(Waiter):
cdef list _values
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
cimport cython cimport cython
from gevent.__ident cimport IdentRegistry from gevent.__ident cimport IdentRegistry
from gevent.__hub_local cimport get_hub_noargs as get_hub
cdef bint _greenlet_imported cdef bint _greenlet_imported
cdef bint _PYPY cdef bint _PYPY
cdef sys_getframe cdef sys_getframe
...@@ -125,11 +126,6 @@ cdef class Greenlet(greenlet): ...@@ -125,11 +126,6 @@ cdef class Greenlet(greenlet):
# cpdef _raise_exception(self) # cpdef _raise_exception(self)
@cython.final
cdef greenlet get_hub()
# XXX: TODO: Move the definition of TrackedRawGreenlet
# into a file that can be cython compiled so get_hub can
# return that.
# Declare a bunch of imports as cdefs so they can # Declare a bunch of imports as cdefs so they can
# be accessed directly as static vars without # be accessed directly as static vars without
......
# -*- coding: utf-8 -*-
# copyright 2018 gevent. See LICENSE
"""
Maintains the thread local hub.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from gevent.monkey import get_original
from gevent._compat import thread_mod_name
__all__ = [
'get_hub',
'get_hub_noargs',
'get_hub_if_exists',
]
# These must be the "real" native thread versions,
# not monkey-patched.
class _Threadlocal(get_original(thread_mod_name, '_local')):
def __init__(self):
# Use a class with an initializer so that we can test
# for 'is None' instead of catching AttributeError, making
# the code cleaner and possibly solving some corner cases
# (like #687)
super(_Threadlocal, self).__init__()
self.Hub = None
self.loop = None
self.hub = None
_threadlocal = _Threadlocal()
Hub = None # Set when gevent.hub is imported
def get_hub_class():
"""Return the type of hub to use for the current thread.
If there's no type of hub for the current thread yet, 'gevent.hub.Hub' is used.
"""
hubtype = _threadlocal.Hub
if hubtype is None:
hubtype = _threadlocal.Hub = Hub
return hubtype
def set_default_hub_class(hubtype):
global Hub
Hub = hubtype
def get_hub(*args, **kwargs):
"""
Return the hub for the current thread.
If a hub does not exist in the current thread, a new one is
created of the type returned by :func:`get_hub_class`.
.. deprecated:: 1.3b1
The ``*args`` and ``**kwargs`` arguments are deprecated. They were
only used when the hub was created, and so were non-deterministic---to be
sure they were used, *all* callers had to pass them, or they were order-dependent.
Use ``set_hub`` instead.
"""
hub = _threadlocal.hub
if hub is None:
hubtype = get_hub_class()
hub = _threadlocal.hub = hubtype(*args, **kwargs)
return hub
def get_hub_noargs():
# Just like get_hub, but cheaper to call because it
# takes no arguments or kwargs. See also a copy in
# gevent/greenlet.py
hub = _threadlocal.hub
if hub is None:
hubtype = get_hub_class()
hub = _threadlocal.hub = hubtype()
return hub
def get_hub_if_exists():
"""Return the hub for the current thread.
Return ``None`` if no hub has been created yet.
"""
return _threadlocal.hub
def set_hub(hub):
_threadlocal.hub = hub
def get_loop():
return _threadlocal.loop
def set_loop(loop):
_threadlocal.loop = loop
from gevent._util import import_c_accel
import_c_accel(globals(), 'gevent.__hub_local')
...@@ -12,6 +12,7 @@ from __future__ import print_function ...@@ -12,6 +12,7 @@ from __future__ import print_function
from gevent import _semaphore from gevent import _semaphore
from gevent import queue
__all__ = [ __all__ = [
...@@ -21,6 +22,7 @@ __all__ = [ ...@@ -21,6 +22,7 @@ __all__ = [
locals()['Greenlet'] = __import__('gevent').Greenlet locals()['Greenlet'] = __import__('gevent').Greenlet
locals()['Semaphore'] = _semaphore.Semaphore locals()['Semaphore'] = _semaphore.Semaphore
locals()['UnboundQueue'] = queue.UnboundQueue
class Failure(object): class Failure(object):
...@@ -58,15 +60,14 @@ class IMapUnordered(Greenlet): # pylint:disable=undefined-variable ...@@ -58,15 +60,14 @@ class IMapUnordered(Greenlet): # pylint:disable=undefined-variable
.. versionchanged:: 1.1b3 .. versionchanged:: 1.1b3
Added the *maxsize* parameter. Added the *maxsize* parameter.
""" """
from gevent.queue import Queue Greenlet.__init__(self) # pylint:disable=undefined-variable
super(IMapUnordered, self).__init__()
self.spawn = spawn self.spawn = spawn
self._zipped = _zipped self._zipped = _zipped
self.func = func self.func = func
self.iterable = iterable self.iterable = iterable
self.queue = Queue() self.queue = UnboundQueue() # pylint:disable=undefined-variable
self._queue_get = self.queue.get
self._queue_put = self.queue.put
if maxsize: if maxsize:
# Bounding the queue is not enough if we want to keep from # Bounding the queue is not enough if we want to keep from
# accumulating objects; the result value will be around as # accumulating objects; the result value will be around as
...@@ -109,7 +110,7 @@ class IMapUnordered(Greenlet): # pylint:disable=undefined-variable ...@@ -109,7 +110,7 @@ class IMapUnordered(Greenlet): # pylint:disable=undefined-variable
next = __next__ # Py2 next = __next__ # Py2
def _inext(self): def _inext(self):
return self._queue_get() return self.queue.get()
def _ispawn(self, func, item, item_index): def _ispawn(self, func, item, item_index):
if self._result_semaphore is not None: if self._result_semaphore is not None:
...@@ -149,12 +150,12 @@ class IMapUnordered(Greenlet): # pylint:disable=undefined-variable ...@@ -149,12 +150,12 @@ class IMapUnordered(Greenlet): # pylint:disable=undefined-variable
put_finished = True put_finished = True
if greenlet.successful(): if greenlet.successful():
self._queue_put(self._iqueue_value_for_success(greenlet)) self.queue.put(self._iqueue_value_for_success(greenlet))
else: else:
self._queue_put(self._iqueue_value_for_failure(greenlet)) self.queue.put(self._iqueue_value_for_failure(greenlet))
if put_finished: if put_finished:
self._queue_put(self._iqueue_value_for_self_finished()) self.queue.put(self._iqueue_value_for_self_finished())
def _on_finish(self, exception): def _on_finish(self, exception):
# Called in this greenlet. # Called in this greenlet.
...@@ -163,12 +164,12 @@ class IMapUnordered(Greenlet): # pylint:disable=undefined-variable ...@@ -163,12 +164,12 @@ class IMapUnordered(Greenlet): # pylint:disable=undefined-variable
if exception is not None: if exception is not None:
self.finished = True self.finished = True
self._queue_put(self._iqueue_value_for_self_failure(exception)) self.queue.put(self._iqueue_value_for_self_failure(exception))
return return
if self._outstanding_tasks <= 0: if self._outstanding_tasks <= 0:
self.finished = True self.finished = True
self._queue_put(self._iqueue_value_for_self_finished()) self.queue.put(self._iqueue_value_for_self_finished())
def _iqueue_value_for_success(self, greenlet): def _iqueue_value_for_success(self, greenlet):
return greenlet.value return greenlet.value
...@@ -202,7 +203,7 @@ class IMap(IMapUnordered): ...@@ -202,7 +203,7 @@ class IMap(IMapUnordered):
except KeyError: except KeyError:
# Wait for our index to finish. # Wait for our index to finish.
while 1: while 1:
index, value = self._queue_get() index, value = self.queue.get()
if index == self.index: if index == self.index:
break break
else: else:
......
cimport cython
from gevent.__waiter cimport Waiter
from gevent._event cimport Event
cdef _heappush
cdef _heappop
cdef _heapify
@cython.final
@cython.internal
cdef _safe_remove(deq, item)
@cython.final
@cython.internal
cdef class ItemWaiter(Waiter):
cdef readonly item
cdef readonly queue
cdef class Queue:
cdef readonly hub
cdef readonly queue
cdef Py_ssize_t _maxsize
cdef getters
cdef putters
cdef _event_unlock
cpdef _get(self)
cpdef _put(self, item)
cpdef _peek(self)
cpdef Py_ssize_t qsize(self)
cpdef bint empty(self)
cpdef bint full(self)
cpdef put(self, item, block=*, timeout=*)
cpdef put_nowait(self, item)
cdef __get_or_peek(self, method, block, timeout)
cpdef get(self, block=*, timeout=*)
cpdef get_nowait(self)
cpdef peek(self, block=*, timeout=*)
cpdef peek_nowait(self)
cdef _schedule_unlock(self)
@cython.final
cdef class UnboundQueue(Queue):
pass
cdef class PriorityQueue(Queue):
pass
cdef class LifoQueue(Queue):
pass
cdef class JoinableQueue(Queue):
cdef Event _cond
cdef readonly int unfinished_tasks
cdef class Channel:
cdef readonly getters
cdef readonly putters
cdef readonly hub
cdef _event_unlock
cpdef get(self, block=*, timeout=*)
cpdef get_nowait(self)
cdef _schedule_unlock(self)
...@@ -70,8 +70,8 @@ __imports__.extend(__py3_imports__) ...@@ -70,8 +70,8 @@ __imports__.extend(__py3_imports__)
import time import time
import sys import sys
from gevent.hub import _get_hub_noargs as get_hub from gevent._hub_local import get_hub_noargs as get_hub
from gevent.hub import ConcurrentObjectUseError from gevent.exceptions import ConcurrentObjectUseError
from gevent.timeout import Timeout from gevent.timeout import Timeout
from gevent._compat import string_types, integer_types, PY3 from gevent._compat import string_types, integer_types, PY3
from gevent._util import copy_globals from gevent._util import copy_globals
......
# -*- coding: utf-8 -*-
# copyright 2018 gevent
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False
"""
Low-level waiting primitives.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
from gevent._hub_local import get_hub_noargs as get_hub
from gevent.exceptions import ConcurrentObjectUseError
__all__ = [
'Waiter',
]
_NONE = object()
locals()['getcurrent'] = __import__('greenlet').getcurrent
locals()['greenlet_init'] = lambda: None
class Waiter(object):
"""
A low level communication utility for greenlets.
Waiter is a wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them somewhat safer:
* switching will occur only if the waiting greenlet is executing :meth:`get` method currently;
* any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
* if :meth:`switch`/:meth:`throw` is called before the receiver calls :meth:`get`, then :class:`Waiter`
will store the value/exception. The following :meth:`get` will return the value/raise the exception.
The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet.
The :meth:`get` method must be called from a greenlet other than :class:`Hub`.
>>> result = Waiter()
>>> timer = get_hub().loop.timer(0.1)
>>> timer.start(result.switch, 'hello from Waiter')
>>> result.get() # blocks for 0.1 seconds
'hello from Waiter'
>>> timer.close()
If switch is called before the greenlet gets a chance to call :meth:`get` then
:class:`Waiter` stores the value.
>>> result = Waiter()
>>> timer = get_hub().loop.timer(0.1)
>>> timer.start(result.switch, 'hi from Waiter')
>>> sleep(0.2)
>>> result.get() # returns immediately without blocking
'hi from Waiter'
>>> timer.close()
.. warning::
This a limited and dangerous way to communicate between
greenlets. It can easily leave a greenlet unscheduled forever
if used incorrectly. Consider using safer classes such as
:class:`gevent.event.Event`, :class:`gevent.event.AsyncResult`,
or :class:`gevent.queue.Queue`.
"""
__slots__ = ['hub', 'greenlet', 'value', '_exception']
def __init__(self, hub=None):
self.hub = get_hub() if hub is None else hub
self.greenlet = None
self.value = None
self._exception = _NONE
def clear(self):
self.greenlet = None
self.value = None
self._exception = _NONE
def __str__(self):
if self._exception is _NONE:
return '<%s greenlet=%s>' % (type(self).__name__, self.greenlet)
if self._exception is None:
return '<%s greenlet=%s value=%r>' % (type(self).__name__, self.greenlet, self.value)
return '<%s greenlet=%s exc_info=%r>' % (type(self).__name__, self.greenlet, self.exc_info)
def ready(self):
"""Return true if and only if it holds a value or an exception"""
return self._exception is not _NONE
def successful(self):
"""Return true if and only if it is ready and holds a value"""
return self._exception is None
@property
def exc_info(self):
"Holds the exception info passed to :meth:`throw` if :meth:`throw` was called. Otherwise ``None``."
if self._exception is not _NONE:
return self._exception
def switch(self, value=None):
"""Switch to the greenlet if one's available. Otherwise store the value."""
greenlet = self.greenlet
if greenlet is None:
self.value = value
self._exception = None
else:
if getcurrent() is not self.hub: # pylint:disable=undefined-variable
raise AssertionError("Can only use Waiter.switch method from the Hub greenlet")
switch = greenlet.switch
try:
switch(value)
except: # pylint:disable=bare-except
self.hub.handle_error(switch, *sys.exc_info())
def switch_args(self, *args):
return self.switch(args)
def throw(self, *throw_args):
"""Switch to the greenlet with the exception. If there's no greenlet, store the exception."""
greenlet = self.greenlet
if greenlet is None:
self._exception = throw_args
else:
if getcurrent() is not self.hub: # pylint:disable=undefined-variable
raise AssertionError("Can only use Waiter.switch method from the Hub greenlet")
throw = greenlet.throw
try:
throw(*throw_args)
except: # pylint:disable=bare-except
self.hub.handle_error(throw, *sys.exc_info())
def get(self):
"""If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
if self._exception is not _NONE:
if self._exception is None:
return self.value
else:
getcurrent().throw(*self._exception) # pylint:disable=undefined-variable
else:
if self.greenlet is not None:
raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
self.greenlet = getcurrent() # pylint:disable=undefined-variable
try:
return self.hub.switch()
finally:
self.greenlet = None
def __call__(self, source):
if source.exception is None:
self.switch(source.value)
else:
self.throw(source.exception)
# can also have a debugging version, that wraps the value in a tuple (self, value) in switch()
# and unwraps it in wait() thus checking that switch() was indeed called
class MultipleWaiter(Waiter):
"""
An internal extension of Waiter that can be used if multiple objects
must be waited on, and there is a chance that in between waits greenlets
might be switched out. All greenlets that switch to this waiter
will have their value returned.
This does not handle exceptions or throw methods.
"""
__slots__ = ['_values']
def __init__(self, hub=None):
Waiter.__init__(self, hub)
# we typically expect a relatively small number of these to be outstanding.
# since we pop from the left, a deque might be slightly
# more efficient, but since we're in the hub we avoid imports if
# we can help it to better support monkey-patching, and delaying the import
# here can be impractical (see https://github.com/gevent/gevent/issues/652)
self._values = list()
def switch(self, value): # pylint:disable=signature-differs
self._values.append(value)
Waiter.switch(self, True)
def get(self):
if not self._values:
Waiter.get(self)
Waiter.clear(self)
return self._values.pop(0)
def _init():
greenlet_init() # pylint:disable=undefined-variable
_init()
from gevent._util import import_c_accel
import_c_accel(globals(), 'gevent.__waiter')
# -*- coding: utf-8 -*-
# copyright 2018 gevent
"""
Exceptions.
.. versionadded:: 1.3b1
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
__all__ = [
'LoopExit',
]
class LoopExit(Exception):
"""
Exception thrown when the hub finishes running.
In a normal application, this is never thrown or caught
explicitly. The internal implementation of functions like
:func:`join` and :func:`joinall` may catch it, but user code
generally should not.
.. caution::
Errors in application programming can also lead to this exception being
raised. Some examples include (but are not limited too):
- greenlets deadlocking on a lock;
- using a socket or other gevent object with native thread
affinity from a different thread
"""
class BlockingSwitchOutError(AssertionError):
pass
class InvalidSwitchError(AssertionError):
pass
class ConcurrentObjectUseError(AssertionError):
# raised when an object is used (waited on) by two greenlets
# independently, meaning the object was entered into a blocking
# state by one greenlet and then another while still blocking in the
# first one
pass
...@@ -18,8 +18,6 @@ from gevent._tblib import load_traceback ...@@ -18,8 +18,6 @@ from gevent._tblib import load_traceback
from gevent.hub import GreenletExit from gevent.hub import GreenletExit
from gevent.hub import InvalidSwitchError from gevent.hub import InvalidSwitchError
from gevent.hub import Waiter from gevent.hub import Waiter
from gevent.hub import _threadlocal
from gevent.hub import get_hub_class
from gevent.hub import iwait from gevent.hub import iwait
from gevent.hub import wait from gevent.hub import wait
...@@ -28,7 +26,7 @@ from gevent.timeout import Timeout ...@@ -28,7 +26,7 @@ from gevent.timeout import Timeout
from gevent._config import config as GEVENT_CONFIG from gevent._config import config as GEVENT_CONFIG
from gevent._util import Lazy from gevent._util import Lazy
from gevent._util import readproperty from gevent._util import readproperty
from gevent._hub_local import get_hub_noargs as get_hub
__all__ = [ __all__ = [
'Greenlet', 'Greenlet',
...@@ -45,15 +43,6 @@ __all__ = [ ...@@ -45,15 +43,6 @@ __all__ = [
locals()['getcurrent'] = __import__('greenlet').getcurrent locals()['getcurrent'] = __import__('greenlet').getcurrent
locals()['greenlet_init'] = lambda: None locals()['greenlet_init'] = lambda: None
def get_hub():
# This is identical to gevent.hub._get_hub_noargs so that it
# can be inlined for greenlet spawning by cython.
# It is also cimported in semaphore.pxd
hub = _threadlocal.hub
if hub is None:
hubtype = get_hub_class()
hub = _threadlocal.hub = hubtype()
return hub
if _PYPY: if _PYPY:
......
This diff is collapsed.
This diff is collapsed.
...@@ -101,7 +101,8 @@ main = unittest.main ...@@ -101,7 +101,8 @@ main = unittest.main
from greentest.hub import QuietHub from greentest.hub import QuietHub
import gevent.hub import gevent.hub
gevent.hub.Hub = QuietHub gevent.hub.set_default_hub_class(QuietHub)
from greentest.sockets import bind_and_listen from greentest.sockets import bind_and_listen
......
...@@ -42,6 +42,10 @@ NO_ALL = [ ...@@ -42,6 +42,10 @@ NO_ALL = [
'gevent._patcher', 'gevent._patcher',
] ]
ALLOW_IMPLEMENTS = [
'gevent._queue',
]
# A list of modules that may contain things that aren't actually, technically, # A list of modules that may contain things that aren't actually, technically,
# extensions, but that need to be in __extensions__ anyway due to the way, # extensions, but that need to be in __extensions__ anyway due to the way,
# for example, monkey patching, needs to work. # for example, monkey patching, needs to work.
...@@ -76,6 +80,8 @@ class Test(unittest.TestCase): ...@@ -76,6 +80,8 @@ class Test(unittest.TestCase):
def check_implements_presence_justified(self): def check_implements_presence_justified(self):
"Check that __implements__ is present only if the module is modeled after a module from stdlib (like gevent.socket)." "Check that __implements__ is present only if the module is modeled after a module from stdlib (like gevent.socket)."
if self.modname in ALLOW_IMPLEMENTS:
return
if self.__implements__ is not None and self.stdlib_module is None: if self.__implements__ is not None and self.stdlib_module is None:
raise AssertionError('%r has __implements__ but no stdlib counterpart (%s)' raise AssertionError('%r has __implements__ but no stdlib counterpart (%s)'
% (self.modname, self.stdlib_name)) % (self.modname, self.stdlib_name))
......
...@@ -31,7 +31,9 @@ class TestDestroyDefaultLoop(unittest.TestCase): ...@@ -31,7 +31,9 @@ class TestDestroyDefaultLoop(unittest.TestCase):
# crash only happened when that greenlet object # crash only happened when that greenlet object
# was collected at exit time, which was most common # was collected at exit time, which was most common
# in CPython 3.5) # in CPython 3.5)
del gevent.hub._threadlocal.hub from gevent._hub_local import set_hub
set_hub(None)
def test_destroy_two(self): def test_destroy_two(self):
......
...@@ -13,6 +13,7 @@ import greentest ...@@ -13,6 +13,7 @@ import greentest
import gevent import gevent
from gevent import util from gevent import util
from gevent import local from gevent import local
from greenlet import getcurrent
from gevent._compat import NativeStrIO from gevent._compat import NativeStrIO
...@@ -84,10 +85,12 @@ class TestTree(greentest.TestCase): ...@@ -84,10 +85,12 @@ class TestTree(greentest.TestCase):
assert l assert l
def s(f): def s(f):
str(getcurrent())
g = gevent.spawn(f) g = gevent.spawn(f)
# Access this in spawning order for consistent sorting # Access this in spawning order for consistent sorting
# at print time in the test case. # at print time in the test case.
getattr(g, 'minimal_ident') getattr(g, 'minimal_ident')
str(g)
return g return g
def t1(): def t1():
...@@ -101,6 +104,7 @@ class TestTree(greentest.TestCase): ...@@ -101,6 +104,7 @@ class TestTree(greentest.TestCase):
return g return g
s1 = s(t2) s1 = s(t2)
#self.assertEqual(0, s1.minimal_ident)
s1.join() s1.join()
glets.append(s(t2)) glets.append(s(t2))
...@@ -130,6 +134,7 @@ class TestTree(greentest.TestCase): ...@@ -130,6 +134,7 @@ class TestTree(greentest.TestCase):
value = value.replace('test__util', '__main__') value = value.replace('test__util', '__main__')
value = re.compile(' fileno=.').sub('', value) value = re.compile(' fileno=.').sub('', value)
value = value.replace('ref=-1', 'ref=0') value = value.replace('ref=-1', 'ref=0')
value = value.replace("type.current_tree", 'GreenletTree.current_tree')
return value return value
@greentest.ignores_leakcheck @greentest.ignores_leakcheck
...@@ -149,23 +154,23 @@ class TestTree(greentest.TestCase): ...@@ -149,23 +154,23 @@ class TestTree(greentest.TestCase):
: {'foo': 42} : {'foo': 42}
+--- <QuietHub '' at X default default pending=0 ref=0 thread_ident=X> +--- <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <greenlet.greenlet object at X> : Parent: <greenlet.greenlet object at X>
+--- <Greenlet "Greenlet-1" at X: _run>; finished with value <Greenlet "CustomName-0" at 0x +--- <Greenlet "Greenlet-1" at X: t2>; finished with value <Greenlet "CustomName-0" at 0x
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X> : Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
| +--- <Greenlet "CustomName-0" at X: _run>; finished with exception ExpectedException() | +--- <Greenlet "CustomName-0" at X: t1>; finished with exception ExpectedException()
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X> : Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
+--- <Greenlet "Greenlet-2" at X: _run>; finished with value <Greenlet "CustomName-4" at 0x +--- <Greenlet "Greenlet-2" at X: t2>; finished with value <Greenlet "CustomName-4" at 0x
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X> : Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
| +--- <Greenlet "CustomName-4" at X: _run>; finished with exception ExpectedException() | +--- <Greenlet "CustomName-4" at X: t1>; finished with exception ExpectedException()
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X> : Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
+--- <Greenlet "Greenlet-3" at X: _run>; finished with value <Greenlet "Greenlet-5" at X +--- <Greenlet "Greenlet-3" at X: t3>; finished with value <Greenlet "Greenlet-5" at X
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X> : Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Spawn Tree Locals : Spawn Tree Locals
: {'stl': 'STL'} : {'stl': 'STL'}
| +--- <Greenlet "Greenlet-5" at X: _run>; finished with value <Greenlet "CustomName-6" at 0x | +--- <Greenlet "Greenlet-5" at X: t2>; finished with value <Greenlet "CustomName-6" at 0x
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X> : Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
| +--- <Greenlet "CustomName-6" at X: _run>; finished with exception ExpectedException() | +--- <Greenlet "CustomName-6" at X: t1>; finished with exception ExpectedException()
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X> : Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
+--- <Greenlet "Greenlet-7" at X: _run>; finished with value <gevent.util.GreenletTree obje +--- <Greenlet "Greenlet-7" at X: <bound method GreenletTree.current_tree of <class 'gevent.util.GreenletTree'>>>; finished with value <gevent.util.GreenletTree obje
Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X> Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
""".strip() """.strip()
self.assertEqual(expected, value) self.assertEqual(expected, value)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment