Commit 9b4085b5 authored by Jason Madden's avatar Jason Madden

Fix TestPatchedTPE.test__future.

parent 894fb643
...@@ -142,9 +142,11 @@ class OpenDescriptor(object): # pylint:disable=too-many-instance-attributes ...@@ -142,9 +142,11 @@ class OpenDescriptor(object): # pylint:disable=too-many-instance-attributes
if universal: if universal:
if can_write: if can_write:
raise ValueError("mode U cannot be combined with 'x', 'w', 'a', or '+'") raise ValueError("mode U cannot be combined with 'x', 'w', 'a', or '+'")
import warnings # Just because the stdlib deprecates this, no need for us to do so as well.
warnings.warn("'U' mode is deprecated", # Especially not while we still support Python 2.
DeprecationWarning, 4) # import warnings
# warnings.warn("'U' mode is deprecated",
# DeprecationWarning, 4)
reading = True reading = True
if text and binary: if text and binary:
raise ValueError("can't have text and binary mode at once") raise ValueError("can't have text and binary mode at once")
......
...@@ -51,7 +51,7 @@ def set_default_hub_class(hubtype): ...@@ -51,7 +51,7 @@ def set_default_hub_class(hubtype):
global Hub global Hub
Hub = hubtype Hub = hubtype
def get_hub(*args, **kwargs): def get_hub(*args, **kwargs): # pylint:disable=unused-argument
""" """
Return the hub for the current thread. Return the hub for the current thread.
...@@ -63,12 +63,12 @@ def get_hub(*args, **kwargs): ...@@ -63,12 +63,12 @@ def get_hub(*args, **kwargs):
only used when the hub was created, and so were non-deterministic---to be only used when the hub was created, and so were non-deterministic---to be
sure they were used, *all* callers had to pass them, or they were order-dependent. sure they were used, *all* callers had to pass them, or they were order-dependent.
Use ``set_hub`` instead. Use ``set_hub`` instead.
.. versionchanged:: 1.5a3
The *args* and *kwargs* arguments are now completely ignored.
""" """
hub = _threadlocal.hub
if hub is None: return get_hub_noargs()
hubtype = get_hub_class()
hub = _threadlocal.hub = hubtype(*args, **kwargs)
return hub
def get_hub_noargs(): def get_hub_noargs():
# Just like get_hub, but cheaper to call because it # Just like get_hub, but cheaper to call because it
......
...@@ -460,6 +460,19 @@ class Hub(WaitOperationsGreenlet): ...@@ -460,6 +460,19 @@ class Hub(WaitOperationsGreenlet):
result += ' thread_ident=%s' % (hex(self.thread_ident), ) result += ' thread_ident=%s' % (hex(self.thread_ident), )
return result + '>' return result + '>'
def _normalize_exception(self, t, v, tb):
# Allow passing in all None if the caller doesn't have
# easy access to sys.exc_info()
if (t, v, tb) == (None, None, None):
t, v, tb = sys.exc_info()
if isinstance(v, str):
# Cython can raise errors where the value is a plain string
# e.g., AttributeError, "_semaphore.Semaphore has no attr", <traceback>
v = t(v)
return t, v, tb
def handle_error(self, context, type, value, tb): def handle_error(self, context, type, value, tb):
""" """
Called by the event loop when an error occurs. The default Called by the event loop when an error occurs. The default
...@@ -486,13 +499,7 @@ class Hub(WaitOperationsGreenlet): ...@@ -486,13 +499,7 @@ class Hub(WaitOperationsGreenlet):
that should generally result in exiting the loop and being that should generally result in exiting the loop and being
thrown to the parent greenlet. thrown to the parent greenlet.
""" """
if (type, value, tb) == (None, None, None): type, value, tb = self._normalize_exception(type, value, tb)
type, value, tb = sys.exc_info()
if isinstance(value, str):
# Cython can raise errors where the value is a plain string
# e.g., AttributeError, "_semaphore.Semaphore has no attr", <traceback>
value = type(value)
if not issubclass(type, self.NOT_ERROR): if not issubclass(type, self.NOT_ERROR):
self.print_exception(context, type, value, tb) self.print_exception(context, type, value, tb)
...@@ -543,7 +550,7 @@ class Hub(WaitOperationsGreenlet): ...@@ -543,7 +550,7 @@ class Hub(WaitOperationsGreenlet):
stderr = stderr.io # pylint:disable=no-member stderr = stderr.io # pylint:disable=no-member
return stderr return stderr
def print_exception(self, context, type, value, tb): def print_exception(self, context, t, v, tb):
# Python 3 does not gracefully handle None value or tb in # Python 3 does not gracefully handle None value or tb in
# traceback.print_exception() as previous versions did. # traceback.print_exception() as previous versions did.
# pylint:disable=no-member # pylint:disable=no-member
...@@ -555,10 +562,12 @@ class Hub(WaitOperationsGreenlet): ...@@ -555,10 +562,12 @@ class Hub(WaitOperationsGreenlet):
# See https://github.com/gevent/gevent/issues/1295 # See https://github.com/gevent/gevent/issues/1295
return return
if value is None: t, v, tb = self._normalize_exception(t, v, tb)
errstream.write('%s\n' % type.__name__)
if v is None:
errstream.write('%s\n' % t.__name__)
else: else:
traceback.print_exception(type, value, tb, file=errstream) traceback.print_exception(t, v, tb, file=errstream)
del tb del tb
try: try:
...@@ -576,7 +585,7 @@ class Hub(WaitOperationsGreenlet): ...@@ -576,7 +585,7 @@ class Hub(WaitOperationsGreenlet):
except: # pylint:disable=bare-except except: # pylint:disable=bare-except
traceback.print_exc(file=self.exception_stream) traceback.print_exc(file=self.exception_stream)
context = repr(context) context = repr(context)
errstream.write('%s failed with %s\n\n' % (context, getattr(type, '__name__', 'exception'), )) errstream.write('%s failed with %s\n\n' % (context, getattr(t, '__name__', 'exception'), ))
def run(self): def run(self):
......
...@@ -25,6 +25,13 @@ import unittest ...@@ -25,6 +25,13 @@ import unittest
# pylint:disable=unused-import # pylint:disable=unused-import
# It's important to do this ASAP, because if we're monkey patched,
# then importing things like the standard library test.support can
# need to construct the hub (to check for IPv6 support using a socket).
from .hub import QuietHub
import gevent.hub
gevent.hub.set_default_hub_class(QuietHub)
from .sysinfo import VERBOSE from .sysinfo import VERBOSE
from .sysinfo import WIN from .sysinfo import WIN
from .sysinfo import LINUX from .sysinfo import LINUX
...@@ -87,9 +94,7 @@ from .exception import ExpectedException ...@@ -87,9 +94,7 @@ from .exception import ExpectedException
from .leakcheck import ignores_leakcheck from .leakcheck import ignores_leakcheck
from .params import LARGE_TIMEOUT from .params import LARGE_TIMEOUT
from .params import DEFAULT_LOCAL_HOST_ADDR from .params import DEFAULT_LOCAL_HOST_ADDR
from .params import DEFAULT_LOCAL_HOST_ADDR6 from .params import DEFAULT_LOCAL_HOST_ADDR6
from .params import DEFAULT_BIND_ADDR from .params import DEFAULT_BIND_ADDR
...@@ -103,10 +108,6 @@ from .params import DEFAULT_XPC_SOCKET_TIMEOUT ...@@ -103,10 +108,6 @@ from .params import DEFAULT_XPC_SOCKET_TIMEOUT
main = unittest.main main = unittest.main
SkipTest = unittest.SkipTest SkipTest = unittest.SkipTest
from .hub import QuietHub
import gevent.hub
gevent.hub.set_default_hub_class(QuietHub)
......
...@@ -29,10 +29,16 @@ class QuietHub(Hub): ...@@ -29,10 +29,16 @@ class QuietHub(Hub):
EXPECTED_TEST_ERROR = (ExpectedException,) EXPECTED_TEST_ERROR = (ExpectedException,)
def handle_error(self, context, type, value, tb): def handle_error(self, context, type, value, tb):
if (type, value, tb) == (None, None, None): type, value, tb = self._normalize_exception(type, value, tb)
import sys
type, value, tb = sys.exc_info()
if issubclass(type, self.EXPECTED_TEST_ERROR): if issubclass(type, self.EXPECTED_TEST_ERROR):
# Don't print these to cut down on the noise in the test logs # Don't print these to cut down on the noise in the test logs
return return
return Hub.handle_error(self, context, type, value, tb) return Hub.handle_error(self, context, type, value, tb)
def print_exception(self, context, t, v, tb):
t, v, tb = self._normalize_exception(t, v, tb)
if issubclass(t, self.EXPECTED_TEST_ERROR):
# see handle_error
return
return Hub.print_exception(self, context, t, v, tb)
...@@ -26,3 +26,5 @@ def thread_is_greenlet(): ...@@ -26,3 +26,5 @@ def thread_is_greenlet():
is_greenlet = get_hub().threadpool.apply(thread_is_greenlet) is_greenlet = get_hub().threadpool.apply(thread_is_greenlet)
print(is_greenlet) print(is_greenlet)
print(len(sys._current_frames())) print(len(sys._current_frames()))
sys.stdout.flush()
sys.stderr.flush()
...@@ -4,7 +4,7 @@ import unittest ...@@ -4,7 +4,7 @@ import unittest
import gevent import gevent
from gevent import core from gevent import core
from gevent.hub import Hub
@unittest.skipUnless( @unittest.skipUnless(
getattr(core, 'LIBEV_EMBED', False), getattr(core, 'LIBEV_EMBED', False),
...@@ -23,7 +23,7 @@ class Test(unittest.TestCase): ...@@ -23,7 +23,7 @@ class Test(unittest.TestCase):
getattr(unittest.TestCase, 'assertRaisesRegexp')) getattr(unittest.TestCase, 'assertRaisesRegexp'))
def _check_backend(self, backend): def _check_backend(self, backend):
hub = gevent.get_hub(backend, default=False) hub = Hub(backend, default=False)
try: try:
self.assertEqual(hub.loop.backend, backend) self.assertEqual(hub.loop.backend, backend)
...@@ -33,8 +33,9 @@ class Test(unittest.TestCase): ...@@ -33,8 +33,9 @@ class Test(unittest.TestCase):
raise unittest.SkipTest("backend %s lacks fileno" % (backend,)) raise unittest.SkipTest("backend %s lacks fileno" % (backend,))
os.close(fileno) os.close(fileno)
with self.assertRaisesRegex(SystemError, "(libev)"): if backend != 'kqueue':
gevent.sleep(0.001) with self.assertRaisesRegex(SystemError, "(libev)"):
gevent.sleep(0.001)
hub.destroy() hub.destroy()
self.assertIn('destroyed', repr(hub)) self.assertIn('destroyed', repr(hub))
......
...@@ -37,7 +37,7 @@ class TestRun(unittest.TestCase): ...@@ -37,7 +37,7 @@ class TestRun(unittest.TestCase):
args += [script, 'patched'] args += [script, 'patched']
p = Popen(args, stdout=PIPE, stderr=PIPE, env=env) p = Popen(args, stdout=PIPE, stderr=PIPE, env=env)
monkey_out, monkey_err = p.communicate() monkey_out, monkey_err = p.communicate()
self.assertEqual(0, p.returncode, (monkey_out, monkey_err)) self.assertEqual(0, p.returncode, (p.returncode, monkey_out, monkey_err))
if module: if module:
args = [sys.executable, "-m", script, 'stdlib'] args = [sys.executable, "-m", script, 'stdlib']
......
...@@ -525,146 +525,162 @@ class TestRefCount(TestCase): ...@@ -525,146 +525,162 @@ class TestRefCount(TestCase):
gevent.sleep(0) gevent.sleep(0)
pool.kill() pool.kill()
if hasattr(gevent.threadpool, 'ThreadPoolExecutor'):
from concurrent.futures import TimeoutError as FutureTimeoutError
from concurrent.futures import wait as cf_wait
from concurrent.futures import as_completed as cf_as_completed
from gevent import monkey from gevent import monkey
class TestTPE(_AbstractPoolTest): @greentest.skipUnless(
size = 1 hasattr(gevent.threadpool, 'ThreadPoolExecutor'),
"Requires ThreadPoolExecutor")
class TestTPE(_AbstractPoolTest):
size = 1
MAP_IS_GEN = True MAP_IS_GEN = True
ClassUnderTest = gevent.threadpool.ThreadPoolExecutor @property
def ClassUnderTest(self):
return gevent.threadpool.ThreadPoolExecutor
MONKEY_PATCHED = False MONKEY_PATCHED = False
@greentest.ignores_leakcheck @property
def test_future(self): def FutureTimeoutError(self):
self.assertEqual(monkey.is_module_patched('threading'), from concurrent.futures import TimeoutError as FutureTimeoutError
self.MONKEY_PATCHED) return FutureTimeoutError
pool = self.pool
calledback = [] @property
def cf_wait(self):
from concurrent.futures import wait as cf_wait
return cf_wait
def fn(): @property
gevent.sleep(0.5) def cf_as_completed(self):
return 42 from concurrent.futures import as_completed as cf_as_completed
return cf_as_completed
def callback(future): @greentest.ignores_leakcheck
future.calledback += 1 def test_future(self):
raise greentest.ExpectedException("Expected, ignored") self.assertEqual(monkey.is_module_patched('threading'),
self.MONKEY_PATCHED)
pool = self.pool
future = pool.submit(fn) # pylint:disable=no-member calledback = []
future.calledback = 0
future.add_done_callback(callback)
self.assertRaises(FutureTimeoutError, future.result, timeout=0.001)
def spawned(): def fn():
return 2016 gevent.sleep(0.5)
return 42
spawned_greenlet = gevent.spawn(spawned) def callback(future):
future.calledback += 1
raise greentest.ExpectedException("Expected, ignored")
# Whether or not we are monkey patched, the background future = pool.submit(fn) # pylint:disable=no-member
# greenlet we spawned got to run while we waited. future.calledback = 0
future.add_done_callback(callback)
self.assertRaises(self.FutureTimeoutError, future.result, timeout=0.001)
self.assertEqual(future.result(), 42) def spawned():
self.assertTrue(future.done()) return 2016
self.assertFalse(future.cancelled())
# Make sure the notifier has a chance to run so the call back
# gets called
gevent.sleep()
self.assertEqual(future.calledback, 1)
self.assertTrue(spawned_greenlet.ready()) spawned_greenlet = gevent.spawn(spawned)
self.assertEqual(spawned_greenlet.value, 2016)
# Adding the callback again runs immediately # Whether or not we are monkey patched, the background
future.add_done_callback(lambda f: calledback.append(True)) # greenlet we spawned got to run while we waited.
self.assertEqual(calledback, [True])
# We can wait on the finished future self.assertEqual(future.result(), 42)
done, _not_done = cf_wait((future,)) self.assertTrue(future.done())
self.assertEqual(list(done), [future]) self.assertFalse(future.cancelled())
# Make sure the notifier has a chance to run so the call back
# gets called
gevent.sleep()
self.assertEqual(future.calledback, 1)
self.assertEqual(list(cf_as_completed((future,))), [future]) self.assertTrue(spawned_greenlet.ready())
# Doing so does not call the callback again self.assertEqual(spawned_greenlet.value, 2016)
self.assertEqual(future.calledback, 1)
# even after a trip around the event loop # Adding the callback again runs immediately
gevent.sleep() future.add_done_callback(lambda f: calledback.append(True))
self.assertEqual(future.calledback, 1) self.assertEqual(calledback, [True])
pool.kill() # We can wait on the finished future
del future done, _not_done = self.cf_wait((future,))
del pool self.assertEqual(list(done), [future])
del self.pool
self.assertEqual(list(self.cf_as_completed((future,))), [future])
@greentest.ignores_leakcheck # Doing so does not call the callback again
def test_future_wait_module_function(self): self.assertEqual(future.calledback, 1)
# Instead of waiting on the result, we can wait # even after a trip around the event loop
# on the future using the module functions gevent.sleep()
self.assertEqual(monkey.is_module_patched('threading'), self.assertEqual(future.calledback, 1)
self.MONKEY_PATCHED)
pool = self.pool pool.kill()
del future
def fn(): del pool
gevent.sleep(0.5) del self.pool
return 42
@greentest.ignores_leakcheck
future = pool.submit(fn) # pylint:disable=no-member def test_future_wait_module_function(self):
if self.MONKEY_PATCHED: # Instead of waiting on the result, we can wait
# Things work as expected when monkey-patched # on the future using the module functions
_done, not_done = cf_wait((future,), timeout=0.001) self.assertEqual(monkey.is_module_patched('threading'),
self.assertEqual(list(not_done), [future]) self.MONKEY_PATCHED)
pool = self.pool
def spawned():
return 2016 def fn():
gevent.sleep(0.5)
spawned_greenlet = gevent.spawn(spawned) return 42
done, _not_done = cf_wait((future,)) future = pool.submit(fn) # pylint:disable=no-member
self.assertEqual(list(done), [future]) if self.MONKEY_PATCHED:
self.assertTrue(spawned_greenlet.ready()) # Things work as expected when monkey-patched
self.assertEqual(spawned_greenlet.value, 2016) _done, not_done = self.cf_wait((future,), timeout=0.001)
else: self.assertEqual(list(not_done), [future])
# When not monkey-patched, raises an AttributeError
self.assertRaises(AttributeError, cf_wait, (future,))
pool.kill()
del future
del pool
del self.pool
@greentest.ignores_leakcheck
def test_future_wait_gevent_function(self):
# The future object can be waited on with gevent functions.
self.assertEqual(monkey.is_module_patched('threading'),
self.MONKEY_PATCHED)
pool = self.pool
def fn():
gevent.sleep(0.5)
return 42
future = pool.submit(fn) # pylint:disable=no-member
def spawned(): def spawned():
return 2016 return 2016
spawned_greenlet = gevent.spawn(spawned) spawned_greenlet = gevent.spawn(spawned)
done = gevent.wait((future,)) done, _not_done = self.cf_wait((future,))
self.assertEqual(list(done), [future]) self.assertEqual(list(done), [future])
self.assertTrue(spawned_greenlet.ready()) self.assertTrue(spawned_greenlet.ready())
self.assertEqual(spawned_greenlet.value, 2016) self.assertEqual(spawned_greenlet.value, 2016)
else:
# When not monkey-patched, raises an AttributeError
self.assertRaises(AttributeError, self.cf_wait, (future,))
pool.kill()
del future
del pool
del self.pool
@greentest.ignores_leakcheck
def test_future_wait_gevent_function(self):
# The future object can be waited on with gevent functions.
self.assertEqual(monkey.is_module_patched('threading'),
self.MONKEY_PATCHED)
pool = self.pool
pool.kill() def fn():
del future gevent.sleep(0.5)
del pool return 42
del self.pool
future = pool.submit(fn) # pylint:disable=no-member
def spawned():
return 2016
spawned_greenlet = gevent.spawn(spawned)
done = gevent.wait((future,))
self.assertEqual(list(done), [future])
self.assertTrue(spawned_greenlet.ready())
self.assertEqual(spawned_greenlet.value, 2016)
pool.kill()
del future
del pool
del self.pool
if __name__ == '__main__': if __name__ == '__main__':
......
from __future__ import print_function from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from gevent import monkey; monkey.patch_all() from gevent import monkey; monkey.patch_all()
import gevent.testing as greentest import gevent.testing as greentest
import gevent.threadpool
if hasattr(gevent.threadpool, 'ThreadPoolExecutor'): from . import test__threadpool
from test__threadpool import TestTPE as _Base
class TestPatchedTPE(_Base): class TestPatchedTPE(test__threadpool.TestTPE):
MONKEY_PATCHED = True MONKEY_PATCHED = True
del _Base
if __name__ == '__main__': if __name__ == '__main__':
greentest.main() greentest.main()
...@@ -579,22 +579,21 @@ else: ...@@ -579,22 +579,21 @@ else:
from gevent._util import Lazy from gevent._util import Lazy
from concurrent.futures import _base as cfb from concurrent.futures import _base as cfb
def _wrap_error(future, fn): def _ignore_error(future_proxy, fn):
def cbwrap(_): def cbwrap(_):
del _ del _
# we're called with the async result, but # We're called with the async result (from the threadpool), but
# be sure to pass in ourself. Also automatically # be sure to pass in the user-visible _FutureProxy object..
# unlink ourself so that we don't get called multiple
# times.
try: try:
fn(future) fn(future_proxy)
except Exception: # pylint: disable=broad-except except Exception: # pylint: disable=broad-except
future.hub.handle_error((fn, future), None, None, None) # Just print, don't raise to the hub's parent.
future_proxy.hub.print_exception((fn, future_proxy), None, None, None)
return cbwrap return cbwrap
def _wrap(future, fn): def _wrap(future_proxy, fn):
def f(_): def f(_):
fn(future) fn(future_proxy)
return f return f
class _FutureProxy(object): class _FutureProxy(object):
...@@ -656,10 +655,11 @@ else: ...@@ -656,10 +655,11 @@ else:
raise concurrent.futures.TimeoutError() raise concurrent.futures.TimeoutError()
def add_done_callback(self, fn): def add_done_callback(self, fn):
"""Exceptions raised by *fn* are ignored."""
if self.done(): if self.done():
fn(self) fn(self)
else: else:
self.asyncresult.rawlink(_wrap_error(self, fn)) self.asyncresult.rawlink(_ignore_error(self, fn))
def rawlink(self, fn): def rawlink(self, fn):
self.asyncresult.rawlink(_wrap(self, fn)) self.asyncresult.rawlink(_wrap(self, fn))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment