Commit 7029d966 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1496 from gevent/issue1484

Fix threadpool in command line scripts
parents ead60793 588b34f0
......@@ -72,6 +72,10 @@
special circumstances, but real applications are unlikely to be
affected. See :issue:`1493`.
- Fix using the threadpool inside a script or module run with ``python
-m gevent.monkey``. Previously it would use greenlets instead of
native threads. See :issue:`1484`.
1.5a2 (2019-10-21)
==================
......
......@@ -142,9 +142,11 @@ class OpenDescriptor(object): # pylint:disable=too-many-instance-attributes
if universal:
if can_write:
raise ValueError("mode U cannot be combined with 'x', 'w', 'a', or '+'")
import warnings
warnings.warn("'U' mode is deprecated",
DeprecationWarning, 4)
# Just because the stdlib deprecates this, no need for us to do so as well.
# Especially not while we still support Python 2.
# import warnings
# warnings.warn("'U' mode is deprecated",
# DeprecationWarning, 4)
reading = True
if text and binary:
raise ValueError("can't have text and binary mode at once")
......
......@@ -51,7 +51,7 @@ def set_default_hub_class(hubtype):
global Hub
Hub = hubtype
def get_hub(*args, **kwargs):
def get_hub(*args, **kwargs): # pylint:disable=unused-argument
"""
Return the hub for the current thread.
......@@ -63,12 +63,12 @@ def get_hub(*args, **kwargs):
only used when the hub was created, and so were non-deterministic---to be
sure they were used, *all* callers had to pass them, or they were order-dependent.
Use ``set_hub`` instead.
.. versionchanged:: 1.5a3
The *args* and *kwargs* arguments are now completely ignored.
"""
hub = _threadlocal.hub
if hub is None:
hubtype = get_hub_class()
hub = _threadlocal.hub = hubtype(*args, **kwargs)
return hub
return get_hub_noargs()
def get_hub_noargs():
# Just like get_hub, but cheaper to call because it
......
......@@ -146,3 +146,15 @@ class Queue(object):
self._not_empty.wait()
item = self._queue.popleft()
return item
def kill(self):
    """
    Destroy this object, rendering it unusable.

    Use this when it's not possible to safely drain the queue, e.g.,
    after a fork when the locks are in an uncertain state.
    """
    # Drop every internal reference in one pass; any later use of the
    # queue will fail loudly with an AttributeError/TypeError.
    for attr in ('_queue', '_mutex', '_not_empty', 'unfinished_tasks'):
        setattr(self, attr, None)
......@@ -460,6 +460,19 @@ class Hub(WaitOperationsGreenlet):
result += ' thread_ident=%s' % (hex(self.thread_ident), )
return result + '>'
def _normalize_exception(self, t, v, tb):
# Allow passing in all None if the caller doesn't have
# easy access to sys.exc_info()
if (t, v, tb) == (None, None, None):
t, v, tb = sys.exc_info()
if isinstance(v, str):
# Cython can raise errors where the value is a plain string
# e.g., AttributeError, "_semaphore.Semaphore has no attr", <traceback>
v = t(v)
return t, v, tb
def handle_error(self, context, type, value, tb):
"""
Called by the event loop when an error occurs. The default
......@@ -486,10 +499,8 @@ class Hub(WaitOperationsGreenlet):
that should generally result in exiting the loop and being
thrown to the parent greenlet.
"""
if isinstance(value, str):
# Cython can raise errors where the value is a plain string
# e.g., AttributeError, "_semaphore.Semaphore has no attr", <traceback>
value = type(value)
type, value, tb = self._normalize_exception(type, value, tb)
if not issubclass(type, self.NOT_ERROR):
self.print_exception(context, type, value, tb)
if context is None or issubclass(type, self.SYSTEM_ERROR):
......@@ -539,7 +550,7 @@ class Hub(WaitOperationsGreenlet):
stderr = stderr.io # pylint:disable=no-member
return stderr
def print_exception(self, context, type, value, tb):
def print_exception(self, context, t, v, tb):
# Python 3 does not gracefully handle None value or tb in
# traceback.print_exception() as previous versions did.
# pylint:disable=no-member
......@@ -551,10 +562,12 @@ class Hub(WaitOperationsGreenlet):
# See https://github.com/gevent/gevent/issues/1295
return
if value is None:
errstream.write('%s\n' % type.__name__)
t, v, tb = self._normalize_exception(t, v, tb)
if v is None:
errstream.write('%s\n' % t.__name__)
else:
traceback.print_exception(type, value, tb, file=errstream)
traceback.print_exception(t, v, tb, file=errstream)
del tb
try:
......@@ -572,7 +585,7 @@ class Hub(WaitOperationsGreenlet):
except: # pylint:disable=bare-except
traceback.print_exc(file=self.exception_stream)
context = repr(context)
errstream.write('%s failed with %s\n\n' % (context, getattr(type, '__name__', 'exception'), ))
errstream.write('%s failed with %s\n\n' % (context, getattr(t, '__name__', 'exception'), ))
def run(self):
......
......@@ -1031,16 +1031,26 @@ def patch_signal():
def _check_repatching(**module_settings):
_warnings = []
key = '_gevent_saved_patch_all'
key = '_gevent_saved_patch_all_module_settings'
del module_settings['kwargs']
if saved.get(key, module_settings) != module_settings:
currently_patched = saved.setdefault(key, {})
first_time = not currently_patched
if not first_time and currently_patched != module_settings:
_queue_warning("Patching more than once will result in the union of all True"
" parameters being patched",
_warnings)
first_time = key not in saved
saved[key] = module_settings
return _warnings, first_time, module_settings
to_patch = {}
for k, v in module_settings.items():
# If we haven't seen the setting at all, record it and echo it.
# If we have seen the setting, but it became true, record it and echo it.
if k not in currently_patched:
to_patch[k] = currently_patched[k] = v
elif v and not currently_patched[k]:
to_patch[k] = currently_patched[k] = True
return _warnings, first_time, to_patch
def _subscribe_signal_os(will_patch_all):
......@@ -1053,7 +1063,6 @@ def _subscribe_signal_os(will_patch_all):
warnings)
def patch_all(socket=True, dns=True, time=True, select=True, thread=True, os=True, ssl=True,
httplib=False, # Deprecated, to be removed.
subprocess=True, sys=False, aggressive=True, Event=True,
builtins=True, signal=True,
queue=True,
......@@ -1083,16 +1092,26 @@ def patch_all(socket=True, dns=True, time=True, select=True, thread=True, os=Tru
for kwarg values to be interpreted by plugins, for example, `patch_all(mylib_futures=True)`.
.. versionchanged:: 1.3.5
Add *queue*, defaulting to True, for Python 3.7.
.. versionchanged:: 1.5
Remove the ``httplib`` argument. Previously, setting it raised a ``ValueError``.
.. versionchanged:: 1.5
Better handling of patching more than once.
"""
# pylint:disable=too-many-locals,too-many-branches
# Check to see if they're changing the patched list
_warnings, first_time, modules_to_patch = _check_repatching(**locals())
if not _warnings and not first_time:
# Nothing to do, identical args to what we just
# did
if not modules_to_patch:
# Nothing to do. Either the arguments were identical to what
# we previously did, or they specified false values
# for things we had previously patched.
_process_warnings(_warnings)
return
for k, v in modules_to_patch.items():
locals()[k] = v
from gevent import events
try:
_notify_patch(events.GeventWillPatchAllEvent(modules_to_patch, kwargs), _warnings)
......@@ -1116,8 +1135,6 @@ def patch_all(socket=True, dns=True, time=True, select=True, thread=True, os=Tru
patch_select(aggressive=aggressive)
if ssl:
patch_ssl(_warnings=_warnings, _first_time=first_time)
if httplib:
raise ValueError('gevent.httplib is no longer provided, httplib must be False')
if subprocess:
patch_subprocess()
if builtins:
......@@ -1143,7 +1160,7 @@ def main():
while argv and argv[0].startswith('--'):
option = argv[0][2:]
if option == 'verbose':
verbose = True
verbose += 1
elif option == 'module':
run_fn = "run_module"
elif option.startswith('no-') and option.replace('no-', '') in patch_all_args:
......@@ -1166,18 +1183,40 @@ def main():
print('sys.modules=%s' % pprint.pformat(sorted(sys.modules.keys())))
print('cwd=%s' % os.getcwd())
patch_all(**args)
if argv:
import runpy
sys.argv = argv
# Use runpy.run_path to closely (exactly) match what the
# interpreter does given 'python <path>'. This includes allowing
# passing .pyc/.pyo files and packages with a __main__ and
# potentially even zip files. Previously we used exec, which only
# worked if we directly read a python source file.
getattr(runpy, run_fn)(sys.argv[0], run_name='__main__')
else:
if not argv:
print(script_help)
return
sys.argv[:] = argv
# Make sure that we don't get imported again under a different
# name (usually it's ``__main__`` here) because that could lead to
# double-patching, and making monkey.get_original() not work.
try:
mod_name = __spec__.name
except NameError:
# Py2: __spec__ is not defined as standard
mod_name = 'gevent.monkey'
sys.modules[mod_name] = sys.modules[__name__]
# On Python 2, we have to set the gevent.monkey attribute
# manually; putting gevent.monkey into sys.modules stops the
# import machinery from making that connection, and ``from gevent
# import monkey`` is broken. On Python 3 (.8 at least) that's not
# necessary.
if 'gevent' in sys.modules:
sys.modules['gevent'].monkey = sys.modules[mod_name]
# Running ``patch_all()`` will load pkg_resources entry point plugins
# which may attempt to import ``gevent.monkey``, so it is critical that
# we have established the correct saved module name first.
patch_all(**args)
import runpy
# Use runpy.run_path to closely (exactly) match what the
# interpreter does given 'python <path>'. This includes allowing
# passing .pyc/.pyo files and packages with a __main__ and
# potentially even zip files. Previously we used exec, which only
# worked if we directly read a python source file.
run_meth = getattr(runpy, run_fn)
return run_meth(sys.argv[0], run_name='__main__')
def _get_script_help():
......@@ -1193,12 +1232,12 @@ def _get_script_help():
USAGE: ``python -m gevent.monkey [MONKEY OPTIONS] [--module] (script|module) [SCRIPT OPTIONS]``
If no OPTIONS present, monkey patches all the modules it can patch.
If no MONKEY OPTIONS are present, monkey patches all the modules as if by calling ``patch_all()``.
You can exclude a module with --no-<module>, e.g. --no-thread. You can
specify a module to patch with --<module>, e.g. --socket. In the latter
case only the modules specified on the command line will be patched.
The default behavior is to execute the script passed as argument. If you wish
The default behavior is to execute the script passed as argument. If you wish
to run a module instead, pass the `--module` argument before the module name.
.. versionchanged:: 1.3b1
......
......@@ -17,7 +17,11 @@ from gevent._compat import string_types
from gevent._compat import integer_types
# Nothing public here.
__all__ = []
__all__ = ()
# trigger import of encodings.idna to avoid https://github.com/gevent/gevent/issues/349
u'foo'.encode('idna')
def _lookup_port(port, socktype):
# pylint:disable=too-many-branches
......
......@@ -10,10 +10,6 @@ from gevent.hub import get_hub
__all__ = ['Resolver']
# trigger import of encodings.idna to avoid https://github.com/gevent/gevent/issues/349
u'foo'.encode('idna')
class Resolver(object):
"""
Implementation of the resolver API using native threads and native resolution
......
......@@ -25,6 +25,13 @@ import unittest
# pylint:disable=unused-import
# It's important to do this ASAP, because if we're monkey patched,
# then importing things like the standard library test.support can
# need to construct the hub (to check for IPv6 support using a socket).
from .hub import QuietHub
import gevent.hub
gevent.hub.set_default_hub_class(QuietHub)
from .sysinfo import VERBOSE
from .sysinfo import WIN
from .sysinfo import LINUX
......@@ -80,6 +87,7 @@ from .skipping import skipOnLibuvOnTravisOnCPython27
from .skipping import skipOnPy37
from .skipping import skipWithoutResource
from .skipping import skipWithoutExternalNetwork
from .skipping import skipOnPy2
from .exception import ExpectedException
......@@ -87,9 +95,7 @@ from .exception import ExpectedException
from .leakcheck import ignores_leakcheck
from .params import LARGE_TIMEOUT
from .params import DEFAULT_LOCAL_HOST_ADDR
from .params import DEFAULT_LOCAL_HOST_ADDR6
from .params import DEFAULT_BIND_ADDR
......@@ -103,10 +109,6 @@ from .params import DEFAULT_XPC_SOCKET_TIMEOUT
main = unittest.main
SkipTest = unittest.SkipTest
from .hub import QuietHub
import gevent.hub
gevent.hub.set_default_hub_class(QuietHub)
......
......@@ -29,7 +29,16 @@ class QuietHub(Hub):
EXPECTED_TEST_ERROR = (ExpectedException,)
def handle_error(self, context, type, value, tb):
    # Normalize first so ``issubclass`` can be applied reliably even
    # when the caller passed (None, None, None) or a string value.
    type, value, tb = self._normalize_exception(type, value, tb)
    if not issubclass(type, self.EXPECTED_TEST_ERROR):
        return Hub.handle_error(self, context, type, value, tb)
    # Expected test errors are swallowed to cut down on test-log noise.
def print_exception(self, context, t, v, tb):
    # Normalize so the type check below works on a real exception class.
    t, v, tb = self._normalize_exception(t, v, tb)
    if not issubclass(t, self.EXPECTED_TEST_ERROR):
        return Hub.print_exception(self, context, t, v, tb)
    # Expected test errors are suppressed; see handle_error.
......@@ -94,4 +94,7 @@ except SkipTest as e:
print("Ran 0 tests in 0.0s")
print('OK (skipped=0)')
finally:
os.remove(temp_path)
try:
os.remove(temp_path)
except OSError:
pass
......@@ -42,6 +42,7 @@ skipOnPyPy3OnCI = _do_not_skip
skipOnPyPy3 = _do_not_skip
skipOnPyPyOnWindows = _do_not_skip
skipOnPy2 = unittest.skip if sysinfo.PY2 else _do_not_skip
skipOnPy37 = unittest.skip if sysinfo.PY37 else _do_not_skip
skipOnPurePython = unittest.skip if sysinfo.PURE_PYTHON else _do_not_skip
......
......@@ -167,6 +167,10 @@ class SubscriberCleanupMixin(object):
from gevent import events
self.__old_subscribers = events.subscribers[:]
def addSubscriber(self, sub):
    # Register *sub* as a gevent events subscriber for the duration of
    # the test; tearDown restores the subscriber list captured in setUp,
    # so the subscriber is automatically removed afterwards.
    from gevent import events
    events.subscribers.append(sub)
def tearDown(self):
from gevent import events
events.subscribers[:] = self.__old_subscribers
......
......@@ -33,9 +33,9 @@ except (ImportError, OSError, IOError):
pass
TIMEOUT = 100
NWORKERS = int(os.environ.get('NWORKERS') or max(cpu_count() - 1, 4))
if NWORKERS > 10:
NWORKERS = 10
DEFAULT_NWORKERS = int(os.environ.get('NWORKERS') or max(cpu_count() - 1, 4))
if DEFAULT_NWORKERS > 10:
DEFAULT_NWORKERS = 10
if RUN_LEAKCHECKS:
# Capturing the stats takes time, and we run each
......@@ -49,9 +49,7 @@ DEFAULT_RUN_OPTIONS = {
if RUNNING_ON_CI:
# Too many and we get spurious timeouts
NWORKERS = 4
DEFAULT_NWORKERS = 4
def _package_relative_filename(filename, package):
......@@ -96,7 +94,8 @@ class Runner(object):
configured_failing_tests=(),
failfast=False,
quiet=False,
configured_run_alone_tests=()):
configured_run_alone_tests=(),
worker_count=DEFAULT_NWORKERS):
"""
:keyword quiet: Set to True or False to explicitly choose. Set to
`None` to use the default, which may come from the environment variable
......@@ -112,7 +111,7 @@ class Runner(object):
self.results.total = len(self._tests)
self._running_jobs = []
self._worker_count = min(len(tests), NWORKERS) or 1
self._worker_count = min(len(tests), worker_count) or 1
def _run_one(self, cmd, **kwargs):
if self._quiet is not None:
......@@ -516,6 +515,11 @@ def main():
parser.add_argument("--verbose", action="store_false", dest='quiet')
parser.add_argument("--debug", action="store_true", default=False)
parser.add_argument("--package", default="gevent.tests")
parser.add_argument(
"--processes", "-j", default=DEFAULT_NWORKERS, type=int,
help="Use up to the given number of parallel processes to execute tests. "
"Defaults to %(default)s."
)
parser.add_argument('-u', '--use', metavar='RES1,RES2,...',
action='store', type=parse_resources,
help='specify which special resource intensive tests '
......@@ -614,6 +618,7 @@ def main():
failfast=options.failfast,
quiet=options.quiet,
configured_run_alone_tests=RUN_ALONE,
worker_count=options.processes,
)
if options.travis_fold:
......
# -*- coding: utf-8 -*-
"""
Make a package.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import print_function
# This file makes this directory into a package.
# This file makes this directory into a runnable package.
# it exists to test 'python -m gevent.monkey monkey_package'
print(__file__)
print(__name__)
# -*- coding: utf-8 -*-
"""
This file runs ``gevent.monkey.patch_all()``.
It is intended to be used by ``python -m gevent.monkey <this file>``
to prove that monkey-patching twice doesn't have unfortunate side effects (such as
breaking the threadpool).
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
from gevent import monkey
from gevent import get_hub
monkey.patch_all(thread=False, sys=True)
def thread_is_greenlet():
    # Runs on a threadpool worker. Compares the *original* (pre-patch)
    # thread get_ident, recovered via monkey.get_original, with gevent's
    # greenlet-based get_ident; True means the "thread" is really a greenlet.
    from gevent.thread import get_ident as gr_ident
    # 'thread' on Python 2 (where bytes is str), '_thread' on Python 3.
    std_thread_mod = 'thread' if bytes is str else '_thread'
    thr_ident = monkey.get_original(std_thread_mod, 'get_ident')
    return thr_ident() == gr_ident()
is_greenlet = get_hub().threadpool.apply(thread_is_greenlet)
print(is_greenlet)
print(len(sys._current_frames()))
sys.stdout.flush()
sys.stderr.flush()
# -*- coding: utf-8 -*-
"""
This file *does not* run ``gevent.monkey.patch_all()``.
It is intended to be used by ``python -m gevent.monkey <this file>``
to prove that the threadpool and getting the original value of things
works.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
from gevent import monkey
from gevent import get_hub
from gevent.thread import get_ident as gr_ident
std_thread_mod = 'thread' if bytes is str else '_thread'
thr_ident = monkey.get_original(std_thread_mod, 'get_ident')
print(thr_ident is gr_ident)
def thread_is_greenlet():
    # Runs on a threadpool worker: True only if the worker's native
    # thread ident equals the greenlet ident, i.e. if the threadpool
    # is (wrongly) backed by greenlets instead of native threads.
    return thr_ident() == gr_ident()
is_greenlet = get_hub().threadpool.apply(thread_is_greenlet)
print(is_greenlet)
print(len(sys._current_frames()))
......@@ -4,7 +4,7 @@ import unittest
import gevent
from gevent import core
from gevent.hub import Hub
@unittest.skipUnless(
getattr(core, 'LIBEV_EMBED', False),
......@@ -23,7 +23,7 @@ class Test(unittest.TestCase):
getattr(unittest.TestCase, 'assertRaisesRegexp'))
def _check_backend(self, backend):
hub = gevent.get_hub(backend, default=False)
hub = Hub(backend, default=False)
try:
self.assertEqual(hub.loop.backend, backend)
......@@ -33,8 +33,11 @@ class Test(unittest.TestCase):
raise unittest.SkipTest("backend %s lacks fileno" % (backend,))
os.close(fileno)
with self.assertRaisesRegex(SystemError, "(libev)"):
gevent.sleep(0.001)
if backend not in ('kqueue', 'epoll'):
# That's actually all the libev backends that use a file descriptor,
# right?
with self.assertRaisesRegex(SystemError, "(libev)"):
gevent.sleep(0.001)
hub.destroy()
self.assertIn('destroyed', repr(hub))
......
from __future__ import print_function
import gevent.monkey
gevent.monkey.patch_all()
from gevent import monkey
monkey.patch_all()
import os
import unittest
import multiprocessing
import gevent
......@@ -20,29 +21,42 @@ fork_watcher = hub.loop.fork(ref=False)
fork_watcher.start(on_fork)
def run(q):
def in_child(q):
# libev only calls fork callbacks at the beginning of
# the loop; we use callbacks extensively so it takes *two*
# calls to sleep (with a timer) to actually get wrapped
# around to the beginning of the loop.
gevent.sleep(0.01)
gevent.sleep(0.01)
gevent.sleep(0.001)
gevent.sleep(0.001)
q.put(newpid)
def test():
# Use a thread to make us multi-threaded
hub.threadpool.apply(lambda: None)
# If the Queue is global, q.get() hangs on Windows; must pass as
# an argument.
q = multiprocessing.Queue()
p = multiprocessing.Process(target=run, args=(q,))
p.start()
p.join()
p_val = q.get()
class Test(unittest.TestCase):
assert p_val is not None, "The fork watcher didn't run"
assert p_val != pid
def test(self):
self.assertEqual(hub.threadpool.size, 0)
# Use a thread to make us multi-threaded
hub.threadpool.apply(lambda: None)
self.assertEqual(hub.threadpool.size, 1)
# If the Queue is global, q.get() hangs on Windows; must pass as
# an argument.
q = multiprocessing.Queue()
p = multiprocessing.Process(target=in_child, args=(q,))
p.start()
p.join()
p_val = q.get()
self.assertIsNone(
newpid,
"The fork watcher ran in the parent for some reason."
)
self.assertIsNotNone(
p_val,
"The child process returned nothing, meaning the fork watcher didn't run in the child."
)
self.assertNotEqual(p_val, pid)
assert p_val != pid
if __name__ == '__main__':
# Must call for Windows to fork properly; the fork can't be in the top-level
......@@ -57,4 +71,4 @@ if __name__ == '__main__':
# to create a whole new process that has no relation to the current process;
# that process then calls multiprocessing.forking.main() to do its work.
# Since no state is shared, a fork watcher cannot exist in that process.
test()
unittest.main()
# a deadlock is possible if we import a module that runs Gevent's getaddrinfo
# On Python 2, a deadlock is possible if we import a module that runs gevent's getaddrinfo
# with a unicode hostname, which starts Python's getaddrinfo on a thread, which
# attempts to import encodings.idna but blocks on the import lock. verify
# that Gevent avoids this deadlock.
# attempts to import encodings.idna but blocks on the import lock. Verify
# that gevent avoids this deadlock.
import getaddrinfo_module
del getaddrinfo_module # fix pyflakes
......@@ -19,57 +19,68 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Test that modules in gevent.green package are indeed green.
To do that spawn a green server and then access it using a green socket.
If either operation blocked the whole script would block and timeout.
"""
Trivial test that a single process (and single thread) can both read
and write from green sockets (when monkey patched).
"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from gevent import monkey
monkey.patch_all()
import gevent.testing as greentest
try:
from urllib import request as urllib2
from http import server as BaseHTTPServer
from http.server import HTTPServer
from http.server import SimpleHTTPRequestHandler
except ImportError:
# Python 2
import urllib2
import BaseHTTPServer
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
import gevent
from gevent.testing import params
class QuietHandler(SimpleHTTPRequestHandler, object):
class TestGreenness(greentest.TestCase):
check_totalrefcount = False
def log_message(self, *args): # pylint:disable=arguments-differ
    # Keep the test output quiet: instead of writing to stderr,
    # accumulate the log arguments on the server object.
    self.server.messages += ((args,),)
def setUp(self):
server_address = params.DEFAULT_BIND_ADDR_TUPLE
BaseHTTPServer.BaseHTTPRequestHandler.protocol_version = "HTTP/1.0"
self.httpd = BaseHTTPServer.HTTPServer(server_address,
SimpleHTTPRequestHandler)
self.httpd.request_count = 0
class Server(HTTPServer, object):
def tearDown(self):
self.httpd.server_close()
self.httpd = None
messages = ()
requests_handled = 0
def serve(self):
self.httpd.handle_request()
self.httpd.request_count += 1
def __init__(self):
HTTPServer.__init__(self,
params.DEFAULT_BIND_ADDR_TUPLE,
QuietHandler)
def handle_request(self):
    # Handle exactly one request, then bump the counter the test
    # asserts on (requests_handled).
    HTTPServer.handle_request(self)
    self.requests_handled += 1
class TestGreenness(greentest.TestCase):
check_totalrefcount = False
def test_urllib2(self):
server = gevent.spawn(self.serve)
httpd = Server()
server_greenlet = gevent.spawn(httpd.handle_request)
port = self.httpd.socket.getsockname()[1]
port = httpd.socket.getsockname()[1]
rsp = urllib2.urlopen('http://127.0.0.1:%s' % port)
rsp.read()
rsp.close()
server.join()
self.assertEqual(self.httpd.request_count, 1)
server_greenlet.join()
self.assertEqual(httpd.requests_handled, 1)
httpd.server_close()
if __name__ == '__main__':
......
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
import sys
if not sys.argv[1:]:
......@@ -27,6 +30,7 @@ elif sys.argv[1:] == ['subprocess']: # pragma: no cover
except NameError:
line = input()
print('%s chars.' % len(line))
sys.stdout.flush()
gevent.spawn(printline).join()
......
......@@ -11,6 +11,22 @@ class TestMonkey(SubscriberCleanupMixin, unittest.TestCase):
maxDiff = None
def setUp(self):
    super(TestMonkey, self).setUp()
    # Collect every gevent event issued while the test runs; the
    # subscriber is removed automatically by the mixin's tearDown.
    self.all_events = []
    self.addSubscriber(self.all_events.append)
    # Snapshot monkey.saved so tearDown can restore it. The values
    # are dicts, so copy each one to guard against in-place mutation.
    self.orig_saved = {k: v.copy() for k, v in monkey.saved.items()}
def tearDown(self):
    # Restore the pre-test snapshot of monkey.saved taken in setUp,
    # then drop the per-test state.
    monkey.saved = self.orig_saved
    del self.orig_saved
    del self.all_events
    super(TestMonkey, self).tearDown()
def test_time(self):
import time
from gevent import time as gtime
......@@ -70,49 +86,51 @@ class TestMonkey(SubscriberCleanupMixin, unittest.TestCase):
def test_patch_twice_warnings_events(self):
import warnings
from gevent.testing import verify
orig_saved = {}
for k, v in monkey.saved.items():
orig_saved[k] = v.copy()
from gevent import events
all_events = []
events.subscribers.append(all_events.append)
def veto(event):
if isinstance(event, events.GeventWillPatchModuleEvent) and event.module_name == 'ssl':
raise events.DoNotPatch
events.subscribers.append(veto)
all_events = self.all_events
with warnings.catch_warnings(record=True) as issued_warnings:
# Patch again, triggering three warnings, one for os=False/signal=True,
# one for repeated monkey-patching, one for patching after ssl (on python >= 2.7.9)
# Patch again, triggering just one warning, for
# a different set of arguments. Because we're going to False instead of
# turning something on, nothing is actually done, no events are issued.
monkey.patch_all(os=False, extra_kwarg=42)
self.assertGreaterEqual(len(issued_warnings), 2)
self.assertIn('SIGCHLD', str(issued_warnings[-1].message))
self.assertEqual(len(issued_warnings), 1)
self.assertIn('more than once', str(issued_warnings[0].message))
self.assertEqual(all_events, [])
# Patching with the exact same argument doesn't issue a second warning.
# in fact, it doesn't do anything
# Same warning again, but still nothing is done.
del issued_warnings[:]
monkey.patch_all(os=False)
orig_saved['_gevent_saved_patch_all'] = monkey.saved['_gevent_saved_patch_all']
self.assertFalse(issued_warnings)
self.assertEqual(len(issued_warnings), 1)
self.assertIn('more than once', str(issued_warnings[0].message))
self.assertEqual(all_events, [])
self.orig_saved['_gevent_saved_patch_all_module_settings'] = monkey.saved[
'_gevent_saved_patch_all_module_settings']
# Make sure that re-patching did not change the monkey.saved
# attribute, overwriting the original functions.
if 'logging' in monkey.saved and 'logging' not in orig_saved:
if 'logging' in monkey.saved and 'logging' not in self.orig_saved:
# some part of the warning or unittest machinery imports logging
orig_saved['logging'] = monkey.saved['logging']
self.assertEqual(orig_saved, monkey.saved)
self.orig_saved['logging'] = monkey.saved['logging']
self.assertEqual(self.orig_saved, monkey.saved)
# Make sure some problematic attributes stayed correct.
# NOTE: This was only a problem if threading was not previously imported.
for k, v in monkey.saved['threading'].items():
self.assertNotIn('gevent', str(v))
self.assertNotIn('gevent', str(v), (k, v))
def test_patch_events(self):
from gevent import events
from gevent.testing import verify
all_events = self.all_events
def veto(event):
if isinstance(event, events.GeventWillPatchModuleEvent) and event.module_name == 'ssl':
raise events.DoNotPatch
self.addSubscriber(veto)
monkey.saved = {} # Reset
monkey.patch_all(thread=False, select=False, extra_kwarg=42) # Go again
self.assertIsInstance(all_events[0], events.GeventWillPatchAllEvent)
self.assertEqual({'extra_kwarg': 42}, all_events[0].patch_all_kwargs)
......
"""
Tests for running ``gevent.monkey`` as a module to launch a
patched script.
Uses files in the ``monkey_package/`` directory.
"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
import os
import os.path
import sys
import unittest
from subprocess import Popen
from subprocess import PIPE
class TestRun(unittest.TestCase):
from gevent import testing as greentest
class TestRun(greentest.TestCase):
maxDiff = None
def setUp(self):
......@@ -25,8 +36,8 @@ class TestRun(unittest.TestCase):
args.append('--module')
args += [script, 'patched']
p = Popen(args, stdout=PIPE, stderr=PIPE, env=env)
gout, gerr = p.communicate()
self.assertEqual(0, p.returncode, (gout, gerr))
monkey_out, monkey_err = p.communicate()
self.assertEqual(0, p.returncode, (p.returncode, monkey_out, monkey_err))
if module:
args = [sys.executable, "-m", script, 'stdlib']
......@@ -34,32 +45,32 @@ class TestRun(unittest.TestCase):
args = [sys.executable, script, 'stdlib']
p = Popen(args, stdout=PIPE, stderr=PIPE)
pout, perr = p.communicate()
self.assertEqual(0, p.returncode, (pout, perr))
std_out, std_err = p.communicate()
self.assertEqual(0, p.returncode, (p.returncode, std_out, std_err))
glines = gout.decode("utf-8").splitlines()
plines = pout.decode('utf-8').splitlines()
self.assertEqual(glines, plines)
self.assertEqual(gerr, perr)
monkey_out_lines = monkey_out.decode("utf-8").splitlines()
std_out_lines = std_out.decode('utf-8').splitlines()
self.assertEqual(monkey_out_lines, std_out_lines)
self.assertEqual(monkey_err, std_err)
return glines, gerr
return monkey_out_lines, monkey_err
def test_run_simple(self):
    # Run a plain script (not a package or module) under
    # ``python -m gevent.monkey`` and compare against stdlib execution.
    self._run(os.path.join('monkey_package', 'script.py'))
def test_run_package(self):
# Run a __main__ inside a package.
lines, _ = self._run('monkey_package')
def _run_package(self, module):
lines, _ = self._run('monkey_package', module=module)
self.assertTrue(lines[0].endswith('__main__.py'), lines[0])
self.assertEqual(lines[1], '__main__')
def test_run_module(self):
# Run a __main__ inside a module
lines, _ = self._run('monkey_package', module=True)
def test_run_package(self):
# Run a __main__ inside a package, even without specifying -m
self._run_package(module=False)
self.assertTrue(lines[0].endswith('__main__.py'), lines[0])
self.assertEqual(lines[1], '__main__')
def test_run_module(self):
# Run a __main__ inside a package, when specifying -m
self._run_package(module=True)
def test_issue_302(self):
lines, _ = self._run(os.path.join('monkey_package', 'issue302monkey.py'))
......@@ -69,6 +80,40 @@ class TestRun(unittest.TestCase):
self.assertEqual(lines[1], 'monkey_package/issue302monkey.py')
self.assertEqual(lines[2], 'True', lines)
# These three tests all sometimes fail on Py2 on CI, writing
# to stderr:
# Unhandled exception in thread started by \n
# sys.excepthook is missing\n
# lost sys.stderr\n
# Fatal Python error: PyImport_GetModuleDict: no module dictionary!\n'
# I haven't been able to produce this locally on macOS or Linux.
# The last line seems new with 2.7.17?
# Also, occasionally, they get '3' instead of '2' for the number of threads.
# That could have something to do with...? Most commonly that's PyPy, but
# sometimes CPython. Again, haven't reproduced.
@greentest.skipOnPy2("lost sys.stderr sometimes")
def test_threadpool_in_patched_after_patch(self):
    # Issue 1484: the threadpool must use native threads even when the
    # script run under ``python -m gevent.monkey`` calls patch_all()
    # again itself. Expected output: 'False' (threadpool worker is not
    # a greenlet) and '2' (number of frames in sys._current_frames()).
    out, err = self._run(os.path.join('monkey_package', 'threadpool_monkey_patches.py'))
    self.assertEqual(out, ['False', '2'])
    self.assertEqual(err, b'')
@greentest.skipOnPy2("lost sys.stderr sometimes")
def test_threadpool_in_patched_after_patch_module(self):
    # Issue 1484: same as test_threadpool_in_patched_after_patch, but
    # launching the target as a module (``--module``) rather than a
    # script path.
    out, err = self._run('monkey_package.threadpool_monkey_patches', module=True)
    self.assertEqual(out, ['False', '2'])
    self.assertEqual(err, b'')
@greentest.skipOnPy2("lost sys.stderr sometimes")
def test_threadpool_not_patched_after_patch_module(self):
    # Issue 1484: a module that does *not* call patch_all() itself,
    # run under ``python -m gevent.monkey --module``. Expected output:
    # 'False' (original get_ident is not gevent's), 'False' (threadpool
    # worker is a native thread), '2' (frame count).
    out, err = self._run('monkey_package.threadpool_no_monkey', module=True)
    self.assertEqual(out, ['False', 'False', '2'])
    self.assertEqual(err, b'')
if __name__ == '__main__':
unittest.main()
greentest.main()
......@@ -525,146 +525,162 @@ class TestRefCount(TestCase):
gevent.sleep(0)
pool.kill()
if hasattr(gevent.threadpool, 'ThreadPoolExecutor'):
from concurrent.futures import TimeoutError as FutureTimeoutError
from concurrent.futures import wait as cf_wait
from concurrent.futures import as_completed as cf_as_completed
from gevent import monkey
from gevent import monkey
class TestTPE(_AbstractPoolTest):
size = 1
@greentest.skipUnless(
hasattr(gevent.threadpool, 'ThreadPoolExecutor'),
"Requires ThreadPoolExecutor")
class TestTPE(_AbstractPoolTest):
size = 1
MAP_IS_GEN = True
MAP_IS_GEN = True
ClassUnderTest = gevent.threadpool.ThreadPoolExecutor
@property
def ClassUnderTest(self):
return gevent.threadpool.ThreadPoolExecutor
MONKEY_PATCHED = False
MONKEY_PATCHED = False
@greentest.ignores_leakcheck
def test_future(self):
self.assertEqual(monkey.is_module_patched('threading'),
self.MONKEY_PATCHED)
pool = self.pool
@property
def FutureTimeoutError(self):
from concurrent.futures import TimeoutError as FutureTimeoutError
return FutureTimeoutError
calledback = []
@property
def cf_wait(self):
from concurrent.futures import wait as cf_wait
return cf_wait
def fn():
gevent.sleep(0.5)
return 42
@property
def cf_as_completed(self):
from concurrent.futures import as_completed as cf_as_completed
return cf_as_completed
def callback(future):
future.calledback += 1
raise greentest.ExpectedException("Expected, ignored")
@greentest.ignores_leakcheck
def test_future(self):
self.assertEqual(monkey.is_module_patched('threading'),
self.MONKEY_PATCHED)
pool = self.pool
future = pool.submit(fn) # pylint:disable=no-member
future.calledback = 0
future.add_done_callback(callback)
self.assertRaises(FutureTimeoutError, future.result, timeout=0.001)
calledback = []
def spawned():
return 2016
def fn():
gevent.sleep(0.5)
return 42
spawned_greenlet = gevent.spawn(spawned)
def callback(future):
future.calledback += 1
raise greentest.ExpectedException("Expected, ignored")
# Whether or not we are monkey patched, the background
# greenlet we spawned got to run while we waited.
future = pool.submit(fn) # pylint:disable=no-member
future.calledback = 0
future.add_done_callback(callback)
self.assertRaises(self.FutureTimeoutError, future.result, timeout=0.001)
self.assertEqual(future.result(), 42)
self.assertTrue(future.done())
self.assertFalse(future.cancelled())
# Make sure the notifier has a chance to run so the call back
# gets called
gevent.sleep()
self.assertEqual(future.calledback, 1)
def spawned():
return 2016
self.assertTrue(spawned_greenlet.ready())
self.assertEqual(spawned_greenlet.value, 2016)
spawned_greenlet = gevent.spawn(spawned)
# Adding the callback again runs immediately
future.add_done_callback(lambda f: calledback.append(True))
self.assertEqual(calledback, [True])
# Whether or not we are monkey patched, the background
# greenlet we spawned got to run while we waited.
# We can wait on the finished future
done, _not_done = cf_wait((future,))
self.assertEqual(list(done), [future])
self.assertEqual(future.result(), 42)
self.assertTrue(future.done())
self.assertFalse(future.cancelled())
# Make sure the notifier has a chance to run so the call back
# gets called
gevent.sleep()
self.assertEqual(future.calledback, 1)
self.assertEqual(list(cf_as_completed((future,))), [future])
# Doing so does not call the callback again
self.assertEqual(future.calledback, 1)
# even after a trip around the event loop
gevent.sleep()
self.assertEqual(future.calledback, 1)
pool.kill()
del future
del pool
del self.pool
@greentest.ignores_leakcheck
def test_future_wait_module_function(self):
# Instead of waiting on the result, we can wait
# on the future using the module functions
self.assertEqual(monkey.is_module_patched('threading'),
self.MONKEY_PATCHED)
pool = self.pool
def fn():
gevent.sleep(0.5)
return 42
future = pool.submit(fn) # pylint:disable=no-member
if self.MONKEY_PATCHED:
# Things work as expected when monkey-patched
_done, not_done = cf_wait((future,), timeout=0.001)
self.assertEqual(list(not_done), [future])
def spawned():
return 2016
spawned_greenlet = gevent.spawn(spawned)
done, _not_done = cf_wait((future,))
self.assertEqual(list(done), [future])
self.assertTrue(spawned_greenlet.ready())
self.assertEqual(spawned_greenlet.value, 2016)
else:
# When not monkey-patched, raises an AttributeError
self.assertRaises(AttributeError, cf_wait, (future,))
pool.kill()
del future
del pool
del self.pool
@greentest.ignores_leakcheck
def test_future_wait_gevent_function(self):
# The future object can be waited on with gevent functions.
self.assertEqual(monkey.is_module_patched('threading'),
self.MONKEY_PATCHED)
pool = self.pool
def fn():
gevent.sleep(0.5)
return 42
future = pool.submit(fn) # pylint:disable=no-member
self.assertTrue(spawned_greenlet.ready())
self.assertEqual(spawned_greenlet.value, 2016)
# Adding the callback again runs immediately
future.add_done_callback(lambda f: calledback.append(True))
self.assertEqual(calledback, [True])
# We can wait on the finished future
done, _not_done = self.cf_wait((future,))
self.assertEqual(list(done), [future])
self.assertEqual(list(self.cf_as_completed((future,))), [future])
# Doing so does not call the callback again
self.assertEqual(future.calledback, 1)
# even after a trip around the event loop
gevent.sleep()
self.assertEqual(future.calledback, 1)
pool.kill()
del future
del pool
del self.pool
@greentest.ignores_leakcheck
def test_future_wait_module_function(self):
# Instead of waiting on the result, we can wait
# on the future using the module functions
self.assertEqual(monkey.is_module_patched('threading'),
self.MONKEY_PATCHED)
pool = self.pool
def fn():
gevent.sleep(0.5)
return 42
future = pool.submit(fn) # pylint:disable=no-member
if self.MONKEY_PATCHED:
# Things work as expected when monkey-patched
_done, not_done = self.cf_wait((future,), timeout=0.001)
self.assertEqual(list(not_done), [future])
def spawned():
return 2016
spawned_greenlet = gevent.spawn(spawned)
done = gevent.wait((future,))
done, _not_done = self.cf_wait((future,))
self.assertEqual(list(done), [future])
self.assertTrue(spawned_greenlet.ready())
self.assertEqual(spawned_greenlet.value, 2016)
else:
# When not monkey-patched, raises an AttributeError
self.assertRaises(AttributeError, self.cf_wait, (future,))
pool.kill()
del future
del pool
del self.pool
@greentest.ignores_leakcheck
def test_future_wait_gevent_function(self):
# The future object can be waited on with gevent functions.
self.assertEqual(monkey.is_module_patched('threading'),
self.MONKEY_PATCHED)
pool = self.pool
pool.kill()
del future
del pool
del self.pool
def fn():
gevent.sleep(0.5)
return 42
future = pool.submit(fn) # pylint:disable=no-member
def spawned():
return 2016
spawned_greenlet = gevent.spawn(spawned)
done = gevent.wait((future,))
self.assertEqual(list(done), [future])
self.assertTrue(spawned_greenlet.ready())
self.assertEqual(spawned_greenlet.value, 2016)
pool.kill()
del future
del pool
del self.pool
if __name__ == '__main__':
......
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from gevent import monkey; monkey.patch_all()
import gevent.testing as greentest
import gevent.threadpool
if hasattr(gevent.threadpool, 'ThreadPoolExecutor'):
from . import test__threadpool
from test__threadpool import TestTPE as _Base
class TestPatchedTPE(_Base):
MONKEY_PATCHED = True
class TestPatchedTPE(test__threadpool.TestTPE):
MONKEY_PATCHED = True
del _Base
if __name__ == '__main__':
greentest.main()
......@@ -3,23 +3,25 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
import os
import sys
from greenlet import greenlet as RawGreenlet
from gevent import monkey
from gevent._compat import integer_types
from gevent.event import AsyncResult
from gevent.exceptions import InvalidThreadUseError
from gevent.greenlet import Greenlet
from gevent.hub import _get_hub
from gevent._hub_local import get_hub_if_exists
from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import getcurrent
from gevent.hub import sleep
from gevent.lock import Semaphore
from gevent.pool import GroupMappingMixin
from gevent.util import clear_stack_frames
from gevent._threading import Lock
from gevent._threading import Queue
from gevent._threading import start_new_thread
from gevent._threading import get_thread_ident
......@@ -30,38 +32,138 @@ __all__ = [
'ThreadResult',
]
def _format_hub(hub):
if hub is None:
return '<missing>'
return '<%s at 0x%x thread_ident=0x%x>' % (
hub.__class__.__name__, id(hub), hub.thread_ident
)
class _WorkerGreenlet(RawGreenlet):
# Exists to produce a more useful repr for worker pool
# threads/greenlets.
# Inform the gevent.util.GreenletTree that this should be
# considered the root (for printing purposes)
greenlet_tree_is_root = True
_thread_ident = 0
_exc_info = sys.exc_info
_get_hub_if_exists = staticmethod(get_hub_if_exists)
# We capture the hub each time through the loop in case its created
# so we can destroy it after a fork.
_hub_of_worker = None
# The hub of the threadpool we're working for. Just for info.
_hub = None
def __init__(self, threadpool):
RawGreenlet.__init__(self, threadpool._worker)
self.thread_ident = get_thread_ident()
self._threadpool = threadpool
# Construct in the main thread (owner of the threadpool)
# The parent greenlet and thread identifier will be set once the
# new thread begins running.
RawGreenlet.__init__(self)
self._hub = threadpool.hub
# Avoid doing any imports in the background thread if it's not
# necessary (monkey.get_original imports if not patched).
# Background imports can hang Python 2 (gevent's thread resolver runs in the BG,
# and resolving may have to import the idna module, which needs an import lock, so
# resolving at module scope)
if monkey.is_module_patched('sys'):
stderr = monkey.get_original('sys', 'stderr')
else:
stderr = sys.stderr
self._stderr = stderr
# We can capture the task_queue; even though it can change if the threadpool
# is re-innitted, we won't be running in that case
self._task_queue = threadpool.task_queue
# Inform the gevent.util.GreenletTree that this should be
# considered the root (for printing purposes) and to
# ignore the parent attribute. (We can't set parent to None.)
self._unregister_worker = threadpool._unregister_worker
threadpool._register_worker(self)
try:
start_new_thread(self._begin, ())
except:
self._unregister_worker(self)
raise
self.greenlet_tree_is_root = True
def _begin(self):
# we're in the new thread (but its root greenlet). Establish invariants and get going
# by making this the current greenlet.
self.parent = getcurrent() # pylint:disable=attribute-defined-outside-init
self._thread_ident = get_thread_ident()
# ignore the parent attribute. (We can't set parent to None.)
self.parent.greenlet_tree_is_ignored = True
try:
self.switch() # goto run()
except: # pylint:disable=bare-except
# run() will attempt to print any exceptions, but that might
# not work during shutdown. sys.excepthook and such may be gone,
# so things might not get printed at all except for a cryptic
# message. This is especially true on Python 2 (doesn't seem to be
# an issue on Python 3).
pass
def __fixup_hub_before_block(self):
hub = self._get_hub_if_exists() # Don't create one; only set if a worker function did it
if hub is not None:
hub.name = 'ThreadPool Worker Hub'
# While we block, don't let the monitoring thread, if any,
# report us as blocked. Indeed, so long as we never
# try to switch greenlets, don't report us as blocked---
# the threadpool is *meant* to run blocking tasks
if hub is not None and hub.periodic_monitoring_thread is not None:
hub.periodic_monitoring_thread.ignore_current_greenlet_blocking()
self._hub_of_worker = hub
def __run_task(self, func, args, kwargs, thread_result):
try:
thread_result.set(func(*args, **kwargs))
except: # pylint:disable=bare-except
thread_result.handle_error((self, func), self._exc_info())
finally:
del func, args, kwargs, thread_result
@classmethod
def run_in_worker_thread(cls, threadpool):
# The root function of each new worker thread.
glet = cls(threadpool)
glet.switch()
def run(self):
# pylint:disable=too-many-branches
try:
while 1: # tiny bit faster than True on Py2
self.__fixup_hub_before_block()
def __repr__(self):
return "<ThreadPoolWorker at 0x%x thread_ident=0x%x %s>" % (
task = self._task_queue.get()
try:
if task is None:
return
self.__run_task(*task)
finally:
task = None
self._task_queue.task_done()
except Exception as e: # pylint:disable=broad-except
print("Failed to run worker thread", e, file=self._stderr)
finally:
# Re-check for the hub in case the task created it but then
# failed.
self.cleanup(self._get_hub_if_exists())
def cleanup(self, hub_of_worker):
if self._hub is not None:
self._hub = None
self._unregister_worker(self)
self._unregister_worker = lambda _: None
self._task_queue = None
if hub_of_worker is not None:
hub_of_worker.destroy(True)
def __repr__(self, _format_hub=_format_hub):
return "<ThreadPoolWorker at 0x%x thread_ident=0x%x hub=%s>" % (
id(self),
self.thread_ident,
self._threadpool
self._thread_ident,
_format_hub(self._hub)
)
class ThreadPool(GroupMappingMixin):
# TODO: Document thread safety restrictions.
"""
A pool of native worker threads.
......@@ -120,11 +222,9 @@ class ThreadPool(GroupMappingMixin):
# gevent.lock.Semaphore, this is only safe to use from a single
# native thread.
'_available_worker_threads_greenlet_sem',
# A native threading lock, used to protect internals
# that are safe to call across multiple threads.
'_native_thread_internal_lock',
# The number of running worker threads
'_num_worker_threads',
# A set of running or pending _WorkerGreenlet objects;
# we rely on the GIL for thread safety.
'_worker_greenlets',
# The task queue is itself safe to use from multiple
# native threads.
'task_queue',
......@@ -139,15 +239,20 @@ class ThreadPool(GroupMappingMixin):
self.task_queue = Queue()
self.fork_watcher = None
self._worker_greenlets = set()
self._maxsize = 0
# Note that by starting with 1, we actually allow
# maxsize + 1 tasks in the queue.
self._available_worker_threads_greenlet_sem = Semaphore(1, hub)
self._native_thread_internal_lock = Lock()
self._num_worker_threads = 0
self._set_maxsize(maxsize)
self.fork_watcher = hub.loop.fork(ref=False)
def _register_worker(self, worker):
self._worker_greenlets.add(worker)
def _unregister_worker(self, worker):
self._worker_greenlets.discard(worker)
def _set_maxsize(self, maxsize):
if not isinstance(maxsize, integer_types):
raise TypeError('maxsize must be integer: %r' % (maxsize, ))
......@@ -172,12 +277,13 @@ class ThreadPool(GroupMappingMixin):
will block waiting for a task to finish.
""")
def __repr__(self):
return '<%s at 0x%x %s/%s/%s hub=<%s at 0x%x thread_ident=0x%s>>' % (
def __repr__(self, _format_hub=_format_hub):
return '<%s at 0x%x tasks=%s size=%s maxsize=%s hub=%s>' % (
self.__class__.__name__,
id(self),
len(self), self.size, self.maxsize,
self.hub.__class__.__name__, id(self.hub), self.hub.thread_ident)
_format_hub(self.hub),
)
def __len__(self):
# XXX just do unfinished_tasks property
......@@ -186,9 +292,7 @@ class ThreadPool(GroupMappingMixin):
return self.task_queue.unfinished_tasks
def _get_size(self):
# TODO: This probably needs to acquire the lock. We
# modify this from self._add_thread under a lock.
return self._num_worker_threads
return len(self._worker_greenlets)
def _set_size(self, size):
if size < 0:
......@@ -197,17 +301,17 @@ class ThreadPool(GroupMappingMixin):
raise ValueError('Size of the pool cannot be bigger than maxsize: %r > %r' % (size, self._maxsize))
if self.manager:
self.manager.kill()
while self._num_worker_threads < size:
while len(self._worker_greenlets) < size:
self._add_thread()
delay = self.hub.loop.approx_timer_resolution
while self._num_worker_threads > size:
while self._num_worker_threads - size > self.task_queue.unfinished_tasks:
while len(self._worker_greenlets) > size:
while len(self._worker_greenlets) - size > self.task_queue.unfinished_tasks:
self.task_queue.put(None)
if getcurrent() is self.hub:
break
sleep(delay)
delay = min(delay * 2, .05)
if self._num_worker_threads:
if self._worker_greenlets:
self.fork_watcher.start(self._on_fork)
else:
self.fork_watcher.stop()
......@@ -226,14 +330,33 @@ class ThreadPool(GroupMappingMixin):
def _on_fork(self):
# fork() only leaves one thread; also screws up locks;
# let's re-create locks and threads.
# let's re-create locks and threads, and do our best to
# clean up any worker threads left behind.
# NOTE: See comment in gevent.hub.reinit.
pid = os.getpid()
if pid != self.pid:
# The OS threads have been destroyed, but the Python objects
# may live on, creating refcount "leaks"
# The OS threads have been destroyed, but the Python
# objects may live on, creating refcount "leaks". Python 2
# leaves dead frames (those that are for dead OS threads)
# around; Python 3.8 does not.
thread_ident_to_frame = dict(sys._current_frames())
for worker in list(self._worker_greenlets):
frame = thread_ident_to_frame.get(worker._thread_ident)
clear_stack_frames(frame)
worker.cleanup(worker._hub_of_worker)
# We can't throw anything to the greenlet, nor can we
# switch to it or set a parent. Those would all be cross-thread
# operations, which aren't allowed.
worker.__dict__.clear()
# We've cleared f_locals and on Python 3.4, possibly the actual
# array locals of the stack frame, but the task queue may still be
# referenced if we didn't actually get all the locals. Shut it down
# and clear it before we throw away our reference.
self.task_queue.kill()
self.__init__(self._maxsize)
def join(self):
"""Waits until all outstanding tasks have been completed."""
delay = max(0.0005, self.hub.loop.approx_timer_resolution)
......@@ -247,14 +370,14 @@ class ThreadPool(GroupMappingMixin):
def _adjust_step(self):
# if there is a possibility & necessity for adding a thread, do it
while (self._num_worker_threads < self._maxsize
and self.task_queue.unfinished_tasks > self._num_worker_threads):
while (len(self._worker_greenlets) < self._maxsize
and self.task_queue.unfinished_tasks > len(self._worker_greenlets)):
self._add_thread()
# while the number of threads is more than maxsize, kill one
# we do not check what's already in task_queue - it could be all Nones
while self._num_worker_threads - self._maxsize > self.task_queue.unfinished_tasks:
while len(self._worker_greenlets) - self._maxsize > self.task_queue.unfinished_tasks:
self.task_queue.put(None)
if self._num_worker_threads:
if self._worker_greenlets:
self.fork_watcher.start(self._on_fork)
elif self.fork_watcher is not None:
self.fork_watcher.stop()
......@@ -263,27 +386,20 @@ class ThreadPool(GroupMappingMixin):
delay = 0.0001
while True:
self._adjust_step()
if self._num_worker_threads <= self._maxsize:
if len(self._worker_greenlets) <= self._maxsize:
return
sleep(delay)
delay = min(delay * 2, .05)
def adjust(self):
self._adjust_step()
if not self.manager and self._num_worker_threads > self._maxsize:
if not self.manager and len(self._worker_greenlets) > self._maxsize:
# might need to feed more Nones into the pool to shutdown
# threads.
self.manager = Greenlet.spawn(self._adjust_wait)
def _add_thread(self):
with self._native_thread_internal_lock:
self._num_worker_threads += 1
try:
start_new_thread(_WorkerGreenlet.run_in_worker_thread, (self,))
except:
with self._native_thread_internal_lock:
self._num_worker_threads -= 1
raise
_WorkerGreenlet(self)
def spawn(self, func, *args, **kwargs):
"""
......@@ -338,69 +454,6 @@ class ThreadPool(GroupMappingMixin):
raise
return result
def _decrease_size(self):
if sys is None:
return
_lock = self._native_thread_internal_lock
if _lock is not None: # XXX: When would this be None?
with _lock:
self._num_worker_threads -= 1
def __ignore_current_greenlet_blocking(self, hub):
if hub is not None and hub.periodic_monitoring_thread is not None:
hub.periodic_monitoring_thread.ignore_current_greenlet_blocking()
def _worker(self):
# pylint:disable=too-many-branches
need_decrease = True
try:
while 1: # tiny bit faster than True on Py2
h = _get_hub() # Don't create one; only set if a worker function did it
if h is not None:
h.name = 'ThreadPool Worker Hub'
task_queue = self.task_queue
# While we block, don't let the monitoring thread, if any,
# report us as blocked. Indeed, so long as we never
# try to switch greenlets, don't report us as blocked---
# the threadpool is *meant* to run blocking tasks
self.__ignore_current_greenlet_blocking(h)
task = task_queue.get()
try:
if task is None:
need_decrease = False
self._decrease_size()
# we want first to decrease size, then decrease unfinished_tasks
# otherwise, _adjust might think there's one more idle thread that
# needs to be killed
return
func, args, kwargs, thread_result = task
try:
value = func(*args, **kwargs)
except: # pylint:disable=bare-except
exc_info = getattr(sys, 'exc_info', None)
if exc_info is None:
return
thread_result.handle_error((self, func), exc_info())
else:
if sys is None:
return
thread_result.set(value)
del value
finally:
del func, args, kwargs, thread_result, task
finally:
if sys is None:
return # pylint:disable=lost-exception
task_queue.task_done()
finally:
if need_decrease:
self._decrease_size()
if sys is not None:
hub = _get_hub()
if hub is not None:
hub.destroy(True)
del hub
def _apply_immediately(self):
# If we're being called from a different thread than the one that
# created us, e.g., because a worker task is trying to use apply()
......@@ -527,22 +580,21 @@ else:
from gevent._util import Lazy
from concurrent.futures import _base as cfb
def _wrap_error(future, fn):
def _ignore_error(future_proxy, fn):
def cbwrap(_):
del _
# we're called with the async result, but
# be sure to pass in ourself. Also automatically
# unlink ourself so that we don't get called multiple
# times.
# We're called with the async result (from the threadpool), but
# be sure to pass in the user-visible _FutureProxy object..
try:
fn(future)
fn(future_proxy)
except Exception: # pylint: disable=broad-except
future.hub.print_exception((fn, future), *sys.exc_info())
# Just print, don't raise to the hub's parent.
future_proxy.hub.print_exception((fn, future_proxy), None, None, None)
return cbwrap
def _wrap(future, fn):
def _wrap(future_proxy, fn):
def f(_):
fn(future)
fn(future_proxy)
return f
class _FutureProxy(object):
......@@ -553,7 +605,6 @@ else:
@Lazy
def _condition(self):
from gevent import monkey
if monkey.is_module_patched('threading') or self.done():
import threading
return threading.Condition()
......@@ -604,10 +655,11 @@ else:
raise concurrent.futures.TimeoutError()
def add_done_callback(self, fn):
"""Exceptions raised by *fn* are ignored."""
if self.done():
fn(self)
else:
self.asyncresult.rawlink(_wrap_error(self, fn))
self.asyncresult.rawlink(_ignore_error(self, fn))
def rawlink(self, fn):
self.asyncresult.rawlink(_wrap(self, fn))
......
......@@ -154,6 +154,10 @@ def format_run_info(thread_stacks=True,
return lines
def is_idle_threadpool_worker(frame):
return frame.f_locals and frame.f_locals.get('gevent_threadpool_worker_idle')
def _format_thread_info(lines, thread_stacks, limit, current_thread_ident):
import threading
......@@ -172,7 +176,7 @@ def _format_thread_info(lines, thread_stacks, limit, current_thread_ident):
if not thread:
# Is it an idle threadpool thread? thread pool threads
# don't have a Thread object, they're low-level
if frame.f_locals and frame.f_locals.get('gevent_threadpool_worker_idle'):
if is_idle_threadpool_worker(frame):
name = 'idle threadpool worker'
do_stacks = False
else:
......@@ -633,3 +637,15 @@ class assert_switches(object):
message += '\n'
message += '\n'.join(report_lines)
raise _FailedToSwitch(message)
def clear_stack_frames(frame):
"""Do our best to clear local variables in all frames in a stack."""
# On Python 3, frames have a .clear() method that can raise a RuntimeError.
while frame is not None:
try:
frame.clear()
except (RuntimeError, AttributeError):
pass
frame.f_locals.clear()
frame = frame.f_back
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment