Commit 07e3d866 authored by Denis Bilenko's avatar Denis Bilenko

pool: make Pool.spawn block if there are no slots

parent b8bd5a17
# Copyright (c) 2009-2010 Denis Bilenko. See LICENSE for details. # Copyright (c) 2009-2010 Denis Bilenko. See LICENSE for details.
from collections import deque
from gevent.hub import GreenletExit, getcurrent from gevent.hub import GreenletExit, getcurrent
from gevent.greenlet import joinall, Greenlet from gevent.greenlet import joinall, Greenlet
from gevent.timeout import Timeout from gevent.timeout import Timeout
from gevent.event import Event
__all__ = ['GreenletSet', 'Pool'] __all__ = ['GreenletSet', 'Pool']
...@@ -48,6 +48,10 @@ class GreenletSet(object): ...@@ -48,6 +48,10 @@ class GreenletSet(object):
self.greenlets.discard(greenlet) self.greenlets.discard(greenlet)
self.dying.discard(greenlet) self.dying.discard(greenlet)
def start(self, greenlet):
self.add(greenlet)
greenlet.start()
def spawn(self, *args, **kwargs): def spawn(self, *args, **kwargs):
add = self.add add = self.add
greenlet = self.greenlet_class.spawn(*args, **kwargs) greenlet = self.greenlet_class.spawn(*args, **kwargs)
...@@ -117,6 +121,12 @@ class GreenletSet(object): ...@@ -117,6 +121,12 @@ class GreenletSet(object):
else: else:
return self.spawn(func, *args, **kwds).get() return self.spawn(func, *args, **kwds).get()
def apply_cb(self, func, args=None, kwds=None, callback=None):
result = self.apply(func, args, kwds)
if callback is not None:
Greenlet.spawn(callback, result)
return result
def apply_async(self, func, args=None, kwds=None, callback=None): def apply_async(self, func, args=None, kwds=None, callback=None):
"""A variant of the apply() method which returns a Greenlet object. """A variant of the apply() method which returns a Greenlet object.
...@@ -126,6 +136,10 @@ class GreenletSet(object): ...@@ -126,6 +136,10 @@ class GreenletSet(object):
args = () args = ()
if kwds is None: if kwds is None:
kwds = {} kwds = {}
if self.full():
# cannot call spawn() directly because it blocks
return Greenlet.spawn(self.apply_cb, func, args, kwds, callback)
else:
greenlet = self.spawn(func, *args, **kwds) greenlet = self.spawn(func, *args, **kwds)
if callback is not None: if callback is not None:
greenlet.link(pass_value(callback)) greenlet.link(pass_value(callback))
...@@ -135,6 +149,12 @@ class GreenletSet(object): ...@@ -135,6 +149,12 @@ class GreenletSet(object):
greenlets = [self.spawn(func, item) for item in iterable] greenlets = [self.spawn(func, item) for item in iterable]
return [greenlet.get() for greenlet in greenlets] return [greenlet.get() for greenlet in greenlets]
def map_cb(self, func, iterable, callback=None):
result = self.map(func, iterable)
if callback is not None:
callback(result)
return result
def map_async(self, func, iterable, callback=None): def map_async(self, func, iterable, callback=None):
""" """
A variant of the map() method which returns a Greenlet object. A variant of the map() method which returns a Greenlet object.
...@@ -142,28 +162,18 @@ class GreenletSet(object): ...@@ -142,28 +162,18 @@ class GreenletSet(object):
If callback is specified then it should be a callable which accepts a If callback is specified then it should be a callable which accepts a
single argument. single argument.
""" """
greenlets = [self.spawn(func, item) for item in iterable] return Greenlet.spawn(self.map_cb, func, iterable, callback)
result = self.spawn(get_values, greenlets)
if callback is not None:
result.link(pass_value(callback))
return result
def imap(self, func, iterable): def imap(self, func, iterable):
"""An equivalent of itertools.imap()""" """An equivalent of itertools.imap()"""
greenlets = [self.spawn(func, item) for item in iterable] # FIXME
for greenlet in greenlets: return iter(self.map(func, iterable))
yield greenlet.get()
def imap_unordered(self, func, iterable): def imap_unordered(self, func, iterable):
"""The same as imap() except that the ordering of the results from the """The same as imap() except that the ordering of the results from the
returned iterator should be considered arbitrary.""" returned iterator should be considered arbitrary."""
from gevent.queue import Queue # FIXME
q = Queue() return iter(self.map(func, iterable))
greenlets = [self.spawn(func, item) for item in iterable]
for greenlet in greenlets:
greenlet.rawlink(q.put)
for _ in xrange(len(greenlets)):
yield q.get().get()
def full(self): def full(self):
return False return False
...@@ -176,7 +186,8 @@ class Pool(GreenletSet): ...@@ -176,7 +186,8 @@ class Pool(GreenletSet):
raise ValueError('Invalid size for pool (positive integer or None required): %r' % (size, )) raise ValueError('Invalid size for pool (positive integer or None required): %r' % (size, ))
GreenletSet.__init__(self) GreenletSet.__init__(self)
self.size = size self.size = size
self.waiting = deque() self._available_event = Event()
self._available_event.set()
def full(self): def full(self):
return self.free_count() <= 0 return self.free_count() <= 0
...@@ -184,32 +195,32 @@ class Pool(GreenletSet): ...@@ -184,32 +195,32 @@ class Pool(GreenletSet):
def free_count(self): def free_count(self):
if self.size is None: if self.size is None:
return 1 return 1
return max(0, self.size - len(self) - len(self.waiting)) return max(0, self.size - len(self))
def add(self, greenlet):
greenlet.rawlink(self.discard)
self.greenlets.add(greenlet)
def start(self, greenlet): def start(self, greenlet):
if self.size is not None and len(self) >= self.size: self._available_event.wait()
self.waiting.append(greenlet)
else:
greenlet.start()
self.add(greenlet) self.add(greenlet)
greenlet.start()
if self.full():
self._available_event.clear()
def spawn(self, function, *args, **kwargs): def spawn(self, *args, **kwargs):
greenlet = Greenlet(function, *args, **kwargs) self._available_event.wait()
self.start(greenlet) greenlet = self.greenlet_class.spawn(*args, **kwargs)
self.add(greenlet)
if self.full():
self._available_event.clear()
return greenlet return greenlet
def discard(self, greenlet): def discard(self, greenlet):
GreenletSet.discard(self, greenlet) GreenletSet.discard(self, greenlet)
while self.waiting and len(self) < self.size: if not self.full():
greenlet = self.waiting.popleft() self._available_event.set()
greenlet.start()
self.add(greenlet)
def kill(self, exception=GreenletExit, block=False, timeout=None):
for greenlet in self.waiting:
greenlet.kill(exception)
self.waiting.clear()
return GreenletSet.kill(self, exception=exception, block=block, timeout=timeout)
def get_values(greenlets): def get_values(greenlets):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment