Commit 5a05e824 authored by Jason Madden's avatar Jason Madden

Install thread profiling/tracing hooks in ThreadPool worker threads while the task runs.

Fixes #1670
parent abcd3a95
Make worker threads created by :class:`gevent.threadpool.ThreadPool` install
the :func:`threading.setprofile` and :func:`threading.settrace` hooks
while tasks are running. This provides visibility to profiling and
tracing tools like yappi.
Reported by Suhail Muhammed.
......@@ -721,5 +721,92 @@ class TestThreadResult(greentest.TestCase):
self.assertIsNotNone(tr.receiver)
class TestWorkerProfileAndTrace(TestCase):
# Worker threads should execute the test and trace functions.
# (When running the user code.)
# https://github.com/gevent/gevent/issues/1670
old_profile = None
old_trace = None
def setUp(self):
super(TestWorkerProfileAndTrace, self).setUp()
self.old_profile = gevent.threadpool._get_thread_profile()
self.old_trace = gevent.threadpool._get_thread_trace()
def tearDown(self):
import threading
threading.setprofile(self.old_profile)
threading.settrace(self.old_trace)
def test_get_profile(self):
import threading
threading.setprofile(self)
self.assertIs(gevent.threadpool._get_thread_profile(), self)
def test_get_trace(self):
import threading
threading.settrace(self)
self.assertIs(gevent.threadpool._get_thread_trace(), self)
def _test_func_called_in_task(self, func):
import threading
import sys
setter = getattr(threading, 'set' + func)
getter = getattr(sys, 'get' + func)
called = [0]
def callback(*_args):
called[0] += 1
def task():
test.assertIsNotNone(getter)
return 1701
before_task = []
after_task = []
test = self
class Pool(ThreadPool):
class _WorkerGreenlet(ThreadPool._WorkerGreenlet):
# pylint:disable=signature-differs
def _before_run_task(self, func, *args):
before_task.append(func)
before_task.append(getter())
ThreadPool._WorkerGreenlet._before_run_task(self, func, *args)
before_task.append(getter())
def _after_run_task(self, func, *args):
after_task.append(func)
after_task.append(getter())
ThreadPool._WorkerGreenlet._after_run_task(self, func, *args)
after_task.append(getter())
self.ClassUnderTest = Pool
pool = self._makeOne(1)
assert isinstance(pool, Pool)
setter(callback)
res = pool.apply(task)
self.assertEqual(res, 1701)
self.assertGreaterEqual(called[0], 1)
# The function is active only for the scope of the function
self.assertEqual(before_task, [task, None, callback])
self.assertEqual(after_task, [task, callback, None])
def test_profile_called_in_task(self):
self._test_func_called_in_task('profile')
def test_trace_called_in_task(self):
self._test_func_called_in_task('trace')
if __name__ == '__main__':
greentest.main()
......@@ -40,6 +40,17 @@ def _format_hub(hub):
hub.__class__.__name__, id(hub), hub.thread_ident
)
def _get_thread_profile(_sys=sys):
if 'threading' in _sys.modules:
return _sys.modules['threading']._profile_hook
def _get_thread_trace(_sys=sys):
if 'threading' in _sys.modules:
return _sys.modules['threading']._trace_hook
class _WorkerGreenlet(RawGreenlet):
# Exists to produce a more useful repr for worker pool
# threads/greenlets, and manage the communication of the worker
......@@ -137,12 +148,27 @@ class _WorkerGreenlet(RawGreenlet):
file=stderr)
tb = tb.tb_next
def _before_run_task(self, func, args, kwargs, thread_result,
_sys=sys,
_get_thread_profile=_get_thread_profile,
_get_thread_trace=_get_thread_trace):
# pylint:disable=unused-argument
_sys.setprofile(_get_thread_profile())
_sys.settrace(_get_thread_trace())
def _after_run_task(self, func, args, kwargs, thread_result, _sys=sys):
# pylint:disable=unused-argument
_sys.setprofile(None)
_sys.settrace(None)
def __run_task(self, func, args, kwargs, thread_result):
self._before_run_task(func, args, kwargs, thread_result)
try:
thread_result.set(func(*args, **kwargs))
except: # pylint:disable=bare-except
thread_result.handle_error((self, func), self._exc_info())
finally:
self._after_run_task(func, args, kwargs, thread_result)
del func, args, kwargs, thread_result
def run(self):
......@@ -236,12 +262,23 @@ class ThreadPool(GroupMappingMixin):
The `len` of instances of this class is the number of enqueued
(unfinished) tasks.
Just before a task starts running in a worker thread,
the values of :func:`threading.setprofile` and :func:`threading.settrace`
are consulted. Any values there are installed in that thread for the duration
of the task (using :func:`sys.setprofile` and :func:`sys.settrace`, respectively).
(Because worker threads are long-lived and outlast any given task, this arrangement
lets the hook functions change between tasks, but does not let them see the
bookkeeping done by the worker thread itself.)
.. caution:: Instances of this class are only true if they have
unfinished tasks.
.. versionchanged:: 1.5a3
The undocumented ``apply_e`` function, deprecated since 1.1,
was removed.
.. versionchanged:: NEXT
Install the profile and trace functions in the worker thread while
the worker thread is running the supplied task.
"""
__slots__ = (
......@@ -268,6 +305,8 @@ class ThreadPool(GroupMappingMixin):
'task_queue',
)
_WorkerGreenlet = _WorkerGreenlet
def __init__(self, maxsize, hub=None):
if hub is None:
hub = get_hub()
......@@ -437,7 +476,7 @@ class ThreadPool(GroupMappingMixin):
self.manager = Greenlet.spawn(self._adjust_wait)
def _add_thread(self):
_WorkerGreenlet(self)
self._WorkerGreenlet(self)
def spawn(self, func, *args, **kwargs):
"""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment