Commit a637628e authored by Denis Bilenko

pool: rename, add and remove methods

- rename 'killall' and 'joinall' to 'kill' and 'join';
- add a few methods from multiprocessing's Pool: apply, apply_async, map, map_async, imap, imap_unordered
- remove old execute and execute_async
parent f9e67f7d
from gevent import core from collections import deque
from gevent import coros from gevent.hub import GreenletExit, getcurrent
from gevent import rawgreenlet from gevent.greenlet import spawn, joinall, Greenlet, _switch_helper
from gevent.hub import getcurrent, GreenletExit from gevent.timeout import Timeout
from gevent.greenlet import spawn, spawn_link, spawn_link_value, spawn_link_exception, joinall
class GreenletSet(object): class GreenletSet(object):
"""Maintain a set of greenlets that are still running. """Maintain a set of greenlets that are still running.
...@@ -11,147 +11,192 @@ class GreenletSet(object): ...@@ -11,147 +11,192 @@ class GreenletSet(object):
""" """
def __init__(self, *args): def __init__(self, *args):
self.procs = set(*args) assert len(args)<=1, args
self.dying = set() self.greenlets = set(*args)
if args: if args:
for p in args[0]: for p in args[0]:
p.link(self.discard) p.rawlink(self.discard)
# each item we kill we place in dying, to avoid killing the same greenlet twice
self.dying = set()
def __repr__(self): def __repr__(self):
try: try:
classname = self.__class__.__name__ classname = self.__class__.__name__
except AttributeError: except AttributeError:
classname = 'ProcSet' classname = 'GreenletSet' # XXX check if 2.4 really uses this line
return '<%s at %s procs=%s dying=%s>' % (classname, hex(id(self)), self.procs, self.dying) return '<%s at %s %s>' % (classname, hex(id(self)), self.greenlets)
def __len__(self): def __len__(self):
return len(self.procs) + len(self.dying) return len(self.greenlets)
def __contains__(self, item): def __contains__(self, item):
# if isinstance(item, greenlet): return item in self.greenlets
# # XXX should not be necessary
# # special case for "getcurrent() in running_proc_set" to work
# for x in self.procs:
# if x.greenlet == item:
# return True
# for x in self.dying:
# if x.greenlet == item:
# return True
# # hack Proc's __hash__ and __eq__ to avoid special casing this?
# # in this case, instead of a hack. maybe make Proc a subclass of greenlet?
# else:
return item in self.procs or item in self.dying
def __iter__(self): def __iter__(self):
return iter(self.procs+self.dying) return iter(self.greenlets)
def add(self, p): def add(self, p):
self.procs.add(p) p.rawlink(self.discard)
p.link(self.discard) self.greenlets.add(p)
# QQQ check if Proc can be fixed to support p.link(self.procs.discard)
def discard(self, p): def discard(self, p):
self.procs.discard(p) self.greenlets.discard(p)
self.dying.discard(p) self.dying.discard(p)
def spawn(self, func, *args, **kwargs): def spawn(self, func, *args, **kwargs):
add = self.add
p = spawn(func, *args, **kwargs) p = spawn(func, *args, **kwargs)
self.add(p) add(p)
return p return p
def spawn_link(self, func, *args, **kwargs): def spawn_link(self, func, *args, **kwargs):
p = spawn_link(func, *args, **kwargs) p = self.spawn(func, *args, **kwargs)
self.add(p) p.link()
return p return p
def spawn_link_value(self, func, *args, **kwargs): def spawn_link_value(self, func, *args, **kwargs):
p = spawn_link_value(func, *args, **kwargs) p = self.spawn(func, *args, **kwargs)
self.add(p) p.link_value()
return p return p
def spawn_link_exception(self, func, *args, **kwargs): def spawn_link_exception(self, func, *args, **kwargs):
p = spawn_link_exception(func, *args, **kwargs) p = self.spawn(func, *args, **kwargs)
self.add(p) p.link_exception()
return p return p
def joinall(self, raise_error=False): # def close(self):
while self.procs: # """Prevents any more tasks from being submitted to the pool"""
joinall(self.procs, raise_error=raise_error) # self.add = RaiseException("This %s has been closed" % self.__class__.__name__)
def kill(self, p, exception=GreenletExit, block=False): def join(self, timeout=None, raise_error=False):
kill = p.kill t = Timeout(timeout)
try: try:
self.procs.remove(p) while self.greenlets:
except KeyError: joinall(self.greenlets, raise_error=raise_error)
return finally:
t.cancel()
def kill(self, exception=GreenletExit, block=False, timeout=None):
t = Timeout(timeout)
try:
while self.greenlets:
for p in self.greenlets:
if p not in self.dying:
p.kill(exception)
self.dying.add(p) self.dying.add(p)
return kill(exception=exception, block=block)
def killall(self, exception=GreenletExit, block=False):
while self.procs or self.dying:
for p in self.procs:
core.active_event(p.throw, exception)
self.dying.update(self.procs)
self.procs.clear()
if not block: if not block:
break break
if self.dying: joinall(self.greenlets)
joinall(self.dying) joinall(self.dying)
finally:
t.cancel()
# Kill a single greenlet ``p`` at most once.
# ``p`` is killed with ``exception`` and recorded in ``self.dying`` only when it
# is a member of ``self.greenlets`` and is not already in ``self.dying``.
# When ``block`` is true the call waits on ``p.join(timeout)``.
# NOTE(review): indentation was lost in this rendering, so it cannot be told
# from here whether the ``if block:`` wait is nested inside the membership
# check or runs regardless of it -- confirm against the real file.
def killonce(self, p, exception=GreenletExit, block=False, timeout=None):
if p not in self.dying and p in self.greenlets:
p.kill(exception)
self.dying.add(p)
if block:
p.join(timeout)
def full(self):
    """Report whether the set has reached capacity.

    This base implementation always answers ``False``. ``Pool`` overrides it
    with a check based on ``free_count()``.
    """
    return False
def apply(self, func, args=None, kwds=None):
    """Equivalent of the apply() builtin function. It blocks till the result is ready.

    :param func: callable to run.
    :param args: positional arguments for ``func``; ``None`` means ``()``.
    :param kwds: keyword arguments for ``func``; ``None`` means ``{}``.
    :return: whatever ``func(*args, **kwds)`` returns.
    """
    if args is None:
        args = ()
    if kwds is None:
        kwds = {}
    # When the set is full and the caller is itself one of its members, run
    # ``func`` inline in the current greenlet instead of spawning a new one.
    if self.full() and getcurrent() in self:
        return func(*args, **kwds)
    # Otherwise run ``func`` in a new member greenlet and wait for its result.
    return self.spawn(func, *args, **kwds).get()
def apply_async(self, func, args=None, kwds=None, callback=None):
    """A variant of the apply() method which returns a result object.

    If callback is specified then it should be a callable which accepts a
    single argument. When the result becomes ready callback is applied to it
    (unless the call failed).

    :param func: callable to run in a newly spawned greenlet.
    :param args: positional arguments for ``func``; ``None`` means ``()``.
    :param kwds: keyword arguments for ``func``; ``None`` means ``{}``.
    :param callback: optional one-argument callable linked to the greenlet
        through ``pass_value``.
    :return: the object returned by ``self.spawn``.
    """
    if args is None:
        args = ()
    if kwds is None:
        kwds = {}
    p = self.spawn(func, *args, **kwds)
    if callback is not None:
        p.link(pass_value(callback))
    return p
# make interface similar to standard library pools in multiprocessing def map(self, func, iterable):
greenlets = [self.spawn(func, item) for item in iterable]
class Pool(object): return [g.get() for g in greenlets]
def __init__(self, size=100): def map_async(self, func, iterable, callback=None):
self.sem = coros.Semaphore(size) """
self.procs = GreenletSet() A variant of the map() method which returns a result object.
@property If callback is specified then it should be a callable which accepts a
def current_size(self): single argument.
return len(self.procs) """
greenlets = [self.spawn(func, item) for item in iterable]
result = spawn(get_values, greenlets)
if callback is not None:
result.link(pass_value(callback))
return result
def imap(self, func, iterable):
    """An equivalent of itertools.imap()

    One greenlet is spawned per item as soon as iteration starts (the whole
    ``iterable`` is consumed up front); results are then yielded in input
    order by calling ``get()`` on each greenlet in turn.
    """
    greenlets = [self.spawn(func, item) for item in iterable]
    for g in greenlets:
        yield g.get()
def imap_unordered(self, func, iterable):
    """The same as imap() except that the ordering of the results from the
    returned iterator should be considered arbitrary.

    Every spawned greenlet is raw-linked to ``q.put``, and each item taken
    off the queue has ``get()`` called on it to produce the yielded value.
    """
    from gevent.queue import Queue
    q = Queue()
    greenlets = [self.spawn(func, item) for item in iterable]
    for g in greenlets:
        g.rawlink(q.put)
    # One queue item is expected per spawned greenlet, so pull exactly that many.
    for _ in greenlets:
        yield q.get().get()
class Pool(GreenletSet):
def __init__(self, size=None):
GreenletSet.__init__(self)
self.size = size
self.waiting = deque()
def full(self):
return self.free_count() <= 0
def free_count(self): def free_count(self):
return self.sem.counter if self.size is None:
return 1
def execute(self, func, *args, **kwargs): return max(0, self.size - len(self) - len(self.waiting))
"""Execute func in one of the coroutines maintained
by the pool, when one is free. def spawn(self, function, *args, **kwargs):
if kwargs:
Immediately returns a Proc object which can be queried g = Greenlet(_switch_helper)
for the func's result. args = (function, args, kwargs)
>>> pool = Pool()
>>> task = pool.execute(lambda a: ('foo', a), 1)
>>> task.get()
('foo', 1)
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
if self.sem.locked() and getcurrent() in self.procs:
p = spawn(func, *args, **kwargs)
p.join()
else: else:
self.sem.acquire() g = Greenlet(function)
p = self.procs.spawn(func, *args, **kwargs) if self.size is not None and len(self) >= self.size:
# assuming the above line cannot raise self.waiting.append((g, args))
p.link(lambda p: self.sem.release())
return p
def execute_async(self, func, *args, **kwargs):
if self.sem.locked():
return rawgreenlet.spawn(self.execute, func, *args, **kwargs)
else: else:
return self.execute(func, *args, **kwargs) g.schedule_switch(*args)
self.add(g)
return g
def _execute(self, evt, func, args, kw): def discard(self, p):
p = self.execute(func, *args, **kw) GreenletSet.discard(self, p)
p.link(evt) while self.waiting and len(self) < self.size:
return p g, args = self.waiting.popleft()
g.schedule_switch(*args)
def joinall(self): # XXX use just join? self.add(g)
return self.procs.joinall()
def killall(self):
return self.procs.killall()
def get_values(greenlets):
    """Wait for every greenlet in *greenlets*, then return their values.

    :param greenlets: iterable of greenlets; it is traversed again after the
        ``joinall`` call, so it must be re-iterable (e.g. a list).
    :return: list of each greenlet's ``value`` attribute, in input order.
    """
    joinall(greenlets)
    return [x.value for x in greenlets]
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment