Commit b5672607 authored by Jason Madden's avatar Jason Madden

Let ThreadPool.apply raise exceptions, like Group/Pool.apply and the builtin apply.

It preserves the tracebacks and obsoletes the ``apply_e`` function.

Add test cases for this behaviour.

Fixes #556.
parent 87f40093
...@@ -12,6 +12,11 @@ Unreleased ...@@ -12,6 +12,11 @@ Unreleased
- (Experimental) Exceptions raised from iterating using the - (Experimental) Exceptions raised from iterating using the
``ThreadPool`` or ``Group`` mapping/application functions should now ``ThreadPool`` or ``Group`` mapping/application functions should now
have the original traceback. have the original traceback.
- ``gevent.threadpool.ThreadPool.apply`` now raises any exception
raised by the called function, the same as
``gevent.pool.Group``/``Pool`` and the builtin ``apply`` function.
This obsoletes the undocumented ``apply_e`` function. Original PR
#556 by Robert Estelle.
1.1a1 (Jun 29, 2015) 1.1a1 (Jun 29, 2015)
......
...@@ -2,8 +2,9 @@ ...@@ -2,8 +2,9 @@
"""Basic synchronization primitives: Event and AsyncResult""" """Basic synchronization primitives: Event and AsyncResult"""
import sys import sys
from gevent.hub import get_hub, getcurrent, _NONE, PY3 from gevent.hub import get_hub, getcurrent, _NONE, PY3, reraise
from gevent.timeout import Timeout from gevent.timeout import Timeout
from gevent._tblib import dump_traceback, load_traceback
from collections import deque from collections import deque
if PY3: if PY3:
xrange = range xrange = range
...@@ -160,6 +161,7 @@ class AsyncResult(object): ...@@ -160,6 +161,7 @@ class AsyncResult(object):
self._links = deque() self._links = deque()
self.value = None self.value = None
self._exception = _NONE self._exception = _NONE
self._exc_info = ()
self.hub = get_hub() self.hub = get_hub()
self._notifier = None self._notifier = None
...@@ -185,8 +187,8 @@ class AsyncResult(object): ...@@ -185,8 +187,8 @@ class AsyncResult(object):
def exception(self): def exception(self):
"""Holds the exception instance passed to :meth:`set_exception` if :meth:`set_exception` was called. """Holds the exception instance passed to :meth:`set_exception` if :meth:`set_exception` was called.
Otherwise ``None``.""" Otherwise ``None``."""
if self._exception is not _NONE: if self._exc_info:
return self._exception return self._exc_info[1]
def set(self, value=None): def set(self, value=None):
"""Store the value. Wake up the waiters. """Store the value. Wake up the waiters.
...@@ -199,16 +201,24 @@ class AsyncResult(object): ...@@ -199,16 +201,24 @@ class AsyncResult(object):
if self._links and not self._notifier: if self._links and not self._notifier:
self._notifier = self.hub.loop.run_callback(self._notify_links) self._notifier = self.hub.loop.run_callback(self._notify_links)
def set_exception(self, exception): def set_exception(self, exception, exc_info=None):
"""Store the exception. Wake up the waiters. """Store the exception. Wake up the waiters.
All greenlets blocking on :meth:`get` or :meth:`wait` are woken up. All greenlets blocking on :meth:`get` or :meth:`wait` are woken up.
Sequential calls to :meth:`wait` and :meth:`get` will not block at all. Sequential calls to :meth:`wait` and :meth:`get` will not block at all.
""" """
self._exception = exception self._exception = exception
if exc_info:
self._exc_info = (exc_info[0], exc_info[1], dump_traceback(exc_info[2]))
if self._links and not self._notifier: if self._links and not self._notifier:
self._notifier = self.hub.loop.run_callback(self._notify_links) self._notifier = self.hub.loop.run_callback(self._notify_links)
def _raise_exception(self):
if self._exc_info:
reraise(self._exc_info[0], self._exc_info[1], load_traceback(self._exc_info[2]))
raise self._exception
def get(self, block=True, timeout=None): def get(self, block=True, timeout=None):
"""Return the stored value or raise the exception. """Return the stored value or raise the exception.
...@@ -223,7 +233,7 @@ class AsyncResult(object): ...@@ -223,7 +233,7 @@ class AsyncResult(object):
if self._exception is not _NONE: if self._exception is not _NONE:
if self._exception is None: if self._exception is None:
return self.value return self.value
raise self._exception self._raise_exception()
elif block: elif block:
switch = getcurrent().switch switch = getcurrent().switch
self.rawlink(switch) self.rawlink(switch)
...@@ -239,7 +249,7 @@ class AsyncResult(object): ...@@ -239,7 +249,7 @@ class AsyncResult(object):
raise raise
if self._exception is None: if self._exception is None:
return self.value return self.value
raise self._exception self._raise_exception()
else: else:
raise Timeout raise Timeout
...@@ -318,4 +328,4 @@ class AsyncResult(object): ...@@ -318,4 +328,4 @@ class AsyncResult(object):
if source.successful(): if source.successful():
self.set(source.value) self.set(source.value)
else: else:
self.set_exception(source.exception) self.set_exception(source.exception, getattr(source, 'exc_info', None))
...@@ -65,7 +65,7 @@ class FileObjectThread(object): ...@@ -65,7 +65,7 @@ class FileObjectThread(object):
def _apply(self, func, args=None, kwargs=None): def _apply(self, func, args=None, kwargs=None):
with self.lock: with self.lock:
return self.threadpool.apply_e(BaseException, func, args, kwargs) return self.threadpool.apply(func, args, kwargs)
def close(self): def close(self):
fobj = self._fobj fobj = self._fobj
......
...@@ -122,7 +122,7 @@ class Greenlet(greenlet): ...@@ -122,7 +122,7 @@ class Greenlet(greenlet):
return deque() return deque()
def _raise_exception(self): def _raise_exception(self):
reraise(self._exc_info[0], self._exc_info[1], load_traceback(self._exc_info[2])) reraise(*self.exc_info)
@property @property
def loop(self): def loop(self):
...@@ -190,6 +190,14 @@ class Greenlet(greenlet): ...@@ -190,6 +190,14 @@ class Greenlet(greenlet):
""" """
return self._exc_info[1] if self._exc_info else None return self._exc_info[1] if self._exc_info else None
@property
def exc_info(self):
"""Holds the exc_info three-tuple raised by the function if the greenlet finished with an error.
Otherwise a false value."""
e = self._exc_info
if e:
return (e[0], e[1], load_traceback(e[2]))
def throw(self, *args): def throw(self, *args):
"""Immediatelly switch into the greenlet and raise an exception in it. """Immediatelly switch into the greenlet and raise an exception in it.
......
...@@ -84,13 +84,13 @@ def tp_read(fd, n): ...@@ -84,13 +84,13 @@ def tp_read(fd, n):
"""Read up to `n` bytes from file descriptor `fd`. Return a string """Read up to `n` bytes from file descriptor `fd`. Return a string
containing the bytes read. If end-of-file is reached, an empty string containing the bytes read. If end-of-file is reached, an empty string
is returned.""" is returned."""
return get_hub().threadpool.apply_e(BaseException, _read, (fd, n)) return get_hub().threadpool.apply(_read, (fd, n))
def tp_write(fd, buf): def tp_write(fd, buf):
"""Write bytes from buffer `buf` to file descriptor `fd`. Return the """Write bytes from buffer `buf` to file descriptor `fd`. Return the
number of bytes written.""" number of bytes written."""
return get_hub().threadpool.apply_e(BaseException, _write, (fd, buf)) return get_hub().threadpool.apply(_write, (fd, buf))
if hasattr(os, 'fork'): if hasattr(os, 'fork'):
......
...@@ -29,16 +29,16 @@ class Resolver(object): ...@@ -29,16 +29,16 @@ class Resolver(object):
# below are thread-safe in Python, even if they are not thread-safe in C. # below are thread-safe in Python, even if they are not thread-safe in C.
def gethostbyname(self, *args): def gethostbyname(self, *args):
return self.pool.apply_e(self.expected_errors, _socket.gethostbyname, args) return self.pool.apply(_socket.gethostbyname, args)
def gethostbyname_ex(self, *args): def gethostbyname_ex(self, *args):
return self.pool.apply_e(self.expected_errors, _socket.gethostbyname_ex, args) return self.pool.apply( _socket.gethostbyname_ex, args)
def getaddrinfo(self, *args, **kwargs): def getaddrinfo(self, *args, **kwargs):
return self.pool.apply_e(self.expected_errors, _socket.getaddrinfo, args, kwargs) return self.pool.apply(_socket.getaddrinfo, args, kwargs)
def gethostbyaddr(self, *args, **kwargs): def gethostbyaddr(self, *args, **kwargs):
return self.pool.apply_e(self.expected_errors, _socket.gethostbyaddr, args, kwargs) return self.pool.apply(_socket.gethostbyaddr, args, kwargs)
def getnameinfo(self, *args, **kwargs): def getnameinfo(self, *args, **kwargs):
return self.pool.apply_e(self.expected_errors, _socket.getnameinfo, args, kwargs) return self.pool.apply(_socket.getnameinfo, args, kwargs)
...@@ -9,9 +9,6 @@ from gevent.pool import GroupMappingMixin ...@@ -9,9 +9,6 @@ from gevent.pool import GroupMappingMixin
from gevent.lock import Semaphore from gevent.lock import Semaphore
from gevent._threading import Lock, Queue, start_new_thread from gevent._threading import Lock, Queue, start_new_thread
# XXX apply_e is ugly and must not be needed
# XXX apply() should re-raise everything
__all__ = ['ThreadPool', __all__ = ['ThreadPool',
'ThreadResult'] 'ThreadResult']
...@@ -211,18 +208,14 @@ class ThreadPool(GroupMappingMixin): ...@@ -211,18 +208,14 @@ class ThreadPool(GroupMappingMixin):
if need_decrease: if need_decrease:
self._decrease_size() self._decrease_size()
# XXX apply() should re-raise error by default
# XXX because that's what builtin apply does
# XXX check gevent.pool.Pool.apply and multiprocessing.Pool.apply
def apply_e(self, expected_errors, function, args=None, kwargs=None): def apply_e(self, expected_errors, function, args=None, kwargs=None):
if args is None: # Deprecated but never documented. In the past, before
args = () # self.apply() allowed all errors to be raised to the caller,
if kwargs is None: # expected_errors allowed a caller to specify a set of errors
kwargs = {} # they wanted to be raised, through the wrap_errors function.
success, result = self.spawn(wrap_errors, expected_errors, function, args, kwargs).get() # In practice, it always took the value Exception or
if success: # BaseException.
return result return self.apply(function, args, kwargs)
raise result
def _apply_immediately(self): def _apply_immediately(self):
# we always pass apply() off to the threadpool # we always pass apply() off to the threadpool
...@@ -238,6 +231,8 @@ class ThreadPool(GroupMappingMixin): ...@@ -238,6 +231,8 @@ class ThreadPool(GroupMappingMixin):
class ThreadResult(object): class ThreadResult(object):
exc_info = ()
def __init__(self, receiver, hub=None): def __init__(self, receiver, hub=None):
if hub is None: if hub is None:
hub = get_hub() hub = get_hub()
...@@ -245,27 +240,28 @@ class ThreadResult(object): ...@@ -245,27 +240,28 @@ class ThreadResult(object):
self.hub = hub self.hub = hub
self.value = None self.value = None
self.context = None self.context = None
self.exc_info = None
self.async = hub.loop.async() self.async = hub.loop.async()
self.async.start(self._on_async) self.async.start(self._on_async)
@property
def exception(self):
return self.exc_info[1] if self.exc_info else None
def _on_async(self): def _on_async(self):
self.async.stop() self.async.stop()
try: try:
if self.exc_info is not None: if self.exc_info:
try:
self.hub.handle_error(self.context, *self.exc_info) self.hub.handle_error(self.context, *self.exc_info)
finally:
self.exc_info = None
self.context = None self.context = None
self.async = None self.async = None
self.hub = None self.hub = None
if self.receiver is not None: if self.receiver is not None:
# XXX exception!!!?
self.receiver(self) self.receiver(self)
finally: finally:
self.receiver = None self.receiver = None
self.value = None self.value = None
if self.exc_info:
self.exc_info = (self.exc_info[0], self.exc_info[1], None)
def set(self, value): def set(self, value):
self.value = value self.value = value
...@@ -278,10 +274,11 @@ class ThreadResult(object): ...@@ -278,10 +274,11 @@ class ThreadResult(object):
# link protocol: # link protocol:
def successful(self): def successful(self):
return True return self.exception is None
def wrap_errors(errors, function, args, kwargs): def wrap_errors(errors, function, args, kwargs):
# Deprecated but never documented.
try: try:
return True, function(*args, **kwargs) return True, function(*args, **kwargs)
except errors as ex: except errors as ex:
......
...@@ -34,6 +34,21 @@ class TestCoroutinePool(unittest.TestCase): ...@@ -34,6 +34,21 @@ class TestCoroutinePool(unittest.TestCase):
result = pool.apply(some_work) result = pool.apply(some_work)
self.assertEqual(value, result) self.assertEqual(value, result)
def test_apply_raises(self):
pool = self.klass(1)
def raiser():
raise ExpectedException()
try:
pool.apply(raiser)
except ExpectedException:
pass
else:
self.fail("Should have raised ExpectedException")
# Don't let the metaclass automatically force any error
# that reaches the hub from a spawned greenlet to become
# fatal; that defeats the point of the test.
test_apply_raises.error_fatal = False
def test_multiple_coros(self): def test_multiple_coros(self):
evt = Event() evt = Event()
results = [] results = []
......
...@@ -5,6 +5,7 @@ import weakref ...@@ -5,6 +5,7 @@ import weakref
import greentest import greentest
from gevent.threadpool import ThreadPool from gevent.threadpool import ThreadPool
import gevent import gevent
from greentest import ExpectedException
import six import six
import gc import gc
...@@ -47,6 +48,24 @@ class PoolBasicTests(TestCase): ...@@ -47,6 +48,24 @@ class PoolBasicTests(TestCase):
result = pool.apply(lambda a: ('foo', a), (1, )) result = pool.apply(lambda a: ('foo', a), (1, ))
self.assertEqual(result, ('foo', 1)) self.assertEqual(result, ('foo', 1))
def test_apply_raises(self):
self.pool = pool = ThreadPool(1)
def raiser():
raise ExpectedException()
try:
pool.apply(raiser)
except ExpectedException:
import traceback; traceback.print_exc()
pass
else:
self.fail("Should have raised ExpectedException")
# Don't let the metaclass automatically force any error
# that reaches the hub from a spawned greenlet to become
# fatal; that defeats the point of the test.
test_apply_raises.error_fatal = False
def test_init_valueerror(self): def test_init_valueerror(self):
self.switch_expected = False self.switch_expected = False
self.assertRaises(ValueError, ThreadPool, -1) self.assertRaises(ValueError, ThreadPool, -1)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment