Commit e4ef80dc authored by Jason Madden's avatar Jason Madden

Make JoinableQueue.join consistent when items are passed to the constructor,...

Make JoinableQueue.join consistent when items are passed to the constructor, extending #554; add a test for this.
parent e4bef9df
...@@ -19,6 +19,11 @@ Unreleased ...@@ -19,6 +19,11 @@ Unreleased
- Upgraded c-ares to 1.10.0. PR #579 by Omer Katz. - Upgraded c-ares to 1.10.0. PR #579 by Omer Katz.
- Add a ``count`` argument to ``gevent.greenlet.wait``. PR #482 by - Add a ``count`` argument to ``gevent.greenlet.wait``. PR #482 by
wiggin15. wiggin15.
- Add a ``timeout`` argument to ``gevent.queue.JoinableQueue.wait``
which now returns whether all items were waited for or not.
- ``gevent.queue.JoinableQueue`` treats ``items`` passed to
``__init__`` as unfinished tasks, the same as if they were ``put``.
Initial PR #554 by DuLLSoN.
Release 1.0.2 Release 1.0.2
------------- -------------
......
...@@ -349,14 +349,18 @@ class JoinableQueue(Queue): ...@@ -349,14 +349,18 @@ class JoinableQueue(Queue):
def __init__(self, maxsize=None, items=None, unfinished_tasks=None): def __init__(self, maxsize=None, items=None, unfinished_tasks=None):
from gevent.event import Event from gevent.event import Event
Queue.__init__(self, maxsize, items) Queue.__init__(self, maxsize, items)
self._cond = Event()
self._cond.set()
if unfinished_tasks: if unfinished_tasks:
self.unfinished_tasks = unfinished_tasks self.unfinished_tasks = unfinished_tasks
elif items: elif items:
self.unfinished_tasks = len(items) self.unfinished_tasks = len(items)
else: else:
self.unfinished_tasks = 0 self.unfinished_tasks = 0
self._cond = Event()
self._cond.set() if self.unfinished_tasks:
self._cond.clear()
def copy(self): def copy(self):
return type(self)(self.maxsize, self.queue, self.unfinished_tasks) return type(self)(self.maxsize, self.queue, self.unfinished_tasks)
...@@ -389,15 +393,20 @@ class JoinableQueue(Queue): ...@@ -389,15 +393,20 @@ class JoinableQueue(Queue):
if self.unfinished_tasks == 0: if self.unfinished_tasks == 0:
self._cond.set() self._cond.set()
def join(self): def join(self, timeout=None):
'''Block until all items in the queue have been gotten and processed. '''Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count of unfinished tasks goes up whenever an item is added to the queue.
The count goes down whenever a consumer thread calls :meth:`task_done` to indicate The count goes down whenever a consumer thread calls :meth:`task_done` to indicate
that the item was retrieved and all work on it is complete. When the count of that the item was retrieved and all work on it is complete. When the count of
unfinished tasks drops to zero, :meth:`join` unblocks. unfinished tasks drops to zero, :meth:`join` unblocks.
:param float timeout: If not ``None``, then wait no more than this time in seconds
for all tasks to finish.
:return: ``True`` if all tasks have finished; if ``timeout`` was given and expired before
all tasks finished, ``False``.
''' '''
self._cond.wait() return self._cond.wait(timeout=timeout)
class Channel(object): class Channel(object):
......
...@@ -200,6 +200,26 @@ class BaseQueueTest(unittest.TestCase, BlockingTestMixin): ...@@ -200,6 +200,26 @@ class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
else: else:
self.fail("Did not detect task count going negative") self.fail("Did not detect task count going negative")
def test_queue_task_done_with_items(self):
# Passing items to the constructor allows for as
# many task_done calls. Joining before all the task done
# are called returns false
# XXX the same test in subclass
l = [1, 2, 3]
q = Queue.JoinableQueue(items=l)
for i in l:
self.assertFalse(q.join(timeout=0.001))
self.assertEqual(i, q.get())
q.task_done()
try:
q.task_done()
except ValueError:
pass
else:
self.fail("Did not detect task count going negative")
self.assertTrue(q.join(timeout=0.001))
def test_simple_queue(self): def test_simple_queue(self):
# Do it a couple of times on the same queue. # Do it a couple of times on the same queue.
# Done twice to make sure works with same instance reused. # Done twice to make sure works with same instance reused.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment