Commit c2665e8f authored by Jason Madden's avatar Jason Madden

Documentation updates for threadpool.py

parent 039af1ef
...@@ -60,6 +60,7 @@ ...@@ -60,6 +60,7 @@
of tasks, but still, calling it from a different thread was likely of tasks, but still, calling it from a different thread was likely
to corrupt libev or libuv internals. to corrupt libev or libuv internals.
- Remove some undocumented, deprecated functions from the threadpool module.
1.5a2 (2019-10-21) 1.5a2 (2019-10-21)
================== ==================
......
...@@ -63,10 +63,45 @@ class _WorkerGreenlet(RawGreenlet): ...@@ -63,10 +63,45 @@ class _WorkerGreenlet(RawGreenlet):
class ThreadPool(GroupMappingMixin): class ThreadPool(GroupMappingMixin):
# TODO: Document thread safety restrictions. # TODO: Document thread safety restrictions.
""" """
A pool of native worker threads.
This can be useful for CPU intensive functions, or those that
otherwise will not cooperate with gevent. The best functions to execute
in a thread pool are small functions with a single purpose; ideally they release
the CPython GIL. Such functions are extension functions implemented in C.
It implements the same operations as a :class:`gevent.pool.Pool`,
but using threads instead of greenlets.
.. note:: The method :meth:`apply_async` will always return a new .. note:: The method :meth:`apply_async` will always return a new
greenlet, bypassing the threadpool entirely. greenlet, bypassing the threadpool entirely.
Most users will not need to create instances of this class. Instead,
use the threadpool already associated with gevent's hub::
pool = gevent.get_hub().threadpool
result = pool.spawn(lambda: "Some func").get()
.. important:: It is only possible to use instances of this class from
the thread running their hub. Typically that means from the thread that
created them. Using the pattern shown above takes care of this.
There is no gevent-provided way to have a single process-wide limit on the
number of threads in various pools when doing that, however. The suggested
way to use gevent and threadpools is to have a single gevent hub
and its one threadpool (which is the default without doing any extra work).
Only dispatch minimal blocking functions to the threadpool, functions that
do not use the gevent hub.
The `len` of instances of this class is the number of enqueued
(unfinished) tasks.
.. caution:: Instances of this class are only true if they have .. caution:: Instances of this class are only true if they have
unfinished tasks. unfinished tasks.
.. versionchanged:: 1.5a3
The undocumented ``apply_e`` function, deprecated since 1.1,
was removed.
""" """
__slots__ = ( __slots__ = (
...@@ -188,12 +223,6 @@ class ThreadPool(GroupMappingMixin): ...@@ -188,12 +223,6 @@ class ThreadPool(GroupMappingMixin):
requests up to `maxsize` threads. requests up to `maxsize` threads.
""") """)
# def _init(self, maxsize):
# self._num_worker_threads = 0
# self._available_worker_threads_greenlet_sem = Semaphore(1)
# self._native_thread_internal_lock = Lock()
# self.task_queue = Queue()
# self._set_maxsize(maxsize)
def _on_fork(self): def _on_fork(self):
# fork() only leaves one thread; also screws up locks; # fork() only leaves one thread; also screws up locks;
...@@ -201,10 +230,8 @@ class ThreadPool(GroupMappingMixin): ...@@ -201,10 +230,8 @@ class ThreadPool(GroupMappingMixin):
# NOTE: See comment in gevent.hub.reinit. # NOTE: See comment in gevent.hub.reinit.
pid = os.getpid() pid = os.getpid()
if pid != self.pid: if pid != self.pid:
self.pid = pid # The OS threads have been destroyed, but the Python objects
# Do not mix fork() and threads; since fork() only copies one thread # may live on, creating refcount "leaks"
# all objects referenced by other threads has refcount that will never
# go down to 0.
self.__init__(self._maxsize) self.__init__(self._maxsize)
def join(self): def join(self):
...@@ -260,14 +287,24 @@ class ThreadPool(GroupMappingMixin): ...@@ -260,14 +287,24 @@ class ThreadPool(GroupMappingMixin):
def spawn(self, func, *args, **kwargs): def spawn(self, func, *args, **kwargs):
""" """
Add a new task to the threadpool that will run ``func(*args, **kwargs)``. Add a new task to the threadpool that will run ``func(*args,
**kwargs)``.
Waits until a slot is available. Creates a new native thread if necessary. Waits until a slot is available. Creates a new native thread
if necessary.
This must only be called from the native thread that owns this object's This must only be called from the native thread that owns this
hub. object's hub. This is because creating the necessary data
structures to communicate back to this thread isn't thread
safe, so the hub must not be running something else. Also,
ensuring the pool size stays correct only works within a
single thread.
:return: A :class:`gevent.event.AsyncResult`. :return: A :class:`gevent.event.AsyncResult`.
:raises InvalidThreadUseError: If called from a different thread.
.. versionchanged:: 1.5
Document the thread-safety requirements.
""" """
if self.hub != get_hub(): if self.hub != get_hub():
raise InvalidThreadUseError raise InvalidThreadUseError
...@@ -276,6 +313,8 @@ class ThreadPool(GroupMappingMixin): ...@@ -276,6 +313,8 @@ class ThreadPool(GroupMappingMixin):
semaphore = self._available_worker_threads_greenlet_sem semaphore = self._available_worker_threads_greenlet_sem
semaphore.acquire() semaphore.acquire()
if semaphore is self._available_worker_threads_greenlet_sem: if semaphore is self._available_worker_threads_greenlet_sem:
# If we were asked to change size or re-init we could have changed
# semaphore objects.
break break
# Returned; lets a greenlet in this thread wait # Returned; lets a greenlet in this thread wait
...@@ -316,7 +355,7 @@ class ThreadPool(GroupMappingMixin): ...@@ -316,7 +355,7 @@ class ThreadPool(GroupMappingMixin):
need_decrease = True need_decrease = True
try: try:
while 1: # tiny bit faster than True on Py2 while 1: # tiny bit faster than True on Py2
h = _get_hub() h = _get_hub() # Don't create one; only set if a worker function did it
if h is not None: if h is not None:
h.name = 'ThreadPool Worker Hub' h.name = 'ThreadPool Worker Hub'
task_queue = self.task_queue task_queue = self.task_queue
...@@ -362,20 +401,6 @@ class ThreadPool(GroupMappingMixin): ...@@ -362,20 +401,6 @@ class ThreadPool(GroupMappingMixin):
hub.destroy(True) hub.destroy(True)
del hub del hub
def apply_e(self, expected_errors, function, args=None, kwargs=None):
"""
.. deprecated:: 1.1a2
Identical to :meth:`apply`; the ``expected_errors`` argument is ignored.
"""
# pylint:disable=unused-argument
# Deprecated but never documented. In the past, before
# self.apply() allowed all errors to be raised to the caller,
# expected_errors allowed a caller to specify a set of errors
# they wanted to be raised, through the wrap_errors function.
# In practice, it always took the value Exception or
# BaseException.
return self.apply(function, args, kwargs)
def _apply_immediately(self): def _apply_immediately(self):
# If we're being called from a different thread than the one that # If we're being called from a different thread than the one that
# created us, e.g., because a worker task is trying to use apply() # created us, e.g., because a worker task is trying to use apply()
...@@ -408,6 +433,13 @@ class _FakeAsync(object): ...@@ -408,6 +433,13 @@ class _FakeAsync(object):
_FakeAsync = _FakeAsync() _FakeAsync = _FakeAsync()
class ThreadResult(object): class ThreadResult(object):
"""
A one-time event for cross-thread communication.
Uses a hub's "async" watcher capability; it must be constructed and
destroyed in the thread running the hub (because creating, starting, and
destroying async watchers isn't guaranteed to be thread safe).
"""
# Using slots here helps to debug reference cycles/leaks # Using slots here helps to debug reference cycles/leaks
__slots__ = ('exc_info', 'async_watcher', '_call_when_ready', 'value', __slots__ = ('exc_info', 'async_watcher', '_call_when_ready', 'value',
...@@ -484,16 +516,6 @@ class ThreadResult(object): ...@@ -484,16 +516,6 @@ class ThreadResult(object):
return self.exception is None return self.exception is None
def wrap_errors(errors, function, args, kwargs):
"""
.. deprecated:: 1.1a2
Previously used by ThreadPool.apply_e.
"""
try:
return True, function(*args, **kwargs)
except errors as ex:
return False, ex
try: try:
import concurrent.futures import concurrent.futures
except ImportError: except ImportError:
...@@ -614,9 +636,16 @@ else: ...@@ -614,9 +636,16 @@ else:
This is a provisional API. This is a provisional API.
""" """
def __init__(self, max_workers): def __init__(self, *args, **kwargs):
super(ThreadPoolExecutor, self).__init__(max_workers) """
self._threadpool = ThreadPool(max_workers) Takes the same arguments as ``concurrent.futures.ThreadPoolExecuter``, which
vary between Python versions.
The first argument is always *max_workers*, the maximum number of
threads to use. Most other arguments, while accepted, are ignored.
"""
super(ThreadPoolExecutor, self).__init__(*args, **kwargs)
self._threadpool = ThreadPool(self._max_workers)
def submit(self, fn, *args, **kwargs): # pylint:disable=arguments-differ def submit(self, fn, *args, **kwargs): # pylint:disable=arguments-differ
with self._shutdown_lock: # pylint:disable=not-context-manager with self._shutdown_lock: # pylint:disable=not-context-manager
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment