Commit 8fe8a9b6 authored by Denis Bilenko's avatar Denis Bilenko

better implementation for Pool's map() and imap()

- it now avoids unnecessary blocking and returns the results asap
- changed argument order of internal class IMapUnordered.
- new internal class IMap added to pool.py
parent bca5cbba
......@@ -10,6 +10,7 @@ greenlets in the pool has already reached the limit, until there is a free slot.
"""
import sys
from bisect import insort_right
from gevent.hub import GreenletExit, getcurrent
from gevent.greenlet import joinall, Greenlet
......@@ -166,8 +167,7 @@ class Group(object):
return greenlet
def map(self, func, iterable):
greenlets = [self.spawn(func, item) for item in iterable]
return [greenlet.get() for greenlet in greenlets]
return list(self.imap(func, iterable))
def map_cb(self, func, iterable, callback=None):
result = self.map(func, iterable)
......@@ -185,16 +185,13 @@ class Group(object):
return Greenlet.spawn(self.map_cb, func, iterable, callback)
def imap(self, func, iterable):
"""An equivalent of itertools.imap()
**TODO**: Fix this.
"""
return iter(self.map(func, iterable))
"""An equivalent of itertools.imap()"""
return IMap.spawn(func, iterable, spawn=self.spawn)
def imap_unordered(self, func, iterable):
"""The same as imap() except that the ordering of the results from the
returned iterator should be considered in arbitrary order."""
return IMapUnordered.spawn(self.spawn, func, iterable)
return IMapUnordered.spawn(func, iterable, spawn=self.spawn)
def full(self):
return False
......@@ -205,9 +202,10 @@ class Group(object):
class IMapUnordered(Greenlet):
def __init__(self, spawn, func, iterable):
def __init__(self, func, iterable, spawn=None):
from gevent.queue import Queue
Greenlet.__init__(self)
if spawn is not None:
self.spawn = spawn
self.func = func
self.iterable = iterable
......@@ -236,10 +234,63 @@ class IMapUnordered(Greenlet):
self.queue.put(StopIteration)
def GreenletSet(*args, **kwargs):
import warnings
warnings.warn("gevent.pool.GreenletSet was renamed to gevent.pool.Group since version 0.13.0", DeprecationWarning, stacklevel=2)
return Group(*args, **kwargs)
class IMap(Greenlet):
def __init__(self, func, iterable, spawn=None):
from gevent.queue import Queue
Greenlet.__init__(self)
if spawn is not None:
self.spawn = spawn
self.func = func
self.iterable = iterable
self.queue = Queue()
self.count = 0
self.waiting = [] # QQQ maybe deque will work faster there?
self.index = 0
self.maxindex = -1
def __iter__(self):
return self
def next(self):
while True:
if self.waiting and self.waiting[0][0] <= self.index:
index, value = self.waiting.pop(0)
else:
index, value = self.queue.get()
if index > self.index:
insort_right(self.waiting, (index, value))
continue
self.index += 1
if value is StopIteration and self.dead:
assert self.index - 1 == self.maxindex, self.__dict__
raise StopIteration
if value is not _SKIP:
return value
def _run(self):
try:
func = self.func
for item in self.iterable:
self.count += 1
g = self.spawn(func, item)
g.rawlink(self._on_result)
self.maxindex += 1
g.index = self.maxindex
finally:
self.__dict__.pop('spawn', None)
self.__dict__.pop('func', None)
self.__dict__.pop('iterable', None)
def _on_result(self, greenlet):
self.count -= 1
if greenlet.successful():
self.queue.put((greenlet.index, greenlet.value))
else:
self.queue.put((greenlet.index, _SKIP))
if self.ready() and self.count <= 0:
self.maxindex += 1
self.queue.put((self.maxindex, StopIteration))
class Pool(Group):
......@@ -351,3 +402,6 @@ class pass_value(object):
def __getattr__(self, item):
assert item != 'callback'
return getattr(self.callback, item)
_SKIP = object()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment