Commit e7f64dc2 authored by Jason Madden's avatar Jason Madden

ThreadPool applies immediately when called from a worker thread to avoid LoopExit. Fixes #131.

parent 2d9e5a62
...@@ -60,6 +60,11 @@ Unreleased ...@@ -60,6 +60,11 @@ Unreleased
if asked to run something that cannot be run. Previously, the if asked to run something that cannot be run. Previously, the
spawned greenlet would die with an uncaught ``TypeError`` the first spawned greenlet would die with an uncaught ``TypeError`` the first
time it was switched to. Reported in :issue:`119` by stephan. time it was switched to. Reported in :issue:`119` by stephan.
- Recursive use of ``gevent.threadpool.ThreadPool.apply`` no longer
raises a ``LoopExit`` error (using ``ThreadPool.spawn`` and then
``get`` on the result still could; you must be careful to use the
correct hub). Reported in :issue:`131` by 8mayday.
1.1a2 (Jul 8, 2015) 1.1a2 (Jul 8, 2015)
=================== ===================
......
...@@ -10,10 +10,17 @@ ...@@ -10,10 +10,17 @@
.. method:: apply(func, args=None, kwds=None) .. method:: apply(func, args=None, kwds=None)
Rough equivalent of the :func:`apply` builtin, blocking until Rough equivalent of the :func:`apply()` builtin function,
the result is ready and returning it. blocking until the result is ready and returning it.
.. warning:: As implemented, attempting to use The ``func`` will *usually*, but not *always*, be run in a way
that allows the current greenlet to switch out (for example,
in a new greenlet or thread, depending on implementation). But
if the current greenlet or thread is already one that was
spawned by this pool, the pool may choose to immediately run
the `func` synchronously.
.. note:: As implemented, attempting to use
:meth:`Threadpool.appy` from inside another function that :meth:`Threadpool.appy` from inside another function that
was itself spawned in a threadpool (any threadpool) will was itself spawned in a threadpool (any threadpool) will
lead to the hub throwing LoopExit. cause the function to be run immediatesly.
...@@ -475,7 +475,7 @@ class Hub(greenlet): ...@@ -475,7 +475,7 @@ class Hub(greenlet):
loop.run() loop.run()
finally: finally:
loop.error_handler = None # break the refcount cycle loop.error_handler = None # break the refcount cycle
self.parent.throw(LoopExit('This operation would block forever')) self.parent.throw(LoopExit('This operation would block forever', self))
# this function must never return, as it will cause switch() in the parent greenlet # this function must never return, as it will cause switch() in the parent greenlet
# to return an unexpected value # to return an unexpected value
# It is still possible to kill this greenlet with throw. However, in that case # It is still possible to kill this greenlet with throw. However, in that case
......
...@@ -197,7 +197,17 @@ class GroupMappingMixin(object): ...@@ -197,7 +197,17 @@ class GroupMappingMixin(object):
return greenlet return greenlet
def apply(self, func, args=None, kwds=None): def apply(self, func, args=None, kwds=None):
"""Equivalent of the apply() builtin function. It blocks till the result is ready.""" """
Rough quivalent of the :func:`apply()` builtin function blocking until
the result is ready and returning it.
The ``func`` will *usually*, but not *always*, be run in a way
that allows the current greenlet to switch out (for example,
in a new greenlet or thread, depending on implementation). But
if the current greenlet or thread is already one that was
spawned by this pool, the pool may choose to immediately run
the `func` synchronously.
"""
if args is None: if args is None:
args = () args = ()
if kwds is None: if kwds is None:
......
...@@ -235,8 +235,12 @@ class ThreadPool(GroupMappingMixin): ...@@ -235,8 +235,12 @@ class ThreadPool(GroupMappingMixin):
return self.apply(function, args, kwargs) return self.apply(function, args, kwargs)
def _apply_immediately(self): def _apply_immediately(self):
# we always pass apply() off to the threadpool # If we're being called from a different thread than the one that
return False # created us, e.g., because a worker task is trying to use apply()
# recursively, we have no choice but to run the task immediately;
# if we try to AsyncResult.get() in the worker thread, it's likely to have
# nothing to switch to and lead to a LoopExit.
return get_hub() is not self.hub
def _apply_async_cb_spawn(self, callback, result): def _apply_async_cb_spawn(self, callback, result):
callback(result) callback(result)
......
...@@ -212,6 +212,23 @@ class TestPool(TestCase): ...@@ -212,6 +212,23 @@ class TestPool(TestCase):
class TestPool2(TestPool): class TestPool2(TestPool):
size = 2 size = 2
def test_recursive_apply(self):
p = self.pool
def a():
return p.apply(b)
def b():
# make sure we can do both types of callbacks
# (loop iteration and end-of-loop) in the recursive
# call
gevent.sleep()
gevent.sleep(0.001)
return "B"
result = p.apply(a)
self.assertEqual(result, "B")
class TestPool3(TestPool): class TestPool3(TestPool):
size = 3 size = 3
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment