Commit 77d7b6f0 authored by Jason Madden's avatar Jason Madden

Add a ThreadPoolExecutor that uses native threads.

This can be helpful when threading is patched.

Ref #786.
parent f108a0f4
...@@ -59,6 +59,12 @@ ...@@ -59,6 +59,12 @@
in :pr:`779` by sean-peters-au and changed in :pr:`781`. in :pr:`779` by sean-peters-au and changed in :pr:`781`.
- Unhandled exception reports that kill a greenlet print now include a - Unhandled exception reports that kill a greenlet print now include a
timestamp. See :issue:`137`. timestamp. See :issue:`137`.
- Add :class:`gevent.threadpool.ThreadPoolExecutor` (a
:class:`concurrent.futures.ThreadPoolExecutor` variant that always
uses native threads) on platforms that have ``concurrent.futures``
available (Python 3 and Python 2 with the ``futures`` backport
installed). This is helpful for, e.g., grpc. Reported in
:issue:`786` by Markus Padourek.
1.1.0 (Mar 5, 2016) 1.1.0 (Mar 5, 2016)
=================== ===================
......
...@@ -6,6 +6,7 @@ prospector[with_pyroma] ...@@ -6,6 +6,7 @@ prospector[with_pyroma]
coverage>=4.0 coverage>=4.0
coveralls>=1.0 coveralls>=1.0
cffi cffi
futures
# For viewing README.rst (restview --long-description), # For viewing README.rst (restview --long-description),
# CONTRIBUTING.rst, etc. # CONTRIBUTING.rst, etc.
# https://github.com/mgedmin/restview # https://github.com/mgedmin/restview
......
...@@ -30,3 +30,5 @@ ...@@ -30,3 +30,5 @@
.. versionchanged:: 1.1a2 .. versionchanged:: 1.1a2
Now raises any exception raised by *func* instead of Now raises any exception raised by *func* instead of
dropping it. dropping it.
.. autoclass:: ThreadPoolExecutor
...@@ -336,3 +336,42 @@ def wrap_errors(errors, function, args, kwargs): ...@@ -336,3 +336,42 @@ def wrap_errors(errors, function, args, kwargs):
return True, function(*args, **kwargs) return True, function(*args, **kwargs)
except errors as ex: except errors as ex:
return False, ex return False, ex
try:
import concurrent.futures
except ImportError:
pass
else:
__all__.append("ThreadPoolExecutor")
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
"""
A version of :class:`concurrent.futures.ThreadPoolExecutor` that
always uses native threads, even when threading is monkey-patched.
.. versionadded:: 1.2a1
"""
def __init__(self, max_workers):
super(ThreadPoolExecutor, self).__init__(max_workers)
self._threadpool = ThreadPool(max_workers)
def submit(self, fn, *args, **kwargs):
future = super(ThreadPoolExecutor, self).submit(fn, *args, **kwargs)
with self._shutdown_lock:
work_item = self._work_queue.get()
assert work_item.fn is fn
self._threadpool.spawn(work_item.run)
return future
def shutdown(self, wait=True):
super(ThreadPoolExecutor, self).shutdown(wait)
self._threadpool.kill()
kill = shutdown # greentest compat
def _adjust_thread_count(self):
# Does nothing. We don't want to spawn any "threads",
# let the threadpool handle that.
pass
...@@ -3,6 +3,7 @@ from time import time, sleep ...@@ -3,6 +3,7 @@ from time import time, sleep
import random import random
import weakref import weakref
import greentest import greentest
import gevent.threadpool
from gevent.threadpool import ThreadPool from gevent.threadpool import ThreadPool
import gevent import gevent
from greentest import ExpectedException from greentest import ExpectedException
...@@ -100,25 +101,32 @@ def sqr_random_sleep(x): ...@@ -100,25 +101,32 @@ def sqr_random_sleep(x):
TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.082, 0.035, 0.14 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.082, 0.035, 0.14
class _AbstractPoolTest(TestCase):
class TestPool(TestCase):
__timeout__ = 5 __timeout__ = 5
size = 1 size = 1
ClassUnderTest = ThreadPool
MAP_IS_GEN = False
def setUp(self): def setUp(self):
greentest.TestCase.setUp(self) greentest.TestCase.setUp(self)
self.pool = ThreadPool(self.size) self.pool = self.ClassUnderTest(self.size)
def test_apply(self):
papply = self.pool.apply
self.assertEqual(papply(sqr, (5,)), sqr(5))
self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))
def test_map(self): def test_map(self):
pmap = self.pool.map pmap = self.pool.map
if self.MAP_IS_GEN:
pmap = lambda *args: list(self.pool.map(*args))
self.assertEqual(pmap(sqr, range(10)), list(map(sqr, range(10)))) self.assertEqual(pmap(sqr, range(10)), list(map(sqr, range(10))))
self.assertEqual(pmap(sqr, range(100)), list(map(sqr, range(100)))) self.assertEqual(pmap(sqr, range(100)), list(map(sqr, range(100))))
class TestPool(_AbstractPoolTest):
def test_apply(self):
papply = self.pool.apply
self.assertEqual(papply(sqr, (5,)), sqr(5))
self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))
def test_async(self): def test_async(self):
res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
get = TimingWrapper(res.get) get = TimingWrapper(res.get)
...@@ -417,6 +425,13 @@ class TestRefCount(TestCase): ...@@ -417,6 +425,13 @@ class TestRefCount(TestCase):
gevent.sleep(0) gevent.sleep(0)
pool.kill() pool.kill()
if hasattr(gevent.threadpool, 'ThreadPoolExecutor'):
class TestTPE(_AbstractPoolTest):
MAP_IS_GEN = True
ClassUnderTest = gevent.threadpool.ThreadPoolExecutor
if __name__ == '__main__': if __name__ == '__main__':
greentest.main() greentest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment