Commit d243caa3 authored by Jason Madden

Add a 'maxsize' parameter for imap_unordered to limit the number of buffered results. Fixes #638.

parent 770bb4a3
@@ -33,6 +33,15 @@ Unreleased
printed.) Reported in :issue:`617` by Jay Oster and Carlos Sanchez.
- PyPy: Fix a ``TypeError`` from ``gevent.idle()``. Reported in
:issue:`639` by chilun2008.
- The ``imap_unordered`` method of a pool supports a ``maxsize``
parameter to limit the number of results buffered while waiting for
the consumer. Reported in :issue:`638` by Sylvain Zimmer.
- The class ``gevent.queue.Queue`` now consistently orders multiple
blocked waiting ``put`` and ``get`` callers in the order they
arrived. Previously, due to an implementation quirk this was often
roughly the case under CPython, but not under PyPy. Now they both
behave the same.
- The class ``gevent.queue.Queue`` now supports the ``len()`` function.
.. _future: http://python-future.org
.. _bench_sendall.py: https://raw.githubusercontent.com/gevent/gevent/master/greentest/bench_sendall.py
......
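A minimal usage sketch (not part of the commit) of the behavior described in the changelog entry above; the ``compute`` function, the pool size, and the sleep times are illustrative:

    import gevent
    from gevent.pool import Pool

    def compute(n):
        # A quick unit of work.
        gevent.sleep(0.001)
        return n * n

    pool = Pool(10)

    # With maxsize=2, at most two finished results sit buffered waiting
    # for this loop; further results cause the mapping greenlets to block
    # until the consumer catches up, so a slow reader cannot trigger
    # unbounded buffering.
    for result in pool.imap_unordered(compute, range(100), maxsize=2):
        gevent.sleep(0.01)  # a deliberately slow consumer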
@@ -31,10 +31,27 @@ __all__ = ['Group', 'Pool']
class IMapUnordered(Greenlet):
"""
An iterator of map results.
"""
_zipped = False
_queue_max_size = None
def __init__(self, func, iterable, spawn=None, _zipped=False):
def __init__(self, func, iterable, spawn=None, maxsize=None, _zipped=False):
"""
An iterator that computes *func* over each item of *iterable*.
:keyword int maxsize: If given and not-None, specifies the maximum number of
finished results that will be allowed to accumulate awaiting the reader;
more than that number of results will cause map function greenlets to begin
to block. This is most useful if there is a great disparity between the speed of
the mapping code and the consumer, and the results consume a great deal of resources.
Using a bound is more computationally expensive than not using a bound.
.. versionchanged:: 1.1b3
Added the *maxsize* parameter.
"""
from gevent.queue import Queue
Greenlet.__init__(self)
if spawn is not None:
@@ -43,10 +60,20 @@ class IMapUnordered(Greenlet):
self._zipped = _zipped
self.func = func
self.iterable = iterable
self.queue = Queue()
self.queue = Queue(maxsize)
if maxsize:
self._queue_max_size = maxsize
self.count = 0
self.finished = False
self.rawlink(self._on_finish)
# If the queue size is unbounded, then we want to call all
# the links (_on_finish and _on_result) directly in the hub greenlet
# for efficiency. However, if the queue is bounded, we can't do that if
# the queue might block (because if there's no waiter the hub can switch to,
# the queue simply raises Full). Therefore, in that case, we use
# the safer, somewhat-slower (because it spawns a greenlet) link() methods.
# This means that _on_finish and _on_result can be called and interleaved in any order
# if the call to self.queue.put() blocks.
self.rawlink(self._on_finish) if not maxsize else self.link(self._on_finish)
def __iter__(self):
return self
@@ -64,7 +91,7 @@ class IMapUnordered(Greenlet):
def _ispawn(self, func, item):
self.count += 1
g = self.spawn(func, item) if not self._zipped else self.spawn(func, *item)
g.rawlink(self._on_result)
g.rawlink(self._on_result) if not self._queue_max_size else g.link(self._on_result)
return g
def _run(self):
@@ -78,28 +105,42 @@ class IMapUnordered(Greenlet):
self.__dict__.pop('iterable', None)
def _on_result(self, greenlet):
# This method can either be called in the hub greenlet (if the
# queue is unbounded) or its own greenlet. If it's called in
# its own greenlet, the calls to put() may block and switch
# greenlets, which in turn could mutate our state. So any
# state on this object that we need to look at, notably
self.count, we need to capture or mutate *before* we put anything in the queue.
self.count -= 1
count = self.count
finished = self.finished
ready = self.ready()
put_finished = False
if ready and count <= 0 and not finished:
finished = self.finished = True
put_finished = True
if greenlet.successful():
self.queue.put(self._iqueue_value_for_success(greenlet))
else:
self.queue.put(self._iqueue_value_for_failure(greenlet))
if self.ready() and self.count <= 0 and not self.finished:
if put_finished:
self.queue.put(self._iqueue_value_for_finished())
self.finished = True
def _on_finish(self, _self):
if self.finished:
return
if not self.successful():
self.queue.put(self._iqueue_value_for_self_failure())
self.finished = True
self.queue.put(self._iqueue_value_for_self_failure())
return
if self.count <= 0:
self.queue.put(self._iqueue_value_for_finished())
self.finished = True
self.queue.put(self._iqueue_value_for_finished())
def _iqueue_value_for_success(self, greenlet):
return greenlet.value
@@ -121,11 +162,11 @@ class IMap(IMapUnordered):
# We do this by storing tuples (order, value) in the queue
# not just value.
def __init__(self, func, iterable, spawn=None, _zipped=False):
def __init__(self, *args, **kwargs):
self.waiting = [] # QQQ maybe deque will work faster there?
self.index = 0
self.maxindex = -1
IMapUnordered.__init__(self, func, iterable, spawn, _zipped)
IMapUnordered.__init__(self, *args, **kwargs)
def _inext(self):
while True:
@@ -246,16 +287,64 @@ class GroupMappingMixin(object):
"""
return Greenlet.spawn(self.map_cb, func, iterable, callback)
def imap(self, func, *iterables):
"""An equivalent of itertools.imap()"""
return IMap.spawn(func, izip(*iterables), spawn=self.spawn,
_zipped=True)
def imap_unordered(self, func, *iterables):
"""The same as imap() except that the ordering of the results from the
returned iterator should be considered in arbitrary order."""
return IMapUnordered.spawn(func, izip(*iterables), spawn=self.spawn,
_zipped=True)
def __imap(self, cls, func, *iterables, **kwargs):
# Python 2 doesn't support the syntax that lets us mix varargs and
# a named kwarg, so we have to unpack manually
maxsize = kwargs.pop('maxsize', None)
if kwargs:
raise TypeError("Unsupported keyword arguments")
return cls.spawn(func, izip(*iterables), spawn=self.spawn,
_zipped=True, maxsize=maxsize)
def imap(self, func, *iterables, **kwargs):
"""
imap(func, *iterables, maxsize=None) -> iterable
An equivalent of :func:`itertools.imap`, operating in parallel.
The *func* is applied to each element yielded from each
iterable in *iterables* in turn, collecting the result.
If this object has a bound on the number of active greenlets it can
contain (such as :class:`Pool`), then at most that number of tasks will operate
in parallel.
:keyword int maxsize: If given and not-None, specifies the maximum number of
finished results that will be allowed to accumulate awaiting the reader;
more than that number of results will cause map function greenlets to begin
to block. This is most useful if there is a great disparity between the speed of
the mapping code and the consumer, and the results consume a great deal of resources.
.. note:: This is separate from any bound on the number of active parallel
tasks.
.. note:: Using a bound is slightly more computationally expensive than not using a bound.
.. tip:: The :meth:`imap_unordered` method makes much better
use of this parameter. To maintain order, this function may
need to keep an additional, unspecified number of finished
results in memory.
:return: An iterable object.
.. versionchanged:: 1.1b3
Added the *maxsize* keyword parameter.
"""
return self.__imap(IMap, func, *iterables, **kwargs)
def imap_unordered(self, func, *iterables, **kwargs):
"""
imap_unordered(func, *iterables, maxsize=None) -> iterable
The same as :meth:`imap` except that the ordering of the results
from the returned iterator should be considered in arbitrary
order.
This is lighter weight than :meth:`imap` and should be preferred if order
doesn't matter.
.. seealso:: :meth:`imap` for more details.
"""
return self.__imap(IMapUnordered, func, *iterables, **kwargs)
class Group(GroupMappingMixin):
......
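The ``rawlink()``/``link()`` comment in ``IMapUnordered.__init__`` above captures the key constraint behind the bounded queue: a callback that might block must not run in the hub greenlet. A standalone sketch of that rule, using only the public ``Greenlet.link`` and ``Queue`` APIs (the ``worker`` and ``on_result`` names are illustrative, not part of the commit):

    import gevent
    from gevent.queue import Queue

    results = Queue(1)          # a bounded queue, as when maxsize is given

    def worker(n):
        return n * n

    def on_result(greenlet):
        # put() may block when the queue is full, so this callback must not
        # run in the hub greenlet; attach it with link() (runs in its own
        # greenlet) rather than rawlink() (runs in the hub).
        results.put(greenlet.value)

    g = gevent.spawn(worker, 6)
    g.link(on_result)           # safe even if results.put() blocks
    g.join()
    print(results.get())        # 36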
@@ -5,8 +5,6 @@ The :mod:`gevent.queue` module implements multi-producer, multi-consumer queues
that work across greenlets, with the API similar to the classes found in the
standard :mod:`Queue` and :class:`multiprocessing <multiprocessing.Queue>` modules.
Changed in version 1.0: Queue(0) now means queue of infinite size, not a channel.
The classes in this module implement iterator protocol. Iterating over queue
means repeatedly calling :meth:`get <Queue.get>` until :meth:`get <Queue.get>` returns ``StopIteration``.
@@ -18,6 +16,10 @@ means repeatedly calling :meth:`get <Queue.get>` until :meth:`get <Queue.get>` r
... print(item)
1
2
.. versionchanged:: 1.0
``Queue(0)`` now means queue of infinite size, not a channel. A :exc:`DeprecationWarning`
will be issued with this argument.
"""
from __future__ import absolute_import
@@ -40,9 +42,21 @@ __all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue', 'Channel']
class Queue(object):
"""Create a queue object with a given maximum size.
If *maxsize* is less than or equal to zero or ``None``, the queue size is infinite.
"""
Create a queue object with a given maximum size.
If *maxsize* is less than or equal to zero or ``None``, the queue
size is infinite.
.. versionchanged:: 1.1b3
Queues now support :func:`len`; it behaves the same as :meth:`qsize`.
.. versionchanged:: 1.1b3
Multiple greenlets that block on a call to :meth:`put` for a full queue
will now be woken up to put their items into the queue in the order in which
they arrived. Likewise, multiple greenlets that block on a call to :meth:`get` for
an empty queue will now receive items in the order in which they blocked. An
implementation quirk under CPython *usually* ensured this was roughly the case
previously anyway, but that wasn't the case for PyPy.
"""
def __init__(self, maxsize=None, items=None):
@@ -54,8 +68,20 @@ class Queue(object):
DeprecationWarning, stacklevel=2)
else:
self.maxsize = maxsize
self.getters = set()
self.putters = set()
# Explicitly maintain order for getters and putters that block
# so that callers can consistently rely on getting things out
# in the apparent order they went in. This is required by
imap_unordered. Previously these were set() objects, and the
items put in the set had the default hash() and eq() methods;
# under CPython, since new objects tend to have increasing
# hash values, this tended to roughly maintain order anyway,
# but that's not true under PyPy. An alternative to a deque
# (to avoid the linear scan of remove()) might be an
# OrderedDict, but it's 2.7 only; we don't expect to have so
# many waiters that removing an arbitrary element is a
# bottleneck, though.
self.getters = collections.deque()
self.putters = collections.deque()
self.hub = get_hub()
self._event_unlock = None
if items:
@@ -108,6 +134,29 @@ class Queue(object):
"""Return the size of the queue."""
return len(self.queue)
def __len__(self):
"""
Return the size of the queue. This is the same as :meth:`qsize`.
.. versionadded:: 1.1b3
Previously, getting len() of a queue would raise a TypeError.
"""
return self.qsize()
def __bool__(self):
"""
A queue object is always True.
.. versionadded:: 1.1b3
Now that queues support len(), they need to implement ``__bool__``
to return True for backwards compatibility.
"""
return True
__nonzero__ = __bool__
def empty(self):
"""Return ``True`` if the queue is empty, ``False`` otherwise."""
return not self.qsize()
@@ -139,7 +188,7 @@ class Queue(object):
# We're in the mainloop, so we cannot wait; we can switch to other greenlets though.
# Check if possible to get a free slot in the queue.
while self.getters and self.qsize() and self.qsize() >= self.maxsize:
getter = self.getters.pop()
getter = self.getters.popleft()
getter.switch(getter)
if self.qsize() < self.maxsize:
self._put(item)
@@ -147,16 +196,20 @@ class Queue(object):
raise Full
elif block:
waiter = ItemWaiter(item, self)
self.putters.add(waiter)
timeout = Timeout.start_new(timeout, Full)
self.putters.append(waiter)
timeout = Timeout.start_new(timeout, Full) if timeout is not None else None
try:
if self.getters:
self._schedule_unlock()
result = waiter.get()
assert result is waiter, "Invalid switch into Queue.put: %r" % (result, )
finally:
timeout.cancel()
self.putters.discard(waiter)
if timeout is not None:
timeout.cancel()
try:
self.putters.remove(waiter)
except ValueError:
pass # removed by unlock
else:
raise Full
@@ -186,23 +239,27 @@ class Queue(object):
# special case to make get_nowait() runnable in the mainloop greenlet
# there are no items in the queue; try to fix the situation by unlocking putters
while self.putters:
self.putters.pop().put_and_switch()
self.putters.popleft().put_and_switch()
if self.qsize():
return self._get()
raise Empty
elif block:
waiter = Waiter()
timeout = Timeout.start_new(timeout, Empty)
timeout = Timeout.start_new(timeout, Empty) if timeout is not None else None
try:
self.getters.add(waiter)
self.getters.append(waiter)
if self.putters:
self._schedule_unlock()
result = waiter.get()
assert result is waiter, 'Invalid switch into Queue.get: %r' % (result, )
return self._get()
finally:
self.getters.discard(waiter)
timeout.cancel()
if timeout is not None:
timeout.cancel()
try:
self.getters.remove(waiter)
except ValueError:
pass # Removed by _unlock
else:
raise Empty
@@ -259,7 +316,7 @@ class Queue(object):
if self.putters and (self.maxsize is None or self.qsize() < self.maxsize):
repeat = True
try:
putter = self.putters.pop()
putter = self.putters.popleft()
self._put(putter.item)
except:
putter.throw(*sys.exc_info())
@@ -267,7 +324,7 @@ class Queue(object):
putter.switch(putter)
if self.getters and self.qsize():
repeat = True
getter = self.getters.pop()
getter = self.getters.popleft()
getter.switch(getter)
if not repeat:
return
......
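A short sketch (not part of the commit) of the queue changes documented above: ``len()`` support, truthiness, and FIFO wakeup of blocked putters; the capacity of 1 and the string items are illustrative:

    import gevent
    from gevent.queue import Queue

    q = Queue(1)            # bounded queue of capacity 1
    q.put('first')

    print(len(q))           # 1 -- len() now behaves like qsize()
    print(bool(q))          # True -- a Queue is always truthy

    # Two putters block on the full queue; with the deque-based waiter
    # lists they are released in the order they arrived.
    gevent.spawn(q.put, 'second')
    gevent.spawn(q.put, 'third')
    gevent.sleep(0)         # let both putters start and block

    print(q.get())          # 'first'
    print(q.get())          # 'second'
    print(q.get())          # 'third'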
# Test idle
import gevent
gevent.sleep()
gevent.idle()
@@ -380,6 +380,32 @@ class TestPool(greentest.TestCase):
result = list(self.pool.imap_unordered(sqr, final_sleep()))
self.assertEqual(result, [0, 1, 4])
# Issue 638
def test_imap_unordered_bounded_queue(self):
iterable = list(range(100))
def short_running_func(i, j):
return i
# Send two iterables to make sure varargs and kwargs are handled
# correctly
mapping = self.pool.imap_unordered(short_running_func, iterable, iterable,
maxsize=1)
mapping.start()
# Simulate a long running reader. No matter how many workers
# we have, we will never have a queue more than size 1
def reader():
result = []
for x in mapping:
result.append(x)
gevent.sleep(0.01)
self.assertEqual(len(mapping.queue), 1)
return result
l = reader()
self.assertEqual(sorted(l), iterable)
class TestPool2(TestPool):
size = 2
......