Commit ab630e1c authored by Jason Madden

Improve test for #638 to make sure it actually bounds what it's supposed to bound.

parent d243caa3
@@ -36,7 +36,6 @@ class IMapUnordered(Greenlet):
     """

     _zipped = False
-    _queue_max_size = None

     def __init__(self, func, iterable, spawn=None, maxsize=None, _zipped=False):
         """
@@ -60,9 +59,26 @@ class IMapUnordered(Greenlet):
         self._zipped = _zipped
         self.func = func
         self.iterable = iterable
-        self.queue = Queue(maxsize)
+        self.queue = Queue()
         if maxsize:
-            self._queue_max_size = maxsize
+            # Bounding the queue is not enough if we want to keep from
+            # accumulating objects; the result value will be around as
+            # the greenlet's result, blocked on self.queue.put(), and
+            # we'll go on to spawn another greenlet, which in turn can
+            # create the result. So we need a semaphore to prevent a
+            # greenlet from exiting while the queue is full so that we
+            # don't spawn the next greenlet (assuming that self.spawn
+            # is of course bounded). (Alternatively we could have the
+            # greenlet itself do the insert into the pool, but that
+            # takes some rework).
+            #
+            # Given the use of a semaphore at this level, sizing the queue becomes
+            # redundant, and that lets us avoid having to use self.link() instead
+            # of self.rawlink() to avoid having blocking methods called in the
+            # hub greenlet.
+            self._result_semaphore = Semaphore(maxsize)
+        else:
+            self._result_semaphore = DummySemaphore()

         self.count = 0
         self.finished = False
         # If the queue size is unbounded, then we want to call all
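The comment in the hunk above describes the core idea of this change: instead of bounding the result queue, a semaphore with one slot per unconsumed result is acquired before each worker is spawned and released by the consumer. A standalone sketch of that pattern, using illustrative names rather than the code in this diff:

```python
import gevent
from gevent.lock import Semaphore
from gevent.queue import Queue

MAXSIZE = 2
slots = Semaphore(MAXSIZE)   # one slot per finished-but-unconsumed result
results = Queue()            # can stay unbounded; the semaphore does the bounding

def worker(item):
    results.put(item * item)

def producer(items):
    for item in items:
        slots.acquire()      # blocks once MAXSIZE results are pending
        gevent.spawn(worker, item)

gevent.spawn(producer, range(10))
for _ in range(10):
    value = results.get()
    slots.release()          # the consumer frees a slot, as next() does in the diff
    print(value)
```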
@@ -72,13 +88,15 @@ class IMapUnordered(Greenlet):
         # the queue simply raises Full). Therefore, in that case, we use
         # the safer, somewhat-slower (because it spawns a greenlet) link() methods.
         # This means that _on_finish and _on_result can be called and interleaved in any order
-        # if the call to self.queue.put() blocks.
-        self.rawlink(self._on_finish) if not maxsize else self.link(self._on_finish)
+        # if the call to self.queue.put() blocks..
+        # Note that right now we're not bounding the queue, instead using a semaphore.
+        self.rawlink(self._on_finish)

     def __iter__(self):
         return self

     def next(self):
+        self._result_semaphore.release()
         value = self._inext()
         if isinstance(value, Failure):
             raise value.exc
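For context on the rawlink()/link() trade-off mentioned above: rawlink() callbacks run in the hub greenlet and must never block, while link() runs the callback in a freshly spawned greenlet. A minimal sketch (not part of this commit):

```python
import gevent
from gevent.queue import Queue

results = Queue()            # unbounded, so put() below can never block

def on_done(greenlet):
    # With rawlink(), this runs in the hub greenlet: it must not block,
    # which is safe here only because the queue is unbounded.
    results.put(greenlet.value)

g = gevent.spawn(lambda: 42)
g.rawlink(on_done)           # cheap: no extra greenlet is spawned
# g.link(on_done)            # safer if on_done could block: the callback
#                            # runs in its own greenlet instead of the hub
print(results.get())         # -> 42
```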
@@ -89,9 +107,10 @@ class IMapUnordered(Greenlet):
         return self.queue.get()

     def _ispawn(self, func, item):
+        self._result_semaphore.acquire()
         self.count += 1
         g = self.spawn(func, item) if not self._zipped else self.spawn(func, *item)
-        g.rawlink(self._on_result) if not self._queue_max_size else g.link(self._on_result)
+        g.rawlink(self._on_result)
         return g

     def _run(self):
@@ -110,7 +129,9 @@ class IMapUnordered(Greenlet):
         # its own greenlet, the calls to put() may block and switch
         # greenlets, which in turn could mutate our state. So any
         # state on this object that we need to look at, notably
-        # self.count, we need to capture or mutate *before* we put
+        # self.count, we need to capture or mutate *before* we put.
+        # (Note that right now we're not bounding the queue, but we may
+        # choose to do so in the future so this implementation will be left in case.)
         self.count -= 1
         count = self.count
         finished = self.finished
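The rule the comment above states, shown as a generic, illustrative sketch (the class, attribute, and sentinel names here are placeholders, not this file's exact code): snapshot shared state into locals before a call that may switch greenlets, then act only on the snapshot.

```python
from gevent.queue import Queue

class ResultCollector(object):
    # Hypothetical helper, only to illustrate the capture-before-put rule.
    _sentinel = StopIteration

    def __init__(self):
        self.queue = Queue()
        self.count = 0
        self.finished = False

    def on_result(self, value):
        self.count -= 1
        count = self.count        # snapshot *before* the potential switch
        finished = self.finished

        self.queue.put(value)     # may block and switch greenlets, which can
                                  # mutate self.count / self.finished meanwhile

        if not count and finished:            # decide from the snapshot,
            self.queue.put(self._sentinel)    # not from the live attributes
```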
@@ -315,7 +336,8 @@ class GroupMappingMixin(object):
         the mapping code and the consumer and the results consume a great deal of resources.

         .. note:: This is separate from any bound on the number of active parallel
-           tasks.
+           tasks, though they may have some interaction (for example, limiting the
+           number of parallel tasks to the smallest bound).

         .. note:: Using a bound is slightly more computationally expensive than not using a bound.
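A hypothetical usage example for the two bounds the note distinguishes: the pool size caps how many tasks run concurrently, while maxsize caps how many unconsumed results are buffered for a slow consumer.

```python
import gevent
from gevent.pool import Pool

pool = Pool(10)                  # at most 10 tasks run concurrently

def work(i):
    gevent.sleep(0.01)           # stand-in for real work
    return i * i

# maxsize=3: only a few finished results are held before the consumer
# takes them, so a slow reader cannot make results pile up in memory.
for result in pool.imap_unordered(work, range(100), maxsize=3):
    gevent.sleep(0.05)           # deliberately slow consumer
    print(result)
```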
@@ -70,7 +70,7 @@ class Queue(object):
         self.maxsize = maxsize
         # Explicitly maintain order for getters and putters that block
         # so that callers can consistently rely on getting things out
-        # in the apparent order they went in. This is required by
+        # in the apparent order they went in. This was once required by
         # imap_unordered. Previously these were set() objects, and the
         # items put in the set have default hash() and eq() methods;
         # under CPython, since new objects tend to have increasing
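A small illustration (not from this commit) of the ordering guarantee the comment describes: putters that block on a full queue are released in the order they arrived, so items come out in the apparent order they went in.

```python
import gevent
from gevent.queue import Queue

q = Queue(maxsize=1)
q.put(0)                                    # the queue is now full

def putter(n):
    q.put(n)                                # blocks until the consumer makes room

putters = [gevent.spawn(putter, n) for n in (1, 2, 3)]
gevent.sleep(0)                             # let all three block, in spawn order

drained = [q.get() for _ in range(4)]
gevent.joinall(putters)
print(drained)                              # [0, 1, 2, 3] -- FIFO among blocked putters
```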
@@ -384,27 +384,32 @@ class TestPool(greentest.TestCase):
     def test_imap_unordered_bounded_queue(self):
         iterable = list(range(100))

+        running = [0]
+
         def short_running_func(i, j):
+            running[0] += 1
             return i

         # Send two iterables to make sure varargs and kwargs are handled
         # correctly
-        mapping = self.pool.imap_unordered(short_running_func, iterable, iterable,
-                                           maxsize=1)
-        mapping.start()
-
-        # Simulate a long running reader. No matter how many workers
-        # we have, we will never have a queue more than size 1
-        def reader():
-            result = []
-            for x in mapping:
-                result.append(x)
-                gevent.sleep(0.01)
-                self.assertEqual(len(mapping.queue), 1)
-            return result
-
-        l = reader()
-        self.assertEqual(sorted(l), iterable)
+        for meth in self.pool.imap_unordered, self.pool.imap:
+            running[0] = 0
+            mapping = meth(short_running_func, iterable, iterable,
+                           maxsize=1)
+
+            # Simulate a long running reader. No matter how many workers
+            # we have, we will never have a queue more than size 1
+            def reader():
+                result = []
+                for i, x in enumerate(mapping):
+                    self.assertTrue(running[0] <= i + 2, running[0])
+                    result.append(x)
+                    gevent.sleep(0.01)
+                    self.assertTrue(len(mapping.queue) <= 2, len(mapping.queue))
+                return result
+
+            l = reader()
+            self.assertEqual(sorted(l), iterable)


 class TestPool2(TestPool):