Commit 681ad54b authored by Jason Madden's avatar Jason Madden

The imap methods accept multiple iterables

parent 454bced7
...@@ -41,7 +41,11 @@ Unreleased ...@@ -41,7 +41,11 @@ Unreleased
return True, supporting its use-case as an "infinite" or unbounded return True, supporting its use-case as an "infinite" or unbounded
semaphore providing no exclusing, and allowing the idiom ``if semaphore providing no exclusing, and allowing the idiom ``if
sem.acquire(): ...``. PR #544 by Mouad Benchchaoui. sem.acquire(): ...``. PR #544 by Mouad Benchchaoui.
- Patch ``subprocess`` by default in ``gevent.monkey.patch_all``. See #446. - Patch ``subprocess`` by default in ``gevent.monkey.patch_all``. See
#446.
- ``gevent.pool.Group.imap`` and ``imap_unordered`` not accept
multiple iterables like ``itertools.imap``. Issue #565 reported by
Thomas Steinacher.
Release 1.0.2 Release 1.0.2
------------- -------------
......
...@@ -10,8 +10,13 @@ greenlets in the pool has already reached the limit, until there is a free slot. ...@@ -10,8 +10,13 @@ greenlets in the pool has already reached the limit, until there is a free slot.
""" """
from bisect import insort_right from bisect import insort_right
try:
from itertools import izip
except ImportError:
# Python 3
izip = zip
from gevent.hub import GreenletExit, getcurrent, kill as _kill, PY3 from gevent.hub import GreenletExit, getcurrent, kill as _kill
from gevent.greenlet import joinall, Greenlet from gevent.greenlet import joinall, Greenlet
from gevent.timeout import Timeout from gevent.timeout import Timeout
from gevent.event import Event from gevent.event import Event
...@@ -181,14 +186,16 @@ class Group(object): ...@@ -181,14 +186,16 @@ class Group(object):
""" """
return Greenlet.spawn(self.map_cb, func, iterable, callback) return Greenlet.spawn(self.map_cb, func, iterable, callback)
def imap(self, func, iterable): def imap(self, func, *iterables):
"""An equivalent of itertools.imap()""" """An equivalent of itertools.imap()"""
return IMap.spawn(func, iterable, spawn=self.spawn) return IMap.spawn(func, izip(*iterables), spawn=self.spawn,
_zipped=True)
def imap_unordered(self, func, iterable): def imap_unordered(self, func, *iterables):
"""The same as imap() except that the ordering of the results from the """The same as imap() except that the ordering of the results from the
returned iterator should be considered in arbitrary order.""" returned iterator should be considered in arbitrary order."""
return IMapUnordered.spawn(func, iterable, spawn=self.spawn) return IMapUnordered.spawn(func, izip(*iterables), spawn=self.spawn,
_zipped=True)
def full(self): def full(self):
return False return False
...@@ -196,14 +203,17 @@ class Group(object): ...@@ -196,14 +203,17 @@ class Group(object):
def wait_available(self): def wait_available(self):
pass pass
class _IMapBase(Greenlet):
class IMapUnordered(Greenlet): _zipped = False
def __init__(self, func, iterable, spawn=None): def __init__(self, func, iterable, spawn=None, _zipped=False):
from gevent.queue import Queue from gevent.queue import Queue
Greenlet.__init__(self) Greenlet.__init__(self)
if spawn is not None: if spawn is not None:
self.spawn = spawn self.spawn = spawn
if _zipped:
self._zipped = _zipped
self.func = func self.func = func
self.iterable = iterable self.iterable = iterable
self.queue = Queue() self.queue = Queue()
...@@ -214,22 +224,27 @@ class IMapUnordered(Greenlet): ...@@ -214,22 +224,27 @@ class IMapUnordered(Greenlet):
def __iter__(self): def __iter__(self):
return self return self
def _ispawn(self, func, item):
self.count += 1
g = self.spawn(func, item) if not self._zipped else self.spawn(func, *item)
g.rawlink(self._on_result)
return g
class IMapUnordered(_IMapBase):
def next(self): def next(self):
value = self.queue.get() value = self.queue.get()
if isinstance(value, Failure): if isinstance(value, Failure):
raise value.exc raise value.exc
return value return value
__next__ = next
if PY3:
__next__ = next
del next
def _run(self): def _run(self):
try: try:
func = self.func func = self.func
for item in self.iterable: for item in self.iterable:
self.count += 1 self._ispawn(func, item)
self.spawn(func, item).rawlink(self._on_result)
finally: finally:
self.__dict__.pop('spawn', None) self.__dict__.pop('spawn', None)
self.__dict__.pop('func', None) self.__dict__.pop('func', None)
...@@ -259,25 +274,14 @@ class IMapUnordered(Greenlet): ...@@ -259,25 +274,14 @@ class IMapUnordered(Greenlet):
self.finished = True self.finished = True
class IMap(Greenlet): class IMap(_IMapBase):
def __init__(self, func, iterable, spawn=None): def __init__(self, func, iterable, spawn=None, _zipped=False):
from gevent.queue import Queue
Greenlet.__init__(self)
if spawn is not None:
self.spawn = spawn
self.func = func
self.iterable = iterable
self.queue = Queue()
self.count = 0
self.waiting = [] # QQQ maybe deque will work faster there? self.waiting = [] # QQQ maybe deque will work faster there?
self.index = 0 self.index = 0
self.maxindex = -1 self.maxindex = -1
self.finished = False _IMapBase.__init__(self, func, iterable, spawn, _zipped)
self.rawlink(self._on_finish)
def __iter__(self):
return self
def next(self): def next(self):
while True: while True:
...@@ -292,18 +296,13 @@ class IMap(Greenlet): ...@@ -292,18 +296,13 @@ class IMap(Greenlet):
if isinstance(value, Failure): if isinstance(value, Failure):
raise value.exc raise value.exc
return value return value
__next__ = next
if PY3:
__next__ = next
del next
def _run(self): def _run(self):
try: try:
func = self.func func = self.func
for item in self.iterable: for item in self.iterable:
self.count += 1 g = self._ispawn(func, item)
g = self.spawn(func, item)
g.rawlink(self._on_result)
self.maxindex += 1 self.maxindex += 1
g.index = self.maxindex g.index = self.maxindex
finally: finally:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment