Commit 846394bf authored by Denis Bilenko's avatar Denis Bilenko

gevent.queue: rst docstrings

parent 8072efdb
# Copyright (c) 2009 Denis Bilenko. See LICENSE for details. # Copyright (c) 2009 Denis Bilenko. See LICENSE for details.
"""Synchronized queues.
The :mod:`gevent.queue` module implements multi-producer, multi-consumer queues
that work across greenlets, with the API similar to the classes found in the
standard :mod:`Queue` and :class:`multiprocessing <multiprocessing.Queue>` modules.
A major difference is that queues in this module operate as channels when
initialized with *maxsize* of zero. In such case, both :meth:`Queue.empty`
and :meth:`Queue.full` return ``True`` and :meth:`Queue.put` always blocks until a call
to :meth:`Queue.get` retrieves the item.
Another interesting difference is that :meth:`Queue.qsize`, :meth:`Queue.empty`, and
:meth:`Queue.full` *can* be used as indicators of whether the subsequent :meth:`Queue.get`
or :meth:`Queue.put` will not block.
"""
import sys import sys
import heapq import heapq
...@@ -16,10 +31,10 @@ __all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue'] ...@@ -16,10 +31,10 @@ __all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue']
class Queue(object): class Queue(object):
"""Create a queue object with a given maximum size. """Create a queue object with a given maximum size.
If maxsize is less than zero or None, the queue size is infinite. If *maxsize* is less than zero or ``None``, the queue size is infinite.
Queue(0) is a channel, that is, its put() method always blocks until the ``Queue(0)`` is a channel, that is, its :meth:`put` method always blocks until the
item is delivered. (This is unlike the standard Queue, where 0 means item is delivered. (This is unlike the standard :class:`Queue`, where 0 means
infinite size). infinite size).
""" """
...@@ -71,21 +86,21 @@ class Queue(object): ...@@ -71,21 +86,21 @@ class Queue(object):
return not self.qsize() return not self.qsize()
def full(self): def full(self):
"""Return True if the queue is full, False otherwise. """Return ``True`` if the queue is full, ``False`` otherwise.
Queue(None) is never full. ``Queue(None)`` is never full.
""" """
return self.qsize() >= self.maxsize return self.qsize() >= self.maxsize
def put(self, item, block=True, timeout=None): def put(self, item, block=True, timeout=None):
"""Put an item into the queue. """Put an item into the queue.
If optional args 'block' is true and 'timeout' is None (the default), If optional arg *block* is true and *timeout* is ``None`` (the default),
block if necessary until a free slot is available. If 'timeout' is block if necessary until a free slot is available. If *timeout* is
a positive number, it blocks at most 'timeout' seconds and raises a positive number, it blocks at most *timeout* seconds and raises
the Full exception if no free slot was available within that time. the :class:`Full` exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot Otherwise (*block* is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout' is immediately available, else raise the :class:`Full` exception (*timeout*
is ignored in that case). is ignored in that case).
""" """
if self.maxsize is None or self.qsize() < self.maxsize: if self.maxsize is None or self.qsize() < self.maxsize:
...@@ -125,20 +140,19 @@ class Queue(object): ...@@ -125,20 +140,19 @@ class Queue(object):
"""Put an item into the queue without blocking. """Put an item into the queue without blocking.
Only enqueue the item if a free slot is immediately available. Only enqueue the item if a free slot is immediately available.
Otherwise raise the Full exception. Otherwise raise the :class:`Full` exception.
""" """
self.put(item, False) self.put(item, False)
def get(self, block=True, timeout=None): def get(self, block=True, timeout=None):
"""Remove and return an item from the queue. """Remove and return an item from the queue.
If optional args 'block' is true and 'timeout' is None (the default), If optional args *block* is true and *timeout* is ``None`` (the default),
block if necessary until an item is available. If 'timeout' is block if necessary until an item is available. If *timeout* is a positive number,
a positive number, it blocks at most 'timeout' seconds and raises it blocks at most *timeout* seconds and raises the :class:`Empty` exception
the Empty exception if no item was available within that time. if no item was available within that time. Otherwise (*block* is false), return
Otherwise ('block' is false), return an item if one is immediately an item if one is immediately available, else raise the :class:`Empty` exception
available, else raise the Empty exception ('timeout' is ignored (*timeout* is ignored in that case).
in that case).
""" """
if self.qsize(): if self.qsize():
if self.putters: if self.putters:
...@@ -174,7 +188,7 @@ class Queue(object): ...@@ -174,7 +188,7 @@ class Queue(object):
"""Remove and return an item from the queue without blocking. """Remove and return an item from the queue without blocking.
Only get an item if one is immediately available. Otherwise Only get an item if one is immediately available. Otherwise
raise the Empty exception. raise the :class:`Empty` exception.
""" """
return self.get(False) return self.get(False)
...@@ -228,9 +242,9 @@ class ItemWaiter(Waiter): ...@@ -228,9 +242,9 @@ class ItemWaiter(Waiter):
class PriorityQueue(Queue): class PriorityQueue(Queue):
'''Variant of Queue that retrieves open entries in priority order (lowest first). '''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first).
Entries are typically tuples of the form: (priority number, data). Entries are typically tuples of the form: ``(priority number, data)``.
''' '''
def _init(self, maxsize): def _init(self, maxsize):
...@@ -244,7 +258,7 @@ class PriorityQueue(Queue): ...@@ -244,7 +258,7 @@ class PriorityQueue(Queue):
class LifoQueue(Queue): class LifoQueue(Queue):
'''Variant of Queue that retrieves most recently added entries first.''' '''A subclass of :class:`Queue` that retrieves most recently added entries first.'''
def _init(self, maxsize): def _init(self, maxsize):
self.queue = [] self.queue = []
...@@ -257,6 +271,7 @@ class LifoQueue(Queue): ...@@ -257,6 +271,7 @@ class LifoQueue(Queue):
class JoinableQueue(Queue): class JoinableQueue(Queue):
'''A subclass of :class:`Queue` that additionally has :meth:`task_done` and :meth:`join` methods.'''
def __init__(self, maxsize=None): def __init__(self, maxsize=None):
from gevent.event import Event from gevent.event import Event
...@@ -276,6 +291,16 @@ class JoinableQueue(Queue): ...@@ -276,6 +291,16 @@ class JoinableQueue(Queue):
self._cond.clear() self._cond.clear()
def task_done(self): def task_done(self):
'''Indicate that a formerly enqueued task is complete. Used by queue consumer threads.
For each :meth:`get <Queue.get>` used to fetch a task, a subsequent call to :meth:`task_done` tells the queue
that the processing on the task is complete.
If a :meth:`join` is currently blocking, it will resume when all items have been processed
(meaning that a :meth:`task_done` call was received for every item that had been
:meth:`put <Queue.put>` into the queue).
Raises a :exc:`ValueError` if called more times than there were items placed in the queue.
'''
if self.unfinished_tasks <= 0: if self.unfinished_tasks <= 0:
raise ValueError('task_done() called too many times') raise ValueError('task_done() called too many times')
self.unfinished_tasks -= 1 self.unfinished_tasks -= 1
...@@ -283,5 +308,12 @@ class JoinableQueue(Queue): ...@@ -283,5 +308,12 @@ class JoinableQueue(Queue):
self._cond.set() self._cond.set()
def join(self): def join(self):
'''Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue.
The count goes down whenever a consumer thread calls :meth:`task_done` to indicate
that the item was retrieved and all work on it is complete. When the count of
unfinished tasks drops to zero, :meth:`join` unblocks.
'''
self._cond.wait() self._cond.wait()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment