Commit 320be8f0 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1147 from gevent/monitor-events

Start having the monitor thread emit events for monitored conditions.
parents 76cbf2fa 8917d73a
...@@ -26,7 +26,9 @@ ...@@ -26,7 +26,9 @@
- Add an optional monitoring thread for each hub. When enabled, this - Add an optional monitoring thread for each hub. When enabled, this
thread (by default) looks for greenlets that block the event loop thread (by default) looks for greenlets that block the event loop
for more than 0.1s. You can add your own periodic monitoring for more than 0.1s. You can add your own periodic monitoring
functions to this thread. functions to this thread. Set ``GEVENT_MONITOR_THREAD_ENABLE`` to
use it, and ``GEVENT_MAX_BLOCKING_TIME`` to configure the blocking
interval.
- When gevent prints a timestamp as part of an error message, it is - When gevent prints a timestamp as part of an error message, it is
now in UTC format as specified by RFC3339. now in UTC format as specified by RFC3339.
...@@ -42,6 +44,11 @@ ...@@ -42,6 +44,11 @@
are interested in, reducing CPU usage. Reported in :issue:`1144` by are interested in, reducing CPU usage. Reported in :issue:`1144` by
wwqgtxx. wwqgtxx.
- Add a simple event framework for decoupled communication. It uses
:mod:`zope.event` if that is installed. The monitoring thread emits
events when it detects certain conditions, like loop blocked or
memory limits exceeded.
1.3a2 (2018-03-06) 1.3a2 (2018-03-06)
================== ==================
......
...@@ -131,7 +131,7 @@ install: ...@@ -131,7 +131,7 @@ install:
# pip will build them from source using the MSVC compiler matching the # pip will build them from source using the MSVC compiler matching the
# target Python version and architecture # target Python version and architecture
# Note that psutil won't build under PyPy on Windows. # Note that psutil won't build under PyPy on Windows.
- "%CMD_IN_ENV% pip install -e git+https://github.com/cython/cython.git@63cd3bbb5eac22b92808eeb90b512359e3def20a#egg=cython" - "%CMD_IN_ENV% pip install -U cython zope.interface zope.event"
- "%CMD_IN_ENV% pip install -U setuptools wheel greenlet cffi dnspython idna requests" - "%CMD_IN_ENV% pip install -U setuptools wheel greenlet cffi dnspython idna requests"
- ps: - ps:
......
...@@ -23,10 +23,15 @@ idna ...@@ -23,10 +23,15 @@ idna
psutil psutil
# benchmarks use this # benchmarks use this
perf perf
# Used in a test # Events
zope.event
zope.interface zope.interface
# Tests
requests requests
# For viewing README.rst (restview --long-description), # For viewing README.rst (restview --long-description),
# CONTRIBUTING.rst, etc. # CONTRIBUTING.rst, etc.
# https://github.com/mgedmin/restview # https://github.com/mgedmin/restview
restview restview
-r rtd-requirements.txt
...@@ -39,10 +39,16 @@ extensions = [ ...@@ -39,10 +39,16 @@ extensions = [
'sphinx.ext.intersphinx', 'sphinx.ext.intersphinx',
'mysphinxext', 'mysphinxext',
'sphinx.ext.extlinks', 'sphinx.ext.extlinks',
'sphinx.ext.viewcode',
'repoze.sphinx.autointerface',
] ]
intersphinx_mapping = {'http://docs.python.org/': None, intersphinx_mapping = {
'https://greenlet.readthedocs.io/en/latest/': None} 'http://docs.python.org/': None,
'https://greenlet.readthedocs.io/en/latest/': None,
'https://zopeevent.readthedocs.io/en/latest/': None,
'https://zopecomponent.readthedocs.io/en/latest/': None,
}
extlinks = {'issue': ('https://github.com/gevent/gevent/issues/%s', extlinks = {'issue': ('https://github.com/gevent/gevent/issues/%s',
'issue #'), 'issue #'),
......
...@@ -23,4 +23,5 @@ API reference ...@@ -23,4 +23,5 @@ API reference
gevent.threadpool gevent.threadpool
gevent.time gevent.time
gevent.util gevent.util
gevent.events
lowlevel lowlevel
...@@ -241,6 +241,10 @@ def run_setup(ext_modules, run_make): ...@@ -241,6 +241,10 @@ def run_setup(ext_modules, run_make):
'dnspython', 'dnspython',
'idna', 'idna',
], ],
'events': [
'zope.event',
'zope.interface',
],
}, },
# It's always safe to pass the CFFI keyword, even if # It's always safe to pass the CFFI keyword, even if
# cffi is not installed: it's just ignored in that case. # cffi is not installed: it's just ignored in that case.
......
...@@ -436,9 +436,13 @@ class TrackGreenletTree(BoolSettingMixin, Setting): ...@@ -436,9 +436,13 @@ class TrackGreenletTree(BoolSettingMixin, Setting):
.. versionadded:: 1.3b1 .. versionadded:: 1.3b1
""" """
## Monitoring settings
# All env keys should begin with GEVENT_MONITOR
class MonitorThread(BoolSettingMixin, Setting): class MonitorThread(BoolSettingMixin, Setting):
name = 'monitor_thread' name = 'monitor_thread'
environment_key = 'GEVENT_ENABLE_MONITOR_THREAD' environment_key = 'GEVENT_MONITOR_THREAD_ENABLE'
default = False default = False
desc = """\ desc = """\
...@@ -466,6 +470,8 @@ class MonitorThread(BoolSettingMixin, Setting): ...@@ -466,6 +470,8 @@ class MonitorThread(BoolSettingMixin, Setting):
class MaxBlockingTime(FloatSettingMixin, Setting): class MaxBlockingTime(FloatSettingMixin, Setting):
name = 'max_blocking_time' name = 'max_blocking_time'
# This environment key doesn't follow the convention because it's
# meant to match a key used by existing projects
environment_key = 'GEVENT_MAX_BLOCKING_TIME' environment_key = 'GEVENT_MAX_BLOCKING_TIME'
default = 0.1 default = 0.1
......
...@@ -12,10 +12,13 @@ from greenlet import getcurrent ...@@ -12,10 +12,13 @@ from greenlet import getcurrent
from gevent import config as GEVENT_CONFIG from gevent import config as GEVENT_CONFIG
from gevent.monkey import get_original from gevent.monkey import get_original
from gevent.util import format_run_info from gevent.util import format_run_info
from gevent.events import notify
from gevent.events import EventLoopBlocked
from gevent._compat import thread_mod_name from gevent._compat import thread_mod_name
from gevent._util import gmctime from gevent._util import gmctime
# Clocks # Clocks
try: try:
# Python 3.3+ (PEP 418) # Python 3.3+ (PEP 418)
...@@ -268,7 +271,14 @@ class PeriodicMonitoringThread(object): ...@@ -268,7 +271,14 @@ class PeriodicMonitoringThread(object):
report.extend(format_run_info(greenlet_stacks=False, report.extend(format_run_info(greenlet_stacks=False,
current_thread_ident=self.monitor_thread_ident)) current_thread_ident=self.monitor_thread_ident))
report.append(report[0]) report.append(report[0])
hub.exception_stream.write('\n'.join(report)) stream = hub.exception_stream
for line in report:
# Printing line by line may interleave with other things,
# but it should also prevent a "reentrant call to print"
# when the report is large.
print(line, file=stream)
notify(EventLoopBlocked(active_greenlet, GEVENT_CONFIG.max_blocking_time, report))
return (active_greenlet, report) return (active_greenlet, report)
def ignore_current_greenlet_blocking(self): def ignore_current_greenlet_blocking(self):
......
# -*- coding: utf-8 -*-
# Copyright 2018 gevent. See LICENSE for details.
"""
Publish/subscribe event infrastructure.
When certain "interesting" things happen during the lifetime of the
process, gevent will "publish" an event (an object). That event is
delivered to interested "subscribers" (functions that take one
parameter, the event object).
Higher level frameworks may take this foundation and build richer
models on it.
If :mod:`zope.event` is installed, then it will be used to provide the
functionality of `notify` and `subscribers`. See
:mod:`zope.event.classhandler` for a simple class-based approach to
subscribing to a filtered list of events, and see `zope.component
<https://zopecomponent.readthedocs.io/en/latest/event.html>`_ for a
much higher-level, flexible system. If you are using one of these systems,
you generally will not want to directly modify `subscribers`.
.. versionadded:: 1.3b1
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
__all__ = [
'subscribers',
'IEventLoopBlocked',
'EventLoopBlocked',
]
try:
from zope.event import subscribers
from zope.event import notify
except ImportError:
#: Applications may register for notification of events by appending a
#: callable to the ``subscribers`` list.
#:
#: Each subscriber takes a single argument, which is the event object
#: being published.
#:
#: Exceptions raised by subscribers will be propagated *without* running
#: any remaining subscribers.
subscribers = []
def notify(event):
"""
Notify all subscribers of ``event``.
"""
for subscriber in subscribers:
subscriber(event)
notify = notify # export
try:
from zope.interface import Interface
from zope.interface import implementer
from zope.interface import Attribute
except ImportError:
class Interface(object):
pass
def implementer(_iface):
def dec(c):
return c
return dec
def Attribute(s):
return s
class IEventLoopBlocked(Interface):
"""
The event emitted when the event loop is blocked.
This event is emitted in the monitor thread.
"""
greenlet = Attribute("The greenlet that appeared to be blocking the loop.")
blocking_time = Attribute("The approximate time in seconds the loop has been blocked.")
info = Attribute("A sequence of string lines providing extra info.")
@implementer(IEventLoopBlocked)
class EventLoopBlocked(object):
"""
The event emitted when the event loop is blocked.
Implements `IEventLoopBlocked`.
"""
def __init__(self, greenlet, blocking_time, info):
self.greenlet = greenlet
self.blocking_time = blocking_time
self.info = info
...@@ -611,6 +611,8 @@ class timer(_base.TimerMixin, watcher): ...@@ -611,6 +611,8 @@ class timer(_base.TimerMixin, watcher):
self._after, self._repeat = args self._after, self._repeat = args
if self._after and self._after < 0.001: if self._after and self._after < 0.001:
import warnings import warnings
# XXX: The stack level is hard to determine, could be getting here
# through a number of different ways.
warnings.warn("libuv only supports millisecond timer resolution; " warnings.warn("libuv only supports millisecond timer resolution; "
"all times less will be set to 1 ms", "all times less will be set to 1 ms",
stacklevel=6) stacklevel=6)
......
...@@ -127,6 +127,7 @@ class TestCase(TestCaseMetaClass("NewBase", (TimeAssertMixin, BaseTestCase,), {} ...@@ -127,6 +127,7 @@ class TestCase(TestCaseMetaClass("NewBase", (TimeAssertMixin, BaseTestCase,), {}
error_fatal = True error_fatal = True
uses_handle_error = True uses_handle_error = True
close_on_teardown = () close_on_teardown = ()
__old_subscribers = ()
def run(self, *args, **kwargs): def run(self, *args, **kwargs):
# pylint:disable=arguments-differ # pylint:disable=arguments-differ
...@@ -136,6 +137,8 @@ class TestCase(TestCaseMetaClass("NewBase", (TimeAssertMixin, BaseTestCase,), {} ...@@ -136,6 +137,8 @@ class TestCase(TestCaseMetaClass("NewBase", (TimeAssertMixin, BaseTestCase,), {}
def setUp(self): def setUp(self):
super(TestCase, self).setUp() super(TestCase, self).setUp()
from gevent import events
self.__old_subscribers = events.subscribers[:]
# Especially if we're running in leakcheck mode, where # Especially if we're running in leakcheck mode, where
# the same test gets executed repeatedly, we need to update the # the same test gets executed repeatedly, we need to update the
# current time. Tests don't always go through the full event loop, # current time. Tests don't always go through the full event loop,
...@@ -148,6 +151,9 @@ class TestCase(TestCaseMetaClass("NewBase", (TimeAssertMixin, BaseTestCase,), {} ...@@ -148,6 +151,9 @@ class TestCase(TestCaseMetaClass("NewBase", (TimeAssertMixin, BaseTestCase,), {}
def tearDown(self): def tearDown(self):
if getattr(self, 'skipTearDown', False): if getattr(self, 'skipTearDown', False):
return return
from gevent import events
events.subscribers[:] = self.__old_subscribers
cleanup = getattr(self, 'cleanup', _noop) cleanup = getattr(self, 'cleanup', _noop)
cleanup() cleanup()
self._error = self._none self._error = self._none
......
...@@ -55,7 +55,6 @@ class TestPeriodicMonitoringThread(unittest.TestCase): ...@@ -55,7 +55,6 @@ class TestPeriodicMonitoringThread(unittest.TestCase):
self.assertIs(self.hub, self.pmt.hub) self.assertIs(self.hub, self.pmt.hub)
del self.hub del self.hub
import gc
gc.collect() gc.collect()
self.assertIsNone(self.pmt.hub) self.assertIsNone(self.pmt.hub)
...@@ -151,6 +150,12 @@ class TestPeriodicMonitoringThread(unittest.TestCase): ...@@ -151,6 +150,12 @@ class TestPeriodicMonitoringThread(unittest.TestCase):
def test_monitor_blocking(self): def test_monitor_blocking(self):
# Initially there's no active greenlet and no switches, # Initially there's no active greenlet and no switches,
# so nothing is considered blocked # so nothing is considered blocked
from gevent.events import subscribers
from gevent.events import IEventLoopBlocked
from zope.interface.verify import verifyObject
events = []
subscribers.append(events.append)
self.assertFalse(self.pmt.monitor_blocking(self.hub)) self.assertFalse(self.pmt.monitor_blocking(self.hub))
# Give it an active greenlet # Give it an active greenlet
...@@ -160,13 +165,18 @@ class TestPeriodicMonitoringThread(unittest.TestCase): ...@@ -160,13 +165,18 @@ class TestPeriodicMonitoringThread(unittest.TestCase):
# We've switched, so we're not blocked # We've switched, so we're not blocked
self.assertFalse(self.pmt.monitor_blocking(self.hub)) self.assertFalse(self.pmt.monitor_blocking(self.hub))
self.assertFalse(events)
# Again without switching is a problem. # Again without switching is a problem.
self.assertTrue(self.pmt.monitor_blocking(self.hub)) self.assertTrue(self.pmt.monitor_blocking(self.hub))
self.assertTrue(events)
verifyObject(IEventLoopBlocked, events[0])
del events[:]
# But we can order it not to be a problem # But we can order it not to be a problem
self.pmt.ignore_current_greenlet_blocking() self.pmt.ignore_current_greenlet_blocking()
self.assertFalse(self.pmt.monitor_blocking(self.hub)) self.assertFalse(self.pmt.monitor_blocking(self.hub))
self.assertFalse(events)
# And back again # And back again
self.pmt.monitor_current_greenlet_blocking() self.pmt.monitor_current_greenlet_blocking()
......
# -*- coding: utf-8 -*-
# Copyright 2018 gevent. See LICENSE.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import unittest
from gevent import events
from zope.interface import verify
class TestImplements(unittest.TestCase):
def test_event_loop_blocked(self):
verify.verifyClass(events.IEventLoopBlocked, events.EventLoopBlocked)
class TestEvents(unittest.TestCase):
def test_is_zope(self):
from zope import event
self.assertIs(events.subscribers, event.subscribers)
self.assertIs(events.notify, event.notify)
if __name__ == '__main__':
unittest.main()
...@@ -50,8 +50,12 @@ class Test(greentest.TestCase): ...@@ -50,8 +50,12 @@ class Test(greentest.TestCase):
def test2(self): def test2(self):
timer = gevent.get_hub().loop.timer(0) timer = gevent.get_hub().loop.timer(0)
timer.start(hello2) timer.start(hello2)
gevent.sleep(0.1) try:
assert sys.exc_info() == (None, None, None), sys.exc_info() gevent.sleep(0.1)
self.assertEqual(sys.exc_info(), (None, None, None))
finally:
timer.close()
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -39,8 +39,11 @@ class TestCloseSocketWhilePolling(greentest.TestCase): ...@@ -39,8 +39,11 @@ class TestCloseSocketWhilePolling(greentest.TestCase):
with self.assertRaises(Exception): with self.assertRaises(Exception):
sock = socket.socket() sock = socket.socket()
self._close_on_teardown(sock) self._close_on_teardown(sock)
get_hub().loop.timer(0, sock.close) t = get_hub().loop.timer(0, sock.close)
sock.connect(('python.org', 81)) try:
sock.connect(('python.org', 81))
finally:
t.close()
gevent.sleep(0) gevent.sleep(0)
......
...@@ -17,3 +17,4 @@ test__issue330.py ...@@ -17,3 +17,4 @@ test__issue330.py
test___ident.py test___ident.py
test___config.py test___config.py
test___monitor.py test___monitor.py
test__events.py
...@@ -133,3 +133,4 @@ test__issue_728.py ...@@ -133,3 +133,4 @@ test__issue_728.py
test__refcount_core.py test__refcount_core.py
test__api.py test__api.py
test__monitor.py test__monitor.py
test__events.py
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment