Commit c07537ae authored by Denis Bilenko's avatar Denis Bilenko

Greenlet: a few bugfixes

 - make sure link() is always async
 - add rawlink() that does not spawn a new greenlet per link
 - linking to greenlet does not spawn auxilary greenlet anymore
 - throw() method overriden to handle killing of not yet started greenlet properly
 - _schedule_run and _schedule_switch renamed to schedule_switch and schedule_switch_later
 - make spawn_later return tuple (greenlet, timer)
 - joinall - change order of arguments to joinall(greenlets, timeout, raise_error)
parent 93a16bf1
......@@ -15,7 +15,130 @@ __all__ = ['Greenlet',
'killall']
class SpawnedLink(object):
"""A wrapper around link that calls it in another greenlet.
Can be called only from main loop.
"""
__slots__ = ['callback']
def __init__(self, callback):
self.callback = callback
def __call__(self, source):
g = greenlet(self.callback, get_hub())
g.switch(source)
def __hash__(self):
return hash(self.callback)
def __eq__(self, other):
return self.callback == getattr(other, 'callback', other)
def __str__(self):
return str(self.callback)
def __repr__(self):
return repr(self.callback)
def __getattr__(self, item):
assert item != 'callback'
return getattr(self.callback, item)
class SuccessSpawnedLink(SpawnedLink):
"""A wrapper around link that calls it in another greenlet only if source succeed.
Can be called only from main loop.
"""
__slots__ = []
def __call__(self, source):
if source.successful():
return SpawnedLink.__call__(self, source)
class FailureSpawnedLink(SpawnedLink):
"""A wrapper around link that calls it in another greenlet only if source failed.
Can be called only from main loop.
"""
__slots__ = []
def __call__(self, source):
if not source.successful():
return SpawnedLink.__call__(self, source)
class GreenletLink(object):
"""A wrapper around greenlet that raises a LinkedExited exception when called.
Can be called only from main loop.
"""
__slots__ = ['greenlet']
def __init__(self, greenlet):
self.greenlet = greenlet
def __call__(self, source):
if source.successful():
error = getLinkedCompleted(source)
else:
error = LinkedFailed(source)
current = getcurrent()
greenlet = self.greenlet
if current is greenlet:
greenlet.throw(error)
elif current is get_hub():
try:
greenlet.throw(error)
except:
traceback.print_exc()
else:
kill(self.greenlet, error)
def __hash__(self):
return hash(self.greenlet)
def __eq__(self, other):
return self.greenlet == getattr(other, 'greenlet', other)
def __str__(self):
return str(self.greenlet)
def __repr__(self):
return repr(self.greenlet)
class SuccessGreenletLink(GreenletLink):
"""A wrapper around greenlet that raises a LinkedExited exception when called
if source has succeed.
Can be called only from main loop.
"""
__slots__ = []
def __call__(self, source):
if source.successful():
return GreenletLink.__call__(self, source)
class FailureGreenletLink(GreenletLink):
"""A wrapper around greenlet that raises a LinkedExited exception when called
if source has failed.
Can be called only from main loop.
"""
__slots__ = []
def __call__(self, source):
if not source.successful():
return GreenletLink.__call__(self, source)
class Greenlet(greenlet):
# QQQ rename to Microthread
"""A greenlet subclass that adds a few features.
"""
def __init__(self, run=None):
if run is not None:
......@@ -24,6 +147,7 @@ class Greenlet(greenlet):
self._links = set()
self.value = None
self._exception = _NONE
self._notifier = None
def ready(self):
return self.dead or self._exception is not _NONE
......@@ -46,36 +170,57 @@ class Greenlet(greenlet):
@property
def exception(self):
"""If greenlet has failed, 'exception' property holds the exception instance."""
if self._exception is not _NONE:
return self._exception
def _schedule_run(self, *args):
def throw(self, *args):
if not self.dead:
if self:
return greenlet.throw(self, *args)
else:
# special case for when greenlet is not yet started
if len(args)==1:
self._exception = args[0]
elif not args:
self._exception = GreenletExit()
else:
self._exception = args[1]
try:
return greenlet.throw(self, *args)
finally:
if self._links and self._notifier is None:
self._notifier = core.active_event(self._notify_links)
def schedule_switch(self, *args):
"""Must be called _exactly_ once for a greenlet to become active"""
return core.active_event(self.switch, *args)
def _schedule_run_later(self, seconds, *args):
def schedule_switch_later(self, seconds, *args):
"""Must be called _exactly_ once for a greenlet to become active"""
return core.timer(seconds, self.switch, *args)
@classmethod
def spawn(cls, function, *args, **kwargs):
if kwargs:
g = cls(_switch_helper)
g._schedule_run(function, args, kwargs)
g.schedule_switch(function, args, kwargs)
return g
else:
g = cls(function)
g._schedule_run(*args)
g.schedule_switch(*args)
return g
@classmethod
def spawn_later(cls, seconds, function, *args, **kwargs):
if kwargs:
g = cls(_switch_helper)
g._schedule_run_later(seconds, function, args, kwargs)
return g
timer = g.schedule_switch_later(seconds, function, args, kwargs)
return g, timer
else:
g = cls(function)
g._schedule_run_later(seconds, *args)
return g
timer = g.schedule_switch_later(seconds, *args)
return g, timer
@classmethod
def spawn_link(cls, function, *args, **kwargs):
......@@ -150,8 +295,8 @@ class Greenlet(greenlet):
def _report_result(self, result, args):
self._exception = None
self.value = result
if self._links:
core.active_event(self._notify_links)
if self._links and self._notifier is None:
self._notifier = core.active_event(self._notify_links)
def _report_error(self, exc_info, args):
try:
......@@ -159,8 +304,8 @@ class Greenlet(greenlet):
info = str(self)
finally:
self._exception = exc_info[1]
if self._links:
core.active_event(self._notify_links)
if self._links and self._notifier is None:
self._notifier = core.active_event(self._notify_links)
# put the printed traceback in context
if args:
......@@ -182,66 +327,61 @@ class Greenlet(greenlet):
return
self._report_result(result, args)
def link(self, callback=None):
if callback is None:
callback = GreenletLink(getcurrent())
def rawlink(self, callback):
if not callable(callback):
raise TypeError('Expected callable: %r' % (callback, ))
self._links.add(callback)
if self.ready() and self._notifier is None:
self._notifier = core.active_event(self._notify_links)
def link(self, callback=None, GreenletLink=GreenletLink, SpawnedLink=SpawnedLink):
"""Link greenlet's completion to callable or another greenlet.
callback is None means link to the current greenlet.
Always asynchronous, unless callback is a current greenlet and the result is ready.
"""
current = getcurrent()
if callback is None or callback is current:
callback = GreenletLink(current)
if self.ready():
# special case : linking to current greenlet when link is ready
# raise LinkedExited immediatelly
callback(self)
return
elif not callable(callback):
if isinstance(callback, greenlet):
callback = GreenletLink(callback)
else:
raise TypeError('Expected callable or greenlet: %r' % (callback, ))
if not self.ready():
self._links.add(callback)
else:
callback(self)
callback = SpawnedLink(callback)
self.rawlink(callback)
def unlink(self, callback=None):
if callback is None:
callback = getcurrent()
self._links.discard(callback)
def link_value(self, callback=None):
if callback is None:
callback = SuccessLink(GreenletLink(getcurrent()))
elif not callable(callback):
if isinstance(callback, greenlet):
callback = SuccessLink(GreenletLink(callback))
else:
raise TypeError('Expected callable or greenlet: %r' % (callback, ))
else:
callback = SuccessLink(callback)
if not self.ready():
self._links.add(callback)
else:
callback(self)
def link_value(self, callback=None, GreenletLink=SuccessGreenletLink, SpawnedLink=SuccessSpawnedLink):
self.link(callback=callback, GreenletLink=GreenletLink, SpawnedLink=SpawnedLink)
def link_exception(self, callback=None):
if callback is None:
callback = FailureLink(GreenletLink(getcurrent()))
elif not callable(callback):
if isinstance(callback, greenlet):
callback = FailureLink(GreenletLink(callback))
else:
raise TypeError('Expected callable or greenlet: %r' % (callback, ))
else:
callback = FailureLink(callback)
if not self.ready():
self._links.add(callback)
else:
callback(self)
def link_exception(self, callback=None, GreenletLink=FailureGreenletLink, SpawnedLink=FailureSpawnedLink):
self.link(callback=callback, GreenletLink=GreenletLink, SpawnedLink=SpawnedLink)
def _notify_links(self):
try:
while self._links:
link = self._links.pop()
g = greenlet(link, get_hub())
try:
g.switch(self)
link(self)
except:
traceback.print_exc()
try:
sys.stderr.write('Failed to notify link %s of %r\n\n' % (getfuncname(link), self))
except:
pass
finally:
self._notifier = None
spawn = Greenlet.spawn
spawn_later = Greenlet.spawn_later
......@@ -260,89 +400,7 @@ def _switch_helper(function, args, kwargs):
return function(*args, **kwargs)
class GreenletLink(object):
__slots__ = ['greenlet']
def __init__(self, greenlet):
self.greenlet = greenlet
def __call__(self, source):
if source.successful():
error = getLinkedCompleted(source)
else:
error = LinkedFailed(source)
current = getcurrent()
greenlet = self.greenlet
if current is greenlet:
greenlet.throw(error)
elif current is get_hub():
try:
greenlet.throw(error)
except:
traceback.print_exc()
else:
kill(self.greenlet, error)
def __hash__(self):
return hash(self.greenlet)
def __eq__(self, other):
return self.greenlet == getattr(other, 'greenlet', other)
def __str__(self):
return str(self.greenlet)
def __repr__(self):
return repr(self.greenlet)
class SuccessLink(object):
__slots__ = ['callback']
def __init__(self, callback):
self.callback = callback
def __call__(self, source):
if source.successful():
self.callback(source)
def __hash__(self):
return hash(self.callback)
def __eq__(self, other):
return self.callback == getattr(other, 'callback', other)
def __str__(self):
return str(self.callback)
def __repr__(self):
return repr(self.callback)
class FailureLink(object):
__slots__ = ['callback']
def __init__(self, callback):
self.callback = callback
def __call__(self, source):
if not source.successful():
self.callback(source)
def __hash__(self):
return hash(self.callback)
def __eq__(self, other):
return self.callback == getattr(other, 'callback', other)
def __str__(self):
return str(self.callback)
def __repr__(self):
return repr(self.callback)
def joinall(greenlets, raise_error=False, timeout=None):
def joinall(greenlets, timeout=None, raise_error=False):
from gevent.queue import Queue
queue = Queue()
put = queue.put
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment