Commit 0c38a064 authored by Denis Bilenko's avatar Denis Bilenko

add features of Proc to the greenlet subclass returned by gevent.spawn()

- add a new Greenlet class that has features of Proc but a bit simpler and derives from greenlet
- its interface is more in line with Thread and Process from std lib: it has join() and get() instead of wait()
- gevent.spawn returns a Greenlet
- spawn_link, spawn_link_value and spawn_link_exception are available from gevent top level package
- so are joinall() and killall() methods
- greenlet.py split into hub.py, timeout.py, greenlet.py, rawgreenlet.py
- Hub derives from greenlet now
parent 2a520886
version_info = (0, 10, 0)
__version__ = '0.10.0'
__all__ = ['getcurrent',
'sleep',
__all__ = ['Greenlet',
'spawn',
'spawn_later',
'kill',
'spawn_link',
'spawn_link_value',
'spawn_link_exception',
'joinall',
'killall',
'Timeout',
'with_timeout',
'getcurrent',
'sleep',
'kill',
'signal',
'fork',
'shutdown',
'core',
'reinit']
from gevent.greenlet import *
from gevent import core
reinit = core.reinit
from gevent.greenlet import Greenlet, joinall, killall
spawn = Greenlet.spawn
spawn_later = Greenlet.spawn_later
spawn_link = Greenlet.spawn_link
spawn_link_value = Greenlet.spawn_link_value
spawn_link_exception = Greenlet.spawn_link_exception
from gevent.timeout import Timeout, with_timeout
from gevent.hub import getcurrent, sleep, kill, signal, fork, shutdown
from gevent.core import reinit
......@@ -41,6 +41,7 @@ except AttributeError:
class SocketConsole(Greenlet):
def __init__(self, desc, hostport, locals):
Greenlet.__init__(self)
self.hostport = hostport
self.locals = locals
# mangle the socket
......@@ -58,9 +59,7 @@ class SocketConsole(Greenlet):
self.old[key] = getattr(desc, key)
setattr(desc, key, value)
Greenlet.__init__(self)
def run(self):
def _run(self):
try:
console = InteractiveConsole(self.locals)
console.interact()
......
......@@ -26,7 +26,8 @@ import time
import traceback
from gevent.core import active_event
from gevent.greenlet import get_hub, spawn, getcurrent, sleep
from gevent.hub import get_hub, getcurrent, sleep
from gevent.rawgreenlet import spawn
class Cancelled(RuntimeError):
......@@ -439,7 +440,7 @@ class Channel(object):
def send(self, result=None, exc=None):
if exc is not None and not isinstance(exc, tuple):
exc = (exc, )
if getcurrent() is get_hub().greenlet:
if getcurrent() is get_hub():
self.items.append((result, exc))
if self._waiters and self._timer is None:
self._timer = active_event(self._do_switch)
......
import sys
import os
import traceback
from gevent import core
from gevent.hub import greenlet, getcurrent, get_hub, GreenletExit, Waiter, kill
from gevent.timeout import Timeout
__all__ = ['getcurrent',
'sleep',
__all__ = ['Greenlet',
'spawn',
'spawn_later',
'kill',
'killall',
'join',
'spawn_link',
'spawn_link_value',
'spawn_link_exception',
'joinall',
'Timeout',
'with_timeout',
'signal',
'fork',
'shutdown']
try:
from py.magic import greenlet
Greenlet = greenlet
except ImportError:
greenlet = __import__('greenlet')
Greenlet = greenlet.greenlet
getcurrent = greenlet.getcurrent
GreenletExit = greenlet.GreenletExit
MAIN = greenlet.getcurrent()
thread = __import__('thread')
threadlocal = thread._local
_threadlocal = threadlocal()
_threadlocal.Hub = None
_original_fork = os.fork
def sleep(seconds=0):
"""Yield control to another eligible coroutine until at least *seconds* have
elapsed.
*seconds* may be specified as an integer, or a float if fractional seconds
are desired. Calling sleep with *seconds* of 0 is the canonical way of
expressing a cooperative yield. For example, if one is looping over a
large list performing an expensive calculation without calling any socket
methods, it's a good idea to call ``sleep(0)`` occasionally; otherwise
nothing else will run.
"""
unique_mark = object()
t = core.timer(seconds, getcurrent().switch, unique_mark)
try:
switch_result = get_hub().switch()
assert switch_result is unique_mark, 'Invalid switch into sleep(): %r' % (switch_result, )
finally:
t.cancel()
'killall']
def _switch_helper(function, args, kwargs):
# work around the fact that greenlet.switch does not support keyword args
return function(*args, **kwargs)
class Greenlet(greenlet):
def __init__(self, run=None):
if run is not None:
self._run = run # subclasses should override _run() (not run())
greenlet.__init__(self, parent=get_hub())
self._links = set()
self.value = None
self._exception = _NONE
def spawn(function, *args, **kwargs):
if kwargs:
g = Greenlet(_switch_helper, get_hub().greenlet)
core.active_event(g.switch, function, args, kwargs)
return g
else:
g = Greenlet(function, get_hub().greenlet)
core.active_event(g.switch, *args)
return g
def ready(self):
return self.dead or self._exception is not _NONE
def successful(self):
return self._exception is None
def spawn_later(seconds, function, *args, **kwargs):
if kwargs:
g = Greenlet(_switch_helper, get_hub().greenlet)
core.timer(seconds, g.switch, function, args, kwargs)
return g
else:
g = Greenlet(function, get_hub().greenlet)
core.timer(seconds, g.switch, *args)
return g
def __repr__(self):
classname = self.__class__.__name__
try:
funcname = getfuncname(self.__dict__['_run'])
except Exception:
funcname = None
result = '<%s at %s' % (classname, hex(id(self)))
if funcname is not None:
result += ': %s' % funcname
def _kill(greenlet, exception, waiter):
try:
greenlet.throw(exception)
except:
traceback.print_exc()
waiter.switch()
return result + '>'
@property
def exception(self):
if self._exception is not _NONE:
return self._exception
def _schedule_run(self, *args):
return core.active_event(self.switch, *args)
def _schedule_run_later(self, seconds, *args):
return core.timer(seconds, self.switch, *args)
@classmethod
def spawn(cls, function, *args, **kwargs):
if kwargs:
g = cls(_switch_helper)
g._schedule_run(function, args, kwargs)
return g
else:
g = cls(function)
g._schedule_run(*args)
return g
@classmethod
def spawn_later(cls, seconds, function, *args, **kwargs):
if kwargs:
g = cls(_switch_helper)
g._schedule_run_later(seconds, function, args, kwargs)
return g
else:
g = cls(function)
g._schedule_run_later(seconds, *args)
return g
@classmethod
def spawn_link(cls, function, *args, **kwargs):
g = cls.spawn(function, *args, **kwargs)
g.link()
return g
def kill(greenlet, exception=GreenletExit, block=False, polling_period=0.2):
"""Kill greenlet with exception (GreenletExit by default).
Wait for it to die if block is true.
"""
if not greenlet.dead:
waiter = Waiter()
core.active_event(_kill, greenlet, exception, waiter)
if block:
waiter.wait()
join(greenlet, polling_period=polling_period)
@classmethod
def spawn_link_value(cls, function, *args, **kwargs):
g = cls.spawn(function, *args, **kwargs)
g.link_value()
return g
@classmethod
def spawn_link_exception(cls, function, *args, **kwargs):
g = cls.spawn(function, *args, **kwargs)
g.link_exception()
return g
def _killall(greenlets, exception, waiter):
diehards = []
for g in greenlets:
if not g.dead:
def kill(self, exception=GreenletExit, block=False, timeout=None):
if not self.dead:
waiter = Waiter()
core.active_event(_kill, self, exception, waiter)
if block:
waiter.wait()
self.join(timeout)
def get(self, block=True, timeout=None):
if self.ready():
if self.successful():
return self.value
else:
raise self._exception
if block:
switch = getcurrent().switch
self.link(switch)
try:
g.throw(exception)
t = Timeout(timeout)
try:
result = get_hub().switch()
assert result is self, 'Invalid switch into Greenlet.get(): %r' % (result, )
finally:
t.cancel()
except:
traceback.print_exc()
if not g.dead:
diehards.append(g)
waiter.switch(diehards)
self.unlink(switch)
raise
if self.ready():
if self.successful():
return self.value
else:
raise self._exception
else:
raise Timeout
def join(self, timeout=None):
if self.ready():
return
else:
switch = getcurrent().switch
self.link(switch)
try:
t = Timeout(timeout)
try:
result = get_hub().switch()
assert result is self, 'Invalid switch into Greenlet.join(): %r' % (result, )
finally:
t.cancel()
except:
self.unlink(switch)
raise
def killall(greenlets, exception=GreenletExit, block=False, polling_period=0.2):
"""Kill all the greenlets with exception (GreenletExit by default).
Wait for them to die if block is true.
"""
waiter = Waiter()
core.active_event(_killall, greenlets, exception, waiter)
if block:
alive = waiter.wait()
if alive:
joinall(alive, polling_period=polling_period)
def _report_result(self, result, args):
self._exception = None
self.value = result
if self._links:
core.active_event(self._notify_links)
def _report_error(self, exc_info, args):
try:
traceback.print_exception(*exc_info)
info = str(self)
finally:
self._exception = exc_info[1]
if self._links:
core.active_event(self._notify_links)
# put the printed traceback in context
if args:
info += ' (' + ', '.join(repr(x) for x in args) + ')'
info += ' failed with '
try:
info += self._exception.__class__.__name__
except:
info += str(self._exception) or repr(self._exception)
sys.stderr.write(info + '\n\n')
def join(greenlet, polling_period=0.2):
"""Wait for a greenlet to finish by polling its status"""
delay = 0.002
while not greenlet.dead:
delay = min(polling_period, delay*2)
sleep(delay)
def run(self, *args):
try:
result = self._run(*args)
except GreenletExit, ex:
result = ex
except:
self._report_error(sys.exc_info(), args)
return
self._report_result(result, args)
def link(self, callback=None):
if callback is None:
callback = GreenletLink(getcurrent())
elif not callable(callback):
if isinstance(callback, greenlet):
callback = GreenletLink(callback)
else:
raise TypeError('Expected callable or greenlet: %r' % (callback, ))
if not self.ready():
self._links.add(callback)
else:
callback(self)
def unlink(self, callback=None):
if callback is None:
callback = getcurrent()
self._links.discard(callback)
def link_value(self, callback=None):
if callback is None:
callback = SuccessLink(GreenletLink(getcurrent()))
elif not callable(callback):
if isinstance(callback, greenlet):
callback = SuccessLink(GreenletLink(callback))
else:
raise TypeError('Expected callable or greenlet: %r' % (callback, ))
else:
callback = SuccessLink(callback)
if not self.ready():
self._links.add(callback)
else:
callback(self)
def link_exception(self, callback=None):
if callback is None:
callback = FailureLink(GreenletLink(getcurrent()))
elif not callable(callback):
if isinstance(callback, greenlet):
callback = FailureLink(GreenletLink(callback))
else:
raise TypeError('Expected callable or greenlet: %r' % (callback, ))
else:
callback = FailureLink(callback)
if not self.ready():
self._links.add(callback)
else:
callback(self)
def _notify_links(self):
while self._links:
link = self._links.pop()
g = greenlet(link, get_hub())
try:
g.switch(self)
except:
traceback.print_exc()
try:
sys.stderr.write('Failed to notify link %s of %r\n\n' % (getfuncname(link), self))
except:
pass
def joinall(greenlets, polling_period=0.2):
"""Wait for the greenlets to finish by polling their status"""
current = 0
while current < len(greenlets) and greenlets[current].dead:
current += 1
delay = 0.002
while current < len(greenlets):
delay = min(polling_period, delay*2)
sleep(delay)
while current < len(greenlets) and greenlets[current].dead:
current += 1
spawn = Greenlet.spawn
spawn_later = Greenlet.spawn_later
spawn_link = Greenlet.spawn_link
spawn_link_value = Greenlet.spawn_link_value
spawn_link_exception = Greenlet.spawn_link_exception
try:
BaseException
except NameError: # Python < 2.5
class BaseException:
# not subclassing from object() intentionally, because in
# that case "raise Timeout" fails with TypeError.
pass
def _kill(greenlet, exception, waiter):
greenlet.throw(exception)
waiter.switch()
class Timeout(BaseException):
"""Raise an exception in the current greenlet after timeout.
def _switch_helper(function, args, kwargs):
# work around the fact that greenlet.switch does not support keyword args
return function(*args, **kwargs)
timeout = Timeout(seconds[, exception])
try:
... code block ...
finally:
timeout.cancel()
Assuming code block is yielding (i.e. gives up control to the hub),
an exception will be raised if code block has been running for more
than `seconds` seconds. By default (or when exception is None), the
Timeout instance itself is raised. If exception is provided, then it
is raised instead.
class GreenletLink(object):
__slots__ = ['greenlet']
For Python starting with 2.5 'with' statement can be used:
def __init__(self, greenlet):
self.greenlet = greenlet
with Timeout(seconds[, exception]) as timeout:
... code block ...
def __call__(self, source):
if source.successful():
error = getLinkedCompleted(source)
else:
error = LinkedFailed(source)
current = getcurrent()
greenlet = self.greenlet
if current is greenlet:
greenlet.throw(error)
elif current is get_hub():
try:
greenlet.throw(error)
except:
traceback.print_exc()
else:
kill(self.greenlet, error)
This is equivalent to try/finally block above with one additional feature:
if exception is False, the timeout is still raised, but context manager
suppresses it, so surrounding code won't see it.
def __hash__(self):
return hash(self.greenlet)
This is handy for adding a timeout feature to the functions that don't
implement it themselves:
def __eq__(self, other):
return self.greenlet == getattr(other, 'greenlet', other)
data = None
with Timeout(5, False):
data = mysock.makefile().readline()
if data is None:
# 5 seconds passed without reading a line
else:
# a line was read within 5 seconds
def __str__(self):
return str(self.greenlet)
Note that, if readline() catches BaseException (or everything with 'except:'),
then your timeout is screwed.
def __repr__(self):
return repr(self.greenlet)
When catching timeouts, keep in mind that the one you catch maybe not the
one you have set; if you going to silent a timeout, always check that it's
the one you need:
timeout = Timeout(1)
try:
...
except Timeout, t:
if t is not timeout:
raise # not my timeout
"""
def __init__(self, seconds=None, exception=None):
if seconds is None: # "fake" timeout (never expires)
self.exception = None
self.timer = None
elif exception is None or exception is False: # timeout that raises self
self.exception = exception
self.timer = core.timer(seconds, getcurrent().throw, self)
else: # regular timeout with user-provided exception
self.exception = exception
self.timer = core.timer(seconds, getcurrent().throw, exception)
class SuccessLink(object):
__slots__ = ['callback']
@property
def pending(self):
if self.timer is not None:
return self.timer.pending
else:
return False
def __init__(self, callback):
self.callback = callback
def cancel(self):
if self.timer is not None:
self.timer.cancel()
def __call__(self, source):
if source.successful():
self.callback(source)
def __repr__(self):
try:
classname = self.__class__.__name__
except AttributeError: # Python < 2.5
classname = 'Timeout'
if self.exception is None:
return '<%s at %s timer=%s>' % (classname, hex(id(self)), self.timer)
else:
return '<%s at %s timer=%s exception=%s>' % (classname, hex(id(self)), self.timer, self.exception)
def __hash__(self):
return hash(self.callback)
def __eq__(self, other):
return self.callback == getattr(other, 'callback', other)
def __str__(self):
"""
>>> raise Timeout
Traceback (most recent call last):
...
Timeout
"""
if self.exception is None:
return ''
elif self.exception is False:
return '(silent)'
else:
return str(self.exception)
return str(self.callback)
def __enter__(self):
return self
def __repr__(self):
return repr(self.callback)
def __exit__(self, typ, value, tb):
self.cancel()
if value is self and self.exception is False:
return True
class FailureLink(object):
__slots__ = ['callback']
# use this? less prone to errors (what if func has timeout_value argument or func is with_timeout itself?)
# def with_timeout(seconds, func[, args[, kwds[, timeout_value]]]):
# see what other similar standard library functions accept as params (start_new_thread, start new process)
def __init__(self, callback):
self.callback = callback
class _NONE(object):
__slots__ = []
def __call__(self, source):
if not source.successful():
self.callback(source)
_NONE = _NONE()
def __hash__(self):
return hash(self.callback)
def __eq__(self, other):
return self.callback == getattr(other, 'callback', other)
def with_timeout(seconds, func, *args, **kwds):
"""Wrap a call to some (yielding) function with a timeout; if the called
function fails to return before the timeout, cancel it and return a flag
value.
def __str__(self):
return str(self.callback)
seconds
(int or float) seconds before timeout occurs
func
the callable to execute with a timeout; must be one of the functions
that implicitly or explicitly yields
\*args, \*\*kwds
(positional, keyword) arguments to pass to *func*
timeout_value=
value to return if timeout occurs (default raise ``Timeout``)
def __repr__(self):
return repr(self.callback)
**Returns**:
Value returned by *func* if *func* returns before *seconds*, else
*timeout_value* if provided, else raise ``Timeout``
def joinall(greenlets, raise_error=False, timeout=None):
from gevent.queue import Queue
queue = Queue()
put = queue.put
try:
for greenlet in greenlets:
greenlet.link(put)
for _ in xrange(len(greenlets)):
greenlet = queue.get()
if raise_error and not greenlet.successful():
getcurrent().throw(greenlet.exception)
except:
for greenlet in greenlets:
greenlet.unlink(put)
raise
**Raises**:
Any exception raised by *func*, and ``Timeout`` if *func* times out
and no ``timeout_value`` has been provided.
def _killall3(greenlets, exception, waiter):
diehards = []
for g in greenlets:
if not g.dead:
try:
g.throw(exception)
except:
traceback.print_exc()
if not g.dead:
diehards.append(g)
waiter.switch(diehards)
**Example**::
data = with_timeout(30, httpc.get, 'http://www.google.com/', timeout_value="")
def _killall(greenlets, exception):
for g in greenlets:
if not g.dead:
try:
g.throw(exception)
except:
traceback.print_exc()
Here *data* is either the result of the ``get()`` call, or the empty string if
it took too long to return. Any exception raised by the ``get()`` call is
passed through to the caller.
"""
# Recognize a specific keyword argument, while also allowing pass-through
# of any other keyword arguments accepted by func. Use pop() so we don't
# pass timeout_value through to func().
timeout_value = kwds.pop("timeout_value", _NONE)
timeout = Timeout(seconds)
try:
try:
return func(*args, **kwds)
except Timeout, t:
if t is timeout and timeout_value is not _NONE:
return timeout_value
raise
finally:
timeout.cancel()
def killall(greenlets, exception=GreenletExit, block=False, timeout=None):
if block:
waiter = Waiter()
core.active_event(_killall3, greenlets, exception, waiter)
if block:
t = Timeout(timeout)
# t.start()
try:
alive = waiter.wait()
if alive:
joinall(alive, raise_error=False)
finally:
t.cancel()
else:
core.active_event(_killall, greenlets, exception)
def signal(signalnum, handler, *args, **kwargs):
def deliver_exception_to_MAIN():
try:
handler(*args, **kwargs)
except:
MAIN.throw(*sys.exc_info())
return core.signal(signalnum, deliver_exception_to_MAIN)
class LinkedExited(Exception):
pass
def fork():
result = _original_fork()
core.reinit()
return result
class LinkedCompleted(LinkedExited):
"""Raised when a linked greenlet finishes the execution cleanly"""
def shutdown():
"""Cancel our CTRL-C handler and wait for core.dispatch() to return."""
global _threadlocal
hub = _threadlocal.__dict__.get('hub')
if hub is not None and not hub.greenlet.dead:
hub.shutdown()
msg = "%r completed successfully"
def __init__(self, source):
assert source.ready(), source
assert source.successful(), source
LinkedExited.__init__(self, self.msg % source)
class Waiter(object):
"""A low level synchronization class.
Wrapper around switch() and throw() calls that makes them safe:
a) switching will occur only if the waiting greenlet is executing wait()
method currently. Otherwise, switch() and throw() are no-ops.
b) any error raised in the greenlet is handled inside switch() and throw()
class LinkedKilled(LinkedCompleted):
"""Raised when a linked greenlet returns GreenletExit instance"""
switch and throw methods must only be called from the mainloop greenlet.
wait must be called from a greenlet other than mainloop.
"""
__slots__ = ['greenlet']
msg = "%r returned %s"
def __init__(self):
self.greenlet = None
def __init__(self, source):
try:
result = source.value.__class__.__name__
except:
result = str(source) or repr(source)
LinkedExited.__init__(self, self.msg % (source, result))
def __repr__(self):
if self.waiting:
waiting = ' waiting'
else:
waiting = ''
return '<%s at %s%s greenlet=%r>' % (type(self).__name__, hex(id(self)), waiting, self.greenlet)
def __str__(self):
"""
>>> print Waiter()
<Waiter greenlet=None>
"""
if self.waiting:
waiting = ' waiting'
else:
waiting = ''
return '<%s%s greenlet=%s>' % (type(self).__name__, waiting, self.greenlet)
def getLinkedCompleted(source):
if isinstance(source.value, GreenletExit):
return LinkedKilled(source)
else:
return LinkedCompleted(source)
def __nonzero__(self):
return self.greenlet is not None
@property
def waiting(self):
return self.greenlet is not None
def switch(self, value=None):
"""Wake up the greenlet that is calling wait() currently (if there is one).
Can only be called from Hub's greenlet.
"""
assert greenlet.getcurrent() is get_hub().greenlet, "Can only use Waiter.switch method from the mainloop"
if self.greenlet is not None:
try:
self.greenlet.switch(value)
except:
traceback.print_exc()
class LinkedFailed(LinkedExited):
"""Raised when a linked greenlet dies because of unhandled exception"""
def throw(self, *throw_args):
"""Make greenlet calling wait() wake up (if there is a wait()).
Can only be called from Hub's greenlet.
"""
assert greenlet.getcurrent() is get_hub().greenlet, "Can only use Waiter.switch method from the mainloop"
if self.greenlet is not None:
try:
self.greenlet.throw(*throw_args)
except:
traceback.print_exc()
msg = "%r failed with %s"
def wait(self):
"""Wait until switch() or throw() is called.
"""
assert self.greenlet is None, 'This Waiter is already used by %r' % (self.greenlet, )
self.greenlet = greenlet.getcurrent()
def __init__(self, source):
try:
return get_hub().switch()
finally:
self.greenlet = None
excname = source.exception.__name__
except:
excname = str(source) or repr(source)
LinkedExited.__init__(self, self.msg % (source, excname))
def get_hub():
global _threadlocal
try:
return _threadlocal.hub
except AttributeError:
def getfuncname(func):
if not hasattr(func, 'im_self'):
try:
hubtype = _threadlocal.Hub
funcname = func.__name__
except AttributeError:
# do not pretend to support multiple threads because it's not implemented properly by core.pyx
# this may change in the future, although currently I don't have a strong need for this
raise NotImplementedError('gevent is only usable from a single thread')
if hubtype is None:
hubtype = Hub
hub = _threadlocal.hub = hubtype()
return hub
class Hub(object):
def __init__(self):
self.greenlet = Greenlet(self.run)
self.keyboard_interrupt_signal = None
@property
def dead(self):
return self.greenlet.dead
def switch(self):
cur = getcurrent()
assert cur is not self.greenlet, 'Cannot switch to MAINLOOP from MAINLOOP'
switch_out = getattr(cur, 'switch_out', None)
if switch_out is not None:
try:
switch_out()
except:
traceback.print_exc()
return self.greenlet.switch()
def run(self):
global _threadlocal
assert self.greenlet is getcurrent(), 'Do not call run() directly'
self.keyboard_interrupt_signal = signal(2, MAIN.throw, KeyboardInterrupt)
try:
loop_count = 0
while True:
try:
result = core.dispatch()
except IOError, ex:
loop_count += 1
if loop_count > 15:
raise
sys.stderr.write('Restarting gevent.core.dispatch() after an error [%s]: %s\n' % (loop_count, ex))
continue
raise DispatchExit(result)
finally:
if self.keyboard_interrupt_signal is not None:
self.keyboard_interrupt_signal.cancel()
if _threadlocal.__dict__.get('hub') is self:
_threadlocal.__dict__.pop('hub')
def shutdown(self):
assert getcurrent() is MAIN, "Shutting down is only possible from MAIN greenlet"
if self.keyboard_interrupt_signal is not None:
self.keyboard_interrupt_signal.cancel()
self.keyboard_interrupt_signal = None
try:
get_hub().switch()
except DispatchExit, ex:
if ex.code == 1:
return
raise
pass
else:
if funcname != '<lambda>':
return funcname
return repr(func)
class DispatchExit(Exception):
def __init__(self, code):
self.code = code
Exception.__init__(self, code)
_NONE = Exception("Greenlet didn't even start")
import sys
import os
import traceback
from gevent import core
__all__ = ['getcurrent',
'sleep',
'kill',
'signal',
'fork',
'shutdown']
try:
from py.magic import greenlet
except ImportError:
greenlet = __import__('greenlet').greenlet
getcurrent = greenlet.getcurrent
GreenletExit = greenlet.GreenletExit
MAIN = greenlet.getcurrent()
thread = __import__('thread')
threadlocal = thread._local
_threadlocal = threadlocal()
_threadlocal.Hub = None
_original_fork = os.fork
def sleep(seconds=0):
"""Yield control to another eligible coroutine until at least *seconds* have
elapsed.
*seconds* may be specified as an integer, or a float if fractional seconds
are desired. Calling sleep with *seconds* of 0 is the canonical way of
expressing a cooperative yield. For example, if one is looping over a
large list performing an expensive calculation without calling any socket
methods, it's a good idea to call ``sleep(0)`` occasionally; otherwise
nothing else will run.
"""
unique_mark = object()
t = core.timer(seconds, getcurrent().switch, unique_mark)
try:
switch_result = get_hub().switch()
assert switch_result is unique_mark, 'Invalid switch into sleep(): %r' % (switch_result, )
finally:
t.cancel()
def kill(greenlet, exception=GreenletExit):
"""Kill greenlet asynchronously. The current greenlet is not unscheduled.
Note, that Greenlet's kill() method does the same and more. However, MAIN
greenlet does not have kill() method so you have to use this function.
"""
if not greenlet.dead:
core.active_event(greenlet.throw, exception)
def signal(signalnum, handler, *args, **kwargs):
def deliver_exception_to_MAIN():
try:
handler(*args, **kwargs)
except:
MAIN.throw(*sys.exc_info())
return core.signal(signalnum, deliver_exception_to_MAIN)
def fork():
result = _original_fork()
core.reinit()
return result
def shutdown():
"""Cancel our CTRL-C handler and wait for core.dispatch() to return."""
global _threadlocal
hub = _threadlocal.__dict__.get('hub')
if hub is not None and not hub.dead:
hub.shutdown()
def get_hub():
global _threadlocal
try:
return _threadlocal.hub
except AttributeError:
try:
hubtype = _threadlocal.Hub
except AttributeError:
# do not pretend to support multiple threads because it's not implemented properly by core.pyx
# this may change in the future, although currently I don't have a strong need for this
raise NotImplementedError('gevent is only usable from a single thread')
if hubtype is None:
hubtype = Hub
hub = _threadlocal.hub = hubtype()
return hub
class Hub(greenlet):
def __init__(self):
self.keyboard_interrupt_signal = None
def switch(self):
cur = getcurrent()
assert cur is not self, 'Cannot switch to MAINLOOP from MAINLOOP'
switch_out = getattr(cur, 'switch_out', None)
if switch_out is not None:
try:
switch_out()
except:
traceback.print_exc()
return greenlet.switch(self)
def run(self):
global _threadlocal
assert self is getcurrent(), 'Do not call run() directly'
self.keyboard_interrupt_signal = signal(2, MAIN.throw, KeyboardInterrupt)
try:
loop_count = 0
while True:
try:
result = core.dispatch()
except IOError, ex:
loop_count += 1
if loop_count > 15:
raise
sys.stderr.write('Restarting gevent.core.dispatch() after an error [%s]: %s\n' % (loop_count, ex))
continue
raise DispatchExit(result)
finally:
if self.keyboard_interrupt_signal is not None:
self.keyboard_interrupt_signal.cancel()
if _threadlocal.__dict__.get('hub') is self:
_threadlocal.__dict__.pop('hub')
def shutdown(self):
assert getcurrent() is MAIN, "Shutting down is only possible from MAIN greenlet"
if self.keyboard_interrupt_signal is not None:
self.keyboard_interrupt_signal.cancel()
self.keyboard_interrupt_signal = None
try:
self.switch()
except DispatchExit, ex:
if ex.code == 1:
return
raise
class DispatchExit(Exception):
def __init__(self, code):
self.code = code
Exception.__init__(self, code)
class Waiter(object):
"""A low level synchronization class.
Wrapper around switch() and throw() calls that makes them safe:
a) switching will occur only if the waiting greenlet is executing wait()
method currently. Otherwise, switch() and throw() are no-ops.
b) any error raised in the greenlet is handled inside switch() and throw()
switch and throw methods must only be called from the mainloop greenlet.
wait must be called from a greenlet other than mainloop.
"""
__slots__ = ['greenlet']
def __init__(self):
self.greenlet = None
def __repr__(self):
if self.waiting:
waiting = ' waiting'
else:
waiting = ''
return '<%s at %s%s greenlet=%r>' % (type(self).__name__, hex(id(self)), waiting, self.greenlet)
def __str__(self):
"""
>>> print Waiter()
<Waiter greenlet=None>
"""
if self.waiting:
waiting = ' waiting'
else:
waiting = ''
return '<%s%s greenlet=%s>' % (type(self).__name__, waiting, self.greenlet)
def __nonzero__(self):
return self.greenlet is not None
@property
def waiting(self):
return self.greenlet is not None
def switch(self, value=None):
"""Wake up the greenlet that is calling wait() currently (if there is one).
Can only be called from Hub's greenlet.
"""
assert greenlet.getcurrent() is get_hub(), "Can only use Waiter.switch method from the mainloop"
if self.greenlet is not None:
try:
self.greenlet.switch(value)
except:
traceback.print_exc()
def throw(self, *throw_args):
"""Make greenlet calling wait() wake up (if there is a wait()).
Can only be called from Hub's greenlet.
"""
assert greenlet.getcurrent() is get_hub(), "Can only use Waiter.switch method from the mainloop"
if self.greenlet is not None:
try:
self.greenlet.throw(*throw_args)
except:
traceback.print_exc()
# QQQ should be renamed to get() ? and the whole class is called Receiver?
def wait(self):
"""Wait until switch() or throw() is called.
"""
assert self.greenlet is None, 'This Waiter is already used by %r' % (self.greenlet, )
self.greenlet = greenlet.getcurrent()
try:
return get_hub().switch()
finally:
self.greenlet = None
import sys
def patch_os():
from gevent.greenlet import fork
from gevent.hub import fork
import os
os.fork = fork
def patch_time():
from gevent.greenlet import sleep
from gevent.hub import sleep
_time = __import__('time')
_time.sleep = sleep
......
......@@ -67,7 +67,7 @@ fails then there's no way to complete the task so the parent must fail as well;
>>> p = spawn(demofunc, 1, 0)
>>> _ = p.link_exception()
>>> try:
... greenlet.sleep(1)
... hub.sleep(1)
... except LinkedFailed:
... print 'LinkedFailed'
LinkedFailed
......@@ -76,7 +76,8 @@ One application of linking is `waitall' function: link to a bunch of coroutines
and wait for all them to complete. Such function is provided by this module.
"""
import sys
from gevent import coros, greenlet, core
from gevent import coros, hub, core, rawgreenlet
from gevent.timeout import Timeout
from gevent.queue import Queue
import traceback
......@@ -124,12 +125,12 @@ class LinkedKilled(LinkedFailed):
msg = """%r was killed with %s"""
def getLinkedFailed(name, typ, value=None, tb=None):
if issubclass(typ, greenlet.GreenletExit):
if issubclass(typ, hub.GreenletExit):
return LinkedKilled(name, typ, value, tb)
return LinkedFailed(name, typ, value, tb)
class ProcExit(greenlet.GreenletExit):
class ProcExit(hub.GreenletExit):
"""Raised when this proc is killed."""
......@@ -183,7 +184,7 @@ def joinall(sources, trap_errors=True, queue=None):
for _ in xrange(len(sources)):
completed = queue.get()
if not trap_errors and completed.has_exception():
greenlet.getcurrent().throw(*completed.exc_info())
hub.getcurrent().throw(*completed.exc_info())
finally:
for link in links:
link.cancel()
......@@ -195,15 +196,15 @@ def waitall(sources, trap_errors=False, queue=None):
def killall(sources, exception=ProcExit, block=False, polling_period=0.2):
waiter = greenlet.Waiter()
core.active_event(greenlet._killall, sources, exception, waiter)
waiter = hub.Waiter()
core.active_event(rawgreenlet._killall, sources, exception, waiter)
if block:
alive = waiter.wait()
if alive:
try:
joinall(alive, trap_errors=True)
except TypeError:
greenlet._joinall(alive, polling_period=polling_period)
rawgreenlet._joinall(alive, polling_period=polling_period)
# QQQ a) use links for all the greenlets we can and poll the others
# QQQ b) have only one unversal version of killall, waitall, joinall etc
# QQQ the current dichotomy of greenlets and procs is confusing
......@@ -233,8 +234,8 @@ def spawn_greenlet(function, *args):
"""
import warnings
warnings.warn("gevent.proc.spawn_greenlet is deprecated; use gevent.spawn", DeprecationWarning, stacklevel=2)
g = greenlet.Greenlet(function)
g.parent = greenlet.get_hub().greenlet
g = hub.greenlet(function)
g.parent = hub.get_hub()
core.active_event(g.switch, *args)
return g
......@@ -331,10 +332,10 @@ class Source(object):
if self.ready() and self._exc is not None:
return
if listener is None:
listener = greenlet.getcurrent()
listener = hub.getcurrent()
if link is None:
link = self.getLink(listener)
if self.ready() and listener is greenlet.getcurrent():
if self.ready() and listener is hub.getcurrent():
link(self)
else:
self._value_links[listener] = link
......@@ -346,10 +347,10 @@ class Source(object):
if self.value is not _NOT_USED and self._exc is None:
return
if listener is None:
listener = greenlet.getcurrent()
listener = hub.getcurrent()
if link is None:
link = self.getLink(listener)
if self.ready() and listener is greenlet.getcurrent():
if self.ready() and listener is hub.getcurrent():
link(self)
else:
self._exception_links[listener] = link
......@@ -359,10 +360,10 @@ class Source(object):
def link(self, listener=None, link=None):
if listener is None:
listener = greenlet.getcurrent()
listener = hub.getcurrent()
if link is None:
link = self.getLink(listener)
if self.ready() and listener is greenlet.getcurrent():
if self.ready() and listener is hub.getcurrent():
if self._exc is None:
link(self)
else:
......@@ -379,7 +380,7 @@ class Source(object):
def unlink(self, listener=None):
if listener is None:
listener = greenlet.getcurrent()
listener = hub.getcurrent()
self._value_links.pop(listener, None)
self._exception_links.pop(listener, None)
......@@ -441,7 +442,7 @@ class Source(object):
if self._exc is None:
return self.value
else:
greenlet.getcurrent().throw(*self._exc)
hub.getcurrent().throw(*self._exc)
elif timeout is None:
waiter = Waiter()
self.link(waiter)
......@@ -453,11 +454,11 @@ class Source(object):
if exception is False:
return
if exception is None:
exception = greenlet.Timeout()
exception = Timeout()
raise exception
else:
# what follows is:
# with greenlet.Timeout(timeout, *throw_args):
# with Timeout(timeout, *throw_args):
# waiter = Waiter()
# self.link(waiter)
# try:
......@@ -465,7 +466,7 @@ class Source(object):
# finally:
# self.unlink(waiter)
# however, with statement is hand decompiled to make it 2.4 compatible
timer = greenlet.Timeout(timeout, exception)
timer = Timeout(timeout, exception)
EXC = True
try:
try:
......@@ -495,15 +496,15 @@ class Waiter(object):
"""Wake up the greenlet that is calling wait() currently (if there is one).
Can only be called from get_hub().greenlet.
"""
assert greenlet.getcurrent() is greenlet.get_hub().greenlet
assert hub.getcurrent() is hub.get_hub()
if self.greenlet is not None:
self.greenlet.switch(value)
def send_exception(self, *throw_args):
"""Make greenlet calling wait() wake up (if there is a wait()).
Can only be called from greenlet.get_hub().greenlet.
Can only be called from greenlet.get_hub().
"""
assert greenlet.getcurrent() is greenlet.get_hub().greenlet
assert hub.getcurrent() is hub.get_hub()
if self.greenlet is not None:
self.greenlet.throw(*throw_args)
......@@ -512,11 +513,11 @@ class Waiter(object):
into send() or raise exception passed into send_exception().
"""
assert self.greenlet is None
current = greenlet.getcurrent()
assert current is not greenlet.get_hub().greenlet
current = hub.getcurrent()
assert current is not hub.get_hub()
self.greenlet = current
try:
return greenlet.get_hub().switch()
return hub.get_hub().switch()
finally:
self.greenlet = None
......@@ -570,7 +571,7 @@ class Proc(Source):
assert self.greenlet is None, "'run' can only be called once per instance"
if self.name is None:
self.name = str(function)
self.greenlet = greenlet.spawn(self._run, function, args, kwargs)
self.greenlet = rawgreenlet.spawn(self._run, function, args, kwargs)
def _run(self, function, args, kwargs):
"""Internal top level function.
......@@ -699,7 +700,7 @@ class ProcSet(object):
return len(self.procs) + len(self.dying)
def __contains__(self, item):
if isinstance(item, greenlet.Greenlet):
if isinstance(item, hub.greenlet):
# special case for "getcurrent() in running_proc_set" to work
for x in self.procs:
if x.greenlet == item:
......@@ -798,7 +799,7 @@ class Pool(object):
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
if self.sem.locked() and greenlet.getcurrent() in self.procs:
if self.sem.locked() and hub.getcurrent() in self.procs:
p = spawn(func, *args, **kwargs)
try:
p.wait()
......@@ -813,7 +814,7 @@ class Pool(object):
def execute_async(self, func, *args, **kwargs):
if self.sem.locked():
return greenlet.spawn(self.execute, func, *args, **kwargs)
return rawgreenlet.spawn(self.execute, func, *args, **kwargs)
else:
return self.execute(func, *args, **kwargs)
......
......@@ -3,7 +3,8 @@ import heapq
import collections
from Queue import Full, Empty
from gevent.greenlet import Timeout, Waiter, get_hub, getcurrent, _NONE
from gevent.timeout import Timeout, _NONE
from gevent.hub import get_hub, Waiter, getcurrent
from gevent import core
......@@ -90,7 +91,7 @@ class Queue(object):
self._put(item)
if self.getters:
self._schedule_unlock()
elif not block and get_hub().greenlet is getcurrent():
elif not block and get_hub() is getcurrent():
# we're in the mainloop, so we cannot wait; we can switch() to other greenlets though
# find a getter and deliver an item to it
while self.getters:
......@@ -141,7 +142,7 @@ class Queue(object):
if self.putters:
self._schedule_unlock()
return self._get()
elif not block and get_hub().greenlet is getcurrent():
elif not block and get_hub() is getcurrent():
# special case to make get_nowait() runnable in the mainloop greenlet
# there are no items in the queue; try to fix the situation by unlocking putters
while self.putters:
......
"""A few utilities for raw greenlets"""
import traceback
from gevent import core
from gevent.hub import greenlet, get_hub, GreenletExit, Waiter, sleep
__all__ = ['spawn',
'spawn_later',
'kill',
'killall',
'join',
'joinall']
def _switch_helper(function, args, kwargs):
# work around the fact that greenlet.switch does not support keyword args
return function(*args, **kwargs)
def spawn(function, *args, **kwargs):
if kwargs:
g = greenlet(_switch_helper, get_hub())
core.active_event(g.switch, function, args, kwargs)
return g
else:
g = greenlet(function, get_hub())
core.active_event(g.switch, *args)
return g
def spawn_later(seconds, function, *args, **kwargs):
if kwargs:
g = greenlet(_switch_helper, get_hub())
core.timer(seconds, g.switch, function, args, kwargs)
return g
else:
g = greenlet(function, get_hub())
core.timer(seconds, g.switch, *args)
return g
def _kill(greenlet, exception, waiter):
try:
greenlet.throw(exception)
except:
traceback.print_exc()
waiter.switch()
def kill(greenlet, exception=GreenletExit, block=False, polling_period=0.2):
"""Kill greenlet with exception (GreenletExit by default).
Wait for it to die if block is true.
"""
if not greenlet.dead:
waiter = Waiter()
core.active_event(_kill, greenlet, exception, waiter)
if block:
waiter.wait()
join(greenlet, polling_period=polling_period)
def _killall(greenlets, exception, waiter):
diehards = []
for g in greenlets:
if not g.dead:
try:
g.throw(exception)
except:
traceback.print_exc()
if not g.dead:
diehards.append(g)
waiter.switch(diehards)
def killall(greenlets, exception=GreenletExit, block=False, polling_period=0.2):
"""Kill all the greenlets with exception (GreenletExit by default).
Wait for them to die if block is true.
"""
waiter = Waiter()
core.active_event(_killall, greenlets, exception, waiter)
if block:
alive = waiter.wait()
if alive:
joinall(alive, polling_period=polling_period)
def join(greenlet, polling_period=0.2):
"""Wait for a greenlet to finish by polling its status"""
delay = 0.002
while not greenlet.dead:
delay = min(polling_period, delay*2)
sleep(delay)
def joinall(greenlets, polling_period=0.2):
"""Wait for the greenlets to finish by polling their status"""
current = 0
while current < len(greenlets) and greenlets[current].dead:
current += 1
delay = 0.002
while current < len(greenlets):
delay = min(polling_period, delay*2)
sleep(delay)
while current < len(greenlets) and greenlets[current].dead:
current += 1
from gevent import core, greenlet
from gevent import core
from gevent.hub import get_hub, getcurrent
from gevent.timeout import Timeout
def get_fileno(obj):
try:
......@@ -12,10 +14,10 @@ def get_fileno(obj):
def select(read_list, write_list, error_list, timeout=None):
hub = greenlet.get_hub()
hub = get_hub()
timeout = None
current = greenlet.getcurrent()
assert hub.greenlet is not current, 'do not call blocking functions from the mainloop'
current = getcurrent()
assert hub is not current, 'do not call blocking functions from the mainloop'
allevents = []
def callback(ev, evtype):
......@@ -31,11 +33,11 @@ def select(read_list, write_list, error_list, timeout=None):
for w in write_list:
allevents.append(core.write(get_fileno(r), callback, arg=w))
timeout = greenlet.Timeout(timeout)
timeout = Timeout(timeout)
try:
try:
result = hub.switch()
except greenlet.Timeout, t:
except Timeout, t:
if t is not timeout:
raise
return [], [], []
......
......@@ -35,7 +35,8 @@ import sys
import errno
import time
from gevent.greenlet import spawn, getcurrent, get_hub
from gevent.hub import getcurrent, get_hub
from gevent.rawgreenlet import spawn
from gevent import core
BUFFER_SIZE = 4096
......
"""implements standard module 'thread' with greenlets"""
__thread = __import__('thread')
from gevent.greenlet import spawn, getcurrent, GreenletExit
from gevent.hub import getcurrent, GreenletExit
from gevent.rawgreenlet import spawn
from gevent.coros import Semaphore as LockType
def get_ident(gr=None):
......
from gevent import core
from gevent.hub import getcurrent
__all__ = ['Timeout',
'with_timeout']
try:
BaseException
except NameError: # Python < 2.5
class BaseException:
# not subclassing from object() intentionally, because in
# that case "raise Timeout" fails with TypeError.
pass
class Timeout(BaseException):
"""Raise an exception in the current greenlet after timeout.
timeout = Timeout(seconds[, exception])
try:
... code block ...
finally:
timeout.cancel()
Assuming code block is yielding (i.e. gives up control to the hub),
an exception will be raised if code block has been running for more
than `seconds` seconds. By default (or when exception is None), the
Timeout instance itself is raised. If exception is provided, then it
is raised instead.
For Python starting with 2.5 'with' statement can be used:
with Timeout(seconds[, exception]) as timeout:
... code block ...
This is equivalent to try/finally block above with one additional feature:
if exception is False, the timeout is still raised, but context manager
suppresses it, so surrounding code won't see it.
This is handy for adding a timeout feature to the functions that don't
implement it themselves:
data = None
with Timeout(5, False):
data = mysock.makefile().readline()
if data is None:
# 5 seconds passed without reading a line
else:
# a line was read within 5 seconds
Note that, if readline() catches BaseException (or everything with 'except:'),
then your timeout is screwed.
When catching timeouts, keep in mind that the one you catch maybe not the
one you have set; if you going to silent a timeout, always check that it's
the one you need:
timeout = Timeout(1)
try:
...
except Timeout, t:
if t is not timeout:
raise # not my timeout
"""
def __init__(self, seconds=None, exception=None):
if seconds is None: # "fake" timeout (never expires)
self.exception = None
self.timer = None
elif exception is None or exception is False: # timeout that raises self
self.exception = exception
self.timer = core.timer(seconds, getcurrent().throw, self)
else: # regular timeout with user-provided exception
self.exception = exception
self.timer = core.timer(seconds, getcurrent().throw, exception)
@property
def pending(self):
if self.timer is not None:
return self.timer.pending
else:
return False
def cancel(self):
if self.timer is not None:
self.timer.cancel()
def __repr__(self):
try:
classname = self.__class__.__name__
except AttributeError: # Python < 2.5
classname = 'Timeout'
if self.exception is None:
return '<%s at %s timer=%s>' % (classname, hex(id(self)), self.timer)
else:
return '<%s at %s timer=%s exception=%s>' % (classname, hex(id(self)), self.timer, self.exception)
def __str__(self):
"""
>>> raise Timeout
Traceback (most recent call last):
...
Timeout
"""
if self.exception is None:
return ''
elif self.exception is False:
return '(silent)'
else:
return str(self.exception)
def __enter__(self):
return self
def __exit__(self, typ, value, tb):
self.cancel()
if value is self and self.exception is False:
return True
# how about returning Timeout instance ?
def with_timeout(seconds, func, *args, **kwds):
"""Wrap a call to some (yielding) function with a timeout; if the called
function fails to return before the timeout, cancel it and return a flag
value.
seconds
(int or float) seconds before timeout occurs
func
the callable to execute with a timeout; must be one of the functions
that implicitly or explicitly yields
\*args, \*\*kwds
(positional, keyword) arguments to pass to *func*
timeout_value=
value to return if timeout occurs (default raise ``Timeout``)
**Returns**:
Value returned by *func* if *func* returns before *seconds*, else
*timeout_value* if provided, else raise ``Timeout``
**Raises**:
Any exception raised by *func*, and ``Timeout`` if *func* times out
and no ``timeout_value`` has been provided.
**Example**::
data = with_timeout(30, httpc.get, 'http://www.google.com/', timeout_value="")
Here *data* is either the result of the ``get()`` call, or the empty string if
it took too long to return. Any exception raised by the ``get()`` call is
passed through to the caller.
"""
# Recognize a specific keyword argument, while also allowing pass-through
# of any other keyword arguments accepted by func. Use pop() so we don't
# pass timeout_value through to func().
timeout_value = kwds.pop("timeout_value", _NONE)
timeout = Timeout(seconds)
try:
try:
return func(*args, **kwds)
except Timeout, t:
if t is timeout and timeout_value is not _NONE:
return timeout_value
raise
finally:
timeout.cancel()
_NONE = object()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment